인프런 커뮤니티 질문&답변

행복한 족제비님의 프로필 이미지
행복한 족제비

작성한 질문수

카프카 완벽 가이드 - ksqlDB

CLI로 실행과 코드로 실행하면 결과가 다르게 나옵니다

해결된 질문

작성

·

70

1

CREATE STREAM add_stream WITH (KAFKA_TOPIC='column_stream_topic', VALUE_FORMAT='JSON') AS SELECT *, CAST(NULL AS INT) AS new_column FROM test_stream EMIT CHANGES;

이렇게 기존 test_stream에서 column을 추가한 add_stream을 만들려고 CLI문을 실행시키면

원래 test_stream에 담겨있는 data가 담아져서 나오는데

package com.example.service;

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.ExecuteStatementResult;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutionException;

@Service
public class streamPracticeAdd {

    @Value("${ksqldb.server.host}")
    private String ksqlDbHost;

    private int ksqlDbPort;

    private Client client;

    @PostConstruct
    public void init() {
        ClientOptions options = ClientOptions.create()
                .setHost(ksqlDbHost)
                .setPort(ksqlDbPort);
        client = Client.create(options);
    }

    public void streamsAdd(String columnName, String dataType) {

        String createStreamKsql = "CREATE STREAM add_stream WITH (KAFKA_TOPIC='column_stream_topic', VALUE_FORMAT='JSON') AS SELECT *, CAST(NULL AS " + dataType + ") AS " + columnName + " FROM test_stream EMIT CHANGES;";
        try {
            ExecuteStatementResult result = client.executeStatement(createStreamKsql).get();
            System.out.println("Stream created and data inserted into new topic: " + result.queryId());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

/kafka/addColumn/new_column/INT 인 API 요청을 줘서 새 stream을 만드는 코드인데
실행시키면 기존 column에 새 column까지 추가는 되는데 기존 data가 하나도 들어오지 않습니다.

검색을 해봤는데도 잘 안나와서 질문 남깁니다 감사합니다.

답변 1

0

권 철민님의 프로필 이미지
권 철민
지식공유자

안녕하십니까,

저도 Java API 인터페이스는 잘 모르지만,

streamsAdd() 메소드를 수행하면 add_stream Stream은 만들어지는데, test_stream에 데이타를 입력하면 add_stream에서 데이터가 출력되지 않는다는 건가요? 만약 그렇다면,

  1. streamsAdd()메소드에서 stream 이름을 add_stream이 아니라 add_stream_new로 해서 새롭게 만들어 보시고, 함 테스트 해보시지요. 그리고 CLI에서 add_stream_new select 해서 데이터가 나오는지 확인해 보십시요.

안되면 다시 글 부탁드립니다.

감사합니다.

다시 실행하였더니 수행됐습니다 감사합니다

행복한 족제비님의 프로필 이미지
행복한 족제비

작성한 질문수

질문하기