Skip to content

Commit 900f74b

Browse files
committed
refactor: address PR review on streaming logs
- collapse upload_logs/decline_logs config fields into a single Optional[bool] (tri-state); projection to share_logs/decline_logs happens at the setup_streaming call site. - streaming.py: replace module-level globals + atexit teardown with a StreamingLogs context manager. set_run_status disappears entirely — __exit__ infers the run status from the exception that closed the with block. set_report_run_id is now an instance method. Logger handler wiring iterates over (cli_logger, sdk_logger) instead of repeating itself. - log_uploader.py: drop _LEVEL_MAP, use logging.getLevelName directly. Wire format changes WARN/ERROR-for-CRITICAL to WARNING/CRITICAL. - log_uploader.py: tidy BatchedLogUploader.stop so the final _flush always runs and the thread shutdown only runs when there is a thread. - socketcli.py: wrap main_code body in 'with setup_streaming(...) as streaming:'; cli()'s exception handlers no longer need to set status before re-raising. - tests updated for the CM API and the new level strings.
1 parent 678518c commit 900f74b

6 files changed

Lines changed: 883 additions & 858 deletions

File tree

socketsecurity/config.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,9 @@ class CliConfig:
139139
ignore_commit_files: bool = False
140140
disable_blocking: bool = False
141141
disable_ignore: bool = False
142-
upload_logs: bool = False
143-
decline_logs: bool = False
142+
# Tri-state log-upload preference: True = --upload-logs, False = --no-upload-logs,
143+
# None = neither (server-side override decides).
144+
upload_logs: Optional[bool] = None
144145
strict_blocking: bool = False
145146
integration_type: IntegrationType = "api"
146147
integration_org_slug: Optional[str] = None
@@ -216,6 +217,9 @@ def from_args(cls, args_list: Optional[List[str]] = None) -> 'CliConfig':
216217

217218
if args.upload_logs and args.decline_logs:
218219
parser.error("--upload-logs and --no-upload-logs are mutually exclusive")
220+
upload_logs: Optional[bool] = (
221+
True if args.upload_logs else False if args.decline_logs else None
222+
)
219223

220224
if args.reach_exclude_paths:
221225
logging.warning(
@@ -287,8 +291,7 @@ def from_args(cls, args_list: Optional[List[str]] = None) -> 'CliConfig':
287291
'ignore_commit_files': args.ignore_commit_files,
288292
'disable_blocking': args.disable_blocking,
289293
'disable_ignore': args.disable_ignore,
290-
'upload_logs': args.upload_logs,
291-
'decline_logs': args.decline_logs,
294+
'upload_logs': upload_logs,
292295
'strict_blocking': args.strict_blocking,
293296
'integration_type': args.integration,
294297
'pending_head': args.pending_head,

socketsecurity/core/log_uploader.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,6 @@
2626

2727
_FLUSH_GUARD = threading.local()
2828

29-
_LEVEL_MAP = {
30-
logging.DEBUG: "DEBUG",
31-
logging.INFO: "INFO",
32-
logging.WARNING: "WARN",
33-
logging.ERROR: "ERROR",
34-
logging.CRITICAL: "ERROR",
35-
}
36-
3729

3830
def _now_str() -> str:
3931
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
@@ -69,12 +61,10 @@ def start(self) -> None:
6961
self._thread.start()
7062

7163
def stop(self, timeout: float = 2.0) -> None:
72-
if self._thread is None:
73-
self._flush()
74-
return
75-
self._stop.set()
76-
self._thread.join(timeout=timeout)
77-
self._thread = None
64+
if self._thread is not None:
65+
self._stop.set()
66+
self._thread.join(timeout=timeout)
67+
self._thread = None
7868
self._flush()
7969

8070
def _run(self) -> None:
@@ -114,7 +104,7 @@ def emit(self, record: logging.LogRecord) -> None:
114104
try:
115105
self._uploader.add({
116106
"timestamp": _now_str(),
117-
"level": _LEVEL_MAP.get(record.levelno, "INFO"),
107+
"level": logging.getLevelName(record.levelno),
118108
"message": self.format(record),
119109
"context": self._context,
120110
})

socketsecurity/core/streaming.py

Lines changed: 112 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,118 @@
1-
"""Wire the server log streaming pipeline for one CLI run.
2-
3-
`setup_streaming` registers the run with the backend, attaches handlers that
4-
route the CLI's own log output through both the local terminal and a batched
5-
uploader, and forces the loggers into DEBUG so the upload captures everything
6-
regardless of local terminal verbosity.
7-
8-
Returns a teardown callable to invoke on exit (typically via `atexit.register`).
9-
Returns None if registration failed; in that case nothing was wired up.
1+
"""Server log streaming pipeline for one CLI run.
2+
3+
`setup_streaming` returns a `StreamingLogs` context manager. On enter it
4+
registers the run with the backend, attaches handlers that route the CLI's
5+
own log output through both the local terminal and a batched uploader, and
6+
forces the loggers into DEBUG so the upload captures everything regardless
7+
of local terminal verbosity. On exit it tears the handlers back down and
8+
finalizes the run; the status sent to finalize is inferred from the
9+
exception that closed the `with` block (success / failure / cancelled).
10+
11+
If registration fails the manager becomes a no-op — nothing is wired up
12+
and __exit__ does nothing.
1013
"""
1114

1215
import logging
13-
from typing import Callable, Optional
16+
from typing import Optional
1417

1518
from .cli_client import CliClient
1619
from .cli_run import finalize_cli_run, register_cli_run
1720
from .log_uploader import BatchedLogUploader, UploadingLogHandler
1821

19-
_run_status: str = "success"
20-
_report_run_id: Optional[str] = None
21-
22-
23-
def set_run_status(status: str) -> None:
24-
global _run_status
25-
_run_status = status
26-
2722

28-
def set_report_run_id(report_run_id: Optional[str]) -> None:
29-
global _report_run_id
30-
_report_run_id = report_run_id
23+
class StreamingLogs:
24+
def __init__(
25+
self,
26+
*,
27+
client: CliClient,
28+
cli_logger: logging.Logger,
29+
sdk_logger: logging.Logger,
30+
client_version: str,
31+
share_logs: bool,
32+
decline_logs: bool,
33+
enable_debug: bool,
34+
):
35+
self._client = client
36+
self._loggers = (cli_logger, sdk_logger)
37+
self._client_version = client_version
38+
self._share_logs = share_logs
39+
self._decline_logs = decline_logs
40+
self._enable_debug = enable_debug
41+
42+
self._run_id: Optional[str] = None
43+
self._report_run_id: Optional[str] = None
44+
self._uploader: Optional[BatchedLogUploader] = None
45+
self._upload_handler: Optional[UploadingLogHandler] = None
46+
self._terminal_handler: Optional[logging.StreamHandler] = None
47+
self._saved_levels: tuple = ()
48+
self._saved_propagate: tuple = ()
49+
50+
def set_report_run_id(self, report_run_id: Optional[str]) -> None:
51+
self._report_run_id = report_run_id
52+
53+
def __enter__(self) -> "StreamingLogs":
54+
self._run_id = register_cli_run(
55+
self._client,
56+
client_version=self._client_version,
57+
share_logs=self._share_logs,
58+
decline_logs=self._decline_logs,
59+
)
60+
cli_logger = self._loggers[0]
61+
if not self._run_id:
62+
cli_logger.debug("server log streaming not active for this run")
63+
return self
64+
65+
self._uploader = BatchedLogUploader(self._client, self._run_id)
66+
self._uploader.start()
67+
self._upload_handler = UploadingLogHandler(self._uploader, context="socket-python-cli")
68+
self._upload_handler.setFormatter(logging.Formatter("%(message)s"))
69+
70+
self._terminal_handler = logging.StreamHandler()
71+
self._terminal_handler.setLevel(logging.DEBUG if self._enable_debug else logging.INFO)
72+
self._terminal_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
73+
74+
self._saved_levels = tuple(lg.level for lg in self._loggers)
75+
self._saved_propagate = tuple(lg.propagate for lg in self._loggers)
76+
for lg in self._loggers:
77+
lg.setLevel(logging.DEBUG)
78+
lg.propagate = False
79+
lg.addHandler(self._terminal_handler)
80+
lg.addHandler(self._upload_handler)
81+
82+
cli_logger.debug(f"server log streaming enabled (run_id={self._run_id})")
83+
return self
84+
85+
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
86+
if self._run_id is None:
87+
return False
88+
89+
status = self._status_for_exit(exc_type, exc_val)
90+
for lg in self._loggers:
91+
lg.removeHandler(self._upload_handler)
92+
self._uploader.stop()
93+
finalize_cli_run(
94+
self._client,
95+
self._run_id,
96+
status=status,
97+
report_run_id=self._report_run_id,
98+
)
99+
for lg in self._loggers:
100+
lg.removeHandler(self._terminal_handler)
101+
for lg, level, propagate in zip(self._loggers, self._saved_levels, self._saved_propagate):
102+
lg.setLevel(level)
103+
lg.propagate = propagate
104+
return False
105+
106+
@staticmethod
107+
def _status_for_exit(exc_type, exc_val) -> str:
108+
if exc_type is None:
109+
return "success"
110+
if issubclass(exc_type, KeyboardInterrupt):
111+
return "cancelled"
112+
# SystemExit with code 0 / None is a clean exit; non-zero codes signal failure.
113+
if issubclass(exc_type, SystemExit) and not getattr(exc_val, "code", None):
114+
return "success"
115+
return "failure"
31116

32117

33118
def setup_streaming(
@@ -39,49 +124,13 @@ def setup_streaming(
39124
share_logs: bool,
40125
decline_logs: bool,
41126
enable_debug: bool,
42-
) -> Optional[Callable[[], None]]:
43-
run_id = register_cli_run(
44-
client,
127+
) -> StreamingLogs:
128+
return StreamingLogs(
129+
client=client,
130+
cli_logger=cli_logger,
131+
sdk_logger=sdk_logger,
45132
client_version=client_version,
46133
share_logs=share_logs,
47134
decline_logs=decline_logs,
135+
enable_debug=enable_debug,
48136
)
49-
if not run_id:
50-
cli_logger.debug("server log streaming not active for this run")
51-
return None
52-
53-
log_uploader = BatchedLogUploader(client, run_id)
54-
log_uploader.start()
55-
upload_handler = UploadingLogHandler(log_uploader, context="socket-python-cli")
56-
upload_handler.setFormatter(logging.Formatter("%(message)s"))
57-
58-
terminal_handler = logging.StreamHandler()
59-
terminal_handler.setLevel(logging.DEBUG if enable_debug else logging.INFO)
60-
terminal_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
61-
62-
saved_levels = (cli_logger.level, sdk_logger.level)
63-
saved_propagate = (cli_logger.propagate, sdk_logger.propagate)
64-
cli_logger.setLevel(logging.DEBUG)
65-
sdk_logger.setLevel(logging.DEBUG)
66-
cli_logger.propagate = False
67-
sdk_logger.propagate = False
68-
cli_logger.addHandler(terminal_handler)
69-
sdk_logger.addHandler(terminal_handler)
70-
cli_logger.addHandler(upload_handler)
71-
sdk_logger.addHandler(upload_handler)
72-
73-
cli_logger.debug(f"server log streaming enabled (run_id={run_id})")
74-
75-
def teardown() -> None:
76-
cli_logger.removeHandler(upload_handler)
77-
sdk_logger.removeHandler(upload_handler)
78-
log_uploader.stop()
79-
finalize_cli_run(client, run_id, status=_run_status, report_run_id=_report_run_id)
80-
cli_logger.removeHandler(terminal_handler)
81-
sdk_logger.removeHandler(terminal_handler)
82-
cli_logger.setLevel(saved_levels[0])
83-
sdk_logger.setLevel(saved_levels[1])
84-
cli_logger.propagate = saved_propagate[0]
85-
sdk_logger.propagate = saved_propagate[1]
86-
87-
return teardown

0 commit comments

Comments
 (0)