diff --git a/src/client-side-metrics/client-side-metrics-attributes.ts b/src/client-side-metrics/client-side-metrics-attributes.ts index f5fbf911d..1946cacdd 100644 --- a/src/client-side-metrics/client-side-metrics-attributes.ts +++ b/src/client-side-metrics/client-side-metrics-attributes.ts @@ -25,6 +25,7 @@ export enum StreamingState { * metrics, allowing for differentiation of performance by method. */ export enum MethodName { + READ_ROW = 'Bigtable.ReadRow', READ_ROWS = 'Bigtable.ReadRows', MUTATE_ROW = 'Bigtable.MutateRow', CHECK_AND_MUTATE_ROW = 'Bigtable.CheckAndMutateRow', diff --git a/src/client-side-metrics/exporter.ts b/src/client-side-metrics/exporter.ts index cf72dadd8..4f373ef36 100644 --- a/src/client-side-metrics/exporter.ts +++ b/src/client-side-metrics/exporter.ts @@ -19,11 +19,10 @@ import { Histogram, ResourceMetrics, } from '@opentelemetry/sdk-metrics'; -import {grpc, ServiceError} from 'google-gax'; +import {ClientOptions, ServiceError} from 'google-gax'; import {MetricServiceClient} from '@google-cloud/monitoring'; import {google} from '@google-cloud/monitoring/build/protos/protos'; import ICreateTimeSeriesRequest = google.monitoring.v3.ICreateTimeSeriesRequest; -import {RetryOptions} from 'google-gax'; export interface ExportResult { code: number; @@ -119,7 +118,7 @@ function getIntegerPoints(dataPoint: DataPoint) { * getResource gets the resource object which is used for building the timeseries * object that will be sent to Google Cloud Monitoring dashboard * - * @param {string} metricName The backend name of the metric that we want to record + * @param {string} projectId The name of the project * @param {DataPoint} dataPoint The datapoint containing the data we wish to * send to the Google Cloud Monitoring dashboard */ @@ -184,6 +183,7 @@ function getMetric( * metric attributes, data points, and aggregation information, into an object * that conforms to the expected request format of the Cloud Monitoring API. * + * @param projectId * @param {ResourceMetrics} exportArgs - The OpenTelemetry metrics data to be converted. This * object contains resource attributes, scope information, and a list of * metrics with their associated data points. @@ -211,14 +211,10 @@ function getMetric( * * */ -export function metricsToRequest(exportArgs: ResourceMetrics) { - type WithSyncAttributes = {_syncAttributes: {[index: string]: string}}; - const resourcesWithSyncAttributes = - exportArgs.resource as unknown as WithSyncAttributes; - const projectId = - resourcesWithSyncAttributes._syncAttributes[ - 'monitored_resource.project_id' - ]; +export function metricsToRequest( + projectId: string, + exportArgs: ResourceMetrics, +) { const timeSeriesArray = []; for (const scopeMetrics of exportArgs.scopeMetrics) { for (const scopeMetric of scopeMetrics.metrics) { @@ -297,49 +293,33 @@ export function metricsToRequest(exportArgs: ResourceMetrics) { * @beta */ export class CloudMonitoringExporter extends MetricExporter { - private monitoringClient = new MetricServiceClient(); + private client: MetricServiceClient; - export( + constructor(options: ClientOptions) { + super(); + if (options && options.apiEndpoint) { + // We want the MetricServiceClient to always hit its default endpoint. 
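      // A sketch of the reworked export flow that this constructor feeds into:
      // the project id is now resolved from the client and passed to
      // metricsToRequest explicitly, mirroring the code later in this hunk.
      //
      //   const projectId = await this.client.getProjectId();
      //   const request = metricsToRequest(projectId, metrics);
      //   await this.client.createServiceTimeSeries(
      //     request as ICreateTimeSeriesRequest,
      //   );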
+ delete options.apiEndpoint; + } + this.client = new MetricServiceClient(options); + } + + async export( metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void, - ): void { + ): Promise { (async () => { try { - const request = metricsToRequest(metrics); - // In order to manage the "One or more points were written more - // frequently than the maximum sampling period configured for the - // metric." error we should have the metric service client retry a few - // times to ensure the metrics do get written. - // - // We use all the usual retry codes plus INVALID_ARGUMENT (code 3) - // because INVALID ARGUMENT (code 3) corresponds to the maximum - // sampling error. - const retry = new RetryOptions( - [ - grpc.status.INVALID_ARGUMENT, - grpc.status.DEADLINE_EXCEEDED, - grpc.status.RESOURCE_EXHAUSTED, - grpc.status.ABORTED, - grpc.status.UNAVAILABLE, - ], - { - initialRetryDelayMillis: 5000, - retryDelayMultiplier: 2, - maxRetryDelayMillis: 50000, - }, - ); - await this.monitoringClient.createTimeSeries( + const projectId = await this.client.getProjectId(); + const request = metricsToRequest(projectId, metrics); + await this.client.createServiceTimeSeries( request as ICreateTimeSeriesRequest, - { - retry, - }, ); // The resultCallback typically accepts a value equal to {code: x} // for some value x along with other info. When the code is equal to 0 // then the operation completed successfully. When the code is not equal - // to 0 then the operation failed. Open telemetry logs errors to the - // console when the resultCallback passes in non-zero code values and - // logs nothing when the code is 0. + // to 0 then the operation failed. The resultCallback will not log + // anything to the console whether the error code was 0 or not. resultCallback({code: 0}); } catch (error) { resultCallback(error as ServiceError); diff --git a/src/client-side-metrics/gcp-metrics-handler.ts b/src/client-side-metrics/gcp-metrics-handler.ts index 37fa4adea..7208d17e0 100644 --- a/src/client-side-metrics/gcp-metrics-handler.ts +++ b/src/client-side-metrics/gcp-metrics-handler.ts @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +import {CloudMonitoringExporter} from './exporter'; import { IMetricsHandler, OnAttemptCompleteData, @@ -20,6 +21,7 @@ import { import * as Resources from '@opentelemetry/resources'; import * as ResourceUtil from '@google-cloud/opentelemetry-resource-util'; import {PushMetricExporter, View} from '@opentelemetry/sdk-metrics'; +import {ClientOptions} from 'google-gax'; const { Aggregation, ExplicitBucketHistogramAggregation, @@ -27,6 +29,31 @@ const { Histogram, PeriodicExportingMetricReader, } = require('@opentelemetry/sdk-metrics'); +import * as os from 'os'; +import * as crypto from 'crypto'; + +/** + * Generates a unique client identifier string. + * + * This function creates a client identifier that incorporates the hostname, + * process ID, and a UUID to ensure uniqueness across different client instances + * and processes. The identifier follows the pattern: + * + * `node--` + * + * where: + * - `` is a randomly generated UUID (version 4). + * - `` is the process ID of the current Node.js process. + * - `` is the hostname of the machine. + * + * @returns {string} A unique client identifier string. 
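 *
 * Example of the resulting identifier (illustrative values only; the pattern
 * above appears to have lost its placeholders and is assumed to be
 * `node-<uuidv4>-<pid><hostname>`, based on the template string below):
 *
 *   node-1b9d6bcd-bbfd-4b2d-9b5d-ab8dfbbd4bed-12345my-hostname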
+ */ +function generateClientUuid() { + const hostname = os.hostname() || 'localhost'; + const currentPid = process.pid || ''; + const uuid4 = crypto.randomUUID(); + return `node-${uuid4}-${currentPid}${hostname}`; +} /** * A collection of OpenTelemetry metric instruments used to record @@ -47,10 +74,9 @@ interface MetricsInstruments { * This method gets the open telemetry instruments that will store GCP metrics * for a particular project. * - * @param projectId The project for which the instruments will be stored. * @param exporter The exporter the metrics will be sent to. */ -function createInstruments(projectId: string, exporter: PushMetricExporter) { +function createInstruments(exporter: PushMetricExporter): MetricsInstruments { const latencyBuckets = [ 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 13.0, 16.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0, 300.0, 400.0, @@ -80,7 +106,6 @@ function createInstruments(projectId: string, exporter: PushMetricExporter) { views: viewList, resource: new Resources.Resource({ 'service.name': 'Cloud Bigtable Table', - 'monitored_resource.project_id': projectId, }).merge(new ResourceUtil.GcpDetectorSync().detect()), readers: [ // Register the exporter @@ -183,11 +208,8 @@ function createInstruments(projectId: string, exporter: PushMetricExporter) { * associating them with relevant attributes for detailed analysis in Cloud Monitoring. */ export class GCPMetricsHandler implements IMetricsHandler { - private exporter: PushMetricExporter; - // The variable below is the singleton map from projects to instrument stacks - // which exists so that we only create one instrument stack per project. This - // will eliminate errors due to the maximum sampling period. - static instrumentsForProject: {[projectId: string]: MetricsInstruments} = {}; + private otelInstruments: MetricsInstruments; + private clientUid: string; /** * The `GCPMetricsHandler` is responsible for managing and recording @@ -196,33 +218,11 @@ export class GCPMetricsHandler implements IMetricsHandler { * (histograms and counters) and exports them to Google Cloud Monitoring * through the provided `PushMetricExporter`. * - * @param exporter - The `PushMetricExporter` instance to use for exporting - * metrics to Google Cloud Monitoring. This exporter is responsible for - * sending the collected metrics data to the monitoring backend. The provided exporter must be fully configured, for example the projectId must have been set. */ - constructor(exporter: PushMetricExporter) { - this.exporter = exporter; - } - - /** - * Initializes the OpenTelemetry metrics instruments if they haven't been already. - * Creates and registers metric instruments (histograms and counters) for various Bigtable client metrics. - * Sets up a MeterProvider and configures a PeriodicExportingMetricReader for exporting metrics to Cloud Monitoring. - * - * which will be provided to the exporter in every export call. - * - */ - private getInstruments(projectId: string): MetricsInstruments { - // The projectId is needed per metrics handler because when the exporter is - // used it provides the project id for the name of the time series exported. - // ie. 
name: `projects/${....['monitored_resource.project_id']}`, - if (!GCPMetricsHandler.instrumentsForProject[projectId]) { - GCPMetricsHandler.instrumentsForProject[projectId] = createInstruments( - projectId, - this.exporter, - ); - } - return GCPMetricsHandler.instrumentsForProject[projectId]; + constructor(options: ClientOptions) { + this.clientUid = generateClientUuid(); + const exporter = new CloudMonitoringExporter(options); + this.otelInstruments = createInstruments(exporter); } /** @@ -231,11 +231,11 @@ export class GCPMetricsHandler implements IMetricsHandler { * @param {OnOperationCompleteData} data Data related to the completed operation. */ onOperationComplete(data: OnOperationCompleteData) { - const otelInstruments = this.getInstruments(data.projectId); + const otelInstruments = this.otelInstruments; const commonAttributes = { app_profile: data.metricsCollectorData.app_profile, method: data.metricsCollectorData.method, - client_uid: data.metricsCollectorData.client_uid, + client_uid: this.clientUid, client_name: data.client_name, instanceId: data.metricsCollectorData.instanceId, table: data.metricsCollectorData.table, @@ -271,11 +271,11 @@ export class GCPMetricsHandler implements IMetricsHandler { * @param {OnAttemptCompleteData} data Data related to the completed attempt. */ onAttemptComplete(data: OnAttemptCompleteData) { - const otelInstruments = this.getInstruments(data.projectId); + const otelInstruments = this.otelInstruments; const commonAttributes = { app_profile: data.metricsCollectorData.app_profile, method: data.metricsCollectorData.method, - client_uid: data.metricsCollectorData.client_uid, + client_uid: this.clientUid, status: data.status, client_name: data.client_name, instanceId: data.metricsCollectorData.instanceId, diff --git a/src/client-side-metrics/metrics-config-manager.ts b/src/client-side-metrics/metrics-config-manager.ts new file mode 100644 index 000000000..a28d7f14f --- /dev/null +++ b/src/client-side-metrics/metrics-config-manager.ts @@ -0,0 +1,44 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {IMetricsHandler} from './metrics-handler'; +import { + ITabularApiSurface, + OperationMetricsCollector, +} from './operation-metrics-collector'; +import {MethodName, StreamingState} from './client-side-metrics-attributes'; + +/** + * A class for tracing and recording client-side metrics related to Bigtable operations. 
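 *
 * A rough wiring sketch (assumed usage; it mirrors how index.ts constructs the
 * manager and how the read paths call createOperation elsewhere in this diff):
 *
 *   const handler = new GCPMetricsHandler(options);
 *   const manager = new ClientSideMetricsConfigManager([handler]);
 *   const collector = manager.createOperation(
 *     MethodName.READ_ROWS,
 *     StreamingState.STREAMING,
 *     table,
 *   );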
+ */ +export class ClientSideMetricsConfigManager { + private metricsHandlers: IMetricsHandler[]; + + constructor(handlers: IMetricsHandler[]) { + this.metricsHandlers = handlers; + } + + createOperation( + methodName: MethodName, + streaming: StreamingState, + table: ITabularApiSurface, + ): OperationMetricsCollector { + return new OperationMetricsCollector( + table, + methodName, + streaming, + this.metricsHandlers, + ); + } +} diff --git a/src/client-side-metrics/metrics-handler.ts b/src/client-side-metrics/metrics-handler.ts index 6b4f0053e..6ce5bce12 100644 --- a/src/client-side-metrics/metrics-handler.ts +++ b/src/client-side-metrics/metrics-handler.ts @@ -13,7 +13,6 @@ // limitations under the License. import {MethodName, StreamingState} from './client-side-metrics-attributes'; -import {grpc} from 'google-gax'; /** * The interfaces below use undefined instead of null to indicate a metric is @@ -28,11 +27,9 @@ type IMetricsCollectorData = { zone?: string; app_profile?: string; method: MethodName; - client_uid: string; }; interface StandardData { - projectId: string; metricsCollectorData: IMetricsCollectorData; client_name: string; streaming: StreamingState; diff --git a/src/client-side-metrics/operation-metrics-collector.ts b/src/client-side-metrics/operation-metrics-collector.ts index d0da7e474..4851eddbb 100644 --- a/src/client-side-metrics/operation-metrics-collector.ts +++ b/src/client-side-metrics/operation-metrics-collector.ts @@ -13,13 +13,22 @@ // limitations under the License. import * as fs from 'fs'; -import {IMetricsHandler} from './metrics-handler'; import {MethodName, StreamingState} from './client-side-metrics-attributes'; -import {grpc} from 'google-gax'; +import {grpc, ServiceError} from 'google-gax'; import * as gax from 'google-gax'; -const root = gax.protobuf.loadSync( - './protos/google/bigtable/v2/response_params.proto', +import {AbortableDuplex, BigtableOptions} from '../index'; +import * as path from 'path'; +import {IMetricsHandler} from './metrics-handler'; + +// When this environment variable is set then print any errors associated +// with failures in the metrics collector. +const METRICS_DEBUG = process.env.METRICS_DEBUG; + +const protoPath = path.join( + __dirname, + '../../protos/google/bigtable/v2/response_params.proto', ); +const root = gax.protobuf.loadSync(protoPath); const ResponseParams = root.lookupType('ResponseParams'); const {hrtime} = require('node:process'); @@ -32,8 +41,10 @@ export interface ITabularApiSurface { }; id: string; bigtable: { + metricsEnabled?: boolean; + projectId?: string; appProfileId?: string; - clientUid: string; + options: BigtableOptions; }; } @@ -59,10 +70,38 @@ enum MetricsCollectorState { OPERATION_COMPLETE, } +// This method displays warnings if METRICS_DEBUG is enabled. +function withMetricsDebug(fn: () => T): T | undefined { + try { + return fn(); + } catch (e) { + if (METRICS_DEBUG) { + console.warn('METRICS_DEBUG warning'); + console.warn((e as ServiceError).message); + } + } + return; +} + +// Checks that the state transition is valid and if not it throws a warning. +function checkState( + currentState: MetricsCollectorState, + allowedStates: MetricsCollectorState[], +): T | undefined { + if (allowedStates.includes(currentState)) { + return; + } else { + throw Error('Invalid state transition'); + } +} + /** * A class for tracing and recording client-side metrics related to Bigtable operations. 
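 *
 * Metric recording is best-effort: the state checks and metadata decoding run
 * inside withMetricsDebug(), so failures are swallowed rather than surfaced to
 * the caller. To see those warnings while debugging, set the METRICS_DEBUG
 * environment variable before starting the process (the command below is
 * illustrative):
 *
 *   METRICS_DEBUG=1 node app.js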
*/ export class OperationMetricsCollector { + // The following key corresponds to the key the instance information is + // stored in for the metadata that gets returned from the server. + private readonly INSTANCE_INFORMATION_KEY = 'x-goog-ext-425905942-bin'; private state: MetricsCollectorState; private operationStartTime: bigint | null; private attemptStartTime: bigint | null; @@ -71,7 +110,6 @@ export class OperationMetricsCollector { private tabularApiSurface: ITabularApiSurface; private methodName: MethodName; private attemptCount = 0; - private metricsHandlers: IMetricsHandler[]; private firstResponseLatency: number | null; private serverTimeRead: boolean; private serverTime: number | null; @@ -79,18 +117,19 @@ export class OperationMetricsCollector { private streamingOperation: StreamingState; private applicationLatencies: number[]; private lastRowReceivedTime: bigint | null; + private handlers: IMetricsHandler[]; /** * @param {ITabularApiSurface} tabularApiSurface Information about the Bigtable table being accessed. - * @param {IMetricsHandler[]} metricsHandlers The metrics handlers used for recording metrics. * @param {MethodName} methodName The name of the method being traced. * @param {StreamingState} streamingOperation Whether or not the call is a streaming operation. + * @param {IMetricsHandler[]} handlers The metrics handlers used to store the record the metrics. */ constructor( tabularApiSurface: ITabularApiSurface, - metricsHandlers: IMetricsHandler[], methodName: MethodName, streamingOperation: StreamingState, + handlers: IMetricsHandler[], ) { this.state = MetricsCollectorState.OPERATION_NOT_STARTED; this.zone = undefined; @@ -99,7 +138,6 @@ export class OperationMetricsCollector { this.methodName = methodName; this.operationStartTime = null; this.attemptStartTime = null; - this.metricsHandlers = metricsHandlers; this.firstResponseLatency = null; this.serverTimeRead = false; this.serverTime = null; @@ -107,6 +145,7 @@ export class OperationMetricsCollector { this.streamingOperation = streamingOperation; this.lastRowReceivedTime = null; this.applicationLatencies = []; + this.handlers = handlers; } private getMetricsCollectorData() { @@ -115,51 +154,71 @@ export class OperationMetricsCollector { { instanceId: this.tabularApiSurface.instance.id, table: this.tabularApiSurface.id, - cluster: this.cluster, - zone: this.zone, + cluster: this.cluster || '', + zone: this.zone || 'global', method: this.methodName, - client_uid: this.tabularApiSurface.bigtable.clientUid, }, appProfileId ? {app_profile: appProfileId} : {}, ); } + /** + * Called to add handlers to the stream so that we can observe + * header and trailer data for client side metrics. + * + * @param stream + */ + handleStatusAndMetadata(stream: AbortableDuplex) { + stream + .on( + 'metadata', + (metadata: {internalRepr: Map; options: {}}) => { + this.onMetadataReceived(metadata); + }, + ) + .on( + 'status', + (status: { + metadata: {internalRepr: Map; options: {}}; + }) => { + this.onStatusMetadataReceived(status); + }, + ); + } + /** * Called when the operation starts. Records the start time. 
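 *
 * For reference, the call order the state machine expects for a successful
 * operation is roughly the following (a sketch derived from the checkState
 * calls in this class; onOperationComplete also records the final attempt):
 *
 *   collector.onOperationStart();
 *   collector.onAttemptStart();
 *   collector.onResponse();
 *   collector.onOperationComplete(grpc.status.OK);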
*/ onOperationStart() { - if (this.state === MetricsCollectorState.OPERATION_NOT_STARTED) { + withMetricsDebug(() => { + checkState(this.state, [MetricsCollectorState.OPERATION_NOT_STARTED]); this.operationStartTime = hrtime.bigint(); this.firstResponseLatency = null; this.applicationLatencies = []; this.state = MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS; - } else { - console.warn('Invalid state transition'); - } + }); } /** * Called when an attempt (e.g., an RPC attempt) completes. Records attempt latencies. - * @param {string} projectId The id of the project. * @param {grpc.status} attemptStatus The grpc status for the attempt. */ - onAttemptComplete(projectId: string, attemptStatus: grpc.status) { - if ( - this.state === - MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_NO_ROWS_YET || - this.state === - MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_SOME_ROWS_RECEIVED - ) { + onAttemptComplete(attemptStatus: grpc.status) { + withMetricsDebug(() => { + checkState(this.state, [ + MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_NO_ROWS_YET, + MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_SOME_ROWS_RECEIVED, + ]); this.state = MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS; this.attemptCount++; const endTime = hrtime.bigint(); - if (projectId && this.attemptStartTime) { + if (this.attemptStartTime) { const totalMilliseconds = Number( (endTime - this.attemptStartTime) / BigInt(1000000), ); - this.metricsHandlers.forEach(metricsHandler => { + this.handlers.forEach(metricsHandler => { if (metricsHandler.onAttemptComplete) { metricsHandler.onAttemptComplete({ attemptLatency: totalMilliseconds, @@ -169,24 +228,23 @@ export class OperationMetricsCollector { status: attemptStatus.toString(), client_name: `nodejs-bigtable/${version}`, metricsCollectorData: this.getMetricsCollectorData(), - projectId, }); } }); + } else { + console.warn('Start time should always be provided'); } - } else { - console.warn('Invalid state transition attempted'); - } + }); } /** * Called when a new attempt starts. Records the start time of the attempt. */ onAttemptStart() { - if ( - this.state === - MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS - ) { + withMetricsDebug(() => { + checkState(this.state, [ + MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS, + ]); this.state = MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_NO_ROWS_YET; this.attemptStartTime = hrtime.bigint(); @@ -194,60 +252,59 @@ export class OperationMetricsCollector { this.serverTimeRead = false; this.connectivityErrorCount = 0; this.lastRowReceivedTime = null; - } else { - console.warn('Invalid state transition attempted'); - } + }); } /** * Called when the first response is received. Records first response latencies. */ - onResponse(projectId: string) { - if (!this.firstResponseLatency) { - // Check firstResponseLatency first to improve latency for calls with many rows - if ( - this.state === - MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_NO_ROWS_YET - ) { + onResponse() { + withMetricsDebug(() => { + if (!this.firstResponseLatency) { + checkState(this.state, [ + MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_NO_ROWS_YET, + ]); this.state = MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_SOME_ROWS_RECEIVED; const endTime = hrtime.bigint(); - if (projectId && this.operationStartTime) { - // first response latency is measured in total milliseconds. 
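          // For reference, the latency math used throughout this collector:
          // hrtime.bigint() returns nanoseconds, so dividing the delta by
          // BigInt(1000000) yields whole milliseconds. A standalone sketch:
          //
          //   const startNs = hrtime.bigint();
          //   // ...do some work...
          //   const elapsedMs = Number((hrtime.bigint() - startNs) / BigInt(1000000));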
+ if (this.operationStartTime) { this.firstResponseLatency = Number( (endTime - this.operationStartTime) / BigInt(1000000), ); + } else { + console.warn( + 'ProjectId and operationStartTime should always be provided', + ); } } - } + }); } /** * Called when an operation completes (successfully or unsuccessfully). * Records operation latencies, retry counts, and connectivity error counts. - * @param {string} projectId The id of the project. * @param {grpc.status} finalOperationStatus Information about the completed operation. */ - onOperationComplete(projectId: string, finalOperationStatus: grpc.status) { - if ( - this.state === - MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS - ) { + onOperationComplete(finalOperationStatus: grpc.status) { + this.onAttemptComplete(finalOperationStatus); + withMetricsDebug(() => { + checkState(this.state, [ + MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS, + ]); this.state = MetricsCollectorState.OPERATION_COMPLETE; const endTime = hrtime.bigint(); - if (projectId && this.operationStartTime) { + if (this.operationStartTime) { const totalMilliseconds = Number( (endTime - this.operationStartTime) / BigInt(1000000), ); { - this.metricsHandlers.forEach(metricsHandler => { + this.handlers.forEach(metricsHandler => { if (metricsHandler.onOperationComplete) { metricsHandler.onOperationComplete({ status: finalOperationStatus.toString(), streaming: this.streamingOperation, metricsCollectorData: this.getMetricsCollectorData(), client_name: `nodejs-bigtable/${version}`, - projectId, operationLatency: totalMilliseconds, retryCount: this.attemptCount - 1, firstResponseLatency: this.firstResponseLatency ?? undefined, @@ -256,10 +313,10 @@ export class OperationMetricsCollector { } }); } + } else { + console.warn('operation start time should always be available here'); } - } else { - console.warn('Invalid state transition attempted'); - } + }); } /** @@ -334,29 +391,32 @@ export class OperationMetricsCollector { onStatusMetadataReceived(status: { metadata: {internalRepr: Map; options: {}}; }) { - if (!this.zone || !this.cluster) { - const INSTANCE_INFORMATION_KEY = 'x-goog-ext-425905942-bin'; - const mappedValue = status.metadata.internalRepr.get( - INSTANCE_INFORMATION_KEY, - ) as Buffer[]; - const decodedValue = ResponseParams.decode( - mappedValue[0], - mappedValue[0].length, - ); - if ( - decodedValue && - (decodedValue as unknown as {zoneId: string}).zoneId - ) { - this.zone = (decodedValue as unknown as {zoneId: string}).zoneId; - } - if ( - decodedValue && - (decodedValue as unknown as {clusterId: string}).clusterId - ) { - this.cluster = ( - decodedValue as unknown as {clusterId: string} - ).clusterId; + withMetricsDebug(() => { + if (!this.zone || !this.cluster) { + const mappedValue = status.metadata.internalRepr.get( + this.INSTANCE_INFORMATION_KEY, + ) as Buffer[]; + if (mappedValue && mappedValue[0] && ResponseParams) { + const decodedValue = ResponseParams.decode( + mappedValue[0], + mappedValue[0].length, + ); + if ( + decodedValue && + (decodedValue as unknown as {zoneId: string}).zoneId + ) { + this.zone = (decodedValue as unknown as {zoneId: string}).zoneId; + } + if ( + decodedValue && + (decodedValue as unknown as {clusterId: string}).clusterId + ) { + this.cluster = ( + decodedValue as unknown as {clusterId: string} + ).clusterId; + } + } } - } + }); } } diff --git a/src/index.ts b/src/index.ts index d8a501ea1..b17fcc7a3 100644 --- a/src/index.ts +++ b/src/index.ts @@ -16,7 +16,12 @@ import {replaceProjectIdToken} from 
'@google-cloud/projectify'; import {promisifyAll} from '@google-cloud/promisify'; import arrify = require('arrify'); import * as extend from 'extend'; -import {GoogleAuth, CallOptions, grpc as gaxVendoredGrpc} from 'google-gax'; +import { + GoogleAuth, + CallOptions, + grpc as gaxVendoredGrpc, + ClientOptions, +} from 'google-gax'; import * as gax from 'google-gax'; import * as protos from '../protos/protos'; @@ -36,6 +41,8 @@ import * as v2 from './v2'; import {PassThrough, Duplex} from 'stream'; import grpcGcpModule = require('grpc-gcp'); import {ClusterUtils} from './utils/cluster'; +import {ClientSideMetricsConfigManager} from './client-side-metrics/metrics-config-manager'; +import {GCPMetricsHandler} from './client-side-metrics/gcp-metrics-handler'; // eslint-disable-next-line @typescript-eslint/no-var-requires const streamEvents = require('stream-events'); @@ -101,6 +108,8 @@ export interface BigtableOptions extends gax.GoogleAuthOptions { * Internal only. */ BigtableTableAdminClient?: gax.ClientOptions; + + metricsEnabled?: boolean; } /** @@ -474,6 +483,7 @@ export class Bigtable { static AppProfile: AppProfile; static Instance: Instance; static Cluster: Cluster; + _metricsConfigManager: ClientSideMetricsConfigManager; constructor(options: BigtableOptions = {}) { // Determine what scopes are needed. @@ -578,6 +588,12 @@ export class Bigtable { this.appProfileId = options.appProfileId; this.projectName = `projects/${this.projectId}`; this.shouldReplaceProjectIdToken = this.projectId === '{{projectId}}'; + + const handlers = + options.metricsEnabled === true + ? [new GCPMetricsHandler(options as ClientOptions)] + : []; + this._metricsConfigManager = new ClientSideMetricsConfigManager(handlers); } createInstance( @@ -970,6 +986,7 @@ export class Bigtable { gaxStream .on('error', stream.destroy.bind(stream)) .on('metadata', stream.emit.bind(stream, 'metadata')) + .on('status', stream.emit.bind(stream, 'status')) .on('request', stream.emit.bind(stream, 'request')) .pipe(stream); }); diff --git a/src/row.ts b/src/row.ts index 9383f4703..39af7a32b 100644 --- a/src/row.ts +++ b/src/row.ts @@ -31,6 +31,11 @@ import {ServiceError} from 'google-gax'; import {google} from '../protos/protos'; import {RowDataUtils, RowProperties} from './row-data-utils'; import {TabularApiSurface} from './tabular-api-surface'; +import {getRowsInternal} from './utils/getRowsInternal'; +import { + MethodName, + StreamingState, +} from './client-side-metrics/client-side-metrics-attributes'; export interface Rule { column: string; @@ -666,31 +671,42 @@ export class Row { filter, }); - this.table.getRows(getRowsOptions, (err, rows) => { - if (err) { - callback(err); - return; - } + const metricsCollector = + this.table.bigtable._metricsConfigManager.createOperation( + MethodName.READ_ROW, + StreamingState.UNARY, + this.table, + ); + void getRowsInternal( + this.table, + metricsCollector, + getRowsOptions, + (err, rows) => { + if (err) { + callback(err); + return; + } - const row = rows![0]; + const row = rows![0]; - if (!row) { - const e = new RowError(this.id); - callback(e); - return; - } + if (!row) { + const e = new RowError(this.id); + callback(e); + return; + } - this.data = row.data; + this.data = row.data; - // If the user specifies column names, we'll return back the row data - // we received. Otherwise, we'll return the row "this" in a typical - // GrpcServiceObject#get fashion. 
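          // The Row.get() path above now runs through a metrics collector as
          // well; whether anything is actually exported depends on the opt-in
          // flag added to BigtableOptions in index.ts. A hypothetical way to
          // enable it (the ids are placeholders):
          //
          //   const bigtable = new Bigtable({projectId: 'my-project', metricsEnabled: true});
          //
          // When metricsEnabled is not true, the handler list stays empty, so
          // collectors are created but have nothing to report to.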
- if (columns.length > 0) { - callback(null, row.data); - } else { - (callback as {} as GetRowCallback)(null, this); - } - }); + // If the user specifies column names, we'll return back the row data + // we received. Otherwise, we'll return the row "this" in a typical + // GrpcServiceObject#get fashion. + if (columns.length > 0) { + callback(null, row.data); + } else { + (callback as {} as GetRowCallback)(null, this); + } + }, + ); } getMetadata(options?: GetRowOptions): Promise; diff --git a/src/tabular-api-surface.ts b/src/tabular-api-surface.ts index 784f1f622..462658e92 100644 --- a/src/tabular-api-surface.ts +++ b/src/tabular-api-surface.ts @@ -17,28 +17,26 @@ import arrify = require('arrify'); import {Instance} from './instance'; import {Mutation} from './mutation'; import { - AbortableDuplex, Bigtable, Entry, MutateOptions, SampleRowKeysCallback, SampleRowsKeysResponse, } from './index'; -import {Filter, BoundData, RawFilter} from './filter'; +import {BoundData, RawFilter} from './filter'; import {Row} from './row'; -import { - ChunkPushData, - ChunkPushLastScannedRowData, - ChunkTransformer, - DataEvent, -} from './chunktransformer'; import {BackoffSettings} from 'google-gax/build/src/gax'; import {google} from '../protos/protos'; import {CallOptions, grpc, ServiceError} from 'google-gax'; -import {Duplex, PassThrough, Transform} from 'stream'; +import {Transform} from 'stream'; import * as is from 'is'; import {GoogleInnerError} from './table'; -import {TableUtils} from './utils/table'; +import {createReadStreamInternal} from './utils/createReadStreamInternal'; +import {getRowsInternal} from './utils/getRowsInternal'; +import { + MethodName, + StreamingState, +} from './client-side-metrics/client-side-metrics-attributes'; // See protos/google/rpc/code.proto // (4=DEADLINE_EXCEEDED, 8=RESOURCE_EXHAUSTED, 10=ABORTED, 14=UNAVAILABLE) @@ -159,7 +157,9 @@ export class TabularApiSurface { id: string; metadata?: google.bigtable.admin.v2.ITable; maxRetries?: number; - protected viewName?: string; + // We need viewName to be public because now we need it in Row class + // We need it in Row class because now we use getRowsInternal instead of getRows + viewName?: string; protected constructor(instance: Instance, id: string, viewName?: string) { this.bigtable = instance.bigtable; @@ -187,7 +187,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); * Get {@link Row} objects for the rows currently in your table as a * readable object stream. * - * @param {object} [options] Configuration object. * @param {boolean} [options.decode=true] If set to `false` it will not decode * Buffer values returned from Bigtable. * @param {boolean} [options.encoding] The encoding to use when converting @@ -208,361 +207,16 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); * * @example include:samples/api-reference-doc-snippets/table.js * region_tag:bigtable_api_table_readstream + * @param opts */ createReadStream(opts?: GetRowsOptions) { - const options = opts || {}; - const maxRetries = is.number(this.maxRetries) ? this.maxRetries! : 10; - let activeRequestStream: AbortableDuplex | null; - let rowKeys: string[]; - let filter: {} | null; - const rowsLimit = options.limit || 0; - const hasLimit = rowsLimit !== 0; - - let numConsecutiveErrors = 0; - let numRequestsMade = 0; - let retryTimer: NodeJS.Timeout | null; - - rowKeys = options.keys || []; - - /* - The following line of code sets the timeout if it was provided while - creating the client. 
This will be used to determine if the client should - retry on DEADLINE_EXCEEDED errors. Eventually, this will be handled - downstream in google-gax. - */ - const timeout = - opts?.gaxOptions?.timeout || - (this?.bigtable?.options?.BigtableClient?.clientConfig?.interfaces && - this?.bigtable?.options?.BigtableClient?.clientConfig?.interfaces[ - 'google.bigtable.v2.Bigtable' - ]?.methods['ReadRows']?.timeout_millis); - const callTimeMillis = new Date().getTime(); - - const ranges = TableUtils.getRanges(options); - - // If rowKeys and ranges are both empty, the request is a full table scan. - // Add an empty range to simplify the resumption logic. - if (rowKeys.length === 0 && ranges.length === 0) { - ranges.push({}); - } - - if (options.filter) { - filter = Filter.parse(options.filter); - } - - let chunkTransformer: ChunkTransformer; - let rowStream: Duplex; - - let userCanceled = false; - // The key of the last row that was emitted by the per attempt pipeline - // Note: this must be updated from the operation level userStream to avoid referencing buffered rows that will be - // discarded in the per attempt subpipeline (rowStream) - let lastRowKey = ''; - let rowsRead = 0; - const userStream = new PassThrough({ - objectMode: true, - readableHighWaterMark: 0, // We need to disable readside buffering to allow for acceptable behavior when the end user cancels the stream early. - writableHighWaterMark: 0, // We need to disable writeside buffering because in nodejs 14 the call to _transform happens after write buffering. This creates problems for tracking the last seen row key. - transform(event, _encoding, callback) { - if (userCanceled) { - callback(); - return; - } - if (event.eventType === DataEvent.LAST_ROW_KEY_UPDATE) { - /** - * This code will run when receiving an event containing - * lastScannedRowKey data that the chunk transformer sent. When the - * chunk transformer gets lastScannedRowKey data, this code - * updates the lastRowKey to ensure row ids with the lastScannedRowKey - * aren't re-requested in retries. The lastRowKey needs to be updated - * here and not in the chunk transformer to ensure the update is - * queued behind all events that deliver data to the user stream - * first. - */ - lastRowKey = event.lastScannedRowKey; - callback(); - return; - } - const row = event; - if (TableUtils.lessThanOrEqualTo(row.id, lastRowKey)) { - /* - Sometimes duplicate rows reach this point. To avoid delivering - duplicate rows to the user, rows are thrown away if they don't exceed - the last row key. We can expect each row to reach this point and rows - are delivered in order so if the last row key equals or exceeds the - row id then we know data for this row has already reached this point - and been delivered to the user. In this case we want to throw the row - away and we do not want to deliver this row to the user again. - */ - callback(); - return; - } - lastRowKey = row.id; - rowsRead++; - callback(null, row); - }, - }); - - // The caller should be able to call userStream.end() to stop receiving - // more rows and cancel the stream prematurely. But also, the 'end' event - // will be emitted if the stream ended normally. To tell these two - // situations apart, we'll save the "original" end() function, and - // will call it on rowStream.on('end'). 
- const originalEnd = userStream.end.bind(userStream); - - // Taking care of this extra listener when piping and unpiping userStream: - const rowStreamPipe = (rowStream: Duplex, userStream: PassThrough) => { - rowStream.pipe(userStream, {end: false}); - rowStream.on('end', originalEnd); - }; - const rowStreamUnpipe = (rowStream: Duplex, userStream: PassThrough) => { - rowStream?.unpipe(userStream); - rowStream?.removeListener('end', originalEnd); - }; - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - userStream.end = (chunk?: any, encoding?: any, cb?: () => void) => { - rowStreamUnpipe(rowStream, userStream); - userCanceled = true; - if (activeRequestStream) { - activeRequestStream.abort(); - } - if (retryTimer) { - clearTimeout(retryTimer); - } - return originalEnd(chunk, encoding, cb); - }; - - const makeNewRequest = () => { - // Avoid cancelling an expired timer if user - // cancelled the stream in the middle of a retry - retryTimer = null; - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - chunkTransformer = new ChunkTransformer({decode: options.decode} as any); - - // If the viewName is provided then request will be made for an - // authorized view. Otherwise, the request is made for a table. - const reqOpts = ( - this.viewName - ? { - authorizedViewName: `${this.name}/authorizedViews/${this.viewName}`, - appProfileId: this.bigtable.appProfileId, - } - : { - tableName: this.name, - appProfileId: this.bigtable.appProfileId, - } - ) as google.bigtable.v2.IReadRowsRequest; - - const retryOpts = { - currentRetryAttempt: 0, // was numConsecutiveErrors - // Handling retries in this client. Specify the retry options to - // make sure nothing is retried in retry-request. - noResponseRetries: 0, - shouldRetryFn: (_: any) => { - return false; - }, - }; - - if (lastRowKey) { - // Readjust and/or remove ranges based on previous valid row reads. - // Iterate backward since items may need to be removed. - for (let index = ranges.length - 1; index >= 0; index--) { - const range = ranges[index]; - const startValue = is.object(range.start) - ? (range.start as BoundData).value - : range.start; - const endValue = is.object(range.end) - ? (range.end as BoundData).value - : range.end; - const startKeyIsRead = - !startValue || - TableUtils.lessThanOrEqualTo( - startValue as string, - lastRowKey as string, - ); - const endKeyIsNotRead = - !endValue || - (endValue as Buffer).length === 0 || - TableUtils.lessThan(lastRowKey as string, endValue as string); - if (startKeyIsRead) { - if (endKeyIsNotRead) { - // EndKey is not read, reset the range to start from lastRowKey open - range.start = { - value: lastRowKey, - inclusive: false, - }; - } else { - // EndKey is read, remove this range - ranges.splice(index, 1); - } - } - } - - // Remove rowKeys already read. - rowKeys = rowKeys.filter(rowKey => - TableUtils.greaterThan(rowKey, lastRowKey as string), - ); - - // If there was a row limit in the original request and - // we've already read all the rows, end the stream and - // do not retry. - if (hasLimit && rowsLimit === rowsRead) { - userStream.end(); - return; - } - // If all the row keys and ranges are read, end the stream - // and do not retry. 
- if (rowKeys.length === 0 && ranges.length === 0) { - userStream.end(); - return; - } - } - - // Create the new reqOpts - reqOpts.rows = {}; - - // TODO: preprocess all the keys and ranges to Bytes - reqOpts.rows.rowKeys = rowKeys.map( - Mutation.convertToBytes, - ) as {} as Uint8Array[]; - - reqOpts.rows.rowRanges = ranges.map(range => - Filter.createRange( - range.start as BoundData, - range.end as BoundData, - 'Key', - ), - ); - - if (filter) { - reqOpts.filter = filter; - } - - if (hasLimit) { - reqOpts.rowsLimit = rowsLimit - rowsRead; - } - - const gaxOpts = populateAttemptHeader( - numRequestsMade, - options.gaxOptions, + const metricsCollector = + this.bigtable._metricsConfigManager.createOperation( + MethodName.READ_ROWS, + StreamingState.STREAMING, + this, ); - - const requestStream = this.bigtable.request({ - client: 'BigtableClient', - method: 'readRows', - reqOpts, - gaxOpts, - retryOpts, - }); - - activeRequestStream = requestStream!; - - const toRowStream = new Transform({ - transform: (rowData: ChunkPushData, _, next) => { - if ( - userCanceled || - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (userStream as any)._writableState.ended - ) { - return next(); - } - if ( - (rowData as ChunkPushLastScannedRowData).eventType === - DataEvent.LAST_ROW_KEY_UPDATE - ) { - /** - * If the data is the chunk transformer communicating that the - * lastScannedRow was received then this message is passed along - * to the user stream to update the lastRowKey. - */ - next(null, rowData); - } else { - /** - * If the data is just regular rows being pushed from the - * chunk transformer then the rows are encoded so that they - * can be consumed by the user stream. - */ - const row = this.row((rowData as Row).key as string); - row.data = (rowData as Row).data; - next(null, row); - } - }, - objectMode: true, - }); - - rowStream = pumpify.obj([requestStream, chunkTransformer, toRowStream]); - - // Retry on "received rst stream" errors - const isRstStreamError = (error: ServiceError): boolean => { - if (error.code === 13 && error.message) { - const error_message = (error.message || '').toLowerCase(); - return ( - error.code === 13 && - (error_message.includes('rst_stream') || - error_message.includes('rst stream')) - ); - } - return false; - }; - - rowStream - .on('error', (error: ServiceError) => { - rowStreamUnpipe(rowStream, userStream); - activeRequestStream = null; - if (IGNORED_STATUS_CODES.has(error.code)) { - // We ignore the `cancelled` "error", since we are the ones who cause - // it when the user calls `.abort()`. - userStream.end(); - return; - } - numConsecutiveErrors++; - numRequestsMade++; - if ( - numConsecutiveErrors <= maxRetries && - (RETRYABLE_STATUS_CODES.has(error.code) || - isRstStreamError(error)) && - !(timeout && timeout < new Date().getTime() - callTimeMillis) - ) { - const backOffSettings = - options.gaxOptions?.retry?.backoffSettings || - DEFAULT_BACKOFF_SETTINGS; - const nextRetryDelay = getNextDelay( - numConsecutiveErrors, - backOffSettings, - ); - retryTimer = setTimeout(makeNewRequest, nextRetryDelay); - } else { - if ( - !error.code && - error.message === 'The client has already been closed.' - ) { - // - // The TestReadRows_Generic_CloseClient conformance test requires - // a grpc code to be present when the client is closed. According - // to Gemini, the appropriate code for a closed client is - // CANCELLED since the user actually cancelled the call by closing - // the client. 
- // - error.code = grpc.status.CANCELLED; - } - userStream.emit('error', error); - } - }) - .on('data', _ => { - // Reset error count after a successful read so the backoff - // time won't keep increasing when as stream had multiple errors - numConsecutiveErrors = 0; - }) - .on('end', () => { - activeRequestStream = null; - }); - rowStreamPipe(rowStream, userStream); - }; - - makeNewRequest(); - return userStream; + return createReadStreamInternal(this, metricsCollector, opts); } getRows(options?: GetRowsOptions): Promise; @@ -575,32 +229,28 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); * before returning the results. Instead we recommend using the streaming API * via {@link Table#createReadStream}. * - * @param {object} [options] Configuration object. See * {@link Table#createReadStream} for a complete list of options. * @param {object} [options.gaxOptions] Request configuration options, outlined * here: https://googleapis.github.io/gax-nodejs/CallSettings.html. - * @param {function} callback The callback function. * @param {?error} callback.err An error returned while making this request. * @param {Row[]} callback.rows List of Row objects. * * @example include:samples/api-reference-doc-snippets/table.js * region_tag:bigtable_api_get_rows + * @param optionsOrCallback + * @param cb */ getRows( optionsOrCallback?: GetRowsOptions | GetRowsCallback, cb?: GetRowsCallback, ): void | Promise { - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const options = - typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; - this.createReadStream(options) - .on('error', callback) - .pipe( - concat((rows: Row[]) => { - callback(null, rows); - }), + const metricsCollector = + this.bigtable._metricsConfigManager.createOperation( + MethodName.READ_ROWS, + StreamingState.STREAMING, + this, ); + return getRowsInternal(this, metricsCollector, optionsOrCallback, cb); } insert( diff --git a/src/utils/createReadStreamInternal.ts b/src/utils/createReadStreamInternal.ts new file mode 100644 index 000000000..e1561715a --- /dev/null +++ b/src/utils/createReadStreamInternal.ts @@ -0,0 +1,440 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
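// For orientation before the implementation below: table.createReadStream() in
// tabular-api-surface.ts now only builds an OperationMetricsCollector and
// delegates to createReadStreamInternal(), so the user-facing API is unchanged.
// A typical call still looks like this (row keys and the limit are placeholder
// values):
//
//   table
//     .createReadStream({start: 'row-000', end: 'row-999', limit: 100})
//     .on('data', row => console.log(row.id))
//     .on('error', console.error)
//     .on('end', () => console.log('done'));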
+ +import {GetRowsOptions} from '../table'; +import {Row} from '../row'; +import * as is from 'is'; +import {Filter, BoundData} from '../filter'; +import {Mutation} from '../mutation'; +import {AbortableDuplex} from '../index'; +import { + ChunkPushData, + ChunkPushLastScannedRowData, + ChunkTransformer, + DataEvent, +} from '../chunktransformer'; +import {TableUtils} from './table'; +import {Duplex, PassThrough, Transform} from 'stream'; +import { + MethodName, + StreamingState, +} from '../client-side-metrics/client-side-metrics-attributes'; +import {google} from '../../protos/protos'; +const pumpify = require('pumpify'); +import {grpc, ServiceError} from 'google-gax'; +import { + DEFAULT_BACKOFF_SETTINGS, + getNextDelay, + IGNORED_STATUS_CODES, + populateAttemptHeader, + RETRYABLE_STATUS_CODES, + TabularApiSurface, +} from '../tabular-api-surface'; +import {OperationMetricsCollector} from '../client-side-metrics/operation-metrics-collector'; + +/** + * Creates a readable stream of rows from a Bigtable table or authorized view. + * + * This internal method handles the core logic for streaming rows from a Bigtable + * table. It supports various filtering, limiting, and retry mechanisms. It can + * be used to create a stream for either a whole table or an authorized view. + * + * @param {Table} table The Table instance to read rows from. + * @param metricsCollector + * @param {GetRowsOptions} [opts] Optional configuration for the read operation. + * @param {boolean} [opts.decode=true] If set to `false` it will not decode + * Buffer values returned from Bigtable. + * @param {boolean} [opts.encoding] The encoding to use when converting + * Buffer values to a string. + * @param {string} [opts.end] End value for key range. + * @param {Filter} [opts.filter] Row filters allow you to + * both make advanced queries and format how the data is returned. + * @param {object} [opts.gaxOptions] Request configuration options, outlined + * here: https://googleapis.github.io/gax-nodejs/CallSettings.html. + * @param {string[]} [opts.keys] A list of row keys. + * @param {number} [opts.limit] Maximum number of rows to be returned. + * @param {string} [opts.prefix] Prefix that the row key must match. + * @param {string[]} [opts.prefixes] List of prefixes that a row key must + * match. + * @param {object[]} [opts.ranges] A list of key ranges. + * @param {string} [opts.start] Start value for key range. + * @returns {stream} A readable stream of {@link Row} objects. + * + */ +export function createReadStreamInternal( + table: TabularApiSurface, + metricsCollector: OperationMetricsCollector, + opts?: GetRowsOptions, +) { + const options = opts || {}; + const maxRetries = is.number(table.maxRetries) ? table.maxRetries! : 10; + let activeRequestStream: AbortableDuplex | null; + let rowKeys: string[]; + let filter: {} | null; + const rowsLimit = options.limit || 0; + const hasLimit = rowsLimit !== 0; + + const viewName = table.viewName; + + let numConsecutiveErrors = 0; + let numRequestsMade = 0; + let retryTimer: NodeJS.Timeout | null; + + rowKeys = options.keys || []; + + /* + The following line of code sets the timeout if it was provided while + creating the client. This will be used to determine if the client should + retry on DEADLINE_EXCEEDED errors. Eventually, this will be handled + downstream in google-gax. 
+ */ + const timeout = + opts?.gaxOptions?.timeout || + (table?.bigtable?.options?.BigtableClient?.clientConfig?.interfaces && + table?.bigtable?.options?.BigtableClient?.clientConfig?.interfaces[ + 'google.bigtable.v2.Bigtable' + ]?.methods['ReadRows']?.timeout_millis); + const callTimeMillis = new Date().getTime(); + + const ranges = TableUtils.getRanges(options); + + // If rowKeys and ranges are both empty, the request is a full table scan. + // Add an empty range to simplify the resumption logic. + if (rowKeys.length === 0 && ranges.length === 0) { + ranges.push({}); + } + + if (options.filter) { + filter = Filter.parse(options.filter); + } + + let chunkTransformer: ChunkTransformer; + let rowStream: Duplex; + + let userCanceled = false; + // The key of the last row that was emitted by the per attempt pipeline + // Note: this must be updated from the operation level userStream to avoid referencing buffered rows that will be + // discarded in the per attempt subpipeline (rowStream) + let lastRowKey = ''; + let rowsRead = 0; + const userStream = new PassThrough({ + objectMode: true, + readableHighWaterMark: 0, // We need to disable readside buffering to allow for acceptable behavior when the end user cancels the stream early. + writableHighWaterMark: 0, // We need to disable writeside buffering because in nodejs 14 the call to _transform happens after write buffering. This creates problems for tracking the last seen row key. + transform(event, _encoding, callback) { + if (userCanceled) { + callback(); + return; + } + if (event.eventType === DataEvent.LAST_ROW_KEY_UPDATE) { + /** + * This code will run when receiving an event containing + * lastScannedRowKey data that the chunk transformer sent. When the + * chunk transformer gets lastScannedRowKey data, this code + * updates the lastRowKey to ensure row ids with the lastScannedRowKey + * aren't re-requested in retries. The lastRowKey needs to be updated + * here and not in the chunk transformer to ensure the update is + * queued behind all events that deliver data to the user stream + * first. + */ + lastRowKey = event.lastScannedRowKey; + callback(); + return; + } + const row = event; + if (TableUtils.lessThanOrEqualTo(row.id, lastRowKey)) { + /* + Sometimes duplicate rows reach this point. To avoid delivering + duplicate rows to the user, rows are thrown away if they don't exceed + the last row key. We can expect each row to reach this point and rows + are delivered in order so if the last row key equals or exceeds the + row id then we know data for this row has already reached this point + and been delivered to the user. In this case we want to throw the row + away and we do not want to deliver this row to the user again. + */ + callback(); + return; + } + lastRowKey = row.id; + rowsRead++; + callback(null, row); + }, + }); + + // The caller should be able to call userStream.end() to stop receiving + // more rows and cancel the stream prematurely. But also, the 'end' event + // will be emitted if the stream ended normally. To tell these two + // situations apart, we'll save the "original" end() function, and + // will call it on rowStream.on('end'). 
+ const originalEnd = userStream.end.bind(userStream); + + // Taking care of this extra listener when piping and unpiping userStream: + const rowStreamPipe = (rowStream: Duplex, userStream: PassThrough) => { + rowStream.pipe(userStream, {end: false}); + rowStream.on('end', originalEnd); + }; + const rowStreamUnpipe = (rowStream: Duplex, userStream: PassThrough) => { + rowStream?.unpipe(userStream); + rowStream?.removeListener('end', originalEnd); + }; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + userStream.end = (chunk?: any, encoding?: any, cb?: () => void) => { + rowStreamUnpipe(rowStream, userStream); + userCanceled = true; + if (activeRequestStream) { + activeRequestStream.abort(); + } + if (retryTimer) { + clearTimeout(retryTimer); + } + return originalEnd(chunk, encoding, cb); + }; + metricsCollector.onOperationStart(); + const makeNewRequest = () => { + metricsCollector.onAttemptStart(); + + // Avoid cancelling an expired timer if user + // cancelled the stream in the middle of a retry + retryTimer = null; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + chunkTransformer = new ChunkTransformer({ + decode: options.decode, + } as any); + + // If the viewName is provided then request will be made for an + // authorized view. Otherwise, the request is made for a table. + const reqOpts = ( + viewName + ? { + authorizedViewName: `${table.name}/authorizedViews/${viewName}`, + appProfileId: table.bigtable.appProfileId, + } + : { + tableName: table.name, + appProfileId: table.bigtable.appProfileId, + } + ) as google.bigtable.v2.IReadRowsRequest; + + const retryOpts = { + currentRetryAttempt: 0, // was numConsecutiveErrors + // Handling retries in this client. Specify the retry options to + // make sure nothing is retried in retry-request. + noResponseRetries: 0, + shouldRetryFn: (_: any) => { + return false; + }, + }; + + if (lastRowKey) { + // Readjust and/or remove ranges based on previous valid row reads. + // Iterate backward since items may need to be removed. + for (let index = ranges.length - 1; index >= 0; index--) { + const range = ranges[index]; + const startValue = is.object(range.start) + ? (range.start as BoundData).value + : range.start; + const endValue = is.object(range.end) + ? (range.end as BoundData).value + : range.end; + const startKeyIsRead = + !startValue || + TableUtils.lessThanOrEqualTo( + startValue as string, + lastRowKey as string, + ); + const endKeyIsNotRead = + !endValue || + (endValue as Buffer).length === 0 || + TableUtils.lessThan(lastRowKey as string, endValue as string); + if (startKeyIsRead) { + if (endKeyIsNotRead) { + // EndKey is not read, reset the range to start from lastRowKey open + range.start = { + value: lastRowKey, + inclusive: false, + }; + } else { + // EndKey is read, remove this range + ranges.splice(index, 1); + } + } + } + + // Remove rowKeys already read. + rowKeys = rowKeys.filter(rowKey => + TableUtils.greaterThan(rowKey, lastRowKey as string), + ); + + // If there was a row limit in the original request and + // we've already read all the rows, end the stream and + // do not retry. + if (hasLimit && rowsLimit === rowsRead) { + userStream.end(); + return; + } + // If all the row keys and ranges are read, end the stream + // and do not retry. 
+ if (rowKeys.length === 0 && ranges.length === 0) { + userStream.end(); + return; + } + } + + // Create the new reqOpts + reqOpts.rows = {}; + + // TODO: preprocess all the keys and ranges to Bytes + reqOpts.rows.rowKeys = rowKeys.map( + Mutation.convertToBytes, + ) as {} as Uint8Array[]; + + reqOpts.rows.rowRanges = ranges.map(range => + Filter.createRange( + range.start as BoundData, + range.end as BoundData, + 'Key', + ), + ); + + if (filter) { + reqOpts.filter = filter; + } + + if (hasLimit) { + reqOpts.rowsLimit = rowsLimit - rowsRead; + } + + const gaxOpts = populateAttemptHeader(numRequestsMade, options.gaxOptions); + + const requestStream = table.bigtable.request({ + client: 'BigtableClient', + method: 'readRows', + reqOpts, + gaxOpts, + retryOpts, + }); + + activeRequestStream = requestStream!; + + const toRowStream = new Transform({ + transform: (rowData: ChunkPushData, _, next) => { + if ( + userCanceled || + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (userStream as any)._writableState.ended + ) { + return next(); + } + if ( + (rowData as ChunkPushLastScannedRowData).eventType === + DataEvent.LAST_ROW_KEY_UPDATE + ) { + /** + * If the data is the chunk transformer communicating that the + * lastScannedRow was received then this message is passed along + * to the user stream to update the lastRowKey. + */ + next(null, rowData); + } else { + /** + * If the data is just regular rows being pushed from the + * chunk transformer then the rows are encoded so that they + * can be consumed by the user stream. + */ + const row = table.row((rowData as Row).key as string); + row.data = (rowData as Row).data; + next(null, row); + } + }, + objectMode: true, + }); + + rowStream = pumpify.obj([requestStream, chunkTransformer, toRowStream]); + + // Retry on "received rst stream" errors + const isRstStreamError = (error: ServiceError): boolean => { + if (error.code === 13 && error.message) { + const error_message = (error.message || '').toLowerCase(); + return ( + error.code === 13 && + (error_message.includes('rst_stream') || + error_message.includes('rst stream')) + ); + } + return false; + }; + + metricsCollector.handleStatusAndMetadata(requestStream); + rowStream + .on('error', (error: ServiceError) => { + rowStreamUnpipe(rowStream, userStream); + activeRequestStream = null; + if (IGNORED_STATUS_CODES.has(error.code)) { + // We ignore the `cancelled` "error", since we are the ones who cause + // it when the user calls `.abort()`. + userStream.end(); + metricsCollector.onOperationComplete(error.code); + return; + } + numConsecutiveErrors++; + numRequestsMade++; + if ( + numConsecutiveErrors <= maxRetries && + (RETRYABLE_STATUS_CODES.has(error.code) || isRstStreamError(error)) && + !(timeout && timeout < new Date().getTime() - callTimeMillis) + ) { + const backOffSettings = + options.gaxOptions?.retry?.backoffSettings || + DEFAULT_BACKOFF_SETTINGS; + const nextRetryDelay = getNextDelay( + numConsecutiveErrors, + backOffSettings, + ); + metricsCollector.onAttemptComplete(error.code); + retryTimer = setTimeout(makeNewRequest, nextRetryDelay); + } else { + if ( + !error.code && + error.message === 'The client has already been closed.' + ) { + // + // The TestReadRows_Generic_CloseClient conformance test requires + // a grpc code to be present when the client is closed. The + // appropriate code for a closed client is CANCELLED since the + // user actually cancelled the call by closing the client. 
+ // + error.code = grpc.status.CANCELLED; + } + metricsCollector.onOperationComplete(error.code); + userStream.emit('error', error); + } + }) + .on('data', _ => { + // Reset error count after a successful read so the backoff + // time won't keep increasing when as stream had multiple errors + numConsecutiveErrors = 0; + metricsCollector.onResponse(); + }) + .on('end', () => { + activeRequestStream = null; + metricsCollector.onOperationComplete(grpc.status.OK); + }); + rowStreamPipe(rowStream, userStream); + }; + + makeNewRequest(); + return userStream; +} diff --git a/src/utils/getRowsInternal.ts b/src/utils/getRowsInternal.ts new file mode 100644 index 000000000..d106b6de9 --- /dev/null +++ b/src/utils/getRowsInternal.ts @@ -0,0 +1,66 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { + GetRowsCallback, + GetRowsOptions, + GetRowsResponse, + TabularApiSurface, +} from '../tabular-api-surface'; +import {createReadStreamInternal} from './createReadStreamInternal'; +import {Row} from '../row'; +import {OperationMetricsCollector} from '../client-side-metrics/operation-metrics-collector'; +// eslint-disable-next-line @typescript-eslint/no-var-requires +const concat = require('concat-stream'); + +/** + * Get {@link Row} objects for the rows currently in your table. + * + * This method is not recommended for large datasets as it will buffer all rows + * before returning the results. Instead we recommend using the streaming API + * via {@link Table#createReadStream}. + * + * @param {TabularApiSurface} table The table instance to get rows from. + * @param metricsCollector + * @param {object} [optionsOrCallback] Configuration object. See + * {@link Table#createReadStream} for a complete list of options. + * @param {object} [optionsOrCallback.gaxOptions] Request configuration options, outlined + * here: https://googleapis.github.io/gax-nodejs/CallSettings.html. + * @param {function} cb The callback function. + * @param {?error} cb.err An error returned while making this request. + * @param {Row[]} cb.rows List of Row objects. + * + * @returns {Promise|void} Returns a promise that resolves with the rows if no callback is provided, otherwise calls the callback with the rows. + * + * @example include:samples/api-reference-doc-snippets/table.js + * region_tag:bigtable_api_get_rows + */ +export function getRowsInternal( + table: TabularApiSurface, + metricsCollector: OperationMetricsCollector, + optionsOrCallback?: GetRowsOptions | GetRowsCallback, + cb?: GetRowsCallback, +): void | Promise { + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + const options = + typeof optionsOrCallback === 'object' ? 
optionsOrCallback : {}; + createReadStreamInternal(table, metricsCollector, options) + .on('error', callback) + .pipe( + concat((rows: Row[]) => { + callback(null, rows); + }), + ); +} diff --git a/system-test/client-side-metrics-setup-table.ts b/system-test/client-side-metrics-setup-table.ts new file mode 100644 index 000000000..003d9b3bf --- /dev/null +++ b/system-test/client-side-metrics-setup-table.ts @@ -0,0 +1,71 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {Bigtable} from '../src'; +export async function setupBigtable( + bigtable: Bigtable, + columnFamilyId: string, + instanceId: string, + tableIds: string[], +) { + const instance = bigtable.instance(instanceId); + const [instanceInfo] = await instance.exists(); + while (!instanceInfo) { + const [, operation] = await instance.create({ + clusters: { + id: 'fake-cluster3', + location: 'us-west1-c', + nodes: 1, + }, + }); + await operation.promise(); + /** + * For whatever reason, even after waiting for an operation.promise() + * call to complete, the instance still doesn't seem to be ready yet so + * we do another check to ensure the instance is ready. + */ + const [instanceInfoAgain] = await instance.exists(); + if (instanceInfoAgain) { + break; + } + } + const tables = tableIds.map(tableId => instance.table(tableId)); + for (const currentTable of tables) { + const [tableExists] = await currentTable.exists(); + if (!tableExists) { + await currentTable.create({families: [columnFamilyId]}); // Create column family + } else { + // Check if column family exists and create it if not. + const [families] = await currentTable.getFamilies(); + + if ( + !families.some((family: {id: string}) => family.id === columnFamilyId) + ) { + await currentTable.createFamily(columnFamilyId); + } + } + // Add some data so that a firstResponseLatency is recorded. + await currentTable.insert([ + { + key: 'rowId', + data: { + [columnFamilyId]: { + gwashington: 1, + tjefferson: 1, + }, + }, + }, + ]); + } +} diff --git a/system-test/client-side-metrics.ts b/system-test/client-side-metrics.ts new file mode 100644 index 000000000..587f56bd6 --- /dev/null +++ b/system-test/client-side-metrics.ts @@ -0,0 +1,672 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
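+
+// System tests for the client-side metrics pipeline. They verify that metrics
+// recorded for ReadRows/ReadRow calls are exported to Google Cloud Monitoring
+// and that the expected data reaches the configured metrics handlers,
+// including when multiple clients, instances and projects are involved.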
+ +import {after, before, describe, it} from 'mocha'; +import * as mocha from 'mocha'; +import { + CloudMonitoringExporter, + ExportResult, +} from '../src/client-side-metrics/exporter'; +import {ResourceMetrics} from '@opentelemetry/sdk-metrics'; +import * as assert from 'assert'; +import {GCPMetricsHandler} from '../src/client-side-metrics/gcp-metrics-handler'; +import * as proxyquire from 'proxyquire'; +import {Bigtable} from '../src'; +import {Row} from '../src/row'; +import {setupBigtable} from './client-side-metrics-setup-table'; +import {TestMetricsHandler} from '../test-common/test-metrics-handler'; +import { + OnAttemptCompleteData, + OnOperationCompleteData, +} from '../src/client-side-metrics/metrics-handler'; +import {ClientOptions, ServiceError} from 'google-gax'; +import {ClientSideMetricsConfigManager} from '../src/client-side-metrics/metrics-config-manager'; +import {MetricServiceClient} from '@google-cloud/monitoring'; + +const SECOND_PROJECT_ID = 'cfdb-sdk-node-tests'; + +function getFakeBigtable( + projectId: string, + metricsHandlerClass: typeof GCPMetricsHandler | typeof TestMetricsHandler, + apiEndpoint?: string, +) { + const metricHandler = new metricsHandlerClass({ + apiEndpoint, + } as unknown as ClientOptions & {value: string}); + const newClient = new Bigtable({ + projectId, + apiEndpoint, + }); + newClient._metricsConfigManager = new ClientSideMetricsConfigManager([ + metricHandler, + ]); + return newClient; +} + +function getHandlerFromExporter(Exporter: typeof CloudMonitoringExporter) { + return proxyquire('../src/client-side-metrics/gcp-metrics-handler.js', { + './exporter': { + CloudMonitoringExporter: Exporter, + }, + }).GCPMetricsHandler; +} + +function readRowsAssertionCheck( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [], + method: string, + streaming: string, +) { + assert.strictEqual(requestsHandled.length, 4); + const firstRequest = requestsHandled[0] as any; + // We would expect these parameters to be different every time so delete + // them from the comparison after checking they exist. + assert(firstRequest.attemptLatency); + assert(firstRequest.serverLatency); + delete firstRequest.attemptLatency; + delete firstRequest.serverLatency; + delete firstRequest.metricsCollectorData.appProfileId; + assert.deepStrictEqual(firstRequest, { + connectivityErrorCount: 0, + streaming, + status: '0', + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: 'emulator-test-instance', + table: 'my-table', + cluster: 'fake-cluster3', + zone: 'us-west1-c', + method, + }, + projectId, + }); + const secondRequest = requestsHandled[1] as any; + // We would expect these parameters to be different every time so delete + // them from the comparison after checking they exist. + assert(secondRequest.operationLatency); + assert(secondRequest.firstResponseLatency); + assert(secondRequest.applicationLatencies); + delete secondRequest.operationLatency; + delete secondRequest.firstResponseLatency; + delete secondRequest.applicationLatencies; + delete secondRequest.metricsCollectorData.appProfileId; + assert.deepStrictEqual(secondRequest, { + status: '0', + streaming, + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: 'emulator-test-instance', + cluster: 'fake-cluster3', + zone: 'us-west1-c', + method, + table: 'my-table', + }, + projectId, + retryCount: 0, + }); + // We would expect these parameters to be different every time so delete + // them from the comparison after checking they exist. 
+ const thirdRequest = requestsHandled[2] as any; + assert(thirdRequest.attemptLatency); + assert(thirdRequest.serverLatency); + delete thirdRequest.attemptLatency; + delete thirdRequest.serverLatency; + delete thirdRequest.metricsCollectorData.appProfileId; + assert.deepStrictEqual(thirdRequest, { + connectivityErrorCount: 0, + streaming, + status: '0', + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: 'emulator-test-instance', + table: 'my-table2', + cluster: 'fake-cluster3', + zone: 'us-west1-c', + method, + }, + projectId, + }); + const fourthRequest = requestsHandled[3] as any; + // We would expect these parameters to be different every time so delete + // them from the comparison after checking they exist. + assert(fourthRequest.operationLatency); + assert(fourthRequest.firstResponseLatency); + assert(fourthRequest.applicationLatencies); + delete fourthRequest.operationLatency; + delete fourthRequest.firstResponseLatency; + delete fourthRequest.applicationLatencies; + delete fourthRequest.metricsCollectorData.appProfileId; + assert.deepStrictEqual(fourthRequest, { + status: '0', + streaming, + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: 'emulator-test-instance', + cluster: 'fake-cluster3', + zone: 'us-west1-c', + method, + table: 'my-table2', + }, + projectId, + retryCount: 0, + }); +} + +function checkMultiRowCall( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [], +) { + readRowsAssertionCheck( + projectId, + requestsHandled, + 'Bigtable.ReadRows', + 'true', + ); +} + +function checkSingleRowCall( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [], +) { + readRowsAssertionCheck( + projectId, + requestsHandled, + 'Bigtable.ReadRow', + 'false', + ); +} + +/** + * Checks if metrics have been published to Google Cloud Monitoring. + * + * This asynchronous function queries Google Cloud Monitoring to verify + * that the expected metrics from the Bigtable client library have been + * successfully published. It constructs a `MetricServiceClient` to + * interact with the Cloud Monitoring API and retrieves time series data + * for a predefined set of metrics. The test passes if time series data + * is found for each of the specified metrics within a defined time + * interval. + * + * @param {string} projectId The Google Cloud project ID where metrics are + * expected to be published. + * @throws {Error} If no time series data is found for any of the specified + * metrics, indicating that the metrics were not successfully published to + * Cloud Monitoring. 
+ */ +async function checkForPublishedMetrics(projectId: string) { + const monitoringClient = new MetricServiceClient(); // Correct instantiation + const now = Math.floor(Date.now() / 1000); + const filters = [ + 'metric.type="bigtable.googleapis.com/client/attempt_latencies"', + 'metric.type="bigtable.googleapis.com/client/operation_latencies"', + 'metric.type="bigtable.googleapis.com/client/retry_count"', + 'metric.type="bigtable.googleapis.com/client/server_latencies"', + 'metric.type="bigtable.googleapis.com/client/first_response_latencies"', + ]; + for (let i = 0; i < filters.length; i++) { + const filter = filters[i]; + const [series] = await monitoringClient.listTimeSeries({ + name: `projects/${projectId}`, + interval: { + endTime: { + seconds: now, + nanos: 0, + }, + startTime: { + seconds: now - 1000 * 60 * 60 * 24, + nanos: 0, + }, + }, + filter, + }); + assert(series.length > 0); + } +} + +describe('Bigtable/ClientSideMetrics', () => { + const instanceId1 = 'emulator-test-instance'; + const instanceId2 = 'emulator-test-instance2'; + const tableId1 = 'my-table'; + const tableId2 = 'my-table2'; + const columnFamilyId = 'cf1'; + let defaultProjectId: string; + + before(async () => { + const bigtable = new Bigtable(); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtable(bigtable, columnFamilyId, instanceId, [ + tableId1, + tableId2, + ]); + } + defaultProjectId = await new Promise((resolve, reject) => { + bigtable.getProjectId_((err: Error | null, projectId?: string) => { + if (err) { + reject(err); + } else { + resolve(projectId as string); + } + }); + }); + }); + + after(async () => { + const bigtable = new Bigtable(); + try { + // If the instance has been deleted already by another source, we don't + // want this after hook to block the continuous integration pipeline. + const instance = bigtable.instance(instanceId1); + await instance.delete({}); + } catch (e) { + console.warn('The instance has been deleted already'); + } + try { + // If the instance has been deleted already by another source, we don't + // want this after hook to block the continuous integration pipeline. + const instance = bigtable.instance(instanceId2); + await instance.delete({}); + } catch (e) { + console.warn('The instance has been deleted already'); + } + }); + + describe('Bigtable/ClientSideMetricsToGCM', () => { + // This test suite ensures that for each test all the export calls are + // successful even when multiple instances and tables are created. + async function mockBigtable( + projectId: string, + done: mocha.Done, + apiEndpoint?: string, + ) { + /* + The exporter is called every x seconds, but we only want to test the value + it receives once. Since done cannot be called multiple times in mocha, + exported variable ensures we only test the value export receives one time. + */ + let exported = false; + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. 
+ */ + const timeout = setTimeout(() => { + if (!exported) { + done( + new Error( + 'The exporters have not completed yet and the timeout is over', + ), + ); + } + }, 120000); + + class TestExporter extends CloudMonitoringExporter { + constructor(options: ClientOptions) { + super(options); + } + + async export( + metrics: ResourceMetrics, + resultCallback: (result: ExportResult) => void, + ): Promise { + try { + await super.export(metrics, (result: ExportResult) => { + if (!exported) { + exported = true; + try { + clearTimeout(timeout); + // The test passes when the code is 0 because that means the + // result from calling export was successful. + assert.strictEqual(result.code, 0); + resultCallback({code: 0}); + void checkForPublishedMetrics(projectId).then(() => { + done(); + }); + } catch (error) { + // The code here isn't 0 so we report the original error to the mocha test runner. + done(result); + done(error); + } + } else { + resultCallback({code: 0}); + } + }); + } catch (error) { + done(error); + } + } + } + + return getFakeBigtable( + projectId, + getHandlerFromExporter(TestExporter), + apiEndpoint, + ); + } + + it('should send the metrics to Google Cloud Monitoring for a ReadRows call', done => { + (async () => { + try { + const bigtable = await mockBigtable(defaultProjectId, done); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtable(bigtable, columnFamilyId, instanceId, [ + tableId1, + tableId2, + ]); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.getRows(); + const table2 = instance.table(tableId2); + await table2.getRows(); + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a custom endpoint', done => { + (async () => { + try { + const bigtable = await mockBigtable( + defaultProjectId, + done, + 'bogus-endpoint', + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + try { + // This call will fail because we are trying to hit a bogus endpoint. + // The idea here is that we just want to record at least one metric + // so that the exporter gets executed. + await table.getRows(); + } catch (e: unknown) { + // Try blocks just need a catch/finally block. 
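+            // The error from the bogus endpoint is expected, so it is
+            // intentionally swallowed here.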
+ } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a ReadRows call with a second project', done => { + (async () => { + try { + // This is the second project the test is configured to work with: + const projectId = SECOND_PROJECT_ID; + const bigtable = await mockBigtable(projectId, done); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtable(bigtable, columnFamilyId, instanceId, [ + tableId1, + tableId2, + ]); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.getRows(); + const table2 = instance.table(tableId2); + await table2.getRows(); + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + }); + describe('Bigtable/ClientSideMetricsToGCMTimeout', () => { + // This test suite simulates a situation where the user creates multiple + // clients and ensures that the exporter doesn't produce any errors even + // when multiple clients are attempting an export. + async function mockBigtable( + projectId: string, + done: mocha.Done, + onExportSuccess?: () => void, + ) { + class TestExporter extends CloudMonitoringExporter { + constructor(options: ClientOptions) { + super(options); + } + + async export( + metrics: ResourceMetrics, + resultCallback: (result: ExportResult) => void, + ): Promise { + try { + await super.export(metrics, async (result: ExportResult) => { + try { + // The code is expected to be 0 because the + // result from calling export was successful. + assert.strictEqual(result.code, 0); + resultCallback({code: 0}); + if (onExportSuccess) { + onExportSuccess(); + } + } catch (error) { + // The code here isn't 0 so we report the original error to the + // mocha test runner. + // The test fails here because it means that an export was + // unsuccessful. + done(result); + done(error); + resultCallback({code: 0}); + } + }); + } catch (error) { + done(error); + resultCallback({code: 0}); + } + } + } + + /* + Below we mock out the table so that it sends the metrics to a test exporter + that will still send the metrics to Google Cloud Monitoring, but then also + ensure the export was successful and pass the test with code 0 if it is + successful. + */ + return getFakeBigtable(projectId, getHandlerFromExporter(TestExporter)); + } + + it('should send the metrics to Google Cloud Monitoring for a ReadRows call', done => { + let testFinished = false; + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. When the timeout is finished, if there were no export + errors then the test passes. 
+ */ + setTimeout(() => { + testFinished = true; + done(); + }, 120000); + (async () => { + try { + const bigtable1 = await mockBigtable(defaultProjectId, done); + const bigtable2 = await mockBigtable(defaultProjectId, done); + for (const bigtable of [bigtable1, bigtable2]) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtable(bigtable, columnFamilyId, instanceId, [ + tableId1, + tableId2, + ]); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.getRows(); + const table2 = instance.table(tableId2); + await table2.getRows(); + } + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a ReadRows call with thirty clients', done => { + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. When the timeout is finished, if there were no export + errors then the test passes. + */ + const testTimeout = setTimeout(() => { + done(new Error('The test timed out')); + }, 480000); + let testComplete = false; + const numClients = 30; + (async () => { + try { + const bigtableList = []; + const completedSet = new Set(); + for ( + let bigtableCount = 0; + bigtableCount < numClients; + bigtableCount++ + ) { + const currentCount = bigtableCount; + const onExportSuccess = () => { + completedSet.add(currentCount); + if (completedSet.size === numClients) { + // If every client has completed the export then pass the test. + clearTimeout(testTimeout); + if (!testComplete) { + testComplete = true; + done(); + } + } + }; + bigtableList.push( + await mockBigtable(defaultProjectId, done, onExportSuccess), + ); + } + for (const bigtable of bigtableList) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtable(bigtable, columnFamilyId, instanceId, [ + tableId1, + tableId2, + ]); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.getRows(); + const table2 = instance.table(tableId2); + await table2.getRows(); + } + } + } catch (e) { + done(e); + done(new Error('An error occurred while running the script')); + } + })().catch(err => { + throw err; + }); + }); + }); + describe('Bigtable/ClientSideMetricsToMetricsHandler', () => { + async function mockBigtable( + projectId: string, + done: mocha.Done, + checkFn: ( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[], + ) => void, + ) { + let handlerRequestCount = 0; + class TestGCPMetricsHandler extends TestMetricsHandler { + projectId = projectId; + onOperationComplete(data: OnOperationCompleteData) { + handlerRequestCount++; + try { + super.onOperationComplete(data); + if (handlerRequestCount > 1) { + checkFn(projectId, this.requestsHandled); + done(); + } + } catch (e) { + done(e); + } + } + } + + const bigtable = getFakeBigtable(projectId, TestGCPMetricsHandler); + await setupBigtable(bigtable, columnFamilyId, instanceId1, [ + tableId1, + tableId2, + ]); + return bigtable; + } + + it('should send the metrics to the metrics handler for a ReadRows call', done => { + (async () => { + const bigtable = await mockBigtable( + defaultProjectId, + done, + checkMultiRowCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + await table.getRows(); + const table2 = 
instance.table(tableId2); + await table2.getRows(); + })().catch(err => { + throw err; + }); + }); + it('should pass the projectId to the metrics handler properly', done => { + (async () => { + const bigtable = await mockBigtable( + defaultProjectId, + done, + checkMultiRowCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + await table.getRows(); + const table2 = instance.table(tableId2); + await table2.getRows(); + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to the metrics handler for a single row read', done => { + (async () => { + try { + const projectId = SECOND_PROJECT_ID; + const bigtable = await mockBigtable( + projectId, + done, + checkSingleRowCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + const row = new Row(table, 'rowId'); + await row.get(); + const table2 = instance.table(tableId2); + const row2 = new Row(table2, 'rowId'); + await row2.get(); + } catch (e) { + done(e); + } + })().catch(err => { + throw err; + }); + }); + }); +}); diff --git a/system-test/cloud-monitoring-exporter.ts b/system-test/cloud-monitoring-exporter.ts index cbee08e21..f484c8e05 100644 --- a/system-test/cloud-monitoring-exporter.ts +++ b/system-test/cloud-monitoring-exporter.ts @@ -70,11 +70,15 @@ describe('Bigtable/CloudMonitoringExporter', () => { }); }); } - const exporter = new CloudMonitoringExporter(); - exporter.export( - transformedExportInput as unknown as ResourceMetrics, - resultCallback, - ); + const exporter = new CloudMonitoringExporter({}); // Pass empty object as options + exporter + .export( + transformedExportInput as unknown as ResourceMetrics, + resultCallback, + ) + .catch(err => { + throw err; + }); })().catch(err => { throw err; }); diff --git a/system-test/gcp-metrics-handler.ts b/system-test/gcp-metrics-handler.ts index 93025d0a8..2608d6e19 100644 --- a/system-test/gcp-metrics-handler.ts +++ b/system-test/gcp-metrics-handler.ts @@ -13,7 +13,6 @@ // limitations under the License. 
import {describe} from 'mocha'; -import {GCPMetricsHandler} from '../src/client-side-metrics/gcp-metrics-handler'; import {expectedRequestsHandled} from '../test-common/metrics-handler-fixture'; import { OnAttemptCompleteData, @@ -26,8 +25,20 @@ import { import {Bigtable} from '../src'; import {ResourceMetrics} from '@opentelemetry/sdk-metrics'; import * as assert from 'assert'; -import {expectedOtelHundredExportInputs} from '../test-common/expected-otel-export-input'; -import {replaceTimestamps} from '../test-common/replace-timestamps'; +import {ClientOptions} from 'google-gax'; +import * as proxyquire from 'proxyquire'; + +function getHandler(Exporter: typeof CloudMonitoringExporter) { + const FakeCGPMetricsHandler = proxyquire( + '../src/client-side-metrics/gcp-metrics-handler.js', + { + './exporter': { + CloudMonitoringExporter: Exporter, + }, + }, + ).GCPMetricsHandler; + return new FakeCGPMetricsHandler(); +} describe('Bigtable/GCPMetricsHandler', () => { it('Should export a value to the GCPMetricsHandler', done => { @@ -64,13 +75,17 @@ describe('Bigtable/GCPMetricsHandler', () => { }; } class MockExporter extends CloudMonitoringExporter { - export( + constructor(options: ClientOptions) { + super(options); + } + + async export( metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void, - ): void { + ): Promise { const testResultCallback = getTestResultCallback(resultCallback); if (!exported) { - super.export(metrics, testResultCallback); + await super.export(metrics, testResultCallback); } else { resultCallback({code: 0}); } @@ -87,10 +102,7 @@ describe('Bigtable/GCPMetricsHandler', () => { } }); }); - // projectToInstruments argument is set to {} because we want a fresh - // instrument stack each time this test is run. - GCPMetricsHandler.instrumentsForProject = {}; - const handler = new GCPMetricsHandler(new MockExporter({projectId})); + const handler = getHandler(MockExporter); const transformedRequestsHandled = JSON.parse( JSON.stringify(expectedRequestsHandled).replace( /my-project/g, @@ -105,7 +117,7 @@ describe('Bigtable/GCPMetricsHandler', () => { } } })().catch(err => { - throw err; + done(err); }); }); it('Should export a value to two GCPMetricsHandlers', done => { @@ -152,14 +164,18 @@ describe('Bigtable/GCPMetricsHandler', () => { }; } class MockExporter extends CloudMonitoringExporter { - export( + constructor(options: ClientOptions) { + super(options); + } + + async export( metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void, - ): void { + ): Promise { if (exportedCount < 1) { // The code below uses the test callback to ensure the export was successful. const testResultCallback = getTestResultCallback(resultCallback); - super.export(metrics, testResultCallback); + await super.export(metrics, testResultCallback); } else { // After the test is complete the periodic exporter may still be // running in which case we don't want to do any checks. We just @@ -180,11 +196,8 @@ describe('Bigtable/GCPMetricsHandler', () => { } }); }); - // projectToInstruments argument is set to {} because we want a fresh - // instrument stack each time this test is run. 
- GCPMetricsHandler.instrumentsForProject = {}; - const handler = new GCPMetricsHandler(new MockExporter({projectId})); - const handler2 = new GCPMetricsHandler(new MockExporter({projectId})); + const handler = getHandler(MockExporter); + const handler2 = handler; const transformedRequestsHandled = JSON.parse( JSON.stringify(expectedRequestsHandled).replace( /my-project/g, @@ -206,142 +219,7 @@ describe('Bigtable/GCPMetricsHandler', () => { } } })().catch(err => { - throw err; - }); - }); - it('Should export a value to a hundred GCPMetricsHandlers', done => { - // This test ensures that when we create multiple GCPMetricsHandlers much like - // what we would be doing when calling readRows on separate tables that - // the data doesn't store duplicates in the same place and export twice as - // much data as it should. - (async () => { - /* - We need to create a timeout here because if we don't then mocha shuts down - the test as it is sleeping before the GCPMetricsHandler has a chance to - export the data. - */ - const timeout = setTimeout(() => { - done(new Error('The export never happened')); - }, 120000); - /* - The exporter is called every x seconds, but we only want to test the value - it receives once. Since done cannot be called multiple times in mocha, - exported variable ensures we only test the value export receives one time. - */ - let exportedCount = 0; - function getTestResultCallback( - resultCallback: (result: ExportResult) => void, - ) { - return (result: ExportResult) => { - exportedCount++; - try { - assert.strictEqual(result.code, 0); - } catch (error) { - // Code isn't 0 so report the original error. - done(result); - done(error); - } - if (exportedCount === 1) { - // We are expecting one call to an exporter. - clearTimeout(timeout); - done(); - } - // The resultCallback needs to be called to end the exporter operation - // so that the test shuts down in mocha. - resultCallback({code: 0}); - }; - } - class MockExporter extends CloudMonitoringExporter { - export( - metrics: ResourceMetrics, - resultCallback: (result: ExportResult) => void, - ): void { - if (exportedCount < 1) { - try { - // This code block ensures the metrics are correct. Mainly, the metrics - // shouldn't contain two copies of the data. It should only contain - // one. - // - // For this test since we are still writing a time series with - // metrics variable we don't want to modify the metrics variable - // to have artificial times because then sending the data to the - // metric service client will fail. Therefore, we must make a copy - // of the metrics and use that. - const parsedExportInput: ResourceMetrics = JSON.parse( - JSON.stringify(metrics), - ); - replaceTimestamps( - parsedExportInput as unknown as typeof expectedOtelHundredExportInputs, - [123, 789], - [456, 789], - ); - assert.deepStrictEqual( - parsedExportInput.scopeMetrics[0].metrics.length, - expectedOtelHundredExportInputs.scopeMetrics[0].metrics.length, - ); - for ( - let index = 0; - index < parsedExportInput.scopeMetrics[0].metrics.length; - index++ - ) { - // We need to compare pointwise because mocha truncates to an 8192 character limit. - assert.deepStrictEqual( - parsedExportInput.scopeMetrics[0].metrics[index], - expectedOtelHundredExportInputs.scopeMetrics[0].metrics[ - index - ], - ); - } - } catch (e) { - // The error needs to be caught so it can be reported to the mocha - // test runner. - done(e); - } - // The code below uses the test callback to ensure the export was successful. 
- const testResultCallback = getTestResultCallback(resultCallback); - super.export(metrics, testResultCallback); - } else { - // After the test is complete the periodic exporter may still be - // running in which case we don't want to do any checks. We just - // want to call the resultCallback so that there are no hanging - // threads. - resultCallback({code: 0}); - } - } - } - - const bigtable = new Bigtable(); - const projectId: string = await new Promise((resolve, reject) => { - bigtable.getProjectId_((err, projectId) => { - if (err) { - reject(err); - } else { - resolve(projectId as string); - } - }); - }); - const transformedRequestsHandled = JSON.parse( - JSON.stringify(expectedRequestsHandled).replace( - /my-project/g, - projectId, - ), - ); - const handlers = []; - // projectToInstruments argument is set to {} because we want a fresh - // instrument stack each time this test is run. - GCPMetricsHandler.instrumentsForProject = {}; - for (let i = 0; i < 100; i++) { - handlers.push(new GCPMetricsHandler(new MockExporter({projectId}))); - for (const request of transformedRequestsHandled) { - if (request.attemptLatency) { - handlers[i].onAttemptComplete(request as OnAttemptCompleteData); - } else { - handlers[i].onOperationComplete(request as OnOperationCompleteData); - } - } - } - })().catch(err => { - throw err; + done(err); }); }); it('Should write two duplicate points inserted into the metrics handler', done => { @@ -378,13 +256,17 @@ describe('Bigtable/GCPMetricsHandler', () => { }; } class MockExporter extends CloudMonitoringExporter { - export( + constructor(options: ClientOptions) { + super(options); + } + + async export( metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void, - ): void { + ): Promise { const testResultCallback = getTestResultCallback(resultCallback); if (!exported) { - super.export(metrics, testResultCallback); + await super.export(metrics, testResultCallback); } else { resultCallback({code: 0}); } @@ -401,10 +283,7 @@ describe('Bigtable/GCPMetricsHandler', () => { } }); }); - // projectToInstruments argument is set to {} because we want a fresh - // instrument stack each time this test is run. 
- GCPMetricsHandler.instrumentsForProject = {}; - const handler = new GCPMetricsHandler(new MockExporter({projectId})); + const handler = getHandler(MockExporter); // Pass options with exporter const transformedRequestsHandled = JSON.parse( JSON.stringify(expectedRequestsHandled).replace( /my-project/g, @@ -421,7 +300,7 @@ describe('Bigtable/GCPMetricsHandler', () => { } } })().catch(err => { - throw err; + done(err); }); }); }); diff --git a/system-test/read-rows-acceptance-tests.ts b/system-test/read-rows-acceptance-tests.ts index 93b9af01c..8d70db610 100644 --- a/system-test/read-rows-acceptance-tests.ts +++ b/system-test/read-rows-acceptance-tests.ts @@ -25,6 +25,37 @@ import * as fs from 'fs'; import * as path from 'path'; import {Instance} from '../src/instance'; import {Bigtable, AbortableDuplex} from '../src'; +import {ClientSideMetricsConfigManager} from '../src/client-side-metrics/metrics-config-manager'; +import { + ITabularApiSurface, + OperationMetricsCollector, +} from '../src/client-side-metrics/operation-metrics-collector'; +import { + MethodName, + StreamingState, +} from '../src/client-side-metrics/client-side-metrics-attributes'; + +class FakeOperationMetricsCollector extends OperationMetricsCollector { + onOperationComplete() {} + onResponse() {} + onAttemptStart() {} + onAttemptComplete() {} + onOperationStart() {} + handleStatusAndMetadata() {} + onMetadataReceived() {} + onRowReachesUser() {} + onStatusMetadataReceived() {} +} + +class FakeMetricsConfigManager extends ClientSideMetricsConfigManager { + createOperation( + methodName: MethodName, + streaming: StreamingState, + table: ITabularApiSurface, + ): OperationMetricsCollector { + return new FakeOperationMetricsCollector(table, methodName, streaming, []); + } +} const protosJson = path.resolve(__dirname, '../protos/protos.json'); const root = protobuf.Root.fromJSON( @@ -67,6 +98,7 @@ describe('Read Row Acceptance tests', () => { }); table.bigtable = {} as Bigtable; + table.bigtable._metricsConfigManager = new FakeMetricsConfigManager([]); // eslint-disable-next-line @typescript-eslint/no-explicit-any (table.bigtable.request as any) = () => { const stream = new PassThrough({ diff --git a/system-test/read-rows.ts b/system-test/read-rows.ts index 6f0395da9..bdb91a0ac 100644 --- a/system-test/read-rows.ts +++ b/system-test/read-rows.ts @@ -25,6 +25,8 @@ import {EventEmitter} from 'events'; import {Test} from './testTypes'; import {ServiceError, GrpcClient, GoogleError, CallOptions} from 'google-gax'; import {PassThrough} from 'stream'; +import * as proxyquire from 'proxyquire'; +import {TabularApiSurface} from '../src/tabular-api-surface'; import * as mocha from 'mocha'; const {grpc} = new GrpcClient(); @@ -77,7 +79,32 @@ function rowResponse(rowKey: {}) { } describe('Bigtable/Table', () => { - const bigtable = new Bigtable(); + /** + * We have to mock out the metrics handler because the metrics handler with + * open telemetry causes clock.runAll() to throw an infinite loop error. This + * is most likely because of the periodic reader as it schedules pending + * events on the node event loop which conflicts with the sinon clock. 
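+   * The TestGCPMetricsHandler below therefore stubs onOperationComplete and
+   * onAttemptComplete as no-ops, so no periodic exporter is ever created.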
+ */ + class TestGCPMetricsHandler { + onOperationComplete() {} + onAttemptComplete() {} + } + const FakeTabularApiSurface = proxyquire('../src/tabular-api-surface.js', { + './client-side-metrics/gcp-metrics-handler': { + GCPMetricsHandler: TestGCPMetricsHandler, + }, + }).TabularApiSurface; + const FakeTable: TabularApiSurface = proxyquire('../src/table.js', { + './tabular-api-surface.js': {TabularApiSurface: FakeTabularApiSurface}, + }).Table; + const FakeInstance = proxyquire('../src/instance.js', { + './table.js': {Table: FakeTable}, + }).Instance; + const FakeBigtable = proxyquire('../src/index.js', { + './instance.js': {Instance: FakeInstance}, + }).Bigtable; + + const bigtable = new FakeBigtable(); const INSTANCE_NAME = 'fake-instance2'; // eslint-disable-next-line @typescript-eslint/no-explicit-any (bigtable as any).grpcCredentials = grpc.credentials.createInsecure(); @@ -139,7 +166,7 @@ describe('Bigtable/Table', () => { rowKeysRead = []; requestedOptions = []; stub = sinon.stub(bigtable, 'request').callsFake(cfg => { - const reqOpts = cfg.reqOpts; + const reqOpts = (cfg as any).reqOpts; const requestOptions = {} as google.bigtable.v2.IRowSet; if (reqOpts.rows && reqOpts.rows.rowRanges) { requestOptions.rowRanges = reqOpts.rows.rowRanges.map( @@ -182,12 +209,14 @@ describe('Bigtable/Table', () => { responses = test.responses; TABLE.maxRetries = test.max_retries; TABLE.createReadStream(test.createReadStream_options) - .on('data', row => rowKeysRead[rowKeysRead.length - 1].push(row.id)) + .on('data', (row: any) => + rowKeysRead[rowKeysRead.length - 1].push(row.id), + ) .on('end', () => { endCalled = true; doAssertionChecks(); }) - .on('error', err => { + .on('error', (err: any) => { error = err as ServiceError; doAssertionChecks(); }); diff --git a/test-common/expected-otel-export-input.ts b/test-common/expected-otel-export-input.ts index 5a8ac0c82..7561dda38 100644 --- a/test-common/expected-otel-export-input.ts +++ b/test-common/expected-otel-export-input.ts @@ -895,7 +895,6 @@ export const expectedOtelExportInput = { 'telemetry.sdk.language': 'nodejs', 'telemetry.sdk.name': 'opentelemetry', 'telemetry.sdk.version': '1.30.1', - 'monitored_resource.project_id': 'my-project', }, asyncAttributesPending: false, _syncAttributes: { @@ -903,7 +902,6 @@ export const expectedOtelExportInput = { 'telemetry.sdk.language': 'nodejs', 'telemetry.sdk.name': 'opentelemetry', 'telemetry.sdk.version': '1.30.1', - 'monitored_resource.project_id': 'my-project', }, _asyncAttributesPromise: {}, }, diff --git a/test-common/metrics-handler-fixture.ts b/test-common/metrics-handler-fixture.ts index 69fce0287..39e89cfcd 100644 --- a/test-common/metrics-handler-fixture.ts +++ b/test-common/metrics-handler-fixture.ts @@ -26,7 +26,6 @@ export const expectedRequestsHandled = [ cluster: 'fake-cluster3', zone: 'us-west1-c', method: 'Bigtable.ReadRows', - client_uid: 'fake-uuid', }, projectId: 'my-project', }, @@ -43,7 +42,6 @@ export const expectedRequestsHandled = [ cluster: 'fake-cluster3', zone: 'us-west1-c', method: 'Bigtable.ReadRows', - client_uid: 'fake-uuid', }, projectId: 'my-project', }, @@ -56,7 +54,6 @@ export const expectedRequestsHandled = [ cluster: 'fake-cluster3', zone: 'us-west1-c', method: 'Bigtable.ReadRows', - client_uid: 'fake-uuid', }, client_name: 'nodejs-bigtable', projectId: 'my-project', diff --git a/test-common/test-metrics-handler.ts b/test-common/test-metrics-handler.ts index 61257913f..0ace7b271 100644 --- a/test-common/test-metrics-handler.ts +++ 
b/test-common/test-metrics-handler.ts @@ -23,21 +23,20 @@ import { * It logs the metrics and attributes received by the onOperationComplete and onAttemptComplete methods. */ export class TestMetricsHandler implements IMetricsHandler { - private messages: {value: string}; + messages = {value: ''}; + projectId = 'projectId'; requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = []; - constructor(messages: {value: string}) { - this.messages = messages; - } /** * Logs the metrics and attributes received for an operation completion. * @param {OnOperationCompleteData} data Metrics related to the completed operation. */ onOperationComplete(data: OnOperationCompleteData) { - this.requestsHandled.push(data); - data.client_name = 'nodejs-bigtable'; + const dataWithProject = Object.assign({projectId: this.projectId}, data); + dataWithProject.client_name = 'nodejs-bigtable'; + this.requestsHandled.push(dataWithProject); this.messages.value += 'Recording parameters for onOperationComplete:\n'; - this.messages.value += `${JSON.stringify(data)}\n`; + this.messages.value += `${JSON.stringify(dataWithProject)}\n`; } /** @@ -45,9 +44,10 @@ export class TestMetricsHandler implements IMetricsHandler { * @param {OnOperationCompleteData} data Metrics related to the completed attempt. */ onAttemptComplete(data: OnAttemptCompleteData) { - this.requestsHandled.push(data); - data.client_name = 'nodejs-bigtable'; + const dataWithProject = Object.assign({projectId: this.projectId}, data); + dataWithProject.client_name = 'nodejs-bigtable'; + this.requestsHandled.push(dataWithProject); this.messages.value += 'Recording parameters for onAttemptComplete:\n'; - this.messages.value += `${JSON.stringify(data)}\n`; + this.messages.value += `${JSON.stringify(dataWithProject)}\n`; } } diff --git a/test/metric-service-client-credentials.ts b/test/metric-service-client-credentials.ts new file mode 100644 index 000000000..edd8001eb --- /dev/null +++ b/test/metric-service-client-credentials.ts @@ -0,0 +1,85 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
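+
+// These tests verify that client options passed to the Bigtable client (such
+// as custom credentials or a second projectId) are forwarded to the
+// CloudMonitoringExporter, so that the MetricServiceClient writes metrics to
+// the intended project.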
+ +import * as proxyquire from 'proxyquire'; +import {ClientOptions, grpc} from 'google-gax'; +import * as assert from 'assert'; +import {setupBigtable} from '../system-test/client-side-metrics-setup-table'; +import {MetricServiceClient} from '@google-cloud/monitoring'; + +describe('Bigtable/MetricServiceClientCredentials', () => { + it('should pass the credentials to the exporter', done => { + const clientOptions = { + metricsEnabled: true, + sslCreds: grpc.credentials.createInsecure(), + }; + class FakeExporter { + constructor(options: ClientOptions) { + try { + assert.strictEqual(options, clientOptions); + done(); + } catch (e) { + done(e); + } + } + } + const FakeCGPMetricsHandler = proxyquire( + '../src/client-side-metrics/gcp-metrics-handler.js', + { + './exporter': { + CloudMonitoringExporter: FakeExporter, + }, + }, + ).GCPMetricsHandler; + const FakeBigtable = proxyquire('../src/index.js', { + './client-side-metrics/gcp-metrics-handler': { + GCPMetricsHandler: FakeCGPMetricsHandler, + }, + }).Bigtable; + new FakeBigtable(clientOptions); + }); + it('should use second project for the metric service client', async () => { + const SECOND_PROJECT_ID = 'second-project-id'; + const clientOptions = {metricsEnabled: true, projectId: SECOND_PROJECT_ID}; + let savedOptions: ClientOptions = {}; + class FakeExporter { + constructor(options: ClientOptions) { + savedOptions = options; + } + } + const FakeCGPMetricsHandler = proxyquire( + '../src/client-side-metrics/gcp-metrics-handler.js', + { + './exporter': { + CloudMonitoringExporter: FakeExporter, + }, + }, + ).GCPMetricsHandler; + const FakeBigtable = proxyquire('../src/index.js', { + './client-side-metrics/gcp-metrics-handler': { + GCPMetricsHandler: FakeCGPMetricsHandler, + }, + }).Bigtable; + new FakeBigtable(clientOptions); + // savedOptions are the options passed down to the exporter + // we want to ensure that when the second project id is provided to the + // fake client that this sends savedOptions to the exporter that then + // fetches the right projectId when the saved options are provided to the + // MetricsServiceClient as this is required to save the metrics to the right + // project. + const client = new MetricServiceClient(savedOptions); + const projectIdUsed = await client.getProjectId(); + assert.strictEqual(projectIdUsed, SECOND_PROJECT_ID); + }); +}); diff --git a/test/metrics-collector/gcp-metrics-handler.ts b/test/metrics-collector/gcp-metrics-handler.ts index 655bdd78d..c3dddb086 100644 --- a/test/metrics-collector/gcp-metrics-handler.ts +++ b/test/metrics-collector/gcp-metrics-handler.ts @@ -18,7 +18,6 @@ import { ExportResult, metricsToRequest, } from '../../src/client-side-metrics/exporter'; -import {GCPMetricsHandler} from '../../src/client-side-metrics/gcp-metrics-handler'; import {MetricExporter} from '@google-cloud/opentelemetry-cloud-monitoring-exporter'; import {expectedRequestsHandled} from '../../test-common/metrics-handler-fixture'; import { @@ -31,6 +30,37 @@ import { } from '../../test-common/expected-otel-export-input'; import * as assert from 'assert'; import {replaceTimestamps} from '../../test-common/replace-timestamps'; +import * as proxyquire from 'proxyquire'; + +/** + * Cleans a ResourceMetrics object by replacing client UUIDs with a placeholder. + * + * This function creates a deep copy of the input ResourceMetrics object and + * then iterates through its metrics, replacing any existing client_uid attribute + * in the data points with the string 'fake-uuid'. 
This is primarily used in + * testing to ensure consistent metric output by removing the variability of + * randomly generated client UUIDs. + * + * @param {ResourceMetrics} metrics The ResourceMetrics object to clean. + * @returns {ResourceMetrics} A new ResourceMetrics object with client UUIDs replaced by 'fake-uuid'. + */ +function cleanMetrics(metrics: ResourceMetrics): ResourceMetrics { + const newMetrics = JSON.parse(JSON.stringify(metrics)); // Deep copy to avoid modifying the original object + + newMetrics.scopeMetrics.forEach((scopeMetric: any) => { + scopeMetric.metrics.forEach((metric: any) => { + if (metric.dataPoints) { + metric.dataPoints.forEach((dataPoint: any) => { + if (dataPoint.attributes && dataPoint.attributes.client_uid) { + dataPoint.attributes.client_uid = 'fake-uuid'; + } + }); + } + }); + }); + + return newMetrics; +} describe('Bigtable/GCPMetricsHandler', () => { it('Should export a value ready for sending to the CloudMonitoringExporter', function (done) { @@ -50,6 +80,10 @@ describe('Bigtable/GCPMetricsHandler', () => { let exported = false; class TestExporter extends MetricExporter { + constructor() { + super(); + } + export( metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void, @@ -57,6 +91,7 @@ describe('Bigtable/GCPMetricsHandler', () => { if (!exported) { exported = true; try { + metrics = cleanMetrics(metrics); replaceTimestamps( metrics as unknown as typeof expectedOtelExportInput, [123, 789], @@ -84,7 +119,10 @@ describe('Bigtable/GCPMetricsHandler', () => { JSON.parse(JSON.stringify(metrics)), expectedOtelExportInput, ); - const convertedRequest = metricsToRequest(parsedExportInput); + const convertedRequest = metricsToRequest( + 'my-project', + parsedExportInput, + ); assert.deepStrictEqual( convertedRequest.timeSeries.length, expectedOtelExportConvertedValue.timeSeries.length, @@ -113,10 +151,17 @@ describe('Bigtable/GCPMetricsHandler', () => { } } } + const stubs = { + './exporter': { + CloudMonitoringExporter: TestExporter, + }, + }; + const FakeMetricsHandler = proxyquire( + '../../src/client-side-metrics/gcp-metrics-handler.js', + stubs, + ).GCPMetricsHandler; - const handler = new GCPMetricsHandler( - new TestExporter({projectId: 'some-project'}), - ); + const handler = new FakeMetricsHandler('my-project'); for (const request of expectedRequestsHandled) { if (request.attemptLatency) { diff --git a/test/metrics-collector/metrics-collector.ts b/test/metrics-collector/metrics-collector.ts index c712ecb0b..626c254c0 100644 --- a/test/metrics-collector/metrics-collector.ts +++ b/test/metrics-collector/metrics-collector.ts @@ -22,21 +22,27 @@ import { } from '../../src/client-side-metrics/client-side-metrics-attributes'; import {grpc} from 'google-gax'; import {expectedRequestsHandled} from '../../test-common/metrics-handler-fixture'; +import * as path from 'path'; // Import the 'path' module import * as gax from 'google-gax'; import * as proxyquire from 'proxyquire'; -const root = gax.protobuf.loadSync( - './protos/google/bigtable/v2/response_params.proto', +import {GCPMetricsHandler} from '../../src/client-side-metrics/gcp-metrics-handler'; +import {ResourceMetrics} from '@opentelemetry/sdk-metrics'; +const protoPath = path.join( + __dirname, + '../../protos/google/bigtable/v2/response_params.proto', ); +const root = gax.protobuf.loadSync(protoPath); const ResponseParams = root.lookupType('ResponseParams'); +const projectId = 'my-project'; + /** * A fake implementation of the Bigtable client for testing purposes. 
Provides a * metricsTracerFactory and a stubbed projectId method. */ class FakeBigtable { - clientUid = 'fake-uuid'; appProfileId?: string; - projectId = 'my-project'; + projectId = projectId; } /** @@ -49,6 +55,11 @@ class FakeInstance { id = 'fakeInstanceId'; } +const logger = {value: ''}; +const testHandler = new TestMetricsHandler(); +testHandler.projectId = projectId; +testHandler.messages = logger; + describe('Bigtable/MetricsCollector', () => { class FakeHRTime { startTime = BigInt(0); @@ -63,17 +74,16 @@ describe('Bigtable/MetricsCollector', () => { 'node:process': { hrtime: new FakeHRTime(), }, + './gcp-metrics-handler': { + GCPMetricsHandler: testHandler, + }, }; const FakeOperationsMetricsCollector = proxyquire( '../../src/client-side-metrics/operation-metrics-collector.js', stubs, ).OperationMetricsCollector; - const logger = {value: ''}; - it('should record the right metrics with a typical method call', async () => { - const testHandler = new TestMetricsHandler(logger); - const metricsHandlers = [testHandler]; class FakeTable { id = 'fakeTableId'; instance = new FakeInstance(); @@ -107,9 +117,9 @@ describe('Bigtable/MetricsCollector', () => { }; const metricsCollector = new FakeOperationsMetricsCollector( this, - metricsHandlers, MethodName.READ_ROWS, StreamingState.STREAMING, + [testHandler as unknown as GCPMetricsHandler], ); // In this method we simulate a series of events that might happen // when a user calls one of the Table methods. @@ -133,10 +143,7 @@ describe('Bigtable/MetricsCollector', () => { logger.value += '9. User receives second row.\n'; metricsCollector.onRowReachesUser(); logger.value += '10. A transient error occurs.\n'; - metricsCollector.onAttemptComplete( - this.bigtable.projectId, - grpc.status.DEADLINE_EXCEEDED, - ); + metricsCollector.onAttemptComplete(grpc.status.DEADLINE_EXCEEDED); logger.value += '11. After a timeout, the second attempt is made.\n'; metricsCollector.onAttemptStart(); logger.value += '12. Client receives status information.\n'; @@ -155,14 +162,7 @@ describe('Bigtable/MetricsCollector', () => { metricsCollector.onRowReachesUser(); logger.value += '19. User reads row 1\n'; logger.value += '20. Stream ends, operation completes\n'; - metricsCollector.onAttemptComplete( - this.bigtable.projectId, - grpc.status.OK, - ); - metricsCollector.onOperationComplete( - this.bigtable.projectId, - grpc.status.OK, - ); + metricsCollector.onOperationComplete(grpc.status.OK); } } } diff --git a/test/metrics-collector/metricsToRequest.ts b/test/metrics-collector/metricsToRequest.ts index cd71a36fd..fcba50ab2 100644 --- a/test/metrics-collector/metricsToRequest.ts +++ b/test/metrics-collector/metricsToRequest.ts @@ -23,7 +23,9 @@ import {ResourceMetrics} from '@opentelemetry/sdk-metrics'; describe('Bigtable/metricsToRequest', () => { it('Converts an otel request to a request ready for the metric service client', () => { + const projectId = 'my-project'; const convertedValue = metricsToRequest( + projectId, expectedOtelExportInput as unknown as ResourceMetrics, ); assert.deepStrictEqual( diff --git a/test/metrics-collector/typical-method-call.txt b/test/metrics-collector/typical-method-call.txt index 917dcd0c6..17a134445 100644 --- a/test/metrics-collector/typical-method-call.txt +++ b/test/metrics-collector/typical-method-call.txt @@ -15,7 +15,7 @@ getDate call returns 5000 ms 10. A transient error occurs. 
getDate call returns 6000 ms Recording parameters for onAttemptComplete: -{"attemptLatency":4000,"serverLatency":101,"connectivityErrorCount":0,"streaming":"true","status":"4","client_name":"nodejs-bigtable","metricsCollectorData":{"instanceId":"fakeInstanceId","table":"fakeTableId","cluster":"fake-cluster3","zone":"us-west1-c","method":"Bigtable.ReadRows","client_uid":"fake-uuid"},"projectId":"my-project"} +{"projectId":"my-project","attemptLatency":4000,"serverLatency":101,"connectivityErrorCount":0,"streaming":"true","status":"4","client_name":"nodejs-bigtable","metricsCollectorData":{"instanceId":"fakeInstanceId","table":"fakeTableId","cluster":"fake-cluster3","zone":"us-west1-c","method":"Bigtable.ReadRows"}} 11. After a timeout, the second attempt is made. getDate call returns 7000 ms 12. Client receives status information. @@ -31,7 +31,7 @@ getDate call returns 9000 ms 20. Stream ends, operation completes getDate call returns 10000 ms Recording parameters for onAttemptComplete: -{"attemptLatency":3000,"serverLatency":103,"connectivityErrorCount":0,"streaming":"true","status":"0","client_name":"nodejs-bigtable","metricsCollectorData":{"instanceId":"fakeInstanceId","table":"fakeTableId","cluster":"fake-cluster3","zone":"us-west1-c","method":"Bigtable.ReadRows","client_uid":"fake-uuid"},"projectId":"my-project"} +{"projectId":"my-project","attemptLatency":3000,"serverLatency":103,"connectivityErrorCount":0,"streaming":"true","status":"0","client_name":"nodejs-bigtable","metricsCollectorData":{"instanceId":"fakeInstanceId","table":"fakeTableId","cluster":"fake-cluster3","zone":"us-west1-c","method":"Bigtable.ReadRows"}} getDate call returns 11000 ms Recording parameters for onOperationComplete: -{"status":"0","streaming":"true","metricsCollectorData":{"instanceId":"fakeInstanceId","table":"fakeTableId","cluster":"fake-cluster3","zone":"us-west1-c","method":"Bigtable.ReadRows","client_uid":"fake-uuid"},"client_name":"nodejs-bigtable","projectId":"my-project","operationLatency":10000,"retryCount":1,"firstResponseLatency":2000,"applicationLatencies":[1000,1000]} +{"projectId":"my-project","status":"0","streaming":"true","metricsCollectorData":{"instanceId":"fakeInstanceId","table":"fakeTableId","cluster":"fake-cluster3","zone":"us-west1-c","method":"Bigtable.ReadRows"},"client_name":"nodejs-bigtable","operationLatency":10000,"retryCount":1,"firstResponseLatency":2000,"applicationLatencies":[1000,1000]} diff --git a/test/row.ts b/test/row.ts index 35bd985bc..9e4833239 100644 --- a/test/row.ts +++ b/test/row.ts @@ -19,9 +19,19 @@ import * as proxyquire from 'proxyquire'; import * as sinon from 'sinon'; import {Mutation} from '../src/mutation.js'; import * as rw from '../src/row'; -import {Table, Entry} from '../src/table.js'; +import { + Table, + Entry, + GetRowsOptions, + GetRowsCallback, + GetRowsResponse, +} from '../src/table.js'; import {Chunk} from '../src/chunktransformer.js'; -import {CallOptions} from 'google-gax'; +import {CallOptions, ServiceError} from 'google-gax'; +import {ClientSideMetricsConfigManager} from '../src/client-side-metrics/metrics-config-manager'; +import {Bigtable} from '../src/'; +import {getRowsInternal} from '../src/utils/getRowsInternal'; +import {TabularApiSurface} from '../src/tabular-api-surface'; const sandbox = sinon.createSandbox(); @@ -78,19 +88,36 @@ describe('Bigtable/Row', () => { let RowError: typeof rw.RowError; let row: rw.Row; - before(() => { + function getFakeRow( + getRowsInternal: ( + table: TabularApiSurface, + singleRow: boolean, + 
+      optionsOrCallback?: GetRowsOptions | GetRowsCallback,
+      cb?: GetRowsCallback,
+    ) => void | Promise<GetRowsResponse>,
+  ) {
     const Fake = proxyquire('../src/row.js', {
       '@google-cloud/promisify': fakePromisify,
       './mutation.js': {Mutation: FakeMutation},
       './filter.js': {Filter: FakeFilter},
       './row-data-utils.js': {RowDataUtils: FakeRowDataUtil},
+      './utils/getRowsInternal': {
+        getRowsInternal,
+      },
     });
-    Row = Fake.Row;
     RowError = Fake.RowError;
+    return Fake;
+  }
+
+  before(() => {
+    const Fake = getFakeRow(() => {});
+    Row = Fake.Row;
   });
 
   beforeEach(() => {
     row = new Row(TABLE, ROW_ID);
+    row.table.bigtable._metricsConfigManager =
+      new ClientSideMetricsConfigManager([]);
   });
 
   afterEach(() => {
@@ -997,15 +1024,48 @@ describe('Bigtable/Row', () => {
   });
 
   describe('get', () => {
+    function getRowInstance(
+      fn: (reqOpts: any) => void | Promise<GetRowsResponse>,
+    ) {
+      const getRowsInternal = (
+        table: TabularApiSurface,
+        singleRow: boolean,
+        optionsOrCallback?: GetRowsOptions | GetRowsCallback,
+        cb?: GetRowsCallback,
+      ) => {
+        return fn(optionsOrCallback);
+      };
+      const Fake = getFakeRow(getRowsInternal);
+      Row = Fake.Row;
+      row = new Row(TABLE, ROW_ID);
+      return row;
+    }
+
+    function getRowInstanceForErrResp(err: ServiceError | null, resp?: any[]) {
+      const getRowsInternal = (
+        table: TabularApiSurface,
+        singleRow: boolean,
+        optionsOrCallback?: GetRowsOptions | GetRowsCallback,
+        cb?: GetRowsCallback,
+      ) => {
+        if (cb) {
+          cb(err, resp);
+        }
+      };
+      const Fake = getFakeRow(getRowsInternal);
+      Row = Fake.Row;
+      row = new Row(TABLE, ROW_ID);
+      return row;
+    }
     it('should provide the proper request options', done => {
       // eslint-disable-next-line @typescript-eslint/no-explicit-any
-      (row.table.getRows as Function) = (reqOpts: any) => {
+      const fn = (reqOpts: any) => {
         assert.strictEqual(reqOpts.keys[0], ROW_ID);
         assert.strictEqual(reqOpts.filter, undefined);
         assert.strictEqual(FakeMutation.parseColumnName.callCount, 0);
         done();
       };
-
+      const row = getRowInstance(fn);
       row.get(assert.ifError);
     });
 
@@ -1022,12 +1082,13 @@ describe('Bigtable/Row', () => {
       ];
 
       // eslint-disable-next-line @typescript-eslint/no-explicit-any
-      (row.table.getRows as Function) = (reqOpts: any) => {
+      const fn = (reqOpts: any) => {
         assert.deepStrictEqual(reqOpts.filter, expectedFilter);
         assert.strictEqual(FakeMutation.parseColumnName.callCount, 1);
         assert(FakeMutation.parseColumnName.calledWith(keys[0]));
         done();
       };
+      const row = getRowInstance(fn);
       row.get(keys, assert.ifError);
     });
 
@@ -1058,7 +1119,7 @@ describe('Bigtable/Row', () => {
       ];
 
       // eslint-disable-next-line @typescript-eslint/no-explicit-any
-      (row.table.getRows as Function) = (reqOpts: any) => {
+      const fn = (reqOpts: any) => {
         assert.deepStrictEqual(reqOpts.filter, expectedFilter);
 
         const spy = FakeMutation.parseColumnName;
@@ -1068,6 +1129,7 @@ describe('Bigtable/Row', () => {
         assert.strictEqual(spy.getCall(1).args[0], keys[1]);
         done();
       };
+      const row = getRowInstance(fn);
       row.get(keys, assert.ifError);
     });
 
@@ -1082,12 +1144,13 @@ describe('Bigtable/Row', () => {
       ];
 
       // eslint-disable-next-line @typescript-eslint/no-explicit-any
-      (row.table.getRows as Function) = (reqOpts: any) => {
+      const fn = (reqOpts: any) => {
         assert.deepStrictEqual(reqOpts.filter, expectedFilter);
         assert.strictEqual(FakeMutation.parseColumnName.callCount, 1);
         assert(FakeMutation.parseColumnName.calledWith(keys[0]));
         done();
       };
+      const row = getRowInstance(fn);
       row.get(keys, assert.ifError);
     });
 
@@ -1121,13 +1184,14 @@ describe('Bigtable/Row', () => {
       ];
 
       // eslint-disable-next-line @typescript-eslint/no-explicit-any
-      (row.table.getRows as Function) = (reqOpts: any) => {
+      const fn = (reqOpts: any) => {
         assert.deepStrictEqual(reqOpts.filter, expectedFilter);
         assert.strictEqual(FakeMutation.parseColumnName.callCount, 1);
         assert(FakeMutation.parseColumnName.calledWith(keys[0]));
         assert.strictEqual(reqOpts.decode, options.decode);
         done();
       };
+      const row = getRowInstance(fn);
       row.get(keys, options, assert.ifError);
     });
 
@@ -1175,13 +1239,14 @@ describe('Bigtable/Row', () => {
       ];
 
       // eslint-disable-next-line @typescript-eslint/no-explicit-any
-      (row.table.getRows as Function) = (reqOpts: any) => {
+      const fn = (reqOpts: any) => {
         assert.deepStrictEqual(reqOpts.filter, expectedFilter);
         assert.strictEqual(FakeMutation.parseColumnName.callCount, 2);
         assert(FakeMutation.parseColumnName.calledWith(keys[0]));
         assert.strictEqual(reqOpts.decode, options.decode);
         done();
       };
+      const row = getRowInstance(fn);
       row.get(keys, options, assert.ifError);
     });
 
@@ -1196,10 +1261,11 @@ describe('Bigtable/Row', () => {
       const expectedFilter = options.filter;
 
       // eslint-disable-next-line @typescript-eslint/no-explicit-any
-      (row.table.getRows as Function) = (reqOpts: any) => {
+      const fn = (reqOpts: any) => {
         assert.deepStrictEqual(reqOpts.filter, expectedFilter);
         done();
       };
+      const row = getRowInstance(fn);
       row.get(keys, options, assert.ifError);
     });
 
@@ -1210,18 +1276,19 @@ describe('Bigtable/Row', () => {
       };
 
       // eslint-disable-next-line @typescript-eslint/no-explicit-any
-      (row.table.getRows as Function) = (reqOpts: any) => {
+      const fn = (reqOpts: any) => {
         assert.strictEqual(reqOpts.decode, options.decode);
         assert(!reqOpts.filter);
         done();
       };
+      const row = getRowInstance(fn);
       row.get(options, assert.ifError);
     });
 
     it('should return an error to the callback', done => {
       const error = new Error('err');
-      sandbox.stub(row.table, 'getRows').callsArgWith(1, error);
+      const row = getRowInstanceForErrResp(error as ServiceError);
       row.get((err, row) => {
         assert.strictEqual(error, err);
         assert.strictEqual(row, undefined);
@@ -1230,7 +1297,7 @@ describe('Bigtable/Row', () => {
     });
 
     it('should return a custom error if the row is not found', done => {
-      sandbox.stub(row.table, 'getRows').callsArgWith(1, null, []);
+      const row = getRowInstanceForErrResp(null, []);
       row.get((err, row_) => {
         assert(err instanceof RowError);
         assert.strictEqual(err!.message, 'Unknown row: ' + row.id + '.');
@@ -1245,7 +1312,7 @@ describe('Bigtable/Row', () => {
         a: 'a',
         b: 'b',
       };
-      sandbox.stub(row.table, 'getRows').callsArgWith(1, null, [fakeRow]);
+      const row = getRowInstanceForErrResp(null, [fakeRow]);
       row.get((err, row_) => {
         assert.ifError(err);
         assert.strictEqual(row_, row);
@@ -1263,11 +1330,11 @@ describe('Bigtable/Row', () => {
       };
       const keys = ['a', 'b'];
 
+      const row = getRowInstanceForErrResp(null, [fakeRow]);
       row.data = {
         c: 'c',
       };
 
-      sandbox.stub(row.table, 'getRows').callsArgWith(1, null, [fakeRow]);
       row.get(keys, (err, data) => {
         assert.ifError(err);
         assert.deepStrictEqual(Object.keys(data), keys);
diff --git a/test/table.ts b/test/table.ts
index a1a28282a..913e77c12 100644
--- a/test/table.ts
+++ b/test/table.ts
@@ -30,6 +30,11 @@ import * as tblTypes from '../src/table';
 import {Bigtable, RequestOptions} from '../src';
 import {EventEmitter} from 'events';
 import {TableUtils} from '../src/utils/table';
+import {ClientSideMetricsConfigManager} from '../src/client-side-metrics/metrics-config-manager';
+import {OperationMetricsCollector} from '../src/client-side-metrics/operation-metrics-collector';
+import {SinonSpy} from 'sinon';
+import {TabularApiSurface} from '../src/tabular-api-surface';
+import {GetRowsOptions} from '../src/table';
 
 const sandbox = sinon.createSandbox();
 const noop = () => {};
 
@@ -59,6 +64,24 @@ function createFake(klass: any) {
   };
 }
 
+class FakeMetricsCollector {
+  onOperationStart() {}
+  onOperationComplete() {}
+  onResponse() {}
+  onAttemptStart() {}
+  onAttemptComplete() {}
+  onMetadataReceived() {}
+  handleStatusAndMetadata() {}
+  onStatusMetadataReceived() {}
+  onRowReachesUser() {}
+}
+
+class FakeMetricsConfigManager extends ClientSideMetricsConfigManager {
+  createOperation() {
+    return new FakeMetricsCollector() as unknown as OperationMetricsCollector;
+  }
+}
+
 const FakeFamily = createFake(Family);
 FakeFamily.formatRule_ = sinon.spy(rule => rule);
 
@@ -100,6 +123,43 @@ const FakeFilter = {
   },
 };
 
+function getTableMock(
+  createReadStreamInternal: (
+    table: TabularApiSurface,
+    singleRow: boolean,
+    opts?: GetRowsOptions,
+  ) => PassThrough,
+) {
+  const FakeGetRows = proxyquire('../src/utils/getRowsInternal.js', {
+    './createReadStreamInternal': {
+      createReadStreamInternal: createReadStreamInternal,
+    },
+  });
+  const FakeTabularApiSurface = proxyquire('../src/tabular-api-surface.js', {
+    '@google-cloud/promisify': fakePromisify,
+    './family.js': {Family: FakeFamily},
+    './mutation.js': {Mutation: FakeMutation},
+    './filter.js': {Filter: FakeFilter},
+    pumpify,
+    './row.js': {Row: FakeRow},
+    './chunktransformer.js': {ChunkTransformer: FakeChunkTransformer},
+    './utils/createReadStreamInternal': {
+      createReadStreamInternal,
+    },
+    './utils/getRowsInternal': {
+      getRowsInternal: FakeGetRows.getRowsInternal,
+    },
+  }).TabularApiSurface;
+  const Table = proxyquire('../src/table.js', {
+    '@google-cloud/promisify': fakePromisify,
+    './family.js': {Family: FakeFamily},
+    './mutation.js': {Mutation: FakeMutation},
+    './row.js': {Row: FakeRow},
+    './tabular-api-surface': {TabularApiSurface: FakeTabularApiSurface},
+  }).Table;
+  return Table;
+}
+
 describe('Bigtable/Table', () => {
   const TABLE_ID = 'my-table';
   let INSTANCE: inst.Instance;
@@ -110,27 +170,26 @@ describe('Bigtable/Table', () => {
   let table: any;
 
   before(() => {
-    const FakeTabularApiSurface = proxyquire('../src/tabular-api-surface.js', {
-      '@google-cloud/promisify': fakePromisify,
-      './family.js': {Family: FakeFamily},
-      './mutation.js': {Mutation: FakeMutation},
-      './filter.js': {Filter: FakeFilter},
-      pumpify,
-      './row.js': {Row: FakeRow},
-      './chunktransformer.js': {ChunkTransformer: FakeChunkTransformer},
-    }).TabularApiSurface;
-    Table = proxyquire('../src/table.js', {
-      '@google-cloud/promisify': fakePromisify,
-      './family.js': {Family: FakeFamily},
-      './mutation.js': {Mutation: FakeMutation},
-      './row.js': {Row: FakeRow},
-      './tabular-api-surface': {TabularApiSurface: FakeTabularApiSurface},
-    }).Table;
+    const FakeCreateReadStreamInternal = proxyquire(
+      '../src/utils/createReadStreamInternal.js',
+      {
+        '../row.js': {Row: FakeRow},
+        '../chunktransformer.js': {ChunkTransformer: FakeChunkTransformer},
+        '../filter.js': {Filter: FakeFilter},
+        '../mutation.js': {Mutation: FakeMutation},
+        pumpify,
+      },
+    ).createReadStreamInternal;
+    Table = getTableMock(FakeCreateReadStreamInternal);
   });
 
   beforeEach(() => {
     INSTANCE = {
-      bigtable: {} as Bigtable,
+      bigtable: {
+        _metricsConfigManager: new FakeMetricsConfigManager(
+          [],
+        ) as ClientSideMetricsConfigManager,
+      } as Bigtable,
       name: 'a/b/c/d',
     } as inst.Instance;
     TABLE_NAME = INSTANCE.name + '/tables/' + TABLE_ID;
@@ -2301,13 +2360,14 @@ describe('Bigtable/Table', () => {
   describe('getRows', () => {
     describe('success', () => {
+      let createReadStreamInternal: SinonSpy<[], PassThrough>;
       const fakeRows = [
         {key: 'c', data: {}},
         {key: 'd', data: {}},
       ];
 
       beforeEach(() => {
-        table.createReadStream = sinon.spy(() => {
+        createReadStreamInternal = sinon.spy(() => {
          const stream = new PassThrough({
            objectMode: true,
          });
@@ -2322,6 +2382,17 @@ describe('Bigtable/Table', () => {
          return stream;
        });
+        Table = getTableMock(createReadStreamInternal);
+        INSTANCE = {
+          bigtable: {
+            _metricsConfigManager: new FakeMetricsConfigManager(
+              [],
+            ) as ClientSideMetricsConfigManager,
+          } as Bigtable,
+          name: 'a/b/c/d',
+        } as inst.Instance;
+        TABLE_NAME = INSTANCE.name + '/tables/' + TABLE_ID;
+        table = new Table(INSTANCE, TABLE_ID);
       });
 
       it('should return the rows to the callback', done => {
@@ -2332,8 +2403,8 @@ describe('Bigtable/Table', () => {
          assert.deepStrictEqual(rows, fakeRows);
 
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
-          const spy = (table as any).createReadStream.getCall(0);
-          assert.strictEqual(spy.args[0], options);
+          const spy = createReadStreamInternal.getCall(0);
+          assert.strictEqual((spy.args as any)[2], options);
          done();
        });
      });
@@ -2348,10 +2419,11 @@ describe('Bigtable/Table', () => {
     });
 
     describe('error', () => {
+      let createReadStreamInternal: SinonSpy<[], PassThrough>;
       const error = new Error('err');
 
       beforeEach(() => {
-        table.createReadStream = sinon.spy(() => {
+        createReadStreamInternal = sinon.spy(() => {
          const stream = new PassThrough({
            objectMode: true,
          });
@@ -2362,6 +2434,17 @@ describe('Bigtable/Table', () => {
          return stream;
        });
+        Table = getTableMock(createReadStreamInternal);
+        INSTANCE = {
+          bigtable: {
+            _metricsConfigManager: new FakeMetricsConfigManager(
+              [],
+            ) as ClientSideMetricsConfigManager,
+          } as Bigtable,
+          name: 'a/b/c/d',
+        } as inst.Instance;
+        TABLE_NAME = INSTANCE.name + '/tables/' + TABLE_ID;
+        table = new Table(INSTANCE, TABLE_ID);
       });
 
       it('should return the error to the callback', done => {