9 template <
typename _Ty >
45 bool wait(
double sec = -1 )
65 TaskCtx() : mtxTask(true), cdtTask(true), status(
taskPending), pool(nullptr), posted(false), aborted(false), prevTask(nullptr) { }
73 template <
typename _Ty >
78 template <
typename... _ArgType >
96 this->val = ivk->invoke();
100 std::cout << e.
what() << std::endl;
105 std::cout <<
"unknown" << std::endl;
120 template <
typename... _ArgType >
141 std::cout << e.
what() << std::endl;
146 std::cout <<
"unknown" << std::endl;
163 explicit ThreadPool() : _mtxPool(true), _cdtPool(true), _poolStop(false), _taskChainCount(0)
170 explicit ThreadPool(
int threadCount ) : _mtxPool(true), _cdtPool(true), _poolStop(false), _taskChainCount(0)
172 this->startup(threadCount);
177 this->whenEmptyStopAndWait();
183 _group.create( threadCount, [
this] () {
189 while ( _queueTask.empty() && !_poolStop )
191 _cdtPool.wait(_mtxPool);
193 if ( !_queueTask.empty() )
195 taskCtx = _queueTask.front();
198 _cdtPool.notifyAll();
205 taskCtx->routineForPool->run();
207 taskCtx->posted =
false;
216 template <
typename _Fx,
typename... _ArgType >
227 _cdtPool.notifyAll();
233 return _group.wait(sec);
243 _cdtPool.waitUntil( [
this] () {
return _queueTask.empty(); }, _mtxPool );
245 if ( this->_taskChainCount <= 0 )
break;
255 ScopeGuard guard( const_cast<Mutex &>(_mtxPool) );
256 return _queueTask.size();
262 return ++_taskChainCount;
268 return --_taskChainCount;
276 _queueTask.push(taskCtx);
284 std::queue< SharedPointer<TaskCtx> > _queueTask;
299 this->
pool->_postTask(p);
317 template <
typename _Ty >
324 template <
typename _Fx,
typename... _ArgType >
334 taskCtx->
exec( routine.get() );
342 }, _taskCtx.get() ) );
346 template <
typename _Fx,
typename... _ArgType >
351 _taskCtx->prevTask = prevTaskCtx.get();
357 taskCtx->
exec( routine.get() );
360 prevTaskCtx->nextTask.reset();
368 }, prevTaskCtx, _taskCtx.get() ) );
373 template <
typename _Fx,
typename... _ArgType >
378 _taskCtx->prevTask = prevTaskCtx.get();
381 auto routine =
MakeSimple(
NewRunable( fnRoutine, obj, std::forward<_ArgType>(argRoutine)... ) );
384 taskCtx->
exec( routine.get() );
387 prevTaskCtx->nextTask.reset();
395 }, prevTaskCtx, _taskCtx.get() ) );
400 template <
typename _Ty2,
typename _Fx,
typename... _ArgType >
405 _taskCtx->prevTask = prevTaskCtx.get();
408 auto routine =
MakeSimple(
NewRunable( fnRoutine, _Ty2(), std::forward<_ArgType>(argRoutine)... ) );
411 std::get<0>(routine->_tuple) = std::move(prevTaskCtx->val);
412 taskCtx->
exec( routine.get() );
415 prevTaskCtx->nextTask.reset();
423 }, prevTaskCtx, _taskCtx.get() ) );
428 template <
typename _Ty2,
typename _Fx,
typename... _ArgType >
433 _taskCtx->prevTask = prevTaskCtx.get();
436 auto routine =
MakeSimple(
NewRunable( fnRoutine, obj, _Ty2(), std::forward<_ArgType>(argRoutine)... ) );
439 std::get<1>(routine->_tuple) = std::move(prevTaskCtx->val);
440 taskCtx->
exec( routine.get() );
443 prevTaskCtx->nextTask.reset();
451 }, prevTaskCtx, _taskCtx.get() ) );
460 template <
typename _Fx,
typename... _ArgType >
476 cur = _taskCtx.get();
477 while ( cur !=
nullptr )
485 cur = _taskCtx.get();
499 this->_taskCtx->pool->incTaskChainCount();
514 return _taskCtx->get();
520 template <
typename _Ty0 >
Task & post()
任务必须投递,否则不会被执行
RunableT< _Fx, std::tuple< typename std::decay< _ArgType >::type... > > * NewRunable(_Fx fn, _ArgType &&...arg)
创建一个Runable对象
void whenEmptyStopAndWait()
当任务队列为空,任务链为0,就停止线程池运行,并等待线程组线程正常退出
Condition cdtTask
用于判断运行状态
size_t getTaskCount() const
队列里的任务数
void exec(RunableInvoker< void > *ivk) noexcept
static SharedPointer< TaskCtxT > Create(_ArgType &&...arg)
Task(ThreadPool *pool, _Fx fnRoutine, _ArgType &&...argRoutine)
Ctor1 创建一个起始任务,需要提供一个线程池
bool wait(double sec=-1)
wait(sec>0)等待一定的时间长用于等待任务运行。当调用stop()后,wait(sec<0)等待线程组线程全部正常退出 ...
Task< typename FuncTraits< _Fx >::ReturnType > task(_Fx fn, _ArgType &&...arg)
创建一个新任务
SharedPointer< TaskCtx > nextTask
下一个任务,执行完本任务后应该投递到任务池中
void updateStatus(TaskStatus st, bool isNotifyAll=false)
更新运行状态
#define DISABLE_OBJECT_COPY(clsname)
Task< typename FuncTraits< _Fx >::ReturnType > then(_Fx fn, _ArgType &&...arg)
创建一个后续任务
Task(SharedPointer< TaskCtxT< void > > prevTaskCtx, _Fx fnRoutine, typename FuncTraits< _Fx >::ClassType *obj, _ArgType &&...argRoutine)
Ctor2-2 给一个任务创建一个后续任务 - 类方法执行
bool wait(double sec=-1)
等待任务结束
bool aborted
是否中止,任务中止则不投递nextTask
bool waitUntil(_Predicate pred, Mutex &mutex, double sec=-1)
等待谓词条件达成
Task(SharedPointer< TaskCtxT< _Ty2 > > prevTaskCtx, _Fx fnRoutine, typename FuncTraits< _Fx >::ClassType *obj, _ArgType &&...argRoutine)
Ctor3-2 给一个任务创建一个后续任务,并把上一个任务返回值移动给后续任务 - 类方法执行 ...
TaskCtx * getStartTask()
获取起始任务
TaskCtxT(ThreadPool *pool, TaskCtx::TaskStatus status=TaskCtx::taskPending)
线程池,创建一组线程等待着从任务队列中获取任务执行
void tryPostNext()
尝试投递后续任务,如果有的话
Task(SharedPointer< TaskCtxT< void > > prevTaskCtx, _Fx fnRoutine, _ArgType &&...argRoutine)
Ctor2-1 给一个任务创建一个后续任务
virtual char const * what() const
代表投递到线程池的一个任务,用于等待执行完毕获取返回值或者接着投递下一个任务
bool posted
是否已被投递,只有起始任务才能被投递
void exec(RunableInvoker< _Ty > *ivk) noexcept
static SharedPointer< TaskCtxT > Create(_ArgType &&...arg)
ThreadPool(int threadCount)
构造函数1
void wait(double sec=-1)
等待任务执行完毕
SimplePointer< _Ty > MakeSimple(_Ty *newObj)
int notifyAll()
通知所有正在wait()中的线程醒来
WeakPointer< TaskCtx > weakThis
自己的弱引用
TaskCtxT(ThreadPool *pool, TaskCtx::TaskStatus status=TaskCtx::taskPending)
ThreadPool & startup(int threadCount)
启动指定数量的线程
SimplePointer< Runable > routineForPool
投递到线程池的例程
Task(SharedPointer< TaskCtxT< _Ty2 > > prevTaskCtx, _Fx fnRoutine, _ArgType &&...argRoutine)
Ctor3-1 给一个任务创建一个后续任务,并把上一个任务返回值移动给后续任务