작성
·
235
0
궁금한게 있어서 질문드립니다~~
consumer를 구현할 때 데이터를 올드 데이터가 덮어쓰는 경우나 삭제 처리 때문에 날짜를 validation하는 로직이 들어가게 되는데요
데이터를 올드 데이터가 덮어쓰는 경우는 저장소가 장애가나거나 비즈니스 로직으로인해 에러가 나고 뉴 데이터는 들어가게 되었을 때 올드 데이터가 데드레터에 들어가 있고 복구된다면 덮어쓰게 됩니다.
consumer -> validation store
validation 통과후
consumer -> source store
위의 과정 후에 저장하게 됩니다.
그런데 카프카 커넥터를 기반으로 했을 때는 위의 과정 처리를 어떻게 할 수 있을까요?
물론 로직이 들어가게 되므로 커넥터를 사용하지 않고 구현하는게 맞는 것 같기는한데 만약 그렇게 될 경우에는 카프카 커넥터의 사용범위가 로그성 데이터나 초기 데이터를 이관할 때 정도로 사용되는 범위가 축소될 것 같아서 질문드려 봅니다!
감사합니다
답변 1
0
안녕하십니까,
Validation 로직을 어떻게 적용하는지에 대해서 좀 더 자세히 적어 주셨으면 합니다.
먼저 개괄적인 답변을 드리면 간단한 변환의 경우 Connect의 SMT를 이용하면 됩니다.
변환 로직이 SMT로 변환이 어려운 복잡한 로직일 경우에는 Connect + ksqlDB를 적용하면 됩니다. 그런데 ksqldb에 대해서는 본 강의에 포함되어 있지 않습니다.
감사합니다
consumer에서 db에 입력하기 전에 다시 다른 documentDB나 LocalDB에서 validation을 한다는 건가요? 아님 consumer에서 db에 입력등을 하지 않고 그냥 documentDB나 LocalDB에서 Validation을 한다는 건가요?
아님 db에 입력을 하기전에 해당 db에서 먼저 validation을 한다는 건가요?
그리고 validation을 동일한 pk 값으로 이미 존재하고 있는지를 validation한다는 건가요? 아님 어떤 validation을 한다는 건가요?
consumer에서 sink db에 저장하기전에 documentdb나 localdb에 수정 날짜 기반으로 validation을 한 다음에 현재 데이터가 documentdb나 localdb에 수정 일자가 더 최신이라면 그렇게 되도록 하고 있습니다.
저는 target source가 elasticsearch이고 수정일자 validation은 documentDB로 하고 있습니다.
documentdb에는 키와 수정일자만 들어가있습니다.
최신 데이터가 documentdb에 들어가있는 데이터보다 최근이면 데이터를 갱신합니다
조회 조건 key = 1 and modifiedDate <= 2023-05-14 14:22:03
이렇게 하는 이유는 target source가 validation으로 인한 부하가 가장 큰 이유입니다
upsert 퍼포먼스나 원자성 이런 것들도 있지만요.
검증을 타 DB에서 수행하고 Target DB에 입력하는 현재 방식의 로직은 SMT나 kSQLDB를 사용하더라도 JDBC Sink Connector나 Elasticsearch Sink Connector 모두에서 다 불가할 것 같습니다.
네
key : 1, value: "haha", modifieDate: 2023-05-14 14:22:03
key : 1, value: "haha12", modifieDate: 2023-05-14 14:22:05
위 두개가 메시지가 있습니다 그러나 첫번째 메시지가 엔드포인트에 저장하는 것이 실패해서 데드레터로 가게 되었고 2번째 메시지는 처리되었습니다 즉 저장되었습니다
그러나 데드레터에 있는 메시지를 다시 컨슈밍하여 처리하는 과정에서 첫번째 메시지가 두번째 메시지를 덮어쓸 수 있기 때문에 documentDB나 local DB를 사용해서 validation을 하게 됩니다.
이것을 카프카 커넥트로는 어떻게 처리할 수 있는지에 대한 질문이였습니다.
혹시 이해가 안가신다면 다시 말씀 부탁드립니다.