Chapter 07. 신뢰성 있는 데이터 전달
(장애상황 1) 클라이언트가 브로커 중 하나와 연결이 끊어짐 ⚡
1. 시나리오 환경설정 🛠️
- 카프카 브로커1(
kafka-docker-kafka-1
) :localhost:9092
- 카프카 브로커2(
kafka-docker-kafka2-1
) :localhost:9093
- 프로듀서: 클러스터에 메시지를 전송
- 컨슈머: 클러스터에서 메시지를 수신
2. 시나리오1: 정상적인 메시지 전송 및 수신 📨
- 프로듀서가
localhost:9092
또는localhost:9093
브로커로 연결하여 메시지 전송 - 메시지는 카프카 브로커에서 처리되고, 컨슈머는 해당 메시지를 다른 브로커에서 정상적으로 수신함
- 이때, 카프카 클러스터는 정상적으로 동작하며, 메시지는 클러스터 내 모든 브로커에서 적절하게 처리
3. 시나리오2: 카프카 브로커1(kafka-docker-kafka-1
) 장애 발생 🚫
localhost:9092
에서 실행 중인kafka-docker-kafka-1
컨테이너가 장애가 발생하거나 종료됨- 프로듀서와 컨슈머는 여전히
localhost:9093
브로커에 연결하여 메시지를 주고받을 수 있음 - 메타데이터 갱신이 이루어져 클라이언트는 새로운 브로커 정보를 인식하고, 장애가 발생한 브로커를 제외한 상태에서 정상적으로 작업을 계속할 수 있음
- 카프카 클러스터는 장애가 발생한 브로커의 파티션 리더를
localhost:9093
에 있는 브로커로 전환함 - 이로써 클러스터는 장애 발생 후에도 정상적으로 메시지를 처리할 수 있음
4. 시나리오 3: 카프카 브로커 1 복구 🔄
localhost:9092
에서 실행 중인kafka-docker-kafka-1
컨테이너가 복구되거나 다시 시작됨- 클라이언트는
localhost:9092
브로커를 다시 사용할 수 있게 되며, 메시지 전송과 수신이 정상적으로 이루어짐 - 복구된 브로커는 카프카 클러스터의 일부로 자동으로 다시 참여함
5. 시나리오를 위한 환경 구성 ⚙️
1. 데이터 초기화
1
docker-compose down -v
2. docker-compose.yml
파일
1
vi docker-compose.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
services:
kafka:
image: docker.io/bitnami/kafka:3.9
ports:
- "9092:9092"
volumes:
- "kafka_data:/bitnami"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093,1@kafka2:9093
- KAFKA_KRAFT_CLUSTER_ID=my-cluster-id # 동일한 클러스터 ID 설정
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
kafka2:
image: docker.io/bitnami/kafka:3.9
ports:
- "9093:9092"
volumes:
- "kafka_data2:/bitnami"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=1
- KAFKA_CFG_PROCESS_ROLES=controller,broker # kafka2도 controller와 broker 역할을 수행
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093,1@kafka2:9093
- KAFKA_KRAFT_CLUSTER_ID=my-cluster-id # 동일한 클러스터 ID 설정
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://kafka2:9092,CONTROLLER://kafka2:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
volumes:
kafka_data:
driver: local
kafka_data2:
driver: local
3. 컨테이너 재시작
1
docker-compose up -d
docker ps 결과
1
2
3
4
[shin@gram88 kafka-docker]$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
6556e25e5562 bitnami/kafka:3.9 "/opt/bitnami/script…" 2 hours ago Up 2 minutes 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka-docker-kafka-1
2482d4a57dba bitnami/kafka:3.9 "/opt/bitnami/script…" 2 hours ago Up 2 hours 0.0.0.0:9093->9092/tcp, [::]:9093->9092/tcp kafka-docker-kafka2-1
4. 카프카 클라이언트 프로젝트 구성하기
KafkaConfig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public Map<String, Object> producerConfig() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 브로커 주소
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return configProps;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig()));
}
@Bean
public Map<String, Object> consumerConfig() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 브로커 주소
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return configProps;
}
@Bean
public KafkaConsumer<String, String> consumer() {
return new KafkaConsumer<>(consumerConfig());
}
}
KafkaConsumer
1
2
3
4
5
6
7
8
9
10
11
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "my_topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
KafkaProducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@Service
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
// ProducerListener 설정
kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {
@Override
public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
// 전송 성공 시, RecordMetadata를 통해 전송된 데이터의 메타 정보를 확인
String topic = recordMetadata.topic();
int partition = recordMetadata.partition();
long offset = recordMetadata.offset();
System.out.println("Message successfully sent to topic: " + topic + ", partition: " + partition + ", offset: " + offset);
}
@Override
public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {
// 전송 실패 시, 예외 메시지 출력
System.out.println("Failed to send message to topic: " + producerRecord.topic() + ". Exception: " + exception.getMessage());
System.out.println("Failed message: " + producerRecord.value()); // 실패한 메시지 출력
}
});
}
public void sendMessage(String message) {
long startTime = System.currentTimeMillis();
CompletableFuture<SendResult<String, String>> future = CompletableFuture.supplyAsync(() -> {
try {
// 메시지 전송
return kafkaTemplate.send("my_topic", "key", message).get(); // blocking
} catch (InterruptedException | ExecutionException e) {
// 실패 시 예외 처리
throw new RuntimeException("Failed to send message", e);
}
});
future.whenComplete((result, ex) -> {
long endTime = System.currentTimeMillis();
if (ex == null) {
System.out.println("Message sent successfully in " + (endTime - startTime) + " ms");
} else {
// 지연이나 연결 실패 시 예외 처리
System.out.println("Failed to send message in " + (endTime - startTime) + " ms: " + ex.getMessage());
}
});
}
}
KafkaController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import com.example.errorhandling.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
private final KafkaProducer kafkaProducer;
@Autowired
public KafkaController(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
@GetMapping("/sendMessage")
public String sendMessage() {
kafkaProducer.sendMessage("Hello, Kafka!"); // Kafka로 메시지 전송
return "Message sent to Kafka!";
}
}
application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
server:
port: 8080
spring:
kafka:
bootstrap-servers: localhost:9092,localhost:9093 # 다중 브로커 설정
consumer:
group-id: my-group
auto-offset-reset: earliest
session.timeout.ms: 1000 # 세션 타임아웃 1초 설정
heartbeat.interval.ms: 500 # 하트비트 간격 500ms 설정
max.poll.interval.ms: 1000 # 폴링 대기 시간 1초
metadata-max-age-ms: 30000 # 30초마다 메타데이터 갱신
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
request.timeout.ms: 1500 # 요청 타임아웃 1.5초 설정
max.block.ms: 1500 # 최대 대기 시간 1.5초 설정
reconnect.backoff.ms: 500
reconnect.backoff.max.ms: 2000
retry.backoff.ms: 500
metadata-max-age-ms: 30000 # 30초마다 메타데이터 갱신
6. 시나리오1 실행 - 정상적인 메시지 전송 및 수신 📩
Postman을 활용해 /sendMessage
엔드포인트를 호출
메시지 정상 처리
7. 시나리오2 실행 - 카프카 브로커1 장애 발생 💥
1
[shin@gram88 kafka-docker]$ docker stop kafka-docker-kafka-1
예상결과
- 프로듀서, 컨슈머는 여전히
localhost:9093
브로커에 연결되어 메시지를 주고받을 수 있지 않을까? - 카프카 클러스터 내에서,
localhost:9093
브로커가 새로운 파티션 리더 역할을 수행하지 않을까?
기대와 다른 결과
localhost:9092
가 장애를 일으키면서 클라이언트가 브로커와 연결할 수 없게 되었음localhost:9093
브로커로의 자동 전환이 이루어지지 않음
기대와 다른 결과가 발생한 원인 분석 (추측 🤔)
- 메타데이터 갱신 지연
- 클라이언트가 장애 브로커를 감지한 후 다른 브로커로 전환하려면 최신 메타데이터가 필요함
- 장애로 인해 메타데이터 갱신 요청이 처리되지 않았을 수도 있음
- 파티션 리더 전환 실패
localhost:9093
이 새로운 파티션 리더로 전환되지 않았을 가능성이 있음- 클라이언트가 새로운 리더 정보를 받지 못해 메시지 전송에 실패했을 수 있음
- 이는 클러스터 내부의 리더 선출 지연 또는 클라이언트 메타데이터 갱신 실패 때문일 가능성 있음
8. 시나리오3 실행 - 카프카 브로커1 복구 🔧
1
[shin@gram88 kafka-docker]$ docker start kafka-docker-kafka-1
Postman을 활용해 /sendMessage
엔드포인트 재호출
메시지 전송과 수신이 다시 정상적으로 이루어짐
(장애상황 2) 클라이언트와 브로커 사이의 긴 지연 ⏳
시나리오: 네트워크 지연발생 (30초) 🌐
1. docker-compose.yml
수정 📝
docker-compose.yml
파일에서 cap_add
항목을 추가하여 NET_ADMIN
권한을 부여함
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
services:
kafka:
image: docker.io/bitnami/kafka:3.9
ports:
- "9092:9092"
volumes:
- "kafka_data:/bitnami"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093,1@kafka2:9093
- KAFKA_KRAFT_CLUSTER_ID=my-cluster-id # 동일한 클러스터 ID 설정
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
cap_add:
- NET_ADMIN
kafka2:
image: docker.io/bitnami/kafka:3.9
ports:
- "9093:9092"
volumes:
- "kafka_data2:/bitnami"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=1
- KAFKA_CFG_PROCESS_ROLES=controller,broker # kafka2도 controller와 broker 역할을 수행
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093,1@kafka2:9093
- KAFKA_KRAFT_CLUSTER_ID=my-cluster-id # 동일한 클러스터 ID 설정
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://kafka2:9092,CONTROLLER://kafka2:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
cap_add:
- NET_ADMIN
volumes:
kafka_data:
driver: local
kafka_data2:
driver: local
2. docker-compose
재시작 🚀
파일을 수정한 후, docker-compose
를 재시작하여 권한을 부여한 후 컨테이너를 실행함
1
2
docker-compose down
docker-compose up -d
3. 컨테이너에서 tc
명령어 실행 🖥️
Docker를 사용할 때 -user=root
옵션을 사용하여 root 권한으로 실행할 수 있음
1
docker exec -it --user=root kafka-docker-kafka-1 bash
네트워크 지연 추가: tc
명령어를 사용하여 네트워크 지연을 추가함
1
2
root@6556e25e5562:/# apt-get update && apt-get install -y iproute2
root@6556e25e5562:/# tc qdisc add dev eth0 root netem delay 30000ms
4. KafkaProducer에서 지연 확인 🕒
Postman을 활용해 /sendMessage
엔드포인트를 호출하여 지연 발생 여부를 확인함
endTime - startTime
이 30초 이상으로 나오고, 메시지가 정상적으로 보내지지 않거나 지연됨
1
Message sent successfully in 30007 ms: Hello, Kafka!
이와 동시에, Kafka 클라이언트에서 다음과 같은 로그 발생
1
Disconnecting from node 0 due to request timeout
카프카 클라이언트는 설정된 시간 안에 브로커로부터 응답을 받지 못하면 연결을 끊음
5. 원인 🔍
- 타임아웃 설정
- 카프카 클라이언트는 설정된 시간 내에 응답을 받지 못하면 연결을 끊고
DisconnectException
이 발생함 - ex.
request.timeout.ms
가 30초라면, 30초 내에 응답이 없으면 타임아웃이 발생함
- 카프카 클라이언트는 설정된 시간 내에 응답을 받지 못하면 연결을 끊고
- 네트워크 지연
- 네트워크 지연이나 브로커 응답 지연이 클 경우 타임아웃이 발생함
- 브로커 응답 불가
- 브로커가 다운되거나 네트워크 문제로 응답이 없으면
DisconnectException
이 발생함
- 브로커가 다운되거나 네트워크 문제로 응답이 없으면
6. 해결 방안 💡
- 타임아웃 값 조정
- 타임아웃 값을 늘려서 일시적인 지연을 허용할 수 있음
- 예:
request.timeout.ms
값을 늘리기
- 네트워크 안정성 확보
- 네트워크 지연을 최소화하고 안정적인 연결을 유지해야 함
- 자동 재연결 설정
retries
,reconnect.backoff.ms
등을 설정하여 클라이언트가 자동으로 재연결을 시도하게 할 수 있음
7. 타임아웃 값 조정 (해결 방안 1번) ⏲️
request.timeout.ms
60초로 늘리고 /sendMessage
엔드포인트 재호출
1
request.timeout.ms: 600000
예상결과
- 네트워크 지연이 30초라고 가정하고 브로커의 응답 지연 임계값을 60초로 설정했음
- 타임아웃 오류 발생안하지 않을까..?
기대와 다른 결과
- 타임아웃 오류 발생
1
2
3
4
5
org.apache.kafka.common.errors.DisconnectException: null
Failed to send message to topic: my_topic. Exception: Topic my_topic not present in metadata after 60000 ms.
Failed message: Hello, Kafka!
Failed to send message in 60026 ms: org.springframework.kafka.KafkaException: Send failed
기대와 다른 결과가 발생한 원인 분석 (추측 🤔)
- 메타데이터 갱신 실패
Topic my_topic not present in metadata after 60000 ms
- 클라이언트가 브로커로부터 최신 메타데이터를 가져오지 못한 상태를 나타냄
- 이는 브로커와의 초기 연결 문제로 발생할 수 있음
- 타임아웃 값의 한계
- 타임아웃 값을 늘려도 브로커 연결이나 메타데이터 갱신 자체가 실패하면 문제 해결 불가
8. 네트워크 연결 안정성 확보 (해결 방안 2번) ✅
지연 제거
1
root@6556e25e5562:/# tc qdisc del dev eth0 root netem
postman을 활용해 /sendMessage
엔드포인트 호출
Received message: Hello, Kafka!
응답 획득