인프런 영문 브랜드 로고
인프런 영문 브랜드 로고

인프런 커뮤니티 질문&답변

행복한 족제비님의 프로필 이미지
행복한 족제비

작성한 질문수

카프카 완벽 가이드 - ksqlDB

table의 데이터가 실시간으로 topic에 담기지 않습니다

작성

·

126

0

mysql에 debezium source connector로 topic에 가져온 데이터를 받는 stream을 만들고

그 stream을 기반으로 하여

CREATE TABLE timeout WITH (KAFKA_TOPIC='timeout' , KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=1) AS
> SELECT
> order_id -> order_id AS  order_id,
> TIMESTAMPADD(MILLISECONDS, 9 * 3600 * 1000, PARSE_TIMESTAMP(LATEST_BY_OFFSET(order_datetime), 'yyyy-MM-dd''T''HH:mm:ssX')) AS last_log_time
> FROM orders
> GROUP BY order_id -> order_id
> HAVING ((UNIX_TIMESTAMP(CONVERT_TZ(FROM_UNIXTIME(UNIX_TIMESTAMP()), 'UTC', 'Asia/Seoul')) - UNIX_TIMESTAMP(TIMESTAMPADD(MILLISECONDS, 9 * 3600 * 1000, PARSE_TIMESTAMP(LATEST_BY_OFFSET(order_datetime), 'yyyy-MM-dd''T''HH:mm:ssX')))) / 1000 > 600)
> EMIT CHANGES;

이런식으로 id별로 마지막 로그 시간이 오고 10분 이상이 지나면 table에 담기도록 만들었습니다

처음에 이미 10분이 지난 데이터를 넣으면 table에도 들어가고 topic에도 잘 들어가는데

현재시간의 데이터를 넣고 10분이 지나면 table에는 들어가는데 topic에는 들어가지 않습니다

table에도 담기고 topic에도 담기려면 어떻게 해야하나요? 아니면 원래 불가능한건가요?
기반한 stream은 데이터를 넣으면 곧 바로 stream과 토픽에 잘 들어갑니다.

|ORDER_ID                                  |CALCULATED_TIME                           |LAST_LOG_TIME                             |
+------------------------------------------+------------------------------------------+------------------------------------------+
|1                                         |78088                                     |2024-06-16T12:30:00.000                   |
|2                                         |69988                                     |2024-06-16T14:45:00.000                   |
|3                                         |72088                                     |2024-06-16T14:10:00.000                   |
|4                                         |32739088                                  |2023-06-04T12:00:00.000                   |
|5                                         |32637088                                  |2023-06-05T16:20:00.000                   |
|6                                         |32567788                                  |2023-06-06T11:35:00.000                   |
|7                                         |69058                                     |2024-06-16T15:00:30.000                   |
|8                                         |68698                                     |2024-06-16T15:06:30.000                   |
|9                                         |66958                                     |2024-06-16T15:35:30.000                   |
|10                                        |65698                                     |2024-06-16T15:56:30.000                   |
|11                                        |66298                                     |2024-06-16T15:46:30.000                   |
|12                                        |4258                                      |2024-06-17T09:00:30.000                   |
|13                                        |3418                                      |2024-06-17T09:14:30.000                   |
|14                                        |1918                                      |2024-06-17T09:39:30.000                   |
|15                                        |2429                                      |2024-06-17T09:30:59.000                   |
Query terminated
ksql> print result7777;
Key format: AVRO or KAFKA_STRING
Value format: AVRO
rowtime: 2024/06/16 04:23:23.878 Z, key: 1, value: {"CALCULATED_TIME": 12183, "LAST_LOG_TIME": 1718541000000}, partition: 0
rowtime: 2024/06/16 04:23:23.879 Z, key: 2, value: {"CALCULATED_TIME": 4083, "LAST_LOG_TIME": 1718549100000}, partition: 0
rowtime: 2024/06/16 05:10:08.498 Z, key: 3, value: {"CALCULATED_TIME": 6183, "LAST_LOG_TIME": 1718547000000}, partition: 0
rowtime: 2024/06/16 06:06:52.365 Z, key: 4, value: {"CALCULATED_TIME": 32673183, "LAST_LOG_TIME": 1685880000000}, partition: 0
rowtime: 2024/06/16 06:06:52.373 Z, key: 5, value: {"CALCULATED_TIME": 32571183, "LAST_LOG_TIME": 1685982000000}, partition: 0
rowtime: 2024/06/16 06:06:52.377 Z, key: 6, value: {"CALCULATED_TIME": 32501883, "LAST_LOG_TIME": 1686051300000}, partition: 0
rowtime: 2024/06/16 06:09:36.530 Z, key: 7, value: {"CALCULATED_TIME": 3153, "LAST_LOG_TIME": 1718550030000}, partition: 0
rowtime: 2024/06/16 06:15:08.351 Z, key: 8, value: {"CALCULATED_TIME": 2793, "LAST_LOG_TIME": 1718550390000}, partition: 0
rowtime: 2024/06/16 06:41:28.920 Z, key: 9, value: {"CALCULATED_TIME": 1053, "LAST_LOG_TIME": 1718552130000}, partition: 0
rowtime: 2024/06/17 00:23:09.442 Z, key: 12, value: {"CALCULATED_TIME": 1372, "LAST_LOG_TIME": 1718614830000}, partition: 0

1-9, 12 이미 10분이 지난 데이터 // 그 외 = 데이터가 mysql에 담기고 10분이 지나 table에 담긴 데이터

답변 1

0

권 철민님의 프로필 이미지
권 철민
지식공유자

안녕하십니까,

먼저 질문 내용을 명확히 하고 싶습니다.

  1. mysql에 debezium source connector로 topic에 가져온 데이터를 받는 stream을 만들고

그 stream을 기반으로 하여

=> 그 stream 명이 orders 인가요? 일단 orders 라고 가정하고

  1. 처음에 이미 10분이 지난 데이터를 넣으면 table에도 들어가고 topic에도 잘 들어가는데

    현재시간의 데이터를 넣고 10분이 지나면 table에는 들어가는데 topic에는 들어가지 않습니다

=> CREATE TABLE timeout WITH (KAFKA_TOPIC='timeout' ,....)

로 Table명 timeout을 만드셨는데, KAFKA_TOPIC 명도 timeout 으로 만드셨습니다. 이 topic에 데이터가 안들어 간다는 건가요? 헷갈릴 수 있으니까 일단 kafka_topic명을 다른걸로 변경해 보시고 이 topic에 데이터가 들어가는지 확인해 보십시요.

그런데 위와 같이 하면 해당 topic에 데이터가 들어가야 합니다. table에 데이터가 만들어지면서 동시에 kafka_topic에 write가 되어야 보여지게 됩니다. 다시 한번 확인 부탁드립니다.

그리고 두번째 캡처 이미지에 print result7777 이라고 되어 있는데, 이건 어떤 topic인건지요?

위 내용 다시 확인 부탁드립니다.

 

감사합니다.

 

죄송합니다 급하게 글을 쓰느라 정확하지 못하였습니다.
다시 정리하자면 source connector로 받아서 넣은 원본 stream은 raw_orders입니다.

select * from raw_orders;
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|ORDER_ID            |ORDER_DATETIME      |CUSTOMER_ID         |ORDER_STATUS        |PRICE               |STORE_ID            |
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|{ORDER_ID=1}        |2024-06-17T00:00:00Z|1001                |Completed           |59.99               |1                   |
|{ORDER_ID=2}        |2024-06-17T01:15:00Z|1002                |Pending             |29.99               |2                   |
|{ORDER_ID=3}        |2024-06-17T02:30:00Z|1003                |Cancelled           |19.99               |3                   |
|{ORDER_ID=4}        |2024-06-17T06:30:00Z|1004                |Cancelled           |190.99              |3                   |
|{ORDER_ID=5}        |2024-06-16T06:30:00Z|1004                |Cancelled           |190.99              |3                   |
|{ORDER_ID=6}        |2024-06-16T06:30:00Z|1006                |yes                 |1190.99             |6                   |
|{ORDER_ID=7}        |2024-06-18T00:20:30Z|1007                |yes                 |1190.99             |7                   |

총 7개의 데이터를 넣었습니다.

이 stream을 기반으로 하여 10분이 지나면 table로 가져오게끔 만들었습니다

CREATE TABLE timeout WITH (KAFKA_TOPIC='timeout' , KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=1) AS
> SELECT
> order_id -> order_id AS  order_id,
> TIMESTAMPADD(MILLISECONDS, 9 * 3600 * 1000, PARSE_TIMESTAMP(LATEST_BY_OFFSET(order_datetime), 'yyyy-MM-dd''T''HH:mm:ssX')) AS last_log_time
> FROM raw_orders
> GROUP BY order_id -> order_id
> HAVING ((UNIX_TIMESTAMP(CONVERT_TZ(FROM_UNIXTIME(UNIX_TIMESTAMP()), 'UTC', 'Asia/Seoul')) - UNIX_TIMESTAMP(TIMESTAMPADD(MILLISECONDS, 9 * 3600 * 1000, PARSE_TIMESTAMP(LATEST_BY_OFFSET(order_datetime), 'yyyy-MM-dd''T''HH:mm:ssX')))) / 1000 > 600)
> EMIT CHANGES;

1-6번까지의 데이터는 이미 설정해둔 시간이 지나서 table에는 담겨있습니다.

 select * from timeout;
+----------------------------------------------------------------+----------------------------------------------------------------+
|ORDER_ID                                                        |LAST_LOG_TIME                                                   |
+----------------------------------------------------------------+----------------------------------------------------------------+
|1                                                               |2024-06-17T09:00:00.000                                         |
|2                                                               |2024-06-17T10:15:00.000                                         |
|3                                                               |2024-06-17T11:30:00.000                                         |
|4                                                               |2024-06-17T15:30:00.000                                         |
|5                                                               |2024-06-16T15:30:00.000                                         |
|6                                                               |2024-06-16T15:30:00.000                                         |

하지만 7번 데이터가 들어오고 10분이 지나서 table에는 7번 데이터가 담기는데
topic에는 담기지 않습니다.

select * from timeout;
+----------------------------------------------------------------+----------------------------------------------------------------+
|ORDER_ID                                                        |LAST_LOG_TIME                                                   |
+----------------------------------------------------------------+----------------------------------------------------------------+
|1                                                               |2024-06-17T09:00:00.000                                         |
|2                                                               |2024-06-17T10:15:00.000                                         |
|3                                                               |2024-06-17T11:30:00.000                                         |
|4                                                               |2024-06-17T15:30:00.000                                         |
|5                                                               |2024-06-16T15:30:00.000                                         |
|6                                                               |2024-06-16T15:30:00.000                                         |
|7                                                               |2024-06-18T09:20:30.000                                         |
Query terminated
ksql> print timeout;
Key format: AVRO or KAFKA_STRING
Value format: AVRO
rowtime: 2024/06/17 01:51:02.907 Z, key: 1, value: {"LAST_LOG_TIME": 1718614800000}, partition: 0
rowtime: 2024/06/17 01:51:02.922 Z, key: 2, value: {"LAST_LOG_TIME": 1718619300000}, partition: 0
rowtime: 2024/06/17 02:19:38.295 Z, key: 5, value: {"LAST_LOG_TIME": 1718551800000}, partition: 0
rowtime: 2024/06/18 00:24:06.884 Z, key: 6, value: {"LAST_LOG_TIME": 1718551800000}, partition: 0
^CTopic printing ceased

참고로 3,4번도 같은 방식으로 진행했지만 담기지 않았습니다.

어떻게하면 좋을까요?

권 철민님의 프로필 이미지
권 철민
지식공유자

원본 stream 명은 raw_orders인데, 아래 CTAS를 보면 SELECT를 orders에서 하는데, orders는 어떤 거인지요?

Create table timeout

as

select ...

from orders

죄송합니다 오타가 있었습니다 raw_orders에서 받아온게 맞습니다

질문글에 오타입니다. 진행은 raw_orders로 했습니다. 시간이 지나고 table에는 담기지만
topic에는 담기지않습니다..

권 철민님의 프로필 이미지
권 철민
지식공유자

음, 이건 원인을 정확히 모르겠군요.

먼저 아래와 같이

  1. 토픽명을 바꿔 보셨나요? 헷갈릴수 있으니 토픽명을 일단 변경해 보시지요. 그리고

  1. CTAS에서 Having절을 삭제해 보시고, 다시 한번 테스트 해보시지요.

  2. 1번이 마찬가지면 일단 consumer로 데이터를 확인해 보시지요.

     

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 토픽명 --from-beginning

행복한 족제비님의 프로필 이미지
행복한 족제비

작성한 질문수

질문하기