인프런 커뮤니티 질문&답변

오성환 (highRPM)님의 프로필 이미지
오성환 (highRPM)

작성한 질문수

Kevin의 알기 쉬운 RxJava 1부

Observable과 Flowable에 대한 이해

이해가 잘 안가서 질문하나 드려요.

작성

·

486

0

DROP_LATEST 의 경우 강사님의 설명에서는 onBackPressureBuffer의 첫번째 매개변수인 capacity에 128 의 값을 넣으면  127개까지 정상적으로 발행이 되고 그 후에 나오는 overflow라는 메시지를 보면서 128, 129 가 오버플로 되고 있다고 설명을 하셨습니다. 그리고 그 오버플로우 돼서 나간 값인 128, 129 가 다른곳에 저장 되어있다가 버퍼에 들어온다고 말씀하셨습니다. (버퍼를 지정했는데 그 버퍼 외의 공간에 overflow된 값을 저장하는 것도 말이 안됨)

하지만 이게 맞지 않는 것 같습니다.

일단 127까지(capacity에 무관) 값이 발행되면  그 이후에 들어오는 값들을 capacity만큼 버퍼에 쌓게 됩니다.

현재 발행 속도는 1ms 이므로 0.001초마다 발행이 되어야하죠.

capacity를 256으로 하였다면

256만큼 버퍼에 쌓여야하니 대략 0.256초 이후에 오버플로우가 발생 할 겁니다.

0.257 이후에 오버플로우가 발생합니다. 그리고 255 이후인 값들은 들어오지 못하고 최신값부터 버려지겠죠. 

127+255 는 정확히 382 입니다. 여기까지 버퍼에 들어가서 순차적으로 처리하겠죠. 그 이후는 오버플로우 되어서 최신의 데이터가 삭제 되니. 

이렇게  382에서  583으로 점프합니다.

해당 부분의 강의가 어떻게 하려고해도 이해가 안돼서 검색을 해보니 저런 의미로 설명이 되어있더라고요.

여기에 대한 명확한 설명을 부탁드립니다. 44,000원 주고 강의를 신청했는데 초장부터 잘못된 정보로 강의를 듣다보니 믿음이 좀 깨집니다. 

정확한 답변 부탁드리겠습니다.

답변 10

1

Kevin님의 프로필 이미지
Kevin
지식공유자

네, 감사합니다. 어떤 질문이나 이슈가 생기면 서로 논의 해가면서 같이 해결하고 또 성장 해나갔으면 좋겠습니다 성환님 ^^

또 뵐게요~

1

바빠서 이제야 제대로 봤습니다 ㅎㅎ 답변 감사합니다. 이제 이해가 탁 되네요. 앞으로도 잘 부탁드릴게요!

1

Kevin님의 프로필 이미지
Kevin
지식공유자

성환님, 안녕하세요? 정확한 답을 찾은지라 말씀 드리려고 잠깐 들어왔습니다. 일단 코드부터 먼저 보면서 설명 드릴게요.

아래는 Buffer 전략 중에 DROP_LATEST 전략의 진행 과정입니다.

로그를 저렇게 세번을 출력하니까 설명 드리기가 제일 편한거 같더라구요. interval()에서 통지한 것을 한번 로그로 출력하고, onBackpressureBuffer()에서 다시 통지한 것을 로그로 출력합니다. 마지막으로 subscribe()내의 onNext()에서 전달 받은 값을 출력하구요. 

아래는 실행 결과 인데요. 위 코드상에서 생산자 쪽에 요청하는 데이터 갯수는 observeOn()에서 보시면 1이구요. onBackpressureBuffer()에서 버퍼가 담을 수 있는 데이터 갯수는 2입니다.

(1) 33.103초에 interval()에서 0을 통지하고, 33.104초에 버퍼에서 곧바로 소비자 쪽에 다시 0이 통지가 됩니다.

(2) 이때 통지된 0은 34.107초에 onNext()에서 1초 동안 처리가 되구요.

(3) (1)과 (2) 사이의 타임 구간에서는 interval()에서 통지한 데이터들이 0.3초에 한번씩 병렬로 통지가 되는데 33.403초에 1이 버퍼에 채워지고, 그 다음 0.3초후인 33.702초에는 2가 버퍼에 채워집니다. 버퍼 용량이 2개까지이므로 이제 버퍼는 가득찼습니다.

(4) 그렇기때문에 34.001초에 통지된 데이터 3은 버퍼안에 채워지지 못하고, overflow가 발생했다는 사실을 로그로 출력합니다.이 시점에 버퍼에 안에 나중에(가장 최근에) 채워진 데이터 2가 DROP이 되고, 버퍼밖에 있던 3이 버퍼안에 채워집니다. 이제 버퍼안에는 1과 3이 있습니다.

(5) 34.107초에 소비자쪽에서 데이터 0이 처리가 완료 되기때문에 곧바로 버퍼에서 1이 통지가 됩니다. 이제 버퍼에는 3이 있습니다.

(6) 데이터 1이 34.108초에 소비자쪽에서 1초 동안 처리를 하는 동안 (5)와 (6) 구간 사이에서 34.301초에 interval()에서 통지된 4가 버퍼에 채워집니다. 이제 버퍼에 3과 4가 있습니다.

(7) 0.3초 뒤인 34.603초에 interval()에서 5가 통지가 되지만 버퍼에 3과 4가 있어서 가득 찼기때문에 overflow 발생 로그를 출력합니다. 이 시점에 버퍼 안에 있는 가장 최근에 버퍼안에 채워진 데이터 4가 DROP 되고, 그 자리에 5가 채워집니다. 이제 버퍼에는 3과 5가 있습니다.

(8) 그 다음에는 34.902 초에 interval()에서 6이 통지가 되는데 버퍼에 3과 5가 들어있기때문에 가장 최근에 버퍼에 채워진 5가 다시 DROP이 되고 그 자리에 6이 채워집니다.

나머지는 설명을 안해도 아실꺼 같아서. ^^;

결국 제가 착각한것 중에 하나가 버퍼 안에서 가장 최신의 데이터가 DROP 된다고 생각 못하고 버퍼 밖에 대기하는 데이터가 DROP 된다고 생각했던게 잘못된 결과를 출력한 가장 큰 원인 같아요.

정말 죄송하게 생각하구요. DROP_OLDEST 전략은 버퍼에 채워진 데이터 중에서 가장 먼저 채워진(OLDEST) 데이터가 DROP됩니다. 

강의 자료는 다른 배압 전략들도 좀 수정을 했는데 레코딩 할 시간이 없어서 아마 주말 동안에 레코딩하고 편집해서 수정된 강의 영상을 업로드 할수있을거 같네요.

성환님한테 어느 정도 명확한 설명을 해줄 수 있어서 참 다행이고, 그렇게 해줄 수 있는건 다 성환님 덕분입니다. ^^;

저에 대한 신뢰가 많이는 아니고 조금만 사라졌기를 바래볼게요. 그럼 수정된 영상 업로드 되면 다시 공지 드릴겠습니다.

질문/답변 게시글만이지만 좋은 얘기 나눌 수 있어서 좋았구요. 그럼 또 뵐게요.

1

Kevin님의 프로필 이미지
Kevin
지식공유자

아닙니다. 제가 잘못한 부분이 있으니 바로 잡는게 맞으니까요. 그런데 업무 시작하려니 뭔가 또 좀 찝찝해서 DROP_LATEST 전략과 DROP_OLDEST 전략의 docs 문서를 최종 확인해보니 위에서 설명한 부분도 100퍼센트 맞지는 않는거 같습니다. 최종적으로 다시 확인이 되면 답글 남길게요. 감사합니다.

1

Kevin님의 프로필 이미지
Kevin
지식공유자

성환님, 안녕하세요? 아침 일찍 출근해서 생산자 쪽 통지 시간을 조금 더 느리게 해서 테스트 후, 설명을 추가하였습니다.

이해가 되시길 바라고, 나머지 Backpressure 전략 쪽은 잘못된 부분이 있으면 최대한 빨리 수정해서 코드도 github에 push하고 영상도 수정 업로드 하겠습니다. 우선은 Backpressure쪽은 건너뛰시고 학습을 좀 부탁드릴게요.

그럼 저는 업무를 좀 하고, 쉬는 시간에 나머지 코드를 다시 좀 살펴봐야겠네요. 오늘 하루도 화이팅 하세요.

public class BackpressureBufferExample01 {
public static void main(String[] args){

Flowable.interval(500L, TimeUnit.MILLISECONDS)
.onBackpressureBuffer(
1,
() -> Logger.log(LogType.PRINT, ""),
BackpressureOverflowStrategy.DROP_LATEST)
// .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data))
.observeOn(Schedulers.computation(), false, 1)
.subscribe(
data -> {
TimeUtil.sleep(1000L);
Logger.log(LogType.ON_NEXT, data);
},
error -> Logger.log(LogType.ON_ERROR, error)
);

TimeUtil.sleep(4000L);
}
}

1

Kevin님의 프로필 이미지
Kevin
지식공유자

성환님, 답변이 늦어져서 죄송하구요. 전혀 기분 나쁘지 않았고 죄송해 하지 않으셔두 되니까 편하게 생각하셔요.

일단 제가 강의를 레코딩 하는 시점까지 제가 잘못 이해하고 있었던게 좀 있었던거 같습니다. BackPressure쪽 4개 설명드린 영상은 잘못된 부분이 더 있으면 바로 잡고, 더 쉽고 정확하게 레코딩을 다시 올릴테니 조금만 기다려주시구요.

우선 아래 코드로 다시 설명을 드리겠습니다.

Flowable.interval(300L, TimeUnit.MILLISECONDS)
.onBackpressureBuffer(
1,
() -> Logger.log(LogType.PRINT, ""),
BackpressureOverflowStrategy.DROP_LATEST)
.observeOn(Schedulers.computation(), false, 1)
.subscribe(
data -> {
TimeUtil.sleep(1000L);
Logger.log(LogType.ON_NEXT, data);
},
error -> Logger.log(LogType.ON_ERROR, error)
);

TimeUtil.sleep(7000L);

기존 코드에서 onBackpressureBuffer( )의 세번째 파라미터에서 로그를 출력하는 부분과 doOnNext()로 로그를 출력하는 부분은 이해하는데 오히려 더 헷갈리실거 같아서 지웠습니다.

우선 말씀드려야할게 onBackpressureBuffer( )의 첫번째 파라미터인데요. 이 첫번째 파라미터는 버퍼의 용량 즉, 버퍼가 수용할 수 있는 크기가 맞긴합니다. 그런데 observeOn()에 대한 설명은 빼버린채 설명을 드려서 잘못된 설명이 되어버렸네요.

그리고 overflow 되어서 버퍼에 담기지 못한 데이터를 저장한다는 말씀을 드린적은 없지만(기억해둔다는 표현을 썼더라구요.) 어쨌든 이 부분도 설명이 잘못되었습니다.

최종적으로 버퍼의 크기를 좀 줄이고 통지되는 속도도 조금 넉넉하게 한 위 소스코드로 틀린 부분을 바로 잡아서 말씀을 드릴게요.

그럼 동작 순서를 단계적으로 적어보겠습니다.

=======================================================================================

- 소비자 쪽에서 구독을 하게되면 observeOn()의 세번째 파라미터를 1로 지정했기때문에 버퍼에 담긴 데이터를 한개씩만 전달해달라고 요청합니다.

- interval 함수로 인해서 생산자 쪽에서 0.3초 뒤에 데이터 0을 통지합니다. 통지된 데이터는 버퍼에 담긴 후에 소비자 쪽에 전달됩니다.

- 소비자 쪽에서는 sleep()로 인해 전달 받은 데이터 0을 처리하는데 1초의 시간이 걸립니다.

- 소비자 쪽에서 1초동안 데이터를 처리함으로 아직 다음 데이터를 통지 받을 준비가 되어있지 않기때문에 소비자 쪽에서 데이터 0을 처리하는 1초가 지나기 전까지는 생산자 쪽의 버퍼는 비워지지 않습니다.

- 소비자 쪽에서 1초동안 데이터 0을 처리하는 동안 생산자쪽에서 0.3초 뒤에 1을 통지하여 버퍼에 담으려고하지만 버퍼의 수용 용량이 1이고, 아직 버퍼가 비워지지 않았기때문에 데이터 1은 drop 됩니다.

- 생산자 쪽에서 0.3초 뒤에 2를 통지하지만 여전히 소비자쪽에서 데이터 0을 처리하는 1초가 지나지 않았기때문에 버퍼가 비워지지 않은 상태라 데이터 2는 drop 됩니다.

- 생산자 쪽에서 다시 0.3초 뒤에 3을 통지하지만 이 시점에는 0.9초가 지난 시점이라 소비자쪽에서 데이터 0을 처리하는 1초가 지나지 않았으므로 데이터 3 역시 drop 됩니다.

- 생산자 쪽에서 0.3초 뒤에 4를 통지하는 시점에는 소비자 쪽에서 데이터 0을 처리하는 1초가 지났기때문에 버퍼가 비워진 상태이고, 가장 최근에 drop(drop_latest) 된 데이터인 3이 버퍼에 담긴 후, 소비자 쪽에 전달됩니다.

- 이런식으로 위 과정을 반복합니다.

=======================================================================================

이해가 되셨나 모르겠습니다.  육아 휴직 기간동안 몸도 마음도 피곤한 상태에서 리액티브 프로그래밍 영역 중에서 제일 이해하기 어려운 부분 중에 하나인 backpressure 쪽을 레코딩을 하다보니 결과적으로 잘못된 정보를 전달을 하게되었네요.

이유야 어쨌든 전적으로 제가 잘못한 부분입니다. 이 부분은 전체 수강생들에게 공지로 알리고, 틀린부분에 대해서는 레코딩을 다시 해서 올릴게요. 

제가 사정상 유료로 강의를 올리긴했지만 성환님 말씀대로 한국어로 된 RxJava 강의는 전무하고 그래서 다른분들이 RxJava로 리액티브 프로그래밍에 조금 더 쉽게 입문 하시기를 바라는 마음이 더 크긴하거든요. 

제가 잘못한 부분 지적해주셔서 너무 감사드리구요. 성환님 덕분에 앞으로 뒤에 들어오실 수강생 분들한테 피해 주지 않고, 도움이 될 수 있게 된거 같아요. 정말 감사드릴게요.

열심히 학습하시는 모습이 너무 보기좋은데 꼭 좋은 개발자로 성장하시길 바래보겠습니다.

Backpressure쪽은 4개 강의를 다시 검토해보고 틀린 부분도 있으면 바로 잡고, 위 예제 코드처럼 조금 더 이해하기 쉽게 레코딩을 다시 하도록할게요.

질문 게시판이긴하지만 좋은 이야기를 서로 주고 받아서 참 기쁘네요. 성환님한테 오히려 배움을 받은거 같아서 좋습니다.

제가 설명 드린 부분에 또 이상한 부분이 있으면 말씀해주시면 감사드리고, 그럼 또 뵐게요.

 

1

Kevin님의 프로필 이미지
Kevin
지식공유자

안녕하세요? 질문 남겨주셔서 감사드립니다.

초저녁에는 육아때문에 바로 구체적인 답변 드리기가 힘든데 아기 재우고 확인 후에 다시 답변 드릴게요.

아, 그리고 비싼 수강료 지불하고 듣는 강의인데 어떤 이유이든지 간에 믿음이 깨져서는 안될테니 그러지 않도록 노력해보겠습니다.

금일 중으로 다시 답변 드릴테니 조금만 기다려 주시겠어요?

감사합니다.

0

넵 제가 생각하기에는 DROP_LATEST는 가장 최신걸 제거하는 스택같은 느낌이고

DROP_OLDEST는 오래된 데이터부터 제거하는 것이니 큐 를 의미하는 것 같네요.

0

정말 친절한 답변 감사합니다. ㅎㅎ

올려주신 코드 보면서 이해해보겠습니다.

시간내서 확인해주셔서 감사합니다.

좋은 하루보내세요!!

0

넵 천천히 달아주세요. 그리고 제가 감정적인 말투로 댓글을 달아서 기분 나쁘셨다면 죄송합니다...

한국어로 된 RxJava강의는 케빈님 외에 없어서 설레는 가슴으로 듣다가 보니 저렇게 됐네요 ㅠ

감사합니다.

오성환 (highRPM)님의 프로필 이미지
오성환 (highRPM)

작성한 질문수

질문하기