Data Decomposition
Data decomposition is achieved by dividing the data (or the work on it) into properly sized chunks. We have already practiced this kind of decomposition a lot. In general, we can use the loop, fork-join, thread-pool, and map-reduce patterns, either on their own or combined.
Table of contents
- Loop-level parallelism
- Map reduce (using explicit fork/join)
- Map reduce (using threadpool)
Loop-level parallelism
Loop-level parallelism means distributing the independent iterations of a loop across multiple tasks. We used this pattern heavily when first introducing multithreading and multiprocessing.
Loop-level in Python
import time
import threading
import typing as T


def waste_cpu(idx: int, item: int, out_arr: T.List[int]):
    # Each thread writes to its own slot, so no lock is needed.
    out_arr[idx] = item ** 2
    time.sleep(2)


def loop_level_parallelism():
    NUM_ARR = 10
    input_arr = list(range(NUM_ARR))
    output_arr = [None] * NUM_ARR
    threads = []
    # Fork: one thread per loop iteration.
    for idx, item in enumerate(input_arr):
        t = threading.Thread(target=waste_cpu, args=(idx, item, output_arr))
        t.start()
        threads.append(t)
    # Join: wait for every iteration to finish.
    for t in threads:
        t.join()
    print(output_arr)


if __name__ == "__main__":
    loop_level_parallelism()
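Note that the threads above only run concurrently because waste_cpu spends most of its time sleeping; for a genuinely CPU-bound loop body, CPython's GIL serializes the threads, so loop-level parallelism needs processes instead. Here is a minimal sketch using multiprocessing.Pool (the square helper and the pool size are our own choices, not part of the original example):

import multiprocessing as mp


def square(item):
    return item ** 2


def loop_level_multiprocessing():
    input_arr = list(range(10))
    # Pool.map partitions the iterations across worker processes,
    # sidestepping the GIL for CPU-bound work.
    with mp.Pool(processes=4) as pool:
        output_arr = pool.map(square, input_arr)
    print(output_arr)


if __name__ == "__main__":
    loop_level_multiprocessing()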
Loop-level in C++
#include <array>
#include <chrono>
#include <iostream>
#include <thread>

void WasteCpu(int idx, int item, std::array<int, 10>& outArr)
{
    using namespace std::chrono_literals;
    // Each thread writes to its own slot, so no synchronization is needed.
    outArr[idx] = item * item;
    std::this_thread::sleep_for(2s);
}

int main()
{
    std::array<int, 10> inArr;
    std::array<int, 10> outArr;
    std::array<std::thread, 10> threads;

    for (int i = 0; i < 10; ++i)
    {
        inArr[i] = i;
    }
    // Fork: one thread per loop iteration.
    for (int i = 0; i < 10; ++i)
    {
        threads[i] = std::thread(WasteCpu, i, inArr[i], std::ref(outArr));
    }
    // Join: wait for every iteration to finish.
    for (auto& t : threads)
    {
        t.join();
    }
    for (int& item : outArr)
    {
        std::cout << item << " ";
    }
    std::cout << std::endl;
}
Map reduce (using explicit fork/join)
In the map step below, each task doubles its input on its own thread; in the reduce step, the completion callback groups results into buckets of result % 4 inside a shared map guarded by a mutex. The fork/join is explicit: one std::thread per task, then a join() on every thread.
#include <chrono>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>
#include <unordered_map>
#include <vector>

struct Task
{
    int ID;
    int Input;
    void (*Callback)(int, int);  // invoked with (taskID, result)
};

std::unordered_map<int, std::vector<int>> results;
std::mutex resultMutex;
std::mutex printMutex;

// Map step: transform the input and hand the result to the callback.
void ProcessTask(const Task& task)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    int result = task.Input * 2;
    task.Callback(task.ID, result);
}

// Reduce step: group results into buckets keyed by result % 4.
void CategorizeBy4(int taskID, int result)
{
    int category = result % 4;
    {
        std::lock_guard<std::mutex> lk(resultMutex);
        results[category].push_back(result);
    }
    {
        std::lock_guard<std::mutex> lk2(printMutex);
        std::cout << "Task " << taskID
                  << " completed with result: " << result << "\n";
    }
}

int main()
{
    std::vector<Task> tasks;
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<int> distrib(1, 100);
    for (int i = 0; i < 20; ++i)
    {
        tasks.push_back({i, distrib(gen), CategorizeBy4});
    }

    // Fork: one thread per task.
    std::vector<std::thread> threads;
    for (const auto& task : tasks)
    {
        threads.emplace_back(ProcessTask, task);
    }
    // Join: wait for all tasks to complete.
    for (auto& thread : threads)
    {
        if (thread.joinable())
        {
            thread.join();
        }
    }

    for (const auto& [category, values] : results)
    {
        std::cout << "Category " << category << ": ";
        for (int value : values)
        {
            std::cout << value << " ";
        }
        std::cout << std::endl;
    }
}
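For comparison, the same fork/join map-reduce is compact in Python: fork one thread per task, reduce into a shared dict under a lock, then join. This is a minimal sketch; the helper names mirror the C++ version above rather than any library API.

import random
import threading
from collections import defaultdict

results = defaultdict(list)
result_lock = threading.Lock()


def categorize_by_4(task_id, result):
    # Reduce step: bucket results by result % 4 under a lock.
    with result_lock:
        results[result % 4].append(result)


def process_task(task_id, value):
    # Map step: double the input, then hand it to the reducer.
    categorize_by_4(task_id, value * 2)


def main():
    inputs = [random.randint(1, 100) for _ in range(20)]
    threads = [
        threading.Thread(target=process_task, args=(i, v))
        for i, v in enumerate(inputs)
    ]
    for t in threads:   # fork
        t.start()
    for t in threads:   # join
        t.join()
    for category, values in sorted(results.items()):
        print(f"Category {category}: {values}")


if __name__ == "__main__":
    main()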
Map reduce (using threadpool)
Instead of spawning one thread per task, this version reuses a fixed set of workers: the pool's threads pull tasks off a shared queue guarded by a mutex and a condition variable. The vote-counting example splits the pile into per-worker chunks (map) and then merges the per-worker summaries (reduce).
#include <algorithm>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <stdexcept>
#include <thread>
#include <unordered_map>
#include <vector>

using Summary = std::unordered_map<int, int>;

class ThreadPool {
public:
    ThreadPool(size_t numThreads) : m_Stop(false) {
        for (size_t i = 0; i < numThreads; ++i) {
            m_Workers.emplace_back([this] {
                // Each worker loops, pulling tasks off the shared queue.
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lk(m_QueueMutex);
                        m_CV.wait(lk, [this] { return m_Stop || !m_Tasks.empty(); });
                        if (m_Stop && m_Tasks.empty()) {
                            return;
                        }
                        task = std::move(m_Tasks.front());
                        m_Tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lk(m_QueueMutex);
            m_Stop = true;
        }
        m_CV.notify_all();
        for (std::thread& worker : m_Workers) {
            worker.join();
        }
    }

    template <class Func, class... Args>
    void Enqueue(Func&& f, Args&&... args) {
        {
            std::unique_lock<std::mutex> lk(m_QueueMutex);
            if (m_Stop) {
                throw std::runtime_error("Enqueue on stopped ThreadPool");
            }
            m_Tasks.emplace(std::bind(std::forward<Func>(f), std::forward<Args>(args)...));
        }
        m_CV.notify_one();
    }

private:
    std::vector<std::thread> m_Workers;
    std::queue<std::function<void()>> m_Tasks;
    std::mutex m_QueueMutex;
    std::condition_variable m_CV;
    bool m_Stop;
};

// Map step: count the votes in one chunk of the pile.
Summary ProcessPile(const std::vector<int>& pile) {
    Summary summary;
    for (int vote : pile) {
        summary[vote]++;
    }
    return summary;
}

Summary VoteUp(const std::vector<int>& pile, int numWorkers = 4) {
    int voteCount = pile.size();
    int unitWorkload = voteCount / numWorkers;

    // Split the pile into one chunk per worker; the last chunk
    // absorbs the remainder.
    std::vector<std::vector<int>> votePiles(numWorkers);
    for (int i = 0; i < numWorkers; ++i) {
        int start = i * unitWorkload;
        int end = (i == numWorkers - 1) ? voteCount : (i + 1) * unitWorkload;
        votePiles[i] = std::vector<int>(pile.begin() + start, pile.begin() + end);
    }

    ThreadPool pool(numWorkers);
    std::vector<Summary> workerSummaries(numWorkers);
    std::mutex summaryMutex;
    std::vector<std::pair<Summary, int>> results;
    std::mutex tasksMutex;
    std::condition_variable tasksCV;
    int tasksCompleted = 0;

    for (int i = 0; i < numWorkers; ++i) {
        pool.Enqueue([&, i]() {
            Summary localSummary = ProcessPile(votePiles[i]);
            {
                std::lock_guard<std::mutex> lk(summaryMutex);
                results.emplace_back(localSummary, i);
            }
            {
                std::lock_guard<std::mutex> lk(tasksMutex);
                tasksCompleted++;
            }
            tasksCV.notify_one();
        });
    }

    // Wait until every worker has published its summary.
    {
        std::unique_lock<std::mutex> lk(tasksMutex);
        tasksCV.wait(lk, [&] { return tasksCompleted == numWorkers; });
    }

    // Restore worker order, since tasks may finish out of order.
    std::sort(results.begin(), results.end(),
              [](const auto& a, const auto& b) { return a.second < b.second; });
    for (int i = 0; i < numWorkers; ++i) {
        workerSummaries[i] = results[i].first;
        std::cout << "Votes from worker " << i << ": {";
        for (auto const& [key, val] : workerSummaries[i]) {
            std::cout << key << " : " << val << ", ";
        }
        std::cout << "}\n";
    }

    // Reduce step: merge the per-worker summaries.
    Summary totSummary;
    for (const auto& workerSummary : workerSummaries) {
        for (const auto& [key, val] : workerSummary) {
            totSummary[key] += val;
        }
    }
    std::cout << "Total number of votes: {";
    for (auto const& [key, val] : totSummary) {
        std::cout << key << ":" << val << ", ";
    }
    std::cout << "}" << std::endl;
    return totSummary;
}

int main() {
    int numCandidates = 3;
    int numVoters = 1000000;
    std::vector<int> pile(numVoters);
    std::random_device randDevice;
    std::mt19937 gen(randDevice());
    std::uniform_int_distribution<> uniformDist(1, numCandidates);
    std::generate(pile.begin(), pile.end(), [&]() { return uniformDist(gen); });
    Summary counts = VoteUp(pile);
    return 0;
}
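In Python we don't need to hand-roll the pool: the standard library already provides one via concurrent.futures.ThreadPoolExecutor. Below is a minimal sketch of the same vote count with it; the chunking mirrors VoteUp above, and since counting is CPU-bound, swapping in ProcessPoolExecutor would be the way to get real parallelism past the GIL.

import random
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def process_pile(pile):
    # Map step: count the votes in one chunk.
    return Counter(pile)


def vote_up(pile, num_workers=4):
    # Split the pile into one chunk per worker; the last chunk
    # absorbs the remainder, as in the C++ version.
    chunk = len(pile) // num_workers
    piles = []
    for i in range(num_workers):
        end = len(pile) if i == num_workers - 1 else (i + 1) * chunk
        piles.append(pile[i * chunk:end])

    total = Counter()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # pool.map preserves input order, so no manual sorting is needed.
        for summary in pool.map(process_pile, piles):
            total.update(summary)  # reduce step
    return total


if __name__ == "__main__":
    pile = [random.randint(1, 3) for _ in range(1_000_000)]
    print(vote_up(pile))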