Support making atomic write requests idempotent #2197

Open
8 of 10 tasks
empiredan opened this issue Feb 21, 2025 · 0 comments
Labels
type/enhancement Indicates new feature requests

empiredan commented Feb 21, 2025

Motivation

Pegasus does not support duplicating atomic write requests, i.e. incr, check_and_set and check_and_mutate, because they are not idempotent: applying the same incr twice, for example, increases the value twice. In practice, many applications rely on atomic write interfaces in a wide range of scenarios. Such applications therefore cannot use duplication to synchronize data, and cannot benefit from the high performance that duplication provides.

Design

Given the urgency of the requirements, the first version of idempotent atomic writes should be as simple as possible, without fundamentally changing the write path.

Therefore, we decided to implement the idempotence of atomic write requests as follows: for each replica, ensure that at most one atomic write request is being processed in the write pipeline at any given time. When the replica server receives an atomic request, it first caches the request; the request is not pushed into the write pipeline until all requests before it have been applied. The write pipeline consists of the following stages:

  1. read the current value from RocksDB, calculate the final value according to the specific semantics of the requested atomic write, and build the idempotent request from it;
  2. append the corresponding mutation to plog;
  3. broadcast the prepare requests to the secondary replicas;
  4. finally, apply the result to RocksDB.

Primary replicas go through all four stages and then reply to the client, while secondary replicas only go through stages 2 and 4.
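To make the stage split concrete, here is a minimal Python model. It assumes a dict stands in for RocksDB, a list stands in for plog, and incr is the only atomic operation; the names `Replica`, `on_prepare` and `primary_incr` are hypothetical, not Pegasus APIs, and the commit step of real 2PC is collapsed so that secondaries apply right after appending.

```python
from dataclasses import dataclass, field


@dataclass
class Replica:
    """Hypothetical toy replica: `db` stands in for RocksDB, `plog` for the log."""
    db: dict = field(default_factory=dict)
    plog: list = field(default_factory=list)

    def append_plog(self, mutation):
        # Stage 2: persist the mutation to the log before applying it.
        self.plog.append(mutation)

    def apply(self, mutation):
        # Stage 4: apply the final (already idempotent) result to storage.
        op, key, value = mutation
        assert op == "put"
        self.db[key] = value

    def on_prepare(self, mutation):
        # Secondary replicas only run stages 2 and 4.
        self.append_plog(mutation)
        self.apply(mutation)


def primary_incr(primary, secondaries, key, delta):
    # Stage 1: read the current value and turn `incr` into an idempotent put.
    new_value = primary.db.get(key, 0) + delta
    mutation = ("put", key, new_value)
    primary.append_plog(mutation)      # Stage 2
    for secondary in secondaries:      # Stage 3: broadcast prepare requests
        secondary.on_prepare(mutation)
    primary.apply(mutation)            # Stage 4
    return new_value                   # reply to the client
```

The point of stage 1 is visible when a mutation is replayed: applying `("put", "k", 101)` a second time still leaves `101`, whereas replaying the original incr would produce `102`.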

Task List

Support idempotence in replica server

Make incr requests idempotent:

Make check_and_set requests idempotent:

Infrastructure for idempotence:

Support idempotence in meta server

Support idempotence in shell

empiredan added the type/enhancement label Feb 21, 2025
empiredan added a commit that referenced this issue Feb 21, 2025
…egasus_server_write` and `replication_app_base` (#2196)

#2197

To support idempotence, a new interface `make_idempotent()` is introduced and the
existing interface `on_batched_write_requests()` is changed for both classes
`pegasus_server_write` and `replication_app_base`. This differs from what was done
for `pegasus_write_service` and `pegasus_write_service::impl`, for which
`make_idempotent()` and `put()` were provided by the following PRs:
- #2185
- #2192

`make_idempotent()` for `replication_app_base` is provided as a virtual function
called by primary replicas and implemented internally by `pegasus_server_impl` and
`pegasus_server_write`. `on_batched_write_requests()` for `replication_app_base`
works the same way, and gains a new parameter `original_request`, which is the
original request received from the client. If non-null, it must be an atomic request
(i.e. `incr`, `check_and_set` or `check_and_mutate`); it is used to decide whether a
write request is atomic and to generate the response corresponding to the atomic
write request.
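The division of labor between the two interfaces can be sketched as below. This is a simplified Python model, not the real C++ signatures: `AppBase`, `ServerWrite`, `IncrRequest`, `PutRequest` and the response dicts are all hypothetical, and only `incr` is modeled. `make_idempotent()` turns the atomic request into a put on the primary, and `on_batched_write_requests()` uses a non-null `original_request` to build an atomic-style response.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional


@dataclass
class IncrRequest:
    key: str
    increment: int


@dataclass
class PutRequest:
    key: str
    value: int


class AppBase(ABC):
    """Hypothetical stand-in for the role `replication_app_base` plays."""

    @abstractmethod
    def make_idempotent(self, request: IncrRequest) -> PutRequest:
        """Called on the primary only: translate an atomic write into a put."""

    @abstractmethod
    def on_batched_write_requests(self, requests, original_request: Optional[IncrRequest] = None):
        """Apply writes; `original_request` is non-null only for atomic writes."""


class ServerWrite(AppBase):
    def __init__(self):
        self.db = {}  # stands in for the storage engine

    def make_idempotent(self, request):
        # Read the current value once and compute the final value.
        current = self.db.get(request.key, 0)
        return PutRequest(request.key, current + request.increment)

    def on_batched_write_requests(self, requests, original_request=None):
        for put in requests:
            self.db[put.key] = put.value
        if original_request is not None:
            # Atomic write: reply as the client's incr expects (the new value).
            return {"new_value": self.db[original_request.key]}
        # Plain write: a generic, hypothetical success response.
        return {"status": "ok"}
```

Usage: on a primary holding `k = 100`, `make_idempotent(IncrRequest("k", 1))` yields `PutRequest("k", 101)`, and passing the original incr alongside that put makes the reply carry `new_value = 101`.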
empiredan self-assigned this Feb 25, 2025
acelyc111 pushed a commit that referenced this issue Mar 10, 2025
…rimary replicas (#2198)

#2197

Suppose that a client issues an `incr` request to increase the base value `100` by `1`.
If the current configuration requires all atomic write requests to be idempotent, the
primary replica makes this request idempotent through the following steps after
receiving it:
1. A mutation with `is_blocking = true` is created to hold this request and then added
to the mutation queue as a blocking mutation.
2. Once this blocking mutation reaches the head of the queue, it blocks the entire queue,
which means no mutation can be dequeued from it.
3. This blocking mutation cannot be popped until all previous write requests have been
applied.
4. After it is popped, the current base value `100` is read from the storage engine, the
`incr` operation is performed, and a single put request is created to store the final
value `101`.
5. Another mutation is then created to hold this idempotent single put request and is
added to the write pipeline, which includes writing to `plog` and broadcasting to
secondary replicas.
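The five steps above can be modeled with a small queue, as a sketch under stated assumptions: `PrimarySketch`, `Mutation`, `last_prepared` and `last_applied` are hypothetical names, decrees are assigned in pop order, and mutations are applied in decree order. A blocking head refuses to pop while any previously popped decree is still unapplied, which stalls every mutation behind it.

```python
from collections import deque
from dataclasses import dataclass
from typing import Optional


@dataclass
class Mutation:
    request: tuple             # ("put", key, value) or ("incr", key, delta)
    is_blocking: bool = False
    decree: int = 0            # assigned when the mutation enters the pipeline


class PrimarySketch:
    """Hypothetical model of a primary that blocks the queue on atomic writes."""

    def __init__(self, db):
        self.db = db             # stands in for the storage engine
        self.queue = deque()
        self.last_prepared = 0   # highest decree handed to the pipeline
        self.last_applied = 0    # highest decree applied to storage

    def enqueue(self, request):
        # Step 1: atomic writes become blocking mutations.
        self.queue.append(Mutation(request, is_blocking=(request[0] == "incr")))

    def pop(self) -> Optional[Mutation]:
        if not self.queue:
            return None
        head = self.queue[0]
        # Steps 2-3: a blocking head waits until every earlier write is applied.
        if head.is_blocking and self.last_applied < self.last_prepared:
            return None
        self.queue.popleft()
        if head.is_blocking:
            # Steps 4-5: read the base value, build a new put mutation.
            _, key, delta = head.request
            head = Mutation(("put", key, self.db.get(key, 0) + delta))
        self.last_prepared += 1
        head.decree = self.last_prepared
        return head              # would now go to plog and the secondaries

    def apply(self, mutation):
        _, key, value = mutation.request
        self.db[key] = value
        self.last_applied = mutation.decree
```

For example, enqueueing `("put", "k", 100)` followed by `("incr", "k", 1)` pops the put first, returns `None` for the incr until that put is applied, and then pops `("put", "k", 101)`.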
empiredan added a commit that referenced this issue Mar 18, 2025
… to list/control idempotence for atomic writes (#2205)

#2197

We support enabling the idempotence of atomic write operations at the **meta**
level.

First, we introduce `atomic_idempotent` as **an attribute** of the `app_info` structure,
which represents the basic properties of a table. This attribute is persisted to remote
storage and loaded from remote storage when the meta server starts.

As for the **interface**, we implement the following on the meta server:
- Support specifying this attribute when creating a table.
- The list API returns the entire `app_info` object, including `atomic_idempotent`, for
each table.
- Support getting/setting this attribute for any table.
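A toy model of these meta-server operations follows, assuming a dict of JSON strings stands in for remote storage and that `app_info` is reduced to just `app_name` and `atomic_idempotent`. The class `MetaSketch` and its method names are hypothetical and do not correspond to actual Pegasus RPCs.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class AppInfo:
    """Hypothetical, heavily reduced version of `app_info`."""
    app_name: str
    atomic_idempotent: bool = False


class MetaSketch:
    def __init__(self):
        self.remote = {}  # stands in for remote storage: app_name -> JSON

    def _load(self, app_name):
        return AppInfo(**json.loads(self.remote[app_name]))

    def _persist(self, info):
        self.remote[info.app_name] = json.dumps(asdict(info))

    def create_app(self, app_name, atomic_idempotent=False):
        # The attribute may be specified at table creation (off by default).
        self._persist(AppInfo(app_name, atomic_idempotent))

    def list_apps(self):
        # Reading back from "remote storage" mirrors loading on meta restart.
        return [self._load(name) for name in self.remote]

    def get_atomic_idempotent(self, app_name):
        return self._load(app_name).atomic_idempotent

    def set_atomic_idempotent(self, app_name, value):
        info = self._load(app_name)
        info.atomic_idempotent = value
        self._persist(info)
```

Keeping the attribute inside the persisted object, rather than in separate in-memory state, is what lets it survive a meta server restart in this model.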
empiredan added a commit that referenced this issue Apr 14, 2025
…nt atomic writes (#2214)

#2197

Previously in #2198, we implemented
idempotence for each atomic write by blocking the entire mutation queue until the 2PC
pipeline was drained. However, this significantly affects performance since the pipeline
is stalled and all write requests get stuck in the mutation queue.

To address this performance issue, we introduce a row-lock mechanism: for each hash key,
the highest decree currently in the 2PC phase is recorded in a hash table. For each
atomic write request, if the highest decree associated with its hash key has not yet been
applied to the storage engine, the request is blocked in the mutation queue. Otherwise,
the hash key is considered unlocked and the request can proceed into the 2PC phase at
any time.

To avoid the performance overhead of deserialization, we directly use the `partition_hash`
(an unsigned 64-bit integer) from the client instead of the hash key. This also makes memory
usage more predictable, as the `partition_hash` has a fixed size.

Additionally, to mitigate the performance impact caused by frequent insertions and deletions
in the row-lock hash table, we introduce an LRU strategy: keys are only evicted when the hash
table exceeds a certain size threshold, and only the least recently used keys with no active
usage are removed.
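The row-lock table with lazy LRU eviction can be sketched as follows. This is an illustrative model rather than the actual implementation: `RowLockTable`, `capacity` and `last_applied_decree` are hypothetical names, and a key counts as "in use" exactly when its recorded decree is above the last applied decree.

```python
from collections import OrderedDict


class RowLockTable:
    """Hypothetical map: partition_hash -> highest decree in 2PC, kept in LRU order."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()  # least recently used first

    def is_locked(self, partition_hash, last_applied_decree):
        # Locked while the key's highest decree is not yet applied to storage.
        decree = self.entries.get(partition_hash)
        return decree is not None and decree > last_applied_decree

    def lock(self, partition_hash, decree):
        # Record (or raise) the highest decree and mark the key recently used.
        self.entries[partition_hash] = max(decree, self.entries.get(partition_hash, 0))
        self.entries.move_to_end(partition_hash)

    def evict(self, last_applied_decree):
        # Trim only above the threshold, oldest first, skipping locked keys.
        if len(self.entries) <= self.capacity:
            return
        for partition_hash in list(self.entries):
            if len(self.entries) <= self.capacity:
                break
            if self.entries[partition_hash] <= last_applied_decree:
                del self.entries[partition_hash]
```

Because locked keys are never evicted, the table can temporarily stay above `capacity` when every entry still has a decree in flight; it shrinks again on a later `evict()` call once those decrees are applied.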

Here is a concrete example of how atomic write requests are handled after introducing
row locks. Suppose a client issues an `incr` request to a primary replica to increase the
base value `100` by `1`. If the primary replica has been configured to make all atomic
write requests idempotent, then:
1. A mutation is created as a blocking candidate to hold this atomic write request and then
appended to the mutation queue.
2. This mutation is blocked and cannot be popped while the hash key it contains is
locked (i.e. the highest decree associated with the hash key has not yet been applied to
the storage engine).
3. This mutation can be popped only after its hash key becomes unlocked.
4. After the mutation is popped, the current base value `100` is read from the storage
engine, and a single put request is created to store the final value `101`.
5. Another mutation is then created to hold this idempotent single put request.
6. The new mutation then enters the 2PC phase: it is appended to `plog` and broadcast to
secondary replicas.
empiredan added a commit that referenced this issue Apr 14, 2025
… `.app-info` file for each replica (#2220)

#2197

In #2205, we introduced a
new attribute `atomic_idempotent` to meta, which is used to enable/disable
the idempotence of atomic write operations.

This attribute should be broadcast to each replica, so that whenever a replica is
promoted to primary, it can use this attribute to decide whether to make all atomic
writes idempotent. The attribute is also persisted into the `.app-info` file so that
it can be loaded after the replica restarts.
empiredan added a commit that referenced this issue Apr 15, 2025
…ls` commands on shell (#2221)

#2197

Support `atomic_idempotent` for `create` and `ls` by:
- Adding a flag to the `create` command to decide whether the created table is
`atomic_idempotent` (false by default).
- Adding a column `atomic_idempotent` to the result shown by the `ls` command.
empiredan added a commit that referenced this issue Apr 15, 2025
…to decide whether to make all atomic writes idempotent (#2222)

#2197

Since #2220 syncs `atomic_idempotent` from the meta server to replica servers, a
primary replica can now use it to decide whether to make all atomic writes idempotent.
empiredan added a commit that referenced this issue Apr 17, 2025
…mpotent` on shell (#2229)

#2197

The following shell commands are supported for operating on `atomic_idempotent`,
which decides whether all atomic requests written to a table are made idempotent:
- `get_atomic_idempotent`
- `enable_atomic_idempotent`
- `disable_atomic_idempotent`

All of these commands operate on a single table, so the argument `app_name`
is required.