You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
191 lines
3.9 KiB
Lua
191 lines
3.9 KiB
Lua
--[[lit-meta
|
|
name = "creationix/coro-channel"
|
|
version = "3.0.3"
|
|
homepage = "https://github.com/luvit/lit/blob/master/deps/coro-channel.lua"
|
|
description = "An adapter for wrapping uv streams as coro-streams."
|
|
tags = {"coro", "adapter"}
|
|
license = "MIT"
|
|
author = { name = "Tim Caswell" }
|
|
]]
|
|
|
|
-- local p = require('pretty-print').prettyPrint
|
|
|
|
local function assertResume(thread, ...)
|
|
local success, err = coroutine.resume(thread, ...)
|
|
if not success then
|
|
error(debug.traceback(thread, err), 0)
|
|
end
|
|
end
|
|
|
|
local function makeCloser(socket)
|
|
local closer = {
|
|
read = false,
|
|
written = false,
|
|
errored = false,
|
|
}
|
|
|
|
local closed = false
|
|
|
|
local function close()
|
|
if closed then return end
|
|
closed = true
|
|
if not closer.readClosed then
|
|
closer.readClosed = true
|
|
if closer.onClose then
|
|
closer.onClose()
|
|
end
|
|
end
|
|
if not socket:is_closing() then
|
|
socket:close()
|
|
end
|
|
end
|
|
|
|
closer.close = close
|
|
|
|
function closer.check()
|
|
if closer.errored or (closer.read and closer.written) then
|
|
return close()
|
|
end
|
|
end
|
|
|
|
return closer
|
|
end
|
|
|
|
local function makeRead(socket, closer)
|
|
local paused = true
|
|
|
|
local queue = {}
|
|
local tindex = 0
|
|
local dindex = 0
|
|
|
|
local function dispatch(data)
|
|
|
|
-- p("<-", data[1])
|
|
|
|
if tindex > dindex then
|
|
local thread = queue[dindex]
|
|
queue[dindex] = nil
|
|
dindex = dindex + 1
|
|
assertResume(thread, unpack(data))
|
|
else
|
|
queue[dindex] = data
|
|
dindex = dindex + 1
|
|
if not paused then
|
|
paused = true
|
|
assert(socket:read_stop())
|
|
end
|
|
end
|
|
end
|
|
|
|
closer.onClose = function ()
|
|
if not closer.read then
|
|
closer.read = true
|
|
return dispatch {nil, closer.errored}
|
|
end
|
|
end
|
|
|
|
local function onRead(err, chunk)
|
|
if err then
|
|
closer.errored = err
|
|
return closer.check()
|
|
end
|
|
if not chunk then
|
|
if closer.read then return end
|
|
closer.read = true
|
|
dispatch {}
|
|
return closer.check()
|
|
end
|
|
return dispatch {chunk}
|
|
end
|
|
|
|
local function read()
|
|
if dindex > tindex then
|
|
local data = queue[tindex]
|
|
queue[tindex] = nil
|
|
tindex = tindex + 1
|
|
return unpack(data)
|
|
end
|
|
if paused then
|
|
paused = false
|
|
assert(socket:read_start(onRead))
|
|
end
|
|
queue[tindex] = coroutine.running()
|
|
tindex = tindex + 1
|
|
return coroutine.yield()
|
|
end
|
|
|
|
-- Auto use wrapper library for backwards compat
|
|
return read
|
|
end
|
|
|
|
local function makeWrite(socket, closer)
|
|
|
|
local function wait()
|
|
local thread = coroutine.running()
|
|
return function (err)
|
|
assertResume(thread, err)
|
|
end
|
|
end
|
|
|
|
local function write(chunk)
|
|
if closer.written then
|
|
return nil, "already shutdown"
|
|
end
|
|
|
|
-- p("->", chunk)
|
|
|
|
if chunk == nil then
|
|
closer.written = true
|
|
closer.check()
|
|
local success, err = socket:shutdown(wait())
|
|
if not success then
|
|
return nil, err
|
|
end
|
|
err = coroutine.yield()
|
|
return not err, err
|
|
end
|
|
|
|
local success, err = socket:write(chunk, wait())
|
|
if not success then
|
|
closer.errored = err
|
|
closer.check()
|
|
return nil, err
|
|
end
|
|
err = coroutine.yield()
|
|
return not err, err
|
|
end
|
|
|
|
return write
|
|
end
|
|
|
|
local function wrapRead(socket)
|
|
local closer = makeCloser(socket)
|
|
closer.written = true
|
|
return makeRead(socket, closer), closer.close
|
|
end
|
|
|
|
local function wrapWrite(socket)
|
|
local closer = makeCloser(socket)
|
|
closer.read = true
|
|
return makeWrite(socket, closer), closer.close
|
|
end
|
|
|
|
local function wrapStream(socket)
|
|
assert(socket
|
|
and socket.write
|
|
and socket.shutdown
|
|
and socket.read_start
|
|
and socket.read_stop
|
|
and socket.is_closing
|
|
and socket.close, "socket does not appear to be a socket/uv_stream_t")
|
|
|
|
local closer = makeCloser(socket)
|
|
return makeRead(socket, closer), makeWrite(socket, closer), closer.close
|
|
end
|
|
|
|
return {
|
|
wrapRead = wrapRead,
|
|
wrapWrite = wrapWrite,
|
|
wrapStream = wrapStream,
|
|
}
|