Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion core/services/workflows/syncer/v2/grpc_workflow_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,39 @@ func (g *GRPCWorkflowSource) ListWorkflowMetadata(ctx context.Context, don capab
return nil, nil, err
}

g.lggr.Debugw("[DEBUG-TRACE] Raw proto workflows received from GRPC client",
"count", len(workflows),
"start", start,
"hasMore", hasMore,
)

// Convert workflows to views, skipping invalid ones
for _, wf := range workflows {
for i, wf := range workflows {
g.lggr.Debugw("[DEBUG-TRACE] Raw proto workflow before conversion",
"index", i,
"workflowName", wf.GetWorkflowName(),
"proto.BinaryUrl", wf.GetBinaryUrl(),
"proto.ConfigUrl", wf.GetConfigUrl(),
"proto.BinaryUrl.len", len(wf.GetBinaryUrl()),
"proto.ConfigUrl.len", len(wf.GetConfigUrl()),
)
view, err := g.toWorkflowMetadataView(wf)
if err != nil {
g.lggr.Warnw("Failed to parse workflow metadata, skipping",
"workflowName", wf.GetWorkflowName(),
"error", err)
continue
}

g.lggr.Debugw("[DEBUG-TRACE] WorkflowMetadataView after conversion",
"workflowID", view.WorkflowID.Hex(),
"workflowName", view.WorkflowName,
"view.BinaryURL", view.BinaryURL,
"view.ConfigURL", view.ConfigURL,
"view.BinaryURL.len", len(view.BinaryURL),
"view.ConfigURL.len", len(view.ConfigURL),
)

allViews = append(allViews, view)
}

Expand Down Expand Up @@ -344,6 +368,15 @@ func (g *GRPCWorkflowSource) toWorkflowMetadataView(wf *pb.WorkflowMetadata) (Wo
// Map proto status enum to internal representation
statusVal := GRPCStatusToInternal(wf.GetStatus(), g.lggr)

g.lggr.Debugw("[DEBUG-TRACE] GRPC source received workflow metadata from proto",
"workflowID", workflowID.Hex(),
"workflowName", wf.GetWorkflowName(),
"wf.GetBinaryUrl()", wf.GetBinaryUrl(),
"wf.GetConfigUrl()", wf.GetConfigUrl(),
"binaryURLEmpty", wf.GetBinaryUrl() == "",
"configURLEmpty", wf.GetConfigUrl() == "",
)

return WorkflowMetadataView{
WorkflowID: workflowID,
Owner: ownerBytes,
Expand Down
38 changes: 38 additions & 0 deletions core/services/workflows/syncer/v2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
}

wfID := payload.WorkflowID.Hex()

h.lggr.Debugw("[DEBUG-TRACE] Handler received WorkflowActivated event",
"workflowID", wfID,
"workflowName", payload.WorkflowName,
"payload.BinaryURL", payload.BinaryURL,
"payload.ConfigURL", payload.ConfigURL,
)
wfOwner := hex.EncodeToString(payload.WorkflowOwner)
orgID, ferr := h.fetchOrganizationID(ctx, wfOwner)
if ferr != nil {
Expand Down Expand Up @@ -350,8 +357,21 @@ func (h *eventHandler) workflowActivatedEvent(
ctx context.Context,
payload WorkflowActivatedEvent,
) error {
h.lggr.Debugw("[DEBUG-TRACE] workflowActivatedEvent called",
"workflowID", payload.WorkflowID.Hex(),
"payload.BinaryURL", payload.BinaryURL,
"payload.ConfigURL", payload.ConfigURL,
)

// Convert WorkflowActivatedEvent to WorkflowRegisteredEvent since they have identical fields
registeredPayload := WorkflowRegisteredEvent(payload)

h.lggr.Debugw("[DEBUG-TRACE] After conversion to WorkflowRegisteredEvent",
"workflowID", registeredPayload.WorkflowID.Hex(),
"registeredPayload.BinaryURL", registeredPayload.BinaryURL,
"registeredPayload.ConfigURL", registeredPayload.ConfigURL,
)

return h.workflowRegisteredEvent(ctx, registeredPayload)
}

Expand All @@ -364,6 +384,15 @@ func (h *eventHandler) workflowRegisteredEvent(
ctx context.Context,
payload WorkflowRegisteredEvent,
) error {
h.lggr.Debugw("[DEBUG-TRACE] workflowRegisteredEvent called",
"workflowID", payload.WorkflowID.Hex(),
"workflowName", payload.WorkflowName,
"payload.BinaryURL", payload.BinaryURL,
"payload.ConfigURL", payload.ConfigURL,
"binaryURLEmpty", payload.BinaryURL == "",
"configURLEmpty", payload.ConfigURL == "",
)

status := toSpecStatus(payload.Status)

// First, let's synchronize the database state.
Expand Down Expand Up @@ -437,6 +466,15 @@ func (h *eventHandler) createWorkflowSpec(ctx context.Context, payload WorkflowR
wfID := payload.WorkflowID.Hex()
owner := hex.EncodeToString(payload.WorkflowOwner)

h.lggr.Debugw("[DEBUG-TRACE] createWorkflowSpec called",
"workflowID", wfID,
"workflowName", payload.WorkflowName,
"payload.BinaryURL", payload.BinaryURL,
"payload.ConfigURL", payload.ConfigURL,
"binaryURLEmpty", payload.BinaryURL == "",
"configURLEmpty", payload.ConfigURL == "",
)

ctx = contexts.WithCRE(ctx, contexts.CRE{Owner: owner, Workflow: wfID})

// With Workflow Registry contract v2 the BinaryURL and ConfigURL are expected to be identifiers that put through the Storage Service.
Expand Down
13 changes: 13 additions & 0 deletions core/services/workflows/syncer/v2/workflow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,13 @@ func (w *workflowRegistry) generateReconciliationEvents(

delete(pendingEvents, id)

w.lggr.Debugw("[DEBUG-TRACE] Creating WorkflowActivatedEvent from WorkflowMetadataView",
"workflowID", wfMeta.WorkflowID.Hex(),
"workflowName", wfMeta.WorkflowName,
"wfMeta.BinaryURL", wfMeta.BinaryURL,
"wfMeta.ConfigURL", wfMeta.ConfigURL,
)

toActivatedEvent := WorkflowActivatedEvent{
WorkflowID: wfMeta.WorkflowID,
WorkflowOwner: wfMeta.Owner,
Expand All @@ -451,6 +458,12 @@ func (w *workflowRegistry) generateReconciliationEvents(
Attributes: wfMeta.Attributes,
Source: wfMeta.Source,
}

w.lggr.Debugw("[DEBUG-TRACE] WorkflowActivatedEvent created",
"workflowID", toActivatedEvent.WorkflowID.Hex(),
"event.BinaryURL", toActivatedEvent.BinaryURL,
"event.ConfigURL", toActivatedEvent.ConfigURL,
)
events = append(events, &reconciliationEvent{
Event: Event{
Data: toActivatedEvent,
Expand Down
Loading