해결된 질문
작성
·
302
1
https://github.com/chulminkw/KafkaConnect/blob/723d598394241434f424184998176c903c0967f9/%EC%8B%A4%EC%8A%B5%EC%88%98%ED%96%89/JDBC%20Sink%20Connector%20%EC%8B%A4%EC%8A%B5.md
Source 테이블과 연계하여 Sink 테이블에 데이터 연동 테스트
다른 테이블에 대해서도 Sink Connector를 생성하고 Source 테이블에 데이터 입력하여 Sink(Target) 테이블에 데이터가 동기화 되는지 확인.
products_sink용 sink connector를 위해서 아래 설정을 mysql_jdbc_sink_products.json 파일에 저장.
{
"name": "mysql_jdbc_sink_products",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "mysql_jdbc_products",
"connection.url": "jdbc:mysql://localhost:3306/om_sink",
"connection.user": "connect_dev",
"connection.password": "connect_dev",
"insert.mode": "upsert",
"pk.mode": "record_key"
"pk.fields": "product_id",
"delete.enabled": "true",
"table.name.format": "om_sink.products_sink",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
실습하다가 발견해서 제보드립니다.
해당부분 Git에 "pk.mode": "record_key" 다음에 쉼표가 없어 register_connector 가 작동되지 않았었습니다.
쉼표를 추가하니 해결되었습니다 ㅎㅎ