Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ if (BUILD_EXE)
test/graph_test.cpp
test/sketch_test.cpp
test/supernode_test.cpp
test/dsu_test.cpp
test/util_test.cpp
test/util/file_graph_verifier.cpp
test/util/graph_gen.cpp
Expand Down
96 changes: 78 additions & 18 deletions include/dsu.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <vector>
#include <atomic>
#include <cassert>

template<class T>
struct DSUMergeRet {
Expand All @@ -11,16 +12,44 @@ struct DSUMergeRet {

template <class T>
class DisjointSetUnion {
std::vector<T> parent;
std::vector<T> size;
// number of items in the DSU
T n;

// parent and size arrays
T* parent;
T* size;
public:
DisjointSetUnion(T n) : parent(n), size(n, 1) {
DisjointSetUnion(T n) : n(n), parent(new T[n]), size(new T[n]) {
for (T i = 0; i < n; i++) {
parent[i] = i;
size[i] = 1;
}
}

// free both heap arrays held by this DSU
~DisjointSetUnion() {
  delete[] size;
  delete[] parent;
}

// make a copy of the DSU
DisjointSetUnion(const DisjointSetUnion &oth) : n(oth.n), parent(new T[n]), size(new T[n]) {
for (T i = 0; i < n; i++) {
parent[i] = oth.parent[i];
size[i] = oth.size[i];
}
}

// move the DSU to a new object
// Takes over oth's arrays without allocating. Marked noexcept so that
// containers such as std::vector can move elements on reallocation instead
// of falling back to the deep-copy constructor.
// NOTE(review): assumes copying a T cannot throw (T is expected to be an
// integer type) -- confirm if T may ever be a class type.
DisjointSetUnion(DisjointSetUnion &&oth) noexcept
    : n(oth.n), parent(oth.parent), size(oth.size) {
  // leave oth empty: delete[] on nullptr in its destructor is a no-op,
  // so the arrays are released exactly once, by the new owner
  oth.n = 0;
  oth.parent = nullptr;
  oth.size = nullptr;
}

// copy assignment is disabled; declared with the conventional
// reference-returning signature
DisjointSetUnion& operator=(const DisjointSetUnion &oth) = delete;

inline T find_root(T u) {
assert(0 <= u && u < n);
while(parent[parent[u]] != u) {
parent[u] = parent[parent[u]];
u = parent[u];
Expand All @@ -31,6 +60,8 @@ class DisjointSetUnion {
inline DSUMergeRet<T> merge(T u, T v) {
T a = find_root(u);
T b = find_root(v);
assert(0 <= a && a < n);
assert(0 <= b && b < n);
if (a == b) return {false, 0, 0};

if (size[a] < size[b]) std::swap(a,b);
Expand All @@ -40,7 +71,7 @@ class DisjointSetUnion {
}

inline void reset() {
for (T i = 0; i < parent.size(); i++) {
for (T i = 0; i < n; i++) {
parent[i] = i;
size[i] = 1;
}
Expand All @@ -50,41 +81,70 @@ class DisjointSetUnion {
// Disjoint set union that uses atomics to be thread safe,
// and is therefore a little slower for single-threaded use cases
template <class T>
class MT_DisjoinSetUnion {
class DisjointSetUnion_MT {
private:
std::vector<std::atomic<T>> parent;
std::vector<T> size;
// number of items in the DSU
T n;

// parent and size arrays
std::atomic<T>* parent;
std::atomic<T>* size;
public:
MT_DisjoinSetUnion(T n) : parent(n), size(n, 1) {
for (T i = 0; i < n; i++)
DisjointSetUnion_MT(T n) : n(n), parent(new std::atomic<T>[n]), size(new std::atomic<T>[n]) {
for (T i = 0; i < n; i++) {
parent[i] = i;
size[i] = 1;
}
}

// free both heap arrays held by this DSU
~DisjointSetUnion_MT() {
  delete[] size;
  delete[] parent;
}

// make a copy of the DSU
DisjointSetUnion_MT(const DisjointSetUnion_MT &oth) : n(oth.n), parent(new T[n]), size(new T[n]) {
for (T i = 0; i < n; i++) {
parent[i] = oth.parent[i].load();
size[i] = oth.size[i].load();
}
}

// move the DSU to a new object
// Takes over oth's arrays without allocating. Marked noexcept so that
// containers such as std::vector can move elements on reallocation instead
// of falling back to the deep-copy constructor.
// NOTE(review): assumes copying a T cannot throw (T is expected to be an
// integer type) -- confirm if T may ever be a class type.
DisjointSetUnion_MT(DisjointSetUnion_MT &&oth) noexcept
    : n(oth.n), parent(oth.parent), size(oth.size) {
  // leave oth empty: delete[] on nullptr in its destructor is a no-op,
  // so the arrays are released exactly once, by the new owner
  oth.n = 0;
  oth.parent = nullptr;
  oth.size = nullptr;
}

inline T find_root(T u) {
assert(0 <= u && u < n);
while (parent[parent[u]] != u) {
parent[u] = parent[parent[u]];
parent[u] = parent[parent[u]].load();
u = parent[u];
}
return u;
}

// use CAS in this function to allow for simultaneous merge calls
inline DSUMergeRet<T> merge(T u, T v) {
while ((u = find_root(u)) != (v = find_root(v))) {
if (size[u] < size[v])
std::swap(u, v);
inline DSUMergeRet<T> merge(T a, T b) {
while ((a = find_root(a)) != (b = find_root(b))) {
assert(0 <= a && a < n);
assert(0 <= b && b < n);
if (size[a] < size[b])
std::swap(a, b);

// if parent of b has not been modified by another thread -> replace with a
if (std::atomic_compare_exchange_weak(&parent[u], &v, u)) {
size[u] += size[v];
return {true, u, v};
if (parent[b].compare_exchange_weak(b, a)) {
size[a] += size[b];
return {true, a, b};
}
}
return {false, 0, 0};
}

inline void reset() {
for (T i = 0; i < parent.size(); i++) {
for (T i = 0; i < n; i++) {
parent[i] = i;
size[i] = 1;
}
Expand Down
7 changes: 5 additions & 2 deletions src/supernode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,12 @@ void Supernode::merge(Supernode &other) {
}

void Supernode::range_merge(Supernode& other, size_t start_idx, size_t num_merge) {
// For simplicity we only perform basic error checking here.
// It's up to the caller to ensure they aren't accessing out of
// range for a Supernode valid only in a subset of this range.
if (start_idx >= Supernode::max_sketches) throw OutOfQueriesException();

sample_idx = std::max(sample_idx, other.sample_idx);
// we trust the caller so whatever they tell us goes here
// hopefully if the caller is incorrect then this will be caught by out_of_queries()
merged_sketches = start_idx + num_merge;
for (size_t i = sample_idx; i < merged_sketches; i++)
(*get_sketch(i))+=(*other.get_sketch(i));
Expand Down
61 changes: 61 additions & 0 deletions test/dsu_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@

#include <gtest/gtest.h>
#include "types.h"
#include "dsu.h"

// must be power of 2
constexpr size_t slots_power = 16;
constexpr size_t num_slots = 1 << slots_power;

// Single-threaded DSU: merge every slot into one component, then check that
// all slots report the same root as slot 0.
TEST(DSU_Tests, SerialDSU) {
  DisjointSetUnion<node_id_t> dsu(num_slots);

  // At each level (from slots_power down to 1) pair slot lo with slot
  // lo + half, where half is half of the 2^level active slots.
  for (node_id_t level = slots_power; level > 0; --level) {
    const size_t half = static_cast<size_t>(1 << level) / 2;
    node_id_t lo = 0;
    while (lo < half) {
      dsu.merge(lo, lo + half);
      ++lo;
    }
  }

  // every slot must now resolve to the root of slot 0
  const node_id_t root = dsu.find_root(0);
  for (node_id_t i = 1; i < num_slots; i++) {
    ASSERT_EQ(root, dsu.find_root(i));
  }
}

// Thread-safe DSU driven from a single thread: merge every slot into one
// component, then check that all slots report the same root as slot 0.
TEST(DSU_Tests, DSU_MT_Single_Thread) {
  DisjointSetUnion_MT<node_id_t> dsu(num_slots);

  // At each level (from slots_power down to 1) pair slot lo with slot
  // lo + half, where half is half of the 2^level active slots.
  for (node_id_t level = slots_power; level > 0; --level) {
    const size_t half = static_cast<size_t>(1 << level) / 2;
    node_id_t lo = 0;
    while (lo < half) {
      dsu.merge(lo, lo + half);
      ++lo;
    }
  }

  // every slot must now resolve to the root of slot 0
  const node_id_t root = dsu.find_root(0);
  for (node_id_t i = 1; i < num_slots; i++) {
    ASSERT_EQ(root, dsu.find_root(i));
  }
}

// Thread-safe DSU with the merge levels distributed over eight OpenMP
// threads: merge every slot into one component, then check that all slots
// report the same root as slot 0.
TEST(DSU_Tests, DSU_MT_Eight_Threads) {
  DisjointSetUnion_MT<node_id_t> dsu(num_slots);

  // The levels (slots_power down to 1) are split across the threads, so
  // merges belonging to different levels may run concurrently. Within a
  // level, slot lo is paired with slot lo + half.
#pragma omp parallel for num_threads(8)
  for (node_id_t level = slots_power; level > 0; level--) {
    const size_t half = static_cast<size_t>(1 << level) / 2;
    node_id_t lo = 0;
    while (lo < half) {
      dsu.merge(lo, lo + half);
      ++lo;
    }
  }

  // every slot must now resolve to the root of slot 0
  const node_id_t root = dsu.find_root(0);
  for (node_id_t i = 1; i < num_slots; i++) {
    ASSERT_EQ(root, dsu.find_root(i));
  }
}

70 changes: 70 additions & 0 deletions tools/benchmark/graphcc_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,4 +365,74 @@ static void BM_DSU_Root(benchmark::State& state) {
}
BENCHMARK(BM_DSU_Root);

// Benchmark the efficiency of parallel DSU merges
// when the sequence of DSU merges is adversarial.
// This means we avoid joining roots wherever possible.
//
// Updates are generated in rounds. Round `iter` splits the id space into
// windows of 2^(iter+1) ids and, in each window, joins the last id of the
// lower half with the last id of the upper half. Each round is shuffled on
// its own and the rounds are appended in increasing window size.
// state.range(0) is the number of OpenMP threads used for the merges.
static void BM_Parallel_DSU_Adversarial(benchmark::State& state) {
  constexpr size_t size_of_dsu = 16 * MB;

  auto rng = std::default_random_engine{};

  std::vector<std::pair<node_id_t, node_id_t>> updates;
  // the rounds add about size_of_dsu / 2, / 4, ... updates, so the total
  // stays near size_of_dsu; reserving up front avoids repeated regrowth
  updates.reserve(size_of_dsu);

  // generate updates
  for (size_t iter = 0; ((size_t)2 << iter) <= size_of_dsu; iter++) {
    // shift in size_t (not int) so the widths match the loop bound above
    const size_t loc_size = (size_t)1 << iter;
    const size_t jump = (size_t)2 << iter;
    std::vector<std::pair<node_id_t, node_id_t>> new_updates;
    new_updates.reserve(size_of_dsu / jump);
    for (size_t i = 0; i < size_of_dsu; i += jump) {
      new_updates.push_back({i + loc_size - 1, i + loc_size - 1 + jump / 2});
    }
    std::shuffle(new_updates.begin(), new_updates.end(), rng);
    updates.insert(updates.end(), new_updates.begin(), new_updates.end());
  }

  // Perform merge test
  // NOTE(review): the DSU is constructed inside the timed loop, so its
  // allocation and initialization are part of the measured time.
  for (auto _ : state) {
    DisjointSetUnion_MT<node_id_t> dsu(size_of_dsu);
#pragma omp parallel for num_threads(state.range(0))
    for (auto upd : updates) {
      dsu.merge(upd.first, upd.second);
    }
  }
  // rate counter, inverted: reported as time per merge
  state.counters["Merge_Latency"] =
      benchmark::Counter(state.iterations() * updates.size(),
                         benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
}
BENCHMARK(BM_Parallel_DSU_Adversarial)->RangeMultiplier(2)->Range(1, 8)->UseRealTime();

// Benchmark the efficiency of parallel DSU merges
// when the sequence of DSU merges is helpful.
// This means we only join roots.
//
// Updates are generated in rounds. Round `iter` splits the id space into
// windows of 2^(iter+1) ids and, in each window, joins the first id of the
// lower half with the first id of the upper half. Each round is shuffled on
// its own and the rounds are appended in increasing window size.
// state.range(0) is the number of OpenMP threads used for the merges.
static void BM_Parallel_DSU_Root(benchmark::State& state) {
  constexpr size_t size_of_dsu = 16 * MB;

  auto rng = std::default_random_engine{};

  std::vector<std::pair<node_id_t, node_id_t>> updates;
  // the rounds add about size_of_dsu / 2, / 4, ... updates, so the total
  // stays near size_of_dsu; reserving up front avoids repeated regrowth
  updates.reserve(size_of_dsu);

  // generate updates
  for (size_t iter = 0; ((size_t)2 << iter) <= size_of_dsu; iter++) {
    // shift in size_t (not int) so the width matches the loop bound above
    const size_t jump = (size_t)2 << iter;
    std::vector<std::pair<node_id_t, node_id_t>> new_updates;
    new_updates.reserve(size_of_dsu / jump);
    for (size_t i = 0; i < size_of_dsu; i += jump) {
      new_updates.push_back({i, i + jump / 2});
    }
    std::shuffle(new_updates.begin(), new_updates.end(), rng);
    updates.insert(updates.end(), new_updates.begin(), new_updates.end());
  }

  // Perform merge test
  // NOTE(review): the DSU is constructed inside the timed loop, so its
  // allocation and initialization are part of the measured time.
  for (auto _ : state) {
    DisjointSetUnion_MT<node_id_t> dsu(size_of_dsu);
#pragma omp parallel for num_threads(state.range(0))
    for (auto upd : updates) {
      dsu.merge(upd.first, upd.second);
    }
  }
  // rate counter, inverted: reported as time per merge
  state.counters["Merge_Latency"] =
      benchmark::Counter(state.iterations() * updates.size(),
                         benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
}
BENCHMARK(BM_Parallel_DSU_Root)->RangeMultiplier(2)->Range(1, 8)->UseRealTime();

BENCHMARK_MAIN();