Skip to content

Commit 63ecc8e

Browse files
authored
Merge branch 'master' into test-importer-interface
2 parents 3725e0e + 1136476 commit 63ecc8e

File tree

8 files changed

+143
-9
lines changed

8 files changed

+143
-9
lines changed

extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/GoogleTransferExtension.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.datatransferproject.datatransfer.google.videos.GoogleVideosImporter;
3838
import org.datatransferproject.spi.cloud.storage.AppCredentialStore;
3939
import org.datatransferproject.spi.cloud.storage.JobStore;
40+
import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor;
41+
import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutorExtension;
4042
import org.datatransferproject.types.common.models.DataVertical;
4143
import org.datatransferproject.spi.transfer.extension.TransferExtension;
4244
import org.datatransferproject.spi.transfer.provider.Exporter;
@@ -107,6 +109,10 @@ public void initialize(ExtensionContext context) {
107109
GoogleCredentialFactory credentialFactory =
108110
new GoogleCredentialFactory(httpTransport, jsonFactory, appCredentials, monitor);
109111

112+
IdempotentImportExecutor idempotentImportExecutor = context.getService(
113+
IdempotentImportExecutorExtension.class).getRetryingIdempotentImportExecutor(context);
114+
boolean enableRetrying = context.getSetting("enableRetrying", false);
115+
110116
ImmutableMap.Builder<DataVertical, Importer> importerBuilder = ImmutableMap.builder();
111117
importerBuilder.put(BLOBS, new DriveImporter(credentialFactory, jobStore, monitor));
112118
importerBuilder.put(CONTACTS, new GoogleContactsImporter(credentialFactory));
@@ -120,7 +126,9 @@ public void initialize(ExtensionContext context) {
120126
jobStore,
121127
jsonFactory,
122128
monitor,
123-
context.getSetting("googleWritesPerSecond", 1.0)));
129+
context.getSetting("googleWritesPerSecond", 1.0),
130+
idempotentImportExecutor,
131+
enableRetrying));
124132
importerBuilder.put(VIDEOS, new GoogleVideosImporter(appCredentials, jobStore, monitor));
125133
importerMap = importerBuilder.build();
126134

@@ -141,4 +149,4 @@ public void initialize(ExtensionContext context) {
141149

142150
initialized = true;
143151
}
144-
}
152+
}

extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/photos/GooglePhotosImporter.java

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,29 @@ public class GooglePhotosImporter
7878
private final Map<UUID, GooglePhotosInterface> photosInterfacesMap;
7979
private final GooglePhotosInterface photosInterface;
8080
private final HashMap<UUID, BaseMultilingualDictionary> multilingualStrings = new HashMap<>();
81+
private IdempotentImportExecutor retryingIdempotentExecutor;
82+
private Boolean enableRetrying;
83+
84+
public GooglePhotosImporter(
85+
GoogleCredentialFactory credentialFactory,
86+
JobStore jobStore,
87+
JsonFactory jsonFactory,
88+
Monitor monitor,
89+
double writesPerSecond,
90+
IdempotentImportExecutor retryingIdempotentExecutor,
91+
boolean enableRetrying) {
92+
this(
93+
credentialFactory,
94+
jobStore,
95+
jsonFactory,
96+
new HashMap<>(),
97+
null,
98+
new ConnectionProvider(jobStore),
99+
monitor,
100+
writesPerSecond,
101+
retryingIdempotentExecutor,
102+
enableRetrying);
103+
}
81104

82105
public GooglePhotosImporter(
83106
GoogleCredentialFactory credentialFactory,
@@ -106,6 +129,30 @@ public GooglePhotosImporter(
106129
ConnectionProvider connectionProvider,
107130
Monitor monitor,
108131
double writesPerSecond) {
132+
this(
133+
credentialFactory,
134+
jobStore,
135+
jsonFactory,
136+
photosInterfacesMap,
137+
photosInterface,
138+
connectionProvider,
139+
monitor,
140+
writesPerSecond,
141+
null,
142+
false);
143+
}
144+
145+
GooglePhotosImporter(
146+
GoogleCredentialFactory credentialFactory,
147+
JobStore jobStore,
148+
JsonFactory jsonFactory,
149+
Map<UUID, GooglePhotosInterface> photosInterfacesMap,
150+
GooglePhotosInterface photosInterface,
151+
ConnectionProvider connectionProvider,
152+
Monitor monitor,
153+
double writesPerSecond,
154+
IdempotentImportExecutor retryingIdempotentExecutor,
155+
boolean enableRetrying) {
109156
this.credentialFactory = credentialFactory;
110157
this.jobStore = jobStore;
111158
this.jsonFactory = jsonFactory;
@@ -114,6 +161,8 @@ public GooglePhotosImporter(
114161
this.connectionProvider = connectionProvider;
115162
this.monitor = monitor;
116163
this.writesPerSecond = writesPerSecond;
164+
this.retryingIdempotentExecutor = retryingIdempotentExecutor;
165+
this.enableRetrying = enableRetrying;
117166
}
118167

119168
// TODO(aksingh737) WARNING: stop maintaining this code here; this needs to be reconciled against
@@ -131,10 +180,12 @@ public ImportResult importItem(
131180
// Nothing to do
132181
return ImportResult.OK;
133182
}
134-
GPhotosUpload gPhotosUpload = new GPhotosUpload(jobId, idempotentImportExecutor, authData);
183+
IdempotentImportExecutor executor =
184+
(retryingIdempotentExecutor != null && enableRetrying) ? retryingIdempotentExecutor : idempotentImportExecutor;
185+
GPhotosUpload gPhotosUpload = new GPhotosUpload(jobId, executor, authData);
135186

136187
for (PhotoAlbum album : data.getAlbums()) {
137-
idempotentImportExecutor.executeAndSwallowIOExceptions(
188+
executor.executeAndSwallowIOExceptions(
138189
album.getId(), album.getName(), () -> importSingleAlbum(jobId, authData, album));
139190
}
140191
long bytes = importPhotos(data.getPhotos(), gPhotosUpload);
@@ -157,7 +208,7 @@ String importSingleAlbum(UUID jobId, TokensAndUrlAuthData authData, PhotoAlbum i
157208

158209
@VisibleForTesting // TODO(aksingh737,jzacsh) stop exposing this to unit tests
159210
public long importPhotos(Collection<PhotoModel> photos, GPhotosUpload gPhotosUpload)
160-
throws Exception {
211+
throws Exception {
161212
return gPhotosUpload.uploadItemsViaBatching(photos, this::importPhotoBatch);
162213
}
163214

portability-transfer/src/main/java/org/datatransferproject/transfer/CallableImporter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public ImportResult call() throws Exception {
7070
Collection<ErrorDetail> errors = idempotentImportExecutor.getRecentErrors();
7171
success = result.getType() == ImportResult.ResultType.OK && errors.isEmpty();
7272

73-
if (!success) {
73+
if (!success && errors.iterator().hasNext() && !errors.iterator().next().canSkip()) {
7474
throw new IOException(
7575
"Problem with importer, forcing a retry, "
7676
+ "first error: "

portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/errors/ErrorDetail.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ public abstract class ErrorDetail {
3636
private static final String DATA_KEY = "Data";
3737

3838
public static ErrorDetail.Builder builder() {
39-
return new org.datatransferproject.types.transfer.errors.AutoValue_ErrorDetail.Builder();
39+
return new org.datatransferproject.types.transfer.errors.AutoValue_ErrorDetail.Builder()
40+
.setCanSkip(false);
4041
}
4142

4243
@JsonProperty("id")
@@ -48,6 +49,9 @@ public static ErrorDetail.Builder builder() {
4849
@JsonProperty("exception")
4950
public abstract String exception();
5051

52+
@JsonProperty("canSkip")
53+
public abstract boolean canSkip();
54+
5155
@AutoValue.Builder
5256
public abstract static class Builder {
5357
@JsonCreator
@@ -65,5 +69,8 @@ private static ErrorDetail.Builder create() {
6569

6670
@JsonProperty("exception")
6771
public abstract Builder setException(String exception);
72+
73+
@JsonProperty("canSkip")
74+
public abstract Builder setCanSkip(boolean canSkip);
6875
}
6976
}

portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryException.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,16 @@
2222
public class RetryException extends Exception {
2323

2424
private final int triesSoFar;
25+
private final boolean canSkip;
2526

2627
RetryException(int triesSoFar, Exception exception) {
28+
this(triesSoFar, exception, false);
29+
}
30+
31+
RetryException(int triesSoFar, Exception exception, boolean canSkip) {
2732
super(exception);
2833
this.triesSoFar = triesSoFar;
34+
this.canSkip = canSkip;
2935
}
3036

3137
@Override
@@ -36,4 +42,8 @@ public Exception getCause() {
3642
public int getTriesSoFar() {
3743
return triesSoFar;
3844
}
45+
46+
public boolean canSkip() {
47+
return canSkip;
48+
}
3949
}

portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryStrategy.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
@JsonSubTypes({
3434
@JsonSubTypes.Type(value = UniformRetryStrategy.class, name = "Uniform"),
3535
@JsonSubTypes.Type(value = ExponentialBackoffStrategy.class, name = "Exponential"),
36-
@JsonSubTypes.Type(value = NoRetryStrategy.class, name = "Fatal")
36+
@JsonSubTypes.Type(value = NoRetryStrategy.class, name = "Fatal"),
37+
@JsonSubTypes.Type(value = SkipRetryStrategy.class, name = "Skip")
3738
})
3839
public interface RetryStrategy {
3940

@@ -52,4 +53,9 @@ public interface RetryStrategy {
5253
* Gets milliseconds until the next retry, given elapsed time so far
5354
*/
5455
long getRemainingIntervalMillis(int tries, long elapsedMillis);
56+
57+
/** Shows whether exception should be skipped */
58+
default boolean canSkip() {
59+
return false;
60+
}
5561
}

portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryingCallable.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,11 @@ public T call() throws RetryException {
116116
monitor.debug(
117117
() ->
118118
String.format("Strategy canTryAgain returned false after %d retries", attempts));
119-
throw new RetryException(attempts, mostRecentException);
119+
if (strategy.canSkip()) {
120+
throw new RetryException(attempts, mostRecentException, true);
121+
} else {
122+
throw new RetryException(attempts, mostRecentException);
123+
}
120124
}
121125
}
122126
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2023 The Data Transfer Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.datatransferproject.types.transfer.retry;
18+
19+
/** {@link RetryStrategy} that allows exception to be skipped. Useful for known non fatal errors. */
20+
public class SkipRetryStrategy implements RetryStrategy {
21+
22+
public SkipRetryStrategy() {}
23+
24+
@Override
25+
public boolean canTryAgain(int tries) {
26+
return false;
27+
}
28+
29+
@Override
30+
public long getNextIntervalMillis(int tries) {
31+
return -1L;
32+
}
33+
34+
@Override
35+
public long getRemainingIntervalMillis(int tries, long elapsedMillis) {
36+
return -1L;
37+
}
38+
39+
@Override
40+
public boolean canSkip() {
41+
return true;
42+
}
43+
44+
@Override
45+
public String toString() {
46+
return "SkipRetryStrategy{}";
47+
}
48+
}

0 commit comments

Comments
 (0)