Queue

13 min read

A queue is a linear data structure that follows the First In, First Out (FIFO) principle. Unlike stacks where elements are added and removed from the same end, queues have two distinct ends:

  • Front (Head): Where elements are removed (dequeue operation)
  • Rear (Tail): Where elements are added (enqueue operation)

Think of a queue like a line of people waiting at a bank, grocery store, or ticket counter – the first person to join the line is the first person to be served.

FIFO Concept (First In, First Out) #

Understanding FIFO #

FIFO means:

  • The first element added to the queue is the first one to be removed
  • Elements are processed in the exact order they were inserted
  • No element can “cut in line” – strict ordering is maintained

Visual Representation #

Queue Operations:
Initial:     []
Enqueue 1:   [1]
Enqueue 2:   [1, 2]
Enqueue 3:   [1, 2, 3]
Dequeue:     [2, 3]     (1 is removed from front)
Dequeue:     [3]        (2 is removed from front)
Enqueue 4:   [3, 4]

Core Queue Operations #

  1. Enqueue: Add an element to the rear of the queue
  2. Dequeue: Remove and return the front element from the queue
  3. Front/Peek: View the front element without removing it
  4. Rear: View the rear element without removing it
  5. isEmpty: Check if the queue is empty
  6. Size: Get the number of elements in the queue

Implementation with Deque #

Why Use Deque? #

A deque (double-ended queue) is perfect for implementing queues because:

  • O(1) operations at both ends
  • More efficient than regular lists for queue operations
  • Avoids the O(n) cost of removing from the front of a list (list.pop(0)), which has to shift every remaining element

Python Implementation with collections.deque #

from collections import deque
import time

class Queue:
    """FIFO queue backed by ``collections.deque``.

    New items go on the right end of ``self.items`` (the rear) and are taken
    off the left end (the front). ``enqueue`` and ``dequeue`` also print a
    one-line trace of what they did.
    """

    def __init__(self):
        """Create a queue that holds no items."""
        self.items = deque()

    def enqueue(self, item):
        """Append ``item`` at the rear and print a confirmation."""
        self.items.append(item)
        print(f"Enqueued {item} to queue")

    def dequeue(self):
        """Remove the front item, print it, and return it.

        Raises:
            IndexError: if the queue holds no items.
        """
        if self.is_empty():
            raise IndexError("Queue is empty")
        removed = self.items.popleft()
        print(f"Dequeued {removed} from queue")
        return removed

    def front(self):
        """Return the item at the front, leaving the queue unchanged.

        Raises:
            IndexError: if the queue holds no items.
        """
        if self.is_empty():
            raise IndexError("Queue is empty")
        oldest = self.items[0]
        return oldest

    def rear(self):
        """Return the item at the rear, leaving the queue unchanged.

        Raises:
            IndexError: if the queue holds no items.
        """
        if self.is_empty():
            raise IndexError("Queue is empty")
        newest = self.items[-1]
        return newest

    def is_empty(self):
        """Return True when there are no items in the queue."""
        return not self.items

    def size(self):
        """Return how many items are currently queued."""
        count = len(self.items)
        return count

    def display(self):
        """Print the queue contents from front to rear, or an empty notice."""
        if self.is_empty():
            print("Queue is empty")
            return
        print(f"Queue contents (front to rear): {[*self.items]}")

# Example usage: three customers join the line, then the first one is served.
queue = Queue()
for customer in ("Customer A", "Customer B", "Customer C"):
    queue.enqueue(customer)
queue.display()

print(f"Next to be served: {queue.front()}")
queue.dequeue()
queue.display()

Advanced Queue Implementation with Priority #

import heapq
from collections import deque
import time

class PriorityQueue:
    """Min-heap priority queue that keeps insertion order among equal priorities.

    Each entry in ``self.heap`` is a ``(priority, counter, item)`` tuple.
    ``counter`` grows by one on every enqueue, so two entries with the same
    priority are ordered by when they were added.
    """

    def __init__(self):
        """Start with an empty heap and the tie-break counter at zero."""
        self.heap = []
        self.counter = 0  # Tie-breaker: insertion sequence number

    def enqueue(self, item, priority):
        """Push ``item`` with ``priority`` (a lower number is served first)."""
        entry = (priority, self.counter, item)
        heapq.heappush(self.heap, entry)
        self.counter += 1
        print(f"Enqueued {item} with priority {priority}")

    def dequeue(self):
        """Pop the entry with the smallest priority, print it, and return its item.

        Raises:
            IndexError: if the queue holds no entries.
        """
        if self.is_empty():
            raise IndexError("Priority queue is empty")
        entry = heapq.heappop(self.heap)
        priority, item = entry[0], entry[2]
        print(f"Dequeued {item} (priority: {priority})")
        return item

    def peek(self):
        """Return the item that ``dequeue`` would return, without removing it.

        Raises:
            IndexError: if the queue holds no entries.
        """
        if self.is_empty():
            raise IndexError("Priority queue is empty")
        _, _, item = self.heap[0]
        return item

    def is_empty(self):
        """Return True when the heap has no entries."""
        return not self.heap

    def size(self):
        """Return the number of entries in the heap."""
        count = len(self.heap)
        return count

# Example: Hospital Emergency Room
er_queue = PriorityQueue()
for patient, urgency in [
    ("Patient with headache", 3),
    ("Patient with broken arm", 2),
    ("Heart attack patient", 1),  # Highest priority
    ("Patient with cold", 4),
]:
    er_queue.enqueue(patient, urgency)

print("Processing patients in order of urgency:")
# Drain the queue; each dequeue prints the patient being handled.
while er_queue.size() > 0:
    er_queue.dequeue()

Circular Queue Implementation #

class CircularQueue:
    """Fixed-capacity FIFO queue stored in a preallocated list used as a ring.

    ``front`` is the index of the oldest item, ``rear`` is the index where the
    next item will be written, and ``size`` is the number of stored items.
    Both indices advance modulo ``capacity``. Note that ``front``, ``rear``
    and ``size`` are plain attributes on this class, not methods.
    """

    def __init__(self, capacity):
        """Allocate ``capacity`` empty slots.

        A capacity of 0 gives a queue that is both empty and full, so every
        enqueue raises OverflowError.

        Raises:
            ValueError: if ``capacity`` is negative. Without this check the
                slot list would be empty while ``is_full()`` stayed False,
                and the first enqueue would fail with an unrelated IndexError.
        """
        if capacity < 0:
            raise ValueError("capacity must be non-negative")
        self.capacity = capacity
        self.queue = [None] * capacity
        self.front = 0
        self.rear = 0
        self.size = 0

    def is_full(self):
        """Return True when every slot is occupied."""
        return self.size == self.capacity

    def is_empty(self):
        """Return True when no slot is occupied."""
        return self.size == 0

    def enqueue(self, item):
        """Store ``item`` at the rear index and advance it with wrap-around.

        Raises:
            OverflowError: if the queue is already full.
        """
        if self.is_full():
            raise OverflowError("Queue is full")

        self.queue[self.rear] = item
        self.rear = (self.rear + 1) % self.capacity
        self.size += 1
        print(f"Enqueued {item}")

    def dequeue(self):
        """Remove and return the item at the front index.

        Raises:
            IndexError: if the queue is empty.
        """
        if self.is_empty():
            raise IndexError("Queue is empty")

        item = self.queue[self.front]
        self.queue[self.front] = None  # Clear the slot so the list drops its reference
        self.front = (self.front + 1) % self.capacity
        self.size -= 1
        print(f"Dequeued {item}")
        return item

    def display(self):
        """Print the stored items in front-to-rear order, or an empty notice."""
        if self.is_empty():
            print("Circular queue is empty")
            return

        # Walk ``size`` slots starting at ``front``, wrapping past the end.
        items = [
            self.queue[(self.front + offset) % self.capacity]
            for offset in range(self.size)
        ]
        print(f"Circular queue: {items}")

# Example: Round-robin CPU scheduling
cpu_scheduler = CircularQueue(5)
for process_name in ("Process A", "Process B", "Process C"):
    cpu_scheduler.enqueue(process_name)
cpu_scheduler.display()

Real-World Use Cases and Examples #

1. Customer Service Systems #

Call Center Queue Management:

import datetime
from collections import deque
import random

class CallCenterQueue:
    """Call-center simulation: waiting calls are matched to free agents in FIFO order.

    ``waiting_queue`` holds call dicts in arrival order, ``available_agents``
    holds idle agent names, and ``busy_agents`` maps an agent name to the call
    that agent is currently handling.
    """

    def __init__(self):
        """Start with no waiting calls and all three agents available."""
        self.waiting_queue = deque()
        self.agents = ['Agent 1', 'Agent 2', 'Agent 3']
        # deque() copies the names, so self.agents is left untouched.
        self.available_agents = deque(self.agents)
        self.busy_agents = {}

    @staticmethod
    def _seconds_since(moment):
        """Return whole seconds elapsed since ``moment``.

        Uses ``total_seconds()`` rather than ``timedelta.seconds``: the latter
        is only the seconds component of the delta and excludes whole days,
        so it would wrap around for waits of 24 hours or more.
        """
        return int((datetime.datetime.now() - moment).total_seconds())

    def receive_call(self, caller_name, issue_type):
        """Queue a new call and immediately try to assign waiting calls.

        Args:
            caller_name: Name stored under the call's ``'caller'`` key.
            issue_type: Description stored under the call's ``'issue'`` key.
        """
        call = {
            'caller': caller_name,
            'issue': issue_type,
            'timestamp': datetime.datetime.now(),
            'wait_time': 0
        }
        self.waiting_queue.append(call)
        print(f"šŸ“ž Call from {caller_name} ({issue_type}) added to queue")
        self.process_queue()

    def process_queue(self):
        """Assign waiting calls to available agents until either runs out.

        Each assigned call gets its ``'wait_time'`` set to the whole seconds
        it spent waiting.
        """
        while self.waiting_queue and self.available_agents:
            call = self.waiting_queue.popleft()
            agent = self.available_agents.popleft()

            wait_time = self._seconds_since(call['timestamp'])
            call['wait_time'] = wait_time

            self.busy_agents[agent] = call
            print(f"šŸŽ§ {agent} is now handling call from {call['caller']} "
                  f"(waited {wait_time}s)")

    def finish_call(self, agent):
        """Free ``agent`` and hand them the next waiting call, if any.

        An agent that is not currently busy is ignored.
        """
        if agent in self.busy_agents:
            call = self.busy_agents.pop(agent)
            self.available_agents.append(agent)
            print(f"āœ… {agent} finished call with {call['caller']}")
            self.process_queue()  # Process next waiting call

    def show_status(self):
        """Print queue and agent counts, then each waiting caller with their wait so far."""
        print(f"\nšŸ“Š Call Center Status:")
        print(f"Calls in queue: {len(self.waiting_queue)}")
        print(f"Available agents: {len(self.available_agents)}")
        print(f"Busy agents: {len(self.busy_agents)}")

        if self.waiting_queue:
            print("Waiting callers:")
            for i, call in enumerate(self.waiting_queue, 1):
                wait_time = self._seconds_since(call['timestamp'])
                print(f"  {i}. {call['caller']} ({call['issue']}) - waiting {wait_time}s")

# Simulation: four calls arrive while only three agents exist.
call_center = CallCenterQueue()
for caller, issue in (
    ("John Smith", "Billing Issue"),
    ("Mary Johnson", "Technical Support"),
    ("Bob Wilson", "Account Question"),
    ("Alice Brown", "Product Inquiry"),
):
    call_center.receive_call(caller, issue)

call_center.show_status()

# Simulate call completion
call_center.finish_call("Agent 1")
call_center.show_status()

2. Operating System Process Scheduling #

CPU Process Scheduling:

import time
from collections import deque

class Process:
    """Bookkeeping record for one process in the scheduling simulation."""

    def __init__(self, pid, name, burst_time, arrival_time=0):
        """Record the process identity and workload; timing fields start unset.

        Args:
            pid: Process identifier.
            name: Display name.
            burst_time: Total CPU time required; also the initial
                ``remaining_time``.
            arrival_time: Time the process arrived (defaults to 0).
        """
        self.pid, self.name = pid, name
        # Nothing has executed yet, so the remaining time equals the full burst.
        self.burst_time = self.remaining_time = burst_time
        self.arrival_time = arrival_time
        # Start and completion times are assigned later by the scheduler.
        self.start_time = self.completion_time = None
        self.waiting_time = self.turnaround_time = 0

class CPUScheduler:
    """Round-robin CPU scheduling simulation over a deque-based ready queue.

    Process objects need ``pid``, ``name``, ``burst_time``,
    ``remaining_time``, ``arrival_time``, ``start_time``,
    ``completion_time``, ``waiting_time`` and ``turnaround_time`` attributes.
    ``arrival_time`` is only used when computing turnaround time; the
    scheduling loop does not hold a process back until it has arrived.
    """

    def __init__(self, time_quantum=2):
        """Create an idle scheduler.

        Args:
            time_quantum: Maximum time units a process runs per turn.

        Raises:
            ValueError: if ``time_quantum`` is not positive. A zero quantum
                would run every process for 0 units and re-queue it forever.
        """
        if time_quantum <= 0:
            raise ValueError("time_quantum must be positive")
        self.ready_queue = deque()
        self.time_quantum = time_quantum
        self.current_time = 0
        self.completed_processes = []

    def add_process(self, process):
        """Append ``process`` to the back of the ready queue."""
        self.ready_queue.append(process)
        print(f"Process {process.name} (PID: {process.pid}) added to ready queue")

    def round_robin_scheduling(self):
        """Run queued processes in turns of at most ``time_quantum`` units.

        A process that still has work left after its turn goes to the back of
        the queue. A finished process gets its completion, turnaround and
        waiting times filled in and is added to ``completed_processes``.
        """
        print(f"\nšŸ”„ Starting Round Robin Scheduling (Time Quantum: {self.time_quantum})")
        print("-" * 60)

        while self.ready_queue:
            current_process = self.ready_queue.popleft()

            # The first time a process gets the CPU is its start time.
            if current_process.start_time is None:
                current_process.start_time = self.current_time

            # Execute process for time quantum or remaining time
            execution_time = min(self.time_quantum, current_process.remaining_time)

            print(f"ā° Time {self.current_time}: Executing {current_process.name} "
                  f"for {execution_time} units")

            current_process.remaining_time -= execution_time
            self.current_time += execution_time

            if current_process.remaining_time == 0:
                current_process.completion_time = self.current_time
                # turnaround = completion - arrival; waiting = turnaround - burst
                current_process.turnaround_time = (
                    current_process.completion_time - current_process.arrival_time
                )
                current_process.waiting_time = (
                    current_process.turnaround_time - current_process.burst_time
                )

                self.completed_processes.append(current_process)
                print(f"āœ… Process {current_process.name} completed at time {self.current_time}")
            else:
                # Process not completed, add back to queue
                self.ready_queue.append(current_process)
                print(f"āøļø  Process {current_process.name} preempted, "
                      f"{current_process.remaining_time} units remaining")

            print()

    def print_statistics(self):
        """Print a per-process table followed by average waiting/turnaround times.

        When no process has completed, the averages are skipped instead of
        dividing by zero.
        """
        completed = self.completed_processes

        print("šŸ“ˆ Process Execution Statistics:")
        print("-" * 80)
        print(f"{'Process':<10} {'PID':<5} {'Burst':<7} {'Start':<7} {'Complete':<10} "
              f"{'Waiting':<8} {'Turnaround':<12}")
        print("-" * 80)

        for p in completed:
            print(f"{p.name:<10} {p.pid:<5} {p.burst_time:<7} {p.start_time:<7} "
                  f"{p.completion_time:<10} {p.waiting_time:<8} {p.turnaround_time:<12}")

        print("-" * 80)
        if not completed:
            print("No completed processes.")
            return

        avg_waiting = sum(p.waiting_time for p in completed) / len(completed)
        avg_turnaround = sum(p.turnaround_time for p in completed) / len(completed)
        print(f"Average Waiting Time: {avg_waiting:.2f}")
        print(f"Average Turnaround Time: {avg_turnaround:.2f}")

# Simulation
scheduler = CPUScheduler(time_quantum=3)

# Add processes: PIDs are assigned 1..5 in listing order.
processes = [
    Process(pid, name, burst)
    for pid, (name, burst) in enumerate(
        [
            ("Browser", 8),
            ("TextEditor", 4),
            ("MediaPlayer", 6),
            ("Calculator", 2),
            ("FileManager", 5),
        ],
        start=1,
    )
]

for process in processes:
    scheduler.add_process(process)

scheduler.round_robin_scheduling()
scheduler.print_statistics()

3. Web Server Request Handling #

HTTP Request Queue Management:

import asyncio
import datetime
from collections import deque
import random

class HTTPRequest:
    """One simulated HTTP request plus the response details filled in later."""

    def __init__(self, client_ip, method, url, timestamp=None):
        """Store the request details.

        Args:
            client_ip: Address of the requesting client.
            method: HTTP method name, e.g. ``'GET'``.
            url: Requested path.
            timestamp: Arrival time; a falsy value means "now".
        """
        self.client_ip = client_ip
        self.method = method
        self.url = url
        self.timestamp = timestamp if timestamp else datetime.datetime.now()
        # Both stay None until the request has been processed.
        self.response_time = self.status_code = None

class WebServer:
    """Simulated HTTP server: a deque of pending requests drained by async workers."""

    def __init__(self, max_workers=5):
        """Create a stopped server with an empty queue and zeroed counters.

        Args:
            max_workers: Number of worker tasks ``start_server`` will launch.
        """
        self.request_queue = deque()
        self.max_workers = max_workers
        self.active_workers = 0
        self.processed_requests = []
        self.total_requests = 0
        self.is_running = False

    def receive_request(self, request):
        """Queue ``request``, count it, and print the new queue size."""
        self.request_queue.append(request)
        self.total_requests += 1
        print(f"šŸ“„ Received {request.method} {request.url} from {request.client_ip}")
        print(f"Queue size: {len(self.request_queue)}")

    async def process_request(self, request):
        """Simulate handling ``request`` and record its response time and status."""
        started_at = datetime.datetime.now()

        # A delay is drawn for every method on each call; only the one that
        # matches this request is used (0.5s for any other method).
        simulated_delays = {
            'GET': random.uniform(0.1, 0.5),
            'POST': random.uniform(0.5, 2.0),
            'PUT': random.uniform(0.3, 1.0),
            'DELETE': random.uniform(0.2, 0.8)
        }
        processing_time = simulated_delays.get(request.method, 0.5)

        print(f"šŸ”„ Processing {request.method} {request.url} (worker {self.active_workers})")

        # Stand-in for the real work.
        await asyncio.sleep(processing_time)

        elapsed = datetime.datetime.now() - started_at
        request.response_time = elapsed.total_seconds()
        # Three of the five choices are 200, so most responses succeed.
        request.status_code = random.choice([200, 200, 200, 404, 500])

        self.processed_requests.append(request)
        print(f"āœ… Completed {request.method} {request.url} - "
              f"Status: {request.status_code}, Time: {request.response_time:.3f}s")

    async def worker(self):
        """Process queued requests until the server is stopped and the queue is empty."""
        while self.is_running or self.request_queue:
            if not self.request_queue:
                # Nothing to do yet; check again shortly.
                await asyncio.sleep(0.1)
                continue

            request = self.request_queue.popleft()
            self.active_workers += 1
            try:
                await self.process_request(request)
            finally:
                # Always release the worker slot, even if processing raised.
                self.active_workers -= 1

    async def start_server(self, duration=10):
        """Run the simulation: start workers, generate traffic, drain, report.

        Args:
            duration: Seconds of simulated incoming traffic.
        """
        print(f"šŸš€ Starting web server with {self.max_workers} workers")
        self.is_running = True

        workers = []
        for _ in range(self.max_workers):
            workers.append(asyncio.create_task(self.worker()))

        await self.simulate_traffic(duration)

        # Clearing the flag lets each worker exit once the queue is empty.
        self.is_running = False
        await asyncio.gather(*workers)

        self.print_statistics()

    async def simulate_traffic(self, duration):
        """Submit random requests at random intervals for ``duration`` seconds."""
        deadline = datetime.datetime.now() + datetime.timedelta(seconds=duration)

        urls = ['/home', '/api/users', '/api/posts', '/images/logo.png',
                '/css/style.css', '/api/login', '/profile', '/search']
        # GET appears three times, so it is chosen about half the time.
        methods = ['GET', 'GET', 'GET', 'POST', 'PUT', 'DELETE']
        ips = ['192.168.1.100', '10.0.0.50', '172.16.0.25', '203.0.113.1']

        while datetime.datetime.now() < deadline:
            client_ip = random.choice(ips)
            method = random.choice(methods)
            url = random.choice(urls)
            self.receive_request(HTTPRequest(client_ip=client_ip, method=method, url=url))

            # Random interval between requests
            await asyncio.sleep(random.uniform(0.1, 1.0))

    def print_statistics(self):
        """Print request totals and, if any were processed, timing and status stats."""
        done = self.processed_requests

        print(f"\nšŸ“Š Web Server Statistics")
        print("-" * 50)
        print(f"Total requests received: {self.total_requests}")
        print(f"Requests processed: {len(done)}")
        print(f"Requests in queue: {len(self.request_queue)}")

        if not done:
            return

        avg_response_time = sum(r.response_time for r in done) / len(done)
        successes = sum(1 for r in done if r.status_code == 200)
        success_rate = successes / len(done) * 100

        print(f"Average response time: {avg_response_time:.3f}s")
        print(f"Success rate: {success_rate:.1f}%")

        # Count how many processed requests ended with each status code.
        status_codes = {}
        for finished in done:
            status_codes.setdefault(finished.status_code, 0)
            status_codes[finished.status_code] += 1

        print("Status code distribution:")
        for code, count in sorted(status_codes.items()):
            print(f"  {code}: {count} requests")

# Run the simulation
async def main():
    """Run the web server simulation with 3 workers and 8 seconds of traffic."""
    await WebServer(max_workers=3).start_server(duration=8)

# To run: asyncio.run(main())
# This statement only prints a hint; it does not start the simulation itself.
print("Web server simulation ready. Run with: asyncio.run(main())")

4. Print Job Queue Management #

Printer Spooler System:

import time
import threading
from collections import deque
from datetime import datetime
import random

class PrintJob:
    """A document submitted for printing, with its lifecycle timestamps."""

    def __init__(self, job_id, document_name, user, pages, priority='normal'):
        """Create a job in the ``'queued'`` state, stamped with the current time.

        Args:
            job_id: Identifier for the job.
            document_name: Name of the document to print.
            user: Who submitted the job.
            pages: Number of pages in the document.
            priority: Priority label (defaults to ``'normal'``).
        """
        self.job_id, self.document_name = job_id, document_name
        self.user, self.pages = user, pages
        self.priority = priority
        self.submitted_time = datetime.now()
        # Printing has not started, so both timestamps are still unset.
        self.start_time = self.completion_time = None
        self.status = 'queued'

    def __str__(self):
        """Return a one-line summary: id, document, page count, and user."""
        header = f"Job {self.job_id}: {self.document_name}"
        return f"{header} ({self.pages} pages) - {self.user}"

class PrinterSpooler:
    """Print spooler with an urgent queue, a regular queue, and a worker thread.

    Jobs whose ``priority`` is ``'urgent'`` go to ``priority_queue``; all
    others go to ``print_queue``. The worker always takes from the priority
    queue first. Queue access is guarded by ``self.lock``.
    """

    def __init__(self, printer_name, pages_per_minute=10):
        """Create an idle spooler.

        Args:
            printer_name: Name used in status messages.
            pages_per_minute: Simulated print speed.

        Raises:
            ValueError: if ``pages_per_minute`` is not positive. Failing here
                avoids a division by zero inside the worker thread later.
        """
        if pages_per_minute <= 0:
            raise ValueError("pages_per_minute must be positive")
        self.printer_name = printer_name
        self.pages_per_minute = pages_per_minute
        self.print_queue = deque()
        self.priority_queue = deque()  # For urgent jobs
        self.current_job = None
        self.total_jobs_processed = 0
        self.is_printing = False
        self.printer_thread = None
        self.lock = threading.Lock()

    def submit_job(self, job):
        """Add ``job`` to the matching queue, then print the queue status."""
        with self.lock:
            if job.priority == 'urgent':
                self.priority_queue.append(job)
                print(f"🚨 URGENT: {job} added to priority queue")
            else:
                self.print_queue.append(job)
                print(f"šŸ“„ {job} added to print queue")

        # show_queue_status() acquires self.lock itself. threading.Lock is
        # not reentrant, so calling it while still holding the lock would
        # block forever; it must run after the lock has been released.
        self.show_queue_status()

    def get_next_job(self):
        """Pop and return the next job (urgent first), or None if both queues are empty."""
        with self.lock:
            if self.priority_queue:
                return self.priority_queue.popleft()
            if self.print_queue:
                return self.print_queue.popleft()
            return None

    def start_printing(self):
        """Start the worker thread unless one is already alive."""
        if not self.printer_thread or not self.printer_thread.is_alive():
            self.printer_thread = threading.Thread(target=self._printing_worker)
            # Daemon thread: it will not keep the interpreter alive at exit.
            self.printer_thread.daemon = True
            self.printer_thread.start()
            print(f"šŸ–Øļø  {self.printer_name} started")

    def _printing_worker(self):
        """Worker loop: print the next job, or sleep one second when idle.

        The loop never returns on its own.
        """
        while True:
            job = self.get_next_job()

            if job is None:
                time.sleep(1)  # Wait for jobs
                continue

            self._print_job(job)

    def _print_job(self, job):
        """Simulate printing ``job`` page by page and update its status fields."""
        self.current_job = job
        job.status = 'printing'
        job.start_time = datetime.now()

        wait_time = (job.start_time - job.submitted_time).total_seconds()
        print(f"šŸ–Øļø  Started printing {job} (waited {wait_time:.1f}s)")

        # Per-page time depends only on printer speed, so compute it once.
        seconds_per_page = 60 / self.pages_per_minute

        # Simulate printing with progress updates
        pages_printed = 0
        while pages_printed < job.pages:
            time.sleep(seconds_per_page)
            pages_printed += 1

            # Report every fifth page and the final page.
            if pages_printed % 5 == 0 or pages_printed == job.pages:
                print(f"   šŸ“ƒ Printed {pages_printed}/{job.pages} pages of {job.document_name}")

        # Job completed
        job.completion_time = datetime.now()
        job.status = 'completed'
        self.current_job = None
        self.total_jobs_processed += 1

        total_time = (job.completion_time - job.submitted_time).total_seconds()
        print(f"āœ… Completed {job} in {total_time:.1f}s total")
        print()

    def show_queue_status(self):
        """Print queue sizes, the job being printed, and every queued job.

        Acquires ``self.lock``, so it must not be called while the caller
        already holds that lock.
        """
        with self.lock:
            print(f"\nšŸ“Š {self.printer_name} Queue Status:")
            print(f"Priority queue: {len(self.priority_queue)} jobs")
            print(f"Regular queue: {len(self.print_queue)} jobs")

            if self.current_job:
                print(f"Currently printing: {self.current_job}")

            if self.priority_queue:
                print("Priority jobs:")
                for i, job in enumerate(self.priority_queue, 1):
                    print(f"  {i}. {job}")

            if self.print_queue:
                print("Regular jobs:")
                for i, job in enumerate(self.print_queue, 1):
                    wait_time = (datetime.now() - job.submitted_time).total_seconds()
                    print(f"  {i}. {job} (waiting {wait_time:.1f}s)")
            print("-" * 50)

    def cancel_job(self, job_id):
        """Remove the queued job with ``job_id``; a job already printing is not affected.

        Returns:
            True if a job was removed from either queue, otherwise False.
        """
        with self.lock:
            # Priority queue is searched first, then the regular queue.
            for pending, label in (
                (self.priority_queue, "priority"),
                (self.print_queue, "regular"),
            ):
                match = next((job for job in pending if job.job_id == job_id), None)
                if match is not None:
                    pending.remove(match)
                    print(f"āŒ Cancelled job {job_id} from {label} queue")
                    return True

            print(f"āŒ Job {job_id} not found in queues")
            return False

# Simulation
def simulate_office_printing():
    """Drive a PrinterSpooler through a scripted office workload.

    Starts the printer, submits seven jobs (two of them urgent) with random
    pauses in between, cancels job 4 after a delay, and finally prints the
    queue status and the number of jobs processed.
    """
    spooler = PrinterSpooler("HP LaserJet Pro", pages_per_minute=20)
    spooler.start_printing()

    # Jobs in submission order; jobs 3 and 6 are urgent.
    pending_jobs = [
        PrintJob(1, "Monthly Report.pdf", "Alice", 25),
        PrintJob(2, "Meeting Notes.docx", "Bob", 3),
        PrintJob(3, "URGENT: Contract.pdf", "Manager", 10, priority='urgent'),
        PrintJob(4, "Presentation.pptx", "Carol", 15),
        PrintJob(5, "Invoice_001.pdf", "Dave", 2),
        PrintJob(6, "CRITICAL: Legal Doc.pdf", "Legal Team", 8, priority='urgent'),
        PrintJob(7, "Newsletter.pdf", "Marketing", 20),
    ]

    first_urgent_index = 2
    for index, print_job in enumerate(pending_jobs):
        spooler.submit_job(print_job)

        # Right after the first urgent job, pause and show the queues.
        if index == first_urgent_index:
            time.sleep(2)
            spooler.show_queue_status()

        # Random delay before the next submission.
        time.sleep(random.uniform(1, 3))

    # Give the worker some time to print.
    time.sleep(10)

    spooler.cancel_job(4)

    # Wait again, then report the final state.
    time.sleep(15)
    spooler.show_queue_status()

    print(f"\nšŸ“ˆ Final Statistics:")
    print(f"Total jobs processed: {spooler.total_jobs_processed}")

# Run simulation
# These statements execute as soon as this code runs: the announcement is
# printed and simulate_office_printing() is called immediately afterwards.
print("Print spooler simulation starting...")
simulate_office_printing()
Updated on June 9, 2025