Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/system-tests-against-emulator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@
# working-directory: handwritten/spanner
# env:
# SPANNER_EMULATOR_HOST: localhost:9010
# GCLOUD_PROJECT: emulator-test-project
# GCLOUD_PROJECT: emulator-test-project
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
/*!
* Copyright 2026 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {ServerDuplexStream, status} from '@grpc/grpc-js';
import {Spanner} from '../../src';
import {trace, context, Tracer} from '@opentelemetry/api';
import * as protos from '../../protos/protos';
import {CloudUtil} from './cloud-util';
import {OutcomeSender, IExecutionFlowContext} from './cloud-executor';
import spanner = protos.google.spanner;
import SpannerAsyncActionRequest = spanner.executor.v1.SpannerAsyncActionRequest;
import SpannerAsyncActionResponse = spanner.executor.v1.SpannerAsyncActionResponse;
import ISpannerAction = spanner.executor.v1.ISpannerAction;
import IAdminAction = spanner.executor.v1.IAdminAction;
import ICreateCloudInstanceAction = spanner.executor.v1.ICreateCloudInstanceAction;

/**
* Context for a single stream connection.
*/
export class ExecutionFlowContext implements IExecutionFlowContext {
private call: ServerDuplexStream<
SpannerAsyncActionRequest,
SpannerAsyncActionResponse
>;

constructor(
call: ServerDuplexStream<
SpannerAsyncActionRequest,
SpannerAsyncActionResponse
>,
) {
this.call = call;
}

/**
* Sends a response back to the client.
*/
public onNext(response: SpannerAsyncActionResponse): void {
const stream = this.call as any;

// Prevent writing if client cancelled the call, or the underlying Node stream is un-writable/destroyed
if (this.call.cancelled || stream.destroyed || stream.writable === false) {
console.warn('Attempted to write to a closed or cancelled stream.');
return;
}

this.call.write(response);
}

/**
* Sends an error back to the client.
*/
public onError(error: Error): void {
const stream = this.call as any;

if (this.call.cancelled || stream.destroyed || stream.writable === false) {
console.warn(
'Attempted to emit error to a closed or cancelled stream.',
error,
);
return;
}

this.call.emit('error', error);
}

/**
* Clean up resources associated with the context.
*/
public cleanup(): void {
console.log('Cleaning up ExecutionFlowContext');
}
}

export class CloudClientExecutor {
private spanner: Spanner;
private tracer: Tracer;

constructor() {
const spannerOptions = CloudUtil.getSpannerOptions();
this.spanner = new Spanner(spannerOptions);
this.tracer = trace.getTracer(CloudClientExecutor.name);
}

/**
* Creates a new ExecutionFlowContext for a stream.
*/
public createExecutionFlowContext(
call: ServerDuplexStream<
SpannerAsyncActionRequest,
SpannerAsyncActionResponse
>,
): ExecutionFlowContext {
return new ExecutionFlowContext(call);
}

/**
* Starts handling a SpannerAsyncActionRequest.
*/
public startHandlingRequest(
req: SpannerAsyncActionRequest,
executionContext: ExecutionFlowContext,
): {code: number; details: string} {
const outcomeSender = new OutcomeSender(req.actionId!, executionContext);

if (!req.action) {
return outcomeSender.finishWithError({
code: status.INVALID_ARGUMENT,
message: 'Invalid request: No action present',
});
}
this.executeAction(outcomeSender, req.action).catch(err => {
console.error('Unhandled exception in action execution:', err);
outcomeSender.finishWithError(err);
});

return {code: status.OK, details: ''};
}

/**
* Determines the specific Spanner action type and routes it to the appropriate handler.
*/
private async executeAction(
outcomeSender: OutcomeSender,
action: ISpannerAction,
): Promise<void> {
const actionType =
Object.keys(action).find(
k => action[k as keyof typeof action] !== undefined,
) || 'unknown';
const span = this.tracer.startSpan(`performaction_${actionType}`);

return context.with(trace.setSpan(context.active(), span), async () => {
try {
if (action.admin) {
await this.executeAdminAction(action.admin, outcomeSender);
return;
}

outcomeSender.finishWithError({
code: status.UNIMPLEMENTED,
message: `Action ${actionType} not implemented yet`,
});
} catch (e: any) {
span.recordException(e);
console.error('Unexpected error:', e);
outcomeSender.finishWithError({
code: status.INVALID_ARGUMENT,
message: `Unexpected error: ${e.message}`,
});
} finally {
span.end();
}
});
}

private async executeAdminAction(
action: IAdminAction,
sender: OutcomeSender,
): Promise<void> {
try {
if (action.createCloudInstance) {
await this.executeCreateCloudInstance(
action.createCloudInstance,
sender,
);
return;
}
sender.finishWithError({
code: status.UNIMPLEMENTED,
message: 'Admin action not implemented',
});
} catch (e: any) {
sender.finishWithError(e);
}
}

private async executeCreateCloudInstance(
action: ICreateCloudInstanceAction,
sender: OutcomeSender,
): Promise<void> {
try {
console.log(`Creating instance: \n${JSON.stringify(action, null, 2)}`);

const instanceId = action.instanceId!;
const projectId = action.projectId!;
const configId = action.instanceConfigId!;

const instanceAdminClient = this.spanner.getInstanceAdminClient();

const [operation] = await instanceAdminClient.createInstance({
parent: instanceAdminClient.projectPath(projectId),
instanceId: instanceId,
instance: {
config: instanceAdminClient.instanceConfigPath(projectId, configId),
displayName: instanceId,
nodeCount: action.nodeCount || 1,
processingUnits: action.processingUnits,
labels: action.labels || {},
},
});

console.log('Waiting for instance creation operation to complete...');

await operation.promise();

console.log(`Instance ${instanceId} created successfully.`);

sender.finishWithOK();
} catch (err: any) {
if (err.code === status.ALREADY_EXISTS) {
console.log('Instance already exists, returning OK.');
sender.finishWithOK();
return;
}
console.error('Failed to create instance:', err);
sender.finishWithError(err);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*!
* Copyright 2026 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {ServerDuplexStream, status, ServiceError} from '@grpc/grpc-js';
import {trace, context, Tracer} from '@opentelemetry/api';
import {CloudClientExecutor} from './cloud-client-executor';
import * as protos from '../../protos/protos';
import spanner = protos.google.spanner;
import SpannerAsyncActionRequest = spanner.executor.v1.SpannerAsyncActionRequest;
import SpannerAsyncActionResponse = spanner.executor.v1.SpannerAsyncActionResponse;

const MAX_CLOUD_TRACE_CHECK_LIMIT = 20;

/**
* Implements the SpannerExecutorProxy service, which handles asynchronous
* Spanner actions via a bidirectional gRPC stream.
*/
export class CloudExecutorImpl {
private clientExecutor: CloudClientExecutor;
private tracer: Tracer;

constructor() {
this.clientExecutor = new CloudClientExecutor();

this.tracer = trace.getTracer(CloudClientExecutor.name);
}

/**
* Handles incoming SpannerAsyncActionRequest messages from the client.
*/
public executeActionAsync(
call: ServerDuplexStream<
SpannerAsyncActionRequest,
SpannerAsyncActionResponse
>,
): void {
// Create a top-level OpenTelemetry span for streaming request.
const span = this.tracer.startSpan(
'nodejs_systest_execute_actions_stream',
{
root: true,
},
);

const streamContext = trace.setSpan(context.active(), span);

// The executionContext manages the lifecycle and flow state for this specific gRPC stream context.
const executionContext =
this.clientExecutor.createExecutionFlowContext(call);

// Handle receiving requests on duplex stream
// Handle incoming requests sequentially on the duplex stream.
call.on('data', (request: SpannerAsyncActionRequest) => {
context.with(streamContext, () => {
console.log(`Receiving request: \n${JSON.stringify(request, null, 2)}`);

// Ensure nested properties exist before attempting to inject configuration overrides.
if (!request.action) request.action = {};
if (!request.action.spannerOptions) request.action.spannerOptions = {};
if (!request.action.spannerOptions.sessionPoolOptions)
request.action.spannerOptions.sessionPoolOptions = {};

// as the multiplexed session is defualt enabled, mutate the incoming request to forcefully enforce multiplexed sessions for this proxy execution.
request.action.spannerOptions.sessionPoolOptions.useMultiplexed = true;

console.log(
`Updated request to set multiplexed session flag: \n${JSON.stringify(request, null, 2)}`,
);

// TODO: Set requestHasReadOrQueryAction flag here when Read/Query are implemented.

try {
const reqStatus = this.clientExecutor.startHandlingRequest(
request,
executionContext,
);
if (reqStatus.code !== status.OK) {
console.error(
`Failed to handle request, half closed: ${reqStatus.details}`,
);
}
} catch (err) {
console.error('Exception when handling request', err);
}
});
});

// Handle stream errors
call.on('error', (err: Error) => {
context.with(streamContext, () => {
console.error('Client ends the stream with error.', err);
span.recordException(err);
span.end();
executionContext.cleanup();
});
});

// Handle the completion of the client stream
call.on('end', async () => {
await context.with(streamContext, async () => {
span.end();
// TODO: Add End-to-End trace verification here once Read/Query actions are implemented.
console.log('Client called Done, half closed');
executionContext.cleanup();

call.end();
});
});
}
}
Loading
Loading