오늘 배달 주문을 하면 상품 DB에 수량 감소가 되도록 Kafka Consumer를 이용해서 토픽의 데이터를 가져와 코드를 실행하도록 했다. 그런데 무슨 이유에서인지 반영이 제대로 되지 않는 문제가 발생했다.
원인 확인하기
docker exec -it kafka bash
kafka-console-consumer --bootstrap-server kafka:9092 --topic source_order_products --from-beginning
위 명령어로 도대체 토픽에 어떤 데이터가 저장되는지 확인해 봤다.

확인해보니 Sink에서 전달한 내용과 똑같은 형식으로 생긴 것을 알 수 있다. type을 struct로 해서 그런 것 같다.
어쨌든 토픽의 데이터를 확인했으니 문제점을 찾아보자.
몇시간동안 생각한 결과 원인을 하나 찾아냈다.
{
"name": "order-product-sink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://생략:3306/delibird",
"connection.user": "megamaker",
"connection.password": "생략",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "false",
"tasks.max": "1",
"topics": "order_products"
}
}
위는 sink connect의 내용이고,
{
"name": "product-source-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://생략:3306/delibird",
"connection.user": "megamaker",
"connection.password": "생략",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "delibird.order_products",
"topic.prefix": "source_",
"tasks.max": "1"
}
}
이건 source connect의 내용이다.
Consumer 토픽명 확인하기
여기서 내가 헷갈린 부분은 topics과 topic.prefix 이 부분이다. 참고로 위 내용들은 제대로 수정한 내용을 올렸다.
나는 두 커넥터가 같은 토픽을 사용해야 한다고 생각했었다. 왜 그랬는지는 모르겠지만 이 부분이 잘못됐었다. DB에 저장하는 sink에서의 topics는 그대로 order_products 토픽으로 만들어지는거고 DB의 table.whitelist의 테이블 변경사항을 감지하여 topic.prefix를 붙인대로 토픽이 생성된다. 그래서 여기서는 source_order_products라는 토픽이 생성되는 것이다.
kafka-topics --list --bootstrap-server kafka:9092

위 명령어로 확인해보면 order_products와 source_order_products이 있는데 각각 sink와 source에서 사용되는 토픽이다.

그래서 Consumer 역할을 하는 부분에서 토픽 부분은 source에서 지정한 토픽명을 사용해야 해당 데이터를 받아올 수 있다.
JSON 파싱 문제
다음에는 한 가지 문제가 더 발생했는데 데이터는 가져와지지만 null이라고 예외가 발생한 상황이다. 이건 단순히 아까 토픽에 저장되어있는 JSON 구조대로 가져오지 않고 바로 product_id라는 키로 접근했기 때문이다.

그래서 해당 데이터 구조에 맞게 먼저 payload에 접근하고 그 다음 내용들에 접근했다.
여기까지 해결하니까 DB에 원하던대로 수량이 잘 반영됐다..!
'공부 > Kafka' 카테고리의 다른 글
| [Kafka] Docker 컨테이너와 로컬 개발 환경 연결 및 문제 해결하기 (0) | 2024.07.05 |
|---|---|
| [Kafka] Confluent Kafka 실행 및 Connect 사용하기 (0) | 2024.07.04 |
| [Kafka Connect] Kafka Connect Sink DB에 저장이 안 될 때 (0) | 2024.05.27 |
| [Kafka Connect] Docker로 Kafka Connect 사용하기 (0) | 2024.05.26 |