Skip to content

Commit

Permalink
Refactored waterline core.
Browse files Browse the repository at this point in the history
  • Loading branch information
mikermcneil committed Apr 7, 2013
1 parent 623481b commit bfa1401
Show file tree
Hide file tree
Showing 10 changed files with 359 additions and 422 deletions.
63 changes: 44 additions & 19 deletions .jshintrc
Original file line number Diff line number Diff line change
@@ -1,22 +1,47 @@
{
"bitwise": true,
"camelcase": true,
"node": true,
"undef": true,
"unused": true,
"curly": true,
"immed": true,
"latedef": true,
"noarg": true,
"noempty": true,
"plusplus": true,
"quotmark": "single",
"trailing": true,
"asi": false,
"eqnull": true,
"eval": true,
// To fix column positions for JSHint errors you may want to add `"indent": 1` to your
// **User** "jshint_options". This issue affects users with tabs for indentation.
// This fix was reverted due to a conflict with using the `"white": true` option.
// "indent": 1,
"evil": true,
"regexdash": true,
"browser": true,
"wsh": true,
"sub": true,
"supernew": true,
"eqeqeq": true,
"eqnull": true

// Suppress warnings about mixed tabs and spaces
"smarttabs": true,

// Suppress warnings about trailing whitespace
"trailing": false,

// Suppress warnings about the use of expressions where fn calls or assignments are expected
"expr": true,

// Suppress warnings about using functions inside loops (useful for inifinityCounters)
"loopfunc": true,

// Suppress warnings about using assignments where conditionals are expected
"boss": true

// "bitwise": true,
// "camelcase": true,
// "node": true,
// "undef": true,
// "unused": true,
// "curly": true,
// "immed": true,
// "latedef": true,
// "noarg": true,
// "noempty": true,
// "plusplus": true,
// "quotmark": "single",
// "trailing": true,
// "asi": false,
// "eqnull": true,
// "eval": true,
// "sub": true,
// "supernew": true,
// "eqeqeq": true,
// "eqnull": true
}
245 changes: 18 additions & 227 deletions lib/waterline/adapter.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
var async = require('async');
var _ = require('underscore');
var parley = require('parley');
var uuid = require('node-uuid');


// (for sorting)
var MAX_INTEGER = 4294967295;

// Read global waterline config
var waterlineConfig = require('./config.js');
Expand All @@ -17,15 +11,8 @@ var normalize = require('./normalize.js');
module.exports = function(adapterDef, cb) {
var self = this;


// Absorb any properties/methods from definition that are undefined here

// this.identity = adapterDef.identity;
// this.syncable = adapterDef.syncable;

// Pass through defaults from adapterDef
// this.defaults = adapterDef.defaults;

// (absorbs any properties/methods from definition that are undefined here)
_.defaults(this,adapterDef);


Expand Down Expand Up @@ -261,17 +248,13 @@ module.exports = function(adapterDef, cb) {
}

// Default behavior
// Warning: Inefficient! App-level tranactions should not be used for built-in compound queries.
// WARNING: Not transactional! (unless your data adapter is)
else {
// Create transaction name based on collection
// var transactionName = collectionName+'.waterline.default.create';
// self.transaction(transactionName, function (err,done) {
self.find(collectionName, criteria, function(err, result) {
if(err) cb(err);
else if(result) cb(null, result);
else self.create(collectionName, values, cb);
});
// }, cb);
self.find(collectionName, criteria, function(err, result) {
if(err) cb(err);
else if(result) cb(null, result);
else self.create(collectionName, values, cb);
});
}

// TODO: Return model instance Promise object for joins, etc.
Expand All @@ -292,13 +275,11 @@ module.exports = function(adapterDef, cb) {
}

// Default behavior
// WARNING: Not transactional! (unless your data adapter is)
else {
// Create transaction name based on collection
// self.transaction(collectionName+'.waterline.default.create', function (err,done) {
async.forEachSeries(valuesList, function (values,cb) {
self.create(collectionName, values, cb);
}, cb);
// },cb);
async.forEachSeries(valuesList, function (values,cb) {
self.create(collectionName, values, cb);
}, cb);
}
};
// If an optimized findOrCreateEach exists, use it, otherwise use an asynchronous loop with create()
Expand Down Expand Up @@ -344,11 +325,6 @@ module.exports = function(adapterDef, cb) {




//////////////////////////////////////////
// Streaming
//////////////////////////////////////////

// stream.write() is used to send data
// Must call stream.end() to complete stream
this.stream = function (collectionName, criteria, stream) {
Expand All @@ -357,185 +333,23 @@ module.exports = function(adapterDef, cb) {
};


// Mix in transactions
var transactions = require('./transaction')(adapterDef);
this.transaction = transactions.transaction;
this.getAutoIncrementAttribute = transactions.getAutoIncrementAttribute;

//////////////////////////////////////////////////////////////////////
// Concurrency
//////////////////////////////////////////////////////////////////////

/**
* App-level transaction
* @transactionName a unique identifier for this transaction
* @atomicLogic the logic to be run atomically
* @afterUnlock (optional) the function to trigger after unlock() is called
*/
this.transaction = function(transactionName, atomicLogic, afterUnlock) {
var self = this;

// Use the adapter definition's transaction() if specified
if (adapterDef.transaction) return adapterDef.transaction(transactionName, atomicLogic, afterUnlock);
else if (!adapterDef.commitLog) return afterUnlock("Cannot process transaction. Commit log disabled in adapter, and no custom transaction logic is defined.");

// Generate unique lock
var newLock = {
uuid: uuid.v4(),
name: transactionName,
atomicLogic: atomicLogic,
afterUnlock: afterUnlock
};

// write new lock to commit log
if (!this.transactionCollection) {
console.error("Trying to start transaction ("+transactionName+") in collection:",this.identity);
console.error("But the transactionCollection is: ",this.transactionCollection);
return afterUnlock("Transaction collection not defined!");
}
this.transactionCollection.create(newLock, function afterCreatingTransaction(err, newLock) {
if(err) return atomicLogic(err, function() {
throw err;
});

// Check if lock was written, and is the oldest with the proper name
self.transactionCollection.findAll(function afterLookingUpTransactions(err, locks) {
if(err) return atomicLogic(err, function() {
throw err;
});

var conflict = false;
_.each(locks, function eachLock (entry) {

// If a conflict IS found, respect the oldest
if(entry.name === newLock.name &&
entry.uuid !== newLock.uuid &&
entry.id < newLock.id) conflict = entry;
});

// If there are no conflicts, the lock is acquired!
if(!conflict) acquireLock(newLock);

// Otherwise, get in line: a lock was acquired before mine, do nothing


});
});
};

/**
* acquireLock() is run after the lock is acquired, but before passing control to the atomic app logic
*
* @newLock the object representing the lock to acquire
* @name name of the lock
* @atomicLogic the transactional logic to be run atomically
* @afterUnlock (optional) the function to run after the lock is subsequently released
*/
var acquireLock = function(newLock) {

var warningTimer = setTimeout(function() {
console.error("Transaction :: " + newLock.name + " is taking an abnormally long time (> " + waterlineConfig.transactionWarningTimer + "ms)");
}, waterlineConfig.transactionWarningTimer);

newLock.atomicLogic(null, function unlock () {
clearTimeout(warningTimer);
releaseLock(newLock,arguments);
});
};


// releaseLock() will grant pending lock requests in the order they were received
//
// @currentLock the lock currently acquired
// @afterUnlockArgs the arguments to pass to the afterUnlock function
var releaseLock = function(currentLock, afterUnlockArgs) {

var cb = currentLock.afterUnlock;

// Get all locks
self.transactionCollection.findAll(function afterLookingUpTransactions(err, locks) {
if(err) return cb && cb(err);

// Determine the next user in line
// (oldest lock that isnt THIS ONE w/ the proper transactionName)
var nextInLine = getNextLock(locks, currentLock);

// Remove current lock
self.transactionCollection.destroy({
uuid: currentLock.uuid
}, function afterLockReleased (err) {
if(err) return cb && cb(err);

// Trigger unlock's callback if specified
// > NOTE: do this before triggering the next queued transaction
// to prevent transactions from monopolizing the event loop
cb && cb.apply(null, afterUnlockArgs);

// Now allow the nextInLine lock to be acquired
// This marks the end of the previous transaction
nextInLine && acquireLock(nextInLine);
});
});
};



// Find this collection's auto-increment field and return its name
this.getAutoIncrementAttribute = function (collectionName, cb) {
this.describe(collectionName, function (err,attributes) {
var attrName, done=false;
_.each(attributes, function(attribute, aname) {
if(!done && _.isObject(attribute) && attribute.autoIncrement) {
attrName = aname;
done = true;
}
});

cb(null, attrName);
});
};

// Share this method with the child adapter
adapterDef.getAutoIncrementAttribute = this.getAutoIncrementAttribute;

// If @collectionName and @otherCollectionName are both using this adapter, do a more efficient remote join.
// (By default, an inner join, but right and left outer joins are also supported.)
this.join = function(collectionName, otherCollectionName, key, foreignKey, left, right, cb) {
adapterDef.join ? adapterDef.join(collectionName, otherCollectionName, key, foreignKey, left, right, cb) : cb();
// TODO
adapterDef.join ? adapterDef.join(collectionName, otherCollectionName, key, foreignKey, left, right, cb) : cb('Join not supported!');
};

// Sync given collection's schema with the underlying data model
// Controls whether database is dropped and recreated when app starts,
// or whether waterline will try and synchronize the schema with the app models.
this.sync = {

// Drop and recreate collection
drop: function(collection, cb) {
var self = this;
this.drop(collection.identity, function afterDrop (err, data) {
if(err) return cb(err);
else self.define(collection.identity, collection, cb);
});
},

// Alter schema
alter: function(collection, cb) {
var self = this;

// Check that collection exists--
this.describe(collection.identity, function afterDescribe (err, attrs) {
attrs = _.clone(attrs);
if(err) return cb(err);

// if it doesn't go ahead and add it and get out
else if(!attrs) return self.define(collection.identity, collection, cb);

// Otherwise, if it *DOES* exist, we'll try and guess what changes need to be made
else self.alter(collection.identity, require('./augmentAttributes')(collection.attributes,collection), cb);
});
},

// Do nothing to the underlying data model
safe: function (collection,cb) {
cb();
}
};
this.sync = require('./sync.js');

// Bind adapter methods to self
_.bindAll(adapterDef);
Expand Down Expand Up @@ -574,26 +388,3 @@ module.exports = function(adapterDef, cb) {

return cb && cb(null, self);
};


// Find the oldest lock with the same transaction name
// ************************************************************
// this function wouldn't be necessary if we could....
// TODO: call find() with the SORT option
// ************************************************************
function getNextLock(locks, currentLock) {
var nextLock;
_.each(locks, function(lock) {

// Ignore locks with different transaction names
if (lock.name !== currentLock.name) return;

// Ignore current lock
if (lock.uuid === currentLock.uuid) return;

// Find the lock with the smallest id
var minId = nextLock ? nextLock.id : MAX_INTEGER;
if (lock.id < minId) nextLock = lock;
});
return nextLock;
}
Loading

0 comments on commit bfa1401

Please sign in to comment.