Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions instrumentation/base/lib/opentelemetry/instrumentation/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#
# SPDX-License-Identifier: Apache-2.0

require 'opentelemetry/instrumentation/dynamic_validator'

module OpenTelemetry
module Instrumentation
# The Base class holds all metadata and configuration for an
Expand Down Expand Up @@ -72,6 +74,16 @@ class << self

private_constant :NAME_REGEX, :VALIDATORS

def get_validation_type(validation)
if VALIDATORS[validation] || validation.is_a?(DynamicValidator)
validation
elsif validation.respond_to?(:include?)
:enum
else
:callable
end
end

private :new

def inherited(subclass) # rubocop:disable Lint/MissingSuper
Expand Down Expand Up @@ -142,22 +154,17 @@ def compatible(&blk)
# and a validation callable to be provided.
# @param [String] name The name of the configuration option
# @param default The default value to be used, or to used if validation fails
# @param [Callable, Symbol] validate Accepts a callable or a symbol that matches
# a key in the VALIDATORS hash. The supported keys are, :array, :boolean,
# :callable, :integer, :string.
# @param [Callable, Symbol] validate Accepts a callable, Enumerable,
# DynamicValidator, or a symbol that matches a key in the VALIDATORS
# hash. The supported keys are, :array, :boolean, :callable, :integer,
# :string.
def option(name, default:, validate:)
validator = VALIDATORS[validate] || validate
raise ArgumentError, "validate must be #{VALIDATORS.keys.join(', ')}, or a callable" unless validator.respond_to?(:call) || validator.respond_to?(:include?)
raise ArgumentError, "validate must be #{VALIDATORS.keys.join(', ')}, or a callable" unless validator.respond_to?(:call) || validator.respond_to?(:include?) || validator.is_a?(DynamicValidator)

@options ||= []

validation_type = if VALIDATORS[validate]
validate
elsif validate.respond_to?(:include?)
:enum
else
:callable
end
validation_type = get_validation_type(validate)

@options << { name: name, default: default, validator: validator, validation_type: validation_type }
end
Expand Down Expand Up @@ -277,17 +284,25 @@ def config_options(user_config)
option_name = option[:name]
config_value = user_config[option_name]
config_override = coerce_env_var(config_overrides[option_name], option[:validation_type]) if config_overrides[option_name]
static_validator =
if option[:validator].is_a?(DynamicValidator)
option[:validator].static_validation
else
option[:validator]
end

# rubocop:disable Lint/DuplicateBranch
value = if config_value.nil? && config_override.nil?
option[:default]
elsif option[:validator].respond_to?(:include?) && option[:validator].include?(config_override)
elsif static_validator.respond_to?(:include?) && static_validator.include?(config_override)
config_override
elsif option[:validator].respond_to?(:include?) && option[:validator].include?(config_value)
elsif static_validator.respond_to?(:include?) && static_validator.include?(config_value)
config_value
elsif option[:validator].respond_to?(:call) && option[:validator].call(config_override)
elsif static_validator.respond_to?(:call) && static_validator.call(config_override)
config_override
elsif option[:validator].respond_to?(:call) && option[:validator].call(config_value)
elsif static_validator.respond_to?(:call) && static_validator.call(config_value)
config_value
elsif option[:validator].is_a?(DynamicValidator) && config_value.respond_to?(:call)
config_value
else
OpenTelemetry.logger.warn(
Expand All @@ -300,6 +315,7 @@ def config_options(user_config)

h[option_name] = value
rescue StandardError => e
pp e
OpenTelemetry.handle_error(exception: e, message: "Instrumentation #{name} unexpected configuration error")
h[option_name] = option[:default]
end
Expand Down Expand Up @@ -378,6 +394,8 @@ def coerce_env_var(env_var, validation_type)
"configurable using environment variables. Ignoring raw value: #{env_var}"
)
nil
when DynamicValidator
coerce_env_var(env_var, self.class.get_validation_type(validation_type.static_validation))
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module Instrumentation
# Can wrap a static validation in this class to allow users to
# alternatively invoke a callable.
class DynamicValidator
def initialize(static_validation)
raise ArgumentError, 'static_validation cannot be dynamic' if static_validation.is_a?(self.class)

@static_validation = static_validation
end

attr_reader :static_validation
end
end
end
67 changes: 59 additions & 8 deletions instrumentation/base/test/instrumentation/base_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def initialize(*args)
option(:third, default: 1, validate: ->(v) { v <= 10 })
option(:forth, default: false, validate: :boolean)
option(:fifth, default: true, validate: :boolean)
option(:sixth, default: :cool, validate: OpenTelemetry::Instrumentation::DynamicValidator.new(%i[cool beans man]))
end
end

Expand All @@ -208,42 +209,42 @@ def initialize(*args)
it 'installs options defined by environment variable and overrides defaults' do
OpenTelemetry::TestHelpers.with_env('OTEL_RUBY_INSTRUMENTATION_ENV_CONTROLLED_CONFIG_OPTS' => 'first=non_default_value') do
instance.install
_(instance.config).must_equal(first: 'non_default_value', second: :no, third: 1, forth: false, fifth: true)
_(instance.config).must_equal(first: 'non_default_value', second: :no, third: 1, forth: false, fifth: true, sixth: :cool)
end
end

it 'installs boolean type options defined by environment variable and only evalutes the lowercase string "true" to be truthy' do
OpenTelemetry::TestHelpers.with_env('OTEL_RUBY_INSTRUMENTATION_ENV_CONTROLLED_CONFIG_OPTS' => 'first=non_default_value;forth=true;fifth=truthy') do
instance.install
_(instance.config).must_equal(first: 'non_default_value', second: :no, third: 1, forth: true, fifth: false)
_(instance.config).must_equal(first: 'non_default_value', second: :no, third: 1, forth: true, fifth: false, sixth: :cool)
end
end

it 'installs only enum options defined by environment variable that accept a symbol' do
OpenTelemetry::TestHelpers.with_env('OTEL_RUBY_INSTRUMENTATION_ENV_CONTROLLED_CONFIG_OPTS' => 'second=maybe') do
instance.install
_(instance.config).must_equal(first: 'first_default', second: :maybe, third: 1, forth: false, fifth: true)
_(instance.config).must_equal(first: 'first_default', second: :maybe, third: 1, forth: false, fifth: true, sixth: :cool)
end
end

it 'installs options defined by environment variable and overrides local configuration' do
OpenTelemetry::TestHelpers.with_env('OTEL_RUBY_INSTRUMENTATION_ENV_CONTROLLED_CONFIG_OPTS' => 'first=non_default_value') do
instance.install(first: 'another_default')
_(instance.config).must_equal(first: 'non_default_value', second: :no, third: 1, forth: false, fifth: true)
_(instance.config).must_equal(first: 'non_default_value', second: :no, third: 1, forth: false, fifth: true, sixth: :cool)
end
end

it 'installs multiple options defined by environment variable' do
OpenTelemetry::TestHelpers.with_env('OTEL_RUBY_INSTRUMENTATION_ENV_CONTROLLED_CONFIG_OPTS' => 'first=non_default_value;second=maybe') do
OpenTelemetry::TestHelpers.with_env('OTEL_RUBY_INSTRUMENTATION_ENV_CONTROLLED_CONFIG_OPTS' => 'first=non_default_value;second=maybe;sixth=beans') do
instance.install(first: 'another_default', second: :yes)
_(instance.config).must_equal(first: 'non_default_value', second: :maybe, third: 1, forth: false, fifth: true)
_(instance.config).must_equal(first: 'non_default_value', second: :maybe, third: 1, forth: false, fifth: true, sixth: :beans)
end
end

it 'does not install callable options defined by environment variable' do
OpenTelemetry::TestHelpers.with_env('OTEL_RUBY_INSTRUMENTATION_ENV_CONTROLLED_CONFIG_OPTS' => 'first=non_default_value;second=maybe;third=5') do
OpenTelemetry::TestHelpers.with_env('OTEL_RUBY_INSTRUMENTATION_ENV_CONTROLLED_CONFIG_OPTS' => 'first=non_default_value;second=maybe;third=5;sixth=beans') do
instance.install(first: 'another_default', second: :yes)
_(instance.config).must_equal(first: 'non_default_value', second: :maybe, third: 1, forth: false, fifth: true)
_(instance.config).must_equal(first: 'non_default_value', second: :maybe, third: 1, forth: false, fifth: true, sixth: :beans)
end
end
end
Expand Down Expand Up @@ -317,6 +318,56 @@ def initialize(*args)
end
end
end

describe 'when there is an option with a DynamicValidator (wrapping an enum) validation type' do
after do
# Force re-install of instrumentation
instance.instance_variable_set(:@installed, false)
end

let(:enum_instrumentation) do
Class.new(OpenTelemetry::Instrumentation::Base) do
instrumentation_name 'opentelemetry_instrumentation_enum'
instrumentation_version '0.0.2'

present { true }
compatible { true }
install { true }

option(:first, default: :no, validate: OpenTelemetry::Instrumentation::DynamicValidator.new(%I[yes no maybe]))
option(:second, default: :no, validate: OpenTelemetry::Instrumentation::DynamicValidator.new(%I[yes no maybe]))
end
end

let(:instance) { enum_instrumentation.instance }

it 'falls back to the default if user option is not an enumerable option' do
instance.install(first: :yes, second: :perhaps)
_(instance.config).must_equal(first: :yes, second: :no)
end

it 'installs options defined by environment variable and overrides defaults and user config' do
OpenTelemetry::TestHelpers.with_env('OTEL_RUBY_INSTRUMENTATION_ENUM_CONFIG_OPTS' => 'first=yes') do
instance.install(first: :maybe, second: :no)
_(instance.config).must_equal(first: :yes, second: :no)
end
end

it 'falls back to install options defined by user config when environment variable fails validation' do
OpenTelemetry::TestHelpers.with_env('OTEL_RUBY_INSTRUMENTATION_ENUM_CONFIG_OPTS' => 'first=perhaps') do
instance.install(first: :maybe, second: :no)
_(instance.config).must_equal(first: :maybe, second: :no)
end
end

it 'allows a callable option to be passed' do
OpenTelemetry::TestHelpers.with_env('OTEL_RUBY_INSTRUMENTATION_ENUM_CONFIG_OPTS' => 'first=perhaps') do
instance.install(first: :maybe, second: -> { :yes })
_(instance.config[:first]).must_equal(:maybe)
_(instance.config[:second].call).must_equal(:yes)
end
end
end
end

describe 'when uninstallable' do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ module Sidekiq
# - `:none` - The job will be represented by a separate trace from the span that enqueued the job.
# There will be no explicit relationship between the job trace and the trace containing the span that
# enqueued the job.
# - Can alternatively be a callable which resolves to one of the above values.
#
# ### `:trace_launcher_heartbeat`
#
Expand Down Expand Up @@ -101,7 +102,7 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base
end

option :span_naming, default: :queue, validate: %I[job_class queue]
option :propagation_style, default: :link, validate: %i[link child none]
option :propagation_style, default: :link, validate: DynamicValidator.new(%i[link child none])
option :trace_launcher_heartbeat, default: false, validate: :boolean
option :trace_poller_enqueue, default: false, validate: :boolean
option :trace_poller_wait, default: false, validate: :boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,15 @@ def call(_worker, msg, _queue)
extracted_context = OpenTelemetry.propagation.extract(msg)
created_at = time_from_timestamp(msg['created_at'])
enqueued_at = time_from_timestamp(msg['created_at'])
propagation_style =
if instrumentation_config[:propagation_style].respond_to?(:call)
instrumentation_config[:propagation_style].call
else
instrumentation_config[:propagation_style]
end

OpenTelemetry::Context.with_current(extracted_context) do
if instrumentation_config[:propagation_style] == :child
if propagation_style == :child
tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span|
span.add_event('created_at', timestamp: created_at)
span.add_event('enqueued_at', timestamp: enqueued_at)
Expand All @@ -43,7 +50,7 @@ def call(_worker, msg, _queue)
else
links = []
span_context = OpenTelemetry::Trace.current_span(extracted_context).context
links << OpenTelemetry::Trace::Link.new(span_context) if instrumentation_config[:propagation_style] == :link && span_context.valid?
links << OpenTelemetry::Trace::Link.new(span_context) if propagation_style == :link && span_context.valid?
span = tracer.start_root_span(span_name, attributes: attributes, links: links, kind: :consumer)
OpenTelemetry::Trace.with_span(span) do
span.add_event('created_at', timestamp: created_at)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,64 @@
end
end

describe 'when propagation_style is a callable that resolves to `:child`' do
let(:config) do
{ propagation_style: -> { :child } }
end

it 'continues the enqueuer trace to the job process' do
SimpleJob.perform_async
SimpleJob.drain

_(job_span.parent_span_id).must_equal(enqueuer_span.span_id)
_(job_span.trace_id).must_equal(enqueuer_span.trace_id)
end

it 'fan out jobs are a continous trace' do
SimpleEnqueueingJob.perform_async
Sidekiq::Worker.drain_all

_(exporter.finished_spans.size).must_equal 4

_(root_span.parent_span_id).must_equal OpenTelemetry::Trace::INVALID_SPAN_ID
_(root_span.name).must_equal 'default publish'
_(root_span.kind).must_equal :producer

child_span1 = spans.find { |s| s.parent_span_id == root_span.span_id }
_(child_span1.name).must_equal 'default process'
_(child_span1.kind).must_equal :consumer

child_span2 = spans.find { |s| s.parent_span_id == child_span1.span_id }
_(child_span2.name).must_equal 'default publish'
_(child_span2.kind).must_equal :producer

child_span3 = spans.find { |s| s.parent_span_id == child_span2.span_id }
_(child_span3.name).must_equal 'default process'
_(child_span3.kind).must_equal :consumer
end

it 'propagates baggage' do
ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked')
OpenTelemetry::Context.with_current(ctx) do
BaggageTestingJob.perform_async
end

Sidekiq::Worker.drain_all

_(job_span.attributes['success']).must_equal(true)
end

it 'records exceptions' do
ExceptionTestingJob.perform_async
_(-> { Sidekiq::Worker.drain_all }).must_raise(RuntimeError)

ev = job_span.events
_(ev[2].attributes['exception.type']).must_equal('RuntimeError')
_(ev[2].attributes['exception.message']).must_equal('a little hell')
_(ev[2].attributes['exception.stacktrace']).wont_be_nil
end
end

describe 'when propagation_style is none' do
let(:config) { { propagation_style: :none } }

Expand Down
Loading