해결된 질문
작성
·
81
1
CREATE STREAM add_stream WITH (KAFKA_TOPIC='column_stream_topic', VALUE_FORMAT='JSON') AS SELECT *, CAST(NULL AS INT) AS new_column FROM test_stream EMIT CHANGES;
이렇게 기존 test_stream에서 column을 추가한 add_stream을 만들려고 CLI문을 실행시키면
원래 test_stream에 담겨있는 data가 담아져서 나오는데
package com.example.service;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.ExecuteStatementResult;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutionException;
@Service
public class streamPracticeAdd {
@Value("${ksqldb.server.host}")
private String ksqlDbHost;
private int ksqlDbPort;
private Client client;
@PostConstruct
public void init() {
ClientOptions options = ClientOptions.create()
.setHost(ksqlDbHost)
.setPort(ksqlDbPort);
client = Client.create(options);
}
public void streamsAdd(String columnName, String dataType) {
String createStreamKsql = "CREATE STREAM add_stream WITH (KAFKA_TOPIC='column_stream_topic', VALUE_FORMAT='JSON') AS SELECT *, CAST(NULL AS " + dataType + ") AS " + columnName + " FROM test_stream EMIT CHANGES;";
try {
ExecuteStatementResult result = client.executeStatement(createStreamKsql).get();
System.out.println("Stream created and data inserted into new topic: " + result.queryId());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
/kafka/addColumn/new_column/INT 인 API 요청을 줘서 새 stream을 만드는 코드인데
실행시키면 기존 column에 새 column까지 추가는 되는데 기존 data가 하나도 들어오지 않습니다.
검색을 해봤는데도 잘 안나와서 질문 남깁니다 감사합니다.
답변 1
0
안녕하십니까,
저도 Java API 인터페이스는 잘 모르지만,
streamsAdd() 메소드를 수행하면 add_stream Stream은 만들어지는데, test_stream에 데이타를 입력하면 add_stream에서 데이터가 출력되지 않는다는 건가요? 만약 그렇다면,
streamsAdd()메소드에서 stream 이름을 add_stream이 아니라 add_stream_new로 해서 새롭게 만들어 보시고, 함 테스트 해보시지요. 그리고 CLI에서 add_stream_new select 해서 데이터가 나오는지 확인해 보십시요.
안되면 다시 글 부탁드립니다.
감사합니다.
다시 실행하였더니 수행됐습니다 감사합니다