Skip to content

Commit 269ade8

Browse files
fix(table): add retries for insert partial failures (#589)
* fix(table): add retries for insert partial failures * refactoring * moar refactors * fix(table): working test cases for insert partial failure and retries (#589) * chore(table): explicit insert/create recursion, elaborated on method signatures/types * chore(table): rename insert option maxAttempts to partialRetries, allow retry of 0, do not allow negative retries #589 * test(table): remove sinon fake timers hack now that v9 is available #655 Co-authored-by: Andrew Zammit <zammit.andrew@gmail.com>
1 parent 94af332 commit 269ade8

4 files changed

Lines changed: 504 additions & 243 deletions

File tree

handwritten/bigquery/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
"mocha": "^7.0.0",
8686
"mv": "^2.1.1",
8787
"ncp": "^2.0.0",
88+
"p-reflect": "^2.1.0",
8889
"proxyquire": "^2.1.0",
8990
"sinon": "^9.0.0",
9091
"tmp": "0.1.0",

handwritten/bigquery/src/table.ts

Lines changed: 140 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ export type RowMetadata = any;
7373

7474
export type InsertRowsOptions = bigquery.ITableDataInsertAllRequest & {
7575
createInsertId?: boolean;
76+
partialRetries?: number;
7677
raw?: boolean;
7778
schema?: string | {};
7879
};
@@ -136,6 +137,12 @@ export type FormattedMetadata = bigquery.ITable;
136137
export type TableSchema = bigquery.ITableSchema;
137138
export type TableField = bigquery.ITableFieldSchema;
138139

140+
export interface PartialInsertFailure {
141+
message: string;
142+
reason: string;
143+
row: RowMetadata;
144+
}
145+
139146
/**
140147
* The file formats accepted by BigQuery.
141148
*
@@ -1733,6 +1740,11 @@ class Table extends common.ServiceObject {
17331740
* If you need to create an entire table from a file, consider using
17341741
* {@link Table#load} instead.
17351742
*
1743+
* Note, if a table was recently created, inserts may fail until the table
1744+
* is consistent within BigQuery. If a `schema` is supplied, this method will
1745+
* automatically retry those failed inserts, and it will even create the
1746+
* table with the provided schema if it does not exist.
1747+
*
17361748
* @see [Tabledata: insertAll API Documentation]{@link https://cloud.google.com/bigquery/docs/reference/v2/tabledata/insertAll}
17371749
* @see [Streaming Insert Limits]{@link https://cloud.google.com/bigquery/quotas#streaming_inserts}
17381750
* @see [Troubleshooting Errors]{@link https://developers.google.com/bigquery/troubleshooting-errors}
@@ -1743,12 +1755,15 @@ class Table extends common.ServiceObject {
17431755
* default row id when one is not provided.
17441756
* @param {boolean} [options.ignoreUnknownValues=false] Accept rows that contain
17451757
* values that do not match the schema. The unknown values are ignored.
1758+
* @param {number} [options.partialRetries=3] Number of times to retry
1759+
* inserting rows for cases of partial failures.
17461760
* @param {boolean} [options.raw] If `true`, the `rows` argument is expected to
17471761
* be formatted according to the
17481762
* [specification](https://cloud.google.com/bigquery/docs/reference/v2/tabledata/insertAll).
1749-
* @param {string|object} [options.schema] If provided will atomatically create
1750-
* a table if it doesn't already exist. Note that this can take longer
1751-
* than 2 minutes to complete. A comma-separated list of name:type pairs.
1763+
* @param {string|object} [options.schema] If provided will automatically
1764+
* create a table if it doesn't already exist. Note that this can take
1765+
* longer than 2 minutes to complete. A comma-separated list of
1766+
* name:type pairs.
17521767
* Valid types are "string", "integer", "float", "boolean", and
17531768
* "timestamp". If the type is omitted, it is assumed to be "string".
17541769
* Example: "name:string, age:integer". Schemas can also be specified as a
@@ -1870,6 +1885,102 @@ class Table extends common.ServiceObject {
18701885
? optionsOrCallback
18711886
: (cb as InsertRowsCallback);
18721887

1888+
this._insertAndCreateTable(rows, options).then(
1889+
resp => callback(null, resp),
1890+
err => callback(err, null)
1891+
);
1892+
}
1893+
1894+
/**
1895+
* Insert rows with retries; if the insert fails with a 404 and a `schema` was supplied, create the table and retry the insert.
1896+
*
1897+
* @param {RowMetadata | RowMetadata[]} rows
1898+
* @param {InsertRowsOptions} options
1899+
* @returns {Promise<bigquery.ITableDataInsertAllResponse | bigquery.ITable>}
1900+
* @private
1901+
*/
1902+
private async _insertAndCreateTable(
1903+
rows: RowMetadata | RowMetadata[],
1904+
options: InsertRowsOptions
1905+
): Promise<bigquery.ITableDataInsertAllResponse | bigquery.ITable> {
1906+
const {schema} = options;
1907+
const delay = 60000;
1908+
1909+
try {
1910+
return await this._insertWithRetry(rows, options);
1911+
} catch (err) {
1912+
if ((err as common.ApiError).code !== 404 || !schema) {
1913+
throw err;
1914+
}
1915+
}
1916+
1917+
try {
1918+
await this.create({schema});
1919+
} catch (err) {
1920+
if ((err as common.ApiError).code !== 409) {
1921+
throw err;
1922+
}
1923+
}
1924+
1925+
// table creation after failed access is subject to failure caching and
1926+
// eventual consistency, see:
1927+
// https://github.com/googleapis/google-cloud-python/issues/4553#issuecomment-350110292
1928+
await new Promise(resolve => setTimeout(resolve, delay));
1929+
return this._insertAndCreateTable(rows, options);
1930+
}
1931+
1932+
/**
1933+
* This method will attempt to insert rows while retrying any partial failures
1934+
* that occur along the way. Because partial insert failures are returned
1935+
* differently, we can't depend on our usual retry strategy.
1936+
*
1937+
* @private
1938+
*
1939+
* @param {RowMetadata|RowMetadata[]} rows The rows to insert.
1940+
* @param {InsertRowsOptions} options Insert options.
1941+
* @returns {Promise<bigquery.ITableDataInsertAllResponse>}
1942+
*/
1943+
private async _insertWithRetry(
1944+
rows: RowMetadata | RowMetadata[],
1945+
options: InsertRowsOptions
1946+
): Promise<bigquery.ITableDataInsertAllResponse> {
1947+
const {partialRetries = 3} = options;
1948+
let error: Error;
1949+
1950+
const maxAttempts = Math.max(partialRetries, 0) + 1;
1951+
1952+
for (let attempts = 0; attempts < maxAttempts; attempts++) {
1953+
try {
1954+
return await this._insert(rows, options);
1955+
} catch (e) {
1956+
error = e;
1957+
rows = ((e.errors || []) as PartialInsertFailure[])
1958+
.filter(err => !!err.row)
1959+
.map(err => err.row);
1960+
1961+
if (!rows.length) {
1962+
break;
1963+
}
1964+
}
1965+
}
1966+
1967+
throw error!;
1968+
}
1969+
1970+
/**
1971+
* This method does the bulk of the work for processing options and making the
1972+
* network request.
1973+
*
1974+
* @private
1975+
*
1976+
* @param {RowMetadata|RowMetadata[]} rows The rows to insert.
1977+
* @param {InsertRowsOptions} options Insert options.
1978+
* @returns {Promise<bigquery.ITableDataInsertAllResponse>}
1979+
*/
1980+
private async _insert(
1981+
rows: RowMetadata | RowMetadata[],
1982+
options: InsertRowsOptions
1983+
): Promise<bigquery.ITableDataInsertAllResponse> {
18731984
rows = arrify(rows) as RowMetadata[];
18741985

18751986
if (!rows.length) {
@@ -1893,74 +2004,39 @@ class Table extends common.ServiceObject {
18932004
}
18942005

18952006
delete json.createInsertId;
2007+
delete json.partialRetries;
18962008
delete json.raw;
2009+
delete json.schema;
18972010

1898-
let schema: string | {};
1899-
1900-
if (options.schema) {
1901-
schema = options.schema;
1902-
delete json.schema;
1903-
}
1904-
1905-
const createTableAndRetry = () => {
1906-
this.create(
1907-
{
1908-
schema,
1909-
},
1910-
(err, table, resp) => {
1911-
if (err && err.code !== 409) {
1912-
callback!(err, resp);
1913-
return;
1914-
}
1915-
1916-
setTimeout(() => {
1917-
this.insert(rows, options, callback!);
1918-
}, 60000);
1919-
}
1920-
);
1921-
};
1922-
1923-
this.request(
1924-
{
1925-
method: 'POST',
1926-
uri: '/insertAll',
1927-
json,
1928-
},
1929-
(err, resp) => {
1930-
if (err) {
1931-
if ((err as common.ApiError).code === 404 && schema) {
1932-
setTimeout(createTableAndRetry, Math.random() * 60000);
1933-
} else {
1934-
callback!(err, resp);
1935-
}
1936-
return;
1937-
}
2011+
const [resp] = await this.request({
2012+
method: 'POST',
2013+
uri: '/insertAll',
2014+
json,
2015+
});
19382016

1939-
const partialFailures = (resp.insertErrors || []).map(
1940-
(insertError: GoogleErrorBody) => {
2017+
const partialFailures = (resp.insertErrors || []).map(
2018+
(insertError: GoogleErrorBody) => {
2019+
return {
2020+
errors: insertError.errors!.map(error => {
19412021
return {
1942-
errors: insertError.errors!.map(error => {
1943-
return {
1944-
message: error.message,
1945-
reason: error.reason,
1946-
};
1947-
}),
1948-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
1949-
row: rows[(insertError as any).index],
2022+
message: error.message,
2023+
reason: error.reason,
19502024
};
1951-
}
1952-
);
1953-
1954-
if (partialFailures.length > 0) {
1955-
err = new common.util.PartialFailureError({
1956-
errors: partialFailures,
1957-
response: resp,
1958-
} as GoogleErrorBody);
1959-
}
1960-
1961-
callback!(err, resp);
2025+
}),
2026+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
2027+
row: rows[(insertError as any).index],
2028+
};
19622029
}
19632030
);
2031+
2032+
if (partialFailures.length > 0) {
2033+
throw new common.util.PartialFailureError({
2034+
errors: partialFailures,
2035+
response: resp,
2036+
} as GoogleErrorBody);
2037+
}
2038+
2039+
return resp;
19642040
}
19652041

19662042
load(

handwritten/bigquery/system-test/bigquery.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -975,7 +975,6 @@ describe('BigQuery', () => {
975975
};
976976

977977
const options = {
978-
autoCreate: true,
979978
schema: SCHEMA,
980979
};
981980

0 commit comments

Comments
 (0)