9.5 代理服务端

UDPack Server 端创建 UDPack 实例对象 udpack 后,可通过在 udpack 上监听 stream 事件获得新打开的 Stream 实例对象 stream。

updack.on('stream', (stream) => {})

然后可通过在 stream 对象上监听 data 事件获得从 UDPack Client 发过来的数据。

stream.on('data', (data) => {})

然后通过 protobuf 解码数据获得 Packet 对象,根据 cmd 命令字进行不同的逻辑处理。如 if (packet.cmd === 'connect') 会根据 packet.authority 参数打开一个到 authority(如: www.google.com) 的连接通道。后续 if (packet.cmd === 'data') 则会把 UDPack Client 发过来的数据通过刚刚打开的连接通道转发给 authority(如: www.google.com)。

const dstSocket = net.createConnection({
  port: packet.authority.port,
  host: ip
})

上方为打开到 authority 的连接通道的代码。从 authority(如: www.google.com) 发回来的数据,会通过 stream 转发给 UDPack Client。代码如下所示:

dstSocket.on('data', resdata => {
  stream._$writeStream.write(resdata, (err) => {
    if (err) {
      logger.error('_cache.write', err)
    } else {
      stream._$bytesWritten += resdata.length
    }
  })
})

浏览网页时,通常上行数据远小于下行数据。例如,下载文件时会有源源不断的数据从资源网站返回到 UDPack Server,再从 UDPack Server 转发给 UDPack Client,最后返回给 Browser。如果企业内部员工较多,大家都通过代理服务器上网,如果所有数据都缓存在内存中,可能会造成内存溢出。因此最好采用文件进行缓冲,UDPack Server 从资源网站接收的数据先写入临时缓存文件,然后从缓存文件中读取数据,并通过 stream 转发给 UDPack Client。通过文件缓冲区,可以避免 UDPack Server 端内存的大量占用。

下面给出较完整的代码:

udpack.on('stream', (stream) => {
  stream._$interval = 10,
  stream._$sendMaxSpeedKBps = 25 * 1024,
  stream._$bytesRead = 0,
  stream._$bytesWritten = 0,
  stream._$sendBuffersInterval = setInterval(() => {
    if (!stream._$readStream) {
      let maxBytes = Math.floor((stream._$sendMaxSpeedKBps * 1024 / 1000) * stream._$interval)
      let start = stream._$bytesRead,
        end = Math.min(stream._$bytesRead + (maxBytes - 1), stream._$bytesWritten - 1),
        emitClose = true,
        highWaterMark = 256 * 1024
      if (start <= end) {
        stream._$readStream = fs.createReadStream(
          stream._$filename,
          { start, end, emitClose, highWaterMark }
        )
        stream._$readStream.on('data', (chunk) => {
          stream._$bytesRead += chunk.length

          if (stream.isReady()) {
            let sPacket = Packet.create({
              sid: stream.sid,
              cmd: 'data',
              buf: chunk
            })

            let buffer = Packet.encode(sPacket).finish()
            stream._$readStream.pause()
            let cb = (err) => {
              if (err) {
                stream.write(buffer, cb)
              } else {
                if (stream._$readStream) {
                  stream._$readStream.resume()
                }
              }
            }
            stream.write(buffer, cb)
          }
        }).on('close', () => {
          delete stream._$readStream
        })
      }
    }
  }, stream._$interval)

  stream.on('data', (buffer) => {
    let packet = Packet.decode(buffer)
    stream.sid = packet.sid
    if (packet.cmd === 'connect') {
      let dstHost = packet.authority.domain || packet.authority.ip
      dns.lookup(dstHost, { family: 4 }, (err, ip) => {
        if (err || !ip) {
          logger.error('dnslookup', err, ip)
        } else {
          const dstSocket = net.createConnection({
            port: packet.authority.port,
            host: ip
          })

          stream.dstSocket = dstSocket

          dstSocket
          .setNoDelay()
          .on('connect', () => {
            let connectRes = ConnectRes.create({
              succeeded: true,
              localAddress: dstSocket.localAddress,
              localPort: dstSocket.localPort
            })

            let sPacket = Packet.create({
              sid: packet.sid,
              cmd: 'connect',
              connectRes
            })

            let buffer = Packet.encode(sPacket).finish()

            stream.write(buffer, (err) => {
              if (err) {
                logger.error(err.message, err.stack)
              }
            })
            stream._$filename = path.join(_cacheDir, stream.sid)
            stream._$writeStream = fs.createWriteStream(stream._$filename)
            stream._$writeStream.on('error', (err) => {
              logger.error('_cache.write.on.error', err)
            })
          }).on('data', resdata => {
            stream._$writeStream.write(resdata, (err) => {
              if (err) {
                logger.error('_cache.write', err)
              } else {
                stream._$bytesWritten += resdata.length
              }
            })
          }).on('end', () => {
            stream._$writeStream.end()
          }).on('close', () => {
            let sPacket = Packet.create({
              sid: packet.sid,
              cmd: 'terminate',
              reason: 'dstSocket closed'
            })
            let buffer = Packet.encode(sPacket).finish()
            stream.write(buffer, (err) => {
              if (err) {
                logger.error(err.message, err.stack)
              }
            })
          })
        }
      })
    } else if (packet.cmd === 'data') {
      let reqdata = packet.buf
      let dstSocket = stream.dstSocket
      if (!!dstSocket && dstSocket.writable) {
        dstSocket.write(reqdata)
      }
    } else if (packet.cmd === 'terminate') {
      stream.close()
    }
  }).on('close', () => {
    clearInterval(stream._$sendBuffersInterval)
    if (stream.dstSocket) {
      if (!stream.dstSocket.destroyed) {
        stream.dstSocket.destroy()
      }
    }
    if (stream._$filename) {
      fs.rm(stream._$filename, { force: true }, (err) => {
        if (err) {
          logger.error(`_cache.delete ${stream._$filename} error`, err)
        } else {
          logger.debug(`_cache.delete ${stream._$filename}`)
        }
      })
    }
  })
})