/**
 * Convenience entry point: forwards both arguments to
 * `module.exports.createStream` and returns whatever it returns.
 *
 * @param {*} readStream - passed through unchanged as the first argument.
 * @param {*} options - passed through unchanged as the second argument.
 * @returns {*} the value produced by `module.exports.createStream`.
 */
byline = function (readStream, options) {
  var lineStream = module.exports.createStream(readStream, options);
  return lineStream;
};
n/a
/**
 * Constructor for the line-buffering transform stream.
 *
 * Initialises the underlying `stream.Transform`, forces the readable side
 * into object mode, and sets up the per-instance line buffer state.
 *
 * @param {Object} [options] - forwarded to `stream.Transform`.
 * @param {boolean} [options.keepEmptyLines=false] - stored as
 *   `_keepEmptyLines`; any falsy value is normalised to `false`.
 */
function LineStream(options) {
  stream.Transform.call(this, options);
  options = options || {};

  // use objectMode to stop the output from being buffered,
  // which re-concatenates the lines, just without newlines.
  this._readableState.objectMode = true;
  this._lineBuffer = [];
  this._keepEmptyLines = options.keepEmptyLines || false;
  this._lastChunkEndedWithCR = false;

  // take the source's encoding if we don't have one
  var self = this;
  this.on('pipe', function (src) {
    if (!self.encoding) {
      // but we can't do this for old-style streams
      if (src instanceof stream.Readable) {
        self.encoding = src._readableState.encoding;
      }
    }
  });
}
n/a
/**
 * Deprecated alias: prints a deprecation warning with `console.log` and then
 * delegates to `createLineStream` with the stream argument only.
 *
 * NOTE(review): the callee has the same name as this binding; presumably it
 * is meant to resolve to a separate internal helper rather than to this
 * wrapper itself — confirm against the full file.
 *
 * @param {*} readStream - passed through to `createLineStream`.
 * @returns {*} whatever `createLineStream(readStream)` returns.
 */
createLineStream = function (readStream) {
  var deprecationNotice = 'WARNING: byline#createLineStream is deprecated and will be removed soon';
  console.log(deprecationNotice);
  return createLineStream(readStream);
};
n/a
/**
 * Factory that picks between wrapping an existing stream and creating a
 * stand-alone `LineStream`.
 *
 * @param {*} [readStream] - when truthy, it is handed to `createLineStream`
 *   together with `options`.
 * @param {Object} [options] - forwarded to whichever constructor/helper runs.
 * @returns {*} the result of `createLineStream(readStream, options)`, or a
 *   new `LineStream(options)` when no stream was supplied.
 */
createStream = function (readStream, options) {
  // no source given: hand back a bare LineStream
  if (!readStream) {
    return new LineStream(options);
  }
  return createLineStream(readStream, options);
};
...
You just need to add one line to wrap your readable `Stream` with a `LineStream`.
```javascript
var fs = require('fs'),
byline = require('byline');
var stream = fs.createReadStream('sample.txt');
stream = byline.createStream(stream);
stream.on('data', function(line) {
console.log(line);
});
```
# Piping
...
/**
 * Constructor for the line-buffering transform stream.
 *
 * Initialises the underlying `stream.Transform`, forces the readable side
 * into object mode, and sets up the per-instance line buffer state.
 *
 * @param {Object} [options] - forwarded to `stream.Transform`.
 * @param {boolean} [options.keepEmptyLines=false] - stored as
 *   `_keepEmptyLines`; any falsy value is normalised to `false`.
 */
function LineStream(options) {
  stream.Transform.call(this, options);
  options = options || {};

  // use objectMode to stop the output from being buffered,
  // which re-concatenates the lines, just without newlines.
  this._readableState.objectMode = true;
  this._lineBuffer = [];
  this._keepEmptyLines = options.keepEmptyLines || false;
  this._lastChunkEndedWithCR = false;

  // take the source's encoding if we don't have one
  var self = this;
  this.on('pipe', function (src) {
    if (!self.encoding) {
      // but we can't do this for old-style streams
      if (src instanceof stream.Readable) {
        self.encoding = src._readableState.encoding;
      }
    }
  });
}
n/a
/**
 * Transform stream constructor (callable with or without `new`).
 *
 * Runs the `Duplex` constructor on the instance, attaches a fresh
 * `TransformState`, adjusts two readable-state flags, installs optional
 * `transform` / `flush` overrides from `options`, and registers a one-shot
 * `prefinish` handler that flushes before calling `done`.
 *
 * @param {Object} [options] - forwarded to `Duplex`.
 * @param {Function} [options.transform] - if a function, becomes `_transform`.
 * @param {Function} [options.flush] - if a function, becomes `_flush`.
 * @returns {Transform|undefined} a new instance when invoked without `new`.
 */
function Transform(options) {
  if (!(this instanceof Transform)) {
    return new Transform(options);
  }

  Duplex.call(this, options);

  this._transformState = new TransformState(this);

  // captured for the flush callback below, where `this` is not forwarded
  var stream = this;

  // start out asking for a readable event once data is transformed.
  this._readableState.needReadable = true;

  // we have implemented the _read method, and done the other things
  // that Readable wants before the first _read call, so unset the
  // sync guard flag.
  this._readableState.sync = false;

  if (options) {
    if (typeof options.transform === 'function') {
      this._transform = options.transform;
    }
    if (typeof options.flush === 'function') {
      this._flush = options.flush;
    }
  }

  // When the writable side finishes, then flush out anything remaining.
  this.once('prefinish', function () {
    if (typeof this._flush === 'function') {
      this._flush(function (er) {
        done(stream, er);
      });
    } else {
      done(stream);
    }
  });
}
n/a
/**
 * Flush hook: drains the whole line buffer by calling `_pushBuffer` with a
 * keep-count of zero, using the encoding recorded in `_chunkEncoding`.
 *
 * @param {Function} done - completion callback handed on to `_pushBuffer`.
 */
_flush = function (done) {
  var encoding = this._chunkEncoding;
  var keepNone = 0;
  this._pushBuffer(encoding, keepNone, done);
};
n/a
/**
 * Pushes buffered lines downstream until only `keep` entries remain in
 * `_lineBuffer`, then calls `done`.
 *
 * Empty lines are dropped unless `_keepEmptyLines` is set. Each line goes
 * through `_reencode` before being pushed. If `push` returns false, the rest
 * of the work is rescheduled with `timers.setImmediate` and `done` is only
 * called once the rescheduled run finishes.
 *
 * @param {string} encoding - chunk encoding passed on to `_reencode`.
 * @param {number} keep - number of trailing buffer entries to leave queued.
 * @param {Function} done - called with no arguments when draining completes.
 */
_pushBuffer = function (encoding, keep, done) {
  // always buffer the last (possibly partial) line
  while (this._lineBuffer.length > keep) {
    var line = this._lineBuffer.shift();
    // skip empty lines
    if (this._keepEmptyLines || line.length > 0) {
      if (!this.push(this._reencode(line, encoding))) {
        // when the high-water mark is reached, defer pushes until the next tick
        var self = this;
        timers.setImmediate(function () {
          self._pushBuffer(encoding, keep, done);
        });
        return;
      }
    }
  }
  done();
};
...
if (this._lineBuffer.length > 0) {
this._lineBuffer[this._lineBuffer.length - 1] += lines[0];
lines.shift();
}
this._lastChunkEndedWithCR = chunk[chunk.length - 1] == '\r';
this._lineBuffer = this._lineBuffer.concat(lines);
this._pushBuffer(encoding, 1, done);
};
LineStream.prototype._pushBuffer = function(encoding, keep, done) {
// always buffer the last (possibly partial) line
while (this._lineBuffer.length > keep) {
var line = this._lineBuffer.shift();
// skip empty lines
...
/**
 * Converts a decoded line into the representation this stream emits.
 *
 * - If `this.encoding` is set and differs from `chunkEncoding`, the line is
 *   re-encoded from `chunkEncoding` to `this.encoding` and returned as a string.
 * - If `this.encoding` is set and matches, the line is returned unchanged.
 * - If `this.encoding` is not set, the line is returned as a Buffer built
 *   from `chunkEncoding`.
 *
 * @param {string} line - one line of text.
 * @param {string} chunkEncoding - encoding the line's characters represent.
 * @returns {string|Buffer} the line in the output representation.
 */
_reencode = function (line, chunkEncoding) {
  if (this.encoding && this.encoding !== chunkEncoding) {
    // Buffer.from replaces the deprecated `new Buffer(string, encoding)`
    return Buffer.from(line, chunkEncoding).toString(this.encoding);
  } else if (this.encoding) {
    // this should be the most common case, i.e. we're using an encoded source stream
    return line;
  } else {
    return Buffer.from(line, chunkEncoding);
  }
};
...
LineStream.prototype._pushBuffer = function(encoding, keep, done) {
// always buffer the last (possibly partial) line
while (this._lineBuffer.length > keep) {
var line = this._lineBuffer.shift();
// skip empty lines
if (this._keepEmptyLines || line.length > 0 ) {
if (!this.push(this._reencode(line, encoding))) {
// when the high-water mark is reached, defer pushes until the next tick
var self = this;
timers.setImmediate(function() {
self._pushBuffer(encoding, keep, done);
});
return;
}
...
/**
 * Transform hook: splits an incoming chunk into lines, merges the first
 * piece with the partial line left in `_lineBuffer`, and hands the buffer to
 * `_pushBuffer`, keeping the last (possibly partial) line queued.
 *
 * Buffer chunks are converted to strings first; an encoding of `'buffer'` is
 * treated as UTF-8. The effective encoding is recorded in `_chunkEncoding`.
 *
 * NOTE(review): each Buffer chunk is decoded on its own, so a multi-byte
 * character split across two chunks would presumably decode incorrectly —
 * consider a `string_decoder`-based decode.
 *
 * @param {Buffer|string} chunk - data written to the stream.
 * @param {string} [encoding] - encoding of `chunk`; defaults to `'utf8'`.
 * @param {Function} done - completion callback, forwarded to `_pushBuffer`.
 */
_transform = function (chunk, encoding, done) {
  // decode binary chunks as UTF-8
  encoding = encoding || 'utf8';

  if (Buffer.isBuffer(chunk)) {
    if (encoding === 'buffer') {
      chunk = chunk.toString(); // utf8
      encoding = 'utf8';
    } else {
      chunk = chunk.toString(encoding);
    }
  }
  this._chunkEncoding = encoding;

  // see: http://www.unicode.org/reports/tr18/#Line_Boundaries
  var lines = chunk.split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g);

  // don't split CRLF which spans chunks
  if (this._lastChunkEndedWithCR && chunk[0] === '\n') {
    lines.shift();
  }

  if (this._lineBuffer.length > 0) {
    this._lineBuffer[this._lineBuffer.length - 1] += lines[0];
    lines.shift();
  }

  // An empty chunk carries no information about line endings, so it must not
  // clear the flag; otherwise "\r", "", "\n" would be seen as two line breaks.
  if (chunk.length > 0) {
    this._lastChunkEndedWithCR = chunk[chunk.length - 1] === '\r';
  }
  this._lineBuffer = this._lineBuffer.concat(lines);
  this._pushBuffer(encoding, 1, done);
};
n/a