From 504a92f00827187d024a121f91da8d26285a4d88 Mon Sep 17 00:00:00 2001 From: Evan West Date: Wed, 26 Jul 2023 01:38:50 -0400 Subject: [PATCH 1/9] initial commit: l0_sampling and better config options --- CMakeLists.txt | 10 +++++++--- include/graph_configuration.h | 10 +++++++--- include/supernode.h | 5 +++++ src/graph.cpp | 20 +++++++++++++++----- src/graph_configuration.cpp | 28 +++++++++++++++++++--------- src/l0_sampling/sketch.cpp | 20 ++++++++++++++++++++ test/sketch_test.cpp | 12 ++++++------ tools/process_stream.cpp | 6 ++++-- 8 files changed, 83 insertions(+), 28 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1552e43c..8e9554ec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -55,7 +55,7 @@ FetchContent_Declare( GutterTree GIT_REPOSITORY https://github.com/GraphStreamingProject/GutterTree.git - GIT_TAG main + GIT_TAG better_config ) if (BUILD_BENCH) @@ -88,9 +88,13 @@ set(BUILD_SHARED_LIBS "${SAVED_BUILD_SHARED_LIBS}" CACHE BOOL "" FORCE) # AVAILABLE COMPILATION DEFINITIONS: -# VERIFY_SAMPLES_F Use a deterministic connected-components +# VERIFY_SAMPLES_F Use a deterministic connected-components # algorithm to verify post-processing. -# USE_EAGER_DSU Use the eager DSU query optimization if this flag is present. +# USE_EAGER_DSU Use the eager DSU query optimization if +# this flag is present. +# L0_SAMPLING Run the CubeSketch l0 sampling algorithm +# to ensure that we sample uniformly. +# Otherwise, run a support finding algorithm. add_library(GraphZeppelin src/graph.cpp diff --git a/include/graph_configuration.h b/include/graph_configuration.h index 6d4d61d9..3a008b66 100644 --- a/include/graph_configuration.h +++ b/include/graph_configuration.h @@ -5,7 +5,6 @@ // forward declaration class Graph; -// TODO: Replace this with an enum defined by GutterTree repo enum GutterSystem { GUTTERTREE, STANDALONE, @@ -33,7 +32,10 @@ class GraphConfiguration { // Option to create more sketches than for standard connected components // Ex factor of 1.5, 1.5 times the sketches // factor of 1, normal quantity of sketches - double _adtl_skts_factor = 1; + double _sketches_factor = 1; + + // Size of update batches as relative to the size of a Supernode + double _batch_factor = 1; // Configuration for the guttering system GutteringConfiguration _gutter_conf; @@ -54,7 +56,9 @@ class GraphConfiguration { GraphConfiguration& group_size(size_t group_size); - GraphConfiguration& adtl_skts_factor(double factor); + GraphConfiguration& sketches_factor(double factor); + + GraphConfiguration& batch_factor(double factor); GutteringConfiguration& gutter_conf(); diff --git a/include/supernode.h b/include/supernode.h index 4780434b..8f4b584d 100644 --- a/include/supernode.h +++ b/include/supernode.h @@ -92,6 +92,7 @@ class Supernode { double skt_factor = 1) { Sketch::configure(n * n, sketch_fail_factor); max_sketches = log2(n) / (log2(3) - 1) * skt_factor; + if (max_sketches < 1) max_sketches = 1; bytes_size = sizeof(Supernode) + max_sketches * Sketch::sketchSizeof(); serialized_size = max_sketches * Sketch::serialized_size(); } @@ -210,7 +211,11 @@ class Supernode { // void write_sparse_binary_range(std::ostream&binary_out, uint32_t beg, uint32_t end); +#ifdef L0_SAMPLING + static constexpr size_t default_fail_factor = 100; +#else static constexpr size_t default_fail_factor = 4; +#endif }; diff --git a/src/graph.cpp b/src/graph.cpp index 4de359c3..692200df 100644 --- a/src/graph.cpp +++ b/src/graph.cpp @@ -20,7 +20,7 @@ Graph::Graph(node_id_t num_nodes, GraphConfiguration config, int num_inserters) #ifdef VERIFY_SAMPLES_F std::cout << "Verifying samples..." << std::endl; #endif - Supernode::configure(num_nodes, Supernode::default_fail_factor, config._adtl_skts_factor); + Supernode::configure(num_nodes, Supernode::default_fail_factor, config._sketches_factor); representatives = new std::set(); supernodes = new Supernode*[num_nodes]; parent = new std::remove_reference::type[num_nodes]; @@ -35,6 +35,11 @@ Graph::Graph(node_id_t num_nodes, GraphConfiguration config, int num_inserters) supernodes[i] = Supernode::makeSupernode(num_nodes,seed); parent[i] = i; } + + // set the leaf size of the guttering system appropriately + if (config._gutter_conf.get_gutter_bytes() == GutteringConfiguration::uninit_param) { + config._gutter_conf.gutter_bytes(Supernode::get_size() * config._batch_factor); + } backup_file = config._disk_dir + "supernode_backup.data"; // Create the guttering system @@ -59,13 +64,13 @@ Graph::Graph(const std::string& input_file, GraphConfiguration config, int num_i if (open_graph) throw MultipleGraphsException(); vec_t sketch_fail_factor; - double adtl_skts_factor; + double sketches_factor; auto binary_in = std::fstream(input_file, std::ios::in | std::ios::binary); binary_in.read((char*)&seed, sizeof(seed)); binary_in.read((char*)&num_nodes, sizeof(num_nodes)); binary_in.read((char*)&sketch_fail_factor, sizeof(sketch_fail_factor)); - binary_in.read((char*)&adtl_skts_factor, sizeof(adtl_skts_factor)); - Supernode::configure(num_nodes, sketch_fail_factor, adtl_skts_factor); + binary_in.read((char*)&sketches_factor, sizeof(sketches_factor)); + Supernode::configure(num_nodes, sketch_fail_factor, sketches_factor); #ifdef VERIFY_SAMPLES_F std::cout << "Verifying samples..." << std::endl; @@ -82,6 +87,11 @@ Graph::Graph(const std::string& input_file, GraphConfiguration config, int num_i } binary_in.close(); + // set the leaf size of the guttering system appropriately + if (config._gutter_conf.get_gutter_bytes() == GutteringConfiguration::uninit_param) { + config._gutter_conf.gutter_bytes(Supernode::get_size() * config._batch_factor); + } + backup_file = config._disk_dir + "supernode_backup.data"; // Create the guttering system if (config._gutter_sys == GUTTERTREE) @@ -487,7 +497,7 @@ void Graph::write_binary(const std::string& filename) { binary_out.write((char*)&seed, sizeof(seed)); binary_out.write((char*)&num_nodes, sizeof(num_nodes)); binary_out.write((char*)&fail_factor, sizeof(fail_factor)); - binary_out.write((char*)&config._adtl_skts_factor, sizeof(config._adtl_skts_factor)); + binary_out.write((char*)&config._sketches_factor, sizeof(config._sketches_factor)); for (node_id_t i = 0; i < num_nodes; ++i) { supernodes[i]->write_binary(binary_out); } diff --git a/src/graph_configuration.cpp b/src/graph_configuration.cpp index c808c046..aa584bed 100644 --- a/src/graph_configuration.cpp +++ b/src/graph_configuration.cpp @@ -37,17 +37,27 @@ GraphConfiguration& GraphConfiguration::group_size(size_t group_size) { return *this; } -GraphConfiguration& GraphConfiguration::adtl_skts_factor(double factor) { - _adtl_skts_factor = factor; - if (_adtl_skts_factor <= 1) { - std::cout << "adtl_skts_factor=" << _adtl_skts_factor << " is out of bounds. [1, infty)" +GraphConfiguration& GraphConfiguration::sketches_factor(double factor) { + _sketches_factor = factor; + if (_sketches_factor < 0) { + std::cout << "adtl_skts_factor=" << _sketches_factor << " is out of bounds. (0, infty)" << "Defaulting to 1." << std::endl; - _adtl_skts_factor = 1; + _sketches_factor = 1; } - if (_adtl_skts_factor > 1) { - std::cerr << "WARNING: Your graph configuration specifies using a factor " << _adtl_skts_factor - << " more memory than normal." << std::endl; - std::cerr << " Is this intentional? If not, set adtl_skts_factor to one" << std::endl; + if (_sketches_factor != 1) { + std::cerr << "WARNING: Your graph configuration specifies using a factor " << _sketches_factor + << " of the normal quantity of sketches." << std::endl; + std::cerr << " Is this intentional? If not, set adtl_skts_factor to one!" << std::endl; + } + return *this; +} + +GraphConfiguration& GraphConfiguration::batch_factor(double factor) { + _batch_factor = factor; + if (_sketches_factor < 0) { + std::cout << "sketches_factor=" << _sketches_factor << " is out of bounds. (0, infty)" + << "Defaulting to 1." << std::endl; + _sketches_factor = 1; } return *this; } diff --git a/src/l0_sampling/sketch.cpp b/src/l0_sampling/sketch.cpp index e1c74d14..bddf5ac6 100644 --- a/src/l0_sampling/sketch.cpp +++ b/src/l0_sampling/sketch.cpp @@ -72,6 +72,25 @@ Sketch::Sketch(const Sketch& s) : seed(s.seed) { std::memcpy(bucket_c, s.bucket_c, num_elems * sizeof(vec_hash_t)); } +#ifdef L0_SAMPLING +void Sketch::update(const vec_t update_idx) { + vec_hash_t checksum = Bucket_Boruvka::get_index_hash(update_idx, checksum_seed()); + + // Update depth 0 bucket + Bucket_Boruvka::update(bucket_a[num_elems - 1], bucket_c[num_elems - 1], update_idx, checksum); + + // Update higher depth buckets + for (unsigned i = 0; i < num_columns; ++i) { + col_hash_t depth = Bucket_Boruvka::get_index_depth(update_idx, column_seed(i), num_guesses); + likely_if(depth < num_guesses) { + for (col_hash_t j = 0; j <= depth; ++j) { + size_t bucket_id = i * num_guesses + j; + Bucket_Boruvka::update(bucket_a[bucket_id], bucket_c[bucket_id], update_idx, checksum); + } + } + } +} +#else // Use support finding algorithm instead. Faster but no guarantee of uniform sample. void Sketch::update(const vec_t update_idx) { vec_hash_t checksum = Bucket_Boruvka::get_index_hash(update_idx, checksum_seed()); @@ -86,6 +105,7 @@ void Sketch::update(const vec_t update_idx) { Bucket_Boruvka::update(bucket_a[bucket_id], bucket_c[bucket_id], update_idx, checksum); } } +#endif void Sketch::batch_update(const std::vector& updates) { for (const auto& update_idx : updates) { diff --git a/test/sketch_test.cpp b/test/sketch_test.cpp index f7b6fdab..08fb72fe 100644 --- a/test/sketch_test.cpp +++ b/test/sketch_test.cpp @@ -141,9 +141,9 @@ void test_sketch_sample(unsigned long num_sketches, } TEST(SketchTestSuite, TestSketchSample) { - test_sketch_sample(10000, 1e3, 100, 0.005, 0.02); - test_sketch_sample(1000, 1e4, 1000, 0.001, 0.02); - test_sketch_sample(1000, 1e5, 10000, 0.001, 0.02); + test_sketch_sample(10000, 1e3, 100, 0.001, 0.03); + test_sketch_sample(1000, 1e4, 1000, 0.001, 0.03); + test_sketch_sample(1000, 1e5, 10000, 0.001, 0.03); } /** @@ -209,9 +209,9 @@ void test_sketch_addition(unsigned long num_sketches, } TEST(SketchTestSuite, TestSketchAddition){ - test_sketch_addition(10000, 1e3, 100, 0.005, 0.02); - test_sketch_addition(1000, 1e4, 1000, 0.001, 0.02); - test_sketch_addition(1000, 1e5, 10000, 0.001, 0.02); + test_sketch_addition(10000, 1e3, 100, 0.001, 0.03); + test_sketch_addition(1000, 1e4, 1000, 0.001, 0.03); + test_sketch_addition(1000, 1e5, 10000, 0.001, 0.03); } /** diff --git a/tools/process_stream.cpp b/tools/process_stream.cpp index e62b1d43..0a74cdf3 100644 --- a/tools/process_stream.cpp +++ b/tools/process_stream.cpp @@ -78,8 +78,10 @@ int main(int argc, char **argv) { std::cout << "num_updates = " << num_updates << std::endl; std::cout << std::endl; - auto config = GraphConfiguration().gutter_sys(STANDALONE).num_groups(num_threads); - config.gutter_conf().gutter_factor(-4); + auto config = GraphConfiguration() + .gutter_sys(STANDALONE) + .num_groups(num_threads) + .batch_factor(-4); Graph g{num_nodes, config, reader_threads}; auto ins_start = std::chrono::steady_clock::now(); From 91eee64bf802b19f426e768b9e6f53622790fdd8 Mon Sep 17 00:00:00 2001 From: Evan West Date: Thu, 10 Aug 2023 11:48:01 -0400 Subject: [PATCH 2/9] 2 columns and bug fixes --- include/l0_sampling/sketch.h | 2 +- include/supernode.h | 3 +-- src/graph_configuration.cpp | 6 +++--- tools/process_stream.cpp | 2 +- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/include/l0_sampling/sketch.h b/include/l0_sampling/sketch.h index 81bc5701..15c76304 100644 --- a/include/l0_sampling/sketch.h +++ b/include/l0_sampling/sketch.h @@ -168,7 +168,7 @@ class Sketch { // max number of non-zeroes in vector is n/2*n/2=n^2/4 static size_t guess_gen(size_t x) { return double_to_ull(log2(x) - 2); } - static size_t column_gen(size_t d) { return double_to_ull((log2(d) + 1)); } + static size_t column_gen(size_t d) { return double_to_ull(log2(d)); } }; class MultipleQueryException : public std::exception { diff --git a/include/supernode.h b/include/supernode.h index 8f4b584d..8cc263b4 100644 --- a/include/supernode.h +++ b/include/supernode.h @@ -92,7 +92,6 @@ class Supernode { double skt_factor = 1) { Sketch::configure(n * n, sketch_fail_factor); max_sketches = log2(n) / (log2(3) - 1) * skt_factor; - if (max_sketches < 1) max_sketches = 1; bytes_size = sizeof(Supernode) + max_sketches * Sketch::sketchSizeof(); serialized_size = max_sketches * Sketch::serialized_size(); } @@ -212,7 +211,7 @@ class Supernode { // void write_sparse_binary_range(std::ostream&binary_out, uint32_t beg, uint32_t end); #ifdef L0_SAMPLING - static constexpr size_t default_fail_factor = 100; + static constexpr size_t default_fail_factor = 128; #else static constexpr size_t default_fail_factor = 4; #endif diff --git a/src/graph_configuration.cpp b/src/graph_configuration.cpp index aa584bed..e19c7025 100644 --- a/src/graph_configuration.cpp +++ b/src/graph_configuration.cpp @@ -54,10 +54,10 @@ GraphConfiguration& GraphConfiguration::sketches_factor(double factor) { GraphConfiguration& GraphConfiguration::batch_factor(double factor) { _batch_factor = factor; - if (_sketches_factor < 0) { - std::cout << "sketches_factor=" << _sketches_factor << " is out of bounds. (0, infty)" + if (_batch_factor < 0) { + std::cout << "batch factor=" << _batch_factor << " is out of bounds. (0, infty)" << "Defaulting to 1." << std::endl; - _sketches_factor = 1; + _batch_factor = 1; } return *this; } diff --git a/tools/process_stream.cpp b/tools/process_stream.cpp index 0a74cdf3..4281af26 100644 --- a/tools/process_stream.cpp +++ b/tools/process_stream.cpp @@ -81,7 +81,7 @@ int main(int argc, char **argv) { auto config = GraphConfiguration() .gutter_sys(STANDALONE) .num_groups(num_threads) - .batch_factor(-4); + .batch_factor(1.0 / 2); Graph g{num_nodes, config, reader_threads}; auto ins_start = std::chrono::steady_clock::now(); From 87436a4c1e3185371c6f69e71adf5906934482d2 Mon Sep 17 00:00:00 2001 From: Evan West Date: Fri, 18 Aug 2023 11:52:17 -0400 Subject: [PATCH 3/9] improve configuration some more. Remove unused options. --- example_streaming.conf | 25 --- include/graph_configuration.h | 9 +- include/graph_worker.h | 4 +- src/graph.cpp | 174 ++++++++++---------- src/graph_configuration.cpp | 40 ++--- src/graph_worker.cpp | 1 - src/supernode.cpp | 4 +- test/graph_test.cpp | 146 ++++++++-------- test/supernode_test.cpp | 6 +- tools/process_stream.cpp | 2 +- tools/statistical_testing/graph_testing.cpp | 2 +- 11 files changed, 189 insertions(+), 224 deletions(-) delete mode 100755 example_streaming.conf diff --git a/example_streaming.conf b/example_streaming.conf deleted file mode 100755 index ddc3d4ce..00000000 --- a/example_streaming.conf +++ /dev/null @@ -1,25 +0,0 @@ -# Should we use the GutterTree as the buffering system or the -# standalone gutters ("tree" for GutterTree, "standalone" for -# standalone gutters, "cachetree" for cache aware guttering). -# Type:String -buffering_system=tree - -# The directory where external memory datastructures -# are to be stored. -# Type:String -disk_dir=. - -# When performing queries during the stream, should supernodes be -# backed up in memory("ON") or to disk("OFF"). -# In memory is preferred when enough memory is available. -# Type:Bool -backup_in_mem=ON - -# How many graph workers should we use. -# Type:Integer -num_groups=1 - -# How many OMP threads should each graph worker use. -# Generally num_groups * group_size <= number of available threads -# Type:Integer -group_size=1 diff --git a/include/graph_configuration.h b/include/graph_configuration.h index 3a008b66..254679db 100644 --- a/include/graph_configuration.h +++ b/include/graph_configuration.h @@ -24,10 +24,7 @@ class GraphConfiguration { bool _backup_in_mem = true; // The number of graph workers - size_t _num_groups = 1; - - // How many OMP threads each graph worker uses - size_t _group_size = 1; + size_t _num_graph_workers = 1; // Option to create more sketches than for standard connected components // Ex factor of 1.5, 1.5 times the sketches @@ -52,9 +49,7 @@ class GraphConfiguration { GraphConfiguration& backup_in_mem(bool backup_in_mem); - GraphConfiguration& num_groups(size_t num_groups); - - GraphConfiguration& group_size(size_t group_size); + GraphConfiguration& num_graph_workers(size_t num_groups); GraphConfiguration& sketches_factor(double factor); diff --git a/include/graph_worker.h b/include/graph_worker.h index e96a0f18..c8a86dc9 100644 --- a/include/graph_worker.h +++ b/include/graph_worker.h @@ -32,8 +32,7 @@ class GraphWorker { // manage configuration // configuration should be set before calling start_workers static int get_num_groups() {return num_groups;} // return the number of GraphWorkers - static int get_group_size() {return group_size;} // return the number of threads in each worker - static void set_config(int g, int s) { num_groups = g; group_size = s; } + static void set_config(int g) { num_groups = g; } private: /** * Create a GraphWorker object by setting metadata and spinning up a thread. @@ -69,7 +68,6 @@ class GraphWorker { // configuration static int num_groups; - static int group_size; static long supernode_size; // list of all GraphWorkers diff --git a/src/graph.cpp b/src/graph.cpp index 692200df..983170ff 100644 --- a/src/graph.cpp +++ b/src/graph.cpp @@ -1,20 +1,22 @@ -#include -#include -#include -#include -#include +#include "../include/graph.h" +#include #include #include -#include -#include "../include/graph.h" + +#include +#include +#include +#include +#include + #include "../include/graph_worker.h" // static variable for enforcing that only one graph is open at a time bool Graph::open_graph = false; -Graph::Graph(node_id_t num_nodes, GraphConfiguration config, int num_inserters) : - num_nodes(num_nodes), config(config), num_updates(0) { +Graph::Graph(node_id_t num_nodes, GraphConfiguration config, int num_inserters) + : num_nodes(num_nodes), config(config), num_updates(0) { if (open_graph) throw MultipleGraphsException(); #ifdef VERIFY_SAMPLES_F @@ -22,17 +24,19 @@ Graph::Graph(node_id_t num_nodes, GraphConfiguration config, int num_inserters) #endif Supernode::configure(num_nodes, Supernode::default_fail_factor, config._sketches_factor); representatives = new std::set(); - supernodes = new Supernode*[num_nodes]; + supernodes = new Supernode *[num_nodes]; parent = new std::remove_reference::type[num_nodes]; size = new node_id_t[num_nodes]; - seed = std::chrono::duration_cast(std::chrono::high_resolution_clock::now().time_since_epoch()).count(); + seed = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now().time_since_epoch()) + .count(); std::mt19937_64 r(seed); seed = r(); std::fill(size, size + num_nodes, 1); for (node_id_t i = 0; i < num_nodes; ++i) { representatives->insert(i); - supernodes[i] = Supernode::makeSupernode(num_nodes,seed); + supernodes[i] = Supernode::makeSupernode(num_nodes, seed); parent[i] = i; } @@ -40,46 +44,49 @@ Graph::Graph(node_id_t num_nodes, GraphConfiguration config, int num_inserters) if (config._gutter_conf.get_gutter_bytes() == GutteringConfiguration::uninit_param) { config._gutter_conf.gutter_bytes(Supernode::get_size() * config._batch_factor); } - + backup_file = config._disk_dir + "supernode_backup.data"; // Create the guttering system if (config._gutter_sys == GUTTERTREE) - gts = new GutterTree(config._disk_dir, num_nodes, config._num_groups, config._gutter_conf, true); + gts = new GutterTree(config._disk_dir, num_nodes, config._num_graph_workers, + config._gutter_conf, true); else if (config._gutter_sys == STANDALONE) - gts = new StandAloneGutters(num_nodes, config._num_groups, num_inserters, config._gutter_conf); + gts = new StandAloneGutters(num_nodes, config._num_graph_workers, num_inserters, + config._gutter_conf); else - gts = new CacheGuttering(num_nodes, config._num_groups, num_inserters, config._gutter_conf); + gts = new CacheGuttering(num_nodes, config._num_graph_workers, num_inserters, + config._gutter_conf); - GraphWorker::set_config(config._num_groups, config._group_size); + GraphWorker::set_config(config._num_graph_workers); GraphWorker::start_workers(this, gts, Supernode::get_size()); open_graph = true; spanning_forest = new std::unordered_set[num_nodes]; spanning_forest_mtx = new std::mutex[num_nodes]; dsu_valid = true; - std::cout << config << std::endl; // print the graph configuration + std::cout << config << std::endl; // print the graph configuration } -Graph::Graph(const std::string& input_file, GraphConfiguration config, int num_inserters) : - config(config), num_updates(0) { +Graph::Graph(const std::string &input_file, GraphConfiguration config, int num_inserters) + : config(config), num_updates(0) { if (open_graph) throw MultipleGraphsException(); - + vec_t sketch_fail_factor; double sketches_factor; auto binary_in = std::fstream(input_file, std::ios::in | std::ios::binary); - binary_in.read((char*)&seed, sizeof(seed)); - binary_in.read((char*)&num_nodes, sizeof(num_nodes)); - binary_in.read((char*)&sketch_fail_factor, sizeof(sketch_fail_factor)); - binary_in.read((char*)&sketches_factor, sizeof(sketches_factor)); + binary_in.read((char *)&seed, sizeof(seed)); + binary_in.read((char *)&num_nodes, sizeof(num_nodes)); + binary_in.read((char *)&sketch_fail_factor, sizeof(sketch_fail_factor)); + binary_in.read((char *)&sketches_factor, sizeof(sketches_factor)); Supernode::configure(num_nodes, sketch_fail_factor, sketches_factor); #ifdef VERIFY_SAMPLES_F std::cout << "Verifying samples..." << std::endl; #endif representatives = new std::set(); - supernodes = new Supernode*[num_nodes]; + supernodes = new Supernode *[num_nodes]; parent = new std::remove_reference::type[num_nodes]; size = new node_id_t[num_nodes]; - std::fill(size, size+num_nodes, 1); + std::fill(size, size + num_nodes, 1); for (node_id_t i = 0; i < num_nodes; ++i) { representatives->insert(i); supernodes[i] = Supernode::makeSupernode(num_nodes, seed, binary_in); @@ -95,46 +102,47 @@ Graph::Graph(const std::string& input_file, GraphConfiguration config, int num_i backup_file = config._disk_dir + "supernode_backup.data"; // Create the guttering system if (config._gutter_sys == GUTTERTREE) - gts = new GutterTree(config._disk_dir, num_nodes, config._num_groups, config._gutter_conf, true); + gts = new GutterTree(config._disk_dir, num_nodes, config._num_graph_workers, + config._gutter_conf, true); else if (config._gutter_sys == STANDALONE) - gts = new StandAloneGutters(num_nodes, config._num_groups, num_inserters, config._gutter_conf); + gts = new StandAloneGutters(num_nodes, config._num_graph_workers, num_inserters, + config._gutter_conf); else - gts = new CacheGuttering(num_nodes, config._num_groups, num_inserters, config._gutter_conf); + gts = new CacheGuttering(num_nodes, config._num_graph_workers, num_inserters, + config._gutter_conf); - GraphWorker::set_config(config._num_groups, config._group_size); + GraphWorker::set_config(config._num_graph_workers); GraphWorker::start_workers(this, gts, Supernode::get_size()); open_graph = true; spanning_forest = new std::unordered_set[num_nodes]; spanning_forest_mtx = new std::mutex[num_nodes]; dsu_valid = false; - std::cout << config << std::endl; // print the graph configuration + std::cout << config << std::endl; // print the graph configuration } Graph::~Graph() { - for (unsigned i=0;i &edges, Supernode *delta_loc) { +void Graph::generate_delta_node(node_id_t node_n, uint64_t node_seed, node_id_t src, + const std::vector &edges, Supernode *delta_loc) { std::vector updates; updates.reserve(edges.size()); - for (const auto& edge : edges) { + for (const auto &edge : edges) { if (src < edge) { - updates.push_back(static_cast( - concat_pairing_fn(src, edge))); + updates.push_back(static_cast(concat_pairing_fn(src, edge))); } else { - updates.push_back(static_cast( - concat_pairing_fn(edge, src))); + updates.push_back(static_cast(concat_pairing_fn(edge, src))); } } Supernode::delta_supernode(node_n, node_seed, updates, delta_loc); @@ -148,11 +156,11 @@ void Graph::batch_update(node_id_t src, const std::vector &edges, Sup } inline void Graph::sample_supernodes(std::pair *query, - std::vector &reps) { + std::vector &reps) { bool except = false; std::exception_ptr err; - #pragma omp parallel for default(none) shared(query, reps, except, err) - for (node_id_t i = 0; i < reps.size(); ++i) { // NOLINT(modernize-loop-convert) +#pragma omp parallel for default(none) shared(query, reps, except, err) + for (node_id_t i = 0; i < reps.size(); ++i) { // NOLINT(modernize-loop-convert) // wrap in a try/catch because exiting through exception is undefined behavior in OMP try { query[reps[i]] = supernodes[reps[i]]->sample(); @@ -181,8 +189,7 @@ inline std::vector> Graph::supernodes_to_merge( new_reps.push_back(i); continue; } - if (ret_code == ZERO) - continue; + if (ret_code == ZERO) continue; // query dsu node_id_t a = get_parent(edge.src); @@ -194,7 +201,7 @@ inline std::vector> Graph::supernodes_to_merge( #endif // make a the parent of b - if (size[a] < size[b]) std::swap(a,b); + if (size[a] < size[b]) std::swap(a, b); parent[b] = a; size[a] += size[b]; @@ -225,17 +232,17 @@ inline std::vector> Graph::supernodes_to_merge( return to_merge; } -void Graph::merge_supernodes(Supernode** copy_supernodes, std::vector &new_reps, - std::vector> &to_merge, bool make_copy) { +void Graph::merge_supernodes(Supernode **copy_supernodes, std::vector &new_reps, + std::vector> &to_merge, bool make_copy) { bool except = false; std::exception_ptr err; - // loop over the to_merge vector and perform supernode merging - #pragma omp parallel for default(shared) - for (node_id_t i = 0; i < new_reps.size(); i++) { // NOLINT(modernize-loop-convert) +// loop over the to_merge vector and perform supernode merging +#pragma omp parallel for default(shared) + for (node_id_t i = 0; i < new_reps.size(); i++) { // NOLINT(modernize-loop-convert) // OMP requires a traditional for-loop to work node_id_t a = new_reps[i]; try { - if (make_copy && config._backup_in_mem) { // make a copy of a + if (make_copy && config._backup_in_mem) { // make a copy of a copy_supernodes[a] = Supernode::makeSupernode(*supernodes[a]); } @@ -254,28 +261,27 @@ void Graph::merge_supernodes(Supernode** copy_supernodes, std::vector } std::vector> Graph::boruvka_emulation(bool make_copy) { - printf("Total number of updates to sketches before CC %lu\n", num_updates.load()); // REMOVE this later - update_locked = true; // disallow updating the graph after we run the alg + printf("Total number of updates to sketches before CC %lu\n", + num_updates.load()); // REMOVE this later + update_locked = true; // disallow updating the graph after we run the alg cc_alg_start = std::chrono::steady_clock::now(); bool first_round = true; - Supernode** copy_supernodes; - if (make_copy && config._backup_in_mem) - copy_supernodes = new Supernode*[num_nodes]; + Supernode **copy_supernodes; + if (make_copy && config._backup_in_mem) copy_supernodes = new Supernode *[num_nodes]; std::pair *query = new std::pair[num_nodes]; std::vector reps(num_nodes); std::vector backed_up; std::fill(size, size + num_nodes, 1); for (node_id_t i = 0; i < num_nodes; ++i) { reps[i] = i; - if (make_copy && config._backup_in_mem) - copy_supernodes[i] = nullptr; + if (make_copy && config._backup_in_mem) copy_supernodes[i] = nullptr; } // function to restore supernodes after CC if make_copy is specified auto cleanup_copy = [&make_copy, this, &backed_up, ©_supernodes]() { if (make_copy) { - if(config._backup_in_mem) { + if (config._backup_in_mem) { // restore original supernodes and free memory for (node_id_t i : backed_up) { if (supernodes[i] != nullptr) free(supernodes[i]); @@ -327,7 +333,7 @@ std::vector> Graph::boruvka_emulation(bool make_copy) { return retval; } -void Graph::backup_to_disk(const std::vector& ids_to_backup) { +void Graph::backup_to_disk(const std::vector &ids_to_backup) { // Make a copy on disk std::fstream binary_out(backup_file, std::ios::out | std::ios::binary); if (!binary_out.is_open()) { @@ -342,7 +348,7 @@ void Graph::backup_to_disk(const std::vector& ids_to_backup) { // given a list of ids restore those supernodes from disk // IMPORTANT: ids_to_restore must be the same as ids_to_backup -void Graph::restore_from_disk(const std::vector& ids_to_restore) { +void Graph::restore_from_disk(const std::vector &ids_to_restore) { // restore from disk std::fstream binary_in(backup_file, std::ios::in | std::ios::binary); if (!binary_in.is_open()) { @@ -360,12 +366,12 @@ std::vector> Graph::connected_components(bool cont) { if (dsu_valid && cont #ifdef VERIFY_SAMPLES_F && !fail_round_2 -#endif // VERIFY_SAMPLES_F - ) { +#endif // VERIFY_SAMPLES_F + ) { cc_alg_start = flush_start = flush_end = std::chrono::steady_clock::now(); #ifdef VERIFY_SAMPLES_F for (node_id_t src = 0; src < num_nodes; ++src) { - for (const auto& dst : spanning_forest[src]) { + for (const auto &dst : spanning_forest[src]) { verifier->verify_edge({src, dst}); } } @@ -379,20 +385,20 @@ std::vector> Graph::connected_components(bool cont) { } flush_start = std::chrono::steady_clock::now(); - gts->force_flush(); // flush everything in guttering system to make final updates - GraphWorker::pause_workers(); // wait for the workers to finish applying the updates + gts->force_flush(); // flush everything in guttering system to make final updates + GraphWorker::pause_workers(); // wait for the workers to finish applying the updates flush_end = std::chrono::steady_clock::now(); // after this point all updates have been processed from the buffer tree std::vector> ret; if (!cont) { - ret = boruvka_emulation(false); // merge in place + ret = boruvka_emulation(false); // merge in place #ifdef VERIFY_SAMPLES_F verifier->verify_soln(ret); #endif return ret; } - + // if backing up in memory then perform copying in boruvka bool except = false; std::exception_ptr err; @@ -423,11 +429,10 @@ std::vector> Graph::connected_components(bool cont) { std::vector> Graph::cc_from_dsu() { // calculate connected components using DSU structure std::map> temp; - for (node_id_t i = 0; i < num_nodes; ++i) - temp[get_parent(i)].insert(i); + for (node_id_t i = 0; i < num_nodes; ++i) temp[get_parent(i)].insert(i); std::vector> retval; retval.reserve(temp.size()); - for (const auto& it : temp) retval.push_back(it.second); + for (const auto &it : temp) retval.push_back(it.second); return retval; } @@ -437,7 +442,7 @@ bool Graph::point_query(node_id_t a, node_id_t b) { cc_alg_start = flush_start = flush_end = std::chrono::steady_clock::now(); #ifdef VERIFY_SAMPLES_F for (node_id_t src = 0; src < num_nodes; ++src) { - for (const auto& dst : spanning_forest[src]) { + for (const auto &dst : spanning_forest[src]) { verifier->verify_edge({src, dst}); } } @@ -447,10 +452,9 @@ bool Graph::point_query(node_id_t a, node_id_t b) { return retval; } - flush_start = std::chrono::steady_clock::now(); - gts->force_flush(); // flush everything in guttering system to make final updates - GraphWorker::pause_workers(); // wait for the workers to finish applying the updates + gts->force_flush(); // flush everything in guttering system to make final updates + GraphWorker::pause_workers(); // wait for the workers to finish applying the updates flush_end = std::chrono::steady_clock::now(); // after this point all updates have been processed from the buffer tree @@ -487,17 +491,17 @@ node_id_t Graph::get_parent(node_id_t node) { return parent[node] = get_parent(parent[node]); } -void Graph::write_binary(const std::string& filename) { - gts->force_flush(); // flush everything in buffering system to make final updates - GraphWorker::pause_workers(); // wait for the workers to finish applying the updates +void Graph::write_binary(const std::string &filename) { + gts->force_flush(); // flush everything in buffering system to make final updates + GraphWorker::pause_workers(); // wait for the workers to finish applying the updates // after this point all updates have been processed from the buffering system auto binary_out = std::fstream(filename, std::ios::out | std::ios::binary); auto fail_factor = Sketch::get_failure_factor(); - binary_out.write((char*)&seed, sizeof(seed)); - binary_out.write((char*)&num_nodes, sizeof(num_nodes)); - binary_out.write((char*)&fail_factor, sizeof(fail_factor)); - binary_out.write((char*)&config._sketches_factor, sizeof(config._sketches_factor)); + binary_out.write((char *)&seed, sizeof(seed)); + binary_out.write((char *)&num_nodes, sizeof(num_nodes)); + binary_out.write((char *)&fail_factor, sizeof(fail_factor)); + binary_out.write((char *)&config._sketches_factor, sizeof(config._sketches_factor)); for (node_id_t i = 0; i < num_nodes; ++i) { supernodes[i]->write_binary(binary_out); } diff --git a/src/graph_configuration.cpp b/src/graph_configuration.cpp index e19c7025..7f03c673 100644 --- a/src/graph_configuration.cpp +++ b/src/graph_configuration.cpp @@ -17,44 +17,34 @@ GraphConfiguration& GraphConfiguration::backup_in_mem(bool backup_in_mem) { return *this; } -GraphConfiguration& GraphConfiguration::num_groups(size_t num_groups) { - _num_groups = num_groups; - if (_num_groups < 1) { - std::cout << "num_groups="<< _num_groups << " is out of bounds. [1, infty)" +GraphConfiguration& GraphConfiguration::num_graph_workers(size_t num_graph_workers) { + _num_graph_workers = num_graph_workers; + if (_num_graph_workers < 1) { + std::cout << "num_graph_workers="<< _num_graph_workers << " is out of bounds. [1, infty)" << "Defaulting to 1." << std::endl; - _num_groups = 1; - } - return *this; -} - -GraphConfiguration& GraphConfiguration::group_size(size_t group_size) { - _group_size = group_size; - if (_group_size < 1) { - std::cout << "group_size="<< _group_size << " is out of bounds. [1, infty)" - << "Defaulting to 1." << std::endl; - _group_size = 1; + _num_graph_workers = 1; } return *this; } GraphConfiguration& GraphConfiguration::sketches_factor(double factor) { _sketches_factor = factor; - if (_sketches_factor < 0) { - std::cout << "adtl_skts_factor=" << _sketches_factor << " is out of bounds. (0, infty)" + if (_sketches_factor <= 0) { + std::cout << "sketches_factor=" << _sketches_factor << " is out of bounds. (0, infty)" << "Defaulting to 1." << std::endl; _sketches_factor = 1; } if (_sketches_factor != 1) { std::cerr << "WARNING: Your graph configuration specifies using a factor " << _sketches_factor << " of the normal quantity of sketches." << std::endl; - std::cerr << " Is this intentional? If not, set adtl_skts_factor to one!" << std::endl; + std::cerr << " Is this intentional? If not, set sketches_factor to one!" << std::endl; } return *this; } GraphConfiguration& GraphConfiguration::batch_factor(double factor) { _batch_factor = factor; - if (_batch_factor < 0) { + if (_batch_factor <= 0) { std::cout << "batch factor=" << _batch_factor << " is out of bounds. (0, infty)" << "Defaulting to 1." << std::endl; _batch_factor = 1; @@ -73,11 +63,17 @@ std::ostream& operator<< (std::ostream &out, const GraphConfiguration &conf) { gutter_system = "GutterTree"; else if (conf._gutter_sys == CACHETREE) gutter_system = "CacheTree"; +#ifdef L0_SAMPLING + out << " Sketching algorithm = CubeSketch" << std::endl; +#else + out << " Sketching algorithm = CameoSketch" << std::endl; +#endif out << " Guttering system = " << gutter_system << std::endl; - out << " Number of groups = " << conf._num_groups << std::endl; - out << " Size of groups = " << conf._group_size << std::endl; + out << " Num sketches factor = " << conf._sketches_factor << std::endl; + out << " Batch size factor = " << conf._batch_factor << std::endl; + out << " Graph worker count = " << conf._num_graph_workers << std::endl; out << " On disk data location = " << conf._disk_dir << std::endl; out << " Backup sketch to RAM = " << (conf._backup_in_mem? "ON" : "OFF") << std::endl; out << conf._gutter_conf; return out; - } \ No newline at end of file + } diff --git a/src/graph_worker.cpp b/src/graph_worker.cpp index da3dee17..466225f2 100644 --- a/src/graph_worker.cpp +++ b/src/graph_worker.cpp @@ -11,7 +11,6 @@ bool GraphWorker::shutdown = false; bool GraphWorker::paused = false; // controls whether threads should pause or resume work int GraphWorker::num_groups = 1; -int GraphWorker::group_size = 1; long GraphWorker::supernode_size; GraphWorker **GraphWorker::workers; std::condition_variable GraphWorker::pause_condition; diff --git a/src/supernode.cpp b/src/supernode.cpp index 8d473b96..caa018a5 100644 --- a/src/supernode.cpp +++ b/src/supernode.cpp @@ -96,7 +96,8 @@ std::pair Supernode::sample() { std::pair, SampleSketchRet> Supernode::exhaustive_sample() { if (out_of_queries()) throw OutOfQueriesException(); - std::pair, SampleSketchRet> query_ret = get_sketch(sample_idx++)->exhaustive_query(); + std::pair, SampleSketchRet> query_ret = + get_sketch(sample_idx++)->exhaustive_query(); std::unordered_set edges(query_ret.first.size()); for (const auto &query_item: query_ret.first) { edges.insert(inv_concat_pairing_fn(query_item)); @@ -162,7 +163,6 @@ void Supernode::apply_delta_update(const Supernode* delta_node) { void Supernode::delta_supernode(uint64_t n, uint64_t seed, const std::vector &updates, void *loc) { auto delta_node = makeSupernode(n, seed, loc); -#pragma omp parallel for num_threads(GraphWorker::get_group_size()) default(shared) for (size_t i = 0; i < delta_node->num_sketches; ++i) { delta_node->get_sketch(i)->batch_update(updates); } diff --git a/test/graph_test.cpp b/test/graph_test.cpp index 6ad7d860..e8b44108 100644 --- a/test/graph_test.cpp +++ b/test/graph_test.cpp @@ -1,18 +1,21 @@ +#include "../include/graph.h" + +#include #include -#include + #include -#include "../include/graph.h" +#include + #include "../graph_worker.h" #include "../include/test/file_graph_verifier.h" -#include "../include/test/mat_graph_verifier.h" #include "../include/test/graph_gen.h" -#include +#include "../include/test/mat_graph_verifier.h" /** * For many of these tests (especially for those upon very sparse and small graphs) * we allow for a certain number of failures per test. - * This is because the responsibility of these tests is to quickly alert us - * to “this code is very wrong” whereas the statistical testing is responsible + * This is because the responsibility of these tests is to quickly alert us + * to “this code is very wrong” whereas the statistical testing is responsible * for a more fine grained analysis. * In this context a false positive is much worse than a false negative. * With 2 failures allowed per test our entire testing suite should fail 1/5000 runs. @@ -20,10 +23,9 @@ // We create this class and instantiate a paramaterized test suite so that we // can run these tests both with the GutterTree and with StandAloneGutters -class GraphTest : public testing::TestWithParam { - -}; -INSTANTIATE_TEST_SUITE_P(GraphTestSuite, GraphTest, testing::Values(GUTTERTREE, STANDALONE, CACHETREE)); +class GraphTest : public testing::TestWithParam {}; +INSTANTIATE_TEST_SUITE_P(GraphTestSuite, GraphTest, + testing::Values(GUTTERTREE, STANDALONE, CACHETREE)); TEST_P(GraphTest, SmallGraphConnectivity) { auto config = GraphConfiguration().gutter_sys(GetParam()); @@ -41,7 +43,8 @@ TEST_P(GraphTest, SmallGraphConnectivity) { in >> a >> b; g.update({{a, b}, INSERT}); } - g.set_verifier(std::make_unique(1024, curr_dir + "/res/multiples_graph_1024.txt")); + g.set_verifier( + std::make_unique(1024, curr_dir + "/res/multiples_graph_1024.txt")); ASSERT_EQ(78, g.connected_components().size()); } @@ -61,10 +64,11 @@ TEST(GraphTest, IFconnectedComponentsAlgRunTHENupdateLocked) { in >> a >> b; g.update({{a, b}, INSERT}); } - g.set_verifier(std::make_unique(1024, curr_dir + "/res/multiples_graph_1024.txt")); + g.set_verifier( + std::make_unique(1024, curr_dir + "/res/multiples_graph_1024.txt")); g.connected_components(); - ASSERT_THROW(g.update({{1,2}, INSERT}), UpdateLockedException); - ASSERT_THROW(g.update({{1,2}, DELETE}), UpdateLockedException); + ASSERT_THROW(g.update({{1, 2}, INSERT}), UpdateLockedException); + ASSERT_THROW(g.update({{1, 2}, DELETE}), UpdateLockedException); } TEST(GraphTest, TestSupernodeRestoreAfterCCFailure) { @@ -84,7 +88,8 @@ TEST(GraphTest, TestSupernodeRestoreAfterCCFailure) { in >> a >> b; g.update({{a, b}, INSERT}); } - g.set_verifier(std::make_unique(1024, curr_dir + "/res/multiples_graph_1024.txt")); + g.set_verifier( + std::make_unique(1024, curr_dir + "/res/multiples_graph_1024.txt")); g.should_fail_CC(); // flush to make sure copy supernodes is consistent with graph supernodes @@ -98,8 +103,7 @@ TEST(GraphTest, TestSupernodeRestoreAfterCCFailure) { ASSERT_THROW(g.connected_components(true), OutOfQueriesException); for (node_id_t i = 0; i < num_nodes; ++i) { for (int j = 0; j < Supernode::get_max_sketches(); ++j) { - ASSERT_TRUE(*copy_supernodes[i]->get_sketch(j) == - *g.supernodes[i]->get_sketch(j)); + ASSERT_TRUE(*copy_supernodes[i]->get_sketch(j) == *g.supernodes[i]->get_sketch(j)); } } } @@ -119,9 +123,10 @@ TEST_P(GraphTest, TestCorrectnessOnSmallRandomGraphs) { node_id_t a, b; while (m--) { in >> type >> a >> b; - if (type == INSERT) { + if (type == INSERT) g.update({{a, b}, INSERT}); - } else g.update({{a, b}, DELETE}); + else + g.update({{a, b}, DELETE}); } g.set_verifier(std::make_unique(n, "./cumul_sample.txt")); @@ -132,8 +137,8 @@ TEST_P(GraphTest, TestCorrectnessOnSmallRandomGraphs) { TEST_P(GraphTest, TestCorrectnessOnSmallSparseGraphs) { auto config = GraphConfiguration().gutter_sys(GetParam()); int num_trials = 5; - while(num_trials--) { - generate_stream({1024,0.002,0.5,0,"./sample.txt","./cumul_sample.txt"}); + while (num_trials--) { + generate_stream({1024, 0.002, 0.5, 0, "./sample.txt", "./cumul_sample.txt"}); std::ifstream in{"./sample.txt"}; node_id_t n; edge_id_t m; @@ -145,49 +150,51 @@ TEST_P(GraphTest, TestCorrectnessOnSmallSparseGraphs) { in >> type >> a >> b; if (type == INSERT) { g.update({{a, b}, INSERT}); - } else g.update({{a, b}, DELETE}); + } else + g.update({{a, b}, DELETE}); } g.set_verifier(std::make_unique(1024, "./cumul_sample.txt")); g.connected_components(); - } + } } TEST_P(GraphTest, TestCorrectnessOfReheating) { auto config = GraphConfiguration().gutter_sys(GetParam()); int num_trials = 5; while (num_trials--) { - generate_stream({1024,0.002,0.5,0,"./sample.txt","./cumul_sample.txt"}); + generate_stream({1024, 0.002, 0.5, 0, "./sample.txt", "./cumul_sample.txt"}); std::ifstream in{"./sample.txt"}; node_id_t n; edge_id_t m; in >> n >> m; - Graph *g = new Graph (n, config); + Graph* g = new Graph(n, config); int type; node_id_t a, b; printf("number of updates = %lu\n", m); while (m--) { in >> type >> a >> b; - if (type == INSERT) g->update({{a, b}, INSERT}); - else g->update({{a, b}, DELETE}); + if (type == INSERT) + g->update({{a, b}, INSERT}); + else + g->update({{a, b}, DELETE}); } g->write_binary("./out_temp.txt"); g->set_verifier(std::make_unique(1024, "./cumul_sample.txt")); std::vector> g_res; g_res = g->connected_components(); printf("number of CC = %lu\n", g_res.size()); - delete g; // delete g to avoid having multiple graphs open at once. Which is illegal. + delete g; // delete g to avoid having multiple graphs open at once. Which is illegal. - Graph reheated {"./out_temp.txt"}; + Graph reheated{"./out_temp.txt"}; reheated.set_verifier(std::make_unique(1024, "./cumul_sample.txt")); auto reheated_res = reheated.connected_components(); printf("number of reheated CC = %lu\n", reheated_res.size()); ASSERT_EQ(g_res.size(), reheated_res.size()); for (unsigned i = 0; i < g_res.size(); ++i) { std::vector symdif; - std::set_symmetric_difference(g_res[i].begin(), g_res[i].end(), - reheated_res[i].begin(), reheated_res[i].end(), - std::back_inserter(symdif)); + std::set_symmetric_difference(g_res[i].begin(), g_res[i].end(), reheated_res[i].begin(), + reheated_res[i].end(), std::back_inserter(symdif)); ASSERT_EQ(0, symdif.size()); } } @@ -196,13 +203,10 @@ TEST_P(GraphTest, TestCorrectnessOfReheating) { // Test the multithreaded system by specifiying multiple // Graph Workers of size 2. Ingest a stream and run CC algorithm. TEST_P(GraphTest, MultipleWorkers) { - auto config = GraphConfiguration() - .gutter_sys(GetParam()) - .num_groups(4) - .group_size(2); + auto config = GraphConfiguration().gutter_sys(GetParam()).num_graph_workers(8); int num_trials = 5; - while(num_trials--) { - generate_stream({1024,0.002,0.5,0,"./sample.txt","./cumul_sample.txt"}); + while (num_trials--) { + generate_stream({1024, 0.002, 0.5, 0, "./sample.txt", "./cumul_sample.txt"}); std::ifstream in{"./sample.txt"}; node_id_t n; edge_id_t m; @@ -214,12 +218,13 @@ TEST_P(GraphTest, MultipleWorkers) { in >> type >> a >> b; if (type == INSERT) { g.update({{a, b}, INSERT}); - } else g.update({{a, b}, DELETE}); + } else + g.update({{a, b}, DELETE}); } g.set_verifier(std::make_unique(1024, "./cumul_sample.txt")); g.connected_components(); - } + } } TEST_P(GraphTest, TestPointQuery) { @@ -238,9 +243,10 @@ TEST_P(GraphTest, TestPointQuery) { in >> a >> b; g.update({{a, b}, INSERT}); } - g.set_verifier(std::make_unique(1024, curr_dir + "/res/multiples_graph_1024.txt")); + g.set_verifier( + std::make_unique(1024, curr_dir + "/res/multiples_graph_1024.txt")); std::vector> ret = g.connected_components(true); - std::vector ccid (num_nodes); + std::vector ccid(num_nodes); for (node_id_t i = 0; i < ret.size(); ++i) { for (const node_id_t node : ret[i]) { ccid[node] = i; @@ -248,17 +254,16 @@ TEST_P(GraphTest, TestPointQuery) { } for (node_id_t i = 0; i < std::min(10u, num_nodes); ++i) { for (node_id_t j = 0; j < std::min(10u, num_nodes); ++j) { - g.set_verifier(std::make_unique(1024, curr_dir + "/res/multiples_graph_1024.txt")); + g.set_verifier( + std::make_unique(1024, curr_dir + "/res/multiples_graph_1024.txt")); ASSERT_EQ(g.point_query(i, j), ccid[i] == ccid[j]); } } } TEST(GraphTest, TestQueryDuringStream) { - auto config = GraphConfiguration() - .gutter_sys(STANDALONE) - .backup_in_mem(false); - { // test copying to disk + auto config = GraphConfiguration().gutter_sys(STANDALONE).backup_in_mem(false); + { // test copying to disk generate_stream({1024, 0.002, 0.5, 0, "./sample.txt", "./cumul_sample.txt"}); std::ifstream in{"./sample.txt"}; node_id_t n; @@ -270,10 +275,10 @@ TEST(GraphTest, TestQueryDuringStream) { int type; node_id_t a, b; edge_id_t tenth = m / 10; - for(int j = 0; j < 9; j++) { + for (int j = 0; j < 9; j++) { for (edge_id_t i = 0; i < tenth; i++) { in >> type >> a >> b; - g.update({{a,b}, (UpdateType)type}); + g.update({{a, b}, (UpdateType)type}); verify.edge_update(a, b); } verify.reset_cc_state(); @@ -281,9 +286,9 @@ TEST(GraphTest, TestQueryDuringStream) { g.connected_components(true); } m -= 9 * tenth; - while(m--) { + while (m--) { in >> type >> a >> b; - g.update({{a,b}, (UpdateType)type}); + g.update({{a, b}, (UpdateType)type}); verify.edge_update(a, b); } verify.reset_cc_state(); @@ -292,7 +297,7 @@ TEST(GraphTest, TestQueryDuringStream) { } config.backup_in_mem(true); - { // test copying in memory + { // test copying in memory generate_stream({1024, 0.002, 0.5, 0, "./sample.txt", "./cumul_sample.txt"}); std::ifstream in{"./sample.txt"}; node_id_t n; @@ -304,10 +309,10 @@ TEST(GraphTest, TestQueryDuringStream) { int type; node_id_t a, b; edge_id_t tenth = m / 10; - for(int j = 0; j < 9; j++) { + for (int j = 0; j < 9; j++) { for (edge_id_t i = 0; i < tenth; i++) { in >> type >> a >> b; - g.update({{a,b}, (UpdateType)type}); + g.update({{a, b}, (UpdateType)type}); verify.edge_update(a, b); } verify.reset_cc_state(); @@ -315,9 +320,9 @@ TEST(GraphTest, TestQueryDuringStream) { g.connected_components(true); } m -= 9 * tenth; - while(m--) { + while (m--) { in >> type >> a >> b; - g.update({{a,b}, (UpdateType)type}); + g.update({{a, b}, (UpdateType)type}); verify.edge_update(a, b); } verify.reset_cc_state(); @@ -385,20 +390,19 @@ TEST(GraphTest, MultipleInsertThreads) { in >> n >> m; int per_thread = m / num_threads; Graph g(n, config, num_threads); - std::vector> updates(num_threads, - std::vector(per_thread)); + std::vector> updates(num_threads, std::vector(per_thread)); int type; node_id_t a, b; for (int i = 0; i < num_threads; ++i) { for (int j = 0; j < per_thread; ++j) { in >> type >> a >> b; - updates[i][j] = {{a,b}, (UpdateType)type}; + updates[i][j] = {{a, b}, (UpdateType)type}; } } for (edge_id_t i = per_thread * num_threads; i < m; ++i) { in >> type >> a >> b; - g.update({{a,b}, (UpdateType)type}); + g.update({{a, b}, (UpdateType)type}); } auto task = [&updates, &g](int id) { @@ -421,7 +425,7 @@ TEST(GraphTest, MultipleInsertThreads) { } TEST(GraphTest, MTStreamWithMultipleQueries) { - for(int i = 1; i <= 3; i++) { + for (int i = 1; i <= 3; i++) { auto config = GraphConfiguration().gutter_sys(STANDALONE); const std::string fname = __FILE__; @@ -449,15 +453,16 @@ TEST(GraphTest, MTStreamWithMultipleQueries) { num_queries = 10; int upd_per_query = num_edges / num_queries; int query_idx = upd_per_query; - ASSERT_TRUE(stream.register_query(query_idx)); // register first query + ASSERT_TRUE(stream.register_query(query_idx)); // register first query // task for threads that insert to the graph and perform queries auto task = [&](const int thr_id) { MT_StreamReader reader(stream); GraphUpdate upd; - while(true) { + while (true) { upd = reader.get_edge(); - if (upd.type == BREAKPOINT && num_queries == 0) return; + if (upd.type == BREAKPOINT && num_queries == 0) + return; else if (upd.type == BREAKPOINT) { query_done = false; if (thr_id > 0) { @@ -469,7 +474,7 @@ TEST(GraphTest, MTStreamWithMultipleQueries) { // wait for query to finish lk.lock(); - q_done_cond.wait(lk, [&](){return query_done;}); + q_done_cond.wait(lk, [&]() { return query_done; }); num_query_ready--; lk.unlock(); } else { @@ -477,9 +482,7 @@ TEST(GraphTest, MTStreamWithMultipleQueries) { // wait for other threads to be done applying updates std::unique_lock lk(q_lock); num_query_ready++; - q_ready_cond.wait(lk, [&](){ - return num_query_ready >= inserter_threads; - }); + q_ready_cond.wait(lk, [&]() { return num_query_ready >= inserter_threads; }); // add updates to verifier and perform query for (int j = 0; j < upd_per_query; j++) { @@ -492,7 +495,7 @@ TEST(GraphTest, MTStreamWithMultipleQueries) { // inform other threads that we're ready to continue processing queries stream.post_query_resume(); - if(num_queries > 1) { + if (num_queries > 1) { // prepare next query query_idx += upd_per_query; ASSERT_TRUE(stream.register_query(query_idx)); @@ -503,8 +506,7 @@ TEST(GraphTest, MTStreamWithMultipleQueries) { lk.unlock(); q_done_cond.notify_all(); } - } - else if (upd.type == INSERT || upd.type == DELETE) + } else if (upd.type == INSERT || upd.type == DELETE) g.update(upd, thr_id); else throw std::invalid_argument("Did not recognize edge code!"); @@ -521,7 +523,7 @@ TEST(GraphTest, MTStreamWithMultipleQueries) { } // process the rest of the stream into the MatGraphVerifier - for(size_t i = query_idx; i < num_edges; i++) { + for (size_t i = query_idx; i < num_edges; i++) { GraphUpdate upd = verify_stream.get_edge(); verify.edge_update(upd.edge.src, upd.edge.dst); } diff --git a/test/supernode_test.cpp b/test/supernode_test.cpp index 35959ff6..65df6d3a 100644 --- a/test/supernode_test.cpp +++ b/test/supernode_test.cpp @@ -205,9 +205,7 @@ TEST_F(SupernodeTestSuite, TestBatchUpdate) { } TEST_F(SupernodeTestSuite, TestConcurrency) { - int num_threads_per_group = 2; - unsigned num_threads = - std::thread::hardware_concurrency() / num_threads_per_group - 1; // hyperthreading? + unsigned num_threads = std::thread::hardware_concurrency() - 1; unsigned vec_len = 1000000; unsigned num_updates = 100000; Supernode::configure(vec_len); @@ -223,8 +221,6 @@ TEST_F(SupernodeTestSuite, TestConcurrency) { Supernode* supernode = Supernode::makeSupernode(vec_len, seed); Supernode* piecemeal = Supernode::makeSupernode(vec_len, seed); - GraphWorker::set_config(0, num_threads_per_group); // set number of threads per omp parallel - // concurrently run batch_updates std::thread thd[num_threads]; for (unsigned i = 0; i < num_threads; ++i) { diff --git a/tools/process_stream.cpp b/tools/process_stream.cpp index 4281af26..a91bbad4 100644 --- a/tools/process_stream.cpp +++ b/tools/process_stream.cpp @@ -80,7 +80,7 @@ int main(int argc, char **argv) { auto config = GraphConfiguration() .gutter_sys(STANDALONE) - .num_groups(num_threads) + .num_graph_workers(num_threads) .batch_factor(1.0 / 2); Graph g{num_nodes, config, reader_threads}; diff --git a/tools/statistical_testing/graph_testing.cpp b/tools/statistical_testing/graph_testing.cpp index 130b049c..470ca590 100644 --- a/tools/statistical_testing/graph_testing.cpp +++ b/tools/statistical_testing/graph_testing.cpp @@ -58,7 +58,7 @@ int main() { // setup configuration file per buffering config.gutter_sys(use_tree ? GUTTERTREE : STANDALONE); - config.num_groups(4); + config.num_graph_workers(4); std::string prefix = use_tree? "tree" : "gutters"; std::string test_name; From ed84ee9fc86d05ca20774be8bb81e1f9172fcdb3 Mon Sep 17 00:00:00 2001 From: Evan West Date: Sat, 30 Sep 2023 21:41:29 -0400 Subject: [PATCH 4/9] 2 columns for cameo --- include/supernode.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/supernode.h b/include/supernode.h index 8cc263b4..02a174b7 100644 --- a/include/supernode.h +++ b/include/supernode.h @@ -91,7 +91,7 @@ class Supernode { static inline void configure(uint64_t n, vec_t sketch_fail_factor = default_fail_factor, double skt_factor = 1) { Sketch::configure(n * n, sketch_fail_factor); - max_sketches = log2(n) / (log2(3) - 1) * skt_factor; + max_sketches = log2(n) / (log2(4) - log2(3)) * skt_factor; bytes_size = sizeof(Supernode) + max_sketches * Sketch::sketchSizeof(); serialized_size = max_sketches * Sketch::serialized_size(); } @@ -213,7 +213,7 @@ class Supernode { #ifdef L0_SAMPLING static constexpr size_t default_fail_factor = 128; #else - static constexpr size_t default_fail_factor = 4; + static constexpr size_t default_fail_factor = 2; #endif }; From 9c7795de3c25aa8c36d64c13b0d5d460a4c9f1a6 Mon Sep 17 00:00:00 2001 From: Evan West Date: Sat, 30 Sep 2023 22:10:06 -0400 Subject: [PATCH 5/9] bug fix for serialization and test to catch it --- include/supernode.h | 2 +- test/supernode_test.cpp | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/include/supernode.h b/include/supernode.h index 02a174b7..ff7d617f 100644 --- a/include/supernode.h +++ b/include/supernode.h @@ -93,7 +93,7 @@ class Supernode { Sketch::configure(n * n, sketch_fail_factor); max_sketches = log2(n) / (log2(4) - log2(3)) * skt_factor; bytes_size = sizeof(Supernode) + max_sketches * Sketch::sketchSizeof(); - serialized_size = max_sketches * Sketch::serialized_size(); + serialized_size = max_sketches * Sketch::serialized_size() + sizeof(SerialType); } static inline size_t get_size() { diff --git a/test/supernode_test.cpp b/test/supernode_test.cpp index 65df6d3a..219c0db7 100644 --- a/test/supernode_test.cpp +++ b/test/supernode_test.cpp @@ -1,6 +1,7 @@ #include "../include/supernode.h" #include +#include #include #include @@ -260,10 +261,16 @@ TEST_F(SupernodeTestSuite, TestSerialization) { snodes[num_nodes / 2]->write_binary(file); file.close(); - auto in_file = std::fstream("./out_supernode.txt", std::ios::in | std::ios::binary); + // Get the size of the serialized Supernode file + struct stat stat_buf; + ASSERT_EQ(stat("./out_supernode.txt", &stat_buf), 0); + ASSERT_EQ(Supernode::get_serialized_size(), stat_buf.st_size); + // Create a supernode from the file + auto in_file = std::fstream("./out_supernode.txt", std::ios::in | std::ios::binary); Supernode* reheated = Supernode::makeSupernode(num_nodes, seed, in_file); + // Assert the Supernodes match for (int i = 0; i < Supernode::get_max_sketches(); ++i) { ASSERT_EQ(*snodes[num_nodes / 2]->get_sketch(i), *reheated->get_sketch(i)); } From c57af97add76776c1dc4dcf46dd04c89df8b464d Mon Sep 17 00:00:00 2001 From: Evan West Date: Mon, 2 Oct 2023 19:16:16 -0400 Subject: [PATCH 6/9] num sketches as function of number of columns --- include/supernode.h | 4 +++- test/sketch_test.cpp | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/include/supernode.h b/include/supernode.h index ff7d617f..4022cdfe 100644 --- a/include/supernode.h +++ b/include/supernode.h @@ -91,7 +91,7 @@ class Supernode { static inline void configure(uint64_t n, vec_t sketch_fail_factor = default_fail_factor, double skt_factor = 1) { Sketch::configure(n * n, sketch_fail_factor); - max_sketches = log2(n) / (log2(4) - log2(3)) * skt_factor; + max_sketches = (log2(n) / num_sketches_div) * skt_factor; bytes_size = sizeof(Supernode) + max_sketches * Sketch::sketchSizeof(); serialized_size = max_sketches * Sketch::serialized_size() + sizeof(SerialType); } @@ -212,8 +212,10 @@ class Supernode { #ifdef L0_SAMPLING static constexpr size_t default_fail_factor = 128; + static constexpr double num_sketches_div = log2(3) - 1; #else static constexpr size_t default_fail_factor = 2; + static constexpr double num_sketches_div = log2(4) - log2(3); #endif }; diff --git a/test/sketch_test.cpp b/test/sketch_test.cpp index 08fb72fe..2e7fc0d0 100644 --- a/test/sketch_test.cpp +++ b/test/sketch_test.cpp @@ -4,7 +4,7 @@ #include "../include/test/testing_vector.h" #include "../include/test/sketch_constructors.h" -static const int fail_factor = 100; +static const int fail_factor = 128; TEST(SketchTestSuite, TestExceptions) { Sketch::configure(100, fail_factor); From 8088e5a0339bc70ad1f8dcab6e451504290ac82d Mon Sep 17 00:00:00 2001 From: Evan West Date: Wed, 4 Oct 2023 12:50:11 -0400 Subject: [PATCH 7/9] switch configuration to directly specifying number of columns --- include/l0_sampling/sketch.h | 14 +++++--------- include/supernode.h | 8 ++++---- src/graph.cpp | 12 ++++++------ src/l0_sampling/sketch.cpp | 1 - src/supernode.cpp | 4 ++-- test/sketch_test.cpp | 30 +++++++++++++++--------------- tools/benchmark/graphcc_bench.cpp | 2 +- tools/process_stream.cpp | 2 +- 8 files changed, 34 insertions(+), 39 deletions(-) diff --git a/include/l0_sampling/sketch.h b/include/l0_sampling/sketch.h index 15c76304..958b3a38 100644 --- a/include/l0_sampling/sketch.h +++ b/include/l0_sampling/sketch.h @@ -28,7 +28,6 @@ enum SampleSketchRet { */ class Sketch { private: - static vec_t failure_factor; // Pr(failure) = 1 / factor. Determines number of columns in sketch. static vec_t n; // Length of the vector this is sketching. static size_t num_elems; // length of our actual arrays in number of elements static size_t num_columns; // Portion of array length, number of columns @@ -49,7 +48,7 @@ class Sketch { FRIEND_TEST(EXPR_Parallelism, N10kU100k); // Buckets of this sketch. - // Length is column_gen(failure_factor) * guess_gen(n). + // Length is num_columns * guess_gen(n). // For buckets[i * guess_gen(n) + j], the bucket has a 1/2^j probability // of containing an index. The first two are pointers into the buckets array. alignas(vec_t) char buckets[]; @@ -83,13 +82,12 @@ class Sketch { /* configure the static variables of sketches * @param n Length of the vector to sketch. (static variable) - * @param failure_factor 1/factor = Failure rate for sketch (determines column width) + * @param num_columns Column width, determines the failure probability of the sketch * @return nothing */ - inline static void configure(vec_t _n, vec_t _factor) { + inline static void configure(vec_t _n, vec_t _num_columns) { n = _n; - failure_factor = _factor; - num_columns = column_gen(failure_factor); + num_columns = _num_columns; num_guesses = guess_gen(n); num_elems = num_columns * num_guesses + 1; // +1 for zero bucket optimization } @@ -103,8 +101,6 @@ class Sketch { return num_elems * (sizeof(vec_t) + sizeof(vec_hash_t)); } - inline static vec_t get_failure_factor() { return failure_factor; } - inline void reset_queried() { already_queried = false; } inline static size_t get_columns() { return num_columns; } @@ -168,7 +164,7 @@ class Sketch { // max number of non-zeroes in vector is n/2*n/2=n^2/4 static size_t guess_gen(size_t x) { return double_to_ull(log2(x) - 2); } - static size_t column_gen(size_t d) { return double_to_ull(log2(d)); } + static size_t column_gen(size_t d) { return double_to_ull(ceil(log2(d))); } }; class MultipleQueryException : public std::exception { diff --git a/include/supernode.h b/include/supernode.h index 4022cdfe..a5e4289d 100644 --- a/include/supernode.h +++ b/include/supernode.h @@ -88,9 +88,9 @@ class Supernode { ~Supernode(); - static inline void configure(uint64_t n, vec_t sketch_fail_factor = default_fail_factor, + static inline void configure(uint64_t n, vec_t sketch_num_columns = default_num_columns, double skt_factor = 1) { - Sketch::configure(n * n, sketch_fail_factor); + Sketch::configure(n * n, sketch_num_columns); max_sketches = (log2(n) / num_sketches_div) * skt_factor; bytes_size = sizeof(Supernode) + max_sketches * Sketch::sketchSizeof(); serialized_size = max_sketches * Sketch::serialized_size() + sizeof(SerialType); @@ -211,10 +211,10 @@ class Supernode { // void write_sparse_binary_range(std::ostream&binary_out, uint32_t beg, uint32_t end); #ifdef L0_SAMPLING - static constexpr size_t default_fail_factor = 128; + static constexpr size_t default_num_columns = 7; static constexpr double num_sketches_div = log2(3) - 1; #else - static constexpr size_t default_fail_factor = 2; + static constexpr size_t default_num_columns = 2; static constexpr double num_sketches_div = log2(4) - log2(3); #endif }; diff --git a/src/graph.cpp b/src/graph.cpp index 983170ff..5f2419df 100644 --- a/src/graph.cpp +++ b/src/graph.cpp @@ -22,7 +22,7 @@ Graph::Graph(node_id_t num_nodes, GraphConfiguration config, int num_inserters) #ifdef VERIFY_SAMPLES_F std::cout << "Verifying samples..." << std::endl; #endif - Supernode::configure(num_nodes, Supernode::default_fail_factor, config._sketches_factor); + Supernode::configure(num_nodes, Supernode::default_num_columns, config._sketches_factor); representatives = new std::set(); supernodes = new Supernode *[num_nodes]; parent = new std::remove_reference::type[num_nodes]; @@ -70,14 +70,14 @@ Graph::Graph(const std::string &input_file, GraphConfiguration config, int num_i : config(config), num_updates(0) { if (open_graph) throw MultipleGraphsException(); - vec_t sketch_fail_factor; + vec_t sketch_num_columns; double sketches_factor; auto binary_in = std::fstream(input_file, std::ios::in | std::ios::binary); binary_in.read((char *)&seed, sizeof(seed)); binary_in.read((char *)&num_nodes, sizeof(num_nodes)); - binary_in.read((char *)&sketch_fail_factor, sizeof(sketch_fail_factor)); + binary_in.read((char *)&sketch_num_columns, sizeof(sketch_num_columns)); binary_in.read((char *)&sketches_factor, sizeof(sketches_factor)); - Supernode::configure(num_nodes, sketch_fail_factor, sketches_factor); + Supernode::configure(num_nodes, sketch_num_columns, sketches_factor); #ifdef VERIFY_SAMPLES_F std::cout << "Verifying samples..." << std::endl; @@ -497,10 +497,10 @@ void Graph::write_binary(const std::string &filename) { // after this point all updates have been processed from the buffering system auto binary_out = std::fstream(filename, std::ios::out | std::ios::binary); - auto fail_factor = Sketch::get_failure_factor(); + vec_t sketch_num_columns = Sketch::get_columns(); binary_out.write((char *)&seed, sizeof(seed)); binary_out.write((char *)&num_nodes, sizeof(num_nodes)); - binary_out.write((char *)&fail_factor, sizeof(fail_factor)); + binary_out.write((char *)&sketch_num_columns, sizeof(sketch_num_columns)); binary_out.write((char *)&config._sketches_factor, sizeof(config._sketches_factor)); for (node_id_t i = 0; i < num_nodes; ++i) { supernodes[i]->write_binary(binary_out); diff --git a/src/l0_sampling/sketch.cpp b/src/l0_sampling/sketch.cpp index bddf5ac6..6212e2b0 100644 --- a/src/l0_sampling/sketch.cpp +++ b/src/l0_sampling/sketch.cpp @@ -3,7 +3,6 @@ #include #include -vec_t Sketch::failure_factor = 100; vec_t Sketch::n; size_t Sketch::num_elems; size_t Sketch::num_columns; diff --git a/src/supernode.cpp b/src/supernode.cpp index caa018a5..fa66162e 100644 --- a/src/supernode.cpp +++ b/src/supernode.cpp @@ -11,7 +11,7 @@ Supernode::Supernode(uint64_t n, uint64_t seed): sample_idx(0), n(n), seed(seed), num_sketches(max_sketches), merged_sketches(max_sketches), sketch_size(Sketch::sketchSizeof()) { - size_t sketch_width = Sketch::column_gen(Sketch::get_failure_factor()); + size_t sketch_width = Sketch::get_columns(); // generate num_sketches sketches for each supernode (read: node) for (size_t i = 0; i < num_sketches; ++i) { Sketch::makeSketch(get_sketch(i), seed); @@ -22,7 +22,7 @@ Supernode::Supernode(uint64_t n, uint64_t seed): sample_idx(0), Supernode::Supernode(uint64_t n, uint64_t seed, std::istream &binary_in) : sample_idx(0), n(n), seed(seed), sketch_size(Sketch::sketchSizeof()) { - size_t sketch_width = Sketch::column_gen(Sketch::get_failure_factor()); + size_t sketch_width = Sketch::get_columns(); SerialType type; binary_in.read((char*) &type, sizeof(SerialType)); diff --git a/test/sketch_test.cpp b/test/sketch_test.cpp index 2e7fc0d0..6e865f60 100644 --- a/test/sketch_test.cpp +++ b/test/sketch_test.cpp @@ -4,10 +4,10 @@ #include "../include/test/testing_vector.h" #include "../include/test/sketch_constructors.h" -static const int fail_factor = 128; +static const int num_columns = 7; TEST(SketchTestSuite, TestExceptions) { - Sketch::configure(100, fail_factor); + Sketch::configure(100, num_columns); SketchUniquePtr sketch1 = makeSketch(rand()); ASSERT_EQ(sketch1->query().second, ZERO); ASSERT_THROW(sketch1->query(), MultipleQueryException); @@ -15,13 +15,13 @@ TEST(SketchTestSuite, TestExceptions) { /** * Find a vector that makes no good buckets */ - Sketch::configure(10000, fail_factor); + Sketch::configure(10000, num_columns); SketchUniquePtr sketch2 = makeSketch(0); std::vector vec_idx(sketch2->n, true); - unsigned long long num_columns = Sketch::column_gen(fail_factor); - unsigned long long num_guesses = Sketch::guess_gen(sketch2->n); + unsigned long long columns = num_columns; + unsigned long long guesses = Sketch::guess_gen(sketch2->n); size_t total_updates = 2; - for (unsigned long long i = 0; i < num_columns;) { + for (unsigned long long i = 0; i < columns;) { size_t depth_1_updates = 0; size_t k = 0; size_t u = 0; @@ -31,7 +31,7 @@ TEST(SketchTestSuite, TestExceptions) { continue; } - col_hash_t depth = Bucket_Boruvka::get_index_depth(k, sketch2->column_seed(i), num_guesses); + col_hash_t depth = Bucket_Boruvka::get_index_depth(k, sketch2->column_seed(i), guesses); if (depth >= 2) { vec_idx[k] = false; // force all updates to only touch depths <= 1 i = 0; @@ -61,7 +61,7 @@ TEST(SketchTestSuite, TestExceptions) { TEST(SketchTestSuite, GIVENonlyIndexZeroUpdatedTHENitWorks) { // GIVEN only the index 0 is updated - Sketch::configure(1000, fail_factor); + Sketch::configure(1000, num_columns); SketchUniquePtr sketch = makeSketch(rand()); sketch->update(0); sketch->update(0); @@ -82,7 +82,7 @@ TEST(SketchTestSuite, GIVENonlyIndexZeroUpdatedTHENitWorks) { void test_sketch_sample(unsigned long num_sketches, unsigned long vec_size, unsigned long num_updates, double max_sample_fail_prob, double max_bucket_fail_prob) { - Sketch::configure(vec_size, fail_factor); + Sketch::configure(vec_size, num_columns); std::chrono::duration runtime(0); unsigned long all_bucket_failures = 0; @@ -152,7 +152,7 @@ TEST(SketchTestSuite, TestSketchSample) { void test_sketch_addition(unsigned long num_sketches, unsigned long vec_size, unsigned long num_updates, double max_sample_fail_prob, double max_bucket_fail_prob) { - Sketch::configure(vec_size, fail_factor); + Sketch::configure(vec_size, num_columns); unsigned long all_bucket_failures = 0; unsigned long sample_incorrect_failures = 0; @@ -218,7 +218,7 @@ TEST(SketchTestSuite, TestSketchAddition){ * Large sketch test */ void test_sketch_large(unsigned long vec_size, unsigned long num_updates) { - Sketch::configure(vec_size, vec_size); + Sketch::configure(vec_size, Sketch::column_gen(vec_size)); // we use an optimization in our sketching that is valid when solving CC // we assume that max number of non-zeroes in vector is vec_size / 4 @@ -280,7 +280,7 @@ TEST(SketchTestSuite, TestSketchLarge) { TEST(SketchTestSuite, TestBatchUpdate) { unsigned long vec_size = 1000000000, num_updates = 1000000; - Sketch::configure(vec_size, fail_factor); + Sketch::configure(vec_size, num_columns); std::vector updates(num_updates); for (unsigned long i = 0; i < num_updates; i++) { updates[i] = static_cast(rand() % vec_size); @@ -303,7 +303,7 @@ TEST(SketchTestSuite, TestBatchUpdate) { TEST(SketchTestSuite, TestSerialization) { unsigned long vec_size = 1 << 20; unsigned long num_updates = 10000; - Sketch::configure(vec_size, fail_factor); + Sketch::configure(vec_size, num_columns); Testing_Vector test_vec = Testing_Vector(vec_size, num_updates); auto seed = rand(); SketchUniquePtr sketch = makeSketch(seed); @@ -323,7 +323,7 @@ TEST(SketchTestSuite, TestSerialization) { TEST(SketchTestSuite, TestSparseSerialization) { unsigned long vec_size = 1 << 20; unsigned long num_updates = 10000; - Sketch::configure(vec_size, fail_factor); + Sketch::configure(vec_size, num_columns); Testing_Vector test_vec = Testing_Vector(vec_size, num_updates); auto seed = rand(); SketchUniquePtr sketch = makeSketch(seed); @@ -343,7 +343,7 @@ TEST(SketchTestSuite, TestSparseSerialization) { TEST(SketchTestSuite, TestExhaustiveQuery) { size_t runs = 10; size_t vec_size = 10; - Sketch::configure(vec_size, vec_size*vec_size); + Sketch::configure(vec_size, Sketch::column_gen(vec_size*vec_size)); for (size_t i = 0; i < runs; i++) { SketchUniquePtr sketch = makeSketch(rand()); diff --git a/tools/benchmark/graphcc_bench.cpp b/tools/benchmark/graphcc_bench.cpp index 29a21370..07e6889b 100644 --- a/tools/benchmark/graphcc_bench.cpp +++ b/tools/benchmark/graphcc_bench.cpp @@ -193,7 +193,7 @@ static void BM_Sketch_Update(benchmark::State& state) { size_t vec_size = state.range(0); vec_t input = vec_size / 3; // initialize sketches - Sketch::configure(vec_size, Supernode::default_fail_factor); + Sketch::configure(vec_size, Supernode::default_num_columns); SketchUniquePtr skt = makeSketch(seed); // Test the speed of updating the sketches diff --git a/tools/process_stream.cpp b/tools/process_stream.cpp index a91bbad4..51740b5a 100644 --- a/tools/process_stream.cpp +++ b/tools/process_stream.cpp @@ -81,7 +81,7 @@ int main(int argc, char **argv) { auto config = GraphConfiguration() .gutter_sys(STANDALONE) .num_graph_workers(num_threads) - .batch_factor(1.0 / 2); + .batch_factor(1); Graph g{num_nodes, config, reader_threads}; auto ins_start = std::chrono::steady_clock::now(); From e367864db2946afa7efc815067cc6d20194604ac Mon Sep 17 00:00:00 2001 From: Evan West Date: Wed, 4 Oct 2023 12:57:18 -0400 Subject: [PATCH 8/9] two columns = log_3/2 rounds --- include/supernode.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/supernode.h b/include/supernode.h index a5e4289d..ab9f7336 100644 --- a/include/supernode.h +++ b/include/supernode.h @@ -215,7 +215,7 @@ class Supernode { static constexpr double num_sketches_div = log2(3) - 1; #else static constexpr size_t default_num_columns = 2; - static constexpr double num_sketches_div = log2(4) - log2(3); + static constexpr double num_sketches_div = log2(3) - 1; #endif }; From 43f3f1078de8f98863a05ef177ebf94bc53ddb9e Mon Sep 17 00:00:00 2001 From: Evan West Date: Wed, 4 Oct 2023 13:05:33 -0400 Subject: [PATCH 9/9] switch to main --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8e9554ec..01e5e468 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -55,7 +55,7 @@ FetchContent_Declare( GutterTree GIT_REPOSITORY https://github.com/GraphStreamingProject/GutterTree.git - GIT_TAG better_config + GIT_TAG main ) if (BUILD_BENCH)