Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 42 additions & 45 deletions go/ai/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,7 @@ func DefineGenerateAction(ctx context.Context, r api.Registry) *generateAction {
"err", err)
}()

spanMetadata := &tracing.SpanMetadata{
Name: "generate",
Type: "util",
Subtype: "util",
}
return tracing.RunInNewSpan(ctx, spanMetadata, actionOpts,
func(ctx context.Context, actionOpts *GenerateActionOptions) (*ModelResponse, error) {
return GenerateWithRequest(ctx, r, actionOpts, nil, cb)
})
return GenerateWithRequest(ctx, r, actionOpts, nil, cb)
}))
}

Expand Down Expand Up @@ -324,52 +316,57 @@ func GenerateWithRequest(ctx context.Context, r api.Registry, opts *GenerateActi

fn := core.ChainMiddleware(mw...)(m.Generate)

currentTurn := 0
for {
resp, err := fn(ctx, req, cb)
if err != nil {
return nil, err
// Inline recursive helper function that captures variables from parent scope.
var generate func(context.Context, *ModelRequest, int) (*ModelResponse, error)

generate = func(ctx context.Context, req *ModelRequest, currentTurn int) (*ModelResponse, error) {
spanMetadata := &tracing.SpanMetadata{
Name: "generate",
Type: "util",
Subtype: "util",
}

if formatHandler != nil {
resp.Message, err = formatHandler.ParseMessage(resp.Message)
return tracing.RunInNewSpan(ctx, spanMetadata, req, func(ctx context.Context, req *ModelRequest) (*ModelResponse, error) {
resp, err := fn(ctx, req, cb)
if err != nil {
logger.FromContext(ctx).Debug("model failed to generate output matching expected schema", "error", err.Error())
return nil, core.NewError(core.INTERNAL, "model failed to generate output matching expected schema: %v", err)
return nil, err
}
}

toolCount := 0
for _, part := range resp.Message.Content {
if part.IsToolRequest() {
toolCount++
if formatHandler != nil {
resp.Message, err = formatHandler.ParseMessage(resp.Message)
if err != nil {
logger.FromContext(ctx).Debug("model failed to generate output matching expected schema", "error", err.Error())
return nil, core.NewError(core.INTERNAL, "model failed to generate output matching expected schema: %v", err)
}
}
}
if toolCount == 0 || opts.ReturnToolRequests {
return resp, nil
}

if currentTurn+1 > maxTurns {
return nil, core.NewError(core.ABORTED, "exceeded maximum tool call iterations (%d)", maxTurns)
}
if len(resp.ToolRequests()) == 0 || opts.ReturnToolRequests {
return resp, nil
}

newReq, interruptMsg, err := handleToolRequests(ctx, r, req, resp, cb)
if err != nil {
return nil, err
}
if interruptMsg != nil {
resp.FinishReason = "interrupted"
resp.FinishMessage = "One or more tool calls resulted in interrupts."
resp.Message = interruptMsg
return resp, nil
}
if newReq == nil {
return resp, nil
}
if currentTurn+1 > maxTurns {
return nil, core.NewError(core.ABORTED, "exceeded maximum tool call iterations (%d)", maxTurns)
}

req = newReq
currentTurn++
newReq, interruptMsg, err := handleToolRequests(ctx, r, req, resp, cb)
if err != nil {
return nil, err
}
if interruptMsg != nil {
resp.FinishReason = "interrupted"
resp.FinishMessage = "One or more tool calls resulted in interrupts."
resp.Message = interruptMsg
return resp, nil
}
if newReq == nil {
return resp, nil
}

return generate(ctx, newReq, currentTurn+1)
})
}

return generate(ctx, req, 0)
}

// Generate generates a model response based on the provided options.
Expand Down
77 changes: 64 additions & 13 deletions go/plugins/firebase/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,77 @@ import (
"os"

"github.com/firebase/genkit/go/plugins/googlecloud"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"golang.org/x/oauth2/google"
)

// FirebaseTelemetryOptions provides configuration for Firebase telemetry.
// FirebaseTelemetryOptions provides comprehensive configuration for Firebase telemetry.
// This matches the Google Cloud plugin options for full compatibility, with Firebase-specific
// project ID resolution that checks FIREBASE_PROJECT_ID first.
//
// Environment Variables:
// - GENKIT_ENV: Set to "dev" to disable export unless ForceDevExport is true
// - FIREBASE_PROJECT_ID: Project ID for telemetry if not provided inline
// - GOOGLE_CLOUD_PROJECT: Project ID for telemetry if not provided inline
// - GCLOUD_PROJECT: Project ID for telemetry if not provided inline
type FirebaseTelemetryOptions struct {
// ProjectID is the Firebase/Google Cloud project ID.
// If empty, will be read from FIREBASE_PROJECT_ID or GOOGLE_CLOUD_PROJECT environment variables.
// If empty, will be auto-detected from environment variables in priority order:
// 1. FIREBASE_PROJECT_ID, 2. GOOGLE_CLOUD_PROJECT, 3. GCLOUD_PROJECT
ProjectID string

// ForceDevExport forces telemetry export even in development environment.
// Defaults to false (only exports in production unless forced).
ForceDevExport bool
// Credentials for authenticating with Google Cloud.
// If nil, uses Application Default Credentials (ADC).
// Primarily intended for use in environments outside of GCP.
// On GCP, credentials will typically be inferred from the environment via ADC.
Credentials *google.Credentials

// Sampler controls trace sampling. If nil, uses AlwaysOnSampler.
// Examples: AlwaysOnSampler, AlwaysOffSampler, TraceIdRatioBasedSampler
Sampler sdktrace.Sampler

// MetricExportIntervalMillis controls metrics export frequency.
// Google Cloud requires minimum 5000ms. Defaults to 5000 (dev) or 300000 (prod).
MetricExportIntervalMillis *int

// MetricExportTimeoutMillis controls metrics export timeout.
// Defaults to match MetricExportIntervalMillis.
MetricExportTimeoutMillis *int

// DisableMetrics disables metric export to Google Cloud.
// Traces and logs may still be exported. Defaults to false.
DisableMetrics bool

// DisableTraces disables trace export to Google Cloud.
// Metrics and logs may still be exported. Defaults to false.
DisableTraces bool

// DisableLoggingInputAndOutput disables input/output logging.
// Defaults to false (input/output logging enabled).
DisableLoggingInputAndOutput bool

// ForceDevExport forces telemetry export even in development environment.
// Defaults to false (only exports in production unless forced).
ForceDevExport bool
}

// EnableFirebaseTelemetry enables comprehensive telemetry export to Genkit Monitoring,
// backed by Google Cloud Observability (Cloud Logging, Metrics, and Trace).
//
// Example usage:
//
// // Basic usage - uses environment variables for project ID
// firebase.EnableFirebaseTelemetry(nil)
// g, err := genkit.Init(ctx, genkit.WithPlugins(&googlegenai.GoogleAI{}))
//
// // With configuration
// // With full configuration
// firebase.EnableFirebaseTelemetry(&firebase.FirebaseTelemetryOptions{
// ProjectID: "my-firebase-project",
// ForceExport: true,
// ExportInputAndOutput: true,
// ProjectID: "my-firebase-project",
// ForceDevExport: true,
// DisableLoggingInputAndOutput: false,
// DisableMetrics: false,
// DisableTraces: false,
// MetricExportIntervalMillis: &[]int{10000}[0], // 10 seconds
// })
// g, err := genkit.Init(ctx, genkit.WithPlugins(&googlegenai.GoogleAI{}))
func EnableFirebaseTelemetry(options *FirebaseTelemetryOptions) {
Expand All @@ -70,19 +111,25 @@ func initializeTelemetry(options *FirebaseTelemetryOptions) {

projectID := resolveFirebaseProjectID(options.ProjectID)
if projectID == "" {
slog.Warn("Firebase project ID not found. Set FIREBASE_PROJECT_ID or GOOGLE_CLOUD_PROJECT environment variable, or pass ProjectID in options.")
slog.Warn("Firebase project ID not found. Set FIREBASE_PROJECT_ID, GOOGLE_CLOUD_PROJECT, or GCLOUD_PROJECT environment variable, or pass ProjectID in options.")
}

gcOptions := &googlecloud.GoogleCloudTelemetryOptions{
ProjectID: projectID,
ForceDevExport: options.ForceDevExport,
Credentials: options.Credentials,
Sampler: options.Sampler,
MetricExportIntervalMillis: options.MetricExportIntervalMillis,
MetricExportTimeoutMillis: options.MetricExportTimeoutMillis,
DisableMetrics: options.DisableMetrics,
DisableTraces: options.DisableTraces,
DisableLoggingInputAndOutput: options.DisableLoggingInputAndOutput,
ForceDevExport: options.ForceDevExport,
}
googlecloud.EnableGoogleCloudTelemetry(gcOptions)
}

// resolveFirebaseProjectID resolves the Firebase project ID from various sources.
// Priority: 1) Provided projectID, 2) FIREBASE_PROJECT_ID, 3) GOOGLE_CLOUD_PROJECT
// Priority: 1) Provided projectID, 2) FIREBASE_PROJECT_ID, 3) GOOGLE_CLOUD_PROJECT, 4) GCLOUD_PROJECT
func resolveFirebaseProjectID(projectID string) string {
if projectID != "" {
return projectID
Expand All @@ -92,5 +139,9 @@ func resolveFirebaseProjectID(projectID string) string {
return envID
}

return os.Getenv("GOOGLE_CLOUD_PROJECT")
if envID := os.Getenv("GOOGLE_CLOUD_PROJECT"); envID != "" {
return envID
}

return os.Getenv("GCLOUD_PROJECT")
}
48 changes: 25 additions & 23 deletions go/plugins/googlecloud/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,31 @@ func (a *ActionTelemetry) Tick(span sdktrace.ReadOnlySpan, logInputOutput bool,

subtype := extractStringAttribute(attributes, "genkit:metadata:subtype")

if subtype == "tool" || actionName == "generate" {
path := extractStringAttribute(attributes, "genkit:path")
if path == "" {
path = "<unknown>"
}

input := truncate(extractStringAttribute(attributes, "genkit:input"))
output := truncate(extractStringAttribute(attributes, "genkit:output"))
sessionID := extractStringAttribute(attributes, "genkit:sessionId")
threadName := extractStringAttribute(attributes, "genkit:threadName")

featureName := extractOuterFeatureNameFromPath(path)
if featureName == "" || featureName == "<unknown>" {
featureName = actionName
}

if input != "" {
a.writeLog(span, "Input", featureName, path, input, projectID, sessionID, threadName)
}

if output != "" {
a.writeLog(span, "Output", featureName, path, output, projectID, sessionID, threadName)
}
if subtype != "tool" && actionName != "generate" {
return
}

path := extractStringAttribute(attributes, "genkit:path")
if path == "" {
path = "<unknown>"
}

input := truncate(extractStringAttribute(attributes, "genkit:input"))
output := truncate(extractStringAttribute(attributes, "genkit:output"))
sessionID := extractStringAttribute(attributes, "genkit:sessionId")
threadName := extractStringAttribute(attributes, "genkit:threadName")

featureName := extractOuterFeatureNameFromPath(path)
if featureName == "" || featureName == "<unknown>" {
featureName = actionName
}

if input != "" {
a.writeLog(span, "Input", featureName, path, input, projectID, sessionID, threadName)
}

if output != "" {
a.writeLog(span, "Output", featureName, path, output, projectID, sessionID, threadName)
}
}

Expand Down
20 changes: 10 additions & 10 deletions go/plugins/googlecloud/engagement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestEngagementTelemetry_PipelineIntegration(t *testing.T) {
_, span := f.tracer.Start(ctx, "test-span")

span.SetAttributes(
attribute.String("genkit:type", "action"), // Required for telemetry processing
attribute.String("genkit:type", "userEngagement"), // Required for telemetry processing
attribute.String("genkit:metadata:subtype", "userFeedback"),
attribute.String("genkit:path", "/{testFlow,t:flow}/{myAction,t:action}"),
attribute.String("genkit:metadata:feedbackValue", "positive"),
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestEngagementTelemetry_MetricCapture(t *testing.T) {
{
name: "user feedback captures metrics correctly",
attrs: map[string]string{
"genkit:type": "action",
"genkit:type": "userEngagement",
"genkit:metadata:subtype": "userFeedback",
"genkit:path": "/{chatFlow,t:flow}/{generateResponse,t:action}",
"genkit:metadata:feedbackValue": "positive",
Expand All @@ -190,7 +190,7 @@ func TestEngagementTelemetry_MetricCapture(t *testing.T) {
{
name: "user feedback without text",
attrs: map[string]string{
"genkit:type": "action",
"genkit:type": "userEngagement",
"genkit:metadata:subtype": "userFeedback",
"genkit:path": "/{testFlow,t:flow}/{myAction,t:action}",
"genkit:metadata:feedbackValue": "negative",
Expand All @@ -204,7 +204,7 @@ func TestEngagementTelemetry_MetricCapture(t *testing.T) {
{
name: "user acceptance captures metrics correctly",
attrs: map[string]string{
"genkit:type": "action",
"genkit:type": "userEngagement",
"genkit:metadata:subtype": "userAcceptance",
"genkit:path": "/{codeAssistant,t:flow}/{suggestCode,t:action}",
"genkit:metadata:acceptanceValue": "accepted",
Expand All @@ -217,7 +217,7 @@ func TestEngagementTelemetry_MetricCapture(t *testing.T) {
{
name: "unknown subtype captures no metrics",
attrs: map[string]string{
"genkit:type": "action",
"genkit:type": "userEngagement",
"genkit:metadata:subtype": "unknownType",
"genkit:path": "/{testFlow,t:flow}/{myAction,t:action}",
},
Expand Down Expand Up @@ -366,7 +366,7 @@ func TestEngagementTelemetry_ComprehensiveScenarios(t *testing.T) {
{
name: "user feedback with text",
attrs: map[string]string{
"genkit:type": "action",
"genkit:type": "userEngagement",
"genkit:metadata:subtype": "userFeedback",
"genkit:path": "/{chatFlow,t:flow}/{generateResponse,t:action}",
"genkit:metadata:feedbackValue": "positive",
Expand All @@ -379,7 +379,7 @@ func TestEngagementTelemetry_ComprehensiveScenarios(t *testing.T) {
{
name: "user feedback without text",
attrs: map[string]string{
"genkit:type": "action",
"genkit:type": "userEngagement",
"genkit:metadata:subtype": "userFeedback",
"genkit:path": "/{testFlow,t:flow}/{myAction,t:action}",
"genkit:metadata:feedbackValue": "negative",
Expand All @@ -391,7 +391,7 @@ func TestEngagementTelemetry_ComprehensiveScenarios(t *testing.T) {
{
name: "user acceptance",
attrs: map[string]string{
"genkit:type": "action",
"genkit:type": "userEngagement",
"genkit:metadata:subtype": "userAcceptance",
"genkit:path": "/{codeAssistant,t:flow}/{suggestCode,t:action}",
"genkit:metadata:acceptanceValue": "accepted",
Expand All @@ -403,7 +403,7 @@ func TestEngagementTelemetry_ComprehensiveScenarios(t *testing.T) {
{
name: "unknown subtype",
attrs: map[string]string{
"genkit:type": "action",
"genkit:type": "userEngagement",
"genkit:metadata:subtype": "unknownType",
"genkit:path": "/{testFlow,t:flow}/{myAction,t:action}",
},
Expand All @@ -413,7 +413,7 @@ func TestEngagementTelemetry_ComprehensiveScenarios(t *testing.T) {
{
name: "no subtype",
attrs: map[string]string{
"genkit:type": "action",
"genkit:type": "userEngagement",
"genkit:path": "/{testFlow,t:flow}/{myAction,t:action}",
},
expectLog: false,
Expand Down
5 changes: 0 additions & 5 deletions go/plugins/googlecloud/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ func NewGenerateTelemetry() *GenerateTelemetry {
func (g *GenerateTelemetry) Tick(span sdktrace.ReadOnlySpan, logInputOutput bool, projectID string) {
attributes := span.Attributes()

subtype := extractStringAttribute(attributes, "genkit:metadata:subtype")
if subtype != "model" {
return
}

modelName := truncate(extractStringAttribute(attributes, "genkit:name"), 1024)
path := extractStringAttribute(attributes, "genkit:path")
inputStr := extractStringAttribute(attributes, "genkit:input")
Expand Down
Loading
Loading