diff --git a/src/StfSender/StfSenderDevice.cxx b/src/StfSender/StfSenderDevice.cxx
index 4c0fa14..f70810c 100644
--- a/src/StfSender/StfSenderDevice.cxx
+++ b/src/StfSender/StfSenderDevice.cxx
@@ -318,16 +318,19 @@ void StfSenderDevice::ResetTask()
     I().mFileSink->stop();
   }
 
-  // Stop output handler
+  // Stop the RPC server first, so no new connect/disconnect/data requests can
+  // reference the output handler while it is being torn down.
+  if (!standalone()) {
+    I().mRpcServer->stop();
+  }
+
+  // Stop output handler (also required for standalone/flp-only runs)
   if (I().mOutputHandler) {
     I().mOutputHandler->stop();
   }
 
   if (!standalone()) {
-    // Stop the RPC server after output
-    I().mRpcServer->stop();
-
-    // Stop the Scheduler RPC client
+    // Stop the Scheduler RPC client
     I().mTfSchedulerRpcClient.stop();
   }
 
diff --git a/src/StfSender/StfSenderOutputUCX.cxx b/src/StfSender/StfSenderOutputUCX.cxx
index 22c7f07..b7891f8 100644
--- a/src/StfSender/StfSenderOutputUCX.cxx
+++ b/src/StfSender/StfSenderOutputUCX.cxx
@@ -132,6 +132,21 @@ void StfSenderOutputUCX::stop()
   if (mDeallocThread.joinable()) {
     mDeallocThread.join();
   }
+
+  // join any pending async endpoint-close threads while the workers/context are still valid
+  {
+    std::vector<std::thread> lCloseThreads;
+    {
+      std::scoped_lock lLock(mDisconnectThreadsLock);
+      lCloseThreads = std::move(mDisconnectThreads);
+      mDisconnectThreads.clear();
+    }
+    for (auto &lThread : lCloseThreads) {
+      if (lThread.joinable()) {
+        lThread.join();
+      }
+    }
+  }
   DDDLOG("StfSenderOutputUCX::stop: stopped all threads.");
 
   // close all connections
@@ -244,8 +259,11 @@ bool StfSenderOutputUCX::disconnectTfBuilder(const std::string &pTfBuilderId)
     lConnInfo = std::move(lConnInfoNode.mapped());
   }
 
-  // Transport is only closed when other side execute close as well. Execute async
-  std::thread([this, pConnInfo = std::move(lConnInfo), pTfBuilderId](){
+  // Transport is only closed when other side execute close as well. Execute async.
+  // NOTE: do not detach. The thread uses our UCX workers and member state, so it must
+  // be joined in stop() before the workers/context are destroyed (otherwise teardown at
+  // end of run races the close and segfaults). See https://its.cern.ch/jira/browse/R3C-1147
+  std::thread lCloseThread([this, pConnInfo = std::move(lConnInfo), pTfBuilderId](){
     DDDLOG("StfSenderOutputUCX::disconnectTfBuilder: closing transport for tf_builder={}", pTfBuilderId);
     // acquire the lock and close the connection
     std::unique_lock lTfSenderLock(pConnInfo->mTfBuilderLock);
@@ -256,8 +274,12 @@ bool StfSenderOutputUCX::disconnectTfBuilder(const std::string &pTfBuilderId)
       std::scoped_lock lLockTfBuilders(mStfsInFlightMutex);
       mDisconnectedTfBuilders.insert(pTfBuilderId);
     }
+  });
 
-  }).detach();
+  {
+    std::scoped_lock lLock(mDisconnectThreadsLock);
+    mDisconnectThreads.emplace_back(std::move(lCloseThread));
+  }
 
   return true;
 }
diff --git a/src/StfSender/StfSenderOutputUCX.h b/src/StfSender/StfSenderOutputUCX.h
index 1bd7834..9e8545e 100644
--- a/src/StfSender/StfSenderOutputUCX.h
+++ b/src/StfSender/StfSenderOutputUCX.h
@@ -68,6 +68,20 @@ class StfSenderOutputUCX : public ISubTimeFrameConstVisitor {
 
   StfSenderOutputUCX(std::shared_ptr pDiscoveryConfig, StdSenderOutputCounters &pCounters);
 
+  ~StfSenderOutputUCX() {
+    // safety net: make sure no async close thread outlives the object if stop() was skipped
+    std::vector<std::thread> lCloseThreads;
+    {
+      std::scoped_lock lLock(mDisconnectThreadsLock);
+      lCloseThreads = std::move(mDisconnectThreads);
+    }
+    for (auto &lThread : lCloseThreads) {
+      if (lThread.joinable()) {
+        lThread.join();
+      }
+    }
+  }
+
   bool start();
   void stop();
 
@@ -212,6 +226,11 @@ class StfSenderOutputUCX : public ISubTimeFrameConstVisitor {
 
   ConcurrentFifo> mSendRequestQueue;
 
+  /// async endpoint-close threads spawned by disconnectTfBuilder()
+  /// tracked so stop() can join them before destroying the workers/context
+  std::mutex mDisconnectThreadsLock;
+  std::vector<std::thread> mDisconnectThreads;
+
   /// map of STFs waiting on transfers
   std::mutex mStfsInFlightMutex;
   std::map> mStfsInFlight;