共享内存 IPC 同步(无锁)

编程入门 行业动态 更新时间:2024-10-23 01:42:34
本文介绍了共享内存 IPC 同步(无锁)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

Consider the following scenario:

Requirements:

  • Intel x64 Server (multiple CPU-sockets => NUMA)
  • Ubuntu 12, GCC 4.6
  • Two processes sharing large amounts of data over (named) shared-memory
  • Classical producer-consumer scenario
  • Memory is arranged in a circular buffer (with M elements)

Program sequence (pseudo code):

Process A (Producer):

int bufferPos = 0; while( true ) { if( isBufferEmpty( bufferPos ) ) { writeData( bufferPos ); setBufferFull( bufferPos ); bufferPos = ( bufferPos + 1 ) % M; } }

Process B (Consumer):

int bufferPos = 0; while( true ) { if( isBufferFull( bufferPos ) ) { readData( bufferPos ); setBufferEmpty( bufferPos ); bufferPos = ( bufferPos + 1 ) % M; } }

Now the age-old question: How to synchronize them effectively!?

  • Protect every read/write access with mutexes
  • Introduce a "grace period", to allow writes to complete: Read data in buffer N, when buffer(N+3) has been marked as full (dangerous, but seems to work...)
  • ?!?
  • Ideally I would like something along the lines of a memory-barrier, that guarantees that all previous reads/writes are visible across all CPUs, along the lines of:

    writeData( i ); MemoryBarrier(); //All data written and visible, set flag setBufferFull( i );

    This way, I would only have to monitor the buffer flags and then could read the large data chunks safely.

    Generally I'm looking for something along the lines of acquire/release fences as described by Preshing here:

    preshing/20130922/acquire-and-release-fences/

    (if I understand it correctly the C++11 atomics only work for threads of a single process and not along multiple processes.)

    However the GCC-own memory barriers (__sync_synchronize in combination with the compiler barrier asm volatile( "" ::: "memory" ) to be sure) don't seem to work as expected, as writes become visible after the barrier, when I expected them to be completed.

    Any help would be appreciated...

    BTW: Under windows this just works fine using volatile variables (a Microsoft specific behaviour)...

    解决方案

    Boost Interprocess has support for Shared Memory.

    Boost Lockfree has a Single-Producer Single-Consumer queue type (spsc_queue). This is basically what you refer to as a circular buffer.

    Here's a demonstration that passes IPC messages (in this case, of type string) using this queue, in a lock-free fashion.

    Defining the types

    First, let's define our types:

    namespace bip = boost::interprocess; namespace shm { template <typename T> using alloc = bip::allocator<T, bip::managed_shared_memory::segment_manager>; using char_alloc = alloc<char>; using shared_string = bip::basic_string<char, std::char_traits<char>, char_alloc >; using string_alloc = alloc<shared_string>; using ring_buffer = boost::lockfree::spsc_queue< shared_string, boost::lockfree::capacity<200> // alternatively, pass // boost::lockfree::allocator<string_alloc> >; }

    For simplicity I chose to demo the runtime-size spsc_queue implementation, randomly requesting a capacity of 200 elements.

    The shared_string typedef defines a string that will transparently allocate from the shared memory segment, so they are also "magically" shared with the other process.

    The consumer side

    This is the simplest, so:

    int main() { // create segment and corresponding allocator bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536); shm::string_alloc char_alloc(segment.get_segment_manager()); shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")();

    This opens the shared memory area, locates the shared queue if it exists. NOTE This should be synchronized in real life.

    Now for the actual demonstration:

    while (true) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); shm::shared_string v(char_alloc); if (queue->pop(v)) std::cout << "Processed: '" << v << "' "; }

    The consumer just infinitely monitors the queue for pending jobs and processes one each ~10ms.

    The Producer side

    The producer side is very similar:

    int main() { bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536); shm::char_alloc char_alloc(segment.get_segment_manager()); shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")();

    Again, add proper synchronization to the initialization phase. Also, you would probably make the producer in charge of freeing the shared memory segment in due time. In this demonstration, I just "let it hang". This is nice for testing, see below.

    So, what does the producer do?

    for (const char* s : { "hello world", "the answer is 42", "where is your towel" }) { std::this_thread::sleep_for(std::chrono::milliseconds(250)); queue->push({s, char_alloc}); } }

    Right, the producer produces precisely 3 messages in ~750ms and then exits.

    Note that consequently if we do (assume a POSIX shell with job control):

    ./producer& ./producer& ./producer& wait ./consumer&

    Will print 3x3 messages "immediately", while leaving the consumer running. Doing

    ./producer& ./producer& ./producer&

    again after this, will show the messages "trickle in" in realtime (in burst of 3 at ~250ms intervals) because the consumer is still running in the background

    See the full code online in this gist: gist.github/sehe/9376856

    更多推荐

    共享内存 IPC 同步(无锁)

    本文发布于:2023-11-30 20:05:27,感谢您对本站的认可!
    本文链接:https://www.elefans.com/category/jswz/34/1651337.html
    版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
    本文标签:内存   IPC

    发布评论

    评论列表 (有 0 条评论)
    草根站长

    >www.elefans.com

    编程频道|电子爱好者 - 技术资讯及电子产品介绍!