Enterprise-grade best practices for AI-powered development with GitHub Models.
This guide covers proven strategies, optimization techniques, and enterprise-ready patterns for running Aurelis effectively in production environments.
# 1. Initialize Aurelis configuration
aurelis init
# 2. Verify GitHub token
aurelis auth status
# 3. Test model connectivity
aurelis models
# 4. Create project-specific configuration
cp .aurelis.yaml .aurelis.dev.yaml
# Create shared configuration template
cat > .aurelis.template.yaml << EOF
github_token: "\${GITHUB_TOKEN}"
models:
primary: "codestral-2501"
fallback: "gpt-4o-mini"
analysis:
max_file_size: "1MB"
chunk_size: 3500
security:
audit_logging: true
secure_token_storage: true
EOF
# Team members copy and customize
cp .aurelis.template.yaml .aurelis.yaml
# Check system health
aurelis health
# Review overnight logs (if applicable)
aurelis logs --since="yesterday"
# Refresh model availability
aurelis models --refresh
# 1. Analyze existing code
aurelis analyze src/
# 2. Generate new components
aurelis generate "Create user authentication service"
# 3. Review and refine
aurelis review --file=generated_code.py
# 4. Run tests
aurelis test --generate-missing
# Clear temporary caches
aurelis cache clear --temporary
# Export usage analytics
aurelis analytics export --format=json
# Backup configuration
cp .aurelis.yaml .aurelis.backup.yaml
# Let Aurelis choose the optimal model
from aurelis.models import ModelRequest, ModelType, TaskType
# Code generation - automatically uses Codestral-2501
request = ModelRequest(
prompt="Create a REST API endpoint for user registration",
task_type=TaskType.CODE_GENERATION
)
# Documentation - automatically uses Cohere Command-R
request = ModelRequest(
prompt="Document this authentication service",
task_type=TaskType.DOCUMENTATION
)
# Force specific model for specialized tasks
request = ModelRequest(
prompt="Optimize this database query for PostgreSQL",
model_type=ModelType.CODESTRAL_2501, # Best for code optimization
task_type=TaskType.CODE_OPTIMIZATION
)
| Task Type | Primary Model | Fallback | Use Case | Performance |
|---|---|---|---|---|
| Code Generation | Codestral-2501 | GPT-4o | Production code, APIs | ⭐⭐⭐⭐⭐ |
| Code Optimization | Codestral-2501 | Llama 70B | Performance tuning | ⭐⭐⭐⭐⭐ |
| Documentation | Command-R | GPT-4o-mini | Technical writing | ⭐⭐⭐⭐ |
| Complex Reasoning | GPT-4o | Llama 405B | Architecture decisions | ⭐⭐⭐⭐⭐ |
| Quick Tasks | GPT-4o-mini | Mistral Nemo | Simple operations | ⭐⭐⭐⭐ |
| Enterprise Apps | Mistral Large | GPT-4o | Production systems | ⭐⭐⭐⭐⭐ |
from typing import Optional

def select_model_for_context(file_size: int, complexity: str, urgency: str) -> Optional[ModelType]:
"""Intelligent model selection based on context."""
if urgency == "high" and file_size < 1000:
return ModelType.GPT_4O_MINI # Fast for urgent small tasks
if complexity == "high":
return ModelType.GPT_4O # Complex reasoning capability
if file_size > 10000:
return ModelType.CODESTRAL_2501 # Best for large codebases
return None # Let Aurelis auto-select
# .aurelis.yaml
cache:
enabled: true
ttl: 3600 # 1 hour for most responses
max_size: 1000
strategy: "lru"
# Task-specific TTL
task_ttl:
documentation: 7200 # 2 hours (docs change less)
code_generation: 1800 # 30 minutes (code changes often)
explanation: 3600 # 1 hour (explanations stable)
# Include relevant context in cache keys
request = ModelRequest(
prompt="Optimize this function",
task_type=TaskType.CODE_OPTIMIZATION,
metadata={
"language": "python",
"framework": "fastapi",
"version": "3.11",
"performance_target": "high"
}
)
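Aurelis derives the cache key internally, but the principle is easy to sketch: hash the prompt together with any metadata that changes the correct answer. The helper below is hypothetical, not the library's actual key format.
import hashlib
import json

def make_cache_key(prompt: str, task_type: str, metadata: dict) -> str:
    """Hypothetical helper: derive a stable cache key from prompt plus context metadata."""
    payload = json.dumps(
        {"prompt": prompt, "task_type": task_type, "metadata": metadata},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

# Different framework metadata yields a different key, so a FastAPI-specific
# optimization is never served from cache for a Django project.
key_a = make_cache_key("Optimize this function", "code_optimization", {"framework": "fastapi"})
key_b = make_cache_key("Optimize this function", "code_optimization", {"framework": "django"})
assert key_a != key_b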
from typing import List

from aurelis.models import ModelRequest, TaskType, get_model_orchestrator
async def process_multiple_files(file_paths: List[str]) -> List[str]:
"""Process multiple files concurrently."""
orchestrator = get_model_orchestrator()
# Create batch requests
requests = [
ModelRequest(
prompt=f"Analyze and improve this code:\n{read_file(path)}",
task_type=TaskType.CODE_OPTIMIZATION,
metadata={"file_path": path}
)
for path in file_paths
]
# Process in parallel with rate limiting
responses = await orchestrator.batch_request(
requests,
max_concurrent=5 # Respect API limits
)
return [response.content for response in responses]
# .aurelis.yaml
processing:
concurrent_requests: 5 # Conservative for stability
request_interval: 0.1 # 100ms between requests
max_retries: 3
timeout: 60
# Burst handling
burst_limit: 10
burst_window: 60 # seconds
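The same limits can be respected from application code. A minimal sketch, using only asyncio and the orchestrator API shown elsewhere in this guide; the function name and defaults are illustrative:
import asyncio

async def send_with_limits(orchestrator, requests, max_concurrent: int = 5, interval: float = 0.1):
    """Illustrative client-side throttle: at most max_concurrent requests in flight,
    with a short pause before each slot is released."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def _send(request):
        async with semaphore:
            response = await orchestrator.send_request(request)
            await asyncio.sleep(interval)  # space out bursts before freeing the slot
            return response

    return await asyncio.gather(*(_send(r) for r in requests))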
# ✅ Good: Concise and specific
request = ModelRequest(
prompt="Create Python function: validate email, return boolean",
max_tokens=200 # Limit response length
)
# ❌ Bad: Verbose and unfocused
request = ModelRequest(
prompt="""I need help creating a function. This function should be written
in Python and it should validate email addresses. It should return True
if the email is valid and False if it's not valid. Please make sure it
handles edge cases..."""
)
# Set appropriate token limits based on task
TASK_TOKEN_LIMITS = {
TaskType.CODE_GENERATION: 2000,
TaskType.DOCUMENTATION: 1500,
TaskType.EXPLANATION: 1000,
TaskType.CODE_OPTIMIZATION: 1500,
TaskType.TESTING: 1000
}
request = ModelRequest(
prompt=prompt,
task_type=task_type,
max_tokens=TASK_TOKEN_LIMITS.get(task_type, 1000)
)
# Use environment variables (recommended)
export GITHUB_TOKEN="ghp_your_token_here"
# For enterprise: Use system keyring
aurelis config set security.token_storage keyring
#!/bin/bash
# rotate_token.sh: monthly token rotation script
echo "Generating new GitHub token..."
# Manual step: Generate new token on GitHub
echo "Updating environment..."
export GITHUB_TOKEN_NEW="ghp_new_token_here"
echo "Testing new token..."
GITHUB_TOKEN=$GITHUB_TOKEN_NEW aurelis models
if [ $? -eq 0 ]; then
echo "New token verified. Updating configuration..."
export GITHUB_TOKEN=$GITHUB_TOKEN_NEW
echo "Token rotation complete."
else
echo "New token verification failed. Keeping old token."
fi
# .aurelis.yaml
security:
audit_logging: true
audit_level: "detailed"
logging:
handlers:
- type: "file"
path: "logs/aurelis_audit.log"
level: "INFO"
- type: "syslog"
facility: "local0"
level: "WARNING"
# What to log
log_requests: true
log_responses: false # Never log response content
log_tokens: true
log_performance: true
# .aurelis.yaml
security:
compliance_mode: true
data_retention_days: 90
anonymize_logs: true
encryption_at_rest: true
# Data classification
classify_requests: true
redact_sensitive: true
audit_data_flow: true
# .aurelis.yaml
network:
enforce_https: true
verify_ssl: true
timeout: 30
# Proxy configuration (if required)
proxy:
http: "http://proxy.company.com:8080"
https: "https://proxy.company.com:8080"
no_proxy: "localhost,127.0.0.1"
# Track usage per project/team
class UsageTracker:
def __init__(self):
self.usage_data = {}
async def track_request(self, request: ModelRequest, response: ModelResponse):
"""Track token usage and costs."""
project = request.metadata.get("project", "unknown")
if project not in self.usage_data:
self.usage_data[project] = {
"requests": 0,
"tokens": 0,
"cost": 0.0
}
self.usage_data[project]["requests"] += 1
self.usage_data[project]["tokens"] += response.token_usage.get("total_tokens", 0)
self.usage_data[project]["cost"] += self.calculate_cost(response)
def calculate_cost(self, response: ModelResponse) -> float:
"""Calculate estimated cost based on model and tokens."""
MODEL_COSTS = {
ModelType.GPT_4O: 0.00003, # per token
ModelType.GPT_4O_MINI: 0.000001,
ModelType.CODESTRAL_2501: 0.000002,
# Add other models...
}
cost_per_token = MODEL_COSTS.get(response.model_used, 0.000001)
return response.token_usage.get("total_tokens", 0) * cost_per_token
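Wiring the tracker into the request path is a thin wrapper; a sketch assuming the orchestrator API used throughout this guide (the wrapper name is illustrative):
tracker = UsageTracker()

async def tracked_request(orchestrator, request: ModelRequest) -> ModelResponse:
    """Send a request and record its usage under the project named in its metadata."""
    response = await orchestrator.send_request(request)
    await tracker.track_request(request, response)
    return response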
# .aurelis.yaml
cost_management:
daily_budget: 50.00 # USD
weekly_budget: 300.00
monthly_budget: 1000.00
alerts:
- threshold: 80 # percentage
action: "warn"
- threshold: 95
action: "throttle"
- threshold: 100
action: "stop"
# Use cheaper models for appropriate tasks
COST_OPTIMIZED_ROUTING = {
"simple_docs": ModelType.GPT_4O_MINI,
"quick_fixes": ModelType.MISTRAL_NEMO,
"complex_arch": ModelType.GPT_4O, # Worth the cost
"bulk_generation": ModelType.CODESTRAL_2501
}
def select_cost_effective_model(task_complexity: str) -> ModelType:
"""Select model based on cost-effectiveness."""
return COST_OPTIMIZED_ROUTING.get(task_complexity, ModelType.GPT_4O_MINI)
# Aggressive caching for repeated patterns
request = ModelRequest(
prompt="Standard CRUD operations for User entity",
task_type=TaskType.CODE_GENERATION,
metadata={
"cache_duration": 24 * 3600, # 24 hours
"cache_tags": ["crud", "user", "standard"]
}
)
async def robust_code_generation(prompt: str) -> str:
"""Generate code with multiple fallback strategies."""
orchestrator = get_model_orchestrator()
# Strategy 1: Primary model
try:
request = ModelRequest(
prompt=prompt,
task_type=TaskType.CODE_GENERATION,
model_type=ModelType.CODESTRAL_2501
)
response = await orchestrator.send_request(request)
return response.content
except ModelError as e:
logger.warning(f"Primary model failed: {e}")
# Strategy 2: Fallback model
try:
request.model_type = ModelType.GPT_4O
response = await orchestrator.send_request(request)
return response.content
except ModelError as e:
logger.error(f"Fallback model failed: {e}")
# Strategy 3: Simplified prompt
simplified_prompt = f"Generate basic {prompt.split()[-1]} function"
request.prompt = simplified_prompt
request.model_type = ModelType.GPT_4O_MINI
try:
response = await orchestrator.send_request(request)
return f"# Simplified version due to errors:\n{response.content}"
except Exception as e:
logger.error(f"All strategies failed: {e}")
return f"# Error: Unable to generate code for: {prompt}"
import time

class ModelCircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.orchestrator = get_model_orchestrator()
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = {}
        self.last_failure_time = {}
        self.state = {}  # "closed", "open", "half_open"
async def call_with_breaker(self, model_type: ModelType, request: ModelRequest):
"""Call model with circuit breaker protection."""
if self.is_open(model_type):
if self.should_attempt_reset(model_type):
self.state[model_type] = "half_open"
else:
raise ModelError(f"Circuit breaker open for {model_type}")
try:
response = await self.orchestrator.send_request(request)
self.on_success(model_type)
return response
except ModelError as e:
self.on_failure(model_type)
raise e
def on_success(self, model_type: ModelType):
"""Reset circuit breaker on successful call."""
self.failure_count[model_type] = 0
self.state[model_type] = "closed"
    def on_failure(self, model_type: ModelType):
        """Record failure and potentially open circuit."""
        self.failure_count[model_type] = self.failure_count.get(model_type, 0) + 1
        self.last_failure_time[model_type] = time.time()
        if self.failure_count[model_type] >= self.failure_threshold:
            self.state[model_type] = "open"

    def is_open(self, model_type: ModelType) -> bool:
        """Check whether the circuit is currently open for this model."""
        return self.state.get(model_type) == "open"

    def should_attempt_reset(self, model_type: ModelType) -> bool:
        """Allow a half-open probe once the timeout has elapsed."""
        return time.time() - self.last_failure_time.get(model_type, 0) >= self.timeout
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0
) -> T:
"""Retry function with exponential backoff."""
for attempt in range(max_retries + 1):
try:
return await func()
except Exception as e:
if attempt == max_retries:
raise e
delay = min(base_delay * (exponential_base ** attempt), max_delay)
logger.warning(
f"Attempt {attempt + 1} failed: {e}. "
f"Retrying in {delay:.1f} seconds..."
)
await asyncio.sleep(delay)
raise RuntimeError("Maximum retries exceeded")
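Combined with the orchestrator, the helper wraps a single request in retries; a sketch using only the APIs already shown (the wrapper name is illustrative):
async def generate_with_retries(orchestrator, request: ModelRequest) -> ModelResponse:
    """Retry transient failures with exponential backoff before giving up."""
    return await retry_with_backoff(
        lambda: orchestrator.send_request(request),
        max_retries=3,
        base_delay=1.0,
    )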
# ✅ Pattern 1: Specific requirements
prompt = """
Create a Python function with these exact requirements:
- Function name: validate_user_input
- Parameters: user_data (dict), schema (dict)
- Returns: tuple (is_valid: bool, errors: list)
- Include type hints and docstring
- Handle missing keys and type mismatches
"""
# ✅ Pattern 2: Context-aware
prompt = f"""
Optimize this {language} function for {framework}:
{existing_code}
Focus on:
- Performance improvements
- Memory usage reduction
- {specific_optimization_target}
"""
# ✅ Pattern 3: Incremental improvement
prompt = f"""
Current code:
{current_implementation}
Failing tests:
{test_failures}
Fix the failing tests while maintaining existing functionality.
"""
# ❌ Vague requests
prompt = "Make this code better"
# ❌ Too many requirements
prompt = """Create a function that handles users and authentication and
authorization and logging and error handling and caching and..."""
# ❌ No context
prompt = "Fix this bug" # Without showing the bug or code
async def automated_code_review(file_path: str) -> Dict[str, Any]:
    """Perform automated code review using Aurelis."""
    orchestrator = get_model_orchestrator()
    code = read_file(file_path)
    language = detect_language(file_path)
# Security review
security_request = ModelRequest(
prompt=f"""
Security review for {language} code:
{code}
Check for:
- SQL injection vulnerabilities
- Cross-site scripting (XSS)
- Authentication bypasses
- Data validation issues
- Sensitive data exposure
""",
task_type=TaskType.ANALYSIS,
model_type=ModelType.GPT_4O # Use best model for security
)
# Performance review
performance_request = ModelRequest(
prompt=f"""
Performance analysis for {language} code:
{code}
Identify:
- Algorithmic inefficiencies
- Memory usage issues
- Database query optimization
- Caching opportunities
""",
task_type=TaskType.CODE_OPTIMIZATION,
model_type=ModelType.CODESTRAL_2501
)
# Code style review
style_request = ModelRequest(
prompt=f"""
Code style review for {language}:
{code}
Check against:
- PEP 8 (Python) / language standards
- Naming conventions
- Documentation completeness
- Code organization
""",
task_type=TaskType.ANALYSIS,
model_type=ModelType.COHERE_COMMAND_R
)
# Execute reviews in parallel
responses = await asyncio.gather(
orchestrator.send_request(security_request),
orchestrator.send_request(performance_request),
orchestrator.send_request(style_request)
)
return {
"security": responses[0].content,
"performance": responses[1].content,
"style": responses[2].content,
"overall_score": calculate_score(responses)
}
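A typical call site gates a merge on the combined result. A sketch; the score threshold and function name are illustrative:
async def review_gate(file_path: str, min_score: float = 0.8) -> bool:
    """Run the automated review and decide whether the file passes."""
    results = await automated_code_review(file_path)
    for section in ("security", "performance", "style"):
        print(f"## {section}\n{results[section]}\n")
    return results["overall_score"] >= min_score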
async def generate_comprehensive_tests(function_code: str) -> str:
"""Generate comprehensive test suite."""
request = ModelRequest(
prompt=f"""
Generate comprehensive tests for this function:
{function_code}
Include:
- Unit tests for normal cases
- Edge case tests
- Error condition tests
- Performance tests (if applicable)
- Integration tests (if applicable)
Use pytest framework with appropriate fixtures.
""",
task_type=TaskType.TESTING,
model_type=ModelType.CODESTRAL_2501,
temperature=0.1 # Deterministic for tests
)
response = await orchestrator.send_request(request)
return response.content
# .aurelis.team.yaml
team:
name: "Backend Development Team"
lead: "[email protected]"
models:
primary: "codestral-2501"
fallback: "gpt-4o-mini"
standards:
code_style: "pep8"
test_framework: "pytest"
documentation_style: "google"
quotas:
daily_tokens: 50000
monthly_budget: 500.00
workflows:
code_review: true
auto_documentation: true
test_generation: true
# .aurelis.yaml
access_control:
roles:
developer:
models: ["codestral-2501", "gpt-4o-mini"]
daily_limit: 10000
features: ["generation", "analysis"]
senior_developer:
models: ["all"]
daily_limit: 25000
features: ["generation", "analysis", "optimization"]
architect:
models: ["all"]
daily_limit: 50000
features: ["all"]
# prompts/team_prompts.py
TEAM_PROMPTS = {
"api_endpoint": """
Create a {framework} API endpoint with:
- Path: {path}
- Method: {method}
- Authentication: JWT
- Validation: Pydantic models
- Error handling: Custom exceptions
- Logging: Structured logging
- Tests: Unit and integration
""",
"database_model": """
Create a {orm} model for {entity}:
- Table name: {table_name}
- Fields: {fields}
- Relationships: {relationships}
- Constraints: {constraints}
- Indexes: {indexes}
- Migration script included
""",
"documentation": """
Document this {component_type}:
{code}
Follow our documentation standards:
- Clear purpose statement
- Parameter descriptions with types
- Usage examples
- Common pitfalls
- Related components
"""
}
def get_team_prompt(prompt_type: str, **kwargs) -> str:
"""Get standardized team prompt."""
template = TEAM_PROMPTS.get(prompt_type)
if not template:
raise ValueError(f"Unknown prompt type: {prompt_type}")
return template.format(**kwargs)
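A team member generating an endpoint then fills the shared template instead of writing an ad-hoc prompt; the parameter values below are illustrative:
prompt = get_team_prompt(
    "api_endpoint",
    framework="FastAPI",
    path="/users/register",
    method="POST",
)

request = ModelRequest(prompt=prompt, task_type=TaskType.CODE_GENERATION)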
#!/bin/bash
# .git/hooks/pre-commit
echo "Running Aurelis code review..."
# Get changed files
changed_files=$(git diff --cached --name-only --diff-filter=ACM | grep '\.py$')
if [ -n "$changed_files" ]; then
for file in $changed_files; do
echo "Reviewing $file..."
# Quick security check
aurelis analyze --security --file="$file"
if [ $? -ne 0 ]; then
echo "Security issues found in $file. Commit blocked."
exit 1
fi
done
fi
echo "Code review passed."
# .github/workflows/aurelis-review.yml
name: Aurelis Code Review
on:
pull_request:
types: [opened, synchronize]
jobs:
aurelis-review:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Aurelis
run: |
pip install aurelis-cli
          echo "GITHUB_TOKEN=${{ secrets.GITHUB_TOKEN }}" >> "$GITHUB_ENV"
- name: Run Aurelis Review
run: |
aurelis review \
--files=$(git diff --name-only origin/main...HEAD) \
--format=github-comment \
--output=review_results.md
- name: Post Review Comment
uses: actions/github-script@v6
with:
script: |
const fs = require('fs');
const review = fs.readFileSync('review_results.md', 'utf8');
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: review
});
# .aurelis.production.yaml
environment: "production"
security:
token_storage: "keyring"
audit_logging: true
compliance_mode: true
encryption_at_rest: true
processing:
concurrent_requests: 10
timeout: 45
max_retries: 5
circuit_breaker: true
monitoring:
metrics_enabled: true
health_checks: true
performance_tracking: true
logging:
level: "INFO"
structured: true
correlation_ids: true
cache:
enabled: true
backend: "redis"
ttl: 1800
max_size: 10000
from datetime import datetime

from fastapi import FastAPI
from aurelis.models import get_model_orchestrator
app = FastAPI()
@app.get("/health")
async def health_check():
"""Comprehensive health check endpoint."""
orchestrator = get_model_orchestrator()
health_status = orchestrator.health_check()
# Additional checks
health_status.update({
"cache": check_cache_health(),
"token": check_token_validity(),
"network": check_network_connectivity()
})
overall_healthy = all(
status.get("status") == "healthy"
for status in health_status.values()
if isinstance(status, dict)
)
return {
"status": "healthy" if overall_healthy else "unhealthy",
"timestamp": datetime.now().isoformat(),
"details": health_status
}
@app.get("/metrics")
async def metrics():
"""Prometheus-compatible metrics endpoint."""
orchestrator = get_model_orchestrator()
stats = orchestrator.get_model_stats()
# Convert to Prometheus format
metrics = []
metrics.append(f"aurelis_requests_total {stats.get('total_requests', 0)}")
metrics.append(f"aurelis_tokens_total {stats.get('total_tokens', 0)}")
metrics.append(f"aurelis_errors_total {stats.get('total_errors', 0)}")
return "\n".join(metrics)
For single-instance usage, focus on lightweight in-process metrics such as the collector below:
from prometheus_client import Counter, Histogram  # assumes prometheus_client for the metric primitives

class AurelisMetrics:
    def __init__(self):
        self.request_count = Counter("aurelis_requests_total", "Model requests", ["model", "task_type", "cached"])
        self.response_time = Histogram("aurelis_response_seconds", "Model response time in seconds", ["model"])
        self.token_usage = Counter("aurelis_tokens_total", "Tokens consumed", ["model", "type"])
        self.error_count = Counter("aurelis_errors_total", "Model errors", ["model", "error_type"])
        self.cache_hits = Counter("aurelis_cache_hits_total", "Cache hits", ["model"])
def record_request(self, response: ModelResponse):
"""Record metrics for a model request."""
# Request counting
self.request_count.labels(
model=response.model_used,
task_type=response.task_type,
cached=response.cached
).inc()
# Response time
self.response_time.labels(
model=response.model_used
).observe(response.processing_time)
# Token usage
self.token_usage.labels(
model=response.model_used,
type="total"
).inc(response.token_usage.get("total_tokens", 0))
# Cache performance
if response.cached:
self.cache_hits.labels(
model=response.model_used
).inc()
def record_error(self, error: Exception, model_type: str):
"""Record error metrics."""
self.error_count.labels(
model=model_type,
error_type=type(error).__name__
).inc()
class AnalyticsDashboard:
def __init__(self):
self.db = get_analytics_db()
def get_usage_summary(self, period: str = "7d") -> Dict[str, Any]:
"""Get usage summary for specified period."""
return {
"total_requests": self.get_request_count(period),
"total_tokens": self.get_token_usage(period),
"average_response_time": self.get_avg_response_time(period),
"error_rate": self.get_error_rate(period),
"cache_hit_rate": self.get_cache_hit_rate(period),
"cost_estimate": self.estimate_costs(period),
"top_models": self.get_top_models(period),
"peak_usage_hours": self.get_peak_hours(period)
}
def get_model_performance(self) -> Dict[str, Any]:
"""Get performance metrics by model."""
models = {}
for model_type in ModelType:
models[model_type.value] = {
"avg_response_time": self.get_model_avg_time(model_type),
"success_rate": self.get_model_success_rate(model_type),
"token_efficiency": self.get_token_efficiency(model_type),
"cost_per_request": self.get_cost_per_request(model_type)
}
return models
def generate_report(self, format: str = "html") -> str:
"""Generate comprehensive analytics report."""
data = {
"summary": self.get_usage_summary(),
"model_performance": self.get_model_performance(),
"trends": self.get_usage_trends(),
"recommendations": self.get_optimization_recommendations()
}
if format == "html":
return self.render_html_report(data)
elif format == "pdf":
return self.render_pdf_report(data)
else:
return json.dumps(data, indent=2)
# .aurelis.yaml
alerting:
enabled: true
thresholds:
error_rate: 5 # percentage
response_time: 30 # seconds
token_usage: 100000 # daily
cost: 100 # daily USD
channels:
- type: "email"
recipients: ["[email protected]"]
events: ["error_spike", "cost_threshold"]
- type: "slack"
webhook: "${SLACK_WEBHOOK_URL}"
events: ["all"]
- type: "pagerduty"
service_key: "${PAGERDUTY_KEY}"
events: ["critical"]
class AlertManager:
def __init__(self):
self.config = get_config()
self.handlers = self._setup_handlers()
async def check_thresholds(self):
"""Check all monitoring thresholds."""
current_metrics = self.get_current_metrics()
# Error rate check
if current_metrics["error_rate"] > self.config.alerting.thresholds.error_rate:
await self.send_alert(
level="warning",
message=f"Error rate {current_metrics['error_rate']}% exceeds threshold",
metrics=current_metrics
)
# Response time check
if current_metrics["avg_response_time"] > self.config.alerting.thresholds.response_time:
await self.send_alert(
level="warning",
message=f"Response time {current_metrics['avg_response_time']}s exceeds threshold",
metrics=current_metrics
)
# Cost check
if current_metrics["daily_cost"] > self.config.alerting.thresholds.cost:
await self.send_alert(
level="critical",
message=f"Daily cost ${current_metrics['daily_cost']} exceeds budget",
metrics=current_metrics
)
async def send_alert(self, level: str, message: str, metrics: Dict[str, Any]):
"""Send alert through configured channels."""
alert_data = {
"timestamp": datetime.now().isoformat(),
"level": level,
"message": message,
"metrics": metrics,
"service": "aurelis"
}
for handler in self.handlers:
if handler.should_handle(level):
await handler.send(alert_data)
from typing import List

from aurelis.models import ModelProvider, ModelRequest, ModelResponse, ModelType
class CustomModelProvider(ModelProvider):
"""Custom model provider for specialized use cases."""
def __init__(self, provider_name: str, endpoint: str):
self.provider_name = provider_name
self.endpoint = endpoint
super().__init__()
async def send_request(self, request: ModelRequest) -> ModelResponse:
"""Send request to custom model provider."""
# Custom implementation
response = await self.call_custom_api(request)
return ModelResponse(
model_type=request.model_type,
task_type=request.task_type,
content=response.content,
confidence=response.confidence,
token_usage=response.token_usage,
processing_time=response.processing_time
)
def get_supported_models(self) -> List[ModelType]:
"""Get list of supported models."""
return [ModelType.CUSTOM_MODEL]
# Register custom provider
from aurelis.core.registry import register_provider
register_provider("custom", CustomModelProvider)
import redis.asyncio as redis
from cachetools import LRUCache

class MultiLevelCache:
    def __init__(self):
        self.memory_cache = LRUCache(maxsize=1000)
        self.redis_cache = redis.Redis()  # async client, so get/setex can be awaited
        self.persistent_cache = get_database()
async def get(self, key: str) -> Optional[ModelResponse]:
"""Get from cache with fallback layers."""
# Level 1: Memory cache (fastest)
response = self.memory_cache.get(key)
if response:
return response
# Level 2: Redis cache (fast)
cached_data = await self.redis_cache.get(key)
if cached_data:
response = ModelResponse.from_json(cached_data)
self.memory_cache[key] = response
return response
# Level 3: Persistent cache (slower but durable)
response = await self.persistent_cache.get_cached_response(key)
if response:
await self.redis_cache.setex(key, 3600, response.to_json())
self.memory_cache[key] = response
return response
return None
async def set(self, key: str, response: ModelResponse, ttl: int = 3600):
"""Set cache at all levels."""
# Memory cache
self.memory_cache[key] = response
# Redis cache
await self.redis_cache.setex(key, ttl, response.to_json())
# Persistent cache (for long-term storage)
if ttl > 3600: # Only persist long-lived cache entries
await self.persistent_cache.store_cached_response(key, response, ttl)
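Putting the cache in front of the orchestrator then looks like this; a sketch in which the wrapper name and TTL are illustrative:
cache = MultiLevelCache()

async def cached_request(orchestrator, request: ModelRequest, cache_key: str) -> ModelResponse:
    """Check the cache layers before paying for a model call."""
    cached = await cache.get(cache_key)
    if cached is not None:
        return cached
    response = await orchestrator.send_request(request)
    await cache.set(cache_key, response, ttl=1800)
    return response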
class WorkflowEngine:
def __init__(self):
self.orchestrator = get_model_orchestrator()
self.workflows = {}
def register_workflow(self, name: str, workflow: 'Workflow'):
"""Register a custom workflow."""
self.workflows[name] = workflow
async def execute_workflow(self, name: str, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a registered workflow."""
workflow = self.workflows.get(name)
if not workflow:
raise ValueError(f"Unknown workflow: {name}")
return await workflow.execute(input_data, self.orchestrator)
class CodeReviewWorkflow:
"""Complete code review workflow."""
async def execute(self, input_data: Dict[str, Any], orchestrator) -> Dict[str, Any]:
code = input_data["code"]
language = input_data.get("language", "python")
# Step 1: Security analysis
security_analysis = await self._analyze_security(code, language, orchestrator)
# Step 2: Performance analysis
performance_analysis = await self._analyze_performance(code, language, orchestrator)
# Step 3: Style analysis
style_analysis = await self._analyze_style(code, language, orchestrator)
# Step 4: Generate improvement suggestions
improvements = await self._generate_improvements(
code, security_analysis, performance_analysis, style_analysis, orchestrator
)
# Step 5: Generate test cases
test_cases = await self._generate_tests(code, language, orchestrator)
return {
"security": security_analysis,
"performance": performance_analysis,
"style": style_analysis,
"improvements": improvements,
"test_cases": test_cases,
"overall_score": self._calculate_score(security_analysis, performance_analysis, style_analysis)
}
# Register workflow
workflow_engine = WorkflowEngine()
workflow_engine.register_workflow("code_review", CodeReviewWorkflow())
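Executing the registered workflow is then a single call; a sketch assuming the code under review is already loaded into a string:
async def run_review(source_code: str) -> None:
    result = await workflow_engine.execute_workflow(
        "code_review",
        {"code": source_code, "language": "python"},
    )
    print(f"Overall score: {result['overall_score']}")
    print(result["improvements"])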
Last Updated: December 2024
Version: 2.0.0
Author: Gamecooler19 (Lead Developer at Kanopus)
Aurelis - Where AI meets enterprise code development