diff --git a/include/graph_configuration.h b/include/graph_configuration.h
index fa0a8623..6d4d61d9 100644
--- a/include/graph_configuration.h
+++ b/include/graph_configuration.h
@@ -30,6 +30,11 @@ class GraphConfiguration {
   // How many OMP threads each graph worker uses
   size_t _group_size = 1;
 
+  // Multiplier on the number of sketches each supernode holds, relative to
+  // the quantity used for standard connected components. Must be >= 1.
+  // Ex: a factor of 1.5 gives 1.5 times the sketches; 1 is the normal quantity.
+  double _adtl_skts_factor = 1;
+
   // Configuration for the guttering system
   GutteringConfiguration _gutter_conf;
 
@@ -49,6 +54,8 @@ class GraphConfiguration {
 
   GraphConfiguration& group_size(size_t group_size);
 
+  GraphConfiguration& adtl_skts_factor(double factor);
+
   GutteringConfiguration& gutter_conf();
 
   friend std::ostream& operator<< (std::ostream &out, const GraphConfiguration &conf);
diff --git a/include/supernode.h b/include/supernode.h
index 800ee49c..4780434b 100644
--- a/include/supernode.h
+++ b/include/supernode.h
@@ -88,9 +88,11 @@ class Supernode {
 
   ~Supernode();
 
-  static inline void configure(uint64_t n, vec_t sketch_fail_factor=default_fail_factor) {
-    Sketch::configure(n*n, sketch_fail_factor);
-    max_sketches = log2(n)/(log2(3)-1);
+  // skt_factor scales the number of sketches per supernode (1 = standard quantity)
+  static inline void configure(uint64_t n, vec_t sketch_fail_factor = default_fail_factor,
+                               double skt_factor = 1) {
+    Sketch::configure(n * n, sketch_fail_factor);
+    max_sketches = log2(n) / (log2(3) - 1) * skt_factor;
     bytes_size = sizeof(Supernode) + max_sketches * Sketch::sketchSizeof();
     serialized_size = max_sketches * Sketch::serialized_size();
   }
diff --git a/src/graph.cpp b/src/graph.cpp
index ddedbe71..4de359c3 100644
--- a/src/graph.cpp
+++ b/src/graph.cpp
@@ -20,7 +20,7 @@ Graph::Graph(node_id_t num_nodes, GraphConfiguration config, int num_inserters)
 #ifdef VERIFY_SAMPLES_F
   std::cout << "Verifying samples..." << std::endl;
 #endif
-  Supernode::configure(num_nodes);
+  Supernode::configure(num_nodes, Supernode::default_fail_factor, config._adtl_skts_factor);
   representatives = new std::set();
   supernodes = new Supernode*[num_nodes];
   parent = new std::remove_reference::type[num_nodes];
@@ -59,11 +59,13 @@ Graph::Graph(const std::string& input_file, GraphConfiguration config, int num_i
   if (open_graph) throw MultipleGraphsException();
 
   vec_t sketch_fail_factor;
+  double adtl_skts_factor;
   auto binary_in = std::fstream(input_file, std::ios::in | std::ios::binary);
   binary_in.read((char*)&seed, sizeof(seed));
   binary_in.read((char*)&num_nodes, sizeof(num_nodes));
   binary_in.read((char*)&sketch_fail_factor, sizeof(sketch_fail_factor));
-  Supernode::configure(num_nodes, sketch_fail_factor);
+  binary_in.read((char*)&adtl_skts_factor, sizeof(adtl_skts_factor));
+  Supernode::configure(num_nodes, sketch_fail_factor, adtl_skts_factor);
 
 #ifdef VERIFY_SAMPLES_F
   std::cout << "Verifying samples..." << std::endl;
@@ -213,7 +215,7 @@ inline std::vector> Graph::supernodes_to_merge(
   return to_merge;
 }
 
-inline void Graph::merge_supernodes(Supernode** copy_supernodes, std::vector &new_reps,
+void Graph::merge_supernodes(Supernode** copy_supernodes, std::vector &new_reps,
                std::vector> &to_merge, bool make_copy) {
   bool except = false;
   std::exception_ptr err;
@@ -280,6 +282,7 @@ std::vector> Graph::boruvka_emulation(bool make_copy) {
     parent[i] = i;
     spanning_forest[i].clear();
   }
+  size_t round_num = 0;  // number of completed Boruvka rounds
   try {
     do {
       modified = false;
@@ -297,6 +300,7 @@ std::vector> Graph::boruvka_emulation(bool make_copy) {
       if (!first_round && fail_round_2) throw OutOfQueriesException();
 #endif
       first_round = false;
+      ++round_num;
     } while (modified);
   } catch (...) {
     cleanup_copy();
@@ -307,6 +311,7 @@ std::vector> Graph::boruvka_emulation(bool make_copy) {
   delete[] query;
   dsu_valid = true;
 
+  std::cout << "Query complete in " << round_num << " rounds." << std::endl;
   auto retval = cc_from_dsu();
   cc_alg_end = std::chrono::steady_clock::now();
   return retval;
@@ -482,6 +487,7 @@ void Graph::write_binary(const std::string& filename) {
   binary_out.write((char*)&seed, sizeof(seed));
   binary_out.write((char*)&num_nodes, sizeof(num_nodes));
   binary_out.write((char*)&fail_factor, sizeof(fail_factor));
+  binary_out.write((char*)&config._adtl_skts_factor, sizeof(config._adtl_skts_factor));
   for (node_id_t i = 0; i < num_nodes; ++i) {
     supernodes[i]->write_binary(binary_out);
   }
diff --git a/src/graph_configuration.cpp b/src/graph_configuration.cpp
index 02f81052..c808c046 100644
--- a/src/graph_configuration.cpp
+++ b/src/graph_configuration.cpp
@@ -20,7 +20,7 @@ GraphConfiguration& GraphConfiguration::backup_in_mem(bool backup_in_mem) {
 GraphConfiguration& GraphConfiguration::num_groups(size_t num_groups) {
   _num_groups = num_groups;
   if (_num_groups < 1) {
-    std::cout << "num_groups="<< _num_groups << " is out of bounds. "
+    std::cout << "num_groups="<< _num_groups << " is out of bounds. [1, infty) "
               << "Defaulting to 1." << std::endl;
     _num_groups = 1;
   }
@@ -30,13 +30,30 @@ GraphConfiguration& GraphConfiguration::num_groups(size_t num_groups) {
 GraphConfiguration& GraphConfiguration::group_size(size_t group_size) {
   _group_size = group_size;
   if (_group_size < 1) {
-    std::cout << "group_size="<< _group_size << " is out of bounds. "
+    std::cout << "group_size="<< _group_size << " is out of bounds. [1, infty) "
               << "Defaulting to 1." << std::endl;
     _group_size = 1;
   }
   return *this;
 }
 
+// Sets the sketch-count multiplier. Values below 1 (and NaN) are rejected and
+// replaced by 1; values above 1 trigger a warning about the extra memory use.
+GraphConfiguration& GraphConfiguration::adtl_skts_factor(double factor) {
+  _adtl_skts_factor = factor;
+  if (!(_adtl_skts_factor >= 1)) {  // 1 itself is valid; negated form also catches NaN
+    std::cout << "adtl_skts_factor=" << _adtl_skts_factor << " is out of bounds. [1, infty) "
+              << "Defaulting to 1." << std::endl;
+    _adtl_skts_factor = 1;
+  }
+  if (_adtl_skts_factor > 1) {
+    std::cerr << "WARNING: Your graph configuration specifies using a factor " << _adtl_skts_factor
+              << " more memory than normal." << std::endl;
+    std::cerr << " Is this intentional? If not, set adtl_skts_factor to one" << std::endl;
+  }
+  return *this;
+}
+
 GutteringConfiguration& GraphConfiguration::gutter_conf() {
   return _gutter_conf;
 }
diff --git a/tools/process_stream.cpp b/tools/process_stream.cpp
index 0f252d30..e62b1d43 100644
--- a/tools/process_stream.cpp
+++ b/tools/process_stream.cpp
@@ -1,9 +1,18 @@
 #include
 #include
 #include
+#include <sys/resource.h> // for rusage
 
 static bool shutdown = false;
 
+// Peak resident set size of this process as reported by getrusage(), divided by 1024.
+// NOTE(review): this is MiB only where ru_maxrss is in KiB (Linux); macOS reports bytes.
+static double get_max_mem_used() {
+  struct rusage data;
+  getrusage(RUSAGE_SELF, &data);
+  return (double) data.ru_maxrss / 1024.0;
+}
+
 /*
  * Function which is run in a seperate thread and will query
  * the graph for the number of updates it has processed
@@ -12,7 +21,7 @@ static bool shutdown = false;
  * @param start_time the time that we started stream ingestion
  */
 void track_insertions(uint64_t total, Graph *g, std::chrono::steady_clock::time_point start_time) {
-  total = total * 2; // we insert 2 edge updates per edge
+  total = total * 2;  // we insert 2 edge updates per edge
 
   printf("Insertions\n");
   printf("Progress: | 0%%\r"); fflush(stdout);
@@ -117,4 +126,5 @@ int main(int argc, char **argv) {
   std::cout << " Flush Gutters(sec): " << flush_time.count() << std::endl;
   std::cout << " Boruvka's Algorithm(sec): " << cc_alg_time.count() << std::endl;
   std::cout << "Connected Components: " << CC_num << std::endl;
+  std::cout << "Maximum Memory Usage(MiB): " << get_max_mem_used() << std::endl;
 }