6
6
- TextPIIAnnotator: Class for annotating PII in text.
7
7
8
8
These classes provide high-level interfaces for image and text processing,
9
- including OCR, PII detection, and annotation .
9
+ including OCR, PII detection, annotation, and anonymization .
10
10
"""
11
11
12
12
import json
13
13
import logging
14
14
from typing import List
15
15
16
16
from .config import OperationType
17
+ from .models .anonymizer import Anonymizer , AnonymizerType , HashType
17
18
from .processing .text_processing .spacy_pii_annotator import SpacyPIIAnnotator
18
19
from .services .image_service import ImageService
19
20
from .services .spark_service import SparkService
@@ -27,26 +28,28 @@ class DataFog:
27
28
"""
28
29
Main class for running OCR and text processing pipelines.
29
30
30
- Handles image and text processing operations, including OCR and PII detection.
31
+ Handles image and text processing operations, including OCR, PII detection, and anonymization .
31
32
32
33
Attributes:
33
34
image_service: Service for image processing and OCR.
34
35
text_service: Service for text processing and annotation.
35
36
spark_service: Optional Spark service for distributed processing.
36
37
operations: List of operations to perform.
38
+ anonymizer: Anonymizer for PII redaction, replacement, or hashing.
37
39
"""
38
40
39
41
def __init__ (
40
42
self ,
41
43
image_service = ImageService (),
42
44
text_service = TextService (),
43
45
spark_service = None ,
44
- operations : List [OperationType ] = [OperationType .ANNOTATE_PII ],
46
+ operations : List [OperationType ] = [OperationType .SCAN ],
45
47
):
46
48
self .image_service = image_service
47
49
self .text_service = text_service
48
50
self .spark_service : SparkService = spark_service
49
51
self .operations : List [OperationType ] = operations
52
+ self .anonymizer = Anonymizer ()
50
53
self .logger = logging .getLogger (__name__ )
51
54
self .logger .info (
52
55
"Initializing DataFog class with the following services and operations:"
@@ -64,38 +67,22 @@ async def run_ocr_pipeline(self, image_urls: List[str]):
64
67
65
68
This method performs optical character recognition (OCR) on the images specified by the URLs.
66
69
If PII annotation is enabled, it also annotates the extracted text for personally identifiable information.
70
+ If redaction, replacement, or hashing is enabled, it applies the corresponding anonymization.
67
71
68
72
Args:
69
73
image_urls (List[str]): A list of URLs pointing to the images to be processed.
70
74
71
75
Returns:
72
- List: If PII annotation is enabled, returns a list of annotated text results.
73
- Otherwise, returns a list of extracted text from the images.
76
+ List: Processed text results based on the enabled operations.
74
77
75
78
Raises:
76
- Exception: Any error encountered during the OCR or annotation process.
77
-
78
- Note:
79
- The method logs various stages of the process, including completion of OCR extraction
80
- and text annotation, as well as any errors encountered.
79
+ Exception: Any error encountered during the OCR or text processing.
81
80
"""
82
81
try :
83
82
extracted_text = await self .image_service .ocr_extract (image_urls )
84
83
self .logger .info (f"OCR extraction completed for { len (image_urls )} images." )
85
- self .logger .debug (
86
- f"Total length of extracted text: { sum (len (text ) for text in extracted_text )} "
87
- )
88
84
89
- if OperationType .ANNOTATE_PII in self .operations :
90
- annotated_text = await self .text_service .batch_annotate_text_async (
91
- extracted_text
92
- )
93
- self .logger .info (
94
- f"Text annotation completed with { len (annotated_text )} annotations."
95
- )
96
- return annotated_text
97
- else :
98
- return extracted_text
85
+ return await self ._process_text (extracted_text )
99
86
except Exception as e :
100
87
logging .error (f"Error in run_ocr_pipeline: { str (e )} " )
101
88
return [f"Error: { str (e )} " ]
@@ -105,75 +92,126 @@ async def run_text_pipeline(self, str_list: List[str]):
105
92
Run the text pipeline asynchronously on a list of input text.
106
93
107
94
This method processes a list of text strings, potentially annotating them for personally
108
- identifiable information (PII) if the ANNOTATE_PII operation is enabled.
95
+ identifiable information (PII) and applying anonymization if enabled.
109
96
110
97
Args:
111
98
str_list (List[str]): A list of text strings to be processed.
112
99
113
100
Returns:
114
- List: If PII annotation is enabled, returns a list of annotated text results.
115
- Otherwise, returns the original list of text strings.
101
+ List: Processed text results based on the enabled operations.
116
102
117
103
Raises:
118
- Exception: Any error encountered during the text processing or annotation.
119
-
120
- Note:
121
- The method logs the start of the pipeline, the completion of text annotation if applicable,
122
- and any errors encountered during processing.
104
+ Exception: Any error encountered during the text processing.
123
105
"""
124
106
try :
125
107
self .logger .info (f"Starting text pipeline with { len (str_list )} texts." )
126
- if OperationType .ANNOTATE_PII in self .operations :
127
- annotated_text = await self .text_service .batch_annotate_text_async (
128
- str_list
129
- )
130
- self .logger .info (
131
- f"Text annotation completed with { len (annotated_text )} annotations."
132
- )
133
- return annotated_text
134
-
135
- self .logger .info ("No annotation operation found; returning original texts." )
136
- return str_list
108
+ return await self ._process_text (str_list )
137
109
except Exception as e :
138
110
self .logger .error (f"Error in run_text_pipeline: { str (e )} " )
139
111
raise
140
112
113
+ async def _process_text (self , text_list : List [str ]):
114
+ """
115
+ Internal method to process text based on enabled operations.
116
+ """
117
+ if OperationType .SCAN in self .operations :
118
+ annotated_text = await self .text_service .batch_annotate_text_async (
119
+ text_list
120
+ )
121
+ self .logger .info (
122
+ f"Text annotation completed with { len (annotated_text )} annotations."
123
+ )
124
+
125
+ if OperationType .REDACT in self .operations :
126
+ return [
127
+ self .anonymizer .anonymize (
128
+ text , annotations , AnonymizerType .REDACT
129
+ ).anonymized_text
130
+ for text , annotations in zip (text_list , annotated_text , strict = True )
131
+ ]
132
+ elif OperationType .REPLACE in self .operations :
133
+ return [
134
+ self .anonymizer .anonymize (
135
+ text , annotations , AnonymizerType .REPLACE
136
+ ).anonymized_text
137
+ for text , annotations in zip (text_list , annotated_text , strict = True )
138
+ ]
139
+ elif OperationType .HASH in self .operations :
140
+ return [
141
+ self .anonymizer .anonymize (
142
+ text , annotations , AnonymizerType .HASH
143
+ ).anonymized_text
144
+ for text , annotations in zip (text_list , annotated_text , strict = True )
145
+ ]
146
+ else :
147
+ return annotated_text
148
+
149
+ self .logger .info (
150
+ "No annotation or anonymization operation found; returning original texts."
151
+ )
152
+ return text_list
153
+
141
154
def run_text_pipeline_sync (self , str_list : List [str ]):
142
155
"""
143
156
Run the text pipeline synchronously on a list of input text.
144
157
145
158
This method processes a list of text strings in a synchronous manner, potentially
146
- annotating them for personally identifiable information (PII) if the ANNOTATE_PII
147
- operation is enabled.
159
+ annotating them for personally identifiable information (PII) and applying
160
+ anonymization if enabled.
148
161
149
162
Args:
150
163
str_list (List[str]): A list of text strings to be processed.
151
164
152
165
Returns:
153
- List: If PII annotation is enabled, returns a list of annotated text results.
154
- Otherwise, returns the original list of text strings.
166
+ List: Processed text results based on the enabled operations.
155
167
156
168
Raises:
157
- Exception: Any error encountered during the text processing or annotation.
158
-
159
- Note:
160
- The method logs the start of the pipeline, the completion of text annotation if applicable,
161
- and any errors encountered during processing. This synchronous version may be preferred
162
- for smaller datasets or when immediate results are required.
169
+ Exception: Any error encountered during the text processing.
163
170
"""
164
171
try :
165
172
self .logger .info (f"Starting text pipeline with { len (str_list )} texts." )
166
- if OperationType .ANNOTATE_PII in self .operations :
173
+ if OperationType .SCAN in self .operations :
167
174
annotated_text = self .text_service .batch_annotate_text_sync (str_list )
168
175
self .logger .info (
169
176
f"Text annotation completed with { len (annotated_text )} annotations."
170
177
)
171
- return annotated_text
172
178
173
- self .logger .info ("No annotation operation found; returning original texts." )
179
+ if OperationType .REDACT in self .operations :
180
+ return [
181
+ self .anonymizer .anonymize (
182
+ text , annotations , AnonymizerType .REDACT
183
+ ).anonymized_text
184
+ for text , annotations in zip (
185
+ str_list , annotated_text , strict = True
186
+ )
187
+ ]
188
+ elif OperationType .REPLACE in self .operations :
189
+ return [
190
+ self .anonymizer .anonymize (
191
+ text , annotations , AnonymizerType .REPLACE
192
+ ).anonymized_text
193
+ for text , annotations in zip (
194
+ str_list , annotated_text , strict = True
195
+ )
196
+ ]
197
+ elif OperationType .HASH in self .operations :
198
+ return [
199
+ self .anonymizer .anonymize (
200
+ text , annotations , AnonymizerType .HASH
201
+ ).anonymized_text
202
+ for text , annotations in zip (
203
+ str_list , annotated_text , strict = True
204
+ )
205
+ ]
206
+ else :
207
+ return annotated_text
208
+
209
+ self .logger .info (
210
+ "No annotation or anonymization operation found; returning original texts."
211
+ )
174
212
return str_list
175
213
except Exception as e :
176
- self .logger .error (f"Error in run_text_pipeline : { str (e )} " )
214
+ self .logger .error (f"Error in run_text_pipeline_sync : { str (e )} " )
177
215
raise
178
216
179
217
def _add_attributes (self , attributes : dict ):
@@ -194,7 +232,7 @@ def _add_attributes(self, attributes: dict):
194
232
using this method to avoid overwriting existing attributes.
195
233
"""
196
234
for key , value in attributes .items ():
197
- pass
235
+ setattr ( self , key , value )
198
236
199
237
200
238
class TextPIIAnnotator :
@@ -225,4 +263,5 @@ def run(self, text, output_path=None):
225
263
226
264
finally :
227
265
# Ensure Spark resources are released
228
- pass
266
+ if self .spark_processor :
267
+ self .spark_processor .stop ()
0 commit comments