
Commit 02cafcc

Merge pull request #380 from broadinstitute/development
Release 1.39.0
2 parents 887729d + 655d7d1 commit 02cafcc

File tree

10 files changed (+113, -23 lines)


.github/workflows/minify_ontologies.yml

Lines changed: 3 additions & 1 deletion
@@ -14,6 +14,8 @@ jobs:
     steps:
     - name: Checkout code
       uses: actions/checkout@v4
+      with:
+        ref: ${{ github.head_ref }}

     - name: Copy and decompress ontologies in repo
       run: cd ingest/validation/ontologies; mkdir tmp; cp -r *.min.tsv.gz tmp/; gzip -d tmp/*.min.tsv.gz
@@ -86,7 +88,7 @@ jobs:

           # Commit changes
           git commit -m "Update minified ontologies via GitHub Actions"
-          git push origin ${{ github.head_ref }}
+          git push
         else
           echo "No changes to commit."
         fi

ingest/ingest_pipeline.py

Lines changed: 4 additions & 15 deletions
@@ -105,7 +105,7 @@
 from opencensus.ext.stackdriver.trace_exporter import StackdriverExporter
 from opencensus.trace.samplers import AlwaysOnSampler
 from opencensus.trace.tracer import Tracer
-from pymongo import MongoClient
+from mongo_connection import MongoConnection, graceful_auto_reconnect
 from subsample import SubSample
 from validation.validate_metadata import (
     report_issues,
@@ -210,19 +210,7 @@ def __init__(

     # Will be replaced by MongoConnection as defined in SCP-2629
     def get_mongo_db(self):
-        host = os.environ["DATABASE_HOST"]
-        user = os.environ["MONGODB_USERNAME"]
-        password = os.environ["MONGODB_PASSWORD"]
-        db_name = os.environ["DATABASE_NAME"]
-        client = MongoClient(
-            host,
-            username=user,
-            password=password,
-            authSource=db_name,
-            authMechanism="SCRAM-SHA-1",
-        )
-
-        return client[db_name]
+        return MongoConnection()._client

     def initialize_file_connection(self, file_type, file_path):
         """Initializes connection to file.
@@ -258,6 +246,7 @@ def initialize_file_connection(self, file_type, file_path):
             self.report_validation("failure")
             raise ValueError(v)

+    @graceful_auto_reconnect
     def insert_many(self, collection_name, documents):
         if not config.bypass_mongo_writes():
             self.db[collection_name].insert_many(documents)
@@ -333,7 +322,7 @@ def load_subsample(
                 },
             ):
                 documents.append(model)
-            self.db["data_arrays"].insert_many(documents)
+            self.insert_many("data_arrays", documents)

         except Exception as e:
             log_exception(IngestPipeline.dev_logger, IngestPipeline.user_logger, e)
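
Net effect of this change: the pipeline no longer builds a MongoClient from environment variables itself, and every bulk write funnels through a single retry-decorated insert_many. Below is a minimal, self-contained sketch of that calling pattern; the MongoConnection and graceful_auto_reconnect names come from the diff, while the stand-in decorator and PipelineSketch class are purely illustrative so the snippet runs on its own.

import functools


def graceful_auto_reconnect(mongo_op_func):
    # Stand-in for the real decorator in ingest/mongo_connection.py, which
    # retries AutoReconnect and duplicate-key BulkWriteError failures
    # (see the mongo_connection.py diff below).
    @functools.wraps(mongo_op_func)
    def wrapper(*args, **kwargs):
        return mongo_op_func(*args, **kwargs)
    return wrapper


class PipelineSketch:
    """Illustrates the calling pattern, not the real IngestPipeline."""

    def __init__(self, db):
        # The real get_mongo_db() now simply returns MongoConnection()._client,
        # so credentials and SCRAM auth live in one place.
        self.db = db

    @graceful_auto_reconnect
    def insert_many(self, collection_name, documents):
        self.db[collection_name].insert_many(documents)

    def load_subsample_sketch(self, documents):
        # Loaders call the decorated method instead of hitting the
        # collection directly, so retries apply to every bulk write.
        self.insert_many("data_arrays", documents)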

ingest/mongo_connection.py

Lines changed: 10 additions & 3 deletions
@@ -67,6 +67,13 @@ def retry(attempt_num):
     @functools.wraps(mongo_op_func)
     def wrapper(*args, **kwargs):
         args = list(args)
+        # detect which argument is the list of inserted documents
+        # when called from IngestPipeline, first arg is ingest instance, and 3rd is the list
+        # when called from GeneExpression (static implementation), first arg is list
+        if args[0].__class__.__name__ == 'IngestPipeline':
+            docs_idx = 2
+        else:
+            docs_idx = 0
         for attempt in range(MongoConnection.MAX_AUTO_RECONNECT_ATTEMPTS):
             try:
                 return mongo_op_func(*args, **kwargs)
@@ -84,12 +91,12 @@ def wrapper(*args, **kwargs):
                 error = bwe.details["writeErrors"]
                 # Check error code to see if any failures are due to violating a unique index (error code 11000)
                 # and discard those documents before retrying
-                filtered_docs = discard_inserted_documents(error, args[0])
+                filtered_docs = discard_inserted_documents(error, args[docs_idx])
                 if len(filtered_docs) > 0:
-                    args[0] = filtered_docs
+                    args[docs_idx] = filtered_docs
                     retry(attempt)
                 else:
-                    return args[0]
+                    return args[docs_idx]
             else:
                 log_error_without_values(bwe)
                 raise bwe
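
In plain terms, the wrapper now works out which positional argument holds the documents being inserted (index 2 when invoked as a bound IngestPipeline method, index 0 for the static GeneExpression path) so that the duplicate-key filtering and retries touch the right argument. A condensed, self-contained sketch of that behavior follows; it reuses the names visible in the hunks above (MAX_AUTO_RECONNECT_ATTEMPTS, discard_inserted_documents), but the backoff timing, logging, and error filtering are simplified assumptions, not the module's exact code.

import functools
import time

from pymongo.errors import AutoReconnect, BulkWriteError

MAX_AUTO_RECONNECT_ATTEMPTS = 3


def discard_inserted_documents(write_errors, documents):
    # Simplified: drop documents that already landed (duplicate key, code 11000).
    duplicates = [err["op"] for err in write_errors if err["code"] == 11000]
    return [doc for doc in documents if doc not in duplicates]


def graceful_auto_reconnect(mongo_op_func):
    @functools.wraps(mongo_op_func)
    def wrapper(*args, **kwargs):
        args = list(args)
        # Bound IngestPipeline method: (self, collection_name, documents) -> index 2.
        # Static GeneExpression path: (documents, ...) -> index 0.
        docs_idx = 2 if args[0].__class__.__name__ == "IngestPipeline" else 0
        for attempt in range(MAX_AUTO_RECONNECT_ATTEMPTS):
            try:
                return mongo_op_func(*args, **kwargs)
            except AutoReconnect:
                if attempt == MAX_AUTO_RECONNECT_ATTEMPTS - 1:
                    raise
                time.sleep(2 ** attempt)  # exponential back off (simplified)
            except BulkWriteError as bwe:
                write_errors = bwe.details["writeErrors"]
                remaining = discard_inserted_documents(write_errors, args[docs_idx])
                if not remaining:
                    return args[docs_idx]  # everything was already inserted
                if attempt == MAX_AUTO_RECONNECT_ATTEMPTS - 1:
                    raise
                args[docs_idx] = remaining  # retry only what has not been written
                time.sleep(2 ** attempt)

    return wrapper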
Binary file not shown (1.11 KB)

Binary file not shown (1.38 KB)

Binary file not shown (4.02 KB)

Binary file not shown (191 Bytes)
Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-1733232455 # validation cache key
+1737653567 # validation cache key

scripts/docker-compose-setup.sh

Lines changed: 12 additions & 2 deletions
@@ -35,8 +35,18 @@ case $OPTION in
 done

 if [[ $GCR_IMAGE = "" ]]; then
-  IMAGE_NAME="gcr.io/broad-singlecellportal-staging/scp-ingest-pipeline-development"
-  LATEST_TAG=$(gcloud container images list-tags ${IMAGE_NAME} --format='get(tags)' | head -n 1)
+  # Google Artifact Registry (GAR) formats Docker image names differently than the canonical Container Registry format
+  # both refer to the same image digest and will work with 'docker pull', but the GCR names do not work with any
+  # 'gcloud artifacts docker' commands
+  # for example:
+  # GCR name: gcr.io/broad-singlecellportal-staging/scp-ingest-pipeline-development
+  # GAR name: us-docker.pkg.dev/broad-singlecellportal-staging/gcr.io/scp-ingest-pipeline-development
+  REPO='gcr.io'
+  PROJECT='broad-singlecellportal-staging'
+  IMAGE='scp-ingest-pipeline-development'
+  GAR_NAME="us-docker.pkg.dev/$PROJECT/$REPO/$IMAGE"
+  IMAGE_NAME="$REPO/$PROJECT/$IMAGE"
+  LATEST_TAG=$(gcloud artifacts docker images list ${GAR_NAME} --include-tags --sort-by=~CREATE_TIME --format="get(tags)" | head -n 1)
   export GCR_IMAGE="${IMAGE_NAME}:${LATEST_TAG}"
 fi

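For reference, the two name formats discussed in those script comments differ only in how the registry host, project, and repository segments are ordered. A tiny illustration, with values copied from the script above, is shown here; the printed strings match the examples in the comments.

# Both names refer to the same image digest; only the GAR form works with
# 'gcloud artifacts docker' commands (values copied from the script above).
REPO = "gcr.io"
PROJECT = "broad-singlecellportal-staging"
IMAGE = "scp-ingest-pipeline-development"

gcr_name = f"{REPO}/{PROJECT}/{IMAGE}"
gar_name = f"us-docker.pkg.dev/{PROJECT}/{REPO}/{IMAGE}"

print(gcr_name)  # gcr.io/broad-singlecellportal-staging/scp-ingest-pipeline-development
print(gar_name)  # us-docker.pkg.dev/broad-singlecellportal-staging/gcr.io/scp-ingest-pipeline-development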

tests/test_ingest.py

Lines changed: 83 additions & 1 deletion
@@ -36,7 +36,7 @@
 from test_dense import mock_load_r_files
 import os

-from pymongo.errors import AutoReconnect
+from pymongo.errors import AutoReconnect, BulkWriteError
 from test_expression_files import mock_expression_load
 from mock_gcp import mock_storage_client, mock_storage_blob

@@ -813,6 +813,88 @@ def test_get_action_from_args(self):
         bad_args = ["foo", "bar", "bing"]
         self.assertEqual("", get_action_from_args(bad_args))

+    @patch("mongo_connection.MongoConnection.MAX_AUTO_RECONNECT_ATTEMPTS", 3)
+    def test_insert_reconnect(self):
+        args = [
+            "--study-id",
+            "5d276a50421aa9117c982845",
+            "--study-file-id",
+            "5dd5ae25421aa910a723a337",
+            "ingest_subsample",
+            "--cluster-file",
+            "../tests/data/good_subsample_cluster.csv",
+            "--name",
+            "cluster1",
+            "--cell-metadata-file",
+            "../tests/data/test_cell_metadata.csv",
+            "--subsample",
+        ]
+        parsed_args = create_parser().parse_args(args)
+        validate_arguments(parsed_args)
+        arguments = vars(parsed_args)
+        ingest = IngestPipeline(**arguments)
+        client_mock = MagicMock()
+        ingest.db = client_mock
+        docs = [
+            {
+                'id': 1,
+                'name': 'foo',
+                'study_id': 1,
+                'study_file_id': 1,
+                'array_index': 0,
+                'linear_data_type': 'Cluster',
+            },
+            {
+                'id': 2,
+                'name': 'bar',
+                'study_id': 1,
+                'study_file_id': 1,
+                'array_index': 0,
+                'linear_data_type': 'Cluster',
+            },
+        ]
+        ingest.insert_many("data_arrays", docs)
+        client_mock["data_arrays"].insert_many.assert_called_with(docs)
+
+        client_mock["data_arrays"].insert_many.side_effect = ValueError("Foo")
+        self.assertRaises(
+            Exception, ingest.insert_many, "data_arrays", docs
+        )
+        client_mock.reset_mock()
+
+        # Test exponential back off for auto reconnect
+        client_mock["data_arrays"].insert_many.side_effect = AutoReconnect
+        self.assertRaises(
+            AutoReconnect, ingest.insert_many, "data_arrays", docs
+        )
+        self.assertEqual(client_mock["data_arrays"].insert_many.call_count, 3)
+        client_mock.reset_mock()
+
+        def raiseError(*args, **kwargs):
+            details = {
+                "writeErrors": [
+                    {
+                        "code": 11000,
+                        "op": {
+                            'id': 1,
+                            'name': 'foo',
+                            'study_id': 1,
+                            'study_file_id': 1,
+                            'array_index': 0,
+                            'linear_data_type': 'Cluster',
+                        },
+                    }
+                ]
+            }
+            raise BulkWriteError(details)
+
+        # Test exponential back off for BulkWriteError
+        client_mock["data_arrays"].insert_many.side_effect = raiseError
+        self.assertRaises(
+            BulkWriteError, ingest.insert_many, "data_arrays", docs
+        )
+        self.assertEqual(client_mock["data_arrays"].insert_many.call_count, 3)
+

 if __name__ == "__main__":
     unittest.main()
