Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/cc_alg_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class CCAlgConfiguration {

// getters
std::string get_disk_dir() { return _disk_dir; }
double get_sketch_factor() { return _sketches_factor; }
double get_sketches_factor() { return _sketches_factor; }
double get_batch_factor() { return _batch_factor; }

friend std::ostream& operator<< (std::ostream &out, const CCAlgConfiguration &conf);
Expand Down
9 changes: 5 additions & 4 deletions include/cc_sketch_alg.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ struct alignas(64) GlobalMergeData {
size_t num_merge_needed = -1;
size_t num_merge_done = 0;

GlobalMergeData(node_id_t num_vertices, size_t seed)
GlobalMergeData(node_id_t num_vertices, size_t seed, double sketches_factor)
: sketch(Sketch::calc_vector_length(num_vertices), seed,
Sketch::calc_cc_samples(num_vertices)) {}
Sketch::calc_cc_samples(num_vertices, sketches_factor)) {}

GlobalMergeData(const GlobalMergeData&& other)
: sketch(other.sketch) {
Expand Down Expand Up @@ -155,8 +155,9 @@ class CCSketchAlg {
num_delta_sketches = num_workers;
delta_sketches = new Sketch *[num_delta_sketches];
for (size_t i = 0; i < num_delta_sketches; i++) {
delta_sketches[i] = new Sketch(Sketch::calc_vector_length(num_vertices), seed,
Sketch::calc_cc_samples(num_vertices));
delta_sketches[i] =
new Sketch(Sketch::calc_vector_length(num_vertices), seed,
Sketch::calc_cc_samples(num_vertices, config.get_sketches_factor()));
}
}

Expand Down
8 changes: 4 additions & 4 deletions include/dsu.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ class DisjointSetUnion {

inline T find_root(T u) {
assert(0 <= u && u < n);
while (parent[parent[u]] != u) {
while (parent[parent[u]] != parent[u]) {
parent[u] = parent[parent[u]];
u = parent[u];
}
return u;
return parent[u];
}

inline DSUMergeRet<T> merge(T u, T v) {
Expand Down Expand Up @@ -144,11 +144,11 @@ class DisjointSetUnion_MT {

inline T find_root(T u) {
assert(0 <= u && u < n);
while (parent[parent[u]] != u) {
while (parent[parent[u]] != parent[u]) {
parent[u] = parent[parent[u]].load();
u = parent[u];
}
return u;
return parent[u];
}

// use CAS in this function to allow for simultaneous merge calls
Expand Down
13 changes: 12 additions & 1 deletion include/sketch.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,18 @@ class Sketch {
return ceil(double(num_vertices) * (num_vertices - 1) / 2);
}

/**
* This function computes the number of samples a Sketch should support in order to solve
* connected components. Optionally, can increase or decrease the number of samples by a
* multiplicative factor.
* @param num_vertices Number of graph vertices
* @param f Multiplicative sample factor
* @return The number of samples
*/
static size_t calc_cc_samples(node_id_t num_vertices, double f) {
return ceil(f * log2(num_vertices) / num_samples_div);
}

/**
* Construct a sketch object
* @param vector_len Length of the vector we are sketching
Expand Down Expand Up @@ -167,7 +179,6 @@ class Sketch {
inline size_t get_num_samples() const { return num_samples; }

static size_t calc_bkt_per_col(size_t n) { return ceil(log2(n)) + 1; }
static size_t calc_cc_samples(size_t n) { return ceil(log2(n) / num_samples_div); }

#ifdef L0_SAMPLING
static constexpr size_t default_cols_per_sample = 7;
Expand Down
5 changes: 0 additions & 5 deletions src/cc_alg_configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ CCAlgConfiguration& CCAlgConfiguration::sketches_factor(double factor) {
<< "Defaulting to 1." << std::endl;
_sketches_factor = 1;
}
if (_sketches_factor != 1) {
std::cerr << "WARNING: Your graph configuration specifies using a factor " << _sketches_factor
<< " of the normal quantity of sketches." << std::endl;
std::cerr << " Is this intentional? If not, set sketches_factor to one!" << std::endl;
}
return *this;
}

Expand Down
44 changes: 28 additions & 16 deletions src/cc_sketch_alg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ CCSketchAlg::CCSketchAlg(node_id_t num_vertices, size_t seed, CCAlgConfiguration
sketches = new Sketch *[num_vertices];

vec_t sketch_vec_len = Sketch::calc_vector_length(num_vertices);
size_t sketch_num_samples = Sketch::calc_cc_samples(num_vertices);
size_t sketch_num_samples = Sketch::calc_cc_samples(num_vertices, config.get_sketches_factor());

for (node_id_t i = 0; i < num_vertices; ++i) {
representatives->insert(i);
sketches[i] = new Sketch(sketch_vec_len, seed, sketch_num_samples);
Expand Down Expand Up @@ -48,7 +49,8 @@ CCSketchAlg::CCSketchAlg(node_id_t num_vertices, size_t seed, std::ifstream &bin
sketches = new Sketch *[num_vertices];

vec_t sketch_vec_len = Sketch::calc_vector_length(num_vertices);
size_t sketch_num_samples = Sketch::calc_cc_samples(num_vertices);
size_t sketch_num_samples = Sketch::calc_cc_samples(num_vertices, config.get_sketches_factor());

for (node_id_t i = 0; i < num_vertices; ++i) {
representatives->insert(i);
sketches[i] = new Sketch(sketch_vec_len, seed, binary_stream, sketch_num_samples);
Expand Down Expand Up @@ -88,12 +90,14 @@ void CCSketchAlg::pre_insert(GraphUpdate upd, int /* thr_id */) {
auto src = std::min(edge.src, edge.dst);
auto dst = std::max(edge.src, edge.dst);
std::lock_guard<std::mutex> sflock(spanning_forest_mtx[src]);
if (spanning_forest[src].find(dst) != spanning_forest[src].end()) {
if (dsu.merge(src, dst).merged) {
// this edge adds new connectivity information so add to spanning forest
spanning_forest[src].insert(dst);
}
else if (spanning_forest[src].find(dst) != spanning_forest[src].end()) {
// this update deletes one of our spanning forest edges so mark dsu invalid
dsu_valid = false;
shared_dsu_valid = false;
} else {
spanning_forest[src].insert(dst);
dsu.merge(src, dst);
}
}
#endif // NO_EAGER_DSU
Expand Down Expand Up @@ -227,6 +231,7 @@ inline bool CCSketchAlg::run_round_zero() {
if (sample_supernode(*sketches[i]) && !modified) modified = true;
} catch (...) {
except = true;
#pragma omp critical
err = std::current_exception();
}
}
Expand Down Expand Up @@ -258,14 +263,16 @@ bool CCSketchAlg::perform_boruvka_round(const size_t cur_round,
{
// some thread local variables
Sketch local_sketch(Sketch::calc_vector_length(num_vertices), seed,
Sketch::calc_cc_samples(num_vertices));
Sketch::calc_cc_samples(num_vertices, config.get_sketches_factor()));

size_t thr_id = omp_get_thread_num();
size_t num_threads = omp_get_num_threads();
std::pair<node_id_t, node_id_t> partition = get_ith_partition(num_vertices, thr_id, num_threads);
node_id_t start = partition.first;
node_id_t end = partition.second;
assert(start <= end);
bool local_except = false;
std::exception_ptr local_err;

// node_id_t left_root = merge_instr[start].root;
// node_id_t right_root = merge_instr[end - 1].root;
Expand Down Expand Up @@ -298,8 +305,8 @@ bool CCSketchAlg::perform_boruvka_round(const size_t cur_round,
// num_query += 1;
if (sample_supernode(global_merges[thr_id].sketch) && !modified) modified = true;
} catch (...) {
except = true;
err = std::current_exception();
local_except = true;
local_err = std::current_exception();
}
}

Expand All @@ -312,8 +319,8 @@ bool CCSketchAlg::perform_boruvka_round(const size_t cur_round,
// num_query += 1;
if (sample_supernode(local_sketch) && !modified) modified = true;
} catch (...) {
except = true;
err = std::current_exception();
local_except = true;
local_err = std::current_exception();
}
}

Expand Down Expand Up @@ -343,8 +350,8 @@ bool CCSketchAlg::perform_boruvka_round(const size_t cur_round,
// num_query += 1;
if (sample_supernode(global_merges[global_id].sketch) && !modified) modified = true;
} catch (...) {
except = true;
err = std::current_exception();
local_except = true;
local_err = std::current_exception();
}
}
} else {
Expand All @@ -354,10 +361,15 @@ bool CCSketchAlg::perform_boruvka_round(const size_t cur_round,
// num_query += 1;
if (sample_supernode(local_sketch) && !modified) modified = true;
} catch (...) {
except = true;
err = std::current_exception();
local_except = true;
local_err = std::current_exception();
}
}
if (local_except) {
#pragma omp critical
err = local_err;
except = true;
}
}

// std::cout << "Number of roots queried = " << num_query << std::endl;
Expand Down Expand Up @@ -463,7 +475,7 @@ void CCSketchAlg::boruvka_emulation() {
std::vector<GlobalMergeData> global_merges;
global_merges.reserve(num_threads);
for (size_t i = 0; i < num_threads; i++) {
global_merges.emplace_back(num_vertices, seed);
global_merges.emplace_back(num_vertices, seed, config.get_sketches_factor());
}

dsu.reset();
Expand Down
1 change: 1 addition & 0 deletions src/return_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ std::vector<std::set<node_id_t>> ConnectedComponents::get_component_sets() {
SpanningForest::SpanningForest(node_id_t num_vertices,
const std::unordered_set<node_id_t> *spanning_forest)
: num_vertices(num_vertices) {
edges.reserve(num_vertices);
for (node_id_t src = 0; src < num_vertices; src++) {
for (node_id_t dst : spanning_forest[src]) {
edges.push_back({src, dst});
Expand Down
8 changes: 4 additions & 4 deletions test/sketch_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ TEST(SketchTestSuite, TestExhaustiveQuery) {

TEST(SketchTestSuite, TestSampleInsertGrinder) {
size_t nodes = 4096;
Sketch sketch(Sketch::calc_vector_length(nodes), get_seed(), Sketch::calc_cc_samples(nodes));
Sketch sketch(Sketch::calc_vector_length(nodes), get_seed(), Sketch::calc_cc_samples(nodes, 1));

for (size_t src = 0; src < nodes - 1; src++) {
for (size_t dst = src + 7; dst < nodes; dst += 7) {
Expand All @@ -372,7 +372,7 @@ TEST(SketchTestSuite, TestSampleInsertGrinder) {
}

size_t successes = 0;
for (size_t i = 0; i < Sketch::calc_cc_samples(nodes); i++) {
for (size_t i = 0; i < Sketch::calc_cc_samples(nodes, 1); i++) {
SketchSample ret = sketch.sample();
if (ret.result == FAIL) continue;

Expand All @@ -388,7 +388,7 @@ TEST(SketchTestSuite, TestSampleInsertGrinder) {

TEST(SketchTestSuite, TestSampleDeleteGrinder) {
size_t nodes = 4096;
Sketch sketch(Sketch::calc_vector_length(nodes), get_seed(), Sketch::calc_cc_samples(nodes));
Sketch sketch(Sketch::calc_vector_length(nodes), get_seed(), Sketch::calc_cc_samples(nodes, 1));

// insert
for (size_t src = 0; src < nodes - 1; src++) {
Expand All @@ -405,7 +405,7 @@ TEST(SketchTestSuite, TestSampleDeleteGrinder) {
}

size_t successes = 0;
for (size_t i = 0; i < Sketch::calc_cc_samples(nodes); i++) {
for (size_t i = 0; i < Sketch::calc_cc_samples(nodes, 1); i++) {
SketchSample ret = sketch.sample();
if (ret.result == FAIL) continue;

Expand Down
2 changes: 1 addition & 1 deletion test/util/mat_graph_verifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void MatGraphVerifier::edge_update(node_id_t src, node_id_t dst) {
// update adj_matrix entry
adj_matrix[src][dst] = !adj_matrix[src][dst];
}


void MatGraphVerifier::reset_cc_state() {
kruskal_ref = kruskal();
Expand Down
99 changes: 99 additions & 0 deletions tools/benchmark/graphcc_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,105 @@ BENCHMARK(BM_Sketch_Serialize)->RangeMultiplier(10)->Range(1e3, 1e6);
// }
// BENCHMARK(BM_Sketch_Sparse_Serialize)->RangeMultiplier(10)->Range(1e3, 1e6);

// Benchmark DSU Find Root
static void BM_DSU_Find(benchmark::State& state) {
constexpr size_t size_of_dsu = 16 * MB;
DisjointSetUnion<node_id_t> dsu(size_of_dsu);

auto rng = std::default_random_engine{};
std::vector<node_id_t> queries;
for (size_t i = 0; i < 4096; i++) {
queries.push_back((size_of_dsu / 4096) * i);
}
std::shuffle(queries.begin(), queries.end(), rng);

// perform find test
for (auto _ : state) {
for (auto q : queries)
dsu.find_root(q);
}
state.counters["Find_Latency"] =
benchmark::Counter(state.iterations() * queries.size(),
benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
}
BENCHMARK(BM_DSU_Find);

static void BM_DSU_Find_After_Combine(benchmark::State& state) {
constexpr size_t size_of_dsu = 16 * MB;
DisjointSetUnion<node_id_t> dsu(size_of_dsu);
// merge everything into same root
for (size_t i = 0; i < size_of_dsu - 1; i++) {
dsu.merge(i, i+1);
}

auto rng = std::default_random_engine{};
std::vector<node_id_t> queries;
for (size_t i = 0; i < 4096; i++) {
queries.push_back((size_of_dsu / 4096) * i);
}
std::shuffle(queries.begin(), queries.end(), rng);

// perform find test
for (auto _ : state) {
for (auto q : queries)
dsu.find_root(q);
}
state.counters["Find_Latency"] =
benchmark::Counter(state.iterations() * queries.size(),
benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
}
BENCHMARK(BM_DSU_Find_After_Combine);

// MT DSU Find Root
static void BM_MT_DSU_Find(benchmark::State& state) {
constexpr size_t size_of_dsu = 16 * MB;
DisjointSetUnion_MT<node_id_t> dsu(size_of_dsu);

auto rng = std::default_random_engine{};
std::vector<node_id_t> queries;
for (size_t i = 0; i < 4096; i++) {
queries.push_back((size_of_dsu / 4096) * i);
}
std::shuffle(queries.begin(), queries.end(), rng);

// perform find test
for (auto _ : state) {
for (auto q : queries)
dsu.find_root(q);
}
state.counters["Find_Latency"] =
benchmark::Counter(state.iterations() * queries.size(),
benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
}
BENCHMARK(BM_MT_DSU_Find);

// MT DSU Find Root
static void BM_MT_DSU_Find_After_Combine(benchmark::State& state) {
constexpr size_t size_of_dsu = MB;
DisjointSetUnion_MT<node_id_t> dsu(size_of_dsu);
// merge everything into same root
for (size_t i = 0; i < size_of_dsu - 1; i++) {
dsu.merge(i, i+1);
}

auto rng = std::default_random_engine{};
std::vector<node_id_t> queries;
for (size_t i = 0; i < 512; i++) {
queries.push_back((size_of_dsu / 512) * i);
}
std::shuffle(queries.begin(), queries.end(), rng);

// perform find test
for (auto _ : state) {
for (auto q : queries)
dsu.find_root(q);
}
state.counters["Find_Latency"] =
benchmark::Counter(state.iterations() * queries.size(),
benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
}
BENCHMARK(BM_MT_DSU_Find_After_Combine);

// Benchmark speed of DSU merges when the sequence of merges is adversarial
// This means we avoid joining roots wherever possible
static void BM_DSU_Adversarial(benchmark::State& state) {
Expand Down
2 changes: 1 addition & 1 deletion tools/test_correctness.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ CorrectnessResults test_path_correctness(size_t num_vertices, size_t num_graphs,
size_t samples_per_graph) {
CorrectnessResults results;

size_t num_rounds = Sketch::calc_cc_samples(num_vertices);
size_t num_rounds = Sketch::calc_cc_samples(num_vertices, 1);
for (size_t r = 0; r < num_rounds; r++)
results.num_round_hist.push_back(0);

Expand Down