You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

433 lines
10 KiB
Lua

--[=[
@c VoiceConnection
@d Represents a connection to a Discord voice server.
]=]
local PCMString = require('voice/streams/PCMString')
local PCMStream = require('voice/streams/PCMStream')
local PCMGenerator = require('voice/streams/PCMGenerator')
local FFmpegProcess = require('voice/streams/FFmpegProcess')
local uv = require('uv')
local ffi = require('ffi')
local constants = require('constants')
local opus = require('voice/opus')
local sodium = require('voice/sodium')
local CHANNELS = 2
local SAMPLE_RATE = 48000 -- Hz
local FRAME_DURATION = 20 -- ms
local COMPLEXITY = 5
local MIN_BITRATE = 8000 -- bps
local MAX_BITRATE = 128000 -- bps
local MIN_COMPLEXITY = 0
local MAX_COMPLEXITY = 10
local MAX_SEQUENCE = 0xFFFF
local MAX_TIMESTAMP = 0xFFFFFFFF
local HEADER_FMT = '>BBI2I4I4'
local PADDING = string.rep('\0', 12)
local MS_PER_NS = 1 / (constants.NS_PER_US * constants.US_PER_MS)
local MS_PER_S = constants.MS_PER_S
local max = math.max
local hrtime = uv.hrtime
local ffi_string = ffi.string
local pack = string.pack -- luacheck: ignore
local format = string.format
local insert = table.insert
local running, resume, yield = coroutine.running, coroutine.resume, coroutine.yield
-- timer.sleep is redefined here to avoid a memory leak in the luvit module
local function sleep(delay)
local thread = running()
local t = uv.new_timer()
t:start(delay, 0, function()
t:stop()
t:close()
return assert(resume(thread))
end)
return yield()
end
local function asyncResume(thread)
local t = uv.new_timer()
t:start(0, 0, function()
t:stop()
t:close()
return assert(resume(thread))
end)
end
local function check(n, mn, mx)
if not tonumber(n) or n < mn or n > mx then
return error(format('Value must be a number between %s and %s', mn, mx), 2)
end
return n
end
local VoiceConnection, get = require('class')('VoiceConnection')
function VoiceConnection:__init(channel)
self._channel = channel
self._pending = {}
end
function VoiceConnection:_prepare(key, socket)
self._key = sodium.key(key)
self._socket = socket
self._ip = socket._ip
self._port = socket._port
self._udp = socket._udp
self._ssrc = socket._ssrc
self._mode = socket._mode
self._manager = socket._manager
self._client = socket._client
self._s = 0
self._t = 0
self._encoder = opus.Encoder(SAMPLE_RATE, CHANNELS)
self:setBitrate(self._client._options.bitrate)
self:setComplexity(COMPLEXITY)
self._ready = true
self:_continue(true)
end
function VoiceConnection:_await()
local thread = running()
insert(self._pending, thread)
if not self._timeout then
local t = uv.new_timer()
t:start(10000, 0, function()
t:stop()
t:close()
self._timeout = nil
if not self._ready then
local id = self._channel and self._channel._id
return self:_cleanup(format('voice connection for channel %s failed to initialize', id))
end
end)
self._timeout = t
end
return yield()
end
function VoiceConnection:_continue(success, err)
local t = self._timeout
if t then
t:stop()
t:close()
self._timeout = nil
end
for i, thread in ipairs(self._pending) do
self._pending[i] = nil
assert(resume(thread, success, err))
end
end
function VoiceConnection:_cleanup(err)
self:stopStream()
self._ready = nil
self._channel._parent._connection = nil
self._channel._connection = nil
self:_continue(nil, err or 'connection closed')
end
--[=[
@m getBitrate
@t mem
@r nil
@d Returns the bitrate of the interal Opus encoder in bits per second (bps).
]=]
function VoiceConnection:getBitrate()
return self._encoder:get(opus.GET_BITRATE_REQUEST)
end
--[=[
@m setBitrate
@t mem
@p bitrate number
@r nil
@d Sets the bitrate of the interal Opus encoder in bits per second (bps).
This should be between 8000 and 128000, inclusive.
]=]
function VoiceConnection:setBitrate(bitrate)
bitrate = check(bitrate, MIN_BITRATE, MAX_BITRATE)
self._encoder:set(opus.SET_BITRATE_REQUEST, bitrate)
end
--[=[
@m getComplexity
@t mem
@r number
@d Returns the complexity of the interal Opus encoder.
]=]
function VoiceConnection:getComplexity()
return self._encoder:get(opus.GET_COMPLEXITY_REQUEST)
end
--[=[
@m setComplexity
@t mem
@p complexity number
@r nil
@d Sets the complexity of the interal Opus encoder.
This should be between 0 and 10, inclusive.
]=]
function VoiceConnection:setComplexity(complexity)
complexity = check(complexity, MIN_COMPLEXITY, MAX_COMPLEXITY)
self._encoder:set(opus.SET_COMPLEXITY_REQUEST, complexity)
end
---- debugging
local t0, m0
local t_sum, m_sum, n = 0, 0, 0
local function open() -- luacheck: ignore
-- collectgarbage()
m0 = collectgarbage('count')
t0 = hrtime()
end
local function close() -- luacheck: ignore
local dt = (hrtime() - t0) * MS_PER_NS
local dm = collectgarbage('count') - m0
n = n + 1
t_sum = t_sum + dt
m_sum = m_sum + dm
print(format('dt: %g | dm: %g | avg dt: %g | avg dm: %g', dt, dm, t_sum / n, m_sum / n))
end
---- debugging
function VoiceConnection:_play(stream, duration)
self:stopStream()
self:_setSpeaking(true)
duration = tonumber(duration) or math.huge
local elapsed = 0
local udp, ip, port = self._udp, self._ip, self._port
local ssrc, key = self._ssrc, self._key
local encoder = self._encoder
local frame_size = SAMPLE_RATE * FRAME_DURATION / MS_PER_S
local pcm_len = frame_size * CHANNELS
local start = hrtime()
local reason
while elapsed < duration do
local pcm = stream:read(pcm_len)
if not pcm then
reason = 'stream exhausted or errored'
break
end
local data, len = encoder:encode(pcm, pcm_len, frame_size, pcm_len * 2)
if not data then
reason = 'could not encode audio data'
break
end
local s, t = self._s, self._t
local header = pack(HEADER_FMT, 0x80, 0x78, s, t, ssrc)
s = s + 1
t = t + frame_size
self._s = s > MAX_SEQUENCE and 0 or s
self._t = t > MAX_TIMESTAMP and 0 or t
local encrypted, encrypted_len = sodium.encrypt(data, len, header .. PADDING, key)
if not encrypted then
reason = 'could not encrypt audio data'
break
end
local packet = header .. ffi_string(encrypted, encrypted_len)
udp:send(packet, ip, port)
elapsed = elapsed + FRAME_DURATION
local delay = elapsed - (hrtime() - start) * MS_PER_NS
sleep(max(delay, 0))
if self._paused then
asyncResume(self._paused)
self._paused = running()
local pause = hrtime()
yield()
start = start + hrtime() - pause
asyncResume(self._resumed)
self._resumed = nil
end
if self._stopped then
reason = 'stream stopped'
break
end
end
self:_setSpeaking(false)
if self._stopped then
asyncResume(self._stopped)
self._stopped = nil
end
return elapsed, reason
end
function VoiceConnection:_setSpeaking(speaking)
self._speaking = speaking
return self._socket:setSpeaking(speaking)
end
--[=[
@m playPCM
@t mem
@p source string/function/table/userdata
@op duration number
@r number
@r string
@d Plays PCM data over the established connection. If a duration (in milliseconds)
is provided, the audio stream will automatically stop after that time has elapsed;
otherwise, it will play until the source is exhausted. The returned number is the
time elapsed while streaming and the returned string is a message detailing the
reason why the stream stopped. For more information about acceptable sources,
see the [[voice]] page.
]=]
function VoiceConnection:playPCM(source, duration)
if not self._ready then
return nil, 'Connection is not ready'
end
local t = type(source)
local stream
if t == 'string' then
stream = PCMString(source)
elseif t == 'function' then
stream = PCMGenerator(source)
elseif (t == 'table' or t == 'userdata') and type(source.read) == 'function' then
stream = PCMStream(source)
else
return error('Invalid audio source: ' .. tostring(source))
end
return self:_play(stream, duration)
end
--[=[
@m playFFmpeg
@t mem
@p path string
@op duration number
@r number
@r string
@d Plays audio over the established connection using an FFmpeg process, assuming
FFmpeg is properly configured. If a duration (in milliseconds)
is provided, the audio stream will automatically stop after that time has elapsed;
otherwise, it will play until the source is exhausted. The returned number is the
time elapsed while streaming and the returned string is a message detailing the
reason why the stream stopped. For more information about using FFmpeg,
see the [[voice]] page.
]=]
function VoiceConnection:playFFmpeg(path, duration)
if not self._ready then
return nil, 'Connection is not ready'
end
local stream = FFmpegProcess(path, SAMPLE_RATE, CHANNELS)
local elapsed, reason = self:_play(stream, duration)
stream:close()
return elapsed, reason
end
--[=[
@m pauseStream
@t mem
@r nil
@d Temporarily pauses the audio stream for this connection, if one is active.
Like most Discordia methods, this must be called inside of a coroutine, as it
will yield until the stream is actually paused, usually on the next tick.
]=]
function VoiceConnection:pauseStream()
if not self._speaking then return end
if self._paused then return end
self._paused = running()
return yield()
end
--[=[
@m resumeStream
@t mem
@r nil
@d Resumes the audio stream for this connection, if one is active and paused.
Like most Discordia methods, this must be called inside of a coroutine, as it
will yield until the stream is actually resumed, usually on the next tick.
]=]
function VoiceConnection:resumeStream()
if not self._speaking then return end
if not self._paused then return end
asyncResume(self._paused)
self._paused = nil
self._resumed = running()
return yield()
end
--[=[
@m stopStream
@t mem
@r nil
@d Irreversibly stops the audio stream for this connection, if one is active.
Like most Discordia methods, this must be called inside of a coroutine, as it
will yield until the stream is actually stopped, usually on the next tick.
]=]
function VoiceConnection:stopStream()
if not self._speaking then return end
if self._stopped then return end
self._stopped = running()
self:resumeStream()
return yield()
end
--[=[
@m close
@t ws
@r boolean
@d Stops the audio stream for this connection, if one is active, disconnects from
the voice server, and leaves the corresponding voice channel. Like most Discordia
methods, this must be called inside of a coroutine.
]=]
function VoiceConnection:close()
self:stopStream()
if self._socket then
self._socket:disconnect()
end
local guild = self._channel._parent
return self._client._shards[guild.shardId]:updateVoice(guild._id)
end
--[=[@p channel GuildVoiceChannel/nil The corresponding GuildVoiceChannel for
this connection, if one exists.]=]
function get.channel(self)
return self._channel
end
return VoiceConnection