Production-Ready Patterns Using TTA.dev Primitives
Last Updated: October 30, 2025
This guide provides production-ready patterns for optimizing LLM costs using TTA.dev primitives. Each pattern comes with working example code, an expected cost reduction, and monitoring guidance; the table below summarizes them.
| Pattern | Cost Reduction | Complexity | Best For |
|---|---|---|---|
| Cache + Router | 30-50% | Low | High-volume, repetitive queries |
| Fallback (Paid → Free) | 20-40% | Medium | Non-critical workloads |
| Budget-Aware Routing | Variable | High | Cost-sensitive applications |
| Retry with Cost Control | 5-10% | Low | All applications |
| Multi-Model Orchestration | 80-95% | Medium | Orchestrator + executor pattern |
Combined Impact: Using all 5 patterns together can reduce costs by 80-95% while maintaining quality.
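Note that stacked savings compound multiplicatively rather than adding up. The quick sketch below illustrates this; the percentages are hypothetical mid-range values for the first few patterns, not measurements from this guide.

# Illustrative arithmetic only: each pattern removes a share of whatever spend remains.
reductions = [0.40, 0.30, 0.07]  # e.g. cache+router, fallback chain, retry hygiene

remaining = 1.0
for r in reductions:
    remaining *= (1.0 - r)

print(f"Spend remaining after stacking: {remaining:.0%} of baseline")
print(f"Combined reduction: {1 - remaining:.0%}")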
Pattern 1: Cache + Router
Cost Reduction: 30-50% | Complexity: Low | Best For: Applications with repetitive queries or similar inputs
from tta_dev_primitives.integrations import OpenAIPrimitive, OllamaPrimitive
from tta_dev_primitives.core.routing import RouterPrimitive
from tta_dev_primitives.performance.cache import CachePrimitive
from tta_dev_primitives.core.base import WorkflowContext
# Step 1: Define model primitives
gpt4o = OpenAIPrimitive(model="gpt-4o") # $2.50/1M input, $10/1M output
gpt4o_mini = OpenAIPrimitive(model="gpt-4o-mini") # $0.15/1M input, $0.60/1M output
llama_local = OllamaPrimitive(model="llama3.2:8b") # Free, local
# Step 2: Add caching to expensive models
cached_gpt4o = CachePrimitive(
primitive=gpt4o,
cache_key_fn=lambda data, ctx: f"gpt4o:{data.get('prompt', '')[:100]}",
ttl_seconds=3600, # 1 hour cache
max_size=1000
)
cached_gpt4o_mini = CachePrimitive(
primitive=gpt4o_mini,
cache_key_fn=lambda data, ctx: f"mini:{data.get('prompt', '')[:100]}",
ttl_seconds=3600,
max_size=2000
)
# Step 3: Route based on complexity
def route_by_complexity(data: dict, context: WorkflowContext) -> str:
"""Route to appropriate model based on query complexity."""
prompt = data.get("prompt", "")
# Simple heuristics (customize for your use case)
if len(prompt) < 100:
return "local" # Free
elif len(prompt) < 500:
return "mini" # Cheap
else:
return "premium" # Expensive but high quality
router = RouterPrimitive(
routes={
"local": llama_local, # Free
"mini": cached_gpt4o_mini, # Cheap + cached
"premium": cached_gpt4o # Expensive + cached
},
router_fn=route_by_complexity,
default="mini"
)
# Step 4: Execute
context = WorkflowContext(workflow_id="cost-optimized-workflow")
result = await router.execute({"prompt": "Your query here"}, context)
Before optimization:
After optimization:
New cost:
from tta_dev_primitives.observability import get_enhanced_metrics_collector
collector = get_enhanced_metrics_collector()
# Track cache hit rate
cache_metrics = collector.get_all_metrics("cached_gpt4o")
print(f"Cache hit rate: {cache_metrics['cache_hit_rate']:.1%}")
# Track routing distribution
router_metrics = collector.get_all_metrics("router")
print(f"Route distribution: {router_metrics['route_counts']}")
Pattern 2: Fallback (Paid → Free)
Cost Reduction: 20-40% | Complexity: Medium | Best For: Non-critical workloads, graceful degradation scenarios
from tta_dev_primitives.integrations import OpenAIPrimitive, OllamaPrimitive
from tta_dev_primitives.recovery.fallback import FallbackPrimitive
from tta_dev_primitives.core.base import WorkflowContext
# Primary: Paid model (best quality)
primary = OpenAIPrimitive(model="gpt-4o")
# Fallback 1: Cheaper paid model
fallback1 = OpenAIPrimitive(model="gpt-4o-mini")
# Fallback 2: Free local model
fallback2 = OllamaPrimitive(model="llama3.2:8b")
# Create fallback chain
workflow = FallbackPrimitive(
primary=primary,
fallbacks=[fallback1, fallback2]
)
# Execute with automatic fallback
context = WorkflowContext(workflow_id="fallback-workflow")
try:
result = await workflow.execute({"prompt": "Your query"}, context)
print(f"Used model: {result.get('model_used')}")
except Exception as e:
print(f"All models failed: {e}")
class BudgetAwareFallback(FallbackPrimitive):
"""Fallback that considers budget constraints."""
def __init__(self, primary, fallbacks, daily_budget: float = 100.0):
super().__init__(primary, fallbacks)
self.daily_budget = daily_budget
self.daily_spend = 0.0
async def execute(self, input_data, context):
# Check budget before using paid model
if self.daily_spend >= self.daily_budget:
# Budget exceeded, skip to free fallback
context.metadata["budget_exceeded"] = True
return await self.fallbacks[-1].execute(input_data, context)
# Normal fallback logic
result = await super().execute(input_data, context)
# Track spending
cost = result.get("cost", 0.0)
self.daily_spend += cost
return result
# Usage
workflow = BudgetAwareFallback(
primary=OpenAIPrimitive(model="gpt-4o"),
fallbacks=[
OpenAIPrimitive(model="gpt-4o-mini"),
OllamaPrimitive(model="llama3.2:8b")
],
daily_budget=50.0 # $50/day limit
)
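One caveat with the simplified class above: daily_spend only resets when the process restarts. A minimal sketch of a date-based reset (an assumed extension, mirroring the BudgetTracker shown in the next pattern):

from datetime import date

class DailyResettingFallback(BudgetAwareFallback):
    """BudgetAwareFallback that zeroes its spend counter when the date changes."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._spend_date = date.today()

    async def execute(self, input_data, context):
        # Reset the counter on the first request of a new day
        if date.today() != self._spend_date:
            self.daily_spend = 0.0
            self._spend_date = date.today()
        return await super().execute(input_data, context)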
Scenario: 10K requests/day, 20% failure rate on primary
Before:
After:
Pattern 3: Budget-Aware Routing
Cost Reduction: Variable (depends on budget) | Complexity: High | Best For: Cost-sensitive applications with strict budget constraints
from tta_dev_primitives.core.routing import RouterPrimitive
from tta_dev_primitives.core.base import WorkflowContext
from tta_dev_primitives.integrations import OpenAIPrimitive, OllamaPrimitive
from datetime import datetime, timedelta
class BudgetTracker:
"""Track daily spending and enforce budget limits."""
def __init__(self, daily_budget: float):
self.daily_budget = daily_budget
self.daily_spend = 0.0
self.last_reset = datetime.now()
def reset_if_new_day(self):
"""Reset spending counter at midnight."""
now = datetime.now()
if now.date() > self.last_reset.date():
self.daily_spend = 0.0
self.last_reset = now
def record_cost(self, cost: float):
"""Record a cost."""
self.reset_if_new_day()
self.daily_spend += cost
def get_remaining_budget(self) -> float:
"""Get remaining budget for today."""
self.reset_if_new_day()
return max(0.0, self.daily_budget - self.daily_spend)
def get_budget_utilization(self) -> float:
"""Get budget utilization (0.0 to 1.0)."""
self.reset_if_new_day()
return min(1.0, self.daily_spend / self.daily_budget)
# Initialize budget tracker
budget_tracker = BudgetTracker(daily_budget=100.0) # $100/day
# Define models
gpt4o = OpenAIPrimitive(model="gpt-4o")
gpt4o_mini = OpenAIPrimitive(model="gpt-4o-mini")
llama_local = OllamaPrimitive(model="llama3.2:8b")
def budget_aware_router(data: dict, context: WorkflowContext) -> str:
"""Route based on remaining budget."""
utilization = budget_tracker.get_budget_utilization()
if utilization < 0.5:
# <50% budget used: Use premium model
return "premium"
elif utilization < 0.8:
# 50-80% budget used: Use mid-tier model
return "mid"
else:
# >80% budget used: Use free model
return "free"
router = RouterPrimitive(
routes={
"premium": gpt4o,
"mid": gpt4o_mini,
"free": llama_local
},
router_fn=budget_aware_router,
default="free"
)
# Execute and track costs
context = WorkflowContext(workflow_id="budget-aware")
result = await router.execute({"prompt": "Your query"}, context)
# Record cost
cost = result.get("cost", 0.0)
budget_tracker.record_cost(cost)
print(f"Budget utilization: {budget_tracker.get_budget_utilization():.1%}")
print(f"Remaining budget: ${budget_tracker.get_remaining_budget():.2f}")
def print_budget_dashboard(tracker: BudgetTracker):
"""Print budget status dashboard."""
utilization = tracker.get_budget_utilization()
remaining = tracker.get_remaining_budget()
print("=" * 50)
print("BUDGET DASHBOARD")
print("=" * 50)
print(f"Daily Budget: ${tracker.daily_budget:.2f}")
print(f"Spent Today: ${tracker.daily_spend:.2f}")
print(f"Remaining: ${remaining:.2f}")
print(f"Utilization: {utilization:.1%}")
print("=" * 50)
    if utilization > 0.9:
        print("⚠️ WARNING: Budget almost exhausted!")
    elif utilization > 0.7:
        print("⚠️ CAUTION: Budget 70% consumed")
    else:
        print("✅ Budget healthy")
Pattern 4: Retry with Cost Control
Cost Reduction: 5-10% | Complexity: Low | Best For: All applications (prevents wasted API calls)
from tta_dev_primitives.recovery.retry import RetryPrimitive
from tta_dev_primitives.integrations import OpenAIPrimitive
from tta_dev_primitives.core.base import WorkflowContext
# Create retry wrapper
workflow = RetryPrimitive(
primitive=OpenAIPrimitive(model="gpt-4o"),
max_retries=3,
backoff_strategy="exponential",
initial_delay=1.0, # Start with 1s delay
max_delay=30.0, # Cap at 30s
retry_on_exceptions=(TimeoutError, ConnectionError) # Only retry transient errors
)
# Execute with automatic retry
context = WorkflowContext(workflow_id="retry-workflow")
result = await workflow.execute({"prompt": "Your query"}, context)
Before:
After:
Pattern 5: Multi-Model Orchestration
Cost Reduction: 80-95% | Complexity: Medium | Best For: Applications where an orchestrator can delegate tasks to specialized models
Multi-model orchestration uses a high-quality orchestrator model (e.g., Claude Sonnet 4.5) to analyze tasks and delegate execution to appropriate free flagship models. The orchestrator handles planning and validation (small token usage), while free models handle bulk execution (large token usage).
Key Insight: Most AI applications spend 80%+ of tokens on execution, not planning. By delegating execution to free models, you can achieve 80-95% cost reduction while maintaining quality.
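The arithmetic behind that figure is simple. The sketch below uses the same 200/1,800 token split as the cost comparison later in this section; the rates are illustrative, not measurements.

# Illustrative numbers: 2,000 tokens/request, ~10% spent on planning/validation.
tokens_per_request = 2_000
planning_share = 0.10                 # handled by the paid orchestrator
paid_rate = 3.00 / 1_000_000          # $ per token (e.g. $3.00 per 1M tokens)

all_paid = tokens_per_request * paid_rate
orchestrated = tokens_per_request * planning_share * paid_rate  # executors are free

print(f"All-paid cost/request:     ${all_paid:.4f}")      # $0.0060
print(f"Orchestrated cost/request: ${orchestrated:.4f}")  # $0.0006
print(f"Savings: {1 - orchestrated / all_paid:.0%}")      # 90%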
┌──────────────────────────────────────────────────────────────┐
│                      Orchestrator Model                       │
│                  (Claude Sonnet 4.5 - Paid)                   │
│                                                               │
│  1. Analyze task requirements                                 │
│  2. Classify complexity                                       │
│  3. Select best executor model                                │
│  4. Validate output quality                                   │
└──────────────────────────────┬───────────────────────────────┘
                               │
         ┌────────────────┬────┴───────────┬────────────────┐
         ▼                ▼                ▼                ▼
  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐
  │ Gemini Pro  │  │ DeepSeek R1 │  │ Groq Llama  │  │ HuggingFace │
  │   (FREE)    │  │   (FREE)    │  │   (FREE)    │  │   (FREE)    │
  │             │  │             │  │             │  │             │
  │  General    │  │  Complex    │  │ Ultra-fast  │  │   Model     │
  │  Purpose    │  │  Reasoning  │  │  Inference  │  │   Variety   │
  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘
Step 1: Create Task Classifier
from tta_dev_primitives.orchestration import TaskClassifierPrimitive, TaskClassifierRequest  # request model assumed to live alongside the primitive
from tta_dev_primitives.core.base import WorkflowContext
# Create classifier
classifier = TaskClassifierPrimitive(prefer_free=True)
# Classify task
context = WorkflowContext(workflow_id="classify-demo")
request = TaskClassifierRequest(
task_description="Summarize this article in 3 bullet points",
user_preferences={"prefer_free": True}
)
classification = await classifier.execute(request, context)
print(f"Recommended: {classification.recommended_model}")
print(f"Reasoning: {classification.reasoning}")
print(f"Cost: ${classification.estimated_cost}")
Step 2: Create Delegation Primitive
from tta_dev_primitives.orchestration import DelegationPrimitive, DelegationRequest  # request model assumed to live alongside the primitive
from tta_dev_primitives.integrations import (
GoogleAIStudioPrimitive,
GroqPrimitive,
OpenRouterPrimitive
)
# Create delegation primitive with executors
delegation = DelegationPrimitive(
executor_primitives={
"gemini-2.5-pro": GoogleAIStudioPrimitive(model="gemini-2.5-pro"),
"llama-3.3-70b-versatile": GroqPrimitive(model="llama-3.3-70b-versatile"),
"deepseek/deepseek-r1:free": OpenRouterPrimitive(model="deepseek/deepseek-r1:free")
}
)
# Delegate task
request = DelegationRequest(
task_description="Summarize article",
executor_model="gemini-2.5-pro",
messages=[{"role": "user", "content": "Summarize: [article text]"}]
)
response = await delegation.execute(request, context)
print(f"Executor: {response.executor_model}")
print(f"Cost: ${response.cost}") # $0.00 (FREE!)
Step 3: Create Multi-Model Workflow
from tta_dev_primitives.orchestration import MultiModelWorkflow, MultiModelRequest  # request model assumed to live alongside the workflow
# Create workflow with automatic routing
workflow = MultiModelWorkflow(
executor_primitives={
"gemini-2.5-pro": GoogleAIStudioPrimitive(),
"llama-3.3-70b-versatile": GroqPrimitive(),
"deepseek/deepseek-r1:free": OpenRouterPrimitive()
},
prefer_free=True
)
# Execute task (automatic classification + delegation)
request = MultiModelRequest(
task_description="Analyze renewable energy benefits",
messages=[{"role": "user", "content": "Analyze: [content]"}],
user_preferences={"prefer_free": True},
validate_output=True
)
response = await workflow.execute(request, context)
print(f"Executor: {response.executor_model}")
print(f"Complexity: {response.classification['complexity']}")
print(f"Cost: ${response.cost}")
print(f"Validation: {'Passed' if response.validation_passed else 'Failed'}")
Use Case: Research, analysis, content generation
# Claude's role: Analyze requirements and create execution plan
claude_plan = """
Task: Research renewable energy from 3 perspectives
Sub-tasks:
1. Environmental benefits → Gemini Pro
2. Economic impact → DeepSeek R1
3. Technical challenges → Groq (Llama 3.3 70B)
"""
import asyncio

# Execute sub-tasks in parallel with free models
tasks = [
DelegationRequest(
task_description="Environmental benefits",
executor_model="gemini-2.5-pro",
messages=[{"role": "user", "content": "Explain environmental benefits..."}]
),
DelegationRequest(
task_description="Economic impact",
executor_model="deepseek/deepseek-r1:free",
messages=[{"role": "user", "content": "Analyze economic impact..."}]
),
DelegationRequest(
task_description="Technical challenges",
executor_model="llama-3.3-70b-versatile",
messages=[{"role": "user", "content": "Describe technical challenges..."}]
)
]
# Execute in parallel
responses = await asyncio.gather(*[delegation.execute(task, context) for task in tasks])
# Claude's role: Aggregate and validate results
# Total cost: ~$0.01 (Claude planning) + $0.00 (free execution) = $0.01
# vs. $0.50 (Claude for everything) = 98% savings
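The aggregation step itself is not shown above. A minimal sketch of what it could look like follows; the AnthropicPrimitive model id is taken from the customer-support example later in this guide, the prompt wording and input shape are assumptions, and responses[i].content follows the delegation response shape used in the validation example below.

from tta_dev_primitives.integrations import AnthropicPrimitive

# The paid orchestrator only sees the three short executor outputs, so its
# token usage (and cost) stays a small fraction of the total.
sections = "\n\n".join(r.content for r in responses)
orchestrator = AnthropicPrimitive(model="claude-3-5-sonnet-20241022")
report = await orchestrator.execute(
    {"prompt": f"Combine these findings into a single report:\n\n{sections}"},
    context,
)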
Use Case: Quality assurance, fact-checking
# Free model executes
gemini_response = await delegation.execute(
DelegationRequest(
task_description="Generate summary",
executor_model="gemini-2.5-pro",
messages=[{"role": "user", "content": "Summarize: [content]"}]
),
context
)
# Claude validates (small token usage)
validation_prompt = f"""
Validate this summary for accuracy and completeness:
{gemini_response.content}
Original content: [content]
"""
# Cost: $0.00 (Gemini) + $0.005 (Claude validation) = $0.005
# vs. $0.05 (Claude for everything) = 90% savings
Scenario: AI content generation tool (10K requests/day)
| Approach | Model | Tokens/Request | Cost/1M Tokens | Daily Cost | Monthly Cost |
|---|---|---|---|---|---|
| All Claude | Claude Sonnet 4.5 | 2,000 | $3.00 | $60.00 | $1,800 |
| All Gemini | Gemini Pro (free) | 2,000 | $0.00 | $0.00 | $0.00 |
| Orchestration | Claude (plan) + Gemini (execute) | 200 + 1,800 | $3.00 + $0.00 | $6.00 | $180 |
Cost Savings:
✅ Use When:
❌ Don't Use When:
Track these metrics for orchestration workflows:
# Add to observability
context.data["orchestration_metrics"] = {
"orchestrator_tokens": 200,
"executor_tokens": 1800,
"orchestrator_cost": 0.006,
"executor_cost": 0.0,
"total_cost": 0.006,
"cost_savings_vs_all_paid": 0.90, # 90% savings
"executor_model": "gemini-2.5-pro",
"classification": "moderate"
}
Issue: Orchestrator overhead too high
Issue: Free model quality insufficient
Issue: Latency increased
Example 1: Code Completion Tool
Scenario: Code completion tool with 50K requests/day
Requirements:
Solution:
from tta_dev_primitives.integrations import OpenAIPrimitive, OllamaPrimitive
from tta_dev_primitives.core.routing import RouterPrimitive
from tta_dev_primitives.performance.cache import CachePrimitive
from tta_dev_primitives.recovery.retry import RetryPrimitive
# Step 1: Cache layer (40% hit rate expected)
cached_gpt4o_mini = CachePrimitive(
primitive=OpenAIPrimitive(model="gpt-4o-mini"),
cache_key_fn=lambda data, ctx: f"code:{data.get('code_context', '')[:200]}",
ttl_seconds=1800, # 30 min cache (code changes frequently)
max_size=5000
)
# Step 2: Local model for simple completions
llama_code = OllamaPrimitive(model="codellama:7b")
# Step 3: Router (route simple to local, complex to cloud)
def route_code_completion(data: dict, context):
code_length = len(data.get("code_context", ""))
complexity = data.get("complexity", "simple")
if complexity == "simple" or code_length < 100:
return "local" # Free, fast
else:
return "cloud" # Cached, high quality
router = RouterPrimitive(
routes={
"local": llama_code,
"cloud": cached_gpt4o_mini
},
router_fn=route_code_completion,
default="local"
)
# Step 4: Retry wrapper (prevent wasted calls)
workflow = RetryPrimitive(
primitive=router,
max_retries=2,
backoff_strategy="exponential"
)
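For reference, a minimal invocation might look like this; the payload fields match what route_code_completion and the cache key read, and the complexity value is assumed to be supplied by the caller.

from tta_dev_primitives.core.base import WorkflowContext

context = WorkflowContext(workflow_id="code-completion")
result = await workflow.execute(
    {"code_context": "def fibonacci(n):", "complexity": "simple"},  # routed to the local model
    context,
)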
Results:
Example 2: Customer Support Bot
Scenario: Customer support with 10K conversations/day
Requirements:
Solution:
from tta_dev_primitives.integrations import OpenAIPrimitive, AnthropicPrimitive
from tta_dev_primitives.recovery.fallback import FallbackPrimitive
from tta_dev_primitives.performance.cache import CachePrimitive
# Primary: Claude Sonnet (best for customer support)
primary = CachePrimitive(
primitive=AnthropicPrimitive(model="claude-3-5-sonnet-20241022"),
cache_key_fn=lambda data, ctx: f"support:{data.get('question', '')[:100]}",
ttl_seconds=7200, # 2 hour cache (FAQs repeat)
max_size=2000
)
# Fallback: GPT-4o-mini (cheaper, still good quality)
fallback = OpenAIPrimitive(model="gpt-4o-mini")
workflow = FallbackPrimitive(
primary=primary,
fallbacks=[fallback]
)
Results:
Optimization:
# Add budget-aware routing
from tta_dev_primitives.core.routing import RouterPrimitive

def support_router(data: dict, context):
# Route simple questions to GPT-4o-mini
question = data.get("question", "").lower()
simple_keywords = ["hours", "location", "contact", "price"]
if any(kw in question for kw in simple_keywords):
return "simple" # Use cheaper model
else:
return "complex" # Use premium model
router = RouterPrimitive(
routes={
"simple": OpenAIPrimitive(model="gpt-4o-mini"),
"complex": primary # Cached Claude
},
router_fn=support_router,
default="simple"
)
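A quick check of the keyword routing (the questions are hypothetical): "hours" hits the simple route, anything without a simple keyword falls through to the cached Claude primitive.

from tta_dev_primitives.core.base import WorkflowContext

context = WorkflowContext(workflow_id="support-bot")

# Contains the "hours" keyword -> routed to gpt-4o-mini
faq = await router.execute({"question": "What are your opening hours?"}, context)

# No simple keyword -> routed to the cached Claude primitive
escalation = await router.execute(
    {"question": "My order arrived damaged, what are my options?"}, context
)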
New Results:
Example 3: Gemini Pro Downgrading to Flash
Problem: User reports Gemini unexpectedly downgrading from Pro to Flash before hitting rate limits
Root Cause: Google's API may downgrade based on:
Solution:
from tta_dev_primitives.integrations import GoogleGeminiPrimitive
from tta_dev_primitives.recovery.timeout import TimeoutPrimitive
from tta_dev_primitives.recovery.retry import RetryPrimitive
from tta_dev_primitives.core.base import WorkflowContext
# Step 1: Add timeout to prevent long-running requests
gemini_pro = TimeoutPrimitive(
primitive=GoogleGeminiPrimitive(model="gemini-2.5-pro"),
timeout_seconds=30.0 # Prevent runaway context usage
)
# Step 2: Add retry with backoff (avoid concurrent request spikes)
workflow = RetryPrimitive(
primitive=gemini_pro,
max_retries=3,
backoff_strategy="exponential",
initial_delay=2.0 # Spread out requests
)
# Step 3: Monitor token usage
from tta_dev_primitives.observability import get_enhanced_metrics_collector
collector = get_enhanced_metrics_collector()
async def execute_with_monitoring(prompt: str):
context = WorkflowContext(workflow_id="gemini-monitor")
result = await workflow.execute({"prompt": prompt}, context)
# Check token usage
metrics = collector.get_all_metrics("gemini_pro")
tokens_used = metrics.get("total_tokens", 0)
if tokens_used > 1_000_000: # Approaching limit
print("β οΈ WARNING: High token usage, consider switching to Flash")
return result
Monitoring Script:
import asyncio
from datetime import datetime, timedelta
class GeminiUsageTracker:
"""Track Gemini usage to prevent unexpected downgrades."""
def __init__(self):
self.hourly_tokens = 0
self.hourly_requests = 0
self.last_reset = datetime.now()
def reset_if_new_hour(self):
now = datetime.now()
if now - self.last_reset > timedelta(hours=1):
self.hourly_tokens = 0
self.hourly_requests = 0
self.last_reset = now
def record_request(self, tokens: int):
self.reset_if_new_hour()
self.hourly_tokens += tokens
self.hourly_requests += 1
def should_throttle(self) -> bool:
"""Check if we should throttle requests."""
self.reset_if_new_hour()
# Gemini Pro limits (approximate)
MAX_TOKENS_PER_HOUR = 500_000
MAX_REQUESTS_PER_HOUR = 1000
if self.hourly_tokens > MAX_TOKENS_PER_HOUR * 0.8:
return True # 80% of token limit
if self.hourly_requests > MAX_REQUESTS_PER_HOUR * 0.8:
return True # 80% of request limit
return False
# Usage
tracker = GeminiUsageTracker()
async def safe_gemini_call(prompt: str):
    context = WorkflowContext(workflow_id="gemini-safe-call")
    if tracker.should_throttle():
        print("⚠️ Throttling: Switching to Gemini Flash")
        # Use Flash instead
        flash = GoogleGeminiPrimitive(model="gemini-2.5-flash")
        result = await flash.execute({"prompt": prompt}, context)
    else:
        # Use Pro
        result = await workflow.execute({"prompt": prompt}, context)
    tracker.record_request(result.get("tokens_used", 0))
    return result
from tta_dev_primitives.observability import get_enhanced_metrics_collector
collector = get_enhanced_metrics_collector()
# 1. Cost metrics
cost_metrics = collector.get_all_metrics("llm_workflow")
print(f"Total cost: ${cost_metrics['cost']['total_cost']:.2f}")
print(f"Cost savings: ${cost_metrics['cost']['total_savings']:.2f}")
# 2. Cache hit rate
cache_metrics = collector.get_all_metrics("cached_llm")
print(f"Cache hit rate: {cache_metrics['cache_hit_rate']:.1%}")
# 3. Route distribution
router_metrics = collector.get_all_metrics("router")
print(f"Route counts: {router_metrics['route_counts']}")
# 4. Error rate
error_rate = 1 - (cost_metrics['successful_requests'] / cost_metrics['total_requests'])
print(f"Error rate: {error_rate:.1%}")
def check_alerts(metrics: dict):
"""Check metrics and trigger alerts."""
# Alert 1: High cost
if metrics['cost']['total_cost'] > 100.0: # $100/day
send_alert("π° COST ALERT: Daily spend exceeds $100")
# Alert 2: Low cache hit rate
if metrics.get('cache_hit_rate', 0) < 0.2: # <20%
send_alert("π CACHE ALERT: Hit rate below 20%")
# Alert 3: High error rate
error_rate = 1 - (metrics['successful_requests'] / metrics['total_requests'])
if error_rate > 0.1: # >10%
send_alert("β οΈ ERROR ALERT: Error rate above 10%")
# Alert 4: Budget exceeded
if metrics['cost']['total_cost'] > metrics.get('daily_budget', float('inf')):
send_alert("π¨ BUDGET ALERT: Daily budget exceeded!")
def send_alert(message: str):
"""Send alert (implement your notification method)."""
print(f"ALERT: {message}")
# TODO: Send to Slack, email, PagerDuty, etc.
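As one option for the TODO above, here is a minimal sketch that posts alerts to a Slack incoming webhook. SLACK_WEBHOOK_URL is a hypothetical environment variable, and only the Python standard library is used; adapt it to whichever notifier you run.

import json
import os
import urllib.request

def send_alert(message: str) -> None:
    """Send alert to Slack, falling back to stdout if no webhook is configured."""
    webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
    if not webhook_url:
        print(f"ALERT: {message}")
        return
    payload = json.dumps({"text": message}).encode("utf-8")
    request = urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(request, timeout=5)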
Symptoms: Cache hit rate <20%, not seeing expected cost savings
Causes:
Solutions:
# ❌ Bad: Cache key includes timestamp
cache_key_fn=lambda data, ctx: f"{data['prompt']}:{datetime.now()}"

# ✅ Good: Cache key only includes stable data
cache_key_fn=lambda data, ctx: f"{data['prompt'][:200]}"

# ❌ Bad: TTL too short
ttl_seconds=60  # 1 minute

# ✅ Good: TTL matches usage pattern
ttl_seconds=3600  # 1 hour for typical queries

# ❌ Bad: Max size too small
max_size=10

# ✅ Good: Max size based on memory budget
max_size=1000  # ~10MB for typical LLM responses
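If near-duplicate prompts are defeating the cache, normalizing before keying often helps. A sketch follows; it assumes the prompt lives under data["prompt"] and reuses the CachePrimitive and OpenAIPrimitive arguments shown earlier in this guide.

import hashlib

def normalized_cache_key(data: dict, ctx) -> str:
    # Collapse whitespace and case so trivially different prompts share a key
    prompt = " ".join(data.get("prompt", "").lower().split())
    return "llm:" + hashlib.sha256(prompt.encode("utf-8")).hexdigest()

cached_llm = CachePrimitive(
    primitive=OpenAIPrimitive(model="gpt-4o-mini"),
    cache_key_fn=normalized_cache_key,
    ttl_seconds=3600,
    max_size=1000,
)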
Symptoms: Gemini Pro requests return Flash-quality responses
Causes:
Solutions:
# Solution 1: Add request throttling
import asyncio
class RequestThrottler:
def __init__(self, max_concurrent: int = 5):
self.semaphore = asyncio.Semaphore(max_concurrent)
async def execute(self, primitive, data, context):
async with self.semaphore:
return await primitive.execute(data, context)
throttler = RequestThrottler(max_concurrent=5)
# Solution 2: Monitor token usage
tracker = GeminiUsageTracker() # See Example 3 above
# Solution 3: Explicit model selection
gemini_pro = GoogleGeminiPrimitive(
model="gemini-2.5-pro",
# Explicitly set model in every request
model_kwargs={"model": "gemini-2.5-pro"}
)
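If you adopt the throttler from Solution 1, route calls through it rather than hitting the primitive directly; a minimal usage sketch:

from tta_dev_primitives.core.base import WorkflowContext

context = WorkflowContext(workflow_id="gemini-throttled")
result = await throttler.execute(gemini_pro, {"prompt": "Your query"}, context)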
Symptoms: Costs higher than expected, budget alerts firing
Causes:
Solutions:
# Solution 1: Add retry limits
workflow = RetryPrimitive(
primitive=expensive_llm,
max_retries=3, # Limit retries
backoff_strategy="exponential"
)
# Solution 2: Add budget tracking
budget_tracker = BudgetTracker(daily_budget=100.0)
# Solution 3: Test routing logic
def test_routing():
    test_cases = [
        ({"prompt": "short"}, "local"),
        ({"prompt": "a" * 300}, "mini"),     # 100-499 chars -> mini
        ({"prompt": "a" * 1000}, "premium")  # 500+ chars -> premium
    ]
    for data, expected_route in test_cases:
        actual_route = route_by_complexity(data, WorkflowContext(workflow_id="routing-test"))
        assert actual_route == expected_route, f"Expected {expected_route}, got {actual_route}"
    print("✅ Routing logic tests passed")

test_routing()
Source files:
packages/tta-dev-primitives/src/tta_dev_primitives/performance/cache.py
packages/tta-dev-primitives/src/tta_dev_primitives/core/routing.py
packages/tta-dev-primitives/src/tta_dev_primitives/recovery/fallback.py
packages/tta-dev-primitives/src/tta_dev_primitives/recovery/retry.py
Last Updated: October 30, 2025
For: Production AI Applications
Maintained by: TTA.dev Team
💡 Pro Tip: Combining the first four patterns typically yields 50-70% cost reduction; adding multi-model orchestration pushes combined savings into the 80-95% range.