Message-Passing IPC
Message-passing IPC provides a structured way for tasks to communicate without directly sharing memory. It is widely used in distributed systems and multiprocessing environments where isolation between processes is crucial.
In message-passing IPC, each task is identified by a unique name, and tasks interact by sending and receiving messages to and from named tasks.
The OS establishes a communication channel and provides the system calls that tasks use to pass messages through it.
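The naming scheme described above can be sketched in a few lines of Python. This is only an in-process illustration: `MessageRouter`, `register`, `send`, and `receive` are hypothetical names invented for this sketch, and a `queue.Queue` per task stands in for the channel that a real OS would manage.

```python
import queue


class MessageRouter:
    """Hypothetical stand-in for the OS: keeps one mailbox per task name."""

    def __init__(self):
        self._mailboxes = {}

    def register(self, name):
        # Task names must be unique, as in the description above
        if name in self._mailboxes:
            raise ValueError(f"task name already in use: {name}")
        self._mailboxes[name] = queue.Queue()

    def send(self, sender, receiver, payload):
        # Deliver the message to the mailbox of the named receiver
        self._mailboxes[receiver].put((sender, payload))

    def receive(self, name, timeout=1.0):
        # Block until a message for this task arrives (or the timeout expires)
        return self._mailboxes[name].get(timeout=timeout)


router = MessageRouter()
router.register("task_a")
router.register("task_b")
router.send("task_a", "task_b", "ping")
print(router.receive("task_b"))  # ('task_a', 'ping')
```

The tasks never touch each other's data structures; they only know each other's names and go through the router for every message.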
The advantage of message passing is that the OS manages the channel and provides a simple interface for sending and receiving data without conflicts. The drawback is the communication cost: to transfer any piece of information between tasks, the data must be copied from the sending task’s user space into the OS channel through a system call, and then copied again into the address space of the receiving task.
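The two copies mentioned above can be made visible with a small sketch. It assumes a plain `os.pipe()` as the OS channel and uses a hypothetical helper, `send_through_kernel`, that writes a buffer into the pipe, overwrites the sender's buffer, and then reads the data back.

```python
import os


def send_through_kernel(payload: bytearray) -> bytes:
    """Hypothetical helper: pass payload through a pipe and return what the reader sees."""
    read_fd, write_fd = os.pipe()
    try:
        # Copy 1: system call copies the bytes from user space into the kernel pipe buffer
        os.write(write_fd, payload)
        # Changing the sender's buffer now does not affect the data already in the channel
        payload[:5] = b"XXXXX"
        # Copy 2: system call copies the bytes from the kernel into a new receiver buffer
        return os.read(read_fd, len(payload))
    finally:
        os.close(read_fd)
        os.close(write_fd)


original = bytearray(b"hello from task A")
received = send_through_kernel(original)
print(bytes(original))  # b'XXXXX from task A'
print(received)         # b'hello from task A'
```

The receiver ends up with its own copy of the data, which is what gives message passing its isolation and also what makes it more expensive than sharing memory directly.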
Pipe
Pipes provide a unidirectional communication channel between two processes (one-to-one, one way, write then read). They are commonly used for parent-child process communication and allow data transfer in a sequential manner.
sequenceDiagram
participant Process A
participant Pipe
participant Process B
Process A->>Pipe: Write Data
Pipe-->>Process B: Read Data
Process B->>Process B: Process Data
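As a minimal sketch of the one-way property, the snippet below creates a pipe with `multiprocessing.Pipe(duplex=False)`, which returns a receive-only end and a send-only end. The function name `one_way_demo` is made up for this illustration, and both ends live in the same process only to keep the example short.

```python
from multiprocessing import Pipe


def one_way_demo():
    # duplex=False gives (receive-only end, send-only end)
    receive_end, send_end = Pipe(duplex=False)

    send_end.send("first")
    send_end.send("second")
    # Data comes out in the order it was written
    received = [receive_end.recv(), receive_end.recv()]

    try:
        # Sending in the reverse direction is not allowed on a one-way pipe
        receive_end.send("reply")
        reverse_allowed = True
    except OSError:
        reverse_allowed = False

    send_end.close()
    receive_end.close()
    return received, reverse_allowed


print(one_way_demo())  # (['first', 'second'], False)
```

For two-way communication, either two pipes are needed or the default `Pipe()` (duplex) can be used, as in the example that follows.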
Python Example
import time
import threading
from threading import Thread
from multiprocessing import Pipe
from multiprocessing.connection import Connection
from typing import Optional

from utils import Timer  # (ref) - algorithm tab - Useful Timer Class


class WriterThread(object):
    def __init__(self, name: str, conn: Connection, data: Optional[list] = None):
        self.conn = conn
        # Avoid a mutable default argument; each writer gets its own list
        self.data = data if data is not None else []
        self._thread = Thread(
            target=self.do_thread_job,
            name=name,
        )

    def start(self):
        self._thread.start()

    def join(self):
        self._thread.join()

    def do_thread_job(self):
        print(f"{self._thread.name} thread: {threading.current_thread()}")
        time.sleep(1)
        self.conn.send(self.data)


class ReceiverThread(object):
    def __init__(self, name: str, conn: Connection):
        self.conn = conn
        self._thread = Thread(
            target=self.do_thread_job,
            name=name,
        )

    def start(self):
        self._thread.start()

    def join(self):
        self._thread.join()

    def do_thread_job(self):
        print(f"{self._thread.name} thread: {threading.current_thread()}")
        # Blocks until the writer sends its data
        messages = self.conn.recv()
        print(messages)


def main():
    timer = Timer()
    conn1, conn2 = Pipe()
    writer_thread = WriterThread(
        name="writer",
        conn=conn1,
        data=["Hello World!", "why so serious"],
    )
    receiver_thread = ReceiverThread(
        name="receiver",
        conn=conn2,
    )
    threads = [writer_thread, receiver_thread]
    for thread_ in threads:
        thread_.start()
    for thread_ in threads:
        thread_.join()


if "__main__" == __name__:
    main()
C++ Example
In C++, message-passing IPC requires OS-dependent headers. The following examples use UNIX (POSIX) headers such as unistd.h.
#include <chrono>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>
// UNIX dependency (POSIX). Does not work on Windows
#include <unistd.h> // for pipe(), write(), read(), close()
#include "Timer.h" // (ref) - algorithm tab - Useful Timer Class

// Fixed-size, trivially copyable message, so its raw bytes can be sent through the pipe.
// A std::string member owns heap memory and must not be copied byte-wise.
struct ThreadMessage
{
    int messageNumber;
    char message[64];
};

class PipedThread
{
public:
    PipedThread(std::string name, int pipeFd, int numMessages, void (*threadJob)(int, int)) // use function pointer
        : m_Name(name)
        , m_PipeFd(pipeFd)
        , m_Thread(threadJob, pipeFd, numMessages)
    {
        // Runs on the constructing (main) thread, so both objects print the same thread ID
        std::cout << "Initiated " << m_Name << " Thread: " << std::this_thread::get_id() << "\n";
    }
    void Join()
    {
        m_Thread.join();
    }
private:
    // Members are initialized in declaration order, so the thread starts last
    std::string m_Name;
    int m_PipeFd;
    std::thread m_Thread;
};

void WriteToPipe(const int writeFd, const int numMessages)
{
    using namespace std::chrono_literals;
    for (int i = 0; i < numMessages; i++)
    {
        std::this_thread::sleep_for(1s);
        std::string text;
        for (int j = 0; j < i + 1; j++)
        {
            text += "Pika! ";
        }
        ThreadMessage messageWritten{}; // zero-initialized, so the text stays null-terminated
        messageWritten.messageNumber = i;
        std::strncpy(messageWritten.message, text.c_str(), sizeof(messageWritten.message) - 1);
        const ssize_t written = write(writeFd, &messageWritten, sizeof(messageWritten));
        if (written != static_cast<ssize_t>(sizeof(messageWritten)))
        {
            std::cerr << "[Writer] write failed\n";
            break;
        }
        std::cout << "[Writer] Message " << i + 1 << " sent\n";
    }
    close(writeFd);
}

void ReadFromPipe(const int readFd, const int numMessages)
{
    for (int i = 0; i < numMessages; i++)
    {
        ThreadMessage messageReceived{};
        // Each write is far smaller than PIPE_BUF and therefore atomic,
        // so a read of the same size returns one whole message
        const ssize_t bytesRead = read(readFd, &messageReceived, sizeof(messageReceived));
        if (bytesRead != static_cast<ssize_t>(sizeof(messageReceived)))
        {
            std::cerr << "[Reader] read failed or pipe closed\n";
            break;
        }
        std::cout << "[Reader] Message " << i + 1 << " was " << messageReceived.message << "\n";
    }
    close(readFd);
}

int main()
{
    Timer timer;
    int pipeFds[2];
    const int numMessages = 2;
    if (pipe(pipeFds) == -1) {
        std::cerr << "Pipe creation failed!\n";
        return 1;
    }
    PipedThread writerThread("Writer", pipeFds[1], numMessages, WriteToPipe); // index 1 is the write end
    PipedThread readerThread("Reader", pipeFds[0], numMessages, ReadFromPipe); // index 0 is the read end
    writerThread.Join();
    readerThread.Join();
    return 0;
}
The expected output will be as follows:
Initiated Writer Thread: 0x1ef558c00
Initiated Reader Thread: 0x1ef558c00
[Writer] Message 1 sent
[Reader] Message 1 was Pika!
[Writer] Message 2 sent
[Reader] Message 2 was Pika! Pika!
Process Ended in 2.00611 [s].
Message Queue
Message queues allow multiple processes to exchange discrete messages asynchronously. This mechanism ensures structured communication and enables better decoupling of process execution.
sequenceDiagram
participant Process A
participant Process B
participant Process C
participant Message Queue
participant Process D
participant Process E
Process A->>Message Queue: Send Message 1
Process B->>Message Queue: Send Message 2
Process C->>Message Queue: Send Message 3
Message Queue-->>Process D: Deliver Message 1
Message Queue-->>Process E: Deliver Message 2
Message Queue-->>Process D: Deliver Message 3
Process D->>Process D: Process Message 1 & 3
Process E->>Process E: Process Message 2
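The many-to-many flow in the diagram can be sketched as follows. This is an in-process approximation under stated assumptions: threads play the role of the processes, a `queue.Queue` plays the role of the OS message queue, and the names `producer`, `consumer`, `run_demo`, and `SENTINEL` are invented for this sketch.

```python
import queue
import threading

SENTINEL = None  # hypothetical "no more messages" marker


def producer(name, mailbox, count):
    # Each producer sends discrete messages and does not wait for a consumer
    for i in range(count):
        mailbox.put(f"{name}-msg{i}")


def consumer(mailbox, delivered, lock):
    while True:
        message = mailbox.get()
        if message is SENTINEL:
            break
        with lock:
            delivered.append(message)


def run_demo(num_producers=3, num_consumers=2, per_producer=4):
    mailbox = queue.Queue()
    delivered = []
    lock = threading.Lock()
    consumers = [
        threading.Thread(target=consumer, args=(mailbox, delivered, lock))
        for _ in range(num_consumers)
    ]
    producers = [
        threading.Thread(target=producer, args=(f"P{i}", mailbox, per_producer))
        for i in range(num_producers)
    ]
    for thread_ in consumers + producers:
        thread_.start()
    for thread_ in producers:
        thread_.join()
    # One sentinel per consumer, queued after all real messages
    for _ in consumers:
        mailbox.put(SENTINEL)
    for thread_ in consumers:
        thread_.join()
    return delivered


messages = run_demo()
print(len(messages))  # 12
```

Every message is delivered to exactly one consumer, but which consumer receives which message, and in what overall order, can differ between runs.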
Python Example
from queue import Empty, Queue
from threading import Thread, current_thread

from utils import Timer  # (ref) - algorithm tab - Useful Timer Class


class QueueSwapper(Thread):
    def __init__(self, queue_in: Queue, queue_out: Queue, thread_id: int):
        super().__init__(name=str(thread_id))
        self.queue_in = queue_in
        self.queue_out = queue_out

    def run(self) -> None:
        while True:
            try:
                # get_nowait() avoids blocking forever when another thread
                # takes the last item between an empty() check and get()
                item = self.queue_in.get_nowait()
            except Empty:
                break
            self.queue_out.put(f"Thread-{current_thread().name}: Data - {item}")


def main(num_items: int, num_threads: int) -> None:
    queue_in = Queue()
    queue_out = Queue()
    for item_number in range(num_items):
        queue_in.put(item_number)
    threads = []
    for thread_idx in range(num_threads):
        thread_ = QueueSwapper(queue_in, queue_out, thread_idx + 1)
        threads.append(thread_)
    for thread_ in threads:
        thread_.start()
    for thread_ in threads:
        thread_.join()
    # All workers have finished, so the output queue can be drained safely
    while not queue_out.empty():
        print(queue_out.get())


if "__main__" == __name__:
    num_threads: int = 2
    num_items: int = 65545
    timer: Timer = Timer()
    main(num_items, num_threads)
C++ Example
#include <chrono>
#include <iostream>
#include <mutex>
#include <condition_variable> // for std::condition_variable
#include <functional> // for std::ref
#include <thread>
#include <string>
#include <queue>
#include <deque>
#include <array>
#include <sstream>
#include <atomic>
#include "Timer.h"
struct QueueThreadSafe
{
QueueThreadSafe(std::string id)
: m_ID(id), m_Done(false) {}
std::string m_ID;
std::deque<std::string> m_Data;
std::mutex m_Mutex;
std::condition_variable m_CV;
std::atomic<bool> m_Done;
};
void ConsumeTime(QueueThreadSafe& queue)
{
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
std::cout << "Thread ["<< std::this_thread::get_id() << "] finalizes the job. (spends some time) \n";
}
void SwapQueue(QueueThreadSafe& srcQueue, QueueThreadSafe& destQueue) {
// Timer timer;
std::thread::id threadID = std::this_thread::get_id();
while (true) {
std::string data;
{
std::unique_lock<std::mutex> src_lk(srcQueue.m_Mutex);
srcQueue.m_CV.wait(
src_lk, [&srcQueue] { return !srcQueue.m_Data.empty() || srcQueue.m_Done; }
);
if (srcQueue.m_Data.empty() && srcQueue.m_Done) {
break;
}
data = srcQueue.m_Data.front();
srcQueue.m_Data.pop_front();
}
std::stringstream tempStr;
tempStr << "[" << threadID << "] :" << data;
std::string processedData = tempStr.str();
{
std::lock_guard<std::mutex> dest_lk(destQueue.m_Mutex);
destQueue.m_Data.push_back(processedData);
}
destQueue.m_CV.notify_one();
}
ConsumeTime(destQueue);
}
int main()
{
Timer timer;
const int numThreads = 2;
const int numDummyStrings = 100;
QueueThreadSafe srcQueue("srcQueue");
QueueThreadSafe destQueue("destQueue");
for (int idx = 0; idx < numDummyStrings; idx++)
{
std::stringstream tempStr;
tempStr << "Data - " << idx;
srcQueue.m_Data.push_back(tempStr.str());
}
std::array<std::thread, numThreads> threads;
// Start worker threads
for (std::thread& thread : threads)
{
thread = std::thread(SwapQueue, std::ref(srcQueue), std::ref(destQueue));
}
{
std::lock_guard<std::mutex> lock(srcQueue.m_Mutex);
srcQueue.m_Done = true; // Mark queue as done
}
srcQueue.m_CV.notify_all(); // Wake up all threads
// Wait for all threads to finish
for (auto& thread : threads)
{
thread.join();
}
// Print results
for (auto& datum : destQueue.m_Data)
{
std::cout << datum << "\n";
}
return 0;
}
The expected output would be as follows (thread IDs and the order in which the threads pick up items vary between runs):
Thread [0x16b81f000] finalizes the job. (spends some time)
Thread [0x16b793000] finalizes the job. (spends some time)
[0x16b793000] :Data - 0
[0x16b81f000] :Data - 1
[0x16b793000] :Data - 2
[0x16b793000] :Data - 4
[0x16b81f000] :Data - 3
[0x16b793000] :Data - 5
...
[0x16b793000] :Data - 99
Process Ended in 1.00558 [s].
Socket
Sockets provide communication between processes over a network or within the same machine. They support both connection-oriented (TCP) and connectionless (UDP) communication models, making them suitable for distributed systems. The examples here use a local socket (AF_UNIX) to communicate between threads.
sequenceDiagram
participant Process A
participant Socket
participant Process B
Process A->>Socket: Connect to Process B
Process B-->>Socket: Accept Connection
Process A->>Socket: Send Data (Message 1)
Socket-->>Process B: Receive Data (Message 1)
Process B->>Socket: Send Response (Message 2)
Socket-->>Process A: Receive Response (Message 2)
Combining multiple sockets gives a typical web server architecture, which will be discussed in the future.
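The request/response exchange from the diagram can be sketched with `socket.socketpair()`, which returns two already-connected sockets and so skips the bind, listen, and accept steps. The function names `responder` and `request_response`, and the `ack: ` reply format, are assumptions made for this illustration; the messages are short enough that a single `recv` call is expected to return them whole.

```python
import socket
import threading


def responder(conn: socket.socket) -> None:
    # Plays the role of Process B: read one request and answer it
    with conn:
        request = conn.recv(1024)
        conn.sendall(b"ack: " + request)


def request_response(message: bytes) -> bytes:
    # Two connected endpoints; unlike a pipe, each one can both send and receive
    side_a, side_b = socket.socketpair()
    worker = threading.Thread(target=responder, args=(side_b,))
    worker.start()
    with side_a:
        side_a.sendall(message)    # Message 1: A -> B
        reply = side_a.recv(1024)  # Message 2: B -> A
    worker.join()
    return reply


print(request_response(b"Message 1"))
```

The same two calls, `sendall` and `recv`, are used in both directions, which is the main practical difference from the one-way pipe shown earlier.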
Python Example
import os
import socket
from threading import Thread

SOCKET_FILE = "./mailBox"


class Sender(Thread):
    def __init__(self):
        super().__init__()
        self.client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    def run(self):
        messages = ["Hello World!", " ", "Why so serious?"]
        with self.client:
            self.client.connect(SOCKET_FILE)
            for message in messages:
                self.client.sendall(message.encode())


class Receiver(Thread):
    def __init__(self):
        super().__init__()
        self.server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # Bind and listen before any thread starts, so the sender can
        # connect without relying on a sleep() for ordering
        self.server.bind(SOCKET_FILE)
        self.server.listen()

    def run(self):
        with self.server:
            conn, _addr = self.server.accept()
            with conn:
                while True:
                    # SOCK_STREAM is a byte stream: one recv() may return
                    # several sends merged together, or only part of one
                    data = conn.recv(1024)
                    if not data:
                        break
                    print(data.decode())


def remove_socket():
    if os.path.exists(SOCKET_FILE):
        os.remove(SOCKET_FILE)


def main():
    remove_socket()
    receiver: Thread = Receiver()
    sender: Thread = Sender()
    receiver.start()
    sender.start()
    receiver.join()
    sender.join()
    remove_socket()


if "__main__" == __name__:
    main()
C++ Example
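#include <iostream>
#include <cstdio> // For perror()
#include <cstdlib> // For exit()
#include <cstring> // For memset(), strncpy()
#include <string> // For std::string, std::to_string()
#include <sys/socket.h>
#include <sys/un.h> // For local file sockets
#include <unistd.h> // For close(), unlink(), sleep()
#include <thread>
#include <chrono>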
const char* SOCKET_FILE_PATH = "./mailBox";
class Client
{
public:
Client()
{
m_Socket = socket(AF_UNIX, SOCK_STREAM, 0);
if (m_Socket == -1)
{
perror("Client: Socket creation failed");
exit(EXIT_FAILURE);
}
memset(&m_Client, 0, sizeof(m_Client));
m_Client.sun_family = AF_UNIX;
strncpy(m_Client.sun_path, SOCKET_FILE_PATH, sizeof(m_Client.sun_path) - 1);
}
void SendMessageLoop(int numMessages)
{
// messageCount starts at 1, so this loop sends numMessages - 1 messages
int messageCount = 1;
while (messageCount < numMessages)
{
// Open one short-lived connection per message
int sock = socket(AF_UNIX, SOCK_STREAM, 0);
if (sock == -1)
{
perror("Client: Socket creation failed");
return;
}
if (connect(sock, (struct sockaddr*)&m_Client, sizeof(m_Client)) == -1)
{
perror("Client: Connection failed");
close(sock);
return;
}
std::string message = "Message " + std::to_string(messageCount++);
send(sock, message.c_str(), message.size(), 0);
close(sock);
std::this_thread::sleep_for(std::chrono::milliseconds(60));
}
}
private:
int m_Socket;
struct sockaddr_un m_Client;
};
class Server
{
public:
Server()
{
m_Socket = socket(AF_UNIX, SOCK_STREAM, 0);
if (m_Socket == -1)
{
perror("Server: Socket creation failed");
exit(EXIT_FAILURE);
}
memset(&m_Server, 0, sizeof(m_Server));
m_Server.sun_family = AF_UNIX;
strncpy(m_Server.sun_path, SOCKET_FILE_PATH, sizeof(m_Server.sun_path) - 1);
unlink(SOCKET_FILE_PATH);
if (bind(m_Socket, (struct sockaddr*)&m_Server, sizeof(m_Server)) == -1)
{
perror("Server: Bind failed");
close(m_Socket);
exit(EXIT_FAILURE);
}
if (listen(m_Socket, 5) == -1)
{
perror("Server: Listen failed");
close(m_Socket);
exit(EXIT_FAILURE);
}
}
~Server()
{
close(m_Socket);
unlink(SOCKET_FILE_PATH);
}
void ReceiveMessage()
{
while (true)
{
std::cout << "Server: Waiting for a client...\n";
int clientSocket = accept(m_Socket, nullptr, nullptr);
if (clientSocket == -1)
{
perror("Server: Accept failed");
continue;
}
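char messageBuffer[1024] = {0};
// Leave room for the terminating '\0' and only print when data arrived
ssize_t received = recv(clientSocket, messageBuffer, sizeof(messageBuffer) - 1, 0);
if (received > 0)
{
std::cout << "Server received: " << messageBuffer << "\n";
}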
close(clientSocket);
}
}
private:
int m_Socket;
struct sockaddr_un m_Server;
};
int main()
{
Server server;
std::thread serverThread(&Server::ReceiveMessage, &server); // pass a pointer to the member function plus the object; server.ReceiveMessage cannot be passed on its own
sleep(1); // give the server thread time to reach accept()
Client client;
std::thread clientThread(&Client::SendMessageLoop, &client, 10);
clientThread.join();
serverThread.join(); // ReceiveMessage() loops forever, so this call never returns
}
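The expected output would be as follows. The server loop never exits, so after the last message the program keeps waiting for a client until it is stopped manually (e.g. with Ctrl+C):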
Server: Waiting for a client...
Server received: Message 1
Server: Waiting for a client...
Server received: Message 2
Server: Waiting for a client...
Server received: Message 3
Server: Waiting for a client...
Server received: Message 4
Server: Waiting for a client...
Server received: Message 5
Server: Waiting for a client...
Server received: Message 6
Server: Waiting for a client...
Server received: Message 7
Server: Waiting for a client...
Server received: Message 8
Server: Waiting for a client...
Server received: Message 9
Server: Waiting for a client...
Conclusion
Inter-Process Communication (IPC) is fundamental to concurrent programming, allowing processes and threads to coordinate efficiently. Message-passing IPC techniques provide better isolation and modularity than shared-memory IPC, at the cost of performance.