Skip to content

Commit a80efd4

Browse files
stephenpluspluscrwilcox
authored andcommitted
Rewrite PartialResultStream (#208)
Fixes #180 and #198 by reworking the way we process streamed rows from PartialResultStream on Spanner. The high-level difference is this - use RowBuilder for all streams. We had two code paths previously - do not block on returning until we have a non-chunked row. We can hold the final value of a chunked segment and begin returning other data - handle partial segments. Occasionally we will get a partial row and shouldn't try to send that to the user.
1 parent 529d37d commit a80efd4

8 files changed

Lines changed: 2642 additions & 2782 deletions

File tree

handwritten/spanner/package-lock.json

Lines changed: 2354 additions & 2359 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

handwritten/spanner/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@
7777
"google-gax": "^0.16.0",
7878
"google-proto-files": "^0.15.0",
7979
"is": "^3.1.0",
80-
"lodash.chunk": "^4.2.0",
8180
"lodash.flatten": "^4.4.0",
8281
"lodash.merge": "^4.6.1",
8382
"lodash.snakecase": "^4.1.1",

handwritten/spanner/src/codec.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ function decode(value, field) {
176176
return decoded;
177177
}
178178

179-
return decodeValue_(commonGrpc.Service.decodeValue_(value), field.type);
179+
return decodeValue_(value, field.type);
180180
}
181181

182182
codec.decode = decode;

handwritten/spanner/src/partial-result-stream.js

Lines changed: 10 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
'use strict';
1818

1919
var checkpointStream = require('checkpoint-stream');
20-
var chunk = require('lodash.chunk');
2120
var eventsIntercept = require('events-intercept');
2221
var exec = require('methmeth');
2322
var extend = require('extend');
@@ -27,7 +26,6 @@ var split = require('split-array-stream').split;
2726
var streamEvents = require('stream-events');
2827
var through = require('through2');
2928

30-
var codec = require('./codec.js');
3129
var RowBuilder = require('./row-builder.js');
3230

3331
/**
@@ -68,45 +66,26 @@ function partialResultStream(requestFn, options) {
6866
},
6967
});
7068

71-
var rowChunks = [];
72-
var metadata;
69+
var builder;
7370

7471
var userStream = streamEvents(
7572
through.obj(function(row, _, next) {
76-
var formattedRows = [];
77-
78-
if (row.metadata) {
79-
metadata = row.metadata;
80-
}
81-
82-
if (row.chunkedValue) {
83-
rowChunks.push(row);
84-
next();
85-
return;
86-
}
87-
8873
if (is.empty(row.values)) {
8974
next();
9075
return;
9176
}
9277

93-
if (rowChunks.length > 0) {
94-
// Done getting all the chunks. Put them together.
95-
var builder = new RowBuilder(metadata, rowChunks.concat(row));
96-
formattedRows = formattedRows.concat(builder.toJSON());
97-
rowChunks.length = 0;
98-
} else {
99-
var formattedRow = partialResultStream.formatRow_(metadata, row);
100-
var multipleRows = is.array(formattedRow[0]);
101-
102-
if (multipleRows) {
103-
formattedRows = formattedRows.concat(formattedRow);
104-
} else {
105-
formattedRows.push(formattedRow);
106-
}
78+
// Use RowBuilder to construct and return complete, formatted rows.
79+
if (!builder) {
80+
builder = new RowBuilder(row.metadata.rowType.fields);
10781
}
10882

109-
rowChunks = [];
83+
builder.addRow(row);
84+
85+
// Build the chunks to rows.
86+
builder.build();
87+
88+
var formattedRows = builder.toJSON(builder.flush());
11089

11190
if (options.json) {
11291
formattedRows = formattedRows.map(exec('toJSON', options.jsonOptions));
@@ -155,41 +134,4 @@ function partialResultStream(requestFn, options) {
155134
.pipe(userStream);
156135
}
157136

158-
/**
159-
* Format a PartialResultSet response from the API. A row object will be created
160-
* to map each field name to its decoded value.
161-
*
162-
* If multiple rows exist in a single PartialResultSet, an array is returned.
163-
*
164-
* @param {object} row - A `PartialResultSet` object.
165-
*/
166-
partialResultStream.formatRow_ = function(metadata, row) {
167-
var fields = metadata.rowType.fields;
168-
169-
if (row.values.length > fields.length) {
170-
// More than one row exists. Return an array of formatted rows.
171-
var valueSets = chunk(row.values, fields.length);
172-
173-
return valueSets.map(function(valueSet) {
174-
row.values = valueSet;
175-
return partialResultStream.formatRow_(metadata, row);
176-
});
177-
}
178-
179-
var formattedRow = row.values.map(function(value, index) {
180-
var field = fields[index];
181-
return {
182-
name: field.name,
183-
value: codec.decode(value, field),
184-
};
185-
});
186-
187-
Object.defineProperty(formattedRow, 'toJSON', {
188-
enumerable: false,
189-
value: codec.generateToJSONFromRow(formattedRow),
190-
});
191-
192-
return formattedRow;
193-
};
194-
195137
module.exports = partialResultStream;

handwritten/spanner/src/row-builder.js

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
'use strict';
1818

1919
var codec = require('./codec.js');
20+
var commonGrpc = require('@google-cloud/common-grpc');
2021
var is = require('is');
2122

2223
/*!
@@ -25,11 +26,9 @@ var is = require('is');
2526
* @private
2627
* @class
2728
*/
28-
function RowBuilder(metadata, chunks) {
29-
this.metadata = metadata;
30-
this.fields = this.metadata.rowType.fields;
31-
this.chunks = chunks;
32-
29+
function RowBuilder(fields) {
30+
this.fields = fields;
31+
this.chunks = [];
3332
this.rows = [[]];
3433

3534
Object.defineProperty(this, 'currentRow', {
@@ -46,7 +45,7 @@ RowBuilder.getValue = function(obj) {
4645
var value = obj;
4746

4847
if (obj && obj.kind) {
49-
value = obj[obj.kind];
48+
value = commonGrpc.Service.decodeValue_(obj);
5049
}
5150

5251
if (value && value.values) {
@@ -72,11 +71,11 @@ RowBuilder.formatValue = function(field, value) {
7271
}
7372

7473
if (field.code !== 'STRUCT') {
75-
return value;
74+
return codec.decode(value, field);
7675
}
7776

7877
return field.structType.fields.reduce(function(struct, field, index) {
79-
struct[field.name] = RowBuilder.formatValue(field.type, value[index]);
78+
struct[field.name] = RowBuilder.formatValue(field, value[index]);
8079
return struct;
8180
}, {});
8281
};
@@ -116,14 +115,20 @@ RowBuilder.merge = function(type, head, tail) {
116115
});
117116
};
118117

118+
/**
119+
* Add a PartialResultSet response object to the pending rows.
120+
*/
121+
RowBuilder.prototype.addRow = function(row) {
122+
this.chunks = this.chunks.concat(row);
123+
};
124+
119125
/**
120126
* Appends element to row.
121127
*/
122128
RowBuilder.prototype.append = function(value) {
123129
if (this.currentRow.length === this.fields.length) {
124130
this.rows.push([]);
125131
}
126-
127132
this.currentRow.push(value);
128133
};
129134

@@ -132,35 +137,59 @@ RowBuilder.prototype.append = function(value) {
132137
*/
133138
RowBuilder.prototype.build = function() {
134139
var self = this;
135-
var previousChunk;
136140

137141
this.chunks.forEach(function(chunk) {
138-
if (previousChunk && previousChunk.chunkedValue) {
139-
var type = self.fields[self.currentRow.length - 1].type;
142+
// If we have a chunk to merge, merge the values now.
143+
if (self.pendingChunk) {
144+
var currentColumn = self.currentRow.length % self.fields.length;
140145
var merged = RowBuilder.merge(
141-
type,
142-
self.currentRow.pop(),
146+
self.fields[currentColumn].type,
147+
self.pendingChunk,
143148
chunk.values.shift()
144149
);
150+
chunk.values = merged.concat(chunk.values);
151+
delete self.pendingChunk;
152+
}
145153

146-
merged.forEach(self.append.bind(self));
154+
// If the chunk is chunked, store the last value for merging with the next
155+
// chunk to be processed.
156+
if (chunk.chunkedValue) {
157+
self.pendingChunk = chunk.values.pop();
147158
}
148159

149160
chunk.values.map(RowBuilder.getValue).forEach(self.append.bind(self));
150-
151-
previousChunk = chunk;
152161
});
162+
163+
// As chunks are now in rows, remove them.
164+
this.chunks.length = 0;
153165
};
154166

155167
/**
156-
* Transforms values into JSON format.
168+
* Flush already complete rows.
157169
*/
158-
RowBuilder.prototype.toJSON = function() {
159-
this.build();
170+
RowBuilder.prototype.flush = function() {
171+
var rowsToReturn = this.rows;
172+
173+
if (
174+
!is.empty(this.rows[0]) &&
175+
this.currentRow.length !== this.fields.length
176+
) {
177+
// Don't return the partial row. Hold onto it for the next iteration.
178+
this.rows = this.rows.splice(-1);
179+
} else {
180+
this.rows = [[]];
181+
}
182+
183+
return rowsToReturn;
184+
};
160185

186+
/**
187+
* Transforms values into JSON format.
188+
*/
189+
RowBuilder.prototype.toJSON = function(rows) {
161190
var fields = this.fields;
162191

163-
return this.rows.map(function(values) {
192+
return rows.map(function(values) {
164193
var formattedRow = [];
165194
var serializedRow = {};
166195

@@ -169,7 +198,7 @@ RowBuilder.prototype.toJSON = function() {
169198

170199
var column = {
171200
name: field.name,
172-
value: RowBuilder.formatValue(field.type, value),
201+
value: RowBuilder.formatValue(field, value),
173202
};
174203

175204
formattedRow.push(column);

handwritten/spanner/test/codec.js

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -213,17 +213,11 @@ describe('codec', function() {
213213
};
214214
});
215215

216-
it('should return the value from the common decoder', function() {
216+
it('should return the same value if not a special type', function() {
217217
var value = {};
218-
var defaultDecodedValue = {};
219-
220-
FakeGrpcService.decodeValue_ = function(value_) {
221-
assert.strictEqual(value_, value);
222-
return defaultDecodedValue;
223-
};
224218

225219
var decoded = codec.decode(value, BYPASS_FIELD);
226-
assert.strictEqual(decoded, defaultDecodedValue);
220+
assert.strictEqual(decoded, value);
227221
});
228222

229223
it('should return null values as null', function() {

0 commit comments

Comments
 (0)