diff --git a/include/pulsar/AutoClusterFailover.h b/include/pulsar/AutoClusterFailover.h new file mode 100644 index 00000000..a9d74428 --- /dev/null +++ b/include/pulsar/AutoClusterFailover.h @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef PULSAR_AUTO_CLUSTER_FAILOVER_H_ +#define PULSAR_AUTO_CLUSTER_FAILOVER_H_ + +#include + +#include +#include +#include +#include +#include + +namespace pulsar { + +class Client; +class AutoClusterFailoverImpl; + +class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider { + public: + struct Config { + const ServiceInfo primary; + const std::vector secondary; + std::chrono::milliseconds checkInterval{5000}; // 5 seconds + uint32_t failoverThreshold{1}; + uint32_t switchBackThreshold{1}; + + Config(ServiceInfo primary, std::vector secondary) + : primary(std::move(primary)), secondary(std::move(secondary)) {} + }; + + /** + * Builder helps create an AutoClusterFailover configuration. + * + * Example: + * ServiceInfo primary{...}; + * std::vector secondaries{...}; + * AutoClusterFailover provider = AutoClusterFailover::Builder(primary, secondaries) + * .withCheckInterval(std::chrono::seconds(5)) + * .withFailoverThreshold(3) + * .withSwitchBackThreshold(3) + * .build(); + * + * Notes: + * - primary: the preferred cluster to use when available. + * - secondary: ordered list of fallback clusters. + * - checkInterval: frequency of health probes. + * - failoverThreshold: the number of consecutive failed probes required before switching away from + * the current cluster. + * - switchBackThreshold: the number of consecutive successful probes to the primary required before + * switching back from a secondary while that secondary remains available. If the active secondary + * becomes unavailable and the primary is available, the implementation may switch back to the + * primary immediately, regardless of this threshold. + */ + class Builder { + public: + Builder(ServiceInfo primary, std::vector secondary) + : config_(std::move(primary), std::move(secondary)) {} + + // Set how frequently probes run against the active cluster(s). Default: 5 seconds. + Builder& withCheckInterval(std::chrono::milliseconds interval) { + config_.checkInterval = interval; + return *this; + } + + // Set the number of consecutive failed probes required before attempting failover. Default: 1. + Builder& withFailoverThreshold(uint32_t threshold) { + config_.failoverThreshold = threshold; + return *this; + } + + // Set the number of consecutive successful primary probes required before switching back from a + // healthy secondary. If the active secondary becomes unavailable and the primary is available, + // the implementation may switch back immediately regardless of this threshold. Default: 1. + Builder& withSwitchBackThreshold(uint32_t threshold) { + config_.switchBackThreshold = threshold; + return *this; + } + + AutoClusterFailover build() { return AutoClusterFailover(std::move(config_)); } + + private: + Config config_; + }; + + explicit AutoClusterFailover(Config&& config); + + ~AutoClusterFailover() final; + + ServiceInfo initialServiceInfo() final; + + void initialize(std::function onServiceInfoUpdate) final; + + private: + std::shared_ptr impl_; +}; + +} // namespace pulsar + +#endif diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc new file mode 100644 index 00000000..4fdfc1e4 --- /dev/null +++ b/lib/AutoClusterFailover.cc @@ -0,0 +1,418 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "AsioTimer.h" +#include "LogUtils.h" +#include "ServiceURI.h" +#include "Url.h" + +#ifdef USE_ASIO +#include +#include +#include +#include +#include +#include +#else +#include +#include +#include +#include +#include +#include +#endif + +#include "AsioDefines.h" + +DECLARE_LOG_OBJECT() + +namespace pulsar { + +class AutoClusterFailoverImpl : public std::enable_shared_from_this { + public: + AutoClusterFailoverImpl(AutoClusterFailover::Config&& config) + : config_(std::move(config)), currentServiceInfo_(&config_.primary) {} + + ~AutoClusterFailoverImpl() { + using namespace std::chrono_literals; + if (!thread_.joinable()) { + return; + } + + cancelTimer(*timer_); + workGuard_.reset(); + ioContext_.stop(); + + if (future_.wait_for(3s) != std::future_status::ready) { + LOG_WARN("AutoClusterFailoverImpl is not stopped within 3 seconds, waiting for it to finish"); + } + thread_.join(); + } + + auto primary() const noexcept { return config_.primary; } + + void initialize(std::function&& onServiceInfoUpdate) { + onServiceInfoUpdate_ = std::move(onServiceInfoUpdate); + workGuard_.emplace(ASIO::make_work_guard(ioContext_)); + timer_.emplace(ioContext_); + + auto weakSelf = weak_from_this(); + ASIO::post(ioContext_, [weakSelf] { + if (auto self = weakSelf.lock()) { + self->scheduleFailoverCheck(); + } + }); + + // Capturing `this` is safe because the thread will be joined in the destructor + std::promise promise; + future_ = promise.get_future(); + thread_ = std::thread([this, promise{std::move(promise)}]() mutable { + ioContext_.run(); + promise.set_value(); + }); + } + + private: + static constexpr std::chrono::milliseconds probeTimeout_{30000}; + using CompletionCallback = std::function; + using ProbeCallback = std::function; + + struct ProbeContext { + ASIO::ip::tcp::resolver resolver; + ASIO::ip::tcp::socket socket; + ASIO::steady_timer timer; + ProbeCallback callback; + bool done{false}; + std::string hostUrl; + + ProbeContext(ASIO::io_context& ioContext, std::string hostUrl, ProbeCallback callback) + : resolver(ioContext), + socket(ioContext), + timer(ioContext), + callback(std::move(callback)), + hostUrl(std::move(hostUrl)) {} + }; + + AutoClusterFailover::Config config_; + const ServiceInfo* currentServiceInfo_; + uint32_t consecutiveFailureCount_{0}; + uint32_t consecutivePrimaryRecoveryCount_{0}; + + std::thread thread_; + std::future future_; + + ASIO::io_context ioContext_; + std::function onServiceInfoUpdate_; + + std::optional> workGuard_; + std::optional timer_; + + bool isUsingPrimary() const noexcept { return currentServiceInfo_ == &config_.primary; } + + const ServiceInfo& current() const noexcept { return *currentServiceInfo_; } + + void scheduleFailoverCheck() { + timer_->expires_after(config_.checkInterval); + auto weakSelf = weak_from_this(); + timer_->async_wait([weakSelf](ASIO_ERROR error) { + if (error) { + return; + } + if (auto self = weakSelf.lock()) { + self->executeFailoverCheck(); + } + }); + } + + void executeFailoverCheck() { + auto done = [weakSelf = weak_from_this()] { + if (auto self = weakSelf.lock()) { + self->scheduleFailoverCheck(); + } + }; + + if (isUsingPrimary()) { + checkAndFailoverToSecondaryAsync(std::move(done)); + } else { + checkSecondaryAndPrimaryAsync(std::move(done)); + } + } + + static void completeProbe(const std::shared_ptr& context, bool success, + const ASIO_ERROR& error = ASIO_SUCCESS) { + if (context->done) { + return; + } + + context->done = true; + ASIO_ERROR ignored; + context->resolver.cancel(); + context->socket.close(ignored); + context->timer.cancel(ignored); + + context->callback(success); + } + + void probeHostAsync(const std::string& hostUrl, ProbeCallback callback) { + Url parsedUrl; + if (!Url::parse(hostUrl, parsedUrl)) { + LOG_WARN("Failed to parse service URL for probing: " << hostUrl); + callback(false); + return; + } + + auto context = std::make_shared(ioContext_, hostUrl, std::move(callback)); + context->timer.expires_after(probeTimeout_); + context->timer.async_wait([context](const ASIO_ERROR& error) { + if (!error) { + completeProbe(context, false, ASIO::error::timed_out); + } + }); + + context->resolver.async_resolve( + parsedUrl.host(), std::to_string(parsedUrl.port()), + [context](const ASIO_ERROR& error, const ASIO::ip::tcp::resolver::results_type& endpoints) { + if (error) { + completeProbe(context, false, error); + return; + } + + ASIO::async_connect( + context->socket, endpoints, + [context](const ASIO_ERROR& connectError, const ASIO::ip::tcp::endpoint&) { + completeProbe(context, !connectError, connectError); + }); + }); + } + + void probeHostsAsync(const std::shared_ptr>& hosts, size_t index, + ProbeCallback callback) { + if (index >= hosts->size()) { + callback(false); + return; + } + + auto hostUrl = (*hosts)[index]; + auto weakSelf = weak_from_this(); + probeHostAsync(hostUrl, + [weakSelf, hosts, index, callback = std::move(callback)](bool available) mutable { + if (available) { + callback(true); + return; + } + if (auto self = weakSelf.lock()) { + self->probeHostsAsync(hosts, index + 1, std::move(callback)); + } + }); + } + + void probeAvailableAsync(const ServiceInfo& serviceInfo, ProbeCallback callback) { + try { + ServiceURI serviceUri{serviceInfo.serviceUrl()}; + auto hosts = std::make_shared>(serviceUri.getServiceHosts()); + if (hosts->empty()) { + callback(false); + return; + } + probeHostsAsync(hosts, 0, std::move(callback)); + } catch (const std::exception& e) { + LOG_WARN("Failed to probe service URL " << serviceInfo.serviceUrl() << ": " << e.what()); + callback(false); + } + } + + void switchTo(const ServiceInfo* serviceInfo) { + if (currentServiceInfo_ == serviceInfo) { + return; + } + + LOG_INFO("Switch service URL from " << current().serviceUrl() << " to " << serviceInfo->serviceUrl()); + currentServiceInfo_ = serviceInfo; + consecutiveFailureCount_ = 0; + consecutivePrimaryRecoveryCount_ = 0; + onServiceInfoUpdate_(current()); + } + + void probeSecondaryFrom(size_t index, const ServiceInfo* excludedServiceInfo, ProbeCallback callback) { + if (index >= config_.secondary.size()) { + callback(false); + return; + } + + if (&config_.secondary[index] == excludedServiceInfo) { + probeSecondaryFrom(index + 1, excludedServiceInfo, std::move(callback)); + return; + } + + auto weakSelf = weak_from_this(); + probeAvailableAsync( + config_.secondary[index], + [weakSelf, index, excludedServiceInfo, callback = std::move(callback)](bool available) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + LOG_DEBUG("Detected secondary " << self->config_.secondary[index].serviceUrl() + << " availability: " << available); + if (available) { + self->switchTo(&self->config_.secondary[index]); + callback(true); + return; + } + + self->probeSecondaryFrom(index + 1, excludedServiceInfo, std::move(callback)); + }); + } + + void checkAndFailoverToSecondaryAsync(CompletionCallback done) { + auto weakSelf = weak_from_this(); + probeAvailableAsync(current(), [weakSelf, done = std::move(done)](bool primaryAvailable) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + LOG_DEBUG("Detected primary " << self->current().serviceUrl() + << " availability: " << primaryAvailable); + if (primaryAvailable) { + self->consecutiveFailureCount_ = 0; + done(); + return; + } + + if (++self->consecutiveFailureCount_ < self->config_.failoverThreshold) { + done(); + return; + } + + self->probeSecondaryFrom(0, nullptr, [done = std::move(done)](bool) mutable { done(); }); + }); + } + + void failoverFromUnavailableSecondaryAsync(CompletionCallback done) { + auto weakSelf = weak_from_this(); + probeAvailableAsync( + config_.primary, [weakSelf, done = std::move(done)](bool primaryAvailable) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + LOG_DEBUG("Detected primary while secondary is unavailable " + << self->config_.primary.serviceUrl() << " availability: " << primaryAvailable); + if (primaryAvailable) { + self->switchTo(&self->config_.primary); + done(); + return; + } + + self->probeSecondaryFrom( + 0, self->currentServiceInfo_, + [weakSelf, done = std::move(done)](bool switchedToAnotherSecondary) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + if (switchedToAnotherSecondary) { + done(); + return; + } + + self->checkSwitchBackToPrimaryAsync(std::move(done), false); + }); + }); + } + + void checkSwitchBackToPrimaryAsync(CompletionCallback done, std::optional primaryAvailableHint) { + auto handlePrimaryAvailable = [weakSelf = weak_from_this(), + done = std::move(done)](bool primaryAvailable) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + if (!primaryAvailable) { + self->consecutivePrimaryRecoveryCount_ = 0; + done(); + return; + } + + if (++self->consecutivePrimaryRecoveryCount_ >= self->config_.switchBackThreshold) { + self->switchTo(&self->config_.primary); + } + done(); + }; + + if (primaryAvailableHint.has_value()) { + handlePrimaryAvailable(*primaryAvailableHint); + return; + } + + probeAvailableAsync(config_.primary, std::move(handlePrimaryAvailable)); + } + + void checkSecondaryAndPrimaryAsync(CompletionCallback done) { + auto weakSelf = weak_from_this(); + probeAvailableAsync(current(), [weakSelf, done = std::move(done)](bool secondaryAvailable) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + LOG_DEBUG("Detected secondary " << self->current().serviceUrl() + << " availability: " << secondaryAvailable); + if (secondaryAvailable) { + self->consecutiveFailureCount_ = 0; + self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt); + return; + } + + if (++self->consecutiveFailureCount_ < self->config_.failoverThreshold) { + self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt); + return; + } + + self->failoverFromUnavailableSecondaryAsync(std::move(done)); + }); + } +}; + +AutoClusterFailover::AutoClusterFailover(Config&& config) + : impl_(std::make_shared(std::move(config))) {} + +AutoClusterFailover::~AutoClusterFailover() {} + +ServiceInfo AutoClusterFailover::initialServiceInfo() { return impl_->primary(); } + +void AutoClusterFailover::initialize(std::function onServiceInfoUpdate) { + impl_->initialize(std::move(onServiceInfoUpdate)); +} + +} // namespace pulsar diff --git a/tests/ServiceInfoProviderTest.cc b/tests/ServiceInfoProviderTest.cc index 82f5f6f7..175c5319 100644 --- a/tests/ServiceInfoProviderTest.cc +++ b/tests/ServiceInfoProviderTest.cc @@ -17,16 +17,20 @@ * under the License. */ #include +#include #include #include +#include #include #include #include #include +#include #include "PulsarFriend.h" #include "WaitUtils.h" +#include "lib/AsioDefines.h" #include "lib/LogUtils.h" DECLARE_LOG_OBJECT() @@ -34,6 +38,113 @@ DECLARE_LOG_OBJECT() using namespace pulsar; using namespace std::chrono_literals; +namespace { + +class ProbeTcpServer { + public: + ProbeTcpServer() { start(); } + + ~ProbeTcpServer() { stop(); } + + void start() { + if (running_) { + return; + } + + auto ioContext = std::unique_ptr(new ASIO::io_context); + auto acceptor = std::unique_ptr(new ASIO::ip::tcp::acceptor(*ioContext)); + ASIO::ip::tcp::endpoint endpoint{ASIO::ip::tcp::v4(), static_cast(port_)}; + acceptor->open(endpoint.protocol()); + acceptor->set_option(ASIO::ip::tcp::acceptor::reuse_address(true)); + acceptor->bind(endpoint); + acceptor->listen(); + + port_ = acceptor->local_endpoint().port(); + ioContext_ = std::move(ioContext); + acceptor_ = std::move(acceptor); + running_ = true; + + scheduleAccept(); + serverThread_ = std::thread([this] { ioContext_->run(); }); + } + + void stop() { + if (!running_.exchange(false)) { + return; + } + + ASIO::post(*ioContext_, [this] { + ASIO_ERROR ignored; + if (acceptor_ && acceptor_->is_open()) { + acceptor_->close(ignored); + } + }); + + if (serverThread_.joinable()) { + serverThread_.join(); + } + + acceptor_.reset(); + ioContext_.reset(); + } + + std::string getServiceUrl() const { return "pulsar://127.0.0.1:" + std::to_string(port_); } + + private: + void scheduleAccept() { + if (!running_ || !acceptor_ || !acceptor_->is_open()) { + return; + } + + auto socket = std::make_shared(*ioContext_); + acceptor_->async_accept(*socket, [this, socket](const ASIO_ERROR &error) { + if (!error) { + ASIO_ERROR ignored; + socket->close(ignored); + } + + if (running_ && acceptor_ && acceptor_->is_open()) { + scheduleAccept(); + } + }); + } + + int port_{0}; + std::atomic_bool running_{false}; + std::unique_ptr ioContext_; + std::unique_ptr acceptor_; + std::thread serverThread_; +}; + +class ServiceUrlObserver { + public: + void onUpdate(const ServiceInfo &serviceInfo) { + std::lock_guard lock(mutex_); + serviceUrls_.emplace_back(serviceInfo.serviceUrl()); + } + + size_t size() const { + std::lock_guard lock(mutex_); + return serviceUrls_.size(); + } + + std::string last() const { + std::lock_guard lock(mutex_); + return serviceUrls_.empty() ? std::string() : serviceUrls_.back(); + } + + std::vector snapshot() const { + std::lock_guard lock(mutex_); + return serviceUrls_; + } + + private: + mutable std::mutex mutex_; + std::vector serviceUrls_; +}; + +} // namespace + class ServiceInfoHolder { public: ServiceInfoHolder(ServiceInfo info) : serviceInfo_(std::move(info)) {} @@ -93,6 +204,113 @@ class TestServiceInfoProvider : public ServiceInfoProvider { mutable std::mutex mutex_; }; +TEST(AutoClusterFailoverTest, testFailoverToFirstAvailableSecondaryAfterDelay) { + ProbeTcpServer availableSecondary; + ProbeTcpServer unavailableSecondary; + const auto primaryUrl = unavailableSecondary.getServiceUrl(); + unavailableSecondary.stop(); + + ProbeTcpServer skippedSecondary; + const auto skippedSecondaryUrl = skippedSecondary.getServiceUrl(); + skippedSecondary.stop(); + + const auto availableSecondaryUrl = availableSecondary.getServiceUrl(); + ServiceUrlObserver observer; + AutoClusterFailover provider = + AutoClusterFailover::Builder(ServiceInfo(primaryUrl), + {ServiceInfo(skippedSecondaryUrl), ServiceInfo(availableSecondaryUrl)}) + .withCheckInterval(20ms) + .withFailoverThreshold(6) + .withSwitchBackThreshold(6) + .build(); + + ASSERT_EQ(provider.initialServiceInfo().serviceUrl(), primaryUrl); + + observer.onUpdate(provider.initialServiceInfo()); + provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); }); + + ASSERT_FALSE(waitUntil( + 80ms, [&observer, &availableSecondaryUrl] { return observer.last() == availableSecondaryUrl; })); + ASSERT_TRUE(waitUntil( + 2s, [&observer, &availableSecondaryUrl] { return observer.last() == availableSecondaryUrl; })); + + const auto updates = observer.snapshot(); + ASSERT_EQ(updates.size(), 2u); + ASSERT_EQ(updates[0], primaryUrl); + ASSERT_EQ(updates[1], availableSecondaryUrl); +} + +TEST(AutoClusterFailoverTest, testSwitchBackToPrimaryAfterRecoveryDelay) { + ProbeTcpServer primary; + const auto primaryUrl = primary.getServiceUrl(); + primary.stop(); + + ProbeTcpServer secondary; + const auto secondaryUrl = secondary.getServiceUrl(); + + ServiceUrlObserver observer; + AutoClusterFailover provider = + AutoClusterFailover::Builder(ServiceInfo(primaryUrl), {ServiceInfo(secondaryUrl)}) + .withCheckInterval(20ms) + .withFailoverThreshold(4) + .withSwitchBackThreshold(6) + .build(); + + observer.onUpdate(provider.initialServiceInfo()); + provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); }); + + ASSERT_TRUE(waitUntil(2s, [&observer, &secondaryUrl] { return observer.last() == secondaryUrl; })); + + primary.start(); + + ASSERT_FALSE(waitUntil(80ms, [&observer, &primaryUrl] { return observer.last() == primaryUrl; })); + ASSERT_TRUE(waitUntil(2s, [&observer, &primaryUrl] { return observer.last() == primaryUrl; })); + + const auto updates = observer.snapshot(); + ASSERT_EQ(updates.size(), 3u); + ASSERT_EQ(updates[0], primaryUrl); + ASSERT_EQ(updates[1], secondaryUrl); + ASSERT_EQ(updates[2], primaryUrl); +} + +TEST(AutoClusterFailoverTest, testFailoverToAnotherSecondaryWhenCurrentSecondaryIsUnavailable) { + ProbeTcpServer primary; + const auto primaryUrl = primary.getServiceUrl(); + primary.stop(); + + ProbeTcpServer firstSecondary; + const auto firstSecondaryUrl = firstSecondary.getServiceUrl(); + + ProbeTcpServer secondSecondary; + const auto secondSecondaryUrl = secondSecondary.getServiceUrl(); + + ServiceUrlObserver observer; + AutoClusterFailover provider = + AutoClusterFailover::Builder(ServiceInfo(primaryUrl), + {ServiceInfo(firstSecondaryUrl), ServiceInfo(secondSecondaryUrl)}) + .withCheckInterval(20ms) + .withFailoverThreshold(4) + .withSwitchBackThreshold(6) + .build(); + + observer.onUpdate(provider.initialServiceInfo()); + provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); }); + + ASSERT_TRUE( + waitUntil(2s, [&observer, &firstSecondaryUrl] { return observer.last() == firstSecondaryUrl; })); + + firstSecondary.stop(); + + ASSERT_TRUE( + waitUntil(2s, [&observer, &secondSecondaryUrl] { return observer.last() == secondSecondaryUrl; })); + + const auto updates = observer.snapshot(); + ASSERT_EQ(updates.size(), 3u); + ASSERT_EQ(updates[0], primaryUrl); + ASSERT_EQ(updates[1], firstSecondaryUrl); + ASSERT_EQ(updates[2], secondSecondaryUrl); +} + TEST(ServiceInfoProviderTest, testSwitchCluster) { extern std::string getToken(); // from tests/AuthTokenTest.cc // Access "private/auth" namespace in cluster 1