|
1 |
| -from presidio_analyzer import AnalyzerEngine, RecognizerRegistry |
| 1 | +import logging |
| 2 | +from typing import List, Optional |
| 3 | + |
| 4 | +from presidio_analyzer import ( |
| 5 | + AnalyzerEngine, |
| 6 | + Pattern, |
| 7 | + PatternRecognizer, |
| 8 | + RecognizerRegistry, |
| 9 | +) |
2 | 10 | from presidio_analyzer.nlp_engine import NlpEngineProvider
|
3 | 11 |
|
4 | 12 | from .analyzer import CustomSpacyRecognizer
|
5 | 13 |
|
# Bind the logger before configuring it: Logger.setLevel() returns None, so
# chaining it onto getLogger() would leave `logger` set to None.
logger = logging.getLogger("presidio-engine-init")
logger.setLevel(logging.ERROR)
| 15 | + |
6 | 16 |
|
7 | 17 | # Helper methods
|
def create_ad_hoc_deny_list_recognizer(
    deny_list: Optional[List[str]] = None,
) -> "Optional[PatternRecognizer]":
    """Build a one-off ``PatternRecognizer`` from a deny list.

    Args:
        deny_list: Terms handed to ``PatternRecognizer`` as its ``deny_list``.
            Matches are reported under the ``CUSTOM_PII`` entity.

    Returns:
        The configured recognizer, or ``None`` when ``deny_list`` is omitted,
        ``None`` or empty.
    """
    # Nothing to match against: signal "no recognizer" to the caller.
    if not deny_list:
        return None

    return PatternRecognizer(supported_entity="CUSTOM_PII", deny_list=deny_list)
| 28 | + |
| 29 | + |
def create_ad_hoc_regex_recognizer(
    regex: str, entity_type: str, score: float, context: Optional[List[str]] = None
) -> "Optional[PatternRecognizer]":
    """Build a one-off ``PatternRecognizer`` around a single regular expression.

    Args:
        regex: Regular expression wrapped in a ``Pattern`` named "Regex Pattern".
        entity_type: Entity name passed as the recognizer's ``supported_entity``.
        score: Score assigned to the ``Pattern``.
        context: Optional context words forwarded to the recognizer.

    Returns:
        The configured recognizer, or ``None`` when ``regex`` is empty or ``None``.
    """
    if regex:
        return PatternRecognizer(
            supported_entity=entity_type,
            patterns=[Pattern(name="Regex Pattern", regex=regex, score=score)],
            context=context,
        )
    # No expression supplied, so there is nothing to build.
    return None
| 40 | + |
| 41 | + |
8 | 42 | def analyzer_engine():
|
9 | 43 | """Return AnalyzerEngine."""
|
10 | 44 |
|
11 | 45 | spacy_recognizer = CustomSpacyRecognizer()
|
12 | 46 | configuration = {
|
13 | 47 | "nlp_engine_name": "spacy",
|
14 | 48 | "models": [{"lang_code": "en", "model_name": "en_spacy_pii_fast"}],
|
| 49 | + "ner_model_configuration": { |
| 50 | + "model_to_presidio_entity_mapping": { |
| 51 | + "PER": "PERSON", |
| 52 | + "PERSON": "PERSON", |
| 53 | + "NORP": "NRP", |
| 54 | + "FAC": "FACILITY", |
| 55 | + "LOC": "LOCATION", |
| 56 | + "GPE": "LOCATION", |
| 57 | + "LOCATION": "LOCATION", |
| 58 | + "ORG": "ORGANIZATION", |
| 59 | + "ORGANIZATION": "ORGANIZATION", |
| 60 | + "DATE": "DATE_TIME", |
| 61 | + "TIME": "DATE_TIME", |
| 62 | + }, |
| 63 | + "low_confidence_score_multiplier": 0.4, |
| 64 | + "low_score_entity_names": ["ORG", "ORGANIZATION"], |
| 65 | + "labels_to_ignore": ["DATE_TIME"], |
| 66 | + }, |
15 | 67 | }
|
16 | 68 |
|
17 | 69 | # Create NLP engine based on configuration
|
@@ -59,6 +111,23 @@ def scan(text, **kwargs):
|
59 | 111 | kwargs.setdefault("language", "en")
|
60 | 112 | kwargs.setdefault("score_threshold", 0.35)
|
61 | 113 | kwargs.setdefault("nlp_artifacts", None)
|
| 114 | + kwargs.setdefault("entities", []) |
| 115 | + kwargs.setdefault("allow_list", []) |
| 116 | + kwargs.setdefault("deny_list", []) |
| 117 | + |
| 118 | + """Analyze input using Analyzer engine and input arguments (kwargs).""" |
| 119 | + if "entities" not in kwargs or "All" in kwargs["entities"]: |
| 120 | + kwargs["entities"] = None |
| 121 | + |
| 122 | + if "deny_list" in kwargs and kwargs["deny_list"] is not None: |
| 123 | + ad_hoc_recognizer = create_ad_hoc_deny_list_recognizer(kwargs["deny_list"]) |
| 124 | + kwargs["ad_hoc_recognizers"] = [ad_hoc_recognizer] if ad_hoc_recognizer else [] |
| 125 | + del kwargs["deny_list"] |
| 126 | + |
| 127 | + if "regex_params" in kwargs and len(kwargs["regex_params"]) > 0: |
| 128 | + ad_hoc_recognizer = create_ad_hoc_regex_recognizer(*kwargs["regex_params"]) |
| 129 | + kwargs["ad_hoc_recognizers"] = [ad_hoc_recognizer] if ad_hoc_recognizer else [] |
| 130 | + del kwargs["regex_params"] |
62 | 131 |
|
63 | 132 | # init analyzer instance
|
64 | 133 | analyzer = analyzer_engine()
|
|
0 commit comments