Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions openfaas-function/llm-global-spend/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def __init__(self, base_url: Optional[str] = None, api_key: Optional[str] = None
"accept": "application/json",
"x-goog-api-key": api_key,
}
logger.debug(f"LiteLLMAPIClient initialized with base URL: {self.base_url}")
logger.debug(
f"LiteLLMAPIClient initialized with base URL: {self.base_url}")

def fetch_spending_logs(
self, user_id=None, start_date="2024-06-01", end_date="2025-12-31"
Expand All @@ -51,20 +52,23 @@ def fetch_spending_logs(
requests.exceptions.RequestException: If the API request fails
"""
spending_logs_endpoint = f"{self.base_url}/spend/logs"
params = {"user_id": user_id, "start_date": start_date, "end_date": end_date}
params = {"user_id": user_id,
"start_date": start_date, "end_date": end_date}

logger.info(
f"Fetching spend logs for user {user_id} from {start_date} to {end_date}"
)
logger.debug(f"API request to {spending_logs_endpoint} with params: {params}")
logger.debug(
f"API request to {spending_logs_endpoint} with params: {params}")

try:
response = requests.get(
spending_logs_endpoint, headers=self.headers, params=params
)
response.raise_for_status()
data = response.json()
logger.success(f"Successfully fetched {len(data)} spending log entries")
logger.success(
f"Successfully fetched {len(data)} spending log entries")
return data
except requests.exceptions.RequestException as e:
logger.error(f"Failed to fetch spending logs: {str(e)}")
Expand All @@ -82,7 +86,8 @@ def __init__(self, spend_data: List[Dict]):
spend_data: List of spending records
"""
self.spend_data = spend_data
logger.debug(f"SpendAnalyzer initialized with {len(spend_data)} records")
logger.debug(
f"SpendAnalyzer initialized with {len(spend_data)} records")

def get_total_spend(self) -> float:
"""Calculate the total spend across all records"""
Expand All @@ -103,20 +108,23 @@ def get_filtered_spend(
Returns:
float: Total spend in the specified date range
"""
logger.debug(f"Calculating filtered spend from {start_date} to {end_date}")
logger.debug(
f"Calculating filtered spend from {start_date} to {end_date}")
filtered_spend = 0
valid_entries = 0
invalid_entries = 0

for entry in self.spend_data:
date_string = entry.get("startTime", "")
try:
entry_date = datetime.datetime.strptime(date_string, "%Y-%m-%d")
entry_date = datetime.datetime.strptime(
date_string, "%Y-%m-%d")
if start_date <= entry_date < end_date:
spend_amount = entry.get("spend", 0)
filtered_spend += spend_amount
valid_entries += 1
logger.debug(f"Added spend entry: {date_string} = {spend_amount}")
logger.debug(
f"Added spend entry: {date_string} = {spend_amount}")
except ValueError:
logger.warning(f"Invalid date format in entry: {date_string}")
invalid_entries += 1
Expand All @@ -131,7 +139,8 @@ def get_current_month_spend(self) -> float:
"""Calculate spend for the current month"""
now = datetime.datetime.now()
first_day = now.replace(day=1)
next_month = (now.replace(day=28) + datetime.timedelta(days=4)).replace(day=1)
next_month = (now.replace(day=28) +
datetime.timedelta(days=4)).replace(day=1)

logger.info(
f"Calculating current month spend ({first_day.strftime('%Y-%m-%d')} to {next_month.strftime('%Y-%m-%d')})"
Expand All @@ -144,7 +153,8 @@ def get_today_spend(self) -> float:
today = now.replace(hour=0, minute=0, second=0, microsecond=0)
tomorrow = today + datetime.timedelta(days=1)

logger.info(f"Calculating today's spend ({today.strftime('%Y-%m-%d')})")
logger.info(
f"Calculating today's spend ({today.strftime('%Y-%m-%d')})")
return self.get_filtered_spend(today, tomorrow)


Expand Down Expand Up @@ -192,7 +202,8 @@ def handle(event, context):
"today_spend": float(today_spend),
}

logger.success(f"Successfully processed request, returning data: {response}")
logger.success(
f"Successfully processed request, returning data: {response}")
return {
"statusCode": 200,
"body": json.dumps(response),
Expand Down
12 changes: 8 additions & 4 deletions openfaas-function/spot-start-service/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
ENDPOINTS_CONFIG = PORTAINER_ENDPOINTS_CONFIG.copy()
except ImportError as err_msg:
logger.exception(err_msg)
logger.exception("Failed to import constants.py. You may need to create it.")
logger.exception(
"Failed to import constants.py. You may need to create it.")
raise err_msg


Expand Down Expand Up @@ -121,7 +122,8 @@ def handle(event, context):
# Find the service name from the referral URL
service_name = service_manager.find_service_from_url(referral_url)
if not service_name:
logger.error(f"No service found for referral URL: {referral_url}")
logger.error(
f"No service found for referral URL: {referral_url}")
return build_response(
status_code=404,
body={
Expand All @@ -133,14 +135,16 @@ def handle(event, context):
# Start the service
result = service_manager.start_service(service_name)
status_code = 200 if result.get("success", False) else 500
logger.success(f"Request processed with status {status_code}: {result}")
logger.success(
f"Request processed with status {status_code}: {result}")
return build_response(status_code=status_code, body=result)

except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in request: {str(e)}")
return build_response(
status_code=400,
body={"error": "Invalid JSON in request", "event": format_event(event)},
body={"error": "Invalid JSON in request",
"event": format_event(event)},
)

except Exception as e:
Expand Down
3 changes: 2 additions & 1 deletion openfaas-function/spot-start-service/handler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ def test_service_manager_find_service(self):
"""Test finding service location"""
# Create manager with mock client
mock_client = MagicMock()
manager = ServiceManager(mock_client, endpoints_config=ENDPOINTS_CONFIG)
manager = ServiceManager(
mock_client, endpoints_config=ENDPOINTS_CONFIG)

# Test finding existing service
location = manager.find_service_location("service-a")
Expand Down
39 changes: 26 additions & 13 deletions openfaas-function/spot-start-service/portainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ def __init__(
self.verify_ssl = False

if not self.base_url or not self.username or not self.password:
logger.error("Portainer URL, username and password must be provided")
raise ValueError("Portainer URL, username and password must be provided")
logger.error(
"Portainer URL, username and password must be provided")
raise ValueError(
"Portainer URL, username and password must be provided")

logger.debug(f"PortainerAPIClient initialized with base URL: {self.base_url}")
logger.debug(
f"PortainerAPIClient initialized with base URL: {self.base_url}")

def authenticate(self) -> str:
"""
Expand All @@ -52,7 +55,8 @@ def authenticate(self) -> str:

if not token:
logger.error("Authentication failed: No JWT token in response")
raise ValueError("Authentication failed: No JWT token in response")
raise ValueError(
"Authentication failed: No JWT token in response")

self.jwt_token = token
logger.success("Successfully authenticated with Portainer API")
Expand Down Expand Up @@ -102,7 +106,8 @@ def get_containers(self, endpoint_id: str, docker_version: str) -> List[Dict]:
)
response.raise_for_status()
containers = response.json()
logger.success(f"Successfully fetched {len(containers)} containers")
logger.success(
f"Successfully fetched {len(containers)} containers")
return containers

except requests.exceptions.RequestException as e:
Expand Down Expand Up @@ -222,7 +227,8 @@ def find_service_location(self, service_name: str) -> Optional[Tuple[str, str]]:
)
return (endpoint_id, stack_id)

logger.warning(f"Service '{service_name}' not found in endpoints configuration")
logger.warning(
f"Service '{service_name}' not found in endpoints configuration")
return None

def check_service_health(
Expand All @@ -247,7 +253,8 @@ def check_service_health(
docker_version = endpoint_info.get("docker_version", "v1.24")

try:
containers = self.api_client.get_containers(endpoint_id, docker_version)
containers = self.api_client.get_containers(
endpoint_id, docker_version)

# Filter containers belonging to this service/stack
service_containers = []
Expand All @@ -259,7 +266,8 @@ def check_service_health(
service_containers.append(container)

if not service_containers:
logger.warning(f"No containers found for service '{service_name}'")
logger.warning(
f"No containers found for service '{service_name}'")
return (False, service_location)

# Check if all containers are healthy
Expand Down Expand Up @@ -306,7 +314,8 @@ def start_service(self, service_name: str) -> Dict[str, Any]:
endpoint_id, stack_id = location

if is_healthy:
logger.info(f"Service '{service_name}' is already running and healthy")
logger.info(
f"Service '{service_name}' is already running and healthy")
return {
"success": True,
"message": f"Service '{service_name}' is already running",
Expand All @@ -325,7 +334,8 @@ def start_service(self, service_name: str) -> Dict[str, Any]:
self.api_client.stop_stack(stack_id, endpoint_id)
time.sleep(5) # Give it some time to stop
except Exception as e:
logger.warning(f"Error stopping service (continuing anyway): {str(e)}")
logger.warning(
f"Error stopping service (continuing anyway): {str(e)}")

# Start the service
self.api_client.start_stack(stack_id, endpoint_id)
Expand All @@ -339,7 +349,8 @@ def start_service(self, service_name: str) -> Dict[str, Any]:
}

except Exception as e:
logger.exception(f"Error starting service '{service_name}': {str(e)}")
logger.exception(
f"Error starting service '{service_name}': {str(e)}")
return {"success": False, "message": f"Error starting service: {str(e)}"}

def find_service_from_url(self, referral_url: str) -> Optional[str]:
Expand Down Expand Up @@ -375,14 +386,16 @@ def find_service_from_url(self, referral_url: str) -> Optional[str]:

docker_version = endpoint_info.get("docker_version", "v1.24")
try:
containers = self.api_client.get_containers(endpoint_id, docker_version)
containers = self.api_client.get_containers(
endpoint_id, docker_version)
for container in containers:
labels: dict = container.get("Labels", {})
domain_label: str = (
labels.get("home.resolve.domain", "").lower().strip()
)
project_name: str = (
labels.get("com.docker.compose.project", "").lower().strip()
labels.get("com.docker.compose.project",
"").lower().strip()
)

# Skip if missing important labels
Expand Down
6 changes: 4 additions & 2 deletions python/delete_old_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def run(path="/path/to/downloads", exceptions=None, max_days=10):
if not any(
map(file_path.__contains__, exceptions)
) and not file_path.endswith(".!qb"):
modified_time = datetime.fromtimestamp(os.path.getmtime(file_path))
modified_time = datetime.fromtimestamp(
os.path.getmtime(file_path))
today = datetime.today()
file_age = today - modified_time
if file_age.days >= max_days:
Expand All @@ -53,7 +54,8 @@ def run(path="/path/to/downloads", exceptions=None, max_days=10):
except Exception as _err:
logging.error("Service interrupted. %s", _err)
logging.error(
"Deleted %s files of %s bytes until error.", str(del_count), str(del_size)
"Deleted %s files of %s bytes until error.", str(
del_count), str(del_size)
)


Expand Down
3 changes: 2 additions & 1 deletion python/login_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def get_key():
return response.json().get("value")
except requests.exceptions.Timeout:
logging.warning(
"[%s/%s] Timed-out so trying again.", str(count + 1), str(retries)
"[%s/%s] Timed-out so trying again.", str(
count + 1), str(retries)
)
except requests.ConnectionError as error:
logging.error(
Expand Down
3 changes: 2 additions & 1 deletion python/login_notification_ssm.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def get_key():
return response.json().get("value")
except requests.exceptions.Timeout:
logging.warning(
"[%s/%s] Timed-out so trying again.", str(count + 1), str(retries)
"[%s/%s] Timed-out so trying again.", str(
count + 1), str(retries)
)
except requests.ConnectionError as error:
logging.error(
Expand Down
3 changes: 2 additions & 1 deletion python/macro_keyboard_companion.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ def turn_off_tv():
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ssh_host, username=ssh_uname, password=ssh_pass)
# skipcq: BAN-B601
ssh.exec_command(f"echo {ssh_pass} | sudo -S systemctl stop gdm3 && exit")
ssh.exec_command(
f"echo {ssh_pass} | sudo -S systemctl stop gdm3 && exit")
ssh.close()
# skipcq: PYL-W0703
except Exception as error:
Expand Down
9 changes: 6 additions & 3 deletions python/openwebui/pipe_mcts.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ def get_chunk_content(self, chunk):
if "content" in delta:
yield delta["content"]
except json.JSONDecodeError:
logger.error('ChunkDecodeError: unable to parse "%s"', chunk_str[:100])
logger.error('ChunkDecodeError: unable to parse "%s"',
chunk_str[:100])


class Node:
Expand Down Expand Up @@ -473,7 +474,8 @@ async def evaluate_answer(self, answer: str):
score = int(re.search(r"\d+", result).group())
return score
except Exception as e:
logger.error("Failed to parse score from result: %s - %s", result, e)
logger.error(
"Failed to parse score from result: %s - %s", result, e)
return 0

async def generate_completion(self, prompt: str):
Expand Down Expand Up @@ -715,7 +717,8 @@ async def pipe(
if match:
backend, model_name = match.groups()
else:
logger.error("Model ID should be in the format '*.mcts/backend/model_name'")
logger.error(
"Model ID should be in the format '*.mcts/backend/model_name'")
logger.error("Invalid model ID: %s", model_id)
return ""

Expand Down
Loading
Loading