Skip to content

draft: adding filter eval for client spans #250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@
// for use in a grpc.Dial call.
// Interceptor format will be replaced with the stats.Handler since instrumentation has moved to the stats.Handler.
// See: https://github.com/open-telemetry/opentelemetry-go-contrib/blob/v1.36.0/instrumentation/google.golang.org/grpc/otelgrpc/example_test.go
func UnaryClientInterceptor() grpc.UnaryClientInterceptor {
func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
o := &options{}
for _, opt := range opts {
opt(o)
}

Check warning on line 18 in instrumentation/hypertrace/google.golang.org/hypergrpc/client.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/hypertrace/google.golang.org/hypergrpc/client.go#L14-L18

Added lines #L14 - L18 were not covered by tests

return sdkgrpc.WrapUnaryClientInterceptor(
grpcunaryinterceptors.UnaryClientInterceptor(),
opentelemetry.SpanFromContext,
o.toSDKOptions(),

Check warning on line 23 in instrumentation/hypertrace/google.golang.org/hypergrpc/client.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/hypertrace/google.golang.org/hypergrpc/client.go#L23

Added line #L23 was not covered by tests
map[string]string{},
)
}
9 changes: 7 additions & 2 deletions instrumentation/hypertrace/net/hyperhttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@

// NewTransport wraps the provided http.RoundTripper with one that
// starts a span and injects the span context into the outbound request headers.
func NewTransport(base http.RoundTripper) http.RoundTripper {
func NewTransport(base http.RoundTripper, opts ...Option) http.RoundTripper {
o := &options{}
for _, opt := range opts {
opt(o)
}

Check warning on line 17 in instrumentation/hypertrace/net/hyperhttp/transport.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/hypertrace/net/hyperhttp/transport.go#L13-L17

Added lines #L13 - L17 were not covered by tests

return otelhttp.NewTransport(
sdkhttp.WrapTransport(base, opentelemetry.SpanFromContext, map[string]string{}),
sdkhttp.WrapTransport(base, opentelemetry.SpanFromContext, o.toSDKOptions(), map[string]string{}),

Check warning on line 20 in instrumentation/hypertrace/net/hyperhttp/transport.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/hypertrace/net/hyperhttp/transport.go#L20

Added line #L20 was not covered by tests
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ import (

// WrapUnaryClientInterceptor returns a new unary client interceptor that will
// complement existing OpenTelemetry instrumentation
func WrapUnaryClientInterceptor(delegate grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
return sdkgrpc.WrapUnaryClientInterceptor(delegate, opentelemetry.SpanFromContext, map[string]string{})
func WrapUnaryClientInterceptor(delegate grpc.UnaryClientInterceptor, options *sdkgrpc.Options) grpc.UnaryClientInterceptor {
return sdkgrpc.WrapUnaryClientInterceptor(delegate, opentelemetry.SpanFromContext, options, map[string]string{})
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/hypertrace/goagent/instrumentation/opentelemetry/google.golang.org/hypergrpc/internal/helloworld"
"github.com/hypertrace/goagent/instrumentation/opentelemetry/grpcunaryinterceptors"
"github.com/hypertrace/goagent/instrumentation/opentelemetry/internal/tracetesting"
sdkgrpc "github.com/hypertrace/goagent/sdk/instrumentation/google.golang.org/grpc"
"github.com/stretchr/testify/assert"
otelcodes "go.opentelemetry.io/otel/codes"
"google.golang.org/grpc"
Expand Down Expand Up @@ -39,6 +40,7 @@ func TestClientHelloWorldSuccess(t *testing.T) {
grpc.WithUnaryInterceptor(
WrapUnaryClientInterceptor(
grpcunaryinterceptors.UnaryClientInterceptor(),
&sdkgrpc.Options{},
),
),
)
Expand Down Expand Up @@ -113,6 +115,7 @@ func TestClientRegisterPersonFails(t *testing.T) {
grpc.WithUnaryInterceptor(
WrapUnaryClientInterceptor(
grpcunaryinterceptors.UnaryClientInterceptor(),
&sdkgrpc.Options{},
),
),
)
Expand Down Expand Up @@ -159,6 +162,7 @@ func BenchmarkClientRequestResponseBodyMarshaling(b *testing.B) {
grpc.WithUnaryInterceptor(
WrapUnaryClientInterceptor(
grpcunaryinterceptors.UnaryClientInterceptor(),
&sdkgrpc.Options{},
),
),
)
Expand Down
4 changes: 2 additions & 2 deletions instrumentation/opentelemetry/net/hyperhttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ import (
// WrapTransport wraps an uninstrumented RoundTripper (e.g. http.DefaultTransport)
// and returns an instrumented RoundTripper that has to be used as base for the
// OTel's RoundTripper.
func WrapTransport(delegate http.RoundTripper) http.RoundTripper {
return sdkhttp.WrapTransport(delegate, opentelemetry.SpanFromContext, map[string]string{})
func WrapTransport(delegate http.RoundTripper, options *sdkhttp.Options) http.RoundTripper {
return sdkhttp.WrapTransport(delegate, opentelemetry.SpanFromContext, options, map[string]string{})
}
15 changes: 7 additions & 8 deletions instrumentation/opentelemetry/net/hyperhttp/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@ import (
"net/http/httptest"
"testing"

"google.golang.org/protobuf/types/known/wrapperspb"

config "github.com/hypertrace/agent-config/gen/go/v1"
"github.com/hypertrace/goagent/instrumentation/opentelemetry/internal/tracetesting"
sdkconfig "github.com/hypertrace/goagent/sdk/config"
"go.opentelemetry.io/otel/propagation"

sdkhttp "github.com/hypertrace/goagent/sdk/instrumentation/net/http"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/types/known/wrapperspb"
)

func TestClientRequestIsSuccessfullyTraced(t *testing.T) {
Expand Down Expand Up @@ -53,7 +52,7 @@ func TestClientRequestIsSuccessfullyTraced(t *testing.T) {

client := &http.Client{
Transport: otelhttp.NewTransport(
WrapTransport(http.DefaultTransport),
WrapTransport(http.DefaultTransport, &sdkhttp.Options{}),
),
}

Expand Down Expand Up @@ -104,7 +103,7 @@ func TestClientFailureRequestIsSuccessfullyTraced(t *testing.T) {
expectedErr := errors.New("roundtrip error")
client := &http.Client{
Transport: otelhttp.NewTransport(
WrapTransport(failingTransport{expectedErr}),
WrapTransport(failingTransport{expectedErr}, &sdkhttp.Options{}),
),
}

Expand Down Expand Up @@ -164,7 +163,7 @@ func TestClientRecordsRequestAndResponseBodyAccordingly(t *testing.T) {

client := &http.Client{
Transport: otelhttp.NewTransport(
WrapTransport(http.DefaultTransport),
WrapTransport(http.DefaultTransport, &sdkhttp.Options{}),
),
}

Expand Down Expand Up @@ -216,7 +215,7 @@ func TestTransportRequestInjectsHeadersSuccessfully(t *testing.T) {

client := &http.Client{
Transport: otelhttp.NewTransport(
WrapTransport(http.DefaultTransport),
WrapTransport(http.DefaultTransport, &sdkhttp.Options{}),
),
}

Expand Down
30 changes: 29 additions & 1 deletion sdk/instrumentation/google.golang.org/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,27 @@
"strings"

"github.com/hypertrace/goagent/sdk"
codes "github.com/hypertrace/goagent/sdk"
"github.com/hypertrace/goagent/sdk/filter"
internalconfig "github.com/hypertrace/goagent/sdk/internal/config"
"github.com/hypertrace/goagent/sdk/internal/container"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

// WrapUnaryClientInterceptor returns an interceptor that records the request and response message's body
// and serialize it as JSON.
func WrapUnaryClientInterceptor(delegateInterceptor grpc.UnaryClientInterceptor, spanFromContext sdk.SpanFromContext,
func WrapUnaryClientInterceptor(
delegateInterceptor grpc.UnaryClientInterceptor,
spanFromContext sdk.SpanFromContext,
options *Options,
spanAttributes map[string]string) grpc.UnaryClientInterceptor {
var filter filter.Filter = &filter.NoopFilter{}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the filter be an interface type instead of just using the NoopFilter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its an interface type. I am initialising the variable in this way because the interface variable can be assigned from 2 different interface implementations

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I see we are reading it from the options below too.

if options != nil && options.Filter != nil {
filter = options.Filter
}

defaultAttributes := map[string]string{
"rpc.system": "grpc",
}
Expand Down Expand Up @@ -60,6 +71,23 @@
setAttributesFromRequestOutgoingMetadata(ctx, span)
}

fr := filter.Evaluate(span)
if fr.Block {
statusText := StatusText(int(fr.ResponseStatusCode))
statusCode := StatusCode(int(fr.ResponseStatusCode))
span.SetStatus(codes.StatusCodeError, statusText)
span.SetAttribute("rpc.grpc.status_code", statusCode)
return status.Error(statusCode, statusText)
} else if fr.Decorations != nil {
if md, ok := metadata.FromOutgoingContext(ctx); ok {
for _, header := range fr.Decorations.RequestHeaderInjections {
md.Append(header.Key, header.Value)
span.SetAttribute("rpc.request.metadata."+header.Key, header.Value)
}
ctx = metadata.NewIncomingContext(ctx, md)

Check warning on line 87 in sdk/instrumentation/google.golang.org/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

sdk/instrumentation/google.golang.org/grpc/client.go#L82-L87

Added lines #L82 - L87 were not covered by tests
}
}

err = invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
return err
Expand Down
90 changes: 90 additions & 0 deletions sdk/instrumentation/google.golang.org/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ import (
"strings"
"testing"

"github.com/hypertrace/goagent/sdk"
"github.com/hypertrace/goagent/sdk/filter/result"
"github.com/hypertrace/goagent/sdk/instrumentation/google.golang.org/grpc/internal/helloworld"
"github.com/hypertrace/goagent/sdk/internal/mock"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

func makeMockUnaryClientInterceptor(mockSpans *[]*mock.Span) grpc.UnaryClientInterceptor {
Expand Down Expand Up @@ -46,6 +50,7 @@ func TestUnaryClientHelloWorldSuccess(t *testing.T) {
WrapUnaryClientInterceptor(
makeMockUnaryClientInterceptor(&spans),
mock.SpanFromContext,
&Options{},
map[string]string{"foo": "bar"},
),
),
Expand Down Expand Up @@ -191,6 +196,7 @@ func TestBodyTruncation(t *testing.T) {
WrapUnaryClientInterceptor(
makeMockUnaryClientInterceptor(&spans),
mock.SpanFromContext,
&Options{},
map[string]string{"foo": "bar"},
),
),
Expand Down Expand Up @@ -244,3 +250,87 @@ func TestBodyTruncation(t *testing.T) {
_ = span.ReadAttribute("container_id") // needed in containarized envs
assert.Zero(t, span.RemainingAttributes(), "unexpected remaining attribute: %v", span.Attributes)
}

func TestClientFilter(t *testing.T) {

s := grpc.NewServer()
defer s.Stop()

helloworld.RegisterGreeterServer(s, &server{
replyHeader: metadata.Pairs("test_header_key", "test_header_value"),
replyTrailer: metadata.Pairs("test_trailer_key", "test_trailer_value"),
})

dialer := createDialer(s)

tests := []struct {
name string
block bool
}{
{
name: "blocking disabled",
block: false,
},
{
name: "blocking enabled",
block: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
spans := []*mock.Span{}
ctx := context.Background()
conn, err := grpc.DialContext(
ctx,
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(
WrapUnaryClientInterceptor(
makeMockUnaryClientInterceptor(&spans),
mock.SpanFromContext,
&Options{
Filter: mock.Filter{
Evaluator: func(span sdk.Span) result.FilterResult {
span.SetAttribute("filter.evaluated", true)
return result.FilterResult{
Block: tt.block,
ResponseStatusCode: 403,
ResponseMessage: "Access Denied",
}
},
},
},
map[string]string{"foo": "bar"},
),
),
)
if err != nil {
t.Fatalf("failed to dial bufnet: %v", err)
}
defer conn.Close()

client := helloworld.NewGreeterClient(conn)

ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("test_key_1", "test_value_1"))
_, err = client.SayHello(
ctx,
&helloworld.HelloRequest{
Name: "Cuchi",
},
)

if !tt.block {
assert.NoError(t, err)
} else {
assert.Error(t, err)
assert.Equal(t, codes.PermissionDenied, status.Code(err))
}

assert.Equal(t, 1, len(spans))
span := spans[0]
assert.True(t, span.ReadAttribute("filter.evaluated").(bool))
})
}
}
41 changes: 39 additions & 2 deletions sdk/instrumentation/net/http/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
"bytes"
"io"
"net/http"
"strings"

config "github.com/hypertrace/agent-config/gen/go/v1"
"github.com/hypertrace/goagent/sdk"
codes "github.com/hypertrace/goagent/sdk"
"github.com/hypertrace/goagent/sdk/filter"
internalconfig "github.com/hypertrace/goagent/sdk/internal/config"
"github.com/hypertrace/goagent/sdk/internal/container"
)
Expand All @@ -18,6 +21,7 @@
defaultAttributes map[string]string
spanFromContextRetriever sdk.SpanFromContext
dataCaptureConfig *config.DataCapture
filter filter.Filter
}

func (rt *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
Expand Down Expand Up @@ -58,6 +62,29 @@
req.Body = io.NopCloser(bytes.NewBuffer(body))
}

filterResult := rt.filter.Evaluate(span)
if filterResult.Block {
Copy link
Collaborator

@prodion23 prodion23 Jun 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this for blocking client requests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, since we are anyways calling libtraceable, thought we can add blocking support as well

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I guess I'm wondering on use case, I understand that because we want to add sampling, we could add blocking support.
I think question is if we should.

This seems like it introduces ambiguity in our blocking feature as a whole. Up to this point, blocking has specifically been about preventing bad requests from making it into the system.

Since we don't currently propagate string taint from the original request to downstream client spans, it's unclear what would drive the decision to block here. The only scenarios I can think of are:

1.) The app loads something from a DB or config and decides to block the outbound request - which is totally unrelated to the original inbound request.
2.) We're trying to enforce some kind of policy based purely on app-generated or static context - which feels more like something the app should handle directly.

If we allow blocking here, we're essentially saying: "some internal logic or app state, something not tied to the inbound request caused us to block an outbound call."

Copy link
Contributor Author

@varkey98 varkey98 Jun 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. But we also have DLP based blocking which is basically to prevent data leaks and was made for client spans only, but at that time none of our agents did blocking eval on client spans and hence user said they'll put all of their egress under a proxy and instrument our agent there so that DLP based blocking will work.
Also, I was thinking that even if we dont turn it on today, we can add the blocking capability in code and later when we want to turn it on, we can enable blocking for client spans as well, since we have a config on libtraceable, which by default skips client spans for blocking eval.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if you think even having the capability makes our feature ambiguous, then definitely removing this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, okay, but don't remove just because I don't think worth it -

i think worth identifying what we are trying to do..(I see you also did header injections) so is the purpose of these tickets to add sampling support, or add sampling support, header injections & blocking.
(my main intention was to ensure feature parity in the lang agents.., if we are adding these here, we need to make sure we do them in other lang agents)

I'd think this is a product question, if we see value in blocking client spans then sure..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, lets start a thread with protection team/pm involved as well

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sampling makes sense for client spans. Blocking I am not so sure. Let's get clarity from PM.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have started a thread here: https://harness.slack.com/archives/C08SEG313E3/p1751005003984389

We kind of discussed it back when rolling dlp out, but since its a huge lift, never prioritised it. I am unsure if we should prioritise it now as well, but I thought I can add it here since its not a lot of extra changes from adding sampling support to get it supported in goagent.

span.SetStatus(codes.StatusCodeError, "Access Denied")
span.SetAttribute("http.status_code", filterResult.ResponseStatusCode)
return &http.Response{
Status: "Access Denied",
StatusCode: int(filterResult.ResponseStatusCode),
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Request: req,
Header: map[string][]string{
"Content-Type": {"text/plain"},
},
Body: io.NopCloser(strings.NewReader(filterResult.ResponseMessage)),
}, nil
} else if filterResult.Decorations != nil {
for _, header := range filterResult.Decorations.RequestHeaderInjections {
req.Header.Add(header.Key, header.Value)
span.SetAttribute("http.request.header."+header.Key, header.Value)
}

Check warning on line 85 in sdk/instrumentation/net/http/transport.go

View check run for this annotation

Codecov / codecov/patch

sdk/instrumentation/net/http/transport.go#L82-L85

Added lines #L82 - L85 were not covered by tests
}

res, err := rt.delegate.RoundTrip(req)
if err != nil {
return res, err
Expand Down Expand Up @@ -90,7 +117,7 @@

// WrapTransport returns a new http.RoundTripper that should be wrapped
// by an instrumented http.RoundTripper
func WrapTransport(delegate http.RoundTripper, spanFromContextRetriever sdk.SpanFromContext, spanAttributes map[string]string) http.RoundTripper {
func WrapTransport(delegate http.RoundTripper, spanFromContextRetriever sdk.SpanFromContext, options *Options, spanAttributes map[string]string) http.RoundTripper {
defaultAttributes := make(map[string]string)
for k, v := range spanAttributes {
defaultAttributes[k] = v
Expand All @@ -99,5 +126,15 @@
defaultAttributes["container_id"] = containerID
}

return &roundTripper{delegate, defaultAttributes, spanFromContextRetriever, internalconfig.GetConfig().GetDataCapture()}
var filter filter.Filter = &filter.NoopFilter{}
if options != nil && options.Filter != nil {
filter = options.Filter
}
return &roundTripper{
delegate: delegate,
defaultAttributes: defaultAttributes,
spanFromContextRetriever: spanFromContextRetriever,
dataCaptureConfig: internalconfig.GetConfig().GetDataCapture(),
filter: filter,
}
}
Loading
Loading