Skip to content

Commit

Permalink
updating namespace and repository source control metadata manually
Browse files Browse the repository at this point in the history
  • Loading branch information
adrikagupta committed May 7, 2024
1 parent aedcb1f commit 06dc4b3
Show file tree
Hide file tree
Showing 18 changed files with 431 additions and 657 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
import io.cdap.cdap.internal.app.services.RunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.RunRecordMonitorService;
import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefresher;
import io.cdap.cdap.internal.app.store.DefaultStore;
import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules;
import io.cdap.cdap.internal.capability.CapabilityModule;
Expand Down Expand Up @@ -215,7 +215,7 @@ protected void configure() {
bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class);
bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON);
bind(SourceControlMetadataRefresher.class).in(Scopes.SINGLETON);

Multibinder<String> servicesNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Expand Down Expand Up @@ -260,7 +260,7 @@ protected void configure() {
bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class);
bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON);
bind(SourceControlMetadataRefresher.class).in(Scopes.SINGLETON);

Multibinder<String> servicesNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Expand Down Expand Up @@ -318,7 +318,7 @@ protected void configure() {
bind(StorageProviderNamespaceAdmin.class)
.to(DistributedStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON);
bind(SourceControlMetadataRefresher.class).in(Scopes.SINGLETON);

bind(ProgramRunDispatcher.class).to(RemoteProgramRunDispatcher.class)
.in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
import io.cdap.cdap.common.NamespaceNotFoundException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.NotImplementedException;
import io.cdap.cdap.common.RepositoryNotFoundException;
import io.cdap.cdap.common.ServiceException;
import io.cdap.cdap.common.SourceControlMetadataNotFoundException;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
Expand Down Expand Up @@ -296,7 +298,8 @@ public void getAllApps(HttpRequest request, HttpResponder responder,
}

/**
* Retrieves all namespace source control metadata for applications within the specified namespace.
* Retrieves all namespace source control metadata for applications within the specified namespace,
* next page token and last refresh time.
*
* @param request The HTTP request containing parameters for retrieving metadata.
* @param responder The HTTP responder for sending the response.
Expand All @@ -321,32 +324,25 @@ public void getAllNamespaceSourceControlMetadata(FullHttpRequest request, HttpRe
) throws Exception {
validateNamespace(namespaceId);
List<SourceControlMetadataRecord> apps = new ArrayList<>();
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = applicationLifecycleService.scanSourceControlMetadata(
scanRequest, batchSize,
scmMetaRecord -> {
apps.add(scmMetaRecord);
lastRecord.set(scmMetaRecord);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
boolean pageLimitReached;
pageLimitReached = applicationLifecycleService.scanSourceControlMetadata(
scanRequest, batchSize,
apps::add);
SourceControlMetadataRecord record = apps.isEmpty() ? null :apps.get(apps.size() - 1);
String nextPageToken = !pageLimitReached || record == null ? null :
record.getName();
Long lastRefreshTime = applicationLifecycleService.getLastRefreshTime(namespaceId);
long lastRefreshTime = applicationLifecycleService.getLastRefreshTime(namespaceId);
ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps,
nextPageToken, lastRefreshTime);
nextPageToken, lastRefreshTime == 0L ? null : lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

/**
* Retrieves the source control metadata for the specified application within the specified namespace.
* Retrieves the source control metadata for the specified application within the specified namespace
* and last refresh time of git pull/push.
*
* @param request The HTTP request containing parameters for retrieving metadata.
* @param responder The HTTP responder for sending the response.
Expand All @@ -358,7 +354,9 @@ public void getAllNamespaceSourceControlMetadata(FullHttpRequest request, HttpRe
@Path("/apps/{app-id}/sourcecontrol")
public void getNamespaceSourceControlMetadata(HttpRequest request, HttpResponder responder,
@PathParam("namespace-id") final String namespaceId,
@PathParam("app-id") final String appName) throws Exception {
@PathParam("app-id") final String appName)
throws BadRequestException, NamespaceNotFoundException,
SourceControlMetadataNotFoundException, RepositoryNotFoundException {
validateApplicationId(namespaceId, appName);
SourceControlMetadataRecord app = applicationLifecycleService.getSourceControlMetadataRecord(
new ApplicationReference(namespaceId, appName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public class SourceControlManagementHttpHandler extends AbstractAppFabricHttpHan
private final SourceControlManagementService sourceControlService;
private final FeatureFlagsProvider featureFlagsProvider;
private static final Gson GSON = new Gson();
public static final String APP_LIST_PAGINATED_KEY_SHORT = "apps";
private final int batchSize;

@Inject
Expand Down Expand Up @@ -159,27 +158,19 @@ public void listAllApplications(FullHttpRequest request, HttpResponder responder
checkSourceControlFeatureFlag();
validateNamespaceId(namespaceId);
List<SourceControlMetadataRecord> apps = new ArrayList<>();
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = sourceControlService.scanRepoMetadata(
scanRequest, batchSize,
record -> {
apps.add(record);
lastRecord.set(record);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
boolean pageLimitReached;
pageLimitReached = sourceControlService.scanRepoMetadata(
scanRequest, batchSize,
apps::add);
SourceControlMetadataRecord record = apps.isEmpty() ? null :apps.get(apps.size() - 1);
String nextPageToken = !pageLimitReached || record == null ? null :
record.getName();
Long lastRefreshTime = sourceControlService.getLastRefreshTime(namespaceId);
long lastRefreshTime = sourceControlService.getLastRefreshTime(namespaceId);
ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps,
nextPageToken, lastRefreshTime);
nextPageToken, lastRefreshTime == 0L ? null : lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import io.cdap.cdap.common.security.HttpsEnabler;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataMigrationService;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefresher;
import io.cdap.cdap.internal.app.store.AppMetadataStore;
import io.cdap.cdap.internal.bootstrap.BootstrapService;
import io.cdap.cdap.internal.credential.CredentialProviderService;
Expand Down Expand Up @@ -98,7 +98,7 @@ public class AppFabricServer extends AbstractIdleService {
private final RepositoryCleanupService repositoryCleanupService;
private final OperationNotificationSubscriberService operationNotificationSubscriberService;
private final SourceControlMetadataMigrationService sourceControlMetadataMigrationService;
private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService;
private final SourceControlMetadataRefresher sourceControlMetadataRefresher;
private final CConfiguration cConf;
private final SConfiguration sConf;
private final boolean sslEnabled;
Expand Down Expand Up @@ -140,7 +140,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
RepositoryCleanupService repositoryCleanupService,
OperationNotificationSubscriberService operationNotificationSubscriberService,
SourceControlMetadataMigrationService sourceControlMetadataMigrationService,
SourceControlMetadataRefreshService sourceControlMetadataRefreshService) {
SourceControlMetadataRefresher sourceControlMetadataRefresher) {
this.hostname = hostname;
this.discoveryService = discoveryService;
this.handlers = handlers;
Expand Down Expand Up @@ -170,7 +170,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
this.repositoryCleanupService = repositoryCleanupService;
this.operationNotificationSubscriberService = operationNotificationSubscriberService;
this.sourceControlMetadataMigrationService = sourceControlMetadataMigrationService;
this.sourceControlMetadataRefreshService = sourceControlMetadataRefreshService;
this.sourceControlMetadataRefresher = sourceControlMetadataRefresher;
}

/**
Expand Down Expand Up @@ -203,8 +203,7 @@ protected void startUp() throws Exception {
sourceControlOperationRunner.start(),
repositoryCleanupService.start(),
operationNotificationSubscriberService.start(),
sourceControlMetadataMigrationService.start(),
sourceControlMetadataRefreshService.start()
sourceControlMetadataMigrationService.start()
));
Futures.allAsList(futuresList).get();

Expand Down Expand Up @@ -267,7 +266,6 @@ protected void shutDown() throws Exception {
namespaceCredentialProviderService.stopAndWait();
operationNotificationSubscriberService.stopAndWait();
sourceControlMetadataMigrationService.stopAndWait();
sourceControlMetadataRefreshService.stopAndWait();
}

private Cancellable startHttpService(NettyHttpService httpService) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@
import io.cdap.cdap.common.ArtifactNotFoundException;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.CannotBeDeletedException;
import io.cdap.cdap.common.ForbiddenException;
import io.cdap.cdap.common.InvalidArtifactException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.RepositoryNotFoundException;
import io.cdap.cdap.common.SourceControlMetadataNotFoundException;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
Expand All @@ -82,7 +82,7 @@
import io.cdap.cdap.internal.app.runtime.artifact.ArtifactDetail;
import io.cdap.cdap.internal.app.runtime.artifact.ArtifactRepository;
import io.cdap.cdap.internal.app.runtime.artifact.Artifacts;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefresher;
import io.cdap.cdap.internal.app.store.ApplicationMeta;
import io.cdap.cdap.internal.app.store.RunRecordDetail;
import io.cdap.cdap.internal.app.store.state.AppStateKey;
Expand Down Expand Up @@ -190,7 +190,7 @@ public class ApplicationLifecycleService extends AbstractIdleService {
private final MetricsCollectionService metricsCollectionService;
private final FeatureFlagsProvider featureFlagsProvider;
private final TransactionRunner transactionRunner;
private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService;
private final SourceControlMetadataRefresher sourceControlMetadataRefresher;

/**
* Construct the ApplicationLifeCycleService with service factory and cConf coming from guice
Expand All @@ -207,7 +207,7 @@ public ApplicationLifecycleService(CConfiguration cConf,
MessagingService messagingService, Impersonator impersonator,
CapabilityReader capabilityReader,
MetricsCollectionService metricsCollectionService, TransactionRunner transactionRunner,
SourceControlMetadataRefreshService sourceControlMetadataRefreshService) {
SourceControlMetadataRefresher sourceControlMetadataRefresher) {
this.cConf = cConf;
this.appUpdateSchedules = cConf.getBoolean(Constants.AppFabric.APP_UPDATE_SCHEDULES,
Constants.AppFabric.DEFAULT_APP_UPDATE_SCHEDULES);
Expand All @@ -225,7 +225,7 @@ public ApplicationLifecycleService(CConfiguration cConf,
this.authenticationContext = authenticationContext;
this.impersonator = impersonator;
this.capabilityReader = capabilityReader;
this.sourceControlMetadataRefreshService = sourceControlMetadataRefreshService;
this.sourceControlMetadataRefresher = sourceControlMetadataRefresher;
this.adminEventPublisher = new AdminEventPublisher(cConf,
new MultiThreadMessagingContext(messagingService));
this.metricsCollectionService = metricsCollectionService;
Expand Down Expand Up @@ -323,13 +323,11 @@ public boolean scanApplications(ScanApplicationsRequest request,
*/
public boolean scanSourceControlMetadata(ScanSourceControlMetadataRequest request,
int txBatchSize,
Consumer<SourceControlMetadataRecord> consumer) throws IOException {
Consumer<SourceControlMetadataRecord> consumer)
throws IOException, RepositoryNotFoundException {
NamespaceId namespaceId = new NamespaceId(request.getNamespace());

// Triggering source control metadata refresh service
if (isSourceControlMetadataManualRefreshFlagEnabled()) {
sourceControlMetadataRefreshService.runRefreshService(namespaceId);
}
// Checking if repository config is present in the namespace
sourceControlMetadataRefresher.getRepositoryMeta(namespaceId);

String lastKey = request.getScanAfter();
int currentLimit = request.getLimit();
Expand Down Expand Up @@ -358,13 +356,10 @@ public boolean scanSourceControlMetadata(ScanSourceControlMetadataRequest reques
return true;
}

public Long getLastRefreshTime(String namespace) {
return sourceControlMetadataRefreshService.getLastRefreshTime(new NamespaceId(namespace));
public long getLastRefreshTime(String namespace) {
return sourceControlMetadataRefresher.getLastRefreshTime(new NamespaceId(namespace));
}

private boolean isSourceControlMetadataManualRefreshFlagEnabled() {
return Feature.SOURCE_CONTROL_METADATA_MANUAL_REFRESH.isEnabled(featureFlagsProvider);
}

private void processApplications(List<Map.Entry<ApplicationId, ApplicationMeta>> list,
Consumer<ApplicationDetail> consumer) {
Expand Down Expand Up @@ -436,7 +431,9 @@ public ApplicationDetail getLatestAppDetail(ApplicationReference appRef)
* reference is not found.
*/
public SourceControlMetadataRecord getSourceControlMetadataRecord(ApplicationReference appRef)
throws SourceControlMetadataNotFoundException {
throws SourceControlMetadataNotFoundException, RepositoryNotFoundException {
// Checking if repository config exists for the namespace
sourceControlMetadataRefresher.getRepositoryMeta(new NamespaceId(appRef.getNamespace()));
SourceControlMetadataRecord record = store.getNamespaceSourceControlMetadataRecord(appRef);
if (record == null) {
throw new SourceControlMetadataNotFoundException(appRef);
Expand Down
Loading

0 comments on commit 06dc4b3

Please sign in to comment.