You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

191 lines
3.9 KiB
Lua

--[[lit-meta
name = "creationix/coro-channel"
version = "3.0.3"
homepage = "https://github.com/luvit/lit/blob/master/deps/coro-channel.lua"
description = "An adapter for wrapping uv streams as coro-streams."
tags = {"coro", "adapter"}
license = "MIT"
author = { name = "Tim Caswell" }
]]
-- local p = require('pretty-print').prettyPrint
local function assertResume(thread, ...)
local success, err = coroutine.resume(thread, ...)
if not success then
error(debug.traceback(thread, err), 0)
end
end
local function makeCloser(socket)
local closer = {
read = false,
written = false,
errored = false,
}
local closed = false
local function close()
if closed then return end
closed = true
if not closer.readClosed then
closer.readClosed = true
if closer.onClose then
closer.onClose()
end
end
if not socket:is_closing() then
socket:close()
end
end
closer.close = close
function closer.check()
if closer.errored or (closer.read and closer.written) then
return close()
end
end
return closer
end
local function makeRead(socket, closer)
local paused = true
local queue = {}
local tindex = 0
local dindex = 0
local function dispatch(data)
-- p("<-", data[1])
if tindex > dindex then
local thread = queue[dindex]
queue[dindex] = nil
dindex = dindex + 1
assertResume(thread, unpack(data))
else
queue[dindex] = data
dindex = dindex + 1
if not paused then
paused = true
assert(socket:read_stop())
end
end
end
closer.onClose = function ()
if not closer.read then
closer.read = true
return dispatch {nil, closer.errored}
end
end
local function onRead(err, chunk)
if err then
closer.errored = err
return closer.check()
end
if not chunk then
if closer.read then return end
closer.read = true
dispatch {}
return closer.check()
end
return dispatch {chunk}
end
local function read()
if dindex > tindex then
local data = queue[tindex]
queue[tindex] = nil
tindex = tindex + 1
return unpack(data)
end
if paused then
paused = false
assert(socket:read_start(onRead))
end
queue[tindex] = coroutine.running()
tindex = tindex + 1
return coroutine.yield()
end
-- Auto use wrapper library for backwards compat
return read
end
local function makeWrite(socket, closer)
local function wait()
local thread = coroutine.running()
return function (err)
assertResume(thread, err)
end
end
local function write(chunk)
if closer.written then
return nil, "already shutdown"
end
-- p("->", chunk)
if chunk == nil then
closer.written = true
closer.check()
local success, err = socket:shutdown(wait())
if not success then
return nil, err
end
err = coroutine.yield()
return not err, err
end
local success, err = socket:write(chunk, wait())
if not success then
closer.errored = err
closer.check()
return nil, err
end
err = coroutine.yield()
return not err, err
end
return write
end
local function wrapRead(socket)
local closer = makeCloser(socket)
closer.written = true
return makeRead(socket, closer), closer.close
end
local function wrapWrite(socket)
local closer = makeCloser(socket)
closer.read = true
return makeWrite(socket, closer), closer.close
end
local function wrapStream(socket)
assert(socket
and socket.write
and socket.shutdown
and socket.read_start
and socket.read_stop
and socket.is_closing
and socket.close, "socket does not appear to be a socket/uv_stream_t")
local closer = makeCloser(socket)
return makeRead(socket, closer), makeWrite(socket, closer), closer.close
end
return {
wrapRead = wrapRead,
wrapWrite = wrapWrite,
wrapStream = wrapStream,
}