Chapter 04. 카프카 컨슈머
1.1. 컨슈머와 컨슈머 그룹 🚗
컨슈머와 데이터 소비
- 컨슈머는 카프카 토픽의 데이터를 읽고 처리하는 역할을 함
- 메시지가 빠르게 쌓이면 처리 속도를 따라가지 못해 지연이 발생할 수 있음
- 여러 컨슈머를 활용하면 데이터를 병렬로 처리하여 성능을 향상시킬 수 있음
컨슈머 그룹
- 컨슈머 그룹은 병렬 처리를 위해 컨슈머를 그룹화한 단위
- 동일 그룹 내 컨슈머는 각기 다른 파티션 데이터를 처리
- 파티션 수보다 많은 컨슈머는 유휴 상태로 전환됨
1
2
3
4
5
6
7
8
9
10
토픽
- 파티션 0
- 파티션 1
- 파티션 2
컨슈머 그룹
- 컨슈머 0 <-> 파티션 0
- 컨슈머 1 <-> 파티션 1
- 컨슈머 2 <-> 파티션 2
- 컨슈머 3 (유휴 상태)
스케일 아웃과 확장성
- 컨슈머 그룹에 컨슈머를 추가하면 병렬 처리가 가능해 데이터 처리 속도를 높일 수 있음
- 컨슈머 수가 파티션 수를 초과하면 초과된 컨슈머는 유휴 상태가 됨
- 새로운 컨슈머 그룹을 생성하면 독립적으로 토픽의 모든 메시지를 처음부터 소비할 수 있음
1.2. 컨슈머 그룹과 파티션 리밸런스 🏗️
리밸런스란?
- 리밸런스는 컨슈머 그룹 내 파티션 소유권을 재분배하는 과정
- 새로운 컨슈머가 추가되거나 기존 컨슈머가 종료/크래시되면 발생
- 컨슈머는 카프카 브로커의 그룹 코디네이터와 하트비트를 주고받으며 상태를 유지
- 하트비트가 없으면 세션 타임아웃이 발생하며, 그룹 코디네이터는 해당 컨슈머를 제거하고 리밸런스를 실행함
리밸런스의 역할
- 컨슈머 그룹의 확장성과 가용성을 제공
- 작업 중 리밸런스가 발생하면 데이터 처리 중단과 성능 저하를 초래할 수 있음
파티션 할당 전략
조급한 리밸런스 (Eager Rebalance)
- 모든 컨슈머가 기존 파티션을 해제하고, 작업이 중단된 후 재할당
- 작업 중단으로 인해 성능에 부정적 영향을 미칠 수 있음
협력적 리밸런스 (Cooperative Rebalance)
- 기존 작업을 방해하지 않고, 점진적으로 파티션을 다른 컨슈머에 재할당
- 작업 중단을 방지하며, 효율적으로 파티션을 재분배
- 카프카 3.1 이상부터 기본값으로 설정됨
1.3. 정적 그룹 멤버십 📌
- 기본적으로 컨슈머는 일시적 멤버로, 그룹을 떠나면 기존 파티션이 해제됨
group.instance.id
를 설정하면 정적 그룹 멤버십이 활성화되어 종료 후에도 동일한 파티션을 유지함
특징
- 종료 후 재참여해도 동일한 파티션을 재할당받아 리밸런스를 방지
- 동일한
group.instance.id
를 가진 컨슈머가 중복 조인하면 에러가 발생 - 종료 시 컨슈머 그룹을 떠나지 않으며,
session.timeout.ms
설정에 따라 멤버십이 해제됨
활용 사례
- 로컬 상태 또는 캐시를 유지해야 하는 애플리케이션에 적합
- 불필요한 리밸런스를 줄여 안정적인 파티션 관리를 지원
2. 카프카 컨슈머 생성하기 🖥️
KafkaConsumer
인스턴스를 생성하여 컨슈머를 정의- 프로듀서와 생성 방식이 유사하지만,
group.id
설정이 추가됨
필수 설정
bootstrap.servers
: 카프카 브로커 주소key.deserializer
,value.deserializer
: 바이트 데이터를 자바 객체로 변환할 클래스group.id
: 컨슈머 그룹 ID. 동일 그룹 내 컨슈머끼리 데이터를 병렬로 처리
추가 설정
auto.offset.reset
: 초기 메시지 소비 위치earliest
: 처음부터 읽음latest
: 최신 메시지부터 읽음none
: 오프셋이 없으면 에러 발생
enable.auto.commit
: 오프셋 자동 커밋 여부true
: 자동 커밋false
: 수동 커밋 필요
예제코드
1
2
3
4
5
6
7
8
9
10
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 브로커 주소
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 키 디시리얼라이저
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 값 디시리얼라이저
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-consumer-group"); // 컨슈머 그룹 ID
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 초기 메시지 소비 위치
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 수동 커밋 설정
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
3. 토픽 구독하기 📰
subscribe()
메서드로 하나 이상의 토픽을 구독- 토픽 목록을 직접 지정하거나, 정규식을 사용해 특정 패턴의 토픽을 구독 가능
- 정규식을 활용하면 새로운 토픽 추가 시 자동으로 구독에 포함됨
1
2
3
4
5
// 단일 또는 다수의 토픽 구독
consumer.subscribe(Arrays.asList("demo", "test"));
// 정규식으로 특정 패턴의 토픽 구독
consumer.subscribe(Pattern.compile("demo-.*"));
4. 폴링 루프와 쓰레드 안전성 ⛓️
폴링 루프
poll()
메서드는 컨슈머가 주기적으로 데이터를 확인하고 처리하는 루프- 데이터를 기다리는 동안 설정된 시간만큼 대기하며, 새로운 데이터가 오면 처리함
- 폴링을 멈추면 컨슈머가 죽은 것으로 간주되어 리밸런스가 발생할 수 있음
- 첫 번째
poll()
호출 시- 그룹 코디네이터와 통신해 컨슈머 그룹에 참여
- 파티션을 할당받고 리밸런스 처리 수행
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("demo"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000)); // 최대 5초 대기
System.out.println("polling...");
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Key: %s, Value: %s%n", record.key(), record.value());
}
if (!records.isEmpty()) {
consumer.commitAsync(); // 비동기 커밋
}
}
쓰레드 안전성
- “하나의 스레드당 하나의 컨슈머” 원칙을 준수해야 함
- 동일 그룹 내 여러 컨슈머를 사용하려면 각각 독립된 스레드에서 실행해야 함
- 하나의 컨슈머를 여러 스레드에서 공유하면 데이터 충돌 및 오프셋 관리 문제가 발생
- 효율적 설계 방법
- 데이터를 가져오는 컨슈머와 데이터를 처리하는 워커 스레드를 분리
5. 컨슈머 설정하기 ⚙️
데이터 처리 관련 설정
fetch.min.bytes
: 브로커에서 읽어올 최소 데이터 크기 지정fetch.max.wait.ms
: 데이터가 쌓일 때까지 대기 시간 지정fetch.max.bytes
: 한 번에 가져올 최대 데이터 크기 지정max.poll.records
: 한 번에 가져올 최대 메시지 개수max.partition.fetch.bytes
: 파티션별 반환할 최대 데이터 크기
타임아웃 및 하트비트 설정
session.timeout.ms
: 하트비트 없이 컨슈머를 살아있다고 간주할 최대 시간 (기본 10초)heartbeat.interval.ms
: 하트비트 전송 간격 (일반적으로session.timeout.ms
의 1/3)max.poll.interval.ms
: 폴링 간 최대 대기 시간 (기본 5분)default.api.timeout.ms
: 모든 API 호출에 적용되는 기본 타임아웃request.timeout.ms
: 브로커 응답 대기 최대 시간 (기본 30초)
오프셋 및 커밋 설정
auto.offset.reset
: 메시지를 읽기 시작할 위치 지정earliest
: 처음부터 읽음latest
: 최신 메시지부터 읽음
enable.auto.commit
: 오프셋 자동 커밋 여부 설정
파티션 할당 전략
Range
: 컨슈머 그룹 내에서 연속된 파티션을 하나의 컨슈머에 할당하는 전략RoundRobin
: 파티션을 컨슈머 그룹 내의 모든 컨슈머에게 고르게 분배하는 전략Sticky
: 파티션을 균등하게 분배하며, 리밸런스 시 파티션 이동을 최소화하는 전략Cooperative Sticky
: Sticky에 협력적 리밸런스를 추가해 점진적인 파티션 재할당을 지원하는 전략
기타 설정
client.id
: 로깅/모니터링용 클라이언트 식별 IDclient.rack
: 레플리카를 가져올 데이터센터/클라우드 영역 지정group.instance.id
: 정적 그룹 멤버십 활성화로 리밸런스 최소화receive.buffer.bytes
,send.buffer.bytes
: TCP 송수신 버퍼 크기 설정 (-1
은 OS 기본값 사용)
6. 오프셋과 커밋 📝
오프셋 커밋이란?
- 오프셋 커밋은 컨슈머가 읽은 메시지의 위치를 저장하는 작업 ✉️
- 카프카는 메시지를 개별적으로 커밋하지 않고,
__consumer_offsets
라는 내부 토픽에 저장 - 커밋되지 않은 오프셋은 동일 메시지의 중복 소비 가능성을 초래
6.1 자동 커밋
enable.auto.commit=true
설정 시 컨슈머가 오프셋을 자동으로 커밋- 설정된 주기(
auto.commit.interval.ms
, 기본값 5초)마다 커밋 수행 - 단점
- 크래시 발생 시, 처리된 메시지와 커밋된 오프셋 간 차이로 중복 소비 발생 가능
6.2 수동 커밋
enable.auto.commit=false
설정 시 수동으로 오프셋 커밋 가능- 커밋 방법
commitSync()
: 동기적 커밋- 성공 여부를 확인할 수 있지만, 응답 대기 동안 애플리케이션이 블록됨
commitAsync()
: 비동기적 커밋- 성능은 좋지만 실패 시 재시도하지 않음
- 단점
- 동기 커밋은 처리량을 제한할 수 있음
- 비동기 커밋은 커밋 실패를 감지하기 어려움
6.3 동기적/비동기적 커밋 병행
- 컨슈머 종료 시 또는 리밸런스 직전에는
commitSync()
를 사용해 안정적으로 오프셋을 커밋함 - 일반 상황에서는
commitAsync()
를 사용해 성능을 최적화함 - 병행 사용으로 데이터 정확성과 성능을 모두 보장
6.4 특정 오프셋 커밋
특정 오프셋 커밋을 위해 파티션과 오프셋 맵 전달
1 2 3
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); offsets.put(new TopicPartition("demo-topic", 0), new OffsetAndMetadata(100)); consumer.commitSync(offsets);
7. 리밸런스 리스너 🎧
- 리밸런스 발생 시 컨슈머가 정리 작업(Cleanup)을 수행하도록 지원하는 메커니즘
- 할당된 파티션 해제 전에 마지막 처리 오프셋을 커밋해 데이터 손실 및 중복을 방지
- 파일 핸들, 데이터베이스 연결 등 리소스를 정리하거나 종료하는 작업도 필요
리밸런스 리스너 구현
ConsumerRebalanceListener
인터페이스를 구현하여 리밸런스 이벤트를 처리onPartitionsRevoked
- 파티션 해제 시 호출
- 마지막 처리된 오프셋 커밋 및 정리 작업 수행
onPartitionsAssigned
- 새로운 파티션 할당 시 호출
- 초기화 작업 또는 커밋된 오프셋 복구
사용법
subscribe()
메서드 호출 시 리스너 전달
1
2
3
4
5
6
7
8
9
10
11
12
consumer.subscribe(Arrays.asList("demo-topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Partitions revoked: " + partitions);
consumer.commitSync(); // 마지막 처리된 오프셋 커밋
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Partitions assigned: " + partitions);
}
});
8. 특정 오프셋의 레코드 읽어오기 💿
- 기본적으로 컨슈머는 마지막 커밋된 오프셋부터 데이터를 읽음
- 특정 위치에서부터 데이터를 읽고 싶을 경우
seek()
메서드로 시작 위치를 조정
오프셋 조정 메서드
seekToBeginning()
: 파티션의 처음부터 메시지 읽기seekToEnd()
: 파티션의 끝에서부터 새로운 메시지 읽기seek(TopicPartition, offset)
: 특정 오프셋으로 이동해 메시지 읽기
예제 코드
1
2
3
4
5
6
7
8
9
10
consumer.assign(Arrays.asList(new TopicPartition("demo-topic", 0)));
// 파티션의 처음부터 읽기
consumer.seekToBeginning(Arrays.asList(new TopicPartition("demo-topic", 0)));
// 파티션의 끝에서부터 읽기
consumer.seekToEnd(Arrays.asList(new TopicPartition("demo-topic", 0)));
// 특정 오프셋에서 읽기
consumer.seek(new TopicPartition("demo-topic", 0), 100);
9. 폴링 루프를 벗어나는 방법 ⛓️💥
- 컨슈머를 종료할 때,
poll()
대기 상태에서도 즉시 루프를 종료 가능 consumer.wakeup()
을 호출하면,poll()
이WakeupException
을 발생시켜 중단됨wakeup()
은 다른 스레드에서 호출해야 안전하게 작동
예제 코드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 다른 스레드에서 wakeup() 호출
Thread shutdownThread = new Thread(() -> {
consumer.wakeup(); // 종료 요청
});
shutdownThread.start();
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Key: %s, Value: %s%n", record.key(), record.value());
}
}
} catch (WakeupException e) {
// WakeupException으로 루프 종료
System.out.println("Consumer is shutting down...");
} finally {
consumer.close(); // 종료 전 리소스 정리
}
추가 팁
- JVM 종료 시 안전하게 작업을 마무리하려면 메인 스레드에서 ShutdownHook 사용
1
2
3
4
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.wakeup(); // 종료 요청
consumer.close(); // 리소스 정리
}));
10. 디시리얼라이저 📜
디시리얼라이저의 역할
- 컨슈머는 바이트 배열을 자바 객체로 변환하기 위해 디시리얼라이저를 사용
- 데이터 형식이 일치해야 시리얼라이저와 디시리얼라이저가 올바르게 작동
종류
- 카프카 기본 제공 디시리얼라이저
StringDeserializer
: 바이트 데이터를 문자열로 변환IntegerDeserializer
,LongDeserializer
: 바이트 데이터를 정수형과 Long 타입으로 변환
- Avro 디시리얼라이저
- 표준화된 데이터 변환 도구
KafkaAvroDeserializer
를 사용해 Avro 스키마 기반 객체 변환
커스텀 디시리얼라이저
- 특정 객체 변환을 위해 직접 구현 가능
- 권장하지 않음
- 구현 복잡도가 높고, 데이터 변환 과정에서 에러 발생 가능성 존재
- 표준 디시리얼라이저 사용을 우선 고려
리스트 타입 변환
List<T>
와 같은 중첩 타입도 시리얼라이즈/디시리얼라이즈 가능- 주의점
- 내부 객체와 리스트 구조 모두 설정해야 하므로 복잡함
11. 독립 실행 컨슈머 🏠
- 독립 실행 컨슈머는 컨슈머 그룹 없이 동작하며, 파티션을 직접 할당받아 데이터를 처리
- 컨슈머 그룹의 리밸런싱이나 파티션 자동 할당이 불필요한 경우 적합
사용 사례
- 하나의 컨슈머가 토픽의 모든 파티션 데이터를 처리해야 할 때
- 특정 파티션에서만 데이터를 읽어야 할 때
- 리밸런싱이 불필요하거나 비효율적인 상황
특징 및 제한사항
- 리밸런싱 기능이 제공되지 않으므로, 파티션을 명시적으로 관리해야 함
- 토픽 구독과 파티션 직접 할당은 동시에 사용할 수 없음
- 파티션 정보를 명확히 알고 있어야 함
구현 예제
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 파티션 정보를 가져와 직접 할당
List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor("demo-topic");
for (PartitionInfo partition : partitionInfos) {
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
consumer.assign(partitions);
// 데이터 폴링 루프
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Key: %s, Value: %s%n", record.key(), record.value());
}
}
This post is licensed under CC BY 4.0 by the author.