📌 kafka 및 Confluent 를 공부하며 정리하는 글
Kafka를 활용하는 프로젝트 실행의 작은 기록1.
0. 프로젝트의 개요
[ Twitter - Producer - Kafka - Consumer - ElasticSearch ]
1. Twitter Developer Account 생성
아래 사이트에 접속하여 Twitter Developer account 를 생성한다.
https://developer.twitter.com/en/apply-for-access
Apply for access – Twitter Developers
Apply for access. Get started with Twitter APIs and tools. All new developers must apply for a developer account to access Twitter APIs. Once approved, you can begin to use our standard APIs and our new premium APIs.
developer.twitter.com
트위터 개발자 계정 신청 시, 개발자 계정 신청 목적을 다음과 같이 작성한다.
specifics에 모두 No를 입력하여 신청을 완료하고 나면,
해당 트위터 계정에 등록된 이메일주소로 온 컨펌메일만 확인한다.
프로젝트를 시작한다.
프로젝트 정보 및 APP Name을 착실히 작성하고 나면,
다음과 같이 API Key 와 Token이 발급된다.
전체적인 코드의 구조는 아래와 같다.
public class TwitterProducer {
public TwitterProducer() {}
public static void main(String[] args) {}
public void run() {
// 1. create a twitter client
// 2. create a kafka producer
// 3. while loop to send tweets to kafka
}
public Client createTwitterClient(BlockingQueue<String> msgQueue) {}
public KafkaProducer<String, String> createKafkaProducer() {}
}
2. Twitter Client 코드
2-1) pom.xml에 hbc-core 모듈 추가 선언
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j -->
<version>2.2.0</version> <!-- or whatever the latest version is -->
</dependency>
💡 hosebird client는 트위터의 표준 Streaming API를 consume하는 Java HTTP Client이다.
이는 hbc-core 와 hbc-twitter4j 두 개의 모듈로 나뉜다.
hbc-core 모듈은 메세지 큐를 사용하는데, consumer는 메세지 큐를 통해서 String message를 원문 그대로 poll할 수 있다.
hbc-twitter4j 모듈은 twitter4j 리스너 및 데이터 모델을 통해 parsing layer의 기능도 포함한다.
2-2) Client를 결과로 반환하는 createTwitterClient 함수 선언
public Client createTwitterClient(BlockingQueue<String> msgQueue) {}
2-3) createTwitterClient 함수 아래 twitter connection 정보 선언
(github.com/twitter/hbc > Quickstart > Declaring the connection information 참조)
// 기대하는 TPS에 맞게 message queue 크기 설정 필요
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(100000);
// 연결하고자 하는 host 정보(endpoint/authentication) 선언
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
// 메세지 추적 키워드:term 선언
List<String> terms = Lists.newArrayList("twitter", "api");
hosebirdEndpoint.trackTerms(terms);
// 연결에 필요한 secrets 정보 선언
Authentication hosebirdAuth = new OAuth1("consumerKey", "consumerSecret", "token", "secret");
- secrets 정보(consumerKey, consumerSecret, token, secret)는 클래스 내부 상단에 따로 String 형으로 선언한 다음,
OAuth 객체 생성시 secrets정보를 변수 파라미터로 전달하는 방식으로 수정한다.
//Authentication INFO
String consumerKey = "JQGmvb4Vd88QIfcK1wj9IuCH8";
String consumerSecret = "beG0iQ1shCROQYTAJrOr9iKIpIs1IflWZdcqbC0RbmzLDJ6obG";
String token = "1422440710859485185-fFjxfl9BSnA0ShyjuCN0DWnQRnwypb";
String secret = "n3f6HcXKPHmtE49b606mgxhdkoetrVi7ZnjNUpL29NjOh";
public Client createTwitterClient(BlockingQueue<String> msgQueue) {
...
Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
...
}
2-4) connection 및 secrets 정보를 통해 client 생성하는 코드 작성
(github.com/twitter/hbc > Quickstart > Creating a clinet 참조)
ClientBuilder builder = new ClientBuilder()
.name("Hosebird-Client-01")
.hosts(hosebirdHosts)
.authentication(hosebirdAuth)
.endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue));
Client hosebirdClient = builder.build();
hosebirdClient.connect();
- hosebirdClient.connect() 코드 부분은 함수에서 반환값을 Client로 해주기 위해 return hosebirdClient; 로 대체한다.
- 그리고 hosebirdClient.connect() 부분은 run()함수 내부로 이동한다.
Client hosebirdClient = builder.build();
//hosebirdClient.connect();
return hosebirdClient;
2-5) run() 함수에 client 생성 및 host에 연결하는 함수 작성
// 1. create a twitter client
Client client = createTwitterClient(msgQueue);
client.connect(); // Attempts to establish a connection.
'자기발전소 > # Kafka_Confluent' 카테고리의 다른 글
Confluent for Kubernetes 삽질 기록 (0) | 2023.10.20 |
---|---|
[Kafka] Idempotent Producer (0) | 2022.04.10 |
[Kafka] Client Bi-Directional Compatibility (0) | 2021.08.03 |
[Kafka] Consumer Group: Rebalancing (0) | 2021.08.02 |
[Kafka] Producer 관련 주요 옵션 (0) | 2021.05.29 |