From 5b0aaf1acde6572e2db2c5883d11ea954158c593 Mon Sep 17 00:00:00 2001 From: Hang Chen Date: Mon, 29 Jan 2024 11:43:19 +0800 Subject: [PATCH 1/5] Generate 4.16.4 website doc (#4191) ### Modification Generate 4.16.4 website docs --- site3/website/docusaurus.config.js | 2 +- .../admin/autorecovery.md | 0 .../admin/bookies.md | 0 .../admin/decomission.md | 0 .../admin/geo-replication.md | 0 .../admin/http.md | 0 .../admin/metrics.md | 0 .../admin/perf.md | 0 .../admin/placement.md | 0 .../admin/upgrade.md | 0 .../api/distributedlog-api.md | 0 .../api/ledger-adv-api.md | 0 .../api/ledger-api.md | 8 ++-- .../api/overview.md | 0 .../deployment/kubernetes.md | 0 .../deployment/manual.md | 0 .../development/codebase.md | 0 .../development/protocol.md | 0 .../getting-started/concepts.md | 0 .../getting-started/installation.md | 0 .../getting-started/run-locally.md | 0 .../overview/overview.md | 6 +-- .../reference/cli.md | 0 .../reference/config.md | 0 .../security/overview.md | 0 .../security/sasl.md | 0 .../security/tls.md | 0 .../security/zookeeper.md | 0 ...bars.json => version-4.16.4-sidebars.json} | 46 +++++++++---------- site3/website/versions.json | 2 +- 30 files changed, 32 insertions(+), 32 deletions(-) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/admin/autorecovery.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/admin/bookies.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/admin/decomission.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/admin/geo-replication.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/admin/http.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/admin/metrics.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/admin/perf.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/admin/placement.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/admin/upgrade.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/api/distributedlog-api.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/api/ledger-adv-api.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/api/ledger-api.md (99%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/api/overview.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/deployment/kubernetes.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/deployment/manual.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/development/codebase.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/development/protocol.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/getting-started/concepts.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/getting-started/installation.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/getting-started/run-locally.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/overview/overview.md (96%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/reference/cli.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/reference/config.md (100%) rename 
site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/security/overview.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/security/sasl.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/security/tls.md (100%) rename site3/website/versioned_docs/{version-4.16.3 => version-4.16.4}/security/zookeeper.md (100%) rename site3/website/versioned_sidebars/{version-4.16.3-sidebars.json => version-4.16.4-sidebars.json} (68%) diff --git a/site3/website/docusaurus.config.js b/site3/website/docusaurus.config.js index 0beff46af65..c001c3ef29f 100644 --- a/site3/website/docusaurus.config.js +++ b/site3/website/docusaurus.config.js @@ -7,7 +7,7 @@ const baseUrl = process.env.BASE_URL || "/" const deployUrl = process.env.DEPLOY_URL || "https://bookkeeper.apache.org"; const variables = { /** They are used in .md files*/ - latest_release: "4.16.3", + latest_release: "4.16.4", stable_release: "4.14.8", github_repo: "https://github.com/apache/bookkeeper", github_master: "https://github.com/apache/bookkeeper/tree/master", diff --git a/site3/website/versioned_docs/version-4.16.3/admin/autorecovery.md b/site3/website/versioned_docs/version-4.16.4/admin/autorecovery.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/admin/autorecovery.md rename to site3/website/versioned_docs/version-4.16.4/admin/autorecovery.md diff --git a/site3/website/versioned_docs/version-4.16.3/admin/bookies.md b/site3/website/versioned_docs/version-4.16.4/admin/bookies.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/admin/bookies.md rename to site3/website/versioned_docs/version-4.16.4/admin/bookies.md diff --git a/site3/website/versioned_docs/version-4.16.3/admin/decomission.md b/site3/website/versioned_docs/version-4.16.4/admin/decomission.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/admin/decomission.md rename to site3/website/versioned_docs/version-4.16.4/admin/decomission.md diff --git a/site3/website/versioned_docs/version-4.16.3/admin/geo-replication.md b/site3/website/versioned_docs/version-4.16.4/admin/geo-replication.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/admin/geo-replication.md rename to site3/website/versioned_docs/version-4.16.4/admin/geo-replication.md diff --git a/site3/website/versioned_docs/version-4.16.3/admin/http.md b/site3/website/versioned_docs/version-4.16.4/admin/http.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/admin/http.md rename to site3/website/versioned_docs/version-4.16.4/admin/http.md diff --git a/site3/website/versioned_docs/version-4.16.3/admin/metrics.md b/site3/website/versioned_docs/version-4.16.4/admin/metrics.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/admin/metrics.md rename to site3/website/versioned_docs/version-4.16.4/admin/metrics.md diff --git a/site3/website/versioned_docs/version-4.16.3/admin/perf.md b/site3/website/versioned_docs/version-4.16.4/admin/perf.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/admin/perf.md rename to site3/website/versioned_docs/version-4.16.4/admin/perf.md diff --git a/site3/website/versioned_docs/version-4.16.3/admin/placement.md b/site3/website/versioned_docs/version-4.16.4/admin/placement.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/admin/placement.md rename to 
site3/website/versioned_docs/version-4.16.4/admin/placement.md diff --git a/site3/website/versioned_docs/version-4.16.3/admin/upgrade.md b/site3/website/versioned_docs/version-4.16.4/admin/upgrade.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/admin/upgrade.md rename to site3/website/versioned_docs/version-4.16.4/admin/upgrade.md diff --git a/site3/website/versioned_docs/version-4.16.3/api/distributedlog-api.md b/site3/website/versioned_docs/version-4.16.4/api/distributedlog-api.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/api/distributedlog-api.md rename to site3/website/versioned_docs/version-4.16.4/api/distributedlog-api.md diff --git a/site3/website/versioned_docs/version-4.16.3/api/ledger-adv-api.md b/site3/website/versioned_docs/version-4.16.4/api/ledger-adv-api.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/api/ledger-adv-api.md rename to site3/website/versioned_docs/version-4.16.4/api/ledger-adv-api.md diff --git a/site3/website/versioned_docs/version-4.16.3/api/ledger-api.md b/site3/website/versioned_docs/version-4.16.4/api/ledger-api.md similarity index 99% rename from site3/website/versioned_docs/version-4.16.3/api/ledger-api.md rename to site3/website/versioned_docs/version-4.16.4/api/ledger-api.md index f94c364b2c7..7160c9e0b87 100644 --- a/site3/website/versioned_docs/version-4.16.3/api/ledger-api.md +++ b/site3/website/versioned_docs/version-4.16.4/api/ledger-api.md @@ -21,7 +21,7 @@ If you're using [Maven](https://maven.apache.org/), add this to your [`pom.xml`] ```xml -4.16.3 +4.16.4 @@ -37,7 +37,7 @@ shaded library, which relocate classes of protobuf and guava into a different na ```xml -4.16.3 +4.16.4 @@ -53,12 +53,12 @@ If you're using [Gradle](https://gradle.org/), add this to your [`build.gradle`] ```groovy dependencies { - compile group: 'org.apache.bookkeeper', name: 'bookkeeper-server', version: '4.16.3' + compile group: 'org.apache.bookkeeper', name: 'bookkeeper-server', version: '4.16.4' } // Alternatively: dependencies { - compile 'org.apache.bookkeeper:bookkeeper-server:4.16.3' + compile 'org.apache.bookkeeper:bookkeeper-server:4.16.4' } ``` diff --git a/site3/website/versioned_docs/version-4.16.3/api/overview.md b/site3/website/versioned_docs/version-4.16.4/api/overview.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/api/overview.md rename to site3/website/versioned_docs/version-4.16.4/api/overview.md diff --git a/site3/website/versioned_docs/version-4.16.3/deployment/kubernetes.md b/site3/website/versioned_docs/version-4.16.4/deployment/kubernetes.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/deployment/kubernetes.md rename to site3/website/versioned_docs/version-4.16.4/deployment/kubernetes.md diff --git a/site3/website/versioned_docs/version-4.16.3/deployment/manual.md b/site3/website/versioned_docs/version-4.16.4/deployment/manual.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/deployment/manual.md rename to site3/website/versioned_docs/version-4.16.4/deployment/manual.md diff --git a/site3/website/versioned_docs/version-4.16.3/development/codebase.md b/site3/website/versioned_docs/version-4.16.4/development/codebase.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/development/codebase.md rename to site3/website/versioned_docs/version-4.16.4/development/codebase.md diff --git 
a/site3/website/versioned_docs/version-4.16.3/development/protocol.md b/site3/website/versioned_docs/version-4.16.4/development/protocol.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/development/protocol.md rename to site3/website/versioned_docs/version-4.16.4/development/protocol.md diff --git a/site3/website/versioned_docs/version-4.16.3/getting-started/concepts.md b/site3/website/versioned_docs/version-4.16.4/getting-started/concepts.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/getting-started/concepts.md rename to site3/website/versioned_docs/version-4.16.4/getting-started/concepts.md diff --git a/site3/website/versioned_docs/version-4.16.3/getting-started/installation.md b/site3/website/versioned_docs/version-4.16.4/getting-started/installation.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/getting-started/installation.md rename to site3/website/versioned_docs/version-4.16.4/getting-started/installation.md diff --git a/site3/website/versioned_docs/version-4.16.3/getting-started/run-locally.md b/site3/website/versioned_docs/version-4.16.4/getting-started/run-locally.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/getting-started/run-locally.md rename to site3/website/versioned_docs/version-4.16.4/getting-started/run-locally.md diff --git a/site3/website/versioned_docs/version-4.16.3/overview/overview.md b/site3/website/versioned_docs/version-4.16.4/overview/overview.md similarity index 96% rename from site3/website/versioned_docs/version-4.16.3/overview/overview.md rename to site3/website/versioned_docs/version-4.16.4/overview/overview.md index 21166d428e6..c3b1b231b96 100644 --- a/site3/website/versioned_docs/version-4.16.3/overview/overview.md +++ b/site3/website/versioned_docs/version-4.16.4/overview/overview.md @@ -1,6 +1,6 @@ --- id: overview -title: Apache BookKeeper 4.16.3-SNAPSHOT +title: Apache BookKeeper 4.16.4-SNAPSHOT --- -This documentation is for Apache BookKeeper™ version 4.16.3. +This documentation is for Apache BookKeeper™ version 4.16.4. Apache BookKeeper™ is a scalable, fault-tolerant, low-latency storage service optimized for real-time workloads. It offers durability, replication, and strong consistency as essentials for building reliable real-time applications. @@ -39,7 +39,7 @@ Object/[BLOB](https://en.wikipedia.org/wiki/Binary_large_object) storage | Stori Learn more about Apache BookKeeper™ and what it can do for your organization: -- [Apache BookKeeper 4.16.3 Release Notes](/release-notes#4163) +- [Apache BookKeeper 4.16.4 Release Notes](/release-notes#4164) - [Java API docs]({{ site.javadoc_base_url }}) Or start [using](../getting-started/installation) Apache BookKeeper today. 
diff --git a/site3/website/versioned_docs/version-4.16.3/reference/cli.md b/site3/website/versioned_docs/version-4.16.4/reference/cli.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/reference/cli.md rename to site3/website/versioned_docs/version-4.16.4/reference/cli.md diff --git a/site3/website/versioned_docs/version-4.16.3/reference/config.md b/site3/website/versioned_docs/version-4.16.4/reference/config.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/reference/config.md rename to site3/website/versioned_docs/version-4.16.4/reference/config.md diff --git a/site3/website/versioned_docs/version-4.16.3/security/overview.md b/site3/website/versioned_docs/version-4.16.4/security/overview.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/security/overview.md rename to site3/website/versioned_docs/version-4.16.4/security/overview.md diff --git a/site3/website/versioned_docs/version-4.16.3/security/sasl.md b/site3/website/versioned_docs/version-4.16.4/security/sasl.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/security/sasl.md rename to site3/website/versioned_docs/version-4.16.4/security/sasl.md diff --git a/site3/website/versioned_docs/version-4.16.3/security/tls.md b/site3/website/versioned_docs/version-4.16.4/security/tls.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/security/tls.md rename to site3/website/versioned_docs/version-4.16.4/security/tls.md diff --git a/site3/website/versioned_docs/version-4.16.3/security/zookeeper.md b/site3/website/versioned_docs/version-4.16.4/security/zookeeper.md similarity index 100% rename from site3/website/versioned_docs/version-4.16.3/security/zookeeper.md rename to site3/website/versioned_docs/version-4.16.4/security/zookeeper.md diff --git a/site3/website/versioned_sidebars/version-4.16.3-sidebars.json b/site3/website/versioned_sidebars/version-4.16.4-sidebars.json similarity index 68% rename from site3/website/versioned_sidebars/version-4.16.3-sidebars.json rename to site3/website/versioned_sidebars/version-4.16.4-sidebars.json index 970cb708173..b18efb25991 100644 --- a/site3/website/versioned_sidebars/version-4.16.3-sidebars.json +++ b/site3/website/versioned_sidebars/version-4.16.4-sidebars.json @@ -2,7 +2,7 @@ "docsSidebar": [ { "type": "doc", - "id": "version-4.16.3/overview/overview", + "id": "version-4.16.4/overview/overview", "label": "Overview" }, { @@ -12,17 +12,17 @@ "items": [ { "type": "doc", - "id": "version-4.16.3/getting-started/installation", + "id": "version-4.16.4/getting-started/installation", "label": "Installation" }, { "type": "doc", - "id": "version-4.16.3/getting-started/run-locally", + "id": "version-4.16.4/getting-started/run-locally", "label": "Run bookies locally" }, { "type": "doc", - "id": "version-4.16.3/getting-started/concepts", + "id": "version-4.16.4/getting-started/concepts", "label": "Concepts and architecture" } ] @@ -33,12 +33,12 @@ "items": [ { "type": "doc", - "id": "version-4.16.3/deployment/manual", + "id": "version-4.16.4/deployment/manual", "label": "Manual deployment" }, { "type": "doc", - "id": "version-4.16.3/deployment/kubernetes", + "id": "version-4.16.4/deployment/kubernetes", "label": "BookKeeper on Kubernetes" } ] @@ -49,32 +49,32 @@ "items": [ { "type": "doc", - "id": "version-4.16.3/admin/bookies", + "id": "version-4.16.4/admin/bookies", "label": "BookKeeper administration" }, { "type": "doc", - "id": 
"version-4.16.3/admin/autorecovery", + "id": "version-4.16.4/admin/autorecovery", "label": "AutoRecovery" }, { "type": "doc", - "id": "version-4.16.3/admin/metrics", + "id": "version-4.16.4/admin/metrics", "label": "Metrics collection" }, { "type": "doc", - "id": "version-4.16.3/admin/upgrade", + "id": "version-4.16.4/admin/upgrade", "label": "Upgrade" }, { "type": "doc", - "id": "version-4.16.3/admin/http", + "id": "version-4.16.4/admin/http", "label": "Admin REST API" }, { "type": "doc", - "id": "version-4.16.3/admin/decomission", + "id": "version-4.16.4/admin/decomission", "label": "Decommissioning Bookies" } ] @@ -85,22 +85,22 @@ "items": [ { "type": "doc", - "id": "version-4.16.3/api/overview", + "id": "version-4.16.4/api/overview", "label": "Overview" }, { "type": "doc", - "id": "version-4.16.3/api/ledger-api", + "id": "version-4.16.4/api/ledger-api", "label": "Ledger API" }, { "type": "doc", - "id": "version-4.16.3/api/ledger-adv-api", + "id": "version-4.16.4/api/ledger-adv-api", "label": "Advanced Ledger API" }, { "type": "doc", - "id": "version-4.16.3/api/distributedlog-api", + "id": "version-4.16.4/api/distributedlog-api", "label": "DistributedLog" } ] @@ -111,22 +111,22 @@ "items": [ { "type": "doc", - "id": "version-4.16.3/security/overview", + "id": "version-4.16.4/security/overview", "label": "Overview" }, { "type": "doc", - "id": "version-4.16.3/security/tls", + "id": "version-4.16.4/security/tls", "label": "TLS Authentication" }, { "type": "doc", - "id": "version-4.16.3/security/sasl", + "id": "version-4.16.4/security/sasl", "label": "SASL Authentication" }, { "type": "doc", - "id": "version-4.16.3/security/zookeeper", + "id": "version-4.16.4/security/zookeeper", "label": "ZooKeeper Authentication" } ] @@ -137,7 +137,7 @@ "items": [ { "type": "doc", - "id": "version-4.16.3/development/protocol", + "id": "version-4.16.4/development/protocol", "label": "BookKeeper protocol" } ] @@ -148,12 +148,12 @@ "items": [ { "type": "doc", - "id": "version-4.16.3/reference/config", + "id": "version-4.16.4/reference/config", "label": "Configuration" }, { "type": "doc", - "id": "version-4.16.3/reference/cli", + "id": "version-4.16.4/reference/cli", "label": "Command-line tools" } ] diff --git a/site3/website/versions.json b/site3/website/versions.json index fb1e43169c7..2b8455dbdbd 100644 --- a/site3/website/versions.json +++ b/site3/website/versions.json @@ -1,5 +1,5 @@ [ - "4.16.3", + "4.16.4", "4.15.5", "4.14.8", "4.13.0", From 7b1912f11c40978d6ee5920785dac69b0b85ea0b Mon Sep 17 00:00:00 2001 From: Hang Chen Date: Mon, 29 Jan 2024 11:43:48 +0800 Subject: [PATCH 2/5] Generate 4.16.4 release note (#4182) ### Motivation Generate BookKeeper 4.16.4 release note --- site3/website/src/pages/release-notes.md | 42 ++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/site3/website/src/pages/release-notes.md b/site3/website/src/pages/release-notes.md index 30bf767574c..0bd6217203e 100644 --- a/site3/website/src/pages/release-notes.md +++ b/site3/website/src/pages/release-notes.md @@ -1,6 +1,48 @@ # Release notes +## 4.16.4 + +Release 4.16.4 includes multiple bug fixes and improvements, also we have a few dependency updates. + +Apache BookKeeper users are encouraged to upgrade to 4.16.4 if you are using 4.16.x. +The technical details of this release are summarized below. 
+ +### Highlights + +#### Bugs +* Fix calculate checkSum when using Java9IntHash [PR #4140](https://github.com/apache/bookkeeper/pull/4140) +* Fix the autorecovery failed replicate by add entry fenced error [PR #4163](https://github.com/apache/bookkeeper/pull/4163) +* Fixing memory leak error when using DirectEntryLogger [PR #4135](https://github.com/apache/bookkeeper/pull/4135) +* Fix bug of negative JournalQueueSize [PR #4077](https://github.com/apache/bookkeeper/pull/4077) +* Fix NoSuchElementException when rereplicate empty ledgers [PR #4039](https://github.com/apache/bookkeeper/pull/4039) +* Change the method getUnderreplicatedFragments to the package private [PR #4174](https://github.com/apache/bookkeeper/pull/4174) +* Fix auditor elector executor block problem. [PR #4165](https://github.com/apache/bookkeeper/pull/4165) +* Fix auditor thread leak problem. [PR #4162](https://github.com/apache/bookkeeper/pull/4162) +* Use Flaky flag to skip testBookieServerZKSessionExpireBehaviour test [PR #4144](https://github.com/apache/bookkeeper/pull/4144) +* Add ledgersCount.incrementAndGet in setExplicitLac function [PR #4138](https://github.com/apache/bookkeeper/pull/4138) +* Fix no known bookies after reset racks for all BKs [PR #4128](https://github.com/apache/bookkeeper/pull/4128) +* Fix a slow gc thread shutdown when compacting [PR #4127](https://github.com/apache/bookkeeper/pull/4127) +* Remove the unused logs in the CleanupLedgerManager.recordPromise [PR #4121](https://github.com/apache/bookkeeper/pull/4121) +* Fix Flaky-test: HandleFailuresTest.testHandleFailureBookieNotInWriteSet [PR #4110](https://github.com/apache/bookkeeper/pull/4110) +* Ignore the empty `perRegionPlacement` when RegionAwareEnsemblePlacementPolicy#newEnsemble [PR #4106](https://github.com/apache/bookkeeper/pull/4106) +* Fix LedgerHandle `ensembleChangeCounter` not used. [PR #4103](https://github.com/apache/bookkeeper/pull/4103) +* Tune the TestReplicationWorker test. [PR #4093](https://github.com/apache/bookkeeper/pull/4093) +* Make AuditorBookieTest#waitForNewAuditor stronger. [PR #4078](https://github.com/apache/bookkeeper/pull/4078) +* Print compaction progress [PR #4071](https://github.com/apache/bookkeeper/pull/4071) +* Fix readEntry parameter order [PR #4059](https://github.com/apache/bookkeeper/pull/4059) +* Skip sync the RocksDB when no changes [PR #3904](https://github.com/apache/bookkeeper/pull/3904) +* Try to use jdk api to create hardlink when rename file when compaction. [PR #3876](https://github.com/apache/bookkeeper/pull/3876) + +#### Dependency updates +* Upgrade Zookeeper to 3.8.3 to address CVE-2023-44981 [PR #4112](https://github.com/apache/bookkeeper/pull/4112) +* Update Jetty dependency [PR #4141](https://github.com/apache/bookkeeper/pull/4141) +* Upgrade bc-fips to 1.0.2.4 to fix CVE-2022-45146 [PR #3915](https://github.com/apache/bookkeeper/pull/3915) + +#### Details + +https://github.com/apache/bookkeeper/pulls?q=is%3Apr+label%3Arelease%2F4.16.4+is%3Amerged+ + ## 4.15.5 Release 4.15.5 includes multiple bug fixes and improvements, also we have a few dependency updates. From 1eceb5dd2180ab57e5b21be968206292bb825177 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=A4=E6=9C=88?= <974226358@qq.com> Date: Mon, 29 Jan 2024 20:19:44 +0800 Subject: [PATCH 3/5] [improve][docker] fix yaml and dockerfile (#4186) Descriptions of the changes in this PR: 1. There are multiple spec.ports fields in sts yaml 2. 
The RUN command specified in the Dockerfile is not standardized Master Issue: #4185 --- deploy/kubernetes/gke/bookkeeper.statefulset.yml | 1 - tests/docker-images/statestore-image/Dockerfile | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/deploy/kubernetes/gke/bookkeeper.statefulset.yml b/deploy/kubernetes/gke/bookkeeper.statefulset.yml index 49548f09d05..ca32b91137d 100644 --- a/deploy/kubernetes/gke/bookkeeper.statefulset.yml +++ b/deploy/kubernetes/gke/bookkeeper.statefulset.yml @@ -124,7 +124,6 @@ metadata: app: bookkeeper component: bookie spec: - ports: ports: - name: bookie port: 3181 diff --git a/tests/docker-images/statestore-image/Dockerfile b/tests/docker-images/statestore-image/Dockerfile index 7605541188d..e89175be811 100644 --- a/tests/docker-images/statestore-image/Dockerfile +++ b/tests/docker-images/statestore-image/Dockerfile @@ -40,10 +40,10 @@ RUN set -x \ && apt-get install -y --no-install-recommends python3 pip \ && ln -s /usr/bin/python3 /usr/bin/python \ && apt-get install -y --no-install-recommends gpg gpg-agent wget sudo \ - && apt-get -y --purge autoremove \ - && apt-get autoclean \ - && apt-get clean \ - && rm -rf /var/lib/apt/lists/* + && apt-get -y --purge autoremove \ + && apt-get autoclean \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* \ && pip install zk-shell \ && mkdir -pv /opt \ && cd /opt From dc2bb1dfed38d7f9ad7a29fc92bd907e51af5b21 Mon Sep 17 00:00:00 2001 From: Hang Chen Date: Thu, 1 Feb 2024 14:49:07 +0800 Subject: [PATCH 4/5] Enable reorder read sequence for bk client by default (#4139) ### Motivation If a ledger's ensemble is [bk0, bk1] and bk0 is down, the bookie client may send the read request to bk0 first, fail with the following errors, and only then resend the read request to bk1. ``` 2023-10-19T18:33:52,042 - ERROR - [BookKeeperClientWorker-OrderedExecutor-3-0:PerChannelBookieClient@563] - Cannot connect to 192.168.31.216:3181 as endpoint resolution failed (probably bookie is down) err org.apache.bookkeeper.proto.BookieAddressResolver$BookieIdNotResolvedException: Cannot resolve bookieId 192.168.31.216:3181, bookie does not exist or it is not running 2023-10-19T18:33:52,042 - INFO - [BookKeeperClientWorker-OrderedExecutor-3-0:DefaultBookieAddressResolver@77] - Cannot resolve 192.168.31.216:3181, bookie is unknown org.apache.bookkeeper.client.BKException$BKBookieHandleNotAvailableException: Bookie handle is not available 2023-10-19T18:33:52,042 - INFO - [BookKeeperClientWorker-OrderedExecutor-3-0:PendingReadOp$LedgerEntryRequest@223] - Error: Bookie handle is not available while reading L6 E40 from bookie: 192.168.31.216:3181 ``` One related issue shows up in auto-recovery during decommissioning, and there is a PR for it in the BookKeeper repo: https://github.com/apache/bookkeeper/pull/4113 However, the bookie client already knows that bk0 is down, so it should send the read request to bk1 first. We can therefore reorder the read sequence based on the known bookie list: if a bookie is lost, it is moved to the end of the read order. ### Modifications Enable `reorderReadSequence` by default for the BookKeeper client, so auto-recovery benefits from it as well.
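For illustration, a minimal client-side sketch of how this option can be controlled explicitly. It assumes the `setReorderReadSequenceEnabled` setter that pairs with the `isReorderReadSequenceEnabled` getter changed in the diff below, and the metadata service URI is a placeholder; with this patch the default is already `true`, so passing `false` restores the previous fixed read order.

```java
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;

public class ReorderReadSequenceExample {
    public static void main(String[] args) throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        // Placeholder metadata service URI; point it at your own ZooKeeper ensemble.
        conf.setMetadataServiceUri("zk+hierarchical://localhost:2181/ledgers");
        // Assumed setter paired with isReorderReadSequenceEnabled(); after this patch
        // the default is true, so pass false to keep the old fixed read order.
        conf.setReorderReadSequenceEnabled(true);

        BookKeeper bk = new BookKeeper(conf);
        // Reads issued through handles from this client reorder the replica
        // read sequence using the bookie health information the client tracks.
        bk.close();
    }
}
```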
--- .../org/apache/bookkeeper/conf/ClientConfiguration.java | 2 +- .../apache/bookkeeper/client/MockBookKeeperTestCase.java | 4 +++- .../bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java | 7 ++++--- conf/bk_server.conf | 3 +++ 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 297a2f62f47..66dc160fd5f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -1148,7 +1148,7 @@ public ClientConfiguration setRecoveryReadBatchSize(int batchSize) { * @return true if reorder read sequence is enabled, otherwise false. */ public boolean isReorderReadSequenceEnabled() { - return getBoolean(REORDER_READ_SEQUENCE_ENABLED, false); + return getBoolean(REORDER_READ_SEQUENCE_ENABLED, true); } /** diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java index e7710124707..f43b6136c81 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java @@ -96,6 +96,7 @@ public abstract class MockBookKeeperTestCase { protected BookieClient bookieClient; protected LedgerManager ledgerManager; protected LedgerIdGenerator ledgerIdGenerator; + protected EnsemblePlacementPolicy placementPolicy; private BookieWatcher bookieWatcher; @@ -152,6 +153,7 @@ public void setup() throws Exception { scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(4).name("bk-test").build(); executor = OrderedExecutor.newBuilder().build(); bookieWatcher = mock(BookieWatcher.class); + placementPolicy = new DefaultEnsemblePlacementPolicy(); bookieClient = mock(BookieClient.class); ledgerManager = mock(LedgerManager.class); @@ -194,7 +196,7 @@ public BookieWatcher getBookieWatcher() { @Override public EnsemblePlacementPolicy getPlacementPolicy() { - return null; + return placementPolicy; } @Override diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java index 8e3cfd72e42..760f2490182 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java @@ -85,7 +85,7 @@ public class ReadLastConfirmedAndEntryOpTest { private ScheduledExecutorService scheduler; private OrderedScheduler orderedScheduler; private ClientInternalConf internalConf; - private EnsemblePlacementPolicy mockPlacementPolicy; + private EnsemblePlacementPolicy placementPolicy; private LedgerMetadata ledgerMetadata; private DistributionSchedule distributionSchedule; private DigestManager digestManager; @@ -121,10 +121,11 @@ public void setup() throws Exception { .build(); this.mockBookieClient = mock(BookieClient.class); - this.mockPlacementPolicy = mock(EnsemblePlacementPolicy.class); + //this.mockPlacementPolicy = mock(EnsemblePlacementPolicy.class); + this.placementPolicy = new DefaultEnsemblePlacementPolicy(); this.mockClientCtx = mock(ClientContext.class); 
when(mockClientCtx.getBookieClient()).thenReturn(mockBookieClient); - when(mockClientCtx.getPlacementPolicy()).thenReturn(mockPlacementPolicy); + when(mockClientCtx.getPlacementPolicy()).thenReturn(placementPolicy); when(mockClientCtx.getConf()).thenReturn(internalConf); when(mockClientCtx.getScheduler()).thenReturn(orderedScheduler); when(mockClientCtx.getMainWorkerPool()).thenReturn(orderedScheduler); diff --git a/conf/bk_server.conf b/conf/bk_server.conf index a391c1aa056..a36a2fbf971 100755 --- a/conf/bk_server.conf +++ b/conf/bk_server.conf @@ -1057,6 +1057,9 @@ statsProviderClass=org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvi # Enable/disable having read operations for a ledger to be sticky to a single bookie. stickyReadSEnabled=true +# Enable/disable reordering read sequence on reading entries. +reorderReadSequenceEnabled=true + # The grace period, in milliseconds, that the replication worker waits before fencing and # replicating a ledger fragment that's still being written to upon bookie failure. # openLedgerRereplicationGracePeriod=30000 From e1d72cf4ece2ac15e88cd025aa347125207ed8cc Mon Sep 17 00:00:00 2001 From: Yan Zhao Date: Mon, 5 Feb 2024 09:42:39 +0800 Subject: [PATCH 5/5] [BP-62] Refactor read op, and introduce batchReadOp. (#4190) ### Motivation This is the fourth PR for the batch read(https://github.com/apache/bookkeeper/pull/4051) feature. Refactor read op, extract ReadOpBase. Introduce batchedReadOp. --- .../bookkeeper/client/BatchedReadOp.java | 321 +++++++++++ .../client/ListenerBasedPendingReadOp.java | 2 +- .../bookkeeper/client/PendingReadOp.java | 526 +++++------------- .../apache/bookkeeper/client/ReadOpBase.java | 293 ++++++++++ .../bookkeeper/client/TestParallelRead.java | 17 +- 5 files changed, 760 insertions(+), 399 deletions(-) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java new file mode 100644 index 00000000000..4892882e1d1 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java @@ -0,0 +1,321 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.client; + +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.BatchedReadEntryCallback; +import org.apache.bookkeeper.proto.checksum.DigestManager; +import org.apache.bookkeeper.util.ByteBufList; +import org.apache.bookkeeper.util.MathUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BatchedReadOp extends ReadOpBase implements BatchedReadEntryCallback { + + private static final Logger LOG = LoggerFactory.getLogger(BatchedReadOp.class); + + final int maxCount; + final long maxSize; + + BatchedLedgerEntryRequest request; + + BatchedReadOp(LedgerHandle lh, + ClientContext clientCtx, + long startEntryId, + int maxCount, + long maxSize, + boolean isRecoveryRead) { + super(lh, clientCtx, startEntryId, -1L, isRecoveryRead); + this.maxCount = maxCount; + this.maxSize = maxSize; + } + + @Override + void initiate() { + this.requestTimeNanos = MathUtils.nowInNano(); + List ensemble = getLedgerMetadata().getEnsembleAt(startEntryId); + request = new SequenceReadRequest(ensemble, lh.ledgerId, startEntryId, maxCount, maxSize); + request.read(); + if (clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) { + speculativeTask = clientCtx.getConf().readSpeculativeRequestPolicy.get() + .initiateSpeculativeRequest(clientCtx.getScheduler(), request); + } + } + + @Override + protected void submitCallback(int code) { + // ensure callback once + if (!complete.compareAndSet(false, true)) { + return; + } + + cancelSpeculativeTask(true); + + long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); + if (code != BKException.Code.OK) { + LOG.error( + "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, " + + "Heard from {} : bitset = {}, Error = '{}'. 
First unread entry is ({}, rc = {})", + lh.getId(), startEntryId, endEntryId, sentToHosts, heardFromHosts, heardFromHostsBitSet, + BKException.getMessage(code), startEntryId, code); + clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS); + // release the entries + + request.close(); + future.completeExceptionally(BKException.create(code)); + } else { + clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS); + future.complete(LedgerEntriesImpl.create(request.entries)); + } + } + + @Override + public void readEntriesComplete(int rc, long ledgerId, long startEntryId, ByteBufList bufList, Object ctx) { + final ReadContext rctx = (ReadContext) ctx; + final BatchedLedgerEntryRequest entry = (BatchedLedgerEntryRequest) rctx.entry; + + if (rc != BKException.Code.OK) { + entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc); + return; + } + + heardFromHosts.add(rctx.to); + heardFromHostsBitSet.set(rctx.bookieIndex, true); + + bufList.retain(); + // if entry has completed don't handle twice + if (entry.complete(rctx.bookieIndex, rctx.to, bufList)) { + if (!isRecoveryRead) { + // do not advance LastAddConfirmed for recovery reads + lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L); + } + submitCallback(BKException.Code.OK); + } else { + bufList.release(); + } + } + + void sendReadTo(int bookieIndex, BookieId to, BatchedLedgerEntryRequest entry) throws InterruptedException { + if (lh.throttler != null) { + lh.throttler.acquire(); + } + if (isRecoveryRead) { + int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING; + clientCtx.getBookieClient().batchReadEntries(to, lh.ledgerId, entry.eId, + maxCount, maxSize, this, new ReadContext(bookieIndex, to, entry), flags, lh.ledgerKey); + } else { + clientCtx.getBookieClient().batchReadEntries(to, lh.ledgerId, entry.eId, maxCount, maxSize, + this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE); + } + } + + abstract class BatchedLedgerEntryRequest extends LedgerEntryRequest { + + //Indicate which ledger the BatchedLedgerEntryRequest is reading. + final long lId; + final int maxCount; + final long maxSize; + + final List entries; + + BatchedLedgerEntryRequest(List ensemble, long lId, long eId, int maxCount, long maxSize) { + super(ensemble, eId); + this.lId = lId; + this.maxCount = maxCount; + this.maxSize = maxSize; + this.entries = new ArrayList<>(maxCount); + } + + boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) { + if (isComplete()) { + return false; + } + if (!complete.getAndSet(true)) { + for (int i = 0; i < bufList.size(); i++) { + ByteBuf buffer = bufList.getBuffer(i); + ByteBuf content; + try { + content = lh.macManager.verifyDigestAndReturnData(eId + i, buffer); + } catch (BKException.BKDigestMatchException e) { + clientCtx.getClientStats().getReadOpDmCounter().inc(); + logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", + BKException.Code.DigestMatchException); + return false; + } + rc = BKException.Code.OK; + /* + * The length is a long and it is the last field of the metadata of an entry. + * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length. 
+ */ + LedgerEntryImpl entryImpl = LedgerEntryImpl.create(lh.ledgerId, startEntryId + i); + entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8)); + entryImpl.setEntryBuf(content); + entries.add(entryImpl); + } + writeSet.recycle(); + return true; + } else { + writeSet.recycle(); + return false; + } + } + + @Override + public String toString() { + return String.format("L%d-E%d~%d s-%d", lh.getId(), eId, eId + maxCount, maxSize); + } + } + + class SequenceReadRequest extends BatchedLedgerEntryRequest { + + static final int NOT_FOUND = -1; + int nextReplicaIndexToReadFrom = 0; + final BitSet sentReplicas; + final BitSet erroredReplicas; + SequenceReadRequest(List ensemble, + long lId, + long eId, + int maxCount, + long maxSize) { + super(ensemble, lId, eId, maxCount, maxSize); + this.sentReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize()); + this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize()); + } + + @Override + void read() { + sendNextRead(); + } + + private synchronized int getNextReplicaIndexToReadFrom() { + return nextReplicaIndexToReadFrom; + } + + private BitSet getSentToBitSet() { + BitSet b = new BitSet(ensemble.size()); + + for (int i = 0; i < sentReplicas.length(); i++) { + if (sentReplicas.get(i)) { + b.set(writeSet.get(i)); + } + } + return b; + } + + private boolean readsOutstanding() { + return (sentReplicas.cardinality() - erroredReplicas.cardinality()) > 0; + } + + @Override + synchronized BookieId maybeSendSpeculativeRead(BitSet heardFrom) { + if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) { + return null; + } + + BitSet sentTo = getSentToBitSet(); + sentTo.and(heardFrom); + + // only send another read if we have had no successful response at all + // (even for other entries) from any of the other bookies we have sent the + // request to + if (sentTo.cardinality() == 0) { + clientCtx.getClientStats().getSpeculativeReadCounter().inc(); + return sendNextRead(); + } else { + return null; + } + } + + synchronized BookieId sendNextRead() { + if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) { + // we are done, the read has failed from all replicas, just fail the + // read + fail(firstError); + return null; + } + + // ToDo: pick replica with writable PCBC. 
ISSUE #1239 + // https://github.com/apache/bookkeeper/issues/1239 + int replica = nextReplicaIndexToReadFrom; + int bookieIndex = writeSet.get(nextReplicaIndexToReadFrom); + nextReplicaIndexToReadFrom++; + + try { + BookieId to = ensemble.get(bookieIndex); + sendReadTo(bookieIndex, to, this); + sentToHosts.add(to); + sentReplicas.set(replica); + return to; + } catch (InterruptedException ie) { + LOG.error("Interrupted reading entry " + this, ie); + Thread.currentThread().interrupt(); + fail(BKException.Code.InterruptedException); + return null; + } + } + + @Override + synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { + super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc); + int replica = writeSet.indexOf(bookieIndex); + if (replica == NOT_FOUND) { + LOG.error("Received error from a host which is not in the ensemble {} {}.", host, ensemble); + return; + } + erroredReplicas.set(replica); + if (isRecoveryRead && (numBookiesMissingEntry >= requiredBookiesMissingEntryForRecovery)) { + /* For recovery, report NoSuchEntry as soon as wQ-aQ+1 bookies report that they do not + * have the entry */ + fail(BKException.Code.NoSuchEntryException); + return; + } + + if (!readsOutstanding()) { + sendNextRead(); + } + } + + @Override + boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) { + boolean completed = super.complete(bookieIndex, host, bufList); + if (completed) { + int numReplicasTried = getNextReplicaIndexToReadFrom(); + // Check if any speculative reads were issued and mark any slow bookies before + // the first successful speculative read as "slow" + for (int i = 0; i < numReplicasTried - 1; i++) { + int slowBookieIndex = writeSet.get(i); + BookieId slowBookieSocketAddress = ensemble.get(slowBookieIndex); + clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, eId); + } + } + return completed; + } + } +} \ No newline at end of file diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java index 6733b2e9ea9..fedb79696a9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java @@ -45,7 +45,7 @@ class ListenerBasedPendingReadOp extends PendingReadOp { @Override protected void submitCallback(int code) { - LedgerEntryRequest request; + SingleLedgerEntryRequest request; while (!seq.isEmpty() && (request = seq.getFirst()) != null) { if (!request.isComplete()) { return; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index 73715859c0d..15d48c64351 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -21,27 +21,16 @@ package org.apache.bookkeeper.client; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; import io.netty.buffer.ByteBuf; import java.util.BitSet; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledFuture; import 
java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.bookkeeper.client.BKException.BKDigestMatchException; -import org.apache.bookkeeper.client.api.LedgerEntries; -import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx; import org.apache.bookkeeper.proto.checksum.DigestManager; import org.apache.bookkeeper.util.MathUtils; import org.slf4j.Logger; @@ -54,85 +43,165 @@ * application as soon as it arrives rather than waiting for the whole thing. * */ -class PendingReadOp implements ReadEntryCallback, Runnable { +class PendingReadOp extends ReadOpBase implements ReadEntryCallback { private static final Logger LOG = LoggerFactory.getLogger(PendingReadOp.class); - private ScheduledFuture speculativeTask = null; - protected final LinkedList seq; - private final CompletableFuture future; - private final Set heardFromHosts; - private final BitSet heardFromHostsBitSet; - private final Set sentToHosts = new HashSet(); - LedgerHandle lh; - final ClientContext clientCtx; + protected boolean parallelRead = false; + protected final LinkedList seq; - long numPendingEntries; - final long startEntryId; - final long endEntryId; - long requestTimeNanos; + PendingReadOp(LedgerHandle lh, + ClientContext clientCtx, + long startEntryId, + long endEntryId, + boolean isRecoveryRead) { + super(lh, clientCtx, startEntryId, endEntryId, isRecoveryRead); + this.seq = new LinkedList<>(); + numPendingEntries = endEntryId - startEntryId + 1; + } + + PendingReadOp parallelRead(boolean enabled) { + this.parallelRead = enabled; + return this; + } + + void initiate() { + long nextEnsembleChange = startEntryId, i = startEntryId; + this.requestTimeNanos = MathUtils.nowInNano(); + List ensemble = null; + do { + if (i == nextEnsembleChange) { + ensemble = getLedgerMetadata().getEnsembleAt(i); + nextEnsembleChange = LedgerMetadataUtils.getNextEnsembleChange(getLedgerMetadata(), i); + } + SingleLedgerEntryRequest entry; + if (parallelRead) { + entry = new ParallelReadRequest(ensemble, lh.ledgerId, i); + } else { + entry = new SequenceReadRequest(ensemble, lh.ledgerId, i); + } + seq.add(entry); + i++; + } while (i <= endEntryId); + // read the entries. 
+ for (LedgerEntryRequest entry : seq) { + entry.read(); + if (!parallelRead && clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) { + speculativeTask = clientCtx.getConf().readSpeculativeRequestPolicy.get() + .initiateSpeculativeRequest(clientCtx.getScheduler(), entry); + } + } + } - final int requiredBookiesMissingEntryForRecovery; - final boolean isRecoveryRead; + @Override + public void readEntryComplete(int rc, long ledgerId, final long entryId, final ByteBuf buffer, Object ctx) { + final ReadContext rctx = (ReadContext) ctx; + final SingleLedgerEntryRequest entry = (SingleLedgerEntryRequest) rctx.entry; + + if (rc != BKException.Code.OK) { + entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc); + return; + } + + heardFromHosts.add(rctx.to); + heardFromHostsBitSet.set(rctx.bookieIndex, true); + + buffer.retain(); + // if entry has completed don't handle twice + if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) { + if (!isRecoveryRead) { + // do not advance LastAddConfirmed for recovery reads + lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L); + } + submitCallback(BKException.Code.OK); + } else { + buffer.release(); + } + + if (numPendingEntries < 0) { + LOG.error("Read too many values for ledger {} : [{}, {}].", + ledgerId, startEntryId, endEntryId); + } + + } + + protected void submitCallback(int code) { + if (BKException.Code.OK == code) { + numPendingEntries--; + if (numPendingEntries != 0) { + return; + } + } + + // ensure callback once + if (!complete.compareAndSet(false, true)) { + return; + } - boolean parallelRead = false; - final AtomicBoolean complete = new AtomicBoolean(false); - boolean allowFailFast = false; + cancelSpeculativeTask(true); - abstract class LedgerEntryRequest implements SpeculativeRequestExecutor, AutoCloseable { + long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); + if (code != BKException.Code.OK) { + long firstUnread = LedgerHandle.INVALID_ENTRY_ID; + Integer firstRc = null; + for (LedgerEntryRequest req : seq) { + if (!req.isComplete()) { + firstUnread = req.eId; + firstRc = req.rc; + break; + } + } + LOG.error( + "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, " + + "Heard from {} : bitset = {}, Error = '{}'. 
First unread entry is ({}, rc = {})", + lh.getId(), startEntryId, endEntryId, sentToHosts, heardFromHosts, heardFromHostsBitSet, + BKException.getMessage(code), firstUnread, firstRc); + clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS); + // release the entries + seq.forEach(LedgerEntryRequest::close); + future.completeExceptionally(BKException.create(code)); + } else { + clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS); + future.complete(LedgerEntriesImpl.create(Lists.transform(seq, input -> input.entryImpl))); + } + } - final AtomicBoolean complete = new AtomicBoolean(false); + void sendReadTo(int bookieIndex, BookieId to, SingleLedgerEntryRequest entry) throws InterruptedException { + if (lh.throttler != null) { + lh.throttler.acquire(); + } - int rc = BKException.Code.OK; - int firstError = BKException.Code.OK; - int numBookiesMissingEntry = 0; + if (isRecoveryRead) { + int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING; + clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, + this, new ReadContext(bookieIndex, to, entry), flags, lh.ledgerKey); + } else { + clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, + this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE); + } + } - final List ensemble; - final DistributionSchedule.WriteSet writeSet; + abstract class SingleLedgerEntryRequest extends LedgerEntryRequest { final LedgerEntryImpl entryImpl; - final long eId; - LedgerEntryRequest(List ensemble, long lId, long eId) { + SingleLedgerEntryRequest(List ensemble, long lId, long eId) { + super(ensemble, eId); this.entryImpl = LedgerEntryImpl.create(lId, eId); - this.ensemble = ensemble; - this.eId = eId; - - if (clientCtx.getConf().enableReorderReadSequence) { - writeSet = clientCtx.getPlacementPolicy() - .reorderReadSequence( - ensemble, - lh.getBookiesHealthInfo(), - lh.getWriteSetForReadOperation(eId)); - } else { - writeSet = lh.getWriteSetForReadOperation(eId); - } } @Override public void close() { - // this request has succeeded before, can't recycle writeSet again - if (complete.compareAndSet(false, true)) { - rc = BKException.Code.UnexpectedConditionException; - writeSet.recycle(); - } + super.close(); entryImpl.close(); } - /** - * Execute the read request. - */ - abstract void read(); - /** * Complete the read request from host. 
* - * @param bookieIndex - * bookie index - * @param host - * host that respond the read - * @param buffer - * the data buffer + * @param bookieIndex bookie index + * @param host host that respond the read + * @param buffer the data buffer * @return return true if we managed to complete the entry; - * otherwise return false if the read entry is not complete or it is already completed before + * otherwise return false if the read entry is not complete or it is already completed before */ boolean complete(int bookieIndex, BookieId host, final ByteBuf buffer) { ByteBuf content; @@ -141,7 +210,7 @@ boolean complete(int bookieIndex, BookieId host, final ByteBuf buffer) { } try { content = lh.macManager.verifyDigestAndReturnData(eId, buffer); - } catch (BKDigestMatchException e) { + } catch (BKException.BKDigestMatchException e) { clientCtx.getClientStats().getReadOpDmCounter().inc(); logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException); return false; @@ -161,125 +230,9 @@ boolean complete(int bookieIndex, BookieId host, final ByteBuf buffer) { return false; } } - - /** - * Fail the request with given result code rc. - * - * @param rc - * result code to fail the request. - * @return true if we managed to fail the entry; otherwise return false if it already failed or completed. - */ - boolean fail(int rc) { - if (complete.compareAndSet(false, true)) { - this.rc = rc; - submitCallback(rc); - return true; - } else { - return false; - } - } - - /** - * Log error errMsg and reattempt read from host. - * - * @param bookieIndex - * bookie index - * @param host - * host that just respond - * @param errMsg - * error msg to log - * @param rc - * read result code - */ - synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { - if (BKException.Code.OK == firstError - || BKException.Code.NoSuchEntryException == firstError - || BKException.Code.NoSuchLedgerExistsException == firstError) { - firstError = rc; - } else if (BKException.Code.BookieHandleNotAvailableException == firstError - && BKException.Code.NoSuchEntryException != rc - && BKException.Code.NoSuchLedgerExistsException != rc) { - // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is - // returned we need to update firstError to indicate that it might be a valid read but just - // failed. - firstError = rc; - } - if (BKException.Code.NoSuchEntryException == rc - || BKException.Code.NoSuchLedgerExistsException == rc) { - ++numBookiesMissingEntry; - if (LOG.isDebugEnabled()) { - LOG.debug("No such entry found on bookie. L{} E{} bookie: {}", - lh.ledgerId, eId, host); - } - } else { - if (LOG.isInfoEnabled()) { - LOG.info("{} while reading L{} E{} from bookie: {}", - errMsg, lh.ledgerId, eId, host); - } - } - - lh.recordReadErrorOnBookie(bookieIndex); - } - - /** - * Send to next replica speculatively, if required and possible. - * This returns the host we may have sent to for unit testing. - * - * @param heardFromHostsBitSet - * the set of hosts that we already received responses. - * @return host we sent to if we sent. null otherwise. - */ - abstract BookieId maybeSendSpeculativeRead(BitSet heardFromHostsBitSet); - - /** - * Whether the read request completed. - * - * @return true if the read request is completed. - */ - boolean isComplete() { - return complete.get(); - } - - /** - * Get result code of this entry. - * - * @return result code. 
- */ - int getRc() { - return rc; - } - - @Override - public String toString() { - return String.format("L%d-E%d", lh.getId(), eId); - } - - /** - * Issues a speculative request and indicates if more speculative - * requests should be issued. - * - * @return whether more speculative requests should be issued - */ - @Override - public ListenableFuture issueSpeculativeRequest() { - return clientCtx.getMainWorkerPool().submitOrdered(lh.getId(), new Callable() { - @Override - public Boolean call() throws Exception { - if (!isComplete() && null != maybeSendSpeculativeRead(heardFromHostsBitSet)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Send speculative read for {}. Hosts sent are {}, " - + " Hosts heard are {}, ensemble is {}.", - this, sentToHosts, heardFromHostsBitSet, ensemble); - } - return true; - } - return false; - } - }); - } } - class ParallelReadRequest extends LedgerEntryRequest { + class ParallelReadRequest extends SingleLedgerEntryRequest { int numPendings; @@ -326,7 +279,7 @@ BookieId maybeSendSpeculativeRead(BitSet heardFromHostsBitSet) { } } - class SequenceReadRequest extends LedgerEntryRequest { + class SequenceReadRequest extends SingleLedgerEntryRequest { static final int NOT_FOUND = -1; int nextReplicaIndexToReadFrom = 0; @@ -456,205 +409,4 @@ boolean complete(int bookieIndex, BookieId host, ByteBuf buffer) { return completed; } } - - PendingReadOp(LedgerHandle lh, - ClientContext clientCtx, - long startEntryId, - long endEntryId, - boolean isRecoveryRead) { - this.seq = new LinkedList<>(); - this.future = new CompletableFuture<>(); - this.lh = lh; - this.clientCtx = clientCtx; - this.startEntryId = startEntryId; - this.endEntryId = endEntryId; - this.isRecoveryRead = isRecoveryRead; - - this.allowFailFast = false; - numPendingEntries = endEntryId - startEntryId + 1; - requiredBookiesMissingEntryForRecovery = getLedgerMetadata().getWriteQuorumSize() - - getLedgerMetadata().getAckQuorumSize() + 1; - heardFromHosts = new HashSet<>(); - heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize()); - } - - CompletableFuture future() { - return future; - } - - protected LedgerMetadata getLedgerMetadata() { - return lh.getLedgerMetadata(); - } - - protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) { - if (speculativeTask != null) { - speculativeTask.cancel(mayInterruptIfRunning); - speculativeTask = null; - } - } - - public ScheduledFuture getSpeculativeTask() { - return speculativeTask; - } - - PendingReadOp parallelRead(boolean enabled) { - this.parallelRead = enabled; - return this; - } - - void allowFailFastOnUnwritableChannel() { - allowFailFast = true; - } - - public void submit() { - clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, this); - } - - void initiate() { - long nextEnsembleChange = startEntryId, i = startEntryId; - this.requestTimeNanos = MathUtils.nowInNano(); - List ensemble = null; - do { - if (i == nextEnsembleChange) { - ensemble = getLedgerMetadata().getEnsembleAt(i); - nextEnsembleChange = LedgerMetadataUtils.getNextEnsembleChange(getLedgerMetadata(), i); - } - LedgerEntryRequest entry; - if (parallelRead) { - entry = new ParallelReadRequest(ensemble, lh.ledgerId, i); - } else { - entry = new SequenceReadRequest(ensemble, lh.ledgerId, i); - } - seq.add(entry); - i++; - } while (i <= endEntryId); - // read the entries. 
-        for (LedgerEntryRequest entry : seq) {
-            entry.read();
-            if (!parallelRead && clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) {
-                speculativeTask = clientCtx.getConf().readSpeculativeRequestPolicy.get()
-                    .initiateSpeculativeRequest(clientCtx.getScheduler(), entry);
-            }
-        }
-    }
-
-    @Override
-    public void run() {
-        initiate();
-    }
-
-    private static class ReadContext implements ReadEntryCallbackCtx {
-        final int bookieIndex;
-        final BookieId to;
-        final LedgerEntryRequest entry;
-        long lac = LedgerHandle.INVALID_ENTRY_ID;
-
-        ReadContext(int bookieIndex, BookieId to, LedgerEntryRequest entry) {
-            this.bookieIndex = bookieIndex;
-            this.to = to;
-            this.entry = entry;
-        }
-
-        @Override
-        public void setLastAddConfirmed(long lac) {
-            this.lac = lac;
-        }
-
-        @Override
-        public long getLastAddConfirmed() {
-            return lac;
-        }
-    }
-
-    private static ReadContext createReadContext(int bookieIndex, BookieId to, LedgerEntryRequest entry) {
-        return new ReadContext(bookieIndex, to, entry);
-    }
-
-    void sendReadTo(int bookieIndex, BookieId to, LedgerEntryRequest entry) throws InterruptedException {
-        if (lh.throttler != null) {
-            lh.throttler.acquire();
-        }
-
-        if (isRecoveryRead) {
-            int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING;
-            clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
-                    this, new ReadContext(bookieIndex, to, entry), flags, lh.ledgerKey);
-        } else {
-            clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
-                    this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE);
-        }
-    }
-
-    @Override
-    public void readEntryComplete(int rc, long ledgerId, final long entryId, final ByteBuf buffer, Object ctx) {
-        final ReadContext rctx = (ReadContext) ctx;
-        final LedgerEntryRequest entry = rctx.entry;
-
-        if (rc != BKException.Code.OK) {
-            entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc);
-            return;
-        }
-
-        heardFromHosts.add(rctx.to);
-        heardFromHostsBitSet.set(rctx.bookieIndex, true);
-
-        buffer.retain();
-        // if entry has completed don't handle twice
-        if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) {
-            if (!isRecoveryRead) {
-                // do not advance LastAddConfirmed for recovery reads
-                lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L);
-            }
-            submitCallback(BKException.Code.OK);
-        } else {
-            buffer.release();
-        }
-
-        if (numPendingEntries < 0) {
-            LOG.error("Read too many values for ledger {} : [{}, {}].",
-                    ledgerId, startEntryId, endEntryId);
-        }
-    }
-
-    protected void submitCallback(int code) {
-        if (BKException.Code.OK == code) {
-            numPendingEntries--;
-            if (numPendingEntries != 0) {
-                return;
-            }
-        }
-
-        // ensure callback once
-        if (!complete.compareAndSet(false, true)) {
-            return;
-        }
-
-        cancelSpeculativeTask(true);
-
-        long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
-        if (code != BKException.Code.OK) {
-            long firstUnread = LedgerHandle.INVALID_ENTRY_ID;
-            Integer firstRc = null;
-            for (LedgerEntryRequest req : seq) {
-                if (!req.isComplete()) {
-                    firstUnread = req.eId;
-                    firstRc = req.rc;
-                    break;
-                }
-            }
-            LOG.error(
-                    "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, "
-                            + "Heard from {} : bitset = {}, Error = '{}'. First unread entry is ({}, rc = {})",
-                    lh.getId(), startEntryId, endEntryId, sentToHosts, heardFromHosts, heardFromHostsBitSet,
-                    BKException.getMessage(code), firstUnread, firstRc);
-            clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);
-            // release the entries
-            seq.forEach(LedgerEntryRequest::close);
-            future.completeExceptionally(BKException.create(code));
-        } else {
-            clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
-            future.complete(LedgerEntriesImpl.create(Lists.transform(seq, input -> input.entryImpl)));
-        }
-    }
 }
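
The net effect of the PendingReadOp.java changes above is that the per-entry plumbing (the abstract LedgerEntryRequest, the speculative-read hook, the completion state, and the read context) moves out of PendingReadOp into the new ReadOpBase introduced below, while PendingReadOp keeps only the concrete ParallelReadRequest and SequenceReadRequest strategies, now derived from SingleLedgerEntryRequest. The following is a minimal, self-contained sketch of that shape; it uses simplified placeholder types (EntryRequestSketch, byte[] payloads, integer result codes), not the real BookKeeper classes, so treat it as an illustration of the hierarchy rather than the actual API.

    // Illustrative sketch only; simplified stand-ins for the refactored classes,
    // not the real org.apache.bookkeeper.client types.
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicBoolean;

    abstract class ReadOpBaseSketch implements Runnable {
        // Shared completion state, mirroring what the patch hoists into ReadOpBase.
        protected final CompletableFuture<List<byte[]>> future = new CompletableFuture<>();
        protected final AtomicBoolean complete = new AtomicBoolean(false);

        // Per-entry request state, in the spirit of ReadOpBase.LedgerEntryRequest.
        abstract class EntryRequestSketch {
            final long entryId;
            EntryRequestSketch(long entryId) {
                this.entryId = entryId;
            }
            abstract void read();                        // issue the read(s) for this entry
            abstract boolean maybeSendSpeculativeRead(); // true if another replica was tried
        }

        @Override
        public void run() {
            initiate();
        }

        abstract void initiate();

        protected abstract void submitCallback(int code);
    }

    class PendingReadOpSketch extends ReadOpBaseSketch {
        private boolean parallel;

        PendingReadOpSketch parallelRead(boolean enabled) {
            this.parallel = enabled;
            return this;
        }

        // Concrete strategies, in the spirit of ParallelReadRequest / SequenceReadRequest.
        class ParallelSketch extends EntryRequestSketch {
            ParallelSketch(long entryId) {
                super(entryId);
            }
            @Override
            void read() {
                // a parallel request would fan out to every replica in the write set at once
            }
            @Override
            boolean maybeSendSpeculativeRead() {
                return false; // nothing left to speculate on; every replica was already asked
            }
        }

        class SequenceSketch extends EntryRequestSketch {
            SequenceSketch(long entryId) {
                super(entryId);
            }
            @Override
            void read() {
                // a sequential request would ask one replica and fall back on error or timeout
            }
            @Override
            boolean maybeSendSpeculativeRead() {
                return true; // pretend a read was issued to the next replica
            }
        }

        @Override
        void initiate() {
            EntryRequestSketch request = parallel ? new ParallelSketch(0L) : new SequenceSketch(0L);
            request.read();
        }

        @Override
        protected void submitCallback(int code) {
            if (!complete.compareAndSet(false, true)) {
                return; // only the first completion wins
            }
            if (code == 0) {
                future.complete(List.of());
            } else {
                future.completeExceptionally(new IllegalStateException("read failed, rc=" + code));
            }
        }
    }

The real classes carry far more state (write sets, digest verification, stats, fail-fast handling), but the inheritance shape above is the part the rest of this diff relies on.
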
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java
new file mode 100644
index 00000000000..cbd68ec657a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java
@@ -0,0 +1,293 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ReadOpBase implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReadOpBase.class);
+
+    protected ScheduledFuture<?> speculativeTask = null;
+    protected final CompletableFuture<LedgerEntries> future;
+    protected final Set<BookieId> heardFromHosts;
+    protected final BitSet heardFromHostsBitSet;
+    protected final Set<BookieId> sentToHosts = new HashSet<BookieId>();
+    LedgerHandle lh;
+    protected ClientContext clientCtx;
+
+    protected final long startEntryId;
+    protected long requestTimeNanos;
+
+    protected final int requiredBookiesMissingEntryForRecovery;
+    protected final boolean isRecoveryRead;
+
+    protected final AtomicBoolean complete = new AtomicBoolean(false);
+    protected boolean allowFailFast = false;
+    long numPendingEntries;
+    final long endEntryId;
+    protected ReadOpBase(LedgerHandle lh, ClientContext clientCtx, long startEntryId, long endEntryId,
+                         boolean isRecoveryRead) {
+        this.lh = lh;
+        this.future = new CompletableFuture<>();
+        this.startEntryId = startEntryId;
+        this.endEntryId = endEntryId;
+        this.isRecoveryRead = isRecoveryRead;
+        this.requiredBookiesMissingEntryForRecovery = getLedgerMetadata().getWriteQuorumSize()
+                - getLedgerMetadata().getAckQuorumSize() + 1;
+        this.heardFromHosts = new HashSet<>();
+        this.heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize());
+        this.allowFailFast = false;
+        this.clientCtx = clientCtx;
+    }
+
+    protected LedgerMetadata getLedgerMetadata() {
+        return lh.getLedgerMetadata();
+    }
+
+    protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
+        if (speculativeTask != null) {
+            speculativeTask.cancel(mayInterruptIfRunning);
+            speculativeTask = null;
+        }
+    }
+
+    public ScheduledFuture<?> getSpeculativeTask() {
+        return speculativeTask;
+    }
+
+    CompletableFuture<LedgerEntries> future() {
+        return future;
+    }
+
+    void allowFailFastOnUnwritableChannel() {
+        allowFailFast = true;
+    }
+
+    public void submit() {
+        clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, this);
+    }
+
+    @Override
+    public void run() {
+        initiate();
+    }
+
+    abstract void initiate();
+
+    protected abstract void submitCallback(int code);
+
+    abstract class LedgerEntryRequest implements SpeculativeRequestExecutor {
+
+        final AtomicBoolean complete = new AtomicBoolean(false);
+
+        int rc = BKException.Code.OK;
+        int firstError = BKException.Code.OK;
+        int numBookiesMissingEntry = 0;
+
+        final long eId;
+
+        final List<BookieId> ensemble;
+        final DistributionSchedule.WriteSet writeSet;
+
+
+        LedgerEntryRequest(List<BookieId> ensemble, final long eId) {
+            this.ensemble = ensemble;
+            this.eId = eId;
+            if (clientCtx.getConf().enableReorderReadSequence) {
+                writeSet = clientCtx.getPlacementPolicy()
+                        .reorderReadSequence(
+                                ensemble,
+                                lh.getBookiesHealthInfo(),
+                                lh.getWriteSetForReadOperation(eId));
+            } else {
+                writeSet = lh.getWriteSetForReadOperation(eId);
+            }
+        }
+
+        public void close() {
+            // this request has succeeded before, can't recycle writeSet again
+            if (complete.compareAndSet(false, true)) {
+                rc = BKException.Code.UnexpectedConditionException;
+                writeSet.recycle();
+            }
+        }
+
+        /**
+         * Execute the read request.
+         */
+        abstract void read();
+
+        /**
+         * Fail the request with given result code rc.
+         *
+         * @param rc
+         *          result code to fail the request.
+         * @return true if we managed to fail the entry; otherwise return false if it already failed or completed.
+         */
+        boolean fail(int rc) {
+            if (complete.compareAndSet(false, true)) {
+                this.rc = rc;
+                writeSet.recycle();
+                submitCallback(rc);
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        /**
+         * Log error errMsg and reattempt read from host.
+         *
+         * @param bookieIndex
+         *          bookie index
+         * @param host
+         *          host that just respond
+         * @param errMsg
+         *          error msg to log
+         * @param rc
+         *          read result code
+         */
+        synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) {
+            if (BKException.Code.OK == firstError
+                || BKException.Code.NoSuchEntryException == firstError
+                || BKException.Code.NoSuchLedgerExistsException == firstError) {
+                firstError = rc;
+            } else if (BKException.Code.BookieHandleNotAvailableException == firstError
+                       && BKException.Code.NoSuchEntryException != rc
+                       && BKException.Code.NoSuchLedgerExistsException != rc) {
+                // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is
+                // returned we need to update firstError to indicate that it might be a valid read but just
+                // failed.
+                firstError = rc;
+            }
+            if (BKException.Code.NoSuchEntryException == rc
+                || BKException.Code.NoSuchLedgerExistsException == rc) {
+                ++numBookiesMissingEntry;
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("No such entry found on bookie. L{} E{} bookie: {}",
+                            lh.ledgerId, eId, host);
+                }
+            } else {
+                if (LOG.isInfoEnabled()) {
+                    LOG.info("{} while reading L{} E{} from bookie: {}",
+                            errMsg, lh.ledgerId, eId, host);
+                }
+            }
+
+            lh.recordReadErrorOnBookie(bookieIndex);
+        }
+
+        /**
+         * Send to next replica speculatively, if required and possible.
+         * This returns the host we may have sent to for unit testing.
+         *
+         * @param heardFromHostsBitSet
+         *          the set of hosts that we already received responses.
+         * @return host we sent to if we sent. null otherwise.
+         */
+        abstract BookieId maybeSendSpeculativeRead(BitSet heardFromHostsBitSet);
+
+        /**
+         * Whether the read request completed.
+         *
+         * @return true if the read request is completed.
+         */
+        boolean isComplete() {
+            return complete.get();
+        }
+
+        /**
+         * Get result code of this entry.
+         *
+         * @return result code.
+         */
+        int getRc() {
+            return rc;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("L%d-E%d", lh.getId(), eId);
+        }
+
+        /**
+         * Issues a speculative request and indicates if more speculative
+         * requests should be issued.
+         *
+         * @return whether more speculative requests should be issued
+         */
+        @Override
+        public ListenableFuture<Boolean> issueSpeculativeRequest() {
+            return clientCtx.getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    if (!isComplete() && null != maybeSendSpeculativeRead(heardFromHostsBitSet)) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Send speculative read for {}. Hosts sent are {}, "
+                                    + " Hosts heard are {}, ensemble is {}.",
+                                this, sentToHosts, heardFromHostsBitSet, ensemble);
+                        }
+                        return true;
+                    }
+                    return false;
+                }
+            });
+        }
+    }
+
+    protected static class ReadContext implements BookkeeperInternalCallbacks.ReadEntryCallbackCtx {
+        final int bookieIndex;
+        final BookieId to;
+        final PendingReadOp.LedgerEntryRequest entry;
+        long lac = LedgerHandle.INVALID_ENTRY_ID;
+
+        ReadContext(int bookieIndex, BookieId to, PendingReadOp.LedgerEntryRequest entry) {
+            this.bookieIndex = bookieIndex;
+            this.to = to;
+            this.entry = entry;
+        }
+
+        @Override
+        public void setLastAddConfirmed(long lac) {
+            this.lac = lac;
+        }
+
+        @Override
+        public long getLastAddConfirmed() {
+            return lac;
+        }
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
index f2ada1e5dc8..423e02b4aad 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
@@ -33,7 +33,6 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import java.lang.reflect.Method;
 import java.util.Iterator;
 import java.util.List;
 import java.util.TreeMap;
@@ -268,8 +267,8 @@ public void testLedgerEntryRequestComplete() throws Exception {
         PendingReadOp pendingReadOp = new PendingReadOp(lh, clientContext, 1, 2, false);
         pendingReadOp.parallelRead(true);
         pendingReadOp.initiate();
-        PendingReadOp.LedgerEntryRequest first = pendingReadOp.seq.get(0);
-        PendingReadOp.LedgerEntryRequest second = pendingReadOp.seq.get(1);
+        PendingReadOp.SingleLedgerEntryRequest first = pendingReadOp.seq.get(0);
+        PendingReadOp.SingleLedgerEntryRequest second = pendingReadOp.seq.get(1);
 
         pendingReadOp.submitCallback(-105);
 
@@ -287,13 +286,9 @@ public void testLedgerEntryRequestComplete() throws Exception {
         assertTrue(second.complete.get());
 
         // Mock ledgerEntryImpl reuse
-        Method method = PendingReadOp.class.getDeclaredMethod("createReadContext",
-                int.class, BookieId.class, PendingReadOp.LedgerEntryRequest.class);
-        method.setAccessible(true);
-
         ByteBuf byteBuf = Unpooled.buffer(10);
         pendingReadOp.readEntryComplete(BKException.Code.OK, 1, 1, Unpooled.buffer(10),
-                method.invoke(pendingReadOp, 1, BookieId.parse("test"), first));
+                new ReadOpBase.ReadContext(1, BookieId.parse("test"), first));
 
         // byteBuf has been release
         assertEquals(byteBuf.refCnt(), 1);
@@ -308,15 +303,15 @@ public void testLedgerEntryRequestComplete() throws Exception {
 
         // read entry failed twice, will not close twice
         pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 1, Unpooled.buffer(10),
-                method.invoke(pendingReadOp, 1, BookieId.parse("test"), first));
+                new ReadOpBase.ReadContext(1, BookieId.parse("test"), first));
 
         pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 1, Unpooled.buffer(10),
-                method.invoke(pendingReadOp, 1, BookieId.parse("test"), first));
+                new ReadOpBase.ReadContext(1, BookieId.parse("test"), first));
 
         // will not complete twice when completed
         byteBuf = Unpooled.buffer(10);
         pendingReadOp.readEntryComplete(Code.OK, 1, 1, Unpooled.buffer(10),
-                method.invoke(pendingReadOp, 1, BookieId.parse("test"), first));
+                new ReadOpBase.ReadContext(1, BookieId.parse("test"), first));
 
         assertEquals(1, byteBuf.refCnt());
     }
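
The TestParallelRead.java hunks above remove the reflection that was previously needed to reach the private PendingReadOp.createReadContext helper: with the context class now exposed as ReadOpBase.ReadContext, the test constructs it directly. The snippet below illustrates that before/after pattern in isolation; Widget and Context are hypothetical placeholder types, not BookKeeper classes.

    import java.lang.reflect.Method;

    public class ReflectionVsDirectSketch {
        // Hypothetical stand-in for a class whose context object used to be reachable
        // only through a private factory method.
        static class Widget {
            static class Context {
                final int index;
                final String target;
                Context(int index, String target) {
                    this.index = index;
                    this.target = target;
                }
            }

            private static Context createContext(int index, String target) {
                return new Context(index, target);
            }
        }

        public static void main(String[] args) throws Exception {
            // Before: dig out the private factory reflectively, the way the old test
            // reached the removed createReadContext helper.
            Method factory = Widget.class.getDeclaredMethod("createContext", int.class, String.class);
            factory.setAccessible(true);
            Widget.Context viaReflection = (Widget.Context) factory.invoke(null, 1, "bookie-1");

            // After: construct the context directly, the way the updated test now
            // builds ReadOpBase.ReadContext inline.
            Widget.Context direct = new Widget.Context(1, "bookie-1");

            // Both paths produce an equivalent context.
            System.out.println(viaReflection.target.equals(direct.target) && viaReflection.index == direct.index);
        }
    }

Once the context can be built directly, the java.lang.reflect.Method import naturally drops out, which is exactly what the first test hunk does.
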