A queue is a linear data structure that follows the First In, First Out (FIFO) principle. Unlike stacks where elements are added and removed from the same end, queues have two distinct ends:
- Front (Head): Where elements are removed (dequeue operation)
- Rear (Tail): Where elements are added (enqueue operation)
Think of a queue like a line of people waiting at a bank, grocery store, or ticket counter – the first person to join the line is the first person to be served.
FIFO Concept (First In, First Out)
Understanding FIFO
FIFO means:
- The first element added to the queue is the first one to be removed
- Elements are processed in the exact order they were inserted
- No element can “cut in line” – strict ordering is maintained
Visual Representation
Queue Operations:
Initial: []
Enqueue 1: [1]
Enqueue 2: [1, 2]
Enqueue 3: [1, 2, 3]
Dequeue: [2, 3] (1 is removed from front)
Dequeue: [3] (2 is removed from front)
Enqueue 4: [3, 4]
Core Queue Operations
- Enqueue: Add an element to the rear of the queue
- Dequeue: Remove and return the front element from the queue
- Front/Peek: View the front element without removing it
- Rear: View the rear element without removing it
- isEmpty: Check if the queue is empty
- Size: Get the number of elements in the queue
Implementation with Deque
Why Use Deque?
A deque (double-ended queue) is perfect for implementing queues because:
- O(1) operations at both ends
- More efficient than regular lists for queue operations
- Avoids the O(n) cost of removing from the front of a list (`list.pop(0)` shifts every remaining element), which is exactly what dequeue does
Python Implementation with collections.deque
from collections import deque
import time
class Queue:
    """FIFO queue backed by ``collections.deque``.

    Items enter at the rear (right end of the deque) and leave from the
    front (left end). ``enqueue`` and ``dequeue`` report what they did on
    stdout.
    """

    def __init__(self):
        """Create the queue with no items in it."""
        self.items = deque()

    def enqueue(self, item):
        """Append ``item`` at the rear and announce it."""
        self.items.append(item)
        print(f"Enqueued {item} to queue")

    def dequeue(self):
        """Pop the front item, announce it, and hand it back.

        Raises:
            IndexError: If there is nothing to remove.
        """
        if not self.items:
            raise IndexError("Queue is empty")
        removed = self.items.popleft()
        print(f"Dequeued {removed} from queue")
        return removed

    def front(self):
        """Return the item that would be dequeued next, leaving it in place.

        Raises:
            IndexError: If the queue holds no items.
        """
        if self.items:
            return self.items[0]
        raise IndexError("Queue is empty")

    def rear(self):
        """Return the most recently enqueued item, leaving it in place.

        Raises:
            IndexError: If the queue holds no items.
        """
        if self.items:
            return self.items[-1]
        raise IndexError("Queue is empty")

    def is_empty(self):
        """Return True when the queue holds no items."""
        return not self.items

    def size(self):
        """Return how many items are currently queued."""
        return len(self.items)

    def display(self):
        """Print the contents from front to rear, or a note if there are none."""
        if self.items:
            print(f"Queue contents (front to rear): {list(self.items)}")
        else:
            print("Queue is empty")
# Example usage: three customers join the line, then the first one is served
queue = Queue()
for customer in ("Customer A", "Customer B", "Customer C"):
    queue.enqueue(customer)
queue.display()
print(f"Next to be served: {queue.front()}")
queue.dequeue()
queue.display()
Advanced Queue Implementation with Priority
import heapq
from collections import deque
import time
class PriorityQueue:
    """Priority queue on top of ``heapq``; a lower number is served earlier.

    Entries with equal priority leave in the order they were enqueued.
    """

    def __init__(self):
        """Start with an empty heap and a zeroed insertion counter."""
        self.heap = []
        # Sequence number stored in every heap entry. It is unique per entry,
        # so tuple comparison is settled before the item itself is reached.
        self.counter = 0

    def enqueue(self, item, priority):
        """Insert ``item`` with the given ``priority`` (lower number = higher priority)."""
        entry = (priority, self.counter, item)
        heapq.heappush(self.heap, entry)
        self.counter += 1
        print(f"Enqueued {item} with priority {priority}")

    def dequeue(self):
        """Pop the highest-priority item, announce it, and return it.

        Raises:
            IndexError: If the queue holds no items.
        """
        if not self.heap:
            raise IndexError("Priority queue is empty")
        priority, _sequence, item = heapq.heappop(self.heap)
        print(f"Dequeued {item} (priority: {priority})")
        return item

    def peek(self):
        """Return the highest-priority item without removing it.

        Raises:
            IndexError: If the queue holds no items.
        """
        if not self.heap:
            raise IndexError("Priority queue is empty")
        _priority, _sequence, item = self.heap[0]
        return item

    def is_empty(self):
        """Return True when no items are queued."""
        return not self.heap

    def size(self):
        """Return the number of queued items."""
        return len(self.heap)
# Example: Hospital Emergency Room
er_queue = PriorityQueue()
arrivals = [
    ("Patient with headache", 3),
    ("Patient with broken arm", 2),
    ("Heart attack patient", 1),  # Highest priority
    ("Patient with cold", 4),
]
for patient, urgency in arrivals:
    er_queue.enqueue(patient, urgency)
print("Processing patients in order of urgency:")
while er_queue.size() > 0:
    er_queue.dequeue()
Circular Queue Implementation
class CircularQueue:
    """Fixed-capacity FIFO queue stored in a ring buffer.

    ``front`` is the index of the next item to dequeue and ``rear`` the index
    of the next free slot; both wrap around modulo ``capacity``. ``size``
    tracks the number of stored items, which is what tells a full buffer
    apart from an empty one when ``front == rear``.
    """

    def __init__(self, capacity):
        """Allocate a buffer with room for ``capacity`` items.

        Raises:
            ValueError: If ``capacity`` is negative. (Without this check the
                buffer would be empty and the first ``enqueue`` would fail
                with an unrelated IndexError.)
        """
        if capacity < 0:
            raise ValueError("capacity must be non-negative")
        self.capacity = capacity
        self.queue = [None] * capacity
        self.front = 0
        self.rear = 0
        self.size = 0

    def is_full(self):
        """Return True when every slot is occupied."""
        return self.size == self.capacity

    def is_empty(self):
        """Return True when no slot is occupied."""
        return self.size == 0

    def enqueue(self, item):
        """Store ``item`` in the rear slot and advance ``rear``.

        Raises:
            OverflowError: If the queue is already full.
        """
        if self.is_full():
            raise OverflowError("Queue is full")
        self.queue[self.rear] = item
        self.rear = (self.rear + 1) % self.capacity
        self.size += 1
        print(f"Enqueued {item}")

    def dequeue(self):
        """Remove and return the item in the front slot.

        Raises:
            IndexError: If the queue is empty.
        """
        if self.is_empty():
            raise IndexError("Queue is empty")
        item = self.queue[self.front]
        # Clear the slot so the buffer does not keep a reference to the item.
        self.queue[self.front] = None
        self.front = (self.front + 1) % self.capacity
        self.size -= 1
        print(f"Dequeued {item}")
        return item

    def display(self):
        """Print the stored items in front-to-rear order."""
        if self.is_empty():
            print("Circular queue is empty")
            return
        items = [
            self.queue[(self.front + offset) % self.capacity]
            for offset in range(self.size)
        ]
        print(f"Circular queue: {items}")
# Example: Round-robin CPU scheduling
cpu_scheduler = CircularQueue(5)
for process_name in ("Process A", "Process B", "Process C"):
    cpu_scheduler.enqueue(process_name)
cpu_scheduler.display()
Real-World Use Cases and Examples
1. Customer Service Systems
Call Center Queue Management:
import datetime
from collections import deque
import random
class CallCenterQueue:
    """Route incoming calls to a fixed pool of agents in arrival order.

    Calls wait in ``waiting_queue`` until an agent is free. Agents are taken
    from the left of ``available_agents`` and appended back on the right when
    they finish, and ``busy_agents`` maps each busy agent to the call record
    being handled.
    """

    def __init__(self):
        """Set up an empty waiting line and three idle agents."""
        self.waiting_queue = deque()
        self.agents = ['Agent 1', 'Agent 2', 'Agent 3']
        self.available_agents = deque(self.agents)
        self.busy_agents = {}

    @staticmethod
    def _seconds_waiting(call):
        """Return the whole seconds elapsed since ``call`` was received."""
        elapsed = datetime.datetime.now() - call['timestamp']
        # total_seconds() covers the whole interval; timedelta.seconds is only
        # the sub-day component and wraps around for waits longer than 24 hours.
        return int(elapsed.total_seconds())

    def receive_call(self, caller_name, issue_type):
        """Queue a new call and immediately try to hand it to a free agent."""
        call = {
            'caller': caller_name,
            'issue': issue_type,
            'timestamp': datetime.datetime.now(),
            'wait_time': 0
        }
        self.waiting_queue.append(call)
        print(f"📞 Call from {caller_name} ({issue_type}) added to queue")
        self.process_queue()

    def process_queue(self):
        """Pair waiting calls with free agents until one side runs out."""
        while self.waiting_queue and self.available_agents:
            call = self.waiting_queue.popleft()
            agent = self.available_agents.popleft()
            wait_time = self._seconds_waiting(call)
            call['wait_time'] = wait_time
            self.busy_agents[agent] = call
            print(f"🎧 {agent} is now handling call from {call['caller']} "
                  f"(waited {wait_time}s)")

    def finish_call(self, agent):
        """Free ``agent`` and let them pick up the next waiting call.

        Does nothing if ``agent`` is not currently handling a call.
        """
        if agent in self.busy_agents:
            call = self.busy_agents.pop(agent)
            self.available_agents.append(agent)
            print(f"✅ {agent} finished call with {call['caller']}")
            self.process_queue()  # Process next waiting call

    def show_status(self):
        """Print queue length, agent counts, and every caller still waiting."""
        print("\n📊 Call Center Status:")
        print(f"Calls in queue: {len(self.waiting_queue)}")
        print(f"Available agents: {len(self.available_agents)}")
        print(f"Busy agents: {len(self.busy_agents)}")
        if self.waiting_queue:
            print("Waiting callers:")
            for i, call in enumerate(self.waiting_queue, 1):
                wait_time = self._seconds_waiting(call)
                print(f" {i}. {call['caller']} ({call['issue']}) - waiting {wait_time}s")
# Simulation: four calls arrive one after another
call_center = CallCenterQueue()
incoming_calls = [
    ("John Smith", "Billing Issue"),
    ("Mary Johnson", "Technical Support"),
    ("Bob Wilson", "Account Question"),
    ("Alice Brown", "Product Inquiry"),
]
for caller, issue in incoming_calls:
    call_center.receive_call(caller, issue)
call_center.show_status()
# Simulate call completion
call_center.finish_call("Agent 1")
call_center.show_status()
2. Operating System Process Scheduling
CPU Process Scheduling:
import time
from collections import deque
class Process:
    """Bookkeeping record for one process in the scheduling simulation."""

    def __init__(self, pid, name, burst_time, arrival_time=0):
        """Record the process identity and its timing inputs.

        ``remaining_time`` starts equal to ``burst_time``. The start and
        completion times begin as None and the waiting and turnaround times
        begin at zero.
        """
        self.pid, self.name = pid, name
        self.burst_time = self.remaining_time = burst_time
        self.arrival_time = arrival_time
        self.start_time = self.completion_time = None
        self.waiting_time = self.turnaround_time = 0
class CPUScheduler:
    """Round-robin scheduler simulation driven by a FIFO ready queue.

    Each process runs for at most ``time_quantum`` time units, then goes back
    to the rear of ``ready_queue`` if it still has work left. ``current_time``
    is the simulated clock; finished processes are collected in
    ``completed_processes`` in completion order.
    """

    def __init__(self, time_quantum=2):
        """Create an idle scheduler.

        Raises:
            ValueError: If ``time_quantum`` is not positive. A zero quantum
                would never reduce any process's remaining time, so the
                scheduling loop would never terminate.
        """
        if time_quantum <= 0:
            raise ValueError("time_quantum must be positive")
        self.ready_queue = deque()
        self.time_quantum = time_quantum
        self.current_time = 0
        self.completed_processes = []

    def add_process(self, process):
        """Append ``process`` to the rear of the ready queue."""
        self.ready_queue.append(process)
        print(f"Process {process.name} (PID: {process.pid}) added to ready queue")

    def round_robin_scheduling(self):
        """Run every queued process to completion, one time slice at a time.

        Fills in ``start_time``, ``completion_time``, ``turnaround_time`` and
        ``waiting_time`` on each process as it progresses.
        """
        print(f"\n🔄 Starting Round Robin Scheduling (Time Quantum: {self.time_quantum})")
        print("-" * 60)
        while self.ready_queue:
            current_process = self.ready_queue.popleft()
            if current_process.start_time is None:
                current_process.start_time = self.current_time
            # Execute process for time quantum or remaining time
            execution_time = min(self.time_quantum, current_process.remaining_time)
            print(f"⏰ Time {self.current_time}: Executing {current_process.name} "
                  f"for {execution_time} units")
            current_process.remaining_time -= execution_time
            self.current_time += execution_time
            # Check if process is completed
            if current_process.remaining_time == 0:
                current_process.completion_time = self.current_time
                current_process.turnaround_time = (current_process.completion_time -
                                                   current_process.arrival_time)
                current_process.waiting_time = (current_process.turnaround_time -
                                                current_process.burst_time)
                self.completed_processes.append(current_process)
                print(f"✅ Process {current_process.name} completed at time {self.current_time}")
            else:
                # Process not completed, add back to queue
                self.ready_queue.append(current_process)
                print(f"⏸️ Process {current_process.name} preempted, "
                      f"{current_process.remaining_time} units remaining")
            print()

    def print_statistics(self):
        """Print a per-process table followed by average waiting/turnaround times."""
        print("📊 Process Execution Statistics:")
        print("-" * 80)
        print(f"{'Process':<10} {'PID':<5} {'Burst':<7} {'Start':<7} {'Complete':<10} "
              f"{'Waiting':<8} {'Turnaround':<12}")
        print("-" * 80)
        total_waiting = 0
        total_turnaround = 0
        for p in self.completed_processes:
            print(f"{p.name:<10} {p.pid:<5} {p.burst_time:<7} {p.start_time:<7} "
                  f"{p.completion_time:<10} {p.waiting_time:<8} {p.turnaround_time:<12}")
            total_waiting += p.waiting_time
            total_turnaround += p.turnaround_time
        print("-" * 80)
        completed = len(self.completed_processes)
        if completed == 0:
            # Nothing to average; dividing by zero here would raise.
            print("No processes have completed yet.")
            return
        avg_waiting = total_waiting / completed
        avg_turnaround = total_turnaround / completed
        print(f"Average Waiting Time: {avg_waiting:.2f}")
        print(f"Average Turnaround Time: {avg_turnaround:.2f}")
# Simulation: five processes share the CPU with a time quantum of 3
scheduler = CPUScheduler(time_quantum=3)
# (pid, name, burst_time) for each process to schedule
workload = [
    (1, "Browser", 8),
    (2, "TextEditor", 4),
    (3, "MediaPlayer", 6),
    (4, "Calculator", 2),
    (5, "FileManager", 5),
]
processes = [Process(pid, name, burst) for pid, name, burst in workload]
for process in processes:
    scheduler.add_process(process)
scheduler.round_robin_scheduling()
scheduler.print_statistics()
3. Web Server Request Handling
HTTP Request Queue Management:
import asyncio
import datetime
from collections import deque
import random
class HTTPRequest:
    """One simulated HTTP request together with its eventual outcome."""

    def __init__(self, client_ip, method, url, timestamp=None):
        """Store the request details.

        ``timestamp`` falls back to the current local time when it is not
        supplied (or is otherwise falsy). ``response_time`` and
        ``status_code`` start as None.
        """
        if not timestamp:
            timestamp = datetime.datetime.now()
        self.client_ip, self.method, self.url = client_ip, method, url
        self.timestamp = timestamp
        self.response_time = self.status_code = None
class WebServer:
    """Simulated web server: a FIFO request queue drained by async workers.

    ``request_queue`` holds requests not yet picked up, ``processed_requests``
    holds finished ones, ``active_workers`` counts workers currently inside
    ``process_request``, and ``is_running`` tells idle workers whether more
    traffic may still arrive.
    """

    # Simulated processing time per HTTP method, as (low, high) seconds.
    _PROCESSING_TIME_RANGES = {
        'GET': (0.1, 0.5),
        'POST': (0.5, 2.0),
        'PUT': (0.3, 1.0),
        'DELETE': (0.2, 0.8),
    }
    # Fixed processing time for any method not listed above.
    _DEFAULT_PROCESSING_TIME = 0.5

    def __init__(self, max_workers=5):
        """Create a stopped server that will use ``max_workers`` worker tasks."""
        self.request_queue = deque()
        self.max_workers = max_workers
        self.active_workers = 0
        self.processed_requests = []
        self.total_requests = 0
        self.is_running = False

    def receive_request(self, request):
        """Append ``request`` to the queue and count it."""
        self.request_queue.append(request)
        self.total_requests += 1
        print(f"📥 Received {request.method} {request.url} from {request.client_ip}")
        print(f"Queue size: {len(self.request_queue)}")

    async def process_request(self, request):
        """Simulate processing an HTTP request.

        Sleeps for a method-dependent random time, then records the elapsed
        time and a random status code on ``request``.
        """
        start_time = datetime.datetime.now()
        # Draw one delay for this request's method only.
        time_range = self._PROCESSING_TIME_RANGES.get(request.method)
        if time_range is None:
            processing_time = self._DEFAULT_PROCESSING_TIME
        else:
            processing_time = random.uniform(*time_range)
        print(f"🔄 Processing {request.method} {request.url} (worker {self.active_workers})")
        await asyncio.sleep(processing_time)  # Simulate work
        # Simulate response
        request.response_time = (datetime.datetime.now() - start_time).total_seconds()
        request.status_code = random.choice([200, 200, 200, 404, 500])  # Mostly successful
        self.processed_requests.append(request)
        print(f"✅ Completed {request.method} {request.url} - "
              f"Status: {request.status_code}, Time: {request.response_time:.3f}s")

    async def worker(self):
        """Worker coroutine to process requests from queue.

        Keeps going while the server is running or requests remain queued.
        """
        while self.is_running or self.request_queue:
            if self.request_queue:
                request = self.request_queue.popleft()
                self.active_workers += 1
                try:
                    await self.process_request(request)
                finally:
                    self.active_workers -= 1
            else:
                await asyncio.sleep(0.1)  # Wait for requests

    async def start_server(self, duration=10):
        """Start the web server with multiple workers.

        Generates traffic for ``duration`` seconds, waits for the workers to
        drain the queue, then prints statistics.
        """
        print(f"🚀 Starting web server with {self.max_workers} workers")
        self.is_running = True
        # Start worker tasks
        workers = [asyncio.create_task(self.worker()) for _ in range(self.max_workers)]
        try:
            # Simulate incoming requests
            await self.simulate_traffic(duration)
        finally:
            # Clear the flag even if traffic generation fails, otherwise the
            # workers' loop condition stays true and they are never joined.
            self.is_running = False
            await asyncio.gather(*workers)
        self.print_statistics()

    async def simulate_traffic(self, duration):
        """Simulate incoming HTTP requests for ``duration`` seconds."""
        end_time = datetime.datetime.now() + datetime.timedelta(seconds=duration)
        urls = ['/home', '/api/users', '/api/posts', '/images/logo.png',
                '/css/style.css', '/api/login', '/profile', '/search']
        methods = ['GET', 'GET', 'GET', 'POST', 'PUT', 'DELETE']
        ips = ['192.168.1.100', '10.0.0.50', '172.16.0.25', '203.0.113.1']
        while datetime.datetime.now() < end_time:
            # Create random request
            request = HTTPRequest(
                client_ip=random.choice(ips),
                method=random.choice(methods),
                url=random.choice(urls)
            )
            self.receive_request(request)
            # Random interval between requests
            await asyncio.sleep(random.uniform(0.1, 1.0))

    def print_statistics(self):
        """Print request totals and, if any were processed, timing and status stats."""
        print("\n📊 Web Server Statistics")
        print("-" * 50)
        print(f"Total requests received: {self.total_requests}")
        print(f"Requests processed: {len(self.processed_requests)}")
        print(f"Requests in queue: {len(self.request_queue)}")
        if self.processed_requests:
            processed = len(self.processed_requests)
            avg_response_time = sum(r.response_time for r in self.processed_requests) / processed
            successes = sum(1 for r in self.processed_requests if r.status_code == 200)
            success_rate = successes / processed * 100
            print(f"Average response time: {avg_response_time:.3f}s")
            print(f"Success rate: {success_rate:.1f}%")
            # Status code distribution
            status_codes = {}
            for request in self.processed_requests:
                status_codes[request.status_code] = status_codes.get(request.status_code, 0) + 1
            print("Status code distribution:")
            for code, count in sorted(status_codes.items()):
                print(f" {code}: {count} requests")
# Run the simulation
async def main():
    """Run an 8-second traffic simulation against a three-worker server."""
    await WebServer(max_workers=3).start_server(duration=8)


# The coroutine is only defined here; start it with asyncio.run(main())
print("Web server simulation ready. Run with: asyncio.run(main())")
4. Print Job Queue Management
Printer Spooler System:
import time
import threading
from collections import deque
from datetime import datetime
import random
class PrintJob:
    """A document submitted for printing, plus its progress bookkeeping."""

    def __init__(self, job_id, document_name, user, pages, priority='normal'):
        """Record the job details and stamp the submission time.

        The job starts in status ``'queued'`` with no start or completion
        time.
        """
        self.job_id, self.document_name = job_id, document_name
        self.user, self.pages = user, pages
        self.priority = priority
        self.submitted_time = datetime.now()
        self.start_time = self.completion_time = None
        self.status = 'queued'

    def __str__(self):
        """Return a one-line summary: id, document, page count, and owner."""
        document = f"{self.document_name} ({self.pages} pages)"
        return f"Job {self.job_id}: {document} - {self.user}"
class PrinterSpooler:
    """Print spooler with an urgent queue that is always served before the regular one.

    Jobs are kept in two deques guarded by ``lock``. A background daemon
    thread (see ``start_printing``) pulls jobs with ``get_next_job`` and
    simulates printing them page by page.
    """

    def __init__(self, printer_name, pages_per_minute=10):
        """Create an idle spooler.

        Raises:
            ValueError: If ``pages_per_minute`` is not positive; the per-page
                delay is computed by dividing by it.
        """
        if pages_per_minute <= 0:
            raise ValueError("pages_per_minute must be positive")
        self.printer_name = printer_name
        self.pages_per_minute = pages_per_minute
        self.print_queue = deque()
        self.priority_queue = deque()  # For urgent jobs
        self.current_job = None
        self.total_jobs_processed = 0
        self.is_printing = False
        self.printer_thread = None
        self.lock = threading.Lock()

    def submit_job(self, job):
        """Submit a print job to the appropriate queue, then print the queue status."""
        with self.lock:
            if job.priority == 'urgent':
                self.priority_queue.append(job)
                print(f"🚨 URGENT: {job} added to priority queue")
            else:
                self.print_queue.append(job)
                print(f"📄 {job} added to print queue")
        # Must run after the lock is released: show_queue_status() acquires the
        # same threading.Lock, which is not reentrant.
        self.show_queue_status()

    def get_next_job(self):
        """Get the next job to print (priority queue first), or None if both are empty."""
        with self.lock:
            if self.priority_queue:
                return self.priority_queue.popleft()
            elif self.print_queue:
                return self.print_queue.popleft()
            return None

    def start_printing(self):
        """Start the printer thread unless one is already alive."""
        if not self.printer_thread or not self.printer_thread.is_alive():
            self.printer_thread = threading.Thread(target=self._printing_worker, daemon=True)
            self.printer_thread.start()
            print(f"🖨️ {self.printer_name} started")

    def _printing_worker(self):
        """Worker thread that processes print jobs.

        Loops forever, polling once per second while both queues are empty.
        """
        while True:
            job = self.get_next_job()
            if job is None:
                time.sleep(1)  # Wait for jobs
                continue
            self._print_job(job)

    def _print_job(self, job):
        """Simulate printing a job, updating its status and timestamps."""
        self.current_job = job
        self.is_printing = True
        job.status = 'printing'
        job.start_time = datetime.now()
        wait_time = (job.start_time - job.submitted_time).total_seconds()
        print(f"🖨️ Started printing {job} (waited {wait_time:.1f}s)")
        # Time per page depends only on printer speed, so compute it once.
        seconds_per_page = 60 / self.pages_per_minute
        # Simulate printing with progress updates
        pages_printed = 0
        while pages_printed < job.pages:
            time.sleep(seconds_per_page)
            pages_printed += 1
            if pages_printed % 5 == 0 or pages_printed == job.pages:
                print(f" 📄 Printed {pages_printed}/{job.pages} pages of {job.document_name}")
        # Job completed
        job.completion_time = datetime.now()
        job.status = 'completed'
        self.current_job = None
        self.is_printing = False
        self.total_jobs_processed += 1
        total_time = (job.completion_time - job.submitted_time).total_seconds()
        print(f"✅ Completed {job} in {total_time:.1f}s total")
        print()

    def show_queue_status(self):
        """Display current queue status.

        Acquires ``lock``; do not call while already holding it.
        """
        with self.lock:
            print(f"\n📊 {self.printer_name} Queue Status:")
            print(f"Priority queue: {len(self.priority_queue)} jobs")
            print(f"Regular queue: {len(self.print_queue)} jobs")
            if self.current_job:
                print(f"Currently printing: {self.current_job}")
            if self.priority_queue:
                print("Priority jobs:")
                for i, job in enumerate(self.priority_queue, 1):
                    print(f" {i}. {job}")
            if self.print_queue:
                print("Regular jobs:")
                for i, job in enumerate(self.print_queue, 1):
                    wait_time = (datetime.now() - job.submitted_time).total_seconds()
                    print(f" {i}. {job} (waiting {wait_time:.1f}s)")
            print("-" * 50)

    def cancel_job(self, job_id):
        """Cancel a job from the queue.

        Looks in the priority queue first, then the regular queue. A job that
        has already been taken off a queue for printing is not found.

        Returns:
            True if a queued job with ``job_id`` was removed, False otherwise.
        """
        with self.lock:
            queues = (("priority", self.priority_queue), ("regular", self.print_queue))
            for label, pending in queues:
                # Iterate over a copy because a match is removed from the deque.
                for job in list(pending):
                    if job.job_id == job_id:
                        pending.remove(job)
                        print(f"❌ Cancelled job {job_id} from {label} queue")
                        return True
            print(f"❌ Job {job_id} not found in queues")
            return False
# Simulation
def simulate_office_printing():
    """Run a scripted office-printing scenario against a 20 ppm spooler.

    Submits seven jobs (two of them urgent) with random pauses in between,
    attempts to cancel job 4, then prints the final queue status and the
    number of jobs processed. The function sleeps between steps.
    """
    # Create printer
    office_printer = PrinterSpooler("HP LaserJet Pro", pages_per_minute=20)
    office_printer.start_printing()
    # Simulate various print jobs
    jobs = [
        PrintJob(1, "Monthly Report.pdf", "Alice", 25),
        PrintJob(2, "Meeting Notes.docx", "Bob", 3),
        PrintJob(3, "URGENT: Contract.pdf", "Manager", 10, priority='urgent'),
        PrintJob(4, "Presentation.pptx", "Carol", 15),
        PrintJob(5, "Invoice_001.pdf", "Dave", 2),
        PrintJob(6, "CRITICAL: Legal Doc.pdf", "Legal Team", 8, priority='urgent'),
        PrintJob(7, "Newsletter.pdf", "Marketing", 20),
    ]
    # Submit jobs with delays
    for i, job in enumerate(jobs):
        office_printer.submit_job(job)
        if i == 2:  # After urgent job
            time.sleep(2)
            office_printer.show_queue_status()
        time.sleep(random.uniform(1, 3))  # Random delays between submissions
    # Let some jobs process
    time.sleep(10)
    # Cancel a job
    office_printer.cancel_job(4)
    # Final status
    time.sleep(15)
    office_printer.show_queue_status()
    print("\n📊 Final Statistics:")
    print(f"Total jobs processed: {office_printer.total_jobs_processed}")
# Run simulation. These are module-level statements, so the simulation starts
# as soon as this code is executed; the call blocks while
# simulate_office_printing() sleeps between its steps.
print("Print spooler simulation starting...")
simulate_office_printing()