From 04511af0866f0d2f12b4238bba38cf137e21e484 Mon Sep 17 00:00:00 2001 From: hemulin Date: Wed, 29 May 2024 23:56:31 +0100 Subject: [PATCH 01/10] Initial adding of historical and live Osmosis data processing to Clickhouse. WIP --- api/.env.example | 2 + api/src/app/app.module.ts | 4 + .../osmosis/OsmosisHistoricalProcessor.ts | 217 ++++++++++++++++++ .../osmosis/OsmosisLiveProcessor.ts | 157 +++++++++++++ .../bridged-networks/osmosis/OsmosisModule.ts | 37 +++ .../osmosis/osmosis.controller.ts | 25 ++ 6 files changed, 442 insertions(+) create mode 100644 api/src/clickhouse/bridged-networks/osmosis/OsmosisHistoricalProcessor.ts create mode 100644 api/src/clickhouse/bridged-networks/osmosis/OsmosisLiveProcessor.ts create mode 100644 api/src/clickhouse/bridged-networks/osmosis/OsmosisModule.ts create mode 100644 api/src/clickhouse/bridged-networks/osmosis/osmosis.controller.ts diff --git a/api/.env.example b/api/.env.example index 41e1b993..2acf6eb1 100644 --- a/api/.env.example +++ b/api/.env.example @@ -25,3 +25,5 @@ NATS_SERVERS="127.0.0.1:4222" DATA_API_HOST="https://data.0l.fyi" DATABASE_URL="postgresql://olfyi:olfyi@127.0.0.1:5432/olfyi?schema=public" + +NUMIA_API_KEY="************************************" \ No newline at end of file diff --git a/api/src/app/app.module.ts b/api/src/app/app.module.ts index 6b779d8f..ddf5ec0a 100644 --- a/api/src/app/app.module.ts +++ b/api/src/app/app.module.ts @@ -17,6 +17,9 @@ import { NatsModule } from "../nats/nats.module.js"; import { OlSwapModule } from "../ol-swap/OlSwapModule.js"; import { MultiSigModule } from "../multi-sig/multi-sig.module.js"; +// Test purposes +import { OsmosisModule } from "../clickhouse/bridged-networks/osmosis/OsmosisModule.js"; + @Module({ imports: [ NatsModule, @@ -47,6 +50,7 @@ import { MultiSigModule } from "../multi-sig/multi-sig.module.js"; OlSwapModule, NodeWatcherModule, MultiSigModule, + OsmosisModule, ], controllers: [], providers: [AppService], diff --git a/api/src/clickhouse/bridged-networks/osmosis/OsmosisHistoricalProcessor.ts b/api/src/clickhouse/bridged-networks/osmosis/OsmosisHistoricalProcessor.ts new file mode 100644 index 00000000..6a373883 --- /dev/null +++ b/api/src/clickhouse/bridged-networks/osmosis/OsmosisHistoricalProcessor.ts @@ -0,0 +1,217 @@ +import { InjectQueue, Processor, WorkerHost } from "@nestjs/bullmq"; +import { Job, Queue } from "bullmq"; +import axios from "axios"; +import { ClickhouseService } from '../../clickhouse.service.js'; +import { BigQuery } from '@google-cloud/bigquery'; + +interface MintBurnEvent { + timestamp: number; + type: string; + amount: string; + address: string; +} + +interface TransferEvent { + timestamp: number; + from: string; + to: string; + amount: string; +} + +interface PoolSwapEvent { + timestamp: number; + sender: string; + side: string; + amount: string; +} + +@Processor("osmosis-historical") +export class OsmosisHistoricalProcessor extends WorkerHost { + private readonly endpoint = 'https://osmosis.numia.xyz/v2/txs'; + private readonly pageSize = 100; + private readonly bigQueryClient = new BigQuery(); + + constructor( + private readonly clickhouseService: ClickhouseService, + + @InjectQueue("osmosis-historical") + private readonly osmosisQueue: Queue, + ) { + super(); + } + + public async process(job: Job): Promise { + switch (job.name) { + case "fetchHistoricalData": + try { + await this.fetchHistoricalData(); + } catch (error) { + // fail silently to avoid accumulating failed repeating jobs + } + break; + + default: + throw new Error(`invalid job name ${job.name}`); + } + } + + public async triggerFetchHistoricalData() { + // await this.osmosisQueue.add("fetchHistoricalData", undefined); + // await this.fetchMintBurnEvents(); + await this.fetchPoolSwapEvents(); + } + + private async fetchHistoricalData() { + await this.fetchMintBurnEvents(); // And internally, one hop transfers of wLIBRA + await this.fetchPoolSwapEvents(); + } + + private async fetchMintBurnEvents() { + let page = 1; + let hasMore = true; + + while (hasMore) { + const url = `${this.endpoint}/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p?pageSize=${this.pageSize}&page=${page}`; + console.log(url) + const response = await axios.get(url, { + headers: { + Authorization: `Bearer ${process.env.NUMIA_API_KEY}`, + }, + }); + const events = response.data; + + if (events.length === 0) { + hasMore = false; + continue; + } + + for (const event of events) { + const timestamp = new Date(event.blockTimestamp).getTime(); + const type = event.messageTypes[0] === '/osmosis.tokenfactory.v1beta1.MsgMint' ? 'mint' : 'burn'; + const amount = event.messages[0].amount.amount; + const address = type === 'mint' ? event.messages[0].mint_to_address : event.messages[0].burn_from_address; + + await this.insertMintBurnEvent({ timestamp, type, amount, address }); + + if (type === 'mint') { + // Not a proper DFS, just one hop away from minter + await this.fetchTransfersForAddress(event.messages[0].mint_to_address); + } + } + + page++; + } + } + + private async fetchTransfersForAddress(address: string) { + let page = 1; + let hasMore = true; + + while (hasMore) { + const url = `${this.endpoint}/${address}?pageSize=${this.pageSize}&page=${page}`; + const response = await axios.get(url, { + headers: { + Authorization: `Bearer ${process.env.NUMIA_API_KEY}`, + }, + }); + const events = response.data; + + if (events.length === 0) { + hasMore = false; + continue; + } + + for (const event of events) { + if (event.messageTypes.includes('/cosmos.bank.v1beta1.MsgSend')) { + for (const message of event.messages) { + if (message.amount[0].denom === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA') { + const transferEvent = { + timestamp: new Date(event.blockTimestamp).getTime(), + from: message.from_address, + to: message.to_address, + amount: message.amount.amount, + }; + await this.insertTransferEvent(transferEvent); + } + } + } + } + + page++; + } + } + + private async fetchPoolSwapEvents() { + const query = ` + SELECT + tx_id, + sender, + denom_in, + parsed_amount_in, + denom_out, + parsed_amount_out, + ingestion_timestamp + FROM \`numia-data.osmosis.osmosis_swaps\` + WHERE pool_id = '1721' + ORDER BY ingestion_timestamp ASC + `; + + const [rows] = await this.bigQueryClient.query(query); + + for (const row of rows) { + const side = row.denom_in === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA' ? 'buy' : 'sell'; + const swapEvent = { + timestamp: new Date(row.ingestion_timestamp).getTime(), + sender: row.sender, + side, + amount: side === 'buy' ? row.parsed_amount_in : row.parsed_amount_out, + }; + await this.insertSwapEvent(swapEvent); + } + } + + private async insertMintBurnEvent(event: MintBurnEvent) { + console.log('Mint/Burn Event:', event); + /* + await this.clickhouseService.client.insert({ + table: 'mint_burn_events', + values: { + timestamp: event.timestamp, + type: event.type, + amount: event.amount, + address: event.address, + } + }); + */ + } + + private async insertTransferEvent(event: TransferEvent) { + console.log('Transfer Event:', event); + /* + await this.clickhouseService.client.insert({ + table: 'transfer_events', + values: { + timestamp: event.timestamp, + from: event.from, + to: event.to, + amount: event.amount, + } + }); + */ + } + + private async insertSwapEvent(event: PoolSwapEvent) { + console.log('Pool Swap Event:', event); + /* + await this.clickhouseService.client.insert({ + table: 'pool_swap_events', + values: { + timestamp: event.timestamp, + sender: event.sender, + side: event.side, + amount: event.amount, + } + }); + */ + } +} diff --git a/api/src/clickhouse/bridged-networks/osmosis/OsmosisLiveProcessor.ts b/api/src/clickhouse/bridged-networks/osmosis/OsmosisLiveProcessor.ts new file mode 100644 index 00000000..a97dcecf --- /dev/null +++ b/api/src/clickhouse/bridged-networks/osmosis/OsmosisLiveProcessor.ts @@ -0,0 +1,157 @@ +import { InjectQueue, Processor, WorkerHost } from "@nestjs/bullmq"; +import { OnModuleInit } from "@nestjs/common"; +import { Job, Queue } from "bullmq"; +import axios from "axios"; +import Bluebird from "bluebird"; +import { ClickhouseService } from '../../clickhouse.service.js'; + +interface PoolSwapEvent { + timestamp: number; + pool_id: string; + token_out_denom: string; + token_in_denom: string; + side: string; + amount: string; +} + +interface MintBurnTransferEvent { + timestamp: number; + type: string; + amount: string; + address: string; +} + +@Processor("osmosis-live") +export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { + private readonly endpoint = 'https://lcd.osmosis.zone/cosmos/tx/v1beta1/txs'; + + constructor( + private readonly clickhouseService: ClickhouseService, + + @InjectQueue("osmosis-live") + private readonly osmosisQueue: Queue, + ) { + super(); + } + + public async process(job: Job): Promise { + switch (job.name) { + case "fetchLiveData": + try { + await Promise.race([ + this.fetchAndStoreLiveData(), + // 1m timeout to avoid blocking the queue + Bluebird.delay(60 * 1_000), + ]); + } catch (error) { + // fail silently to avoid accumulating failed repeating jobs + } + break; + + default: + throw new Error(`invalid job name ${job.name}`); + } + } + + public async onModuleInit() { + await this.osmosisQueue.add("fetchLiveData", undefined, { + repeat: { + every: 60 * 30 * 1_000, // 1 minute + }, + }); + // await this.fetchAndStoreLiveData(); + } + + public async triggerFetchLiveData() { + // await this.osmosisQueue.add("fetchLiveData", undefined); + await this.fetchAndStoreLiveData(); + } + + private async fetchAndStoreLiveData() { + const events = [ + // '/osmosis.poolmanager.v1beta1.SwapAmountInRoute', + // '/osmosis.poolmanager.v1beta1.SwapAmountOutRoute', + '/osmosis.gamm.v1beta1.MsgSwapExactAmountIn', + '/osmosis.gamm.v1beta1.MsgSwapExactAmountOut', + '/osmosis.tokenfactory.v1beta1.MsgMint', + '/cosmos.bank.v1beta1.MsgSend', + ]; + + for (const event of events) { + const url = `${this.endpoint}?events=${event}`; + console.log(url) + const response = await axios.get(url); + const transactions = response.data.tx_responses; + + for (const tx of transactions) { + for (const log of tx.logs) { + for (const msg of log.events) { + if (msg.type === '/osmosis.gamm.v1beta1.MsgSwapExactAmountIn' || msg.type === '/osmosis.gamm.v1beta1.MsgSwapExactAmountOut') { + await this.handlePoolSwapEvent(msg); + } else if (msg.type === '/osmosis.tokenfactory.v1beta1.MsgMint' || msg.type === '/cosmos.bank.v1beta1.MsgSend') { + await this.handleMintBurnTransferEvent(msg); + } + } + } + } + } + } + + private async handlePoolSwapEvent(event) { + const { pool_id, token_out_denom, token_in_denom } = event.attributes.reduce((acc, attr) => { + acc[attr.key] = attr.value; + return acc; + }, {}); + + const timestamp = new Date(event.timestamp).getTime(); + const side = token_in_denom === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA' ? 'buy' : 'sell'; + const amount = side === 'buy' ? event.amount_in : event.amount_out; + + await this.insertSwapEvent({ timestamp, pool_id, token_out_denom, token_in_denom, side, amount }); + } + + private async handleMintBurnTransferEvent(event) { + const type = event.type.includes('Mint') ? 'mint' : 'burn'; + const { amount, address } = event.attributes.reduce((acc, attr) => { + if (attr.key === 'amount') acc.amount = attr.value; + if (attr.key === 'address') acc.address = attr.value; + return acc; + }, {}); + + const timestamp = new Date(event.timestamp).getTime(); + + await this.insertMintBurnTransferEvent({ timestamp, type, amount, address }); + } + + private async insertSwapEvent(event: PoolSwapEvent) { + console.log('Pool Swap Event:', event); + /* + await this.clickhouseService.client.insert({ + table: 'pool_swap_events', + values: { + timestamp: event.timestamp, + pool_id: event.pool_id, + token_out_denom: event.token_out_denom, + token_in_denom: event.token_in_denom, + side: event.side, + amount: event.amount, + } + }); + */ + } + + private async insertMintBurnTransferEvent(event: MintBurnTransferEvent) { + console.log('Mint/Burn Transfer Event:', event); + /* + await this.clickhouseService.client.insert({ + table: 'mint_burn_transfer_events', + values: { + timestamp: event.timestamp, + type: event.type, + amount: event.amount, + address: event.address, + } + }); + */ + } +} diff --git a/api/src/clickhouse/bridged-networks/osmosis/OsmosisModule.ts b/api/src/clickhouse/bridged-networks/osmosis/OsmosisModule.ts new file mode 100644 index 00000000..817881fe --- /dev/null +++ b/api/src/clickhouse/bridged-networks/osmosis/OsmosisModule.ts @@ -0,0 +1,37 @@ +import process from "node:process"; +import { BullModule } from "@nestjs/bullmq"; +import { Module, Type } from "@nestjs/common"; + +import { redisClient } from "../../../redis/redis.service.js"; +import { ClickhouseModule } from "../../clickhouse.module.js"; +import { OsmosisLiveProcessor } from "./OsmosisLiveProcessor.js"; +import { OsmosisHistoricalProcessor } from "./OsmosisHistoricalProcessor.js"; +import { OsmosisController } from './osmosis.controller.js'; + +const roles = process.env.ROLES!.split(","); + +const workers: Type[] = []; +if (roles.includes("osmosis-live-processor")) { + workers.push(OsmosisLiveProcessor); +} +if (roles.includes("osmosis-historical-processor")) { + workers.push(OsmosisHistoricalProcessor); +} + +@Module({ + imports: [ + ClickhouseModule, + + BullModule.registerQueue({ + name: "osmosis-live", + connection: redisClient, + }), + BullModule.registerQueue({ + name: "osmosis-historical", + connection: redisClient, + }), + ], + controllers: [OsmosisController], + providers: [OsmosisLiveProcessor, OsmosisHistoricalProcessor, ...workers], +}) +export class OsmosisModule {} \ No newline at end of file diff --git a/api/src/clickhouse/bridged-networks/osmosis/osmosis.controller.ts b/api/src/clickhouse/bridged-networks/osmosis/osmosis.controller.ts new file mode 100644 index 00000000..c87834ac --- /dev/null +++ b/api/src/clickhouse/bridged-networks/osmosis/osmosis.controller.ts @@ -0,0 +1,25 @@ +import { Controller, Get, Post } from '@nestjs/common'; +import { OsmosisHistoricalProcessor } from './OsmosisHistoricalProcessor.js'; +import { OsmosisLiveProcessor } from './OsmosisLiveProcessor.js'; + +@Controller('osmosis') +export class OsmosisController { + constructor( + private readonly osmosisHistoricalProcessor: OsmosisHistoricalProcessor, + private readonly osmosisLiveProcessor: OsmosisLiveProcessor, + ) {} + + @Get('fetch-historical') + @Post('fetch-historical') + async triggerHistoricalFetch(): Promise { + await this.osmosisHistoricalProcessor.triggerFetchHistoricalData(); + return 'Historical data fetching triggered'; + } + + @Get('fetch-live') + @Post('fetch-live') + async triggerLiveFetch(): Promise { + await this.osmosisLiveProcessor.triggerFetchLiveData(); + return 'Live data fetching triggered'; + } +} From f1f66246e0dedda155cb20943aaeb787869a0510 Mon Sep 17 00:00:00 2001 From: hemulin Date: Thu, 30 May 2024 14:31:59 +0100 Subject: [PATCH 02/10] Historical Osmosis data fetch and process working --- api/.gitignore | 2 ++ .../osmosis/OsmosisHistoricalProcessor.ts | 27 ++++++++++--------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/api/.gitignore b/api/.gitignore index fb54d978..adee2355 100644 --- a/api/.gitignore +++ b/api/.gitignore @@ -36,3 +36,5 @@ lerna-debug.log* .env .geoip + +*service_account.json \ No newline at end of file diff --git a/api/src/clickhouse/bridged-networks/osmosis/OsmosisHistoricalProcessor.ts b/api/src/clickhouse/bridged-networks/osmosis/OsmosisHistoricalProcessor.ts index 6a373883..0cfa4770 100644 --- a/api/src/clickhouse/bridged-networks/osmosis/OsmosisHistoricalProcessor.ts +++ b/api/src/clickhouse/bridged-networks/osmosis/OsmosisHistoricalProcessor.ts @@ -29,7 +29,7 @@ interface PoolSwapEvent { export class OsmosisHistoricalProcessor extends WorkerHost { private readonly endpoint = 'https://osmosis.numia.xyz/v2/txs'; private readonly pageSize = 100; - private readonly bigQueryClient = new BigQuery(); + private readonly bigQueryClient: BigQuery; constructor( private readonly clickhouseService: ClickhouseService, @@ -38,6 +38,13 @@ export class OsmosisHistoricalProcessor extends WorkerHost { private readonly osmosisQueue: Queue, ) { super(); + // Path to service account key file + const keyFile = './bigquery_service_account.json'; + + // Create a BigQuery client + this.bigQueryClient = new BigQuery({ + keyFilename: keyFile, + }); } public async process(job: Job): Promise { @@ -56,9 +63,9 @@ export class OsmosisHistoricalProcessor extends WorkerHost { } public async triggerFetchHistoricalData() { - // await this.osmosisQueue.add("fetchHistoricalData", undefined); + await this.osmosisQueue.add("fetchHistoricalData", undefined); // await this.fetchMintBurnEvents(); - await this.fetchPoolSwapEvents(); + // await this.fetchPoolSwapEvents(); } private async fetchHistoricalData() { @@ -161,7 +168,7 @@ export class OsmosisHistoricalProcessor extends WorkerHost { for (const row of rows) { const side = row.denom_in === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA' ? 'buy' : 'sell'; const swapEvent = { - timestamp: new Date(row.ingestion_timestamp).getTime(), + timestamp: new Date(row.ingestion_timestamp.value).getTime(), sender: row.sender, side, amount: side === 'buy' ? row.parsed_amount_in : row.parsed_amount_out, @@ -171,8 +178,7 @@ export class OsmosisHistoricalProcessor extends WorkerHost { } private async insertMintBurnEvent(event: MintBurnEvent) { - console.log('Mint/Burn Event:', event); - /* + // console.log('Mint/Burn Event:', event); await this.clickhouseService.client.insert({ table: 'mint_burn_events', values: { @@ -182,12 +188,10 @@ export class OsmosisHistoricalProcessor extends WorkerHost { address: event.address, } }); - */ } private async insertTransferEvent(event: TransferEvent) { - console.log('Transfer Event:', event); - /* + // console.log('Transfer Event:', event); await this.clickhouseService.client.insert({ table: 'transfer_events', values: { @@ -197,12 +201,10 @@ export class OsmosisHistoricalProcessor extends WorkerHost { amount: event.amount, } }); - */ } private async insertSwapEvent(event: PoolSwapEvent) { - console.log('Pool Swap Event:', event); - /* + // console.log('Pool Swap Event:', event); await this.clickhouseService.client.insert({ table: 'pool_swap_events', values: { @@ -212,6 +214,5 @@ export class OsmosisHistoricalProcessor extends WorkerHost { amount: event.amount, } }); - */ } } From 3d5e53ba5043b09b9ec8d027b6c892bd957cffbc Mon Sep 17 00:00:00 2001 From: minaxolone Date: Thu, 30 May 2024 17:39:08 -0400 Subject: [PATCH 03/10] cleanup --- api/package-lock.json | 128 ++++++++++++------ api/package.json | 1 + api/src/config/config.interface.ts | 7 +- api/src/config/config.ts | 4 + .../OsmosisController.ts} | 18 ++- .../osmosis/OsmosisHistoricalProcessor.ts | 90 ++++++------ .../osmosis/OsmosisLiveProcessor.ts | 3 +- .../osmosis/OsmosisModule.ts | 8 +- 8 files changed, 166 insertions(+), 93 deletions(-) rename api/src/{clickhouse/bridged-networks/osmosis/osmosis.controller.ts => osmosis/OsmosisController.ts} (51%) rename api/src/{clickhouse/bridged-networks => }/osmosis/OsmosisHistoricalProcessor.ts (73%) rename api/src/{clickhouse/bridged-networks => }/osmosis/OsmosisLiveProcessor.ts (98%) rename api/src/{clickhouse/bridged-networks => }/osmosis/OsmosisModule.ts (80%) diff --git a/api/package-lock.json b/api/package-lock.json index 7a69e531..ee3be249 100644 --- a/api/package-lock.json +++ b/api/package-lock.json @@ -13,6 +13,7 @@ "@aptos-labs/ts-sdk": "^1.16.0", "@aws-sdk/client-s3": "^3.583.0", "@clickhouse/client": "^1.0.2", + "@google-cloud/bigquery": "^7.7.0", "@nestjs/apollo": "^12.1.0", "@nestjs/bullmq": "^10.1.1", "@nestjs/common": "^10.3.8", @@ -2279,6 +2280,46 @@ "tslib": "^2.1.0" } }, + "node_modules/@google-cloud/bigquery": { + "version": "7.7.0", + "resolved": "https://registry.npmjs.org/@google-cloud/bigquery/-/bigquery-7.7.0.tgz", + "integrity": "sha512-KK0t7j0RAwqRxt3vc0mtoR+DBSiBmbeJaRv7tAIWlQsyE43pxMu7t94i0kUvR1SKnQBZYX9+iBr+mjdmNORPSQ==", + "dependencies": { + "@google-cloud/common": "^5.0.0", + "@google-cloud/paginator": "^5.0.0", + "@google-cloud/precise-date": "^4.0.0", + "@google-cloud/promisify": "^4.0.0", + "arrify": "^2.0.1", + "big.js": "^6.0.0", + "duplexify": "^4.0.0", + "extend": "^3.0.2", + "is": "^3.3.0", + "stream-events": "^1.0.5", + "uuid": "^9.0.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@google-cloud/common": { + "version": "5.0.2", + "resolved": "https://registry.npmjs.org/@google-cloud/common/-/common-5.0.2.tgz", + "integrity": "sha512-V7bmBKYQyu0eVG2BFejuUjlBt+zrya6vtsKdY+JxMM/dNntPF41vZ9+LhOshEUH01zOHEqBSvI7Dad7ZS6aUeA==", + "dependencies": { + "@google-cloud/projectify": "^4.0.0", + "@google-cloud/promisify": "^4.0.0", + "arrify": "^2.0.1", + "duplexify": "^4.1.1", + "extend": "^3.0.2", + "google-auth-library": "^9.0.0", + "html-entities": "^2.5.2", + "retry-request": "^7.0.0", + "teeny-request": "^9.0.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/@google-cloud/firestore": { "version": "7.7.0", "resolved": "https://registry.npmjs.org/@google-cloud/firestore/-/firestore-7.7.0.tgz", @@ -2298,7 +2339,6 @@ "version": "5.0.0", "resolved": "https://registry.npmjs.org/@google-cloud/paginator/-/paginator-5.0.0.tgz", "integrity": "sha512-87aeg6QQcEPxGCOthnpUjvw4xAZ57G7pL8FS0C4e/81fr3FjkpUpibf1s2v5XGyGhUVGF4Jfg7yEcxqn2iUw1w==", - "optional": true, "dependencies": { "arrify": "^2.0.0", "extend": "^3.0.2" @@ -2307,11 +2347,18 @@ "node": ">=14.0.0" } }, + "node_modules/@google-cloud/precise-date": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/@google-cloud/precise-date/-/precise-date-4.0.0.tgz", + "integrity": "sha512-1TUx3KdaU3cN7nfCdNf+UVqA/PSX29Cjcox3fZZBtINlRrXVTmUkQnCKv2MbBUbCopbK4olAT1IHl76uZyCiVA==", + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/@google-cloud/projectify": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/@google-cloud/projectify/-/projectify-4.0.0.tgz", "integrity": "sha512-MmaX6HeSvyPbWGwFq7mXdo0uQZLGBYCwziiLIGq5JVX+/bdI3SAq6bP98trV5eTWfLuvsMcIC1YJOF2vfteLFA==", - "optional": true, "engines": { "node": ">=14.0.0" } @@ -2320,7 +2367,6 @@ "version": "4.0.0", "resolved": "https://registry.npmjs.org/@google-cloud/promisify/-/promisify-4.0.0.tgz", "integrity": "sha512-Orxzlfb9c67A15cq2JQEyVc7wEsmFBmHjZWZYQMUyJ1qivXyMwdyNOs9odi79hze+2zqdTtu1E19IM/FtqZ10g==", - "optional": true, "engines": { "node": ">=14" } @@ -4546,7 +4592,6 @@ "version": "2.0.0", "resolved": "https://registry.npmjs.org/@tootallnate/once/-/once-2.0.0.tgz", "integrity": "sha512-XCuKFP5PS55gnMVu3dty8KPatLqUoy/ZYzDzAGCQ8JNFCkLXzmI7vNHCR+XpbZaMWQK/vQubr7PkYq8g470J/A==", - "optional": true, "engines": { "node": ">= 10" } @@ -4653,8 +4698,7 @@ "node_modules/@types/caseless": { "version": "0.12.5", "resolved": "https://registry.npmjs.org/@types/caseless/-/caseless-0.12.5.tgz", - "integrity": "sha512-hWtVTC2q7hc7xZ/RLbxapMvDMgUnDvKvMOpKal4DrMyfGBUfB1oKaZlIRr6mJL+If3bAP6sV/QneGzF6tJjZDg==", - "optional": true + "integrity": "sha512-hWtVTC2q7hc7xZ/RLbxapMvDMgUnDvKvMOpKal4DrMyfGBUfB1oKaZlIRr6mJL+If3bAP6sV/QneGzF6tJjZDg==" }, "node_modules/@types/connect": { "version": "3.4.38", @@ -4850,7 +4894,6 @@ "version": "2.48.12", "resolved": "https://registry.npmjs.org/@types/request/-/request-2.48.12.tgz", "integrity": "sha512-G3sY+NpsA9jnwm0ixhAFQSJ3Q9JkpLZpJbI3GMv0mIAT0y3mRabYeINzal5WOChIiaTEGQYlHOKgkaM9EisWHw==", - "optional": true, "dependencies": { "@types/caseless": "*", "@types/node": "*", @@ -4862,7 +4905,6 @@ "version": "2.5.1", "resolved": "https://registry.npmjs.org/form-data/-/form-data-2.5.1.tgz", "integrity": "sha512-m21N3WOmEEURgk6B9GLOE4RuWOFf28Lhh9qGYeNlGq4VDXUlJy2th2slBNU8Gp8EzloYZOibZJ7t5ecIrFSjVA==", - "optional": true, "dependencies": { "asynckit": "^0.4.0", "combined-stream": "^1.0.6", @@ -4929,8 +4971,7 @@ "node_modules/@types/tough-cookie": { "version": "4.0.5", "resolved": "https://registry.npmjs.org/@types/tough-cookie/-/tough-cookie-4.0.5.tgz", - "integrity": "sha512-/Ad8+nIOV7Rl++6f1BdKxFSMgmoqEoYbHRpPcx3JEfv8VRsQe9Z4mCXeJBzxs7mbHY/XOZZuXlRNfhpVPbs6ZA==", - "optional": true + "integrity": "sha512-/Ad8+nIOV7Rl++6f1BdKxFSMgmoqEoYbHRpPcx3JEfv8VRsQe9Z4mCXeJBzxs7mbHY/XOZZuXlRNfhpVPbs6ZA==" }, "node_modules/@types/yargs": { "version": "17.0.32", @@ -5363,7 +5404,6 @@ "version": "7.1.0", "resolved": "https://registry.npmjs.org/agent-base/-/agent-base-7.1.0.tgz", "integrity": "sha512-o/zjMZRhJxny7OyEF+Op8X+efiELC7k7yOjMzgfzVqOzXqkBkWI79YoTdOtsuWd5BWhAGAuOY/Xa6xpiaWXiNg==", - "optional": true, "dependencies": { "debug": "^4.3.4" }, @@ -5543,7 +5583,6 @@ "version": "2.0.1", "resolved": "https://registry.npmjs.org/arrify/-/arrify-2.0.1.tgz", "integrity": "sha512-3duEwti880xqi4eAMN8AyR4a0ByT90zoYdLlevfrvU43vb0YZwZVfxOgxWrLXXXpyugL0hNZc9G6BiB5B3nUug==", - "optional": true, "engines": { "node": ">=8" } @@ -5731,11 +5770,22 @@ } ] }, + "node_modules/big.js": { + "version": "6.2.1", + "resolved": "https://registry.npmjs.org/big.js/-/big.js-6.2.1.tgz", + "integrity": "sha512-bCtHMwL9LeDIozFn+oNhhFoq+yQ3BNdnsLSASUxLciOb1vgvpHsIO1dsENiGMgbb4SkP5TrzWzRiLddn8ahVOQ==", + "engines": { + "node": "*" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/bigjs" + } + }, "node_modules/bignumber.js": { "version": "9.1.2", "resolved": "https://registry.npmjs.org/bignumber.js/-/bignumber.js-9.1.2.tgz", "integrity": "sha512-2/mKyZH9K85bzOEfhXDBFZTGd1CTs+5IHpeFQo9luiBG7hghdC851Pj2WAhb6E3R6b9tZj/XKhbg4fum+Kepug==", - "optional": true, "engines": { "node": "*" } @@ -6788,7 +6838,6 @@ "version": "4.1.2", "resolved": "https://registry.npmjs.org/duplexify/-/duplexify-4.1.2.tgz", "integrity": "sha512-fz3OjcNCHmRP12MJoZMPglx8m4rrFP8rovnk4vT8Fs+aonZoCwGg10dSsQsfP/E62eZcPTMSMP6686fu9Qlqtw==", - "optional": true, "dependencies": { "end-of-stream": "^1.4.1", "inherits": "^2.0.3", @@ -6800,7 +6849,6 @@ "version": "3.6.2", "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", - "optional": true, "dependencies": { "inherits": "^2.0.3", "string_decoder": "^1.1.1", @@ -7341,8 +7389,7 @@ "node_modules/extend": { "version": "3.0.2", "resolved": "https://registry.npmjs.org/extend/-/extend-3.0.2.tgz", - "integrity": "sha512-fjquC59cD7CyW6urNXK0FBufkZcoiGG80wTuPujX590cB5Ttln20E2UB4S/WARVqhXffZl2LNgS+gQdPIIim/g==", - "optional": true + "integrity": "sha512-fjquC59cD7CyW6urNXK0FBufkZcoiGG80wTuPujX590cB5Ttln20E2UB4S/WARVqhXffZl2LNgS+gQdPIIim/g==" }, "node_modules/external-editor": { "version": "3.1.0", @@ -7863,7 +7910,6 @@ "version": "6.3.0", "resolved": "https://registry.npmjs.org/gaxios/-/gaxios-6.3.0.tgz", "integrity": "sha512-p+ggrQw3fBwH2F5N/PAI4k/G/y1art5OxKpb2J2chwNNHM4hHuAOtivjPuirMF4KNKwTTUal/lPfL2+7h2mEcg==", - "optional": true, "dependencies": { "extend": "^3.0.2", "https-proxy-agent": "^7.0.1", @@ -7878,7 +7924,6 @@ "version": "6.1.0", "resolved": "https://registry.npmjs.org/gcp-metadata/-/gcp-metadata-6.1.0.tgz", "integrity": "sha512-Jh/AIwwgaxan+7ZUUmRLCjtchyDiqh4KjBJ5tW3plBZb5iL/BPcso8A5DlzeD9qlw0duCamnNdpFjxwaT0KyKg==", - "optional": true, "dependencies": { "gaxios": "^6.0.0", "json-bigint": "^1.0.0" @@ -8029,7 +8074,6 @@ "version": "9.6.3", "resolved": "https://registry.npmjs.org/google-auth-library/-/google-auth-library-9.6.3.tgz", "integrity": "sha512-4CacM29MLC2eT9Cey5GDVK4Q8t+MMp8+OEdOaqD9MG6b0dOyLORaaeJMPQ7EESVgm/+z5EKYyFLxgzBJlJgyHQ==", - "optional": true, "dependencies": { "base64-js": "^1.3.0", "ecdsa-sig-formatter": "^1.0.11", @@ -8046,7 +8090,6 @@ "version": "2.0.0", "resolved": "https://registry.npmjs.org/jwa/-/jwa-2.0.0.tgz", "integrity": "sha512-jrZ2Qx916EA+fq9cEAeCROWPTfCwi1IVHqT2tapuqLEVVDKFDENFw1oL+MwrTvH6msKxsd1YTDVw6uKEcsrLEA==", - "optional": true, "dependencies": { "buffer-equal-constant-time": "1.0.1", "ecdsa-sig-formatter": "1.0.11", @@ -8057,7 +8100,6 @@ "version": "4.0.0", "resolved": "https://registry.npmjs.org/jws/-/jws-4.0.0.tgz", "integrity": "sha512-KDncfTmOZoOMTFG4mBlG0qUIOlc03fmzH+ru6RgYVZhPkyiy/92Owlt/8UEN+a4TXR1FQetfIpJE8ApdvdVxTg==", - "optional": true, "dependencies": { "jwa": "^2.0.0", "safe-buffer": "^5.0.1" @@ -8173,7 +8215,6 @@ "version": "7.1.0", "resolved": "https://registry.npmjs.org/gtoken/-/gtoken-7.1.0.tgz", "integrity": "sha512-pCcEwRi+TKpMlxAQObHDQ56KawURgyAf6jtIY046fJ5tIv3zDe/LEIubckAO8fj6JnAxLdmWkUfNyulQ2iKdEw==", - "optional": true, "dependencies": { "gaxios": "^6.0.0", "jws": "^4.0.0" @@ -8186,7 +8227,6 @@ "version": "2.0.0", "resolved": "https://registry.npmjs.org/jwa/-/jwa-2.0.0.tgz", "integrity": "sha512-jrZ2Qx916EA+fq9cEAeCROWPTfCwi1IVHqT2tapuqLEVVDKFDENFw1oL+MwrTvH6msKxsd1YTDVw6uKEcsrLEA==", - "optional": true, "dependencies": { "buffer-equal-constant-time": "1.0.1", "ecdsa-sig-formatter": "1.0.11", @@ -8197,7 +8237,6 @@ "version": "4.0.0", "resolved": "https://registry.npmjs.org/jws/-/jws-4.0.0.tgz", "integrity": "sha512-KDncfTmOZoOMTFG4mBlG0qUIOlc03fmzH+ru6RgYVZhPkyiy/92Owlt/8UEN+a4TXR1FQetfIpJE8ApdvdVxTg==", - "optional": true, "dependencies": { "jwa": "^2.0.0", "safe-buffer": "^5.0.1" @@ -8273,6 +8312,21 @@ "node": ">=8" } }, + "node_modules/html-entities": { + "version": "2.5.2", + "resolved": "https://registry.npmjs.org/html-entities/-/html-entities-2.5.2.tgz", + "integrity": "sha512-K//PSRMQk4FZ78Kyau+mZurHn3FH0Vwr+H36eE0rPbeYkRRi9YxceYPhuN60UwWorxyKHhqoAJl2OFKa4BVtaA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/mdevils" + }, + { + "type": "patreon", + "url": "https://patreon.com/mdevils" + } + ] + }, "node_modules/html-escaper": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/html-escaper/-/html-escaper-2.0.2.tgz", @@ -8308,7 +8362,6 @@ "version": "5.0.0", "resolved": "https://registry.npmjs.org/http-proxy-agent/-/http-proxy-agent-5.0.0.tgz", "integrity": "sha512-n2hY8YdoRE1i7r6M0w9DIw5GgZN0G25P8zLCRQ8rjXtTU3vsNFBI/vWK/UIeE6g5MUUz6avwAPXmL6Fy9D/90w==", - "optional": true, "dependencies": { "@tootallnate/once": "2", "agent-base": "6", @@ -8322,7 +8375,6 @@ "version": "6.0.2", "resolved": "https://registry.npmjs.org/agent-base/-/agent-base-6.0.2.tgz", "integrity": "sha512-RZNwNclF7+MS/8bDg70amg32dyeZGZxiDuQmZxKLAlQjr3jGyLx+4Kkk58UO7D2QdgFIQCovuSuZESne6RG6XQ==", - "optional": true, "dependencies": { "debug": "4" }, @@ -8346,7 +8398,6 @@ "version": "7.0.4", "resolved": "https://registry.npmjs.org/https-proxy-agent/-/https-proxy-agent-7.0.4.tgz", "integrity": "sha512-wlwpilI7YdjSkWaQ/7omYBMTliDcmCN8OLihO6I9B86g06lMyAoqgoDpV0XqoaPOKj+0DIdAvnsWfyAAhmimcg==", - "optional": true, "dependencies": { "agent-base": "^7.0.2", "debug": "4" @@ -8532,6 +8583,14 @@ "node": ">= 0.10" } }, + "node_modules/is": { + "version": "3.3.0", + "resolved": "https://registry.npmjs.org/is/-/is-3.3.0.tgz", + "integrity": "sha512-nW24QBoPcFGGHJGUwnfpI7Yc5CdqWNdsyHQszVE/z2pKHXzh7FZ5GWhJqSyaQ9wMkQnsTx+kAI8bHlCX4tKdbg==", + "engines": { + "node": "*" + } + }, "node_modules/is-arrayish": { "version": "0.2.1", "resolved": "https://registry.npmjs.org/is-arrayish/-/is-arrayish-0.2.1.tgz", @@ -8628,7 +8687,6 @@ "version": "2.0.1", "resolved": "https://registry.npmjs.org/is-stream/-/is-stream-2.0.1.tgz", "integrity": "sha512-hFoiJiTl63nn+kstHGBtewWSKnQLpyb155KHheA1l39uvtO9nWIop1p3udqPcUd/xbF1VLMO4n7OI6p7RbngDg==", - "devOptional": true, "engines": { "node": ">=8" }, @@ -9548,7 +9606,6 @@ "version": "1.0.0", "resolved": "https://registry.npmjs.org/json-bigint/-/json-bigint-1.0.0.tgz", "integrity": "sha512-SiPv/8VpZuWbvLSMtTDU8hEfrZWg/mH/nV/b4o0CYbSxu1UIQPLdwKOCIyLQX+VIPO5vrLX3i8qtqFyhdPSUSQ==", - "optional": true, "dependencies": { "bignumber.js": "^9.0.0" } @@ -11184,7 +11241,6 @@ "version": "7.0.2", "resolved": "https://registry.npmjs.org/retry-request/-/retry-request-7.0.2.tgz", "integrity": "sha512-dUOvLMJ0/JJYEn8NrpOaGNE7X3vpI5XlZS/u0ANjqtcZVKnIxP7IgCFwrKTxENw29emmwug53awKtaMm4i9g5w==", - "optional": true, "dependencies": { "@types/request": "^2.48.8", "extend": "^3.0.2", @@ -11741,7 +11797,6 @@ "version": "1.0.5", "resolved": "https://registry.npmjs.org/stream-events/-/stream-events-1.0.5.tgz", "integrity": "sha512-E1GUzBSgvct8Jsb3v2X15pjzN1tYebtbLaMg+eBOUOAxgbLoSbT2NS91ckc5lJD1KfLjId+jXJRgo0qnV5Nerg==", - "optional": true, "dependencies": { "stubs": "^3.0.0" } @@ -11749,8 +11804,7 @@ "node_modules/stream-shift": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz", - "integrity": "sha512-76ORR0DO1o1hlKwTbi/DM3EXWGf3ZJYO8cXX5RJwnul2DEg2oyoZyjLNoQM8WsvZiFKCRfC1O0J7iCvie3RZmQ==", - "optional": true + "integrity": "sha512-76ORR0DO1o1hlKwTbi/DM3EXWGf3ZJYO8cXX5RJwnul2DEg2oyoZyjLNoQM8WsvZiFKCRfC1O0J7iCvie3RZmQ==" }, "node_modules/streamsearch": { "version": "1.1.0", @@ -11869,8 +11923,7 @@ "node_modules/stubs": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/stubs/-/stubs-3.0.0.tgz", - "integrity": "sha512-PdHt7hHUJKxvTCgbKX9C1V/ftOcjJQgz8BZwNfV5c4B6dcGqlpelTbJ999jBGZ2jYiPAwcX5dP6oBwVlBlUbxw==", - "optional": true + "integrity": "sha512-PdHt7hHUJKxvTCgbKX9C1V/ftOcjJQgz8BZwNfV5c4B6dcGqlpelTbJ999jBGZ2jYiPAwcX5dP6oBwVlBlUbxw==" }, "node_modules/subscriptions-transport-ws": { "version": "0.11.0", @@ -12067,7 +12120,6 @@ "version": "9.0.0", "resolved": "https://registry.npmjs.org/teeny-request/-/teeny-request-9.0.0.tgz", "integrity": "sha512-resvxdc6Mgb7YEThw6G6bExlXKkv6+YbuzGg9xuXxSgxJF7Ozs+o8Y9+2R3sArdWdW8nOokoQb1yrpFB0pQK2g==", - "optional": true, "dependencies": { "http-proxy-agent": "^5.0.0", "https-proxy-agent": "^5.0.0", @@ -12083,7 +12135,6 @@ "version": "6.0.2", "resolved": "https://registry.npmjs.org/agent-base/-/agent-base-6.0.2.tgz", "integrity": "sha512-RZNwNclF7+MS/8bDg70amg32dyeZGZxiDuQmZxKLAlQjr3jGyLx+4Kkk58UO7D2QdgFIQCovuSuZESne6RG6XQ==", - "optional": true, "dependencies": { "debug": "4" }, @@ -12095,7 +12146,6 @@ "version": "5.0.1", "resolved": "https://registry.npmjs.org/https-proxy-agent/-/https-proxy-agent-5.0.1.tgz", "integrity": "sha512-dFcAjpTQFgoLMzC2VwU+C/CbS7uRL0lWmxDITmqm7C+7F0Odmj6s9l6alZc6AELXhrnggM2CeWSXHGOdX2YtwA==", - "optional": true, "dependencies": { "agent-base": "6", "debug": "4" diff --git a/api/package.json b/api/package.json index 1358e6cd..71f3b16f 100644 --- a/api/package.json +++ b/api/package.json @@ -28,6 +28,7 @@ "@aptos-labs/ts-sdk": "^1.16.0", "@aws-sdk/client-s3": "^3.583.0", "@clickhouse/client": "^1.0.2", + "@google-cloud/bigquery": "^7.7.0", "@nestjs/apollo": "^12.1.0", "@nestjs/bullmq": "^10.1.1", "@nestjs/common": "^10.3.8", diff --git a/api/src/config/config.interface.ts b/api/src/config/config.interface.ts index d8425140..7d6cd8b8 100644 --- a/api/src/config/config.interface.ts +++ b/api/src/config/config.interface.ts @@ -5,6 +5,7 @@ export interface Config { info: InfoConfig; ol: OlConfig; + numia: NumiaConfig; s3: S3Config; clickhouse: ClickhouseConfig; apn?: ApnConfig; @@ -52,4 +53,8 @@ export interface FirebaseConfig { export interface NatsConfig { servers: string; -} \ No newline at end of file +} + +export interface NumiaConfig { + apiKey?: string; +} diff --git a/api/src/config/config.ts b/api/src/config/config.ts index 177bff4b..4c8db5b6 100644 --- a/api/src/config/config.ts +++ b/api/src/config/config.ts @@ -18,6 +18,10 @@ export default (): Config => { dataApiHost: ENV.DATA_API_HOST!, }, + numia: { + apiKey: ENV.NUMIA_API_KEY, + }, + s3: { region: ENV.S3_REGION!, endpoint: ENV.S3_ENDPOINT!, diff --git a/api/src/clickhouse/bridged-networks/osmosis/osmosis.controller.ts b/api/src/osmosis/OsmosisController.ts similarity index 51% rename from api/src/clickhouse/bridged-networks/osmosis/osmosis.controller.ts rename to api/src/osmosis/OsmosisController.ts index c87834ac..73fe7c88 100644 --- a/api/src/clickhouse/bridged-networks/osmosis/osmosis.controller.ts +++ b/api/src/osmosis/OsmosisController.ts @@ -1,25 +1,23 @@ -import { Controller, Get, Post } from '@nestjs/common'; -import { OsmosisHistoricalProcessor } from './OsmosisHistoricalProcessor.js'; -import { OsmosisLiveProcessor } from './OsmosisLiveProcessor.js'; +import { Controller, Post } from "@nestjs/common"; +import { OsmosisHistoricalProcessor } from "./OsmosisHistoricalProcessor.js"; +import { OsmosisLiveProcessor } from "./OsmosisLiveProcessor.js"; -@Controller('osmosis') +@Controller("osmosis") export class OsmosisController { constructor( private readonly osmosisHistoricalProcessor: OsmosisHistoricalProcessor, private readonly osmosisLiveProcessor: OsmosisLiveProcessor, ) {} - @Get('fetch-historical') - @Post('fetch-historical') + @Post("fetch-historical") async triggerHistoricalFetch(): Promise { await this.osmosisHistoricalProcessor.triggerFetchHistoricalData(); - return 'Historical data fetching triggered'; + return "Historical data fetching triggered"; } - @Get('fetch-live') - @Post('fetch-live') + @Post("fetch-live") async triggerLiveFetch(): Promise { await this.osmosisLiveProcessor.triggerFetchLiveData(); - return 'Live data fetching triggered'; + return "Live data fetching triggered"; } } diff --git a/api/src/clickhouse/bridged-networks/osmosis/OsmosisHistoricalProcessor.ts b/api/src/osmosis/OsmosisHistoricalProcessor.ts similarity index 73% rename from api/src/clickhouse/bridged-networks/osmosis/OsmosisHistoricalProcessor.ts rename to api/src/osmosis/OsmosisHistoricalProcessor.ts index 0cfa4770..b0a62393 100644 --- a/api/src/clickhouse/bridged-networks/osmosis/OsmosisHistoricalProcessor.ts +++ b/api/src/osmosis/OsmosisHistoricalProcessor.ts @@ -1,8 +1,11 @@ -import { InjectQueue, Processor, WorkerHost } from "@nestjs/bullmq"; import { Job, Queue } from "bullmq"; +import { BigQuery } from "@google-cloud/bigquery"; import axios from "axios"; -import { ClickhouseService } from '../../clickhouse.service.js'; -import { BigQuery } from '@google-cloud/bigquery'; +import { InjectQueue, Processor, WorkerHost } from "@nestjs/bullmq"; +import { ConfigService } from "@nestjs/config"; + +import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; +import { NumiaConfig } from "../config/config.interface.js"; interface MintBurnEvent { timestamp: number; @@ -27,19 +30,28 @@ interface PoolSwapEvent { @Processor("osmosis-historical") export class OsmosisHistoricalProcessor extends WorkerHost { - private readonly endpoint = 'https://osmosis.numia.xyz/v2/txs'; + private readonly endpoint = "https://osmosis.numia.xyz/v2/txs"; + private readonly pageSize = 100; + + private readonly numiaApiKey?: string; + private readonly bigQueryClient: BigQuery; constructor( + config: ConfigService, + private readonly clickhouseService: ClickhouseService, @InjectQueue("osmosis-historical") private readonly osmosisQueue: Queue, ) { super(); + + this.numiaApiKey = config.get("numia")?.apiKey; + // Path to service account key file - const keyFile = './bigquery_service_account.json'; + const keyFile = "./bigquery_service_account.json"; // Create a BigQuery client this.bigQueryClient = new BigQuery({ @@ -74,64 +86,64 @@ export class OsmosisHistoricalProcessor extends WorkerHost { } private async fetchMintBurnEvents() { - let page = 1; - let hasMore = true; - - while (hasMore) { + for (let page = 1; ; ++page) { const url = `${this.endpoint}/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p?pageSize=${this.pageSize}&page=${page}`; - console.log(url) const response = await axios.get(url, { headers: { - Authorization: `Bearer ${process.env.NUMIA_API_KEY}`, + Authorization: `Bearer ${this.numiaApiKey}`, }, }); const events = response.data; if (events.length === 0) { - hasMore = false; - continue; + break; } for (const event of events) { const timestamp = new Date(event.blockTimestamp).getTime(); - const type = event.messageTypes[0] === '/osmosis.tokenfactory.v1beta1.MsgMint' ? 'mint' : 'burn'; + const type = + event.messageTypes[0] === "/osmosis.tokenfactory.v1beta1.MsgMint" + ? "mint" + : "burn"; const amount = event.messages[0].amount.amount; - const address = type === 'mint' ? event.messages[0].mint_to_address : event.messages[0].burn_from_address; + const address = + type === "mint" + ? event.messages[0].mint_to_address + : event.messages[0].burn_from_address; await this.insertMintBurnEvent({ timestamp, type, amount, address }); - if (type === 'mint') { + if (type === "mint") { // Not a proper DFS, just one hop away from minter - await this.fetchTransfersForAddress(event.messages[0].mint_to_address); + await this.fetchTransfersForAddress( + event.messages[0].mint_to_address, + ); } } - - page++; } } private async fetchTransfersForAddress(address: string) { - let page = 1; - let hasMore = true; - - while (hasMore) { + for (let page = 1; ; ++page) { const url = `${this.endpoint}/${address}?pageSize=${this.pageSize}&page=${page}`; const response = await axios.get(url, { headers: { - Authorization: `Bearer ${process.env.NUMIA_API_KEY}`, + Authorization: `Bearer ${this.numiaApiKey}`, }, }); const events = response.data; if (events.length === 0) { - hasMore = false; - continue; + break; } for (const event of events) { - if (event.messageTypes.includes('/cosmos.bank.v1beta1.MsgSend')) { + if (event.messageTypes.includes("/cosmos.bank.v1beta1.MsgSend")) { for (const message of event.messages) { - if (message.amount[0].denom === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA') { + if ( + message.amount[0].denom === + "factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA" + ) { const transferEvent = { timestamp: new Date(event.blockTimestamp).getTime(), from: message.from_address, @@ -143,8 +155,6 @@ export class OsmosisHistoricalProcessor extends WorkerHost { } } } - - page++; } } @@ -166,12 +176,16 @@ export class OsmosisHistoricalProcessor extends WorkerHost { const [rows] = await this.bigQueryClient.query(query); for (const row of rows) { - const side = row.denom_in === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA' ? 'buy' : 'sell'; + const side = + row.denom_in === + "factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA" + ? "buy" + : "sell"; const swapEvent = { timestamp: new Date(row.ingestion_timestamp.value).getTime(), sender: row.sender, side, - amount: side === 'buy' ? row.parsed_amount_in : row.parsed_amount_out, + amount: side === "buy" ? row.parsed_amount_in : row.parsed_amount_out, }; await this.insertSwapEvent(swapEvent); } @@ -180,39 +194,39 @@ export class OsmosisHistoricalProcessor extends WorkerHost { private async insertMintBurnEvent(event: MintBurnEvent) { // console.log('Mint/Burn Event:', event); await this.clickhouseService.client.insert({ - table: 'mint_burn_events', + table: "mint_burn_events", values: { timestamp: event.timestamp, type: event.type, amount: event.amount, address: event.address, - } + }, }); } private async insertTransferEvent(event: TransferEvent) { // console.log('Transfer Event:', event); await this.clickhouseService.client.insert({ - table: 'transfer_events', + table: "transfer_events", values: { timestamp: event.timestamp, from: event.from, to: event.to, amount: event.amount, - } + }, }); } private async insertSwapEvent(event: PoolSwapEvent) { // console.log('Pool Swap Event:', event); await this.clickhouseService.client.insert({ - table: 'pool_swap_events', + table: "pool_swap_events", values: { timestamp: event.timestamp, sender: event.sender, side: event.side, amount: event.amount, - } + }, }); } } diff --git a/api/src/clickhouse/bridged-networks/osmosis/OsmosisLiveProcessor.ts b/api/src/osmosis/OsmosisLiveProcessor.ts similarity index 98% rename from api/src/clickhouse/bridged-networks/osmosis/OsmosisLiveProcessor.ts rename to api/src/osmosis/OsmosisLiveProcessor.ts index a97dcecf..6a2e15d4 100644 --- a/api/src/clickhouse/bridged-networks/osmosis/OsmosisLiveProcessor.ts +++ b/api/src/osmosis/OsmosisLiveProcessor.ts @@ -3,7 +3,8 @@ import { OnModuleInit } from "@nestjs/common"; import { Job, Queue } from "bullmq"; import axios from "axios"; import Bluebird from "bluebird"; -import { ClickhouseService } from '../../clickhouse.service.js'; + +import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; interface PoolSwapEvent { timestamp: number; diff --git a/api/src/clickhouse/bridged-networks/osmosis/OsmosisModule.ts b/api/src/osmosis/OsmosisModule.ts similarity index 80% rename from api/src/clickhouse/bridged-networks/osmosis/OsmosisModule.ts rename to api/src/osmosis/OsmosisModule.ts index 817881fe..2780ab9e 100644 --- a/api/src/clickhouse/bridged-networks/osmosis/OsmosisModule.ts +++ b/api/src/osmosis/OsmosisModule.ts @@ -2,11 +2,11 @@ import process from "node:process"; import { BullModule } from "@nestjs/bullmq"; import { Module, Type } from "@nestjs/common"; -import { redisClient } from "../../../redis/redis.service.js"; -import { ClickhouseModule } from "../../clickhouse.module.js"; import { OsmosisLiveProcessor } from "./OsmosisLiveProcessor.js"; import { OsmosisHistoricalProcessor } from "./OsmosisHistoricalProcessor.js"; -import { OsmosisController } from './osmosis.controller.js'; +import { OsmosisController } from "./OsmosisController.js"; +import { ClickhouseModule } from "../clickhouse/clickhouse.module.js"; +import { redisClient } from "../redis/redis.service.js"; const roles = process.env.ROLES!.split(","); @@ -34,4 +34,4 @@ if (roles.includes("osmosis-historical-processor")) { controllers: [OsmosisController], providers: [OsmosisLiveProcessor, OsmosisHistoricalProcessor, ...workers], }) -export class OsmosisModule {} \ No newline at end of file +export class OsmosisModule {} From c203c99c7b98f523ef3bb948bb4f4a174e9f4240 Mon Sep 17 00:00:00 2001 From: hemulin Date: Thu, 30 May 2024 23:20:06 +0100 Subject: [PATCH 04/10] Much refactor of the live data fetcher --- api/src/osmosis/OsmosisLiveProcessor.ts | 211 ++++++++++++++++++------ api/src/osmosis/OsmosisModule.ts | 4 +- 2 files changed, 160 insertions(+), 55 deletions(-) diff --git a/api/src/osmosis/OsmosisLiveProcessor.ts b/api/src/osmosis/OsmosisLiveProcessor.ts index 6a2e15d4..1d4c4310 100644 --- a/api/src/osmosis/OsmosisLiveProcessor.ts +++ b/api/src/osmosis/OsmosisLiveProcessor.ts @@ -8,23 +8,39 @@ import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; interface PoolSwapEvent { timestamp: number; - pool_id: string; - token_out_denom: string; - token_in_denom: string; + sender: string; side: string; amount: string; + txhash: string; } -interface MintBurnTransferEvent { +interface MintEvent { timestamp: number; - type: string; amount: string; - address: string; + mint_to_address: string; + txhash: string; +} + +interface BurnEvent { + timestamp: number; + amount: string; + burn_from_address: string; + txhash: string; +} + +interface TransferEvent { + timestamp: number; + from_address: string; + to_address: string; + amount: string; + txhash: string; } @Processor("osmosis-live") export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { private readonly endpoint = 'https://lcd.osmosis.zone/cosmos/tx/v1beta1/txs'; + private readonly POOL_ID = '1721'; + private readonly TOKEN_DENOM = 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA'; constructor( private readonly clickhouseService: ClickhouseService, @@ -41,8 +57,8 @@ export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { try { await Promise.race([ this.fetchAndStoreLiveData(), - // 1m timeout to avoid blocking the queue - Bluebird.delay(60 * 1_000), + // 30m timeout to avoid blocking the queue + Bluebird.delay(60 * 30 * 1_000), ]); } catch (error) { // fail silently to avoid accumulating failed repeating jobs @@ -60,97 +76,186 @@ export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { every: 60 * 30 * 1_000, // 1 minute }, }); - // await this.fetchAndStoreLiveData(); + await this.fetchAndStoreLiveData(); } public async triggerFetchLiveData() { // await this.osmosisQueue.add("fetchLiveData", undefined); - await this.fetchAndStoreLiveData(); + // await this.fetchAndStoreLiveData(); } private async fetchAndStoreLiveData() { const events = [ - // '/osmosis.poolmanager.v1beta1.SwapAmountInRoute', - // '/osmosis.poolmanager.v1beta1.SwapAmountOutRoute', - '/osmosis.gamm.v1beta1.MsgSwapExactAmountIn', - '/osmosis.gamm.v1beta1.MsgSwapExactAmountOut', + '/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn', + '/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut', '/osmosis.tokenfactory.v1beta1.MsgMint', + '/osmosis.tokenfactory.v1beta1.MsgBurn', '/cosmos.bank.v1beta1.MsgSend', ]; for (const event of events) { - const url = `${this.endpoint}?events=${event}`; - console.log(url) - const response = await axios.get(url); - const transactions = response.data.tx_responses; - - for (const tx of transactions) { - for (const log of tx.logs) { - for (const msg of log.events) { - if (msg.type === '/osmosis.gamm.v1beta1.MsgSwapExactAmountIn' || msg.type === '/osmosis.gamm.v1beta1.MsgSwapExactAmountOut') { - await this.handlePoolSwapEvent(msg); - } else if (msg.type === '/osmosis.tokenfactory.v1beta1.MsgMint' || msg.type === '/cosmos.bank.v1beta1.MsgSend') { - await this.handleMintBurnTransferEvent(msg); + for (let page = 1; page <= 5; page++) { + const url = `${this.endpoint}?events=message.action='${event}'&limit=100&page=${page}`; + console.log(url); + + try { + const response = await axios.get(url); + if (response.status !== 200) { + console.error(`Error fetching data from URL: ${url}, Status Code: ${response.status}`); + continue; + } + + const transactions = response.data.tx_responses; + for (const tx of transactions) { + const timestamp = new Date(tx.timestamp).getTime(); + const txhash = tx.txhash; + for (const msg of tx.tx.body.messages) { + if (msg['@type'] === '/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn' || msg['@type'] === '/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut') { + await this.handlePoolSwapEvent(msg, timestamp, txhash); + } else if (msg['@type'] === '/osmosis.tokenfactory.v1beta1.MsgMint') { + // await this.handleMintEvent(msg, timestamp, txhash); + } else if (msg['@type'] === '/osmosis.tokenfactory.v1beta1.MsgBurn') { + await this.handleBurnEvent(msg, timestamp, txhash); + } else if (msg['@type'] === '/cosmos.bank.v1beta1.MsgSend') { + await this.handleTransferEvent(msg, timestamp, txhash); + } } } + } catch (error) { + console.error(`Error fetching data from URL: ${url}`, error.message || error); + continue; } + // Delay the next request by 3 seconds + await new Promise(resolve => setTimeout(resolve, 3000)); } } } - private async handlePoolSwapEvent(event) { - const { pool_id, token_out_denom, token_in_denom } = event.attributes.reduce((acc, attr) => { - acc[attr.key] = attr.value; - return acc; - }, {}); + private async handlePoolSwapEvent(msg, timestamp: number, txhash: string) { + const { routes, token_in, token_out } = msg; - const timestamp = new Date(event.timestamp).getTime(); - const side = token_in_denom === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA' ? 'buy' : 'sell'; - const amount = side === 'buy' ? event.amount_in : event.amount_out; + // Check if pool_id 1721 exists in the routes + const relevantRoute = routes.find(route => route.pool_id === '1721'); + if (!relevantRoute) return; + + if (msg['@type'] === '/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut') { + if (token_out.denom === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA') { + const side = 'buy'; + const amount = token_out.amount; + await this.insertSwapEvent({ timestamp, sender: msg.sender, side, amount, txhash }); + } else if (relevantRoute.token_in_denom === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA') { + const side = 'sell'; + const amount = msg.token_in_max_amount; + await this.insertSwapEvent({ timestamp, sender: msg.sender, side, amount, txhash }); + } + } else if (msg['@type'] === '/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn') { + if (token_in.denom === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA') { + const side = 'sell'; + const amount = token_in.amount; + await this.insertSwapEvent({ timestamp, sender: msg.sender, side, amount, txhash }); + } else if (relevantRoute.token_out_denom === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA') { + const side = 'buy'; + const amount = msg.token_out_min_amount; + await this.insertSwapEvent({ timestamp, sender: msg.sender, side, amount, txhash }); + } + } + } - await this.insertSwapEvent({ timestamp, pool_id, token_out_denom, token_in_denom, side, amount }); + private async handleMintEvent(msg, timestamp: number, txhash: string) { + if (msg.amount.denom !== 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA') return; + const mintEvent: MintEvent = { + timestamp, + amount: msg.amount.amount, + mint_to_address: msg.mintToAddress, + txhash + }; + await this.insertMintEvent(mintEvent); } - private async handleMintBurnTransferEvent(event) { - const type = event.type.includes('Mint') ? 'mint' : 'burn'; - const { amount, address } = event.attributes.reduce((acc, attr) => { - if (attr.key === 'amount') acc.amount = attr.value; - if (attr.key === 'address') acc.address = attr.value; - return acc; - }, {}); + private async handleBurnEvent(msg, timestamp: number, txhash: string) { + if (msg.amount.denom !== 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA') return; + const burnEvent: BurnEvent = { + timestamp, + amount: msg.amount.amount, + burn_from_address: msg.burnFromAddress, + txhash + }; + await this.insertBurnEvent(burnEvent); + } - const timestamp = new Date(event.timestamp).getTime(); + private async handleTransferEvent(msg, timestamp: number, txhash: string) { + const transferAmounts = msg.amount.filter(amount => amount.denom === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA'); + if (transferAmounts.length === 0) return; - await this.insertMintBurnTransferEvent({ timestamp, type, amount, address }); + for (const amount of transferAmounts) { + const transferEvent: TransferEvent = { + timestamp, + from_address: msg.from_address, + to_address: msg.to_address, + amount: amount.amount, + txhash, + }; + await this.insertTransferEvent(transferEvent); + } } private async insertSwapEvent(event: PoolSwapEvent) { - console.log('Pool Swap Event:', event); + console.log('Pool Swap Event:', JSON.stringify(event, null, 2)); /* await this.clickhouseService.client.insert({ table: 'pool_swap_events', values: { timestamp: event.timestamp, - pool_id: event.pool_id, - token_out_denom: event.token_out_denom, - token_in_denom: event.token_in_denom, + sender: event.sender, side: event.side, amount: event.amount, + txhash: event.txhash, + } + }); + */ + } + + private async insertMintEvent(event: MintEvent) { + console.log('Mint Event:', JSON.stringify(event, null, 2)); + /* + await this.clickhouseService.client.insert({ + table: 'mint_events', + values: { + timestamp: event.timestamp, + amount: event.amount, + mint_to_address: event.mint_to_address, + txhash: event.txhash, + } + }); + */ + } + + private async insertBurnEvent(event: BurnEvent) { + console.log('Burn Event:', JSON.stringify(event, null, 2)); + /* + await this.clickhouseService.client.insert({ + table: 'burn_events', + values: { + timestamp: event.timestamp, + amount: event.amount, + burn_from_address: event.burn_from_address, + txhash: event.txhash, } }); */ } - private async insertMintBurnTransferEvent(event: MintBurnTransferEvent) { - console.log('Mint/Burn Transfer Event:', event); + private async insertTransferEvent(event: TransferEvent) { + console.log('Transfer Event:', JSON.stringify(event, null, 2)); /* await this.clickhouseService.client.insert({ - table: 'mint_burn_transfer_events', + table: 'transfer_events', values: { timestamp: event.timestamp, - type: event.type, + from_address: event.from_address, + to_address: event.to_address, amount: event.amount, - address: event.address, + txhash: event.txhash, } }); */ diff --git a/api/src/osmosis/OsmosisModule.ts b/api/src/osmosis/OsmosisModule.ts index 2780ab9e..1891f653 100644 --- a/api/src/osmosis/OsmosisModule.ts +++ b/api/src/osmosis/OsmosisModule.ts @@ -12,10 +12,10 @@ const roles = process.env.ROLES!.split(","); const workers: Type[] = []; if (roles.includes("osmosis-live-processor")) { - workers.push(OsmosisLiveProcessor); + // workers.push(OsmosisLiveProcessor); } if (roles.includes("osmosis-historical-processor")) { - workers.push(OsmosisHistoricalProcessor); + // workers.push(OsmosisHistoricalProcessor); } @Module({ From 7c2670719815306f07e8f668602267be7bdcac11 Mon Sep 17 00:00:00 2001 From: hemulin Date: Thu, 30 May 2024 23:23:11 +0100 Subject: [PATCH 05/10] Uncommented forgotten comment --- api/src/osmosis/OsmosisLiveProcessor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/osmosis/OsmosisLiveProcessor.ts b/api/src/osmosis/OsmosisLiveProcessor.ts index 1d4c4310..36643446 100644 --- a/api/src/osmosis/OsmosisLiveProcessor.ts +++ b/api/src/osmosis/OsmosisLiveProcessor.ts @@ -113,7 +113,7 @@ export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { if (msg['@type'] === '/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn' || msg['@type'] === '/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut') { await this.handlePoolSwapEvent(msg, timestamp, txhash); } else if (msg['@type'] === '/osmosis.tokenfactory.v1beta1.MsgMint') { - // await this.handleMintEvent(msg, timestamp, txhash); + await this.handleMintEvent(msg, timestamp, txhash); } else if (msg['@type'] === '/osmosis.tokenfactory.v1beta1.MsgBurn') { await this.handleBurnEvent(msg, timestamp, txhash); } else if (msg['@type'] === '/cosmos.bank.v1beta1.MsgSend') { From e68c5b7510e885238d7d346528976ca49de5ae52 Mon Sep 17 00:00:00 2001 From: hemulin Date: Thu, 30 May 2024 23:34:23 +0100 Subject: [PATCH 06/10] Matched models between live and history data --- api/src/osmosis/OsmosisHistoricalProcessor.ts | 105 ++++++++++++------ 1 file changed, 69 insertions(+), 36 deletions(-) diff --git a/api/src/osmosis/OsmosisHistoricalProcessor.ts b/api/src/osmosis/OsmosisHistoricalProcessor.ts index b0a62393..6198ca2e 100644 --- a/api/src/osmosis/OsmosisHistoricalProcessor.ts +++ b/api/src/osmosis/OsmosisHistoricalProcessor.ts @@ -7,25 +7,34 @@ import { ConfigService } from "@nestjs/config"; import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { NumiaConfig } from "../config/config.interface.js"; -interface MintBurnEvent { +interface PoolSwapEvent { timestamp: number; - type: string; + sender: string; + side: string; amount: string; - address: string; + txhash: string; } -interface TransferEvent { +interface MintEvent { timestamp: number; - from: string; - to: string; amount: string; + mint_to_address: string; + txhash: string; } -interface PoolSwapEvent { +interface BurnEvent { timestamp: number; - sender: string; - side: string; amount: string; + burn_from_address: string; + txhash: string; +} + +interface TransferEvent { + timestamp: number; + from_address: string; + to_address: string; + amount: string; + txhash: string; } @Processor("osmosis-historical") @@ -38,6 +47,8 @@ export class OsmosisHistoricalProcessor extends WorkerHost { private readonly bigQueryClient: BigQuery; + private readonly TOKEN_DENOM = 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA'; + constructor( config: ConfigService, @@ -76,8 +87,6 @@ export class OsmosisHistoricalProcessor extends WorkerHost { public async triggerFetchHistoricalData() { await this.osmosisQueue.add("fetchHistoricalData", undefined); - // await this.fetchMintBurnEvents(); - // await this.fetchPoolSwapEvents(); } private async fetchHistoricalData() { @@ -110,14 +119,26 @@ export class OsmosisHistoricalProcessor extends WorkerHost { type === "mint" ? event.messages[0].mint_to_address : event.messages[0].burn_from_address; - - await this.insertMintBurnEvent({ timestamp, type, amount, address }); + const txhash = event.hash; if (type === "mint") { + const mintEvent: MintEvent = { + timestamp, + amount, + mint_to_address: address, + txhash + }; + await this.insertMintEvent(mintEvent); // Not a proper DFS, just one hop away from minter - await this.fetchTransfersForAddress( - event.messages[0].mint_to_address, - ); + await this.fetchTransfersForAddress(event.messages[0].mint_to_address); + } else { + const burnEvent: BurnEvent = { + timestamp, + amount, + burn_from_address: address, + txhash + }; + await this.insertBurnEvent(burnEvent); } } } @@ -141,14 +162,14 @@ export class OsmosisHistoricalProcessor extends WorkerHost { if (event.messageTypes.includes("/cosmos.bank.v1beta1.MsgSend")) { for (const message of event.messages) { if ( - message.amount[0].denom === - "factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA" + message.amount[0].denom === this.TOKEN_DENOM ) { - const transferEvent = { + const transferEvent: TransferEvent = { timestamp: new Date(event.blockTimestamp).getTime(), - from: message.from_address, - to: message.to_address, + from_address: message.from_address, + to_address: message.to_address, amount: message.amount.amount, + txhash: event.hash, }; await this.insertTransferEvent(transferEvent); } @@ -176,49 +197,60 @@ export class OsmosisHistoricalProcessor extends WorkerHost { const [rows] = await this.bigQueryClient.query(query); for (const row of rows) { - const side = - row.denom_in === - "factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA" - ? "buy" - : "sell"; - const swapEvent = { + const side = row.denom_in === this.TOKEN_DENOM ? "buy" : "sell"; + const swapEvent: PoolSwapEvent = { timestamp: new Date(row.ingestion_timestamp.value).getTime(), sender: row.sender, side, amount: side === "buy" ? row.parsed_amount_in : row.parsed_amount_out, + txhash: row.tx_id, }; await this.insertSwapEvent(swapEvent); } } - private async insertMintBurnEvent(event: MintBurnEvent) { - // console.log('Mint/Burn Event:', event); + private async insertMintEvent(event: MintEvent) { + console.log('Mint Event:', JSON.stringify(event, null, 2)); + await this.clickhouseService.client.insert({ + table: "mint_events", + values: { + timestamp: event.timestamp, + amount: event.amount, + mint_to_address: event.mint_to_address, + txhash: event.txhash, + }, + }); + } + + private async insertBurnEvent(event: BurnEvent) { + console.log('Burn Event:', JSON.stringify(event, null, 2)); await this.clickhouseService.client.insert({ - table: "mint_burn_events", + table: "burn_events", values: { timestamp: event.timestamp, - type: event.type, amount: event.amount, - address: event.address, + burn_from_address: event.burn_from_address, + txhash: event.txhash, }, }); } private async insertTransferEvent(event: TransferEvent) { - // console.log('Transfer Event:', event); + console.log('Transfer Event:', JSON.stringify(event, null, 2)); await this.clickhouseService.client.insert({ table: "transfer_events", values: { timestamp: event.timestamp, - from: event.from, - to: event.to, + from_address: event.from_address, + to_address: event.to_address, amount: event.amount, + txhash: event.txhash, }, }); } private async insertSwapEvent(event: PoolSwapEvent) { - // console.log('Pool Swap Event:', event); + console.log('Pool Swap Event:', JSON.stringify(event, null, 2)); await this.clickhouseService.client.insert({ table: "pool_swap_events", values: { @@ -226,6 +258,7 @@ export class OsmosisHistoricalProcessor extends WorkerHost { sender: event.sender, side: event.side, amount: event.amount, + txhash: event.txhash, }, }); } From 4e5acfa80bd3f4558bdc472d185ee74aad59b0d2 Mon Sep 17 00:00:00 2001 From: minaxolone Date: Fri, 31 May 2024 19:19:04 -0400 Subject: [PATCH 07/10] add migration files --- api/olfyiv5-tables.sql | 87 +++++++++++++++++++++++++++++++++++++++++- api/olfyiv6-tables.sql | 2 + api/tables.sql | 2 + 3 files changed, 89 insertions(+), 2 deletions(-) create mode 100644 api/olfyiv6-tables.sql diff --git a/api/olfyiv5-tables.sql b/api/olfyiv5-tables.sql index 883f786a..34bd3c53 100644 --- a/api/olfyiv5-tables.sql +++ b/api/olfyiv5-tables.sql @@ -1,4 +1,6 @@ -CREATE TABLE "state" ( +CREATE DATABASE IF NOT EXISTS "olfyi_v5" ON CLUSTER "olfyi"; + +CREATE TABLE "state" ON CLUSTER "olfyi" ( "version" UInt64, "address" UInt128, "module_address" UInt128, @@ -6,5 +8,86 @@ CREATE TABLE "state" ( "module_name" String, "payload" String ) -ENGINE = MergeTree +ENGINE = ReplicatedMergeTree +ORDER BY "version"; + +CREATE TABLE "burn" ON CLUSTER "olfyi" ( + "version" UInt64, + "timestamp_usecs" UInt64, + "amount" UInt64, + "currency" String, + "preburn_address" UInt128 +) +ENGINE = ReplicatedMergeTree +ORDER BY "version"; + +CREATE TABLE "create_account" ON CLUSTER "olfyi" ( + "version" UInt64, + "timestamp_usecs" UInt64, + "role_id" UInt64, + "created_address" UInt128 +) +ENGINE = ReplicatedMergeTree +ORDER BY "version"; + +CREATE TABLE "mint" ON CLUSTER "olfyi" ( + "version" UInt64, + "timestamp_usecs" UInt64, + "amount" UInt64, + "currency" String +) +ENGINE = ReplicatedMergeTree +ORDER BY "version"; + +CREATE TABLE "new_block" ON CLUSTER "olfyi" ( + "version" UInt64, + "timestamp_usecs" UInt64, + "round" UInt64, + "proposer" UInt128, + "proposed_time" UInt64, + "gas_used" UInt64 +) +ENGINE = ReplicatedMergeTree +ORDER BY "version"; + +CREATE TABLE "received_payment" ON CLUSTER "olfyi" ( + "version" UInt64, + "timestamp_usecs" UInt64, + "amount" UInt64, + "currency" String, + "sender" UInt128, + "receiver" UInt128, + "metadata" String +) +ENGINE = ReplicatedMergeTree +ORDER BY "version"; + +CREATE TABLE "sent_payment" ON CLUSTER "olfyi" ( + "version" UInt64, + "timestamp_usecs" UInt64, + "amount" UInt64, + "currency" String, + "sender" UInt128, + "receiver" UInt128, + "metadata" String +) +ENGINE = ReplicatedMergeTree +ORDER BY "version"; + +CREATE TABLE "user_transaction" ON CLUSTER "olfyi" ( + "version" UInt64, + "timestamp_usecs" UInt64, + "sender" UInt128, + "sequence_number" UInt64, + "max_gas_amount" UInt64, + "gas_unit_price" UInt64, + "gas_currency" String, + "module_address" UInt128, + "module_name" String, + "function_name" String, + "arguments" Array(String), + "vm_status" String, + "gas_used" UInt64 +) +ENGINE = ReplicatedMergeTree ORDER BY "version"; \ No newline at end of file diff --git a/api/olfyiv6-tables.sql b/api/olfyiv6-tables.sql new file mode 100644 index 00000000..bf291c5b --- /dev/null +++ b/api/olfyiv6-tables.sql @@ -0,0 +1,2 @@ +CREATE DATABASE IF NOT EXISTS olfyi_v6 ON CLUSTER olfyi; + diff --git a/api/tables.sql b/api/tables.sql index 288339c2..ed6c5e99 100644 --- a/api/tables.sql +++ b/api/tables.sql @@ -1,3 +1,5 @@ +CREATE DATABASE IF NOT EXISTS "olfyi" ON CLUSTER "olfyi"; + CREATE TABLE "user_transaction" ON CLUSTER "olfyi" ( "version" UInt64, "hash" UInt256, From fdf6e47cfced6022f162948b5ee7089cb9de5a76 Mon Sep 17 00:00:00 2001 From: minaxolone Date: Mon, 3 Jun 2024 18:21:51 -0400 Subject: [PATCH 08/10] cleanup --- api/src/app/app.module.ts | 2 +- api/src/osmosis/OsmosisHistoricalProcessor.ts | 2 +- api/src/osmosis/OsmosisLiveProcessor.ts | 341 +++++++++++++----- api/src/osmosis/OsmosisModule.ts | 4 +- 4 files changed, 264 insertions(+), 85 deletions(-) diff --git a/api/src/app/app.module.ts b/api/src/app/app.module.ts index ddf5ec0a..cb46bd1e 100644 --- a/api/src/app/app.module.ts +++ b/api/src/app/app.module.ts @@ -16,9 +16,9 @@ import { WalletSubscriptionModule } from "../wallet-subscription/wallet-subscrip import { NatsModule } from "../nats/nats.module.js"; import { OlSwapModule } from "../ol-swap/OlSwapModule.js"; import { MultiSigModule } from "../multi-sig/multi-sig.module.js"; +import { OsmosisModule } from "../osmosis/OsmosisModule.js"; // Test purposes -import { OsmosisModule } from "../clickhouse/bridged-networks/osmosis/OsmosisModule.js"; @Module({ imports: [ diff --git a/api/src/osmosis/OsmosisHistoricalProcessor.ts b/api/src/osmosis/OsmosisHistoricalProcessor.ts index 6198ca2e..b05dfa8a 100644 --- a/api/src/osmosis/OsmosisHistoricalProcessor.ts +++ b/api/src/osmosis/OsmosisHistoricalProcessor.ts @@ -49,7 +49,7 @@ export class OsmosisHistoricalProcessor extends WorkerHost { private readonly TOKEN_DENOM = 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA'; - constructor( + public constructor( config: ConfigService, private readonly clickhouseService: ClickhouseService, diff --git a/api/src/osmosis/OsmosisLiveProcessor.ts b/api/src/osmosis/OsmosisLiveProcessor.ts index 36643446..20d7d15c 100644 --- a/api/src/osmosis/OsmosisLiveProcessor.ts +++ b/api/src/osmosis/OsmosisLiveProcessor.ts @@ -36,11 +36,84 @@ interface TransferEvent { txhash: string; } +interface MsgMint { + "@type": "/osmosis.tokenfactory.v1beta1.MsgMint"; + sender: string; + mintToAddress: string; + amount: { + denom: string; + amount: string; + }; +} + +interface MsgSwapExactAmountInRoute { + pool_id: string; + token_out_denom: string; +} + +interface MsgSwapExactAmountIn { + "@type": "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn"; + sender: "osmo1ng478yy6kay6dvjndn8cz2jty80xphxxfrexkk"; + routes: MsgSwapExactAmountInRoute[]; + token_in: { + denom: string; + amount: string; + }; + token_out_min_amount: string; +} + +interface MsgSwapExactAmountOutRoute { + pool_id: string; + token_in_denom: string; +} + +interface MsgSwapExactAmountOut { + "@type": "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut"; + sender: string; + routes: MsgSwapExactAmountOutRoute[]; + token_in_max_amount: string; + token_out: { + denom: string; + amount: string; + }; +} + +interface MsgBurn { + "@type": "/osmosis.tokenfactory.v1beta1.MsgBurn"; + sender: string; + amount: { + denom: string; + amount: string; + }; + burnFromAddress: string; +} + +interface MsgSend { + "@type": "/cosmos.bank.v1beta1.MsgSend"; + from_address: string; + to_address: string; + amount: { + denom: string; + amount: string; + }[]; +} + +type Msg = + | MsgMint + | MsgSwapExactAmountIn + | MsgSwapExactAmountOut + | MsgBurn + | MsgSend; + @Processor("osmosis-live") export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { - private readonly endpoint = 'https://lcd.osmosis.zone/cosmos/tx/v1beta1/txs'; - private readonly POOL_ID = '1721'; - private readonly TOKEN_DENOM = 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA'; + private static readonly endpoint = + "https://lcd.osmosis.zone/cosmos/tx/v1beta1/txs"; + + private static readonly POOL_ID = "1721"; + + private static readonly TOKEN_DENOM = + "factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA"; constructor( private readonly clickhouseService: ClickhouseService, @@ -71,37 +144,67 @@ export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { } public async onModuleInit() { - await this.osmosisQueue.add("fetchLiveData", undefined, { - repeat: { - every: 60 * 30 * 1_000, // 1 minute - }, - }); - await this.fetchAndStoreLiveData(); + // await this.osmosisQueue.add("fetchLiveData", undefined, { + // repeat: { + // every: 60 * 30 * 1_000, // 1 minute + // }, + // }); + // await this.fetchAndStoreLiveData(); } public async triggerFetchLiveData() { - // await this.osmosisQueue.add("fetchLiveData", undefined); + await this.osmosisQueue.add("fetchLiveData", undefined); // await this.fetchAndStoreLiveData(); } private async fetchAndStoreLiveData() { const events = [ - '/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn', - '/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut', - '/osmosis.tokenfactory.v1beta1.MsgMint', - '/osmosis.tokenfactory.v1beta1.MsgBurn', - '/cosmos.bank.v1beta1.MsgSend', + "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn", + "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut", + "/osmosis.tokenfactory.v1beta1.MsgMint", + "/osmosis.tokenfactory.v1beta1.MsgBurn", + "/cosmos.bank.v1beta1.MsgSend", ]; for (const event of events) { for (let page = 1; page <= 5; page++) { - const url = `${this.endpoint}?events=message.action='${event}'&limit=100&page=${page}`; + const url = `${OsmosisLiveProcessor.endpoint}?events=message.action='${event}'&limit=100&page=${page}`; console.log(url); try { - const response = await axios.get(url); + const response = await axios<{ + total: string; + pagination: null; + txs: { + body: { + messages: []; + memo: string; + timeout_height: string; + extension_options: []; + non_critical_extension_options: []; + }; + }[]; + tx_responses: { + height: string; + timestamp: string; + txhash: string; + + tx: { + "@type": "/cosmos.tx.v1beta1.Tx"; + body: { + messages: Msg[]; + }; + }; + }[]; + }>({ + url, + validateStatus: () => true, + }); + if (response.status !== 200) { - console.error(`Error fetching data from URL: ${url}, Status Code: ${response.status}`); + console.error( + `Error fetching data from URL: ${url}, Status Code: ${response.status}, Body = ${response.data}`, + ); continue; } @@ -110,82 +213,158 @@ export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { const timestamp = new Date(tx.timestamp).getTime(); const txhash = tx.txhash; for (const msg of tx.tx.body.messages) { - if (msg['@type'] === '/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn' || msg['@type'] === '/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut') { - await this.handlePoolSwapEvent(msg, timestamp, txhash); - } else if (msg['@type'] === '/osmosis.tokenfactory.v1beta1.MsgMint') { - await this.handleMintEvent(msg, timestamp, txhash); - } else if (msg['@type'] === '/osmosis.tokenfactory.v1beta1.MsgBurn') { - await this.handleBurnEvent(msg, timestamp, txhash); - } else if (msg['@type'] === '/cosmos.bank.v1beta1.MsgSend') { - await this.handleTransferEvent(msg, timestamp, txhash); + switch (msg["@type"]) { + case "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn": + case "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut": + await this.handlePoolSwapEvent(msg, timestamp, txhash); + break; + + case "/osmosis.tokenfactory.v1beta1.MsgMint": + await this.handleMintEvent(msg, timestamp, txhash); + break; + + case "/osmosis.tokenfactory.v1beta1.MsgBurn": + await this.handleBurnEvent(msg, timestamp, txhash); + break; + + case "/cosmos.bank.v1beta1.MsgSend": + await this.handleTransferEvent(msg, timestamp, txhash); + break; } } } } catch (error) { - console.error(`Error fetching data from URL: ${url}`, error.message || error); - continue; + console.error( + `Error fetching data from URL: ${url}`, + error.message || error, + ); } + // Delay the next request by 3 seconds - await new Promise(resolve => setTimeout(resolve, 3000)); + await new Promise((resolve) => setTimeout(resolve, 3000)); } } } - private async handlePoolSwapEvent(msg, timestamp: number, txhash: string) { - const { routes, token_in, token_out } = msg; - - // Check if pool_id 1721 exists in the routes - const relevantRoute = routes.find(route => route.pool_id === '1721'); - if (!relevantRoute) return; - - if (msg['@type'] === '/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut') { - if (token_out.denom === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA') { - const side = 'buy'; - const amount = token_out.amount; - await this.insertSwapEvent({ timestamp, sender: msg.sender, side, amount, txhash }); - } else if (relevantRoute.token_in_denom === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA') { - const side = 'sell'; - const amount = msg.token_in_max_amount; - await this.insertSwapEvent({ timestamp, sender: msg.sender, side, amount, txhash }); - } - } else if (msg['@type'] === '/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn') { - if (token_in.denom === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA') { - const side = 'sell'; - const amount = token_in.amount; - await this.insertSwapEvent({ timestamp, sender: msg.sender, side, amount, txhash }); - } else if (relevantRoute.token_out_denom === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA') { - const side = 'buy'; - const amount = msg.token_out_min_amount; - await this.insertSwapEvent({ timestamp, sender: msg.sender, side, amount, txhash }); - } + private async handlePoolSwapEvent( + msg: MsgSwapExactAmountIn | MsgSwapExactAmountOut, + timestamp: number, + txhash: string, + ) { + // Check if pool_id exists in the routes + const relevantRoute = msg.routes.find( + (route) => route.pool_id === OsmosisLiveProcessor.POOL_ID, + ); + if (!relevantRoute) { + return; + } + + switch (msg["@type"]) { + case "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut": + { + const { token_out } = msg; + if (token_out.denom === OsmosisLiveProcessor.TOKEN_DENOM) { + const side = "buy"; + const amount = token_out.amount; + await this.insertSwapEvent({ + timestamp, + sender: msg.sender, + side, + amount, + txhash, + }); + } else if ( + (relevantRoute as MsgSwapExactAmountOutRoute).token_in_denom === + OsmosisLiveProcessor.TOKEN_DENOM + ) { + const side = "sell"; + const amount = msg.token_in_max_amount; + await this.insertSwapEvent({ + timestamp, + sender: msg.sender, + side, + amount, + txhash, + }); + } + } + break; + + case "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn": + { + const { token_in } = msg; + if (token_in.denom === OsmosisLiveProcessor.TOKEN_DENOM) { + const side = "sell"; + const amount = token_in.amount; + await this.insertSwapEvent({ + timestamp, + sender: msg.sender, + side, + amount, + txhash, + }); + } else if ( + (relevantRoute as MsgSwapExactAmountInRoute).token_out_denom === + OsmosisLiveProcessor.TOKEN_DENOM + ) { + const side = "buy"; + const amount = msg.token_out_min_amount; + await this.insertSwapEvent({ + timestamp, + sender: msg.sender, + side, + amount, + txhash, + }); + } + } + break; } } - private async handleMintEvent(msg, timestamp: number, txhash: string) { - if (msg.amount.denom !== 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA') return; - const mintEvent: MintEvent = { - timestamp, - amount: msg.amount.amount, - mint_to_address: msg.mintToAddress, - txhash - }; - await this.insertMintEvent(mintEvent); + private async handleMintEvent( + msg: MsgMint, + timestamp: number, + txhash: string, + ) { + if (msg.amount.denom === OsmosisLiveProcessor.TOKEN_DENOM) { + const mintEvent: MintEvent = { + timestamp, + amount: msg.amount.amount, + mint_to_address: msg.mintToAddress, + txhash, + }; + await this.insertMintEvent(mintEvent); + } } - private async handleBurnEvent(msg, timestamp: number, txhash: string) { - if (msg.amount.denom !== 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA') return; - const burnEvent: BurnEvent = { - timestamp, - amount: msg.amount.amount, - burn_from_address: msg.burnFromAddress, - txhash - }; - await this.insertBurnEvent(burnEvent); + private async handleBurnEvent( + msg: MsgBurn, + timestamp: number, + txhash: string, + ) { + if (msg.amount.denom === OsmosisLiveProcessor.TOKEN_DENOM) { + const burnEvent: BurnEvent = { + timestamp, + amount: msg.amount.amount, + burn_from_address: msg.burnFromAddress, + txhash, + }; + await this.insertBurnEvent(burnEvent); + } } - private async handleTransferEvent(msg, timestamp: number, txhash: string) { - const transferAmounts = msg.amount.filter(amount => amount.denom === 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA'); - if (transferAmounts.length === 0) return; + private async handleTransferEvent( + msg: MsgSend, + timestamp: number, + txhash: string, + ) { + const transferAmounts = msg.amount.filter( + (amount) => amount.denom === OsmosisLiveProcessor.TOKEN_DENOM, + ); + if (transferAmounts.length === 0) { + return; + } for (const amount of transferAmounts) { const transferEvent: TransferEvent = { @@ -200,7 +379,7 @@ export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { } private async insertSwapEvent(event: PoolSwapEvent) { - console.log('Pool Swap Event:', JSON.stringify(event, null, 2)); + console.log("Pool Swap Event:", JSON.stringify(event, null, 2)); /* await this.clickhouseService.client.insert({ table: 'pool_swap_events', @@ -216,7 +395,7 @@ export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { } private async insertMintEvent(event: MintEvent) { - console.log('Mint Event:', JSON.stringify(event, null, 2)); + console.log("Mint Event:", JSON.stringify(event, null, 2)); /* await this.clickhouseService.client.insert({ table: 'mint_events', @@ -231,7 +410,7 @@ export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { } private async insertBurnEvent(event: BurnEvent) { - console.log('Burn Event:', JSON.stringify(event, null, 2)); + console.log("Burn Event:", JSON.stringify(event, null, 2)); /* await this.clickhouseService.client.insert({ table: 'burn_events', @@ -246,7 +425,7 @@ export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { } private async insertTransferEvent(event: TransferEvent) { - console.log('Transfer Event:', JSON.stringify(event, null, 2)); + console.log("Transfer Event:", JSON.stringify(event, null, 2)); /* await this.clickhouseService.client.insert({ table: 'transfer_events', diff --git a/api/src/osmosis/OsmosisModule.ts b/api/src/osmosis/OsmosisModule.ts index 1891f653..2780ab9e 100644 --- a/api/src/osmosis/OsmosisModule.ts +++ b/api/src/osmosis/OsmosisModule.ts @@ -12,10 +12,10 @@ const roles = process.env.ROLES!.split(","); const workers: Type[] = []; if (roles.includes("osmosis-live-processor")) { - // workers.push(OsmosisLiveProcessor); + workers.push(OsmosisLiveProcessor); } if (roles.includes("osmosis-historical-processor")) { - // workers.push(OsmosisHistoricalProcessor); + workers.push(OsmosisHistoricalProcessor); } @Module({ From 01f7440b69df4371595573ac1e007b04a7f2f359 Mon Sep 17 00:00:00 2001 From: minaxolone Date: Tue, 11 Jun 2024 22:40:45 +0100 Subject: [PATCH 09/10] store cosmos data in PG wip --- api/prisma/schema.prisma | 32 +- api/src/osmosis/OsmosisController.ts | 3 +- api/src/osmosis/OsmosisHistoricalProcessor.ts | 213 ++++---- api/src/osmosis/OsmosisLiveProcessor.ts | 466 ++++++------------ api/src/osmosis/OsmosisModule.ts | 10 +- api/src/osmosis/OsmosisRepository.ts | 115 +++++ api/src/osmosis/types.ts | 88 ++++ api/src/utils.ts | 4 + 8 files changed, 534 insertions(+), 397 deletions(-) create mode 100644 api/src/osmosis/OsmosisRepository.ts create mode 100644 api/src/osmosis/types.ts diff --git a/api/prisma/schema.prisma b/api/prisma/schema.prisma index 82d7cd23..2812c437 100644 --- a/api/prisma/schema.prisma +++ b/api/prisma/schema.prisma @@ -53,7 +53,7 @@ enum PendingTransactionStatus { } model PendingTransaction { - hash Bytes @id + hash Bytes @id sender Bytes sequenceNumber BigInt maxGasAmount BigInt @@ -67,5 +67,33 @@ model PendingTransaction { moduleName String args Bytes[] typeArgs String[] - status PendingTransactionStatus @default(UNKNOWN) + status PendingTransactionStatus @default(UNKNOWN) +} + +model OsmosisMintEvent { + txHash Bytes + index Int + amount BigInt + date DateTime + + @@id([txHash, index]) +} + +model OsmosisSwapEvent { + txHash Bytes + index Int + date DateTime + amount BigInt + + @@id([txHash, index]) +} + +model OsmosisBurnEvent { + txHash Bytes + index Int + date DateTime + amount BigInt + burnFromAddress String + + @@id([txHash, index]) } diff --git a/api/src/osmosis/OsmosisController.ts b/api/src/osmosis/OsmosisController.ts index 73fe7c88..24fc8240 100644 --- a/api/src/osmosis/OsmosisController.ts +++ b/api/src/osmosis/OsmosisController.ts @@ -17,7 +17,8 @@ export class OsmosisController { @Post("fetch-live") async triggerLiveFetch(): Promise { - await this.osmosisLiveProcessor.triggerFetchLiveData(); + // await this.osmosisLiveProcessor.triggerFetchLiveData(); + await this.osmosisLiveProcessor.fetchAndStoreLiveData(); return "Live data fetching triggered"; } } diff --git a/api/src/osmosis/OsmosisHistoricalProcessor.ts b/api/src/osmosis/OsmosisHistoricalProcessor.ts index b05dfa8a..e7f8aab9 100644 --- a/api/src/osmosis/OsmosisHistoricalProcessor.ts +++ b/api/src/osmosis/OsmosisHistoricalProcessor.ts @@ -1,5 +1,5 @@ import { Job, Queue } from "bullmq"; -import { BigQuery } from "@google-cloud/bigquery"; +// import { BigQuery } from "@google-cloud/bigquery"; import axios from "axios"; import { InjectQueue, Processor, WorkerHost } from "@nestjs/bullmq"; import { ConfigService } from "@nestjs/config"; @@ -45,14 +45,14 @@ export class OsmosisHistoricalProcessor extends WorkerHost { private readonly numiaApiKey?: string; - private readonly bigQueryClient: BigQuery; + // private readonly bigQueryClient: BigQuery; private readonly TOKEN_DENOM = 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA'; public constructor( config: ConfigService, - private readonly clickhouseService: ClickhouseService, + // private readonly clickhouseService: ClickhouseService, @InjectQueue("osmosis-historical") private readonly osmosisQueue: Queue, @@ -61,13 +61,13 @@ export class OsmosisHistoricalProcessor extends WorkerHost { this.numiaApiKey = config.get("numia")?.apiKey; - // Path to service account key file - const keyFile = "./bigquery_service_account.json"; + // // Path to service account key file + // const keyFile = "./bigquery_service_account.json"; - // Create a BigQuery client - this.bigQueryClient = new BigQuery({ - keyFilename: keyFile, - }); + // // Create a BigQuery client + // this.bigQueryClient = new BigQuery({ + // keyFilename: keyFile, + // }); } public async process(job: Job): Promise { @@ -91,13 +91,14 @@ export class OsmosisHistoricalProcessor extends WorkerHost { private async fetchHistoricalData() { await this.fetchMintBurnEvents(); // And internally, one hop transfers of wLIBRA - await this.fetchPoolSwapEvents(); + // await this.fetchPoolSwapEvents(); } private async fetchMintBurnEvents() { for (let page = 1; ; ++page) { const url = `${this.endpoint}/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p?pageSize=${this.pageSize}&page=${page}`; - const response = await axios.get(url, { + const response = await axios({ + url, headers: { Authorization: `Bearer ${this.numiaApiKey}`, }, @@ -108,6 +109,9 @@ export class OsmosisHistoricalProcessor extends WorkerHost { break; } + const mintEvents: MintEvent[] = []; + const burnEvents: BurnEvent[] = []; + for (const event of events) { const timestamp = new Date(event.blockTimestamp).getTime(); const type = @@ -122,32 +126,53 @@ export class OsmosisHistoricalProcessor extends WorkerHost { const txhash = event.hash; if (type === "mint") { - const mintEvent: MintEvent = { + mintEvents.push({ timestamp, amount, mint_to_address: address, txhash - }; - await this.insertMintEvent(mintEvent); + }); + + // await this.insertMintEvent(mintEvent); // Not a proper DFS, just one hop away from minter await this.fetchTransfersForAddress(event.messages[0].mint_to_address); } else { - const burnEvent: BurnEvent = { + burnEvents.push({ timestamp, amount, burn_from_address: address, txhash - }; - await this.insertBurnEvent(burnEvent); + }); + // await this.insertBurnEvent(burnEvent); } } + + console.log('mintEvents', mintEvents); + console.log('burnEvents', burnEvents); + + break; } } private async fetchTransfersForAddress(address: string) { for (let page = 1; ; ++page) { const url = `${this.endpoint}/${address}?pageSize=${this.pageSize}&page=${page}`; - const response = await axios.get(url, { + const response = await axios< + { + hash: string; + blockTimestamp: string; + messageTypes: string; + messages: { + amount: { + denom: string; + amount: string; + }; + from_address: string; + to_address: string; + }[]; + }[] + >({ + url, headers: { Authorization: `Bearer ${this.numiaApiKey}`, }, @@ -158,108 +183,118 @@ export class OsmosisHistoricalProcessor extends WorkerHost { break; } + const transferEvents: TransferEvent[] = []; + for (const event of events) { if (event.messageTypes.includes("/cosmos.bank.v1beta1.MsgSend")) { for (const message of event.messages) { if ( - message.amount[0].denom === this.TOKEN_DENOM + message.amount.denom === this.TOKEN_DENOM ) { - const transferEvent: TransferEvent = { + transferEvents.push({ timestamp: new Date(event.blockTimestamp).getTime(), from_address: message.from_address, to_address: message.to_address, amount: message.amount.amount, txhash: event.hash, - }; - await this.insertTransferEvent(transferEvent); + }); + // await this.insertTransferEvent(transferEvent); } } } } - } - } - private async fetchPoolSwapEvents() { - const query = ` - SELECT - tx_id, - sender, - denom_in, - parsed_amount_in, - denom_out, - parsed_amount_out, - ingestion_timestamp - FROM \`numia-data.osmosis.osmosis_swaps\` - WHERE pool_id = '1721' - ORDER BY ingestion_timestamp ASC - `; - - const [rows] = await this.bigQueryClient.query(query); - - for (const row of rows) { - const side = row.denom_in === this.TOKEN_DENOM ? "buy" : "sell"; - const swapEvent: PoolSwapEvent = { - timestamp: new Date(row.ingestion_timestamp.value).getTime(), - sender: row.sender, - side, - amount: side === "buy" ? row.parsed_amount_in : row.parsed_amount_out, - txhash: row.tx_id, - }; - await this.insertSwapEvent(swapEvent); + console.log('transfers', transferEvents); + + break; } } + // private async fetchPoolSwapEvents() { + // const query = ` + // SELECT + // tx_id, + // sender, + // denom_in, + // parsed_amount_in, + // denom_out, + // parsed_amount_out, + // ingestion_timestamp + // FROM \`numia-data.osmosis.osmosis_swaps\` + // WHERE pool_id = '1721' + // ORDER BY ingestion_timestamp ASC + // `; + + // const [rows] = await this.bigQueryClient.query(query); + + // for (const row of rows) { + // const side = row.denom_in === this.TOKEN_DENOM ? "buy" : "sell"; + // const swapEvent: PoolSwapEvent = { + // timestamp: new Date(row.ingestion_timestamp.value).getTime(), + // sender: row.sender, + // side, + // amount: side === "buy" ? row.parsed_amount_in : row.parsed_amount_out, + // txhash: row.tx_id, + // }; + // await this.insertSwapEvent(swapEvent); + // } + // } + private async insertMintEvent(event: MintEvent) { console.log('Mint Event:', JSON.stringify(event, null, 2)); - await this.clickhouseService.client.insert({ - table: "mint_events", - values: { - timestamp: event.timestamp, - amount: event.amount, - mint_to_address: event.mint_to_address, - txhash: event.txhash, - }, - }); + + // await this.clickhouseService.client.insert({ + // table: "mint_events", + // values: { + // timestamp: event.timestamp, + // amount: event.amount, + // mint_to_address: event.mint_to_address, + // txhash: event.txhash, + // }, + // }); } private async insertBurnEvent(event: BurnEvent) { console.log('Burn Event:', JSON.stringify(event, null, 2)); - await this.clickhouseService.client.insert({ - table: "burn_events", - values: { - timestamp: event.timestamp, - amount: event.amount, - burn_from_address: event.burn_from_address, - txhash: event.txhash, - }, - }); + + // await this.clickhouseService.client.insert({ + // table: "burn_events", + // values: { + // timestamp: event.timestamp, + // amount: event.amount, + // burn_from_address: event.burn_from_address, + // txhash: event.txhash, + // }, + // }); } private async insertTransferEvent(event: TransferEvent) { console.log('Transfer Event:', JSON.stringify(event, null, 2)); - await this.clickhouseService.client.insert({ - table: "transfer_events", - values: { - timestamp: event.timestamp, - from_address: event.from_address, - to_address: event.to_address, - amount: event.amount, - txhash: event.txhash, - }, - }); + + // await this.clickhouseService.client.insert({ + // table: "transfer_events", + // values: { + // timestamp: event.timestamp, + // from_address: event.from_address, + // to_address: event.to_address, + // amount: event.amount, + // txhash: event.txhash, + // }, + // }); } private async insertSwapEvent(event: PoolSwapEvent) { console.log('Pool Swap Event:', JSON.stringify(event, null, 2)); - await this.clickhouseService.client.insert({ - table: "pool_swap_events", - values: { - timestamp: event.timestamp, - sender: event.sender, - side: event.side, - amount: event.amount, - txhash: event.txhash, - }, - }); + + // await this.clickhouseService.client.insert({ + // table: "pool_swap_events", + // values: { + // timestamp: event.timestamp, + // sender: event.sender, + // side: event.side, + // amount: event.amount, + // txhash: event.txhash, + // }, + // }); } } diff --git a/api/src/osmosis/OsmosisLiveProcessor.ts b/api/src/osmosis/OsmosisLiveProcessor.ts index 20d7d15c..7b1e3556 100644 --- a/api/src/osmosis/OsmosisLiveProcessor.ts +++ b/api/src/osmosis/OsmosisLiveProcessor.ts @@ -1,3 +1,4 @@ +import _ from 'lodash'; import { InjectQueue, Processor, WorkerHost } from "@nestjs/bullmq"; import { OnModuleInit } from "@nestjs/common"; import { Job, Queue } from "bullmq"; @@ -5,123 +6,53 @@ import axios from "axios"; import Bluebird from "bluebird"; import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; - -interface PoolSwapEvent { - timestamp: number; - sender: string; - side: string; - amount: string; - txhash: string; -} - -interface MintEvent { - timestamp: number; - amount: string; - mint_to_address: string; - txhash: string; -} - -interface BurnEvent { - timestamp: number; - amount: string; - burn_from_address: string; - txhash: string; -} - -interface TransferEvent { - timestamp: number; - from_address: string; - to_address: string; - amount: string; - txhash: string; -} - -interface MsgMint { - "@type": "/osmosis.tokenfactory.v1beta1.MsgMint"; - sender: string; - mintToAddress: string; - amount: { - denom: string; - amount: string; - }; -} - -interface MsgSwapExactAmountInRoute { - pool_id: string; - token_out_denom: string; -} - -interface MsgSwapExactAmountIn { - "@type": "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn"; - sender: "osmo1ng478yy6kay6dvjndn8cz2jty80xphxxfrexkk"; - routes: MsgSwapExactAmountInRoute[]; - token_in: { - denom: string; - amount: string; - }; - token_out_min_amount: string; -} - -interface MsgSwapExactAmountOutRoute { - pool_id: string; - token_in_denom: string; -} - -interface MsgSwapExactAmountOut { - "@type": "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut"; - sender: string; - routes: MsgSwapExactAmountOutRoute[]; - token_in_max_amount: string; - token_out: { - denom: string; - amount: string; - }; -} - -interface MsgBurn { - "@type": "/osmosis.tokenfactory.v1beta1.MsgBurn"; - sender: string; - amount: { - denom: string; - amount: string; - }; - burnFromAddress: string; -} - -interface MsgSend { - "@type": "/cosmos.bank.v1beta1.MsgSend"; - from_address: string; - to_address: string; - amount: { - denom: string; - amount: string; - }[]; -} - -type Msg = - | MsgMint - | MsgSwapExactAmountIn - | MsgSwapExactAmountOut - | MsgBurn - | MsgSend; +import { ConfigService } from "@nestjs/config"; +import { NumiaConfig } from "../config/config.interface.js"; +import { PrismaService } from "../prisma/prisma.service.js"; +import { + BurnEvent, + MintEvent, + Msg, + MsgBurn, + MsgSend, + MsgSwapExactAmountIn, + MsgSwapExactAmountInRoute, + MsgSwapExactAmountOut, + MsgSwapExactAmountOutRoute, + PoolSwapEvent, + TransferEvent, +} from "./types.js"; +import { OsmosisRepository } from './OsmosisRepository.js'; @Processor("osmosis-live") export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { - private static readonly endpoint = - "https://lcd.osmosis.zone/cosmos/tx/v1beta1/txs"; + // private static readonly endpoint = + // "https://lcd.osmosis.zone/cosmos/tx/v1beta1/txs"; + + private static readonly endpoint = "https://osmosis.numia.xyz/v2/txs"; private static readonly POOL_ID = "1721"; private static readonly TOKEN_DENOM = "factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA"; + private readonly numiaApiKey?: string; + constructor( + config: ConfigService, + private readonly clickhouseService: ClickhouseService, + private readonly prisma: PrismaService, + @InjectQueue("osmosis-live") private readonly osmosisQueue: Queue, + + private readonly osmosisRepository: OsmosisRepository, ) { super(); + + this.numiaApiKey = config.get("numia")?.apiKey; } public async process(job: Job): Promise { @@ -153,103 +84,109 @@ export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { } public async triggerFetchLiveData() { - await this.osmosisQueue.add("fetchLiveData", undefined); + // await this.osmosisQueue.add("fetchLiveData", undefined); // await this.fetchAndStoreLiveData(); } - private async fetchAndStoreLiveData() { - const events = [ - "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn", - "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut", - "/osmosis.tokenfactory.v1beta1.MsgMint", - "/osmosis.tokenfactory.v1beta1.MsgBurn", - "/cosmos.bank.v1beta1.MsgSend", - ]; + public async fetchAndStoreLiveData() { + let offset = 0; - for (const event of events) { - for (let page = 1; page <= 5; page++) { - const url = `${OsmosisLiveProcessor.endpoint}?events=message.action='${event}'&limit=100&page=${page}`; - console.log(url); + while (true) { + const mintEvents: MintEvent[] = []; - try { - const response = await axios<{ - total: string; - pagination: null; - txs: { - body: { - messages: []; - memo: string; - timeout_height: string; - extension_options: []; - non_critical_extension_options: []; - }; - }[]; - tx_responses: { - height: string; - timestamp: string; - txhash: string; - - tx: { - "@type": "/cosmos.tx.v1beta1.Tx"; - body: { - messages: Msg[]; - }; - }; - }[]; - }>({ - url, - validateStatus: () => true, - }); - - if (response.status !== 200) { - console.error( - `Error fetching data from URL: ${url}, Status Code: ${response.status}, Body = ${response.data}`, - ); - continue; - } + const url = `${OsmosisLiveProcessor.endpoint}/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p?offset=${offset}`; + console.log(url); - const transactions = response.data.tx_responses; - for (const tx of transactions) { - const timestamp = new Date(tx.timestamp).getTime(); - const txhash = tx.txhash; - for (const msg of tx.tx.body.messages) { - switch (msg["@type"]) { - case "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn": - case "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut": - await this.handlePoolSwapEvent(msg, timestamp, txhash); - break; - - case "/osmosis.tokenfactory.v1beta1.MsgMint": - await this.handleMintEvent(msg, timestamp, txhash); - break; - - case "/osmosis.tokenfactory.v1beta1.MsgBurn": - await this.handleBurnEvent(msg, timestamp, txhash); - break; - - case "/cosmos.bank.v1beta1.MsgSend": - await this.handleTransferEvent(msg, timestamp, txhash); - break; + const response = await axios< + { + _id: string; + hash: string; + blockTimestamp: string; + index: number; + height: number; + addressIndex: string[]; + messageTypes: string[]; + messages: Msg[]; + }[] + >({ + url, + headers: { + Authorization: `Bearer ${this.numiaApiKey}`, + }, + }); + + const transactions = response.data; + if (!transactions.length) { + break; + } + + for (const tx of transactions) { + const txHash = Buffer.from(tx.hash, 'hex'); + + const date = new Date(tx.blockTimestamp); + // const timestamp = date.getTime(); + for (const [index, msg] of tx.messages.entries()) { + switch (msg["@type"]) { + case "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn": + case "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut": + console.log("/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn/out"); + console.dir(tx, { depth: 10 }); + + // await this.handlePoolSwapEvent( + // msg, + // timestamp, + // tx.hash, + // index, + // ); + break; + + case "/osmosis.tokenfactory.v1beta1.MsgMint": + if (msg.amount.denom === OsmosisLiveProcessor.TOKEN_DENOM) { + mintEvents.push({ + date, + amount: BigInt(msg.amount.amount), + mint_to_address: msg.mint_to_address, + txHash, + index, + }); } - } + break; + + case "/osmosis.tokenfactory.v1beta1.MsgBurn": + await this.handleBurnEvent(msg, date, txHash, index); + break; + + case "/cosmos.bank.v1beta1.MsgSend": + console.log("/cosmos.bank.v1beta1.MsgSend"); + console.dir(tx, { depth: 10 }); + + // await this.handleTransferEvent( + // msg, + // timestamp, + // tx.hash, + // index, + // ); + break; } - } catch (error) { - console.error( - `Error fetching data from URL: ${url}`, - error.message || error, - ); } - - // Delay the next request by 3 seconds - await new Promise((resolve) => setTimeout(resolve, 3000)); } + + await this.osmosisRepository.insertMintEvents(mintEvents); + + // Delay the next request by 3 seconds + await new Promise((resolve) => setTimeout(resolve, 3000)); + + offset += transactions.length; + + break; } } private async handlePoolSwapEvent( msg: MsgSwapExactAmountIn | MsgSwapExactAmountOut, - timestamp: number, - txhash: string, + date: Date, + txHash: Uint8Array, + index: number, ) { // Check if pool_id exists in the routes const relevantRoute = msg.routes.find( @@ -264,27 +201,25 @@ export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { { const { token_out } = msg; if (token_out.denom === OsmosisLiveProcessor.TOKEN_DENOM) { - const side = "buy"; - const amount = token_out.amount; - await this.insertSwapEvent({ - timestamp, + await this.osmosisRepository.insertSwapEvent({ + date, sender: msg.sender, - side, - amount, - txhash, + side: "buy", + amount: BigInt(token_out.amount), + txHash, + index, }); } else if ( (relevantRoute as MsgSwapExactAmountOutRoute).token_in_denom === OsmosisLiveProcessor.TOKEN_DENOM ) { - const side = "sell"; - const amount = msg.token_in_max_amount; - await this.insertSwapEvent({ - timestamp, + await this.osmosisRepository.insertSwapEvent({ + date, sender: msg.sender, - side, - amount, - txhash, + side: "sell", + amount: BigInt(msg.token_in_max_amount), + txHash, + index, }); } } @@ -294,27 +229,25 @@ export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { { const { token_in } = msg; if (token_in.denom === OsmosisLiveProcessor.TOKEN_DENOM) { - const side = "sell"; - const amount = token_in.amount; - await this.insertSwapEvent({ - timestamp, + await this.osmosisRepository.insertSwapEvent({ + date, sender: msg.sender, - side, - amount, - txhash, + side: "sell", + amount: BigInt(token_in.amount), + txHash, + index, }); } else if ( (relevantRoute as MsgSwapExactAmountInRoute).token_out_denom === OsmosisLiveProcessor.TOKEN_DENOM ) { - const side = "buy"; - const amount = msg.token_out_min_amount; - await this.insertSwapEvent({ - timestamp, + await this.osmosisRepository.insertSwapEvent({ + date, sender: msg.sender, - side, - amount, - txhash, + side: "buy", + amount: BigInt(msg.token_out_min_amount), + txHash, + index, }); } } @@ -322,42 +255,28 @@ export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { } } - private async handleMintEvent( - msg: MsgMint, - timestamp: number, - txhash: string, - ) { - if (msg.amount.denom === OsmosisLiveProcessor.TOKEN_DENOM) { - const mintEvent: MintEvent = { - timestamp, - amount: msg.amount.amount, - mint_to_address: msg.mintToAddress, - txhash, - }; - await this.insertMintEvent(mintEvent); - } - } - private async handleBurnEvent( msg: MsgBurn, - timestamp: number, - txhash: string, + date: Date, + txHash: Uint8Array, + index: number, ) { if (msg.amount.denom === OsmosisLiveProcessor.TOKEN_DENOM) { - const burnEvent: BurnEvent = { - timestamp, - amount: msg.amount.amount, - burn_from_address: msg.burnFromAddress, - txhash, - }; - await this.insertBurnEvent(burnEvent); + await this.osmosisRepository.insertBurnEvent({ + date, + amount: BigInt(msg.amount.amount), + burn_from_address: msg.burn_from_address, + txHash, + index, + }); } } private async handleTransferEvent( msg: MsgSend, - timestamp: number, - txhash: string, + date: Date, + txHash: Uint8Array, + index: number, ) { const transferAmounts = msg.amount.filter( (amount) => amount.denom === OsmosisLiveProcessor.TOKEN_DENOM, @@ -367,76 +286,15 @@ export class OsmosisLiveProcessor extends WorkerHost implements OnModuleInit { } for (const amount of transferAmounts) { - const transferEvent: TransferEvent = { - timestamp, + await this.osmosisRepository.insertTransferEvent({ + date, from_address: msg.from_address, to_address: msg.to_address, amount: amount.amount, - txhash, - }; - await this.insertTransferEvent(transferEvent); + txHash, + index, + }); } } - private async insertSwapEvent(event: PoolSwapEvent) { - console.log("Pool Swap Event:", JSON.stringify(event, null, 2)); - /* - await this.clickhouseService.client.insert({ - table: 'pool_swap_events', - values: { - timestamp: event.timestamp, - sender: event.sender, - side: event.side, - amount: event.amount, - txhash: event.txhash, - } - }); - */ - } - - private async insertMintEvent(event: MintEvent) { - console.log("Mint Event:", JSON.stringify(event, null, 2)); - /* - await this.clickhouseService.client.insert({ - table: 'mint_events', - values: { - timestamp: event.timestamp, - amount: event.amount, - mint_to_address: event.mint_to_address, - txhash: event.txhash, - } - }); - */ - } - - private async insertBurnEvent(event: BurnEvent) { - console.log("Burn Event:", JSON.stringify(event, null, 2)); - /* - await this.clickhouseService.client.insert({ - table: 'burn_events', - values: { - timestamp: event.timestamp, - amount: event.amount, - burn_from_address: event.burn_from_address, - txhash: event.txhash, - } - }); - */ - } - - private async insertTransferEvent(event: TransferEvent) { - console.log("Transfer Event:", JSON.stringify(event, null, 2)); - /* - await this.clickhouseService.client.insert({ - table: 'transfer_events', - values: { - timestamp: event.timestamp, - from_address: event.from_address, - to_address: event.to_address, - amount: event.amount, - txhash: event.txhash, - } - }); - */ - } } diff --git a/api/src/osmosis/OsmosisModule.ts b/api/src/osmosis/OsmosisModule.ts index 2780ab9e..fd0b8eb1 100644 --- a/api/src/osmosis/OsmosisModule.ts +++ b/api/src/osmosis/OsmosisModule.ts @@ -7,6 +7,8 @@ import { OsmosisHistoricalProcessor } from "./OsmosisHistoricalProcessor.js"; import { OsmosisController } from "./OsmosisController.js"; import { ClickhouseModule } from "../clickhouse/clickhouse.module.js"; import { redisClient } from "../redis/redis.service.js"; +import { PrismaModule } from "../prisma/prisma.module.js"; +import { OsmosisRepository } from "./OsmosisRepository.js"; const roles = process.env.ROLES!.split(","); @@ -21,6 +23,7 @@ if (roles.includes("osmosis-historical-processor")) { @Module({ imports: [ ClickhouseModule, + PrismaModule, BullModule.registerQueue({ name: "osmosis-live", @@ -32,6 +35,11 @@ if (roles.includes("osmosis-historical-processor")) { }), ], controllers: [OsmosisController], - providers: [OsmosisLiveProcessor, OsmosisHistoricalProcessor, ...workers], + providers: [ + OsmosisLiveProcessor, + OsmosisHistoricalProcessor, + OsmosisRepository, + ...workers, + ], }) export class OsmosisModule {} diff --git a/api/src/osmosis/OsmosisRepository.ts b/api/src/osmosis/OsmosisRepository.ts new file mode 100644 index 00000000..5ef02669 --- /dev/null +++ b/api/src/osmosis/OsmosisRepository.ts @@ -0,0 +1,115 @@ +import _ from "lodash"; +import { Injectable } from "@nestjs/common"; +import { PrismaService } from "../prisma/prisma.service.js"; +import { BurnEvent, MintEvent, PoolSwapEvent, TransferEvent } from "./types.js"; +import { toBytea } from "../utils.js"; + +@Injectable() +export class OsmosisRepository { + + public constructor( + private readonly prisma: PrismaService, + ) { + + } + + public async insertBurnEvent(event: BurnEvent) { + await this.prisma.$queryRawUnsafe( + ` + INSERT INTO "OsmosisBurnEvent" ( + "txHash", "index", "date", + "amount", "burnFromAddress" + ) + VALUES ( + $1::bytea, $2, $3::timestamp, + $4::bigint, $5 + ) + ON CONFLICT + DO NOTHING + `, + toBytea(event.txHash), + event.index, + event.date, + event.amount, + event.burn_from_address, + ); + } + + public async insertSwapEvent(event: PoolSwapEvent) { + console.log("Pool Swap Event:", JSON.stringify(event, null, 2)); + /* + await this.clickhouseService.client.insert({ + table: 'pool_swap_events', + values: { + timestamp: event.timestamp, + sender: event.sender, + side: event.side, + amount: event.amount, + txhash: event.txhash, + } + }); + */ + } + + public async insertMintEvent(event: MintEvent) { + console.log("Mint Event:", JSON.stringify(event, null, 2)); + /* + await this.clickhouseService.client.insert({ + table: 'mint_events', + values: { + timestamp: event.timestamp, + amount: event.amount, + mint_to_address: event.mint_to_address, + txhash: event.txhash, + } + }); + */ + } + + + public async insertTransferEvent(event: TransferEvent) { + console.log("Transfer Event:", JSON.stringify(event, null, 2)); + /* + await this.clickhouseService.client.insert({ + table: 'transfer_events', + values: { + timestamp: event.timestamp, + from_address: event.from_address, + to_address: event.to_address, + amount: event.amount, + txhash: event.txhash, + } + }); + */ + } + + public async insertMintEvents(mintEvents: MintEvent[]) { + if (!mintEvents.length) { + return; + } + + const placeholders = mintEvents.map( + (_, i) => + `($${1 + i * 4}::bytea, $${2 + i * 4}, $${3 + i * 4}::bigint, $${4 + i * 4}::timestamp)`, + ); + + const params = _.flatten( + mintEvents.map((mintEvent) => [ + toBytea(mintEvent.txHash), + mintEvent.index, + mintEvent.amount.toString(10), + mintEvent.date.toISOString(), + ]), + ); + + const query = ` + INSERT INTO "OsmosisMintEvent" + ("txHash", "index", "amount", "date") + VALUES ${placeholders.join(",")} + ON CONFLICT + DO NOTHING + `; + + await this.prisma.$queryRawUnsafe(query, ...params); + } +} \ No newline at end of file diff --git a/api/src/osmosis/types.ts b/api/src/osmosis/types.ts new file mode 100644 index 00000000..36204878 --- /dev/null +++ b/api/src/osmosis/types.ts @@ -0,0 +1,88 @@ +export interface AEvent { + txHash: Uint8Array; + index: number; + date: Date; +} + +export interface Amount { + denom: string; + amount: string; +} + +export type PoolSwapEvent = AEvent & { + sender: string; + side: string; + amount: bigint; +}; + +export type MintEvent = AEvent & { + amount: bigint; + mint_to_address: string; + txHash: Uint8Array; +}; + +export type BurnEvent = AEvent & { + date: Date; + amount: bigint; + burn_from_address: string; +}; + +export type TransferEvent = AEvent & { + from_address: string; + to_address: string; + amount: string; +}; + +export interface MsgMint { + "@type": "/osmosis.tokenfactory.v1beta1.MsgMint"; + sender: string; + mint_to_address: string; + amount: Amount; +} + +export interface MsgSwapExactAmountInRoute { + pool_id: string; + token_out_denom: string; +} + +export interface MsgSwapExactAmountIn { + "@type": "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountIn"; + sender: string; + routes: MsgSwapExactAmountInRoute[]; + token_in: Amount; + token_out_min_amount: string; +} + +export interface MsgSwapExactAmountOutRoute { + pool_id: string; + token_in_denom: string; +} + +export interface MsgSwapExactAmountOut { + "@type": "/osmosis.poolmanager.v1beta1.MsgSwapExactAmountOut"; + sender: string; + routes: MsgSwapExactAmountOutRoute[]; + token_in_max_amount: string; + token_out: Amount; +} + +export interface MsgBurn { + "@type": "/osmosis.tokenfactory.v1beta1.MsgBurn"; + sender: string; + amount: Amount; + burn_from_address: string; +} + +export interface MsgSend { + "@type": "/cosmos.bank.v1beta1.MsgSend"; + from_address: string; + to_address: string; + amount: Amount[]; +} + +export type Msg = + | MsgMint + | MsgSwapExactAmountIn + | MsgSwapExactAmountOut + | MsgBurn + | MsgSend; diff --git a/api/src/utils.ts b/api/src/utils.ts index 71897a6b..1846fd8a 100644 --- a/api/src/utils.ts +++ b/api/src/utils.ts @@ -112,3 +112,7 @@ export function getTransactionHash( ); throw new Error("unsupported transaction payload type"); } + +export function toBytea(buff: Uint8Array): string { + return `\\x${Buffer.from(buff).toString("hex")}`; +} \ No newline at end of file From a6c69fe6a4c2ab1e6e5ae2641e723778871a56fa Mon Sep 17 00:00:00 2001 From: Hemulin <> Date: Sun, 14 Jul 2024 23:39:10 +0300 Subject: [PATCH 10/10] Adding Base support. Live fetcher. WIP --- api/package-lock.json | 56 ++--- api/package.json | 2 +- api/prisma/schema.prisma | 4 +- api/src/app/app.module.ts | 5 +- api/src/base/BaseController.ts | 23 ++ api/src/base/BaseHistoricalProcessor.ts | 265 ++++++++++++++++++++++++ api/src/base/BaseLiveProcessor.ts | 192 +++++++++++++++++ api/src/base/BaseModule.ts | 38 ++++ api/src/osmosis/OsmosisController.ts | 7 +- 9 files changed, 557 insertions(+), 35 deletions(-) create mode 100644 api/src/base/BaseController.ts create mode 100644 api/src/base/BaseHistoricalProcessor.ts create mode 100644 api/src/base/BaseLiveProcessor.ts create mode 100644 api/src/base/BaseModule.ts diff --git a/api/package-lock.json b/api/package-lock.json index ee3be249..fb864670 100644 --- a/api/package-lock.json +++ b/api/package-lock.json @@ -61,7 +61,7 @@ "eslint-plugin-prettier": "^5.1.3", "jest": "^29.7.0", "prettier": "^3.2.5", - "prisma": "^5.14.0", + "prisma": "^5.16.2", "source-map-support": "^0.5.21", "supertest": "^7.0.0", "ts-jest": "^29.1.3", @@ -3758,48 +3758,48 @@ } }, "node_modules/@prisma/debug": { - "version": "5.14.0", - "resolved": "https://registry.npmjs.org/@prisma/debug/-/debug-5.14.0.tgz", - "integrity": "sha512-iq56qBZuFfX3fCxoxT8gBX33lQzomBU0qIUaEj1RebsKVz1ob/BVH1XSBwwwvRVtZEV1b7Fxx2eVu34Ge/mg3w==", + "version": "5.16.2", + "resolved": "https://registry.npmjs.org/@prisma/debug/-/debug-5.16.2.tgz", + "integrity": "sha512-ItzB4nR4O8eLzuJiuP3WwUJfoIvewMHqpGCad+64gvThcKEVOtaUza9AEJo2DPqAOa/AWkFyK54oM4WwHeew+A==", "devOptional": true }, "node_modules/@prisma/engines": { - "version": "5.14.0", - "resolved": "https://registry.npmjs.org/@prisma/engines/-/engines-5.14.0.tgz", - "integrity": "sha512-lgxkKZ6IEygVcw6IZZUlPIfLQ9hjSYAtHjZ5r64sCLDgVzsPFCi2XBBJgzPMkOQ5RHzUD4E/dVdpn9+ez8tk1A==", + "version": "5.16.2", + "resolved": "https://registry.npmjs.org/@prisma/engines/-/engines-5.16.2.tgz", + "integrity": "sha512-qUxwMtrwoG3byd4PbX6T7EjHJ8AUhzTuwniOGkh/hIznBfcE2QQnGakyEq4VnwNuttMqvh/GgPFapHQ3lCuRHg==", "devOptional": true, "hasInstallScript": true, "dependencies": { - "@prisma/debug": "5.14.0", - "@prisma/engines-version": "5.14.0-25.e9771e62de70f79a5e1c604a2d7c8e2a0a874b48", - "@prisma/fetch-engine": "5.14.0", - "@prisma/get-platform": "5.14.0" + "@prisma/debug": "5.16.2", + "@prisma/engines-version": "5.16.0-24.34ace0eb2704183d2c05b60b52fba5c43c13f303", + "@prisma/fetch-engine": "5.16.2", + "@prisma/get-platform": "5.16.2" } }, "node_modules/@prisma/engines-version": { - "version": "5.14.0-25.e9771e62de70f79a5e1c604a2d7c8e2a0a874b48", - "resolved": "https://registry.npmjs.org/@prisma/engines-version/-/engines-version-5.14.0-25.e9771e62de70f79a5e1c604a2d7c8e2a0a874b48.tgz", - "integrity": "sha512-ip6pNkRo1UxWv+6toxNcYvItNYaqQjXdFNGJ+Nuk2eYtRoEdoF13wxo7/jsClJFFenMPVNVqXQDV0oveXnR1cA==", + "version": "5.16.0-24.34ace0eb2704183d2c05b60b52fba5c43c13f303", + "resolved": "https://registry.npmjs.org/@prisma/engines-version/-/engines-version-5.16.0-24.34ace0eb2704183d2c05b60b52fba5c43c13f303.tgz", + "integrity": "sha512-HkT2WbfmFZ9WUPyuJHhkiADxazHg8Y4gByrTSVeb3OikP6tjQ7txtSUGu9OBOBH0C13dPKN2qqH12xKtHu/Hiw==", "devOptional": true }, "node_modules/@prisma/fetch-engine": { - "version": "5.14.0", - "resolved": "https://registry.npmjs.org/@prisma/fetch-engine/-/fetch-engine-5.14.0.tgz", - "integrity": "sha512-VrheA9y9DMURK5vu8OJoOgQpxOhas3qF0IBHJ8G/0X44k82kc8E0w98HCn2nhnbOOMwbWsJWXfLC2/F8n5u0gQ==", + "version": "5.16.2", + "resolved": "https://registry.npmjs.org/@prisma/fetch-engine/-/fetch-engine-5.16.2.tgz", + "integrity": "sha512-sq51lfHKfH2jjYSjBtMjP+AznFqOJzXpqmq6B9auWrlTJrMgZ7lPyhWUW7VU7LsQU48/TJ+DZeIz8s9bMYvcHg==", "devOptional": true, "dependencies": { - "@prisma/debug": "5.14.0", - "@prisma/engines-version": "5.14.0-25.e9771e62de70f79a5e1c604a2d7c8e2a0a874b48", - "@prisma/get-platform": "5.14.0" + "@prisma/debug": "5.16.2", + "@prisma/engines-version": "5.16.0-24.34ace0eb2704183d2c05b60b52fba5c43c13f303", + "@prisma/get-platform": "5.16.2" } }, "node_modules/@prisma/get-platform": { - "version": "5.14.0", - "resolved": "https://registry.npmjs.org/@prisma/get-platform/-/get-platform-5.14.0.tgz", - "integrity": "sha512-/yAyBvcEjRv41ynZrhdrPtHgk47xLRRq/o5eWGcUpBJ1YrUZTYB8EoPiopnP7iQrMATK8stXQdPOoVlrzuTQZw==", + "version": "5.16.2", + "resolved": "https://registry.npmjs.org/@prisma/get-platform/-/get-platform-5.16.2.tgz", + "integrity": "sha512-cXiHPgNLNyj22vLouPVNegklpRL/iX2jxTeap5GRO3DmCoVyIHmJAV1CgUMUJhHlcol9yYy7EHvsnXTDJ/PKEA==", "devOptional": true, "dependencies": { - "@prisma/debug": "5.14.0" + "@prisma/debug": "5.16.2" } }, "node_modules/@protobufjs/aspromise": { @@ -10800,13 +10800,13 @@ } }, "node_modules/prisma": { - "version": "5.14.0", - "resolved": "https://registry.npmjs.org/prisma/-/prisma-5.14.0.tgz", - "integrity": "sha512-gCNZco7y5XtjrnQYeDJTiVZmT/ncqCr5RY1/Cf8X2wgLRmyh9ayPAGBNziI4qEE4S6SxCH5omQLVo9lmURaJ/Q==", + "version": "5.16.2", + "resolved": "https://registry.npmjs.org/prisma/-/prisma-5.16.2.tgz", + "integrity": "sha512-rFV/xoBR2hBGGlu4LPLQd4U8WVA+tSAmYyFWGPRVfj+xg7N4kiZV4lSk38htSpF+/IuHKzlrbh4SFk8Z18cI8A==", "devOptional": true, "hasInstallScript": true, "dependencies": { - "@prisma/engines": "5.14.0" + "@prisma/engines": "5.16.2" }, "bin": { "prisma": "build/index.js" diff --git a/api/package.json b/api/package.json index 71f3b16f..71b58c5d 100644 --- a/api/package.json +++ b/api/package.json @@ -76,7 +76,7 @@ "eslint-plugin-prettier": "^5.1.3", "jest": "^29.7.0", "prettier": "^3.2.5", - "prisma": "^5.14.0", + "prisma": "^5.16.2", "source-map-support": "^0.5.21", "supertest": "^7.0.0", "ts-jest": "^29.1.3", diff --git a/api/prisma/schema.prisma b/api/prisma/schema.prisma index 2812c437..39c38709 100644 --- a/api/prisma/schema.prisma +++ b/api/prisma/schema.prisma @@ -1,7 +1,9 @@ generator client { - provider = "prisma-client-js" + provider = "prisma-client-js" + binaryTargets = ["native", "debian-openssl-3.0.x"] } + datasource db { provider = "postgresql" url = env("DATABASE_URL") diff --git a/api/src/app/app.module.ts b/api/src/app/app.module.ts index cb46bd1e..9742c967 100644 --- a/api/src/app/app.module.ts +++ b/api/src/app/app.module.ts @@ -19,6 +19,8 @@ import { MultiSigModule } from "../multi-sig/multi-sig.module.js"; import { OsmosisModule } from "../osmosis/OsmosisModule.js"; // Test purposes +// import { OsmosisModule } from "../clickhouse/bridged-networks/osmosis/OsmosisModule.js"; +import { BaseModule } from "../base/BaseModule.js"; @Module({ imports: [ @@ -50,7 +52,8 @@ import { OsmosisModule } from "../osmosis/OsmosisModule.js"; OlSwapModule, NodeWatcherModule, MultiSigModule, - OsmosisModule, + // OsmosisModule, + BaseModule, ], controllers: [], providers: [AppService], diff --git a/api/src/base/BaseController.ts b/api/src/base/BaseController.ts new file mode 100644 index 00000000..c272571b --- /dev/null +++ b/api/src/base/BaseController.ts @@ -0,0 +1,23 @@ +import { Controller, Post } from "@nestjs/common"; +import { BaseHistoricalProcessor } from "./BaseHistoricalProcessor.js"; +import { BaseLiveProcessor } from "./BaseLiveProcessor.js"; + +@Controller("base") +export class BaseController { + constructor( + private readonly baseHistoricalProcessor: BaseHistoricalProcessor, + private readonly baseLiveProcessor: BaseLiveProcessor, + ) {} + + @Post("fetch-historical") + async triggerHistoricalFetch(): Promise { + await this.baseHistoricalProcessor.triggerFetchHistoricalData(); + return "Base Historical data fetching triggered"; + } + + @Post("fetch-live") + async triggerLiveFetch(): Promise { + await this.baseLiveProcessor.triggerFetchLiveData(); + return "Base Live data fetching triggered"; + } +} diff --git a/api/src/base/BaseHistoricalProcessor.ts b/api/src/base/BaseHistoricalProcessor.ts new file mode 100644 index 00000000..f2b0de1b --- /dev/null +++ b/api/src/base/BaseHistoricalProcessor.ts @@ -0,0 +1,265 @@ +import { Job, Queue } from "bullmq"; +import { BigQuery } from "@google-cloud/bigquery"; +import axios from "axios"; +import { InjectQueue, Processor, WorkerHost } from "@nestjs/bullmq"; +import { ConfigService } from "@nestjs/config"; + +import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; +import { NumiaConfig } from "../config/config.interface.js"; + +interface PoolSwapEvent { + timestamp: number; + sender: string; + side: string; + amount: string; + txhash: string; +} + +interface MintEvent { + timestamp: number; + amount: string; + mint_to_address: string; + txhash: string; +} + +interface BurnEvent { + timestamp: number; + amount: string; + burn_from_address: string; + txhash: string; +} + +interface TransferEvent { + timestamp: number; + from_address: string; + to_address: string; + amount: string; + txhash: string; +} + +@Processor("base-historical") +export class BaseHistoricalProcessor extends WorkerHost { + private readonly endpoint = "https://base.numia.xyz/v2/txs"; + + private readonly pageSize = 100; + + private readonly numiaApiKey?: string; + + private readonly bigQueryClient: BigQuery; + + private readonly TOKEN_DENOM = 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA'; + + constructor( + config: ConfigService, + + private readonly clickhouseService: ClickhouseService, + + @InjectQueue("base-historical") + private readonly baseQueue: Queue, + ) { + super(); + + this.numiaApiKey = config.get("numia")?.apiKey; + + // Path to service account key file + const keyFile = "./bigquery_service_account.json"; + + // Create a BigQuery client + this.bigQueryClient = new BigQuery({ + keyFilename: keyFile, + }); + } + + public async process(job: Job): Promise { + switch (job.name) { + case "fetchHistoricalData": + try { + await this.fetchHistoricalData(); + } catch (error) { + // fail silently to avoid accumulating failed repeating jobs + } + break; + + default: + throw new Error(`invalid job name ${job.name}`); + } + } + + public async triggerFetchHistoricalData() { + await this.baseQueue.add("fetchHistoricalData", undefined); + } + + private async fetchHistoricalData() { + await this.fetchMintBurnEvents(); // And internally, one hop transfers of wLIBRA + await this.fetchPoolSwapEvents(); + } + + private async fetchMintBurnEvents() { + for (let page = 1; ; ++page) { + const url = `${this.endpoint}/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p?pageSize=${this.pageSize}&page=${page}`; + const response = await axios.get(url, { + headers: { + Authorization: `Bearer ${this.numiaApiKey}`, + }, + }); + const events = response.data; + + if (events.length === 0) { + break; + } + + for (const event of events) { + const timestamp = new Date(event.blockTimestamp).getTime(); + const type = + event.messageTypes[0] === "/base.tokenfactory.v1beta1.MsgMint" + ? "mint" + : "burn"; + const amount = event.messages[0].amount.amount; + const address = + type === "mint" + ? event.messages[0].mint_to_address + : event.messages[0].burn_from_address; + const txhash = event.hash; + + if (type === "mint") { + const mintEvent: MintEvent = { + timestamp, + amount, + mint_to_address: address, + txhash + }; + await this.insertMintEvent(mintEvent); + // Not a proper DFS, just one hop away from minter + await this.fetchTransfersForAddress(event.messages[0].mint_to_address); + } else { + const burnEvent: BurnEvent = { + timestamp, + amount, + burn_from_address: address, + txhash + }; + await this.insertBurnEvent(burnEvent); + } + } + } + } + + private async fetchTransfersForAddress(address: string) { + for (let page = 1; ; ++page) { + const url = `${this.endpoint}/${address}?pageSize=${this.pageSize}&page=${page}`; + const response = await axios.get(url, { + headers: { + Authorization: `Bearer ${this.numiaApiKey}`, + }, + }); + const events = response.data; + + if (events.length === 0) { + break; + } + + for (const event of events) { + if (event.messageTypes.includes("/cosmos.bank.v1beta1.MsgSend")) { + for (const message of event.messages) { + if ( + message.amount[0].denom === this.TOKEN_DENOM + ) { + const transferEvent: TransferEvent = { + timestamp: new Date(event.blockTimestamp).getTime(), + from_address: message.from_address, + to_address: message.to_address, + amount: message.amount.amount, + txhash: event.hash, + }; + await this.insertTransferEvent(transferEvent); + } + } + } + } + } + } + + private async fetchPoolSwapEvents() { + const query = ` + SELECT + tx_id, + sender, + denom_in, + parsed_amount_in, + denom_out, + parsed_amount_out, + ingestion_timestamp + FROM \`numia-data.base.base_swaps\` + WHERE pool_id = '1721' + ORDER BY ingestion_timestamp ASC + `; + + const [rows] = await this.bigQueryClient.query(query); + + for (const row of rows) { + const side = row.denom_in === this.TOKEN_DENOM ? "buy" : "sell"; + const swapEvent: PoolSwapEvent = { + timestamp: new Date(row.ingestion_timestamp.value).getTime(), + sender: row.sender, + side, + amount: side === "buy" ? row.parsed_amount_in : row.parsed_amount_out, + txhash: row.tx_id, + }; + await this.insertSwapEvent(swapEvent); + } + } + + private async insertMintEvent(event: MintEvent) { + console.log('Mint Event:', JSON.stringify(event, null, 2)); + await this.clickhouseService.client.insert({ + table: "mint_events", + values: { + timestamp: event.timestamp, + amount: event.amount, + mint_to_address: event.mint_to_address, + txhash: event.txhash, + }, + }); + } + + private async insertBurnEvent(event: BurnEvent) { + console.log('Burn Event:', JSON.stringify(event, null, 2)); + await this.clickhouseService.client.insert({ + table: "burn_events", + values: { + timestamp: event.timestamp, + amount: event.amount, + burn_from_address: event.burn_from_address, + txhash: event.txhash, + }, + }); + } + + private async insertTransferEvent(event: TransferEvent) { + console.log('Transfer Event:', JSON.stringify(event, null, 2)); + await this.clickhouseService.client.insert({ + table: "transfer_events", + values: { + timestamp: event.timestamp, + from_address: event.from_address, + to_address: event.to_address, + amount: event.amount, + txhash: event.txhash, + }, + }); + } + + private async insertSwapEvent(event: PoolSwapEvent) { + console.log('Pool Swap Event:', JSON.stringify(event, null, 2)); + await this.clickhouseService.client.insert({ + table: "pool_swap_events", + values: { + timestamp: event.timestamp, + sender: event.sender, + side: event.side, + amount: event.amount, + txhash: event.txhash, + }, + }); + } +} diff --git a/api/src/base/BaseLiveProcessor.ts b/api/src/base/BaseLiveProcessor.ts new file mode 100644 index 00000000..34594c16 --- /dev/null +++ b/api/src/base/BaseLiveProcessor.ts @@ -0,0 +1,192 @@ +import { InjectQueue, Processor, WorkerHost } from "@nestjs/bullmq"; +import { OnModuleInit } from "@nestjs/common"; +import { Job, Queue } from "bullmq"; +import axios from "axios"; +import Bluebird from "bluebird"; + +import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; + +interface PoolSwapEvent { + timestamp: number; + sender: string; + side: string; + amount: string; + txhash: string; +} + +interface MintEvent { + timestamp: number; + amount: string; + mint_to_address: string; + txhash: string; +} + +interface BurnEvent { + timestamp: number; + amount: string; + burn_from_address: string; + txhash: string; +} + +interface TransferEvent { + timestamp: number; + from_address: string; + to_address: string; + amount: string; + txhash: string; +} + +@Processor("base-live") +export class BaseLiveProcessor extends WorkerHost implements OnModuleInit { + private readonly endpoint = 'https://base.blockscout.com/api/v2/tokens/0xc78b628b060258300218740b1a7a5b3c82b3bd9f/transfers'; + + constructor( + private readonly clickhouseService: ClickhouseService, + + @InjectQueue("base-live") + private readonly baseQueue: Queue, + ) { + super(); + } + + public async process(job: Job): Promise { + switch (job.name) { + case "fetchLiveData": + try { + await Promise.race([ + this.fetchAndStoreLiveData(), + // 30m timeout to avoid blocking the queue + Bluebird.delay(60 * 30 * 1_000), + ]); + } catch (error) { + // fail silently to avoid accumulating failed repeating jobs + } + break; + + default: + throw new Error(`invalid job name ${job.name}`); + } + } + + public async onModuleInit() { + // await this.baseQueue.add("fetchLiveData", undefined, { + // repeat: { + // every: 60 * 30 * 1_000, // 1 minute + // }, + // }); + // await this.fetchAndStoreLiveData(); + } + + public async triggerFetchLiveData() { + // await this.baseQueue.add("fetchLiveData", undefined); + await this.fetchAndStoreLiveData(); + } + + private async fetchAndStoreLiveData() { + try { + const response = await axios.get(this.endpoint); + if (response.status !== 200) { + console.error(`Error fetching data from URL: ${this.endpoint}, Status Code: ${response.status}`); + return; + } + + const events = response.data.items; + for (const event of events) { + const timestamp = new Date(event.timestamp).getTime(); + const txhash = event.tx_hash; + const amount = event.total.value; + + switch (event.type) { + case "token_minting": + await this.handleMintEvent(event, timestamp, txhash, amount); + break; + case "token_burning": + await this.handleBurnEvent(event, timestamp, txhash, amount); + break; + case "token_transfer": + await this.handleTransferEvent(event, timestamp, txhash, amount); + break; + } + } + } catch (error) { + console.error(`Error fetching data from URL: ${this.endpoint}`, error.message || error); + } + } + + private async handleMintEvent(event, timestamp: number, txhash: string, amount: string) { + const mintEvent: MintEvent = { + timestamp, + amount, + mint_to_address: event.to.hash, + txhash + }; + await this.insertMintEvent(mintEvent); + } + + private async handleBurnEvent(event, timestamp: number, txhash: string, amount: string) { + const burnEvent: BurnEvent = { + timestamp, + amount, + burn_from_address: event.from.hash, + txhash + }; + await this.insertBurnEvent(burnEvent); + } + + private async handleTransferEvent(event, timestamp: number, txhash: string, amount: string) { + const transferEvent: TransferEvent = { + timestamp, + from_address: event.from.hash, + to_address: event.to.hash, + amount, + txhash, + }; + await this.insertTransferEvent(transferEvent); + } + + private async insertMintEvent(event: MintEvent) { + console.log('Mint Event:', JSON.stringify(event, null, 2)); + /* + await this.clickhouseService.client.insert({ + table: 'mint_events', + values: { + timestamp: event.timestamp, + amount: event.amount, + mint_to_address: event.mint_to_address, + txhash: event.txhash, + } + }); + */ + } + + private async insertBurnEvent(event: BurnEvent) { + console.log('Burn Event:', JSON.stringify(event, null, 2)); + /* + await this.clickhouseService.client.insert({ + table: 'burn_events', + values: { + timestamp: event.timestamp, + amount: event.amount, + burn_from_address: event.burn_from_address, + txhash: event.txhash, + } + }); + */ + } + + private async insertTransferEvent(event: TransferEvent) { + console.log('Transfer Event:', JSON.stringify(event, null, 2)); + /* + await this.clickhouseService.client.insert({ + table: 'transfer_events', + values: { + timestamp: event.timestamp, + from_address: event.from_address, + to_address: event.to_address, + amount: event.amount, + txhash: event.txhash, + } + }); + */ + } +} diff --git a/api/src/base/BaseModule.ts b/api/src/base/BaseModule.ts new file mode 100644 index 00000000..f3f01188 --- /dev/null +++ b/api/src/base/BaseModule.ts @@ -0,0 +1,38 @@ +import process from "node:process"; +import { BullModule } from "@nestjs/bullmq"; +import { Module, Type } from "@nestjs/common"; + +import { BaseLiveProcessor } from "./BaseLiveProcessor.js"; +import { BaseHistoricalProcessor } from "./BaseHistoricalProcessor.js"; +import { BaseController } from "./BaseController.js"; +import { ClickhouseModule } from "../clickhouse/clickhouse.module.js"; +import { redisClient } from "../redis/redis.service.js"; + +const roles = process.env.ROLES!.split(","); + +const workers: Type[] = []; +if (roles.includes("base-live-processor")) { + // workers.push(BaseLiveProcessor); +} +if (roles.includes("base-historical-processor")) { + // workers.push(BaseHistoricalProcessor); +} + +@Module({ + imports: [ + ClickhouseModule, + + BullModule.registerQueue({ + name: "base-live", + connection: redisClient, + }), + BullModule.registerQueue({ + name: "base-historical", + connection: redisClient, + }), + ], + controllers: [BaseController], + providers: [BaseLiveProcessor, BaseHistoricalProcessor, ...workers], + // providers: [BaseLiveProcessor, ...workers], +}) +export class BaseModule {} diff --git a/api/src/osmosis/OsmosisController.ts b/api/src/osmosis/OsmosisController.ts index 24fc8240..c4287088 100644 --- a/api/src/osmosis/OsmosisController.ts +++ b/api/src/osmosis/OsmosisController.ts @@ -12,13 +12,12 @@ export class OsmosisController { @Post("fetch-historical") async triggerHistoricalFetch(): Promise { await this.osmosisHistoricalProcessor.triggerFetchHistoricalData(); - return "Historical data fetching triggered"; + return "Osmosis Historical data fetching triggered"; } @Post("fetch-live") async triggerLiveFetch(): Promise { - // await this.osmosisLiveProcessor.triggerFetchLiveData(); - await this.osmosisLiveProcessor.fetchAndStoreLiveData(); - return "Live data fetching triggered"; + await this.osmosisLiveProcessor.triggerFetchLiveData(); + return "Osmosis Live data fetching triggered"; } }