Client = function (config) { EventEmitter.call(this); this.connectionParameters = new ConnectionParameters(config); this.user = this.connectionParameters.user; this.database = this.connectionParameters.database; this.port = this.connectionParameters.port; this.host = this.connectionParameters.host; this.password = this.connectionParameters.password; var c = config || {}; this._types = new TypeOverrides(c.types); this.connection = c.connection || new Connection({ stream: c.stream, ssl: this.connectionParameters.ssl, keepAlive: c.keepAlive || false }); this.queryQueue = []; this.binary = c.binary || defaults.binary; this.encoding = 'utf8'; this.processID = null; this.secretKey = null; this.ssl = this.connectionParameters.ssl || false; }
...
```js
var pg = require('pg');
// instantiate a new client
// the client will read connection information from
// the same environment variables used by postgres cli tools
var client = new pg.Client();
// connect to our database
client.connect(function (err) {
if (err) throw err;
// execute a query on our database
client.query('SELECT $1::text as name', ['brianc'], function (err, result) {
...
Connection = function (config) { EventEmitter.call(this); config = config || {}; this.stream = config.stream || new net.Stream(); this._keepAlive = config.keepAlive; this.lastBuffer = false; this.lastOffset = 0; this.buffer = null; this.offset = null; this.encoding = 'utf8'; this.parsedStatements = {}; this.writer = new Writer(); this.ssl = config.ssl || false; this._ending = false; this._mode = TEXT_MODE; this._emitMessage = false; this._reader = new Reader({ headerSize: 1, lengthPadding: -4 }); var self = this; this.on('newListener', function(eventName) { if(eventName == 'message') { self._emitMessage = true; } }); }
n/a
Pool = function (options) { var config = { Client: Client }; for (var key in options) { config[key] = options[key]; } Pool.call(this, config); }
...
idleTimeoutMillis: 30000, // how long a client is allowed to remain idle before being closed
};
//this initializes a connection pool
//it will keep idle connections open for 30 seconds
//and set a limit of maximum 10 idle clients
var pool = new pg.Pool(config);
// to run a query we can acquire a client from the pool,
// run a query on the client, and then return the client to the pool
pool.connect(function(err, client, done) {
if(err) {
return console.error('error fetching client from pool', err);
}
...
Pool.super_ = function (options, Client) { if (!(this instanceof Pool)) { return new Pool(options, Client) } EventEmitter.call(this) this.options = objectAssign({}, options) this.log = this.options.log || function () { } this.Client = this.options.Client || Client || require('pg').Client this.Promise = this.options.Promise || global.Promise this.options.max = this.options.max || this.options.poolSize || 10 this.options.create = this.options.create || this._create.bind(this) this.options.destroy = this.options.destroy || this._destroy.bind(this) this.pool = new genericPool.Pool(this.options) // Monkey patch to ensure we always finish our work // - There is a bug where callbacks go uncalled if min is not set // - We might still not want a connection to *always* exist // - but we do want to create up to max connections if we have work // - still waiting // This should be safe till the version of pg-pool is upgraded // SEE: https://github.com/coopernurse/node-pool/pull/186 this.pool._ensureMinimum = _ensureMinimum this.onCreate = this.options.onCreate }
n/a
Query = function (config, values, callback) { // use of "new" optional if(!(this instanceof Query)) { return new Query(config, values, callback); } config = utils.normalizeQueryConfig(config, values, callback); this.text = config.text; this.values = config.values; this.rows = config.rows; this.types = config.types; this.name = config.name; this.binary = config.binary; this.stream = config.stream; //use unique portal name each time this.portal = config.portal || ""; this.callback = config.callback; if(process.domain && config.callback) { this.callback = process.domain.bind(config.callback); } this._result = new Result(config.rowMode, config.types); this.isPreparedStatement = false; this._canceledDueToError = false; this._promise = null; EventEmitter.call(this); }
n/a
connection_parameters = function (config) { //if a string is passed, it is a raw connection string so we parse it into a config config = typeof config == 'string' ? parse(config) : (config || {}); //if the config has a connectionString defined, parse IT into the config we use //this will override other default values with what is stored in connectionString if(config.connectionString) { config = parse(config.connectionString); } this.user = val('user', config); this.database = val('database', config); this.port = parseInt(val('port', config), 10); this.host = val('host', config); this.password = val('password', config); this.binary = val('binary', config); this.ssl = typeof config.ssl === 'undefined' ? useSsl() : config.ssl; this.client_encoding = val("client_encoding", config); //a domain socket begins with '/' this.isDomainSocket = (!(this.host||'').indexOf('/')); this.application_name = val('application_name', config, 'PGAPPNAME'); this.fallback_application_name = val('fallback_application_name', config, false); }
n/a
query = function (native) { EventEmitter.call(this); this.native = native; this.text = null; this.values = null; this.name = null; this.callback = null; this.state = 'new'; this._arrayMode = false; //if the 'row' event is listened for //then emit them as they come in //without setting singleRowMode to true //this has almost no meaning because libpq //reads all rows into memory befor returning any this._emitRowEvents = false; this.on('newListener', function(event) { if(event === 'row') this._emitRowEvents = true; }.bind(this)); }
...
var client = new pg.Client();
// connect to our database
client.connect(function (err) {
if (err) throw err;
// execute a query on our database
client.query('SELECT $1::text as name', ['brianc'], function (err
, result) {
if (err) throw err;
// just print the result to the console
console.log(result.rows[0]); // outputs: { name: 'brianc' }
// disconnect the client
client.end(function (err) {
...
result = function (rowMode) { this.command = null; this.rowCount = null; this.oid = null; this.rows = []; this.fields = []; this._parsers = []; this.RowCtor = null; this.rowAsArray = rowMode == "array"; if(this.rowAsArray) { this.parseRow = this._parseRowAsArray; } }
n/a
function TypeOverrides(userTypes) { this._types = userTypes || types; this.text = {}; this.binary = {}; }
n/a
Client = function (config) { EventEmitter.call(this); this.connectionParameters = new ConnectionParameters(config); this.user = this.connectionParameters.user; this.database = this.connectionParameters.database; this.port = this.connectionParameters.port; this.host = this.connectionParameters.host; this.password = this.connectionParameters.password; var c = config || {}; this._types = new TypeOverrides(c.types); this.connection = c.connection || new Connection({ stream: c.stream, ssl: this.connectionParameters.ssl, keepAlive: c.keepAlive || false }); this.queryQueue = []; this.binary = c.binary || defaults.binary; this.encoding = 'utf8'; this.processID = null; this.secretKey = null; this.ssl = this.connectionParameters.ssl || false; }
...
```js
var pg = require('pg');
// instantiate a new client
// the client will read connection information from
// the same environment variables used by postgres cli tools
var client = new pg.Client();
// connect to our database
client.connect(function (err) {
if (err) throw err;
// execute a query on our database
client.query('SELECT $1::text as name', ['brianc'], function (err, result) {
...
Query = function (config, values, callback) { // use of "new" optional if(!(this instanceof Query)) { return new Query(config, values, callback); } config = utils.normalizeQueryConfig(config, values, callback); this.text = config.text; this.values = config.values; this.rows = config.rows; this.types = config.types; this.name = config.name; this.binary = config.binary; this.stream = config.stream; //use unique portal name each time this.portal = config.portal || ""; this.callback = config.callback; if(process.domain && config.callback) { this.callback = process.domain.bind(config.callback); } this._result = new Result(config.rowMode, config.types); this.isPreparedStatement = false; this._canceledDueToError = false; this._promise = null; EventEmitter.call(this); }
n/a
md5 = function (string) { return crypto.createHash('md5').update(string, 'utf-8').digest('hex'); }
n/a
function EventEmitter() { EventEmitter.init.call(this); }
n/a
_pulseQueryQueue = function () { if(this.readyForQuery===true) { this.activeQuery = this.queryQueue.shift(); if(this.activeQuery) { this.readyForQuery = false; this.hasExecuted = true; this.activeQuery.submit(this.connection); } else if(this.hasExecuted) { this.activeQuery = null; this.emit('drain'); } } }
n/a
cancel = function (client, query) { if(client.activeQuery == query) { var con = this.connection; if(this.host && this.host.indexOf('/') === 0) { con.connect(this.host + '/.s.PGSQL.' + this.port); } else { con.connect(this.port, this.host); } //once connection is established send cancel message con.on('connect', function() { con.cancel(client.processID, client.secretKey); }); } else if(client.queryQueue.indexOf(query) != -1) { client.queryQueue.splice(client.queryQueue.indexOf(query), 1); } }
n/a
connect = function (callback) { var self = this; var con = this.connection; if(this.host && this.host.indexOf('/') === 0) { con.connect(this.host + '/.s.PGSQL.' + this.port); } else { con.connect(this.port, this.host); } //once connection is established send startup message con.on('connect', function() { if(self.ssl) { con.requestSsl(); } else { con.startup(self.getStartupConf()); } }); con.on('sslconnect', function() { con.startup(self.getStartupConf()); }); function checkPgPass(cb) { return function(msg) { if (null !== self.password) { cb(msg); } else { pgPass(self.connectionParameters, function(pass){ if (undefined !== pass) { self.connectionParameters.password = self.password = pass; } cb(msg); }); } }; } //password request handling con.on('authenticationCleartextPassword', checkPgPass(function() { con.password(self.password); })); //password request handling con.on('authenticationMD5Password', checkPgPass(function(msg) { var inner = Client.md5(self.password + self.user); var outer = Client.md5(Buffer.concat([new Buffer(inner), msg.salt])); var md5password = "md5" + outer; con.password(md5password); })); con.once('backendKeyData', function(msg) { self.processID = msg.processID; self.secretKey = msg.secretKey; }); //hook up query handling events to connection //after the connection initially becomes ready for queries con.once('readyForQuery', function() { //delegate rowDescription to active query con.on('rowDescription', function(msg) { self.activeQuery.handleRowDescription(msg); }); //delegate dataRow to active query con.on('dataRow', function(msg) { self.activeQuery.handleDataRow(msg); }); //delegate portalSuspended to active query con.on('portalSuspended', function(msg) { self.activeQuery.handlePortalSuspended(con); }); //deletagate emptyQuery to active query con.on('emptyQuery', function(msg) { self.activeQuery.handleEmptyQuery(con); }); //delegate commandComplete to active query con.on('commandComplete', function(msg) { self.activeQuery.handleCommandComplete(msg, con); }); //if a prepared statement has a name and properly parses //we track that its already been executed so we don't parse //it again on the same client con.on('parseComplete', function(msg) { if(self.activeQuery.name) { con.parsedStatements[self.activeQuery.name] = true; } }); con.on('copyInResponse', function(msg) { self.activeQuery.handleCopyInResponse(self.connection); }); con.on('copyData', function (msg) { self.activeQuery.handleCopyData(msg, self.connection); }); con.on('notification', function(msg) { self.emit('notification', msg); }); //process possible callback argument to Client#connect if (callback) { callback(null, self); //remove callback for proper error handling //after the connect event callback = null; } self.emit('connect'); }); con.on('readyForQuery', function() { var activeQuery = self.activeQuery; self.activeQuery = null; self.readyForQuery = true; self._pulseQueryQueue(); if(activeQuery) { activeQuery.handleReadyForQuery(con); } }); con.on('error', function(error) { if(self.activeQuery) { var activeQuery = self.activeQuery; self.activeQuery = null; return activeQuery.handleError(error, con); } if(!callback) { return self.emit('error', error); } con.end(); // make sure ECONNRESET errors don't cause error events callback(error); callback = null; }); con.once('end', function() { if ( callback ) { // haven't received a connection message yet ! var err = new Error('Connection terminated'); callback(err); callback = null; return; } if(self.activeQuery) { var disconnectError = ...
...
// instantiate a new client
// the client will read connection information from
// the same environment variables used by postgres cli tools
var client = new pg.Client();
// connect to our database
client.connect(function (err) {
if (err) throw err;
// execute a query on our database
client.query('SELECT $1::text as name', ['brianc'], function (err, result) {
if (err) throw err;
// just print the result to the console
...
copyFrom = function (text) { throw new Error("For PostgreSQL COPY TO/COPY FROM support npm install pg-copy-streams"); }
n/a
copyTo = function (text) { throw new Error("For PostgreSQL COPY TO/COPY FROM support npm install pg-copy-streams"); }
n/a
end = function (cb) { this.connection.end(); if (cb) { this.connection.once('end', cb); } }
...
client.query('SELECT $1::text as name', ['brianc'], function (err, result) {
if (err) throw err;
// just print the result to the console
console.log(result.rows[0]); // outputs: { name: 'brianc' }
// disconnect the client
client.end(function (err) {
if (err) throw err;
});
});
});
```
...
escapeIdentifier = function (str) { var escaped = '"'; for(var i = 0; i < str.length; i++) { var c = str[i]; if(c === '"') { escaped += c + c; } else { escaped += c; } } escaped += '"'; return escaped; }
n/a
escapeLiteral = function (str) { var hasBackslash = false; var escaped = '\''; for(var i = 0; i < str.length; i++) { var c = str[i]; if(c === '\'') { escaped += c + c; } else if (c === '\\') { escaped += c + c; hasBackslash = true; } else { escaped += c; } } escaped += '\''; if(hasBackslash === true) { escaped = ' E' + escaped; } return escaped; }
n/a
getStartupConf = function () { var params = this.connectionParameters; var data = { user: params.user, database: params.database }; var appName = params.application_name || params.fallback_application_name; if (appName) { data.application_name = appName; } return data; }
n/a
getTypeParser = function (oid, format) { return this._types.getTypeParser(oid, format); }
...
fallback_application_name: undefined,
parseInputDatesAsUTC: false
};
var pgTypes = require('pg-types');
// save default parsers
var parseBigInteger = pgTypes.getTypeParser(20, 'text');
var parseBigIntegerArray = pgTypes.getTypeParser(1016, 'text');
//parse int8 so you can get your count values as actual numbers
module.exports.__defineSetter__("parseInt8", function(val) {
pgTypes.setTypeParser(20, 'text', val ? pgTypes.getTypeParser(23, 'text') : parseBigInteger);
pgTypes.setTypeParser(1016, 'text', val ? pgTypes.getTypeParser(1007, 'text') : parseBigIntegerArray);
});
...
query = function (config, values, callback) { //can take in strings, config object or query object var query = (typeof config.submit == 'function') ? config : new Query(config, values, callback); if(this.binary && !query.binary) { query.binary = true; } if(query._result) { query._result._getTypeParser = this._types.getTypeParser.bind(this._types); } this.queryQueue.push(query); this._pulseQueryQueue(); return query; }
...
var client = new pg.Client();
// connect to our database
client.connect(function (err) {
if (err) throw err;
// execute a query on our database
client.query('SELECT $1::text as name', ['brianc'], function (err
, result) {
if (err) throw err;
// just print the result to the console
console.log(result.rows[0]); // outputs: { name: 'brianc' }
// disconnect the client
client.end(function (err) {
...
setTypeParser = function (oid, format, parseFn) { return this._types.setTypeParser(oid, format, parseFn); }
...
var pgTypes = require('pg-types');
// save default parsers
var parseBigInteger = pgTypes.getTypeParser(20, 'text');
var parseBigIntegerArray = pgTypes.getTypeParser(1016, 'text');
//parse int8 so you can get your count values as actual numbers
module.exports.__defineSetter__("parseInt8", function(val) {
pgTypes.setTypeParser(20, 'text', val ? pgTypes.getTypeParser(23, 'text
') : parseBigInteger);
pgTypes.setTypeParser(1016, 'text', val ? pgTypes.getTypeParser(1007, 'text') : parseBigIntegerArray);
});
...
Connection = function (config) { EventEmitter.call(this); config = config || {}; this.stream = config.stream || new net.Stream(); this._keepAlive = config.keepAlive; this.lastBuffer = false; this.lastOffset = 0; this.buffer = null; this.offset = null; this.encoding = 'utf8'; this.parsedStatements = {}; this.writer = new Writer(); this.ssl = config.ssl || false; this._ending = false; this._mode = TEXT_MODE; this._emitMessage = false; this._reader = new Reader({ headerSize: 1, lengthPadding: -4 }); var self = this; this.on('newListener', function(eventName) { if(eventName == 'message') { self._emitMessage = true; } }); }
n/a
function EventEmitter() { EventEmitter.init.call(this); }
n/a
_readValue = function (buffer) { var length = this.parseInt32(buffer); if(length === -1) return null; if(this._mode === TEXT_MODE) { return this.readString(buffer, length); } return this.readBytes(buffer, length); }
n/a
_send = function (code, more) { if(!this.stream.writable) { return false; } if(more === true) { this.writer.addHeader(code); } else { return this.stream.write(this.writer.flush(code)); } }
n/a
attachListeners = function (stream) { var self = this; stream.on('data', function(buff) { self._reader.addChunk(buff); var packet = self._reader.read(); while(packet) { var msg = self.parseMessage(packet); if(self._emitMessage) { self.emit('message', msg); } self.emit(msg.name, msg); packet = self._reader.read(); } }); stream.on('end', function() { self.emit('end'); }); }
n/a
bind = function (config, more) { //normalize config config = config || {}; config.portal = config.portal || ''; config.statement = config.statement || ''; config.binary = config.binary || false; var values = config.values || []; var len = values.length; var useBinary = false; for (var j = 0; j < len; j++) useBinary |= values[j] instanceof Buffer; var buffer = this.writer .addCString(config.portal) .addCString(config.statement); if (!useBinary) buffer.addInt16(0); else { buffer.addInt16(len); for (j = 0; j < len; j++) buffer.addInt16(values[j] instanceof Buffer); } buffer.addInt16(len); for(var i = 0; i < len; i++) { var val = values[i]; if(val === null || typeof val === "undefined") { buffer.addInt32(-1); } else if (val instanceof Buffer) { buffer.addInt32(val.length); buffer.add(val); } else { buffer.addInt32(Buffer.byteLength(val)); buffer.addString(val); } } if(config.binary) { buffer.addInt16(1); // format codes to use binary buffer.addInt16(1); } else { buffer.addInt16(0); // format codes to use text } //0x42 = 'B' this._send(0x42, more); }
...
//then emit them as they come in
//without setting singleRowMode to true
//this has almost no meaning because libpq
//reads all rows into memory befor returning any
this._emitRowEvents = false;
this.on('newListener', function(event) {
if(event === 'row') this._emitRowEvents = true;
}.bind(this));
};
util.inherits(NativeQuery, EventEmitter);
NativeQuery.prototype.then = function(onSuccess, onFailure) {
return this.promise().then(onSuccess, onFailure);
};
...
cancel = function (processID, secretKey) { var bodyBuffer = this.writer .addInt16(1234) .addInt16(5678) .addInt32(processID) .addInt32(secretKey) .flush(); var length = bodyBuffer.length + 4; var buffer = new Writer() .addInt32(length) .add(bodyBuffer) .join(); this.stream.write(buffer); }
n/a
close = function (msg, more) { this.writer.addCString(msg.type + (msg.name || '')); this._send(0x43, more); }
n/a
connect = function (port, host) { if(this.stream.readyState === 'closed') { this.stream.connect(port, host); } else if(this.stream.readyState == 'open') { this.emit('connect'); } var self = this; this.stream.on('connect', function() { if (self._keepAlive) { self.stream.setKeepAlive(true); } self.emit('connect'); }); this.stream.on('error', function(error) { //don't raise ECONNRESET errors - they can & should be ignored //during disconnect if(self._ending && error.code == 'ECONNRESET') { return; } self.emit('error', error); }); this.stream.on('close', function() { // NOTE: node-0.10 emits both 'end' and 'close' // for streams closed by the peer, while // node-0.8 only emits 'close' self.emit('end'); }); if(!this.ssl) { return this.attachListeners(this.stream); } this.stream.once('data', function(buffer) { var responseCode = buffer.toString('utf8'); if(responseCode != 'S') { return self.emit('error', new Error('The server does not support SSL connections')); } var tls = require('tls'); self.stream = tls.connect({ socket: self.stream, servername: host, rejectUnauthorized: self.ssl.rejectUnauthorized, ca: self.ssl.ca, pfx: self.ssl.pfx, key: self.ssl.key, passphrase: self.ssl.passphrase, cert: self.ssl.cert, NPNProtocols: self.ssl.NPNProtocols }); self.attachListeners(self.stream); self.emit('sslconnect'); self.stream.on('error', function(error){ self.emit('error', error); }); }); }
...
// instantiate a new client
// the client will read connection information from
// the same environment variables used by postgres cli tools
var client = new pg.Client();
// connect to our database
client.connect(function (err) {
if (err) throw err;
// execute a query on our database
client.query('SELECT $1::text as name', ['brianc'], function (err, result) {
if (err) throw err;
// just print the result to the console
...
describe = function (msg, more) { this.writer.addCString(msg.type + (msg.name || '')); this._send(0x44, more); }
n/a
end = function () { //0x58 = 'X' this.writer.add(emptyBuffer); this._ending = true; this._send(0x58); }
...
client.query('SELECT $1::text as name', ['brianc'], function (err, result) {
if (err) throw err;
// just print the result to the console
console.log(result.rows[0]); // outputs: { name: 'brianc' }
// disconnect the client
client.end(function (err) {
if (err) throw err;
});
});
});
```
...
endCopyFrom = function () { this.stream.write(this.writer.add(emptyBuffer).flush(0x63)); }
n/a
execute = function (config, more) { config = config || {}; config.portal = config.portal || ''; config.rows = config.rows || ''; this.writer .addCString(config.portal) .addInt32(config.rows); //0x45 = 'E' this._send(0x45, more); }
...
console.error('This can cause conflicts and silent errors executing queries');
}
var values = (this.values||[]).map(utils.prepareValue);
//check if the client has already executed this named query
//if so...just execute it again - skip the planning phase
if(client.namedQueries[this.name]) {
return this.native.execute(this.name, values, after);
}
//plan the named query the first time, then execute it
return this.native.prepare(this.name, this.text, values.length, function(err) {
if(err) return after(err);
client.namedQueries[self.name] = true;
return self.native.execute(self.name, values, after);
});
...
flush = function () { //0x48 = 'H' this.writer.add(emptyBuffer); this._send(0x48); }
n/a
parse = function (query, more) { //expect something like this: // { name: 'queryName', // text: 'select * from blah', // types: ['int8', 'bool'] } //normalize missing query names to allow for null query.name = query.name || ''; if (query.name.length > 63) { console.error('Warning! Postgres only supports 63 characters for query names.'); console.error('You supplied', query.name, '(', query.name.length, ')'); console.error('This can cause conflicts and silent errors executing queries'); } //normalize null type array query.types = query.types || []; var len = query.types.length; var buffer = this.writer .addCString(query.name) //name of query .addCString(query.text) //actual query text .addInt16(len); for(var i = 0; i < len; i++) { buffer.addInt32(query.types[i]); } var code = 0x50; this._send(code, more); }
n/a
parseA = function (buffer, length) { var msg = new Message('notification', length); msg.processId = this.parseInt32(buffer); msg.channel = this.parseCString(buffer); msg.payload = this.parseCString(buffer); return msg; }
n/a
parseC = function (buffer, length) { var msg = new Message('commandComplete', length); msg.text = this.parseCString(buffer); return msg; }
n/a
parseCString = function (buffer) { var start = this.offset; var end = indexOf(buffer, 0, start); this.offset = end + 1; return buffer.toString(this.encoding, start, end); }
n/a
parseD = function (buffer, length) { var fieldCount = this.parseInt16(buffer); var msg = new DataRowMessage(length, fieldCount); for(var i = 0; i < fieldCount; i++) { msg.fields.push(this._readValue(buffer)); } return msg; }
n/a
parseE = function (buffer, length) { var fields = {}; var msg, item; var input = new Message('error', length); var fieldType = this.readString(buffer, 1); while(fieldType != '\0') { fields[fieldType] = this.parseCString(buffer); fieldType = this.readString(buffer, 1); } if(input.name === 'error') { // the msg is an Error instance msg = new Error(fields.M); for (item in input) { // copy input properties to the error if(input.hasOwnProperty(item)) { msg[item] = input[item]; } } } else { // the msg is an object literal msg = input; msg.message = fields.M; } msg.severity = fields.S; msg.code = fields.C; msg.detail = fields.D; msg.hint = fields.H; msg.position = fields.P; msg.internalPosition = fields.p; msg.internalQuery = fields.q; msg.where = fields.W; msg.schema = fields.s; msg.table = fields.t; msg.column = fields.c; msg.dataType = fields.d; msg.constraint = fields.n; msg.file = fields.F; msg.line = fields.L; msg.routine = fields.R; return msg; }
n/a
parseField = function (buffer) { var field = new Field(); field.name = this.parseCString(buffer); field.tableID = this.parseInt32(buffer); field.columnID = this.parseInt16(buffer); field.dataTypeID = this.parseInt32(buffer); field.dataTypeSize = this.parseInt16(buffer); field.dataTypeModifier = this.parseInt32(buffer); if(this.parseInt16(buffer) === TEXT_MODE) { this._mode = TEXT_MODE; field.format = FORMAT_TEXT; } else { this._mode = BINARY_MODE; field.format = FORMAT_BINARY; } return field; }
n/a
parseG = function (buffer, length) { var msg = new Message('copyInResponse', length); return this.parseGH(buffer, msg); }
n/a
parseGH = function (buffer, msg) { var isBinary = buffer[this.offset] !== 0; this.offset++; msg.binary = isBinary; var columnCount = this.parseInt16(buffer); msg.columnTypes = []; for(var i = 0; i<columnCount; i++) { msg.columnTypes.push(this.parseInt16(buffer)); } return msg; }
n/a
parseH = function (buffer, length) { var msg = new Message('copyOutResponse', length); return this.parseGH(buffer, msg); }
n/a
parseInt16 = function (buffer) { var value = buffer.readInt16BE(this.offset, true); this.offset += 2; return value; }
n/a
parseInt32 = function (buffer) { var value = buffer.readInt32BE(this.offset, true); this.offset += 4; return value; }
n/a
parseK = function (buffer, length) { var msg = new Message('backendKeyData', length); msg.processID = this.parseInt32(buffer); msg.secretKey = this.parseInt32(buffer); return msg; }
n/a
parseMessage = function (buffer) { this.offset = 0; var length = buffer.length + 4; switch(this._reader.header) { case 0x52: //R return this.parseR(buffer, length); case 0x53: //S return this.parseS(buffer, length); case 0x4b: //K return this.parseK(buffer, length); case 0x43: //C return this.parseC(buffer, length); case 0x5a: //Z return this.parseZ(buffer, length); case 0x54: //T return this.parseT(buffer, length); case 0x44: //D return this.parseD(buffer, length); case 0x45: //E return this.parseE(buffer, length); case 0x4e: //N return this.parseN(buffer, length); case 0x31: //1 return new Message('parseComplete', length); case 0x32: //2 return new Message('bindComplete', length); case 0x33: //3 return new Message('closeComplete', length); case 0x41: //A return this.parseA(buffer, length); case 0x6e: //n return new Message('noData', length); case 0x49: //I return new Message('emptyQuery', length); case 0x73: //s return new Message('portalSuspended', length); case 0x47: //G return this.parseG(buffer, length); case 0x48: //H return this.parseH(buffer, length); case 0x63: //c return new Message('copyDone', length); case 0x64: //d return this.parsed(buffer, length); } }
n/a
parseN = function (buffer, length) { var msg = this.parseE(buffer, length); msg.name = 'notice'; return msg; }
n/a
parseR = function (buffer, length) { var code = 0; var msg = new Message('authenticationOk', length); if(msg.length === 8) { code = this.parseInt32(buffer); if(code === 3) { msg.name = 'authenticationCleartextPassword'; } return msg; } if(msg.length === 12) { code = this.parseInt32(buffer); if(code === 5) { //md5 required msg.name = 'authenticationMD5Password'; msg.salt = new Buffer(4); buffer.copy(msg.salt, 0, this.offset, this.offset + 4); this.offset += 4; return msg; } } throw new Error("Unknown authenticationOk message type" + util.inspect(msg)); }
n/a
parseS = function (buffer, length) { var msg = new Message('parameterStatus', length); msg.parameterName = this.parseCString(buffer); msg.parameterValue = this.parseCString(buffer); return msg; }
n/a
parseT = function (buffer, length) { var msg = new Message(ROW_DESCRIPTION, length); msg.fieldCount = this.parseInt16(buffer); var fields = []; for(var i = 0; i < msg.fieldCount; i++){ fields.push(this.parseField(buffer)); } msg.fields = fields; return msg; }
n/a
parseZ = function (buffer, length) { var msg = new Message('readyForQuery', length); msg.name = 'readyForQuery'; msg.status = this.readString(buffer, 1); return msg; }
n/a
parsed = function (buffer, length) { var msg = new Message('copyData', length); msg.chunk = this.readBytes(buffer, msg.length - 4); return msg; }
n/a
password = function (password) { //0x70 = 'p' this._send(0x70, this.writer.addCString(password)); }
n/a
query = function (text) { //0x51 = Q this.stream.write(this.writer.addCString(text).flush(0x51)); }
...
var client = new pg.Client();
// connect to our database
client.connect(function (err) {
if (err) throw err;
// execute a query on our database
client.query('SELECT $1::text as name', ['brianc'], function (err
, result) {
if (err) throw err;
// just print the result to the console
console.log(result.rows[0]); // outputs: { name: 'brianc' }
// disconnect the client
client.end(function (err) {
...
readBytes = function (buffer, length) { return buffer.slice(this.offset, this.offset += length); }
n/a
readString = function (buffer, length) { return buffer.toString(this.encoding, this.offset, (this.offset += length)); }
n/a
requestSsl = function () { this.checkSslResponse = true; var bodyBuffer = this.writer .addInt16(0x04D2) .addInt16(0x162F).flush(); var length = bodyBuffer.length + 4; var buffer = new Writer() .addInt32(length) .add(bodyBuffer) .join(); this.stream.write(buffer); }
n/a
sendCopyFail = function (msg) { //this.stream.write(this.writer.add(emptyBuffer).flush(0x66)); this.writer.addCString(msg); this._send(0x66); }
n/a
sendCopyFromChunk = function (chunk) { this.stream.write(this.writer.add(chunk).flush(0x64)); }
n/a
startup = function (config) { var writer = this.writer .addInt16(3) .addInt16(0) ; Object.keys(config).forEach(function(key){ var val = config[key]; writer.addCString(key).addCString(val); }); writer.addCString('client_encoding').addCString("'utf-8'"); var bodyBuffer = writer.addCString('').flush(); //this message is sent without a code var length = bodyBuffer.length + 4; var buffer = new Writer() .addInt32(length) .add(bodyBuffer) .join(); this.stream.write(buffer); }
n/a
sync = function () { //clear out any pending data in the writer this.writer.flush(0); this.writer.add(emptyBuffer); this._ending = true; this._send(0x53); }
n/a
Pool = function (options) { var config = { Client: Client }; for (var key in options) { config[key] = options[key]; } Pool.call(this, config); }
...
idleTimeoutMillis: 30000, // how long a client is allowed to remain idle before being closed
};
//this initializes a connection pool
//it will keep idle connections open for 30 seconds
//and set a limit of maximum 10 idle clients
var pool = new pg.Pool(config);
// to run a query we can acquire a client from the pool,
// run a query on the client, and then return the client to the pool
pool.connect(function(err, client, done) {
if(err) {
return console.error('error fetching client from pool', err);
}
...
super_ = function (options, Client) { if (!(this instanceof Pool)) { return new Pool(options, Client) } EventEmitter.call(this) this.options = objectAssign({}, options) this.log = this.options.log || function () { } this.Client = this.options.Client || Client || require('pg').Client this.Promise = this.options.Promise || global.Promise this.options.max = this.options.max || this.options.poolSize || 10 this.options.create = this.options.create || this._create.bind(this) this.options.destroy = this.options.destroy || this._destroy.bind(this) this.pool = new genericPool.Pool(this.options) // Monkey patch to ensure we always finish our work // - There is a bug where callbacks go uncalled if min is not set // - We might still not want a connection to *always* exist // - but we do want to create up to max connections if we have work // - still waiting // This should be safe till the version of pg-pool is upgraded // SEE: https://github.com/coopernurse/node-pool/pull/186 this.pool._ensureMinimum = _ensureMinimum this.onCreate = this.options.onCreate }
n/a
function EventEmitter() { EventEmitter.init.call(this); }
n/a
_create = function (cb) { this.log('connecting new client') var client = new this.Client(this.options) client.on('error', function (e) { this.log('connected client error:', e) this.pool.destroy(client) e.client = client this.emit('error', e) }.bind(this)) client.connect(function (err) { if (err) { this.log('client connection error:', err) cb(err) } else { this.log('client connected') this.emit('connect', client) cb(null, client) } }.bind(this)) }
n/a
_destroy = function (client) { if (client._destroying) return client._destroying = true client.end() }
n/a
_promise = function (cb, executor) { if (!cb) { return new this.Promise(executor) } function resolved (value) { process.nextTick(function () { cb(null, value) }) } function rejected (error) { process.nextTick(function () { cb(error) }) } executor(resolved, rejected) }
n/a
_promiseNoCallback = function (callback, executor) { return callback ? executor() : new this.Promise(executor) }
n/a
connect = function (cb) { return this._promiseNoCallback(cb, function (resolve, reject) { this.log('acquire client begin') this.pool.acquire(function (err, client) { if (err) { this.log('acquire client. error:', err) if (cb) { cb(err, null, function () {}) } else { reject(err) } return } this.log('acquire client') this.emit('acquire', client) client.release = function (err) { delete client.release if (err) { this.log('destroy client. error:', err) this.pool.destroy(client) } else { this.log('release client') this.pool.release(client) } }.bind(this) if (cb) { cb(null, client, client.release) } else { resolve(client) } }.bind(this)) }.bind(this)) }
...
// instantiate a new client
// the client will read connection information from
// the same environment variables used by postgres cli tools
var client = new pg.Client();
// connect to our database
client.connect(function (err) {
if (err) throw err;
// execute a query on our database
client.query('SELECT $1::text as name', ['brianc'], function (err, result) {
if (err) throw err;
// just print the result to the console
...
end = function (cb) { this.log('draining pool') return this._promise(cb, function (resolve, reject) { this.pool.drain(function () { this.log('pool drained, calling destroy all now') this.pool.destroyAllNow(resolve) }.bind(this)) }.bind(this)) }
...
client.query('SELECT $1::text as name', ['brianc'], function (err, result) {
if (err) throw err;
// just print the result to the console
console.log(result.rows[0]); // outputs: { name: 'brianc' }
// disconnect the client
client.end(function (err) {
if (err) throw err;
});
});
});
```
...
query = function (text, values, cb) { if (typeof values === 'function') { cb = values values = undefined } return this._promise(cb, function (resolve, reject) { this.connect(function (err, client, done) { if (err) { return reject(err) } client.query(text, values, function (err, res) { done(err) err ? reject(err) : resolve(res) }) }) }.bind(this)) }
...
var client = new pg.Client();
// connect to our database
client.connect(function (err) {
if (err) throw err;
// execute a query on our database
client.query('SELECT $1::text as name', ['brianc'], function (err
, result) {
if (err) throw err;
// just print the result to the console
console.log(result.rows[0]); // outputs: { name: 'brianc' }
// disconnect the client
client.end(function (err) {
...
take = function (cb) { return this._promiseNoCallback(cb, function (resolve, reject) { this.log('acquire client begin') this.pool.acquire(function (err, client) { if (err) { this.log('acquire client. error:', err) if (cb) { cb(err, null, function () {}) } else { reject(err) } return } this.log('acquire client') this.emit('acquire', client) client.release = function (err) { delete client.release if (err) { this.log('destroy client. error:', err) this.pool.destroy(client) } else { this.log('release client') this.pool.release(client) } }.bind(this) if (cb) { cb(null, client, client.release) } else { resolve(client) } }.bind(this)) }.bind(this)) }
n/a
Query = function (config, values, callback) { // use of "new" optional if(!(this instanceof Query)) { return new Query(config, values, callback); } config = utils.normalizeQueryConfig(config, values, callback); this.text = config.text; this.values = config.values; this.rows = config.rows; this.types = config.types; this.name = config.name; this.binary = config.binary; this.stream = config.stream; //use unique portal name each time this.portal = config.portal || ""; this.callback = config.callback; if(process.domain && config.callback) { this.callback = process.domain.bind(config.callback); } this._result = new Result(config.rowMode, config.types); this.isPreparedStatement = false; this._canceledDueToError = false; this._promise = null; EventEmitter.call(this); }
n/a
function EventEmitter() { EventEmitter.init.call(this); }
n/a
_getRows = function (connection, rows) { connection.execute({ portal: this.portalName, rows: rows }, true); connection.flush(); }
n/a
catch = function (callback) { return this.promise().catch(callback); }
...
util.inherits(NativeQuery, EventEmitter);
NativeQuery.prototype.then = function(onSuccess, onFailure) {
return this.promise().then(onSuccess, onFailure);
};
NativeQuery.prototype.catch = function(callback) {
return this.promise().catch(callback);
};
NativeQuery.prototype.promise = function() {
if (this._promise) return this._promise;
this._promise = new Promise(function(resolve, reject) {
this.once('end', resolve);
this.once('error', reject);
...
handleCommandComplete = function (msg, con) { this._result.addCommandComplete(msg); //need to sync after each command complete of a prepared statement if(this.isPreparedStatement) { con.sync(); } }
n/a
handleCopyData = function (msg, connection) { var chunk = msg.chunk; if(this.stream) { this.stream.handleChunk(chunk); } //if there are no stream (for example when copy to query was sent by //query method instead of copyTo) error will be handled //on copyOutResponse event, so silently ignore this error here }
n/a
handleCopyInResponse = function (connection) { if(this.stream) this.stream.startStreamingToConnection(connection); else connection.sendCopyFail('No source stream defined'); }
n/a
handleDataRow = function (msg) { var row; if (this._canceledDueToError) { return; } try { row = this._result.parseRow(msg.fields); } catch (err) { this._canceledDueToError = err; return; } this.emit('row', row, this._result); if (this._accumulateRows) { this._result.addRow(row); } }
n/a
handleEmptyQuery = function (con) { if (this.isPreparedStatement) { con.sync(); } }
n/a
handleError = function (err, connection) { //need to sync after error during a prepared statement if(this.isPreparedStatement) { connection.sync(); } if(this._canceledDueToError) { err = this._canceledDueToError; this._canceledDueToError = false; } //if callback supplied do not emit error event as uncaught error //events will bubble up to node process if(this.callback) { return this.callback(err); } this.emit('error', err); }
...
client.native.arrayMode = false;
setImmediate(function() {
self.emit('_done');
});
//handle possible query error
if(err) {
return self.handleError(err);
}
var result = new NativeResult();
result.addCommandComplete(self.native.pq);
result.rows = rows;
//emit row events for each row in the result
...
handlePortalSuspended = function (connection) { this._getRows(connection, this.rows); }
n/a
handleReadyForQuery = function (con) { if(this._canceledDueToError) { return this.handleError(this._canceledDueToError, con); } if(this.callback) { this.callback(null, this._result); } this.emit('end', this._result); }
n/a
handleRowDescription = function (msg) { this._result.addFields(msg.fields); this._accumulateRows = this.callback || !this.listeners('row').length; }
n/a
hasBeenParsed = function (connection) { return this.name && connection.parsedStatements[this.name]; }
n/a
prepare = function (connection) { var self = this; //prepared statements need sync to be called after each command //complete or when an error is encountered this.isPreparedStatement = true; //TODO refactor this poor encapsulation if(!this.hasBeenParsed(connection)) { connection.parse({ text: self.text, name: self.name, types: self.types }, true); } if(self.values) { self.values = self.values.map(utils.prepareValue); } //http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY connection.bind({ portal: self.portalName, statement: self.name, values: self.values, binary: self.binary }, true); connection.describe({ type: 'P', name: self.portalName || "" }, true); this._getRows(connection, this.rows); }
...
//check if the client has already executed this named query
//if so...just execute it again - skip the planning phase
if(client.namedQueries[this.name]) {
return this.native.execute(this.name, values, after);
}
//plan the named query the first time, then execute it
return this.native.prepare(this.name, this.text, values.length, function(err) {
if(err) return after(err);
client.namedQueries[self.name] = true;
return self.native.execute(self.name, values, after);
});
}
else if(this.values) {
var vals = this.values.map(utils.prepareValue);
...
promise = function () { if (this._promise) return this._promise; this._promise = new Promise(function(resolve, reject) { this.once('end', resolve); this.once('error', reject); }.bind(this)); return this._promise; }
...
if(event === 'row') this._emitRowEvents = true;
}.bind(this));
};
util.inherits(NativeQuery, EventEmitter);
NativeQuery.prototype.then = function(onSuccess, onFailure) {
return this.promise().then(onSuccess, onFailure);
};
NativeQuery.prototype.catch = function(callback) {
return this.promise().catch(callback);
};
NativeQuery.prototype.promise = function() {
...
requiresPreparation = function () { //named queries must always be prepared if(this.name) { return true; } //always prepare if there are max number of rows expected per //portal execution if(this.rows) { return true; } //don't prepare empty text queries if(!this.text) { return false; } //prepare if there are values if(!this.values) { return false; } return this.values.length > 0; }
n/a
submit = function (connection) { if(this.requiresPreparation()) { this.prepare(connection); } else { connection.query(this.text); } }
n/a
then = function (onSuccess, onFailure) { return this.promise().then(onSuccess, onFailure); }
...
if(event === 'row') this._emitRowEvents = true;
}.bind(this));
};
util.inherits(NativeQuery, EventEmitter);
NativeQuery.prototype.then = function(onSuccess, onFailure) {
return this.promise().then(onSuccess, onFailure);
};
NativeQuery.prototype.catch = function(callback) {
return this.promise().catch(callback);
};
NativeQuery.prototype.promise = function() {
...
connection_parameters = function (config) { //if a string is passed, it is a raw connection string so we parse it into a config config = typeof config == 'string' ? parse(config) : (config || {}); //if the config has a connectionString defined, parse IT into the config we use //this will override other default values with what is stored in connectionString if(config.connectionString) { config = parse(config.connectionString); } this.user = val('user', config); this.database = val('database', config); this.port = parseInt(val('port', config), 10); this.host = val('host', config); this.password = val('password', config); this.binary = val('binary', config); this.ssl = typeof config.ssl === 'undefined' ? useSsl() : config.ssl; this.client_encoding = val("client_encoding", config); //a domain socket begins with '/' this.isDomainSocket = (!(this.host||'').indexOf('/')); this.application_name = val('application_name', config, 'PGAPPNAME'); this.fallback_application_name = val('fallback_application_name', config, false); }
n/a
getLibpqConnectionString = function (cb) { var params = []; add(params, this, 'user'); add(params, this, 'password'); add(params, this, 'port'); add(params, this, 'application_name'); add(params, this, 'fallback_application_name'); if(this.database) { params.push("dbname='" + this.database + "'"); } if(this.host) { params.push("host=" + this.host); } if(this.isDomainSocket) { return cb(null, params.join(' ')); } if(this.client_encoding) { params.push("client_encoding='" + this.client_encoding + "'"); } dns.lookup(this.host, function(err, address) { if(err) return cb(err, null); params.push("hostaddr=" + address); return cb(null, params.join(' ')); }); }
n/a
query = function (native) { EventEmitter.call(this); this.native = native; this.text = null; this.values = null; this.name = null; this.callback = null; this.state = 'new'; this._arrayMode = false; //if the 'row' event is listened for //then emit them as they come in //without setting singleRowMode to true //this has almost no meaning because libpq //reads all rows into memory befor returning any this._emitRowEvents = false; this.on('newListener', function(event) { if(event === 'row') this._emitRowEvents = true; }.bind(this)); }
...
var client = new pg.Client();
// connect to our database
client.connect(function (err) {
if (err) throw err;
// execute a query on our database
client.query('SELECT $1::text as name', ['brianc'], function (err
, result) {
if (err) throw err;
// just print the result to the console
console.log(result.rows[0]); // outputs: { name: 'brianc' }
// disconnect the client
client.end(function (err) {
...
function EventEmitter() { EventEmitter.init.call(this); }
n/a
catch = function (callback) { return this.promise().catch(callback); }
...
util.inherits(NativeQuery, EventEmitter);
NativeQuery.prototype.then = function(onSuccess, onFailure) {
return this.promise().then(onSuccess, onFailure);
};
NativeQuery.prototype.catch = function(callback) {
return this.promise().catch(callback);
};
NativeQuery.prototype.promise = function() {
if (this._promise) return this._promise;
this._promise = new Promise(function(resolve, reject) {
this.once('end', resolve);
this.once('error', reject);
...
handleError = function (err) { var self = this; //copy pq error fields into the error object var fields = self.native.pq.resultErrorFields(); if(fields) { for(var key in fields) { err[key] = fields[key]; } } if(self.callback) { self.callback(err); } else { self.emit('error', err); } self.state = 'error'; }
...
client.native.arrayMode = false;
setImmediate(function() {
self.emit('_done');
});
//handle possible query error
if(err) {
return self.handleError(err);
}
var result = new NativeResult();
result.addCommandComplete(self.native.pq);
result.rows = rows;
//emit row events for each row in the result
...
promise = function () { if (this._promise) return this._promise; this._promise = new Promise(function(resolve, reject) { this.once('end', resolve); this.once('error', reject); }.bind(this)); return this._promise; }
...
if(event === 'row') this._emitRowEvents = true;
}.bind(this));
};
util.inherits(NativeQuery, EventEmitter);
NativeQuery.prototype.then = function(onSuccess, onFailure) {
return this.promise().then(onSuccess, onFailure);
};
NativeQuery.prototype.catch = function(callback) {
return this.promise().catch(callback);
};
NativeQuery.prototype.promise = function() {
...
submit = function (client) { this.state = 'running'; var self = this; client.native.arrayMode = this._arrayMode; var after = function(err, rows) { client.native.arrayMode = false; setImmediate(function() { self.emit('_done'); }); //handle possible query error if(err) { return self.handleError(err); } var result = new NativeResult(); result.addCommandComplete(self.native.pq); result.rows = rows; //emit row events for each row in the result if(self._emitRowEvents) { rows.forEach(function(row) { self.emit('row', row, result); }); } //handle successful result self.state = 'end'; self.emit('end', result); if(self.callback) { self.callback(null, result); } }; if(process.domain) { after = process.domain.bind(after); } //named query if(this.name) { if (this.name.length > 63) { console.error('Warning! Postgres only supports 63 characters for query names.'); console.error('You supplied', this.name, '(', this.name.length, ')'); console.error('This can cause conflicts and silent errors executing queries'); } var values = (this.values||[]).map(utils.prepareValue); //check if the client has already executed this named query //if so...just execute it again - skip the planning phase if(client.namedQueries[this.name]) { return this.native.execute(this.name, values, after); } //plan the named query the first time, then execute it return this.native.prepare(this.name, this.text, values.length, function(err) { if(err) return after(err); client.namedQueries[self.name] = true; return self.native.execute(self.name, values, after); }); } else if(this.values) { var vals = this.values.map(utils.prepareValue); this.native.query(this.text, vals, after); } else { this.native.query(this.text, after); } }
n/a
then = function (onSuccess, onFailure) { return this.promise().then(onSuccess, onFailure); }
...
if(event === 'row') this._emitRowEvents = true;
}.bind(this));
};
util.inherits(NativeQuery, EventEmitter);
NativeQuery.prototype.then = function(onSuccess, onFailure) {
return this.promise().then(onSuccess, onFailure);
};
NativeQuery.prototype.catch = function(callback) {
return this.promise().catch(callback);
};
NativeQuery.prototype.promise = function() {
...
result = function (rowMode) { this.command = null; this.rowCount = null; this.oid = null; this.rows = []; this.fields = []; this._parsers = []; this.RowCtor = null; this.rowAsArray = rowMode == "array"; if(this.rowAsArray) { this.parseRow = this._parseRowAsArray; } }
n/a
function getTypeParser(oid, format) { format = format || 'text'; if (!typeParsers[format]) { return noParse; } return typeParsers[format][oid] || noParse; }
...
this.fields = [];
this._parsers = [];
}
var ctorBody = "";
for(var i = 0; i < fieldDescriptions.length; i++) {
var desc = fieldDescriptions[i];
this.fields.push(desc);
var parser = this._getTypeParser(desc.dataTypeID, desc.format || 'text');
this._parsers.push(parser);
//this is some craziness to compile the row result parsing
//results in ~60% speedup on large query result sets
ctorBody += inlineParser(desc.name, i);
}
if(!this.rowAsArray) {
this.RowCtor = Function("parsers", "rowData", ctorBody);
...
_parseRowAsArray = function (rowData) { var row = []; for(var i = 0, len = rowData.length; i < len; i++) { var rawValue = rowData[i]; if(rawValue !== null) { row.push(this._parsers[i](rawValue)); } else { row.push(null); } } return row; }
n/a
addCommandComplete = function (msg) { var match; if(msg.text) { //pure javascript match = matchRegexp.exec(msg.text); } else { //native bindings match = matchRegexp.exec(msg.command); } if(match) { this.command = match[1]; //match 3 will only be existing on insert commands if(match[3]) { //msg.value is from native bindings this.rowCount = parseInt(match[3] || msg.value, 10); this.oid = parseInt(match[2], 10); } else { this.rowCount = parseInt(match[2], 10); } } }
...
//handle possible query error
if(err) {
return self.handleError(err);
}
var result = new NativeResult();
result.addCommandComplete(self.native.pq);
result.rows = rows;
//emit row events for each row in the result
if(self._emitRowEvents) {
rows.forEach(function(row) {
self.emit('row', row, result);
});
...
addFields = function (fieldDescriptions) { //clears field definitions //multiple query statements in 1 action can result in multiple sets //of rowDescriptions...eg: 'select NOW(); select 1::int;' //you need to reset the fields if(this.fields.length) { this.fields = []; this._parsers = []; } var ctorBody = ""; for(var i = 0; i < fieldDescriptions.length; i++) { var desc = fieldDescriptions[i]; this.fields.push(desc); var parser = this._getTypeParser(desc.dataTypeID, desc.format || 'text'); this._parsers.push(parser); //this is some craziness to compile the row result parsing //results in ~60% speedup on large query result sets ctorBody += inlineParser(desc.name, i); } if(!this.rowAsArray) { this.RowCtor = Function("parsers", "rowData", ctorBody); } }
n/a
addRow = function (row) { this.rows.push(row); }
n/a
parseRow = function (rowData) { return new this.RowCtor(this._parsers, rowData); }
n/a
function TypeOverrides(userTypes) { this._types = userTypes || types; this.text = {}; this.binary = {}; }
n/a
getOverrides = function (format) { switch(format) { case 'text': return this.text; case 'binary': return this.binary; default: return {}; } }
...
};
TypeOverrides.prototype.setTypeParser = function(oid, format, parseFn) {
if(typeof format == 'function') {
parseFn = format;
format = 'text';
}
this.getOverrides(format)[oid] = parseFn;
};
TypeOverrides.prototype.getTypeParser = function(oid, format) {
format = format || 'text';
return this.getOverrides(format)[oid] || this._types.getTypeParser(oid, format);
};
...
getTypeParser = function (oid, format) { format = format || 'text'; return this.getOverrides(format)[oid] || this._types.getTypeParser(oid, format); }
...
fallback_application_name: undefined,
parseInputDatesAsUTC: false
};
var pgTypes = require('pg-types');
// save default parsers
var parseBigInteger = pgTypes.getTypeParser(20, 'text');
var parseBigIntegerArray = pgTypes.getTypeParser(1016, 'text');
//parse int8 so you can get your count values as actual numbers
module.exports.__defineSetter__("parseInt8", function(val) {
pgTypes.setTypeParser(20, 'text', val ? pgTypes.getTypeParser(23, 'text') : parseBigInteger);
pgTypes.setTypeParser(1016, 'text', val ? pgTypes.getTypeParser(1007, 'text') : parseBigIntegerArray);
});
...
setTypeParser = function (oid, format, parseFn) { if(typeof format == 'function') { parseFn = format; format = 'text'; } this.getOverrides(format)[oid] = parseFn; }
...
var pgTypes = require('pg-types');
// save default parsers
var parseBigInteger = pgTypes.getTypeParser(20, 'text');
var parseBigIntegerArray = pgTypes.getTypeParser(1016, 'text');
//parse int8 so you can get your count values as actual numbers
module.exports.__defineSetter__("parseInt8", function(val) {
pgTypes.setTypeParser(20, 'text', val ? pgTypes.getTypeParser(23, 'text
') : parseBigInteger);
pgTypes.setTypeParser(1016, 'text', val ? pgTypes.getTypeParser(1007, 'text') : parseBigIntegerArray);
});
...
function getTypeParser(oid, format) { format = format || 'text'; if (!typeParsers[format]) { return noParse; } return typeParsers[format][oid] || noParse; }
...
fallback_application_name: undefined,
parseInputDatesAsUTC: false
};
var pgTypes = require('pg-types');
// save default parsers
var parseBigInteger = pgTypes.getTypeParser(20, 'text');
var parseBigIntegerArray = pgTypes.getTypeParser(1016, 'text');
//parse int8 so you can get your count values as actual numbers
module.exports.__defineSetter__("parseInt8", function(val) {
pgTypes.setTypeParser(20, 'text', val ? pgTypes.getTypeParser(23, 'text') : parseBigInteger);
pgTypes.setTypeParser(1016, 'text', val ? pgTypes.getTypeParser(1007, 'text') : parseBigIntegerArray);
});
...
function setTypeParser(oid, format, parseFn) { if(typeof format == 'function') { parseFn = format; format = 'text'; } typeParsers[format][oid] = parseFn; }
...
var pgTypes = require('pg-types');
// save default parsers
var parseBigInteger = pgTypes.getTypeParser(20, 'text');
var parseBigIntegerArray = pgTypes.getTypeParser(1016, 'text');
//parse int8 so you can get your count values as actual numbers
module.exports.__defineSetter__("parseInt8", function(val) {
pgTypes.setTypeParser(20, 'text', val ? pgTypes.getTypeParser(23, 'text
') : parseBigInteger);
pgTypes.setTypeParser(1016, 'text', val ? pgTypes.getTypeParser(1007, 'text') : parseBigIntegerArray);
});
...
create = function (source, transform) { return { parse: function() { return array.parse(source, transform); } }; }
n/a
function normalizeQueryConfig(config, values, callback) { //can take in strings or config objects config = (typeof(config) == 'string') ? { text: config } : config; if(values) { if(typeof values === 'function') { config.callback = values; } else { config.values = values; } } if(callback) { config.callback = callback; } return config; }
n/a
function prepareValueWrapper(value) { //this ensures that extra arguments do not get passed into prepareValue //by accident, eg: from calling values.map(utils.prepareValue) return prepareValue(value); }
n/a