Skip to content

Commit

Permalink
feat(backend): add user defined attrs to spans
Browse files Browse the repository at this point in the history
closes #1740
  • Loading branch information
anupcowkur committed Feb 4, 2025
1 parent 0721674 commit d29f5cb
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 29 deletions.
9 changes: 8 additions & 1 deletion backend/api/filter/appfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,14 @@ func (af *AppFilter) getDeviceNames(ctx context.Context) (deviceNames []string,
// getUDAttrKeys finds distinct user defined attribute
// key and its types.
func (af *AppFilter) getUDAttrKeys(ctx context.Context) (keytypes []event.UDKeyType, err error) {
stmt := sqlf.From("user_def_attrs").
var table_name string
if af.Span {
table_name = "span_user_def_attrs"
} else {
table_name = "user_def_attrs"
}

stmt := sqlf.From(table_name).
Select("distinct key").
Select("toString(type) type").
Clause("prewhere app_id = toUUID(?)", af.AppID).
Expand Down
16 changes: 15 additions & 1 deletion backend/api/measure/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,18 @@ func (e eventreq) validate() error {
if err := e.spans[i].Validate(); err != nil {
return err
}

// only process user defined attributes
// if the payload contains any.
//
// this check is super important to have
// because SDKs without support for user
// defined attributes won't ever send these.
if !e.spans[i].UserDefinedAttribute.Empty() {
if err := e.spans[i].UserDefinedAttribute.Validate(); err != nil {
return err
}
}
}

if e.size >= int64(maxBatchSize) {
Expand Down Expand Up @@ -1227,7 +1239,9 @@ func (e eventreq) ingestSpans(ctx context.Context) error {
Set(`attribute.device_manufacturer`, e.spans[i].Attributes.DeviceManufacturer).
Set(`attribute.device_locale`, e.spans[i].Attributes.DeviceLocale).
Set(`attribute.device_low_power_mode`, e.spans[i].Attributes.LowPowerModeEnabled).
Set(`attribute.device_thermal_throttling_enabled`, e.spans[i].Attributes.ThermalThrottlingEnabled)
Set(`attribute.device_thermal_throttling_enabled`, e.spans[i].Attributes.ThermalThrottlingEnabled).
// user defined attribute
Set(`user_defined_attribute`, e.spans[i].UserDefinedAttribute.Parameterize())
}

return server.Server.ChPool.AsyncInsert(ctx, stmt.String(), false, stmt.Args()...)
Expand Down
74 changes: 51 additions & 23 deletions backend/api/span/span.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package span

import (
"backend/api/event"
"backend/api/filter"
"backend/api/platform"
"backend/api/server"
Expand Down Expand Up @@ -97,17 +98,18 @@ type SpanAttributes struct {
}

type SpanField struct {
AppID uuid.UUID `json:"app_id" binding:"required"`
SpanName string `json:"name" binding:"required"`
SpanID string `json:"span_id" binding:"required"`
ParentID string `json:"parent_id"`
TraceID string `json:"trace_id" binding:"required"`
SessionID uuid.UUID `json:"session_id" binding:"required"`
Status uint8 `json:"status" binding:"required"`
StartTime time.Time `json:"start_time" binding:"required"`
EndTime time.Time `json:"end_time" binding:"required"`
CheckPoints []CheckPointField `json:"checkpoints"`
Attributes SpanAttributes `json:"attributes"`
AppID uuid.UUID `json:"app_id" binding:"required"`
SpanName string `json:"name" binding:"required"`
SpanID string `json:"span_id" binding:"required"`
ParentID string `json:"parent_id"`
TraceID string `json:"trace_id" binding:"required"`
SessionID uuid.UUID `json:"session_id" binding:"required"`
Status uint8 `json:"status" binding:"required"`
StartTime time.Time `json:"start_time" binding:"required"`
EndTime time.Time `json:"end_time" binding:"required"`
CheckPoints []CheckPointField `json:"checkpoints"`
Attributes SpanAttributes `json:"attributes"`
UserDefinedAttribute event.UDAttribute `json:"user_defined_attribute" binding:"required"`
}

type RootSpanDisplay struct {
Expand Down Expand Up @@ -138,6 +140,7 @@ type SpanDisplay struct {
ThreadName string `json:"thread_name"`
LowPowerModeEnabled bool `json:"device_low_power_mode"`
ThermalThrottlingEnabled bool `json:"device_thermal_throttling_enabled"`
UserDefinedAttribute event.UDAttribute `json:"user_defined_attributes"`
CheckPoints []CheckPointField `json:"checkpoints"`
}

Expand Down Expand Up @@ -427,21 +430,12 @@ func GetSpanInstancesWithFilter(ctx context.Context, spanName string, af *filter
Select("attribute.device_manufacturer").
Select("attribute.device_model").
From("spans").
Clause("prewhere app_id = toUUID(?) and span_name = ? and start_time >= ? and end_time <= ?", af.AppID, spanName, af.From, af.To).
OrderBy("start_time desc")
Clause("prewhere app_id = toUUID(?) and span_name = ? and start_time >= ? and end_time <= ?", af.AppID, spanName, af.From, af.To)

if len(af.SpanStatuses) > 0 {
stmt.Where("status").In(af.SpanStatuses)
}

if af.Limit > 0 {
stmt.Limit(uint64(af.Limit) + 1)
}

if af.Offset >= 0 {
stmt.Offset(uint64(af.Offset))
}

if af.HasVersions() {
selectedVersions, err := af.VersionPairs()
if err != nil {
Expand Down Expand Up @@ -488,6 +482,24 @@ func GetSpanInstancesWithFilter(ctx context.Context, spanName string, af *filter
stmt.Where("attribute.device_name in ?", af.DeviceNames)
}

if af.HasUDExpression() && !af.UDExpression.Empty() {
subQuery := sqlf.From("span_user_def_attrs").
Select("span_id id").
Where("app_id = toUUID(?)", af.AppID)
af.UDExpression.Augment(subQuery)
stmt.Clause("AND span_id in").SubQuery("(", ")", subQuery)
}

stmt.OrderBy("start_time desc")

if af.Limit > 0 {
stmt.Limit(uint64(af.Limit) + 1)
}

if af.Offset >= 0 {
stmt.Offset(uint64(af.Offset))
}

defer stmt.Close()

rows, err := server.Server.ChPool.Query(ctx, stmt.String(), stmt.Args()...)
Expand Down Expand Up @@ -590,6 +602,14 @@ func GetSpanMetricsPlotWithFilter(ctx context.Context, spanName string, af *filt
stmt.Where("device_name in ?", af.DeviceNames)
}

if af.HasUDExpression() && !af.UDExpression.Empty() {
subQuery := sqlf.From("span_user_def_attrs").
Select("span_id id").
Where("app_id = toUUID(?)", af.AppID)
af.UDExpression.Augment(subQuery)
stmt.Clause("AND span_id in").SubQuery("(", ")", subQuery)
}

stmt.GroupBy("app_version, datetime")
stmt.OrderBy("datetime, tupleElement(app_version, 2) desc")

Expand Down Expand Up @@ -639,6 +659,7 @@ func GetTrace(ctx context.Context, traceId string) (trace TraceDisplay, err erro
Select("toString(attribute.thread_name)").
Select("attribute.device_low_power_mode").
Select("attribute.device_thermal_throttling_enabled").
Select("user_defined_attribute").
From("spans").
Where("trace_id = ?", traceId).
OrderBy("start_time desc")
Expand All @@ -654,9 +675,10 @@ func GetTrace(ctx context.Context, traceId string) (trace TraceDisplay, err erro

for rows.Next() {
var rawCheckpoints [][]interface{}
var rawUserDefAttr map[string][]any
span := SpanField{}

if err = rows.Scan(&span.AppID, &span.TraceID, &span.SessionID, &span.Attributes.UserID, &span.SpanID, &span.SpanName, &span.ParentID, &span.StartTime, &span.EndTime, &span.Status, &rawCheckpoints, &span.Attributes.AppVersion, &span.Attributes.AppBuild, &span.Attributes.OSName, &span.Attributes.OSVersion, &span.Attributes.DeviceManufacturer, &span.Attributes.DeviceModel, &span.Attributes.NetworkType, &span.Attributes.ThreadName, &span.Attributes.LowPowerModeEnabled, &span.Attributes.ThermalThrottlingEnabled); err != nil {
if err = rows.Scan(&span.AppID, &span.TraceID, &span.SessionID, &span.Attributes.UserID, &span.SpanID, &span.SpanName, &span.ParentID, &span.StartTime, &span.EndTime, &span.Status, &rawCheckpoints, &span.Attributes.AppVersion, &span.Attributes.AppBuild, &span.Attributes.OSName, &span.Attributes.OSVersion, &span.Attributes.DeviceManufacturer, &span.Attributes.DeviceModel, &span.Attributes.NetworkType, &span.Attributes.ThreadName, &span.Attributes.LowPowerModeEnabled, &span.Attributes.ThermalThrottlingEnabled, &rawUserDefAttr); err != nil {
fmt.Println(err)
return
}
Expand All @@ -665,7 +687,12 @@ func GetTrace(ctx context.Context, traceId string) (trace TraceDisplay, err erro
return
}

// Map rawCheckpoints into CheckPointField
// Map rawUserDefAttr
if len(rawUserDefAttr) > 0 {
span.UserDefinedAttribute.Scan(rawUserDefAttr)
}

// Map rawCheckpoints
for _, cp := range rawCheckpoints {
rawName, _ := cp[0].(string)
name := strings.ReplaceAll(rawName, "\u0000", "")
Expand Down Expand Up @@ -699,6 +726,7 @@ func GetTrace(ctx context.Context, traceId string) (trace TraceDisplay, err erro
span.Attributes.ThreadName,
span.Attributes.LowPowerModeEnabled,
span.Attributes.ThermalThrottlingEnabled,
span.UserDefinedAttribute,
span.CheckPoints,
}

Expand Down
28 changes: 28 additions & 0 deletions backend/cleanup/cleanup/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ func DeleteStaleData(ctx context.Context) {
// delete user defined attributes
deleteUserDefAttrs(ctx, appRetentions)

// delete span user defined attributes
deleteSpanUserDefAttrs(ctx, appRetentions)

// delete sessions
deleteSessions(ctx, appRetentions)

Expand Down Expand Up @@ -260,6 +263,31 @@ func deleteUserDefAttrs(ctx context.Context, retentions []AppRetention) {
}
}

// deleteSpanUserDefAttrs deletes stale span user defined attributes
// for each app's retention threshold.
func deleteSpanUserDefAttrs(ctx context.Context, retentions []AppRetention) {
errCount := 0
for _, retention := range retentions {
stmt := sqlf.
DeleteFrom("span_user_def_attrs").
Where("app_id = toUUID(?)", retention.AppID).
Where("end_of_month < ?", retention.Threshold)

if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil {
errCount += 1
fmt.Printf("Failed to delete stale span user defined attributes for app id %q: %v\n", retention.AppID, err)
stmt.Close()
continue
}

stmt.Close()
}

if errCount < 1 {
fmt.Println("Successfully deleted stale span user defined attributes")
}
}

// deleteSessions deletes stale sessions for each
// app's retention threshold.
func deleteSessions(ctx context.Context, retentions []AppRetention) {
Expand Down
2 changes: 1 addition & 1 deletion frontend/dashboard/app/[teamId]/traces/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export default function TracesOverview({ params }: { params: { teamId: string }
showLocales={true}
showDeviceManufacturers={true}
showDeviceNames={true}
showUdAttrs={false}
showUdAttrs={true}
showFreeText={false}
onFiltersChanged={(updatedFilters) => setFilters(updatedFilters)} />
<div className="py-4" />
Expand Down
1 change: 1 addition & 0 deletions frontend/dashboard/app/api/api_calls.ts
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,7 @@ export const emptyTrace = {
"end_time": "",
"duration": 0,
"thread_name": "",
"user_defined_attributes": null,
"checkpoints": [
{
"name": "",
Expand Down
7 changes: 7 additions & 0 deletions frontend/dashboard/app/components/trace_viz.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ interface Span {
leftOffset?: number
width?: number
visibility?: SpanVisibility
user_defined_attributes?: Map<string, string> | null
checkpoints: Checkpoint[] | null
}

Expand Down Expand Up @@ -289,6 +290,12 @@ const TraceViz: React.FC<TraceVizProps> = ({ inputTrace }) => {
<p className={keyStyle}>Span Status</p>
<p className={`${valueStyle} ${selectedSpan.status === 1 ? "text-green-300" : selectedSpan.status === 2 ? "text-red-300" : ""}`}> {selectedSpan.status === 0 ? "Unset" : selectedSpan.status === 1 ? "Okay" : "Error"}</p>
</div>
{selectedSpan.user_defined_attributes !== null && selectedSpan.user_defined_attributes !== undefined && Object.entries(selectedSpan.user_defined_attributes!).map(([attrKey, attrValue]) => (
<div className="flex flex-row mt-1" key={attrKey}>
<p className={keyStyle}>{attrKey}</p>
<p className={valueStyle}>{attrValue?.toString()}</p>
</div>
))}
<div className='flex flex-row mt-1'>
<p className={keyStyle}>Checkpoints</p>
<p className={`${valueStyle}`}>{selectedSpan.checkpoints !== null && selectedSpan.checkpoints.length > 0 ? ": " : ": []"}</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ const UserDefAttrSelector: React.FC<UserDefAttrSelectorProps> = ({ attrs, ops, o
</div>

{isOpen && (
<div className="z-50 origin-top-right absolute right-0 mt-2 w-[600px] max-h-96 overflow-auto rounded-md shadow-lg ring-1 ring-black ring-opacity-5">
<div className={`z-50 origin-top-right absolute mt-2 w-[600px] max-h-96 overflow-auto rounded-md shadow-lg ring-1 ring-black ring-opacity-5 ${dropdownRef.current && dropdownRef.current.getBoundingClientRect().left > window.innerWidth / 2 ? 'right-0' : 'left-0'}`}>
<div
role="menu"
aria-orientation="vertical"
Expand Down
15 changes: 15 additions & 0 deletions self-host/clickhouse/20250204070350_alter_spans_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- migrate:up
alter table spans
add column if not exists user_defined_attribute Map(LowCardinality(String), Tuple(Enum('string' = 1, 'int64', 'float64', 'bool'), String)) codec(ZSTD(3)) after `attribute.device_thermal_throttling_enabled`,
comment column if exists user_defined_attribute 'user defined attributes',
add index if not exists user_defined_attribute_key_bloom_idx mapKeys(user_defined_attribute) type bloom_filter(0.01) granularity 16,
add index if not exists user_defined_attribute_key_minmax_idx mapKeys(user_defined_attribute) type minmax granularity 16,
materialize index if exists user_defined_attribute_key_bloom_idx,
materialize index if exists user_defined_attribute_key_minmax_idx;


-- migrate:down
alter table spans
drop column if exists user_defined_attribute,
drop index if exists user_defined_attribute_key_bloom_idx,
drop index if exists user_defined_attribute_key_minmax_idx;
27 changes: 27 additions & 0 deletions self-host/clickhouse/20250204070357_create_span_user_def_attrs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- migrate:up
create table if not exists span_user_def_attrs
(
`app_id` UUID not null comment 'associated app id' codec(LZ4),
`span_id` FixedString(16) not null comment 'id of the span' codec(ZSTD(3)),
`session_id` UUID not null comment 'id of the session' codec(LZ4),
`end_of_month` DateTime not null comment 'last day of the month' codec(DoubleDelta, ZSTD(3)),
`app_version` Tuple(LowCardinality(String), LowCardinality(String)) not null comment 'composite app version' codec(ZSTD(3)),
`os_version` Tuple(LowCardinality(String), LowCardinality(String)) comment 'composite os version' codec (ZSTD(3)),
`key` LowCardinality(String) comment 'key of the user defined attribute' codec (ZSTD(3)),
`type` Enum('string' = 1, 'int64', 'float64', 'bool') comment 'type of the user defined attribute' codec (ZSTD(3)),
`value` String comment 'value of the user defined attribute' codec (ZSTD(3)),
index end_of_month_minmax_idx end_of_month type minmax granularity 2,
index key_bloom_idx key type bloom_filter(0.05) granularity 1,
index key_set_idx key type set(1000) granularity 2,
index session_bloom_idx session_id type bloom_filter granularity 2
)
engine = ReplacingMergeTree
partition by toYYYYMM(end_of_month)
order by (app_id, end_of_month, app_version, os_version, key, type, value, span_id, session_id)
settings index_granularity = 8192
comment 'derived span user defined attributes';


-- migrate:down
drop table if exists span_user_def_attrs;

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- migrate:up
create materialized view span_user_def_attrs_mv to span_user_def_attrs as
select distinct app_id,
span_id,
session_id,
toLastDayOfMonth(start_time) as end_of_month,
attribute.app_version as app_version,
attribute.os_version as os_version,
arr_key as key,
tupleElement(arr_val, 1) as type,
tupleElement(arr_val, 2) as value
from spans
array join
mapKeys(user_defined_attribute) as arr_key,
mapValues(user_defined_attribute) as arr_val
where length(user_defined_attribute) > 0
group by app_id, end_of_month, app_version, os_version,
key, type, value, span_id, session_id
order by app_id;


-- migrate:down
drop view if exists span_user_def_attrs_mv;
Loading

0 comments on commit d29f5cb

Please sign in to comment.