인프런 커뮤니티 질문&답변

좋은 낙지님의 프로필 이미지
좋은 낙지

작성한 질문수

카프카 완벽 가이드 - 커넥트(Connect) 편

Numeric과 Decimal 컬럼 타입 데이터를 Debezium Source와 JDBC Sink로 연동 실습

Debezium Source 에서 topic에 저장되는 UTC시간대 질문

작성

·

911

·

수정됨

0

안녕하세요.
topic에 UTC 시간대로 저장되는 문제가 있습니다.
해결 접근방법에 조언을 듣고 싶습니다.


Sink를 적용했을때 customers, products,order_items는 문제없이 적용되었으나 orders테이블의 timestamp타입의 order_datetime컬럼에 문제가 발생하여
SMT 옵션을 추가하다가 발견한 문제입니다.

결론적으로 mysql_cdc_oc_sink_orders_01.json에

        "transforms": "ConvertDateTimeType",
        "transforms.ConvertDateTimeType.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.ConvertDateTimeType.target.type": "Timestamp",
        "transforms.ConvertDateTimeType.field": "order_datetime",
        "transforms.ConvertDateTimeType.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
        "transforms.ConvertDateTimeType.timezone": "Asia/Seoul"

위 옵션을 추가하여 sink로 저장을 해결하였으나 topic에 저장되는 시간이 다르게 저장되는것을 발견했습니다.

source 데이터베이스에서는 2023-06-20 13:56:40 에 저장하였으나

sink 데이터베이스에서는 2023-06-20 04:56:40으로 저장되고 있었습니다.

이에 topic을 확인해보니 저장되는 시간대가 2023-06-20 04:56:40으로 topic에서부터 저장되는 값이 다른 것을 알 수 있었습니다.

따라서 source설정쪽이 문제일 것 같은데
"database.connectionTimeZone": "Asia/Seoul"
옵션을 넣었음에도 UTC로 적용되고있어 질문드립니다.

 

감사합니다.

 

 

 

 


mysql_cdc_oc_source_01.json

{
    "name": "mysql_cdc_oc_source_01",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "192.168.56.101",
        "database.port": "3306",
        "database.user": "connect_dev",
        "database.password": "connect_dev",
        "database.server.id": "10001",
        "database.server.name": "mysql01",
        "database.include.list": "oc",
        "table.include.list": "oc.customers, oc.products, oc.orders, oc.order_items",
        "database.history.kafka.bootstrap.servers": "192.168.56.101:9092",
        "database.history.kafka.topic": "schema-changes.mysql.oc",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
 
 
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "database.connectionTimeZone": "Asia/Seoul"
    }
}


MYSQL의 TIME_ZONE은 한국시간대입니다.

mysql> select @@system_time_zone;

+--------------------+

| @@system_time_zone |

+--------------------+

| KST                |

+--------------------+

 

답변 2

1

권 철민님의 프로필 이미지
권 철민
지식공유자

안녕하십니까,

뒤에 나오는 DBMS 컬럼 타입에 따른 Debezium 메시지 변환 이해 - Timestamp 타입(timezone 포함),

Timestamp 컬럼 타입 데이터를 Debezium Source와 JDBC Sink로 연동 실습 - 01, 02 를 다 들으시고, 작업하셨는데, 안되는 건가요?

해당 강의를 안들으셨으면, 먼저 들으신 다음에 작업을 수행해 보셨으면 합니다.

먼저 말씀 드리면 Debezium Source Connector는 기본적으로 UTC로 Topic에 데이터를 저장합니다. 다른 상세 사항은 해당 강의를 참조 부탁드립니다.

만약 해당 강의를 들으셨는데, 안되는 거면 다시 글 부탁드립니다.

감사합니다.

0

하단과 같이 하면 될 듯 합니다.

MySQL의 datetime에는 타임존이 없으나, mysql-binlog-connector-java에서는 Datetime 클래스 기반으로 날짜 처리를 하다보니. 자연스럽게 DBMS와 JVM 타임존 영향을 받게 됩니다.

transforms=convertTimezone
transforms.convertTimezone.type=io.debezium.transforms.TimezoneConverter
transforms.convertTimezone.converted.timezone=Asia/Seoul

추가적으로 확인은 필요하지만, 위 적용 이후 MySQL의 time과 같은 타입도 정상적으로 의도한 결과대로 나오는지 체크도 해봐야합니다. Connector레벨에서 타임존 없는 데이터는 원천 데이터 그대로 전달해주면 좋을텐데.. (String타입일지라도. ^^)

좋은 낙지님의 프로필 이미지
좋은 낙지

작성한 질문수

질문하기