Awesome Open Source
Awesome Open Source

GraphQLOverWebSocket

Coherent, zero-dependency, lazy, simple, GraphQL over WebSocket Protocol compliant server and client.

Continuous integration graphql-ws

Use Server-Sent Events (SSE) instead? Check out graphql-sse!


Getting started

Install

yarn add graphql-ws

Create a GraphQL schema

import { GraphQLSchema, GraphQLObjectType, GraphQLString } from 'graphql';

/**
 * Construct a GraphQL schema and define the necessary resolvers.
 *
 * type Query {
 *   hello: String
 * }
 * type Subscription {
 *   greetings: String
 * }
 */
export const schema = new GraphQLSchema({
  query: new GraphQLObjectType({
    name: 'Query',
    fields: {
      hello: {
        type: GraphQLString,
        resolve: () => 'world',
      },
    },
  }),
  subscription: new GraphQLObjectType({
    name: 'Subscription',
    fields: {
      greetings: {
        type: GraphQLString,
        subscribe: async function* () {
          for (const hi of ['Hi', 'Bonjour', 'Hola', 'Ciao', 'Zdravo']) {
            yield { greetings: hi };
          }
        },
      },
    },
  }),
});

Start the server

With ws
import { WebSocketServer } from 'ws'; // yarn add ws
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import { useServer } from 'graphql-ws/lib/use/ws';
import { schema } from './previous-step';

const server = new WebSocketServer({
  port: 4000,
  path: '/graphql',
});

useServer({ schema }, server);

console.log('Listening to port 4000');
With uWebSockets.js
import uWS from 'uWebSockets.js'; // yarn add [email protected]/uWebSockets.js#<tag>
import { makeBehavior } from 'graphql-ws/lib/use/uWebSockets';
import { schema } from './previous-step';

uWS
  .App()
  .ws('/graphql', makeBehavior({ schema }))
  .listen(4000, (listenSocket) => {
    if (listenSocket) {
      console.log('Listening to port 4000');
    }
  });
With @fastify/websocket
import Fastify from 'fastify'; // yarn add fastify
import fastifyWebsocket from '@fastify/websocket'; // yarn add @fastify/websocket
import { makeHandler } from 'graphql-ws/lib/use/@fastify/websocket';
import { schema } from './previous-step';

const fastify = Fastify();
fastify.register(fastifyWebsocket);

fastify.register(async (fastify) => {
  fastify.get('/graphql', { websocket: true }, makeHandler({ schema }));
});

fastify.listen(4000, (err) => {
  if (err) {
    fastify.log.error(err);
    return process.exit(1);
  }
  console.log('Listening to port 4000');
});

Use the client

import { createClient } from 'graphql-ws';

const client = createClient({
  url: 'ws://localhost:4000/graphql',
});

// query
(async () => {
  const result = await new Promise((resolve, reject) => {
    let result;
    client.subscribe(
      {
        query: '{ hello }',
      },
      {
        next: (data) => (result = data),
        error: reject,
        complete: () => resolve(result),
      },
    );
  });

  expect(result).toEqual({ hello: 'Hello World!' });
})();

// subscription
(async () => {
  const onNext = () => {
    /* handle incoming values */
  };

  let unsubscribe = () => {
    /* complete the subscription */
  };

  await new Promise((resolve, reject) => {
    unsubscribe = client.subscribe(
      {
        query: 'subscription { greetings }',
      },
      {
        next: onNext,
        error: reject,
        complete: resolve,
      },
    );
  });

  expect(onNext).toBeCalledTimes(5); // we say "Hi" in 5 languages
})();

Recipes

๐Ÿ”— Client usage with Promise
import { createClient, SubscribePayload } from 'graphql-ws';

const client = createClient({
  url: 'ws://hey.there:4000/graphql',
});

async function execute<T>(payload: SubscribePayload) {
  return new Promise<T>((resolve, reject) => {
    let result: T;
    client.subscribe<T>(payload, {
      next: (data) => (result = data),
      error: reject,
      complete: () => resolve(result),
    });
  });
}

// use
(async () => {
  try {
    const result = await execute({
      query: '{ hello }',
    });
    // complete
    // next = result = { data: { hello: 'Hello World!' } }
  } catch (err) {
    // error
  }
})();
๐Ÿ”— Client usage with AsyncIterator
import { createClient, SubscribePayload } from 'graphql-ws';

const client = createClient({
  url: 'ws://iterators.ftw:4000/graphql',
});

function subscribe<T>(payload: SubscribePayload): AsyncGenerator<T> {
  let deferred: {
    resolve: (done: boolean) => void;
    reject: (err: unknown) => void;
  } | null = null;
  const pending: T[] = [];
  let throwMe: unknown = null,
    done = false;
  const dispose = client.subscribe<T>(payload, {
    next: (data) => {
      pending.push(data);
      deferred?.resolve(false);
    },
    error: (err) => {
      throwMe = err;
      deferred?.reject(throwMe);
    },
    complete: () => {
      done = true;
      deferred?.resolve(true);
    },
  });
  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    async next() {
      if (done) return { done: true, value: undefined };
      if (throwMe) throw throwMe;
      if (pending.length) return { value: pending.shift()! };
      return (await new Promise<boolean>(
        (resolve, reject) => (deferred = { resolve, reject }),
      ))
        ? { done: true, value: undefined }
        : { value: pending.shift()! };
    },
    async throw(err) {
      throw err;
    },
    async return() {
      dispose();
      return { done: true, value: undefined };
    },
  };
}

(async () => {
  const subscription = subscribe({
    query: 'subscription { greetings }',
  });
  // subscription.return() to dispose

  for await (const result of subscription) {
    // next = result = { data: { greetings: 5x } }
  }
  // complete
})();
๐Ÿ”— Client usage with Observable
import { Observable } from 'relay-runtime';
// or
import { Observable } from '@apollo/client/core';
// or
import { Observable } from 'rxjs';
// or
import Observable from 'zen-observable';
// or any other lib which implements Observables as per the ECMAScript proposal: https://github.com/tc39/proposal-observable

const client = createClient({
  url: 'ws://graphql.loves:4000/observables',
});

function toObservable(operation) {
  return new Observable((observer) =>
    client.subscribe(operation, {
      next: (data) => observer.next(data),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    }),
  );
}

const observable = toObservable({ query: `subscription { ping }` });

const subscription = observable.subscribe({
  next: (data) => {
    expect(data).toBe({ data: { ping: 'pong' } });
  },
});

// โฑ

subscription.unsubscribe();
๐Ÿ”— Client usage with Relay
import {
  Network,
  Observable,
  RequestParameters,
  Variables,
} from 'relay-runtime';
import { createClient } from 'graphql-ws';

const subscriptionsClient = createClient({
  url: 'ws://i.love:4000/graphql',
  connectionParams: () => {
    // Note: getSession() is a placeholder function created by you
    const session = getSession();
    if (!session) {
      return {};
    }
    return {
      Authorization: `Bearer ${session.token}`,
    };
  },
});

// both fetch and subscribe can be handled through one implementation
// to understand why we return Observable<any>, please see: https://github.com/enisdenjo/graphql-ws/issues/316#issuecomment-1047605774
function fetchOrSubscribe(
  operation: RequestParameters,
  variables: Variables,
): Observable<any> {
  return Observable.create((sink) => {
    if (!operation.text) {
      return sink.error(new Error('Operation text cannot be empty'));
    }
    return subscriptionsClient.subscribe(
      {
        operationName: operation.name,
        query: operation.text,
        variables,
      },
      sink,
    );
  });
}

export const network = Network.create(fetchOrSubscribe, fetchOrSubscribe);
๐Ÿ”— Client usage with urql
import { createClient, defaultExchanges, subscriptionExchange } from 'urql';
import { createClient as createWSClient } from 'graphql-ws';

const wsClient = createWSClient({
  url: 'ws://its.urql:4000/graphql',
});

const client = createClient({
  url: '/graphql',
  exchanges: [
    ...defaultExchanges,
    subscriptionExchange({
      forwardSubscription(operation) {
        return {
          subscribe: (sink) => {
            const dispose = wsClient.subscribe(operation, sink);
            return {
              unsubscribe: dispose,
            };
          },
        };
      },
    }),
  ],
});
๐Ÿ”— Client usage with Apollo Client Web
import { createClient } from 'graphql-ws';
// Apollo Client Web v3.5.10 has a GraphQLWsLink class which implements
// graphql-ws directly. For older versions, see the next code block
// to define your own GraphQLWsLink.
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';

const link = new GraphQLWsLink(
  createClient({
    url: 'ws://where.is:4000/graphql',
    connectionParams: () => {
      // Note: getSession() is a placeholder function created by you
      const session = getSession();
      if (!session) {
        return {};
      }
      return {
        Authorization: `Bearer ${session.token}`,
      };
    },
  }),
);
// for Apollo Client v3 older than v3.5.10:
import {
  ApolloLink,
  Operation,
  FetchResult,
  Observable,
} from '@apollo/client/core';
// or for Apollo Client v2:
// import { ApolloLink, Operation, FetchResult, Observable } from 'apollo-link'; // yarn add apollo-link

import { print } from 'graphql';
import { createClient, Client } from 'graphql-ws';

class GraphQLWsLink extends ApolloLink {
  constructor(private client: Client) {
    super();
  }

  public request(operation: Operation): Observable<FetchResult> {
    return new Observable((sink) => {
      return this.client.subscribe<FetchResult>(
        { ...operation, query: print(operation.query) },
        {
          next: sink.next.bind(sink),
          complete: sink.complete.bind(sink),
          error: sink.error.bind(sink),
        },
      );
    });
  }
}
๐Ÿ”— Client usage with Apollo Kotlin

Connect to graphql-transport-ws compatible server in Kotlin using Apollo Kotlin

val apolloClient = ApolloClient.Builder()
    .networkTransport(
        WebSocketNetworkTransport.Builder()
            .serverUrl(
                serverUrl = "http://localhost:9090/graphql",
            ).protocol(
                protocolFactory = GraphQLWsProtocol.Factory()
            ).build()
    )
    .build()
๐Ÿ”— Client usage with Apollo iOS

Connect to graphql-transport-ws compatible server in Swift using Apollo iOS

import Foundation
import Apollo
import ApolloWebSocket

let store = ApolloStore()

let normalTransport = RequestChainNetworkTransport(
  interceptorProvider: DefaultInterceptorProvider(store: store),
  endpointURL: URL(string: "http://localhost:8080/graphql")!
)

let webSocketClient = WebSocket(
  request: URLRequest(url: URL(string: "ws://localhost:8080/websocket")!),
  protocol: .graphql_transport_ws
)
let webSocketTransport = WebSocketTransport(
  websocket: webSocketClient,
  store: store
)

let splitTransport = SplitNetworkTransport(
  uploadingNetworkTransport: normalTransport,
  webSocketNetworkTransport: webSocketTransport
)

let client = ApolloClient(
  networkTransport: splitTransport,
  store: store
)
๐Ÿ”— Client usage with Apollo Studio Explorer

In Explorer Settings, click "Edit" for "Connection Settings" and select graphql-ws under "Implementation".

๐Ÿ”— Client usage with GraphiQL
import React from 'react';
import ReactDOM from 'react-dom';
import { GraphiQL } from 'graphiql';
import { createGraphiQLFetcher } from '@graphiql/toolkit';
import { createClient } from 'graphql-ws';

const fetcher = createGraphiQLFetcher({
  url: 'https://myschema.com/graphql',
  wsClient: createClient({
    url: 'wss://myschema.com/graphql',
  }),
});

export const App = () => <GraphiQL fetcher={fetcher} />;

ReactDOM.render(document.getElementByID('graphiql'), <App />);
๐Ÿ”— Client usage with retry on any connection problem
import { createClient } from 'graphql-ws';
import { waitForHealthy } from './my-servers';

const client = createClient({
  url: 'ws://any.retry:4000/graphql',
  // by default the client will immediately fail on any non-fatal
  // `CloseEvent` problem thrown during the connection phase
  //
  // see `retryAttempts` documentation about which `CloseEvent`s are
  // considered fatal regardless
  shouldRetry: () => true,
  // or pre v5.8.0:
  // isFatalConnectionProblem: () => false,
});
๐Ÿ”— Client usage with custom retry timeout strategy
import { createClient } from 'graphql-ws';
import { waitForHealthy } from './my-servers';

const client = createClient({
  url: 'ws://i.want.retry:4000/control/graphql',
  retryWait: async function waitForServerHealthyBeforeRetry() {
    // if you have a server healthcheck, you can wait for it to become
    // healthy before retrying after an abrupt disconnect (most commonly a restart)
    await waitForHealthy(url);

    // after the server becomes ready, wait for a second + random 1-4s timeout
    // (avoid DDoSing yourself) and try connecting again
    await new Promise((resolve) =>
      setTimeout(resolve, 1000 + Math.random() * 3000),
    );
  },
});
๐Ÿ”— Client usage with graceful restart
import { createClient, Client, ClientOptions } from 'graphql-ws';
import { giveMeAFreshToken } from './token-giver';

interface RestartableClient extends Client {
  restart(): void;
}

function createRestartableClient(options: ClientOptions): RestartableClient {
  let restartRequested = false;
  let restart = () => {
    restartRequested = true;
  };

  const client = createClient({
    ...options,
    on: {
      ...options.on,
      opened: (socket) => {
        options.on?.opened?.(socket);

        restart = () => {
          if (socket.readyState === WebSocket.OPEN) {
            // if the socket is still open for the restart, do the restart
            socket.close(4205, 'Client Restart');
          } else {
            // otherwise the socket might've closed, indicate that you want
            // a restart on the next opened event
            restartRequested = true;
          }
        };

        // just in case you were eager to restart
        if (restartRequested) {
          restartRequested = false;
          restart();
        }
      },
    },
  });

  return {
    ...client,
    restart: () => restart(),
  };
}

const client = createRestartableClient({
  url: 'ws://graceful.restart:4000/is/a/non-fatal/close-code',
  connectionParams: async () => {
    const token = await giveMeAFreshToken();
    return { token };
  },
});

// all subscriptions from `client.subscribe` will resubscribe after `client.restart`
๐Ÿ”— Client usage with ping/pong timeout and latency metrics
import { createClient } from 'graphql-ws';

let activeSocket,
  timedOut,
  pingSentAt = 0,
  latency = 0;
createClient({
  url: 'ws://i.time.out:4000/and-measure/latency',
  keepAlive: 10_000, // ping server every 10 seconds
  on: {
    opened: (socket) => (activeSocket = socket),
    ping: (received) => {
      if (!received /* sent */) {
        pingSentAt = Date.now();
        timedOut = setTimeout(() => {
          if (activeSocket.readyState === WebSocket.OPEN)
            activeSocket.close(4408, 'Request Timeout');
        }, 5_000); // wait 5 seconds for the pong and then close the connection
      }
    },
    pong: (received) => {
      if (received) {
        latency = Date.now() - pingSentAt;
        clearTimeout(timedOut); // pong is received, clear connection close timeout
      }
    },
  },
});
๐Ÿ”— Client usage with abrupt termination on pong timeout
import { createClient } from 'graphql-ws';

let timedOut;
const client = createClient({
  url: 'ws://terminate.me:4000/on-pong-timeout',
  keepAlive: 10_000, // ping server every 10 seconds
  on: {
    ping: (received) => {
      if (!received /* sent */) {
        timedOut = setTimeout(() => {
          // a close event `4499: Terminated` is issued to the current WebSocket and an
          // artificial `{ code: 4499, reason: 'Terminated', wasClean: false }` close-event-like
          // object is immediately emitted without waiting for the one coming from `WebSocket.onclose`
          //
          // calling terminate is not considered fatal and a connection retry will occur as expected
          //
          // see: https://github.com/enisdenjo/graphql-ws/discussions/290
          client.terminate();
        }, 5_000);
      }
    },
    pong: (received) => {
      if (received) {
        clearTimeout(timedOut);
      }
    },
  },
});
๐Ÿ”— Client usage with manual pings and pongs
import {
  createClient,
  Client,
  ClientOptions,
  stringifyMessage,
  PingMessage,
  PongMessage,
  MessageType,
} from 'graphql-ws';

interface PingerClient extends Client {
  ping(payload?: PingMessage['payload']): void;
  pong(payload?: PongMessage['payload']): void;
}

function createPingerClient(options: ClientOptions): PingerClient {
  let activeSocket: WebSocket;

  const client = createClient({
    disablePong: true,
    ...options,
    on: {
      opened: (socket) => {
        options.on?.opened?.(socket);
        activeSocket = socket;
      },
    },
  });

  return {
    ...client,
    ping: (payload) => {
      if (activeSocket.readyState === WebSocket.OPEN)
        activeSocket.send(
          stringifyMessage({
            type: MessageType.Ping,
            payload,
          }),
        );
    },
    pong: (payload) => {
      if (activeSocket.readyState === WebSocket.OPEN)
        activeSocket.send(
          stringifyMessage({
            type: MessageType.Pong,
            payload,
          }),
        );
    },
  };
}
๐Ÿ”— Client usage supported check
import { createClient } from 'graphql-ws';

function supportsGraphQLTransportWS(url: string): Promise<boolean> {
  return new Promise((resolve) => {
    const client = createClient({
      url,
      retryAttempts: 0, // fail immediately
      lazy: false, // connect as soon as the client is created
      on: {
        closed: () => resolve(false), // connection rejected, probably not supported
        connected: () => {
          resolve(true); // connected = supported
          client.dispose(); // dispose after check
        },
      },
    });
  });
}

const supported = await supportsGraphQLTransportWS(
  'ws://some.unknown:4000/enpoint',
);
if (supported) {
  // use graphql-ws
} else {
  // fallback (use subscriptions-transport-ws?)
}
๐Ÿ”— Client usage in browser
<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8" />
    <title>GraphQL over WebSocket</title>
    <script
      type="text/javascript"
      src="https://unpkg.com/graphql-ws/umd/graphql-ws.min.js"
    ></script>
  </head>
  <body>
    <script type="text/javascript">
      const client = graphqlWs.createClient({
        url: 'ws://umdfor.the:4000/win/graphql',
      });

      // consider other recipes for usage inspiration
    </script>
  </body>
</html>
๐Ÿ”— Client usage in Node
const ws = require('ws'); // yarn add ws
const Crypto = require('crypto');
const { createClient } = require('graphql-ws');

const client = createClient({
  url: 'ws://no.browser:4000/graphql',
  webSocketImpl: ws,
  /**
   * Generates a v4 UUID to be used as the ID.
   * Reference: https://gist.github.com/jed/982883
   */
  generateID: () =>
    ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (c) =>
      (c ^ (Crypto.randomBytes(1)[0] & (15 >> (c / 4)))).toString(16),
    ),
});

// consider other recipes for usage inspiration
๐Ÿ”— Client usage in Node with custom headers (not possible in browsers)
const WebSocket = require('ws'); // yarn add ws
const { createClient } = require('graphql-ws');

class MyWebSocket extends WebSocket {
  constructor(address, protocols) {
    super(address, protocols, {
      headers: {
        // your custom headers go here
        'User-Agent': 'graphql-ws client',
        'X-Custom-Header': 'hello world',
      },
    });
  }
}

const client = createClient({
  url: 'ws://node.custom-headers:4000/graphql',
  webSocketImpl: MyWebSocket,
});

// consider other recipes for usage inspiration
๐Ÿ”— Server usage with ws
// minimal version of `import { useServer } from 'graphql-ws/lib/use/ws';`

import { WebSocketServer } from 'ws'; // yarn add ws
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import { makeServer, CloseCode } from 'graphql-ws';
import { schema } from './my-graphql-schema';

// make
const server = makeServer({ schema });

// create websocket server
const wsServer = new WebSocketServer({
  port: 4000,
  path: '/graphql',
});

// implement
wsServer.on('connection', (socket, request) => {
  // a new socket opened, let graphql-ws take over
  const closed = server.opened(
    {
      protocol: socket.protocol, // will be validated
      send: (data) =>
        new Promise((resolve, reject) => {
          socket.send(data, (err) => (err ? reject(err) : resolve()));
        }), // control your data flow by timing the promise resolve
      close: (code, reason) => socket.close(code, reason), // there are protocol standard closures
      onMessage: (cb) =>
        socket.on('message', async (event) => {
          try {
            // wait for the the operation to complete
            // - if init message, waits for connect
            // - if query/mutation, waits for result
            // - if subscription, waits for complete
            await cb(event.toString());
          } catch (err) {
            // all errors that could be thrown during the
            // execution of operations will be caught here
            socket.close(CloseCode.InternalServerError, err.message);
          }
        }),
    },
    // pass values to the `extra` field in the context
    { socket, request },
  );

  // notify server that the socket closed
  socket.once('close', (code, reason) => closed(code, reason));
});
๐Ÿ”— Server usage with ws and custom auth handling
// check extended implementation at `{ useServer } from 'graphql-ws/lib/use/ws'`

import http from 'http';
import { WebSocketServer } from 'ws'; // yarn add ws
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import { makeServer, CloseCode } from 'graphql-ws';
import { schema } from './my-graphql-schema';
import { validate } from './my-auth';

// extra in the context
interface Extra {
  readonly request: http.IncomingMessage;
}

// your custom auth
class Forbidden extends Error {}
function handleAuth(request: http.IncomingMessage) {
  // do your auth on every subscription connect
  const good = validate(request.headers['authorization']);
  // or const { iDontApprove } = session(request.cookies);
  if (!good) {
    // throw a custom error to be handled
    throw new Forbidden(':(');
  }
}

// make graphql server
const gqlServer = makeServer<Extra>({
  schema,
  onConnect: async (ctx) => {
    // do your auth on every connect (recommended)
    await handleAuth(ctx.extra.request);
  },
  onSubscribe: async (ctx) => {
    // or maybe on every subscribe
    await handleAuth(ctx.extra.request);
  },
});

// create websocket server
const wsServer = new WebSocketServer({
  port: 4000,
  path: '/graphql',
});

// implement
wsServer.on('connection', (socket, request) => {
  // you may even reject the connection without ever reaching the lib
  // return socket.close(4403, 'Forbidden');

  // pass the connection to graphql-ws
  const closed = gqlServer.opened(
    {
      protocol: socket.protocol, // will be validated
      send: (data) =>
        new Promise((resolve, reject) => {
          // control your data flow by timing the promise resolve
          socket.send(data, (err) => (err ? reject(err) : resolve()));
        }),
      close: (code, reason) => socket.close(code, reason), // for standard closures
      onMessage: (cb) => {
        socket.on('message', async (event) => {
          try {
            // wait for the the operation to complete
            // - if init message, waits for connect
            // - if query/mutation, waits for result
            // - if subscription, waits for complete
            await cb(event.toString());
          } catch (err) {
            // all errors that could be thrown during the
            // execution of operations will be caught here
            if (err instanceof Forbidden) {
              // your magic
            } else {
              socket.close(CloseCode.InternalServerError, err.message);
            }
          }
        });
      },
    },
    // pass request to the extra
    { request },
  );

  // notify server that the socket closed
  socket.once('close', (code, reason) => closed(code, reason));
});
๐Ÿ”— Server usage with ws and subprotocol pings and pongs
import { WebSocketServer } from 'ws'; // yarn add ws
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import {
  makeServer,
  CloseCode,
  stringifyMessage,
  MessageType,
} from 'graphql-ws';
import { schema } from './my-graphql-schema';

// make
const server = makeServer({ schema });

// create websocket server
const wsServer = new WebSocketServer({
  port: 4000,
  path: '/graphql',
});

// implement
wsServer.on('connection', (socket, request) => {
  // subprotocol pinger because WS level ping/pongs might not be available
  let pinger, pongWait;
  function ping() {
    if (socket.readyState === socket.OPEN) {
      // send the subprotocol level ping message
      socket.send(stringifyMessage({ type: MessageType.Ping }));

      // wait for the pong for 6 seconds and then terminate
      pongWait = setTimeout(() => {
        clearInterval(pinger);
        socket.close();
      }, 6_000);
    }
  }

  // ping the client on an interval every 12 seconds
  pinger = setInterval(() => ping(), 12_000);

  // a new socket opened, let graphql-ws take over
  const closed = server.opened(
    {
      protocol: socket.protocol, // will be validated
      send: (data) => socket.send(data),
      close: (code, reason) => socket.close(code, reason),
      onMessage: (cb) =>
        socket.on('message', async (event) => {
          try {
            // wait for the the operation to complete
            // - if init message, waits for connect
            // - if query/mutation, waits for result
            // - if subscription, waits for complete
            await cb(event.toString());
          } catch (err) {
            // all errors that could be thrown during the
            // execution of operations will be caught here
            socket.close(CloseCode.InternalServerError, err.message);
          }
        }),
      // pong received, clear termination timeout
      onPong: () => clearTimeout(pongWait),
    },
    // pass values to the `extra` field in the context
    { socket, request },
  );

  // notify server that the socket closed and stop the pinger
  socket.once('close', (code, reason) => {
    clearTimeout(pongWait);
    clearInterval(pinger);
    closed(code, reason);
  });
});
๐Ÿ”— Server usage with Cloudflare Workers

Please check the worker-graphql-ws-template repo out.

๐Ÿ”— ws server usage with GraphQL Yoga
import { ExecutionArgs, execute, subscribe } from 'graphql';
import { createServer } from '@graphql-yoga/node';
import { WebSocketServer } from 'ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import { schema } from './my-graphql-schema';

async function main() {
  const yogaApp = createServer({
    schema,
    graphiql: {
      subscriptionsProtocol: 'WS', // use WebSockets instead of SSE
    },
  });

  const httpServer = await yogaApp.start();
  const wsServer = new WebSocketServer({
    server: httpServer,
    path: yogaApp.getAddressInfo().endpoint,
  });

  // yoga's envelop may augment the `execute` and `subscribe` operations
  // so we need to make sure we always use the freshest instance
  type EnvelopedExecutionArgs = ExecutionArgs & {
    rootValue: {
      execute: typeof execute;
      subscribe: typeof subscribe;
    };
  };

  useServer(
    {
      execute: (args) =>
        (args as EnvelopedExecutionArgs).rootValue.execute(args),
      subscribe: (args) =>
        (args as EnvelopedExecutionArgs).rootValue.subscribe(args),
      onSubscribe: async (ctx, msg) => {
        const { schema, execute, subscribe, contextFactory, parse, validate } =
          yogaApp.getEnveloped(ctx);

        const args: EnvelopedExecutionArgs = {
          schema,
          operationName: msg.payload.operationName,
          document: parse(msg.payload.query),
          variableValues: msg.payload.variables,
          contextValue: await contextFactory(),
          rootValue: {
            execute,
            subscribe,
          },
        };

        const errors = validate(args.schema, args.document);
        if (errors.length) return errors;
        return args;
      },
    },
    wsServer,
  );
}

main().catch((e) => {
  console.error(e);
  process.exit(1);
});
๐Ÿ”— ws server usage with Express GraphQL
import { WebSocketServer } from 'ws'; // yarn add ws
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import express from 'express';
import { graphqlHTTP } from 'express-graphql';
import { useServer } from 'graphql-ws/lib/use/ws';
import { schema } from './my-graphql-schema';

// create express and middleware
const app = express();
app.use('/graphql', graphqlHTTP({ schema }));

const server = app.listen(4000, () => {
  // create and use the websocket server
  const wsServer = new WebSocketServer({
    server,
    path: '/graphql',
  });

  useServer({ schema }, wsServer);
});
๐Ÿ”— ws server usage with Apollo Server Express
import { ApolloServer } from 'apollo-server-express';
import { createServer } from 'http';
import express from 'express';
import { ApolloServerPluginDrainHttpServer } from 'apollo-server-core';
import { WebSocketServer } from 'ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import { schema } from './my-graphql-schema';

// create express and HTTP server
const app = express();
const httpServer = createServer(app);

// create websocket server
const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql',
});

// Save the returned server's info so we can shut down this server later
const serverCleanup = useServer({ schema }, wsServer);

// create apollo server
const apolloServer = new ApolloServer({
  schema,
  plugins: [
    // Proper shutdown for the HTTP server.
    ApolloServerPluginDrainHttpServer({ httpServer }),

    // Proper shutdown for the WebSocket server.
    {
      async serverWillStart() {
        return {
          async drainServer() {
            await serverCleanup.dispose();
          },
        };
      },
    },
  ],
});

await apolloServer.start();
apolloServer.applyMiddleware({ app });

httpServer.listen(4000);
๐Ÿ”— ws server usage with deprecated fastify-websocket
import Fastify from 'fastify'; // yarn add [email protected]^3
import fastifyWebsocket from 'fastify-websocket'; // yarn add [email protected]
import { makeHandler } from 'graphql-ws/lib/use/fastify-websocket';
import { schema } from './previous-step';

const fastify = Fastify();
fastify.register(fastifyWebsocket);

fastify.get('/graphql', { websocket: true }, makeHandler({ schema }));

fastify.listen(4000, (err) => {
  if (err) {
    fastify.log.error(err);
    return process.exit(1);
  }
  console.log('Listening to port 4000');
});
๐Ÿ”— ws server usage with subscriptions-transport-ws backwards compatibility
import http from 'http';
import { WebSocketServer } from 'ws'; // yarn add ws
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import { execute, subscribe } from 'graphql';
import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from 'graphql-ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import { SubscriptionServer, GRAPHQL_WS } from 'subscriptions-transport-ws';
import { schema } from './my-graphql-schema';

// graphql-ws
const graphqlWs = new WebSocketServer({ noServer: true });
useServer({ schema }, graphqlWs);

// subscriptions-transport-ws
const subTransWs = new WebSocketServer({ noServer: true });
SubscriptionServer.create(
  {
    schema,
    execute,
    subscribe,
  },
  subTransWs,
);

// create http server
const server = http.createServer(function weServeSocketsOnly(_, res) {
  res.writeHead(404);
  res.end();
});

// listen for upgrades and delegate requests according to the WS subprotocol
server.on('upgrade', (req, socket, head) => {
  // extract websocket subprotocol from header
  const protocol = req.headers['sec-websocket-protocol'];
  const protocols = Array.isArray(protocol)
    ? protocol
    : protocol?.split(',').map((p) => p.trim());

  // decide which websocket server to use
  const wss =
    protocols?.includes(GRAPHQL_WS) && // subscriptions-transport-ws subprotocol
    !protocols.includes(GRAPHQL_TRANSPORT_WS_PROTOCOL) // graphql-ws subprotocol
      ? subTransWs
      : // graphql-ws will welcome its own subprotocol and
        // gracefully reject invalid ones. if the client supports
        // both transports, graphql-ws will prevail
        graphqlWs;
  wss.handleUpgrade(req, socket, head, (ws) => {
    wss.emit('connection', ws, req);
  });
});

server.listen(4000);
๐Ÿ”— ws server usage with console logging
import { WebSocketServer } from 'ws'; // yarn add ws
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import { useServer } from 'graphql-ws/lib/use/ws';
import { schema } from './my-graphql-schema';

const wsServer = new WebSocketServer({
  port: 4000,
  path: '/graphql',
});

useServer(
  {
    schema,
    onConnect: (ctx) => {
      console.log('Connect', ctx);
    },
    onSubscribe: (ctx, msg) => {
      console.log('Subscribe', { ctx, msg });
    },
    onNext: (ctx, msg, args, result) => {
      console.debug('Next', { ctx, msg, args, result });
    },
    onError: (ctx, msg, errors) => {
      console.error('Error', { ctx, msg, errors });
    },
    onComplete: (ctx, msg) => {
      console.log('Complete', { ctx, msg });
    },
  },
  wsServer,
);
๐Ÿ”— ws server usage on a multi WebSocket server
import http from 'http';
import { WebSocketServer } from 'ws'; // yarn add ws
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import url from 'url';
import { useServer } from 'graphql-ws/lib/use/ws';
import { schema } from './my-graphql-schema';

const server = http.createServer(function weServeSocketsOnly(_, res) {
  res.writeHead(404);
  res.end();
});

/**
 * Two websocket servers on different paths:
 * - `/wave` sends out waves
 * - `/graphql` serves graphql
 */
const waveWS = new WebSocketServer({ noServer: true });
const graphqlWS = new WebSocketServer({ noServer: true });

// delegate upgrade requests to relevant destinations
server.on('upgrade', (request, socket, head) => {
  const pathname = url.parse(request.url).pathname;

  if (pathname === '/wave') {
    return waveWS.handleUpgrade(request, socket, head, (client) => {
      waveWS.emit('connection', client, request);
    });
  }

  if (pathname === '/graphql') {
    return graphqlWS.handleUpgrade(request, socket, head, (client) => {
      graphqlWS.emit('connection', client, request);
    });
  }

  return socket.destroy();
});

// wave on connect
waveWS.on('connection', (socket) => {
  socket.send('๐ŸŒŠ');
});

// serve graphql
useServer({ schema }, graphqlWS);

server.listen(4000);
๐Ÿ”— ws server usage with custom context value
import { WebSocketServer } from 'ws'; // yarn add ws
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import { useServer } from 'graphql-ws/lib/use/ws';
import { schema, getDynamicContext } from './my-graphql';

const wsServer = new WebSocketServer({
  port: 4000,
  path: '/graphql',
});

useServer(
  {
    context: (ctx, msg, args) => {
      return getDynamicContext(ctx, msg, args);
    }, // or static context by supplying the value direcly
    schema,
  },
  wsServer,
);
๐Ÿ”— ws server usage with dynamic schema
import { WebSocketServer } from 'ws'; // yarn add ws
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import { useServer } from 'graphql-ws/lib/use/ws';
import { schema, checkIsAdmin, getDebugSchema } from './my-graphql';

const wsServer = new WebSocketServer({
  port: 4000,
  path: '/graphql',
});

useServer(
  {
    schema: async (ctx, msg, executionArgsWithoutSchema) => {
      // will be called on every subscribe request
      // allowing you to dynamically supply the schema
      // using the depending on the provided arguments.
      // throwing an error here closes the socket with
      // the `Error` message in the close event reason
      const isAdmin = await checkIsAdmin(ctx.request);
      if (isAdmin) return getDebugSchema(ctx, msg, executionArgsWithoutSchema);
      return schema;
    },
  },
  wsServer,
);
๐Ÿ”— ws server usage with custom validation
import { WebSocketServer } from 'ws'; // yarn add ws
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import { useServer } from 'graphql-ws/lib/use/ws';
import { validate } from 'graphql';
import { schema, myValidationRules } from './my-graphql';

const wsServer = new WebSocketServer({
  port: 4000,
  path: '/graphql',
});

useServer(
  {
    validate: (schema, document) =>
      validate(schema, document, myValidationRules),
  },
  wsServer,
);
๐Ÿ”— ws server usage with custom execution arguments
import { parse, validate } from 'graphql';
import { WebSocketServer } from 'ws'; // yarn add ws
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import { useServer } from 'graphql-ws/lib/use/ws';
import { schema, myValidationRules } from './my-graphql';

const wsServer = new WebSocketServer({
  port: 4000,
  path: '/graphql',
});

useServer(
  {
    onSubscribe: (ctx, msg) => {
      const args = {
        schema,
        operationName: msg.payload.operationName,
        document: parse(msg.payload.query),
        variableValues: msg.payload.variables,
      };

      // dont forget to validate when returning custom execution args!
      const errors = validate(args.schema, args.document, myValidationRules);
      if (errors.length > 0) {
        return errors; // return `GraphQLError[]` to send `ErrorMessage` and stop subscription
      }

      return args;
    },
  },
  wsServer,
);
๐Ÿ”— ws server usage accepting only subscription operations
import { parse, validate, getOperationAST, GraphQLError } from 'graphql';
import { WebSocketServer } from 'ws'; // yarn add ws
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import { useServer } from 'graphql-ws/lib/use/ws';
import { schema } from './my-graphql';

const wsServer = new WebSocketServer({
  port: 4000,
  path: '/graphql',
});

useServer(
  {
    onSubscribe: (_ctx, msg) => {
      // construct the execution arguments
      const args = {
        schema,
        operationName: msg.payload.operationName,
        document: parse(msg.payload.query),
        variableValues: msg.payload.variables,
      };

      const operationAST = getOperationAST(args.document, args.operationName);
      if (!operationAST) {
        // returning `GraphQLError[]` sends an `ErrorMessage` and stops the subscription
        return [new GraphQLError('Unable to identify operation')];
      }

      // handle mutation and query requests
      if (operationAST.operation !== 'subscription') {
        // returning `GraphQLError[]` sends an `ErrorMessage` and stops the subscription
        return [new GraphQLError('Only subscription operations are supported')];

        // or if you want to be strict and terminate the connection on illegal operations
        throw new Error('Only subscription operations are supported');
      }

      // dont forget to validate
      const errors = validate(args.schema, args.document);
      if (errors.length > 0) {
        // returning `GraphQLError[]` sends an `ErrorMessage` and stops the subscription
        return errors;
      }

      // ready execution arguments
      return args;
    },
  },
  wsServer,
);
๐Ÿ”— ws server and client usage with persisted queries
// ๐Ÿ›ธ server

import { parse, ExecutionArgs } from 'graphql';
import { WebSocketServer } from 'ws'; // yarn add ws
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import { useServer } from 'graphql-ws/lib/use/ws';
import { schema } from './my-graphql-schema';

// a unique GraphQL execution ID used for representing
// a query in the persisted queries store. when subscribing
// you should use the `SubscriptionPayload.query` to transmit the id
type QueryID = string;

const queriesStore: Record<QueryID, ExecutionArgs> = {
  iWantTheGreetings: {
    schema, // you may even provide different schemas in the queries store
    document: parse('subscription Greetings { greetings }'),
  },
};

const wsServer = new WebSocketServer({
  port: 4000,
  path: '/graphql',
});

useServer(
  {
    onSubscribe: (_ctx, msg) => {
      const persistedQuery =
        queriesStore[msg.payload.extensions?.persistedQuery];
      if (persistedQuery) {
        return {
          ...persistedQuery,
          variableValues: msg.payload.variables, // use the variables from the client
        };
      }

      // for extra security you only allow the queries from the store.
      // if you want to support both, simply remove the throw below and
      // graphql-ws will handle the query for you
      throw new Error('404: Query Not Found');
    },
  },
  wsServer,
);
// ๐Ÿ“บ client

import { createClient } from 'graphql-ws';

const client = createClient({
  url: 'ws://persisted.graphql:4000/queries',
});

(async () => {
  const onNext = () => {
    /**/
  };

  await new Promise((resolve, reject) => {
    client.subscribe(
      {
        query: '', // query field is required, but you can leave it empty for persisted queries
        extensions: {
          persistedQuery: 'iWantTheGreetings',
        },
      },
      {
        next: onNext,
        error: reject,
        complete: resolve,
      },
    );
  });

  expect(onNext).toBeCalledTimes(5); // greetings in 5 languages
})();
๐Ÿ”— ws server and client auth usage with token expiration, validation and refresh
// ๐Ÿ›ธ server

import { WebSocketServer } from 'ws'; // yarn add ws
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import { useServer } from 'graphql-ws/lib/use/ws';
import { CloseCode } from 'graphql-ws';
import { schema } from './my-graphql-schema';
import { isTokenValid } from './my-auth';

const wsServer = new WebSocket.Server({
  port: 4000,
  path: '/graphql',
});

useServer(
  {
    schema,
    onConnect: async (ctx) => {
      // do your auth check on every connect (recommended)
      if (!(await isTokenValid(ctx.connectionParams?.token)))
        // returning false from the onConnect callback will close with `4403: Forbidden`;
        // therefore, being synonymous to ctx.extra.socket.close(4403, 'Forbidden');
        return false;
    },
    onSubscribe: async (ctx) => {
      // or maybe on every subscribe
      if (!(await isTokenValid(ctx.connectionParams?.token)))
        return ctx.extra.socket.close(CloseCode.Forbidden, 'Forbidden');
    },
  },
  wsServer,
);
// ๐Ÿ“บ client

import { createClient, CloseCode } from 'graphql-ws';
import {
  getCurrentToken,
  getCurrentTokenExpiresIn,
  refreshCurrentToken,
} from './my-auth';

// non-fatal WebSocket connection close events will cause the
// client to automatically reconnect. the retries are silent, meaning
// that the client will not error out unless the retry attempts have been
// exceeded or the close event was fatal (read more about the fatal
// close events in the documentation). additionally, all active subscriptions
// will automatically resubscribe upon successful reconnect. this behaviour
// can be leveraged to implement a secure and sound way of authentication;
// handling server-side validation, expiry indication and timely token refreshes

// indicates that the server closed the connection because of
// an auth problem. it indicates that the token should refresh
let shouldRefreshToken = false,
  // the socket close timeout due to token expiry
  tokenExpiryTimeout = null;

const client = createClient({
  url: 'ws://server-validates.auth:4000/graphql',
  connectionParams: async () => {
    if (shouldRefreshToken) {
      // refresh the token because it is no longer valid
      await refreshCurrentToken();
      // and reset the flag to avoid refreshing too many times
      shouldRefreshToken = false;
    }
    return { token: getCurrentToken() };
  },
  on: {
    connected: (socket) => {
      // clear timeout on every connect for debouncing the expiry
      clearTimeout(tokenExpiryTimeout);

      // set a token expiry timeout for closing the socket
      // with an `4403: Forbidden` close event indicating
      // that the token expired. the `closed` event listner below
      // will set the token refresh flag to true
      tokenExpiryTimeout = setTimeout(() => {
        if (socket.readyState === WebSocket.OPEN)
          socket.close(CloseCode.Forbidden, 'Forbidden');
      }, getCurrentTokenExpiresIn());
    },
    closed: (event) => {
      // if closed with the `4403: Forbidden` close event
      // the client or the server is communicating that the token
      // is no longer valid and should be therefore refreshed
      if (event.code === CloseCode.Forbidden) shouldRefreshToken = true;
    },
  },
});
๐Ÿ”— ws server and client usage with subscription acknowledgment
// ๐Ÿ›ธ server

import { WebSocketServer } from 'ws';
// import ws from 'ws'; yarn add [email protected]
// const WebSocketServer = ws.Server;
import { MessageType, stringifyMessage } from 'graphql-ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import { schema } from './my-graphql-schema';

const wsServer = new WebSocketServer({
  port: 4000,
  path: '/graphql',
});

useServer<undefined, { ackWaiters: Record<string, () => void> }>(
  {
    schema,
    onConnect: (ctx) => {
      // listeners waiting for operation acknowledgment. if subscription, this means the graphql.subscribe was successful
      // intentionally in context extra to avoid memory leaks when clients disconnect
      ctx.extra.ackWaiters = {};
    },
    onSubscribe: (ctx, msg) => {
      const ackId = msg.payload.extensions?.ackId;
      if (typeof ackId === 'string') {
        // if acknowledgment ID is present, create an acknowledger that will be executed when operation succeeds
        ctx.extra.ackWaiters![msg.id] = () => {
          ctx.extra.socket.send(
            stringifyMessage({
              type: MessageType.Ping,
              payload: {
                ackId,
              },
            }),
          );
        };
      }
    },
    onOperation: (ctx, msg) => {
      // acknowledge operation success and remove waiter
      ctx.extra.ackWaiters![msg.id]?.();
      delete ctx.extra.ackWaiters![msg.id];
    },
  },
  wsServer,
);

console.log('Listening to port 4000');
// ๐Ÿ“บ client

import {
  Client,
  ClientOptions,
  createClient,
  ExecutionResult,
  Sink,
  SubscribePayload,
} from 'graphql-ws';

// client with augmented subscribe method accepting the `onAck` callback for operation acknowledgement
type ClientWithSubscribeAck = Omit<Client, 'subscribe'> & {
  subscribe<Data = Record<string, unknown>, Extensions = unknown>(
    payload: SubscribePayload,
    sink: Sink<ExecutionResult<Data, Extensions>>,
    onAck: () => void,
  ): () => void;
};

function createClientWithSubscribeAck(
  options: ClientOptions,
): ClientWithSubscribeAck {
  const client = createClient(options);

  const ackListeners: Record<string, () => void> = {};
  client.on('ping', (_received, payload) => {
    const ackId = payload?.ackId;
    if (typeof ackId === 'string') {
      ackListeners[ackId]?.();
      delete ackListeners[ackId];
    }
  });

  return {
    ...client,
    subscribe: (payload, sink, onAck) => {
      const ackId = Math.random().toString(); // be wary of uniqueness
      ackListeners[ackId] = onAck;
      return client.subscribe(
        {
          ...payload,
          extensions: {
            ...payload.extensions,
            ackId,
          },
        },
        sink,
      );
    },
  };
}

Using the augmented client would be as simple as:

const client = createClientWithSubscribeAck({
  url: 'ws://i.want.ack:4000/graphql',
});

(async () => {
  const onNext = () => {
    /* handle incoming values */
  };

  let unsubscribe = () => {
    /* complete the subscription */
  };

  let subscriptionAcknowledged = () => {
    /* server successfully subscribed */
  };

  await new Promise((resolve, reject) => {
    unsubscribe = client.subscribe(
      {
        query: 'subscription { greetings }',
      },
      {
        next: onNext,
        error: reject,
        complete: resolve,
      },
      subscriptionAcknowledged,
    );
  });

  expect(subscriptionAcknowledged).toBeCalledFirst();

  expect(onNext).then.toBeCalledTimes(5); // we say "Hi" in 5 languages
})();

Documentation

Check the docs folder out for TypeDoc generated documentation.

How does it work?

Read about the exact transport intricacies used by the library in the GraphQL over WebSocket Protocol document.

Want to help?

File a bug, contribute with code, or improve documentation? Read up on our guidelines for contributing and drive development with yarn test --watch away!

Disclaimer

This library and the GraphQL over WebSocket Protocol are not cross-compatible with the deprecated subscriptions-transport-ws and its accompanying Protocol.

You must use graphql-ws coherently and implement the GraphQL over WebSocket Protocol on both sides, server and the client.

Alternatives To Graphql Ws
Select To Compare


Alternative Project Comparisons
Related Awesome Lists
Top Programming Languages

Get A Weekly Email With Trending Projects For These Topics
No Spam. Unsubscribe easily at any time.
Typescript (281,747)ย 
Server (65,786)ย 
Express (31,803)ย 
Graphql (16,147)ย 
Websocket (14,120)ย 
Transport (5,358)ย 
Apollo (4,474)ย 
Relay (4,311)ย 
Observables (3,534)ย 
Fastify (847)ย 
Uwebsockets (19)ย