From 36c8ff4debba998c59e894aaf560a751b1302fcf Mon Sep 17 00:00:00 2001 From: Evan West Date: Thu, 8 Feb 2024 16:28:17 -0500 Subject: [PATCH 1/3] adjust to begin incorporation with streaming utilities --- CMakeLists.txt | 40 +++------ include/ascii_file_stream.h | 106 ------------------------ include/binary_file_stream.h | 153 ----------------------------------- include/graph_stream.h | 67 --------------- include/types.h | 34 +------- 5 files changed, 15 insertions(+), 385 deletions(-) delete mode 100644 include/ascii_file_stream.h delete mode 100644 include/binary_file_stream.h delete mode 100644 include/graph_stream.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 72ee8ac9..14d811cf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,19 +43,19 @@ else() message (STATUS "GraphZeppelin building executables") endif() -# Get xxHash +# Get GutterTree Project FetchContent_Declare( - xxhash + GutterTree - GIT_REPOSITORY https://github.com/Cyan4973/xxHash.git - GIT_TAG v0.8.0 + GIT_REPOSITORY https://github.com/GraphStreamingProject/GutterTree.git + GIT_TAG main ) -# Get GutterTree Project +# Get StreamingUtilities FetchContent_Declare( - GutterTree + StreamingUtilities - GIT_REPOSITORY https://github.com/GraphStreamingProject/GutterTree.git + GIT_REPOSITORY https://github.com/GraphStreamingProject/StreamingUtilities.git GIT_TAG main ) @@ -72,21 +72,7 @@ if (BUILD_BENCH) FetchContent_MakeAvailable(benchmark) endif() -FetchContent_MakeAvailable(xxHash GutterTree) -##### -# Some additional steps for xxHash as it is unofficial -##### -#xxHash messes with BUILD_SHARED_LIBS if it is empty -set(SAVED_BUILD_SHARED_LIBS "${BUILD_SHARED_LIBS}") - -add_subdirectory( - "${xxhash_SOURCE_DIR}/cmake_unofficial" - "${xxhash_BINARY_DIR}" - EXCLUDE_FROM_ALL -) -#Restore BUILD_SHARED_LIBS -set(BUILD_SHARED_LIBS "${SAVED_BUILD_SHARED_LIBS}" CACHE BOOL "" FORCE) - +FetchContent_MakeAvailable(GutterTree StreamingUtilities) # AVAILABLE COMPILATION DEFINITIONS: # VERIFY_SAMPLES_F Use a deterministic connected-components @@ -107,8 +93,8 @@ add_library(GraphZeppelin src/cc_alg_configuration.cpp src/sketch.cpp src/util.cpp) -add_dependencies(GraphZeppelin GutterTree) -target_link_libraries(GraphZeppelin PUBLIC xxhash GutterTree) +add_dependencies(GraphZeppelin GutterTree StreamingUtilities) +target_link_libraries(GraphZeppelin PUBLIC xxhash GutterTree StreamingUtilities) target_include_directories(GraphZeppelin PUBLIC include/) target_compile_options(GraphZeppelin PUBLIC -fopenmp) target_link_options(GraphZeppelin PUBLIC -fopenmp) @@ -123,8 +109,8 @@ add_library(GraphZeppelinVerifyCC src/util.cpp test/util/file_graph_verifier.cpp test/util/mat_graph_verifier.cpp) -add_dependencies(GraphZeppelinVerifyCC GutterTree) -target_link_libraries(GraphZeppelinVerifyCC PUBLIC xxhash GutterTree) +add_dependencies(GraphZeppelinVerifyCC GutterTree StreamingUtilities) +target_link_libraries(GraphZeppelinVerifyCC PUBLIC xxhash GutterTree StreamingUtilities) target_include_directories(GraphZeppelinVerifyCC PUBLIC include/ include/test/) target_compile_options(GraphZeppelinVerifyCC PUBLIC -fopenmp) target_link_options(GraphZeppelinVerifyCC PUBLIC -fopenmp) @@ -156,7 +142,7 @@ if (BUILD_EXE) src/util.cpp test/util/efficient_gen/edge_gen.cpp test/util/efficient_gen/efficient_gen.cpp) - target_link_libraries(efficient_gen PRIVATE xxhash GraphZeppelinCommon) + target_link_libraries(efficient_gen PRIVATE xxhash GraphZeppelinCommon StreamingUtilities) # executable for converting to stream format add_executable(to_binary_format diff --git a/include/ascii_file_stream.h b/include/ascii_file_stream.h deleted file mode 100644 index 2fb10147..00000000 --- a/include/ascii_file_stream.h +++ /dev/null @@ -1,106 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "graph_stream.h" - -class AsciiFileStream : public GraphStream { - public: - AsciiFileStream(std::string file_name, bool has_type = true) - : file_name(file_name), has_type(has_type) { - - bool stream_exists = false; - { - std::fstream check(file_name, std::fstream::in); - stream_exists = check.is_open(); - } - - if (stream_exists) - stream_file.open(file_name, std::fstream::in | std::fstream::out); - else - stream_file.open(file_name, std::fstream::in | std::fstream::out | std::fstream::trunc); - - if (!stream_file.is_open()) - throw StreamException("AsciiFileStream: could not open " + file_name); - - if (stream_exists) - stream_file >> num_vertices >> num_edges; - } - - inline size_t get_update_buffer(GraphStreamUpdate* upd_buf, size_t num_updates) { - assert(upd_buf != nullptr); - - size_t i = 0; - for (; i < num_updates; i++) { - GraphStreamUpdate& upd = upd_buf[i]; - - if (upd_offset >= num_edges || upd_offset >= break_edge_idx) { - upd.type = BREAKPOINT; - upd.edge = {0, 0}; - return i + 1; - } - int type = INSERT; - if (has_type) - stream_file >> type; - stream_file >> upd.edge.src >> upd.edge.dst; - upd.type = type; - ++upd_offset; - } - return i; - } - - // get_update_buffer() is not thread safe - inline bool get_update_is_thread_safe() { return false; } - - inline void write_header(node_id_t num_verts, edge_id_t num_edg) { - stream_file.seekp(0); // seek to beginning - stream_file << num_verts << " " << num_edg << std::endl; - num_vertices = num_verts; - num_edges = num_edg; - } - - inline void write_updates(GraphStreamUpdate* upd_buf, edge_id_t num_updates) { - for (edge_id_t i = 0; i < num_updates; i++) { - auto upd = upd_buf[i]; - if (has_type) - stream_file << (int) upd.type << " "; - stream_file << upd.edge.src << " " << upd.edge.dst << std::endl; - } - } - - inline void set_num_edges(edge_id_t num_edg) { - num_edges = num_edg; - } - - inline void seek(edge_id_t pos) { - if (pos != 0) - throw StreamException("AsciiFileStream: stream does not support seeking by update index"); - stream_file.seekp(0); stream_file.seekg(0); - upd_offset = 0; - } - - inline bool set_break_point(edge_id_t break_idx) { - if (break_idx < upd_offset) return false; - break_edge_idx = break_idx; - return true; - } - - inline void serialize_metadata(std::ostream& out) { - out << AsciiFile << " " << file_name << std::endl; - } - - static GraphStream* construct_from_metadata(std::istream& in) { - std::string file_name_from_stream; - in >> file_name_from_stream; - return new AsciiFileStream(file_name_from_stream); - } - - private: - const std::string file_name; - const bool has_type; - std::fstream stream_file; - edge_id_t break_edge_idx = -1; - edge_id_t upd_offset = 0; -}; diff --git a/include/binary_file_stream.h b/include/binary_file_stream.h deleted file mode 100644 index b3dd9f61..00000000 --- a/include/binary_file_stream.h +++ /dev/null @@ -1,153 +0,0 @@ -#pragma once -#include -#include //open and close - -#include -#include -#include -#include -#include - -#include "graph_stream.h" - -class BinaryFileStream : public GraphStream { - public: - /** - * Open a BinaryFileStream - * @param file_name Name of the stream file - */ - BinaryFileStream(std::string file_name, bool open_read_only = true) - : read_only(open_read_only), file_name(file_name) { - if (read_only) - stream_fd = open(file_name.c_str(), O_RDONLY, S_IRUSR); - else - stream_fd = open(file_name.c_str(), O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR); - - if (!stream_fd) - throw StreamException("BinaryFileStream: Could not open stream file " + file_name + - ". Does it exist?"); - - // read header from the input file - if (read_only) { - if (read(stream_fd, (char*)&num_vertices, sizeof(num_vertices)) != sizeof(num_vertices)) - throw StreamException("BinaryFileStream: Could not read number of nodes"); - if (read(stream_fd, (char*)&num_edges, sizeof(num_edges)) != sizeof(num_edges)) - throw StreamException("BinaryFileStream: Could not read number of edges"); - - end_of_file = (num_edges * edge_size) + header_size; - stream_off = header_size; - set_break_point(-1); - } - } - - ~BinaryFileStream() { - if (stream_fd) close(stream_fd); - } - - inline size_t get_update_buffer(GraphStreamUpdate* upd_buf, size_t num_updates) { - assert(upd_buf != nullptr); - - // many threads may execute this line simultaneously creating edge cases - size_t bytes_to_read = num_updates * edge_size; - size_t read_off = stream_off.fetch_add(bytes_to_read, std::memory_order_relaxed); - - // catch these edge cases here - if (read_off + bytes_to_read > break_index) { - bytes_to_read = read_off > break_index ? 0 : break_index - read_off; - stream_off = break_index.load(); - upd_buf[bytes_to_read / edge_size] = {BREAKPOINT, {0, 0}}; - } - // read into the buffer - assert(bytes_to_read % edge_size == 0); - size_t bytes_read = 0; - while (bytes_read < bytes_to_read) { - int r = - pread(stream_fd, upd_buf + bytes_read, bytes_to_read - bytes_read, read_off + bytes_read); - if (r == -1) throw StreamException("BinaryFileStream: Could not perform pread"); - if (r == 0) throw StreamException("BinaryFileStream: pread() got no data"); - bytes_read += r; - } - - size_t upds_read = bytes_to_read / edge_size; - if (upds_read < num_updates) { - GraphStreamUpdate& upd = upd_buf[upds_read]; - upd.type = BREAKPOINT; - upd.edge = {0, 0}; - return upds_read + 1; - } - return upds_read; - } - - // get_update_buffer() is thread safe! :) - inline bool get_update_is_thread_safe() { return true; } - - // write the number of nodes and edges to the stream - inline void write_header(node_id_t num_verts, edge_id_t num_edg) { - if (read_only) throw StreamException("BinaryFileStream: stream not open for writing!"); - - lseek(stream_fd, 0, SEEK_SET); - int r1 = write(stream_fd, (char*)&num_verts, sizeof(num_verts)); - int r2 = write(stream_fd, (char*)&num_edg, sizeof(num_edg)); - - if (r1 + r2 != header_size) { - perror("write_header"); - throw StreamException("BinaryFileStream: could not write header to stream file"); - } - - stream_off = header_size; - num_vertices = num_verts; - num_edges = num_edg; - end_of_file = (num_edges * edge_size) + header_size; - } - - // write an edge to the stream - inline void write_updates(GraphStreamUpdate* upd, edge_id_t num_updates) { - if (read_only) throw StreamException("BinaryFileStream: stream not open for writing!"); - - size_t bytes_to_write = num_updates * edge_size; - // size_t write_off = stream_off.fetch_add(bytes_to_write, std::memory_order_relaxed); - - size_t bytes_written = 0; - while (bytes_written < bytes_to_write) { - int r = write(stream_fd, (char*)upd + bytes_written, bytes_to_write - bytes_written); - if (r == -1) throw StreamException("BinaryFileStream: Could not perform write"); - bytes_written += r; - } - } - - // seek to a position in the stream - inline void seek(edge_id_t edge_idx) { stream_off = edge_idx * edge_size + header_size; } - - inline bool set_break_point(edge_id_t break_idx) { - edge_id_t byte_index = END_OF_STREAM; - if (break_idx != END_OF_STREAM) { - byte_index = header_size + break_idx * edge_size; - } - if (byte_index < stream_off) return false; - break_index = byte_index; - if (break_index > end_of_file) break_index = end_of_file; - return true; - } - - inline void serialize_metadata(std::ostream& out) { - out << BinaryFile << " " << file_name << std::endl; - } - - static GraphStream* construct_from_metadata(std::istream& in) { - std::string file_name_from_stream; - in >> file_name_from_stream; - return new BinaryFileStream(file_name_from_stream); - } - - private: - int stream_fd; - edge_id_t end_of_file; - std::atomic stream_off; - std::atomic break_index; - const bool read_only; // is stream read only? - const std::string file_name; - - // size of binary encoded edge and buffer read size - static constexpr size_t edge_size = sizeof(GraphStreamUpdate); - static constexpr size_t header_size = sizeof(node_id_t) + sizeof(edge_id_t); -}; diff --git a/include/graph_stream.h b/include/graph_stream.h deleted file mode 100644 index 2cd4a968..00000000 --- a/include/graph_stream.h +++ /dev/null @@ -1,67 +0,0 @@ -#pragma once -#include -#include -#include - -#include "types.h" - -#pragma pack(push,1) -struct GraphStreamUpdate { - uint8_t type; - Edge edge; -}; -#pragma pack(pop) - -static constexpr edge_id_t END_OF_STREAM = (edge_id_t) -1; - -// Enum that defines the types of streams -enum StreamType { - BinaryFile, - AsciiFile, -}; - -class GraphStream { - public: - virtual ~GraphStream() = default; - inline node_id_t vertices() { return num_vertices; } - inline edge_id_t edges() { return num_edges; } - - // Extract a buffer of many updates from the stream - virtual size_t get_update_buffer(GraphStreamUpdate* upd_buf, edge_id_t num_updates) = 0; - - // Query the GraphStream to see if get_update_buffer is thread-safe - // this is implemenation dependent - virtual bool get_update_is_thread_safe() = 0; - - // Move read pointer to new location in stream - // Child classes may choose to throw an error if seek is called - // For example, a GraphStream recieved over the network would - // likely not support seek - virtual void seek(edge_id_t edge_idx) = 0; - - // Query handling - // Call this function to register a query at a future edge index - // This function returns true if the query is correctly registered - virtual bool set_break_point(edge_id_t query_idx) = 0; - - // Serialize GraphStream metadata for distribution - // So that stream reading can happen simultaneously - virtual void serialize_metadata(std::ostream &out) = 0; - - // construct a stream object from serialized metadata - static GraphStream* construct_stream_from_metadata(std::istream &in); - - protected: - node_id_t num_vertices = 0; - edge_id_t num_edges = 0; - private: - static std::unordered_map constructor_map; -}; - -class StreamException : public std::exception { - private: - std::string err_msg; - public: - StreamException(std::string err) : err_msg(err) {} - virtual const char* what() const throw() { return err_msg.c_str(); } -}; diff --git a/include/types.h b/include/types.h index 76e45164..6fea6b26 100644 --- a/include/types.h +++ b/include/types.h @@ -2,43 +2,13 @@ #include #include #include +#include typedef uint64_t col_hash_t; static const auto& vec_hash = XXH3_64bits_withSeed; static const auto& col_hash = XXH3_64bits_withSeed; -// Is a stream update an insertion or a deletion -// BREAKPOINT: special type that indicates that a break point has been reached -// a break point may be either the end of the stream or the index of a query -enum UpdateType { - INSERT = 0, - DELETE = 1, - BREAKPOINT = 2 -}; - -struct Edge { - node_id_t src = 0; - node_id_t dst = 0; - - bool operator< (const Edge&oth) const { - if (src == oth.src) - return dst < oth.dst; - return src < oth.src; - } - bool operator== (const Edge&oth) const { - return src == oth.src && dst == oth.dst; - } -}; -namespace std { - template <> - struct hash { - auto operator()(const Edge&edge) const -> size_t { - std::hash h; - return h(edge.dst) + (31 * h(edge.src)); - } - }; -} - +// Graph Stream Updates are parsed into the GraphUpdate type for more convinient processing struct GraphUpdate { Edge edge; UpdateType type; From 3d0c64382b8c323a09b50a44235b0c04a045504c Mon Sep 17 00:00:00 2001 From: Evan West Date: Sat, 10 Feb 2024 19:59:38 -0500 Subject: [PATCH 2/3] fix the bug and remove more streaming stuff from this repo --- CMakeLists.txt | 27 ---- include/graph_sketch_driver.h | 4 +- include/test/efficient_gen.h | 9 -- include/test/graph_gen.h | 26 ---- include/worker_thread_group.h | 2 +- src/cc_sketch_alg.cpp | 4 +- test/cc_alg_test.cpp | 40 +++-- test/util/efficient_gen/edge_gen.cpp | 118 -------------- test/util/efficient_gen/efficient_gen.cpp | 19 --- test/util/graph_gen.cpp | 145 ------------------ test/util/graph_gen_test.cpp | 12 -- tools/statistical_testing/analyze_results.py | 73 --------- tools/statistical_testing/graph_testing.cpp | 96 ------------ .../medium_test_expected.txt | 2 - tools/statistical_testing/requirements.txt | 3 - .../small_test_expected.txt | 1 - tools/statistical_testing/stat_config.txt | 5 - tools/statistical_testing/test_runner.py | 130 ---------------- tools/to_binary_format.cpp | 98 ------------ tools/validate_binary_stream.cpp | 45 ------ 20 files changed, 33 insertions(+), 826 deletions(-) delete mode 100644 include/test/efficient_gen.h delete mode 100644 include/test/graph_gen.h delete mode 100644 test/util/efficient_gen/edge_gen.cpp delete mode 100644 test/util/efficient_gen/efficient_gen.cpp delete mode 100644 test/util/graph_gen.cpp delete mode 100644 test/util/graph_gen_test.cpp delete mode 100644 tools/statistical_testing/analyze_results.py delete mode 100644 tools/statistical_testing/graph_testing.cpp delete mode 100644 tools/statistical_testing/medium_test_expected.txt delete mode 100644 tools/statistical_testing/requirements.txt delete mode 100644 tools/statistical_testing/small_test_expected.txt delete mode 100644 tools/statistical_testing/stat_config.txt delete mode 100644 tools/statistical_testing/test_runner.py delete mode 100644 tools/to_binary_format.cpp delete mode 100644 tools/validate_binary_stream.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 14d811cf..9384c358 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -124,31 +124,10 @@ if (BUILD_EXE) test/dsu_test.cpp test/util_test.cpp test/util/file_graph_verifier.cpp - test/util/graph_gen.cpp - test/util/graph_gen_test.cpp test/util/graph_verifier_test.cpp) add_dependencies(tests GraphZeppelinVerifyCC) target_link_libraries(tests PRIVATE GraphZeppelinVerifyCC) - add_executable(statistical_test - tools/statistical_testing/graph_testing.cpp - test/util/file_graph_verifier.cpp - test/util/graph_gen.cpp) - add_dependencies(statistical_test GraphZeppelinVerifyCC) - target_link_libraries(statistical_test PRIVATE GraphZeppelinVerifyCC) - - # executables for experiment/benchmarking - add_executable(efficient_gen - src/util.cpp - test/util/efficient_gen/edge_gen.cpp - test/util/efficient_gen/efficient_gen.cpp) - target_link_libraries(efficient_gen PRIVATE xxhash GraphZeppelinCommon StreamingUtilities) - - # executable for converting to stream format - add_executable(to_binary_format - tools/to_binary_format.cpp) - target_link_libraries(to_binary_format PRIVATE GraphZeppelinCommon) - # executable for processing a binary graph stream add_executable(process_stream tools/process_stream.cpp) @@ -158,12 +137,6 @@ if (BUILD_EXE) add_executable(test_correctness tools/test_correctness.cpp) target_link_libraries(test_correctness PRIVATE GraphZeppelinVerifyCC) - - # tool for validating that a binary stream appears correct - add_executable(validate_binary_stream - tools/validate_binary_stream.cpp - ) - target_link_libraries(validate_binary_stream PRIVATE GraphZeppelin) endif() if (BUILD_BENCH) diff --git a/include/graph_sketch_driver.h b/include/graph_sketch_driver.h index 1ad6f0ef..38f77eba 100644 --- a/include/graph_sketch_driver.h +++ b/include/graph_sketch_driver.h @@ -58,8 +58,8 @@ class GraphSketchDriver { FRIEND_TEST(GraphTest, TestSupernodeRestoreAfterCCFailure); public: GraphSketchDriver(Alg *sketching_alg, GraphStream *stream, DriverConfiguration config, - size_t num_inserters = 1) - : sketching_alg(sketching_alg), stream(stream), num_stream_threads(num_inserters) { + size_t num_stream_threads = 1) + : sketching_alg(sketching_alg), stream(stream), num_stream_threads(num_stream_threads) { sketching_alg->allocate_worker_memory(config.get_worker_threads()); // set the leaf size of the guttering system appropriately if (config.gutter_conf().get_gutter_bytes() == GutteringConfiguration::uninit_param) { diff --git a/include/test/efficient_gen.h b/include/test/efficient_gen.h deleted file mode 100644 index 2a578c00..00000000 --- a/include/test/efficient_gen.h +++ /dev/null @@ -1,9 +0,0 @@ -#pragma once - -void write_edges(uint32_t n, double p, const std::string& out_f); -// insert, delete based on a geometric distribution with ratio p -// i.e. p% of edges will be deleted, p^2% will be re-inserted, p^3 will be re-deleted -// until 1 element is left -void insert_delete(double p, const std::string& in_file, const std::string& out_file); - -void write_cumul(const std::string& stream_f, const std::string& cumul_f); diff --git a/include/test/graph_gen.h b/include/test/graph_gen.h deleted file mode 100644 index 24f03359..00000000 --- a/include/test/graph_gen.h +++ /dev/null @@ -1,26 +0,0 @@ -#pragma once -#include -#include - -typedef struct genSet { - long n; // number of nodes - double p; // prob of edge between nodes - double r; // geometric insertion/removal - int max_appearances; // the maximum number of times an edge can show up - // in the stream. 0 for no limit. - std::string out_file; // file to write stream - std::string cumul_out_file; // file to write cumul graph - genSet(long n, double p, double r, int max_appearances, - std::string out_file, std::string cumul_out_file) - : n(n), p(p), r(r), max_appearances - (max_appearances), out_file(std::move(out_file)), cumul_out_file - (std::move(cumul_out_file)) {} -} GraphGenSettings; - -/** - * Generates a 1024-node graph with approximately 60,000 edge insert/deletes. - * Writes stream output to sample.txt - * Writes cumulative output to cumul_sample.txt - */ -void generate_stream(const GraphGenSettings& settings = - {1024,0.03,0.5,0,"./sample.txt", "./cumul_sample.txt"}); diff --git a/include/worker_thread_group.h b/include/worker_thread_group.h index 6575afda..a7ee26a6 100644 --- a/include/worker_thread_group.h +++ b/include/worker_thread_group.h @@ -88,7 +88,7 @@ class WorkerThread { } } } - int id; + const int id; GraphSketchDriver *driver; GutteringSystem *gts; std::condition_variable &flush_condition; diff --git a/src/cc_sketch_alg.cpp b/src/cc_sketch_alg.cpp index 78fc54a4..27855b31 100644 --- a/src/cc_sketch_alg.cpp +++ b/src/cc_sketch_alg.cpp @@ -109,12 +109,12 @@ void CCSketchAlg::apply_update_batch(int thr_id, node_id_t src_vertex, delta_sketch.update(static_cast(concat_pairing_fn(src_vertex, dst))); } - std::unique_lock(sketches[src_vertex]->mutex); + std::unique_lock lk(sketches[src_vertex]->mutex); sketches[src_vertex]->merge(delta_sketch); } void CCSketchAlg::apply_raw_buckets_update(node_id_t src_vertex, Bucket *raw_buckets) { - std::unique_lock(sketches[src_vertex]->mutex); + std::unique_lock lk(sketches[src_vertex]->mutex); sketches[src_vertex]->merge_raw_bucket_buffer(raw_buckets); } diff --git a/test/cc_alg_test.cpp b/test/cc_alg_test.cpp index 457534fa..a92cd406 100644 --- a/test/cc_alg_test.cpp +++ b/test/cc_alg_test.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -7,7 +8,6 @@ #include "cc_sketch_alg.h" #include "file_graph_verifier.h" -#include "graph_gen.h" #include "graph_sketch_driver.h" #include "mat_graph_verifier.h" @@ -16,6 +16,21 @@ static size_t get_seed() { return std::chrono::duration_cast(now.time_since_epoch()).count(); } +// helper function to generate a dynamic binary stream and its cumulative insert only stream +void generate_stream(size_t seed, node_id_t num_vertices, double density, double delete_portion, + double adtl_portion, size_t rounds, std::string stream_name, + std::string cumul_name) { + // remove old versions of the stream files + std::remove(stream_name.c_str()); + std::remove(cumul_name.c_str()); + + // generate new stream files + DynamicErdosGenerator dy_stream(seed, num_vertices, density, delete_portion, adtl_portion, + rounds); + dy_stream.to_ascii_file(stream_name); + dy_stream.write_cumulative_file(cumul_name); +} + /** * For many of these tests (especially for those upon very sparse and small graphs) * we allow for a certain number of failures per test. @@ -54,7 +69,7 @@ TEST_P(CCAlgTest, TestCorrectnessOnSmallRandomGraphs) { auto driver_config = DriverConfiguration().gutter_sys(GetParam()); int num_trials = 5; while (num_trials--) { - generate_stream(); + generate_stream(get_seed(), 1024, 0.03, 0.5, 0.005, 3, "sample.txt", "cumul_sample.txt"); AsciiFileStream stream{"./sample.txt"}; node_id_t num_nodes = stream.vertices(); @@ -73,7 +88,7 @@ TEST_P(CCAlgTest, TestCorrectnessOnSmallSparseGraphs) { auto driver_config = DriverConfiguration().gutter_sys(GetParam()); int num_trials = 5; while (num_trials--) { - generate_stream({1024, 0.002, 0.5, 0, "./sample.txt", "./cumul_sample.txt"}); + generate_stream(get_seed(), 1024, 0.002, 0.5, 0.005, 3, "sample.txt", "cumul_sample.txt"); AsciiFileStream stream{"./sample.txt"}; node_id_t num_nodes = stream.vertices(); @@ -92,7 +107,7 @@ TEST_P(CCAlgTest, TestCorrectnessOfReheating) { auto driver_config = DriverConfiguration().gutter_sys(GetParam()); int num_trials = 5; while (num_trials--) { - generate_stream({1024, 0.002, 0.5, 0, "./sample.txt", "./cumul_sample.txt"}); + generate_stream(get_seed(), 1024, 0.002, 0.5, 0.005, 3, "sample.txt", "cumul_sample.txt"); AsciiFileStream stream{"./sample.txt"}; node_id_t num_nodes = stream.vertices(); @@ -123,11 +138,13 @@ TEST_P(CCAlgTest, MultipleWorkers) { auto driver_config = DriverConfiguration().gutter_sys(GetParam()).worker_threads(8); int num_trials = 5; while (num_trials--) { - generate_stream({1024, 0.002, 0.5, 0, "./sample.txt", "./cumul_sample.txt"}); + size_t seed = get_seed(); + generate_stream(seed, 1024, 0.002, 0.5, 0.5, 3, "sample.txt", "cumul_sample.txt"); AsciiFileStream stream{"./sample.txt"}; node_id_t num_nodes = stream.vertices(); - CCSketchAlg cc_alg{num_nodes, get_seed()}; + seed = get_seed(); + CCSketchAlg cc_alg{num_nodes, seed}; cc_alg.set_verifier(std::make_unique(1024, "./cumul_sample.txt")); GraphSketchDriver driver(&cc_alg, &stream, driver_config); @@ -172,18 +189,17 @@ TEST_P(CCAlgTest, TestPointQuery) { TEST(CCAlgTest, TestQueryDuringStream) { auto driver_config = DriverConfiguration().gutter_sys(STANDALONE); auto cc_config = CCAlgConfiguration(); - generate_stream({1024, 0.002, 0.5, 0, "./sample.txt", "./cumul_sample.txt"}); + generate_stream(get_seed(), 1024, 0.03, 0.5, 0.05, 3, "sample.txt", "cumul_sample.txt"); std::ifstream in{"./sample.txt"}; AsciiFileStream stream{"./sample.txt"}; node_id_t num_nodes = stream.vertices(); edge_id_t num_edges = stream.edges(); - edge_id_t tenth = num_edges / 10; + edge_id_t tenth = num_edges / 10; CCSketchAlg cc_alg{num_nodes, get_seed(), cc_config}; GraphSketchDriver driver(&cc_alg, &stream, driver_config); MatGraphVerifier verify(num_nodes); - int type; node_id_t a, b; @@ -197,7 +213,7 @@ TEST(CCAlgTest, TestQueryDuringStream) { } verify.reset_cc_state(); - driver.process_stream_until(tenth * (j+1)); + driver.process_stream_until(tenth * (j + 1)); driver.prep_query(); cc_alg.set_verifier(std::make_unique(verify)); cc_alg.connected_components(); @@ -284,7 +300,7 @@ TEST(CCAlgTest, MTStreamWithMultipleQueries) { size_t num_queries = 10; size_t upd_per_query = num_edges / num_queries; - for (size_t i = 0; i < num_queries-1; i++) { + for (size_t i = 0; i < num_queries - 1; i++) { for (size_t j = 0; j < upd_per_query; j++) { GraphStreamUpdate upd; verify_stream.get_update_buffer(&upd, 1); @@ -294,7 +310,7 @@ TEST(CCAlgTest, MTStreamWithMultipleQueries) { verify.reset_cc_state(); cc_alg.set_verifier(std::make_unique(verify)); - driver.process_stream_until(upd_per_query * (i+1)); + driver.process_stream_until(upd_per_query * (i + 1)); driver.prep_query(); cc_alg.connected_components(); } diff --git a/test/util/efficient_gen/edge_gen.cpp b/test/util/efficient_gen/edge_gen.cpp deleted file mode 100644 index 5187a756..00000000 --- a/test/util/efficient_gen/edge_gen.cpp +++ /dev/null @@ -1,118 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include "../../../include/test/efficient_gen.h" -#include "../../../include/types.h" -#include "../../../include/util.h" - -typedef uint32_t ul; -typedef uint64_t ull; - -std::ofstream& operator<< (std::ofstream &os, const std::pair p) { - os << p.first << " " << p.second; - return os; -} - -void write_edges(ul n, double p, const std::string& out_f) { - ull num_edges = ((ull)n*(n-1))/2; - ull* arr = (ull*) malloc(num_edges*sizeof(ull)); - ul idx = 0; - - std::cout << "Generating possible edges" << std::endl; - for (unsigned i=0; i < n; ++i) { - for (unsigned j=i+1;j < n; ++j) { - arr[idx++] = concat_pairing_fn(i, j); - } - } - - std::cout << "Permuting edges" << std::endl; - std::shuffle(arr,arr+num_edges, std::mt19937(std::random_device()())); - std::ofstream out(out_f); - ull m = (ull) (num_edges*p); - out << n << " " << m << std::endl; - - std::cout << "Writing edges to file" << std::endl; - while (m--) { - Edge e = inv_concat_pairing_fn(arr[m]); - out << e.src << " " << e.dst << std::endl; - } - - out.close(); - free(arr); -} - -void insert_delete(double p, const std::string& in_file, const std::string& out_file) { - std::cout << "Deleting and reinserting some edges" << std::endl; - std::ifstream in(in_file); - std::ofstream out(out_file); - int n; ull m; in >> n >> m; - - ull full_m = m; - ull ins_del_arr[(ull)log2(m)+2]; - std::fill(ins_del_arr,ins_del_arr + (ull)log2(m)+2,0); - ins_del_arr[0] = m; - for (unsigned i = 0; ins_del_arr[i] > 1; ++i) { - ins_del_arr[i+1] = (ul)(ins_del_arr[i]*p); - full_m += ins_del_arr[i+1]; - } - - out << n << " " << full_m << std::endl; - - ull* memoized = (ull*) malloc(ins_del_arr[1]*sizeof(ull)); - ul a,b; - - for (unsigned i=0;i> a >> b; - out << "0 " << a << " " << b << std::endl; - memoized[i] = concat_pairing_fn(a, b); - } - - for (unsigned i=ins_del_arr[1];i> a >> b; - out << "0 " << a << " " << b << std::endl; - } - - for (unsigned i = 1; ins_del_arr[i] >= 1; ++i) { - int temp = i%2; - for (unsigned j=0;j> n >> m; - std::vector> adj(n,std::vector(n,false)); - bool type; - int a,b; - for (ull i=1;i<=m;++i) { - in >> type >> a >> b; - if ((type == INSERT && adj[a][b] == 1) || (type == DELETE && adj[a][b] == 0)) { - std::cerr << "Insertion/deletion error at line " << i - << " in " << stream_f; - return; - } - adj[a][b] = !adj[a][b]; - } - // write cumul output - ull m_cumul = 0; - for (int i = 0; i < n; ++i) { - for (int j = 0; j < n; ++j) { - if (adj[i][j]) ++m_cumul; - } - } - out << n << " " << m_cumul << std::endl; - for (int i = 0; i < n; ++i) { - for (int j = 0; j < n; ++j) { - if (adj[i][j]) out << i << " " << j << std::endl; - } - } -} diff --git a/test/util/efficient_gen/efficient_gen.cpp b/test/util/efficient_gen/efficient_gen.cpp deleted file mode 100644 index 93aa5b30..00000000 --- a/test/util/efficient_gen/efficient_gen.cpp +++ /dev/null @@ -1,19 +0,0 @@ -#include -#include "../../../include/test/efficient_gen.h" - -int main() { - int n; double p, r = 0.1; std::string s,t; char c = 0; bool cumul = false; - std::cout << "n: "; std::cin >> n; - std::cout << "p: "; std::cin >> p; - std::cout << "r: "; std::cin >> r; - std::cout << "cumul (y/n): "; std::cin >> c; - if (c == 'y' || c == 'Y') cumul = true; - std::cout << "Out file: "; std::cin >> s; - if (cumul) { std::cout << "Cumul out: "; std::cin >> t; } - - auto start = time(nullptr); - write_edges(n, p, "./TEMP_F"); - insert_delete(r,"./TEMP_F", s); - if (cumul) write_cumul(s,t); - std::cout << "Completed in " << time(nullptr)-start << " seconds" << std::endl; -} diff --git a/test/util/graph_gen.cpp b/test/util/graph_gen.cpp deleted file mode 100644 index 8114abc0..00000000 --- a/test/util/graph_gen.cpp +++ /dev/null @@ -1,145 +0,0 @@ -#include "graph_gen.h" -#include "types.h" -#include "util.h" - -#include -#include -#include -#include - -#define endl '\n' - -typedef uint32_t ul; -typedef uint64_t ull; - -const ull ULLMAX = std::numeric_limits
    ::max(); - - -std::ofstream& operator<< (std::ofstream &os, const std::pair p) { - os << p.first << " " << p.second; - return os; -} - -void write_edges(long n, double p, const std::string& out_f) { - ul num_edges = (n*(n-1))/2; - ull* arr = (ull*) malloc(num_edges*sizeof(ull)); - ul e = 0; - for (unsigned i = 0; i < n; ++i) { - for (unsigned j = i+1; j < n; ++j) { - arr[e++] = concat_pairing_fn(i, j); - } - } - std::shuffle(arr,arr+num_edges, std::mt19937(std::random_device()())); - std::ofstream out(out_f); - ul m = (ul) (num_edges*p); - out << n << " " << m << endl; - - while (m--) { - Edge e = inv_concat_pairing_fn(arr[m]); - out << e.src << " " << e.dst << endl; - } - out.flush(); - out.close(); - free(arr); -} - -void insert_delete(double p, int max_appearances, const std::string& in_file, - const std::string& out_file) { - std::ifstream in(in_file); - std::ofstream out(out_file); - int n; ul m; in >> n >> m; - long long full_m = m; - ull ins_del_arr[(ul)log2(m)+2]; - std::fill(ins_del_arr,ins_del_arr + (ul)log2(m)+2,0); - ins_del_arr[0] = m; - if (max_appearances == 0) { - for (unsigned i = 0; ins_del_arr[i] > 1; ++i) { - ins_del_arr[i + 1] = (ull) (ins_del_arr[i] * p); - full_m += ins_del_arr[i + 1]; - } - } else { - for (int i = 0; i < max_appearances - 1; ++i) { - ins_del_arr[i + 1] = (ull) (ins_del_arr[i] * p); - full_m += ins_del_arr[i + 1]; - } - } - - out << n << " " << full_m << endl; - - ull* memoized = (ull*) malloc(ins_del_arr[1]*sizeof(ull)); - ul a,b; - - for (unsigned i=0;i> a >> b; - out << "0 " << a << " " << b << endl; - memoized[i] = concat_pairing_fn(a, b); - } - - for (unsigned i=ins_del_arr[1];i> a >> b; - out << "0 " << a << " " << b << endl; - } - - in.close(); - - unsigned stopping = 1; - if (max_appearances == 0) { - for (; ins_del_arr[stopping] >= 1; ++stopping); - } else { - stopping = max_appearances; - } - for (unsigned i = 1; i < stopping; ++i) { - int temp = i % 2; - for (unsigned j = 0; j < ins_del_arr[i]; ++j) { - out << temp << " "; - Edge e = inv_concat_pairing_fn(memoized[j]); - out << e.src << " " << e.dst << endl; - } - } - out.flush(); - out.close(); - free(memoized); -} - -void write_cumul(const std::string& stream_f, const std::string& cumul_f) { - std::ifstream in(stream_f); - std::ofstream out(cumul_f); - int n; ull m; in >> n >> m; - std::vector> adj(n,std::vector(n,false)); - bool type; - int a,b; - for (ull i=1;i<=m;++i) { - in >> type >> a >> b; - if ((type == INSERT && adj[a][b] == 1) || (type == DELETE && adj[a][b] == 0)) { - std::cerr << "Insertion/deletion error at line " << i - << " in " << stream_f; - return; - } - adj[a][b] = !adj[a][b]; - } - - in.close(); - - // write cumul output - ull m_cumul = 0; - for (int i = 0; i < n; ++i) { - for (int j = 0; j < n; ++j) { - if (adj[i][j]) ++m_cumul; - } - } - out << n << " " << m_cumul << endl; - for (int i = 0; i < n; ++i) { - for (int j = 0; j < n; ++j) { - if (adj[i][j]) out << i << " " << j << endl; - } - } - out.flush(); - out.close(); -} - -void generate_stream(const GraphGenSettings& settings) { - write_edges(settings.n, settings.p, "./TEMP_F"); - insert_delete(settings.r, settings.max_appearances, "./TEMP_F", settings - .out_file); - write_cumul(settings.out_file,settings.cumul_out_file); -} diff --git a/test/util/graph_gen_test.cpp b/test/util/graph_gen_test.cpp deleted file mode 100644 index edd4d9f8..00000000 --- a/test/util/graph_gen_test.cpp +++ /dev/null @@ -1,12 +0,0 @@ -#include -#include "../../include/test/graph_gen.h" - -TEST(GraphGenTestSuite, TestGeneration) { - std::string fname = __FILE__; - size_t pos = fname.find_last_of("\\/"); - std::string curr_dir = (std::string::npos == pos) ? "" : fname.substr(0, pos); - generate_stream(); - struct stat buffer; - ASSERT_FALSE(stat("./sample.txt", &buffer)); - ASSERT_FALSE(stat("./cumul_sample.txt", &buffer)); -} diff --git a/tools/statistical_testing/analyze_results.py b/tools/statistical_testing/analyze_results.py deleted file mode 100644 index 2c284fba..00000000 --- a/tools/statistical_testing/analyze_results.py +++ /dev/null @@ -1,73 +0,0 @@ - -import numpy as np -import argparse -from scipy.stats import ttest_ind, norm - -def check_error(test_name, test_result_file, expected_result_file, confidence=0.95): - print('::::: ', test_name, ' :::::', sep='') - test_file = open(test_result_file) - test_result = np.loadtxt(test_file) - - test_file = open(expected_result_file) - test_expect = np.loadtxt(test_file) - - result_t = test_result.transpose() - test_failures = result_t[0,:] - test_runs = result_t[1,:] - - total_expect_failures = test_expect[0] - total_expect_runs = test_expect[1] - - assert (test_runs == 100).all(), "Each bin must be of size 100" - - # First step: Verify that there is not a dependency between tests and upon the graph - if (test_failures >= 6).any(): - return True, "Dependency between tests or upon input graph found" - - # Second step: Verify that the number of test failures does not deviate from the expectation - total_test_failures = np.sum(test_failures) - total_test_runs = np.sum(test_runs) - - assert total_test_runs == total_expect_runs, "The number of runs must be the same" - pr = total_expect_failures / total_expect_runs - critical_z_val = norm.ppf(1 - (1 - confidence) / 2) - z_test_deviation = np.ceil(critical_z_val * np.sqrt(pr * (1-pr) / total_expect_runs) * total_expect_runs) - print("Number of test failures:", total_test_failures, "{0}%".format(total_test_failures/total_test_runs)) - print("Total number of failures is allowed to deviate by at most", z_test_deviation) - print("Deviation is", total_test_failures - total_expect_failures) - if total_test_failures - z_test_deviation > total_expect_failures: - return True, "Test error is statistically greater than expectation {0}/{1}".format(int(total_test_failures), int(total_test_runs)) - - if total_test_failures + z_test_deviation < total_expect_failures: - return True, "Test error is statistically less than expectation {0}/{1}".format(int(total_test_failures), int(total_test_runs)) - - return False, "No statistical deviation detected {0}/{1}".format(int(total_test_failures), int(total_test_runs)) - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Statistical testing on graph tests.') - parser.add_argument('small', metavar="small output", type=str, - help='the file which contains the results from the small graph test') - parser.add_argument('medium', metavar="medium output", type=str, - help='the file which contains the results from the medium graph test') - parser.add_argument('iso', metavar="medium iso output", type=str, - help='the file which contains the results from the medium+iso graph test') - - parser.add_argument('small_exp', metavar="small expect", type=str, - help="the file which contains the results from a correct branch for small graph") - parser.add_argument('medium_exp', metavar="medium expect", type=str, - help="the file which contains the results from a correct branch for medium graph") - parser.add_argument('iso_exp', metavar="medium iso expect", type=str, - help="the file which contains the results from a correct branch for medium+iso graph") - args = parser.parse_args() - - stat_result = check_error("small_test", args.small, args.small_exp, 0.1) - print(stat_result[0]) - print(stat_result[1]) - - stat_result = check_error("medium_test", args.medium, args.medium_exp, 0.1) - print(stat_result[0]) - print(stat_result[1]) - - stat_result = check_error("medium_iso_test", args.iso, args.iso_exp, 0.1) - print(stat_result[0]) - print(stat_result[1]) diff --git a/tools/statistical_testing/graph_testing.cpp b/tools/statistical_testing/graph_testing.cpp deleted file mode 100644 index dee89912..00000000 --- a/tools/statistical_testing/graph_testing.cpp +++ /dev/null @@ -1,96 +0,0 @@ -#include -#include "graph_sketch_driver.h" -#include "cc_sketch_alg.h" -#include "ascii_file_stream.h" -#include "graph_gen.h" -#include "file_graph_verifier.h" - -static DriverConfiguration driver_config; -static size_t get_seed() { - auto now = std::chrono::high_resolution_clock::now(); - return std::chrono::duration_cast(now.time_since_epoch()).count(); -} - -static inline int do_run() { - AsciiFileStream stream{"./sample.txt"}; - node_id_t n = stream.vertices(); - CCSketchAlg cc_alg{n, get_seed()}; - cc_alg.set_verifier(std::make_unique(n, "./cumul_sample.txt")); - GraphSketchDriver driver(&cc_alg, &stream, driver_config); - driver.process_stream_until(END_OF_STREAM); - driver.prep_query(); - try { - cc_alg.connected_components(); - } catch (std::exception const &err) { - return 1; - } - return 0; -} - -int small_graph_test(int runs) { - int failures = 0; - for (int i = 0; i < runs; i++) { - generate_stream({1024,0.002,0.5,0,"./sample.txt","./cumul_sample.txt"}); - failures += do_run(); - } - return failures; -} - -int medium_graph_test(int runs) { - int failures = 0; - for (int i = 0; i < runs; i++) { - generate_stream({2048,0.002,0.5,0,"./sample.txt","./cumul_sample.txt"}); - failures += do_run(); - } - return failures; -} - -int main() { - int runs = 100; - int num_trails = 500; - std::vector trial_list; - std::ofstream out; - - // run both with GutterTree and StandAloneGutters - for(int i = 0; i < 2; i++) { - bool use_tree = (bool) i; - - // setup configuration file per buffering - driver_config.gutter_sys(use_tree ? GUTTERTREE : STANDALONE); - driver_config.worker_threads(4); - std::string prefix = use_tree? "tree" : "gutters"; - std::string test_name; - - /************* small graph test *************/ - test_name = prefix + "_" + "small_graph_test"; - fprintf(stderr, "%s\n", test_name.c_str()); - out.open("./" + test_name); - for(int i = 0; i < num_trails; i++) { - if (i % 50 == 0) fprintf(stderr, "trial %i\n", i); - int trial_result = small_graph_test(runs); - trial_list.push_back(trial_result); - } - // output the results of these trials - for (unsigned i = 0; i < trial_list.size(); i++) { - out << trial_list[i] << " " << runs << "\n"; - } - trial_list.clear(); - out.close(); - - /************* medium graph test ************/ - test_name = prefix + "_" + "medium_graph_test"; - fprintf(stderr, "%s\n", test_name.c_str()); - out.open("./" + test_name); - for(int i = 0; i < num_trails; i++) { - if (i % 50 == 0) fprintf(stderr, "trial %i\n", i); - int trial_result = medium_graph_test(runs); - trial_list.push_back(trial_result); - } - // output the results of these trials - for (unsigned i = 0; i < trial_list.size(); i++) { - out << trial_list[i] << " " << runs << "\n"; - } - trial_list.clear(); - out.close(); - } -} diff --git a/tools/statistical_testing/medium_test_expected.txt b/tools/statistical_testing/medium_test_expected.txt deleted file mode 100644 index 03e815e8..00000000 --- a/tools/statistical_testing/medium_test_expected.txt +++ /dev/null @@ -1,2 +0,0 @@ -180 50000 - diff --git a/tools/statistical_testing/requirements.txt b/tools/statistical_testing/requirements.txt deleted file mode 100644 index db9b7bba..00000000 --- a/tools/statistical_testing/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -numpy>=1.21.2 -scipy>=1.7.1 -GitPython>=3.1.24 diff --git a/tools/statistical_testing/small_test_expected.txt b/tools/statistical_testing/small_test_expected.txt deleted file mode 100644 index 022b6d48..00000000 --- a/tools/statistical_testing/small_test_expected.txt +++ /dev/null @@ -1 +0,0 @@ -228 50000 diff --git a/tools/statistical_testing/stat_config.txt b/tools/statistical_testing/stat_config.txt deleted file mode 100644 index f4a33f1c..00000000 --- a/tools/statistical_testing/stat_config.txt +++ /dev/null @@ -1,5 +0,0 @@ -build_path=./build -stat_path=./test/statistical_testing -confidence=0.95 -usr= -pwd= diff --git a/tools/statistical_testing/test_runner.py b/tools/statistical_testing/test_runner.py deleted file mode 100644 index 1b4a39d3..00000000 --- a/tools/statistical_testing/test_runner.py +++ /dev/null @@ -1,130 +0,0 @@ -import subprocess -import importlib -import datetime -import smtplib -import git -SMTP_PORT = 465 - -importlib.import_module('analyze_results') -from analyze_results import check_error - -''' -Configure the system by reading from the configuration file -''' -def configure(): - build_path = "./" - stat_path = "./" - confidence = 0.95 - usr = "" - pwd = "" - with open('test/statistical_testing/stat_config.txt') as config: - lines = config.readlines() - for line in lines: - line_pair = line.split('=') - if line_pair[0].rstrip() == 'build_path': - build_path = line_pair[1].rstrip() - elif line_pair[0].rstrip() == 'stat_path': - stat_path = line_pair[1].rstrip() - elif line_pair[0].rstrip() == 'confidence': - confidence = float(line_pair[1].rstrip()) - elif line_pair[0].rstrip() == 'usr': - usr = line_pair[1].rstrip() - elif line_pair[0].rstrip() == 'pwd': - pwd = line_pair[1].rstrip() - else: - print("Error: unknown configuration parameter", line_pair[0]) - exit(1) - - return build_path, stat_path, confidence, usr, pwd - -''' -Run the statistical_testing executables -''' -def run_test(build_path): - subprocess.run(build_path + '/statistical_test', stdout=subprocess.DEVNULL, check=True) - -''' -Format the results of the test and raise an error if necessary -''' -def log_result(test_name, err, err_dsc): - if err: - return 'ERROR Test: ' + test_name + ' = ' + err_dsc - else: - return 'PASSED Test: ' + test_name + ' = ' + err_dsc -''' -Send an email containing the log -''' -def send_email(err_found, log, usr, pwd): - server_ssl = smtplib.SMTP_SSL('smtp.gmail.com', SMTP_PORT) - server_ssl.ehlo() - - today = datetime.datetime.today() - - server_ssl.login(usr, pwd) - subject = '' - if err_found: - subject = 'ERROR: ' - subject += 'Statistical Testing Log {0}/{1}/{2}'.format(str(today.month), str(today.day), str(today.year)) - - msg = "\r\n".join([ - "From: "+usr, - "To: graph.stat.testing@gmail.com", - "Subject:"+subject, - "", - log - ]) - server_ssl.sendmail(usr, "graph.stat.testing@gmail.com", msg) - server_ssl.quit() - -if __name__ == "__main__": - # Setup - build_path, stat_path, confidence, usr, pwd = configure() - assert usr != '' and pwd != '', "must specifiy user and password in configuration file" - - try: - repo = git.Repo("./") - buf_repo = git.Repo(build_path + "/GutterTree/src/GutterTree") - except: - print("Must run code at root directory of StreamingRepo and must have GutterTree code present in build dir") - exit(1) - head = repo.heads[0] - stream_commit_hash = head.commit.hexsha - stream_commit_msg = head.commit.message - - head = buf_repo.heads[0] - buffer_commit_hash = head.commit.hexsha - buffer_commit_msg = head.commit.message - - log = "StreamRepo Commit: " + stream_commit_hash + "\n" + stream_commit_msg + "\n" - log += "GutterTree Commit: " + buffer_commit_hash + "\n" + buffer_commit_msg + "\n" - - # Run the tests - run_test(build_path) - - for pre in ["tree", "gutters"]: - if pre == "tree": - log += "GutterTree\n" - else: - log += "StandAloneGutters\n" - - # Collect statistical results - # test_name, test_result_file, expected_result_file - try: - print("small test") - small_err, small_dsc = check_error('small test', pre + 'small_graph_test', stat_path + '/small_test_expected.txt') - except Exception as err: - small_err = True - small_dsc = "test threw expection: {0}".format(err) - try: - print("medium test") - medium_err, medium_dsc = check_error('medium test', pre + 'medium_graph_test', stat_path + '/medium_test_expected.txt') - except Exception as err: - medium_err = True - medium_dsc = "test threw expection: {0}".format(err) - - # Create a log, and send email - log += log_result('small test', small_err, small_dsc) + "\n" - log += log_result('medium test', medium_err, medium_dsc) + "\n" - - print("Sending email!") - send_email(small_err or medium_err, log, usr, pwd) diff --git a/tools/to_binary_format.cpp b/tools/to_binary_format.cpp deleted file mode 100644 index 290fde6b..00000000 --- a/tools/to_binary_format.cpp +++ /dev/null @@ -1,98 +0,0 @@ -#include -#include -#include -#include -#include -#include - -int main(int argc, char **argv) { - if (argc < 3 || argc > 5) { - std::cout << "Incorrect number of arguments. " - "Expected [2-4] but got " << argc-1 << std::endl; - std::cout << "Arguments are: ascii_stream out_file_name [--update_type] [--verbose]" << std::endl; - std::cout << "ascii_stream: The file to parse into binary format" << std::endl; - std::cout << "out_file_name: Where the binary stream will be written" << std::endl; - std::cout << "--update_type: If present then ascii stream indicates insertions vs deletions" << std::endl; - std::cout << "--silent: If present then no warnings are printed when stream corrections are made" << std::endl; - exit(EXIT_FAILURE); - } - - std::ifstream txt_file(argv[1]); - if (!txt_file) { - std::cerr << "ERROR: could not open input file!" << std::endl; - exit(EXIT_FAILURE); - } - std::ofstream out_file(argv[2], std::ios_base::binary | std::ios_base::out); - if (!out_file) { - std::cerr << "ERROR: could not open output file! " << argv[2] << ": " << strerror(errno) << std::endl; - exit(EXIT_FAILURE); - } - - bool update_type = false; - bool silent = false; - for (int i = 3; i < argc; i++) { - if (std::string(argv[i]) == "--update_type") - update_type = true; - else if (std::string(argv[i]) == "--silent") { - silent = true; - } - else { - std::cerr << "Did not recognize argument: " << argv[i] << " Expected '--update_type' or '--silent'"; - return EXIT_FAILURE; - } - } - - node_id_t num_nodes; - edge_id_t num_edges; - - txt_file >> num_nodes >> num_edges; - - std::cout << "Parsed ascii stream header. . ." << std::endl; - std::cout << "Number of nodes: " << num_nodes << std::endl; - std::cout << "Number of updates: " << num_edges << std::endl; - if (update_type) - std::cout << "Assuming that update format is: upd_type src dst" << std::endl; - else - std::cout << "Assuming that update format is: src dst" << std::endl; - - - out_file.write((char *) &num_nodes, sizeof(num_nodes)); - out_file.write((char *) &num_edges, sizeof(num_edges)); - - std::vector> adj_mat(num_nodes); - for (node_id_t i = 0; i < num_nodes; ++i) - adj_mat[i] = std::vector(num_nodes - i); - - bool u; - node_id_t src; - node_id_t dst; - - while(num_edges--) { - u = false; - if (update_type) - txt_file >> u >> src >> dst; - else - txt_file >> src >> dst; - - if (src > dst) { - if (!silent && u != adj_mat[dst][src - dst]) { - std::cout << "WARNING: update " << u << " " << src << " " << dst; - std::cout << " is double insert or delete before insert. Correcting." << std::endl; - } - u = adj_mat[dst][src - dst]; - adj_mat[dst][src - dst] = !adj_mat[dst][src - dst]; - } else { - if (!silent && u != adj_mat[src][dst - src]) { - std::cout << "WARNING: update " << u << " " << src << " " << dst; - std::cout << " is double insert or delete before insert. Correcting." << std::endl; - } - u = adj_mat[src][dst - src]; - adj_mat[src][dst - src] = !adj_mat[src][dst - src]; - } - - out_file.write((char *) &u, sizeof(u)); - out_file.write((char *) &src, sizeof(src)); - out_file.write((char *) &dst, sizeof(dst)); - } -} - diff --git a/tools/validate_binary_stream.cpp b/tools/validate_binary_stream.cpp deleted file mode 100644 index 41227832..00000000 --- a/tools/validate_binary_stream.cpp +++ /dev/null @@ -1,45 +0,0 @@ -#include - -int main(int argc, char **argv) { - if (argc != 2) { - std::cout << "Incorrect Number of Arguments!" << std::endl; - std::cout << "Arguments: stream_file" << std::endl; - exit(EXIT_FAILURE); - } - - BinaryFileStream stream(argv[1]); - node_id_t nodes = stream.vertices(); - size_t edges = stream.edges(); - - std::cout << "Attempting to validate stream " << argv[1] << std::endl; - std::cout << "Number of nodes = " << nodes << std::endl; - std::cout << "Number of updates = " << edges << std::endl; - - // validate the src and dst of each node in the stream and ensure there are enough of them - bool err = false; - for (size_t e = 0; e < edges; e++) { - GraphStreamUpdate upd; - try { - stream.get_update_buffer(&upd, 1); - } catch (...) { - std::cerr << "ERROR: Could not get edge at index: " << e << std::endl; - err = true; - std::rethrow_exception(std::current_exception()); - break; - } - Edge edge = upd.edge; - UpdateType u = static_cast(upd.type); - std::cerr << u << " " << edge.src << " " << edge.dst << std::endl; - if (edge.src >= nodes || edge.dst >= nodes || (u != INSERT && u != DELETE) || - edge.src == edge.dst) { - std::cerr << "ERROR: edge idx:" << e << "=(" << edge.src << "," << edge.dst << "), " << u - << std::endl; - err = true; - } - if (e % 1000000000 == 0 && e != 0) std::cout << e << std::endl; - } - - if (!err) std::cout << "Stream validated!" << std::endl; - if (err) std::cout << "Stream invalid!" << std::endl; -} - From 2534e219047f849107c70e282093a17b84d85531 Mon Sep 17 00:00:00 2001 From: Evan West Date: Mon, 12 Feb 2024 12:13:18 -0500 Subject: [PATCH 3/3] swap unnecessary unique_lock for lock_guard --- src/cc_sketch_alg.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/cc_sketch_alg.cpp b/src/cc_sketch_alg.cpp index 27855b31..ecd67fcb 100644 --- a/src/cc_sketch_alg.cpp +++ b/src/cc_sketch_alg.cpp @@ -109,12 +109,12 @@ void CCSketchAlg::apply_update_batch(int thr_id, node_id_t src_vertex, delta_sketch.update(static_cast(concat_pairing_fn(src_vertex, dst))); } - std::unique_lock lk(sketches[src_vertex]->mutex); + std::lock_guard lk(sketches[src_vertex]->mutex); sketches[src_vertex]->merge(delta_sketch); } void CCSketchAlg::apply_raw_buckets_update(node_id_t src_vertex, Bucket *raw_buckets) { - std::unique_lock lk(sketches[src_vertex]->mutex); + std::lock_guard lk(sketches[src_vertex]->mutex); sketches[src_vertex]->merge_raw_bucket_buffer(raw_buckets); } @@ -152,7 +152,7 @@ inline bool CCSketchAlg::sample_supernode(Sketch &skt) { auto src = std::min(e.src, e.dst); auto dst = std::max(e.src, e.dst); { - std::unique_lock lk(spanning_forest_mtx[src]); + std::lock_guard lk(spanning_forest_mtx[src]); spanning_forest[src].insert(dst); } } @@ -207,7 +207,7 @@ inline node_id_t find_last_partition_of_root(const std::vector &merg // merge the global and return if it is safe to query now inline bool merge_global(const size_t cur_round, const Sketch &local_sketch, GlobalMergeData &global) { - std::unique_lock lk(global.mtx); + std::lock_guard lk(global.mtx); global.sketch.range_merge(local_sketch, cur_round, 1); ++global.num_merge_done; assert(global.num_merge_done <= global.num_merge_needed); @@ -333,7 +333,7 @@ bool CCSketchAlg::perform_boruvka_round(const size_t cur_round, if (!root_from_left) { // Resolved root_from_left, so we are the first thread to encounter this root // set the number of threads that will merge into this component - std::unique_lock lk(global_merges[global_id].mtx); + std::lock_guard lk(global_merges[global_id].mtx); global_merges[global_id].num_merge_needed = global_id - thr_id + 1; } bool query_ready = merge_global(cur_round, local_sketch, global_merges[global_id]);