Shared-Memory IPC
Shared-memory IPC allows multiple tasks to access a common memory segment. This technique is fast because there is no need to copy data between tasks, but it requires proper synchronization mechanisms like semaphores or mutexes to ensure data consistency.
Mutex (Lock)
Unlike message-passing IPC, access to shared memory is not mediated by the OS. Developers must therefore use a mutex (short for mutual exclusion, often simply called a lock) to prevent multiple threads from entering a critical section and touching a shared resource at the same time.
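As a minimal sketch of a mutex in action (illustrative code, not one of the examples below), Python's built-in threading.Lock serializes a read-modify-write on a shared counter:

import threading

counter = 0
lock = threading.Lock()

def increment(n: int) -> None:
    global counter
    for _ in range(n):
        # Only one thread may hold the lock at a time, so the
        # read-modify-write below cannot interleave with another thread's.
        with lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # Always 400000 with the lock; unpredictable without it.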
Condition Variable
Condition variables are a synchronization primitive that lets threads sleep until a specific condition is met. They are essential for coordinating threads that need to react to changes in shared state. In short, mutexes protect shared data, while condition variables let threads wait efficiently for that data to change.
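As a minimal sketch (illustrative, using Python's built-in threading.Condition), one thread sleeps until another signals that shared state has changed:

import threading

data_ready = False
condition = threading.Condition()

def waiter() -> None:
    with condition:
        # wait() releases the underlying lock while sleeping and re-acquires
        # it on wakeup; the loop guards against spurious wakeups.
        while not data_ready:
            condition.wait()
        print("Waiter: data is ready")

def notifier() -> None:
    global data_ready
    with condition:
        data_ready = True
        condition.notify()  # Wake one waiting thread.

t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=notifier)
t1.start()
t2.start()
t1.join()
t2.join()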
Shared Memory
Shared memory is a region of memory that can be accessed by multiple processes. It enables high-speed data exchange but requires explicit synchronization to prevent data corruption or race conditions.
sequenceDiagram
participant Process A
participant Mutex
participant Shared Memory
participant Process B
Process A->>Mutex: Lock Mutex (Access Shared Memory)
Process A->>Shared Memory: Write Data (Critical Section)
Process A->>Mutex: Unlock Mutex
Process B->>Mutex: Lock Mutex (Wait for Process A)
Process B->>Shared Memory: Read Data (Critical Section)
Process B->>Mutex: Unlock Mutex
Characteristics of Shared Memory
- Direct Access: Shared memory is a region of memory, created in userland, that multiple processes or threads can read and write directly.
- Speed: Shared memory is generally faster than message passing because it avoids the overhead of copying data between address spaces.
- Developer Responsibility: The OS provides the mechanism to create the shared memory region, but it does not manage concurrent access; synchronization is up to the developer (see the sketch after this list).
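For example, a named OS-backed segment can be created and attached across processes with Python's multiprocessing.shared_memory module (available since Python 3.8). The following minimal sketch skips locking for brevity, unlike the mutex-protected flow in the diagram above:

from multiprocessing import Process, shared_memory

def child(name: str) -> None:
    # Attach to the existing segment by name and read the first byte.
    shm = shared_memory.SharedMemory(name=name)
    print("Child read:", shm.buf[0])
    shm.close()

if __name__ == "__main__":
    # The parent asks the OS to create the segment, then writes into it directly.
    shm = shared_memory.SharedMemory(create=True, size=16)
    shm.buf[0] = 42
    p = Process(target=child, args=(shm.name,))
    p.start()
    p.join()
    shm.close()
    shm.unlink()  # Free the segment once all processes are done with it.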
Python Example
import time
import threading

SHARED_ARR_SIZE = 5
SHARED_ARR = [-1] * SHARED_ARR_SIZE  # -1 marks "no data yet"
lock = threading.Lock()

def producer(writing_time: float = 1):
    thread_name = threading.current_thread().name
    for i in range(SHARED_ARR_SIZE):
        time.sleep(writing_time)  # Simulate the time it takes to produce data.
        with lock:  # Enter the critical section before touching the array.
            SHARED_ARR[i] = i
            print(f"{thread_name} writes index {i}: {i}")

def consumer(penalty=1):
    thread_name = threading.current_thread().name
    for i in range(SHARED_ARR_SIZE):
        while True:
            with lock:  # Hold the lock only for the read itself.
                data = SHARED_ARR[i]
            if data == -1:
                # Nothing produced yet: back off (without holding the lock) and poll again.
                print(f"{thread_name}: No data at index {i}, waiting {penalty} sec")
                time.sleep(penalty)
            else:
                print(f"{thread_name} consumes index {i}, data: {data}")
                break

if __name__ == "__main__":
    consumer_thread = threading.Thread(name="Consumer", target=consumer, args=(1,))
    producer_thread = threading.Thread(name="Producer", target=producer, args=(0.5,))
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    consumer_thread.join()
C++ Example
#include <iostream>
#include <thread>
#include <chrono>
#include <mutex>

static const int s_NumData = 5;
static int s_SharedArr[s_NumData];
std::mutex s_Mutex;

void ProduceJob(unsigned int producingSeconds)
{
    std::cout << "Producer thread id: " << std::this_thread::get_id() << "\n";
    // Simulate the time it takes to produce the data.
    std::this_thread::sleep_for(std::chrono::seconds(producingSeconds));
    for (int idx = 0; idx < s_NumData; ++idx)
    {
        std::lock_guard<std::mutex> lk(s_Mutex);  // Critical section.
        std::cout << "Writing [" << idx << "] data\n";
        s_SharedArr[idx] = idx;
    }
}

void ConsumeJob(unsigned int penaltySeconds)
{
    for (int idx = 0; idx < s_NumData; ++idx)
    {
        while (true)
        {
            {
                std::lock_guard<std::mutex> lk(s_Mutex);  // Hold the lock only for the read.
                if (s_SharedArr[idx] != -1)
                {
                    std::cout << "Consumer read [" << idx << "] data: " << s_SharedArr[idx] << "\n";
                    break;
                }
            }
            // Nothing produced yet: back off (without holding the lock) and poll again.
            std::cout << "Consumer: No data at index " << idx << ", waiting " << penaltySeconds << " sec\n";
            std::this_thread::sleep_for(std::chrono::seconds(penaltySeconds));
        }
    }
}

int main()
{
    // -1 marks "no data yet".
    for (int idx = 0; idx < s_NumData; ++idx)
    {
        s_SharedArr[idx] = -1;
    }
    const int numThreads = 2;
    std::thread threads[numThreads];
    threads[0] = std::thread(ProduceJob, 1);
    threads[1] = std::thread(ConsumeJob, 2);
    for (int idx = 0; idx < numThreads; ++idx)
    {
        threads[idx].join();
    }
}
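To build the C++ examples in this post, the thread library must be linked; with GCC or Clang on Linux, something like g++ -std=c++17 -pthread main.cpp (the filename is illustrative) should work.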
Thread Pool Pattern
A thread pool maintains a fixed set of worker threads that pull tasks from a shared queue and execute them concurrently. Reusing the same threads across many tasks avoids the overhead of creating and destroying a thread for every task. Synchronization primitives such as mutexes and condition variables coordinate access to the shared task queue.
sequenceDiagram
participant Client1
participant Client2
participant TaskQueue
participant ThreadPool
participant Worker1
participant Worker2
Client1->>TaskQueue: Submit Task A
Client2->>TaskQueue: Submit Task B
TaskQueue-->>ThreadPool: Notify New Tasks
ThreadPool->>Worker1: Assign Task A
Worker1->>Worker1: Process Task A
ThreadPool->>Worker2: Assign Task B
Worker2->>Worker2: Process Task B
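Before hand-rolling a pool, note that Python's standard library already ships this pattern as concurrent.futures.ThreadPoolExecutor. A minimal sketch of the equivalent of the custom pool below:

import time
from concurrent.futures import ThreadPoolExecutor

def cpu_waster(idx: int) -> None:
    time.sleep(1)
    print(f"done with job {idx}")

if __name__ == "__main__":
    # The executor owns the worker threads and the internal task queue.
    with ThreadPoolExecutor(max_workers=5) as pool:
        for idx in range(20):
            pool.submit(cpu_waster, idx)
    # Leaving the with-block waits for all submitted jobs to finish.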
Thread Pool in Python
import time
from queue import Queue
import threading
from threading import Thread
from typing import Callable, Any, Tuple

# typedefs
Callback = Callable[..., None]
Task = Tuple[Callback, Any, Any]

class Worker(Thread):
    def __init__(self, tasks: "Queue[Task]"):
        super().__init__()
        self.tasks: Queue = tasks

    def run(self):
        while True:
            # get() blocks until a task is available, so the worker
            # sleeps instead of busy-waiting on an empty queue.
            func, args, kwargs = self.tasks.get()
            func(*args, **kwargs)
            self.tasks.task_done()

class ThreadPool:
    def __init__(self, num_threads: int):
        # Bounded queue: submit() blocks when it is full, throttling producers.
        self.tasks = Queue(num_threads)
        self.num_threads = num_threads
        for _ in range(self.num_threads):
            worker = Worker(self.tasks)
            worker.daemon = True  # Daemon workers die with the main thread.
            worker.start()

    def submit(self, func: Callback, *args, **kwargs):
        self.tasks.put((func, args, kwargs))

    def wait(self):
        self.tasks.join()

def cpu_waster(idx: int) -> None:
    name = threading.current_thread().name
    print(f"{name} : doing {idx} work")
    time.sleep(3)

def main():
    num_jobs: int = 20
    num_threads: int = 5
    start = time.perf_counter()  # Simple timer in place of an external utils.Timer.
    pool = ThreadPool(num_threads=num_threads)
    for idx in range(num_jobs):
        pool.submit(cpu_waster, idx)
    pool.wait()
    print(f"elapsed: {time.perf_counter() - start:.2f} sec")

if __name__ == "__main__":
    main()
Thread Pool in C++
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <chrono>

std::mutex coutMtx;

using Task = std::function<void()>;

class ThreadPool {
public:
    explicit ThreadPool(size_t numThreads) : mb_Stop(false), m_ActiveTasks(0) {
        for (size_t i = 0; i < numThreads; ++i) {
            m_Workers.emplace_back([this] { workerThread(); });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(m_QueueMtx);
            mb_Stop = true;
        }
        m_CV.notify_all();
        for (auto& worker : m_Workers) {
            worker.join();
        }
    }

    template <typename Func, typename... Args>
    void Submit(Func&& func, Args&&... args) {
        {
            std::unique_lock<std::mutex> lk(m_QueueMtx);
            m_Tasks.emplace(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
        }
        m_CV.notify_one();
    }

    // Blocks until the queue is drained AND no worker is still running a task.
    void Wait() {
        std::unique_lock<std::mutex> lk(m_QueueMtx);
        m_CVWait.wait(lk, [this] { return m_Tasks.empty() && m_ActiveTasks == 0; });
    }

private:
    void workerThread() {
        while (true) {
            Task task;
            {
                std::unique_lock<std::mutex> lock(m_QueueMtx);
                m_CV.wait(lock, [this] { return mb_Stop || !m_Tasks.empty(); });
                if (mb_Stop && m_Tasks.empty()) {
                    return;
                }
                task = std::move(m_Tasks.front());
                m_Tasks.pop();
                ++m_ActiveTasks;  // Count the task as in-flight before releasing the lock.
            }
            task();
            {
                std::unique_lock<std::mutex> lock(m_QueueMtx);
                --m_ActiveTasks;
                if (m_Tasks.empty() && m_ActiveTasks == 0) {
                    m_CVWait.notify_all();  // Wake any thread blocked in Wait().
                }
            }
        }
    }

    std::vector<std::thread> m_Workers;
    std::queue<Task> m_Tasks;
    std::mutex m_QueueMtx;             // Guards m_Tasks, mb_Stop, and m_ActiveTasks.
    std::condition_variable m_CV;      // Signals workers that tasks (or stop) arrived.
    std::condition_variable m_CVWait;  // Signals Wait() that all work is done.
    bool mb_Stop;
    size_t m_ActiveTasks;
};

void cpu_waster(int idx) {
    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::lock_guard<std::mutex> lk(coutMtx);  // Serialize access to std::cout.
    std::cout << std::this_thread::get_id() << " : doing " << idx << " work" << std::endl;
}

int main() {
    const int numJobs = 20;
    const int numThreads = 5;
    ThreadPool pool(numThreads);
    for (int idx = 0; idx < numJobs; ++idx) {
        pool.Submit(cpu_waster, idx);
    }
    pool.Wait();
    return 0;
}
Conclusion
Inter-Process Communication (IPC) is fundamental to concurrent programming, allowing processes and threads to coordinate efficiently. Shared-memory IPC offers high performance, but because the OS only provides the shared region and does not mediate access from user space, it is the developer's responsibility to synchronize that access and keep the code safe.