Subscriptions / WebSockets
info
Subscriptions & WebSockets are in beta, alpha & might change without a major version bump. However, feel free to use them and report any issue you may find on GitHub
#
Using Subscriptionstip
- For a full-stack example have a look at /examples/next-prisma-starter-websockets.
- For a bare-minumum Node.js example see /examples/standalone-server.
#
Adding a subscription procedureimport * as trpc from '@trpc/server';import { EventEmitter } from 'events';
// create a global event emitter (could be replaced by redis, etc)const ee = new EventEmitter()
trpc.router() .subscription('onAdd', { resolve({ ctx }) { // `resolve()` is triggered for each client when they start subscribing `onAdd`
// return a `Subscription` with a callback which is triggered immediately return new trpc.Subscription<Post>((emit) => { const onAdd = (data: Post) => { // emit data to client emit.data(data) };
// trigger `onAdd()` when `add` is triggered in our event emitter ee.on('add', onAdd);
// unsubscribe function when client disconnects or stops subscribing return () => { ee.off('add', onAdd); }; }); }, }) .mutation('add', { input: z.object({ id: z.string().uuid().optional(), text: z.string().min(1), }), async resolve({ ctx, input }) { const post = { ...input } /* [..] add to db */
ee.emit('add', post); return post; }, })
#
Creating a WebSocket-serveryarn add ws
import ws from 'ws';import { applyWSSHandler } from '@trpc/server/adapters/ws';import { appRouter } from './routers/app';import { createContext } from './trpc';const wss = new ws.Server({ port: 3001,});const handler = applyWSSHandler({ wss, router: appRouter, createContext });
wss.on('connection', (ws) => { console.log(`โโ Connection (${wss.clients.size})`); ws.once('close', () => { console.log(`โโ Connection (${wss.clients.size})`); });});console.log('โ
WebSocket Server listening on ws://localhost:3001');
process.on('SIGTERM', () => { console.log('SIGTERM'); handler.broadcastReconnectNotification(); wss.close();});
TRPCClient
to use WebSockets#
Setting tip
You can use Links to route queries and/or mutations to HTTP transport and subscriptions over WebSockets.
import { createWSClient, wsLink } from '@trpc/client/links/wsLink';import { httpBatchLink } from '@trpc/client/links/httpBatchLink';
// create persistent WebSocket connectionconst wsClient = createWSClient({ url: `ws://localhost:3001`,});
// configure TRPCClient to use WebSockets transportconst client = createTRPCClient<AppRouter>({ links: [ wsLink({ client: wsClient, }), ],});
#
Using ReactSee /examples/next-prisma-starter-websockets.
#
WebSockets RPC SpecificationYou can read more details by drilling into the TypeScript definitions:
query
/ mutation
#
#
Request{ id: number | string; jsonrpc?: '2.0'; method: 'query' | 'mutation'; params: { path: string; input?: unknown; // <-- pass input of procedure, serialized by transformer };}
#
Response... below, or an error.
{ id: number | string; jsonrpc: '2.0'; result: { type: 'data'; // always 'data' for mutation / queries data: TOutput; // output from procedure };}
subscription
/ subscription.stop
#
#
Start a subscription{ id: number | string; jsonrpc?: '2.0'; method: 'subscription'; params: { path: string; input?: unknown; // <-- pass input of procedure, serialized by transformer };}
subscription.stop
#
To cancel a subscription, call { id: number | string; // <-- id of your created subscription jsonrpc?: '2.0'; method: 'subscription.stop';}
#
Subscription response shape... below, or an error.
{ id: number | string; jsonrpc: '2.0'; result: ( | { type: 'data'; data: TData; // subscription emitted data } | { type: 'started'; // sub started } | { type: 'stopped'; // sub stopped } )}
#
ErrorsSee https://www.jsonrpc.org/specification#error_object or Error Formatting.
#
Notifications from Server to Client{id: null, type: 'reconnect' }
#
Tells clients to reconnect before shutting down server. Invoked by wssHandler.broadcastReconnectNotification()
.