Flink 는 데이터 처리 프로세싱 프레임워크 및 엔진 중 하나로, kafka 로부터 메세지를 실시간으로 읽어서 분석할 때에 주로 쓰인다.
Flink Deployment에는 두 가지의 모드가 있다. 각 모드는 Deployment 시 자원의 분리 수준에서 주된 차이점을 가진다.
- Application mode: deployment가 각각 별개의 Flink Cluster 에서 수행된다.
Flink Cluster의 라이프 사이클은 그 Deployment의 라이프사이클에 종속된다.
- Session mode: Deployment가 Flink Session Cluster에서 실행된다.
따라서 다른 Deployment와 자원을 공유하게 된다. Flink Cluster의 라이프사이클과 Deployment의 라이프사이클은 독립적이다.
나는 Flink를 설치한다음 Apache Kafka SQL Connector 를 사용하고자 하기 때문에 아래 JAR 파일이 필요하다.
>> flink-sql-connector-kafka-1.18.0.jar
그런데 apache/flink:latest 이미지에의 lib폴더에는 해당 파일이 기본으로 추가되어있지 않다.
따라서 flink 이미지를 베이스로 커스텀 이미지를 생성하는 것이 필요하다.
1. 원하는 jar 파일을 추가하여 Dockerfile 파일 작성: 해당 파일이 있는 경로와 똑같은 위치에 jar파일을 위치시킨다.
FROM apache/flink:latest
COPY --chown=flink:flink flink-sql-connector-kafka-1.18.0.jar /opt/flink/lib/flink-sql-connector-kafka-1.18.0.jar
2. Docker Image 생성
docker build -t apache/flink:young .
3. flink-configmap.yaml 파일 작성
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
parallelism.default: 2
log4j-console.properties: |+
# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.pekko.name = org.apache.pekko
logger.pekko.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
4. flink-svc.yaml 파일 작성
apiVersion: v1
kind: Service
metadata:
name: jobmanager
spec:
type: NodePort
ports:
- name: rpc
port: 6123
- name: blob
port: 6124
- name: query
port: 6125
- name: ui
port: 8081
selector:
app: flink
component: jobmanager
5. jobmanager-session.yaml 파일 작성 : spec.container.image에 아까 커스텀으로 제작한 이미지를 명시한다.
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
spec:
replicas: 1
selector:
matchLabels:
app: flink
component: jobmanager
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
containers:
- name: jobmanager
image: apache/flink:young
args: ["jobmanager"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
6. taskmanager-session.yaml 파일 작성
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: apache/flink:young
args: ["taskmanager"]
ports:
- containerPort: 6122
name: rpc
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
7. Deploy ConfigMap/Service/Pod
kubectl apply -f flink-configmap.yaml
kubectl apply -f flink-svc.yaml
kubectl apply -f taskmanager.yaml
kubectl apply -f jobmanager.yaml
'자기발전소 > # Apaches' 카테고리의 다른 글
Install Airflow on Kubernetes (0) | 2023.11.26 |
---|---|
Install Apache Airflow on Docker (0) | 2023.11.01 |
Airflow 설치 및 구성 테스트 (0) | 2023.10.26 |