Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDAP-20993] Namespace and Repository Source control metadata refresh #15641

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import io.cdap.cdap.internal.app.services.RunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.RunRecordMonitorService;
import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefresher;
import io.cdap.cdap.internal.app.store.DefaultStore;
import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules;
import io.cdap.cdap.internal.capability.CapabilityModule;
Expand Down Expand Up @@ -214,6 +215,7 @@ protected void configure() {
bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class);
bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefresher.class).in(Scopes.SINGLETON);

Multibinder<String> servicesNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Expand Down Expand Up @@ -258,6 +260,7 @@ protected void configure() {
bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class);
bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefresher.class).in(Scopes.SINGLETON);

Multibinder<String> servicesNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Expand Down Expand Up @@ -315,6 +318,7 @@ protected void configure() {
bind(StorageProviderNamespaceAdmin.class)
.to(DistributedStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefresher.class).in(Scopes.SINGLETON);

bind(ProgramRunDispatcher.class).to(RemoteProgramRunDispatcher.class)
.in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import io.cdap.cdap.security.impersonation.UGIProvider;
import io.cdap.cdap.security.spi.authorization.AccessEnforcer;
import io.cdap.cdap.security.spi.authorization.ContextAccessEnforcer;
import io.cdap.cdap.sourcecontrol.guice.SourceControlModule;
import io.cdap.cdap.store.DefaultOwnerStore;
import org.apache.twill.filesystem.LocationFactory;

Expand Down Expand Up @@ -185,6 +186,10 @@ protected void configure() {
bind(MetadataAdmin.class).to(DefaultMetadataAdmin.class);
expose(MetadataAdmin.class);

//TODO(adrika): cleanup this dependency later
// This is needed because there is a transitive dependency on source control operation
// runner which is not actually being used here
install(new SourceControlModule());
bindPreviewRunner(binder());
expose(PreviewRunner.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.app.store;

import io.cdap.cdap.proto.SourceControlMetadataRecord;
import java.util.List;

/**
* Represents a response containing a list of source control metadata records, next page token and
* last refresh time.
*/

public class ListSourceControlMetadataResponse {

private final List<SourceControlMetadataRecord> apps;
private final String nextPageToken;
private final Long lastRefreshTime;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use boxed type? Is it nullable? It's better not to use box type if possible. Is there a meaning default value for it that you can use?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its null if the refresh has not been run yet. I think it would be more appropiate to have default value as null in the API response than a actual long value

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it can ever be null. The value comes from SourceControlMetadataRefresher.getLastRefreshTime, which returns primitive long, and the implementation is defaulting it to 0L


/**
* Constructs a ListSourceControlMetadataResponse object.
*
* @param apps The list of source control metadata records.
* @param nextPageToken The token for fetching the next page of results.
* @param lastRefreshTime The timestamp of the last refresh operation.
*/
public ListSourceControlMetadataResponse(List<SourceControlMetadataRecord> apps,
String nextPageToken, Long lastRefreshTime) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Annotate parameters that can be null with @Nullable.

this.apps = apps;
this.nextPageToken = nextPageToken;
this.lastRefreshTime = lastRefreshTime;
}

public List<SourceControlMetadataRecord> getApps() {
return apps;
}

public String getNextPageToken() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these getter methods used? If not, no need to define them.

return nextPageToken;
}

public Long getLastRefreshTime() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Annotate with @Nullable

return lastRefreshTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.app.store;

import io.cdap.cdap.proto.SourceControlMetadataRecord;

/**
* Represents a response containing a single source control metadata record and last refresh time.
*/
public class SingleSourceControlMetadataResponse {

private final SourceControlMetadataRecord app;
private final Long lastRefreshTime;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this nullable? If so, please annotate accordingly.


/**
* Constructs a SingleSourceControlMetadataResponse object.
*
* @param app The source control metadata record.
* @param lastRefreshTime The timestamp of the last refresh operation.
*/
public SingleSourceControlMetadataResponse(SourceControlMetadataRecord app,
Long lastRefreshTime) {
this.app = app;
this.lastRefreshTime = lastRefreshTime;
}

public SourceControlMetadataRecord getApp() {
return app;
}

public Long getLastRefreshTime() {
return lastRefreshTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@
* @return a {@link Map} from the {@link ProgramId} to the list of run records; there will be no entry for programs
* that do not exist.
*/
Map<ProgramId, Collection<RunRecordDetail>> getActiveRuns(Collection<ProgramReference> programRefs);

Check warning on line 355 in cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.OverloadMethodsDeclarationOrderCheck

All overloaded methods should be placed next to each other. Placing non-overloaded methods in between overloaded methods with the same type is a violation. Previous overloaded method located at line '322'.

/**
* Fetches the active (i.e STARTING or RUNNING or SUSPENDED) run records for the
Expand Down Expand Up @@ -495,8 +495,6 @@
int scanRepositorySourceControlMetadata(ScanSourceControlMetadataRequest request,
Consumer<SourceControlMetadataRecord> consumer);

void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash);

/**
* Returns a Map of {@link ApplicationMeta} for the given set of {@link ApplicationId}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import io.cdap.cdap.api.security.AccessException;
import io.cdap.cdap.app.runtime.ProgramRuntimeService;
import io.cdap.cdap.app.store.ApplicationFilter;
import io.cdap.cdap.app.store.ListSourceControlMetadataResponse;
import io.cdap.cdap.app.store.ScanApplicationsRequest;
import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest;
import io.cdap.cdap.app.store.SingleSourceControlMetadataResponse;
import io.cdap.cdap.common.ApplicationNotFoundException;
import io.cdap.cdap.common.ArtifactAlreadyExistsException;
import io.cdap.cdap.common.BadRequestException;
Expand All @@ -43,7 +45,9 @@
import io.cdap.cdap.common.NamespaceNotFoundException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.NotImplementedException;
import io.cdap.cdap.common.RepositoryNotFoundException;
import io.cdap.cdap.common.ServiceException;
import io.cdap.cdap.common.SourceControlMetadataNotFoundException;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
Expand Down Expand Up @@ -129,7 +133,6 @@
* Key in json paginated applications list response.
*/
public static final String APP_LIST_PAGINATED_KEY = "applications";
public static final String APP_LIST_PAGINATED_KEY_SHORT = "apps";

/**
* Runtime program service for running and managing programs.
Expand Down Expand Up @@ -295,7 +298,8 @@
}

/**
* Retrieves all namespace source control metadata for applications within the specified namespace.
* Retrieves all namespace source control metadata for applications within the specified namespace,
* next page token and last refresh time.
*
* @param request The HTTP request containing parameters for retrieving metadata.
* @param responder The HTTP responder for sending the response.
Expand All @@ -319,31 +323,26 @@
@QueryParam("filter") String filter
) throws Exception {
validateNamespace(namespaceId);

JsonPaginatedListResponder.respond(GSON, responder, APP_LIST_PAGINATED_KEY_SHORT,
jsonListResponder -> {
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = applicationLifecycleService.scanSourceControlMetadata(
scanRequest, batchSize,
scmMetaRecord -> {
jsonListResponder.send(scmMetaRecord);
lastRecord.set(scmMetaRecord);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
return !pageLimitReached || record == null ? null : record.getName();
});
List<SourceControlMetadataRecord> apps = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should check if repoconfig is present here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached;
pageLimitReached = applicationLifecycleService.scanSourceControlMetadata(
scanRequest, batchSize,
apps::add);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the old code we don't buffer the app, but in the new code we do. I remember do removed the buffered to avoid high memory consumption, but now we are bringing the same problem back. Is buffering needed? Seems like we can refactor the JsonPaginatedListResponder so that it can emit extra fields (lastRefreshTime in this case) before closing the json object.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can extend JsonPaginatedListResponder to also add additional field.
@chtyim Is it ok to take this up as a fast followup for the change ?

SourceControlMetadataRecord record = apps.isEmpty() ? null :apps.get(apps.size() - 1);

Check warning on line 334 in cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.whitespace.WhitespaceAroundCheck

WhitespaceAround: ':' is not followed by whitespace. Empty blocks may only be represented as {} when not part of a multi-block statement (4.1.3)
String nextPageToken = !pageLimitReached || record == null ? null :
record.getName();
long lastRefreshTime = applicationLifecycleService.getLastRefreshTime(namespaceId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like unwise to sync all apps, shouldn't we just sync the apps fetched in this page?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we are cloning the full repository which would be time taking it might be wasteful to just update few pipelines.

ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps,
nextPageToken, lastRefreshTime == 0L ? null : lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

/**
* Retrieves the source control metadata for the specified application within the specified namespace.
* Retrieves the source control metadata for the specified application within the specified namespace
* and last refresh time of git pull/push.
*
* @param request The HTTP request containing parameters for retrieving metadata.
* @param responder The HTTP responder for sending the response.
Expand All @@ -355,12 +354,15 @@
@Path("/apps/{app-id}/sourcecontrol")
public void getNamespaceSourceControlMetadata(HttpRequest request, HttpResponder responder,
@PathParam("namespace-id") final String namespaceId,
@PathParam("app-id") final String appName) throws Exception {
@PathParam("app-id") final String appName)
throws BadRequestException, NamespaceNotFoundException,
SourceControlMetadataNotFoundException, RepositoryNotFoundException {
validateApplicationId(namespaceId, appName);

responder.sendJson(HttpResponseStatus.OK,
GSON.toJson(applicationLifecycleService.getSourceControlMetadataRecord(
new ApplicationReference(namespaceId, appName))));
SourceControlMetadataRecord app = applicationLifecycleService.getSourceControlMetadataRecord(
new ApplicationReference(namespaceId, appName));
Long lastRefreshTime = applicationLifecycleService.getLastRefreshTime(namespaceId);
SingleSourceControlMetadataResponse response = new SingleSourceControlMetadataResponse(app, lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

private ScanApplicationsRequest getScanRequest(String namespaceId, String artifactVersion,
Expand All @@ -375,7 +377,7 @@
}
if (nameFilter != null && !nameFilter.isEmpty()) {
if (nameFilterType != null) {
switch (nameFilterType) {

Check warning on line 380 in cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.MissingSwitchDefaultCheck

switch without "default" clause.
case EQUALS:
builder.setApplicationReference(new ApplicationReference(namespaceId, nameFilter));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.app.store.ListSourceControlMetadataResponse;
import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.ForbiddenException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.AppFabric;
Expand Down Expand Up @@ -58,8 +58,10 @@
import io.cdap.http.HttpResponder;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;

Check warning on line 61 in cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.UnusedImportsCheck

Unused import - java.io.IOException.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

Check warning on line 64 in cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.UnusedImportsCheck

Unused import - java.util.concurrent.atomic.AtomicReference.
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand All @@ -76,7 +78,6 @@
private final SourceControlManagementService sourceControlService;
private final FeatureFlagsProvider featureFlagsProvider;
private static final Gson GSON = new Gson();
public static final String APP_LIST_PAGINATED_KEY_SHORT = "apps";
private final int batchSize;

@Inject
Expand Down Expand Up @@ -156,29 +157,21 @@
@QueryParam("filter") String filter) throws Exception {
checkSourceControlFeatureFlag();
validateNamespaceId(namespaceId);
JsonPaginatedListResponder.respond(GSON, responder, APP_LIST_PAGINATED_KEY_SHORT,
jsonListResponder -> {
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = sourceControlService.scanRepoMetadata(
scanRequest, batchSize,
record -> {
jsonListResponder.send(record);
lastRecord.set(record);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
} catch (NotFoundException e) {
responder.sendString(HttpResponseStatus.NOT_FOUND, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
return !pageLimitReached || record == null ? null :
record.getName();
});
List<SourceControlMetadataRecord> apps = new ArrayList<>();
samdgupi marked this conversation as resolved.
Show resolved Hide resolved
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached;
pageLimitReached = sourceControlService.scanRepoMetadata(
scanRequest, batchSize,
apps::add);
SourceControlMetadataRecord record = apps.isEmpty() ? null :apps.get(apps.size() - 1);

Check warning on line 168 in cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.whitespace.WhitespaceAroundCheck

WhitespaceAround: ':' is not followed by whitespace. Empty blocks may only be represented as {} when not part of a multi-block statement (4.1.3)
String nextPageToken = !pageLimitReached || record == null ? null :
record.getName();
long lastRefreshTime = sourceControlService.getLastRefreshTime(namespaceId);
ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps,
nextPageToken, lastRefreshTime == 0L ? null : lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.cdap.cdap.common.security.HttpsEnabler;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataMigrationService;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefresher;
import io.cdap.cdap.internal.app.store.AppMetadataStore;
import io.cdap.cdap.internal.bootstrap.BootstrapService;
import io.cdap.cdap.internal.credential.CredentialProviderService;
Expand Down Expand Up @@ -97,6 +98,7 @@ public class AppFabricServer extends AbstractIdleService {
private final RepositoryCleanupService repositoryCleanupService;
private final OperationNotificationSubscriberService operationNotificationSubscriberService;
private final SourceControlMetadataMigrationService sourceControlMetadataMigrationService;
private final SourceControlMetadataRefresher sourceControlMetadataRefresher;
private final CConfiguration cConf;
private final SConfiguration sConf;
private final boolean sslEnabled;
Expand Down Expand Up @@ -137,7 +139,8 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
SourceControlOperationRunner sourceControlOperationRunner,
RepositoryCleanupService repositoryCleanupService,
OperationNotificationSubscriberService operationNotificationSubscriberService,
SourceControlMetadataMigrationService sourceControlMetadataMigrationService) {
SourceControlMetadataMigrationService sourceControlMetadataMigrationService,
SourceControlMetadataRefresher sourceControlMetadataRefresher) {
this.hostname = hostname;
this.discoveryService = discoveryService;
this.handlers = handlers;
Expand Down Expand Up @@ -167,6 +170,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
this.repositoryCleanupService = repositoryCleanupService;
this.operationNotificationSubscriberService = operationNotificationSubscriberService;
this.sourceControlMetadataMigrationService = sourceControlMetadataMigrationService;
this.sourceControlMetadataRefresher = sourceControlMetadataRefresher;
}

/**
Expand Down
Loading
Loading