JeVois  1.22
JeVois Smart Embedded Machine Vision Toolkit
Share this page:
Loading...
Searching...
No Matches
ThreadPool.C
Go to the documentation of this file.
1// ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
2//
3// JeVois Smart Embedded Machine Vision Toolkit - Copyright (C) 2020 by Laurent Itti, the University of Southern
4// California (USC), and iLab at USC. See http://iLab.usc.edu and http://jevois.org for information about this project.
5//
6// This file is part of the JeVois Smart Embedded Machine Vision Toolkit. This program is free software; you can
7// redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software
8// Foundation, version 2. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
9// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
10// License for more details. You should have received a copy of the GNU General Public License along with this program;
11// if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
12//
13// Contact information: Laurent Itti - 3641 Watt Way, HNB-07A - Los Angeles, CA 90089-2520 - USA.
14// Tel: +1 213 740 3527 - itti@pollux.usc.edu - http://iLab.usc.edu - http://jevois.org
15// ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
16/*! \file */
17
18#ifdef JEVOIS_PRO
19
20// This code modified from:
21
22// Copyright Mateusz Jaworski 2020 - 2020.
23// Distributed under the Boost Software License, Version 1.0.
24// (See accompanying file LICENSE.md or copy at
25// https://www.boost.org/LICENSE_1_0.txt)
26
28#include <jevois/Util/Async.H>
29#include <jevois/Debug/Log.H>
30#include <pthread.h>
31
32// ##############################################################################################################
33jevois::ThreadPool::ThreadPool(unsigned int threads, bool little) :
34 _size(0), _exit(false)
35{
36 _pool.reserve(threads);
37
38#ifdef JEVOIS_PLATFORM
39 // Define a CPU mask:
40 cpu_set_t cpuset;
41 CPU_ZERO(&cpuset);
42
43 // Little cores are 0, 1; big cores are 2,3,4,5:
44 if (little)
45 {
46 CPU_SET(0, &cpuset);
47 CPU_SET(1, &cpuset);
48 }
49 else
50 {
51 CPU_SET(2, &cpuset);
52 CPU_SET(3, &cpuset);
53 CPU_SET(4, &cpuset);
54 CPU_SET(5, &cpuset);
55 }
56#endif
57
58 for(unsigned int i = 0; i < threads; ++i)
59 {
60 _pool.emplace_back([this]{
61 fu2::unique_function<void()> task;
62
63 for (;;)
64 {
65 {
66 std::unique_lock<std::mutex> lock(_mtx);
67 _new_task.wait(lock, [this]{ return _size || _exit; });
68 }
69
70 //if (_exit && !_size)) return; // JEVOIS: this can lock up the destructor...
71
72 if (_exit)
73 {
74 while (_tasks.try_dequeue(task)) { _size--; task(); }
75 return;
76 }
77
78 if (_tasks.try_dequeue(task))
79 {
80 _size--;
81 task();
82 }
83 }
84
85 });
86
87#ifdef JEVOIS_PLATFORM
88 // JeVois: set CPU affinity for this thread:
89 int rc = pthread_setaffinity_np(_pool.back().native_handle(), sizeof(cpu_set_t), &cpuset);
90 if (rc) LERROR("Error calling pthread_setaffinity_np: " << rc);
91#endif
92 }
93
94 // Run a phony job, otherwise the threadpool hangs on destruction if it was never used:
95#ifdef JEVOIS_PLATFORM
96 auto fut = execute([threads,little](){
97 LINFO("Initialized with " << threads << (little ? " A53 threads." : " A73 threads.")); });
98#else
99 auto fut = execute([threads](){
100 LINFO("Initialized with " << threads << " threads."); });
101 (void)little; // keep compiler happy
102#endif
103}
104
105// ##############################################################################################################
107{
108 {
109 std::scoped_lock<std::mutex> lock(_mtx);
110 _exit = true;
111 }
112
113 _new_task.notify_all();
114 for (auto & thread: _pool) thread.join();
115}
116
117// ##############################################################################################################
119{
120 return _pool.size();
121}
122
123// ##############################################################################################################
124// ##############################################################################################################
125// ##############################################################################################################
127{ }
128
129// ##############################################################################################################
132
133// ##############################################################################################################
135 cv::parallel::ParallelForAPI::FN_parallel_for_body_cb_t body_callback,
136 void * callback_data)
137{
138 LDEBUG("Called with " << tasks << " tasks");
139
140 // Dispatch several groups of parallel threads; assumes all tasks take the same time...
141 for (int group = 0; group < tasks; group += itsNumThreads)
142 {
143 std::vector<std::future<void>> fvec;
144 int const last = std::min(tasks, group + itsNumThreads);
145
146 for (int i = group; i < last; ++i)
147 fvec.emplace_back(itsThreadpool->execute(body_callback, i, i+1, callback_data));
148
149 // Wait until all tasks done, throw a single exception if any task threw:
150 jevois::joinall(fvec);
151 }
152}
153
154// ##############################################################################################################
156{
157 return (int)(size_t)(void*)pthread_self(); // no zero-based indexing
158}
159
160// ##############################################################################################################
162{ return itsNumThreads; }
163
164// ##############################################################################################################
166{
167 if (nThreads != 4) LERROR("Only 4 threads supported -- IGNORED request to set " << nThreads);
168 itsNumThreads = nThreads;
169 return nThreads;
170}
171
172// ##############################################################################################################
174{ return "jevois"; }
175
176#endif // JEVOIS_PRO
177
virtual int getNumThreads() const override
Get number of threads for concurrency (always 4 on JeVois-Pro, unless changed by setNumThreads() here...
Definition ThreadPool.C:161
virtual char const * getName() const override
Get the name: 'jevois'.
Definition ThreadPool.C:173
virtual int getThreadNum() const override
Get some index for the current thread.
Definition ThreadPool.C:155
virtual void parallel_for(int tasks, cv::parallel::ParallelForAPI::FN_parallel_for_body_cb_t body_callback, void *callback_data) override
Run a parallelized OpenCV task.
Definition ThreadPool.C:134
ParallelForAPIjevois(ThreadPool *tp)
Constructor from an existing ThreadPool.
Definition ThreadPool.C:126
virtual int setNumThreads(int nThreads) override
Set number of threads for OpenCV. NOTE: currently does nothing, we always use 4 big A73 threads.
Definition ThreadPool.C:165
virtual ~ParallelForAPIjevois()
Virtual destructor for safe inheritance.
Definition ThreadPool.C:130
A thread pool with CPU affinity.
Definition ThreadPool.H:48
auto getPoolSize() -> size_t
Get the pool size.
Definition ThreadPool.C:118
auto execute(Func &&func, Args &&... args) -> std::future< decltype(func(args...))>
Execute a function and get a future.
~ThreadPool()
Destructor.
Definition ThreadPool.C:106
ThreadPool(unsigned int threads=std::thread::hardware_concurrency(), bool little=false)
Constructor.
Definition ThreadPool.C:33
#define LDEBUG(msg)
Convenience macro for users to print out console or syslog messages, DEBUG level.
Definition Log.H:173
#define LERROR(msg)
Convenience macro for users to print out console or syslog messages, ERROR level.
Definition Log.H:211
#define LINFO(msg)
Convenience macro for users to print out console or syslog messages, INFO level.
Definition Log.H:194
std::vector< T > joinall(std::vector< std::future< T > > &fvec, bool multiline=true)
Collect results from several async threads that are all returning a T result.