Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4df6cde
Fix calendar event update scalability issues
valeriy42 Oct 21, 2025
6957a46
[CI] Auto commit changes from spotless
Oct 21, 2025
0311f6c
Update docs/changelog/136886.yaml
valeriy42 Oct 21, 2025
9eda793
checkstyle
valeriy42 Oct 21, 2025
defa8c2
Merge branch 'main' into bugfix/limited-update-notification-queue
valeriy42 Oct 21, 2025
8514752
[CI] Auto commit changes from spotless
Oct 21, 2025
ddd9e53
Progress-Based Response for calendar updates by immediately respondin…
valeriy42 Oct 22, 2025
10fea2a
Merge branch 'bugfix/limited-update-notification-queue' of https://gi…
valeriy42 Oct 22, 2025
771b430
spotless
valeriy42 Oct 22, 2025
95a07dd
Remove CalendarScalabilityIT integration tests and refactor job updat…
valeriy42 Oct 22, 2025
613bcce
Refactor integration tests for ML job handling by removing unused met…
valeriy42 Oct 22, 2025
8bbe8af
Refactor logging imports in TransportPostCalendarEventsAction to use …
valeriy42 Oct 22, 2025
8361516
fix logger check
valeriy42 Oct 22, 2025
ed18e77
Merge branch 'main' into bugfix/limited-update-notification-queue
valeriy42 Oct 22, 2025
6ea309f
Update unit tests.
valeriy42 Oct 23, 2025
8edc65a
Merge branch 'bugfix/limited-update-notification-queue' of https://gi…
valeriy42 Oct 23, 2025
29d7f73
Merge branch 'main' into bugfix/limited-update-notification-queue
valeriy42 Oct 23, 2025
61a61ab
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
valeriy42 Oct 31, 2025
d56d390
reiviewer comments addressed.
valeriy42 Oct 31, 2025
1becbbc
revert debug logging with supplier function
valeriy42 Oct 31, 2025
ca79166
Refactor debug logging in JobManager to use lambda expression for imp…
valeriy42 Oct 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/136886.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 136886
summary: Fix ML calendar event update scalability issues
area: Machine Learning
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,24 @@
*/
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.action.GetCalendarEventsAction;
import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
Expand All @@ -28,8 +38,10 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
Expand All @@ -45,6 +57,13 @@ public void cleanUpTest() {
cleanUp();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(ScheduledEventsIT.UpdateProcessActionTrackerPlugin.class);
return plugins;
}

public void testScheduledEvents() throws IOException {

TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
Expand Down Expand Up @@ -464,6 +483,166 @@ public void testNewJobWithGlobalCalendar() throws Exception {
assertThat(buckets.get(5).getScheduledEvents(), contains("Event added after job is opened"));
}

/**
* Test that verifies UpdateProcessAction is called with correct parameters when calendar events
* are posted asynchronously, using ActionFilter to directly intercept the calls
*/
public void testCalendarUpdateCallsUpdateProcessAction() throws Exception {
// Reset tracker
ScheduledEventsIT.UpdateProcessActionTrackerPlugin.reset();

TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
String jobId = "calendar-update-test";
String calendarId = "test-calendar";

// Create and open single job
createJob(jobId, bucketSpan);
openJob(jobId);

// Create calendar with the job
putCalendar(calendarId, List.of(jobId), "Update process action test");

// Create scheduled event
List<ScheduledEvent> events = List.of(
new ScheduledEvent.Builder().description("Direct Test Event")
.startTime(Instant.ofEpochMilli(System.currentTimeMillis() + 60000))
.endTime(Instant.ofEpochMilli(System.currentTimeMillis() + 120000))
.calendarId(calendarId)
.build()
);

// Post events - API should return immediately with async implementation
postScheduledEvents(calendarId, events);

// Wait for and verify ActionFilter captured the UpdateProcessAction call
// We intercept the call to UpdateProcessAction using an ActionFilter to verify the call was made
assertBusy(() -> {
assertThat(
"Should have intercepted UpdateProcessAction call",
ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updateProcessCallCount.get(),
equalTo(1)
);
assertThat(
"Should have called UpdateProcessAction for the correct job",
ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds,
contains(jobId)
);
});

// Verify calendar events were stored correctly
verifyCalendarEventsStored(calendarId, 1);

// Cleanup
closeJob(jobId);

logger.info("Successfully verified UpdateProcessAction call with updateScheduledEvents=true for job [{}]", jobId);
}

/**
* Test calendar updates with closed jobs (should not fail)
*/
public void testCalendarUpdateWithClosedJobs() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worthwhile having a test where there are both closed and non-closed jobs, and confirming that the closed ones are skipped but the non-closed ones are updated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added testCalendarUpdateWithMixedOpenAndClosedJobs() to cover this use case.

TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
String jobId = "closed-job-test";

// Create and run job, then close it
Job.Builder job = createJob(jobId, bucketSpan);
long startTime = 1514764800000L;
runJob(job, startTime, bucketSpan, 10);

// Create calendar with the closed job
String calendarId = "closed-job-calendar";
putCalendar(calendarId, Collections.singletonList(jobId), "Calendar with closed job");

// Create scheduled event
List<ScheduledEvent> events = new ArrayList<>();
long eventStartTime = startTime + (bucketSpan.millis() * 5);
long eventEndTime = eventStartTime + (bucketSpan.millis() * 2);
events.add(
new ScheduledEvent.Builder().description("Closed Job Event")
.startTime(Instant.ofEpochMilli(eventStartTime))
.endTime(Instant.ofEpochMilli(eventEndTime))
.calendarId(calendarId)
.build()
);

// This should not fail even though the job is closed
// The async implementation should gracefully skip closed jobs
postScheduledEvents(calendarId, events);

// Verify job is still closed and buckets don't have the new event
// (since the job was closed when the event was added)
GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
List<Bucket> buckets = getBuckets(getBucketsRequest);

// All buckets should be empty of scheduled events since job was closed when event was added
for (Bucket bucket : buckets) {
assertThat("Closed job buckets should not contain new scheduled events", bucket.getScheduledEvents(), empty());
}
}

/**
* Test calendar updates with mixed open and closed jobs - verify open jobs are updated and closed jobs are skipped
*/
public void testCalendarUpdateWithMixedOpenAndClosedJobs() throws Exception {
TimeValue bucketSpan = TimeValue.timeValueMinutes(30);

// Create two jobs
String openJobId = "mixed-test-open-job";
String closedJobId = "mixed-test-closed-job";

// Create and open first job
createJob(openJobId, bucketSpan);
openJob(openJobId);

// Create and run second job, then close it
Job.Builder closedJob = createJob(closedJobId, bucketSpan);
long startTime = 1514764800000L;
runJob(closedJob, startTime, bucketSpan, 10);

// Create calendar with both jobs
String calendarId = "mixed-jobs-calendar";
putCalendar(calendarId, List.of(openJobId, closedJobId), "Calendar with mixed open and closed jobs");

// Reset tracker
ScheduledEventsIT.UpdateProcessActionTrackerPlugin.reset();

// Create scheduled event
List<ScheduledEvent> events = List.of(
new ScheduledEvent.Builder().description("Mixed Jobs Event")
.startTime(Instant.ofEpochMilli(System.currentTimeMillis() + 60000))
.endTime(Instant.ofEpochMilli(System.currentTimeMillis() + 120000))
.calendarId(calendarId)
.build()
);

// Post events - should update open job and skip closed job
postScheduledEvents(calendarId, events);

// Wait for ActionFilter to capture the UpdateProcessAction call
// Should only be called for the open job, not the closed one
assertBusy(() -> {
assertThat(
"Should have intercepted UpdateProcessAction call for open job only",
ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updateProcessCallCount.get(),
equalTo(1)
);
assertThat(
"Should have called UpdateProcessAction for the open job only",
ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds,
contains(openJobId)
);
assertThat(
"Should not have called UpdateProcessAction for the closed job",
ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds.contains(closedJobId),
is(false)
);
});

// Cleanup
closeJob(openJobId);
}

private Job.Builder createJob(String jobId, TimeValue bucketSpan) {
Detector.Builder detector = new Detector.Builder("count", null);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
Expand All @@ -486,4 +665,62 @@ private void runJob(Job.Builder job, long startTime, TimeValue bucketSpan, int b
);
closeJob(job.getId());
}

/**
* Helper method to verify that calendar events are stored and retrievable
*/
private void verifyCalendarEventsStored(String calendarId, int expectedEventCount) {
GetCalendarEventsAction.Request request = new GetCalendarEventsAction.Request(calendarId);
GetCalendarEventsAction.Response response = client().execute(GetCalendarEventsAction.INSTANCE, request).actionGet();

assertThat(
"Calendar should have the expected number of events",
response.getResources().results().size(),
equalTo(expectedEventCount)
);
}

/**
* Test plugin that tracks UpdateProcessAction calls with updateScheduledEvents=true
* using an ActionFilter to verify native process interaction in integration tests
*/
public static class UpdateProcessActionTrackerPlugin extends Plugin implements ActionPlugin {

public static final AtomicInteger updateProcessCallCount = new AtomicInteger(0);
public static final List<String> updatedJobIds = Collections.synchronizedList(new ArrayList<>());

public static void reset() {
updateProcessCallCount.set(0);
updatedJobIds.clear();
}

@Override
public List<ActionFilter> getActionFilters() {
return List.of(new ActionFilter() {
@Override
public int order() {
return 0;
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse> void apply(
Task task,
String action,
Request request,
ActionListener<Response> listener,
ActionFilterChain<Request, Response> chain
) {
if (UpdateProcessAction.NAME.equals(action) && request instanceof UpdateProcessAction.Request) {
UpdateProcessAction.Request updateRequest = (UpdateProcessAction.Request) request;
if (updateRequest.isUpdateScheduledEvents()) {
updateProcessCallCount.incrementAndGet();
updatedJobIds.add(updateRequest.getJobId());
}
}
chain.proceed(task, action, request, listener);
}
});
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.ToXContent;
Expand All @@ -42,6 +44,8 @@ public class TransportPostCalendarEventsAction extends HandledTransportAction<
PostCalendarEventsAction.Request,
PostCalendarEventsAction.Response> {

private static final Logger logger = LogManager.getLogger(TransportPostCalendarEventsAction.class);

private final Client client;
private final JobResultsProvider jobResultsProvider;
private final JobManager jobManager;
Expand Down Expand Up @@ -75,6 +79,13 @@ protected void doExecute(
List<ScheduledEvent> events = request.getScheduledEvents();

ActionListener<Calendar> calendarListener = ActionListener.wrap(calendar -> {
logger.debug(
"Calendar [{}] accepted for background update: {} jobs with {} events",
request.getCalendarId(),
calendar.getJobIds().size(),
events.size()
);

BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();

for (ScheduledEvent event : events) {
Expand Down Expand Up @@ -102,13 +113,10 @@ protected void doExecute(
new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
jobManager.updateProcessOnCalendarChanged(
calendar.getJobIds(),
ActionListener.wrap(
r -> listener.onResponse(new PostCalendarEventsAction.Response(events)),
listener::onFailure
)
);
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds(), ActionListener.wrap(r -> {
logger.debug("Calendar [{}] update initiated successfully", request.getCalendarId());
listener.onResponse(new PostCalendarEventsAction.Response(events));
}, listener::onFailure));
}

@Override
Expand Down
Loading