Key review questions:

- Do changes follow the examples in `packages/tta-dev-primitives/examples/`?
- Are tests using `MockPrimitive`?
- Could independent steps use `ParallelPrimitive`?
- Is composition expressed as Sequential or Parallel primitives?
- Is `WorkflowContext` used for state passing?

When you see these patterns, call them out and suggest refactoring:

- Manual retry loops → use `RetryPrimitive`
- Hand-rolled timeouts → use `TimeoutPrimitive`
- Ad-hoc caching → use `CachePrimitive`
- Global state → use `WorkflowContext`
- `pip` → use `uv`
- Legacy typing (`Optional[T]`) → use `T | None`

When making decisions, prioritize in this order:

- Modern type hints (`str | None`, not `Optional[str]`)
- Generic primitives: `WorkflowPrimitive[InputType, OutputType]`
- Composition via `>>` (Sequential) and `|` (Parallel)
- Extend `WorkflowPrimitive` for new components
- `MockPrimitive` from the `testing/` module
- `@pytest.mark.asyncio` for async tests
- `ParallelPrimitive` for independent operations
- `CachePrimitive` to avoid redundant work
- `RetryPrimitive` for transient failures
- `TimeoutPrimitive` to prevent hangs
- `FallbackPrimitive` for graceful degradation
- Every `execute` takes a `WorkflowContext` parameter
- `context.metadata` for correlation IDs
- `context.state` for passing data between steps
- Follow the examples in `packages/tta-dev-primitives/examples/`
- Pass the quality gates (`ruff format`, `ruff check`, `pyright`):
  - `uv run pytest -v`
  - `uv run pytest --cov=packages`
  - `uv run ruff format .`
  - `uv run ruff check . --fix`
  - `uvx pyright packages/`

When reviewing code (or suggestions), check in this order:

- No `Any` without reason

Always use uv, never pip directly:

- `uv sync --all-extras`
- `uv run <command>`
- `uv run pytest -v`
- `uv pip install -e packages/tta-dev-primitives`

Key locations:

- `packages/tta-dev-primitives/examples/`
- `packages/tta-dev-primitives/README.md`
- `packages/tta-dev-primitives/tests/`

❌ BAD:
```bash
pip install -e packages/tta-dev-primitives
python -m pytest
```
✅ GOOD:
```bash
uv sync --all-extras
uv run pytest -v
```
❌ BAD:
```python
class MyPrimitive(WorkflowPrimitive):
    async def execute(self, input_data, context):
        return process(input_data)
```
✅ GOOD:
```python
class MyPrimitive(WorkflowPrimitive[dict, str]):
    async def execute(self, input_data: dict, context: WorkflowContext) -> str:
        return process(input_data)
```
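A fuller, self-contained sketch of the same idea. `GreeterPrimitive` is a hypothetical example, and the package-root import is an assumption; adjust it to the actual module layout:

```python
from tta_dev_primitives import WorkflowContext, WorkflowPrimitive  # assumed import path


class GreeterPrimitive(WorkflowPrimitive[dict, str]):
    """Format a greeting from the input payload (illustrative only)."""

    async def execute(self, input_data: dict, context: WorkflowContext) -> str:
        # Pull a value from the typed input; pyright checks both ends.
        name = input_data.get("name", "world")
        return f"hello, {name}"
```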
❌ BAD:
```python
# TODO: Add tests later
class NewFeature(WorkflowPrimitive[dict, dict]):
    ...
```
✅ GOOD:
```python
class NewFeature(WorkflowPrimitive[dict, dict]):
    """Feature with comprehensive tests."""
    ...

# In tests/test_new_feature.py
@pytest.mark.asyncio
async def test_new_feature():
    mock = MockPrimitive("feature", return_value={"status": "ok"})
    ...
```
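A minimal runnable version of that test module, assuming `MockPrimitive` lives in the `testing/` module as noted above and that `WorkflowContext` accepts a `workflow_id`, as shown in the docstring example later in this guide:

```python
# tests/test_new_feature.py
import pytest

from tta_dev_primitives import WorkflowContext
from tta_dev_primitives.testing import MockPrimitive  # assumed module path


@pytest.mark.asyncio
async def test_new_feature_returns_ok():
    mock = MockPrimitive("feature", return_value={"status": "ok"})
    context = WorkflowContext(workflow_id="test")

    result = await mock.execute({"query": "demo"}, context)

    assert result == {"status": "ok"}
    assert mock.call_count == 1
```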
❌ BAD:
```python
GLOBAL_COUNTER = 0
USER_SESSIONS = {}

async def execute(self, input_data: dict, context: WorkflowContext) -> dict:
    global GLOBAL_COUNTER
    GLOBAL_COUNTER += 1
    return {"count": GLOBAL_COUNTER}
```
✅ GOOD:
```python
async def execute(self, input_data: dict, context: WorkflowContext) -> dict:
    count = context.state.get("counter", 0) + 1
    context.state["counter"] = count
    return {"count": count}
```
❌ BAD:
```python
from typing import Optional, Dict, List

def process(data: Optional[Dict[str, List[str]]]) -> Optional[str]:
    ...
```
✅ GOOD:
```python
def process(data: dict[str, list[str]] | None) -> str | None:
    ...
```
❌ BAD:
```python
async def process_all():
    result1 = await step1()
    result2 = await step2(result1)
    result3 = await step3(result2)
    return result3
```
✅ GOOD:
```python
from tta_dev_primitives import SequentialPrimitive

# >> composes the steps into a SequentialPrimitive pipeline
workflow = step1 >> step2 >> step3
result = await workflow.execute(input_data, context)
```
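The two operators compose, so a parallel fan-out can sit inside a sequential pipeline. A sketch, assuming `fetch_step`, `enrich_a`, `enrich_b`, and `merge_step` are hypothetical primitives already defined:

```python
# Sequential pipeline with a parallel stage in the middle:
# fetch, then enrich via two independent branches, then merge.
workflow = fetch_step >> (enrich_a | enrich_b) >> merge_step
result = await workflow.execute(input_data, context)
```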
❌ BAD:
```python
async def call_api():
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return await api_call()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)
```
✅ GOOD:
```python
from tta_dev_primitives import RetryPrimitive, LambdaPrimitive

api_primitive = LambdaPrimitive(api_call)
retry_api = RetryPrimitive(api_primitive, max_retries=3, backoff_factor=2.0)
result = await retry_api.execute(input_data, context)
```
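With `backoff_factor=2.0` the delay presumably doubles between attempts, mirroring the `2 ** attempt` sleep in the manual loop above; check the `RetryPrimitive` docstring for the exact base delay.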
❌ BAD:
```python
async def slow_operation():
    try:
        return await asyncio.wait_for(operation(), timeout=5.0)
    except asyncio.TimeoutError:
        return {"error": "timeout"}
```
✅ GOOD:
```python
from tta_dev_primitives import TimeoutPrimitive, LambdaPrimitive

op_primitive = LambdaPrimitive(operation)
timeout_op = TimeoutPrimitive(op_primitive, timeout=5.0)
result = await timeout_op.execute(input_data, context)
```
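Because every wrapper is itself a primitive, resilience policies stack. A sketch combining the two previous examples, using only the constructor signatures shown above:

```python
# Retry a timeout-bounded call: each attempt gets at most 5 seconds,
# and transient failures are retried up to 3 times with backoff.
guarded_api = RetryPrimitive(
    TimeoutPrimitive(LambdaPrimitive(api_call), timeout=5.0),
    max_retries=3,
    backoff_factor=2.0,
)
result = await guarded_api.execute(input_data, context)
```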
❌ BAD:
```python
CACHE = {}

async def get_data(key: str):
    if key in CACHE:
        return CACHE[key]
    result = await expensive_operation(key)
    CACHE[key] = result
    return result
```
✅ GOOD:
```python
from tta_dev_primitives import CachePrimitive, LambdaPrimitive

op_primitive = LambdaPrimitive(expensive_operation)
cached_op = CachePrimitive(op_primitive, ttl=3600)
result = await cached_op.execute(key, context)
```
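Here `ttl=3600` keeps entries for an hour; the cache presumably keys on the input value (`key` above), so unlike the module-level `CACHE` dict it expires stale data and stays scoped to the primitive instead of leaking global state.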
❌ BAD:
```python
result1 = await operation1()
result2 = await operation2()  # Could run in parallel!
result3 = await operation3()  # Could run in parallel!
return [result1, result2, result3]
```
✅ GOOD:
```python
workflow = op1 | op2 | op3  # All run in parallel
results = await workflow.execute(input_data, context)
```
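`results` is presumably a list with one entry per branch, in operand order, with the same `input_data` handed to each branch; verify against the `ParallelPrimitive` docstring before relying on ordering.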
❌ BAD:
```python
async def execute(self, input_data: dict, context: WorkflowContext) -> dict:
    # Losing context!
    result = await some_operation(input_data)
    return result
```
✅ GOOD:
```python
async def execute(self, input_data: dict, context: WorkflowContext) -> dict:
    # Pass context through
    result = await child_primitive.execute(input_data, context)
    return result
```
❌ BAD:
```python
async def execute(self, input_data: dict, context: WorkflowContext) -> dict:
    return process(input_data)
```
✅ GOOD:
````python
async def execute(self, input_data: dict, context: WorkflowContext) -> dict:
    """
    Process input with validation.

    Args:
        input_data: Data to process
        context: Workflow context

    Returns:
        Processed result

    Example:
        ```python
        result = await processor.execute({"key": "value"}, context)
        ```
    """
    return process(input_data)
````
❌ BAD:
"""Process data and return result."""
✅ GOOD:
"""
Process data and return result.
Example:
```python
processor = DataProcessor()
context = WorkflowContext(workflow_id="demo")
result = await processor.execute({"query": "test"}, context)
```
"""
❌ BAD:
```python
@pytest.mark.asyncio
async def test_workflow():
    # Using real implementations in tests
    workflow = RealStep1() >> RealStep2()
    result = await workflow.execute(data, context)
    assert result == expected
```
✅ GOOD:
```python
@pytest.mark.asyncio
async def test_workflow():
    # Using mocks for fast, isolated tests
    mock1 = MockPrimitive("step1", return_value="result1")
    mock2 = MockPrimitive("step2", return_value="result2")
    workflow = mock1 >> mock2
    result = await workflow.execute(data, context)
    assert mock1.call_count == 1
    assert result == "result2"
```
❌ BAD:
```python
@pytest.mark.asyncio
async def test_success():
    # Only testing happy path
    result = await primitive.execute(valid_data, context)
    assert result == expected
```
✅ GOOD:
```python
@pytest.mark.asyncio
async def test_success():
    result = await primitive.execute(valid_data, context)
    assert result == expected


@pytest.mark.asyncio
async def test_invalid_input():
    with pytest.raises(ValidationError, match="Missing required field"):
        await primitive.execute(invalid_data, context)


@pytest.mark.asyncio
async def test_timeout():
    slow_mock = MockPrimitive("slow", side_effect=asyncio.TimeoutError())
    with pytest.raises(asyncio.TimeoutError):
        await slow_mock.execute(data, context)
```
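`FallbackPrimitive` (listed under the reliability priorities above) deserves the same error-path coverage. A sketch, assuming its constructor takes a primary and a fallback primitive in that order:

```python
@pytest.mark.asyncio
async def test_fallback_on_failure():
    # Primary fails; the fallback's value should be returned instead.
    primary = MockPrimitive("primary", side_effect=RuntimeError("boom"))
    backup = MockPrimitive("backup", return_value={"status": "degraded"})
    fallback = FallbackPrimitive(primary, backup)  # assumed signature

    result = await fallback.execute(data, context)

    assert result == {"status": "degraded"}
    assert backup.call_count == 1
```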
❌ BAD:
```bash
# Make changes, commit directly
git add .
git commit -m "fix stuff"
```
✅ GOOD:
```bash
# Make changes, run quality checks
uv run ruff format .
uv run ruff check . --fix
uvx pyright packages/
uv run pytest -v
git add .
git commit -m "fix: specific description of fix"
```
❌ BAD:
```bash
# Add new feature
git add packages/tta-dev-primitives/src/core/new_feature.py
git commit -m "feat: add new feature"
```
✅ GOOD:
```bash
# Add new feature with tests
git add packages/tta-dev-primitives/src/core/new_feature.py
git add packages/tta-dev-primitives/tests/test_new_feature.py
uv run pytest -v
git commit -m "feat: add new feature with tests"
```
This is a production library - avoid the anti-patterns above to maintain its type safety, reliability, and test coverage.
Related documentation:

- `examples/README.md` - All example workflows
- `../../PHASE3_EXAMPLES_COMPLETE.md` - InstrumentedPrimitive pattern guide
- `README.md` - API documentation
- `../../AGENTS.md` - Repository-wide agent instructions
- `../../PRIMITIVES_CATALOG.md` - Complete primitive reference

All Phase 3 examples now use the InstrumentedPrimitive pattern:

- `examples/rag_workflow.py` - Basic retrieval-augmented generation
- `examples/agentic_rag_workflow.py` - Production RAG with grading
- `examples/cost_tracking_workflow.py` - Token/cost tracking
- `examples/streaming_workflow.py` - Token-by-token streaming
- `examples/multi_agent_workflow.py` - Agent coordination

See PHASE3_EXAMPLES_COMPLETE.md for implementation details.