function csv() {
return parser.apply(void 0, arguments);
}n/a
function createWriteStream(options) {
return new CsvTransformStream(options);
}...
* `data`: Emitted with the object or `stringified` version if the `objectMode` is set to `false`.
**`([options])` or `.parse(options)`**
If you use `fast-csv` as a function it returns a transform stream that can be piped into.
```javascript
var stream = fs.createReadStream("my.csv");
var csvStream = csv()
.on("data", function(data){
console.log(data);
})
.on("end", function(){
console.log("done");
...function createWriteStream(options) {
return new CsvTransformStream(options);
}...
### Formatting Functions
**`createWriteStream(options)` or `.format(options)`**
This is the lowest level of the write methods, it creates a stream that can be used to create a CSV of unknown size and pipe to
an output CSV.
```javascript
var csvStream = csv.createWriteStream({headers: true}),
writableStream = fs.createWriteStream("my.csv");
writableStream.on("finish", function(){
console.log("DONE!");
});
csvStream.pipe(writableStream);
...function _extender(obj) {
var ret = obj, i, l;
if (!(obj instanceof Base)) {
var OurBase = Base;
for (i = 0, l = defined.length; i < l; i++) {
var definer = defined[i];
if (definer[0](obj)) {
OurBase = OurBase.extend({instance: definer[1]});
}
}
ret = new OurBase(obj);
ret["__extender__"] = _extender;
}
return ret;
}n/a
function createWriteStream(options) {
return new CsvTransformStream(options);
}...
//Generated CSV
//a,a,b,b,c,c
//a1,a2,b1,b2,c1,c2
```
### Formatting Functions
**`createWriteStream(options)` or `.format(options)`**
This is the lowest level of the write methods, it creates a stream that can be used to create a CSV of unknown size and pipe to
an output CSV.
```javascript
var csvStream = csv.createWriteStream({headers: true}),
writableStream = fs.createWriteStream("my.csv");
...function CsvTransformStream(options) {
options = options || {};
options.objectMode = true;
if (has(options, "transform")) {
// remove so its not set to _transform in Transform constructor
options.consumerTransform = options.transform;
delete options.transform;
}
Transform.call(this, options);
this.formatter = createFormatter(options, this);
this.rowDelimiter = options.rowDelimiter || "\n";
var hasHeaders = has(options, "headers") ? !!options.headers : null,
headers = (hasHeaders && isArray(options.headers)) ? options.headers : null;
this.hasHeaders = hasHeaders;
this.headers = headers;
if (hasHeaders) {
if (headers) {
this.parsedHeaders = true;
this.headersLength = headers.length;
} else {
this.parsedHeaders = false;
}
}
this.hasWrittenHeaders = hasHeaders ? false : true;
this.includeEndRowDelimiter = !!options.includeEndRowDelimiter,
has(options, "consumerTransform") && this.transform(options.consumerTransform);
}n/a
function fromPath(location, options) {
return fs.createReadStream(location).pipe(new ParserStream(options));
}...
})
.on("end", function () {
console.log("done");
});
```
**`.fromPath(path[, options])`**
This method parses a file from the specified path.
```javascript
var csv = require("fast-csv");
csv
...function fromStream(stream, options) {
return stream.pipe(new ParserStream(options));
}...
console.log(data);
})
.on("end", function(){
console.log("done");
});
```
**`.fromStream(stream[, options])`**
This method accepts a readable stream to parse data from.
```javascript
var stream = fs.createReadStream("my.csv");
csv
...function fromString(string, options) {
var rs = new stream.Readable();
rs.push(string);
rs.push(null);
return rs.pipe(new ParserStream(options));
}...
console.log(data);
})
.on("end", function(){
console.log("done");
});
```
**`.fromString(string[, options])`**
This method parses a string
```javascript
var csv = require("fast-csv");
var CSV_STRING = 'a,b\n' +
...function csv() {
return parser.apply(void 0, arguments);
}...
**events**
* `data`: Emitted when a record is parsed.
* `data-invalid`: Emitted if an invalid row was encountered; **only emitted if the `validate` function is used or `strictColumnHandling=true`**.
* `data`: Emitted with the object or `stringified` version if the `objectMode` is set to `false`.
**`([options])` or `.parse(options)`**
If you use `fast-csv` as a function it returns a transform stream that can be piped into.
```javascript
var stream = fs.createReadStream("my.csv");
var csvStream = csv()
...function ParserStream(options) {
options = options || {};
options.objectMode = extended.has(options, "objectMode") ? options.objectMode : true;
stream.Transform.call(this, options);
this.lines = "";
this.decoder = new StringDecoder();
this._parsedHeaders = false;
this._rowCount = -1;
this._emitData = false;
var delimiter;
if (extended.has(options, "delimiter")) {
delimiter = options.delimiter;
if (delimiter.length > 1) {
throw new Error("delimiter option must be one character long");
}
delimiter = extended.escape(delimiter);
} else {
delimiter = DEFAULT_DELIMITER;
}
options.delimiter = delimiter;
this.parser = createParser(options);
this._headers = options.headers;
this._renameHeaders = options.renameHeaders;
this._ignoreEmpty = options.ignoreEmpty;
this._discardUnmappedColumns = options.discardUnmappedColumns;
this._strictColumnHandling = options.strictColumnHandling;
this.__objectMode = options.objectMode;
this.__buffered = [];
return this;
}n/a
/**
 * Feeds every item of `arr` through a new formatter stream, one at a time.
 *
 * Each item is written only after the previous write's callback has fired.
 * When all items are written the stream is ended; if a write reports an
 * error, that error is emitted on the returned stream instead.
 *
 * @param {Array} arr rows to write
 * @param {Object} [options] passed to createWriteStream
 * @param {*} [ws] unused; kept so existing call sites stay valid
 * @returns the formatter stream created by createWriteStream
 */
function write(arr, options, ws) {
    var csvStream = createWriteStream(options);
    extended.asyncEach(arr, function (item, cb) {
        csvStream.write(item, null, cb);
    }, function (err) {
        if (err) {
            csvStream.emit("error", err);
        } else {
            csvStream.end();
        }
    });
    return csvStream;
}
var fileStream = fs.createReadStream("my.csv"),
parser = fastCsv();
fileStream
.on("readable", function () {
var data;
while ((data = fileStream.read()) !== null) {
parser.write(data);
}
})
.on("end", function () {
parser.end();
});
parser
/**
 * Formats `arr` as CSV and hands the complete output to `cb` as one Buffer.
 *
 * @param {Array} arr rows to format
 * @param {Object|Function} [options] formatter options, or the callback
 * @param {Function} cb called as cb(err) on failure or cb(null, buffer)
 */
function writeToBuffer(arr, options, cb) {
    if (extended.isFunction(options)) {
        cb = options;
        options = {};
    }
    var ws = new stream.Writable(), buffers = [], finished = false;

    // Report the outcome exactly once, whichever stream settles first.
    function done(err, result) {
        if (!finished) {
            finished = true;
            cb(err, result);
        }
    }

    ws._write = function (data, enc, next) {
        buffers.push(data);
        next();
    };
    ws
        .on("error", done)
        .on("finish", function () {
            done(null, Buffer.concat(buffers));
        });
    // pipe() does not forward errors, so listen on the formatter stream too;
    // otherwise a formatting error would leave the callback uncalled.
    write(arr, options).on("error", done).pipe(ws);
}
function writeToPath(path, arr, options) {
var stream = fs.createWriteStream(path, {encoding: "utf8"});
return write(arr, options).pipe(stream);
}...
**`writeToPath(path, arr[, options])`**
Write an array of values to the specified path
```javascript
csv
.writeToPath("my.csv", [
["a", "b"],
["a1", "b1"],
["a2", "b2"]
], {headers: true})
.on("finish", function(){
console.log("done!");
});
...function writeToStream(ws, arr, options) {
return write(arr, options).pipe(ws);
}...
**`writeToStream(stream, arr[, options])`**
Write an array of values to a `WritableStream`
```javascript
csv
.writeToStream(fs.createWriteStream("my.csv"), [
["a", "b"],
["a1", "b1"],
["a2", "b2"]
], {headers: true});
```
```javascript
/**
 * Formats `arr` as CSV and hands the complete output to `cb` as one string.
 *
 * @param {Array} arr rows to format
 * @param {Object|Function} [options] formatter options, or the callback
 * @param {Function} cb called as cb(err) on failure or cb(null, csvString)
 */
function writeToString(arr, options, cb) {
    if (extended.isFunction(options)) {
        cb = options;
        options = {};
    }
    var ws = new stream.Writable(), written = [], finished = false;

    // Report the outcome exactly once, whichever stream settles first.
    function done(err, result) {
        if (!finished) {
            finished = true;
            cb(err, result);
        }
    }

    ws._write = function (data, enc, next) {
        written.push(data + "");
        next();
    };
    ws
        .on("error", done)
        .on("finish", function () {
            done(null, written.join(""));
        });
    // pipe() does not forward errors, so listen on the formatter stream too;
    // otherwise a formatting error would leave the callback uncalled.
    write(arr, options).on("error", done).pipe(ws);
}
console.log("done!");
});
```
**`writeToString(arr[, options], cb)`**
```javascript
csv.writeToString(
[
["a", "b"],
["a1", "b1"],
["a2", "b2"]
],
{headers: true},
function(err, data){
...function _extender(obj) {
var ret = obj, i, l;
if (!(obj instanceof Base)) {
var OurBase = Base;
for (i = 0, l = defined.length; i < l; i++) {
var definer = defined[i];
if (definer[0](obj)) {
OurBase = OurBase.extend({instance: definer[1]});
}
}
ret = new OurBase(obj);
ret["__extender__"] = _extender;
}
return ret;
}n/a
// Runs `iter(item, done)` for each element of `arr`, strictly one after another:
// the next element is only visited once the previous `done` has been called.
// If an iterator reports an error, iteration stops and `cb(err)` is called;
// otherwise `cb(null, arr)` is called after the last element.
asyncEach = function (arr, iter, cb) {
// i starts at -1 and is pre-incremented, so the first visited index is 0.
(function asyncIterator(i, l, rows, cb) {
if (++i < l) {
iter(rows[i], function (err) {
if (err) {
cb(err);
} else {
if ((i % 100) === 0) {
// after index 0 and every 100th index, continue via setImmediate so that
// iterators calling back synchronously do not overflow the stack
setImmediate(function () {
asyncIterator(i, l, rows, cb);
});
} else {
asyncIterator(i, l, rows, cb);
}
}
});
} else {
// every element has been visited: report completion with the original array
cb(null, arr);
}
}(-1, arr.length, arr, cb));
}...
/**
 * Builds a new CSV formatter stream.
 *
 * @param {Object} [options] handed unchanged to the CsvTransformStream constructor
 * @returns {CsvTransformStream} the newly constructed stream
 */
function createWriteStream(options) {
    var formatterStream = new CsvTransformStream(options);
    return formatterStream;
}
function write(arr, options, ws) {
var csvStream = createWriteStream(options), i = -1, l = arr.length;
extended.asyncEach(arr, function (item, cb) {
csvStream.write(item, null, cb);
}, function (err) {
if (err) {
csvStream.emit("error", err);
} else {
csvStream.end();
}
...function contains(arr, obj) {
return isIn(obj, arr);
}n/a
function containsAt(arr, obj, index) {
if (isArray(arr) && arr.length > index) {
return isEq(arr[index], obj);
}
return false;
}n/a
// Structural equality check that dispatches on the kind of value being compared.
// Returns true when `actual` and `expected` are considered equivalent by the
// first rule below that applies.
function deepEqual(actual, expected) {
// 7.1. All identical values are equivalent, as determined by ===.
if (actual === expected) {
return true;
// Two Buffers (when Buffer exists in this environment) are equivalent when
// they have the same length and the same value at every index.
} else if (typeof Buffer !== "undefined" && Buffer.isBuffer(actual) && Buffer.isBuffer(expected)) {
if (actual.length !== expected.length) {
return false;
}
for (var i = 0; i < actual.length; i++) {
if (actual[i] !== expected[i]) {
return false;
}
}
return true;
// 7.2. If both values are Date objects, they are equivalent when they
// refer to the same time.
} else if (isDate(actual) && isDate(expected)) {
return actual.getTime() === expected.getTime();
// 7.3 If both values are RegExp objects, they are equivalent when they have
// the same source and the same `global`, `multiline`, `lastIndex` and
// `ignoreCase` properties.
} else if (isRegExp(actual) && isRegExp(expected)) {
return actual.source === expected.source &&
actual.global === expected.global &&
actual.multiline === expected.multiline &&
actual.lastIndex === expected.lastIndex &&
actual.ignoreCase === expected.ignoreCase;
// 7.4. Two strings (as reported by isString) that are not === are not equivalent;
// other pairs where neither side passes typeof value == 'object' are
// compared with strict equality (===), not ==.
} else if (isString(actual) && isString(expected) && actual !== expected) {
return false;
} else if (typeof actual !== 'object' && typeof expected !== 'object') {
return actual === expected;
// 7.5 Every remaining pair (at least one side has typeof 'object') is
// delegated to objEquiv.
// NOTE(review): objEquiv is not visible here; presumably it compares owned
// keys, their values and the prototype - confirm against its definition.
} else {
return objEquiv(actual, expected);
}
}n/a
function deepMerge(obj) {
if (!obj) {
obj = {};
}
for (var i = 1, l = arguments.length; i < l; i++) {
_deepMerge(obj, arguments[i]);
}
return obj; // Object
}n/a
// Registers a [tester, proto] pair in `defined` and returns `_extender`.
//
// tester   - predicate stored alongside the prototype; `_extender` calls it with
//            the wrapped object to decide whether the prototype applies.
// decorate - object whose members are applied to the prototype via decorateProto.
//
// When the first argument is an object it is treated as `decorate` and the
// tester defaults to `always`. Called without arguments, nothing is registered.
function define(tester, decorate) {
if (arguments.length) {
if (typeof tester === "object") {
decorate = tester;
tester = always;
}
decorate = decorate || {};
var proto = {};
decorateProto(proto, decorate);
//make sure the prototype always ends up with its own `constructor`
//NOTE(review): presumably needed because some environments skip `constructor`
//when enumerating properties inside decorateProto - confirm
if (!proto.hasOwnProperty("constructor")) {
if (decorate.hasOwnProperty("constructor")) {
addMethod(proto, "constructor", decorate.constructor);
} else {
//default constructor just forwards its arguments to the parent
proto.constructor = function () {
this._super(arguments);
};
}
}
defined.push([tester, proto]);
}
return _extender;
}n/a
/**
 * Backslash-escapes regular-expression special characters in `str`.
 *
 * @param {String} str text to escape
 * @param {Array} [except] characters to leave unescaped; membership is
 *        checked with arr.indexOf(except, ch)
 * @returns {String} the escaped text
 */
function escape(str, except) {
    var SPECIAL_CHARS = /([\.$?*|{}\(\)\[\]\\\/\+^])/g;
    return str.replace(SPECIAL_CHARS, function (matched) {
        var isExempt = except && arr.indexOf(except, matched) !== -1;
        return isExempt ? matched : "\\" + matched;
    });
}
this._emitData = false;
var delimiter;
if (extended.has(options, "delimiter")) {
delimiter = options.delimiter;
if (delimiter.length > 1) {
throw new Error("delimiter option must be one character long");
}
delimiter = extended.escape(delimiter);
} else {
delimiter = DEFAULT_DELIMITER;
}
options.delimiter = delimiter;
this.parser = createParser(options);
this._headers = options.headers;
this._renameHeaders = options.renameHeaders;
...function expose() {
var methods;
for (var i = 0, l = arguments.length; i < l; i++) {
methods = arguments[i];
if (typeof methods === "object") {
merge(_extender, methods, ["define", "extend", "expose", "__defined__"]);
}
}
return _extender;
}n/a
function extend(supr) {
if (supr && supr.hasOwnProperty("__defined__")) {
_extender["__defined__"] = defined = defined.concat(supr["__defined__"]);
}
merge(_extender, supr, ["define", "extend", "expose", "__defined__"]);
return _extender;
}...
}
this.hasWrittenHeaders = hasHeaders ? false : true;
this.includeEndRowDelimiter = !!options.includeEndRowDelimiter,
has(options, "consumerTransform") && this.transform(options.consumerTransform);
}
util.inherits(CsvTransformStream, Transform);
extended(CsvTransformStream).extend({
headers: null,
headersLength: 0,
totalCount: 0,
/**
 * Substitutes values into a template string.
 *
 * - `obj` is an Array: each match of FORMAT_REGEX consumes the next array
 *   element; once the array is exhausted, remaining matches are left as-is.
 * - `obj` is a hash: each match of INTERP_REGEX is replaced by the value
 *   stored under the captured key; undefined values leave the match as-is.
 * - otherwise: all arguments after `str` are collected into an array and
 *   handled as in the Array case.
 *
 * @param {String} str the template
 * @param {Array|Object|...*} obj replacement values
 * @returns {String} the formatted string
 * @throws {Error} when a `%j` placeholder's value cannot be stringified
 */
function format(str, obj) {
    if (obj instanceof Array) {
        var i = 0, len = obj.length;
        //find the matches
        return str.replace(FORMAT_REGEX, function (m, format, type) {
            var replacer, ret;
            if (i < len) {
                replacer = obj[i++];
            } else {
                //we are out of things to replace with so
                //leave the placeholder untouched
                return m;
            }
            if (m === "%s" || m === "%d" || m === "%D") {
                //fast path: bare placeholder, plain string conversion
                ret = replacer + "";
            } else if (m === "%Z") {
                ret = replacer.toUTCString();
            } else if (m === "%j") {
                try {
                    ret = stringify(replacer);
                } catch (e) {
                    //Error takes a single message, so the offending value has
                    //to be concatenated into it to be reported at all
                    throw new Error("stringExtended.format : Unable to parse json from " + replacer);
                }
            } else {
                //strip the surrounding [ ] from the format specification
                format = format.replace(/^\[|\]$/g, "");
                switch (type) {
                case "s":
                    ret = formatString(replacer, format);
                    break;
                case "d":
                    ret = formatNumber(replacer, format);
                    break;
                case "j":
                    ret = formatObject(replacer, format);
                    break;
                case "D":
                    ret = date.format(replacer, format);
                    break;
                case "Z":
                    ret = date.format(replacer, format, true);
                    break;
                }
            }
            return ret;
        });
    } else if (isHash(obj)) {
        return str.replace(INTERP_REGEX, function (m, format, value) {
            value = obj[value];
            if (!is.isUndefined(value)) {
                if (format) {
                    if (is.isString(value)) {
                        return formatString(value, format);
                    } else if (is.isNumber(value)) {
                        return formatNumber(value, format);
                    } else if (is.isDate(value)) {
                        return date.format(value, format);
                    } else if (is.isObject(value)) {
                        return formatObject(value, format);
                    }
                } else {
                    return "" + value;
                }
            }
            return m;
        });
    } else {
        //varargs form: format(str, a, b, c) behaves like format(str, [a, b, c])
        var args = aSlice.call(arguments).slice(1);
        return format(str, args);
    }
}
//Generated CSV
//a,a,b,b,c,c
//a1,a2,b1,b2,c1,c2
```
### Formatting Functions
**`createWriteStream(options)` or `.format(options)`**
This is the lowest level of the write methods, it creates a stream that can be used to create a CSV of unknown size and pipe to
an output CSV.
```javascript
var csvStream = csv.createWriteStream({headers: true}),
writableStream = fs.createWriteStream("my.csv");
...function has(obj, prop) {
return hasOwn.call(obj, prop);
}...
createParser = require("./parser"),
fs = require("fs"),
StringDecoder = require('string_decoder').StringDecoder,
hasIsPaused = !!stream.Transform.prototype.isPaused;
function ParserStream(options) {
options = options || {};
options.objectMode = extended.has(options, "objectMode") ? options.objectMode
: true;
stream.Transform.call(this, options);
this.lines = "";
this.decoder = new StringDecoder();
this._parsedHeaders = false;
this._rowCount = -1;
this._emitData = false;
var delimiter;
...function isInstanceOf(obj, clazz) {
if (isFunction(clazz)) {
return obj instanceof clazz;
} else {
return false;
}
}n/a
function _isArguments(object) {
return toStr.call(object) === '[object Arguments]';
}n/a
function isArray() { [native code] }...
discardUnmappedColumns = this._discardUnmappedColumns,
strictColumnHandling = this._strictColumnHandling,
self = this;
function headerHandler(err, headers) {
if (err) {
cb(err);
} else if (extended.isArray(headers)) {
var headersLength = headers.length,
orig = self.__transform;
self.__transform = function (data, cb) {
var ret = {}, i = -1, val;
if (data.length > headersLength) {
if (discardUnmappedColumns) {
data.splice(headersLength);
...function isBoolean(obj) {
return obj === true || obj === false || toStr.call(obj) === "[object Boolean]";
}...
__endEmitted: false,
__emittedData: false,
__handleLine: function __parseLineData(line, index, ignore, next) {
var ignoreEmpty = this._ignoreEmpty, self = this;
if (extended.isBoolean(ignoreEmpty) && ignoreEmpty && (!line ||
EMPTY.test(line.join("")))) {
return next(null, null);
}
if (!ignore) {
this.__transform(line, function (err, line) {
if (err) {
next(err);
} else {
...function isDate(obj) {
return toStr.call(obj) === '[object Date]';
}n/a
function isDefined(obj) {
return !isUndefined(obj);
}n/a
function isEmpty(str) {
return str.length === 0;
}n/a
function isEq(obj, obj2) {
/*jshint eqeqeq:false*/
return obj == obj2;
}n/a
function isFalse(obj) {
return obj === false;
}n/a
isFunction = function (obj) {
return toStr.call(obj) === '[object Function]';
}...
}
});
},
__transform: defaultTransform,
transform: function (cb) {
if (!extended.isFunction(cb)) {
this.emit("error", new TypeError("fast-csv.FormatterStream#transform requires a function"));
}
if (cb.length === 2) {
this.__transform = cb;
} else {
this.__transform = function (data, next) {
next(null, cb(data));
...function isGt(obj, obj2) {
return obj > obj2;
}n/a
function isGte(obj, obj2) {
return obj >= obj2;
}n/a
/**
 * Tells whether `obj` is an object literal style hash: it passes isObject,
 * its `constructor` is exactly Object, and it has no truthy `nodeType` or
 * `setInterval` property.
 *
 * @param {*} obj value to inspect
 * @returns {Boolean}
 */
function isHash(obj) {
    if (!isObject(obj)) {
        return false;
    }
    return obj.constructor === Object && !obj.nodeType && !obj.setInterval;
}
/**
 * Tells whether `obj` occurs in `arr`.
 *
 * Strings, and arrays when Array.prototype.indexOf exists, are searched with
 * their own indexOf. Arrays without a native indexOf are scanned element by
 * element using isEq. Any other container yields false.
 *
 * @param {*} obj value to look for
 * @param {Array|String} arr container to search
 * @returns {Boolean}
 */
function isIn(obj, arr) {
    var isList = isArray(arr);
    if ((isList && Array.prototype.indexOf) || isString(arr)) {
        return arr.indexOf(obj) > -1;
    }
    if (isList) {
        var idx = 0, count = arr.length;
        while (idx < count) {
            if (isEq(obj, arr[idx])) {
                return true;
            }
            idx++;
        }
    }
    return false;
}
function length(obj, l) {
if (has(obj, "length")) {
return obj.length === l;
}
return false;
}n/a
/**
 * Tests `obj` against a pattern.
 *
 * A string pattern is applied with String#match to the string form of `obj`;
 * a RegExp pattern is applied with RegExp#test. Any other pattern type
 * yields false.
 *
 * @param {*} obj value to test
 * @param {String|RegExp} reg the pattern
 * @returns {Boolean}
 */
function isLike(obj, reg) {
    if (isString(reg)) {
        var text = "" + obj;
        return text.match(reg) !== null;
    }
    return isRegExp(reg) ? reg.test(obj) : false;
}
function isLt(obj, obj2) {
return obj < obj2;
}n/a
function isLte(obj, obj2) {
return obj <= obj2;
}n/a
function isNeq(obj, obj2) {
/*jshint eqeqeq:false*/
return obj != obj2;
}n/a
function isNotIn(obj, arr) {
return !isIn(obj, arr);
}n/a
function notLength(obj, l) {
if (has(obj, "length")) {
return obj.length !== l;
}
return false;
}n/a
function isNotLike(obj, reg) {
return !isLike(obj, reg);
}n/a
function isNotNull(obj) {
return !isNull(obj);
}n/a
function isNull(obj) {
return obj === null;
}n/a
function isNumber(obj) {
return toStr.call(obj) === '[object Number]';
}n/a
/**
 * Tells whether `obj` is a non-null value whose typeof is "object"
 * (so arrays and dates qualify; functions and primitives do not).
 *
 * @param {*} obj value to inspect
 * @returns {Boolean}
 */
function isObject(obj) {
    return obj !== null && typeof obj === "object";
}
function isRegExp(obj) {
return toStr.call(obj) === '[object RegExp]';
}n/a
function isSeq(obj, obj2) {
return obj === obj2;
}n/a
function isSneq(obj, obj2) {
return obj !== obj2;
}n/a
function isString(obj) {
return toStr.call(obj) === '[object String]';
}n/a
function isTrue(obj) {
return obj === true;
}n/a
function isUndefined(obj) {
return typeof obj === 'undefined';
}n/a
function isUndefinedOrNull(obj) {
return isUndefined(obj) || isNull(obj);
}n/a
keys = function (obj) {
var ret = [];
for (var i in obj) {
if (hasOwn.call(obj, i)) {
ret.push(i);
}
}
return ret;
}n/a
function merge(obj) {
if (!obj) {
obj = {};
}
for (var i = 1, l = arguments.length; i < l; i++) {
_merge(obj, arguments[i]);
}
return obj; // Object
}n/a
/**
 * Repeats `str` `times` times and returns the concatenation.
 * A falsy `times` produces the empty string.
 *
 * @param {String} str text to repeat
 * @param {Number} [times] repetition count
 * @returns {String}
 */
function multiply(str, times) {
    var pieces = [], count = 0;
    if (!times) {
        return pieces.join("");
    }
    while (count < times) {
        pieces.push(str);
        count++;
    }
    return pieces.join("");
}
function notContains(arr, obj) {
return !isIn(obj, arr);
}n/a
function notContainsAt(arr, obj, index) {
if (isArray(arr)) {
return !isEq(arr[index], obj);
}
return false;
}n/a
function notHas(obj, prop) {
return !has(obj, prop);
}n/a
function omit(hash, omitted) {
if (!isHash(hash)) {
throw new TypeError();
}
if (isString(omitted)) {
omitted = [omitted];
}
var objKeys = difference(keys(hash), omitted), key, ret = {};
for (var i = 0, len = objKeys.length; i < len; ++i) {
key = objKeys[i];
ret[key] = hash[key];
}
return ret;
}n/a
/**
 * Pads the string form of `string` until it has been extended `length - size`
 * times, adding `ch` once per missing position.
 *
 * @param {*} string value to pad; converted to a string first
 * @param {Number} length target length
 * @param {String} [ch=" "] text added for every missing position
 * @param {Boolean} [end] when truthy, pad on the right instead of the left
 * @returns {String}
 */
function pad(string, length, ch, end) {
    var padded = "" + string, //also covers numbers
        fill = ch || " ";
    // one fill per missing position, regardless of how long the fill text is
    for (var size = padded.length; size < length; size++) {
        padded = end ? padded + fill : fill + padded;
    }
    return padded;
}
function register(alias, extendWith) {
if (!extendWith) {
extendWith = alias;
alias = null;
}
var type = typeof extendWith;
if (alias) {
extended[alias] = extendWith;
} else if (extendWith && type === "function") {
extended.extend(extendWith);
} else if (type === "object") {
extended.expose(extendWith);
} else {
throw new TypeError("extended.register must be called with an extender function");
}
return extended;
}...
var is = require("is-extended"),
hasOwn = Object.prototype.hasOwnProperty;
module.exports = require("extended")()
.register(is)
.register(require("object-extended"))
.register(require("string-extended"))
.register("LINE_BREAK", require("os").EOL)
.register("asyncEach", function (arr, iter, cb) {
(function asyncIterator(i, l, rows, cb) {
...function spreadArgs(f, args, scope) {
var ret;
switch ((args || []).length) {
case 0:
ret = f.call(scope);
break;
case 1:
ret = f.call(scope, args[0]);
break;
case 2:
ret = f.call(scope, args[0], args[1]);
break;
case 3:
ret = f.call(scope, args[0], args[1], args[2]);
break;
default:
ret = f.apply(scope, args);
}
return ret;
}n/a
function style(str, options) {
var ret, i, l;
if (options) {
if (is.isArray(str)) {
ret = [];
for (i = 0, l = str.length; i < l; i++) {
ret.push(style(str[i], options));
}
} else if (options instanceof Array) {
ret = str;
for (i = 0, l = options.length; i < l; i++) {
ret = style(ret, options[i]);
}
} else if (options in styles) {
ret = '\x1B[' + styles[options] + 'm' + str + '\x1B[0m';
}
}
return ret;
}n/a
function _extender(obj) {
var ret = obj, i, l;
if (!(obj instanceof Base)) {
var OurBase = Base;
for (i = 0, l = defined.length; i < l; i++) {
var definer = defined[i];
if (definer[0](obj)) {
OurBase = OurBase.extend({instance: definer[1]});
}
}
ret = new OurBase(obj);
ret["__extender__"] = _extender;
}
return ret;
}n/a
function _extender(obj) {
var ret = obj, i, l;
if (!(obj instanceof Base)) {
var OurBase = Base;
for (i = 0, l = defined.length; i < l; i++) {
var definer = defined[i];
if (definer[0](obj)) {
OurBase = OurBase.extend({instance: definer[1]});
}
}
ret = new OurBase(obj);
ret["__extender__"] = _extender;
}
return ret;
}n/a
/**
 * Splits a delimited string into an array.
 *
 * When `delim` occurs anywhere in `testStr` (including at the very start),
 * all whitespace is removed and the result is split on `delim`. Otherwise the
 * whole string becomes the single element. A falsy `testStr` yields [].
 *
 * @param {String} testStr text to split
 * @param {String} delim delimiter to split on
 * @returns {Array}
 */
function toArray(testStr, delim) {
    var ret = [];
    if (testStr) {
        // indexOf returns 0 for a leading delimiter, so compare against -1;
        // an empty delimiter keeps the historic "no split" behaviour
        if (delim !== "" && testStr.indexOf(delim) !== -1) {
            ret = testStr.replace(/\s+/g, "").split(delim);
        } else {
            ret.push(testStr);
        }
    }
    return ret;
}
function trim(str) {
return str.replace(/^\s*|\s*$/g, "");
}n/a
function trimLeft(str) {
return str.replace(/^\s*/, "");
}n/a
function trimRight(str) {
return str.replace(/\s*$/, "");
}n/a
/**
 * Shortens `string` to at most `length` characters.
 *
 * Values that are not strings (per is.isString) are converted to a string
 * first and then truncated with the same `length` and `end` settings.
 *
 * @param {*} string value to shorten
 * @param {Number} length maximum number of characters to keep
 * @param {Boolean} [end] when truthy keep the last `length` characters,
 *        otherwise keep the first `length`
 * @returns {String}
 */
function truncate(string, length, end) {
    var ret = string;
    if (is.isString(ret)) {
        if (string.length > length) {
            if (end) {
                var l = string.length;
                ret = string.substring(l - length, l);
            } else {
                ret = string.substring(0, length);
            }
        }
    } else {
        // forward `end` too, so non-string input honours the requested side
        ret = truncate("" + ret, length, end);
    }
    return ret;
}
function createWriteStream(options) {
return new CsvTransformStream(options);
}...
//Generated CSV
//a,a,b,b,c,c
//a1,a2,b1,b2,c1,c2
```
### Formatting Functions
**`createWriteStream(options)` or `.format(options)`**
This is the lowest level of the write methods, it creates a stream that can be used to create a CSV of unknown size and pipe to
an output CSV.
```javascript
var csvStream = csv.createWriteStream({headers: true}),
writableStream = fs.createWriteStream("my.csv");
...function createWriteStream(options) {
return new CsvTransformStream(options);
}...
### Formatting Functions
**`createWriteStream(options)` or `.format(options)`**
This is the lowest level of the write methods, it creates a stream that can be used to create a CSV of unknown size and pipe to
an output CSV.
```javascript
var csvStream = csv.createWriteStream({headers: true}),
writableStream = fs.createWriteStream("my.csv");
writableStream.on("finish", function(){
console.log("DONE!");
});
csvStream.pipe(writableStream);
...function write(arr, options, ws) {
var csvStream = createWriteStream(options), i = -1, l = arr.length;
extended.asyncEach(arr, function (item, cb) {
csvStream.write(item, null, cb);
}, function (err) {
if (err) {
csvStream.emit("error", err);
} else {
csvStream.end();
}
});
return csvStream;
}...
var fileStream = fs.createReadStream("my.csv"),
parser = fastCsv();
fileStream
.on("readable", function () {
var data;
while ((data = fileStream.read()) !== null) {
parser.write(data);
}
})
.on("end", function () {
parser.end();
});
parser
...function writeToBuffer(arr, options, cb) {
if (extended.isFunction(options)) {
cb = options;
options = {};
}
var ws = new stream.Writable(), buffers = [], l = 0;
ws._write = function (data, enc, cb) {
buffers.push(data);
l++;
cb();
};
ws
.on("error", cb)
.on("finish", function () {
cb(null, Buffer.concat(buffers));
});
write(arr, options).pipe(ws);
}n/a
function writeToPath(path, arr, options) {
var stream = fs.createWriteStream(path, {encoding: "utf8"});
return write(arr, options).pipe(stream);
}...
**`writeToPath(path, arr[, options])`**
Write an array of values to the specified path
```javascript
csv
.writeToPath("my.csv", [
["a", "b"],
["a1", "b1"],
["a2", "b2"]
], {headers: true})
.on("finish", function(){
console.log("done!");
});
...function writeToStream(ws, arr, options) {
return write(arr, options).pipe(ws);
}...
**`writeToStream(stream, arr[, options])`**
Write an array of values to a `WritableStream`
```javascript
csv
.writeToStream(fs.createWriteStream("my.csv"), [
["a", "b"],
["a1", "b1"],
["a2", "b2"]
], {headers: true});
```
```javascript
...function writeToString(arr, options, cb) {
if (extended.isFunction(options)) {
cb = options;
options = {};
}
var ws = new stream.Writable(), written = [];
ws._write = function (data, enc, cb) {
written.push(data + "");
cb();
};
ws
.on("error", cb)
.on("finish", function () {
cb(null, written.join(""));
});
write(arr, options).pipe(ws);
}...
console.log("done!");
});
```
**`writeToString(arr[, options], cb)`**
```javascript
csv.writeToString(
[
["a", "b"],
["a1", "b1"],
["a2", "b2"]
],
{headers: true},
function(err, data){
...function checkHeaders(stream, item) {
var headers, ret = true;
if (!stream.parsedHeaders) {
stream.parsedHeaders = true;
headers = stream.headers = gatherHeaders(item);
stream.headersLength = headers.length;
}
if (!stream.hasWrittenHeaders) {
stream.totalCount++;
stream.push(new Buffer(stream.formatter(stream.headers, true), "utf8"));
stream.hasWrittenHeaders = true;
ret = isHashArray(item) || !isArray(item);
}
return ret;
}n/a
/**
 * Builds the row formatter used by the CSV write stream.
 *
 * @param {Object} [options]
 * @param {String} [options.delimiter=","] text placed between fields
 * @param {String} [options.rowDelimiter=LINE_BREAK] row separator; a field
 *        containing it is quoted
 * @param {String} [options.quote='"'] text wrapped around quoted fields
 * @param {String} [options.escape='"'] text placed before a quote occurring
 *        inside a field
 * @param {*} [options.quoteColumns=false] passed to createQuoteChecker
 * @param {*} [options.quoteHeaders=quoteColumns] passed to createQuoteChecker
 * @param {Object} stream passed to createQuoteChecker
 * @returns {Function} escapeFields(fields, isHeader): joins the escaped
 *          fields with the delimiter and returns the resulting line
 */
function createFormatter(options, stream) {
    options = options || {};
    var delimiter = options.delimiter || ",",
        QUOTE = options.quote || '"',
        ESCAPE = options.escape || '"',
        ESCAPED_QUOTE = ESCAPE + QUOTE,
        // delimiter and quote are user supplied: escape() keeps regex
        // metacharacters in them (e.g. "^", "|", "]") literal
        ESCAPE_REGEXP = new RegExp("[" + escape("" + delimiter) + escape(options.rowDelimiter || LINE_BREAK) + "']"),
        REPLACE_REGEXP = new RegExp(escape("" + QUOTE), "g"),
        quoteColumns = has(options, "quoteColumns") ? options.quoteColumns : false,
        quoteHeaders = has(options, "quoteHeaders") ? options.quoteHeaders : quoteColumns,
        shouldQuote = createQuoteChecker(stream, quoteColumns, quoteHeaders);

    // replacer function: keeps "$" sequences in ESCAPE/QUOTE from being
    // interpreted as String#replace substitution patterns
    function escapedQuote() {
        return ESCAPED_QUOTE;
    }

    // Escapes embedded quotes and wraps the field in quotes when it contains
    // a quote, a character from ESCAPE_REGEXP, or shouldQuote asks for it.
    function escapeField(field, index, isHeader) {
        var needsQuotes;
        field = field.replace(/\0/g, '');
        if (field.indexOf(QUOTE) !== -1) {
            field = field.replace(REPLACE_REGEXP, escapedQuote);
            needsQuotes = true;
        } else {
            needsQuotes = field.search(ESCAPE_REGEXP) !== -1;
        }
        needsQuotes = needsQuotes || shouldQuote(index, isHeader);
        return needsQuotes ? QUOTE + field + QUOTE : field;
    }

    return function escapeFields(fields, isHeader) {
        var i = -1, l = fields.length, ret = [], field;
        while (++i < l) {
            field = fields[i];
            // null/undefined become empty fields; everything else is stringified
            field = (isUndefinedOrNull(field) ? "" : field) + "";
            ret.push(escapeField(field, i, isHeader));
        }
        return ret.join(delimiter);
    };
}
function defaultTransform(row, cb) {
return cb(null, row);
}n/a
/**
 * Picks the row transformer matching the shape of `item` and returns its result:
 * non-arrays go to transformHashData, arrays accepted by isHashArray go to
 * transformHashArrayData, and all other arrays go to transformArrayData.
 *
 * @param {Object} stream the formatter stream, forwarded to the transformer
 * @param {*} item the row to transform
 * @returns whatever the selected transformer returns
 */
function transformItem(stream, item) {
    if (!isArray(item)) {
        return transformHashData(stream, item);
    }
    return isHashArray(item)
        ? transformHashArrayData(stream, item)
        : transformArrayData(stream, item);
}
/**
 * Transform stream that formats rows as CSV.
 *
 * Note: the passed `options` object is modified in place - `objectMode` is
 * forced to true and a `transform` option is moved to `consumerTransform`.
 *
 * @param {Object} [options]
 * @param {Function} [options.transform] row transform, registered through
 *        this.transform()
 * @param {String} [options.rowDelimiter="\n"] stored as this.rowDelimiter
 * @param {Boolean|Array} [options.headers] truthy enables headers; an array
 *        supplies them up front
 * @param {Boolean} [options.includeEndRowDelimiter] stored (as a boolean) as
 *        this.includeEndRowDelimiter
 */
function CsvTransformStream(options) {
    options = options || {};
    options.objectMode = true;
    if (has(options, "transform")) {
        // remove so its not set to _transform in Transform constructor
        options.consumerTransform = options.transform;
        delete options.transform;
    }
    Transform.call(this, options);
    this.formatter = createFormatter(options, this);
    this.rowDelimiter = options.rowDelimiter || "\n";
    // null means the option was not given at all; false means it was given but falsy
    var hasHeaders = has(options, "headers") ? !!options.headers : null,
        headers = (hasHeaders && isArray(options.headers)) ? options.headers : null;
    this.hasHeaders = hasHeaders;
    this.headers = headers;
    if (hasHeaders) {
        if (headers) {
            this.parsedHeaders = true;
            this.headersLength = headers.length;
        } else {
            // headers requested but not supplied as an array
            this.parsedHeaders = false;
        }
    }
    // with headers disabled there is nothing left to write
    this.hasWrittenHeaders = !hasHeaders;
    this.includeEndRowDelimiter = !!options.includeEndRowDelimiter;
    if (has(options, "consumerTransform")) {
        this.transform(options.consumerTransform);
    }
}
function Transform(options) {
if (!(this instanceof Transform))
return new Transform(options);
Duplex.call(this, options);
this._transformState = new TransformState(this);
var stream = this;
// start out asking for a readable event once data is transformed.
this._readableState.needReadable = true;
// we have implemented the _read method, and done the other things
// that Readable wants before the first _read call, so unset the
// sync guard flag.
this._readableState.sync = false;
if (options) {
if (typeof options.transform === 'function')
this._transform = options.transform;
if (typeof options.flush === 'function')
this._flush = options.flush;
}
// When the writable side finishes, then flush out anything remaining.
this.once('prefinish', function() {
if (typeof this._flush === 'function')
this._flush(function(er) {
done(stream, er);
});
else
done(stream);
});
}n/a
function defaultTransform(row, cb) {
return cb(null, row);
}...
headersLength: 0,
totalCount: 0,
_transform: function (item, encoding, cb) {
var self = this;
this.__transform(item, function (err, item) {
if (err) {
cb(err);
} else {
if (checkHeaders(self, item)) {
self.push(new Buffer(transformItem(self, item), "utf8"));
}
cb();
..._flush = function (cb) {
if (this.includeEndRowDelimiter) {
this.push(this.rowDelimiter);
}
cb();
}n/a
/**
 * Stream hook: runs one incoming row through the consumer transform, writes
 * the formatted row downstream when checkHeaders allows it, then signals
 * completion through `cb`. A transform error is passed to `cb` instead.
 *
 * @param {*} item the row written to the stream
 * @param {String} encoding unused
 * @param {Function} cb completion callback
 */
_transform = function (item, encoding, cb) {
    var stream = this;

    function onTransformed(err, row) {
        if (err) {
            cb(err);
            return;
        }
        if (checkHeaders(stream, row)) {
            stream.push(new Buffer(transformItem(stream, row), "utf8"));
        }
        cb();
    }

    this.__transform(item, onTransformed);
};
/**
 * Registers the function applied to every row before it is formatted.
 *
 * A function declaring two parameters is used as-is and must call its second
 * argument as next(err, row). Any other function is treated as synchronous:
 * its return value becomes the row.
 *
 * If `cb` is not a function an "error" event carrying a TypeError is emitted
 * and the current transform is left untouched.
 *
 * @param {Function} cb the row transform
 * @returns this, for chaining
 */
transform = function (cb) {
    if (!extended.isFunction(cb)) {
        this.emit("error", new TypeError("fast-csv.FormatterStream#transform requires a function"));
        // stop here: going on would read cb.length and install a non-function
        return this;
    }
    if (cb.length === 2) {
        this.__transform = cb;
    } else {
        this.__transform = function (data, next) {
            next(null, cb(data));
        };
    }
    return this;
};
be provided to validate and emitted as a row.
```javascript
var stream = fs.createReadStream("my.csv");
csv
.fromStream(stream)
.transform(function(data){
return data.reverse(); //reverse each row.
})
.on("data", function(data){
console.log(data);
})
.on("end", function(){
console.log("done");
...function csv() {
return parser.apply(void 0, arguments);
}...
**events**
* `data`: Emitted when a record is parsed.
* `data-invalid`: Emitted if an invalid row was encountered; **only emitted if the `validate` function is used or `strictColumnHandling=true`**.
* `data`: Emitted with the object or `stringified` version if the `objectMode` is set to `false`.
**`([options])` or `.parse(options)`**
If you use `fast-csv` as a function it returns a transform stream that can be piped into.
```javascript
var stream = fs.createReadStream("my.csv");
var csvStream = csv()
...function createWriteStream(options) {
return new CsvTransformStream(options);
}...
* `data`: Emitted with the object or `stringified` version if the `objectMode` is set to `false`.
**`([options])` or `.parse(options)`**
If you use `fast-csv` as a function it returns a transform stream that can be piped into.
```javascript
var stream = fs.createReadStream("my.csv");
var csvStream = csv()
.on("data", function(data){
console.log(data);
})
.on("end", function(){
console.log("done");
...function createWriteStream(options) {
// Returns a new `CsvTransformStream` constructed with `options`.
return new CsvTransformStream(options);
}...
### Formatting Functions
**`createWriteStream(options)` or `.format(options)`**
This is the lowest level of the write methods, it creates a stream that can be used to create a CSV of unknown size and pipe to
an output CSV.
```javascript
var csvStream = csv.createWriteStream({headers: true}),
writableStream = fs.createWriteStream("my.csv");
writableStream.on("finish", function(){
console.log("DONE!");
});
csvStream.pipe(writableStream);
...function createWriteStream(options) {
// Returns a new `CsvTransformStream` constructed with `options`.
return new CsvTransformStream(options);
}...
//Generated CSV
//a,a,b,b,c,c
//a1,a2,b1,b2,c1,c2
```
### Formatting Functions
**`createWriteStream(options)` or `.format(options)`**
This is the lowest level of the write methods, it creates a stream that can be used to create a CSV of unknown size and pipe to
an output CSV.
```javascript
var csvStream = csv.createWriteStream({headers: true}),
writableStream = fs.createWriteStream("my.csv");
...function fromPath(location, options) {
// Opens `location` with `fs.createReadStream` and pipes it into a new
// `ParserStream(options)`; returns whatever `.pipe()` returns.
return fs.createReadStream(location).pipe(new ParserStream(options));
}...
})
.on("end", function () {
console.log("done");
});
```
**`.fromPath(path[, options])`**
This method parses a file from the specified path.
```javascript
var csv = require("fast-csv");
csv
...function fromStream(stream, options) {
// Pipes the supplied `stream` into a new `ParserStream(options)` and
// returns whatever `.pipe()` returns.
return stream.pipe(new ParserStream(options));
}...
console.log(data);
})
.on("end", function(){
console.log("done");
});
```
**`.fromStream(stream[, options])`**
This accepts a readable stream to parse data from.
```javascript
var stream = fs.createReadStream("my.csv");
csv
...function fromString(string, options) {
var rs = new stream.Readable();
rs.push(string);
rs.push(null);
return rs.pipe(new ParserStream(options));
}...
console.log(data);
})
.on("end", function(){
console.log("done");
});
```
**`.fromString(string[, options])`**
This method parses a string
```javascript
var csv = require("fast-csv");
var CSV_STRING = 'a,b\n' +
...function write(arr, options, ws) {
var csvStream = createWriteStream(options), i = -1, l = arr.length;
extended.asyncEach(arr, function (item, cb) {
csvStream.write(item, null, cb);
}, function (err) {
if (err) {
csvStream.emit("error", err);
} else {
csvStream.end();
}
});
return csvStream;
}...
var fileStream = fs.createReadStream("my.csv"),
parser = fastCsv();
fileStream
.on("readable", function () {
var data;
while ((data = fileStream.read()) !== null) {
parser.write(data);
}
})
.on("end", function () {
parser.end();
});
parser
...function writeToBuffer(arr, options, cb) {
if (extended.isFunction(options)) {
cb = options;
options = {};
}
var ws = new stream.Writable(), buffers = [], l = 0;
ws._write = function (data, enc, cb) {
buffers.push(data);
l++;
cb();
};
ws
.on("error", cb)
.on("finish", function () {
cb(null, Buffer.concat(buffers));
});
write(arr, options).pipe(ws);
}n/a
function writeToPath(path, arr, options) {
// Creates a UTF-8 file write stream at `path`, pipes the output of
// `write(arr, options)` into it and returns whatever `.pipe()` returns.
// The local `stream` here is that file stream.
var stream = fs.createWriteStream(path, {encoding: "utf8"});
return write(arr, options).pipe(stream);
}...
**`writeToPath(path, arr[, options])`**
Write an array of values to the specified path
```javascript
csv
.writeToPath("my.csv", [
["a", "b"],
["a1", "b1"],
["a2", "b2"]
], {headers: true})
.on("finish", function(){
console.log("done!");
});
...function writeToStream(ws, arr, options) {
// Pipes the output of `write(arr, options)` into `ws` and returns whatever `.pipe()` returns.
return write(arr, options).pipe(ws);
}...
**`writeToStream(stream, arr[, options])`**
Write an array of values to a `WritableStream`
```javascript
csv
.writeToStream(fs.createWriteStream("my.csv"), [
["a", "b"],
["a1", "b1"],
["a2", "b2"]
], {headers: true});
```
```javascript
...function writeToString(arr, options, cb) {
if (extended.isFunction(options)) {
cb = options;
options = {};
}
var ws = new stream.Writable(), written = [];
ws._write = function (data, enc, cb) {
written.push(data + "");
cb();
};
ws
.on("error", cb)
.on("finish", function () {
cb(null, written.join(""));
});
write(arr, options).pipe(ws);
}...
console.log("done!");
});
```
**`writeToString(arr[, options], cb)`**
```javascript
csv.writeToString(
[
["a", "b"],
["a1", "b1"],
["a2", "b2"]
],
{headers: true},
function(err, data){
...function ParserStream(options) {
options = options || {};
options.objectMode = extended.has(options, "objectMode") ? options.objectMode : true;
stream.Transform.call(this, options);
this.lines = "";
this.decoder = new StringDecoder();
this._parsedHeaders = false;
this._rowCount = -1;
this._emitData = false;
var delimiter;
if (extended.has(options, "delimiter")) {
delimiter = options.delimiter;
if (delimiter.length > 1) {
throw new Error("delimiter option must be one character long");
}
delimiter = extended.escape(delimiter);
} else {
delimiter = DEFAULT_DELIMITER;
}
options.delimiter = delimiter;
this.parser = createParser(options);
this._headers = options.headers;
this._renameHeaders = options.renameHeaders;
this._ignoreEmpty = options.ignoreEmpty;
this._discardUnmappedColumns = options.discardUnmappedColumns;
this._strictColumnHandling = options.strictColumnHandling;
this.__objectMode = options.objectMode;
this.__buffered = [];
return this;
}n/a
function Transform(options) {
// Transform stream constructor built on `Duplex`. Calling it without `new`
// still yields an instance. `options.transform` / `options.flush`, when they
// are functions, are installed as `_transform` / `_flush`.
if (!(this instanceof Transform))
return new Transform(options);
Duplex.call(this, options);
this._transformState = new TransformState(this);
var stream = this;
// start out asking for a readable event once data is transformed.
this._readableState.needReadable = true;
// we have implemented the _read method, and done the other things
// that Readable wants before the first _read call, so unset the
// sync guard flag.
this._readableState.sync = false;
if (options) {
if (typeof options.transform === 'function')
this._transform = options.transform;
if (typeof options.flush === 'function')
this._flush = options.flush;
}
// When the writable side finishes, then flush out anything remaining.
// `done` is defined outside this excerpt; it receives the stream and an optional error.
this.once('prefinish', function() {
if (typeof this._flush === 'function')
this._flush(function(er) {
done(stream, er);
});
else
done(stream);
});
}n/a
__doFlush = function (callback) {
// Invokes `callback` with no arguments; if that call throws, invokes it
// again with the thrown value as its first argument.
// NOTE(review): this means `callback` can run twice when it throws —
// confirm that callers tolerate a second invocation.
try {
callback();
} catch (e) {
callback(e);
}
}...
_flush: function (callback) {
var self = this;
if (this.lines) {
this._parse(this.lines, false, function (err) {
if (err) {
callback(err);
} else if (!self.isStreamPaused()) {
self.__doFlush(callback);
} else {
self.__pausedDone = function () {
self.__doFlush(callback);
};
}
});
} else {
...__emitRecord = function (dataRow, count) {
// Pushes `dataRow` downstream only when `_emitData` is set: as-is when
// `__objectMode` is truthy, otherwise as a JSON string. `count` is not used here.
if (this._emitData) {
this.push(this.__objectMode ? dataRow : JSON.stringify(dataRow));
}
}...
if (row) {
self.__handleLine(row, (count = ++self._rowCount), false, function (err, dataRow) {
if (err) {
cb(err);
} else {
if (dataRow) {
if (!self.isStreamPaused()) {
self.__emitRecord(dataRow, count);
} else {
self.__buffered.push([dataRow, count]);
}
} else {
count = --self._rowCount;
}
cb();
...__flushPausedBuffer = function () {
// Emits the `[dataRow, count]` entries queued in `__buffered` in FIFO order,
// then runs (and clears) the deferred `__pausedDone` callback if one is set.
// If the stream reports paused again after emitting an entry, returns
// immediately, leaving the remaining entries and `__pausedDone` in place.
var buffered = this.__buffered, l = buffered.length;
if (l) {
var entry;
while (buffered.length) {
entry = buffered.shift();
this.__emitRecord(entry[0], entry[1]);
//handle case where paused is called while emitting data
if (this.isStreamPaused()) {
return;
}
}
buffered.length = 0;
}
if (this.__pausedDone) {
// Clear the slot before calling so a callback that sets a new
// `__pausedDone` is not overwritten afterwards.
var done = this.__pausedDone;
this.__pausedDone = null;
done();
}
}...
}
} else {
if (!hasIsPaused) {
if (event === "pause") {
this.__paused = true;
} else if (event === "resume") {
this.__paused = false;
this.__flushPausedBuffer();
}
}
spreadArgs(origEmit, arguments, this);
}
},
on: function (evt) {
...function __parseLineData(line, index, ignore, next) {
var ignoreEmpty = this._ignoreEmpty, self = this;
if (extended.isBoolean(ignoreEmpty) && ignoreEmpty && (!line || EMPTY.test(line.join("")))) {
return next(null, null);
}
if (!ignore) {
this.__transform(line, function (err, line) {
if (err) {
next(err);
} else {
self.__validate(line, function (err, isValid, reason) {
if (err) {
next(err);
} else if (isValid) {
next(null, line);
} else {
self.emit("data-invalid", line, index, reason);
next(null, null);
}
});
}
});
} else {
return next(null, line);
}
}...
}
},
__processRows: function (rows, data, cb) {
var self = this, count;
extended.asyncEach(rows, function (row, cb) {
if (row) {
self.__handleLine(row, (count = ++self._rowCount), false, function (err,
dataRow) {
if (err) {
cb(err);
} else {
if (dataRow) {
if (!self.isStreamPaused()) {
self.__emitRecord(dataRow, count);
} else {
...__processHeaders = function (rows, cb) {
var headers = this._headers,
renameHeaders = this._renameHeaders,
discardUnmappedColumns = this._discardUnmappedColumns,
strictColumnHandling = this._strictColumnHandling,
self = this;
function headerHandler(err, headers) {
if (err) {
cb(err);
} else if (extended.isArray(headers)) {
var headersLength = headers.length,
orig = self.__transform;
self.__transform = function (data, cb) {
var ret = {}, i = -1, val;
if (data.length > headersLength) {
if (discardUnmappedColumns) {
data.splice(headersLength);
} else if (strictColumnHandling) {
self.emit("data-invalid", data);
return orig(null, cb);
} else {
self.emit("error", new Error("Unexpected Error: column header mismatch expected: " + headersLength + " columns
got: " + data.length));
return orig(null, cb);
}
} else if (strictColumnHandling && (data.length < headersLength)) {
self.emit("data-invalid", data);
return orig(null, cb);
}
while (++i < headersLength) {
if (isUndefined(headers[i])) {
continue;
}
val = data[i];
ret[headers[i]] = isUndefined(val) ? '' : val;
}
return orig(ret, cb);
};
}
self._parsedHeaders = true;
cb(null);
}
if (renameHeaders) {
if (Array.isArray(headers)) {
rows.shift();
headerHandler(null, headers);
} else {
self.emit("error", new Error("Error renaming headers: new headers must be provided in an array"));
}
} else if (extended.isBoolean(headers) && headers) {
this.__handleLine(rows.shift(), 0, true, headerHandler);
} else {
headerHandler(null, headers);
}
}...
_parse: function _parseLine(data, hasMoreData, cb) {
var rows, self = this;
try {
data = this.parser(data, hasMoreData);
rows = data.rows;
if (rows.length) {
if (!this._parsedHeaders) {
this.__processHeaders(rows, function (err) {
if (err) {
cb(err);
} else {
self.__processRows(rows, data, cb);
}
});
} else {
...__processRows = function (rows, data, cb) {
// Runs every row through `__handleLine` via `extended.asyncEach`. Rows that
// produce a data row are emitted immediately, or queued in `__buffered` while
// the stream is paused. When all rows are done, calls `cb(null, data.line)`;
// the first error is passed to `cb` instead.
var self = this, count;
extended.asyncEach(rows, function (row, cb) {
// NOTE(review): for a falsy `row` the per-item `cb` is never called —
// confirm that `rows` cannot contain falsy entries.
if (row) {
// `_rowCount` is incremented up front and used as this row's index.
self.__handleLine(row, (count = ++self._rowCount), false, function (err, dataRow) {
if (err) {
cb(err);
} else {
if (dataRow) {
if (!self.isStreamPaused()) {
self.__emitRecord(dataRow, count);
} else {
self.__buffered.push([dataRow, count]);
}
} else {
// No data row was produced, so undo the increment above.
count = --self._rowCount;
}
cb();
}
});
}
}, function (err) {
if (err) {
cb(err);
} else {
cb(null, data.line);
}
});
}...
rows = data.rows;
if (rows.length) {
if (!this._parsedHeaders) {
this.__processHeaders(rows, function (err) {
if (err) {
cb(err);
} else {
self.__processRows(rows, data, cb);
}
});
} else {
this.__processRows(rows, data, cb);
}
} else {
cb(null, data.line);
...__removeBOM = function (data) {
// Catches EFBBBF (UTF-8 BOM) because the buffer-to-string
// conversion translates it to FEFF (UTF-16 BOM)
if (data && typeof data == 'string' && data.charCodeAt(0) == '0xFEFF') {
return data.slice(1);
}
return data;
}...
},
_transform: function (data, encoding, done) {
var lines = this.lines,
lineData = (lines + this.decoder.write(data)),
self = this;
if (lineData.length > 1) {
lineData = this.__removeBOM(lineData);
this._parse(lineData, true, function (err, lineData) {
if (err) {
done(err);
} else {
self.lines = lineData;
if (!self.isStreamPaused()) {
done();
...__transform = function (data, next) {
// Default transform: passes `data` through unchanged via `next(null, data)`.
return next(null, data);
}...
headersLength: 0,
totalCount: 0,
_transform: function (item, encoding, cb) {
var self = this;
this.__transform(item, function (err, item) {
if (err) {
cb(err);
} else {
if (checkHeaders(self, item)) {
self.push(new Buffer(transformItem(self, item), "utf8"));
}
cb();
...__validate = function (data, next) {
// Default validator: reports every row as valid via `next(null, true)`.
return next(null, true);
}...
return next(null, null);
}
if (!ignore) {
this.__transform(line, function (err, line) {
if (err) {
next(err);
} else {
self.__validate(line, function (err, isValid, reason) {
if (err) {
next(err);
} else if (isValid) {
next(null, line);
} else {
self.emit("data-invalid", line, index, reason);
next(null, null);
..._flush = function (callback) {
var self = this;
if (this.lines) {
this._parse(this.lines, false, function (err) {
if (err) {
callback(err);
} else if (!self.isStreamPaused()) {
self.__doFlush(callback);
} else {
self.__pausedDone = function () {
self.__doFlush(callback);
};
}
});
} else {
if (!this.isStreamPaused()) {
this.__doFlush(callback);
} else {
this.__pausedDone = function () {
self.__doFlush(callback);
};
}
}
}n/a
function _parseLine(data, hasMoreData, cb) {
// Runs `data` through `this.parser` and processes the resulting rows,
// handling the header row first if it has not been parsed yet.
// `cb(err, line)` receives `data.line` from the parser result on success.
// Anything thrown synchronously inside the try block is passed to `cb`.
var rows, self = this;
try {
data = this.parser(data, hasMoreData);
rows = data.rows;
if (rows.length) {
if (!this._parsedHeaders) {
this.__processHeaders(rows, function (err) {
if (err) {
cb(err);
} else {
self.__processRows(rows, data, cb);
}
});
} else {
this.__processRows(rows, data, cb);
}
} else {
// No complete rows: hand back `data.line` as-is.
cb(null, data.line);
}
} catch (e) {
cb(e);
}
}...
_transform: function (data, encoding, done) {
var lines = this.lines,
lineData = (lines + this.decoder.write(data)),
self = this;
if (lineData.length > 1) {
lineData = this.__removeBOM(lineData);
this._parse(lineData, true, function (err, lineData) {
if (err) {
done(err);
} else {
self.lines = lineData;
if (!self.isStreamPaused()) {
done();
} else {
..._transform = function (data, encoding, done) {
var lines = this.lines,
lineData = (lines + this.decoder.write(data)),
self = this;
if (lineData.length > 1) {
lineData = this.__removeBOM(lineData);
this._parse(lineData, true, function (err, lineData) {
if (err) {
done(err);
} else {
self.lines = lineData;
if (!self.isStreamPaused()) {
done();
} else {
self.__pausedDone = done;
}
}
});
} else {
this.lines = lineData;
if (!this.isStreamPaused()) {
done();
} else {
this.__pausedDone = done;
}
}
}n/a
emit = function (event) {
// Overrides event emission for the parser stream:
//   - "end" is forwarded at most once, with the incremented `_rowCount`
//     as its argument;
//   - when `hasIsPaused` is falsy, "pause"/"resume" maintain the `__paused`
//     flag, and "resume" also flushes the paused buffer;
//   - every non-"end" event is then forwarded with its original arguments.
// NOTE(review): `spreadArgs(origEmit, args, this)` presumably applies the
// original emit with `args` on this stream — defined outside this excerpt.
if (event === "end") {
if (!this.__endEmitted) {
this.__endEmitted = true;
spreadArgs(origEmit, ["end", ++this._rowCount], this);
}
} else {
if (!hasIsPaused) {
if (event === "pause") {
this.__paused = true;
} else if (event === "resume") {
this.__paused = false;
this.__flushPausedBuffer();
}
}
spreadArgs(origEmit, arguments, this);
}
}...
* Node 4 and 5 support.
* Deprecating the `record` event.
# v0.6.0
* Removed try catch from emit to allow bubbling up of errors to process, if one is thrown [#93](https://github.com/C2FO/fast-csv/issues/93)
* This also fixed issue [#92](https://github.com/C2FO/fast-csv/issues/92) where a loop was entered when `this.emit("error")` was called.
* Added new tests
# v0.5.7
* Strict record handling [#53](https://github.com/C2FO/fast-csv/pull/53) - [@derjust](https://github.com/derjust)
# v0.5.6
...isStreamPaused = function () {
// Returns the `__paused` flag as-is (it may be undefined if never set).
return this.__paused;
}...
extended.asyncEach(rows, function (row, cb) {
if (row) {
self.__handleLine(row, (count = ++self._rowCount), false, function (err, dataRow) {
if (err) {
cb(err);
} else {
if (dataRow) {
if (!self.isStreamPaused()) {
self.__emitRecord(dataRow, count);
} else {
self.__buffered.push([dataRow, count]);
}
} else {
count = --self._rowCount;
}
...on = function (evt) {
// Registers a listener via the original `on` (through `spreadArgs`) and
// returns `this`. Subscribing to "data" or "readable" also sets `_emitData`,
// the flag `__emitRecord` checks before pushing rows.
if (evt === "data" || evt === "readable") {
this._emitData = true;
}
spreadArgs(origOn, arguments, this);
return this;
}...
If you use `fast-csv` as a function it returns a transform stream that can be piped into.
```javascript
var stream = fs.createReadStream("my.csv");
var csvStream = csv()
.on("data", function(data){
console.log(data);
})
.on("end", function(){
console.log("done");
});
stream.pipe(csvStream);
...transform = function (cb) {
if (!extended.isFunction(cb)) {
this.emit("error", new TypeError("fast-csv.Parser#transform requires a function"));
}
if (cb.length === 2) {
this.__transform = cb;
} else {
this.__transform = function (data, next) {
return next(null, cb(data));
};
}
return this;
}...
be provided to validate and emitted as a row.
```javascript
var stream = fs.createReadStream("my.csv");
csv
.fromStream(stream)
.transform(function(data){
return data.reverse(); //reverse each row.
})
.on("data", function(data){
console.log(data);
})
.on("end", function(){
console.log("done");
...validate = function (cb) {
if (!extended.isFunction(cb)) {
this.emit("error", new TypeError("fast-csv.Parser#validate requires a function"));
}
if (cb.length === 2) {
this.__validate = cb;
} else {
this.__validate = function (data, next) {
return next(null, cb(data));
};
}
return this;
}...
will be emitted with the row and the index.
```javascript
var stream = fs.createReadStream("my.csv");
csv
.fromStream(stream, {headers : true})
.validate(function(data){
return data.age < 50; //all persons must be under the age of 50
})
.on("data-invalid", function(data){
//do something with invalid row
})
.on("data", function(data){
console.log(data);
...