diff --git a/lib/DBSQLClient.ts b/lib/DBSQLClient.ts index dcd7f7d4..67215b8e 100644 --- a/lib/DBSQLClient.ts +++ b/lib/DBSQLClient.ts @@ -1,5 +1,6 @@ import thrift from 'thrift'; import Int64 from 'node-int64'; +import os from 'os'; import { EventEmitter } from 'events'; import { HeadersInit } from 'node-fetch'; @@ -24,6 +25,14 @@ import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger'; import DBSQLLogger from './DBSQLLogger'; import CloseableCollection from './utils/CloseableCollection'; import IConnectionProvider from './connection/contracts/IConnectionProvider'; +import FeatureFlagCache from './telemetry/FeatureFlagCache'; +import TelemetryClientProvider from './telemetry/TelemetryClientProvider'; +import TelemetryEventEmitter from './telemetry/TelemetryEventEmitter'; +import MetricsAggregator from './telemetry/MetricsAggregator'; +import DatabricksTelemetryExporter from './telemetry/DatabricksTelemetryExporter'; +import { CircuitBreakerRegistry } from './telemetry/CircuitBreaker'; +import { DriverConfiguration, DRIVER_NAME } from './telemetry/types'; +import driverVersion from './version'; function prependSlash(str: string): string { if (str.length > 0 && str.charAt(0) !== '/') { @@ -68,6 +77,19 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I private readonly sessions = new CloseableCollection(); + // Telemetry components (instance-based, NOT singletons) + private host?: string; + + private featureFlagCache?: FeatureFlagCache; + + private telemetryClientProvider?: TelemetryClientProvider; + + private telemetryEmitter?: TelemetryEventEmitter; + + private telemetryAggregator?: MetricsAggregator; + + private circuitBreakerRegistry?: CircuitBreakerRegistry; + private static getDefaultLogger(): IDBSQLLogger { if (!this.defaultLogger) { this.defaultLogger = new DBSQLLogger(); @@ -94,6 +116,15 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I cloudFetchSpeedThresholdMBps: 0.1, useLZ4Compression: true, + + // Telemetry defaults + telemetryEnabled: false, // Initially disabled for safe rollout + telemetryBatchSize: 100, + telemetryFlushIntervalMs: 5000, + telemetryMaxRetries: 3, + telemetryAuthenticatedExport: true, + telemetryCircuitBreakerThreshold: 5, + telemetryCircuitBreakerTimeout: 60000, // 1 minute }; } @@ -152,6 +183,178 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I return new HttpConnection(this.getConnectionOptions(options), this); } + /** + * Extract workspace ID from hostname. + * @param host - The host string (e.g., "workspace-id.cloud.databricks.com") + * @returns Workspace ID or host if extraction fails + */ + private extractWorkspaceId(host: string): string { + // Extract workspace ID from hostname (first segment before first dot) + const parts = host.split('.'); + return parts.length > 0 ? parts[0] : host; + } + + /** + * Build driver configuration for telemetry reporting. + * @returns DriverConfiguration object with current driver settings + */ + private buildDriverConfiguration(): DriverConfiguration { + return { + driverVersion, + driverName: DRIVER_NAME, + nodeVersion: process.version, + platform: process.platform, + osVersion: os.release(), + osArch: os.arch(), + runtimeVendor: 'Node.js Foundation', + localeName: this.getLocaleName(), + charSetEncoding: 'UTF-8', + processName: this.getProcessName(), + + // Feature flags + cloudFetchEnabled: this.config.useCloudFetch ?? false, + lz4Enabled: this.config.useLZ4Compression ?? false, + arrowEnabled: this.config.arrowEnabled ?? false, + directResultsEnabled: true, // Direct results always enabled + + // Configuration values + socketTimeout: this.config.socketTimeout ?? 0, + retryMaxAttempts: this.config.retryMaxAttempts ?? 0, + cloudFetchConcurrentDownloads: this.config.cloudFetchConcurrentDownloads ?? 0, + }; + } + + /** + * Get locale name in format language_country (e.g., en_US). + * Matches JDBC format: user.language + '_' + user.country + */ + private getLocaleName(): string { + try { + // Try to get from environment variables + const lang = process.env.LANG || process.env.LC_ALL || process.env.LC_MESSAGES || ''; + if (lang) { + // LANG format is typically "en_US.UTF-8", extract "en_US" + const match = lang.match(/^([a-z]{2}_[A-Z]{2})/); + if (match) { + return match[1]; + } + } + // Fallback to en_US + return 'en_US'; + } catch { + return 'en_US'; + } + } + + /** + * Get process name, similar to JDBC's ProcessNameUtil. + * Returns the script name or process title. + */ + private getProcessName(): string { + try { + // Try process.title first (can be set by application) + if (process.title && process.title !== 'node') { + return process.title; + } + // Try to get the main script name from argv[1] + if (process.argv && process.argv.length > 1) { + const scriptPath = process.argv[1]; + // Extract filename without path + const filename = scriptPath.split('/').pop()?.split('\\').pop() || ''; + // Remove extension + const nameWithoutExt = filename.replace(/\.[^.]*$/, ''); + if (nameWithoutExt) { + return nameWithoutExt; + } + } + return 'node'; + } catch { + return 'node'; + } + } + + /** + * Initialize telemetry components if enabled. + * CRITICAL: All errors swallowed and logged at LogLevel.debug ONLY. + * Driver NEVER throws exceptions due to telemetry. + */ + private async initializeTelemetry(): Promise { + if (!this.host) { + return; + } + + try { + // Create feature flag cache instance + this.featureFlagCache = new FeatureFlagCache(this); + this.featureFlagCache.getOrCreateContext(this.host); + + // Check if telemetry enabled via feature flag + const enabled = await this.featureFlagCache.isTelemetryEnabled(this.host); + if (!enabled) { + this.logger.log(LogLevel.debug, 'Telemetry disabled via feature flag'); + return; + } + + // Create telemetry components (all instance-based) + this.telemetryClientProvider = new TelemetryClientProvider(this); + this.telemetryEmitter = new TelemetryEventEmitter(this); + + // Get or create telemetry client for this host (increments refCount) + this.telemetryClientProvider.getOrCreateClient(this.host); + + // Create circuit breaker registry and exporter + this.circuitBreakerRegistry = new CircuitBreakerRegistry(this); + const exporter = new DatabricksTelemetryExporter(this, this.host, this.circuitBreakerRegistry); + this.telemetryAggregator = new MetricsAggregator(this, exporter); + + // Wire up event listeners + this.telemetryEmitter.on('connection.open', (event) => { + try { + this.telemetryAggregator?.processEvent(event); + } catch (error: any) { + this.logger.log(LogLevel.debug, `Error processing connection.open event: ${error.message}`); + } + }); + + this.telemetryEmitter.on('statement.start', (event) => { + try { + this.telemetryAggregator?.processEvent(event); + } catch (error: any) { + this.logger.log(LogLevel.debug, `Error processing statement.start event: ${error.message}`); + } + }); + + this.telemetryEmitter.on('statement.complete', (event) => { + try { + this.telemetryAggregator?.processEvent(event); + } catch (error: any) { + this.logger.log(LogLevel.debug, `Error processing statement.complete event: ${error.message}`); + } + }); + + this.telemetryEmitter.on('cloudfetch.chunk', (event) => { + try { + this.telemetryAggregator?.processEvent(event); + } catch (error: any) { + this.logger.log(LogLevel.debug, `Error processing cloudfetch.chunk event: ${error.message}`); + } + }); + + this.telemetryEmitter.on('error', (event) => { + try { + this.telemetryAggregator?.processEvent(event); + } catch (error: any) { + this.logger.log(LogLevel.debug, `Error processing error event: ${error.message}`); + } + }); + + this.logger.log(LogLevel.debug, 'Telemetry initialized successfully'); + } catch (error: any) { + // Swallow all telemetry initialization errors + this.logger.log(LogLevel.debug, `Telemetry initialization failed: ${error.message}`); + } + } + /** * Connects DBSQLClient to endpoint * @public @@ -173,11 +376,25 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I } } + // Store host for telemetry + this.host = options.host; + // Store enableMetricViewMetadata configuration if (options.enableMetricViewMetadata !== undefined) { this.config.enableMetricViewMetadata = options.enableMetricViewMetadata; } + // Override telemetry config if provided in options + if (options.telemetryEnabled !== undefined) { + this.config.telemetryEnabled = options.telemetryEnabled; + } + if (options.telemetryBatchSize !== undefined) { + this.config.telemetryBatchSize = options.telemetryBatchSize; + } + if (options.telemetryAuthenticatedExport !== undefined) { + this.config.telemetryAuthenticatedExport = options.telemetryAuthenticatedExport; + } + this.authProvider = this.createAuthProvider(options, authProvider); this.connectionProvider = this.createConnectionProvider(options); @@ -211,6 +428,11 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I this.emit('timeout'); }); + // Initialize telemetry if enabled + if (this.config.telemetryEnabled) { + await this.initializeTelemetry(); + } + return this; } @@ -246,12 +468,52 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I serverProtocolVersion: response.serverProtocolVersion, }); this.sessions.add(session); + + // Emit connection.open telemetry event + if (this.telemetryEmitter && this.host) { + try { + const workspaceId = this.extractWorkspaceId(this.host); + const driverConfig = this.buildDriverConfiguration(); + this.telemetryEmitter.emitConnectionOpen({ + sessionId: session.id, + workspaceId, + driverConfig, + }); + } catch (error: any) { + // CRITICAL: All telemetry exceptions swallowed + this.logger.log(LogLevel.debug, `Error emitting connection.open event: ${error.message}`); + } + } + return session; } public async close(): Promise { await this.sessions.closeAll(); + // Cleanup telemetry + if (this.host) { + try { + // Step 1: Close aggregator (stops timer, completes statements, final flush) + if (this.telemetryAggregator) { + this.telemetryAggregator.close(); + } + + // Step 2: Release telemetry client (decrements ref count, closes if last) + if (this.telemetryClientProvider) { + await this.telemetryClientProvider.releaseClient(this.host); + } + + // Step 3: Release feature flag context (decrements ref count) + if (this.featureFlagCache) { + this.featureFlagCache.releaseContext(this.host); + } + } catch (error: any) { + // Swallow all telemetry cleanup errors + this.logger.log(LogLevel.debug, `Telemetry cleanup error: ${error.message}`); + } + } + this.client = undefined; this.connectionProvider = undefined; this.authProvider = undefined; diff --git a/lib/DBSQLOperation.ts b/lib/DBSQLOperation.ts index fe22995d..c53684e7 100644 --- a/lib/DBSQLOperation.ts +++ b/lib/DBSQLOperation.ts @@ -34,11 +34,13 @@ import { definedOrError } from './utils'; import { OperationChunksIterator, OperationRowsIterator } from './utils/OperationIterator'; import HiveDriverError from './errors/HiveDriverError'; import IClientContext from './contracts/IClientContext'; +import ExceptionClassifier from './telemetry/ExceptionClassifier'; interface DBSQLOperationConstructorOptions { handle: TOperationHandle; directResults?: TSparkDirectResults; context: IClientContext; + sessionId?: string; } async function delay(ms?: number): Promise { @@ -76,9 +78,17 @@ export default class DBSQLOperation implements IOperation { private resultHandler?: ResultSlicer; - constructor({ handle, directResults, context }: DBSQLOperationConstructorOptions) { + // Telemetry tracking fields + private startTime: number = Date.now(); + + private pollCount: number = 0; + + private sessionId?: string; + + constructor({ handle, directResults, context, sessionId }: DBSQLOperationConstructorOptions) { this.operationHandle = handle; this.context = context; + this.sessionId = sessionId; const useOnlyPrefetchedResults = Boolean(directResults?.closeOperation); @@ -95,6 +105,9 @@ export default class DBSQLOperation implements IOperation { ); this.closeOperation = directResults?.closeOperation; this.context.getLogger().log(LogLevel.debug, `Operation created with id: ${this.id}`); + + // Emit statement.start telemetry event + this.emitStatementStart(); } public iterateChunks(options?: IteratorOptions): IOperationChunksIterator { @@ -225,6 +238,9 @@ export default class DBSQLOperation implements IOperation { return this.operationStatus; } + // Track poll count for telemetry + this.pollCount += 1; + const driver = await this.context.getDriver(); const response = await driver.getOperationStatus({ operationHandle: this.operationHandle, @@ -279,6 +295,9 @@ export default class DBSQLOperation implements IOperation { this.closed = true; const result = new Status(response.status); + // Emit statement.complete telemetry event + this.emitStatementComplete(); + this.onClose?.(); return result; } @@ -441,7 +460,7 @@ export default class DBSQLOperation implements IOperation { case TSparkRowSetType.URL_BASED_SET: resultSource = new ArrowResultConverter( this.context, - new CloudFetchResultHandler(this.context, this._data, metadata), + new CloudFetchResultHandler(this.context, this._data, metadata, this.id), metadata, ); break; @@ -481,4 +500,83 @@ export default class DBSQLOperation implements IOperation { return response; } + + /** + * Emit statement.start telemetry event. + * CRITICAL: All exceptions swallowed and logged at LogLevel.debug ONLY. + */ + private emitStatementStart(): void { + try { + const {telemetryEmitter} = (this.context as any); + if (!telemetryEmitter) { + return; + } + + telemetryEmitter.emitStatementStart({ + statementId: this.id, + sessionId: this.sessionId || '', + operationType: this.operationHandle.operationType?.toString(), + }); + } catch (error: any) { + this.context.getLogger().log(LogLevel.debug, `Error emitting statement.start event: ${error.message}`); + } + } + + /** + * Emit statement.complete telemetry event and complete aggregation. + * CRITICAL: All exceptions swallowed and logged at LogLevel.debug ONLY. + */ + private emitStatementComplete(): void { + try { + const {telemetryEmitter} = (this.context as any); + const {telemetryAggregator} = (this.context as any); + if (!telemetryEmitter || !telemetryAggregator) { + return; + } + + const latencyMs = Date.now() - this.startTime; + const resultFormat = this.metadata?.resultFormat + ? TSparkRowSetType[this.metadata.resultFormat] + : undefined; + + telemetryEmitter.emitStatementComplete({ + statementId: this.id, + sessionId: this.sessionId || '', + latencyMs, + resultFormat, + pollCount: this.pollCount, + }); + + // Complete statement aggregation + telemetryAggregator.completeStatement(this.id); + } catch (error: any) { + this.context.getLogger().log(LogLevel.debug, `Error emitting statement.complete event: ${error.message}`); + } + } + + /** + * Emit error telemetry event with terminal classification. + * CRITICAL: All exceptions swallowed and logged at LogLevel.debug ONLY. + */ + private emitErrorEvent(error: Error): void { + try { + const {telemetryEmitter} = (this.context as any); + if (!telemetryEmitter) { + return; + } + + // Classify the exception + const isTerminal = ExceptionClassifier.isTerminal(error); + + telemetryEmitter.emitError({ + statementId: this.id, + sessionId: this.sessionId, + errorName: error.name || 'Error', + errorMessage: error.message || 'Unknown error', + isTerminal, + }); + } catch (emitError: any) { + this.context.getLogger().log(LogLevel.debug, `Error emitting error event: ${emitError.message}`); + } + } } diff --git a/lib/DBSQLSession.ts b/lib/DBSQLSession.ts index 9b4245c3..f1f8c96c 100644 --- a/lib/DBSQLSession.ts +++ b/lib/DBSQLSession.ts @@ -605,6 +605,7 @@ export default class DBSQLSession implements IDBSQLSession { handle, directResults: response.directResults, context: this.context, + sessionId: this.id, }); this.operations.add(operation); diff --git a/lib/contracts/IDBSQLClient.ts b/lib/contracts/IDBSQLClient.ts index 26588031..c47fddad 100644 --- a/lib/contracts/IDBSQLClient.ts +++ b/lib/contracts/IDBSQLClient.ts @@ -34,6 +34,10 @@ export type ConnectionOptions = { socketTimeout?: number; proxy?: ProxyOptions; enableMetricViewMetadata?: boolean; + // Optional telemetry overrides + telemetryEnabled?: boolean; + telemetryBatchSize?: number; + telemetryAuthenticatedExport?: boolean; } & AuthOptions; export interface OpenSessionRequest { diff --git a/lib/result/CloudFetchResultHandler.ts b/lib/result/CloudFetchResultHandler.ts index 91878813..7fe4dd0d 100644 --- a/lib/result/CloudFetchResultHandler.ts +++ b/lib/result/CloudFetchResultHandler.ts @@ -14,18 +14,24 @@ export default class CloudFetchResultHandler implements IResultsProvider = []; private downloadTasks: Array> = []; + private chunkIndex: number = 0; + constructor( context: IClientContext, source: IResultsProvider, - { lz4Compressed }: TGetResultSetMetadataResp, + metadata: TGetResultSetMetadataResp, + statementId?: string, ) { this.context = context; this.source = source; - this.isLZ4Compressed = lz4Compressed ?? false; + this.isLZ4Compressed = metadata.lz4Compressed ?? false; + this.statementId = statementId; if (this.isLZ4Compressed && !LZ4()) { throw new HiveDriverError('Cannot handle LZ4 compressed result: module `lz4` not installed'); @@ -106,6 +112,10 @@ export default class CloudFetchResultHandler implements IResultsProvider { + describe('Initialization', () => { + it('should initialize telemetry when telemetryEnabled is true', async function () { + this.timeout(30000); + + const client = new DBSQLClient(); + + // Spy on initialization components + const featureFlagCacheSpy = sinon.spy(FeatureFlagCache.prototype, 'getOrCreateContext'); + const telemetryProviderSpy = sinon.spy(TelemetryClientProvider.prototype, 'getOrCreateClient'); + + try { + await client.connect({ + host: config.host, + path: config.path, + token: config.token, + telemetryEnabled: true, + }); + + // Verify telemetry components were initialized + expect(featureFlagCacheSpy.called).to.be.true; + + await client.close(); + } finally { + featureFlagCacheSpy.restore(); + telemetryProviderSpy.restore(); + } + }); + + it('should not initialize telemetry when telemetryEnabled is false', async function () { + this.timeout(30000); + + const client = new DBSQLClient(); + + const featureFlagCacheSpy = sinon.spy(FeatureFlagCache.prototype, 'getOrCreateContext'); + + try { + await client.connect({ + host: config.host, + path: config.path, + token: config.token, + telemetryEnabled: false, + }); + + // Verify telemetry was not initialized + expect(featureFlagCacheSpy.called).to.be.false; + + await client.close(); + } finally { + featureFlagCacheSpy.restore(); + } + }); + + it('should respect feature flag when telemetry is enabled', async function () { + this.timeout(30000); + + const client = new DBSQLClient(); + + // Stub feature flag to return false + const featureFlagStub = sinon.stub(FeatureFlagCache.prototype, 'isTelemetryEnabled').resolves(false); + + try { + await client.connect({ + host: config.host, + path: config.path, + token: config.token, + telemetryEnabled: true, + }); + + // Verify feature flag was checked + expect(featureFlagStub.called).to.be.true; + + await client.close(); + } finally { + featureFlagStub.restore(); + } + }); + }); + + describe('Reference Counting', () => { + it('should share telemetry client across multiple connections to same host', async function () { + this.timeout(60000); + + const client1 = new DBSQLClient(); + const client2 = new DBSQLClient(); + + const getOrCreateClientSpy = sinon.spy(TelemetryClientProvider.prototype, 'getOrCreateClient'); + const releaseClientSpy = sinon.spy(TelemetryClientProvider.prototype, 'releaseClient'); + + try { + // Enable telemetry for both clients + await client1.connect({ + host: config.host, + path: config.path, + token: config.token, + telemetryEnabled: true, + }); + + await client2.connect({ + host: config.host, + path: config.path, + token: config.token, + telemetryEnabled: true, + }); + + // Both clients should get the same telemetry client for the host + expect(getOrCreateClientSpy.callCount).to.be.at.least(2); + + // Close first client + await client1.close(); + expect(releaseClientSpy.callCount).to.be.at.least(1); + + // Close second client + await client2.close(); + expect(releaseClientSpy.callCount).to.be.at.least(2); + } finally { + getOrCreateClientSpy.restore(); + releaseClientSpy.restore(); + } + }); + + it('should cleanup telemetry on close', async function () { + this.timeout(30000); + + const client = new DBSQLClient(); + + const releaseClientSpy = sinon.spy(TelemetryClientProvider.prototype, 'releaseClient'); + const releaseContextSpy = sinon.spy(FeatureFlagCache.prototype, 'releaseContext'); + const flushSpy = sinon.spy(MetricsAggregator.prototype, 'flush'); + + try { + await client.connect({ + host: config.host, + path: config.path, + token: config.token, + telemetryEnabled: true, + }); + + await client.close(); + + // Verify cleanup was called + expect(releaseClientSpy.called || flushSpy.called || releaseContextSpy.called).to.be.true; + } finally { + releaseClientSpy.restore(); + releaseContextSpy.restore(); + flushSpy.restore(); + } + }); + }); + + describe('Error Handling', () => { + it('should continue driver operation when telemetry initialization fails', async function () { + this.timeout(30000); + + const client = new DBSQLClient(); + + // Stub feature flag to throw an error + const featureFlagStub = sinon.stub(FeatureFlagCache.prototype, 'isTelemetryEnabled').rejects(new Error('Feature flag fetch failed')); + + try { + // Connection should succeed even if telemetry fails + await client.connect({ + host: config.host, + path: config.path, + token: config.token, + telemetryEnabled: true, + }); + + // Should be able to open a session + const session = await client.openSession({ + initialCatalog: config.catalog, + initialSchema: config.schema, + }); + + // Should be able to execute a query + const operation = await session.executeStatement('SELECT 1 AS test'); + const result = await operation.fetchAll(); + + expect(result).to.have.lengthOf(1); + expect(result[0]).to.deep.equal({ test: 1 }); + + await session.close(); + await client.close(); + } finally { + featureFlagStub.restore(); + } + }); + + it('should continue driver operation when feature flag fetch fails', async function () { + this.timeout(30000); + + const client = new DBSQLClient(); + + // Stub getOrCreateContext to throw + const contextStub = sinon.stub(FeatureFlagCache.prototype, 'getOrCreateContext').throws(new Error('Context creation failed')); + + try { + // Connection should succeed even if telemetry fails + await client.connect({ + host: config.host, + path: config.path, + token: config.token, + telemetryEnabled: true, + }); + + // Should be able to open a session + const session = await client.openSession({ + initialCatalog: config.catalog, + initialSchema: config.schema, + }); + + await session.close(); + await client.close(); + } finally { + contextStub.restore(); + } + }); + + it('should not throw exceptions due to telemetry errors', async function () { + this.timeout(30000); + + const client = new DBSQLClient(); + + // Stub multiple telemetry methods to throw + const emitterStub = sinon.stub(TelemetryEventEmitter.prototype, 'emitConnectionOpen').throws(new Error('Emitter failed')); + const aggregatorStub = sinon.stub(MetricsAggregator.prototype, 'processEvent').throws(new Error('Aggregator failed')); + + try { + // Connection should not throw + await client.connect({ + host: config.host, + path: config.path, + token: config.token, + telemetryEnabled: true, + }); + + // Driver operations should work normally + const session = await client.openSession({ + initialCatalog: config.catalog, + initialSchema: config.schema, + }); + + await session.close(); + await client.close(); + } finally { + emitterStub.restore(); + aggregatorStub.restore(); + } + }); + }); + + describe('Configuration', () => { + it('should read telemetry config from ClientConfig', async function () { + this.timeout(30000); + + const client = new DBSQLClient(); + const clientConfig = client.getConfig(); + + // Verify default telemetry config exists + expect(clientConfig).to.have.property('telemetryEnabled'); + expect(clientConfig).to.have.property('telemetryBatchSize'); + expect(clientConfig).to.have.property('telemetryFlushIntervalMs'); + expect(clientConfig).to.have.property('telemetryMaxRetries'); + expect(clientConfig).to.have.property('telemetryAuthenticatedExport'); + expect(clientConfig).to.have.property('telemetryCircuitBreakerThreshold'); + expect(clientConfig).to.have.property('telemetryCircuitBreakerTimeout'); + + // Verify default values + expect(clientConfig.telemetryEnabled).to.equal(false); // Initially disabled + expect(clientConfig.telemetryBatchSize).to.equal(100); + expect(clientConfig.telemetryFlushIntervalMs).to.equal(5000); + expect(clientConfig.telemetryMaxRetries).to.equal(3); + expect(clientConfig.telemetryAuthenticatedExport).to.equal(true); + expect(clientConfig.telemetryCircuitBreakerThreshold).to.equal(5); + expect(clientConfig.telemetryCircuitBreakerTimeout).to.equal(60000); + }); + + it('should allow override via ConnectionOptions', async function () { + this.timeout(30000); + + const client = new DBSQLClient(); + + // Default should be false + expect(client.getConfig().telemetryEnabled).to.equal(false); + + try { + // Override to true + await client.connect({ + host: config.host, + path: config.path, + token: config.token, + telemetryEnabled: true, + }); + + // Config should be updated + expect(client.getConfig().telemetryEnabled).to.equal(true); + + await client.close(); + } catch (error) { + // Clean up even if test fails + await client.close(); + throw error; + } + }); + }); + + describe('End-to-End Telemetry Flow', () => { + it('should emit events during driver operations when telemetry is enabled', async function () { + this.timeout(30000); + + const client = new DBSQLClient(); + + const emitSpy = sinon.spy(TelemetryEventEmitter.prototype, 'emit'); + + try { + await client.connect({ + host: config.host, + path: config.path, + token: config.token, + telemetryEnabled: true, + }); + + const session = await client.openSession({ + initialCatalog: config.catalog, + initialSchema: config.schema, + }); + + const operation = await session.executeStatement('SELECT 1 AS test'); + await operation.fetchAll(); + + // Events may or may not be emitted depending on feature flag + // But the driver should work regardless + + await session.close(); + await client.close(); + } finally { + emitSpy.restore(); + } + }); + }); +});