본문 바로가기
  • 노션에서 삽질한 내용을 정리하는 블로그
자기발전소/# Kafka_Confluent

[Kafka] Twitter Producer 생성

by iamlucia 2021. 8. 5.

📌 kafka 및 Confluent 를 공부하며 정리하는 글 

 

Kafka를 활용하는 프로젝트 실행의 작은 기록1. 


0. 프로젝트의 개요

[ Twitter - Producer - Kafka - Consumer - ElasticSearch ]

 

1. Twitter Developer Account 생성

아래 사이트에 접속하여 Twitter Developer account 를 생성한다. 

 

https://developer.twitter.com/en/apply-for-access

 

Apply for access – Twitter Developers

Apply for access. Get started with Twitter APIs and tools. All new developers must apply for a developer account to access Twitter APIs. Once approved, you can begin to use our standard APIs and our new premium APIs.

developer.twitter.com

 

트위터 개발자 계정 신청 시, 개발자 계정 신청 목적을 다음과 같이 작성한다. 

 

 

specifics에 모두 No를 입력하여 신청을 완료하고 나면, 

해당 트위터 계정에 등록된 이메일주소로 온 컨펌메일만 확인한다.

 

짝짝짝

 

프로젝트를 시작한다. 

 

프로젝트 정보 및 APP Name을 착실히 작성하고 나면,

다음과 같이 API Key 와 Token이 발급된다.

 

해당 페이지 하단에 get_user_token 이라는 문구가 있는데 이를 클릭하면, 

 

Token & Secret을 생성하여 해당 정보도 복사해 두자! 

 

전체적인 코드의 구조는 아래와 같다. 

public class TwitterProducer {
    public TwitterProducer() {}
    public static void main(String[] args) {}
    
    public void run() {
    // 1. create a twitter client
    // 2. create a kafka producer
    // 3. while loop to send tweets to kafka
    }
    
    public Client createTwitterClient(BlockingQueue<String> msgQueue) {}
    
    public KafkaProducer<String, String> createKafkaProducer() {}
}

 

2. Twitter Client 코드 

2-1) pom.xml에  hbc-core 모듈 추가 선언 

<dependency>
   <groupId>com.twitter</groupId>
   <artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j -->
   <version>2.2.0</version> <!-- or whatever the latest version is -->
</dependency>

 

💡 hosebird client는 트위터의 표준 Streaming API를 consume하는 Java HTTP Client이다.
이는 hbc-core 와 hbc-twitter4j 두 개의 모듈로 나뉜다. 
hbc-core 모듈은 메세지 큐를 사용하는데, consumer는 메세지 큐를 통해서 String message를 원문 그대로 poll할 수 있다.
hbc-twitter4j 모듈은 twitter4j 리스너 및 데이터 모델을 통해 parsing layer의 기능도 포함한다.  

 

2-2) Client를 결과로 반환하는 createTwitterClient 함수 선언

public Client createTwitterClient(BlockingQueue<String> msgQueue) {}

 

2-3) createTwitterClient 함수 아래 twitter connection 정보 선언

(github.com/twitter/hbc > Quickstart > Declaring the connection information 참조)

// 기대하는 TPS에 맞게 message queue 크기 설정 필요
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(100000);

// 연결하고자 하는 host 정보(endpoint/authentication) 선언
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();

// 메세지 추적 키워드:term 선언
List<String> terms = Lists.newArrayList("twitter", "api");
hosebirdEndpoint.trackTerms(terms);

// 연결에 필요한 secrets 정보 선언
Authentication hosebirdAuth = new OAuth1("consumerKey", "consumerSecret", "token", "secret");

 

- secrets 정보(consumerKey, consumerSecret, token, secret)는 클래스 내부 상단에 따로 String 형으로 선언한 다음, 

OAuth 객체 생성시 secrets정보를 변수 파라미터로 전달하는 방식으로 수정한다. 

//Authentication INFO
String consumerKey = "JQGmvb4Vd88QIfcK1wj9IuCH8";
String consumerSecret = "beG0iQ1shCROQYTAJrOr9iKIpIs1IflWZdcqbC0RbmzLDJ6obG";
String token = "1422440710859485185-fFjxfl9BSnA0ShyjuCN0DWnQRnwypb";
String secret = "n3f6HcXKPHmtE49b606mgxhdkoetrVi7ZnjNUpL29NjOh";

public Client createTwitterClient(BlockingQueue<String> msgQueue) {
...
Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
...
}

 

2-4) connection 및 secrets 정보를 통해 client 생성하는 코드 작성 

(github.com/twitter/hbc > Quickstart > Creating a clinet 참조)

ClientBuilder builder = new ClientBuilder()
  .name("Hosebird-Client-01")                          
  .hosts(hosebirdHosts)
  .authentication(hosebirdAuth)
  .endpoint(hosebirdEndpoint)
  .processor(new StringDelimitedProcessor(msgQueue));

Client hosebirdClient = builder.build();
hosebirdClient.connect();

-  hosebirdClient.connect() 코드 부분은 함수에서 반환값을 Client로 해주기 위해 return hosebirdClient; 로 대체한다. 

- 그리고 hosebirdClient.connect() 부분은 run()함수 내부로 이동한다. 

Client hosebirdClient = builder.build();
//hosebirdClient.connect();
return hosebirdClient;

 

2-5) run() 함수에 client 생성 및 host에 연결하는 함수 작성

// 1. create a twitter client
Client client = createTwitterClient(msgQueue);
client.connect();  // Attempts to establish a connection.