Fix ML calendar event update scalability issues #136886

Conversation
- Refactor JobManager.submitJobEventUpdate() to bypass the UpdateJobProcessNotifier queue
- Use RefCountingListener for parallel calendar/filter updates
- Add comprehensive logging throughout the system
- Create CalendarScalabilityIT integration tests
- Add helper methods to the base test class

Fixes an issue where calendar events failed to update some jobs when associated with large numbers of jobs (>1000), due to queue capacity limits and sequential processing.
Hi @valeriy42, I've created a changelog YAML for you.
Commits:
- …g to API calls and processing job updates asynchronously in the background.
- …thub.com/valeriy42/elasticsearch into bugfix/limited-update-notification-queue
- …e handling in JobManager to include skipped updates. Update logging to reflect skipped updates during background calendar processing.
- …hods and updating job creation visibility. Enhance ScheduledEventsIT to verify asynchronous calendar updates and add a plugin for tracking UpdateProcessAction calls.
- …the updated logging package. This change improves consistency and aligns with recent codebase updates.
Pinging @elastic/ml-core (Team:ML)
…thub.com/valeriy42/elasticsearch into bugfix/limited-update-notification-queue
```java
updateListener.onResponse(Boolean.TRUE);

private boolean isExpectedFailure(Exception e) {
    // Job deleted, closed, etc. - not real errors
    return ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException || e.getMessage().contains("is not open");
}
```
Would it be safer to be more explicit with this contains() check, to prevent an error with a similar message getting ignored when it shouldn't be? I think the full error message should be "Cannot perform requested action because job [" + jobId + "] is not open", so maybe that's what we should check? You could even extract the code in TransportJobTaskAction.doExecute() that creates the error message into a static method and call that here, so that this check is guaranteed to always have the correct string.
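The extraction suggested here could look roughly like this. It is a sketch only: the class and method names are hypothetical, chosen to illustrate the idea of a single source of truth for the message shared by the code that throws and the code that checks.

```java
// Hypothetical sketch (names are illustrative, not the actual Elasticsearch code):
// centralize the "not open" message so the producer and the checker can never drift apart.
public final class JobNotOpenMessage {
    private JobNotOpenMessage() {}

    // Single source of truth for the error text.
    public static String notOpenMessage(String jobId) {
        return "Cannot perform requested action because job [" + jobId + "] is not open";
    }

    // The failure check can now match the exact message instead of a loose substring.
    public static boolean isJobNotOpen(Exception e, String jobId) {
        return notOpenMessage(jobId).equals(e.getMessage());
    }
}
```

With this, a message like "job [x] is not openable" would no longer be silently swallowed by a substring match.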
I extended the check to be more explicit.
```java
// Post events and verify API completes quickly (async behavior)
long startTime = System.currentTimeMillis();
postScheduledEvents(calendarId, events);
long duration = System.currentTimeMillis() - startTime;

assertThat("API should complete quickly with async implementation", duration, lessThan(5000L));
```
Is there a chance that this might end up being flaky if the machine running the test is overloaded and/or a GC happens while postScheduledEvents() is being called? Can we guarantee that it will never take longer than 5 seconds for postScheduledEvents() to return? Also, if the implementation wasn't async, could we guarantee that it would always take longer than 5 seconds? If not, then maybe we don't need this check since it's not providing much value, or perhaps some other way to differentiate between async and sync implementations could be used?
The assertion will eventually fail in CI and the fix will be to bump the timeout at which point the assertion starts to become meaningless.
I removed the assertion. The real verification is that the ActionFilter captured the call (lines 525-536), and the async behavior is verified by the immediate response rather than waiting for completion.
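One illustrative way to express that verification in plain Java (a self-contained mimic, not the actual UpdateProcessActionTrackerPlugin code): a thread-safe set records which jobs received an update as the action filter observes them, and the test polls the set instead of asserting on wall-clock timing.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch (names are not from the PR): a shared, thread-safe record
// of which jobs received a process update, plus a simple poll standing in for assertBusy().
public final class UpdateTracker {
    // Populated from the action filter as updates are observed.
    public static final Set<String> updatedJobIds = ConcurrentHashMap.newKeySet();

    public static void recordUpdate(String jobId) {
        updatedJobIds.add(jobId);
    }

    // Poll until the job shows up or the deadline passes.
    public static boolean waitForUpdate(String jobId, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (updatedJobIds.contains(jobId)) {
                return true;
            }
            Thread.sleep(10);
        }
        return updatedJobIds.contains(jobId);
    }
}
```

Polling for an observed effect is robust to slow machines and GC pauses in a way that an upper bound on elapsed time is not.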
```java
    ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds,
    contains(jobId)
    );
}, 5, TimeUnit.SECONDS);
```
Does this timeout need to be so small? It seems like something that could easily become flaky if the hardware running the test was overloaded. Unless the code is making a guarantee somewhere that it will always take less than 5 seconds for the action filter to be applied, this should probably be using a longer, default timeout.
Changed to use the default assertBusy timeout.
```java
/**
 * Test calendar updates with closed jobs (should not fail)
 */
public void testCalendarUpdateWithClosedJobs() throws IOException {
```
Would it be worthwhile having a test where there are both closed and non-closed jobs, and confirming that the closed ones are skipped but the non-closed ones are updated?
I added testCalendarUpdateWithMixedOpenAndClosedJobs() to cover this use case.
The overall idea makes sense to me. My concern is that most of the new logging is unnecessary and should be at debug level (especially the log that tracks how long the change took).
UpdateJobProcessNotifier empties the queue every 1 second. To overflow that queue, the cluster must have over 1,000 open jobs, or there must be multiple updates per job. The latter could happen if multiple calendars are updated in less than a second.

Wouldn't it be simpler to make the queue in UpdateJobProcessNotifier unbounded or larger? Then the behaviour of serialising the updates is preserved, rather than firing off all updates at once. The max number of open jobs on a node is limited by xpack.ml.max_open_jobs, so the queue size could be a function of that setting and the number of ml nodes in the cluster, plus some overhead for multiple updates.

I also see optimisations that could be considered for a follow-up PR. UpdateJobProcessNotifier should collapse all the calendar updates for a job, since only the latest calendar events are used anyway. In the case where 1,000 jobs are updated because a single calendar has changed, the search to get the calendar events is executed 1,000 times, once for each job. It would be better for each ml node to search the latest events once and then update all the jobs on that node.
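A hedged sketch of the sizing rule suggested above. All names and constants here are illustrative, not values from the Elasticsearch codebase: the capacity is derived from the per-node job limit, the ml node count, and a headroom factor for repeated updates, floored at the current default of 1000.

```java
// Hypothetical sizing rule for the UpdateJobProcessNotifier queue, following the
// suggestion above. The headroom factor and the 1000 floor are illustrative.
public final class QueueSizing {
    private QueueSizing() {}

    public static int queueCapacity(int maxOpenJobsPerNode, int mlNodeCount, int updatesPerJobHeadroom) {
        // Scale with xpack.ml.max_open_jobs and the number of ml nodes.
        long capacity = (long) maxOpenJobsPerNode * mlNodeCount * updatesPerJobHeadroom;
        // Never shrink below the current default, and guard against int overflow.
        return (int) Math.min(Integer.MAX_VALUE, Math.max(1000L, capacity));
    }
}
```

For example, 512 open jobs per node, 4 ml nodes, and headroom for 2 updates per job would give a 4096-slot queue.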
```diff
     }
 } else {
-    logger.debug("[{}] No process update required for job update: {}", jobUpdate::getJobId, jobUpdate::toString);
+    logger.debug("[{}] No process update required for job update: {}", jobUpdate.getJobId(), jobUpdate.toString());
```
Using a supplier means that jobUpdate.toString() won't be evaluated unless debug level logging is enabled. jobUpdate.getJobId() is trivial but jobUpdate.toString() is not. What's the reasoning behind this change?
Hm. I think I changed the logger to use Elastic's one, and now I get the following error:

```
error: method debug in interface Logger cannot be applied to given types;
logger.debug("[{}] No process update required for job update: {}", jobUpdate::getJobId, jobUpdate::toString);
```

But I'll rewrite it to prevent premature evaluation.
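The supplier-vs-eager distinction discussed in this thread can be mimicked in plain Java. This self-contained sketch (the LazyLog and expensiveToString names are illustrative, not from the PR) shows the point the reviewer is making: with a Supplier argument, the costly rendering only runs when the level is enabled, which is what passing jobUpdate::toString buys in the real code.

```java
import java.util.function.Supplier;

// Minimal sketch of lazy log-argument evaluation using plain java.util.function.Supplier.
// Log4j's Logger.debug(String, Supplier<?>...) overload follows the same principle.
public final class LazyLog {
    public static boolean debugEnabled = false;
    public static int toStringCalls = 0;

    static String expensiveToString() {
        toStringCalls++; // count how often the costly rendering actually runs
        return "big-job-update";
    }

    public static void debug(String template, Supplier<String> arg) {
        if (debugEnabled) {
            // arg.get() is only invoked here, when debug logging is on
            System.out.println(template.replace("{}", arg.get()));
        }
    }
}
```

With eager arguments (`jobUpdate.toString()`), the rendering cost is paid on every call regardless of the log level.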
…bugfix/limited-update-notification-queue
Thank you for the reviews. I updated code and answered questions.
…roved readability
I considered this approach, but bypassing the queue is the better solution for several reasons:

The optimization you mentioned (collapsing calendar updates per job) is a great follow-up idea and would work well with this parallel approach. I'll create an issue to capture it and work on it later.
@DonalEvans, @benwtrent, @davidkyle thank you for your comments. I introduced the suggested changes. Looking forward to your new feedback.
LGTM
Fixes an issue where calendar events failed to update some jobs when associated with large numbers of jobs (>1000), due to queue capacity limits and sequential processing.

Problem:

UpdateJobProcessNotifier has a 1000-item queue and processes updates sequentially. It uses offer() on the queue, which silently drops updates when the queue is full. However, calendar/filter updates don't need ordering guarantees, so JobManager.submitJobEventUpdate() can bypass the queue and avoid the bottleneck of the queue size.

Another problem is the "fire-and-forget" pattern: submitJobEventUpdate() returns immediately without waiting for the update to complete. I introduce RefCountingListener to track the calendar updates. We start a background thread that updates the jobs and tracks succeeded, failed, and skipped jobs, while the request returns immediately to prevent a timeout.

Finally, in case the problem with failed job updates persists, I enhanced the logging throughout the system to create a trace for future diagnostics.

Fixes #129777
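The fan-out-and-count pattern the description outlines can be sketched in plain Java. The real change uses Elasticsearch's RefCountingListener; this self-contained mimic (all names illustrative) uses counters and a latch to show the succeeded/failed/skipped accounting, with closed jobs skipped rather than treated as failures.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Plain-Java mimic of the fan-out pattern described above: attempt one update
// per job, count succeeded/failed/skipped, and complete once all are accounted for.
public final class FanOutUpdates {
    public final AtomicInteger succeeded = new AtomicInteger();
    public final AtomicInteger skipped = new AtomicInteger();
    public final AtomicInteger failed = new AtomicInteger();

    public void updateAll(List<String> jobIds, Predicate<String> isOpen) {
        CountDownLatch done = new CountDownLatch(jobIds.size());
        for (String jobId : jobIds) {
            // In the real code each update is an async action; here it runs inline.
            try {
                if (isOpen.test(jobId)) {
                    succeeded.incrementAndGet();
                } else {
                    skipped.incrementAndGet(); // closed jobs are skipped, not failed
                }
            } catch (RuntimeException e) {
                failed.incrementAndGet();
            } finally {
                done.countDown();
            }
        }
        try {
            done.await(); // all updates accounted for before reporting
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The key property mirrored here is that the caller learns an aggregate outcome only after every per-job update has resolved, instead of dropping updates silently from a bounded queue.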