diff --git a/node/coinstacks/arbitrum-nova/api/package.json b/node/coinstacks/arbitrum-nova/api/package.json index 0a40631b2..b6092dcc5 100644 --- a/node/coinstacks/arbitrum-nova/api/package.json +++ b/node/coinstacks/arbitrum-nova/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/arbitrum-nova/api/src/app.ts b/node/coinstacks/arbitrum-nova/api/src/app.ts index 07b238073..f30ece82f 100644 --- a/node/coinstacks/arbitrum-nova/api/src/app.ts +++ b/node/coinstacks/arbitrum-nova/api/src/app.ts @@ -9,10 +9,10 @@ import { Registry, BlockHandler, TransactionHandler, - Prometheus, } from '@shapeshiftoss/common-api' import { Tx as BlockbookTx, WebsocketClient, getAddresses, NewBlock } from '@shapeshiftoss/blockbook' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { gasOracle, service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/arbitrum/api/package.json b/node/coinstacks/arbitrum/api/package.json index 8302673cb..ad205af5a 100644 --- a/node/coinstacks/arbitrum/api/package.json +++ b/node/coinstacks/arbitrum/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/arbitrum/api/src/app.ts b/node/coinstacks/arbitrum/api/src/app.ts index 4f0030da3..edbc23934 100644 --- a/node/coinstacks/arbitrum/api/src/app.ts +++ b/node/coinstacks/arbitrum/api/src/app.ts @@ -2,8 +2,9 @@ import express from 'express' import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' -import { evm, middleware, ConnectionHandler, Registry, TransactionHandler, Prometheus } from '@shapeshiftoss/common-api' +import { evm, middleware, ConnectionHandler, Registry, TransactionHandler } from '@shapeshiftoss/common-api' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { cache, service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/avalanche/api/package.json b/node/coinstacks/avalanche/api/package.json index ba6db8a3b..82c181b92 100644 --- a/node/coinstacks/avalanche/api/package.json +++ b/node/coinstacks/avalanche/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/avalanche/api/src/app.ts b/node/coinstacks/avalanche/api/src/app.ts index 80f82a79e..0aec94ba5 100644 --- a/node/coinstacks/avalanche/api/src/app.ts +++ b/node/coinstacks/avalanche/api/src/app.ts @@ -2,8 +2,9 @@ import express from 'express' import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' -import { evm, middleware, ConnectionHandler, Registry, TransactionHandler, Prometheus } from '@shapeshiftoss/common-api' +import { evm, middleware, ConnectionHandler, Registry, TransactionHandler } from '@shapeshiftoss/common-api' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/base/api/package.json b/node/coinstacks/base/api/package.json index b25c948ac..46feccf77 100644 --- a/node/coinstacks/base/api/package.json +++ b/node/coinstacks/base/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/base/api/src/app.ts b/node/coinstacks/base/api/src/app.ts index b715bf96b..183130552 100644 --- a/node/coinstacks/base/api/src/app.ts +++ b/node/coinstacks/base/api/src/app.ts @@ -9,10 +9,10 @@ import { Registry, AddressFormatter, TransactionHandler, - Prometheus, } from '@shapeshiftoss/common-api' import { Tx as BlockbookTx, WebsocketClient, getAddresses } from '@shapeshiftoss/blockbook' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { BlockbookService } from '../../../common/api/src/evm/blockbookService' import { MoralisService } from '../../../common/api/src/evm/moralisService' import { gasOracle, service } from './controller' diff --git a/node/coinstacks/bitcoin/api/package.json b/node/coinstacks/bitcoin/api/package.json index 1e7bbe8cc..9be13f24c 100644 --- a/node/coinstacks/bitcoin/api/package.json +++ b/node/coinstacks/bitcoin/api/package.json @@ -14,6 +14,7 @@ "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0", "bech32": "^2.0.0" } } diff --git a/node/coinstacks/bitcoin/api/src/app.ts b/node/coinstacks/bitcoin/api/src/app.ts index 3a3fa086d..a20c48082 100644 --- a/node/coinstacks/bitcoin/api/src/app.ts +++ b/node/coinstacks/bitcoin/api/src/app.ts @@ -3,16 +3,10 @@ import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' import { Logger } from '@shapeshiftoss/logger' -import { - middleware, - ConnectionHandler, - Registry, - BlockHandler, - TransactionHandler, - Prometheus, -} from '@shapeshiftoss/common-api' +import { middleware, ConnectionHandler, Registry, BlockHandler, TransactionHandler } from '@shapeshiftoss/common-api' import { getAddresses, NewBlock, Tx as BlockbookTx, WebsocketClient } from '@shapeshiftoss/blockbook' import { utxo } from '@shapeshiftoss/common-api' +import { Prometheus } from '@shapeshiftoss/prometheus' import { service, formatAddress } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/bitcoincash/api/package.json b/node/coinstacks/bitcoincash/api/package.json index a42d36043..f6583ed8a 100644 --- a/node/coinstacks/bitcoincash/api/package.json +++ b/node/coinstacks/bitcoincash/api/package.json @@ -14,6 +14,7 @@ "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0", "bech32": "^2.0.0" } } diff --git a/node/coinstacks/bitcoincash/api/src/app.ts b/node/coinstacks/bitcoincash/api/src/app.ts index fa487436d..f20efbb03 100644 --- a/node/coinstacks/bitcoincash/api/src/app.ts +++ b/node/coinstacks/bitcoincash/api/src/app.ts @@ -3,16 +3,10 @@ import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' import { Logger } from '@shapeshiftoss/logger' -import { - middleware, - ConnectionHandler, - Registry, - BlockHandler, - TransactionHandler, - Prometheus, -} from '@shapeshiftoss/common-api' +import { middleware, ConnectionHandler, Registry, BlockHandler, TransactionHandler } from '@shapeshiftoss/common-api' import { getAddresses, NewBlock, Tx as BlockbookTx, WebsocketClient } from '@shapeshiftoss/blockbook' import { utxo } from '@shapeshiftoss/common-api' +import { Prometheus } from '@shapeshiftoss/prometheus' import { service, formatAddress } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/bnbsmartchain/api/package.json b/node/coinstacks/bnbsmartchain/api/package.json index 29d704bb4..68d948c0b 100644 --- a/node/coinstacks/bnbsmartchain/api/package.json +++ b/node/coinstacks/bnbsmartchain/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/bnbsmartchain/api/src/app.ts b/node/coinstacks/bnbsmartchain/api/src/app.ts index 84fccba1e..6cda14519 100644 --- a/node/coinstacks/bnbsmartchain/api/src/app.ts +++ b/node/coinstacks/bnbsmartchain/api/src/app.ts @@ -2,8 +2,9 @@ import express from 'express' import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' -import { evm, middleware, ConnectionHandler, Registry, TransactionHandler, Prometheus } from '@shapeshiftoss/common-api' +import { evm, middleware, ConnectionHandler, Registry, TransactionHandler } from '@shapeshiftoss/common-api' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/common/api/package.json b/node/coinstacks/common/api/package.json index 42391eb9c..16fae83e5 100644 --- a/node/coinstacks/common/api/package.json +++ b/node/coinstacks/common/api/package.json @@ -15,6 +15,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0", "@shapeshiftoss/websocket": "^10.0.0", "bignumber.js": "^9.1.2", "uuid": "^9.0.1", diff --git a/node/coinstacks/common/api/src/evm/moralisService.ts b/node/coinstacks/common/api/src/evm/moralisService.ts index 64e299bd2..94502a631 100644 --- a/node/coinstacks/common/api/src/evm/moralisService.ts +++ b/node/coinstacks/common/api/src/evm/moralisService.ts @@ -1,12 +1,13 @@ import { EvmChain } from '@moralisweb3/common-evm-utils' import { EvmStreamResult } from '@moralisweb3/common-streams-utils' import { Logger } from '@shapeshiftoss/logger' +import { AddressSubscriptionClient } from '@shapeshiftoss/websocket' import axios from 'axios' import BigNumber from 'bignumber.js' import Moralis from 'moralis' import PQueue from 'p-queue' import { getAddress, isHex, parseUnits, PublicClient, toHex } from 'viem' -import type { BaseAPI, EstimateGasBody, RPCRequest, RPCResponse, SendTxBody, SubscriptionClient } from '..' +import type { BaseAPI, EstimateGasBody, RPCRequest, RPCResponse, SendTxBody } from '..' import { ApiError, BadRequestError } from '..' import { createAxiosRetry, exponentialDelay, handleError, rpcId, validatePageSize } from '../utils' import type { Account, API, Tx, TxHistory, GasFees, InternalTx, GasEstimate, TokenMetadata } from './models' @@ -42,7 +43,7 @@ export interface MoralisServiceArgs { minPriorityFee?: string } -export class MoralisService implements Omit, API, SubscriptionClient { +export class MoralisService implements Omit, API, AddressSubscriptionClient { private readonly chain: EvmChain private readonly logger: Logger private readonly client: PublicClient diff --git a/node/coinstacks/common/api/src/index.ts b/node/coinstacks/common/api/src/index.ts index 1f0816534..5a44186df 100644 --- a/node/coinstacks/common/api/src/index.ts +++ b/node/coinstacks/common/api/src/index.ts @@ -4,7 +4,6 @@ export * from './models' export * as middleware from './middleware' export * from './websocket' export * from './registry' -export * from './prometheus' export * from './utils' export * as evm from './evm' diff --git a/node/coinstacks/common/api/src/middleware.ts b/node/coinstacks/common/api/src/middleware.ts index d412cdb23..f0f8962d0 100644 --- a/node/coinstacks/common/api/src/middleware.ts +++ b/node/coinstacks/common/api/src/middleware.ts @@ -1,10 +1,10 @@ +import { Prometheus } from '@shapeshiftoss/prometheus' import { json, urlencoded, NextFunction, Request, Response } from 'express' import compression from 'compression' import morgan from 'morgan' import cors from 'cors' import { ValidateError } from 'tsoa' import { ApiError, NotFoundError } from '.' -import { Prometheus } from './prometheus' export function errorHandler(err: Error, req: Request, res: Response, next: NextFunction): Response | void { if (err instanceof ValidateError) { diff --git a/node/coinstacks/common/api/src/registry.ts b/node/coinstacks/common/api/src/registry.ts index c7536d428..4ca0f0fc2 100644 --- a/node/coinstacks/common/api/src/registry.ts +++ b/node/coinstacks/common/api/src/registry.ts @@ -173,7 +173,7 @@ export class Registry { for (const [id, connection] of this.addresses[address].entries()) { const { subscriptionId } = Registry.fromId(id) - connection.publish(subscriptionId, address, tx) + connection.publish(subscriptionId, { address, data: tx }) } }) } diff --git a/node/coinstacks/common/api/src/websocket.ts b/node/coinstacks/common/api/src/websocket.ts index 400359ec6..e1c0f41cd 100644 --- a/node/coinstacks/common/api/src/websocket.ts +++ b/node/coinstacks/common/api/src/websocket.ts @@ -1,21 +1,9 @@ import { Logger } from '@shapeshiftoss/logger' -import { v4 } from 'uuid' +import { Prometheus } from '@shapeshiftoss/prometheus' +import { AddressSubscriptionClient, BaseConnectionHandler } from '@shapeshiftoss/websocket' import WebSocket from 'ws' -import { Prometheus } from './prometheus' import { Registry } from './registry' -export interface RequestPayload { - subscriptionId: string - method: 'subscribe' | 'unsubscribe' | 'ping' - data?: TxsTopicData -} - -export interface ErrorResponse { - subscriptionId: string - type: 'error' - message: string -} - export type Topics = 'txs' export interface TxsTopicData { @@ -23,17 +11,6 @@ export interface TxsTopicData { addresses: Array } -export interface MessageResponse { - address: string - data: unknown - subscriptionId: string -} - -export interface SubscriptionClient { - subscribeAddresses(currentAddresses: Array, addressesToAdd: Array): void - unsubscribeAddresses(currentAddresses: Array, addressesToRemove: Array): void -} - export interface Methods { // eslint-disable-next-line @typescript-eslint/no-explicit-any subscribe: (subscriptionId: string, data?: any) => void @@ -41,123 +18,78 @@ export interface Methods { unsubscribe: (subscriptionId: string, data?: any) => void } -export class ConnectionHandler { - public readonly clientId: string +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function isTxsTopicData(data: any): data is TxsTopicData { + return data && typeof data === 'object' && data.topic === 'txs' && Array.isArray(data.addresses) +} - private readonly websocket: WebSocket +export class ConnectionHandler extends BaseConnectionHandler { private readonly registry: Registry - private readonly client: SubscriptionClient - private readonly prometheus: Prometheus - private readonly logger: Logger + private readonly client: AddressSubscriptionClient private readonly routes: Record - private readonly pingIntervalMs = 10000 - - private pingTimeout?: NodeJS.Timeout - private subscriptionIds = new Set() private constructor( websocket: WebSocket, registry: Registry, - client: SubscriptionClient, + client: AddressSubscriptionClient, prometheus: Prometheus, logger: Logger ) { - this.clientId = v4() + super(websocket, prometheus, logger) + this.registry = registry this.client = client - this.prometheus = prometheus - this.logger = logger.child({ namespace: ['websocket'] }) this.routes = { txs: { subscribe: (subscriptionId: string, data?: TxsTopicData) => this.handleSubscribeTxs(subscriptionId, data), unsubscribe: (subscriptionId: string, data?: TxsTopicData) => this.handleUnsubscribeTxs(subscriptionId, data), }, } - - this.pingTimeout = undefined - this.prometheus.metrics.websocketCount.inc() - this.websocket = websocket - this.websocket.ping() - - const pingInterval = setInterval(() => { - this.websocket.ping() - }, this.pingIntervalMs) - - this.websocket.onerror = (error) => { - this.logger.error({ clientId: this.clientId, error, fn: 'ws.onerror' }, 'websocket error') - this.close(pingInterval) - } - this.websocket.onclose = ({ code, reason }) => { - this.prometheus.metrics.websocketCount.dec() - this.logger.debug({ clientId: this.clientId, code, reason, fn: 'ws.close' }, 'websocket closed') - this.close(pingInterval) - } - this.websocket.on('pong', () => this.heartbeat()) - this.websocket.on('ping', () => this.websocket.pong()) - this.websocket.onmessage = (event) => this.onMessage(event) } static start( websocket: WebSocket, registry: Registry, - client: SubscriptionClient, + client: AddressSubscriptionClient, prometheus: Prometheus, logger: Logger ): void { new ConnectionHandler(websocket, registry, client, prometheus, logger) } - private heartbeat(): void { - if (this.pingTimeout) { - clearTimeout(this.pingTimeout) + onSubscribe(subscriptionId: string, data?: unknown): void { + if (!isTxsTopicData(data)) { + this.sendError(`no topic specified for subscribe`, subscriptionId) + return } - this.pingTimeout = setTimeout(() => { - this.logger.debug({ clientId: this.clientId, fn: 'pingTimeout' }, 'heartbeat failed') - this.websocket.terminate() - }, this.pingIntervalMs + 1000) - } + const route = this.routes[data.topic] + + if (!route || typeof route.subscribe !== 'function') { + this.sendError(`subscribe method not implemented for topic: ${data.topic}`, subscriptionId) + return + } - private sendError(message: string, subscriptionId: string): void { - this.websocket.send(JSON.stringify({ subscriptionId, type: 'error', message } as ErrorResponse)) + route.subscribe(subscriptionId, data) } - private onMessage(event: WebSocket.MessageEvent): void { - try { - const payload: RequestPayload = JSON.parse(event.data.toString()) - - switch (payload.method) { - // browsers do not support ping/pong frame, handle message instead - case 'ping': { - this.websocket.send('pong') - break - } - case 'subscribe': - case 'unsubscribe': { - const topic = payload.data?.topic - - if (!topic) { - this.sendError(`no topic specified for method: ${payload.method}`, payload.subscriptionId) - break - } - - const callback = this.routes[topic][payload.method] - if (callback) { - callback(payload.subscriptionId, payload.data) - } else { - this.sendError(`${payload.method} method not implemented for topic: ${topic}`, payload.subscriptionId) - } - } - } - } catch (err) { - this.logger.error(err, { clientId: this.clientId, fn: 'onMessage', event }, 'Error processing message') + onUnsubscribe(subscriptionId: string, data?: unknown): void { + if (!isTxsTopicData(data)) { + this.sendError(`no topic specified for unsubscribe`, subscriptionId) + return } - } - private close(interval: NodeJS.Timeout): void { - this.pingTimeout && clearTimeout(this.pingTimeout) - clearInterval(interval) + const route = this.routes[data.topic] + if (!route || typeof route.unsubscribe !== 'function') { + this.sendError(`unsubscribe method not implemented for topic: ${data.topic}`, subscriptionId) + return + } + + route.unsubscribe(subscriptionId, data) + } + + onClose(): void { const unsubscribedAddresses: Array = [] for (const subscriptionId of this.subscriptionIds) { @@ -165,7 +97,6 @@ export class ConnectionHandler { unsubscribedAddresses.push(...addresses) } - this.subscriptionIds.clear() this.client.unsubscribeAddresses(this.registry.getAddresses(), unsubscribedAddresses) } @@ -205,9 +136,4 @@ export class ConnectionHandler { this.client.unsubscribeAddresses(this.registry.getAddresses(), unsubscribedAddresses) } - - publish(subscriptionId: string, address: string, data: unknown): void { - const message: MessageResponse = { address, data, subscriptionId } - this.websocket.send(JSON.stringify(message)) - } } diff --git a/node/coinstacks/dogecoin/api/package.json b/node/coinstacks/dogecoin/api/package.json index c2242102d..fb0783b04 100644 --- a/node/coinstacks/dogecoin/api/package.json +++ b/node/coinstacks/dogecoin/api/package.json @@ -14,6 +14,7 @@ "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0", "bech32": "^2.0.0" } } diff --git a/node/coinstacks/dogecoin/api/src/app.ts b/node/coinstacks/dogecoin/api/src/app.ts index 179fc63aa..546f22fe1 100644 --- a/node/coinstacks/dogecoin/api/src/app.ts +++ b/node/coinstacks/dogecoin/api/src/app.ts @@ -3,16 +3,10 @@ import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' import { Logger } from '@shapeshiftoss/logger' -import { - middleware, - ConnectionHandler, - Registry, - BlockHandler, - TransactionHandler, - Prometheus, -} from '@shapeshiftoss/common-api' +import { middleware, ConnectionHandler, Registry, BlockHandler, TransactionHandler } from '@shapeshiftoss/common-api' import { getAddresses, NewBlock, Tx as BlockbookTx, WebsocketClient } from '@shapeshiftoss/blockbook' import { utxo } from '@shapeshiftoss/common-api' +import { Prometheus } from '@shapeshiftoss/prometheus' import { formatAddress, service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/ethereum/api/package.json b/node/coinstacks/ethereum/api/package.json index 2597fa886..85b14825c 100644 --- a/node/coinstacks/ethereum/api/package.json +++ b/node/coinstacks/ethereum/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/ethereum/api/src/app.ts b/node/coinstacks/ethereum/api/src/app.ts index 7d6a6b1e2..edf2a0a78 100644 --- a/node/coinstacks/ethereum/api/src/app.ts +++ b/node/coinstacks/ethereum/api/src/app.ts @@ -2,8 +2,9 @@ import express from 'express' import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' -import { evm, middleware, ConnectionHandler, Registry, TransactionHandler, Prometheus } from '@shapeshiftoss/common-api' +import { evm, middleware, ConnectionHandler, Registry, TransactionHandler } from '@shapeshiftoss/common-api' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/gnosis/api/package.json b/node/coinstacks/gnosis/api/package.json index d982c424a..6bfab9981 100644 --- a/node/coinstacks/gnosis/api/package.json +++ b/node/coinstacks/gnosis/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/gnosis/api/src/app.ts b/node/coinstacks/gnosis/api/src/app.ts index 1ef2064d0..fcb54c1ea 100644 --- a/node/coinstacks/gnosis/api/src/app.ts +++ b/node/coinstacks/gnosis/api/src/app.ts @@ -2,8 +2,9 @@ import express from 'express' import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' -import { evm, middleware, ConnectionHandler, Registry, TransactionHandler, Prometheus } from '@shapeshiftoss/common-api' +import { evm, middleware, ConnectionHandler, Registry, TransactionHandler } from '@shapeshiftoss/common-api' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/litecoin/api/package.json b/node/coinstacks/litecoin/api/package.json index 081656e61..9bc41c93f 100644 --- a/node/coinstacks/litecoin/api/package.json +++ b/node/coinstacks/litecoin/api/package.json @@ -14,6 +14,7 @@ "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0", "bech32": "^2.0.0" } } diff --git a/node/coinstacks/litecoin/api/src/app.ts b/node/coinstacks/litecoin/api/src/app.ts index 9682e5d92..3dc2fcf3f 100644 --- a/node/coinstacks/litecoin/api/src/app.ts +++ b/node/coinstacks/litecoin/api/src/app.ts @@ -3,16 +3,10 @@ import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' import { Logger } from '@shapeshiftoss/logger' -import { - middleware, - ConnectionHandler, - Registry, - BlockHandler, - TransactionHandler, - Prometheus, -} from '@shapeshiftoss/common-api' +import { middleware, ConnectionHandler, Registry, BlockHandler, TransactionHandler } from '@shapeshiftoss/common-api' import { getAddresses, NewBlock, Tx as BlockbookTx, WebsocketClient } from '@shapeshiftoss/blockbook' import { utxo } from '@shapeshiftoss/common-api' +import { Prometheus } from '@shapeshiftoss/prometheus' import { service, formatAddress } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/optimism/api/package.json b/node/coinstacks/optimism/api/package.json index 33ecc2522..4f231cfac 100644 --- a/node/coinstacks/optimism/api/package.json +++ b/node/coinstacks/optimism/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/optimism/api/src/app.ts b/node/coinstacks/optimism/api/src/app.ts index a7d2129b2..e88114cf8 100644 --- a/node/coinstacks/optimism/api/src/app.ts +++ b/node/coinstacks/optimism/api/src/app.ts @@ -2,8 +2,9 @@ import express from 'express' import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' -import { evm, middleware, ConnectionHandler, Registry, TransactionHandler, Prometheus } from '@shapeshiftoss/common-api' +import { evm, middleware, ConnectionHandler, Registry, TransactionHandler } from '@shapeshiftoss/common-api' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/polygon/api/package.json b/node/coinstacks/polygon/api/package.json index af3a7b8b0..bd7ecf465 100644 --- a/node/coinstacks/polygon/api/package.json +++ b/node/coinstacks/polygon/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", - "@shapeshiftoss/common-api": "^10.0.0" + "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0" } } diff --git a/node/coinstacks/polygon/api/src/app.ts b/node/coinstacks/polygon/api/src/app.ts index b39940750..582340bd8 100644 --- a/node/coinstacks/polygon/api/src/app.ts +++ b/node/coinstacks/polygon/api/src/app.ts @@ -2,8 +2,9 @@ import express from 'express' import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' -import { evm, middleware, ConnectionHandler, Registry, TransactionHandler, Prometheus } from '@shapeshiftoss/common-api' +import { evm, middleware, ConnectionHandler, Registry, TransactionHandler } from '@shapeshiftoss/common-api' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { service } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/coinstacks/solana/api/package.json b/node/coinstacks/solana/api/package.json index 7994e6c83..3d292274f 100644 --- a/node/coinstacks/solana/api/package.json +++ b/node/coinstacks/solana/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0", "@shapeshiftoss/websocket": "^10.0.0", "@solana/web3.js": "^1.95.3", "helius-sdk": "^1.4.0" diff --git a/node/coinstacks/solana/api/src/app.ts b/node/coinstacks/solana/api/src/app.ts index 1f3c7ba7a..d9c3f2cab 100644 --- a/node/coinstacks/solana/api/src/app.ts +++ b/node/coinstacks/solana/api/src/app.ts @@ -1,5 +1,6 @@ -import { ConnectionHandler, middleware, Prometheus, Registry, TransactionHandler } from '@shapeshiftoss/common-api' +import { ConnectionHandler, middleware, Registry, TransactionHandler } from '@shapeshiftoss/common-api' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' import { Logs } from '@solana/web3.js' import express from 'express' import { join } from 'path' diff --git a/node/coinstacks/solana/api/src/websocket.ts b/node/coinstacks/solana/api/src/websocket.ts index 0ac947c14..98162c30e 100644 --- a/node/coinstacks/solana/api/src/websocket.ts +++ b/node/coinstacks/solana/api/src/websocket.ts @@ -1,5 +1,5 @@ import { Logger } from '@shapeshiftoss/logger' -import { WebsocketClient as BaseWebsocketClient, Args, Options, Subscription } from '@shapeshiftoss/websocket' +import { Args, Options, Subscription, AddressSubscriptionWebsocketClient } from '@shapeshiftoss/websocket' import WebSocket from 'ws' import { isWebsocketResponse, @@ -17,7 +17,7 @@ interface WebsocketArgs extends Omit { type TransactionHandler = (data: Logs) => Promise -export class WebsocketClient extends BaseWebsocketClient { +export class WebsocketClient extends AddressSubscriptionWebsocketClient { private handleTransaction: TransactionHandler | Array private addresses: Array = [] private subscriptionIds: Array = [] @@ -29,7 +29,7 @@ export class WebsocketClient extends BaseWebsocketClient { this.handleTransaction = args.transactionHandler - this.initialize() + super.connect() } protected onOpen(): void { @@ -95,7 +95,7 @@ export class WebsocketClient extends BaseWebsocketClient { } try { - this.socket.send(JSON.stringify(subscription)) + this.socket?.send(JSON.stringify(subscription)) } catch (err) { this.logger.debug(err, `failed to subscribe address: ${JSON.stringify(subscription)}`) } @@ -107,7 +107,7 @@ export class WebsocketClient extends BaseWebsocketClient { } private unsubscribe(subscriptionId: number): void { - this.socket.send( + this.socket?.send( JSON.stringify({ jsonrpc: '2.0', id: 'unsubscribe', diff --git a/node/coinstacks/zcash/api/package.json b/node/coinstacks/zcash/api/package.json index 6e7a7e28b..79e9f7980 100644 --- a/node/coinstacks/zcash/api/package.json +++ b/node/coinstacks/zcash/api/package.json @@ -14,6 +14,7 @@ "dependencies": { "@shapeshiftoss/blockbook": "^10.0.0", "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0", "bech32": "^2.0.0" } } diff --git a/node/coinstacks/zcash/api/src/app.ts b/node/coinstacks/zcash/api/src/app.ts index 107d6558b..3fe79107f 100644 --- a/node/coinstacks/zcash/api/src/app.ts +++ b/node/coinstacks/zcash/api/src/app.ts @@ -3,16 +3,10 @@ import { join } from 'path' import { Server } from 'ws' import swaggerUi from 'swagger-ui-express' import { Logger } from '@shapeshiftoss/logger' -import { - middleware, - ConnectionHandler, - Registry, - BlockHandler, - TransactionHandler, - Prometheus, -} from '@shapeshiftoss/common-api' +import { middleware, ConnectionHandler, Registry, BlockHandler, TransactionHandler } from '@shapeshiftoss/common-api' import { getAddresses, NewBlock, Tx as BlockbookTx, WebsocketClient } from '@shapeshiftoss/blockbook' import { utxo } from '@shapeshiftoss/common-api' +import { Prometheus } from '@shapeshiftoss/prometheus' import { service, formatAddress } from './controller' import { RegisterRoutes } from './routes' diff --git a/node/packages/blockbook/src/websocket.ts b/node/packages/blockbook/src/websocket.ts index 149495607..9aa98414d 100644 --- a/node/packages/blockbook/src/websocket.ts +++ b/node/packages/blockbook/src/websocket.ts @@ -1,5 +1,5 @@ import { Logger } from '@shapeshiftoss/logger' -import { WebsocketClient as BaseWebsocketClient, Args, Options, Subscription } from '@shapeshiftoss/websocket' +import { Args, Options, Subscription, AddressSubscriptionWebsocketClient } from '@shapeshiftoss/websocket' import WebSocket from 'ws' import { Tx } from './models' import { NewBlock, WebsocketRepsonse } from '.' @@ -14,7 +14,7 @@ interface WebsocketArgs extends Omit { blockHandler: BlockHandler | Array } -export class WebsocketClient extends BaseWebsocketClient { +export class WebsocketClient extends AddressSubscriptionWebsocketClient { private handleTransaction: TransactionHandler | Array private handleBlock: BlockHandler | Array @@ -27,16 +27,16 @@ export class WebsocketClient extends BaseWebsocketClient { this.handleTransaction = args.transactionHandler this.handleBlock = args.blockHandler - super.initialize() + super.connect() } protected onOpen(): void { const subscribeNewBlock: Subscription = { jsonrpc: '2.0', id: 'newBlock', method: 'subscribeNewBlock', params: {} } - this.socket.send(JSON.stringify(subscribeNewBlock)) + this.socket?.send(JSON.stringify(subscribeNewBlock)) if (this.addresses.length) { const subscribeAddresses = this.getAddressesSubscription() - this.socket.send(JSON.stringify(subscribeAddresses)) + this.socket?.send(JSON.stringify(subscribeAddresses)) } } @@ -86,7 +86,7 @@ export class WebsocketClient extends BaseWebsocketClient { const subscribeAddresses = this.getAddressesSubscription() try { - this.socket.send(JSON.stringify(subscribeAddresses)) + this.socket?.send(JSON.stringify(subscribeAddresses)) } catch (err) { this.logger.debug(err, `failed to subscribe addresses: ${JSON.stringify(subscribeAddresses)}`) } diff --git a/node/packages/prometheus/package.json b/node/packages/prometheus/package.json new file mode 100644 index 000000000..5a9d4d107 --- /dev/null +++ b/node/packages/prometheus/package.json @@ -0,0 +1,20 @@ +{ + "name": "@shapeshiftoss/prometheus", + "version": "10.0.0", + "license": "MIT", + "main": "dist/index.js", + "source": "src/index.ts", + "types": "dist/index.d.ts", + "files": [ + "dist" + ], + "scripts": { + "build": "tsc", + "clean": "rm -rf dist node_modules", + "dev": "nodemon -e ts --watch src -x yarn build", + "watch": "nodemon -e ts --watch src -x yarn build" + }, + "dependencies": { + "prom-client": "^15.0.0" + } +} diff --git a/node/packages/prometheus/src/index.ts b/node/packages/prometheus/src/index.ts new file mode 100644 index 000000000..d0fa32f11 --- /dev/null +++ b/node/packages/prometheus/src/index.ts @@ -0,0 +1 @@ +export * from './prometheus' diff --git a/node/coinstacks/common/api/src/prometheus.ts b/node/packages/prometheus/src/prometheus.ts similarity index 100% rename from node/coinstacks/common/api/src/prometheus.ts rename to node/packages/prometheus/src/prometheus.ts diff --git a/node/packages/prometheus/tsconfig.json b/node/packages/prometheus/tsconfig.json new file mode 100644 index 000000000..f0beb6444 --- /dev/null +++ b/node/packages/prometheus/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../../tsconfig.json", + "compilerOptions": { + "outDir": "dist", + "incremental": true + }, + "include": ["src"], + "exclude": ["node_modules", "**/__tests__", "**/__mocks__"] +} diff --git a/node/packages/websocket/package.json b/node/packages/websocket/package.json index 096ba53ee..588c764a9 100644 --- a/node/packages/websocket/package.json +++ b/node/packages/websocket/package.json @@ -15,6 +15,7 @@ "watch": "yarn run -T nodemon -e ts --watch src -x yarn build" }, "dependencies": { + "@shapeshiftoss/prometheus": "^10.0.0", "ws": "^8.15.0" }, "devDependencies": { diff --git a/node/packages/websocket/src/connectionHandler.ts b/node/packages/websocket/src/connectionHandler.ts new file mode 100644 index 000000000..a8eb656d6 --- /dev/null +++ b/node/packages/websocket/src/connectionHandler.ts @@ -0,0 +1,113 @@ +import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' +import { v4 } from 'uuid' +import WebSocket from 'ws' + +export interface RequestPayload { + subscriptionId: string + method: 'subscribe' | 'unsubscribe' | 'ping' + data?: unknown +} + +export interface ErrorResponse { + subscriptionId: string + type: 'error' + message: string +} + +export interface MessageResponse extends Record { + data: unknown +} + +export abstract class BaseConnectionHandler { + public readonly clientId: string + + protected readonly websocket: WebSocket + private readonly prometheus?: Prometheus + private readonly logger: Logger + private readonly pingIntervalMs = 10000 + + private pingTimeout?: NodeJS.Timeout + protected subscriptionIds = new Set() + + abstract onSubscribe(subscriptionId: string, data?: unknown): void + abstract onUnsubscribe(subscriptionId: string): void + abstract onClose(): void + + constructor(websocket: WebSocket, prometheus: Prometheus, logger: Logger) { + this.clientId = v4() + this.prometheus = prometheus + this.logger = logger.child({ namespace: ['websocket'] }) + + this.pingTimeout = undefined + this.prometheus?.metrics.websocketCount.inc() + this.websocket = websocket + this.websocket.ping() + this.heartbeat() + + const pingInterval = setInterval(() => { + this.websocket.ping() + }, this.pingIntervalMs) + + this.websocket.onerror = (error) => { + this.logger.error({ clientId: this.clientId, error, fn: 'ws.onerror' }, 'websocket error') + this.close(pingInterval) + } + this.websocket.onclose = ({ code, reason }) => { + this.prometheus?.metrics.websocketCount.dec() + this.logger.debug({ clientId: this.clientId, code, reason, fn: 'ws.close' }, 'websocket closed') + this.close(pingInterval) + } + this.websocket.on('pong', () => this.heartbeat()) + this.websocket.on('ping', () => this.websocket.pong()) + this.websocket.onmessage = (event) => this.onMessage(event) + } + + private heartbeat(): void { + if (this.pingTimeout) { + clearTimeout(this.pingTimeout) + } + + this.pingTimeout = setTimeout(() => { + this.logger.debug({ clientId: this.clientId, fn: 'pingTimeout' }, 'heartbeat failed') + this.websocket.terminate() + }, this.pingIntervalMs + 1000) + } + + protected sendError(message: string, subscriptionId: string): void { + this.websocket.send(JSON.stringify({ subscriptionId, type: 'error', message } as ErrorResponse)) + } + + private onMessage(event: WebSocket.MessageEvent): void { + try { + const payload: RequestPayload = JSON.parse(event.data.toString()) + + switch (payload.method) { + // browsers do not support ping/pong frame, handle message instead + case 'ping': { + this.websocket.send('pong') + return + } + case 'subscribe': + this.onSubscribe(payload.subscriptionId, payload.data) + return + case 'unsubscribe': + this.onUnsubscribe(payload.subscriptionId) + return + } + } catch (err) { + this.logger.error(err, { clientId: this.clientId, fn: 'onMessage', event }, 'Error processing message') + } + } + + private close(interval: NodeJS.Timeout): void { + this.pingTimeout && clearTimeout(this.pingTimeout) + clearInterval(interval) + this.onClose() + this.subscriptionIds.clear() + } + + publish(subscriptionId: string, payload: { data: unknown } & Record): void { + this.websocket.send(JSON.stringify({ ...payload, subscriptionId })) + } +} diff --git a/node/packages/websocket/src/index.ts b/node/packages/websocket/src/index.ts index b2196badb..f1d958ca5 100644 --- a/node/packages/websocket/src/index.ts +++ b/node/packages/websocket/src/index.ts @@ -1 +1,2 @@ +export * from './connectionHandler' export * from './websocket' diff --git a/node/packages/websocket/src/websocket.ts b/node/packages/websocket/src/websocket.ts index 61a3884a4..d5008e9e5 100644 --- a/node/packages/websocket/src/websocket.ts +++ b/node/packages/websocket/src/websocket.ts @@ -24,8 +24,8 @@ export interface Options { resetInterval?: number } -export abstract class WebsocketClient { - protected socket: WebSocket +export abstract class BaseWebsocketClient { + protected socket?: WebSocket protected url: string protected pingTimeout?: NodeJS.Timeout protected interval?: NodeJS.Timeout @@ -40,11 +40,8 @@ export abstract class WebsocketClient { protected abstract onOpen(): void protected abstract onMessage(message: WebSocket.MessageEvent): Promise - abstract subscribeAddresses(addresses: string[]): void - constructor(url: string, args: Args, opts?: Options) { this.url = url - this.socket = new WebSocket(this.url, { handshakeTimeout: 5000 }) this.logger = args.logger this.pingInterval = opts?.pingInterval ?? 10000 @@ -52,48 +49,50 @@ export abstract class WebsocketClient { this.resetInterval = opts?.resetInterval ?? RESET_INTERVAL } - protected initialize(): void { - this.socket.on('ping', () => this.socket.pong()) + protected connect(): void { + if (this.socket?.readyState === WebSocket.OPEN) return + + this.socket = new WebSocket(this.url, { handshakeTimeout: 5000 }) + + this.socket.on('ping', () => this.socket?.pong()) this.socket.on('pong', () => this.heartbeat()) this.socket.onerror = (error) => { this.logger.error({ error, fn: 'ws.onerror' }, 'websocket error') } this.socket.onclose = ({ code, reason }) => { this.logger.error({ code, reason, fn: 'ws.close' }, 'websocket closed') - this.close() + this.close(code) } this.socket.onmessage = (msg) => this.onMessage(msg) this.socket.onopen = () => { this.logger.debug({ fn: 'ws.onopen' }, 'websocket opened') this.retryCount = 0 - this.interval = setInterval(() => this.socket.ping(), this.pingInterval) + this.interval = setInterval(() => this.socket?.ping(), this.pingInterval) this.heartbeat() this.onOpen() this.reset() } } - private close(): void { + private close(code: number): void { this.interval && clearInterval(this.interval) + this.pingTimeout && clearTimeout(this.pingTimeout) + this.resetTimeout && clearTimeout(this.resetTimeout) if (++this.retryCount >= this.retryAttempts && this.retryAttempts !== 0) { throw new Error('failed to reconnect') } - setTimeout( - () => { - this.socket = new WebSocket(this.url, { handshakeTimeout: 5000 }) - this.initialize() - }, - Math.min(Math.random() * (BASE_DELAY * this.retryCount ** 2), MAX_DELAY) - ) + if (code === 1000) return + + setTimeout(() => this.connect(), Math.min(Math.random() * (BASE_DELAY * this.retryCount ** 2), MAX_DELAY)) } private heartbeat(): void { this.pingTimeout && clearTimeout(this.pingTimeout) this.pingTimeout = setTimeout(() => { this.logger.debug({ fn: 'pingTimeout' }, 'heartbeat failed') - this.socket.terminate() + this.socket?.terminate() }, this.pingInterval + 1000) } @@ -103,7 +102,17 @@ export abstract class WebsocketClient { this.resetTimeout && clearTimeout(this.resetTimeout) this.resetTimeout = setTimeout(() => { this.logger.debug({ fn: 'reset' }, 'reset websocket') - this.socket.terminate() + this.socket?.terminate() }, this.resetInterval) } } + +export abstract class AddressSubscriptionWebsocketClient extends BaseWebsocketClient { + abstract subscribeAddresses(currentAddresses: Array, addressesToAdd: Array): void + abstract unsubscribeAddresses(currentAddresses: Array, addressesToRemove: Array): void +} + +export type AddressSubscriptionClient = Pick< + AddressSubscriptionWebsocketClient, + 'subscribeAddresses' | 'unsubscribeAddresses' +> diff --git a/node/proxy/api/package.json b/node/proxy/api/package.json index bd8b86ddd..5bdffd92c 100644 --- a/node/proxy/api/package.json +++ b/node/proxy/api/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@shapeshiftoss/common-api": "^10.0.0", + "@shapeshiftoss/prometheus": "^10.0.0", "bottleneck": "^2.19.5", "elliptic-sdk": "^0.7.2" } diff --git a/node/proxy/api/src/app.ts b/node/proxy/api/src/app.ts index 8d0158ae2..a1c263a69 100644 --- a/node/proxy/api/src/app.ts +++ b/node/proxy/api/src/app.ts @@ -3,25 +3,37 @@ import { join } from 'path' import swaggerUi from 'swagger-ui-express' import { middleware } from '@shapeshiftoss/common-api' import { Logger } from '@shapeshiftoss/logger' +import { Prometheus } from '@shapeshiftoss/prometheus' +import { Server } from 'ws' import { RegisterRoutes } from './routes' import { CoinGecko } from './coingecko' import { Zerion } from './zerion' import { Zrx } from './zrx' import { Portals } from './portals' +import { MarketDataConnectionHandler } from './marketData' +import { CoincapWebsocketClient } from './coincap' const PORT = process.env.PORT ?? 3000 +const COINCAP_API_KEY = process.env.COINCAP_API_KEY export const logger = new Logger({ namespace: ['unchained', 'proxy', 'api'], level: process.env.LOG_LEVEL, }) +const prometheus = new Prometheus({ coinstack: 'proxy' }) + const app = express() -app.use(...middleware.common()) +app.use(...middleware.common(prometheus)) app.get('/health', async (_, res) => res.json({ status: 'ok' })) +app.get('/metrics', async (_, res) => { + res.setHeader('Content-Type', prometheus.register.contentType) + res.send(await prometheus.register.metrics()) +}) + const options: swaggerUi.SwaggerUiOptions = { customCss: '.swagger-ui .topbar { display: none }', customSiteTitle: 'ShapeShift Proxy API Docs', @@ -54,4 +66,14 @@ app.get('/', async (_, res) => { app.use(middleware.errorHandler, middleware.notFoundHandler) -app.listen(PORT, () => logger.info('Server started')) +const server = app.listen(PORT, () => logger.info('Server started')) + +const coincap = new CoincapWebsocketClient(`wss://wss.coincap.io/prices?assets=ALL&apiKey=${COINCAP_API_KEY}`, { + logger, +}) + +const wsServer = new Server({ server }) + +wsServer.on('connection', (connection) => { + MarketDataConnectionHandler.start(connection, coincap, prometheus, logger) +}) diff --git a/node/proxy/api/src/coincap.ts b/node/proxy/api/src/coincap.ts new file mode 100644 index 000000000..ef89677ad --- /dev/null +++ b/node/proxy/api/src/coincap.ts @@ -0,0 +1,94 @@ +import WebSocket from 'ws' + +import { BaseWebsocketClient, Args, Options, BaseConnectionHandler } from '@shapeshiftoss/websocket' +import { MarketDataClient, MarketDataConnectionHandler, MarketDataMessage } from './marketData' + +export class CoincapWebsocketClient extends BaseWebsocketClient implements MarketDataClient { + clients = new Map() + // clientId -> subscriptionId -> assets + subscriptions = new Map>() + + constructor(url: string, args: Args, opts?: Options) { + super(url, { logger: args.logger }, opts) + } + + protected onOpen(): void {} + + protected async onMessage(message: WebSocket.MessageEvent): Promise { + try { + const res = JSON.parse(message.data.toString()) as Record + + super.reset() + this.handleMessage(res) + } catch (err) { + this.logger.error(err, `failed to handle message: ${JSON.stringify(message)}`) + } + } + + subscribe(clientId: string, subscriptionId: string, connection: MarketDataConnectionHandler, assets: Array) { + if (!this.clients.size) this.connect() + this.clients.set(clientId, connection) + + if (!this.subscriptions.has(clientId)) { + this.subscriptions.set(clientId, new Map()) + } + + this.subscriptions.get(clientId)!.set(subscriptionId, assets) + } + + unsubscribe(clientId: string, subscriptionId?: string) { + const clientSubscriptions = this.subscriptions.get(clientId) + + if (clientSubscriptions && subscriptionId) { + // Remove specific subscription + clientSubscriptions.delete(subscriptionId) + + // If client has no more subscriptions, remove them entirely + if (clientSubscriptions.size === 0) { + this.clients.delete(clientId) + this.subscriptions.delete(clientId) + } + } else { + // Remove all subscriptions for this client + this.clients.delete(clientId) + this.subscriptions.delete(clientId) + } + + // Close connection if no more clients + if (!this.clients.size) this.socket?.close(1000) + } + + private handleMessage(message: Record): void { + for (const [clientId, client] of this.clients) { + try { + const clientSubscriptions = this.subscriptions.get(clientId) + if (!clientSubscriptions) continue + + // Send updates for each subscription + for (const [subscriptionId, assets] of clientSubscriptions) { + // Filter data to only include assets this subscription requested + const filteredData: Record = {} + for (const asset of assets) { + if (message[asset] !== undefined) { + filteredData[asset] = message[asset] + } + } + + // Only send if there's relevant data for this subscription + if (Object.keys(filteredData).length > 0) { + const payload: MarketDataMessage = { + type: 'price_update', + source: 'coincap', + data: filteredData, + timestamp: Date.now(), + } + + client.publish(subscriptionId, payload) + } + } + } catch (error) { + this.logger.error({ clientId, error }, 'failed to handle message') + } + } + } +} diff --git a/node/proxy/api/src/marketData.ts b/node/proxy/api/src/marketData.ts new file mode 100644 index 000000000..8565f4142 --- /dev/null +++ b/node/proxy/api/src/marketData.ts @@ -0,0 +1,78 @@ +import { Logger } from '@shapeshiftoss/logger' +import WebSocket from 'ws' + +import { BaseConnectionHandler, MessageResponse } from '@shapeshiftoss/websocket' +import { Prometheus } from '@shapeshiftoss/prometheus' + +export interface MarketDataMessage extends MessageResponse { + type: 'price_update' + source: string + data: Record + timestamp: number +} + +export interface SubscribePayload { + assets: Array +} + +function isSubscribePayload(data: unknown): data is SubscribePayload { + return data !== null && typeof data === 'object' && 'assets' in data +} + +export interface MarketDataClient { + subscribe( + clientId: string, + subscriptionId: string, + connection: MarketDataConnectionHandler, + assets: Array + ): void + unsubscribe(clientId: string, subscriptionId?: string): void +} + +export class MarketDataConnectionHandler extends BaseConnectionHandler { + private readonly client: MarketDataClient + + private constructor(websocket: WebSocket, client: MarketDataClient, prometheus: Prometheus, logger: Logger) { + super(websocket, prometheus, logger) + + this.client = client + } + + static start(websocket: WebSocket, client: MarketDataClient, prometheus: Prometheus, logger: Logger): void { + new MarketDataConnectionHandler(websocket, client, prometheus, logger) + } + + onSubscribe(subscriptionId: string, data?: unknown): void { + if (!subscriptionId) { + this.sendError('subscriptionId required', subscriptionId) + return + } + + if (!isSubscribePayload(data)) { + this.sendError(`invalid subscription payload`, subscriptionId) + return + } + + if (!data.assets.length) { + this.sendError(`assets required`, subscriptionId) + return + } + + this.subscriptionIds.add(subscriptionId) + this.client.subscribe(this.clientId, subscriptionId, this, data.assets) + } + + onUnsubscribe(subscriptionId: string): void { + if (subscriptionId) { + this.subscriptionIds.delete(subscriptionId) + this.client.unsubscribe(this.clientId, subscriptionId) + } else { + this.subscriptionIds.clear() + this.client.unsubscribe(this.clientId) + } + } + + onClose(): void { + this.client.unsubscribe(this.clientId) + } +} diff --git a/node/proxy/sample.env b/node/proxy/sample.env index 666aef86a..bf28996b5 100644 --- a/node/proxy/sample.env +++ b/node/proxy/sample.env @@ -1,6 +1,7 @@ # SECRET ENVIRONMENT VARIABLES ELLIPTIC_API_KEY= ELLIPTIC_API_SECRET= +COINCAP_API_KEY= COINGECKO_API_KEY= ZERION_API_KEY= ZRX_API_KEY= diff --git a/package.json b/package.json index 7f7e865c4..48a1294fd 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,6 @@ "moralis": "^2.27.2", "morgan": "^1.10.0", "p-queue": "^9.0.0", - "prom-client": "^15.0.0", "swagger-ui-express": "^5.0.0", "tsoa": "^6.6.0", "viem": "^2.33.2" diff --git a/yarn.lock b/yarn.lock index 60b6d053a..d68aa4e52 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3076,6 +3076,7 @@ __metadata: dependencies: "@shapeshiftoss/blockbook": "npm:^10.0.0" "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" languageName: unknown linkType: soft @@ -3085,6 +3086,7 @@ __metadata: dependencies: "@shapeshiftoss/blockbook": "npm:^10.0.0" "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" languageName: unknown linkType: soft @@ -3106,6 +3108,7 @@ __metadata: dependencies: "@shapeshiftoss/blockbook": "npm:^10.0.0" "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" languageName: unknown linkType: soft @@ -3121,6 +3124,7 @@ __metadata: dependencies: "@shapeshiftoss/blockbook": "npm:^10.0.0" "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" languageName: unknown linkType: soft @@ -3136,6 +3140,7 @@ __metadata: dependencies: "@shapeshiftoss/blockbook": "npm:^10.0.0" "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" bech32: "npm:^2.0.0" languageName: unknown linkType: soft @@ -3152,6 +3157,7 @@ __metadata: dependencies: "@shapeshiftoss/blockbook": "npm:^10.0.0" "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" bech32: "npm:^2.0.0" languageName: unknown linkType: soft @@ -3178,6 +3184,7 @@ __metadata: dependencies: "@shapeshiftoss/blockbook": "npm:^10.0.0" "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" languageName: unknown linkType: soft @@ -3205,6 +3212,7 @@ __metadata: resolution: "@shapeshiftoss/common-api@workspace:node/coinstacks/common/api" dependencies: "@shapeshiftoss/blockbook": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" "@shapeshiftoss/websocket": "npm:^10.0.0" "@types/uuid": "npm:^9.0.7" "@types/ws": "npm:^8.5.10" @@ -3220,6 +3228,7 @@ __metadata: dependencies: "@shapeshiftoss/blockbook": "npm:^10.0.0" "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" bech32: "npm:^2.0.0" languageName: unknown linkType: soft @@ -3236,6 +3245,7 @@ __metadata: dependencies: "@shapeshiftoss/blockbook": "npm:^10.0.0" "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" languageName: unknown linkType: soft @@ -3251,6 +3261,7 @@ __metadata: dependencies: "@shapeshiftoss/blockbook": "npm:^10.0.0" "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" languageName: unknown linkType: soft @@ -3266,6 +3277,7 @@ __metadata: dependencies: "@shapeshiftoss/blockbook": "npm:^10.0.0" "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" bech32: "npm:^2.0.0" languageName: unknown linkType: soft @@ -3289,6 +3301,7 @@ __metadata: dependencies: "@shapeshiftoss/blockbook": "npm:^10.0.0" "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" languageName: unknown linkType: soft @@ -3304,6 +3317,7 @@ __metadata: dependencies: "@shapeshiftoss/blockbook": "npm:^10.0.0" "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" languageName: unknown linkType: soft @@ -3313,6 +3327,14 @@ __metadata: languageName: unknown linkType: soft +"@shapeshiftoss/prometheus@npm:^10.0.0, @shapeshiftoss/prometheus@workspace:node/packages/prometheus": + version: 0.0.0-use.local + resolution: "@shapeshiftoss/prometheus@workspace:node/packages/prometheus" + dependencies: + prom-client: "npm:^15.0.0" + languageName: unknown + linkType: soft + "@shapeshiftoss/proxy-pulumi@workspace:node/proxy/pulumi": version: 0.0.0-use.local resolution: "@shapeshiftoss/proxy-pulumi@workspace:node/proxy/pulumi" @@ -3324,6 +3346,7 @@ __metadata: resolution: "@shapeshiftoss/solana-api@workspace:node/coinstacks/solana/api" dependencies: "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" "@shapeshiftoss/websocket": "npm:^10.0.0" "@solana/web3.js": "npm:^1.95.3" helius-sdk: "npm:^1.4.0" @@ -3359,6 +3382,7 @@ __metadata: resolution: "@shapeshiftoss/unchained-proxy@workspace:node/proxy/api" dependencies: "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" bottleneck: "npm:^2.19.5" elliptic-sdk: "npm:^0.7.2" languageName: unknown @@ -3376,6 +3400,7 @@ __metadata: version: 0.0.0-use.local resolution: "@shapeshiftoss/websocket@workspace:node/packages/websocket" dependencies: + "@shapeshiftoss/prometheus": "npm:^10.0.0" "@types/ws": "npm:^8.5.10" ws: "npm:^8.15.0" languageName: unknown @@ -3387,6 +3412,7 @@ __metadata: dependencies: "@shapeshiftoss/blockbook": "npm:^10.0.0" "@shapeshiftoss/common-api": "npm:^10.0.0" + "@shapeshiftoss/prometheus": "npm:^10.0.0" bech32: "npm:^2.0.0" languageName: unknown linkType: soft @@ -12221,7 +12247,6 @@ __metadata: nodemon: "npm:^3.0.2" p-queue: "npm:^9.0.0" prettier: "npm:^3.1.1" - prom-client: "npm:^15.0.0" swagger-ui-express: "npm:^5.0.0" tsoa: "npm:^6.6.0" typescript: "npm:5.9.3"