Designing Async Validation Queues with Celery
Designing async validation queues with Celery requires decoupling spatial data ingestion from rule execution, routing tasks by geometry complexity, and enforcing strict idempotency so compliance checks can be retried safely. Celery’s distributed task queue handles backpressure, scales workers independently, and integrates with message brokers to process heavy spatial operations—topology validation, CRS alignment, attribute schema enforcement—without blocking upstream pipelines. The core pattern treats spatial validation as a stateful, retry-aware workflow where payloads reference cloud storage paths rather than embedding large WKB/GeoJSON blobs directly in broker messages.
Queue Topology & Spatial Routing
Spatial validation introduces constraints that generic ETL pipelines rarely encounter. Geometries frequently exceed what a broker message should carry: managed services such as Amazon SQS impose hard per-message caps, and even brokers with generous limits, like Redis and RabbitMQ, slow down and consume memory when messages grow large. Coordinate reference systems must be normalized before checks, and topology rules often require full-dataset context rather than row-by-row processing. Celery addresses these through explicit routing, task chunking, and shared state backends.
Route tasks based on computational weight:
- `fast` queue: Lightweight attribute checks, schema validation, bounding-box verification. Low memory footprint, high concurrency.
- `heavy` queue: CRS transformation, geometry simplification, coordinate precision normalization. Moderate memory, bounded concurrency.
- `topology` queue: Network connectivity, sliver polygon detection, adjacency/overlap rules. High memory, dedicated worker pools, longer timeouts.
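One way to express these three tiers is a router function, which Celery accepts in `task_routes` alongside plain dictionaries. The sketch below assumes hypothetical task names (`spatial_validator.check_schema` and so on) and maps each one to a queue by name; returning `None` lets Celery consult the next router or fall back to the default queue.

```python
from typing import Any, Dict, Optional

# Hypothetical task names; replace with the names your tasks are registered under
QUEUE_BY_TASK = {
    "spatial_validator.check_schema": "fast",
    "spatial_validator.check_bbox": "fast",
    "spatial_validator.reproject": "heavy",
    "spatial_validator.simplify": "heavy",
    "spatial_validator.validate_topology": "topology",
}


def route_task(
    name: str,
    args: Any,
    kwargs: Dict[str, Any],
    options: Dict[str, Any],
    task: Any = None,
    **kw: Any,
) -> Optional[Dict[str, str]]:
    """Router function: send each task to the queue matching its computational weight."""
    queue = QUEUE_BY_TASK.get(name)
    if queue is None:
        # Unknown task: defer to the next router or task_default_queue
        return None
    return {"queue": queue}
```

Register it with `app.conf.task_routes = (route_task,)` and start one worker pool per queue, for example `celery -A celery_app worker -Q topology --concurrency=2`, so memory-hungry topology work never competes with fast checks for the same processes.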
Always serialize spatial payloads as file URIs (S3, GCS, or mounted NFS paths) or immutable dataset fingerprints. Passing raw GeoJSON or WKB through Redis or RabbitMQ inflates broker memory use and fragmentation and, at scale, can cause OOM crashes. Within a mature Validation Pipeline Architecture, async workers sit between raw data landing zones and certified data stores. Offloading computationally intensive spatial checks to background workers keeps ingestion APIs responsive while ensuring that every feature passes validation before publication.
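A minimal sketch of the reference-passing pattern, assuming a local or mounted copy of the dataset is reachable at dispatch time (with S3 or GCS, an object version identifier from the store's metadata could stand in for the local hash). `fingerprint_file`, `build_task_payload`, `encode_message`, and the 64 KB `MAX_MESSAGE_BYTES` guard are hypothetical names and values: the message carries only a URI and a content hash, and oversized bodies are rejected before they reach the broker.

```python
import hashlib
import json
from pathlib import Path
from typing import Dict

# Hypothetical guard; choose a value comfortably below your broker's limit
MAX_MESSAGE_BYTES = 64 * 1024


def fingerprint_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """SHA-256 of the file contents, read in 1 MiB chunks to bound memory use."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for block in iter(lambda: fh.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()


def build_task_payload(dataset_uri: str, local_copy: Path, rule_version: str) -> Dict[str, str]:
    """Reference the dataset by URI plus content hash instead of embedding geometry."""
    return {
        "dataset_uri": dataset_uri,
        "fingerprint": fingerprint_file(local_copy),
        "rule_version": rule_version,
    }


def encode_message(payload: Dict[str, str]) -> bytes:
    """Serialize the payload, refusing bodies large enough to suggest embedded geometry."""
    body = json.dumps(payload, sort_keys=True).encode("utf-8")
    if len(body) > MAX_MESSAGE_BYTES:
        raise ValueError(f"message is {len(body)} bytes; pass a URI, not raw geometry")
    return body
```

Because the fingerprint changes whenever the file contents change, it also doubles as an input to the deterministic task IDs discussed under idempotency.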
Idempotency & Fault Tolerance
Compliance-heavy environments require deterministic execution. Rerunning a validation against the same dataset and rule version must return identical results without duplicating work or generating conflicting reports. Celery supports this through caller-supplied task IDs and late acknowledgment, although deduplication itself is left to the application.
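- Deterministic IDs: Generate a SHA-256 hash from the source file URI + rule version + target CRS and pass it as `task_id` when dispatching. Celery does not deduplicate on `task_id` by itself, so the dispatcher should first look up that ID in the result backend and reuse a stored result instead of enqueuing a duplicate. Stored results expire after `result_expires` (one day by default), after which the validation will run again.
- Late Acknowledgment: Set `acks_late=True` so the message is acknowledged only after the task finishes executing, not when the worker receives it. If a worker crashes mid-validation, the message remains unacknowledged and is redelivered to another worker. Pair this with `reject_on_worker_lost=True` so the message is also requeued when the worker's child process is killed abruptly (for example by the OOM killer); because this can redeliver the same message repeatedly, the task must be idempotent. See the official Celery task reliability documentation for broker-specific behavior.
- Exponential Backoff: Use `autoretry_for=(Exception,)` with `retry_backoff=True` and `retry_jitter=True` to handle transient network failures, cloud storage throttling, or temporary database locks. Catching bare `Exception` also retries deterministic bugs, so narrowing the tuple to transient error types (for example `ConnectionError` or `TimeoutError`) avoids wasting retries.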
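To see what the backoff settings listed above actually produce, the sketch below recomputes the countdown that Celery's documentation describes for `retry_backoff`: the first retry waits 1 second, then 2, 4, and so on, capped at `retry_backoff_max`, while `retry_jitter` picks a random delay between zero and that value. `backoff_schedule` is a hypothetical stand-alone reimplementation for illustration, not Celery's own code.

```python
import random
from typing import List, Optional


def backoff_schedule(
    max_retries: int,
    factor: int = 1,
    maximum: int = 600,
    jitter: bool = False,
    rng: Optional[random.Random] = None,
) -> List[int]:
    """Seconds to wait before each retry: factor * 2**n, capped at maximum."""
    rng = rng or random.Random()
    delays = []
    for retries in range(max_retries):
        countdown = min(maximum, factor * 2 ** retries)
        if jitter:
            # Full jitter: draw uniformly from [0, countdown]
            countdown = rng.randrange(countdown + 1)
        delays.append(max(0, countdown))
    return delays
```

With `max_retries=5`, a persistently failing task waits at most 1 + 2 + 4 + 8 + 16 = 31 seconds in total before giving up, and jitter only shortens those waits; the 600-second cap starts to matter only with ten or more retries.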
Production Implementation
The following snippet demonstrates a Celery task configured for spatial topology validation. It includes queue routing, deterministic task IDs assigned at dispatch time, exponential backoff, and real-time progress tracking; storage I/O and the geometry checks themselves are mocked.
```python
# celery_app.py
import hashlib
import logging
from typing import Any, Dict, List

from celery import Celery
from celery.result import AsyncResult

logger = logging.getLogger(__name__)

app = Celery(
    "spatial_validator",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

# Production routing and reliability configuration
app.conf.update(
    task_routes={
        # Keys must match registered task names, so the task below is named explicitly
        "spatial_validator.validate_topology": {"queue": "topology"},
    },
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    task_track_started=True,  # store STARTED so in-flight tasks are visible to the dispatcher
    worker_prefetch_multiplier=1,
    broker_connection_retry_on_startup=True,
)


def make_task_id(dataset_uri: str, rule_version: str, target_crs: str) -> str:
    """Same dataset URI, rule version, and CRS always hash to the same task ID."""
    key = f"{dataset_uri}:{rule_version}:{target_crs}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()


@app.task(
    bind=True,
    name="spatial_validator.validate_topology",
    acks_late=True,
    reject_on_worker_lost=True,
    autoretry_for=(Exception,),  # broad for brevity; narrow to transient errors in practice
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
    max_retries=5,
    queue="topology",
)
def validate_topology(self, dataset_uri: str, rule_version: str, target_crs: str = "EPSG:4326") -> Dict[str, Any]:
    """
    Validates spatial topology for a dataset stored at dataset_uri.
    Returns a compliance report with error counts and a dataset fingerprint.
    """
    # 1. Idempotency is enforced at dispatch time (see dispatch_validation);
    #    assigning self.request.id here would not change the ID the result is stored under.
    fingerprint = make_task_id(dataset_uri, rule_version, target_crs)

    # 2. Download/prepare dataset (mocked for brevity)
    logger.info("Fetching dataset from %s for CRS %s", dataset_uri, target_crs)
    # In production: use fsspec, boto3, or gcsfs to stream to a local temp dir

    # 3. Progress tracking
    total_features = 100_000  # Replace with actual feature count from metadata
    self.update_state(state="PROGRESS", meta={"current": 0, "total": total_features, "phase": "loading"})

    # 4. Execute validation loop (chunked for memory safety)
    errors: List[Dict[str, Any]] = []
    chunk_size = 5_000
    for chunk_idx in range(0, total_features, chunk_size):
        current = min(chunk_idx + chunk_size, total_features)
        self.update_state(state="PROGRESS", meta={
            "current": current,
            "total": total_features,
            "phase": f"validating_chunk_{chunk_idx // chunk_size}",
        })
        # Mock spatial topology checks
        # In production: use shapely (2.x absorbed pygeos) or GDAL's osgeo.ogr to flag
        # self-intersections, invalid rings, or incorrect ring orientation
        pass

    # 5. Finalize report
    report = {
        "status": "passed" if not errors else "failed",
        "dataset_uri": dataset_uri,
        "rule_version": rule_version,
        "target_crs": target_crs,
        "error_count": len(errors),
        "errors": errors[:100],  # Cap the payload stored in the result backend
        "fingerprint": fingerprint,
    }
    return report


def dispatch_validation(dataset_uri: str, rule_version: str, target_crs: str = "EPSG:4326") -> AsyncResult:
    """Enqueue validation at most once per (dataset, rule version, CRS) combination."""
    task_id = make_task_id(dataset_uri, rule_version, target_crs)
    existing = AsyncResult(task_id, app=app)
    # Celery does not deduplicate on task_id, so reuse a stored or in-flight result explicitly.
    # Unknown IDs also report PENDING, so a queued-but-unstarted duplicate is not detected.
    if existing.state in ("SUCCESS", "STARTED", "PROGRESS", "RETRY"):
        return existing
    return validate_topology.apply_async(args=(dataset_uri, rule_version, target_crs), task_id=task_id)
```
Operational Scaling & Monitoring
Routing alone doesn’t guarantee throughput. Spatial workers require explicit resource boundaries and observability hooks.
- Memory Limits & OOM Prevention: Topology checks often load full geometries into RAM. Configure `worker_max_tasks_per_child=50` to recycle worker processes and cap the damage from memory leaks. Use `ulimit` or container cgroups to enforce hard memory caps per worker process.
- Chunking & Context Windows: Full-dataset topology rules (e.g., “no overlapping parcels”) cannot run row-by-row. Partition datasets into spatial tiles (e.g., 1 km² grids) using `geopandas.sjoin` or PostGIS `ST_Within`. Process tiles in parallel, then merge boundary checks in a final aggregation task.
- CRS Normalization: Always validate and transform coordinates before rule execution. Mixed CRS inputs cause silent topology failures. Use `pyproj` or GDAL with strict transformation grids to avoid datum shifts. This aligns with broader Asynchronous Validation Workflows patterns where coordinate standardization precedes business-logic checks.
- Observability: Export Celery metrics to Prometheus via `celery-prometheus-exporter`. Track queue depth, task latency, retry rates, and worker memory. Alert when `topology` queue depth exceeds worker capacity or when retry rates spike above 5%.
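The tiling step can be sketched as a grid generator over a dataset's bounding box. It assumes coordinates in a projected CRS measured in metres, so a 1,000 m tile covers 1 km²; `grid_tiles` is a hypothetical helper, and edge tiles are clipped to the extent rather than padded.

```python
import math
from typing import Iterator, Tuple

# (minx, miny, maxx, maxy) in a projected CRS with metre units (assumption)
BBox = Tuple[float, float, float, float]


def grid_tiles(bbox: BBox, tile_size: float = 1000.0) -> Iterator[BBox]:
    """Yield square tiles covering bbox row by row; edge tiles are clipped to the extent."""
    if tile_size <= 0:
        raise ValueError("tile_size must be positive")
    minx, miny, maxx, maxy = bbox
    # Integer tile counts avoid floating-point drift from repeated addition
    nx = math.ceil((maxx - minx) / tile_size)
    ny = math.ceil((maxy - miny) / tile_size)
    for row in range(ny):
        for col in range(nx):
            x0 = minx + col * tile_size
            y0 = miny + row * tile_size
            yield (x0, y0, min(x0 + tile_size, maxx), min(y0 + tile_size, maxy))
```

Each tile could then become one task in a Celery `chord`, for example `chord(validate_tile.s(uri, t) for t in grid_tiles(bbox))(merge_boundaries.s())`, where `validate_tile` and `merge_boundaries` are hypothetical tasks. The chord's callback runs only after every tile has finished, which makes it the natural place to re-check features that cross tile edges.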
For routing configuration and advanced broker tuning, consult the official Celery routing documentation. Properly configured, async queues transform spatial validation from a pipeline bottleneck into a scalable, auditable compliance layer.