Skip to content

Commit

Permalink
dedicated KV storage
Browse files Browse the repository at this point in the history
  • Loading branch information
kalmyk committed Feb 28, 2020
1 parent 9f45900 commit 934461c
Show file tree
Hide file tree
Showing 21 changed files with 672 additions and 283 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ The Web Application Message Server goal is to provide durable message source for

Message router has pluggable interface to the several message protocols. As for now it could interact by
* [WAMP V2 Basic Profile](http://wamp-proto.org/)
* [MQTT](http://mqtt.org/)
* [MQTT 3.1](http://mqtt.org/)

It means that event could be send through MQTT interface and handled by WAMP client.

Expand Down Expand Up @@ -90,11 +90,12 @@ has to maintain persistence of keys and provide the value as immediate first
message for the subscription. And here what could be implemented

```javascript
publish('the.key', [ 'args' ], { kwArgs: false }, {
publish('the.key', [ 'args' ], { kwArgs: true }, {
retain: 100,
weak: 'public',
// weak: 'public',
when: { status: 'started' },
watch: false
will: { value: 'to', publish: 'at', session: 'disconnect' }
})
```

Expand Down
10 changes: 5 additions & 5 deletions bin/with_auth.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ const WampAuth = function () {
this.ticket_auth = function (realmName, secureDetails, secret, extra, cb) {
console.log('TICKET_AUTH:', secureDetails, secret, extra)
app.getRealm(realmName, (realm) => {
let api = realm.wampApi()
const api = realm.wampApi()
let found = false
api.subscribe('sys.user.info.'+secureDetails.authid, (id, args, kwargs) => {
api.subscribe('sys.user.info.' + secureDetails.authid, (id, args, kwargs) => {
if (kwargs.password === secret) {
cb(undefined, kwargs)
} else {
cb(new Error('authentication_failed'))
}
found = true
}).then((subId) => {
}, { retained: true }).then((subId) => {
if (!found) {
cb(new Error('authentication_failed'))
}
Expand Down Expand Up @@ -57,8 +57,8 @@ const WampAuth = function () {
app.getRealm('realm1', function (realm) {
var api = realm.wampApi()
// create demo database
api.publish('sys.user.info.joe', [], { role: 'user', password: 'joe-secret' }, { retain:true })
api.publish('sys.user.info.admin', [], { role: 'admin', password: 'admin-secret' }, { retain:true })
api.publish('sys.user.info.joe', [], { role: 'user', password: 'joe-secret' }, { retain: true })
api.publish('sys.user.info.admin', [], { role: 'admin', password: 'admin-secret' }, { retain: true })
})

console.log('Listening port:', program.port)
Expand Down
46 changes: 46 additions & 0 deletions ext/pubstate.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'use strict'

const QueueClient = require('../lib/hyper/queueClient').QueueClient
const NetTransport = require('../lib/hyper/net_transport')

const conf_realm_host = process.env.FC_REALM_HOST || '127.0.0.1'
const conf_realm_port = process.env.FC_REALM_PORT || 9300

let client = new QueueClient()

let socket = NetTransport.createClientSocket(client)
let maxPrefix = undefined
let maxDate = undefined
let maxSegment = undefined

async function worker () {
await new Promise((resolve, reject) => {
socket.connect(conf_realm_port, conf_realm_host, function () {
resolve()
})
})

let loginDetails = await client.login({})
console.log('loginDetails', loginDetails)

await client.trace('getNewSegment', (data, task) => {
let newPrefix = data.date + data.segment
if (!maxPrefix || maxPrefix < newPrefix) {
maxPrefix = newPrefix
maxDate = data.date
maxSegment = data.segment
}

let result = { date: maxDate, segment: maxSegment }
console.log('segment', maxPrefix, data, result)

client.push('takeNewSegment', result)
task.resolve({})
})
}

worker().then(function (value) {
console.log('worker OK:', value)
}, function (err) {
console.error('ERROR:', err, err.stack)
})
9 changes: 5 additions & 4 deletions lib/base_gate.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict'

const RealmError = require('./realm_error').RealmError
const errorCodes = require('./realm_error').errorCodes
const Session = require('./session')

Expand Down Expand Up @@ -37,9 +36,11 @@ class BaseGate {
return (typeof this._authHandler !== 'undefined' && typeof this._authHandler.authorize === 'function')
}

checkAuthorize (ctx, funcClass, uri, id) {
if (this.isAuthorizeRequired() && !this._authHandler.authorize(ctx.getSession(), funcClass, uri)) {
ctx.error(id, errorCodes.ERROR_AUTHORIZATION_FAILED)
checkAuthorize (ctx, cmd, funcClass) {
if (this.isAuthorizeRequired() &&
!this._authHandler.authorize(ctx.getSession(), funcClass, cmd.uri))
{
ctx.sendError(cmd, errorCodes.ERROR_AUTHORIZATION_FAILED)
return false
}
return true
Expand Down
6 changes: 0 additions & 6 deletions lib/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
// sender.send(msg, callback)
// sender.close(code, reason)

const RealmError = require('./realm_error').RealmError

class Context {
constructor (router, session, sender) {
this.router = router
Expand All @@ -29,10 +27,6 @@ class Context {
emit (event, message, data) {
this.router.emit(event, this.session, message, data)
}

error(id, code, msg) {
throw new RealmError(this.getId(), code, msg)
}
}

module.exports = Context
77 changes: 77 additions & 0 deletions lib/hyper/net_transport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
'use strict'

const net = require('net')
const msgpack = require('msgpack-lite')
const { SESSION_TX, SESSION_RX, SESSION_WARNING } = require('../messages')

function ServerNetSender (socket, session, router) {
// var encodeStream = msgpack.createEncodeStream()
// encodeStream.pipe(socket)
this.send = function (msg, callback) {
router.emit(SESSION_TX, session, JSON.stringify(msg))
socket.write(msgpack.encode(msg))
// encodeStream.write(msg);
// encodeStream.end(); does not sending without end, but disconnections
}

this.close = function (code, reason) {
socket.end()
}
}

function NetServer (gate, options) {
let router = gate.getRouter()
let _server = net.Server(function (socket) {
let session = gate.createSession()
let sender = new ServerNetSender(socket, session, router)
let decodeStream = msgpack.createDecodeStream()

socket.pipe(decodeStream).on('data', function (msg) {
let ctx = gate.createContext(session, sender)
try {
router.emit(SESSION_RX, session, JSON.stringify(msg))
session.handle(ctx, msg)
} catch (e) {
router.emit(SESSION_WARNING, session, 'invalid message', msg)
session.close(1003, 'protocol violation')
console.log(e)
}
})

socket.on('end', function () {
})

socket.on('close', function () {
session.cleanup()
})
})
_server.listen(options)

return _server
}

function ClientNetSender (socket) {
this.send = function (msg, callback) {
socket.write(msgpack.encode(msg))
}

this.close = function (code, reason) {
socket.end()
}
}

function createClientSocket (client) {
var socket = new net.Socket()
var sender = new ClientNetSender(socket)
client.sender = sender

let decodeStream = msgpack.createDecodeStream()

socket.pipe(decodeStream).on('data', function (msg) {
client.handle(msg)
})
return socket
}

exports.NetServer = NetServer
exports.createClientSocket = createClientSocket
88 changes: 88 additions & 0 deletions lib/memkv.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
'use strict'

const { match, extract, defaultParse, restoreUri } = require('./topic_pattern')
const errorCodes = require('./realm_error').errorCodes
const KeyValueStorageAbstract = require('./realm').KeyValueStorageAbstract

class MemKeyValueStorage extends KeyValueStorageAbstract {
constructor () {
super()
this._keyDb = new Map()
}

getKey (uri, cbRow) {
return new Promise((resolve, reject) => {
for (let [key, item] of this._keyDb) {
const aKey = defaultParse(key)
if (match(aKey, uri)) {
cbRow(aKey, item[1] /* data */)
}
}
resolve()
})
}

setKeyActor (actor) {
const opt = actor.getOpt()
const suri = restoreUri(extract(actor.getUri(), this.getUriPattern()))
const data = actor.getData()

const pub = () => {
if (actor) {
this.pubActor(actor)
}
if (data === null) {
this._keyDb.delete(suri)
} else {
this._keyDb.set(suri, [actor.getSid(), data])
}
}
const row = this._keyDb.get(suri)

if ('when' in opt) {
if (opt.when === null) {

if (undefined === row) {
pub()
return
}
const [sid, val, when] = row

if (val === null) {
pub()
return
}

actor.reject(errorCodes.ERROR_INVALID_PAYLOAD, 'Found value is not empty')
return
}

if (undefined !== row) {
const [sid, val, when] = row
if (val !== null) {
pub()
return
}
}
actor.reject(errorCodes.ERROR_INVALID_PAYLOAD, 'Value is empty')
return
}

pub()
}

removeSession (sessionId) {
let toRemove = []
for (let key in this._keyDb) {
const keySessionId = this._keyDb.get(key)[0]
if (keySessionId === sessionId) {
toRemove.push(key)
}
}
for (let i = 0; i < toRemove.length; i++) {
this.setKeyData(toRemove[i], null)
}
}
}

exports.MemKeyValueStorage = MemKeyValueStorage
Loading

0 comments on commit 934461c

Please sign in to comment.