Moving execution from one thread to another to implement task parallelism and call-by-future

Updated: 2024-10-19 08:47:49

I'm trying to implement a call-by-future mechanism in C++. Although this is just test code (written in a bit of a hurry), I intend to use something similar in the runtime of a language I'm working on, to provide transparent parallelism.

I've trimmed the code I'm working on to make it a little smaller, though it is still big:

    #include <cstdlib>
    #include <cstdio>
    #include <iostream>
    #include <vector>
    #include <queue>
    #include <future>
    #include <thread>
    #include <functional>
    #include <type_traits>
    #include <utility>
    using namespace std;
    using namespace std::chrono;

    //------------------------------------------------------------------------------
    // Simple locked printer

    static std::recursive_mutex print_lock;

    inline void print_() {
        return;
    };

    template<typename T, typename... Args>
    inline void print_(T t, Args... args) {
        print_lock.lock();
        std::cout << t;
        print_(args...);
        print_lock.unlock();
    };

    //------------------------------------------------------------------------------

    template<typename R>
    class PooledTask {
        public:
            explicit PooledTask(function<R()>);

            // Possibly execute the task and return the value
            R &operator () () {
                // If we can get the lock, we're not executing
                if(lock.try_lock()) {
                    // We may already have executed it
                    if(done)
                        goto end;

                    // Otherwise, execute it now
                    try {
                        result = move(task());
                    } catch(...) {
                        // If an exception is thrown, save it for later
                        eptr = current_exception();
                        failed = true;
                    };

                    done = true;

                    goto end;
                } else {
                    // Wait until the task is completed
                    lock.lock();

                    end: {
                        lock.unlock();

                        // Maybe we got an exception!
                        if(failed)
                            rethrow_exception(eptr);

                        // Otherwise, just return the result
                        return result;
                    };
                };
            };

        private:
            exception_ptr eptr;
            function<R()> task;
            bool done;
            bool failed;
            mutex lock;
            R result;
    };

    extern class TaskPool pool;

    class TaskPool {
        public:
            TaskPool() noexcept: TaskPool(thread::hardware_concurrency() - 1) {
                return;
            };

            TaskPool(const TaskPool &) = delete;
            TaskPool(TaskPool &&) = delete;

            template<typename T>
            void push(PooledTask<T> *task) noexcept {
                lock_guard<mutex> guard(lock);

                builders.push([=] {
                    try {
                        (*task)();
                    } catch(...) {
                        // Ignore it here! The task will save it. :)
                    };
                });
            };

            ~TaskPool() {
                // TODO: wait for all tasks to finish...
            };

        private:
            queue<thread *> threads;
            queue<function<void()>> builders;
            mutex lock;

            TaskPool(signed N) noexcept {
                while(N --> 0)
                    threads.push(new thread([this, N] {
                        for(;;) {
                            pop_task();
                        };
                    }));
            };

            void pop_task() noexcept {
                lock.lock();

                if(builders.size()) {
                    auto task = builders.front();
                    builders.pop();
                    lock.unlock();
                    task();
                } else
                    lock.unlock();
            };
    } pool;

    template<typename R>
    PooledTask<R>::PooledTask(function<R()> fun):
        task(fun),
        done(false),
        failed(false)
    {
        pool.push(this);
    };

    // Should probably return a std::shared_ptr here...
    template<typename F, typename... Args>
    auto byfuture(F fun, Args&&... args) noexcept ->
        PooledTask<decltype(fun(args...))> *
    {
        using R = decltype(fun(args...));

        auto pooled = new PooledTask<R> {
            bind(fun, forward<Args>(args)...)
        };

        return pooled;
    };

    //------------------------------------------------------------------------------
    #include <map>

    // Get the current thread id as a simple number
    static int myid() noexcept {
        static unsigned N = 0;
        static map<thread::id, unsigned> hash;
        static mutex lock;

        lock_guard<mutex> guard(lock);

        auto current = this_thread::get_id();

        if(!hash[current])
            hash[current] = ++N;

        return hash[current];
    };

    //------------------------------------------------------------------------------
    //------------------------------------------------------------------------------
    // The fibonacci test implementation

    int future_fib(int x, int parent) {
        if(x < 3)
            return 1;

        print_("future_fib(", x, ")", " on thread ", myid(), \
               ", asked by thread ", parent, "\n");

        auto f1 = byfuture(future_fib, x - 1, myid());
        auto f2 = byfuture(future_fib, x - 2, myid());

        auto res = (*f1)() + (*f2)();

        delete f1;
        delete f2;

        return res;
    };

    //------------------------------------------------------------------------------

    int main() {
        // Force main thread to get id 1
        myid();

        // Get task
        auto f = byfuture(future_fib, 8, myid());

        // Make sure it starts on the task pool
        this_thread::sleep_for(seconds(1));

        // Blocks
        (*f)();

        // Simply wait to be sure all threads are clean
        this_thread::sleep_for(seconds(2));

        //
        return EXIT_SUCCESS;
    };

The output of this program looks something like this (I have a quad-core machine, so there are 3 threads in the pool):

    future_fib(8) on thread 2, asked by thread 1
    future_fib(7) on thread 3, asked by thread 2
    future_fib(6) on thread 4, asked by thread 2
    future_fib(6) on thread 3, asked by thread 3
    future_fib(5) on thread 4, asked by thread 4
    future_fib(5) on thread 3, asked by thread 3
    future_fib(4) on thread 4, asked by thread 4
    future_fib(4) on thread 3, asked by thread 3
    future_fib(3) on thread 4, asked by thread 4
    future_fib(3) on thread 3, asked by thread 3
    future_fib(3) on thread 4, asked by thread 4
    future_fib(3) on thread 3, asked by thread 3
    future_fib(4) on thread 4, asked by thread 4
    future_fib(4) on thread 3, asked by thread 3
    future_fib(3) on thread 4, asked by thread 4
    future_fib(3) on thread 3, asked by thread 3
    future_fib(5) on thread 3, asked by thread 3
    future_fib(4) on thread 3, asked by thread 3
    future_fib(3) on thread 3, asked by thread 3
    future_fib(3) on thread 3, asked by thread 3

This implementation is really slow compared to a normal Fibonacci function.

So the problem is this: when the pool runs fib(8), it creates two tasks that run on the next threads, but by the time it reaches auto res = (*f1)() + (*f2)();, both tasks are already running, so it blocks on f1 (running on thread 3).

To get a speed improvement, thread 2 should not block on f1. Instead, it should take over whatever thread 3 is executing, leaving thread 3 free to pick up another task, so that no thread sleeps during the calculation.

This article, http://bartoszmilewski.com/2011/10/10/async-tasks-in-c11-not-quite-there-yet/, says that what I want is necessary, but it doesn't specify how to do it.

My question is: how could I possibly do that?

Are there other alternatives to do what I want?

Best answer

I think you might have a chance with the resumable functions currently proposed for C++ standardization. The proposal is not approved yet, but Visual Studio 15 CTP implements it, so you can try making a prototype (if you can use the MSVC compiler).

Gor Nishanov (one of the authors of the latest proposal paper) describes a very similar example of calculating Fibonacci with "parent-stealing scheduling" starting at 23:47 in his CppCon talk: https://www.youtube.com/watch?v=KUhSjfSbINE

Note, however, that I couldn't find any source code or samples for the implementation of spawnable<T>, so you may need to contact the proposal authors for details.
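
Independently of resumable functions, the behaviour the question asks for (a waiting thread doing useful work instead of sleeping) can be approximated in plain C++11 by letting the waiter run other queued tasks until the one it needs is finished. The following is only a minimal sketch of that idea. It is not the parent-stealing scheduler from the talk, and the names HelpingPool, Task, spawn, wait and helping_fib are hypothetical ones introduced here for illustration. It assumes tasks return int and do not throw.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical pool: a thread waiting on a task keeps running queued tasks.
class HelpingPool {
public:
    struct Task {
        std::function<int()> work;
        int result = 0;
        std::atomic<bool> done{false};
    };
    using TaskPtr = std::shared_ptr<Task>;

    explicit HelpingPool(unsigned workers) {
        for (unsigned i = 0; i < workers; ++i)
            threads_.emplace_back([this] { worker_loop(); });
    }

    ~HelpingPool() {
        { std::lock_guard<std::mutex> g(m_); stop_ = true; }
        cv_.notify_all();
        for (auto &t : threads_) t.join();
    }

    // Queue a task and return a handle to it.
    TaskPtr spawn(std::function<int()> f) {
        auto t = std::make_shared<Task>();
        t->work = std::move(f);
        { std::lock_guard<std::mutex> g(m_); queue_.push_back(t); }
        cv_.notify_one();
        return t;
    }

    // Wait for t without sleeping: run other queued tasks in the meantime.
    int wait(const TaskPtr &t) {
        assert(t && "wait() needs a task");
        while (!t->done.load(std::memory_order_acquire)) {
            TaskPtr other;
            {
                std::lock_guard<std::mutex> g(m_);
                if (!queue_.empty()) {          // newest task first (LIFO)
                    other = queue_.back();
                    queue_.pop_back();
                }
            }
            if (other) run(*other);             // may be t itself
            else std::this_thread::yield();     // t is running elsewhere
        }
        return t->result;
    }

private:
    static void run(Task &t) {
        t.result = t.work();                    // assumption: work() does not throw
        t.done.store(true, std::memory_order_release);
    }

    void worker_loop() {
        for (;;) {
            TaskPtr t;
            {
                std::unique_lock<std::mutex> lk(m_);
                cv_.wait(lk, [this] { return stop_ || !queue_.empty(); });
                if (queue_.empty()) return;     // stopping and nothing left
                t = queue_.front();             // oldest task first (FIFO)
                queue_.pop_front();
            }
            run(*t);
        }
    }

    std::mutex m_;
    std::condition_variable cv_;
    std::deque<TaskPtr> queue_;
    bool stop_ = false;
    std::vector<std::thread> threads_;          // declared last: started after the rest
};

// Same recursion as future_fib, but waiting never parks the thread.
int helping_fib(HelpingPool &pool, int x) {
    if (x < 3) return 1;
    auto f1 = pool.spawn([&pool, x] { return helping_fib(pool, x - 1); });
    auto f2 = pool.spawn([&pool, x] { return helping_fib(pool, x - 2); });
    return pool.wait(f1) + pool.wait(f2);
}
```

A caller would create a pool, for example HelpingPool pool(3);, and call helping_fib(pool, 8). Because every waiting thread drains the queue, a task is always either queued (and will be picked up) or running, so the recursion cannot starve the way the blocking version does. The costs are a growing call stack while helping and a busy yield loop when the queue is empty. A real implementation would also need to store and rethrow exceptions, as PooledTask does.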

Published: 2023-08-04 22:42:00