# Kafka broker + Spring boot kafka producer + Spring Kafka batch listner
# 박M 저번 시간 피드백
- 공통고민
- 왜 를 고민하는 과정
- java util화 component 화
- 시나리오 타겟 우리 환경에서 이렇게 이렇게 그래서 나는 이걸 컴포넌트로 만들었다
- 스토리 라인
# 시나리오
스프링 부트 카프카 프로듀서 사용하여 사용자가 인사정보 생성(localhost:8081)
1-1. api로 인사정보 1000건 전송 가정 하였을때 1000건 모두 직렬화 하여 한번에 카프카 전송
1-2. api로 인사정보 1000건 전송, 서버에서 루프 돌려서 인사정보 1건씩 카프카 전송
1-3. api로 단건 전송 1000번 실행, 카프카로 단건전송카프카 브로커 전송(aws)
배치에서 실시간 전송 받음
# spring boot + kafka producer
// arr
var arr = [];
for(var i=0;i<userCnt;i++){
arr.push({"name":"이름"+i, "no":i}); // 갯수만큼 임의 더미 유저 생성
}
$.ajax({
url: 'http://localhost:8081/sendKafka'
, type: 'post'
, dataType: 'json' // 데이터 타입을 Json으로 변경
, contentType: 'application/json' // Content-Type을 Json으로 변경
, data: JSON.stringify(arr) // JSON String으로 전환하여 보낸다.
, success: function(response) {
}
});
function sendDatasingle() {
var userCnt = Number($("#user_name").val());
alert("sendData");
for(var i=0;i<userCnt;i++){
var arr = {"name":"이름"+i, "no":i};
$.ajax({
url: 'http://localhost:8081/sendKafkasingle'
, type: 'post'
, contentType: 'application/json' // 데이터 타입을 Json으로 변경
, data: JSON.stringify(arr) // JSON String으로 전환하여 보낸다.
, success: function(response) {
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
현재 gw api가 스프링 부트 이므로 스프링 부트에 카프카 프로듀서를 붙인다 가정
dependencies {
implementation 'org.springframework.kafka:spring-kafka:2.5.10.RELEASE'
implementation 'org.springframework.boot:spring-boot-starter-web:2.3.4.RELEASE'
implementation 'com.google.code.gson:gson:2.8.6'
}
1
2
3
4
5
2
3
4
5
# 공통 모듈화
- common 패키지 아래 클래스 생성
com.example.common.KafkaProducer.java
- 템플릿 화 시켜서 클래스와 List클래스 둘다 한 함수로 전송 가능
@Service
public class KafkaProducer {
private final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public <T> void send(String topic, T data) {
Gson gson = new Gson();
String jsonColorLog = gson.toJson(data);
kafkaTemplate.send(topic, jsonColorLog).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
logger.info("success"+result.toString());
}
@Override
public void onFailure(Throwable ex) {
logger.error(ex.getMessage(), ex);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
컨트롤러단에서 호출
// 시나리오 1. 리스트 전송
@PostMapping("/sendKafka")
public void url(@RequestBody List<MemberVO> members) {
this.kafkaProducer.send("select-color", members);
}
// 시나리오 2. 인스턴스 하나 전송
@PostMapping("/sendKafka2")
public void sendKafka2(@RequestBody List<MemberVO> members) {
members.forEach( memberVO -> this.kafkaProducer.send("select-color", memberVO));
}
// 시나리오 3. api 단건 전송
@PostMapping("/sendKafkasingle")
public void url(@RequestBody MemberVO member) {
this.kafkaProducer.send("select-color", member);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# kafka batch listner
배치 리스너에서 실시간 전송 받음
dependencies {
implementation 'org.springframework.kafka:spring-kafka:2.5.10.RELEASE'
implementation 'org.springframework.boot:spring-boot-starter:2.4.0'
}
1
2
3
4
2
3
4
@KafkaListener(topics = "select-color", groupId = "test-group-01")
public void batchListener(ConsumerRecords<String, String> records) {
records.forEach(record -> logger.info(record.toString()));
}
1
2
3
4
2
3
4
# +++ 오늘의 피드백
feedback
- 키값, 라스트 원, 내가 원하는값만 가져오고 싶을때, 옮긴 데이터만 알고 싶을때
- 변화된 데이터를 어떻게 판단? 뭘로? (레거시 에서 요청했을대 한사람꺼 변한거 전체 다)
- 식별할 수 있는 키값 정의!
- 수명주기 & 키 값 건드리면 안되 자동으로 없어지라고 그래서 이중에서 최근걸 찾을수 있어야
- 전문정의서 : json 항목 파라미터는 뭔지 url은 뭔지 타입
- 레디스 & 카우치베이스 거의 똑같다
- 다음주중으로 키값 고민