tingodb = function (opts) { opts = opts || {}; var db = require('./tdb.js'); var ObjectID = opts.nativeObjectID ? (require("bson").ObjectID || require("mongodb").ObjectID) : require("./ObjectId"); if (opts.nativeObjectID) { ObjectID.prototype.valueOf = function () { return this.toString(); } } function gdb (path,optsLocal) { gdb.superclass.constructor.call(this, path, optsLocal, opts); this.ObjectID = ObjectID; this.Code = require('./tcode.js').Code; this.Binary = require('./tbinary.js').Binary; this.Finder = require("./finder")(this); } var F = function() { } F.prototype = db.prototype gdb.prototype = new F() gdb.prototype.constructor = gdb gdb.superclass = db.prototype return { Db:gdb, Collection:require('./tcoll.js'), Code:require('./tcode.js').Code, Binary:require('./tbinary.js').Binary, ObjectID:ObjectID } }
n/a
function ObjectID(val) { // every new instance will have temporary inproc unique value // minus sign will let know to db layer that value is temporary // and need to be replaced this.id = --inproc_id; this._init(val); // we need to keep track all temporary instances with goal // to update them at later tima if (this.id<0) { if (!tempset[this.id]) tempset[this.id]=[this]; else tempset[this.id].push(this); } }
n/a
function tdb() {}
n/a
function tcache(tdb, size) { this._tdb = tdb; this.size = size || 1000; this._cache = []; this._cache.length = this.size; this.clear(); }
n/a
function tcoll(tdb) { this._tdb = null; this._name = null; this._store = Object.create ? Object.create(null) : {}; this._fd = null; this._fsize = null; this._id = 1; this._wq = new wqueue(); this._tq = null; this._idx = Object.create ? Object.create(null) : {}; this._cache = null; // this._mc = {}; this._check1 = Math.random()*100+1; // native mongo db compatibility attrs this.collectionName = null; if (tdb._stype=="mem") { this.init = this.initM; this._put = this._putM; this._get = this._getM; } else { this.init = this.initFS; this._put = this._putFS; this._get = this._getFS; } }
n/a
function tcursor(tcoll, query, fields, opts) { var self = this; this.INIT = 0; this.OPEN = 1; this.CLOSED = 2; this.GET_MORE = 3; this._query = query; this._c = tcoll; this._i = 0; this._skip = 0; this._limit = null; this._count = null; this._items = null; this._sort = null; this._hint = opts.hint; this._arFields = Object.create ? Object.create(null) : {}; this._fieldsType = null; this._fieldsExcludeId = false; this._fields = Object.create ? Object.create(null) : {}; this.timeout = _.isUndefined(opts.timeout)?true:opts.timeout; _.each(fields, function (v,k) { if (!k && _.isString(v)) { k=v; v = 1; } if (v == 0 || v==1) { // _id treated specially if (k=="_id" && v==0) { self._fieldsExcludeId = true; return; } if (self._fieldsType==null) self._fieldsType = v; if (self._fieldsType==v) { if (k.indexOf("_tiar.")==0) self._arFields[k.substr(6)]=1; else self._fields[k]=v; } else if (!self._err) self._err = new Error("Mixed set of projection options (0,1) is not valid"); } else if (!self._err) self._err = new Error("Unsupported projection option: "+JSON.stringify(v)); }) // _id treated specially if ((self._fieldsType===0 || self._fieldsType===null) && self._fieldsExcludeId) { self._fieldsType = 0; self._fields['_id']=0; } }
n/a
function tdb(path_, opts, gopts) { this._gopts = gopts; this._path = path.resolve(path_); this._cols = Object.create ? Object.create(null) : {}; this._name = opts.name || path.basename(path_); this._stype = gopts.memStore?"mem":"fs"; if (this._stype=="mem") mstore[path_] = this._mstore = mstore[path_] || Object.create ? Object.create(null) : {}; // mongodb compat variables this.openCalled = false; }
n/a
function tindex(key, tcoll, options, name) { this.options = options || {}; this.name = name || key+'_'; this._unique = this.options.unique || false; this._c = tcoll; this._nuls = Object.create ? Object.create(null) : {}; this._array = this.options._tiarr || false; this.key = key[0][0]; this.order = key[0][1]; if (key.length > 1) this._sub = key.slice(1); this._bp = BPlusTree.create({ sort: this.order, order: 100 }); var getter = new tcoll._tdb.Finder.field(this.key); this._get = new Function("obj", "return " + (this._array ? getter.native3() : getter.native())); }
n/a
function CursorStream(cursor, options) { if(!(this instanceof CursorStream)) return new CursorStream(cursor); options = options ? options : {}; Stream.call(this); this.readable = true; this.paused = false; this._cursor = cursor; this._destroyed = null; this.options = options; // give time to hook up events var self = this; process.nextTick(function() { self._init(); }); }
n/a
function wqueue(limit, first) { this.limit = limit || 100; this._rc = 0; this._q = []; this._blocked = false; this._stoped = false; this._tc = -1; this.first = first || function (cb) {cb()}; }
n/a
function ObjectID(val) { // every new instance will have temporary inproc unique value // minus sign will let know to db layer that value is temporary // and need to be replaced this.id = --inproc_id; this._init(val); // we need to keep track all temporary instances with goal // to update them at later tima if (this.id<0) { if (!tempset[this.id]) tempset[this.id]=[this]; else tempset[this.id].push(this); } }
n/a
createFromHexString = function (str) { return new ObjectID(str); }
n/a
createFromString = function (str) { return new ObjectID(str); }
n/a
_init = function (val) { if (_.isNumber(val)) this.id = val; else if (val instanceof ObjectID) this.id = val.id; else if (_.isString(val)) { if (/^-?\d+$/.test(val)) this.id = parseInt(val) else this.id = NaN; } if (val && isNaN(this.id)) throw new Error("ObjectId should be ObjectId (whatever it is designed to be) "+val) }
...
var inproc_id = -1;
var tempset = {};
function ObjectID(val) {
// every new instance will have temporary inproc unique value
// minus sign will let know to db layer that value is temporary
// and need to be replaced
this.id = --inproc_id;
this._init(val);
// we need to keep track all temporary instances with goal
// to update them at later tima
if (this.id<0) {
if (!tempset[this.id])
tempset[this.id]=[this];
else
tempset[this.id].push(this);
...
_persist = function (v) { var oldid = this.id; if (oldid<0) { _.each(tempset[oldid], function (oid) { oid.id = v; }); delete tempset[oldid]; } }
...
throw new Error("key "+k+" must not start with '$'");
if (k.indexOf('.')!=-1)
throw new Error("key "+k+" must not contain '.'");
}
if (_.isObject(v)) {
if (v instanceof self._tdb.ObjectID) {
if (v.id<0) {
v._persist(++self._id)
}
}
else
self._ensureIds(v)
}
})
return obj;
...
equals = function (val) { if (val instanceof ObjectID) return this.id==val.id; else { var temp = new ObjectID(val); return this.id==temp.id; } }
n/a
inspect = function () { return this.id.toString(); }
n/a
toHexString = function () { var l = this.id.toString(); var zeros = "000000000000000000000000"; return zeros.substr(0,zeros.length - l.length)+l; }
n/a
toJSON = function () { return this.id; }
...
tcoll.prototype._wrapTypes = function(obj) {
var self = this;
_.each(obj, function (v,k) {
if (_.isDate(v))
obj[k] = {$wrap:"$date",v:v.valueOf(),h:v}
else if (v instanceof self._tdb.ObjectID)
obj[k] = {$wrap:"$oid",v:v.toJSON()}
else if (v instanceof self._tdb.Binary)
obj[k] = {$wrap: "$bin", v: v.toJSON()};
else if (_.isObject(v))
self._wrapTypes(v)
})
return obj;
...
toString = function () { return this.id.toString(); }
...
// it's ok also to not provide id, then it will be generated
rooms.insert({name:"room_"+i,_idHome:homeId}, function (err, room) {
console.log(room[0]);
i--;
if (i==0) {
// now lets assume we serving request like
// /rooms?homeid=_some_string_
var query = "/rooms?homeid="+homeId.toString();
// dirty code to get simulated GET variable
var getId = query.match("homeid=(.*)")[1];
console.log(query, getId)
// typical code to get id from external world
// and use it for queries
rooms.find({_idHome:new engine.ObjectID(getId)})
.count(function (err, count) {
...
valueOf = function () { return this.id; }
...
var s = "(";
var ps = "obj";
for (var i=0;i<path.length;i++) {
ps+="['"+path[i]+"']";
if (i!=path.length-1)
s+=ps+ " && ";
else
s+= "("+ps+" && "+ps+".valueOf()"+") )
x22;;
}
return s;
}
this.native3 = function () {
function steep(ps, path, i) {
ps+="['"+path[i]+"']";
if (path.length==i)
...
function tdb() {}
n/a
init = function (path, options, cb) { this._path = path; cb(null); }
...
if (c) {
if (opts.strict && create) safe.back(cb, new Error("Collection " + cname + " already exists. Currently
in safe mode."));
else safe.back(cb, null, c);
return c;
}
c = new tcoll(this);
self._cols[cname] = c;
c.init(this, cname, opts, create, function (err) {
if (err) {
delete self._cols[cname];
cb(err);
} else
cb(null, c);
});
return c;
...
function Binary(buffer, subType) { if(!(this instanceof Binary)) return new Binary(buffer, subType); this._bsontype = 'Binary'; if(buffer instanceof Number) { this.sub_type = buffer; this.position = 0; } else { this.sub_type = subType == null ? BSON_BINARY_SUBTYPE_DEFAULT : subType; this.position = 0; } if(buffer != null && !(buffer instanceof Number)) { // Only accept Buffer, Uint8Array or Arrays if(_.isString(buffer)) { // Different ways of writing the length of the string for the different types if(typeof Buffer !== 'undefined') { this.buffer = new Buffer(buffer); } else if(typeof Uint8Array !== 'undefined' || _.isArray(buffer)) { this.buffer = writeStringToArray(buffer); } else { throw new Error("only String, Buffer, Uint8Array or Array accepted"); } } else { this.buffer = buffer; } this.position = buffer.length; } else { if(typeof Buffer !== 'undefined') { this.buffer = new Buffer(Binary.BUFFER_SIZE); } else if(typeof Uint8Array !== 'undefined'){ this.buffer = new Uint8Array(new ArrayBuffer(Binary.BUFFER_SIZE)); } else { this.buffer = new Array(Binary.BUFFER_SIZE); } // Set position to start of buffer this.position = 0; } }
...
switch (v.$wrap) {
case "$date": obj[k] = new Date(v.v); break;
case "$oid":
var oid = new self._tdb.ObjectID(v.v);
obj[k]=oid;
break;
case "$bin":
var bin = new self._tdb.Binary(new Buffer(v.v, 'base64'));
obj[k] = bin;
break;
default: self._unwrapTypes(v);
}
}
})
return obj;
...
function tcache(tdb, size) { this._tdb = tdb; this.size = size || 1000; this._cache = []; this._cache.length = this.size; this.clear(); }
n/a
clear = function () { for (var i=0; i<this._cache.length; i++) { this._cache[i]={k:null}; } }
...
// var BPlusTree = require('./bplustree');
function tcache (tdb, size) {
this._tdb = tdb;
this.size = size || 1000;
this._cache = [];
this._cache.length = this.size;
this.clear();
}
tcache.prototype.clear = function () {
for (var i=0; i<this._cache.length; i++) {
this._cache[i]={k:null};
}
};
...
get = function (k, unsafe) { var c = this._cache[k%this.size]; return c.k == k ? (unsafe?c.v:this._tdb._cloneDeep(c.v)) : null; }
...
tcoll.prototype._getM = function (pos, unsafe, cb) {
safe.back(cb,null,unsafe?this._mstore[pos-1]:this._tdb._cloneDeep(this._mstore[pos-1]));
};
tcoll.prototype._getFS = function (pos, unsafe, cb) {
var self = this;
var cached = self._cache.get(pos,unsafe);
if (cached)
return safe.back(cb,null,cached);
var b1 = new Buffer(45);
fs.read(self._fd, b1, 0, 45, pos, safe.trap_sure(cb, function (bytes, data) {
var h1 = JSON.parse(data.toString());
h1.o = parseInt(h1.o,10);
h1.k = parseInt(h1.k,10);
...
set = function (k, v) { this._cache[k%this.size] = {k:k,v:this._tdb._cloneDeep(v)}; }
...
}
this._tq = new wqueue(100, function (cb) {
// update indexes
safe.eachOfSeries(self._store, function (rec, k, cb) {
self._get(rec.pos, false, safe.sure(cb, function (obj) {
var id = simplifyKey(obj._id);
_.each(self._idx,function(v, k) {
v.set(obj, id);
});
cb();
}));
}, cb);
});
self.ensureIndex({_id: 1}, {name: '_id_', unique: true}, cb);
};
...
unset = function (k) { var c = this._cache[k%this.size]; if (c.k==k) this._cache[k%this.size]={k:null}; }
...
fs.write(self._fd, buf, 0, buf.length, self._fsize, safe.sure(cb, function (written) {
if (remove)
delete self._store[key._id];
else
self._store[key._id] = { pos: self._fsize, sum: key._s };
if (remove || sobj.length > self._cmaxobj)
self._cache.unset(self._fsize)
else
self._cache.set(self._fsize,item);
self._fsize+=written;
// randomly check for non exclusive file usage
// which is growth of file that we are nor aware
// randomly to avoid overhead
if (self._check1==0) {
...
function Code(code, scope) { if(!(this instanceof Code)) return new Code(code, scope); this._bsontype = 'Code'; this.code = code; this.scope = scope == null ? {} : scope; }
n/a
function tcoll(tdb) { this._tdb = null; this._name = null; this._store = Object.create ? Object.create(null) : {}; this._fd = null; this._fsize = null; this._id = 1; this._wq = new wqueue(); this._tq = null; this._idx = Object.create ? Object.create(null) : {}; this._cache = null; // this._mc = {}; this._check1 = Math.random()*100+1; // native mongo db compatibility attrs this.collectionName = null; if (tdb._stype=="mem") { this.init = this.initM; this._put = this._putM; this._get = this._getM; } else { this.init = this.initFS; this._put = this._putFS; this._get = this._getFS; } }
n/a
__find = function (query, fields, skip, limit, sort, hint, arFields, cb) { var self = this; var range; // find sort index var si = this._bestSortIndex(sort); // for non empty query check indexes that we can use var qt = self._tdb.Finder.matcher(query); var pi = []; if (_.size(qt)>0) { _.each(self._idx,function (i) { var f = _.map(i.fields(), 0); var e = _.takeWhile(f, function (k) { return qt._ex(k) == 1 && (!hint || hint[k]); }); if (e.length > 0) pi.push({ i: i, k: e, e: f.length > e.length }); }); } // if possible indexes found split the query and process // indexes separately if (!_.isEmpty(pi)) { // choose the most appropriate indexes reduceIndexSet(pi); // split query var io = {}; _.each(pi,function (v) { _.each(v.k,function (k) { if (!io[k]) io[k] = qt.split(k); }); }); // process indexes var p = _.map(pi,function (st) { // this action applies to all indexes var r = io[st.k[0]]._index(st.i); // process subfields of compound index _.each(st.k.slice(1),function (k) { var v = io[k]; r = _(r).map(function (si) { return v._index(si); }).flatten().value(); }); // expand subindexes to plain ids if (st.e) r = _(r).map(function (si) { return si.all(); }).flatten().value(); // store result of index search return r; }); if (p.length == 1) { p = p[0]; // optimization for the case when search and sorting indexes are the same if (si && pi[0].i === si) { var sif = si.fields(); if (_.every(sort, function (v, i) { return sif[i][1] == v[1]; })) { // sort order exactly matches index order, // so the result is already sorted sort = null; } else if (_.every(sort, function (v, i) { return sif[i][1] == -v[1]; })) { // sort order is exactly opposite to index order, // so the result is sorted, but in reverse direction p.reverse(); sort = null; } } } else { // TODO: use sort index as intersect base to speedup sorting p = tutils.intersectIndexes(p); } // nowe we have ids, need to convert them to positions range = _.map(p,function (_id) { return self._store[_id].pos; }) } else { if (si) { range = _.map(si.all(_.map(sort, 1)), function (_id) { return self._store[_id].pos; }) //if (order==-1) // range.reverse(); sort = null; } else range = _(self._store).values().map('pos').value(); } if (sort && si) { var ps = {}; _.each(range, function (pos) { ps[pos] = true; }); range = []; _.each(si.all(_.map(sort, 1)), function (_id) { var pos = self._store[_id].pos; if (_.has(ps, pos)) range.push(pos); }); //if (order == -1) // range.reverse(); sort = null; } // no sort, no query then return right away if (sort==null && (_.size(qt)==0 || qt._args.length==0)) { if (skip!=0 || limit!=null) { var c = Math.min(range.length-skip,limit?limit:range.length-skip); range = range.splice(skip,c) } return safe.back(cb,null,range); } // check if we can use simple match or array match function var arrayMatch = false; if (self._tdb._gopts.searchInArray) arrayMatch = true; else { _.each(qt.fields(), function (v,k) { if (_.get(arFields, k)) { arrayMatch = true; return false; } }) } var matcher = new Function("obj", "return " + (arrayMatch ? qt.native3() : qt.native())); // create sort index if (sort) { si = new tindex(sort,self); } // now simple non-index search var res = []; var found = 0; safe.forEachSeries(range, function (pos, cb) { if (sort==null && limit && res.length>=limit) return safe.back(cb); self._get(pos, true, safe.sure(cb, function (obj) { if (matcher(obj)) { if (sort!=null || found>=skip) { if (sort==null) res.push(pos); else si.set(obj,pos); } found++; } cb() })) }, safe.sure(cb, function () { if (sort) { res = si.all(); //if (order==-1) { // res.reverse(); //} if (skip!=0 || limit!=null) { var c = Math.min(res.length-skip,limit?limit:res.length-skip); res = res.splice(skip,c) } } cb(null, ...
...
if (!_.isObject(doc))
throw new Error("document must be a valid JavaScript object");
var multi = opts.multi || false;
var updater = new Updater(doc, self._tdb);
var $doc = updater.hasAtomic()?null:doc;
this._tq.add(function (cb) {
self.__find(query, null, 0, multi ? null : 1, null, opts.hint, {}, safe.sure(cb, function
(res) {
if (res.length==0) {
if (opts.upsert) {
$doc = $doc || query;
$doc = self._tdb._cloneDeep($doc);
updater.update($doc,true);
if (_.isUndefined($doc._id))
$doc._id = new self._tdb.ObjectID();
...
_bestSortIndex = function (sort) { // no sort if (!sort) return null; // exact match if (this._idx[sort]) return this._idx[sort]; // find potential sort indexes var pi = []; _.each(this._idx,function (idx) { var fields = idx.fields(); var match = _.takeWhile(fields, function (kv, i) { return i < sort.length ? kv[0] == sort[i][0] : false; }); if (match.length == sort.length) { var score = fields.length; _.each(sort,function (kv, i) { if (kv[1] != fields[i][1]) score += 1; }); pi.push({ value: idx, score: score }); } }); if (pi.length === 0) return null; // select best index pi = pi.sort(function (l, r) { return l.score < r.score; }); return pi[0].value; }
...
} while (hit);
}
tcoll.prototype.__find = function (query, fields, skip, limit, sort, hint, arFields, cb) {
var self = this;
var range;
// find sort index
var si = this._bestSortIndex(sort);
// for non empty query check indexes that we can use
var qt = self._tdb.Finder.matcher(query);
var pi = [];
if (_.size(qt)>0) {
_.each(self._idx,function (i) {
var f = _.map(i.fields(), 0);
var e = _.takeWhile(f, function (k) {
...
_compact = function (cb) { var self = this; var filename = self._filename + '.compact'; fs.open(filename, 'w+', safe.sure(cb, function (fd) { var b1 = new Buffer(45); function get(pos, cb) { fs.read(self._fd, b1, 0, 45, pos, safe.trap_sure(cb, function (bytes, data) { var h1 = JSON.parse(data.toString()); h1.o = parseInt(h1.o, 10); h1.k = parseInt(h1.k, 10); var b2 = new Buffer(h1.k + h1.o + 3); fs.read(self._fd, b2, 0, b2.length, pos + 45, safe.sure(cb, function (bytes, data) { cb(null, Buffer.concat([ b1, b2 ])); })); })); } var wpos = 0; var store = Object.create ? Object.create(null) : {}; safe.eachOfSeries(self._store, function (rec, k, cb) { get(rec.pos, safe.sure(cb, function (data) { fs.write(fd, data, 0, data.length, wpos, safe.sure(cb, function (written) { if (written != data.length) return cb(new Error('Insufficient disk space')); store[k] = { pos: wpos, sum: rec.sum }; wpos += data.length; cb(); })); })); }, function (err) { if (err) { fs.close(fd, function () { fs.unlink(filename, function () { cb(err); }); }); return; } if (!!process.platform.match(/^win/)) { // WINDOWS: unsafe because if something fail while renaming file it will not // restore automatically fs.close(self._fd, safe.sure(cb,function() { fs.close(fd, safe.sure(cb,function() { fs.unlink(self._filename, safe.sure(cb,function () { fs.rename(filename, self._filename, safe.sure(cb, function () { fs.open(self._filename, 'a+', safe.sure(cb, function (fd) { self._fd = fd; self._fsize = wpos; self._store = store; cb(); })); })); })); })); })); } else { // safe way fs.rename(filename, self._filename, safe.sure(cb, function () { fs.close(self._fd); self._fd = fd; self._fsize = wpos; self._store = store; cb(); })); } }); })); }
...
if (!found && err)
return cb(err); // nothing read and error, just rise it
safe.run(function (cb) {
var size = _.size(self._store);
// autocompact on certain ratio or err
if (deleted > size || err) {
self._compact(function (errCompact) {
if (errCompact && err)
cb(errCompact);
else {
if (errCompact) console.log(err);
cb();
}
});
...
_ensureIds = function (obj) { var self = this; _.each(obj, function (v,k) { if (k.length >0) { if (k[0]=='$') throw new Error("key "+k+" must not start with '$'"); if (k.indexOf('.')!=-1) throw new Error("key "+k+" must not contain '.'"); } if (_.isObject(v)) { if (v instanceof self._tdb.ObjectID) { if (v.id<0) { v._persist(++self._id) } } else self._ensureIds(v) } }) return obj; }
...
if (_.isObject(v)) {
if (v instanceof self._tdb.ObjectID) {
if (v.id<0) {
v._persist(++self._id)
}
}
else
self._ensureIds(v)
}
})
return obj;
}
tcoll.prototype._unwrapTypes = function(obj) {
...
_find = function (query, fields, skip, limit, sort_, hint, arFields, cb) { var self = this; this._tq.add(function (cb) { self.__find(query, fields, skip, limit, sort_, hint, arFields, cb); }, false, cb); }
...
}
if (self._count !== null) {
process.nextTick(function () {
cb(null, self._count);
});
return;
}
self._c._find(self._query, {}, 0, null, null, self._hint, self._arFields, safe.sure(
cb, function (data) {
self._count = data.length;
cb(null, self._count);
}));
}
tcursor.prototype.setReadPreference = function (the, cb) {
var self = this;
...
_getFS = function (pos, unsafe, cb) { var self = this; var cached = self._cache.get(pos,unsafe); if (cached) return safe.back(cb,null,cached); var b1 = new Buffer(45); fs.read(self._fd, b1, 0, 45, pos, safe.trap_sure(cb, function (bytes, data) { var h1 = JSON.parse(data.toString()); h1.o = parseInt(h1.o,10); h1.k = parseInt(h1.k,10); var b2 = new Buffer(h1.o); fs.read(self._fd,b2,0,h1.o,pos+45+2+h1.k, safe.trap_sure(cb, function (bytes, data) { var obj = self._unwrapTypes(JSON.parse(data.toString())); if (bytes <= self._cmaxobj) self._cache.set(pos, obj); cb(null,obj); })) })) }
n/a
_getM = function (pos, unsafe, cb) { safe.back(cb,null,unsafe?this._mstore[pos-1]:this._tdb._cloneDeep(this._mstore[pos-1])); }
n/a
_putFS = function (item, remove, cb) { var self = this; self._wq.add(function (cb) { var err = _.attempt(function () { item = self._ensureIds(item); }); if (err) { err.errmsg = err.toString(); return cb(err); } if (_.isUndefined(item._id)) return cb(new Error("Invalid object key (_id)")); item = self._wrapTypes(item); var sobj = new Buffer(remove?"":JSON.stringify(item)); item = self._unwrapTypes(item); var key = {_id:simplifyKey(item._id),_uid:self._id,_dt:(new Date()).valueOf()}; if (remove) key._a = "del"; else { var hash = crypto.createHash('md5'); hash.update(sobj, 'utf8'); key._s = hash.digest('hex'); } var skey = new Buffer(JSON.stringify(key)); var zeros = "0000000000"; var lobj = sobj.length.toString(); var lkey = skey.length.toString(); lobj = zeros.substr(0,zeros.length - lobj.length)+lobj; lkey = zeros.substr(0,zeros.length - lkey.length)+lkey; var h1={k:lkey,o:lobj,v:"001"}; var buf = new Buffer(JSON.stringify(h1)+"\n"+skey+"\n"+sobj+"\n"); // check index update if (item && !remove) { err = _.attempt(function () { _.each(self._idx,function(v,k) { v.set(item,key._id,true); }) }); if (err) { err.errmsg = err.toString(); return cb(err); } } safe.run(function (cb) { var rec = self._store[key._id]; if (rec && rec.sum == key._s) return safe.back(cb); fs.write(self._fd, buf, 0, buf.length, self._fsize, safe.sure(cb, function (written) { if (remove) delete self._store[key._id]; else self._store[key._id] = { pos: self._fsize, sum: key._s }; if (remove || sobj.length > self._cmaxobj) self._cache.unset(self._fsize) else self._cache.set(self._fsize,item); self._fsize+=written; // randomly check for non exclusive file usage // which is growth of file that we are nor aware // randomly to avoid overhead if (self._check1==0) { this._check1 = Math.random()*100+1; fs.fstat(self._fd, safe.sure(cb, function (stat) { if (self._fsize!=stat.size) cb(new Error("File size mismatch. Are you use db/collection exclusively?")) else cb() })) } else { self._check1--; cb(); } })); }, function () { // update index _.each(self._idx,function(v,k) { if (!remove) v.set(item,key._id); else v.del(item,key._id); }) cb(null); }); }, true, cb); }
n/a
_putM = function (item_, remove, cb) { var item = this._tdb._cloneDeep(item_); var self = this; self._wq.add(function (cb) { var err = _.attempt(function () { item = self._ensureIds(item); }); if (err) { err.errmsg = err.toString(); return cb(err); } if (_.isUndefined(item._id)) return cb(new Error("Invalid object key (_id)")); var key = {_id:simplifyKey(item._id)}; // check index update if (item && !remove) { err = _.attempt(function () { _.each(self._idx,function(v,k) { v.set(item,key._id,true); }); }); if (err) { err.errmsg = err.toString(); return cb(err); } } if (remove) { self._mstore[self._store[key._id].pos-1]=null; delete self._store[key._id]; } else { if (self._store[key._id]) { self._mstore[self._store[key._id].pos-1] = item; } else { self._mstore.push(item); self._store[key._id] = {pos: self._mstore.length}; } } // update index _.each(self._idx,function(v,k) { if (!remove) v.set(item,key._id); else v.del(item,key._id); }) cb(null); }, true, cb); }
n/a
_refreshIndexes = function (cb) { var self = this; _.each(self._idx,function(v, k) { v.clear(); }); safe.eachOfSeries(self._store, function (rec, k, cb) { self._get(rec.pos, false, safe.sure(cb, function (obj) { var id = simplifyKey(obj._id); _.each(self._idx,function(v, k) { v.set(obj, id); }); cb(); })); }, cb); }
...
else {
if (errCompact) console.log(err);
cb();
}
});
} else cb();
}, function () {
self._refreshIndexes(cb);
});
});
});
self.ensureIndex({_id: 1}, {name: '_id_', unique: true}, cb);
};
tcoll.prototype.compactCollection = function (cb) {
...
_stop = function (cb) { var self = this; self._tq.add(function (cb) { // this will prevent any tasks processed on this instance self._tq._stoped = true; if (self._fd) { fs.close(self._fd,safe.sure(cb, function () { cb(null,true); })); } else cb(null,false); },true,cb); }
...
tdb.prototype.close = function (forceClose, cb) {
var self = this;
if (cb==null) cb = forceClose;
cb = cb || function () {};
// stop any further operations on current collections
safe.eachOf(self._cols, function (c, k, cb) {
c._stop(cb)
}, safe.sure(cb, function () {
// and clean list
self._cols = Object.create ? Object.create(null) : {};
this.openCalled = false;
safe.back(cb,null,this);
}))
}
...
_unwrapTypes = function (obj) { var self = this; _.each(obj, function (v,k) { if (_.isObject(v)) { switch (v.$wrap) { case "$date": obj[k] = new Date(v.v); break; case "$oid": var oid = new self._tdb.ObjectID(v.v); obj[k]=oid; break; case "$bin": var bin = new self._tdb.Binary(new Buffer(v.v, 'base64')); obj[k] = bin; break; default: self._unwrapTypes(v); } } }) return obj; }
...
var b1 = new Buffer(45);
fs.read(self._fd, b1, 0, 45, pos, safe.trap_sure(cb, function (bytes, data) {
var h1 = JSON.parse(data.toString());
h1.o = parseInt(h1.o,10);
h1.k = parseInt(h1.k,10);
var b2 = new Buffer(h1.o);
fs.read(self._fd,b2,0,h1.o,pos+45+2+h1.k, safe.trap_sure(cb, function (bytes, data) {
var obj = self._unwrapTypes(JSON.parse(data.toString()));
if (bytes <= self._cmaxobj)
self._cache.set(pos, obj);
cb(null,obj);
}))
}))
}
...
_wrapTypes = function (obj) { var self = this; _.each(obj, function (v,k) { if (_.isDate(v)) obj[k] = {$wrap:"$date",v:v.valueOf(),h:v} else if (v instanceof self._tdb.ObjectID) obj[k] = {$wrap:"$oid",v:v.toJSON()} else if (v instanceof self._tdb.Binary) obj[k] = {$wrap: "$bin", v: v.toJSON()}; else if (_.isObject(v)) self._wrapTypes(v) }) return obj; }
...
if (_.isDate(v))
obj[k] = {$wrap:"$date",v:v.valueOf(),h:v}
else if (v instanceof self._tdb.ObjectID)
obj[k] = {$wrap:"$oid",v:v.toJSON()}
else if (v instanceof self._tdb.Binary)
obj[k] = {$wrap: "$bin", v: v.toJSON()};
else if (_.isObject(v))
self._wrapTypes(v)
})
return obj;
}
tcoll.prototype._ensureIds = function(obj) {
var self = this;
...
compactCollection = function (cb) { var self = this; self._tq.add(function (cb) { self._compact(safe.sure(cb, function () { self._cache.clear(); self._refreshIndexes(cb); })); }, true, cb); }
...
}));
};
tdb.prototype.compactDatabase = function (cb) {
var self = this;
self.collections(safe.sure(cb, function(collections) {
safe.forEach(collections, function (c, cb) {
c.compactCollection(cb);
},cb);
}));
};
tdb.prototype.renameCollection = function (on,nn,opts,cb) {
if (cb==null) {
cb = opts;
...
count = function (query, options, cb) { var self = this; if (arguments.length == 1) { cb = arguments[0]; options = null; query = null; } if (arguments.length == 2) { query = arguments[0]; cb = arguments[1]; options = null; } if (query==null || _.size(query)==0) { this._tq.add(function (cb) { cb(null, _.size(self._store)); },false,cb); } else self.find(query, options).count(cb); }
...
var query = "/rooms?homeid="+homeId.toString();
// dirty code to get simulated GET variable
var getId = query.match("homeid=(.*)")[1];
console.log(query, getId)
// typical code to get id from external world
// and use it for queries
rooms.find({_idHome:new engine.ObjectID(getId)})
.count(function (err, count) {
console.log(count);
console.timeEnd("sample");
})
}
})
}
})
...
createIndex = function (obj, options, cb) { var self = this; if (_.isFunction(options)) { cb = options; options = {}; } cb = cb || function () {}; options = options || {}; var c = new tcursor(this,{},{},{}); c.sort(obj); if (c._err) return safe.back(cb,c._err); var key = c._sort; if (key===null) return safe.back(cb,new Error("No fields are specified")); var index = self._idx[key]; if (index) return safe.back(cb,null, index.name); // force array support when global option is set if (_.isUndefined(options._tiarr) && self._tdb._gopts.searchInArray) options._tiarr = true; var name = options.name || _.map(key, function (v) { return v[0] + '_' + v[1]; }).join('_'); index = new tindex(key, self, options, name); if (self._tq._tc==-1) { // if no operation is pending just register index self._idx[key] = index; safe.back(cb, null, index.name); } else { // overwise register index operation this._tq.add(function (cb) { var range = _.values(self._store); safe.forEachSeries(range, function (rec, cb) { self._get(rec.pos, false, safe.sure(cb, function (obj) { index.set(obj,simplifyKey(obj._id)); cb(); })); }, safe.sure(cb, function () { self._idx[key] = index; cb(); })); }, true, function (err) { if (err) cb(err); else cb(null, index.name); }); } }
n/a
drop = function (cb) { this._tdb.dropCollection(this._name,cb); }
...
});
// write results to collection
safe.waterfall([
function (cb) {
self._tdb.collection(opts.out.replace, { strict: 1 }, function (err, col) {
if (err) return cb(null, null);
col.drop(cb);
});
},
function (arg, cb) {
self._tdb.collection(opts.out.replace, {}, cb);
},
function (col, cb) {
var docs = _.map(m, function (value, key) {
...
ensureIndex = function (obj, options, cb) { var self = this; if (_.isFunction(options)) { cb = options; options = {}; } cb = cb || function () {}; options = options || {}; var c = new tcursor(this,{},{},{}); c.sort(obj); if (c._err) return safe.back(cb,c._err); var key = c._sort; if (key===null) return safe.back(cb,new Error("No fields are specified")); var index = self._idx[key]; if (index) return safe.back(cb,null, index.name); // force array support when global option is set if (_.isUndefined(options._tiarr) && self._tdb._gopts.searchInArray) options._tiarr = true; var name = options.name || _.map(key, function (v) { return v[0] + '_' + v[1]; }).join('_'); index = new tindex(key, self, options, name); if (self._tq._tc==-1) { // if no operation is pending just register index self._idx[key] = index; safe.back(cb, null, index.name); } else { // overwise register index operation this._tq.add(function (cb) { var range = _.values(self._store); safe.forEachSeries(range, function (rec, cb) { self._get(rec.pos, false, safe.sure(cb, function (obj) { index.set(obj,simplifyKey(obj._id)); cb(); })); }, safe.sure(cb, function () { self._idx[key] = index; cb(); })); }, true, function (err) { if (err) cb(err); else cb(null, index.name); }); } }
...
##### searchInArray: true|false Default is false
Globally enables support of search in nested arrays. MongoDB supports this unconditionally. For TingoDB, searching arrays when there
are no arrays incurs a performance penalty. That's why this is switched off by default.
Additionally, and this might be a better approach, nested array support can be enabled for individual indexes or search queries.
To enable nested arrays in individual indexed, use "_tiarr:true" option.
self._cash_transactions.ensureIndex("splits.accountId",{_tiarr:true},cb);
To enable nested arrays in individual queries for fields that do not use indexes, use "_tiarr." to prefix field names.
coll.find({'arr.num':10},{"_tiar.arr.num":0})
#### new Db(path, options)
...
find = function () { var cb = null, query = {}, opts = {}, fields = null, skip = null, limit = null, sort = null; var argc = arguments.length; if (argc>0) { // guess callback, it is always latest cb = arguments[argc-1]; if (!_.isFunction(cb)) cb=null else argc--; if (argc>0) { // query should always exist query = arguments[0] if (argc>1) { if (argc==2) { var val = arguments[1]; // worst case we get either options either fiels if (_(val).keys().intersection(findOpts).size() > 0) opts = val else fields = val; } else { fields = arguments[1]; if (argc == 3) opts = arguments[2] else { skip = arguments[2]; limit = arguments[3] } } } } } opts = opts || {}; skip = skip || opts.skip || null; limit = limit || opts.limit || null; fields = fields || opts.fields || null; sort = sort || opts.sort || null; var c = new tcursor(this,query, fields, opts); if (skip) c.skip(skip); if (limit) c.limit(limit); if (sort) c.sort(sort); if (cb) cb(null, c) else return c; }
...
To enable nested arrays in individual indexed, use "_tiarr:true" option.
self._cash_transactions.ensureIndex("splits.accountId",{_tiarr:true},cb);
To enable nested arrays in individual queries for fields that do not use indexes, use "_tiarr." to prefix field names.
coll.find({'arr.num':10},{"_tiar.arr.num":0})
#### new Db(path, options)
The only required parameter is the database path. It should be a valid path to an empty folder or a folder that already contains
collection files.
API extensions
==============
...
findAndModify = function (query, sort, doc, opts, cb) { var self = this; if (_.isFunction(opts) && cb == null) { cb = opts; opts = {}; } opts = opts || {}; doc = doc || {}; var updater = new Updater(doc, self._tdb); var $doc = updater.hasAtomic()?null:doc; var c = new tcursor(this,{}, opts.fields || {},{}); c.sort(sort); if (c._err) return safe.back(cb,c._err); this._tq.add(function (cb) { self.__find(query, null, 0, 1, c._sort, opts.hint, {}, safe.sure(cb, function (res) { if (res.length==0) { if (opts.upsert) { $doc = $doc || query; $doc = self._tdb._cloneDeep($doc); updater.update($doc,true); if (_.isUndefined($doc._id)) $doc._id = new self._tdb.ObjectID(); self._put($doc, false, safe.sure(cb, function () { cb(null,opts.new?c._projectFields($doc):{}) })) } else cb(); } else { self._get(res[0], false, safe.sure(cb, function (obj) { var robj = (opts.new && !opts.remove) ? obj : self._tdb._cloneDeep(obj); // remove current version of doc from indexes _.each(self._idx,function(v,k) { v.del(obj,simplifyKey(obj._id)); }) var udoc = $doc; if (!$doc) { udoc = obj; updater.update(udoc); } udoc._id = obj._id; // put will add it back to indexes self._put(udoc, opts.remove?true:false, safe.sure(cb,function () { cb(null,c._projectFields(robj)) })) })) } })) },true,cb); }
n/a
findAndRemove = function (query, sort, opts, cb) { var self = this; if (_.isFunction(sort) && cb == null && opts==null) { cb = sort; sort = {} opts = {}; } else if (_.isFunction(opts) && cb == null) { cb = opts; opts = {}; } opts = opts || {}; sort = sort || {}; var c = new tcursor(this,{},{},{}); // Fix for mongoouse/tungus they pass sort as undefined c.sort(sort); if (c._err) return safe.back(cb,c._err); this._tq.add(function (cb) { self.__find(query, null, 0, 1, c._sort, opts.hint, {}, safe.sure(cb, function (res) { if (res.length==0) return cb(); self._get(res[0], false, safe.sure(cb, function (obj) { self._put(obj,true,safe.sure(cb, function () { cb(null,obj); })) })) })) },true,cb); }
n/a
findOne = function () { var args = arguments, index = -1, length = nativeMax(args.length - start, 0), array = Array(length); while (++index < length) { array[index] = args[start + index]; } switch (start) { case 0: return func.call(this, array); case 1: return func.call(this, args[0], array); case 2: return func.call(this, args[0], args[1], array); } var otherArgs = Array(start + 1); index = -1; while (++index < start) { otherArgs[index] = args[index]; } otherArgs[start] = array; return apply(func, this, otherArgs); }
...
var db = new Db('test', new Server('localhost', 27017));
var collection = db.collection("batch_document_insert_collection_safe");
collection.insert([{hello:'world_safe1'}
, {hello:'world_safe2'}], {w:1}, function(err, result) {
assert.equal(null, err);
collection.findOne({hello:'world_safe2'}, function(err, item) {
assert.equal(null, err);
assert.equal('world_safe2', item.hello);
})
});
```
The same example using TingoDB is as follows:
...
group = function () { var args = arguments, index = -1, length = nativeMax(args.length - start, 0), array = Array(length); while (++index < length) { array[index] = args[start + index]; } switch (start) { case 0: return func.call(this, array); case 1: return func.call(this, args[0], array); case 2: return func.call(this, args[0], args[1], array); } var otherArgs = Array(start + 1); index = -1; while (++index < start) { otherArgs[index] = args[index]; } otherArgs[start] = array; return apply(func, this, otherArgs); }
n/a
indexExists = function (idx, cb) { if (!_.isArray(idx)) idx = [idx]; var i = _.intersection(idx,_(this._idx).values().map('name').value()); cb(null,i.length == idx.length); }
n/a
indexes = function (cb) { var self = this; this._tq.add(function (cb) { cb(null, _.values(self._idx)); },false,cb); }
n/a
initFS = function (tdb, name, options, create, cb) { var self= this; this._tdb = tdb; this._cache = new tcache(tdb, tdb._gopts.cacheSize); this._cmaxobj = tdb._gopts.cacheMaxObjSize || 1024; this.collectionName = this._name = name; this._filename = path.join(this._tdb._path, this._name); if (options.strict) { var exists = fs.existsSync(self._filename); if (exists && create) return cb(new Error("Collection " + self._name + " already exists. Currently in safe mode.")); else if (!exists && !create) return cb(new Error("Collection " + self._name + " does not exist. Currently in safe mode.")); } var pos = 0; var deleted = 0; var found = 0; this._tq = new wqueue(100, function (cb) { safe.run(function (cb) { fs.open(self._filename, "a+", safe.sure(cb, function (fd) { self._fd = fd; var b1 = new Buffer(45); safe.whilst(function () { return self._fsize === null; }, function(cb) { safe.run(function (cb) { fs.read(fd, b1, 0, 45, pos, safe.trap_sure(cb, function (bytes, data) { if (bytes===0) { self._fsize = pos; return cb(); } var h1 = JSON.parse(data.toString()); h1.o = parseInt(h1.o,10); h1.k = parseInt(h1.k,10); var b2 = new Buffer(h1.k); fs.read(fd,b2,0,h1.k,pos+45+1, safe.sure(cb, function (bytes, data) { var k = JSON.parse(data.toString()); self._id = k._uid; if (k._a=='del') { delete self._store[k._id]; deleted++; } else { if (self._store[k._id]) deleted++; self._store[k._id] = { pos: pos, sum: k._s }; } pos+=45+3+h1.o+h1.k; found++; cb(); })); })); }, function (err) { if (err) cb(new Error(self._name+": Error during load - "+err.toString())); else cb(); }); }, cb); })); }, function (err) { if (!found && err) return cb(err); // nothing read and error, just rise it safe.run(function (cb) { var size = _.size(self._store); // autocompact on certain ratio or err if (deleted > size || err) { self._compact(function (errCompact) { if (errCompact && err) cb(errCompact); else { if (errCompact) console.log(err); cb(); } }); } else cb(); }, function () { self._refreshIndexes(cb); }); }); }); self.ensureIndex({_id: 1}, {name: '_id_', unique: true}, cb); }
n/a
initM = function (tdb, name, options, create, cb) { var self= this; this._tdb = tdb; tdb._mstore = tdb._mstore || Object.create ? Object.create(null) : {}; this.collectionName = this._name = name; if (options.strict) { var exists = tdb._mstore[name]; if (exists && create) return cb(new Error("Collection " + self._name + " already exists. Currently in safe mode.")); else if (!exists && !create) return cb(new Error("Collection " + self._name + " does not exist. Currently in safe mode.")); } tdb._mstore[name] = this._mstore = tdb._mstore[name] || []; for (var k=0; k< this._mstore.length; k++) { var o = this._mstore[k]; if (o) { self._store[simplifyKey(o._id)]={pos:k+1}; } } this._tq = new wqueue(100, function (cb) { // update indexes safe.eachOfSeries(self._store, function (rec, k, cb) { self._get(rec.pos, false, safe.sure(cb, function (obj) { var id = simplifyKey(obj._id); _.each(self._idx,function(v, k) { v.set(obj, id); }); cb(); })); }, cb); }); self.ensureIndex({_id: 1}, {name: '_id_', unique: true}, cb); }
n/a
insert = function (docs, opts, cb ) { var self = this; if (_.isFunction(opts) && cb == null) { cb = opts; opts = {}; } opts = opts || {}; if (opts.w>0 && !_.isFunction(cb)) throw new Error("Callback is required for safe update"); cb = cb || function () {}; if (!_.isArray(docs)) docs = [docs]; this._tq.add(function (cb) { safe.forEachSeries(docs, function (doc, cb) { if (_.isUndefined(doc._id)) { doc._id = new self._tdb.ObjectID(); } self._put(doc, false, cb); }, safe.sure(cb, function () { cb(null, docs); })) }, true, cb) }
...
```javascript
var Db = require('mongodb').Db,
Server = require('mongodb').Server,
assert = require('assert');
var db = new Db('test', new Server('localhost', 27017));
var collection = db.collection("batch_document_insert_collection_safe");
collection.insert([{hello:'world_safe1'}
, {hello:'world_safe2'}], {w:1}, function(err, result) {
assert.equal(null, err);
collection.findOne({hello:'world_safe2'}, function(err, item) {
assert.equal(null, err);
assert.equal('world_safe2', item.hello);
})
...
mapReduce = function (map, reduce, opts, cb) { var self = this; if (_.isFunction(opts)) { cb = opts; opts = {}; } if (!opts.out) return safe.back(cb, new Error('the out option parameter must be defined')); if (!opts.out.inline && !opts.out.replace) { return safe.back(cb, new Error('the only supported out options are inline and replace')); } code2fn(opts.scope); var m = {}; function emit(k, v) { var values = m[k]; if (!values) m[k] = [ v ]; else { values.push(v); if (values.length > 1000) values = [ reduce(k, values) ]; } } with (opts.scope || {}) { try { if (map instanceof Code) { with (map.scope) { map = eval('(' + map.code + ')'); } } else map = eval('(' + map + ')'); if (reduce instanceof Code) { with (reduce.scope) { reduce = eval('(' + reduce.code + ')'); } } else reduce = eval('(' + reduce + ')'); if (finalize instanceof Code) { with (finalize.scope) { finalize = eval('(' + finalize.code + ')'); } } else var finalize = eval('(' + opts.finalize + ')'); } catch (e) { return safe.back(cb,e); } } self.find(opts.query, null, { limit: opts.limit, sort: opts.sort }, safe.sure(cb, function (c) { var doc; safe.doUntil( function (cb) { c.nextObject(safe.trap_sure(cb, function (_doc) { doc = _doc; if (doc) map.call(doc); return cb(); })); }, function () { return doc === null; }, safe.trap_sure(cb, function () { _.each(m,function (v, k) { v = v.length > 1 ? reduce(k, v) : v[0]; if (finalize) v = finalize(k, v); m[k] = v; }); var stats = {}; if (opts.out.inline) return process.nextTick(function () { cb(null, _.values(m), stats); // execute outside of trap }); // write results to collection safe.waterfall([ function (cb) { self._tdb.collection(opts.out.replace, { strict: 1 }, function (err, col) { if (err) return cb(null, null); col.drop(cb); }); }, function (arg, cb) { self._tdb.collection(opts.out.replace, {}, cb); }, function (col, cb) { var docs = _.map(m, function (value, key) { return { _id: key, value: value }; }); col.insert(docs, safe.sure(cb, function () { if (opts.verbose) cb(null, col, stats); else cb(null, col); })); } ], cb); } )); // doUntil })); }
n/a
remove = function (query, opts, cb) { var self = this; if (_.isFunction(query)) { cb = query; query = opts = {}; } else if (_.isFunction(opts)) { cb = opts; opts = {}; } opts = opts || {}; if (opts.w>0 && !_.isFunction(cb)) throw new Error("Callback is required"); cb = cb || function () {}; var single = opts.single || false; this._tq.add(function (cb) { self.__find(query, null, 0, single ? 1 : null, null, opts.hint, {}, safe.sure(cb, function (res) { safe.forEachSeries(res, function (pos, cb) { self._get(pos, false, safe.sure(cb, function (obj) { self._put(obj,true,cb); })) }, safe.sure(cb, function () { cb(null,res.length); })) })) },true,cb); }
n/a
rename = function (nname, opts, cb) { var self = this; if (_.isFunction(opts)) { cb = opts; opts = {}; } var err = self._tdb._nameCheck(nname); if (err) return safe.back(cb,err); if (self._tdb._stype=="mem") { delete self._tdb._cols[self._name]; self._tdb._cols[nname] = self; delete self._tdb._mstore[self._name]; self._tdb._mstore[nname] = self._mstore; safe.back(cb,null); } else { self._tq.add(function (cb) { fs.rename(path.join(self._tdb._path,self._name),path.join(self._tdb._path,nname),safe.sure(cb, function () { delete self._tdb._cols[self._name]; self._tdb._cols[nname] = self; self.collectionName = self._name = nname; cb(); })); },true,cb); } }
...
}
if (!!process.platform.match(/^win/)) {
// WINDOWS: unsafe because if something fail while renaming file it will not
// restore automatically
fs.close(self._fd, safe.sure(cb,function() {
fs.close(fd, safe.sure(cb,function() {
fs.unlink(self._filename, safe.sure(cb,function () {
fs.rename(filename, self._filename, safe.sure(cb, function () {
fs.open(self._filename, 'a+', safe.sure(cb, function (fd) {
self._fd = fd;
self._fsize = wpos;
self._store = store;
cb();
}));
}));
...
save = function (doc, opts, cb) { var self = this; cb = _.isFunction(doc)?doc:_.isFunction(opts)?opts:cb; cb = cb || function () {}; doc = doc || {}; opts = opts || {}; this._tq.add(function (cb) { var res = doc; (function(cb) { if (_.isUndefined(doc._id)) { doc._id = new self._tdb.ObjectID(); cb() } else { var id = simplifyKey(doc._id); var pos = self._store[id]; // check if document with this id already exist if (pos) { // if so we need to fetch it to update index self._get(pos.pos, false, safe.sure(cb, function (oldDoc) { // remove current version of doc from indexes _.each(self._idx,function(v,k) { v.del(oldDoc,id); }) res = 1; cb(); })) } else cb(); } })(safe.sure(cb, function () { self._put(doc, false, safe.sure(cb, function () { cb(null,res); // when update return 1 when new save return obj })) })) },true,cb); }
n/a
stats = function (cb) { var self = this; this._tq.add(function (cb) { cb(null, {count:_.size(self._store)}); },false,cb); }
n/a
update = function (query, doc, opts, cb) { var self = this; if (_.isFunction(opts) && cb == null) { cb = opts; } opts = opts || {}; if (opts.w>0 && !_.isFunction(cb)) throw new Error("Callback is required for safe update"); cb = cb || function () {} if (!_.isObject(query)) throw new Error("selector must be a valid JavaScript object"); if (!_.isObject(doc)) throw new Error("document must be a valid JavaScript object"); var multi = opts.multi || false; var updater = new Updater(doc, self._tdb); var $doc = updater.hasAtomic()?null:doc; this._tq.add(function (cb) { self.__find(query, null, 0, multi ? null : 1, null, opts.hint, {}, safe.sure(cb, function (res) { if (res.length==0) { if (opts.upsert) { $doc = $doc || query; $doc = self._tdb._cloneDeep($doc); updater.update($doc,true); if (_.isUndefined($doc._id)) $doc._id = new self._tdb.ObjectID(); self._put($doc, false, safe.sure(cb, function () { cb(null, 1,{updatedExisting:false,upserted:$doc._id,n:1}) })) } else cb(null,0); } else { safe.forEachSeries(res, function (pos, cb) { self._get(pos, false, safe.sure(cb, function (obj) { // remove current version of doc from indexes _.each(self._idx,function(v,k) { v.del(obj,simplifyKey(obj._id)); }) var udoc = $doc; if (!$doc) { udoc = obj; updater.update(udoc); } udoc._id = obj._id; // put will add it back to indexes self._put(udoc, false, cb); })) }, safe.sure(cb,function () { cb(null, res.length, {updatedExisting:true,n:res.length}); })) } })) },true,cb); }
...
item = self._wrapTypes(item);
var sobj = new Buffer(remove?"":JSON.stringify(item));
item = self._unwrapTypes(item);
var key = {_id:simplifyKey(item._id),_uid:self._id,_dt:(new Date()).valueOf()};
if (remove) key._a = "del";
else {
var hash = crypto.createHash('md5');
hash.update(sobj, 'utf8');
key._s = hash.digest('hex');
}
var skey = new Buffer(JSON.stringify(key));
var zeros = "0000000000";
var lobj = sobj.length.toString();
var lkey = skey.length.toString();
lobj = zeros.substr(0,zeros.length - lobj.length)+lobj;
...
function tcursor(tcoll, query, fields, opts) { var self = this; this.INIT = 0; this.OPEN = 1; this.CLOSED = 2; this.GET_MORE = 3; this._query = query; this._c = tcoll; this._i = 0; this._skip = 0; this._limit = null; this._count = null; this._items = null; this._sort = null; this._hint = opts.hint; this._arFields = Object.create ? Object.create(null) : {}; this._fieldsType = null; this._fieldsExcludeId = false; this._fields = Object.create ? Object.create(null) : {}; this.timeout = _.isUndefined(opts.timeout)?true:opts.timeout; _.each(fields, function (v,k) { if (!k && _.isString(v)) { k=v; v = 1; } if (v == 0 || v==1) { // _id treated specially if (k=="_id" && v==0) { self._fieldsExcludeId = true; return; } if (self._fieldsType==null) self._fieldsType = v; if (self._fieldsType==v) { if (k.indexOf("_tiar.")==0) self._arFields[k.substr(6)]=1; else self._fields[k]=v; } else if (!self._err) self._err = new Error("Mixed set of projection options (0,1) is not valid"); } else if (!self._err) self._err = new Error("Unsupported projection option: "+JSON.stringify(v)); }) // _id treated specially if ((self._fieldsType===0 || self._fieldsType===null) && self._fieldsExcludeId) { self._fieldsType = 0; self._fields['_id']=0; } }
n/a
_ensure = function (cb) { var self = this; if (self._items!=null) return process.nextTick(cb); self._c._find(self._query, {}, self._skip, self._limit, self._sort, self._hint, self._arFields, safe.sure_result(cb, function ( data) { self._items = data; self._i=0; })) }
...
tcursor.prototype.nextObject = function (cb) {
var self = this;
if (self._err) {
if (cb) process.nextTick(function () {cb(self._err)})
return;
}
self._ensure(safe.sure(cb, function () {
if (self._i>=self._items.length)
return cb(null, null);
self._get(self._items[self._i], cb)
self._i++;
}))
}
...
_get = function (pos, cb) { var self = this; self._c._get(pos, false, safe.sure(cb, function (obj) { cb(null,self._projectFields(obj)) })) }
...
var ops = {
$range : function () {
this.op = "$range";
this.dump = function () {
return this._args[0].dump() +" in range (" +this._args[1].dump()+","+this._args[2].dump()+","+this
._args[3].dump()+","+this._args[4].dump()+")";
},
this._index = function (index) {
if (index.order > 0) return index.range(this._args[1]._get(), this._args[2]._get
(), this._args[3]._get(), this._args[4]._get());
else return index.range(this._args[2]._get(), this._args[1]._get(), this._args[4]._get(), this._args[3]._get());
}
},
$lt : function () {
this.op = "$lt";
this._index = function (index) {
...
_projectFields = function (obj) { var self = this; if (self._fieldsType!=null) { if (self._fieldsType==0) { applyProjectionDel(obj,self._fields) } else obj = applyProjectionGet(obj, self._fields,self._fieldsExcludeId?{}:{_id:obj._id}) } return obj; }
...
if (opts.upsert) {
$doc = $doc || query;
$doc = self._tdb._cloneDeep($doc);
updater.update($doc,true);
if (_.isUndefined($doc._id))
$doc._id = new self._tdb.ObjectID();
self._put($doc, false, safe.sure(cb, function () {
cb(null,opts.new?c._projectFields($doc):{})
}))
} else
cb();
} else {
self._get(res[0], false, safe.sure(cb, function (obj) {
var robj = (opts.new && !opts.remove) ? obj : self._tdb._cloneDeep(obj);
// remove current version of doc from indexes
...
batchSize = function (v, cb) { var self = this; if (!_.isFinite(v)) { self._err = new Error("batchSize requires an integer"); if (!cb) throw self._err; } if (self._i!=0) { self._err = new Error('Cursor is closed'); if (!cb) throw self._err; } if (cb) process.nextTick(function () {cb(self._err,self)}) return this; }
n/a
close = function (cb) { var self = this; this._items = []; this._i=-1; this._err = null; if (cb) process.nextTick(function () {cb(self._err,self)}) return this; }
...
store[k] = { pos: wpos, sum: rec.sum };
wpos += data.length;
cb();
}));
}));
}, function (err) {
if (err) {
fs.close(fd, function () {
fs.unlink(filename, function () {
cb(err);
});
});
return;
}
if (!!process.platform.match(/^win/)) {
...
count = function (applySkipLimit, cb) { var self = this; if (!cb) { cb = applySkipLimit; applySkipLimit = false; } if (self._err) { if (cb) process.nextTick(function () {cb(self._err)}) return; } if ((!self._skip && self._limit === null) || applySkipLimit) { self._ensure(safe.sure(cb, function () { cb(null, self._items.length); })); return; } if (self._count !== null) { process.nextTick(function () { cb(null, self._count); }); return; } self._c._find(self._query, {}, 0, null, null, self._hint, self._arFields, safe.sure(cb, function (data) { self._count = data.length; cb(null, self._count); })); }
...
var query = "/rooms?homeid="+homeId.toString();
// dirty code to get simulated GET variable
var getId = query.match("homeid=(.*)")[1];
console.log(query, getId)
// typical code to get id from external world
// and use it for queries
rooms.find({_idHome:new engine.ObjectID(getId)})
.count(function (err, count) {
console.log(count);
console.timeEnd("sample");
})
}
})
}
})
...
each = function (cb) { if (!_.isFunction(cb)) throw new Error('Callback is required'); var self = this; if (self.isClosed()) self._err = new Error("Cursor is closed"); if (self._err) { if (cb) process.nextTick(function () {cb(self._err)}) return; } self._ensure(safe.sure(cb, function () { safe.forEachSeries(self._i!=0?self._items.slice(self._i,self._items.length):self._items, function (pos,cb1) { self._get(pos, safe.sure(cb, function (obj) { cb(null,obj) cb1(); })) }, safe.sure(cb, function () { self._i=self._items.length; cb(null, null); })) })) }
...
tempset[this.id].push(this);
}
}
ObjectID.prototype._persist = function (v) {
var oldid = this.id;
if (oldid<0) {
_.each(tempset[oldid], function (oid) {
oid.id = v;
});
delete tempset[oldid];
}
}
ObjectID.prototype._init = function (val) {
...
isClosed = function () { if (!this._items) return false; return this._i==-1 || this._i>=this._items.length; }
...
}
tcursor.prototype.toArray = function (cb) {
if (!_.isFunction(cb))
throw new Error('Callback is required');
var self = this;
if (self.isClosed())
self._err = new Error("Cursor is closed");
if (self._err) {
if (cb) process.nextTick(function () {cb(self._err)})
return;
}
...
limit = function (v, cb) { var self = this; if (!_.isFinite(v)) { self._err = new Error("limit requires an integer"); if (!cb) throw self._err; } if (self._i!=0) { self._err = new Error('Cursor is closed'); if (!cb) throw self._err; } if (!self._err) { this._limit = v==0?null:Math.abs(v); } if (cb) process.nextTick(function () {cb(self._err,self)}) return this; }
...
}
var findOpts = ['limit','sort','fields','skip','hint','timeout','
;batchSize','safe','w'];
tcoll.prototype.findOne = _.rest(function (args) {
var cb = args.pop();
this.find.apply(this,args).limit(1).nextObject(cb);
});
tcoll.prototype.find = function () {
var cb = null, query = {}, opts = {}, fields = null, skip = null, limit = null, sort = null;
var argc = arguments.length;
if (argc>0) {
// guess callback, it is always latest
...
nextObject = function (cb) { var self = this; if (self._err) { if (cb) process.nextTick(function () {cb(self._err)}) return; } self._ensure(safe.sure(cb, function () { if (self._i>=self._items.length) return cb(null, null); self._get(self._items[self._i], cb) self._i++; })) }
...
}
var findOpts = ['limit','sort','fields','skip','hint','timeout','
;batchSize','safe','w'];
tcoll.prototype.findOne = _.rest(function (args) {
var cb = args.pop();
this.find.apply(this,args).limit(1).nextObject(cb);
});
tcoll.prototype.find = function () {
var cb = null, query = {}, opts = {}, fields = null, skip = null, limit = null, sort = null;
var argc = arguments.length;
if (argc>0) {
// guess callback, it is always latest
...
rewind = function () { this._i=0; return this; }
n/a
setReadPreference = function (the, cb) { var self = this; if (self._err) { if (cb) process.nextTick(function () {cb(self._err)}) return; } return this; }
n/a
skip = function (v, cb) { var self = this; if (!_.isFinite(v)) { self._err = new Error("skip requires an integer"); if (!cb) throw self._err; } if (self._i!=0) { self._err = new Error('Cursor is closed'); if (!cb) throw self._err; } if (!self._err) this._skip = v; if (cb) process.nextTick(function () {cb(self._err,self)}) return this; }
...
limit = limit || opts.limit || null;
fields = fields || opts.fields || null;
sort = sort || opts.sort || null;
var c = new tcursor(this,query, fields, opts);
if (skip) c.skip(skip);
if (limit) c.limit(limit);
if (sort) c.sort(sort);
if (cb)
cb(null, c)
else
return c;
}
...
sort = function (v, d, cb) { var self = this; if (_.isFunction(d)) { cb = d; d = null; } if (self._i!=0) this._err = new Error('Cursor is closed'); if (!this._err) { this._err = _.attempt(function () { self.sortValue = v; // just to pass contrib test self._sort = parseSortList(v, d); }); } if (cb) process.nextTick(function () {cb(self._err, self)}) return this; }
...
cb = options;
options = {};
}
cb = cb || function () {};
options = options || {};
var c = new tcursor(this,{},{},{});
c.sort(obj);
if (c._err)
return safe.back(cb,c._err);
var key = c._sort;
if (key===null)
return safe.back(cb,new Error("No fields are specified"));
...
stream = function (options) { return new CursorStream(this, options); }
n/a
toArray = function (cb) { if (!_.isFunction(cb)) throw new Error('Callback is required'); var self = this; if (self.isClosed()) self._err = new Error("Cursor is closed"); if (self._err) { if (cb) process.nextTick(function () {cb(self._err)}) return; } self._ensure(safe.sure(cb, function () { safe.mapSeries(self._i!=0?self._items.slice(self._i,self._items.length):self._items, function (pos,cb) { self._get(pos, safe.sure(cb, function (obj) { cb(null, obj); })); }, safe.sure(cb, function (res) { self._i=self._items.length; cb(null, res); })) })) }
n/a
function tdb(path_, opts, gopts) { this._gopts = gopts; this._path = path.resolve(path_); this._cols = Object.create ? Object.create(null) : {}; this._name = opts.name || path.basename(path_); this._stype = gopts.memStore?"mem":"fs"; if (this._stype=="mem") mstore[path_] = this._mstore = mstore[path_] || Object.create ? Object.create(null) : {}; // mongodb compat variables this.openCalled = false; }
n/a
_cloneDeep = function (obj) { var self = this; return _.cloneDeepWith(obj, function (c) { if (c instanceof self.ObjectID) return new c.constructor(c.toString()); if (c instanceof self.Binary) return new c.constructor(new Buffer(c.value(true))); }); }
...
tcache.prototype.clear = function () {
for (var i=0; i<this._cache.length; i++) {
this._cache[i]={k:null};
}
};
tcache.prototype.set = function (k,v) {
this._cache[k%this.size] = {k:k,v:this._tdb._cloneDeep(v)};
};
tcache.prototype.unset = function (k) {
var c = this._cache[k%this.size];
if (c.k==k)
this._cache[k%this.size]={k:null};
};
...
_collection = function (cname, opts, create, cb) { var err = this._nameCheck(cname); if (!cb) { cb = opts; opts = {}; } cb = cb || function () {}; if (err) return safe.back(cb, err); var self = this; var c = self._cols[cname]; if (c) { if (opts.strict && create) safe.back(cb, new Error("Collection " + cname + " already exists. Currently in safe mode.")); else safe.back(cb, null, c); return c; } c = new tcoll(this); self._cols[cname] = c; c.init(this, cname, opts, create, function (err) { if (err) { delete self._cols[cname]; cb(err); } else cb(null, c); }); return c; }
...
if (!c)
return safe.back(args[args.length - 1], new Error("Collection doesn't exists"));
c.createIndex.apply(c, args);
});
tdb.prototype.collection = function (cname, opts, cb) {
return this._collection(cname, opts,false, cb)
}
tdb.prototype.createCollection = function (cname, opts, cb) {
return this._collection(cname, opts,true, cb)
}
tdb.prototype._nameCheck = function (cname) {
...
_nameCheck = function (cname) { var err = null; if (!_.isString(cname)) err = new Error("collection name must be a String"); if (!err && cname.length==0) err = new Error("collection names cannot be empty"); if (!err && cname.indexOf("$")!=-1) err = new Error("collection names must not contain '$'"); if (!err) { var di = cname.indexOf("."); if (di==0 || di==cname.length-1) err = new Error("collection names must not start or end with '.'"); } if (!err && cname.indexOf("..")!=-1) err = new Error("collection names cannot be empty"); return err; }
...
tcoll.prototype.rename = function (nname, opts, cb) {
var self = this;
if (_.isFunction(opts)) {
cb = opts;
opts = {};
}
var err = self._tdb._nameCheck(nname);
if (err)
return safe.back(cb,err);
if (self._tdb._stype=="mem") {
delete self._tdb._cols[self._name];
self._tdb._cols[nname] = self;
delete self._tdb._mstore[self._name];
self._tdb._mstore[nname] = self._mstore;
...
close = function (forceClose, cb) { var self = this; if (cb==null) cb = forceClose; cb = cb || function () {}; // stop any further operations on current collections safe.eachOf(self._cols, function (c, k, cb) { c._stop(cb) }, safe.sure(cb, function () { // and clean list self._cols = Object.create ? Object.create(null) : {}; this.openCalled = false; safe.back(cb,null,this); })) }
...
store[k] = { pos: wpos, sum: rec.sum };
wpos += data.length;
cb();
}));
}));
}, function (err) {
if (err) {
fs.close(fd, function () {
fs.unlink(filename, function () {
cb(err);
});
});
return;
}
if (!!process.platform.match(/^win/)) {
...
collection = function (cname, opts, cb) { return this._collection(cname, opts,false, cb) }
...
```javascript
var Db = require('mongodb').Db,
Server = require('mongodb').Server,
assert = require('assert');
var db = new Db('test', new Server('localhost', 27017));
var collection = db.collection("batch_document_insert_collection_safe");
collection.insert([{hello:'world_safe1'}
, {hello:'world_safe2'}], {w:1}, function(err, result) {
assert.equal(null, err);
collection.findOne({hello:'world_safe2'}, function(err, item) {
assert.equal(null, err);
assert.equal('world_safe2', item.hello);
...
collectionNames = function (opts, cb) { var self = this; if (_.isUndefined(cb)) { cb = opts; opts = {}; } if (this._stype=="mem") { cb(null,_(self._mstore).keys().map(function (e) { return opts.namesOnly?e:{name:self._name+"."+e};}).value()); } else { fs.readdir(self._path, safe.sure(cb,function(files) { // some collections ca be on disk and some only in memory, we need both files = _.union(files,_.keys(self._cols)); cb(null,_(files) .reject(function (e) {return /^\./.test(e);}) // ignore hidden linux alike files .map(function (e) { return opts.namesOnly?e:{name:self._name+"."+e};}) .value()); })); } }
...
.value());
}));
}
};
tdb.prototype.collections = function (cb) {
var self = this;
self.collectionNames({namesOnly:1},safe.sure(cb, function (names) {
safe.forEach(names, function (cname, cb) {
self.collection(cname, cb);
},safe.sure(cb, function () {
cb(null, _.values(self._cols));
}));
}));
};
...
collections = function (cb) { var self = this; self.collectionNames({namesOnly:1},safe.sure(cb, function (names) { safe.forEach(names, function (cname, cb) { self.collection(cname, cb); },safe.sure(cb, function () { cb(null, _.values(self._cols)); })); })); }
...
cb(null,true);
}
}))
}
tdb.prototype.dropDatabase = function (cb) {
var self = this;
self.collections(safe.sure(cb, function(collections) {
safe.forEach(collections, function (c, cb) {
self.dropCollection(c.collectionName,cb);
},cb);
}));
};
tdb.prototype.compactDatabase = function (cb) {
...
compactDatabase = function (cb) { var self = this; self.collections(safe.sure(cb, function(collections) { safe.forEach(collections, function (c, cb) { c.compactCollection(cb); },cb); })); }
n/a
createCollection = function (cname, opts, cb) { return this._collection(cname, opts,true, cb) }
n/a
createIndex = function () { var args = arguments, index = -1, length = nativeMax(args.length - start, 0), array = Array(length); while (++index < length) { array[index] = args[start + index]; } switch (start) { case 0: return func.call(this, array); case 1: return func.call(this, args[0], array); case 2: return func.call(this, args[0], args[1], array); } var otherArgs = Array(start + 1); index = -1; while (++index < start) { otherArgs[index] = args[index]; } otherArgs[start] = array; return apply(func, this, otherArgs); }
n/a
dropCollection = function (cname, cb) { var self = this; var c = this._cols[cname]; if (!c) { var err = new Error("ns not found"); if (cb) return safe.back(cb, err) else throw new err; } c._stop(safe.sure(cb, function (ondisk) { delete self._cols[cname]; if (ondisk) fs.unlink(path.join(self._path,cname),safe.sure(cb, function () { cb(null, true) })) else { if (self._stype=="mem") delete self._mstore[cname]; cb(null,true); } })) }
...
}));
}
});
}));
};
tcoll.prototype.drop = function (cb) {
this._tdb.dropCollection(this._name,cb);
};
tcoll.prototype.rename = function (nname, opts, cb) {
var self = this;
if (_.isFunction(opts)) {
cb = opts;
opts = {};
...
dropDatabase = function (cb) { var self = this; self.collections(safe.sure(cb, function(collections) { safe.forEach(collections, function (c, cb) { self.dropCollection(c.collectionName,cb); },cb); })); }
n/a
open = function (options, cb) { // actually do nothing for now, we are inproc // so nothing to open/close... collection will keep going on their own if (cb==null) cb = options; cb = cb || function () {}; this.openCalled = true; safe.back(cb,null,this) }
...
```javascript
var engine = require('./engine');
var db = engine.getDB();
console.time("sample")
db.open(function(err,db) {
db.collection("homes", function (err, homes) {
// it's fine to create ObjectID in advance
// NOTE!!! we get class through engine because its type
// can depends on database type
var homeId = new engine.ObjectID();
// but with TingoDB.ObjectID righ here it will be negative
// which means temporary. However it's unique and can be used for
...
renameCollection = function (on, nn, opts, cb) { if (cb==null) { cb = opts; opts = {}; } cb = cb || safe.noop; var old = this._cols[on]; if (old) old.rename(nn, {}, cb) else safe.back(cb); }
n/a
function tindex(key, tcoll, options, name) { this.options = options || {}; this.name = name || key+'_'; this._unique = this.options.unique || false; this._c = tcoll; this._nuls = Object.create ? Object.create(null) : {}; this._array = this.options._tiarr || false; this.key = key[0][0]; this.order = key[0][1]; if (key.length > 1) this._sub = key.slice(1); this._bp = BPlusTree.create({ sort: this.order, order: 100 }); var getter = new tcoll._tdb.Finder.field(this.key); this._get = new Function("obj", "return " + (this._array ? getter.native3() : getter.native())); }
n/a
_del = function (k, v, o) { if (this._sub) { var s = (_.isNull(k) || _.isUndefined(k)) ? this._nuls[v] : this._bp.get(k); if (s) s.del(o, v); return; } delete this._nuls[v]; if (this._unique) { this._bp.del(k); } else { var l = this._bp.get(k); if (l) { var i = l.indexOf(v); if (i != -1) l.splice(i, 1); if (l.length===0) this._bp.del(k); } } }
...
};
tindex.prototype.del = function (k_,v) {
var self = this;
var k = this._get(k_);
if (_.isArray(k)) {
_.each(k, function (k1) {
self._del(k1, v, k_);
});
}
else
return this._del(k, v, k_);
};
tindex.prototype._del = function (k, v, o) {
...
_set = function (k, v, o) { if (this._sub) { var s = (_.isNull(k) || _.isUndefined(k)) ? this._nuls[v] : this._bp.get(k); if (!s) { s = new tindex(this._sub, this._c, this.options, this.name + '_' + k); if (_.isNull(k) || _.isUndefined(k)) this._nuls[v] = s; else this._bp.set(k, s); } s.set(o, v); return; } if (_.isNull(k) || _.isUndefined(k)) { this._nuls[v]=v; return; } if (this._unique) return this._bp.set(k,v); else { var l = this._bp.get(k); var n = l || []; n.push(v); if (!l) this._bp.set(k,n); } }
...
if (check) {
if (!this._sub && this._unique && this._bp.get(k) !== null)
throw new Error("duplicate key error index");
} else {
if (_.isArray(k)) {
_.each(k, function (k1) {
self._set(k1, v, k_);
});
}
else
return this._set(k, v, k_);
}
};
...
all = function (order, shallow) { var a = this._bp.all(); var n = _.values(this._nuls); var r = this.order > 0 ? _.union(n, a) : _.union(a, n); if (order && order.length > 0) { if (order[0] != this.order) r = r.reverse(); order = order.slice(1); } if (this._sub) return shallow ? r : _(r).map(function (i) { return i.all(order); }).flattenDeep().value(); return this._unique?r:_.flatten(r); }
...
return s;
}
},
$ne : function () {
this.op = "$ne";
this._index = function (index) {
var m = index.match(this._args[1]._get());
var a = index.all();
return _.difference(a,m);
},
this._ex = function (f) {
if (this.fields()[f]!=null)
return 1;
else
return -1;
...
clear = function () { if (this.count()) this._bp = BPlusTree.create({ sort: this.order, order: 100 }); }
...
// var BPlusTree = require('./bplustree');
function tcache (tdb, size) {
this._tdb = tdb;
this.size = size || 1000;
this._cache = [];
this._cache.length = this.size;
this.clear();
}
tcache.prototype.clear = function () {
for (var i=0; i<this._cache.length; i++) {
this._cache[i]={k:null};
}
};
...
count = function () { var c = 0; this._bp.each(function (k,v) { c += this._sub ? v.count() : v.length; }); return c; }
...
var query = "/rooms?homeid="+homeId.toString();
// dirty code to get simulated GET variable
var getId = query.match("homeid=(.*)")[1];
console.log(query, getId)
// typical code to get id from external world
// and use it for queries
rooms.find({_idHome:new engine.ObjectID(getId)})
.count(function (err, count) {
console.log(count);
console.timeEnd("sample");
})
}
})
}
})
...
del = function (k_, v) { var self = this; var k = this._get(k_); if (_.isArray(k)) { _.each(k, function (k1) { self._del(k1, v, k_); }); } else return this._del(k, v, k_); }
...
}
// update index
_.each(self._idx,function(v,k) {
if (!remove)
v.set(item,key._id);
else
v.del(item,key._id);
})
cb(null);
}, true, cb);
}
tcoll.prototype._putFS = function (item, remove, cb) {
...
depth = function () { return this._sub ? this._sub.length + 1 : 1; }
...
// compare each potential index with each other
_.each(pi,function (v1, i1) {
_.each(pi,function (v2, i2) {
if (i1 == i2) return;
// compare the set of possible keys for both indexes
if (_.union(v1.k, v2.k).length == v1.k.length) {
// key for v2 is a subset of key for v1, check equality
if (v1.k.length == v2.k.length && v1.i.depth() > v2.i.depth()) {
// keys are equal, but the depth of v2 is lower;
// v2 is preferable, strike out v1
pi.splice(i1, 1);
} else {
// in other two cases v1 is preferable, strike out v2
pi.splice(i2, 1);
}
...
fields = function () { var result = [ [ this.key, this.order ] ]; if (this._sub) result = result.concat(this._sub); return result; }
...
$lt : function () {
this.op = "$lt";
this._index = function (index) {
if (index.order > 0) return index.range(null, this._args[1]._get(), false, true);
else return index.range(this._args[1]._get(), null, true, false);
};
this._ex = function (f) {
if (this.fields()[f]!=null)
return 1;
else
return -1;
}
this.dump = function () {
return this._args[0].dump() +" < " +this._args[1].dump()
}
...
inspect = function (depth) { return '[Index ' + this.name + ']'; }
n/a
match = function (k) { var m = this._bp.get(k); if (!m) return []; return this._unique || this._sub ? [ m ] : m; }
...
console.log(room[0]);
i--;
if (i==0) {
// now lets assume we serving request like
// /rooms?homeid=_some_string_
var query = "/rooms?homeid="+homeId.toString();
// dirty code to get simulated GET variable
var getId = query.match("homeid=(.*)")[1];
console.log(query, getId)
// typical code to get id from external world
// and use it for queries
rooms.find({_idHome:new engine.ObjectID(getId)})
.count(function (err, count) {
console.log(count);
console.timeEnd("sample");
...
nuls = function () { return _.values(this._nuls); }
...
},
$exists : function () {
this.op = "$exists";
this._index = function (index) {
if (this._args[1]._get())
return index.values();
else
return index.nuls();
},
this._ex = function (f) {
if (this.fields()[f]!=null)
return 1;
else
return -1;
}
...
range = function (s, e, si, ei) { var r = this._bp.rangeSync(s,e,si,ei); return this._unique || this._sub ? r : _.flatten(r); }
...
var ops = {
$range : function () {
this.op = "$range";
this.dump = function () {
return this._args[0].dump() +" in range (" +this._args[1].dump()+","+this._args[2].dump()+","+this
._args[3].dump()+","+this._args[4].dump()+")";
},
this._index = function (index) {
if (index.order > 0) return index.range(this._args[1]._get(), this._args[2]._get
(), this._args[3]._get(), this._args[4]._get());
else return index.range(this._args[2]._get(), this._args[1]._get(), this._args[4]._get(), this._args[3]._get());
}
},
$lt : function () {
this.op = "$lt";
this._index = function (index) {
...
set = function (k_, v, check) { var self = this; var k = this._get(k_); if (check) { if (!this._sub && this._unique && this._bp.get(k) !== null) throw new Error("duplicate key error index"); } else { if (_.isArray(k)) { _.each(k, function (k1) { self._set(k1, v, k_); }); } else return this._set(k, v, k_); } }
...
}
this._tq = new wqueue(100, function (cb) {
// update indexes
safe.eachOfSeries(self._store, function (rec, k, cb) {
self._get(rec.pos, false, safe.sure(cb, function (obj) {
var id = simplifyKey(obj._id);
_.each(self._idx,function(v, k) {
v.set(obj, id);
});
cb();
}));
}, cb);
});
self.ensureIndex({_id: 1}, {name: '_id_', unique: true}, cb);
};
...
values = function () { var r = this._bp.all(); return this._unique || this._sub ? r : _.flatten(r); }
...
return s;
}
},
$exists : function () {
this.op = "$exists";
this._index = function (index) {
if (this._args[1]._get())
return index.values();
else
return index.nuls();
},
this._ex = function (f) {
if (this.fields()[f]!=null)
return 1;
else
...
function CursorStream(cursor, options) { if(!(this instanceof CursorStream)) return new CursorStream(cursor); options = options ? options : {}; Stream.call(this); this.readable = true; this.paused = false; this._cursor = cursor; this._destroyed = null; this.options = options; // give time to hook up events var self = this; process.nextTick(function() { self._init(); }); }
n/a
_init = function () { if (this._destroyed) return; this._next(); }
...
var inproc_id = -1;
var tempset = {};
function ObjectID(val) {
// every new instance will have temporary inproc unique value
// minus sign will let know to db layer that value is temporary
// and need to be replaced
this.id = --inproc_id;
this._init(val);
// we need to keep track all temporary instances with goal
// to update them at later tima
if (this.id<0) {
if (!tempset[this.id])
tempset[this.id]=[this];
else
tempset[this.id].push(this);
...
_next = function () { if(this.paused || this._destroyed) return; var self = this; // Get the next object processor(function() { if(self.paused || self._destroyed) return; self._cursor.nextObject(function (err, doc) { self._onNextObject(err, doc); }); }); }
...
/**
* Initialize the cursor.
* @ignore
* @api private
*/
CursorStream.prototype._init = function () {
if (this._destroyed) return;
this._next();
}
/**
* Pull the next document from the cursor.
* @ignore
* @api private
*/
...
_onNextObject = function (err, doc) { if(err) return this.destroy(err); // when doc is null we hit the end of the cursor //if(!doc && (this._cursor.state == 1 || this._cursor.state == 2)) { if(!doc) { this.emit('end') return this.destroy(); } else if(doc) { var data = _.isFunction(this.options.transform) ? this.options.transform(doc) : doc; this.emit('data', data); this._next(); } }
...
var self = this;
// Get the next object
processor(function() {
if(self.paused || self._destroyed) return;
self._cursor.nextObject(function (err, doc) {
self._onNextObject(err, doc);
});
});
}
/**
* Handle each document as its returned from the cursor.
* @ignore
...
destroy = function (err) { if (this._destroyed) return; this._destroyed = true; this.readable = false; this._cursor.close(); if(err) { this.emit('error', err); } this.emit('close'); }
...
/**
* Handle each document as its returned from the cursor.
* @ignore
* @api private
*/
CursorStream.prototype._onNextObject = function (err, doc) {
if(err) return this.destroy(err);
// when doc is null we hit the end of the cursor
//if(!doc && (this._cursor.state == 1 || this._cursor.state == 2)) {
if(!doc) {
this.emit('end')
return this.destroy();
} else if(doc) {
...
pause = function () { this.paused = true; }
n/a
resume = function () { var self = this; // Don't do anything if we are not paused if(!this.paused) return; //if(!this._cursor.state == 3) return; process.nextTick(function() { self.paused = false; // Only trigger more fetching if the cursor is open self._next(); }) }
n/a
intersectIndexes = function (indexes, base) { // do intersection of indexes using hashes var ops = [], i = 0; // convert to hashes for (i=0; i<indexes.length; i++) { var ids = {}; _.each(indexes[i], function (id) { ids[id]=id; }) ops.push(ids); } // find minimal one if (_.isUndefined(base)) { base = 0; for (i=0; i<ops.length; i++) { if (ops[i].length<ops[base].length) base = i; } } // iterate over it var m = []; _.each(indexes[base], function (id) { var match = true; for (var i=0; i<ops.length; i++) { if (i==base) continue; if (!ops[i][id]) { match = false; break; } } if (match) m.push(id); }) return m; }
...
// so the result is sorted, but in reverse direction
p.reverse();
sort = null;
}
}
} else {
// TODO: use sort index as intersect base to speedup sorting
p = tutils.intersectIndexes(p);
}
// nowe we have ids, need to convert them to positions
range = _.map(p,function (_id) {
return self._store[_id].pos;
})
} else {
if (si) {
...
function wqueue(limit, first) { this.limit = limit || 100; this._rc = 0; this._q = []; this._blocked = false; this._stoped = false; this._tc = -1; this.first = first || function (cb) {cb()}; }
n/a
_exec = function (task, block, cb) { var self = this; self._blocked = block; self._tc++; if (self._tc==0) { self._blocked = true; self.first(function (err) { if (err) { // restore to initial state on error self._blocked = false; self._tc--; return cb(err); } self._exec(task,block,cb) }) } else { self._rc++; task(function () { cb.apply(this,arguments) self._rc--; if (self._rc==0) self._blocked = false; self._ping(); }) } }
...
self.first(function (err) {
if (err) {
// restore to initial state on error
self._blocked = false;
self._tc--;
return cb(err);
}
self._exec(task,block,cb)
})
} else {
self._rc++;
task(function () {
cb.apply(this,arguments)
self._rc--;
if (self._rc==0)
...
_ping = function () { var self = this; process.nextTick(function () { while (self._q.length>0 && self._rc<self.limit && !self._blocked && (!self._q[0].block || self._rc==0) ) { var t = self._q.splice(0,1)[0]; if (self._stoped) t.cb(new Error("Database is closed")); else { self._exec(t.task, t.block, t.cb) } } }) }
...
this._stoped = false;
this._tc = -1;
this.first = first || function (cb) {cb()};
}
wqueue.prototype.add = function(task,block,cb) {
this._q.push({task:task,block:block, cb:cb});
this._ping();
}
wqueue.prototype._exec = function (task,block,cb) {
var self = this;
self._blocked = block;
self._tc++;
if (self._tc==0) {
...
add = function (task, block, cb) { this._q.push({task:task,block:block, cb:cb}); this._ping(); }
...
});
});
self.ensureIndex({_id: 1}, {name: '_id_', unique: true}, cb);
};
tcoll.prototype.compactCollection = function (cb) {
var self = this;
self._tq.add(function (cb) {
self._compact(safe.sure(cb, function () {
self._cache.clear();
self._refreshIndexes(cb);
}));
}, true, cb);
};
...