Post

Chapter 05. 프로그램 내에서 코드로 카프카 관리하기

1.1. AdminClient 개요 💡

AdminClient의 주요 기능

  • 토픽 조회, 생성, 삭제
  • 클러스터 상세 정보 확인
  • ACL 및 설정 관리

비동기 처리

  • AdminClient의 메서드는 비동기로 작동하며, 결과를 Future 객체로 반환
  • 반환된 Future 객체를 통해 작업 결과를 확인하거나, 취소 및 대기 가능
  • 작업이 완료되면 후속 작업 실행을 위한 메서드도 제공
  • ex. AdminClient.createTopics 메서드
    • CreateTopicsResult 객체 반환하며, 생성된 각 토픽의 상태를 개별적으로 확인 가능

최종적 일관성(Eventual Consistency)

  • 카프카의 메타데이터는 비동기적으로 전파
  • 새 토픽 생성 시 모든 브로커에 동기화되기까지 시간이 필요

1.2. AdminClient 옵션 ⚙️

메서드별 옵션

  • AdminClient는 작업에 따라 전용 옵션 객체를 제공
    • ex. listTopicsListTopicsOptions , describeClusterDescribeClusterOptions
  • 각 옵션을 통해 작업의 세부 동작을 세밀하게 설정 가능

공통 매개변수 옵션

  • timeoutMs로 클러스터 응답 대기 시간 설정
  • 응답 시간 초과 시 TimeoutException 발생

1.3. AdminClient의 구조적 특징 🏗️

수평적 설계

  • AdminClientKafkaAdminClient 프로토콜로 작업 수행
  • 객체 간 의존 관계나 네임스페이스 없이 필요한 메서드에 직접 접근 가능
  • 복잡한 구조 없이 간결하게 설계됨

사용자 친화성

  • JavaDoc 문서와 IDE 자동완성으로 메서드를 쉽게 탐색할 수 있음

1.4. 추가 참고 📚

작업 유형별 처리 위치

  • 클러스터 변경 작업(create, delete, alter)은 컨트롤러가 담당
  • 조회 작업(list, describe)은 부하가 적은 브로커에서 처리되어 효율성 극대화

주키퍼 수정 금지

  • 카프카는 주키퍼 의존성을 점차 제거하는 방향으로 발전 중
  • 주키퍼를 직접 수정하는 작업은 권장되지 않으며, 예상치 못한 문제를 초래할 수 있음

2. AdminClient 사용법: 생성, 설정, 닫기 🔑

AdminClient 생성 및 닫기

1
2
3
4
5
6
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

AdminClient admin = AdminClient.create(props);

admin.close(Duration.ofSeconds(30));
  • Properties 객체를 통해 AdminClient 생성
  • bootstrap.servers 설정으로 카프카 클러스터와 연결
  • close() 호출 시, 진행 중인 작업이 완료되기를 최대 30초 대기
  • 30초를 초과한 미완료 작업에 대해 예외 발생

DNS 관련 설정

  • client.dns.lookup=resolve_canonical_bootstrap_server_only
    • DNS 별칭을 사용하는 경우, 모든 브로커 이름으로 자동 등록 가능
  • client.dns.lookup=use_all_dns_ips
    • 다수의 IP 주소를 가진 DNS 이름을 활용하여 연결 안정성을 높일 수 있음

응답 대기 시간 설정

  • request.timeout.ms
    • AdminClient가 작업 응답을 기다리는 최대 시간
    • 제한 시간 초과 시 예외 발생
    • 서비스 초기화 중 특정 토픽의 존재 여부를 확인할 때 활용 가능

3. 필수적인 토픽 관리 기능 📊

토픽 조회

1
2
ListTopicsResult topics = admin.listTopics(); // Future 객체 포함
topics.names.get().forEach(System.out::println); // 모든 토픽이름 출력
  • admin.listTopics()ListTopicsResult객체 반환
  • Future 객체를 통해 비동기적으로 결과처리
  • Future.get() 호출 시 토픽 이름 목록을 받을 때까지 대기하며, 타임아웃 시 예외 발생

토픽 생성 및 삭제

  • admin.createTopics , admin.deleteTopics 메서드로 토픽 생성 및 삭제 수행
  • 삭제는 복구가 불가능하므로 신중히 진행

토픽 조건 확인

  • 1개의 파티션으로 데이터 순서 보장
  • 최소 3개의 레플리카로 가용성과 안정성 확보
  • 압착(compaction) 설정으로 오래된 데이터도 유지 가능

비동기 처리로 효율성 향상

  • Future.get()을 사용한 블로킹 방식은 대규모 요청 환경에서 비효율적
  • 대신 비동기 방식을 활용해 서버 효율성을 극대화
    • HTTP 서버는 카프카 응답을 기다리지 않고 다른 요청 처리 가능
    • 카프카 응답 도착 시 클라이언트에 즉시 전달, 처리 속도 최적화

4. 설정 관리 🛠️

ConfigResource 객체 활용

  • 브로커, 로그, 토픽의 설정을 조회하거나 수정 가능
  • CLI 툴(kafka-configs.sh)외에도 애플리케이션 코드로 설정 작업 가능

압착(compaction) 설정

  • 압착 설정은 데이터 일관성과 보존을 위해 중요
  • 주기적으로 압착 설정 여부를 점검하고, 누락 시 자동으로 수정 가능

5. 컨슈머 그룹 관리 👥

  • 카프카는 이전 데이터를 동일한 순서로 재처리할 수 있는 기능 제공
  • 트러블슈팅 및 재해 복구와 같은 상황에서 유용

5.1. 컨슈머 그룹 살펴보기 👀

컨슈머 그룹 목록 조회

1
admin.listConsumerGroups().valid().get().forEach(System.out::println);
  • valid()는 에러 없이 반환된 그룹만 포함
  • 예외는 errors() 메서드로 확인 가능

그룹 상세 정보 확인

1
2
3
4
5
ConsumerGroupDescription groupDescription = admin
    .describeConsumerGroups(CONSUMER_GRP_LIST)
    .describedGroups().get(CONSUMER_GROUP).get();

System.out.println("Description of group " + CONSUMER_GROUP + ":" + groupDescription);
  • ConsumerGroupDescription 객체로 그룹 멤버, 할당된 파티션, 그룹 코디네이터 정보 확인 가능
  • 마지막 커밋된 오프셋지연 상태(lag)를 파악 가능

5.2. 컨슈머 그룹 수정하기 🔄

지원 기능

  • 컨슈머 그룹 삭제
  • 특정 멤버 제외
  • 커밋된 오프셋 삭제 및 변경

오프셋 변경

  • 오프셋 삭제로 컨슈머가 처음부터 데이터를 읽도록 설정 가능
  • auto.offset.reset 값에 따라 처리 시작 지점을 조정 가능

주의사항

  • 활성화된 컨슈머 그룹의 오프셋 변경 불가(ex. UnknownMemberIdException 발생)
  • 상태 저장소를 함께 조정하지 않으면 중복 계산 가능성 존재
  • 오프셋 변경은 컨슈머가 새로운 파티션을 할당받거나 재시작할 때 반영

6. 고급 어드민 작업 🔬

  • 일반적으로 잘 사용되지 않지만, 특정 상황에서는 매우 유용한 작업
  • 특히 사고 복구 중인 SRE에게 중요한 도구로 활용 가능

6.1. 토픽에 파티션 추가하기 ➕

  • 토픽의 파티션 수는 생성 시 결정되며, 일반적으로 변경되지 않음
  • 부하가 파티션의 최대 처리량을 초과하는 경우, 파티션 추가가 필요
1
2
3
Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put(TOPIC_NAME, NewPartitions.increaseTo(NUM_PARTITIONS + 2));
admin.createPartitions(newPartitions).all().get();
  • 데이터 무결성이 깨질 수 있으므로 신중히 처리

6.2. 토픽에서 레코드 삭제하기 🗑️

필요성

  • 개인정보 보호법 등으로 인해 특정 데이터 삭제가 요구되는 경우
  • 지정된 오프셋 이전의 모든 레코드 삭제 가능

사용 방법

  • deleteRecords 메서드로 특정 오프셋 이전의 레코드 삭제
  • 삭제된 레코드의 최대 오프셋 값을 반환해 삭제 완료 여부 확인 가능
1
admin.deleteRecords(recordsToDelete).all().get();

특징

  • 삭제된 레코드는 즉시 접근 불가 상태로 전환
  • 실제 디스크 삭제는 비동기로 처리

6.3. 리더 선출 👑

선호 리더 선출 (Preferred Leader Election)

  • 각 파티션은 기본적으로 선호 리더 레플리카를 가짐
  • 카프카는 선호 리더가 리더 역할을 수행하는지 주기적으로 확인
  • electLeader() 메서드로 선호 리더를 다시 리더로 지정 가능

언클린 리더 선출 (Unclean Leader Election)

  • 리더 레플리카가 모두 사용 불가할 경우, 리더가 없는 상태 발생 가능
  • 데이터 유실을 감수하고 비정상 레플리카를 리더로 지정
  • 이 과정은 비동기로 실행되며, 시간이 소요될 수 있음

6.4. 레플리카 재할당 📦

필요성

  • 특정 브로커 과부하 완화
  • 장비 교체를 위해 레플리카 이동 또는 추가
  • 데이터 균형을 맞추기 위한 재배치

사용 방법

1
admin.alterPartitionReassignments(partitionReassignments).all().get();

주의사항

  • 레플리카 이동은 대량의 데이터 복제를 초래
  • 복제 작업으로 인해 브로커 성능에 영향을 줄 수 있으므로 신중히 진행

7. 카프카 테스트 🧪

MockAdminClient

  • 실제 브로커 없이 AdminClient의 동작을 테스트할 수 있는 도구
  • 카프카가 제공하는 목업 클래스(MockAdminClient)를 활용
This post is licensed under CC BY 4.0 by the author.