-
Notifications
You must be signed in to change notification settings - Fork 0
Performance Scaling
Valtronics Team edited this page May 4, 2026
·
1 revision
Comprehensive guide for optimizing and scaling the Valtronics system.
This guide covers performance optimization strategies, scaling techniques, and best practices for ensuring the Valtronics system can handle growing workloads efficiently while maintaining high performance and reliability.
# app/monitoring/performance_monitor.py
import time
import psutil
import asyncio
from typing import Dict, Any, List
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class PerformanceMetrics:
    """One snapshot of system and application performance."""

    cpu_usage: float  # system-wide CPU utilisation, percent (0-100)
    memory_usage: float  # virtual-memory usage, percent (0-100)
    disk_usage: float  # root-filesystem usage, percent (0-100)
    network_io: Dict[str, float]  # cumulative psutil net_io counters (bytes/packets, sent/recv)
    response_time: float  # average API response time, milliseconds
    throughput: float  # requests per second
    error_rate: float  # fraction of failed requests (0.05 == 5%)
    timestamp: datetime  # collection time (naive UTC, from datetime.utcnow())
class PerformanceMonitor:
    """Collects system/application performance metrics, keeps a bounded
    history, and records threshold-based alerts.

    Thresholds are percentages for cpu/memory/disk, milliseconds for
    response_time, and a fraction for error_rate.
    """

    # Upper bound for both metrics_history and alerts so a long-running
    # process cannot grow memory without limit. (Previously only the
    # metrics history was trimmed; self.alerts leaked.)
    MAX_HISTORY = 1000

    def __init__(self):
        self.metrics_history: List[PerformanceMetrics] = []
        self.alerts: List[Dict[str, Any]] = []
        self.thresholds = {
            'cpu_usage': 80.0,
            'memory_usage': 85.0,
            'disk_usage': 90.0,
            'response_time': 1000.0,  # ms
            'error_rate': 0.05,  # 5%
        }

    async def collect_metrics(self) -> PerformanceMetrics:
        """Collect current system performance metrics, append to history,
        and run threshold alert checks. Returns the new snapshot."""
        # psutil.cpu_percent(interval=1) blocks for a full second, which would
        # stall the event loop inside this coroutine; run it in the default
        # thread-pool executor instead.
        loop = asyncio.get_running_loop()
        cpu_percent = await loop.run_in_executor(None, psutil.cpu_percent, 1)
        # Memory metrics
        memory = psutil.virtual_memory()
        memory_percent = memory.percent
        # Disk metrics (root filesystem)
        disk = psutil.disk_usage('/')
        disk_percent = (disk.used / disk.total) * 100
        # Network I/O counters (cumulative since boot)
        network = psutil.net_io_counters()
        network_io = {
            'bytes_sent': network.bytes_sent,
            'bytes_recv': network.bytes_recv,
            'packets_sent': network.packets_sent,
            'packets_recv': network.packets_recv,
        }
        # Application metrics (stub values; a real implementation would query
        # the application's own metric store).
        response_time = await self.get_average_response_time()
        throughput = await self.get_throughput()
        error_rate = await self.get_error_rate()
        metrics = PerformanceMetrics(
            cpu_usage=cpu_percent,
            memory_usage=memory_percent,
            disk_usage=disk_percent,
            network_io=network_io,
            response_time=response_time,
            throughput=throughput,
            error_rate=error_rate,
            # Naive UTC timestamp, kept (rather than tz-aware now(UTC)) so
            # comparisons with existing naive datetimes keep working.
            timestamp=datetime.utcnow()
        )
        self.metrics_history.append(metrics)
        # Keep only the most recent MAX_HISTORY samples.
        if len(self.metrics_history) > self.MAX_HISTORY:
            del self.metrics_history[:-self.MAX_HISTORY]
        # Check for performance alerts
        await self.check_performance_alerts(metrics)
        return metrics

    async def get_average_response_time(self) -> float:
        """Get average API response time in milliseconds (stub)."""
        # Implementation would query application metrics
        return 250.0  # ms

    async def get_throughput(self) -> float:
        """Get system throughput in requests per second (stub)."""
        # Implementation would query application metrics
        return 150.0  # rps

    async def get_error_rate(self) -> float:
        """Get system error rate as a fraction (stub)."""
        # Implementation would query application metrics
        return 0.02  # 2%

    async def check_performance_alerts(self, metrics: PerformanceMetrics):
        """Compare *metrics* against thresholds and record any alerts."""
        alerts = []
        if metrics.cpu_usage > self.thresholds['cpu_usage']:
            alerts.append({
                'type': 'cpu_high',
                'message': f'CPU usage is {metrics.cpu_usage:.1f}%',
                'severity': 'warning' if metrics.cpu_usage < 95 else 'critical'
            })
        if metrics.memory_usage > self.thresholds['memory_usage']:
            alerts.append({
                'type': 'memory_high',
                'message': f'Memory usage is {metrics.memory_usage:.1f}%',
                'severity': 'warning' if metrics.memory_usage < 95 else 'critical'
            })
        if metrics.disk_usage > self.thresholds['disk_usage']:
            alerts.append({
                'type': 'disk_high',
                'message': f'Disk usage is {metrics.disk_usage:.1f}%',
                'severity': 'warning' if metrics.disk_usage < 95 else 'critical'
            })
        if metrics.response_time > self.thresholds['response_time']:
            alerts.append({
                'type': 'response_time_high',
                'message': f'Response time is {metrics.response_time:.1f}ms',
                'severity': 'warning' if metrics.response_time < 2000 else 'critical'
            })
        if metrics.error_rate > self.thresholds['error_rate']:
            alerts.append({
                'type': 'error_rate_high',
                'message': f'Error rate is {metrics.error_rate:.2%}',
                'severity': 'warning' if metrics.error_rate < 0.1 else 'critical'
            })
        for alert in alerts:
            self.alerts.append({
                **alert,
                'timestamp': metrics.timestamp
            })
        # Fix: self.alerts previously grew without bound; trim like history.
        if len(self.alerts) > self.MAX_HISTORY:
            del self.alerts[:-self.MAX_HISTORY]

    @staticmethod
    def _stat_block(values: List[float]) -> Dict[str, float]:
        """avg/max/min summary for a non-empty list of samples."""
        return {
            'avg': sum(values) / len(values),
            'max': max(values),
            'min': min(values),
        }

    def get_performance_summary(self) -> Dict[str, Any]:
        """Summarize metrics collected during the last hour ({} if none)."""
        if not self.metrics_history:
            return {}
        # Get metrics from last hour (naive UTC, consistent with collection).
        cutoff_time = datetime.utcnow() - timedelta(hours=1)
        recent_metrics = [
            m for m in self.metrics_history
            if m.timestamp > cutoff_time
        ]
        if not recent_metrics:
            return {}
        return {
            'period': '1 hour',
            'metrics_count': len(recent_metrics),
            'cpu': self._stat_block([m.cpu_usage for m in recent_metrics]),
            'memory': self._stat_block([m.memory_usage for m in recent_metrics]),
            'response_time': self._stat_block([m.response_time for m in recent_metrics]),
            'throughput': self._stat_block([m.throughput for m in recent_metrics]),
            'error_rate': self._stat_block([m.error_rate for m in recent_metrics]),
            'alerts_count': len([a for a in self.alerts if a['timestamp'] > cutoff_time])
        }
# app/monitoring/database_monitor.py
import time
import psycopg2
from sqlalchemy import text
from app.db.session import engine
from typing import Dict, Any
class DatabaseMonitor:
    """Collects PostgreSQL performance statistics over the shared SQLAlchemy
    engine: pool counters, sizes, per-table activity, pg_stat_statements
    data, index usage, and ungranted locks.

    NOTE: _get_slow_queries and _get_query_stats require the
    pg_stat_statements extension to be installed and enabled; the queries
    raise otherwise.
    """

    def __init__(self):
        # Scratch caches kept for interface compatibility (not populated here).
        self.connection_pool_stats = {}
        self.query_performance = {}

    def get_database_stats(self) -> Dict[str, Any]:
        """Return a combined snapshot of database performance statistics."""
        stats = {}
        with engine.connect() as conn:
            # SQLAlchemy connection-pool counters.
            pool = engine.pool
            stats['connection_pool'] = {
                'size': pool.size(),
                'checked_in': pool.checkedin(),
                'checked_out': pool.checkedout(),
                'overflow': pool.overflow(),
                'invalid': pool.invalid()
            }
            # Database size and table stats
            stats['database_size'] = self._get_database_size(conn)
            stats['table_stats'] = self._get_table_stats(conn)
            # Query performance (needs pg_stat_statements)
            stats['slow_queries'] = self._get_slow_queries(conn)
            stats['query_stats'] = self._get_query_stats(conn)
            # Index usage
            stats['index_usage'] = self._get_index_usage(conn)
            # Lock information
            stats['locks'] = self._get_lock_info(conn)
        return stats

    def _get_database_size(self, conn) -> Dict[str, Any]:
        """Database size plus total relation size (tables + indexes + toast).

        Fix: pg_total_relation_size() requires a relation argument; the
        previous zero-argument call was invalid SQL. The total is now summed
        over all user tables.
        """
        result = conn.execute(text("""
            SELECT
                pg_size_pretty(pg_database_size(current_database())) as size,
                pg_size_pretty(COALESCE(SUM(pg_total_relation_size(relid)), 0)) as total_size
            FROM pg_stat_user_tables
        """))
        row = result.fetchone()
        return {
            'database_size': row[0],
            'total_size': row[1]
        }

    def _get_table_stats(self, conn) -> Dict[str, Any]:
        """Activity/vacuum statistics for the 10 largest user tables."""
        result = conn.execute(text("""
            SELECT
                schemaname,
                tablename,
                n_tup_ins as inserts,
                n_tup_upd as updates,
                n_tup_del as deletes,
                n_live_tup as live_tuples,
                n_dead_tup as dead_tuples,
                last_vacuum,
                last_autovacuum,
                last_analyze,
                last_autoanalyze
            FROM pg_stat_user_tables
            ORDER BY n_live_tup DESC
            LIMIT 10
        """))
        tables = []
        for row in result:
            tables.append({
                'schema': row[0],
                'table': row[1],
                'inserts': row[2],
                'updates': row[3],
                'deletes': row[4],
                'live_tuples': row[5],
                'dead_tuples': row[6],
                'last_vacuum': row[7],
                'last_autovacuum': row[8],
                'last_analyze': row[9],
                'last_autoanalyze': row[10]
            })
        return {'tables': tables}

    def _get_slow_queries(self, conn) -> List[Dict[str, Any]]:
        """Top-10 queries by mean time (> 100 ms) from pg_stat_statements."""
        result = conn.execute(text("""
            SELECT
                query,
                calls,
                total_time,
                mean_time,
                rows,
                100.0 * shared_blks_hit / nullif(shared_blks_hit + shared_blks_read, 0) AS hit_percent
            FROM pg_stat_statements
            WHERE mean_time > 100
            ORDER BY mean_time DESC
            LIMIT 10
        """))
        slow_queries = []
        for row in result:
            slow_queries.append({
                # Truncate very long query text for display.
                'query': row[0][:200] + '...' if len(row[0]) > 200 else row[0],
                'calls': row[1],
                'total_time': row[2],
                'mean_time': row[3],
                'rows': row[4],
                'hit_percent': row[5]
            })
        return slow_queries

    def _get_query_stats(self, conn) -> Dict[str, Any]:
        """Aggregate query statistics from pg_stat_statements."""
        result = conn.execute(text("""
            SELECT
                SUM(calls) as total_calls,
                SUM(total_time) as total_time,
                AVG(mean_time) as avg_time,
                MAX(mean_time) as max_time,
                SUM(rows) as total_rows
            FROM pg_stat_statements
        """))
        row = result.fetchone()
        return {
            'total_calls': row[0],
            'total_time': row[1],
            'avg_time': row[2],
            'max_time': row[3],
            'total_rows': row[4]
        }

    def _get_index_usage(self, conn) -> List[Dict[str, Any]]:
        """Top-20 most-scanned user indexes."""
        result = conn.execute(text("""
            SELECT
                schemaname,
                tablename,
                indexname,
                idx_scan,
                idx_tup_read,
                idx_tup_fetch
            FROM pg_stat_user_indexes
            ORDER BY idx_scan DESC
            LIMIT 20
        """))
        indexes = []
        for row in result:
            indexes.append({
                'schema': row[0],
                'table': row[1],
                'index': row[2],
                'scans': row[3],
                'tuples_read': row[4],
                'tuples_fetched': row[5]
            })
        return indexes

    def _get_lock_info(self, conn) -> Dict[str, Any]:
        """Ungranted lock counts grouped by lock mode.

        Fix: the previous query selected a non-aggregated relation size while
        grouping only by mode, which PostgreSQL rejects. The 'relation_size'
        key is kept for backward compatibility but is no longer populated.
        """
        result = conn.execute(text("""
            SELECT
                mode,
                COUNT(*) as count
            FROM pg_locks
            WHERE NOT granted
            GROUP BY mode
        """))
        locks = {}
        for row in result:
            locks[row[0]] = {
                'count': row[1],
                'relation_size': None
            }
        return locks
# app/optimization/query_optimizer.py
from sqlalchemy import text
from sqlalchemy.orm import Session
from typing import List, Dict, Any
import logging
class QueryOptimizer:
    """Analyzes PostgreSQL query plans via EXPLAIN ANALYZE and suggests
    optimizations (indexing, memory settings, query restructuring)."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def analyze_query_performance(self, db: Session, query: str) -> Dict[str, Any]:
        """Run EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) on *query* and return
        extracted metrics plus optimization suggestions.

        WARNING: *query* is interpolated directly into the EXPLAIN statement.
        Never pass untrusted input — this is an operator/diagnostic tool only.
        """
        try:
            # Run EXPLAIN ANALYZE
            explain_query = f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}"
            result = db.execute(text(explain_query))
            explain_result = result.fetchone()[0]
            # Fix: EXPLAIN (FORMAT JSON) returns a one-element JSON array; the
            # old code passed the list itself downstream, where .get('Plan')
            # failed. Unwrap to the plan document.
            if isinstance(explain_result, list):
                explain_result = explain_result[0]
            # Extract performance metrics
            metrics = self._extract_query_metrics(explain_result)
            # Identify optimization opportunities
            optimizations = self._identify_optimizations(explain_result)
            return {
                'query': query,
                'metrics': metrics,
                'optimizations': optimizations,
                'explain_plan': explain_result
            }
        except Exception as e:
            self.logger.error(f"Error analyzing query: {e}")
            return {'error': str(e)}

    def _extract_query_metrics(self, explain_result: Dict[str, Any]) -> Dict[str, Any]:
        """Extract timing, cost, buffer, and index metrics from an
        EXPLAIN ANALYZE JSON document."""
        plan = explain_result.get('Plan', {})
        metrics = {
            'execution_time': 0,
            'planning_time': 0,
            'total_cost': plan.get('Total Cost', 0),
            'actual_rows': plan.get('Actual Rows', 0),
            'actual_loops': plan.get('Actual Loops', 0),
            'buffer_usage': {},
            'index_usage': []
        }
        # Timing information (milliseconds, per EXPLAIN ANALYZE output).
        if 'Execution Time' in explain_result:
            metrics['execution_time'] = float(explain_result['Execution Time'])
        if 'Planning Time' in explain_result:
            metrics['planning_time'] = float(explain_result['Planning Time'])
        # Buffer usage (present when BUFFERS was requested).
        if 'Buffers' in explain_result:
            buffers = explain_result['Buffers']
            metrics['buffer_usage'] = {
                'shared_hit_blocks': buffers.get('Shared Hit Blocks', 0),
                'shared_read_blocks': buffers.get('Shared Read Blocks', 0),
                'shared_dirtied_blocks': buffers.get('Shared Dirtied Blocks', 0),
                'shared_written_blocks': buffers.get('Shared Written Blocks', 0),
                'local_hit_blocks': buffers.get('Local Hit Blocks', 0),
                'local_read_blocks': buffers.get('Local Read Blocks', 0),
                'local_dirtied_blocks': buffers.get('Local Dirtied Blocks', 0),
                'local_written_blocks': buffers.get('Local Written Blocks', 0),
                'temp_read_blocks': buffers.get('Temp Read Blocks', 0),
                'temp_written_blocks': buffers.get('Temp Written Blocks', 0)
            }
        # Index usage (walks the plan tree).
        self._extract_index_usage(plan, metrics['index_usage'])
        return metrics

    def _extract_index_usage(self, plan: Dict[str, Any], index_usage: List[Dict[str, Any]]):
        """Recursively collect index-scan nodes from the plan tree into
        *index_usage* (mutated in place)."""
        if 'Node Type' in plan:
            node_type = plan['Node Type']
            if node_type in ['Index Scan', 'Index Only Scan', 'Bitmap Index Scan']:
                index_info = {
                    'node_type': node_type,
                    'index_name': plan.get('Index Name'),
                    'actual_rows': plan.get('Actual Rows', 0),
                    'total_cost': plan.get('Total Cost', 0)
                }
                index_usage.append(index_info)
        # Recursively check child plans
        if 'Plans' in plan:
            for child_plan in plan['Plans']:
                self._extract_index_usage(child_plan, index_usage)

    def _identify_optimizations(self, explain_result: Dict[str, Any]) -> List[str]:
        """Return human-readable optimization suggestions for the plan."""
        optimizations = []
        plan = explain_result.get('Plan', {})
        # Sequential scans usually indicate a missing index.
        if self._has_sequential_scan(plan):
            optimizations.append("Consider adding an index for this query")
        # High per-row cost suggests a structural problem.
        if self._has_high_cost_operations(plan):
            optimizations.append("Consider optimizing the query structure")
        # Heavy temp-buffer traffic suggests memory-setting tuning.
        if self._has_buffer_issues(explain_result):
            optimizations.append("Consider increasing work_mem or shared_buffers")
        # Explicit Sort nodes can often be avoided with an index.
        if self._has_sorting_issues(plan):
            optimizations.append("Consider adding indexes to avoid sorting")
        return optimizations

    def _has_sequential_scan(self, plan: Dict[str, Any]) -> bool:
        """True if any node in the plan tree is a Seq Scan."""
        if plan.get('Node Type') == 'Seq Scan':
            return True
        if 'Plans' in plan:
            for child_plan in plan['Plans']:
                if self._has_sequential_scan(child_plan):
                    return True
        return False

    def _has_high_cost_operations(self, plan: Dict[str, Any]) -> bool:
        """True if any node's cost-per-actual-row exceeds 1000."""
        total_cost = plan.get('Total Cost', 0)
        actual_rows = plan.get('Actual Rows', 1)
        if actual_rows > 0 and total_cost / actual_rows > 1000:
            return True
        if 'Plans' in plan:
            for child_plan in plan['Plans']:
                if self._has_high_cost_operations(child_plan):
                    return True
        return False

    def _has_buffer_issues(self, explain_result: Dict[str, Any]) -> bool:
        """True if temp buffer traffic exceeds 1000 blocks either way."""
        buffers = explain_result.get('Buffers', {})
        temp_read = buffers.get('Temp Read Blocks', 0)
        temp_written = buffers.get('Temp Written Blocks', 0)
        if temp_read > 1000 or temp_written > 1000:
            return True
        return False

    def _has_sorting_issues(self, plan: Dict[str, Any]) -> bool:
        """True if any node in the plan tree is an explicit Sort."""
        if plan.get('Node Type') == 'Sort':
            return True
        if 'Plans' in plan:
            for child_plan in plan['Plans']:
                if self._has_sorting_issues(child_plan):
                    return True
        return False
# app/optimization/index_optimizer.py
from sqlalchemy import text
from sqlalchemy.orm import Session
from typing import List, Dict, Any
import logging
class IndexOptimizer:
    """Analyzes PostgreSQL index usage (current / unused / potentially
    missing) and can create or drop indexes based on recommendations."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def analyze_index_usage(self, db: Session) -> Dict[str, Any]:
        """Analyze index usage and return optimization recommendations."""
        # Get current indexes
        current_indexes = self._get_current_indexes(db)
        # Get unused indexes
        unused_indexes = self._get_unused_indexes(db)
        # Get missing indexes
        missing_indexes = self._get_missing_indexes(db)
        # Get index size information
        index_sizes = self._get_index_sizes(db)
        return {
            'current_indexes': current_indexes,
            'unused_indexes': unused_indexes,
            'missing_indexes': missing_indexes,
            'index_sizes': index_sizes
        }

    def _get_current_indexes(self, db: Session) -> List[Dict[str, Any]]:
        """All user indexes with usage counters and on-disk size."""
        result = db.execute(text("""
            SELECT
                schemaname,
                tablename,
                indexname,
                indexdef,
                idx_scan,
                idx_tup_read,
                idx_tup_fetch,
                pg_size_pretty(pg_relation_size(indexrelid)) as size
            FROM pg_stat_user_indexes
            JOIN pg_class ON pg_class.oid = indexrelid
            ORDER BY schemaname, tablename, indexname
        """))
        indexes = []
        for row in result:
            indexes.append({
                'schema': row[0],
                'table': row[1],
                'name': row[2],
                'definition': row[3],
                'scans': row[4],
                'tuples_read': row[5],
                'tuples_fetched': row[6],
                'size': row[7]
            })
        return indexes

    def _get_unused_indexes(self, db: Session) -> List[Dict[str, Any]]:
        """Indexes never scanned since stats reset.

        NOTE: the '%_pkey%' pattern is a heuristic to skip primary keys;
        verify candidates manually before dropping.
        """
        result = db.execute(text("""
            SELECT
                schemaname,
                tablename,
                indexname,
                indexdef,
                pg_size_pretty(pg_relation_size(indexrelid)) as size
            FROM pg_stat_user_indexes
            JOIN pg_class ON pg_class.oid = indexrelid
            WHERE idx_scan = 0
            AND indexdef NOT LIKE '%_pkey%'
            ORDER BY schemaname, tablename, indexname
        """))
        unused = []
        for row in result:
            unused.append({
                'schema': row[0],
                'table': row[1],
                'name': row[2],
                'definition': row[3],
                'size': row[4]
            })
        return unused

    def _get_missing_indexes(self, db: Session) -> List[Dict[str, Any]]:
        """Heuristic candidates for new indexes from pg_stats.

        Simplified: high-cardinality, low-correlation columns. A real
        implementation would analyze query patterns from pg_stat_statements.
        """
        result = db.execute(text("""
            SELECT
                schemaname,
                tablename,
                attname,
                n_distinct,
                correlation
            FROM pg_stats
            WHERE schemaname = 'public'
            AND n_distinct > 100
            AND correlation < 0.1
            ORDER BY n_distinct DESC
            LIMIT 20
        """))
        missing = []
        for row in result:
            missing.append({
                'schema': row[0],
                'table': row[1],
                'column': row[2],
                'distinct_values': row[3],
                'correlation': row[4],
                'recommendation': f"Consider index on {row[1]}({row[2]})"
            })
        return missing

    def _get_index_sizes(self, db: Session) -> Dict[str, Any]:
        """Total index size per schema."""
        result = db.execute(text("""
            SELECT
                schemaname,
                SUM(pg_relation_size(indexrelid)) as total_size,
                pg_size_pretty(SUM(pg_relation_size(indexrelid))) as total_size_pretty
            FROM pg_stat_user_indexes
            JOIN pg_class ON pg_class.oid = indexrelid
            GROUP BY schemaname
        """))
        sizes = {}
        for row in result:
            sizes[row[0]] = {
                'total_size_bytes': row[1],
                'total_size_pretty': row[2]
            }
        return sizes

    def create_recommended_indexes(self, db: Session, recommendations: List[Dict[str, Any]]) -> bool:
        """Create recommended indexes; True on success, False (rolled back)
        on any failure.

        Fix: commit once after the whole batch so the except-branch rollback
        actually undoes everything (the old per-iteration commit left partial
        work behind on error).
        """
        try:
            created = []
            for rec in recommendations:
                if 'create_sql' in rec:
                    db.execute(text(rec['create_sql']))
                    created.append(rec)
            db.commit()
            for rec in created:
                self.logger.info(f"Created index: {rec['name']}")
            return True
        except Exception as e:
            self.logger.error(f"Error creating indexes: {e}")
            db.rollback()
            return False

    def drop_unused_indexes(self, db: Session, indexes: List[Dict[str, Any]]) -> bool:
        """Drop the given unused indexes; True on success, False (rolled
        back) on any failure.

        Fixes: identifiers are double-quoted so mixed-case or unusual names
        survive; a single commit after the loop keeps the batch atomic.
        """
        try:
            for index in indexes:
                drop_sql = f'DROP INDEX "{index["schema"]}"."{index["name"]}"'
                db.execute(text(drop_sql))
            db.commit()
            for index in indexes:
                self.logger.info(f"Dropped unused index: {index['name']}")
            return True
        except Exception as e:
            self.logger.error(f"Error dropping indexes: {e}")
            db.rollback()
            return False
# app/optimization/cache_manager.py
import redis
import json
import pickle
from typing import Any, Optional, Union
from datetime import datetime, timedelta
import hashlib
import logging
class CacheManager:
    """Redis-backed cache with pickle serialization plus helpers for device
    telemetry and API-response caching.

    SECURITY NOTE: values are serialized with pickle; unpickling attacker-
    controlled bytes executes arbitrary code. Only use this cache against a
    trusted Redis instance that untrusted parties cannot write to.
    """

    def __init__(self, redis_url: str):
        # decode_responses=False because stored values are binary pickles.
        self.redis_client = redis.from_url(redis_url, decode_responses=False)
        self.default_ttl = 3600  # seconds (1 hour)
        self.logger = logging.getLogger(__name__)

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or None on miss or error."""
        try:
            cached_data = self.redis_client.get(key)
            # Fix: explicit None check — a falsy payload is still a hit.
            if cached_data is not None:
                return pickle.loads(cached_data)
            return None
        except Exception as e:
            self.logger.error(f"Cache get error for key {key}: {e}")
            return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Store *value* under *key* with a TTL (default self.default_ttl)."""
        try:
            ttl = ttl or self.default_ttl
            serialized_value = pickle.dumps(value)
            return self.redis_client.setex(key, ttl, serialized_value)
        except Exception as e:
            self.logger.error(f"Cache set error for key {key}: {e}")
            return False

    def delete(self, key: str) -> bool:
        """Delete *key*; True if a key was removed."""
        try:
            return bool(self.redis_client.delete(key))
        except Exception as e:
            self.logger.error(f"Cache delete error for key {key}: {e}")
            return False

    def clear_pattern(self, pattern: str) -> int:
        """Delete all keys matching *pattern*; returns the number deleted.

        Fix: uses SCAN (scan_iter) in batches instead of KEYS — KEYS blocks
        the Redis server while it walks the entire keyspace.
        """
        try:
            deleted = 0
            batch = []
            for key in self.redis_client.scan_iter(match=pattern, count=500):
                batch.append(key)
                if len(batch) >= 500:
                    deleted += self.redis_client.delete(*batch)
                    batch = []
            if batch:
                deleted += self.redis_client.delete(*batch)
            return deleted
        except Exception as e:
            self.logger.error(f"Cache clear pattern error for pattern {pattern}: {e}")
            return 0

    def get_or_set(self, key: str, func, ttl: Optional[int] = None, *args, **kwargs) -> Any:
        """Return the cached value for *key*, computing and caching it with
        func(*args, **kwargs) on a miss.

        NOTE: None is used as the miss sentinel, so a None result from *func*
        is recomputed on every call.
        """
        value = self.get(key)
        if value is not None:
            return value
        # Compute value
        value = func(*args, **kwargs)
        # Set in cache
        self.set(key, value, ttl)
        return value

    def cache_device_telemetry(self, device_id: int, telemetry_data: Dict[str, Any]) -> bool:
        """Cache device telemetry data for 5 minutes."""
        key = f"telemetry:device:{device_id}"
        return self.set(key, telemetry_data, ttl=300)  # 5 minutes

    def get_device_telemetry(self, device_id: int) -> Optional[Dict[str, Any]]:
        """Get cached device telemetry data (None on miss)."""
        key = f"telemetry:device:{device_id}"
        return self.get(key)

    def cache_api_response(self, endpoint: str, params: Dict[str, Any], response: Any, ttl: int = 300) -> bool:
        """Cache an API response keyed by endpoint + hashed sorted params."""
        # md5 is used as a cache-key digest only, not for security.
        params_str = json.dumps(params, sort_keys=True)
        key_hash = hashlib.md5(f"{endpoint}:{params_str}".encode()).hexdigest()
        key = f"api:{endpoint}:{key_hash}"
        return self.set(key, response, ttl)

    def get_cached_api_response(self, endpoint: str, params: Dict[str, Any]) -> Optional[Any]:
        """Get a cached API response (key built as in cache_api_response)."""
        params_str = json.dumps(params, sort_keys=True)
        key_hash = hashlib.md5(f"{endpoint}:{params_str}".encode()).hexdigest()
        key = f"api:{endpoint}:{key_hash}"
        return self.get(key)

    def invalidate_device_cache(self, device_id: int) -> bool:
        """Invalidate all cache entries for a device; True if any deleted."""
        patterns = [
            f"telemetry:device:{device_id}",
            f"device:{device_id}:*",
            f"api:*:*{device_id}*"
        ]
        total_deleted = 0
        for pattern in patterns:
            total_deleted += self.clear_pattern(pattern)
        return total_deleted > 0

    def get_cache_stats(self) -> Dict[str, Any]:
        """Redis server statistics including the keyspace hit rate."""
        try:
            info = self.redis_client.info()
            return {
                'connected_clients': info['connected_clients'],
                'used_memory': info['used_memory'],
                'used_memory_human': info['used_memory_human'],
                'used_memory_peak': info['used_memory_peak'],
                'used_memory_peak_human': info['used_memory_peak_human'],
                'total_commands_processed': info['total_commands_processed'],
                'keyspace_hits': info['keyspace_hits'],
                'keyspace_misses': info['keyspace_misses'],
                'hit_rate': info['keyspace_hits'] / (info['keyspace_hits'] + info['keyspace_misses']) if (info['keyspace_hits'] + info['keyspace_misses']) > 0 else 0
            }
        except Exception as e:
            self.logger.error(f"Error getting cache stats: {e}")
            return {}
# app/optimization/connection_pool.py
import asyncio
from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from typing import Optional
import logging
class DatabaseConnectionPool:
    """SQLAlchemy engine wrapper with an explicitly configured QueuePool and
    helpers for inspecting and load-testing the pool."""

    def __init__(self, database_url: str, pool_size: int = 20, max_overflow: int = 30):
        self.engine = create_engine(
            database_url,
            poolclass=QueuePool,
            pool_size=pool_size,
            max_overflow=max_overflow,
            pool_pre_ping=True,   # validate connections before handing out
            pool_recycle=3600,    # recycle connections after an hour
            echo=False
        )
        self.SessionLocal = sessionmaker(bind=self.engine)
        self.logger = logging.getLogger(__name__)

    def get_session(self):
        """Return a new Session checked out of the pool (caller must close)."""
        return self.SessionLocal()

    def get_pool_status(self) -> Dict[str, Any]:
        """Current QueuePool counters."""
        pool = self.engine.pool
        return {
            'size': pool.size(),
            'checked_in': pool.checkedin(),
            'checked_out': pool.checkedout(),
            'overflow': pool.overflow(),
            'invalid': pool.invalid()
        }

    async def test_pool_performance(self, concurrent_connections: int = 50) -> Dict[str, Any]:
        """Fire *concurrent_connections* trivial queries in parallel threads
        and report timing/throughput plus the resulting pool status.

        Fix: the old implementation called future.result() synchronously
        inside this coroutine, blocking the event loop for the entire test;
        the worker threads are now awaited via run_in_executor + gather.
        Timing uses the monotonic perf_counter clock.
        """
        import time
        import concurrent.futures

        def query_database():
            # One checkout/checkin cycle plus a trivial round-trip.
            session = self.get_session()
            try:
                result = session.execute(text("SELECT 1"))
                return result.fetchone()[0]
            finally:
                session.close()

        loop = asyncio.get_running_loop()
        start_time = time.perf_counter()
        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_connections) as executor:
            tasks = [
                loop.run_in_executor(executor, query_database)
                for _ in range(concurrent_connections)
            ]
            results = await asyncio.gather(*tasks)
        elapsed = time.perf_counter() - start_time
        return {
            'concurrent_connections': concurrent_connections,
            'total_time': elapsed,
            'avg_time_per_query': elapsed / concurrent_connections,
            'queries_per_second': concurrent_connections / elapsed,
            'success_rate': len(results) / concurrent_connections,
            'pool_status': self.get_pool_status()
        }
# docker-compose.loadbalancer.yml
# Load-balanced deployment: nginx terminating TLS in front of three
# identical application replicas, backed by shared Postgres and Redis.
version: '3.8'

services:
  # Reverse proxy / load balancer; config and certs are bind-mounted.
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - app1
      - app2
      - app3
    networks:
      - valtronics-network

  # Three identical app replicas; nginx balances across them.
  # NOTE(review): DB credentials are hard-coded in plain text here —
  # presumably placeholders; move them to an env file or secrets before
  # any real deployment.
  app1:
    build: ./backend
    environment:
      - DATABASE_URL=postgresql://user:password@postgres:5432/valtronics
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - postgres
      - redis
    networks:
      - valtronics-network

  app2:
    build: ./backend
    environment:
      - DATABASE_URL=postgresql://user:password@postgres:5432/valtronics
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - postgres
      - redis
    networks:
      - valtronics-network

  app3:
    build: ./backend
    environment:
      - DATABASE_URL=postgresql://user:password@postgres:5432/valtronics
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - postgres
      - redis
    networks:
      - valtronics-network

  # Shared primary database.
  postgres:
    image: postgres:14
    environment:
      POSTGRES_DB: valtronics
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - valtronics-network

  # Shared cache / session store.
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    networks:
      - valtronics-network

volumes:
  postgres_data:
  redis_data:

networks:
  valtronics-network:
    driver: bridge
# nginx.conf
# Upstream pool: least-connections balancing across the three app replicas,
# with passive health checking (3 failures -> 30s ejection).
upstream valtronics_backend {
    least_conn;
    server app1:8000 max_fails=3 fail_timeout=30s;
    server app2:8000 max_fails=3 fail_timeout=30s;
    server app3:8000 max_fails=3 fail_timeout=30s;
}

# Rate-limit zones. Fix: limit_req_zone is only valid in the http context;
# it was previously declared inside the server block, which nginx rejects
# at configuration load.
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
limit_req_zone $binary_remote_addr zone=login:10m rate=1r/s;

# Redirect all plain-HTTP traffic to HTTPS.
server {
    listen 80;
    server_name valtronics.com;
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name valtronics.com;

    ssl_certificate /etc/nginx/ssl/valtronics.crt;
    ssl_certificate_key /etc/nginx/ssl/valtronics.key;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-RSA-AES256-GCM-SHA512:DHE-RSA-AES256-GCM-SHA512;
    ssl_prefer_server_ciphers off;

    # Main application
    location / {
        limit_req zone=api burst=20 nodelay;
        proxy_pass http://valtronics_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        # Timeouts
        proxy_connect_timeout 30s;
        proxy_send_timeout 30s;
        proxy_read_timeout 30s;
    }

    # API endpoints (higher burst allowance than the default location)
    location /api/ {
        limit_req zone=api burst=50 nodelay;
        proxy_pass http://valtronics_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    # Login endpoint - stricter rate limiting
    location /api/v1/auth/login {
        limit_req zone=login burst=5 nodelay;
        proxy_pass http://valtronics_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    # WebSocket connections (Upgrade handshake + long read/send timeouts)
    location /ws {
        proxy_pass http://valtronics_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        # WebSocket specific
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
    }

    # Static files, served directly with long-lived caching
    location /static/ {
        alias /var/www/static/;
        expires 1y;
        add_header Cache-Control "public, immutable";
    }

    # Health check (not logged)
    location /health {
        access_log off;
        return 200 "healthy\n";
        add_header Content-Type text/plain;
    }
}
# app/db/read_replicas.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool
from typing import List, Optional
import random
import logging
class DatabaseReplicaManager:
    """Routes writes to the primary database and reads to randomly chosen
    replicas, with health checking and a crude promote-first-replica
    failover."""

    def __init__(self, primary_url: str, replica_urls: List[str]):
        self.primary_url = primary_url
        self.replica_urls = replica_urls
        # Create primary engine
        self.primary_engine = create_engine(
            primary_url,
            poolclass=QueuePool,
            pool_size=20,
            max_overflow=30,
            pool_pre_ping=True,   # validate connections before use
            pool_recycle=3600     # recycle connections hourly
        )
        # Create replica engines
        self.replica_engines = []
        for url in replica_urls:
            engine = create_engine(
                url,
                poolclass=QueuePool,
                pool_size=10,
                max_overflow=20,
                pool_pre_ping=True,
                pool_recycle=3600
            )
            self.replica_engines.append(engine)
        self.SessionLocal = sessionmaker(bind=self.primary_engine)
        # Fix: pre-build one sessionmaker per replica instead of constructing
        # a new sessionmaker on every read call. (The old single
        # ReplicaSessionLocal was also pinned to one random replica, which
        # defeated read load-balancing; it is kept below for compatibility.)
        self._replica_sessionmakers = [
            sessionmaker(bind=engine) for engine in self.replica_engines
        ]
        self.ReplicaSessionLocal = (
            random.choice(self._replica_sessionmakers)
            if self._replica_sessionmakers else self.SessionLocal
        )
        self.logger = logging.getLogger(__name__)

    def get_random_replica_engine(self):
        """Random replica engine (primary if there are no replicas)."""
        if not self.replica_engines:
            return self.primary_engine
        return random.choice(self.replica_engines)

    def get_read_session(self):
        """Session bound to a random replica (primary if no replicas)."""
        if not self._replica_sessionmakers:
            return self.SessionLocal()
        return random.choice(self._replica_sessionmakers)()

    def get_write_session(self):
        """Session bound to the primary for write operations."""
        return self.SessionLocal()

    def get_session(self, read_only: bool = False):
        """Get an appropriate session: replicas for reads when available,
        primary otherwise."""
        if read_only and self.replica_engines:
            return self.get_read_session()
        else:
            return self.get_write_session()

    def check_replica_health(self) -> Dict[str, Any]:
        """Health status of the primary and every replica."""
        health_status = {
            'primary': self._check_engine_health(self.primary_engine),
            'replicas': []
        }
        for i, engine in enumerate(self.replica_engines):
            health = self._check_engine_health(engine)
            health['replica_index'] = i
            health_status['replicas'].append(health)
        return health_status

    def _check_engine_health(self, engine) -> Dict[str, Any]:
        """Run SELECT 1 against *engine* and report healthy/unhealthy."""
        try:
            with engine.connect() as conn:
                result = conn.execute(text("SELECT 1"))
                result.fetchone()
            return {
                'status': 'healthy',
                'error': None
            }
        except Exception as e:
            return {
                'status': 'unhealthy',
                'error': str(e)
            }

    def failover_to_replica(self):
        """Promote the first replica to primary and demote the old primary
        to the end of the replica list.

        Fix: the old engines are now disposed after reconfiguration; the
        previous implementation re-ran __init__ and silently leaked every
        pooled connection of the replaced engines.
        """
        if not self.replica_engines:
            raise Exception("No replicas available for failover")
        old_engines = [self.primary_engine, *self.replica_engines]
        # Promote first replica to primary
        new_primary_url = self.replica_urls[0]
        new_replica_urls = self.replica_urls[1:] + [self.primary_url]
        # Reconfigure
        self.__init__(new_primary_url, new_replica_urls)
        # Release the replaced engines' pooled connections.
        for engine in old_engines:
            try:
                engine.dispose()
            except Exception:
                self.logger.exception("Error disposing engine during failover")
        self.logger.warning("Failed over to replica")
# app/db/sharding.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from typing import Dict, Any, Optional
import hashlib
import logging
class DatabaseShardingManager:
    """Hash-based sharding of device data across multiple databases.

    shard_configs maps shard name -> database URL. Devices are assigned to
    shards by md5(device_id) modulo the shard count, so the mapping is
    stable for a fixed shard set (md5 is used as a plain hash, not for
    security).

    NOTE(review): this module references QueuePool without importing it in
    its own import list — it resolves only because an earlier section of
    this file imports it; confirm the real module imports QueuePool.
    """

    def __init__(self, shard_configs: Dict[str, str]):
        # Fix: fail fast on an empty configuration instead of a confusing
        # ZeroDivisionError at first get_shard_key() call.
        if not shard_configs:
            raise ValueError("shard_configs must contain at least one shard")
        self.shard_configs = shard_configs
        self.shard_engines = {}
        self.shard_sessions = {}
        # Create engines for each shard
        for shard_name, database_url in shard_configs.items():
            engine = create_engine(
                database_url,
                poolclass=QueuePool,
                pool_size=10,
                max_overflow=20,
                pool_pre_ping=True,
                pool_recycle=3600
            )
            self.shard_engines[shard_name] = engine
            self.shard_sessions[shard_name] = sessionmaker(bind=engine)
        self.logger = logging.getLogger(__name__)

    def get_shard_key(self, device_id: int) -> str:
        """Return the shard name responsible for *device_id*."""
        shard_count = len(self.shard_engines)
        if shard_count == 0:
            raise ValueError("No shards configured")
        # Simple hash-based sharding; stable for a fixed shard set.
        hash_value = int(hashlib.md5(str(device_id).encode()).hexdigest(), 16)
        shard_index = hash_value % shard_count
        shard_names = list(self.shard_engines.keys())
        return shard_names[shard_index]

    def get_shard_session(self, shard_key: str):
        """Return a new session for the named shard."""
        if shard_key not in self.shard_sessions:
            raise ValueError(f"Shard {shard_key} not found")
        return self.shard_sessions[shard_key]()

    def get_session_for_device(self, device_id: int):
        """Return a session for the shard that owns *device_id*."""
        shard_key = self.get_shard_key(device_id)
        return self.get_shard_session(shard_key)

    def execute_query_on_all_shards(self, query: str) -> Dict[str, Any]:
        """Execute *query* on every shard; failed shards yield [] and are
        logged rather than aborting the fan-out."""
        results = {}
        for shard_name, engine in self.shard_engines.items():
            try:
                with engine.connect() as conn:
                    result = conn.execute(text(query))
                    results[shard_name] = result.fetchall()
            except Exception as e:
                self.logger.error(f"Error executing query on shard {shard_name}: {e}")
                results[shard_name] = []
        return results

    def get_shard_stats(self) -> Dict[str, Any]:
        """Device count, database size and health per shard."""
        stats = {}
        for shard_name, engine in self.shard_engines.items():
            try:
                with engine.connect() as conn:
                    result = conn.execute(text("""
                        SELECT
                            COUNT(*) as device_count,
                            pg_size_pretty(pg_database_size(current_database())) as size
                        FROM devices
                    """))
                    row = result.fetchone()
                    stats[shard_name] = {
                        'device_count': row[0],
                        'size': row[1],
                        'status': 'healthy'
                    }
            except Exception as e:
                stats[shard_name] = {
                    'device_count': 0,
                    'size': 'unknown',
                    'status': 'unhealthy',
                    'error': str(e)
                }
        return stats

    def rebalance_shards(self) -> bool:
        """Placeholder for shard rebalancing.

        A real implementation would move data between shards, update the
        sharding logic, and guarantee consistency during the migration.
        """
        self.logger.info("Shard rebalancing initiated")
        # Implementation would depend on specific requirements
        return True
# k8s/hpa.yaml
# Horizontal Pod Autoscaler for the Valtronics API deployment.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: valtronics-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: valtronics-api
  minReplicas: 3    # keep at least 3 pods for availability
  maxReplicas: 20
  metrics:
    # Scale when average CPU utilization exceeds 70%.
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    # Scale when average memory utilization exceeds 80%.
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    # Custom per-pod metric: target 100 HTTP requests/second per pod.
    - type: Pods
      pods:
        metric:
          name: http_requests_per_second
        target:
          type: AverageValue
          averageValue: "100"
  behavior:
    # Scale down conservatively: at most 10% of pods per minute, after a
    # 5-minute stabilization window, to avoid flapping.
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
    # Scale up aggressively: double the pods or add 4 every 15s,
    # whichever adds more (selectPolicy: Max).
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
        - type: Pods
          value: 4
          periodSeconds: 15
      selectPolicy: Max
# app/monitoring/custom_metrics.py
import time
import psutil
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from typing import Dict, Any
import logging
class CustomMetricsCollector:
    """Defines and updates the application's Prometheus metrics.

    Holds counters/histograms/gauges for API traffic, device state,
    telemetry throughput, and host resources, and can export them in the
    Prometheus text exposition format.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # --- API traffic metrics ---
        self.api_requests_total = Counter(
            'valtronics_api_requests_total',
            'Total number of API requests',
            ['method', 'endpoint', 'status']
        )
        self.api_request_duration = Histogram(
            'valtronics_api_request_duration_seconds',
            'API request duration in seconds',
            ['method', 'endpoint']
        )
        self.active_connections = Gauge(
            'valtronics_active_connections',
            'Number of active connections'
        )
        # --- Device / telemetry metrics ---
        self.devices_total = Gauge(
            'valtronics_devices_total',
            'Total number of devices',
            ['status']
        )
        self.telemetry_points_total = Counter(
            'valtronics_telemetry_points_total',
            'Total number of telemetry points',
            ['device_type']
        )
        # --- Host resource gauges ---
        self.system_cpu_usage = Gauge(
            'valtronics_system_cpu_usage_percent',
            'System CPU usage percentage'
        )
        self.system_memory_usage = Gauge(
            'valtronics_system_memory_usage_percent',
            'System memory usage percentage'
        )

    def record_api_request(self, method: str, endpoint: str, status: int, duration: float):
        """Record one API request: count by method/endpoint/status and
        observe its latency (seconds)."""
        self.api_requests_total.labels(method=method, endpoint=endpoint, status=status).inc()
        self.api_request_duration.labels(method=method, endpoint=endpoint).observe(duration)

    def update_active_connections(self, count: int):
        """Set the active-connections gauge to `count`."""
        self.active_connections.set(count)

    def update_device_metrics(self, device_stats: Dict[str, Any]):
        """Replace per-status device counts with `device_stats`.

        Known statuses are zeroed first so any status absent from
        `device_stats` does not keep a stale value.
        """
        for status in ('online', 'offline', 'error', 'warning'):
            self.devices_total.labels(status=status).set(0)
        for status, count in device_stats.items():
            self.devices_total.labels(status=status).set(count)

    def record_telemetry_point(self, device_type: str):
        """Count one received telemetry point for `device_type`."""
        self.telemetry_points_total.labels(device_type=device_type).inc()

    def update_system_metrics(self):
        """Refresh the CPU and memory gauges from psutil."""
        self.system_cpu_usage.set(psutil.cpu_percent())
        self.system_memory_usage.set(psutil.virtual_memory().percent)

    def get_metrics_export(self) -> str:
        """Return all metrics in Prometheus text exposition format.

        BUG FIX: generate_latest() returns bytes; decode it so the value
        actually matches the declared -> str return type.
        """
        return generate_latest().decode('utf-8')

    def start_metrics_collection(self, interval: int = 30):
        """Start a daemon thread refreshing system metrics every
        `interval` seconds; errors are logged and collection continues."""
        import threading

        def collect_metrics():
            while True:
                try:
                    self.update_system_metrics()
                    time.sleep(interval)
                except Exception as e:
                    self.logger.error(f"Error collecting metrics: {e}")
                    time.sleep(interval)  # back off before retrying

        thread = threading.Thread(target=collect_metrics, daemon=True)
        thread.start()
        self.logger.info(f"Started metrics collection with {interval}s interval")
# app/testing/load_test.py
import asyncio
import aiohttp
import time
import statistics
from typing import List, Dict, Any
import logging
class LoadTester:
    """Async HTTP load-testing harness: load, stress, and endurance tests."""

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.logger = logging.getLogger(__name__)

    async def run_load_test(self, endpoint: str, concurrent_users: int,
                            duration: int, requests_per_second: int) -> Dict[str, Any]:
        """Run load test for specified endpoint.

        Issues duration * requests_per_second GET requests, paced to the
        target rate with at most `concurrent_users` in flight, and
        returns counts, latency statistics (ms), and error details.
        """
        self.logger.info(f"Starting load test: {concurrent_users} users, {duration}s, {requests_per_second} rps")
        results = {
            'endpoint': endpoint,
            'concurrent_users': concurrent_users,
            'duration': duration,
            'requests_per_second': requests_per_second,
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'response_times': [],
            'errors': []
        }
        # Cap the number of simultaneous in-flight requests.
        semaphore = asyncio.Semaphore(concurrent_users)

        async def make_request(session, url):
            """Issue one GET and record its outcome into `results`."""
            async with semaphore:
                start_time = time.time()
                try:
                    async with session.get(url) as response:
                        response_time = (time.time() - start_time) * 1000  # ms
                        results['response_times'].append(response_time)
                        if response.status == 200:
                            results['successful_requests'] += 1
                        else:
                            results['failed_requests'] += 1
                            results['errors'].append({
                                'status': response.status,
                                'message': await response.text()
                            })
                        results['total_requests'] += 1
                except Exception as e:
                    # Connection-level failure: still record the latency.
                    response_time = (time.time() - start_time) * 1000
                    results['response_times'].append(response_time)
                    results['failed_requests'] += 1
                    results['errors'].append({
                        'error': str(e)
                    })
                    results['total_requests'] += 1

        total_requests = duration * requests_per_second
        tasks = []
        request_interval = 1.0 / requests_per_second
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            for i in range(total_requests):
                tasks.append(asyncio.create_task(
                    make_request(session, f"{self.base_url}{endpoint}")))
                # Pace task creation to hold the requested rate.
                if i < total_requests - 1:
                    await asyncio.sleep(request_interval)
            await asyncio.gather(*tasks)
            actual_duration = time.time() - start_time

        # Summary statistics over collected response times.
        times = results['response_times']
        if len(times) >= 2:
            # BUG FIX: the original used statistics.quantiles(times, n=20)[19],
            # which always raises IndexError — n=20 yields only 19 cut points
            # (indices 0-18). Use n=100 so p95/p99 are well-defined.
            cuts = statistics.quantiles(times, n=100)
            results['avg_response_time'] = statistics.mean(times)
            results['min_response_time'] = min(times)
            results['max_response_time'] = max(times)
            results['p95_response_time'] = cuts[94]  # 95th percentile
            results['p99_response_time'] = cuts[98]  # 99th percentile
        elif times:
            # Single sample: quantiles() needs >= 2 points; every statistic
            # collapses to the lone value.
            only = times[0]
            results['avg_response_time'] = only
            results['min_response_time'] = only
            results['max_response_time'] = only
            results['p95_response_time'] = only
            results['p99_response_time'] = only
        else:
            results['avg_response_time'] = 0
            results['min_response_time'] = 0
            results['max_response_time'] = 0
            results['p95_response_time'] = 0
            results['p99_response_time'] = 0
        results['actual_duration'] = actual_duration
        results['actual_rps'] = results['total_requests'] / actual_duration
        results['success_rate'] = results['successful_requests'] / results['total_requests'] if results['total_requests'] > 0 else 0
        return results

    async def run_stress_test(self, endpoint: str, max_users: int, step: int = 10) -> Dict[str, Any]:
        """Ramp concurrency in `step` increments up to `max_users`,
        stopping early once the success rate drops below 95%."""
        stress_results = {
            'endpoint': endpoint,
            'max_users': max_users,
            'step': step,
            'results': []
        }
        for users in range(step, max_users + 1, step):
            self.logger.info(f"Testing with {users} concurrent users")
            result = await self.run_load_test(
                endpoint=endpoint,
                concurrent_users=users,
                duration=30,  # 30 seconds per step
                requests_per_second=users * 2  # 2 requests per user per second
            )
            stress_results['results'].append({
                'users': users,
                'avg_response_time': result['avg_response_time'],
                'success_rate': result['success_rate'],
                'actual_rps': result['actual_rps'],
                'errors': result['errors']
            })
            # Stop ramping once the system is clearly saturated.
            if result['success_rate'] < 0.95:
                self.logger.warning(f"Success rate dropped below 95% at {users} users")
                break
        return stress_results

    async def run_endurance_test(self, endpoint: str, users: int, duration: int = 3600) -> Dict[str, Any]:
        """Sustain `users` concurrent users for `duration` seconds,
        sampling performance every 60s and flagging degradation."""
        self.logger.info(f"Starting endurance test: {users} users for {duration}s")
        endurance_results = {
            'endpoint': endpoint,
            'users': users,
            'duration': duration,
            'samples': []
        }
        sample_interval = 60  # seconds of load per sample
        samples_count = duration // sample_interval
        for i in range(samples_count):
            start_time = time.time()
            result = await self.run_load_test(
                endpoint=endpoint,
                concurrent_users=users,
                duration=sample_interval,
                requests_per_second=users * 2
            )
            endurance_results['samples'].append({
                'timestamp': start_time,
                'avg_response_time': result['avg_response_time'],
                'success_rate': result['success_rate'],
                'actual_rps': result['actual_rps'],
                'errors': result['errors']
            })
            # Flag a 50%+ jump in average latency versus the previous sample.
            if i > 0:
                prev_sample = endurance_results['samples'][i - 1]
                if result['avg_response_time'] > prev_sample['avg_response_time'] * 1.5:
                    self.logger.warning(f"Performance degradation detected at sample {i}")
        return endurance_results

# - Implement comprehensive monitoring at all levels
- Set up alerts for performance thresholds
- Use APM tools for application performance
- Monitor database performance metrics
- Track system resource utilization
- Use connection pooling
- Implement proper indexing strategy
- Optimize slow queries
- Use read replicas for scaling reads
- Implement caching strategies
- Use caching for frequently accessed data
- Implement connection pooling
- Optimize algorithms and data structures
- Use asynchronous operations where appropriate
- Implement proper error handling
- Use horizontal scaling for stateless components
- Implement database sharding for large datasets
- Use load balancing for high availability
- Implement auto-scaling based on metrics
- Plan for capacity growth
For performance and scaling support:
- Documentation: System Architecture
- Database: Database Overview
- API: API Overview
- Troubleshooting: Troubleshooting Guide
- Email: autobotsolution@gmail.com
© 2024 Software Customs Auto Bot Solution. All Rights Reserved.
Performance and Scaling Guide v1.0