Implementing an In-memory Queue 🧑‍💻

Tech Nov 4, 2022

A queue is a simple data structure, but it has a vast range of applications: messaging systems, CPU scheduling, spooling ...😵‍💫

Yeah the list goes on... So let's move on.

Hey there! I am Sumit Badsara, a Backend Engineer at FamPay. And today we are going to implement an in-memory queue!

Problem

We use celery and Redis to execute async tasks, and we recently ran into an issue with Redis: when we pushed a lot of tasks in a short period, some of them were dropped due to connection errors. To understand the problem better, let's first walk through the flow of celery tasks:

We have 3 main entities to understand:

Producer: Service which pushes task(s) to broker/queue.

Broker: Service that provides a storage solution for tasks (e.g. Redis, RabbitMQ)

Consumer: Service which will consume/execute the task(s) (e.g. celery workers)

This is a very abstract overview of how things happen. Our main process (the Django server) is where all the real stuff happens. The producer (in this case the main process) sends async tasks to the broker, and when consumers learn about a task (they poll Redis for new tasks), they pick it up and execute it.
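
To make this concrete, here is a minimal sketch of the producer side. The app name, task name, and broker URL are assumptions for illustration, not from our codebase:

from celery import Celery

app = Celery("myapp", broker="redis://localhost:6379/0")

@app.task
def send_welcome_email(user_id):
	# runs later on a celery worker (the consumer)
	print(f"sending welcome email to user {user_id}")

# the producer (e.g. a Django view) pushes the task to the broker:
send_welcome_email.delay(user_id=42)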

Now, what's the problem? Let's try to answer some simple questions:

What if a consumer crashes and is unable to process tasks?

Simple: Redis has persistent storage (unless you deliberately configure it otherwise). So, if consumers are down, Redis still knows which tasks haven't been picked up yet. Once consumers are back up, they can fetch the pending tasks again from the broker.

... but what if the broker crashes?

Well...that's the interesting part. If the broker goes down, the consumers have nowhere to fetch tasks from, and the producer has nowhere to push tasks to. Now, if we try to push tasks to Redis, we will get this error:

OperationalError: Error 61 connection to $REDIS_URL:6379. Connection refused.

This is a problem!
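
You can reproduce this easily: with Redis stopped, a plain .delay() call raises, and the task is lost unless the caller handles it. A hedged sketch, reusing the hypothetical task from above:

from celery.exceptions import OperationalError

try:
	send_welcome_email.delay(user_id=42)
except OperationalError:
	# broker unreachable: without a fallback, this task is simply gone
	pass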

Solution

So, after brainstorming this issue for some time, we realized one way out: send tasks to Redis only when it is up, and hold them back until then. Once Redis can receive tasks again, most of our problems are sorted.

Now, the new problem is to send tasks only when Redis is up 📮. That is doable, but what would be the best way to go about this? An intermediate in-memory queue? Well, why not? Let's see what happens if we add an in-memory queue before Redis.

Tasks are added to the in-memory queue, which forwards them to Redis (celery does that part for us | refer: apply_async, delay) only when it can. Otherwise, we can retry sending them.

Pros:

  • We have a central location through which all tasks pass, so we can customize even more things: for example, retrying a high-priority task more times than a low-priority one, which can simply be dropped if the broker is down.
  • We can batch-process tasks through the queue and add any kind of logic in the consumer; for example, process every 10 seconds, or process once 100 tasks are in the queue (see the sketch after this list).
  • If Redis goes down, we are in a much better place as we won't be losing tasks due to connection errors.
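
For the batching idea in particular, here is a minimal sketch; the function name and thresholds are illustrative, not part of the final implementation:

import queue
import time

def drain_batch(q, batch_size=100, interval=10.0):
	"""
	Collect tasks until either `batch_size` items are gathered or
	`interval` seconds have passed, whichever comes first.
	"""
	batch = []
	deadline = time.monotonic() + interval
	while len(batch) < batch_size and time.monotonic() < deadline:
		try:
			batch.append(q.get(block=True, timeout=0.1))
		except queue.Empty:
			continue  # keep waiting until the deadline
	return batch  # push these to the broker in one go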

Cons:

  • If Redis is down for a long period of time, tasks can pile up and start consuming more RAM (a basic task takes roughly 1-10KB, so ~1GB holds on the order of 10^5 tasks), ultimately leading to server unresponsiveness. [Can still be handled through threshold warnings; see the sketch after this list]
  • Restarting the server will be blocked by the in-memory consumer until all tasks have been sent to Redis (if graceful handling is implemented; otherwise you lose all those tasks).
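
The threshold warning mentioned above could be as simple as checking the queue's size before enqueueing; the threshold value and helper name here are illustrative:

import logging

QUEUE_WARN_THRESHOLD = 50_000  # ~0.5GB at ~10KB per task; tune to your RAM budget

def enqueue_with_warning(client, task):
	# qsize() is approximate under concurrency, but fine as a warning signal
	if client.queue.qsize() > QUEUE_WARN_THRESHOLD:
		logging.warning("in-memory queue backlog: %d tasks", client.queue.qsize())
	client.enqueue(task)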

Implementation

Cool! Now let's get our hands dirty and implement the idea that we discussed above.

First of all, we need the basic stuff - a global queue which lives in memory until the main process is stopped. We can create a class for this; a basic one would look something like this:

import queue

class Client(object):

	def __init__(self):
		# bounded queue: at most 1000 pending tasks live in memory
		self.queue = queue.Queue(maxsize=1000)

	def enqueue(self, task_name):
		message = {
			"task_name": task_name
		}

		try:
			# block=False: fail fast instead of blocking the caller
			self.queue.put(message, block=False)
			return True, message
		except queue.Full:
			# let the caller decide what to do with an overflowing queue
			raise

Cool! That's a minimal client we can work with. An instance of this client maintains a queue of size 1000 and provides an enqueue method to push tasks into the queue.
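
Using it looks like this; since the queue is bounded, callers may want to handle queue.Full themselves (dropping with a warning is just one possible policy, and my_task is a placeholder for a celery task):

import logging
import queue

client = Client()

try:
	client.enqueue(my_task)  # my_task stands in for a celery task
except queue.Full:
	logging.warning("in-memory queue is full, dropping task")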

But we need only one queue globally for our use case, and hence a single instance should be maintained throughout the application. Now here's a small concern: how do we make sure a single global instance of this client is maintained? Maybe we can initialize it when the main process starts? Yes, we can do that, but there is a simpler and better solution.

We will enqueue these tasks through a function which makes sure a global instance of this client exists, creating one otherwise; so the client gets initialized when the first task is enqueued.

from .client import Client

# single global client, created lazily on first use
default_client = None

__all__ = ["add_to_queue"]

def add_to_queue(task_name):
	"""
	Enqueues the task in the in-memory queue if a global instance of
	the client exists; otherwise initializes a single global instance
	and then pushes the task.
	"""
	global default_client

	if not default_client:
		default_client = Client()

	default_client.enqueue(task_name)

Nice! That looks good: we are maintaining a global client, and this is the only entry point for tasks into the queue. We can now add tasks to the queue simply by invoking add_to_queue(task) .
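
One caveat, as a hedged aside: if add_to_queue can be called from multiple threads, the check-then-create above can race and briefly create two clients. A lock closes that gap; this variant is a sketch, not part of the implementation above:

import threading

_client_lock = threading.Lock()

def add_to_queue(task_name):
	global default_client
	if not default_client:
		with _client_lock:
			# re-check inside the lock: another thread may have won the race
			if not default_client:
				default_client = Client()
	default_client.enqueue(task_name)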

Just a sec! Who is sending these tasks to Redis?

Well, we need an in-memory consumer for that (not to be confused with celery workers, which consume Redis tasks), who will consume tasks from the queue and send them to Redis. So, let's create a minimal consumer too:

import logging
from queue import Empty
from threading import Thread
from celery.exceptions import OperationalError
from time import sleep

class Consumer(Thread):
	"""
	A consumer which runs as a daemon thread.
	It consumes tasks from the in-memory queue and sends them to Redis.
	"""
	def __init__(self, queue):
		Thread.__init__(self)
		self.daemon = True
		self.running = True
		self.queue = queue

	def pause(self):
		"""
		Signals the run loop to stop after the current drain.
		"""
		self.running = False

	def run(self):
		"""
		Overriding the run method of Thread: whenever the thread is
		started, run takes control over the execution of the thread.
		"""
		while self.running:
			self.process_queue()

	def process_queue(self):
		"""
		Drains the queue: processes tasks, marks them as done and
		dequeues them. Returns once the queue is empty so that run()
		can re-check self.running.
		"""
		while True:
			try:
				# block briefly so an empty queue doesn't busy-spin the CPU
				item = self.queue.get(block=True, timeout=0.1)
			except Empty:
				# nothing to consume right now; yield control back to run()
				break
			try:
				self.process_task(item)
			except Exception:
				# a failing task must not kill the consumer thread
				logging.exception("failed to process task")
			finally:
				self.queue.task_done()

	def process_task(self, item):
		"""
		Sends the task to Redis if it's up, otherwise retries 3 times.
		Even after the retries, if it fails to push the task to Redis,
		it raises OperationalError.
		"""
		task_name = item["task_name"]
		for retry in range(3):
			try:
				return task_name.apply_async()
			except OperationalError:
				if retry == 2:
					raise
				sleep(2)  # retry delay; can be tuned to your requirements

We have created a consumer which inherits from Thread, so it runs on a different thread from MainThread. process_task and process_queue are self-explanatory, so let's talk about the other properties/functions.

  • run() - We are overriding Thread's run function to indefinitely run this consumer and consume tasks from the in-memory queue. Once the thread is started, the run function takes control.
  • self.daemon = True - This means the thread runs in the background, in a non-blocking way with respect to MainThread.
  • task_name.apply_async() - This simply sends the task to Redis; alternatively, you can execute the task right here by replacing this with task_name() .

Finally! Everything seems to be in place; now we just need to tell the client to run this consumer. Ideally, we should run the consumer when the client starts, so it totally makes sense to initialize it in __init__ of Client.

import atexit

class Client(object):

	def __init__(self):
		# All the previous code (refer above)
		...
		self.consumer = Consumer(self.queue)
		self.consumer.start()
		# flush pending tasks before the interpreter shuts down
		atexit.register(self.join)

	def join(self):
		"""
		Stops the consumer and waits for it to finish its current drain.
		"""
		self.consumer.pause()
		self.consumer.join()

Yay 🎉 ! That's all we need to run this queue; now we can add tasks to the queue, and the consumer should fetch and execute them.

(If you didn't take a nap through this blog😪, it should work 😉 )

But why do we need all these fancy atexit.register() and self.join() functions?

This is an instruction to the Client class: if your instance is getting destroyed (MainThread is stopping, in this scenario, since it's a global instance), run the self.join function.

What does join do? It joins the consumer thread into the main thread, i.e., the main thread blocks until the consumer finishes flushing pending tasks. So now, that's what we call a graceful exit.
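
Putting it all together, a toy run might look like this (the module path and task are assumptions carried over from the earlier sketches):

from myapp.memqueue import add_to_queue  # hypothetical module holding the code above
from myapp.tasks import send_welcome_email  # hypothetical celery task

for user_id in range(50):
	add_to_queue(send_welcome_email)

# On interpreter exit, atexit triggers Client.join(): pause() tells the
# consumer to stop after its current drain, and join() blocks the main
# thread until the remaining tasks have been flushed to Redis.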

P.S. We took inspiration from Segment's library. Kudos to them for such an elegant implementation! 😍
