|
18 | 18 | from ec.datastore import s3
|
19 | 19 | from distutils import util
|
20 | 20 | from ..resources.gleanerS3 import _pythonMinioAddress
|
| 21 | +from ec.reporting.report import generateReportStats |
21 | 22 |
|
22 | 23 | GLEANER_MINIO_ADDRESS = os.environ.get('GLEANERIO_MINIO_ADDRESS')
|
23 | 24 | GLEANER_MINIO_PORT = os.environ.get('GLEANERIO_MINIO_PORT')
|
24 | 25 | GLEANER_MINIO_USE_SSL = bool(util.strtobool(os.environ.get('GLEANERIO_MINIO_USE_SSL', 'true')))
|
25 | 26 | GLEANER_MINIO_SECRET_KEY = os.environ.get('GLEANERIO_MINIO_SECRET_KEY')
|
26 | 27 | GLEANER_MINIO_ACCESS_KEY = os.environ.get('GLEANERIO_MINIO_ACCESS_KEY')
|
27 | 28 | GLEANER_MINIO_BUCKET = os.environ.get('GLEANERIO_MINIO_BUCKET')
|
| 29 | +GLEANERIO_GRAPH_URL = os.environ.get('GLEANERIO_GRAPH_URL') |
| 30 | +GLEANERIO_GRAPH_SUMMARY_NAMESPACE = os.environ.get('GLEANERIO_GRAPH_SUMMARY_NAMESPACE') |
| 31 | +GLEANERIO_CSV_CONFIG_URL = os.environ.get('GLEANERIO_CSV_CONFIG_URL') |
28 | 32 |
|
29 | 33 | MINIO_OPTIONS={"secure":GLEANER_MINIO_USE_SSL
|
30 | 34 |
|
31 | 35 | ,"access_key": GLEANER_MINIO_ACCESS_KEY
|
32 | 36 | ,"secret_key": GLEANER_MINIO_SECRET_KEY
|
33 | 37 | }
|
| 38 | + |
| 39 | +def _graphSummaryEndpoint(community): |
| 40 | + if community == "all": |
| 41 | + url = f"{GLEANERIO_GRAPH_URL}/namespace/{GLEANERIO_GRAPH_SUMMARY_NAMESPACE}/sparql" |
| 42 | + else: |
| 43 | + url = f"{GLEANERIO_GRAPH_URL}/namespace/{community}_summary/sparql" |
| 44 | + return url |
34 | 45 | @asset(group_name="community",key_prefix="task",
|
35 | 46 | required_resource_keys={"triplestore"})
|
36 | 47 | def task_tenant_sources(context) ->Any:
|
37 | 48 | s3_resource = context.resources.triplestore.s3
|
38 |
| - |
39 | 49 | t=s3_resource.getTennatInfo()
|
40 | 50 | tenants = t['tenant']
|
41 | 51 | listTenants = map (lambda a: {a['community']}, tenants)
|
@@ -144,11 +154,9 @@ def loadstatsCommunity(context, task_tenant_sources) -> str:
|
144 | 154 | ts = task_tenant_sources
|
145 | 155 | t =list(filter ( lambda a: a['community']== community_code, ts["tenant"] ))
|
146 | 156 | s = t[0]["sources"]
|
147 |
| - for source in s: |
148 | 157 |
|
| 158 | + for source in s: |
149 | 159 | dirs = s3Minio.listPath(GLEANER_MINIO_BUCKET,path=f"{REPORT_PATH}{source}/",recursive=False )
|
150 |
| - |
151 |
| - |
152 | 160 | for d in dirs:
|
153 | 161 | latestpath = f"{REPORT_PATH}{source}/latest/"
|
154 | 162 | if (d.object_name.casefold() == latestpath.casefold()) or (d.is_dir == False):
|
@@ -210,4 +218,14 @@ def loadstatsCommunity(context, task_tenant_sources) -> str:
|
210 | 218 | # s3.upload_fileobj(f, s3.GLEANERIO_MINIO_BUCKET, f"data/all/all_stats.csv")
|
211 | 219 | context.log.info(f"all_stats.csv uploaded using ec.datastore.putReportFile {s3_config.GLEANERIO_MINIO_BUCKET}tenant/{community_code} ")
|
212 | 220 | #return df_csv # now checking return types
|
| 221 | + |
| 222 | + context.log.info(f"GLEANERIO_CSV_CONFIG_URL {GLEANERIO_CSV_CONFIG_URL} ") |
| 223 | + |
| 224 | + report = generateReportStats(GLEANERIO_CSV_CONFIG_URL, s3_config.GLEANERIO_MINIO_BUCKET, s3Minio, |
| 225 | + _graphSummaryEndpoint(community_code), community_code) |
| 226 | + bucket, object = s3Minio.putReportFile(s3_config.GLEANERIO_MINIO_BUCKET, f"tenant/{community_code}", |
| 227 | + f"report_stats.json", report) |
| 228 | + context.log.info( |
| 229 | + f"report_stats.json uploaded using ec.datastore.putReportFile {s3_config.GLEANERIO_MINIO_BUCKET}tenant/{community_code} ") |
| 230 | + |
213 | 231 | return df_csv
|
0 commit comments