인프런 커뮤니티 질문&답변

techsupport2님의 프로필 이미지
techsupport2

작성한 질문수

카프카 완벽 가이드 - 커넥트(Connect) 편

connector 여러 개 생성 시 지연 현상 문의

해결된 질문

작성

·

1K

·

수정됨

1

안녕하세요 강의를 들으면서 cdc를 익히고 있는 수강생입니다.

해당 강의 내용을 바탕으로 oracle --> kafka --> oracle 로 cdc 구성을 하고 있는데요. 구성 중 질문이 있어 남기게 되었습니다.

  1. A라는 테이블에 대해 source connector 와 sink connector 를 하나만 생성 했을 때는

데이터 변경 시 정상적으로 빠르게 작동을 하는데요.

추가적으로 B라는 테이블 source connector 와 sink connector 를 생성하면 A,B 모두 데이터 변경이 엄청 느립니다. 대략 5-10분 정도 뒤에 변경이 되는데요 원인이 무엇일까요? (sink 쪽에서 느립니다.)

참고로 debezium 버전은 2.1 입니다.

  1. 1.9에서는 최초 source connector 생성시 스냅샷 시간이 별로 걸리지 않았는데요, 2.1에서는 스냅샷 시간이 상당히 늘어났습니다. 혹시 다른 옵션이 있을까요? 매뉴얼에서는 찾지 못해 질문 드립니다.

  2. "auto.evolve": "true" 옵션 사용 시 source 에서 컬럼을 추가하면 sink 에서도 생성이 되는 것은 확인했는데 source 에서 다시 컬럼을 삭제하면 sink 에서는 삭제가 안되는데요. 추가 해야 될 옵션이 있을까요? 아니면 아직 컬럼 삭제에 대한 지원이 되지 않는 것일 까요??

참고로 제가 작성한 json 내용은 아래와 같습니다.

#########################################################################################

A 테이블 source

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '

{

"name": "source_connector_af_claim",

"config" : {

"connector.class" : "io.debezium.connector.oracle.OracleConnector",

"db_type":"oracle",

"tasks.max" : "1",

 

"database.server.name" : "source_connector_01_af_claim",

"database.user" : "xxxxx",

"database.password" : "xxxxx",

"database.url": "jdbc:oracle:thin:@10.74.102.218:1521:aaa",

"database.dbname" : "aaa",

"database.out.server.name":"ora_source_af_claim_out",

 

"schema.history.internal.kafka.bootstrap.servers" : "10.74.105.61:9092",

"schema.history.internal.kafka.topic": "ora_source_af_claim_history",

 

"include.schema.changes": "true",

"database.connection.adapter": "logminer",

"topic.prefix": "ORA_SOURCE_01",

 

"schema.include.list": "xxxxx",

"table.include.list":"xxxxx.AF_CLAIM",

"include.schema.changes": "true",

 

"auto.evolve": "true",

"time.precision.mode": "connect",

 

"key.converter": "org.apache.kafka.connect.json.JsonConverter",

"value.converter": "org.apache.kafka.connect.json.JsonConverter",

 

"snapshot.mode" : "initial",

 

"tombstones.on.delete": "true",

 

"transforms": "rename_topic",

"transforms.rename_topic.type" : "org.apache.kafka.connect.transforms.RegexRouter",

"transforms.rename_topic.regex" : "ORA_SOURCE_01(.*)",

"transforms.rename_topic.replacement" : "source_$1",

 

"transforms": "unwrap",

"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",

"transforms.unwrap.drop.tombstones": "false"

 

}

}'

#########################################################################################

A 테이블 sink

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '

{

"name": "sink_connector_af-claim",

"config": {

"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

"tasks.max": "1",

"table.name.format": "AF_CLAIM_TEST",

"topics": "ORA_SOURCE_01.xxxxx.AF_CLAIM",

 

"connection.url": "jdbc:oracle:thin:@10.74.102.218:1521:aaa",

"connection.user": "xxxxx",

"connection.password": "xxxxx",

 

"auto.evolve": "true",

"insert.mode": "upsert",

"delete.enabled": "true",

 

"key.converter": "org.apache.kafka.connect.json.JsonConverter",

"value.converter": "org.apache.kafka.connect.json.JsonConverter",

 

"table.include.list" : "xxxxx.AF_CLAIM",

"pk.fields": "AF_CLAIM_GROUP_NO , AF_CLAIM_NO",

"pk.mode": "record_key"

 

}

}'

#########################################################################################

B 테이블 source

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '

{

"name": "source_connector_af_cs_mgmt",

"config" : {

"connector.class" : "io.debezium.connector.oracle.OracleConnector",

"db_type":"oracle",

"tasks.max" : "1",

 

"database.server.name" : "source_connector_02_af_cs_mgmt",

"database.user" : "xxxxx",

"database.password" : "xxxxx",

"database.url": "jdbc:oracle:thin:@10.74.102.218:1521:aaa",

"database.dbname" : "aaa",

"database.out.server.name":"ora_source_af_cs_mgmt_out",

 

"schema.history.internal.kafka.bootstrap.servers" : "10.74.105.61:9092",

"schema.history.internal.kafka.topic": "ora_source_af_cs_mgmt_history",

 

"schema.include.list": "xxxxx",

"include.schema.changes": "true",

"database.connection.adapter": "logminer",

"topic.prefix": "ORA_SOURCE_02",

"table.include.list":"xxxxx.AF_CS_MGMT",

"include.schema.changes": "true",

 

"auto.evolve": "true",

"time.precision.mode": "connect",

 

"key.converter": "org.apache.kafka.connect.json.JsonConverter",

"value.converter": "org.apache.kafka.connect.json.JsonConverter",

 

"snapshot.mode" : "schema_only",

 

"tombstones.on.delete": "true",

 

"transforms": "rename_topic",

"transforms.rename_topic.type" : "org.apache.kafka.connect.transforms.RegexRouter",

"transforms.rename_topic.regex" : "ORA_SOURCE_02(.*)",

"transforms.rename_topic.replacement" : "source_$1",

 

"transforms": "unwrap",

"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",

"transforms.unwrap.drop.tombstones": "false"

 

 

}

}'

#########################################################################################

B 테이블 sink

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '

{

"name": "sink-connector_af_cs_mgmt",

"config": {

"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

"tasks.max": "1",

"table.name.format": "AF_CS_MGMT_TEST",

"topics": "ORA_SOURCE_02.xxxxx.AF_CS_MGMT",

 

"connection.url": "jdbc:oracle:thin:@10.74.102.218:1521:aaa",

"connection.user": "xxxxx",

"connection.password": "xxxxx",

 

"auto.evolve": "true",

"insert.mode": "upsert",

"delete.enabled": "true",

 

"key.converter": "org.apache.kafka.connect.json.JsonConverter",

"value.converter": "org.apache.kafka.connect.json.JsonConverter",

 

"table.include.list" : "xxxxx.AF_CS_MGMT",

"pk.fields": "AFCR_NO",

"pk.mode": "record_key"

 

}

}'

 

 

 

답변 1

0

권 철민님의 프로필 이미지
권 철민
지식공유자

안녕하십니까,

오, 강의에서는 설명드리지 않은 Debezium Oracle을 적용해 보시는 군요. 대단하군요.

저도 오라클에서는 Debezium을 구동해 보지 않았지만,

먼저

두개의 테이블을 소스로 하는데 두개의 source connector를 생성하셨다는 말씀인지요?

만약 그렇다면 , 그렇게 만드실 필요는 없습니다. Debezium은 CDC를 기반으로 하고 있기 때문에 Redo log를 기반으로 만드는 Source connector를 하나만 생성하시면 됩니다. CDC Debizium Source Connector는 모든 Redo log를 읽습니다. 다만 topic으로 만들 table은 대상은 table.include.list 파라미터를 통해서 지정할 수 있습니다. 때문에 두개의 CDC Source Connector를 만들면 redo log를 각각의 Source connector가 읽기 때문에 적용하지 않아야 합니다.

토픽으로 만들 테이블이 두개 이상이라면 table.include.list에 콤마로 분리해서 테이블 명을 넣어주시기 바랍니다. 아마 schema.테이블명을 함께 적어주셔야 할겁니다.

그리고

A,B 모두 데이터 변경이 엄청 느립니다. 대략 5-10분 정도 뒤에 변경이 되는데요 원인이 무엇일까요? (sink 쪽에서 느립니다.)

라고 하셨는데, Sink쪽에서 느리다고 하셨는데, 그럼 소스 테이블 A,B의 변경 사항이 토픽에 적용되는 부분은 느리지 않는지요? kafkacat 등으로 토픽에 실시간으로 잘 적용되었는지 확인 부탁드립니다.

그런데 소스 테이블에서 토픽에 write적용은 실시간으로 잘 되었는데, 토픽에서 Sink 테이블에 적용이 느리다라고 하면, 음.. 원격으로 제가 정확한 이유를 찾아드리기가 좀 어려운 부분이 있을 것 같습니다. sink쪽 table의 lock등의 문제는 아닐까 생각도 됩니다만, 일단 sink쪽 rdbms도 재 기동 해보시고, connect도 다 재기동 해보시고 그리고 일단 위에서 언급 드린 내용대로 환경을 변경하신 뒤에 다시 한번 테스트 해보시고 글 부탁드립니다

그리고,

1.9에서는 최초 source connector 생성시 스냅샷 시간이 별로 걸리지 않았는데요, 2.1에서는 스냅샷 시간이 상당히 늘어났습니다. 혹시 다른 옵션이 있을까요? 매뉴얼에서는 찾지 못해 질문 드립니다.

=> 이건 저도 이유는 정확하게 모르겠습니다만,

먼저 A 테이블은 SNAPSHOT.MODE가 INITIAL이군요. B 테이블은 SCHEMA ONLY이고, 그럼 A테이블에 데이터가 많다면 SNAPSHOT에 시간이 오래 걸렸을 것 같은데, 처음에는 오래 걸리지 않았는데, 두개의 CONNECTOR를 만들고 나서 오래 걸렸다는 의미 인지요?

그리고

"auto.evolve": "true" 옵션 사용 시 source 에서 컬럼을 추가하면 sink 에서도 생성이 되는 것은 확인했는데 source 에서 다시 컬럼을 삭제하면 sink 에서는 삭제가 안되는데요. 추가 해야 될 옵션이 있을까요? 아니면 아직 컬럼 삭제에 대한 지원이 되지 않는 것일 까요??

=> 강의에서 말씀 드린대로 auto.evolve를 true로 설정해도 컬럼 삭제에 대해서는 자동 반영 되지 않습니다.

 

추가적으로, Oracle에 Debezium을 적용해 보실 정도면 이미 뛰어난 오라클 지식이 있으시겠지만, Oracle에 CDC를 적용하려면 Supplemental log 로 DB를 변경해 줘야 합니다. 아마 Debezium 문서에도 나와 있으니 잘 적용하셨을거라 생각되지만 확인차 적어 봅니다.

 

감사합니다.

techsupport2님의 프로필 이미지
techsupport2

작성한 질문수

질문하기