Post

Chapter 07. 신뢰성 있는 데이터 전달

(장애상황 1) 클라이언트가 브로커 중 하나와 연결이 끊어짐 ⚡

1. 시나리오 환경설정 🛠️

  • 카프카 브로커1(kafka-docker-kafka-1) : localhost:9092
  • 카프카 브로커2(kafka-docker-kafka2-1) : localhost:9093
  • 프로듀서: 클러스터에 메시지를 전송
  • 컨슈머: 클러스터에서 메시지를 수신

2. 시나리오1: 정상적인 메시지 전송 및 수신 📨

  1. 프로듀서가 localhost:9092 또는 localhost:9093 브로커로 연결하여 메시지 전송
  2. 메시지는 카프카 브로커에서 처리되고, 컨슈머는 해당 메시지를 다른 브로커에서 정상적으로 수신함
  3. 이때, 카프카 클러스터는 정상적으로 동작하며, 메시지는 클러스터 내 모든 브로커에서 적절하게 처리

3. 시나리오2: 카프카 브로커1(kafka-docker-kafka-1) 장애 발생 🚫

  1. localhost:9092에서 실행 중인 kafka-docker-kafka-1 컨테이너가 장애가 발생하거나 종료됨
  2. 프로듀서와 컨슈머는 여전히 localhost:9093 브로커에 연결하여 메시지를 주고받을 수 있음
  3. 메타데이터 갱신이 이루어져 클라이언트는 새로운 브로커 정보를 인식하고, 장애가 발생한 브로커를 제외한 상태에서 정상적으로 작업을 계속할 수 있음
  4. 카프카 클러스터는 장애가 발생한 브로커의 파티션 리더를 localhost:9093에 있는 브로커로 전환함
  5. 이로써 클러스터는 장애 발생 후에도 정상적으로 메시지를 처리할 수 있음

4. 시나리오 3: 카프카 브로커 1 복구 🔄

  1. localhost:9092에서 실행 중인 kafka-docker-kafka-1 컨테이너가 복구되거나 다시 시작됨
  2. 클라이언트는 localhost:9092 브로커를 다시 사용할 수 있게 되며, 메시지 전송과 수신이 정상적으로 이루어짐
  3. 복구된 브로커는 카프카 클러스터의 일부로 자동으로 다시 참여함

5. 시나리오를 위한 환경 구성 ⚙️

1. 데이터 초기화

1
docker-compose down -v

2. docker-compose.yml 파일

1
vi docker-compose.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
services:
  kafka:
    image: docker.io/bitnami/kafka:3.9
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093,1@kafka2:9093
      - KAFKA_KRAFT_CLUSTER_ID=my-cluster-id  # 동일한 클러스터 ID 설정
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT

  kafka2:
    image: docker.io/bitnami/kafka:3.9
    ports:
      - "9093:9092"
    volumes:
      - "kafka_data2:/bitnami"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=controller,broker  # kafka2도 controller와 broker 역할을 수행
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093,1@kafka2:9093
      - KAFKA_KRAFT_CLUSTER_ID=my-cluster-id  # 동일한 클러스터 ID 설정
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka2:9092,CONTROLLER://kafka2:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

volumes:
  kafka_data:
    driver: local
  kafka_data2:
    driver: local

3. 컨테이너 재시작

1
docker-compose up -d

docker ps 결과

1
2
3
4
[shin@gram88 kafka-docker]$ docker ps
CONTAINER ID   IMAGE               COMMAND                  CREATED       STATUS         PORTS                                         NAMES
6556e25e5562   bitnami/kafka:3.9   "/opt/bitnami/script…"   2 hours ago   Up 2 minutes   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp     kafka-docker-kafka-1
2482d4a57dba   bitnami/kafka:3.9   "/opt/bitnami/script…"   2 hours ago   Up 2 hours     0.0.0.0:9093->9092/tcp, [::]:9093->9092/tcp   kafka-docker-kafka2-1

4. 카프카 클라이언트 프로젝트 구성하기

KafkaConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // Kafka 브로커 주소
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return configProps;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig()));
    }

    @Bean
    public Map<String, Object> consumerConfig() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // Kafka 브로커 주소
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return configProps;
    }

    @Bean
    public KafkaConsumer<String, String> consumer() {
        return new KafkaConsumer<>(consumerConfig());
    }
}

KafkaConsumer

1
2
3
4
5
6
7
8
9
10
11
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my_topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}

KafkaProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;

        // ProducerListener 설정
        kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {

            @Override
            public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
                // 전송 성공 시, RecordMetadata를 통해 전송된 데이터의 메타 정보를 확인
                String topic = recordMetadata.topic();
                int partition = recordMetadata.partition();
                long offset = recordMetadata.offset();
                System.out.println("Message successfully sent to topic: " + topic + ", partition: " + partition + ", offset: " + offset);
            }

            @Override
            public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {
                // 전송 실패 시, 예외 메시지 출력
                System.out.println("Failed to send message to topic: " + producerRecord.topic() + ". Exception: " + exception.getMessage());
                System.out.println("Failed message: " + producerRecord.value()); // 실패한 메시지 출력
            }
        });
    }

    public void sendMessage(String message) {
        long startTime = System.currentTimeMillis();
        CompletableFuture<SendResult<String, String>> future = CompletableFuture.supplyAsync(() -> {
            try {
                // 메시지 전송
                return kafkaTemplate.send("my_topic", "key", message).get();  // blocking
            } catch (InterruptedException | ExecutionException e) {
                // 실패 시 예외 처리
                throw new RuntimeException("Failed to send message", e);
            }
        });

        future.whenComplete((result, ex) -> {
            long endTime = System.currentTimeMillis();
            if (ex == null) {
                System.out.println("Message sent successfully in " + (endTime - startTime) + " ms");
            } else {
                // 지연이나 연결 실패 시 예외 처리
                System.out.println("Failed to send message in " + (endTime - startTime) + " ms: " + ex.getMessage());
            }
        });
    }
}

KafkaController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import com.example.errorhandling.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    private final KafkaProducer kafkaProducer;

    @Autowired
    public KafkaController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @GetMapping("/sendMessage")
    public String sendMessage() {
        kafkaProducer.sendMessage("Hello, Kafka!"); // Kafka로 메시지 전송
        return "Message sent to Kafka!";
    }
}

application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: localhost:9092,localhost:9093  # 다중 브로커 설정
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      session.timeout.ms: 1000  # 세션 타임아웃 1초 설정
      heartbeat.interval.ms: 500  # 하트비트 간격 500ms 설정
      max.poll.interval.ms: 1000  # 폴링 대기 시간 1초
      metadata-max-age-ms: 30000  # 30초마다 메타데이터 갱신
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
      request.timeout.ms: 1500  # 요청 타임아웃 1.5초 설정
      max.block.ms: 1500  # 최대 대기 시간 1.5초 설정
      reconnect.backoff.ms: 500
      reconnect.backoff.max.ms: 2000
      retry.backoff.ms: 500
      metadata-max-age-ms: 30000  # 30초마다 메타데이터 갱신

6. 시나리오1 실행 - 정상적인 메시지 전송 및 수신 📩

Postman을 활용해 /sendMessage 엔드포인트를 호출

postman

메시지 정상 처리

result1

7. 시나리오2 실행 - 카프카 브로커1 장애 발생 💥

1
[shin@gram88 kafka-docker]$ docker stop kafka-docker-kafka-1

예상결과

  • 프로듀서, 컨슈머는 여전히 localhost:9093 브로커에 연결되어 메시지를 주고받을 수 있지 않을까?
  • 카프카 클러스터 내에서, localhost:9093 브로커가 새로운 파티션 리더 역할을 수행하지 않을까?

기대와 다른 결과

  • localhost:9092가 장애를 일으키면서 클라이언트가 브로커와 연결할 수 없게 되었음
  • localhost:9093 브로커로의 자동 전환이 이루어지지 않음

result2

기대와 다른 결과가 발생한 원인 분석 (추측 🤔)

  • 메타데이터 갱신 지연
    • 클라이언트가 장애 브로커를 감지한 후 다른 브로커로 전환하려면 최신 메타데이터가 필요함
    • 장애로 인해 메타데이터 갱신 요청이 처리되지 않았을 수도 있음
  • 파티션 리더 전환 실패
    • localhost:9093이 새로운 파티션 리더로 전환되지 않았을 가능성이 있음
    • 클라이언트가 새로운 리더 정보를 받지 못해 메시지 전송에 실패했을 수 있음
      • 이는 클러스터 내부의 리더 선출 지연 또는 클라이언트 메타데이터 갱신 실패 때문일 가능성 있음

8. 시나리오3 실행 - 카프카 브로커1 복구 🔧

1
[shin@gram88 kafka-docker]$ docker start kafka-docker-kafka-1

Postman을 활용해 /sendMessage 엔드포인트 재호출

postman

메시지 전송과 수신이 다시 정상적으로 이루어짐

result3

(장애상황 2) 클라이언트와 브로커 사이의 긴 지연 ⏳

시나리오: 네트워크 지연발생 (30초) 🌐

1. docker-compose.yml 수정 📝

docker-compose.yml 파일에서 cap_add 항목을 추가하여 NET_ADMIN 권한을 부여함

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
services:
  kafka:
    image: docker.io/bitnami/kafka:3.9
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093,1@kafka2:9093
      - KAFKA_KRAFT_CLUSTER_ID=my-cluster-id  # 동일한 클러스터 ID 설정
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
    cap_add:
      - NET_ADMIN

  kafka2:
    image: docker.io/bitnami/kafka:3.9
    ports:
      - "9093:9092"
    volumes:
      - "kafka_data2:/bitnami"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=controller,broker  # kafka2도 controller와 broker 역할을 수행
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093,1@kafka2:9093
      - KAFKA_KRAFT_CLUSTER_ID=my-cluster-id  # 동일한 클러스터 ID 설정
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka2:9092,CONTROLLER://kafka2:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    cap_add:
      - NET_ADMIN

volumes:
  kafka_data:
    driver: local
  kafka_data2:
    driver: local

2. docker-compose 재시작 🚀

파일을 수정한 후, docker-compose를 재시작하여 권한을 부여한 후 컨테이너를 실행함

1
2
docker-compose down
docker-compose up -d

3. 컨테이너에서 tc 명령어 실행 🖥️

Docker를 사용할 때 -user=root 옵션을 사용하여 root 권한으로 실행할 수 있음

1
docker exec -it --user=root kafka-docker-kafka-1 bash

네트워크 지연 추가: tc 명령어를 사용하여 네트워크 지연을 추가함

1
2
root@6556e25e5562:/# apt-get update && apt-get install -y iproute2
root@6556e25e5562:/# tc qdisc add dev eth0 root netem delay 30000ms

4. KafkaProducer에서 지연 확인 🕒

Postman을 활용해 /sendMessage 엔드포인트를 호출하여 지연 발생 여부를 확인함

postman

result4

endTime - startTime이 30초 이상으로 나오고, 메시지가 정상적으로 보내지지 않거나 지연됨

1
Message sent successfully in 30007 ms: Hello, Kafka!

이와 동시에, Kafka 클라이언트에서 다음과 같은 로그 발생

1
Disconnecting from node 0 due to request timeout

카프카 클라이언트는 설정된 시간 안에 브로커로부터 응답을 받지 못하면 연결을 끊음

5. 원인 🔍

  1. 타임아웃 설정
    • 카프카 클라이언트는 설정된 시간 내에 응답을 받지 못하면 연결을 끊고 DisconnectException이 발생함
    • ex. request.timeout.ms가 30초라면, 30초 내에 응답이 없으면 타임아웃이 발생함
  2. 네트워크 지연
    • 네트워크 지연이나 브로커 응답 지연이 클 경우 타임아웃이 발생함
  3. 브로커 응답 불가
    • 브로커가 다운되거나 네트워크 문제로 응답이 없으면 DisconnectException이 발생함

6. 해결 방안 💡

  1. 타임아웃 값 조정
    • 타임아웃 값을 늘려서 일시적인 지연을 허용할 수 있음
    • 예: request.timeout.ms 값을 늘리기
  2. 네트워크 안정성 확보
    • 네트워크 지연을 최소화하고 안정적인 연결을 유지해야 함
  3. 자동 재연결 설정
    • retries, reconnect.backoff.ms 등을 설정하여 클라이언트가 자동으로 재연결을 시도하게 할 수 있음

7. 타임아웃 값 조정 (해결 방안 1번) ⏲️

request.timeout.ms 60초로 늘리고 /sendMessage 엔드포인트 재호출

1
request.timeout.ms: 600000

예상결과

  • 네트워크 지연이 30초라고 가정하고 브로커의 응답 지연 임계값을 60초로 설정했음
  • 타임아웃 오류 발생안하지 않을까..?

기대와 다른 결과

  • 타임아웃 오류 발생
1
2
3
4
5
org.apache.kafka.common.errors.DisconnectException: null

Failed to send message to topic: my_topic. Exception: Topic my_topic not present in metadata after 60000 ms.
Failed message: Hello, Kafka!
Failed to send message in 60026 ms: org.springframework.kafka.KafkaException: Send failed

기대와 다른 결과가 발생한 원인 분석 (추측 🤔)

  • 메타데이터 갱신 실패
    • Topic my_topic not present in metadata after 60000 ms
      • 클라이언트가 브로커로부터 최신 메타데이터를 가져오지 못한 상태를 나타냄
      • 이는 브로커와의 초기 연결 문제로 발생할 수 있음
  • 타임아웃 값의 한계
    • 타임아웃 값을 늘려도 브로커 연결이나 메타데이터 갱신 자체가 실패하면 문제 해결 불가

8. 네트워크 연결 안정성 확보 (해결 방안 2번) ✅

지연 제거

1
root@6556e25e5562:/# tc qdisc del dev eth0 root netem

postman을 활용해 /sendMessage 엔드포인트 호출

postman

Received message: Hello, Kafka! 응답 획득

result5

This post is licensed under CC BY 4.0 by the author.