Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use official cloud event proto schema #4487

Merged
merged 17 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorker.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IAgentWorker.cs

namespace Microsoft.AutoGen.Abstractions;

public interface IAgentWorker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public static CloudEvent ToCloudEvent<T>(this T message, string source) where T
Type = message.Descriptor.FullName,
Source = source,
Id = Guid.NewGuid().ToString(),
SpecVersion = "1.0",
Attributes = { { "datacontenttype", new CloudEvent.Types.CloudEventAttributeValue { CeString = PROTO_DATA_CONTENT_TYPE } } }
};
}
Expand Down
2 changes: 1 addition & 1 deletion dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ protected internal async Task HandleRpcMessage(Message msg, CancellationToken ca
{
case Message.MessageOneofCase.CloudEvent:
{
var activity = this.ExtractActivity(msg.CloudEvent.Type, msg.CloudEvent.Metadata);
var activity = this.ExtractActivity(msg.CloudEvent.Type, msg.CloudEvent.Attributes);
await this.InvokeWithActivityAsync(
static ((AgentBase Agent, CloudEvent Item) state, CancellationToken _) => state.Agent.CallHandler(state.Item),
(this, msg.CloudEvent),
Expand Down
7 changes: 7 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Agents/AgentBaseExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// AgentBaseExtensions.cs

using System.Diagnostics;
using Google.Protobuf.Collections;
using static Microsoft.AutoGen.Abstractions.CloudEvent.Types;

namespace Microsoft.AutoGen.Agents;

Expand Down Expand Up @@ -57,6 +59,11 @@ public static class AgentBaseExtensions
return activity;
}

public static Activity? ExtractActivity(this AgentBase agent, string activityName, MapField<string, CloudEventAttributeValue> metadata)
{
return ExtractActivity(agent, activityName, metadata.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.CeString));
}

/// <summary>
/// Invokes a function asynchronously within the context of an <see cref="Activity"/>.
/// </summary>
Expand Down
42 changes: 41 additions & 1 deletion dotnet/src/Microsoft.AutoGen/Agents/AgentRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// AgentRuntime.cs

using System.Diagnostics;
using Google.Protobuf.Collections;
using Microsoft.AutoGen.Abstractions;
using Microsoft.Extensions.Logging;
using static Microsoft.AutoGen.Abstractions.CloudEvent.Types;

namespace Microsoft.AutoGen.Agents;

Expand All @@ -28,13 +30,38 @@ internal sealed class AgentRuntime(AgentId agentId, IAgentWorker worker, ILogger
out var traceState);
return (traceParent, traceState);
}
public (string?, string?) GetTraceIdAndState(MapField<string, CloudEventAttributeValue> metadata)
{
DistributedContextPropagator.ExtractTraceIdAndState(metadata,
static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
var metadata = (MapField<string, CloudEventAttributeValue>)carrier!;
fieldValues = null;
metadata.TryGetValue(fieldName, out var ceValue);
fieldValue = ceValue?.CeString;
},
out var traceParent,
out var traceState);
return (traceParent, traceState);
}
public void Update(RpcRequest request, Activity? activity = null)
{
DistributedContextPropagator.Inject(activity, request.Metadata, static (carrier, key, value) => ((IDictionary<string, string>)carrier!)[key] = value);
}
public void Update(CloudEvent cloudEvent, Activity? activity = null)
{
DistributedContextPropagator.Inject(activity, cloudEvent.Metadata, static (carrier, key, value) => ((IDictionary<string, string>)carrier!)[key] = value);
DistributedContextPropagator.Inject(activity, cloudEvent.Attributes, static (carrier, key, value) =>
{
var mapField = (MapField<string, CloudEventAttributeValue>)carrier!;
if (mapField.TryGetValue(key, out var ceValue))
{
mapField[key] = new CloudEventAttributeValue { CeString = value };
}
else
{
mapField.Add(key, new CloudEventAttributeValue { CeString = value });
}
});
}
public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -73,4 +100,17 @@ public IDictionary<string, string> ExtractMetadata(IDictionary<string, string> m

return baggage as IDictionary<string, string> ?? new Dictionary<string, string>();
}

public IDictionary<string, string> ExtractMetadata(MapField<string, CloudEventAttributeValue> metadata)
{
var baggage = DistributedContextPropagator.ExtractBaggage(metadata, static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
var metadata = (MapField<string, CloudEventAttributeValue>)carrier!;
fieldValues = null;
metadata.TryGetValue(fieldName, out var ceValue);
fieldValue = ceValue?.CeString;
});

return baggage as IDictionary<string, string> ?? new Dictionary<string, string>();
}
}
2 changes: 1 addition & 1 deletion protos/agent_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ message Message {
oneof message {
RpcRequest request = 1;
RpcResponse response = 2;
cloudevent.CloudEvent cloudEvent = 3;
io.cloudevents.v1.CloudEvent cloudEvent = 3;
RegisterAgentTypeRequest registerAgentTypeRequest = 4;
RegisterAgentTypeResponse registerAgentTypeResponse = 5;
AddSubscriptionRequest addSubscriptionRequest = 6;
Expand Down
22 changes: 16 additions & 6 deletions protos/cloudevent.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
// https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.proto

/**
* CloudEvent Protobuf Format
*
* - Required context attributes are explicitly represented.
* - Optional and Extension context attributes are carried in a map structure.
* - Data may be represented as binary, text, or protobuf messages.
*/

syntax = "proto3";

package cloudevent;
package io.cloudevents.v1;

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
Expand All @@ -20,12 +30,12 @@ message CloudEvent {

// Optional & Extension Attributes
map<string, CloudEventAttributeValue> attributes = 5;
map<string, string> metadata = 6;

// -- CloudEvent Data (Bytes, Text, or Proto)
oneof data {
bytes binary_data = 7;
string text_data = 8;
google.protobuf.Any proto_data = 9;
bytes binary_data = 6;
string text_data = 7;
google.protobuf.Any proto_data = 8;
}

/**
Expand All @@ -45,4 +55,4 @@ message CloudEvent {
google.protobuf.Timestamp ce_timestamp = 7;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -626,11 +626,38 @@ async def _process_event(self, event: cloudevent_pb2.CloudEvent) -> None:
agent = await self._get_agent(agent_id)
with MessageHandlerContext.populate_context(agent.id):

def stringify_attributes(
attributes: Mapping[str, cloudevent_pb2.CloudEvent.CloudEventAttributeValue],
) -> Mapping[str, str]:
result: Dict[str, str] = {}
for key, value in attributes.items():
item = None
match value.WhichOneof("attr"):
case "ce_boolean":
item = str(value.ce_boolean)
case "ce_integer":
item = str(value.ce_integer)
case "ce_string":
item = value.ce_string
case "ce_bytes":
item = str(value.ce_bytes)
case "ce_uri":
item = value.ce_uri
case "ce_uri_ref":
item = value.ce_uri_ref
case "ce_timestamp":
item = str(value.ce_timestamp)
case _:
raise ValueError("Unknown attribute kind")
result[key] = item

return result

async def send_message(agent: Agent, message_context: MessageContext) -> Any:
with self._trace_helper.trace_block(
"process",
agent.id,
parent=event.metadata,
parent=stringify_attributes(event.attributes),
extraAttributes={"message_type": message_type},
):
await agent.on_message(message, ctx=message_context)
Expand Down
Loading
Loading