
Commit 348e1e3

1 parent 429ed9b

8 files changed: +208 -78 lines changed


metacat/common/dbbase.py

+28 -34

@@ -1,12 +1,12 @@
-from metacat.util import fetch_generator
-import json
+from metacat.util import fetch_generator, chunked
+import json, io, csv
 
 def transactioned(method):
     def decorated(first, *params, transaction=None, **args):
 
         if transaction is not None:
             return method(first, *params, transaction=transaction, **args)
-
+
         if isinstance(first, HasDB):
             transaction = first.DB.transaction()
         elif isinstance(first, type):
@@ -20,37 +20,31 @@ def decorated(first, *params, transaction=None, **args):
 
     return decorated
 
-def insert_many(transaction, table, column_names, tuples, copy_threshold = 100):
-
-    # if the tuples list or iterable is short enough, do it as multiple inserts
-    tuples_lst, tuples = make_list_if_short(tuples, copy_threshold)
-    if tuples_lst is not None and len(tuples_lst) <= copy_threshold:
-        columns = ",". join(column_names)
-        placeholders = ",".join(["%s"]*len(column_names))
-        try:
-            transaction.executemany(f"""
-                insert into parent_child({columns}) values({placeholders})
-            """, tuples_lst)
-            if do_commit: cursor.execute("commit")
-        except Exception as e:
-            cursor.execute("rollback")
-            raise
-    else:
-
-        csv_file = io.StringIO()
-        writer = csv.writer(csv_file, delimiter='\t', quoting=csv.QUOTE_MINIMAL)
-
-        for tup in tuples:
-            assert len(tup) == len(column_names)
-            tup = ["\\N" if x is None else x for x in tup]
-            writer.writerow(tup)
-        csv_file.seek(0,0)
-        try:
-            cursor.copy_from(csv_file, table, columns = column_names)
-            if do_commit: cursor.execute("commit")
-        except Exception as e:
-            cursor.execute("rollback")
-            raise
+@transactioned
+def insert_many(db, table, items, column_names=None, copy_threshold=0, chunk_size=1000, make_tuple=None, transaction=None):
+    for chunk in chunked(items, chunk_size):
+        if chunk:
+            if make_tuple is not None:
+                chunk = [make_tuple(item) for item in chunk]
+            if len(chunk) <= copy_threshold:
+                cols = "" if not column_names else "(" + ",".join(column_names) + ")"
+                ncols = len(column_names) if column_names else len(chunk[0])
+                vals = ",".join(["%s"] * ncols)
+                print("cols:", cols)
+                print("vals:", vals)
+                print("chunk:", chunk)
+                sql = f"insert into {table} {cols} values({vals})"
+                print("sql:", sql)
+                transaction.executemany(sql, chunk)
+            else:
+                csv_file = io.StringIO()
+                writer = csv.writer(csv_file, delimiter='\t', quoting=csv.QUOTE_MINIMAL)
+                for tup in chunk:
+                    assert len(tup) == len(column_names)
+                    tup = ["\\N" if x is None else x for x in tup]    # null in Postgres
+                    writer.writerow(tup)
+                csv_file.seek(0,0)
+                transaction.copy_from(csv_file, table, columns = column_names)
 
 
 class HasDB(object):
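
A minimal usage sketch of the reworked insert_many (illustrative only: the table and column names are hypothetical, "db" is assumed to be an open metacat database connection, and the import path follows this file's location):

    from metacat.common.dbbase import insert_many

    rows = [
        ("fid1", "ns1", "a.dat"),
        ("fid2", "ns1", "b.dat"),
    ]
    # chunked() batches the input; with the default copy_threshold=0 every
    # non-empty chunk is streamed via Postgres COPY rather than executemany()
    insert_many(db, "files", rows,
                column_names=["id", "namespace", "name"],
                chunk_size=1000)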

metacat/db/__init__.py

+1 -1

@@ -4,7 +4,7 @@
 
 from .common import (
     AlreadyExistsError, NotFoundError, IntegrityError, MetaValidationError, DatasetCircularDependencyDetected,
-    parse_name, alias, make_list_if_short, insert_bulk
+    parse_name, alias, make_list_if_short
 )
 
 from .param_category import DBParamCategory

metacat/db/common.py

+0 -33

@@ -71,36 +71,3 @@ def make_list_if_short(iterable, limit):
         return head, None
     else:
         return None, iterable
-
-def insert_bulk(cursor, table, column_names, tuples, do_commit=True, copy_threshold = 100):
-
-    # if the tuples list or iterable is short enough, do it as multiple inserts
-    tuples_lst, tuples = make_list_if_short(tuples, copy_threshold)
-    if tuples_lst is not None and len(tuples_lst) <= copy_threshold:
-        columns = ",". join(column_names)
-        placeholders = ",".join(["%s"]*len(column_names))
-        try:
-            cursor.executemany(f"""
-                insert into parent_child({columns}) values({placeholders})
-            """, tuples_lst)
-            if do_commit: cursor.execute("commit")
-        except Exception as e:
-            cursor.execute("rollback")
-            raise
-    else:
-
-        csv_file = io.StringIO()
-        writer = csv.writer(csv_file, delimiter='\t', quoting=csv.QUOTE_MINIMAL)
-
-        for tup in tuples:
-            assert len(tup) == len(column_names)
-            tup = ["\\N" if x is None else x for x in tup]
-            writer.writerow(tup)
-        csv_file.seek(0,0)
-        try:
-            cursor.copy_from(csv_file, table, columns = column_names)
-            if do_commit: cursor.execute("commit")
-        except Exception as e:
-            cursor.execute("rollback")
-            raise
-

metacat/db/dbobjects2.py

+36 -5

@@ -16,7 +16,7 @@ def debug(*parts):
 
 from .common import (
     AlreadyExistsError, DatasetCircularDependencyDetected, NotFoundError, MetaValidationError,
-    parse_name, alias, insert_bulk
+    parse_name, alias
 )
 
 class DBFileSet(DBObject):
@@ -586,10 +586,11 @@ def create(self, creator=None, transaction=None):
             (self.FID, self.Namespace, self.Name, meta, self.Size, checksums, creator))
         self.CreatedTimestamp = c.fetchone()[0]
         if self.Parents:
-            insert_many(transaction,
+            insert_many(self.DB,
                 "parent_child",
-                ["parent_id", "child_id"],
                 ((p.FID if isinstance(p, DBFile) else p, self.FID) for p in self.Parents),
+                column_names=["parent_id", "child_id"],
+                transaction=transaction
             )
         return self
 
@@ -761,8 +762,8 @@ def get_files(db, files, transaction=None):
                 name text);
             truncate table {temp_table};
         """)
-        cvs = strio.getvalue()
-        transaction.copy_from(io.StringIO(cvs), temp_table)
+        csv = strio.getvalue()
+        transaction.copy_from(io.StringIO(csv), temp_table)
         #print("DBFile.get_files: strio:", strio.getvalue())
 
         columns = DBFile.all_columns("f")
@@ -775,6 +776,36 @@ def get_files(db, files, transaction=None):
 
         return DBFileSet(db, sql=sql)
 
+    @staticmethod
+    @transactioned
+    def move_to_namespace(db, namespace, files, transaction=None):
+        """
+        WARNING: DOES NOT check namespace permissions for the source namespace
+
+        files expected to be a list of DBFile objects with correct file ids
+        """
+
+        suffix = int(time.time()*1000) % 10000
+        temp_table = f"temp_fids_{suffix}"
+        transaction.execute(f"create temp table {temp_table} ( id text );")
+
+        # insert_many() chunks the id stream internally (chunk_size=1000 by default)
+        insert_many(db, temp_table, ((f.FID,) for f in files), column_names=["id"], transaction=transaction)
+
+        transaction.execute(f"""
+            update files set namespace = %(ns)s
+                from {temp_table} tt
+                where files.id = tt.id
+                    and files.namespace != %(ns)s
+            """, {"ns": namespace}
+        )
+        return transaction.rowcount
+
     @staticmethod
     @transactioned
     def get(db, fid = None, namespace = None, name = None, with_metadata = False, transaction=None):
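
A sketch of the new bulk-move path (hypothetical usage: "db" is an open database connection and "files" is a list of DBFile objects that already carry valid file ids, as the docstring requires; the namespace is a placeholder):

    # stages the file ids into a temp table via insert_many(), then issues
    # a single UPDATE; returns the number of rows actually changed
    nmoved = DBFile.move_to_namespace(db, "new_namespace", files)
    print(nmoved, "files moved")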

metacat/ui/metacat_file.py

+38 -1

@@ -658,7 +658,7 @@ class AddCommand(CLICommand):
    -s|--sample - print JSON file list sample
    """
 
-    Usage = 'Use "metacat dataset add..." instead'
+    Usage = 'DEPRECATED. Use "metacat dataset add..." instead'
 
     AddSample = json.dumps(
         [
@@ -707,6 +707,43 @@ def __call__(self, command, client, opts, args):
         file_list = load_file_list(opts["-f"])
         dataset = args[-1]
         out = client.add_files(dataset, file_list)
+
+class MoveCommand(CLICommand):
+
+    Usage = """[options] - move files to a new namespace
+
+        -n|--namespace - new namespace
+
+        Specify files explicitly:
+            -f|--files <file namespace>:<file name>[,...]
+            -f|--files <file id>[,...]
+            -f|--files <file>       - read the list from text file
+            -f|--files <JSON file>  - read the list from JSON file
+            -f|--files -            - read the list from stdin
+
+        Use results of a query:
+            -q|--query "<MQL query>"
+            -q|--query <file>       - read query from the file
+            -q|--query -            - read query from stdin
+    """
+    Opts = ("f:n:q:", ["namespace=", "files=", "query="])
+
+    def __call__(self, command, client, opts, args):
+        namespace = opts.get("-n") or opts.get("--namespace")
+        if not namespace:
+            raise InvalidArguments("Namespace must be specified")
+        query = opts.get("-q") or opts.get("--query")
+        if query:
+            query = load_text(query)
+        file_list = opts.get("-f") or opts.get("--files")
+        if file_list:
+            file_list = load_file_list(file_list)
+        if (file_list is None) == (query is None):
+            raise InvalidArguments("Either query or file list must be specified, but not both")
+        client.Timeout = None   # this may take a long time, so turn the timeout off
+        nmoved = client.move_files(namespace, file_list=file_list, query=query)
+        print(nmoved, "files moved")
 
 FileCLI = CLI(
     "declare", DeclareSingleCommand(),

metacat/webapi/webapi.py

+49 -1

@@ -102,7 +102,10 @@ class HTTPClient(object):
     def __init__(self, server_url, token, timeout=None):
         self.ServerURL = server_url
         self.Token = token
-        self.Timeout = timeout or self.DefaultTimeout
+        if timeout is not None and timeout <= 0:
+            self.Timeout = None     # no timeout
+        else:
+            self.Timeout = timeout or self.DefaultTimeout
         self.LastResponse = self.LastURL = self.LastStatusCode = None
 
     def retry_request(self, method, url, timeout=None, **args):
@@ -725,6 +728,51 @@ def declare_files(self, dataset, files, namespace=None, dry_run=False):
         out = self.post_json(url, lst)
         return out
 
+    def move_files(self, namespace, file_list=None, query=None):
+        """
+        Arguments
+        ---------
+        namespace : str
+            namespace to move files to
+        query : str
+            MQL query to run; files matching the query will be moved
+        file_list : list
+            List of dictionaries, one dictionary per file. Each dictionary must contain either a file id:
+
+            .. code-block:: python
+
+                { "fid": "abcd12345" }
+
+            or namespace/name:
+
+            .. code-block:: python
+
+                { "name": "filename.data", "namespace": "my_namespace" }
+
+            or DID:
+
+            .. code-block:: python
+
+                { "did": "my_namespace:filename.data" }
+        """
+        params = {
+            "namespace": namespace,
+        }
+        if file_list is not None:
+            lst = []
+            for f in file_list:
+                spec = ObjectSpec.from_dict(f, default_namespace)
+                spec.validate()
+                lst.append(spec.as_dict())
+            params["files"] = lst
+        elif query:
+            params["query"] = query
+        else:
+            raise ValueError("Either file_list or query must be specified, but not both")
+
+        out = self.post_json(url, params)
+        return out["files_moved"]
+
     def update_file(self, did=None, namespace=None, name=None, fid=None, replace=False,
             size=None, checksums=None, parents=None, children=None, metadata=None
         ):
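
A minimal client-side sketch of the new call (the server URL and file names are placeholders; assumes a MetaCatClient configured as usual):

    from metacat.webapi import MetaCatClient

    client = MetaCatClient("https://metacat.example.com:9443/api")
    client.Timeout = None   # bulk moves can run long; None disables the timeout
    nmoved = client.move_files("new_namespace",
                               file_list=[{"did": "old_namespace:file1.dat"}])
    print(nmoved, "files moved")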

upload_pypi.sh

+1 -0

@@ -4,4 +4,5 @@
 rm -rf build dist *.egg-info
 python setup.py sdist bdist_wheel
 twine upload dist/*
+rm -rf build dist *.egg-info
