📌 kafka 및 Confluent 를 공부하며 정리하는 글
Consumer Group 의 Rebalancing
consumer group 프로세스가 하나 돌아가는 중에
하나를 더 추가로 돌려보자.
첫 번째 돌고 있던 consumer group 로그에서
"Attempt to heartbeat failed since group is rebalancing" 를 통해 rebalancing 시작을 알 수 있다.
이어서 Rejoining 이 발생하고,
파티션이 나누어 할당된다.
"Adding newly assigned partitions: coding_topic-2" 로그를 통해
첫번째 consumer 프로세스는 2 파티션을 할당 받음을 알 수 있다.
대상 토픽의 파티션 3개를 두 개의 consumer 가 나누어 할당받았는 것을 확인할 수 있다.
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Attempt to heartbeat failed since group is rebalancing
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Revoke previously assigned partitions coding_topic-2, coding_topic-1, coding_topic-0
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Successfully joined group with generation Generation{generationId=8, memberId='consumer-testing-cg-1-3360346a-1362-4c00-8f89-31d811278598', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Finished assignment for group at generation 8: {consumer-testing-cg-1-1650089a-23e2-457d-b4a5-01b7f6fe873f=Assignment(partitions=[coding_topic-0, coding_topic-1]), consumer-testing-cg-1-3360346a-1362-4c00-8f89-31d811278598=Assignment(partitions=[coding_topic-2])}
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Successfully synced group in generation Generation{generationId=8, memberId='consumer-testing-cg-1-3360346a-1362-4c00-8f89-31d811278598', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Notifying assignor about the new Assignment(partitions=[coding_topic-2])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Adding newly assigned partitions: coding_topic-2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Setting offset for partition coding_topic-2 to the committed offset FetchPosition{offset=10, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[broker1:9092 (id: 1 rack: null)], epoch=0}}
두번째 생성된 consumer group 의 로그(아래)를 통해
"Adding newly assigned partitions: coding_topic-1, coding_topic-0"
1과 0 파티션을 할당받았음을 알 수 있다.
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Successfully joined group with generation Generation{generationId=8, memberId='consumer-testing-cg-1-1650089a-23e2-457d-b4a5-01b7f6fe873f', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Successfully synced group in generation Generation{generationId=8, memberId='consumer-testing-cg-1-1650089a-23e2-457d-b4a5-01b7f6fe873f', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Notifying assignor about the new Assignment(partitions=[coding_topic-0, coding_topic-1])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-testing-cg-1, groupId=testing-cg] Adding newly assigned partitions: coding_topic-1, coding_topic-0
Producer를 통해 메세지를 생성시켜
각 Consumer 가 지정된 파티션의 메세지만 consume하는지 확인해본다.
첫번째 Consumer Group : 파티션 2의 메세지만 consume
두번째 Consumer Group : 파티션 0과 1의 메세지만 consume
이때 두번째 Consumer 프로세스를 다운시키면
첫번째Consumer 에서 다시 rebalancing이 발생하며 파티션을 재할당받는다.
'자기발전소 > # Kafka_Confluent' 카테고리의 다른 글
[Kafka] Twitter Producer 생성 (0) | 2021.08.05 |
---|---|
[Kafka] Client Bi-Directional Compatibility (0) | 2021.08.03 |
[Kafka] Producer 관련 주요 옵션 (0) | 2021.05.29 |
[Kafka] Broker & Zookeeper (0) | 2021.05.19 |
[Kafka] Consumer (0) | 2021.05.19 |