Skip to content

Commit 62fc0a4

Browse files
alkatrivedirelease-please[bot]surbhigarg92
authored
feat: add support for blind writes (#2065)
* sample: blind write * sample: blind write * refactor * add class mutation * add class mutation * feat: blind-writes * refactor * fix: lint errors * refactor * fix: lint errors * fix: header checks * refactor the blind write * feat: add support for blind writes chore(main): release 7.9.0 (#2053) :robot: I have created a release *beep* *boop* --- * **spanner:** Add support for batchWrite ([#2054](https://togithub.com/googleapis/nodejs-spanner/issues/2054)) ([06aab6e](https://togithub.com/googleapis/nodejs-spanner/commit/06aab6e39bbce9e3786f1ac631c80e8909197e92)) * **deps:** Update dependency google-gax to v4.3.4 ([#2051](https://togithub.com/googleapis/nodejs-spanner/issues/2051)) ([80abf06](https://togithub.com/googleapis/nodejs-spanner/commit/80abf06ba8ef9497318ffc597b83fb63e4408f9c)) * **deps:** Update dependency google-gax to v4.3.5 ([#2055](https://togithub.com/googleapis/nodejs-spanner/issues/2055)) ([702c9b0](https://togithub.com/googleapis/nodejs-spanner/commit/702c9b0f34e6cc34233c5aa52b97601b19f70980)) * **deps:** Update dependency google-gax to v4.3.6 ([#2057](https://togithub.com/googleapis/nodejs-spanner/issues/2057)) ([74ebf1e](https://togithub.com/googleapis/nodejs-spanner/commit/74ebf1e45cddf614c180295f3a761a8f84c5cb32)) * **deps:** Update dependency google-gax to v4.3.7 ([#2068](https://togithub.com/googleapis/nodejs-spanner/issues/2068)) ([28fec6c](https://togithub.com/googleapis/nodejs-spanner/commit/28fec6ca505d78d725efc123950be978e0c84ab7)) --- This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please). refactor: blind write method fix: lint errors fix: Retry with timeout (#2071) Use `gaxOptions.timeout` during retry in streaming calls. Earlier the timeout value was only used for a single RPC not for the whole operation including retries. Now if RPC returns `Unavailable` error and the timeout value has been reached, library will throw an Deadline exceeded error. ``` const query = { sql: 'Select 1', gaxOptions: {timeout: 500} } const [rows] = await database.run(query); ``` chore(main): release 7.9.1 (#2072) :robot: I have created a release *beep* *boop* --- * Retry with timeout ([#2071](https://togithub.com/googleapis/nodejs-spanner/issues/2071)) ([a943257](https://togithub.com/googleapis/nodejs-spanner/commit/a943257a0402b26fd80196057a9724fd28fc5c1b)) --- This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please). refactor: blind write method test: unit test for blind write test: unit test for blind write refactor fix: lint errors feat: add support for change streams transaction exclusion option for Batch Write (#2070) * feat: change stream transaction exclusion option for Batch Write * refactor docs: add doc to blindWrite method docs: add doc to the setQueuedMutations refactor: doc setQueuedMutations fix: presubmit error fix(deps): update dependency google-gax to v4.3.8 (#2077) [![Mend Renovate](https://app.renovatebot.com/images/banner.svg)](https://renovatebot.com) This PR contains the following updates: | Package | Change | Age | Adoption | Passing | Confidence | |---|---|---|---|---|---| | [google-gax](https://togithub.com/googleapis/gax-nodejs) ([source](https://togithub.com/googleapis/gax-nodejs/tree/HEAD/gax)) | [`4.3.7` -> `4.3.8`](https://renovatebot.com/diffs/npm/google-gax/4.3.7/4.3.8) | [![age](https://developer.mend.io/api/mc/badges/age/npm/google-gax/4.3.8?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![adoption](https://developer.mend.io/api/mc/badges/adoption/npm/google-gax/4.3.8?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![passing](https://developer.mend.io/api/mc/badges/compatibility/npm/google-gax/4.3.7/4.3.8?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://developer.mend.io/api/mc/badges/confidence/npm/google-gax/4.3.7/4.3.8?slim=true)](https://docs.renovatebot.com/merge-confidence/) | --- <details> <summary>googleapis/gax-nodejs (google-gax)</summary> [Compare Source](https://togithub.com/googleapis/gax-nodejs/compare/google-gax-v4.3.7...google-gax-v4.3.8) - **deps:** remove rimraf in favor of native node rm function ([#&#8203;1626](https://togithub.com/googleapis/gax-nodejs/issues/1626)) ([dd87646](https://togithub.com/googleapis/gax-nodejs/commit/dd87646618d5026549920e224df7f85cbb5ff6a8)) </details> --- 📅 **Schedule**: Branch creation - "after 9am and before 3pm" (UTC), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied. ♻ **Rebasing**: Whenever PR is behind base branch, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] <!-- rebase-check -->If you want to rebase/retry this PR, check this box --- This PR has been generated by [Mend Renovate](https://www.mend.io/free-developer-tools/renovate/). View repository job log [here](https://developer.mend.io/github/googleapis/nodejs-spanner). <!--renovate-debug:eyJjcmVhdGVkSW5WZXIiOiIzNy40MjUuMSIsInVwZGF0ZWRJblZlciI6IjM3LjQyNS4xIiwidGFyZ2V0QnJhbmNoIjoibWFpbiIsImxhYmVscyI6W119--> updated updated lint refactor * fix: presubmit error * refactor: docs of the method writeAtLeastOnce * test: unit test using mockspanner * fix: lint errors * docs refactor * refactor * refactor --------- Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> Co-authored-by: surbhigarg92 <surbhigarg.92@gmail.com>
1 parent b91e284 commit 62fc0a4

File tree

5 files changed

+363
-1
lines changed

5 files changed

+363
-1
lines changed

src/database.ts

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,11 @@ import {
6161
import {CreateTableCallback, CreateTableResponse, Table} from './table';
6262
import {
6363
BatchWriteOptions,
64+
CommitCallback,
65+
CommitResponse,
6466
ExecuteSqlRequest,
6567
MutationGroup,
68+
MutationSet,
6669
RunCallback,
6770
RunResponse,
6871
RunUpdateCallback,
@@ -3346,6 +3349,95 @@ class Database extends common.GrpcServiceObject {
33463349
return proxyStream as NodeJS.ReadableStream;
33473350
}
33483351

3352+
/**
3353+
* Write mutations using a single RPC invocation without replay protection.
3354+
*
3355+
* writeAtLeastOnce writes mutations to Spanner using a single Commit RPC.
3356+
* These requests are not replay protected, meaning that it may apply mutations more
3357+
* than once, if the mutations are not idempotent, this may lead to a failure being
3358+
* reported when the mutation was applied once. Replays non-idempotent mutations may
3359+
* have undesirable effects. For example, replays of an insert mutation may produce an
3360+
* already exists error. For this reason, most users of the library will prefer to use
3361+
* {@link runTransaction} instead.
3362+
*
3363+
* However, {@link writeAtLeastOnce()} requires only a single RPC, whereas {@link runTransaction()}
3364+
* requires two RPCs (one of which may be performed in advance), and so this method may be
3365+
* appropriate for latency sensitive and/or high throughput blind writing.
3366+
*
3367+
* We recommend structuring your mutation set to be idempotent to avoid this issue.
3368+
*
3369+
* @param {MutationSet} [mutations] Set of Mutations to be applied.
3370+
* @param {CallOptions} [options] Options object for blind write request.
3371+
* @param {CommitCallback} [callback] Callback function for blind write request.
3372+
*
3373+
* @returns {Promise}
3374+
*
3375+
* @example
3376+
* ```
3377+
* const {Spanner} = require('@google-cloud/spanner');
3378+
* const spanner = new Spanner();
3379+
*
3380+
* const instance = spanner.instance('my-instance');
3381+
* const database = instance.database('my-database');
3382+
* const mutations = new MutationSet();
3383+
* mutations.upsert('Singers', {
3384+
* SingerId: 1,
3385+
* FirstName: 'Scarlet',
3386+
* LastName: 'Terry',
3387+
* });
3388+
* mutations.upsert('Singers', {
3389+
* SingerId: 2,
3390+
* FirstName: 'Marc',
3391+
* LastName: 'Richards',
3392+
* });
3393+
*
3394+
* try {
3395+
* const [response, err] = await database.writeAtLeastOnce(mutations, {});
3396+
* console.log(response.commitTimestamp);
3397+
* } catch(err) {
3398+
* console.log("Error: ", err);
3399+
* }
3400+
* ```
3401+
*/
3402+
writeAtLeastOnce(mutations: MutationSet): Promise<CommitResponse>;
3403+
writeAtLeastOnce(
3404+
mutations: MutationSet,
3405+
options: CallOptions
3406+
): Promise<CommitResponse>;
3407+
writeAtLeastOnce(mutations: MutationSet, callback: CommitCallback): void;
3408+
writeAtLeastOnce(
3409+
mutations: MutationSet,
3410+
options: CallOptions,
3411+
callback: CommitCallback
3412+
): void;
3413+
writeAtLeastOnce(
3414+
mutations: MutationSet,
3415+
optionsOrCallback?: CallOptions | CommitCallback,
3416+
callback?: CommitCallback
3417+
): void | Promise<CommitResponse> {
3418+
const cb =
3419+
typeof optionsOrCallback === 'function'
3420+
? (optionsOrCallback as CommitCallback)
3421+
: callback;
3422+
const options =
3423+
typeof optionsOrCallback === 'object' && optionsOrCallback
3424+
? (optionsOrCallback as CallOptions)
3425+
: {};
3426+
this.pool_.getSession((err, session?, transaction?) => {
3427+
if (err && isSessionNotFoundError(err as grpc.ServiceError)) {
3428+
this.writeAtLeastOnce(mutations, options, cb!);
3429+
return;
3430+
}
3431+
if (err) {
3432+
cb!(err as grpc.ServiceError);
3433+
return;
3434+
}
3435+
this._releaseOnEnd(session!, transaction!);
3436+
transaction?.setQueuedMutations(mutations.proto());
3437+
return transaction?.commit(options, cb!);
3438+
});
3439+
}
3440+
33493441
/**
33503442
* Create a Session object.
33513443
*
@@ -3674,6 +3766,7 @@ callbackifyAll(Database, {
36743766
'batchCreateSessions',
36753767
'batchTransaction',
36763768
'batchWriteAtLeastOnce',
3769+
'writeAtLeastOnce',
36773770
'close',
36783771
'createBatchTransaction',
36793772
'createSession',

src/index.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ import {SessionPool} from './session-pool';
7272
import {Table} from './table';
7373
import {
7474
MutationGroup,
75+
MutationSet,
7576
PartitionedDml,
7677
Snapshot,
7778
Transaction,
@@ -2025,6 +2026,15 @@ export {Transaction};
20252026
*/
20262027
export {MutationGroup};
20272028

2029+
/**
2030+
* {@link MutationSet} class.
2031+
*
2032+
* @name Spanner.MutationSet
2033+
* @see MutationSet
2034+
* @type {Constructor}
2035+
*/
2036+
export {MutationSet};
2037+
20282038
/**
20292039
* @type {object}
20302040
* @property {constructor} DatabaseAdminClient

src/transaction.ts

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1854,6 +1854,17 @@ export class Transaction extends Dml {
18541854
return undefined;
18551855
}
18561856

1857+
/**
1858+
* This method updates the _queuedMutations property of the transaction.
1859+
*
1860+
* @public
1861+
*
1862+
* @param {spannerClient.spanner.v1.Mutation[]} [mutation]
1863+
*/
1864+
setQueuedMutations(mutation: spannerClient.spanner.v1.Mutation[]): void {
1865+
this._queuedMutations = mutation;
1866+
}
1867+
18571868
/**
18581869
* @typedef {object} CommitOptions
18591870
* @property {google.spanner.v1.IRequestOptions} requestOptions The request options to include
@@ -2530,6 +2541,110 @@ function buildDeleteMutation(
25302541
return mutation as spannerClient.spanner.v1.Mutation;
25312542
}
25322543

2544+
/**
2545+
* MutationSet represent a set of changes to be applied atomically to a Cloud Spanner
2546+
* database with a {@link Transaction}.
2547+
* Mutations are used to insert, update, upsert(insert or update), replace, or
2548+
* delete rows within tables.
2549+
*
2550+
* Mutations are added to a {@link Transaction} and are not executed until the
2551+
* transaction is committed via {@link Transaction#commit}.
2552+
*
2553+
* If the transaction is rolled back or encounters an error, the mutations are
2554+
* discarded.
2555+
*
2556+
* @example
2557+
* ```
2558+
* const {Spanner, Mutation} = require('@google-cloud/spanner');
2559+
* const spanner = new Spanner();
2560+
*
2561+
* const instance = spanner.instance('my-instance');
2562+
* const database = instance.database('my-database');
2563+
*
2564+
* const mutations = new MutationSet();
2565+
* mutations.insert('Singers', {SingerId: '123', FirstName: 'David'});
2566+
* mutations.update('Singers', {SingerId: '123', FirstName: 'Marc'});
2567+
*
2568+
* try {
2569+
* database.writeAtLeastOnce(mutations, (err, res) => {
2570+
* console.log("RESPONSE: ", res);
2571+
* });
2572+
* } catch(err) {
2573+
* console.log("ERROR: ", err);
2574+
* }
2575+
* ```
2576+
*/
2577+
export class MutationSet {
2578+
/**
2579+
* An array to store the mutations.
2580+
*/
2581+
private _queuedMutations: spannerClient.spanner.v1.Mutation[];
2582+
2583+
/**
2584+
* Creates a new Mutation object.
2585+
*/
2586+
constructor() {
2587+
this._queuedMutations = [];
2588+
}
2589+
2590+
/**
2591+
* Adds an insert operation to the mutation set.
2592+
* @param {string} table. The name of the table to insert into.
2593+
* @param {object|object[]} rows. A single row object or an array of row objects to insert.
2594+
*/
2595+
insert(table: string, rows: object | object[]): void {
2596+
this._queuedMutations.push(buildMutation('insert', table, rows));
2597+
}
2598+
2599+
/**
2600+
* Adds an update operation to the mutation set.
2601+
* @param {string} table. The name of the table to update.
2602+
* @param {object|object[]} rows. A single row object or an array of row objects to update.
2603+
* Each row object must contain the primary key values to indentify the row to update.
2604+
*/
2605+
update(table: string, rows: object | object[]): void {
2606+
this._queuedMutations.push(buildMutation('update', table, rows));
2607+
}
2608+
2609+
/**
2610+
* Adds an upsert operation to the mutation set.
2611+
* An upsert will insert a new row if it does not exist or update an existing row if it does.
2612+
* @param {string} table. The name of the table to upsert.
2613+
* @param {object|object[]} rows. A single row object or an array of row objects to upsert.
2614+
*/
2615+
upsert(table: string, rows: object | object[]): void {
2616+
this._queuedMutations.push(buildMutation('insertOrUpdate', table, rows));
2617+
}
2618+
2619+
/**
2620+
* Adds a replace operation to the mutation set.
2621+
* A replace operation deletes the existing row (if it exists) and inserts the new row.
2622+
* @param {string} table. The name of the table to replace.
2623+
* @param {object|object[]} rows. A single row object or an array of row objects to replace.
2624+
*/
2625+
replace(table: string, rows: object | object[]): void {
2626+
this._queuedMutations.push(buildMutation('replace', table, rows));
2627+
}
2628+
2629+
/**
2630+
* Adds a deleteRows operation to the mutation set.
2631+
* This operation deletes rows from the specified table based on their primary keys.
2632+
* @param {string} table. The name of the table to deleteRows from.
2633+
* @param {key[]} key. An array of key objects, each represeting the primary key of a row to delete.
2634+
*/
2635+
deleteRows(table: string, keys: Key[]): void {
2636+
this._queuedMutations.push(buildDeleteMutation(table, keys));
2637+
}
2638+
2639+
/**
2640+
* Returns the internal representation of the queued mutations as a protobuf message.
2641+
* @returns {spannerClient.spanner.v1.Mutation[]}. The protobuf message representing the mutations.
2642+
*/
2643+
proto(): spannerClient.spanner.v1.Mutation[] {
2644+
return this._queuedMutations;
2645+
}
2646+
}
2647+
25332648
/**
25342649
* A group of mutations to be committed together.
25352650
* Related mutations should be placed in a group.

0 commit comments

Comments
 (0)