카테고리 없음
[Kafka] Consumer Assign & Seek API
iamlucia
2021. 8. 3. 14:03
📌 Kafka 및 Confluent를 공부하며 정리하는 글
package com.github.luciakafka.demo1;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
/**
 * Demonstrates the Kafka consumer {@code assign} / {@code seek} API.
 *
 * <p>Instead of subscribing to a topic as a member of a consumer group, the consumer is
 * manually assigned a single partition and positioned at an explicit offset. It then reads a
 * fixed number of records, logs them, and exits. This pattern is mostly used to replay data
 * or to fetch a specific message.
 */
public class ConsumerDemoAssignSeek {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerDemoAssignSeek.class);

    /**
     * Reads four records from partition 1 of {@code coding_topic}, starting at offset 15,
     * and logs the key, value, partition and offset of each one.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String bootstrapServers = "192.168.40.83:9092";
        String topic = "coding_topic";

        // Create consumer configs.
        // No group.id is set on purpose: partitions are assigned manually below, so this
        // consumer does not take part in consumer-group management.
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // try-with-resources guarantees consumer.close() runs, releasing the consumer's
        // network resources even if poll() throws.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {

            // assign and seek are mostly used to replay data or fetch a specific message
            // assign: bind this consumer to one specific partition of the topic
            TopicPartition partitionToReadFrom = new TopicPartition(topic, 1);
            long offsetToReadFrom = 15L;
            consumer.assign(Collections.singletonList(partitionToReadFrom));

            // seek: the next poll() starts fetching from this offset
            consumer.seek(partitionToReadFrom, offsetToReadFrom);

            int numberOfMessagesToRead = 4;
            int numberOfMessagesReadSoFar = 0;

            // poll for new data until the requested number of records has been read
            while (numberOfMessagesReadSoFar < numberOfMessagesToRead) {
                // new in Kafka 2.0.0: poll() must be given a Duration object
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    numberOfMessagesReadSoFar += 1;
                    logger.info("Key: {}, Value: {}", record.key(), record.value());
                    logger.info("Partition: {}, Offset: {}", record.partition(), record.offset());
                    if (numberOfMessagesReadSoFar >= numberOfMessagesToRead) {
                        // Stop mid-batch; the while condition then ends the outer loop too.
                        break;
                    }
                }
            }
        }

        logger.info("Exiting the application");
    }
}
assign 및 seek API를 사용하는 이유는?
- 특정 파티션의 정해진 offset 지점부터 데이터를 다시 읽어 오거나(replay) 특정 메시지를 가져오기 위해 해당 API를 사용한다.