Build reliable AI applications with production-ready primitives and patterns.
TTA.dev is a collection of battle-tested components for building AI-native applications. Every component has:
```bash
# Install with pip
pip install tta-dev-primitives

# Or with uv (recommended)
uv pip install tta-dev-primitives
```
```python
from tta_dev_primitives import (
    CachePrimitive,
    RouterPrimitive,
    RetryPrimitive,
    WorkflowContext,
)

# Define your processing function
async def process_with_llm(data: dict, context: WorkflowContext) -> dict:
    # Your LLM call here
    return {"result": "processed"}

# Compose workflow with operators
workflow = (
    CachePrimitive(ttl=3600) >>           # Cache for 1 hour
    RouterPrimitive(tier="balanced") >>   # Smart model selection
    RetryPrimitive(max_attempts=3) >>     # Retry on failure
    process_with_llm
)

# Execute
context = WorkflowContext(trace_id="request-123")
result = await workflow.execute({"input": "Hello"}, context)
```
Your workflow now has:
Small, composable building blocks for workflows:
Combine primitives using operators:
```python
# Sequential: execute in order
workflow = step1 >> step2 >> step3

# Parallel: execute concurrently
workflow = step1 | step2 | step3

# Conditional: branch based on data
workflow = router >> (fast_path if simple else complex_path)
```
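The `>>` operator is ordinary Python operator overloading. A minimal, illustrative sketch of how sequential composition of async steps can work (this is not the library's actual implementation; `Step`, `double`, and `inc` are hypothetical names):

```python
import asyncio

class Step:
    """Wraps an async function and overloads >> for sequential chaining."""

    def __init__(self, fn):
        self.fn = fn

    def __rshift__(self, other):
        # Feed this step's output into the next step.
        nxt = other if isinstance(other, Step) else Step(other)

        async def chained(data):
            return await nxt.fn(await self.fn(data))

        return Step(chained)

    async def execute(self, data):
        return await self.fn(data)

async def double(x):
    return x * 2

async def inc(x):
    return x + 1

pipeline = Step(double) >> inc
print(asyncio.run(pipeline.execute(5)))  # 11
```

The same pattern extends to `|` via `__or__`, returning a step that runs both operands concurrently.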
Every execution has context for tracing and correlation:
```python
context = WorkflowContext(
    trace_id="abc-123",
    correlation_id="request-456",
    metadata={"user_id": "user123"},
)
```
```python
from tta_dev_primitives import CachePrimitive, RouterPrimitive, WorkflowContext

async def analyze_text(text: str) -> dict:
    workflow = (
        CachePrimitive(ttl=3600) >>
        RouterPrimitive(tier="balanced") >>
        llm_analyzer
    )
    return await workflow.execute(
        {"text": text},
        WorkflowContext(),
    )
```
```python
from tta_dev_primitives import RetryPrimitive, TimeoutPrimitive, FallbackPrimitive

workflow = (
    TimeoutPrimitive(seconds=10) >>
    RetryPrimitive(max_attempts=3, backoff_factor=2.0) >>
    FallbackPrimitive(
        primary=expensive_api,
        fallback=cheap_api,
    )
)
```
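Retry with exponential backoff, as configured above (`max_attempts=3`, `backoff_factor=2.0`), follows a standard pattern. A self-contained sketch, independent of the library (`retry` and `flaky` are illustrative names):

```python
import asyncio

async def retry(fn, max_attempts=3, backoff_factor=2.0, base_delay=0.01):
    """Re-run fn until it succeeds, sleeping base_delay * factor**attempt between tries."""
    for attempt in range(max_attempts):
        try:
            return await fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * backoff_factor ** attempt)

calls = {"n": 0}

async def flaky():
    # Fails twice, then succeeds, simulating a transient API error.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = asyncio.run(retry(flaky))
print(result)  # 'ok' (succeeds on the third attempt)
```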
```python
from tta_dev_primitives import ParallelPrimitive

# Fetch data from multiple sources concurrently
workflow = ParallelPrimitive([
    fetch_user_profile,
    fetch_recommendations,
    fetch_analytics,
])

results = await workflow.execute({"user_id": 123}, context)
```
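Concurrent fan-out of this kind maps naturally onto `asyncio.gather`. An illustrative, self-contained sketch (the fetch functions here are stand-ins, not the library's):

```python
import asyncio

async def fetch_user_profile(user_id):
    await asyncio.sleep(0.01)  # simulate I/O latency
    return {"profile": user_id}

async def fetch_recommendations(user_id):
    await asyncio.sleep(0.01)
    return {"recs": [1, 2, 3]}

async def fan_out(user_id):
    # Run both fetches concurrently; wall time ~= the slowest step,
    # not the sum of all steps. Results come back in argument order.
    return await asyncio.gather(
        fetch_user_profile(user_id),
        fetch_recommendations(user_id),
    )

results = asyncio.run(fan_out(123))
print(results)  # [{'profile': 123}, {'recs': [1, 2, 3]}]
```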
```python
# Cache reduces redundant LLM calls by 30-40%
cache = CachePrimitive(
    ttl=3600,            # 1 hour
    max_size=1000,       # Max 1000 entries
    context_aware=True,  # Include context in cache key
)
```
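A TTL cache of this shape can be sketched in a few lines of plain Python. This is a simplified stand-in for `CachePrimitive`, not its real implementation (`TTLCache` is a hypothetical name; eviction here is oldest-first when `max_size` is reached):

```python
import time

class TTLCache:
    """Minimal time-based cache: entries expire ttl seconds after insertion."""

    def __init__(self, ttl=3600, max_size=1000):
        self.ttl = ttl
        self.max_size = max_size
        self._store = {}  # key -> (value, stored_at)

    def get(self, key):
        hit = self._store.get(key)
        if hit is None:
            return None
        value, stored_at = hit
        if time.monotonic() - stored_at > self.ttl:
            del self._store[key]  # expired: drop the entry and miss
            return None
        return value

    def put(self, key, value):
        if len(self._store) >= self.max_size:
            # Evict the oldest entry to stay within max_size.
            oldest = min(self._store, key=lambda k: self._store[k][1])
            del self._store[oldest]
        self._store[key] = (value, time.monotonic())

cache = TTLCache(ttl=0.05, max_size=2)
cache.put("prompt", "answer")
print(cache.get("prompt"))  # answer
time.sleep(0.06)
print(cache.get("prompt"))  # None (expired)
```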
```python
# Route to the appropriate model based on task complexity
router = RouterPrimitive(
    tier="fast",       # Use cheaper, faster model
    # tier="balanced", # Balance cost and quality
    # tier="quality",  # Use best model for hard tasks
)
```
```python
from opentelemetry import trace
from tta_dev_primitives import WorkflowContext

# Context automatically propagates traces
tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("my_operation") as span:
    context = WorkflowContext(
        trace_id=span.get_span_context().trace_id
    )
    result = await workflow.execute(data, context)
```
```python
import logging

logger = logging.getLogger(__name__)

# Context provides correlation IDs
logger.info(
    "Workflow completed",
    extra={
        "trace_id": context.trace_id,
        "duration_ms": duration,
        "cache_hit": True,
    },
)
```
```python
from tta_dev_primitives.testing import MockPrimitive, create_test_context

async def test_my_workflow():
    # Use mocks in place of real LLM calls
    mock_llm = MockPrimitive(
        response={"result": "test response"}
    )
    workflow = cache >> mock_llm >> processor

    context = create_test_context(trace_id="test-123")
    result = await workflow.execute({"input": "test"}, context)

    assert result["result"] == "processed test response"
    assert mock_llm.call_count == 1
```
Start here! 5 validated, working examples ready to run:
| Example | What It Shows | Use When |
|---|---|---|
| RAG Workflow | Caching + Fallback + Retry | Building document retrieval systems |
| Agentic RAG | Router + Grading + Validation | Production RAG with quality controls |
| Cost Tracking | Budget Enforcement + Metrics | Managing LLM API costs |
| Streaming | AsyncIterator + Buffering | Real-time response streaming |
| Multi-Agent | Coordinator + Parallel Execution | Complex agent orchestration |
Quick Start:
```bash
# Run any example
uv run python packages/tta-dev-primitives/examples/rag_workflow.py

# Or explore all examples
ls packages/tta-dev-primitives/examples/
```
Implementation Guide: PHASE3_EXAMPLES_COMPLETE.md - Comprehensive documentation including:
More patterns in the examples directory:
- docs/ directory
- packages/tta-dev-primitives/examples/

Create your own primitives:
```python
from tta_dev_primitives import WorkflowPrimitive, WorkflowContext

class CustomPrimitive(WorkflowPrimitive):
    """Your custom primitive."""

    async def _execute(
        self,
        data: dict,
        context: WorkflowContext,
    ) -> dict:
        # Your implementation (example transformation shown)
        processed_data = {**data, "processed": True}
        return processed_data
```
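The subclassing pattern above suggests a template-method design: a public `execute` handles cross-cutting concerns (timing, tracing), delegating the actual work to `_execute`. A hypothetical, self-contained sketch of that structure (using a plain dict as a stand-in for `WorkflowContext`; none of this is the library's actual code):

```python
import asyncio
import time

class WorkflowPrimitive:
    """Template method: execute() records timing, subclasses implement _execute()."""

    async def execute(self, data, context):
        start = time.monotonic()
        result = await self._execute(data, context)
        # Record how long this primitive took, keyed by class name.
        context.setdefault("timings_ms", []).append(
            (type(self).__name__, (time.monotonic() - start) * 1000)
        )
        return result

    async def _execute(self, data, context):
        raise NotImplementedError

class UppercasePrimitive(WorkflowPrimitive):
    async def _execute(self, data, context):
        return {"text": data["text"].upper()}

ctx = {}  # stand-in for a real WorkflowContext
out = asyncio.run(UppercasePrimitive().execute({"text": "hello"}, ctx))
print(out)  # {'text': 'HELLO'}
```

Because every primitive shares the same `execute(data, context)` surface, composition operators like `>>` only ever need to know that one method.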
Tips for optimal performance:
Before deploying:
Every component is battle-tested and production-ready:
Build complex workflows from simple primitives:
Understand what’s happening:
Interested in contributing? Check out:
Ready to build? Start with the quick start above or explore the examples.