Skip to content

Commit

Permalink
Connection Updater worker should use data plane to run check_connecti…
Browse files Browse the repository at this point in the history
…on (#6191)
  • Loading branch information
xiaohansong committed Apr 27, 2023
1 parent 8347949 commit a82ce3d
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 27 deletions.
3 changes: 3 additions & 0 deletions airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ object ContainerOrchestratorJavaOpts : Temporary<String>(key = "container-orches

object NewTrialPolicyEnabled : Temporary<Boolean>(key = "billing.newTrialPolicy", default = false)

object CheckConnectionUseApiEnabled : Temporary<Boolean>(key = "check-connection-use-api", default = false)


/**
* The default value is 3 hours, it is larger than what is configured by default in the airbyte self owned instance.
* The goal is to allow more room for OSS deployment that airbyte can not monitor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity;
import io.airbyte.workers.temporal.check.connection.SubmitCheckConnectionActivity;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogActivity;
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
Expand Down Expand Up @@ -77,7 +78,8 @@ public List<Object> connectionManagerActivities(
final RecordMetricActivity recordMetricActivity,
final WorkflowConfigActivity workflowConfigActivity,
final RouteToSyncTaskQueueActivity routeToSyncTaskQueueActivity,
final FeatureFlagFetchActivity featureFlagFetchActivity) {
final FeatureFlagFetchActivity featureFlagFetchActivity,
final SubmitCheckConnectionActivity submitCheckConnectionActivity) {
return List.of(generateInputActivity,
jobCreationAndStatusUpdateActivity,
configFetchActivity,
Expand All @@ -87,7 +89,8 @@ public List<Object> connectionManagerActivities(
recordMetricActivity,
workflowConfigActivity,
routeToSyncTaskQueueActivity,
featureFlagFetchActivity);
featureFlagFetchActivity,
submitCheckConnectionActivity);
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.check.connection;

import io.airbyte.config.ConnectorJobOutput;
import io.temporal.activity.ActivityInterface;
import java.util.UUID;

/**
* Temporal activity to submit a check_connection request to airbyte server.
*/
@ActivityInterface
public interface SubmitCheckConnectionActivity {

/**
* Submits an API request to airbyte server to run check connection for source.
*/
ConnectorJobOutput submitCheckConnectionToSource(final UUID sourceId);

/**
* Submits an API request to airbyte server to run check connection for destination.
*/
ConnectorJobOutput submitCheckConnectionToDestination(final UUID destinationId);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.check.connection;

import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.DestinationApi;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.model.generated.CheckConnectionRead;
import io.airbyte.api.client.model.generated.CheckConnectionRead.StatusEnum;
import io.airbyte.api.client.model.generated.DestinationIdRequestBody;
import io.airbyte.api.client.model.generated.SourceIdRequestBody;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureType;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import jakarta.inject.Singleton;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

/**
*
* Implementation of SubmitCheckConnectionActivity.
*/
@Slf4j
@Singleton
public class SubmitCheckConnectionActivityImpl implements SubmitCheckConnectionActivity {

private final SourceApi sourceApi;
private final DestinationApi destinationApi;
private final EnvVariableFeatureFlags envVariableFeatureFlags;

public SubmitCheckConnectionActivityImpl(final SourceApi sourceApi,
DestinationApi destinationApi,
final EnvVariableFeatureFlags envVariableFeatureFlags) {
this.sourceApi = sourceApi;
this.destinationApi = destinationApi;
this.envVariableFeatureFlags = envVariableFeatureFlags;
}

@Override
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
public ConnectorJobOutput submitCheckConnectionToSource(final UUID sourceId) {
ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION);
try {
CheckConnectionRead checkResult = AirbyteApiClient.retryWithJitter(
() -> sourceApi.checkConnectionToSource(new SourceIdRequestBody().sourceId(sourceId)),
"Trigger check connection to source");
jobOutput.withCheckConnection(convertApiOutputToStandardOutput(checkResult));
} catch (Exception ex) {
jobOutput.withFailureReason(new FailureReason().withFailureType(FailureType.SYSTEM_ERROR).withInternalMessage(ex.getMessage()));
throw ex;
}
return jobOutput;
}

@Override
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
public ConnectorJobOutput submitCheckConnectionToDestination(final UUID destinationId) {
ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION);
try {
CheckConnectionRead checkResult = AirbyteApiClient.retryWithJitter(
() -> destinationApi.checkConnectionToDestination(new DestinationIdRequestBody().destinationId(destinationId)),
"Trigger check connection to destination");
jobOutput.withCheckConnection(convertApiOutputToStandardOutput(checkResult));
} catch (Exception ex) {
jobOutput.withFailureReason(new FailureReason().withFailureType(FailureType.SYSTEM_ERROR).withInternalMessage(ex.getMessage()));
throw ex;
}
return jobOutput;
}

private StandardCheckConnectionOutput convertApiOutputToStandardOutput(final CheckConnectionRead apiOutput) {
StandardCheckConnectionOutput output = new StandardCheckConnectionOutput().withMessage(apiOutput.getMessage());

if (apiOutput.getStatus().equals(StatusEnum.SUCCEEDED)) {
output.withStatus(Status.SUCCEEDED);
} else {
output.withStatus(Status.FAILED);
}
return output;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.featureflag.CheckConnectionUseApiEnabled;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
Expand All @@ -37,6 +38,7 @@
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity.CheckConnectionInput;
import io.airbyte.workers.temporal.check.connection.SubmitCheckConnectionActivity;
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity;
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity.AutoDisableConnectionActivityInput;
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity.AutoDisableConnectionOutput;
Expand Down Expand Up @@ -98,6 +100,7 @@
public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow {

private static final String GENERATE_CHECK_INPUT_TAG = "generate_check_input";
private static final String CHECK_WITH_API_TAG = "check_with_api";
private static final int GENERATE_CHECK_INPUT_CURRENT_VERSION = 1;

private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener());
Expand All @@ -106,6 +109,7 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow

private static final String GET_FEATURE_FLAGS_TAG = "get_feature_flags";
private static final int GET_FEATURE_FLAGS_CURRENT_VERSION = 1;
private static final int CHECK_WITH_API_CURRENT_VERSION = 1;

@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private GenerateInputActivity getSyncInputActivity;
Expand All @@ -118,6 +122,8 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private CheckConnectionActivity checkActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private SubmitCheckConnectionActivity submitCheckActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private StreamResetActivity streamResetActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private RecordMetricActivity recordMetricActivity;
Expand Down Expand Up @@ -237,7 +243,7 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn
StandardSyncOutput standardSyncOutput = null;

try {
final SyncCheckConnectionResult syncCheckConnectionResult = checkConnections(getJobRunConfig(), jobInputs);
final SyncCheckConnectionResult syncCheckConnectionResult = checkConnections(getJobRunConfig(), jobInputs, featureFlags);
if (syncCheckConnectionResult.isFailed()) {
final StandardSyncOutput checkFailureOutput = syncCheckConnectionResult.buildFailureOutput();
workflowState.setFailed(getFailStatus(checkFailureOutput));
Expand Down Expand Up @@ -399,7 +405,8 @@ private SyncJobCheckConnectionInputs getCheckConnectionInputFromSync(final Gener
}

private SyncCheckConnectionResult checkConnections(final JobRunConfig jobRunConfig,
@Nullable final GenerateInputActivity.GeneratedJobInput jobInputs) {
@Nullable final GenerateInputActivity.GeneratedJobInput jobInputs,
final Map<String, Boolean> featureFlags) {
final SyncCheckConnectionResult checkConnectionResult = new SyncCheckConnectionResult(jobRunConfig);

final JobCheckFailureInput jobStateInput =
Expand Down Expand Up @@ -431,7 +438,18 @@ private SyncCheckConnectionResult checkConnections(final JobRunConfig jobRunConf
log.info("SOURCE CHECK: Skipped, reset job");
} else {
log.info("SOURCE CHECK: Starting");
final ConnectorJobOutput sourceCheckResponse = getCheckResponse(checkSourceInput);
final ConnectorJobOutput sourceCheckResponse;

final int checkWithApiVersion =
Workflow.getVersion(CHECK_WITH_API_TAG, Workflow.DEFAULT_VERSION, CHECK_WITH_API_CURRENT_VERSION);

if (checkWithApiVersion >= CHECK_WITH_API_CURRENT_VERSION && featureFlags.get(CheckConnectionUseApiEnabled.INSTANCE.getKey())) {
sourceCheckResponse = runMandatoryActivityWithOutput(submitCheckActivity::submitCheckConnectionToSource,
checkInputs.getSourceCheckConnectionInput().getActorId());
} else {
sourceCheckResponse = getCheckResponse(checkSourceInput);
}

if (SyncCheckConnectionResult.isOutputFailed(sourceCheckResponse)) {
checkConnectionResult.setFailureOrigin(FailureReason.FailureOrigin.SOURCE);
checkConnectionResult.setFailureOutput(sourceCheckResponse);
Expand All @@ -450,7 +468,15 @@ private SyncCheckConnectionResult checkConnections(final JobRunConfig jobRunConf
log.info("DESTINATION CHECK: Skipped, source check failed");
} else {
log.info("DESTINATION CHECK: Starting");
final ConnectorJobOutput destinationCheckResponse = getCheckResponse(checkDestinationInput);
final ConnectorJobOutput destinationCheckResponse;
final int checkWithApiVersion =
Workflow.getVersion(CHECK_WITH_API_TAG, Workflow.DEFAULT_VERSION, CHECK_WITH_API_CURRENT_VERSION);
if (checkWithApiVersion >= CHECK_WITH_API_CURRENT_VERSION && featureFlags.get(CheckConnectionUseApiEnabled.INSTANCE.getKey())) {
destinationCheckResponse = runMandatoryActivityWithOutput(submitCheckActivity::submitCheckConnectionToDestination,
checkInputs.getDestinationCheckConnectionInput().getActorId());
} else {
destinationCheckResponse = getCheckResponse(checkDestinationInput);
}
if (SyncCheckConnectionResult.isOutputFailed(destinationCheckResponse)) {
checkConnectionResult.setFailureOrigin(FailureReason.FailureOrigin.DESTINATION);
checkConnectionResult.setFailureOutput(destinationCheckResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.WorkspaceRead;
import io.airbyte.featureflag.CheckConnectionUseApiEnabled;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.Flag;
import io.airbyte.featureflag.Workspace;
Expand Down Expand Up @@ -55,7 +56,7 @@ public FeatureFlagFetchOutput getFeatureFlags(final FeatureFlagFetchInput input)

// No feature flags are currently in use.
// To get value for a feature flag with the workspace context, add it to the workspaceFlags list.
final List<Flag> workspaceFlags = List.of();
final List<Flag> workspaceFlags = List.of(CheckConnectionUseApiEnabled.INSTANCE);
final Map<String, Boolean> featureFlags = new HashMap<>();
for (final Flag flag : workspaceFlags) {
featureFlags.put(flag.getKey(), featureFlagClient.enabled(flag, new Workspace(workspaceId)));
Expand Down
Loading

0 comments on commit a82ce3d

Please sign in to comment.