인프런 커뮤니티 질문&답변

Calice님의 프로필 이미지
Calice

작성한 질문수

Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)

Orders Microservice 수정 - Order Kafka Producer

Kafka Connect Sink 사용할 때 LocalDateTime은 어떻게 하나요??

작성

·

385

0

안녕하세요.
강사님의 강의를 듣고 개인 프로젝트에 적용하고 있습니다.

LocalDateTime이 없을 때는 잘 수행되었는데 제 개인 프로젝트 엔티티에는 LocalDateTime이 있습니다. 이 부분에서 자꾸 에러가 나와서 어떻게 처리해야 하는지 알려주시면 감사하겠습니다.

 

다음은 제가 LocalDateTime을 처리하기 위해 따로 만든 엔티티 입니다.

package com.dotd.product.kafkaConnectTest;


import jdk.jfr.Name;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;

import javax.persistence.*;
import java.time.LocalDateTime;

@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Table(name = "nametest")
@EntityListeners(AuditingEntityListener.class)
public class NameTest {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    private String name;

    @CreatedDate
    @Column(name = "created_at")
    private LocalDateTime createdAt;


}

 

 

 

또한, Kafka Connect Sink가 한 번 에러가 발생하면 state : FAILED로 바뀌면서 작동을 안 하는 거 같은데 어떻게 정상화 하나요..??

 

127.0.0.1:8083/connectors/nametest-sink-connect/status

이 요청을 보냈을 때 이렇게 나옵니다.

 

{
    "name": "nametest-sink-connect",
    "connector": {
        "state": "RUNNING",
        "worker_id": "192.168.0.14:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "192.168.0.14:8083",
            "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\r\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)\r\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)\r\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\r\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\r\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\r\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\r\n\tat java.base/java.lang.Thread.run(Thread.java:834)\r\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:\r\njava.sql.BatchUpdateException: Column 'created_at' cannot be null\r\n\r\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:124)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)\r\n\t... 10 more\r\nCaused by: java.sql.SQLException: Exception chain:\r\njava.sql.BatchUpdateException: Column 'created_at' cannot be null\r\n\r\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:154)\r\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:104)\r\n\t... 11 more\r\n"
        }
    ],
    "type": "sink"
}

 

답변 1

0

안녕하세요, 인프런 AI 인턴이에요. Kafka Connect Sink에서 LocalDateTime을 처리하는 방법에 대해서 알려드릴게요.

Kafka Connect Sink는 기본적으로 Avro 형식을 지원합니다. 그래서 LocalDateTime과 같은 Java 8의 새로운 날짜시간 API를 사용할 수 있도록 설정하셔야 합니다.

먼저, Kafka Connect Sink 설정 파일에 다음과 같은 내용을 추가해주세요.

"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "<스키마 레지스트리 URL>",
"value.converter.schema.registry.url": "<스키마 레지스트리 URL>",
"transforms": "ValueToStruct",
"transforms.ValueToStruct.type": "org.apache.kafka.connect.transforms.HoistField$Value"

위 설정에서 <스키마 레지스트리 URL> 부분에는 실제 스키마 레지스트리의 URL을 입력해야 합니다.

그리고 LocalDateTime을 사용하는 필드인 경우, 스키마를 정의해야 합니다. 예를 들어, 다음과 같이 스키마를 정의할 수 있습니다.

{
  "namespace": "com.dotd.product.kafkaConnectTest",
  "type": "record",
  "name": "NameTest",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "createdAt",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

createdAt 필드의 logicalTypetimestamp-millis로 설정하여 LocalDateTime을 변환하도록 합니다.

이렇게 설정을 하면 Kafka Connect Sink가 LocalDateTime을 처리할 수 있게 됩니다.

추가로, Kafka Connect Sink가 에러가 발생하고 state가 FAILED로 전환되면, Kafka Connect의 상태를 확인하고 로그를 분석해야 합니다. 문제를 해결하기 위해서는 로그에서 발생한 에러 메시지를 확인하고 그에 맞는 조치를 취해야 합니다.

Kafka Connect Sink를 정상화하기 위해서는 에러 메시지의 원인을 파악하고 그에 따른 조치를 취해야 합니다. 예를 들어, Sink Connector 설정의 문제, 데이터 소스에 대한 접근 권한 부족 등 여러 가지 원인이 있을 수 있으므로 상세한 에러 메시지를 살펴보고 조치를 취하시면 됩니다.

이러한 경우, 일반적으로 Kafka Connect Sink의 문서나 커뮤니티나 고객센터에 문의하여 도움을 받을 수도 있습니다.

다른 질문이 있으시면 언제든지 물어보세요! 좋은 결과 있기를 바라요.

Calice님의 프로필 이미지
Calice

작성한 질문수

질문하기