Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create end point for events #9691

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

oyvindeide
Copy link
Collaborator

Issue
Resolves #9001

Approach
Short description of the approach

(Screenshot of new behavior in GUI if applicable)

  • PR title captures the intent of the changes, and is fitting for release notes.
  • Added appropriate release note label
  • Commit history is consistent and clean, in line with the contribution guidelines.
  • Make sure unit tests pass locally after every commit (git rebase -i main --exec 'pytest tests/ert/unit_tests -n logical -m "not integration_test"')

When applicable

  • When there are user facing changes: Updated documentation
  • New behavior or changes to existing untested code: Ensured that unit tests are added (See Ground Rules).
  • Large PR: Prepare changes in small commits for more convenient review
  • Bug fix: Add regression test for the bug
  • Bug fix: Create Backport PR to latest release

Copy link

codspeed-hq bot commented Jan 9, 2025

CodSpeed Performance Report

Merging #9691 will not alter performance

Comparing oyvindeide:add_websockets_endpoint (c92e178) with main (a522de5)

Summary

✅ 25 untouched benchmarks

@oyvindeide oyvindeide force-pushed the add_websockets_endpoint branch 4 times, most recently from 06dfbd1 to 79c008e Compare January 28, 2025 11:25
@oyvindeide oyvindeide force-pushed the add_websockets_endpoint branch 2 times, most recently from c396805 to e734a28 Compare January 29, 2025 07:22
@oyvindeide oyvindeide force-pushed the add_websockets_endpoint branch from e734a28 to c373dd0 Compare January 29, 2025 14:43
Copy link
Contributor

@DanSava DanSava left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some questions and comments

src/everest/bin/utils.py Outdated Show resolved Hide resolved
Comment on lines +244 to +283
if message:
event_dict = json.loads(message)
if "snapshot" in event_dict:
event_dict["snapshot"] = EnsembleSnapshot.from_nested_dict(
event_dict["snapshot"]
)
try:
event = EventWrapper(event=event_dict).event
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you rewrite this:

                if message:
                    event_dict = json.loads(message)
                    if "snapshot" in event_dict:
                        event_dict["snapshot"] = EnsembleSnapshot.from_nested_dict(
                            event_dict["snapshot"]
                        )
                    try:
                        event = EventWrapper(event=event_dict).event

as

if message:
    try:
        event = status_event_from_json(message)

will make it a bit more clear and you no longer need EventWrapper

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get this then 🤔

pydantic_core._pydantic_core.ValidationError: 1 validation error for tagged-union[ForwardModelStepStart,ForwardModelStepRunning,ForwardModelStepSuccess,ForwardModelStepFailure,ForwardModelStepChecksum,RealizationPending,RealizationRunning,RealizationSuccess,RealizationFailed,RealizationTimeout,RealizationUnknown,RealizationWaiting,RealizationResubmit,EESnapshot,EESnapshotUpdate,EETerminated,EEUserCancel,EEUserDone,EnsembleStarted,EnsembleSucceeded,EnsembleFailed,EnsembleCancelled]
  Input tag 'FullSnapshotEvent' found using 'event_type' does not match any of the expected tags: 'forward_model_step.start', 'forward_model_step.running', 'forward_model_step.success', 'forward_model_step.failure', 'forward_model_step.checksum', 'realization.pending', 'realization.running', 'realization.success', 'realization.failure', 'realization.timeout', 'realization.unknown', 'realization.waiting', 'realization.resubmit', 'ee.snapshot', 'ee.snapshot_update', 'ee.terminated', 'ee.user_cancel', 'ee.user_done', 'ensemble.started', 'ensemble.succeeded', 'ensemble.failed', 'ensemble.cancelled' [type=union_tag_invalid, input_value={'iteration_label': 'Runn...e': 'FullSnapshotEvent'}, input_type=dict]

Comment on lines +259 to +294
new_opt_status = _query_server(cert, auth, opt_endpoint)
if new_opt_status != opt_status:
opt_status = new_opt_status
ret = bool(callback({OPT_PROGRESS_ID: opt_status}))
stop |= ret
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we should also add an issue to have a websocket endpoint for the optimization progress updates?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should just add them to the same queue, and they will automatically be sent through the same end-point. Can create that issue.

continue

event = jsonable_encoder(item)
self.shared_data["events"].append(event)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit confused by the subscriber pattern you are using to notify the websocket clients about the new events is it not possible to just sent the events to the same websocket endpoint once you get them from the status_queue ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be possible, but the subscribers are in case you connect a second monitor, or in cases where you dont connect a monitor before you start getting events. I have solved that here by subscribers where we know which events the have received, and if you connect at a later time, all the previous events are played back to you.

We could just cache the latest full snapshot and all the changes to it, but there are not that many events, so seemed more future proof to just save all of them and replay. Could start merging partial snapshots in the list, but then keeping track of where the subscribers are is more difficult.

src/everest/detached/jobs/everserver.py Outdated Show resolved Hide resolved
@oyvindeide oyvindeide force-pushed the add_websockets_endpoint branch 3 times, most recently from b9d4f73 to 66155f7 Compare January 31, 2025 08:04
@oyvindeide oyvindeide force-pushed the add_websockets_endpoint branch from 66155f7 to 6221630 Compare February 4, 2025 11:40
@oyvindeide oyvindeide force-pushed the add_websockets_endpoint branch from 6221630 to c92e178 Compare February 4, 2025 11:55
@oyvindeide oyvindeide self-assigned this Feb 4, 2025
@@ -345,7 +273,4 @@ async def test_status_contains_max_runtime_failure(

assert status["status"] == ServerStatus.failed
print(status["message"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

@@ -244,42 +192,22 @@ def mocked_server(url, verify, auth, timeout, proxies):
# The server should fail and store a user-friendly message.
assert status["status"] == ServerStatus.failed
assert OPT_FAILURE_REALIZATIONS in status["message"]
assert "job1 Failed with: job 1 error 1" in status["message"]
assert "job1 Failed with: job 1 error 2" in status["message"]
assert "job2 Failed with: job 2 error 1" in status["message"]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not have this information about the failure anymore?

@frode-aarstad
Copy link
Contributor

Seems like quite a few of the tests are testing less (asserting less) are we sure the changed tests tests everything we want ?


if isinstance(item, EndEvent):
break
await asyncio.sleep(0.1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this sleep needed for?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Pass events from the Everest run model
4 participants