notaphplover

Building a Catalog GraphQL API with InversifyJS - Part 4: Subscriptions & Redis

In Part 3, we implemented the core resolvers for Queries and Mutations. In Part 4, we will add real-time capabilities to our API using GraphQL Subscriptions and Redis.

Step 1: Update Docker Compose

We need a Redis instance to handle our PubSub messaging.

Update docker-compose.yml to add the redis service:

      - db:/var/lib/postgresql/data
      - ./posgresql/init_shadow_db.sql:/docker-entrypoint-initdb.d/init_shadow_db.sql

+   redis:
+     image: redis:8.4.0-alpine
+     ports:
+       - '6379:6379'
+     networks:
+       - academy_manager
+
  networks:
    academy_manager:
      external: true

Step 2: Update GraphQL Schema

We need to define the Subscription type in our schema.

Update graphql/schemas/schema.graphql:

  type Mutation {
    createCategory(input: CreateCategoryInput!): Category!
    createProduct(input: CreateProductInput!): Product!
  }
+
+ type Subscription {
+   productAdded: Product!
+ }

Step 3: Redis Infrastructure

We need to set up the infrastructure to communicate with Redis. We'll use ioredis for this.

First, install ioredis along with @inversifyjs/apollo-subscription-ws, which provides the subscription server module we load in Step 8:

pnpm add @inversifyjs/apollo-subscription-ws ioredis

PubSub Channels

Create src/foundation/redis/models/PubSubChannels.ts:

export enum PubSubChannel {
  productAdded = 'product:added',
}
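One way to keep publishers and subscribers in agreement about what each channel carries is a type-level map from channel to payload. The sketch below is an assumption layered on top of the enum, not part of the original project: ProductAddedPayload, PubSubPayloads, encodeMessage and decodeMessage are hypothetical names, and the payload shape is reduced to two fields for brevity.

```typescript
// Hypothetical sketch: every name except PubSubChannel is illustrative only.
enum PubSubChannel {
  productAdded = 'product:added',
}

// Assumed, simplified payload; the real Product type comes from the generated GraphQL models.
interface ProductAddedPayload {
  id: string;
  title: string;
}

// Maps every channel to the payload type it carries.
interface PubSubPayloads {
  [PubSubChannel.productAdded]: ProductAddedPayload;
}

// Serializes a payload; the compiler rejects payloads that do not match the channel.
function encodeMessage<C extends PubSubChannel>(
  _channel: C,
  payload: PubSubPayloads[C],
): string {
  return JSON.stringify(payload);
}

// Parses a raw message back into the payload type declared for the channel.
// Note that this is a cast, not a runtime validation.
function decodeMessage<C extends PubSubChannel>(
  _channel: C,
  raw: string,
): PubSubPayloads[C] {
  return JSON.parse(raw) as PubSubPayloads[C];
}

const raw: string = encodeMessage(PubSubChannel.productAdded, {
  id: '1',
  title: 'Keyboard',
});
const decoded: ProductAddedPayload = decodeMessage(
  PubSubChannel.productAdded,
  raw,
);
```

With a map like this, adding a new channel to the enum without declaring its payload shows up as a compile error at the first call site that uses it.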

Service Identifiers

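We need identifiers for two Redis clients: one for publishing and one for subscribing. Once a connection subscribes to a channel it enters subscriber mode and can no longer issue regular commands such as PUBLISH, so the two roles cannot share a connection.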

Create src/foundation/redis/models/ioredisPublisherServiceIdentifier.ts:

import { type ServiceIdentifier } from 'inversify';
import { type Redis } from 'ioredis';

export const ioredisPublisherServiceIdentifier: ServiceIdentifier<Redis> =
  Symbol.for(
    '@inversifyjs/example-catalog-graphql-api/ioredisPublisherService',
  );

Create src/foundation/redis/models/ioredisSubscriberServiceIdentifier.ts:

import { type ServiceIdentifier } from 'inversify';
import { type Redis } from 'ioredis';

export const ioredisSubscriberServiceIdentifier: ServiceIdentifier<Redis> =
  Symbol.for(
    '@inversifyjs/example-catalog-graphql-api/ioredisSubscriberService',
  );
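Both identifiers are created with Symbol.for, which looks the key up in the runtime-wide symbol registry, so the same key always resolves to the same symbol. The short illustration below shows the difference from a plain Symbol() call; the key string is made up for the example.

```typescript
// Symbol.for returns the registry symbol for a key, creating it on first use.
const first: symbol = Symbol.for('example/publisher');
const second: symbol = Symbol.for('example/publisher');

// Symbol() always creates a fresh, unregistered symbol, even for the same description.
const local: symbol = Symbol('example/publisher');

// Two lookups of the same key give the identical symbol.
const sameRegistrySymbol: boolean = first === second;

// A locally created symbol never equals a registry symbol.
const sameAsLocal: boolean = first === local;

// Registry symbols can be mapped back to their key; local symbols cannot.
const registryKey: string | undefined = Symbol.keyFor(first);
const localKey: string | undefined = Symbol.keyFor(local);
```

This is why a long, namespaced key is a sensible choice here: any other code that calls Symbol.for with the same string gets the same identifier.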

PubSub Service

Now we implement the PubSubService, which publishes JSON-serialized messages to a Redis channel and exposes each subscription as an async generator that yields the parsed messages.

Create src/foundation/redis/services/PubSubService.ts:

import { inject, injectable } from 'inversify';
import { type Redis } from 'ioredis';

import { ioredisPublisherServiceIdentifier } from '../models/ioredisPublisherServiceIdentifier.js';
import { ioredisSubscriberServiceIdentifier } from '../models/ioredisSubscriberServiceIdentifier.js';

@injectable()
export class PubSubService {
  readonly #publisher: Redis;
  readonly #subscriber: Redis;

  constructor(
    @inject(ioredisPublisherServiceIdentifier)
    publisher: Redis,
    @inject(ioredisSubscriberServiceIdentifier)
    subscriber: Redis,
  ) {
    this.#publisher = publisher;
    this.#subscriber = subscriber;
  }

  public async publish(channel: string, message: unknown): Promise<void> {
    await this.#publisher.publish(channel, JSON.stringify(message));
  }

  public async *subscribe<T>(channel: string): AsyncGenerator<T> {
    await this.#subscriber.subscribe(channel);

    try {
      for await (const [
        receivedChannel,
        message,
      ] of this.#createMessageIterator()) {
        if (receivedChannel === channel) {
          yield JSON.parse(message) as T;
        }
      }
    } finally {
      await this.#subscriber.unsubscribe(channel);
    }
  }

  async *#createMessageIterator(): AsyncGenerator<[string, string]> {
    const messageQueue: Array<[string, string]> = [];
    let resolveNext:
      | ((value: IteratorResult<[string, string]>) => void)
      | null = null;

    const messageHandler: (channel: string, message: string) => void = (
      channel: string,
      message: string,
    ): void => {
      const item: [string, string] = [channel, message];
      if (resolveNext !== null) {
        resolveNext({ done: false, value: item });
        resolveNext = null;
      } else {
        messageQueue.push(item);
      }
    };

    this.#subscriber.on('message', messageHandler);

    try {
      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
      while (true) {
        if (messageQueue.length > 0) {
          const item: [string, string] | undefined = messageQueue.shift();
          if (item !== undefined) {
            yield item;
          }
        } else {
          await new Promise<IteratorResult<[string, string]>>(
            (resolve: (value: IteratorResult<[string, string]>) => void) => {
              resolveNext = resolve;
            },
          ).then((result: IteratorResult<[string, string]>): void => {
            if (result.done === false) {
              messageQueue.push(result.value);
            }
          });
        }
      }
    } finally {
      this.#subscriber.off('message', messageHandler);
    }
  }
}
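The private message iterator bridges a callback-based event into an async generator: messages that arrive while the consumer is busy are queued, and a consumer that finds the queue empty parks on a promise until the next message wakes it. The standalone sketch below shows a simplified variant of that pattern, not the exact code above. TinyEmitter is a hypothetical stand-in for the Redis client's message event, so the example runs without a Redis server.

```typescript
// Hypothetical stand-in for the 'message' event of the Redis subscriber client.
type Handler = (channel: string, message: string) => void;

class TinyEmitter {
  private handlers: Handler[] = [];

  public on(handler: Handler): void {
    this.handlers.push(handler);
  }

  public off(handler: Handler): void {
    this.handlers = this.handlers.filter((h: Handler): boolean => h !== handler);
  }

  public emit(channel: string, message: string): void {
    this.handlers.forEach((h: Handler): void => h(channel, message));
  }

  public get listenerCount(): number {
    return this.handlers.length;
  }
}

async function* messages(
  emitter: TinyEmitter,
): AsyncGenerator<[string, string]> {
  const queue: Array<[string, string]> = [];
  let wake: (() => void) | undefined;

  const handler: Handler = (channel: string, message: string): void => {
    queue.push([channel, message]);
    // Wake the consumer if it is parked waiting for a message.
    wake?.();
    wake = undefined;
  };

  emitter.on(handler);

  try {
    while (true) {
      const item: [string, string] | undefined = queue.shift();
      if (item !== undefined) {
        yield item;
      } else {
        // Park until the handler signals that the queue is no longer empty.
        await new Promise<void>((resolve): void => {
          wake = (): void => resolve();
        });
      }
    }
  } finally {
    // Runs when the consumer leaves its for await loop or calls return().
    emitter.off(handler);
  }
}

async function demo(): Promise<{ received: string[]; listenersLeft: number }> {
  const emitter: TinyEmitter = new TinyEmitter();
  const received: string[] = [];

  // Emit once the consumer below has started listening.
  setTimeout((): void => {
    emitter.emit('product:added', 'first');
    emitter.emit('product:added', 'second');
  }, 0);

  for await (const [, message] of messages(emitter)) {
    received.push(message);
    if (received.length === 2) {
      break;
    }
  }

  return { listenersLeft: emitter.listenerCount, received };
}
```

Breaking out of the loop triggers the finally block, which is the same mechanism that removes the message handler and unsubscribes from the channel in PubSubService when a GraphQL client disconnects.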

Redis Container Module

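We need to bind the Redis clients and the PubSub service in a container module. The module reads the connection settings from the REDIS_HOST and REDIS_PORT environment variables (loaded by dotenv), so make sure both are defined, for example in your .env file.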

Create src/foundation/redis/modules/IoredisContainerModule.ts:

import 'dotenv/config';

import { ContainerModule, type ContainerModuleLoadOptions } from 'inversify';
import { Redis } from 'ioredis';

import { ioredisPublisherServiceIdentifier } from '../models/ioredisPublisherServiceIdentifier.js';
import { ioredisSubscriberServiceIdentifier } from '../models/ioredisSubscriberServiceIdentifier.js';
import { PubSubService } from '../services/PubSubService.js';

export class IoredisContainerModule extends ContainerModule {
  constructor() {
    super((options: ContainerModuleLoadOptions): void => {
      const redisConfig: { host: string; port: number } = {
        host: process.env['REDIS_HOST'] as string,
        port: parseInt(process.env['REDIS_PORT'] as string, 10),
      };

      options
        .bind(ioredisPublisherServiceIdentifier)
        .toConstantValue(new Redis(redisConfig));

      options
        .bind(ioredisSubscriberServiceIdentifier)
        .toConstantValue(new Redis(redisConfig));

      options.bind(PubSubService).toSelf().inSingletonScope();
    });
  }
}
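The `as string` casts in the module silence the compiler but do not check anything at runtime: a missing REDIS_PORT makes parseInt return NaN without raising an error at that point. A minimal sketch of a stricter alternative follows. readRedisConfig and RedisConfig are hypothetical names that do not appear in the original project.

```typescript
// Hypothetical helper: validates the Redis settings instead of casting them.
interface RedisConfig {
  host: string;
  port: number;
}

function readRedisConfig(
  env: Record<string, string | undefined>,
): RedisConfig {
  const host: string | undefined = env['REDIS_HOST'];
  const rawPort: string | undefined = env['REDIS_PORT'];

  if (host === undefined || host === '') {
    throw new Error('REDIS_HOST is not set');
  }

  // parseInt yields NaN for a missing or non-numeric value.
  const port: number = Number.parseInt(rawPort ?? '', 10);

  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`REDIS_PORT is not a valid port: ${String(rawPort)}`);
  }

  return { host, port };
}

// Example values only; the real ones depend on where Redis is reachable.
const config: RedisConfig = readRedisConfig({
  REDIS_HOST: 'localhost',
  REDIS_PORT: '6379',
});
```

Inside the container module, the same helper could presumably be called with process.env, so a misconfigured environment fails at startup with a readable message.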

Step 4: Implement Subscription Resolvers

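Now we can implement the SubscriptionResolvers to handle the productAdded subscription. Each value the generator yields is wrapped in an object keyed by the field name (productAdded), which is where GraphQL's default resolver looks for the field value.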

Create src/app/resolvers/SubscriptionResolvers.ts:

import { inject, injectable } from 'inversify';

import { PubSubChannel } from '../../foundation/redis/models/PubSubChannels.js';
import { PubSubService } from '../../foundation/redis/services/PubSubService.js';
import { type Context } from '../../graphql/models/Context.js';
import type * as graphqlModels from '../../graphql/models/types.js';

@injectable()
export class SubscriptionResolvers implements graphqlModels.SubscriptionResolvers<Context> {
  public readonly productAdded: graphqlModels.SubscriptionSubscriberObject<
    graphqlModels.ResolverTypeWrapper<graphqlModels.Product>,
    'productAdded',
    unknown,
    Context,
    unknown
  >;

  readonly #pubSubService: PubSubService;

  constructor(
    @inject(PubSubService)
    pubSubService: PubSubService,
  ) {
    this.#pubSubService = pubSubService;

    const pubSubServiceRef: PubSubService = this.#pubSubService;

    this.productAdded = {
      subscribe: async function* (): AsyncGenerator<{
        productAdded: graphqlModels.ResolverTypeWrapper<graphqlModels.Product>;
      }> {
        for await (const product of pubSubServiceRef.subscribe<graphqlModels.Product>(
          PubSubChannel.productAdded,
        )) {
          yield { productAdded: product };
        }
      },
    };
  }
}
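One caveat is worth keeping in mind: every GraphQL client that subscribes to productAdded drives its own call to PubSubService.subscribe, but all of them share a single Redis subscriber connection. A Redis subscription belongs to the connection, so the unsubscribe call in the finally block of the first client to disconnect also stops delivery to the clients still listening on that channel. One possible remedy is to count active listeners per channel and unsubscribe only when the last one leaves. The sketch below illustrates that idea. ChannelRefCounter and SubscriberLike are hypothetical names, and a fake subscriber replaces the real client so the example runs on its own.

```typescript
// Hypothetical sketch; SubscriberLike mirrors only the two client methods used here.
interface SubscriberLike {
  subscribe(channel: string): Promise<unknown>;
  unsubscribe(channel: string): Promise<unknown>;
}

class ChannelRefCounter {
  private readonly counts: Map<string, number> = new Map();
  private readonly subscriber: SubscriberLike;

  constructor(subscriber: SubscriberLike) {
    this.subscriber = subscriber;
  }

  // Subscribes on the connection only for the first listener of a channel.
  public async acquire(channel: string): Promise<void> {
    const current: number = this.counts.get(channel) ?? 0;
    this.counts.set(channel, current + 1);
    if (current === 0) {
      await this.subscriber.subscribe(channel);
    }
  }

  // Unsubscribes from the connection only when the last listener leaves.
  public async release(channel: string): Promise<void> {
    const current: number = this.counts.get(channel) ?? 0;
    if (current > 1) {
      this.counts.set(channel, current - 1);
      return;
    }
    this.counts.delete(channel);
    if (current === 1) {
      await this.subscriber.unsubscribe(channel);
    }
  }
}

// Fake subscriber that records the commands it would send to Redis.
const calls: string[] = [];
const fakeSubscriber: SubscriberLike = {
  subscribe: async (channel: string): Promise<void> => {
    calls.push(`subscribe ${channel}`);
  },
  unsubscribe: async (channel: string): Promise<void> => {
    calls.push(`unsubscribe ${channel}`);
  },
};

async function demo(): Promise<string[]> {
  const counter: ChannelRefCounter = new ChannelRefCounter(fakeSubscriber);
  await counter.acquire('product:added'); // first client connects
  await counter.acquire('product:added'); // second client connects
  await counter.release('product:added'); // first client leaves, channel stays open
  await counter.release('product:added'); // last client leaves, channel is closed
  return calls;
}
```

In PubSubService.subscribe, acquire would take the place of the direct subscribe call and release would replace the unsubscribe call in the finally block.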

Step 5: Update Mutation Resolvers

We need to update the createProduct mutation to publish a message when a product is created.

Update src/app/resolvers/MutationResolvers.ts:

  import { inject, injectable } from 'inversify';
  import {
    type Category as CategoryDb,
    type Product as ProductDb,
  } from '../../../generated/index.js';
  import { CategoryRepository } from '../../category/repositories/CategoryRepository.js';
+ import { PubSubChannel } from '../../foundation/redis/models/PubSubChannels.js';
+ import { PubSubService } from '../../foundation/redis/services/PubSubService.js';
  import { type Context } from '../../graphql/models/Context.js';
  import type * as graphqlModels from '../../graphql/models/types.js';
  import { ProductRepository } from '../../product/repositories/ProductRepository.js';

  @injectable()
  export class MutationResolvers implements graphqlModels.MutationResolvers<Context> {
    readonly #categoryRepository: CategoryRepository;
    readonly #productRepository: ProductRepository;
+   readonly #pubSubService: PubSubService;

    constructor(
      @inject(CategoryRepository)
      categoryRepository: CategoryRepository,
      @inject(ProductRepository)
      productRepository: ProductRepository,
+     @inject(PubSubService)
+     pubSubService: PubSubService,
    ) {
      this.#categoryRepository = categoryRepository;
      this.#productRepository = productRepository;
+     this.#pubSubService = pubSubService;
    }

    public async createCategory(
      _parent: unknown,
      args: graphqlModels.MutationCreateCategoryArgs,
    ): Promise<graphqlModels.Category> {
      const category: CategoryDb = await this.#categoryRepository.createOne(
        args.input.name,
        args.input.slug,
      );

      return {
        id: category.id,
        name: category.name,
        products: {
          edges: [],
          pageInfo: {
            endCursor: null,
            hasNextPage: false,
            hasPreviousPage: false,
            startCursor: null,
          },
          totalCount: 0,
        },
        slug: category.slug,
      };
    }

    public async createProduct(
      _parent: unknown,
      args: graphqlModels.MutationCreateProductArgs,
    ): Promise<graphqlModels.Product> {
      const product: ProductDb = await this.#productRepository.createOne(
        args.input.categoryId,
        args.input.title,
        args.input.description,
        args.input.currency,
        args.input.price,
      );

-     return {
+     const productPayload: graphqlModels.Product = {
        currency: product.currency,
        description: product.description,
        id: product.id,
        price: product.price,
        title: product.title,
      };
+
+     await this.#pubSubService.publish(
+       PubSubChannel.productAdded,
+       productPayload,
+     );
+
+     return productPayload;
    }
  }
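Because createProduct awaits the publish call after the product has been stored, a rejected publish makes the mutation fail even though the database write succeeded. Whether that is acceptable is a design decision. If the notification should be best-effort instead, the publish can be isolated as in the sketch below. publishSafely and Publisher are hypothetical names, and the failing publisher only simulates an outage.

```typescript
// Hypothetical helper; Publisher mirrors the publish method of PubSubService.
interface Publisher {
  publish(channel: string, message: unknown): Promise<void>;
}

// Publishes a message and reports failures instead of propagating them.
async function publishSafely(
  publisher: Publisher,
  channel: string,
  message: unknown,
  onError: (error: unknown) => void,
): Promise<boolean> {
  try {
    await publisher.publish(channel, message);
    return true;
  } catch (error: unknown) {
    // The database write already succeeded, so report the failure and move on.
    onError(error);
    return false;
  }
}

// Simulated outage: every publish attempt rejects.
const failingPublisher: Publisher = {
  publish: async (): Promise<void> => {
    throw new Error('Redis unavailable');
  },
};

const reported: unknown[] = [];

async function demo(): Promise<boolean> {
  return publishSafely(
    failingPublisher,
    'product:added',
    { id: '1' },
    (error: unknown): void => {
      reported.push(error);
    },
  );
}
```

The trade-off is that subscribers may miss an event while Redis is unreachable, so the error callback should at least log the failure.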

Step 6: Update App Container Module

Bind the SubscriptionResolvers in the AppContainerModule.

Update src/app/modules/AppContainerModule.ts:

  import { ContainerModule, type ContainerModuleLoadOptions } from 'inversify';
  import { AppResolvers } from '../resolvers/AppResolvers.js';
  import { MutationResolvers } from '../resolvers/MutationResolvers.js';
  import { QueryResolvers } from '../resolvers/QueryResolvers.js';
+ import { SubscriptionResolvers } from '../resolvers/SubscriptionResolvers.js';

  export class AppContainerModule extends ContainerModule {
    constructor() {
      super((options: ContainerModuleLoadOptions): void => {
        options.bind(AppResolvers).toSelf().inSingletonScope();
        options.bind(MutationResolvers).toSelf().inSingletonScope();
        options.bind(QueryResolvers).toSelf().inSingletonScope();
+       options.bind(SubscriptionResolvers).toSelf().inSingletonScope();
      });
    }
  }

Step 7: Update App Resolvers

Add the Subscription resolver to AppResolvers.

Update src/app/resolvers/AppResolvers.ts:

  import { inject, injectable } from 'inversify';
  import { CategoryResolvers } from '../../category/resolvers/CategoryResolvers.js';
  import { type Context } from '../../graphql/models/Context.js';
  import type * as graphqlModels from '../../graphql/models/types.js';
  import { MutationResolvers } from './MutationResolvers.js';
  import { QueryResolvers } from './QueryResolvers.js';
+ import { SubscriptionResolvers } from './SubscriptionResolvers.js';

  @injectable()
  export class AppResolvers implements Partial<graphqlModels.Resolvers<Context>> {
    public readonly Category: graphqlModels.CategoryResolvers<Context>;
    public readonly Mutation: graphqlModels.MutationResolvers<Context>;
    public readonly Query: graphqlModels.QueryResolvers<Context>;
+   public readonly Subscription: graphqlModels.SubscriptionResolvers<Context>;

    constructor(
      @inject(CategoryResolvers)
      categoryResolvers: CategoryResolvers,
      @inject(MutationResolvers)
      mutationResolvers: MutationResolvers,
      @inject(QueryResolvers)
      queryResolvers: QueryResolvers,
+     @inject(SubscriptionResolvers)
+     subscriptionResolvers: SubscriptionResolvers,
    ) {
      this.Category = categoryResolvers;
      this.Mutation = mutationResolvers;
      this.Query = queryResolvers;
+     this.Subscription = subscriptionResolvers;
    }
  }

Step 8: Update Server Bootstrap

Finally, we need to load the IoredisContainerModule and the ApolloSubscriptionServerContainerModule in our bootstrap script.

Update src/app/scripts/bootstrap.ts:

  import type http from 'node:http';
  import { type ExpressContextFunctionArgument } from '@as-integrations/express5';
  import {
    type InversifyApolloProvider,
    inversifyApolloProviderServiceIdentifier,
  } from '@inversifyjs/apollo-core';
  import { ApolloExpressServerContainerModule } from '@inversifyjs/apollo-express';
+ import { ApolloSubscriptionServerContainerModule } from '@inversifyjs/apollo-subscription-ws';
  import { readSchemas } from '@inversifyjs/graphql-codegen';
  import { InversifyExpressHttpAdapter } from '@inversifyjs/http-express';
  import { Container } from 'inversify';

  import { CategoryContainerModule } from '../../category/modules/CategoryContainerModule.js';
  import { PrismaModule } from '../../foundation/db/modules/PrismaModule.js';
+ import { IoredisContainerModule } from '../../foundation/redis/modules/IoredisContainerModule.js';
  import { type Context } from '../../graphql/models/Context.js';
  import { ProductContainerModule } from '../../product/modules/ProductContainerModule.js';
  import { AppContainerModule } from '../modules/AppContainerModule.js';
  import { AppResolvers } from '../resolvers/AppResolvers.js';

  const PORT: number = 3000;

  const container: Container = new Container();

  await container.load(
    new AppContainerModule(),
    ApolloExpressServerContainerModule.graphServerFromOptions<Context>(
      {
        controllerOptions: {
          path: '',
        },
        getContext: async (
          arg: ExpressContextFunctionArgument,
        ): Promise<Context> => ({
          request: arg.req,
        }),
      },
      {
        resolverServiceIdentifier: AppResolvers,
        typeDefs: await readSchemas({
          glob: {
            patterns: ['./graphql/schemas/**/*.graphql'],
          },
        }),
      },
    ),
+   new ApolloSubscriptionServerContainerModule({
+     path: '/subscriptions',
+   }),
    new CategoryContainerModule(),
+   new IoredisContainerModule(),
    new PrismaModule(),
    new ProductContainerModule(),
  );

  const adapter: InversifyExpressHttpAdapter = new InversifyExpressHttpAdapter(
    container,
  );

Conclusion

We have successfully added real-time capabilities to our API! We can now subscribe to productAdded events and receive updates whenever a new product is created.
