08. 정확히 한번의 의미구조

Last updated - 2025년 07월 10일 Edit Source

7장에서 신뢰성 있게 “최소 한 번"의 데이터 전달에 초점을 맞췄다면, 8장에서는 “정확히 한 번"에 초점을 맞춘다. 결과만 보는 입장에서 특정 이벤트가 중복되게 쓰여져 평균값이 잘못 계산된 것을 알아내는 것은 매우 힘들 것이다. 그래서 “정확히 한 번"의 의미 구조(exactly-once semantics)가 필요하다.

  • 멱등적 프로듀서 : 프로듀서 재시도로 인해 발생하는 중복 방지
  • 트랜잭션 의미 구조 : 스트림 처리 애플리케이션에서 “정확히 한 번” 처리 보장

멱등적 프로듀서와 트랜잭션 의미 구조의 조합으로 “정확히 한 번"의 의미구조를 알아보자.



# 멱등적 프로듀서

멱등적(idempotent) : 동일한 작업을 여러 번 실행해도 한 번 실행한 것과 결과가 동일

  • e.g. UPDATE t SET x=x+1 WHERE y=5 → 비멱등
  • e.g. UPDATE t SET w=18 WHERE y=5 → 멱등

카프카 프로듀서에서 멱등성 의미 구조가 필요한 이유

  • 재고처리, 재무재표, 중복 배송 등 여러 예시가 존재

시나리오 예시

  • (1) 파티션 리더가 프로듀서로부터 레코드를 받아서 팔로워들에게 성공적으로 복제
  • (2) 프로듀서에게 응답 보내기 전, 파티션 리더가 있는 브로커에 크래시 발생
  • (3) 프로듀서 입장에서는 응답을 받지 못한 채 타임아웃 발생 → 따라서, 메세지 재전송
  • (4) 재전송된 메세지가 새로운 리더에 도착. 하지만 이 메세지는 이미 저장되어 있음 (중복 발생)

카프카의 멱등적 프로듀서 기능

  • 자동으로 중복 탐지하고 처리하여 문제 해결
  • 프로듀서 자체의 재시도 메커니즘
  • 아래의 경우에 대한 중복 방지 가능 (애플리케이션에서 동일한 메세지 2번 쓰는건 막을 수 ❌)
    • 프로듀서 에러로 인한 중복
    • 네트워크 에러로 인한 중복
    • 브로커 에러로 인한 중복



# 멱등적 프로듀서의 작동 원리

멱등적 프로듀서 기능 ON → 모든 메세지는 고유한 식별자를 가짐

  • 고유한 식별자는 아래 2개를 합친 것
    • 고유한 프로듀서 ID (PID) : 프로듀서가 시작될 때 브로커로부터 할당받는 고유 ID (long 타입)
    • 시퀀스 넘버 (SID) : 특정 토픽-파티션으로 전송되는 각 메시지 배치에 대해 0부터 시작하여 1씩 증가하는 번호 (integer 타입)
  • 브로커는 각 PID토픽-파티션 조합별로 마지막으로 성공 처리한 시퀀스 넘버를 기억
  • 다음 메시지의 시퀀스 넘버가 (마지막 성공 시퀀스 넘버 + 1)일 때만 정상 처리



각 브로커 → 브로커에 할당된 모든 파티션들에 쓰여진 마지막 5개 메세지들을 추적할 때 고유 식별자 사용

  • max.in.flights.requests.per.connection
    • 사전적 의미 : 하나의 커넥션에서 응답(ack)을 받지 않고 연속으로 보낼 수 있는 request의 최대 갯수
    • 인프런 권철민님이 해석한 의미 : 하나의 커넥션에서 응답(ack)을 받지 않은 상태로 동시에 보낼 수 있는 최대 메세지 배치의 개수
    • 파티션 별 추적되어야하는 SID의 수를 제한하고 싶을 때 설정 가능

요청? 메세지 배치? (출처 : 인프런 권철민님 답변)

  • 사전적 kafka 파라미터의 request → 배치단위
    • 초창기 카프카는 배치 단위가 전송 단위였음
  • 성능 개선을 위해 점점 여러 개의 배치들을 한꺼번에 보낼 수 있게 기능이 개선됨
    • 위와 같은 설정값 나옴



프로듀서가 중복 메세지 받은 경우

  • 적절한 에러 발생시킴 → 중복 메세지 거부
    • 해당 에러는 프로듀서에 로깅됨
    • 지푯값에도 반영됨
  • 그러나 예외가 발생하는 것은 아니라 사용자에게 경보를 보내지 않음
    • 프로듀서 클라이언트에서는 record-error-rate 지푯값 확인하여 에러 확인 가능
    • 브로커의 경우 RequestMetrics 타입의 ErrorsPerSec 지푯값에 기록
      • RequestMetrics에는 유형별 에러 수가 기록됨



# 시나리오 예시 01

일시적이고 일반적인 순서가 바뀐 경우

  • 가장 흔하게 Out Of Order Sequence Number 에러를 마주하는 상황
  • 원인
    • 프로듀서의 max.in.flight.requests.per.connection 설정이 1보다 크기 때문 (defaut : 5)
    • 설정이 1보다 크면, 이전 요청의 응답(ack)을 기다리지 않고 다음 메세지 배치를 미리 보냄
  • 상황
    • (1) 프로듀서가 3개의 메세지 배치를 동시에 전송
      • 배치 A : SID = 3
      • 배치 B : SID = 4
      • 배치 C : SID = 5
    • (2) 그런데 일시적인 네트워크 지연 / 재연결 등의 이유로 브로커에 ACB 순서로 도착
    • (3) 브로커가 예상한 SID와 다름
    • (4) 브로커는 SID 3이후 4를 기대하고 있었는데, 5를 받았으므로 배치 C를 거절하며, 프로듀서에게 Out Of Order Sequence Number 에러 발생시킴
    • (5) 그 사이 배치 B가 브로커에 도착하여 정상 처리됨
    • (6) 이후 프로듀서가 재전송한 배치 C를 받아서 정상 처리
  • 결과
    • 멱등적 프로듀서 기능으로 인해 모든 메세지는 유실 없이 순서대로 정확히 한 번 저장됨



트랜잭션 기능 없이 멱등적 프로듀서 기능만 사용하고 있는 경우 Out Of Order Sequence Number 에러 무시 가능

  • 정확히는 트랜잭션 기능을 안쓴다 = 메세지 순서 보장이 핵심이 아니다 같은 상황을 의미
  • 애플리케이션 레벨에서 별도의 복구 로직 수행하거나 프로그램 중단시킬 필요 없다는 의미
  • 프로듀서의 내부 재시도 메커니즘이 이 상황을 자동으로 해결해주기 때문
    • 프로듀서는 해당 에러를 재시도 가능한 (retriable) 에러임을 이미 알고 있음
    • 위의 상황 같은 경우 에러를 받은 메세지 배치 B에 대한 전송을 자동으로 재시도 함
  • 이 에러는 일시적인 순서 뒤바뀜으로 인해 발생하지만, 정확히 한 번만(exactly-once) 전송하는 것을 보장



그러면, 트랜잭션 사용 시 왜 무시하면 안되는데?

  • 트랜잭션은 여러 메시지 전송을 하나의 원자적인(atomic) 단위로 묶으니, 전부 성공하거나 전부 실패해야하는 원자성을 보장하려면 메세지 순서를 절대적으로 지켜야함
  • 트랜잭션의 순서가 깨졌다는 것 → 데이터 정합성이 깨질 수 있는 critical한 문제
  • 이 경우 카프카는 프로듀서를 신뢰할 수 없다고 판단하여 펜싱(Fencing) 시켜버림
    • 결과 : 프로듀서는 ProducerFencedException 같은 복구 불가능한 에러 수신함
    • 조치 : 애플리케이션에서 producer.abortTransaction() 호출로 트랜잭션 중단시키고 해당 프로듀서 인스턴스 종료 후 새로운 인스턴스 생성해야함



만약, 트랜잭션 사용 시 브로커가 이 상황을 무시하고 프로듀서의 재시도를 허용하면 어떻게 되는데?

  • (1) 좀비 프로듀서 출현
    • 네트워크 문제로 잠시 단절되었던 이전 프로듀서(좀비 프로듀서)가 뒤늦게 트랜잭션의 일부 메시지를 커밋하려고 시도
  • (2) 트랜잭션 파기
    • 새로운 프로듀서는 이전 트랜잭션이 실패했다고 판단하고 abort를 보냈는데, 좀비 프로듀서가 commit을 보내면 트랜잭션의 원자성이 깨짐 (일부는 실패, 일부는 성공)
  • 이렇게 정합성 깨지는 것을 막기 위해 펜싱시키는 것
    • “이 프로듀서는 신뢰할 수 없다. 데이터 정합성을 위해 이 프로듀서가 보내는 모든 요청을 차단(Fencing)하고 트랜잭션을 강제 실패시킨다.”



근데, 위 시나리오대로면 멱등적 프로듀서 기능만 켜도 메세지 순서 보장 되는거 아닌가?

  • 단일 프로듀서 세션 내에서, 동일한 토픽-파티션에 대해서만 순서가 보장됨
  • 파티션 간 순서는 보장 ❌
    • 프로듀서가 메시지 A를 1번 파티션에 보내고, 메시지 B를 2번 파티션에 보낼 때, A가 B보다 먼저 브로커에 도착한다고 보장 ❌
  • 프로듀서가 재시작되면 순서가 깨질 수 있음
    • 프로듀서 애플리케이션이 재시작되면 새로운 PID를 할당받기 때문에 이전 세션과의 순서 연속성은 보장 ❌
  • Out Of Order Sequence Number 에러가 발생해도, 하나의 파티션으로 가는 길이 잠시 꼬인 것뿐이므로, 프로듀서가 내부적으로 재시도하여 결국 순서가 보장되는 것은 맞음



# 시나리오 예시 02

심각한 문제로 인한 메세지 유실 (책에서 나온 예시)

  • Out Of Order Sequence Number 에러 발생했는데도 프로듀서가 정상 작동하는 경우
  • 즉, 프로듀서와 브로커 사이에 메세지 유실이 있었다는 의미
  • 원인
    • 단순한 네트워크 지연 ❌
    • 의심 포인트 01 : 언클린 리더 선출 (Unclean Leader Election)
    • 의심 포인트 02 : 심각한 수준의 지속적인 네트워크 문제
    • 의심 포인트 03 : 프로듀서/브로커 비정상 동작 (설정값 확인 필요)
  • 상황
    • 브로커가 2번 메시지 뒤에 갑자기 27번 메시지 받음
    • 시나리오 1처럼 몇 개의 배치가 순서만 바뀐 수준이 아니라, 중간 메시지들이 유실되었을 가능성이 매우 높은 상황
  • 언클린 리더 선출 시나리오
    • (1) 프로듀서가 1번 파티션의 리더 브로커 A에게 메세지 3~26까지 정상적으로 보냄
    • (2) 이 메세지들이 팔로워 브로커 B에 복제되기 전에 리더 브로커 A가 크래시
    • (3) 만약 , unclean.leader.election.enable=true 설정되어있으면, 뒤쳐져 있던 팔로워 브로커 B가 새로운 리더로 선출됨 (데이터 유실 감수하고 가용성 택한 경우니까)
    • (4) 팔로워 브로커 B메세지를 2까지만 가지고 있음
    • (5) 프로듀서는 리더가 바뀐 것을 인지하고 새로운 리더 브로커 B에게 다음 메세지인 27을 보냄
    • (6) 새로운 리더 브로커 B메세지 2 다음인 3을 기대하고 있었는데 메세지 27을 받았으니까 프로듀서에게 out of order sequence number 에러 발생시킴
    • (7) 프로듀서는 이미 브로커 A에게 저장했다고 알고 있으니, 재전송할 메세지 없음 → 메세지 3~26 영구적으로 유실



그러면 어떻게 대응해야할까 ? → 에러 로그에 찍힌 시퀀스 번호의 간격(GAP) 확인하고 대응

  • 간격이 작고 비정기적으로 발생
    • e.g. 기대한 SID는 2인데, 3을 받은 경우
      • 시나리오 예시 01일 가능성 높음
      • max.in.flight.requests.per.connection 설정으로 인한 자연스러운 현상
  • 간격이 매우 크고 지속적으로 발생
    • e.g. 기대한 SID는 3인데, 27을 받은 경우
      • 시나리오 예시 02일 가능성 높음
      • unclean.leader.election.enable : 브로커 설정에서 해당 값이 false인지 확인
        • 가용성보다 데이터 정합성이 중요하면 true가 아닌 false로 해야지
      • acks=all / acks=-1 : 프로듀서가 모든 ISR(인-싱크 레플리카)의 확인을 받고 메세지 전송을 완료하도록 설정하면 데이터 정합성 보장되니까 !!



# 프로듀서 재시작

일반적인 경우 : 프로듀서 장애 발생 → 새 프로듀서 생성하여 대체

  • 사람이 직접 장비 재시작
  • 쿠버네티스의 자동 장애 복구 기능 활용 등

프로듀서가 새로 시작될 때 + 멱등적 프로듀서 기능 ON

  • 초기화 과정에서 카프카 브로커로부터 프로듀서 ID를 생성받음

프로듀서가 새로 시작될 때 + 트랜잭션 기능 OFF

  • 초기화 과정에서 완전히 새로운 ID가 생성됨
  • 새 프로듀서가 전송하는 메세지 = 장애났던 기존 프로듀서가 전송했던 메세지인 경우 브로커는 메세지에 중복이 발생했음을 알아차리지 못함
    • → 두 메세지가 서로 다른 프로듀서 ID, 서로 다른 시퀀스 넘저를 가지니까 서로 다른것으로 취급됨
    • → 서로 다른 ID를 가짐 → 서로 다른 프로듀서로 판단 → 좀비로 취급 ❌ → 메세지 중복 발생 → 근데 브로커는 알아차리지 ❌



# 브로커 장애

일반적인 경우 : 브로커 장애 발생 → 컨트롤러는 장애가 난 브로커가 리더를 맡고 있던 파티션들에 대해 새 리더 선출


시나리오 예시

  • (1) 토픽 A - 파티션 0에 메세지를 쓰는 프로듀서가 있다고 가정
  • (2) 파티션 0의 리더 레플리카는 브로커 5에 위치, 팔로워 레플리카는 브로커 3에 위치
  • (3) 브로커 5에 장애 발생하면 브로커 3이 새로운 리더로 선출됨
  • (4) 프로듀서는 메타데이터 프로토콜을 통해 브로커 3이 새로운 리더임을 알아차리고 거기로 메세지 씀
  • (5) 브로커 3은 어느 SID까지 쓰여졌는지 어떻게 알고 다음 SID가 오기를 기대할까?

새로 선출된 리더 브로커가 SID에 대해 알고 있는 이유

  • (1) 리더는 새 메세지가 쓰여질 때마다 인 메모리 프로듀서 상태에 저장된 최근 5개의 SID 업데이트
  • (2) 팔로워 레플리카는 리더로부터 새로운 메세지 복제할 때마다 자체 인-메모리 버퍼 업데이트
  • (3) 팔로워가 리더로 선출된 시점에는 이미 메모리 안에 최근 SID 5개 가지고 있는 것

예전 리더 돌아오면 어떻게 함? 재시작하면 인-메모리 프로듀서 상태는 메모리니까 다 날라갔잖아

  • 복구 과정에 도움되도록 브로커는 종료되거나, 새로운 세그먼트가 생성될 때마다 프로듀서 상태에 대한 스냅샷을 파일 형태로 저장
  • (1) 브로커 시작 → 파일에서 최신 상태 read
  • (2) 현재 리더로부터 복제한 레코드 사용하여 프로듀서 상태 업데이트 → 최신 상태 복구
  • (3) 해당 브로커가 리더 맡을 때 쯤 최신 SID 가지기 완료

브로커에 크래시나서 최신 스냅샷 업데이트 안되면?

  • PID, SID는 카프카 로그에 저장되는 메세지 형식의 일부
  • 크래시 복구 작업 진행하는 동안 → 프로듀서 상태는 더 오래된 스냅샷, 각 파티션 최신 세그먼트의 메세지들 등등 사용하여 복구됨



# 한계

프로듀서의 내부 로직으로 인한 재시도가 발생할 경우 생기는 중복만 방지

  • e.g. 애플리케이션에서 동일한 메세지로 producer.send() 두 번 호출 → 이런거는 못막음 !!!
  • e.g. 여러 인스턴스에 존재하는 프로듀서들 중 2개가 동일한 메세지 전송 시도 → 이런거 중복 못걸러냄 !!!



# 멱등적 프로듀서 사용법

프로듀서 설정에 enable.idempotence=true 추가

  • kafka 3.0 이후 (default : true)
  • 프로듀서 설정에 acks=all / acks=-1 이미 있으면 성능적으로 별 차이 ❌

멱등적 프로듀서 기능 키면 바뀌는 것들

  • PID 받아오기 위해 프로듀서 시동 과정에서 API 하나 더 호출
  • 전송되는 각각의 레코드 배치에는 PID와 배치 내 첫 메세지의 SID 포함됨
    • PID, SID는 각 메세지 배치에 96 비트 추가 (pid - long, sid - integer)
  • 브로커들은 모든 프로듀서 인스턴스에서 들어온 레코드 배치의 SID 검증하여 메세지 중복 방지
  • 장애 발생해도 각 파티션에 쓰여지는 메세지들의 순서는 보장됨



# 트랜잭션

트랜잭션 기능 : 카프카 스트림즈를 사용해서 개발된 애플리케이션에 정확성을 보장하기 위해 도입됨

  • 스트림 처리 애플리케이션이 정확한 결과 산출 위해, 각 입력 레코드는 정확히 한 번만 처리되어야함
    • 각 입력 레코드의 처리 → 애플리케이션 내부 상태 업데이트되고 결과가 출력 토픽에 성공적으로 쓰여졌을 때에야 완료로 간주
  • 그 처리 결과 역시 장애 상황에서도 정확히 한 번만 반영되어야함

트랜잭션 기능이 추가된 이유?

  • 다수의 파티션에 대한 원자적 쓰기 기능을 제공
  • 스트림 처리 애플리케이션에서 좀비 프로듀서 방지를 위한 목적

스트림 처리 애플리케이션의 기본 패턴 : 읽기 → 처리 → 쓰기

  • 여기에서 쓰이기 위해 트랜잭션 기능 개발됨



# 활용사례

정확성이 중요한 스트림 처리 애플리케이션에서 트랜잭션 기능은 크게 도움됨

  • e.g. 스트림 처리 로직에 집적이나 조인이 포함된 경우

(1) 개별 레코드 변환, 필터 정도만 수행하는 경우

  • 업데이트 할 상태가 없음
  • 처리 과정에서 중복 발생해도 그냥 출력 스트림으로 걸러내면됨 → so eazy

(2) 다수의 레코드 집적 → 하나로 만드는 경우

  • 결과 레코드 잘못되었는지 판단하기 so hard
  • 여러 개의 입력 레코드가 1번 이상 처리되었을 수 있음
  • 주어진 입력 다시 처리하지 않으면 결과 못바꿈

금융 애플리케이션 → “정확히 한 번” 기능이 정확한 집적 결과를 보장하는데 쓰이는, 복잡한 스트림 처리 애플리케이션의 전형적인 예시

  • 카프카 스트림즈 애플리케이션이 정확히 한 번 보장하는거 설정하는거 진짜 쉬움 → 그래서 챗봇 같이 흔한 활용 사례에서도 기능 사용되는거 자주 보임



# 해결하는 문제

e.g. 단순 스트림 처리 애플리케이션

  • (1) 원본 토픽으로부터 이벤트 read
  • (2) 처리
  • (3) 결과를 다른 토픽에 write

“정확히 한 번"만 결과가 write 되기 원하는데, 잘못될 수 있는 시나리오


01. 애플리케이션 크래시로 인한 재처리

  • 원본 클러스터로부터 메세지를 읽어서 처리한 뒤, 애플리케이션이 해야할 일
    • (1) 결과를 출력 토픽에 write
    • (2) 읽어온 메세지의 오프셋을 커밋
  • 만약, (1) 진행 완료하고 (2) 하다가 크래시 났다고 가정 → 컨슈머가 크래시 난 경우
    • 몇 초 이후 하트비트 끊어지면서 리밸런스 발생
    • 컨슈머가 읽어오고 있던 파티션들 → 다른 컨슈머로 재할당
    • 컨슈머는 새로 할당된 파티션의 마지막으로 커밋된 오프셋으로부터 레코드 읽어오기 시작
    • 마지막으로 커밋된 오프셋 ~ 크래시 난 시점까지
      • 애플리케이션에 의해 처리된 모든 레코드들은 다시 처리됨
      • 결과 역시 출력 토픽에 다시 write → 중복 발생

02. 좀비 애플리케이션에 의해 발생되는 재처리

  • 애플리케이션이 카프카로부터 레코드 배치 읽어온 직후, 처리 전에 멈추거나 카프카랑 연결 끊어지면?
    • 하트비트 끊어지면서 애플리케이션 죽은 것으로 간주
    • 해당 컨슈머에 할당되어 있던 파티션들 → 컨슈머 그룹 내 다른 컨슈머들에게 재할당
    • 파티션 재할당받은 컨슈머 → 레코드 배치 다시 읽어서 처리 + 출력 토픽에 결과 write
  • 이때 멈췄던 애플리케이션의 첫 번째 인스턴스 다시 작동 !!
    • 마지막으로 읽어 왔던 레코드 배치 처리 + 출력 토픽에 결과 write
    • 레코드 읽으려고 새로 카프카 폴링
    • 하트비트를 보내는데, 자기가 죽었던 것으로 판정된거 알아차릴 때까지 이 작업 계속함
  • 좀비 : 스스로가 죽은 상태인지 모르는 컨슈머
    • 즉, 추가적인 보장 없으면 좀비가 출력 토픽으로 데이터를 write 할 수 있으니까 데이터 중복 발생



# 트랜잭션의 정확히 한 번

결국 원하는건 부분적인 결과 (오프셋은 커밋 + 토픽에 결과 write 안됨 or 반대 경우) 가 발생하지 않을 거라는 보장이 필요 → 카프카 트랜잭션은 원자적 다수 파티션 쓰기 (atomic multipartition write) 기능 도입


카프카 트랜잭션은 원자적 다수 파티션 쓰기 (atomic multipartition write) 기능

  • 오프셋 커밋 + 결과 write 모두 파티션에 메세지를 쓰는 과정이 따른다는 점에서 착안
    • 오프셋 → _consumer_offsets 토픽
    • 결과 → 출력 토픽
  • 트랜잭션 시작 ~ 양쪽에 메세지 ~ 둘다 성공해서 커밋
  • 트랜잭션 시작 ~ 양쪽에 메세지 ~ 둘다 재시도 위해 중단
  • 이후 “정확히 한 번” 의미 구조가 알아서 해결

원자적 다수 파티션 쓰기 하려면 트랜잭션적 프로듀서 사용 🟢

  • transactional.id 설정 🟢
    • 카프카 브로커에 의해 자동으로 생성되는 producer.id와는 달리 transactional.id는 프로듀서 설정의 일부, 재시작해도 값 유지됨
    • 주 용도가 재시작 이후에도 동일한 프로듀서 식별하는 것
  • initTransactions() 호출해서 초기화해줌
  • 카프카 브로커
    • (1) transactional.id → producer.id 대응관계 유지
    • (2) 이미 있는 transactional.id 프로듀서가 initTransactions() 다시 호출
    • (3) 새로운 랜덤값이 아닌 이전에 쓰던 producere.id 할당해줌

애플리케이션의 좀비 인스턴스가 중복 프로듀서 생성하는 것 방지하려면 좀비 펜싱 (zombie fencing) 하자

  • 에포크(epoch) : 가장 일반적인 좀비 펜싱 방법
    • 트랜잭션 프로듀서가 초기화를 위해 initTransaction() 호출 → transactional.id에 해당하는 에포크 값 증가시킴
    • 같은 transactional.id 가지지만 에포크 값은 낮은 프로듀서 → 메세지 전송, 트랜잭션 커밋, 트랜잭션 중단 요청 → FencedProducer 에러 발생하면서 거부
    • 이렇게 오래된 프로듀서 → 출력 스트림 쓰는 것 불가능 + close() 호출해서 닫는거 말고 방법 ❌
    • 좀비가 중복 레코드 쓰는 것 불가능 !!

apache-kafka-2.5.0 이후부터는 트랜잭션 메타데이터에 컨슈머 그룹 메타데이터 추가할 수 있는 옵션 생김

  • 해당 메타데이터 역시 펜싱에 사용됨
  • 좀비 인스턴스 펜싱하면서 서로 다른 transaction.id 갖는 프로듀서들이 같은 파티션들에 레코드를 쓸 수 있게 됨

트랜잭션은 대부분 프로듀서 쪽 기능

  • 트랜잭션 프로듀서 생성
  • 트랜잭션 시작
  • 다수의 파티션에 레코드 write
  • 이미 처리된 레코드 표시 위해 오프셋 write
  • 트랜잭션 커밋 or 중단
  • 이 모든 작업이 프로듀서에서 이루어짐

트랜잭션 기능으로 write 된 레코드 → 중단된 트랜잭션이라도 다른 레코드들처럼 파티션에 write 됨

  • 컨슈머에 올바른 격리 수준 설정 안하면 “정확히 한 번” 보장 X

isolation.level

  • 트랜잭션 기능을 이용하여 write 된 메세지들 읽어오는 방식 제어 가능
  • isolation.level=read_committed
    • 토픽들을 구독한 뒤 consumer.poll() 호출하면 아래 경우의 메세지만 리턴
      • 커밋된 트랜잭션에 속한 메세지 리턴
      • 처음부터 트랜잭션에 속하지 않는 메세지 리턴
      • (즉, 중단되거나 진행중인 트랜잭션에 속한 메세지는 리턴 ❌)
    • read_committed라고 특정 트랜잭션에 속한 모든 메세지가 리턴된다고 보장되는건 ❌
      • 트랜잭션에 속한 토픽의 일부만 구독했으니까 일부 메세지만 리턴 받을 수 있는 것 🟢
  • isolation.level=read_uncommitted (default)
    • 토픽들을 구독한 뒤 consumer.poll() 호출하면 아래 경우의 메세지 리턴
      • 진행중인 트랜잭션에 속한 메세지
      • 중단된 트랜잭션에 속한 메세지
      • 모든 레코드 리턴
  • 트랜잭션이 언제 시작되고 끝날지, 어느 메세지가 어느 트랜잭션에 속하는지 애플리케이션은 알 수 ❌

  • read_committed 모드
    • 메세지의 읽기 순서 보장 위해 → 아직 진행중인 트랜잭션이 처음으로 시작된 지점(Last Stable Offset, LSO) 이후에 쓰여진 메세지는 리턴 ❌
    • 트랜잭션이 프로듀서에 의해 커밋되거나 중단될 때까지 보류
    • transaction.timeout.ms 설정값 (default : 15분)만큼 시간이 지나서 브로커가 트랜잭션 중단시킬 때까지 보류
    • 트랜잭션이 오랫동안 닫히지 않음 → 컨슈머 지체 → 종단 지연 길어짐

스트림 처리 애플리케이션은 입력 토픽이 트랜잭션 없이 쓰여졌을 경우에도 “정확히 한 번” 출력을 보장

  • 원자적 다수 파티션 쓰기 기능
    • 만약 출력 레코드가 출력 토픽에 커밋되었을 경우, 입력 레코드의 오프셋 역시 해당 컨슈머에 대해 커밋되는 것을 보장
    • 결과적으로 입력 레코드는 다시 처리되지 ❌



# 해결할 수 없는 문제

카프카 트랜잭션 기능과 관련하여 자주 하는 실수 2가지

  • (1) “정확히 한 번 보장"이 카프카에 대한 쓰기 이외의 작동에서도 보장된다고 착각하는 것
  • (2) 컨슈머가 항상 전체 트랜잭션을 읽어 온다고 (트랜잭션 간의 경계에 대해 알고 있다고) 가정하는 것

아래는 트랜잭션 기능이 “정확히 한 번” 보장에 도움되지 않는 예시 5가지



01. 스트림 처리에 있어서의 부수 효과 (side effect)

  • e.g. 스트림 처리 애플리케이션의 처리 단계에 사용자에 이메일을 보내는 작업이 포함되어있음
    • “정확히 한 번” 활성화
    • 그렇다해도 이메일이 한 번만 발송되는건 아님
    • 이 기능은 카프카에 쓰여지는 레코드에만 적용
  • 레코드 중복을 방지하기 위해 시퀀스 넘버를 사용하는 것이나 트랜잭션을 중단 혹은 취소하기 위해 마커를 사용하는 것은 카프카 안에서만 작동하는 것이지, 이메일 발송을 취소시킬 수 있는 것 ❌
  • 스트림 처리 애플리케이션 안에서 외부 효과를 일으키는 어떠한 작업(REST API 호출, 파일 쓰기 등)에도 해당



02. 카프카 토픽에서 읽어서 데이터베이스에 쓰는 경우

  • e.g. 애플리케이션은 카프카가 아닌 외부 데이터베이스에 결과를 write
    • 여기서 프로듀서 사용 ❌
    • 레코드는 JDBC와 같은 데이터베이스 드라이버를 통해 데이터베이스에 write
    • 오프셋은 컨슈머에 의해 카프카에 커밋
    • 하나의 트랜잭션에서 외부 데이터베이스에는 결과를 write하고 카프카에는 오프셋을 commit 할 수 있도록 해주는 메커니즘 같은 거 ❌
  • 대신 오프셋을 데이터베이스에서 저장하도록 할 수는 있음
    • 이러면 하나의 트랜잭션에서 데이터와 오프셋을 동시에 데이터베이스에 커밋 🟢
    • 이 부분은 카프카가 아닌, 데이터베이스의 트랜잭션 보장에 달려있음

아웃박스 패턴 (outbox pattern)

  • MSA에서는 하나의 트랜잭션 안에서 DB update, 카프카 토픽에 메세지 write 하는 경우 종종 있음
    • (둘다 성공 or 둘다 실패)
  • 카프카 트랜잭션은 이러한 작업 ❌
  • 이러한 문제에 대한 일반적인 해법은 아웃박스 패턴(outbox pattern)
    • (1) 마이크로서비스는 “아웃박스"라 불리는 카프카 토픽에 메세지를 write 하는 작업까지만 진행
    • (2) 별도의 메세지 중계 서비스가 카프카로부터 메시지 읽어와서 DB update
  • 카프카가 DB update에 대해 “정확히 한 번” 보장안하니까 업데이트 작업은 반드시 멱등적이어야 함
  • 아웃박스 패턴 쓰면 결과적으로 메세지가 카프카, 토픽 컨슈머, DB 모두에서 사용 가능 🟢 or 어디에도 write ❌
  • 패턴을 반전시켜서 사용할 수도 있음
    • DB 테이블을 아웃박스로 사용하고 중계 서비스가 테이블 update 내역을 카프카에 메세지로 write
    • 이건 고유 키, 외래 키 같이 RDB에 제약조건 적용되어야 할 때 유리

디비지움 프로젝트 블로그의 아웃박스 패턴 심층 분석 글

Notebook LM 요약

심층 브리핑 문서: 마이크로서비스 데이터 교환을 위한 아웃박스 패턴

주요 테마 및 중요 아이디어/사실 요약

이 문서는 “신뢰할 수 있는 마이크로서비스 데이터 교환을 위한 아웃박스 패턴"이라는 출처를 바탕으로 마이크로서비스 아키텍처에서 데이터 일관성 및 안정적인 데이터 교환을 보장하는 데 있어 아웃박스 패턴의 중요성을 심층적으로 다룹니다.

1. 마이크로서비스 간 데이터 교환의 어려움:

  • 동기적 접근 방식의 문제점: 마이크로서비스는 종종 다른 서비스에 데이터 변경 사항을 알려야 합니다. REST, gRPC와 같은 동기적 API 호출은 다음과 같은 문제점을 야기합니다:
  • 강한 결합: “보내는 서비스는 호출할 다른 서비스와 그 위치를 알아야 합니다.”
  • 가용성 문제: “이러한 서비스가 일시적으로 사용 불가능할 경우에 대비해야 합니다.”
  • 재생 불가능성 (Re-playability) 부족: “이러한 동기적 접근 방식의 또 다른 단점은 재생 불가능성이 부족하다는 것입니다. 즉, 이벤트가 전송된 후 새로운 소비자가 도착하여 전체 이벤트 스트림을 처음부터 소비할 수 있는 가능성이 부족합니다.”
  • 이중 쓰기 문제 (Dual Writes): 마이크로서비스가 로컬 데이터 저장소를 업데이트하고 동시에 다른 서비스에 이벤트를 보내야 할 때 발생합니다.
  • “단순히 이 두 가지 요청을 발행하면 잠재적인 불일치가 발생할 수 있습니다.”
  • 데이터베이스와 메시지 브로커(예: Apache Kafka) 간에 단일 공유 트랜잭션을 가질 수 없기 때문에 한쪽만 성공하고 다른 쪽은 실패할 수 있습니다. 예를 들어, “새로운 구매 주문이 로컬 데이터베이스에 저장되었지만 해당 메시지가 Kafka로 전송되지 않은 상황이 발생할 수 있습니다.”

2. 비동기적 데이터 교환과 Apache Kafka의 역할:

  • 위에서 언급된 동기적 접근 방식의 문제점은 비동기적 데이터 교환 방식을 통해 해결될 수 있습니다.
  • 내구성 있는 메시지 로그 (Durable Message Log): “주문, 재고 및 기타 서비스가 Apache Kafka와 같은 내구성 있는 메시지 로그를 통해 이벤트를 전파하는 방식입니다.”
  • 이점:느슨한 결합: 서비스는 특정 소비자를 알 필요 없이 이벤트 스트림에 이벤트를 발행합니다.
  • 높은 가용성: 메시지 브로커가 일시적으로 중단되어도 이벤트가 손실되지 않고 나중에 소비될 수 있습니다.
  • 재생 가능성: “내구성 있는 로그는 또한 재생 가능성을 지원합니다. 즉, 필요에 따라 새로운 소비자를 추가할 수 있어 원래 생각하지 못했던 사용 사례를 가능하게 합니다.” (예: 데이터 웨어하우스 구축, Elasticsearch 기반 전체 텍스트 검색).

3. 아웃박스 패턴의 소개 및 작동 방식:

  • 개념: “이 접근 방식의 아이디어는 서비스의 데이터베이스에 ‘아웃박스’ 테이블을 두는 것입니다.”
  • 핵심 원리: 서비스가 데이터베이스에 데이터를 삽입할 때, 동일한 트랜잭션의 일부로 전송될 이벤트를 나타내는 레코드도 아웃박스 테이블에 삽입합니다.
  • “새로운 구매 주문에 대한 요청을 받을 때, PurchaseOrder 테이블에 INSERT 작업을 수행할 뿐만 아니라, 동일한 트랜잭션의 일부로 전송될 이벤트를 나타내는 레코드도 해당 아웃박스 테이블에 삽입됩니다.”
  • CDC(Change Data Capture) 활용: “로그 기반 변경 데이터 캡처(CDC)는 아웃박스 테이블의 새 항목을 캡처하여 Apache Kafka로 스트림하는 데 매우 적합합니다.”
  • Debezium: “Debezium은 MySQL, Postgres, SQL Server와 같은 여러 데이터베이스를 위한 CDC 커넥터를 제공합니다.”
  • 일관성 보장:“Read your own writes” 보장: “동기적으로 PurchaseOrder 테이블에 쓰면서 소스 서비스는 ‘read your own writes’ 의미론의 이점을 얻습니다.” 즉, 쓰기 작업 직후 쿼리하면 방금 저장된 데이터가 즉시 반영됩니다.
  • 안정적이고 비동기적, 최종적 일관성: “동시에 Apache Kafka를 통해 다른 서비스로 안정적이고 비동기적이며 최종적으로 일관된 데이터 전파를 얻습니다.”
  • 아웃박스 테이블의 구조 (예시): id: 각 메시지의 고유 ID (소비자가 중복 이벤트를 감지하는 데 사용)
  • aggregatetype: 이벤트가 관련된 애그리게이트 루트의 유형 (Kafka 토픽 라우팅에 사용)
  • aggregateid: 이벤트가 영향을 미치는 애그리게이트 루트의 ID (Kafka 메시지 키로 사용)
  • type: 이벤트 유형 (예: “Order Created”)
  • payload: 실제 이벤트 내용을 담은 JSON 구조

4. 아웃박스 패턴 구현 세부 사항 (Debezium 및 Kafka 예시):

  • CDI 이벤트 활용: 애플리케이션 코드에서 이벤트를 발생시키고, EventSender가 해당 이벤트를 감지하여 아웃박스 테이블에 INSERT합니다.
  • INSERT 후 즉시 DELETE: 아웃박스 테이블에 레코드를 persist()한 후 즉시 remove()합니다.
  • “이것은 처음에는 놀라울 수 있습니다. 하지만 로그 기반 CDC가 작동하는 방식을 기억하면 이해가 됩니다. 즉, 데이터베이스의 테이블 실제 내용을 검사하는 것이 아니라 append-only 트랜잭션 로그를 추적합니다.”
  • persist() 및 remove() 호출은 트랜잭션이 커밋되면 로그에 INSERT 및 DELETE 항목을 생성합니다. Debezium은 INSERT에 대해서만 메시지를 Kafka로 보냅니다.
  • 이 방식으로 아웃박스 테이블 자체는 항상 비어 있어 “추가 디스크 공간이 필요하지 않으며 (어느 시점에서 자동으로 폐기될 로그 파일 요소를 제외하고) 무한히 성장하는 것을 막기 위한 별도의 관리 프로세스도 필요하지 않습니다.”
  • Debezium 커넥터 설정: Postgres 커넥터를 사용합니다.
  • table.whitelist: inventory.outboxevent 테이블의 변경 사항만 캡처하도록 설정합니다.
  • tombstones.on.delete: false: 아웃박스 테이블에서 이벤트 레코드가 삭제될 때 삭제 마커(“tombstones”)를 방출하지 않도록 합니다.
  • 단일 메시지 변환 (SMT)을 통한 토픽 라우팅: 기본적으로 Debezium은 모든 변경 이벤트를 단일 토픽으로 보냅니다.
  • EventRouter라는 사용자 정의 SMT를 사용하여 aggregatetype 값에 따라 이벤트를 다른 Kafka 토픽(OrderEvents, CustomerEvents 등)으로 라우팅합니다.
  • aggregateid는 메시지 키로 사용되어 “하나의 애그리게이트 루트 또는 그에 포함된 하위 엔티티와 관련된 모든 이벤트가 해당 Kafka 토픽의 동일한 파티션으로 이동하도록 보장합니다.”
  • op = ’d’ (삭제 이벤트)는 무시됩니다. op = ‘c’ (생성 이벤트)는 Kafka로 전송됩니다.
  • Kafka 메시지 구조: key: aggregateid
  • value: 이벤트 유형 (eventType), 타임스탬프 (ts_ms), 실제 이벤트 페이로드 (payload - JSON 문자열)를 포함하는 구조.
  • headers: 이벤트 UUID (eventId)를 포함하여 소비자가 중복을 효율적으로 감지할 수 있도록 합니다.

5. 소비 서비스에서의 중복 감지:

  • Kafka의 “최소 한 번 (at least once)” 전달 의미론으로 인해 중복 메시지가 발생할 수 있습니다.
  • MessageLog: 소비 서비스는 MessageLog 테이블에 이미 처리된 이벤트의 UUID를 기록합니다.
  • “소비자에서 효율적인 중복 감지 및 제외를 위해 이벤트 UUID를 Kafka 메시지 헤더로 전파하는 것이 가능합니다.”
  • onOrderEvent() 메서드에서 이벤트 UUID를 확인하여 이미 처리된 이벤트는 무시합니다.
  • 트랜잭션 롤백 시 메시지가 처리된 것으로 표시되지 않아 나중에 재시도할 수 있습니다.

6. 아웃박스 패턴의 이점 요약:

  • 단일 리소스 수정: 데이터베이스만 수정하여 “두 가지 리소스(데이터베이스와 Apache Kafka)를 동시에 변경할 때 발생할 수 있는 잠재적인 불일치"를 방지합니다.
  • “Read your own writes” 보장: 소스 서비스에 즉각적인 일관된 사용자 경험을 제공합니다.
  • 비동기 이벤트 전파: 다른 마이크로서비스로 안정적인 데이터 전파를 가능하게 합니다.
  • Kafka의 확장성 및 신뢰성: “Apache Kafka는 서비스 간 메시징을 위한 매우 확장 가능하고 신뢰할 수 있는 백본 역할을 합니다.”
  • 서비스 디커플링: “전반적인 아키텍처의 중심에 Apache Kafka를 두는 것은 관련된 서비스의 디커플링을 보장합니다.”
  • 시스템 복원력: 구성 요소가 실패하거나 사용할 수 없는 경우에도 이벤트는 나중에 처리됩니다.
  • 최종적 일관성 (Eventually Consistent): 이 파이프라인은 최종적으로 일관성을 가지지만, “일반적으로는 괜찮고 애플리케이션의 비즈니스 로직 측면에서 처리될 수 있습니다.”
  • 낮은 지연 시간: 로그 기반 CDC 덕분에 “이벤트 방출이 거의 실시간으로 이루어지기 때문에 전반적인 솔루션의 엔드 투 엔드 지연 시간은 일반적으로 낮습니다(초 또는 심지어 sub-second 범위).”
  • API로서의 이벤트 구조: “아웃박스를 통해 노출되는 이벤트의 구조는 이벤트를 방출하는 서비스의 API의 일부로 간주되어야 합니다.” 생산 서비스 업그레이드 시 소비자에게 영향을 주지 않도록 호환성을 고려해야 합니다.

7. Debezium 소개:

  • “Debezium은 기존 데이터베이스를 이벤트 스트림으로 전환하는 오픈 소스 분산 플랫폼으로, 애플리케이션이 데이터베이스의 각 커밋된 행 수준 변경에 거의 즉시 반응하고 응답할 수 있도록 합니다.”
  • Kafka를 기반으로 구축되었으며 Kafka Connect 호환 커넥터를 제공합니다.
  • “Debezium은 데이터 변경 기록을 Kafka 로그에 기록하므로, 애플리케이션은 언제든지 중지 및 재시작할 수 있으며 실행되지 않는 동안 놓쳤던 모든 이벤트를 쉽게 소비할 수 있어 모든 이벤트가 올바르고 완벽하게 처리되도록 보장합니다.”
  • Apache License, Version 2.0 하의 오픈 소스입니다.

이 브리핑 문서는 마이크로서비스 환경에서 데이터 일관성 및 안정적인 비동기 통신을 달성하기 위한 아웃박스 패턴의 중요성과 Debezium을 활용한 실질적인 구현 방법을 명확히 설명합니다.



03. 데이터베이스에서 읽어서, 카프카에 쓰고, 여기서 다시 다른 데이터베이스에 쓰는 경우

  • 하나의 앱에서 데이터베이스 데이터를 읽고, 트랜잭션을 구분하고, 카프카에 레코드를 쓰고, 여기서 다시 다른 데이터베이스에 레코드를 쓰고, 그 와중에도 원본 데이터베이스의 원래 트랜잭션을 관리할 수 있는 앱
    • 카프카 트랜잭션은 이러한 종류의 종단 보장(end-to-end guarantee)에 필요한 기능을 가지고 있지 ❌
    • 하나의 트랜잭션 안에서 레코드와 오프셋을 함께 커밋하는 문제 외에도 또 다른 문제가 있기 때문
    • 카프카 컨슈머의 read_committed 보장은 DB 트랜잭션을 보존하기엔 너무 약함
      • 컨슈머가 아직 커밋되지 않은 레코드를 볼 수 없는 건 사실이지만, 일부 토픽에서 랙이 발생했을 수도 있는 만큼 이미 커밋된 트랜잭션의 레코드를 모두 봤을 거라는 보장 또한 없기 때문
      • 트랜잭션의 경계를 알 수 있는 방법 역시 없기 때문에 언제 트랜잭션이 시작되었는지, 끝났는지, 레코드 중 어느 정도를 읽었는지도 알 수 없음



04. 한 클러스터에서 다른 클러스터로 데이터 복제

  • 하나의 카프카 클러스터 → 다른 클러스터로 데이터를 복사할 때 “정확히 한 번"을 보장
  • 미러메이커 2.0에 ‘정확히 한 번’ 기능을 추가하는 KIP-656 참조
    • 원본 클러스터의 각 레코드가 대상 클러스터에 정확히 한 번 복사될 것을 보장하는 내용을 담고있음
  • 이것이 트랜잭션의 원자성을 보장하지는 ❌
    • (1) 애플리케이션이 여러 개의 레코드와 오프셋을 트랜잭션적으로 write
    • (2) 미러메이커 2.0이 이 레코드들을 다른 카프카 클러스터에 복사
    • (3) 복사 과정에서 트랜잭션 속성이나 보장 같은 것은 유실
    • 마찬가지로 이 정보들은 카프카의 데이터를 관계형 데이터베이스에 복사할 때도 유실
    • 카프카에서 데이터를 읽어오는 컨슈머 입장에서는 트랜잭션의 모든 데이터를 읽어왔는지 알 수도 없고 보장할 수도 없는 것
      • e.g. 토픽의 일부만 구독했을 경우 전체 트랜잭션의 일부만 복사할 수 있음

Notebook LM : KIP-656 요약

브리핑 문서: KIP-656: MirrorMaker2 Exactly-once Semantics

작성일: 2023년 10월 26일 소스: “KIP-656: MirrorMaker2 Exactly-once Semantics - Apache Kafka - Apache Software Foundation” 발췌본

1. 개요 및 주요 목표

KIP-656은 Apache Kafka의 MirrorMaker2(MM2)에 대한 정확히 한 번(Exactly-once Semantics, EOS) 전달 의미론을 도입하는 것을 목표로 하는 제안서입니다. 현재 MirrorMaker2는 Kafka Connect 프레임워크의 Source Connector/Task를 기반으로 구현되어 있으며, 이들은 기본적으로 EOS를 제공하지 않습니다. 이로 인해 데이터 손실이나 중복이 발생할 수 있는 문제가 있습니다.

본 제안의 핵심 아이디어는 새로운 MirrorSinkTask 구현을 통해 트랜잭션 방식으로 컨슈머 오프셋을 관리함으로써 클러스터 간 메시지 전달에서 EOS를 가능하게 하는 것입니다.

2. 현재 문제점 및 동기

  • 현재 MM2의 EOS 부재: “MirrorMaker2는 현재 Kafka Connect 프레임워크, 특히 Source Connector/Task에 구현되어 있으며, 이들은 기본적으로 정확히 한 번 의미론(EOS)을 제공하지 않습니다.” 이는 기존의 Kafka Connect JDBC 이슈 및 다른 Kafka 관련 KAFKA-6080, KAFKA-3821 등의 노력에서도 논의되었지만, 성공적으로 진행되지 못했습니다.
  • 데이터 손실 또는 중복: EOS가 없으면 데이터 미러링 과정에서 메시지 손실이나 중복이 발생할 수 있습니다.
  • 비동기 및 주기적인 오프셋 커밋: Source Connector/Task의 기본 특성상 오프셋 커밋이 비동기적이고 주기적으로 이루어지며, 이는 미러링 시 컨슈머 오프셋에 대한 EOS를 방해하는 요인입니다.

3. 제안된 해결책: MirrorSinkTask 도입

이 제안은 기존 MirrorSourceTask가 아닌 새로운 MirrorSinkTask를 구현하여 문제를 해결합니다. MirrorSinkTask는 SinkTask를 확장하며, 다음과 같은 두 가지 전달 옵션을 제공합니다:

  1. 정확히 한 번(Exactly-once): “MirrorSinkTask의 트랜잭션 프로듀서에 의해… 컨슈머 오프셋이 트랜잭션 내에서 커밋됩니다.”
  2. 최소 한 번(At-least-once): “MirrorSinkTask의 비트랜잭션 프로듀서에 의해… 컨슈머 오프셋이 WorkerSinkTask의 컨슈머에 의해 별도로 커밋됩니다.”

MirrorSinkTask를 구현하는 주요 이유:

  • EOS 제약 해결: Kafka Connect Source Connector/Task의 본질적인 EOS 비지원 문제를 해결합니다.
  • WorkerSinkTask의 이점 활용: MirrorSinkTask는 WorkerSinkTask에 의해 초기화되며, WorkerSinkTask는 consumer.subscribe()를 사용하여 재조정(rebalance) 및 새로운 토픽/파티션 자동 감지 이점을 투명하게 활용할 수 있습니다.
  • 완전한 제어: 새로운 구현이므로, 프로듀서 생성(트랜잭션 vs 비트랜잭션) 및 다양한 예외 처리(특히 트랜잭션 모드)를 완전히 제어할 수 있습니다.
  • HDFS Sink Connector 참조: HDFS Sink Connector가 이미 EOS를 달성한 사례를 참고하여 MirrorSinkTask를 올바르게 구현할 수 있습니다.

4. 주요 변경 사항 및 기술적 세부사항

4.1. MirrorSinkTask의 핵심 과제와 해결 방안

클러스터 간 EOS를 실현하기 위한 몇 가지 주요 과제가 있으며, 이에 대한 해결책은 다음과 같습니다.

  1. 클러스터 간 트랜잭션 처리:
  • 과제: 카프카 트랜잭션은 기본적으로 클러스터 간에 발생할 수 없습니다. 컨슈머는 소스 클러스터에서 데이터를 풀하고 오프셋을 저장하며, 프로듀서는 이 데이터를 타겟 클러스터로 보냅니다. 클러스터 간 EOS를 위해 어떤 수정이 필요한가?
  • 해결책: “컨슈머 그룹 오프셋은 트랜잭션 프로듀서에 의해 관리되고 커밋되며, 대신 타겟 클러스터에 저장됩니다.”
  • 컨슈머는 여전히 소스 클러스터에 있어야 하지만, “진실의 원천” 오프셋은 타겟 클러스터에 저장됩니다.
  • “가짜” 컨슈머 그룹을 사용하여 타겟 클러스터에 오프셋이 저장됩니다. 이 “가짜” 그룹은 실제 레코드를 소비하지 않고 오프셋만 __consumer_offsets 토픽에 저장합니다.
  • MirrorSinkTask는 Connect의 내부 오프셋 추적이나 소스 클러스터의 __consumer_offsets에 의존하지 않습니다.
  • 컨슈머 오프셋은 트랜잭션에 포함된 프로듀서에 의해서만 타겟 클러스터에 기록됩니다.
  • 모든 레코드는 단일 클러스터에서와 같이 트랜잭션으로 기록됩니다.
  • MirrorSinkTask가 시작되거나 재조정될 때, 타겟 클러스터의 __consumer_offsets에서 마지막 커밋된 오프셋을 로드하여 재개합니다.
  • 결과:
  • 트랜잭션 성공 시: 타겟 클러스터의 __consumer_offsets 토픽은 EOS 프레임워크의 현재 프로토콜에 따라 업데이트됩니다.
  • 트랜잭션 중단 시: 모든 데이터 레코드가 삭제되고, __consumer_offsets 토픽은 업데이트되지 않습니다.
  • MirrorSinkTask 시작/재시작 시: 타겟 클러스터에 저장된 마지막 커밋된 오프셋에서 재개합니다.
  • 주의 사항: 소스 클러스터에 컨슈머 그룹이 이미 존재하고 타겟 클러스터의 “가짜” 컨슈머 그룹 오프셋이 낮은 경우, 데이터 중복을 피하기 위해 소스에서 타겟으로 오프셋을 한 번 동기화하는 오프라인 작업이 필요할 수 있습니다.
  1. 소스 클러스터 오프셋의 역할:
  • 과제: 소스 클러스터의 컨슈머 그룹 오프셋은 “진실의 원천"이 아니며 트랜잭션에 포함되지 않습니다. 여전히 업데이트되어야 하는가? 특정 경우에 필요한가?
  • 해결책: “소스 클러스터의 컨슈머 그룹 오프셋은 WorkerSinkTask의 로직에 의해 주기적으로 독립적으로 업데이트됩니다.” 이는 트랜잭션에 포함되지 않고 주기적으로 커밋되므로 약간 지연될 수 있습니다.
  • 유용성:
  • 소스 클러스터의 업스트림 프로듀스와 MirrorMaker의 소비 간의 복제 지연을 측정하는 데 유용합니다.
  • 적은 수의 중복 데이터로 손실된 “가짜” 컨슈머 그룹을 복원하는 데 사용할 수 있습니다.

4.2. 주요 코드 로직 (의사 코드)

MirrorSinkTask 내에서 트랜잭션 처리를 위한 핵심 로직은 다음과 같습니다:

  • isTransactional: 트랜잭션 프로듀서 사용 여부를 결정하는 구성입니다.
  • loadContextOffsets(): 트랜잭션 모드일 때 타겟 클러스터에서 초기 컨슈머 그룹 오프셋을 로드합니다.
  • initProducer(boolean isTransactional): 트랜잭션 모드일 때 ProducerConfig.ACKS_CONFIG를 “all"로, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG를 “true"로 설정하고 ProducerConfig.TRANSACTIONAL_ID_CONFIG를 유니크하게 생성하여 트랜잭션 프로듀서를 초기화합니다.
  • put(Collection<SinkRecord> records): 컨슈머로부터 메시지를 받아 sendBatch()를 호출하여 처리합니다. 재조정(RebalanceException), 재전송(ResendRecordsException) 및 기타 예외를 처리합니다.
  • sendBatch(Collection</SinkRecord> records, KafkaProducer</byte[], byte[]> producer):
  • beginTransaction(producer): 트랜잭션을 시작합니다. - producer.send(): 레코드를 타겟 클러스터로 전송하고, 오프셋을 offsetsMap에 저장합니다.
  • 예외 발생 시: 전송되지 않은 메시지를 remainingRecordsMap에 추가하여 재전송을 준비합니다.
  • 트랜잭션 모드이고 remainingRecordsMap이 비어있으면 (producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId)를 호출한 후) commitTransaction(producer)을 호출합니다.
  • remainingRecordsMap이 비어있지 않으면 ResendRecordsException을 발생시켜 전체 트랜잭션을 재시도하거나(트랜잭션 모드), 실패한 레코드만 재시도합니다(비트랜잭션 모드).
  • commitRecord(): MirrorSourceTask와 동일한 로직으로 레코드 커밋을 처리하며, 오프셋을 동기화합니다. - beginTransaction(), initTransactions(), commitTransaction(), abortTransaction(): 트랜잭션 라이프사이클을 관리합니다.

4.3. Public Interfaces 및 구성

  • MirrorMakerConfig:
  • connector.type (문자열, 기본값: “source”): “source"이면 기존 MirrorSourceConnector가, “sink"이면 새로운 MirrorSinkConnector가 실행됩니다.
  • MirrorConnectorConfig:
  • transaction.producer (boolean, 기본값: false): True로 설정하면 컨슈머와 프로듀서 간에 EOS가 활성화됩니다.

4.4. MirrorSinkConnector

MirrorSinkConnector는 SinkTask를 생성하며, 기존 MirrorSourceConnector와 유사한 로직을 따릅니다. 코드 중복을 최소화하기 위해 “MirrorCommonConnector"와 같은 새로운 공통 클래스가 제안됩니다.

5. MirrorSourceConnector에서 MirrorSinkConnector/w EOS로의 마이그레이션

이 지침은 잠정적이며 실제 환경에서는 다르게 처리될 수 있습니다.

  1. 기본 동작 유지: connector.type이 기본값 “source"로 설정되어 있으므로, 최신 MM2 배포 시 현재 미러링 동작은 변경되지 않습니다.
  2. 점진적 전환 (AT_LEAST_ONCE): 여러 MM2 인스턴스가 있는 경우, 한 인스턴스의 connector.type을 “sink"로 변경하고 배포합니다. 안정성이 확인되면 다른 인스턴스에도 반복합니다. 이 단계에서는 메시지 전달 의미론이 여전히 “최소 한 번"이지만, 모든 MM2 인스턴스가 MirrorSinkConnector를 사용하게 됩니다.
  3. EOS 활성화 (DOWN_TIME 허용):
  • EOS는 더 많은 리소스를 소비하고 처리량이 낮아질 수 있으므로, EOS로 전환하기 전에 성능 영향도를 벤치마크하고 충분한 용량을 프로비저닝하는 것이 중요합니다.
  • 짧은 다운타임이 허용된다면, 모든 MirrorMaker2 인스턴스를 중지하고 transaction.producer를 “true"로 설정한 후 다시 시작합니다. 이제 MM2는 EOS로 데이터를 미러링해야 합니다.
  1. EOS 활성화 (DOWN_TIME 없음): “비트랜잭션 Kafka 프로듀서에서 트랜잭션 Kafka 프로듀서로 마이그레이션하는 방법"과 유사하게 더 신중하게 수행되어야 하며, 이 KIP의 범위는 넘어섭니다.

6. 향후 계획 및 폐기

  • connector.type 구성으로 MirrorSourceConnector와 MirrorSinkConnector는 가까운 미래에 코드베이스에 공존할 것입니다.
  • 장기적으로 MirrorSinkConnector가 MirrorSourceConnector의 모든 사용 사례를 포괄하고 마이그레이션이 원활하다는 것이 입증되면, 향후 릴리스에서 MirrorSourceConnector의 폐기가 고려될 수 있습니다.

7. 거부된 대안

이전에 제안되었던 https://github.com/apache/kafka/pull/5553, KAFKA-6080, KAFKA-3821과 같은 관련 노력들은 상당한 시간 동안 성공적으로 진행되지 못했습니다. 본 KIP는 이러한 이전 노력의 한계를 극복하는 새로운 접근 방식을 제안합니다.



05. 발행/구독 패턴

  • 일반적인 활용 사례
  • 발행/구독 패턴에 트랜잭션을 사용할 경우 몇 가지 보장되는 것이 있기는 함
    • read_committed 모드가 설정된 컨슈머들은 중단된 트랜잭션에 속한 레코드들을 보지 못할 것
    • 하지만 이러한 보장은 “정확히 한 번"에 미치지 못함
    • 오프셋 커밋 로직에 따라 컨슈머들은 메세지를 한 번 이상 처리할 수 있음
  • 카프카가 보장하는 것
    • JMS 트랜잭션에서 보장하는 것과 비슷 (메세지 송수신 작업을 하나의 논리적인 단위로 묶어 처리하는 기능)
    • 다른건 커밋되지 않은 트랜잭션들이 보이지 않도록 컨슈머들에 read_committed 설정이 되어 있어야 한다는 전제 조건 🟢
    • JMS 브로커들은 모든 컨슈머에게 커밋되지 않은 트랜잭션의 레코드를 주지 않음

주의 !!

  • 메세지를 쓰고 나서 커밋하기 전에 다른 애플리케이션이 응답하기를 기다리는 패턴은 반드시 피하기
  • 다른 애플리케이션은 트랜잭션이 커밋될 때까지 메세지를 받지 못할 것이기 때문에 결과적으로 데드락이 발생



# 트랜잭션 사용법

프로듀서로 주로 쓴다고 했지만, 트랜잭션은 브로커 기능이기도 함


트랜잭션 기능을 사용하는 가장 일반적이고도 권장되는 방법 → 카프카 스트림즈에서 Exactly-once 보장 활성화

  • 이렇게 하면 트랜잭션 기능을 직접적으로 사용할 일 ❌
  • 카프카 스트림즈가 대신 해당 기능을 사용해서 우리가 필요로 하는 보장을 제공 🟢
  • 카프카 스트림즈 애플리케이션에서 “정확히 한 번”’ 보장 기능을 활성화
    • processing.guarantee 설정을 exactly_once이나 exactly_once_beta로 잡아주면 끝



# 트랜잭션 ID와 펜싱 

  • 카프카 2.5 버전부터 트랜잭션 ID와 컨슈머 그룹 메타데이터를 함께 사용하는 펜싱이 도입
  • 서로 다른 애플리케이션 인스턴스에 대해서는 서로 달라야 함
  • e.g. 같은 그룹의 컨슈머 A, B가 있고 결과물을 프로듀서 A,B 를 통해 다른 토픽에 쓴다고 했을때 컨슈머 A가 장애가 발생해도 정상적으로 동작
    • 트랜잭션 ID와 그룹 정보가 함께 사용되므로 트랜잭션의 연속성을 유지하며 트랜잭션을 관리할 수 있다. (컨슈머 A는 펜싱됨)



# 작동 원리

 - 기본 알고리즘은 Chandy-Lamport snapshot 알고리즘을 영향을 받아서 구현  - 알고리즘은 통신 채널을 통해 마커(marker) 라는 제어 메세지를 보냄  - 이 마커 도착 기준으로 상태를 일관되게 기록

  • 다수의 파티션에 대해 트랜잭션이 커밋되었거나 중단되었다는 것을 표시하기 위해 위의 마커 메세지를 사용
    • 트랜잭션 코디네이터가 커밋 메세지를 받으면 트랜잭션과 관련된 모든 파티션에 커밋 마커를 씀
  • 위에서 일부 파티션에만 커밋 메세지가 쓰여진 상태를 처리하기 위해 2PC(Phase Commit)과 트랜잭션 로그를 사용
    • (1) 진행중인 트랜잭션이 존재를 연관된 파티션들과 함께 기록
    • (2) 트랜잭션 로그에 커밋 또는 중단 시도를 기록
    • (3) 모든 파티션에 트랜잭션 마커를 쓴다
    • (4) 트랜잭션이 종료되었음을 로그에 쓴다
  • 이 모든 과정을 처리하기 위해 카프카는 트랜잭션 로그를 __transaction_state 라는 내부 토픽을 사용



위의 알고리즘을 통해 수행되는 과정 1. 프로듀서 -> initTransaction()을 호출해서 자신이 트랜잭션 프로듀서임을 등록 2. initTransaction() : API는 코디네이터에 새 트랜잭션 ID를 등록하거나, 기존 트랜잭션 ID의 에포크 값을 증가시킴
3. beginTransaction() : 프로듀서에 현재 진행중인 트랜잭션이 있음을 알려줌. 4. 프로듀서가 새로운 파티션으로 레코드를 전송할 때 브로커에 AddPartitionsToTxn 요청을 보냄으로써 현재 이 프로듀서에 진행중인 트랜잭션이 있고, 해당 레코드가 트랜잭션의 일부임을 알림. 해당 정보는 트랜잭션 로그에 기록됨 5. 쓰기 작업이 완료되고 커밋할 준비가 되면, 트랜잭션에서 처리한 레코드들의 오프셋부터 커밋 6. sendOffsetsTOTransaction() : 호출하면 트랜잭션 코디네이터로 오프셋과 컨슈머 그룹 ID가 포함된 요청이 전송 7. commitTransaction(), abortTransaction() : 호출 하면 트랜잭션 코디네이터에 EndTxn 요청이 전송 8. 트랜잭션 코디네이터는 트랜잭션 로그에 커밋 혹은 중단 시도를 기록함

  •   transaction.timeout.ms에 설정된 시간 내에 커밋, 중단 둘 다 안되면 코디네이터가 자동으로 트랜잭션을 중단



# 성능

  • 트랜잭션 ID 등록 요청은 한 번만 있음.
  • 파티션 등록은 각 트랜잭션마다 파티션별로 한 번씩만 이루어짐.
  • 트랜잭션 커밋 요청이 전송되면 각 파티션에 커밋 마커가 추가됨.
  • 이 모든 과정은 동기적으로 진행되어 트랜잭션이 완료되거나 실패할 때까지 데이터는 전송되지 않음.
  • 따라서 많은 메시지를 트랜잭션에 포함시킬수록 오버헤드는 줄어들며 전체 처리량이 증가함.
  • 컨슈머는 커밋 마커를 읽어오는 작업에 약간의 오버헤드가 있음.
  • read_committed 모드에서는 아직 커밋되지 않은 트랜잭션의 메시지가 반환되지 않아 종단 지연이 길어질 수 있음.
  • 하지만, 컨슈머는 완료되지 않은 트랜잭션의 메시지를 버퍼링할 필요가 없어 추가적인 작업은 없음.
  • ack나 트랜잭션 로그 기록 역시 성능 저하가 있을 수 있으며, 멱등적 프로듀서는 메세지의 고유성을 추적해야하는 작업이 추가되어 체크가 필요

Comment