Skip to content

Commit 72779d9

Browse files
authored
Merge pull request #54 from CausalInferenceLab/Feature/53-datahub-glossary-연동-구현
Feature/53 datahub glossary 연동 구현
2 parents dc1b65f + d9d1ac1 commit 72779d9

File tree

3 files changed

+3909
-0
lines changed

3 files changed

+3909
-0
lines changed

data_utils/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# data_utils 패키지 초기화 파일

data_utils/datahub_source.py

Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@
66
from datahub.metadata.schema_classes import UpstreamLineageClass
77
from collections import defaultdict
88
import requests
9+
from data_utils.queries import (
10+
ROOT_GLOSSARY_NODES_QUERY,
11+
GLOSSARY_NODE_QUERY,
12+
LIST_QUERIES_QUERY,
13+
)
914

1015

1116
class DatahubMetadataFetcher:
@@ -292,3 +297,287 @@ def process_lineage(direction):
292297
)
293298

294299
return metadata
300+
301+
def get_root_glossary_nodes(self):
302+
"""
303+
DataHub에서 루트 용어집 노드를 가져오는 함수
304+
305+
Returns:
306+
dict: 루트 용어집 노드 정보
307+
"""
308+
# GraphQL 요청 보내기
309+
headers = {"Content-Type": "application/json"}
310+
response = requests.post(
311+
f"{self.gms_server}/api/graphql",
312+
json={"query": ROOT_GLOSSARY_NODES_QUERY},
313+
headers=headers,
314+
)
315+
316+
# 결과 반환
317+
if response.status_code == 200:
318+
return response.json()
319+
else:
320+
return {
321+
"error": True,
322+
"status_code": response.status_code,
323+
"message": response.text,
324+
}
325+
326+
def get_glossary_node_by_urn(self, urn):
327+
"""
328+
DataHub에서 특정 URN의 용어집 노드 및 그 자식 항목을 가져오는 함수
329+
330+
Args:
331+
urn (str): 용어집 노드의 URN
332+
333+
Returns:
334+
dict: 용어집 노드 정보와 자식 항목
335+
"""
336+
# GraphQL 요청 보내기
337+
headers = {"Content-Type": "application/json"}
338+
response = requests.post(
339+
f"{self.gms_server}/api/graphql",
340+
json={"query": GLOSSARY_NODE_QUERY, "variables": {"urn": urn}},
341+
headers=headers,
342+
)
343+
344+
# 결과 반환
345+
if response.status_code == 200:
346+
return response.json()
347+
else:
348+
return {
349+
"error": True,
350+
"status_code": response.status_code,
351+
"message": response.text,
352+
}
353+
354+
def get_node_basic_info(self, node, index):
355+
"""
356+
용어집 노드의 기본 정보를 딕셔너리로 반환하는 함수
357+
358+
Args:
359+
node (dict): 용어집 노드 정보
360+
index (int): 노드의 인덱스
361+
362+
Returns:
363+
dict: 노드의 기본 정보
364+
"""
365+
result = {"index": index, "name": node["properties"]["name"]}
366+
367+
if node["properties"] and node["properties"].get("description"):
368+
result["description"] = node["properties"]["description"]
369+
370+
# 자식 노드/용어 관계 정보 수 추가
371+
if "children" in node and node["children"]["total"] > 0:
372+
result["child_count"] = node["children"]["total"]
373+
374+
return result
375+
376+
def get_child_entity_info(self, entity, index):
377+
"""
378+
자식 엔티티(용어 또는 노드)의 정보를 딕셔너리로 반환하는 함수
379+
380+
Args:
381+
entity (dict): 자식 엔티티 정보
382+
index (int): 엔티티의 인덱스
383+
384+
Returns:
385+
dict: 엔티티 정보
386+
"""
387+
entity_type = entity["type"]
388+
result = {"index": index, "type": entity_type}
389+
390+
if entity_type == "GLOSSARY_TERM":
391+
if "properties" in entity and entity["properties"]:
392+
result["name"] = entity["properties"].get("name", "N/A")
393+
394+
if (
395+
"description" in entity["properties"]
396+
and entity["properties"]["description"]
397+
):
398+
result["description"] = entity["properties"]["description"]
399+
400+
elif entity_type == "GLOSSARY_NODE":
401+
if "properties" in entity and entity["properties"]:
402+
result["name"] = entity["properties"].get("name", "N/A")
403+
404+
return result
405+
406+
def process_node_details(self, node):
407+
"""
408+
노드의 상세 정보를 처리하고 딕셔너리로 반환하는 함수
409+
410+
Args:
411+
node (dict): 용어집 노드 정보
412+
413+
Returns:
414+
dict: 노드의 상세 정보
415+
"""
416+
node_urn = node["urn"]
417+
detailed_node = self.get_glossary_node_by_urn(node_urn)
418+
419+
result = {"name": node["properties"]["name"], "children": []}
420+
421+
if (
422+
detailed_node
423+
and "data" in detailed_node
424+
and "glossaryNode" in detailed_node["data"]
425+
):
426+
node_detail = detailed_node["data"]["glossaryNode"]
427+
428+
# 자식 항목 정보 추출
429+
if "children" in node_detail and node_detail["children"]["total"] > 0:
430+
relationships = node_detail["children"]["relationships"]
431+
432+
for j, rel in enumerate(relationships, 1):
433+
entity = rel["entity"]
434+
result["children"].append(self.get_child_entity_info(entity, j))
435+
436+
return result
437+
438+
def process_glossary_nodes(self, result):
439+
"""
440+
용어집 노드 결과를 처리하고 딕셔너리로 반환하는 함수
441+
442+
Args:
443+
result (dict): API 응답 결과
444+
445+
Returns:
446+
dict: 처리된 용어집 노드 데이터
447+
"""
448+
if "error" in result:
449+
return result
450+
451+
processed_result = {"total_nodes": 0, "nodes": []}
452+
453+
# 노드 목록 추출
454+
nodes = result["data"]["getRootGlossaryNodes"]["nodes"]
455+
processed_result["total_nodes"] = len(nodes)
456+
457+
for i, node in enumerate(nodes, 1):
458+
node_info = self.get_node_basic_info(node, i)
459+
460+
# 자식 노드가 있으면 상세 정보 처리
461+
if "children" in node and node["children"]["total"] > 0:
462+
node_details = self.process_node_details(node)
463+
node_info["details"] = node_details
464+
465+
processed_result["nodes"].append(node_info)
466+
467+
return processed_result
468+
469+
def get_glossary_data(self):
470+
"""
471+
DataHub에서 전체 용어집 데이터를 가져와 처리하는 함수
472+
473+
Returns:
474+
dict: 처리된 용어집 데이터
475+
"""
476+
# DataHub 서버에 연결하여 용어집 노드 가져오기
477+
result = self.get_root_glossary_nodes()
478+
479+
# 결과 처리
480+
if result:
481+
try:
482+
return self.process_glossary_nodes(result)
483+
except KeyError as e:
484+
return {"error": True, "message": f"결과 구조 파싱 중 오류 발생: {e}"}
485+
else:
486+
return {"error": True, "message": "용어집 노드를 가져오지 못했습니다."}
487+
488+
def get_queries(self, start=0, count=10, query="*", filters=None):
489+
"""
490+
DataHub에서 쿼리 목록을 가져오는 함수
491+
492+
Args:
493+
start (int): 시작 인덱스 (기본값=0)
494+
count (int): 반환할 쿼리 수 (기본값=10)
495+
query (str): 필터링에 사용할 쿼리 문자열 (기본값="*")
496+
filters (list): 추가 필터 (기본값=None)
497+
498+
Returns:
499+
dict: 쿼리 목록 정보
500+
"""
501+
# GraphQL 요청용 입력 변수 준비
502+
input_params = {"start": start, "count": count, "query": query}
503+
504+
if filters:
505+
input_params["filters"] = filters
506+
507+
variables = {"input": input_params}
508+
509+
# GraphQL 요청 보내기
510+
headers = {"Content-Type": "application/json"}
511+
response = requests.post(
512+
f"{self.gms_server}/api/graphql",
513+
json={"query": LIST_QUERIES_QUERY, "variables": variables},
514+
headers=headers,
515+
)
516+
517+
# 결과 반환
518+
if response.status_code == 200:
519+
return response.json()
520+
else:
521+
return {
522+
"error": True,
523+
"status_code": response.status_code,
524+
"message": response.text,
525+
}
526+
527+
def process_queries(self, result):
528+
"""
529+
쿼리 목록 결과를 처리하고 간소화된 형태로 반환하는 함수
530+
531+
Args:
532+
result (dict): API 응답 결과
533+
534+
Returns:
535+
dict: 처리된 쿼리 목록 데이터 (urn, name, description, statement만 포함)
536+
"""
537+
if "error" in result:
538+
return result
539+
540+
processed_result = {"total_queries": 0, "count": 0, "start": 0, "queries": []}
541+
542+
if "data" in result and "listQueries" in result["data"]:
543+
list_queries = result["data"]["listQueries"]
544+
processed_result["total_queries"] = list_queries.get("total", 0)
545+
processed_result["count"] = list_queries.get("count", 0)
546+
processed_result["start"] = list_queries.get("start", 0)
547+
548+
for query in list_queries.get("queries", []):
549+
query_info = {"urn": query.get("urn")}
550+
551+
props = query.get("properties", {})
552+
query_info["name"] = props.get("name")
553+
query_info["description"] = props.get("description")
554+
query_info["statement"] = props.get("statement", {}).get("value")
555+
556+
processed_result["queries"].append(query_info)
557+
558+
return processed_result
559+
560+
def get_query_data(self, start=0, count=10, query="*", filters=None):
561+
"""
562+
DataHub에서 쿼리 목록을 가져와 처리하는 함수
563+
564+
Args:
565+
start (int): 시작 인덱스 (기본값=0)
566+
count (int): 반환할 쿼리 수 (기본값=10)
567+
query (str): 필터링에 사용할 쿼리 문자열 (기본값="*")
568+
filters (list): 추가 필터 (기본값=None)
569+
570+
Returns:
571+
dict: 처리된 쿼리 목록 데이터
572+
"""
573+
# DataHub 서버에 연결하여 쿼리 목록 가져오기
574+
result = self.get_queries(start, count, query, filters)
575+
576+
# 결과 처리
577+
if result:
578+
try:
579+
return self.process_queries(result)
580+
except KeyError as e:
581+
return {"error": True, "message": f"결과 구조 파싱 중 오류 발생: {e}"}
582+
else:
583+
return {"error": True, "message": "쿼리 목록을 가져오지 못했습니다."}

0 commit comments

Comments
 (0)