Skip to content

Commit

Permalink
apply @RomainMuller suggestions
Browse files Browse the repository at this point in the history
Signed-off-by: Eliott Bouhana <[email protected]>
  • Loading branch information
eliottness committed Feb 3, 2025
1 parent cd86428 commit 373e2fd
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 26 deletions.
39 changes: 30 additions & 9 deletions internal/newtelemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,22 @@ func (c *client) Config() ClientConfig {
return c.clientConfig
}

// Flush collects the payloads of all the data sources and sends them by calling flush.
// This function is called by the flushTicker so it should not panic, or it will crash the whole customer application.
// If a panic occurs, we stop the telemetry and log the error.
func (c *client) Flush() {
var payloads []transport.Payload
defer func() {
r := recover()
if r == nil {
return
}
log.Warn("panic while flushing telemetry data, stopping telemetry: %v", r)
if gc, ok := GlobalClient().(*client); ok && gc == c {
SwapClient(nil)
}
}()

payloads := make([]transport.Payload, 0, 8)
for _, ds := range c.dataSources {
if payload := ds.Payload(); payload != nil {
payloads = append(payloads, payload)
Expand All @@ -203,21 +217,28 @@ func (c *client) Flush() {
_, _ = c.flush(payloads)
}

// transform passes the given payloads through the client's current flush mapper
// while holding flushMapperMu. The mapper returned by Transform replaces
// c.flushMapper, and the transformed payloads are handed back to the caller.
func (c *client) transform(payloads []transport.Payload) []transport.Payload {
	c.flushMapperMu.Lock()
	defer c.flushMapperMu.Unlock()

	// Transform yields both the rewritten payloads and the mapper to use next.
	transformed, nextMapper := c.flushMapper.Transform(payloads)
	c.flushMapper = nextMapper
	return transformed
}

// flush sends all the data sources to the writer by letting them flow through the given transformer function.
// The transformer function is used to transform the bodies before sending them to the writer.
func (c *client) flush(payloads []transport.Payload) (int, error) {
// Transform the bodies
{
c.flushMapperMu.Lock()
payloads, c.flushMapper = c.flushMapper.Transform(payloads)
c.flushMapperMu.Unlock()
}
payloads = c.transform(payloads)

if c.payloadQueue.IsEmpty() && len(payloads) == 0 {
c.flushTicker.DecreaseSpeed()
return 0, nil
}

if c.payloadQueue.IsEmpty() {
c.flushTicker.CanDecreaseSpeed()
} else if c.payloadQueue.IsFull() {
c.flushTicker.CanIncreaseSpeed()
}

// We enqueue the new payloads to preserve the order of the payloads
c.payloadQueue.Enqueue(payloads...)
payloads = c.payloadQueue.Flush()
Expand All @@ -243,7 +264,7 @@ func (c *client) flush(payloads []transport.Payload) (int, error) {

if !speedIncreased && successfulCall.PayloadByteSize > c.clientConfig.EarlyFlushPayloadSize {
// We increase the speed of the flushTicker to try to flush the remaining bodies faster as we are at risk of sending too large bodies to the backend
c.flushTicker.IncreaseSpeed()
c.flushTicker.CanIncreaseSpeed()
speedIncreased = true
}

Expand Down
14 changes: 7 additions & 7 deletions internal/newtelemetry/client_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,9 @@ type ClientConfig struct {
FlushInterval Range[time.Duration]

// PayloadQueueSize is the size of the payload queue.
// This means that, by default, we incur dataloss if we spend ~30mins without flushing, considering we send telemetry data this looks reasonable.
// This also means that in the worst case scenario, memory-wise, the app is stabilized after running for 30mins.
// Ideally both values should be power of 2 because of the way the ring queue is implemented as it's growing
PayloadQueueSize Range[int]

// DistributionsSize is the size of the distribution queue.
// Default max size is a 2^14 array of float64 (2^3 bytes) which makes a distribution a 128KB array _at worst_.
// Considering we add a point per user request on a simple http server, we would be losing data after 2^14 requests per minute or about 280 requests per second or under 3ms per request.
// If this throughput is constant, the telemetry client flush ticker speed will increase to, at best, double twice to flush 15 seconds of data each time.
// Which will bring our max throughput to 1100 points per second or about 750µs per request.
DistributionsSize Range[int]

// Debug enables debug mode for the telemetry client and sends it to the backend so it logs the request
Expand Down Expand Up @@ -114,12 +107,19 @@ var (
maxPayloadSize = 5 * 1024 * 1024 // 5MB

// TODO: tweak this value once we get real telemetry data from the telemetry client
// This means that, by default, we incur dataloss if we spend ~30mins without flushing, considering we send telemetry data this looks reasonable.
// This also means that in the worst case scenario, memory-wise, the app is stabilized after running for 30mins.
// Ideally both values should be power of 2 because of the way the ring queue is implemented as it's growing
defaultPayloadQueueSize = Range[int]{
Min: 4,
Max: 32,
}

// TODO: tweak this value once we get telemetry data from the telemetry client
// Default max size is a 2^14 array of float64 (2^3 bytes) which makes a distribution a 128KB array _at worst_.
// Considering we add a point per user request on a simple http server, we would be losing data after 2^14 requests per minute or about 280 requests per second or under 3ms per request.
// If this throughput is constant, the telemetry client flush ticker speed will increase to, at best, double twice to flush 15 seconds of data each time.
// Which will bring our max throughput to 1100 points per second or about 750µs per request.
distributionsSize = Range[int]{
Min: 1 << 8,
Max: 1 << 14,
Expand Down
6 changes: 3 additions & 3 deletions internal/newtelemetry/internal/mapper/app_started.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ func (t *appStartedReducer) Transform(payloads []transport.Payload) ([]transport

payloadLefts := make([]transport.Payload, 0, len(payloads))
for _, payload := range payloads {
switch payload.(type) {
switch payload := payload.(type) {
case transport.AppClientConfigurationChange:
appStarted.Configuration = payload.(transport.AppClientConfigurationChange).Configuration
appStarted.Configuration = payload.Configuration
case transport.AppProductChange:
appStarted.Products = payload.(transport.AppProductChange).Products
appStarted.Products = payload.Products
default:
payloadLefts = append(payloadLefts, payload)
}
Expand Down
8 changes: 4 additions & 4 deletions internal/newtelemetry/internal/mapper/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,17 @@ func (t *heartbeatEnricher) Transform(payloads []transport.Payload) ([]transport
// Composition described here:
// https://github.com/DataDog/instrumentation-telemetry-api-docs/blob/main/GeneratedDocumentation/ApiDocs/v2/producing-telemetry.md#app-extended-heartbeat
for _, payload := range payloads {
switch p := payload.(type) {
switch payload := payload.(type) {
case transport.AppStarted:
// Should be sent only once anyway
t.extendedHeartbeat.Configuration = p.Configuration
t.extendedHeartbeat.Configuration = payload.Configuration
case transport.AppDependenciesLoaded:
if t.extendedHeartbeat.Dependencies == nil {
t.extendedHeartbeat.Dependencies = p.Dependencies
t.extendedHeartbeat.Dependencies = payload.Dependencies
}
case transport.AppIntegrationChange:
// The number of integrations should be small enough so we can just append to the list
t.extendedHeartbeat.Integrations = append(t.extendedHeartbeat.Integrations, p.Integrations...)
t.extendedHeartbeat.Integrations = append(t.extendedHeartbeat.Integrations, payload.Integrations...)
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/newtelemetry/internal/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ func NewTicker(tickFunc TickFunc, minInterval, maxInterval time.Duration) *Ticke
return ticker
}

func (t *Ticker) IncreaseSpeed() {
func (t *Ticker) CanIncreaseSpeed() {
t.tickSpeedMu.Lock()
defer t.tickSpeedMu.Unlock()

t.tickSpeed = max(t.tickSpeed/2, t.minInterval)
t.Reset(t.tickSpeed)
}

func (t *Ticker) DecreaseSpeed() {
func (t *Ticker) CanDecreaseSpeed() {
t.tickSpeedMu.Lock()
defer t.tickSpeedMu.Unlock()

Expand Down
1 change: 1 addition & 0 deletions internal/newtelemetry/internal/transport/body.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Body struct {
Host Host `json:"host"`
}

// UnmarshalJSON is used to test the telemetry client end to end
func (b *Body) UnmarshalJSON(bytes []byte) error {
var anyMap map[string]json.RawMessage
var err error
Expand Down
2 changes: 1 addition & 1 deletion internal/newtelemetry/internal/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (w *writer) Flush(payload transport.Payload) ([]EndpointRequestResult, erro
defer response.Body.Close()

if response.StatusCode >= 300 || response.StatusCode < 200 {
respBodyBytes, _ := io.ReadAll(response.Body) // maybe we can find an error reason in the response body
respBodyBytes, _ := io.ReadAll(io.LimitReader(response.Body, 256)) // maybe we can find an error reason in the response body
results = append(results, EndpointRequestResult{Error: &WriterStatusCodeError{
Status: response.Status,
Body: string(respBodyBytes),
Expand Down

0 comments on commit 373e2fd

Please sign in to comment.