본문 바로가기
  • 노션에서 삽질한 내용을 정리하는 블로그
자기발전소/# Kafka_Confluent

[Kafka] Producer 관련 주요 옵션

by iamlucia 2021. 5. 29.

📌 kafka 및 Confluent 를 공부하며 정리하는 글

 

Producer Configurations  

 

Idempotent Producer 


acks=0 (no acks)

producer는 메세지만 보낼 뿐, 해당 메세지가 broker단에 제대로 전달되었는지 확인하지 않는다.

메세지 손실의 위험이 있다.

단순 메트릭 정보와 같이 메세지 손실이 어느정도 눈감아지는 상황이라면 사용해도 괜찮다.  

 

acks=1 (leader acks) 

producer에서 Kafka로 데이터를 보내면(:write request), 

leader Broker는 write request에 대해 respond를 보낸다.

그리고 해당 데이터를 Topic에 write한다.

leader에게 응답을 받지 못한 경우 producer는 다시 write request를 재시도한다.

하지만 해당 옵션의 경우 leader의 응답만 확인하기 때문에, leader의 replica들도 제대로 메세지를 받았는지 확인할 수 없다. 

 

따라서 replica들이 제대로 leader의 topic/partition을 제대로 복제하지 못한 상태에서

leader가 다운된다면 메세지 손실의 위험이 있다.

 

acks=all(replicas acks)

leader뿐만 아니라 replicas들도 ack 를 보내야 한다.

그만큼 지연시간은 늘어나지만 데이터 손실이 되지 않아 안정성은 높아진다.

따라서 데이터를 하나라도 잃어서는 안되는 경우라면, 해당 옵션을 사용해야 한다.

 

💡  acks=all 옵션을 사용할 때에는 min.insync.replicas 옵션과의 상호작용을 고려해야 한다.
min.insync.replicas 옵션은 broker&topic level에서 설정할 수 있다. (topic단에서 설정한 옵션이broker단의 옵션을 오버라이딩 가능)

예: min.insync.replicas=2 는 적어도 2개의 브로커 (리더 브로커를 포함한 ISR)가 데이터를 받았다는 ack응답을 보내야 한다는 뜻
예: replication factor=3, min.insync.replicas=2, acks=all 인 경우 - 모든 브로커에게 ack를 받지 못해도 min.insync.replicas가 2이기 때문에 브로커 1개가 장애가 난 경우에는 프로세스는 유지될 수 있다.  2개에 장애가난 경우, producer는 NOT_ENOUGH_REPLICAS라는 EXCEPTION응답을 받게 된다.

 

retry 

NotEnoughReplicasException과 같은 일시적인 장애가 생긴 경우,

exception 상황을 잘 대처하여 데이터 손실을 막아야 한다. 이를 위한 retries 옵션이 존재한다. 

 

kafka  2.1 버전 이상부터는 retries 가  2147483647 번으로 기본값이 설정되어 있다.r

etry.backoff.ms 옵션의 기본값은 100ms이다.저 무수한 값동안 retry를 때려야 할까?이를 제한하는 옵션 또한 있다.

 

delivery.timeout.ms

해당 옵션이 2분인 경우, producer는 acks를 받지 못한 메세지에 대해 2분동안 요청 retry를 진행한다.

2분이 지나도록 ack를 받지 못한 메세지는 결국 fail처리가 된다.

 

 

💡 retries 가 일어나는 동안 메세지의 순서가 뒤죽박죽될 가능성이 있다. (batch가 실패하는 경우) 
즉, key 기반 ordering을 원하는 경우 이는 문제가 될 수 있다. 
이 이슈를 해결하기 위해서는 단일 브로커에 대해 얼마나 많은 수의 produce request가 병행적으로 실행되도록 조절하는 옵션인 max.in.flight.requests.per.connection의 값을 조절할 필요가 있다

 

max.in.flight.requests.per.connection

Default: 5

순서를 보장하기 위해서는 이를 1로 설정해야 한다. 하지만 처리량은 낮아질 수 있다.

 

네트워크 에러로 인해 카프카에 메세지를 중복으로 produce하게 되는 경우가 있다.

정상적으로 produce와 ack가 일어나는 프로세스는 다음과 같다.

1. Producer --- produce ---> Broker

2. Broker : commit !

3. Broker --- ack ---> Producer 

 

ack를 보내는 과정에서 네트워크 error가 발생하는 경우 메세지가 중복되는 과정은 다음과 같다.

1. Producer --- produce ---> Broker

2. Broker : commit !

3. Broker -- ack --- network error! --X --> Producer 

4. Producer: ???

5. Producer --- produce(Retry) ----> Broker

6. Broker : commit ! (Duplicated)

 

이를 방지하기 위해서 Kafka 0.11 버전부터는 Idempotent Request라는 개념이 등장한다.

Produce Request에  id가 부여되기 때문에 브로커 단에서 중복되는 request를 알아채고 중복을 방지한다.

이러한 Idempotent Producer는 다음과 같은 기본값의 설정에 의해 가능하다.

 

retries=Integer.MAX_VALUEmax.in.flight.requests.per.connection=5acks=all

 

자바 프로그래밍 과정에서 이를 설정하려면 단지 다음과 같이 설정하면 된다.

producerProps.put("enable.idempotence", true);

 

 

Batching


기본적으로 카프카는 record를 가능한한 빨리 보내는 방향으로 설정되어 있다.

max.in.flight.requests.per.connection=5 옵션을 보면 알 수 있듯이, 동시에 5r건의 메세지가 개별적으로 전달될 수 있게 설정되어있다.

이외에 더 많은 건수의 메세지가 보내져야 하는 경우 카프카는 이들을 batching 하여 한꺼번에 보낸다. 

이러한 batch 작업을 통해 카프카는 지연속도를 낮추면서 동시에 처리량을 높일 수 있다.

batch는 또한 높은 압축률을 가지고 있어서 높은 효율성이라는 이점도 있다.

 

이러한 배치 매커니즘을 조절하는 대표적인 옵션은 다음과 같다.

 

linger.ms (default:0)

프로듀서 단에서 Batch를 보내기 전 기다릴 수 있는 시간(ms)

해당 값을 늘리게 되면, 메세지를 배치에 넣어서 보낼 기회가 더 많아진다.

조금의 딜레이는 얻겠지만, 프로듀서단에서 처리량, 압축률, 효율성은 증가시킬 수 있다.

해당 시간만큼 기다리기도 전에 batch.size값만큼 메세지가 차면 브로커는 그냥 바로 배치를 보내버린다-!

one batch = one request

 

batch.size 

하나의 배치 안에 넣을 수 있는 최대 바이트 수 

배치 사이즈를 늘리게 되면 요청을 보낼 때 압축률, 처리량, 효율성에 있어서 이점을 볼 수 있다.

배치 사이즈보다 더 큰 사이즈의 메세지의 경우에는 배치되지 못한다.

하나의 배치는 파티션별로 할당되기 때문에, 이를 너무 높은 값으로 정하게되면 메모리가 부족할 수도 있다.

 

🔎 Kafka Producer 메트릭스를 통해 평균적인 Batch size정보를 모니터링하여 설정하는 것이 좋다.

 

Memory


브로커가 메세지를 처리하는 속도보다 프로듀서가 더 빠르게 메세지를 produce하는 경우,

해당 레코드는 잠시 프로듀서 메모리에 buffer된다.

 

buffer.memory

기본값은 32MB로, send buffer의 사이즈를 말한다.

이 버퍼는 시간이 지남에 따라 계속 차오르고, 브로커에 더 빠르게 메세지를 보낼 수 있게 되면 다시 내려간다.

버퍼가 가득 차게 되면, send() 메서드는 block된다.  (produce data를 못하고 그냥 기다리게 된다)

 

max.block.ms 

send()메서드가 예외를 던지기 까지 block되는 시간을 말한다.

예외를 던지는 경우는 다음과 같다.

1. 프로듀서 버퍼 메모리가 꽉 찼을 때

2. 브로커가 전혀 새로운 데이터를 받지 못할 때

3. max.block.ms 시간이 다 지났을 때 

 

해당 예외에 부딪히게 되면, 브로커가 down되어 있거나 과부하되어 요청을 받지 못하는 상황임을 추측해볼 수 있다.

 

'자기발전소 > # Kafka_Confluent' 카테고리의 다른 글

[Kafka] Client Bi-Directional Compatibility  (0) 2021.08.03
[Kafka] Consumer Group: Rebalancing  (0) 2021.08.02
[Kafka] Broker & Zookeeper  (0) 2021.05.19
[Kafka] Consumer  (0) 2021.05.19
[Kafka] Producer  (0) 2021.05.19