From 5df139c913e2af0db8e78de2981f6e6aacc8b0a3 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 13 Mar 2026 16:19:37 +0800 Subject: [PATCH 01/13] PIP-121: Implement AutoClusterFailover --- include/pulsar/AutoClusterFailover.h | 105 +++++++ include/pulsar/ServiceInfo.h | 1 + lib/AutoClusterFailover.cc | 413 +++++++++++++++++++++++++++ tests/ServiceInfoProviderTest.cc | 188 ++++++++++++ 4 files changed, 707 insertions(+) create mode 100644 include/pulsar/AutoClusterFailover.h create mode 100644 lib/AutoClusterFailover.cc diff --git a/include/pulsar/AutoClusterFailover.h b/include/pulsar/AutoClusterFailover.h new file mode 100644 index 00000000..40c27ecd --- /dev/null +++ b/include/pulsar/AutoClusterFailover.h @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef PULSAR_AUTO_CLUSTER_FAILOVER_H_ +#define PULSAR_AUTO_CLUSTER_FAILOVER_H_ + +#include + +#include + +namespace pulsar { + +class Client; +class AutoClusterFailoverImpl; + +class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider { + public: + struct Config { + ServiceInfo primary; + std::vector secondary; + std::chrono::milliseconds checkInterval{30000}; // 30 seconds + std::chrono::milliseconds failoverDelay{30000}; // 30 seconds + std::chrono::milliseconds switchBackDelay{60000}; // 60 seconds + }; + + /** + * Builder helps create an AutoClusterFailover configuration. + * + * Example: + * ServiceInfo primary{...}; + * std::vector secondaries{...}; + * AutoClusterFailover provider = AutoClusterFailover::Builder(primary, secondaries) + * .withCheckInterval(std::chrono::seconds(30)) + * .withFailoverDelay(std::chrono::seconds(30)) + * .withSwitchBackDelay(std::chrono::seconds(60)) + * .build(); + * + * Notes: + * - primary: the preferred cluster to use when available. + * - secondary: ordered list of fallback clusters. + * - checkInterval: frequency of health probes. + * - failoverDelay: how long the current cluster must be unreachable before switching. + * - switchBackDelay: how long the primary must remain healthy before switching back. + */ + class Builder { + public: + Builder(ServiceInfo primary, std::vector secondary) { + config_.primary = std::move(primary); + config_.secondary = std::move(secondary); + } + + // Set how frequently probes run against the active cluster(s). + Builder& withCheckInterval(std::chrono::milliseconds interval) { + config_.checkInterval = interval; + return *this; + } + + // Set how long the current cluster must be unreachable before attempting failover. + Builder& withFailoverDelay(std::chrono::milliseconds delay) { + config_.failoverDelay = delay; + return *this; + } + + // Set how long the primary must remain healthy before switching back from a secondary. + Builder& withSwitchBackDelay(std::chrono::milliseconds delay) { + config_.switchBackDelay = delay; + return *this; + } + + AutoClusterFailover build() { return AutoClusterFailover(std::move(config_)); } + + private: + Config config_; + }; + + explicit AutoClusterFailover(Config&& config); + + ~AutoClusterFailover() final; + + ServiceInfo initialServiceInfo() final; + + void initialize(std::function onServiceInfoUpdate) final; + + private: + std::shared_ptr impl_; +}; + +} // namespace pulsar + +#endif diff --git a/include/pulsar/ServiceInfo.h b/include/pulsar/ServiceInfo.h index 1f63ce38..23d3b6d5 100644 --- a/include/pulsar/ServiceInfo.h +++ b/include/pulsar/ServiceInfo.h @@ -32,6 +32,7 @@ namespace pulsar { */ class PULSAR_PUBLIC ServiceInfo final { public: + ServiceInfo() = default; // only for storing in containers like std::vector, not for public use ServiceInfo(std::string serviceUrl, AuthenticationPtr authentication = AuthFactory::Disabled(), std::optional tlsTrustCertsFilePath = std::nullopt); diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc new file mode 100644 index 00000000..13f2405e --- /dev/null +++ b/lib/AutoClusterFailover.cc @@ -0,0 +1,413 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "ServiceURI.h" +#include "Url.h" +#include "lib/LogUtils.h" + +#ifdef USE_ASIO +#include +#include +#include +#include +#include +#include +#else +#include +#include +#include +#include +#include +#include +#endif + +#include "AsioDefines.h" + +DECLARE_LOG_OBJECT() + +namespace pulsar { + +class AutoClusterFailoverImpl : public std::enable_shared_from_this { + public: + AutoClusterFailoverImpl(AutoClusterFailover::Config&& config) + : config_(std::move(config)), currentIndex_(0) { + clusters_.reserve(1 + config_.secondary.size()); + clusters_.emplace_back(config_.primary); + for (const auto& info : config_.secondary) { + clusters_.emplace_back(info); + } + } + + ~AutoClusterFailoverImpl() { + using namespace std::chrono_literals; + if (timer_) { + timer_->cancel(); + } + workGuard_.reset(); + if (future_.valid()) { + if (auto result = future_.wait_for(3s); result != std::future_status::ready) { + LOG_WARN("AutoClusterFailoverImpl is not stopped within 3 seconds, force stop it"); + ioContext_.stop(); + if (auto result = future_.wait_for(1s); result != std::future_status::ready) { + LOG_ERROR("Failed to stop AutoClusterFailoverImpl within 1 seconds after force stop"); + } + } + } + } + + auto primary() const noexcept { return config_.primary; } + + void initialize(std::function&& onServiceInfoUpdate) { + onServiceInfoUpdate_ = std::move(onServiceInfoUpdate); + workGuard_.emplace(ASIO::make_work_guard(ioContext_)); + timer_.emplace(ioContext_); + + auto weakSelf = weak_from_this(); + ASIO::post(ioContext_, [weakSelf] { + auto self = weakSelf.lock(); + if (self) { + self->onServiceInfoUpdate_(self->current()); + self->scheduleFailoverCheck(); + } + }); + + future_ = std::async(std::launch::async, [this] { ioContext_.run(); }); + } + + private: + static constexpr std::chrono::milliseconds probeTimeout_{30000}; + using CompletionCallback = std::function; + using ProbeCallback = std::function; + + struct ProbeContext { + ASIO::ip::tcp::resolver resolver; + ASIO::ip::tcp::socket socket; + ASIO::steady_timer timer; + ProbeCallback callback; + bool done{false}; + std::string hostUrl; + + ProbeContext(ASIO::io_context& ioContext, std::string hostUrl, ProbeCallback callback) + : resolver(ioContext), + socket(ioContext), + timer(ioContext), + callback(std::move(callback)), + hostUrl(std::move(hostUrl)) {} + }; + + AutoClusterFailover::Config config_; + std::vector clusters_; + size_t currentIndex_; + std::optional failedSince_; + std::optional recoveredSince_; + std::future future_; + ASIO::io_context ioContext_; + std::function onServiceInfoUpdate_; + + std::optional> workGuard_; + std::optional timer_; + + const ServiceInfo& current() const noexcept { return clusters_[currentIndex_]; } + + void scheduleFailoverCheck() { + if (!timer_) { + return; + } + timer_->expires_after(config_.checkInterval); + auto weakSelf = weak_from_this(); + timer_->async_wait([this, weakSelf](ASIO_ERROR error) { + if (error) { + LOG_INFO("Failover check timer is cancelled or failed: " << error.message()); + return; + } + auto self = weakSelf.lock(); + if (!self) { + LOG_INFO("AutoClusterFailoverImpl is destroyed, skip failover check"); + return; + } + executeFailoverCheck(); + }); + } + + void executeFailoverCheck() { + auto done = [weakSelf = weak_from_this()] { + if (auto self = weakSelf.lock()) { + self->scheduleFailoverCheck(); + } + }; + + if (currentIndex_ == 0) { + checkAndFailoverToSecondaryAsync(std::move(done)); + } else { + checkSecondaryAndPrimaryAsync(std::move(done)); + } + } + + static void completeProbe(const std::shared_ptr& context, bool success, + const ASIO_ERROR& error = ASIO_SUCCESS) { + if (context->done) { + return; + } + + context->done = true; + ASIO_ERROR ignored; + context->resolver.cancel(); + context->socket.close(ignored); + context->timer.cancel(ignored); + + if (error && error != ASIO::error::operation_aborted) { + LOG_DEBUG("Probe error for " << context->hostUrl << ": " << error.message()); + } + + auto callback = std::move(context->callback); + callback(success); + } + + void probeHostAsync(const std::string& hostUrl, ProbeCallback callback) { + Url parsedUrl; + if (!Url::parse(hostUrl, parsedUrl)) { + LOG_WARN("Failed to parse service URL for probing: " << hostUrl); + callback(false); + return; + } + + auto context = std::make_shared(ioContext_, hostUrl, std::move(callback)); + context->timer.expires_after(probeTimeout_); + context->timer.async_wait([context](const ASIO_ERROR& error) { + if (!error) { + completeProbe(context, false, ASIO::error::timed_out); + } + }); + + context->resolver.async_resolve( + parsedUrl.host(), std::to_string(parsedUrl.port()), + [context](const ASIO_ERROR& error, const ASIO::ip::tcp::resolver::results_type& endpoints) { + if (error) { + completeProbe(context, false, error); + return; + } + + ASIO::async_connect( + context->socket, endpoints, + [context](const ASIO_ERROR& connectError, const ASIO::ip::tcp::endpoint&) { + completeProbe(context, !connectError, connectError); + }); + }); + } + + void probeHostsAsync(const std::shared_ptr>& hosts, size_t index, + ProbeCallback callback) { + if (index >= hosts->size()) { + callback(false); + return; + } + + auto hostUrl = (*hosts)[index]; + probeHostAsync(hostUrl, [this, hosts, index, callback = std::move(callback)](bool available) mutable { + if (available) { + callback(true); + return; + } + probeHostsAsync(hosts, index + 1, std::move(callback)); + }); + } + + void probeAvailableAsync(const ServiceInfo& serviceInfo, ProbeCallback callback) { + try { + ServiceURI serviceUri{serviceInfo.serviceUrl()}; + auto hosts = std::make_shared>(serviceUri.getServiceHosts()); + if (hosts->empty()) { + callback(false); + return; + } + probeHostsAsync(hosts, 0, std::move(callback)); + } catch (const std::exception& e) { + LOG_WARN("Failed to probe service URL " << serviceInfo.serviceUrl() << ": " << e.what()); + callback(false); + } + } + + void switchTo(size_t index) { + if (currentIndex_ == index) { + return; + } + + LOG_INFO("Switch service URL from " << current().serviceUrl() << " to " + << clusters_[index].serviceUrl()); + currentIndex_ = index; + failedSince_.reset(); + recoveredSince_.reset(); + onServiceInfoUpdate_(current()); + } + + void probeSecondaryFrom(size_t index, CompletionCallback done) { + if (index >= clusters_.size()) { + done(); + return; + } + + auto weakSelf = weak_from_this(); + probeAvailableAsync(clusters_[index], + [weakSelf, index, done = std::move(done)](bool available) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + if (available) { + self->switchTo(index); + done(); + return; + } + + self->probeSecondaryFrom(index + 1, std::move(done)); + }); + } + + void checkAndFailoverToSecondaryAsync(CompletionCallback done) { + auto weakSelf = weak_from_this(); + probeAvailableAsync(current(), [weakSelf, done = std::move(done)](bool primaryAvailable) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + const auto now = std::chrono::steady_clock::now(); + if (primaryAvailable) { + self->failedSince_.reset(); + done(); + return; + } + + if (!self->failedSince_) { + self->failedSince_ = now; + done(); + return; + } + + if (now - *self->failedSince_ < self->config_.failoverDelay) { + done(); + return; + } + + self->probeSecondaryFrom(1, std::move(done)); + }); + } + + void checkSwitchBackToPrimaryAsync(CompletionCallback done, std::optional primaryAvailableHint) { + const auto now = std::chrono::steady_clock::now(); + auto handlePrimaryAvailable = [weakSelf = weak_from_this(), now, + done = std::move(done)](bool primaryAvailable) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + if (!primaryAvailable) { + self->recoveredSince_.reset(); + done(); + return; + } + + if (!self->recoveredSince_) { + self->recoveredSince_ = now; + done(); + return; + } + + if (now - *self->recoveredSince_ >= self->config_.switchBackDelay) { + self->switchTo(0); + } + done(); + }; + + if (primaryAvailableHint.has_value()) { + handlePrimaryAvailable(*primaryAvailableHint); + return; + } + + probeAvailableAsync(config_.primary, std::move(handlePrimaryAvailable)); + } + + void checkSecondaryAndPrimaryAsync(CompletionCallback done) { + auto weakSelf = weak_from_this(); + probeAvailableAsync(current(), [weakSelf, done = std::move(done)](bool secondaryAvailable) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + const auto now = std::chrono::steady_clock::now(); + if (secondaryAvailable) { + self->failedSince_.reset(); + self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt); + return; + } + + if (!self->failedSince_) { + self->failedSince_ = now; + self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt); + return; + } + + if (now - *self->failedSince_ < self->config_.failoverDelay) { + self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt); + return; + } + + self->probeAvailableAsync(self->config_.primary, + [weakSelf, done = std::move(done)](bool primaryAvailable) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + if (primaryAvailable) { + self->switchTo(0); + done(); + return; + } + + self->checkSwitchBackToPrimaryAsync(std::move(done), false); + }); + }); + } +}; + +AutoClusterFailover::AutoClusterFailover(Config&& config) + : impl_(std::make_shared(std::move(config))) {} + +AutoClusterFailover::~AutoClusterFailover() {} + +ServiceInfo AutoClusterFailover::initialServiceInfo() { return impl_->primary(); } + +void AutoClusterFailover::initialize(std::function onServiceInfoUpdate) { + impl_->initialize(std::move(onServiceInfoUpdate)); +} + +} // namespace pulsar diff --git a/tests/ServiceInfoProviderTest.cc b/tests/ServiceInfoProviderTest.cc index 82f5f6f7..051a3332 100644 --- a/tests/ServiceInfoProviderTest.cc +++ b/tests/ServiceInfoProviderTest.cc @@ -17,16 +17,20 @@ * under the License. */ #include +#include #include #include +#include #include #include #include #include +#include #include "PulsarFriend.h" #include "WaitUtils.h" +#include "lib/AsioDefines.h" #include "lib/LogUtils.h" DECLARE_LOG_OBJECT() @@ -34,6 +38,113 @@ DECLARE_LOG_OBJECT() using namespace pulsar; using namespace std::chrono_literals; +namespace { + +class ProbeTcpServer { + public: + ProbeTcpServer() { start(); } + + ~ProbeTcpServer() { stop(); } + + void start() { + if (running_) { + return; + } + + auto ioContext = std::unique_ptr(new ASIO::io_context); + auto acceptor = std::unique_ptr(new ASIO::ip::tcp::acceptor(*ioContext)); + ASIO::ip::tcp::endpoint endpoint{ASIO::ip::tcp::v4(), static_cast(port_)}; + acceptor->open(endpoint.protocol()); + acceptor->set_option(ASIO::ip::tcp::acceptor::reuse_address(true)); + acceptor->bind(endpoint); + acceptor->listen(); + + port_ = acceptor->local_endpoint().port(); + ioContext_ = std::move(ioContext); + acceptor_ = std::move(acceptor); + running_ = true; + + scheduleAccept(); + serverThread_ = std::thread([this] { ioContext_->run(); }); + } + + void stop() { + if (!running_.exchange(false)) { + return; + } + + ASIO::post(*ioContext_, [this] { + ASIO_ERROR ignored; + if (acceptor_ && acceptor_->is_open()) { + acceptor_->close(ignored); + } + }); + + if (serverThread_.joinable()) { + serverThread_.join(); + } + + acceptor_.reset(); + ioContext_.reset(); + } + + std::string getServiceUrl() const { return "pulsar://127.0.0.1:" + std::to_string(port_); } + + private: + void scheduleAccept() { + if (!running_ || !acceptor_ || !acceptor_->is_open()) { + return; + } + + auto socket = std::make_shared(*ioContext_); + acceptor_->async_accept(*socket, [this, socket](const ASIO_ERROR &error) { + if (!error) { + ASIO_ERROR ignored; + socket->close(ignored); + } + + if (running_ && acceptor_ && acceptor_->is_open()) { + scheduleAccept(); + } + }); + } + + int port_{0}; + std::atomic_bool running_{false}; + std::unique_ptr ioContext_; + std::unique_ptr acceptor_; + std::thread serverThread_; +}; + +class ServiceUrlObserver { + public: + void onUpdate(const ServiceInfo &serviceInfo) { + std::lock_guard lock(mutex_); + serviceUrls_.emplace_back(serviceInfo.serviceUrl()); + } + + size_t size() const { + std::lock_guard lock(mutex_); + return serviceUrls_.size(); + } + + std::string last() const { + std::lock_guard lock(mutex_); + return serviceUrls_.empty() ? std::string() : serviceUrls_.back(); + } + + std::vector snapshot() const { + std::lock_guard lock(mutex_); + return serviceUrls_; + } + + private: + mutable std::mutex mutex_; + std::vector serviceUrls_; +}; + +} // namespace + class ServiceInfoHolder { public: ServiceInfoHolder(ServiceInfo info) : serviceInfo_(std::move(info)) {} @@ -93,6 +204,83 @@ class TestServiceInfoProvider : public ServiceInfoProvider { mutable std::mutex mutex_; }; +TEST(AutoClusterFailoverTest, testFailoverToFirstAvailableSecondaryAfterDelay) { + try { + ProbeTcpServer availableSecondary; + ProbeTcpServer unavailableSecondary; + const auto primaryUrl = unavailableSecondary.getServiceUrl(); + unavailableSecondary.stop(); + + ProbeTcpServer skippedSecondary; + const auto skippedSecondaryUrl = skippedSecondary.getServiceUrl(); + skippedSecondary.stop(); + + const auto availableSecondaryUrl = availableSecondary.getServiceUrl(); + ServiceUrlObserver observer; + AutoClusterFailover provider = + AutoClusterFailover::Builder(ServiceInfo(primaryUrl), {ServiceInfo(skippedSecondaryUrl), + ServiceInfo(availableSecondaryUrl)}) + .withCheckInterval(20ms) + .withFailoverDelay(120ms) + .withSwitchBackDelay(120ms) + .build(); + + ASSERT_EQ(provider.initialServiceInfo().serviceUrl(), primaryUrl); + + provider.initialize([&observer](ServiceInfo serviceInfo) { observer.onUpdate(serviceInfo); }); + + ASSERT_TRUE(waitUntil(1s, [&observer] { return observer.size() >= 1; })); + ASSERT_EQ(observer.last(), primaryUrl); + ASSERT_FALSE(waitUntil( + 80ms, [&observer, &availableSecondaryUrl] { return observer.last() == availableSecondaryUrl; })); + ASSERT_TRUE(waitUntil( + 2s, [&observer, &availableSecondaryUrl] { return observer.last() == availableSecondaryUrl; })); + + const auto updates = observer.snapshot(); + ASSERT_EQ(updates.size(), 2u); + ASSERT_EQ(updates[0], primaryUrl); + ASSERT_EQ(updates[1], availableSecondaryUrl); + } catch (const ASIO_SYSTEM_ERROR &e) { + GTEST_SKIP() << "Cannot bind local probe server in this environment: " << e.what(); + } +} + +TEST(AutoClusterFailoverTest, testSwitchBackToPrimaryAfterRecoveryDelay) { + try { + ProbeTcpServer primary; + const auto primaryUrl = primary.getServiceUrl(); + primary.stop(); + + ProbeTcpServer secondary; + const auto secondaryUrl = secondary.getServiceUrl(); + + ServiceUrlObserver observer; + AutoClusterFailover provider = + AutoClusterFailover::Builder(ServiceInfo(primaryUrl), {ServiceInfo(secondaryUrl)}) + .withCheckInterval(20ms) + .withFailoverDelay(80ms) + .withSwitchBackDelay(120ms) + .build(); + + provider.initialize([&observer](ServiceInfo serviceInfo) { observer.onUpdate(serviceInfo); }); + + ASSERT_TRUE(waitUntil(2s, [&observer, &secondaryUrl] { return observer.last() == secondaryUrl; })); + + primary.start(); + + ASSERT_FALSE(waitUntil(80ms, [&observer, &primaryUrl] { return observer.last() == primaryUrl; })); + ASSERT_TRUE(waitUntil(2s, [&observer, &primaryUrl] { return observer.last() == primaryUrl; })); + + const auto updates = observer.snapshot(); + ASSERT_EQ(updates.size(), 3u); + ASSERT_EQ(updates[0], primaryUrl); + ASSERT_EQ(updates[1], secondaryUrl); + ASSERT_EQ(updates[2], primaryUrl); + } catch (const ASIO_SYSTEM_ERROR &e) { + GTEST_SKIP() << "Cannot bind local probe server in this environment: " << e.what(); + } +} + TEST(ServiceInfoProviderTest, testSwitchCluster) { extern std::string getToken(); // from tests/AuthTokenTest.cc // Access "private/auth" namespace in cluster 1 From d15a8228673e7c5cd14ca48f9b3f2c06a7304e9b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 13 Mar 2026 18:44:14 +0800 Subject: [PATCH 02/13] fix tidy errors --- tests/ServiceInfoProviderTest.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/ServiceInfoProviderTest.cc b/tests/ServiceInfoProviderTest.cc index 051a3332..95e4418e 100644 --- a/tests/ServiceInfoProviderTest.cc +++ b/tests/ServiceInfoProviderTest.cc @@ -227,7 +227,7 @@ TEST(AutoClusterFailoverTest, testFailoverToFirstAvailableSecondaryAfterDelay) { ASSERT_EQ(provider.initialServiceInfo().serviceUrl(), primaryUrl); - provider.initialize([&observer](ServiceInfo serviceInfo) { observer.onUpdate(serviceInfo); }); + provider.initialize([&observer](const ServiceInfo& serviceInfo) { observer.onUpdate(serviceInfo); }); ASSERT_TRUE(waitUntil(1s, [&observer] { return observer.size() >= 1; })); ASSERT_EQ(observer.last(), primaryUrl); @@ -262,7 +262,7 @@ TEST(AutoClusterFailoverTest, testSwitchBackToPrimaryAfterRecoveryDelay) { .withSwitchBackDelay(120ms) .build(); - provider.initialize([&observer](ServiceInfo serviceInfo) { observer.onUpdate(serviceInfo); }); + provider.initialize([&observer](const ServiceInfo& serviceInfo) { observer.onUpdate(serviceInfo); }); ASSERT_TRUE(waitUntil(2s, [&observer, &secondaryUrl] { return observer.last() == secondaryUrl; })); From f56ac5cd66169a008129f8ca55540ae1b2066fb2 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 13 Mar 2026 19:27:17 +0800 Subject: [PATCH 03/13] fix format --- tests/ServiceInfoProviderTest.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/ServiceInfoProviderTest.cc b/tests/ServiceInfoProviderTest.cc index 95e4418e..95521179 100644 --- a/tests/ServiceInfoProviderTest.cc +++ b/tests/ServiceInfoProviderTest.cc @@ -227,7 +227,7 @@ TEST(AutoClusterFailoverTest, testFailoverToFirstAvailableSecondaryAfterDelay) { ASSERT_EQ(provider.initialServiceInfo().serviceUrl(), primaryUrl); - provider.initialize([&observer](const ServiceInfo& serviceInfo) { observer.onUpdate(serviceInfo); }); + provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); }); ASSERT_TRUE(waitUntil(1s, [&observer] { return observer.size() >= 1; })); ASSERT_EQ(observer.last(), primaryUrl); @@ -262,7 +262,7 @@ TEST(AutoClusterFailoverTest, testSwitchBackToPrimaryAfterRecoveryDelay) { .withSwitchBackDelay(120ms) .build(); - provider.initialize([&observer](const ServiceInfo& serviceInfo) { observer.onUpdate(serviceInfo); }); + provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); }); ASSERT_TRUE(waitUntil(2s, [&observer, &secondaryUrl] { return observer.last() == secondaryUrl; })); From 8a0d64407f9a654465da5f3183bf1c716d9b2f81 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 16 Mar 2026 15:32:19 +0800 Subject: [PATCH 04/13] address some comments --- include/pulsar/AutoClusterFailover.h | 13 +++++++------ include/pulsar/ServiceInfo.h | 1 - lib/AutoClusterFailover.cc | 8 +++----- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/include/pulsar/AutoClusterFailover.h b/include/pulsar/AutoClusterFailover.h index 40c27ecd..c6f056f8 100644 --- a/include/pulsar/AutoClusterFailover.h +++ b/include/pulsar/AutoClusterFailover.h @@ -31,11 +31,14 @@ class AutoClusterFailoverImpl; class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider { public: struct Config { - ServiceInfo primary; - std::vector secondary; + const ServiceInfo primary; + const std::vector secondary; std::chrono::milliseconds checkInterval{30000}; // 30 seconds std::chrono::milliseconds failoverDelay{30000}; // 30 seconds std::chrono::milliseconds switchBackDelay{60000}; // 60 seconds + + Config(ServiceInfo primary, std::vector secondary) + : primary(std::move(primary)), secondary(std::move(secondary)) {} }; /** @@ -59,10 +62,8 @@ class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider { */ class Builder { public: - Builder(ServiceInfo primary, std::vector secondary) { - config_.primary = std::move(primary); - config_.secondary = std::move(secondary); - } + Builder(ServiceInfo primary, std::vector secondary) + : config_(std::move(primary), std::move(secondary)) {} // Set how frequently probes run against the active cluster(s). Builder& withCheckInterval(std::chrono::milliseconds interval) { diff --git a/include/pulsar/ServiceInfo.h b/include/pulsar/ServiceInfo.h index 23d3b6d5..1f63ce38 100644 --- a/include/pulsar/ServiceInfo.h +++ b/include/pulsar/ServiceInfo.h @@ -32,7 +32,6 @@ namespace pulsar { */ class PULSAR_PUBLIC ServiceInfo final { public: - ServiceInfo() = default; // only for storing in containers like std::vector, not for public use ServiceInfo(std::string serviceUrl, AuthenticationPtr authentication = AuthFactory::Disabled(), std::optional tlsTrustCertsFilePath = std::nullopt); diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc index 13f2405e..02f283cd 100644 --- a/lib/AutoClusterFailover.cc +++ b/lib/AutoClusterFailover.cc @@ -26,9 +26,10 @@ #include #include +#include "AsioTimer.h" +#include "LogUtils.h" #include "ServiceURI.h" #include "Url.h" -#include "lib/LogUtils.h" #ifdef USE_ASIO #include @@ -65,9 +66,7 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thiscancel(); - } + cancelTimer(*timer_); workGuard_.reset(); if (future_.valid()) { if (auto result = future_.wait_for(3s); result != std::future_status::ready) { @@ -91,7 +90,6 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisonServiceInfoUpdate_(self->current()); self->scheduleFailoverCheck(); } }); From 2664b932b29e7d288c95b1176ba242377857a164 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 16 Mar 2026 16:43:45 +0800 Subject: [PATCH 05/13] fix incorrect thread model --- lib/AutoClusterFailover.cc | 46 +++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc index 02f283cd..cce1949d 100644 --- a/lib/AutoClusterFailover.cc +++ b/lib/AutoClusterFailover.cc @@ -66,17 +66,21 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisscheduleFailoverCheck(); } }); - future_ = std::async(std::launch::async, [this] { ioContext_.run(); }); + // Capturing `this` is safe because the thread will be joined in the destructor + std::promise promise; + future_ = promise.get_future(); + thread_ = std::thread([this, promise{std::move(promise)}]() mutable { + ioContext_.run(); + promise.set_value(); + }); } private: @@ -123,7 +132,10 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_this failedSince_; std::optional recoveredSince_; + + std::thread thread_; std::future future_; + ASIO::io_context ioContext_; std::function onServiceInfoUpdate_; @@ -133,22 +145,16 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisexpires_after(config_.checkInterval); auto weakSelf = weak_from_this(); - timer_->async_wait([this, weakSelf](ASIO_ERROR error) { + timer_->async_wait([weakSelf](ASIO_ERROR error) { if (error) { LOG_INFO("Failover check timer is cancelled or failed: " << error.message()); return; } - auto self = weakSelf.lock(); - if (!self) { - LOG_INFO("AutoClusterFailoverImpl is destroyed, skip failover check"); - return; + if (auto self = weakSelf.lock()) { + self->executeFailoverCheck(); } - executeFailoverCheck(); }); } From 4cfd0d30c4249feddb6615a8fb6ed24dc58f3b93 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 16 Mar 2026 17:41:17 +0800 Subject: [PATCH 06/13] avoid copy on original service infos in config --- lib/AutoClusterFailover.cc | 38 ++++++++++++++++---------------------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc index cce1949d..5b1e2e75 100644 --- a/lib/AutoClusterFailover.cc +++ b/lib/AutoClusterFailover.cc @@ -56,13 +56,7 @@ namespace pulsar { class AutoClusterFailoverImpl : public std::enable_shared_from_this { public: AutoClusterFailoverImpl(AutoClusterFailover::Config&& config) - : config_(std::move(config)), currentIndex_(0) { - clusters_.reserve(1 + config_.secondary.size()); - clusters_.emplace_back(config_.primary); - for (const auto& info : config_.secondary) { - clusters_.emplace_back(info); - } - } + : config_(std::move(config)), currentServiceInfo_(&config_.primary) {} ~AutoClusterFailoverImpl() { using namespace std::chrono_literals; @@ -128,8 +122,7 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_this clusters_; - size_t currentIndex_; + const ServiceInfo* currentServiceInfo_; std::optional failedSince_; std::optional recoveredSince_; @@ -142,7 +135,9 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_this> workGuard_; std::optional timer_; - const ServiceInfo& current() const noexcept { return clusters_[currentIndex_]; } + bool isUsingPrimary() const noexcept { return currentServiceInfo_ == &config_.primary; } + + const ServiceInfo& current() const noexcept { return *currentServiceInfo_; } void scheduleFailoverCheck() { timer_->expires_after(config_.checkInterval); @@ -165,7 +160,7 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisserviceUrl()); + currentServiceInfo_ = serviceInfo; failedSince_.reset(); recoveredSince_.reset(); onServiceInfoUpdate_(current()); } void probeSecondaryFrom(size_t index, CompletionCallback done) { - if (index >= clusters_.size()) { + if (index >= config_.secondary.size()) { done(); return; } auto weakSelf = weak_from_this(); - probeAvailableAsync(clusters_[index], + probeAvailableAsync(config_.secondary[index], [weakSelf, index, done = std::move(done)](bool available) mutable { auto self = weakSelf.lock(); if (!self) { @@ -284,7 +278,7 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisswitchTo(index); + self->switchTo(&self->config_.secondary[index]); done(); return; } @@ -319,7 +313,7 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisprobeSecondaryFrom(1, std::move(done)); + self->probeSecondaryFrom(0, std::move(done)); }); } @@ -345,7 +339,7 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisrecoveredSince_ >= self->config_.switchBackDelay) { - self->switchTo(0); + self->switchTo(&self->config_.primary); } done(); }; @@ -392,7 +386,7 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisswitchTo(0); + self->switchTo(&self->config_.primary); done(); return; } From 13dbf89529efbce71e7ee97868c2ab8959ca4684 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 16 Mar 2026 17:43:18 +0800 Subject: [PATCH 07/13] fix lifetime of probeHostsAsync --- lib/AutoClusterFailover.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc index 5b1e2e75..7e340099 100644 --- a/lib/AutoClusterFailover.cc +++ b/lib/AutoClusterFailover.cc @@ -227,12 +227,15 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisprobeHostsAsync(hosts, index + 1, std::move(callback)); + } }); } From 88bacdb46dba76d1b8880565cc5a57a373cfabc8 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 16 Mar 2026 19:34:40 +0800 Subject: [PATCH 08/13] remove unnecessary logs --- lib/AutoClusterFailover.cc | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc index 7e340099..41dac90e 100644 --- a/lib/AutoClusterFailover.cc +++ b/lib/AutoClusterFailover.cc @@ -144,7 +144,6 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisasync_wait([weakSelf](ASIO_ERROR error) { if (error) { - LOG_INFO("Failover check timer is cancelled or failed: " << error.message()); return; } if (auto self = weakSelf.lock()) { @@ -179,12 +178,7 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thissocket.close(ignored); context->timer.cancel(ignored); - if (error && error != ASIO::error::operation_aborted) { - LOG_DEBUG("Probe error for " << context->hostUrl << ": " << error.message()); - } - - auto callback = std::move(context->callback); - callback(success); + context->callback(success); } void probeHostAsync(const std::string& hostUrl, ProbeCallback callback) { From afc659f426a4442435ed815b0f2f1f0257676ed7 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 16 Mar 2026 20:29:04 +0800 Subject: [PATCH 09/13] replace the delay-based configs with count-based configs --- include/pulsar/AutoClusterFailover.h | 29 +++++++------ lib/AutoClusterFailover.cc | 62 ++++++++++------------------ tests/ServiceInfoProviderTest.cc | 8 ++-- 3 files changed, 41 insertions(+), 58 deletions(-) diff --git a/include/pulsar/AutoClusterFailover.h b/include/pulsar/AutoClusterFailover.h index c6f056f8..f9517abd 100644 --- a/include/pulsar/AutoClusterFailover.h +++ b/include/pulsar/AutoClusterFailover.h @@ -22,6 +22,7 @@ #include #include +#include namespace pulsar { @@ -33,9 +34,9 @@ class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider { struct Config { const ServiceInfo primary; const std::vector secondary; - std::chrono::milliseconds checkInterval{30000}; // 30 seconds - std::chrono::milliseconds failoverDelay{30000}; // 30 seconds - std::chrono::milliseconds switchBackDelay{60000}; // 60 seconds + std::chrono::milliseconds checkInterval{30000}; // 30 seconds + uint32_t failoverThreshold{1}; + uint32_t switchBackThreshold{1}; Config(ServiceInfo primary, std::vector secondary) : primary(std::move(primary)), secondary(std::move(secondary)) {} @@ -49,16 +50,18 @@ class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider { * std::vector secondaries{...}; * AutoClusterFailover provider = AutoClusterFailover::Builder(primary, secondaries) * .withCheckInterval(std::chrono::seconds(30)) - * .withFailoverDelay(std::chrono::seconds(30)) - * .withSwitchBackDelay(std::chrono::seconds(60)) + * .withFailoverThreshold(3) + * .withSwitchBackThreshold(3) * .build(); * * Notes: * - primary: the preferred cluster to use when available. * - secondary: ordered list of fallback clusters. * - checkInterval: frequency of health probes. - * - failoverDelay: how long the current cluster must be unreachable before switching. - * - switchBackDelay: how long the primary must remain healthy before switching back. + * - failoverThreshold: the number of consecutive failed probes required before switching away from + * the current cluster. + * - switchBackThreshold: the number of consecutive successful probes to the primary required before + * switching back from a secondary. */ class Builder { public: @@ -71,15 +74,15 @@ class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider { return *this; } - // Set how long the current cluster must be unreachable before attempting failover. - Builder& withFailoverDelay(std::chrono::milliseconds delay) { - config_.failoverDelay = delay; + // Set the number of consecutive failed probes required before attempting failover. + Builder& withFailoverThreshold(uint32_t threshold) { + config_.failoverThreshold = threshold; return *this; } - // Set how long the primary must remain healthy before switching back from a secondary. - Builder& withSwitchBackDelay(std::chrono::milliseconds delay) { - config_.switchBackDelay = delay; + // Set the number of consecutive successful primary probes required before switching back. + Builder& withSwitchBackThreshold(uint32_t threshold) { + config_.switchBackThreshold = threshold; return *this; } diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc index 41dac90e..86e61293 100644 --- a/lib/AutoClusterFailover.cc +++ b/lib/AutoClusterFailover.cc @@ -123,8 +123,8 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_this failedSince_; - std::optional recoveredSince_; + uint32_t consecutiveFailureCount_{0}; + uint32_t consecutivePrimaryRecoveryCount_{0}; std::thread thread_; std::future future_; @@ -222,15 +222,16 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisprobeHostsAsync(hosts, index + 1, std::move(callback)); - } - }); + probeHostAsync(hostUrl, + [weakSelf, hosts, index, callback = std::move(callback)](bool available) mutable { + if (available) { + callback(true); + return; + } + if (auto self = weakSelf.lock()) { + self->probeHostsAsync(hosts, index + 1, std::move(callback)); + } + }); } void probeAvailableAsync(const ServiceInfo& serviceInfo, ProbeCallback callback) { @@ -255,8 +256,8 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisserviceUrl()); currentServiceInfo_ = serviceInfo; - failedSince_.reset(); - recoveredSince_.reset(); + consecutiveFailureCount_ = 0; + consecutivePrimaryRecoveryCount_ = 0; onServiceInfoUpdate_(current()); } @@ -292,20 +293,13 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisfailedSince_.reset(); - done(); - return; - } - - if (!self->failedSince_) { - self->failedSince_ = now; + self->consecutiveFailureCount_ = 0; done(); return; } - if (now - *self->failedSince_ < self->config_.failoverDelay) { + if (++self->consecutiveFailureCount_ < self->config_.failoverThreshold) { done(); return; } @@ -315,8 +309,7 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_this primaryAvailableHint) { - const auto now = std::chrono::steady_clock::now(); - auto handlePrimaryAvailable = [weakSelf = weak_from_this(), now, + auto handlePrimaryAvailable = [weakSelf = weak_from_this(), done = std::move(done)](bool primaryAvailable) mutable { auto self = weakSelf.lock(); if (!self) { @@ -324,18 +317,12 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisrecoveredSince_.reset(); + self->consecutivePrimaryRecoveryCount_ = 0; done(); return; } - if (!self->recoveredSince_) { - self->recoveredSince_ = now; - done(); - return; - } - - if (now - *self->recoveredSince_ >= self->config_.switchBackDelay) { + if (++self->consecutivePrimaryRecoveryCount_ >= self->config_.switchBackThreshold) { self->switchTo(&self->config_.primary); } done(); @@ -357,20 +344,13 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisfailedSince_.reset(); - self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt); - return; - } - - if (!self->failedSince_) { - self->failedSince_ = now; + self->consecutiveFailureCount_ = 0; self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt); return; } - if (now - *self->failedSince_ < self->config_.failoverDelay) { + if (++self->consecutiveFailureCount_ < self->config_.failoverThreshold) { self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt); return; } diff --git a/tests/ServiceInfoProviderTest.cc b/tests/ServiceInfoProviderTest.cc index 95521179..be84283e 100644 --- a/tests/ServiceInfoProviderTest.cc +++ b/tests/ServiceInfoProviderTest.cc @@ -221,8 +221,8 @@ TEST(AutoClusterFailoverTest, testFailoverToFirstAvailableSecondaryAfterDelay) { AutoClusterFailover::Builder(ServiceInfo(primaryUrl), {ServiceInfo(skippedSecondaryUrl), ServiceInfo(availableSecondaryUrl)}) .withCheckInterval(20ms) - .withFailoverDelay(120ms) - .withSwitchBackDelay(120ms) + .withFailoverThreshold(6) + .withSwitchBackThreshold(6) .build(); ASSERT_EQ(provider.initialServiceInfo().serviceUrl(), primaryUrl); @@ -258,8 +258,8 @@ TEST(AutoClusterFailoverTest, testSwitchBackToPrimaryAfterRecoveryDelay) { AutoClusterFailover provider = AutoClusterFailover::Builder(ServiceInfo(primaryUrl), {ServiceInfo(secondaryUrl)}) .withCheckInterval(20ms) - .withFailoverDelay(80ms) - .withSwitchBackDelay(120ms) + .withFailoverThreshold(4) + .withSwitchBackThreshold(6) .build(); provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); }); From fb0cb42ef1691204ffe823ef384bc4838d386498 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 16 Mar 2026 20:40:41 +0800 Subject: [PATCH 10/13] reduce the default check interval --- include/pulsar/AutoClusterFailover.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/include/pulsar/AutoClusterFailover.h b/include/pulsar/AutoClusterFailover.h index f9517abd..ae52d732 100644 --- a/include/pulsar/AutoClusterFailover.h +++ b/include/pulsar/AutoClusterFailover.h @@ -34,7 +34,7 @@ class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider { struct Config { const ServiceInfo primary; const std::vector secondary; - std::chrono::milliseconds checkInterval{30000}; // 30 seconds + std::chrono::milliseconds checkInterval{5000}; // 5 seconds uint32_t failoverThreshold{1}; uint32_t switchBackThreshold{1}; @@ -49,7 +49,7 @@ class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider { * ServiceInfo primary{...}; * std::vector secondaries{...}; * AutoClusterFailover provider = AutoClusterFailover::Builder(primary, secondaries) - * .withCheckInterval(std::chrono::seconds(30)) + * .withCheckInterval(std::chrono::seconds(5)) * .withFailoverThreshold(3) * .withSwitchBackThreshold(3) * .build(); @@ -68,19 +68,19 @@ class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider { Builder(ServiceInfo primary, std::vector secondary) : config_(std::move(primary), std::move(secondary)) {} - // Set how frequently probes run against the active cluster(s). + // Set how frequently probes run against the active cluster(s). Default: 5 seconds. Builder& withCheckInterval(std::chrono::milliseconds interval) { config_.checkInterval = interval; return *this; } - // Set the number of consecutive failed probes required before attempting failover. + // Set the number of consecutive failed probes required before attempting failover. Default: 1. Builder& withFailoverThreshold(uint32_t threshold) { config_.failoverThreshold = threshold; return *this; } - // Set the number of consecutive successful primary probes required before switching back. + // Set the number of consecutive successful primary probes required before switching back. Default: 1. Builder& withSwitchBackThreshold(uint32_t threshold) { config_.switchBackThreshold = threshold; return *this; From aa93505e2e4004cd789ccfee1c87ff175a57b409 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 17 Mar 2026 11:23:50 +0800 Subject: [PATCH 11/13] fix failed tests --- lib/AutoClusterFailover.cc | 39 +++++---- tests/ServiceInfoProviderTest.cc | 134 +++++++++++++++---------------- 2 files changed, 86 insertions(+), 87 deletions(-) diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc index 86e61293..c601dc9c 100644 --- a/lib/AutoClusterFailover.cc +++ b/lib/AutoClusterFailover.cc @@ -74,7 +74,6 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisconfig_.secondary[index].serviceUrl() + << " availability: " << available); if (available) { self->switchTo(&self->config_.secondary[index]); done(); @@ -293,6 +294,8 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thiscurrent().serviceUrl() + << " availability: " << primaryAvailable); if (primaryAvailable) { self->consecutiveFailureCount_ = 0; done(); @@ -344,6 +347,8 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thiscurrent().serviceUrl() + << " availability: " << secondaryAvailable); if (secondaryAvailable) { self->consecutiveFailureCount_ = 0; self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt); @@ -355,21 +360,23 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisprobeAvailableAsync(self->config_.primary, - [weakSelf, done = std::move(done)](bool primaryAvailable) mutable { - auto self = weakSelf.lock(); - if (!self) { - return; - } - - if (primaryAvailable) { - self->switchTo(&self->config_.primary); - done(); - return; - } - - self->checkSwitchBackToPrimaryAsync(std::move(done), false); - }); + self->probeAvailableAsync( + self->config_.primary, [weakSelf, done = std::move(done)](bool primaryAvailable) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + LOG_DEBUG("Detected primary after secondary is available " + << self->config_.primary.serviceUrl() << " availability: " << primaryAvailable); + if (primaryAvailable) { + self->switchTo(&self->config_.primary); + done(); + return; + } + + self->checkSwitchBackToPrimaryAsync(std::move(done), false); + }); }); } }; diff --git a/tests/ServiceInfoProviderTest.cc b/tests/ServiceInfoProviderTest.cc index be84283e..42f98958 100644 --- a/tests/ServiceInfoProviderTest.cc +++ b/tests/ServiceInfoProviderTest.cc @@ -205,80 +205,72 @@ class TestServiceInfoProvider : public ServiceInfoProvider { }; TEST(AutoClusterFailoverTest, testFailoverToFirstAvailableSecondaryAfterDelay) { - try { - ProbeTcpServer availableSecondary; - ProbeTcpServer unavailableSecondary; - const auto primaryUrl = unavailableSecondary.getServiceUrl(); - unavailableSecondary.stop(); - - ProbeTcpServer skippedSecondary; - const auto skippedSecondaryUrl = skippedSecondary.getServiceUrl(); - skippedSecondary.stop(); - - const auto availableSecondaryUrl = availableSecondary.getServiceUrl(); - ServiceUrlObserver observer; - AutoClusterFailover provider = - AutoClusterFailover::Builder(ServiceInfo(primaryUrl), {ServiceInfo(skippedSecondaryUrl), - ServiceInfo(availableSecondaryUrl)}) - .withCheckInterval(20ms) - .withFailoverThreshold(6) - .withSwitchBackThreshold(6) - .build(); - - ASSERT_EQ(provider.initialServiceInfo().serviceUrl(), primaryUrl); - - provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); }); - - ASSERT_TRUE(waitUntil(1s, [&observer] { return observer.size() >= 1; })); - ASSERT_EQ(observer.last(), primaryUrl); - ASSERT_FALSE(waitUntil( - 80ms, [&observer, &availableSecondaryUrl] { return observer.last() == availableSecondaryUrl; })); - ASSERT_TRUE(waitUntil( - 2s, [&observer, &availableSecondaryUrl] { return observer.last() == availableSecondaryUrl; })); - - const auto updates = observer.snapshot(); - ASSERT_EQ(updates.size(), 2u); - ASSERT_EQ(updates[0], primaryUrl); - ASSERT_EQ(updates[1], availableSecondaryUrl); - } catch (const ASIO_SYSTEM_ERROR &e) { - GTEST_SKIP() << "Cannot bind local probe server in this environment: " << e.what(); - } + ProbeTcpServer availableSecondary; + ProbeTcpServer unavailableSecondary; + const auto primaryUrl = unavailableSecondary.getServiceUrl(); + unavailableSecondary.stop(); + + ProbeTcpServer skippedSecondary; + const auto skippedSecondaryUrl = skippedSecondary.getServiceUrl(); + skippedSecondary.stop(); + + const auto availableSecondaryUrl = availableSecondary.getServiceUrl(); + ServiceUrlObserver observer; + AutoClusterFailover provider = + AutoClusterFailover::Builder(ServiceInfo(primaryUrl), + {ServiceInfo(skippedSecondaryUrl), ServiceInfo(availableSecondaryUrl)}) + .withCheckInterval(20ms) + .withFailoverThreshold(6) + .withSwitchBackThreshold(6) + .build(); + + ASSERT_EQ(provider.initialServiceInfo().serviceUrl(), primaryUrl); + + observer.onUpdate(provider.initialServiceInfo()); + provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); }); + + ASSERT_FALSE(waitUntil( + 80ms, [&observer, &availableSecondaryUrl] { return observer.last() == availableSecondaryUrl; })); + ASSERT_TRUE(waitUntil( + 2s, [&observer, &availableSecondaryUrl] { return observer.last() == availableSecondaryUrl; })); + + const auto updates = observer.snapshot(); + ASSERT_EQ(updates.size(), 2u); + ASSERT_EQ(updates[0], primaryUrl); + ASSERT_EQ(updates[1], availableSecondaryUrl); } TEST(AutoClusterFailoverTest, testSwitchBackToPrimaryAfterRecoveryDelay) { - try { - ProbeTcpServer primary; - const auto primaryUrl = primary.getServiceUrl(); - primary.stop(); - - ProbeTcpServer secondary; - const auto secondaryUrl = secondary.getServiceUrl(); - - ServiceUrlObserver observer; - AutoClusterFailover provider = - AutoClusterFailover::Builder(ServiceInfo(primaryUrl), {ServiceInfo(secondaryUrl)}) - .withCheckInterval(20ms) - .withFailoverThreshold(4) - .withSwitchBackThreshold(6) - .build(); - - provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); }); - - ASSERT_TRUE(waitUntil(2s, [&observer, &secondaryUrl] { return observer.last() == secondaryUrl; })); - - primary.start(); - - ASSERT_FALSE(waitUntil(80ms, [&observer, &primaryUrl] { return observer.last() == primaryUrl; })); - ASSERT_TRUE(waitUntil(2s, [&observer, &primaryUrl] { return observer.last() == primaryUrl; })); - - const auto updates = observer.snapshot(); - ASSERT_EQ(updates.size(), 3u); - ASSERT_EQ(updates[0], primaryUrl); - ASSERT_EQ(updates[1], secondaryUrl); - ASSERT_EQ(updates[2], primaryUrl); - } catch (const ASIO_SYSTEM_ERROR &e) { - GTEST_SKIP() << "Cannot bind local probe server in this environment: " << e.what(); - } + ProbeTcpServer primary; + const auto primaryUrl = primary.getServiceUrl(); + primary.stop(); + + ProbeTcpServer secondary; + const auto secondaryUrl = secondary.getServiceUrl(); + + ServiceUrlObserver observer; + AutoClusterFailover provider = + AutoClusterFailover::Builder(ServiceInfo(primaryUrl), {ServiceInfo(secondaryUrl)}) + .withCheckInterval(20ms) + .withFailoverThreshold(4) + .withSwitchBackThreshold(6) + .build(); + + observer.onUpdate(provider.initialServiceInfo()); + provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); }); + + ASSERT_TRUE(waitUntil(2s, [&observer, &secondaryUrl] { return observer.last() == secondaryUrl; })); + + primary.start(); + + ASSERT_FALSE(waitUntil(80ms, [&observer, &primaryUrl] { return observer.last() == primaryUrl; })); + ASSERT_TRUE(waitUntil(2s, [&observer, &primaryUrl] { return observer.last() == primaryUrl; })); + + const auto updates = observer.snapshot(); + ASSERT_EQ(updates.size(), 3u); + ASSERT_EQ(updates[0], primaryUrl); + ASSERT_EQ(updates[1], secondaryUrl); + ASSERT_EQ(updates[2], primaryUrl); } TEST(ServiceInfoProviderTest, testSwitchCluster) { From bf5edbecb16fd841fda0e8ba34c200409fa7ed88 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 17 Mar 2026 11:52:34 +0800 Subject: [PATCH 12/13] address some comments --- include/pulsar/AutoClusterFailover.h | 11 +++++++++-- lib/AutoClusterFailover.cc | 12 +++++------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/include/pulsar/AutoClusterFailover.h b/include/pulsar/AutoClusterFailover.h index ae52d732..a9d74428 100644 --- a/include/pulsar/AutoClusterFailover.h +++ b/include/pulsar/AutoClusterFailover.h @@ -23,6 +23,9 @@ #include #include +#include +#include +#include namespace pulsar { @@ -61,7 +64,9 @@ class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider { * - failoverThreshold: the number of consecutive failed probes required before switching away from * the current cluster. * - switchBackThreshold: the number of consecutive successful probes to the primary required before - * switching back from a secondary. + * switching back from a secondary while that secondary remains available. If the active secondary + * becomes unavailable and the primary is available, the implementation may switch back to the + * primary immediately, regardless of this threshold. */ class Builder { public: @@ -80,7 +85,9 @@ class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider { return *this; } - // Set the number of consecutive successful primary probes required before switching back. Default: 1. + // Set the number of consecutive successful primary probes required before switching back from a + // healthy secondary. If the active secondary becomes unavailable and the primary is available, + // the implementation may switch back immediately regardless of this threshold. Default: 1. Builder& withSwitchBackThreshold(uint32_t threshold) { config_.switchBackThreshold = threshold; return *this; diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc index c601dc9c..fa40c579 100644 --- a/lib/AutoClusterFailover.cc +++ b/lib/AutoClusterFailover.cc @@ -60,7 +60,7 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisconfig_.primary.serviceUrl() << " availability: " << primaryAvailable); if (primaryAvailable) { self->switchTo(&self->config_.primary); From 997c98537c95127d5e7db207e5d84a648c9411b5 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 17 Mar 2026 12:08:37 +0800 Subject: [PATCH 13/13] fix secondary list are not respected --- lib/AutoClusterFailover.cc | 99 ++++++++++++++++++++------------ tests/ServiceInfoProviderTest.cc | 38 ++++++++++++ 2 files changed, 100 insertions(+), 37 deletions(-) diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc index fa40c579..4fdfc1e4 100644 --- a/lib/AutoClusterFailover.cc +++ b/lib/AutoClusterFailover.cc @@ -258,30 +258,36 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_this= config_.secondary.size()) { - done(); + callback(false); + return; + } + + if (&config_.secondary[index] == excludedServiceInfo) { + probeSecondaryFrom(index + 1, excludedServiceInfo, std::move(callback)); return; } auto weakSelf = weak_from_this(); - probeAvailableAsync(config_.secondary[index], - [weakSelf, index, done = std::move(done)](bool available) mutable { - auto self = weakSelf.lock(); - if (!self) { - return; - } - - LOG_DEBUG("Detected secondary " << self->config_.secondary[index].serviceUrl() - << " availability: " << available); - if (available) { - self->switchTo(&self->config_.secondary[index]); - done(); - return; - } - - self->probeSecondaryFrom(index + 1, std::move(done)); - }); + probeAvailableAsync( + config_.secondary[index], + [weakSelf, index, excludedServiceInfo, callback = std::move(callback)](bool available) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + LOG_DEBUG("Detected secondary " << self->config_.secondary[index].serviceUrl() + << " availability: " << available); + if (available) { + self->switchTo(&self->config_.secondary[index]); + callback(true); + return; + } + + self->probeSecondaryFrom(index + 1, excludedServiceInfo, std::move(callback)); + }); } void checkAndFailoverToSecondaryAsync(CompletionCallback done) { @@ -305,10 +311,45 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisprobeSecondaryFrom(0, std::move(done)); + self->probeSecondaryFrom(0, nullptr, [done = std::move(done)](bool) mutable { done(); }); }); } + void failoverFromUnavailableSecondaryAsync(CompletionCallback done) { + auto weakSelf = weak_from_this(); + probeAvailableAsync( + config_.primary, [weakSelf, done = std::move(done)](bool primaryAvailable) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + LOG_DEBUG("Detected primary while secondary is unavailable " + << self->config_.primary.serviceUrl() << " availability: " << primaryAvailable); + if (primaryAvailable) { + self->switchTo(&self->config_.primary); + done(); + return; + } + + self->probeSecondaryFrom( + 0, self->currentServiceInfo_, + [weakSelf, done = std::move(done)](bool switchedToAnotherSecondary) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + if (switchedToAnotherSecondary) { + done(); + return; + } + + self->checkSwitchBackToPrimaryAsync(std::move(done), false); + }); + }); + } + void checkSwitchBackToPrimaryAsync(CompletionCallback done, std::optional primaryAvailableHint) { auto handlePrimaryAvailable = [weakSelf = weak_from_this(), done = std::move(done)](bool primaryAvailable) mutable { @@ -358,23 +399,7 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_thisprobeAvailableAsync( - self->config_.primary, [weakSelf, done = std::move(done)](bool primaryAvailable) mutable { - auto self = weakSelf.lock(); - if (!self) { - return; - } - - LOG_DEBUG("Detected primary while secondary is unavailable " - << self->config_.primary.serviceUrl() << " availability: " << primaryAvailable); - if (primaryAvailable) { - self->switchTo(&self->config_.primary); - done(); - return; - } - - self->checkSwitchBackToPrimaryAsync(std::move(done), false); - }); + self->failoverFromUnavailableSecondaryAsync(std::move(done)); }); } }; diff --git a/tests/ServiceInfoProviderTest.cc b/tests/ServiceInfoProviderTest.cc index 42f98958..175c5319 100644 --- a/tests/ServiceInfoProviderTest.cc +++ b/tests/ServiceInfoProviderTest.cc @@ -273,6 +273,44 @@ TEST(AutoClusterFailoverTest, testSwitchBackToPrimaryAfterRecoveryDelay) { ASSERT_EQ(updates[2], primaryUrl); } +TEST(AutoClusterFailoverTest, testFailoverToAnotherSecondaryWhenCurrentSecondaryIsUnavailable) { + ProbeTcpServer primary; + const auto primaryUrl = primary.getServiceUrl(); + primary.stop(); + + ProbeTcpServer firstSecondary; + const auto firstSecondaryUrl = firstSecondary.getServiceUrl(); + + ProbeTcpServer secondSecondary; + const auto secondSecondaryUrl = secondSecondary.getServiceUrl(); + + ServiceUrlObserver observer; + AutoClusterFailover provider = + AutoClusterFailover::Builder(ServiceInfo(primaryUrl), + {ServiceInfo(firstSecondaryUrl), ServiceInfo(secondSecondaryUrl)}) + .withCheckInterval(20ms) + .withFailoverThreshold(4) + .withSwitchBackThreshold(6) + .build(); + + observer.onUpdate(provider.initialServiceInfo()); + provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); }); + + ASSERT_TRUE( + waitUntil(2s, [&observer, &firstSecondaryUrl] { return observer.last() == firstSecondaryUrl; })); + + firstSecondary.stop(); + + ASSERT_TRUE( + waitUntil(2s, [&observer, &secondSecondaryUrl] { return observer.last() == secondSecondaryUrl; })); + + const auto updates = observer.snapshot(); + ASSERT_EQ(updates.size(), 3u); + ASSERT_EQ(updates[0], primaryUrl); + ASSERT_EQ(updates[1], firstSecondaryUrl); + ASSERT_EQ(updates[2], secondSecondaryUrl); +} + TEST(ServiceInfoProviderTest, testSwitchCluster) { extern std::string getToken(); // from tests/AuthTokenTest.cc // Access "private/auth" namespace in cluster 1