Skip to content

Commit

Permalink
Return error when get_consent_url is called with OAuth param override…
Browse files Browse the repository at this point in the history
…s in place (#5922)

Co-authored-by: terencecho <[email protected]>
  • Loading branch information
JonsSpaghetti and terencecho committed Apr 17, 2023
1 parent 93cd829 commit 09f4d66
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public BaseOAuthFlow(final ConfigRepository configRepository) {
protected JsonNode getSourceOAuthParamConfig(final UUID workspaceId, final UUID sourceDefinitionId) throws IOException, ConfigNotFoundException {
try {
final Optional<SourceOAuthParameter> param = MoreOAuthParameters.getSourceOAuthParameter(
configRepository.listSourceOAuthParam().stream(), workspaceId, sourceDefinitionId);
configRepository.listSourceOAuthParam().stream(), workspaceId, sourceDefinitionId, true);
if (param.isPresent()) {
// TODO: if we write a flyway migration to flatten persisted configs in db, we don't need to flatten
// here see https://github.com/airbytehq/airbyte/issues/7624
Expand All @@ -59,7 +59,7 @@ protected JsonNode getDestinationOAuthParamConfig(final UUID workspaceId, final
throws IOException, ConfigNotFoundException {
try {
final Optional<DestinationOAuthParameter> param = MoreOAuthParameters.getDestinationOAuthParameter(
configRepository.listDestinationOAuthParam().stream(), workspaceId, destinationDefinitionId);
configRepository.listDestinationOAuthParam().stream(), workspaceId, destinationDefinitionId, true);
if (param.isPresent()) {
// TODO: if we write a migration to flatten persisted configs in db, we don't need to flatten
// here see https://github.com/airbytehq/airbyte/issues/7624
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.persistence.ConfigNotFoundException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
Expand All @@ -30,43 +31,65 @@ public class MoreOAuthParameters {
public static final String SECRET_MASK = "******";

/**
* Get source OAuth param from stream.
* Get source OAuth param from stream. If the boolean is set, throws when there's a row in the
* actor_oauth_params table that corresponds to the definitionId & workspaceId.
*
* @param stream oauth param stream
* @param workspaceId workspace id
* @param sourceDefinitionId source definition id
* @param throwIfOverridePresent Throw if we find an override oauth param?
* @return oauth params
*/
public static Optional<SourceOAuthParameter> getSourceOAuthParameter(
final Stream<SourceOAuthParameter> stream,
public static Optional<SourceOAuthParameter> getSourceOAuthParameter(final Stream<SourceOAuthParameter> stream,
final UUID workspaceId,
final UUID sourceDefinitionId) {
return stream
final UUID sourceDefinitionId,
final boolean throwIfOverridePresent)
throws ConfigNotFoundException {

Optional<SourceOAuthParameter> sourceOAuthParameter = stream
.filter(p -> sourceDefinitionId.equals(p.getSourceDefinitionId()))
.filter(p -> p.getWorkspaceId() == null || workspaceId.equals(p.getWorkspaceId()))
// we prefer params specific to a workspace before global ones (ie workspace is null)
.min(Comparator.comparing(SourceOAuthParameter::getWorkspaceId, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(SourceOAuthParameter::getOauthParameterId));

if (throwIfOverridePresent) {
if (sourceOAuthParameter.filter(param -> param.getWorkspaceId() != null).isPresent()) {
throw new ConfigNotFoundException("OAuthParamOverride", String.format("[%s] [%s]", workspaceId, sourceDefinitionId));
}
}
return sourceOAuthParameter;
}

/**
* Get destination OAuth param from stream.
* Get destination OAuth param from stream. If the boolean is set, throws when there's a row in the
* actor_oauth_params table that corresponds to the definitionId & workspaceId.
*
* @param stream oauth param stream
* @param workspaceId workspace id
* @param destinationDefinitionId destination definition id
* @param throwIfOverridePresent Throw if we find an override oauth param?
* @return oauth params
*/
public static Optional<DestinationOAuthParameter> getDestinationOAuthParameter(
final Stream<DestinationOAuthParameter> stream,
final UUID workspaceId,
final UUID destinationDefinitionId) {
return stream
final UUID destinationDefinitionId,
boolean throwIfOverridePresent)
throws ConfigNotFoundException {
Optional<DestinationOAuthParameter> destinationOAuthParameter = stream
.filter(p -> destinationDefinitionId.equals(p.getDestinationDefinitionId()))
.filter(p -> p.getWorkspaceId() == null || workspaceId.equals(p.getWorkspaceId()))
// we prefer params specific to a workspace before global ones (ie workspace is null)
.min(Comparator.comparing(DestinationOAuthParameter::getWorkspaceId, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(DestinationOAuthParameter::getOauthParameterId));

if (throwIfOverridePresent) {
if (destinationOAuthParameter.filter(param -> param.getWorkspaceId() != null).isPresent()) {
throw new ConfigNotFoundException("OAuthParamOverride", String.format("[%s] [%s]", workspaceId, destinationDefinitionId));
}
}
return destinationOAuthParameter;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,10 @@ void setup() throws JsonValidationException, IOException {
when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter()
.withOauthParameterId(UUID.randomUUID())
.withSourceDefinitionId(definitionId)
.withWorkspaceId(workspaceId)
.withConfiguration(getOAuthParamConfig())));
when(configRepository.listDestinationOAuthParam()).thenReturn(List.of(new DestinationOAuthParameter()
.withOauthParameterId(UUID.randomUUID())
.withDestinationDefinitionId(definitionId)
.withWorkspaceId(workspaceId)
.withConfiguration(getOAuthParamConfig())));
}

Expand Down Expand Up @@ -254,12 +252,10 @@ void testGetConsentUrlIncompleteOAuthParameters() throws IOException, JsonValida
when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter()
.withOauthParameterId(UUID.randomUUID())
.withSourceDefinitionId(definitionId)
.withWorkspaceId(workspaceId)
.withConfiguration(Jsons.emptyObject())));
when(configRepository.listDestinationOAuthParam()).thenReturn(List.of(new DestinationOAuthParameter()
.withOauthParameterId(UUID.randomUUID())
.withDestinationDefinitionId(definitionId)
.withWorkspaceId(workspaceId)
.withConfiguration(Jsons.emptyObject())));
assertThrows(IllegalArgumentException.class,
() -> oauthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL, getInputOAuthConfiguration(), getoAuthConfigSpecification()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public LowLevelHttpResponse execute() throws IOException {
configRepository = mock(ConfigRepository.class);
when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter()
.withSourceDefinitionId(definitionId)
.withWorkspaceId(workspaceId)
.withConfiguration(Jsons.jsonNode(ImmutableMap.builder()
.put("client_id", "test_client_id")
.put("client_secret", "test_client_secret")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public JsonNode maskSourceOAuthParameters(final UUID sourceDefinitionId,
try {
final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(sourceDefinitionId);
final ActorDefinitionVersion sourceVersion = actorDefinitionVersionHelper.getSourceVersion(sourceDefinition, workspaceId, sourceId);
MoreOAuthParameters.getSourceOAuthParameter(configRepository.listSourceOAuthParam().stream(), workspaceId, sourceDefinitionId)
MoreOAuthParameters.getSourceOAuthParameter(configRepository.listSourceOAuthParam().stream(), workspaceId, sourceDefinitionId, false)
.ifPresent(sourceOAuthParameter -> maskOauthParameters(sourceDefinition.getName(), sourceVersion.getSpec(), sourceConnectorConfig));
return sourceConnectorConfig;
} catch (final JsonValidationException | ConfigNotFoundException e) {
Expand All @@ -111,7 +111,8 @@ public JsonNode maskDestinationOAuthParameters(final UUID destinationDefinitionI
final StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition(destinationDefinitionId);
final ActorDefinitionVersion destinationVersion =
actorDefinitionVersionHelper.getDestinationVersion(destinationDefinition, workspaceId, destinationId);
MoreOAuthParameters.getDestinationOAuthParameter(configRepository.listDestinationOAuthParam().stream(), workspaceId, destinationDefinitionId)
MoreOAuthParameters.getDestinationOAuthParameter(configRepository.listDestinationOAuthParam().stream(), workspaceId, destinationDefinitionId,
false)
.ifPresent(destinationOAuthParameter -> maskOauthParameters(destinationDefinition.getName(), destinationVersion.getSpec(),
destinationConnectorConfig));
return destinationConnectorConfig;
Expand All @@ -138,7 +139,7 @@ public JsonNode injectSourceOAuthParameters(final UUID sourceDefinitionId,
try {
final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(sourceDefinitionId);
final ActorDefinitionVersion sourceVersion = actorDefinitionVersionHelper.getSourceVersion(sourceDefinition, workspaceId, sourceId);
MoreOAuthParameters.getSourceOAuthParameter(configRepository.listSourceOAuthParam().stream(), workspaceId, sourceDefinitionId)
MoreOAuthParameters.getSourceOAuthParameter(configRepository.listSourceOAuthParam().stream(), workspaceId, sourceDefinitionId, false)
.ifPresent(sourceOAuthParameter -> {
if (injectOAuthParameters(sourceDefinition.getName(), sourceVersion.getSpec(), sourceOAuthParameter.getConfiguration(),
sourceConnectorConfig)) {
Expand Down Expand Up @@ -171,7 +172,8 @@ public JsonNode injectDestinationOAuthParameters(final UUID destinationDefinitio
final StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition(destinationDefinitionId);
final ActorDefinitionVersion destinationVersion =
actorDefinitionVersionHelper.getDestinationVersion(destinationDefinition, workspaceId, destinationId);
MoreOAuthParameters.getDestinationOAuthParameter(configRepository.listDestinationOAuthParam().stream(), workspaceId, destinationDefinitionId)
MoreOAuthParameters.getDestinationOAuthParameter(configRepository.listDestinationOAuthParam().stream(), workspaceId, destinationDefinitionId,
false)
.ifPresent(destinationOAuthParameter -> {
if (injectOAuthParameters(destinationDefinition.getName(), destinationVersion.getSpec(), destinationOAuthParameter.getConfiguration(),
destinationConnectorConfig)) {
Expand Down

0 comments on commit 09f4d66

Please sign in to comment.