인프런 커뮤니티 질문&답변

좋은 낙지님의 프로필 이미지
좋은 낙지

작성한 질문수

카프카 완벽 가이드 - 커넥트(Connect) 편

JDBC source connector - SQL server 질문

해결된 질문

작성

·

413

·

수정됨

0

항상 답변 감사합니다.

SQL server 에서 JDBC source connector 연결 구성중에 질문있어 드립니다.

 

환경세팅은 강의와 유사하게 진행하였습니다.

timestamp와 incrementing모드로 Config json을 구성하였고

MS-SQL DB는 connect_dev 계정에 om 데이터베이스의 customers 테이블을 사용하였습니다.

쿼리는 변환하여 삽입하였습니다

 

{
    "name": "mssql_jdbc_source_customers",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:sqlserver://localhost:1433;databaseName=om;trustServerCertificate=true",
        "connection.user": "connect_dev",
        "connection.password": "connect_dev",
        "topic.prefix": "mssql_jdbc_",
        "table.whitelist": "om.customers",
        "poll.interval.ms": 10000,
        "mode": "timestamp+incrementing",
        "incrementing.column.name": "customer_id",
        "timestamp.column.name": "system_upd"
    }
}

config 역시 강의와 유사하나 차이점으론 trustServerCertificate=true 옵션을 줘서 ssl 인증 문제를 회피하였습니다.

 

 

이후 질문 드리고 싶은 주요 문제로는

 아래의 에러메시지

ERROR [mssql_jdbc_source_customers|task-0] WorkerSourceTask{id=mssql_jdbc_source_customers-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195)
org.apache.kafka.connect.errors.ConnectException: Cannot make incremental queries using timestamp columns [system_upd] on [] because all of these columns nullable.
        at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:546)
        at io.confluent.connect.jdbc.source.JdbcSourceTask.start(JdbcSourceTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.initializeAndStart(WorkerSourceTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

 

timestamp columns [system_upd] on [] because all of these columns nullable. (null이라 문제다)

가 원인이라는건데,

SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, IS_NULLABLE 
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'customers' AND COLUMN_NAME = 'system_upd'

해당 쿼리를 통해 IS_NULLABLE 컬럼이 NO로 되어있음(=null이 아님)을 확인하였습니다.

 

이 null 문제의 경우 다른 생각해 볼 접근방법이 뭐가 있을지 여쭤보고싶습니다.

incrementing 모드에서는 토픽생성까지 성공하였습니다.

감사합니다.

 

답변 1

1

권 철민님의 프로필 이미지
권 철민
지식공유자

오류로 봐서는 system_upd 컬럼이 nullable로 설정되어서 그런것 같은데, information schema로 메타 테이블로 확인하셨을 때는 nullable로 확인이 확실히 되신 건가요?

만약 그렇다면 config 파일에 validate.non.null: false 를 추가해 보시지요.

validate.non.null은 jdbc connector 기동시에 timestamp column의 not null 여부를 체크하는지 여부를 설정하며 false로 설정 시 기동시 체크하지 않습니다.

 

좋은 낙지님의 프로필 이미지
좋은 낙지
질문자

감사합니다. validate.non.null 옵션으로 해결되어 topic 정상생성되었습니다.

좋은 낙지님의 프로필 이미지
좋은 낙지

작성한 질문수

질문하기