CQRS - CQRS Study 3

 

CQRS 방법론 정리3 (2025.05 ~ 2025.12)

목차

  1. 코드 리뷰: Elasticsearch 로직 부분 1 - Query 모델 동기화
  2. 코드 리뷰: Elasticsearch 로직 부분 2 - Query 모델 동기화 개선 사항
  3. 코드 리뷰: MongoDB 로직 부분 1 - Query 모델 동기화
  4. Projection 방식 비교 - Interface 기반 vs QueryDSL Constructor
  5. MongoDB 동기화에서 여러 번 읽기를 수행하는 이유


1. 코드 리뷰: Elasticsearch 로직 부분 1 - Query 모델 동기화

개요

Debezium 등 CDC(Change Data Capture)로부터 Kafka 토픽(product-events)으로 전달된 DB 변경 이벤트를 수신하여, 상품 검색 전용 Elasticsearch 인덱스(products)를 실시간으로 동기화하는 읽기 모델 동기화 시스템입니다. 핵심 구성 요소는 다음과 같습니다:

  • 이벤트 모델: CdcEvent
  • 동기화기(컨슈머): ProductSearchModelSyncer
  • ES 문서 스키마: ProductSearchDocument
  • ES 저장소: ProductSearchRepository
  • ES 인덱스 초기화: ElasticsearchInitService


데이터 흐름

  1. Kafka 수신
  2. 이벤트 파싱(CdcEvent)
  3. 테이블별 라우팅
  4. ES 문서 전체 저장 또는 부분 업데이트
  5. products 인덱스에 반영



CdcEvent: 이벤트 포맷

파일: product/query/sync/CdcEvent.java

CDC 표준 필드 구조를 Map<String, Object>로 관리합니다:

  • op: 이벤트 타입 (c 생성, u 수정, d 삭제, r 스냅샷)
  • before, after: 변경 전/후 레코드 스냅샷
  • source: DB/스키마/테이블 메타정보
  • timestamp(ts_ms): 이벤트 시간
  • key: Kafka 메시지 키(현재 로직에서는 미사용)

헬퍼 메서드: isCreate(), isUpdate(), isDelete(), isRead(), getTable(), getBeforeData(), getAfterData()


사용 맥락:

  • 컨슈머에서 messageValueCdcEvent로 역직렬화
  • messageKey는 별도로 Map으로 파싱하여 event.setKey(...)로 부착
  • 실제 라우팅은 event.getTable() 값으로 결정



ProductSearchDocument: ES 문서 스키마

파일: product/query/search/ProductSearchDocument.java

  • 인덱스 이름: products
  • 설정: @Setting(settingPath = "elasticsearch/product-index-settings.json") → Nori 분석기 등 한글 검색 최적화


주요 필드 및 매핑:

  • name, shortDescription, fullDescription, materials: FieldType.Text + nori_analyzer
  • status, slug, sellerId, brandId, categoryIds, tagIds: FieldType.Keyword
  • basePrice, salePrice: FieldType.Double (Java 타입은 BigDecimal)
  • inStock: FieldType.Boolean
  • createdAt, updatedAt: FieldType.Date(epoch_millis)
  • 리뷰 관련: averageRating(Double), reviewCount(Integer)


설계 의도:

  • 한글 검색어 토크나이징을 위한 Nori 분석기 적용
  • 필터/정렬용 필드는 키워드 타입으로 유지



ElasticsearchInitService: 인덱스 초기화

파일: product/query/search/ElasticsearchInitService.java

  • 애플리케이션 기동 시(ApplicationReadyEvent) ProductSearchDocument 기반 인덱스 존재 여부 확인
  • 인덱스가 없으면 indexOps.create()putMapping(indexOps.createMapping())로 인덱스와 매핑 생성
  • 설정 파일의 분석기/토크나이저 등이 자동 적용



ProductSearchRepository: 저장소

파일: product/query/search/ProductSearchRepository.java

  • ElasticsearchRepository<ProductSearchDocument, Long> 상속
  • 기본 CRUD 및 간단한 쿼리 메서드 지원



ProductSearchModelSyncer: 동기화기(핵심 로직)

파일: product/query/sync/ProductSearchModelSyncer.java

Kafka 컨슈머: @KafkaListener(topics = {"product-events"}, groupId = "product-search-group")

수신 처리 순서:

  1. 수신 로그 기록
  2. messageValueCdcEvent 역직렬화
  3. messageKeyMap으로 파싱하여 event.key에 저장
  4. event.getTable()로 분기 처리


테이블별 처리 상세

공통 유틸: updatePartialDocument(productId, updates)

  • 내부적으로 ElasticsearchOperations.update(UpdateQuery) 사용
  • withDocAsUpsert(true) → 문서가 없으면 생성(upsert)
  • IndexCoordinates.of("products")로 직접 인덱스 지정

1) products 테이블handleProductEvent

  • delete: before.id로 문서 삭제
  • create/update/read(snapshot): after로 전체 문서 재구성 후 repository.save(document)
    • 필드: id, name, slug, shortDescription, fullDescription, status, sellerId, brandId, createdAt, updatedAt
    • inStock: status == "ACTIVE" 여부로 계산
    • 기존 문서가 있으면 누락 필드 보존: materials, basePrice, salePrice, categoryIds, tagIds, averageRating, reviewCount


2) product_detailshandleProductDetailEvent

  • delete: 해당 상품의 materialsnull로 부분 업데이트
  • create/update: materials만 부분 업데이트


3) product_priceshandleProductPriceEvent

  • delete: basePrice, salePricenull로 부분 업데이트
  • create/update: 존재하는 필드만 부분 업데이트(base_price, sale_price)


4) product_categorieshandleProductCategoryEvent

  • 카테고리 목록을 기존 문서에서 읽어 수정 후 전체 배열을 부분 업데이트
  • delete: 해당 categoryId 제거
  • create/update: 배열에 categoryId가 없으면 추가
  • 문서가 없을 경우 경고 로그만 남기고 리턴(upsert로 새 문서를 만들지 않음)


5) product_tagshandleProductTagEvent

  • 카테고리와 동일한 패턴으로 태그 배열 유지/갱신
  • 문서가 없을 경우 경고 로그만 남기고 리턴


6) reviewshandleReviewEvent

  • 기존 문서를 읽어 평균 평점/리뷰 수를 증감·재계산하여 부분 업데이트
    • delete: 리뷰 1개 감소, 해당 평점을 평균에서 제거. 0개가 되면 평균 0.0
    • update: 이전 평점과 새 평점이 다른 경우에만 평균 재계산(카운트는 동일)
    • create: 카운트 +1, 평균에 새 평점 반영
  • 문서가 없을 경우 경고 로그 후 리턴


공통 파싱/변환 유틸

  • 숫자 파싱: getLongValue(), getIntegerValue(), getBigDecimalValue()
  • 시간 파싱: parseTimestampToInstant()
    • 문자열: LocalDateTime.parse() 후 시스템 타임존 적용 → Instant
    • 숫자: 마이크로초를 밀리초로 가정하고 /1000 후 epoch millis로 변환



아키텍처/설계 의도 요약

  • 쓰기 모델(DB)의 변화를 읽기 모델(ES)로 투영하는 CQRS/이벤트 소싱 스타일의 동기화 계층
  • products는 전체 문서 업서트, 연관 테이블(product_details, product_prices)은 부분 필드만 업서트
  • 다대다/연관(categories, tags)과 집계(reviews)는 기존 문서를 읽어서 파생 필드만 갱신



장점

  • 테이블별 관심사 분리: 어떤 변경이 ES 문서의 어느 필드에 반영되는지 명확
  • 부분 업데이트 활용: 대용량 문서 전체 재색인을 피하고 필요한 필드만 갱신하여 비용 절감
  • 인덱스 초기화 자동화: 기동 시 인덱스/매핑 보장



잠재 이슈 및 개선 제안

1) 가격 필드 타입 정합성/정밀도

  • ES 매핑은 FieldType.Double, Java는 BigDecimal
  • 금액은 double 타입에서 부동소수점 오차가 발생할 수 있으므로 ES 매핑을 scaled_float(+ scaling_factor) 또는 long(센트 단위)로 저장하는 방식 권장


2) 시간 파싱 가정

  • parseTimestampToInstant()에서 숫자형 타임스탬프를 “마이크로초→밀리초”로 가정하여 /1000 수행
  • Debezium의 ts_ms는 밀리초 단위이나, DB 칼럼 created_at/updated_at은 소스 DB/커넥터 설정에 따라 단위가 다를 수 있음
  • 실제 단위를 명시적으로 확인하여 분기하는 것이 안전


3) 부분 업데이트 동작의 일관성

  • updatePartialDocument()docAsUpsert(true)이므로 문서가 없어도 새로 생성
  • 반면 카테고리/태그/리뷰는 기존 문서를 먼저 읽어야 하며, 없으면 경고만 출력하고 종료
  • 테이블별 동작이 일관되지 않음
    • 옵션 A: 카테고리/태그/리뷰도 초기 값(빈 배열/0/0.0)으로 upsert 허용
    • 옵션 B: 모든 부분 업데이트 전에 문서 존재 여부를 공통으로 확인하고, 없으면 큐잉/재시도 전략 적용


4) 이벤트 도착 순서/중복(Idempotency)

  • Kafka/CDC 특성상 순서 보장이나 중복 없는 전달을 과신하면 안 됨
  • Out-of-order 상황에서 역전 가능성 존재
  • 개선: 각 문서에 “마지막 처리 오프셋/타임스탬프”를 저장하여, 더 과거 이벤트는 무시하는 가드 추가


5) 평균 평점 계산의 수치 안정성/라운딩

  • 반복 가감 산술에서 부동소수점 오차가 누적될 수 있음
  • 1~2자리 라운딩 일관 적용 또는 합계/카운트를 원장처럼 저장하여 평균을 매 조회 시 계산하는 대안 고려


6) 예외 처리/재시도

  • updatePartialDocument()에서 예외를 로깅만 하고 종료
  • 재시도 전략(재처리 큐, Dead Letter Topic) 또는 알람 체계 연계 필요


7) 스냅샷(op=r) 처리

  • 스냅샷 이벤트는 products의 경우 일반 create/update 경로로 반영
  • 연관 테이블 스냅샷이 먼저 들어오면 불일치 발생 가능


8) 필드 기본값/널 처리

  • 삭제 이벤트에서 materials/basePrice/salePricenull로 설정
  • 검색/정렬 쿼리에서 null에 대한 동작을 명확히 정의 필요(예: 정렬 시 nulls_last, 필터 시 exists 사용)


9) 키(payload key) 미사용

  • event.key를 저장만 하고 사용하지 않음
  • Upsert 시 자연키·중복 탐지 등에 활용 계획이 없다면 불필요한 파싱 생략 가능


10) 다건/배치 처리

  • 현재는 이벤트마다 단건 업데이트/저장(네트워크 왕복)
  • 트래픽이 높다면 벌크 API로 묶어 처리하는 최적화 여지 존재



운영 체크리스트

  • 인덱스 설정(product-index-settings.json)의 Nori 토크나이저/필터가 실제 질의 분석기와 일치하는지 검증
  • 매핑 호환성(배포 간 변경 시 롤링 전략)과 템플릿/ILM 정책 고려
  • 재처리(리플레이) 시 스냅샷/증분 이벤트 순서 보장 전략
  • 모니터링: Kafka 컨슈머 랙, 처리 실패율, ES 업데이트 레이턴시
  • Dead Letter Topic 또는 실패 재시도 백오프 정책



결론

본 코드는 DB 변경을 ES 읽기 모델로 실시간 동기화하는 구현이며, 테이블별로 전체/부분 업데이트가 잘 분리되어 있습니다. 실무 운영 관점에서는 타입 정밀도(가격), 타임스탬프 단위, 문서 미존재 시 정책 일관성, 이벤트 순서/중복에 대한 가드, 재시도/감시 체계 등을 보완하면 안정성과 일관성이 높아질 것입니다.



2. 코드 리뷰: Elasticsearch 로직 부분 2 - Query 모델 동기화 개선 사항

안내

검색/필터/정렬 쿼리 예시와 앞서 언급한 개선 항목의 구체적인 코드/설계 방안을 정리했습니다. 예시는 Spring Data Elasticsearch 기반 Repository/Operations와 순수 Elasticsearch Query DSL 양쪽을 함께 제공합니다.


검색/필터/정렬 — 쿼리 예시 모음

아래 예시에서 인덱스는 products, 문서 타입은 ProductSearchDocument입니다.


1) 기본 키워드 검색(이름/설명) + 상태 필터 + 최신순 정렬

의도: 사용자가 입력한 키워드로 name, shortDescription, fullDescription을 Nori 분석기로 검색, 노출 가능한 상태(ACTIVE)만 표시하고 updatedAt 내림차순 정렬

Elasticsearch Query DSL 예시:

{
  "query": {
    "bool": {
      "must": [
        {
          "multi_match": {
            "query": "겨울 코트",
            "fields": ["name^3", "shortDescription^2", "fullDescription"],
            "type": "best_fields"
          }
        }
      ],
      "filter": [
        { "term": { "status": "ACTIVE" } }
      ]
    }
  },
  "sort": [
    { "updatedAt": { "order": "desc" } }
  ],
  "from": 0,
  "size": 20
}


Spring Data(Operations) 예시:

NativeQuery query = NativeQuery.builder()
    .withQuery(q -> q
        .bool(b -> b
            .must(m -> m.multiMatch(mm -> mm
                .query("겨울 코트")
                .fields("name^3", "shortDescription^2", "fullDescription")
                .type(MultiMatchQueryType.BEST_FIELDS)))
            .filter(f -> f.term(t -> t.field("status").value("ACTIVE")))
        )
    )
    .withSort(s -> s.sort("updatedAt", SortOrder.DESC))
    .withPageable(PageRequest.of(0, 20))
    .build();
SearchHits<ProductSearchDocument> hits = operations.search(query, ProductSearchDocument.class);


2) 카테고리/태그 다중 선택 필터 + 가격 범위 + 판매자 필터

의도: 특정 카테고리들 중 하나 이상 포함, 태그 중 하나 이상 포함, 세일가 기준 범위 필터, 특정 셀러 상품만

{
  "query": {
    "bool": {
      "must": [],
      "filter": [
        { "terms": { "categoryIds": [101, 102, 103] } },
        { "terms": { "tagIds": [7, 9] } },
        { "range": { "salePrice": { "gte": 50000, "lte": 150000 } } },
        { "term": { "sellerId": 88 } },
        { "term": { "status": "ACTIVE" } }
      ]
    }
  },
  "sort": [ { "salePrice": { "order": "asc" } } ],
  "size": 40
}


3) 가용 재고(inStock) 필터 + 평점 상위 정렬 + 보조 정렬(리뷰 수)

의도: 재고 있는 상품만, 평균 평점 내림차순 정렬, 동점 시 리뷰 수 많은 순

{
  "query": {
    "bool": {
      "filter": [
        { "term": { "inStock": true } },
        { "term": { "status": "ACTIVE" } }
      ]
    }
  },
  "sort": [
    { "averageRating": { "order": "desc" } },
    { "reviewCount": { "order": "desc" } }
  ],
  "size": 20
}


4) 오타 허용/유사어 검색(fuzziness)

의도: 키워드에 경미한 오타가 있어도 매칭

{
  "query": {
    "bool": {
      "must": [
        {
          "multi_match": {
            "query": "넥타이",
            "fields": ["name^2", "shortDescription", "fullDescription"],
            "fuzziness": "AUTO"
          }
        }
      ],
      "filter": [ { "term": { "status": "ACTIVE" } } ]
    }
  }
}


5) 신상품 우선 + 가격 보조 정렬

{
  "query": { "term": { "status": "ACTIVE" } },
  "sort": [
    { "createdAt": { "order": "desc" } },
    { "salePrice": { "order": "asc", "missing": "_last" } }
  ]
}


6) Prefix/자동완성(edge_ngram 설정 시)

{
  "query": {
    "bool": {
      "should": [
        { "match_phrase_prefix": { "name": { "query": "니트", "slop": 1 } } },
        { "match_phrase_prefix": { "shortDescription": { "query": "니트" } } }
      ],
      "minimum_should_match": 1,
      "filter": [ { "term": { "status": "ACTIVE" } } ]
    }
  },
  "size": 10
}


개선 항목 — 구체적인 코드/설계 방안

1) 가격 정밀도 개선: Double → scaled_float 또는 long

ES 매핑 변경(권장):

{
  "mappings": {
    "properties": {
      "basePrice": { "type": "scaled_float", "scaling_factor": 100 },
      "salePrice": { "type": "scaled_float", "scaling_factor": 100 }
    }
  }
}
  • Java 모델은 그대로 BigDecimal 유지
  • 저장 시 updates.put("salePrice", bigDecimal);는 그대로 동작(Elasticsearch가 스케일링 처리)
  • 대안: 센트 단위 정수(long)로 저장(priceCents) 후 UI에서 나눠 표시

배포 전략:

  • 매핑 변경은 호환성 이슈가 있으므로 새 인덱스 버전(products_v2) 생성 → reindex → alias 전환


2) 시간 단위 명확화

private Instant parseTimestampToInstant(Object timestamp) {
    if (timestamp == null) return null;
    try {
        if (timestamp instanceof Number) {
            long v = ((Number) timestamp).longValue();
            // 10자리(초), 13자리(밀리초), 16자리(마이크로초) 구분
            if (v < 1_000_000_000_0L) { // < 10자리 → 초로 간주
                return Instant.ofEpochSecond(v);
            } else if (v < 1_000_000_000_000L) { // 12~13자리 → 밀리초
                return Instant.ofEpochMilli(v);
            } else { // 그 이상은 마이크로초 → 밀리초 환산
                return Instant.ofEpochMilli(v / 1000);
            }
        }
        if (timestamp instanceof String) {
            return LocalDateTime.parse((String) timestamp)
                    .atZone(ZoneId.systemDefault()).toInstant();
        }
    } catch (Exception e) {
        log.warn("Failed to parse timestamp: {}", timestamp, e);
    }
    return null;
}


3) 부분 업데이트 일관성: 문서 미존재 시 정책 통일

옵션 A(Upsert 허용): 카테고리/태그/리뷰도 기본값으로 업서트

private void upsertListField(Long productId, String field, List<Long> values) {
    Map<String, Object> updates = new HashMap<>();
    updates.put(field, values != null ? values : new ArrayList<>());
    updatePartialDocument(productId, updates); // docAsUpsert(true)
}

옵션 B(존재할 때만 반영): 모든 핸들러 시작부에서 공통 체크

private Optional<ProductSearchDocument> mustGetExisting(Long productId) {
    Optional<ProductSearchDocument> doc = productSearchRepository.findById(productId);
    if (doc.isEmpty()) log.warn("Doc not found: {}", productId);
    return doc;
}


4) 순서 역전/중복 방지 가드(Idempotency)

private boolean isOutdated(CdcEvent evt, ProductSearchDocument doc) {
    Long evtTs = evt.getTimestamp(); // Debezium ts_ms
    return doc.getLastProcessedTs() != null && evtTs != null && evtTs < doc.getLastProcessedTs();
}
  • 모든 handle*의 초입에 findById()isOutdated() 체크 → outdated면 skip
  • 필드 추가가 필요하므로 문서/매핑 확장(버전 인덱스 권장)


5) 평점 계산 안정화

  • 합계와 카운트를 같이 보관하여 오차 누적 방지
  • ES 문서에 ratingSum(Long) 추가, averageRating = ratingSum / reviewCount는 조회 시 계산
private double round1(double v) { 
    return Math.round(v * 10.0) / 10.0; 
}


6) 예외/재시도 & DLQ

  • @KafkaListenerDefaultErrorHandler 구성 with backoff
  • 반복 실패 시 Dead Letter Topic(product-events-dlt)으로 전송
  • updatePartialDocument() 실패 시 런타임 예외를 던져 컨슈머 에러 핸들러로 전달


7) 스냅샷(op=r) 처리 순서 이슈

  • 스냅샷 배치 시 products를 우선 소비/적재 후 연관 테이블 재생하도록 컨슈머 그룹/파티션 전략 설계
  • 또는 하나의 컨슈머에서 테이블 우선순위를 강제: 이벤트 버퍼링 후 products 문서가 생성되면 연관 이벤트 drain


8) null 필드 처리 정책

  • 정렬 시 missing 옵션 사용: "missing": "_last" 또는 _first
  • 필터 시 존재성 체크: {"exists": {"field": "salePrice"}}와 조합
  • 애플리케이션 계층에서 null-safe 디폴트 제공(예: Price 미기재 시 “가격협의”)


9) 메시지 키 활용

  • event.key에 자연키가 있다면 멱등키로 활용 가능
  • 동일 키/오프셋 조합은 Redis 등으로 단기 캐싱하여 중복 처리 차단
  • 멱등성 토큰 예: String idempotencyKey = evt.getTable()+":"+evt.getKey()+":"+evt.getTimestamp();


10) 벌크 처리 최적화

  • ElasticsearchOperations.bulkUpdate(...) 사용
  • 배치 사이즈를 200~1000 사이로 튜닝
  • Kafka 컨슈머에서 @BatchListener 또는 poll 레벨에서 모아 벌크 업서트 실행


샘플: Spring Data Repository에 검색용 메서드 추가

public interface ProductSearchRepository
    extends ElasticsearchRepository<ProductSearchDocument, Long> {

    Page<ProductSearchDocument> findByStatusAndInStockTrue(
        String status, Pageable pageable);

    Page<ProductSearchDocument> findByStatusAndCategoryIdsIn(
        String status, Collection<Long> categoryIds, Pageable pageable);

    Page<ProductSearchDocument> findByStatusAndSellerId(
        String status, Long sellerId, Pageable pageable);
}
  • 복잡한 검색은 ElasticsearchOperations 또는 Querydsl-style의 커스텀 리포지토리로 분리 권장


인덱스 설정 팁(product-index-settings.json)

한글 검색 품질 개선:

  • decompound_mode: mixed
  • 불용어 사전/동의어 사전 적용(사업 도메인 용어 튜닝)
  • 필요 시 edge_ngram 기반 보조 필드(name.ngram) 추가 후 multi_match에 포함


정렬 전용 필드:

  • keyword 서브필드(name.keyword) 또는 normalizer 적용


마이그레이션/배포 전략 요약

  1. 스키마 변경(가격 타입, 가드 필드 추가 등) → 새 인덱스 products_v2 작성
  2. 백필/리인덱싱(Scroll + Bulk)
  3. 알리아스 전환(productsproducts_v1, products_v2)
  4. 컨슈머 코드 릴리스 후 점진 전환



3. 코드 리뷰: MongoDB 로직 부분 1 - Query 모델 동기화

개요

RDB의 변경 이벤트(CDC)를 받아서, 검색/조회에 최적화된 MongoDB의 ProductDocument(비정규화된 문서)를 최신 상태로 유지하는 동기화 파이프라인입니다.


핵심 컴포넌트:

  • 동기화기: ProductDocumentModelSyncer
  • MongoDB 저장소: ProductDocumentRepository
  • MongoDB 초기화: MongoDBInitService
  • Projection 인터페이스: ProductOptionProjection, OptionGroupWithProductProjection, ProductImageProjection


특징:

  • 변경 이벤트 단위로 부분 업데이트를 복잡하게 계산하지 않고, 정합성이 중요한 트리를 항상 “일관된 스냅샷”으로 재구성
  • RDB에서 필요한 칼럼만 Projection으로 읽기 때문에 엔티티 전체 로딩이나 연관 Lazy 초기화 오버헤드를 회피



동작 흐름(요약)

CDC 이벤트 수신:

  • consumeProductEvents(..), consumeProductOptionEvents(..)에서 Kafka/CDC 메시지를 파싱하여 CdcEvent로 변환
  • 테이블명에 따라 개별 핸들러로 분기


테이블별 처리:

  • products, product_prices, product_details, product_categories, product_tags, reviews, product_option_groups, product_options, product_images 등의 변경을 각각 handle*Event()가 처리
  • 삭제 이벤트는 before 데이터 사용, 생성/수정 이벤트는 after 데이터 사용


문서 갱신:

  • MongoDB에서 productIdProductDocument 조회
  • 필요한 부분(가격/상세/리뷰/카테고리/태그/옵션/이미지 등)을 갱신하여 저장
  • 옵션/이미지 변경은 전체 옵션 트리를 Projection 기반으로 한 번에 재구성하여 문서에 반영: updateProductOptionsUsingProjections(productId)



핵심 포인트 1: 옵션/이미지 전체 재구성 로직

관련 코드: updateProductOptionsUsingProjections(Long productId)

동작 방식:

  • 옵션 변경(handleProductOptionEventWithFullUpdate())이나 이미지 변경(handleProductImageEventWithFullUpdate())이 발생하면 해당 상품의 옵션 그룹/옵션/이미지 트리를 RDB에서 Projection으로 일괄 조회 → 메모리에서 중첩 구조화 → MongoDB 문서 반영


사용되는 Projection들:

  • OptionGroupWithProductProjection: 상품에 속한 옵션 그룹들의 메타(그룹 id, 이름, displayOrder 등) 조회
  • ProductOptionProjection: 각 그룹에 속한 옵션들의 메타(id, name, price, stock, displayOrder 등) 조회
  • ProductImageProjection: 옵션/상품 이미지 메타(id, url, isPrimary, displayOrder 등) 조회


장점:

  • 변경 이벤트 단위로 부분 업데이트를 복잡하게 계산하지 않고, 정합성이 중요한 트리를 항상 “일관된 스냅샷”으로 재구성
  • RDB에서 필요한 칼럼만 Projection으로 읽기 때문에 엔티티 전체를 로딩하거나 연관을 Lazy 초기화하는 오버헤드 회피



핵심 포인트 2: 인덱스 및 MongoDB 문서 구조

ProductDocument:

  • 검색/필터/정렬에 최적화된 비정규화 모델
  • 중첩 클래스(브랜드, 카테고리, 옵션 그룹, 옵션, 이미지, 태그, 평점 요약 등)를 통해 한 번의 조회로 상세 화면 구성 가능


MongoDBInitService:

  • 애플리케이션 기동 시 컬렉션 생성과 인덱스 보장
  • 인덱스 예시: name_idx, status_idx, created_at_idx, rating_avg_idx, price_idx, category_idx, brand_idx, tag_idx
  • 인덱스는 주로 필터/정렬 성능을 높이기 위한 것으로, 실제 조회 패턴과 정합



핵심 포인트 3: ProductDocumentRepository의 ID 타입 통일

  • ProductDocument.@IdLong이므로 레포지토리 제네릭도 Long으로 통일
  • findById(), deleteById() 등에서의 타입 불일치 제거



왜 Projection 인터페이스를 따로 만들었나?

1) 불필요한 로딩 방지(성능 최적화)

  • 인터페이스 기반 Spring Data Projection은 “정말 필요한 칼럼만” SELECT
  • 엔티티를 통째로 가져오면 연관 로딩/프록시 초기화까지 동반될 수 있는데, Projection은 그 오버헤드 제거
  • 옵션/이미지 트리를 자주/대량으로 읽는 동기화 시나리오에서 중요


2) 쓰기 모델과 조회 모델의 분리(클린 아키텍처)

  • 도메인 엔티티는 쓰기 모델(집계/불변 규칙 등)에 초점
  • 조회 목적의 칼럼 나열형 타입을 엔티티에 섞지 않는 것이 깔끔
  • Projection 인터페이스는 “쿼리 결과 전송 전용 타입(읽기 전용 계약)”으로서 엔티티 변경과 독립적으로 진화 가능


3) 쿼리 매핑의 안정성/명시성

  • 네이밍된 getter들(getId(), getName(), getAdditionalPrice(), …)은 쿼리 결과 칼럼과 1:1로 매핑
  • 네이티브/JPQL/QueryDSL 기반 조회에서도 타입 안정성과 가독성 제공
  • 변경 이벤트 핸들러(ProductDocumentModelSyncer) 입장에서는 일관된 인터페이스만 의존하므로 저장소 구현 교체가 용이


4) 사이드 이펙트 차단 및 테스트 용이성

  • Projection은 순수 값 전달에 집중하므로 엔티티의 지연 로딩, 변경 감지, 영속성 컨텍스트 상호작용 등 부수효과가 없음
  • 단위 테스트에서 스텁을 만들기 쉬워 동기화 로직 테스트가 간단


5) 모듈 간 의존 최소화

  • infrastructure.repository.projection 패키지에 두어 도메인/애그리게잇과 느슨하게 결합
  • 조회 모델 변경이 쓰기 모델에 파급되지 않음


정리: Projection 파일을 따로 만든 이유는 “조회 최적화(성능) + 책임 분리(아키텍처) + 타입 안정성(명시적 계약) + 테스트 편의성”을 동시에 얻기 위한 설계 선택입니다.



코드 리뷰(분석 관점 제안)

긍정적 포인트:

  • 이벤트별 핸들러 분리로 가독성 우수
  • 삭제/업서트 등 케이스 분기 명확, get*Value() 유틸로 파싱 일관성 확보
  • 옵션/이미지 변경 시 전체 스냅샷 재구성은 정합성을 간단하게 보장
  • MongoDB 인덱스가 주요 질의 패턴(필터/정렬)에 맞게 정의


개선 제안(참고):

  • 동시성/경합: 동일 상품에 대한 이벤트가 빠르게 연속 도착할 때 마지막 쓰기 승자 보장(예: updatedAt 비교)이나 이벤트 순서 보장 고려
  • 존재하지 않는 문서 처리: 옵션/이미지 이벤트가 먼저 오고 ProductDocument가 아직 생성되지 않은 경우 현재는 경고 로그 후 리턴. “지연 생성(upsert)” 전략 도입 여부 결정 필요
  • 대형 문서 관리: 옵션/이미지 배열이 매우 커질 수 있어 문서 크기 증가에 따른 MongoDB 단일 문서 사이즈(16MB) 한계나 업데이트 비용 점검. 페이로드 분리(옵션만 별도 컬렉션/별도 조회) 전략 평가 가능
  • 인덱스 재검토: name은 텍스트 검색이 목적이면 TextIndex(text) 고려. 현재는 단순 정렬/검색 인덱스. 실제 조회 조건에 맞추어 compound index도 검토
  • 타입 일관성: 금액(BigDecimal), 시간(LocalDateTime) 등 직렬화/역직렬화 포맷을 운영 환경과 동일하게 검증
  • 예외/로그: 일부 경고 로그가 빈번히 발생할 수 있으므로 샘플링이나 레벨 조정 고려



결론

이 구조는 “RDB의 변경을 CDC로 받고, 조회 최적화된 MongoDB 문서를 최신 상태로 유지”하는 전형적인 CQRS/Read Model 동기화 패턴을 잘 따르고 있습니다. Projection 인터페이스를 별도 파일로 둔 이유는 필요한 칼럼만 읽는 가벼운 조회 모델로 성능을 확보하고, 쓰기 모델(엔티티)와 관심사를 분리하며, 타입 안정성과 테스트 용이성을 얻기 위함입니다.



4. Projection 방식 비교 - Interface 기반 vs QueryDSL Constructor

개요

ProductDocumentModelSyncer에서 사용한 인터페이스 기반 Projection(ProductOptionProjection, OptionGroupWithProductProjection, ProductImageProjection)과 Projections.constructor(...)는 둘 다 Projection(투영)이라는 큰 범주에 속하지만, 사용하는 층위와 목적이 다릅니다.


간단 요약:

  • 인터페이스 기반 Projection = Spring Data JPA가 리포지토리 메서드 반환 타입으로 “필드 서브셋 인터페이스”를 직접 채워주는 방식(저장소 계층에서 가볍게 읽기)
  • Projections.constructor(DTO.class, ...) = QueryDSL에서 선택한 식들을 특정 DTO 생성자에 매핑하는 방식(쿼리 DSL 레벨에서 DTO로 즉시 매핑)



상세 비교

1) 사용 위치/도구

인터페이스 기반 Projection:

  • 도구: Spring Data JPA
  • 위치: Repository 메서드 시그니처 반환 타입
    • 예: List<ProductOptionProjection> findOptionsByOptionGroupId(Long id)
  • 효과: 필요한 칼럼만 SELECT. JPA가 결과를 인터페이스의 getter로 매핑하여 즉시 사용 가능


Projections.constructor(...):

  • 도구: QueryDSL
  • 위치: JPQL/QueryDSL 쿼리 작성부
    • 예: queryFactory.select(Projections.constructor(ProductSearchResponse.class, ...))
  • 효과: 쿼리 결과를 지정한 DTO 생성자 호출로 바로 매핑. 여러 테이블 조인/서브쿼리/그룹바이 등 복잡한 선택을 DTO로 변환


2) 주된 목적/시나리오

인터페이스 기반 Projection:

  • 목적: “특정 엔티티/집합에서 필요한 칼럼만 가볍게” 읽기
  • 엔티티 전체, 연관 로딩 없이 값만 빠르게 가져와서 상위 계층에 전달
  • 본문 사례: 옵션/옵션그룹/이미지를 “필요한 칼럼만” 일괄 조회하여 중첩 구조를 조립한 뒤 MongoDB 문서를 업데이트(정합성 + 성능 최적화)


Projections.constructor(...):

  • 목적: 검색 API 같은 복합 조회에서 조인/집계/서브쿼리 결과를 “즉시 DTO”로 만들기
  • 컨트롤러 응답 모델에 그대로 맞추는 데 유리
  • 본문 사례: ProductRepositoryImpl.findProductsByConditions()에서 검색 결과를 ProductSearchResponse로 바로 투영(대표 이미지 서브쿼리, 평균 평점 집계 등 포함)


3) 의존성/결합도

인터페이스 기반 Projection:

  • 저장소 메서드를 통해 값 목록을 가져오고, 서비스/동기화 계층에서 별도 조립
  • 저장소와 상위 계층 간 계약이 인터페이스로 단순


Projections.constructor(...):

  • 쿼리 빌드와 DTO 매핑이 한 지점에 모여 있어 쿼리 변경이 DTO 구조와 직접 연결
  • 컨트롤러/서비스 응답 DTO가 쿼리와 밀접



왜 두 방식을 각각 그 자리에 썼나?


MongoDB 동기화(ProductDocumentModelSyncer):

  • “깊은 하위 구조의 정합성 있는 스냅샷”을 빠르게 만들기 위해 엔티티 전부 대신 필요한 칼럼만 인터페이스 Projection으로 여러 번(그룹/옵션/이미지) 읽어와 메모리에서 중첩 구조로 조립 → MongoDB에 반영
  • 장점: 가벼움(필드 서브셋), 부수효과 없음(JPA 지연 로딩 회피), 테스트 용이


검색 API(ProductRepositoryImpl):

  • 한 번의 복합 쿼리로 화면에 필요한 ProductSearchResponse를 완성하려고 QueryDSL의 Projections.constructor()로 DTO를 즉시 생성
  • 장점: 불필요한 중간 오브젝트 없이 바로 응답 모델 생성, 조인/그룹/서브쿼리 표현 유연



실무 선택 가이드

  • 화면/응답 DTO로 바로 내려보낼 “완성된 결과”가 필요 → QueryDSL Projections.constructor()(또는 fields/bean) 사용
  • 도메인/동기화 로직에서 “필드 서브셋만 여러 번 읽어와 조립” → 인터페이스 Projection 사용
  • 성능 병목이 명확하고 DB 고유 최적화가 필요 → 네이티브 쿼리 고려(측정 기반 판단)



현재 코드베이스 맥락에 적용

  • 검색 API(ProductRepositoryImpl): 한 방에 DTO(ProductSearchResponse) 필요 → Projections.constructor(...) 선택이 적절
  • MongoDB 동기화(ProductDocumentModelSyncer): 깊은 하위 구조를 값만 가볍게 여러 번 읽어 조립 → 인터페이스 Projection(OptionGroupWithProductProjection, ProductOptionProjection, ProductImageProjection)이 적절



한 줄 결론

“바로 응답 DTO 완성”이면 Projections.constructor(), “서비스/동기화 레이어에서 추가 조립/비즈니스 처리”면 인터페이스 Projection, “특수 성능 최적화가 꼭 필요”하면 네이티브 쿼리를 사용합니다.



5. MongoDB 동기화에서 여러 번 읽기를 수행하는 이유

“MongoDB 문서를 최신 상태로 동기화”에서 여러 번 읽는 이유


1) 변경 단위가 서로 다른 테이블에서 발생하기 때문

  • 이벤트는 products, product_option_groups, product_options, product_images 등 다양한 테이블에서 따로 도착
  • 특정 이벤트(예: 옵션 변경)만으로는 “상품 전체 옵션 트리”를 완성하지 못하므로, 해당 시점의 일관된 스냅샷을 만들기 위해 그룹/옵션/이미지를 각각 조회하여 조립


2) 일관성 있는 스냅샷을 만들기 위해

  • 부분 업데이트(바뀐 필드만 패치) 방식은 이전 상태 의존성이 생겨 누락/불일치 위험 존재
  • “해당 상품의 옵션 관련 전체 데이터”를 매번 슬림하게(Projection) 다시 읽어, 정합성 있는 트리로 재구성 → MongoDB 문서에 반영


3) 성능-정합성 트레이드오프의 균형

  • 한 번의 거대한 조인으로 모두 가져오는 대신, 역할별로 가벼운 Projection을 여러 번 호출
  • 각 호출은 필요한 칼럼만 SELECT하여 I/O를 줄임
  • 조인 폭발/중복 로우 처리 부담을 낮추면서도 최종적으로는 완전한 중첩 구조를 획득


4) 캐시/선행 생성이 보장되지 않기 때문

  • 이미지 이벤트가 먼저 오고, 옵션/옵션그룹/상품 문서가 아직 준비되지 않았을 수 있음
  • 이 경우에도 최신 스냅샷을 만들려면 관련 소스들을 다시 확인 필요


5) 변경 전/후 의존성 제거(부수효과 최소화)

  • 인터페이스 Projection으로 “값만” 읽어오면 JPA 지연 로딩/영속성 컨텍스트 부수효과를 회피
  • 다단 조회를 해도 안정적



요약

이벤트가 부분적으로 오고(테이블별), 문서에 반영할 때는 ‘완성된 일관 스냅샷’이 필요하기 때문에 그룹/옵션/이미지를 “필요한 칼럼만” 각각 Projection으로 여러 번 읽어 모아 조립합니다. 이렇게 해야 MongoDB의 비정규화 문서를 항상 최신·정합 상태로 유지할 수 있습니다.