- 
                Notifications
    You must be signed in to change notification settings 
- Fork 45
WIP: refactor(metrics): migrate from prometheus client to opentelemetry #187
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -72,13 +72,15 @@ def example(): | |
|  | ||
| Current services are swagger, request, tracer, metrics | ||
| """ | ||
|  | ||
| config_resource = CONFIG_BASE | ||
| services: List[str] = [] | ||
| application = Flask | ||
| swagger: Optional[DriverService] = None | ||
| request: Optional[DriverService] = None | ||
| tracer: Optional[DriverService] = None | ||
| metrics: Optional[DriverService] = None | ||
| opentelemetry: Optional[DriverService] = None | ||
| _singleton = True | ||
|  | ||
| def __init__(self, *args, **kwargs): | ||
|  | @@ -105,7 +107,9 @@ def init_services(self) -> None: | |
| """ | ||
| services_resources = ServicesResource() | ||
| for service_name, service in services_resources.get_services(): | ||
| if service_name not in self.services or not getattr(self, service_name, False): | ||
| if service_name not in self.services or not getattr( | ||
| self, service_name, False | ||
| ): | ||
| self.services.append(service_name) | ||
| setattr(self, service_name, service) | ||
|  | ||
|  | @@ -150,7 +154,7 @@ def init_logger(self) -> None: | |
| :return: | ||
| """ | ||
| self.application.logger = logger | ||
| os.environ['WERKZEUG_RUN_MAIN'] = "true" | ||
| os.environ["WERKZEUG_RUN_MAIN"] = "true" | ||
|  | ||
| formatter = CustomJsonFormatter() | ||
| formatter.add_service_name(self.application.config["APP_NAME"]) | ||
|  | @@ -172,11 +176,16 @@ def init_app(self) -> Flask: | |
| :return: None | ||
| """ | ||
| if self.swagger: | ||
| application = self.swagger.init_app(config=self.config.to_flask(), path=self.path) | ||
| application = self.swagger.init_app( | ||
| config=self.config.to_flask(), path=self.path | ||
| ) | ||
| else: | ||
| check_package_exists("flask") | ||
| application = Flask(__name__, static_folder=os.path.join(self.path, 'static'), | ||
| template_folder=os.path.join(self.path, 'templates')) | ||
| application = Flask( | ||
| __name__, | ||
| static_folder=os.path.join(self.path, "static"), | ||
| template_folder=os.path.join(self.path, "templates"), | ||
| ) | ||
|  | ||
| application.root_path = self.path | ||
|  | ||
|  | @@ -192,11 +201,32 @@ def init_metrics(self) -> None: | |
| if self.metrics: | ||
| self.application.register_blueprint(self.metrics.metrics_blueprint) | ||
| self.metrics.add_logger_handler( | ||
| self.application.logger, | ||
| self.application.config["APP_NAME"] | ||
| self.application.logger, self.application.config["APP_NAME"] | ||
| ) | ||
| self.metrics.monitor(self.application.config["APP_NAME"], self.application) | ||
|  | ||
| def init_opentelemetry(self) -> None: | ||
| if self.opentelemetry: | ||
| if self.opentelemetry.config.metrics.enabled: | ||
| # Set metrics backend | ||
| self.opentelemetry.set_metrics_backend() | ||
| # Set the metrics blueprint | ||
| # DISCLAIMER this endpoint may be only necessary with prometheus client | ||
| self.application.register_blueprint(self.opentelemetry.blueprint) | ||
| # Set instrumentations | ||
| if self.opentelemetry.config.metrics.instrumentations.flask: | ||
| self.opentelemetry.monitor( | ||
| self.application.config["APP_NAME"], self.application | ||
| ) | ||
| if self.opentelemetry.config.metrics.instrumentations.logger: | ||
| self.opentelemetry.add_logger_handler( | ||
| self.application.logger, self.application.config["APP_NAME"] | ||
| ) | ||
| if self.opentelemetry.config.tracing.enabled: | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @avara1986 Do you think it make sense to add this to the PR? They won't be useful, since the tracing and logging are out of scope, but it may be useful to have it already designed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. with the new method of serviceDriver  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Open telemetry is a kind of a meta service. As it may instrument other services (there is an issue for redis integration already afair) it seems that configuring integration should actually be a responsibility of the service that provides instrumented module. Maybe in similar style to ‘init_action’ there should be ‘init_telemetry’ that would be called by opentelemetry service? Or is it too tightly coupled? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What you're proposing that for every service there's a telemetry section? It may make sense, but not all possible instrumentation of OT will be available as service. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've checked code for some connectors that would be useful for services (not only documentation this time) and while some of example instrumentations take params it seems to really be just an optional tracer. So it's possible to configure those by simply iterating over a list after all. There aren't that many instrumentations that need custom setup and would be useful here - Celery, Flask, WSGI, maybe a few more that may work here due to not being async-related. But requests also uses optional callback so advanced config may be useful here. Also not sure if support for multiple tracers is important? opentelemetry:
      integrations:
      - redis
      - requestsyou end with something like this: redis:
      telemetry: true
requests:
      telemetry: trueAbout  redis:
      telemetry: true
      service_enabled: false
requests:
      telemetry: trueBut there is one major advantage when using OT instrumentation list in config: those simple instrumentations may be enabled completely without a service - or even without any additional support in pyms. But then something like this would need to be suported:  | ||
| self.opentelemetry.set_tracing_backend() | ||
| if self.opentelemetry.config.logging.enabled: | ||
| self.opentelemetry.set_logging_backend() | ||
|  | ||
| def reload_conf(self): | ||
| self.delete_services() | ||
| self.config.reload() | ||
|  | @@ -230,7 +260,11 @@ def create_app(self) -> Flask: | |
|  | ||
| self.init_metrics() | ||
|  | ||
| logger.debug("Started app with PyMS and this services: {}".format(self.services)) | ||
| self.init_opentelemetry() | ||
|  | ||
| logger.debug( | ||
| "Started app with PyMS and this services: {}".format(self.services) | ||
| ) | ||
|  | ||
| return self.application | ||
|  | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,133 @@ | ||
| import logging | ||
| import time | ||
| from typing import Text | ||
|  | ||
| from flask import Blueprint, Response, request | ||
| from pyms.flask.services.driver import DriverService | ||
|  | ||
| from opentelemetry import metrics | ||
| from opentelemetry.exporter.prometheus import PrometheusMetricsExporter | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you have an opinion about optional imports? With this service there will be a lot of imports, at least one per instrumentation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not import using https://github.com/pallets/werkzeug/blob/master/src/werkzeug/utils.py#L778 - it's already a flask dependency. | ||
| from opentelemetry.sdk.metrics import Counter, MeterProvider, ValueRecorder | ||
| from opentelemetry.sdk.metrics.export.controller import PushController | ||
| from prometheus_client import generate_latest | ||
|  | ||
| # TODO set sane defaults | ||
| # https://github.com/python-microservices/pyms/issues/218 | ||
| # TODO validate config | ||
| # https://github.com/python-microservices/pyms/issues/219 | ||
| PROMETHEUS_CLIENT = "prometheus" | ||
|  | ||
|  | ||
| class FlaskMetricsWrapper: | ||
| def __init__(self, app_name: str, meter: MeterProvider): | ||
| self.app_name = app_name | ||
| # TODO add Histogram support for flask when available | ||
| # https://github.com/open-telemetry/opentelemetry-python/issues/1255 | ||
| self.flask_request_latency = meter.create_metric( | ||
| "http_server_requests_seconds", | ||
| "Flask Request Latency", | ||
| "http_server_requests_seconds", | ||
| float, | ||
| ValueRecorder, | ||
| ("service", "method", "uri", "status"), | ||
| ) | ||
| self.flask_request_count = meter.create_metric( | ||
| "http_server_requests_count", | ||
| "Flask Request Count", | ||
| "http_server_requests_count", | ||
| int, | ||
| Counter, | ||
| ["service", "method", "uri", "status"], | ||
| ) | ||
|  | ||
| def before_request(self): # pylint: disable=R0201 | ||
| request.start_time = time.time() | ||
|  | ||
| def after_request(self, response: Response) -> Response: | ||
| if hasattr(request.url_rule, "rule"): | ||
| path = request.url_rule.rule | ||
| else: | ||
| path = request.path | ||
| request_latency = time.time() - request.start_time | ||
| labels = { | ||
| "service": self.app_name, | ||
| "method": str(request.method), | ||
| "uri": path, | ||
| "status": str(response.status_code), | ||
| } | ||
|  | ||
| self.flask_request_latency.record(request_latency, labels) | ||
| self.flask_request_count.add(1, labels) | ||
|  | ||
| return response | ||
|  | ||
|  | ||
| class Service(DriverService): | ||
| """ | ||
| Adds [OpenTelemetry](https://opentelemetry.io/) metrics using the [Opentelemetry Client Library](https://opentelemetry-python.readthedocs.io/en/latest/exporter/). | ||
| """ | ||
|  | ||
| config_resource: Text = "opentelemetry" | ||
|  | ||
| def __init__(self, *args, **kwargs): | ||
| super().__init__(*args, **kwargs) | ||
| self.blueprint = Blueprint("opentelemetry", __name__) | ||
| self.serve_metrics() | ||
|  | ||
| def set_metrics_backend(self): | ||
| # Set meter provider | ||
| metrics.set_meter_provider(MeterProvider()) | ||
| self.meter = metrics.get_meter(__name__) | ||
| if self.config.metrics.backend.lower() == PROMETHEUS_CLIENT: | ||
| exporter = PrometheusMetricsExporter() | ||
| else: | ||
| pass | ||
| # Create the push controller that will update the metrics when the | ||
| # interval is met | ||
| PushController(self.meter, exporter, self.config.metrics.interval) | ||
|  | ||
| def set_tracing_backend(self): | ||
| pass | ||
|  | ||
| def set_logging_backend(self): | ||
| pass | ||
|  | ||
| def monitor(self, app_name, app): | ||
| metric = FlaskMetricsWrapper(app_name, self.meter) | ||
| app.before_request(metric.before_request) | ||
| app.after_request(metric.after_request) | ||
|  | ||
| def serve_metrics(self): | ||
| @self.blueprint.route("/metrics", methods=["GET"]) | ||
| def metrics(): # pylint: disable=unused-variable | ||
| return Response( | ||
| generate_latest(), | ||
| mimetype="text/print()lain", | ||
| content_type="text/plain; charset=utf-8", | ||
| ) | ||
|  | ||
| def add_logger_handler( | ||
| self, logger: logging.Logger, service_name: str | ||
| ) -> logging.Logger: | ||
| logger.addHandler(MetricsLogHandler(service_name, self.meter)) | ||
| return logger | ||
|  | ||
|  | ||
| class MetricsLogHandler(logging.Handler): | ||
| """A LogHandler that exports logging metrics for OpenTelemetry.""" | ||
|  | ||
| def __init__(self, app_name: str, meter: MeterProvider): | ||
| super().__init__() | ||
| self.app_name = str(app_name) | ||
| self.logger_total_messages = meter.create_metric( | ||
| "logger_messages_total", | ||
| "Count of log entries by service and level.", | ||
| "logger_messages_total", | ||
| int, | ||
| Counter, | ||
| ["service", "level"], | ||
| ) | ||
|  | ||
| def emit(self, record) -> None: | ||
| labels = {"service": self.app_name, "level": record.levelname} | ||
| self.logger_total_messages.add(1, labels) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@avara1986 I'm having some doubts defining the opentelemetry service.
For example, in this section. There may be tens of instrumentations: aiohttp, flask, sqlite3, jinja, mysql, ... There's already more than 15. This means that in this section, there will grow a lot.
My question is, should this conditionals be directly on the service? Some of them have to be here, but only a couple or so. This would make this section lighter in the future, but this seems to not be the way the rest of the services are doing this, so maybe for consistency this shouldn't.