인프런 커뮤니티 질문&답변

김진봉님의 프로필 이미지
김진봉

작성한 질문수

카프카 완벽 가이드 - 커넥트(Connect) 편

mongodb sink connect 사용 중 update, delete 문제

작성

·

975

0

안녕하세요.

선생님 강의를 듣고 kafka connect 매커니즘에 대해 상세하게 알게 됐습니다.

다만, 실무에 적용을 하는 도중 문제에 봉착해 도움을 구하고자 문의드립니다.

현재 debezium mysql connector를 사용하여 source 데이터는 topic으로 저장하는데 성공하였지만,

해당 데이터를 mongodb 에 저장하는데 저장/업데이트는 정상적으로 되지만 delete 시 반영이 안되는 문제가 있습니다.

RDB와는 다르게 mongodb sink connector는 insert.mode는 지원하지 않고

write model Strategy 를 활용하는 걸로 보이는데,

아래와 같이 sink connector를 설정할 경우 account_id 를 key 로 해서 업데이트는 가능한데, 삭제는 안되네요?

   "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",

   "document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
   "document.id.strategy.partial.value.projection.list":"account_id",
   "document.id.strategy.partial.value.projection.type":"AllowList",
   "writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",

혹시 Source 에서 입력, 업데이트, 삭제를 mongodb에 반영하려면 어떻게 해야 되는지 알 수 있을까요?

감사합니다.

 

답변 1

0

권 철민님의 프로필 이미지
권 철민
지식공유자

안녕하십니까,

mongodb는 저도 해보지 않아서 정확한 답변이 아닐 수 있지만,

먼저 debezium source connector에서 topic으로 delete message에 key값은 있지만 value가 null인지, 메시지를 확인해 보십시요.

그리고 sink connector에 아래를 설정해 보시지요.

"delete.on.null.values": "true"

해당 파라미터는 설명은 아래에 나와 있습니다.

https://www.mongodb.com/docs/kafka-connector/current/sink-connector/configuration-properties/id-strategy/

그런데 FullKeyStrategy, PartialKeyStrategy, and ProvidedInKeyStrategy

에서 동작한다고 되어 있군요. 테스트가 필요해 보입니다.

감사합니다.

 

김진봉님의 프로필 이미지
김진봉
질문자

    "document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy",
    "document.id.strategy.partial.key.projection.list":"account_id",
    "document.id.strategy.partial.key.projection.type":"AllowList",
    "writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
    "delete.on.null.values":"true",

알려주신대로 delete.on.null.values 를 true로 하고, Write Strategy를 PartialKeyStrategy로 했는데도, 업데이트는 되지만 삭제는 안되네요.

삭제를 수행하면 topic에 아래와 같이 key는 넘어가고 value는 null로 들어갑니다.

{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"default":"","field":"account_id"}],"optional":false,"name":"fri.testdb.accounts.Key"},"payload":{"account_id":"111"}}    null

혹시나 해서 여러가지 형태로 메시지를 변형해 봤는데도 안되네요.

{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"default":"","field":"account_id"}],"optional":false,"name":"fri.testdb.accounts.Key"},"payload":{"account_id":"111"}}	{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"default":"","field":"account_id"},{"type":"string","optional":true,"field":"role_id"},{"type":"string","optional":true,"field":"user_name"},{"type":"string","optional":true,"field":"user_description"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"default":0,"field":"update_date"}],"optional":false,"name":"fri.testdb.accounts.Value"},"payload":{"account_id":"111",null}}
...
{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"default":0,"field":"update_date"}],"optional":false,"name":"fri.testdb.accounts.Value"},"payload":{"account_id":"111"}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"default":"","field":"account_id"}],"optional":false,"name":"fri.testdb.accounts.Key"},"payload":{"account_id":"111"}}

좀 더 테스트를 해봐야겠습니다. ㅠㅠ

감사합니다.

김진봉님의 프로필 이미지
김진봉

작성한 질문수

질문하기