Posted 2023. 11. 19. 09:03
I'm working through the "Extracting Kafka data in Streaming" part, and I get an error when running spark_kafka.py.
root@81599cbd6b8f:/opt/bitnami/spark/work# spark-submit --master spark://spark:7077 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 spark_kafka.py
.....
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 11 | 11 | 11 | 0 || 11 | 11 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-8f3a0b4c-b23d-4dfa-b9b0-8649735433fc
confs: [default]
11 artifacts copied, 0 already retrieved (56445kB/64ms)
23/11/18 23:57:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/18 23:57:24 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/11/18 23:57:24 WARN OffsetSeqMetadata: Updating the value of conf 'spark.sql.shuffle.partitions' in current session from '3' to '200'.
23/11/18 23:57:24 ERROR MicroBatchExecution: Query [id = 40288f62-daae-4e69-80db-ff6f83156268, runId = 535853f9-9153-44be-8eca-19f75ee8b4ea] terminated with error
java.lang.IllegalArgumentException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got {"logOffset":2}
at org.apache.spark.sql.kafka010.JsonUtils$.partitionOffsets(JsonUtils.scala:75)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.deserializeOffset(KafkaMicroBatchStream.scala:216)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$getStartOffset$1(MicroBatchExecution.scala:454)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.getStartOffset(MicroBatchExecution.scala:454)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:489)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:488)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:477)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:802)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:473)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:266)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Traceback (most recent call last):
File "/opt/bitnami/spark/work/spark_kafka.py", line 38, in <module>
query.awaitTermination()
File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line 201, in awaitTermination
File "/opt/bitnami/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 175, in deco
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 40288f62-daae-4e69-80db-ff6f83156268, runId = 535853f9-9153-44be-8eca-19f75ee8b4ea] terminated with exception: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got {"logOffset":2}
9 answers
2023. 11. 19. 09:20
Hello Oh Suhyeon,
This looks like a configuration error to me. To unpack the message: the data format stored in the Kafka offset doesn't match what Spark Streaming expected. If you're just practicing, deleting the Kafka data, inserting fresh data, and starting over should resolve the problem.
If this were production data, you'd have to open the records one by one and figure out exactly how they went wrong, but that doesn't seem to be the case here, so re-inserting the data and starting over looks like the better fix.
After seeing the review you left last time, I've reinforced the streaming material, so I hope it makes for good studying, and a good review would be appreciated.
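For reference, here is a minimal sketch of the kind of streaming query spark_kafka.py runs, using the topic and address that appear in this thread (quickstart, kafka:9092); the checkpoint path and the column selection are assumptions, not the exact course code.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("spark_kafka").getOrCreate()

# Read the re-inserted data from the beginning of the topic.
events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "quickstart")
    .option("startingOffsets", "earliest")
    .load()
)

query = (
    events.select(col("key").cast("string"), col("value").cast("string"))
    .writeStream.format("console")
    # Assumed path: point this at a fresh, empty directory when starting over,
    # so no offsets from a previous run are replayed.
    .option("checkpointLocation", "/tmp/spark_kafka_checkpoint")
    .outputMode("append")
    .start()
)
query.awaitTermination()

If your script sets a checkpointLocation of its own, clearing or moving that old directory along with the topic data is part of "starting over".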
Just in case, I'm attaching the list of Docker containers as well!
% docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
64ab0ab44e45 bitnami/spark:3.4 "/opt/bitnami/script…" 12 hours ago Up 12 hours dockertest-spark-worker-2
a105f410fb7e bitnami/spark:3.4 "/opt/bitnami/script…" 13 hours ago Up 12 hours 0.0.0.0:4040->4040/tcp, 0.0.0.0:8080->8080/tcp, 0.0.0.0:18080->18080/tcp dockertest-spark-1
8bd9e59d9d2d bitnami/cassandra:4.0.11 "/opt/bitnami/script…" 13 hours ago Up 12 hours 7000/tcp, 0.0.0.0:9042->9042/tcp dockertest-cassandra-1
26427ab96f20 bitnami/kafka:3.4 "/opt/bitnami/script…" 13 hours ago Up 11 hours 0.0.0.0:9092->9092/tcp, 0.0.0.0:9094->9094/tcp dockertest-kafka-1
c2c01171c8e6 jupyter/pyspark-notebook "tini -g -- start-no…" 13 hours ago Up 12 hours (healthy) 4040/tcp, 0.0.0.0:8888->8888/tcp dockertest-pyspark-1
bb193b55b622 bitnami/spark:3.4 "/opt/bitnami/script…" 13 hours ago Up 12 hours dockertest-spark-worker-1
After some Googling I also tried lowering the versions and running it again, but the result is the same.
Also, an app folder keeps getting created every time I run it.
I seem to be getting the same error. I'm on an M1 MacBook and followed the course material without any changes, but it still happens. The same error also occurs on a Windows laptop...
First, here is the topic part from the Kafka container:
I have no name!@26427ab96f20:/opt/bitnami/kafka/bin$ ./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic quickstart --producer.config /opt/bitnami/kafka/config/producer.properties
>123 1234
>asdd fdsfsf
Here is the check of port 9092 from the Spark container:
root@a105f410fb7e:/opt/bitnami/spark/work# nc -vz kafka 9092
Connection to kafka (172.19.0.2) 9092 port [tcp/*] succeeded!
And here is the error when running it from the Spark container:
root@a105f410fb7e:/opt/bitnami/spark/work# spark-submit --master spark://spark:7077 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 spark_kafka.py
:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b98230a9-4cd2-4a18-8308-db54fe90bc3e;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1 in central
found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
found org.apache.kafka#kafka-clients;3.3.2 in central
found org.lz4#lz4-java;1.8.0 in central
found org.xerial.snappy#snappy-java;1.1.10.1 in central
found org.slf4j#slf4j-api;2.0.6 in central
found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
found org.apache.hadoop#hadoop-client-api;3.3.4 in central
found commons-logging#commons-logging;1.1.3 in central
found com.google.code.findbugs#jsr305;3.0.0 in central
found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 244ms :: artifacts dl 6ms
:: modules in use:
com.google.code.findbugs#jsr305;3.0.0 from central in [default]
commons-logging#commons-logging;1.1.3 from central in [default]
org.apache.commons#commons-pool2;2.11.1 from central in [default]
org.apache.hadoop#hadoop-client-api;3.3.4 from central in [default]
org.apache.hadoop#hadoop-client-runtime;3.3.4 from central in [default]
org.apache.kafka#kafka-clients;3.3.2 from central in [default]
org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1 from central in [default]
org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 from central in [default]
org.lz4#lz4-java;1.8.0 from central in [default]
org.slf4j#slf4j-api;2.0.6 from central in [default]
org.xerial.snappy#snappy-java;1.1.10.1 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 11 | 0 | 0 | 0 || 11 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-b98230a9-4cd2-4a18-8308-db54fe90bc3e
confs: [default]
0 artifacts copied, 11 already retrieved (0kB/5ms)
24/06/04 23:46:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
24/06/04 23:46:39 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/06/04 23:46:39 WARN OffsetSeqMetadata: Updating the value of conf 'spark.sql.shuffle.partitions' in current session from '3' to '200'.
24/06/04 23:46:40 ERROR MicroBatchExecution: Query [id = 4e479a54-3766-412f-ba6c-c5879ecc0e00, runId = a0b6261a-59a5-4f3b-a214-db6dad504d90] terminated with error
java.lang.IllegalArgumentException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got {"logOffset":0}
at org.apache.spark.sql.kafka010.JsonUtils$.partitionOffsets(JsonUtils.scala:75)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.deserializeOffset(KafkaMicroBatchStream.scala:216)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$getStartOffset$1(MicroBatchExecution.scala:454)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.getStartOffset(MicroBatchExecution.scala:454)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:489)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:488)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:477)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:802)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:473)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:266)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Traceback (most recent call last):
File "/opt/bitnami/spark/work/spark_kafka.py", line 44, in <module>
query.awaitTermination()
File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line 201, in awaitTermination
File "/opt/bitnami/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 175, in deco
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 4e479a54-3766-412f-ba6c-c5879ecc0e00, runId = a0b6261a-59a5-4f3b-a214-db6dad504d90] terminated with exception: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got {"logOffset":0}
2024. 06. 05. 10:34
Hello Chanyoung,
If I remember correctly, since this is just a test, could you delete the test topic and start over? Something in the data you inserted earlier seems to be bad. When I recorded the lectures I deleted the topic before each one, so students who go through the course in one sitting may well hit this error.
2023. 11. 20. 15:21
Hello Oh Suhyeon,
Could this be a Docker Desktop issue? I've heard that networking problems come up fairly often on Windows... If you get the same error even after changing the port, it sounds like the containers are having trouble communicating with each other.
Have you tried the nc command as I suggested?
root@dadc6458f9fe:/opt/bitnami/spark# nc -vz kafka 9092
Connection to kafka (172.30.0.3) 9092 port [tcp/*] succeeded!
You mentioned in your reply that you did a read from Kafka. Did you try that from inside the Spark container?
If that's not it either, could it be a Spark or Kafka version issue? Version matching matters quite a bit... Please check against the example code one more time.
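In case it helps, here is a quick way to see which Spark version is actually running, so it can be compared against the --packages coordinate (org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 here). This is only a sketch; run it in the same container you use for spark-submit:

from pyspark.sql import SparkSession

# Print the Spark version in use; it should be a 3.4.x release to match
# the spark-sql-kafka-0-10_2.12:3.4.1 package and the bitnami/spark:3.4 image.
spark = SparkSession.builder.getOrCreate()
print(spark.version)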
Lastly, I wondered whether the code itself might be wrong, so I ran it again myself, and it works without any problem... Hmm... I really can't tell what's going on.
2023. 11. 19. 22:23
Yes, I checked that it is mounted as a volume, deleted the folder, and restarted Docker Compose as well.
The reason I changed the host and port was to check whether the error really was caused by bad data. The error message, and your first answer, say the data differs from what was expected, but when I ran kafka-console-consumer.sh there was nothing unusual in the data. So I suspected the error might be reporting "bad data" when the connection was actually failing, and I deliberately put in a bogus port to test that. Even with a port that has nothing to do with Kafka, the error is exactly the same, so it doesn't look like a data issue. And since a plain read works fine with the correct Kafka host and port, it doesn't seem to be a connectivity issue either. Could it be some other configuration problem?
2023. 11. 19. 16:43
Hello Oh Suhyeon,
Restarting Docker Compose won't help. I configured a volume for Kafka in the Docker Compose file to preserve data, so Kafka's data is only removed when you delete that directory. Delete everything inside the kafka-persistence directory and then start again.
https://github.com/dimz119/learn-pyspark/blob/main/docker-compose.yml#L56
Docker Compose restarts the applications; it does not reset the data.
Lastly, if you are using my Docker Compose file as-is, you must not change the host or the port number, because they are configured in Docker Compose as shown in the links below.
Host: https://github.com/dimz119/learn-pyspark/blob/main/docker-compose.yml#L53
Port: https://github.com/dimz119/learn-pyspark/blob/main/docker-compose.yml#L61
If you still want to verify the Kafka port yourself, shell into the Spark container, install netcat, and run nc -vz kafka 9092 to check whether the port is open.
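If installing netcat is inconvenient, the same check can be done from inside the Spark container with Python's standard socket module. This is just a sketch; the host name kafka and port 9092 come from the Docker Compose setup:

import socket

# Try to open a TCP connection to the Kafka broker and report the result.
try:
    with socket.create_connection(("kafka", 9092), timeout=5):
        print("kafka:9092 is reachable")
except OSError as e:
    print(f"cannot reach kafka:9092: {e}")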
2023. 11. 19. 11:13
It's strange, though: a plain read works fine, and only readStream fails.
Even when I put in a different Kafka port number or host, as below, I get exactly the same error, so it seems like it isn't actually looking at the Kafka topic.
events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092111")
    .option("subscribe", "quickstart")
    # .option("startingOffsets", "earliest")
    .load()
)
2023. 11. 19. 10:46
Thanks for the quick reply. I restarted Docker Compose itself, but it's still the same. Since the same error occurs even when I run it without creating the topic in Kafka, I'm starting to doubt whether Spark is really pointing at Kafka at all. Is there a way to check this?
2024. 06. 05. 10:36
Hello Chanyoung,
For this part, could you take a look at https://www.inflearn.com/questions/1208897/app-lt-timestamp-gt-lt-executor-id-gt-%ED%8F%B4%EB%8D%94%EA%B0%80-spark-events-%EB%82%B4%EB%B6%80%EC%97%90-%EC%83%9D%EA%B8%B0%EA%B2%8C-%ED%95%98%EA%B3%A0-%EC%8B%B6%EC%8A%B5%EB%8B%88%EB%8B%A4 ? I explained it there for another student who wanted to move that directory somewhere else.