GraphQL Live Queries with Socket.io

Laurin Quast (n1ru4l)

Photo by Luca Campioni on Unsplash

Note: For a better understanding I recommend reading Subscriptions and Live Queries - Real-Time with GraphQL first.

I have been using a GraphQL schema served over Socket.io for a long time. Finally, I abstracted the protocol for both client and server into a library for easier re-use across projects.

In addition to the usual GraphQL operations, I also added support for executing live queries.

TL;DR:

  • @n1ru4l/socket-io-graphql-server: A layer for serving a GraphQL schema via a socket.io server. Supports Queries, Mutations, Subscriptions, and Live Queries.
  • @n1ru4l/socket-io-graphql-client: A network interface for consuming a GraphQL schema that is served via @n1ru4l/socket-io-graphql-server. Can be used with all major GraphQL clients such as Relay, Apollo Client, or Urql.
  • @n1ru4l/graphql-live-query: Utilities for adding live queries to ANY GraphQL schema.
  • @n1ru4l/in-memory-live-query-store: An in-memory GraphQL live query implementation.

All the packages can be found in this repository:

https://github.com/n1ru4l/graphql-live-queries

For showcasing the libraries I created a todo example app that syncs its state across all clients using the above packages:

  • Server: Todo app server implementation with graphql-js, @n1ru4l/socket-io-graphql-server and @n1ru4l/in-memory-live-query-store.
  • Client Relay: Todo app client implementation with create-react-app, relay and @n1ru4l/socket-io-graphql-client.
  • Client Apollo: Todo app client implementation with create-react-app, @apollo/client and @n1ru4l/socket-io-graphql-client.
  • Client Urql: Todo app client implementation with create-react-app, urql and @n1ru4l/socket-io-graphql-client.

Motivation

It seems like real-time for GraphQL is not pushed as much as it should be by all the big players out there.

The most popular Node.js implementation for subscriptions is poorly maintained, as Apollo focuses more on other areas.

There are a few live query implementations out there, but NONE that are not tied to a specific database.

A few cool ideas popped up in the community (such as graphql-live-subscriptions), but none of those are maintained, or they have major flaws such as not being compatible with interface or union types.

The implementations in @n1ru4l/graphql-live-query and @n1ru4l/in-memory-live-query-store should serve as an example of how it could be done without being tied to any specific (reactive) database or data structure. The implementation will hopefully mature with time as people report new use-cases and start adopting live queries.

In addition to that, I created both @n1ru4l/socket-io-graphql-server and @n1ru4l/socket-io-graphql-client as I was already heavily using GraphQL over Socket.io in a project that required real-time updates.

GraphQL already has a solution for real-time

Subscriptions are the perfect tool for responding to events. An example of this would be triggering a sound or showing a toast message once a new message has been received.

Subscriptions are also often used for updating existing query results on the client. Depending on the complexity, the cache update code can eventually become pretty bloated. Often it is more straightforward to simply refetch the query once a subscription event is received.

Live Query Magic

Live queries, however, should feel magical and update the UI with the latest data from the server without any cache update wizardry. This moves the complexity from the client to the server.

Concept

The current definition of a live query for my project is a query operation that is annotated with a @live directive.

query users @live {
  users(first: 10) {
    id
    login
  }
}

A live query is sent to the server (via WebSocket or HTTP) and stored there until the client disconnects or notifies the server that it is no longer interested in the live query operation, at which point the server disposes of it.

On the server, the query is re-executed once the data selected by the live query operation changes. The result is then streamed to the client. A re-execution can be scheduled by invalidating a selected resource of the result of the previous execution. The invalidation can be triggered either by a schema coordinate for the root field (e.g. Query.todos) or by a resource identifier (e.g. Todo:1, the todo with the id 1). UPDATE: You can learn more about how the library collects the resource identifiers here.
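
The re-execution cycle described above can be sketched in a few lines. This is a minimal illustration, not the implementation in @n1ru4l/in-memory-live-query-store: the names `TinyLiveQueryStore`, `register` and `collectIdentifiers` are made up for this example, and it assumes that every object in a result carries `__typename` and `id` fields from which a `Typename:id` identifier can be built.

```typescript
// Hypothetical sketch of invalidation-driven re-execution.
// None of these names come from the library.

type LiveRecord = {
  rootCoordinates: Set<string>; // e.g. "Query.todos"
  identifiers: Set<string>; // e.g. "Todo:1", collected from the last result
  run: () => unknown; // re-executes the operation
  publish: (result: unknown) => void; // streams a result to the client
};

// Walk a result and collect "Typename:id" identifiers.
// Assumption: objects expose `__typename` and `id`.
const collectIdentifiers = (
  value: unknown,
  found: Set<string> = new Set<string>()
): Set<string> => {
  if (Array.isArray(value)) {
    value.forEach((item) => collectIdentifiers(item, found));
  } else if (typeof value === "object" && value !== null) {
    const node = value as Record<string, unknown>;
    if (typeof node.__typename === "string" && node.id !== undefined) {
      found.add(`${node.__typename}:${String(node.id)}`);
    }
    Object.keys(node).forEach((key) => collectIdentifiers(node[key], found));
  }
  return found;
};

class TinyLiveQueryStore {
  private records = new Set<LiveRecord>();

  // Runs the operation once and keeps it registered until disposed.
  register(
    rootCoordinates: string[],
    run: () => unknown,
    publish: (result: unknown) => void
  ): () => void {
    const record: LiveRecord = {
      rootCoordinates: new Set(rootCoordinates),
      identifiers: new Set<string>(),
      run,
      publish,
    };
    this.records.add(record);
    this.execute(record);
    return () => {
      this.records.delete(record);
    };
  }

  // Re-executes every live query that selected the coordinate or resource.
  invalidate(identifier: string): void {
    this.records.forEach((record) => {
      if (
        record.rootCoordinates.has(identifier) ||
        record.identifiers.has(identifier)
      ) {
        this.execute(record);
      }
    });
  }

  private execute(record: LiveRecord): void {
    const result = record.run();
    record.identifiers = collectIdentifiers(result);
    record.publish(result);
  }
}
```

In this sketch, a query whose last result contained a `Todo` with the id 1 is re-executed by `invalidate("Todo:1")` or `invalidate("Query.todos")`, while `invalidate("Todo:2")` leaves it untouched.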

LiveQueryStore Implementation

The InMemoryLiveQueryStore is a class provided by the @n1ru4l/in-memory-live-query-store package. It stores all the information about the active live queries in memory.

Registering the LiveQueryStore

In order to enable live queries for the @n1ru4l/socket-io-graphql-server implementation, an execute function that returns an AsyncIterator must be passed to the registerSocketIOGraphQLServer function.

In addition to what the default execute function from the graphql-js reference implementation returns, the new execute function can also return an AsyncIterableIterator<ExecutionResult>. Note: As @defer and @stream are added to the graphql-js reference implementation, its execute can now also return AsyncIterators.

The InMemoryLiveQueryStore has an execute property which must be used for executing live queries. It also handles non-live operations by falling back to the default execute from graphql-js if the operation is not identified as a live query operation.
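
Since such an execute function may produce either a single result or a stream of results, the caller has to handle both shapes. The following is a minimal sketch of that branching; `ExecutionResultLike`, `isAsyncIterable`, `forwardResults` and `fakeLiveResults` are local names invented for this example rather than exports of graphql-js or of the packages above.

```typescript
// Local, simplified stand-in for graphql-js' ExecutionResult (assumption).
type ExecutionResultLike = { data?: unknown; errors?: ReadonlyArray<unknown> };

type ExecuteReturn =
  | ExecutionResultLike
  | AsyncIterable<ExecutionResultLike>
  | Promise<ExecutionResultLike | AsyncIterable<ExecutionResultLike>>;

// True when the value can be consumed with `for await`.
const isAsyncIterable = (
  value: unknown
): value is AsyncIterable<ExecutionResultLike> =>
  typeof value === "object" && value !== null && Symbol.asyncIterator in value;

// Forwards one result for plain operations and every result for live ones.
const forwardResults = async (
  returned: ExecuteReturn,
  emit: (result: ExecutionResultLike) => void
): Promise<void> => {
  const resolved = await returned;
  if (isAsyncIterable(resolved)) {
    for await (const result of resolved) {
      emit(result);
    }
  } else {
    emit(resolved);
  }
};

// A fake live query source that yields an initial result and one update.
async function* fakeLiveResults(): AsyncGenerator<ExecutionResultLike> {
  yield { data: { users: [] } };
  yield { data: { users: [{ id: "1", login: "n1ru4l" }] } };
}

forwardResults(fakeLiveResults(), (result) => console.log(result.data));
```

A transport layer built this way does not need to know whether an operation is live; it simply keeps emitting for as long as the iterator produces values.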

import http from "http";
import socketIO from "socket.io";
import { InMemoryLiveQueryStore } from "@n1ru4l/in-memory-live-query-store";
import { registerSocketIOGraphQLServer } from "@n1ru4l/socket-io-graphql-server";
import { schema } from "./schema";

const liveQueryStore = new InMemoryLiveQueryStore();
// Plain Node.js HTTP server that socket.io attaches to.
const httpServer = http.createServer();
const socketServer = socketIO(httpServer);

registerSocketIOGraphQLServer({
  socketServer,
  // getExecutionParameter is invoked for each incoming operation.
  // a different context or even schema based on the connection can be returned
  // and will be used for that operation.
  getExecutionParameter: () => ({
    execute: liveQueryStore.execute,
    graphQLExecutionParameter: {
      schema,
      contextValue: {
        liveQueryStore,
      },
    },
  }),
});

There is a WIP pull request on the graphql-js reference implementation for adding AsyncIterableIterator<ExecutionResult> as a valid return result from execute for @defer and @stream directives. As a side effect, this would also help live queries.

Notifying the live query store for changes

In order to re-execute the registered live queries and stream the new results to the connected clients, we must invalidate the live query operations that select the affected data.

For the given query:

query messages @live {
  messages(first: 10) {
    id
    content
    author {
      id
      name
    }
  }
}

That would look like this:

// Mutation.createMessage resolver
const createMessage = async (root, args, context) => {
  await context.messageStore.createMessage({
    content: args.content,
    userId: context.viewer.id,
  });
  // Notify the liveQueryStore via schema coordinate that all live queries
  // that select Query.messages must be re-executed and sent to the clients.
  context.liveQueryStore.invalidate("Query.messages");
};

const editMessage = async (root, args, context) => {
  await context.messageStore.updateMessage({
    messageId: args.messageId,
    content: args.content,
  });
  // notify liveQueryStore via resource identifier
  context.liveQueryStore.invalidate(`Message:${args.messageId}`);
};

Note: It might be possible to do some kind of abstraction here. Imagine the live query store living on your mesh entry point, keeping track of the live queries and mutations that occur, and then automatically triggering the invalidations based on mutations instead of hard-coding them into resolvers. Apollo recently built something similar, though quite a bit different.
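
On a much smaller scale than a mesh entry point, one way to picture such an abstraction is a wrapper that derives the invalidations from a mutation's arguments and result. This is a sketch under assumptions: `withInvalidation` and its types are hypothetical helpers, and the only thing taken from the resolvers above is that the context exposes a `liveQueryStore` with an `invalidate(identifier)` method.

```typescript
// Hypothetical helper; only `liveQueryStore.invalidate` mirrors the resolvers above.
type Args = Record<string, unknown>;
type Context = { liveQueryStore: { invalidate: (identifier: string) => void } };
type Resolver = (root: unknown, args: Args, context: Context) => Promise<unknown>;

// Wraps a mutation resolver and invalidates the derived identifiers
// once the mutation has succeeded. A rejected mutation invalidates nothing.
const withInvalidation = (
  resolve: Resolver,
  getIdentifiers: (args: Args, result: unknown) => string[]
): Resolver => async (root, args, context) => {
  const result = await resolve(root, args, context);
  getIdentifiers(args, result).forEach((identifier) =>
    context.liveQueryStore.invalidate(identifier)
  );
  return result;
};

// The resolver itself no longer mentions the live query store.
const editMessageResolver: Resolver = async (_root, args) => ({
  id: args.messageId,
  content: args.content,
});

const editMessageWithInvalidation = withInvalidation(
  editMessageResolver,
  (args) => [`Message:${String(args.messageId)}`]
);
```

The mapping from a mutation to identifiers still has to be written somewhere, but it moves out of the resolver body into one declarative place.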

Adding the @live directive to your GraphQL schema

@n1ru4l/graphql-live-query exports a directive that can easily be added to an existing schema. You must add it; otherwise the GraphQL server will complain about unknown directives in your GraphQL operations.

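import { GraphQLSchema, specifiedDirectives } from "graphql";
import { GraphQLLiveDirective } from "@n1ru4l/graphql-live-query";

// `query`, `mutation` and `subscription` are the root object types of your schema.
export const schema = new GraphQLSchema({
  query,
  mutation,
  subscription,
  // Keep the built-in directives (@skip, @include, ...) and add @live.
  directives: [...specifiedDirectives, GraphQLLiveDirective],
});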

If you are using an SDL-driven development flow, you must add the following to your type definitions:

directive @live on QUERY

Consuming live queries on the client

The @n1ru4l/socket-io-graphql-client package can be used to execute (live) Query, Mutation and Subscription operations against a server set up with @n1ru4l/socket-io-graphql-server. It implements the underlying GraphQL over Socket.io protocol.

I also created a PR for supporting live queries with graphql-transport-ws.

Basic client creation

import io from "socket.io-client";
import { createSocketIOGraphQLClient } from "@n1ru4l/socket-io-graphql-client";

const socket = io();

const socketIOGraphQLClient = createSocketIOGraphQLClient(socket);

Executing GraphQL operations

The SocketIOGraphQLClient provides an execute method that will return an Observable which can be used to subscribe to the response.

A simple query or mutation operation will only publish one value. A live query or subscription, however, will publish multiple values, so a Promise is not the right data structure here.

The returned observable is compatible with the proposal spec and is easily consumable by libraries such as apollo-client and relay, as well as by tools like GraphiQL.

socketIOGraphQLClient.execute({
  operation: /* GraphQL */ `
    query messages {
      messages(first: 10) {
        id
        content
      }
    }
  `
}, {
  next: console.log,
  error: console.log,
  complete: console.log
});

socketIOGraphQLClient.execute({
  operation: /* GraphQL */ `
    query messages @live {
      messages(first: 10) {
        id
        content
      }
    }
  `
}, {
  next: console.log,
  error: console.log,
  complete: console.log
});

socketIOGraphQLClient.execute({
  operation: /* GraphQL */ `
    subscription onNewMessage {
      onNewMessage {
        id
        content
      }
    }
  `
}, {
  next: console.log,
  error: console.log,
  complete: console.log
});
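
For operations that publish exactly one value, the sink-based API can still be adapted to a Promise. The helper below is a sketch that assumes the `execute(parameters, sink)` shape used in this article's examples, where the return value is an unsubscribe function; `Sink`, `OperationParameters`, `ExecuteFn` and `firstResult` are names introduced here for illustration and are not part of the client package.

```typescript
// Assumed shapes, modelled on how `execute` is called in this article.
type Sink<T> = {
  next: (value: T) => void;
  error: (error: unknown) => void;
  complete: () => void;
};
type OperationParameters = {
  operation: string;
  variables?: Record<string, unknown>;
  operationName?: string;
};
type ExecuteFn<T> = (parameters: OperationParameters, sink: Sink<T>) => () => void;

// Resolves with the first published value and unsubscribes right after.
// Suitable for plain queries and mutations, not for live queries or subscriptions.
const firstResult = <T>(
  execute: ExecuteFn<T>,
  parameters: OperationParameters
): Promise<T> =>
  new Promise<T>((resolve, reject) => {
    let settled = false;
    let unsubscribe: (() => void) | undefined;

    const finish = (settle: () => void): void => {
      if (settled) return;
      settled = true;
      settle();
      if (unsubscribe) unsubscribe();
    };

    unsubscribe = execute(parameters, {
      next: (value) => finish(() => resolve(value)),
      error: (error) => finish(() => reject(error)),
      complete: () =>
        finish(() => reject(new Error("Completed without a result."))),
    });

    // The sink may have fired synchronously, before `unsubscribe` was assigned.
    if (settled) unsubscribe();
  });
```

With a real client this could be called as `firstResult(socketIOGraphQLClient.execute, { operation })`, provided `execute` does not rely on `this`; otherwise wrap it in an arrow function first.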

GraphiQL fetcher

It is possible to easily consume and display the results of the GraphQL API with our SocketIOGraphQLClient. We just need to pass a custom fetcher to the GraphiQL component.

// Minimal local sink type; GraphiQL may pass either a sink object or a `next` callback.
type Sink = {
  next: (value: unknown) => void;
  error: (error: unknown) => void;
  complete: () => void;
};

const fetcher = ({ query: operation, ...restGraphQLParams }) => ({
  subscribe: (sinkOrNext, ...args) => {
    // Normalize the two call styles into a single sink object.
    const sink: Sink =
      typeof sinkOrNext === "function"
        ? { next: sinkOrNext, error: args[0], complete: args[1] }
        : sinkOrNext;

    const unsubscribe = socketIOGraphQLClient.execute(
      {
        operation,
        ...restGraphQLParams,
      },
      sink
    );

    return { unsubscribe };
  },
});

// Hand the fetcher defined above to the GraphiQL component.
const CustomGraphiQL = () => <GraphiQL fetcher={fetcher} />;

Implementation with Relay

Relay is a powerful library for managing the cache client-side. A Relay environment (which holds information about the cache and how the data is fetched from the server) can easily be built around a SocketIOGraphQLClient instance.

import { SocketIOGraphQLClient } from "@n1ru4l/socket-io-graphql-client";
import {
  Environment,
  Network,
  RecordSource,
  Store,
  Observable,
  GraphQLResponse,
  RequestParameters,
  Variables,
} from "relay-runtime";

export const createRelayEnvironment = (
  networkInterface: SocketIOGraphQLClient<GraphQLResponse, Error>
) => {
  const execute = (request: RequestParameters, variables: Variables) => {
    if (!request.text) throw new Error("Missing document.");
    const { text: operation, name } = request;

    return Observable.create<GraphQLResponse>((sink) =>
      networkInterface.execute(
        {
          operation,
          variables,
          operationName: name,
        },
        sink
      )
    );
  };

  const network = Network.create(execute, execute);
  // Plain Relay store backed by an in-memory record source.
  const store = new Store(new RecordSource());

  return new Environment({
    network,
    store,
  });
};

Consuming live data then becomes straightforward:

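import React from "react";
import { graphql, createFragmentContainer, QueryRenderer } from "react-relay";
import type { Environment as RelayEnvironment } from "relay-runtime";
// The ChatApplication_* types are generated by the Relay compiler.

const ChatApplicationMessagesQuery = graphql`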
  query ChatApplication_MessagesQuery @live {
    messages(limit: 10) {
      id
      ...ChatApplication_message
    }
  }
`;

const ChatApplicationMessageRenderer = React.memo(
  ({ message }: { message: ChatApplication_message }) => {
    return (
      <div>
        <div>{message.author.name}</div>
        <div>{message.content}</div>
      </div>
    );
  }
);

const ChatApplicationMessage = createFragmentContainer(
  ChatApplicationMessageRenderer,
  {
    message: graphql`
      fragment ChatApplication_message on Message {
        id
        content
        author {
          id
          name
        }
      }
    `,
  }
);

export const ChatApplication: React.FunctionComponent<{
  relayEnvironment: RelayEnvironment;
}> = (props) => {
  return (
    <QueryRenderer<ChatApplication_MessagesQuery>
      environment={props.relayEnvironment}
      query={ChatApplicationMessagesQuery}
      variables={{}}
      render={({ props }) => {
        if (!props) {
          return null;
        }

        return props.messages.map((message) => (
          <ChatApplicationMessage key={message.id} message={message} />
        ));
      }}
    />
  );
};

See complete example app

Usage with Apollo Client

import { SocketIOGraphQLClient } from "@n1ru4l/socket-io-graphql-client";
import {
  ApolloClient,
  InMemoryCache,
  ApolloLink,
  Operation,
  Observable,
  FetchResult,
} from "@apollo/client";
import { print } from "graphql";

class SocketIOGraphQLApolloLink extends ApolloLink {
  private networkLayer: SocketIOGraphQLClient;
  constructor(networkLayer: SocketIOGraphQLClient) {
    super();
    this.networkLayer = networkLayer;
  }

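  public request(operation: Operation): Observable<FetchResult> | null {
    // Forward every result to Apollo; the returned function unsubscribes.
    return new Observable<FetchResult>((sink) =>
      this.networkLayer.execute(
        {
          operationName: operation.operationName,
          operation: print(operation.query),
          variables: operation.variables,
        },
        {
          next: (result) => sink.next(result),
          error: (error) => sink.error(error),
          complete: () => sink.complete(),
        }
      )
    );
  }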
}

export const createApolloClient = (networkInterface: SocketIOGraphQLClient) => {
  return new ApolloClient({
    link: new SocketIOGraphQLApolloLink(networkInterface),
    cache: new InMemoryCache(),
  });
};

See complete example app

Usage with Urql

import { SocketIOGraphQLClient } from "@n1ru4l/socket-io-graphql-client";
import {
  Client,
  dedupExchange,
  cacheExchange,
  subscriptionExchange,
  ExecutionResult,
} from "urql";

export const createUrqlClient = (
  networkInterface: SocketIOGraphQLClient<ExecutionResult>
) => {
  return new Client({
    url: "noop",
    exchanges: [
      dedupExchange,
      cacheExchange,
      subscriptionExchange({
        forwardSubscription: (operation) => ({
          subscribe: (sink) => ({
            unsubscribe: networkInterface.execute(
              {
                operation: operation.query,
                variables: operation.variables,
              },
              sink
            ),
          }),
        }),
        enableAllOperations: true,
      }),
    ],
  });
};

See complete example app

The Future

This is the first implementation of the live query library. As more people try it out and build projects with it the API can become more mature and flexible for different use-cases.

New LiveQueryStore implementations for distributed systems could be built (e.g. based on Redis PubSub).

The network layer could furthermore be optimized to only transport the updated patch instructions to the client in order to decrease the payload size.
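
To make the idea concrete, here is a rough sketch of computing such patch instructions by comparing two consecutive execution results. The `Patch` shape and the `diffResults` function are assumptions made for this example; they are not a format the packages currently use.

```typescript
// Hypothetical patch format: a slash-separated path plus the new value.
type Patch = { path: string; value: unknown };

const isPlainObject = (value: unknown): value is Record<string, unknown> =>
  typeof value === "object" && value !== null && !Array.isArray(value);

// Recursively compares two results and returns only what changed.
// Arrays and scalars are replaced wholesale; removed keys yield `value: undefined`.
const diffResults = (previous: unknown, next: unknown, path = ""): Patch[] => {
  if (isPlainObject(previous) && isPlainObject(next)) {
    const keys = Array.from(
      new Set([...Object.keys(previous), ...Object.keys(next)])
    );
    const patches: Patch[] = [];
    for (const key of keys) {
      patches.push(...diffResults(previous[key], next[key], `${path}/${key}`));
    }
    return patches;
  }
  return JSON.stringify(previous) === JSON.stringify(next)
    ? []
    : [{ path, value: next }];
};

const previousResult = {
  messages: [{ id: "1", content: "Hi" }],
  viewer: { name: "Laurin" },
};
const nextResult = {
  messages: [{ id: "1", content: "Hi" }],
  viewer: { name: "Lau" },
};

// Only the changed field is reported: one patch for "/viewer/name".
console.log(diffResults(previousResult, nextResult));
```

Here the unchanged `messages` list is not sent again. A production format would also need a finer-grained strategy for lists, which this sketch deliberately leaves out.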

Other people might not use Socket.io. (Hybrid) implementations for other server libraries out there (e.g. apollo-server or express-graphql) must be built.

Note: GraphQL live queries can now be executed with graphql-ws and even express-graphql experimental. Check out GraphQL Bleeding Edge Playground!

Do you have an idea? Feel free to contact me via Twitter, open a GitHub issue, or write a comment down below 😊.

Discussion (3)

Ogboada Jaja Goodnews (goodnewso)

I need an example of using @n1ru4l/socket-io-graphql-server with subscription or using it with graphql-tools.
makeExecutableSchema seems to complain about the GraphQLLiveDirective. Thanks.

Laurin Quast (n1ru4l), Author

Could you share what error message you get? Maybe open an issue on the GitHub repo? ☺️

Ogboada Jaja Goodnews (goodnewso)

Just added it: github.com/n1ru4l/graphql-live-que.... Please pardon my English.