9.5 代理服务端
UDPack Server 端创建 UDPack 实例对象 udpack 后,可通过在 udpack 上监听 stream 事件获得新打开的 Stream 实例对象 stream。
updack.on('stream', (stream) => {})
然后可通过在 stream 对象上监听 data 事件获得从 UDPack Client 发过来的数据。
stream.on('data', (data) => {})
然后通过 protobuf 解码数据获得 Packet 对象,根据 cmd 命令字进行不同的逻辑处理。如 if (packet.cmd === 'connect')
会根据 packet.authority
参数打开一个到 authority(如: www.google.com) 的连接通道。后续 if (packet.cmd === 'data')
则会把 UDPack Client 发过来的数据通过刚刚打开的连接通道转发给 authority(如: www.google.com)。
const dstSocket = net.createConnection({
port: packet.authority.port,
host: ip
})
上方为打开到 authority 的连接通道的代码。从 authority(如: www.google.com) 发回来的数据,会通过 stream 转发给 UDPack Client。代码如下所示:
dstSocket.on('data', resdata => {
stream._$writeStream.write(resdata, (err) => {
if (err) {
logger.error('_cache.write', err)
} else {
stream._$bytesWritten += resdata.length
}
})
})
浏览网页时,通常上行数据远小于下行数据。例如,下载文件时会有源源不断的数据从资源网站返回到 UDPack Server,再从 UDPack Server 转发给 UDPack Client,最后返回给 Browser。如果企业内部员工较多,大家都通过代理服务器上网,如果所有数据都缓存在内存中,可能会造成内存溢出。因此最好采用文件进行缓冲,UDPack Server 从资源网站接收的数据先写入临时缓存文件,然后从缓存文件中读取数据,并通过 stream 转发给 UDPack Client。通过文件缓冲区,可以避免 UDPack Server 端内存的大量占用。
下面给出较完整的代码:
udpack.on('stream', (stream) => {
stream._$interval = 10,
stream._$sendMaxSpeedKBps = 25 * 1024,
stream._$bytesRead = 0,
stream._$bytesWritten = 0,
stream._$sendBuffersInterval = setInterval(() => {
if (!stream._$readStream) {
let maxBytes = Math.floor((stream._$sendMaxSpeedKBps * 1024 / 1000) * stream._$interval)
let start = stream._$bytesRead,
end = Math.min(stream._$bytesRead + (maxBytes - 1), stream._$bytesWritten - 1),
emitClose = true,
highWaterMark = 256 * 1024
if (start <= end) {
stream._$readStream = fs.createReadStream(
stream._$filename,
{ start, end, emitClose, highWaterMark }
)
stream._$readStream.on('data', (chunk) => {
stream._$bytesRead += chunk.length
if (stream.isReady()) {
let sPacket = Packet.create({
sid: stream.sid,
cmd: 'data',
buf: chunk
})
let buffer = Packet.encode(sPacket).finish()
stream._$readStream.pause()
let cb = (err) => {
if (err) {
stream.write(buffer, cb)
} else {
if (stream._$readStream) {
stream._$readStream.resume()
}
}
}
stream.write(buffer, cb)
}
}).on('close', () => {
delete stream._$readStream
})
}
}
}, stream._$interval)
stream.on('data', (buffer) => {
let packet = Packet.decode(buffer)
stream.sid = packet.sid
if (packet.cmd === 'connect') {
let dstHost = packet.authority.domain || packet.authority.ip
dns.lookup(dstHost, { family: 4 }, (err, ip) => {
if (err || !ip) {
logger.error('dnslookup', err, ip)
} else {
const dstSocket = net.createConnection({
port: packet.authority.port,
host: ip
})
stream.dstSocket = dstSocket
dstSocket
.setNoDelay()
.on('connect', () => {
let connectRes = ConnectRes.create({
succeeded: true,
localAddress: dstSocket.localAddress,
localPort: dstSocket.localPort
})
let sPacket = Packet.create({
sid: packet.sid,
cmd: 'connect',
connectRes
})
let buffer = Packet.encode(sPacket).finish()
stream.write(buffer, (err) => {
if (err) {
logger.error(err.message, err.stack)
}
})
stream._$filename = path.join(_cacheDir, stream.sid)
stream._$writeStream = fs.createWriteStream(stream._$filename)
stream._$writeStream.on('error', (err) => {
logger.error('_cache.write.on.error', err)
})
}).on('data', resdata => {
stream._$writeStream.write(resdata, (err) => {
if (err) {
logger.error('_cache.write', err)
} else {
stream._$bytesWritten += resdata.length
}
})
}).on('end', () => {
stream._$writeStream.end()
}).on('close', () => {
let sPacket = Packet.create({
sid: packet.sid,
cmd: 'terminate',
reason: 'dstSocket closed'
})
let buffer = Packet.encode(sPacket).finish()
stream.write(buffer, (err) => {
if (err) {
logger.error(err.message, err.stack)
}
})
})
}
})
} else if (packet.cmd === 'data') {
let reqdata = packet.buf
let dstSocket = stream.dstSocket
if (!!dstSocket && dstSocket.writable) {
dstSocket.write(reqdata)
}
} else if (packet.cmd === 'terminate') {
stream.close()
}
}).on('close', () => {
clearInterval(stream._$sendBuffersInterval)
if (stream.dstSocket) {
if (!stream.dstSocket.destroyed) {
stream.dstSocket.destroy()
}
}
if (stream._$filename) {
fs.rm(stream._$filename, { force: true }, (err) => {
if (err) {
logger.error(`_cache.delete ${stream._$filename} error`, err)
} else {
logger.debug(`_cache.delete ${stream._$filename}`)
}
})
}
})
})