인프런 커뮤니티 질문&답변

행복한 족제비님의 프로필 이미지

작성한 질문수

카프카 완벽 가이드 - ksqlDB

debezium에서 ksqldb로

해결된 질문

24.06.13 21:50 작성

·

127

0

제가 구상하고있는 구조가

  1. mysql에서 debezium source connector가 topic에 넘기고

     

  2. ksqldb의 streams나 table로 재구성하여 다른 topic으로 넘긴 후

  3. mysql sink database에서 받는다.

     

라는걸 구상중인데요 ksqldb에서 직접 insert를 하면 json 형식이 아니라서
sink connector가 읽지 못하는거 같습니다.

-- debezium.json --

{
    "name": "debezium",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "1234",
        "database.allowPublicKeyRetrieval": "true",

        "database.server.id": "10777",
        "database.server.name": "debe01",
        "database.include.list": "debe",
        "table.include.list": "debe.user",

        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "schema-changesde.mysql.oc",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",

        "database.connectionTimeZone": "Asia/Seoul",
        "time.precision.mode": "connect",

        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false"
    }
}
 

그래서 debezium으로 mysql의 data를 읽어서 topic으로 가져왔는데

ksqldb에서 그 data를 읽는 부분에서 막혔습니다

강의에 나온거처럼 ksqldb와 debezium을 연동을 해야 가능한건가요?

아니면 어떤 방법이 있을까요?

답변 2

0

권 철민님의 프로필 이미지
권 철민
지식공유자

2024. 06. 15. 14:16

안녕하십니까,

아, 질문이 있는 걸 놓쳤군요. AI 인프런 인턴이 답변을 달아서 답변 안한 질문이 없는 걸로 되어 있어서 놓쳤습니다.

근데, 정확히 질문을 이해하지 못했습니다.

mysql -> debezium source connector -> topic -> ksqldb에서 재구성하여 다른 topic 으로 넘기는데, 왜 ksqldb에서 직접 insert를 하시는 거죠?

위의 구성외에 별도로 ksqldb insert를 따로 하시겠다는 건가요? 만약 그렇다면 topic에 들어가는 format이 동일하게 입력이 되어야 합니다. debezium 이 만드는 포맷과 ksqldb에서 입력하는 포맷이 맞지 않으면 제대로 출력되지 않습니다.

원하는 답변이 아니면 다시 글 부탁드립니다.

감사합니다.

행복한 족제비님의 프로필 이미지

2024. 06. 16. 14:37

직접 insert하는것은 ksqldb로 message를 재구성하여 topic에 넣고 그걸 가져가는 sink connector를 먼저 구현해 놓으려는 목적으로 시도해본것이었습니다. 지금은 json이 아니라 avro 형식으로 바꿔서 잘 진행되고있습니다. 답변 주셔서 감사합니다

0

인프런 AI 인턴님의 프로필 이미지

2024. 06. 13. 21:57

안녕하세요, 인프런 AI 인턴입니다.

김경민님의 질문과 관련하여, Debezium을 통해 MySQL 데이터를 Apache Kafka Topic으로 가져오고, 이를 ksqlDB를 활용하여 재구성한 후 다시 MySQL Sink Database로 넘기는 작업 흐름에 대해 여쭤보셨네요.

질문에서 언급하신, ksqlDB로 데이터를 재구성하여 다른 Topic으로 전송하는 과정에서 발생하는 문제는 일반적으로 데이터 포맷에 관련된 문제일 가능성이 높습니다. Debezium에서 추출된 데이터를 ksqlDB로 처리한 뒤 JSON 포맷으로 소비할 수 있도록 sink connector가 올바르게 설정되어야 합니다.

특히, key.convertervalue.converter 설정에서 org.apache.kafka.connect.json.JsonConverterschemas.enable 옵션을 false로 설정한 것은 JSON 형태로 데이터를 전송하지만 스키마를 포함하지 않겠다는 뜻입니다. 이는 Debezium Source Connector 구성에 포함되어 있는 부분으로 보이며, ksqlDB와 Sink Connector에서도 이러한 설정을 고려해야 할 수 있습니다.

질문하신 내용과 직접적으로 동일한 상황을 다루는 구체적인 예시나 가이드를 찾기는 어려우나, 유사 상황에서 문제를 해결하기 위해 참고할 수 있는 몇 가지 자료를 제안드립니다.

  1. Debezium과 ksqlDB 사용 예시 (ksqlDB과 Debezium을 이용한 실시간 데이터 파이프라인 구축) - 이 글에서는 Debezium과 ksqlDB를 연동하여 데이터 흐름을 처리하는 방법이 일부 설명되어 있습니다. ksqlDB에서의 데이터 변환 및 처리 방법과 Sink Connector로 데이터를 전송하는 예시가 유익할 수 있습니다.

  2. MySQL과 Kafka 연동 (질문있어요!) - 이 게시물에서는 MySQL과 Kafka를 연결하는 방법에 대한 질문과, Debezium을 사용하여 다양한 데이터베이스와 Kafka를 연동하는 과정에 대한 조언이 포함되어 있습니다. 다른 데이터베이스로의 확장 가능성을 탐색하는 데 도움이 될 수 있습니다.

이러한 자료들은 직접적인 답변을 제공하기보다는 ksqlDB와 Debezium을 사용하여 데이터 처리 흐름을 구성하는 데 있어서 일반적인 안내와 팁을 제공하고 있습니다.

김경민님의 경우, ksqlDB에서 Debezium으로 가져온 데이터를 처리할 때 올바른 포맷으로 변환되고 있는지 확인하고, 필요한 경우 CREATE STREAM 또는 CREATE TABLE 문에서 데이터 포맷을 지정하는 방법에 대해 자세히 살펴보시기를 권장드립니다. 또한, ksqlDB로부터 데이터를 읽어 MySQL Sink Connector로 전송하는 과정에서 데이터 형식을 적절히 조정하는 것이 중요합니다.

저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.