묻고 답해요
141만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결카프카 완벽 가이드 - ksqlDB
스트림, 테이블 생성시 데이터 관련 문의
스트림 혹은 테이블 생성 하는 시점부터 토픽의 데이터를 스트림, 테이블에 인입 시키는건가요? 혹은 생성시 토픽에 적재된 모든 데이터에 대해 인입 시키는건가요? 스트림, 테이블 생성 시점에 오프셋이나, earliest, latest 와 같은 옵션으로 데이터를 인입 시키도록 할 수 있을까요?
-
미해결카프카 완벽 가이드 - ksqlDB
푸시 쿼리 종료 방법에 대해 문의 드립니다.
푸시 쿼리의 경우 종료 시그널을 주지않으면 쿼리가 종료 되지 않는다고 하셨는데요.동일한 쿼리를 실행중인 인스턴스가 여러대 일 경우 api 를 사용하여 쿼리가 종료 되도록 하려면 각 인스턴스들에 각각 호출 해야 하는건가요?그리고 스트림을 드롭하는 형태로도 종료가 가능 할까요?
-
미해결카프카 완벽 가이드 - ksqlDB
table의 데이터가 실시간으로 topic에 담기지 않습니다
mysql에 debezium source connector로 topic에 가져온 데이터를 받는 stream을 만들고그 stream을 기반으로 하여CREATE TABLE timeout WITH (KAFKA_TOPIC='timeout' , KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=1) AS > SELECT > order_id -> order_id AS order_id, > TIMESTAMPADD(MILLISECONDS, 9 * 3600 * 1000, PARSE_TIMESTAMP(LATEST_BY_OFFSET(order_datetime), 'yyyy-MM-dd''T''HH:mm:ssX')) AS last_log_time > FROM orders > GROUP BY order_id -> order_id > HAVING ((UNIX_TIMESTAMP(CONVERT_TZ(FROM_UNIXTIME(UNIX_TIMESTAMP()), 'UTC', 'Asia/Seoul')) - UNIX_TIMESTAMP(TIMESTAMPADD(MILLISECONDS, 9 * 3600 * 1000, PARSE_TIMESTAMP(LATEST_BY_OFFSET(order_datetime), 'yyyy-MM-dd''T''HH:mm:ssX')))) / 1000 > 600) > EMIT CHANGES;이런식으로 id별로 마지막 로그 시간이 오고 10분 이상이 지나면 table에 담기도록 만들었습니다처음에 이미 10분이 지난 데이터를 넣으면 table에도 들어가고 topic에도 잘 들어가는데현재시간의 데이터를 넣고 10분이 지나면 table에는 들어가는데 topic에는 들어가지 않습니다table에도 담기고 topic에도 담기려면 어떻게 해야하나요? 아니면 원래 불가능한건가요?기반한 stream은 데이터를 넣으면 곧 바로 stream과 토픽에 잘 들어갑니다.|ORDER_ID |CALCULATED_TIME |LAST_LOG_TIME | +------------------------------------------+------------------------------------------+------------------------------------------+ |1 |78088 |2024-06-16T12:30:00.000 | |2 |69988 |2024-06-16T14:45:00.000 | |3 |72088 |2024-06-16T14:10:00.000 | |4 |32739088 |2023-06-04T12:00:00.000 | |5 |32637088 |2023-06-05T16:20:00.000 | |6 |32567788 |2023-06-06T11:35:00.000 | |7 |69058 |2024-06-16T15:00:30.000 | |8 |68698 |2024-06-16T15:06:30.000 | |9 |66958 |2024-06-16T15:35:30.000 | |10 |65698 |2024-06-16T15:56:30.000 | |11 |66298 |2024-06-16T15:46:30.000 | |12 |4258 |2024-06-17T09:00:30.000 | |13 |3418 |2024-06-17T09:14:30.000 | |14 |1918 |2024-06-17T09:39:30.000 | |15 |2429 |2024-06-17T09:30:59.000 | Query terminated ksql> print result7777; Key format: AVRO or KAFKA_STRING Value format: AVRO rowtime: 2024/06/16 04:23:23.878 Z, key: 1, value: {"CALCULATED_TIME": 12183, "LAST_LOG_TIME": 1718541000000}, partition: 0 rowtime: 2024/06/16 04:23:23.879 Z, key: 2, value: {"CALCULATED_TIME": 4083, "LAST_LOG_TIME": 1718549100000}, partition: 0 rowtime: 2024/06/16 05:10:08.498 Z, key: 3, value: {"CALCULATED_TIME": 6183, "LAST_LOG_TIME": 1718547000000}, partition: 0 rowtime: 2024/06/16 06:06:52.365 Z, key: 4, value: {"CALCULATED_TIME": 32673183, "LAST_LOG_TIME": 1685880000000}, partition: 0 rowtime: 2024/06/16 06:06:52.373 Z, key: 5, value: {"CALCULATED_TIME": 32571183, "LAST_LOG_TIME": 1685982000000}, partition: 0 rowtime: 2024/06/16 06:06:52.377 Z, key: 6, value: {"CALCULATED_TIME": 32501883, "LAST_LOG_TIME": 1686051300000}, partition: 0 rowtime: 2024/06/16 06:09:36.530 Z, key: 7, value: {"CALCULATED_TIME": 3153, "LAST_LOG_TIME": 1718550030000}, partition: 0 rowtime: 2024/06/16 06:15:08.351 Z, key: 8, value: {"CALCULATED_TIME": 2793, "LAST_LOG_TIME": 1718550390000}, partition: 0 rowtime: 2024/06/16 06:41:28.920 Z, key: 9, value: {"CALCULATED_TIME": 1053, "LAST_LOG_TIME": 1718552130000}, partition: 0 rowtime: 2024/06/17 00:23:09.442 Z, key: 12, value: {"CALCULATED_TIME": 1372, "LAST_LOG_TIME": 1718614830000}, partition: 01-9, 12 이미 10분이 지난 데이터 // 그 외 = 데이터가 mysql에 담기고 10분이 지나 table에 담긴 데이터
-
해결됨카프카 완벽 가이드 - ksqlDB
debezium에서 ksqldb로
제가 구상하고있는 구조가 mysql에서 debezium source connector가 topic에 넘기고 ksqldb의 streams나 table로 재구성하여 다른 topic으로 넘긴 후mysql sink database에서 받는다. 라는걸 구상중인데요 ksqldb에서 직접 insert를 하면 json 형식이 아니라서 sink connector가 읽지 못하는거 같습니다.-- debezium.json --{ "name": "debezium", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "localhost", "database.port": "3306", "database.user": "root", "database.password": "1234", "database.allowPublicKeyRetrieval": "true", "database.server.id": "10777", "database.server.name": "debe01", "database.include.list": "debe", "table.include.list": "debe.user", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "schema-changesde.mysql.oc", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "database.connectionTimeZone": "Asia/Seoul", "time.precision.mode": "connect", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false" } } 그래서 debezium으로 mysql의 data를 읽어서 topic으로 가져왔는데ksqldb에서 그 data를 읽는 부분에서 막혔습니다강의에 나온거처럼 ksqldb와 debezium을 연동을 해야 가능한건가요?아니면 어떤 방법이 있을까요?
-
해결됨카프카 완벽 가이드 - ksqlDB
CLI로 실행과 코드로 실행하면 결과가 다르게 나옵니다
CREATE STREAM add_stream WITH (KAFKA_TOPIC='column_stream_topic', VALUE_FORMAT='JSON') AS SELECT *, CAST(NULL AS INT) AS new_column FROM test_stream EMIT CHANGES;이렇게 기존 test_stream에서 column을 추가한 add_stream을 만들려고 CLI문을 실행시키면원래 test_stream에 담겨있는 data가 담아져서 나오는데package com.example.service; import io.confluent.ksql.api.client.Client; import io.confluent.ksql.api.client.ClientOptions; import io.confluent.ksql.api.client.ExecuteStatementResult; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.concurrent.ExecutionException; @Service public class streamPracticeAdd { @Value("${ksqldb.server.host}") private String ksqlDbHost; private int ksqlDbPort; private Client client; @PostConstruct public void init() { ClientOptions options = ClientOptions.create() .setHost(ksqlDbHost) .setPort(ksqlDbPort); client = Client.create(options); } public void streamsAdd(String columnName, String dataType) { String createStreamKsql = "CREATE STREAM add_stream WITH (KAFKA_TOPIC='column_stream_topic', VALUE_FORMAT='JSON') AS SELECT *, CAST(NULL AS " + dataType + ") AS " + columnName + " FROM test_stream EMIT CHANGES;"; try { ExecuteStatementResult result = client.executeStatement(createStreamKsql).get(); System.out.println("Stream created and data inserted into new topic: " + result.queryId()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }/kafka/addColumn/new_column/INT 인 API 요청을 줘서 새 stream을 만드는 코드인데실행시키면 기존 column에 새 column까지 추가는 되는데 기존 data가 하나도 들어오지 않습니다.검색을 해봤는데도 잘 안나와서 질문 남깁니다 감사합니다.
-
미해결카프카 완벽 가이드 - ksqlDB
ksqldb는 workbench처럼 ui는 없을까요?
ksql강의를 수강하면서..cli로만 명령을 수행하시는걸 보고 ksql ui가 존재하는지 찾아보았는데! 찾지못한건지 없는건지 확실하지않아 질문을 남깁니다.
-
미해결카프카 완벽 가이드 - ksqlDB
Pull 쿼리 제약에 대한 이유
안녕하세요,강의 내용 중 table에 대해서 source를 topic으로 하는 경우 pull 쿼리가 불가능 한 이유에 대해 강사님께서 구체적으로 확인이 되지 않았지만 rocksdb 이슈로 추정된다라고 말씀하신 것 같은데요, 혹시 아래의 이유가 아닐 지 확인 부탁드립니다. https://docs.ksqldb.io/en/latest/concepts/materialized-views/In ksqlDB, a table can be materialized into a view or not. If a table is created directly on top of a Kafka topic, it's not materialized. Non-materialized tables can't be queried, because they would be highly inefficient. On the other hand, if a table is derived from another collection, ksqlDB materializes its results, and you can make queries against it. document에 따르면 source가 topic인 table의 경우는 mview가 아니라고 하는 것 같습니다.(mview가 아니기 때문에 비효율적이다.) 그렇다면 이 경우는 rocksdb를 통해 stateful한 결과를 저장한 뒤 가져오는 것이 아닌 토픽으로부터 전체 레코드를 읽어와서 compact한 처리를 하는 케이스라고 볼 수 있을 것 같습니다. 즉, source가 topic인 케이스는 오히려 rocksdb를 사용하지 않는 케이스이기 때문에 그런 것이 아닐까 추측이 되는데요 어떻게 생각하시는지 궁금합니다.
-
해결됨카프카 완벽 가이드 - ksqlDB
IoT Event Streaming 적용에 대해서
안녕하세요. IoT Device들의 데이터를 받아서 값들이 조건을 만족하면 이벤트를 발생시키는 개발업무에 있습니다. 그래서 streaming 분석솔루션을 찾던도중 ksqlDB관련을 접하게되었습니다. 강의를 간단하게 훑어보고 ksqlDB 솔루션을 적용해보고싶은데 궁금한점이있어서 질문 드립니다. IoT Device의 개수가 많을 경우에 각각의 Device의 걸어주어야하는 조건이 다양해질수있습니다.만약에,A device의 조건은 해당 값이 10이 넘어가면 이벤트 발생 그리고 B device의 조건은 값이 15가 넘어가면 이벤트발생 이런식으로 구성될수있습니다.각 Device마다 고유한 조건이 있기때문에 많은 stream이 생성될수있습니다.또한 E Device에서 발생한 이벤트 와 J Device에서 발생한 이벤트를 조인해서 새로운 stream을 만들어주어야할때도있습니다. 디바이스가 많고 이벤트처리 흐름이 복잡해질수록 stream,table,Mview가 많이 생길수 있기때문에 성능에대한 우려가 있습니다.물론 H/W나 traffic, SQL문의 복잡도,네트워크구성등 성능에 미치는 요인들이 많기때문에 성능이 어떻다는 확실히 답변하기 어려울것같긴합니다. 결론은 데이터스트리밍 분석을 위해 많은 stream,table,Mview 작성해도 성능 이슈가 크게 발생하는지(몇백 혹은 몇천개단위). 만약 성능이 부족하다면 kafka를 scale out,up하면 나아질수있는지도 궁금합니다. 강의를 전체적으로 훑는식으로봐서 자세히 듣지못해 틀린부분이 있을수있지만, 부족한점 참고하여 봐주시면 감사하겠습니다.
-
미해결카프카 완벽 가이드 - ksqlDB
[수정요청] Join이해 중 select inner join a.user_id 수정 필요
안녕하세요.. 수강 하면서 하다 보니, 본의 아니게 수정 사항이 보입니다.KSQLDB/Join 이해 중에 select절에서 a.id 가 아닌 , a.user_id 로 수정 해야 될거 같습니다.
-
해결됨카프카 완벽 가이드 - ksqlDB
실무에서 카프카 환경 구축
안녕하세요,문득 실무에서 카프카 클러스터 환경 구축에 대해 궁금한 점이 생겼습니다. 강사님께서 진행하신 방법은 confluent의 카프카 프로젝트를 하나의 노드에 직접 다운받아서 주키퍼, 브로커 기동도 하고 ksql-cli 실행도 하신 것 같은데요,실무 환경에서는 클러스터 구축 노드가 주키퍼 클러스터를 이루는 노드들 및 브로커 등 여러 가지 일텐데요, 이런 경우는 어떻게 진행하는게 일반적일까요? 강의에서 사용한 동일한 플랫폼용 카프카 프로젝트를 ansible 등으로 설치 지원이 되는지 궁금합니다. 추가로 주키퍼나 브로커 노드들은 그 기능만 수행하고 ksql-cli나 schema-registry 혹은 connect 용으로 노드를 별도로 두고 사용하는 것이 일반적인지 궁금합니다.
-
미해결카프카 완벽 가이드 - ksqlDB
[수정요청] Mview CSAS 강좌중에 Insert문장 수정 요청
안녕하세요. 강좌를 수강 하다가, 강의 내용중 Mview CSAS 에서, customer_activeity_stream insert 문장이 강의 내용과 다르고, 칼럼이 달라서 에러 발생 합니다. activety_id 없습니다. 감사 합니다. INSERT INTO customer_activity_stream (customer_id, activity_id, activity_type, activity_point) VALUES (2, 10,'mobile_open',0.65); INSERT INTO customer_activity_stream (customer_id, activity_id, activity_type, activity_point) VALUES (4, 3, 'deposit', 0.35);
-
미해결카프카 완벽 가이드 - ksqlDB
inner join , outer join
안녕하세요, join query를 보면left join이라고 작성한 경우 outer join으로 적용되고,그냥 join은 inner join으로 적용되는 것 같은데 ksqldb의 기본 join은 무엇인가요?
-
미해결카프카 완벽 가이드 - ksqlDB
group by 리파티션에 대한 질문
안녕하세요, 만약 group by의 경우 일반 컬럼을 지정해서 리파티션이 되었다면 해당 source topic 레코드를 조회하면 그 때부터 key값에 group by로 지정한 일반 컬럼이 key로 나오게 되는건가요? 아니면 group by 만을 위한 리파티션 수행 내부 토픽이 별도로 생성되는거고 source 토픽은 그냥 유지되는건가요?
-
미해결카프카 완벽 가이드 - ksqlDB
ksqldb timestamp 타입 질문
안녕하세요, ksqldb에는 RDBMS와 달리 datetime 타입이 따로 없는 것 같은데요, timestamp 타입은 타임존이 반영된 타입인건가요?
-
미해결카프카 완벽 가이드 - ksqlDB
stream format 관련 질문
안녕하세요, stream의 key.format, value.format 관련 질문있습니다.이 두 옵션이 직렬화에 대한 옵션이라고 하셨는데, select 시에는 consumer가 동작하고 insert에는 producer가 동작한다고 하셨던 것과 조금 혼동이 되어서 질문드립니다. insert 시에 stream옵션으로 직렬화가 되면 producer의 직렬화 옵션은 어떻게 되는건가요?그리고 select시 에는 topic 메시지를 consumer가 역직렬화해서 가져온 것을 다시 직렬화 하는건가요? stream의 직렬화 시점에 대해 조금 이해가 되지 않습니다. 감사합니다.
-
미해결카프카 완벽 가이드 - ksqlDB
전통적 분석 시스템 한계에 대해 질문있습니다.
안녕하세요, 실시간 분석 시스템 아키텍처에 ksqlDB 사용 명분(?)을 좀 더 확실히 하고 싶어서 질문드립니다. 제가 이해한 것은 전통적 분석 시스템은 운영 DB 부하로 분석 시스템을 직접 붙일 수 없고, DW/Batch을 분석용으로 따로 두는 것으로 이해했습니다.운영계에서 DW로 데이터를 전송하는 주기가 하루 주기인 것도 마찬가지로 운영계 I/O 부하 문제인걸까요? 또한 실시간 분석 시스템의 경우 CDC를 통해 일단위 데이터 전송에서 실시간으로 전송이 가능한 것으로 보이는데 이것은 redo dump file 전송은 DB에 직접적인 부하를 주지 않기 때문에 가능한 것인가요? 마지막으로 CDC 기반으로 실시간 데이터 전송을 했을 때 타겟 DBMS가 좋은 퍼포먼스를 가져야 함은 실시간 데이터에 대한 부하를 견딜 수 있어야 하기 때문인건가요? 질문이 많네요.. 늘 좋은 강의 감사드립니다.
-
미해결카프카 완벽 가이드 - ksqlDB
AWS 에서 confluent kafka 와 apache kafka 차이가 궁금 합니다.
안녕하세요. 저는 1년차 개발자 입니다.현재는 회사를 그만 두고 공부를 마치고 취업 준비를 하고 있습니다. 권철민 개발자님의 강의를 접하고 너무 좋아서,kafka 3개의 모두 수강하고 마지막 강의를 듣고 있습니다.지식을 모두에거 공유한다는 것이 너무 존경스럽고 개발자가 가져야하는 자세와 덕목이라 생각하고 배우게 되었습니다. 감사합니다. 실무에서 실제로 사용 하고 싶어서 집에서 AWS에 환경을 구축 해보고 싶습니다. 하지만 비용이 청구 된다는 것에 두려움을 가지고 있습니다. AWS는 실제로 한번도 없지만 클라우드 환경에 대한 선수 지식과 AWS사용 법등은 숙지하고 있지만, 비용적인 측면에서 기술을 사용하는데 있어 비용이 어떤 정책으로 측정 되는지 알고 있는게 없어 너무 걱정입니다. confluent kafka 와 apache kafka 사용법에 차이는 별로 없다고 하셧지만, 그래도 배운 confluent kafka로 aws를 구축 하고 싶은 마음이지만 몇가지 궁금 한게 있습니다. 현재 현업에서는 confluent kafka 와 apache kafka 둘 중 어떤것이 많이 사용되며, 선호 되는 것이 어떤건가요?현업에서는 실제로 구축한다면, 강의에서와 같이 Ubuntu AWS에서 만들어서,kafka를 설치하여 사용 하나요? 아니면 제공 되어지는 Saas를 통해 사용하나요? 보통 어떤것을 사용하나요? 아니면 다른 방법이 있나요? 실무에서 사용 하려면 kafka가 무료가 아니라고 알고 있습니다. confluent kafka 와 apache kafka 둘다 무료가 아닌건가요? AWS사용 비용외 별도 비용을 지불 하고 사용 해야 하는 건가요?, 아니면 Saas를 사용하면 AWS비용을 포함하여 비용이 청구 되나요? confluent kafka 와 apache kafka 둘다 똑같은 환경을 구축 한다면, 비용적 측면에서 어떤것이 저렴 한가요? 아니면 걱정 할 정도의 비용 차이가 나지 않는 건가요? 실제 현업에서 어느 정도 규모에서 사용 되며, 작은 규모, 중간 규모, 큰 규묘 에서 대략 비용이 대략 한달 얼마 정도 청구 되나요? 급하게 두서 없이 작성한거 같네요. 긴 글 읽어 주셔서 감사합니다.
-
미해결카프카 완벽 가이드 - ksqlDB
ksqlDB Cluster 여부 - 박성범님 질문(제가 대신해서 적습니다)
안녕하십니까, 박성범님이 수강평과 함께 질문을 올려 주셔서 제가 질문과 답변을 함께 적겠습니다. 먼저 질문 내용은 ksqlDB Cluster 구성 가능에 대한 질문입니다.
-
미해결카프카 완벽 가이드 - ksqlDB
Group by push쿼리 사용중 오류 관련해서 질문드립니다.
안녕하세요 강사님 질문드립니다. group by 를 사용하여 조회할때한번씩 group by 가 되지않고 조회되는경우가 있는데이경우에는 리눅스 서버가 문제인건가요?쿼리 날리면 결과가 조금 늦게 뜨긴합니다.
-
미해결카프카 완벽 가이드 - ksqlDB
ksqlDB 2부 강의의 PDF 문서는 어디서 다운 받을 수 있을까요
ksqlDB 2부 강의의 PDF 문서는 어디서 다운 받을 수 있을까요