Skip to content

Commit 58d980e

Browse files
[FEATURE] [internal] adds metrics for ingestion phases (#351)
* adds metrics for ingestion phases * fixes formatting for metrics * removes current account counters * adds counter for operations processed
1 parent c4968d6 commit 58d980e

File tree

6 files changed

+303
-14
lines changed

6 files changed

+303
-14
lines changed

internal/indexer/indexer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type IndexerBufferInterface interface {
2525
GetTransactionsParticipants() map[string]set.Set[string]
2626
GetOperationsParticipants() map[int64]set.Set[string]
2727
GetNumberOfTransactions() int
28+
GetNumberOfOperations() int
2829
GetTransactions() []types.Transaction
2930
GetOperations() []types.Operation
3031
GetStateChanges() []types.StateChange

internal/indexer/indexer_buffer.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,15 @@ func (b *IndexerBuffer) GetNumberOfTransactions() int {
105105
return len(b.txByHash)
106106
}
107107

108+
// GetNumberOfOperations returns the count of unique operations in the buffer.
109+
// Thread-safe: uses read lock.
110+
func (b *IndexerBuffer) GetNumberOfOperations() int {
111+
b.mu.RLock()
112+
defer b.mu.RUnlock()
113+
114+
return len(b.opByID)
115+
}
116+
108117
// GetTransactions returns all unique transactions.
109118
// Thread-safe: uses read lock.
110119
func (b *IndexerBuffer) GetTransactions() []types.Transaction {

internal/metrics/metrics.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ type MetricsService interface {
3636
ObserveDBTransactionDuration(status string, duration float64)
3737
ObserveDBBatchSize(operation, table string, size int)
3838
IncSignatureVerificationExpired(expiredSeconds float64)
39+
// Ingestion Phase Metrics
40+
ObserveIngestionPhaseDuration(phase string, duration float64)
41+
IncIngestionLedgersProcessed(count int)
42+
IncIngestionTransactionsProcessed(count int)
43+
IncIngestionOperationsProcessed(count int)
44+
ObserveIngestionBatchSize(size int)
45+
ObserveIngestionParticipantsCount(count int)
3946
// GraphQL Metrics
4047
ObserveGraphQLFieldDuration(operationName, fieldName string, duration float64)
4148
IncGraphQLField(operationName, fieldName string, success bool)
@@ -83,6 +90,14 @@ type metricsService struct {
8390
// Signature Verification Metrics
8491
signatureVerificationExpired *prometheus.CounterVec
8592

93+
// Ingestion Phase Metrics
94+
ingestionPhaseDuration *prometheus.SummaryVec
95+
ingestionLedgersProcessed prometheus.Counter
96+
ingestionTransactionsTotal prometheus.Counter
97+
ingestionOperationsTotal prometheus.Counter
98+
ingestionBatchSize prometheus.Histogram
99+
ingestionParticipantsCount prometheus.Histogram
100+
86101
// GraphQL Metrics
87102
graphqlFieldDuration *prometheus.SummaryVec
88103
graphqlFieldsTotal *prometheus.CounterVec
@@ -261,6 +276,48 @@ func NewMetricsService(db *sqlx.DB) MetricsService {
261276
[]string{"expired_seconds"},
262277
)
263278

279+
// Ingestion Phase Metrics
280+
m.ingestionPhaseDuration = prometheus.NewSummaryVec(
281+
prometheus.SummaryOpts{
282+
Name: "ingestion_phase_duration_seconds",
283+
Help: "Duration of each ingestion phase",
284+
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
285+
},
286+
[]string{"phase"},
287+
)
288+
m.ingestionLedgersProcessed = prometheus.NewCounter(
289+
prometheus.CounterOpts{
290+
Name: "ingestion_ledgers_processed_total",
291+
Help: "Total number of ledgers processed during ingestion",
292+
},
293+
)
294+
m.ingestionTransactionsTotal = prometheus.NewCounter(
295+
prometheus.CounterOpts{
296+
Name: "ingestion_transactions_processed_total",
297+
Help: "Total number of transactions processed during ingestion",
298+
},
299+
)
300+
m.ingestionOperationsTotal = prometheus.NewCounter(
301+
prometheus.CounterOpts{
302+
Name: "ingestion_operations_processed_total",
303+
Help: "Total number of operations processed during ingestion",
304+
},
305+
)
306+
m.ingestionBatchSize = prometheus.NewHistogram(
307+
prometheus.HistogramOpts{
308+
Name: "ingestion_batch_size",
309+
Help: "Number of ledgers processed per ingestion batch",
310+
Buckets: prometheus.ExponentialBuckets(1, 2, 8), // 1, 2, 4, 8, 16, 32, 64, 128
311+
},
312+
)
313+
m.ingestionParticipantsCount = prometheus.NewHistogram(
314+
prometheus.HistogramOpts{
315+
Name: "ingestion_participants_count",
316+
Help: "Number of unique participants per ingestion batch",
317+
Buckets: prometheus.ExponentialBuckets(1, 2, 12), // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048
318+
},
319+
)
320+
264321
// GraphQL Metrics
265322
m.graphqlFieldDuration = prometheus.NewSummaryVec(
266323
prometheus.SummaryOpts{
@@ -322,6 +379,12 @@ func (m *metricsService) registerMetrics() {
322379
m.dbTxnDuration,
323380
m.dbBatchSize,
324381
m.signatureVerificationExpired,
382+
m.ingestionPhaseDuration,
383+
m.ingestionLedgersProcessed,
384+
m.ingestionTransactionsTotal,
385+
m.ingestionOperationsTotal,
386+
m.ingestionBatchSize,
387+
m.ingestionParticipantsCount,
325388
m.graphqlFieldDuration,
326389
m.graphqlFieldsTotal,
327390
m.graphqlComplexity,
@@ -514,6 +577,31 @@ func (m *metricsService) IncSignatureVerificationExpired(expiredSeconds float64)
514577
m.signatureVerificationExpired.WithLabelValues(fmt.Sprintf("%fs", expiredSeconds)).Inc()
515578
}
516579

580+
// Ingestion Phase Metrics
581+
func (m *metricsService) ObserveIngestionPhaseDuration(phase string, duration float64) {
582+
m.ingestionPhaseDuration.WithLabelValues(phase).Observe(duration)
583+
}
584+
585+
func (m *metricsService) IncIngestionLedgersProcessed(count int) {
586+
m.ingestionLedgersProcessed.Add(float64(count))
587+
}
588+
589+
func (m *metricsService) IncIngestionTransactionsProcessed(count int) {
590+
m.ingestionTransactionsTotal.Add(float64(count))
591+
}
592+
593+
func (m *metricsService) IncIngestionOperationsProcessed(count int) {
594+
m.ingestionOperationsTotal.Add(float64(count))
595+
}
596+
597+
func (m *metricsService) ObserveIngestionBatchSize(size int) {
598+
m.ingestionBatchSize.Observe(float64(size))
599+
}
600+
601+
func (m *metricsService) ObserveIngestionParticipantsCount(count int) {
602+
m.ingestionParticipantsCount.Observe(float64(count))
603+
}
604+
517605
// GraphQL Metrics
518606
func (m *metricsService) ObserveGraphQLFieldDuration(operationName, fieldName string, duration float64) {
519607
m.graphqlFieldDuration.WithLabelValues(operationName, fieldName).Observe(duration)

internal/metrics/metrics_test.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,153 @@ func TestDBBatchSizeMetrics(t *testing.T) {
461461
})
462462
}
463463

464+
func TestIngestionPhaseMetrics(t *testing.T) {
465+
db := setupTestDB(t)
466+
defer db.Close()
467+
468+
ms := NewMetricsService(db)
469+
470+
t.Run("ingestion phase duration metrics", func(t *testing.T) {
471+
// Record durations for different phases
472+
ms.ObserveIngestionPhaseDuration("fetch_ledgers", 0.5)
473+
ms.ObserveIngestionPhaseDuration("collect_transaction_data", 1.2)
474+
ms.ObserveIngestionPhaseDuration("fetch_existing_accounts", 0.3)
475+
ms.ObserveIngestionPhaseDuration("process_and_buffer", 2.1)
476+
ms.ObserveIngestionPhaseDuration("merge_buffers", 0.1)
477+
ms.ObserveIngestionPhaseDuration("db_insertion", 1.5)
478+
479+
metricFamilies, err := ms.GetRegistry().Gather()
480+
require.NoError(t, err)
481+
482+
found := false
483+
for _, mf := range metricFamilies {
484+
if mf.GetName() == "ingestion_phase_duration_seconds" {
485+
found = true
486+
// Should have 6 metrics (one for each phase)
487+
assert.Equal(t, 6, len(mf.GetMetric()))
488+
489+
// Verify each phase is recorded
490+
phaseLabels := make(map[string]bool)
491+
for _, metric := range mf.GetMetric() {
492+
labels := make(map[string]string)
493+
for _, label := range metric.GetLabel() {
494+
labels[label.GetName()] = label.GetValue()
495+
}
496+
phaseLabels[labels["phase"]] = true
497+
assert.Equal(t, uint64(1), metric.GetSummary().GetSampleCount())
498+
}
499+
500+
assert.True(t, phaseLabels["fetch_ledgers"])
501+
assert.True(t, phaseLabels["collect_transaction_data"])
502+
assert.True(t, phaseLabels["fetch_existing_accounts"])
503+
assert.True(t, phaseLabels["process_and_buffer"])
504+
assert.True(t, phaseLabels["merge_buffers"])
505+
assert.True(t, phaseLabels["db_insertion"])
506+
}
507+
}
508+
assert.True(t, found)
509+
})
510+
511+
t.Run("ingestion ledgers processed counter", func(t *testing.T) {
512+
ms.IncIngestionLedgersProcessed(10)
513+
ms.IncIngestionLedgersProcessed(5)
514+
515+
metricFamilies, err := ms.GetRegistry().Gather()
516+
require.NoError(t, err)
517+
518+
found := false
519+
for _, mf := range metricFamilies {
520+
if mf.GetName() == "ingestion_ledgers_processed_total" {
521+
found = true
522+
metric := mf.GetMetric()[0]
523+
assert.Equal(t, float64(15), metric.GetCounter().GetValue())
524+
}
525+
}
526+
assert.True(t, found)
527+
})
528+
529+
t.Run("ingestion transactions processed counter", func(t *testing.T) {
530+
ms.IncIngestionTransactionsProcessed(100)
531+
ms.IncIngestionTransactionsProcessed(50)
532+
533+
metricFamilies, err := ms.GetRegistry().Gather()
534+
require.NoError(t, err)
535+
536+
found := false
537+
for _, mf := range metricFamilies {
538+
if mf.GetName() == "ingestion_transactions_processed_total" {
539+
found = true
540+
metric := mf.GetMetric()[0]
541+
assert.Equal(t, float64(150), metric.GetCounter().GetValue())
542+
}
543+
}
544+
assert.True(t, found)
545+
})
546+
547+
t.Run("ingestion operations processed counter", func(t *testing.T) {
548+
ms.IncIngestionOperationsProcessed(200)
549+
ms.IncIngestionOperationsProcessed(75)
550+
551+
metricFamilies, err := ms.GetRegistry().Gather()
552+
require.NoError(t, err)
553+
554+
found := false
555+
for _, mf := range metricFamilies {
556+
if mf.GetName() == "ingestion_operations_processed_total" {
557+
found = true
558+
metric := mf.GetMetric()[0]
559+
assert.Equal(t, float64(275), metric.GetCounter().GetValue())
560+
}
561+
}
562+
assert.True(t, found)
563+
})
564+
565+
t.Run("ingestion batch size histogram", func(t *testing.T) {
566+
// Record various batch sizes
567+
ms.ObserveIngestionBatchSize(1)
568+
ms.ObserveIngestionBatchSize(10)
569+
ms.ObserveIngestionBatchSize(50)
570+
ms.ObserveIngestionBatchSize(25)
571+
572+
metricFamilies, err := ms.GetRegistry().Gather()
573+
require.NoError(t, err)
574+
575+
found := false
576+
for _, mf := range metricFamilies {
577+
if mf.GetName() == "ingestion_batch_size" {
578+
found = true
579+
metric := mf.GetMetric()[0]
580+
histogram := metric.GetHistogram()
581+
assert.Equal(t, uint64(4), histogram.GetSampleCount())
582+
assert.Equal(t, float64(86), histogram.GetSampleSum()) // 1 + 10 + 50 + 25 = 86
583+
}
584+
}
585+
assert.True(t, found)
586+
})
587+
588+
t.Run("ingestion participants count histogram", func(t *testing.T) {
589+
// Record various participant counts
590+
ms.ObserveIngestionParticipantsCount(5)
591+
ms.ObserveIngestionParticipantsCount(100)
592+
ms.ObserveIngestionParticipantsCount(500)
593+
594+
metricFamilies, err := ms.GetRegistry().Gather()
595+
require.NoError(t, err)
596+
597+
found := false
598+
for _, mf := range metricFamilies {
599+
if mf.GetName() == "ingestion_participants_count" {
600+
found = true
601+
metric := mf.GetMetric()[0]
602+
histogram := metric.GetHistogram()
603+
assert.Equal(t, uint64(3), histogram.GetSampleCount())
604+
assert.Equal(t, float64(605), histogram.GetSampleSum()) // 5 + 100 + 500 = 605
605+
}
606+
}
607+
assert.True(t, found)
608+
})
609+
}
610+
464611
func TestPoolMetrics(t *testing.T) {
465612
db := setupTestDB(t)
466613
defer db.Close()

internal/metrics/mocks.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,32 @@ func (m *MockMetricsService) IncSignatureVerificationExpired(expiredSeconds floa
113113
m.Called(expiredSeconds)
114114
}
115115

116+
// Ingestion Phase Metrics
117+
func (m *MockMetricsService) ObserveIngestionPhaseDuration(phase string, duration float64) {
118+
m.Called(phase, duration)
119+
}
120+
121+
func (m *MockMetricsService) IncIngestionLedgersProcessed(count int) {
122+
m.Called(count)
123+
}
124+
125+
func (m *MockMetricsService) IncIngestionTransactionsProcessed(count int) {
126+
m.Called(count)
127+
}
128+
129+
func (m *MockMetricsService) IncIngestionOperationsProcessed(count int) {
130+
m.Called(count)
131+
}
132+
133+
func (m *MockMetricsService) ObserveIngestionBatchSize(size int) {
134+
m.Called(size)
135+
}
136+
137+
func (m *MockMetricsService) ObserveIngestionParticipantsCount(count int) {
138+
m.Called(count)
139+
}
140+
141+
// GraphQL Metrics
116142
func (m *MockMetricsService) ObserveGraphQLFieldDuration(operationName, fieldName string, duration float64) {
117143
m.Called(operationName, fieldName, duration)
118144
}

0 commit comments

Comments
 (0)