//filter will reemit the data if cb(err,pass) pass is truthy // reduce is more tricky // maybe we want to group the reductions or emit progress updates occasionally // the most basic reduce just emits one 'data' event after it has recieved 'end' var through = require('through') var Decoder = require('string_decoder').StringDecoder module.exports = split //TODO pass in a function to map across the lines. function split (matcher, mapper, options) { var decoder = new Decoder() var soFar = '' var maxLength = options && options.maxLength; if('function' === typeof matcher) mapper = matcher, matcher = null if (!matcher) matcher = /\r?\n/ function emit(stream, piece) { if(mapper) { try { piece = mapper(piece) } catch (err) { return stream.emit('error', err) } if('undefined' !== typeof piece) stream.queue(piece) } else stream.queue(piece) } function next (stream, buffer) { var pieces = ((soFar != null ? soFar : '') + buffer).split(matcher) soFar = pieces.pop() if (maxLength && soFar.length > maxLength) stream.emit('error', new Error('maximum buffer reached')) for (var i = 0; i < pieces.length; i++) { var piece = pieces[i] emit(stream, piece) } } return through(function (b) { next(this, decoder.write(b)) }, function () { if(decoder.end) next(this, decoder.end()) if(soFar != null) emit(this, soFar) this.queue(null) }) }