Message-Passing IPC

Message-passing IPC provides a structured way for tasks to communicate without directly sharing memory. It is widely used in distributed systems and multiprocessing environments where isolation between processes is crucial.

In message-passing IPC, each task is identified by a unique name, and tasks interact by sending and receiving messages to and from named tasks.

The OS establishes a communication channel and provides proper system calls for tasks to pass messages through this channel.

The advantage of Message-Passing is that the OS manages the channel, providing a useful interface to send and receive data without conflict. However, there is a huge communication cost. To transfer any piece of information between tasks, information must be copied from the task’s user space to the OS channel through system calls and then copied back to the address space of the receiving task.

Table of contents

Pipe

Pipes provide a unidirectional communication channel between two processes (one-to-one, one way, write then read). They are commonly used for parent-child process communication and allow data transfer in a sequential manner.

 
sequenceDiagram
    participant Process A
    participant Pipe
    participant Process B

    Process A->>Pipe: Write Data
    Pipe-->>Process B: Read Data
    Process B->>Process B: Process Data

Python Example

import time
import threading
from threading import Thread

from multiprocessing import Pipe
from multiprocessing.connection import Connection

from utils import Timer ## (ref) - algorithm tab - Useful Timer Class

class WriterThread(object):
    def __init__(self, name:str, conn:Connection, data:list=[]):
        self.conn = conn
        self.data = data
        self._thread = Thread(
            target=self.do_thread_job,
            name=name,
        )

    def start(self):
        self._thread.start()

    def join(self):
        self._thread.join()

    def do_thread_job(self):
        print(f"{self._thread.name} thread: {threading.current_thread()}")
        time.sleep(1)
        self.conn.send(self.data)

class ReceiverThread(object):
    def __init__(self, name:str, conn: Connection):
        self.conn = conn
        self._thread = Thread(
            target=self.do_thread_job,
            name=name
        )

    def start(self):
        self._thread.start()

    def join(self):
        self._thread.join()

    def do_thread_job(self):
        print(f"{self._thread.name} thread: {threading.current_thread()}")
        messages = self.conn.recv()
        print(messages)

def main():
    timer = Timer()
    conn1, conn2 = Pipe()

    writer_thread = WriterThread(
        name="writer",
        conn=conn1,
        data=["Hello World!", "why so serious"]
    )
    receiver_thread = ReceiverThread(
        name="receiver",
        conn=conn2
    )

    threads = [writer_thread, receiver_thread]
    for thread_ in threads:
        thread_.start()
    
    for thread_ in threads:
        thread_.join()

if "__main__" == __name__:
    main()

C++ Example

In C++, we need to use OS dependent headers to use message passing IPC. In the following examples, we will use UNIX (POSIX) dependent libraries (i.e. unistd.h).

#include <iostream>
#include <thread>
#include <string>
#include <chrono>
#include <vector>
#include <sstream>
#include <cstring>
// UNIX dependency - POSIX . Not working in Windows
#include <unistd.h> // for pipe(), write(), read()

#include "Timer.h" // (ref) - algorithm tab - Useful Timer Class

struct ThreadMessage
{
    int messageNumber;
    std::string message;
};

class PipedThread
{
public:
    PipedThread(std::string name, int pipeNum, int numMessages, void(*threadJob)(int, int)) // use function pointer
    :m_Name(name)
    ,m_PipeNum(pipeNum)
    ,m_Thread(threadJob, pipeNum, numMessages)
    {
        std::cout << "Initiated " << m_Name <<" Thread: " << std::this_thread::get_id() << "\n";
    }
    void Join()
    {
        m_Thread.join();
    }
private:
    std::string m_Name;
    std::thread m_Thread;
    int m_PipeNum;
};

void WriteToPipe(const int writeFd, const int numMessages)
{
    using namespace std::chrono_literals;
    std::stringstream message[numMessages];

    for (int i=0; i<numMessages; i++)
    {
        std::this_thread::sleep_for(1s);
        for (int j=0; j<i+1; j++)
        {
            message[i] << "Pika! ";
        }
        std::string stringMessage = message[i].str();
        ThreadMessage messageWritten = {i, stringMessage};
        write(writeFd, &messageWritten, sizeof(messageWritten));
        std::cout << "[Writer] Message " << i+1 << " Sented \n";  
    }
}

void ReadFromPipe(const int readerFd, const int numMessages)
{
    std::stringstream messages[numMessages];

    for (int i=0; i < numMessages ; i++)
    {
        ThreadMessage messageReceived;
        read(readerFd, &messageReceived, sizeof(messageReceived));
        std::cout << "[Reader] Message " << i+1 << " was " << messageReceived.message <<"\n";  

    }

}

int main()
{
    Timer timer;

    int pipeFds[2];
    const int numMessages = 2;

    if (pipe(pipeFds) == -1) {
        std::cerr << "Pipe creation failed!\n";
        return 1;
    }

    PipedThread writerThread = {"Writer", pipeFds[1], numMessages, WriteToPipe}; // 1 is writer
    PipedThread readerThread = {"Reader", pipeFds[0], numMessages, ReadFromPipe};  // 0 is reader

    writerThread.Join();
    readerThread.Join();
}

The expected output will be as follows:

Initiated Writer Thread: 0x1ef558c00
Initiated Reader Thread: 0x1ef558c00
[Writer] Message 1 Sented 
[Reader] Message 1 was Pika! 
[Writer] Message 2 Sented 
[Reader] Message 2 was Pika! Pika! 
Process Ended in 2.00611 [s].

Message Queue

Message queues allow multiple processes to exchange discrete messages asynchronously. This mechanism ensures structured communication and enables better decoupling of process execution.

 
sequenceDiagram
    participant Process A
    participant Process B
    participant Process C
    participant Message Queue
    participant Process D
    participant Process E

    Process A->>Message Queue: Send Message 1
    Process B->>Message Queue: Send Message 2
    Process C->>Message Queue: Send Message 3
    
    Message Queue-->>Process D: Deliver Message 1
    Message Queue-->>Process E: Deliver Message 2
    Message Queue-->>Process D: Deliver Message 3

    Process D->>Process D: Process Message 1 & 3
    Process E->>Process E: Process Message 2

Python Example

from queue import Queue
from threading import Thread, current_thread
from utils import Timer

class QueueSwaper(Thread):
    def __init__(self, queue_in: Queue, queue_out: Queue, id: int):
        super().__init__(name=str(id))
        self.queue_in = queue_in
        self.queue_out = queue_out

    def run(self) -> None:
        while not self.queue_in.empty():
            item = self.queue_in.get()
            self.queue_out.put(f"Thread-{current_thread}: Data - {item}")


def main(num_items:int, num_threads: int) -> None:
    queue_in = Queue()
    queue_out = Queue()

    for item_number in range(num_items):
        queue_in.put(item_number)

    threads = []

    for thread_idx in range(num_threads):
        thread_ = QueueSwaper(queue_in, queue_out, thread_idx + 1)
        threads.append(thread_)

    for thread_ in threads:
        thread_.start()

    for thread_ in threads:
        thread_.join()

    for item in queue_out.queue:
        print(item)

if "__main__" == __name__:
    num_threads: int   = 2
    num_items: int     = 65545
    timer: Timer       = Timer()
    
    main(num_items, num_threads)

C++ Example

#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
#include <string>
#include <queue>
#include <deque>
#include <array>
#include <sstream>
#include <atomic>

#include "Timer.h"

struct QueueThreadSafe
{
    QueueThreadSafe(std::string id)
    : m_ID(id), m_Done(false) {}

    std::string m_ID;
    std::deque<std::string> m_Data;
    std::mutex m_Mutex;
    std::condition_variable m_CV;
    std::atomic<bool> m_Done;
};

void ConsumeTime(QueueThreadSafe& queue)
{
    using namespace std::chrono_literals;
    std::this_thread::sleep_for(1s);
    std::cout << "Thread ["<< std::this_thread::get_id() << "] finalizes the job. (spends some time) \n"; 
}

void SwapQueue(QueueThreadSafe& srcQueue, QueueThreadSafe& destQueue) {
    // Timer timer;
    std::thread::id threadID = std::this_thread::get_id();
    
    while (true) {
        std::string data;
        {
            std::unique_lock<std::mutex> src_lk(srcQueue.m_Mutex);
            srcQueue.m_CV.wait(
                src_lk, [&srcQueue] { return !srcQueue.m_Data.empty() || srcQueue.m_Done; }
            );

            if (srcQueue.m_Data.empty() && srcQueue.m_Done) {
                break;
            }

            data = srcQueue.m_Data.front();
            srcQueue.m_Data.pop_front();
        }

        std::stringstream tempStr;
        tempStr << "[" << threadID << "] :" << data;
        std::string processedData = tempStr.str();

        {
            std::lock_guard<std::mutex> dest_lk(destQueue.m_Mutex);
            destQueue.m_Data.push_back(processedData);
        }
        destQueue.m_CV.notify_one();
    }
    ConsumeTime(destQueue);
}

int main()
{
    Timer timer;
    const int numThreads = 2;
    const int numDummyStrings = 100;

    QueueThreadSafe srcQueue("srcQueue");
    QueueThreadSafe destQueue("destQueue");

    for (int idx = 0; idx < numDummyStrings; idx++)
    {
        std::stringstream tempStr;
        tempStr << "Data - " << idx;
        srcQueue.m_Data.push_back(tempStr.str());
    }

    std::array<std::thread, numThreads> threads;

    // Start worker threads
    for (std::thread& thread : threads)
    {
        thread = std::thread(SwapQueue, std::ref(srcQueue), std::ref(destQueue));
    }

    {
        std::lock_guard<std::mutex> lock(srcQueue.m_Mutex);
        srcQueue.m_Done = true; // Mark queue as done
    }
    srcQueue.m_CV.notify_all(); // Wake up all threads

    // Wait for all threads to finish
    for (auto& thread : threads)
    {
        thread.join();
    }

    // Print results
    for (auto& datum : destQueue.m_Data)
    {
        std::cout << datum << "\n";
    }

    return 0;
}

The expected output would be as follows:

Thread [0x16b81f000] finalizes the job. (spends some time) 
Thread [0x16b793000] finalizes the job. (spends some time) 
[0x16b793000] :Data - 0
[0x16b81f000] :Data - 1
[0x16b793000] :Data - 2
[0x16b793000] :Data - 4
[0x16b81f000] :Data - 3
[0x16b793000] :Data - 5
...
[0x16b793000] :Data - 99
Process Ended in 1.00558 [s].

Socket

Sockets provide communication between processes over a network or within the same machine. They support both connection-oriented (TCP) and connectionless (UDP) communication models, making them suitable for distributed systems. In this case, we use local socket by using AF_UNIX to communicate between threads.

 
sequenceDiagram
    participant Process A
    participant Socket
    participant Process B

    Process A->>Socket: Connect to Process B
    Process B-->>Socket: Accept Connection

    Process A->>Socket: Send Data (Message 1)
    Socket-->>Process B: Receive Data (Message 1)
    
    Process B->>Socket: Send Response (Message 2)
    Socket-->>Process A: Receive Response (Message 2)

Using multiple sockets, we can provide typical web server architecture, which will be discussed in the future.

Python Example

import os
import socket
from threading import Thread
import time

SOCKET_FILE = "./mailBox"

class Sender(Thread):
    def __init__(self):
        super().__init__()
        self.client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    
    def run(self):
        self.client.connect(SOCKET_FILE)
        messages = ["Hello World!", " ", "Why so serious?"]
        with self.client:
            for message in messages:
                self.client.sendall(message.encode())

class Receiver(Thread):
    def __init__(self):
        super().__init__()
        self.server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    
    def run(self):
        self.server.bind(SOCKET_FILE)
        self.server.listen()
        conn, addr = self.server.accept()

        while True:
            data = conn.recv(1024)
            if not data:
                break
            message = data.decode()
            print(message)
        
        self.server.close()

def remove_socket():
    if os.path.exists(SOCKET_FILE):
        os.remove(SOCKET_FILE)

def main():
    remove_socket()
    
    receiver : Thread = Receiver()
    sender   : Thread = Sender()

    receiver.start()
    time.sleep(1)
    sender.start()

    receiver.join()
    sender.join()

    remove_socket()

if "__main__" == __name__:
    main()

C++ Example

#include <iostream>
#include <cstring>  // For memset()
#include <sys/socket.h>
#include <sys/un.h> // For local file sockets
#include <unistd.h> // For close(), unlink()
#include <thread>
#include <chrono>

const char* SOCKET_FILE_PATH = "./mailBox";

class Client
{
public:
    Client()
    {
        m_Socket = socket(AF_UNIX, SOCK_STREAM, 0);
        if (m_Socket == -1)
        {
            perror("Client: Socket creation failed");
            exit(EXIT_FAILURE);
        }

        memset(&m_Client, 0, sizeof(m_Client));
        m_Client.sun_family = AF_UNIX;
        strncpy(m_Client.sun_path, SOCKET_FILE_PATH, sizeof(m_Client.sun_path) - 1);
    }

    void SendMessageLoop(int numMessages)
    {
        int messageCount = 1;
        while (messageCount < numMessages)
        {
            int sock = socket(AF_UNIX, SOCK_STREAM, 0);
            if (connect(sock, (struct sockaddr*)&m_Client, sizeof(m_Client)) == -1)
            {
                perror("Client: Connection failed");
                close(sock);
                return;
            }

            std::string message = "Message " + std::to_string(messageCount++);
            send(sock, message.c_str(), message.size(), 0);
            close(sock);

            std::this_thread::sleep_for(std::chrono::milliseconds(60));
        }
        ++messageCount;
    }

private:
    int m_Socket;
    struct sockaddr_un m_Client;
};

class Server
{
public:
    Server()
    {
        m_Socket = socket(AF_UNIX, SOCK_STREAM, 0);
        if (m_Socket == -1)
        {
            perror("Server: Socket creation failed");
            exit(EXIT_FAILURE);
        }

        memset(&m_Server, 0, sizeof(m_Server));
        m_Server.sun_family = AF_UNIX;
        strncpy(m_Server.sun_path, SOCKET_FILE_PATH, sizeof(m_Server.sun_path) - 1);

        unlink(SOCKET_FILE_PATH);

        if (bind(m_Socket, (struct sockaddr*)&m_Server, sizeof(m_Server)) == -1)
        {
            perror("Server: Bind failed");
            close(m_Socket);
            exit(EXIT_FAILURE);
        }

        if (listen(m_Socket, 5) == -1)
        {
            perror("Server: Listen failed");
            close(m_Socket);
            exit(EXIT_FAILURE);
        }
    }

    ~Server()
    {
        close(m_Socket);
        unlink(SOCKET_FILE_PATH);
    }

    void ReceiveMessage()
    {
        while (true)
        {
            std::cout << "Server: Waiting for a client...\n";

            int clientSocket = accept(m_Socket, nullptr, nullptr);
            if (clientSocket == -1)
            {
                perror("Server: Accept failed");
                continue;
            }

            char messageBuffer[1024] = {0};
            recv(clientSocket, messageBuffer, sizeof(messageBuffer), 0);
            std::cout << "Server received: " << messageBuffer << "\n";

            close(clientSocket);
        }
    }

private:
    int m_Socket;
    struct sockaddr_un m_Server;
};

int main()
{
    Server server;
    std::thread serverThread(&Server::ReceiveMessage, &server); // not server.ReceiveMessage (binded funciton only can be used as function call)

    sleep(1); // system sleep

    Client client;
    std::thread clientThread(&Client::SendMessageLoop, &client, 10);

    serverThread.join();
    clientThread.join();

}

The expected output would be as follows:

Server: Waiting for a client...
Server received: Message 1
Server: Waiting for a client...
Server received: Message 2
Server: Waiting for a client...
Server received: Message 3
Server: Waiting for a client...
Server received: Message 4
Server: Waiting for a client...
Server received: Message 5
Server: Waiting for a client...
Server received: Message 6
Server: Waiting for a client...
Server received: Message 7
Server: Waiting for a client...
Server received: Message 8
Server: Waiting for a client...
Server received: Message 9
Server: Waiting for a client...

Conclusion

Inter-Process Communication (IPC) is fundamental to concurrent programming, allowing processes and threads to coordinate efficiently. Message-passing IPC techniques provide better isolation and modularity at the cost of high-performance compared to shared-memory IPC.