-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathPooledThreadExecutor.cpp
More file actions
103 lines (83 loc) · 2.33 KB
/
PooledThreadExecutor.cpp
File metadata and controls
103 lines (83 loc) · 2.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include <aws/core/utils/threading/PooledThreadExecutor.h>
#include <aws/core/utils/threading/ThreadTask.h>
#include <thread>
static const char* POOLED_CLASS_TAG = "PooledThreadExecutor";
using namespace Aws::Utils::Threading;
PooledThreadExecutor::PooledThreadExecutor(size_t poolSize, OverflowPolicy overflowPolicy) :
m_sync(0, poolSize), m_poolSize(poolSize), m_overflowPolicy(overflowPolicy)
{
for (size_t index = 0; index < m_poolSize; ++index)
{
m_threadTaskHandles.push_back(Aws::New<ThreadTask>(POOLED_CLASS_TAG, *this));
}
}
PooledThreadExecutor::~PooledThreadExecutor()
{
WaitUntilStopped();
}
void PooledThreadExecutor::WaitUntilStopped()
{
{
std::lock_guard<std::mutex> locker(m_queueLock);
m_stopped = true;
}
{
std::lock_guard lock(m_queueLock);
for(auto threadTask : m_threadTaskHandles)
{
threadTask->StopProcessingWork();
}
}
m_sync.notify_all();
for (auto threadTask : m_threadTaskHandles)
{
Aws::Delete(threadTask);
}
m_threadTaskHandles.clear();
while(m_tasks.size() > 0)
{
std::function<void()>* fn = m_tasks.front();
m_tasks.pop();
if(fn)
{
Aws::Delete(fn);
}
}
}
bool PooledThreadExecutor::SubmitToThread(std::function<void()>&& fn)
{
//avoid the need to do copies inside the lock. Instead lets do a pointer push.
std::function<void()>* fnCpy = Aws::New<std::function<void()>>(POOLED_CLASS_TAG, std::forward<std::function<void()>>(fn));
{
std::lock_guard<std::mutex> locker(m_queueLock);
if (m_stopped || (m_overflowPolicy == OverflowPolicy::REJECT_IMMEDIATELY && m_tasks.size() >= m_poolSize))
{
Aws::Delete(fnCpy);
return false;
}
m_tasks.push(fnCpy);
}
m_sync.notify_one();
return true;
}
std::function<void()>* PooledThreadExecutor::PopTask()
{
if (m_tasks.size() > 0)
{
std::function<void()>* fn = m_tasks.front();
if (fn)
{
m_tasks.pop();
return fn;
}
}
return nullptr;
}
bool PooledThreadExecutor::HasTasks() const
{
return m_tasks.size() > 0;
}