34 _size(0), _exit(false)
36 _pool.reserve(threads);
58 for(
unsigned int i = 0; i < threads; ++i)
60 _pool.emplace_back([
this]{
61 fu2::unique_function<void()> task;
66 std::unique_lock<std::mutex> lock(_mtx);
67 _new_task.wait(lock, [
this]{
return _size || _exit; });
74 while (_tasks.try_dequeue(task)) { _size--; task(); }
78 if (_tasks.try_dequeue(task))
89 int rc = pthread_setaffinity_np(_pool.back().native_handle(),
sizeof(cpu_set_t), &cpuset);
90 if (rc)
LERROR(
"Error calling pthread_setaffinity_np: " << rc);
96 auto fut =
execute([threads,little](){
97 LINFO(
"Initialized with " << threads << (little ?
" A53 threads." :
" A73 threads.")); });
100 LINFO(
"Initialized with " << threads <<
" threads."); });
109 std::scoped_lock<std::mutex> lock(_mtx);
113 _new_task.notify_all();
114 for (
auto & thread: _pool) thread.join();
135 cv::parallel::ParallelForAPI::FN_parallel_for_body_cb_t body_callback,
136 void * callback_data)
138 LDEBUG(
"Called with " << tasks <<
" tasks");
141 for (
int group = 0; group < tasks; group += itsNumThreads)
143 std::vector<std::future<void>> fvec;
144 int const last = std::min(tasks, group + itsNumThreads);
146 for (
int i = group; i < last; ++i)
147 fvec.emplace_back(itsThreadpool->execute(body_callback, i, i+1, callback_data));
157 return (
int)(size_t)(
void*)pthread_self();
162{
return itsNumThreads; }
167 if (nThreads != 4)
LERROR(
"Only 4 threads supported -- IGNORED request to set " << nThreads);
168 itsNumThreads = nThreads;
virtual int getNumThreads() const override
Get number of threads for concurrency (always 4 on JeVois-Pro, unless changed by setNumThreads() here...
virtual char const * getName() const override
Get the name: 'jevois'.
virtual int getThreadNum() const override
Get some index for the current thread.
virtual void parallel_for(int tasks, cv::parallel::ParallelForAPI::FN_parallel_for_body_cb_t body_callback, void *callback_data) override
Run a parallelized OpenCV task.
ParallelForAPIjevois(ThreadPool *tp)
Constructor from an existing ThreadPool.
virtual int setNumThreads(int nThreads) override
Set number of threads for OpenCV. NOTE: currently does nothing, we always use 4 big A73 threads.
virtual ~ParallelForAPIjevois()
Virtual destructor for save inheritance.
A thread pool with CPU affinity.
auto getPoolSize() -> size_t
Get the pool size.
auto execute(Func &&func, Args &&... args) -> std::future< decltype(func(args...))>
Execute a function and get a future.
ThreadPool(unsigned int threads=std::thread::hardware_concurrency(), bool little=false)
Constructor.
#define LDEBUG(msg)
Convenience macro for users to print out console or syslog messages, DEBUG level.
#define LERROR(msg)
Convenience macro for users to print out console or syslog messages, ERROR level.
#define LINFO(msg)
Convenience macro for users to print out console or syslog messages, INFO level.
std::vector< T > joinall(std::vector< std::future< T > > &fvec, bool multiline=true)
Collect results from several async threads that are all returning a T result.