인프런 영문 브랜드 로고
인프런 영문 브랜드 로고

인프런 커뮤니티 질문&답변

김동선님의 프로필 이미지
김동선

작성한 질문수

카프카 완벽 가이드 - 커넥트(Connect) 편

스키마 레지스트리 기반의 JDBC Sink Connector 설정 및 생성 하기

해결하지 못한 에러가 발생 하였습니다.

작성

·

457

·

수정됨

0

안녕하세요 개발자님 에러를 해결 하지 못해 도움을 받고 싶습니다.

ksqldb 마지막 강의를 마치고 실습을 하던중 mysql 테이블을

topic으로 connector(avro)를 한후 ksqldb에서 table을 만드는 과정에서 타입 변환 문제가 발생 하였습니다.

 

avro를 통해 register에 스키마를 저장 하고 사용 하고자 하였습니다.

강의 해주신 .properties 설정은 하였구요.

topic에 데이터 들어온느거 확인

스키마 확인

sqldb 테이블 생성은 되지만 검색시 밑에와 같은 에러가 발생합니다.

 

source, sink connector 실습은 잘 되었으며, ksqldb 거치지 않고 ELK에 데이터도 잘 보내 집니다.

 

ksqldb에서 table 생성 과정에서 PRIMARY KEY설정을 하고 생성이 됩니다. 하지만 검색을 하면 밑에와 같은 에러가 납니다.

PRIMARY KEY없이 table을 생성하면 key값이 보내면 Json형태의 키로 배출됩니다. {CUSTOMER_ID=1}

key 타입을 INTEGER, bigint, int 타입 해보았습니다.

mysql table도 다른걸로 만들어보고 했습니다.

혹시 네가 노친것이 무엇인가요?

어떻게 해야 할까여?

 

register 실행 로그를 보니

WARNING: A provider io.confluent.kafka.schemaregistry.rest.resources.SchemasResource regi stered in SERVER runtime does not implement any provider interfaces applicable in the SER VER runtime. Due to constraint configuration problems the provider io.confluent.kafka.sch emaregistry.rest.resources.SchemasResource will be ignored.

Feb. 08, 2024 5:59:52 A.M. org.glassfish.jersey.internal.inject.Providers checkProviderRu ntime

있습니다. 어떻게 해야 하나요?

 

 

 

[2024-02-08 00:47:43,983] ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Error deserializing message from topic: mysqlavro022.oc.customers","recordB64":null,"cause":["Cannot deserialize type struct as type int32 for path: "],"topic":"mysqlavro022.oc.customers"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.transient_OC_CUSTOMER_3798951142359913405.KsqlTopic.Source.deserializer:44)

[2024-02-08 00:47:43,988] WARN stream-thread [_confluent-ksql-default_transient_transient_OC_CUSTOMER_3798951142359913405_1707320860130-b2b59a3e-3875-4eab-ad2a-185533cf65bc-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[mysqlavro022.oc.customers] partition=[0] offset=[0] (org.apache.kafka.streams.processor.internals.RecordDeserializer:89)

org.apache.kafka.common.errors.SerializationException: Error deserializing message from topic: mysqlavro022.oc.customers

at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:55)

at io.confluent.ksql.serde.tls.ThreadLocalDeserializer.deserialize(ThreadLocalDeserializer.java:37)

at io.confluent.ksql.serde.unwrapped.UnwrappedDeserializer.deserialize(UnwrappedDeserializer.java:47)

at io.confluent.ksql.serde.unwrapped.UnwrappedDeserializer.deserialize(UnwrappedDeserializer.java:26)

at io.confluent.ksql.serde.GenericDeserializer.deserialize(GenericDeserializer.java:59)

at io.confluent.ksql.logging.processing.LoggingDeserializer.tryDeserialize(LoggingDeserializer.java:61)

at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:48)

at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)

at org.apache.kafka.streams.processor.internals.SourceNode.deserializeKey(SourceNode.java:54)

at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:65)

at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:178)

at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)

at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:304)

at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:968)

at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1068)

at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:962)

at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:751)

at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)

at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)

Caused by: org.apache.kafka.connect.errors.DataException: Cannot deserialize type struct as type int32 for path:

at io.confluent.ksql.serde.connect.ConnectDataTranslator.throwTypeMismatchException(ConnectDataTranslator.java:71)

at io.confluent.ksql.serde.connect.ConnectDataTranslator.validateType(ConnectDataTranslator.java:90)

at io.confluent.ksql.serde.connect.ConnectDataTranslator.validateSchema(ConnectDataTranslator.java:154)

at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlValue(ConnectDataTranslator.java:200)

at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlRow(ConnectDataTranslator.java:54)

at io.confluent.ksql.serde.avro.AvroDataTranslator.toKsqlRow(AvroDataTranslator.java:67)

at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:51)

... 18 more

 

 

 

user01@ubuntu-20:~/kafka/connector_configs/cdc_source_mysql$ register_connector cdc_source_mysql/mysql_cdc_ops_source_avro_01.json

HTTP/1.1 201 Created

Content-Length: 1007

Content-Type: application/json

Date: Wed, 07 Feb 2024 15:42:52 GMT

Location: http://localhost:8083/connectors/mysql_cdc_ops_source_avro_03

Server: Jetty(9.4.44.v20210927)

{

"config": {

"connector.class": "io.debezium.connector.mysql.MySqlConnector",

"database.connectionTimezone": "Asia/Seoul",

"database.history.kafka.bootstrap.servers": "localhost:9092",

"database.history.kafka.topic": "schema-changes.mysql.oc",

"database.hostname": "192.168.0.26",

"database.include.list": "oc",

"database.password": "1234",

"database.port": "3306",

"database.server.id": "31002",

"database.server.name": "mysqlavro022",

"database.user": "cnt_dev",

"key.converter": "io.confluent.connect.avro.AvroConverter",

"key.converter.schema.registry.url": "http://localhost:8081",

"name": "mysql_cdc_ops_source_avro_03",

"table.include.list": "oc.customers",

"tasks.max": "1",

"time.precision.mode": "connect",

"transforms": "unwrap",

"transforms.unwrap.drop.tombstones": "false",

"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",

"value.converter": "io.confluent.connect.avro.AvroConverter",

"value.converter.schema.registry.url": "http://localhost:8081"

},

"name": "mysql_cdc_ops_source_avro_03",

"tasks": [],

"type": "source"

}

 

user01@ubuntu-20:~/kafka/data/kafka-logs$ show_topic_messages avro mysqlavro022.oc.customers

{

"customer_id": 1

}

{

"customer_id": 1,

"email_address": "test",

"full_name": "test"

}

user01@ubuntu-20:~/kafka$ http GET http://localhost:8081/schemas

{

"id": 23,

"schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"mysqlavro022.oc.customers\",\"fields\":[{\"name\":\"customer_id\",\"type\":\"int\"}],\"connect.name\":\"mysqlavro022.oc.customers.Key\"}",

"subject": "mysqlavro022.oc.customers-key",

"version": 1

},

{

"id": 24,

"schema": "{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"mysqlavro022.oc.customers\",\"fields\":[{\"name\":\"customer_id\",\"type\":\"int\"},{\"name\":\"email_address\",\"type\":\"string\"},{\"name\":\"full_name\",\"type\":\"string\"}],\"connect.name\":\"mysqlavro022.oc.customers.Value\"}",

"subject": "mysqlavro022.oc.customers-value",

"version": 1

},

 

CREATE TABLE oc_customer (

customer_id int PRIMARY KEY,

email_address varchar,

full_name varchar

) WITH (

KAFKA_TOPIC = 'mysqlavro022.oc.customers',

KEY_FORMAT = 'AVRO',

VALUE_FORMAT = 'AVRO'

);

 

ksql> describe oc_customer extended;

Name : OC_CUSTOMER

Type : TABLE

Timestamp field : Not set - using <ROWTIME>

Key format : AVRO

Value format : AVRO

Kafka topic : mysqlavro022.oc.customers (partitions: 1, replication: 1)

Statement : CREATE TABLE OC_CUSTOMER (CUSTOMER_ID INTEGER PRIMARY KEY, EMAIL_ADDRESS STRING, FULL_NAME STRING) WITH (KAFKA_TOPIC='mysqlavro022.oc.customers', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO');

Field | Type

------------------------------------------------

CUSTOMER_ID | INTEGER (primary key)

EMAIL_ADDRESS | VARCHAR(STRING)

FULL_NAME | VARCHAR(STRING)

------------------------------------------------

Local runtime statistics

------------------------

 

 

답변 1

0

권 철민님의 프로필 이미지
권 철민
지식공유자

안녕하십니까,

Deserialized시 오류가 발생하는 것 같습니다만,

근데 ksql강의의 어떤 실습코드가 안되는 걸까요?

create table은 잘되었는데, select가 안되는 건가요?

어떤 강의 영상의 실습 코드가 안되는지 말씀해 주시겠습니까?

 

감사합니다.

 

김동선님의 프로필 이미지
김동선
질문자

안녕하십니까 다변 주셔서 감사합니다.

 

git 으로 제공 해주신 자료를 토대로 제 개발 PC 환경에 마춰 변경되어야 하는 IP,이름 등 이외 모두 복사 하여 사용 하였습니다.

 

우선 MySQL 에서 Table을 만들었습니다.

KafkaConnect 강의 GIt에서 Debezium Source MySql 에서 테이블 생성 정보로 만들었습니다.

CREATE TABLE customers (
customer_id int NOT NULL PRIMARY KEY,
email_address varchar(255) NOT NULL,
full_name varchar(255) NOT NULL
) ENGINE=InnoDB ;

 

Source Connect는 환경 설정 부분만 수정 하였습니다. MySql과 연결이 잘되었습니다.

{
    "name": "mysql_cdc_ops_source_avro_03",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "192.168.0.26",
        "database.port": "3306",
        "database.user": "cnt_dev",
        "database.password": "1234",
        "database.server.id": "31002",
        "database.server.name": "mysqlavro022",
        "database.include.list": "oc",
        "table.include.list": "oc.customers",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "schema-changes.mysql.oc",

        "time.precision.mode": "connect",
        "database.connectionTimezone": "Asia/Seoul",

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter.schema.registry.url": "http://localhost:8081",


        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false"
    }
}

 

마지막으로 KsqlDB에 접속 하여 create table을 하면 잘 만들어 집니다.

CREATE TABLE oc_customer (
  customer_id int PRIMARY KEY,  
  email_address varchar,
  full_name varchar  
) WITH (	
	KAFKA_TOPIC = 'mysqlavro022.oc.customers',	
	KEY_FORMAT = 'AVRO',
	VALUE_FORMAT = 'AVRO'
);

이후 조회 명령어를 치면 error가 발생 합니다.

select * from oc_customer emit changes limit 5;
select * from oc_customer emit changes;
select * from oc_customer;

 

또 의문인 것은 쥬키퍼, 카프카, register, connector, ksql 순으로 키는데,

register 키는 과정에서 WARNING(위에 error) 발생 합니다. ksql 쿼리를 질이 하지 않았는데 발생 합니다. 무시 해도 되는 건지 잘 모르겠습니다.

 

Source connect t설정이 잘 못 된 걸까여?

 

 

 

 

권 철민님의 프로필 이미지
권 철민
지식공유자

음, warning은 걱정하지 않으셔도 될 것 같습니다.

mysqlavro022.oc.customers 토픽에 AVRO 타입으로 값이 들어가 있는지 아래와 같이 확인해 보시겠습니까?

 

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic mysqlavro022.oc.customers --property schema.registry.url=http://localhost:8081 --from-beginning

김동선님의 프로필 이미지
김동선
질문자

네 개발자님 말씀 주신명령어를 해보았습니다.

{"customer_id":1,"email_address":"test","full_name":"test"}
{"customer_id":2,"email_address":"test2","full_name":"test2"}
{"customer_id":3,"email_address":"test3","full_name":"test3"}
{"customer_id":4,"email_address":"test4","full_name":"test4"}

출력 모습은 JSON으로 보여 지는데, avro 타입인지 search을 진행 중입니다.

 

 의문점이 있습니다.

말씀해 주신 명령어 쿼리시 registry_start.sh 실행한 창에 밑에와 같은 log가 출력됩니다.

[2024-02-08 19:19:10,753] INFO 127.0.0.1 - - [08/Feb./2024:10:19:10 +0000] "GET /schemas/ids/28?fetchMaxId=false&subject=null-value HTTP/1.1" 200 301 "-" "Java/11.0.21" 38 (io.confluent.rest-utils.requests:62)

위 요청으로

http GET http://localhost:8081/schemas/ids/28?fetchMaxId=false&subject=null-value 명령어를 실행 하면

아래와 같은 결과 가 나옵니다.

{
    "schema": "{
		\"type\":\"record\",
		\"name\":\"Value\",
		\"namespace\":\"mysqlavro023.oc.customers\",
		\"fields\":
			[
				{\"name\":\"customer_id\",\"type\":\"int\"},
				{\"name\":\"email_address\",\"type\":\"string\"},
				{\"name\":\"full_name\",\"type\":\"string\"}
			],
		\"connect.name\":\"mysqlavro023.oc.customers.Value\"
	      }"
}

의문점은 Value 스키마 조회는 log로 보여 지는 데, Key 스키마 조회는 log로 출력 되지 않습니다.

http GET http://localhost:8081/schemas 명령어로 조회 해 보면 key 스키마도 존재 합니다.

혹시 Key 스키마는 원래 조회를 하지 않는 건가요? 아니면 설정이 따로 필요 한건가요?

 

권 철민님의 프로필 이미지
권 철민
지식공유자

혹시 ksql-server.properties 에 ksql.schema.registry.url 값을 설정하셨나요?

ksqldb에서 schema registry에 접속하지 못하는 것 같습니다만..

김동선님의 프로필 이미지
김동선
질문자

ksql-server.properties 설정 정보 입니다.

listeners=http://0.0.0.0:8088
ksql.logging.processing.topic.auto.create=true
ksql.logging.processing.stream.auto.create=true
bootstrap.servers=localhost:9092
compression.type=snappy
ksql.connect.url=http://localhost:8083
ksql.schema.registry.url=http://localhost:8081
ksql.streams.state.dir=/home/user01/kafka/data/kafka-streams

혹시 잘못된 부분이나, 노친 부분이 있을까여?

 

권 철민님의 프로필 이미지
권 철민
지식공유자

음, config 설정은 문제가 없어 보입니다만,

ksqldb 강의의 실습 코드들은 아무 문제 없이 수행되었나요?

권 철민님의 프로필 이미지
권 철민
지식공유자

오류 내용만 보면

Cannot deserialize type struct as type int32 for path

로 되어 있는데, schema registry에는 int32로 등록되어 있는데, 실제 topic의 값은 struct여서 오류가 발생하는 것 같습니다만, kafka-avro-console-consumer 로 출력한 것 보면 customer_id가 int값으로 되어 있어서 오류 내용이 앞뒤가 맞지 않는것 같습니다.

oc_customer를 새롭게 oc_customer_stream으로 해서 table이 아니라 stream으로 다시 한번 테스트 해보시겠습니까? 토픽 이름도 새롭게 생성하고, 다시 구성해서 테스트 해보면 어떨까 싶습니다.

김동선님의 프로필 이미지
김동선
질문자

안녕하세요 개발자님

새해복 많이 받으세요 ㅎㅎ,

친절한 답변 감사합니다. 꼼꼼하게 챙겨 주셔거 다시한번 감동 받았습니다. 저두 멋진 개발자로 성장 해야겠다 다시한번 마음을 잡게 되었습니다. 감사합니다.

 

말씀주신 ksql 강의에서 제공 받은 dgen_shoe은 정상 작동 되었습니다.

또 새로운 topic을 만들어 stream 생성 작업은 여러 차례 시도 해 보았지만, 같은 오류가 발생 하였습니다.

강의 자료와 제가 하는 실습의 어떤차이가 있는지 비교 해 보았습니다.

 

kafka-avro-console-consumer 의 결과 구조의 차이는 없는 것으로 보여 집니다.

// dgen_shoe
{"order_id":1905,"product_id":"9952c67e-982a-4fb3-a06f-ebf10a8c36c8","customer_id":"a8602711-84d1-4306-812a-f86c7e0ae70b","ts":1609549700000}
// customers
{"customer_id":1,"email_address":"test","full_name":"test"}

 

스키마 구조를 보았습니다.

// dgen_shoe
    {
        "id": 37,
        "schema": "\"int\"",
        "subject": "shoe_orders_avro-key",
        "version": 1
    },
    {
        "id": 38,
        "schema": "                         {\"type\":\"record\",\"name\":\"shoe_orders\",\"namespace\":\"shoes\",
         \"fields\":[
{\"name\":\"order_id\",\"type\":\"int\"},                                   {\"name\":\"product_id\",\"type\":\"string\"},{\"name\":\"customer_id\",\"type\":\"string\"},
{\"name\":\"ts\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-      millis\"}}
],
\"connect.name\":\"shoes.shoe_orders\"}",
        "subject": "shoe_orders_avro-value",
        "version": 1
    },
// customers
    {
        "id": 43,
        "schema": "        
           {\"type\":\"record\",
            \"name\":\"Key\",
            \"namespace\":\"mysqlavro028.oc.customers\",
            \"fields\":[
                    {\"name\":\"customer_id\",\"type\":\"int\"}
              ],
              \"connect.name\":\"mysqlavro028.oc.customers.Key\"}",
               "subject": "mysqlavro028.oc.customers-key",
               "version": 1
    },
    {
        "id": 44,
        "schema": "{\"type\":\"record\",
                    \"name\":\"Value\",
                    \"namespace\":\"mysqlavro028.oc.customers\",
                    \"fields\":[
{\"name\":\"customer_id\",\"type\":\"int\"},           {\"name\":\"email_address\",\"type\":\"string\"},     {\"name\":\"full_name\",\"type\":\"string\"}
],\"connect.name\":\"mysqlavro028.oc.customers.Value\"}",
        "subject": "mysqlavro028.oc.customers-value",
        "version": 1
    },

스키마 구조는 차이가 있습니다.

key 구조의 차이가 있는데, 이부분이 변경 해야된느 부분인지는 의문이 듭니다.

key에도 자세한 스키마 구조가 포함 되어야 한다고 생각은 하고 있는데, ksql에서 오류가 나는 원인이 아닐까 생각이 듭니다. 키 스키마를 변경 해서 사용 하는게 맞을까요? 맞으면 수정은 어떻게 진행 하는 건가요? 또 수정 한다면 sink connector에서는 key 스키마 참조 하여 사용 해도 문제가 없는 건가요?

권 철민님의 프로필 이미지
권 철민
지식공유자

제가 지금 고향에 내려와서, 지금은 좀 그렇고, 다시 집에 올라가서 직접 재현을 해보고 말씀 드려야 할 것 같습니다. 1~2 일 정도 더 걸릴것 같습니다.

김동선님의 프로필 이미지
김동선
질문자

편하 실때 답변 주세요. 감사합니다 저두 열심 히 칮아 보겟습니다

권 철민님의 프로필 이미지
권 철민
지식공유자

말씀하신대로 key 타입이 문제군요.

Debezium Source Connector 는 단일 PK라도 여러 컬럼을 가정해서 기본적으로 Primitive type이 아닌 Struct Type으로 key type이 정해집니다. 예를 들어 customer_id가 pk라고 하더라도 key type은 {customer_id} 와 같은 struct type으로 스키마가 정해 줍니다.

그런데 ksqldb는 customer_id int key(또는 primary key)와 같이 정해 주면 실제 value는 struct type으로 값이 오는데, 이를 ksqldb에서 deserialize 할때 primitive int 형으로 변환시 오류가 발생합니다.

제 생각엔 debezium source connector는 pk를 struct type으로 설정하되, ksqldb에서 아래와 같이 struct type으로 인지할 수 있도록 해야 할 것 같습니다.

CREATE STREAM oc_customer_stream (

customer_id struct<customer_id int> PRIMARY KEY,

email_address varchar,

full_name varchar

) WITH (

KAFKA_TOPIC = 'mysqlavro022.oc.customers',

KEY_FORMAT = 'AVRO',

VALUE_FORMAT = 'AVRO'

);

이렇게 생성하고 select * from oc_customer_stream 을 하게 되면 customer_id가 struct type으로 값이 나오게 됩니다. customer_id를 primitive type 으로 나오게 하려면 아래와 같이 하면 됩니다.

select customer_id->customer_id, email_address, fullname from oc_customer_stream;

 

김동선님의 프로필 이미지
김동선
질문자

안녕하세요 개발자님 답변 주신 방법으로 문제를 해결 해 보았습니다.

 

 

기존 의 테이블 정보에 price 가격만 추가한 새로운 테이블을 만들고 group by를 해보고자 하였습니다.

일러주신 방법으로 Stream을 만들고 참조하여 CTAS를 만들어 보았습니다.

CTAS 조회시 아래와 같은 error가 발생 하였습니다.

Key format does not support schema.
format: KAFKA
schema: Persistence{columns=[`CUSTOMER_ID` STRUCT<`CUSTOMER_ID` BIGINT> KEY], features=[]}
reason: The 'KAFKA' format does not support type 'STRUCT'
Caused by: The 'KAFKA' format does not support type 'STRUCT'

 

Key 타입을 Json으로 변경후 Sink Connector에서 STM를 적용 해 보았지만 Erro만 발생 하였습니다.

+----------------------------------+-----+
|CUSTOMER_ID                       |AVG_PRICE|
+----------------------------------+-----+
|{CUSTOMER_ID=4}                   |400.0    |

 

extract_key 적용 해 보았습니다.

converter를 적용도 해보고, 제외도 해보았습니다. 해결 하지 못하엿습니다.

"transforms": "extract_key",      
        "transforms.extract_key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extract_key.field": "customer_id"

 

CREATE STREAM oc_customer_price_stream (
  customer_id struct<customer_id bigint> KEY,
  email_address varchar,
  full_name varchar,
  price int
) WITH (
  KAFKA_TOPIC = 'customers_price.oc.customers_price',	
  KEY_FORMAT = 'AVRO',
  VALUE_FORMAT = 'AVRO'
);

// customers_price.oc.customers_price의 topic도 partition을 3 개로 마췄습니다.

create table oc_customer_price_table_mv03
with (
	KAFKA_TOPIC = 'customers_price_mv_03',
	KEY_FORMAT = 'JSON', 
	VALUE_FORMAT = 'JSON',
	PARTITIONS = 3
)
as
select customer_id, avg(price) as avg_price
from oc_customer_price_stream group by customer_id;


{
    "name": "mysql_jdbc_om_sink_customers_price_sink_07",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "customers_price_mv_03",
        "connection.url": "jdbc:mysql://192.168.0.38/oc_sink",
        "connection.user": "cnt_dev_sink",
        "connection.password": "1234",
        "table.name.format": "oc_sink.sink_customers_price",
        "insert.mode": "upsert",
        "pk.fields": "customer_id",
        "pk.mode": "record_key",
        "delete.enabled": "true",

        "auto.evolve": "false",        
        
		
	"transforms": "extract_key",        
        "transforms.extract_key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extract_key.field": "customer_id"
    }
}

 

ksqldb 사용시 Debezium Source Connector에서 특별히 설정을 해야 하는 것이 있나요?

아니면 위에 같은 error를 어떻게 해결 해야 하나요?

수동으로 키 스키마를 강의에서 제공해 주는 dgen_shoe 형태로 변경 하는 방법을 사용 해도 되는 건가요?

 

권 철민님의 프로필 이미지
권 철민
지식공유자

MVIEW Group by CTAS 에서 Group by 컬럼이 Struct type이면 KEY_FORMAT이 'KAFKA' 는 안됩니다. 'KAFKA'는 Group by 컬럼이 primitive type이 되어야 합니다.

두가지 방법이 있는데, KEY_FORMAT을 KAFKA로 하려면 아래와 같이 customer_id->customer_id 와 같이 변경을 해주면 됩니다.

create table customer_stream_mview_primitive

with (

KAFKA_TOPIC = 'customer_stream_mview01',

KEY_FORMAT = 'KAFKA',

VALUE_FORMAT = 'JSON',

PARTITIONS = 1

)

as

select customer_id->customer_id as customer_id, COUNT(*) as cnt

from oc_customer_stream group by customer_id->customer_id;

 

KEY_FORMAT을 JSON으로 하면 아래와 같이 수행할 수도 있습니다.

create table customer_stream_mview_struct

with (

KAFKA_TOPIC = 'customer_stream_mview02',

KEY_FORMAT = 'JSON',

VALUE_FORMAT = 'JSON',

PARTITIONS = 1

)

as

select customer_id, COUNT(*) as avg_point

from oc_customer_stream group by customer_id;

김동선님의 프로필 이미지
김동선
질문자

안녕하세요 개발자님 , 답변 주셔서 해결 할 수 있었습니다. 감사합니다.

김동선님의 프로필 이미지
김동선

작성한 질문수

질문하기