This comprehensive guide covers integration with external systems, custom development, and best practices for extending the Aurora AI framework. With 27 integrated systems and 74 API endpoints, Aurora provides extensive integration capabilities.
# Basic API integration example
import requests

# Aurora AI API base URL
AURORA_API_BASE = "http://localhost:8080"

# Bound every call to Aurora; a requests call without a timeout can hang
# forever if the server stops responding.
DEFAULT_TIMEOUT = 30


def check_system_health():
    """Return the Aurora system status payload from /api/status."""
    response = requests.get(f"{AURORA_API_BASE}/api/status", timeout=DEFAULT_TIMEOUT)
    response.raise_for_status()  # surface HTTP errors instead of decoding an error page
    return response.json()


def validate_external_data(data):
    """Validate *data* against Aurora's JSON-schema validation endpoint."""
    response = requests.post(
        f"{AURORA_API_BASE}/api/validation/schema",
        json={"schema_type": "json_schema", "data": data},
        timeout=DEFAULT_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()


def trigger_model_training(config):
    """Start an enhanced training run using the given *config* dict."""
    response = requests.post(
        f"{AURORA_API_BASE}/api/training/enhanced",
        json=config,
        timeout=DEFAULT_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()
# Database integration example
import psycopg2
import requests
class AuroraDatabaseIntegration:
    """Bridges a PostgreSQL database with the Aurora AI validation API."""

    def __init__(self, db_config, aurora_api_url):
        self.db_conn = psycopg2.connect(**db_config)
        self.aurora_api = aurora_api_url

    @staticmethod
    def _validate_table_name(table_name: str) -> str:
        """Return *table_name* if it is a plain SQL identifier, else raise ValueError.

        The table name is interpolated directly into the query text, so it
        must be restricted to an identifier to prevent SQL injection.
        """
        if not isinstance(table_name, str) or not table_name.isidentifier():
            raise ValueError(f"Invalid table name: {table_name!r}")
        return table_name

    def sync_data_to_aurora(self, table_name):
        """Sync database data to Aurora AI.

        Raises ValueError if *table_name* is not a plain SQL identifier.
        Returns a status dict; data is included only when Aurora's quality
        assessment completed.
        """
        safe_name = self._validate_table_name(table_name)
        cursor = self.db_conn.cursor()
        try:
            cursor.execute(f"SELECT * FROM {safe_name}")
            data = cursor.fetchall()
        finally:
            cursor.close()  # release the cursor even if the query fails
        # Validate data before sending to Aurora
        validation_response = requests.post(
            f"{self.aurora_api}/api/validation/quality",
            json={"data": data, "scope": "comprehensive"},
            timeout=30,  # don't let a stalled Aurora hang the sync
        )
        if validation_response.json().get("status") == "QUALITY_ASSESSMENT_COMPLETED":
            return {"status": "sync_ready", "data": data}
        return {"status": "validation_failed"}
# RabbitMQ/Kafka integration example
import pika
import json
class AuroraMessageQueueIntegration:
    """Consumes model-training requests from RabbitMQ and forwards them to Aurora."""

    def __init__(self, queue_config, aurora_api_url):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(**queue_config)
        )
        self.channel = self.connection.channel()
        self.aurora_api = aurora_api_url

    def setup_training_queue(self):
        """Declare the training queue and consume it, forwarding each message.

        Messages are acknowledged only after a successful forward; with the
        original auto_ack=True, a crash mid-forward silently lost the message.
        This call blocks in start_consuming().
        """
        self.channel.queue_declare(queue='aurora_training')

        def callback(ch, method, properties, body):
            training_request = json.loads(body)
            # Send to Aurora AI for training; bounded timeout so a stalled
            # Aurora instance cannot wedge the consumer forever.
            response = requests.post(
                f"{self.aurora_api}/api/training/enhanced",
                json=training_request,
                timeout=30,
            )
            print(f"Training initiated: {response.json()}")
            ch.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(
            queue='aurora_training',
            on_message_callback=callback,
            auto_ack=False,  # ack manually after the forward succeeds
        )
        self.channel.start_consuming()
# Custom Aurora module example
from core.base import BaseComponent
from typing import Dict, Any, Optional
class CustomAuroraModule(BaseComponent):
    """Example of a custom module that plugs into the Aurora AI framework."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self.custom_config = self.config.get('custom_settings', {})
        self.initialized = False

    def initialize(self) -> bool:
        """Run the module start-up sequence; True on success, False otherwise."""
        try:
            self.logger.info("Initializing Custom Aurora Module...")
            # Refuse to start with an incomplete configuration.
            if not self.validate_custom_config():
                return False
            self.setup_custom_components()
            self.initialized = True
            self.logger.info("Custom Module initialized successfully")
            return True
        except Exception as e:
            self.logger.error(f"Custom Module initialization failed: {e}")
            return False

    def validate_custom_config(self) -> bool:
        """Check that every mandatory configuration key is present."""
        return self.validate_config(['api_endpoint', 'auth_token', 'data_source'])

    def setup_custom_components(self):
        """Hook for wiring up module-specific components (no-op by default)."""
        pass

    def process_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the custom transformation pipeline over *data*.

        Raises RuntimeError when called before initialize() has succeeded.
        """
        if not self.initialized:
            raise RuntimeError("Module not initialized")
        return self.apply_custom_transformations(data)

    def apply_custom_transformations(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Hook for module-specific transformations; identity by default."""
        return data
# Custom API endpoint integration
from flask import Flask, request, jsonify
import requests
app = Flask(__name__)

AURORA_API = "http://localhost:8080"
REQUEST_TIMEOUT = 30  # seconds; bound every upstream call to Aurora


@app.route('/custom/predict', methods=['POST'])
def custom_prediction():
    """Custom prediction endpoint using Aurora AI.

    Flow: validate the posted JSON against Aurora's schema service, then
    forward it to batch inference. Returns 400 on missing or invalid input.
    """
    # silent=True returns None for absent/malformed JSON instead of raising
    data = request.get_json(silent=True)
    if data is None:
        return jsonify({"error": "Request body must be JSON"}), 400
    # Validate input data
    validation_response = requests.post(
        f"{AURORA_API}/api/validation/schema",
        json={"data": data, "schema_type": "prediction_input"},
        timeout=REQUEST_TIMEOUT,
    )
    if validation_response.json().get("status") != "SCHEMA_VALIDATION_COMPLETED":
        return jsonify({"error": "Invalid input data"}), 400
    # Get prediction from Aurora
    prediction_response = requests.post(
        f"{AURORA_API}/api/inference/batch",
        json={"data": [data], "model_id": "MDL-001"},
        timeout=REQUEST_TIMEOUT,
    )
    return jsonify(prediction_response.json())


@app.route('/custom/train', methods=['POST'])
def custom_training():
    """Custom training endpoint using Aurora AI."""
    config = request.get_json(silent=True)
    if config is None:
        return jsonify({"error": "Request body must be JSON"}), 400
    # Trigger training in Aurora
    training_response = requests.post(
        f"{AURORA_API}/api/training/enhanced",
        json=config,
        timeout=REQUEST_TIMEOUT,
    )
    return jsonify(training_response.json())
# Robust error handling for Aurora integration
import requests
from typing import Optional
import time
class AuroraIntegrationClient:
    """Thin Aurora API client with timeouts, retries and exponential backoff."""

    def __init__(self, base_url: str, max_retries: int = 3):
        self.base_url = base_url
        self.max_retries = max_retries

    def make_request(self, endpoint: str, method: str = "GET",
                     data: Optional[dict] = None) -> Optional[dict]:
        """Make request with retry logic.

        Retries on HTTP 500 and on transport errors with exponential
        backoff (1s, 2s, 4s, ...). Returns the decoded JSON body on
        success, an {"error": ...} dict on failure.

        Raises ValueError for unsupported HTTP methods (the original fell
        through to an unbound local and crashed with UnboundLocalError).
        """
        if method not in ("GET", "POST"):
            raise ValueError(f"Unsupported HTTP method: {method}")
        url = f"{self.base_url}{endpoint}"
        for attempt in range(self.max_retries):
            try:
                if method == "GET":
                    response = requests.get(url, timeout=30)
                else:
                    response = requests.post(url, json=data, timeout=30)
                if response.status_code == 200:
                    return response.json()
                if response.status_code == 500:
                    time.sleep(2 ** attempt)  # Exponential backoff
                    continue
                return {"error": f"HTTP {response.status_code}"}
            except requests.exceptions.RequestException as e:
                if attempt == self.max_retries - 1:
                    return {"error": f"Request failed: {str(e)}"}
                time.sleep(2 ** attempt)
        # All retries exhausted on HTTP 500s; report it instead of a silent None.
        return {"error": "Max retries exceeded"}
import os
from typing import Dict
class AuroraConfig:
    """Environment-aware settings (development / staging / production).

    The environment is taken from AURORA_ENV at construction time and
    defaults to 'development'; unknown names also fall back to development.
    """

    def __init__(self):
        self.environment = os.getenv('AURORA_ENV', 'development')
        self.config = self._load_config()

    def _load_config(self) -> Dict:
        """Build the settings table and pick the entry for this environment."""
        env_settings = {
            'development': {
                'api_url': 'http://localhost:8080',
                'timeout': 30,
                'retry_attempts': 3,
            },
            'staging': {
                'api_url': 'https://staging.aurora.ai',
                'timeout': 60,
                'retry_attempts': 5,
            },
            'production': {
                'api_url': 'https://api.aurora.ai',
                'timeout': 120,
                'retry_attempts': 5,
            },
        }
        # Unrecognized environment names get the development profile.
        return env_settings.get(self.environment, env_settings['development'])

    def get(self, key: str, default=None):
        """Dict-style lookup into the active environment's settings."""
        return self.config.get(key, default)
# Monitoring and logging integration
import logging
from datetime import datetime
class AuroraMonitoring:
    """Pushes API-call metrics to Aurora's logging endpoint and checks health."""

    def __init__(self, aurora_api_url: str):
        self.api_url = aurora_api_url
        self.logger = logging.getLogger(__name__)

    def log_api_call(self, endpoint: str, method: str,
                     response_code: int, duration: float):
        """Log API call metrics to Aurora. Best-effort: never raises."""
        log_data = {
            'timestamp': datetime.now().isoformat(),
            'endpoint': endpoint,
            'method': method,
            'response_code': response_code,
            'duration': duration
        }
        # Send to Aurora monitoring
        try:
            requests.post(
                f"{self.api_url}/api/logs/system",
                json=log_data,
                timeout=10,  # metrics delivery must not block the caller
            )
        except Exception as e:
            self.logger.error(f"Failed to log to Aurora: {e}")

    def check_system_health(self) -> bool:
        """Return True iff /api/status answers with status == 'SUCCESS'."""
        try:
            response = requests.get(f"{self.api_url}/api/status", timeout=10)
            return response.json().get('status') == 'SUCCESS'
        except Exception as e:
            self.logger.error(f"Health check failed: {e}")
            return False
# JWT authentication for Aurora API
import jwt
import requests
from datetime import datetime, timedelta
class AuroraAuthClient:
    """JWT-based authentication helper for the Aurora API."""

    def __init__(self, api_url: str, secret_key: str):
        self.api_url = api_url
        self.secret_key = secret_key
        self.token = None

    def generate_token(self, user_id: str, permissions: list) -> str:
        """Create an HS256-signed JWT carrying the user id and permissions.

        The token is issued now and expires one hour later.
        """
        claims = {
            'user_id': user_id,
            'permissions': permissions,
            'exp': datetime.utcnow() + timedelta(hours=1),
            'iat': datetime.utcnow(),
        }
        return jwt.encode(claims, self.secret_key, algorithm='HS256')

    def authenticate(self, user_id: str, permissions: list):
        """Generate and store a token for subsequent authenticated calls."""
        self.token = self.generate_token(user_id, permissions)
        return self.token

    def make_authenticated_request(self, endpoint: str, method: str = "GET",
                                   data: Optional[dict] = None):
        """Issue a GET or POST to *endpoint* with the stored bearer token.

        NOTE(review): other methods fall through and return None — matches
        the original behavior; callers should stick to GET/POST.
        """
        headers = {'Authorization': f'Bearer {self.token}'}
        url = f"{self.api_url}{endpoint}"
        if method == "GET":
            return requests.get(url, headers=headers)
        elif method == "POST":
            return requests.post(url, json=data, headers=headers)
# Data encryption integration
import requests
import json
class AuroraSecurityIntegration:
    """Client for Aurora's combined encrypt/decrypt security endpoint."""

    def __init__(self, api_url: str):
        self.api_url = api_url

    def _security_call(self, action: str, payload: str, failure: dict) -> dict:
        """POST one encrypt/decrypt request; return *failure* on non-200."""
        response = requests.post(
            f"{self.api_url}/api/security/encrypt",
            json={
                'action': action,
                'data': payload,
                'algorithm': 'AES-256'
            }
        )
        if response.status_code == 200:
            return response.json()
        return failure

    def encrypt_sensitive_data(self, data: dict) -> dict:
        """Encrypt *data* (JSON-serialized) using Aurora security."""
        return self._security_call(
            'encrypt', json.dumps(data), {'error': 'Encryption failed'}
        )

    def decrypt_sensitive_data(self, encrypted_data: str) -> dict:
        """Decrypt a previously encrypted payload using Aurora security."""
        return self._security_call(
            'decrypt', encrypted_data, {'error': 'Decryption failed'}
        )
# Redis caching integration
import redis
import json
from typing import Optional
class AuroraCacheIntegration:
    """Caches Aurora inference results in Redis, keyed by a stable content hash."""

    def __init__(self, redis_host: str, redis_port: int, aurora_api_url: str):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port)
        self.aurora_api = aurora_api_url
        self.cache_ttl = 3600  # 1 hour

    @staticmethod
    def _hash_payload(data: dict) -> str:
        """Return a stable SHA-256 hex digest of *data*'s canonical JSON form.

        The builtin hash() used previously is salted per process
        (PYTHONHASHSEED), so cache keys would not survive a restart or be
        shared between worker processes.
        """
        import hashlib  # local import keeps this guide snippet self-contained
        canonical = json.dumps(data, sort_keys=True)
        return hashlib.sha256(canonical.encode('utf-8')).hexdigest()

    def get_cached_prediction(self, data_hash: str) -> Optional[dict]:
        """Return the cached prediction for *data_hash*, or None on a miss."""
        cached_result = self.redis_client.get(f"prediction:{data_hash}")
        if cached_result:
            return json.loads(cached_result)
        return None

    def cache_prediction(self, data_hash: str, prediction: dict):
        """Store *prediction* under *data_hash* with the configured TTL."""
        self.redis_client.setex(
            f"prediction:{data_hash}",
            self.cache_ttl,
            json.dumps(prediction)
        )

    def get_prediction_with_cache(self, data: dict) -> dict:
        """Return a prediction for *data*, consulting the Redis cache first."""
        data_hash = self._hash_payload(data)
        # Check cache first
        cached_result = self.get_cached_prediction(data_hash)
        if cached_result:
            return cached_result
        # Get from Aurora AI
        response = requests.post(
            f"{self.aurora_api}/api/inference/batch",
            json={"data": [data]},
            timeout=30,
        )
        if response.status_code == 200:
            result = response.json()
            # Cache the result
            self.cache_prediction(data_hash, result)
            return result
        return {'error': 'Prediction failed'}
# .github/workflows/aurora-integration.yml
name: Aurora AI Integration

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test-aurora-integration:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2

      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          # Quoted: a bare 3.9 is parsed as a YAML float
          python-version: '3.9'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest requests

      - name: Start Aurora AI
        run: |
          python web_backend/server.py &
          sleep 10

      # Heredoc instead of `python -c "..."` so the multi-line script keeps
      # valid indentation, and pytest runs as a shell command rather than
      # being pasted into the Python source as in the broken original.
      - name: Run integration tests
        run: |
          python << 'EOF'
          import requests
          import time
          # Wait for Aurora to be ready
          for _ in range(30):
              try:
                  response = requests.get('http://localhost:8080/api/status')
                  if response.status_code == 200:
                      break
              except requests.exceptions.RequestException:
                  time.sleep(1)
          EOF
          pytest tests/integration/

      - name: Test Aurora endpoints
        run: |
          python << 'EOF'
          import requests
          # Test key endpoints
          endpoints = [
              '/api/status',
              '/api/training/status',
              '/api/security/status',
              '/api/monitoring/advanced',
          ]
          for endpoint in endpoints:
              response = requests.get(f'http://localhost:8080{endpoint}')
              assert response.status_code == 200, f'Endpoint {endpoint} failed'
          print('All Aurora endpoints working correctly')
          EOF
// Aurora AI integration for React Native
import axios from 'axios';
class AuroraAIService {
  // Promise-based wrapper around the Aurora AI HTTP API.
  constructor(baseURL = 'http://localhost:8080') {
    this.api = axios.create({
      baseURL,
      timeout: 30000,
      headers: { 'Content-Type': 'application/json' },
    });
  }

  // Batch inference against the default model (MDL-001).
  async predict(data) {
    try {
      const { data: result } = await this.api.post('/api/inference/batch', {
        data: [data],
        model_id: 'MDL-001',
      });
      return result;
    } catch (error) {
      throw new Error(`Prediction failed: ${error.message}`);
    }
  }

  // Schema validation of an arbitrary payload.
  async validateData(data) {
    try {
      const { data: result } = await this.api.post('/api/validation/schema', {
        schema_type: 'json_schema',
        data,
      });
      return result;
    } catch (error) {
      throw new Error(`Validation failed: ${error.message}`);
    }
  }

  // Current system status payload.
  async getSystemStatus() {
    try {
      const { data: result } = await this.api.get('/api/status');
      return result;
    } catch (error) {
      throw new Error(`Status check failed: ${error.message}`);
    }
  }
}
// Usage example
const aurora = new AuroraAIService();

const makePrediction = async (inputData) => {
  try {
    // Reject inputs that fail Aurora's schema validation before predicting.
    const validation = await aurora.validateData(inputData);
    if (validation.status !== 'SCHEMA_VALIDATION_COMPLETED') {
      throw new Error('Invalid input data');
    }
    return await aurora.predict(inputData);
  } catch (error) {
    console.error('Prediction error:', error);
    throw error;
  }
};
// Aurora AI integration for web applications
class AuroraWebIntegration {
constructor(apiURL, authToken) {
this.apiURL = apiURL;
this.authToken = authToken;
}
async makeAuthenticatedRequest(endpoint, method = 'GET', data = null) {
const config = {
method,
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.authToken}`,
},
};
if (data && method !== 'GET') {
config.body = JSON.stringify(data);
}
const response = await fetch(`${this.apiURL}${endpoint}`, config);
return response.json();
}
async trainModel(config) {
return this.makeAuthenticatedRequest('/api/training/enhanced', 'POST', config);
}
async getModelMetrics() {
return this.makeAuthenticatedRequest('/api/monitoring/analytics');
}
async optimizeSystem() {
return this.makeAuthenticatedRequest('/api/optimization/analyze', 'POST', {
scope: 'full_system',
depth: 'comprehensive',
});
}
}- Connection Timeouts: Increase timeout values for large operations
- Authentication Failures: Verify JWT tokens and API keys
- Data Format Issues: Use schema validation before sending data
- Rate Limiting: Implement request throttling and caching
- System Unavailability: Implement health checks and retry logic
# Debug mode for Aurora integration
class AuroraDebugClient:
def __init__(self, api_url: str):
self.api_url = api_url
self.debug = True
def debug_request(self, endpoint: str, method: str, data: dict = None):
"""Debug request with detailed logging"""
if self.debug:
print(f"Making {method} request to {endpoint}")
if data:
print(f"Request data: {json.dumps(data, indent=2)}")
start_time = time.time()
response = requests.request(method, f"{self.api_url}{endpoint}", json=data)
duration = time.time() - start_time
if self.debug:
print(f"Response status: {response.status_code}")
print(f"Response time: {duration:.2f}s")
print(f"Response data: {json.dumps(response.json(), indent=2)}")
return responseAurora AI Integration Guide
Enterprise Integration • Custom Development • Best Practices