/**
 * Engine factory: binds one set of global options to a Db constructor and
 * returns the public classes of the engine.
 *
 * @param {Object} [opts] global options. When `opts.nativeObjectID` is truthy
 *   the ObjectID class is taken from "bson" (falling back to "mongodb") and
 *   its `valueOf` is replaced so that it yields the string form of the id;
 *   otherwise the bundled "./ObjectId" implementation is used.
 * @returns {{Db: Function, Collection: *, Code: *, Binary: *, ObjectID: *}}
 */
tingodb = function (opts) {
	opts = opts || {};
	var BaseDb = require('./tdb.js');
	var ObjectID;
	if (opts.nativeObjectID) {
		ObjectID = require("bson").ObjectID || require("mongodb").ObjectID;
		ObjectID.prototype.valueOf = function () {
			return this.toString();
		};
	} else {
		ObjectID = require("./ObjectId");
	}

	// Db subclass that forwards the global options to the base constructor
	// and exposes the type helpers on every instance.
	function gdb (path, optsLocal) {
		gdb.superclass.constructor.call(this, path, optsLocal, opts);
		this.ObjectID = ObjectID;
		this.Code = require('./tcode.js').Code;
		this.Binary = require('./tbinary.js').Binary;
		this.Finder = require("./finder")(this);
	}

	// classic surrogate-constructor inheritance: gdb.prototype chains to
	// BaseDb.prototype without running the BaseDb constructor
	function Surrogate() { }
	Surrogate.prototype = BaseDb.prototype;
	gdb.prototype = new Surrogate();
	gdb.prototype.constructor = gdb;
	gdb.superclass = BaseDb.prototype;

	var api = {
		Db: gdb,
		Collection: require('./tcoll.js'),
		Code: require('./tcode.js').Code,
		Binary: require('./tbinary.js').Binary,
		ObjectID: ObjectID
	};
	return api;
};
/**
 * Object id backed by a plain number.
 *
 * Every instance first receives a fresh, process-unique negative id; the
 * minus sign tells the db layer that the value is temporary and still has to
 * be replaced. `_init(val)` may then overwrite it with a concrete value.
 *
 * @param {*} [val] initial value, interpreted by `_init`
 */
function ObjectID(val) {
	inproc_id -= 1;
	this.id = inproc_id;
	this._init(val);
	// only temporary (negative) ids are tracked, so that all instances
	// sharing one temporary id can be updated together at a later time
	if (!(this.id < 0))
		return;
	var siblings = tempset[this.id];
	if (siblings)
		siblings.push(this);
	else
		tempset[this.id] = [this];
}
// Placeholder constructor: takes no arguments and has an empty body.
function tdb() {}n/a
/**
 * Fixed-size slot cache.
 *
 * @param {Object} tdb owning database (stored as `_tdb`)
 * @param {number} [size] number of slots, 1000 when omitted or falsy
 */
function tcache(tdb, size) {
	var capacity = size || 1000;
	var slots = [];
	// pre-size the slot array; clear() fills every slot afterwards
	slots.length = capacity;
	this._tdb = tdb;
	this.size = capacity;
	this._cache = slots;
	this.clear();
}
/**
 * Collection object. All state starts empty; the storage specific
 * implementations of init/_put/_get are selected from the prototype
 * according to the storage type of the owning database.
 *
 * @param {Object} tdb owning database; only `tdb._stype` is read here
 */
function tcoll(tdb) {
	var emptyMap = function () {
		return Object.create ? Object.create(null) : {};
	};
	this._tdb = null;
	this._name = null;
	this._store = emptyMap();
	this._fd = null;
	this._fsize = null;
	this._id = 1;
	this._wq = new wqueue();
	this._tq = null;
	this._idx = emptyMap();
	this._cache = null;
	// countdown used by the file storage for its occasional file size check
	this._check1 = Math.random()*100+1;
	// native mongo db compatibility attrs
	this.collectionName = null;
	var inMemory = tdb._stype=="mem";
	this.init = inMemory ? this.initM : this.initFS;
	this._put = inMemory ? this._putM : this._putFS;
	this._get = inMemory ? this._getM : this._getFS;
}
/**
 * Cursor over a collection.
 *
 * The constructor stores the query and parses the projection (`fields`):
 * every entry must be 0 or 1 and the two must not be mixed, `_id: 0` is
 * tracked separately, and keys prefixed with "_tiar." are collected into
 * `_arFields` (prefix stripped) instead of `_fields`. The first projection
 * problem found is remembered in `_err`.
 *
 * @param {Object} tcoll  collection the cursor reads from
 * @param {Object} query  query document
 * @param {Object} fields projection specification
 * @param {Object} opts   options; `hint` and `timeout` are read here
 */
function tcursor(tcoll, query, fields, opts) {
	var self = this;
	var emptyMap = function () {
		return Object.create ? Object.create(null) : {};
	};
	// cursor states
	this.INIT = 0;
	this.OPEN = 1;
	this.CLOSED = 2;
	this.GET_MORE = 3;
	this._query = query;
	this._c = tcoll;
	this._i = 0;
	this._skip = 0;
	this._limit = null;
	this._count = null;
	this._items = null;
	this._sort = null;
	this._hint = opts.hint;
	this._arFields = emptyMap();
	this._fieldsType = null;
	this._fieldsExcludeId = false;
	this._fields = emptyMap();
	this.timeout = _.isUndefined(opts.timeout)?true:opts.timeout;
	_.each(fields, function (flag, name) {
		// a bare string under a falsy key stands for "include that field"
		if (!name && _.isString(flag)) {
			name = flag;
			flag = 1;
		}
		if (!(flag == 0 || flag == 1)) {
			if (!self._err)
				self._err = new Error("Unsupported projection option: "+JSON.stringify(flag));
			return;
		}
		// _id treated specially
		if (name=="_id" && flag==0) {
			self._fieldsExcludeId = true;
			return;
		}
		// the first regular entry decides between include and exclude mode
		if (self._fieldsType==null)
			self._fieldsType = flag;
		if (self._fieldsType!=flag) {
			if (!self._err)
				self._err = new Error("Mixed set of projection options (0,1) is not valid");
			return;
		}
		if (name.indexOf("_tiar.")==0)
			self._arFields[name.substr(6)]=1;
		else
			self._fields[name]=flag;
	});
	// _id treated specially
	var excludeMode = self._fieldsType===0 || self._fieldsType===null;
	if (excludeMode && self._fieldsExcludeId) {
		self._fieldsType = 0;
		self._fields['_id']=0;
	}
}
/**
 * Database handle bound to a path.
 *
 * @param {string} path_ database location; resolved to an absolute path and,
 *   for the memory storage, used as key of the shared in-memory store
 * @param {Object} opts  per-database options; `opts.name` overrides the name
 *   that is otherwise derived from the last segment of `path_`
 * @param {Object} gopts engine-wide options; a truthy `gopts.memStore`
 *   selects the "mem" storage type, anything else selects "fs"
 */
function tdb(path_, opts, gopts) {
	this._gopts = gopts;
	this._path = path.resolve(path_);
	this._cols = Object.create ? Object.create(null) : {};
	this._name = opts.name || path.basename(path_);
	this._stype = gopts.memStore?"mem":"fs";
	if (this._stype=="mem") {
		// Reuse the store already registered for this path. The fallback must
		// be parenthesised: `a || b ? c : d` parses as `(a || b) ? c : d`,
		// which replaced an existing store with a fresh object on every call.
		this._mstore = mstore[path_] || (Object.create ? Object.create(null) : {});
		mstore[path_] = this._mstore;
	}
	// mongodb compat variables
	this.openCalled = false;
}
/**
 * Index over one field of a collection, backed by a B+ tree.
 *
 * @param {Array}  key      list of [field, order] pairs; the first pair is
 *   indexed here, any further pairs are kept in `_sub`
 * @param {Object} tcoll    owning collection (its `_tdb.Finder.field` builds
 *   the value accessor)
 * @param {Object} [options] `unique` and `_tiarr` are read
 * @param {string} [name]   index name, defaults to `key + '_'`
 */
function tindex(key, tcoll, options, name) {
	var settings = options || {};
	this.options = settings;
	this.name = name || key+'_';
	this._unique = settings.unique || false;
	this._c = tcoll;
	this._nuls = Object.create ? Object.create(null) : {};
	this._array = settings._tiarr || false;
	var head = key[0];
	this.key = head[0];
	this.order = head[1];
	if (key.length > 1)
		this._sub = key.slice(1);
	this._bp = BPlusTree.create({ sort: this.order, order: 100 });
	// compile an accessor for the indexed field; the array aware variant is
	// used when the index was created with _tiarr
	var accessor = new tcoll._tdb.Finder.field(this.key);
	var expression = this._array ? accessor.native3() : accessor.native();
	this._get = new Function("obj", "return " + expression);
}
/**
 * Readable stream wrapper around a cursor.
 *
 * May be called with or without `new`. `_init()` is deferred to the next
 * tick so that the caller has time to hook up event listeners.
 *
 * @param {Object} cursor    cursor to read from
 * @param {Object} [options] stream options, stored as `this.options`
 */
function CursorStream(cursor, options) {
	// forward options as well; they used to be dropped on this path
	if(!(this instanceof CursorStream)) return new CursorStream(cursor, options);
	options = options ? options : {};
	Stream.call(this);
	this.readable = true;
	this.paused = false;
	this._cursor = cursor;
	this._destroyed = null;
	this.options = options;
	// give time to hook up events
	var self = this;
	process.nextTick(function() {
		self._init();
	});
}
/**
 * Work queue state holder.
 *
 * @param {number}   [limit] stored as `limit`, 100 when omitted or falsy
 * @param {Function} [first] function taking a callback; defaults to one that
 *   calls the callback right away
 */
function wqueue(limit, first) {
	var passThrough = function (cb) {cb()};
	this.limit = limit || 100;
	this._rc = 0;
	this._q = [];
	this._blocked = false;
	this._stoped = false;
	this._tc = -1;
	this.first = first || passThrough;
}
/**
 * Object id backed by a plain number.
 *
 * Every instance first receives a fresh, process-unique negative id; the
 * minus sign tells the db layer that the value is temporary and still has to
 * be replaced. `_init(val)` may then overwrite it with a concrete value.
 *
 * @param {*} [val] initial value, interpreted by `_init`
 */
function ObjectID(val) {
	inproc_id -= 1;
	this.id = inproc_id;
	this._init(val);
	// only temporary (negative) ids are tracked, so that all instances
	// sharing one temporary id can be updated together at a later time
	if (!(this.id < 0))
		return;
	var siblings = tempset[this.id];
	if (siblings)
		siblings.push(this);
	else
		tempset[this.id] = [this];
}
createFromHexString = function (str) {
// Factory: returns a new ObjectID constructed from `str`.
// The string is passed to the constructor unchanged.
return new ObjectID(str);
}n/a
createFromString = function (str) {
// Factory: returns a new ObjectID constructed from `str`.
// Same body as createFromHexString.
return new ObjectID(str);
}n/a
/**
 * Replaces the id of this instance with the one described by `val`.
 *
 * - number: used as is
 * - ObjectID: its id is copied
 * - string: must be an optionally negative run of decimal digits, otherwise
 *   the id becomes NaN
 * - anything else: the current id is left untouched
 *
 * @param {*} val value to take the id from
 * @throws {Error} when `val` is truthy and the resulting id is NaN
 */
_init = function (val) {
	if (_.isNumber(val))
		this.id = val;
	else if (val instanceof ObjectID)
		this.id = val.id;
	else if (_.isString(val)) {
		// explicit radix: ids are decimal, never let the engine guess
		this.id = /^-?\d+$/.test(val) ? parseInt(val, 10) : NaN;
	}
	if (val && isNaN(this.id))
		throw new Error("ObjectId should be ObjectId (whatever it is designed to be) "+val);
};
var inproc_id = -1;
var tempset = {};
function ObjectID(val) {
// every new instance will have temporary inproc unique value
// minus sign will let know to db layer that value is temporary
// and need to be replaced
this.id = --inproc_id;
this._init(val);
// we need to keep track all temporary instances with goal
// to update them at later tima
if (this.id<0) {
if (!tempset[this.id])
tempset[this.id]=[this];
else
tempset[this.id].push(this);
/**
 * Turns a temporary id into a permanent one.
 *
 * When this instance still carries a temporary (negative) id, every instance
 * registered in `tempset` under that id receives `v`, and the registration
 * is removed. Instances with a non-negative id are left alone.
 *
 * @param {number} v the permanent id
 */
_persist = function (v) {
	var temporaryId = this.id;
	if (!(temporaryId < 0))
		return;
	_.each(tempset[temporaryId], function (instance) {
		instance.id = v;
	});
	delete tempset[temporaryId];
};
throw new Error("key "+k+" must not start with '$'");
if (k.indexOf('.')!=-1)
throw new Error("key "+k+" must not contain '.'");
}
if (_.isObject(v)) {
if (v instanceof self._tdb.ObjectID) {
if (v.id<0) {
v._persist(++self._id)
}
}
else
self._ensureIds(v)
}
})
return obj;
/**
 * Compares this id with another one.
 *
 * @param {*} val an ObjectID, or any value accepted by the ObjectID
 *   constructor (it is converted through a throwaway instance)
 * @returns {boolean} true when both ids are loosely equal
 */
equals = function (val) {
	var other = val instanceof ObjectID ? val : new ObjectID(val);
	return this.id==other.id;
};
inspect = function () {
// Returns the result of calling toString() on the stored id.
return this.id.toString();
}n/a
/**
 * Renders the id as text, left-padded with zeros to 24 characters.
 * A textual form that is already 24 characters or longer is returned
 * without padding.
 *
 * @returns {string}
 */
toHexString = function () {
	var digits = this.id.toString();
	var padding = "000000000000000000000000";
	var missing = padding.length - digits.length;
	// substring clamps a negative end index to 0, i.e. no padding at all
	return padding.substring(0, missing)+digits;
};
toJSON = function () {
// Returns the stored id itself, without any conversion.
return this.id;
}...
tcoll.prototype._wrapTypes = function(obj) {
var self = this;
_.each(obj, function (v,k) {
if (_.isDate(v))
obj[k] = {$wrap:"$date",v:v.valueOf(),h:v}
else if (v instanceof self._tdb.ObjectID)
obj[k] = {$wrap:"$oid",v:v.toJSON()}
else if (v instanceof self._tdb.Binary)
obj[k] = {$wrap: "$bin", v: v.toJSON()};
else if (_.isObject(v))
self._wrapTypes(v)
})
return obj;
...toString = function () {
// Returns the result of calling toString() on the stored id.
return this.id.toString();
}...
// it's ok also to not provide id, then it will be generated
rooms.insert({name:"room_"+i,_idHome:homeId}, function (err, room) {
console.log(room[0]);
i--;
if (i==0) {
// now lets assume we serving request like
// /rooms?homeid=_some_string_
var query = "/rooms?homeid="+homeId.toString();
// dirty code to get simulated GET variable
var getId = query.match("homeid=(.*)")[1];
console.log(query, getId)
// typical code to get id from external world
// and use it for queries
rooms.find({_idHome:new engine.ObjectID(getId)})
.count(function (err, count) {
...valueOf = function () {
// Primitive value of the object: the stored id, returned unchanged.
return this.id;
}...
var s = "(";
var ps = "obj";
for (var i=0;i<path.length;i++) {
ps+="['"+path[i]+"']";
if (i!=path.length-1)
s+=ps+ " && ";
else
s+= "("+ps+" && "+ps+".valueOf()"+") )
x22;;
}
return s;
}
this.native3 = function () {
function steep(ps, path, i) {
ps+="['"+path[i]+"']";
if (path.length==i)
// Placeholder constructor: takes no arguments and has an empty body.
...function tdb() {}n/a
init = function (path, options, cb) {
// Stores `path` as this._path and reports success by calling cb(null).
// `options` is accepted but not used in this body.
this._path = path;
cb(null);
}...
if (c) {
if (opts.strict && create) safe.back(cb, new Error("Collection " + cname + " already exists. Currently
in safe mode."));
else safe.back(cb, null, c);
return c;
}
c = new tcoll(this);
self._cols[cname] = c;
c.init(this, cname, opts, create, function (err) {
if (err) {
delete self._cols[cname];
cb(err);
} else
cb(null, c);
});
return c;
/**
 * Binary value holder.
 *
 * Call forms (with or without `new`):
 *  - Binary(buffer[, subType]): wraps the given data; strings are converted,
 *    anything else is stored as is, and `position` is set to `buffer.length`
 *  - Binary(subType): a number as first argument is the sub type; an empty
 *    buffer of Binary.BUFFER_SIZE is allocated
 *  - Binary(): default sub type with an empty buffer of Binary.BUFFER_SIZE
 *
 * @param {*} [buffer]       data to wrap, or the sub type when a number
 * @param {number} [subType] sub type, BSON_BINARY_SUBTYPE_DEFAULT when omitted
 * @throws {Error} for a string when neither Buffer nor Uint8Array exists
 */
function Binary(buffer, subType) {
	if(!(this instanceof Binary)) return new Binary(buffer, subType);
	// A primitive number is never `instanceof Number`, so test with typeof
	// as well; otherwise Binary(4) would store the number 4 as the buffer.
	var subTypeOnly = typeof buffer === 'number' || buffer instanceof Number;
	this._bsontype = 'Binary';
	if(subTypeOnly) {
		this.sub_type = buffer;
		this.position = 0;
	} else {
		this.sub_type = subType == null ? BSON_BINARY_SUBTYPE_DEFAULT : subType;
		this.position = 0;
	}
	if(buffer != null && !subTypeOnly) {
		// Only accept Buffer, Uint8Array or Arrays
		if(_.isString(buffer)) {
			// Different ways of writing the length of the string for the different types
			if(typeof Buffer !== 'undefined') {
				this.buffer = new Buffer(buffer);
			} else if(typeof Uint8Array !== 'undefined' || _.isArray(buffer)) {
				this.buffer = writeStringToArray(buffer);
			} else {
				throw new Error("only String, Buffer, Uint8Array or Array accepted");
			}
		} else {
			this.buffer = buffer;
		}
		// position is taken from the argument as it was passed in
		this.position = buffer.length;
	} else {
		if(typeof Buffer !== 'undefined') {
			this.buffer = new Buffer(Binary.BUFFER_SIZE);
		} else if(typeof Uint8Array !== 'undefined'){
			this.buffer = new Uint8Array(new ArrayBuffer(Binary.BUFFER_SIZE));
		} else {
			this.buffer = new Array(Binary.BUFFER_SIZE);
		}
		// Set position to start of buffer
		this.position = 0;
	}
}
switch (v.$wrap) {
case "$date": obj[k] = new Date(v.v); break;
case "$oid":
var oid = new self._tdb.ObjectID(v.v);
obj[k]=oid;
break;
case "$bin":
var bin = new self._tdb.Binary(new Buffer(v.v, 'base64'));
obj[k] = bin;
break;
default: self._unwrapTypes(v);
}
}
})
return obj;
/**
 * Fixed-size slot cache.
 *
 * @param {Object} tdb owning database (stored as `_tdb`)
 * @param {number} [size] number of slots, 1000 when omitted or falsy
 */
function tcache(tdb, size) {
	var capacity = size || 1000;
	var slots = [];
	// pre-size the slot array; clear() fills every slot afterwards
	slots.length = capacity;
	this._tdb = tdb;
	this.size = capacity;
	this._cache = slots;
	this.clear();
}
/**
 * Resets the cache: every slot is replaced by a fresh empty marker whose
 * key is null.
 */
clear = function () {
	var slots = this._cache;
	for (var slot = 0; slot < slots.length; slot += 1)
		slots[slot] = {k:null};
};
// var BPlusTree = require('./bplustree');
/**
 * Fixed-size slot cache.
 *
 * @param {Object} tdb owning database (stored as `_tdb`)
 * @param {number} [size] number of slots, 1000 when omitted or falsy
 */
function tcache (tdb, size) {
	var capacity = size || 1000;
	var slots = [];
	// pre-size the slot array; clear() fills every slot afterwards
	slots.length = capacity;
	this._tdb = tdb;
	this.size = capacity;
	this._cache = slots;
	this.clear();
}
/**
 * Resets the cache: every slot is replaced by a fresh empty marker whose
 * key is null.
 */
tcache.prototype.clear = function () {
	var slots = this._cache;
	for (var slot = 0; slot < slots.length; slot += 1)
		slots[slot] = {k:null};
};
/**
 * Looks up the value cached under key `k`.
 *
 * @param {number}  k      cache key; its slot is `k % size`
 * @param {boolean} unsafe when truthy the cached object itself is returned,
 *   otherwise a deep clone of it
 * @returns {*} the cached value, or null when the slot holds another key
 */
get = function (k, unsafe) {
	var slot = this._cache[k%this.size];
	if (slot.k != k)
		return null;
	if (unsafe)
		return slot.v;
	return this._tdb._cloneDeep(slot.v);
};
/**
 * Memory storage read: delivers the record stored at 1-based position `pos`.
 *
 * @param {number}   pos    1-based position inside the in-memory store
 * @param {boolean}  unsafe when truthy the stored object itself is delivered,
 *   otherwise a deep clone
 * @param {Function} cb     receives (null, record) through safe.back
 */
tcoll.prototype._getM = function (pos, unsafe, cb) {
	var record = this._mstore[pos-1];
	if (!unsafe)
		record = this._tdb._cloneDeep(record);
	safe.back(cb,null,record);
};
tcoll.prototype._getFS = function (pos, unsafe, cb) {
var self = this;
var cached = self._cache.get(pos,unsafe);
if (cached)
return safe.back(cb,null,cached);
var b1 = new Buffer(45);
fs.read(self._fd, b1, 0, 45, pos, safe.trap_sure(cb, function (bytes, data) {
var h1 = JSON.parse(data.toString());
h1.o = parseInt(h1.o,10);
h1.k = parseInt(h1.k,10);
/**
 * Caches a deep clone of `v` under key `k`, replacing whatever occupied the
 * slot `k % size` before.
 *
 * @param {number} k cache key
 * @param {*}      v value to cache
 */
set = function (k, v) {
	var entry = {k:k,v:this._tdb._cloneDeep(v)};
	this._cache[k%this.size] = entry;
};
}
this._tq = new wqueue(100, function (cb) {
// update indexes
safe.eachOfSeries(self._store, function (rec, k, cb) {
self._get(rec.pos, false, safe.sure(cb, function (obj) {
var id = simplifyKey(obj._id);
_.each(self._idx,function(v, k) {
v.set(obj, id);
});
cb();
}));
}, cb);
});
self.ensureIndex({_id: 1}, {name: '_id_', unique: true}, cb);
};
/**
 * Removes key `k` from the cache. The slot `k % size` is only emptied when
 * it currently holds exactly that key.
 *
 * @param {number} k cache key
 */
unset = function (k) {
	var slot = k%this.size;
	if (this._cache[slot].k==k)
		this._cache[slot]={k:null};
};
fs.write(self._fd, buf, 0, buf.length, self._fsize, safe.sure(cb, function (written) {
if (remove)
delete self._store[key._id];
else
self._store[key._id] = { pos: self._fsize, sum: key._s };
if (remove || sobj.length > self._cmaxobj)
self._cache.unset(self._fsize)
else
self._cache.set(self._fsize,item);
self._fsize+=written;
// randomly check for non exclusive file usage
// which is growth of file that we are nor aware
// randomly to avoid overhead
if (self._check1==0) {
/**
 * Code value holder. May be called with or without `new`.
 *
 * @param {*}      code    the code, stored as given
 * @param {Object} [scope] evaluation scope, an empty object when null or
 *   undefined
 */
function Code(code, scope) {
	if(!(this instanceof Code)) return new Code(code, scope);
	var boundScope = scope;
	if (boundScope == null)
		boundScope = {};
	this._bsontype = 'Code';
	this.code = code;
	this.scope = boundScope;
}
/**
 * Collection object. All state starts empty; the storage specific
 * implementations of init/_put/_get are selected from the prototype
 * according to the storage type of the owning database.
 *
 * @param {Object} tdb owning database; only `tdb._stype` is read here
 */
function tcoll(tdb) {
	var emptyMap = function () {
		return Object.create ? Object.create(null) : {};
	};
	this._tdb = null;
	this._name = null;
	this._store = emptyMap();
	this._fd = null;
	this._fsize = null;
	this._id = 1;
	this._wq = new wqueue();
	this._tq = null;
	this._idx = emptyMap();
	this._cache = null;
	// countdown used by the file storage for its occasional file size check
	this._check1 = Math.random()*100+1;
	// native mongo db compatibility attrs
	this.collectionName = null;
	var inMemory = tdb._stype=="mem";
	this.init = inMemory ? this.initM : this.initFS;
	this._put = inMemory ? this._putM : this._putFS;
	this._get = inMemory ? this._getM : this._getFS;
}
__find = function (query, fields, skip, limit, sort, hint, arFields, cb) {
var self = this;
var range;
// find sort index
var si = this._bestSortIndex(sort);
// for non empty query check indexes that we can use
var qt = self._tdb.Finder.matcher(query);
var pi = [];
if (_.size(qt)>0) {
_.each(self._idx,function (i) {
var f = _.map(i.fields(), 0);
var e = _.takeWhile(f, function (k) {
return qt._ex(k) == 1 && (!hint || hint[k]);
});
if (e.length > 0) pi.push({ i: i, k: e, e: f.length > e.length });
});
}
// if possible indexes found split the query and process
// indexes separately
if (!_.isEmpty(pi)) {
// choose the most appropriate indexes
reduceIndexSet(pi);
// split query
var io = {};
_.each(pi,function (v) {
_.each(v.k,function (k) {
if (!io[k]) io[k] = qt.split(k);
});
});
// process indexes
var p = _.map(pi,function (st) {
// this action applies to all indexes
var r = io[st.k[0]]._index(st.i);
// process subfields of compound index
_.each(st.k.slice(1),function (k) {
var v = io[k];
r = _(r).map(function (si) { return v._index(si); }).flatten().value();
});
// expand subindexes to plain ids
if (st.e) r = _(r).map(function (si) { return si.all(); }).flatten().value();
// store result of index search
return r;
});
if (p.length == 1) {
p = p[0];
// optimization for the case when search and sorting indexes are the same
if (si && pi[0].i === si) {
var sif = si.fields();
if (_.every(sort, function (v, i) { return sif[i][1] == v[1]; })) {
// sort order exactly matches index order,
// so the result is already sorted
sort = null;
} else if (_.every(sort, function (v, i) { return sif[i][1] == -v[1]; })) {
// sort order is exactly opposite to index order,
// so the result is sorted, but in reverse direction
p.reverse();
sort = null;
}
}
} else {
// TODO: use sort index as intersect base to speedup sorting
p = tutils.intersectIndexes(p);
}
// nowe we have ids, need to convert them to positions
range = _.map(p,function (_id) {
return self._store[_id].pos;
})
} else {
if (si) {
range = _.map(si.all(_.map(sort, 1)), function (_id) {
return self._store[_id].pos;
})
//if (order==-1)
// range.reverse();
sort = null;
} else
range = _(self._store).values().map('pos').value();
}
if (sort && si) {
var ps = {};
_.each(range, function (pos) {
ps[pos] = true;
});
range = [];
_.each(si.all(_.map(sort, 1)), function (_id) {
var pos = self._store[_id].pos;
if (_.has(ps, pos))
range.push(pos);
});
//if (order == -1)
// range.reverse();
sort = null;
}
// no sort, no query then return right away
if (sort==null && (_.size(qt)==0 || qt._args.length==0)) {
if (skip!=0 || limit!=null) {
var c = Math.min(range.length-skip,limit?limit:range.length-skip);
range = range.splice(skip,c)
}
return safe.back(cb,null,range);
}
// check if we can use simple match or array match function
var arrayMatch = false;
if (self._tdb._gopts.searchInArray)
arrayMatch = true;
else {
_.each(qt.fields(), function (v,k) {
if (_.get(arFields, k)) {
arrayMatch = true;
return false;
}
})
}
var matcher = new Function("obj", "return " + (arrayMatch ? qt.native3() : qt.native()));
// create sort index
if (sort) {
si = new tindex(sort,self);
}
// now simple non-index search
var res = [];
var found = 0;
safe.forEachSeries(range, function (pos, cb) {
if (sort==null && limit && res.length>=limit)
return safe.back(cb);
self._get(pos, true, safe.sure(cb, function (obj) {
if (matcher(obj)) {
if (sort!=null || found>=skip) {
if (sort==null)
res.push(pos);
else
si.set(obj,pos);
}
found++;
}
cb()
}))
}, safe.sure(cb, function () {
if (sort) {
res = si.all();
//if (order==-1) {
// res.reverse();
//}
if (skip!=0 || limit!=null) {
var c = Math.min(res.length-skip,limit?limit:res.length-skip);
res = res.splice(skip,c)
}
}
cb(null, ......
if (!_.isObject(doc))
throw new Error("document must be a valid JavaScript object");
var multi = opts.multi || false;
var updater = new Updater(doc, self._tdb);
var $doc = updater.hasAtomic()?null:doc;
this._tq.add(function (cb) {
self.__find(query, null, 0, multi ? null : 1, null, opts.hint, {}, safe.sure(cb, function
(res) {
if (res.length==0) {
if (opts.upsert) {
$doc = $doc || query;
$doc = self._tdb._cloneDeep($doc);
updater.update($doc,true);
if (_.isUndefined($doc._id))
$doc._id = new self._tdb.ObjectID();
/**
 * Picks the index that is best suited to deliver documents in `sort` order.
 *
 * An index registered under exactly the sort key wins outright. Otherwise
 * every index whose leading fields equal the sort fields is a candidate and
 * is scored with its total field count plus one point per sort field whose
 * direction differs from the index. The candidate with the LOWEST score is
 * returned; on a tie the index registered first wins.
 *
 * The previous implementation ordered the candidates with a comparator that
 * returned a boolean, which does not give Array.prototype.sort a usable
 * ordering. NOTE(review): "lowest score is best" is inferred from how the
 * score is built (fewer extra fields, fewer direction mismatches) - confirm.
 *
 * @param {Array} sort list of [field, order] pairs, or a falsy value
 * @returns {Object|null} the chosen index, or null when there is no sort or
 *   no index matches
 */
_bestSortIndex = function (sort) {
	// no sort
	if (!sort) return null;
	// exact match
	if (this._idx[sort]) return this._idx[sort];
	// scan the potential sort indexes, remembering the best one seen so far
	var best = null;
	_.each(this._idx,function (idx) {
		var fields = idx.fields();
		var match = _.takeWhile(fields, function (kv, i) {
			return i < sort.length ? kv[0] == sort[i][0] : false;
		});
		if (match.length != sort.length) return;
		var score = fields.length;
		_.each(sort,function (kv, i) {
			if (kv[1] != fields[i][1]) score += 1;
		});
		if (best === null || score < best.score)
			best = { value: idx, score: score };
	});
	return best === null ? null : best.value;
};
} while (hit);
}
tcoll.prototype.__find = function (query, fields, skip, limit, sort, hint, arFields, cb) {
var self = this;
var range;
// find sort index
var si = this._bestSortIndex(sort);
// for non empty query check indexes that we can use
var qt = self._tdb.Finder.matcher(query);
var pi = [];
if (_.size(qt)>0) {
_.each(self._idx,function (i) {
var f = _.map(i.fields(), 0);
var e = _.takeWhile(f, function (k) {
..._compact = function (cb) {
// Compacts the collection file: every record referenced by self._store is
// copied into "<filename>.compact", then that file replaces the original one
// and self._fd / self._fsize / self._store are switched to the new file.
// cb(err) is called once the swap is done or as soon as a step fails.
var self = this;
var filename = self._filename + '.compact';
fs.open(filename, 'w+', safe.sure(cb, function (fd) {
// scratch buffer for the 45 byte record header; shared by all get() calls
var b1 = new Buffer(45);
// Reads one complete record starting at `pos` in the current file and
// passes header and remainder to cb as a single buffer.
function get(pos, cb) {
fs.read(self._fd, b1, 0, 45, pos, safe.trap_sure(cb, function (bytes, data) {
// the header is JSON carrying the key length (k) and object length (o)
var h1 = JSON.parse(data.toString());
h1.o = parseInt(h1.o, 10);
h1.k = parseInt(h1.k, 10);
// remainder of the record: key and object plus 3 more bytes
// (presumably the newline separators - confirm against the writer)
var b2 = new Buffer(h1.k + h1.o + 3);
fs.read(self._fd, b2, 0, b2.length, pos + 45, safe.sure(cb, function (bytes, data) {
cb(null, Buffer.concat([ b1, b2 ]));
}));
}));
}
// write offset inside the compacted file
var wpos = 0;
// replacement for self._store, pointing at positions in the compacted file
var store = Object.create ? Object.create(null) : {};
safe.eachOfSeries(self._store, function (rec, k, cb) {
get(rec.pos, safe.sure(cb, function (data) {
fs.write(fd, data, 0, data.length, wpos, safe.sure(cb, function (written) {
// a short write is reported as lack of disk space
if (written != data.length) return cb(new Error('Insufficient disk space'));
store[k] = { pos: wpos, sum: rec.sum };
wpos += data.length;
cb();
}));
}));
}, function (err) {
if (err) {
// copying failed: close and delete the partial file, then report the
// original error (errors of close/unlink themselves are ignored)
fs.close(fd, function () {
fs.unlink(filename, function () {
cb(err);
});
});
return;
}
if (!!process.platform.match(/^win/)) {
// WINDOWS: unsafe because if something fail while renaming file it will not
// restore automatically
// sequence: close both descriptors, delete the original, rename the
// compacted file into place and reopen it in append mode
fs.close(self._fd, safe.sure(cb,function() {
fs.close(fd, safe.sure(cb,function() {
fs.unlink(self._filename, safe.sure(cb,function () {
fs.rename(filename, self._filename, safe.sure(cb, function () {
fs.open(self._filename, 'a+', safe.sure(cb, function (fd) {
self._fd = fd;
self._fsize = wpos;
self._store = store;
cb();
}));
}));
}));
}));
}));
} else {
// safe way
// rename over the original, then keep using the descriptor that was
// opened for the compacted file; the old descriptor is closed without
// waiting for the result
fs.rename(filename, self._filename, safe.sure(cb, function () {
fs.close(self._fd);
self._fd = fd;
self._fsize = wpos;
self._store = store;
cb();
}));
}
});
}));
}...
if (!found && err)
return cb(err); // nothing read and error, just rise it
safe.run(function (cb) {
var size = _.size(self._store);
// autocompact on certain ratio or err
if (deleted > size || err) {
self._compact(function (errCompact) {
if (errCompact && err)
cb(errCompact);
else {
if (errCompact) console.log(err);
cb();
}
});
/**
 * Walks `obj` recursively, validates the keys and assigns permanent ids.
 *
 * Keys must not start with '$' and must not contain '.'. Every ObjectID
 * that still carries a temporary (negative) id is persisted with the next
 * value of the collection counter `_id`.
 *
 * @param {Object} obj document (or nested value) to process in place
 * @returns {Object} the same `obj`
 * @throws {Error} for an invalid key
 */
_ensureIds = function (obj) {
	var coll = this;
	_.each(obj, function (value, key) {
		if (key.length >0) {
			if (key[0]=='$')
				throw new Error("key "+key+" must not start with '$'");
			if (key.indexOf('.')!=-1)
				throw new Error("key "+key+" must not contain '.'");
		}
		if (!_.isObject(value))
			return;
		if (!(value instanceof coll._tdb.ObjectID)) {
			// plain nested value: descend
			coll._ensureIds(value);
			return;
		}
		if (value.id<0)
			value._persist(++coll._id);
	});
	return obj;
};
if (_.isObject(v)) {
if (v instanceof self._tdb.ObjectID) {
if (v.id<0) {
v._persist(++self._id)
}
}
else
self._ensureIds(v)
}
})
return obj;
}
tcoll.prototype._unwrapTypes = function(obj) {
..._find = function (query, fields, skip, limit, sort_, hint, arFields, cb) {
// Queued variant of __find: the search is scheduled on the task queue
// (this._tq) and all arguments are forwarded to __find unchanged.
// The second argument of add() is `false` here, whereas the writing
// operations in this file pass `true`.
var self = this;
this._tq.add(function (cb) {
self.__find(query, fields, skip, limit, sort_, hint, arFields, cb);
}, false, cb);
}...
}
if (self._count !== null) {
process.nextTick(function () {
cb(null, self._count);
});
return;
}
self._c._find(self._query, {}, 0, null, null, self._hint, self._arFields, safe.sure(
cb, function (data) {
self._count = data.length;
cb(null, self._count);
}));
}
tcursor.prototype.setReadPreference = function (the, cb) {
var self = this;
..._getFS = function (pos, unsafe, cb) {
// File storage read: delivers the object of the record that starts at byte
// offset `pos` of the collection file, using the cache when possible.
// `unsafe` is only forwarded to the cache lookup; cb receives (err, obj).
var self = this;
var cached = self._cache.get(pos,unsafe);
if (cached)
return safe.back(cb,null,cached);
// first read: the 45 byte record header
var b1 = new Buffer(45);
fs.read(self._fd, b1, 0, 45, pos, safe.trap_sure(cb, function (bytes, data) {
// the header is JSON carrying the key length (k) and object length (o)
var h1 = JSON.parse(data.toString());
h1.o = parseInt(h1.o,10);
h1.k = parseInt(h1.k,10);
// second read: only the object part, which starts after the header, the
// key and 2 more bytes (presumably newline separators - confirm)
var b2 = new Buffer(h1.o);
fs.read(self._fd,b2,0,h1.o,pos+45+2+h1.k, safe.trap_sure(cb, function (bytes, data) {
var obj = self._unwrapTypes(JSON.parse(data.toString()));
// objects above the size limit _cmaxobj are not cached
if (bytes <= self._cmaxobj)
self._cache.set(pos, obj);
cb(null,obj);
}))
}))
}n/a
/**
 * Memory storage read: delivers the record stored at 1-based position `pos`.
 *
 * @param {number}   pos    1-based position inside the in-memory store
 * @param {boolean}  unsafe when truthy the stored object itself is delivered,
 *   otherwise a deep clone
 * @param {Function} cb     receives (null, record) through safe.back
 */
_getM = function (pos, unsafe, cb) {
	var record = this._mstore[pos-1];
	if (!unsafe)
		record = this._tdb._cloneDeep(record);
	safe.back(cb,null,record);
};
/**
 * File storage write: appends a record (or a deletion marker) for `item` to
 * the collection file and updates store, cache and indexes.
 *
 * Record layout as written here: a 45 byte JSON header holding the zero
 * padded key length (k), object length (o) and version, "\n", the JSON key,
 * "\n", the JSON object (empty for a removal), "\n".
 *
 * @param {Object}   item   document to store or remove; ids are ensured first
 * @param {boolean}  remove truthy to write a deletion marker
 * @param {Function} cb     receives an error, or null on completion
 */
_putFS = function (item, remove, cb) {
	var self = this;
	self._wq.add(function (cb) {
		var err = _.attempt(function () {
			item = self._ensureIds(item);
		});
		if (err) {
			err.errmsg = err.toString();
			return cb(err);
		}
		if (_.isUndefined(item._id))
			return cb(new Error("Invalid object key (_id)"));
		// serialise with wrapped special types, then restore them in place
		item = self._wrapTypes(item);
		var sobj = new Buffer(remove?"":JSON.stringify(item));
		item = self._unwrapTypes(item);
		var key = {_id:simplifyKey(item._id),_uid:self._id,_dt:(new Date()).valueOf()};
		if (remove) key._a = "del";
		else {
			// checksum of the serialised object, used to skip identical rewrites
			var hash = crypto.createHash('md5');
			hash.update(sobj, 'utf8');
			key._s = hash.digest('hex');
		}
		var skey = new Buffer(JSON.stringify(key));
		var zeros = "0000000000";
		var lobj = sobj.length.toString();
		var lkey = skey.length.toString();
		lobj = zeros.substr(0,zeros.length - lobj.length)+lobj;
		lkey = zeros.substr(0,zeros.length - lkey.length)+lkey;
		var h1={k:lkey,o:lobj,v:"001"};
		var buf = new Buffer(JSON.stringify(h1)+"\n"+skey+"\n"+sobj+"\n");
		// check index update
		if (item && !remove) {
			err = _.attempt(function () {
				_.each(self._idx,function(v,k) {
					v.set(item,key._id,true);
				})
			});
			if (err) {
				err.errmsg = err.toString();
				return cb(err);
			}
		}
		safe.run(function (cb) {
			var rec = self._store[key._id];
			// unchanged content: nothing to write
			if (rec && rec.sum == key._s) return safe.back(cb);
			fs.write(self._fd, buf, 0, buf.length, self._fsize, safe.sure(cb, function (written) {
				if (remove)
					delete self._store[key._id];
				else
					self._store[key._id] = { pos: self._fsize, sum: key._s };
				if (remove || sobj.length > self._cmaxobj)
					self._cache.unset(self._fsize)
				else
					self._cache.set(self._fsize,item);
				self._fsize+=written;
				// randomly check for non exclusive file usage
				// which is growth of file that we are nor aware
				// randomly to avoid overhead
				if (self._check1==0) {
					// `this` is not the collection inside this callback, so the
					// countdown has to be re-armed through `self`
					// NOTE(review): the seed is not an integer, so counting down
					// by one may never land exactly on 0 - confirm intent
					self._check1 = Math.random()*100+1;
					fs.fstat(self._fd, safe.sure(cb, function (stat) {
						if (self._fsize!=stat.size)
							cb(new Error("File size mismatch. Are you use db/collection exclusively?"))
						else
							cb()
					}))
				} else {
					self._check1--;
					cb();
				}
			}));
		},
		// NOTE(review): this handler takes no arguments, so an error handed to
		// the step callback above appears to be dropped - confirm against the
		// semantics of safe.run
		function () {
			// update index
			_.each(self._idx,function(v,k) {
				if (!remove)
					v.set(item,key._id);
				else
					v.del(item,key._id);
			})
			cb(null);
		});
	}, true, cb);
};
_putM = function (item_, remove, cb) {
// Memory storage write: stores (or removes) a deep clone of `item_` in the
// in-memory store and updates the indexes. cb receives an error, or null on
// completion. The work is scheduled on the write queue (self._wq).
var item = this._tdb._cloneDeep(item_);
var self = this;
self._wq.add(function (cb) {
var err = _.attempt(function () {
item = self._ensureIds(item);
});
if (err) {
err.errmsg = err.toString();
return cb(err);
}
if (_.isUndefined(item._id))
return cb(new Error("Invalid object key (_id)"));
var key = {_id:simplifyKey(item._id)};
// check index update
// (set() is called with a third argument `true` here and without it in
// the real update below - presumably a validation-only pass, confirm)
if (item && !remove) {
err = _.attempt(function () {
_.each(self._idx,function(v,k) {
v.set(item,key._id,true);
});
});
if (err) {
err.errmsg = err.toString();
return cb(err);
}
}
if (remove) {
// positions are 1-based; the slot is blanked, not spliced, so the
// positions of all other records stay valid
self._mstore[self._store[key._id].pos-1]=null;
delete self._store[key._id];
}
else {
if (self._store[key._id]) {
// known id: overwrite in place
self._mstore[self._store[key._id].pos-1] = item;
} else {
// new id: append and remember its 1-based position
self._mstore.push(item);
self._store[key._id] = {pos: self._mstore.length};
}
}
// update index
_.each(self._idx,function(v,k) {
if (!remove)
v.set(item,key._id);
else
v.del(item,key._id);
})
cb(null);
}, true, cb);
}n/a
/**
 * Rebuilds all indexes from scratch: every index is cleared, then each
 * stored record is loaded one after another and fed to every index.
 *
 * @param {Function} cb called when all records are processed or with the
 *   first error
 */
_refreshIndexes = function (cb) {
	var coll = this;
	var indexes = coll._idx;
	_.each(indexes, function (index, name) {
		index.clear();
	});
	safe.eachOfSeries(coll._store, function (rec, storeKey, next) {
		var feed = function (doc) {
			var docId = simplifyKey(doc._id);
			_.each(indexes, function (index, name) {
				index.set(doc, docId);
			});
			next();
		};
		coll._get(rec.pos, false, safe.sure(next, feed));
	}, cb);
};
else {
if (errCompact) console.log(err);
cb();
}
});
} else cb();
}, function () {
self._refreshIndexes(cb);
});
});
});
self.ensureIndex({_id: 1}, {name: '_id_', unique: true}, cb);
};
tcoll.prototype.compactCollection = function (cb) {
..._stop = function (cb) {
// Stops the collection: scheduled on the task queue, marks the queue as
// stopped and closes the file descriptor when one is open.
// cb receives (null, true) when a descriptor was closed and (null, false)
// when there was none.
var self = this;
self._tq.add(function (cb) {
// this will prevent any tasks processed on this instance
self._tq._stoped = true;
if (self._fd) {
fs.close(self._fd,safe.sure(cb, function () {
cb(null,true);
}));
} else
cb(null,false);
},true,cb);
}...
/**
 * Closes the database: stops every known collection, forgets them and marks
 * the database as not opened.
 *
 * Can be called as close(cb), close(forceClose, cb) or without a callback.
 *
 * @param {boolean|Function} [forceClose] not used by this implementation; a
 *   function in this position is taken as the callback
 * @param {Function} [cb] receives (err) on failure or (null, db) on success
 */
tdb.prototype.close = function (forceClose, cb) {
	var self = this;
	// only a function may take the callback role, so that close(true) does
	// not end up calling a boolean
	if (cb==null && typeof forceClose === 'function') cb = forceClose;
	if (typeof cb !== 'function') cb = function () {};
	// stop any further operations on current collections
	safe.eachOf(self._cols, function (c, k, cb) {
		c._stop(cb)
	}, safe.sure(cb, function () {
		// and clean list
		self._cols = Object.create ? Object.create(null) : {};
		// `this` is not the database inside this callback: use `self`
		self.openCalled = false;
		safe.back(cb,null,self);
	}))
};
/**
 * Reverses _wrapTypes in place: wrapper objects tagged through `$wrap` are
 * replaced by the value they stand for ("$date" -> Date, "$oid" -> ObjectID,
 * "$bin" -> Binary built from base64 data); every other object is searched
 * recursively.
 *
 * @param {Object} obj document (or nested value) to process in place
 * @returns {Object} the same `obj`
 */
_unwrapTypes = function (obj) {
	var coll = this;
	_.each(obj, function (value, key) {
		if (!_.isObject(value))
			return;
		var tag = value.$wrap;
		if (tag === "$date")
			obj[key] = new Date(value.v);
		else if (tag === "$oid")
			obj[key] = new coll._tdb.ObjectID(value.v);
		else if (tag === "$bin")
			obj[key] = new coll._tdb.Binary(new Buffer(value.v, 'base64'));
		else
			coll._unwrapTypes(value);
	});
	return obj;
};
var b1 = new Buffer(45);
fs.read(self._fd, b1, 0, 45, pos, safe.trap_sure(cb, function (bytes, data) {
var h1 = JSON.parse(data.toString());
h1.o = parseInt(h1.o,10);
h1.k = parseInt(h1.k,10);
var b2 = new Buffer(h1.o);
fs.read(self._fd,b2,0,h1.o,pos+45+2+h1.k, safe.trap_sure(cb, function (bytes, data) {
var obj = self._unwrapTypes(JSON.parse(data.toString()));
if (bytes <= self._cmaxobj)
self._cache.set(pos, obj);
cb(null,obj);
}))
}))
}
/**
 * Prepares a document for JSON serialisation in place: dates, object ids
 * and binaries are replaced by wrapper objects tagged through `$wrap`; every
 * other object is processed recursively.
 *
 * @param {Object} obj document (or nested value) to process in place
 * @returns {Object} the same `obj`
 */
_wrapTypes = function (obj) {
	var coll = this;
	_.each(obj, function (value, key) {
		if (_.isDate(value)) {
			// `h` keeps the Date itself next to its numeric value
			obj[key] = {$wrap:"$date",v:value.valueOf(),h:value};
			return;
		}
		if (value instanceof coll._tdb.ObjectID) {
			obj[key] = {$wrap:"$oid",v:value.toJSON()};
			return;
		}
		if (value instanceof coll._tdb.Binary) {
			obj[key] = {$wrap: "$bin", v: value.toJSON()};
			return;
		}
		if (_.isObject(value))
			coll._wrapTypes(value);
	});
	return obj;
};
if (_.isDate(v))
obj[k] = {$wrap:"$date",v:v.valueOf(),h:v}
else if (v instanceof self._tdb.ObjectID)
obj[k] = {$wrap:"$oid",v:v.toJSON()}
else if (v instanceof self._tdb.Binary)
obj[k] = {$wrap: "$bin", v: v.toJSON()};
else if (_.isObject(v))
self._wrapTypes(v)
})
return obj;
}
tcoll.prototype._ensureIds = function(obj) {
var self = this;
...compactCollection = function (cb) {
// Compacts this collection as one queued task: runs _compact and, when that
// succeeded, clears the cache and rebuilds the indexes through
// _refreshIndexes. cb is called with the outcome of the task.
var self = this;
self._tq.add(function (cb) {
self._compact(safe.sure(cb, function () {
self._cache.clear();
self._refreshIndexes(cb);
}));
}, true, cb);
}...
}));
};
// Compacts every collection of the database: the collections are obtained
// through self.collections() and compactCollection is called on each of them
// via safe.forEach. cb is invoked with an error or when all are done.
tdb.prototype.compactDatabase = function (cb) {
var self = this;
self.collections(safe.sure(cb, function(collections) {
safe.forEach(collections, function (c, cb) {
c.compactCollection(cb);
},cb);
}));
};
tdb.prototype.renameCollection = function (on,nn,opts,cb) {
if (cb==null) {
cb = opts;
/**
 * Counts documents.
 *
 * Call forms: count(cb), count(query, cb), count(query, options, cb).
 * Without a query (or with an empty one) the number of stored records is
 * reported through the task queue; otherwise the count is delegated to a
 * cursor created by find(query, options).
 *
 * @param {Object}   [query]   query document
 * @param {Object}   [options] options passed on to find
 * @param {Function} cb        receives (err, count)
 */
count = function (query, options, cb) {
	var self = this;
	// shift the arguments according to how many were actually passed
	switch (arguments.length) {
	case 1:
		cb = query;
		query = null;
		options = null;
		break;
	case 2:
		cb = options;
		options = null;
		break;
	}
	var wantsAll = query==null || _.size(query)==0;
	if (!wantsAll) {
		self.find(query, options).count(cb);
		return;
	}
	this._tq.add(function (done) {
		done(null, _.size(self._store));
	},false,cb);
};
var query = "/rooms?homeid="+homeId.toString();
// dirty code to get simulated GET variable
var getId = query.match("homeid=(.*)")[1];
console.log(query, getId)
// typical code to get id from external world
// and use it for queries
rooms.find({_idHome:new engine.ObjectID(getId)})
.count(function (err, count) {
console.log(count);
console.timeEnd("sample");
})
}
})
}
})
...createIndex = function (obj, options, cb) {
// Creates an index described by `obj` unless one with the same key exists.
// Call forms: (obj), (obj, cb), (obj, options), (obj, options, cb).
// cb receives (err) or (null, indexName).
var self = this;
if (_.isFunction(options)) {
cb = options;
options = {};
}
cb = cb || function () {};
options = options || {};
// a throwaway cursor is used only to turn `obj` into the normalised sort
// specification (c._sort) and to collect a validation error (c._err)
var c = new tcursor(this,{},{},{});
c.sort(obj);
if (c._err)
return safe.back(cb,c._err);
var key = c._sort;
if (key===null)
return safe.back(cb,new Error("No fields are specified"));
// an index already registered under this key is reported as is
var index = self._idx[key];
if (index)
return safe.back(cb,null, index.name);
// force array support when global option is set
if (_.isUndefined(options._tiarr) && self._tdb._gopts.searchInArray)
options._tiarr = true;
// default name: "<field>_<order>" of every key part joined with "_"
var name = options.name || _.map(key, function (v) { return v[0] + '_' + v[1]; }).join('_');
index = new tindex(key, self, options, name);
if (self._tq._tc==-1) {
// if no operation is pending just register index
// (the index is registered without reading any stored record here)
self._idx[key] = index;
safe.back(cb, null, index.name);
}
else {
// overwise register index operation
// the queued task feeds every stored record to the new index and
// registers it only after all records were processed
this._tq.add(function (cb) {
var range = _.values(self._store);
safe.forEachSeries(range, function (rec, cb) {
self._get(rec.pos, false, safe.sure(cb, function (obj) {
index.set(obj,simplifyKey(obj._id));
cb();
}));
}, safe.sure(cb, function () {
self._idx[key] = index;
cb();
}));
}, true, function (err) {
if (err) cb(err);
else cb(null, index.name);
});
}
}n/a
drop = function (cb) {
// Drops this collection by asking the owning database to drop the
// collection named this._name; cb is passed through unchanged.
this._tdb.dropCollection(this._name,cb);
}...
});
// write results to collection
safe.waterfall([
function (cb) {
self._tdb.collection(opts.out.replace, { strict: 1 }, function (err, col) {
if (err) return cb(null, null);
col.drop(cb);
});
},
function (arg, cb) {
self._tdb.collection(opts.out.replace, {}, cb);
},
function (col, cb) {
var docs = _.map(m, function (value, key) {
...ensureIndex = function (obj, options, cb) {
// Makes sure an index described by `obj` exists, creating it when needed.
// Call forms: (obj), (obj, cb), (obj, options), (obj, options, cb).
// cb receives (err) or (null, indexName).
var self = this;
if (_.isFunction(options)) {
cb = options;
options = {};
}
cb = cb || function () {};
options = options || {};
// a throwaway cursor is used only to turn `obj` into the normalised sort
// specification (c._sort) and to collect a validation error (c._err)
var c = new tcursor(this,{},{},{});
c.sort(obj);
if (c._err)
return safe.back(cb,c._err);
var key = c._sort;
if (key===null)
return safe.back(cb,new Error("No fields are specified"));
// an index already registered under this key is reported as is
var index = self._idx[key];
if (index)
return safe.back(cb,null, index.name);
// force array support when global option is set
if (_.isUndefined(options._tiarr) && self._tdb._gopts.searchInArray)
options._tiarr = true;
// default name: "<field>_<order>" of every key part joined with "_"
var name = options.name || _.map(key, function (v) { return v[0] + '_' + v[1]; }).join('_');
index = new tindex(key, self, options, name);
if (self._tq._tc==-1) {
// if no operation is pending just register index
// (the index is registered without reading any stored record here)
self._idx[key] = index;
safe.back(cb, null, index.name);
}
else {
// overwise register index operation
// the queued task feeds every stored record to the new index and
// registers it only after all records were processed
this._tq.add(function (cb) {
var range = _.values(self._store);
safe.forEachSeries(range, function (rec, cb) {
self._get(rec.pos, false, safe.sure(cb, function (obj) {
index.set(obj,simplifyKey(obj._id));
cb();
}));
}, safe.sure(cb, function () {
self._idx[key] = index;
cb();
}));
}, true, function (err) {
if (err) cb(err);
else cb(null, index.name);
});
}
}...
##### searchInArray: true|false Default is false
Globally enables support of search in nested arrays. MongoDB supports this unconditionally. For TingoDB, searching arrays when there
are no arrays incurs a performance penalty. That's why this is switched off by default.
Additionally, and this might be a better approach, nested array support can be enabled for individual indexes or search queries.
To enable nested arrays in individual indexes, use the "_tiarr:true" option.
self._cash_transactions.ensureIndex("splits.accountId",{_tiarr:true},cb);
To enable nested arrays in individual queries for fields that do not use indexes, prefix the field name with "_tiar." in the fields argument.
coll.find({'arr.num':10},{"_tiar.arr.num":0})
#### new Db(path, options)
/**
 * Builds a cursor for a query. Accepted call shapes (callback always last
 * and optional):
 *   find(query)
 *   find(query, fieldsOrOptions)
 *   find(query, fields, options)
 *   find(query, fields, skip, limit)
 *
 * @returns {tcursor|undefined} the cursor when no callback is given;
 *   otherwise the cursor is delivered as `cb(null, cursor)`.
 */
find = function () {
  var params = Array.prototype.slice.call(arguments);
  var cb = null, query = {}, opts = {}, fields = null, skip = null, limit = null, sort = null;

  // guess callback, it is always the last argument
  if (params.length > 0 && _.isFunction(params[params.length - 1])) {
    cb = params.pop();
  }

  var n = params.length;
  if (n > 0) {
    // query should always exist
    query = params[0];
  }
  if (n === 2) {
    // a lone second argument is either options or fields; decide by its keys
    var second = params[1];
    if (_(second).keys().intersection(findOpts).size() > 0) {
      opts = second;
    } else {
      fields = second;
    }
  } else if (n === 3) {
    fields = params[1];
    opts = params[2];
  } else if (n > 3) {
    fields = params[1];
    skip = params[2];
    limit = params[3];
  }

  opts = opts || {};
  skip = skip || opts.skip || null;
  limit = limit || opts.limit || null;
  fields = fields || opts.fields || null;
  sort = sort || opts.sort || null;

  var cursor = new tcursor(this, query, fields, opts);
  if (skip) cursor.skip(skip);
  if (limit) cursor.limit(limit);
  if (sort) cursor.sort(sort);

  if (cb) {
    cb(null, cursor);
  } else {
    return cursor;
  }
};
To enable nested arrays in individual indexes, use the "_tiarr:true" option.
self._cash_transactions.ensureIndex("splits.accountId",{_tiarr:true},cb);
To enable nested arrays in individual queries for fields that do not use indexes, prefix the field name with "_tiar." in the fields argument.
coll.find({'arr.num':10},{"_tiar.arr.num":0})
#### new Db(path, options)
The only required parameter is the database path. It should be a valid path to an empty folder or a folder that already contains
collection files.
API extensions
==============
/**
 * Finds the first document matching `query` (ordered by `sort`) and updates,
 * replaces or removes it inside a single queued operation.
 *
 * @param {Object} query - selector.
 * @param {*} sort - sort specification, in any form accepted by cursor `sort`.
 * @param {Object} doc - replacement document or update-operator document.
 * @param {Object|Function} [opts] - `new`, `remove`, `upsert`, `fields`, `hint`; or the callback.
 * @param {Function} [cb] - called with `(err, document)`.
 *
 * With `new` set (and `remove` unset) the stored document is returned;
 * otherwise a snapshot taken before modification is returned. Previously a
 * plain replacement document combined with `new` handed back the old
 * document, because the result was captured before the replacement was chosen.
 */
findAndModify = function (query, sort, doc, opts, cb) {
  var self = this;
  if (_.isFunction(opts) && cb == null) {
    cb = opts;
    opts = {};
  }
  opts = opts || {};
  doc = doc || {};
  var updater = new Updater(doc, self._tdb);
  // null means "doc carries update operators"; otherwise doc is a full replacement
  var $doc = updater.hasAtomic() ? null : doc;
  // cursor is used only to validate the sort and to apply the field projection
  var c = new tcursor(this, {}, opts.fields || {}, {});
  c.sort(sort);
  if (c._err)
    return safe.back(cb, c._err);
  var returnNew = opts.new && !opts.remove;
  this._tq.add(function (cb) {
    self.__find(query, null, 0, 1, c._sort, opts.hint, {}, safe.sure(cb, function (res) {
      if (res.length == 0) {
        if (opts.upsert) {
          $doc = $doc || query;
          $doc = self._tdb._cloneDeep($doc);
          updater.update($doc, true);
          if (_.isUndefined($doc._id))
            $doc._id = new self._tdb.ObjectID();
          self._put($doc, false, safe.sure(cb, function () {
            cb(null, opts.new ? c._projectFields($doc) : {});
          }));
        } else
          cb();
      } else {
        self._get(res[0], false, safe.sure(cb, function (obj) {
          // snapshot the pre-update state before anything mutates obj
          var before = returnNew ? null : self._tdb._cloneDeep(obj);
          // remove current version of doc from indexes
          _.each(self._idx, function (v) {
            v.del(obj, simplifyKey(obj._id));
          });
          var udoc = $doc;
          if (!$doc) {
            udoc = obj;
            updater.update(udoc);
          }
          udoc._id = obj._id;
          // put will add it back to indexes
          self._put(udoc, opts.remove ? true : false, safe.sure(cb, function () {
            cb(null, c._projectFields(returnNew ? udoc : before));
          }));
        }));
      }
    }));
  }, true, cb);
};
/**
 * Finds the first document matching `query` (ordered by `sort`), removes it
 * and hands the removed document to the callback.
 *
 * @param {Object} query - selector.
 * @param {*|Function} [sort] - sort specification, or the callback.
 * @param {Object|Function} [opts] - options (`hint`), or the callback.
 * @param {Function} [cb] - called with `(err, removedDoc)`; no document
 *   argument is passed when nothing matched.
 */
findAndRemove = function (query, sort, opts, cb) {
  var self = this;
  var sortIsCallback = _.isFunction(sort) && cb == null && opts == null;
  if (sortIsCallback) {
    cb = sort;
    sort = {};
    opts = {};
  } else if (_.isFunction(opts) && cb == null) {
    cb = opts;
    opts = {};
  }
  if (!opts) opts = {};
  // mongoose/tungus pass sort as undefined, so default it
  if (!sort) sort = {};

  var sorter = new tcursor(this, {}, {}, {});
  sorter.sort(sort);
  if (sorter._err) {
    return safe.back(cb, sorter._err);
  }

  this._tq.add(function (done) {
    var onFound = safe.sure(done, function (res) {
      if (res.length == 0) {
        return done();
      }
      self._get(res[0], false, safe.sure(done, function (obj) {
        self._put(obj, true, safe.sure(done, function () {
          done(null, obj);
        }));
      }));
    });
    self.__find(query, null, 0, 1, sorter._sort, opts.hint, {}, onFound);
  }, true, cb);
};
// Variadic wrapper: collects the arguments from position `start` onward into
// one array and calls `func` with the leading arguments followed by that array.
// `start`, `func`, `nativeMax` and `apply` are not defined in this block; they
// presumably come from the enclosing scope of lodash's `_.rest` helper, since
// findOne is assigned via `_.rest(...)` elsewhere in this file — TODO confirm.
findOne = function () {
var args = arguments,
index = -1,
length = nativeMax(args.length - start, 0),
array = Array(length);
// copy the trailing ("rest") arguments into `array`
while (++index < length) {
array[index] = args[start + index];
}
// direct calls for the common small values of `start`
switch (start) {
case 0: return func.call(this, array);
case 1: return func.call(this, args[0], array);
case 2: return func.call(this, args[0], args[1], array);
}
// general case: leading arguments first, rest array in the last slot
var otherArgs = Array(start + 1);
index = -1;
while (++index < start) {
otherArgs[index] = args[index];
}
otherArgs[start] = array;
return apply(func, this, otherArgs);
}...
var db = new Db('test', new Server('localhost', 27017));
var collection = db.collection("batch_document_insert_collection_safe");
collection.insert([{hello:'world_safe1'}
, {hello:'world_safe2'}], {w:1}, function(err, result) {
assert.equal(null, err);
collection.findOne({hello:'world_safe2'}, function(err, item) {
assert.equal(null, err);
assert.equal('world_safe2', item.hello);
})
});
```
The same example using TingoDB is as follows:
// Variadic wrapper: collects the arguments from position `start` onward into
// one array and calls `func` with the leading arguments followed by that array.
// `start`, `func`, `nativeMax` and `apply` are not defined in this block;
// presumably they belong to the scope of a lodash `_.rest`-style helper that
// wraps the real group implementation — TODO confirm.
...group = function () {
var args = arguments,
index = -1,
length = nativeMax(args.length - start, 0),
array = Array(length);
// copy the trailing ("rest") arguments into `array`
while (++index < length) {
array[index] = args[start + index];
}
// direct calls for the common small values of `start`
switch (start) {
case 0: return func.call(this, array);
case 1: return func.call(this, args[0], array);
case 2: return func.call(this, args[0], args[1], array);
}
// general case: leading arguments first, rest array in the last slot
var otherArgs = Array(start + 1);
index = -1;
while (++index < start) {
otherArgs[index] = args[index];
}
otherArgs[start] = array;
return apply(func, this, otherArgs);
}n/a
/**
 * Reports whether every requested index name is registered on this collection.
 *
 * @param {string|string[]} idx - one index name or a list of names.
 * @param {Function} cb - called synchronously with `(null, boolean)`.
 */
indexExists = function (idx, cb) {
  var wanted = _.isArray(idx) ? idx : [idx];
  var known = _(this._idx).values().map('name').value();
  var matched = _.intersection(wanted, known);
  cb(null, matched.length == wanted.length);
};
/**
 * Queues a task that reports the index objects registered on this collection.
 *
 * @param {Function} cb - receives `(err, indexObjects)` through the task queue.
 */
indexes = function (cb) {
  var coll = this;
  var listIndexes = function (done) {
    done(null, _.values(coll._idx));
  };
  this._tq.add(listIndexes, false, cb);
};
/**
 * Initialises a file-backed collection: sets up the cache, creates the task
 * queue whose start-up function scans the collection file, and registers the
 * `_id_` index.
 *
 * Record layout as read by the scan: a 45-byte JSON header carrying the
 * lengths `o` and `k`, then (one byte later) `k` bytes of key JSON; the next
 * record starts `45 + 3 + o + k` bytes after the current one.
 *
 * @param {Object} tdb - owning database.
 * @param {string} name - collection name, also the file name under `tdb._path`.
 * @param {Object} options - `strict` enables the exists / does-not-exist checks.
 * @param {boolean} create - true when the collection is being created.
 * @param {Function} cb - called synchronously with an Error on a strict-mode
 *   violation, otherwise passed on to `ensureIndex`.
 *
 * Fix: a compaction failure that is tolerated (no load error) is now logged
 * as `errCompact`; previously the load `err`, always falsy there, was logged.
 */
initFS = function (tdb, name, options, create, cb) {
  var self = this;
  this._tdb = tdb;
  this._cache = new tcache(tdb, tdb._gopts.cacheSize);
  this._cmaxobj = tdb._gopts.cacheMaxObjSize || 1024;
  this.collectionName = this._name = name;
  this._filename = path.join(this._tdb._path, this._name);
  if (options.strict) {
    var exists = fs.existsSync(self._filename);
    if (exists && create) return cb(new Error("Collection " + self._name + " already exists. Currently in safe mode."));
    else if (!exists && !create) return cb(new Error("Collection " + self._name + " does not exist. Currently in safe mode."));
  }
  var pos = 0;      // file offset of the record being read
  var deleted = 0;  // superseded or deleted records seen so far
  var found = 0;    // records successfully parsed
  this._tq = new wqueue(100, function (cb) {
    safe.run(function (cb) {
      fs.open(self._filename, "a+", safe.sure(cb, function (fd) {
        self._fd = fd;
        var b1 = new Buffer(45);
        // _fsize stays null until a zero-byte read marks end of file
        safe.whilst(function () { return self._fsize === null; }, function (cb) {
          safe.run(function (cb) {
            fs.read(fd, b1, 0, 45, pos, safe.trap_sure(cb, function (bytes, data) {
              if (bytes === 0) {
                self._fsize = pos;
                return cb();
              }
              var h1 = JSON.parse(data.toString());
              h1.o = parseInt(h1.o, 10);
              h1.k = parseInt(h1.k, 10);
              var b2 = new Buffer(h1.k);
              fs.read(fd, b2, 0, h1.k, pos + 45 + 1, safe.sure(cb, function (bytes, data) {
                var k = JSON.parse(data.toString());
                self._id = k._uid;
                if (k._a == 'del') {
                  delete self._store[k._id];
                  deleted++;
                } else {
                  // a later record for the same id supersedes the earlier one
                  if (self._store[k._id]) deleted++;
                  self._store[k._id] = { pos: pos, sum: k._s };
                }
                pos += 45 + 3 + h1.o + h1.k;
                found++;
                cb();
              }));
            }));
          }, function (err) {
            if (err)
              cb(new Error(self._name + ": Error during load - " + err.toString()));
            else
              cb();
          });
        }, cb);
      }));
    }, function (err) {
      if (!found && err)
        return cb(err); // nothing read and error, just raise it
      safe.run(function (cb) {
        var size = _.size(self._store);
        // autocompact on certain ratio or err
        if (deleted > size || err) {
          self._compact(function (errCompact) {
            if (errCompact && err)
              cb(errCompact);
            else {
              if (errCompact) console.log(errCompact);
              cb();
            }
          });
        } else cb();
      }, function () {
        self._refreshIndexes(cb);
      });
    });
  });
  self.ensureIndex({_id: 1}, {name: '_id_', unique: true}, cb);
};
/**
 * Initialises a memory-backed collection on top of the database-wide
 * in-memory store `tdb._mstore`, creates the task queue whose start-up
 * function feeds every stored record to the indexes, and registers `_id_`.
 *
 * @param {Object} tdb - owning database.
 * @param {string} name - collection name.
 * @param {Object} options - `strict` enables the exists / does-not-exist checks.
 * @param {boolean} create - true when the collection is being created.
 * @param {Function} cb - called synchronously with an Error on a strict-mode
 *   violation, otherwise passed on to `ensureIndex`.
 *
 * Fix: `a || b ? c : d` parses as `(a || b) ? c : d`, so the shared store was
 * replaced by a fresh object on every call, forgetting every other
 * collection. It is now created only when missing.
 */
initM = function (tdb, name, options, create, cb) {
  var self = this;
  this._tdb = tdb;
  tdb._mstore = tdb._mstore || (Object.create ? Object.create(null) : {});
  this.collectionName = this._name = name;
  if (options.strict) {
    var exists = tdb._mstore[name];
    if (exists && create) return cb(new Error("Collection " + self._name + " already exists. Currently in safe mode."));
    else if (!exists && !create) return cb(new Error("Collection " + self._name + " does not exist. Currently in safe mode."));
  }
  tdb._mstore[name] = this._mstore = tdb._mstore[name] || [];
  // positions are 1-based: slot k of the array is addressed as pos k+1
  for (var k = 0; k < this._mstore.length; k++) {
    var o = this._mstore[k];
    if (o) {
      self._store[simplifyKey(o._id)] = { pos: k + 1 };
    }
  }
  this._tq = new wqueue(100, function (cb) {
    // update indexes
    safe.eachOfSeries(self._store, function (rec, k, cb) {
      self._get(rec.pos, false, safe.sure(cb, function (obj) {
        var id = simplifyKey(obj._id);
        _.each(self._idx, function (v) {
          v.set(obj, id);
        });
        cb();
      }));
    }, cb);
  });
  self.ensureIndex({_id: 1}, {name: '_id_', unique: true}, cb);
};
/**
 * Inserts one document or an array of documents, assigning a new ObjectID to
 * any document that has no `_id`. Documents are stored one after another
 * inside a single queued operation.
 *
 * @param {Object|Object[]} docs - document(s) to store; `_id` is set on them in place.
 * @param {Object|Function} [opts] - options (`w`), or the callback.
 * @param {Function} [cb] - called with `(err, docsArray)`.
 * @throws {Error} when `opts.w > 0` and no callback is supplied.
 */
insert = function (docs, opts, cb) {
  var self = this;
  if (_.isFunction(opts) && cb == null) {
    cb = opts;
    opts = {};
  }
  opts = opts || {};
  if (opts.w > 0 && !_.isFunction(cb)) {
    throw new Error("Callback is required for safe update");
  }
  cb = cb || function () {};

  var batch = _.isArray(docs) ? docs : [docs];

  this._tq.add(function (done) {
    safe.forEachSeries(batch, function (doc, next) {
      if (_.isUndefined(doc._id)) {
        doc._id = new self._tdb.ObjectID();
      }
      self._put(doc, false, next);
    }, safe.sure(done, function () {
      done(null, batch);
    }));
  }, true, cb);
};
```javascript
var Db = require('mongodb').Db,
Server = require('mongodb').Server,
assert = require('assert');
var db = new Db('test', new Server('localhost', 27017));
var collection = db.collection("batch_document_insert_collection_safe");
collection.insert([{hello:'world_safe1'}
, {hello:'world_safe2'}], {w:1}, function(err, result) {
assert.equal(null, err);
collection.findOne({hello:'world_safe2'}, function(err, item) {
assert.equal(null, err);
assert.equal('world_safe2', item.hello);
})
...mapReduce = function (map, reduce, opts, cb) {
var self = this;
if (_.isFunction(opts)) {
cb = opts;
opts = {};
}
if (!opts.out) return safe.back(cb, new Error('the out option parameter must be defined'));
if (!opts.out.inline && !opts.out.replace) {
return safe.back(cb, new Error('the only supported out options are inline and replace'));
}
code2fn(opts.scope);
var m = {};
function emit(k, v) {
var values = m[k];
if (!values) m[k] = [ v ];
else {
values.push(v);
if (values.length > 1000) values = [ reduce(k, values) ];
}
}
with (opts.scope || {}) {
try {
if (map instanceof Code) {
with (map.scope) {
map = eval('(' + map.code + ')');
}
} else map = eval('(' + map + ')');
if (reduce instanceof Code) {
with (reduce.scope) {
reduce = eval('(' + reduce.code + ')');
}
} else reduce = eval('(' + reduce + ')');
if (finalize instanceof Code) {
with (finalize.scope) {
finalize = eval('(' + finalize.code + ')');
}
} else var finalize = eval('(' + opts.finalize + ')');
} catch (e) {
return safe.back(cb,e);
}
}
self.find(opts.query, null, { limit: opts.limit, sort: opts.sort }, safe.sure(cb, function (c) {
var doc;
safe.doUntil(
function (cb) {
c.nextObject(safe.trap_sure(cb, function (_doc) {
doc = _doc;
if (doc) map.call(doc);
return cb();
}));
},
function () {
return doc === null;
},
safe.trap_sure(cb, function () {
_.each(m,function (v, k) {
v = v.length > 1 ? reduce(k, v) : v[0];
if (finalize) v = finalize(k, v);
m[k] = v;
});
var stats = {};
if (opts.out.inline) return process.nextTick(function () {
cb(null, _.values(m), stats); // execute outside of trap
});
// write results to collection
safe.waterfall([
function (cb) {
self._tdb.collection(opts.out.replace, { strict: 1 }, function (err, col) {
if (err) return cb(null, null);
col.drop(cb);
});
},
function (arg, cb) {
self._tdb.collection(opts.out.replace, {}, cb);
},
function (col, cb) {
var docs = _.map(m, function (value, key) {
return {
_id: key,
value: value
};
});
col.insert(docs, safe.sure(cb, function () {
if (opts.verbose) cb(null, col, stats);
else cb(null, col);
}));
}
], cb);
}
)); // doUntil
}));
}n/a
/**
 * Removes the documents matching `query` inside one queued operation.
 *
 * @param {Object|Function} [query] - selector, or the callback (remove everything).
 * @param {Object|Function} [opts] - options (`single`, `w`, `hint`), or the callback.
 * @param {Function} [cb] - called with `(err, removedCount)`.
 * @throws {Error} when `opts.w > 0` and no callback is supplied.
 */
remove = function (query, opts, cb) {
  var self = this;
  if (_.isFunction(query)) {
    cb = query;
    query = opts = {};
  } else if (_.isFunction(opts)) {
    cb = opts;
    opts = {};
  }
  opts = opts || {};
  if (opts.w > 0 && !_.isFunction(cb)) {
    throw new Error("Callback is required");
  }
  cb = cb || function () {};

  var single = opts.single || false;
  var maxMatches = single ? 1 : null;

  this._tq.add(function (done) {
    var onFound = safe.sure(done, function (positions) {
      safe.forEachSeries(positions, function (pos, next) {
        self._get(pos, false, safe.sure(next, function (obj) {
          self._put(obj, true, next);
        }));
      }, safe.sure(done, function () {
        done(null, positions.length);
      }));
    });
    self.__find(query, null, 0, maxMatches, null, opts.hint, {}, onFound);
  }, true, cb);
};
/**
 * Renames this collection.
 *
 * @param {string} nname - new collection name, validated with `_nameCheck`.
 * @param {Object|Function} [opts] - options (unused here), or the callback.
 * @param {Function} [cb] - called with `(err)`.
 *
 * Fix: the memory-store branch now updates `_name` / `collectionName` like
 * the file-store branch does. Before, a renamed in-memory collection kept
 * reporting its old name.
 */
rename = function (nname, opts, cb) {
  var self = this;
  if (_.isFunction(opts)) {
    cb = opts;
    opts = {};
  }
  var err = self._tdb._nameCheck(nname);
  if (err)
    return safe.back(cb, err);
  if (self._tdb._stype == "mem") {
    delete self._tdb._cols[self._name];
    self._tdb._cols[nname] = self;
    delete self._tdb._mstore[self._name];
    self._tdb._mstore[nname] = self._mstore;
    self.collectionName = self._name = nname;
    safe.back(cb, null);
  } else {
    self._tq.add(function (cb) {
      fs.rename(path.join(self._tdb._path, self._name), path.join(self._tdb._path, nname), safe.sure(cb, function () {
        delete self._tdb._cols[self._name];
        self._tdb._cols[nname] = self;
        self.collectionName = self._name = nname;
        cb();
      }));
    }, true, cb);
  }
};
}
if (!!process.platform.match(/^win/)) {
// WINDOWS: unsafe because if something fail while renaming file it will not
// restore automatically
fs.close(self._fd, safe.sure(cb,function() {
fs.close(fd, safe.sure(cb,function() {
fs.unlink(self._filename, safe.sure(cb,function () {
fs.rename(filename, self._filename, safe.sure(cb, function () {
fs.open(self._filename, 'a+', safe.sure(cb, function (fd) {
self._fd = fd;
self._fsize = wpos;
self._store = store;
cb();
}));
}));
/**
 * Stores `doc`, replacing any existing document with the same `_id`.
 *
 * @param {Object} doc - document to store; gets a new ObjectID when `_id` is missing.
 * @param {Object|Function} [opts] - options, or the callback.
 * @param {Function} [cb] - called with `(err, result)`: the document itself
 *   for a new save, `1` when an existing document was replaced.
 */
save = function (doc, opts, cb) {
  var self = this;
  if (_.isFunction(doc)) {
    cb = doc;
  } else if (_.isFunction(opts)) {
    cb = opts;
  }
  cb = cb || function () {};
  doc = doc || {};
  opts = opts || {};

  this._tq.add(function (done) {
    var result = doc;

    // step 1: give the doc an id, or pull the previous version out of the indexes
    function prepare(next) {
      if (_.isUndefined(doc._id)) {
        doc._id = new self._tdb.ObjectID();
        next();
        return;
      }
      var id = simplifyKey(doc._id);
      var slot = self._store[id];
      // check if document with this id already exist
      if (!slot) {
        next();
        return;
      }
      // if so we need to fetch it to update index
      self._get(slot.pos, false, safe.sure(next, function (oldDoc) {
        _.each(self._idx, function (idx) {
          idx.del(oldDoc, id);
        });
        result = 1;
        next();
      }));
    }

    // step 2: write the document
    prepare(safe.sure(done, function () {
      self._put(doc, false, safe.sure(done, function () {
        done(null, result); // when update return 1 when new save return obj
      }));
    }));
  }, true, cb);
};
/**
 * Queues a task that reports basic statistics for this collection.
 *
 * @param {Function} cb - receives `(err, {count})`, where count is the
 *   number of entries in the collection's store map.
 */
stats = function (cb) {
  var coll = this;
  var report = function (done) {
    done(null, { count: _.size(coll._store) });
  };
  this._tq.add(report, false, cb);
};
/**
 * Updates the documents matching `query` inside one queued operation.
 *
 * @param {Object} query - selector.
 * @param {Object} doc - replacement document or update-operator document.
 * @param {Object|Function} [opts] - `multi`, `upsert`, `w`, `hint`; or the callback.
 * @param {Function} [cb] - called with `(err, count, status)`.
 * @throws {Error} when `opts.w > 0` without a callback, or when `query` / `doc`
 *   is not an object.
 *
 * Fix: when the callback is passed in the `opts` position, `opts` is reset to
 * an empty object (as in `insert` and `findAndModify`) instead of the
 * callback function being used as the options bag.
 */
update = function (query, doc, opts, cb) {
  var self = this;
  if (_.isFunction(opts) && cb == null) {
    cb = opts;
    opts = {};
  }
  opts = opts || {};
  if (opts.w > 0 && !_.isFunction(cb))
    throw new Error("Callback is required for safe update");
  cb = cb || function () {};
  if (!_.isObject(query))
    throw new Error("selector must be a valid JavaScript object");
  if (!_.isObject(doc))
    throw new Error("document must be a valid JavaScript object");
  var multi = opts.multi || false;
  var updater = new Updater(doc, self._tdb);
  // null means "doc carries update operators"; otherwise doc is a full replacement
  var $doc = updater.hasAtomic() ? null : doc;
  this._tq.add(function (cb) {
    self.__find(query, null, 0, multi ? null : 1, null, opts.hint, {}, safe.sure(cb, function (res) {
      if (res.length == 0) {
        if (opts.upsert) {
          // nothing matched: build the new document from the replacement or the query
          $doc = $doc || query;
          $doc = self._tdb._cloneDeep($doc);
          updater.update($doc, true);
          if (_.isUndefined($doc._id))
            $doc._id = new self._tdb.ObjectID();
          self._put($doc, false, safe.sure(cb, function () {
            cb(null, 1, {updatedExisting: false, upserted: $doc._id, n: 1});
          }));
        } else
          cb(null, 0);
      } else {
        safe.forEachSeries(res, function (pos, cb) {
          self._get(pos, false, safe.sure(cb, function (obj) {
            // remove current version of doc from indexes
            _.each(self._idx, function (v) {
              v.del(obj, simplifyKey(obj._id));
            });
            var udoc = $doc;
            if (!$doc) {
              udoc = obj;
              updater.update(udoc);
            }
            udoc._id = obj._id;
            // put will add it back to indexes
            self._put(udoc, false, cb);
          }));
        }, safe.sure(cb, function () {
          cb(null, res.length, {updatedExisting: true, n: res.length});
        }));
      }
    }));
  }, true, cb);
};
item = self._wrapTypes(item);
var sobj = new Buffer(remove?"":JSON.stringify(item));
item = self._unwrapTypes(item);
var key = {_id:simplifyKey(item._id),_uid:self._id,_dt:(new Date()).valueOf()};
if (remove) key._a = "del";
else {
var hash = crypto.createHash('md5');
hash.update(sobj, 'utf8');
key._s = hash.digest('hex');
}
var skey = new Buffer(JSON.stringify(key));
var zeros = "0000000000";
var lobj = sobj.length.toString();
var lkey = skey.length.toString();
lobj = zeros.substr(0,zeros.length - lobj.length)+lobj;
/**
 * Cursor over the results of a collection query.
 *
 * @param {Object} tcoll - collection the cursor reads from.
 * @param {Object} query - selector.
 * @param {Object|Array} fields - projection: field -> 0/1, or a list of field
 *   names. Keys starting with "_tiar." are collected into `_arFields`.
 * @param {Object} opts - options; `hint` and `timeout` are read here.
 *
 * Projection problems are not thrown; the first one is recorded in `_err`.
 */
function tcursor(tcoll, query, fields, opts) {
  var cursor = this;
  this.INIT = 0;
  this.OPEN = 1;
  this.CLOSED = 2;
  this.GET_MORE = 3;
  this._query = query;
  this._c = tcoll;
  this._i = 0;
  this._skip = 0;
  this._limit = null;
  this._count = null;
  this._items = null;
  this._sort = null;
  this._hint = opts.hint;
  this._arFields = Object.create ? Object.create(null) : {};
  this._fieldsType = null;
  this._fieldsExcludeId = false;
  this._fields = Object.create ? Object.create(null) : {};
  this.timeout = _.isUndefined(opts.timeout) ? true : opts.timeout;

  _.each(fields, function (flag, name) {
    // list form: the entry itself is the field name and means "include"
    if (!name && _.isString(flag)) {
      name = flag;
      flag = 1;
    }
    var supported = flag == 0 || flag == 1;
    if (!supported) {
      if (!cursor._err) {
        cursor._err = new Error("Unsupported projection option: " + JSON.stringify(flag));
      }
      return;
    }
    // _id treated specially
    if (name == "_id" && flag == 0) {
      cursor._fieldsExcludeId = true;
      return;
    }
    // the first regular field fixes the projection type (include or exclude)
    if (cursor._fieldsType == null) {
      cursor._fieldsType = flag;
    }
    if (cursor._fieldsType != flag) {
      if (!cursor._err) {
        cursor._err = new Error("Mixed set of projection options (0,1) is not valid");
      }
      return;
    }
    if (name.indexOf("_tiar.") == 0) {
      cursor._arFields[name.substr(6)] = 1;
    } else {
      cursor._fields[name] = flag;
    }
  });

  // _id treated specially
  var excludeStyle = cursor._fieldsType === 0 || cursor._fieldsType === null;
  if (excludeStyle && cursor._fieldsExcludeId) {
    cursor._fieldsType = 0;
    cursor._fields['_id'] = 0;
  }
}
/**
 * Makes sure the cursor has its result list loaded, running the query once.
 *
 * @param {Function} cb - called on the next tick when items are already
 *   loaded, otherwise once the collection's `_find` has delivered them.
 */
_ensure = function (cb) {
  var cursor = this;
  if (cursor._items != null) {
    return process.nextTick(cb);
  }
  var storeItems = safe.sure_result(cb, function (data) {
    cursor._items = data;
    cursor._i = 0;
  });
  cursor._c._find(cursor._query, {}, cursor._skip, cursor._limit, cursor._sort, cursor._hint, cursor._arFields, storeItems);
};
/**
 * Delivers the next document of the cursor, or `null` once it is exhausted.
 *
 * @param {Function} cb - called with `(err, doc)`; a recorded cursor error is
 *   reported on the next tick.
 */
tcursor.prototype.nextObject = function (cb) {
  var cursor = this;
  if (cursor._err) {
    if (cb) {
      process.nextTick(function () { cb(cursor._err); });
    }
    return;
  }
  cursor._ensure(safe.sure(cb, function () {
    var pos = cursor._i;
    if (pos >= cursor._items.length) {
      return cb(null, null);
    }
    cursor._get(cursor._items[pos], cb);
    cursor._i++;
  }));
};
/**
 * Loads the document stored at `pos` through the collection and applies the
 * cursor's field projection to it.
 *
 * @param {*} pos - position handle taken from the cursor's item list.
 * @param {Function} cb - called with `(err, projectedDoc)`.
 */
_get = function (pos, cb) {
  var cursor = this;
  var deliver = safe.sure(cb, function (obj) {
    cb(null, cursor._projectFields(obj));
  });
  cursor._c._get(pos, false, deliver);
};
var ops = {
$range : function () {
this.op = "$range";
this.dump = function () {
return this._args[0].dump() +" in range (" +this._args[1].dump()+","+this._args[2].dump()+","+this
._args[3].dump()+","+this._args[4].dump()+")";
},
this._index = function (index) {
if (index.order > 0) return index.range(this._args[1]._get(), this._args[2]._get
(), this._args[3]._get(), this._args[4]._get());
else return index.range(this._args[2]._get(), this._args[1]._get(), this._args[4]._get(), this._args[3]._get());
}
},
$lt : function () {
this.op = "$lt";
this._index = function (index) {
/**
 * Applies the cursor's projection to a document.
 *
 * @param {Object} obj - document to project.
 * @returns {Object} `obj` itself when there is no projection or for an
 *   exclusion projection (after `applyProjectionDel` has run on it);
 *   otherwise whatever `applyProjectionGet` returns.
 */
_projectFields = function (obj) {
  var mode = this._fieldsType;
  if (mode == null) {
    return obj;
  }
  if (mode == 0) {
    applyProjectionDel(obj, this._fields);
    return obj;
  }
  // inclusion: seed the result with _id unless it was explicitly excluded
  var seed = this._fieldsExcludeId ? {} : { _id: obj._id };
  return applyProjectionGet(obj, this._fields, seed);
};
if (opts.upsert) {
$doc = $doc || query;
$doc = self._tdb._cloneDeep($doc);
updater.update($doc,true);
if (_.isUndefined($doc._id))
$doc._id = new self._tdb.ObjectID();
self._put($doc, false, safe.sure(cb, function () {
cb(null,opts.new?c._projectFields($doc):{})
}))
} else
cb();
} else {
self._get(res[0], false, safe.sure(cb, function (obj) {
var robj = (opts.new && !opts.remove) ? obj : self._tdb._cloneDeep(obj);
// remove current version of doc from indexes
/**
 * Validates a batch size. The value is only checked, not stored.
 *
 * @param {number} v - must be a finite number.
 * @param {Function} [cb] - called on the next tick with `(err, cursor)`.
 * @returns {Object} the cursor, for chaining.
 * @throws {Error} on an invalid value or a started cursor, when no callback is given.
 */
batchSize = function (v, cb) {
  var cursor = this;
  // record the problem on the cursor; without a callback it is thrown as well
  function reject(message) {
    cursor._err = new Error(message);
    if (!cb) throw cursor._err;
  }
  if (!_.isFinite(v)) {
    reject("batchSize requires an integer");
  }
  if (cursor._i != 0) {
    reject('Cursor is closed');
  }
  if (cb) {
    process.nextTick(function () { cb(cursor._err, cursor); });
  }
  return this;
};
/**
 * Closes the cursor: empties the item list, marks the position as closed
 * (-1) and clears any recorded error.
 *
 * @param {Function} [cb] - called on the next tick with `(err, cursor)`.
 * @returns {Object} the cursor, for chaining.
 */
close = function (cb) {
  var cursor = this;
  cursor._items = [];
  cursor._i = -1;
  cursor._err = null;
  if (cb) {
    process.nextTick(function () {
      cb(cursor._err, cursor);
    });
  }
  return cursor;
};
store[k] = { pos: wpos, sum: rec.sum };
wpos += data.length;
cb();
}));
}));
}, function (err) {
if (err) {
fs.close(fd, function () {
fs.unlink(filename, function () {
cb(err);
});
});
return;
}
if (!!process.platform.match(/^win/)) {
/**
 * Counts the documents matched by the cursor.
 *
 * @param {boolean|Function} [applySkipLimit] - when true, count only what
 *   skip/limit let through; may be omitted, in which case it is the callback.
 * @param {Function} cb - called with `(err, count)`.
 */
count = function (applySkipLimit, cb) {
  var cursor = this;
  if (!cb) {
    cb = applySkipLimit;
    applySkipLimit = false;
  }
  if (cursor._err) {
    if (cb) {
      process.nextTick(function () { cb(cursor._err); });
    }
    return;
  }

  // without skip/limit (or when they should apply) the loaded items are the answer
  var unrestricted = !cursor._skip && cursor._limit === null;
  if (unrestricted || applySkipLimit) {
    cursor._ensure(safe.sure(cb, function () {
      cb(null, cursor._items.length);
    }));
    return;
  }

  // a full count computed earlier is reused
  if (cursor._count !== null) {
    process.nextTick(function () {
      cb(null, cursor._count);
    });
    return;
  }

  // otherwise run the query again, ignoring skip, limit and sort
  var remember = safe.sure(cb, function (data) {
    cursor._count = data.length;
    cb(null, cursor._count);
  });
  cursor._c._find(cursor._query, {}, 0, null, null, cursor._hint, cursor._arFields, remember);
};
var query = "/rooms?homeid="+homeId.toString();
// dirty code to get simulated GET variable
var getId = query.match("homeid=(.*)")[1];
console.log(query, getId)
// typical code to get id from external world
// and use it for queries
rooms.find({_idHome:new engine.ObjectID(getId)})
.count(function (err, count) {
console.log(count);
console.timeEnd("sample");
})
}
})
}
})
/**
 * Streams the remaining documents of the cursor to `cb`, one call per
 * document, followed by a final `cb(null, null)`.
 *
 * @param {Function} cb - called with `(err, doc)`.
 * @throws {Error} when `cb` is not a function.
 */
each = function (cb) {
  if (!_.isFunction(cb)) {
    throw new Error('Callback is required');
  }
  var cursor = this;
  if (cursor.isClosed()) {
    cursor._err = new Error("Cursor is closed");
  }
  if (cursor._err) {
    if (cb) {
      process.nextTick(function () { cb(cursor._err); });
    }
    return;
  }
  cursor._ensure(safe.sure(cb, function () {
    // continue from the current position when some items were already consumed
    var pending = cursor._i != 0
      ? cursor._items.slice(cursor._i, cursor._items.length)
      : cursor._items;

    function emitOne(pos, next) {
      cursor._get(pos, safe.sure(cb, function (obj) {
        cb(null, obj);
        next();
      }));
    }

    function finish() {
      cursor._i = cursor._items.length;
      cb(null, null);
    }

    safe.forEachSeries(pending, emitOne, safe.sure(cb, finish));
  }));
};
tempset[this.id].push(this);
}
}
/**
 * Replaces a temporary (negative) id with its final value on every ObjectID
 * instance tracked under that temporary id, then stops tracking it. Does
 * nothing when this id is not negative.
 *
 * @param {*} v - the final id value.
 */
ObjectID.prototype._persist = function (v) {
  var tempId = this.id;
  if (!(tempId < 0)) {
    return;
  }
  _.each(tempset[tempId], function (oid) {
    oid.id = v;
  });
  delete tempset[tempId];
};
ObjectID.prototype._init = function (val) {
/**
 * Tells whether the cursor can no longer deliver documents.
 *
 * @returns {boolean} false while no items are loaded; otherwise true when
 *   the position is -1 or has reached the end of the item list.
 */
isClosed = function () {
  var items = this._items;
  if (!items) {
    return false;
  }
  var position = this._i;
  return position == -1 || position >= items.length;
};
}
tcursor.prototype.toArray = function (cb) {
if (!_.isFunction(cb))
throw new Error('Callback is required');
var self = this;
if (self.isClosed())
self._err = new Error("Cursor is closed");
if (self._err) {
if (cb) process.nextTick(function () {cb(self._err)})
return;
}
/**
 * Sets the maximum number of documents the cursor returns.
 *
 * @param {number} v - limit; 0 means "no limit", negative values use their
 *   absolute value.
 * @param {Function} [cb] - called on the next tick with `(err, cursor)`.
 * @returns {Object} the cursor, for chaining.
 * @throws {Error} on an invalid value or a started cursor, when no callback is given.
 */
limit = function (v, cb) {
  var cursor = this;
  // record the problem on the cursor; without a callback it is thrown as well
  function reject(message) {
    cursor._err = new Error(message);
    if (!cb) throw cursor._err;
  }
  if (!_.isFinite(v)) {
    reject("limit requires an integer");
  }
  if (cursor._i != 0) {
    reject('Cursor is closed');
  }
  if (!cursor._err) {
    if (v == 0) {
      this._limit = null;
    } else {
      this._limit = Math.abs(v);
    }
  }
  if (cb) {
    process.nextTick(function () { cb(cursor._err, cursor); });
  }
  return this;
};
}
// Option names that identify a lone second argument of `find` as an options
// object rather than a field projection.
var findOpts = ['limit','sort','fields','skip','hint','timeout','batchSize','safe','w'];
// Returns the first matching document: every argument except the trailing
// callback goes to `find`, the cursor is limited to one result, and its
// first object is delivered to the callback.
tcoll.prototype.findOne = _.rest(function (args) {
  var done = args.pop();
  var cursor = this.find.apply(this, args);
  cursor.limit(1).nextObject(done);
});
tcoll.prototype.find = function () {
var cb = null, query = {}, opts = {}, fields = null, skip = null, limit = null, sort = null;
var argc = arguments.length;
if (argc>0) {
// guess callback, it is always latest
/**
 * Delivers the next document of the cursor, or `null` once it is exhausted.
 *
 * @param {Function} cb - called with `(err, doc)`; a recorded cursor error is
 *   reported on the next tick.
 */
nextObject = function (cb) {
  var cursor = this;
  if (cursor._err) {
    if (cb) {
      process.nextTick(function () { cb(cursor._err); });
    }
    return;
  }
  cursor._ensure(safe.sure(cb, function () {
    var pos = cursor._i;
    if (pos >= cursor._items.length) {
      return cb(null, null);
    }
    cursor._get(cursor._items[pos], cb);
    cursor._i++;
  }));
};
}
// Option names that identify a lone second argument of `find` as an options
// object rather than a field projection.
var findOpts = ['limit','sort','fields','skip','hint','timeout','batchSize','safe','w'];
// Returns the first matching document: every argument except the trailing
// callback goes to `find`, the cursor is limited to one result, and its
// first object is delivered to the callback.
tcoll.prototype.findOne = _.rest(function (args) {
  var done = args.pop();
  var cursor = this.find.apply(this, args);
  cursor.limit(1).nextObject(done);
});
tcoll.prototype.find = function () {
var cb = null, query = {}, opts = {}, fields = null, skip = null, limit = null, sort = null;
var argc = arguments.length;
if (argc>0) {
// guess callback, it is always latest
/**
 * Moves the cursor position back to the first item.
 *
 * @returns {Object} the cursor, for chaining.
 */
rewind = function () {
  var cursor = this;
  cursor._i = 0;
  return cursor;
};
/**
 * Accepted for driver compatibility; the preference value itself is ignored.
 *
 * @param {*} the - read preference (unused).
 * @param {Function} [cb] - called on the next tick with `(err, cursor)`.
 * @returns {Object} the cursor, for chaining.
 *
 * Fix: the callback is now also invoked on success and the cursor is always
 * returned, as `skip` / `limit` / `sort` do. Before, a successful call never
 * called back and a failed one returned undefined, breaking chaining.
 */
setReadPreference = function (the, cb) {
  var self = this;
  if (cb) {
    process.nextTick(function () { cb(self._err, self); });
  }
  return this;
};
/**
 * Sets how many documents the cursor skips before returning results.
 *
 * @param {number} v - number of documents to skip; must be a finite number.
 * @param {Function} [cb] - called on the next tick with `(err, cursor)`.
 * @returns {Object} the cursor, for chaining.
 * @throws {Error} on an invalid value or a started cursor, when no callback is given.
 */
skip = function (v, cb) {
  var cursor = this;
  // record the problem on the cursor; without a callback it is thrown as well
  function reject(message) {
    cursor._err = new Error(message);
    if (!cb) throw cursor._err;
  }
  if (!_.isFinite(v)) {
    reject("skip requires an integer");
  }
  if (cursor._i != 0) {
    reject('Cursor is closed');
  }
  if (!cursor._err) {
    this._skip = v;
  }
  if (cb) {
    process.nextTick(function () { cb(cursor._err, cursor); });
  }
  return this;
};
limit = limit || opts.limit || null;
fields = fields || opts.fields || null;
sort = sort || opts.sort || null;
var c = new tcursor(this,query, fields, opts);
if (skip) c.skip(skip);
if (limit) c.limit(limit);
if (sort) c.sort(sort);
if (cb)
cb(null, c)
else
return c;
}
/**
 * Sets the sort order of the cursor.
 *
 * @param {*} v - sort specification, handed to `parseSortList`.
 * @param {*|Function} [d] - second argument for `parseSortList`, or the callback.
 * @param {Function} [cb] - called on the next tick with `(err, cursor)`.
 * @returns {Object} the cursor, for chaining. Errors are recorded in `_err`,
 *   never thrown.
 */
sort = function (v, d, cb) {
  var cursor = this;
  if (_.isFunction(d)) {
    cb = d;
    d = null;
  }
  if (cursor._i != 0) {
    this._err = new Error('Cursor is closed');
  }
  if (!this._err) {
    var applySort = function () {
      cursor.sortValue = v; // just to pass contrib test
      cursor._sort = parseSortList(v, d);
    };
    // _.attempt hands back whatever applySort returned or the error it threw
    this._err = _.attempt(applySort);
  }
  if (cb) {
    process.nextTick(function () { cb(cursor._err, cursor); });
  }
  return this;
};
cb = options;
options = {};
}
cb = cb || function () {};
options = options || {};
var c = new tcursor(this,{},{},{});
c.sort(obj);
if (c._err)
return safe.back(cb,c._err);
var key = c._sort;
if (key===null)
return safe.back(cb,new Error("No fields are specified"));
/**
 * Wraps this cursor in a CursorStream.
 *
 * @param {Object} [options] - passed through to the CursorStream constructor.
 * @returns {CursorStream} the new stream.
 */
stream = function (options) {
  var cursor = this;
  return new CursorStream(cursor, options);
};
/**
 * Collects the remaining documents of the cursor into an array.
 *
 * @param {Function} cb - called with `(err, docs)`.
 * @throws {Error} when `cb` is not a function.
 */
toArray = function (cb) {
  if (!_.isFunction(cb)) {
    throw new Error('Callback is required');
  }
  var cursor = this;
  if (cursor.isClosed()) {
    cursor._err = new Error("Cursor is closed");
  }
  if (cursor._err) {
    if (cb) {
      process.nextTick(function () { cb(cursor._err); });
    }
    return;
  }
  cursor._ensure(safe.sure(cb, function () {
    // continue from the current position when some items were already consumed
    var pending = cursor._i != 0
      ? cursor._items.slice(cursor._i, cursor._items.length)
      : cursor._items;

    function load(pos, next) {
      cursor._get(pos, safe.sure(next, function (obj) {
        next(null, obj);
      }));
    }

    safe.mapSeries(pending, load, safe.sure(cb, function (docs) {
      cursor._i = cursor._items.length;
      cb(null, docs);
    }));
  }));
};
/**
 * Database object rooted at a folder path.
 *
 * @param {string} path_ - database folder; also the key of the shared
 *   in-memory store when `gopts.memStore` is set.
 * @param {Object} opts - per-database options; `name` overrides the default
 *   name (the base name of `path_`).
 * @param {Object} gopts - engine-wide options; `memStore` selects the
 *   in-memory store over the file system.
 *
 * Fix: `a || b ? c : d` parses as `(a || b) ? c : d`, so the module-level
 * store for a path was always replaced by an empty object. An existing store
 * is now reused and a new one is created only when none exists.
 */
function tdb(path_, opts, gopts) {
  this._gopts = gopts;
  this._path = path.resolve(path_);
  this._cols = Object.create ? Object.create(null) : {};
  this._name = opts.name || path.basename(path_);
  this._stype = gopts.memStore ? "mem" : "fs";
  if (this._stype == "mem") {
    mstore[path_] = this._mstore = mstore[path_] || (Object.create ? Object.create(null) : {});
  }
  // mongodb compat variables
  this.openCalled = false;
}
/**
 * Deep-clones a value. ObjectID and Binary instances are rebuilt through
 * their own constructors; everything else is left to lodash.
 *
 * @param {*} obj - value to clone.
 * @returns {*} the clone.
 */
_cloneDeep = function (obj) {
  var db = this;
  // returning undefined hands the value back to lodash's default cloning
  function cloneSpecial(value) {
    if (value instanceof db.ObjectID) {
      return new value.constructor(value.toString());
    }
    if (value instanceof db.Binary) {
      return new value.constructor(new Buffer(value.value(true)));
    }
  }
  return _.cloneDeepWith(obj, cloneSpecial);
};
// Resets every slot of the cache to an empty marker ({k: null}).
tcache.prototype.clear = function () {
  var slots = this._cache;
  for (var slot = 0; slot < slots.length; slot++) {
    slots[slot] = { k: null };
  }
};
// Stores a deep copy of `v` under key `k`; the slot is `k` modulo the cache
// size, so a different key that maps to the same slot is overwritten.
tcache.prototype.set = function (k, v) {
  var slot = k % this.size;
  var copy = this._tdb._cloneDeep(v);
  this._cache[slot] = { k: k, v: copy };
};
// Empties the slot for key `k`, but only if that slot currently holds `k`.
tcache.prototype.unset = function (k) {
  var slot = k % this.size;
  var entry = this._cache[slot];
  if (entry.k == k) {
    this._cache[slot] = { k: null };
  }
};
/**
 * Returns the collection object for `cname`, creating and initialising it on
 * first use.
 *
 * @param {string} cname - collection name, validated with `_nameCheck`.
 * @param {Object|Function} [opts] - collection options (`strict`), or the callback.
 * @param {boolean} create - true when called on behalf of createCollection.
 * @param {Function} [cb] - called with `(err, collection)`.
 * @returns {Object|*} the collection object (possibly still initialising);
 *   for an invalid name, whatever `safe.back` returns.
 */
_collection = function (cname, opts, create, cb) {
  var nameError = this._nameCheck(cname);
  if (!cb) {
    cb = opts;
    opts = {};
  }
  cb = cb || function () {};
  if (nameError) {
    return safe.back(cb, nameError);
  }

  var self = this;
  var known = self._cols[cname];
  if (known) {
    if (opts.strict && create) {
      safe.back(cb, new Error("Collection " + cname + " already exists. Currently in safe mode."));
    } else {
      safe.back(cb, null, known);
    }
    return known;
  }

  var coll = new tcoll(this);
  // registered before init completes; unregistered again if init fails
  self._cols[cname] = coll;
  coll.init(this, cname, opts, create, function (initErr) {
    if (initErr) {
      delete self._cols[cname];
      cb(initErr);
    } else {
      cb(null, coll);
    }
  });
  return coll;
};
if (!c)
return safe.back(args[args.length - 1], new Error("Collection doesn't exists"));
c.createIndex.apply(c, args);
});
// Looks up a collection without requesting creation (create flag false).
tdb.prototype.collection = function (cname, opts, cb) {
  var mustCreate = false;
  return this._collection(cname, opts, mustCreate, cb);
};
// Looks up a collection with the create flag set to true.
tdb.prototype.createCollection = function (cname, opts, cb) {
  var mustCreate = true;
  return this._collection(cname, opts, mustCreate, cb);
};
tdb.prototype._nameCheck = function (cname) {
/**
 * Validates a collection name.
 *
 * @param {*} cname - candidate name.
 * @returns {Error|null} the first rule violation found, or null when the
 *   name is acceptable.
 */
_nameCheck = function (cname) {
  if (!_.isString(cname)) {
    return new Error("collection name must be a String");
  }
  if (cname.length == 0) {
    return new Error("collection names cannot be empty");
  }
  if (cname.indexOf("$") != -1) {
    return new Error("collection names must not contain '$'");
  }
  // only the first dot is inspected here
  var firstDot = cname.indexOf(".");
  if (firstDot == 0 || firstDot == cname.length - 1) {
    return new Error("collection names must not start or end with '.'");
  }
  if (cname.indexOf("..") != -1) {
    return new Error("collection names cannot be empty");
  }
  return null;
};
tcoll.prototype.rename = function (nname, opts, cb) {
var self = this;
if (_.isFunction(opts)) {
cb = opts;
opts = {};
}
var err = self._tdb._nameCheck(nname);
if (err)
return safe.back(cb,err);
if (self._tdb._stype=="mem") {
delete self._tdb._cols[self._name];
self._tdb._cols[nname] = self;
delete self._tdb._mstore[self._name];
self._tdb._mstore[nname] = self._mstore;
/**
 * Closes the database: stops every open collection, forgets them and marks
 * the database as not opened.
 *
 * @param {boolean|Function} [forceClose] - ignored flag, or the callback.
 * @param {Function} [cb] - called with `(err, db)`.
 *
 * Fixes:
 *  - inside the completion callback `this` is not the database, so
 *    `openCalled` was never reset and `cb` got the wrong second argument;
 *    the captured `self` is used now.
 *  - `close(true)` without a callback no longer turns the boolean into the
 *    callback.
 */
close = function (forceClose, cb) {
  var self = this;
  if (!_.isFunction(cb)) {
    cb = _.isFunction(forceClose) ? forceClose : function () {};
  }
  // stop any further operations on current collections
  safe.eachOf(self._cols, function (c, k, cb) {
    c._stop(cb);
  }, safe.sure(cb, function () {
    // and clean list
    self._cols = Object.create ? Object.create(null) : {};
    self.openCalled = false;
    safe.back(cb, null, self);
  }));
};
store[k] = { pos: wpos, sum: rec.sum };
wpos += data.length;
cb();
}));
}));
}, function (err) {
if (err) {
fs.close(fd, function () {
fs.unlink(filename, function () {
cb(err);
});
});
return;
}
if (!!process.platform.match(/^win/)) {
/**
 * Looks up a collection without requesting creation.
 *
 * @param {string} cname - collection name.
 * @param {Object|Function} [opts] - options, or the callback.
 * @param {Function} [cb] - called with `(err, collection)`.
 * @returns {*} whatever `_collection` returns.
 */
collection = function (cname, opts, cb) {
  var mustCreate = false;
  return this._collection(cname, opts, mustCreate, cb);
};
```javascript
var Db = require('mongodb').Db,
Server = require('mongodb').Server,
assert = require('assert');
var db = new Db('test', new Server('localhost', 27017));
var collection = db.collection("batch_document_insert_collection_safe");
collection.insert([{hello:'world_safe1'}
, {hello:'world_safe2'}], {w:1}, function(err, result) {
assert.equal(null, err);
collection.findOne({hello:'world_safe2'}, function(err, item) {
assert.equal(null, err);
assert.equal('world_safe2', item.hello);
/**
 * Lists the collections of this database.
 *
 * @param {Object|Function} [opts] - `namesOnly` returns bare names instead
 *   of `{name: "<db>.<collection>"}` objects; may be the callback.
 * @param {Function} cb - called with `(err, list)`.
 */
collectionNames = function (opts, cb) {
  var self = this;
  if (_.isUndefined(cb)) {
    cb = opts;
    opts = {};
  }

  function describe(cname) {
    return opts.namesOnly ? cname : { name: self._name + "." + cname };
  }

  if (this._stype == "mem") {
    cb(null, _(self._mstore).keys().map(describe).value());
    return;
  }

  fs.readdir(self._path, safe.sure(cb, function (files) {
    // some collections can be on disk and some only in memory, we need both
    var all = _.union(files, _.keys(self._cols));
    var visible = _(all)
      .reject(function (e) { return /^\./.test(e); }) // ignore hidden linux alike files
      .map(describe)
      .value();
    cb(null, visible);
  }));
};
.value());
}));
}
};
// Opens every collection reported by collectionNames, then passes the
// registered collection objects to `cb(err, collections)`.
tdb.prototype.collections = function (cb) {
  var db = this;
  db.collectionNames({ namesOnly: 1 }, safe.sure(cb, function (names) {
    var openOne = function (cname, next) {
      db.collection(cname, next);
    };
    safe.forEach(names, openOne, safe.sure(cb, function () {
      cb(null, _.values(db._cols));
    }));
  }));
};
/**
 * Open every collection reported by `collectionNames` and return the
 * collection objects registered in `_cols`.
 *
 * @param {Function} cb called as cb(err, collections)
 */
collections = function (cb) {
    var self = this;
    // open each named collection, then report everything that is registered
    function openEach(names) {
        safe.forEach(names, function (cname, next) {
            self.collection(cname, next);
        }, safe.sure(cb, function () {
            cb(null, _.values(self._cols));
        }));
    }
    self.collectionNames({namesOnly: 1}, safe.sure(cb, openEach));
}
cb(null,true);
}
}))
}
/**
 * Drop every collection of this database.
 *
 * @param {Function} cb called once all collections were dropped, or with the first error
 */
tdb.prototype.dropDatabase = function (cb) {
    var self = this;
    // drop a single collection by the name it exposes
    function dropOne(coll, next) {
        self.dropCollection(coll.collectionName, next);
    }
    self.collections(safe.sure(cb, function (all) {
        safe.forEach(all, dropOne, cb);
    }));
};
tdb.prototype.compactDatabase = function (cb) {
/**
 * Run `compactCollection` on every collection of this database.
 *
 * @param {Function} cb called once all collections were processed, or with the first error
 */
compactDatabase = function (cb) {
    this.collections(safe.sure(cb, function (all) {
        safe.forEach(all, function (coll, next) {
            coll.compactCollection(next);
        }, cb);
    }));
}
/**
 * Create (or fetch) a collection by name.
 * Thin wrapper around `_collection` with its third argument fixed to `true`.
 *
 * @param {string} cname collection name
 * @param {Object} opts forwarded unchanged
 * @param {Function} cb forwarded unchanged
 * @returns whatever `_collection` returns
 */
createCollection = function (cname, opts, cb) {
    // presumably the "create" flag of _collection — confirm against its definition
    var createFlag = true;
    return this._collection(cname, opts, createFlag, cb);
}
// Generic wrapper: gathers the arguments from position `start` onward into a
// single array and calls `func` with the leading arguments followed by it.
// NOTE(review): `start`, `func`, `nativeMax` and `apply` are free variables
// that are not defined in the visible source; this looks like a generated
// rest-parameter wrapper rather than index-specific code — confirm.
createIndex = function () {
var args = arguments,
index = -1,
length = nativeMax(args.length - start, 0),
array = Array(length);
// copy the trailing arguments (from `start` on) into `array`
while (++index < length) {
array[index] = args[start + index];
}
// direct calls for up to two leading arguments
switch (start) {
case 0: return func.call(this, array);
case 1: return func.call(this, args[0], array);
case 2: return func.call(this, args[0], args[1], array);
}
// general case: leading arguments first, collected array in slot `start`
var otherArgs = Array(start + 1);
index = -1;
while (++index < start) {
otherArgs[index] = args[index];
}
otherArgs[start] = array;
return apply(func, this, otherArgs);
}n/a
/**
 * Drop a collection: stop it, forget it and remove its backing storage.
 *
 * @param {string} cname name of the collection to drop
 * @param {Function} [cb] called as cb(err, true)
 * @throws {Error} "ns not found" when the collection is unknown and no callback was given
 */
dropCollection = function (cname, cb) {
    var self = this;
    var c = this._cols[cname];
    if (!c) {
        var err = new Error("ns not found");
        if (cb) return safe.back(cb, err);
        // `err` already is an Error instance; `new err` would raise an
        // unrelated TypeError instead of the intended error
        else throw err;
    }
    c._stop(safe.sure(cb, function (ondisk) {
        delete self._cols[cname];
        if (ondisk)
            // _stop reported a backing file, remove it
            fs.unlink(path.join(self._path, cname), safe.sure(cb, function () {
                cb(null, true);
            }));
        else {
            if (self._stype == "mem")
                delete self._mstore[cname];
            cb(null, true);
        }
    }));
}
}));
}
});
}));
};
/**
 * Drop this collection by delegating to the owning database.
 *
 * @param {Function} cb forwarded to `dropCollection`
 */
tcoll.prototype.drop = function (cb) {
    var db = this._tdb;
    db.dropCollection(this._name, cb);
};
tcoll.prototype.rename = function (nname, opts, cb) {
var self = this;
if (_.isFunction(opts)) {
cb = opts;
opts = {};
/**
 * Drop every collection of this database, one `dropCollection` call per
 * collection returned by `collections`.
 *
 * @param {Function} cb called once all collections were dropped, or with the first error
 */
dropDatabase = function (cb) {
    var self = this;
    self.collections(safe.sure(cb, function (all) {
        safe.forEach(all, function (coll, next) {
            self.dropCollection(coll.collectionName, next);
        }, cb);
    }));
}
/**
 * Mark the database as opened and report it back asynchronously.
 *
 * @param {Object|Function} [options] ignored; may carry the callback when `cb` is omitted
 * @param {Function} [cb] called as cb(null, db)
 */
open = function (options, cb) {
    // actually do nothing for now, we are inproc
    // so nothing to open/close... collection will keep going on their own
    // The callback may arrive in the first position; an options object passed
    // without a callback must not be mistaken for one.
    if (typeof cb !== "function")
        cb = typeof options === "function" ? options : function () {};
    this.openCalled = true;
    safe.back(cb, null, this);
}
```javascript
var engine = require('./engine');
var db = engine.getDB();
console.time("sample")
db.open(function(err,db) {
db.collection("homes", function (err, homes) {
// it's fine to create ObjectID in advance
// NOTE!!! we get class through engine because its type
// can depends on database type
var homeId = new engine.ObjectID();
// but with TingoDB.ObjectID right here it will be negative
// which means temporary. However it's unique and can be used for
/**
 * Rename the collection registered under `on` to `nn`.
 *
 * When no such collection is registered the callback is invoked through
 * `safe.back` without an error. Note that `opts` is not forwarded: the
 * collection's `rename` always receives an empty options object.
 *
 * @param {string} on current name
 * @param {string} nn new name
 * @param {Object} [opts] may be omitted, in which case the third argument is the callback
 * @param {Function} [cb] defaults to `safe.noop`
 */
renameCollection = function (on, nn, opts, cb) {
    if (cb == null) {
        cb = opts;
        opts = {};
    }
    cb = cb || safe.noop;
    var old = this._cols[on];
    if (!old) {
        safe.back(cb);
        return;
    }
    old.rename(nn, {}, cb);
}
/**
 * Index over one field of a collection, optionally chained with sub-indexes
 * for the remaining fields of a compound key.
 *
 * @param {Array} key list of [field, order] pairs; the first pair is indexed
 *        here, the rest is kept in `_sub`
 * @param {Object} tcoll owning collection (its `_tdb.Finder.field` builds the getter)
 * @param {Object} [options] `unique` and `_tiarr` are read
 * @param {string} [name] defaults to `key + '_'`
 */
function tindex(key, tcoll, options, name) {
    var opts = options || {};
    this.options = opts;
    this.name = name || key + '_';
    this._unique = opts.unique || false;
    this._c = tcoll;
    // entries whose key is null/undefined live outside the tree
    this._nuls = Object.create ? Object.create(null) : {};
    this._array = opts._tiarr || false;
    var head = key[0];
    this.key = head[0];
    this.order = head[1];
    if (key.length > 1)
        this._sub = key.slice(1);
    this._bp = BPlusTree.create({ sort: this.order, order: 100 });
    // compile an accessor that extracts the indexed field from a document
    var getter = new tcoll._tdb.Finder.field(this.key);
    var expr = this._array ? getter.native3() : getter.native();
    this._get = new Function("obj", "return " + expr);
}
/**
 * Remove value `v` stored under the already extracted key `k`.
 *
 * @param {*} k extracted key value
 * @param {*} v stored value (document id)
 * @param {Object} o original document, passed on to sub-indexes
 */
_del = function (k, v, o) {
    if (this._sub) {
        // compound index: delegate to the child index for this key
        var missing = _.isNull(k) || _.isUndefined(k);
        var child = missing ? this._nuls[v] : this._bp.get(k);
        if (child) child.del(o, v);
        return;
    }
    delete this._nuls[v];
    if (this._unique) {
        this._bp.del(k);
        return;
    }
    var bucket = this._bp.get(k);
    if (!bucket) return;
    var at = bucket.indexOf(v);
    if (at != -1)
        bucket.splice(at, 1);
    // drop the key once its bucket is empty
    if (bucket.length === 0)
        this._bp.del(k);
}
};
/**
 * Remove document `k_` (stored as value `v`) from the index.
 * When the extracted key is an array, every element is removed separately.
 *
 * @param {Object} k_ document
 * @param {*} v stored value (document id)
 */
tindex.prototype.del = function (k_, v) {
    var self = this;
    var k = this._get(k_);
    if (!_.isArray(k))
        return this._del(k, v, k_);
    _.each(k, function (k1) {
        self._del(k1, v, k_);
    });
};
tindex.prototype._del = function (k, v, o) {
/**
 * Store value `v` under the already extracted key `k`.
 *
 * @param {*} k extracted key value
 * @param {*} v value to store (document id)
 * @param {Object} o original document, passed on to sub-indexes
 */
_set = function (k, v, o) {
    var missing = _.isNull(k) || _.isUndefined(k);
    if (this._sub) {
        // compound index: one child index per key, created on demand
        var child = missing ? this._nuls[v] : this._bp.get(k);
        if (!child) {
            child = new tindex(this._sub, this._c, this.options, this.name + '_' + k);
            if (missing) this._nuls[v] = child;
            else this._bp.set(k, child);
        }
        child.set(o, v);
        return;
    }
    if (missing) {
        // null/undefined keys are tracked outside the tree
        this._nuls[v] = v;
        return;
    }
    if (this._unique)
        return this._bp.set(k, v);
    var bucket = this._bp.get(k);
    if (bucket)
        bucket.push(v);
    else
        this._bp.set(k, [v]);
}
if (check) {
if (!this._sub && this._unique && this._bp.get(k) !== null)
throw new Error("duplicate key error index");
} else {
if (_.isArray(k)) {
_.each(k, function (k1) {
self._set(k1, v, k_);
});
}
else
return this._set(k, v, k_);
}
};
/**
 * Return everything stored in the index, null-keyed entries included.
 *
 * @param {Array} [order] requested order per index level; the first element
 *        is compared with this level's order, the rest goes to sub-indexes
 * @param {boolean} [shallow] for compound indexes: return the child indexes
 *        themselves instead of their flattened content
 * @returns {Array}
 */
all = function (order, shallow) {
    var keyed = this._bp.all();
    var nulls = _.values(this._nuls);
    // null entries go first for ascending order, last otherwise
    var r = this.order > 0 ? _.union(nulls, keyed) : _.union(keyed, nulls);
    if (order && order.length > 0) {
        if (order[0] != this.order) r = r.reverse();
        order = order.slice(1);
    }
    if (!this._sub)
        return this._unique ? r : _.flatten(r);
    if (shallow)
        return r;
    return _(r).map(function (child) { return child.all(order); }).flattenDeep().value();
}
return s;
}
},
$ne : function () {
this.op = "$ne";
this._index = function (index) {
var m = index.match(this._args[1]._get());
var a = index.all();
return _.difference(a,m);
},
this._ex = function (f) {
if (this.fields()[f]!=null)
return 1;
else
return -1;
/**
 * Replace the tree with a fresh empty one, but only when `count()` reports
 * stored entries.
 */
clear = function () {
    if (!this.count())
        return;
    this._bp = BPlusTree.create({ sort: this.order, order: 100 });
}
// var BPlusTree = require('./bplustree');
/**
 * Fixed-size cache made of `size` slots, each slot starting out as `{k: null}`.
 *
 * @param {Object} tdb owning database, kept in `_tdb`
 * @param {number} [size] number of slots, defaults to 1000
 */
function tcache (tdb, size) {
    this._tdb = tdb;
    this.size = size || 1000;
    var slots = [];
    slots.length = this.size;
    this._cache = slots;
    this.clear();
}

/**
 * Reset every slot to a fresh `{k: null}` object.
 */
tcache.prototype.clear = function () {
    var slots = this._cache;
    var i = slots.length;
    while (i--) {
        slots[i] = {k: null};
    }
};
/**
 * Count the values stored in the tree (null-keyed entries are not included).
 *
 * @returns {number} sum over all keys: a sub-index contributes its own
 *          count, a unique key contributes 1, any other key contributes the
 *          size of its bucket
 */
count = function () {
    // the callback below is not invoked with the index as `this`
    var self = this;
    var c = 0;
    this._bp.each(function (k, v) {
        if (self._sub)
            c += v.count();
        else if (self._unique)
            // unique indexes store the bare value, not a bucket array
            c += 1;
        else
            c += v.length;
    });
    return c;
}
var query = "/rooms?homeid="+homeId.toString();
// dirty code to get simulated GET variable
var getId = query.match("homeid=(.*)")[1];
console.log(query, getId)
// typical code to get id from external world
// and use it for queries
rooms.find({_idHome:new engine.ObjectID(getId)})
.count(function (err, count) {
console.log(count);
console.timeEnd("sample");
})
}
})
}
})
/**
 * Remove document `k_` (stored as value `v`) from the index.
 * When the extracted key is an array, every element is removed separately.
 *
 * @param {Object} k_ document
 * @param {*} v stored value (document id)
 */
del = function (k_, v) {
    var self = this;
    var k = this._get(k_);
    if (!_.isArray(k))
        return this._del(k, v, k_);
    _.each(k, function (part) {
        self._del(part, v, k_);
    });
}
}
// update index
_.each(self._idx,function(v,k) {
if (!remove)
v.set(item,key._id);
else
v.del(item,key._id);
})
cb(null);
}, true, cb);
}
tcoll.prototype._putFS = function (item, remove, cb) {
/**
 * Number of index levels: this one plus one per entry in `_sub`.
 *
 * @returns {number}
 */
depth = function () {
    if (!this._sub)
        return 1;
    return this._sub.length + 1;
}
// compare each potential index with each other
_.each(pi,function (v1, i1) {
_.each(pi,function (v2, i2) {
if (i1 == i2) return;
// compare the set of possible keys for both indexes
if (_.union(v1.k, v2.k).length == v1.k.length) {
// key for v2 is a subset of key for v1, check equality
if (v1.k.length == v2.k.length && v1.i.depth() > v2.i.depth()) {
// keys are equal, but the depth of v2 is lower;
// v2 is preferable, strike out v1
pi.splice(i1, 1);
} else {
// in other two cases v1 is preferable, strike out v2
pi.splice(i2, 1);
}
/**
 * Describe the index as a list of [field, order] pairs, own level first.
 *
 * @returns {Array}
 */
fields = function () {
    var own = [ [ this.key, this.order ] ];
    return this._sub ? own.concat(this._sub) : own;
}
$lt : function () {
this.op = "$lt";
this._index = function (index) {
if (index.order > 0) return index.range(null, this._args[1]._get(), false, true);
else return index.range(this._args[1]._get(), null, true, false);
};
this._ex = function (f) {
if (this.fields()[f]!=null)
return 1;
else
return -1;
}
this.dump = function () {
return this._args[0].dump() +" < " +this._args[1].dump()
}
/**
 * Short human readable label for this index.
 *
 * @param {number} depth unused
 * @returns {string} "[Index <name>]"
 */
inspect = function (depth) {
    var label = '[Index ' + this.name;
    return label + ']';
}
/**
 * Look up everything stored under key `k`.
 *
 * @param {*} k key value
 * @returns {Array} empty when the key is absent; for unique and compound
 *          indexes the single stored entry wrapped in an array; otherwise
 *          the bucket stored for the key
 */
match = function (k) {
    var m = this._bp.get(k);
    // Only a missing entry means "no match": a unique index stores the bare
    // value, so a falsy value such as 0 or "" is still a legitimate hit.
    if (m === null || m === undefined) return [];
    return this._unique || this._sub ? [ m ] : m;
}
console.log(room[0]);
i--;
if (i==0) {
// now lets assume we serving request like
// /rooms?homeid=_some_string_
var query = "/rooms?homeid="+homeId.toString();
// dirty code to get simulated GET variable
var getId = query.match("homeid=(.*)")[1];
console.log(query, getId)
// typical code to get id from external world
// and use it for queries
rooms.find({_idHome:new engine.ObjectID(getId)})
.count(function (err, count) {
console.log(count);
console.timeEnd("sample");
/**
 * Values tracked in `_nuls`, i.e. the entries kept outside the tree.
 *
 * @returns {Array}
 */
nuls = function () {
    var tracked = this._nuls;
    return _.values(tracked);
}
},
$exists : function () {
this.op = "$exists";
this._index = function (index) {
if (this._args[1]._get())
return index.values();
else
return index.nuls();
},
this._ex = function (f) {
if (this.fields()[f]!=null)
return 1;
else
return -1;
}
/**
 * Range query on the tree.
 *
 * @param {*} s first bound, forwarded to `rangeSync`
 * @param {*} e second bound, forwarded to `rangeSync`
 * @param {boolean} si forwarded to `rangeSync` (presumably "include start" — confirm)
 * @param {boolean} ei forwarded to `rangeSync` (presumably "include end" — confirm)
 * @returns {Array} raw result for unique and compound indexes, flattened one level otherwise
 */
range = function (s, e, si, ei) {
    var hits = this._bp.rangeSync(s, e, si, ei);
    if (this._unique || this._sub)
        return hits;
    return _.flatten(hits);
}
var ops = {
$range : function () {
this.op = "$range";
this.dump = function () {
return this._args[0].dump() +" in range (" +this._args[1].dump()+","+this._args[2].dump()+","+this
._args[3].dump()+","+this._args[4].dump()+")";
},
this._index = function (index) {
if (index.order > 0) return index.range(this._args[1]._get(), this._args[2]._get
(), this._args[3]._get(), this._args[4]._get());
else return index.range(this._args[2]._get(), this._args[1]._get(), this._args[4]._get(), this._args[3]._get());
}
},
$lt : function () {
this.op = "$lt";
this._index = function (index) {
/**
 * Add document `k_` to the index under value `v`, or — with `check` — only
 * verify that doing so would not violate uniqueness.
 *
 * @param {Object} k_ document
 * @param {*} v value to store (document id)
 * @param {boolean} [check] when truthy nothing is stored
 * @throws {Error} "duplicate key error index" in check mode when a plain
 *         unique index already holds the extracted key
 */
set = function (k_, v, check) {
    var self = this;
    var k = this._get(k_);
    if (check) {
        var clash = !this._sub && this._unique && this._bp.get(k) !== null;
        if (clash)
            throw new Error("duplicate key error index");
        return;
    }
    if (!_.isArray(k))
        return this._set(k, v, k_);
    // array value: one index entry per element
    _.each(k, function (part) {
        self._set(part, v, k_);
    });
}
}
this._tq = new wqueue(100, function (cb) {
// update indexes
safe.eachOfSeries(self._store, function (rec, k, cb) {
self._get(rec.pos, false, safe.sure(cb, function (obj) {
var id = simplifyKey(obj._id);
_.each(self._idx,function(v, k) {
v.set(obj, id);
});
cb();
}));
}, cb);
});
self.ensureIndex({_id: 1}, {name: '_id_', unique: true}, cb);
};
/**
 * Everything stored in the tree (entries kept in `_nuls` are not included).
 *
 * @returns {Array} raw tree content for unique and compound indexes,
 *          flattened one level otherwise
 */
values = function () {
    var stored = this._bp.all();
    if (this._unique || this._sub)
        return stored;
    return _.flatten(stored);
}
return s;
}
},
$exists : function () {
this.op = "$exists";
this._index = function (index) {
if (this._args[1]._get())
return index.values();
else
return index.nuls();
},
this._ex = function (f) {
if (this.fields()[f]!=null)
return 1;
else
/**
 * Readable stream facade over a cursor.
 * May be called with or without `new`.
 *
 * @param {Object} cursor cursor to read documents from
 * @param {Object} [options] kept on `this.options` (e.g. a `transform` function)
 */
function CursorStream(cursor, options) {
    // forward `options` as well, otherwise a call without `new` silently
    // loses them
    if (!(this instanceof CursorStream)) return new CursorStream(cursor, options);
    options = options ? options : {};
    Stream.call(this);
    this.readable = true;
    this.paused = false;
    this._cursor = cursor;
    this._destroyed = null;
    this.options = options;
    // give time to hook up events
    var self = this;
    process.nextTick(function () {
        self._init();
    });
}
/**
 * Start reading from the cursor unless the stream was already destroyed.
 */
_init = function () {
    if (!this._destroyed)
        this._next();
}
var inproc_id = -1;
var tempset = {};
function ObjectID(val) {
// every new instance will have temporary inproc unique value
// minus sign will let know to db layer that value is temporary
// and need to be replaced
this.id = --inproc_id;
this._init(val);
// we need to keep track of all temporary instances so that
// they can be updated at a later time
if (this.id<0) {
if (!tempset[this.id])
tempset[this.id]=[this];
else
tempset[this.id].push(this);
/**
 * Request the next document from the cursor, unless the stream is paused or
 * destroyed. The request is scheduled through `processor` and the state is
 * checked again right before the cursor is asked.
 */
_next = function () {
    var self = this;
    // true while no further documents must be requested
    function halted() {
        return self.paused || self._destroyed;
    }
    if (halted()) return;
    // Get the next object
    processor(function () {
        if (halted()) return;
        self._cursor.nextObject(function (err, doc) {
            self._onNextObject(err, doc);
        });
    });
}
/**
 * Kick off reading from the cursor; does nothing once the stream is destroyed.
 * @ignore
 * @api private
 */
CursorStream.prototype._init = function () {
    if (!this._destroyed) {
        this._next();
    }
}
/**
* Pull the next document from the cursor.
* @ignore
* @api private
*/
/**
 * Handle one result of `nextObject`.
 *
 * An error destroys the stream; a missing document emits 'end' and destroys
 * the stream; otherwise the (optionally transformed) document is emitted as
 * 'data' and the next one is requested.
 *
 * @param {Error} err
 * @param {Object} doc
 */
_onNextObject = function (err, doc) {
    if (err) return this.destroy(err);
    // when doc is null we hit the end of the cursor
    if (!doc) {
        this.emit('end');
        return this.destroy();
    }
    var opts = this.options;
    var data = _.isFunction(opts.transform) ? opts.transform(doc) : doc;
    this.emit('data', data);
    this._next();
}
var self = this;
// Get the next object
processor(function() {
if(self.paused || self._destroyed) return;
self._cursor.nextObject(function (err, doc) {
self._onNextObject(err, doc);
});
});
}
/**
* Handle each document as its returned from the cursor.
* @ignore
/**
 * Tear the stream down once: close the cursor, emit 'error' when an error is
 * given, then emit 'close'. Further calls are ignored.
 *
 * @param {Error} [err]
 */
destroy = function (err) {
    if (this._destroyed) return;
    this._destroyed = true;
    this.readable = false;
    this._cursor.close();
    if (err)
        this.emit('error', err);
    this.emit('close');
}
/**
* Handle each document as its returned from the cursor.
* @ignore
* @api private
*/
CursorStream.prototype._onNextObject = function (err, doc) {
if(err) return this.destroy(err);
// when doc is null we hit the end of the cursor
//if(!doc && (this._cursor.state == 1 || this._cursor.state == 2)) {
if(!doc) {
this.emit('end')
return this.destroy();
} else if(doc) {
/**
 * Flag the stream as paused.
 */
pause = function () {
    var stream = this;
    stream.paused = true;
}
/**
 * Resume a paused stream. The paused flag is cleared and reading restarts on
 * the next tick; calling this on a stream that is not paused does nothing.
 */
resume = function () {
    // Don't do anything if we are not paused
    if (!this.paused) return;
    var stream = this;
    process.nextTick(function () {
        stream.paused = false;
        // _next decides on its own whether more documents may be fetched
        stream._next();
    });
}
/**
 * Intersect several id lists.
 *
 * @param {Array<Array>} indexes id lists to intersect
 * @param {number} [base] position of the list to iterate; the result keeps
 *        that list's order. When omitted the shortest list is used.
 * @returns {Array} ids of the base list that are present in every other list
 */
intersectIndexes = function (indexes, base) {
    // do intersection of indexes using hashes
    var hashes = [], i = 0;
    // convert to hashes
    for (i = 0; i < indexes.length; i++) {
        // prototype-less object: ids like "constructor" must not match by accident
        var ids = Object.create(null);
        _.each(indexes[i], function (id) {
            ids[id] = true;
        });
        hashes.push(ids);
    }
    // find minimal one; the hashes have no length, so measure the source lists
    if (_.isUndefined(base)) {
        base = 0;
        for (i = 1; i < indexes.length; i++) {
            if (indexes[i].length < indexes[base].length)
                base = i;
        }
    }
    // iterate over it
    var m = [];
    _.each(indexes[base], function (id) {
        for (var j = 0; j < hashes.length; j++) {
            if (j == base) continue;
            // test key presence, not the stored value: ids such as 0 are falsy
            if (!(id in hashes[j]))
                return;
        }
        m.push(id);
    });
    return m;
}
// so the result is sorted, but in reverse direction
p.reverse();
sort = null;
}
}
} else {
// TODO: use sort index as intersect base to speedup sorting
p = tutils.intersectIndexes(p);
}
// now we have ids, need to convert them to positions
range = _.map(p,function (_id) {
return self._store[_id].pos;
})
} else {
if (si) {
/**
 * Work queue state.
 *
 * @param {number} [limit] stored as `limit`, defaults to 100
 * @param {Function} [first] function(cb) stored as `first`; defaults to a
 *        function that calls `cb` right away
 */
function wqueue(limit, first) {
    this.limit = limit ? limit : 100;
    this._rc = 0;
    this._q = [];
    this._blocked = false;
    this._stoped = false;
    this._tc = -1;
    this.first = first ? first : function (cb) { cb(); };
}
// Run one task.
//   task  - function(done) performing the work
//   block - stored in `_blocked` for the duration of the task
//   cb    - receives whatever the task passes to `done`
// The very first execution (when `_tc` reaches 0) runs `this.first` before the
// task; if that fails, the counters are rolled back and `cb` gets the error.
_exec = function (task, block, cb) {
var self = this;
self._blocked = block;
self._tc++;
if (self._tc==0) {
// first task ever: hold the queue while `first` runs
self._blocked = true;
self.first(function (err) {
if (err) {
// restore to initial state on error
self._blocked = false;
self._tc--;
return cb(err);
}
// `_tc` is now 0, so this re-entry increments it and takes the else branch
self._exec(task,block,cb)
})
} else {
// `_rc` counts tasks currently running
self._rc++;
task(function () {
cb.apply(this,arguments)
self._rc--;
// the last running task releases the block
if (self._rc==0)
self._blocked = false;
// let queued tasks proceed
self._ping();
})
}
}...
self.first(function (err) {
if (err) {
// restore to initial state on error
self._blocked = false;
self._tc--;
return cb(err);
}
self._exec(task,block,cb)
})
} else {
self._rc++;
task(function () {
cb.apply(this,arguments)
self._rc--;
if (self._rc==0)
/**
 * Schedule a queue drain on the next tick.
 *
 * Queued entries are started in order while fewer than `limit` tasks are
 * running and the queue is not blocked; a blocking entry at the head waits
 * until nothing is running. Once the queue is stopped, entries are rejected
 * with "Database is closed" instead of being executed.
 */
_ping = function () {
    var queue = this;
    process.nextTick(function () {
        while (queue._q.length > 0 && queue._rc < queue.limit && !queue._blocked) {
            // a blocking task must run alone
            if (queue._q[0].block && queue._rc != 0)
                break;
            var t = queue._q.shift();
            if (queue._stoped) {
                t.cb(new Error("Database is closed"));
                continue;
            }
            queue._exec(t.task, t.block, t.cb);
        }
    });
}
this._stoped = false;
this._tc = -1;
this.first = first || function (cb) {cb()};
}
/**
 * Append a task to the queue and trigger processing.
 *
 * @param {Function} task function(done) to run
 * @param {boolean} block stored with the entry as its `block` flag
 * @param {Function} cb stored with the entry as its completion callback
 */
wqueue.prototype.add = function (task, block, cb) {
    var entry = {task: task, block: block, cb: cb};
    this._q.push(entry);
    this._ping();
}
wqueue.prototype._exec = function (task,block,cb) {
var self = this;
self._blocked = block;
self._tc++;
if (self._tc==0) {
/**
 * Append a task to the queue and trigger processing.
 *
 * @param {Function} task function(done) to run
 * @param {boolean} block stored with the entry as its `block` flag
 * @param {Function} cb stored with the entry as its completion callback
 */
add = function (task, block, cb) {
    var pending = this._q;
    pending.push({task: task, block: block, cb: cb});
    this._ping();
}
});
});
self.ensureIndex({_id: 1}, {name: '_id_', unique: true}, cb);
};
/**
 * Compact this collection, then clear its cache and refresh its indexes.
 * The work is queued on `_tq` with the blocking flag set.
 *
 * @param {Function} cb called when the queued task has finished
 */
tcoll.prototype.compactCollection = function (cb) {
    var self = this;
    // compaction first, cache and indexes only after it succeeded
    function compactThenReindex(done) {
        self._compact(safe.sure(done, function () {
            self._cache.clear();
            self._refreshIndexes(done);
        }));
    }
    self._tq.add(compactThenReindex, true, cb);
};
...