DEV Community

Artistrator
Artistrator

Posted on

NestJS MSA Lite 실전 아키텍처 (2/3) — 데이터 레이어와 비동기 처리

여러 레포에 흩어져 있던 B2B SaaS 서비스를 MSA Lite 모노레포로 통합한 경험을 정리한 시리즈의 두 번째 글이다.

  • Part 1: 서비스 구조와 통신 설계
  • Part 2: 데이터 레이어와 비동기 처리 (이 글)
  • Part 3: 운영, 배포, 그리고 진화

멀티테넌트 데이터베이스 — 테넌트별 DataSource 동적 관리

SaaS 서비스에서 가장 까다로운 문제 중 하나가 멀티테넌시다. 우리 경우 난이도가 한 단계 더 높았다.

각 병원(테넌트)에는 소스 DB가 있다 — 병원 EMR 시스템의 데이터베이스다. 문제는 병원마다 EMR 벤더가 다르고, DB 브랜드(Oracle, MSSQL, PostgreSQL)와 버전이 제각각이며, 심지어 같은 벤더라도 병원이 확장하면 스키마가 달라질 수 있다는 점이다. 여기에 직접 서비스 로직을 태울 수는 없다.

그래서 병원별 서비스 전용 데이터베이스(AgentDB)를 별도로 두고, 소스 DB에서 필요한 데이터를 ETL로 가져와서 정규화된 스키마로 적재하는 구조를 택했다. 서비스 로직은 항상 AgentDB만 바라본다.

아키텍처

소스 DB (병원 EMR)                    서비스 DB (AgentDB)
├── 병원 A: Oracle 11g  ──ETL──→    ├── 병원 A: PostgreSQL
├── 병원 B: MSSQL 2019  ──ETL──→    ├── 병원 B: PostgreSQL
├── 병원 C: Oracle 19c  ──ETL──→    ├── 병원 C: PostgreSQL
│   (SSH 터널 경유)                   └── ...
└── ...
                                           ↑
Config DB (공유)                    AgentDatabaseService
├── 테넌트 마스터 데이터              (런타임 DataSource 관리)
├── DataSourceConfig
│   (테넌트 → DB 접속 정보 매핑)
└── 시스템 설정
Enter fullscreen mode Exit fullscreen mode

소스 DB의 다양성은 ETL 레이어가 흡수하고, AgentDB는 모두 PostgreSQL로 통일했다. 덕분에 서비스 코드에서는 DB 브랜드를 신경 쓸 필요가 없다. 하지만 테넌트별로 독립된 AgentDB가 존재하므로, 런타임에 동적으로 DataSource를 관리해야 한다.

핵심 구현

@Injectable()
export class AgentDatabaseService implements OnModuleInit, OnModuleDestroy {
  private datasourcesMap = new Map<string, DataSource>();

  async getAgentRepository<T extends ObjectLiteral>(
    tenantId: string,
    entity: EntityTarget<T>,
  ): Promise<Repository<T>> {
    const dataSource = await this.getOrCreateDataSource(tenantId);
    return dataSource.getRepository(entity);
  }

  private async getOrCreateDataSource(tenantId: string): Promise<DataSource> {
    // 1. 캐시 히트 → 즉시 반환
    let ds = this.datasourcesMap.get(tenantId);
    if (ds?.isInitialized) return ds;

    // 2. Config DB에서 접속 정보 조회
    const config = await this.loadTenantConfig(tenantId);

    // 3. DataSource 생성 + 커넥션 풀 초기화
    ds = new DataSource({
      url: config.databaseUrl,
      type: config.databaseType,
      entities: SHARED_ENTITIES,
      synchronize: false,  // 프로덕션: migration으로만 스키마 변경
      pool: { max: 20, min: 2, idleTimeoutMillis: 30000 },
    });

    await ds.initialize();
    this.datasourcesMap.set(tenantId, ds);
    return ds;
  }
}
Enter fullscreen mode Exit fullscreen mode

성능 포인트

  • Lazy Loading: 요청이 들어온 테넌트만 DataSource 생성. 100개 테넌트가 있어도 실제 접속하는 테넌트만 메모리를 사용한다.
  • 커넥션 풀 최적화: 테넌트당 max 20으로 제한. 초기에 기본값(1000)을 그대로 썼더니 100개 테넌트 × 1000 커넥션으로 DB 서버가 다운됐던 경험이 있다.
  • Graceful Shutdown: onModuleDestroy에서 모든 DataSource를 Promise.allSettled로 순차 정리. 한 테넌트의 정리 실패가 다른 테넌트에 영향을 주지 않는다.
  • Health Check: 전체 테넌트 DB 연결 상태를 한 번에 점검하는 healthCheck() 메서드 제공.

사용하는 쪽 코드

모든 서비스에서 동일한 패턴으로 테넌트 DB에 접근한다:

// 어떤 서비스든 동일한 인터페이스
const repository = await this.agentDatabaseService.getAgentRepository<OrderEntity>(
  tenantId,
  OrderEntity,
);
const orders = await repository.find({ where: { status: "ACTIVE" } });
Enter fullscreen mode Exit fullscreen mode

비동기 처리 — BullMQ 3계층 패턴

외부 AI 서버와의 스트리밍 통신, PDF 생성, ETL 배치 같은 작업을 동기 API로 처리하면 타임아웃의 늪에 빠진다. BullMQ로 비동기 처리하되, Producer → Consumer → Handler 3계층으로 관심사를 분리했다.

// 1. Producer: 큐에 작업 추가 (thin wrapper)
@Injectable()
class DocumentGenerateProducer {
  async produce(payload: GeneratePayload): Promise<void> {
    await this.queue.add("generate", payload, {
      attempts: 1,
      backoff: { type: "exponential", delay: 2000 },
      removeOnComplete: { age: 7 * 24 * 3600, count: 1000 },
      removeOnFail: false,  // 실패 작업은 모니터링용으로 보존
    });
  }
}

// 2. Consumer: BullMQ Worker (DI 브릿지 역할만)
@Processor("document-generate")
class DocumentGenerateConsumer extends WorkerHost {
  constructor(private readonly handler: DocumentGenerateHandler) { super(); }

  async process(job: Job<GeneratePayload>) {
    await this.handler.handle(job.data);
  }

  @OnWorkerEvent("failed")
  async onFailed(job: Job, error: Error) {
    await this.handler.handleFailure(job.data, error);
  }
}

// 3. Handler: 순수 비즈니스 로직 (테스트 가능)
@Injectable()
class DocumentGenerateHandler {
  async handle(payload: GeneratePayload): Promise<void> {
    // 실제 문서 생성 로직
  }

  async handleFailure(payload: GeneratePayload, error: Error): Promise<void> {
    // DB 상태 복구 (예: status → FAILED)
  }
}
Enter fullscreen mode Exit fullscreen mode

왜 3계층인가?

레이어 테스트 용이성 책임
Producer 높음 (FakeProducer로 대체) 큐에 작업 추가
Consumer 낮음 (@Processor 데코레이터 종속) BullMQ ↔ DI 브릿지
Handler 높음 (new Handler(fakeDeps)) 비즈니스 로직

Consumer가 BullMQ에 강결합되어 있어 단위 테스트가 어렵다. Handler를 분리하면 new Handler(fakeDb, fakeS3)로 mock 없이 테스트할 수 있다. E2E 테스트에서는 Handler 전체를 MockHandler로 교체하여 큐 의존성을 제거한다.

Bull Board 모니터링

BullMQRootModule.forRoot({ bullBoardRoute: "/queues" })
Enter fullscreen mode Exit fullscreen mode

/queues 경로에서 모든 큐의 대기/처리/실패 작업을 실시간으로 확인할 수 있다. 실패 작업의 스택 트레이스, 재시도 횟수, 처리 시간까지 대시보드에서 바로 보인다. 프로덕션 장애 대응 시 가장 먼저 확인하는 화면이다.

실시간 업데이트 — Redis PubSub 기반 SSE

WebSocket 대신 SSE(Server-Sent Events)를 선택한 이유:

  • 클라이언트 → 서버 양방향 통신이 불필요 (서버 → 클라이언트 단방향 Push만 필요)
  • HTTP/2 환경에서 다중 스트림 지원
  • 연결 끊김 시 브라우저가 자동 재연결

문제는 마이크로서비스에서 이벤트가 발생하는데, SSE 커넥션은 Gateway에 있다는 점이다. Redis PubSub으로 이 간극을 메웠다.

Microservice                Redis PubSub            Gateway SSE
(이벤트 발생)                                       (클라이언트 연결)
     │                           │                       │
Publisher.publish()  ─────→  Channel  ─────→  Subscriber.onMessage()
     │                           │                       │
     │                           │                  res.write(event)
     │                           │                       │
     │                    Multi-instance 동기화           │
Enter fullscreen mode Exit fullscreen mode

베이스 클래스 상속 패턴

새로운 도메인의 SSE를 추가할 때, 채널명과 이벤트 타입만 정의하면 된다:

// 도메인별 Publisher — 이벤트 채널 정의만 하면 된다
class OrderSsePublisher extends BaseSsePublisher<OrderEvent> {
  protected getChannelName(orderId: string): string {
    return `order_updates_${orderId}`;
  }
}

// Gateway의 Orchestrator — 구독 + 정리 라이프사이클 관리
class OrderSseOrchestrator extends BaseSseOrchestrator<OrderEvent> {
  async start(res: Response, orderId: string) {
    await this.subscriber.subscribe(this.getChannelName(orderId));
    this.startHeartbeat();            // 30초 간격 ping
    res.on("close", () => this.cleanup());  // 연결 종료 시 리소스 정리
  }
}
Enter fullscreen mode Exit fullscreen mode

주의점: Gateway에서 SSE 라우트는 compression 미들웨어에서 반드시 제외해야 한다. gzip이 응답을 버퍼링하면 실시간 스트리밍이 깨진다.

Multi-instance 동기화

Gateway가 2대 이상일 때, 클라이언트 A가 Gateway-1에 연결되어 있는데 이벤트가 Gateway-2를 통해 들어올 수 있다. Redis PubSub은 모든 구독자에게 브로드캐스트하므로 이 문제가 자연스럽게 해결된다. 추가 로직 없이 모든 Gateway 인스턴스가 이벤트를 수신한다.

분산 크론 잡 — 다중 인스턴스에서 한 번만 실행

ECS에서 서비스를 2개 이상 인스턴스로 띄우면, @Cron 데코레이터가 모든 인스턴스에서 동시 실행된다. 매일 아침 알림을 2번 보내거나, ETL이 2번 실행되는 사고가 발생한다.

Redis 분산 락 기반 @DistributedCron 데코레이터로 해결했다:

@DistributedCron("0 8 * * *", {
  name: "daily-notification",  // 글로벌 유니크 락 키
  timeZone: "Asia/Seoul",
  ttlMs: 300_000,             // 5분 TTL (자동 연장)
})
async sendDailyNotification(): Promise<void> {
  // 여러 인스턴스 중 단 하나만 실행
  // 나머지는 "[daily-notification] Skipped" 로그 후 스킵
}
Enter fullscreen mode Exit fullscreen mode

내부 동작

  1. Redis SET key lockId PX ttlMs NX — key가 없을 때만 설정 (원자적 락 획득)
  2. 락 획득 성공 → 작업 실행 + TTL의 2/3 시점마다 자동 연장
  3. 락 획득 실패 → 스킵 (다른 인스턴스가 실행 중)
  4. 작업 완료 → 락 해제

자동 연장이 중요한 이유: TTL을 5분으로 설정했는데 작업이 7분 걸리면? TTL의 2/3 시점(3분 20초)에 자동으로 TTL을 갱신하므로, 장시간 작업에서도 락이 중간에 만료되지 않는다.

사용자 코드에서는 락을 전혀 의식할 필요가 없다. NestJS의 @Cron@DistributedCron으로 바꾸고 name만 추가하면 끝이다.

다음 편 예고

Part 3에서는 운영과 진화를 다룬다:

  • 무중단 배포 (Expand-Contract 패턴)
  • 테스트 전략 (jest.fn() 없는 세계)
  • 모놀리스 vs MSA Lite 체감 비교
  • 추후 발전 방향 (OpenTelemetry, Circuit Breaker, K8s)

AI 활용 포인트

소스 DB(병원 EMR)의 스키마 차이를 정규화하는 ETL 매핑 로직을 작성할 때 Claude Code를 활용했다. 병원별로 미묘하게 다른 컬럼명, 데이터 타입, 코드 체계를 분석하고 AgentDB의 통일된 스키마로 변환하는 매퍼를 생성하는 데 특히 유용했다. 멀티테넌트 DB의 커넥션 풀 최적화 값을 결정할 때도 전체 코드베이스의 동시 쿼리 패턴을 분석해 테넌트당 최대 동시 커넥션 수를 추정하는 데 활용했다.

Top comments (0)