var fs = require('fs'); var util = require('util'); var stream = require('stream'); var Readable = stream.Readable; var Writable = stream.Writable; var PassThrough = stream.PassThrough; var Pend = require('pend'); var EventEmitter = require('events').EventEmitter; exports.createFromBuffer = createFromBuffer; exports.createFromFd = createFromFd; exports.BufferSlicer = BufferSlicer; exports.FdSlicer = FdSlicer; util.inherits(FdSlicer, EventEmitter); function FdSlicer(fd, options) { options = options || {}; EventEmitter.call(this); this.fd = fd; this.pend = new Pend(); this.pend.max = 1; this.refCount = 0; this.autoClose = !!options.autoClose; } FdSlicer.prototype.read = function(buffer, offset, length, position, callback) { var self = this; self.pend.go(function(cb) { fs.read(self.fd, buffer, offset, length, position, function(err, bytesRead, buffer) { cb(); callback(err, bytesRead, buffer); }); }); }; FdSlicer.prototype.write = function(buffer, offset, length, position, callback) { var self = this; self.pend.go(function(cb) { fs.write(self.fd, buffer, offset, length, position, function(err, written, buffer) { cb(); callback(err, written, buffer); }); }); }; FdSlicer.prototype.createReadStream = function(options) { return new ReadStream(this, options); }; FdSlicer.prototype.createWriteStream = function(options) { return new WriteStream(this, options); }; FdSlicer.prototype.ref = function() { this.refCount += 1; }; FdSlicer.prototype.unref = function() { var self = this; self.refCount -= 1; if (self.refCount > 0) return; if (self.refCount < 0) throw new Error("invalid unref"); if (self.autoClose) { fs.close(self.fd, onCloseDone); } function onCloseDone(err) { if (err) { self.emit('error', err); } else { self.emit('close'); } } }; util.inherits(ReadStream, Readable); function ReadStream(context, options) { options = options || {}; Readable.call(this, options); this.context = context; this.context.ref(); this.start = options.start || 0; this.endOffset = options.end; this.pos = this.start; this.destroyed = false; } ReadStream.prototype._read = function(n) { var self = this; if (self.destroyed) return; var toRead = Math.min(self._readableState.highWaterMark, n); if (self.endOffset != null) { toRead = Math.min(toRead, self.endOffset - self.pos); } if (toRead <= 0) { self.destroyed = true; self.push(null); self.context.unref(); return; } self.context.pend.go(function(cb) { if (self.destroyed) return cb(); var buffer = new Buffer(toRead); fs.read(self.context.fd, buffer, 0, toRead, self.pos, function(err, bytesRead) { if (err) { self.destroy(err); } else if (bytesRead === 0) { self.destroyed = true; self.push(null); self.context.unref(); } else { self.pos += bytesRead; self.push(buffer.slice(0, bytesRead)); } cb(); }); }); }; ReadStream.prototype.destroy = function(err) { if (this.destroyed) return; err = err || new Error("stream destroyed"); this.destroyed = true; this.emit('error', err); this.context.unref(); }; util.inherits(WriteStream, Writable); function WriteStream(context, options) { options = options || {}; Writable.call(this, options); this.context = context; this.context.ref(); this.start = options.start || 0; this.endOffset = (options.end == null) ? Infinity : +options.end; this.bytesWritten = 0; this.pos = this.start; this.destroyed = false; this.on('finish', this.destroy.bind(this)); } WriteStream.prototype._write = function(buffer, encoding, callback) { var self = this; if (self.destroyed) return; if (self.pos + buffer.length > self.endOffset) { var err = new Error("maximum file length exceeded"); err.code = 'ETOOBIG'; self.destroy(); callback(err); return; } self.context.pend.go(function(cb) { if (self.destroyed) return cb(); fs.write(self.context.fd, buffer, 0, buffer.length, self.pos, function(err, bytes) { if (err) { self.destroy(); cb(); callback(err); } else { self.bytesWritten += bytes; self.pos += bytes; self.emit('progress'); cb(); callback(); } }); }); }; WriteStream.prototype.destroy = function() { if (this.destroyed) return; this.destroyed = true; this.context.unref(); }; util.inherits(BufferSlicer, EventEmitter); function BufferSlicer(buffer, options) { EventEmitter.call(this); options = options || {}; this.refCount = 0; this.buffer = buffer; this.maxChunkSize = options.maxChunkSize || Number.MAX_SAFE_INTEGER; } BufferSlicer.prototype.read = function(buffer, offset, length, position, callback) { var end = position + length; var delta = end - this.buffer.length; var written = (delta > 0) ? delta : length; this.buffer.copy(buffer, offset, position, end); setImmediate(function() { callback(null, written); }); }; BufferSlicer.prototype.write = function(buffer, offset, length, position, callback) { buffer.copy(this.buffer, position, offset, offset + length); setImmediate(function() { callback(null, length, buffer); }); }; BufferSlicer.prototype.createReadStream = function(options) { options = options || {}; var readStream = new PassThrough(options); readStream.destroyed = false; readStream.start = options.start || 0; readStream.endOffset = options.end; // by the time this function returns, we'll be done. readStream.pos = readStream.endOffset || this.buffer.length; // respect the maxChunkSize option to slice up the chunk into smaller pieces. var entireSlice = this.buffer.slice(readStream.start, readStream.pos); var offset = 0; while (true) { var nextOffset = offset + this.maxChunkSize; if (nextOffset >= entireSlice.length) { // last chunk if (offset < entireSlice.length) { readStream.write(entireSlice.slice(offset, entireSlice.length)); } break; } readStream.write(entireSlice.slice(offset, nextOffset)); offset = nextOffset; } readStream.end(); readStream.destroy = function() { readStream.destroyed = true; }; return readStream; }; BufferSlicer.prototype.createWriteStream = function(options) { var bufferSlicer = this; options = options || {}; var writeStream = new Writable(options); writeStream.start = options.start || 0; writeStream.endOffset = (options.end == null) ? this.buffer.length : +options.end; writeStream.bytesWritten = 0; writeStream.pos = writeStream.start; writeStream.destroyed = false; writeStream._write = function(buffer, encoding, callback) { if (writeStream.destroyed) return; var end = writeStream.pos + buffer.length; if (end > writeStream.endOffset) { var err = new Error("maximum file length exceeded"); err.code = 'ETOOBIG'; writeStream.destroyed = true; callback(err); return; } buffer.copy(bufferSlicer.buffer, writeStream.pos, 0, buffer.length); writeStream.bytesWritten += buffer.length; writeStream.pos = end; writeStream.emit('progress'); callback(); }; writeStream.destroy = function() { writeStream.destroyed = true; }; return writeStream; }; BufferSlicer.prototype.ref = function() { this.refCount += 1; }; BufferSlicer.prototype.unref = function() { this.refCount -= 1; if (this.refCount < 0) { throw new Error("invalid unref"); } }; function createFromBuffer(buffer, options) { return new BufferSlicer(buffer, options); } function createFromFd(fd, options) { return new FdSlicer(fd, options); }