# 스프링 카프카
# 스프링 카프카 프로듀서
dependencies {
implementation 'org.springframework.kafka:spring-kafka:2.5.10.RELEASE'
implementation 'org.springframework.boot:spring-boot-starter:2.4.0'
}
1
2
3
4
2
3
4
@SpringBootApplication
public class SpringProducerApplication implements CommandLineRunner {
private static String TOPIC_NAME = "test";
// 스프링 카프카에서 제공하는 KafkaTemplate 객체 자동 주입. application.yml 에서 선언한 옵션값 자동 주입
@Autowired
private KafkaTemplate<Integer, String> template;
public static void main(String[] args) {
SpringApplication application = new SpringApplication(SpringProducerApplication.class);
application.run(args);
}
@Override
public void run(String... args) {
for (int i = 0; i < 10; i++) {
template.send(TOPIC_NAME, "test" + i); // 토픽 이름과 메시지 이름을 넣어 전송
}
System.exit(0); // 전송이 완료되면 종료
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 커스텀 카프카 템플릿
소스 (opens new window) 특별한건 없어보임
# 스프링 카프카 컨슈머
# 레코드 리스너 활용
@SpringBootApplication
public class SpringConsumerApplication {
public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class);
public static void main(String[] args) {
SpringApplication application = new SpringApplication(SpringConsumerApplication.class);
application.run(args);
}
// 가장 기본적인 리스너 선언
@KafkaListener(topics = "test", groupId = "test-group-00")
public void recordListener(ConsumerRecord<String, String> record) {
logger.info(record.toString());
}
// 메시지 값을 파라미터로 받는 리스너
@KafkaListener(topics = "test", groupId = "test-group-01")
public void singleTopicListener(String messageValue) {
logger.info(messageValue);
}
// 컨슈머 옵션 부여
@KafkaListener(topics = "test", groupId = "test-group-02", properties = {"max.poll.interval.ms:60000", "auto.offset.reset:earliest"})
public void singleTopicWithPropertiesListener(String messageValue) {
logger.info(messageValue);
}
// 컨슈머 스레드 실행
@KafkaListener(topics = "test", groupId = "test-group-03", concurrency = "3")
public void concurrentTopicListener(String messageValue) {
logger.info(messageValue);
}
// 특정 토픽의 특정 파티션만 구독
@KafkaListener(topicPartitions = {@TopicPartition(topic = "test01", partitions = {"0", "1"}), @TopicPartition(topic = "test02", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3"))}, groupId = "test-group-04")
public void listenSpecificPartition(ConsumerRecord<String, String> record) {
logger.info(record.toString());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 배치 리스너 활용
@SpringBootApplication
public class SpringConsumerApplication {
public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class);
public static void main(String[] args) {
SpringApplication application = new SpringApplication(SpringConsumerApplication.class);
application.run(args);
}
// 1. ConsumerRecords 파라미터로 받음
@KafkaListener(topics = "test", groupId = "test-group-01")
public void batchListener(ConsumerRecords<String, String> records) {
records.forEach(record -> logger.info(record.toString()));
}
// 2. List 자료구조로 받음
@KafkaListener(topics = "test", groupId = "test-group-02")
public void batchListener(List<String> list) {
list.forEach(recordValue -> logger.info(recordValue));
}
// 3. 2개 이상의 컨슈머 스레드로 배치 리스너 운영
@KafkaListener(topics = "test", groupId = "test-group-03", concurrency = "3")
public void concurrentBatchListener(ConsumerRecords<String, String> records) { //3
records.forEach(record -> logger.info(record.toString()));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 배치 커밋 리스너, 배치 컨슈머 리스너
@SpringBootApplication
public class SpringConsumerApplication {
public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class);
public static void main(String[] args) {
SpringApplication application = new SpringApplication(SpringConsumerApplication.class);
application.run(args);
}
// AckMode를 MANUAL 또는 MANUAL_IMMEDIATE로 사용할 경우에는 수동 커밋을 하기 위해 파라미터로 Acknowledgment 인스턴스를 받아야 한다.
@KafkaListener(topics = "test", groupId = "test-group-01")
public void commitListener(ConsumerRecords<String, String> records, Acknowledgment ack) {
records.forEach(record -> logger.info(record.toString()));
ack.acknowledge(); // 커밋 수행
}
//
@KafkaListener(topics = "test", groupId = "test-group-02")
public void consumerCommitListener(ConsumerRecords<String, String> records, Consumer<String, String> consumer) {
records.forEach(record -> logger.info(record.toString()));
consumer.commitSync(); // 원하는 타이밍에 커밋 가능 (commitSync, commitAsync)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24