Performance Optimization¶
This page provides guidance on optimizing GigQ's performance for various workloads and scenarios.
Understanding GigQ's Performance Characteristics¶
GigQ is designed to be lightweight while providing reliable job processing. Its performance is influenced by several factors:
- SQLite Database Performance: GigQ uses SQLite as its backend, which has specific performance characteristics
- Worker Configuration: How workers are configured affects throughput and resource usage
- Job Characteristics: The nature of the jobs being processed impacts overall system performance
- System Resources: Available CPU, memory, and disk I/O affect how many jobs can be processed concurrently
SQLite Database Optimization¶
Database Location¶
The location of your SQLite database file significantly impacts performance:
| Location | Performance | Reliability | Use Case |
|---|---|---|---|
| Local SSD | Excellent | Excellent | Production |
| Local HDD | Good | Good | Development |
| Network File System (NFS) | Poor to Fair | Fair | Distributed setups (use cautiously) |
| Memory (:memory: or tmpfs) | Excellent | Poor (no persistence) | Ephemeral workloads |
For best performance, keep your database file on local SSD storage.
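The location is simply the path you pass when creating the queue. A minimal sketch with hypothetical paths (the import location is assumed):
from gigq import JobQueue  # import path assumed; adjust to your installation

# Production: keep the database file on a local SSD
queue = JobQueue("/var/lib/myapp/jobs.db")

# Ephemeral workloads on Linux: a tmpfs-backed path is fast but not persistent
ephemeral_queue = JobQueue("/dev/shm/jobs.db")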
Connection Pooling¶
GigQ uses thread-local storage to efficiently reuse SQLite connections within each thread. This approach respects SQLite's threading model (where connections can only be used in the thread that created them) while improving performance.
import sqlite3
import threading
# Thread-local storage for connections
_thread_local = threading.local()
def get_connection(db_path):
"""Get a connection from thread-local storage or create a new one."""
# Initialize connections dict for this thread if needed
if not hasattr(_thread_local, 'connections'):
_thread_local.connections = {}
# Get or create connection for this database
if db_path not in _thread_local.connections:
conn = sqlite3.connect(db_path, timeout=30.0)
conn.row_factory = sqlite3.Row
_thread_local.connections[db_path] = conn
return _thread_local.connections[db_path]
def close_connection(db_path):
"""Close the connection for the specified database."""
if hasattr(_thread_local, 'connections') and db_path in _thread_local.connections:
_thread_local.connections[db_path].close()
del _thread_local.connections[db_path]
def close_connections():
"""Close all connections for this thread."""
if hasattr(_thread_local, 'connections'):
for conn in _thread_local.connections.values():
conn.close()
_thread_local.connections.clear()
This approach provides several benefits:
- Performance: Eliminates the overhead of repeatedly opening and closing connections
- Thread safety: Each thread only accesses its own connections
- Resource efficiency: Reduces the number of file handles used
- Simplicity: No complex connection pool management required
Remember to close connections when threads are done with them:
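# For example, at the end of a worker thread's run loop (a sketch using the helpers above):
def worker_thread_main(db_path):
    try:
        conn = get_connection(db_path)
        # ... perform job processing with conn ...
    finally:
        close_connections()  # release this thread's connections when the thread exits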
Index Optimization¶
GigQ creates basic indexes, but you might benefit from additional indexes based on your query patterns:
# Add custom indexes for your specific workload
conn = sqlite3.connect("jobs.db")
conn.execute("CREATE INDEX IF NOT EXISTS idx_jobs_name ON jobs (name)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_jobs_priority_status ON jobs (priority, status)")
conn.commit()
conn.close()
Database Maintenance¶
Periodic maintenance keeps query performance consistent over time:
def optimize_database(db_path):
"""Perform database maintenance to optimize performance."""
conn = sqlite3.connect(db_path)
# Analyze to update statistics
conn.execute("ANALYZE")
# Vacuum to reclaim space and defragment
conn.execute("VACUUM")
# Reindex to optimize indexes
conn.execute("REINDEX")
conn.close()
Run this periodically, especially after clearing many jobs.
Worker Configuration¶
Polling Interval¶
The worker polling interval affects both responsiveness and database load:
# More responsive but higher database load
worker_responsive = Worker("jobs.db", polling_interval=1)
# Less responsive but lower database load
worker_efficient = Worker("jobs.db", polling_interval=10)
Finding the right balance depends on your workload:
- For latency-sensitive jobs, use shorter intervals (1-2 seconds)
- For background processing, use longer intervals (5-30 seconds)
Number of Workers¶
The optimal number of workers depends on:
- CPU Cores: For CPU-bound jobs, start with one worker per core
- I/O Operations: For I/O-bound jobs, you can use more workers than cores
- SQLite Limitations: Too many concurrent workers can cause lock contention
A simple formula to start with:
def calculate_workers(job_type="mixed"):
import os
cores = os.cpu_count() or 4
if job_type == "cpu_bound":
return cores
elif job_type == "io_bound":
return cores * 2
else: # mixed
return cores + 2
Specialized Workers¶
Create specialized workers for different job types:
class HighPriorityWorker(Worker):
"""Worker that only processes high-priority jobs."""
def _claim_job(self):
conn = self._get_connection()
try:
conn.execute("BEGIN EXCLUSIVE TRANSACTION")
cursor = conn.execute(
"""
SELECT * FROM jobs
WHERE status = ? AND priority >= 50
ORDER BY priority DESC, created_at ASC
LIMIT 1
""",
(JobStatus.PENDING.value,)
)
# Rest of method implementation...
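A specialized worker is started like any other; a brief usage sketch (the class above is intentionally abbreviated):
high_priority_worker = HighPriorityWorker("jobs.db", worker_id="high-priority-1")
high_priority_worker.start()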
Worker Process Management¶
For long-running workers, consider using a process manager:
# Example with multiprocessing
from multiprocessing import Process
import os
def start_workers(db_path, count=4):
"""Start multiple worker processes."""
processes = []
for i in range(count):
worker_id = f"worker-{i+1}"
p = Process(target=run_worker, args=(db_path, worker_id))
p.daemon = True
p.start()
processes.append(p)
return processes
def run_worker(db_path, worker_id):
"""Run a worker in a separate process."""
worker = Worker(db_path, worker_id=worker_id)
worker.start()
# Usage
procs = start_workers("jobs.db", count=4)
# To stop workers
for p in procs:
p.terminate()
Job Optimization¶
Job Batching¶
Batching small jobs into larger ones reduces overhead:
# Instead of submitting many small jobs
for item in items:
job = Job(name=f"process_{item}", function=process_item, params={"item": item})
queue.submit(job)
# Batch them into a single job
def process_batch(items):
results = []
for item in items:
results.append(process_item(item))
return results
batch_job = Job(name="process_batch", function=process_batch, params={"items": items})
queue.submit(batch_job)
Job Prioritization¶
Use priorities to ensure important jobs run first:
# Critical job - will execute first
critical_job = Job(
name="critical_task",
function=critical_function,
priority=100
)
# Standard job - will execute after critical jobs
standard_job = Job(
name="standard_task",
function=standard_function,
priority=0
)
# Background job - will execute when no other jobs are available
background_job = Job(
name="background_task",
function=background_function,
priority=-100
)
Appropriate Timeouts¶
Set job timeouts based on expected execution time:
# Quick job with short timeout
quick_job = Job(
name="quick_task",
function=quick_function,
timeout=30 # 30 seconds
)
# Long-running job with longer timeout
long_job = Job(
name="long_task",
function=long_function,
timeout=3600 # 1 hour
)
Appropriate timeouts prevent jobs from getting stuck and blocking workers.
Efficient Job Functions¶
Optimize your job functions for performance:
# Less efficient version
def process_data_inefficient(file_path):
# Read the entire file into memory
with open(file_path, 'r') as f:
data = f.read()
# Process line by line
lines = data.split('\n')
results = []
for line in lines:
results.append(process_line(line))
return results
# More efficient version
def process_data_efficient(file_path):
results = []
    # Stream the file line by line instead of loading it all into memory
with open(file_path, 'r') as f:
for line in f:
results.append(process_line(line.strip()))
return results
Database Cleanup¶
Regular database cleanup prevents performance degradation over time:
# Automated cleanup job
def cleanup_job(db_path, days_to_keep=7):
queue = JobQueue(db_path)
# Calculate cutoff date
import datetime
cutoff = (datetime.datetime.now() - datetime.timedelta(days=days_to_keep)).isoformat()
# Clear completed and cancelled jobs before cutoff
cleared = queue.clear_completed(before_timestamp=cutoff)
# Optimize the database
optimize_database(db_path)
return {"cleared": cleared}
# Schedule this to run periodically
cleanup_job_obj = Job(
name="database_cleanup",
function=cleanup_job,
params={"db_path": "jobs.db", "days_to_keep": 7},
description="Weekly database cleanup"
)
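The snippet above only defines the job; it still needs to be submitted to a queue (and re-submitted on whatever schedule suits you, for example from cron or another scheduler):
queue = JobQueue("jobs.db")
queue.submit(cleanup_job_obj)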
Memory Usage Optimization¶
Memory-Efficient Job Processing¶
For memory-intensive jobs, consider streaming approaches:
# Memory-intensive approach
def analyze_large_file_memory_intensive(file_path):
import pandas as pd
# Load entire dataset into memory
df = pd.read_csv(file_path)
result = df.groupby('category').sum()
return result.to_dict()
# Memory-efficient approach
def analyze_large_file_memory_efficient(file_path):
import csv
from collections import defaultdict
    # Aggregate row by row without loading the whole file into memory
results = defaultdict(int)
with open(file_path, 'r') as f:
reader = csv.DictReader(f)
for row in reader:
category = row['category']
value = float(row['value'])
results[category] += value
return dict(results)
Limiting Job Result Size¶
Large job results can consume significant memory:
# Instead of returning large datasets
def job_with_large_result():
# Process data...
return large_dataset # Could be megabytes of data
# Save results to a file and return the file path
import json
import os
import uuid

def job_with_efficient_result():
# Process data...
result_path = f"results/job_{uuid.uuid4()}.json"
os.makedirs(os.path.dirname(result_path), exist_ok=True)
with open(result_path, 'w') as f:
json.dump(large_dataset, f)
return {"result_file": result_path}
Benchmarking and Monitoring¶
Job Performance Metrics¶
Track job performance to identify bottlenecks:
def job_with_metrics(params):
import time
# Measure execution time
start_time = time.time()
# Process step 1
step1_start = time.time()
step1_result = process_step1(params)
step1_time = time.time() - step1_start
# Process step 2
step2_start = time.time()
step2_result = process_step2(step1_result)
step2_time = time.time() - step2_start
# Process step 3
step3_start = time.time()
final_result = process_step3(step2_result)
step3_time = time.time() - step3_start
total_time = time.time() - start_time
# Add metrics to the result
metrics = {
"execution_time": total_time,
"steps": {
"step1": step1_time,
"step2": step2_time,
"step3": step3_time
}
}
return {
"result": final_result,
"metrics": metrics
}
Queue Monitoring¶
Monitor queue health to detect performance issues:
def monitor_queue_performance(db_path):
"""Monitor queue performance metrics."""
import time
queue = JobQueue(db_path)
# Get initial counts
pending_count_start = len(queue.list_jobs(status="pending"))
# Submit a benchmark job
benchmark_job = Job(
name="benchmark",
function=lambda: time.sleep(0.1),
params={}
)
# Measure submission time
submit_start = time.time()
job_id = queue.submit(benchmark_job)
submit_time = time.time() - submit_start
# Measure queue processing time
worker = Worker(db_path)
process_start = time.time()
worker.process_one()
process_time = time.time() - process_start
# Get status check time
status_start = time.time()
queue.get_status(job_id)
status_time = time.time() - status_start
# Get final counts
pending_count_end = len(queue.list_jobs(status="pending"))
return {
"submit_time": submit_time,
"process_time": process_time,
"status_time": status_time,
"pending_delta": pending_count_end - pending_count_start
}
Production Deployment Tips¶
Database Journaling Mode¶
Adjust SQLite's journaling mode and related PRAGMA settings for better performance:
def optimize_sqlite_settings(db_path):
"""Configure SQLite for better performance."""
conn = sqlite3.connect(db_path)
# Use WAL mode for better concurrency
conn.execute("PRAGMA journal_mode = WAL")
# Set cache size (adjust based on available memory)
conn.execute("PRAGMA cache_size = -10000") # ~10MB
# Control how often SQLite syncs to disk
conn.execute("PRAGMA synchronous = NORMAL")
conn.close()
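Note that journal_mode = WAL is a persistent property of the database file, so it benefits every later connection, while cache_size and synchronous only affect the connection that sets them. A minimal usage sketch:
# Run once when the application starts, before any workers begin polling
optimize_sqlite_settings("jobs.db")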
Process Supervision¶
In production, use proper process supervision:
# Example systemd service file for GigQ workers
[Unit]
Description=GigQ Worker
After=network.target
[Service]
User=appuser
WorkingDirectory=/path/to/app
ExecStart=/path/to/python -m gigq.cli --db /path/to/jobs.db worker
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
Database Backup¶
Regularly back up your job queue database:
def backup_database(db_path, backup_dir):
"""Create a backup of the job queue database."""
    import os
    import shutil
    import datetime
# Create backup directory if it doesn't exist
os.makedirs(backup_dir, exist_ok=True)
# Generate a timestamp
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# Create backup file path
backup_path = os.path.join(backup_dir, f"jobs_backup_{timestamp}.db")
# Copy the database file
shutil.copy2(db_path, backup_path)
# Vacuum the backup to optimize size
conn = sqlite3.connect(backup_path)
conn.execute("VACUUM")
conn.close()
return {"original": db_path, "backup": backup_path}
Load Testing¶
Before deploying to production, load test your GigQ setup:
def load_test_gigq(db_path, num_jobs=1000, num_workers=4, job_type="mixed"):
"""Run a load test for GigQ."""
import time
import random
import multiprocessing
# Create a queue
queue = JobQueue(db_path)
# Define test job functions
def cpu_intensive_job(iterations=1000000):
result = 0
for i in range(iterations):
result += i * i
return {"result": result}
def io_intensive_job(sleep_time=0.1):
time.sleep(sleep_time)
return {"slept_for": sleep_time}
def mixed_job(iterations=10000, sleep_time=0.05):
result = 0
for i in range(iterations):
result += i * i
time.sleep(sleep_time)
return {"result": result, "slept_for": sleep_time}
# Choose job function based on type
if job_type == "cpu":
job_func = cpu_intensive_job
elif job_type == "io":
job_func = io_intensive_job
else:
job_func = mixed_job
# Submit jobs
print(f"Submitting {num_jobs} jobs...")
start_time = time.time()
for i in range(num_jobs):
job = Job(
name=f"load_test_{i}",
function=job_func,
params={"iterations": random.randint(10000, 1000000)} if job_type != "io" else {"sleep_time": random.uniform(0.01, 0.2)},
priority=random.randint(-10, 10)
)
queue.submit(job)
submit_time = time.time() - start_time
print(f"Submitted {num_jobs} jobs in {submit_time:.2f} seconds")
# Start workers
print(f"Starting {num_workers} workers...")
processes = []
for i in range(num_workers):
p = multiprocessing.Process(target=run_worker, args=(db_path, f"worker-{i+1}"))
p.start()
processes.append(p)
# Monitor progress
total_jobs = num_jobs
while True:
time.sleep(5)
pending = len(queue.list_jobs(status="pending"))
running = len(queue.list_jobs(status="running"))
completed = len(queue.list_jobs(status="completed"))
failed = len(queue.list_jobs(status="failed"))
progress = (completed + failed) / total_jobs * 100
print(f"Progress: {progress:.1f}% | Pending: {pending} | Running: {running} | Completed: {completed} | Failed: {failed}")
if pending == 0 and running == 0:
break
# Stop workers
for p in processes:
p.terminate()
# Calculate stats
end_time = time.time()
total_time = end_time - start_time
print(f"\nLoad Test Results:")
print(f"Total time: {total_time:.2f} seconds")
print(f"Average job time: {total_time / total_jobs:.4f} seconds")
print(f"Jobs per second: {total_jobs / total_time:.2f}")
return {
"total_time": total_time,
"jobs_per_second": total_jobs / total_time,
"average_job_time": total_time / total_jobs
}
def run_worker(db_path, worker_id):
"""Run a worker for load testing."""
worker = Worker(db_path, worker_id=worker_id, polling_interval=0.1)
worker.start()
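Because the load test launches worker processes with multiprocessing, call it from a main guard (required on platforms that use the spawn start method) and point it at a scratch database; the values below are only illustrative:
if __name__ == "__main__":
    stats = load_test_gigq("loadtest.db", num_jobs=500, num_workers=4, job_type="io")
    print(stats)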
Next Steps¶
Now that you understand how to optimize GigQ's performance, you might want to explore:
- Concurrency - Learn more about GigQ's concurrency model
- SQLite Schema - Understand the database schema for advanced optimization