untitaker
Member

@untitaker untitaker commented Jul 18, 2025

we already use localbroker in one test. by using it more widely we can make the integration tests much faster and add more examples. some of the examples are failing, but i don't have time to investigate, so i skipped them

Also change how we clean up topics, so that a test can be run multiple
times without cross-contamination. We should probably move to topic
names with UUIDs in them, though, and stop messing with getsentry's topics.
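
A minimal sketch of the UUID idea, assuming a hypothetical helper `unique_topic_name` and a made-up base name `test-events` (neither is part of this PR). Each run gets its own topic, so repeated runs cannot see each other's messages:

```python
import uuid


def unique_topic_name(base: str) -> str:
    """Return `base` with a random UUID suffix (hypothetical helper)."""
    # uuid4().hex is 32 lowercase hex characters, all valid in Kafka topic names.
    return f"{base}-{uuid.uuid4().hex}"


# Two calls with the same base name yield two distinct topics.
first = unique_topic_name("test-events")
second = unique_topic_name("test-events")
```

With names like these, cleanup between runs becomes optional rather than required for correctness.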
@untitaker untitaker marked this pull request as ready for review August 12, 2025 15:43
@untitaker untitaker requested a review from a team as a code owner August 12, 2025 15:43
        self.clock.sleep(1.0)
        # Process any pending window closes
        if self.processor is not None:
            self.processor._run_once()

Bug: PipelineTestHarness Time Precision and Dependency Issues

The PipelineTestHarness class has two issues:

  1. The advance_time method truncates the seconds float parameter to an integer, losing fractional precision which can prevent windowing operations from triggering correctly.
  2. It directly calls the private _run_once() method of StreamProcessor, creating a brittle dependency on Arroyo's internal implementation that may break.
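
A small illustration of the first point, assuming a hypothetical `FakeClock` (not Arroyo's own clock class and not the harness code in this PR). It compares truncating each advance with `int()` against keeping the float:

```python
class FakeClock:
    """Hypothetical manually advanced clock used only for this illustration."""

    def __init__(self) -> None:
        self._now = 0.0

    def time(self) -> float:
        return self._now

    def sleep(self, seconds: float) -> None:
        # Keep the float as-is so sub-second advances accumulate.
        self._now += seconds


truncating = 0.0
clock = FakeClock()
for _ in range(4):
    truncating += int(0.5)  # int() drops the fraction, so this adds 0 each time
    clock.sleep(0.5)
```

After four half-second advances the truncating counter has not moved, while the float-based clock has advanced by two seconds, which is the difference that would decide whether a window closes.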

Member

@evanh evanh left a comment

I like this change generally. One thing I'd like to change.

    env:
      FLINK_LIBS: ./flink_libs

  integration-tests:
Member

I'd like to keep this as a separate CI check, to avoid clutter.

Member Author

are you sure? to me this is just another kind of test and can be part of tests/

Collaborator

@fpacifici fpacifici left a comment

I don't think this can replace @evanh's integration test, as it uses the Python Arroyo adapter. We do not support that adapter anymore. @evanh's tests use the real prod runner with the Rust adapter that we would use in prod.

While I applaud the idea of using the local broker more for tests, I see two possible paths forward:

  • Go on with this PR but reinstate @evanh's test, because a test on the Python Arroyo adapter does not replace an integration test with the Rust adapter we are actively developing.
  • Support the local broker in the Rust adapter and use that one.

Comment on lines +86 to +93
        self.adapter = ArroyoAdapter.build(
            {
                "env": {},
                "steps_config": steps_config,
            },
            consumers,
            producers,
        )
Collaborator

This is not the runtime we would use in prod. It is also not maintained and is going to be deleted soon.
I don't think we can rely on this one for integration tests. @evanh's previous implementation, by running the actual runner, tested the code we would run in production, which is the Rust adapter. We should not move back to the Python one.

I do support the idea of a harness to test the pipeline end to end with the local broker. Those can be considered standard tests; nothing about them is special enough to require a separate suite. Though:

  1. It has to run the code in the Rust adapter, or at least in one that we maintain.
  2. I still see value in having 1-2 integration tests where we start with the runner file and use a real Kafka. Those are like Sentry acceptance tests.

There is a substantial difference between this test and @evanh's integration test: the adapter is not the same, so the only relevant part is the pipeline code. Evan's test actually tests end to end what SBC would use.

Member Author

@untitaker untitaker Aug 14, 2025

okay, yeah you're right. would it be sufficient to move to rust-arroyo and its localbroker concept? not sure how exactly we'd do it, but in principle that's my preferred solution: keep a real kafka broker out of this

i am not thrilled about having integration tests since they have been demonstrated to be really slow especially in this repo. i also think that we will at some point have to have a real story for testing in getsentry where integration tests are expensive.

Member

I would definitely prefer a local broker over the actual Kafka broker. I did it the way I did because I couldn't figure out a clean way to make rust arroyo + local broker work without significant refactoring.

Collaborator

"i am not thrilled about having integration tests since they have been demonstrated to be really slow especially in this repo."

I still think having one smoke test that uses a real Kafka is valuable. It is a basic acceptance test, the last line of defense. One is enough.
For the rest, I agree that the local broker is the way to go.


extract_steps(pipeline)

# If extraction failed, use defaults
Collaborator

Why would this fail in a valid scenario?

Comment on lines +73 to +76
# Setup ArroyoAdapter with LocalBroker
consumers: Dict[str, KafkaConsumer] = {}
producers: Dict[str, KafkaProducer] = {}
steps_config: Dict[str, Dict[str, Dict[str, Any]]] = {}
Collaborator

Have you considered subclassing the Adapter to make a TestingAdapter that uses LocalConsumer and LocalProducer instead of real ones? That should simplify this PR considerably.
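
A rough sketch of the shape this could take. Every class and method name here is a hypothetical stand-in (`Adapter`, `make_producer`, `InMemoryProducer`), not the real adapter API or Arroyo's local backend; it assumes the base adapter builds its producers through an overridable factory method:

```python
from typing import Any, Dict, List


class InMemoryProducer:
    """Hypothetical stand-in for a local producer: records messages in a list."""

    def __init__(self) -> None:
        self.sent: List[bytes] = []

    def produce(self, payload: bytes) -> None:
        self.sent.append(payload)


class Adapter:
    """Hypothetical base adapter that builds one producer per topic."""

    def __init__(self, topics: List[str]) -> None:
        self.producers: Dict[str, Any] = {t: self.make_producer(t) for t in topics}

    def make_producer(self, topic: str) -> Any:
        raise NotImplementedError("a real adapter would connect to Kafka here")


class TestingAdapter(Adapter):
    """Overrides only the factory, so the rest of the adapter code is shared."""

    def make_producer(self, topic: str) -> InMemoryProducer:
        return InMemoryProducer()


adapter = TestingAdapter(["events"])
adapter.producers["events"].produce(b"hello")
```

The point of the design is that the test double differs from the real adapter only in how consumers and producers are created, so the harness would not need to build those dictionaries by hand.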

Comment on lines +43 to +54
import importlib.util

# Dynamically import the pipeline module
spec = importlib.util.spec_from_file_location(f"example_{test.name}", test.pipeline_module)
if spec is None or spec.loader is None:
    raise ImportError(f"Cannot load module from {test.pipeline_module}")

module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)

# Get the pipeline from the module
pipeline = module.pipeline
Collaborator

I feel we could use this code in the runner as well as in other tests. Why not put it in a module in the application code rather than in test code?
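
For example, the snippet above could become a small shared helper along these lines. The name `load_module_from_path` and the demo file are assumptions for illustration, not existing code in the repo:

```python
import importlib.util
import tempfile
from pathlib import Path
from types import ModuleType


def load_module_from_path(name: str, path: Path) -> ModuleType:
    """Import the Python file at `path` as a module called `name`."""
    spec = importlib.util.spec_from_file_location(name, str(path))
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load module from {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


# Demo: write a throwaway module that defines `pipeline`, then load it.
with tempfile.TemporaryDirectory() as tmp:
    example = Path(tmp) / "example.py"
    example.write_text("pipeline = ['source', 'sink']\n")
    loaded = load_module_from_path("example_demo", example)
```

The test would then reduce to `pipeline = load_module_from_path(f"example_{test.name}", test.pipeline_module).pipeline`, and the runner could call the same function.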


3 participants