Skip to content

Commit deb676a

Browse files
authored
Merge pull request #130 from GraphStreamingProject/parallel-dsu
Make parallel DSU work and benchmark it
2 parents 57bbf53 + 79c1814 commit deb676a

File tree

5 files changed

+215
-20
lines changed

5 files changed

+215
-20
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -128,6 +128,7 @@ if (BUILD_EXE)
128128
test/graph_test.cpp
129129
test/sketch_test.cpp
130130
test/supernode_test.cpp
131+
test/dsu_test.cpp
131132
test/util_test.cpp
132133
test/util/file_graph_verifier.cpp
133134
test/util/graph_gen.cpp

include/dsu.h

Lines changed: 78 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
#pragma once
22
#include <vector>
33
#include <atomic>
4+
#include <cassert>
45

56
template<class T>
67
struct DSUMergeRet {
@@ -11,16 +12,44 @@ struct DSUMergeRet {
1112

1213
template <class T>
1314
class DisjointSetUnion {
14-
std::vector<T> parent;
15-
std::vector<T> size;
15+
// number of items in the DSU
16+
T n;
17+
18+
// parent and size arrays
19+
T* parent;
20+
T* size;
1621
public:
17-
DisjointSetUnion(T n) : parent(n), size(n, 1) {
22+
// Build a DSU over n items. Both arrays are heap-allocated with n entries
// and every item starts out as its own parent with a recorded size of 1.
DisjointSetUnion(T n) : n(n), parent(new T[n]), size(new T[n]) {
  T idx = 0;
  while (idx < n) {
    parent[idx] = idx;
    size[idx] = 1;
    ++idx;
  }
}
28+
29+
// Release both backing arrays. A moved-from object holds nullptr in both
// members, and delete[] on nullptr does nothing.
~DisjointSetUnion() {
  delete[] size;
  delete[] parent;
}
33+
34+
// make a copy of the DSU
35+
DisjointSetUnion(const DisjointSetUnion &oth) : n(oth.n), parent(new T[n]), size(new T[n]) {
36+
for (T i = 0; i < n; i++) {
37+
parent[i] = oth.parent[i];
38+
size[i] = oth.size[i];
2039
}
2140
}
2241

42+
// move the DSU to a new object
43+
// Move constructor: take over oth's arrays instead of copying them.
// oth is left empty (n == 0, null arrays) so its destructor frees nothing.
// Declared noexcept because it only copies and resets scalars/pointers; this
// lets standard containers move elements on reallocation rather than falling
// back to the deep-copying copy constructor.
DisjointSetUnion(DisjointSetUnion &&oth) noexcept
    : n(oth.n), parent(oth.parent), size(oth.size) {
  oth.n = 0;
  oth.parent = nullptr;
  oth.size = nullptr;
}
48+
49+
// Copy assignment is disabled; declared with the conventional reference
// return type so the signature reads as a normal assignment operator.
DisjointSetUnion& operator=(const DisjointSetUnion &oth) = delete;
50+
2351
inline T find_root(T u) {
52+
assert(0 <= u && u < n);
2453
while(parent[parent[u]] != u) {
2554
parent[u] = parent[parent[u]];
2655
u = parent[u];
@@ -31,6 +60,8 @@ class DisjointSetUnion {
3160
inline DSUMergeRet<T> merge(T u, T v) {
3261
T a = find_root(u);
3362
T b = find_root(v);
63+
assert(0 <= a && a < n);
64+
assert(0 <= b && b < n);
3465
if (a == b) return {false, 0, 0};
3566

3667
if (size[a] < size[b]) std::swap(a,b);
@@ -40,7 +71,7 @@ class DisjointSetUnion {
4071
}
4172

4273
inline void reset() {
43-
for (T i = 0; i < parent.size(); i++) {
74+
for (T i = 0; i < n; i++) {
4475
parent[i] = i;
4576
size[i] = 1;
4677
}
@@ -50,41 +81,70 @@ class DisjointSetUnion {
5081
// Disjoint set union that uses atomics to be thread safe
5182
// and is therefore a little slower in single-threaded use cases
5283
template <class T>
53-
class MT_DisjoinSetUnion {
84+
class DisjointSetUnion_MT {
5485
private:
55-
std::vector<std::atomic<T>> parent;
56-
std::vector<T> size;
86+
// number of items in the DSU
87+
T n;
88+
89+
// parent and size arrays
90+
std::atomic<T>* parent;
91+
std::atomic<T>* size;
5792
public:
58-
MT_DisjoinSetUnion(T n) : parent(n), size(n, 1) {
59-
for (T i = 0; i < n; i++)
93+
// Build a DSU over n items backed by two arrays of n atomics each.
// Every item starts out as its own parent with a recorded size of 1.
DisjointSetUnion_MT(T n) : n(n), parent(new std::atomic<T>[n]), size(new std::atomic<T>[n]) {
  for (T idx = 0; idx < n; ++idx) {
    parent[idx].store(idx);
    size[idx].store(1);
  }
}
99+
100+
// Release both atomic arrays. A moved-from object holds nullptr in both
// members, and delete[] on nullptr does nothing.
~DisjointSetUnion_MT() {
  delete[] size;
  delete[] parent;
}
104+
105+
// make a copy of the DSU
106+
DisjointSetUnion_MT(const DisjointSetUnion_MT &oth) : n(oth.n), parent(new T[n]), size(new T[n]) {
107+
for (T i = 0; i < n; i++) {
108+
parent[i] = oth.parent[i].load();
109+
size[i] = oth.size[i].load();
110+
}
111+
}
112+
113+
// move the DSU to a new object
114+
// Move constructor: take over oth's atomic arrays instead of copying them.
// oth is left empty (n == 0, null arrays) so its destructor frees nothing.
// Declared noexcept because it only copies and resets scalars/pointers; this
// lets standard containers move elements on reallocation rather than falling
// back to the deep-copying copy constructor.
DisjointSetUnion_MT(DisjointSetUnion_MT &&oth) noexcept
    : n(oth.n), parent(oth.parent), size(oth.size) {
  oth.n = 0;
  oth.parent = nullptr;
  oth.size = nullptr;
}
62119

63120
// Follow parent links starting at u and return the element r at which
// parent[parent[r]] == r, shortening the visited path along the way.
// u must satisfy 0 <= u < n; this is checked only by assert.
// NOTE(review): the parent updates are plain atomic stores rather than CAS;
// presumably this is safe next to concurrent merge() calls -- confirm.
inline T find_root(T u) {
121+
assert(0 <= u && u < n);
64122
// each pass re-points u at its current grandparent, then steps to that new parent
while (parent[parent[u]] != u) {
65-
parent[u] = parent[parent[u]];
123+
parent[u] = parent[parent[u]].load();
66124
u = parent[u];
67125
}
68126
return u;
69127
}
70128

71129
// use CAS in this function to allow for simultaneous merge calls
72-
inline DSUMergeRet<T> merge(T u, T v) {
73-
while ((u = find_root(u)) != (v = find_root(v))) {
74-
if (size[u] < size[v])
75-
std::swap(u, v);
130+
// Union the sets containing a and b.
// Each pass of the loop re-resolves both arguments to their current roots.
// If the roots are equal the loop exits and {false, 0, 0} is returned.
// Otherwise the root with the smaller recorded size becomes b and a CAS tries
// to hang it under a; on success the sizes are combined and {true, a, b} is
// returned, with a the kept root and b the root attached under it.
// (DSUMergeRet's field names are not visible here; values are listed in
// initializer order.)
inline DSUMergeRet<T> merge(T a, T b) {
131+
while ((a = find_root(a)) != (b = find_root(b))) {
132+
assert(0 <= a && a < n);
133+
assert(0 <= b && b < n);
134+
// keep the root whose recorded size is not smaller in a
if (size[a] < size[b])
135+
std::swap(a, b);
76136

77137
// if parent of b has not been modified by another thread -> replace with a
78-
if (std::atomic_compare_exchange_weak(&parent[u], &v, u)) {
79-
size[u] += size[v];
80-
return {true, u, v};
138+
// The CAS succeeds only while parent[b] still equals b. On failure
// compare_exchange_weak overwrites b with the value it observed (and it may
// also fail spuriously); either way the loop re-finds both roots and retries.
if (parent[b].compare_exchange_weak(b, a)) {
139+
size[a] += size[b];
140+
return {true, a, b};
81141
}
82142
}
83143
return {false, 0, 0};
84144
}
85145

86146
inline void reset() {
87-
for (T i = 0; i < parent.size(); i++) {
147+
for (T i = 0; i < n; i++) {
88148
parent[i] = i;
89149
size[i] = 1;
90150
}

src/supernode.cpp

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -114,9 +114,12 @@ void Supernode::merge(Supernode &other) {
114114
}
115115

116116
void Supernode::range_merge(Supernode& other, size_t start_idx, size_t num_merge) {
117+
// For simplicity we only perform basic error checking here.
118+
// It's up to the caller to ensure they aren't accessing out of
119+
// range for a Supernode valid only in a subset of this range.
120+
if (start_idx >= Supernode::max_sketches) throw OutOfQueriesException();
121+
117122
sample_idx = std::max(sample_idx, other.sample_idx);
118-
// we trust the caller so whatever they tell us goes here
119-
// hopefully if the caller is incorrect then this will be caught by out_of_queries()
120123
merged_sketches = start_idx + num_merge;
121124
for (size_t i = sample_idx; i < merged_sketches; i++)
122125
(*get_sketch(i))+=(*other.get_sketch(i));

test/dsu_test.cpp

Lines changed: 61 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,61 @@
1+
2+
#include <gtest/gtest.h>
3+
#include "types.h"
4+
#include "dsu.h"
5+
6+
// must be power of 2
7+
constexpr size_t slots_power = 16;
8+
constexpr size_t num_slots = 1 << slots_power;
9+
10+
// Fold every slot of a serial DSU into a single set, then verify that all
// slots resolve to the same root as slot 0.
TEST(DSU_Tests, SerialDSU) {
  DisjointSetUnion<node_id_t> dsu(num_slots);

  // each round joins the upper half of the active range onto the lower half,
  // halving the active range until only one set remains
  node_id_t level = slots_power;
  while (level > 0) {
    const size_t active = 1 << level;
    const size_t half = active / 2;
    for (node_id_t lo = 0; lo < half; ++lo) {
      dsu.merge(lo, lo + half);
    }
    --level;
  }

  const node_id_t root = dsu.find_root(0);
  node_id_t slot = 1;
  while (slot < num_slots) {
    ASSERT_EQ(root, dsu.find_root(slot));
    ++slot;
  }
}
26+
27+
// Same full-merge scenario as the serial test, but driving the atomic-based
// DisjointSetUnion_MT from a single thread.
TEST(DSU_Tests, DSU_MT_Single_Thread) {
  DisjointSetUnion_MT<node_id_t> dsu(num_slots);

  // each round joins the upper half of the active range onto the lower half,
  // halving the active range until only one set remains
  node_id_t level = slots_power;
  while (level > 0) {
    const size_t active = 1 << level;
    const size_t half = active / 2;
    for (node_id_t lo = 0; lo < half; ++lo) {
      dsu.merge(lo, lo + half);
    }
    --level;
  }

  const node_id_t root = dsu.find_root(0);
  node_id_t slot = 1;
  while (slot < num_slots) {
    ASSERT_EQ(root, dsu.find_root(slot));
    ++slot;
  }
}
43+
44+
// Full-merge scenario for DisjointSetUnion_MT with the merge rounds spread
// over OpenMP threads; afterwards every slot must resolve to slot 0's root.
TEST(DSU_Tests, DSU_MT_Eight_Threads) {
45+
DisjointSetUnion_MT<node_id_t> dsu(num_slots);
46+
47+
// merge fully
48+
// The pragma applies to the outer loop over p, so whole rounds (not the
// individual merges inside a round) are what get distributed across threads.
// NOTE(review): this only runs in parallel if the build enables OpenMP -- confirm.
#pragma omp parallel for num_threads(8)
49+
for (node_id_t p = slots_power; p > 0; p--) {
50+
size_t active = 1 << p;
51+
// join slot i with the slot half an active range above it
for (node_id_t i = 0; i < active / 2; i++) {
52+
dsu.merge(i, i + active / 2);
53+
}
54+
}
55+
56+
// verification happens after the parallel loop has finished
node_id_t root = dsu.find_root(0);
57+
for (node_id_t i = 1; i < num_slots; i++) {
58+
ASSERT_EQ(root, dsu.find_root(i));
59+
}
60+
}
61+

tools/benchmark/graphcc_bench.cpp

Lines changed: 70 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -365,4 +365,74 @@ static void BM_DSU_Root(benchmark::State& state) {
365365
}
366366
BENCHMARK(BM_DSU_Root);
367367

368+
// Benchmark the efficiency of parallel DSU merges
369+
// when the sequence of DSU merges is adversarial.
370+
// This means we avoid joining roots wherever possible.
371+
// state.range(0) supplies the OpenMP thread count for the merge loop.
static void BM_Parallel_DSU_Adversarial(benchmark::State& state) {
372+
// NOTE(review): MB is defined outside this view; its value is not visible here.
constexpr size_t size_of_dsu = 16 * MB;
373+
374+
// default-constructed engine, so no explicit seed is supplied
auto rng = std::default_random_engine{};
375+
376+
std::vector<std::pair<node_id_t, node_id_t>> updates;
377+
// generate updates: one batch per level, for as long as (2 << iter) <= size_of_dsu
378+
for (size_t iter = 0; ((size_t)2 << iter) <= size_of_dsu; iter++) {
379+
// NOTE(review): both shifts below are evaluated in int before widening to
// size_t -- confirm iter stays small enough for that.
size_t loc_size = 1 << iter;
380+
size_t jump = 2 << iter;
381+
std::vector<std::pair<node_id_t, node_id_t>> new_updates;
382+
// for each block of `jump` items, pair the item at offset loc_size - 1 with
// the item jump / 2 positions after it
for (size_t i = 0; i < size_of_dsu; i += jump) {
383+
new_updates.push_back({i + loc_size - 1, i + loc_size - 1 + jump / 2});
384+
}
385+
// shuffle within the level only; levels are appended in increasing order
std::shuffle(new_updates.begin(), new_updates.end(), rng);
386+
updates.insert(updates.end(), new_updates.begin(), new_updates.end());
387+
}
388+
389+
// Perform merge test
390+
// a fresh DSU is constructed inside the benchmark loop on every iteration
for (auto _ : state) {
391+
DisjointSetUnion_MT<node_id_t> dsu(size_of_dsu);
392+
#pragma omp parallel for num_threads(state.range(0))
393+
for (auto upd : updates) {
394+
dsu.merge(upd.first, upd.second);
395+
}
396+
}
397+
// counter built from the total number of merges issued, flagged as an inverted rate
state.counters["Merge_Latency"] =
398+
benchmark::Counter(state.iterations() * updates.size(),
399+
benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
400+
}
401+
BENCHMARK(BM_Parallel_DSU_Adversarial)->RangeMultiplier(2)->Range(1, 8)->UseRealTime();
402+
403+
// Benchmark the efficiency of parallel DSU merges
404+
// when the sequence of DSU merges is helpful.
405+
// This means we only join roots.
406+
// state.range(0) supplies the OpenMP thread count for the merge loop.
static void BM_Parallel_DSU_Root(benchmark::State& state) {
407+
// NOTE(review): MB is defined outside this view; its value is not visible here.
constexpr size_t size_of_dsu = 16 * MB;
408+
409+
// default-constructed engine, so no explicit seed is supplied
auto rng = std::default_random_engine{};
410+
411+
// full list of merge pairs, filled level by level below
412+
std::vector<std::pair<node_id_t, node_id_t>> updates;
413+
// generate updates: one batch per level, for as long as (2 << iter) <= size_of_dsu
414+
for (size_t iter = 0; ((size_t)2 << iter) <= size_of_dsu; iter++) {
415+
// NOTE(review): the shift is evaluated in int before widening to size_t --
// confirm iter stays small enough for that.
size_t jump = 2 << iter;
416+
std::vector<std::pair<node_id_t, node_id_t>> new_updates;
417+
// pair the first item of each block of `jump` items with the item jump / 2 after it
for (size_t i = 0; i < size_of_dsu; i += jump) {
418+
new_updates.push_back({i, i + jump / 2});
419+
}
420+
// shuffle within the level only; levels are appended in increasing order
std::shuffle(new_updates.begin(), new_updates.end(), rng);
421+
updates.insert(updates.end(), new_updates.begin(), new_updates.end());
422+
}
423+
424+
// Perform merge test
425+
// a fresh DSU is constructed inside the benchmark loop on every iteration
for (auto _ : state) {
426+
DisjointSetUnion_MT<node_id_t> dsu(size_of_dsu);
427+
#pragma omp parallel for num_threads(state.range(0))
428+
for (auto upd : updates) {
429+
dsu.merge(upd.first, upd.second);
430+
}
431+
}
432+
// counter built from the total number of merges issued, flagged as an inverted rate
state.counters["Merge_Latency"] =
433+
benchmark::Counter(state.iterations() * updates.size(),
434+
benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
435+
}
436+
BENCHMARK(BM_Parallel_DSU_Root)->RangeMultiplier(2)->Range(1, 8)->UseRealTime();
437+
368438
BENCHMARK_MAIN();

0 commit comments

Comments
 (0)