본문 바로가기
  • 노션에서 삽질한 내용을 정리하는 블로그
자기발전소/# Kafka_Confluent

[Kafka] Consumer Group: Rebalancing

by iamlucia 2021. 8. 2.

📌 kafka 및 Confluent 를 공부하며 정리하는 글 

 

Consumer Group 의  Rebalancing 

consumer group 프로세스가 하나 돌아가는 중에

하나를 더 추가로 돌려보자.

 

첫 번째 돌고 있던 consumer group 로그에서

"Attempt to heartbeat failed since group is rebalancing" 를 통해 rebalancing 시작을 알 수 있다.

이어서 Rejoining 이 발생하고, 

파티션이 나누어 할당된다.

 

"Adding newly assigned partitions: coding_topic-2" 로그를 통해

첫번째 consumer 프로세스는 2 파티션을 할당 받음을 알 수 있다. 

대상 토픽의 파티션 3개를 두 개의 consumer 가 나누어 할당받았는 것을 확인할 수 있다.

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Attempt to heartbeat failed since group is rebalancing
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Revoke previously assigned partitions coding_topic-2, coding_topic-1, coding_topic-0
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Successfully joined group with generation Generation{generationId=8, memberId='consumer-testing-cg-1-3360346a-1362-4c00-8f89-31d811278598', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Finished assignment for group at generation 8: {consumer-testing-cg-1-1650089a-23e2-457d-b4a5-01b7f6fe873f=Assignment(partitions=[coding_topic-0, coding_topic-1]), consumer-testing-cg-1-3360346a-1362-4c00-8f89-31d811278598=Assignment(partitions=[coding_topic-2])}
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Successfully synced group in generation Generation{generationId=8, memberId='consumer-testing-cg-1-3360346a-1362-4c00-8f89-31d811278598', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Notifying assignor about the new Assignment(partitions=[coding_topic-2])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Adding newly assigned partitions: coding_topic-2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Setting offset for partition coding_topic-2 to the committed offset FetchPosition{offset=10, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[broker1:9092 (id: 1 rack: null)], epoch=0}}

 

두번째 생성된 consumer group 의 로그(아래)를 통해

"Adding newly assigned partitions: coding_topic-1, coding_topic-0" 

1과 0 파티션을 할당받았음을 알 수 있다.

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Successfully joined group with generation Generation{generationId=8, memberId='consumer-testing-cg-1-1650089a-23e2-457d-b4a5-01b7f6fe873f', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Successfully synced group in generation Generation{generationId=8, memberId='consumer-testing-cg-1-1650089a-23e2-457d-b4a5-01b7f6fe873f', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Notifying assignor about the new Assignment(partitions=[coding_topic-0, coding_topic-1])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Adding newly assigned partitions: coding_topic-1, coding_topic-0

 

Producer를 통해 메세지를 생성시켜

각 Consumer 가 지정된 파티션의 메세지만 consume하는지 확인해본다.

 

첫번째 Consumer Group :  파티션 2의 메세지만 consume

 

두번째 Consumer Group :  파티션 0과  1의 메세지만 consume

 

이때 두번째 Consumer 프로세스를 다운시키면 

첫번째Consumer 에서 다시 rebalancing이 발생하며 파티션을 재할당받는다.

 

'자기발전소 > # Kafka_Confluent' 카테고리의 다른 글

[Kafka] Twitter Producer 생성  (0) 2021.08.05
[Kafka] Client Bi-Directional Compatibility  (0) 2021.08.03
[Kafka] Producer 관련 주요 옵션  (0) 2021.05.29
[Kafka] Broker & Zookeeper  (0) 2021.05.19
[Kafka] Consumer  (0) 2021.05.19