DEV Community

Artistrator
Artistrator

Posted on

NestJS 마이크로서비스에서 SSE 스트리밍 추상화하기 — Redis Pub/Sub 기반 3-Tier Base Class 설계

NestJS 마이크로서비스에서 SSE 스트리밍 추상화하기 — Redis Pub/Sub 기반 3-Tier Base Class 설계

배경 / 문제 상황

MSA 환경에서 실시간 이벤트를 클라이언트에 전달해야 했다. 우리 서비스는 ECS 위에 여러 인스턴스가 뜨고, 이벤트를 발행하는 마이크로서비스와 SSE를 내려보내는 Gateway가 물리적으로 다른 프로세스다.

문제는 이렇다:

  • 인스턴스 A의 BullMQ Consumer가 작업을 완료하고 이벤트를 발생시킨다
  • 인스턴스 B의 Gateway에 클라이언트가 SSE로 연결되어 있다
  • A에서 발생한 이벤트가 B의 클라이언트에게 도달해야 한다

단순히 EventEmitter로는 불가능하다. 프로세스 경계를 넘어야 하니까.

게다가 SSE 도메인이 점점 늘어나면서 (실시간 편집 세션, 인증 세션, 채팅 등) 매번 Redis 구독, 하트비트, 연결 정리 코드를 반복 작성하고 있었다. 한 곳에서 버그를 고치면 다른 곳에는 여전히 남아있는 상황이 반복됐다.

접근 방법

SSE 연결의 생명주기를 분석하면 도메인과 무관하게 항상 같은 흐름이 반복된다:

연결 → 세션 시작 → Redis 구독 → 이벤트 수신/전송 → 하트비트 → 연결 종료 → 정리
Enter fullscreen mode Exit fullscreen mode

이 흐름을 3개의 추상 클래스로 분리했다:

클래스 책임 위치
BaseSsePublisher Redis Pub/Sub 채널에 이벤트 발행 마이크로서비스
BaseSseSubscriber Redis 구독 + SSE 응답 스트리밍 Gateway
BaseSseOrchestrator 연결 생명주기 전체 조율 Gateway

핵심 설계 원칙은 "도메인 로직만 구현하면 나머지는 Base가 처리한다"였다.

구현

1. Publisher — 이벤트 발행의 단일 진입점

export abstract class BaseSsePublisher<TEvent = any> {
  protected abstract readonly channelPrefix: string;

  constructor(
    protected readonly redisService: RedisService,
    protected readonly logger: Logger,
  ) {}

  protected abstract getChannelName(...keys: string[]): string;

  protected async publishEvent(
    channel: string,
    eventType: string,
    data: TEvent,
  ): Promise<void> {
    const message = JSON.stringify({ type: eventType, ...data });
    await this.redisService.publish(channel, message);
  }

  createSubscriber() {
    return this.redisService.createSubscriber();
  }
}
Enter fullscreen mode Exit fullscreen mode

마이크로서비스에서 BullMQ Handler가 작업을 완료하면 Publisher를 호출한다. Redis Pub/Sub이므로 어떤 인스턴스에서 발행하든, 구독 중인 모든 Gateway 인스턴스가 수신한다.

// BullMQ Handler 내부
await this.ssePublisher.publishStatusChanged(tenantId, orderCode, {
  status: 'COMPLETED',
});
Enter fullscreen mode Exit fullscreen mode

2. Subscriber — SSE 응답 스트리밍의 핵심

여기가 가장 까다로운 부분이다. Express Response 객체를 직접 다루면서 Redis 구독, 하트비트, 연결 끊김 감지를 모두 처리해야 한다.

export abstract class BaseSseSubscriber<TEvent = any> {
  protected subscriber: RedisSubscriber;
  protected isClosed = false;
  protected heartbeatInterval?: NodeJS.Timeout;
  protected onWriteFailureCallback?: () => void;

  constructor(
    protected readonly res: Response,
    protected readonly redisService: RedisService,
    protected readonly channel: string,
    protected readonly enableHeartbeat: boolean = false,
    protected readonly heartbeatIntervalMs: number = 30000,
  ) {
    this.subscriber = this.redisService.createSubscriber();
  }

  setupHeaders(): void {
    this.res.setHeader('Content-Type', 'text/event-stream');
    this.res.setHeader('Cache-Control', 'no-cache');
    this.res.setHeader('Connection', 'keep-alive');
    this.res.setHeader('X-Accel-Buffering', 'no'); // Nginx 버퍼링 비활성화
    this.res.flushHeaders();
  }

  // 서브클래스가 이벤트 타입별 분기만 구현
  protected abstract handleMessage(data: TEvent): void;
  abstract sendConnected(data?: any): void;
}
Enter fullscreen mode Exit fullscreen mode

X-Accel-Buffering: no가 빠지면 Nginx 뒤에서 이벤트가 지연된다. 실제로 이것 때문에 프로덕션에서 이벤트가 30초씩 밀리는 이슈를 겪었다. Base 클래스에 넣어둔 이유다.

Write Failure 감지

SSE에서 가장 흔한 버그 중 하나가 "이미 끊긴 연결에 계속 쓰는 것"이다. res.write()의 반환값으로 이를 감지한다:

sendEvent(event: string, data: string | object): void {
  if (this.isClosed) return;
  const message = typeof data === 'string' ? data : JSON.stringify(data);
  try {
    const success = this.res.write(`event: ${event}\n`);
    this.res.write(`data: ${message}\n\n`);
    if (success === false) {
      // 버퍼가 가득 찼거나 연결이 끊김
      this.onWriteFailureCallback?.();
    }
  } catch (error) {
    this.onWriteFailureCallback?.();
  }
}
Enter fullscreen mode Exit fullscreen mode

res.write()false를 반환하면 TCP 버퍼가 가득 찬 것이다. 대부분 클라이언트가 이미 떠났다는 의미다. 이 콜백으로 세션 상태를 즉시 업데이트할 수 있다.

리소스 정리 — 중복 호출 방지

protected cleanup(): void {
  if (this.isClosed) return; // 가드
  this.isClosed = true;

  if (this.heartbeatInterval) {
    clearInterval(this.heartbeatInterval);
  }

  this.subscriber.unsubscribe().catch(() => {});
  this.subscriber.quit().catch(() => {});
}
Enter fullscreen mode Exit fullscreen mode

isClosed 플래그가 없으면 res.on('close')와 에러 핸들러가 동시에 cleanup을 호출하면서 Redis unsubscribe가 두 번 날아간다.

3. Orchestrator — 생명주기 조율

Orchestrator는 "세션 시작 → SSE 설정 → 이벤트 등록 → 종료 처리"라는 전체 흐름을 하나의 start() 메서드로 캡슐화한다:

export abstract class BaseSseOrchestrator<
  TSubscriber extends BaseSseSubscriber,
  TStartResult = void,
> {
  protected sessionStarted = false;

  protected abstract createSseSubscriber(): TSubscriber;
  protected abstract startSession(): Promise<TStartResult>;
  protected abstract endSession(): Promise<void>;
  protected abstract getConnectedData(result: TStartResult): any;

  async start(): Promise<void> {
    try {
      const sessionResult = await this.startSession();
      this.sessionStarted = true;

      this.sse.setupHeaders();
      await this.sse.subscribe();
      this.registerEventHandlers();
      this.sse.sendConnected(this.getConnectedData(sessionResult));
      this.sse.startHeartbeat();
    } catch (error) {
      this.handleError(error);
    }
  }

  protected async cleanup(): Promise<void> {
    if (this.sessionStarted) {
      await this.endSession();
    }
    this.sse.close();
  }
}
Enter fullscreen mode Exit fullscreen mode

sessionStarted 플래그가 중요하다. startSession()에서 예외가 발생하면 endSession()을 호출하면 안 된다. 시작하지도 않은 세션을 종료하면 다른 사용자의 세션을 건드릴 수 있다.

에러 발생 시에도 SSE 프로토콜을 지킨다. JSON을 직접 내려보내는 게 아니라 SSE 포맷으로 에러 이벤트를 전송한다:

protected handleError(error: unknown): void {
  this.res.setHeader('Content-Type', 'text/event-stream');
  this.res.flushHeaders();
  this.res.write(
    `event: error\ndata: ${JSON.stringify({
      error: true,
      message: error instanceof Error ? error.message : 'Connection failed',
    })}\n\n`,
  );
  this.res.end();
}
Enter fullscreen mode Exit fullscreen mode

실제 사용 — 도메인 구현체

새로운 SSE 도메인을 추가할 때 구현해야 할 것:

// 1. Publisher (마이크로서비스)
class OrderSsePublisher extends BaseSsePublisher<OrderEvent> {
  protected readonly channelPrefix = 'order:status';
  protected getChannelName(tenantId: string, orderId: string) {
    return `${this.channelPrefix}:${tenantId}:${orderId}`;
  }
}

// 2. Subscriber (Gateway)
class OrderSseSubscriber extends BaseSseSubscriber<OrderEvent> {
  protected handleMessage(data: OrderEvent): void {
    this.sendEvent(data.type, data);
  }
  sendConnected(data: any): void {
    this.sendEvent('CONNECTED', data);
  }
}

// 3. Orchestrator (Gateway)
class OrderSseOrchestrator extends BaseSseOrchestrator<OrderSseSubscriber> {
  protected createSseSubscriber() { return new OrderSseSubscriber(...); }
  protected async startSession() { /* TCP로 마이크로서비스 호출 */ }
  protected async endSession() { /* 세션 정리 */ }
  protected getConnectedData(result) { return { orderId: result.id }; }
}
Enter fullscreen mode Exit fullscreen mode

잊기 쉬운 함정: Compression 미들웨어

SSE 라우트는 반드시 compression 미들웨어에서 제외해야 한다:

// gateway.module.ts
compression({
  filter: (req: Request) => {
    if (req.path === '/order/sse/subscribe') {
      return false; // 압축 비활성화
    }
    return compression.filter(req, req.res);
  },
})
Enter fullscreen mode Exit fullscreen mode

compression이 켜져 있으면 응답 버퍼링이 발생해서 이벤트가 실시간으로 내려가지 않는다. 이것도 Base 수준에서 해결할 수 없는 문제라 문서화해뒀다.

결과 / 배운 점

이 구조를 적용한 뒤 새로운 SSE 도메인을 추가하는 데 걸리는 시간이 크게 줄었다. 하트비트, 연결 정리, Write failure 감지 같은 보일러플레이트를 신경 쓸 필요가 없다.

핵심 교훈:

  • SSE는 "보내기"보다 "정리"가 어렵다. isClosed 가드, sessionStarted 플래그 같은 방어 코드가 프로덕션 안정성을 결정한다
  • res.write()의 반환값을 무시하면 좀비 연결이 쌓인다. 반드시 체크해야 한다
  • Redis Pub/Sub은 다중 인스턴스 SSE의 가장 실용적인 해법이다. Kafka나 RabbitMQ는 이 용도엔 과하다
  • X-Accel-Buffering: no와 compression 제외는 코드 리뷰에서 빠지기 쉬운데, 빠지면 프로덕션에서 바로 장애가 된다

AI 활용 포인트

Claude Code로 3개의 Base 클래스를 동시에 리팩토링하면서 기존 구현체와의 호환성을 검증했다. 특히 제네릭 타입 파라미터 설계(BaseSseOrchestrator<TSubscriber, TStartResult>)에서 여러 도메인의 사용 패턴을 한꺼번에 분석해 최적의 인터페이스를 도출하는 데 AI가 효과적이었다.

Top comments (0)