Skip to content

Commit

Permalink
Optimization of client determination for kind ws
Browse files Browse the repository at this point in the history
  • Loading branch information
oklemenz2 committed Sep 20, 2024
1 parent 47f6cc3 commit b57b5a1
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 53 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Added

- Include or exclude defined list of users
- Optimization of client determination for kind `ws`
- Allow empty PCP message in event definition
- Improve documentation

Expand Down
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,21 +286,21 @@ In case of execution errors, the event broadcast is retried automatically, while

#### Current User

Events are broadcast to all websocket clients, including clients established in context of current event context user.
Events are broadcast to all websocket clients, including clients established in context of current context user.
To influence event broadcasting based on current context user, the annotation `@websocket.user` or `@ws.user` is available on
event level and event element level (alternatives include `@websocket.broadcast.user` or `@ws.broadcast.user`):

Valid annotation values are:

- **Event level**:
- `'includeCurrent'`: Current event context user is statically included everytime during broadcasting to websocket clients.
- `'includeCurrent'`: Current context user is statically included everytime during broadcasting to websocket clients.
Only websocket clients established in context to that user are respected during event broadcast.
- `'excludeCurrent'`: Current event context user is statically excluded everytime during broadcasting to websocket clients.
- `'excludeCurrent'`: Current context user is statically excluded everytime during broadcasting to websocket clients.
All websocket clients established in context to that user are not respected during event broadcast.
- **Event element level**:
- `'includeCurrent'`: Current event context user is dynamically included during broadcasting to websocket clients,
- `'includeCurrent'`: Current context user is dynamically included during broadcasting to websocket clients,
based on the value of the annotated event element. If truthy, only websocket clients established in context to that user are respected during event broadcast.
- `'excludeCurrent'`: Current event context user is dynamically excluded during broadcasting to websocket clients,
- `'excludeCurrent'`: Current context user is dynamically excluded during broadcasting to websocket clients,
based on the value of the annotated event element. If truthy, all websocket clients established in context to that user are not respected during event broadcast.

Furthermore, also additional equivalent annotations alternatives are available:
Expand Down Expand Up @@ -330,7 +330,7 @@ event received {
}
```

Event is published only to websocket clients established in context to the current event context user.
Event is published only to websocket clients established in context to the current context user.

**Entity Element Level:**

Expand All @@ -343,7 +343,7 @@ event received {
}
```

Event is published only to websocket clients established in context to the current event context user, if the event data of `flag` is falsy.
Event is published only to websocket clients established in context to the current context user, if the event data of `flag` is falsy.

#### Defined Users

Expand Down
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"@cap-js-community/websocket": "./",
"@cap-js/sqlite": "^1.7.3",
"@eslint/js": "^9.10.0",
"@sap/cds": "^8.2.2",
"@sap/cds": "^8.2.3",
"@sap/cds-dk": "^8.2.2",
"@socket.io/redis-adapter": "^8.3.0",
"@socket.io/redis-streams-adapter": "^0.2.2",
Expand Down
99 changes: 97 additions & 2 deletions src/socket/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ class SocketServer {

/**
* Return format instance for service
* @param {Object }service Service definition
* @param {String} origin Origin format, e.g. 'json'
* @param {Object} service Service definition
* @param {String} [origin] Origin format, e.g. 'json'
* @returns {*}
*/
format(service, origin) {
Expand All @@ -377,13 +377,108 @@ class SocketServer {

/**
* Default path for websocket service
* @param {String} service Service path
*/
defaultPath(service) {
if (service.path.startsWith(this.path)) {
return service.path.substring(this.path.length);
}
return service.path;
}

/**
* Gets key value from Map and initializes key with init value if not found
* @param {Map} map Map
* @param {String} key Key to get
* @param {*)} init Initial value
* @returns {*} Value
*/
getFromMap(map, key, init) {
let entry = map.get(key);
if (entry === undefined) {
entry = init;
map.set(key, entry);
}
return entry;
}

/**
* Add value to a Set for key of Map
* @param {Map<String,Set>} map Map
* @param {String} key Key to get
* @param {*)} value Add value
*/
addToSetOfMap(map, key, value) {
return this.getFromMap(map, key, new Set()).add(value);
}

/**
* Delete value from a Set for key of Map
* @param {Map<String,Set>} map Map
* @param {String} key Key to get
* @param {*)} value Delete value
*/
deleteFromSetOfMap(map, key, value) {
let set = map.get(key);
if (set !== undefined) {
set.delete(value);
if (set.size === 0) {
map.delete(key);
}
}
}

/**
* Collect values from Map based on keys
* @param {Map<String, Array<Set>>} map Map
* @param {Array} keys Keys to include values from
*/
collectFromMap(map, keys) {
const result = new Set();
if (!map || !keys?.length) {
return result;
}
for (const key of keys) {
const set = map.get(key);
if (set !== undefined) {
for (const entry of set) {
result.add(entry);
}
}
}
return result;
}

/**
* Collect values from Set based on check
* @param {Set} set Set
* @param {function} check Check to be performed for entry
*/
collectFromSet(set, check) {
const result = new Set();
if (!set) {
return result;
}
for (const entry of set) {
if (check(entry)) {
result.add(entry);
}
}
return result;
}

/**
* Keep entries from set
* @param set Set to be filtered
* @param keepSet Entries from set to keep, others are removed
*/
keepEntriesFromSet(set, keepSet) {
for (const entry of set) {
if (!keepSet.has(entry)) {
set.delete(entry);
}
}
}
}

module.exports = SocketServer;
118 changes: 87 additions & 31 deletions src/socket/ws.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,10 @@ class SocketWSServer extends SocketServer {
const servicePath = `${this.path}${path}`;
const format = this.format(service);
this.services[servicePath] = (ws, request) => {
ws.request = request;
ws.events = new Map();
ws.contexts = new Set();
DEBUG?.("Connected");
this.onInit(ws, request);
DEBUG?.("Initialized");
ws.on("close", () => {
ws.events.clear();
ws.contexts.clear();
this.onDisconnect(ws);
DEBUG?.("Disconnected");
});
ws.on("error", (err) => {
Expand All @@ -52,7 +49,7 @@ class SocketWSServer extends SocketServer {
ws.on("message", async (message) => {
const payload = format.parse(message);
try {
for (const callback of ws.events.get(payload?.event) || []) {
for (const callback of this.getFromMap(ws.events, payload?.event, new Set())) {
await callback(payload.data);
}
} catch (err) {
Expand All @@ -71,12 +68,7 @@ class SocketWSServer extends SocketServer {
return ws.context;
},
on: (event, callback) => {
let callbacks = ws.events.get(event);
if (!callbacks) {
callbacks = [];
ws.events.set(event, callbacks);
}
callbacks.push(callback);
this.addToSetOfMap(ws.events, event, callback);
},
emit: async (event, data) => {
await ws.send(format.compose(event, data));
Expand Down Expand Up @@ -109,9 +101,13 @@ class SocketWSServer extends SocketServer {
},
enter: async (context) => {
ws.contexts.add(context);
const clients = this.fetchClients(ws.context.tenant, ws.request?.url);
this.addToSetOfMap(clients.contexts, context, ws);
},
exit: async (context) => {
ws.contexts.delete(context);
const clients = this.fetchClients(ws.context.tenant, ws.request?.url);
this.deleteFromSetOfMap(clients.contexts, context, ws);
},
disconnect() {
ws.close();
Expand All @@ -121,7 +117,9 @@ class SocketWSServer extends SocketServer {
},
};
ws.context.ws = { service: ws.facade, socket: ws };
this.onConnect(ws, request);
connected && connected(ws.facade);
DEBUG?.("Connected");
} catch (err) {
LOG?.error(err);
}
Expand All @@ -144,27 +142,40 @@ class SocketWSServer extends SocketServer {
path = path || this.defaultPath(service);
tenant = tenant || socket?.context.tenant;
const servicePath = `${this.path}${path}`;
const clients = [];
this.wss.clients.forEach((client) => {
if (
client !== socket &&
client.readyState === WebSocket.OPEN &&
client.request?.url === servicePath &&
client.context.tenant === tenant &&
(!user?.include?.length || user.include.includes(client.context.user?.id)) &&
(!user?.exclude?.length || !user.exclude.includes(client.context.user?.id)) &&
(!contexts?.length || contexts.find((context) => client.contexts.has(context))) &&
(!identifier?.include?.length || identifier.include.includes(client.request?.queryOptions?.id)) &&
(!identifier?.exclude?.length || !identifier.exclude.includes(client.request?.queryOptions?.id))
) {
clients.push(client);
}
});
if (clients.length > 0) {
const serviceClients = this.fetchClients(tenant, servicePath);
const clients = new Set(serviceClients.all);
if (contexts?.length) {
this.keepEntriesFromSet(clients, this.collectFromMap(serviceClients.contexts, contexts));
}
if (user?.include?.length) {
this.keepEntriesFromSet(clients, this.collectFromMap(serviceClients.users, user?.include));
}
if (identifier?.include?.length) {
this.keepEntriesFromSet(clients, this.collectFromMap(serviceClients.identifiers, identifier?.include));
}
if (user?.exclude?.length) {
this.keepEntriesFromSet(
clients,
this.collectFromSet(clients, (client) => {
return !user?.exclude.includes(client.context.user?.id);
}),
);
}
if (identifier?.exclude?.length) {
this.keepEntriesFromSet(
clients,
this.collectFromSet(clients, (client) => {
return !identifier?.exclude.includes(client.request?.queryOptions?.id);
}),
);
}
if (clients.size > 0) {
const format = this.format(service);
const clientMessage = format.compose(event, data);
for (const client of clients) {
await client.send(clientMessage);
if (client !== socket && client.readyState === WebSocket.OPEN) {
await client.send(clientMessage);
}
}
}
if (!local) {
Expand All @@ -183,6 +194,51 @@ class SocketWSServer extends SocketServer {
}
}

onInit(ws, request) {
ws.request = request;
ws.events = new Map(); // Map<event, Set<callback>>
ws.contexts = new Set(); // Set<context>
}

onConnect(ws) {
const clients = this.fetchClients(ws.context.tenant, ws.request?.url);
clients.all.add(ws);
if (ws.context.user?.id) {
this.addToSetOfMap(clients.users, ws.context.user?.id, ws);
}
if (ws.request?.queryOptions?.id) {
this.addToSetOfMap(clients.identifiers, ws.request?.queryOptions?.id, ws);
}
}

onDisconnect(ws) {
ws.events.clear();
ws.contexts.clear();
const clients = this.fetchClients(ws.context?.tenant, ws.request?.url);
clients.all.delete(ws);
if (ws.context?.user?.id) {
this.deleteFromSetOfMap(clients.users, ws.context?.user?.id, ws);
}
for (const [key] of clients.contexts) {
this.deleteFromSetOfMap(clients.contexts, key, ws);
}
if (ws.request?.queryOptions?.id) {
this.deleteFromSetOfMap(clients.identifiers, ws.request?.queryOptions?.id, ws);
}
}

fetchClients(tenant, service) {
this.wss.cdsClients ??= new Map(); // Map<tenant, Map<service,...>>
const initTenantClients = new Map(); // Map<service, {all,users,contexts,identifiers}>
const serviceClients = this.getFromMap(this.wss.cdsClients, tenant, initTenantClients);
return this.getFromMap(serviceClients, service, {
all: new Set(), // Set<client>
users: new Map(), // Map<user, Set<client>>
contexts: new Map(), // Map<context, Set<client>>
identifiers: new Map(), // Map<identifier, Set<client>>
});
}

async applyAdapter() {
try {
const config = { ...this.config?.adapter };
Expand Down
Loading

0 comments on commit b57b5a1

Please sign in to comment.