diff --git a/include/cc_alg_configuration.h b/include/cc_alg_configuration.h index 52da61c6..c1cc80f6 100644 --- a/include/cc_alg_configuration.h +++ b/include/cc_alg_configuration.h @@ -26,7 +26,7 @@ class CCAlgConfiguration { // getters std::string get_disk_dir() { return _disk_dir; } - double get_sketch_factor() { return _sketches_factor; } + double get_sketches_factor() { return _sketches_factor; } double get_batch_factor() { return _batch_factor; } friend std::ostream& operator<< (std::ostream &out, const CCAlgConfiguration &conf); diff --git a/include/cc_sketch_alg.h b/include/cc_sketch_alg.h index ebf8547d..b47795a4 100644 --- a/include/cc_sketch_alg.h +++ b/include/cc_sketch_alg.h @@ -45,9 +45,9 @@ struct alignas(64) GlobalMergeData { size_t num_merge_needed = -1; size_t num_merge_done = 0; - GlobalMergeData(node_id_t num_vertices, size_t seed) + GlobalMergeData(node_id_t num_vertices, size_t seed, double sketches_factor) : sketch(Sketch::calc_vector_length(num_vertices), seed, - Sketch::calc_cc_samples(num_vertices)) {} + Sketch::calc_cc_samples(num_vertices, sketches_factor)) {} GlobalMergeData(const GlobalMergeData&& other) : sketch(other.sketch) { @@ -155,8 +155,9 @@ class CCSketchAlg { num_delta_sketches = num_workers; delta_sketches = new Sketch *[num_delta_sketches]; for (size_t i = 0; i < num_delta_sketches; i++) { - delta_sketches[i] = new Sketch(Sketch::calc_vector_length(num_vertices), seed, - Sketch::calc_cc_samples(num_vertices)); + delta_sketches[i] = + new Sketch(Sketch::calc_vector_length(num_vertices), seed, + Sketch::calc_cc_samples(num_vertices, config.get_sketches_factor())); } } diff --git a/include/dsu.h b/include/dsu.h index 77358cdd..daa7bde1 100644 --- a/include/dsu.h +++ b/include/dsu.h @@ -70,11 +70,11 @@ class DisjointSetUnion { inline T find_root(T u) { assert(0 <= u && u < n); - while (parent[parent[u]] != u) { + while (parent[parent[u]] != parent[u]) { parent[u] = parent[parent[u]]; u = parent[u]; } - return u; + return parent[u]; } inline DSUMergeRet merge(T u, T v) { @@ -144,11 +144,11 @@ class DisjointSetUnion_MT { inline T find_root(T u) { assert(0 <= u && u < n); - while (parent[parent[u]] != u) { + while (parent[parent[u]] != parent[u]) { parent[u] = parent[parent[u]].load(); u = parent[u]; } - return u; + return parent[u]; } // use CAS in this function to allow for simultaneous merge calls diff --git a/include/sketch.h b/include/sketch.h index 23f009f0..1b50ed30 100644 --- a/include/sketch.h +++ b/include/sketch.h @@ -63,6 +63,18 @@ class Sketch { return ceil(double(num_vertices) * (num_vertices - 1) / 2); } + /** + * This function computes the number of samples a Sketch should support in order to solve + * connected components. Optionally, can increase or decrease the number of samples by a + * multiplicative factor. + * @param num_vertices Number of graph vertices + * @param f Multiplicative sample factor + * @return The number of samples + */ + static size_t calc_cc_samples(node_id_t num_vertices, double f) { + return ceil(f * log2(num_vertices) / num_samples_div); + } + /** * Construct a sketch object * @param vector_len Length of the vector we are sketching @@ -167,7 +179,6 @@ class Sketch { inline size_t get_num_samples() const { return num_samples; } static size_t calc_bkt_per_col(size_t n) { return ceil(log2(n)) + 1; } - static size_t calc_cc_samples(size_t n) { return ceil(log2(n) / num_samples_div); } #ifdef L0_SAMPLING static constexpr size_t default_cols_per_sample = 7; diff --git a/src/cc_alg_configuration.cpp b/src/cc_alg_configuration.cpp index 93d0ae47..becbb7e5 100644 --- a/src/cc_alg_configuration.cpp +++ b/src/cc_alg_configuration.cpp @@ -14,11 +14,6 @@ CCAlgConfiguration& CCAlgConfiguration::sketches_factor(double factor) { << "Defaulting to 1." << std::endl; _sketches_factor = 1; } - if (_sketches_factor != 1) { - std::cerr << "WARNING: Your graph configuration specifies using a factor " << _sketches_factor - << " of the normal quantity of sketches." << std::endl; - std::cerr << " Is this intentional? If not, set sketches_factor to one!" << std::endl; - } return *this; } diff --git a/src/cc_sketch_alg.cpp b/src/cc_sketch_alg.cpp index ecd67fcb..ace96bce 100644 --- a/src/cc_sketch_alg.cpp +++ b/src/cc_sketch_alg.cpp @@ -14,7 +14,8 @@ CCSketchAlg::CCSketchAlg(node_id_t num_vertices, size_t seed, CCAlgConfiguration sketches = new Sketch *[num_vertices]; vec_t sketch_vec_len = Sketch::calc_vector_length(num_vertices); - size_t sketch_num_samples = Sketch::calc_cc_samples(num_vertices); + size_t sketch_num_samples = Sketch::calc_cc_samples(num_vertices, config.get_sketches_factor()); + for (node_id_t i = 0; i < num_vertices; ++i) { representatives->insert(i); sketches[i] = new Sketch(sketch_vec_len, seed, sketch_num_samples); @@ -48,7 +49,8 @@ CCSketchAlg::CCSketchAlg(node_id_t num_vertices, size_t seed, std::ifstream &bin sketches = new Sketch *[num_vertices]; vec_t sketch_vec_len = Sketch::calc_vector_length(num_vertices); - size_t sketch_num_samples = Sketch::calc_cc_samples(num_vertices); + size_t sketch_num_samples = Sketch::calc_cc_samples(num_vertices, config.get_sketches_factor()); + for (node_id_t i = 0; i < num_vertices; ++i) { representatives->insert(i); sketches[i] = new Sketch(sketch_vec_len, seed, binary_stream, sketch_num_samples); @@ -88,12 +90,14 @@ void CCSketchAlg::pre_insert(GraphUpdate upd, int /* thr_id */) { auto src = std::min(edge.src, edge.dst); auto dst = std::max(edge.src, edge.dst); std::lock_guard sflock(spanning_forest_mtx[src]); - if (spanning_forest[src].find(dst) != spanning_forest[src].end()) { + if (dsu.merge(src, dst).merged) { + // this edge adds new connectivity information so add to spanning forest + spanning_forest[src].insert(dst); + } + else if (spanning_forest[src].find(dst) != spanning_forest[src].end()) { + // this update deletes one of our spanning forest edges so mark dsu invalid dsu_valid = false; shared_dsu_valid = false; - } else { - spanning_forest[src].insert(dst); - dsu.merge(src, dst); } } #endif // NO_EAGER_DSU @@ -227,6 +231,7 @@ inline bool CCSketchAlg::run_round_zero() { if (sample_supernode(*sketches[i]) && !modified) modified = true; } catch (...) { except = true; +#pragma omp critical err = std::current_exception(); } } @@ -258,7 +263,7 @@ bool CCSketchAlg::perform_boruvka_round(const size_t cur_round, { // some thread local variables Sketch local_sketch(Sketch::calc_vector_length(num_vertices), seed, - Sketch::calc_cc_samples(num_vertices)); + Sketch::calc_cc_samples(num_vertices, config.get_sketches_factor())); size_t thr_id = omp_get_thread_num(); size_t num_threads = omp_get_num_threads(); @@ -266,6 +271,8 @@ bool CCSketchAlg::perform_boruvka_round(const size_t cur_round, node_id_t start = partition.first; node_id_t end = partition.second; assert(start <= end); + bool local_except = false; + std::exception_ptr local_err; // node_id_t left_root = merge_instr[start].root; // node_id_t right_root = merge_instr[end - 1].root; @@ -298,8 +305,8 @@ bool CCSketchAlg::perform_boruvka_round(const size_t cur_round, // num_query += 1; if (sample_supernode(global_merges[thr_id].sketch) && !modified) modified = true; } catch (...) { - except = true; - err = std::current_exception(); + local_except = true; + local_err = std::current_exception(); } } @@ -312,8 +319,8 @@ bool CCSketchAlg::perform_boruvka_round(const size_t cur_round, // num_query += 1; if (sample_supernode(local_sketch) && !modified) modified = true; } catch (...) { - except = true; - err = std::current_exception(); + local_except = true; + local_err = std::current_exception(); } } @@ -343,8 +350,8 @@ bool CCSketchAlg::perform_boruvka_round(const size_t cur_round, // num_query += 1; if (sample_supernode(global_merges[global_id].sketch) && !modified) modified = true; } catch (...) { - except = true; - err = std::current_exception(); + local_except = true; + local_err = std::current_exception(); } } } else { @@ -354,10 +361,15 @@ bool CCSketchAlg::perform_boruvka_round(const size_t cur_round, // num_query += 1; if (sample_supernode(local_sketch) && !modified) modified = true; } catch (...) { - except = true; - err = std::current_exception(); + local_except = true; + local_err = std::current_exception(); } } + if (local_except) { +#pragma omp critical + err = local_err; + except = true; + } } // std::cout << "Number of roots queried = " << num_query << std::endl; @@ -463,7 +475,7 @@ void CCSketchAlg::boruvka_emulation() { std::vector global_merges; global_merges.reserve(num_threads); for (size_t i = 0; i < num_threads; i++) { - global_merges.emplace_back(num_vertices, seed); + global_merges.emplace_back(num_vertices, seed, config.get_sketches_factor()); } dsu.reset(); diff --git a/src/return_types.cpp b/src/return_types.cpp index 8b2726dc..f4c4998d 100644 --- a/src/return_types.cpp +++ b/src/return_types.cpp @@ -32,6 +32,7 @@ std::vector> ConnectedComponents::get_component_sets() { SpanningForest::SpanningForest(node_id_t num_vertices, const std::unordered_set *spanning_forest) : num_vertices(num_vertices) { + edges.reserve(num_vertices); for (node_id_t src = 0; src < num_vertices; src++) { for (node_id_t dst : spanning_forest[src]) { edges.push_back({src, dst}); diff --git a/test/sketch_test.cpp b/test/sketch_test.cpp index dfe81eae..e35f4aa2 100644 --- a/test/sketch_test.cpp +++ b/test/sketch_test.cpp @@ -363,7 +363,7 @@ TEST(SketchTestSuite, TestExhaustiveQuery) { TEST(SketchTestSuite, TestSampleInsertGrinder) { size_t nodes = 4096; - Sketch sketch(Sketch::calc_vector_length(nodes), get_seed(), Sketch::calc_cc_samples(nodes)); + Sketch sketch(Sketch::calc_vector_length(nodes), get_seed(), Sketch::calc_cc_samples(nodes, 1)); for (size_t src = 0; src < nodes - 1; src++) { for (size_t dst = src + 7; dst < nodes; dst += 7) { @@ -372,7 +372,7 @@ TEST(SketchTestSuite, TestSampleInsertGrinder) { } size_t successes = 0; - for (size_t i = 0; i < Sketch::calc_cc_samples(nodes); i++) { + for (size_t i = 0; i < Sketch::calc_cc_samples(nodes, 1); i++) { SketchSample ret = sketch.sample(); if (ret.result == FAIL) continue; @@ -388,7 +388,7 @@ TEST(SketchTestSuite, TestSampleInsertGrinder) { TEST(SketchTestSuite, TestSampleDeleteGrinder) { size_t nodes = 4096; - Sketch sketch(Sketch::calc_vector_length(nodes), get_seed(), Sketch::calc_cc_samples(nodes)); + Sketch sketch(Sketch::calc_vector_length(nodes), get_seed(), Sketch::calc_cc_samples(nodes, 1)); // insert for (size_t src = 0; src < nodes - 1; src++) { @@ -405,7 +405,7 @@ TEST(SketchTestSuite, TestSampleDeleteGrinder) { } size_t successes = 0; - for (size_t i = 0; i < Sketch::calc_cc_samples(nodes); i++) { + for (size_t i = 0; i < Sketch::calc_cc_samples(nodes, 1); i++) { SketchSample ret = sketch.sample(); if (ret.result == FAIL) continue; diff --git a/test/util/mat_graph_verifier.cpp b/test/util/mat_graph_verifier.cpp index 8c2db81a..d313d736 100644 --- a/test/util/mat_graph_verifier.cpp +++ b/test/util/mat_graph_verifier.cpp @@ -19,7 +19,7 @@ void MatGraphVerifier::edge_update(node_id_t src, node_id_t dst) { // update adj_matrix entry adj_matrix[src][dst] = !adj_matrix[src][dst]; } - + void MatGraphVerifier::reset_cc_state() { kruskal_ref = kruskal(); diff --git a/tools/benchmark/graphcc_bench.cpp b/tools/benchmark/graphcc_bench.cpp index f510d6de..fc5b8995 100644 --- a/tools/benchmark/graphcc_bench.cpp +++ b/tools/benchmark/graphcc_bench.cpp @@ -299,6 +299,105 @@ BENCHMARK(BM_Sketch_Serialize)->RangeMultiplier(10)->Range(1e3, 1e6); // } // BENCHMARK(BM_Sketch_Sparse_Serialize)->RangeMultiplier(10)->Range(1e3, 1e6); +// Benchmark DSU Find Root +static void BM_DSU_Find(benchmark::State& state) { + constexpr size_t size_of_dsu = 16 * MB; + DisjointSetUnion dsu(size_of_dsu); + + auto rng = std::default_random_engine{}; + std::vector queries; + for (size_t i = 0; i < 4096; i++) { + queries.push_back((size_of_dsu / 4096) * i); + } + std::shuffle(queries.begin(), queries.end(), rng); + + // perform find test + for (auto _ : state) { + for (auto q : queries) + dsu.find_root(q); + } + state.counters["Find_Latency"] = + benchmark::Counter(state.iterations() * queries.size(), + benchmark::Counter::kIsRate | benchmark::Counter::kInvert); +} +BENCHMARK(BM_DSU_Find); + +static void BM_DSU_Find_After_Combine(benchmark::State& state) { + constexpr size_t size_of_dsu = 16 * MB; + DisjointSetUnion dsu(size_of_dsu); + // merge everything into same root + for (size_t i = 0; i < size_of_dsu - 1; i++) { + dsu.merge(i, i+1); + } + + auto rng = std::default_random_engine{}; + std::vector queries; + for (size_t i = 0; i < 4096; i++) { + queries.push_back((size_of_dsu / 4096) * i); + } + std::shuffle(queries.begin(), queries.end(), rng); + + // perform find test + for (auto _ : state) { + for (auto q : queries) + dsu.find_root(q); + } + state.counters["Find_Latency"] = + benchmark::Counter(state.iterations() * queries.size(), + benchmark::Counter::kIsRate | benchmark::Counter::kInvert); +} +BENCHMARK(BM_DSU_Find_After_Combine); + +// MT DSU Find Root +static void BM_MT_DSU_Find(benchmark::State& state) { + constexpr size_t size_of_dsu = 16 * MB; + DisjointSetUnion_MT dsu(size_of_dsu); + + auto rng = std::default_random_engine{}; + std::vector queries; + for (size_t i = 0; i < 4096; i++) { + queries.push_back((size_of_dsu / 4096) * i); + } + std::shuffle(queries.begin(), queries.end(), rng); + + // perform find test + for (auto _ : state) { + for (auto q : queries) + dsu.find_root(q); + } + state.counters["Find_Latency"] = + benchmark::Counter(state.iterations() * queries.size(), + benchmark::Counter::kIsRate | benchmark::Counter::kInvert); +} +BENCHMARK(BM_MT_DSU_Find); + +// MT DSU Find Root +static void BM_MT_DSU_Find_After_Combine(benchmark::State& state) { + constexpr size_t size_of_dsu = MB; + DisjointSetUnion_MT dsu(size_of_dsu); + // merge everything into same root + for (size_t i = 0; i < size_of_dsu - 1; i++) { + dsu.merge(i, i+1); + } + + auto rng = std::default_random_engine{}; + std::vector queries; + for (size_t i = 0; i < 512; i++) { + queries.push_back((size_of_dsu / 512) * i); + } + std::shuffle(queries.begin(), queries.end(), rng); + + // perform find test + for (auto _ : state) { + for (auto q : queries) + dsu.find_root(q); + } + state.counters["Find_Latency"] = + benchmark::Counter(state.iterations() * queries.size(), + benchmark::Counter::kIsRate | benchmark::Counter::kInvert); +} +BENCHMARK(BM_MT_DSU_Find_After_Combine); + // Benchmark speed of DSU merges when the sequence of merges is adversarial // This means we avoid joining roots wherever possible static void BM_DSU_Adversarial(benchmark::State& state) { diff --git a/tools/test_correctness.cpp b/tools/test_correctness.cpp index 00e7f822..36a6322f 100644 --- a/tools/test_correctness.cpp +++ b/tools/test_correctness.cpp @@ -22,7 +22,7 @@ CorrectnessResults test_path_correctness(size_t num_vertices, size_t num_graphs, size_t samples_per_graph) { CorrectnessResults results; - size_t num_rounds = Sketch::calc_cc_samples(num_vertices); + size_t num_rounds = Sketch::calc_cc_samples(num_vertices, 1); for (size_t r = 0; r < num_rounds; r++) results.num_round_hist.push_back(0);