Shared-Memory IPC

Shared-memory IPC allows multiple tasks to access a common memory segment. This technique is fast because there is no need to copy data between tasks, but it requires proper synchronization mechanisms like semaphores or mutexes to ensure data consistency.

Table of contents

Mutex (Lock)

Unlike message-passing IPC, shared memory is not managed by the OS. Therefore, developers must use a mutex (refers to mutual exclusion, often referred to as a lock) to prevent simultaneous access to shared resources by multiple threads.

Condition Variable

Condition variables are a synchronization primitive that allows threads to wait until a specific condition is matched. They are essential for coordinating actions between threads, especially in scenarios where threads need to communicate about the state of shared data. In short, mutexes protect shared data while condition variables allow threads to wait efficiently for changes in that shared data.

Shared Memory

Shared memory is a region of memory that can be accessed by multiple processes. It enables high-speed data exchange but requires explicit synchronization to prevent data corruption or race conditions.

 
sequenceDiagram
    participant Process A
    participant Mutex
    participant Shared Memory
    participant Process B

    Process A->>Mutex: Lock Mutex (Access Shared Memory)
    Process A->>Shared Memory: Write Data (Critical Section)
    Process A->>Mutex: Unlock Mutex

    Process B->>Mutex: Lock Mutex (Wait for Process A)
    Process B->>Shared Memory: Read Data (Critical Section)
    Process B->>Mutex: Unlock Mutex

Characteristics of Shared Memory

  1. Direct Access
    Shared memory involves creating a region of memory in userland that multiple processes or threads can directly access.
  2. Speed
    Shared memory is generally faster than message passing because it avoids the overhead of copying data.
  3. Developer Responsibility
    The OS provides the mechanism to create the shared memory region, but it does not manage concurrent access.

Python Example

import time
import threading

SHARED_ARR_SIZE = 5
SHARED_ARR = [-1] * SHARED_ARR_SIZE
lock = threading.Lock()

def producer(writing_time: float = 1):
    thread_name = threading.current_thread().name
    for i in range(SHARED_ARR_SIZE):
        time.sleep(writing_time)
        with lock:
            SHARED_ARR[i] = i
            print(f"{thread_name} writes index {i}: {i}")

def consumer(penalty=1):
    thread_name = threading.current_thread().name
    for i in range(SHARED_ARR_SIZE):
        while True:
            with lock:
                data = SHARED_ARR[i]
            if data == -1:
                print(f"{thread_name}: No data at index {i}, waiting {penalty} sec")
                time.sleep(penalty)
            else:
                print(f"{thread_name} consumes index {i}, data: {data}")
                break

if __name__ == "__main__":
    consumer_thread = threading.Thread(name="Consumer", target=consumer, args=(1,))
    producer_thread = threading.Thread(name="Producer", target=producer, args=(0.5,))

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    consumer_thread.join()

C++ Example

#include <iostream>
#include <thread>
#include <string>
#include <chrono>
#include <mutex>

static const int s_NumData =5;
static int s_SharedArr[s_NumData];
std::mutex mutex;


void ProduceJob(unsigned int producingSeconds)
{
    
    std::cout << std::this_thread::get_id() << "\n";
    auto producingTime = std::chrono::seconds(producingSeconds);
    std::this_thread::sleep_for(producingTime);
    for (int idx=0; idx<s_NumData; ++idx)
    {
        std::lock_guard<std::mutex> lk(mutex);
        std::cout << "Writing [" << idx << "] data \n";
        s_SharedArr[idx] = idx;
    }
}

void ConsumeJob(unsigned int penaltySeconds) {
    for (int idx = 0; idx < s_NumData; ++idx) {
        while (true) {
            {
                std::lock_guard<std::mutex> lk(mutex);
                if (s_SharedArr[idx] != -1) {
                    std::cout << "Consumer read [" << idx << "] data: " << s_SharedArr[idx] << "\n";
                    break;
                }
            }
            std::cout << "Consumer: No data at index " << idx << ", waiting " << penaltySeconds << " sec\n";
            std::this_thread::sleep_for(std::chrono::seconds(penaltySeconds));
        }
    }
}


int main()
{
    for (int idx=0; idx<s_NumData; ++idx)
    {
        s_SharedArr[idx] = -1;
    }
    const int numThread = 2;
    std::thread threads[numThread];
    threads[0] = std::thread(ProduceJob,1);  
    threads[1] = std::thread(ConsumeJob,2);

    for (int idx=0; idx < numThread; ++idx)
    {
        threads[idx].join();
    }
}

Thread Pool Pattern

A thread pool consists of multiple worker threads that execute tasks concurrently. These threads share resources, reducing the overhead of creating and destroying threads frequently. Synchronization techniques, such as mutexes and condition variables, help manage access to shared resources efficiently.

sequenceDiagram
    participant Client1
    participant Client2
    participant TaskQueue
    participant ThreadPool
    participant Worker1
    participant Worker2

    Client1->>TaskQueue: Submit Task A
    Client2->>TaskQueue: Submit Task B
    TaskQueue-->>ThreadPool: Notify New Tasks
    ThreadPool->>Worker1: Assign Task A
    Worker1->>Worker1: Process Task A
    ThreadPool->>Worker2: Assign Task B
    Worker2->>Worker2: Process Task B

Thread Pool in Python

import time
from queue import Queue
import threading
from threading import Thread
from typing import Callable, Any, Tuple

from utils import Timer

# typedef
Callback = Callable[..., None]
Task = Tuple[Callback, Any, Any]

class Worker(Thread):
    def __init__(self, tasks:Queue[Task]):
        super().__init__()
        self.tasks : Queue = tasks
    
    def run(self):
        while True:
            if not self.tasks.empty():
                func, args, kwargs = self.tasks.get()
                func(*args, **kwargs)
                self.tasks.task_done()

class ThreadPool:
    def __init__(self, num_threads:int):
        self.tasks = Queue(num_threads)
        self.num_threads = num_threads

        for _ in range(self.num_threads):
            worker = Worker(self.tasks)
            worker.setDaemon(True)
            worker.start()

    def submit(self, func:Callback, *args, **kwargs):
        self.tasks.put((func, args, kwargs))
    
    def wait(self):
        self.tasks.join()

def cpu_waster(idx: int) -> None:
    name = threading.current_thread().getName()
    print(f"{name} : doing {idx} work")
    time.sleep(3)

def main():
    num_jobs     : int   = 20
    num_threads  : int   = 5
    timer        : Timer = Timer()

    pool = ThreadPool(num_threads = num_threads)
    for idx in range(num_jobs):
        pool.submit(cpu_waster, idx)
    pool.wait()

if "__main__" == __name__:
    main()

Thread Pool in C++

#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <atomic>

std::mutex coutMtx;

using Task = std::function<void()>;

class ThreadPool {
public:
    ThreadPool(size_t numThreads) : mb_Stop(false) {
        for (size_t i = 0; i < numThreads; ++i) {
            m_Workers.emplace_back([this] { workerThread(); });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(m_queueMtx);
            mb_Stop = true;
        }
        m_CV.notify_all();
        for (auto& worker : m_Workers) {
            worker.join();
        }
    }

    template <typename Func, typename... Args>
    void Submit(Func&& func, Args&&... args) {
        {
            std::unique_lock<std::mutex> lk(m_queueMtx);
            m_Tasks.emplace(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
        }
        m_CV.notify_one();
    }

    void Wait() {
        std::unique_lock<std::mutex> lk(m_WaitMtx);
        m_CVWait.wait(lk, [this] { return m_Tasks.empty(); });
    }

private:
    void workerThread() {
        while (true) {
            Task task;
            {
                std::unique_lock<std::mutex> lock(m_queueMtx);
                m_CV.wait(lock, [this] { return mb_Stop || !m_Tasks.empty(); });
                if (mb_Stop && m_Tasks.empty()) {
                    return;
                }
                task = std::move(m_Tasks.front());
                m_Tasks.pop();

                if(m_Tasks.empty()){
                    m_CVWait.notify_one();
                }
            }

            task();
        }
    }

    std::vector<std::thread> m_Workers;
    std::queue<Task> m_Tasks;
    std::mutex m_queueMtx;
    std::condition_variable m_CV;
    std::mutex m_WaitMtx;
    std::condition_variable m_CVWait;
    std::atomic<bool> mb_Stop;
};

void cpu_waster(int idx) {
    std::this_thread::sleep_for(std::chrono::seconds(3));
    {   
        std::lock_guard<std::mutex> lk(coutMtx);
        std::cout << std::this_thread::get_id() << " : doing " << idx << " work" << std::endl;
    }
}

int main() {
    int numJobs = 20;
    int numThreads = 5;

    ThreadPool pool(numThreads);

    for (int idx = 0; idx < numJobs; ++idx) {
        pool.Submit(cpu_waster, idx);
    }
    pool.Wait();

    return 0;
}

Conclusion

Inter-Process Communication (IPC) is fundamental to concurrent programming, allowing processes and threads to coordinate efficiently. Shared-memory IPC techniques provide high performance but require great caution from developers to ensure safe code because OS merely manages the shared memory control in userland.