Post

Chapter 04. 카프카 컨슈머

1.1. 컨슈머와 컨슈머 그룹 🚗

컨슈머와 데이터 소비

  • 컨슈머는 카프카 토픽의 데이터를 읽고 처리하는 역할을 함
  • 메시지가 빠르게 쌓이면 처리 속도를 따라가지 못해 지연이 발생할 수 있음
  • 여러 컨슈머를 활용하면 데이터를 병렬로 처리하여 성능을 향상시킬 수 있음

컨슈머 그룹

  • 컨슈머 그룹은 병렬 처리를 위해 컨슈머를 그룹화한 단위
  • 동일 그룹 내 컨슈머는 각기 다른 파티션 데이터를 처리
  • 파티션 수보다 많은 컨슈머는 유휴 상태로 전환됨
1
2
3
4
5
6
7
8
9
10
토픽
- 파티션 0
- 파티션 1
- 파티션 2

컨슈머 그룹
- 컨슈머 0 <-> 파티션 0
- 컨슈머 1 <-> 파티션 1
- 컨슈머 2 <-> 파티션 2
- 컨슈머 3 (유휴 상태)

스케일 아웃과 확장성

  • 컨슈머 그룹에 컨슈머를 추가하면 병렬 처리가 가능해 데이터 처리 속도를 높일 수 있음
  • 컨슈머 수가 파티션 수를 초과하면 초과된 컨슈머는 유휴 상태가 됨
  • 새로운 컨슈머 그룹을 생성하면 독립적으로 토픽의 모든 메시지를 처음부터 소비할 수 있음

1.2. 컨슈머 그룹과 파티션 리밸런스 🏗️

리밸런스란?

  • 리밸런스는 컨슈머 그룹 내 파티션 소유권을 재분배하는 과정
  • 새로운 컨슈머가 추가되거나 기존 컨슈머가 종료/크래시되면 발생
  • 컨슈머는 카프카 브로커의 그룹 코디네이터와 하트비트를 주고받으며 상태를 유지
  • 하트비트가 없으면 세션 타임아웃이 발생하며, 그룹 코디네이터는 해당 컨슈머를 제거하고 리밸런스를 실행함

리밸런스의 역할

  • 컨슈머 그룹의 확장성과 가용성을 제공
  • 작업 중 리밸런스가 발생하면 데이터 처리 중단과 성능 저하를 초래할 수 있음

파티션 할당 전략

조급한 리밸런스 (Eager Rebalance)

  • 모든 컨슈머가 기존 파티션을 해제하고, 작업이 중단된 후 재할당
  • 작업 중단으로 인해 성능에 부정적 영향을 미칠 수 있음

협력적 리밸런스 (Cooperative Rebalance)

  • 기존 작업을 방해하지 않고, 점진적으로 파티션을 다른 컨슈머에 재할당
  • 작업 중단을 방지하며, 효율적으로 파티션을 재분배
  • 카프카 3.1 이상부터 기본값으로 설정됨

1.3. 정적 그룹 멤버십 📌

  • 기본적으로 컨슈머는 일시적 멤버로, 그룹을 떠나면 기존 파티션이 해제됨
  • group.instance.id를 설정하면 정적 그룹 멤버십이 활성화되어 종료 후에도 동일한 파티션을 유지함

특징

  • 종료 후 재참여해도 동일한 파티션을 재할당받아 리밸런스를 방지
  • 동일한 group.instance.id를 가진 컨슈머가 중복 조인하면 에러가 발생
  • 종료 시 컨슈머 그룹을 떠나지 않으며, session.timeout.ms 설정에 따라 멤버십이 해제됨

활용 사례

  • 로컬 상태 또는 캐시를 유지해야 하는 애플리케이션에 적합
  • 불필요한 리밸런스를 줄여 안정적인 파티션 관리를 지원

2. 카프카 컨슈머 생성하기 🖥️

  • KafkaConsumer 인스턴스를 생성하여 컨슈머를 정의
  • 프로듀서와 생성 방식이 유사하지만, group.id 설정이 추가됨

필수 설정

  • bootstrap.servers : 카프카 브로커 주소
  • key.deserializer, value.deserializer : 바이트 데이터를 자바 객체로 변환할 클래스
  • group.id: 컨슈머 그룹 ID. 동일 그룹 내 컨슈머끼리 데이터를 병렬로 처리

추가 설정

  • auto.offset.reset : 초기 메시지 소비 위치
    • earliest: 처음부터 읽음
    • latest: 최신 메시지부터 읽음
    • none: 오프셋이 없으면 에러 발생
  • enable.auto.commit : 오프셋 자동 커밋 여부
    • true: 자동 커밋
    • false: 수동 커밋 필요

예제코드

1
2
3
4
5
6
7
8
9
10
Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 브로커 주소
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 키 디시리얼라이저
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 값 디시리얼라이저
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-consumer-group"); // 컨슈머 그룹 ID
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 초기 메시지 소비 위치
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 수동 커밋 설정

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

3. 토픽 구독하기 📰

  • subscribe() 메서드로 하나 이상의 토픽을 구독
  • 토픽 목록을 직접 지정하거나, 정규식을 사용해 특정 패턴의 토픽을 구독 가능
  • 정규식을 활용하면 새로운 토픽 추가 시 자동으로 구독에 포함됨
1
2
3
4
5
// 단일 또는 다수의 토픽 구독
consumer.subscribe(Arrays.asList("demo", "test"));

// 정규식으로 특정 패턴의 토픽 구독
consumer.subscribe(Pattern.compile("demo-.*"));

4. 폴링 루프와 쓰레드 안전성 ⛓️

폴링 루프

  • poll() 메서드는 컨슈머가 주기적으로 데이터를 확인하고 처리하는 루프
  • 데이터를 기다리는 동안 설정된 시간만큼 대기하며, 새로운 데이터가 오면 처리함
  • 폴링을 멈추면 컨슈머가 죽은 것으로 간주되어 리밸런스가 발생할 수 있음
  • 첫 번째 poll() 호출 시
    • 그룹 코디네이터와 통신해 컨슈머 그룹에 참여
    • 파티션을 할당받고 리밸런스 처리 수행
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("demo"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000)); // 최대 5초 대기
    System.out.println("polling...");

    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Key: %s, Value: %s%n", record.key(), record.value());
    }

    if (!records.isEmpty()) {
        consumer.commitAsync(); // 비동기 커밋
    }
}

쓰레드 안전성

  • “하나의 스레드당 하나의 컨슈머” 원칙을 준수해야 함
  • 동일 그룹 내 여러 컨슈머를 사용하려면 각각 독립된 스레드에서 실행해야 함
  • 하나의 컨슈머를 여러 스레드에서 공유하면 데이터 충돌 및 오프셋 관리 문제가 발생
  • 효율적 설계 방법
    • 데이터를 가져오는 컨슈머와 데이터를 처리하는 워커 스레드를 분리

5. 컨슈머 설정하기 ⚙️

데이터 처리 관련 설정

  • fetch.min.bytes : 브로커에서 읽어올 최소 데이터 크기 지정
  • fetch.max.wait.ms : 데이터가 쌓일 때까지 대기 시간 지정
  • fetch.max.bytes : 한 번에 가져올 최대 데이터 크기 지정
  • max.poll.records : 한 번에 가져올 최대 메시지 개수
  • max.partition.fetch.bytes : 파티션별 반환할 최대 데이터 크기

타임아웃 및 하트비트 설정

  • session.timeout.ms : 하트비트 없이 컨슈머를 살아있다고 간주할 최대 시간 (기본 10초)
  • heartbeat.interval.ms: 하트비트 전송 간격 (일반적으로 session.timeout.ms의 1/3)
  • max.poll.interval.ms : 폴링 간 최대 대기 시간 (기본 5분)
  • default.api.timeout.ms : 모든 API 호출에 적용되는 기본 타임아웃
  • request.timeout.ms : 브로커 응답 대기 최대 시간 (기본 30초)

오프셋 및 커밋 설정

  • auto.offset.reset : 메시지를 읽기 시작할 위치 지정
    • earliest : 처음부터 읽음
    • latest : 최신 메시지부터 읽음
  • enable.auto.commit : 오프셋 자동 커밋 여부 설정

파티션 할당 전략

  • Range : 컨슈머 그룹 내에서 연속된 파티션을 하나의 컨슈머에 할당하는 전략
  • RoundRobin : 파티션을 컨슈머 그룹 내의 모든 컨슈머에게 고르게 분배하는 전략
  • Sticky : 파티션을 균등하게 분배하며, 리밸런스 시 파티션 이동을 최소화하는 전략
  • Cooperative Sticky : Sticky에 협력적 리밸런스를 추가해 점진적인 파티션 재할당을 지원하는 전략

기타 설정

  • client.id: 로깅/모니터링용 클라이언트 식별 ID
  • client.rack: 레플리카를 가져올 데이터센터/클라우드 영역 지정
  • group.instance.id: 정적 그룹 멤버십 활성화로 리밸런스 최소화
  • receive.buffer.bytes, send.buffer.bytes: TCP 송수신 버퍼 크기 설정 (-1은 OS 기본값 사용)

6. 오프셋과 커밋 📝

오프셋 커밋이란?

  • 오프셋 커밋은 컨슈머가 읽은 메시지의 위치를 저장하는 작업 ✉️
  • 카프카는 메시지를 개별적으로 커밋하지 않고, __consumer_offsets라는 내부 토픽에 저장
  • 커밋되지 않은 오프셋은 동일 메시지의 중복 소비 가능성을 초래

6.1 자동 커밋

  • enable.auto.commit=true 설정 시 컨슈머가 오프셋을 자동으로 커밋
  • 설정된 주기(auto.commit.interval.ms, 기본값 5초)마다 커밋 수행
  • 단점
    • 크래시 발생 시, 처리된 메시지와 커밋된 오프셋 간 차이로 중복 소비 발생 가능

6.2 수동 커밋

  • enable.auto.commit=false 설정 시 수동으로 오프셋 커밋 가능
  • 커밋 방법
    • commitSync(): 동기적 커밋
      • 성공 여부를 확인할 수 있지만, 응답 대기 동안 애플리케이션이 블록됨
    • commitAsync(): 비동기적 커밋
      • 성능은 좋지만 실패 시 재시도하지 않음
  • 단점
    • 동기 커밋은 처리량을 제한할 수 있음
    • 비동기 커밋은 커밋 실패를 감지하기 어려움

6.3 동기적/비동기적 커밋 병행

  • 컨슈머 종료 시 또는 리밸런스 직전에는 commitSync()를 사용해 안정적으로 오프셋을 커밋함
  • 일반 상황에서는 commitAsync()를 사용해 성능을 최적화함
  • 병행 사용으로 데이터 정확성과 성능을 모두 보장

6.4 특정 오프셋 커밋

  • 특정 오프셋 커밋을 위해 파티션과 오프셋 맵 전달

    1
    2
    3
    
      Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
      offsets.put(new TopicPartition("demo-topic", 0), new OffsetAndMetadata(100));
      consumer.commitSync(offsets);
    

7. 리밸런스 리스너 🎧

  • 리밸런스 발생 시 컨슈머가 정리 작업(Cleanup)을 수행하도록 지원하는 메커니즘
  • 할당된 파티션 해제 전에 마지막 처리 오프셋을 커밋해 데이터 손실 및 중복을 방지
  • 파일 핸들, 데이터베이스 연결 등 리소스를 정리하거나 종료하는 작업도 필요

리밸런스 리스너 구현

  • ConsumerRebalanceListener 인터페이스를 구현하여 리밸런스 이벤트를 처리
  • onPartitionsRevoked
    • 파티션 해제 시 호출
    • 마지막 처리된 오프셋 커밋 및 정리 작업 수행
  • onPartitionsAssigned
    • 새로운 파티션 할당 시 호출
    • 초기화 작업 또는 커밋된 오프셋 복구

사용법

  • subscribe() 메서드 호출 시 리스너 전달
1
2
3
4
5
6
7
8
9
10
11
12
consumer.subscribe(Arrays.asList("demo-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Partitions revoked: " + partitions);
        consumer.commitSync(); // 마지막 처리된 오프셋 커밋
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Partitions assigned: " + partitions);
    }
});

8. 특정 오프셋의 레코드 읽어오기 💿

  • 기본적으로 컨슈머는 마지막 커밋된 오프셋부터 데이터를 읽음
  • 특정 위치에서부터 데이터를 읽고 싶을 경우 seek() 메서드로 시작 위치를 조정

오프셋 조정 메서드

  • seekToBeginning(): 파티션의 처음부터 메시지 읽기
  • seekToEnd(): 파티션의 끝에서부터 새로운 메시지 읽기
  • seek(TopicPartition, offset): 특정 오프셋으로 이동해 메시지 읽기

예제 코드

1
2
3
4
5
6
7
8
9
10
consumer.assign(Arrays.asList(new TopicPartition("demo-topic", 0)));

// 파티션의 처음부터 읽기
consumer.seekToBeginning(Arrays.asList(new TopicPartition("demo-topic", 0)));

// 파티션의 끝에서부터 읽기
consumer.seekToEnd(Arrays.asList(new TopicPartition("demo-topic", 0)));

// 특정 오프셋에서 읽기
consumer.seek(new TopicPartition("demo-topic", 0), 100);

9. 폴링 루프를 벗어나는 방법 ⛓️‍💥

  • 컨슈머를 종료할 때, poll() 대기 상태에서도 즉시 루프를 종료 가능
  • consumer.wakeup()을 호출하면, poll()WakeupException을 발생시켜 중단됨
  • wakeup()다른 스레드에서 호출해야 안전하게 작동

예제 코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 다른 스레드에서 wakeup() 호출
Thread shutdownThread = new Thread(() -> {
    consumer.wakeup(); // 종료 요청
});
shutdownThread.start();

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("Key: %s, Value: %s%n", record.key(), record.value());
        }
    }
} catch (WakeupException e) {
    // WakeupException으로 루프 종료
    System.out.println("Consumer is shutting down...");
} finally {
    consumer.close(); // 종료 전 리소스 정리
}

추가 팁

  • JVM 종료 시 안전하게 작업을 마무리하려면 메인 스레드에서 ShutdownHook 사용
1
2
3
4
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup(); // 종료 요청
    consumer.close(); // 리소스 정리
}));

10. 디시리얼라이저 📜

디시리얼라이저의 역할

  • 컨슈머는 바이트 배열을 자바 객체로 변환하기 위해 디시리얼라이저를 사용
  • 데이터 형식이 일치해야 시리얼라이저와 디시리얼라이저가 올바르게 작동

종류

  • 카프카 기본 제공 디시리얼라이저
    • StringDeserializer: 바이트 데이터를 문자열로 변환
    • IntegerDeserializer, LongDeserializer : 바이트 데이터를 정수형과 Long 타입으로 변환
  • Avro 디시리얼라이저
    • 표준화된 데이터 변환 도구
    • KafkaAvroDeserializer를 사용해 Avro 스키마 기반 객체 변환

커스텀 디시리얼라이저

  • 특정 객체 변환을 위해 직접 구현 가능
  • 권장하지 않음
    • 구현 복잡도가 높고, 데이터 변환 과정에서 에러 발생 가능성 존재
    • 표준 디시리얼라이저 사용을 우선 고려

리스트 타입 변환

  • List<T>와 같은 중첩 타입도 시리얼라이즈/디시리얼라이즈 가능
  • 주의점
    • 내부 객체와 리스트 구조 모두 설정해야 하므로 복잡함

11. 독립 실행 컨슈머 🏠

  • 독립 실행 컨슈머는 컨슈머 그룹 없이 동작하며, 파티션을 직접 할당받아 데이터를 처리
  • 컨슈머 그룹의 리밸런싱이나 파티션 자동 할당이 불필요한 경우 적합

사용 사례

  • 하나의 컨슈머가 토픽의 모든 파티션 데이터를 처리해야 할 때
  • 특정 파티션에서만 데이터를 읽어야 할 때
  • 리밸런싱이 불필요하거나 비효율적인 상황

특징 및 제한사항

  • 리밸런싱 기능이 제공되지 않으므로, 파티션을 명시적으로 관리해야 함
  • 토픽 구독파티션 직접 할당은 동시에 사용할 수 없음
  • 파티션 정보를 명확히 알고 있어야 함

구현 예제

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 파티션 정보를 가져와 직접 할당
List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor("demo-topic");

for (PartitionInfo partition : partitionInfos) {
    partitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
consumer.assign(partitions);

// 데이터 폴링 루프
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Key: %s, Value: %s%n", record.key(), record.value());
    }
}
This post is licensed under CC BY 4.0 by the author.