Skip to content

Commit ef63f09

Browse files
OpenTelemetry tracing, with jaeger in development environment.
For developers - includes an instance of Jaeger in the docker-compose development environment, available at port 16686. For production - supports OTLP/http exported traces to a specified endpoint in the configuration files. Please see docs/tracing.md for more details. Signed-off-by: Mike Kingsbury <[email protected]>
1 parent 750d47d commit ef63f09

25 files changed

+1384
-170
lines changed

README.md

+5
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ Documents that outgrew this README can be found in the `docs/` directory.
5151
* [pip.md](./docs/pip.md) is a guide for using pip with Cachito
5252
* [using_requests_locally.md](./docs/using_requests_locally.md) explains how to use Cachito
5353
requests to run builds on your PC
54+
* [tracing.md](./docs/tracing.md) documents Cachito's support for OpenTelemetry tracing
5455

5556
## Coding Standards
5657

@@ -548,6 +549,8 @@ Custom configuration for the Celery workers are listed below:
548549
* `cachito_subprocess_timeout` - a number (in seconds) to set a timeout for commands executed by
549550
the `subprocess` module. Default is 3600 seconds. A timeout is always required, and there is no
550551
way provided by Cachito to disable it. Set a larger number to give the subprocess execution more time.
552+
* `cachito_otlp_exporter_endpoint` - A valid URL, with a port number as necessary, to an OTLP/http-compatible
553+
endpoint to receive OpenTelemetry trace data.
551554

552555
To configure the workers to use a Kerberos keytab for authentication, set the `KRB5_CLIENT_KTNAME`
553556
environment variable to the path of the keytab. Additional Kerberos configuration can be made in
@@ -574,6 +577,8 @@ Custom configuration for the API:
574577
* `CACHITO_WORKER_USERNAMES` - the list of usernames that are allowed to use the `/requests/<id>`
575578
PATCH endpoint.
576579
* `LOGIN_DISABLED` - disables authentication requirements.
580+
* `CACHITO_OTLP_EXPORTER_ENDPOINT` - A valid URL, with a port number as necessary, to an OTLP/http-compatible
581+
endpoint to receive OpenTelemetry trace data.
577582

578583
Additionally, to configure the communication with the Cachito Celery workers, create a Python file
579584
at `/etc/cachito/celery.py`, and set the

cachito/web/api_v1.py

+21-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from celery import chain
1515
from flask import stream_with_context
1616
from flask_login import current_user, login_required
17+
from opentelemetry import trace
1718
from sqlalchemy import and_, func
1819
from sqlalchemy.orm import joinedload, load_only
1920
from werkzeug.exceptions import BadRequest, Forbidden, Gone, InternalServerError, NotFound
@@ -43,6 +44,8 @@
4344

4445
api_v1 = flask.Blueprint("api_v1", __name__)
4546

47+
tracer = trace.get_tracer(__name__)
48+
4649

4750
class RequestsArgs(pydantic.BaseModel):
4851
"""Query parameters for /request endpoint."""
@@ -153,6 +156,7 @@ def get_requests():
153156
return flask.jsonify(response)
154157

155158

159+
@tracer.start_as_current_span("get_request")
156160
def get_request(request_id):
157161
"""
158162
Retrieve details for the given request.
@@ -231,6 +235,7 @@ def get_request_environment_variables(request_id):
231235
return flask.jsonify(env_vars_json)
232236

233237

238+
@tracer.start_as_current_span("download_archive")
234239
def download_archive(request_id):
235240
"""
236241
Download archive of source code.
@@ -311,6 +316,7 @@ def list_packages_and_dependencies(request_id):
311316

312317

313318
@login_required
319+
@tracer.start_as_current_span("create_request")
314320
def create_request():
315321
"""
316322
Submit a request to resolve and cache the given source code and its dependencies.
@@ -342,12 +348,25 @@ def create_request():
342348
cachito_metrics["gauge_state"].labels(state="total").inc()
343349
cachito_metrics["gauge_state"].labels(state=request.state.state_name).inc()
344350

351+
ctx = trace.get_current_span().get_span_context()
352+
# Format the trace_id to a conventional 32 digit hexadecimal number that can be used
353+
# by jaeger or other endpoints for tracing.
354+
trace_id = "{trace:032x}".format(trace=ctx.trace_id)
355+
356+
current_span = trace.get_current_span()
357+
current_span.set_attribute("cachito.request.id", request.id)
358+
345359
if current_user.is_authenticated:
346360
flask.current_app.logger.info(
347-
"The user %s submitted request %d", current_user.username, request.id
361+
"The user %s submitted request %d; trace_id: %s",
362+
current_user.username,
363+
request.id,
364+
trace_id,
348365
)
349366
else:
350-
flask.current_app.logger.info("An anonymous user submitted request %d", request.id)
367+
flask.current_app.logger.info(
368+
"An anonymous user submitted request %d; trace_id: %s", request.id, trace_id
369+
)
351370

352371
# Chain tasks
353372
error_callback = tasks.failed_request_callback.s(request.id)

cachito/web/app.py

+64-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,17 @@
1010
from flask.logging import default_handler
1111
from flask_login import LoginManager
1212
from flask_migrate import Migrate
13+
from opentelemetry import trace
14+
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
15+
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
16+
from opentelemetry.instrumentation.celery import CeleryInstrumentor
17+
from opentelemetry.instrumentation.flask import FlaskInstrumentor
18+
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
19+
from opentelemetry.instrumentation.requests import RequestsInstrumentor
20+
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
21+
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
22+
from opentelemetry.sdk.trace import TracerProvider
23+
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
1324
from sqlalchemy.exc import SQLAlchemyError
1425
from werkzeug.exceptions import InternalServerError, default_exceptions
1526

@@ -133,10 +144,62 @@ def create_app(config_obj=None):
133144
app.register_error_handler(pydantic.ValidationError, validation_error)
134145

135146
init_metrics(app)
136-
147+
_instrument_app(app)
137148
return app
138149

139150

151+
def _instrument_app(app):
152+
"""
153+
Instrument the Flask app.
154+
155+
Sets up the OpenTelemetry tracing exporter, configures the endpoint
156+
to send trace data to.
157+
"""
158+
# Some of the following has already been executed due to the manner in which
159+
# the tasks config is included....
160+
161+
service_name = "cachito-api"
162+
resource = Resource(attributes={SERVICE_NAME: service_name})
163+
provider = TracerProvider(resource=resource)
164+
165+
# Used for local development environment aka docker-compose up.
166+
if "CACHITO_JAEGER_EXPORTER_ENDPOINT" in app.config.keys():
167+
app.logger.info("Configuring Jaeger Exporter")
168+
jaeger_exporter = JaegerExporter(
169+
agent_host_name=app.config["CACHITO_JAEGER_EXPORTER_ENDPOINT"],
170+
agent_port=int(app.config["CACHITO_JAEGER_EXPORTER_PORT"]),
171+
)
172+
processor = BatchSpanProcessor(jaeger_exporter)
173+
# test/stage/prod environments....
174+
elif "CACHITO_OTLP_EXPORTER_ENDPOINT" in app.config.keys():
175+
app.logger.info(
176+
"Configuring OTLP Exporter: " + str(app.config["CACHITO_OTLP_EXPORTER_ENDPOINT"])
177+
)
178+
otlp_exporter = OTLPSpanExporter(endpoint=app.config["CACHITO_OTLP_EXPORTER_ENDPOINT"])
179+
processor = BatchSpanProcessor(otlp_exporter)
180+
# Undefined; send data to the console.
181+
else:
182+
app.logger.info("Configuring ConsoleSpanExporter")
183+
processor = BatchSpanProcessor(ConsoleSpanExporter())
184+
185+
# Toggle between sending to jaeger and displaying span info on console
186+
provider.add_span_processor(processor)
187+
trace.set_tracer_provider(provider)
188+
189+
FlaskInstrumentor().instrument_app(
190+
app, excluded_urls="/static/*,/favicon.ico,/metrics,/healthcheck"
191+
)
192+
RequestsInstrumentor().instrument()
193+
CeleryInstrumentor().instrument()
194+
SQLAlchemyInstrumentor().instrument(
195+
enable_commenter=True,
196+
commenter_options={
197+
"db_driver": True,
198+
},
199+
)
200+
Psycopg2Instrumentor().instrument(enable_commenter=True, commenter_options={})
201+
202+
140203
def create_cli_app():
141204
"""
142205
Create a Flask application instance and validate the configuration for the Flask CLI.

cachito/web/config.py

+2
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ class DevelopmentConfig(Config):
5656
SQLALCHEMY_DATABASE_URI = "postgresql+psycopg2://cachito:cachito@db:5432/cachito"
5757
SQLALCHEMY_TRACK_MODIFICATIONS = True
5858
LOGIN_DISABLED = True
59+
CACHITO_JAEGER_EXPORTER_ENDPOINT = "jaeger"
60+
CACHITO_JAEGER_EXPORTER_PORT = 6831
5961

6062

6163
class TestingConfig(DevelopmentConfig):

cachito/web/content_manifest.py

-2
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ def process_gomod(self, package, dependency, type="icm"):
9191
:param type: icm or sbom component
9292
"""
9393
if dependency.type == "gomod":
94-
9594
parent_module_name = package.name
9695
relpath_from_parent_module_to_dep = None
9796

@@ -339,7 +338,6 @@ def to_json(self):
339338
self._gitsubmodule_data = {}
340339

341340
for package in self.packages:
342-
343341
if package.type == "go-package":
344342
purl = to_top_level_purl(package, self.request, subpath=package.path)
345343
self._gopkg_data.setdefault(

cachito/workers/__init__.py

+16-14
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@
88
from tarfile import ExtractError, TarFile
99
from typing import Iterator
1010

11+
from opentelemetry import trace
12+
1113
from cachito.errors import SubprocessCallError
1214
from cachito.workers.config import get_worker_config
1315
from cachito.workers.errors import CachitoCalledProcessError
1416

1517
log = logging.getLogger(__name__)
18+
tracer = trace.get_tracer(__name__)
1619

1720

1821
def run_cmd(cmd, params, exc_msg=None):
@@ -29,22 +32,22 @@ def run_cmd(cmd, params, exc_msg=None):
2932
params.setdefault("capture_output", True)
3033
params.setdefault("universal_newlines", True)
3134
params.setdefault("encoding", "utf-8")
35+
with tracer.start_as_current_span("running cmd " + " ".join(cmd)):
36+
conf = get_worker_config()
37+
params.setdefault("timeout", conf.cachito_subprocess_timeout)
3238

33-
conf = get_worker_config()
34-
params.setdefault("timeout", conf.cachito_subprocess_timeout)
35-
36-
try:
37-
response = subprocess.run(cmd, **params) # nosec
38-
except subprocess.TimeoutExpired as e:
39-
raise SubprocessCallError(str(e))
39+
try:
40+
response = subprocess.run(cmd, **params) # nosec
41+
except subprocess.TimeoutExpired as e:
42+
raise SubprocessCallError(str(e))
4043

41-
if response.returncode != 0:
42-
log.error('The command "%s" failed with: %s', " ".join(cmd), response.stderr)
43-
raise CachitoCalledProcessError(
44-
exc_msg or "An unexpected error occurred", response.returncode
45-
)
44+
if response.returncode != 0:
45+
log.error('The command "%s" failed with: %s', " ".join(cmd), response.stderr)
46+
raise CachitoCalledProcessError(
47+
exc_msg or "An unexpected error occurred", response.returncode
48+
)
4649

47-
return response.stdout
50+
return response.stdout
4851

4952

5053
def load_json_stream(s: str) -> Iterator:
@@ -79,7 +82,6 @@ def safe_extract(tar: TarFile, path: str = ".", *, numeric_owner: bool = False):
7982
"""
8083
abs_path = Path(path).resolve()
8184
for member in tar.getmembers():
82-
8385
member_path = Path(path).joinpath(member.name)
8486
abs_member_path = member_path.resolve()
8587

cachito/workers/config.py

+18-3
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@
66

77
import celery
88
import kombu
9+
from opentelemetry.instrumentation.requests import RequestsInstrumentor
910

1011
from cachito.errors import ConfigError
1112

1213
ARCHIVES_VOLUME = os.path.join(tempfile.gettempdir(), "cachito-archives")
1314

15+
RequestsInstrumentor().instrument()
16+
17+
1418
app = celery.Celery()
1519

1620

@@ -68,6 +72,9 @@ class Config(object):
6872
cachito_task_log_format = (
6973
"[%(asctime)s #%(request_id)s %(name)s %(levelname)s %(module)s.%(funcName)s] %(message)s"
7074
)
75+
cachito_jaeger_exporter_endpoint: Optional[str] = ""
76+
cachito_jaeger_exporter_port: Optional[int]
77+
cachito_otlp_exporter_endpoint: Optional[str] = ""
7178
include = [
7279
"cachito.workers.tasks.general",
7380
"cachito.workers.tasks.gomod",
@@ -140,6 +147,8 @@ class DevelopmentConfig(Config):
140147
}
141148
cachito_request_file_logs_dir: Optional[str] = "/var/log/cachito/requests"
142149
cachito_sources_dir = os.path.join(ARCHIVES_VOLUME, "sources")
150+
cachito_jaeger_exporter_endpoint = "jaeger"
151+
cachito_jaeger_exporter_port = 6831
143152

144153

145154
class TestingConfig(DevelopmentConfig):
@@ -166,6 +175,14 @@ def configure_celery(celery_app):
166175
167176
:param celery.Celery celery_app: the Celery application instance to configure
168177
"""
178+
config = get_config()
179+
180+
celery_app.config_from_object(config, force=True)
181+
logging.getLogger("cachito.workers").setLevel(celery_app.conf.cachito_log_level)
182+
183+
184+
def get_config():
185+
"""Read in the config based on the environment."""
169186
config = ProductionConfig
170187
prod_config_file_path = "/etc/cachito/celery.py"
171188
if os.getenv("CACHITO_DEV", "").lower() == "true":
@@ -190,9 +207,7 @@ def configure_celery(celery_app):
190207
# The _user_config dictionary will contain the __builtins__ key, which we need to skip
191208
if not key.startswith("__"):
192209
setattr(config, key, value)
193-
194-
celery_app.config_from_object(config, force=True)
195-
logging.getLogger("cachito.workers").setLevel(celery_app.conf.cachito_log_level)
210+
return config
196211

197212

198213
def validate_celery_config(conf, **kwargs):

cachito/workers/pkg_managers/general.py

+8
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import aiohttp
1010
import aiohttp_retry
1111
import requests
12+
from opentelemetry import trace
1213

1314
from cachito.common.checksum import hash_file
1415
from cachito.errors import InvalidChecksum, InvalidRequestData, NetworkError, UnknownHashAlgorithm
@@ -27,6 +28,7 @@
2728
]
2829

2930
log = logging.getLogger(__name__)
31+
tracer = trace.get_tracer(__name__)
3032

3133
ChecksumInfo = collections.namedtuple("ChecksumInfo", "algorithm hexdigest")
3234

@@ -45,6 +47,7 @@ def _get_request_url(request_id):
4547
return f'{config.cachito_api_url.rstrip("/")}/requests/{request_id}'
4648

4749

50+
@tracer.start_as_current_span("update_request_with_config_files")
4851
def update_request_with_config_files(request_id, config_files):
4952
"""
5053
Update the Cachito request with the input configuration files.
@@ -77,6 +80,7 @@ def update_request_with_config_files(request_id, config_files):
7780
raise InvalidRequestData(f"Adding configuration files on request {request_id} failed")
7881

7982

83+
@tracer.start_as_current_span("update_request_env_vars")
8084
def update_request_env_vars(request_id: int, env_vars: Dict[str, Dict[str, str]]) -> None:
8185
"""Update environment variables of a request.
8286
@@ -111,6 +115,7 @@ def update_request_env_vars(request_id: int, env_vars: Dict[str, Dict[str, str]]
111115
raise InvalidRequestData(f"Updating environment variables on request {request_id} failed")
112116

113117

118+
@tracer.start_as_current_span("verify_checksum")
114119
def verify_checksum(file_path: str, checksum_info: ChecksumInfo, chunk_size: int = 10240):
115120
"""
116121
Verify the checksum of the file at the given path matches the expected checksum info.
@@ -138,6 +143,7 @@ def verify_checksum(file_path: str, checksum_info: ChecksumInfo, chunk_size: int
138143
raise InvalidChecksum(msg)
139144

140145

146+
@tracer.start_as_current_span("download_binary_file")
141147
def download_binary_file(url, download_path, auth=None, insecure=False, chunk_size=8192):
142148
"""
143149
Download a binary file (such as a TAR archive) from a URL.
@@ -205,6 +211,7 @@ async def async_download_binary_file(
205211
log.debug(f"Download completed - {tarball_name}")
206212

207213

214+
@tracer.start_as_current_span("download_raw_component")
208215
def download_raw_component(raw_component_name, raw_repo_name, download_path, nexus_auth):
209216
"""
210217
Download raw component if present in raw repo.
@@ -222,6 +229,7 @@ def download_raw_component(raw_component_name, raw_repo_name, download_path, nex
222229
return False
223230

224231

232+
@tracer.start_as_current_span("upload_raw_package")
225233
def upload_raw_package(repo_name, artifact_path, dest_dir, filename, is_request_repository):
226234
"""
227235
Upload a raw package to a Nexus repository.

0 commit comments

Comments
 (0)