Recently, I have been re-learning Python in my spare time, and I want to write a blog post (or a few) to document the things that are new to me.

Stack

Surprisingly, there’s no dedicated stack type in Python. Instead, there are three common ways to implement one. Check out How to Implement a Python Stack:

  • list
  • collections.deque
  • queue.LifoQueue

List

A list is backed by a contiguous block of memory, so appending to and popping from the end are amortized O(1).

To implement a stack:

>>> stack = []
>>> stack.append(1)
>>> stack.append(2)
>>> stack.pop()
2
>>> stack.pop()
1
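To make the push/pop/peek vocabulary explicit, here is a minimal sketch of a list-backed stack. The class name ListStack and its method names are my own for illustration; they are not part of the standard library.

```python
class ListStack:
    """A thin, hypothetical wrapper that exposes a list as a stack."""

    def __init__(self):
        self._items = []

    def push(self, item):
        # Appending at the end of a list is amortized O(1).
        self._items.append(item)

    def pop(self):
        # Raises IndexError if the stack is empty, just like list.pop().
        return self._items.pop()

    def peek(self):
        # Look at the top item without removing it.
        return self._items[-1]

    def __len__(self):
        return len(self._items)


stack = ListStack()
stack.push(1)
stack.push(2)
top = stack.peek()                    # 2, still on the stack
popped = [stack.pop(), stack.pop()]   # [2, 1]
```

In practice a plain list is usually enough; the wrapper only hides the operations that would break the stack discipline, such as inserting in the middle.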

Deque

In CPython, a deque is implemented as a doubly linked list of fixed-size blocks, so appending and popping at either end are O(1).

>>> from collections import deque
>>> stack = deque()
>>> stack.append(1)
>>> stack.append(2)
>>> stack.pop()
2
>>> stack.pop()
1

Check out the deque methods to learn more.

method         description
append(x)      add x to the right side (end) of the deque
appendleft(x)  add x to the left side (front) of the deque
pop()          remove and return an element from the right side (end) of the deque
popleft()      remove and return an element from the left side (front) of the deque
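As a quick illustration of the four methods above, here is a small sketch that works on both ends of the same deque. The variable names are arbitrary.

```python
from collections import deque

d = deque([2, 3])
d.append(4)          # right side: deque([2, 3, 4])
d.appendleft(1)      # left side:  deque([1, 2, 3, 4])

right = d.pop()      # removes and returns 4 from the right
left = d.popleft()   # removes and returns 1 from the left
remaining = list(d)  # [2, 3]
```

For a stack, only append() and pop() are needed; the left-side methods are what make a deque usable as a FIFO queue as well.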

LIFO Queue

It is a little weird to call a stack a LIFO queue, but it is what it is.

What surprises me is that, in Python, the classes in the queue module are thread safe!

We will get back to this point in the Queue section later.

>>> from queue import LifoQueue

>>> stack = LifoQueue()
>>> stack.put(1)
>>> stack.put(2)
>>> stack.get()
2
>>> stack.get()
1
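One practical difference from a list is worth sketching: calling get() on an empty LifoQueue blocks by default, whereas list.pop() raises IndexError immediately. The snippet below uses get_nowait(), which raises queue.Empty instead of waiting. The outcome variables are only there to make the result visible.

```python
from queue import LifoQueue, Empty

stack = LifoQueue()
stack.put(1)
first = stack.get_nowait()   # 1

try:
    stack.get_nowait()       # the stack is empty now
    outcome = "got item"
except Empty:
    outcome = "empty"

# For comparison, popping from an empty list raises IndexError right away.
try:
    [].pop()
    list_outcome = "got item"
except IndexError:
    list_outcome = "IndexError"
```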

Queue

As mentioned above, Queue is thread safe. In languages like Java, we would need to use a BlockingQueue to get the same behavior.

You can also create a PriorityQueue and put tuples of (priority_number, data) into it; the entry with the lowest priority number is retrieved first.
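Here is a minimal sketch of the tuple pattern with PriorityQueue. The task strings are made-up sample data.

```python
from queue import PriorityQueue

pq = PriorityQueue()
pq.put((2, "write tests"))
pq.put((1, "fix bug"))
pq.put((3, "update docs"))

order = []
while not pq.empty():
    priority, task = pq.get()   # lowest priority number comes out first
    order.append(task)

# order is now ["fix bug", "write tests", "update docs"]
```

One caveat: when two entries share the same priority number, the tuples are compared on their next element, so the data itself must be comparable, or a tie-breaking counter has to be added to the tuple.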

Here are some useful methods:

  • put(item): blocks by default (block=True) until a free slot is available
  • get(): blocks by default (block=True) until an item is available
  • task_done(): call this once for each item returned by get(), after its processing is done
  • join(): blocks until all items in the queue have been retrieved and processed
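To see the blocking behavior of these methods without hanging the interpreter, here is a small single-threaded sketch. It assumes a bounded queue (maxsize=1) and a short timeout, both chosen only for illustration.

```python
from queue import Queue, Empty, Full

q = Queue(maxsize=1)
q.put("a")                     # fills the only slot

try:
    q.put("b", timeout=0.1)    # block=True: waits up to 0.1s for a free slot
    put_result = "stored"
except Full:
    put_result = "full"

item = q.get()                 # "a"
q.task_done()                  # one task_done() per item taken with get()

try:
    q.get(block=False)         # non-blocking: raises Empty immediately
    get_result = "got item"
except Empty:
    get_result = "empty"

q.join()                       # returns at once: every queued item was marked done
```

Without the timeout, the second put() would wait forever here, because no other thread is taking items out of the queue.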

Here’s a toy example that I got from the queue docs:

from threading import Thread, current_thread
from queue import Queue
import time
import random

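def worker(q: Queue):
    while True:
        # Blocks until an item is available.
        item = q.get()
        print(f'{current_thread().name} is working on item {item}')
        # Simulate a variable amount of work.
        time.sleep(random.random())
        print(f'{current_thread().name} finished item {item}')
        # Tell the queue that this item has been fully processed.
        q.task_done()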

if __name__ == "__main__":
    q = Queue()
    for i in range(30):
        q.put(i)

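    # Daemon threads do not keep the program alive, so the endless
    # worker loops end when the main thread exits.
    for _ in range(5):
        t = Thread(target=worker, args=(q,), daemon=True)
        t.start()

    # Block until task_done() has been called for every item.
    q.join()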
    print("All work completed")