Data Decomposition

Data decomposition is achieved by dividing the data or work into appropriately sized chunks. We have already practiced this kind of decomposition quite a bit. Generally, we can use the loop, fork-join, thread pool, and map-reduce patterns (alone or combined).

Loop-level parallelism

Loop-level parallelism distributes the independent iterations of a loop across multiple tasks. We did this a lot when first introducing multithreading and multiprocessing.

Loop-level in Python

import time
import threading
import typing as T

def waste_cpu(idx: int, item: int, out_arr: T.List[int]):
    # One independent iteration: square the item, then sleep to simulate
    # longer-running work (the sleep releases the GIL, so the threads overlap).
    out_arr[idx] = item ** 2
    time.sleep(2)

def loop_level_parallelism():
    NUM_ARR = 10
    input_arr = list(range(NUM_ARR))
    output_arr = [None] * NUM_ARR

    threads = []
    for idx, item in enumerate(input_arr):
        t = threading.Thread(target=waste_cpu, args=(idx, item, output_arr))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    print(output_arr)

if "__main__" == __name__:
    loop_level_parallelism()
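
The same loop-level pattern can also be expressed with a thread pool from the standard library. Below is a minimal sketch using concurrent.futures.ThreadPoolExecutor; the helper name square_item is illustrative and not part of the example above.

import time
import typing as T
from concurrent.futures import ThreadPoolExecutor

def square_item(item: int) -> int:
    # Simulated work, as in waste_cpu above.
    time.sleep(2)
    return item ** 2

def loop_level_with_pool() -> T.List[int]:
    input_arr = list(range(10))
    # executor.map preserves input order, so no index bookkeeping is needed.
    with ThreadPoolExecutor(max_workers=10) as executor:
        output_arr = list(executor.map(square_item, input_arr))
    return output_arr

if __name__ == "__main__":
    print(loop_level_with_pool())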

Loop-level in C++

#include <array>
#include <chrono>
#include <iostream>
#include <thread>

void WasteCpu(int idx, int item, std::array<int, 10>& outArr)
{
    using namespace std::chrono_literals;
    outArr[idx] = item * item;
    std::this_thread::sleep_for(2s);
}

int main()
{
    std::array<int, 10> inArr;
    std::array<int, 10> outArr;
    std::array<std::thread, 10> threads;

    for (size_t i=0; i<10; ++i)
    {
        inArr[i] = i;
    }

    for (int i=0; i<10; ++i)
    {
        threads[i] = std::thread(WasteCpu, i, inArr[i], std::ref(outArr));
    }

    for (auto& t: threads)
    {
        t.join();
    }

    for (int& item : outArr)
    {
        std::cout << item << " ";
    }
    std::cout << std::endl;
}
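
Spawning one thread per element only works for small inputs. A common refinement of loop-level parallelism is to chunk the iteration range over a fixed number of threads; the sketch below (with illustrative names such as kNumItems) uses std::thread::hardware_concurrency() to pick that number.

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <thread>
#include <vector>

int main()
{
    constexpr std::size_t kNumItems = 1000;
    std::vector<int> in(kNumItems), out(kNumItems);
    for (std::size_t i = 0; i < kNumItems; ++i)
    {
        in[i] = static_cast<int>(i);
    }

    // One thread per hardware core instead of one thread per element.
    std::size_t numThreads = std::max(1u, std::thread::hardware_concurrency());
    std::vector<std::thread> threads;

    for (std::size_t t = 0; t < numThreads; ++t)
    {
        // Each thread squares a contiguous chunk of the input.
        std::size_t begin = t * kNumItems / numThreads;
        std::size_t end = (t + 1) * kNumItems / numThreads;
        threads.emplace_back([&, begin, end] {
            for (std::size_t i = begin; i < end; ++i)
            {
                out[i] = in[i] * in[i];
            }
        });
    }

    for (auto& t : threads)
    {
        t.join();
    }

    std::cout << "last element: " << out.back() << std::endl;
}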

Map-reduce (using explicit fork/join)

Here each task is forked onto its own thread, the main thread joins them all, and the results grouped by category are printed afterwards as the reduce step.

#include <chrono>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>
#include <unordered_map>
#include <vector>

struct Task
{
    int ID;
    int Input;
    void (*Callback) (int, int);
};

std::unordered_map<int, std::vector<int>> results;
std::mutex resultMutex;
std::mutex printMutex;

// Work done by each forked thread: simulate processing, then report the
// result through the task's callback.
void ProcessTask(const Task& task)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    int result = task.Input * 2;
    task.Callback(task.ID, result);
}

// Callback that categorizes each result (the reduce step); the shared map and
// std::cout are protected by mutexes.
void CategorizeBy4(int taskID, int result)
{
    int category = result % 4;
    {
        std::lock_guard<std::mutex> lk(resultMutex);
        results[category].push_back(result);
    }
    {
        std::lock_guard<std::mutex> lk2(printMutex);
        std::cout << "Task " << taskID
          << " completed with result : " << result << "\n";
    }
}

int main()
{
    std::vector<Task> tasks;
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<int> distrib(1, 100);

    for (int i=0; i<20; ++i)
    {
        tasks.push_back({i, distrib(gen), CategorizeBy4});
    }

    std::vector<std::thread> threads;
    for (const auto& task : tasks)
    {
        threads.emplace_back(ProcessTask, task);
    }

    for (auto& thread: threads)
    {
        if (thread.joinable())
        {
            thread.join();
        }
    }

    for (const auto& [category, values]: results)
    {
        std::cout << "Category " << category << ": ";
        for (int value: values)
        {
            std::cout << value << " ";
        }
        std::cout << std::endl;
    }
}
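
The same fork/join structure can also be written with std::async and futures, which return each task's result directly instead of publishing it through a callback and a shared map. This is a sketch under that substitution (ProcessInput is an illustrative name), not a drop-in replacement for the Task/callback design above.

#include <chrono>
#include <future>
#include <iostream>
#include <random>
#include <thread>
#include <unordered_map>
#include <vector>

int ProcessInput(int input)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    return input * 2;  // map step
}

int main()
{
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<int> distrib(1, 100);

    // Fork: launch one asynchronous task per input.
    std::vector<std::future<int>> futures;
    for (int i = 0; i < 20; ++i)
    {
        futures.push_back(std::async(std::launch::async, ProcessInput, distrib(gen)));
    }

    // Join: collect the results and group them by category (reduce).
    std::unordered_map<int, std::vector<int>> results;
    for (auto& f : futures)
    {
        int result = f.get();
        results[result % 4].push_back(result);
    }

    for (const auto& [category, values] : results)
    {
        std::cout << "Category " << category << ": " << values.size() << " results\n";
    }
}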

Map-reduce (using a thread pool)

Here the map phase (counting votes in each pile) runs on a fixed-size thread pool, and the main thread then reduces the per-worker summaries into a final tally.

#include <algorithm>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <stdexcept>
#include <thread>
#include <unordered_map>
#include <vector>

using Summary = std::unordered_map<int, int>;

class ThreadPool {
public:
    ThreadPool(size_t numThreads) : m_Stop(false) {
        for (size_t i = 0; i < numThreads; ++i) {
            // Each worker loops: wait until a task is available (or stop is
            // signalled), pop it, and run it outside the lock.
            m_Workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lk(m_QueueMutex);
                        m_CV.wait(lk, [this] { return m_Stop || !m_Tasks.empty(); });
                        if (m_Stop && m_Tasks.empty()) {
                            return;
                        }
                        task = std::move(m_Tasks.front());
                        m_Tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    // Shut down: signal stop, wake all workers, and wait for them to finish.
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lk(m_QueueMutex);
            m_Stop = true;
        }
        m_CV.notify_all();
        for (std::thread& worker : m_Workers) {
            worker.join();
        }
    }

    // Add a task to the queue and wake one worker to run it.
    template <class Func, class... Args>
    void Enqueue(Func&& f, Args&&... args) {
        {
            std::unique_lock<std::mutex> lk(m_QueueMutex);
            if (m_Stop) {
                throw std::runtime_error("Enqueue stopped");
            }
            m_Tasks.emplace(std::bind(std::forward<Func>(f), std::forward<Args>(args)...));
        }
        m_CV.notify_one();
    }

private:
    std::vector<std::thread> m_Workers;
    std::queue<std::function<void()>> m_Tasks;
    std::mutex m_QueueMutex;
    std::condition_variable m_CV;
    bool m_Stop;
};

// Map step: count the votes within a single pile.
Summary ProcessPile(const std::vector<int>& pile) {
    Summary summary;
    for (int vote : pile) {
        summary[vote]++;
    }
    return summary;
}

// Split the pile into one chunk per worker, count each chunk on the thread
// pool (map), then merge the per-worker summaries (reduce).
Summary VoteUp(const std::vector<int>& pile, int numWorkers = 4) {
    int voteCount = static_cast<int>(pile.size());
    int unitWorkload = voteCount / numWorkers;
    std::vector<std::vector<int>> votePiles(numWorkers);
    for (int i = 0; i < numWorkers; ++i) {
        int start = i * unitWorkload;
        int end = (i == numWorkers - 1) ? voteCount : (i + 1) * unitWorkload;
        votePiles[i] = std::vector<int>(pile.begin() + start, pile.begin() + end);
    }

    ThreadPool pool(numWorkers);
    std::vector<Summary> workerSummaries(numWorkers);
    std::mutex summaryMutex;
    std::vector<std::pair<Summary, int>> results;

    std::mutex tasksMutex;
    std::condition_variable tasksCV;
    int tasksCompleted = 0;

    for (int i = 0; i < numWorkers; ++i) {
        pool.Enqueue([&, i]() {
            Summary localSummary = ProcessPile(votePiles[i]);
            {
                std::lock_guard<std::mutex> lk(summaryMutex);
                results.emplace_back(localSummary, i);
            }
            {
                std::lock_guard<std::mutex> lk(tasksMutex);
                tasksCompleted++;
            }
            tasksCV.notify_one();
        });
    }

    {
        std::unique_lock<std::mutex> lk(tasksMutex);
        tasksCV.wait(lk, [&] { return tasksCompleted == numWorkers; });
    }

    std::sort(results.begin(), results.end(), [](const auto& a, const auto& b) { return a.second < b.second; });

    for (int i = 0; i < numWorkers; ++i) {
        workerSummaries[i] = results[i].first;
        std::cout << "Votes from worker " << i << ": {";
        for (auto const& [key, val] : workerSummaries[i]) {
            std::cout << key << " : " << val << ", ";
        }
        std::cout << "}\n";
    }

    // Reduce: merge the per-worker summaries into the final tally.
    Summary totSummary;
    for (const auto& workerSummary : workerSummaries) {
        for (const auto& [key, val] : workerSummary) {
            totSummary[key] += val;
        }
    }
    std::cout << "Total number of votes: {";
    for (auto const& [key, val] : totSummary) {
        std::cout << key << ":" << val << ", ";
    }
    std::cout << "}" << std::endl;
    return totSummary;
}

int main() {
    int numCandidates = 3;
    int numVoters = 1000000;
    std::vector<int> pile(numVoters);

    std::random_device randDevice;
    std::mt19937 gen(randDevice());
    std::uniform_int_distribution<> uniformDist(1, numCandidates);

    std::generate(pile.begin(), pile.end(), [&]() { return uniformDist(gen); });

    Summary counts = VoteUp(pile);

    return 0;
}
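
For comparison, here is a minimal Python sketch of the same vote-counting map-reduce using multiprocessing.Pool; the names process_pile and vote_up mirror the C++ example but are otherwise illustrative.

import random
import typing as T
from collections import Counter
from multiprocessing import Pool

def process_pile(pile: T.List[int]) -> Counter:
    # Map step: count the votes within one chunk.
    return Counter(pile)

def vote_up(pile: T.List[int], num_workers: int = 4) -> Counter:
    chunk = len(pile) // num_workers
    chunks = [pile[i * chunk: None if i == num_workers - 1 else (i + 1) * chunk]
              for i in range(num_workers)]
    with Pool(num_workers) as pool:
        summaries = pool.map(process_pile, chunks)
    # Reduce step: merge the per-worker counters into the final tally.
    total: Counter = Counter()
    for summary in summaries:
        total += summary
    return total

if __name__ == "__main__":
    pile = [random.randint(1, 3) for _ in range(1_000_000)]
    print(vote_up(pile))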