You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

172 lines
4.0 KiB

'use strict'
var Transform = require('readable-stream').Transform
var duplexify = require('duplexify')
var WS = require('ws')
var Buffer = require('safe-buffer').Buffer
module.exports = WebSocketStream
/**
 * Create the Transform stream that sits between the user-facing stream and
 * the WebSocket.
 *
 * @param {Object} options - only `options.objectMode` is read here and
 *   forwarded to the Transform constructor.
 * @param {Function} socketWrite - installed as the transform's `_write`
 *   (replaces the default Transform write path).
 * @param {Function} socketEnd - installed as the transform's `_flush`.
 * @returns {Transform} the configured transform.
 */
function buildProxy (options, socketWrite, socketEnd) {
  var transformOptions = { objectMode: options.objectMode }
  var transform = new Transform(transformOptions)

  // Hooks are assigned on the instance rather than passed to the constructor.
  transform._flush = socketEnd
  transform._write = socketWrite

  return transform
}
/**
 * Wrap a WebSocket in a stream.
 *
 * @param {string|Object} target - address handed to the WebSocket
 *   constructor, or an existing socket object to wrap (any value whose
 *   `typeof` is 'object' is used as the socket as-is).
 * @param {string|string[]|Object} [protocols] - sub-protocol(s). When a
 *   non-array object is passed it is treated as `options`, and
 *   `options.protocol` (string or array) supplies the protocols.
 * @param {Object} [options] - NOTE: this object is mutated; `objectMode` is
 *   written onto it when it was left undefined.
 * @param {boolean} [options.objectMode] - when undefined it is derived from
 *   `options.binary`: false if `binary` is true or undefined, true otherwise.
 * @param {boolean} [options.binary]
 * @param {number} [options.browserBufferSize] - browser only; defaults to
 *   512 KiB (1024 * 512).
 * @param {number} [options.browserBufferTimeout] - browser only; defaults to
 *   1000 ms.
 * @returns {Object} the proxy Transform when the socket is already open,
 *   otherwise a duplexify stream that is wired to the proxy (and emits
 *   'connect') once the socket opens. The socket is exposed as `.socket`.
 */
function WebSocketStream(target, protocols, options) {
  var stream, socket

  var isBrowser = process.title === 'browser'
  var isNative = !!global.WebSocket
  // function declarations below are hoisted, so they can be referenced here
  var socketWrite = isBrowser ? socketWriteBrowser : socketWriteNode

  if (protocols && !Array.isArray(protocols) && 'object' === typeof protocols) {
    // accept the "options" Object as the 2nd argument
    options = protocols
    protocols = null

    if (typeof options.protocol === 'string' || Array.isArray(options.protocol)) {
      protocols = options.protocol
    }
  }

  if (!options) options = {}

  if (options.objectMode === undefined) {
    options.objectMode = !(options.binary === true || options.binary === undefined)
  }

  var proxy = buildProxy(options, socketWrite, socketEnd)

  if (!options.objectMode) {
    proxy._writev = writev
  }

  // browser only: sets the maximum socket buffer size before throttling
  var bufferSize = options.browserBufferSize || 1024 * 512

  // browser only: how long to wait when throttling
  var bufferTimeout = options.browserBufferTimeout || 1000

  // use existing WebSocket object that was passed in
  if (typeof target === 'object') {
    socket = target
  // otherwise make a new one
  } else {
    // special constructor treatment for native websockets in browsers, see
    // https://github.com/maxogden/websocket-stream/issues/82
    if (isNative && isBrowser) {
      socket = new WS(target, protocols)
    } else {
      socket = new WS(target, protocols, options)
    }

    socket.binaryType = 'arraybuffer'
  }

  // was already open when passed in
  if (socket.readyState === socket.OPEN) {
    stream = proxy
  } else {
    // not open yet: hand out a duplexify shell now and attach the proxy to it
    // in onopen()
    stream = duplexify(undefined, undefined, options)
    if (!options.objectMode) {
      stream._writev = writev
    }
    socket.onopen = onopen
  }

  stream.socket = socket

  socket.onclose = onclose
  socket.onerror = onerror
  socket.onmessage = onmessage

  proxy.on('close', destroy)

  var coerceToBuffer = !options.objectMode

  // _write implementation used outside the browser: send() takes a callback
  function socketWriteNode(chunk, enc, next) {
    // avoid errors, this never happens unless
    // destroy() is called
    if (socket.readyState !== socket.OPEN) {
      next()
      return
    }

    if (coerceToBuffer && typeof chunk === 'string') {
      chunk = Buffer.from(chunk, 'utf8')
    }
    socket.send(chunk, next)
  }

  // _write implementation used in the browser: send() is called without a
  // callback, so back-pressure is approximated by polling bufferedAmount
  function socketWriteBrowser(chunk, enc, next) {
    if (socket.bufferedAmount > bufferSize) {
      // too much queued on the socket: retry this same write later
      setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)
      return
    }

    if (coerceToBuffer && typeof chunk === 'string') {
      chunk = Buffer.from(chunk, 'utf8')
    }

    try {
      socket.send(chunk)
    } catch(err) {
      return next(err)
    }

    next()
  }

  // _flush implementation: close the socket once the writable side has ended
  function socketEnd(done) {
    socket.close()
    done()
  }

  // socket opened: plug the proxy into the duplexify shell and announce it
  function onopen() {
    stream.setReadable(proxy)
    stream.setWritable(proxy)
    stream.emit('connect')
  }

  // socket closed: end and tear down the user-facing stream
  function onclose() {
    stream.end()
    stream.destroy()
  }

  // socket error: destroy the user-facing stream with that error
  function onerror(err) {
    stream.destroy(err)
  }

  // incoming message: always pushed to the readable side as a Buffer
  function onmessage(event) {
    var data = event.data
    if (data instanceof ArrayBuffer) data = Buffer.from(data)
    else data = Buffer.from(data, 'utf8')
    proxy.push(data)
  }

  // proxy closed: make sure the underlying socket is closed too
  function destroy() {
    socket.close()
  }

  // this is to be enabled only if objectMode is false
  // Batches several pending writes into a single _write call. Each entry of
  // `chunks` is a wrapper object whose `.chunk` holds the actual data.
  function writev (chunks, cb) {
    var buffers = new Array(chunks.length)
    for (var i = 0; i < chunks.length; i++) {
      var data = chunks[i].chunk
      // convert the payload itself, not the wrapper object around it
      buffers[i] = typeof data === 'string' ? Buffer.from(data, 'utf8') : data
    }

    this._write(Buffer.concat(buffers), 'binary', cb)
  }

  return stream
}