In Part 3, we implemented the core resolvers for Queries and Mutations. In Part 4, we will add real-time capabilities to our API using GraphQL Subscriptions and Redis.
Step 1: Update Docker Compose
We need a Redis instance to handle our PubSub messaging.
Update docker-compose.yml to add the redis service:
- db:/var/lib/postgresql/data
- ./posgresql/init_shadow_db.sql:/docker-entrypoint-initdb.d/init_shadow_db.sql
# Redis service backing the PubSub messaging; maps port 6379 to the host
# and joins the academy_manager network.
+ redis:
+ image: redis:8.4.0-alpine
+ ports:
+ - '6379:6379'
+ networks:
+ - academy_manager
+
networks:
academy_manager:
external: true
Step 2: Update GraphQL Schema
We need to define the Subscription type in our schema.
Update graphql/schemas/schema.graphql:
# Write operations: each mutation takes a single required input object and
# returns the created entity.
type Mutation {
createCategory(input: CreateCategoryInput!): Category!
createProduct(input: CreateProductInput!): Product!
}
+
# Real-time operations: productAdded emits a non-null Product per event.
+ type Subscription {
+ productAdded: Product!
+ }
Step 3: Redis Infrastructure
We need to set up the infrastructure to communicate with Redis. We'll use ioredis for this.
First, install the necessary dependencies:
pnpm add @inversifyjs/apollo-subscription-ws ioredis
PubSub Channels
Create src/foundation/redis/models/PubSubChannels.ts:
/**
 * Names of the Redis channels used for PubSub messaging.
 *
 * Each member's value is the literal channel name sent to Redis.
 */
export enum PubSubChannel {
  /** Channel for product-added messages. */
  productAdded = 'product:added',
}
Service Identifiers
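We need identifiers for our Redis publisher and subscriber clients. We use two separate clients because a Redis connection that has entered subscriber mode can no longer issue regular commands such as PUBLISH.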
Create src/foundation/redis/models/ioredisPublisherServiceIdentifier.ts:
import { type ServiceIdentifier } from 'inversify';
import { type Redis } from 'ioredis';
/** Key under which the publisher identifier is registered in the global symbol registry. */
const ioredisPublisherServiceKey: string =
  '@inversifyjs/example-catalog-graphql-api/ioredisPublisherService';

/**
 * Inversify service identifier for the ioredis client used as publisher.
 *
 * Created with `Symbol.for`, so every lookup of the same key yields the same
 * symbol.
 */
export const ioredisPublisherServiceIdentifier: ServiceIdentifier<Redis> =
  Symbol.for(ioredisPublisherServiceKey);
Create src/foundation/redis/models/ioredisSubscriberServiceIdentifier.ts:
import { type ServiceIdentifier } from 'inversify';
import { type Redis } from 'ioredis';
/** Key under which the subscriber identifier is registered in the global symbol registry. */
const ioredisSubscriberServiceKey: string =
  '@inversifyjs/example-catalog-graphql-api/ioredisSubscriberService';

/**
 * Inversify service identifier for the ioredis client used as subscriber.
 *
 * Created with `Symbol.for`, so every lookup of the same key yields the same
 * symbol.
 */
export const ioredisSubscriberServiceIdentifier: ServiceIdentifier<Redis> =
  Symbol.for(ioredisSubscriberServiceKey);
PubSub Service
Now we implement the PubSubService, which handles publishing and subscribing to Redis channels.
Create src/foundation/redis/services/PubSubService.ts:
import { inject, injectable } from 'inversify';
import { type Redis } from 'ioredis';
import { ioredisPublisherServiceIdentifier } from '../models/ioredisPublisherServiceIdentifier.js';
import { ioredisSubscriberServiceIdentifier } from '../models/ioredisSubscriberServiceIdentifier.js';
/**
 * Publishes JSON messages to Redis channels and exposes channel subscriptions
 * as async generators.
 *
 * Two Redis clients are injected: one is used only for `publish`, the other
 * only for `subscribe` / `unsubscribe` and for receiving `message` events.
 *
 * NOTE(review): `subscribe` is an async generator, so a consumer's `return()`
 * issued while the generator is waiting for a message is only processed once
 * the generator next reaches a `yield` (i.e. when a message for that channel
 * arrives). Cleanup of idle subscriptions is therefore deferred, not immediate.
 */
@injectable()
export class PubSubService {
  /** Number of currently active `subscribe` consumers, keyed by channel. */
  readonly #channelConsumers: Map<string, number> = new Map<string, number>();
  readonly #publisher: Redis;
  readonly #subscriber: Redis;

  constructor(
    @inject(ioredisPublisherServiceIdentifier)
    publisher: Redis,
    @inject(ioredisSubscriberServiceIdentifier)
    subscriber: Redis,
  ) {
    this.#publisher = publisher;
    this.#subscriber = subscriber;
  }

  /**
   * Serializes `message` with `JSON.stringify` and publishes it on `channel`.
   *
   * @param channel - Redis channel name.
   * @param message - Any JSON-serializable value.
   */
  public async publish(channel: string, message: unknown): Promise<void> {
    await this.#publisher.publish(channel, JSON.stringify(message));
  }

  /**
   * Subscribes to `channel` and yields every message received on it, parsed
   * with `JSON.parse`.
   *
   * The subscriber connection is shared by all consumers, so the channel is
   * reference counted: Redis is only told to unsubscribe when the last
   * consumer of that channel finishes. Unsubscribing unconditionally would
   * silently cut off every other consumer of the same channel.
   *
   * @param channel - Redis channel name.
   * @returns An async generator of parsed messages; the payload is cast to
   * `T` without validation.
   * @throws Whatever `JSON.parse` throws for a payload that is not valid JSON;
   * the generator then terminates and releases its subscription.
   */
  public async *subscribe<T>(channel: string): AsyncGenerator<T> {
    // Register this consumer before sending SUBSCRIBE so that a consumer
    // finishing concurrently sees a non-zero count and skips UNSUBSCRIBE.
    this.#retainChannel(channel);

    try {
      await this.#subscriber.subscribe(channel);
    } catch (error: unknown) {
      this.#releaseChannel(channel);
      throw error;
    }

    try {
      for await (const [
        receivedChannel,
        message,
      ] of this.#createMessageIterator()) {
        // The `message` event fires for every subscribed channel.
        if (receivedChannel === channel) {
          yield JSON.parse(message) as T;
        }
      }
    } finally {
      if (this.#releaseChannel(channel) === 0) {
        await this.#subscriber.unsubscribe(channel);
      }
    }
  }

  /** Records one more active consumer for `channel`. */
  #retainChannel(channel: string): void {
    this.#channelConsumers.set(
      channel,
      (this.#channelConsumers.get(channel) ?? 0) + 1,
    );
  }

  /**
   * Records that one consumer of `channel` has finished.
   *
   * @returns The number of consumers still active on `channel`.
   */
  #releaseChannel(channel: string): number {
    const remaining: number = (this.#channelConsumers.get(channel) ?? 1) - 1;

    if (remaining <= 0) {
      this.#channelConsumers.delete(channel);
      return 0;
    }

    this.#channelConsumers.set(channel, remaining);
    return remaining;
  }

  /**
   * Turns the subscriber's `message` events into an async stream of
   * `[channel, message]` tuples, in arrival order.
   *
   * Messages that arrive while the consumer is busy are buffered; when the
   * buffer is empty the generator waits for the next event. The listener is
   * removed when the generator finishes.
   */
  async *#createMessageIterator(): AsyncGenerator<[string, string]> {
    const messageQueue: Array<[string, string]> = [];
    let resolveNext: ((item: [string, string]) => void) | null = null;

    const messageHandler: (channel: string, message: string) => void = (
      channel: string,
      message: string,
    ): void => {
      const item: [string, string] = [channel, message];

      if (resolveNext !== null) {
        const resolve: (item: [string, string]) => void = resolveNext;
        resolveNext = null;
        resolve(item);
      } else {
        messageQueue.push(item);
      }
    };

    this.#subscriber.on('message', messageHandler);

    try {
      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
      while (true) {
        const queued: [string, string] | undefined = messageQueue.shift();

        if (queued !== undefined) {
          yield queued;
        } else {
          // Yield the awaited item directly instead of re-queuing it: events
          // emitted synchronously right after it are already in the queue, so
          // pushing it back would deliver them out of order.
          const next: [string, string] = await new Promise<[string, string]>(
            (resolve: (item: [string, string]) => void): void => {
              resolveNext = resolve;
            },
          );

          yield next;
        }
      }
    } finally {
      this.#subscriber.off('message', messageHandler);
    }
  }
}
Redis Container Module
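We need to bind the Redis clients and the PubSub service in a container module. The module reads its connection settings from the REDIS_HOST and REDIS_PORT environment variables, so make sure both are defined in your .env file (for example, REDIS_HOST=localhost and REDIS_PORT=6379 when the API runs on the host machine against the Docker Compose service above).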
Create src/foundation/redis/modules/IoredisContainerModule.ts:
import 'dotenv/config';
import { ContainerModule, type ContainerModuleLoadOptions } from 'inversify';
import { Redis } from 'ioredis';
import { ioredisPublisherServiceIdentifier } from '../models/ioredisPublisherServiceIdentifier.js';
import { ioredisSubscriberServiceIdentifier } from '../models/ioredisSubscriberServiceIdentifier.js';
import { PubSubService } from '../services/PubSubService.js';
/**
 * Container module that binds the two Redis clients (publisher and
 * subscriber) and the `PubSubService`.
 *
 * Connection settings come from the `REDIS_HOST` and `REDIS_PORT` environment
 * variables. An unset variable falls back to `localhost` / `6379`; a
 * `REDIS_PORT` that is set but is not a valid TCP port makes loading the
 * module throw instead of handing `NaN` to the Redis client.
 */
export class IoredisContainerModule extends ContainerModule {
  constructor() {
    super((options: ContainerModuleLoadOptions): void => {
      const defaultPort: number = 6379;
      const maxPort: number = 65535;

      const rawPort: string | undefined = process.env['REDIS_PORT'];
      const port: number =
        rawPort === undefined ? defaultPort : parseInt(rawPort, 10);

      if (!Number.isInteger(port) || port < 1 || port > maxPort) {
        throw new Error(
          `Invalid REDIS_PORT environment variable: "${String(rawPort)}"`,
        );
      }

      const redisConfig: { host: string; port: number } = {
        host: process.env['REDIS_HOST'] ?? 'localhost',
        port,
      };

      // Publisher and subscriber each get their own connection.
      options
        .bind(ioredisPublisherServiceIdentifier)
        .toConstantValue(new Redis(redisConfig));
      options
        .bind(ioredisSubscriberServiceIdentifier)
        .toConstantValue(new Redis(redisConfig));

      options.bind(PubSubService).toSelf().inSingletonScope();
    });
  }
}
Step 4: Implement Subscription Resolvers
Now we can implement the SubscriptionResolvers to handle the productAdded subscription.
Create src/app/resolvers/SubscriptionResolvers.ts:
import { inject, injectable } from 'inversify';
import { PubSubChannel } from '../../foundation/redis/models/PubSubChannels.js';
import { PubSubService } from '../../foundation/redis/services/PubSubService.js';
import { Context } from '../../graphql/models/Context.js';
import * as graphqlModels from '../../graphql/models/types.js';
/**
 * Resolver object for the GraphQL `Subscription` type.
 */
@injectable()
export class SubscriptionResolvers implements graphqlModels.SubscriptionResolvers<Context> {
  /** Subscriber object for the `productAdded` subscription field. */
  public readonly productAdded: graphqlModels.SubscriptionSubscriberObject<
    graphqlModels.ResolverTypeWrapper<graphqlModels.Product>,
    'productAdded',
    unknown,
    Context,
    unknown
  >;

  readonly #pubSubService: PubSubService;

  constructor(
    @inject(PubSubService)
    pubSubService: PubSubService,
  ) {
    this.#pubSubService = pubSubService;

    this.productAdded = {
      // Arrow function keeps the constructor's `this`, so no alias is needed.
      subscribe: (): AsyncGenerator<{
        productAdded: graphqlModels.ResolverTypeWrapper<graphqlModels.Product>;
      }> => this.#streamProductAdded(),
    };
  }

  /**
   * Yields one `{ productAdded }` payload for every message received on the
   * `PubSubChannel.productAdded` channel.
   */
  async *#streamProductAdded(): AsyncGenerator<{
    productAdded: graphqlModels.ResolverTypeWrapper<graphqlModels.Product>;
  }> {
    const products: AsyncGenerator<graphqlModels.Product> =
      this.#pubSubService.subscribe<graphqlModels.Product>(
        PubSubChannel.productAdded,
      );

    for await (const addedProduct of products) {
      yield { productAdded: addedProduct };
    }
  }
}
Step 5: Update Mutation Resolvers
We need to update the createProduct mutation to publish a message when a product is created.
Update src/app/resolvers/MutationResolvers.ts:
import { inject, injectable } from 'inversify';
import {
type Category as CategoryDb,
type Product as ProductDb,
} from '../../../generated/index.js';
import { CategoryRepository } from '../../category/repositories/CategoryRepository.js';
+ import { PubSubChannel } from '../../foundation/redis/models/PubSubChannels.js';
+ import { PubSubService } from '../../foundation/redis/services/PubSubService.js';
import { type Context } from '../../graphql/models/Context.js';
import type * as graphqlModels from '../../graphql/models/types.js';
import { ProductRepository } from '../../product/repositories/ProductRepository.js';
/**
 * Resolvers for the GraphQL `Mutation` type: `createCategory` and
 * `createProduct`.
 */
@injectable()
export class MutationResolvers implements graphqlModels.MutationResolvers<Context> {
readonly #categoryRepository: CategoryRepository;
readonly #productRepository: ProductRepository;
+ readonly #pubSubService: PubSubService;
constructor(
@inject(CategoryRepository)
categoryRepository: CategoryRepository,
@inject(ProductRepository)
productRepository: ProductRepository,
+ @inject(PubSubService)
+ pubSubService: PubSubService,
) {
this.#categoryRepository = categoryRepository;
this.#productRepository = productRepository;
+ this.#pubSubService = pubSubService;
}
/**
 * Creates a category from `args.input.name` and `args.input.slug` through the
 * category repository and maps the stored row to the GraphQL `Category` shape.
 *
 * The returned `products` connection is always empty (no edges, zero total,
 * null cursors); it is built inline rather than queried.
 */
public async createCategory(
_parent: unknown,
args: graphqlModels.MutationCreateCategoryArgs,
): Promise<graphqlModels.Category> {
const category: CategoryDb = await this.#categoryRepository.createOne(
args.input.name,
args.input.slug,
);
return {
id: category.id,
name: category.name,
products: {
edges: [],
pageInfo: {
endCursor: null,
hasNextPage: false,
hasPreviousPage: false,
startCursor: null,
},
totalCount: 0,
},
slug: category.slug,
};
}
/**
 * Creates a product through the product repository, publishes the resulting
 * GraphQL payload on the `PubSubChannel.productAdded` channel and returns
 * that same payload.
 *
 * The publish call is awaited after the product has been created, so a
 * rejected publish rejects the mutation even though the product already
 * exists.
 */
public async createProduct(
_parent: unknown,
args: graphqlModels.MutationCreateProductArgs,
): Promise<graphqlModels.Product> {
const product: ProductDb = await this.#productRepository.createOne(
args.input.categoryId,
args.input.title,
args.input.description,
args.input.currency,
args.input.price,
);
// The payload is kept in a variable so the same object is both published and returned.
- return {
+ const productPayload: graphqlModels.Product = {
currency: product.currency,
description: product.description,
id: product.id,
price: product.price,
title: product.title,
};
+
+ await this.#pubSubService.publish(
+ PubSubChannel.productAdded,
+ productPayload,
+ );
+
+ return productPayload;
}
}
Step 6: Update App Container Module
Bind the SubscriptionResolvers in the AppContainerModule.
Update src/app/modules/AppContainerModule.ts:
import { ContainerModule, type ContainerModuleLoadOptions } from 'inversify';
import { AppResolvers } from '../resolvers/AppResolvers.js';
import { MutationResolvers } from '../resolvers/MutationResolvers.js';
import { QueryResolvers } from '../resolvers/QueryResolvers.js';
+ import { SubscriptionResolvers } from '../resolvers/SubscriptionResolvers.js';
/**
 * Container module that binds the application resolver classes, each to
 * itself in singleton scope.
 */
export class AppContainerModule extends ContainerModule {
constructor() {
super((options: ContainerModuleLoadOptions): void => {
options.bind(AppResolvers).toSelf().inSingletonScope();
options.bind(MutationResolvers).toSelf().inSingletonScope();
options.bind(QueryResolvers).toSelf().inSingletonScope();
+ options.bind(SubscriptionResolvers).toSelf().inSingletonScope();
});
}
}
Step 7: Update App Resolvers
Add the Subscription resolver to AppResolvers.
Update src/app/resolvers/AppResolvers.ts:
import { inject, injectable } from 'inversify';
import { CategoryResolvers } from '../../category/resolvers/CategoryResolvers.js';
import { type Context } from '../../graphql/models/Context.js';
import type * as graphqlModels from '../../graphql/models/types.js';
import { MutationResolvers } from './MutationResolvers.js';
import { QueryResolvers } from './QueryResolvers.js';
+ import { SubscriptionResolvers } from './SubscriptionResolvers.js';
/**
 * Root resolver object: exposes the injected per-type resolver instances
 * under the `Category`, `Mutation`, `Query` and `Subscription` keys.
 */
@injectable()
export class AppResolvers implements Partial<graphqlModels.Resolvers<Context>> {
public readonly Category: graphqlModels.CategoryResolvers<Context>;
public readonly Mutation: graphqlModels.MutationResolvers<Context>;
public readonly Query: graphqlModels.QueryResolvers<Context>;
+ public readonly Subscription: graphqlModels.SubscriptionResolvers<Context>;
constructor(
@inject(CategoryResolvers)
categoryResolvers: CategoryResolvers,
@inject(MutationResolvers)
mutationResolvers: MutationResolvers,
@inject(QueryResolvers)
queryResolvers: QueryResolvers,
+ @inject(SubscriptionResolvers)
+ subscriptionResolvers: SubscriptionResolvers,
) {
// Each injected resolver instance is exposed unchanged under its type name.
this.Category = categoryResolvers;
this.Mutation = mutationResolvers;
this.Query = queryResolvers;
+ this.Subscription = subscriptionResolvers;
}
}
Step 8: Update Server Bootstrap
Finally, we need to load the IoredisContainerModule and the ApolloSubscriptionServerContainerModule in our bootstrap script.
Update src/app/scripts/bootstrap.ts:
import type http from 'node:http';
import { type ExpressContextFunctionArgument } from '@as-integrations/express5';
import {
type InversifyApolloProvider,
inversifyApolloProviderServiceIdentifier,
} from '@inversifyjs/apollo-core';
import { ApolloExpressServerContainerModule } from '@inversifyjs/apollo-express';
+ import { ApolloSubscriptionServerContainerModule } from '@inversifyjs/apollo-subscription-ws';
import { readSchemas } from '@inversifyjs/graphql-codegen';
import { InversifyExpressHttpAdapter } from '@inversifyjs/http-express';
import { Container } from 'inversify';
import { CategoryContainerModule } from '../../category/modules/CategoryContainerModule.js';
import { PrismaModule } from '../../foundation/db/modules/PrismaModule.js';
+ import { IoredisContainerModule } from '../../foundation/redis/modules/IoredisContainerModule.js';
import { type Context } from '../../graphql/models/Context.js';
import { ProductContainerModule } from '../../product/modules/ProductContainerModule.js';
import { AppContainerModule } from '../modules/AppContainerModule.js';
import { AppResolvers } from '../resolvers/AppResolvers.js';
// NOTE(review): PORT is not referenced in the lines shown here; presumably
// the server is started on it further down the script.
const PORT: number = 3000;
const container: Container = new Container();
// Load every container module in a single call.
await container.load(
new AppContainerModule(),
// Apollo server module: the context carries the incoming Express request,
// resolvers are looked up through AppResolvers, and type definitions are
// read from the .graphql files matching the glob below.
ApolloExpressServerContainerModule.graphServerFromOptions<Context>(
{
controllerOptions: {
path: '',
},
getContext: async (
arg: ExpressContextFunctionArgument,
): Promise<Context> => ({
request: arg.req,
}),
},
{
resolverServiceIdentifier: AppResolvers,
typeDefs: await readSchemas({
glob: {
patterns: ['./graphql/schemas/**/*.graphql'],
},
}),
},
),
// Subscription server module, configured with the '/subscriptions' path.
+ new ApolloSubscriptionServerContainerModule({
+ path: '/subscriptions',
+ }),
new CategoryContainerModule(),
+ new IoredisContainerModule(),
new PrismaModule(),
new ProductContainerModule(),
);
// HTTP adapter built on top of the populated container.
const adapter: InversifyExpressHttpAdapter = new InversifyExpressHttpAdapter(
container,
);
Conclusion
We have successfully added real-time capabilities to our API! We can now subscribe to productAdded events and receive updates whenever a new product is created.
Top comments (0)