io = function (config) { this._currentState = STATES.CLOSED this.constants = C this._loadConfig(config) this._connectionEndpoint = null this._messageProcessor = null this._messageDistributor = null this._eventHandler = null this._rpcHandler = null this._recordHandler = null this._plugins = [ 'messageConnector', 'storage', 'cache', 'authenticationHandler', 'permissionHandler' ] }
n/a
io.config_permission_handler = function (options, config) { this._ruleCache = new RuleCache(options) this._options = options this._permissionOptions = options.permission.options this._config = null this._recordHandler = null this.isReady = false this.type = `valve permissions loaded from ${this._permissionOptions.path}` this._optionsValid = true const maxRuleIterations = options.permission.options.maxRuleIterations if (maxRuleIterations !== undefined && maxRuleIterations < 1) { this._optionsValid = false process.nextTick(() => this.emit('error', 'Maximum rule iteration has to be at least one ')) } else if (config) { this.useConfig(config) } }
n/a
io.dependency_initialiser = function (options, name) { this.isReady = false this._options = options this._dependency = options[name] this._name = name this._timeout = null if (typeof this._dependency.on !== 'function' && typeof this._dependency.isReady === 'undefined') { const errorMessage = `${this._name} needs to implement isReady or be an emitter` this._options.logger.log(C.LOG_LEVEL.ERROR, C.EVENT.PLUGIN_INITIALIZATION_ERROR, errorMessage) const error = new Error(errorMessage) error.code = 'PLUGIN_INITIALIZATION_ERROR' throw error } if (this._dependency.isReady) { this._onReady() } else { this._timeout = setTimeout( this._onTimeout.bind(this), this._options.dependencyInitialisationTimeout ) this._dependency.once('ready', this._onReady.bind(this)) this._dependency.on('error', this._onError.bind(this)) if (this._dependency.init) { this._dependency.init() } } }
n/a
io.event_handler = function (options) { this._options = options this._subscriptionRegistry = new SubscriptionRegistry(options, C.TOPIC.EVENT) this._listenerRegistry = new ListenerRegistry(C.TOPIC.EVENT, options, this._subscriptionRegistry) this._subscriptionRegistry.setSubscriptionListener(this._listenerRegistry) this._logger = options.logger this._message = options.messageConnector }
n/a
io.json_path = function (path) { this._path = path this._tokens = [] this._tokenize() }
n/a
io.record_deletion = function (options, socketWrapper, message, successCallback) { this._options = options this._socketWrapper = socketWrapper this._message = message this._successCallback = successCallback this._recordName = message.data[0] this._completed = 0 this._isDestroyed = false this._cacheTimeout = setTimeout( this._handleError.bind(this, 'cache timeout'), this._options.cacheRetrievalTimeout ) this._options.cache.delete( this._recordName, this._checkIfDone.bind(this, this._cacheTimeout) ) if (!this._options.storageExclusion || !this._options.storageExclusion.test(this._recordName)) { this._storageTimeout = setTimeout( this._handleError.bind(this, 'storage timeout'), this._options.storageRetrievalTimeout ) this._options.storage.delete( this._recordName, this._checkIfDone.bind(this, this._storageTimeout) ) } else { this._checkIfDone(null) } }
n/a
io.record_handler = function (options) { this._options = options this._subscriptionRegistry = new SubscriptionRegistry(options, C.TOPIC.RECORD) this._listenerRegistry = new ListenerRegistry(C.TOPIC.RECORD, options, this._subscriptionRegistry) this._subscriptionRegistry.setSubscriptionListener(this._listenerRegistry) this._transitions = {} this._recordRequestsInProgress = {} }
n/a
io.record_request = function (recordName, options, socketWrapper, onComplete, onError) { this._recordName = recordName this._options = options this._socketWrapper = socketWrapper this._storageRetrievalTimeout = null this._onComplete = onComplete this._onError = onError this._isDestroyed = false this._cacheRetrievalTimeout = setTimeout( this._sendError.bind(this, C.EVENT.CACHE_RETRIEVAL_TIMEOUT, this._recordName), this._options.cacheRetrievalTimeout ) this._options.cache.get(this._recordName, this._onCacheResponse.bind(this)) }
n/a
io.record_transition = function (name, options, recordHandler) { this._name = name this._options = options this._recordHandler = recordHandler this._steps = [] this._record = null this._currentStep = null this._recordRequest = null this._sendVersionExists = [] this.isDestroyed = false this._pendingUpdates = {} this._ending = false this._storageResponses = 0 this._cacheResponses = 0 this._lastVersion = null this._lastError = null }
n/a
io.rule_application = function (params) { this._params = params this._isDestroyed = false this._runScheduled = false this._maxIterationCount = this._params.permissionOptions.maxRuleIterations this._crossReferenceFn = this._crossReference.bind(this) this._pathVars = this._getPathVars() this._user = this._getUser() this._recordData = {} this._id = Math.random().toString() this._iterations = 0 this._run() }
n/a
io.rule_cache = function (options) { this._options = options this._data = {} setInterval(this._purge.bind(this), options.cacheEvacuationInterval) }
n/a
io.socket_wrapper = function (socket, options) { this.socket = socket this.isClosed = false this.socket.once('close', this._onSocketClose.bind(this)) this._options = options this.user = null this.authCallBack = null this.authAttempts = 0 this.setMaxListeners(0) this.uuid = Math.random() this._handshakeData = null this._setUpHandshakeData() this._queuedMessages = [] this._currentPacketMessageCount = 0 this._sendNextPacketTimeout = null this._currentMessageResetTimeout = null }
n/a
io.std_out_logger = function (options) { this._options = options || {} this.isReady = true this._$useColors = this._options.colors === undefined ? true : this._options.colors this._logLevelColors = [ 'white', 'green', 'yellow', 'red' ] this._currentLogLevel = C.LOG_LEVEL[this._options.logLevel] || C.LOG_LEVEL.DEBUG }
n/a
function readMessage(message) { const TOPIC = C.TOPIC const ACTIONS = C.ACTIONS return { isRecord: message.topic === TOPIC.RECORD, isEvent: message.topic === TOPIC.EVENT, isRPC: message.topic === TOPIC.RPC, isCreate: message.action === ACTIONS.CREATEORREAD, isRead: message.action === ACTIONS.CREATEORREAD, isChange: ( message.action === ACTIONS.PATCH || message.action === ACTIONS.UPDATE ), isDelete: message.action === ACTIONS.DELETE, isAck: message.action === ACTIONS.ACK, isSubscribe: message.action === ACTIONS.SUBSCRIBE, isUnsubscribe: message.action === ACTIONS.UNSUBSCRIBE, isRequest: message.action === ACTIONS.REQUEST, isRejection: message.action === ACTIONS.REJECTION, name: message.data[0], path: message.action === ACTIONS.PATCH ? message.data[2] : undefined, data: message.action === ACTIONS.PATCH ? message.data[3] : message.data[2] } }
n/a
function EventEmitter() { EventEmitter.init.call(this); }
n/a
compile = function (config) { const compiledConfig = {} let compiledRuleset let section let path for (section in config) { compiledConfig[section] = [] for (path in config[section]) { compiledRuleset = compileRuleset(path, config[section][path]) compiledConfig[section].push(compiledRuleset) } } return compiledConfig }
...
const validationResult = configValidator.validate(config)
if (validationResult !== true) {
this.emit('error', `invalid permission config - ${validationResult}`)
return
}
this._config = configCompiler.compile(config)
this._ruleCache.reset()
this._ready()
}
/**
* Implements the permissionHandler's canPerformAction interface
* method
...
initialise = function (config) { commandLineArguments = global.deepstreamCLI || {} handleUUIDProperty(config) handleSSLProperties(config) handleLogger(config) handlePlugins(config) handleAuthStrategy(config) handlePermissionStrategy(config) return config }
...
*
* @public
* @returns {Object} config deepstream configuration object
*/
module.exports.loadConfig = function (filePath, /* test only */ args) {
const config = exports.loadConfigWithoutInitialisation(filePath, args)
return {
config: configInitialiser.initialise(config.config),
file: config.configPath
}
}
/**
* Set the globalConfig prefix that will be used as the directory for ssl, permissions and auth
* relative files within the config file
...
config_permission_handler = function (options, config) { this._ruleCache = new RuleCache(options) this._options = options this._permissionOptions = options.permission.options this._config = null this._recordHandler = null this.isReady = false this.type = `valve permissions loaded from ${this._permissionOptions.path}` this._optionsValid = true const maxRuleIterations = options.permission.options.maxRuleIterations if (maxRuleIterations !== undefined && maxRuleIterations < 1) { this._optionsValid = false process.nextTick(() => this.emit('error', 'Maximum rule iteration has to be at least one ')) } else if (config) { this.useConfig(config) } }
n/a
function EventEmitter() { EventEmitter.init.call(this); }
n/a
_getCompiledRulesForName = function (name, ruleSpecification) { if (this._ruleCache.has(ruleSpecification.section, name, ruleSpecification.type)) { return this._ruleCache.get(ruleSpecification.section, name, ruleSpecification.type) } const section = this._config[ruleSpecification.section] let i = 0 let pathLength = 0 let result = null for (i; i < section.length; i++) { if ( typeof section[i].rules[ruleSpecification.type] !== UNDEFINED && section[i].path.length >= pathLength && section[i].regexp.test(name) ) { pathLength = section[i].path.length result = { path: section[i].path, regexp: section[i].regexp, rule: section[i].rules[ruleSpecification.type] } } } if (result) { this._ruleCache.set(ruleSpecification.section, name, ruleSpecification.type, result) } return result }
...
const name = message.data[0]
if (ruleSpecification === null) {
callback(null, true)
return
}
const ruleData = this._getCompiledRulesForName(name, ruleSpecification)
// eslint-disable-next-line
new RuleApplication({
recordHandler: this._recordHandler,
username,
authData,
path: ruleData,
...
_onConfigLoaded = function (filePath, loadError, config) { if (loadError) { this.emit('error', `error while loading config: ${loadError.toString()}`) return } this.emit('config-loaded', filePath) this.useConfig(config) }
n/a
_ready = function () { if (this.isReady === false) { this.isReady = true this.emit('ready') } }
...
if (validationResult !== true) {
this.emit('error', `invalid permission config - ${validationResult}`)
return
}
this._config = configCompiler.compile(config)
this._ruleCache.reset()
this._ready()
}
/**
* Implements the permissionHandler's canPerformAction interface
* method
*
* This is the main entry point for permissionOperations and will
...
canPerformAction = function ( username, message, callback, authData) { if (typeof message.data[0] !== STRING) { callback('invalid message', false) return } const ruleSpecification = rulesMap.getRulesForMessage(message) const name = message.data[0] if (ruleSpecification === null) { callback(null, true) return } const ruleData = this._getCompiledRulesForName(name, ruleSpecification) // eslint-disable-next-line new RuleApplication({ recordHandler: this._recordHandler, username, authData, path: ruleData, ruleSpecification, message, action: ruleSpecification.action, regexp: ruleData.regexp, rule: ruleData.rule, name, callback, logger: this._options.logger, permissionOptions: this._permissionOptions, options: this._options }) }
...
continue
}
if (parsedMessage.topic === C.TOPIC.CONNECTION && parsedMessage.action === C.ACTIONS.PONG) {
continue
}
this._options.permissionHandler.canPerformAction(
socketWrapper.user,
parsedMessage,
this._onPermissionResponse.bind(this, socketWrapper, parsedMessage),
socketWrapper.authData
)
}
}
...
init = function () { if (this._config === null && this._optionsValid) { this.loadConfig(this._permissionOptions.path) } }
...
this._onTimeout.bind(this),
this._options.dependencyInitialisationTimeout
)
this._dependency.once('ready', this._onReady.bind(this))
this._dependency.on('error', this._onError.bind(this))
if (this._dependency.init) {
this._dependency.init()
}
}
}
utils.inherits(DependencyInitialiser, EventEmitter)
/**
...
loadConfig = function (filePath) { jsYamlLoader.readAndParseFile(filePath, this._onConfigLoaded.bind(this, filePath)) }
...
* first
*
* @public
* @returns {void}
*/
ConfigPermissionHandler.prototype.init = function () {
if (this._config === null && this._optionsValid) {
this.loadConfig(this._permissionOptions.path)
}
}
/**
* Load a configuration file. This will either load a configuration file for the first time at
* startup or reload the configuration at runtime
*
...
setRecordHandler = function (recordHandler) { this._recordHandler = recordHandler }
n/a
useConfig = function (config) { const validationResult = configValidator.validate(config) if (validationResult !== true) { this.emit('error', `invalid permission config - ${validationResult}`) return } this._config = configCompiler.compile(config) this._ruleCache.reset() this._ready() }
...
this._optionsValid = true
const maxRuleIterations = options.permission.options.maxRuleIterations
if (maxRuleIterations !== undefined && maxRuleIterations < 1) {
this._optionsValid = false
process.nextTick(() => this.emit('error', 'Maximum rule iteration has to be at least one '))
} else if (config) {
this.useConfig(config)
}
}
utils.inherits(ConfigPermissionHandler, events.EventEmitter)
/**
* Will be invoked with the initialised recordHandler instance by deepstream.io
...
validate = function (config) { let validationStepResult let key for (key in validationSteps) { validationStepResult = validationSteps[key](config) if (validationStepResult !== true) { return validationStepResult } } return true }
...
*
* @param {Object} config deepstream permissionConfig
*
* @public
* @returns {void}
*/
ConfigPermissionHandler.prototype.useConfig = function (config) {
const validationResult = configValidator.validate(config)
if (validationResult !== true) {
this.emit('error', `invalid permission config - ${validationResult}`)
return
}
this._config = configCompiler.compile(config)
...
get = function () {
const options = {
/*
* General
*/
serverName: utils.getUid(),
showLogo: true,
logLevel: C.LOG_LEVEL.INFO,
/*
* Connectivity
*/
port: 6020,
host: '0.0.0.0',
urlPath: '/deepstream',
healthCheckPath: '/health-check',
externalUrl: null,
heartbeatInterval: 30000,
/*
* SSL Configuration
*/
sslKey: null,
sslCert: null,
sslCa: null,
/*
* Authentication
*/
auth: {
type: 'none'
},
/*
* Permissioning
*/
permission: {
type: 'none'
},
/*
* Default Plugins
*/
messageConnector: require('./default-plugins/noop-message-connector'),
cache: require('./default-plugins/local-cache'),
storage: require('./default-plugins/noop-storage'),
/*
* Storage options
*/
storageExclusion: null,
/*
* Security
*/
unauthenticatedClientTimeout: 180000,
maxAuthAttempts: 3,
logInvalidAuthData: true,
maxMessageSize: 1048576,
/**
* Listening
*/
shuffleListenProviders: true,
/*
* Timeouts
*/
rpcAckTimeout: 1000,
rpcTimeout: 10000,
cacheRetrievalTimeout: 1000,
storageRetrievalTimeout: 2000,
dependencyInitialisationTimeout: 2000,
stateReconciliationTimeout: 500,
clusterKeepAliveInterval: 5000,
clusterActiveCheckInterval: 1000,
clusterNodeInactiveTimeout: 6000,
listenResponseTimeout: 500,
lockTimeout: 1000,
lockRequestTimeout: 1000,
broadcastTimeout: 0
}
return options
}
...
*
* @param {String} urlPath URL where to download the archive
* @param {Stream} writeable output stream to save the archive
* @param {Function} callback Callback (err)
* @return {void}
*/
const downloadArchive = function (urlPath, outStream, callback) {
needle.get(`https://github.com${urlPath}`, {
follow_max: 5,
headers: { 'User-Agent': 'nodejs-client' }
}, (error, response) => {
if (error) {
return callback(error)
}
outStream.write(response.body)
...
dependency_initialiser = function (options, name) { this.isReady = false this._options = options this._dependency = options[name] this._name = name this._timeout = null if (typeof this._dependency.on !== 'function' && typeof this._dependency.isReady === 'undefined') { const errorMessage = `${this._name} needs to implement isReady or be an emitter` this._options.logger.log(C.LOG_LEVEL.ERROR, C.EVENT.PLUGIN_INITIALIZATION_ERROR, errorMessage) const error = new Error(errorMessage) error.code = 'PLUGIN_INITIALIZATION_ERROR' throw error } if (this._dependency.isReady) { this._onReady() } else { this._timeout = setTimeout( this._onTimeout.bind(this), this._options.dependencyInitialisationTimeout ) this._dependency.once('ready', this._onReady.bind(this)) this._dependency.on('error', this._onError.bind(this)) if (this._dependency.init) { this._dependency.init() } } }
n/a
function EventEmitter() { EventEmitter.init.call(this); }
n/a
_emitReady = function () { this.isReady = true this.emit('ready') }
n/a
_logError = function (message) { if (this._options.logger && this._options.logger.isReady) { this._options.logger.log(C.LOG_LEVEL.ERROR, C.EVENT.PLUGIN_ERROR, message) } else { console.error('Error while initialising dependency') console.error(message) } }
...
* Callback for dependencies that weren't initialised in time
*
* @private
* @returns {void}
*/
DependencyInitialiser.prototype._onTimeout = function () {
const message = `${this._name} wasn't initialised in time`
this._logError(message)
const error = new Error(message)
error.code = C.EVENT.PLUGIN_INITIALIZATION_TIMEOUT
throw error
}
/**
* Handles errors emitted by the dependency at startup.
...
_onError = function (error) { if (this.isReady !== true) { this._logError(`Error while initialising ${this._name}: ${error.toString()}`) error.code = C.EVENT.PLUGIN_INITIALIZATION_ERROR throw error } }
...
*/
RecordRequest.prototype._sendError = function (event, message) {
this._options.logger.log(C.LOG_LEVEL.ERROR, event, message)
if (this._socketWrapper) {
this._socketWrapper.sendError(C.TOPIC.RECORD, event, message)
}
if (this._onError) {
this._onError(event, message)
}
this._destroy()
}
/**
* Destroys the record request. Clears down all references and stops
* all pending timeouts
...
_onReady = function () { if (this._timeout) { clearTimeout(this._timeout) } const dependencyType = this._dependency.type ? `: ${this._dependency.type}` : '' this._options.logger.log(C.LOG_LEVEL.INFO, C.EVENT.INFO, `${this._name} ready${dependencyType}`) process.nextTick(this._emitReady.bind(this)) }
...
this._options.logger.log(C.LOG_LEVEL.ERROR, C.EVENT.PLUGIN_INITIALIZATION_ERROR, errorMessage)
const error = new Error(errorMessage)
error.code = 'PLUGIN_INITIALIZATION_ERROR'
throw error
}
if (this._dependency.isReady) {
this._onReady()
} else {
this._timeout = setTimeout(
this._onTimeout.bind(this),
this._options.dependencyInitialisationTimeout
)
this._dependency.once('ready', this._onReady.bind(this))
this._dependency.on('error', this._onError.bind(this))
...
_onTimeout = function () { const message = `${this._name} wasn't initialised in time` this._logError(message) const error = new Error(message) error.code = C.EVENT.PLUGIN_INITIALIZATION_TIMEOUT throw error }
n/a
getDependency = function () { return this._dependency }
n/a
event_handler = function (options) { this._options = options this._subscriptionRegistry = new SubscriptionRegistry(options, C.TOPIC.EVENT) this._listenerRegistry = new ListenerRegistry(C.TOPIC.EVENT, options, this._subscriptionRegistry) this._subscriptionRegistry.setSubscriptionListener(this._listenerRegistry) this._logger = options.logger this._message = options.messageConnector }
n/a
_addSubscriber = function (socket, message) { if (validateSubscriptionMessage(socket, message)) { this._subscriptionRegistry.subscribe(message.data[0], socket) } }
...
* @param {Object} message parsed and permissioned deepstream message
*
* @public
* @returns {void}
*/
EventHandler.prototype.handle = function (socket, message) {
if (message.action === C.ACTIONS.SUBSCRIBE) {
this._addSubscriber(socket, message)
} else if (message.action === C.ACTIONS.UNSUBSCRIBE) {
this._removeSubscriber(socket, message)
} else if (message.action === C.ACTIONS.EVENT) {
this._triggerEvent(socket, message)
} else if (message.action === C.ACTIONS.LISTEN ||
message.action === C.ACTIONS.UNLISTEN ||
message.action === C.ACTIONS.LISTEN_ACCEPT ||
...
_removeSubscriber = function (socket, message) { if (validateSubscriptionMessage(socket, message)) { this._subscriptionRegistry.unsubscribe(message.data[0], socket) } }
...
* @public
* @returns {void}
*/
EventHandler.prototype.handle = function (socket, message) {
if (message.action === C.ACTIONS.SUBSCRIBE) {
this._addSubscriber(socket, message)
} else if (message.action === C.ACTIONS.UNSUBSCRIBE) {
this._removeSubscriber(socket, message)
} else if (message.action === C.ACTIONS.EVENT) {
this._triggerEvent(socket, message)
} else if (message.action === C.ACTIONS.LISTEN ||
message.action === C.ACTIONS.UNLISTEN ||
message.action === C.ACTIONS.LISTEN_ACCEPT ||
message.action === C.ACTIONS.LISTEN_REJECT) {
this._listenerRegistry.handle(socket, message)
...
_sendError = function (socket, event, message) { if (socket && socket.sendError) { socket.sendError(C.TOPIC.EVENT, event, message) } this._logger.log(C.LOG_LEVEL.ERROR, event, message) }
...
this._triggerEvent(socket, message)
} else if (message.action === C.ACTIONS.LISTEN ||
message.action === C.ACTIONS.UNLISTEN ||
message.action === C.ACTIONS.LISTEN_ACCEPT ||
message.action === C.ACTIONS.LISTEN_REJECT) {
this._listenerRegistry.handle(socket, message)
} else {
this._sendError(socket, C.EVENT.UNKNOWN_ACTION, `unknown action ${message.action}`)
}
}
/**
* Handler for the SUBSCRIBE action. Adds the socket as
* a subscriber to the specified event name
*
...
_triggerEvent = function (socket, message) { if (typeof message.data[0] !== 'string') { this._sendError(socket, C.EVENT.INVALID_MESSAGE_DATA, message.raw) return } this._logger.log(C.LOG_LEVEL.DEBUG, C.EVENT.TRIGGER_EVENT, message.raw) if (socket !== C.SOURCE_MESSAGE_CONNECTOR) { this._message.publish(C.TOPIC.EVENT, message) } this._subscriptionRegistry.sendToSubscribers( message.data[0], messageBuilder.getMsg(C.TOPIC.EVENT, C.ACTIONS.EVENT, message.data), false, socket ) }
...
*/
EventHandler.prototype.handle = function (socket, message) {
if (message.action === C.ACTIONS.SUBSCRIBE) {
this._addSubscriber(socket, message)
} else if (message.action === C.ACTIONS.UNSUBSCRIBE) {
this._removeSubscriber(socket, message)
} else if (message.action === C.ACTIONS.EVENT) {
this._triggerEvent(socket, message)
} else if (message.action === C.ACTIONS.LISTEN ||
message.action === C.ACTIONS.UNLISTEN ||
message.action === C.ACTIONS.LISTEN_ACCEPT ||
message.action === C.ACTIONS.LISTEN_REJECT) {
this._listenerRegistry.handle(socket, message)
} else {
this._sendError(socket, C.EVENT.UNKNOWN_ACTION, `unknown action ${message.action}`)
...
handle = function (socket, message) { if (message.action === C.ACTIONS.SUBSCRIBE) { this._addSubscriber(socket, message) } else if (message.action === C.ACTIONS.UNSUBSCRIBE) { this._removeSubscriber(socket, message) } else if (message.action === C.ACTIONS.EVENT) { this._triggerEvent(socket, message) } else if (message.action === C.ACTIONS.LISTEN || message.action === C.ACTIONS.UNLISTEN || message.action === C.ACTIONS.LISTEN_ACCEPT || message.action === C.ACTIONS.LISTEN_REJECT) { this._listenerRegistry.handle(socket, message) } else { this._sendError(socket, C.EVENT.UNKNOWN_ACTION, `unknown action ${message.action}`) } }
...
this._removeSubscriber(socket, message)
} else if (message.action === C.ACTIONS.EVENT) {
this._triggerEvent(socket, message)
} else if (message.action === C.ACTIONS.LISTEN ||
message.action === C.ACTIONS.UNLISTEN ||
message.action === C.ACTIONS.LISTEN_ACCEPT ||
message.action === C.ACTIONS.LISTEN_REJECT) {
this._listenerRegistry.handle(socket, message)
} else {
this._sendError(socket, C.EVENT.UNKNOWN_ACTION, `unknown action ${message.action}`)
}
}
/**
* Handler for the SUBSCRIBE action. Adds the socket as
...
fileExistsSync = function (filePath) { try { fs.lstatSync(filePath) return true } catch (e) { return false } }
...
*
* @param {String} configPath the path to the config file
*
* @private
* @returns {String} verified path
*/
function verifyCustomConfigPath (configPath) {
if (fileUtils.fileExistsSync(configPath)) {
return configPath
}
throw new Error(`Configuration file not found at: ${configPath}`)
}
/**
...
lookupConfRequirePath = function (filePath) { return exports.lookupRequirePath(filePath, global.deepstreamConfDir) }
...
let filePath
for (let i = 0; i < sslFiles.length; i++) {
key = sslFiles[i]
filePath = config[key]
if (!filePath) {
continue
}
resolvedFilePath = fileUtils.lookupConfRequirePath(filePath)
try {
config[key] = fs.readFileSync(resolvedFilePath, 'utf8')
} catch (e) {
throw new Error(`The file path "${resolvedFilePath}" provided by "${key}" does not exist.`)
}
}
}
...
lookupLibRequirePath = function (filePath) { return exports.lookupRequirePath(filePath, global.deepstreamLibDir) }
...
function resolvePluginClass (plugin, type) {
// nexe needs *global.require* for __dynamic__ modules
// but browserify and proxyquire can't handle *global.require*
const req = global && global.require ? global.require : require
let requirePath
let pluginConstructor
if (plugin.path != null) {
requirePath = fileUtils.lookupLibRequirePath(plugin.path)
pluginConstructor = req(requirePath)
} else if (plugin.name != null) {
if (type != null) {
requirePath = `deepstream.io-${type}-${plugin.name}`
requirePath = fileUtils.lookupLibRequirePath(requirePath)
pluginConstructor = req(requirePath)
}
...
lookupRequirePath = function (filePath, prefix) { // filePath is absolute if (path.parse(filePath).root !== '') { return filePath } // filePath is not relative (and not absolute) if (filePath[0] !== '.') { if (prefix == null) { return filePath } return resolvePrefixAndFile(filePath, prefix) } // filePath is relative, starts with . if (prefix == null) { return path.resolve(process.cwd(), filePath) } return resolvePrefixAndFile(filePath, prefix) }
...
*
* @param {String} filePath
*
* @private
* @returns {String} file path with the library prefix used
*/
exports.lookupLibRequirePath = function (filePath) {
return exports.lookupRequirePath(filePath, global.deepstreamLibDir)
}
/**
* Append the global configuration directory as the prefix to any path
* used here
*
* @param {String} filePath
...
loadConfig = function (filePath, args) { const config = exports.loadConfigWithoutInitialisation(filePath, args) return { config: configInitialiser.initialise(config.config), file: config.configPath } }
...
* first
*
* @public
* @returns {void}
*/
ConfigPermissionHandler.prototype.init = function () {
if (this._config === null && this._optionsValid) {
this.loadConfig(this._permissionOptions.path)
}
}
/**
* Load a configuration file. This will either load a configuration file for the first time at
* startup or reload the configuration at runtime
*
...
loadConfigWithoutInitialisation = function (filePath, args) { const argv = args || global.deepstreamCLI || {} const configPath = setGlobalConfigDirectory(argv, filePath) const configString = fs.readFileSync(configPath, { encoding: 'utf8' }) const rawConfig = parseFile(configPath, configString) const config = extendConfig(rawConfig, argv) setGlobalLibDirectory(argv, config) return { config, configPath } }
...
.description('Generate a hash from a plaintext password using file auth configuration settings')
.option('-c, --config [file]', 'configuration file containing file auth and hash settings')
.action(hash)
}
function hash(password) {
global.deepstreamCLI = this
const config = jsYamlLoader.loadConfigWithoutInitialisation().config
if (config.auth.type !== 'file') {
console.error('Error: Can only use hash with file authentication as auth type')
process.exit(1)
}
if (!config.auth.options.hash) {
...
readAndParseFile = function (filePath, callback) { try { fs.readFile(filePath, 'utf8', (error, fileContent) => { if (error) { return callback(error) } try { const config = parseFile(filePath, fileContent) return callback(null, config) } catch (parseError) { return callback(parseError) } }) } catch (error) { callback(error) } }
...
constructor (settings) {
super()
this.isReady = false
this.type = `file using ${settings.path}`
this._validateSettings(settings)
this._settings = settings
this._base64KeyLength = 4 * Math.ceil(this._settings.keyLength / 3)
jsYamlLoader.readAndParseFile(settings.path, this._onFileLoad.bind(this))
}
/**
* Main interface. Authenticates incoming connections
*
* @param {Object} connectionData
* @param {Object} authData
...
json_path = function (path) { this._path = path this._tokens = [] this._tokenize() }
n/a
_tokenize = function () { const parts = this._path.split(SPLIT_REG_EXP) let part let i for (i = 0; i < parts.length; i++) { part = parts[i].trim() if (part.length === 0) { continue } if (!isNaN(part)) { this._tokens.push(parseInt(part, 10)) continue } this._tokens.push(part) } }
...
* @param {String} path A path, e.g. users[2].firstname
*
* @constructor
*/
const JsonPath = function (path) {
this._path = path
this._tokens = []
this._tokenize()
}
/**
* Sets the value of the path. If the path (or parts
* of it) doesn't exist yet, it will be created
*
* @param {Object} node
...
setValue = function (node, value) { let i = 0 for (i = 0; i < this._tokens.length - 1; i++) { if (node[this._tokens[i]] !== undefined) { node = node[this._tokens[i]] } else if (this._tokens[i + 1] && !isNaN(this._tokens[i + 1])) { node = node[this._tokens[i]] = [] } else { node = node[this._tokens[i]] = {} } } node[this._tokens[i]] = value }
...
if (currentData === null) {
return new Error(`Tried to apply patch to non-existant record ${msg.data[0]}`)
}
if (typeof currentData !== UNDEFINED && currentData !== LOADING) {
jsonPath = new JsonPath(msg.data[2])
data = JSON.parse(JSON.stringify(currentData._d))
jsonPath.setValue(data, newData)
return data
}
this._loadRecord(this._params.name)
}
/**
* Returns or loads the record's previous value. Only supported for record
...
getErrorMsg = function (topic, type, message) { if (message instanceof Array) { return `${topic + SEP}E${SEP}${type}${SEP}${message.join(SEP)}${C.MESSAGE_SEPERATOR}` } return `${topic + SEP}E${SEP}${type}${SEP}${message}${C.MESSAGE_SEPERATOR}` }
...
* @param {String} msg generic error message
*
* @public
* @returns {void}
*/
SocketWrapper.prototype.sendError = function (topic, type, msg) {
if (this.isClosed === false) {
this.send(messageBuilder.getErrorMsg(topic, type, msg))
}
}
/**
* Sends a message based on the provided action and topic
*
* @param {String} topic one of C.TOPIC
...
getMsg = function (topic, action, data) { const sendData = [topic, action] if (data) { for (let i = 0; i < data.length; i++) { if (typeof data[i] === 'object') { sendData.push(JSON.stringify(data[i])) } else { sendData.push(data[i]) } } } return sendData.join(SEP) + C.MESSAGE_SEPERATOR }
...
if (socket !== C.SOURCE_MESSAGE_CONNECTOR) {
this._message.publish(C.TOPIC.EVENT, message)
}
this._subscriptionRegistry.sendToSubscribers(
message.data[0],
messageBuilder.getMsg(C.TOPIC.EVENT, C.ACTIONS.EVENT, message.data),
false,
socket
)
}
EventHandler.prototype._sendError = function (socket, event, message) {
if (socket && socket.sendError) {
...
typed = function (value) { const type = typeof value if (type === 'string') { return C.TYPES.STRING + value } if (value === null) { return C.TYPES.NULL } if (type === 'object') { return C.TYPES.OBJECT + JSON.stringify(value) } if (type === 'number') { return C.TYPES.NUMBER + value.toString() } if (value === true) { return C.TYPES.TRUE } if (value === false) { return C.TYPES.FALSE } if (value === undefined) { return C.TYPES.UNDEFINED } throw new Error(`Can't serialize type ${value}`) }
...
this._appendDataToSocketWrapper(socketWrapper, userData)
if (typeof userData.clientData === 'undefined') {
socketWrapper.sendMessage(C.TOPIC.AUTH, C.ACTIONS.ACK)
} else {
socketWrapper.sendMessage(
C.TOPIC.AUTH,
C.ACTIONS.ACK,
[messageBuilder.typed(userData.clientData)]
)
}
if (socketWrapper.user !== OPEN) {
this.emit('client-connected', socketWrapper)
}
...
publish = function () {}
...
* @returns {[type]}
*/
leaveCluster () {
if (this._inCluster === false) {
return
}
this._options.logger.log(C.LOG_LEVEL.INFO, C.EVENT.CLUSTER_LEAVE, this._options.serverName)
this._options.messageConnector.publish(C.TOPIC.CLUSTER, {
topic: C.TOPIC.CLUSTER,
action: C.ACTIONS.REMOVE,
data: [this._options.serverName]
})
// TODO: If a message connector doesn't close this is required to avoid an error
// being thrown during shutdown
...
subscribe = function () {}
...
this._nodes = {}
this._leaderScore = Math.random()
this.setMaxListeners(12)
this._onMessageFn = this._onMessage.bind(this)
this._leaveClusterFn = this.leaveCluster.bind(this)
this._options.messageConnector.subscribe(C.TOPIC.CLUSTER, this._onMessageFn)
this._publishStatus()
this._publishInterval = setInterval(
this._publishStatus.bind(this),
this._options.clusterKeepAliveInterval
)
this._checkInterval = setInterval(
this._checkNodes.bind(this),
...
unsubscribe = function () {}
...
topic: C.TOPIC.CLUSTER,
action: C.ACTIONS.REMOVE,
data: [this._options.serverName]
})
// TODO: If a message connector doesn't close this is required to avoid an error
// being thrown during shutdown
// this._options.messageConnector.unsubscribe( C.TOPIC.CLUSTER, this._onMessageFn );
process.removeListener('beforeExit', this._leaveClusterFn)
process.removeListener('exit', this._leaveClusterFn)
clearInterval(this._publishInterval)
clearInterval(this._checkInterval)
this._nodes = {}
this._inCluster = false
...
delete = function (key, callback) { callback(null) }
...
}
if (p.test(subscriptionName)) {
servers = servers.concat(providerRegistry.getAllServers(pattern))
}
}
const set = new Set(servers)
set.delete(this._options.serverName)
if (!this._options.shuffleListenProviders) {
return Array.from(set)
}
return utils.shuffleArray(Array.from(set))
}
...
get = function (key, callback) { callback(null, null) }
...
*
* @param {String} urlPath URL where to download the archive
* @param {Stream} writeable output stream to save the archive
* @param {Function} callback Callback (err)
* @return {void}
*/
const downloadArchive = function (urlPath, outStream, callback) {
needle.get(`https://github.com${urlPath}`, {
follow_max: 5,
headers: { 'User-Agent': 'nodejs-client' }
}, (error, response) => {
if (error) {
return callback(error)
}
outStream.write(response.body)
...
set = function (key, value, callback) { callback(null) }
...
*/
ConfigPermissionHandler.prototype.setRecordHandler = function (recordHandler) {
this._recordHandler = recordHandler
}
/**
* Will be called by the dependency initialiser once server.start() is called.
* This gives users a chance to change the path using server.set()
* first
*
* @public
* @returns {void}
*/
ConfigPermissionHandler.prototype.init = function () {
if (this._config === null && this._optionsValid) {
...
parse = function (path) { const variables = [] let regExp = path.replace(WILDCARD_REGEXP, WILDCARD_STRING) regExp = regExp.replace(VARIABLE_REGEXP, (variableName) => { variables.push(variableName) return VARIABLE_STRING }) return { variables, path, regexp: new RegExp(`^${regExp}$`) } }
...
}
function fetchLibs(libDir, meta) {
const directory = libDir || 'lib'
const files = glob.sync(path.join(directory, '*', 'package.json'))
meta.libs = files.map((filePath) => {
const pkg = fs.readFileSync(filePath, 'utf8')
const object = JSON.parse(pkg)
return `${object.name}:${object.version}`
})
}
...
validate = function (path) { if (typeof path !== 'string') { return 'path must be a string' } if (path.length === 0) { return 'path can\'t be empty' } if (path[0] === '/') { return 'path can\'t start with /' } const invalidVariableNames = path.match(INVALID_VARIABLE_REGEXP) if (invalidVariableNames !== null) { return `invalid variable name ${invalidVariableNames[0]}` } return true }
...
*
* @param {Object} config deepstream permissionConfig
*
* @public
* @returns {void}
*/
ConfigPermissionHandler.prototype.useConfig = function (config) {
const validationResult = configValidator.validate(config)
if (validationResult !== true) {
this.emit('error', `invalid permission config - ${validationResult}`)
return
}
this._config = configCompiler.compile(config)
...
ensureNotRunning = function (callback) { read((err, data) => { if (err) { if (err.code === 'ENOENT') { return callback() } return callback(err) } try { const pid = data.pid if (isRunning(pid)) { throw new Error(`A deepstream server is already running with PID ${pid}, see ${PID_FILE}`) } else { // pid file is there but process is not running anymore return callback() } } catch (err) { return callback(err) } }) }
...
args.push('--lib-dir')
args.push(this.libDir)
}
// TODO: need to pass other options as well, which are accessable directly as properties of this
// but need to transform camelCase back to kebabCase, like disableAuth
// ensure there is no pid file with a running process
pidHelper.ensureNotRunning((err) => {
if (err) {
return pidHelper.exit(err)
}
const child = child_process.spawn(path.join(__dirname, 'deepstream'), ['start'].concat(args), {
detached: true,
stdio: ['ignore']
})
...
exit = function (err) { if (err instanceof Error) { console.error(colors.red(err.toString())) console.error(err) process.exit(1) } else { remove(() => { process.exit(0) }) } }
...
function hash(password) {
global.deepstreamCLI = this
const config = jsYamlLoader.loadConfigWithoutInitialisation().config
if (config.auth.type !== 'file') {
console.error('Error: Can only use hash with file authentication as auth type')
process.exit(1)
}
if (!config.auth.options.hash) {
console.error('Error: Can only use hash with file authentication')
process.exit(1)
}
...
isRunning = function (pid) { try { return process.kill(pid, 0) } catch (e) { return e.code === 'EPERM' } }
...
const pidHelper = require('./pid-helper')
function statusCheck(cmd, program) {
pidHelper.read((err, data) => {
if (err) {
return console.log('not running (no pid file)')
}
if (pidHelper.isRunning(data.pid)) {
const seconds = (new Date().getTime() - data.timestamp) / 1000
console.log(`Process running with PID ${data.pid} since ${seconds} seconds`)
} else {
console.log(`Not running (no process for PID ${data.pid})`)
}
})
}
...
read = function (callback) { fs.readFile(PID_FILE, 'utf8', (err, content) => { if (err) { return callback(err) } try { return callback(null, JSON.parse(content)) } catch (err) { return callback(err) } }) }
...
'use strict'
const pidHelper = require('./pid-helper')
function statusCheck(cmd, program) {
pidHelper.read((err, data) => {
if (err) {
return console.log('not running (no pid file)')
}
if (pidHelper.isRunning(data.pid)) {
const seconds = (new Date().getTime() - data.timestamp) / 1000
console.log(`Process running with PID ${data.pid} since ${seconds} seconds`)
} else {
...
save = function (pid, callback) { if (callback == null) { callback = function () {} } if (pid == null) { return callback(new Error('pid is null')) } const now = new Date() const data = { pid, timestamp: now.getTime(), time: now.toString() } fs.writeFile(PID_FILE, JSON.stringify(data), (err) => { callback(err) }) }
...
} else {
// non-detach case
const Deepstream = require('../src/deepstream.io.js')
try {
process.on('uncaughtException', pidHelper.exit)
const ds = new Deepstream(null)
ds.on('started', () => {
pidHelper.save(process.pid)
})
ds.start()
} catch (err) {
console.error(err.toString())
console.trace()
process.exit(1)
}
...
stop = function () { read((err, data) => { if (err) { return console.log('no pid file') } else { try { process.kill(data.pid) } catch (err) { return console.log(`No process found for PID ${data.pid}`) } const uptime = new Date().getTime() - data.timestamp console.log(`Deepstream was running for ${uptime / 1000} seconds`) } }) }
n/a
record_deletion = function (options, socketWrapper, message, successCallback) { this._options = options this._socketWrapper = socketWrapper this._message = message this._successCallback = successCallback this._recordName = message.data[0] this._completed = 0 this._isDestroyed = false this._cacheTimeout = setTimeout( this._handleError.bind(this, 'cache timeout'), this._options.cacheRetrievalTimeout ) this._options.cache.delete( this._recordName, this._checkIfDone.bind(this, this._cacheTimeout) ) if (!this._options.storageExclusion || !this._options.storageExclusion.test(this._recordName)) { this._storageTimeout = setTimeout( this._handleError.bind(this, 'storage timeout'), this._options.storageRetrievalTimeout ) this._options.storage.delete( this._recordName, this._checkIfDone.bind(this, this._storageTimeout) ) } else { this._checkIfDone(null) } }
n/a
_checkIfDone = function (timeoutId, error) { clearTimeout(timeoutId) this._completed++ if (this._isDestroyed) { return } if (error) { this._handleError(error.toString()) return } if (this._completed === 2) { this._done() } }
...
this._options.storageRetrievalTimeout
)
this._options.storage.delete(
this._recordName,
this._checkIfDone.bind(this, this._storageTimeout)
)
} else {
this._checkIfDone(null)
}
}
/**
* Callback for completed cache and storage interactions. Will invoke
* _done() once both are completed
*
...
_destroy = function () { clearTimeout(this._cacheTimeout) clearTimeout(this._storageTimeout) this._options = null this._socketWrapper = null this._message = null this._isDestroyed = true }
...
* @private
* @returns {void}
*/
_onComplete (error, response) {
if (error) {
this._logger.log(C.LOG_LEVEL.WARN, C.EVENT.AUTH_ERROR, `http auth error: ${error}`)
this._callback(false, null)
this._destroy()
return
}
if (response.statusCode >= 500 && response.statusCode < 600) {
this._logger.log(C.LOG_LEVEL.WARN, C.EVENT.AUTH_ERROR, `http auth server error: ${response.body}`)
}
...
_done = function () { this._options.logger.log(C.LOG_LEVEL.INFO, C.EVENT.RECORD_DELETION, this._recordName) const ackMessage = { topic: C.TOPIC.RECORD, action: C.ACTIONS.ACK, data: [C.ACTIONS.DELETE, this._recordName], raw: messageBuilder.getMsg(C.TOPIC.RECORD, C.ACTIONS.ACK, [C.ACTIONS.DELETE, this._recordName]) } this._socketWrapper.send(ackMessage.raw) this._successCallback(this._recordName, ackMessage, this._socketWrapper) this._destroy() }
...
if (error) {
this._handleError(error.toString())
return
}
if (this._completed === 2) {
this._done()
}
}
/**
* Callback for successful deletions. Notifies the original sender and calls
* the callback to allow the recordHandler to broadcast the deletion
*
...
_handleError = function (errorMsg) { this._socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.RECORD_DELETE_ERROR, errorMsg) this._options.logger.log(C.LOG_LEVEL.ERROR, C.EVENT.RECORD_DELETE_ERROR, errorMsg) this._destroy() }
...
this._completed++
if (this._isDestroyed) {
return
}
if (error) {
this._handleError(error.toString())
return
}
if (this._completed === 2) {
this._done()
}
}
...
record_handler = function (options) { this._options = options this._subscriptionRegistry = new SubscriptionRegistry(options, C.TOPIC.RECORD) this._listenerRegistry = new ListenerRegistry(C.TOPIC.RECORD, options, this._subscriptionRegistry) this._subscriptionRegistry.setSubscriptionListener(this._listenerRegistry) this._transitions = {} this._recordRequestsInProgress = {} }
n/a
_create = function (recordName, socketWrapper) { const record = { _v: 0, _d: {} } // store the records data in the cache and wait for the result this._options.cache.set(recordName, record, (error) => { if (error) { this._options.logger.log(C.LOG_LEVEL.ERROR, C.EVENT.RECORD_CREATE_ERROR, recordName) socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.RECORD_CREATE_ERROR, recordName) } else { this._read(recordName, record, socketWrapper) } }) if (!this._options.storageExclusion || !this._options.storageExclusion.test(recordName)) { // store the record data in the persistant storage independently and don't wait for the result this._options.storage.set(recordName, record, (error) => { if (error) { this._options.logger.log(C.LOG_LEVEL.ERROR, C.EVENT.RECORD_CREATE_ERROR, `storage:${error}`) } }) } }
n/a
_createOrRead = function (socketWrapper, message) { const recordName = message.data[0] const onComplete = function (record) { if (record) { this._read(recordName, record, socketWrapper) } else { this._permissionAction( C.ACTIONS.CREATE, recordName, socketWrapper, this._create.bind(this, recordName, socketWrapper) ) } } // eslint-disable-next-line new RecordRequest( recordName, this._options, socketWrapper, onComplete.bind(this) ) }
...
}
if (message.action === C.ACTIONS.CREATEORREAD) {
/*
* Return the record's contents and subscribes for future updates.
* Creates the record if it doesn't exist
*/
this._createOrRead(socketWrapper, message)
} else if (message.action === C.ACTIONS.SNAPSHOT) {
/*
* Return the current state of the record in cache or db
*/
this._snapshot(socketWrapper, message)
} else if (message.action === C.ACTIONS.HEAD) {
/*
...
_delete = function (socketWrapper, message) { const recordName = message.data[0] if (this._transitions[recordName]) { this._transitions[recordName].destroy() delete this._transitions[recordName] } if (socketWrapper === C.SOURCE_MESSAGE_CONNECTOR) { this._onDeleted(recordName, message, socketWrapper) } else { // eslint-disable-next-line new RecordDeletion(this._options, socketWrapper, message, this._onDeleted.bind(this)) } }
...
* Handle complete (UPDATE) or partial (PATCH) updates
*/
this._update(socketWrapper, message)
} else if (message.action === C.ACTIONS.DELETE) {
/*
* Deletes the record
*/
this._delete(socketWrapper, message)
} else if (message.action === C.ACTIONS.UNSUBSCRIBE) {
/*
* Unsubscribes (discards) a record that was previously subscribed to
* using read()
*/
this._subscriptionRegistry.unsubscribe(message.data[0], socketWrapper)
} else if (message.action === C.ACTIONS.LISTEN ||
...
_hasRecord = function (socketWrapper, message) { const recordName = message.data[0] const onComplete = function (record) { const hasRecord = record ? C.TYPES.TRUE : C.TYPES.FALSE socketWrapper.sendMessage(C.TOPIC.RECORD, C.ACTIONS.HAS, [recordName, hasRecord]) } const onError = function (error) { socketWrapper.sendError(C.TOPIC.RECORD, C.ACTIONS.HAS, [recordName, error]) } // eslint-disable-next-line new RecordRequest(recordName, this._options, socketWrapper, onComplete.bind(this), onError.bind(this) ) }
...
* Return the current state of the record in cache or db
*/
this._head(socketWrapper, message)
} else if (message.action === C.ACTIONS.HAS) {
/*
* Return a Boolean to indicate if record exists in cache or database
*/
this._hasRecord(socketWrapper, message)
} else if (message.action === C.ACTIONS.UPDATE || message.action === C.ACTIONS.PATCH) {
/*
* Handle complete (UPDATE) or partial (PATCH) updates
*/
this._update(socketWrapper, message)
} else if (message.action === C.ACTIONS.DELETE) {
/*
...
_head = function (socketWrapper, message) { const recordName = message.data[0] const onComplete = function (record) { if (record) { socketWrapper.sendMessage(C.TOPIC.RECORD, C.ACTIONS.HEAD, [recordName, record._v]) } else { socketWrapper.sendError( C.TOPIC.RECORD, C.ACTIONS.HEAD, [recordName, C.EVENT.RECORD_NOT_FOUND] ) } } const onError = function (error) { socketWrapper.sendError(C.TOPIC.RECORD, C.ACTIONS.HEAD, [recordName, error]) } // eslint-disable-next-line new RecordRequest( recordName, this._options, socketWrapper, onComplete.bind(this), onError.bind(this) ) }
...
* Return the current state of the record in cache or db
*/
this._snapshot(socketWrapper, message)
} else if (message.action === C.ACTIONS.HEAD) {
/*
* Return the current state of the record in cache or db
*/
this._head(socketWrapper, message)
} else if (message.action === C.ACTIONS.HAS) {
/*
* Return a Boolean to indicate if record exists in cache or database
*/
this._hasRecord(socketWrapper, message)
} else if (message.action === C.ACTIONS.UPDATE || message.action === C.ACTIONS.PATCH) {
/*
...
_onDeleted = function (name, message, originalSender) { this._$broadcastUpdate(name, message, true, originalSender) for (const subscriber of this._subscriptionRegistry.getLocalSubscribers(name)) { this._subscriptionRegistry.unsubscribe(name, subscriber, true) } }
...
if (this._transitions[recordName]) {
this._transitions[recordName].destroy()
delete this._transitions[recordName]
}
if (socketWrapper === C.SOURCE_MESSAGE_CONNECTOR) {
this._onDeleted(recordName, message, socketWrapper)
} else {
// eslint-disable-next-line
new RecordDeletion(this._options, socketWrapper, message, this._onDeleted.bind(this))
}
}
/*
...
_permissionAction = function ( action, recordName, socketWrapper, successCallback ) { const message = { topic: C.TOPIC.RECORD, action, data: [recordName] } const onResult = function (error, canPerformAction) { if (error !== null) { socketWrapper.sendError(message.topic, C.EVENT.MESSAGE_PERMISSION_ERROR, error.toString()) } else if (canPerformAction !== true) { socketWrapper.sendError(message.topic, C.EVENT.MESSAGE_DENIED, [recordName, action]) } else { successCallback() } } this._options.permissionHandler.canPerformAction( socketWrapper.user, message, onResult, socketWrapper.authData ) }
...
RecordHandler.prototype._createOrRead = function (socketWrapper, message) {
const recordName = message.data[0]
const onComplete = function (record) {
if (record) {
this._read(recordName, record, socketWrapper)
} else {
this._permissionAction(
C.ACTIONS.CREATE,
recordName,
socketWrapper,
this._create.bind(this, recordName, socketWrapper)
)
}
}
...
_read = function (recordName, record, socketWrapper) { this._permissionAction(C.ACTIONS.READ, recordName, socketWrapper, () => { this._subscriptionRegistry.subscribe(recordName, socketWrapper) this._sendRecord(recordName, record, socketWrapper) }) }
...
* @returns {void}
*/
RecordHandler.prototype._createOrRead = function (socketWrapper, message) {
const recordName = message.data[0]
const onComplete = function (record) {
if (record) {
this._read(recordName, record, socketWrapper)
} else {
this._permissionAction(
C.ACTIONS.CREATE,
recordName,
socketWrapper,
this._create.bind(this, recordName, socketWrapper)
)
...
_sendRecord = function (recordName, record, socketWrapper) { socketWrapper.sendMessage(C.TOPIC.RECORD, C.ACTIONS.READ, [recordName, record._v, record._d]) }
...
* @returns {void}
*/
RecordHandler.prototype._snapshot = function (socketWrapper, message) {
const recordName = message.data[0]
const onComplete = function (record) {
if (record) {
this._sendRecord(recordName, record, socketWrapper)
} else {
socketWrapper.sendError(
C.TOPIC.RECORD,
C.ACTIONS.SNAPSHOT,
[recordName, C.EVENT.RECORD_NOT_FOUND]
)
}
...
_snapshot = function (socketWrapper, message) { const recordName = message.data[0] const onComplete = function (record) { if (record) { this._sendRecord(recordName, record, socketWrapper) } else { socketWrapper.sendError( C.TOPIC.RECORD, C.ACTIONS.SNAPSHOT, [recordName, C.EVENT.RECORD_NOT_FOUND] ) } } const onError = function (error) { socketWrapper.sendError(C.TOPIC.RECORD, C.ACTIONS.SNAPSHOT, [recordName, error]) } // eslint-disable-next-line new RecordRequest( recordName, this._options, socketWrapper, onComplete.bind(this), onError.bind(this) ) }
...
* Creates the record if it doesn't exist
*/
this._createOrRead(socketWrapper, message)
} else if (message.action === C.ACTIONS.SNAPSHOT) {
/*
* Return the current state of the record in cache or db
*/
this._snapshot(socketWrapper, message)
} else if (message.action === C.ACTIONS.HEAD) {
/*
* Return the current state of the record in cache or db
*/
this._head(socketWrapper, message)
} else if (message.action === C.ACTIONS.HAS) {
/*
...
_update = function (socketWrapper, message) {
if (message.data.length < 3) {
socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.INVALID_MESSAGE_DATA, message.data[0])
return
}
const recordName = message.data[0]
const version = parseInt(message.data[1], 10)
/*
* If the update message is received from the message bus, rather than from a client,
* assume that the original deepstream node has already updated the record in cache and
* storage and only broadcast the message to subscribers
*/
if (socketWrapper === C.SOURCE_MESSAGE_CONNECTOR) {
this._$broadcastUpdate(recordName, message, false, socketWrapper)
return
}
if (isNaN(version)) {
socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.INVALID_VERSION, [recordName, version])
return
}
if (this._transitions[recordName] && this._transitions[recordName].hasVersion(version)) {
this._transitions[recordName].sendVersionExists({ message, version, sender: socketWrapper })
return
}
if (!this._transitions[recordName]) {
this._transitions[recordName] = new RecordTransition(recordName, this._options, this)
}
this._transitions[recordName].add(socketWrapper, version, message)
}
...
* Return a Boolean to indicate if record exists in cache or database
*/
this._hasRecord(socketWrapper, message)
} else if (message.action === C.ACTIONS.UPDATE || message.action === C.ACTIONS.PATCH) {
/*
* Handle complete (UPDATE) or partial (PATCH) updates
*/
this._update(socketWrapper, message)
} else if (message.action === C.ACTIONS.DELETE) {
/*
* Deletes the record
*/
this._delete(socketWrapper, message)
} else if (message.action === C.ACTIONS.UNSUBSCRIBE) {
/*
...
handle = function (socketWrapper, message) {
/*
* All messages have to provide at least the name of the record they relate to
* or a pattern in case of listen
*/
if (!message.data || message.data.length < 1) {
socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.INVALID_MESSAGE_DATA, message.raw)
return
}
if (message.action === C.ACTIONS.CREATEORREAD) {
/*
* Return the record's contents and subscribes for future updates.
* Creates the record if it doesn't exist
*/
this._createOrRead(socketWrapper, message)
} else if (message.action === C.ACTIONS.SNAPSHOT) {
/*
* Return the current state of the record in cache or db
*/
this._snapshot(socketWrapper, message)
} else if (message.action === C.ACTIONS.HEAD) {
/*
* Return the current state of the record in cache or db
*/
this._head(socketWrapper, message)
} else if (message.action === C.ACTIONS.HAS) {
/*
* Return a Boolean to indicate if record exists in cache or database
*/
this._hasRecord(socketWrapper, message)
} else if (message.action === C.ACTIONS.UPDATE || message.action === C.ACTIONS.PATCH) {
/*
* Handle complete (UPDATE) or partial (PATCH) updates
*/
this._update(socketWrapper, message)
} else if (message.action === C.ACTIONS.DELETE) {
/*
* Deletes the record
*/
this._delete(socketWrapper, message)
} else if (message.action === C.ACTIONS.UNSUBSCRIBE) {
/*
* Unsubscribes (discards) a record that was previously subscribed to
* using read()
*/
this._subscriptionRegistry.unsubscribe(message.data[0], socketWrapper)
} else if (message.action === C.ACTIONS.LISTEN ||
/*
* Listen to requests for a particular record or records
* whose names match a pattern
*/
message.action === C.ACTIONS.UNLISTEN ||
message.action === C.ACTIONS.LISTEN_ACCEPT ||
message.action === C.ACTIONS.LISTEN_REJECT ||
message.action === C.ACTIONS.LISTEN_SNAPSHOT) {
this._listenerRegistry.handle(socketWrapper, message)
} else {
/*
* Default for invalid messages
*/
this._options.logger.log(C.LOG_LEVEL.WARN, C.EVENT.UNKNOWN_ACTION, message.action)
if (socketWrapper !== C.SOURCE_MESSAGE_CONNECTOR) {
socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.UNKNOWN_ACTION, `unknown action ${message.action}`)
}
}
}
...
this._removeSubscriber(socket, message)
} else if (message.action === C.ACTIONS.EVENT) {
this._triggerEvent(socket, message)
} else if (message.action === C.ACTIONS.LISTEN ||
message.action === C.ACTIONS.UNLISTEN ||
message.action === C.ACTIONS.LISTEN_ACCEPT ||
message.action === C.ACTIONS.LISTEN_REJECT) {
this._listenerRegistry.handle(socket, message)
} else {
this._sendError(socket, C.EVENT.UNKNOWN_ACTION, `unknown action ${message.action}`)
}
}
/**
* Handler for the SUBSCRIBE action. Adds the socket as
...
removeRecordRequest = function (recordName) { if (!this._recordRequestsInProgress[recordName]) { return } if (this._recordRequestsInProgress[recordName].length === 0) { delete this._recordRequestsInProgress[recordName] return } const callback = this._recordRequestsInProgress[recordName].splice(0, 1)[0] callback(recordName) }
...
* Destroys this class and nulls down values to avoid
* memory leaks
*
* @private
* @returns {void}
*/
RuleApplication.prototype._destroy = function () {
this._params.recordHandler.removeRecordRequest(this._params.name)
this._isDestroyed = true
this._runScheduled = false
this._params = null
this._crossReferenceFn = null
this._pathVars = null
this._user = null
this._recordData = null
...
runWhenRecordStable = function (recordName, callback) { if ( !this._recordRequestsInProgress[recordName] || this._recordRequestsInProgress[recordName].length === 0 ) { this._recordRequestsInProgress[recordName] = [] callback(recordName) } else { this._recordRequestsInProgress[recordName].push(callback) } }
...
if (typeof this._recordData[recordName] !== UNDEFINED) {
this._onLoadComplete(recordName, this._recordData[recordName])
return
}
this._recordData[recordName] = LOADING
this._params.recordHandler.runWhenRecordStable(
recordName,
this._createNewRecordRequest.bind(this)
)
}
/**
* Load the record data from the cache for permissioning. This method should be
...
record_request = function (recordName, options, socketWrapper, onComplete, onError) { this._recordName = recordName this._options = options this._socketWrapper = socketWrapper this._storageRetrievalTimeout = null this._onComplete = onComplete this._onError = onError this._isDestroyed = false this._cacheRetrievalTimeout = setTimeout( this._sendError.bind(this, C.EVENT.CACHE_RETRIEVAL_TIMEOUT, this._recordName), this._options.cacheRetrievalTimeout ) this._options.cache.get(this._recordName, this._onCacheResponse.bind(this)) }
n/a
_destroy = function () { clearTimeout(this._cacheRetrievalTimeout) clearTimeout(this._storageRetrievalTimeout) this._recordName = null this._options = null this._socketWrapper = null this._storageRetrievalTimeout = null this._onComplete = null this._onError = null this._isDestroyed = true }
...
* @private
* @returns {void}
*/
_onComplete (error, response) {
if (error) {
this._logger.log(C.LOG_LEVEL.WARN, C.EVENT.AUTH_ERROR, `http auth error: ${error}`)
this._callback(false, null)
this._destroy()
return
}
if (response.statusCode >= 500 && response.statusCode < 600) {
this._logger.log(C.LOG_LEVEL.WARN, C.EVENT.AUTH_ERROR, `http auth server error: ${response.body}`)
}
...
_onCacheResponse = function (error, record) { clearTimeout(this._cacheRetrievalTimeout) if (this._isDestroyed === true) { return } if (error) { this._sendError(C.EVENT.RECORD_LOAD_ERROR, `error while loading ${this._recordName} from cache:${error.toString()}`) } else if (record) { this._onComplete(record) } else if ( !this._options.storageExclusion || !this._options.storageExclusion.test(this._recordName) ) { this._storageRetrievalTimeout = setTimeout( this._sendError.bind(this, C.EVENT.STORAGE_RETRIEVAL_TIMEOUT, this._recordName), this._options.storageRetrievalTimeout ) this._options.storage.get(this._recordName, this._onStorageResponse.bind(this)) } else { this._onComplete(null) } }
n/a
_onStorageResponse = function (error, record) {
clearTimeout(this._storageRetrievalTimeout)
if (this._isDestroyed === true) {
return
}
if (error) {
this._sendError(
C.EVENT.RECORD_LOAD_ERROR,
`error while loading ${this._recordName} from storage:${error.toString()}`
)
} else {
this._onComplete(record || null)
if (record) {
/*
* Load record from storage into cache
*/
this._options.cache.set(this._recordName, record, () => {})
}
this._destroy()
}
}
n/a
_sendError = function (event, message) { this._options.logger.log(C.LOG_LEVEL.ERROR, event, message) if (this._socketWrapper) { this._socketWrapper.sendError(C.TOPIC.RECORD, event, message) } if (this._onError) { this._onError(event, message) } this._destroy() }
...
this._triggerEvent(socket, message)
} else if (message.action === C.ACTIONS.LISTEN ||
message.action === C.ACTIONS.UNLISTEN ||
message.action === C.ACTIONS.LISTEN_ACCEPT ||
message.action === C.ACTIONS.LISTEN_REJECT) {
this._listenerRegistry.handle(socket, message)
} else {
this._sendError(socket, C.EVENT.UNKNOWN_ACTION, `unknown action ${message.action}`)
}
}
/**
* Handler for the SUBSCRIBE action. Adds the socket as
* a subscriber to the specified event name
*
...
record_transition = function (name, options, recordHandler) { this._name = name this._options = options this._recordHandler = recordHandler this._steps = [] this._record = null this._currentStep = null this._recordRequest = null this._sendVersionExists = [] this.isDestroyed = false this._pendingUpdates = {} this._ending = false this._storageResponses = 0 this._cacheResponses = 0 this._lastVersion = null this._lastError = null }
n/a
_getRecordConfig = function (message) { let config if (message.action === C.ACTIONS.PATCH && message.data.length === 5) { config = message.data[4] } else if (message.action === C.ACTIONS.UPDATE && message.data.length === 4) { config = message.data[3] } if (!config) { return null } return JSON.parse(config) }
...
version,
upsert: false,
sender: socketWrapper
}
let data
try {
const config = RecordTransition._getRecordConfig(message)
this._applyConfig(config, update)
} catch (e) {
update.sender.sendError(
C.TOPIC.RECORD,
C.EVENT.INVALID_CONFIG_DATA,
message.data[4] || message.data[3]
)
...
_applyConfig = function (config, step) { if (!config) { return } if (config.writeSuccess) { if (this._pendingUpdates[step.sender.uuid] === undefined) { this._pendingUpdates[step.sender.uuid] = { socketWrapper: step.sender, versions: [step.version] } } else { const update = this._pendingUpdates[step.sender.uuid] update.versions.push(step.version) } } if (config.upsert) { step.upsert = true } }
...
upsert: false,
sender: socketWrapper
}
let data
try {
const config = RecordTransition._getRecordConfig(message)
this._applyConfig(config, update)
} catch (e) {
update.sender.sendError(
C.TOPIC.RECORD,
C.EVENT.INVALID_CONFIG_DATA,
message.data[4] || message.data[3]
)
return
...
_flushVersionExists = function () { for (let i = 0; i < this._sendVersionExists.length; i++) { const conflict = this._sendVersionExists[i] this.sendVersionExists(conflict) } this._sendVersionExists = [] }
...
* @param {Object} record
*
* @private
* @returns {void}
*/
RecordTransition.prototype._processRecord = function (record) {
this._record = record
this._flushVersionExists()
this._next()
}
/**
* Once the record is loaded this method is called recoursively
* for every step in the queue of pending updates.
...
_next = function () {
if (this.isDestroyed === true) {
return
}
if (this._steps.length === 0) {
if (this._cacheResponses === 0 && this._storageResponses === 0) {
this.destroy()
}
return
}
this._currentStep = this._steps.shift()
if (this._currentStep.version === -1) {
const message = this._currentStep.message
const version = this._record._v + 1
this._currentStep.version = message.data[1] = version
// Raw message is rebroadcast, needs to be rebuilt with new version number
message.raw = messageBuilder.getMsg(message.topic, message.action, message.data)
}
if (this._record._v !== this._currentStep.version - 1) {
this._cacheResponses--
this.sendVersionExists(this._currentStep)
this._next()
return
}
this._record._v = this._currentStep.version
if (this._currentStep.isPatch) {
(new JsonPath(this._currentStep.path)).setValue(this._record._d, this._currentStep.data)
} else {
this._record._d = this._currentStep.data
}
/*
* Please note: saving to storage is called first to allow for synchronous cache
* responses to destroy the transition, it is however not on the critical path
* and the transition will continue straight away, rather than wait for the storage response
* to be returned.
*
* If the storage response is asynchronous and write acknowledgement is enabled, the transition
* will not be destroyed until writing to storage is finished
*/
if (!this._options.storageExclusion || !this._options.storageExclusion.test(this._name)) {
this._storageResponses++
this._options.storage.set(
this._name,
this._record,
this._onStorageResponse.bind(this, this._currentStep)
)
}
this._options.cache.set(
this._name,
this._record,
this._onCacheResponse.bind(this, this._currentStep)
)
}
...
this._name,
this._options,
socketWrapper,
this._onRecord.bind(this, update),
this._onFatalError.bind(this)
)
} else if (this._steps.length === 1 && this._cacheResponses === 1) {
this._next()
}
}
/**
* Destroys the instance
*
* @private
...
_onCacheResponse = function (currentStep, error) { this._cacheResponses-- this._writeError = this._writeError || error if (error) { this._onFatalError(error) } else if (this.isDestroyed === false) { this._recordHandler._$broadcastUpdate( this._name, this._currentStep.message, false, this._currentStep.sender ) this._next() } else if ( this._cacheResponses === 0 && this._storageResponses === 0 && this._steps.length === 0 ) { this.destroy() } }
n/a
_onFatalError = function (errorMessage) {
if (this.isDestroyed === true) {
/* istanbul ignore next */
return
}
this._options.logger.log(C.LOG_LEVEL.ERROR, C.EVENT.RECORD_UPDATE_ERROR, errorMessage)
for (let i = 0; i < this._steps.length; i++) {
if (this._steps[i].sender !== C.SOURCE_MESSAGE_CONNECTOR) {
this._steps[i].sender.sendError(
C.TOPIC.RECORD,
C.EVENT.RECORD_UPDATE_ERROR,
this._steps[i].version
)
}
}
if (this._cacheResponses === 0 && this._storageResponses === 0) {
this.destroy(errorMessage)
}
}
...
* @param {Object} record
*
* @private
* @returns {void}
*/
RecordTransition.prototype._onRecord = function (step, record) {
if (record === null && !step.upsert) {
this._onFatalError(`Received update for non-existant record ${this._name}`)
} else if (record === null && step.upsert) {
const emptyRecord = {
_v: 0,
_d: {}
}
this._recordHandler._permissionAction(
...
_onRecord = function (step, record) { if (record === null && !step.upsert) { this._onFatalError(`Received update for non-existant record ${this._name}`) } else if (record === null && step.upsert) { const emptyRecord = { _v: 0, _d: {} } this._recordHandler._permissionAction( C.ACTIONS.CREATE, this._name, step.sender, this._processRecord.bind(this, emptyRecord) ) } else { this._processRecord(record) } }
n/a
_onStorageResponse = function (currentStep, error) { this._storageResponses-- this._writeError = this._writeError || error if (error) { this._onFatalError(error) } else if ( this._cacheResponses === 0 && this._storageResponses === 0 && this._steps.length === 0 ) { this.destroy() } }
n/a
_processRecord = function (record) { this._record = record this._flushVersionExists() this._next() }
...
this._recordHandler._permissionAction(
C.ACTIONS.CREATE,
this._name,
step.sender,
this._processRecord.bind(this, emptyRecord)
)
} else {
this._processRecord(record)
}
}
/**
* Callback used to process next update after record successfully returned or permissiom
* check passed
*
...
_sendWriteAcknowledgements = function (errorMessage) { errorMessage = errorMessage === undefined ? null : errorMessage // eslint-disable-line for (const uid in this._pendingUpdates) { const update = this._pendingUpdates[uid] update.socketWrapper.sendMessage(C.TOPIC.RECORD, C.ACTIONS.WRITE_ACKNOWLEDGEMENT, [ this._name, update.versions, messageBuilder.typed(errorMessage) ]) } }
...
* @returns {void}
*/
RecordTransition.prototype.destroy = function (errorMessage) {
if (this.isDestroyed) {
return
}
this._sendWriteAcknowledgements(errorMessage || this._writeError)
this._recordHandler._$transitionComplete(this._name)
this.isDestroyed = true
this._options = null
this._name = null
this._record = null
this._recordHandler = null
this._steps = null
...
add = function (socketWrapper, version, message) { const update = { message, version, upsert: false, sender: socketWrapper } let data try { const config = RecordTransition._getRecordConfig(message) this._applyConfig(config, update) } catch (e) { update.sender.sendError( C.TOPIC.RECORD, C.EVENT.INVALID_CONFIG_DATA, message.data[4] || message.data[3] ) return } if (message.action === C.ACTIONS.UPDATE) { if (message.data.length !== 4 && message.data.length !== 3) { socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.INVALID_MESSAGE_DATA, message.raw) return } try { data = JSON.parse(message.data[2]) } catch (e) { socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.INVALID_MESSAGE_DATA, message.raw) return } if (!utils.isOfType(data, 'object') && !utils.isOfType(data, 'array')) { socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.INVALID_MESSAGE_DATA, message.raw) return } update.isPatch = false update.data = data } if (message.action === C.ACTIONS.PATCH) { if (message.data.length !== 5 && message.data.length !== 4) { socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.INVALID_MESSAGE_DATA, message.raw) return } update.isPatch = true update.data = messageParser.convertTyped(message.data[3]) if (update.data instanceof Error) { socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.INVALID_MESSAGE_DATA, `${update.data.toString()}:${message.data[3]}`) return } update.path = message.data[2] } if (this._lastVersion !== null && this._lastVersion !== version - 1) { this.sendVersionExists(update) return } if (version !== -1) { this._lastVersion = version } this._cacheResponses++ this._steps.push(update) if (this._recordRequest === null) { this._recordRequest = new RecordRequest( this._name, this._options, socketWrapper, this._onRecord.bind(this, update), this._onFatalError.bind(this) ) } else if (this._steps.length === 1 && this._cacheResponses === 1) { this._next() } }
...
this._locallyProvidedRecords[subscriptionName] = {
socketWrapper,
pattern: message.data[0],
closeListener: this._removeListener.bind(this, socketWrapper, message)
}
socketWrapper.once('close', this._locallyProvidedRecords[subscriptionName].closeListener)
this._clusterProvidedRecords.add(subscriptionName)
this._stopLocalDiscoveryStage(subscriptionName)
}
/**
* Register a client as a listener for record subscriptions
*
...
destroy = function (errorMessage) { if (this.isDestroyed) { return } this._sendWriteAcknowledgements(errorMessage || this._writeError) this._recordHandler._$transitionComplete(this._name) this.isDestroyed = true this._options = null this._name = null this._record = null this._recordHandler = null this._steps = null this._currentStep = null this._recordRequest = null this._pendingUpdates = null this._lastVersion = null this._cacheResponses = 0 this._storageResponses = 0 }
...
}
const msg = messageParser.parse(connectionMessage)[0]
if (msg === null || msg === undefined) {
this._options.logger.log(C.LOG_LEVEL.WARN, C.EVENT.MESSAGE_PARSE_ERROR, connectionMessage)
socketWrapper.sendError(C.TOPIC.CONNECTION, C.EVENT.MESSAGE_PARSE_ERROR, connectionMessage)
socketWrapper.destroy()
} else if (msg.topic !== C.TOPIC.CONNECTION) {
this._options.logger.log(C.LOG_LEVEL.WARN, C.EVENT.INVALID_MESSAGE, `invalid connection message ${connectionMessage}`)
socketWrapper.sendError(C.TOPIC.CONNECTION, C.EVENT.INVALID_MESSAGE, 'invalid connection message')
} else if (msg.action === C.ACTIONS.PONG) {
// do nothing
} else if (msg.action === C.ACTIONS.CHALLENGE_RESPONSE) {
socketWrapper.socket.removeListener('message', socketWrapper.connectionCallback)
...
hasVersion = function (version) { return version !== -1 && version <= this._lastVersion }
...
}
if (isNaN(version)) {
socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.INVALID_VERSION, [recordName, version])
return
}
if (this._transitions[recordName] && this._transitions[recordName].hasVersion(version)) {
this._transitions[recordName].sendVersionExists({ message, version, sender: socketWrapper })
return
}
if (!this._transitions[recordName]) {
this._transitions[recordName] = new RecordTransition(recordName, this._options, this)
}
...
sendVersionExists = function (step) { const socketWrapper = step.sender const version = step.version const config = step.message.data[4] if (this._record) { const data = config === undefined ? [this._name, this._record._v, JSON.stringify(this._record._d)] : [this._name, this._record._v, JSON.stringify(this._record._d), config] socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.VERSION_EXISTS, data) const msg = `${socketWrapper.user} tried to update record ${this._name} to version ${version} but it already was ${this._record ._v}` this._options.logger.log(C.LOG_LEVEL.WARN, C.EVENT.VERSION_EXISTS, msg) } else { this._sendVersionExists.push({ version, sender: socketWrapper, config, message: step.message }) } }
...
if (isNaN(version)) {
socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.INVALID_VERSION, [recordName, version])
return
}
if (this._transitions[recordName] && this._transitions[recordName].hasVersion(version)) {
this._transitions[recordName].sendVersionExists({ message, version, sender: socketWrapper
})
return
}
if (!this._transitions[recordName]) {
this._transitions[recordName] = new RecordTransition(recordName, this._options, this)
}
...
rule_application = function (params) { this._params = params this._isDestroyed = false this._runScheduled = false this._maxIterationCount = this._params.permissionOptions.maxRuleIterations this._crossReferenceFn = this._crossReference.bind(this) this._pathVars = this._getPathVars() this._user = this._getUser() this._recordData = {} this._id = Math.random().toString() this._iterations = 0 this._run() }
n/a
_createNewRecordRequest = function (recordName) { // eslint-disable-next-line new RecordRequest( recordName, this._params.options, null, this._onLoadComplete.bind(this, recordName), this._onLoadError.bind(this, recordName) ) }
n/a
_crossReference = function (recordName) { const type = typeof recordName if (type !== UNDEFINED && type !== STRING) { this._onRuleError(`crossreference got unsupported type ${type}`) } else if (type === UNDEFINED || recordName.indexOf(UNDEFINED) !== -1) { return } else if (this._recordData[recordName] === LOADING) { return } else if (this._recordData[recordName] === null) { return null } else if (typeof this._recordData[recordName] === UNDEFINED) { this._loadRecord(recordName) } else { return this._recordData[recordName]._d } }
n/a
_destroy = function () { this._params.recordHandler.removeRecordRequest(this._params.name) this._isDestroyed = true this._runScheduled = false this._params = null this._crossReferenceFn = null this._pathVars = null this._user = null this._recordData = null this._currentData = null }
...
* @private
* @returns {void}
*/
_onComplete (error, response) {
if (error) {
this._logger.log(C.LOG_LEVEL.WARN, C.EVENT.AUTH_ERROR, `http auth error: ${error}`)
this._callback(false, null)
this._destroy()
return
}
if (response.statusCode >= 500 && response.statusCode < 600) {
this._logger.log(C.LOG_LEVEL.WARN, C.EVENT.AUTH_ERROR, `http auth server error: ${response.body}`)
}
...
_getArguments = function () { return [ this._crossReferenceFn, this._user, this._getCurrentData(), this._getOldData(), Date.now(), this._params ? this._params.action : null ].concat(this._pathVars) }
...
}
if (this._iterations > this._maxIterationCount) {
this._onRuleError('Exceeded max iteration count')
return
}
const args = this._getArguments()
let result
if (this._isDestroyed === true) {
return
}
try {
...
_getCurrentData = function () { if (this._params.rule.hasData === false) { return null } const msg = this._params.message let data if (msg.topic === C.TOPIC.EVENT && msg.data[1]) { data = messageParser.convertTyped(msg.data[1]) } else if (msg.topic === C.TOPIC.RPC) { data = messageParser.convertTyped(msg.data[2]) } else if (msg.topic === C.TOPIC.RECORD && msg.action === C.ACTIONS.UPDATE) { data = this._getRecordUpdateData(msg) } else if (msg.topic === C.TOPIC.RECORD && msg.action === C.ACTIONS.PATCH) { data = this._getRecordPatchData(msg) } if (data instanceof Error) { this._onRuleError(`error when converting message data ${data.toString()}`) } else { return data } }
...
* @private
* @returns {void}
*/
RuleApplication.prototype._getArguments = function () {
return [
this._crossReferenceFn,
this._user,
this._getCurrentData(),
this._getOldData(),
Date.now(),
this._params ? this._params.action : null
].concat(this._pathVars)
}
/**
...
_getOldData = function () { if (this._isDestroyed === true || this._params.rule.hasOldData === false) { return null } else if (this._recordData[this._params.name]) { return this._recordData[this._params.name]._d } this._loadRecord(this._params.name) }
...
* @returns {void}
*/
RuleApplication.prototype._getArguments = function () {
return [
this._crossReferenceFn,
this._user,
this._getCurrentData(),
this._getOldData(),
Date.now(),
this._params ? this._params.action : null
].concat(this._pathVars)
}
/**
* Returns the data for the user variable. This is only done once
...
_getPathVars = function () { return this._params.name.match(this._params.regexp).slice(1) }
...
*/
const RuleApplication = function (params) {
this._params = params
this._isDestroyed = false
this._runScheduled = false
this._maxIterationCount = this._params.permissionOptions.maxRuleIterations
this._crossReferenceFn = this._crossReference.bind(this)
this._pathVars = this._getPathVars()
this._user = this._getUser()
this._recordData = {}
this._id = Math.random().toString()
this._iterations = 0
this._run()
}
...
_getRecordPatchData = function (msg) { if (msg.data.length !== 4 || typeof msg.data[2] !== STRING) { return new Error('Invalid message data') } const currentData = this._recordData[this._params.name] const newData = messageParser.convertTyped(msg.data[3]) let jsonPath let data if (newData instanceof Error) { return newData } if (currentData === null) { return new Error(`Tried to apply patch to non-existant record ${msg.data[0]}`) } if (typeof currentData !== UNDEFINED && currentData !== LOADING) { jsonPath = new JsonPath(msg.data[2]) data = JSON.parse(JSON.stringify(currentData._d)) jsonPath.setValue(data, newData) return data } this._loadRecord(this._params.name) }
...
if (msg.topic === C.TOPIC.EVENT && msg.data[1]) {
data = messageParser.convertTyped(msg.data[1])
} else if (msg.topic === C.TOPIC.RPC) {
data = messageParser.convertTyped(msg.data[2])
} else if (msg.topic === C.TOPIC.RECORD && msg.action === C.ACTIONS.UPDATE) {
data = this._getRecordUpdateData(msg)
} else if (msg.topic === C.TOPIC.RECORD && msg.action === C.ACTIONS.PATCH) {
data = this._getRecordPatchData(msg)
}
if (data instanceof Error) {
this._onRuleError(`error when converting message data ${data.toString()}`)
} else {
return data
}
...
_getRecordUpdateData = function (msg) { let data try { data = JSON.parse(msg.data[2]) } catch (error) { return error } return data }
...
let data
if (msg.topic === C.TOPIC.EVENT && msg.data[1]) {
data = messageParser.convertTyped(msg.data[1])
} else if (msg.topic === C.TOPIC.RPC) {
data = messageParser.convertTyped(msg.data[2])
} else if (msg.topic === C.TOPIC.RECORD && msg.action === C.ACTIONS.UPDATE) {
data = this._getRecordUpdateData(msg)
} else if (msg.topic === C.TOPIC.RECORD && msg.action === C.ACTIONS.PATCH) {
data = this._getRecordPatchData(msg)
}
if (data instanceof Error) {
this._onRuleError(`error when converting message data ${data.toString()}`)
} else {
...
_getUser = function () { return { isAuthenticated: this._params.username !== OPEN, id: this._params.username, data: this._params.authData } }
...
const RuleApplication = function (params) {
this._params = params
this._isDestroyed = false
this._runScheduled = false
this._maxIterationCount = this._params.permissionOptions.maxRuleIterations
this._crossReferenceFn = this._crossReference.bind(this)
this._pathVars = this._getPathVars()
this._user = this._getUser()
this._recordData = {}
this._id = Math.random().toString()
this._iterations = 0
this._run()
}
/**
...
_isReady = function () { let isLoading = false for (const key in this._recordData) { if (this._recordData[key] === LOADING) { isLoading = true } } return isLoading === false && this._runScheduled === false }
...
if (this._isDestroyed === true) {
return
}
try {
result = this._params.rule.fn.apply({}, args)
} catch (error) {
if (this._isReady()) {
this._onRuleError(error)
return
}
}
if (this._isReady()) {
this._params.callback(null, result)
...
_loadRecord = function (recordName) {
/* istanbul ignore next */
if (this._recordData[recordName] === LOADING) {
return
}
/* istanbul ignore next */
if (typeof this._recordData[recordName] !== UNDEFINED) {
this._onLoadComplete(recordName, this._recordData[recordName])
return
}
this._recordData[recordName] = LOADING
this._params.recordHandler.runWhenRecordStable(
recordName,
this._createNewRecordRequest.bind(this)
)
}
...
if (typeof currentData !== UNDEFINED && currentData !== LOADING) {
jsonPath = new JsonPath(msg.data[2])
data = JSON.parse(JSON.stringify(currentData._d))
jsonPath.setValue(data, newData)
return data
}
this._loadRecord(this._params.name)
}
/**
* Returns or loads the record's previous value. Only supported for record
* write and read operations
*
* If getData encounters an error, the rule application might already be destroyed
...
_onLoadComplete = function (recordName, data) { this._recordData[recordName] = data if (this._isReady()) { this._runScheduled = true process.nextTick(this._run.bind(this)) } }
...
RuleApplication.prototype._loadRecord = function (recordName) {
/* istanbul ignore next */
if (this._recordData[recordName] === LOADING) {
return
}
/* istanbul ignore next */
if (typeof this._recordData[recordName] !== UNDEFINED) {
this._onLoadComplete(recordName, this._recordData[recordName])
return
}
this._recordData[recordName] = LOADING
this._params.recordHandler.runWhenRecordStable(
recordName,
...
_onLoadError = function (recordName, error) { this._recordData[recordName] = ERROR const errorMsg = `failed to load record ${this._params.name} for permissioning:${error.toString()}` this._params.logger.log(C.LOG_LEVEL.ERROR, C.EVENT.RECORD_LOAD_ERROR, errorMsg) this._params.callback(C.EVENT.RECORD_LOAD_ERROR, false) this._destroy() }
n/a
_onRuleError = function (error) { if (this._isDestroyed === true) { return } const errorMsg = `error when executing ${this._params.rule.fn.toString()}${EOL }for ${this._params.path}: ${error.toString()}` this._params.logger.log(C.LOG_LEVEL.WARN, C.EVENT.MESSAGE_PERMISSION_ERROR, errorMsg) this._params.callback(C.EVENT.MESSAGE_PERMISSION_ERROR, false) this._destroy() }
...
this._iterations++
/* istanbul ignore next */
if (this._isDestroyed === true) {
return
}
if (this._iterations > this._maxIterationCount) {
this._onRuleError('Exceeded max iteration count')
return
}
const args = this._getArguments()
let result
if (this._isDestroyed === true) {
...
_run = function () {
this._runScheduled = false
this._iterations++
/* istanbul ignore next */
if (this._isDestroyed === true) {
return
}
if (this._iterations > this._maxIterationCount) {
this._onRuleError('Exceeded max iteration count')
return
}
const args = this._getArguments()
let result
if (this._isDestroyed === true) {
return
}
try {
result = this._params.rule.fn.apply({}, args)
} catch (error) {
if (this._isReady()) {
this._onRuleError(error)
return
}
}
if (this._isReady()) {
this._params.callback(null, result)
this._destroy()
}
}
...
this._maxIterationCount = this._params.permissionOptions.maxRuleIterations
this._crossReferenceFn = this._crossReference.bind(this)
this._pathVars = this._getPathVars()
this._user = this._getUser()
this._recordData = {}
this._id = Math.random().toString()
this._iterations = 0
this._run()
}
/**
* Runs the rule function. This method is initially called when this class
* is constructed and recoursively from thereon whenever the loading of a record
* is completed
*
...
rule_cache = function (options) { this._options = options this._data = {} setInterval(this._purge.bind(this), options.cacheEvacuationInterval) }
n/a
_purge = function () { for (const key in this._data) { if (this._data[key].isUsed === true) { this._data[key].isUsed = false } else { delete this._data[key] } } }
n/a
_toKey = function (section, name, type) { return `${section}_${name}_${type}` }
...
* @param {String} name the name of the record, event or rpc
* @param {String} type the type of the action, e.g. read, write, subscribe
*
* @public
* @returns {Boolean}
*/
RuleCache.prototype.has = function (section, name, type) {
return !!this._data[this._toKey(section, name, type)]
}
/**
* Resets the usage flag and returns an entry from the cache
*
* @param {String} section e.g. record, event or rpc
* @param {String} name the name of the record, event or rpc
...
get = function (section, name, type) { const key = this._toKey(section, name, type) this._data[key].isUsed = true return this._data[key].rule }
...
*
* @param {String} urlPath URL where to download the archive
* @param {Stream} writeable output stream to save the archive
* @param {Function} callback Callback (err)
* @return {void}
*/
const downloadArchive = function (urlPath, outStream, callback) {
needle.get(`https://github.com${urlPath}`, {
follow_max: 5,
headers: { 'User-Agent': 'nodejs-client' }
}, (error, response) => {
if (error) {
return callback(error)
}
outStream.write(response.body)
...
has = function (section, name, type) { return !!this._data[this._toKey(section, name, type)] }
...
/**
* Used primarily for tests. Returns whether or not a provider exists for
* the specific subscriptionName
* @public
* @returns {boolean}
*/
hasActiveProvider (susbcriptionName) {
return this._clusterProvidedRecords.has(susbcriptionName)
}
/**
* The main entry point to the handle class.
* Called on any of the following actions:
*
* 1) C.ACTIONS.LISTEN
...
reset = function () { this._data = {} }
...
if (validationResult !== true) {
this.emit('error', `invalid permission config - ${validationResult}`)
return
}
this._config = configCompiler.compile(config)
this._ruleCache.reset()
this._ready()
}
/**
* Implements the permissionHandler's canPerformAction interface
* method
*
...
set = function (section, name, type, rule) { this._data[this._toKey(section, name, type)] = { rule, isUsed: true } }
...
*/
ConfigPermissionHandler.prototype.setRecordHandler = function (recordHandler) {
this._recordHandler = recordHandler
}
/**
* Will be called by the dependency initialiser once server.start() is called.
* This gives users a chance to change the path using server.set()
* first
*
* @public
* @returns {void}
*/
ConfigPermissionHandler.prototype.init = function () {
if (this._config === null && this._optionsValid) {
...
parse = function (rule, variables) { if (rule === true || rule === false) { return { fn: rule === true ? function () { return true } : function () { return false }, hasOldData: false, hasData: false } } const ruleObj = {} const args = ['_', 'user', 'data', 'oldData', 'now', 'action'].concat(variables) args.push(`return ${rule};`) ruleObj.fn = Function.apply(this, args) ruleObj.hasOldData = !!rule.match(OLD_DATA_REGEXP) ruleObj.hasData = !!rule.match(DATA_REGEXP) return ruleObj }
...
}
function fetchLibs(libDir, meta) {
const directory = libDir || 'lib'
const files = glob.sync(path.join(directory, '*', 'package.json'))
meta.libs = files.map((filePath) => {
const pkg = fs.readFileSync(filePath, 'utf8')
const object = JSON.parse(pkg)
return `${object.name}:${object.version}`
})
}
...
validate = function (rule, section, type) { if (typeof rule === 'boolean') { return true } if (typeof rule !== 'string') { return 'rule must be a string' } if (rule.length === 0) { return 'rule can\'t be empty' } if (rule.match(NEW_REGEXP)) { return 'rule can\'t contain the new keyword' } if (rule.match(USER_FUNCTION_REGEXP)) { return 'rule can\'t contain user functions' } const functions = rule.match(FUNCTION_REGEXP) let functionName let i // TODO _ cross references are only supported for section record if (functions) { for (i = 0; i < functions.length; i++) { functionName = functions[i].replace(/\s*\($/, '') if (SUPPORTED_FUNCTIONS.indexOf(functionName) === -1) { return `function ${functionName} is not supported` } } } try { // eslint-disable-next-line new Function(rule) } catch (e) { return e.toString() } if (!!rule.match(OLD_DATA_REGEXP) && !rulesMap.supportsOldData(type)) { return `rule ${type} for ${section} does not support oldData` } if (!!rule.match(DATA_REGEXP) && !rulesMap.supportsData(type)) { return `rule ${type} for ${section} does not support data` } return true }
...
*
* @param {Object} config deepstream permissionConfig
*
* @public
* @returns {void}
*/
ConfigPermissionHandler.prototype.useConfig = function (config) {
const validationResult = configValidator.validate(config)
if (validationResult !== true) {
this.emit('error', `invalid permission config - ${validationResult}`)
return
}
this._config = configCompiler.compile(config)
...
getRulesForMessage = function (message) { if (RULES_MAP[message.topic] === undefined) { return null } if (RULES_MAP[message.topic].actions[message.action] === undefined) { return null } return { section: RULES_MAP[message.topic].section, type: RULES_MAP[message.topic].actions[message.action].name, action: actionToKey[message.action] } }
...
ConfigPermissionHandler.prototype.canPerformAction = function (
username, message, callback, authData) {
if (typeof message.data[0] !== STRING) {
callback('invalid message', false)
return
}
const ruleSpecification = rulesMap.getRulesForMessage(message)
const name = message.data[0]
if (ruleSpecification === null) {
callback(null, true)
return
}
...
supportsData = function (type) { return RULE_TYPES[type.toUpperCase()].data }
...
return e.toString()
}
if (!!rule.match(OLD_DATA_REGEXP) && !rulesMap.supportsOldData(type)) {
return `rule ${type} for ${section} does not support oldData`
}
if (!!rule.match(DATA_REGEXP) && !rulesMap.supportsData(type)) {
return `rule ${type} for ${section} does not support data`
}
return true
}
/**
...
supportsOldData = function (type) { return RULE_TYPES[type.toUpperCase()].oldData }
...
try {
// eslint-disable-next-line
new Function(rule)
} catch (e) {
return e.toString()
}
if (!!rule.match(OLD_DATA_REGEXP) && !rulesMap.supportsOldData(type)) {
return `rule ${type} for ${section} does not support oldData`
}
if (!!rule.match(DATA_REGEXP) && !rulesMap.supportsData(type)) {
return `rule ${type} for ${section} does not support data`
}
...
socket_wrapper = function (socket, options) { this.socket = socket this.isClosed = false this.socket.once('close', this._onSocketClose.bind(this)) this._options = options this.user = null this.authCallBack = null this.authAttempts = 0 this.setMaxListeners(0) this.uuid = Math.random() this._handshakeData = null this._setUpHandshakeData() this._queuedMessages = [] this._currentPacketMessageCount = 0 this._sendNextPacketTimeout = null this._currentMessageResetTimeout = null }
n/a
finalizeMessage = function (preparedMessage) { uws.native.server.finalizeMessage(preparedMessage) }
...
*
* @param {External} preparedMessage the prepared message to finalize
*
* @public
* @returns {void}
*/
SocketWrapper.finalizeMessage = function (preparedMessage) {
uws.native.server.finalizeMessage(preparedMessage)
}
/**
* Returns a map of parameters that were collected
* during the initial http request that established the
* connection
*
...
prepareMessage = function (message) { SocketWrapper.lastPreparedMessage = message return uws.native.server.prepareMessage(message, uws.OPCODE_TEXT) }
...
* @param {String} message the message to be prepared
*
* @public
* @returns {External} prepared message
*/
SocketWrapper.prepareMessage = function (message) {
SocketWrapper.lastPreparedMessage = message
return uws.native.server.prepareMessage(message, uws.OPCODE_TEXT)
}
/**
* Sends the [uws] prepared message, or in case of testing sends the
* last prepared message.
*
* @param {External} preparedMessage the prepared message
...
function EventEmitter() { EventEmitter.init.call(this); }
n/a
_onSocketClose = function () { this.isClosed = true this.emit('close', this) this._options.logger.log(C.LOG_LEVEL.INFO, C.EVENT.CLIENT_DISCONNECTED, this.user) this.socket.removeAllListeners() }
n/a
_setUpHandshakeData = function () { this._handshakeData = { remoteAddress: this.socket._socket.remoteAddress } if (this.socket.upgradeReq) { this._handshakeData.headers = this.socket.upgradeReq.headers this._handshakeData.referer = this.socket.upgradeReq.headers.referer } return this._handshakeData }
...
this._options = options
this.user = null
this.authCallBack = null
this.authAttempts = 0
this.setMaxListeners(0)
this.uuid = Math.random()
this._handshakeData = null
this._setUpHandshakeData()
this._queuedMessages = []
this._currentPacketMessageCount = 0
this._sendNextPacketTimeout = null
this._currentMessageResetTimeout = null
}
...
destroy = function () { this.socket.close() this.authCallBack = null }
...
}
const msg = messageParser.parse(connectionMessage)[0]
if (msg === null || msg === undefined) {
this._options.logger.log(C.LOG_LEVEL.WARN, C.EVENT.MESSAGE_PARSE_ERROR, connectionMessage)
socketWrapper.sendError(C.TOPIC.CONNECTION, C.EVENT.MESSAGE_PARSE_ERROR, connectionMessage)
socketWrapper.destroy()
} else if (msg.topic !== C.TOPIC.CONNECTION) {
this._options.logger.log(C.LOG_LEVEL.WARN, C.EVENT.INVALID_MESSAGE, `invalid connection message ${connectionMessage}`)
socketWrapper.sendError(C.TOPIC.CONNECTION, C.EVENT.INVALID_MESSAGE, 'invalid connection message')
} else if (msg.action === C.ACTIONS.PONG) {
// do nothing
} else if (msg.action === C.ACTIONS.CHALLENGE_RESPONSE) {
socketWrapper.socket.removeListener('message', socketWrapper.connectionCallback)
...
getHandshakeData = function () { return this._handshakeData }
...
* @param {Websocket} socket
*
* @private
* @returns {void}
*/
_onConnection (socket) {
const socketWrapper = new SocketWrapper(socket, this._options)
const handshakeData = socketWrapper.getHandshakeData()
const logMsg = `from ${handshakeData.referer} (${handshakeData.remoteAddress})`
let disconnectTimer
this._options.logger.log(C.LOG_LEVEL.INFO, C.EVENT.INCOMING_CONNECTION, logMsg)
if (this._options.unauthenticatedClientTimeout !== null) {
disconnectTimer = setTimeout(
...
send = function (message) { if (message.charAt(message.length - 1) !== C.MESSAGE_SEPERATOR) { message += C.MESSAGE_SEPERATOR // eslint-disable-line } if (this.isClosed === true) { return } this.socket.send(message) }
...
* @param {Boolean} hasProvider send T or F so provided status
* @param {SocketWrapper} socketWrapper the socket wrapper to send to,
* if it doesn't exist then don't do anything
* @param {String} subscriptionName The subscription name which provided status changed
*/
_sendHasProviderUpdateToSingleSubscriber (hasProvider, socketWrapper, subscriptionName) {
if (socketWrapper && this._topic === C.TOPIC.RECORD) {
socketWrapper.send(this._createHasProviderMessage(hasProvider, this._topic, subscriptionName
))
}
}
/**
* Sends a has provider update to all subcribers
* @param {Boolean} hasProvider send T or F so provided status
* @param {String} subscriptionName The subscription name which provided status changed
...
sendError = function (topic, type, msg) { if (this.isClosed === false) { this.send(messageBuilder.getErrorMsg(topic, type, msg)) } }
...
false,
socket
)
}
EventHandler.prototype._sendError = function (socket, event, message) {
if (socket && socket.sendError) {
socket.sendError(C.TOPIC.EVENT, event, message)
}
this._logger.log(C.LOG_LEVEL.ERROR, event, message)
}
/**
* Makes sure that subscription message contains the name of the event. Sends an error to the client
* if not
...
sendMessage = function (topic, action, data) { if (this.isClosed === false) { this.send(messageBuilder.getMsg(topic, action, data)) } }
...
socketWrapper.connectionCallback = this._processConnectionMessage.bind(this, socketWrapper)
socketWrapper.authCallBack = this._authenticateConnection.bind(
this,
socketWrapper,
disconnectTimer
)
socketWrapper.sendMessage(C.TOPIC.CONNECTION, C.ACTIONS.CHALLENGE)
socket.on('message', socketWrapper.connectionCallback)
}
/**
* Always challenges the client that connects. This will be opened up later to allow users
* to put in their own challenge authentication, but requires more work on the clustering
* aspect first.
...
sendNative = function (message) { this.socket.send(message) }
...
while (i < gaps.length) {
message += sharedMessages.substring(lastStop, gaps[i++])
lastStop = gaps[i++]
}
message += sharedMessages.substring(lastStop, sharedMessages.length)
if (message) {
socket.sendNative(message)
}
}
// for all sockets in this subscription name, send either sharedMessage or this socket's
// specialized message. only sockets that sent something will have a special message, all
// other sockets are only listeners and receive the exact same (sharedMessage) message.
const sockets = this._subscriptions.get(name)
...
sendPrepared = function (preparedMessage) { if (this.socket.external) { uws.native.server.sendPrepared(this.socket.external, preparedMessage) } else if (this.socket.external !== null) { this.socket.send(SocketWrapper.lastPreparedMessage) } }
...
* @param {External} preparedMessage the prepared message
*
* @public
* @returns {void}
*/
SocketWrapper.prototype.sendPrepared = function (preparedMessage) {
if (this.socket.external) {
uws.native.server.sendPrepared(this.socket.external, preparedMessage)
} else if (this.socket.external !== null) {
this.socket.send(SocketWrapper.lastPreparedMessage)
}
}
/**
* Variant of send with no particular checks or appends of message.
...
std_out_logger = function (options) { this._options = options || {} this.isReady = true this._$useColors = this._options.colors === undefined ? true : this._options.colors this._logLevelColors = [ 'white', 'green', 'yellow', 'red' ] this._currentLogLevel = C.LOG_LEVEL[this._options.logLevel] || C.LOG_LEVEL.DEBUG }
n/a
log = function (logLevel, event, logMessage) { if (logLevel < this._currentLogLevel) { return } const msg = `${event} | ${logMessage}` let outputStream if (logLevel === C.LOG_LEVEL.ERROR || logLevel === C.LOG_LEVEL.WARN) { outputStream = 'stderr' } else { outputStream = 'stdout' } if (this._$useColors) { process[outputStream].write(msg[this._logLevelColors[logLevel]] + EOL) } else { process[outputStream].write(msg + EOL) } }
...
const fileAuthenticationHandler = new FileAuthenticationHandler(config.auth.options)
fileAuthenticationHandler.createHash(password, (err, hash) => {
if (err) {
console.error('Hash could not be created', err)
process.exit(1)
}
console.log('Password hash:', hash)
})
}
...
setLogLevel = function (logLevel) { this._currentLogLevel = logLevel }
...
}
config.logger = new Logger(configOptions)
if (LOG_LEVEL_KEYS.indexOf(config.logLevel) !== -1) {
// NOTE: config.logLevel has highest priority, compare to the level defined
// in the nested logger object
config.logLevel = C.LOG_LEVEL[config.logLevel]
config.logger.setLogLevel(config.logLevel)
}
}
/**
* Handle the plugins property in the config object the connectors.
* Allowed types: {message|cache|storage}
* Plugins can be passed either as a __path__ property or as a __name__ property with
...
combineEvents = function (emitters, event, callback) { let i let count = 0 const increment = function () { count++ if (count === emitters.length) { callback() } } for (i = 0; i < emitters.length; i++) { emitters[i].once(event, increment) } }
n/a
deepCopy = function (obj) { if (typeof obj === OBJECT) { return JSON.parse(JSON.stringify(obj)) } return obj }
n/a
getRandomIntInRange = function (min, max) { return min + Math.floor(Math.random() * (max - min)) }
...
* @public
* @returns {SocketWrapper|RpcProxy} alternativeProvider
*/
getAlternativeProvider (rpcName, correlationId) {
const rpcData = this._rpcs.get(correlationId)
const subscribers = Array.from(this._subscriptionRegistry.getLocalSubscribers(rpcName))
let index = utils.getRandomIntInRange(0, subscribers.length)
for (let n = 0; n < subscribers.length; ++n) {
if (!rpcData.providers.has(subscribers[index])) {
rpcData.providers.add(subscribers[index])
return subscribers[index]
}
index = (index + 1) % subscribers.length
...
getUid = function () { return `${Date.now().toString(36)}-${(Math.random() * 10000000000000000000).toString(36)}` }
...
const C = require('./constants/constants')
exports.get = function () {
const options = {
/*
* General
*/
serverName: utils.getUid(),
showLogo: true,
logLevel: C.LOG_LEVEL.INFO,
/*
* Connectivity
*/
port: 6020,
...
isOfType = function (input, expectedType) { if (input === null) { return expectedType === 'null' } else if (expectedType === 'array') { return Array.isArray(input) } else if (expectedType === 'url') { return !!url.parse(input).host } return typeof input === expectedType }
...
try {
data = JSON.parse(message.data[2])
} catch (e) {
socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.INVALID_MESSAGE_DATA, message.raw)
return
}
if (!utils.isOfType(data, 'object') && !utils.isOfType(data,
x27;array')) {
socketWrapper.sendError(C.TOPIC.RECORD, C.EVENT.INVALID_MESSAGE_DATA, message.raw)
return
}
update.isPatch = false
update.data = data
}
...
merge = function () { const result = {} const objs = Array.prototype.slice.apply(arguments) // eslint-disable-line let i const _merge = (objA, objB) => { let key for (key in objB) { if (objB[key] && objB[key].constructor === Object) { objA[key] = objA[key] || {} _merge(objA[key], objB[key]) } else if (objB[key] !== undefined) { objA[key] = objB[key] } } } for (i = 0; i < objs.length; i++) { _merge(result, objs[i]) } return result }
...
const cliArgs = {}
let key
for (key in defaultOptions.get()) {
cliArgs[key] = typeof argv[key] === 'undefined' ? undefined : argv[key]
}
return utils.merge({ plugins: {} }, defaultOptions.get(), config, cliArgs)
}
/**
* Checks if a config file is present at a given path
*
* @param {String} configPath the path to the config file
*
...
parseJSON = function (text, reviver) { try { return { value: JSON.parse(text, reviver) } } catch (err) { return { error: err } } }
...
const type = value.charAt(0)
if (type === C.TYPES.STRING) {
return value.substr(1)
}
if (type === C.TYPES.OBJECT) {
const result = utils.parseJSON(value.substr(1))
if (result.value) {
return result.value
}
return result.error
}
if (type === C.TYPES.NUMBER) {
...
reverseMap = function (map) { const reversedMap = {} for (const key in map) { reversedMap[map[key]] = key } return reversedMap }
...
'use strict'
const C = require('../constants/constants')
const utils = require('../utils/utils')
const actionToKey = utils.reverseMap(C.ACTIONS)
const RULES_MAP = {}
const RULE_TYPES = {}
/**
* Different rule types support different features. Generally, all rules can
* use cross referencing _() to reference records, but only record writes, incoming events
* or RPC requests carry data and only existing records have a concept of oldData
...
setInterval = function (callback, intervalDuration) { if (intervalDuration !== null) { return setInterval(callback, intervalDuration) } return -1 }
n/a
setTimeout = function (callback, timeoutDuration) { if (timeoutDuration !== null) { return setTimeout(callback, timeoutDuration) } return -1 }
...
* @param {String} leaderServerName The leader of the cluster
* @param {Function} callback The callback to invoke once a response
* from the server is retrieved
* @private
* @returns {void}
*/
_getRemoteLock (name, leaderServerName, callback) {
this._timeouts[name] = utils.setTimeout(
this._onLockRequestTimeout.bind(this, name),
this._options.lockRequestTimeout
)
this._responseEventEmitter.once(name, callback)
const remoteTopic = this._getPrivateTopic(leaderServerName)
...
shuffleArray = function (array) { for (let i = array.length - 1; i > 0; i--) { const j = Math.floor(Math.random() * (i + 1)) const temp = array[i] array[i] = array[j] array[j] = temp } return array }
...
const set = new Set(servers)
set.delete(this._options.serverName)
if (!this._options.shuffleListenProviders) {
return Array.from(set)
}
return utils.shuffleArray(Array.from(set))
}
/**
* Create a map of all the listeners that patterns match the subscriptionName locally
* @param {Object} patterns All patterns currently on this deepstream node
* @param {SusbcriptionRegistry} providerRegistry All the providers currently registered
* @param {String} subscriptionName the subscription to find a provider for
...
spliceRandomElement = function (array) { const randomIndex = exports.getRandomIntInRange(0, array.length) return array.splice(randomIndex, 1)[0] }
n/a
validateMap = function (map, throwError, schema) { let error let key for (key in schema) { if (typeof map[key] === 'undefined') { error = new Error(`Missing key ${key}`) break } if (!exports.isOfType(map[key], schema[key])) { error = new Error(`Invalid type ${typeof map[key]} for ${key}`) break } } if (error) { if (throwError) { throw error } else { return error } } else { return true } }
...
* @param {Object} settings
*
* @private
* @returns {void}
*/
_validateSettings (settings) { // eslint-disable-line
if (!settings.hash) {
utils.validateMap(settings, true, {
path: 'string'
})
return
}
utils.validateMap(settings, true, {
path: 'string',
...