diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java index 24c028a0471d..374151c36c4b 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java @@ -117,6 +117,7 @@ import io.cdap.cdap.internal.app.services.RunRecordCorrectorService; import io.cdap.cdap.internal.app.services.RunRecordMonitorService; import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService; +import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService; import io.cdap.cdap.internal.app.store.DefaultStore; import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules; import io.cdap.cdap.internal.capability.CapabilityModule; @@ -214,6 +215,7 @@ protected void configure() { bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class); bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class); bind(UGIProvider.class).toProvider(UgiProviderProvider.class); + bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON); Multibinder servicesNamesBinder = Multibinder.newSetBinder(binder(), String.class, @@ -258,6 +260,7 @@ protected void configure() { bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class); bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class); bind(UGIProvider.class).toProvider(UgiProviderProvider.class); + bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON); Multibinder servicesNamesBinder = Multibinder.newSetBinder(binder(), String.class, @@ -315,6 +318,7 @@ protected void configure() { bind(StorageProviderNamespaceAdmin.class) .to(DistributedStorageProviderNamespaceAdmin.class); bind(UGIProvider.class).toProvider(UgiProviderProvider.class); + bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON); bind(ProgramRunDispatcher.class).to(RemoteProgramRunDispatcher.class) .in(Scopes.SINGLETON); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerModule.java index f3d8e85c0524..6bd2ee384ca8 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerModule.java @@ -75,6 +75,7 @@ import io.cdap.cdap.security.impersonation.UGIProvider; import io.cdap.cdap.security.spi.authorization.AccessEnforcer; import io.cdap.cdap.security.spi.authorization.ContextAccessEnforcer; +import io.cdap.cdap.sourcecontrol.guice.SourceControlModule; import io.cdap.cdap.store.DefaultOwnerStore; import org.apache.twill.filesystem.LocationFactory; @@ -185,6 +186,10 @@ protected void configure() { bind(MetadataAdmin.class).to(DefaultMetadataAdmin.class); expose(MetadataAdmin.class); + //TODO(adrika): cleanup this dependency later + // This is needed because there is a transitive dependency on source control operation + // runner which is not actually being used here + install(new SourceControlModule()); bindPreviewRunner(binder()); expose(PreviewRunner.class); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/ListSourceControlMetadataResponse.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/ListSourceControlMetadataResponse.java new file mode 100644 index 000000000000..10e991dd3974 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/ListSourceControlMetadataResponse.java @@ -0,0 +1,58 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.app.store; + +import io.cdap.cdap.proto.SourceControlMetadataRecord; +import java.util.List; + +/** + * Represents a response containing a list of source control metadata records, next page token and + * last refresh time. + */ + +public class ListSourceControlMetadataResponse { + + private final List apps; + private final String nextPageToken; + private final Long lastRefreshTime; + + /** + * Constructs a ListSourceControlMetadataResponse object. + * + * @param apps The list of source control metadata records. + * @param nextPageToken The token for fetching the next page of results. + * @param lastRefreshTime The timestamp of the last refresh operation. + */ + public ListSourceControlMetadataResponse(List apps, + String nextPageToken, Long lastRefreshTime) { + this.apps = apps; + this.nextPageToken = nextPageToken; + this.lastRefreshTime = lastRefreshTime; + } + + public List getApps() { + return apps; + } + + public String getNextPageToken() { + return nextPageToken; + } + + public Long getLastRefreshTime() { + return lastRefreshTime; + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/SingleSourceControlMetadataResponse.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/SingleSourceControlMetadataResponse.java new file mode 100644 index 000000000000..57a1ac0ff4b4 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/SingleSourceControlMetadataResponse.java @@ -0,0 +1,48 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.app.store; + +import io.cdap.cdap.proto.SourceControlMetadataRecord; + +/** + * Represents a response containing a single source control metadata record and last refresh time. + */ +public class SingleSourceControlMetadataResponse { + + private final SourceControlMetadataRecord app; + private final Long lastRefreshTime; + + /** + * Constructs a SingleSourceControlMetadataResponse object. + * + * @param app The source control metadata record. + * @param lastRefreshTime The timestamp of the last refresh operation. + */ + public SingleSourceControlMetadataResponse(SourceControlMetadataRecord app, + Long lastRefreshTime) { + this.app = app; + this.lastRefreshTime = lastRefreshTime; + } + + public SourceControlMetadataRecord getApp() { + return app; + } + + public Long getLastRefreshTime() { + return lastRefreshTime; + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java index 8c470730e79a..9d4903f35b59 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java @@ -495,8 +495,6 @@ int scanAppSourceControlMetadata(ScanSourceControlMetadataRequest request, int scanRepositorySourceControlMetadata(ScanSourceControlMetadataRequest request, Consumer consumer); - void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash); - /** * Returns a Map of {@link ApplicationMeta} for the given set of {@link ApplicationId}. * diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java index 38c968899b61..9a31e36c2b7e 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java @@ -33,8 +33,10 @@ import io.cdap.cdap.api.security.AccessException; import io.cdap.cdap.app.runtime.ProgramRuntimeService; import io.cdap.cdap.app.store.ApplicationFilter; +import io.cdap.cdap.app.store.ListSourceControlMetadataResponse; import io.cdap.cdap.app.store.ScanApplicationsRequest; import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest; +import io.cdap.cdap.app.store.SingleSourceControlMetadataResponse; import io.cdap.cdap.common.ApplicationNotFoundException; import io.cdap.cdap.common.ArtifactAlreadyExistsException; import io.cdap.cdap.common.BadRequestException; @@ -129,7 +131,6 @@ public class AppLifecycleHttpHandler extends AbstractAppLifecycleHttpHandler { * Key in json paginated applications list response. */ public static final String APP_LIST_PAGINATED_KEY = "applications"; - public static final String APP_LIST_PAGINATED_KEY_SHORT = "apps"; /** * Runtime program service for running and managing programs. @@ -319,27 +320,29 @@ public void getAllNamespaceSourceControlMetadata(FullHttpRequest request, HttpRe @QueryParam("filter") String filter ) throws Exception { validateNamespace(namespaceId); - - JsonPaginatedListResponder.respond(GSON, responder, APP_LIST_PAGINATED_KEY_SHORT, - jsonListResponder -> { - AtomicReference lastRecord = new AtomicReference<>(null); - ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest( - namespaceId, - pageToken, pageSize, sortOrder, sortOn, filter); - boolean pageLimitReached = false; - try { - pageLimitReached = applicationLifecycleService.scanSourceControlMetadata( - scanRequest, batchSize, - scmMetaRecord -> { - jsonListResponder.send(scmMetaRecord); - lastRecord.set(scmMetaRecord); - }); - } catch (IOException e) { - responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage()); - } - SourceControlMetadataRecord record = lastRecord.get(); - return !pageLimitReached || record == null ? null : record.getName(); - }); + List apps = new ArrayList<>(); + AtomicReference lastRecord = new AtomicReference<>(null); + ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest( + namespaceId, + pageToken, pageSize, sortOrder, sortOn, filter); + boolean pageLimitReached = false; + try { + pageLimitReached = applicationLifecycleService.scanSourceControlMetadata( + scanRequest, batchSize, + scmMetaRecord -> { + apps.add(scmMetaRecord); + lastRecord.set(scmMetaRecord); + }); + } catch (IOException e) { + responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage()); + } + SourceControlMetadataRecord record = lastRecord.get(); + String nextPageToken = !pageLimitReached || record == null ? null : + record.getName(); + Long lastRefreshTime = applicationLifecycleService.getLastRefreshTime(namespaceId); + ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps, + nextPageToken, lastRefreshTime); + responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response)); } /** @@ -357,10 +360,11 @@ public void getNamespaceSourceControlMetadata(HttpRequest request, HttpResponder @PathParam("namespace-id") final String namespaceId, @PathParam("app-id") final String appName) throws Exception { validateApplicationId(namespaceId, appName); - - responder.sendJson(HttpResponseStatus.OK, - GSON.toJson(applicationLifecycleService.getSourceControlMetadataRecord( - new ApplicationReference(namespaceId, appName)))); + SourceControlMetadataRecord app = applicationLifecycleService.getSourceControlMetadataRecord( + new ApplicationReference(namespaceId, appName)); + Long lastRefreshTime = applicationLifecycleService.getLastRefreshTime(namespaceId); + SingleSourceControlMetadataResponse response = new SingleSourceControlMetadataResponse(app, lastRefreshTime); + responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response)); } private ScanApplicationsRequest getScanRequest(String namespaceId, String artifactVersion, diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java index 1a0680fdb32a..9b9e87b996bb 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java @@ -21,10 +21,10 @@ import com.google.gson.JsonSyntaxException; import com.google.inject.Inject; import io.cdap.cdap.api.feature.FeatureFlagsProvider; +import io.cdap.cdap.app.store.ListSourceControlMetadataResponse; import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest; import io.cdap.cdap.common.BadRequestException; import io.cdap.cdap.common.ForbiddenException; -import io.cdap.cdap.common.NotFoundException; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.common.conf.Constants.AppFabric; @@ -59,6 +59,8 @@ import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; import javax.ws.rs.DELETE; import javax.ws.rs.GET; @@ -156,29 +158,29 @@ public void listAllApplications(FullHttpRequest request, HttpResponder responder @QueryParam("filter") String filter) throws Exception { checkSourceControlFeatureFlag(); validateNamespaceId(namespaceId); - JsonPaginatedListResponder.respond(GSON, responder, APP_LIST_PAGINATED_KEY_SHORT, - jsonListResponder -> { - AtomicReference lastRecord = new AtomicReference<>(null); - ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest( - namespaceId, - pageToken, pageSize, sortOrder, sortOn, filter); - boolean pageLimitReached = false; - try { - pageLimitReached = sourceControlService.scanRepoMetadata( - scanRequest, batchSize, - record -> { - jsonListResponder.send(record); - lastRecord.set(record); - }); - } catch (IOException e) { - responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage()); - } catch (NotFoundException e) { - responder.sendString(HttpResponseStatus.NOT_FOUND, e.getMessage()); - } - SourceControlMetadataRecord record = lastRecord.get(); - return !pageLimitReached || record == null ? null : - record.getName(); - }); + List apps = new ArrayList<>(); + AtomicReference lastRecord = new AtomicReference<>(null); + ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest( + namespaceId, + pageToken, pageSize, sortOrder, sortOn, filter); + boolean pageLimitReached = false; + try { + pageLimitReached = sourceControlService.scanRepoMetadata( + scanRequest, batchSize, + record -> { + apps.add(record); + lastRecord.set(record); + }); + } catch (IOException e) { + responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage()); + } + SourceControlMetadataRecord record = lastRecord.get(); + String nextPageToken = !pageLimitReached || record == null ? null : + record.getName(); + Long lastRefreshTime = sourceControlService.getLastRefreshTime(namespaceId); + ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps, + nextPageToken, lastRefreshTime); + responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response)); } /** diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java index 7e42da293add..9c5391cea6df 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java @@ -38,6 +38,7 @@ import io.cdap.cdap.common.security.HttpsEnabler; import io.cdap.cdap.features.Feature; import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataMigrationService; +import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService; import io.cdap.cdap.internal.app.store.AppMetadataStore; import io.cdap.cdap.internal.bootstrap.BootstrapService; import io.cdap.cdap.internal.credential.CredentialProviderService; @@ -97,6 +98,7 @@ public class AppFabricServer extends AbstractIdleService { private final RepositoryCleanupService repositoryCleanupService; private final OperationNotificationSubscriberService operationNotificationSubscriberService; private final SourceControlMetadataMigrationService sourceControlMetadataMigrationService; + private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService; private final CConfiguration cConf; private final SConfiguration sConf; private final boolean sslEnabled; @@ -137,7 +139,8 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf, SourceControlOperationRunner sourceControlOperationRunner, RepositoryCleanupService repositoryCleanupService, OperationNotificationSubscriberService operationNotificationSubscriberService, - SourceControlMetadataMigrationService sourceControlMetadataMigrationService) { + SourceControlMetadataMigrationService sourceControlMetadataMigrationService, + SourceControlMetadataRefreshService sourceControlMetadataRefreshService) { this.hostname = hostname; this.discoveryService = discoveryService; this.handlers = handlers; @@ -167,6 +170,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf, this.repositoryCleanupService = repositoryCleanupService; this.operationNotificationSubscriberService = operationNotificationSubscriberService; this.sourceControlMetadataMigrationService = sourceControlMetadataMigrationService; + this.sourceControlMetadataRefreshService = sourceControlMetadataRefreshService; } /** @@ -199,7 +203,8 @@ protected void startUp() throws Exception { sourceControlOperationRunner.start(), repositoryCleanupService.start(), operationNotificationSubscriberService.start(), - sourceControlMetadataMigrationService.start() + sourceControlMetadataMigrationService.start(), + sourceControlMetadataRefreshService.start() )); Futures.allAsList(futuresList).get(); @@ -262,6 +267,7 @@ protected void shutDown() throws Exception { namespaceCredentialProviderService.stopAndWait(); operationNotificationSubscriberService.stopAndWait(); sourceControlMetadataMigrationService.stopAndWait(); + sourceControlMetadataRefreshService.stopAndWait(); } private Cancellable startHttpService(NettyHttpService httpService) throws Exception { diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleService.java index a56e32f69c60..1ad1bdf33741 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleService.java @@ -58,6 +58,7 @@ import io.cdap.cdap.common.ArtifactNotFoundException; import io.cdap.cdap.common.BadRequestException; import io.cdap.cdap.common.CannotBeDeletedException; +import io.cdap.cdap.common.ForbiddenException; import io.cdap.cdap.common.InvalidArtifactException; import io.cdap.cdap.common.NotFoundException; import io.cdap.cdap.common.SourceControlMetadataNotFoundException; @@ -81,6 +82,7 @@ import io.cdap.cdap.internal.app.runtime.artifact.ArtifactDetail; import io.cdap.cdap.internal.app.runtime.artifact.ArtifactRepository; import io.cdap.cdap.internal.app.runtime.artifact.Artifacts; +import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService; import io.cdap.cdap.internal.app.store.ApplicationMeta; import io.cdap.cdap.internal.app.store.RunRecordDetail; import io.cdap.cdap.internal.app.store.state.AppStateKey; @@ -188,6 +190,7 @@ public class ApplicationLifecycleService extends AbstractIdleService { private final MetricsCollectionService metricsCollectionService; private final FeatureFlagsProvider featureFlagsProvider; private final TransactionRunner transactionRunner; + private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService; /** * Construct the ApplicationLifeCycleService with service factory and cConf coming from guice @@ -203,7 +206,8 @@ public ApplicationLifecycleService(CConfiguration cConf, AccessEnforcer accessEnforcer, AuthenticationContext authenticationContext, MessagingService messagingService, Impersonator impersonator, CapabilityReader capabilityReader, - MetricsCollectionService metricsCollectionService, TransactionRunner transactionRunner) { + MetricsCollectionService metricsCollectionService, TransactionRunner transactionRunner, + SourceControlMetadataRefreshService sourceControlMetadataRefreshService) { this.cConf = cConf; this.appUpdateSchedules = cConf.getBoolean(Constants.AppFabric.APP_UPDATE_SCHEDULES, Constants.AppFabric.DEFAULT_APP_UPDATE_SCHEDULES); @@ -221,6 +225,7 @@ public ApplicationLifecycleService(CConfiguration cConf, this.authenticationContext = authenticationContext; this.impersonator = impersonator; this.capabilityReader = capabilityReader; + this.sourceControlMetadataRefreshService = sourceControlMetadataRefreshService; this.adminEventPublisher = new AdminEventPublisher(cConf, new MultiThreadMessagingContext(messagingService)); this.metricsCollectionService = metricsCollectionService; @@ -319,6 +324,13 @@ public boolean scanApplications(ScanApplicationsRequest request, public boolean scanSourceControlMetadata(ScanSourceControlMetadataRequest request, int txBatchSize, Consumer consumer) throws IOException { + NamespaceId namespaceId = new NamespaceId(request.getNamespace()); + + // Triggering source control metadata refresh service + if (isSourceControlMetadataManualRefreshFlagEnabled()) { + sourceControlMetadataRefreshService.runRefreshService(namespaceId); + } + String lastKey = request.getScanAfter(); int currentLimit = request.getLimit(); @@ -342,9 +354,18 @@ public boolean scanSourceControlMetadata(ScanSourceControlMetadataRequest reques currentLimit -= txBatchSize; } + return true; } + public Long getLastRefreshTime(String namespace) { + return sourceControlMetadataRefreshService.getLastRefreshTime(new NamespaceId(namespace)); + } + + private boolean isSourceControlMetadataManualRefreshFlagEnabled() { + return Feature.SOURCE_CONTROL_METADATA_MANUAL_REFRESH.isEnabled(featureFlagsProvider); + } + private void processApplications(List> list, Consumer consumer) { diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/NamespaceSourceControlMetadataRefreshService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/NamespaceSourceControlMetadataRefreshService.java new file mode 100644 index 000000000000..f64d87bb7e8c --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/NamespaceSourceControlMetadataRefreshService.java @@ -0,0 +1,314 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.services; + +import com.google.common.util.concurrent.AbstractScheduledService; +import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest; +import io.cdap.cdap.common.RepositoryNotFoundException; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants.AppFabric; +import io.cdap.cdap.common.conf.Constants.SourceControlManagement; +import io.cdap.cdap.common.namespace.NamespaceAdmin; +import io.cdap.cdap.internal.app.store.NamespaceSourceControlMetadataStore; +import io.cdap.cdap.internal.app.store.RepositorySourceControlMetadataStore; +import io.cdap.cdap.proto.SourceControlMetadataRecord; +import io.cdap.cdap.proto.element.EntityType; +import io.cdap.cdap.proto.id.ApplicationReference; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig; +import io.cdap.cdap.proto.sourcecontrol.RepositoryMeta; +import io.cdap.cdap.proto.sourcecontrol.SourceControlMeta; +import io.cdap.cdap.sourcecontrol.operationrunner.NamespaceRepository; +import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryApp; +import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryAppsResponse; +import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner; +import io.cdap.cdap.spi.data.StructuredTableContext; +import io.cdap.cdap.spi.data.TableNotFoundException; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; +import io.cdap.cdap.store.RepositoryTable; +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import javax.inject.Inject; +import org.apache.twill.common.Threads; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service for refreshing namespace and repository source control metadata of a namespace at + * scheduled intervals and manually. + */ +public class NamespaceSourceControlMetadataRefreshService extends AbstractScheduledService { + + private static final Logger LOG = LoggerFactory.getLogger( + NamespaceSourceControlMetadataRefreshService.class); + private final long runInterval; + private final long bufferTime; + private final TransactionRunner transactionRunner; + private final SourceControlOperationRunner sourceControlOperationRunner; + private final NamespaceId namespaceId; + private final NamespaceAdmin namespaceAdmin; + private final AtomicLong lastRefreshTime; + private final AtomicBoolean running; + private static final Integer MAX_JITTER_SECONDS = 10; + private ScheduledExecutorService executor; + private final int batchSize; + + + /** + * Constructs a {@link NamespaceSourceControlMetadataRefreshService} instance. + */ + @Inject + public NamespaceSourceControlMetadataRefreshService(CConfiguration cConf, + TransactionRunner transactionRunner, + SourceControlOperationRunner sourceControlOperationRunner, + NamespaceId namespaceId, NamespaceAdmin namespaceAdmin) { + this.transactionRunner = transactionRunner; + this.sourceControlOperationRunner = sourceControlOperationRunner; + this.bufferTime = cConf.getLong(SourceControlManagement.METADATA_REFRESH_BUFFER_SECONDS); + this.namespaceId = namespaceId; + this.namespaceAdmin = namespaceAdmin; + this.lastRefreshTime = new AtomicLong(0); + this.running = new AtomicBoolean(false); + this.batchSize = cConf.getInt(AppFabric.STREAMING_BATCH_SIZE); + + // Adding a jitter (a random value between 0 and 10) to the runInterval because if services for all + // the namespaces ran at the same time, it may create availability issues in task workers. + long interval = + cConf.getLong(SourceControlManagement.METADATA_REFRESH_INTERVAL_SECONDS); + if (interval > 0) { + Random random = new Random(); + this.runInterval = interval + random.nextInt( + MAX_JITTER_SECONDS + 1); + } else { + this.runInterval = interval; + } + } + + private RepositorySourceControlMetadataStore getRepoSourceControlMetadataStore( + StructuredTableContext context) { + return RepositorySourceControlMetadataStore.create(context); + } + + private NamespaceSourceControlMetadataStore getNamespaceSourceControlMetadataStore( + StructuredTableContext context) { + return NamespaceSourceControlMetadataStore.create(context); + } + + @Override + protected final ScheduledExecutorService executor() { + executor = Executors.newSingleThreadScheduledExecutor( + Threads.createDaemonThreadFactory("source-control-metadata-refresh-service-for-namespace-" + + namespaceId.getNamespace())); + return executor; + } + + private RepositoryTable getRepositoryTable(StructuredTableContext context) + throws TableNotFoundException { + return new RepositoryTable(context); + } + + + // TODO(CDAP-21020): Refactor this function + private RepositoryMeta getRepositoryMeta(NamespaceId namespace) + throws RepositoryNotFoundException { + return TransactionRunners.run(transactionRunner, context -> { + RepositoryTable table = getRepositoryTable(context); + RepositoryMeta repoMeta = table.get(namespace); + if (repoMeta == null) { + throw new RepositoryNotFoundException(namespace); + } + + return repoMeta; + }, RepositoryNotFoundException.class); + } + + // TODO(CDAP-21017): Optimize periodic refresh of source control metadata + @Override + public void runOneIteration() { + if (!running.compareAndSet(false, true)) { + return; + } + runRefreshService(false); + } + + /** + * Triggers a manual refresh of source control metadata for the associated namespace. + * + * @param forced If true, forces the refresh by skipping the check whether the time elapsed since + * the last refresh is within the buffer time. + */ + public void triggerManualRefresh(boolean forced) { + if (!running.compareAndSet(false, true)) { + return; + } + executor().execute(() -> runRefreshService(forced)); + } + + private void runRefreshService(boolean forced) { + running.set(true); + Long refreshStartTime = System.currentTimeMillis(); + + try { + + if (!namespaceAdmin.exists(namespaceId)) { + this.stop(); + } + + // Check if the time elapsed since the last refresh is within the buffer time. + // This is done to prevent a high number of refresh jobs from being queued up + // unnecessarily. + if (!forced && System.currentTimeMillis() - lastRefreshTime.get() < bufferTime * 1000) { + return; + } + + // Getting repository config for a specific namespace + RepositoryConfig repoConfig = getRepositoryMeta(namespaceId).getConfig(); + + // Listing the apps from remote repo + RepositoryAppsResponse repositoryAppsResponse = sourceControlOperationRunner.list( + new NamespaceRepository(namespaceId, repoConfig)); + + // Cleaning up the repo source control metadata table + HashSet repoFileNames = repositoryAppsResponse.getApps().stream() + .map(RepositoryApp::getName) + .collect(Collectors.toCollection(HashSet::new)); + + cleanupRepoSourceControlMetaInBatches(namespaceId.getNamespace(), repoFileNames); + + // Updating the namespace and repo source control metadata table + for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) { + updateSourceControlMeta( + new ApplicationReference(namespaceId.getNamespace(), repoApp.getName()), + repoApp.getFileHash(), + refreshStartTime); + } + + lastRefreshTime.set(refreshStartTime); + + } catch (Exception e) { + LOG.error("Failed to refresh source control metadata for namespace " + + namespaceId.getNamespace(), e); + } finally { + running.set(false); + } + } + + public Long getLastRefreshTime() { + return this.lastRefreshTime.get(); + } + + @Override + protected void startUp() throws Exception { + LOG.info("Starting SourceControlMetadataRefreshService for namespace " + + namespaceId.getNamespace() + " with interval " + runInterval + " seconds"); + } + + @Override + protected Scheduler scheduler() { + return Scheduler.newFixedRateSchedule(1, runInterval, TimeUnit.SECONDS); + } + + @Override + protected void shutDown() throws Exception { + if (executor != null) { + executor.shutdownNow(); + } + } + + private void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash, + Long refreshStartTime) { + TransactionRunners.run(transactionRunner, context -> { + RepositorySourceControlMetadataStore repoMetadataStore = getRepoSourceControlMetadataStore( + context); + NamespaceSourceControlMetadataStore namespaceMetadataStore = + getNamespaceSourceControlMetadataStore(context); + SourceControlMeta namespaceSourceControlMeta = namespaceMetadataStore.get(appRef); + + if (namespaceSourceControlMeta == null) { + repoMetadataStore.write(appRef, false, 0L); + return; + } + + if (namespaceSourceControlMeta.getLastSyncedAt() != null + && namespaceSourceControlMeta.getLastSyncedAt().isAfter( + Instant.ofEpochMilli(refreshStartTime))) { + repoMetadataStore.write(appRef, + Boolean.TRUE.equals(namespaceSourceControlMeta.getSyncStatus()), + namespaceSourceControlMeta.getLastSyncedAt().toEpochMilli()); + return; + } + boolean isSynced = repoFileHash.equals(namespaceSourceControlMeta.getFileHash()); + namespaceMetadataStore.write(appRef, + SourceControlMeta.builder(namespaceSourceControlMeta).setSyncStatus(isSynced).build()); + repoMetadataStore.write(appRef, + isSynced, namespaceSourceControlMeta.getLastSyncedAt() == null ? 0L : + namespaceSourceControlMeta.getLastSyncedAt().toEpochMilli()); + }); + } + + private void cleanupRepoSourceControlMetaInBatches(String namespace, HashSet repoFiles) { + TransactionRunners.run(transactionRunner, context -> { + RepositorySourceControlMetadataStore store = getRepoSourceControlMetadataStore(context); + ScanSourceControlMetadataRequest request = ScanSourceControlMetadataRequest + .builder() + .setNamespace(namespace) + .setLimit(Integer.MAX_VALUE) + .build(); + ArrayList records = new ArrayList<>(); + String type = EntityType.APPLICATION.toString(); + + store.scan(request, type, record -> { + records.add(record); + if (records.size() == batchSize) { + try { + cleanUpRecords(store, repoFiles, records); + } catch (IOException e) { + throw new RuntimeException(e); + } + records.clear(); + } + }); + + if (!records.isEmpty()) { + cleanUpRecords(store, repoFiles, records); + } + }); + } + + + private void cleanUpRecords(RepositorySourceControlMetadataStore store, HashSet repoFiles, + List records) + throws IOException { + for (SourceControlMetadataRecord record : records) { + if (!repoFiles.contains(record.getName())) { + store.delete(new ApplicationReference(record.getNamespace(), record.getName())); + } + } + } + +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java index 11d23fbfb304..899160c44c8f 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import io.cdap.cdap.api.artifact.ArtifactSummary; +import io.cdap.cdap.api.feature.FeatureFlagsProvider; import io.cdap.cdap.api.metrics.MetricsCollectionService; import io.cdap.cdap.api.metrics.MetricsContext; import io.cdap.cdap.api.security.store.SecureStore; @@ -33,9 +34,12 @@ import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants.Metrics.SourceControlManagement; import io.cdap.cdap.common.conf.Constants.Metrics.Tag; +import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider; +import io.cdap.cdap.features.Feature; import io.cdap.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms; import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest; +import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService; import io.cdap.cdap.internal.app.store.RepositorySourceControlMetadataStore; import io.cdap.cdap.internal.operation.OperationLifecycleManager; import io.cdap.cdap.proto.ApplicationDetail; @@ -64,14 +68,11 @@ import io.cdap.cdap.sourcecontrol.RepositoryManager; import io.cdap.cdap.sourcecontrol.SourceControlConfig; import io.cdap.cdap.sourcecontrol.SourceControlException; -import io.cdap.cdap.sourcecontrol.operationrunner.NamespaceRepository; import io.cdap.cdap.sourcecontrol.operationrunner.PullAppOperationRequest; import io.cdap.cdap.sourcecontrol.operationrunner.PullAppResponse; import io.cdap.cdap.sourcecontrol.operationrunner.PushAppMeta; import io.cdap.cdap.sourcecontrol.operationrunner.PushAppOperationRequest; import io.cdap.cdap.sourcecontrol.operationrunner.PushAppsResponse; -import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryApp; -import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryAppsResponse; import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner; import io.cdap.cdap.spi.data.StructuredTableContext; import io.cdap.cdap.spi.data.TableNotFoundException; @@ -104,10 +105,13 @@ public class SourceControlManagementService { private final OperationLifecycleManager operationLifecycleManager; private final MetricsCollectionService metricsCollectionService; private final Clock clock; + private final FeatureFlagsProvider featureFlagsProvider; + private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService; private static final Logger LOG = LoggerFactory.getLogger(SourceControlManagementService.class); /** + /** * Constructor for SourceControlManagementService with all params injected via guice. */ @Inject @@ -120,11 +124,13 @@ public SourceControlManagementService(CConfiguration cConf, ApplicationLifecycleService applicationLifecycleService, Store store, OperationLifecycleManager operationLifecycleManager, - MetricsCollectionService metricsCollectionService) { + MetricsCollectionService metricsCollectionService, + SourceControlMetadataRefreshService sourceControlMetadataRefreshService) { this (cConf, secureStore, transactionRunner, accessEnforcer, authenticationContext, sourceControlOperationRunner, applicationLifecycleService, - store, operationLifecycleManager, metricsCollectionService, Clock.systemUTC()); + store, operationLifecycleManager, metricsCollectionService, Clock.systemUTC(), + sourceControlMetadataRefreshService); } @VisibleForTesting @@ -138,7 +144,8 @@ public SourceControlManagementService(CConfiguration cConf, Store store, OperationLifecycleManager operationLifecycleManager, MetricsCollectionService metricsCollectionService, - Clock clock) { + Clock clock, + SourceControlMetadataRefreshService sourceControlMetadataRefreshService) { this.cConf = cConf; this.secureStore = secureStore; this.transactionRunner = transactionRunner; @@ -150,6 +157,8 @@ public SourceControlManagementService(CConfiguration cConf, this.operationLifecycleManager = operationLifecycleManager; this.metricsCollectionService = metricsCollectionService; this.clock = clock; + this.featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf); + this.sourceControlMetadataRefreshService = sourceControlMetadataRefreshService; } private RepositoryTable getRepositoryTable(StructuredTableContext context) @@ -188,6 +197,7 @@ public RepositoryMeta setRepository(NamespaceId namespace, RepositoryConfig repo RepositoryTable repoTable = getRepositoryTable(context); repoTable.create(namespace, repository); + sourceControlMetadataRefreshService.addRefreshService(namespace); return repoTable.get(namespace); }, NamespaceNotFoundException.class); } @@ -204,7 +214,9 @@ public void deleteRepository(NamespaceId namespace) { TransactionRunners.run(transactionRunner, context -> { RepositoryTable repoTable = getRepositoryTable(context); repoTable.delete(namespace); + sourceControlMetadataRefreshService.removeRefreshService(namespace); }); + } /** @@ -392,38 +404,17 @@ private PullAppResponse pullAndValidateApplication(ApplicationReference appRe * @param txBatchSize The transaction batch size for processing metadata in each iteration. * @param consumer The consumer to process the scanned repository metadata records. * @return True if the page limit has reached, false otherwise. - * @throws NotFoundException If the requested repository metadata is not found. - * @throws AuthenticationConfigException If an authentication configuration error occurs. - * @throws IOException If an I/O error occurs during the scanning process. + * @throws IOException If an I/O error occurs during the scanning process. */ public boolean scanRepoMetadata(ScanSourceControlMetadataRequest request, int txBatchSize, - Consumer consumer) throws NotFoundException, - AuthenticationConfigException, IOException { + Consumer consumer) throws IOException { NamespaceId namespaceId = new NamespaceId(request.getNamespace()); - accessEnforcer.enforce(namespaceId, authenticationContext.getPrincipal(), - NamespacePermission.READ_REPOSITORY); - RepositoryConfig repoConfig = getRepositoryMeta(namespaceId).getConfig(); - // TODO(CDAP-20993): List API is used here for testing. It will be moved to a separate background job in the next PR - RepositoryAppsResponse repositoryAppsResponse = sourceControlOperationRunner.list( - new NamespaceRepository(namespaceId, repoConfig)); - LOG.debug("Successfully received apps in namespace {} from repository : response: {}", - namespaceId, - repositoryAppsResponse); - // Cleaning up the repo source control metadata table - HashSet repoFileNames = new HashSet<>(); - for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) { - repoFileNames.add(repoApp.getName()); - } - TransactionRunners.run(transactionRunner, context -> { - getRepoSourceControlMetadataStore(context).cleanupRepoSourceControlMeta( - namespaceId.getNamespace(), repoFileNames); - }); - // Updating the namespace and repo source control metadata table - for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) { - store.updateSourceControlMeta( - new ApplicationReference(request.getNamespace(), - repoApp.getName()), repoApp.getFileHash()); + + // Triggering source control metadata refresh service + if (isSourceControlMetadataManualRefreshFlagEnabled()) { + sourceControlMetadataRefreshService.runRefreshService(namespaceId); } + // Getting repo files String lastKey = request.getScanAfter(); int currentLimit = request.getLimit(); @@ -447,9 +438,14 @@ public boolean scanRepoMetadata(ScanSourceControlMetadataRequest request, int tx } currentLimit -= txBatchSize; } + return true; } + public Long getLastRefreshTime(String namespace) { + return sourceControlMetadataRefreshService.getLastRefreshTime(new NamespaceId(namespace)); + } + /** * The method to push multiple applications in the same namespace to the linked repository. * @@ -504,4 +500,8 @@ private MetricsContext getMetricContext(NamespaceId namespace) { return metricsCollectionService.getContext( ImmutableMap.of(Tag.NAMESPACE, namespace.getNamespace())); } + + private boolean isSourceControlMetadataManualRefreshFlagEnabled() { + return Feature.SOURCE_CONTROL_METADATA_MANUAL_REFRESH.isEnabled(featureFlagsProvider); + } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/sourcecontrol/SourceControlMetadataRefreshService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/sourcecontrol/SourceControlMetadataRefreshService.java new file mode 100644 index 000000000000..3cfb11defcbd --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/sourcecontrol/SourceControlMetadataRefreshService.java @@ -0,0 +1,172 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.sourcecontrol; + +import com.google.common.util.concurrent.AbstractIdleService; +import io.cdap.cdap.api.feature.FeatureFlagsProvider; +import io.cdap.cdap.app.store.Store; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider; +import io.cdap.cdap.common.namespace.NamespaceAdmin; +import io.cdap.cdap.features.Feature; +import io.cdap.cdap.internal.app.services.NamespaceSourceControlMetadataRefreshService; +import io.cdap.cdap.proto.NamespaceMeta; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.sourcecontrol.RepositoryMeta; +import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner; +import io.cdap.cdap.spi.data.StructuredTableContext; +import io.cdap.cdap.spi.data.TableNotFoundException; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; +import io.cdap.cdap.store.RepositoryTable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import javax.inject.Inject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class manages a mapping of namespaces with repository configurations to their respective + * NamespaceSourceControlMetadataRefreshService scheduler instances. + */ +public class SourceControlMetadataRefreshService extends AbstractIdleService { + + private ConcurrentMap + refreshSchedulers = new ConcurrentHashMap<>(); + private final TransactionRunner transactionRunner; + private final NamespaceAdmin namespaceAdmin; + private final CConfiguration cConf; + private final SourceControlOperationRunner sourceControlOperationRunner; + private static final Logger LOG = LoggerFactory.getLogger( + SourceControlMetadataRefreshService.class); + private final FeatureFlagsProvider featureFlagsProvider; + + /** + * Constructs a SourceControlMetadataRefreshService instance. + */ + @Inject + public SourceControlMetadataRefreshService(TransactionRunner transactionRunner, + NamespaceAdmin namespaceAdmin, CConfiguration cConf, + SourceControlOperationRunner sourceControlOperationRunner, + Store store) { + this.transactionRunner = transactionRunner; + this.namespaceAdmin = namespaceAdmin; + this.cConf = cConf; + this.sourceControlOperationRunner = sourceControlOperationRunner; + this.featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf); + } + + @Override + protected void startUp() throws Exception { + if (!isSourceControlMetadataPeriodicRefreshFlagEnabled()) { + return; + } + LOG.info("Starting SourceControlMetadataRefreshService"); + + namespaceAdmin.list().stream() + .map(NamespaceMeta::getNamespaceId) + .filter(namespaceId -> getRepositoryMeta(namespaceId) != null) + .forEach(this::addRefreshService); + } + + @Override + protected void shutDown() throws Exception { + LOG.info("Shutting down SourceControlMetadataRefreshService"); + refreshSchedulers.values().forEach(NamespaceSourceControlMetadataRefreshService::stopAndWait); + } + + private NamespaceSourceControlMetadataRefreshService createRefreshService(NamespaceId namespace) { + return new NamespaceSourceControlMetadataRefreshService(cConf, transactionRunner, + sourceControlOperationRunner, namespace, namespaceAdmin); + } + + /** + * Adds {@link NamespaceSourceControlMetadataRefreshService} for the specified namespace. If not + * created before, it starts the service or else, triggers a forced manual refresh + * + * @param namespaceId The ID of the namespace for which to add the refresh service. + */ + public void addRefreshService(NamespaceId namespaceId) { + refreshSchedulers.putIfAbsent(namespaceId, createRefreshService(namespaceId)); + if (isSourceControlMetadataPeriodicRefreshFlagEnabled()) { + refreshSchedulers.get(namespaceId).start(); + LOG.debug("Added Scheduled NamespaceSourceControlMetadataRefreshService for " + + namespaceId.getNamespace()); + } + } + + /** + * Stops and removes the refresh service associated with the specified namespace. + * + * @param namespaceId The ID of the namespace for which to remove the refresh service. + */ + public void removeRefreshService(NamespaceId namespaceId) { + refreshSchedulers.computeIfPresent(namespaceId, (id, service) -> { + service.stop(); + return null; + }); + } + + /** + * Retrieves the timestamp of the last refresh for the specified namespace. + * + * @param namespaceId The ID of the namespace for which to retrieve the last refresh time. + * @return The timestamp of the last refresh + */ + public Long getLastRefreshTime(NamespaceId namespaceId) { + NamespaceSourceControlMetadataRefreshService service = refreshSchedulers.get(namespaceId); + if (service != null) { + return service.getLastRefreshTime(); + } + return null; + } + + /** + * Initiates a manual refresh for the specified namespace. If a refresh service exists for the + * namespace, triggers an unforced manual refresh. If no refresh service exists, adds a new + * refresh service and starts it. This is triggered when the user calls the list API for namespace + * and repository source control metadata. + * + * @param namespaceId The ID of the namespace for which to run the refresh service. + */ + public void runRefreshService(NamespaceId namespaceId) { + NamespaceSourceControlMetadataRefreshService service = refreshSchedulers.get(namespaceId); + if (service != null) { + refreshSchedulers.get(namespaceId).triggerManualRefresh(false); + } else { + addRefreshService(namespaceId); + } + } + + private RepositoryMeta getRepositoryMeta(NamespaceId namespace) { + return TransactionRunners.run(transactionRunner, context -> { + RepositoryTable table = getRepositoryTable(context); + RepositoryMeta repoMeta = table.get(namespace); + return repoMeta; + }); + } + + private RepositoryTable getRepositoryTable(StructuredTableContext context) + throws TableNotFoundException { + return new RepositoryTable(context); + } + + private boolean isSourceControlMetadataPeriodicRefreshFlagEnabled() { + return Feature.SOURCE_CONTROL_METADATA_PERIODIC_REFRESH.isEnabled(featureFlagsProvider); + } + +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java index 473de3a2a1cc..e1c1d86d023f 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java @@ -44,6 +44,7 @@ import io.cdap.cdap.common.NotFoundException; import io.cdap.cdap.common.ProgramNotFoundException; import io.cdap.cdap.common.lang.FunctionWithException; +import io.cdap.cdap.common.utils.ImmutablePair; import io.cdap.cdap.data2.dataset2.DatasetFramework; import io.cdap.cdap.internal.app.ForwardingApplicationSpecification; import io.cdap.cdap.internal.app.store.state.AppStateKey; @@ -76,6 +77,7 @@ import io.cdap.cdap.spi.data.transaction.TransactionRunners; import io.cdap.cdap.store.StoreDefinition; import java.io.IOException; +import java.time.Instant; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -617,9 +619,22 @@ public void markApplicationsLatest(Collection appIds) @Override public int addLatestApplication(ApplicationId id, ApplicationMeta meta) throws ConflictException { return TransactionRunners.run(transactionRunner, context -> { + ApplicationReference appRef = id.getAppReference(); + RepositorySourceControlMetadataStore repoStore = getRepoSourceControlMetadataStore(context); getNamespaceSourceControlMetadataStore(context).write(id.getAppReference(), meta.getSourceControlMeta() == null ? SourceControlMeta.createDefaultMeta() : SourceControlMeta.builder(meta.getSourceControlMeta()).setSyncStatus(true).build()); + + if (meta.getSourceControlMeta() != null) { + Instant lastSyncedAt = meta.getSourceControlMeta().getLastSyncedAt(); + repoStore.write(appRef, true, lastSyncedAt == null ? null : + lastSyncedAt.toEpochMilli()); + } + else{ + if(repoStore.get(appRef) != null) { + repoStore.writeSyncStatus(appRef, false); + } + } return getAppMetadataStore(context).createLatestApplicationVersion(id, meta); }, ConflictException.class); } @@ -628,9 +643,25 @@ public int addLatestApplication(ApplicationId id, ApplicationMeta meta) throws C public int addApplication(ApplicationId id, ApplicationMeta meta, boolean isLatest) throws ConflictException { return TransactionRunners.run(transactionRunner, context -> { - getNamespaceSourceControlMetadataStore(context).write(id.getAppReference(), - meta.getSourceControlMeta() == null ? SourceControlMeta.createDefaultMeta() - : SourceControlMeta.builder(meta.getSourceControlMeta()).setSyncStatus(true).build()); + ApplicationReference appRef = id.getAppReference(); + if (isLatest) { + RepositorySourceControlMetadataStore repoStore = getRepoSourceControlMetadataStore(context); + getNamespaceSourceControlMetadataStore(context).write(appRef, + meta.getSourceControlMeta() == null ? SourceControlMeta.createDefaultMeta() + : SourceControlMeta.builder(meta.getSourceControlMeta()).setSyncStatus(true) + .build()); + + if (meta.getSourceControlMeta() != null) { + Instant lastSyncedAt = meta.getSourceControlMeta().getLastSyncedAt(); + repoStore.write(appRef, true, lastSyncedAt == null ? null : + lastSyncedAt.toEpochMilli()); + } + else{ + if(repoStore.get(appRef) != null) { + repoStore.writeSyncStatus(appRef, false); + } + } + } return getAppMetadataStore(context).createApplicationVersion(id, meta, isLatest); }, ConflictException.class); } @@ -640,14 +671,19 @@ public void updateApplicationSourceControlMeta( Map updateRequests) throws IOException { TransactionRunners.run(transactionRunner, context -> { - NamespaceSourceControlMetadataStore sourceControlMetadataStore = getNamespaceSourceControlMetadataStore( - context); + NamespaceSourceControlMetadataStore sourceControlMetadataStore = + getNamespaceSourceControlMetadataStore(context); + RepositorySourceControlMetadataStore repoStore = getRepoSourceControlMetadataStore(context); AppMetadataStore appMetadataStore = getAppMetadataStore(context); for (Map.Entry updateRequest : updateRequests.entrySet()) { ApplicationId appId = updateRequest.getKey(); if (appMetadataStore.getApplication(appId) != null) { sourceControlMetadataStore.write(appId.getAppReference(), SourceControlMeta.builder(updateRequest.getValue()).setSyncStatus(true).build()); + repoStore.write(appId.getAppReference(), true, + updateRequest.getValue().getLastSyncedAt() == null ? 0L : + updateRequest.getValue().getLastSyncedAt().toEpochMilli()); + } } }, IOException.class); @@ -954,10 +990,15 @@ public Map getApplications(Collection { getNamespaceSourceControlMetadataStore(context).write(appRef, SourceControlMeta.builder(sourceControlMeta).setSyncStatus(true).build()); + RepositorySourceControlMetadataStore repoStore = getRepoSourceControlMetadataStore(context); + repoStore.write(appRef, true, sourceControlMeta.getLastSyncedAt() == null ? null : + sourceControlMeta.getLastSyncedAt().toEpochMilli()); + }); } @@ -992,33 +1033,6 @@ public ApplicationMeta getLatest(ApplicationReference appRef) { }); } - @Override - public void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash) { - TransactionRunners.run(transactionRunner, context -> { - RepositorySourceControlMetadataStore repoMetadataStore = getRepoSourceControlMetadataStore( - context); - NamespaceSourceControlMetadataStore namespaceMetadataStore = getNamespaceSourceControlMetadataStore( - context); - SourceControlMeta namespaceSourceControlMeta = namespaceMetadataStore.get(appRef); - - if (namespaceSourceControlMeta == null || namespaceSourceControlMeta.getFileHash() == null) { - repoMetadataStore.write(appRef, false, 0L); - return; - } - Boolean isSynced = namespaceSourceControlMeta.getFileHash().equals(repoFileHash); - if (isSynced) { - namespaceMetadataStore.write(appRef, - new SourceControlMeta(namespaceSourceControlMeta.getFileHash(), - namespaceSourceControlMeta.getCommitId(), - namespaceSourceControlMeta.getLastSyncedAt(), isSynced)); - repoMetadataStore.write(appRef, - isSynced, namespaceSourceControlMeta.getLastSyncedAt().toEpochMilli()); - } else { - repoMetadataStore.write(appRef, isSynced, 0L); - } - }); - } - @Override public int scanRepositorySourceControlMetadata(ScanSourceControlMetadataRequest request, Consumer consumer) { diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/NamespaceSourceControlMetadataStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/NamespaceSourceControlMetadataStore.java index 285f8e6239a2..2939f279fce4 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/NamespaceSourceControlMetadataStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/NamespaceSourceControlMetadataStore.java @@ -371,4 +371,23 @@ void deleteNamespaceSourceControlMetadataTable() throws IOException { Range.Bound.INCLUSIVE)); } + /** + * Retrieves all source control metadata records for the specified namespace and type. + * + * @param namespace The namespace to retrieve records from. + * @param type The type of records to retrieve. + * @return A list of metadata records. + * @throws IOException If an I/O error occurs. + */ + @VisibleForTesting + public List getAll(String namespace, String type) + throws IOException { + ScanSourceControlMetadataRequest request = ScanSourceControlMetadataRequest.builder() + .setNamespace(namespace).build(); + List records = new ArrayList<>(); + scan(request, type, records::add); + return records; + } + + } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/RepositorySourceControlMetadataStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/RepositorySourceControlMetadataStore.java index e140de59e2b5..492dab2bfe2a 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/RepositorySourceControlMetadataStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/RepositorySourceControlMetadataStore.java @@ -37,7 +37,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.function.Consumer; @@ -151,12 +150,24 @@ public int scan(ScanSourceControlMetadataRequest request, * @param lastSynced The timestamp of the last git pull/push of the entity * @throws IOException If an I/O error occurs while writing the source control metadata. */ - public void write(ApplicationReference appRef, Boolean isSynced, + public void write(ApplicationReference appRef, boolean isSynced, Long lastSynced) throws IOException { StructuredTable repoTable = getRepositorySourceControlMetadataTable(); repoTable.upsert(getAllFields(appRef, isSynced, lastSynced)); } + /** + * Writes the sync status for the specified application reference. + * + * @param appRef The application reference for which synchronization status is to be written. + * @param isSynced A boolean indicating whether the application is synced or not. + * @throws IOException If an I/O error occurs while writing the synchronization status. + */ + public void writeSyncStatus(ApplicationReference appRef, boolean isSynced) throws IOException { + SourceControlMetadataRecord record = get(appRef); + write(appRef, isSynced, record.getLastModified()); + } + /** * Deletes the source control metadata associated with the specified repository metadata ID. * @@ -169,32 +180,6 @@ public void delete(ApplicationReference appRef) throws IOException { getPrimaryKey(appRef)); } - /** - * Cleans up repository source control metadata. This method removes metadata records for - * repository files that are no longer present in the repository. - * - * @param namespace The namespace for which to clean up the repository source control metadata. - * @param repoFiles The set of repository files to retain metadata records for. - * @throws IOException If an I/O error occurs during the cleanup process. - */ - public void cleanupRepoSourceControlMeta(String namespace, HashSet repoFiles) - throws IOException { - ScanSourceControlMetadataRequest request = ScanSourceControlMetadataRequest - .builder() - .setNamespace(namespace) - .setLimit(Integer.MAX_VALUE) - .build(); - ArrayList records = new ArrayList<>(); - String type = EntityType.APPLICATION.toString(); - scan(request, type, records::add); - for (SourceControlMetadataRecord record : records) { - if (!repoFiles.contains(record.getName())) { - delete( - new ApplicationReference(record.getNamespace(), record.getName())); - } - } - } - private CloseableIterator getScanIterator( StructuredTable table, Range range, @@ -220,8 +205,8 @@ private SourceControlMetadataRecord decodeRow(StructuredRow row) { } Boolean isSynced = row.getBoolean( StoreDefinition.RepositorySourceControlMetadataStore.IS_SYNCED_FIELD); - return new SourceControlMetadataRecord(namespace, type, name, null, null, lastModified, - isSynced); + return new SourceControlMetadataRecord(namespace, type, name, null, null, + lastModified, isSynced); } private Collection> getAllFields(ApplicationReference appRef, @@ -229,7 +214,7 @@ private Collection> getAllFields(ApplicationReference appRef, List> fields = getPrimaryKey(appRef); fields.add( Fields.longField(StoreDefinition.RepositorySourceControlMetadataStore.LAST_MODIFIED_FIELD, - lastSynced)); + lastSynced == null ? 0L : lastSynced)); fields.add( Fields.booleanField(StoreDefinition.RepositorySourceControlMetadataStore.IS_SYNCED_FIELD, isSynced)); @@ -246,11 +231,11 @@ private List> getRangeFields(ScanSourceControlMetadataRequest request, fields.add( Fields.stringField(StoreDefinition.RepositorySourceControlMetadataStore.TYPE_FIELD, type)); if (request.getSortOn() == SortBy.LAST_SYNCED_AT) { - ImmutablePair lastModifiedAndStatusPair = get( + SourceControlMetadataRecord record = get( new ApplicationReference(request.getNamespace(), request.getScanAfter())); fields.add(Fields.longField( StoreDefinition.RepositorySourceControlMetadataStore.LAST_MODIFIED_FIELD, - lastModifiedAndStatusPair.getFirst())); + record.getLastModified())); } fields.add(Fields.stringField(StoreDefinition.RepositorySourceControlMetadataStore.NAME_FIELD, request.getScanAfter())); @@ -271,17 +256,18 @@ private List> getPrimaryKey(ApplicationReference appRef) { return primaryKey; } - @VisibleForTesting - ImmutablePair get(ApplicationReference appRef) throws IOException { + /** + * Retrieves sync status and last modified for the specified {@link ApplicationReference}. + * + * @param appRef The application reference to retrieve metadata for. + * @return An {@link ImmutablePair} containing the lastSynced timestamp and sync status, + * or null if metadata is not found. + * @throws IOException If an I/O error occurs. + */ + public SourceControlMetadataRecord get(ApplicationReference appRef) throws IOException { List> primaryKey = getPrimaryKey(appRef); StructuredTable table = getRepositorySourceControlMetadataTable(); - return table.read(primaryKey).map(row -> { - Long lastSynced = row.getLong( - StoreDefinition.RepositorySourceControlMetadataStore.LAST_MODIFIED_FIELD); - Boolean syncStatus = row.getBoolean( - StoreDefinition.RepositorySourceControlMetadataStore.IS_SYNCED_FIELD); - return new ImmutablePair(lastSynced, syncStatus); - }).orElse(null); + return table.read(primaryKey).map(this::decodeRow).orElse(null); } /** @@ -297,4 +283,22 @@ public void deleteAll(String namespace) throws IOException { ImmutableList.of( Fields.stringField(StoreDefinition.AppMetadataStore.NAMESPACE_FIELD, namespace)))); } + + /** + * Retrieves all source control metadata records for the specified namespace and type. + * + * @param namespace The namespace to retrieve records from. + * @param type The type of records to retrieve. + * @return A list of metadata records. + * @throws IOException If an I/O error occurs. + */ + @VisibleForTesting + public List getAll(String namespace, String type) + throws IOException { + ScanSourceControlMetadataRequest request = ScanSourceControlMetadataRequest.builder() + .setNamespace(namespace).build(); + List records = new ArrayList<>(); + scan(request, type, records::add); + return records; + } } diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/NamespaceSourceControlMetadataRefreshServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/NamespaceSourceControlMetadataRefreshServiceTest.java new file mode 100644 index 000000000000..f34b35b110d4 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/NamespaceSourceControlMetadataRefreshServiceTest.java @@ -0,0 +1,314 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.services; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +import com.google.common.util.concurrent.AbstractIdleService; +import com.google.inject.Injector; +import io.cdap.cdap.common.NotFoundException; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants.SourceControlManagement; +import io.cdap.cdap.common.id.Id; +import io.cdap.cdap.common.id.Id.Namespace; +import io.cdap.cdap.common.namespace.NamespaceAdmin; +import io.cdap.cdap.internal.AppFabricTestHelper; +import io.cdap.cdap.internal.app.store.NamespaceSourceControlMetadataStore; +import io.cdap.cdap.internal.app.store.RepositorySourceControlMetadataStore; +import io.cdap.cdap.proto.SourceControlMetadataRecord; +import io.cdap.cdap.proto.element.EntityType; +import io.cdap.cdap.proto.id.ApplicationReference; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.sourcecontrol.AuthConfig; +import io.cdap.cdap.proto.sourcecontrol.AuthType; +import io.cdap.cdap.proto.sourcecontrol.PatConfig; +import io.cdap.cdap.proto.sourcecontrol.Provider; +import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig; +import io.cdap.cdap.proto.sourcecontrol.SourceControlMeta; +import io.cdap.cdap.sourcecontrol.ApplicationManager; +import io.cdap.cdap.sourcecontrol.AuthenticationConfigException; +import io.cdap.cdap.sourcecontrol.NoChangesToPushException; +import io.cdap.cdap.sourcecontrol.operationrunner.MultiPullAppOperationRequest; +import io.cdap.cdap.sourcecontrol.operationrunner.MultiPushAppOperationRequest; +import io.cdap.cdap.sourcecontrol.operationrunner.NamespaceRepository; +import io.cdap.cdap.sourcecontrol.operationrunner.PullAppOperationRequest; +import io.cdap.cdap.sourcecontrol.operationrunner.PullAppResponse; +import io.cdap.cdap.sourcecontrol.operationrunner.PushAppOperationRequest; +import io.cdap.cdap.sourcecontrol.operationrunner.PushAppsResponse; +import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryApp; +import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryAppsResponse; +import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; +import io.cdap.cdap.store.RepositoryTable; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; + +public class NamespaceSourceControlMetadataRefreshServiceTest { + + private static TransactionRunner transactionRunner; + private static CConfiguration cConf; + private static NamespaceAdmin namespaceAdmin; + private static NamespaceId NAMESPACE_ID = new NamespaceId(Id.Namespace.DEFAULT.getId()); + private static RepositoryConfig REPO_CONFIG = new RepositoryConfig.Builder().setProvider( + Provider.GITHUB) + .setLink("testUrl") + .setAuth(new AuthConfig(AuthType.PAT, new PatConfig("password", "user"))) + .build(); + private static NamespaceSourceControlMetadataRefreshService refreshService; + private static final String TYPE = EntityType.APPLICATION.toString(); + + @BeforeClass + public static void beforeClass() throws Exception { + Injector injector = AppFabricTestHelper.getInjector(); + AppFabricTestHelper.ensureNamespaceExists(NAMESPACE_ID.DEFAULT); + transactionRunner = injector.getInstance(TransactionRunner.class); + cConf = injector.getInstance(CConfiguration.class); + namespaceAdmin = injector.getInstance(NamespaceAdmin.class); + cConf.set(SourceControlManagement.METADATA_REFRESH_INTERVAL_SECONDS, "300"); + cConf.set(SourceControlManagement.METADATA_REFRESH_BUFFER_SECONDS, "120"); + refreshService = new NamespaceSourceControlMetadataRefreshService( + cConf, transactionRunner, sourceControlOperationRunnerSpy, NAMESPACE_ID, + namespaceAdmin); + } + + private static final Instant fixedInstant = Instant.ofEpochSecond(1646358109); + private static final Instant instantGreaterThanStartRefresh = Instant.ofEpochMilli( + System.currentTimeMillis() + 100000000); + private static final SourceControlOperationRunner sourceControlOperationRunnerSpy = + spy(new MockSourceControlOperationRunner()); + + @Test + public void testRefresh() throws Exception { + RepositoryAppsResponse expectedListResult = new RepositoryAppsResponse( + Arrays.asList( + new RepositoryApp("app1", "hash1"), + new RepositoryApp("app3", "hash3"), + new RepositoryApp("app4", "hash4"), + new RepositoryApp("app5", "hash5"), + new RepositoryApp("app6", "hash6"), + new RepositoryApp("app7", "hashDiff") + ) + ); + + setRepository(); + + doReturn(expectedListResult) + .when(sourceControlOperationRunnerSpy).list(Mockito.any(NamespaceRepository.class)); + + insertRepoSourceControlMetadataTests(); + insertNamespaceSourceControlTests(); + + List expectedRecords = new ArrayList<>(); + List gotRecords = new ArrayList<>(); + + refreshService.runOneIteration(); + + // Checking the all apps stored in repo source control table + gotRecords = getRepoRecords(NAMESPACE_ID.getNamespace()); + expectedRecords = getUpdatedRepoRecords(); + + Assert.assertArrayEquals(expectedRecords.toArray(), gotRecords.toArray()); + + // Checking the all apps stored in namespace source control table + gotRecords = getNamespaceRecords(NAMESPACE_ID.getNamespace()); + expectedRecords = getUpdatedNamespaceRecords(); + + Assert.assertArrayEquals(expectedRecords.toArray(), gotRecords.toArray()); + + deleteAll(); + + } + + private List getUpdatedRepoRecords() { + SourceControlMetadataRecord record1 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app1", null, null, fixedInstant.toEpochMilli(), true); + SourceControlMetadataRecord record2 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app3", null, null, null, false); + SourceControlMetadataRecord record3 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app4", null, null, instantGreaterThanStartRefresh.toEpochMilli(), true); + SourceControlMetadataRecord record4 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app5", null, null, instantGreaterThanStartRefresh.toEpochMilli(), false); + SourceControlMetadataRecord record5 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app6", null, null, fixedInstant.toEpochMilli(), true); + SourceControlMetadataRecord record6 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app7", null, null, fixedInstant.toEpochMilli(), false); + List insertedRecords = Arrays.asList(record1, record2, record3, + record4, + record5, record6); + return insertedRecords; + } + + private List getUpdatedNamespaceRecords() { + SourceControlMetadataRecord record1 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app1", "hash1", "commit1", + fixedInstant.toEpochMilli(), true); + SourceControlMetadataRecord record2 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app4", "hash4", "commit1", instantGreaterThanStartRefresh.toEpochMilli(), true); + SourceControlMetadataRecord record3 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app5", "hash5", "commit1", instantGreaterThanStartRefresh.toEpochMilli(), false); + SourceControlMetadataRecord record4 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app6", "hash6", "commit1", + fixedInstant.toEpochMilli(), true); + SourceControlMetadataRecord record5 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app7", "hash7", "commit1", + fixedInstant.toEpochMilli(), false); + List insertedRecords = Arrays.asList(record1, record2, record3, + record4, + record5); + return insertedRecords; + } + + private void insertRepoSourceControlMetadataTests() { + insertRepoRecord(Namespace.DEFAULT.getId(), "app1", TYPE, null, null, 0L, false); + insertRepoRecord(Namespace.DEFAULT.getId(), "app2", TYPE, null, null, + Instant.now().toEpochMilli(), false); + } + + private void insertRepoRecord(String namespace, String name, String type, + String specHash, String commitId, Long lastModified, Boolean isSycned) { + TransactionRunners.run(transactionRunner, context -> { + RepositorySourceControlMetadataStore store = RepositorySourceControlMetadataStore.create( + context); + store.write(new ApplicationReference(namespace, name), isSycned, lastModified); + }); + } + + private void insertNamespaceSourceControlTests() { + insertNamespaceRecord(Namespace.DEFAULT.getId(), "app1", TYPE, "hash1", "commit1", + fixedInstant.toEpochMilli(), false); + insertNamespaceRecord(Namespace.DEFAULT.getId(), "app4", TYPE, "hash4", "commit1", + instantGreaterThanStartRefresh.toEpochMilli(), true); + insertNamespaceRecord(Namespace.DEFAULT.getId(), "app5", TYPE, "hash5", "commit1", + instantGreaterThanStartRefresh.toEpochMilli(), false); + + insertNamespaceRecord(Namespace.DEFAULT.getId(), "app6", TYPE, "hash6", "commit1", + fixedInstant.toEpochMilli(), false); + insertNamespaceRecord(Namespace.DEFAULT.getId(), "app7", TYPE, "hash7", "commit1", + fixedInstant.toEpochMilli(), false); + } + + private void insertNamespaceRecord(String namespace, String name, String type, + String specHash, String commitId, Long lastModified, Boolean isSycned) { + TransactionRunners.run(transactionRunner, context -> { + NamespaceSourceControlMetadataStore store = NamespaceSourceControlMetadataStore.create( + context); + store.write(new ApplicationReference(namespace, name), + new SourceControlMeta(specHash, commitId, Instant.ofEpochMilli(lastModified), isSycned)); + }); + } + + private List getRepoRecords(String namespace) { + AtomicReference> records = new AtomicReference<>( + new ArrayList<>()); + TransactionRunners.run(transactionRunner, context -> { + RepositorySourceControlMetadataStore store = RepositorySourceControlMetadataStore.create( + context); + records.set(store.getAll(namespace, TYPE)); + }); + return records.get(); + } + + private List getNamespaceRecords(String namespace) { + AtomicReference> records = new AtomicReference<>( + new ArrayList<>()); + TransactionRunners.run(transactionRunner, context -> { + NamespaceSourceControlMetadataStore store = NamespaceSourceControlMetadataStore.create( + context); + records.set(store.getAll(namespace, TYPE)); + }); + return records.get(); + } + + private void setRepository() { + TransactionRunners.run(transactionRunner, context -> { + RepositoryTable store = new RepositoryTable(context); + store.create(NAMESPACE_ID, REPO_CONFIG); + }); + } + + private void deleteAll() { + TransactionRunners.run(transactionRunner, context -> { + RepositoryTable repoTableStore = new RepositoryTable(context); + repoTableStore.delete(NAMESPACE_ID); + NamespaceSourceControlMetadataStore namespaceStore = NamespaceSourceControlMetadataStore.create( + context); + namespaceStore.deleteAll(NAMESPACE_ID.getNamespace()); + RepositorySourceControlMetadataStore repoStore = RepositorySourceControlMetadataStore.create( + context); + repoStore.deleteAll(NAMESPACE_ID.getNamespace()); + }); + } + + /** + * A Mock {@link SourceControlOperationRunner} that can be used for tests. + */ + private static class MockSourceControlOperationRunner extends + AbstractIdleService implements + SourceControlOperationRunner { + + @Override + public PushAppsResponse push(PushAppOperationRequest pushAppOperationRequest) + throws NoChangesToPushException, AuthenticationConfigException { + return null; + } + + @Override + public PushAppsResponse multiPush(MultiPushAppOperationRequest pushRequest, + ApplicationManager appManager) + throws NoChangesToPushException, AuthenticationConfigException { + return null; + } + + @Override + public PullAppResponse pull(PullAppOperationRequest pullRequest) + throws NotFoundException, AuthenticationConfigException { + return null; + } + + @Override + public void multiPull(MultiPullAppOperationRequest pullRequest, + Consumer> consumer) + throws NotFoundException, AuthenticationConfigException { + } + + @Override + public RepositoryAppsResponse list(NamespaceRepository nameSpaceRepository) + throws AuthenticationConfigException, NotFoundException { + return null; + } + + @Override + protected void startUp() throws Exception { + // no-op. + } + + @Override + protected void shutDown() throws Exception { + // no-op. + } + } +} diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/SourceControlManagementServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/SourceControlManagementServiceTest.java index d9fbe4ae9050..34360780ce9d 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/SourceControlManagementServiceTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/SourceControlManagementServiceTest.java @@ -38,8 +38,9 @@ import io.cdap.cdap.common.id.Id; import io.cdap.cdap.common.id.Id.Namespace; import io.cdap.cdap.common.namespace.NamespaceAdmin; +import io.cdap.cdap.features.Feature; import io.cdap.cdap.internal.app.services.http.AppFabricTestBase; -import io.cdap.cdap.internal.app.store.NamespaceSourceControlMetadataStore; +import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService; import io.cdap.cdap.internal.app.store.RepositorySourceControlMetadataStore; import io.cdap.cdap.internal.operation.OperationLifecycleManager; import io.cdap.cdap.metadata.MetadataSubscriberService; @@ -57,7 +58,6 @@ import io.cdap.cdap.proto.sourcecontrol.Provider; import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig; import io.cdap.cdap.proto.sourcecontrol.RepositoryMeta; -import io.cdap.cdap.proto.sourcecontrol.SortBy; import io.cdap.cdap.proto.sourcecontrol.SourceControlMeta; import io.cdap.cdap.security.impersonation.CurrentUGIProvider; import io.cdap.cdap.security.impersonation.UGIProvider; @@ -79,16 +79,15 @@ import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryApp; import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryAppsResponse; import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner; -import io.cdap.cdap.spi.data.SortOrder; import io.cdap.cdap.spi.data.transaction.TransactionRunner; import io.cdap.cdap.spi.data.transaction.TransactionRunners; import java.time.Clock; import java.time.Instant; import java.time.ZoneId; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; +import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; import org.junit.Assert; @@ -148,13 +147,15 @@ public SourceControlManagementService provideSourceControlManagementService( SourceControlOperationRunner sourceControlRunner, ApplicationLifecycleService applicationLifecycleService, Store store, MetricsCollectionService metricsCollectionService, - OperationLifecycleManager manager) { + OperationLifecycleManager manager, + SourceControlMetadataRefreshService sourceControlMetadataRefreshService) { return new SourceControlManagementService(cConf, secureStore, transactionRunner, accessEnforcer, authenticationContext, sourceControlRunner, applicationLifecycleService, store, manager, metricsCollectionService, - Clock.fixed(fixedInstant, ZoneId.systemDefault())); + Clock.fixed(fixedInstant, ZoneId.systemDefault()), + sourceControlMetadataRefreshService); } }); } @@ -523,19 +524,19 @@ public void testPullAndDeployNoChangesToPullException() throws Exception { sourceControlService.deleteRepository(namespaceId); } - @Test(expected = SourceControlException.class) - public void testScanRepoMetadataFailed() throws Exception { - NamespaceId namespaceId = new NamespaceId(Id.Namespace.DEFAULT.getId()); - sourceControlService.setRepository(namespaceId, REPOSITORY_CONFIG); - - Mockito.doThrow(SourceControlException.class) - .when(sourceControlOperationRunnerSpy).list(Mockito.any(NamespaceRepository.class)); - + @Test + public void testScanRepoMetadataWithFalseFlag() throws Exception { + setAutoRefreshFeatureFlag(false); List gotRecords = new ArrayList<>(); ScanSourceControlMetadataRequest request = ScanSourceControlMetadataRequest.builder() .setNamespace(Namespace.DEFAULT.getId()).build(); sourceControlService.scanRepoMetadata(request, batchSize, gotRecords::add); - sourceControlService.deleteRepository(namespaceId); + } + + private static void setAutoRefreshFeatureFlag(boolean flag) { + String featureFlagPrefix = "feature."; + cConf.setBoolean(featureFlagPrefix + + Feature.SOURCE_CONTROL_METADATA_MANUAL_REFRESH.getFeatureFlagString(), flag); } @Test @@ -545,21 +546,19 @@ public void testOperationRunnerStarted() { @Test public void testScanRepoMetadata() throws Exception { + RepositoryApp app1 = new RepositoryApp("app1", "hash1"); - RepositoryApp app3 = new RepositoryApp("app3", "hash3"); - RepositoryApp app4 = new RepositoryApp("app4", "hash4"); RepositoryAppsResponse expectedListResult = new RepositoryAppsResponse( - Arrays.asList(app1, app3, app4)); + Collections.singletonList(app1)); NamespaceId namespaceId = new NamespaceId(Id.Namespace.DEFAULT.getId()); + //deleteAllRepoSourceControlRecords(namespaceId.getNamespace()); sourceControlService.setRepository(namespaceId, REPOSITORY_CONFIG); Mockito.doReturn(expectedListResult) .when(sourceControlOperationRunnerSpy).list(Mockito.any(NamespaceRepository.class)); - insertRepoSourceControlMetadataTests(); - insertNamespaceSourceControlTests(); List gotRecords = new ArrayList<>(); - List insertedRecords = getInsertedRecords(); + List insertedRecords = insertRepoSourceControlMetadataTests(); List expectedRecords = new ArrayList<>(); // verify the scan without filters picks all apps for default namespace @@ -577,16 +576,7 @@ public void testScanRepoMetadata() throws Exception { expectedRecords = insertedRecords.stream().limit(2).collect(Collectors.toList()); Assert.assertArrayEquals(expectedRecords.toArray(), gotRecords.toArray()); - // verify pageToken with limit - gotRecords.clear(); - request = ScanSourceControlMetadataRequest.builder().setNamespace(Namespace.DEFAULT.getId()) - .setLimit(5).setScanAfter("app3").build(); - sourceControlService.scanRepoMetadata(request, batchSize, gotRecords::add); - expectedRecords = insertedRecords.stream().filter(rec -> rec.getName().equals("app4")).collect( - Collectors.toList()); - Assert.assertArrayEquals(expectedRecords.toArray(), gotRecords.toArray()); - - // verify pageToken with limit + // verify pageToken with name filter gotRecords.clear(); request = ScanSourceControlMetadataRequest.builder().setNamespace(Namespace.DEFAULT.getId()) .setFilter(new SourceControlMetadataFilter("1", null)).build(); @@ -595,57 +585,31 @@ public void testScanRepoMetadata() throws Exception { Collectors.toList()); Assert.assertArrayEquals(expectedRecords.toArray(), gotRecords.toArray()); - // verify sorting by desc order on last modified - gotRecords.clear(); - request = ScanSourceControlMetadataRequest.builder().setNamespace(Namespace.DEFAULT.getId()) - .setSortOrder( - SortOrder.DESC).setSortOn(SortBy.LAST_SYNCED_AT).build(); - sourceControlService.scanRepoMetadata(request, batchSize, gotRecords::add); - expectedRecords = insertedRecords.stream() - .sorted(Comparator.nullsFirst( - Comparator.comparing( - (SourceControlMetadataRecord record) -> record.getLastModified(), - Comparator.nullsFirst(Comparator.naturalOrder()) - ).reversed())) - .collect(Collectors.toList()); - Assert.assertArrayEquals(expectedRecords.toArray(), gotRecords.toArray()); - - deleteAllNamespaceSourceControlRecords(Namespace.DEFAULT.getId()); deleteAllRepoSourceControlRecords(Namespace.DEFAULT.getId()); sourceControlService.deleteRepository(new NamespaceId(Namespace.DEFAULT.getId())); } - private void deleteAllNamespaceSourceControlRecords(String namespace) { + private void deleteAllRepoSourceControlRecords(String namespace) { TransactionRunners.run(transactionRunner, context -> { - NamespaceSourceControlMetadataStore store = NamespaceSourceControlMetadataStore.create( + RepositorySourceControlMetadataStore store = RepositorySourceControlMetadataStore.create( context); store.deleteAll(namespace); }); } - private void deleteAllRepoSourceControlRecords(String namespace) { + private List insertRepoSourceControlMetadataTests() { + AtomicReference> records = new AtomicReference<>( + new ArrayList<>()); + for (int i = 0; i < 10; i++) { + insertRepoRecord(Namespace.DEFAULT.getId(), "app" + i, TYPE, null, null, + 0L, i % 3 == 0 ? true : false); + } TransactionRunners.run(transactionRunner, context -> { RepositorySourceControlMetadataStore store = RepositorySourceControlMetadataStore.create( context); - store.deleteAll(namespace); + records.set(store.getAll(Namespace.DEFAULT.getId(), TYPE)); }); - } - - private List getInsertedRecords() { - SourceControlMetadataRecord record1 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), - TYPE, "app1", null, null, fixedInstant.toEpochMilli(), true); - SourceControlMetadataRecord record2 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), - TYPE, "app3", null, null, null, false); - SourceControlMetadataRecord record3 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), - TYPE, "app4", null, null, fixedInstant2.toEpochMilli(), true); - List insertedRecords = Arrays.asList(record1, record2, record3); - return insertedRecords; - } - - private void insertRepoSourceControlMetadataTests() { - insertRepoRecord(Namespace.DEFAULT.getId(), "app1", TYPE, null, null, 0L, false); - insertRepoRecord(Namespace.DEFAULT.getId(), "app2", TYPE, null, null, - Instant.now().toEpochMilli(), false); + return records.get(); } private void insertRepoRecord(String namespace, String name, String type, @@ -657,24 +621,6 @@ private void insertRepoRecord(String namespace, String name, String type, }); } - private void insertNamespaceSourceControlTests() { - insertNamespaceRecord(Namespace.DEFAULT.getId(), "app1", TYPE, "hash1", "commit1", - fixedInstant.toEpochMilli(), false); - insertNamespaceRecord(Namespace.DEFAULT.getId(), "app3", TYPE, "hash4", "commit1", - fixedInstant.toEpochMilli(), false); - insertNamespaceRecord(Namespace.DEFAULT.getId(), "app4", TYPE, "hash4", "commit1", - fixedInstant2.toEpochMilli(), true); - } - - private void insertNamespaceRecord(String namespace, String name, String type, - String specHash, String commitId, Long lastModified, Boolean isSycned) { - TransactionRunners.run(transactionRunner, context -> { - NamespaceSourceControlMetadataStore store = NamespaceSourceControlMetadataStore.create( - context); - store.write(new ApplicationReference(namespace, name), - new SourceControlMeta(specHash, commitId, Instant.ofEpochMilli(lastModified), isSycned)); - }); - } /** * A Mock {@link SourceControlOperationRunner} that can be used for tests. diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/AppLifecycleHttpHandlerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/AppLifecycleHttpHandlerTest.java index 11fcae89246b..3559b8e6905f 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/AppLifecycleHttpHandlerTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/AppLifecycleHttpHandlerTest.java @@ -55,6 +55,7 @@ import io.cdap.cdap.internal.app.runtime.artifact.ArtifactRepository; import io.cdap.cdap.internal.app.services.ApplicationLifecycleService; import io.cdap.cdap.internal.app.services.http.AppFabricTestBase; +import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService; import io.cdap.cdap.internal.app.store.state.AppStateKey; import io.cdap.cdap.internal.app.store.state.AppStateKeyValue; import io.cdap.cdap.internal.capability.CapabilityReader; @@ -109,7 +110,6 @@ public class AppLifecycleHttpHandlerTest extends AppFabricTestBase { private static CConfiguration cConf; private static final String FEATURE_FLAG_PREFIX = "feature."; - @BeforeClass public static void beforeClass() throws Throwable { cConf = createBasicCconf(); @@ -143,12 +143,14 @@ public ApplicationLifecycleService createLifeCycleService(CConfiguration cConf, MetadataServiceClient metadataServiceClient, AccessEnforcer accessEnforcer, AuthenticationContext authenticationContext, MessagingService messagingService, Impersonator impersonator, - CapabilityReader capabilityReader, TransactionRunner transactionRunner) { + CapabilityReader capabilityReader, TransactionRunner transactionRunner, + SourceControlMetadataRefreshService sourceControlMetadataRefreshService) { return Mockito.spy(new ApplicationLifecycleService(cConf, store, scheduler, usageRegistry, preferencesService, metricsSystemClient, ownerAdmin, artifactRepository, managerFactory, metadataServiceClient, accessEnforcer, authenticationContext, - messagingService, impersonator, capabilityReader, new NoOpMetricsCollectionService(), transactionRunner)); + messagingService, impersonator, capabilityReader, new NoOpMetricsCollectionService(), transactionRunner, + sourceControlMetadataRefreshService)); } }); } diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java index 45f51e315205..ec2ec69ebf9c 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java @@ -37,6 +37,7 @@ import io.cdap.cdap.internal.app.services.ApplicationLifecycleService; import io.cdap.cdap.internal.app.services.SourceControlManagementService; import io.cdap.cdap.internal.app.services.http.AppFabricTestBase; +import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService; import io.cdap.cdap.internal.operation.OperationLifecycleManager; import io.cdap.cdap.metadata.MetadataSubscriberService; import io.cdap.cdap.proto.ApplicationRecord; @@ -124,18 +125,19 @@ protected void configure() { @Singleton public SourceControlManagementService provideSourceControlManagementService( CConfiguration cConf, - SecureStore secureStore, - TransactionRunner transactionRunner, - AccessEnforcer accessEnforcer, - AuthenticationContext authenticationContext, - SourceControlOperationRunner sourceControlRunner, - ApplicationLifecycleService applicationLifecycleService, - Store store, OperationLifecycleManager manager, MetricsCollectionService metricsService) { + SecureStore secureStore, + TransactionRunner transactionRunner, + AccessEnforcer accessEnforcer, + AuthenticationContext authenticationContext, + SourceControlOperationRunner sourceControlRunner, + ApplicationLifecycleService applicationLifecycleService, + Store store, OperationLifecycleManager manager, MetricsCollectionService metricsService, + SourceControlMetadataRefreshService sourceControlMetadataRefreshService) { return Mockito.spy(new SourceControlManagementService(cConf, secureStore, transactionRunner, - accessEnforcer, authenticationContext, - sourceControlRunner, applicationLifecycleService, - store, manager, metricsService)); + accessEnforcer, authenticationContext, + sourceControlRunner, applicationLifecycleService, + store, manager, metricsService, sourceControlMetadataRefreshService)); } }); } diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/RepositorySourceControlMetadataStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/RepositorySourceControlMetadataStoreTest.java index 11752e745e4e..49bdf7a4fb06 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/RepositorySourceControlMetadataStoreTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/RepositorySourceControlMetadataStoreTest.java @@ -306,9 +306,9 @@ public void testWrite() { RepositorySourceControlMetadataStore store = RepositorySourceControlMetadataStore.create( context); store.write(APP_REF, IS_SYNCED, LAST_MODIFIED.toEpochMilli()); - ImmutablePair pair = store.get(APP_REF); - assertEquals(IS_SYNCED, pair.getSecond()); - assertEquals(LAST_MODIFIED.toEpochMilli(), pair.getFirst()); + SourceControlMetadataRecord record = store.get(APP_REF); + assertEquals(IS_SYNCED, record.getIsSynced()); + assertEquals(LAST_MODIFIED.toEpochMilli(), record.getLastModified().longValue()); store.deleteAll(NAMESPACE); }); } @@ -319,11 +319,12 @@ public void testDelete() { RepositorySourceControlMetadataStore store = RepositorySourceControlMetadataStore.create( context); store.write(APP_REF, IS_SYNCED, LAST_MODIFIED.toEpochMilli()); - ImmutablePair pair = store.get(APP_REF); - assertEquals(pair, new ImmutablePair<>(LAST_MODIFIED.toEpochMilli(), IS_SYNCED)); + SourceControlMetadataRecord record = store.get(APP_REF); + assertEquals(record, new SourceControlMetadataRecord(NAMESPACE, TYPE, NAME, null, null, + LAST_MODIFIED.toEpochMilli(), IS_SYNCED)); store.delete(APP_REF); - pair = store.get(APP_REF); - assertEquals(null, pair); + record = store.get(APP_REF); + assertEquals(null, record); }); } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index c5b98e048b22..61d031bb184a 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -2482,6 +2482,10 @@ public static final class SourceControlManagement { public static final String REPOSITORY_TTL_SECONDS = "source.control.repository.ttl.seconds"; public static final String METADATA_MIGRATION_INTERVAL_SECONDS = "source.control.metadata.migration.interval.seconds"; + public static final String METADATA_REFRESH_INTERVAL_SECONDS = + "source.control.metadata.refresh.interval.seconds"; + public static final String METADATA_REFRESH_BUFFER_SECONDS = + "source.control.metadata.refresh.buffer.seconds"; } /** diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index 6251096a3b9c..3f641b2dd0de 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -6012,6 +6012,24 @@ + + feature.source.control.metadata.manual.refresh.enabled + true + + If true, source control metadata refresh service will be triggered when + list API is called for namespace and repository source control metadata. + + + + + feature.source.control.metadata.periodic.refresh.enabled + true + + If true, source control metadata refresh service for a namespace will be + triggered periodically. + + + artifact.cache.bind.address 0.0.0.0 @@ -6129,6 +6147,22 @@ The service only retries when the migration job fails. + + source.control.metadata.refresh.interval.seconds + 10800 + + Scheduler interval for SCM metadata refresh service, by default 3 hrs + + + + source.control.metadata.refresh.buffer.seconds + 600 + + Buffer time interval for SCM metadata refresh service, by default, + is set to 10 minutes. If a new refresh request occurs within the + buffer time span since the last request, it will be skipped. + + diff --git a/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java b/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java index a1b79703135f..07e34f34e731 100644 --- a/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java +++ b/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java @@ -44,7 +44,9 @@ public enum Feature { WRANGLER_SCHEMA_MANAGEMENT("6.10.0"), NAMESPACED_SERVICE_ACCOUNTS("6.10.0"), WRANGLER_KRYO_SERIALIZATION("6.10.1"), - SOURCE_CONTROL_MANAGEMENT_GITLAB_BITBUCKET("6.10.1"); + SOURCE_CONTROL_MANAGEMENT_GITLAB_BITBUCKET("6.10.1"), + SOURCE_CONTROL_METADATA_MANUAL_REFRESH("6.10.1"), + SOURCE_CONTROL_METADATA_PERIODIC_REFRESH("6.10.1"); private final PlatformInfo.Version versionIntroduced; private final boolean defaultAfterIntroduction;