From d96548e6880b422cd642bb0dc14eb6f90990b84a Mon Sep 17 00:00:00 2001 From: Evan West Date: Sat, 17 Jun 2023 18:56:08 -0400 Subject: [PATCH 1/7] make parallel dsu work and benchmark --- include/dsu.h | 42 ++++++++++++++----- tools/benchmark/graphcc_bench.cpp | 70 +++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 11 deletions(-) diff --git a/include/dsu.h b/include/dsu.h index 2642437b..c010bf4f 100644 --- a/include/dsu.h +++ b/include/dsu.h @@ -11,15 +11,24 @@ struct DSUMergeRet { template class DisjointSetUnion { - std::vector parent; - std::vector size; + // number of items in the DSU + T n; + + // parent and size arrays + T* parent; + T* size; public: - DisjointSetUnion(T n) : parent(n), size(n, 1) { + DisjointSetUnion(T n) : n(n), parent(new T[n]), size(new T[n]) { for (T i = 0; i < n; i++) { parent[i] = i; } } + ~DisjointSetUnion() { + delete[] parent; + delete[] size; + } + inline T find_root(T u) { while(parent[parent[u]] != u) { parent[u] = parent[parent[u]]; @@ -40,7 +49,7 @@ class DisjointSetUnion { } inline void reset() { - for (T i = 0; i < parent.size(); i++) { + for (T i = 0; i < n; i++) { parent[i] = i; size[i] = 1; } @@ -50,19 +59,30 @@ class DisjointSetUnion { // Disjoint set union that uses atomics to be thread safe // thus is a little slower for single threaded use cases template -class MT_DisjoinSetUnion { +class DisjointSetUnion_MT { private: - std::vector> parent; - std::vector size; + // number of items in the DSU + T n; + + // parent and size arrays + std::atomic* parent; + std::atomic* size; public: - MT_DisjoinSetUnion(T n) : parent(n), size(n, 1) { - for (T i = 0; i < n; i++) + DisjointSetUnion_MT(T n) : n(n), parent(new std::atomic[n]), size(new std::atomic[n]) { + for (T i = 0; i < n; i++) { parent[i] = i; + size[i] = 1; + } + } + + ~DisjointSetUnion_MT() { + delete[] parent; + delete[] size; } inline T find_root(T u) { while (parent[parent[u]] != u) { - parent[u] = parent[parent[u]]; + parent[u] = parent[parent[u]].load(); u = parent[u]; } return u; @@ -84,7 +104,7 @@ class MT_DisjoinSetUnion { } inline void reset() { - for (T i = 0; i < parent.size(); i++) { + for (T i = 0; i < n; i++) { parent[i] = i; size[i] = 1; } diff --git a/tools/benchmark/graphcc_bench.cpp b/tools/benchmark/graphcc_bench.cpp index d01f9711..29a21370 100644 --- a/tools/benchmark/graphcc_bench.cpp +++ b/tools/benchmark/graphcc_bench.cpp @@ -365,4 +365,74 @@ static void BM_DSU_Root(benchmark::State& state) { } BENCHMARK(BM_DSU_Root); +// Benchmark the efficiency of parallel DSU merges +// when the sequence of DSU merges is adversarial +// This means we avoid joining roots wherever possible +static void BM_Parallel_DSU_Adversarial(benchmark::State& state) { + constexpr size_t size_of_dsu = 16 * MB; + + auto rng = std::default_random_engine{}; + + std::vector> updates; + // generate updates + for (size_t iter = 0; ((size_t)2 << iter) <= size_of_dsu; iter++) { + size_t loc_size = 1 << iter; + size_t jump = 2 << iter; + std::vector> new_updates; + for (size_t i = 0; i < size_of_dsu; i += jump) { + new_updates.push_back({i + loc_size - 1, i + loc_size - 1 + jump / 2}); + } + std::shuffle(new_updates.begin(), new_updates.end(), rng); + updates.insert(updates.end(), new_updates.begin(), new_updates.end()); + } + + // Perform merge test + for (auto _ : state) { + DisjointSetUnion_MT dsu(size_of_dsu); +#pragma omp parallel for num_threads(state.range(0)) + for (auto upd : updates) { + dsu.merge(upd.first, upd.second); + } + } + state.counters["Merge_Latency"] = + benchmark::Counter(state.iterations() * updates.size(), + benchmark::Counter::kIsRate | benchmark::Counter::kInvert); +} +BENCHMARK(BM_Parallel_DSU_Adversarial)->RangeMultiplier(2)->Range(1, 8)->UseRealTime(); + +// Benchmark the efficiency of parallel DSU merges +// when the sequence of DSU merges is helpful +// this means we only join roots +static void BM_Parallel_DSU_Root(benchmark::State& state) { + constexpr size_t size_of_dsu = 16 * MB; + + auto rng = std::default_random_engine{}; + + // generate updates + std::vector> updates; + // generate updates + for (size_t iter = 0; ((size_t)2 << iter) <= size_of_dsu; iter++) { + size_t jump = 2 << iter; + std::vector> new_updates; + for (size_t i = 0; i < size_of_dsu; i += jump) { + new_updates.push_back({i, i + jump / 2}); + } + std::shuffle(new_updates.begin(), new_updates.end(), rng); + updates.insert(updates.end(), new_updates.begin(), new_updates.end()); + } + + // Perform merge test + for (auto _ : state) { + DisjointSetUnion_MT dsu(size_of_dsu); +#pragma omp parallel for num_threads(state.range(0)) + for (auto upd : updates) { + dsu.merge(upd.first, upd.second); + } + } + state.counters["Merge_Latency"] = + benchmark::Counter(state.iterations() * updates.size(), + benchmark::Counter::kIsRate | benchmark::Counter::kInvert); +} +BENCHMARK(BM_Parallel_DSU_Root)->RangeMultiplier(2)->Range(1, 8)->UseRealTime(); + BENCHMARK_MAIN(); From 6a8ee289faad86d629cfa1cd2e0475171658d63a Mon Sep 17 00:00:00 2001 From: Evan West Date: Sat, 17 Jun 2023 23:52:35 -0400 Subject: [PATCH 2/7] fix bug when copying a DSU --- include/dsu.h | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/include/dsu.h b/include/dsu.h index c010bf4f..55e46f3f 100644 --- a/include/dsu.h +++ b/include/dsu.h @@ -21,6 +21,7 @@ class DisjointSetUnion { DisjointSetUnion(T n) : n(n), parent(new T[n]), size(new T[n]) { for (T i = 0; i < n; i++) { parent[i] = i; + size[i] = 1; } } @@ -29,6 +30,16 @@ class DisjointSetUnion { delete[] size; } + // make a copy of the DSU + DisjointSetUnion(const DisjointSetUnion &oth) : n(oth.n), parent(new T[n]), size(new T[n]) { + for (T i = 0; i < n; i++) { + parent[i] = oth.parent[i]; + size[i] = oth.size[i]; + } + } + + DisjointSetUnion operator=(const DisjointSetUnion &oth) = delete; + inline T find_root(T u) { while(parent[parent[u]] != u) { parent[u] = parent[parent[u]]; @@ -80,6 +91,14 @@ class DisjointSetUnion_MT { delete[] size; } + // make a copy of the DSU + DisjointSetUnion_MT(const DisjointSetUnion_MT &oth) : n(oth.n), parent(new T[n]), size(new T[n]) { + for (T i = 0; i < n; i++) { + parent[i] = oth.parent[i].load(); + size[i] = oth.size[i].load(); + } + } + inline T find_root(T u) { while (parent[parent[u]] != u) { parent[u] = parent[parent[u]].load(); From 92341ce38cfc2b35cc5524a1fad9740a7d6bfc16 Mon Sep 17 00:00:00 2001 From: Evan West Date: Sun, 18 Jun 2023 09:02:30 -0400 Subject: [PATCH 3/7] add asserts to DSU --- include/dsu.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/include/dsu.h b/include/dsu.h index 55e46f3f..6e9ba327 100644 --- a/include/dsu.h +++ b/include/dsu.h @@ -1,6 +1,7 @@ #pragma once #include #include +#include template struct DSUMergeRet { @@ -41,6 +42,7 @@ class DisjointSetUnion { DisjointSetUnion operator=(const DisjointSetUnion &oth) = delete; inline T find_root(T u) { + assert(0 <= u && u < n); while(parent[parent[u]] != u) { parent[u] = parent[parent[u]]; u = parent[u]; @@ -51,6 +53,8 @@ class DisjointSetUnion { inline DSUMergeRet merge(T u, T v) { T a = find_root(u); T b = find_root(v); + assert(0 <= a && a < n); + assert(0 <= b && b < n); if (a == b) return {false, 0, 0}; if (size[a] < size[b]) std::swap(a,b); @@ -100,6 +104,7 @@ class DisjointSetUnion_MT { } inline T find_root(T u) { + assert(0 <= u && u < n); while (parent[parent[u]] != u) { parent[u] = parent[parent[u]].load(); u = parent[u]; @@ -110,6 +115,8 @@ class DisjointSetUnion_MT { // use CAS in this function to allow for simultaneous merge calls inline DSUMergeRet merge(T u, T v) { while ((u = find_root(u)) != (v = find_root(v))) { + assert(0 <= u && u < n); + assert(0 <= v && v < n); if (size[u] < size[v]) std::swap(u, v); From a941b4d6f44add8a92776e9aa9c1b476f403c05b Mon Sep 17 00:00:00 2001 From: Evan West Date: Sun, 18 Jun 2023 10:02:54 -0400 Subject: [PATCH 4/7] add move constructor to dsu --- include/dsu.h | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/include/dsu.h b/include/dsu.h index 6e9ba327..11aee7da 100644 --- a/include/dsu.h +++ b/include/dsu.h @@ -39,6 +39,13 @@ class DisjointSetUnion { } } + // move the DSU to a new object + DisjointSetUnion(DisjointSetUnion &&oth) : n(oth.n), parent(oth.parent), size(oth.size) { + oth.n = 0; + oth.parent = nullptr; + oth.size = nullptr; + } + DisjointSetUnion operator=(const DisjointSetUnion &oth) = delete; inline T find_root(T u) { @@ -103,6 +110,13 @@ class DisjointSetUnion_MT { } } + // move the DSU to a new object + DisjointSetUnion_MT(DisjointSetUnion_MT &&oth) : n(oth.n), parent(oth.parent), size(oth.size) { + oth.n = 0; + oth.parent = nullptr; + oth.size = nullptr; + } + inline T find_root(T u) { assert(0 <= u && u < n); while (parent[parent[u]] != u) { From 12dbb429f6c0d26f267cdf08e8da357c06851f66 Mon Sep 17 00:00:00 2001 From: Evan West Date: Sun, 18 Jun 2023 14:40:46 -0400 Subject: [PATCH 5/7] another bug fix and tests for DSU --- CMakeLists.txt | 1 + include/dsu.h | 2 +- test/dsu_test.cpp | 72 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 test/dsu_test.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 5603cc09..1552e43c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -128,6 +128,7 @@ if (BUILD_EXE) test/graph_test.cpp test/sketch_test.cpp test/supernode_test.cpp + test/dsu_test.cpp test/util_test.cpp test/util/file_graph_verifier.cpp test/util/graph_gen.cpp diff --git a/include/dsu.h b/include/dsu.h index 11aee7da..bb443b50 100644 --- a/include/dsu.h +++ b/include/dsu.h @@ -135,7 +135,7 @@ class DisjointSetUnion_MT { std::swap(u, v); // if parent of b has not been modified by another thread -> replace with a - if (std::atomic_compare_exchange_weak(&parent[u], &v, u)) { + if (parent[v].compare_exchange_weak(v, u)) { size[u] += size[v]; return {true, u, v}; } diff --git a/test/dsu_test.cpp b/test/dsu_test.cpp new file mode 100644 index 00000000..80745776 --- /dev/null +++ b/test/dsu_test.cpp @@ -0,0 +1,72 @@ + +#include +#include "types.h" +#include "dsu.h" + +// must be power of 2 +constexpr size_t num_slots = 1024; + +TEST(DSU_Tests, SerialDSU) { + DisjointSetUnion dsu(num_slots); + + // merge fully + size_t active = num_slots; + while (active > 1) { + for (node_id_t i = 0; i < active / 2; i++) { + dsu.merge(i, i + active / 2); + } + for (node_id_t i = 0; i < active / 2; i++) { + ASSERT_EQ(dsu.find_root(i), dsu.find_root(i + active / 2)); + } + active /= 2; + } + + node_id_t root = dsu.find_root(0); + for (node_id_t i = 1; i < num_slots; i++) { + ASSERT_EQ(root, dsu.find_root(i)); + } +} + +TEST(DSU_Tests, DSU_MT_Single_Thread) { + DisjointSetUnion_MT dsu(num_slots); + + // merge fully + size_t active = num_slots; + while (active > 1) { + for (node_id_t i = 0; i < active / 2; i++) { + dsu.merge(i, i + active / 2); + } + for (node_id_t i = 0; i < active / 2; i++) { + ASSERT_EQ(dsu.find_root(i), dsu.find_root(i + active / 2)); + } + active /= 2; + } + + node_id_t root = dsu.find_root(0); + for (node_id_t i = 1; i < num_slots; i++) { + ASSERT_EQ(root, dsu.find_root(i)); + } +} + +TEST(DSU_Tests, DSU_MT_Eight_Threads) { + DisjointSetUnion_MT dsu(num_slots); + + // merge fully + size_t active = num_slots; + while (active > 1) { +#pragma omp parallel for num_threads(8) + for (node_id_t i = 0; i < active / 2; i++) { + dsu.merge(i, i + active / 2); + } + for (node_id_t i = 0; i < active / 2; i++) { + ASSERT_EQ(dsu.find_root(i), dsu.find_root(i + active / 2)); + } + active /= 2; + } + + node_id_t root = dsu.find_root(0); + for (node_id_t i = 1; i < num_slots; i++) { + ASSERT_EQ(root, dsu.find_root(i)); + } +} + From fad54222d29f8976ef3f93f584bb3cc07850da3f Mon Sep 17 00:00:00 2001 From: Evan West Date: Mon, 19 Jun 2023 09:07:05 -0400 Subject: [PATCH 6/7] error checking in range_merge --- src/supernode.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/supernode.cpp b/src/supernode.cpp index 13e96fb4..8d473b96 100644 --- a/src/supernode.cpp +++ b/src/supernode.cpp @@ -114,9 +114,12 @@ void Supernode::merge(Supernode &other) { } void Supernode::range_merge(Supernode& other, size_t start_idx, size_t num_merge) { + // For simplicity we only perform basic error checking here. + // It's up to the caller to ensure they aren't accessing out of + // range for a Supernode valid only in a subset of this range. + if (start_idx >= Supernode::max_sketches) throw OutOfQueriesException(); + sample_idx = std::max(sample_idx, other.sample_idx); - // we trust the caller so whatever they tell us goes here - // hopefully if the caller is incorrect then this will be caught by out_of_queries() merged_sketches = start_idx + num_merge; for (size_t i = sample_idx; i < merged_sketches; i++) (*get_sketch(i))+=(*other.get_sketch(i)); From 79c18147d8ef221d6c93e6787543ca2e7e36e30f Mon Sep 17 00:00:00 2001 From: Evan West Date: Tue, 20 Jun 2023 16:11:32 -0400 Subject: [PATCH 7/7] small adjustments to tests and variable names --- include/dsu.h | 18 +++++++++--------- test/dsu_test.cpp | 27 ++++++++------------------- 2 files changed, 17 insertions(+), 28 deletions(-) diff --git a/include/dsu.h b/include/dsu.h index bb443b50..d59c0b12 100644 --- a/include/dsu.h +++ b/include/dsu.h @@ -127,17 +127,17 @@ class DisjointSetUnion_MT { } // use CAS in this function to allow for simultaneous merge calls - inline DSUMergeRet merge(T u, T v) { - while ((u = find_root(u)) != (v = find_root(v))) { - assert(0 <= u && u < n); - assert(0 <= v && v < n); - if (size[u] < size[v]) - std::swap(u, v); + inline DSUMergeRet merge(T a, T b) { + while ((a = find_root(a)) != (b = find_root(b))) { + assert(0 <= a && a < n); + assert(0 <= b && b < n); + if (size[a] < size[b]) + std::swap(a, b); // if parent of b has not been modified by another thread -> replace with a - if (parent[v].compare_exchange_weak(v, u)) { - size[u] += size[v]; - return {true, u, v}; + if (parent[b].compare_exchange_weak(b, a)) { + size[a] += size[b]; + return {true, a, b}; } } return {false, 0, 0}; diff --git a/test/dsu_test.cpp b/test/dsu_test.cpp index 80745776..8c0f02f6 100644 --- a/test/dsu_test.cpp +++ b/test/dsu_test.cpp @@ -4,21 +4,18 @@ #include "dsu.h" // must be power of 2 -constexpr size_t num_slots = 1024; +constexpr size_t slots_power = 16; +constexpr size_t num_slots = 1 << slots_power; TEST(DSU_Tests, SerialDSU) { DisjointSetUnion dsu(num_slots); // merge fully - size_t active = num_slots; - while (active > 1) { + for (node_id_t p = slots_power; p > 0; p--) { + size_t active = 1 << p; for (node_id_t i = 0; i < active / 2; i++) { dsu.merge(i, i + active / 2); } - for (node_id_t i = 0; i < active / 2; i++) { - ASSERT_EQ(dsu.find_root(i), dsu.find_root(i + active / 2)); - } - active /= 2; } node_id_t root = dsu.find_root(0); @@ -31,15 +28,11 @@ TEST(DSU_Tests, DSU_MT_Single_Thread) { DisjointSetUnion_MT dsu(num_slots); // merge fully - size_t active = num_slots; - while (active > 1) { + for (node_id_t p = slots_power; p > 0; p--) { + size_t active = 1 << p; for (node_id_t i = 0; i < active / 2; i++) { dsu.merge(i, i + active / 2); } - for (node_id_t i = 0; i < active / 2; i++) { - ASSERT_EQ(dsu.find_root(i), dsu.find_root(i + active / 2)); - } - active /= 2; } node_id_t root = dsu.find_root(0); @@ -52,16 +45,12 @@ TEST(DSU_Tests, DSU_MT_Eight_Threads) { DisjointSetUnion_MT dsu(num_slots); // merge fully - size_t active = num_slots; - while (active > 1) { #pragma omp parallel for num_threads(8) + for (node_id_t p = slots_power; p > 0; p--) { + size_t active = 1 << p; for (node_id_t i = 0; i < active / 2; i++) { dsu.merge(i, i + active / 2); } - for (node_id_t i = 0; i < active / 2; i++) { - ASSERT_EQ(dsu.find_root(i), dsu.find_root(i + active / 2)); - } - active /= 2; } node_id_t root = dsu.find_root(0);