Production-ready development primitives for building TTA agents and workflows. This package provides composable patterns, recovery strategies, performance utilities, and observability tools for development automation.
Note: These are development tools for building TTA, not player-facing game components.
```bash
# Install from local package
uv pip install -e packages/tta-dev-primitives

# Install with all extras
uv pip install -e "packages/tta-dev-primitives[dev,tracing,apm]"
```
```python
from tta_dev_primitives import Sequential, Parallel, Router, WorkflowPrimitive

# Sequential workflow
workflow = Sequential([
    load_data,
    process_data,
    save_results,
])
result = await workflow.execute({"input": "data"})

# Parallel execution
parallel = Parallel([
    fetch_user_data,
    fetch_analytics,
    fetch_recommendations,
])
results = await parallel.execute({"user_id": 123})

# Dynamic routing with cost optimization
router = Router({
    "fast": gpt4_mini,
    "balanced": gpt4,
    "quality": gpt4_turbo,
})
response = await router.execute({"tier": "balanced", "prompt": "..."})
```
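These primitives compose: a stage in a Sequential workflow can itself be a Parallel block. A minimal sketch, assuming nested primitives are accepted wherever plain steps are (the step functions reuse the placeholders above):

```python
# Sketch: fan out in the middle of a sequential pipeline.
# Assumes Sequential accepts other primitives as steps.
pipeline = Sequential([
    load_data,
    Parallel([fetch_user_data, fetch_analytics]),  # runs both concurrently
    save_results,
])
result = await pipeline.execute({"input": "data"})
```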
```python
from tta_dev_primitives import Retry, Fallback, Timeout, Saga

# Retry with exponential backoff
@Retry(max_attempts=3, backoff_factor=2.0)
async def flaky_api_call():
    return await external_api.fetch()

# Fallback strategy
workflow = Fallback(
    primary=expensive_model,
    fallback=cheap_model,
)

# Timeout enforcement
@Timeout(seconds=5.0)
async def long_running_task():
    return await process_data()

# Saga compensation pattern
saga = Saga()
saga.add_step(create_user, rollback=delete_user)
saga.add_step(send_email, rollback=send_cancellation)
await saga.execute({"user_data": {...}})
```
```python
from tta_dev_primitives import cached

# LRU cache with TTL (seconds)
@cached(max_size=1000, ttl=3600)
async def expensive_computation(input_data: str) -> dict:
    # Expensive operation here
    return {"input": input_data}

# Check cache stats
stats = expensive_computation.cache_stats()
print(f"Hit rate: {stats.hit_rate:.2%}")
```
```python
from tta_dev_primitives import get_logger, track_metrics, trace_operation

# Structured logging
logger = get_logger(__name__)
logger.info("Processing request", user_id=123, request_id="abc")

# Metrics tracking
@track_metrics(name="api_latency")
async def api_call():
    return await external_service.call()

# Distributed tracing
@trace_operation(span_name="data_processing")
async def process_pipeline(data):
    # Automatic span creation and context propagation
    return await transform(data)
```
The examples/ directory contains 5 validated, production-ready workflows demonstrating key patterns:
| Example | Pattern | Features | Use When |
|---|---|---|---|
| `rag_workflow.py` | RAG Pipeline | Caching, Fallback, Retry, Sequential | Building document retrieval systems |
| `agentic_rag_workflow.py` | Agentic RAG | Router, Grading, Validation, Hallucination Detection | Production RAG with quality control |
| `cost_tracking_workflow.py` | Cost Management | Budget Enforcement, Per-Model Tracking | Managing LLM API costs |
| `streaming_workflow.py` | Token Streaming | AsyncIterator, Buffering, Metrics | Real-time response streaming |
| `multi_agent_workflow.py` | Multi-Agent | Coordinator, Parallel Specialists, Aggregation | Complex agent orchestration |
Agentic RAG (Production Pattern):
```python
# Complete RAG pipeline with quality controls
workflow = (
    QueryRouterPrimitive() >>            # Route simple vs complex
    VectorstoreRetrieverPrimitive() >>   # Cached retrieval
    DocumentGraderPrimitive() >>         # Filter irrelevant docs
    AnswerGeneratorPrimitive() >>        # Generate response
    AnswerGraderPrimitive() >>           # Validate quality
    HallucinationGraderPrimitive()       # Detect hallucinations
)
```
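The composed pipeline executes like any other primitive. The input key below is an assumption; see agentic_rag_workflow.py for the actual schema:

```python
# Hypothetical input shape, for illustration only.
result = await workflow.execute({"question": "What are workflow primitives?"})
```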
Multi-Agent Coordination:
```python
# Decompose task and execute with specialist agents
workflow = (
    CoordinatorAgentPrimitive() >>    # Analyze and plan
    ParallelPrimitive([               # Execute in parallel
        DataAnalystAgentPrimitive(),
        ResearcherAgentPrimitive(),
        FactCheckerAgentPrimitive(),
        SummarizerAgentPrimitive(),
    ]) >>
    AggregatorAgentPrimitive()        # Combine results
)
```
Cost Tracking:
```python
# Track and enforce budget across LLM calls
cost_tracker = CostTrackingPrimitive(llm_primitive)
enforcer = BudgetEnforcementPrimitive(
    cost_tracker,
    budget_usd=10.0,
)

# Automatic cost reporting
report = await enforcer.get_cost_report()
```
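Token Streaming (a minimal sketch; `StreamingLLMPrimitive` and its `stream()` method are illustrative names, not confirmed API; see `streaming_workflow.py` for the validated implementation):

```python
# Sketch: consume a token stream via an async iterator, buffering chunks
# while emitting them to the client in real time.
async def stream_response(prompt: str) -> str:
    streamer = StreamingLLMPrimitive(llm_primitive)  # assumed wrapper
    chunks: list[str] = []
    async for token in streamer.stream({"prompt": prompt}):
        chunks.append(token)               # buffer for the final result
        print(token, end="", flush=True)   # stream to the consumer
    return "".join(chunks)
```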
All examples follow the `InstrumentedPrimitive` pattern.
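A minimal sketch of that style, built from the observability helpers shown above (the class shape and `execute` signature are assumptions, not the package's confirmed base class):

```python
# Sketch of an instrumented primitive; InstrumentedPrimitive's real interface
# may differ. This only illustrates the logging + tracing combination.
from tta_dev_primitives import get_logger, trace_operation

logger = get_logger(__name__)

class GreetingPrimitive:
    @trace_operation(span_name="greeting")
    async def execute(self, data: dict) -> dict:
        logger.info("Executing greeting", name=data.get("name"))
        return {"greeting": f"Hello, {data.get('name', 'world')}!"}
```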
Detailed Guide: See PHASE3_EXAMPLES_COMPLETE.md for complete implementation details, test results, and pattern documentation.
```
tta-dev-primitives/
├── src/tta_dev_primitives/
│   ├── core/                  # Workflow primitives
│   │   ├── base.py            # Base classes and context
│   │   ├── sequential.py      # Sequential execution
│   │   ├── parallel.py        # Parallel execution
│   │   ├── conditional.py     # Conditional branching
│   │   └── routing.py         # Dynamic routing
│   ├── recovery/              # Recovery patterns
│   │   ├── retry.py           # Retry logic
│   │   ├── fallback.py        # Fallback strategies
│   │   ├── timeout.py         # Timeout enforcement
│   │   └── compensation.py    # Saga pattern
│   ├── performance/           # Performance utilities
│   │   └── cache.py           # LRU cache with TTL
│   ├── observability/         # Observability tools
│   │   ├── logging.py         # Structured logging
│   │   ├── metrics.py         # Metrics tracking
│   │   └── tracing.py         # Distributed tracing
│   ├── testing/               # Testing utilities
│   │   └── mocks.py           # Mock primitives
│   └── apm/                   # APM integration
│       ├── decorators.py      # APM decorators
│       ├── instrumented.py    # Instrumented primitives
│       └── setup.py           # APM setup
├── tests/                     # 95 comprehensive tests
│   ├── unit/                  # 77 unit tests for all primitives
│   ├── observability/         # Observability instrumentation tests
│   └── integration/           # 18 integration tests with real backends
├── examples/                  # Usage examples
├── scripts/                   # Helper scripts (integration-test-env.sh)
├── docker-compose.integration.yml  # Integration test environment
├── pyproject.toml             # Package configuration
└── apm.yml                    # APM metadata
```
```bash
# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov=src --cov-report=html

# Run specific test module
uv run pytest tests/test_cache.py -v
```
The package includes comprehensive integration tests that verify observability instrumentation works correctly with real OpenTelemetry backends (Jaeger, Prometheus, Grafana, OpenTelemetry Collector).
```bash
# Start integration test environment
cd packages/tta-dev-primitives
./scripts/integration-test-env.sh start

# Run integration tests
uv run pytest tests/integration/ -v

# Stop services when done
./scripts/integration-test-env.sh stop
```
Once started, the following services are available:
| Service | URL | Purpose |
|---|---|---|
| Jaeger UI | http://localhost:16686 | Distributed tracing visualization |
| Prometheus | http://localhost:9090 | Metrics collection and querying |
| Grafana | http://localhost:3000 | Dashboards and visualization (admin/admin) |
| OTLP Collector | http://localhost:4317 (gRPC), http://localhost:4318 (HTTP) | OpenTelemetry data collection |
Tests in `tests/integration/test_otel_backend_integration.py` verify that all workflow primitives create proper spans with correlation IDs:

```bash
# Run OpenTelemetry integration tests
uv run pytest tests/integration/test_otel_backend_integration.py -v
```
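For manual spot checks, the Jaeger HTTP API can be queried directly. A sketch; the service name and tag mirror the Jaeger search tips below:

```python
# Sketch: ask Jaeger for recent traces carrying a given correlation ID.
import json
import urllib.parse
import urllib.request

params = urllib.parse.urlencode({
    "service": "tta-dev-primitives",
    "tags": json.dumps({"workflow.correlation_id": "<your-correlation-id>"}),
    "limit": 20,
})
with urllib.request.urlopen(f"http://localhost:16686/api/traces?{params}") as resp:
    traces = json.loads(resp.read())["data"]
print(f"Found {len(traces)} matching traces")
```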
Tests in `tests/integration/test_prometheus_metrics.py` verify the metrics collection pipeline:

```bash
# Run Prometheus integration tests
uv run pytest tests/integration/test_prometheus_metrics.py -v
```
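The Prometheus HTTP API supports the same kind of spot check programmatically. A sketch using one of the example queries listed below:

```python
# Sketch: run an instant query against Prometheus and report target health.
import json
import urllib.parse
import urllib.request

query = urllib.parse.quote('up{job=~"prometheus|otel-collector|tta-primitives"}')
with urllib.request.urlopen(
    f"http://localhost:9090/api/v1/query?query={query}"
) as resp:
    result = json.loads(resp.read())["data"]["result"]
for sample in result:
    status = "up" if sample["value"][1] == "1" else "down"
    print(sample["metric"].get("job", "unknown"), status)
```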
In the Jaeger UI (http://localhost:16686), useful trace searches include:

- Service `tta-dev-primitives` with tag `workflow.correlation_id=<your-correlation-id>`
- Service `tta-dev-primitives` with operation `primitive.SequentialPrimitive`
- Service `tta-dev-primitives` with tag `error=true`

Useful Prometheus queries:

```promql
otelcol_process_uptime{job="otel-collector"}
otelcol_exporter_sent_spans{job="otel-collector"}
up{job=~"prometheus|otel-collector|tta-primitives"}
```
If services fail to start, check for port conflicts:

```bash
# Check if ports are already in use
lsof -i :9090   # Prometheus
lsof -i :16686  # Jaeger
lsof -i :3000   # Grafana

# Stop any conflicting services
docker ps | grep -E "prometheus|jaeger|grafana|otel"
docker stop <container-id>
```
If tests cannot reach the backends, verify that the services are healthy and restart them if needed:

```bash
# Verify services are running
docker ps | grep tta-

# Check service health
curl http://localhost:9090/-/healthy  # Prometheus
curl http://localhost:16686/          # Jaeger

# Restart services
./scripts/integration-test-env.sh stop
./scripts/integration-test-env.sh start
```
If spans arrive without the `workflow.correlation_id` tag or metrics are missing, inspect the pipeline directly:

```bash
# Check collector logs for export errors
docker logs tta-otel-collector

# Inspect the metrics the collector exposes
curl http://localhost:8889/metrics

# Verify the Prometheus scrape configuration
curl http://localhost:9090/api/v1/status/config
```
Additional integration testing tasks are planned for future implementation.
```bash
# Install development dependencies
uv sync --all-extras

# Format code
uv run ruff format .

# Lint code
uv run ruff check . --fix

# Type check
uv run mypy src/
```
This package includes Agent Package Manager (APM) metadata for MCP compatibility:
```yaml
# apm.yml
name: tta-dev-primitives
version: 0.1.0
type: library
category: development-tools
```
Proprietary - TTA Storytelling Platform
- `tta-ai-framework`: AI components for TTA (separate - for game components)
- `tta-narrative-engine`: Narrative generation (separate - for game components)

This package is specifically for development automation, not player-facing features.