인프런 커뮤니티 질문&답변

lsch7210님의 프로필 이미지
lsch7210

작성한 질문수

15일간의 빅데이터 파일럿 프로젝트

Flume에서 배치 처리중 오류 발생

작성

·

423

0

스마트카 상태정보(SmartCarStatusInfo_20160101.txt)를 car-batch-log/ 로 복사해서

Flume이 Hadoop에 저장하는 처리중 에러가 발생합니다. 어디를 잘못 했을까요?

에러 내용 --------------------------------------------------------------------------------------

2021-05-04 08:51:25,195 INFO org.apache.flume.node.AbstractConfigurationProvider: Created channel DriverCarInfo_Channel

2021-05-04 08:51:25,195 INFO org.apache.flume.channel.DefaultChannelFactory: Creating instance of channel SmartCarInfo_Channel type memory

2021-05-04 08:51:25,203 INFO org.apache.flume.node.AbstractConfigurationProvider: Created channel SmartCarInfo_Channel

2021-05-04 08:51:25,205 INFO org.apache.flume.source.DefaultSourceFactory: Creating instance of source SmartCarInfo_SpoolSource, type spooldir

2021-05-04 08:51:25,234 ERROR org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Unhandled error

java.lang.NoSuchMethodError: org.apache.flume.Context.getSubProperties(Ljava/lang/String;)Ljava/util/Map;

        at org.apache.flume.source.SpoolDirectorySource.configure(SpoolDirectorySource.java:176)

        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)

        at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:325)

        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:105)

        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)

        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

답변 2

0

빅디님이 제공해주신 예제 bigdata.smartcar.flume-1.0.jar 에서 사용된 패키지의 버전과 cdh 에서 설치한 flume버전이 달라서 발생하는 문제 같습니다. jar interceptor 를 제거하니 해당 문제는 발생하지 않습니다.. bigdata.smartcar.flume-1.0.jar를 패키징 한 프로젝트 파일을 공유해주시면, 수정해서 사용 가능할 것 같습니다.

 

임시 방편으로나마 collectDayInterceptor를 제거한 conf를 사용하시면 될것 같습니다..

SmartCar_Agent.sources  = SmartCarInfo_SpoolSource DriverCarInfo_TailSource

SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel

SmartCar_Agent.sinks    = SmartCarInfo_HdfsSink DriverCarInfo_KafkaSink

 

 

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000

 

 

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = timeInterceptor typeInterceptor filterInterceptor

 

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.timeInterceptor.type = timestamp

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.timeInterceptor.preserveExisting = true

 

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.typeInterceptor.type = static

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.typeInterceptor.key = logType

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.typeInterceptor.value = car-batch-log

 

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14}

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false

 

SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory

SmartCar_Agent.channels.SmartCarInfo_Channel.capacity  = 100000

SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity  = 10000

 

 

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.type = hdfs

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.path = /pilot-pjt/collect/%{logType}/wrk_date=%Y%m%d

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.filePrefix = %{logType}

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.fileSuffix = .log

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.fileType = DataStream

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.writeFormat = Text

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.batchSize = 10000

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.rollInterval = 0

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.rollCount = 0

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.idleTimeout = 100

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.callTimeout = 600000

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.rollSize = 67108864

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.threadsPoolSize = 10

 

 

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.channel = SmartCarInfo_Channel

 

 

SmartCar_Agent.sources.DriverCarInfo_TailSource.type = exec

SmartCar_Agent.sources.DriverCarInfo_TailSource.command = tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log

SmartCar_Agent.sources.DriverCarInfo_TailSource.restart = true

SmartCar_Agent.sources.DriverCarInfo_TailSource.batchSize = 1000

 

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors = filterInterceptor2

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.type = regex_filter

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.regex = ^\\d{14}

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.excludeEvents = false

 

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.topic = SmartCar-Topic

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.brokerList = server02.hadoop.com:9092

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.requiredAcks = 1

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.batchSize = 1000

 

 

SmartCar_Agent.channels.DriverCarInfo_Channel.type = memory

SmartCar_Agent.channels.DriverCarInfo_Channel.capacity= 100000

SmartCar_Agent.channels.DriverCarInfo_Channel.transactionCapacity = 10000

 

 

SmartCar_Agent.sources.DriverCarInfo_TailSource.channels = DriverCarInfo_Channel

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.channel = DriverCarInfo_Channel

 

Big.D님의 프로필 이미지
Big.D
지식공유자

안녕하세요! 빅디입니다!

추석 명절 잘 보내셨는지요~ ㅎㅎ

김명훈님 말씀대로 버젼에 차이가 발생하면 플럼의 인터셉터에서 문제가 발생 할 수 있습니다.

하지만 파일럿 프로젝트의 수집 환경을 임으로 바꾸지 않았다면 아래의 버젼이 유지 된 상태로, 해당 기능은 정상 작동 해야 합니다.  

JDK 1.8 

Flume 1.9

bigdata.smartcar.flume-1.0.jar 

보통 JDK의 심벌릭 링크 작업중 오타등의 실수로 해당 문제가 발생 하곤 합니다.

그리고 요청하신 bigdata.smartcar.flume-1.0.jar  프로젝트 워크스페이스는 아래 경로에 있습니다.

\\예제소스\bigdata2nd-master\workspace\bigdata.smartcar.flume

- 빅디 드림

0

Big.D님의 프로필 이미지
Big.D
지식공유자

안녕하세요! 빅디 입니다.

플럼의 Conf 내용을 확인해 볼 필요가 있는데요..

그전에 자바 버젼의 심벌릭 링크 작업에 문제가 있을 수 있습니다.

Server02에 접속하셔서 아래의 자바 버젼 확인 부탁 드립니다.  - 빅디 드림

$ java -version

lsch7210님의 프로필 이미지
lsch7210
질문자

[root@server02 working]# java -version

java version "1.8.0_181"

Java(TM) SE Runtime Environment (build 1.8.0_181-b13)

Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)

Big.D님의 프로필 이미지
Big.D
지식공유자

넵! 확인 했습니다. JDK 문제는 아닌것 같습니다.

다음으로 플럼 에이전트의 Conf 파일 확인이 필요합니다. 클라우데라 매니져 > 홈 > 플럼 > 구성 에서요, 플럼의 구성요소 내용이 진행중인 실습 단계의 내용과 일치 하는지 또는 오타가 있는지 확인을 해보시기 바랍니다.

그래도 해결이 안되면 현재 플럼 에이전트 Conf 내용을 여기 댓글로 복붙 해주세요~

- 빅디 올림 

lsch7210님의 프로필 이미지
lsch7210
질문자

SmartCar_Agent.sources  = SmartCarInfo_SpoolSource DriverCarInfo_TailSource

SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel

SmartCar_Agent.sinks    = SmartCarInfo_HdfsSink DriverCarInfo_KafkaSink

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = timeInterceptor typeInterceptor collectDayInterceptor filterInterceptor

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.timeInterceptor.type = timestamp

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.timeInterceptor.preserveExisting = true

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.typeInterceptor.type = static

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.typeInterceptor.key = logType

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.typeInterceptor.value = car-batch-log

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.collectDayInterceptor.type = com.wikibook.bigdata.smartcar.flume.CollectDayInterceptor$Builder

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14}

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false

SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory

SmartCar_Agent.channels.SmartCarInfo_Channel.capacity  = 100000

SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity  = 10000

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.type = hdfs

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.path = /pilot-pjt/collect/%{logType}/wrk_date=%Y%m%d

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.filePrefix = %{logType}

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.fileSuffix = .log

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.fileType = DataStream

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.writeFormat = Text

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.batchSize = 10000

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.rollInterval = 0

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.rollCount = 0

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.idleTimeout = 100

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.callTimeout = 600000

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.rollSize = 67108864

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.threadsPoolSize = 10

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel

SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.channel = SmartCarInfo_Channel

SmartCar_Agent.sources.DriverCarInfo_TailSource.type = exec

SmartCar_Agent.sources.DriverCarInfo_TailSource.command = tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log

SmartCar_Agent.sources.DriverCarInfo_TailSource.restart = true

SmartCar_Agent.sources.DriverCarInfo_TailSource.batchSize = 1000

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors = filterInterceptor2

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.type = regex_filter

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.regex = ^\\d{14}

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.excludeEvents = false

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.topic = SmartCar-Topic

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.brokerList = server02.hadoop.com:9092

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.requiredAcks = 1

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.batchSize = 1000

SmartCar_Agent.channels.DriverCarInfo_Channel.type = memory

SmartCar_Agent.channels.DriverCarInfo_Channel.capacity= 100000

SmartCar_Agent.channels.DriverCarInfo_Channel.transactionCapacity = 10000

SmartCar_Agent.sources.DriverCarInfo_TailSource.channels = DriverCarInfo_Channel

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.channel = DriverCarInfo_Channel

Big.D님의 프로필 이미지
Big.D
지식공유자

보내 주신 플럼 Conf 파일에도 문제는 없어 보입니다.
(저의 파일럿 환경에서 해당 Conf 내용으로 테스트도 잘 됐습니다.)

에러의 내용을 봐서는 플럼 SpoolDir이 car-batch-log 폴더를 폴링 하면서 SmartCarStatusInfo_20160101.txt 파일을 처리할때 발생 하는 에러 같습니다.   

2021-05-04 08:51:25,234 ERROR org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Unhandled error

java.lang.NoSuchMethodError: org.apache.flume.Context.getSubProperties(Ljava/lang/String;)Ljava/util/Map;

위와 같은 에러는 자바 환경에서 소프트웨어( or Library )간 버젼 문제로 자주 발생 하는 에러 입니다. Java의 버젼과 사용하는 소프트웨어들의 버젼 호완성 문제들로 생기는데요...예를들어 Java 버전 1.8 인데, 사용중이던 플럼의 버젼이 1.2 에서 2.0 으로 업그레이드 하는 경우 입니다. 

파일럿 환경을 순서대로 따라만 하면 위와 같은 문제가 발생 하지는 않는데요..
제가 lsch7210님의 작업 과정을 모두 알수 없어서 원인만 추축 하게 되네요....^^;; 

일단 최선책은 파일럿 환경에 추가적인 변경 작업이 없었다면, 앞에 과정으로 돌아가서 꼼꼼히 다시 해보는 것을 추천드립니다.

만약 똑같은 문제가 계속 발생 한다면 사용중이신 소프트웨어의 모든 로그 파일을 분석 해야 할 수도 있습니다. 번거롭지만 다시한번 앞단계로 넘어가 플럼을 삭제하고 설치 > 실습 진행 단계를 부탁 드립니다.  -빅디 올림

 (플럼 삭제 방법은 Cloudera Manager > Home > Flume > 삭제 입니다.)

lsch7210님의 프로필 이미지
lsch7210
질문자

감사합니다.

lsch7210님의 프로필 이미지
lsch7210

작성한 질문수

질문하기