Posted 2023-01-09 19:00 (edited)
Hello, I'm asking because I'm stuck on the exercise below.
In step 5 of running the collection pilot, when I run the Kafka consumer exercise, a "Resetting offset" message appears and I can't see any data.
I re-ran the simulator and moved the smart-car status files over again, but it still didn't work.
When I check car-batch-log it comes up as shown below, and I'd like to know whether that matters.
[Error message]
[root@server02 ~]# kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic --partition 0
23/01/09 18:47:52 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean
23/01/09 18:47:52 INFO consumer.ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [server02.hadoop.com:9092]
check.crcs = true
client.dns.lookup = default
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = console-consumer-96835
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
23/01/09 18:47:53 INFO utils.AppInfoParser: Kafka version: 2.2.1-cdh6.3.2
23/01/09 18:47:53 INFO utils.AppInfoParser: Kafka commitId: unknown
23/01/09 18:47:53 INFO consumer.KafkaConsumer: [Consumer clientId=consumer-1, groupId=console-consumer-96835] Subscribed to partition(s): SmartCar-Topic-0
23/01/09 18:47:53 INFO clients.Metadata: Cluster ID: AWKsMltnSBSo2f2sVF2P-g
23/01/09 18:47:53 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=console-consumer-96835] Resetting offset for partition SmartCar-Topic-0 to offset 14.
Thank you.
2 Answers
2023. 01. 10. 01:58
안녕하세요! "Idy242"님!
Resetting offset 는...카프카 브로커 상태에 따라 발생할 수 있는 메세지 입니다.
그보다 먼저 로그 시물레이터가 정상 실행 되었는지 확인이 필요해 보입니다.
현재 실습 단계에선, P133에서 2개(배치성/실시간성)의 java 로그 시뮬레이터를 실행 시키는데요..
이때 시뮬레이터가 만들어 내는 배치로그와 실시간로그의 생성 되는 위치는 아래와 같습니다.
배치로그는 mv 명령을 통해 아래의 위치에 파일이 있어야 하고요~
/home/pilot-pjt/working/car-batch-log/
실시간로그는 java 시뮬레이터 명령과 동시에 아래의 경로에 파일이 만들어 집니다.
/home/pilot-pjt/working/driver-realtime-log
문의하신 현재 파일럿 실습 단계는... 위 2번의 경로에서부터 발생한 데이터를 플럼이 수집해서, 카프카 서버에 전송하고, 이것을 카프카 컨슈머 명령으로 확인 하는 작업 입니다.
현재 보여주신 로그상에선 아예 카프카 서버로 데이터가 들어 오지 않았기 때문에, 카프카 컨슈머가 받은 데이터가 없는 것으로 보입니다.
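For context, the piece that moves these logs into Kafka is the Flume agent. The sketch below shows the general shape of such a configuration, assuming standard Flume 1.x properties; the component names are borrowed from the Flume logs later in this thread, and the log file name is illustrative, so the course's actual flume.conf will differ in detail.

# Sketch of a Flume agent that tails the real-time driver log and
# publishes each line to the SmartCar-Topic Kafka topic.
SmartCar_Agent.sources  = DriverCarInfo_TailSource
SmartCar_Agent.channels = DriverCarInfo_Channel
SmartCar_Agent.sinks    = DriverCarInfo_KafkaSink

# Exec source: follow the file the Java simulator writes in real time
# (the file name here is illustrative)
SmartCar_Agent.sources.DriverCarInfo_TailSource.type     = exec
SmartCar_Agent.sources.DriverCarInfo_TailSource.command  = tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
SmartCar_Agent.sources.DriverCarInfo_TailSource.channels = DriverCarInfo_Channel

# In-memory channel buffering events between source and sink
SmartCar_Agent.channels.DriverCarInfo_Channel.type     = memory
SmartCar_Agent.channels.DriverCarInfo_Channel.capacity = 10000

# Kafka sink: the broker and topic the consumer command above reads from
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.type                    = org.apache.flume.sink.kafka.KafkaSink
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.kafka.bootstrap.servers = server02.hadoop.com:9092
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.kafka.topic             = SmartCar-Topic
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.channel                 = DriverCarInfo_Channel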
So you should check the following three things upstream of Kafka:
1. Did you run the real-time log simulator command correctly?
$ java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.DriverLogMain 20160101 3 &
2. Is the real-time log being created at the path below?
/home/pilot-pjt/working/driver-realtime-log
3. Is Flume running normally, and is its log free of errors?
Please check those three points (a quick way to run them is sketched below). ^^
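For convenience, the three checks above boil down to a few shell commands. This is just a sketch using the paths from this exercise; the *.log glob assumes the simulator's default file naming.

# 1) Is the real-time log simulator process running?
$ ps -ef | grep DriverLogMain | grep -v grep

# 2) Is the real-time log file being created and growing?
$ ls -ltr /home/pilot-pjt/working/driver-realtime-log
$ tail -f /home/pilot-pjt/working/driver-realtime-log/*.log

# 3) Is the Flume agent healthy? Scan its log for errors or warnings.
$ tail -n 100 /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log | grep -iE "error|warn"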
- From 빅디
빅디, I'm running into the exact same error ㅠㅠ
[root@server02 driver-realtime-log]# kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic --partition 0
24/06/07 18:46:56 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean
24/06/07 18:46:56 INFO consumer.ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [server02.hadoop.com:9092]
check.crcs = true
client.dns.lookup = default
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = console-consumer-70114
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
24/06/07 18:46:56 INFO utils.AppInfoParser: Kafka version: 2.2.1-cdh6.3.2
24/06/07 18:46:56 INFO utils.AppInfoParser: Kafka commitId: unknown
24/06/07 18:46:56 INFO consumer.KafkaConsumer: [Consumer clientId=consumer-1, groupId=console-consumer-70114] Subscribed to partition(s): SmartCar-Topic-0
24/06/07 18:46:56 INFO clients.Metadata: Cluster ID: nozwxBNpTgug3RimbEqQlA
24/06/07 18:46:56 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=console-consumer-70114] Resetting offset for partition SmartCar-Topic-0 to offset 3.
2024. 06. 08. 11:05
Hello, 김태욱!
The offset log is not an error; it's a log message about resetting the topic's position information (the offset).
You can safely ignore that log, and instead check whether any related pilot functionality is actually broken. ^^
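If you want to double-check whether any messages have actually reached the topic, something along these lines should work (a sketch using the stock Kafka CLI tools that ship with CDH 6.3):

# Read the partition from the very first offset instead of only new messages
$ kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 \
    --topic SmartCar-Topic --partition 0 --from-beginning

# Print the latest offset per partition; a value greater than 0 means data arrived
$ kafka-run-class kafka.tools.GetOffsetShell \
    --broker-list server02.hadoop.com:9092 --topic SmartCar-Topic --time -1

Since the console consumer defaults to auto.offset.reset = latest (visible in the config dump above), it only prints messages produced after it starts; --from-beginning shows what is already stored in the topic.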
- From 빅디
2023. 01. 10. 10:20
Thank you for the answer. I went through everything again following your guidance, and the Kafka consumer is now working fine.
However, in the collection function check on p. 135, I'm having a problem with the Flume standard-output log transmission, so I'm asking about that.
[Error message]
[root@server02 car-batch-log]# ls -ltr
total 0
[root@server02 car-batch-log]# tail -f /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log
2023-01-10 10:08:24,949 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: SmartCarInfo_SpoolSource started
2023-01-10 10:08:25,029 INFO org.eclipse.jetty.util.log: Logging initialized @1996ms
2023-01-10 10:08:25,244 INFO org.eclipse.jetty.server.Server: jetty-9.3.25.v20180904, build timestamp: 2018-09-05T06:11:46+09:00, git hash: 3ce520221d0240229c862b122d2b06c12a625732
2023-01-10 10:08:25,460 INFO org.eclipse.jetty.server.AbstractConnector: Started ServerConnector@2d5a0fec{HTTP/1.1,[http/1.1]}{0.0.0.0:41414}
2023-01-10 10:08:25,460 INFO org.eclipse.jetty.server.Server: Started @2427ms
2023-01-10 10:08:25,509 INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version: 2.2.1-cdh6.3.2
2023-01-10 10:08:25,509 INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId: null
2023-01-10 10:08:25,513 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: DriverCarInfo_KafkaSink: Successfully registered new MBean.
2023-01-10 10:08:25,513 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SINK, name: DriverCarInfo_KafkaSink started
2023-01-10 10:08:25,560 INFO org.apache.kafka.clients.Metadata: Cluster ID: AWKsMltnSBSo2f2sVF2P-g
I can confirm that data is flowing into Kafka in real time, but
[p.135]
tail -f /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log
an error occurs when I run that command.
I set the permissions with chmod the same way as in the book.
[Output of vi /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log]
[root@server02 ~]# vi /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log
2023-01-09 15:57:13,119 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
2023-01-09 15:57:13,128 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/var/run/cloudera-scm-agent/process/32-flume-AGENT/flume.conf
2023-01-09 15:57:13,132 INFO org.apache.flume.conf.FlumeConfiguration: Processing:source1
2023-01-09 15:57:13,133 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2023-01-09 15:57:13,133 INFO org.apache.flume.conf.FlumeConfiguration: Processing:source1
2023-01-09 15:57:13,133 INFO org.apache.flume.conf.FlumeConfiguration: Processing:channel1
2023-01-09 15:57:13,133 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2023-01-09 15:57:13,133 INFO org.apache.flume.conf.FlumeConfiguration: Processing:channel1
2023-01-09 15:57:13,133 INFO org.apache.flume.conf.FlumeConfiguration: Added sinks: sink1 Agent: tier1
2023-01-09 15:57:13,133 INFO org.apache.flume.conf.FlumeConfiguration: Processing:source1
2023-01-09 15:57:13,134 INFO org.apache.flume.conf.FlumeConfiguration: Processing:source1
2023-01-09 15:57:13,134 WARN org.apache.flume.conf.FlumeConfiguration: Agent configuration for 'tier1' has no configfilters.
2023-01-09 15:57:13,155 INFO org.apache.flume.conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [tier1]
2023-01-09 15:57:13,155 INFO org.apache.flume.node.AbstractConfigurationProvider: Creating channels
2023-01-09 15:57:13,165 INFO org.apache.flume.channel.DefaultChannelFactory: Creating instance of channel channel1 type memory
2023-01-09 15:57:13,173 INFO org.apache.flume.node.AbstractConfigurationProvider: Created channel channel1
2023-01-09 15:57:13,173 INFO org.apache.flume.source.DefaultSourceFactory: Creating instance of source source1, type netcat
2023-01-09 15:57:13,193 INFO org.apache.flume.sink.DefaultSinkFactory: Creating instance of sink: sink1, type: logger
2023-01-09 15:57:13,194 INFO org.apache.flume.node.AbstractConfigurationProvider: Channel channel1 connected to [source1, sink1]
2023-01-09 15:57:13,195 INFO org.apache.flume.node.Application: Starting new configuration:{ sourceRunners:{source1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:source1,state:IDLE} }} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@23a8f888 counterGroup:{ name:null counters:{} } }} channels:{channel1=org.apache.flume.channel.MemoryChannel{name: channel1}} }
2023-01-09 15:57:13,199 INFO org.apache.flume.node.Application: Starting Channel channel1
2023-01-09 15:57:13,201 INFO org.apache.flume.node.Application: Waiting for channel: channel1 to start. Sleeping for 500 ms
2023-01-09 15:57:13,256 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: channel1: Successfully registered new MBean.
2023-01-09 15:57:13,257 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: channel1 started
2023-01-09 15:57:13,701 INFO org.apache.flume.node.Application: Starting Sink sink1
2023-01-09 15:57:13,703 INFO org.apache.flume.node.Application: Starting Source source1
2023-01-09 15:57:13,704 INFO org.apache.flume.source.NetcatSource: Source starting
2023-01-09 15:57:13,723 INFO org.eclipse.jetty.util.log: Logging initialized @1856ms
2023-01-09 15:57:13,725 INFO org.apache.flume.source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:9999]
2023-01-09 15:57:13,824 INFO org.eclipse.jetty.server.Server: jetty-9.3.25.v20180904, build timestamp: 2018-09-05T06:11:46+09:00, git hash: 3ce520221d0240229c862b122d2b06c12a625732
2023-01-09 15:57:13,868 INFO org.eclipse.jetty.server.AbstractConnector: Started ServerConnector@5f31a941{HTTP/1.1,[http/1.1]}{0.0.0.0:41414}
2023-01-09 15:57:13,871 INFO org.eclipse.jetty.server.Server: Started @2004ms
2023-01-09 16:27:03,600 INFO org.apache.flume.node.Application: Shutting down configuration: { sourceRunners:{source1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:source1,state:START} }} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@23a8f888 counterGroup:{ name:null counters:{runner.backoffs.consecutive=225, runner.backoffs=225} } }} channels:{channel1=org.apache.flume.channel.MemoryChannel{name: channel1}} }
2023-01-09 16:27:03,605 INFO org.apache.flume.node.Application: Stopping Source source1
2023-01-09 16:27:03,605 INFO org.apache.flume.lifecycle.LifecycleSupervisor: Stopping component: EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:source1,state:START} }
2023-01-09 16:27:03,605 INFO org.apache.flume.source.NetcatSource: Source stopping
2023-01-09 16:27:03,606 INFO org.apache.flume.node.Application: Stopping Sink sink1
2023-01-09 16:27:03,606 INFO org.apache.flume.lifecycle.LifecycleSupervisor: Stopping component: SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@23a8f888 counterGroup:{ name:null counters:{runner.backoffs.consecutive=225, runner.backoffs=225} } }
2023-01-09 16:27:03,606 INFO org.apache.flume.node.Application: Stopping Channel channel1
After restarting the cluster in CM, it suddenly works.