Skip to content

Commit f60c891

Browse files
committed
.
1 parent f063196 commit f60c891

File tree

7 files changed

+341
-64
lines changed

7 files changed

+341
-64
lines changed

data_dispatcher/db.py

Lines changed: 107 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -183,18 +183,20 @@ def as_jsonable(self):
183183
)
184184

185185
class HasLogRecord(object):
186-
187-
def __init__(self, log_table, parent_id_columns):
188-
self.LogTable = log_table
189-
self.IDColumns = parent_id_columns
190-
self.IDColumnsText = ",".join(parent_id_columns)
186+
187+
#
188+
# uses class attributes:
189+
#
190+
# LogTable - name of the table to store the log
191+
# LogIDColumns - list of columns in the log table identifying the parent
192+
#
191193

192194
def add_log(self, type, data=None, **kwargs):
193195
#print("add_log:", type, data, kwargs)
194196
c = self.DB.cursor()
195197
data = (data or {}).copy()
196198
data.update(kwargs)
197-
parent_pk_columns = self.IDColumnsText
199+
parent_pk_columns = ",".join(self.LogIDColumns)
198200
parent_pk_values = ",".join([f"'{v}'" for v in self.pk()])
199201
c.execute(f"""
200202
begin;
@@ -203,26 +205,55 @@ def add_log(self, type, data=None, **kwargs):
203205
commit
204206
""", (type, json.dumps(data)))
205207

208+
@classmethod
def add_log_bulk(cls, db, records):
    """
    Insert many log records into this class's log table with one COPY.

    The target table is cls.LogTable; the columns written are
    cls.LogIDColumns followed by "type" and "data".

    records: iterable of tuples:
        (
            (
                id_column1_value,
                id_column2_value, ...
            ),
            type,
            { data }
        )
    The data dictionary is serialized with json.dumps().

    The insert runs in its own begin/commit; on any error the
    transaction is rolled back and the exception is re-raised.
    Nothing is sent to the database if records is empty.
    """

    def escape(value):
        # COPY text format gives special meaning to backslash, tab and
        # newline/carriage return, so they must be backslash-escaped.
        # json.dumps() output routinely contains backslashes (\" and \n),
        # which would otherwise be unescaped by the server and corrupt
        # the JSON or split the row.
        return (
            str(value)
            .replace("\\", "\\\\")
            .replace("\t", "\\t")
            .replace("\n", "\\n")
            .replace("\r", "\\r")
        )

    rows = [
        "\t".join(
            [escape(v) for v in id_values]
            + [escape(log_type), escape(json.dumps(data))]
        )
        for id_values, log_type, data in records
    ]
    if not rows:
        return          # nothing to log, skip the round trip

    buffer = io.StringIO("\n".join(rows))
    # list(...) so that LogIDColumns may be declared as a list or a tuple
    columns = list(cls.LogIDColumns) + ["type", "data"]

    c = db.cursor()
    try:
        c.execute("begin")
        c.copy_from(buffer, cls.LogTable, columns=columns)
        c.execute("commit")
    except BaseException:
        # roll back on any failure, including interrupts, then propagate
        c.execute("rollback")
        raise
237+
206238
def get_log(self, type=None, since=None, reversed=False):
    """
    Return the log records stored for this object.

    type:     if not None, only records of this type are returned
    since:    if not None, only records with t >= since are returned;
              a float or int is interpreted as a UNIX timestamp (UTC),
              any other value is passed to the database driver as is
    reversed: if true, records are ordered newest first

    Returns a generator of DBLogRecord(type, t, data) objects, ordered by t.
    """
    # All values are passed as query parameters instead of being formatted
    # into the SQL text: this quotes timestamps correctly and keeps quotes
    # in names or types from breaking (or injecting into) the statement.
    conditions = []
    params = []
    for column, value in zip(self.LogIDColumns, self.pk()):
        conditions.append(f"{column} = %s")
        params.append(value)

    if isinstance(since, (float, int)):
        since = datetime.fromtimestamp(since, timezone.utc)
    if since is not None:
        conditions.append("t >= %s")
        params.append(since)

    if type is not None:
        conditions.append("type = %s")
        params.append(type)

    where_clause = " and ".join(conditions)
    order = "desc" if reversed else ""
    sql = f"""
        select type, t, data from {self.LogTable}
            where {where_clause}
            order by t {order}
    """
    c = self.DB.cursor()
    c.execute(sql, tuple(params))
    return (
        DBLogRecord(log_type, t, data)
        for log_type, t, data in cursor_iterator(c)
    )
226257

227258
class DBProject(DBObject, HasLogRecord):
228259

@@ -232,8 +263,10 @@ class DBProject(DBObject, HasLogRecord):
232263
Table = "projects"
233264
PK = ["id"]
234265

266+
LogIDColumns = ["project_id"]
267+
LogTable = "project_log"
268+
235269
def __init__(self, db, id, owner=None, created_timestamp=None, end_timestamp=None, state=None, retry_count=0, attributes={}):
236-
HasLogRecord.__init__(self, "project_log", ["project_id"])
237270
self.DB = db
238271
self.ID = id
239272
self.Owner = owner
@@ -481,8 +514,10 @@ class DBFile(DBObject, HasLogRecord):
481514
PK = ["namespace", "name"]
482515
Table = "files"
483516

517+
LogIDColumns = ["namespace", "name"]
518+
LogTable = "file_log"
519+
484520
def __init__(self, db, namespace, name):
485-
HasLogRecord.__init__(self, "file_log", ["namespace", "name"])
486521
self.DB = db
487522
self.Namespace = namespace
488523
self.Name = name
@@ -511,7 +546,10 @@ def replicas(self):
511546
def create_replica(self, rse, path, url, preference=0, available=False):
    """
    Create a replica of this file at the given RSE and log the event.

    Writes a "found" log record and, if the replica is created as
    available, an additional "available" record.
    """
    DBReplica.create(
        self.DB, self.Namespace, self.Name, rse, path, url,
        preference=preference, available=available
    )
    self.Replicas = None	# force re-load from the DB

    events = [("found", dict(rse=rse, path=path, url=url))]
    if available:
        events.append(("available", dict(rse=rse)))
    for event, details in events:
        self.add_log(event, **details)
552+
515553
def get_replica(self, rse):
    """
    Look up this file's replica at the given RSE.

    Returns whatever self.replicas() holds under the rse key, or None
    if there is no such entry.
    """
    known_replicas = self.replicas()
    return known_replicas.get(rse)
517555

@@ -659,7 +697,7 @@ def create(db, namespace, name, rse, path, url, preference=0, available=False, e
659697
except:
660698
c.execute("rollback")
661699
raise
662-
700+
663701
return DBReplica.get(db, namespace, name, rse)
664702

665703
def save(self):
@@ -697,6 +735,19 @@ def remove_bulk(db, rse, dids):
697735
c.execute("rollback")
698736
raise
699737

738+
log_records = (
739+
(
740+
did.split(":", 1),
741+
"removed",
742+
{
743+
"rse": rse,
744+
}
745+
)
746+
for did in dids
747+
)
748+
749+
DBFile.add_log_bulk(db, log_records)
750+
700751
@staticmethod
701752
def create_bulk(db, rse, preference, replicas):
702753
# replicas: {(namespace, name) -> {"path":.., "url":..}}
@@ -705,31 +756,25 @@ def create_bulk(db, rse, preference, replicas):
705756
csv = ['%s\t%s\t%s\t%s\t%s\t%s' % (namespace, name, rse, info["path"], info["url"], preference)
706757
for (namespace, name), info in replicas.items()]
707758
#print("DBReplica.create_bulk: csv:", csv)
708-
data = io.StringIO("\n".join(csv))
759+
csv = io.StringIO("\n".join(csv))
709760
table = DBReplica.Table
710761
columns = DBReplica.columns(as_text=True)
762+
711763
c = db.cursor()
712764
try:
713765
t = int(time.time()*1000)
714766
temp_table = f"file_replicas_temp_{t}"
715767
c.execute("begin")
716768
c.execute(f"create temp table {temp_table} (ns text, n text, r text, p text, u text, pr int)")
717-
c.copy_from(data, temp_table)
769+
c.copy_from(csv, temp_table)
770+
csv = None # to release memory
718771
c.execute(f"""
719772
insert into {table}({columns})
720773
select t.ns, t.n, t.r, t.p, t.u, t.pr, false from {temp_table} t
721774
on conflict (namespace, name, rse)
722775
do nothing;
723776
""")
724777

725-
if False:
726-
c.execute(f"""
727-
insert into file_log (namespace, name, type, data)
728-
select t.ns, t.n, 'replica added', ('{"rse":"' || t.r || '", "url":"' || t.u || '", "preference":' || t.pr::text || '}')::jsonb
729-
from {temp_table} t
730-
on conflict (namespace, name, t) do nothing
731-
""")
732-
733778
c.execute(f"""
734779
drop table {temp_table};
735780
commit
@@ -739,6 +784,21 @@ def create_bulk(db, rse, preference, replicas):
739784
c.execute("rollback")
740785
raise
741786

787+
log_records = (
788+
(
789+
(namespace, name),
790+
"found",
791+
{
792+
"url": info["url"],
793+
"rse": rse,
794+
"path": info["path"]
795+
}
796+
)
797+
for (namespace, name), info in replicas.items()
798+
)
799+
800+
DBFile.add_log_bulk(db, log_records)
801+
742802
@staticmethod
743803
def update_availability_bulk(db, available, rse, dids):
744804
# dids is list of dids: ["namespace:name", ...]
@@ -761,6 +821,21 @@ def update_availability_bulk(db, available, rse, dids):
761821
c.execute("rollback")
762822
raise
763823

824+
event = "available" if available else "unavailable"
825+
log_records = (
826+
(
827+
(namespace, name),
828+
event,
829+
{
830+
"rse": rse
831+
}
832+
)
833+
for (namespace, name) in undids
834+
)
835+
836+
DBFile.add_log_bulk(db, log_records)
837+
838+
764839
class DBFileHandle(DBObject, HasLogRecord):
765840

766841
Columns = ["project_id", "namespace", "name", "state", "worker_id", "attempts", "attributes"]
@@ -780,8 +855,10 @@ class DBFileHandle(DBObject, HasLogRecord):
780855
]
781856

782857

858+
LogIDColumns = ["project_id", "namespace", "name"]
859+
LogTable = "file_handle_log"
860+
783861
def __init__(self, db, project_id, namespace, name, state=None, worker_id=None, attempts=0, attributes={}):
784-
HasLogRecord.__init__(self, "file_handle_log", ["project_id", "namespace", "name"])
785862
self.DB = db
786863
self.ProjectID = project_id
787864
self.Namespace = namespace
@@ -1032,13 +1109,13 @@ def is_active(self):
10321109

10331110
def done(self):
    """
    Mark the handle as done, log the event and save the handle.

    The log record is written before the worker id is cleared so that
    it still names the worker which processed the file.
    """
    finishing_worker = self.WorkerID
    self.State = "done"
    self.add_log("done", worker=finishing_worker)
    self.WorkerID = None
    self.save()
10381115

10391116
def failed(self, retry=True):
    """
    Record a processing failure, log the event and save the handle.

    retry: if true, the handle goes back to self.ReadyState so that it
           can be tried again; otherwise it goes to the "failed" state.
           The log record carries final = not retry.
    """
    worker = self.WorkerID or None
    if retry:
        self.State = self.ReadyState
    else:
        self.State = "failed"
    self.add_log("failed", worker=worker, final=not retry)
    self.WorkerID = None
    self.save()
10441121

docs/ui.rst

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,12 @@ Data Dispatcher provides a way to pass some arbitrary metadata about the project
116116
The metadata is attached to the project and/or project files at the time of the project creation. Project and file metadata can be any JSON dictionary.
117117
If the project is created using a MetaCat query, Data Dispatcher can copy some portions of file metadata from MetaCat to avoid unnecessarily
118118
querying MetaCat at run time.
119-
When the worker issues ``dd next -j ...`` command, the output includes the project and the metadata as part of the JSON output.
119+
When the worker asks for the next file to process, the Data Dispatcher responds with the file information, which includes the project and the
120+
file metadata.
120121
121-
Note that project file attributes defined at the project cteation time do not get stored in MetaCat. Also, because file
122+
Note that the project file attributes defined at the project creation time do not get stored in MetaCat. Also, because file
122123
attributes are associated with project file handles instead of files, if two projects include the same
123-
file, they can define project file attributes independently without interfering with each other.
124+
file, they can define file attributes independently without interfering with each other.
124125
125126
There are several ways to specify project level metadata attributes:
126127
@@ -218,10 +219,10 @@ Viewing projects
218219
-a - show project attributes only
219220
-r - show replicas information
220221
-j - show as JSON
221-
-f [active|ready|available|all|reserved|failed|done] - list files (namespace:name) only
222+
-f [active|initial|available|all|reserved|failed|done] - list files (namespace:name) only
222223
all - all files, including done and failed
223224
active - all except done and failed
224-
ready - ready files only
225+
initial - in initial state
225226
available - available files only
226227
reserved - reserved files only
227228
failed - failed files only

web_server/base.html

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -211,31 +211,41 @@
211211

212212

213213
td.failed {
214-
background-color: #F44;
214+
background-color: #FCC;
215215
}
216216

217217
td.done {
218-
background-color: #88F;
218+
background-color: #CFC;
219219
}
220220

221221
td.initial {
222-
background-color: #DDD;
222+
background-color: #EEE;
223223
}
224224

225225
td.reserved {
226-
background-color: #FD8;
226+
background-color: #FFC;
227227
}
228228

229229
td.found {
230-
background-color: #ACA;
230+
background-color: #CCF;
231231
}
232232

233233
td.available {
    background-color: #CFF;     /* alternative considered: #AFA */
}
236236

237237
td.unavailable {
238-
background-color: #FAA;
238+
background-color: #FFC;
239+
}
240+
241+
a, a:visited {
242+
color: #228;
243+
}
244+
245+
a.button, a.button:visited {
246+
color: gray;
247+
font-weight: bold;
248+
padding: 3px 5px;
239249
}
240250

241251
</style>
@@ -267,14 +277,14 @@ <h1>{{GLOBAL_SiteTitle}}</h1>
267277
<div id="main_menu">
268278
<table class="placement link_menu" width="100%">
269279
<tr>
270-
<td><a href="{{GLOBAL_AppTopPath}}/P/projects">projects</a></td>
271-
<td><a href="{{GLOBAL_AppTopPath}}/R/index">RSEs</a></td>
280+
<td><a class=button href="{{GLOBAL_AppTopPath}}/P/projects">projects</a></td>
281+
<td><a class=button href="{{GLOBAL_AppTopPath}}/R/index">RSEs</a></td>
272282
<td style="width:100%"></td>
273283
<td>
274284
{% if GLOBAL_User %}
275-
<span style="color:#AAA">logged in as {{GLOBAL_User.Name or ""}}&nbsp;<a href="{{GLOBAL_AppTopPath}}/U/user?username={{GLOBAL_User.Username}}">{{GLOBAL_User.Username}}</a>&nbsp;</span><a href="{{GLOBAL_AppTopPath}}/A/logout?redirect={{GLOBAL_AppTopPath}}/index">log out</a>
285+
<span style="color:#AAA">logged in as {{GLOBAL_User.Name or ""}}&nbsp;<a href="{{GLOBAL_AppTopPath}}/U/user?username={{GLOBAL_User.Username}}">{{GLOBAL_User.Username}}</a>&nbsp;</span><a class=button href="{{GLOBAL_AppTopPath}}/A/logout?redirect={{GLOBAL_AppTopPath}}/index">log out</a>
276286
{% else %}
277-
<a href="{{GLOBAL_AppTopPath}}/A/login?redirect={{GLOBAL_AppTopPath}}/index">log in</a>
287+
<a class=button href="{{GLOBAL_AppTopPath}}/A/login?redirect={{GLOBAL_AppTopPath}}/index">log in</a>
278288
{% endif %}
279289
</td>
280290
</tr>

web_server/gui_server.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
from webpie import WPApp, WPHandler
2-
from data_dispatcher.db import DBProject, DBFileHandle, DBRSE
2+
from data_dispatcher.db import DBProject, DBFileHandle, DBRSE, DBUser
33
from data_dispatcher import Version
44
from metacat.auth.server import AuthHandler, BaseHandler, BaseApp
55
import urllib, os, yaml
6-
from urllib.parse import quote, unquote
6+
from urllib.parse import quote, unquote, unquote_plus
77
from wsdbtools import ConnectionPool
88

99
class UsersHandler(BaseHandler):
@@ -152,7 +152,7 @@ def handle(self, request, relpath, project_id=None, namespace=None, name=None, *
152152
handle = DBFileHandle.get(db, int(project_id), namespace, name)
153153
if handle is None:
154154
self.redirect(f"./project?project_id={project_id}&error=Handle+not+found")
155-
return self.render_to_response("handle.html", project_id=project_id, handle=handle)
155+
return self.render_to_response("handle.html", project_id=project_id, handle=handle, handle_log = list(handle.get_log(reversed=True)))
156156

157157

158158
class RSEHandler(BaseHandler):

0 commit comments

Comments
 (0)