Skip to content

Conversation

@BiteTheDDDDt
Copy link
Contributor

What problem does this PR solve?

=================================================================
==3150626==ERROR: AddressSanitizer: heap-use-after-free on address 0x7c6358d59ac8 at pc 0x5592a812e6b9 bp 0x7b48314d57d0 sp 0x7b48314d57c8
READ of size 8 at 0x7c6358d59ac8 thread T1749
    #0 0x5592a812e6b8 in std::__uniq_ptr_impl<std::mutex, std::default_delete<std::mutex>>::_M_ptr() const /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/unique_ptr.h:193:51
    #1 0x5592a812e6b8 in std::unique_ptr<std::mutex, std::default_delete<std::mutex>>::get() const /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/unique_ptr.h:473:21
    #2 0x5592a812e6b8 in std::unique_ptr<std::mutex, std::default_delete<std::mutex>>::operator*() const /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/unique_ptr.h:457:10
    #3 0x5592a812e6b8 in doris::pipeline::ExchangeSinkBuffer::_send_rpc(doris::pipeline::RpcInstance&) /root/doris/be/build_ASAN/../src/pipeline/exec/exchange_sink_buffer.cpp:235:39
    #4 0x5592a8135a2a in doris::pipeline::ExchangeSinkBuffer::_send_rpc(doris::pipeline::RpcInstance&)::$_4::operator()(doris::pipeline::RpcInstance*, bool const&, doris::PTransmitDataResult const&, long const&) const /root/doris/be/build_ASAN/../src/pipeline/exec/exchange_sink_buffer.cpp:486:17
    #5 0x5592a8135a2a in void std::__invoke_impl<void, doris::pipeline::ExchangeSinkBuffer::_send_rpc(doris::pipeline::RpcInstance&)::$_4&, doris::pipeline::RpcInstance*, bool const&, doris::PTransmitDataResult const&, long const&>(std::__invoke_other, doris::pipeline::ExchangeSinkBuffer::_send_rpc(doris::pipeline::RpcInstance&)::$_4&, doris::pipeline::RpcInstance*&&, bool const&, doris::PTransmitDataResult const&, long const&) /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/invoke.h:63:14
    #6 0x5592a8135a2a in std::enable_if<is_invocable_r_v<void, doris::pipeline::ExchangeSinkBuffer::_send_rpc(doris::pipeline::RpcInstance&)::$_4&, doris::pipeline::RpcInstance*, bool const&, doris::PTransmitDataResult const&, long const&>, void>::type std::__invoke_r<void, doris::pipeline::ExchangeSinkBuffer::_send_rpc(doris::pipeline::RpcInstance&)::$_4&, doris::pipeline::RpcInstance*, bool const&, doris::PTransmitDataResult const&, long const&>(doris::pipeline::ExchangeSinkBuffer::_send_rpc(doris::pipeline::RpcInstance&)::$_4&, doris::pipeline::RpcInstance*&&, bool const&, doris::PTransmitDataResult const&, long const&) /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/invoke.h:113:2
    #7 0x5592a8135a2a in std::_Function_handler<void (doris::pipeline::RpcInstance*, bool const&, doris::PTransmitDataResult const&, long const&), doris::pipeline::ExchangeSinkBuffer::_send_rpc(doris::pipeline::RpcInstance&)::$_4>::_M_invoke(std::_Any_data const&, doris::pipeline::RpcInstance*&&, bool const&, doris::PTransmitDataResult const&, long const&) /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:292:9
    #8 0x5592a815097c in std::function<void (doris::pipeline::RpcInstance*, bool const&, doris::PTransmitDataResult const&, long const&)>::operator()(doris::pipeline::RpcInstance*, bool const&, doris::PTransmitDataResult const&, long const&) const /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:593:9
    #9 0x5592a815097c in doris::pipeline::ExchangeSendCallback<doris::PTransmitDataResult>::call() /root/doris/be/build_ASAN/../src/pipeline/exec/exchange_sink_buffer.h:204:17
    #10 0x5592a816020c in doris::AutoReleaseClosure<doris::PTransmitDataParams, doris::pipeline::ExchangeSendCallback<doris::PTransmitDataResult>>::Run() /root/doris/be/build_ASAN/../src/util/brpc_closure.h:102:18
    #11 0x55928d232f3b in doris::FailureDetectClosure::Run() /root/doris/be/build_ASAN/../src/util/brpc_client_cache.h:90:20
    #12 0x5592aca1ff9e in brpc::Controller::EndRPC(brpc::Controller::CompletionInfo const&) (/home/work/unlimit_teamcity/TeamCity/Agents/20260212144004agent_172.17.0.180_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x4682bf9e)
    #13 0x5592aca4f467 in brpc::policy::ProcessRpcResponse(brpc::InputMessageBase*) (/home/work/unlimit_teamcity/TeamCity/Agents/20260212144004agent_172.17.0.180_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x4685b467)
    #14 0x5592aca46426 in brpc::ProcessInputMessage(void*) (/home/work/unlimit_teamcity/TeamCity/Agents/20260212144004agent_172.17.0.180_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x46852426)
    #15 0x5592aca46f20 in brpc::InputMessenger::InputMessageClosure::~InputMessageClosure() (/home/work/unlimit_teamcity/TeamCity/Agents/20260212144004agent_172.17.0.180_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x46852f20)
    #16 0x5592aca478dd in brpc::InputMessenger::OnNewMessages(brpc::Socket*) (/home/work/unlimit_teamcity/TeamCity/Agents/20260212144004agent_172.17.0.180_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x468538dd)
    #17 0x5592acb8faec in brpc::Socket::ProcessEvent(void*) (/home/work/unlimit_teamcity/TeamCity/Agents/20260212144004agent_172.17.0.180_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x4699baec)
    #18 0x5592ac9cfe76 in bthread::TaskGroup::task_runner(long) (/home/work/unlimit_teamcity/TeamCity/Agents/20260212144004agent_172.17.0.180_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x467dbe76)
    #19 0x5592ac9baae0 in bthread_make_fcontext (/home/work/unlimit_teamcity/TeamCity/Agents/20260212144004agent_172.17.0.180_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x467c6ae0)

0x7c6358d59ac8 is located 8 bytes inside of 200-byte region [0x7c6358d59ac0,0x7c6358d59b88)
freed by thread T998 (brpc_light) here:
    #0 0x55928cd09f12 in operator delete(void*, unsigned long) (/home/work/unlimit_teamcity/TeamCity/Agents/20260212144004agent_172.17.0.180_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x26b15f12)
    #1 0x5592a814cde1 in std::default_delete<doris::pipeline::RpcInstance>::operator()(doris::pipeline::RpcInstance*) const /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/unique_ptr.h:93:2
    #2 0x5592a814cde1 in std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>::~unique_ptr() /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/unique_ptr.h:399:4
    #3 0x5592a814cde1 in std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>::~pair() /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/stl_pair.h:302:12
    #4 0x5592a814cde1 in void std::destroy_at<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>(std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>*) /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/stl_construct.h:88:15
    #5 0x5592a814cde1 in void std::allocator_traits<std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>>::destroy<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>(std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>&, std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>*) /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/alloc_traits.h:698:4
    #6 0x5592a814cde1 in decltype(std::allocator_traits<std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>>::destroy(fp0, fp1)) phmap::allocator_traits<std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>>::destroy_impl<std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>, std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>(int, std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>&, std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>*) /var/local/thirdparty/installed/include/parallel_hashmap/phmap_base.h:1542:9
    #7 0x5592a814cde1 in void phmap::allocator_traits<std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>>::destroy<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>(std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>&, std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>*) /var/local/thirdparty/installed/include/parallel_hashmap/phmap_base.h:1500:9
    #8 0x5592a814cde1 in void phmap::priv::map_slot_policy<long, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>::destroy<std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>>(std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>*, phmap::priv::map_slot_type<long, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>*) /var/local/thirdparty/installed/include/parallel_hashmap/phmap_base.h:4690:13
    #9 0x5592a814c998 in void phmap::priv::FlatHashMapPolicy<long, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>::destroy<std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>>(std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>*, phmap::priv::map_slot_type<long, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>*) /var/local/thirdparty/installed/include/parallel_hashmap/phmap.h:4209:9
    #10 0x5592a814c998 in void phmap::priv::hash_policy_traits<phmap::priv::FlatHashMapPolicy<long, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>, void>::destroy<std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>>(std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>*, phmap::priv::map_slot_type<long, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>*) /var/local/thirdparty/installed/include/parallel_hashmap/phmap_base.h:479:9
    #11 0x5592a814c998 in phmap::priv::raw_hash_set<phmap::priv::FlatHashMapPolicy<long, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>, phmap::Hash<long>, phmap::EqualTo<long>, std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>>::destroy_slots() /var/local/thirdparty/installed/include/parallel_hashmap/phmap.h:1995:17
    #12 0x5592a814c7e5 in phmap::priv::raw_hash_set<phmap::priv::FlatHashMapPolicy<long, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>, phmap::Hash<long>, phmap::EqualTo<long>, std::allocator<std::pair<long const, std::unique_ptr<doris::pipeline::RpcInstance, std::default_delete<doris::pipeline::RpcInstance>>>>>::~raw_hash_set() /var/local/thirdparty/installed/include/parallel_hashmap/phmap.h:1236:23
    #13 0x5592a814c7e5 in doris::pipeline::ExchangeSinkBuffer::~ExchangeSinkBuffer() /root/doris/be/build_ASAN/../src/pipeline/exec/exchange_sink_buffer.h:275:44
    #14 0x5592a814c84d in doris::pipeline::ExchangeSinkBuffer::~ExchangeSinkBuffer() /root/doris/be/build_ASAN/../src/pipeline/exec/exchange_sink_buffer.h:275:44
    #15 0x55928cd38181 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:345:8
    #16 0x5592a80faa14 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:1069:11
    #17 0x5592a80faa14 in std::__shared_ptr<doris::pipeline::ExchangeSinkBuffer, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:1531:31
    #18 0x5592a80faa14 in doris::pipeline::ExchangeSinkOperatorX::~ExchangeSinkOperatorX() /root/doris/be/build_ASAN/../src/pipeline/exec/exchange_sink_operator.h:190:7
    #19 0x55928cd38181 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:345:8
    #20 0x5592aa75a7bf in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:1069:11
    #21 0x5592aa75a7bf in std::__shared_ptr<doris::pipeline::DataSinkOperatorXBase, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:1531:31
    #22 0x5592aa75a7bf in std::__shared_ptr<doris::pipeline::DataSinkOperatorXBase, (__gnu_cxx::_Lock_policy)2>::reset() /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:1649:9
    #23 0x5592aa75a7bf in doris::pipeline::PipelineFragmentContext::_release_resource() /root/doris/be/build_ASAN/../src/pipeline/pipeline_fragment_context.cpp:2111:11
    #24 0x5592aa7ab4ec in doris::pipeline::PipelineFragmentContext::set_to_rerun() /root/doris/be/build_ASAN/../src/pipeline/pipeline_fragment_context.cpp:2084:5
    #25 0x55929194cd35 in doris::FragmentMgr::rerun_fragment(doris::TUniqueId const&, int, doris::PRerunFragmentParams_Opcode) /root/doris/be/build_ASAN/../src/runtime/fragment_mgr.cpp:1460:34
    #26 0x55929229716a in doris::PInternalService::rerun_fragment(google::protobuf::RpcController*, doris::PRerunFragmentParams const*, doris::PRerunFragmentResult*, google::protobuf::Closure*)::$_0::operator()() const /root/doris/be/build_ASAN/../src/service/internal_service.cpp:1650:44
    #27 0x55929229716a in void std::__invoke_impl<void, doris::PInternalService::rerun_fragment(google::protobuf::RpcController*, doris::PRerunFragmentParams const*, doris::PRerunFragmentResult*, google::protobuf::Closure*)::$_0&>(std::__invoke_other, doris::PInternalService::rerun_fragment(google::protobuf::RpcController*, doris::PRerunFragmentParams const*, doris::PRerunFragmentResult*, google::protobuf::Closure*)::$_0&) /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/invoke.h:63:14
    #28 0x55929229716a in std::enable_if<is_invocable_r_v<void, doris::PInternalService::rerun_fragment(google::protobuf::RpcController*, doris::PRerunFragmentParams const*, doris::PRerunFragmentResult*, google::protobuf::Closure*)::$_0&>, void>::type std::__invoke_r<void, doris::PInternalService::rerun_fragment(google::protobuf::RpcController*, doris::PRerunFragmentParams const*, doris::PRerunFragmentResult*, google::protobuf::Closure*)::$_0&>(doris::PInternalService::rerun_fragment(google::protobuf::RpcController*, doris::PRerunFragmentParams const*, doris::PRerunFragmentResult*, google::protobuf::Closure*)::$_0&) /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/invoke.h:113:2
    #29 0x55929229716a in std::_Function_handler<void (), doris::PInternalService::rerun_fragment(google::protobuf::RpcController*, doris::PRerunFragmentParams const*, doris::PRerunFragmentResult*, google::protobuf::Closure*)::$_0>::_M_invoke(std::_Any_data const&) /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:292:9
    #30 0x5592922e048b in std::function<void ()>::operator()() const /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:593:9
    #31 0x5592922e048b in doris::WorkThreadPool<false>::work_thread(int) /root/doris/be/build_ASAN/../src/util/work_thread_pool.hpp:159:17
    #32 0x5592bbf4d4ff in execute_native_thread_routine archive64.c

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

Copilot AI review requested due to automatic review settings February 13, 2026 04:29
@hello-stephen
Copy link
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@BiteTheDDDDt
Copy link
Contributor Author

run buildall

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR aims to fix an ASAN-reported heap-use-after-free in ExchangeSinkBuffer::_send_rpc by changing how async BRPC callbacks decide whether it’s still safe to access ExchangeSinkBuffer state.

Changes:

  • Switch BRPC callback liveness checks from TaskExecutionContext weak locking to using ExchangeSinkBuffer’s weak_from_this().
  • Remove HasTaskExecutionCtx inheritance from ExchangeSinkBuffer.
  • Simplify _turn_off_channel() to always notify _parents without a prior liveness check.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

File Description
be/src/pipeline/exec/exchange_sink_buffer.h Changes ExchangeSinkBuffer inheritance to use enable_shared_from_this and removes HasTaskExecutionCtx.
be/src/pipeline/exec/exchange_sink_buffer.cpp Updates async callback handlers to lock weak_from_this() and adjusts _turn_off_channel() behavior.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

parent->on_channel_finished(ins.id);
}
for (auto& parent : _parents) {
parent->on_channel_finished(ins.id);
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_parents stores raw ExchangeSinkLocalState*, and this method can be reached from async RPC callbacks. Removing the previous liveness guard and unconditionally calling parent->on_channel_finished(...) can lead to use-after-free if the local state has already been destroyed (even if the sink buffer is still alive). Please reintroduce a safe lifetime check (e.g., lock a TaskExecutionContext/other owning context before iterating) or change _parents to a lifetime-safe reference (or clear/unregister parents under lock during close) before calling into them.

Suggested change
parent->on_channel_finished(ins.id);
if (parent != nullptr) {
parent->on_channel_finished(ins.id);
}

Copilot uses AI. Check for mistakes.
closure);
#endif
class ExchangeSinkBuffer : public HasTaskExecutionCtx {
class ExchangeSinkBuffer : std::enable_shared_from_this<ExchangeSinkBuffer> {
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::enable_shared_from_this should be inherited publicly. With the current declaration class ExchangeSinkBuffer : std::enable_shared_from_this<...>, the inheritance is private, which prevents std::shared_ptr from properly wiring up the internal weak pointer and can make weak_from_this() always return empty. Since this PR relies on weak_from_this() to guard async callbacks, please change it to public std::enable_shared_from_this<ExchangeSinkBuffer> so the lifetime check works as intended (and to avoid future bad_weak_ptr issues if shared_from_this() is used).

Suggested change
class ExchangeSinkBuffer : std::enable_shared_from_this<ExchangeSinkBuffer> {
class ExchangeSinkBuffer : public std::enable_shared_from_this<ExchangeSinkBuffer> {

Copilot uses AI. Check for mistakes.
@BiteTheDDDDt
Copy link
Contributor Author

run buildall

@BiteTheDDDDt
Copy link
Contributor Author

run buildall

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants