You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
122 lines
2.7 KiB
Lua
122 lines
2.7 KiB
Lua
3 years ago
|
local json = require('json')
|
||
|
local miniz = require('miniz')
|
||
|
local Mutex = require('utils/Mutex')
|
||
|
local Emitter = require('utils/Emitter')
|
||
|
local Stopwatch = require('utils/Stopwatch')
|
||
|
|
||
|
local websocket = require('coro-websocket')
|
||
|
local constants = require('constants')
|
||
|
|
||
|
-- Local aliases for the library functions used below.
local inflate = miniz.inflate

local encode = json.encode
local decode = json.decode
local null = json.null

local ws_parseUrl = websocket.parseUrl
local ws_connect = websocket.connect

-- Delay handed to Mutex:unlockAfter after every send attempt.
local GATEWAY_DELAY = constants.GATEWAY_DELAY

-- Frame opcodes, compared against `message.opcode` on incoming frames and
-- set on outgoing ones.
local TEXT = 1
local BINARY = 2
local CLOSE = 8
|
||
|
|
||
|
-- Parses `url`, overrides its pathname with `path`, and opens the connection.
-- Raises (via assert) when either parsing or connecting fails; on success
-- every value returned by ws_connect is passed through to the caller.
local function connect(url, path)
	local target = assert(ws_parseUrl(url))
	target.pathname = path
	return assert(ws_connect(target))
end
|
||
|
|
||
|
local WebSocket = require('class')('WebSocket', Emitter)
|
||
|
|
||
|
-- Constructor. Runs the Emitter initializer, then stores the owning object
-- (whose `emit` is used for 'raw' events), the mutex that serializes sends,
-- and a stopwatch instance.
function WebSocket:__init(parent)
	Emitter.__init(self)

	self._parent = parent
	self._mutex = Mutex()
	self._sw = Stopwatch()
end
|
||
|
|
||
|
-- Opens a connection to `url` (with `path` as the pathname) and processes
-- incoming frames until the read iterator ends or a frame cannot be turned
-- into a payload. Afterwards the connection state is cleared and the optional
-- subclass hooks `stopHeartbeat` and `handleDisconnect` are invoked.
-- Returns whatever `handleDisconnect` returns, or nothing if it is not defined.
function WebSocket:connect(url, path)

	-- On failure `res` carries the error value raised inside `connect`.
	local ok, res, read, write = pcall(connect, url, path)

	if not ok then

		self:error('Could not connect to %s (%s)', url, res) -- TODO: get new url?

	else

		self._read = read
		self._write = write
		self._reconnect = nil
		self:info('Connected to %s', url)

		local owner = self._parent

		for frame in self._read do
			local payload, raw = self:parseMessage(frame)
			-- A falsy payload (close frame, unhandled opcode, failed decode)
			-- terminates the read loop.
			if not payload then
				break
			end
			owner:emit('raw', raw)
			if self.handlePayload then -- virtual method
				self:handlePayload(payload)
			end
		end

		self:info('Disconnected')

	end

	-- Teardown runs for both the failed-connect and the disconnected case.
	self._read = nil
	self._write = nil
	self._identified = nil

	if self.stopHeartbeat then -- virtual method
		self:stopHeartbeat()
	end

	if self.handleDisconnect then -- virtual method
		return self:handleDisconnect(url, path)
	end

end
|
||
|
|
||
|
-- Converts one incoming frame into a decoded payload.
--
-- message: table with `opcode` and `payload` fields.
--
-- Returns:
--   TEXT   -> decoded JSON value, raw JSON string
--   BINARY -> decoded JSON value, inflated JSON string
--   CLOSE  -> nil (after logging the close code and reason as a warning)
--   any other opcode -> nothing
function WebSocket:parseMessage(message)

	local opcode = message.opcode
	local payload = message.payload

	if opcode == TEXT then

		return decode(payload, 1, null), payload

	elseif opcode == BINARY then

		payload = inflate(payload, 1)
		return decode(payload, 1, null), payload

	elseif opcode == CLOSE then

		-- A close frame may legally carry no body at all; RFC 6455 designates
		-- 1005 as the code to report when no status code was present.
		local code, msg = 1005, 'Connection closed'

		if payload and #payload >= 2 then
			local i
			-- `i` is the index of the first byte after the 2-byte status code.
			code, i = ('>H'):unpack(payload)
			-- Use >= so that a single-byte reason is not dropped.
			if #payload >= i then
				msg = payload:sub(i)
			end
		end

		self:warning('%i - %s', code, msg)
		return nil

	end

end
|
||
|
|
||
|
-- Sends a gateway payload `{op = op, d = d}` as a JSON text frame.
--
-- op:       opcode placed in the payload's `op` field.
-- d:        data placed in the payload's `d` field.
-- identify: when truthy, the payload is sent even without a session id.
--
-- Returns the first two results of the write function on an attempted send,
-- or `false, reason` when there is no session or no open connection.
-- Errors raised while encoding or writing are re-raised to the caller, but
-- only after the mutex release has been scheduled, so a failed send can no
-- longer leave the mutex locked and stall every later send.
function WebSocket:_send(op, d, identify)

	self._mutex:lock()

	-- The guarded section runs under pcall so the unlock below always happens.
	-- (This file already relies on pcall around a yielding call in
	-- WebSocket:connect.)
	local ok, success, err = pcall(function()
		if not (identify or self._session_id) then
			return false, 'Invalid session'
		end
		local write = self._write
		if not write then
			return false, 'Not connected to gateway'
		end
		return write {opcode = TEXT, payload = encode {op = op, d = d}}
	end)

	self._mutex:unlockAfter(GATEWAY_DELAY)

	if not ok then
		-- `success` holds the error value here; level 0 keeps it unmodified.
		error(success, 0)
	end

	return success, err

end
|
||
|
|
||
|
-- Ends the current connection, if any. Does nothing when no write function
-- is stored. `reconnect` is coerced to a boolean and recorded in
-- `self._reconnect` before the write function is called with no arguments
-- (presumably this signals the underlying socket to close -- confirm against
-- coro-websocket). The stored read/write functions are then cleared.
function WebSocket:disconnect(reconnect)
	if self._write then
		self._reconnect = reconnect and true or false
		self._write()
		self._read = nil
		self._write = nil
	end
end
|
||
|
|
||
|
return WebSocket
|