How to detect a subscription has already been initialised? #379
-
Hi! I'm trying to get my context into a place where it can detect if async iterator has been returned for a specific combination of query /input parameters. Using this I was hoping to avoid any further event listeners being attached to the subscription. Why? performance mostly. I only need subscriptions to be initialsied, the responses are distributed via another pubsub mechanism. I'm probably not thinking about this correctly either but getting a clear diagram is quite tricky (I feel a contribution coming on!) Thanks! |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 2 replies
-
You can probably do something like this, you will need to provide your own Note: I did not run this code, see it as pseude code. import type { subscribe, ExecutionArgs, ExecutionResult } from 'graphql';
import { Repeater, Push, Stop } from '@repeaterjs/repeater';
type Record = {
source: AsyncGenerator<ExecutionResult, void, void>;
handlers: Set<{ next: Push<ExecutionResult>; stop: Stop }>;
start: () => Promise<void>;
};
function makeSubscribe(
subscribeFn: typeof subscribe,
/** Function for building a unique key for a subscription "process" */
buildKeyFromArgs: (args: ExecutionArgs) => string
): typeof subscribe {
const cache = new Map<string, Record>();
return async function newSubscribe(
args: ExecutionArgs
): Promise<AsyncGenerator<ExecutionResult, void, void> | ExecutionResult> {
const key = buildKeyFromArgs(args);
let cachedEntry = cache.get(key);
if (cachedEntry === undefined) {
const source = await subscribeFn(args);
if (Symbol.asyncIterator in source === false) {
return source;
}
cachedEntry = {
source: source as AsyncGenerator<ExecutionResult, void, void>,
handlers: new Set(),
start: async function () {
try {
for await (const value of cachedEntry!.source) {
for (const handler of cachedEntry!.handlers) {
await handler.next(value);
}
}
} finally {
for (const handler of cachedEntry!.handlers) {
handler.stop();
}
}
},
};
cache.set(key, cachedEntry);
}
return new Repeater<ExecutionResult>((next, done) => {
const handlers = { next, stop: done };
done.then(() => {
cachedEntry!.handlers.delete(handlers);
if (cachedEntry!.handlers.size === 0) {
cachedEntry!.source.return?.();
cache.delete(key);
}
});
cachedEntry!.handlers.add(handlers);
if (cachedEntry!.handlers.size === 1) {
cachedEntry!.start();
}
});
};
} |
Beta Was this translation helpful? Give feedback.
-
I don't quite understand what you're trying to achieve, so I'll just literally answer your requirement:
Here's an example of how you may achieve that (untested code ahead): import { ExecutionArgs } from 'graphql';
import { OperationResult } from 'graphql-ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import { WebSocketServer } from 'ws';
function onCompleteNotifier(args: ExecutionArgs, operation: OperationResult) {
// distribute event however you see fit
}
// map of IDs to GraphQL operations (with original execution arguments)
const idToOp: Record<
string,
{ args: ExecutionArgs; operation: OperationResult }
> = {};
useServer(
{
// ...
onOperation: (_ctx, msg, args, operation) => {
idToOp[msg.id] = {
args, // execution arguments used for the operation
operation, // operation result (the return value of graphql.execute / graphql.subscribe). will be an async iterator for subscriptions
};
},
onComplete: (_ctx, msg) => {
const op = idToOp[msg.id];
delete idToOp[msg.id];
// graphql.operation completed OR graphql.subscription async iterator returned
onCompleteNotifier(op.args, op.operation);
},
},
new WebSocketServer({
port: 4000,
path: '/graphql',
}),
);
console.log('Listening to port 4000'); |
Beta Was this translation helpful? Give feedback.
-
thanks for the replies. I’ve come from apollo-server-express (2.7), I’m not really recognising these setup methods for ws. Are these from Apollo server 3? |
Beta Was this translation helpful? Give feedback.
I don't quite understand what you're trying to achieve, so I'll just literally answer your requirement:
Here's an example of how you may achieve that (untested code ahead):