
kgen: parallelize #9841


Merged: 7 commits, Jan 1, 2022

Conversation


@benesch benesch commented Jan 1, 2022

Parallelize kgen and use it in cloudbench. More details in individual commits.

The goal here is to remove mzbench (see #9779).

Motivation

  • This PR refactors existing code.

Tips for reviewer

  • The diff is a bit smaller if viewed with whitespace hidden.

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered.
  • This PR adds a release note for any user-facing behavior changes.

@benesch benesch requested a review from umanwizard January 1, 2022 19:59

benesch commented Jan 1, 2022

I verified the changes to cloudbench—the avro_ingest benchmark still works just fine. I also did a few quick experiments and the new kgen is as fast as kafka-avro-generator, if not a touch faster. They both produce 100MM records in ~25s on my 32-CPU machine.

Comment on lines -119 to -121
let f = self.bytes.get_mut(&p).unwrap();
let mut val = vec![];
f(&mut val);
umanwizard (Contributor):

This is strange; I don't remember at all why I had these take the output vector as a function argument rather than just returning Vec<u8>. Any insight?

benesch (Contributor Author):

Not really! I can only assume you wrote the signature of the generator functions to re-use buffers before you realized that the Avro API made that infeasible.

umanwizard (Contributor):

That sounds plausible.
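For readers following along, the two signatures under discussion can be sketched as follows. This is a minimal illustration; `write_record` and `make_record` are hypothetical stand-ins for kgen's generator closures, not the actual code:

```rust
// Style used in the old code: the caller supplies the output buffer,
// which would allow buffer reuse across records.
fn write_record(out: &mut Vec<u8>) {
    out.extend_from_slice(b"record");
}

// Simpler style: just return the bytes. Equivalent unless the caller
// actually reuses the buffer between calls.
fn make_record() -> Vec<u8> {
    b"record".to_vec()
}

fn main() {
    let mut buf = Vec::new();
    write_record(&mut buf);
    assert_eq!(buf, make_record());
}
```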

- return Err(e.into());
+ Retry::default()
+     .clamp_backoff(Duration::from_secs(1))
+     .retry(|_| match producer.send(rec.take().unwrap()) {
umanwizard (Contributor):

This unwrap will cause a panic when we get a non-"QueueFull" error, because we will retry without resetting rec (in the previous code, we would return the error in that case).

That's probably fine because the behavior in either case is the program failing, but if we do choose to have that behavior, we should make it more explicit (e.g., panic on receiving the error, rather than here).

benesch (Contributor Author):

Oh good point! I'll address.
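A minimal sketch of the hazard and one possible fix, using a hypothetical `SendError` type rather than the real rdkafka API: on a retryable queue-full error the record is handed back and restored, while any other error fails loudly instead of panicking later on the `unwrap`:

```rust
#[derive(Debug)]
enum SendError<T> {
    // Queue full: the record is handed back, so retrying is safe.
    QueueFull(T),
    // Any other error: the record is gone; retrying would hit the
    // `unwrap` panic discussed above, so fail loudly instead.
    Fatal,
}

fn send_with_retry<T>(
    mut rec: Option<T>,
    mut send: impl FnMut(T) -> Result<(), SendError<T>>,
) {
    loop {
        match send(rec.take().expect("record already consumed")) {
            Ok(()) => return,
            Err(SendError::QueueFull(r)) => rec = Some(r), // restore, then retry
            Err(SendError::Fatal) => panic!("fatal error while sending record"),
        }
    }
}

fn main() {
    let mut attempts = 0;
    send_with_retry(Some("record"), |r| {
        attempts += 1;
        if attempts < 3 {
            Err(SendError::QueueFull(r)) // simulate a full producer queue
        } else {
            Ok(())
        }
    });
    assert_eq!(attempts, 3);
}
```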


use futures::{ready, Stream, StreamExt};
use pin_project::pin_project;
use tokio::io::{AsyncRead, ReadBuf};
- use tokio::time::Duration;
+ use tokio::time::{self, Duration, Instant, Sleep};

// TODO(benesch): remove this if the `duration_constants` feature stabilizes.
umanwizard (Contributor):

benesch (Contributor Author):

zomg!

benesch (Contributor Author):

I actually went to see if duration_constants was stabilized, saw that it hadn't been, and got sad. Did not think to see if Duration::MAX had escaped separately! Tyvm!
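For reference, a quick illustration that `Duration::MAX` is usable on stable Rust without the `duration_constants` feature gate:

```rust
use std::time::Duration;

fn main() {
    // `Duration::MAX` is stable on its own (since Rust 1.53), even though
    // the broader `duration_constants` feature never stabilized.
    assert_eq!(Duration::MAX.as_secs(), u64::MAX);
    assert!(Duration::MAX > Duration::from_secs(u64::MAX - 1));
}
```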

benesch added a commit to benesch/materialize that referenced this pull request on Jan 1, 2022:

mzbench has been obsoleted by the feature benchmark framework (in
test/feature-benchmark) and the cloudbench tool (bin/cloudbench).  The
kafka-avro-generator tool has been obsoleted by parallelizing kgen
directly (MaterializeInc#9841). So this commit removes mzbench.

To expound on the rationale for removing mzbench:

  * mzbench configurations require an unmaintainable duplication of
    mzcompose.yml files. Each mzbench configuration contains 300+ lines of
    nearly identical definitions. There was talk of improving this (see
    MaterializeInc#6676), but the plans never came to fruition.

  * The interplay between mzbench and mzcompose is unnecessarily
    delicate. mzbench expects a composition with workflows named just
    so, and then parses their output. This makes it very difficult to
    refactor the underlying compositions, since you don't know if you're
    breaking the contract with mzbench. I think most of mzbench's
    features could be recreated much more simply with an e.g.
    `--num-trials` parameter to mzcompose.

  * mzbench introduced quite a bit of complexity by trying to be both a demo
    of using Materialize to power a real-time dashboard [0] and a
    benchmarking framework. Experience suggests that this results in a
    tool that is a suboptimal dashboard and a suboptimal benchmarking
    framework. Better to have two separate tools optimized for their
    specific purpose.

The new feature benchmarking framework resolves the above concerns. It
is only focused on being a benchmarking framework and does not suffer
from the code duplication problem.

[0]: https://github.com/MaterializeInc/materialize/blob/45586f38a/doc/developer/mzbench.md#worker-balance-visualization
Having a separate `RetryStream` cleanly separates the specification of a
retry policy from the state required to execute that policy. A
forthcoming commit will add a synchronous retry API which makes the
separation of policy from implementation more important.

This commit makes the new `RetryStream` an internal implementation
detail rather than part of the public API, as it doesn't seem useful
outside of `Retry::retry` and `RetryReader`. If we *do* discover a use
for it, it's easy to slap a `pub` on the new `RetryStream` type down the
road.
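The policy/state separation described here can be sketched roughly as follows (illustrative names only, not the actual Materialize `Retry` API): the policy is an immutable value, and each retry loop spins up its own fresh state from it.

```rust
use std::time::Duration;

// The retry *policy*: immutable, cheap to copy, reusable.
#[derive(Clone, Copy)]
struct Retry {
    initial_backoff: Duration,
    clamp: Duration,
}

// The per-loop *state*: attempt counter and current backoff.
struct RetryState {
    policy: Retry,
    attempt: usize,
    next_backoff: Duration,
}

impl Retry {
    // Each call produces fresh state, so one policy value can drive
    // many independent retry loops.
    fn start(&self) -> RetryState {
        RetryState {
            policy: *self,
            attempt: 0,
            next_backoff: self.initial_backoff,
        }
    }
}

impl RetryState {
    // Returns the backoff to sleep before the next attempt,
    // doubling each time but never exceeding the clamp.
    fn next(&mut self) -> Duration {
        self.attempt += 1;
        let b = self.next_backoff.min(self.policy.clamp);
        self.next_backoff *= 2;
        b
    }
}

fn main() {
    let policy = Retry {
        initial_backoff: Duration::from_millis(10),
        clamp: Duration::from_millis(25),
    };
    let mut state = policy.start();
    assert_eq!(state.next(), Duration::from_millis(10));
    assert_eq!(state.next(), Duration::from_millis(20));
    assert_eq!(state.next(), Duration::from_millis(25)); // clamped
    assert_eq!(state.attempt, 3);
}
```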

To make space for a synchronous `Retry::retry` method.

This operates identically to `Retry::retry_async` but uses
`std::thread::sleep` to wait rather than Tokio timers.
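A hedged sketch of what such a synchronous retry loop might look like (the `retry_sync` function and its parameters are illustrative, not the actual `Retry::retry` signature):

```rust
use std::thread;
use std::time::Duration;

// Synchronous retry with clamped exponential backoff, waiting via
// `std::thread::sleep` rather than Tokio timers.
fn retry_sync<T, E>(
    max_tries: usize,
    clamp: Duration,
    mut f: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut backoff = Duration::from_millis(10);
    for attempt in 1usize.. {
        match f() {
            Ok(v) => return Ok(v),
            Err(e) if attempt >= max_tries => return Err(e),
            Err(_) => {
                thread::sleep(backoff.min(clamp));
                backoff *= 2; // exponential backoff, clamped above
            }
        }
    }
    unreachable!()
}

fn main() {
    let mut tries = 0;
    let out: Result<i32, &str> = retry_sync(5, Duration::from_millis(20), || {
        tries += 1;
        if tries < 3 { Err("transient") } else { Ok(42) }
    });
    assert_eq!(out.unwrap(), 42);
    assert_eq!(tries, 3);
}
```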

The `duration_constants` feature actually isn't stable yet, but
`Duration::MAX` was stabilized separately.

h/t @umanwizard

This should improve throughput when the rdkafka producer queue fills
up.

Teach kgen to optionally spawn multiple threads, defaulting to the
number of physical CPUs available on the machine.

Thread safety made this surprisingly irritating. This commit refactors
the Avro generator so that the ThreadRng is only ever passed as a
parameter, never stored, as otherwise the Avro generator does not
implement `Send`. It also introduces a rather goofy `Generator` trait
whose only purpose is to make it possible to clone the generator
closures.
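The clone-a-boxed-closure pattern that commit describes can be sketched like this (illustrative types, not the actual kgen `Generator` trait): a plain `Box<dyn FnMut(...)>` is not `Clone`, so a trait with a `clone_box` method lets each worker thread take its own copy of the generator.

```rust
trait Generator: Send {
    fn generate(&mut self, out: &mut Vec<u8>);
    // Object-safe cloning: a `Box<dyn FnMut(...)>` has no `Clone`,
    // but a trait method returning a fresh box works.
    fn clone_box(&self) -> Box<dyn Generator>;
}

// Blanket impl: any cloneable, sendable closure is a `Generator`.
impl<F> Generator for F
where
    F: FnMut(&mut Vec<u8>) + Clone + Send + 'static,
{
    fn generate(&mut self, out: &mut Vec<u8>) {
        self(out)
    }
    fn clone_box(&self) -> Box<dyn Generator> {
        Box::new(self.clone())
    }
}

fn main() {
    let generator: Box<dyn Generator> = Box::new(|out: &mut Vec<u8>| {
        out.extend_from_slice(b"row");
    });
    // Each worker thread gets its own clone of the generator.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let mut g = generator.clone_box();
            std::thread::spawn(move || {
                let mut buf = Vec::new();
                g.generate(&mut buf);
                buf
            })
        })
        .collect();
    for h in handles {
        assert_eq!(h.join().unwrap(), b"row".to_vec());
    }
}
```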

kafka-avro-generator is going away soon. Use kgen directly instead.
@benesch benesch mentioned this pull request Jan 1, 2022
@benesch benesch enabled auto-merge January 1, 2022 21:54
@benesch benesch merged commit 19b3d67 into MaterializeInc:main Jan 1, 2022
@benesch benesch deleted the kgen-refactor branch January 1, 2022 22:11
benesch added a commit to benesch/materialize that referenced this pull request Jan 4, 2022