diff --git a/CMakeLists.txt b/CMakeLists.txt index 1d99bdce..5603cc09 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -27,6 +27,11 @@ else() message(STATUS "${CMAKE_CXX_COMPILER_ID} not recognized, no flags added") endif() +#add_compile_options(-fsanitize=address) +#add_link_options(-fsanitize=address) +#add_compile_options(-fsanitize=undefined) +#add_link_options(-fsanitize=undefined) + # Check if this project is the top directory or build type is Debug # If so, build executables, otherwise, only build libraries get_directory_property(not_root PARENT_DIRECTORY) @@ -93,7 +98,6 @@ add_library(GraphZeppelin src/supernode.cpp src/graph_worker.cpp src/l0_sampling/sketch.cpp - src/l0_sampling/update.cpp src/util.cpp) add_dependencies(GraphZeppelin GutterTree) target_link_libraries(GraphZeppelin PUBLIC xxhash GutterTree) @@ -108,7 +112,6 @@ add_library(GraphZeppelinVerifyCC src/supernode.cpp src/graph_worker.cpp src/l0_sampling/sketch.cpp - src/l0_sampling/update.cpp src/util.cpp test/util/file_graph_verifier.cpp test/util/mat_graph_verifier.cpp) diff --git a/include/bucket.h b/include/bucket.h deleted file mode 100644 index 753983b3..00000000 --- a/include/bucket.h +++ /dev/null @@ -1,88 +0,0 @@ -#pragma once -#include -#include -#include "types.h" - -namespace Bucket_Boruvka { - /** - * Hashes the column index and the update index together. - * This is used as a parameter to Bucket::contains. - * @param bucket_col Column index of the bucket. - * @param update_idx Update index. - * @param sketch_seed The seed of the Sketch this Bucket belongs to. - * @return The hash of (bucket_col, update_idx) using sketch_seed as a seed. - */ - inline static col_hash_t col_index_hash(const vec_t& update_idx, const long seed_and_col); - - /** - * Hashes the index. - * This is used to as a parameter to Bucket::update - * @param index Update index. - * @param seed The seed of the Sketch this Bucket belongs to. - * @return The hash of the update index, using the sketch seed as a seed. 
- */ - inline static vec_hash_t index_hash(const vec_t& index, long seed); - - /** - * Checks whether the hash associated with the Bucket hashes the index to 0. - * @param col_index_hash The return value to Bucket::col_index_hash - * @param guess_nonzero A power of 2, used to check if a specific bit value is non-zero - * @return true if the index is NOT hashed to zero mod guess_nonzero. - */ - inline static bool contains(const col_hash_t& col_index_hash, const vec_t& guess_nonzero); - - /** - * Checks whether a Bucket is good, assuming the Bucket contains all elements. - * @param a The bucket's a value. - * @param c The bucket's c value. - * @param sketch_seed The seed of the Sketch this Bucket belongs to. - * @return true if this Bucket is good, else false. - */ - inline static bool is_good(const vec_t& a, const vec_hash_t& c, const long& sketch_seed); - - /** - * Checks whether a Bucket is good. - * @param a The bucket's a value. - * @param c The bucket's c value. - * @param bucket_col This Bucket's column index. - * @param guess_nonzero The guess of nonzero elements in the vector being sketched. - * @param sketch_seed The seed of the Sketch this Bucket belongs to. - * @return true if this Bucket is good, else false. - */ - inline static bool is_good(const vec_t& a, const vec_hash_t& c, const unsigned bucket_col, const vec_t& guess_nonzero, const long& sketch_seed); - - /** - * Updates a Bucket with the given update index - * @param a The bucket's a value. Modified by this function. - * @param c The bucket's c value. Modified by this function. - * @param update_idx The update index - * @param update_hash The hash of the update index, generated with Bucket::index_hash. 
- */ - inline static void update(vec_t& a, vec_hash_t& c, const vec_t& update_idx, const vec_hash_t& update_hash); -} // namespace Bucket_Boruvka - -inline col_hash_t Bucket_Boruvka::col_index_hash(const vec_t& update_idx, const long seed_and_col) { - return col_hash(&update_idx, sizeof(update_idx), seed_and_col); -} - -inline vec_hash_t Bucket_Boruvka::index_hash(const vec_t& index, long sketch_seed) { - return vec_hash(&index, sizeof(index), sketch_seed); -} - -inline bool Bucket_Boruvka::contains(const col_hash_t& col_index_hash, const col_hash_t& guess_nonzero) { - return (col_index_hash & guess_nonzero) == 0; // use guess_nonzero (power of 2) to check ith bit -} - -inline bool Bucket_Boruvka::is_good(const vec_t& a, const vec_hash_t& c, const long& sketch_seed) { - return c == index_hash(a, sketch_seed); -} - -inline bool Bucket_Boruvka::is_good(const vec_t& a, const vec_hash_t& c, const unsigned bucket_col, const vec_t& guess_nonzero, const long& sketch_seed) { - return c == index_hash(a, sketch_seed) - && contains(col_index_hash(a, sketch_seed + bucket_col), guess_nonzero); -} - -inline void Bucket_Boruvka::update(vec_t& a, vec_hash_t& c, const vec_t& update_idx, const vec_hash_t& update_hash) { - a ^= update_idx; - c ^= update_hash; -} diff --git a/include/graph.h b/include/graph.h index 89abb390..8d8bdbf2 100644 --- a/include/graph.h +++ b/include/graph.h @@ -1,224 +1,224 @@ -#pragma once -#include -#include -#include -#include -#include // REMOVE LATER -#include -#include - -#include -#include "supernode.h" -#include "graph_configuration.h" - -#ifdef VERIFY_SAMPLES_F -#include "test/graph_verifier.h" -#endif - -#include - -// forward declarations -class GraphWorker; - -// Exceptions the Graph class may throw -class UpdateLockedException : public std::exception { - virtual const char* what() const throw() { - return "The graph cannot be updated: Connected components algorithm has " - "already started"; - } -}; - -class MultipleGraphsException : public 
std::exception { - virtual const char * what() const throw() { - return "Only one Graph may be open at one time. The other Graph must be deleted."; - } -}; - -/** - * Undirected graph object with n nodes labelled 0 to n-1, no self-edges, - * multiple edges, or weights. - */ -class Graph { -protected: - node_id_t num_nodes; - uint64_t seed; - bool update_locked = false; - bool modified = false; - // a set containing one "representative" from each supernode - std::set* representatives; - Supernode** supernodes; - // DSU representation of supernode relationship -#ifdef USE_EAGER_DSU - std::atomic* parent; -#else - node_id_t* parent; -#endif - node_id_t* size; - node_id_t get_parent(node_id_t node); - bool dsu_valid = true; - - std::unordered_set* spanning_forest; - std::mutex* spanning_forest_mtx; - - // Guttering system for batching updates - GutteringSystem *gts; - - void backup_to_disk(const std::vector& ids_to_backup); - void restore_from_disk(const std::vector& ids_to_restore); - - /** - * Update the query array with new samples - * @param query an array of supernode query results - * @param reps an array containing node indices for the representative of each supernode - */ - virtual void sample_supernodes(std::pair *query, - std::vector &reps); - - /** - * @param copy_supernodes an array to be filled with supernodes - * @param to_merge an list of lists of supernodes to be merged - * - */ - void merge_supernodes(Supernode** copy_supernodes, std::vector &new_reps, - std::vector> &to_merge, bool make_copy); - - /** - * Run the disjoint set union to determine what supernodes - * Should be merged together. - * Map from nodes to a vector of nodes to merge with them - * @param query an array of supernode query results - * @param reps an array containing node indices for the representative of each supernode - */ - std::vector> supernodes_to_merge(std::pair *query, - std::vector &reps); - - /** - * Main parallel algorithm utilizing Boruvka and L_0 sampling. 
- * @return a vector of the connected components in the graph. - */ - std::vector> boruvka_emulation(bool make_copy); - - /** - * Generates connected components from this graph's dsu - * @return a vector of the connected components in the graph. - */ - std::vector> cc_from_dsu(); - - std::string backup_file; // where to backup the supernodes - - FRIEND_TEST(GraphTestSuite, TestCorrectnessOfReheating); - FRIEND_TEST(GraphTest, TestSupernodeRestoreAfterCCFailure); - - GraphConfiguration config; - - static bool open_graph; -public: - explicit Graph(node_id_t num_nodes, int num_inserters=1) : - Graph(num_nodes, GraphConfiguration(), num_inserters) {}; - explicit Graph(const std::string &input_file, int num_inserters=1) : - Graph(input_file, GraphConfiguration(), num_inserters) {}; - explicit Graph(const std::string &input_file, GraphConfiguration config, int num_inserters=1); - explicit Graph(node_id_t num_nodes, GraphConfiguration config, int num_inserters=1); - - virtual ~Graph(); - - inline void update(GraphUpdate upd, int thr_id = 0) { - if (update_locked) throw UpdateLockedException(); - Edge &edge = upd.edge; - - gts->insert({edge.src, edge.dst}, thr_id); - std::swap(edge.src, edge.dst); - gts->insert({edge.src, edge.dst}, thr_id); -#ifdef USE_EAGER_DSU - if (dsu_valid) { - auto src = std::min(edge.src, edge.dst); - auto dst = std::max(edge.src, edge.dst); - std::lock_guard sflock (spanning_forest_mtx[src]); - if (spanning_forest[src].find(dst) != spanning_forest[src].end()) { - dsu_valid = false; - } else { - node_id_t a = src, b = dst; - while ((a = get_parent(a)) != (b = get_parent(b))) { - if (size[a] < size[b]) { - std::swap(a, b); - } - if (std::atomic_compare_exchange_weak(&parent[b], &b, a)) { - size[a] += size[b]; - spanning_forest[src].insert(dst); - break; - } - } - } - } -#else - dsu_valid = false; -#endif // USE_EAGER_DSU - } - - /** - * Update all the sketches in supernode, given a batch of updates. 
- * @param src The supernode where the edges originate. - * @param edges A vector of destinations. - * @param delta_loc Memory location where we should initialize the delta - * supernode. - */ - void batch_update(node_id_t src, const std::vector &edges, Supernode *delta_loc); - - /** - * Main parallel query algorithm utilizing Boruvka and L_0 sampling. - * If cont is true, allow for additional updates when done. - * @param cont - * @return a vector of the connected components in the graph. - */ - std::vector> connected_components(bool cont=false); - - /** - * Point query algorithm utilizing Boruvka and L_0 sampling. - * Allows for additional updates when done. - * @param a, b - * @return true if a and b are in the same connected component, false otherwise. - */ - bool point_query(node_id_t a, node_id_t b); - - -#ifdef VERIFY_SAMPLES_F - std::unique_ptr verifier; - void set_verifier(std::unique_ptr verifier) { - this->verifier = std::move(verifier); - } - - // to induce a failure mid-CC - bool fail_round_2 = false; - void should_fail_CC() { fail_round_2 = true; } -#endif - - // number of updates - std::atomic num_updates; - - /** - * Generate a delta node for the purposes of updating a node sketch - * (supernode). - * @param node_n the total number of nodes in the graph. - * @param node_seed the seed of the supernode in question. - * @param src the src id. - * @param edges a list of node ids to which src is connected. - * @param delta_loc the preallocated memory where the delta_node should be - * placed. this allows memory to be reused by the same - * calling thread. - * @returns nothing (supernode delta is in delta_loc). - */ - static void generate_delta_node(node_id_t node_n, uint64_t node_seed, node_id_t src, - const std::vector &edges, Supernode *delta_loc); - - /** - * Serialize the graph data to a binary file. - * @param filename the name of the file to (over)write data to. 
- */ - void write_binary(const std::string &filename); - - // time hooks for experiments - std::chrono::steady_clock::time_point flush_start; - std::chrono::steady_clock::time_point flush_end; - std::chrono::steady_clock::time_point cc_alg_start; - std::chrono::steady_clock::time_point cc_alg_end; -}; +#pragma once +#include +#include +#include +#include +#include // REMOVE LATER +#include +#include + +#include +#include "supernode.h" +#include "graph_configuration.h" + +#ifdef VERIFY_SAMPLES_F +#include "test/graph_verifier.h" +#endif + +#include + +// forward declarations +class GraphWorker; + +// Exceptions the Graph class may throw +class UpdateLockedException : public std::exception { + virtual const char* what() const throw() { + return "The graph cannot be updated: Connected components algorithm has " + "already started"; + } +}; + +class MultipleGraphsException : public std::exception { + virtual const char * what() const throw() { + return "Only one Graph may be open at one time. The other Graph must be deleted."; + } +}; + +/** + * Undirected graph object with n nodes labelled 0 to n-1, no self-edges, + * multiple edges, or weights. 
+ */ +class Graph { +protected: + node_id_t num_nodes; + uint64_t seed; + bool update_locked = false; + bool modified = false; + // a set containing one "representative" from each supernode + std::set* representatives; + Supernode** supernodes; + // DSU representation of supernode relationship +#ifdef USE_EAGER_DSU + std::atomic* parent; +#else + node_id_t* parent; +#endif + node_id_t* size; + node_id_t get_parent(node_id_t node); + bool dsu_valid = true; + + std::unordered_set* spanning_forest; + std::mutex* spanning_forest_mtx; + + // Guttering system for batching updates + GutteringSystem *gts; + + void backup_to_disk(const std::vector& ids_to_backup); + void restore_from_disk(const std::vector& ids_to_restore); + + /** + * Update the query array with new samples + * @param query an array of supernode query results + * @param reps an array containing node indices for the representative of each supernode + */ + virtual void sample_supernodes(std::pair *query, + std::vector &reps); + + /** + * @param copy_supernodes an array to be filled with supernodes + * @param to_merge an list of lists of supernodes to be merged + * + */ + void merge_supernodes(Supernode** copy_supernodes, std::vector &new_reps, + std::vector> &to_merge, bool make_copy); + + /** + * Run the disjoint set union to determine what supernodes + * Should be merged together. + * Map from nodes to a vector of nodes to merge with them + * @param query an array of supernode query results + * @param reps an array containing node indices for the representative of each supernode + */ + std::vector> supernodes_to_merge(std::pair *query, + std::vector &reps); + + /** + * Main parallel algorithm utilizing Boruvka and L_0 sampling. + * @return a vector of the connected components in the graph. + */ + std::vector> boruvka_emulation(bool make_copy); + + /** + * Generates connected components from this graph's dsu + * @return a vector of the connected components in the graph. 
+ */ + std::vector> cc_from_dsu(); + + std::string backup_file; // where to backup the supernodes + + FRIEND_TEST(GraphTestSuite, TestCorrectnessOfReheating); + FRIEND_TEST(GraphTest, TestSupernodeRestoreAfterCCFailure); + + GraphConfiguration config; + + static bool open_graph; +public: + explicit Graph(node_id_t num_nodes, int num_inserters=1) : + Graph(num_nodes, GraphConfiguration(), num_inserters) {}; + explicit Graph(const std::string &input_file, int num_inserters=1) : + Graph(input_file, GraphConfiguration(), num_inserters) {}; + explicit Graph(const std::string &input_file, GraphConfiguration config, int num_inserters=1); + explicit Graph(node_id_t num_nodes, GraphConfiguration config, int num_inserters=1); + + virtual ~Graph(); + + inline void update(GraphUpdate upd, int thr_id = 0) { + if (update_locked) throw UpdateLockedException(); + Edge &edge = upd.edge; + + gts->insert({edge.src, edge.dst}, thr_id); + std::swap(edge.src, edge.dst); + gts->insert({edge.src, edge.dst}, thr_id); +#ifdef USE_EAGER_DSU + if (dsu_valid) { + auto src = std::min(edge.src, edge.dst); + auto dst = std::max(edge.src, edge.dst); + std::lock_guard sflock (spanning_forest_mtx[src]); + if (spanning_forest[src].find(dst) != spanning_forest[src].end()) { + dsu_valid = false; + } else { + node_id_t a = src, b = dst; + while ((a = get_parent(a)) != (b = get_parent(b))) { + if (size[a] < size[b]) { + std::swap(a, b); + } + if (std::atomic_compare_exchange_weak(&parent[b], &b, a)) { + size[a] += size[b]; + spanning_forest[src].insert(dst); + break; + } + } + } + } +#else + dsu_valid = false; +#endif // USE_EAGER_DSU + } + + /** + * Update all the sketches in supernode, given a batch of updates. + * @param src The supernode where the edges originate. + * @param edges A vector of destinations. + * @param delta_loc Memory location where we should initialize the delta + * supernode. 
+ */ + void batch_update(node_id_t src, const std::vector &edges, Supernode *delta_loc); + + /** + * Main parallel query algorithm utilizing Boruvka and L_0 sampling. + * If cont is true, allow for additional updates when done. + * @param cont + * @return a vector of the connected components in the graph. + */ + std::vector> connected_components(bool cont=false); + + /** + * Point query algorithm utilizing Boruvka and L_0 sampling. + * Allows for additional updates when done. + * @param a, b + * @return true if a and b are in the same connected component, false otherwise. + */ + bool point_query(node_id_t a, node_id_t b); + + +#ifdef VERIFY_SAMPLES_F + std::unique_ptr verifier; + void set_verifier(std::unique_ptr verifier) { + this->verifier = std::move(verifier); + } + + // to induce a failure mid-CC + bool fail_round_2 = false; + void should_fail_CC() { fail_round_2 = true; } +#endif + + // number of updates + std::atomic num_updates; + + /** + * Generate a delta node for the purposes of updating a node sketch + * (supernode). + * @param node_n the total number of nodes in the graph. + * @param node_seed the seed of the supernode in question. + * @param src the src id. + * @param edges a list of node ids to which src is connected. + * @param delta_loc the preallocated memory where the delta_node should be + * placed. this allows memory to be reused by the same + * calling thread. + * @returns nothing (supernode delta is in delta_loc). + */ + static void generate_delta_node(node_id_t node_n, uint64_t node_seed, node_id_t src, + const std::vector &edges, Supernode *delta_loc); + + /** + * Serialize the graph data to a binary file. + * @param filename the name of the file to (over)write data to. 
+ */ + void write_binary(const std::string &filename); + + // time hooks for experiments + std::chrono::steady_clock::time_point flush_start; + std::chrono::steady_clock::time_point flush_end; + std::chrono::steady_clock::time_point cc_alg_start; + std::chrono::steady_clock::time_point cc_alg_end; +}; diff --git a/include/l0_sampling/bucket.h b/include/l0_sampling/bucket.h new file mode 100644 index 00000000..05735589 --- /dev/null +++ b/include/l0_sampling/bucket.h @@ -0,0 +1,68 @@ +#pragma once +#include +#include +#include +#include "types.h" + +namespace Bucket_Boruvka { + static constexpr size_t col_hash_bits = sizeof(col_hash_t) * 8; + /** + * Hashes the column index and the update index together to determine the depth of an update + * The depth selects which bucket of a column Sketch::update modifies. + * @param update_idx Vector index to update + * @param seed_and_col Combination of seed and column + * @param max_depth The maximum depth to return + * @return The depth: trailing zero bits of the hash of update_idx seeded by seed_and_col, capped at max_depth. + */ + inline static col_hash_t get_index_depth(const vec_t update_idx, const long seed_and_col, + const vec_hash_t max_depth); + + /** + * Hashes the index for checksumming + * This is used as a parameter to Bucket_Boruvka::update + * @param index Vector index to update + * @param sketch_seed The seed of the Sketch this Bucket belongs to + * @return The checksum hash of index, using sketch_seed as a seed + */ + inline static vec_hash_t get_index_hash(const vec_t index, const long sketch_seed); + + /** + * Checks whether a Bucket is good, assuming the Bucket contains all elements. + * @param a The bucket's a value. + * @param c The bucket's c value. + * @param sketch_seed The seed of the Sketch this Bucket belongs to. + * @return true if this Bucket is good, else false. + */ + inline static bool is_good(const vec_t a, const vec_hash_t c, const long sketch_seed); + + /** + * Updates a Bucket with the given update index + * @param a The bucket's a value. Modified by this function. 
+ * @param c The bucket's c value. Modified by this function. + * @param update_idx The update index + * @param update_hash The hash of the update index, generated with Bucket_Boruvka::get_index_hash. + */ + inline static void update(vec_t& a, vec_hash_t& c, const vec_t update_idx, + const vec_hash_t update_hash); +} // namespace Bucket_Boruvka + +inline col_hash_t Bucket_Boruvka::get_index_depth(const vec_t update_idx, const long seed_and_col, + const vec_hash_t max_depth) { + col_hash_t depth_hash = col_hash(&update_idx, sizeof(vec_t), seed_and_col); + depth_hash |= (1ull << max_depth); // set bit max_depth (must be < 64) so the result never exceeds max_depth + return __builtin_ctzll(depth_hash); +} + +inline vec_hash_t Bucket_Boruvka::get_index_hash(const vec_t update_idx, const long sketch_seed) { + return vec_hash(&update_idx, sizeof(vec_t), sketch_seed); +} + +inline bool Bucket_Boruvka::is_good(const vec_t a, const vec_hash_t c, const long sketch_seed) { + return c == get_index_hash(a, sketch_seed); +} + +inline void Bucket_Boruvka::update(vec_t& a, vec_hash_t& c, const vec_t update_idx, + const vec_hash_t update_hash) { + a ^= update_idx; + c ^= update_hash; +} diff --git a/include/l0_sampling/sketch.h b/include/l0_sampling/sketch.h index 8ba41005..8746a5b4 100644 --- a/include/l0_sampling/sketch.h +++ b/include/l0_sampling/sketch.h @@ -1,20 +1,22 @@ #pragma once +#include + #include #include #include #include -#include #include #include #include -#include "../bucket.h" +#include + #include "../types.h" #include "../util.h" -#include +#include "bucket.h" // max number of non-zeroes in vector is n/2*n/2=n^2/4 #define guess_gen(x) double_to_ull(log2(x) - 2) -#define bucket_gen(d) double_to_ull((log2(d)+1)) +#define bucket_gen(d) double_to_ull((log2(d) + 1)) enum SampleSketchRet { GOOD, // querying this sketch returned a single non-zero value @@ -28,60 +30,63 @@ enum SampleSketchRet { * raise an error. 
*/ class Sketch { -private: - static vec_t failure_factor; // Failure factor determines number of columns in sketch. Pr(failure) = 1 / factor - static vec_t n; // Length of the vector this is sketching. - static size_t num_elems; // length of our actual arrays in number of elements - static size_t num_buckets; // Portion of array length, number of buckets - static size_t num_guesses; // Portion of array length, number of guesses + private: + static vec_t failure_factor; // Pr(failure) = 1 / factor. Determines number of columns in sketch. + static vec_t n; // Length of the vector this is sketching. + static size_t num_elems; // length of our actual arrays in number of elements + static size_t num_buckets; // Portion of array length, number of buckets + static size_t num_guesses; // Portion of array length, number of guesses // Seed used for hashing operations in this sketch. const uint64_t seed; // pointers to buckets - vec_t* bucket_a; + vec_t* bucket_a; vec_hash_t* bucket_c; + static constexpr size_t begin_nonnull = 1; // offset at which non-null buckets occur + // Flag to keep track if this sketch has already been queried. bool already_queried = false; FRIEND_TEST(SketchTestSuite, TestExceptions); FRIEND_TEST(EXPR_Parallelism, N10kU100k); - // Buckets of this sketch. // Length is bucket_gen(failure_factor) * guess_gen(n). // For buckets[i * guess_gen(n) + j], the bucket has a 1/2^j probability // of containing an index. The first two are pointers into the buckets array. 
- char buckets[1]; + alignas(vec_t) char buckets[]; // private constructors -- use makeSketch Sketch(uint64_t seed); - Sketch(uint64_t seed, std::istream &binary_in); + Sketch(uint64_t seed, std::istream& binary_in); Sketch(const Sketch& s); -public: + public: /** * Construct a sketch of a vector of size n * The optional parameters are used when building a sketch from a file - * @param loc A pointer to a location in memory where the caller would like the sketch constructed + * @param loc A pointer to a location in memory where the caller would like the sketch + * constructed * @param seed Seed to use for hashing operations * @param binary_in (Optional) A file which holds an encoding of a sketch - * @return A pointer to a newly constructed sketch + * @return A pointer to a newly constructed sketch */ static Sketch* makeSketch(void* loc, uint64_t seed); - static Sketch* makeSketch(void* loc, uint64_t seed, std::istream &binary_in); - + static Sketch* makeSketch(void* loc, uint64_t seed, std::istream& binary_in); + /** * Copy constructor to create a sketch from another - * @param loc A pointer to a location in memory where the caller would like the sketch constructed + * @param loc A pointer to a location in memory where the caller would like the sketch + * constructed * @param s A sketch to make a copy of - * @return A pointer to a newly constructed sketch + * @return A pointer to a newly constructed sketch */ static Sketch* makeSketch(void* loc, const Sketch& s); - + /* configure the static variables of sketches * @param n Length of the vector to sketch. 
(static variable) - * @param failure_factor The rate at which an individual sketch is allowed to fail (determines column width) + * @param failure_factor 1/factor = Failure rate for sketch (determines column width) * @return nothing */ inline static void configure(vec_t _n, vec_t _factor) { @@ -89,26 +94,27 @@ class Sketch { failure_factor = _factor; num_buckets = bucket_gen(failure_factor); num_guesses = guess_gen(n); - num_elems = num_buckets * num_guesses + 1; + num_elems = num_buckets * num_guesses + 2; // +2 for zero depth bucket and null bucket } - inline static size_t sketchSizeof() - { return sizeof(Sketch) + num_elems * (sizeof(vec_t) + sizeof(vec_hash_t)) - sizeof(char); } + inline static size_t sketchSizeof() { + return sizeof(Sketch) + num_elems * (sizeof(vec_t) + sizeof(vec_hash_t)) + + (num_elems % 2) * sizeof(vec_hash_t); + } - inline static size_t serialized_size() - { return num_elems * (sizeof(vec_t) + sizeof(vec_hash_t)); } - - inline static vec_t get_failure_factor() - { return failure_factor; } + inline static size_t serialized_size() { + return num_elems * (sizeof(vec_t) + sizeof(vec_hash_t)); + } - inline void reset_queried() - { already_queried = false; } + inline static vec_t get_failure_factor() { return failure_factor; } + + inline void reset_queried() { already_queried = false; } /** * Update a sketch based on information about one of its indices. * @param update the point update. */ - void update(const vec_t& update_idx); + void update(const vec_t update_idx); /** * Update a sketch given a batch of updates. @@ -122,9 +128,7 @@ class Sketch { */ std::pair query(); - inline uint64_t get_seed() const { - return seed; - } + inline uint64_t get_seed() const { return seed; } /** * Operator to add a sketch to another one in-place. Guaranteed to be @@ -134,9 +138,9 @@ class Sketch { * @param sketch2 the one being added. * @return a reference to the combined sketch. 
*/ - friend Sketch &operator+= (Sketch &sketch1, const Sketch &sketch2); - friend bool operator== (const Sketch &sketch1, const Sketch &sketch2); - friend std::ostream& operator<< (std::ostream &os, const Sketch &sketch); + friend Sketch& operator+=(Sketch& sketch1, const Sketch& sketch2); + friend bool operator==(const Sketch& sketch1, const Sketch& sketch2); + friend std::ostream& operator<<(std::ostream& os, const Sketch& sketch); /** * Serialize the sketch to a binary output stream. @@ -147,8 +151,6 @@ class Sketch { }; class MultipleQueryException : public std::exception { -public: - virtual const char* what() const throw() { - return "This sketch has already been sampled!"; - } + public: + virtual const char* what() const throw() { return "This sketch has already been sampled!"; } }; diff --git a/include/l0_sampling/update.h b/include/l0_sampling/update.h deleted file mode 100644 index 9079695b..00000000 --- a/include/l0_sampling/update.h +++ /dev/null @@ -1,16 +0,0 @@ -#pragma once -#include -#include "../types.h" - -/** - * Representation of a generic vector point update. - */ -struct Update { - // the position in the vector that is changed - vec_t index; - // the magnitude of the change - long delta; - - friend std::ostream& operator<< (std::ostream &out, const Update &update); - friend bool operator== (const Update &upd1, const Update &upd2); -}; diff --git a/include/supernode.h b/include/supernode.h index 5b012062..79e16244 100644 --- a/include/supernode.h +++ b/include/supernode.h @@ -34,7 +34,7 @@ class Supernode { /* collection of logn sketches to query from, since we can't query from one sketch more than once */ // The sketches, off the end. - alignas(Sketch) char sketch_buffer[1]; + alignas(Sketch) char sketch_buffer[]; /** * @param n the total number of nodes in the graph. 
@@ -81,7 +81,7 @@ class Supernode { static inline void configure(uint64_t n, vec_t sketch_fail_factor=100) { Sketch::configure(n*n, sketch_fail_factor); - bytes_size = sizeof(Supernode) + size_t(log2(n)/(log2(3)-1)) * Sketch::sketchSizeof() - sizeof(char); + bytes_size = sizeof(Supernode) + size_t(log2(n)/(log2(3)-1)) * Sketch::sketchSizeof(); serialized_size = size_t(log2(n)/(log2(3)-1)) * Sketch::serialized_size(); } diff --git a/include/types.h b/include/types.h index 725d6463..deda3a06 100644 --- a/include/types.h +++ b/include/types.h @@ -3,7 +3,7 @@ #include typedef uint64_t col_hash_t; -static const auto& vec_hash = XXH32; +static const auto& vec_hash = XXH3_64bits_withSeed; static const auto& col_hash = XXH3_64bits_withSeed; // Is a stream update an insertion or a deletion diff --git a/src/l0_sampling/sketch.cpp b/src/l0_sampling/sketch.cpp index c865fe61..7d5c22d5 100644 --- a/src/l0_sampling/sketch.cpp +++ b/src/l0_sampling/sketch.cpp @@ -54,17 +54,19 @@ Sketch::Sketch(const Sketch& s) : seed(s.seed) { std::memcpy(bucket_c, s.bucket_c, num_elems * sizeof(vec_hash_t)); } -void Sketch::update(const vec_t& update_idx) { - vec_hash_t update_hash = Bucket_Boruvka::index_hash(update_idx, seed); - Bucket_Boruvka::update(bucket_a[num_elems - 1], bucket_c[num_elems - 1], update_idx, update_hash); +void Sketch::update(const vec_t update_idx) { + vec_hash_t checksum = Bucket_Boruvka::get_index_hash(update_idx, seed); + + // Update depth 0 bucket + Bucket_Boruvka::update(bucket_a[num_elems - 1], bucket_c[num_elems - 1], update_idx, checksum); + + // Update higher depth buckets for (unsigned i = 0; i < num_buckets; ++i) { - col_hash_t col_index_hash = Bucket_Boruvka::col_index_hash(update_idx, seed + i); - for (unsigned j = 0; j < num_guesses; ++j) { - unsigned bucket_id = i * num_guesses + j; - if (Bucket_Boruvka::contains(col_index_hash, ((col_hash_t)1) << j)){ - Bucket_Boruvka::update(bucket_a[bucket_id], bucket_c[bucket_id], update_idx, update_hash); - } else 
break; - } + col_hash_t depth = Bucket_Boruvka::get_index_depth(update_idx, seed + i, num_guesses); + size_t bucket_id = i * num_guesses + depth; + bucket_id *= (bool)(depth!=0); // if depth is 0 then "update" null bucket -> bucket[0] + + Bucket_Boruvka::update(bucket_a[bucket_id], bucket_c[bucket_id], update_idx, checksum); } } @@ -87,9 +89,10 @@ std::pair Sketch::query() { return {bucket_a[num_elems - 1], GOOD}; } for (unsigned i = 0; i < num_buckets; ++i) { - for (unsigned j = 0; j < num_guesses; ++j) { + // bucket[0] is null + for (unsigned j = begin_nonnull; j < num_guesses; ++j) { unsigned bucket_id = i * num_guesses + j; - if (Bucket_Boruvka::is_good(bucket_a[bucket_id], bucket_c[bucket_id], i, 1 << j, seed)) { + if (Bucket_Boruvka::is_good(bucket_a[bucket_id], bucket_c[bucket_id], seed)) { return {bucket_a[bucket_id], GOOD}; } } @@ -99,7 +102,7 @@ std::pair Sketch::query() { Sketch &operator+= (Sketch &sketch1, const Sketch &sketch2) { assert (sketch1.seed == sketch2.seed); - for (unsigned i = 0; i < Sketch::num_elems; i++) { + for (unsigned i = Sketch::begin_nonnull; i < Sketch::num_elems; i++) { sketch1.bucket_a[i] ^= sketch2.bucket_a[i]; sketch1.bucket_c[i] ^= sketch2.bucket_c[i]; } @@ -111,11 +114,11 @@ bool operator== (const Sketch &sketch1, const Sketch &sketch2) { if (sketch1.seed != sketch2.seed || sketch1.already_queried != sketch2.already_queried) return false; - for (size_t i = 0; i < Sketch::num_elems; ++i) { + for (size_t i = Sketch::begin_nonnull; i < Sketch::num_elems; ++i) { if (sketch1.bucket_a[i] != sketch2.bucket_a[i]) return false; } - for (size_t i = 0; i < Sketch::num_elems; ++i) { + for (size_t i = Sketch::begin_nonnull; i < Sketch::num_elems; ++i) { if (sketch1.bucket_c[i] != sketch2.bucket_c[i]) return false; } @@ -123,25 +126,22 @@ bool operator== (const Sketch &sketch1, const Sketch &sketch2) { } std::ostream& operator<< (std::ostream &os, const Sketch &sketch) { - for (unsigned k = 0; k < Sketch::n; k++) { - os << '1'; - } - os 
<< std::endl - << "a:" << sketch.bucket_a[Sketch::num_buckets * Sketch::num_guesses] << std::endl - << "c:" << sketch.bucket_c[Sketch::num_buckets * Sketch::num_guesses] << std::endl - << (Bucket_Boruvka::is_good(sketch.bucket_a[Sketch::num_buckets * Sketch::num_guesses], sketch.bucket_c[Sketch::num_buckets * Sketch::num_guesses], sketch.seed) ? "good" : "bad") << std::endl; + vec_t a = sketch.bucket_a[Sketch::num_elems - 1]; + vec_hash_t c = sketch.bucket_c[Sketch::num_elems - 1]; + bool good = Bucket_Boruvka::is_good(a, c, sketch.seed); + + os << " a:" << a << " c:" << c << (good ? " good" : " bad") << std::endl; for (unsigned i = 0; i < Sketch::num_buckets; ++i) { - for (unsigned j = 0; j < Sketch::num_guesses; ++j) { + for (unsigned j = Sketch::begin_nonnull; j < Sketch::num_guesses; ++j) { unsigned bucket_id = i * Sketch::num_guesses + j; - for (unsigned k = 0; k < Sketch::n; k++) { - os << (Bucket_Boruvka::contains(Bucket_Boruvka::col_index_hash(k, sketch.seed + 1), 1 << j) ? '1' : '0'); - } - os << std::endl - << "a:" << sketch.bucket_a[bucket_id] << std::endl - << "c:" << sketch.bucket_c[bucket_id] << std::endl - << (Bucket_Boruvka::is_good(sketch.bucket_a[bucket_id], sketch.bucket_c[bucket_id], i, 1 << j, sketch.seed) ? "good" : "bad") << std::endl; + vec_t a = sketch.bucket_a[bucket_id]; + vec_hash_t c = sketch.bucket_c[bucket_id]; + bool good = Bucket_Boruvka::is_good(a, c, sketch.seed); + + os << " a:" << a << " c:" << c << (good ? " good" : " bad") << std::endl; } + os << std::endl; } return os; } @@ -151,6 +151,8 @@ void Sketch::write_binary(std::ostream& binary_out) { } void Sketch::write_binary(std::ostream &binary_out) const { + // Write out the bucket values to the stream. 
+ // Do not include the null bucket binary_out.write((char*)bucket_a, num_elems * sizeof(vec_t)); binary_out.write((char*)bucket_c, num_elems * sizeof(vec_hash_t)); } diff --git a/src/l0_sampling/update.cpp b/src/l0_sampling/update.cpp deleted file mode 100644 index de603f53..00000000 --- a/src/l0_sampling/update.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include "../../include/l0_sampling/update.h" - -std::ostream& operator<< (std::ostream &out, const Update &update){ - out << "Index: " << update.index << " Value: " << update.delta; - return out; -} - -bool operator== (const Update &upd1, const Update &upd2) { - return upd1.index == upd2.index && upd1.delta == upd2.delta; -} diff --git a/test/sketch_test.cpp b/test/sketch_test.cpp index fc83ef9c..172b4110 100644 --- a/test/sketch_test.cpp +++ b/test/sketch_test.cpp @@ -6,15 +6,6 @@ static const int fail_factor = 100; -bool contains_inclusive(col_hash_t hash, col_hash_t guess) { - for (col_hash_t i = 1; i <= guess; i<<=1) { - if (!Bucket_Boruvka::contains(hash, i)) { - return false; - } - } - return true; -} - TEST(SketchTestSuite, TestExceptions) { Sketch::configure(100, fail_factor); SketchUniquePtr sketch1 = makeSketch(rand()); @@ -29,30 +20,40 @@ TEST(SketchTestSuite, TestExceptions) { std::vector vec_idx(sketch2->n, true); unsigned long long num_buckets = bucket_gen(fail_factor); unsigned long long num_guesses = guess_gen(sketch2->n); - for (unsigned long long i = 0; i < num_buckets; ++i) { - for (unsigned long long j = 0; j < num_guesses;) { - uint64_t index = 0; - for (uint64_t k = 0; k < sketch2->n; ++k) { - if (vec_idx[k] && contains_inclusive(Bucket_Boruvka::col_index_hash(k, sketch2->seed + i), 1 << j)) { - if (index == 0) { - index = k + 1; - } else { - index = 0; - break; - } - } + size_t total_updates = 2; + for (unsigned long long i = 0; i < num_buckets;) { + size_t depth_1_updates = 0; + size_t k = 0; + size_t u = 0; + for (; u < total_updates || depth_1_updates < 2;) { + if (vec_idx[k] == false) { + ++k; 
+ continue; + } + + col_hash_t depth = Bucket_Boruvka::get_index_depth(k, sketch2->seed + i, num_guesses); + if (depth >= 2) { + vec_idx[k] = false; // force all updates to only touch depths <= 1 + i = 0; + break; } - if (index) { - vec_idx[index - 1] = false; - i = j = 0; - } else { - ++j; + else if (depth == 1) { + ++depth_1_updates; } + ++u; + ++k; + } + if (u > total_updates) { + total_updates = u; + i = 0; } + else if (u == total_updates) ++i; } + size_t applied_updates = 0; for (uint64_t i = 0; i < sketch2->n; ++i) { if (vec_idx[i]) { sketch2->update(static_cast(i)); + if (++applied_updates >= total_updates) break; } } ASSERT_EQ(sketch2->query().second, FAIL); diff --git a/test/supernode_test.cpp b/test/supernode_test.cpp index 7ce8c5dd..475db3ab 100644 --- a/test/supernode_test.cpp +++ b/test/supernode_test.cpp @@ -99,7 +99,7 @@ TEST_F(SupernodeTestSuite, TestSampleInsertGrinder) { ASSERT_GE(successes, (int)log2(num_nodes)) << "Fewer than logn successful queries: supernode " << i; } - for (unsigned i = 0; i < num_nodes; ++i) delete snodes[i]; + for (unsigned i = 0; i < num_nodes; ++i) free(snodes[i]); } TEST_F(SupernodeTestSuite, TestSampleDeleteGrinder) { diff --git a/tools/benchmark/graphcc_bench.cpp b/tools/benchmark/graphcc_bench.cpp index 76d2b713..e8f901e8 100644 --- a/tools/benchmark/graphcc_bench.cpp +++ b/tools/benchmark/graphcc_bench.cpp @@ -36,7 +36,7 @@ static void flush_filesystem_cache() { } // Test the speed of reading all the data in the kron16 graph stream -static void BM_FileIngest(benchmark::State &state) { +static void BM_FileIngest(benchmark::State& state) { // determine the number of edges in the graph uint64_t num_edges; { @@ -63,7 +63,7 @@ static void BM_FileIngest(benchmark::State &state) { BENCHMARK(BM_FileIngest)->RangeMultiplier(2)->Range(KB << 2, MB / 4)->UseRealTime(); // Test the speed of reading all the data in the kron16 graph stream -static void BM_MTFileIngest(benchmark::State &state) { +static void 
BM_MTFileIngest(benchmark::State& state) { // determine the number of edges in the graph uint64_t num_edges; { @@ -98,106 +98,116 @@ static void BM_MTFileIngest(benchmark::State &state) { BENCHMARK(BM_MTFileIngest)->RangeMultiplier(4)->Range(1, 20)->UseRealTime(); #endif // FILE_INGEST_F +static void BM_builtin_ffsl(benchmark::State& state) { + size_t i = 0; + size_t j = -1; + for (auto _ : state) { + benchmark::DoNotOptimize(__builtin_ffsl(i++)); + benchmark::DoNotOptimize(__builtin_ffsl(j++)); + } +} +BENCHMARK(BM_builtin_ffsl); + +static void BM_builtin_ctzl(benchmark::State& state) { + size_t i = 0; + size_t j = -1; + for (auto _ : state) { + benchmark::DoNotOptimize(__builtin_ctzl(i++)); + benchmark::DoNotOptimize(__builtin_ctzl(j++)); + } +} +BENCHMARK(BM_builtin_ctzl); + +static void BM_builtin_clzl(benchmark::State& state) { + size_t i = 0; + size_t j = -1; + for (auto _ : state) { + benchmark::DoNotOptimize(__builtin_clzl(i++)); + benchmark::DoNotOptimize(__builtin_clzl(j++)); + } +} +BENCHMARK(BM_builtin_clzl); + // Test the speed of hashing using a method that loops over seeds and a method that // batches by seed // The argument to this benchmark is the number of hashes to batch -static void BM_Hash_XXH64(benchmark::State &state) { - uint64_t num_seeds = 8; - uint64_t num_hashes = state.range(0); - uint64_t output; +static void BM_Hash_XXH64(benchmark::State& state) { + uint64_t input = 100'000; for (auto _ : state) { - for (uint64_t h = 0; h < num_seeds; h++) { - for (uint64_t i = 0; i < num_hashes; i++) { - benchmark::DoNotOptimize(output = XXH64(&i, sizeof(uint64_t), seed + h)); - } - } + ++input; + benchmark::DoNotOptimize(XXH64(&input, sizeof(uint64_t), seed)); } - state.counters["Hash Rate"] = benchmark::Counter( - state.iterations() * num_hashes, benchmark::Counter::kIsRate); + state.counters["Hash Rate"] = benchmark::Counter(state.iterations(), benchmark::Counter::kIsRate); } -BENCHMARK(BM_Hash_XXH64)->Arg(1)->Arg(100)->Arg(10000); 
+BENCHMARK(BM_Hash_XXH64); -static void BM_Hash_XXH3_64(benchmark::State &state) { - uint64_t num_seeds = 8; - uint64_t num_hashes = state.range(0); - uint64_t output; +static void BM_Hash_XXH3_64(benchmark::State& state) { + uint64_t input = 100'000; for (auto _ : state) { - for (uint64_t h = 0; h < num_seeds; h++) { - for (uint64_t i = 0; i < num_hashes; i++) { - benchmark::DoNotOptimize(output = XXH3_64bits_withSeed(&i, sizeof(uint64_t), seed + h)); - } - } + ++input; + benchmark::DoNotOptimize(XXH3_64bits_withSeed(&input, sizeof(uint64_t), seed)); } - state.counters["Hash Rate"] = benchmark::Counter( - state.iterations() * num_hashes, benchmark::Counter::kIsRate); + state.counters["Hash Rate"] = benchmark::Counter(state.iterations(), benchmark::Counter::kIsRate); } -BENCHMARK(BM_Hash_XXH3_64)->Arg(1)->Arg(100)->Arg(10000); +BENCHMARK(BM_Hash_XXH3_64); -static void BM_Hash_bucket(benchmark::State &state) { - uint64_t num_seeds = 8; - uint64_t num_hashes = state.range(0); - uint64_t output; +static void BM_index_depth_hash(benchmark::State& state) { + uint64_t input = 100'000; for (auto _ : state) { - for (uint64_t h = 0; h < num_seeds; h++) { - for (uint64_t i = 0; i < num_hashes; i++) { - benchmark::DoNotOptimize(output = Bucket_Boruvka::col_index_hash(i, seed + h)); - } - } + ++input; + benchmark::DoNotOptimize(Bucket_Boruvka::get_index_depth(input, seed, 20)); } - state.counters["Hash Rate"] = benchmark::Counter( - state.iterations() * num_hashes, benchmark::Counter::kIsRate); + state.counters["Hash Rate"] = benchmark::Counter(state.iterations(), benchmark::Counter::kIsRate); } -BENCHMARK(BM_Hash_bucket)->Arg(1)->Arg(100)->Arg(10000); +BENCHMARK(BM_index_depth_hash); + +static void BM_index_hash(benchmark::State& state) { + uint64_t input = 100'000; + for (auto _ : state) { + ++input; + benchmark::DoNotOptimize(Bucket_Boruvka::get_index_hash(input, seed)); + } + state.counters["Hash Rate"] = benchmark::Counter(state.iterations(), benchmark::Counter::kIsRate); 
+} +BENCHMARK(BM_index_hash); + +static void BM_update_bucket(benchmark::State& state) { + vec_t a = 0; + vec_hash_t c = 0; + vec_t input = 0x0EADBEEF; + vec_hash_t checksum = 0x0EEDBEEF; + + for (auto _ : state) { + ++input; + ++checksum; + Bucket_Boruvka::update(a, c, input, checksum); + benchmark::DoNotOptimize(a); + benchmark::DoNotOptimize(c); + } +} +BENCHMARK(BM_update_bucket); // Benchmark the speed of updating sketches both serially and in batch mode -static void BM_Sketch_Update(benchmark::State &state) { - constexpr size_t upd_per_sketch = 10000; - constexpr size_t num_sketches = 1000; +static void BM_Sketch_Update(benchmark::State& state) { size_t vec_size = state.range(0); + vec_t input = vec_size / 4; // initialize sketches Sketch::configure(vec_size, 100); - SketchUniquePtr sketches[num_sketches]; - for (size_t i = 0; i < num_sketches; i++) { - sketches[i] = makeSketch(seed + i); - } - - // initialize updates - srand(seed); - std::vector> updates; - for (size_t i = 0; i < num_sketches; i++) { - updates.emplace_back(); - updates[i].reserve(upd_per_sketch); - for (size_t j = 0; j < upd_per_sketch; j++) { - updates[i].push_back(j % vec_size); - } - } + SketchUniquePtr skt = makeSketch(seed); // Test the speed of updating the sketches for (auto _ : state) { - // perform updates - if (!state.range(1)) { - // update serially - for (size_t j = 0; j < upd_per_sketch; j++) { - for (size_t i = 0; i < num_sketches; i++) { - sketches[i]->update(updates[i][j]); - } - } - } else { - // update in batch - for (size_t i = 0; i < num_sketches; i++) { - sketches[i]->batch_update(updates[i]); - } - } + ++input; + skt->update(input); } - state.counters["Update_Rate"] = benchmark::Counter( - state.iterations() * upd_per_sketch * num_sketches, benchmark::Counter::kIsRate); + state.counters["Updates"] = benchmark::Counter(state.iterations(), benchmark::Counter::kIsRate); state.counters["Hashes"] = benchmark::Counter( - state.iterations() * upd_per_sketch * num_sketches * 7, 
benchmark::Counter::kIsRate); + state.iterations() * (bucket_gen(100) + 1), benchmark::Counter::kIsRate); } -BENCHMARK(BM_Sketch_Update)->RangeMultiplier(4)->Ranges({{KB << 4, MB << 4}, {false, true}}); +BENCHMARK(BM_Sketch_Update)->RangeMultiplier(4)->Ranges({{KB << 4, MB << 4}}); // Benchmark the speed of querying sketches -static void BM_Sketch_Query(benchmark::State &state) { +static void BM_Sketch_Query(benchmark::State& state) { constexpr size_t vec_size = KB << 5; constexpr size_t num_sketches = 100; double density = ((double)state.range(0)) / 100; @@ -224,7 +234,7 @@ static void BM_Sketch_Query(benchmark::State &state) { sketches[j]->reset_queried(); } } - state.counters["Query_Rate"] = benchmark::Counter( + state.counters["Query Rate"] = benchmark::Counter( state.iterations() * num_sketches, benchmark::Counter::kIsRate); } BENCHMARK(BM_Sketch_Query)->DenseRange(0, 90, 10);