본문 바로가기
Algorithm/학습

[Python] PriorityQueue vs heapq

by famo1245 2025. 2. 17.

문제 상황


[Python] 백준 - 1753번 최단경로(3)

 

[Python] 백준 - 1753번 최단경로(3)

이전글[Python] 백준 - 1753번 최단경로(2) [Python] 백준 - 1753 최단경로(2)이전글[Python] 백준 - 1753번 최단경로(1) [Python] 백준 - 1753번 최단경로(1)문제백준 1753번최종 코드import sysimport mathfrom collections im

devfromyoung.tistory.com

 

이전글에서 백준에서 pypy3 환경으로 제출했을 때 Priority Queue 사용했을 때 런타임 에러가 발생했었다.

(한참 전에 조사했었지만..)

찾아보니 예전에는 백준에서 python3 환경으로 제출했을 때 Priority Queue 사용이 막혀있었고 현재는 pypy3 환경에서 런타임 에러가 발생하는 것 같았다.

관련 글

 

ProrityQueue? heapq?


 

그렇다면 비슷한 기능을 하는 두 라이브러리의 차이점이 무엇인지 궁금해져 내부 코드를 보았다.

 

우선 해당 라이브러리에서 Priority Queue의 구현을 찾아봤다.

class PriorityQueue(Queue):

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        heappush(self.queue, item)

    def _get(self):
        return heappop(self.queue)

 

PriorityQueue class 또한 내부적으로 heapq의 메서드를 사용하고 있었고 처음에는 'heapq 보다 더 많은 기능을 제공하기 위해서인가?'라는 생각이 들었다.

그러던 중 PriorityQueue가 Queue class를 상속받고 있다는 점을 발견하고 Queue class의 구현을 자세히 들여다보게 되었다.

 

아래는 Queue class의 구현 중 일부이다.

class Queue:
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

	...

    def put(self, item, block=True, timeout=None):
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item
    
    ...

 

Queue class 내부에서 _put()_get() 메서드를 사용하고 있고 PriorityQueue class에서 이를 override 했던 것이다. 또한 __init__() 메서드를 보면 mutex lock과 3가지 condition 객체를 생성하고 이를 put()get() 메서드에서 사용하고 있다는 것을 알 수 있었다. 따라서 PriorityQueue class는 단순히 heapq 라이브러리의 기능 확장이 아닌, 동기화 메커니즘을 이용한 thread safe한 자료구조라는 것을 알았다.

 

PriorityQueue를 사용한 경우
heapq를 사용한 경우

 

이전에 백준 1753번 문제를 풀 때 PriorityQueue와 heapq 둘 다 사용하여 풀었었다. 이때 PriorityQueue를 이용한 경우 시간이 더 걸렸었고 이유가 궁금했었는데, 결국 해당 class의 동기화 과정때문에 발생한 오버헤드때문에 생긴 시간 차이라는 것을 알 수 있었다.

 

멀티 스레드 환경에서 개발을 한다면 PriorityQueue를 고려하겠지만, 알고리즘 문제를 풀 때는 사실 멀티 스레드 환경을 고려할 필요가 없으므로 앞으로는 더 빠른 heapq를 사용할 것 같다!

 

틀린 부분 또는 궁금한 점은 댓글로 남겨주세요! 항상 환영합니다!