Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Mutation.WriteBuilder;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ReadContext;
Expand Down Expand Up @@ -128,6 +129,8 @@
import com.google.spanner.executor.v1.MutationAction.Mod;
import com.google.spanner.executor.v1.MutationAction.UpdateArgs;
import com.google.spanner.executor.v1.OperationResponse;
import com.google.spanner.executor.v1.PartitionedUpdateAction;
import com.google.spanner.executor.v1.PartitionedUpdateAction.ExecutePartitionedUpdateOptions;
import com.google.spanner.executor.v1.QueryAction;
import com.google.spanner.executor.v1.ReadAction;
import com.google.spanner.executor.v1.RestoreCloudDatabaseAction;
Expand Down Expand Up @@ -886,6 +889,13 @@ private Status executeAction(
} else if (action.hasExecutePartition()) {
return executeExecutePartition(
action.getExecutePartition(), outcomeSender, executionContext);
} else if (action.hasPartitionedUpdate()) {
if (dbPath == null) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Database path must be set for this action");
}
DatabaseClient dbClient = getClient().getDatabaseClient(DatabaseId.of(dbPath));
return executePartitionedUpdate(action.getPartitionedUpdate(), dbClient, outcomeSender);
} else if (action.hasCloseBatchTxn()) {
return executeCloseBatchTxn(action.getCloseBatchTxn(), outcomeSender, executionContext);
} else if (action.hasExecuteChangeStreamQuery()) {
Expand Down Expand Up @@ -1974,6 +1984,33 @@ private Status executeExecutePartition(
}
}

/** Execute a partitioned update which runs different partitions in parallel. */
private Status executePartitionedUpdate(
PartitionedUpdateAction action, DatabaseClient dbClient, OutcomeSender sender) {
try {
ExecutePartitionedUpdateOptions options = action.getOptions();
Long count =
dbClient.executePartitionedUpdate(
Statement.of(action.getUpdate().getSql()),
Options.tag(options.getTag()),
Options.priority(RpcPriority.fromProto(options.getRpcPriority())));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query - What's the behaviour when null is passed into this method?

Copy link
Contributor Author

@rajatbhatta rajatbhatta Mar 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RpcPriority.fromProto(null) will return RpcPriority.UNSPECIFIED. Added a unit test for this too.

SpannerActionOutcome outcome =
SpannerActionOutcome.newBuilder()
.setStatus(toProto(Status.OK))
.addDmlRowsModified(count)
.build();
sender.sendOutcome(outcome);
return sender.finishWithOK();
} catch (SpannerException e) {
return sender.finishWithError(toStatus(e));
} catch (Exception e) {
return sender.finishWithError(
toStatus(
SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage())));
}
}

/** Build a child partition record proto out of childPartitionRecord returned by client. */
private ChildPartitionsRecord buildChildPartitionRecord(Struct childPartitionRecord)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ public enum RpcPriority {
RpcPriority(Priority proto) {
this.proto = Preconditions.checkNotNull(proto);
}

public static RpcPriority fromProto(Priority proto) {
for (RpcPriority e : RpcPriority.values()) {
if (e.proto.equals(proto)) return e;
}
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's preferred to not return nulls and instead return some modelled type like UNRECOGNIZED/UNKNOWN/UNSPECIFIED. With nulls, the client does not have a good way to know underlying reason i.e invalid enum type being the actual issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a RpcPriority.UNSPECIFIED to enum to map to Priority.PRIORITY_UNSPECIFIED. If a null value is passed in, we would return RpcPriority.UNSPECIFIED.

}
}

/** Marker interface to mark options applicable to both Read and Query operations */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -300,6 +301,14 @@ public void testUpdateOptionsPriority() {
assertEquals("priority: " + priority + " ", options.toString());
}

@Test
public void testRpcPriorityEnumFromProto() {
assertEquals(RpcPriority.fromProto(Priority.PRIORITY_LOW), RpcPriority.LOW);
assertEquals(RpcPriority.fromProto(Priority.PRIORITY_MEDIUM), RpcPriority.MEDIUM);
assertEquals(RpcPriority.fromProto(Priority.PRIORITY_HIGH), RpcPriority.HIGH);
assertNull(RpcPriority.fromProto(Priority.PRIORITY_UNSPECIFIED));
}

@Test
public void testTransactionOptionsHashCode() {
Options option1 = Options.fromTransactionOptions();
Expand Down