Task Decomposition

As in the metaphor (Designer, Manufacturer and Manager), we can describe task decomposition as a way to hire workers for designing, manufacturing, and managing, so that they work as concurrently as possible.

Table of contents

Pipeline Pattern

The pipeline pattern is the most common form of task decomposition.

We design the steps to run on different resources (threads / processes) and then pass the intermediate outputs/artifacts between the steps via IPC or thread-safe queues, because the steps usually do not finish at the same time.

Dependency Graph Example

Python Example

import time
import threading
from queue import Queue
import typing as T
from utils import Timer

# Type alias for a unit of work passed between pipeline stages (e.g. "Shoes #3").
# Note: `T.AnyStr` is a constrained TypeVar intended for generic function
# signatures, not a type alias; the workloads here are always plain `str`.
Workload = str

class SharedCounter:
    """Mutex-protected integer counter that is safe to share across threads."""

    def __init__(self, initial_value: int = 0):
        self._mutex = threading.Lock()
        self._value = initial_value

    def increment(self) -> None:
        """Atomically add one (the lock makes the read-modify-write safe)."""
        with self._mutex:
            self._value += 1

    def decrement(self) -> None:
        """Atomically subtract one."""
        with self._mutex:
            self._value -= 1

    def get(self) -> int:
        """Return a snapshot of the current value."""
        with self._mutex:
            return self._value

class Worker(threading.Thread):
    """One pipeline stage running on its own thread.

    Pulls workloads from ``in_queue``, "works" on each for ``turnaround_time``
    seconds, and forwards it to ``out_queue`` (if any) until the shared
    counter reports that all ``num_projects`` workloads have been processed.
    """

    def __init__(
        self,
        in_queue: Queue[Workload],
        out_queue: T.Optional[Queue[Workload]],
        job_type: str,
        turnaround_time: float = 4,  # seconds per workload; callers pass fractions (e.g. 0.2)
        num_projects: int = 4,  # Ensure workers know the number of projects
        process_counter: T.Optional[SharedCounter] = None
    ):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.job_type = job_type
        self.turnaround_time = turnaround_time
        self.num_projects = num_projects
        # Workers that share a counter (data decomposition) split the projects
        # between them; otherwise each worker tracks its own progress.
        if process_counter:
            self.process_counter = process_counter
        else:
            self.process_counter = SharedCounter(initial_value=0)

    def run(self) -> None:
        from queue import Empty  # local import: only the worker loop needs it

        while self.process_counter.get() < self.num_projects:
            try:
                # Bounded wait so the stop condition above is re-checked
                # periodically instead of blocking forever on an empty queue.
                workload = self.in_queue.get(timeout=1)
            except Empty:
                continue  # No task available yet; keep polling.

            if workload is None:
                # Sentinel value: upstream signals shutdown.
                self.in_queue.task_done()
                break

            print(f"{self.job_type} .... {workload}")
            time.sleep(self.turnaround_time)
            self.process_counter.increment()

            if self.out_queue:
                self.out_queue.put(workload)

            self.in_queue.task_done()

class Pipeline:
    """Wires up the designer -> manufacturers -> manager pipeline and runs it."""

    def __init__(self, num_projects: int = 4):
        self.num_projects = num_projects

    def assemble_leathers(self) -> Queue[Workload]:
        """Fill the first stage's input queue with one workload per project."""
        backlog: Queue[Workload] = Queue()
        for idx in range(self.num_projects):
            backlog.put(f"Shoes #{idx}")
        return backlog

    def run_concurrently(self) -> None:
        """Start every stage as a thread and wait for all of them to finish."""
        to_be_planned = self.assemble_leathers()
        to_be_programmed: Queue[Workload] = Queue()
        to_be_finalized: Queue[Workload] = Queue()

        # Data decomposition: both manufacturers share one counter (used like
        # a C++ atomic), so the projects are split between them.
        shared_counter = SharedCounter(initial_value=0)

        workers = [
            Worker(to_be_planned, to_be_programmed, "Designer",
                   turnaround_time=0.2, num_projects=self.num_projects),
            # Suppose manufacturer 1 is more skillful (faster) than manufacturer 2.
            Worker(to_be_programmed, to_be_finalized, "Manufacturer 1",
                   turnaround_time=0.4, num_projects=self.num_projects,
                   process_counter=shared_counter),
            Worker(to_be_programmed, to_be_finalized, "Manufacturer 2",
                   turnaround_time=0.5, num_projects=self.num_projects,
                   process_counter=shared_counter),
            Worker(to_be_finalized, None, "Manager",
                   turnaround_time=0.1, num_projects=self.num_projects),
        ]

        # Same start/join order as spelling each worker out individually.
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()


if __name__ == "__main__":
    with Timer() as timer:
        pipeline = Pipeline(num_projects=100)
        pipeline.run_concurrently()

C++ Example

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

// FIFO queue safe for concurrent producers and consumers. Consumers block
// (with a timeout) on a condition variable instead of busy-waiting.
template<typename T>
class ThreadSafeQueue
{
public:
    // Enqueue an item and wake one waiting consumer.
    void Push(T item)
    {
        {
            std::lock_guard<std::mutex> guard(m_Lock);
            m_Items.push(std::move(item));
        }
        // Notify after unlocking so the woken consumer can grab the lock at once.
        m_NotEmpty.notify_one();
    }

    // Dequeue into `item`, waiting up to `timeout`; returns false on timeout.
    bool Pop(T& item, std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> guard(m_Lock);
        const bool hasItem = m_NotEmpty.wait_for(
            guard, timeout, [this] { return !m_Items.empty(); });
        if (!hasItem)
        {
            return false;
        }
        item = std::move(m_Items.front());
        m_Items.pop();
        return true;
    }

private:
    std::queue<T> m_Items;
    mutable std::mutex m_Lock;
    std::condition_variable m_NotEmpty;
};

// Thread-safe integer counter. Backed by std::atomic<int> (the "atomic value
// in C++" the Python example refers to), so no mutex is needed: each method
// is a single lock-free atomic operation.
class SharedCounter
{
public:
    explicit SharedCounter(int initialValue = 0)
    : m_Value(initialValue)
    {}

    // Atomically add one.
    void Increment()
    {
        ++m_Value;
    }

    // Atomically subtract one.
    void Decrement()
    {
        --m_Value;
    }

    // Return a snapshot of the current value.
    int Get() const
    {
        return m_Value.load();
    }

private:
    std::atomic<int> m_Value;
};

// One pipeline stage running on its own thread. Pulls workloads from the
// input queue, "works" on each for a fixed turnaround time, and forwards it
// to the optional output queue until the shared counter reports that every
// project has been processed.
class Worker
{
public:
    Worker(
        ThreadSafeQueue<std::string>& queueIn,
        ThreadSafeQueue<std::string>* queueOut,   // nullptr for the last stage
        std::string jobType,
        std::chrono::milliseconds turnaroundTime,
        int numProjects,
        std::shared_ptr<SharedCounter> processCounter = nullptr
    )
    : m_QueueIn(queueIn)
    , m_QueueOut(queueOut)
    , m_JobType(std::move(jobType))
    , m_TurnaroundTime(turnaroundTime)
    , m_NumProjects(numProjects)
    // Workers sharing a counter split the projects between them; otherwise
    // this worker tracks its own progress with a private counter.
    , m_ProcessCounter(processCounter ? processCounter : std::make_shared<SharedCounter>())
    {}

    // Launch the worker loop on a dedicated thread.
    void Start()
    {
        m_Thread = std::thread(&Worker::run, this);
    }

    // Block until the worker loop finishes. Safe to call more than once.
    void Join()
    {
        if (m_Thread.joinable())
        {
            m_Thread.join();
        }
    }

    ~Worker()
    {
        Join();  // RAII: never destroy a still-joinable std::thread
    }

private:
    void run()
    {
        while (m_ProcessCounter->Get() < m_NumProjects)
        {
            std::string workload;
            // Bounded wait so the stop condition is re-checked periodically.
            if (!m_QueueIn.Pop(workload, std::chrono::seconds(1)))
            {
                continue;
            }

            std::cout << m_JobType << " .... " << workload << std::endl;
            std::this_thread::sleep_for(m_TurnaroundTime);
            m_ProcessCounter->Increment();

            if (m_QueueOut != nullptr)
            {
                m_QueueOut->Push(workload);
            }
        }
    }

private:
    ThreadSafeQueue<std::string>& m_QueueIn;
    ThreadSafeQueue<std::string>* m_QueueOut;
    std::string m_JobType;
    std::chrono::milliseconds m_TurnaroundTime;
    int m_NumProjects;
    std::shared_ptr<SharedCounter> m_ProcessCounter;
    std::thread m_Thread;
};

class Pipeline
{
public:
    Pipeline(int numProjects)
    : m_NumProjects(numProjects)
    {}

    void RunConcurrently()
    {
        ThreadSafeQueue<std::string> toBePlanned;
        for (int idx = 0; idx < m_NumProjects; ++idx)
        {
            toBePlanned.Push("Shoes #" + std::to_string(idx));
        }

        ThreadSafeQueue<std::string> toBeProgrammed;
        ThreadSafeQueue<std::string> toBeFinalized;

        Worker designer(toBePlanned, &toBeProgrammed, "Designer", std::chrono::milliseconds(200), m_NumProjects);

        auto sharedCounter = std::make_shared<SharedCounter>();
        Worker manufacturer1(toBeProgrammed, &toBeFinalized, "Manufacturer 1", std::chrono::milliseconds(400), m_NumProjects, sharedCounter);
        Worker manufacturer2(toBeProgrammed, &toBeFinalized, "Manufacturer 2", std::chrono::milliseconds(500), m_NumProjects, sharedCounter);

        Worker manager(toBeFinalized, nullptr, "Manager", std::chrono::milliseconds(100), m_NumProjects);

        designer.Start();
        manufacturer1.Start();
        manufacturer2.Start();
        manager.Start();

        designer.Join();
        manufacturer1.Join();
        manufacturer2.Join();
        manager.Join();
    }
private:
    int m_NumProjects;
};

// RAII stopwatch: records the construction time and prints the elapsed
// milliseconds when it goes out of scope.
class Timer
{
public:
    Timer()
    {
        // steady_clock is monotonic, which makes it the correct clock for
        // measuring durations; high_resolution_clock is allowed to alias a
        // non-steady clock (it does in libstdc++/MSVC history) and can jump
        // if the system time is adjusted.
        start = std::chrono::steady_clock::now();
    }

    ~Timer()
    {
        const auto end = std::chrono::steady_clock::now();
        const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
        std::cout << "Time taken: " << duration.count() << " milliseconds" << std::endl;
    }
private:
    std::chrono::time_point<std::chrono::steady_clock> start;
};

// Entry point: run a 100-project pipeline and report the elapsed time.
int main()
{
    {
        Timer timer;  // prints the elapsed time when this scope closes
        Pipeline(100).RunConcurrently();
    }
    return 0;
}

Conclusion

The pipeline pattern is often used in big data ETL (Extract, Transform, Load) processes. As you may have noticed, the pipeline allows us to limit the number of threads, much like a thread pool does. This is why it is most useful when the number of shared resources is limited.