-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathsupernode.h
More file actions
220 lines (182 loc) · 7.35 KB
/
supernode.h
File metadata and controls
220 lines (182 loc) · 7.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
#pragma once
#include <fstream>
#include <sys/mman.h>
#include <graph_zeppelin_common.h>
#include "l0_sampling/sketch.h"
enum SerialType {
FULL,
PARTIAL,
SPARSE,
};
/**
* This interface implements the "supernode" so Boruvka can use it as a black
* box without needing to worry about implementing l_0.
*/
class Supernode {
static size_t max_sketches;
static size_t bytes_size; // the size of a super-node in bytes including the sketches
static size_t serialized_size; // the size of a supernode that has been serialized
size_t sample_idx;
std::mutex node_mt;
FRIEND_TEST(SupernodeTestSuite, TestBatchUpdate);
FRIEND_TEST(SupernodeTestSuite, TestConcurrency);
FRIEND_TEST(SupernodeTestSuite, TestSerialization);
FRIEND_TEST(SupernodeTestSuite, TestPartialSparseSerialization);
FRIEND_TEST(SupernodeTestSuite, SketchesHaveUniqueSeeds);
FRIEND_TEST(GraphTestSuite, TestCorrectnessOfReheating);
FRIEND_TEST(GraphTest, TestSupernodeRestoreAfterCCFailure);
FRIEND_TEST(EXPR_Parallelism, N10kU100k);
public:
const uint64_t n; // for creating a copy
const uint64_t seed; // for creating a copy
private:
size_t num_sketches;
size_t merged_sketches; // This variable tells us which sketches are good for queries post merge
size_t sketch_size;
/* collection of logn sketches to query from, since we can't query from one
sketch more than once */
// The sketches, off the end.
alignas(Sketch) char sketch_buffer[];
/**
* @param n the total number of nodes in the graph.
* @param seed the (fixed) seed value passed to each supernode.
*/
Supernode(uint64_t n, uint64_t seed);
/**
* @param n the total number of nodes in the graph.
* @param seed the (fixed) seed value passed to each supernode.
* @param binary_in A stream to read the data from.
*/
Supernode(uint64_t n, uint64_t seed, std::istream &binary_in);
Supernode(const Supernode& s);
// get the ith sketch in the sketch array
inline Sketch* get_sketch(size_t i) {
return reinterpret_cast<Sketch*>(sketch_buffer + i * sketch_size);
}
// version of above for const supernode objects
inline const Sketch* get_sketch(size_t i) const {
return reinterpret_cast<const Sketch*>(sketch_buffer + i * sketch_size);
}
public:
/**
* Supernode construtors
* @param n the total number of nodes in the graph.
* @param seed the (fixed) seed value passed to each supernode.
* @param loc (Optional) the memory location to put the supernode.
* @return a pointer to the newly created supernode object
*/
static Supernode* makeSupernode(uint64_t n, long seed, void *loc = malloc(bytes_size));
// create supernode from file
static Supernode* makeSupernode(uint64_t n, long seed, std::istream &binary_in,
void *loc = malloc(bytes_size));
// copy 'constructor'
static Supernode* makeSupernode(const Supernode& s, void *loc = malloc(bytes_size));
~Supernode();
static inline void configure(uint64_t n, vec_t sketch_fail_factor=default_fail_factor) {
Sketch::configure(n*n, sketch_fail_factor);
max_sketches = log2(n)/(log2(3)-1);
bytes_size = sizeof(Supernode) + max_sketches * Sketch::sketchSizeof();
serialized_size = max_sketches * Sketch::serialized_size();
}
static inline size_t get_size() {
return bytes_size;
}
// return the size of a supernode that has been serialized using write_binary()
static inline size_t get_serialized_size() {
return serialized_size;
}
inline size_t get_sketch_size() {
return sketch_size;
}
// return the maximum number of sketches held in by a Supernode
// most Supernodes will hold this many sketches
static int get_max_sketches() { return max_sketches; };
// get number of samples remaining in the Supernode
int samples_remaining() { return merged_sketches - sample_idx; }
inline bool out_of_queries() {
return sample_idx >= merged_sketches;
}
inline int curr_idx() {
return sample_idx;
}
// reset the supernode query metadata
// we use this when resuming insertions after CC made copies in memory
inline void reset_query_state() {
for (size_t i = 0; i < sample_idx; i++) {
get_sketch(i)->reset_queried();
}
sample_idx = 0;
}
// get the ith sketch in the sketch array as a const object
inline const Sketch* get_const_sketch(size_t i) {
return reinterpret_cast<Sketch*>(sketch_buffer + i * sketch_size);
}
/**
* Function to sample an edge from the cut of a supernode.
* @return an edge in the cut, represented as an Edge with LHS <= RHS,
* if one exists. Additionally, returns a code representing the
* sample result (good, zero, or fail)
*/
std::pair<Edge, SampleSketchRet> sample();
/**
* Function to sample 1 or more edges from the cut of a supernode.
* This function runs a query that samples from all columns in a single Sketch
* @return an list of edges in the cut, each represented as an Edge with LHS <= RHS,
* if one exists. Additionally, returns a code represnting the sample
* result (good, zero, or fail)
*/
std::pair<std::vector<Edge>, SampleSketchRet> exhaustive_sample();
/**
* In-place merge function. Guaranteed to update the caller Supernode.
*/
void merge(Supernode& other);
/**
* In-place range merge function. Updates the caller Supernode.
* The range merge only merges some of the Sketches
* This function should only be used if you know what you're doing
* @param other Supernode to merge into caller
* @param start_idx Index of first Sketch to merge
* @param num_merge How many sketches to merge
*/
void range_merge(Supernode& other, size_t start_idx, size_t num_merge);
/**
* Insert or delete an (encoded) edge into the supernode. Guaranteed to be
* processed BEFORE Boruvka starts.
*/
void update(vec_t update);
/**
* Update all the sketches in a supernode, given a batch of updates.
* @param delta_node a delta supernode created through calling
* Supernode::delta_supernode.
*/
void apply_delta_update(const Supernode* delta_node);
/**
* Create new delta supernode with given initial parmameters and batch of
* updates to apply.
* @param n see declared constructor.
* @param seed see declared constructor.
* @param updates the batch of updates to apply.
* @param loc the location to place the delta in
*/
static void delta_supernode(uint64_t n, uint64_t seed, const
std::vector<vec_t>& updates, void *loc);
/**
* Serialize the supernode to a binary output stream.
* @param binary_out the stream to write to.
*/
void write_binary(std::ostream &binary_out, bool sparse = false);
/*
* Serialize a portion of the supernode to a binary output stream.
* @param binary_out the stream to write to.
* @param beg the index of the first sketch to serialize
* @param num the number of sketches to serialize
*/
void write_binary_range(std::ostream&binary_out, uint32_t beg, uint32_t num, bool sparse = false);
// void write_sparse_binary_range(std::ostream&binary_out, uint32_t beg, uint32_t end);
static constexpr size_t default_fail_factor = 4;
};
class OutOfQueriesException : public std::exception {
virtual const char* what() const throw() {
return "This supernode cannot be sampled more times!";
}
};