Skip to content

Commit

Permalink
Add datadog for postgres source (#5088)
Browse files Browse the repository at this point in the history
Integrate postgres source Datadog APM traces.
  • Loading branch information
octavia-squidington-iii committed Apr 4, 2023
1 parent c465c54 commit 1346fb9
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,10 @@ public class WorkerConstants {

public static final String WORKER_ENVIRONMENT = "WORKER_ENVIRONMENT";

public static final String DD_ENV_VAR = "-XX:+ExitOnOutOfMemoryError -javaagent:/airbyte/dd-java-agent.jar "
+ "-Ddd.profiling.enabled=true "
+ "-XX:FlightRecorderOptions=stackdepth=256 "
+ "-Ddd.trace.sample.rate=30 "
+ "-Ddd.trace.request_header.tags=User-Agent:http.useragent";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.helper;

import com.google.api.client.util.Preconditions;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.EnvConfigs;
import io.airbyte.workers.WorkerConstants;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.micrometer.common.util.StringUtils;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.tuple.ImmutablePair;

/**
* Utility class for Connector level Datadog support helper.
*/
public class ConnectorDatadogSupportHelper {

private static final String JAVA_OPTS = "JAVA_OPTS";
private static final String DD_SERVICE = "DD_SERVICE";
private static final String DD_VERSION = "DD_VERSION";

/**
* addServerNameAndVersionToEnvVars.
*
* @param imageNameAndVersion ConnectorNameAndVersion record.
* @param envVars List of environment variables.
*/
public void addServerNameAndVersionToEnvVars(final String imageNameAndVersion, final List<EnvVar> envVars) {
Preconditions.checkNotNull(imageNameAndVersion);
Preconditions.checkNotNull(envVars);

final Optional<ImmutablePair<String, AirbyteVersion>> imageNameVersionPair = extractAirbyteVersionFromImageName(imageNameAndVersion, ":");
if (imageNameVersionPair.isPresent()) {
envVars.add(new EnvVar(DD_SERVICE, imageNameVersionPair.get().left, null));
envVars.add(new EnvVar(DD_VERSION, imageNameVersionPair.get().right.serialize(), null));
}
}

/**
* getImageNameAndVersion.
*
* @param imageName image name in string format.
* @param delimiter delimiter seperating connector image name and version.
* @return ConnectorNameAndVersion Parsed ConnectorNameAndVersion record.
*/
public Optional<ImmutablePair<String, AirbyteVersion>> extractAirbyteVersionFromImageName(final String imageName, final String delimiter) {
Preconditions.checkNotNull(imageName);
Preconditions.checkNotNull(delimiter);

final String[] imageNameAndVersion = imageName.split(delimiter);
final int expectedCount = 2;
if (imageNameAndVersion.length == expectedCount && StringUtils.isNotEmpty(imageNameAndVersion[0])) {
return Optional.of(ImmutablePair.of(imageNameAndVersion[0], new AirbyteVersion(imageNameAndVersion[1])));
}
return Optional.empty();
}

/**
* addDatadogVars.
*
* @param envVars List of environment variables.
*/
public void addDatadogVars(final List<EnvVar> envVars) {
envVars.add(new EnvVar(JAVA_OPTS, WorkerConstants.DD_ENV_VAR, null));

if (System.getenv(EnvConfigs.DD_AGENT_HOST) != null) {
envVars.add(new EnvVar(EnvConfigs.DD_AGENT_HOST, System.getenv(EnvConfigs.DD_AGENT_HOST), null));
}
if (System.getenv(EnvConfigs.DD_DOGSTATSD_PORT) != null) {
envVars.add(new EnvVar(EnvConfigs.DD_DOGSTATSD_PORT, System.getenv(EnvConfigs.DD_DOGSTATSD_PORT), null));
}
}

/**
* Utility function to check if connector image supports Datadog.
*
* @param firstConnectorVersionWithDatadog image and version when Datadog support was first added.
* @param currentConnectorVersion current image and version number
* @return True if current image version has Datadog support.
*/
public boolean connectorVersionCompare(final String firstConnectorVersionWithDatadog, final String currentConnectorVersion) {
final Optional<ImmutablePair<String, AirbyteVersion>> firstSupportedVersion =
extractAirbyteVersionFromImageName(firstConnectorVersionWithDatadog, "=");
final Optional<ImmutablePair<String, AirbyteVersion>> currentVersion = extractAirbyteVersionFromImageName(currentConnectorVersion, ":");

return firstSupportedVersion.isPresent()
&& currentVersion.isPresent()
&& currentVersion.get().left.compareTo(firstSupportedVersion.get().left) == 0
&& currentVersion.get().right.greaterThanOrEqualTo(firstSupportedVersion.get().right);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.config.AllowedHosts;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import java.io.IOException;
Expand Down Expand Up @@ -43,6 +44,8 @@ public class DockerProcessFactory implements ProcessFactory {
private static final Path DATA_MOUNT_DESTINATION = Path.of("/data");
private static final Path LOCAL_MOUNT_DESTINATION = Path.of("/local");
private static final String IMAGE_EXISTS_SCRIPT = "image_exists.sh";
private static final String DD_SUPPORT_CONNECTOR_NAMES = "CONNECTOR_DATADOG_SUPPORT_NAMES";
public static final String JAVA_OPTS = "JAVA_OPTS";

private final String workspaceMountSource;
private final WorkerConfigs workerConfigs;
Expand Down Expand Up @@ -153,6 +156,12 @@ public Process create(final String jobType,
cmd.add(envEntry.getKey() + "=" + envEntry.getValue());
}

if (System.getenv(DD_SUPPORT_CONNECTOR_NAMES) != null
&& Arrays.stream(System.getenv(DD_SUPPORT_CONNECTOR_NAMES).split(",")).anyMatch(imageName::contains)) {
cmd.add("-e");
cmd.add(JAVA_OPTS + "=" + WorkerConstants.DD_ENV_VAR);
}

if (!Strings.isNullOrEmpty(entrypoint)) {
cmd.add("--entrypoint");
cmd.add(entrypoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.config.TolerationPOJO;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.workers.helper.ConnectorDatadogSupportHelper;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.ContainerPort;
Expand Down Expand Up @@ -45,6 +46,7 @@
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -116,6 +118,7 @@ public class KubePodProcess implements KubePod {
private static final ResourceRequirements DEFAULT_SOCAT_RESOURCES = new ResourceRequirements()
.withMemoryLimit(configs.getSidecarKubeMemoryLimit()).withMemoryRequest(configs.getSidecarMemoryRequest())
.withCpuLimit(configs.getSocatSidecarKubeCpuLimit()).withCpuRequest(configs.getSocatSidecarKubeCpuRequest());
private static final String DD_SUPPORT_CONNECTOR_NAMES = "CONNECTOR_DATADOG_SUPPORT_NAMES";

private static final String PIPES_DIR = "/pipes";
private static final String STDIN_PIPE_FILE = PIPES_DIR + "/stdin";
Expand Down Expand Up @@ -144,6 +147,7 @@ public class KubePodProcess implements KubePod {

private static final int INIT_RETRY_MAX_ITERATIONS = (int) (INIT_RETRY_TIMEOUT_MINUTES.toSeconds() / INIT_SLEEP_PERIOD_SECONDS);

private static final ConnectorDatadogSupportHelper CONNECTOR_DATADOG_SUPPORT_HELPER = new ConnectorDatadogSupportHelper();
private final KubernetesClient fabricClient;
private final Pod podDefinition;

Expand Down Expand Up @@ -232,6 +236,11 @@ private static Container getMain(final String image,
.map(entry -> new EnvVar(entry.getKey(), entry.getValue(), null))
.collect(Collectors.toList());

if (System.getenv(DD_SUPPORT_CONNECTOR_NAMES) != null && isSupportDatadog(image)) {
CONNECTOR_DATADOG_SUPPORT_HELPER.addDatadogVars(envVars);
CONNECTOR_DATADOG_SUPPORT_HELPER.addServerNameAndVersionToEnvVars(image, envVars);
}

final ContainerBuilder containerBuilder = new ContainerBuilder()
.withName(MAIN_CONTAINER_NAME)
.withPorts(containerPorts)
Expand All @@ -249,6 +258,11 @@ private static Container getMain(final String image,
return containerBuilder.build();
}

private static boolean isSupportDatadog(final String image) {
return Arrays.stream(System.getenv(DD_SUPPORT_CONNECTOR_NAMES).split(","))
.anyMatch(connectorNameAndVersion -> CONNECTOR_DATADOG_SUPPORT_HELPER.connectorVersionCompare(connectorNameAndVersion, image));
}

/**
* Create port list for pod.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.helper;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.airbyte.commons.version.AirbyteVersion;
import java.util.Optional;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Test;

class ConnectorDatadogSupportHelperTest {

public static final String CONNECTOR_VERSION = "postgres=2.0.5";
private final ConnectorDatadogSupportHelper supportHelper = new ConnectorDatadogSupportHelper();

@Test
void extractAirbyteVersionFromImageName() {
final Optional<ImmutablePair<String, AirbyteVersion>> pair =
supportHelper.extractAirbyteVersionFromImageName(CONNECTOR_VERSION, "=");

assertTrue(pair.isPresent());
assertEquals("postgres".compareTo(pair.get().left), 0);
assertEquals("2.0.5".compareTo(pair.get().right.serialize()), 0);
}

@Test
void connectorVersionCompare() {
assertTrue(supportHelper.connectorVersionCompare(CONNECTOR_VERSION, "postgres:2.0.5"));
assertTrue(supportHelper.connectorVersionCompare(CONNECTOR_VERSION, "postgres:2.0.6"));

assertFalse(supportHelper.connectorVersionCompare(CONNECTOR_VERSION, "postgres:2.0.4"));
assertFalse(supportHelper.connectorVersionCompare(CONNECTOR_VERSION, "postgres:1.2.8"));

assertFalse(supportHelper.connectorVersionCompare(CONNECTOR_VERSION, "mysql:2.0.5"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ void testEnvMapSet() throws IOException, WorkerException, InterruptedException {

final WorkerConfigs workerConfigs = spy(new WorkerConfigs(new EnvConfigs()));
when(workerConfigs.getEnvMap()).thenReturn(Map.of("ENV_VAR_1", "ENV_VALUE_1"));
when(workerConfigs.getEnvMap()).thenReturn(Map.of("ENV_VAR_1", "ENV_VALUE_1"));

final DockerProcessFactory processFactory =
new DockerProcessFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class ContainerOrchestratorConfigBeanFactory {
private static final String DD_ENV_ENV_VAR = "DD_ENV";
private static final String DD_SERVICE_ENV_VAR = "DD_SERVICE";
private static final String DD_VERSION_ENV_VAR = "DD_VERSION";
private static final String DD_SUPPORT_CONNECTOR_NAMES_VAR = "CONNECTOR_DATADOG_SUPPORT_NAMES";
private static final String JAVA_OPTS_ENV_VAR = "JAVA_OPTS";
private static final String PUBLISH_METRICS_ENV_VAR = "PUBLISH_METRICS";
private static final String CONTROL_PLANE_AUTH_ENDPOINT_ENV_VAR = "CONTROL_PLANE_AUTH_ENDPOINT";
Expand Down Expand Up @@ -70,6 +71,7 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig(
@Value("${airbyte.metric.client}") final String metricClient,
@Value("${datadog.agent.host}") final String dataDogAgentHost,
@Value("${datadog.agent.port}") final String dataDogStatsdPort,
@Value("${airbyte.connector.datadog-support-names}") final String datadogSupportConnectors,
@Value("${airbyte.metric.should-publish}") final String shouldPublishMetrics,
final FeatureFlags featureFlags,
@Value("${airbyte.container.orchestrator.java-opts}") final String containerOrchestratorJavaOpts,
Expand All @@ -95,6 +97,7 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig(
environmentVariables.put(DD_AGENT_HOST_ENV_VAR, dataDogAgentHost);
environmentVariables.put(DD_SERVICE_ENV_VAR, "airbyte-container-orchestrator");
environmentVariables.put(DD_DOGSTATSD_PORT_ENV_VAR, dataDogStatsdPort);
environmentVariables.put(DD_SUPPORT_CONNECTOR_NAMES_VAR, datadogSupportConnectors);
environmentVariables.put(PUBLISH_METRICS_ENV_VAR, shouldPublishMetrics);
environmentVariables.put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, Boolean.toString(featureFlags.useStreamCapableState()));
environmentVariables.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, Boolean.toString(featureFlags.autoDetectSchema()));
Expand Down
1 change: 1 addition & 0 deletions airbyte-workers/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ airbyte:
secret-access-key: ${STATE_STORAGE_S3_SECRET_ACCESS_KEY:}
connector:
specific-resource-defaults-enabled: ${CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED:false}
datadog-support-names: ${CONNECTOR_DATADOG_SUPPORT_NAMES:}
container:
orchestrator:
enabled: ${CONTAINER_ORCHESTRATOR_ENABLED:false}
Expand Down
1 change: 1 addition & 0 deletions docker-compose.datadog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ services:
environment:
<<: *datadogged-environment
DD_SERVICE: airbyte-worker
CONNECTOR_DATADOG_SUPPORT_NAMES: postgres-source
server:
<<: *datadogged-volumes
environment:
Expand Down

0 comments on commit 1346fb9

Please sign in to comment.