해결된 질문
작성
·
476
1
안녕하세요 강사님. 커넥터, 스키마 레지스트리 관련 질문드립니다!
현재 json형식의 데이터를 s3 sink connector를 통하여 parquet 형식으로 저장하려고 합니다.
json 형식의 데이터는 키 값들이 일정하진 않습니다. 예를 들어 어떤 데이터는 { "test1":"test", "test2":"test2"} 이런식이고, 어떤 데이터는 {"test1":"test1"} 이런 식입니다. 이런 경우에도 스키마 레지스트리를 활용하여 적재가 가능할까요? 없는 키 값들에 대해선 default로 null값을 스키마에 명시하면 자동으로 null처리가 되어 들어오는지 궁금합니다.
그리고 아래처럼 커넥터에 설정을 주면 자동으로 json형식의 데이터가 들어올때 커넥터에서 스키마레지스트리를 바라보고 스키마를 읽어서 parquet로 적재가 되는건지 문의드립니다.
value.converter.schema.registry.url=localhost:8080
value.converter=io.confluent.connect.avro.AvroConverter
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
답변 1
1
안녕하십니까,
제가 s3 sink connector는 연동해 보지는 않았지만, 적어주신 방법대로 JSON 메시지 토픽만으로는 스키마 레지스트리와 연동이 되지는 않을 것 같습니다.
그 이유로는
먼저 스키마 레지스트리는 Topic 메시지가 호환되는 타입이 정해져 있습니다. Avro, Protobuf, Json schema 입니다. 그러니까 메시지 타입에 대한 정보와 메시지 값 자체를 가지는 형태의 메시지들을 스키마 레지스트리에서 지원합니다.
만약에 Json 값만 있는 형태로만 스키마 레지스트리에 등록될 수가 없습니다.
json 메시지를 Avro 메시지로 변경해서 Topic으로 저장할 수 있으면 좋지만 그게 아니고 json 메시지만 있는 형태라면 schema registry를 이용할 수는 없습니다.
메뉴얼을 살펴 보았을 때는 Topic 메시지가 Avro, Protobuf, Json schema 형태일 때는 s3 sink connector로 parquet 파일 저장이 가능해 보입니다.
Topic 메시지가 Avro, Protobuf, Json schema 형태일 때도 주의해야 할 점이 있습니다.
Kafka Connect 기반에서 Schema Registry를 이용하려면 일반적으로 source connector에서 Schema Registry를 활용해서 topic에 메시지를 저장하고, 다시 sink connector에서 Schema Registry로 topic에서 메시지를 가져가는 방식을 사용합니다. 즉 source와 sink connector가 함께 동작하는 방식을 적용합니다.
왜냐면 source connector에서 topic으로 avro 메시지를 보낼 때 오리지널 avro 메시지를 보내지 않습니다. 오리지널 avro 메시지는 레코드 별로 schema에 대한 정보를 가지고 있으므로 데이터가 매우 커지게 됩니다. 때문에 source connector에서는 key converter, value conveter에서 avro converter를 적용하되 오리지널 avro 형식이 아닌 schema registry와 인터페이스 할 수 있는 avro를 생성합니다. 그리고 sink connector에서는 topic의 메시지를 해석할 때 schema registry에서 정보를 가져와서 avro 메시지를 해석을 하게 됩니다.
그런데 지금 토픽 메시지만 가지고 있는 상황에서 이를 적용하려면 이 schema 정보를 schema registry에 topic의 이름에 기반하여 REST API로 만들어 준 다음에 이를 기반으로 Sink connector에서 이 정보를 가져와서 Topic 메시지와 결합하는 방식을 해줘야 합니다.
요약드리자면 현재 json value 값만 가지고는 s3 sink connector에서 schema registry 활용이 낭되며, schema 정보를 함께 가지는 avro, proto-buf, json schema 형태의 topic 메시지로 변환하여 저장할 경우라도 schema registry에 topic명에 기반한 subject를 생성해 줘야 s3 sink connector에 schema registry로 schema를 해석 한 뒤에 parquet 포맷으로 변환이 가능할 것으로 보입니다.
감사합니다.
답변 너무 감사드립니다. 큰 도움이 되었습니다! avro로 바꿔서 적용해보도록 하겠습니다