//filter will reemit the data if cb(err,pass) pass is truthy // reduce is more tricky // maybe we want to group the reductions or emit progress updates occasionally // the most basic reduce just emits one 'data' event after it has recieved 'end' var Stream = require('stream').Stream , es = exports , through = require('through') , from = require('from') , duplex = require('duplexer') , map = require('map-stream') , pause = require('pause-stream') , split = require('split') , pipeline = require('stream-combiner') , immediately = global.setImmediate || process.nextTick; es.Stream = Stream //re-export Stream from core es.through = through es.from = from es.duplex = duplex es.map = map es.pause = pause es.split = split es.pipeline = es.connect = es.pipe = pipeline // merge / concat // // combine multiple streams into a single stream. // will emit end only once es.concat = //actually this should be called concat es.merge = function (/*streams...*/) { var toMerge = [].slice.call(arguments) if (toMerge.length === 1 && (toMerge[0] instanceof Array)) { toMerge = toMerge[0] //handle array as arguments object } var stream = new Stream() stream.setMaxListeners(0) // allow adding more than 11 streams var endCount = 0 stream.writable = stream.readable = true if (toMerge.length) { toMerge.forEach(function (e) { e.pipe(stream, {end: false}) var ended = false e.on('end', function () { if(ended) return ended = true endCount ++ if(endCount == toMerge.length) stream.emit('end') }) }) } else { process.nextTick(function () { stream.emit('end') }) } stream.write = function (data) { this.emit('data', data) } stream.destroy = function () { toMerge.forEach(function (e) { if(e.destroy) e.destroy() }) } return stream } // writable stream, collects all events into an array // and calls back when 'end' occurs // mainly I'm using this to test the other functions es.writeArray = function (done) { if ('function' !== typeof done) throw new Error('function writeArray (done): done must be function') var a = new Stream () , array = [], isDone = false a.write = function (l) { array.push(l) } a.end = function () { isDone = true done(null, array) } a.writable = true a.readable = false a.destroy = function () { a.writable = a.readable = false if(isDone) return done(new Error('destroyed before end'), array) } return a } //return a Stream that reads the properties of an object //respecting pause() and resume() es.readArray = function (array) { var stream = new Stream() , i = 0 , paused = false , ended = false stream.readable = true stream.writable = false if(!Array.isArray(array)) throw new Error('event-stream.read expects an array') stream.resume = function () { if(ended) return paused = false var l = array.length while(i < l && !paused && !ended) { stream.emit('data', array[i++]) } if(i == l && !ended) ended = true, stream.readable = false, stream.emit('end') } process.nextTick(stream.resume) stream.pause = function () { paused = true } stream.destroy = function () { ended = true stream.emit('close') } return stream } // // readable (asyncFunction) // return a stream that calls an async function while the stream is not paused. // // the function must take: (count, callback) {... // es.readable = function (func, continueOnError) { var stream = new Stream() , i = 0 , paused = false , ended = false , reading = false stream.readable = true stream.writable = false if('function' !== typeof func) throw new Error('event-stream.readable expects async function') stream.on('end', function () { ended = true }) function get (err, data) { if(err) { stream.emit('error', err) if(!continueOnError) stream.emit('end') } else if (arguments.length > 1) stream.emit('data', data) immediately(function () { if(ended || paused || reading) return try { reading = true func.call(stream, i++, function () { reading = false get.apply(null, arguments) }) } catch (err) { stream.emit('error', err) } }) } stream.resume = function () { paused = false get() } process.nextTick(get) stream.pause = function () { paused = true } stream.destroy = function () { stream.emit('end') stream.emit('close') ended = true } return stream } // // map sync // es.mapSync = function (sync) { return es.through(function write(data) { var mappedData try { mappedData = sync(data) } catch (err) { return this.emit('error', err) } if (mappedData !== undefined) this.emit('data', mappedData) }) } // // log just print out what is coming through the stream, for debugging // es.log = function (name) { return es.through(function (data) { var args = [].slice.call(arguments) if(name) console.error(name, data) else console.error(data) this.emit('data', data) }) } // // child -- pipe through a child process // es.child = function (child) { return es.duplex(child.stdin, child.stdout) } // // parse // // must be used after es.split() to ensure that each chunk represents a line // source.pipe(es.split()).pipe(es.parse()) es.parse = function (options) { var emitError = !!(options ? options.error : false) return es.through(function (data) { var obj try { if(data) //ignore empty lines obj = JSON.parse(data.toString()) } catch (err) { if (emitError) return this.emit('error', err) return console.error(err, 'attempting to parse:', data) } //ignore lines that where only whitespace. if(obj !== undefined) this.emit('data', obj) }) } // // stringify // es.stringify = function () { var Buffer = require('buffer').Buffer return es.mapSync(function (e){ return JSON.stringify(Buffer.isBuffer(e) ? e.toString() : e) + '\n' }) } // // replace a string within a stream. // // warn: just concatenates the string and then does str.split().join(). // probably not optimal. // for smallish responses, who cares? // I need this for shadow-npm so it's only relatively small json files. es.replace = function (from, to) { return es.pipeline(es.split(from), es.join(to)) } // // join chunks with a joiner. just like Array#join // also accepts a callback that is passed the chunks appended together // this is still supported for legacy reasons. // es.join = function (str) { //legacy api if('function' === typeof str) return es.wait(str) var first = true return es.through(function (data) { if(!first) this.emit('data', str) first = false this.emit('data', data) return true }) } // // wait. callback when 'end' is emitted, with all chunks appended as string. // es.wait = function (callback) { var arr = [] return es.through(function (data) { arr.push(data) }, function () { var body = Buffer.isBuffer(arr[0]) ? Buffer.concat(arr) : arr.join('') this.emit('data', body) this.emit('end') if(callback) callback(null, body) }) } es.pipeable = function () { throw new Error('[EVENT-STREAM] es.pipeable is deprecated') }