Skip to content

Parallel Tasks

Minimal copy-paste-friendly example showing the @task decorator and Worker(concurrency=N). Zero external dependencies.

Run it:

python examples/parallel_tasks.py

Task Definitions

from gigq import task

@task(timeout=30, max_attempts=2)
def hash_block(block_id, rounds=200_000):
    """Hash random data repeatedly to simulate a small workload."""
    data = os.urandom(64)
    for _ in range(rounds):
        data = hashlib.sha256(data).digest()
    return {"block_id": block_id, "sha256": data.hex()}

@task(timeout=10)
def summarise(total_blocks):
    """Runs only after all hash jobs complete."""
    return {"total_blocks": total_blocks, "status": "done"}

Batch Submit + Concurrent Worker

from gigq import JobQueue, Worker

queue = JobQueue("jobs.db")

for i in range(16):
    hash_block.submit(queue, block_id=i)

worker = Worker("jobs.db", concurrency=4)
worker.start()

Workflow with Dependencies

Use Workflow.add_task() to build a fan-out/fan-in pipeline:

from gigq import Workflow

wf = Workflow("hash_pipeline")

hash_jobs = []
for i in range(8):
    j = wf.add_task(hash_block, params={"block_id": i})
    hash_jobs.append(j)

wf.add_task(summarise, params={"total_blocks": 8}, depends_on=hash_jobs)
wf.submit_all(queue)
graph TD
    H0[hash_block 0] --> S[summarise]
    H1[hash_block 1] --> S
    H2[hash_block 2] --> S
    H3[hash_block 3] --> S
    H4[hash_block 4] --> S
    H5[hash_block 5] --> S
    H6[hash_block 6] --> S
    H7[hash_block 7] --> S

Sample Output

Submitted 16 jobs
  16/16 completed in 1.3s (concurrency=4)
  block 0: fe9d08fc8ad3dbb974d795cec3e71059...
  block 1: 82775a31e01522b44e29b40fa1245ab8...
  block 2: f819d25b694185354f86ad310646fd29...

Submitted workflow: 8 hash jobs → 1 summary
  hash_block      completed
  hash_block      completed
  hash_block      completed
  hash_block      completed
  hash_block      completed
  hash_block      completed
  hash_block      completed
  hash_block      completed
  summarise       completed

Done.

Source

Full runnable script: examples/parallel_tasks.py

For the full-featured demo with sequential-vs-parallel comparison, speedup chart, and crash recovery, see Hyperparameter Tuning.


Last update: March 16, 2026