📌 kafka 및 Confluent 를 공부하며 정리하는 글
Kafka를 활용하는 프로젝트 실행의 작은 기록1.
0. 프로젝트의 개요
[ Twitter - Producer - Kafka - Consumer - ElasticSearch ]
1. Twitter Developer Account 생성
아래 사이트에 접속하여 Twitter Developer account 를 생성한다.
https://developer.twitter.com/en/apply-for-access
트위터 개발자 계정 신청 시, 개발자 계정 신청 목적을 다음과 같이 작성한다.
specifics에 모두 No를 입력하여 신청을 완료하고 나면,
해당 트위터 계정에 등록된 이메일주소로 온 컨펌메일만 확인한다.
프로젝트를 시작한다.
프로젝트 정보 및 APP Name을 착실히 작성하고 나면,
다음과 같이 API Key 와 Token이 발급된다.
전체적인 코드의 구조는 아래와 같다.
public class TwitterProducer {
public TwitterProducer() {}
public static void main(String[] args) {}
public void run() {
// 1. create a twitter client
// 2. create a kafka producer
// 3. while loop to send tweets to kafka
}
public Client createTwitterClient(BlockingQueue<String> msgQueue) {}
public KafkaProducer<String, String> createKafkaProducer() {}
}
2. Twitter Client 코드
2-1) pom.xml에 hbc-core 모듈 추가 선언
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j -->
<version>2.2.0</version> <!-- or whatever the latest version is -->
</dependency>
💡 hosebird client는 트위터의 표준 Streaming API를 consume하는 Java HTTP Client이다.
이는 hbc-core 와 hbc-twitter4j 두 개의 모듈로 나뉜다.
hbc-core 모듈은 메세지 큐를 사용하는데, consumer는 메세지 큐를 통해서 String message를 원문 그대로 poll할 수 있다.
hbc-twitter4j 모듈은 twitter4j 리스너 및 데이터 모델을 통해 parsing layer의 기능도 포함한다.
2-2) Client를 결과로 반환하는 createTwitterClient 함수 선언
public Client createTwitterClient(BlockingQueue<String> msgQueue) {}
2-3) createTwitterClient 함수 아래 twitter connection 정보 선언
(github.com/twitter/hbc > Quickstart > Declaring the connection information 참조)
// 기대하는 TPS에 맞게 message queue 크기 설정 필요
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(100000);
// 연결하고자 하는 host 정보(endpoint/authentication) 선언
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
// 메세지 추적 키워드:term 선언
List<String> terms = Lists.newArrayList("twitter", "api");
hosebirdEndpoint.trackTerms(terms);
// 연결에 필요한 secrets 정보 선언
Authentication hosebirdAuth = new OAuth1("consumerKey", "consumerSecret", "token", "secret");
- secrets 정보(consumerKey, consumerSecret, token, secret)는 클래스 내부 상단에 따로 String 형으로 선언한 다음,
OAuth 객체 생성시 secrets정보를 변수 파라미터로 전달하는 방식으로 수정한다.
//Authentication INFO
String consumerKey = "JQGmvb4Vd88QIfcK1wj9IuCH8";
String consumerSecret = "beG0iQ1shCROQYTAJrOr9iKIpIs1IflWZdcqbC0RbmzLDJ6obG";
String token = "1422440710859485185-fFjxfl9BSnA0ShyjuCN0DWnQRnwypb";
String secret = "n3f6HcXKPHmtE49b606mgxhdkoetrVi7ZnjNUpL29NjOh";
public Client createTwitterClient(BlockingQueue<String> msgQueue) {
...
Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
...
}
2-4) connection 및 secrets 정보를 통해 client 생성하는 코드 작성
(github.com/twitter/hbc > Quickstart > Creating a clinet 참조)
ClientBuilder builder = new ClientBuilder()
.name("Hosebird-Client-01")
.hosts(hosebirdHosts)
.authentication(hosebirdAuth)
.endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue));
Client hosebirdClient = builder.build();
hosebirdClient.connect();
- hosebirdClient.connect() 코드 부분은 함수에서 반환값을 Client로 해주기 위해 return hosebirdClient; 로 대체한다.
- 그리고 hosebirdClient.connect() 부분은 run()함수 내부로 이동한다.
Client hosebirdClient = builder.build();
//hosebirdClient.connect();
return hosebirdClient;
2-5) run() 함수에 client 생성 및 host에 연결하는 함수 작성
// 1. create a twitter client
Client client = createTwitterClient(msgQueue);
client.connect(); // Attempts to establish a connection.
'자기발전소 > # Kafka_Confluent' 카테고리의 다른 글
Confluent for Kubernetes 삽질 기록 (0) | 2023.10.20 |
---|---|
[Kafka] Idempotent Producer (0) | 2022.04.10 |
[Kafka] Client Bi-Directional Compatibility (0) | 2021.08.03 |
[Kafka] Consumer Group: Rebalancing (0) | 2021.08.02 |
[Kafka] Producer 관련 주요 옵션 (0) | 2021.05.29 |