Skip to content

Commit 2facea8

Browse files
Sid MohanSid Mohan
authored andcommitted
otel init
1 parent 4aeb2f0 commit 2facea8

File tree

438 files changed

+24467
-30991
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

438 files changed

+24467
-30991
lines changed

.env

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
APPLICATIONINSIGHTS_CONNECTION_STRING="InstrumentationKey=00bea047-1836-46fa-9652-26d43d63a3fa;IngestionEndpoint=https://eastus-8.in.applicationinsights.azure.com/;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/;ApplicationId=959cc365-c112-491b-af69-b196d0943ca4"
2+
3+
4+
# note this is an Azure specific implementation of the OpenTelemetry distro. for more information please see https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/monitor/azure-monitor-opentelemetry

datafog/main.py

Lines changed: 69 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,34 @@
1313
from .services.image_service import ImageService
1414
from .services.spark_service import SparkService
1515
from .services.text_service import TextService
16-
from .telemetry.open_telemetry import Telemetry
17-
16+
from opentelemetry import trace
17+
from opentelemetry.sdk.trace import TracerProvider
18+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
19+
import os
20+
from opentelemetry import trace
21+
from opentelemetry.sdk.trace import TracerProvider
22+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
23+
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter
24+
from azure.monitor.opentelemetry import configure_azure_monitor
25+
import platform
26+
from opentelemetry.trace import Status, StatusCode
27+
28+
# Use environment variable if available, otherwise fall back to hardcoded value
29+
from opentelemetry import trace
30+
from opentelemetry.sdk.trace import TracerProvider
31+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
32+
from logging import INFO, getLogger
33+
from dotenv import load_dotenv
34+
import logging
35+
36+
load_dotenv() # Load environment variables from .env file
37+
APPLICATIONINSIGHTS_CONNECTION_STRING = os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")
38+
configure_azure_monitor(connection_string=APPLICATIONINSIGHTS_CONNECTION_STRING)
39+
trace.set_tracer_provider(TracerProvider())
40+
exporter = AzureMonitorTraceExporter(connection_string=APPLICATIONINSIGHTS_CONNECTION_STRING)
41+
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(exporter))
42+
logger = logging.getLogger("datafog_logger")
43+
logger.setLevel(INFO)
1844

1945
class DataFog:
2046
def __init__(
@@ -28,25 +54,52 @@ def __init__(
2854
self.text_service = text_service
2955
self.spark_service: SparkService = spark_service
3056
self.operations: List[OperationType] = operations
31-
self.telemetry = Telemetry()
32-
self.telemetry.set_tracer()
57+
self.logger = logging.getLogger(__name__)
58+
self.logger.info("Initializing DataFog class with the following services and operations:")
59+
self.logger.info(f"Image Service: {type(image_service)}")
60+
self.logger.info(f"Text Service: {type(text_service)}")
61+
self.logger.info(f"Spark Service: {type(spark_service) if spark_service else 'None'}")
62+
self.logger.info(f"Operations: {operations}")
63+
self.tracer = trace.get_tracer(__name__)
3364

3465
async def run_ocr_pipeline(self, image_urls: List[str]):
3566
"""Run the OCR pipeline asynchronously."""
36-
extracted_text = await self.image_service.ocr_extract(image_urls)
37-
if OperationType.ANNOTATE_PII in self.operations:
38-
annotated_text = await self.text_service.batch_annotate_texts(
39-
extracted_text
40-
)
41-
return annotated_text
42-
return extracted_text
43-
67+
with self.tracer.start_as_current_span("run_ocr_pipeline") as span:
68+
try:
69+
extracted_text = await self.image_service.ocr_extract(image_urls)
70+
self.logger.info(f"OCR extraction completed for {len(image_urls)} images.")
71+
self.logger.debug(f"Total length of extracted text: {sum(len(text) for text in extracted_text)}")
72+
73+
if OperationType.ANNOTATE_PII in self.operations:
74+
annotated_text = await self.text_service.batch_annotate_texts(extracted_text)
75+
self.logger.info(f"Text annotation completed with {len(annotated_text)} annotations.")
76+
return annotated_text
77+
78+
return extracted_text
79+
except Exception as e:
80+
self.logger.error(f"Error in run_ocr_pipeline: {str(e)}")
81+
span.set_status(Status(StatusCode.ERROR, str(e)))
82+
raise
4483
async def run_text_pipeline(self, texts: List[str]):
4584
"""Run the text pipeline asynchronously."""
46-
if OperationType.ANNOTATE_PII in self.operations:
47-
annotated_text = await self.text_service.batch_annotate_texts(texts)
48-
return annotated_text
49-
return texts
85+
with self.tracer.start_as_current_span("run_text_pipeline") as span:
86+
try:
87+
self.logger.info(f"Starting text pipeline with {len(texts)} texts.")
88+
if OperationType.ANNOTATE_PII in self.operations:
89+
annotated_text = await self.text_service.batch_annotate_texts(texts)
90+
self.logger.info(f"Text annotation completed with {len(annotated_text)} annotations.")
91+
return annotated_text
92+
93+
self.logger.info("No annotation operation found; returning original texts.")
94+
return texts
95+
except Exception as e:
96+
self.logger.error(f"Error in run_text_pipeline: {str(e)}")
97+
span.set_status(Status(StatusCode.ERROR, str(e)))
98+
raise
99+
def _add_attributes(self, span, attributes: dict):
100+
"""Add multiple attributes to a span."""
101+
for key, value in attributes.items():
102+
span.set_attribute(key, value)
50103

51104

52105
class OCRPIIAnnotator:

datafog/telemetry/open_telemetry.py

Lines changed: 45 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,113 +1,72 @@
1-
import asyncio
2-
import json
3-
import logging
41
import os
2+
from opentelemetry import trace
3+
from opentelemetry.sdk.trace import TracerProvider
4+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
5+
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter
6+
from azure.monitor.opentelemetry import configure_azure_monitor
57
import platform
6-
from typing import Any
8+
from opentelemetry.trace import Status, StatusCode
79

8-
import pkg_resources
9-
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter
10+
# Use environment variable if available, otherwise fall back to hardcoded value
1011
from opentelemetry import trace
11-
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
1212
from opentelemetry.sdk.trace import TracerProvider
1313
from opentelemetry.sdk.trace.export import BatchSpanProcessor
14-
from opentelemetry.trace import Status, StatusCode
14+
from logging import INFO, getLogger
15+
from dotenv import load_dotenv
16+
from azure.monitor.opentelemetry import configure_azure_monitor
17+
load_dotenv()
1518

19+
APPLICATIONINSIGHTS_CONNECTION_STRING = os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")
1620

1721
class Telemetry:
18-
"""A class to handle anonymous telemetry for the DataFog package."""
19-
20-
def __init__(self, instrumentation_key: str = os.getenv("INSTRUMENTATION_KEY")):
22+
def __init__(self):
2123
self.ready = False
2224
self.trace_set = False
2325
try:
24-
self.resource = Resource(attributes={SERVICE_NAME: "datafog-python"})
25-
self.provider = TracerProvider(resource=self.resource)
26+
# Create a new TracerProvider and set it as the global trace provider
27+
tracer_provider = TracerProvider()
28+
trace.set_tracer_provider(tracer_provider)
29+
30+
# Configure Azure Monitor with the connection string from environment variables
31+
configure_azure_monitor(connection_string=APPLICATIONINSIGHTS_CONNECTION_STRING, logger_name="datafog_logger")
32+
33+
# Create an exporter that sends data to Application Insights
2634
exporter = AzureMonitorTraceExporter(
27-
connection_string=os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
35+
connection_string=APPLICATIONINSIGHTS_CONNECTION_STRING
2836
)
29-
processor = BatchSpanProcessor(exporter)
30-
self.provider.add_span_processor(processor)
37+
38+
# Create a span processor and add it to the tracer provider
39+
span_processor = BatchSpanProcessor(exporter)
40+
tracer_provider.add_span_processor(span_processor)
41+
42+
# Get a tracer
43+
self.tracer = trace.get_tracer(__name__)
44+
3145
self.ready = True
32-
except BaseException as e:
33-
if isinstance(
34-
e,
35-
(SystemExit, KeyboardInterrupt, GeneratorExit, asyncio.CancelledError),
36-
):
37-
raise
38-
self.ready = False
46+
self.trace_set = True
3947

40-
def set_tracer(self):
41-
"""Sets the tracer for telemetry."""
42-
if self.ready:
43-
try:
44-
trace.set_tracer_provider(self.provider)
45-
self.trace_set = True
46-
except Exception:
47-
self.trace_set = False
48+
except Exception as e:
49+
print(f"Error setting up Azure Monitor: {e}")
4850

49-
def log_system_info(self):
50-
"""Logs system information."""
51-
if self.ready:
52-
try:
53-
tracer = trace.get_tracer("datafog.telemetry")
54-
with tracer.start_as_current_span("System Info") as span:
55-
self._add_attribute(
56-
span,
57-
"datafog_version",
58-
pkg_resources.get_distribution("datafog").version,
59-
)
60-
self._add_attribute(
61-
span, "python_version", platform.python_version()
62-
)
63-
self._add_attribute(span, "os", platform.system())
64-
self._add_attribute(span, "platform_version", platform.version())
65-
self._add_attribute(span, "cpus", os.cpu_count())
66-
span.set_status(Status(StatusCode.OK))
67-
except Exception:
68-
pass
6951

70-
def pipeline_execution(self, datafog, input_data, output_data):
71-
"""Records the execution of a DataFog pipeline."""
72-
if self.ready:
73-
try:
74-
tracer = trace.get_tracer("datafog.telemetry")
75-
with tracer.start_as_current_span("Pipeline Execution") as span:
76-
self._add_attribute(
77-
span,
78-
"datafog_version",
79-
pkg_resources.get_distribution("datafog").version,
80-
)
81-
self._add_attribute(
82-
span, "pipeline_type", datafog.__class__.__name__
83-
)
84-
self._add_attribute(span, "input_data", input_data)
85-
self._add_attribute(span, "output_data", output_data)
86-
span.set_status(Status(StatusCode.OK))
87-
except Exception:
88-
pass
8952

90-
def end_pipeline(self, datafog, output):
53+
def datafog_creation(self, name: str):
9154
if self.ready:
9255
try:
93-
tracer = trace.get_tracer("datafog.telemetry")
94-
with tracer.start_as_current_span("Pipeline Ended") as span:
95-
self._add_attribute(
96-
span,
97-
"datafog_version",
98-
pkg_resources.get_distribution("datafog").version,
99-
)
100-
self._add_attribute(
101-
span, "pipeline_type", datafog.__class__.__name__
102-
)
103-
self._add_attribute(span, "output", output)
104-
span.set_status(Status(StatusCode.OK))
105-
except Exception:
106-
pass
56+
tracer = trace.get_tracer(__name__)
57+
span = tracer.start_span("datafog object created")
58+
self._add_attribute(span, "datafog_name", name)
59+
self._add_attribute(span, "datafog_version", platform.python_version())
60+
span.set_status(Status(StatusCode.OK))
61+
span.end()
62+
except Exception as e:
63+
print(f"Error starting span: {e}")
64+
return None
10765

10866
def _add_attribute(self, span, key, value):
10967
"""Add an attribute to a span."""
11068
try:
111-
span.set_attribute(key, value)
69+
return span.set_attribute(key, value)
11270
except Exception:
11371
pass
72+

env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
APPLICATIONINSIGHTS_CONNECTION_STRING="" # note this is an Azure specific implementation of the OpenTelemetry distro. for more information please see https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/monitor/azure-monitor-opentelemetry

requirements.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ spacy==3.4.4
55
# pyspark==3.4.1
66
pytest==8.0.2
77
Requests==2.31.0
8-
setuptools==58.1.0
8+
setuptools==70.0.0
99
pydantic==1.10.15
1010
fastapi
1111
pandas
@@ -18,5 +18,7 @@ asyncio
1818
aiohttp
1919
pytest-asyncio
2020
azure-monitor-opentelemetry-exporter==1.0.0b25
21+
azure-monitor-opentelemetry
2122
opentelemetry-sdk
2223
en_spacy_pii_fast==0.0.0
24+
python-dotenv

setup.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77

88
def __version__():
9-
return "3.2.1b3"
9+
return "3.2.1b9"
1010

1111

1212
project_urls = {
@@ -42,6 +42,10 @@ def __version__():
4242
"pytesseract",
4343
"aiohttp",
4444
"pytest-asyncio",
45+
"python-dotenv",
46+
"azure-monitor-opentelemetry-exporter==1.0.0b25",
47+
"opentelemetry-sdk",
48+
"azure-monitor-opentelemetry"
4549
],
4650
python_requires=">=3.10",
4751
classifiers=[

tests/.datafog_env/bin/pip

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22
# -*- coding: utf-8 -*-
33
import re
44
import sys
5-
65
from pip._internal.cli.main import main
7-
8-
if __name__ == "__main__":
9-
sys.argv[0] = re.sub(r"(-script\.pyw|\.exe)?$", "", sys.argv[0])
6+
if __name__ == '__main__':
7+
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
108
sys.exit(main())

tests/.datafog_env/bin/pip3

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22
# -*- coding: utf-8 -*-
33
import re
44
import sys
5-
65
from pip._internal.cli.main import main
7-
8-
if __name__ == "__main__":
9-
sys.argv[0] = re.sub(r"(-script\.pyw|\.exe)?$", "", sys.argv[0])
6+
if __name__ == '__main__':
7+
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
108
sys.exit(main())

tests/.datafog_env/bin/pip3.11

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22
# -*- coding: utf-8 -*-
33
import re
44
import sys
5-
65
from pip._internal.cli.main import main
7-
8-
if __name__ == "__main__":
9-
sys.argv[0] = re.sub(r"(-script\.pyw|\.exe)?$", "", sys.argv[0])
6+
if __name__ == '__main__':
7+
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
108
sys.exit(main())

0 commit comments

Comments
 (0)