-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathpipeline.py
More file actions
49 lines (38 loc) · 1.68 KB
/
pipeline.py
File metadata and controls
49 lines (38 loc) · 1.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
"""Safety pipeline — runs layers in order, first non-PASS short-circuits.
Airport-security model: SQL walks a *line* of layers. The first layer that
does not return PASS decides the outcome and the rest are skipped. If every
layer PASSes, the pipeline returns a PASS carrying the (possibly rewritten)
SQL forward — so a layer that REWRITEs is itself a non-PASS short-circuit in
V1, while accumulated rewrites only matter once we have multiple rewriting
layers (V1.5+).
"""
from __future__ import annotations
from typing import Sequence
from ..core.ports.safety import (
SafetyContext,
SafetyDecision,
SafetyLayerPort,
Verdict,
)
from .layers import TimeoutLayer, WhitelistLayer
def _default_layers() -> list[SafetyLayerPort]:
    """Build the layer line-up used when the caller supplies no layers.

    Returns:
        A fresh list holding a ``WhitelistLayer`` followed by a
        ``TimeoutLayer``, in the order they are meant to run.
    """
    # Whitelist first (cheap, fail-closed reject), then Timeout (exec config).
    return [WhitelistLayer(), TimeoutLayer()]
class SafetyPipeline:
    """Ordered safety layers. Implements ``SafetyPipelinePort``.

    Layers are consulted in list order; the first layer whose verdict is not
    ``Verdict.PASS`` decides the outcome and the remaining layers are skipped.
    """

    def __init__(self, layers: Sequence[SafetyLayerPort] | None = None) -> None:
        """Store the layers to run.

        Args:
            layers: Layers in evaluation order. ``None`` selects the default
                line-up from ``_default_layers()``. An explicitly passed
                empty sequence is kept as-is (no layers), it is *not*
                replaced by the defaults.
        """
        # Copy into our own list so later mutation of the caller's sequence
        # does not change which layers this pipeline runs.
        self._layers: list[SafetyLayerPort] = (
            list(layers) if layers is not None else _default_layers()
        )

    @property
    def layers(self) -> Sequence[SafetyLayerPort]:
        """The layers in evaluation order (the pipeline's internal list)."""
        return self._layers

    def evaluate(self, sql: str, ctx: SafetyContext) -> SafetyDecision:
        """Run ``sql`` through every layer until one does not PASS.

        Args:
            sql: The SQL text handed to the first layer.
            ctx: Context object passed unchanged to every layer's ``check``.

        Returns:
            The decision of the first layer whose verdict is not
            ``Verdict.PASS``, returned as-is. If every layer passes (or there
            are no layers), a new PASS decision attributed to ``"pipeline"``
            carrying the SQL as it stood after the last layer.
        """
        current = sql
        for layer in self._layers:
            decision = layer.check(current, ctx)
            if decision.verdict is not Verdict.PASS:
                # Short-circuit: this layer decides, later layers never run.
                return decision
            # Carry any rewritten SQL forward to the next layer.
            current = decision.sql
        return SafetyDecision(verdict=Verdict.PASS, sql=current, layer="pipeline")