# Workflows
Workflows let you define and manage sequences of jobs with dependencies. Jobs run in dependency order, with independent jobs running in parallel when multiple workers are available.
## Creating a Workflow

```python
from gigq import Workflow, task

@task()
def download(url):
    return {"file": "data.csv", "rows": 1000}

@task()
def process(parent_results):
    data = next(iter(parent_results.values()))
    return {"processed": data["file"], "rows": data["rows"]}

@task()
def analyze(parent_results):
    data = next(iter(parent_results.values()))
    return {"analysis": f"Processed {data['rows']} rows"}

wf = Workflow("data_processing_pipeline")
step1 = wf.add_task(download, params={"url": "https://example.com/data.csv"})
step2 = wf.add_task(process, depends_on=[step1])
wf.add_task(analyze, depends_on=[step2])
```
## Passing Results Between Steps

When job B depends on job A, B can receive A's return value automatically. Declare a `parent_results` parameter on the dependent task, and the worker injects a dict mapping each parent job ID to that job's deserialized result.

```python
from gigq import Workflow, task

@task()
def fetch(url):
    return {"url": url, "data": 1}  # example payload

@task()
def summarize(parent_results):
    # Values from all parent jobs, keyed by parent job id
    payloads = list(parent_results.values())
    return {"summary": len(payloads)}

wf = Workflow("pipeline")
job1 = wf.add_task(fetch, params={"url": "https://a.example"})
job2 = wf.add_task(fetch, params={"url": "https://b.example"})
job3 = wf.add_task(summarize, depends_on=[job1, job2])
```
Auto vs. explicit: with the default `pass_parent_results=None` ("auto"), injection happens only if the function accepts a `parent_results` parameter or `**kwargs`. Pass `pass_parent_results=False` to `Workflow.add_task()` to disable injection, or `True` to always inject when the job has dependencies.
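The "auto" rule amounts to a signature check. A minimal sketch of that decision (illustrative only, not GigQ's actual implementation):

```python
import inspect

def accepts_parent_results(func):
    """Return True if func can receive a parent_results keyword,
    either via an explicit parameter or **kwargs.
    Sketch of the "auto" rule described above, not GigQ internals."""
    params = inspect.signature(func).parameters
    if "parent_results" in params:
        return True
    return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())

def plain(x):                    # no injection in auto mode
    return x

def explicit(parent_results):    # injected: explicit parameter
    return parent_results

def via_kwargs(**kwargs):        # injected: **kwargs catch-all
    return kwargs
```

Under auto mode, `plain` would run without the extra argument, while `explicit` and `via_kwargs` would receive the parent-results dict.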
## Fan-Out Pattern

A single job spawns multiple parallel jobs:

```python
@task()
def split_data():
    return {"chunks": 5}

@task()
def process_chunk(chunk_id, parent_results):
    source = next(iter(parent_results.values()))
    return {"chunk_id": chunk_id, "total": source["chunks"]}

wf = Workflow("fan_out")
split = wf.add_task(split_data)
chunks = []
for i in range(5):
    chunk = wf.add_task(process_chunk, params={"chunk_id": i}, depends_on=[split])
    chunks.append(chunk)
```
## Fan-In Pattern

Multiple parallel jobs converge on a single job that receives all parent results:

```python
@task()
def combine(parent_results):
    results = list(parent_results.values())
    return {"total_chunks": len(results)}

# Continues from the fan-out example above
wf.add_task(combine, depends_on=chunks)
```
Together, these create a diamond-shaped workflow:

```mermaid
graph TD
    A[Split Data] --> B1[Process Chunk 0]
    A --> B2[Process Chunk 1]
    A --> B3[Process Chunk 2]
    A --> B4[Process Chunk 3]
    A --> B5[Process Chunk 4]
    B1 --> C[Combine Results]
    B2 --> C
    B3 --> C
    B4 --> C
    B5 --> C
```

## Multiple Dependencies
A job can depend on multiple other jobs:

```python
@task()
def generate_report(parent_results):
    # Receives results from both parent jobs, keyed by job id
    return {"report": "done"}

# process and analyze here stand for any two independent tasks
# that accept an "input" parameter.
wf = Workflow("multi_dep")
step_a = wf.add_task(process, params={"input": "a"})
step_b = wf.add_task(analyze, params={"input": "b"})
wf.add_task(generate_report, depends_on=[step_a, step_b])
```

```mermaid
graph TD
    A[Process] --> C[Generate Report]
    B[Analyze] --> C
```

## Submitting a Workflow
Once defined, submit all jobs to a queue:

```python
from gigq import JobQueue

queue = JobQueue("workflow_jobs.db")
job_ids = wf.submit_all(queue)
print(f"Submitted {len(job_ids)} jobs")
```

Workers process jobs in dependency order automatically.
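"Dependency order" here means a topological order of the job graph: a job becomes eligible only once all of its parents have finished. The ordering can be sketched in plain Python with Kahn's algorithm (an illustration of the concept, not GigQ's internal scheduler):

```python
from collections import deque

def dependency_order(deps):
    """Topologically sort jobs given a {job: [parents]} mapping
    (Kahn's algorithm). Illustrative sketch, not GigQ internals."""
    children = {job: [] for job in deps}
    pending = {job: len(parents) for job, parents in deps.items()}
    for job, parents in deps.items():
        for parent in parents:
            children[parent].append(job)
    # Jobs with no unfinished parents are ready to run.
    ready = deque(job for job, n in pending.items() if n == 0)
    order = []
    while ready:
        job = ready.popleft()
        order.append(job)
        for child in children[job]:
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)
    return order

# The diamond from the fan-out/fan-in example, reduced to two chunks:
graph = {"split": [], "c0": ["split"], "c1": ["split"], "combine": ["c0", "c1"]}
```

Here `dependency_order(graph)` starts with `split` and ends with `combine`; the chunk jobs in the middle have no mutual ordering, which is exactly what lets multiple workers run them in parallel.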
## Monitoring Workflow Progress

```python
def check_workflow_status(queue, job_ids):
    for job_id in job_ids:
        status = queue.get_status(job_id)
        print(f"{status['name']}: {status['status']}")
```

Or use the CLI (see CLI Usage below).
## Best Practices

- Keep jobs atomic — each job should perform a single, well-defined task.
- Set appropriate timeouts — match each job's expected runtime.
- Design for restartability — if a workflow is interrupted, workers resume from where they left off.
- Use meaningful job names — clear names make monitoring and debugging easier.
- Use `parent_results` for data passing — let the framework handle serialization rather than writing intermediate files.
## Next Steps

- Error Handling — how GigQ handles errors in jobs and workflows
- CLI Usage — managing workflows from the command line
- Parallel Tasks example — a complete fan-out/fan-in example