본문 바로가기
  • 노션에서 삽질한 내용을 정리하는 블로그
카테고리 없음

[Kafka] Consumer Assign & Seek API

by iamlucia 2021. 8. 3.

📌 kafka 및 Confluent 를 공부하며 정리하는 글

 

 

package com.github.luciakafka.demo1;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

public class ConsumerDemoAssignSeek {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger(ConsumerDemoAssignSeek.class.getName());

        String bootstrapServers = "192.168.40.83:9092";
        //String groupID = "assign-seek-aps";
        String topic = "coding_topic";

        //Create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        //create consumer
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(properties);

        // assign and seek are mostly used to replay data or fetch a specific message
        // assign
        TopicPartition partitionToReadFrom = new TopicPartition(topic, 1);
        long offsetToReadFrom = 15L;
        consumer.assign(Arrays.asList(partitionToReadFrom));

        // seek
        consumer.seek(partitionToReadFrom, offsetToReadFrom);

        int numberOfMessagesToRead = 4;
        boolean keepOnReading = true ;
        int numberOfMessagesReadSoFar = 0;

        //poll for new data
        while(keepOnReading) {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100)); //new in Kafka 2.0.0: must use Duration Object
            for (ConsumerRecord<String,String> record : records) {
                numberOfMessagesReadSoFar += 1;
                logger.info("Key: "+ record.key() +", Value: " + record.value());
                logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
                if(numberOfMessagesReadSoFar >= numberOfMessagesToRead) {
                    keepOnReading = false ; // to exit 'while' loop
                    break; // to exit 'for' loop
                }
            }
        }
        logger.info("Exiting the application");
    }
}

 

 

assign 및 seek API 를 사용하는 이유는?

- 정해진 offset 지점부터 다시 데이터를 읽어오기 위해서(replay)  해당 api를 사용한다.