第十章 UDPack 应用之实时双向通信框架
应用 UDPack 协议实现一个实时双向通信框架是很容易的,首先要考虑的是引入一个通用的对象序列化协议,可在对象与字节数组之间相互转换。而 msgpack 正是这样的一个对象序列化协议,类似 JSON,但是速度更快,序列化后数据更小。msgpack 有各种语言的实现版本,也包含 javascript 版本。
下面给出该通信框架(udpack.io)的全部代码:
const EventEmitter = require('events')
const UDPack = require('udpack')()
const log4js = require('log4js')
const logger = log4js.getLogger('udpack.io')
const { Encoder, Decoder } = require('@msgpack/msgpack')
const encoder = new Encoder()
const decoder = new Decoder()
const STATE = {
READY: 0,
CLOSE: -1
}
class Socket extends EventEmitter {
constructor(session, opts) {
opts = opts || {}
super(opts)
this._$session = session
this.opts = opts
session._$udpackio_socket = this
this.state = STATE.READY
}
close() {
if (this.state !== STATE.CLOSE) {
this._$session.close()
delete this._$session._$udpackio_socket
delete this._$session
}
return this
}
}
class Channel extends EventEmitter {
constructor(stream, opts) {
opts = opts || {}
super(opts)
this._$stream = stream
this.opts = opts
stream._$udpackio_channel = this
this.state = STATE.READY
}
send(event, data, cb) {
let encoded = encoder.encode([event, data])
let buffer = Buffer.from(encoded)
logger.debug(this._$stream.session.netSessionIdStr(), this._$stream.id, 'length', buffer.length)
this._$stream.write(buffer, cb)
}
close() {
if (this.state !== STATE.CLOSE) {
this._$stream.close()
delete this._$stream._$udpackio_channel
delete this._$stream
}
return this
}
}
class Server extends EventEmitter {
constructor(opts) {
opts = opts || {}
super(opts)
this.opts = opts
const { port, host, pubkey, prikey, authToken } = opts
this.udpack = new UDPack({ port, host, pubkey, prikey, authToken })
this.udpack.on('listening', () => {
logger.debug('listenling on', this.udpack.endpoint)
this.emit('listening')
}).on('session', (session) => {
let socket = new Socket(session)
session.on('goaway', (code) => {
logger.debug(`[session-${session.netSessionIdStr()}] goaway [${code}]`)
socket.close().emit('close')
})
this.emit('connection', socket)
}).on('stream', (stream) => {
let channel = new Channel(stream)
stream.on('data', (buffer) => {
logger.debug(stream.session.netSessionIdStr(), stream.id, 'length', buffer.length)
try {
let [event, data] = decoder.decode(buffer)
channel.emit(event, data)
} catch (e) {
logger.error(stream.session.netSessionIdStr(), stream.id, buffer, e)
}
}).on('close', () => {
channel.close().emit('close')
})
this.emit('channel', channel)
}).on('speed', (speed) => {
this.emit('speed', speed)
})
this.state = STATE.READY
}
close() {
if (this.state !== STATE.CLOSE) {
this.udpack.close()
}
return this
}
}
class Client extends EventEmitter {
constructor(opts) {
opts = opts || {}
super(opts)
this.opts = opts
const { port, host, pubkey, token } = opts
this.udpack = new UDPack({ port, host, pubkey, token })
this.udpack.on('listening', () => {
logger.debug('listenling on', this.udpack.endpoint)
this.emit('listening')
})
}
connect(serverPort, serverAddr, cb) {
this.udpack.on('session', (session) => {
let socket = new Socket(session)
session.on('goaway', (code) => {
logger.debug(`[session-${session.netSessionIdStr()}] goaway [${code}]`)
socket.close().emit('close')
this.udpack.connect(serverPort, serverAddr, () => {
logger.debug('reconnect to server...')
})
})
this.emit('connection', socket)
})
.on('connect_error', (session) => {
this.emit('connect_error')
})
.on('speed', (speed) => {
this.emit('speed', speed)
})
.connect(serverPort, serverAddr, cb)
}
openChannel(cb) {
this.udpack.openStream((err, stream) => {
if (err) {
cb(err)
} else {
let channel = new Channel(stream)
stream.on('data', (buffer) => {
logger.debug(stream.session.netSessionIdStr(), stream.id, 'length', buffer.length)
try {
let [event, data] = decoder.decode(buffer)
channel.emit(event, data)
} catch (e) {
logger.error(stream.session.netSessionIdStr(), stream.id, buffer, e)
}
}).on('close', () => {
channel.close().emit('close')
})
cb(null, channel)
}
})
}
close() {
if (this.state !== STATE.CLOSE) {
this.udpack.close()
}
return this
}
}
module.exports = {
Client, Server
}
可以看到该框架把 UDPack 协议中的 Session 和 Stream 对象分别封装为了 Socket 和 Channel 对象。并用 UDPack 对象分别实现了 Server 以及 Client 对象。
下面给出一段利用上述框架实现文件上传的示例代码:
文件上传服务端
const fs = require('fs')
const { Server } = require('udpack.io')
const env = 'development'
const level = env === 'development' ? 'debug' : 'info'
const log4js = require('log4js')
log4js.configure({
appenders: {
udpack: { type: 'file', filename: 'logs/udpack.log' },
'udpack.io': { type: 'file', filename: 'logs/udpack.io.log' },
server: { type: 'file', filename: 'logs/server.log' },
},
categories: {
udpack: { appenders: ['udpack'], level },
'udpack.io': { appenders: ['udpack.io'], level },
default: { appenders: ['server'], level },
}
})
const logger = log4js.getLogger('server')
let port = 45456,
host = '0.0.0.0',
pubkey = fs.readFileSync(`${__dirname}/keys/pubkey.pem`),
prikey = fs.readFileSync(`${__dirname}/keys/prikey.pem`),
authToken = (token) => (token === 'ea970f20b09311ebbe1d11acebc10dbf')
const opts = { port, host, pubkey, prikey, authToken }
const server = new Server(opts)
let total = 0
let received = 0
console.time('used')
let cb = (err) => {
if (err) {
logger.error(err)
server.close()
}
}
server.on('channel', (channel) => {
channel.on('file', (data) => {
let writeStream = fs.createWriteStream(data.filename)
writeStream.on('error', cb)
total = data.filesize
channel.on('chunk', (data) => {
logger.debug('received data ', data.chunk.length, ' bytes')
received += data.chunk.length
writeStream.write(data.chunk, cb)
}).on('finish', () => {
logger.debug('recv finished')
console.timeEnd('used')
writeStream.end()
channel.send('done', {}, cb)
}).on('done', () => {
server.close()
})
})
}).on('speed', (speed) => {
console.clear()
console.debug(`[${((received / (total || 0.00001)) * 100).toFixed(2)}%]`, `[recv.speed ${speed.recv} KB/s]`)
})
通过 const opts = { port, host, pubkey, prikey, authToken }
创建 Server 对象实例后,监听 channel 事件,并通过 channel 实例读取上传文件数据。这里用了自定义的文件上传协议。当客户端向服务端上传文件时,会触发 channel 对象的 file 事件,此时可打开一个本地文件。然后监听 chunk 和 finish 事件,通过 chunk 事件可获取文件数据流。finish 事件代表文件传输已完成。
文件上传客户端
const fs = require('fs')
const { Client } = require('udpack.io')
const path = require('path')
const env = 'development'
const level = env === 'development' ? 'debug' : 'info'
const log4js = require('log4js')
log4js.configure({
appenders: {
udpack: { type: 'file', filename: 'logs/udpack-client.log' },
'udpack.io': { type: 'file', filename: 'logs/udpack.io-client.log' },
client: { type: 'file', filename: 'logs/client.log' },
},
categories: {
udpack: { appenders: ['udpack'], level },
'udpack.io': { appenders: ['udpack.io'], level },
default: { appenders: ['client'], level },
}
})
const logger = log4js.getLogger('client')
let serverAddr = '127.0.0.1',
serverPort = 45456,
port = 0, // os will select a random port
host = '0.0.0.0',
pubkey = fs.readFileSync(`${__dirname}/keys/pubkey.pem`),
token = 'ea970f20b09311ebbe1d11acebc10dbf'
const opts = { port, host, pubkey, token }
const client = new Client(opts)
client.connect(serverPort, serverAddr, (err) => {
if (err) {
logger.error(err)
client.close()
}
})
let total = 0
let sent = 0
console.time('used')
let cb = (err) => {
if (err) {
logger.error(err)
client.close()
}
}
client.on('connection', (socket) => {
client.openChannel((err, channel) => {
if (err) {
cb(err)
} else {
let args = process.argv.slice(2)
let pathname = args[0]
let filename = path.basename(pathname)
let filesize = fs.statSync(pathname).size
total = filesize
channel.send('file', { filename, filesize }, (err) => {
if (err) {
cb(err)
} else {
const readStream = fs.createReadStream(pathname, { highWaterMark: 1024 * 1024 })
readStream.on('data', (chunk) => {
readStream.pause()
channel.send('chunk', { chunk }, (err) => {
if (err) {
cb(err)
} else {
logger.debug('send data ', chunk.length, ' bytes')
sent += chunk.length
readStream.resume()
}
})
}).on('end', () => {
logger.debug('send finished')
console.timeEnd('used')
channel.send('finish')
}).on('error', cb)
channel.on('done', () => {
channel.send('done', {}, (err) => {
if (err) {
logger.error(err)
}
client.close()
})
})
}
})
}
})
}).on('connect_error', () => {
logger.debug('connect error')
client.close()
}).on('speed', (speed) => {
console.clear()
console.debug(`[${((sent / (total || 0.00001)) * 100).toFixed(2)}%]`, `[send.speed ${speed.send} KB/s]`)
})
通过 const opts = { port, host, pubkey, token }
创建 Client 实例后,可调用 client.connect
连接服务器端,当连接成功后会触发 connection 事件,并获得 socket 对象实例。触发 connection 事件代表连接已完成,此时可通过调用 client.openChannel 方法打开新的 channel,最后通过此 channel 对象与服务端进行通信。
客户端打开 channel 后,首先读取本地文件,并向服务器发送 file 事件,传递文件名和文件大小。然后开始向服务器发送 chunk 事件,传输文件数据。完成后向服务器发送 finish 事件。