Building a job queue
What even is a job queue?
Let’s break this down into its pieces: a “job” and a “queue”.
A job is just an arbitrary unit of work, basically some code or procedure you want to run.
Fundamentally, a queue is a data structure that works in a first-in, first-out (FIFO) manner. There’s actually nothing about it specific to jobs, and it can be used for all sorts of things.
So then a job queue is just a way to tee up jobs and process them in the order they were added.
That’s the core of it!
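If it helps to see that in code, here’s a tiny in-memory illustration of FIFO ordering using Python’s collections.deque. This isn’t part of what we’ll build (we’ll use Redis for our queue), it’s just to ground the concept:
from collections import deque

queue = deque()
queue.append("first job")   # enqueue at the back
queue.append("second job")
print(queue.popleft())  # "first job" comes out first
print(queue.popleft())  # then "second job"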
So why use a job queue?
There are a few reasons why you might want to use a job queue:
- you want to run tasks in the background or async, and don’t want to block your main application
- you expect a high volume of work and want to prepare for eventual scaling
- you want to prioritize certain items over others based on some criteria
Any of these, or any other reason, is totally valid.
Implementing a job queue
Ok, let’s go ahead and build one! We’ll keep it very simple.
Initial setup
First, install Homebrew using these instructions.
Then install Python 3.11 or above by running brew install python in a terminal (we’ll use enum.StrEnum later, which requires Python 3.11).
Install the Python package manager uv using the official instructions.
We’ll also need Docker, so please install it using the official instructions.
Use any workspace directory you’d like, and start a uv project:
cd /path/to/your/workspace
uv init simple-job-queue
Redis
We’re going to use Redis as our job queue. Run Redis using Docker:
docker run --name redis -d -p 6379:6379 redis
Then, we’ll need the Redis client for Python. Use uv to install it:
uv add "redis[hiredis]"
We’ll also need to tell the client how to connect to Redis. Create a .env file with the following contents:
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
Also install python-dotenv to be able to load our environment variables:
uv add python-dotenv
We’re going to create a small Python wrapper around the Redis client. Let’s create a file called redis_wrapper.py.
We’ll basically just load our env vars and connect to Redis.
redis_wrapper.py (1)

import os

import redis


class RedisWrapper(object):
    def __init__(self):
        redis_host = os.getenv("REDIS_HOST")
        redis_port = os.getenv("REDIS_PORT")
        redis_db = os.getenv("REDIS_DB")

        if not redis_host:
            raise ValueError("REDIS_HOST is not set")
        if not redis_port:
            raise ValueError("REDIS_PORT is not set")
        if not redis_db:
            raise ValueError("REDIS_DB is not set")

        self.client = redis.Redis(
            host=redis_host,
            port=int(redis_port),
            db=int(redis_db),
            decode_responses=True,
        )
While we’re here, let’s make this a singleton for ease of use.
We’ll add a get_instance method to the class and also add a private class variable _instance to store the singleton instance.
redis_wrapper.py (2)

class RedisWrapper(object):
    _instance = None

    # ... previous contents

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance
Great! You can now use this wrapper to connect to Redis if you wish.
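For example, a quick connectivity check might look like this (assuming the Redis container from above is running and your .env file is in place):
from dotenv import load_dotenv

from redis_wrapper import RedisWrapper

load_dotenv()  # load REDIS_HOST, REDIS_PORT, REDIS_DB from .env

client = RedisWrapper.get_instance().client
print(client.ping())  # True if the connection works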
Job data
Ok, now let’s define what a job looks like.
We’ll use Pydantic to define a structure. Install it using uv add pydantic.
Create a file called job_data.py.
Let’s define the structure like so. We’ll need to know what status the job is in, when the job was defined, when it started executing, and when it completed.
We’ll also create an arbitrary payload to store any data we need during job execution.
job_data.py (1)

from pydantic import BaseModel
from datetime import datetime
from enum import StrEnum
from typing import Any


class JobStatus(StrEnum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"


class JobData(BaseModel):
    id: str
    status: JobStatus
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    payload: dict[str, Any]
Looking a bit ahead, we’ll also need to make this structure easily serializable to JSON and back.
We can do this by adding the following methods:
job_data.py (2)

from pydantic import BaseModel, field_serializer, field_validator
import json

# ... previous imports


class JobData(BaseModel):
    # ... previous contents

    @field_serializer("payload", when_used="json")
    def serialize_payload(self, payload: dict[str, Any]):
        return json.dumps(payload)

    @field_validator("payload", mode="before")
    def deserialize_payload(cls, value: str | dict[str, Any]):
        if isinstance(value, str):
            return json.loads(value)
        return value
Cool, we’ve got our job data structure!
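To see the serializer and validator in action, here’s a quick round trip you could try (the ID and payload values are just placeholders):
from datetime import datetime

from job_data import JobData, JobStatus

job = JobData(
    id="demo-123",
    status=JobStatus.PENDING,
    created_at=datetime.now(),
    payload={"task_name": "send_email", "to": "user@example.com"},
)

# In JSON mode, every field becomes a string-friendly value,
# and the payload is serialized to a JSON string
flat = job.model_dump(mode="json")
print(flat["payload"])  # '{"task_name": "send_email", "to": "user@example.com"}'

# The "before" validator parses that string back into a dict
restored = JobData.model_validate(flat)
print(restored.payload["to"])  # user@example.com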
Job store
Now let’s create a job store to persist our jobs. This is a bit separate from the job queue itself. It’s responsible for tracking jobs and their metadata.
It’s important we have this because jobs will not live forever in the queue and because we want to minimize the amount of data we store in the queue.
We’re going to reuse Redis for this, and use a concept called a “hash”.
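If hashes are new to you, a Redis hash is essentially a map of string fields to string values stored under a single key. Here’s a quick illustration using our wrapper (assuming the Redis container is running; the key and fields are made-up examples):
from dotenv import load_dotenv

from redis_wrapper import RedisWrapper

load_dotenv()
client = RedisWrapper.get_instance().client

client.hset("job_data:example", mapping={"id": "example", "status": "pending"})
print(client.hgetall("job_data:example"))  # {'id': 'example', 'status': 'pending'}
client.delete("job_data:example")  # clean up the demo key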
Let’s create a file called job_store.py. We’ll create a class called JobStore that will handle all the job storage logic.
We can reuse the Redis wrapper we created earlier!
# JobData, JobStatus, and datetime are used by the methods we'll add below
from datetime import datetime

from job_data import JobData, JobStatus
from redis_wrapper import RedisWrapper


class JobStore:
    def __init__(self):
        self._redis = RedisWrapper.get_instance()
Now let’s add a method to create a job. Note that we have to drop any None fields, because a Redis hash can’t store None values.
def create_job(self, job_data: JobData) -> None:
    self._redis.client.hset(
        name=f"job_data:{job_data.id}",
        mapping=job_data.model_dump(mode="json", exclude_none=True),
    )
We’ll add another method to get a job by ID and one to update the job’s status. While we’re doing this, let’s factor out the job key prefix building.
job_store.py (1)

JOB_DATA_KEY_PREFIX = "job_data"


class JobStore:
    # ... existing contents

    def _get_job_key(self, job_id: str) -> str:
        return f"{JOB_DATA_KEY_PREFIX}:{job_id}"

    def get_job(self, job_id: str) -> JobData | None:
        job_data = self._redis.client.hgetall(name=self._get_job_key(job_id))
        if not job_data:
            return None
        return JobData.model_validate(job_data)

    def update_job_status(self, job_id: str, status: JobStatus) -> None:
        self._redis.client.hset(
            name=self._get_job_key(job_id),
            key="status",
            value=status.value,
        )
Finally let’s add methods to mark when a job started and when it completed.
job_store.py (2)

class JobStore:
    # ... existing contents

    def set_job_started_at(self, job_id: str, started_at: datetime) -> None:
        self._redis.client.hset(
            name=self._get_job_key(job_id),
            key="started_at",
            value=started_at.isoformat(),
        )

    def set_job_completed_at(self, job_id: str, completed_at: datetime) -> None:
        self._redis.client.hset(
            name=self._get_job_key(job_id),
            key="completed_at",
            value=completed_at.isoformat(),
        )
Ok, there we go! We’ve got our job store!
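As a quick smoke test, you could exercise the store like this (assuming Redis is running; the ID and payload are placeholders):
from datetime import datetime

from dotenv import load_dotenv

from job_data import JobData, JobStatus
from job_store import JobStore

load_dotenv()
store = JobStore()

store.create_job(
    JobData(
        id="demo-123",
        status=JobStatus.PENDING,
        created_at=datetime.now(),
        payload={"task_name": "send_email"},
    )
)
store.update_job_status("demo-123", JobStatus.COMPLETED)
print(store.get_job("demo-123"))  # JobData with status=completed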
Job queue
This is the key ingredient of our work here. Recall that the queue has to process jobs in a first-in, first-out order.
We’re going to use Redis for this and use the list data structure.
When we enqueue a job, we’ll add it to the “back of the line” and when we pick up a job to process, we’ll remove it from the “front of the line”.
What is front and what is back? It’s arbitrary as long as we’re consistent. Let’s use the left-hand side of the list as the back and the right-hand side as the front.
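Here’s what that convention looks like with raw list commands, going through our wrapper (assuming Redis is running; the key and job IDs are made up):
from dotenv import load_dotenv

from redis_wrapper import RedisWrapper

load_dotenv()
client = RedisWrapper.get_instance().client

client.lpush("demo_queue", "job-1")  # job-1 joins at the back (left)
client.lpush("demo_queue", "job-2")  # job-2 lines up behind it
print(client.rpop("demo_queue"))  # job-1 comes off the front (right) first
print(client.rpop("demo_queue"))  # then job-2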
We’ll create a file called job_queue.py.
job_queue.py (1)

from redis_wrapper import RedisWrapper

JOB_QUEUE_KEY = "job_queue"


class JobQueue:
    def __init__(self):
        self._redis = RedisWrapper.get_instance()

    def push_job(self, job_id: str) -> None:
        self._redis.client.lpush(
            JOB_QUEUE_KEY,
            job_id,
        )

    def pull_next_job(self) -> str | None:
        return self._redis.client.rpop(name=JOB_QUEUE_KEY)
That’s it! It’s not complicated at all, because Redis does all the heavy lifting for us and because we’ve offloaded metadata storage to the job store.
For utility, we can also add the following to know whether there are any jobs in the queue and peek at the next job to process. Redis LLEN and LINDEX will do the trick, respectively.
job_queue.py (2)

class JobQueue:
    # ... previous contents

    def peek_next_job(self) -> str | None:
        return self._redis.client.lindex(name=JOB_QUEUE_KEY, index=-1)

    def get_job_count(self) -> int:
        return self._redis.client.llen(name=JOB_QUEUE_KEY)
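As a quick check of the queue on its own (assuming Redis is running; "job-a" is just a placeholder ID):
from dotenv import load_dotenv

from job_queue import JobQueue

load_dotenv()
queue = JobQueue()

queue.push_job("job-a")
print(queue.peek_next_job())  # "job-a" -- still in the queue
print(queue.get_job_count())  # 1
print(queue.pull_next_job())  # "job-a" -- now removed from the queue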
Simulating job processing
Awesome, we have the essentials. Now let’s simulate job processing.
We’ll create a job producer and a job consumer. The producer will create jobs and enqueue them, and the consumer will dequeue them and process them.
Let’s create a file called producer.py. It has a few pre-baked job variations and will create jobs every 5 seconds.
producer.py

import uuid
import random
import time
from datetime import datetime
from typing import Any

from dotenv import load_dotenv

from job_data import JobData, JobStatus
from job_queue import JobQueue
from job_store import JobStore


class JobProducer:
    """Handles creating jobs and pushing them to the queue."""

    def __init__(self, job_store: JobStore, job_queue: JobQueue):
        self.job_store = job_store
        self.job_queue = job_queue

    def create_job(self, payload: dict[str, Any]) -> str:
        """Create a new job and add it to the queue."""
        job_id = str(uuid.uuid4())
        job = JobData(
            id=job_id,
            status=JobStatus.PENDING,
            created_at=datetime.now(),
            payload=payload,
        )
        self.job_store.create_job(job)
        self.job_queue.push_job(job_id)
        print(f"Created job with ID: {job_id}")
        return job_id


def main():
    """Run the producer to continuously create random jobs."""
    print("=== Job producer ===\n")
    print("Press Ctrl+C to stop.\n")

    load_dotenv()

    job_store = JobStore()
    job_queue = JobQueue()
    producer = JobProducer(job_store, job_queue)

    # Function to generate random job payloads
    def get_random_job() -> dict[str, Any]:
        job_type = random.randint(1, 5)

        # Email Notification
        if job_type == 1:
            user_id = random.randint(1000, 9999)
            email = f"user{user_id}@example.com"
            subjects = ["Hello", "Welcome", "Notification", "Alert", "Update"]
            subject = f"{random.choice(subjects)}_{random.randint(1, 100)}"
            return {"task_name": "send_email", "to": email, "subject": subject}
        # Data Processing
        elif job_type == 2:
            file_id = random.randint(1, 1000)
            file_types = ["csv", "json", "xml", "txt"]
            file_path = f"/tmp/data_{file_id}.{random.choice(file_types)}"
            return {"task_name": "process_data", "file_path": file_path}
        # Possibly Failing Job
        elif job_type == 3:
            should_fail = random.choice([True, False])
            severity = random.randint(1, 5)
            return {
                "task_name": "risky_operation",
                "should_fail": should_fail,
                "severity": severity,
            }
        # Image Resizing
        elif job_type == 4:
            width = random.choice([800, 1024, 1280, 1920])
            height = random.choice([600, 768, 1080])
            quality = random.randint(70, 100)
            return {
                "task_name": "resize_image",
                "width": width,
                "height": height,
                "quality": quality,
            }
        # Database Backup
        else:
            db_names = ["users", "products", "orders", "payments", "logs"]
            db_name = random.choice(db_names)
            backup_type = random.choice(["full", "incremental", "differential"])
            return {
                "task_name": "backup_db",
                "db_name": db_name,
                "backup_type": backup_type,
            }

    created_count = 0
    try:
        while True:
            # Get a random job with already randomized payload
            payload = get_random_job()

            # Create the job
            producer.create_job(payload)
            created_count += 1
            print(f"\nCreated job with payload: {payload}")
            print(f"Queue now has {job_queue.get_job_count()} job(s) waiting.")

            # Wait for 5 seconds before creating the next job
            time.sleep(5)
    except KeyboardInterrupt:
        print(f"\nProducer stopped. Created {created_count} job(s) in total.")


if __name__ == "__main__":
    main()
We’ll also create a file called consumer.py.
consumer.py

import random
import time
import signal
from datetime import datetime
from typing import Any

from dotenv import load_dotenv

from job_data import JobStatus
from job_queue import JobQueue
from job_store import JobStore


def task_handler(task_name: str, **kwargs: Any):
    print(f"Performing task: {task_name}")

    # Simulate work
    time.sleep(random.randint(2, 10))

    # Check for simulated failure
    if kwargs.get("should_fail"):
        raise Exception("Oh no, an error occurred!")


class JobConsumer:
    """Consumes jobs one at a time from the queue."""

    def __init__(self, job_store: JobStore, job_queue: JobQueue):
        self.job_store = job_store
        self.job_queue = job_queue
        self.running = False

    def process_job(self, job_id: str):
        job = self.job_store.get_job(job_id)
        if not job:
            print(f"Error: Job {job_id} not found in store")
            return

        print(f"\nProcessing job: {job_id}")
        print(f"Payload: {job.payload}")

        self.job_store.update_job_status(job_id, JobStatus.IN_PROGRESS)
        self.job_store.set_job_started_at(job_id, datetime.now())

        try:
            task_name = job.payload.pop("task_name", "unknown")
            task_handler(task_name, **job.payload)
        except Exception as e:
            self.job_store.update_job_status(job_id, JobStatus.FAILED)
            print(f"Job {job_id} failed: {str(e)}")
            return

        self.job_store.update_job_status(job_id, JobStatus.COMPLETED)
        self.job_store.set_job_completed_at(job_id, datetime.now())
        print(f"Job {job_id} completed successfully")

    def run(self, poll_interval_sec: float = 1.0):
        """Run the consumer in a loop, processing jobs as they come in."""
        self.running = True

        # Set up signal handling for graceful shutdown
        def handle_signal(sig, frame):
            print("\nShutting down consumer...")
            self.running = False

        signal.signal(signal.SIGINT, handle_signal)
        signal.signal(signal.SIGTERM, handle_signal)

        print(f"Starting job consumer. Polling every {poll_interval_sec} seconds.")
        print("Press Ctrl+C to stop.")
        print(f"Currently there are {self.job_queue.get_job_count()} jobs waiting")

        jobs_processed = 0
        waiting_in_empty = False
        while self.running:
            job_id = self.job_queue.pull_next_job()
            if job_id:
                waiting_in_empty = False
                self.process_job(job_id)
                jobs_processed += 1
            else:
                if not waiting_in_empty:
                    print("\nNo jobs in queue. Waiting...")
                    waiting_in_empty = True
                time.sleep(poll_interval_sec)

        print(f"\nConsumer stopped after processing {jobs_processed} jobs")


def main():
    """Run the consumer to process jobs from the queue."""
    print("=== Job consumer ===\n")

    load_dotenv()

    job_store = JobStore()
    job_queue = JobQueue()
    consumer = JobConsumer(job_store, job_queue)
    consumer.run()


if __name__ == "__main__":
    main()
Run the job processing
Let’s run the producer and consumer!
Navigate to the root of the project directory.
First, in a terminal session, we’ll need to launch a Redis server to hold our job store and job queue (if the container from the Redis section is still running, you can skip this step).
Replace {REDIS_PORT} with the port you set in your .env file.
docker run --name redis -d -p {REDIS_PORT}:6379 redis:latest
Next, in a separate terminal session, we’ll need to launch at least one job consumer. Feel free to repeat this step in different terminal sessions to launch more job consumers.
uv run consumer.py
Finally, in a separate terminal session, we’ll need to launch a job producer.
uv run producer.py
Now we’re all set!
NOTE: Be sure to terminate the producer and consumer sessions when you’re done, or else they’ll keep running in the background!
Here’s what the sample output looks like:
Sample producer output
$ uv run producer.py
=== Job producer ===
Press Ctrl+C to stop.
Created job with ID: fe77ad78-5ae8-4abb-b306-96f1d8f2abdd
Created job with payload: {'task_name': 'send_email', 'to': 'user7149@example.com', 'subject': 'Update_19'}
Queue now has 1 job(s) waiting.
Created job with ID: 46948e68-afa1-456d-b974-0ac845da99e7
Created job with payload: {'task_name': 'risky_operation', 'should_fail': True, 'severity': 4}
Queue now has 1 job(s) waiting.
Created job with ID: 9f6c84db-f003-4962-8ecb-d858d4094ed2
Sample consumer output
$ uv run consumer.py
=== Job consumer ===
Starting job consumer. Polling every 1.0 seconds.
Press Ctrl+C to stop.
Currently there are 0 jobs waiting
No jobs in queue. Waiting...
Processing job: fe77ad78-5ae8-4abb-b306-96f1d8f2abdd
Payload: {'task_name': 'send_email', 'to': 'user7149@example.com', 'subject': 'Update_19'}
Performing task: send_email
Job fe77ad78-5ae8-4abb-b306-96f1d8f2abdd completed successfully
Concluding thoughts
Congrats! You’ve built a basic job queue!
In the real world, a job queue will need to handle more complex scenarios, such as high volumes of jobs, job prioritization, job retries, job expiration, and more. But this is a good starting point to understand the basic concepts.
You can find the full code for our example here.