# 카프카 커넥트

  • kafka connect : 카프카 오픈소스에 포함된 툴 중 하나로 데이터 파이프라인 생성 시 작업 형태를 템플릿으로 만들어 반복 작업을 줄이고 효율적인 전송을 이루기 위한 애플리케이션.
  • 소스 커넥터(source connector) : 프로듀서 역할
  • 싱크 커넥터(sink connector) : 컨슈머 역할

# 커넥트 실행

# 단일모드 커넥트

커넥트 파일 수정

lkh@DESKTOP-1L2VEPP:~/kafka_2.12-2.5.0$ cd config
lkh@DESKTOP-1L2VEPP:~/kafka_2.12-2.5.0/config$ vi connect-standalone.properties

bootstrap.servers=my-kafka:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins
1
2
3
4
5
6
7
8
9
10
11

단일모드 커넥트는 커넥스 설정파일 정의 실행해야 한다. 예시로 기본제공하는 파일소스커넥터 실행

lkh@DESKTOP-1L2VEPP:~/kafka_2.12-2.5.0/config$ vi connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/test.txt
topic=connect-test
1
2
3
4
5
6
7

단일모드 커넥터 실행

lkh@DESKTOP-1L2VEPP:~/kafka_2.12-2.5.0$ bin/connect-standalone.sh config/connect-standalone.properties \
> config/connect-file-source.properties
1
2

동작확인!

lkh@DESKTOP-1L2VEPP:~/kafka_2.12-2.5.0$ cd /tmp
lkh@DESKTOP-1L2VEPP:/tmp$ touch test.txt

# test.txt 수정

lkh@DESKTOP-1L2VEPP:~/kafka_2.12-2.5.0$ bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 --topic connect-test --from-beginning
"test"
"234567890"
"어디로 가니"
1
2
3
4
5
6
7
8
9

# 분산모드 커넥트

분산모드 설정파일

lkh@DESKTOP-1L2VEPP:~/kafka_2.12-2.5.0/config$ vi connect-distributed.properties

bootstrap.servers=my-kafka:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

실행

bin/connect-distributed.sh config/connect-distributed.properties
1
# 커넥트에 사용할수 있는 플러그인 조회
lkh@DESKTOP-1L2VEPP:~/kafka_2.12-2.5.0/config$ curl -X GET http://localhost:8083/connector-plugins
#########
[{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]

# FileStreamSourceConnector 실행
lkh@DESKTOP-1L2VEPP:~/kafka_2.12-2.5.0/config$ curl -X POST -H "Content-Type: application/json" \
> --data '{
> "name":"local-file-source",
> "config":{
> "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
> "file":"/tmp/test.txt",
> "tasks.max":"1",
> "topic":"connect-test"
> }
> }' \
> http://localhost:8083/connectors

# 실행확인
lkh@DESKTOP-1L2VEPP:~/kafka_2.12-2.5.0/config$ curl -X GET http://localhost:8083/connectors/local-file-source/status
# {"name":"local-file-source","connector":{"state":"RUNNING","worker_id":"127.0.1.1:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"127.0.1.1:8083"}],"type":"source"}

# 커넥터 종료
lkh@DESKTOP-1L2VEPP:~/kafka_2.12-2.5.0/config$ curl -X DELETE http://localhost:8083/connectors/local-file-source
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 소스 커넥터

# 파일소스 커넥터 구현

  • 로컬에 저장된 파일을 토픽으로 한줄씩 읽어 토픽으로 보내는 파일소스 커넥터 작성
  • build.gradle에 connect-api 라이브러리와 빌드된 파일을 jar로 압축하기 위한 스크립트 작성
  • 카프카 커넥터를 직접 개발하고 플러그인으로 커넥트에 추가할 때 주의할점은 사용자가 직접 작성한 클래스뿐만 아니라 참조하는 라이브러리도 함께 빌드하여 jar로 압축

소스 (opens new window)

# 실행

  1. 자르 생성

17

  1. 커넥트 플러그인 디렉토리 생성 및 자르 넣기
sudo mkdir -p /usr/local/share/kafka/plugins
sudo cp /mnt/c/simple-source-connector-1.0.jar /usr/local/share/kafka/plugins
1
2
  1. 분산모드 커넥터 실행
bin/connect-distributed.sh config/connect-distributed.properties # 분산모드 커넥터 실행
curl -X GET http://localhost:8083/connector-plugins # 실행가능 플러그인 조회
1
2
# 추가 된 것을 확인할 수 있다 !
[{"class":"com.example.SingleFileSourceConnector","type":"source","version":"1.0"}, 
{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0"}, 
{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0"}, 
{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"}, 
{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
1
2
3
4
5
6
# SingleFileSourceConnector 실행
curl -X POST -H "Content-Type: application/json" \
--data '{
"name":"single-file-source",
"config":{
"connector.class":"com.example.SingleFileSourceConnector",
"tasks.max":"1"
}
}' \
http://localhost:8083/connectors

# 실행확인
curl -X GET http://localhost:8083/connectors/single-file-source/status

# /tmp/kafka.txt 수정

# 조회
bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic test \
--from-beginning
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 싱크 커넥터

# 미러메이커2

  • 두개의 카프카 클러스터간에 토픽을 복제하는 애플리케이션