DEV Community

Archist
Archist

Posted on • Edited on

NestJS 마이크로서비스에서 SSE 스트리밍 추상화하기 — Redis Pub/Sub 기반 3-Tier Base Class 설계

NestJS 마이크로서비스에서 SSE 스트리밍 추상화하기 — Redis Pub/Sub 기반 3-Tier Base Class 설계

배경 / 문제 상황

MSA 환경에서 실시간 이벤트를 클라이언트에 전달해야 했다. 우리 서비스는 ECS 위에 여러 인스턴스가 뜨고, 이벤트를 발행하는 마이크로서비스와 SSE를 내려보내는 Gateway가 물리적으로 다른 프로세스다.

문제는 이렇다:

  • 인스턴스 A의 BullMQ Consumer가 작업을 완료하고 이벤트를 발생시킨다
  • 인스턴스 B의 Gateway에 클라이언트가 SSE로 연결되어 있다
  • A에서 발생한 이벤트가 B의 클라이언트에게 도달해야 한다

단순히 EventEmitter로는 불가능하다. 프로세스 경계를 넘어야 하니까.

게다가 SSE 도메인이 점점 늘어나면서 (실시간 편집 세션, 인증 세션, 채팅 등) 매번 Redis 구독, 하트비트, 연결 정리 코드를 반복 작성하고 있었다. 한 곳에서 버그를 고치면 다른 곳에는 여전히 남아있는 상황이 반복됐다.

CTO 관점에서의 설계 판단: SSE vs WebSocket vs Long Polling을 비교했다. WebSocket은 양방향 통신이 필요 없는 우리 케이스에서 과하고, ALB WebSocket 연결 유지 비용도 부담이었다. Long Polling은 지연이 불가피했다. SSE는 HTTP/1.1 기반으로 인프라 변경 없이 적용 가능하고, 단방향 실시간 전달에 최적화되어 있어 선택했다.


접근 방법

SSE 연결의 생명주기를 분석하면 도메인과 무관하게 항상 같은 흐름이 반복된다:

연결 → 세션 시작 → Redis 구독 → 이벤트 수신/전송 → 하트비트 → 연결 종료 → 정리
Enter fullscreen mode Exit fullscreen mode

이 흐름을 3개의 추상 클래스로 분리했다:

클래스 책임 위치 도메인이 구현할 것
BaseSsePublisher Redis Pub/Sub 채널에 이벤트 발행 마이크로서비스 getChannelName(), 이벤트 발행 메서드
BaseSseSubscriber Redis 구독 + SSE 응답 스트리밍 Gateway handleMessage(), sendConnected()
BaseSseOrchestrator 연결 생명주기 전체 조율 Gateway startSession(), endSession()

핵심 설계 원칙은 "도메인 로직만 구현하면 나머지는 Base가 처리한다"였다.

트레이드오프: 상속 기반 설계는 컴포지션보다 유연성이 떨어진다. 하지만 SSE의 생명주기가 모든 도메인에서 동일한 순서로 실행되어야 하는 제약이 있어서, Template Method 패턴이 적합하다고 판단했다. 잘못된 순서로 호출하는 실수를 컴파일 타임에 방지할 수 있다.


구현

1. Publisher — 이벤트 발행의 단일 진입점

export abstract class BaseSsePublisher<TEvent = any> {
  protected abstract readonly channelPrefix: string;

  constructor(
    protected readonly redisService: RedisService,
    protected readonly logger: Logger,
  ) {}

  protected abstract getChannelName(...keys: string[]): string;

  protected async publishEvent(
    channel: string,
    eventType: string,
    data: TEvent,
  ): Promise<void> {
    const message = JSON.stringify({ type: eventType, ...data });
    await this.redisService.publish(channel, message);
  }

  createSubscriber() {
    return this.redisService.createSubscriber();
  }
}
Enter fullscreen mode Exit fullscreen mode

마이크로서비스에서 BullMQ Handler가 작업을 완료하면 Publisher를 호출한다. Redis Pub/Sub이므로 어떤 인스턴스에서 발행하든, 구독 중인 모든 Gateway 인스턴스가 수신한다.

// BullMQ Handler 내부
await this.ssePublisher.publishStatusChanged(tenantId, orderCode, {
  status: 'COMPLETED',
});
Enter fullscreen mode Exit fullscreen mode

2. Subscriber — SSE 응답 스트리밍의 핵심

여기가 가장 까다로운 부분이다. Express Response 객체를 직접 다루면서 Redis 구독, 하트비트, 연결 끊김 감지를 모두 처리해야 한다.

export abstract class BaseSseSubscriber<TEvent = any> {
  protected subscriber: RedisSubscriber;
  protected isClosed = false;
  protected heartbeatInterval?: NodeJS.Timeout;
  protected onWriteFailureCallback?: () => void;

  constructor(
    protected readonly res: Response,
    protected readonly redisService: RedisService,
    protected readonly channel: string,
    protected readonly enableHeartbeat: boolean = false,
    protected readonly heartbeatIntervalMs: number = 30000,
  ) {
    this.subscriber = this.redisService.createSubscriber();
  }

  setupHeaders(): void {
    this.res.setHeader('Content-Type', 'text/event-stream');
    this.res.setHeader('Cache-Control', 'no-cache');
    this.res.setHeader('Connection', 'keep-alive');
    this.res.setHeader('X-Accel-Buffering', 'no'); // Nginx 버퍼링 비활성화
    this.res.flushHeaders();
  }

  // 서브클래스가 이벤트 타입별 분기만 구현
  protected abstract handleMessage(data: TEvent): void;
  abstract sendConnected(data?: any): void;
}
Enter fullscreen mode Exit fullscreen mode

X-Accel-Buffering: no가 빠지면 Nginx 뒤에서 이벤트가 지연된다. 실제로 이것 때문에 프로덕션에서 이벤트가 30초씩 밀리는 이슈를 겪었다. Base 클래스에 넣어둔 이유다.


Write Failure 감지

SSE에서 가장 흔한 버그 중 하나가 "이미 끊긴 연결에 계속 쓰는 것"이다. res.write()의 반환값으로 이를 감지한다:

sendEvent(event: string, data: string | object): void {
  if (this.isClosed) return;
  const message = typeof data === 'string' ? data : JSON.stringify(data);
  try {
    const success = this.res.write(`event: ${event}\n`);
    this.res.write(`data: ${message}\n\n`);
    if (success === false) {
      // 버퍼가 가득 찼거나 연결이 끊김
      this.onWriteFailureCallback?.();
    }
  } catch (error) {
    this.onWriteFailureCallback?.();
  }
}
Enter fullscreen mode Exit fullscreen mode

res.write()false를 반환하면 TCP 버퍼가 가득 찬 것이다. 대부분 클라이언트가 이미 떠났다는 의미다. 이 콜백으로 세션 상태를 즉시 업데이트할 수 있다.


리소스 정리 — 중복 호출 방지

protected cleanup(): void {
  if (this.isClosed) return; // 가드
  this.isClosed = true;

  if (this.heartbeatInterval) {
    clearInterval(this.heartbeatInterval);
  }

  this.subscriber.unsubscribe().catch(() => {});
  this.subscriber.quit().catch(() => {});
}
Enter fullscreen mode Exit fullscreen mode

isClosed 플래그가 없으면 res.on('close')와 에러 핸들러가 동시에 cleanup을 호출하면서 Redis unsubscribe가 두 번 날아간다.

프로덕션에서 배운 것: cleanup에서 .catch(() => {})로 에러를 무시하는 것은 편의상 그렇게 했지만, 최소한 로깅은 해야 한다. Redis 연결 문제가 cleanup에서 발생하면 디버깅 단서가 완전히 사라진다.


3. Orchestrator — 생명주기 조율

Orchestrator는 "세션 시작 → SSE 설정 → 이벤트 등록 → 종료 처리"라는 전체 흐름을 하나의 start() 메서드로 캡슐화한다:

export abstract class BaseSseOrchestrator<
  TSubscriber extends BaseSseSubscriber,
  TStartResult = void,
> {
  protected sessionStarted = false;

  protected abstract createSseSubscriber(): TSubscriber;
  protected abstract startSession(): Promise<TStartResult>;
  protected abstract endSession(): Promise<void>;
  protected abstract getConnectedData(result: TStartResult): any;

  async start(): Promise<void> {
    try {
      const sessionResult = await this.startSession();
      this.sessionStarted = true;

      this.sse.setupHeaders();
      await this.sse.subscribe();
      this.registerEventHandlers();
      this.sse.sendConnected(this.getConnectedData(sessionResult));
      this.sse.startHeartbeat();
    } catch (error) {
      this.handleError(error);
    }
  }

  protected async cleanup(): Promise<void> {
    if (this.sessionStarted) {
      await this.endSession();
    }
    this.sse.close();
  }
}
Enter fullscreen mode Exit fullscreen mode

sessionStarted 플래그가 중요하다. startSession()에서 예외가 발생하면 endSession()을 호출하면 안 된다. 시작하지도 않은 세션을 종료하면 다른 사용자의 세션을 건드릴 수 있다.

에러 발생 시에도 SSE 프로토콜을 지킨다. JSON을 직접 내려보내는 게 아니라 SSE 포맷으로 에러 이벤트를 전송한다:

protected handleError(error: unknown): void {
  this.res.setHeader('Content-Type', 'text/event-stream');
  this.res.flushHeaders();
  this.res.write(
    `event: error\ndata: ${JSON.stringify({
      error: true,
      message: error instanceof Error ? error.message : 'Connection failed',
    })}\n\n`,
  );
  this.res.end();
}
Enter fullscreen mode Exit fullscreen mode

실제 사용 — 도메인 구현체

새로운 SSE 도메인을 추가할 때 구현해야 할 것:

// 1. Publisher (마이크로서비스)
class OrderSsePublisher extends BaseSsePublisher<OrderEvent> {
  protected readonly channelPrefix = 'order:status';
  protected getChannelName(tenantId: string, orderId: string) {
    return `${this.channelPrefix}:${tenantId}:${orderId}`;
  }
}

// 2. Subscriber (Gateway)
class OrderSseSubscriber extends BaseSseSubscriber<OrderEvent> {
  protected handleMessage(data: OrderEvent): void {
    this.sendEvent(data.type, data);
  }
  sendConnected(data: any): void {
    this.sendEvent('CONNECTED', data);
  }
}

// 3. Orchestrator (Gateway)
class OrderSseOrchestrator extends BaseSseOrchestrator<OrderSseSubscriber> {
  protected createSseSubscriber() { return new OrderSseSubscriber(...); }
  protected async startSession() { /* TCP로 마이크로서비스 호출 */ }
  protected async endSession() { /* 세션 정리 */ }
  protected getConnectedData(result) { return { orderId: result.id }; }
}
Enter fullscreen mode Exit fullscreen mode

새 도메인 추가 시 작성하는 코드 vs Base가 처리하는 코드:

항목 도메인이 구현 Base가 처리
채널명 생성 O -
이벤트 타입별 분기 O -
세션 시작/종료 O -
Redis 구독/해제 - O
SSE 헤더 설정 - O
하트비트 - O
Write failure 감지 - O
연결 끊김 정리 - O
에러 핸들링 - O

잊기 쉬운 함정: Compression 미들웨어

SSE 라우트는 반드시 compression 미들웨어에서 제외해야 한다:

// gateway.module.ts
compression({
  filter: (req: Request) => {
    if (req.path === '/order/sse/subscribe') {
      return false; // 압축 비활성화
    }
    return compression.filter(req, req.res);
  },
})
Enter fullscreen mode Exit fullscreen mode

compression이 켜져 있으면 응답 버퍼링이 발생해서 이벤트가 실시간으로 내려가지 않는다. 이것도 Base 수준에서 해결할 수 없는 문제라 문서화해뒀다.


결과 / 배운 점

이 구조를 적용한 뒤 새로운 SSE 도메인을 추가하는 데 걸리는 시간이 크게 줄었다. 하트비트, 연결 정리, Write failure 감지 같은 보일러플레이트를 신경 쓸 필요가 없다.

핵심 교훈:

  • SSE는 "보내기"보다 "정리"가 어렵다. isClosed 가드, sessionStarted 플래그 같은 방어 코드가 프로덕션 안정성을 결정한다
  • res.write()의 반환값을 무시하면 좀비 연결이 쌓인다. 반드시 체크해야 한다
  • Redis Pub/Sub은 다중 인스턴스 SSE의 가장 실용적인 해법이다. Kafka나 RabbitMQ는 이 용도엔 과하다
  • X-Accel-Buffering: no와 compression 제외는 코드 리뷰에서 빠지기 쉬운데, 빠지면 프로덕션에서 바로 장애가 된다

아쉬웠던 점

  1. Redis Pub/Sub의 at-most-once 특성으로 이벤트 유실 가능: Redis Pub/Sub은 구독자가 없으면 메시지를 버린다. 클라이언트가 SSE 재연결하는 짧은 순간에 발행된 이벤트는 영원히 유실된다. 현재 서비스 특성상 "최종 상태만 정확하면 되는" 케이스가 대부분이라 큰 문제는 아니었지만, 정확한 이벤트 순서가 중요한 도메인에서는 치명적일 수 있다.

  2. 하트비트만으로 연결 상태를 판단하는 한계: 30초 간격 하트비트로 연결 유지를 확인하지만, 하트비트 사이에 연결이 끊어지면 최대 30초간 감지가 안 된다. 그 사이에 세션 상태가 "활성"으로 남아있어서, 같은 사용자가 다른 디바이스에서 접속할 때 중복 세션 문제가 발생한 적이 있다.

  3. Orchestrator의 제네릭 타입이 복잡해져서 신규 도메인 추가 시 학습 비용: BaseSseOrchestrator<TSubscriber extends BaseSseSubscriber<TEvent>, TStartResult> 같은 중첩 제네릭이 되면서, TypeScript에 익숙하지 않은 팀원이 새 도메인을 추가할 때 타입 에러와 씨름하는 시간이 길어졌다. 추상화의 편의성을 타입 복잡성이 상쇄하는 아이러니한 상황이었다.


향후 보완할 점

  1. Redis Streams 도입 검토 (이벤트 유실 방지): Redis Pub/Sub 대신 Redis Streams를 사용하면 메시지가 영구 저장되어, 구독자가 없는 동안의 이벤트도 나중에 소비할 수 있다. Consumer Group을 활용하면 다중 인스턴스 환경에서도 정확히 한 번 처리(at-least-once)가 가능하다. 다만 Pub/Sub 대비 복잡도가 증가하므로, 이벤트 유실이 치명적인 도메인부터 단계적으로 적용할 계획이다.

  2. SSE 재연결 시 lastEventId 기반 이벤트 재전송: SSE 표준의 Last-Event-ID 헤더를 활용하여, 재연결 시 놓친 이벤트를 재전송하는 메커니즘을 구현할 계획이다. Redis Streams의 메시지 ID와 SSE의 id 필드를 연계하면, 클라이언트가 마지막으로 받은 이벤트 이후의 메시지만 정확하게 재전송할 수 있다.

  3. Base 클래스 가이드 문서 정비: 제네릭 타입의 복잡성을 코드로 해결하기보다, 잘 작성된 예제와 가이드로 학습 비용을 줄이는 것이 현실적이라고 판단했다. 각 도메인별 구현 예제, 자주 발생하는 타입 에러와 해결법, 체크리스트를 포함한 내부 가이드 문서를 작성 중이다.


AI 활용 포인트

Claude Code로 3개의 Base 클래스를 동시에 리팩토링하면서 기존 구현체와의 호환성을 검증했다. 특히 제네릭 타입 파라미터 설계(BaseSseOrchestrator<TSubscriber, TStartResult>)에서 여러 도메인의 사용 패턴을 한꺼번에 분석해 최적의 인터페이스를 도출하는 데 AI가 효과적이었다.

Top comments (0)