Skip to content

Commit da2da2d

Browse files
smithdhamadio
authored and committed
[Cl] Fix a possible deadlock in extreme copy. Fix xrootd#2619
1 parent 1024599 commit da2da2d

File tree

4 files changed

+97
-22
lines changed

4 files changed

+97
-22
lines changed

src/XrdCl/XrdClXCpCtx.cc

Lines changed: 24 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,7 @@ XCpCtx::XCpCtx( const std::vector<std::string> &urls, uint64_t blockSize, uint8_
3737
pUrls( std::deque<std::string>( urls.begin(), urls.end() ) ), pBlockSize( blockSize ),
3838
pParallelSrc( parallelSrc ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ),
3939
pOffset( 0 ), pFileSize( -1 ), pFileSizeCV( 0 ), pDataReceived( 0 ), pDone( false ),
40-
pDoneCV( 0 ), pRefCount( 1 )
40+
pDoneCV( 0 ), pRefCount( 1 ), pDeleteCV( 0 ), pDelete( false )
4141
{
4242
SetFileSize( fileSize );
4343
}
@@ -65,11 +65,12 @@ bool XCpCtx::GetNextUrl( std::string & url )
6565

6666
XCpSrc* XCpCtx::WeakestLink( XCpSrc *exclude )
6767
{
68-
XrdSysMutexHelper lck( pMtx );
6968
uint64_t transferRate = -1; // set transferRate to max uint64 value
7069
XCpSrc *ret = 0;
7170

7271
std::list<XCpSrc*>::iterator itr;
72+
XrdSysMutexHelper lck( pMtx );
73+
7374
for( itr = pSources.begin() ; itr != pSources.end() ; ++itr )
7475
{
7576
XCpSrc *src = *itr;
@@ -82,7 +83,8 @@ XCpSrc* XCpCtx::WeakestLink( XCpSrc *exclude )
8283
}
8384
}
8485

85-
return ret;
86+
if( !ret ) return ret;
87+
return ret->Self();
8688
}
8789

8890
void XCpCtx::PutChunk( PageInfo* chunk )
@@ -104,10 +106,10 @@ std::pair<uint64_t, uint64_t> XCpCtx::GetBlock()
104106

105107
void XCpCtx::SetFileSize( int64_t size )
106108
{
107-
XrdSysMutexHelper lck( pMtx );
109+
XrdSysCondVarHelper lckcv( pFileSizeCV );
110+
XrdSysMutexHelper lckmtx( pMtx );
108111
if( pFileSize < 0 && size >= 0 )
109112
{
110-
XrdSysCondVarHelper lck( pFileSizeCV );
111113
pFileSize = size;
112114
pFileSizeCV.Broadcast();
113115

@@ -125,10 +127,23 @@ XRootDStatus XCpCtx::Initialize()
125127
{
126128
XCpSrc *src = new XCpSrc( pChunkSize, pParallelChunks, pFileSize, this );
127129
pSources.push_back( src );
128-
src->Start();
129130
}
130131

131-
if( pSources.empty() )
132+
auto scpy = pSources;
133+
bool ok = false;
134+
for(auto src: scpy) {
135+
if( src->Start() )
136+
{
137+
// src destructor will remove src from pSources
138+
src->Delete();
139+
}
140+
else
141+
{
142+
ok = true;
143+
}
144+
}
145+
146+
if( !ok )
132147
{
133148
Log *log = DefaultEnv::GetLog();
134149
log->Error( UtilityMsg, "Failed to initialize (failed to create new threads)" );
@@ -190,6 +205,8 @@ size_t XCpCtx::GetRunning()
190205
// count active sources
191206
size_t nbRunning = 0;
192207
std::list<XCpSrc*>::iterator itr;
208+
XrdSysMutexHelper lck( pMtx );
209+
193210
for( itr = pSources.begin() ; itr != pSources.end() ; ++ itr)
194211
if( (*itr)->IsRunning() )
195212
++nbRunning;

src/XrdCl/XrdClXCpCtx.hh

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,44 @@ class XCpCtx
5656
XCpCtx( const std::vector<std::string> &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize );
5757

5858
/**
59-
* Deletes the instance if the reference counter reached 0.
59+
* Decrements the reference count and then waits for it to reach
60+
* zero, then deletes the instance. Should only be called once.
6061
*/
6162
void Delete()
6263
{
63-
XrdSysMutexHelper lck( pMtx );
64+
XrdSysMutexHelper lckmtx( pMtx );
6465
--pRefCount;
6566
if( !pRefCount )
6667
{
67-
lck.UnLock();
68+
lckmtx.UnLock();
6869
delete this;
70+
return;
71+
}
72+
lckmtx.UnLock();
73+
74+
XrdSysCondVarHelper lckcv( pDoneCV );
75+
pDone = true;
76+
pDoneCV.Broadcast();
77+
lckcv.UnLock();
78+
79+
lckcv.Lock( &pDeleteCV );
80+
while( !pDelete ) pDeleteCV.Wait();
81+
lckcv.UnLock();
82+
delete this;
83+
}
84+
85+
/**
86+
* Decrements the reference count and signals pDeleteCV when it reaches 0
87+
*/
88+
void Release()
89+
{
90+
XrdSysMutexHelper lck( pMtx );
91+
--pRefCount;
92+
if( !pRefCount )
93+
{
94+
XrdSysCondVarHelper lckcv( pDeleteCV );
95+
pDelete = true;
96+
pDeleteCV.Broadcast();
6997
}
7098
}
7199

@@ -257,6 +285,7 @@ class XCpCtx
257285
/**
258286
* File Size conditional variable.
259287
* (notifies waiters if the file size has been set)
288+
* If both are required, the lock order is pFileSizeCV then pMtx.
260289
*/
261290
XrdSysCondVar pFileSizeCV;
262291

@@ -298,6 +327,18 @@ class XCpCtx
298327
* Reference counter
299328
*/
300329
size_t pRefCount;
330+
331+
/**
332+
* A condition variable, signals Delete() in case we had to block
333+
* the deleter until all sources had finished.
334+
* If both are required, the lock order is pMtx and then pDeleteCV.
335+
*/
336+
XrdSysCondVar pDeleteCV;
337+
338+
/**
339+
* Predicate for pDeleteCV
340+
*/
341+
bool pDelete;
301342
};
302343

303344
} /* namespace XrdCl */

src/XrdCl/XrdClXCpSrc.cc

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -117,19 +117,17 @@ XCpSrc::XCpSrc( uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *
117117
XCpSrc::~XCpSrc()
118118
{
119119
pCtx->RemoveSrc( this );
120-
pCtx->Delete();
120+
// we only Release() ctx here; Delete() is always called by its creator,
121+
// not by us.
122+
pCtx->Release();
121123
}
122124

123-
void XCpSrc::Start()
125+
int XCpSrc::Start()
124126
{
125127
pRunning = true;
126128
int rc = pthread_create( &pThread, 0, Run, this );
127-
if( rc )
128-
{
129-
pRunning = false;
130-
pCtx->RemoveSrc( this );
131-
pCtx->Delete();
132-
}
129+
if( rc ) pRunning = false;
130+
return rc;
133131
}
134132

135133
void* XCpSrc::Run( void* arg )
@@ -466,7 +464,19 @@ void XCpSrc::Steal( XCpSrc *src )
466464
{
467465
if( !src ) return;
468466

469-
XrdSysMutexHelper lck1( pMtx ), lck2( src->pMtx );
467+
// use the address of the mutex to form an
468+
// order for acquiring the locks.
469+
XrdSysMutexHelper lck1, lck2;
470+
if ( std::less{}(&pMtx, &src->pMtx) )
471+
{
472+
lck2.Lock( &src->pMtx );
473+
lck1.Lock( &pMtx );
474+
}
475+
else
476+
{
477+
lck1.Lock( &pMtx );
478+
lck2.Lock( &src->pMtx );
479+
}
470480

471481
Log *log = DefaultEnv::GetLog();
472482
std::string myHost = URL( pUrl ).GetHostName(), srcHost = URL( src->pUrl ).GetHostName();
@@ -572,8 +582,10 @@ XRootDStatus XCpSrc::GetWork()
572582
return XRootDStatus();
573583
}
574584

585+
// WeakestLink() increases ref count on wLink, so decrease after
575586
XCpSrc *wLink = pCtx->WeakestLink( this );
576587
Steal( wLink );
588+
if( wLink ) wLink->Delete();
577589

578590
// if we managed to steal something declare success
579591
if( pCurrentOffset < pBlkEnd || !pRecovered.empty() ) return XRootDStatus();

src/XrdCl/XrdClXCpSrc.hh

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
#include "XrdCl/XrdClSyncQueue.hh"
3030
#include "XrdSys/XrdSysPthread.hh"
3131

32+
#include <atomic>
33+
3234
namespace XrdCl
3335
{
3436

@@ -55,7 +57,7 @@ class XCpSrc
5557
/**
5658
* Creates new thread with XCpSrc::Run as the start routine.
5759
*/
58-
void Start();
60+
int Start();
5961

6062
/**
6163
* Stops the thread.
@@ -87,6 +89,9 @@ class XCpSrc
8789
XCpSrc* Self()
8890
{
8991
XrdSysMutexHelper lck( pMtx );
92+
// if Ctx is trying to increase our ref count it is possible
93+
// we are already in our destructor, waiting for RemoveSrc().
94+
if( !pRefCount ) return nullptr;
9095
++pRefCount;
9196
return this;
9297
}
@@ -311,7 +316,7 @@ class XCpSrc
311316
/**
312317
* Total number of data transferred from this source.
313318
*/
314-
uint64_t pDataTransfered;
319+
std::atomic<uint64_t> pDataTransfered;
315320

316321
/**
317322
* A map of ongoing transfers (the offset is the key,
@@ -348,7 +353,7 @@ class XCpSrc
348353
* false means the source has been stopped,
349354
* or failed.
350355
*/
351-
bool pRunning;
356+
std::atomic<bool> pRunning;
352357

353358
/**
354359
* The time when we started / restarted chunks

0 commit comments

Comments
 (0)