Chapter 05. 프로그램 내에서 코드로 카프카 관리하기
1.1. AdminClient 개요 💡
AdminClient의 주요 기능
- 토픽 조회, 생성, 삭제
- 클러스터 상세 정보 확인
- ACL 및 설정 관리
비동기 처리
AdminClient
의 메서드는 비동기로 작동하며, 결과를Future
객체로 반환- 반환된
Future
객체를 통해 작업 결과를 확인하거나, 취소 및 대기 가능 - 작업이 완료되면 후속 작업 실행을 위한 메서드도 제공
- ex.
AdminClient.createTopics
메서드CreateTopicsResult
객체 반환하며, 생성된 각 토픽의 상태를 개별적으로 확인 가능
최종적 일관성(Eventual Consistency)
- 카프카의 메타데이터는 비동기적으로 전파
- 새 토픽 생성 시 모든 브로커에 동기화되기까지 시간이 필요
1.2. AdminClient 옵션 ⚙️
메서드별 옵션
AdminClient
는 작업에 따라 전용 옵션 객체를 제공- ex.
listTopics
→ListTopicsOptions
,describeCluster
→DescribeClusterOptions
- ex.
- 각 옵션을 통해 작업의 세부 동작을 세밀하게 설정 가능
공통 매개변수 옵션
timeoutMs
로 클러스터 응답 대기 시간 설정- 응답 시간 초과 시
TimeoutException
발생
1.3. AdminClient의 구조적 특징 🏗️
수평적 설계
AdminClient
는KafkaAdminClient
프로토콜로 작업 수행- 객체 간 의존 관계나 네임스페이스 없이 필요한 메서드에 직접 접근 가능
- 복잡한 구조 없이 간결하게 설계됨
사용자 친화성
- JavaDoc 문서와 IDE 자동완성으로 메서드를 쉽게 탐색할 수 있음
1.4. 추가 참고 📚
작업 유형별 처리 위치
- 클러스터 변경 작업(
create
,delete
,alter
)은 컨트롤러가 담당 - 조회 작업(
list
,describe
)은 부하가 적은 브로커에서 처리되어 효율성 극대화
주키퍼 수정 금지
- 카프카는 주키퍼 의존성을 점차 제거하는 방향으로 발전 중
- 주키퍼를 직접 수정하는 작업은 권장되지 않으며, 예상치 못한 문제를 초래할 수 있음
2. AdminClient 사용법: 생성, 설정, 닫기 🔑
AdminClient 생성 및 닫기
1
2
3
4
5
6
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);
admin.close(Duration.ofSeconds(30));
Properties
객체를 통해AdminClient
생성bootstrap.servers
설정으로 카프카 클러스터와 연결close()
호출 시, 진행 중인 작업이 완료되기를 최대 30초 대기- 30초를 초과한 미완료 작업에 대해 예외 발생
DNS 관련 설정
client.dns.lookup=resolve_canonical_bootstrap_server_only
- DNS 별칭을 사용하는 경우, 모든 브로커 이름으로 자동 등록 가능
client.dns.lookup=use_all_dns_ips
- 다수의 IP 주소를 가진 DNS 이름을 활용하여 연결 안정성을 높일 수 있음
응답 대기 시간 설정
request.timeout.ms
AdminClient
가 작업 응답을 기다리는 최대 시간- 제한 시간 초과 시 예외 발생
- 서비스 초기화 중 특정 토픽의 존재 여부를 확인할 때 활용 가능
3. 필수적인 토픽 관리 기능 📊
토픽 조회
1
2
ListTopicsResult topics = admin.listTopics(); // Future 객체 포함
topics.names.get().forEach(System.out::println); // 모든 토픽이름 출력
admin.listTopics()
는ListTopicsResult
객체 반환Future
객체를 통해 비동기적으로 결과처리Future.get()
호출 시 토픽 이름 목록을 받을 때까지 대기하며, 타임아웃 시 예외 발생
토픽 생성 및 삭제
admin.createTopics
,admin.deleteTopics
메서드로 토픽 생성 및 삭제 수행- 삭제는 복구가 불가능하므로 신중히 진행
토픽 조건 확인
- 1개의 파티션으로 데이터 순서 보장
- 최소 3개의 레플리카로 가용성과 안정성 확보
- 압착(compaction) 설정으로 오래된 데이터도 유지 가능
비동기 처리로 효율성 향상
Future.get()
을 사용한 블로킹 방식은 대규모 요청 환경에서 비효율적- 대신 비동기 방식을 활용해 서버 효율성을 극대화
- HTTP 서버는 카프카 응답을 기다리지 않고 다른 요청 처리 가능
- 카프카 응답 도착 시 클라이언트에 즉시 전달, 처리 속도 최적화
4. 설정 관리 🛠️
ConfigResource 객체 활용
- 브로커, 로그, 토픽의 설정을 조회하거나 수정 가능
- CLI 툴(
kafka-configs.sh
)외에도 애플리케이션 코드로 설정 작업 가능
압착(compaction) 설정
- 압착 설정은 데이터 일관성과 보존을 위해 중요
- 주기적으로 압착 설정 여부를 점검하고, 누락 시 자동으로 수정 가능
5. 컨슈머 그룹 관리 👥
- 카프카는 이전 데이터를 동일한 순서로 재처리할 수 있는 기능 제공
- 트러블슈팅 및 재해 복구와 같은 상황에서 유용
5.1. 컨슈머 그룹 살펴보기 👀
컨슈머 그룹 목록 조회
1
admin.listConsumerGroups().valid().get().forEach(System.out::println);
valid()
는 에러 없이 반환된 그룹만 포함- 예외는
errors()
메서드로 확인 가능
그룹 상세 정보 확인
1
2
3
4
5
ConsumerGroupDescription groupDescription = admin
.describeConsumerGroups(CONSUMER_GRP_LIST)
.describedGroups().get(CONSUMER_GROUP).get();
System.out.println("Description of group " + CONSUMER_GROUP + ":" + groupDescription);
ConsumerGroupDescription
객체로 그룹 멤버, 할당된 파티션, 그룹 코디네이터 정보 확인 가능- 마지막 커밋된 오프셋과 지연 상태(lag)를 파악 가능
5.2. 컨슈머 그룹 수정하기 🔄
지원 기능
- 컨슈머 그룹 삭제
- 특정 멤버 제외
- 커밋된 오프셋 삭제 및 변경
오프셋 변경
- 오프셋 삭제로 컨슈머가 처음부터 데이터를 읽도록 설정 가능
auto.offset.reset
값에 따라 처리 시작 지점을 조정 가능
주의사항
- 활성화된 컨슈머 그룹의 오프셋 변경 불가(ex.
UnknownMemberIdException
발생) - 상태 저장소를 함께 조정하지 않으면 중복 계산 가능성 존재
- 오프셋 변경은 컨슈머가 새로운 파티션을 할당받거나 재시작할 때 반영
6. 고급 어드민 작업 🔬
- 일반적으로 잘 사용되지 않지만, 특정 상황에서는 매우 유용한 작업
- 특히 사고 복구 중인 SRE에게 중요한 도구로 활용 가능
6.1. 토픽에 파티션 추가하기 ➕
- 토픽의 파티션 수는 생성 시 결정되며, 일반적으로 변경되지 않음
- 부하가 파티션의 최대 처리량을 초과하는 경우, 파티션 추가가 필요
1
2
3
Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put(TOPIC_NAME, NewPartitions.increaseTo(NUM_PARTITIONS + 2));
admin.createPartitions(newPartitions).all().get();
- 데이터 무결성이 깨질 수 있으므로 신중히 처리
6.2. 토픽에서 레코드 삭제하기 🗑️
필요성
- 개인정보 보호법 등으로 인해 특정 데이터 삭제가 요구되는 경우
- 지정된 오프셋 이전의 모든 레코드 삭제 가능
사용 방법
deleteRecords
메서드로 특정 오프셋 이전의 레코드 삭제- 삭제된 레코드의 최대 오프셋 값을 반환해 삭제 완료 여부 확인 가능
1
admin.deleteRecords(recordsToDelete).all().get();
특징
- 삭제된 레코드는 즉시 접근 불가 상태로 전환
- 실제 디스크 삭제는 비동기로 처리
6.3. 리더 선출 👑
선호 리더 선출 (Preferred Leader Election)
- 각 파티션은 기본적으로 선호 리더 레플리카를 가짐
- 카프카는 선호 리더가 리더 역할을 수행하는지 주기적으로 확인
electLeader()
메서드로 선호 리더를 다시 리더로 지정 가능
언클린 리더 선출 (Unclean Leader Election)
- 리더 레플리카가 모두 사용 불가할 경우, 리더가 없는 상태 발생 가능
- 데이터 유실을 감수하고 비정상 레플리카를 리더로 지정
- 이 과정은 비동기로 실행되며, 시간이 소요될 수 있음
6.4. 레플리카 재할당 📦
필요성
- 특정 브로커 과부하 완화
- 장비 교체를 위해 레플리카 이동 또는 추가
- 데이터 균형을 맞추기 위한 재배치
사용 방법
1
admin.alterPartitionReassignments(partitionReassignments).all().get();
주의사항
- 레플리카 이동은 대량의 데이터 복제를 초래
- 복제 작업으로 인해 브로커 성능에 영향을 줄 수 있으므로 신중히 진행
7. 카프카 테스트 🧪
MockAdminClient
- 실제 브로커 없이
AdminClient
의 동작을 테스트할 수 있는 도구 - 카프카가 제공하는 목업 클래스(
MockAdminClient
)를 활용
This post is licensed under CC BY 4.0 by the author.