인프런 영문 브랜드 로고
인프런 영문 브랜드 로고

인프런 커뮤니티 질문&답변

감바스님의 프로필 이미지

작성한 질문수

실전! Redis 활용

Pub/Sub

안녕하세요 Pub/sub과 분산 락에 대해 질문이 있습니다.

해결된 질문

작성

·

52

0

안녕하세요 강사님 우선 아래 질문한 내용에 대해 구글에서 이것저것 찾아보고 GPT도 다 돌려봤는데 도저히 이해가 안가서 강사님께 여쭤봅니다.

강의에서 분산 락과 Pub/Sub 개념을 복습한 후, 이전에 Redisson을 사용해 구현했던 분산 락 로직을 정리하고 있습니다. 그 과정에서 RedissonLock 클래스의 tryLock 메서드를 살펴보다가 PublishSubscribe 클래스의 subscribe 메서드까지 코드를 따라가게 되었습니다.

이와 관련하여 다음 몇 가지 질문이 생겼습니다:

  1. PublishSubscribe 클래스의 subscribe 메서드를 보면 세마포어를 사용하는데, 여기서 세마포어의 역할이 무엇인지 잘 이해가 되지 않습니다. 왜 이 시점에서 세마포어를 사용하는 건가요?

  2. CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); 코드에서 threadId로 무언가를 구독합니다. 정확히 무엇을 구독하는지 잘 모르겠습니다. 구독 대상과 구독의 목적이 무엇인지 궁금합니다.

  3. tryLock 메서드의 아래쪽 코드를 보면 while (true)로 락을 반복적으로 시도하는 모습이 마치 스핀락처럼 보입니다. 저는 Redisson의 분산 락이 Lettuce의 스핀락과는 다르다고 생각했는데, 실제 구현을 보니 스핀락과 비슷한 방식으로 동작한다고 봐도 될까요?

RedissonLock 클래스의 tryLock 메서드

    @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        try {
            subscribeFuture.get(time, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.whenComplete((res, ex) -> {
                    if (ex == null) {
                        unsubscribe(res, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        } catch (ExecutionException e) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

PublishSubscribe 클래스

/**
 * Copyright (c) 2013-2022 Nikita Koksharov
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.redisson.pubsub;

import org.redisson.PubSubEntry;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.misc.AsyncSemaphore;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * 
 * @author Nikita Koksharov
 *
 */
abstract class PublishSubscribe<E extends PubSubEntry<E>> {

    private final ConcurrentMap<String, E> entries = new ConcurrentHashMap<>();
    private final PublishSubscribeService service;

    PublishSubscribe(PublishSubscribeService service) {
        super();
        this.service = service;
    }

    public void unsubscribe(E entry, String entryName, String channelName) {
        AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
        semaphore.acquire().thenAccept(c -> {
            if (entry.release() == 0) {
                entries.remove(entryName);
                service.unsubscribe(PubSubType.UNSUBSCRIBE, new ChannelName(channelName))
                        .whenComplete((r, e) -> {
                            semaphore.release();
                        });
            } else {
                semaphore.release();
            }
        });
    }

    public void timeout(CompletableFuture<?> promise) {
        service.timeout(promise);
    }

    public void timeout(CompletableFuture<?> promise, long timeout) {
        service.timeout(promise, timeout);
    }

    public CompletableFuture<E> subscribe(String entryName, String channelName) {
        AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
        CompletableFuture<E> newPromise = new CompletableFuture<>();

        semaphore.acquire().thenAccept(c -> {
            if (newPromise.isDone()) {
                semaphore.release();
                return;
            }

            E entry = entries.get(entryName);
            if (entry != null) {
                entry.acquire();
                semaphore.release();
                entry.getPromise().whenComplete((r, e) -> {
                    if (e != null) {
                        newPromise.completeExceptionally(e);
                        return;
                    }
                    newPromise.complete(r);
                });
                return;
            }

            E value = createEntry(newPromise);
            value.acquire();

            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                oldValue.acquire();
                semaphore.release();
                oldValue.getPromise().whenComplete((r, e) -> {
                    if (e != null) {
                        newPromise.completeExceptionally(e);
                        return;
                    }
                    newPromise.complete(r);
                });
                return;
            }

            RedisPubSubListener<Object> listener = createListener(channelName, value);
            CompletableFuture<PubSubConnectionEntry> s = service.subscribeNoTimeout(LongCodec.INSTANCE, channelName, semaphore, listener);
            newPromise.whenComplete((r, e) -> {
                if (e != null) {
                    s.completeExceptionally(e);
                }
            });
            s.whenComplete((r, e) -> {
                if (e != null) {
                    value.getPromise().completeExceptionally(e);
                    return;
                }
                value.getPromise().complete(value);
            });

        });

        return newPromise;
    }

    protected abstract E createEntry(CompletableFuture<E> newPromise);

    protected abstract void onMessage(E value, Long message);

    private RedisPubSubListener<Object> createListener(String channelName, E value) {
        RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {

            @Override
            public void onMessage(CharSequence channel, Object message) {
                if (!channelName.equals(channel.toString())) {
                    return;
                }

                PublishSubscribe.this.onMessage(value, (Long) message);
            }
        };
        return listener;
    }

}

 

감사합니다.

 

 

답변 2

0

신동현님의 프로필 이미지
신동현
지식공유자

안녕하세요, Java를 주력 언어로 사용하고 있지 않아서 정확한 내용은 아닐 수도 있지만, 코드를 보고 저의 생각을 남겨봅니다. Redisson은 전반적으로 Pub/Sub과 세마포어를 이용해 Redis 서버에 대한 부하를 줄이려는 방식으로 디자인 된 것 같습니다.

1. 세마포어는 일반적으로 동시성 문제를 제어하기 위해 사용하는데요. Redis 서버로 너무 많은 요청을 보내는 것을 방지하기 위해 애플리케이션 레벨에서 요청량을 제한하기 위해 사용하는 것으로 추측됩니다. 세마포어에 대한 자세한 내용은 링크를 참고 부탁드립니다.

2. Redis Pub/Sub 채널을 통해 락이 해제되는 이벤트를 구독하는 것 같습니다.

3. while 문을 통해 일정 기간 동안 락 획득을 반복적으로 시도하지만 스핀락과는 조금 다릅니다. 일반적으로 스핀락이라고 하면 락 획득을 성공할 때까지 계속 시도하는 반면에 여기서는 제한된 시간 안에서만 반복적으로 락 획득을 시도하다가 실패하면 false를 반환합니다.

 

감바스님의 프로필 이미지
감바스
질문자

안녕하세요 강사님 우선 답변 감사합니다. 추가적인 질문이 있어 이렇게 댓글을 남겼습니다.

  1. 얘기해주신 내용과 첨부해주신 링크를 보고 든 생각은 redis는 싱글 스레드로 동작하기 때문에 만약 너무 많은 요청이 오면 병목이 발생할거 같다고 생각 했습니다. 특히 분산 락 같은 경우에는 락을 얻기 위해 많은 요청이 오기에 redisson은 이러한 부분을 위해 세마포어 개념을 활용해서 요청 스레드가 특정 개수 이하가 되도록 관리한다 라고 이해를 했는데 이러한 이해가 맞게 이해를 한건지 궁금합니다.

  2. 1번 질문과 연관이 있는데요 세마포어 같은 경우에는 특정 개수 이하의 스레드들은 공유 자원에 접근이 가능하다고 알고 있습니다. 그런데 분산 락 같은 경우에는 락을 하나만 얻어야 하지 않나요? 세마포어를 활용하면 여러 스레드가 동시에 분산 락을 얻어버리는게 아닌지 궁금합니다.

     

신동현님의 프로필 이미지
신동현
지식공유자

답변 드립니다.

  1. 네, 맞습니다.

  2. 세마포어는 Java 애플리케이션에서 Redis로 락 획득 요청을 보내는 쓰레드 개수를 제한하는 역할을 합니다. 세마포어를 획득했다고 모두 레디스 분산락 획득이 가능한 것은 아니고, 세마포어를 획득한 쓰레드 중 1개만 레디스 분산락을 획득할 수 있습니다.

0

인프런 AI 인턴님의 프로필 이미지

안녕하세요, 인프런 AI 인턴입니다.
감바스님께서 질문하신 Redisson의 분산 락 구현과 관련된 여러 가지 질문에 대해 설명하도록 하겠습니다.

1. 세마포어의 역할

PublishSubscribe 클래스의 subscribe 메서드에서 사용되는 세마포어는 락 메커니즘의 일환으로, 동시성 제어를 위해 사용됩니다. 여러 스레드가 동일한 리소스에 접근하려고 할 때, 세마포어는 특정 순간에 허용되는 스레드 수를 제한함으로써 Race Condition 등의 문제를 방지합니다. 세마포어를 사용하는 이유는 네트워크 및 클라우드 환경에서 다수의 구독과 해지가 빈번히 발생할 수 있으며, 이러한 과정에서 안정적인 리소스 관리를 위해 필요합니다.

2. CompletableFuture의 구독 대상과 목적

CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); 코드에서 threadId를 사용하여 구독하는 것은 특정 스레드가 락 상태 변화를 비동기적으로 감지할 수 있도록 설정하는 과정입니다. RedissonLockEntry는 특정 스레드가 락을 얻으려고 대기하는 동안 상태 정보를 보관하게 됩니다. 구독의 목적은 스레드가 분산 환경에서 락 획득 요청의 결과를 기다리고, 락 해제 등을 감지하여 적절한 동작을 수행할 수 있도록 하는 것입니다.

3. 스핀락 유사성

tryLock 메서드에서 while (true) 루프는 락의 상태를 반복적으로 확인함으로써, 스레드가 락을 얻을 수 있도록 대기하는 구조를 구현합니다. 이는 CPU 사이클을 지속적으로 소모하는 스핀락과 유사해 보이지만, Redisson의 구현은 네트워크 기반의 Pub/Sub 메커니즘을 활용하여 락 상태 변화를 비동기적으로 통지받을 수 있게 되어 있습니다. 따라서 비효율적인 Busy Waiting을 방지하고, 효율적으로 락을 관리합니다.

추가적인 자료와 유사한 질문에 대한 게시물을 통해 더 깊이 있는 이해를 도울 수 있습니다:

저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.