# 컨슈머 API
# 카프카 컨슈머 프로젝트 생성
클라이언트와 동일하게 프로젝트 생성
public class SimpleConsumer {
private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVICES = "my-kafka:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVICES);
configs.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord<String, String> record : records) {
logger.info("{}",record);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
test에 데이터를 넣어주자
bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic test
testMessage
1
2
2
출력확인
# 동기 오프셋 커밋
poll() 메서드 호출 이후 commitSync() 메서드를 호출하여 오프셋 커밋을 명시적으로 수행할 수 있다.
가장 마지막 레코드의 오프셋 기준으로 커밋
// 명시적으로 오프셋 커밋 수행
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord<String, String> record : records) {
logger.info("{}",record);
}
consumer.commitSync(); // 파라미터가 들어가지 않으면 poll 메서드로 받은 가장 마지막 레코드의 오프셋을 기준으로 커밋
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
개별 레코드 단위로 매번 오프셋 커밋
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
for(ConsumerRecord<String, String> record : records) {
logger.info("{}",record);
currentOffset.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, null));
}
consumer.commitSync(currentOffset); // poll 메서드로 받은 가장 마지막 레코드의 오프셋을 기준으로 커밋
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 비동기 오프셋 커밋
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
for(ConsumerRecord<String, String> record : records) {
logger.info("{}",record);
}
consumer.commitAsync(
new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if(e!=null){
System.err.println("Commit failed");
} else {
System.err.print("Commit suceded");
}
if(e!=null){
logger.error("commit failed for offsets {}", offsets, e);
}
}
}
);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 리밸런스 리스너를 가지는 컨슈머
- 리밸런스 : 컨슈머 그룹에서 컨슈머가 추가 또는 제거되면 파티션을 컨슈머에 재할당하는 과정
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVICES);
configs.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME), new RebalanceListner());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord<String, String> record : records) {
logger.info("{}",record);
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, null));
consumer.commitSync();
}
}
}
public static class RebalanceListner implements ConsumerRebalanceListener {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
logger.warn("Partitions are assigned");
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
logger.warn("partitions are revoked");
consumer.commitSync(currentOffsets);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 컨슈머의 안전한 종료
Runtime.getRuntime().addShutdownHook(new ShutDownThread());
.
.
.
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("{}", record);
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, null));
consumer.commitSync();
}
}
} catch (WakeupException e) {
logger.warn("wakeup consumer");
} finally {
consumer.close(); // 카프카 컨슈머가 안전하게 종료되었음.
}
.
.
.
static class ShutdownThread extends Thread {
public void run(){
logger.info("shutdown hook");
consumer.wakeup();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29