Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions agentex/src/config/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,13 @@ class EnvironmentVariables(BaseModel):
MONGODB_DATABASE_NAME: str | None = "agentex"
MONGODB_MAX_POOL_SIZE: int = 50
MONGODB_MIN_POOL_SIZE: int = 5
REDIS_MAX_CONNECTIONS: int = 50 # Increased for SSE streaming
# SSE streaming currently holds one blocking XREAD connection per connected
# client, so the pool needs headroom for peak concurrent streams per pod.
# NOTE: this is only the in-code default — deployed environments override it
# via the REDIS_MAX_CONNECTIONS env var, which is the real cap. Bumping this
# buys headroom but does NOT change the 1-connection-per-client scaling; the
# durable fix is a shared per-pod reader that fans out to in-process queues.
REDIS_MAX_CONNECTIONS: int = 200
REDIS_CONNECTION_TIMEOUT: int = 60 # Connection timeout in seconds
REDIS_SOCKET_TIMEOUT: int = 30 # Socket timeout in seconds
REDIS_STREAM_MAXLEN: int = (
Expand Down Expand Up @@ -193,7 +199,7 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
os.environ.get(EnvVarKeys.MONGODB_MIN_POOL_SIZE, "5")
),
REDIS_MAX_CONNECTIONS=int(
os.environ.get(EnvVarKeys.REDIS_MAX_CONNECTIONS, "100")
os.environ.get(EnvVarKeys.REDIS_MAX_CONNECTIONS, "200")
),
REDIS_CONNECTION_TIMEOUT=int(
os.environ.get(EnvVarKeys.REDIS_CONNECTION_TIMEOUT, "20")
Expand Down
27 changes: 24 additions & 3 deletions agentex/src/domain/use_cases/streams_use_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ async def stream_task_events(
ping_interval = float(
self.environment_variables.SSE_KEEPALIVE_PING_INTERVAL
) # Configurable keepalive ping interval
# Track consecutive read failures so we can back off and avoid a
# tight error loop. When the Redis pool is exhausted, every connected
# client's read fails on each cycle; without backoff this turns into a
# log-ingestion firehose (one failure per client per cycle, ~once/sec).
consecutive_errors = 0
try:
# Application-level control loop
while True:
Expand All @@ -133,6 +138,10 @@ async def stream_task_events(
last_message_time = asyncio.get_running_loop().time()
await asyncio.sleep(0.02)

# A read cycle completed without raising — the stream is
# healthy again, so reset the backoff/error counter.
consecutive_errors = 0

# If we didn't get any messages, add a small pause
# to prevent tight loops and send keepalive ping if needed
if message_count == 0:
Expand All @@ -151,13 +160,25 @@ async def stream_task_events(
)
raise
except Exception as e:
consecutive_errors += 1
# Always log the full traceback — nothing is swallowed.
# Volume is controlled two ways instead of by dropping
# diagnostics: structured JSON logging keeps each traceback
# to a single log entry (see utils.logging), and the
# exponential backoff below caps how often a sustained
# failure can repeat. The failure counter gives context on
# how long a stream has been erroring.
logger.error(
f"Error processing events for task {task_id}: {e}",
f"Error processing events for task {task_id} "
f"(failure #{consecutive_errors}): {e}",
exc_info=True,
)
yield f"data: {TaskStreamErrorEventEntity(type='error', message=str(e)).model_dump_json()}\n\n"
# Add a small delay before continuing
await asyncio.sleep(1)
# Exponential backoff (capped) so a sustained failure (e.g.
# Redis pool exhaustion) doesn't spin a tight per-client
# loop hammering Redis and flooding logs.
backoff = min(2.0 ** min(consecutive_errors - 1, 5), 30.0)
await asyncio.sleep(backoff)

except asyncio.CancelledError:
# Just exit the generator on cancellation
Expand Down
46 changes: 44 additions & 2 deletions agentex/src/utils/logging.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import contextvars
import json
import logging
import os
import re
import sys
from collections.abc import Sequence
from typing import Any

import ddtrace
import json_log_formatter
Expand All @@ -14,6 +16,17 @@
# Check if Datadog is configured
_is_datadog_configured = bool(os.environ.get("DD_AGENT_HOST"))

# Emit structured JSON logs in all deployed environments. JSON keeps a
# multi-line traceback (exc_info=True) as a single log entry — the newlines
# live inside the quoted `exc_info` field — instead of fanning out into one
# cluster-log entry per traceback line. Splitting tracebacks per line was a
# primary multiplier behind a log-ingestion spike on a plain-text-logging
# cluster. Local development keeps plain text for readable console output;
# JSON is the default everywhere else (including when ENVIRONMENT is unset)
# so a deployed cluster can never silently fall back to per-line tracebacks.
_is_local_dev = os.environ.get("ENVIRONMENT", "").lower() == "development"
_use_json_logs = not _is_local_dev

# Include Datadog trace IDs only when Datadog is configured
if _is_datadog_configured:
LOG_FORMAT: str = (
Expand Down Expand Up @@ -89,6 +102,30 @@ def filter(self, record: logging.LogRecord) -> bool:
_sensitive_data_filter = SensitiveDataFilter()


# Cap the size of individual structured fields in JSON logs. Request logging
# (LoggedAPIRoute.log_request) attaches the decoded body, headers, and
# query_params as ``extra``, and the JSON formatter serializes ``extra`` —
# unlike the plain-text formatter, which drops it. Without a cap, a single
# large request payload would create a very large per-request log entry on
# every request, reintroducing the log-volume problem this mode exists to
# avoid. ``exc_info`` is exempt: a traceback is bounded per error (not per
# request) and the full stack is worth keeping.
_MAX_JSON_FIELD_CHARS = 4096
_UNCAPPED_JSON_FIELDS = frozenset({"exc_info"})


def _truncate_log_value(value: Any) -> Any:
    """Bound the size of a single structured log field.

    The size of ``value`` is judged by its rendered length: a string is
    measured as-is, anything else by its JSON encoding (``default=str`` is
    used for objects JSON cannot encode natively), falling back to
    ``str(value)`` when JSON encoding fails.

    This runs inside the log formatter, so it must never raise: an exception
    here would cost the very log record being formatted.

    Args:
        value: A field value about to be serialized into a JSON log entry.

    Returns:
        ``value`` itself (same object, original type) when its rendered form
        is at most ``_MAX_JSON_FIELD_CHARS`` characters. Otherwise a string
        made of the first ``_MAX_JSON_FIELD_CHARS`` rendered characters
        followed by a ``...[truncated N chars]`` marker. A value that cannot
        be rendered at all is replaced by ``<unrenderable TypeName>``.
    """
    if isinstance(value, str):
        rendered = value
    else:
        try:
            rendered = json.dumps(value, default=str)
        except Exception:
            # Deliberately broad: besides TypeError (e.g. unsupported dict
            # keys) and ValueError (circular references), deep nesting raises
            # RecursionError and ``default=str`` can surface any exception
            # thrown by a custom ``__str__``.
            try:
                rendered = str(value)
            except Exception:
                # Nothing can render this value; returning it unchanged would
                # only move the failure to the final JSON serialization.
                return f"<unrenderable {type(value).__name__}>"

    if len(rendered) <= _MAX_JSON_FIELD_CHARS:
        # Small enough: hand back the original object so its type survives.
        return value

    dropped = len(rendered) - _MAX_JSON_FIELD_CHARS
    return rendered[:_MAX_JSON_FIELD_CHARS] + f"...[truncated {dropped} chars]"


class CustomJSONFormatter(json_log_formatter.JSONFormatter):
def json_record(self, message: str, extra: dict, record: logging.LogRecord) -> dict:
extra = super().json_record(message, extra, record)
Expand Down Expand Up @@ -123,7 +160,12 @@ def json_record(self, message: str, extra: dict, record: logging.LogRecord) -> d
if version_override:
extra["dd.version"] = version_override

return extra
# Bound per-field size so large request bodies/headers/query_params
# logged via `extra` can't create oversized entries on every request.
return {
k: v if k in _UNCAPPED_JSON_FIELDS else _truncate_log_value(v)
for k, v in extra.items()
}
Comment on lines +165 to +168

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Small bodies are still logged

This only caps oversized structured fields; it does not stop JSON logging from serializing request extra fields. LoggedAPIRoute.log_request still passes body, headers, and query_params, and strip_sensitive_items only removes blacklisted keys. In non-development environments, any non-blacklisted request body under 4096 characters is still emitted into production logs, which can expose request payload data and keeps per-request body logging enabled. Please drop or allowlist these structured request fields before JSON serialization rather than only truncating large values.

Rule Used: What: Never log full response bodies, request bodi... (source)

Artifacts

Repro: focused runtime harness exercising log_request and JSON formatting

  • Contains supporting evidence from the run (text/x-python; charset=utf-8).

Repro: execution output showing serialized body headers and query_params in JSON log

  • Keeps the command output available without making the summary code-heavy.

View artifacts

T-Rex Ran code and verified through T-Rex

Prompt To Fix With AI
This is a comment left during a code review.
Path: agentex/src/utils/logging.py
Line: 165-168

Comment:
**Small bodies still log**

This only caps oversized structured fields; it does not stop JSON logging from serializing request `extra` fields. `LoggedAPIRoute.log_request` still passes `body`, `headers`, and `query_params`, and `strip_sensitive_items` only removes blacklisted keys. In non-development environments, any non-blacklisted request body under 4096 characters is still emitted into production logs, which can expose request payload data and keeps per-request body logging enabled. Please drop or allowlist these structured request fields before JSON serialization rather than only truncating large values.

**Rule Used:** What: Never log full response bodies, request bodi... ([source](https://app.greptile.com/scale-ai/-/custom-context?memory=fa8d684f-4686-4f3e-b1ef-c27453f614ea))

How can I resolve this? If you propose a fix, please make it concise.

Fix in Cursor Fix in Claude Code Fix in Codex

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged — this is a fair principle, but it's pre-existing behavior rather than something this PR introduces, so handling it as a scoped follow-up:

  • log_request already logs the request body today (it interpolates the decoded body into the message string, not just extra), so request-body logging predates this change on every cluster. This PR adds a now-bounded structured copy; it doesn't turn body logging on.
  • Sensitive keys are already stripped (strip_sensitive_items: api_key/token/password/secret/cookie/authorization/acting-user), and messages also run value-level redact_sensitive_text.
  • Per-field volume is now capped at 4KB.

Fully satisfying the "never log request bodies" rule means dropping/allowlisting body+headers+query_params in log_request, which changes established request-logging behavior on all clusters (incl. Datadog) — out of scope for a log-volume mitigation. Filed as a follow-up: AGX1-405. Merging this PR (approved + green) and addressing body logging there.



def make_logger(name: str) -> logging.Logger:
Expand All @@ -134,7 +176,7 @@ def make_logger(name: str) -> logging.Logger:

logger = logging.getLogger(name)
stream_handler = logging.StreamHandler()
if _is_datadog_configured:
if _use_json_logs:
stream_handler.setFormatter(CustomJSONFormatter())
Comment thread
greptile-apps[bot] marked this conversation as resolved.
else:
stream_handler.setFormatter(logging.Formatter(LOG_FORMAT))
Expand Down
Loading