From dd0fcaeefe6c0a9839bc746b014e42ff4d7ae2a5 Mon Sep 17 00:00:00 2001 From: OTLegend Date: Mon, 22 Dec 2025 08:41:30 +0100 Subject: [PATCH 1/2] fix: split publish/finality handling with phased flow and timeouts - add phased DKG adapter and use publish+mint phases, enqueue finality jobs - introduce job defaults for publish/finality timeouts (3m publish, 15m finality) - add mint_submitted status migration and update setup to include it --- .../workflows/publisher-phased.spec.ts | 281 ++++++ .../workflows/publisher-plugin.spec.ts | 4 +- packages/plugin-dkg-publisher/setup.js | 2 +- .../migrations/0002_ambitious_vance_astro.sql | 1 + .../migrations/meta/0002_snapshot.json | 867 ++++++++++++++++++ .../database/migrations/meta/_journal.json | 9 +- .../src/database/schema.ts | 1 + .../src/services/AssetService.ts | 5 + .../src/services/DkgClientAdapter.ts | 89 ++ .../src/services/DkgService.ts | 13 + .../src/services/PublishingService.ts | 76 +- .../src/services/QueuePoller.ts | 2 +- .../src/services/QueueService.ts | 468 ++++++---- .../src/services/index.ts | 3 + packages/plugin-dkg-publisher/src/types.ts | 1 + 15 files changed, 1604 insertions(+), 218 deletions(-) create mode 100644 apps/agent/tests/integration/workflows/publisher-phased.spec.ts rename packages/plugin-dkg-publisher/tests/dkg-publisher.spec.ts => apps/agent/tests/integration/workflows/publisher-plugin.spec.ts (99%) create mode 100644 packages/plugin-dkg-publisher/src/database/migrations/0002_ambitious_vance_astro.sql create mode 100644 packages/plugin-dkg-publisher/src/database/migrations/meta/0002_snapshot.json create mode 100644 packages/plugin-dkg-publisher/src/services/DkgClientAdapter.ts diff --git a/apps/agent/tests/integration/workflows/publisher-phased.spec.ts b/apps/agent/tests/integration/workflows/publisher-phased.spec.ts new file mode 100644 index 0000000..ca78335 --- /dev/null +++ b/apps/agent/tests/integration/workflows/publisher-phased.spec.ts @@ -0,0 +1,281 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { describe, it, beforeEach, afterEach } from "mocha"; +import { expect } from "chai"; +import { PublishingService } from "../../../../../packages/plugin-dkg-publisher/src/services/PublishingService"; +import { DkgService } from "../../../../../packages/plugin-dkg-publisher/src/services/DkgService"; +import { QueueService } from "../../../../../packages/plugin-dkg-publisher/src/services/QueueService"; + +// Minimal fake DB that mimics the chained drizzle calls used in PublishingService +class FakeDb { + public assetRow: any; + public updates: any[]; + + constructor(assetRow: any) { + this.assetRow = assetRow; + this.updates = []; + } + + select() { + return { + from: () => ({ + where: () => ({ + limit: async () => [this.assetRow], + }), + }), + }; + } + + update() { + const self = this; + return { + set(values: any) { + return { + where: async () => { + self.updates.push(values); + return [{ affectedRows: 1 }]; + }, + }; + }, + }; + } +} + +// Fake phased client to avoid hitting real dkg.js +class FakePhasedClient { + public publishCalled: boolean; + public mintCalled: boolean; + public ual: string; + public txHash: string; + + constructor(ual: string, txHash: string) { + this.ual = ual; + this.txHash = txHash; + this.publishCalled = false; + this.mintCalled = false; + } + + async publishPhase() { + this.publishCalled = true; + return { + readyForMint: true, + publishOperationId: "op-123", + }; + } + + async mintPhase() { + this.mintCalled = true; + return { + UAL: this.ual, + 
mintKnowledgeCollectionReceipt: { transactionHash: this.txHash }, + }; + } +} + +describe("PublishingService phased publish", () => { + const wallet = { + id: 1, + address: "0xabc", + privateKey: "0xkey", + blockchain: "chain", + }; + const assetRow = { + id: 42, + contentUrl: "http://example.com/test.json", + epochs: 2, + replications: 1, + privacy: "private", + status: "queued", + }; + + let fakeDb: FakeDb; + let fakeClient: FakePhasedClient; + let publishingService: PublishingService; + let originalFetch: any; + + beforeEach(() => { + fakeDb = new FakeDb(assetRow); + fakeClient = new FakePhasedClient("did:dkg:ual123", "0xtxhash"); + + // Stub DkgService to return our fake phased client + const dkgServiceStub = { + createWalletPhasedClient: () => fakeClient, + } as unknown as DkgService; + + publishingService = new PublishingService(fakeDb as any, dkgServiceStub); + + // Stub fetch used for loading content + originalFetch = (global as any).fetch; + (global as any).fetch = async () => ({ + ok: true, + json: async () => ({ foo: "bar" }), + }); + }); + + afterEach(() => { + (global as any).fetch = originalFetch as any; + }); + + it("runs publish+mint phases and sets status mint_submitted", async () => { + const result = await publishingService.publishAsset(assetRow.id, wallet); + + expect(result.success).to.equal(true); + expect(result.ual).to.equal("did:dkg:ual123"); + expect(fakeClient.publishCalled).to.equal(true); + expect(fakeClient.mintCalled).to.equal(true); + + // Last DB update should set status to mint_submitted and store UAL/tx + const lastUpdate = fakeDb.updates[fakeDb.updates.length - 1]; + expect(lastUpdate.status).to.equal("mint_submitted"); + expect(lastUpdate.ual).to.equal("did:dkg:ual123"); + expect(lastUpdate.transactionHash).to.equal("0xtxhash"); + }); +}); + +describe("QueueService finality-check handler", () => { + it("marks asset published when finality is reached", async () => { + // Fake asset/DB/wallet/DKG services + const assetService = { + getAsset: async () => ({ id: 7, ual: "did:dkg:ual123" }), + updateAssetStatus: async (_id: number, status: string) => { + expect(status).to.equal("published"); + }, + }; + const walletService = { + getWalletForQueries: async () => ({ + id: 1, + address: "0xabc", + privateKey: "0xkey", + }), + }; + const dkgService = { + createWalletPhasedClient: () => ({ + finalityPhase: async () => ({ + finality: { status: "FINALIZED" }, + numberOfConfirmations: 3, + requiredConfirmations: 1, + }), + }), + } as unknown as DkgService; + + const queueService = Object.create(QueueService.prototype) as any; + queueService.assetService = assetService; + queueService.walletService = walletService; + queueService.dkgService = dkgService; + + const job = { + id: "job-1", + data: { assetId: 7, ual: "did:dkg:ual123" }, + updateProgress: async () => {}, + } as any; + + const res = await queueService.processFinalityJob(job, 0); + expect(res.success).to.equal(true); + expect(res.assetId).to.equal(7); + }); + + it("bubbles error when finality is not reached (timeout/lag)", async () => { + let updateCalled = false; + + const assetService = { + getAsset: async () => ({ id: 8, ual: "did:dkg:ual999" }), + updateAssetStatus: async () => { + updateCalled = true; + }, + }; + const walletService = { + getWalletForQueries: async () => ({ + id: 2, + address: "0xdef", + privateKey: "0xkey2", + }), + }; + const dkgService = { + createWalletPhasedClient: () => ({ + finalityPhase: async () => ({ + finality: { status: "NOT FINALIZED" }, + numberOfConfirmations: 0, + 
requiredConfirmations: 3, + }), + }), + } as unknown as DkgService; + + const queueService = Object.create(QueueService.prototype) as any; + queueService.assetService = assetService; + queueService.walletService = walletService; + queueService.dkgService = dkgService; + + const job = { + id: "job-2", + data: { assetId: 8, ual: "did:dkg:ual999" }, + updateProgress: async () => {}, + } as any; + + let caught = false; + try { + await queueService.processFinalityJob(job, 0); + } catch (error: any) { + caught = true; + expect(error.message).to.include("Finality not reached"); + } + + expect(caught).to.equal(true); + expect(updateCalled).to.equal(false); + }); + + it("enqueues a finality-check job after successful publish+mint", async () => { + let finalityCalled = false; + let finalityArgs: any = null; + + const queueService = Object.create(QueueService.prototype) as any; + queueService.assetService = { + claimAssetForProcessing: async () => true, + createPublishingAttempt: async () => 99, + updatePublishingAttempt: async () => {}, + handleAssetFailure: async () => {}, + }; + queueService.walletService = { + assignWalletToAsset: async () => ({ + id: 1, + address: "0xabc", + privateKey: "0xkey", + }), + releaseWallet: async () => {}, + }; + queueService.publishingService = { + publishAsset: async () => ({ + success: true, + ual: "did:dkg:ual123", + transactionHash: "0xtxhash", + }), + }; + queueService.enqueueFinalityJob = async ( + assetId: number, + priority: number, + ual: string, + transactionHash: string, + ) => { + finalityCalled = true; + finalityArgs = { assetId, priority, ual, transactionHash }; + }; + + const job = { + id: "job-publish", + data: { assetId: 123 }, + opts: { priority: 50 }, + timestamp: Date.now(), + updateProgress: async () => {}, + } as any; + + const res = await queueService.processPublishJob(job, 0); + + expect(res.success).to.equal(true); + expect(finalityCalled).to.equal(true); + expect(finalityArgs).to.deep.equal({ + assetId: 123, + priority: 50, + ual: "did:dkg:ual123", + transactionHash: "0xtxhash", + }); + }); +}); diff --git a/packages/plugin-dkg-publisher/tests/dkg-publisher.spec.ts b/apps/agent/tests/integration/workflows/publisher-plugin.spec.ts similarity index 99% rename from packages/plugin-dkg-publisher/tests/dkg-publisher.spec.ts rename to apps/agent/tests/integration/workflows/publisher-plugin.spec.ts index ec20da4..49b5099 100644 --- a/packages/plugin-dkg-publisher/tests/dkg-publisher.spec.ts +++ b/apps/agent/tests/integration/workflows/publisher-plugin.spec.ts @@ -2,7 +2,7 @@ import { describe, it, beforeEach, afterEach } from "mocha"; import { expect } from "chai"; -import dkgPublisherPlugin from "../dist/index.mjs"; +import dkgPublisherPlugin from "../../../../../packages/plugin-dkg-publisher/src/index"; import express from "express"; import request from "supertest"; import { @@ -362,4 +362,4 @@ describe("@dkg/plugin-dkg-publisher checks", () => { } }); }); -}); \ No newline at end of file +}); diff --git a/packages/plugin-dkg-publisher/setup.js b/packages/plugin-dkg-publisher/setup.js index 2f0c874..42064b2 100644 --- a/packages/plugin-dkg-publisher/setup.js +++ b/packages/plugin-dkg-publisher/setup.js @@ -812,7 +812,7 @@ volumes: max_attempts INTEGER DEFAULT 3, -- Status and attempts - status ENUM('pending', 'queued', 'assigned', 'publishing', 'published', 'failed') NOT NULL DEFAULT 'pending', + status ENUM('pending', 'queued', 'assigned', 'publishing', 'mint_submitted', 'published', 'failed') NOT NULL DEFAULT 'pending', status_message TEXT, 
attempt_count INTEGER DEFAULT 0, retry_count INTEGER DEFAULT 0, diff --git a/packages/plugin-dkg-publisher/src/database/migrations/0002_ambitious_vance_astro.sql b/packages/plugin-dkg-publisher/src/database/migrations/0002_ambitious_vance_astro.sql new file mode 100644 index 0000000..bddba24 --- /dev/null +++ b/packages/plugin-dkg-publisher/src/database/migrations/0002_ambitious_vance_astro.sql @@ -0,0 +1 @@ +ALTER TABLE `assets` MODIFY COLUMN `status` enum('pending','queued','assigned','publishing','mint_submitted','published','failed') NOT NULL DEFAULT 'pending'; \ No newline at end of file diff --git a/packages/plugin-dkg-publisher/src/database/migrations/meta/0002_snapshot.json b/packages/plugin-dkg-publisher/src/database/migrations/meta/0002_snapshot.json new file mode 100644 index 0000000..2bbb594 --- /dev/null +++ b/packages/plugin-dkg-publisher/src/database/migrations/meta/0002_snapshot.json @@ -0,0 +1,867 @@ +{ + "version": "5", + "dialect": "mysql", + "id": "1bb0f690-93d9-46d7-95a4-d76abd04ec1d", + "prevId": "f0a69756-52d0-4f78-bfe8-7c3cafc00cce", + "tables": { + "assets": { + "name": "assets", + "columns": { + "id": { + "name": "id", + "type": "serial", + "primaryKey": false, + "notNull": true, + "autoincrement": true + }, + "wallet_id": { + "name": "wallet_id", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "batch_id": { + "name": "batch_id", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "content_url": { + "name": "content_url", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "content_size": { + "name": "content_size", + "type": "bigint", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "source": { + "name": "source", + "type": "varchar(100)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "source_id": { + "name": "source_id", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "priority": { + "name": "priority", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 50 + }, + "privacy": { + "name": "privacy", + "type": "enum('private','public')", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "'private'" + }, + "epochs": { + "name": "epochs", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 2 + }, + "replications": { + "name": "replications", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 1 + }, + "max_attempts": { + "name": "max_attempts", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 3 + }, + "status": { + "name": "status", + "type": "enum('pending','queued','assigned','publishing','mint_submitted','published','failed')", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'pending'" + }, + "status_message": { + "name": "status_message", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "attempt_count": { + "name": "attempt_count", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "retry_count": { + "name": "retry_count", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "next_retry_at": { + "name": "next_retry_at", + "type": 
"timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "last_error": { + "name": "last_error", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "ual": { + "name": "ual", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "transaction_hash": { + "name": "transaction_hash", + "type": "varchar(66)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "blockchain": { + "name": "blockchain", + "type": "varchar(50)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "(now())" + }, + "queued_at": { + "name": "queued_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "assigned_at": { + "name": "assigned_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "publishing_started_at": { + "name": "publishing_started_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "published_at": { + "name": "published_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "onUpdate": true, + "default": "(now())" + } + }, + "indexes": { + "idx_status": { + "name": "idx_status", + "columns": [ + "status" + ], + "isUnique": false + }, + "idx_retry": { + "name": "idx_retry", + "columns": [ + "status", + "next_retry_at" + ], + "isUnique": false + }, + "idx_source": { + "name": "idx_source", + "columns": [ + "source", + "source_id" + ], + "isUnique": false + }, + "idx_pending": { + "name": "idx_pending", + "columns": [ + "status", + "created_at" + ], + "isUnique": false + }, + "idx_batch": { + "name": "idx_batch", + "columns": [ + "batch_id" + ], + "isUnique": false + } + }, + "foreignKeys": { + "assets_wallet_id_wallets_id_fk": { + "name": "assets_wallet_id_wallets_id_fk", + "tableFrom": "assets", + "tableTo": "wallets", + "columnsFrom": [ + "wallet_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "set null", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "assets_id": { + "name": "assets_id", + "columns": [ + "id" + ] + } + }, + "uniqueConstraints": { + "assets_ual_unique": { + "name": "assets_ual_unique", + "columns": [ + "ual" + ] + } + } + }, + "batches": { + "name": "batches", + "columns": { + "id": { + "name": "id", + "type": "serial", + "primaryKey": false, + "notNull": true, + "autoincrement": true + }, + "batch_name": { + "name": "batch_name", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "source": { + "name": "source", + "type": "varchar(100)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "total_assets": { + "name": "total_assets", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "pending_count": { + "name": "pending_count", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "processing_count": { + "name": "processing_count", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "published_count": { + 
"name": "published_count", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "failed_count": { + "name": "failed_count", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "(now())" + }, + "completed_at": { + "name": "completed_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "idx_batch_status": { + "name": "idx_batch_status", + "columns": [ + "created_at", + "completed_at" + ], + "isUnique": false + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": { + "batches_id": { + "name": "batches_id", + "columns": [ + "id" + ] + } + }, + "uniqueConstraints": {} + }, + "metrics_hourly": { + "name": "metrics_hourly", + "columns": { + "hour_timestamp": { + "name": "hour_timestamp", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "assets_registered": { + "name": "assets_registered", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "assets_published": { + "name": "assets_published", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "assets_failed": { + "name": "assets_failed", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "avg_publish_duration_seconds": { + "name": "avg_publish_duration_seconds", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "total_gas_used": { + "name": "total_gas_used", + "type": "bigint", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "unique_wallets_used": { + "name": "unique_wallets_used", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "idx_metrics_hour": { + "name": "idx_metrics_hour", + "columns": [ + "hour_timestamp" + ], + "isUnique": false + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": { + "metrics_hourly_hour_timestamp": { + "name": "metrics_hourly_hour_timestamp", + "columns": [ + "hour_timestamp" + ] + } + }, + "uniqueConstraints": {} + }, + "publishing_attempts": { + "name": "publishing_attempts", + "columns": { + "id": { + "name": "id", + "type": "serial", + "primaryKey": false, + "notNull": true, + "autoincrement": true + }, + "asset_id": { + "name": "asset_id", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "attempt_number": { + "name": "attempt_number", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "worker_id": { + "name": "worker_id", + "type": "varchar(100)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "wallet_address": { + "name": "wallet_address", + "type": "varchar(42)", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "wallet_id": { + "name": "wallet_id", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "otnode_url": { + "name": "otnode_url", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "blockchain": { + "name": "blockchain", + "type": "varchar(50)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + 
"transaction_hash": { + "name": "transaction_hash", + "type": "varchar(66)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "gas_used": { + "name": "gas_used", + "type": "bigint", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "status": { + "name": "status", + "type": "enum('started','success','failed','timeout')", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "ual": { + "name": "ual", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "error_type": { + "name": "error_type", + "type": "varchar(50)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "error_message": { + "name": "error_message", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "started_at": { + "name": "started_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "completed_at": { + "name": "completed_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "duration_seconds": { + "name": "duration_seconds", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "(now())" + } + }, + "indexes": { + "idx_asset_attempts": { + "name": "idx_asset_attempts", + "columns": [ + "asset_id", + "attempt_number" + ], + "isUnique": false + }, + "idx_wallet_usage": { + "name": "idx_wallet_usage", + "columns": [ + "wallet_address", + "started_at" + ], + "isUnique": false + } + }, + "foreignKeys": { + "publishing_attempts_asset_id_assets_id_fk": { + "name": "publishing_attempts_asset_id_assets_id_fk", + "tableFrom": "publishing_attempts", + "tableTo": "assets", + "columnsFrom": [ + "asset_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "publishing_attempts_wallet_id_wallets_id_fk": { + "name": "publishing_attempts_wallet_id_wallets_id_fk", + "tableFrom": "publishing_attempts", + "tableTo": "wallets", + "columnsFrom": [ + "wallet_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "publishing_attempts_id": { + "name": "publishing_attempts_id", + "columns": [ + "id" + ] + } + }, + "uniqueConstraints": {} + }, + "wallet_metrics": { + "name": "wallet_metrics", + "columns": { + "wallet_id": { + "name": "wallet_id", + "type": "int", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "date": { + "name": "date", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "total_publishes": { + "name": "total_publishes", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "successful_publishes": { + "name": "successful_publishes", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "failed_publishes": { + "name": "failed_publishes", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "avg_duration_seconds": { + "name": "avg_duration_seconds", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "total_gas_used": { + "name": "total_gas_used", + "type": "bigint", + "primaryKey": false, + 
"notNull": false, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": { + "wallet_metrics_wallet_id_wallets_id_fk": { + "name": "wallet_metrics_wallet_id_wallets_id_fk", + "tableFrom": "wallet_metrics", + "tableTo": "wallets", + "columnsFrom": [ + "wallet_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "wallet_metrics_wallet_id_date_pk": { + "name": "wallet_metrics_wallet_id_date_pk", + "columns": [ + "wallet_id", + "date" + ] + } + }, + "uniqueConstraints": {} + }, + "wallets": { + "name": "wallets", + "columns": { + "id": { + "name": "id", + "type": "serial", + "primaryKey": false, + "notNull": true, + "autoincrement": true + }, + "address": { + "name": "address", + "type": "varchar(42)", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "private_key_encrypted": { + "name": "private_key_encrypted", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "blockchain": { + "name": "blockchain", + "type": "varchar(50)", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "is_active": { + "name": "is_active", + "type": "boolean", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": true + }, + "is_locked": { + "name": "is_locked", + "type": "boolean", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": false + }, + "locked_by": { + "name": "locked_by", + "type": "varchar(100)", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "locked_at": { + "name": "locked_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "last_used_at": { + "name": "last_used_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "total_uses": { + "name": "total_uses", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "successful_uses": { + "name": "successful_uses", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "failed_uses": { + "name": "failed_uses", + "type": "int", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "(now())" + } + }, + "indexes": { + "idx_available": { + "name": "idx_available", + "columns": [ + "is_active", + "is_locked", + "last_used_at" + ], + "isUnique": false + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": { + "wallets_id": { + "name": "wallets_id", + "columns": [ + "id" + ] + } + }, + "uniqueConstraints": { + "wallets_address_unique": { + "name": "wallets_address_unique", + "columns": [ + "address" + ] + } + } + } + }, + "schemas": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + } +} \ No newline at end of file diff --git a/packages/plugin-dkg-publisher/src/database/migrations/meta/_journal.json b/packages/plugin-dkg-publisher/src/database/migrations/meta/_journal.json index 1bc3a60..d13baae 100644 --- a/packages/plugin-dkg-publisher/src/database/migrations/meta/_journal.json +++ b/packages/plugin-dkg-publisher/src/database/migrations/meta/_journal.json @@ -15,6 +15,13 @@ "when": 1756889502071, "tag": "0001_amused_dexter_bennett", "breakpoints": true + }, + { + "idx": 2, + "version": "5", + "when": 
1766134677947, + "tag": "0002_ambitious_vance_astro", + "breakpoints": true } ] -} +} \ No newline at end of file diff --git a/packages/plugin-dkg-publisher/src/database/schema.ts b/packages/plugin-dkg-publisher/src/database/schema.ts index 9b523c3..7db9b1b 100644 --- a/packages/plugin-dkg-publisher/src/database/schema.ts +++ b/packages/plugin-dkg-publisher/src/database/schema.ts @@ -44,6 +44,7 @@ export const assets = mysqlTable( "queued", "assigned", "publishing", + "mint_submitted", "published", "failed", ]) diff --git a/packages/plugin-dkg-publisher/src/services/AssetService.ts b/packages/plugin-dkg-publisher/src/services/AssetService.ts index 42f8d44..905500f 100644 --- a/packages/plugin-dkg-publisher/src/services/AssetService.ts +++ b/packages/plugin-dkg-publisher/src/services/AssetService.ts @@ -91,6 +91,7 @@ export class AssetService extends EventEmitter { | "queued" | "assigned" | "publishing" + | "mint_submitted" | "published" | "failed", additionalFields?: Partial<{ @@ -117,6 +118,9 @@ export class AssetService extends EventEmitter { case "publishing": updates.publishingStartedAt = sql`NOW()`; break; + case "mint_submitted": + updates.publishingStartedAt = updates.publishingStartedAt || sql`NOW()`; + break; case "published": updates.publishedAt = sql`NOW()`; break; @@ -453,6 +457,7 @@ export class AssetService extends EventEmitter { | "queued" | "assigned" | "publishing" + | "mint_submitted" | "published" | "failed", ): Promise { diff --git a/packages/plugin-dkg-publisher/src/services/DkgClientAdapter.ts b/packages/plugin-dkg-publisher/src/services/DkgClientAdapter.ts new file mode 100644 index 0000000..0d6983a --- /dev/null +++ b/packages/plugin-dkg-publisher/src/services/DkgClientAdapter.ts @@ -0,0 +1,89 @@ +/** + * Lightweight adapter interface for DKG phased operations so we can + * inject fakes in tests without pulling in the real dkg.js client. + */ +export interface DkgPhasedClient { + publishPhase( + content: any, + options?: Record, + ): Promise; + mintPhase( + publishContext: PublishPhaseResult, + stepHooks?: Record, + ): Promise; + finalityPhase( + ual: string, + options?: Record, + ): Promise; +} + +export interface PublishPhaseResult { + readyForMint?: boolean; + datasetRoot?: string; + datasetSize?: number; + knowledgeAssetsAmount?: number; + publishOperationId?: string; + publishOperationResult?: any; + blockchain?: any; + endpoint?: string; + port?: string | number; + hashFunctionId?: number; + immutable?: boolean; + tokenAmount?: string | number | bigint; + payer?: string; + minimumNumberOfFinalizationConfirmations?: number; + minimumNumberOfNodeReplications?: number; + epochsNum?: number; + contentAssetStorageAddress?: string; + signatures?: any; + publisherNodeSignature?: any; + operation?: any; +} + +export interface MintPhaseResult { + UAL: string; + datasetRoot: string; + knowledgeCollectionId: string | number; + mintKnowledgeCollectionReceipt: any; +} + +export interface FinalityPhaseResult { + finality: { + status: string; + }; + numberOfConfirmations: number; + requiredConfirmations: number; +} + +/** + * Adapter around a real dkg.js client instance. We keep this thin so we + * can swap in a mock DkgPhasedClient in tests without loading dkg.js. 
+ */ +export class RealDkgPhasedClient implements DkgPhasedClient { + constructor(private client: any) {} + + async publishPhase( + content: any, + options: Record = {}, + ): Promise { + return this.client.asset.publishAssetPhase(content, options); + } + + async mintPhase( + publishContext: PublishPhaseResult, + stepHooks: Record = {}, + ): Promise { + return this.client.asset.mintKnowledgeCollectionPhase( + publishContext, + {}, + stepHooks, + ); + } + + async finalityPhase( + ual: string, + options: Record = {}, + ): Promise { + return this.client.asset.finalizePublishPhase(ual, options); + } +} diff --git a/packages/plugin-dkg-publisher/src/services/DkgService.ts b/packages/plugin-dkg-publisher/src/services/DkgService.ts index 0b61c8d..2091834 100644 --- a/packages/plugin-dkg-publisher/src/services/DkgService.ts +++ b/packages/plugin-dkg-publisher/src/services/DkgService.ts @@ -1,5 +1,9 @@ import DKG from "dkg.js"; import { WalletService } from "./WalletService"; +import { + DkgPhasedClient, + RealDkgPhasedClient, +} from "./DkgClientAdapter"; export interface SparqlQueryResult { success: boolean; @@ -196,6 +200,15 @@ export class DkgService { return walletDkgClient; } + /** + * Create a phased DKG client adapter for publish/mint/finality operations. + * This wraps the real dkg.js client so tests can swap in a mock. + */ + createWalletPhasedClient(wallet: any): DkgPhasedClient { + const client = this.createWalletDKGClient(wallet); + return new RealDkgPhasedClient(client); + } + /** * Validate SPARQL query syntax (basic validation) */ diff --git a/packages/plugin-dkg-publisher/src/services/PublishingService.ts b/packages/plugin-dkg-publisher/src/services/PublishingService.ts index 1774c2a..d8d0969 100644 --- a/packages/plugin-dkg-publisher/src/services/PublishingService.ts +++ b/packages/plugin-dkg-publisher/src/services/PublishingService.ts @@ -121,11 +121,11 @@ export class PublishingService { [asset.privacy || "private"]: content, }; - // Create DKG client for this wallet - const dkgClient = this.dkgService.createWalletDKGClient(wallet); + // Create phased DKG client for this wallet + const phasedClient = this.dkgService.createWalletPhasedClient(wallet); - // Publish to DKG - console.log(`🚀 Publishing to DKG:`, { + // Publish to DKG (publish phase) + logger.info(`🚀 Publishing (phase 1) to DKG`, { assetId, epochs: asset.epochs, replications: asset.replications || 1, @@ -133,78 +133,50 @@ export class PublishingService { privacy: asset.privacy, }); - console.log(`📡 Making DKG API call...`); - - const result = await dkgClient.asset.create(wrappedContent, { + const publishResult = await phasedClient.publishPhase(wrappedContent, { epochsNum: asset.epochs, minimumNumberOfFinalizationConfirmations: 3, minimumNumberOfNodeReplications: asset.replications || 1, }); - // Log the complete DKG asset.create result - logger.info(`📡 DKG asset.create RESULT for asset ${assetId}`, { - assetId, - result: JSON.stringify(result, null, 2), - }); - - // Check for DKG API errors first - if ( - result?.operation?.publish?.errorType || - result?.operation?.publish?.errorMessage - ) { - const errorType = result.operation.publish.errorType; - const errorMessage = result.operation.publish.errorMessage; - - logger.error(`❌ DKG API ERROR for asset ${assetId}`, { - assetId, - errorType, - errorMessage, - operationId: result.operation.publish.operationId, - status: result.operation.publish.status, - }); - - throw new Error(`DKG API Error: ${errorType} - ${errorMessage}`); + if (!publishResult?.readyForMint) { + throw 
new Error( + `Publish phase did not complete (operationId=${publishResult?.publishOperationId})`, + ); } - // ONLY update as published if we actually have a UAL - if (!result.UAL) { - logger.error(`❌ DKG API SUCCESS BUT NO UAL for asset ${assetId}`, { - assetId, - resultStructure: result ? Object.keys(result) : "null", - resultJson: JSON.stringify(result), - UALValue: result?.UAL, - UALType: typeof result?.UAL, - }); - throw new Error("DKG API returned success but no UAL was provided"); + // Mint phase + const mintResult = await phasedClient.mintPhase(publishResult); + + if (!mintResult?.UAL) { + throw new Error("Mint phase did not return a UAL"); } - logger.info(`✅ DKG API SUCCESS WITH UAL for asset ${assetId}`, { + logger.info(`✅ DKG mint SUCCESS WITH UAL for asset ${assetId}`, { assetId, - ual: result.UAL, + ual: mintResult.UAL, transactionHash: - result.operation?.mintKnowledgeCollection?.transactionHash, + mintResult.mintKnowledgeCollectionReceipt?.transactionHash, }); - // Update asset with success + // Update asset: mint submitted, store tx/hash/ual, keep publishedAt null await this.db .update(assets) .set({ - status: "published", - ual: result.UAL, + status: "mint_submitted", + ual: mintResult.UAL, transactionHash: - result.operation?.mintKnowledgeCollection?.transactionHash || null, + mintResult.mintKnowledgeCollectionReceipt?.transactionHash || null, blockchain: wallet.blockchain, - publishedAt: sql`NOW()`, + publishingStartedAt: sql`NOW()`, }) .where(eq(assets.id, assetId)); - // Attempt record updating is handled by the worker - return { success: true, - ual: result.UAL, + ual: mintResult.UAL, transactionHash: - result.operation?.mintKnowledgeCollection?.transactionHash, + mintResult.mintKnowledgeCollectionReceipt?.transactionHash, }; } catch (error: any) { console.error(`Publishing failed for asset ${assetId}:`, error); diff --git a/packages/plugin-dkg-publisher/src/services/QueuePoller.ts b/packages/plugin-dkg-publisher/src/services/QueuePoller.ts index 3c470c8..94f2219 100644 --- a/packages/plugin-dkg-publisher/src/services/QueuePoller.ts +++ b/packages/plugin-dkg-publisher/src/services/QueuePoller.ts @@ -143,7 +143,7 @@ export class QueuePoller extends EventEmitter { let addedCount = 0; for (const asset of queuedAssets) { try { - await this.queueService.addToQueue(asset.id, asset.priority); + await this.queueService.addToQueue(asset.id, asset.priority, "publish-asset"); addedCount++; logger.logAssetEvent(asset.id, "Added to processing queue", { priority: asset.priority, diff --git a/packages/plugin-dkg-publisher/src/services/QueueService.ts b/packages/plugin-dkg-publisher/src/services/QueueService.ts index 1e391be..1084255 100644 --- a/packages/plugin-dkg-publisher/src/services/QueueService.ts +++ b/packages/plugin-dkg-publisher/src/services/QueueService.ts @@ -1,5 +1,5 @@ import { EventEmitter } from "events"; -import { Queue, Worker, QueueEvents } from "bullmq"; +import { Queue, Worker, QueueEvents, Job } from "bullmq"; import IORedis from "ioredis"; import { createBullBoard } from "@bull-board/api"; import { BullMQAdapter } from "@bull-board/api/bullMQAdapter"; @@ -8,6 +8,7 @@ import { PublishingService } from "./PublishingService"; import { WalletService } from "./WalletService"; import { AssetService } from "./AssetService"; import { queueLogger as logger } from "./Logger"; +import { DkgService } from "./DkgService"; export interface QueueStats { waiting: number; @@ -24,13 +25,18 @@ export class QueueService { private serverAdapter: ExpressAdapter; private 
currentWalletCount: number = 0; private walletCheckInterval: NodeJS.Timeout | null = null; + private jobDefaults: Record>; constructor( private redis: IORedis, private publishingService: PublishingService, private walletService: WalletService, private assetService: AssetService, + private dkgService?: DkgService, private healthMonitor?: EventEmitter, // Will be set later to avoid circular dependency + queueOptions?: { + jobDefaults?: Record>; + }, ) { // Initialize queue this.publishQueue = new Queue("knowledge-asset-publishing", { @@ -66,6 +72,22 @@ export class QueueService { serverAdapter: this.serverAdapter, }); + // Job-specific defaults (timeouts, backoff) keyed by job name + this.jobDefaults = { + "publish-asset": { + timeout: parseInt(process.env.PUBLISH_JOB_TIMEOUT_MS || `${3 * 60 * 1000}`), // 3 minutes default + }, + "finality-check": { + timeout: parseInt( + process.env.FINALITY_JOB_TIMEOUT_MS || `${15 * 60 * 1000}`, + ), // 15 minutes default + attempts: 3, + backoff: { type: "exponential", delay: 5000 }, + removeOnComplete: true, + }, + ...(queueOptions?.jobDefaults || {}), + }; + // No event listeners needed - QueuePoller handles all scheduling } @@ -81,8 +103,14 @@ export class QueueService { /** * Add asset to publishing queue (prevents duplicates) */ - async addToQueue(assetId: number, priority?: number): Promise { - const jobId = `asset-${assetId}`; + async addToQueue( + assetId: number, + priority?: number, + jobName: "publish-asset" | "finality-check" = "publish-asset", + payload: Record = {}, + jobOptions: Record = {}, + ): Promise { + const jobId = `${jobName}-${assetId}`; try { // Check if job already exists in Redis @@ -106,12 +134,14 @@ export class QueueService { // Now add the job (either fresh or after cleanup) await this.publishQueue.add( - "publish-asset", - { assetId }, + jobName, + { assetId, ...payload }, { priority: priority || 50, delay: 0, jobId: jobId, + ...(this.jobDefaults[jobName] || {}), + ...jobOptions, }, ); console.log(`✅ Asset ${assetId} added to BullMQ queue`); @@ -124,6 +154,199 @@ export class QueueService { } } + /** + * Enqueue a finality-check job with defaults merged in. + */ + async enqueueFinalityJob( + assetId: number, + priority: number | undefined, + ual: string, + transactionHash?: string | null, + ): Promise { + const defaults = this.jobDefaults["finality-check"] || {}; + await this.addToQueue( + assetId, + priority, + "finality-check", + { ual, transactionHash }, + defaults, + ); + } + + /** + * Handle publish-asset jobs (publish+mint, then enqueue finality). 
+ */ + private async processPublishJob(job: Job, workerId: number): Promise { + const { assetId } = job.data; + logger.info( + `\n=== WORKER ${workerId} STARTED PROCESSING ASSET ${assetId} ===`, + { workerId: workerId, jobId: job.id, assetId }, + ); + + let attemptId: number | null = null; + let wallet: any = null; + + try { + // First, atomically claim the asset for processing + logger.info(`Worker ${workerId} attempting to claim asset ${assetId}`, { + workerId: workerId, + assetId, + }); + const claimed = await this.assetService.claimAssetForProcessing(assetId); + if (!claimed) { + logger.warn( + `Asset ${assetId} already being processed by another worker - job will exit`, + { workerId: workerId, assetId }, + ); + return; // Another worker is handling it + } + + logger.info( + `✅ Asset ${assetId} SUCCESSFULLY CLAIMED by worker ${workerId}`, + { workerId: workerId, assetId }, + ); + + // Update job progress + await job.updateProgress(10); + + // Get wallet from pool + logger.info( + `Worker ${workerId} requesting available wallet for asset ${assetId}`, + { workerId: workerId, assetId }, + ); + wallet = await this.walletService.assignWalletToAsset(assetId); + if (!wallet) { + logger.error(`❌ NO AVAILABLE WALLETS for asset ${assetId}`, { + workerId: workerId, + assetId, + }); + throw new Error("No available wallets"); + } + logger.info(`✅ Got wallet ${wallet.id} for asset ${assetId}`, { + workerId: workerId, + assetId, + walletId: wallet.id, + }); + + await job.updateProgress(20); + + // Create publishing attempt record BEFORE attempting to publish + logger.info( + `Worker ${workerId} creating publishing attempt for asset ${assetId}`, + { workerId: workerId, assetId }, + ); + attemptId = await this.assetService.createPublishingAttempt( + assetId, + wallet, + ); + logger.info( + `✅ Created publishing attempt ${attemptId} for asset ${assetId}`, + { attemptId, workerId: workerId, assetId }, + ); + + // Publish asset + logger.info( + `🚀 STARTING PUBLISH for asset ${assetId} with wallet ${wallet.id}`, + { workerId: workerId, assetId, walletId: wallet.id }, + ); + const result = await this.publishingService.publishAsset(assetId, wallet); + + await job.updateProgress(100); + + logger.info(`📡 Publishing result for asset ${assetId}`, { + workerId: workerId, + assetId, + success: result.success, + error: result.error, + }); + + if (!result.success) { + logger.error( + `❌ PUBLISHING FAILED for asset ${assetId}: ${result.error}`, + { workerId: workerId, assetId, error: result.error }, + ); + throw new Error(result.error || "Publishing failed"); + } + + logger.info(`🎉 PUBLISHING SUCCESSFUL for asset ${assetId}`, { + workerId: workerId, + assetId, + ual: result.ual, + }); + + // Update publishing attempt record as successful + if (attemptId && result.success) { + await this.assetService.updatePublishingAttempt(attemptId, { + status: "success", + ual: result.ual, + transactionHash: result.transactionHash, + durationSeconds: Math.floor((Date.now() - job.timestamp) / 1000), + }); + } + + // Enqueue finality job with defaults + try { + await this.enqueueFinalityJob( + assetId, + job.opts.priority, + result.ual!, + result.transactionHash, + ); + logger.info( + `📬 Enqueued finality-check job for asset ${assetId} (UAL ${result.ual})`, + { workerId: workerId, assetId }, + ); + } catch (enqueueError: any) { + logger.error( + `⚠️ Failed to enqueue finality-check for asset ${assetId}: ${enqueueError.message}`, + { workerId: workerId, assetId }, + ); + } + + // Release wallet on success + await 
this.walletService.releaseWallet(wallet.id, true); + + logger.info( + `=== WORKER ${workerId} COMPLETED ASSET ${assetId} SUCCESSFULLY ===\n`, + { workerId: workerId, assetId }, + ); + return result; + } catch (error: any) { + logger.error( + `💥 WORKER ${workerId} ERROR processing asset ${assetId}: ${error.message}`, + { + workerId: workerId, + assetId, + error: error.message, + stack: error.stack, + }, + ); + // Release wallet on failure if we had one + if (wallet) { + await this.walletService.releaseWallet(wallet.id, false); + } + + // Update publishing attempt record as failed + if (attemptId) { + await this.assetService.updatePublishingAttempt(attemptId, { + status: "failed", + errorType: error.name || "Error", + errorMessage: error.message, + durationSeconds: Math.floor((Date.now() - job.timestamp) / 1000), + }); + } + + // Handle failure with retry logic + await this.assetService.handleAssetFailure(assetId, error.message); + + logger.info(`=== WORKER ${workerId} FAILED ASSET ${assetId} ===\n`, { + workerId: workerId, + assetId, + }); + throw error; + } + } + // Recovery logic moved to QueuePoller /** @@ -183,163 +406,10 @@ export class QueueService { const worker = new Worker( "knowledge-asset-publishing", async (job) => { - const { assetId } = job.data; - logger.info( - `\n=== WORKER ${i} STARTED PROCESSING ASSET ${assetId} ===`, - { workerId: i, jobId: job.id, assetId }, - ); - - let attemptId: number | null = null; - let wallet: any = null; - - try { - // First, atomically claim the asset for processing - logger.info(`Worker ${i} attempting to claim asset ${assetId}`, { - workerId: i, - assetId, - }); - const claimed = - await this.assetService.claimAssetForProcessing(assetId); - if (!claimed) { - logger.warn( - `Asset ${assetId} already being processed by another worker - job will exit`, - { workerId: i, assetId }, - ); - return; // Another worker is handling it - } - - logger.info( - `✅ Asset ${assetId} SUCCESSFULLY CLAIMED by worker ${i}`, - { workerId: i, assetId }, - ); - - // Update job progress - await job.updateProgress(10); - - // Get wallet from pool - logger.info( - `Worker ${i} requesting available wallet for asset ${assetId}`, - { workerId: i, assetId }, - ); - wallet = await this.walletService.assignWalletToAsset(assetId); - if (!wallet) { - logger.error(`❌ NO AVAILABLE WALLETS for asset ${assetId}`, { - workerId: i, - assetId, - }); - throw new Error("No available wallets"); - } - logger.info(`✅ Got wallet ${wallet.id} for asset ${assetId}`, { - workerId: i, - assetId, - walletId: wallet.id, - }); - - await job.updateProgress(20); - - // Create publishing attempt record BEFORE attempting to publish - logger.info( - `Worker ${i} creating publishing attempt for asset ${assetId}`, - { workerId: i, assetId }, - ); - attemptId = await this.assetService.createPublishingAttempt( - assetId, - wallet, - ); - logger.info( - `✅ Created publishing attempt ${attemptId} for asset ${assetId}`, - { attemptId, workerId: i, assetId }, - ); - - // Publish asset - logger.info( - `🚀 STARTING PUBLISH for asset ${assetId} with wallet ${wallet.id}`, - { workerId: i, assetId, walletId: wallet.id }, - ); - const result = await this.publishingService.publishAsset( - assetId, - wallet, - ); - - await job.updateProgress(100); - - logger.info(`📡 Publishing result for asset ${assetId}`, { - workerId: i, - assetId, - success: result.success, - error: result.error, - }); - - if (!result.success) { - logger.error( - `❌ PUBLISHING FAILED for asset ${assetId}: ${result.error}`, - { workerId: i, 
assetId, error: result.error }, - ); - throw new Error(result.error || "Publishing failed"); - } - - logger.info(`🎉 PUBLISHING SUCCESSFUL for asset ${assetId}`, { - workerId: i, - assetId, - ual: result.ual, - }); - - // Update publishing attempt record as successful - if (attemptId && result.success) { - await this.assetService.updatePublishingAttempt(attemptId, { - status: "success", - ual: result.ual, - transactionHash: result.transactionHash, - durationSeconds: Math.floor( - (Date.now() - job.timestamp) / 1000, - ), - }); - } - - // Release wallet on success - await this.walletService.releaseWallet(wallet.id, true); - - logger.info( - `=== WORKER ${i} COMPLETED ASSET ${assetId} SUCCESSFULLY ===\n`, - { workerId: i, assetId }, - ); - return result; - } catch (error: any) { - logger.error( - `💥 WORKER ${i} ERROR processing asset ${assetId}: ${error.message}`, - { - workerId: i, - assetId, - error: error.message, - stack: error.stack, - }, - ); - // Release wallet on failure if we had one - if (wallet) { - await this.walletService.releaseWallet(wallet.id, false); - } - - // Update publishing attempt record as failed - if (attemptId) { - await this.assetService.updatePublishingAttempt(attemptId, { - status: "failed", - errorType: error.name || "Error", - errorMessage: error.message, - durationSeconds: Math.floor( - (Date.now() - job.timestamp) / 1000, - ), - }); - } - - // Handle failure with retry logic - await this.assetService.handleAssetFailure(assetId, error.message); - - logger.info(`=== WORKER ${i} FAILED ASSET ${assetId} ===\n`, { - workerId: i, - assetId, - }); - throw error; + if (job.name === "finality-check") { + return this.processFinalityJob(job, i); } + return this.processPublishJob(job, i); }, { connection: this.redis, @@ -419,6 +489,82 @@ export class QueueService { logger.info(`All ${workerCount} workers started successfully`); } + /** + * Handle finality-check jobs (no wallet lock required) + */ + private async processFinalityJob(job: Job, workerId: number): Promise { + const { assetId, ual: ualFromJob } = job.data || {}; + const assetIdNum = Number(assetId); + + logger.info( + `\n=== WORKER ${workerId} STARTED FINALITY FOR ASSET ${assetIdNum} ===`, + { workerId, jobId: job.id, assetId: assetIdNum }, + ); + + const asset = await this.assetService.getAsset(assetIdNum); + const ual = ualFromJob || asset?.ual; + + if (!ual) { + throw new Error( + `Finality job missing UAL for asset ${assetIdNum} (job payload and DB empty)`, + ); + } + + // Use any available wallet for read-only finality checks + const wallet = await this.walletService.getWalletForQueries(); + if (!wallet) { + throw new Error("No wallet available for finality check"); + } + + if (!this.dkgService) { + throw new Error("DKG service not initialized"); + } + + const phasedClient = this.dkgService.createWalletPhasedClient(wallet); + + try { + await job.updateProgress(20); + const finalityResult = await phasedClient.finalityPhase(ual, { + minimumNumberOfFinalizationConfirmations: 3, + maxNumberOfRetries: 60, + frequency: 1, + }); + await job.updateProgress(80); + + const isFinalized = + finalityResult?.finality?.status === "FINALIZED" || + finalityResult?.numberOfConfirmations >= + finalityResult?.requiredConfirmations; + + if (!isFinalized) { + throw new Error( + `Finality not reached for ${ual} (confirmations=${finalityResult?.numberOfConfirmations}/${finalityResult?.requiredConfirmations})`, + ); + } + + await this.assetService.updateAssetStatus(assetIdNum, "published", { + ual, + }); + await 
job.updateProgress(100); + + logger.info( + `✅ Finality reached for asset ${assetIdNum} (UAL ${ual})`, + { workerId, assetId: assetIdNum }, + ); + return { + success: true, + assetId: assetIdNum, + ual, + }; + } catch (error: any) { + logger.error( + `❌ Finality check failed for asset ${assetIdNum}: ${error.message}`, + { workerId, assetId: assetIdNum, error: error.message }, + ); + throw error; + } + } + /** * Monitor wallet count and automatically restart workers if needed */ diff --git a/packages/plugin-dkg-publisher/src/services/index.ts b/packages/plugin-dkg-publisher/src/services/index.ts index e74df5b..1cedcad 100644 --- a/packages/plugin-dkg-publisher/src/services/index.ts +++ b/packages/plugin-dkg-publisher/src/services/index.ts @@ -10,6 +10,7 @@ import { MetricsService } from "./MetricsService"; import { HealthMonitor } from "./HealthMonitor"; import { QueuePoller } from "./QueuePoller"; import { DkgService } from "./DkgService"; +import { RealDkgPhasedClient, DkgPhasedClient } from "./DkgClientAdapter"; import type { KnowledgeAssetManagerConfig } from "../types"; export type ServiceConfig = KnowledgeAssetManagerConfig; @@ -58,6 +59,7 @@ export async function initializeServices( publishingService, walletService, assetService, + dkgService, ); container.register("queueService", queueService); @@ -159,3 +161,4 @@ export { MetricsService } from "./MetricsService"; export { HealthMonitor } from "./HealthMonitor"; export { QueuePoller } from "./QueuePoller"; export { DkgService } from "./DkgService"; +export { RealDkgPhasedClient, DkgPhasedClient } from "./DkgClientAdapter"; diff --git a/packages/plugin-dkg-publisher/src/types.ts b/packages/plugin-dkg-publisher/src/types.ts index c6f2d0e..7854b36 100644 --- a/packages/plugin-dkg-publisher/src/types.ts +++ b/packages/plugin-dkg-publisher/src/types.ts @@ -55,6 +55,7 @@ export interface AssetStatus { | "queued" | "assigned" | "publishing" + | "mint_submitted" | "published" | "failed"; ual?: string | null; From 75de630ce6d28e1517c9c5679f32d22e80047d9e Mon Sep 17 00:00:00 2001 From: OTLegend Date: Fri, 26 Dec 2025 15:29:08 +0100 Subject: [PATCH 2/2] [feature] Track finality attempts and harden publish flow --- .../migrations/0003_finality_attempts.sql | 19 ++++++ .../src/database/schema.ts | 48 ++++++++++++++ .../src/services/AssetService.ts | 57 ++++++++++++++++- .../src/services/DkgClientAdapter.ts | 14 +++-- .../src/services/PublishingService.ts | 63 ++++++++++++++++--- .../src/services/QueueService.ts | 27 ++++++++ 6 files changed, 214 insertions(+), 14 deletions(-) create mode 100644 packages/plugin-dkg-publisher/src/database/migrations/0003_finality_attempts.sql diff --git a/packages/plugin-dkg-publisher/src/database/migrations/0003_finality_attempts.sql b/packages/plugin-dkg-publisher/src/database/migrations/0003_finality_attempts.sql new file mode 100644 index 0000000..74424a5 --- /dev/null +++ b/packages/plugin-dkg-publisher/src/database/migrations/0003_finality_attempts.sql @@ -0,0 +1,19 @@ +CREATE TABLE `finality_attempts` ( + `id` serial AUTO_INCREMENT NOT NULL, + `asset_id` int NOT NULL, + `attempt_number` int NOT NULL, + `worker_id` varchar(100), + `ual` varchar(255) NOT NULL, + `transaction_hash` varchar(66), + `confirmations` int, + `required_confirmations` int, + `status` enum('started','success','failed','timeout') NOT NULL, + `error_type` varchar(50), + `error_message` text, + `started_at` timestamp NOT NULL, + `completed_at` timestamp, + `duration_seconds` int, + `created_at` timestamp DEFAULT (now()), + CONSTRAINT 
diff --git a/packages/plugin-dkg-publisher/src/database/schema.ts b/packages/plugin-dkg-publisher/src/database/schema.ts
index 7db9b1b..2589f95 100644
--- a/packages/plugin-dkg-publisher/src/database/schema.ts
+++ b/packages/plugin-dkg-publisher/src/database/schema.ts
@@ -147,6 +147,41 @@ export const publishingAttempts = mysqlTable(
   }),
 );
 
+// Finality attempts table: Track finality checks separately
+export const finalityAttempts = mysqlTable(
+  "finality_attempts",
+  {
+    id: serial("id").primaryKey(),
+    assetId: int("asset_id")
+      .notNull()
+      .references(() => assets.id, { onDelete: "cascade" }),
+    attemptNumber: int("attempt_number").notNull(),
+    workerId: varchar("worker_id", { length: 100 }),
+    ual: varchar("ual", { length: 255 }).notNull(),
+    transactionHash: varchar("transaction_hash", { length: 66 }),
+    confirmations: int("confirmations"),
+    requiredConfirmations: int("required_confirmations"),
+    status: mysqlEnum("status", [
+      "started",
+      "success",
+      "failed",
+      "timeout",
+    ]).notNull(),
+    errorType: varchar("error_type", { length: 50 }),
+    errorMessage: text("error_message"),
+    startedAt: timestamp("started_at").notNull(),
+    completedAt: timestamp("completed_at"),
+    durationSeconds: int("duration_seconds"),
+    createdAt: timestamp("created_at").defaultNow(),
+  },
+  (table) => ({
+    assetAttemptsIdx: index("idx_finality_asset_attempts").on(
+      table.assetId,
+      table.attemptNumber,
+    ),
+  }),
+);
+
 // Batch table: Track batch operations
 export const batches = mysqlTable(
   "batches",
@@ -213,6 +248,7 @@ export const assetsRelations = relations(assets, ({ one, many }) => ({
     references: [batches.id],
   }),
   publishingAttempts: many(publishingAttempts),
+  finalityAttempts: many(finalityAttempts),
 }));
 
 export const walletsRelations = relations(wallets, ({ many }) => ({
@@ -234,6 +270,16 @@ export const publishingAttemptsRelations = relations(
   }),
 );
 
+export const finalityAttemptsRelations = relations(
+  finalityAttempts,
+  ({ one }) => ({
+    asset: one(assets, {
+      fields: [finalityAttempts.assetId],
+      references: [assets.id],
+    }),
+  }),
+);
+
 export const batchesRelations = relations(batches, ({ many }) => ({
   assets: many(assets),
 }));
@@ -252,5 +298,7 @@ export type Wallet = typeof wallets.$inferSelect;
 export type NewWallet = typeof wallets.$inferInsert;
 export type PublishingAttempt = typeof publishingAttempts.$inferSelect;
 export type NewPublishingAttempt = typeof publishingAttempts.$inferInsert;
+export type FinalityAttempt = typeof finalityAttempts.$inferSelect;
+export type NewFinalityAttempt = typeof finalityAttempts.$inferInsert;
 export type Batch = typeof batches.$inferSelect;
 export type NewBatch = typeof batches.$inferInsert;
diff --git a/packages/plugin-dkg-publisher/src/services/AssetService.ts b/packages/plugin-dkg-publisher/src/services/AssetService.ts
index 905500f..c4a738b 100644
--- a/packages/plugin-dkg-publisher/src/services/AssetService.ts
+++ b/packages/plugin-dkg-publisher/src/services/AssetService.ts
@@ -1,6 +1,10 @@
 import { EventEmitter } from "events";
 import { Database } from "../database";
-import { assets, publishingAttempts } from "../database/schema";
+import {
+  assets,
+  publishingAttempts,
+  finalityAttempts,
+} from "../database/schema";
 import { eq, and, sql, desc, or } from "drizzle-orm";
 import { AssetInput, AssetStatus } from "../types";
 import { StorageService } from "./StorageService";
@@ -412,6 +416,57 @@ export class AssetService extends EventEmitter {
       .where(eq(publishingAttempts.id, attemptId));
   }
 
+  /**
+   * Create a finality attempt record
+   */
+  async createFinalityAttempt(
+    assetId: number,
+    ual: string,
+    transactionHash?: string | null,
+  ): Promise<number> {
+    // Determine next attempt number
+    const existingAttempts = await this.db
+      .select({ count: sql<number>`COUNT(*)` })
+      .from(finalityAttempts)
+      .where(eq(finalityAttempts.assetId, assetId));
+    const currentAttemptNumber = (existingAttempts[0]?.count || 0) + 1;
+
+    const attemptResult = await this.db.insert(finalityAttempts).values({
+      assetId,
+      attemptNumber: currentAttemptNumber,
+      workerId: process.pid.toString(),
+      ual,
+      transactionHash: transactionHash || null,
+      status: "started",
+      startedAt: sql`NOW()`,
+    });
+
+    return attemptResult[0].insertId;
+  }
+
+  /**
+   * Update finality attempt record
+   */
+  async updateFinalityAttempt(
+    attemptId: number,
+    updates: {
+      status: "started" | "success" | "failed" | "timeout";
+      confirmations?: number;
+      requiredConfirmations?: number;
+      errorType?: string;
+      errorMessage?: string;
+      durationSeconds?: number;
+    },
+  ): Promise<void> {
+    await this.db
+      .update(finalityAttempts)
+      .set({
+        ...updates,
+        completedAt: sql`NOW()`,
+      })
+      .where(eq(finalityAttempts.id, attemptId));
+  }
+
   /**
    * Retry failed assets
    */
diff --git a/packages/plugin-dkg-publisher/src/services/DkgClientAdapter.ts b/packages/plugin-dkg-publisher/src/services/DkgClientAdapter.ts
index 0d6983a..c808c94 100644
--- a/packages/plugin-dkg-publisher/src/services/DkgClientAdapter.ts
+++ b/packages/plugin-dkg-publisher/src/services/DkgClientAdapter.ts
@@ -73,11 +73,15 @@ export class RealDkgPhasedClient implements DkgPhasedClient {
     publishContext: PublishPhaseResult,
     stepHooks: Record<string, unknown> = {},
   ): Promise<any> {
-    return this.client.asset.mintKnowledgeCollectionPhase(
-      publishContext,
-      {},
-      stepHooks,
-    );
+    const hasHooks = stepHooks && Object.keys(stepHooks).length > 0;
+    if (hasHooks) {
+      return this.client.asset.mintKnowledgeCollectionPhase(
+        publishContext,
+        {},
+        stepHooks,
+      );
+    }
+    return this.client.asset.mintKnowledgeCollectionPhase(publishContext);
   }
 
   async finalityPhase(
diff --git a/packages/plugin-dkg-publisher/src/services/PublishingService.ts b/packages/plugin-dkg-publisher/src/services/PublishingService.ts
index d8d0969..38b9612 100644
--- a/packages/plugin-dkg-publisher/src/services/PublishingService.ts
+++ b/packages/plugin-dkg-publisher/src/services/PublishingService.ts
@@ -125,7 +125,7 @@ export class PublishingService {
     const phasedClient = this.dkgService.createWalletPhasedClient(wallet);
 
     // Publish to DKG (publish phase)
-    logger.info(`🚀 Publishing (phase 1) to DKG`, {
+    logger.info(`Publishing (phase 1) to DKG`, {
       assetId,
       epochs: asset.epochs,
       replications: asset.replications || 1,
@@ -133,26 +133,73 @@ export class PublishingService {
       privacy: asset.privacy,
     });
 
-    const publishResult = await phasedClient.publishPhase(wrappedContent, {
-      epochsNum: asset.epochs,
-      minimumNumberOfFinalizationConfirmations: 3,
-      minimumNumberOfNodeReplications: asset.replications || 1,
+    let publishResult;
+    try {
+      publishResult = await phasedClient.publishPhase(wrappedContent, {
+        epochsNum: asset.epochs,
+        minimumNumberOfFinalizationConfirmations: 3,
+        minimumNumberOfNodeReplications: asset.replications || 1,
     });
+    } catch (pubError: any) {
+      logger.error(`Publish phase threw`, {
+        assetId,
+        error: pubError?.message,
+        stack: pubError?.stack,
+      });
+      throw pubError;
+    }
+    logger.debug(`Publish phase returned`, {
+      assetId,
+      publishOperationId: publishResult?.publishOperationId,
+      readyForMint: publishResult?.readyForMint,
+    });
 
-    if (!publishResult?.readyForMint) {
+    const publishStatus =
+      publishResult?.publishOperationResult?.status ?? "unknown";
+    const minAcksReached =
+      publishResult?.publishOperationResult?.data?.minAcksReached;
+    // If dkg.js didn't set readyForMint, infer it from status/minAcks
+    const readyForMint =
+      publishResult?.readyForMint ??
+      (publishStatus === "COMPLETED" || minAcksReached === true);
+    publishResult.readyForMint = readyForMint;
+
+    if (!readyForMint) {
+      const publishData = publishResult?.publishOperationResult?.data;
+      const publishErrorMessage =
+        publishResult?.publishOperationResult?.errorMessage ||
+        publishResult?.publishOperationResult?.data?.errorMessage;
+
+      logger.error("❌ Publish phase incomplete", {
+        assetId,
+        publishOperationId: publishResult?.publishOperationId,
+        status: publishStatus,
+        minAcksReached,
+        errorMessage: publishErrorMessage,
+        data: publishData,
+      });
       throw new Error(
-        `Publish phase did not complete (operationId=${publishResult?.publishOperationId})`,
+        `Publish phase did not complete (operationId=${publishResult?.publishOperationId}, status=${publishStatus}, minAcks=${minAcksReached})`,
       );
     }
 
     // Mint phase
+    logger.debug(`Mint phase starting`, {
+      assetId,
+      publishOperationId: publishResult?.publishOperationId,
+    });
     const mintResult = await phasedClient.mintPhase(publishResult);
+    logger.debug(`Mint phase returned`, {
+      assetId,
+      ual: mintResult?.UAL,
+      tx: mintResult?.mintKnowledgeCollectionReceipt?.transactionHash,
+    });
 
     if (!mintResult?.UAL) {
       throw new Error("Mint phase did not return a UAL");
     }
 
-    logger.info(`✅ DKG mint SUCCESS WITH UAL for asset ${assetId}`, {
+    logger.info(`DKG mint SUCCESS WITH UAL for asset ${assetId}`, {
       assetId,
       ual: mintResult.UAL,
       transactionHash:
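The publish-phase hardening above comes down to one fallback rule: trust `readyForMint` when dkg.js sets it, otherwise infer it from the operation status or the `minAcksReached` flag, and write the result back onto `publishResult` so the mint phase sees a consistent value. Extracted as a standalone helper purely for illustration (this function does not exist in the codebase; `publishResult` is assumed to have the shape the patch reads):

```typescript
// Illustrative extraction of the fallback logic added to PublishingService above.
function inferReadyForMint(publishResult: any): boolean {
  const status = publishResult?.publishOperationResult?.status ?? "unknown";
  const minAcksReached =
    publishResult?.publishOperationResult?.data?.minAcksReached;
  return (
    publishResult?.readyForMint ??
    (status === "COMPLETED" || minAcksReached === true)
  );
}
```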
diff --git a/packages/plugin-dkg-publisher/src/services/QueueService.ts b/packages/plugin-dkg-publisher/src/services/QueueService.ts
index 1084255..fbb88b9 100644
--- a/packages/plugin-dkg-publisher/src/services/QueueService.ts
+++ b/packages/plugin-dkg-publisher/src/services/QueueService.ts
@@ -495,6 +495,8 @@ export class QueueService {
   private async processFinalityJob(job: Job, workerId: number): Promise<any> {
     const { assetId, ual: ualFromJob } = job.data || {};
     const assetIdNum = Number(assetId);
+    const startedAt = Date.now();
+    let finalityAttemptId: number | null = null;
 
     logger.info(
       `\n=== WORKER ${workerId} STARTED FINALITY FOR ASSET ${assetIdNum} ===`,
@@ -523,6 +525,13 @@ export class QueueService {
     const phasedClient = this.dkgService.createWalletPhasedClient(wallet);
 
     try {
+      // Create finality attempt record
+      finalityAttemptId = await this.assetService.createFinalityAttempt(
+        assetIdNum,
+        ual,
+        job.data?.transactionHash || null,
+      );
+
       await job.updateProgress(20);
       const finalityResult = await phasedClient.finalityPhase(ual, {
         minimumNumberOfFinalizationConfirmations: 3,
@@ -542,6 +551,16 @@ export class QueueService {
         );
       }
 
+      // Update finality attempt
+      if (finalityAttemptId) {
+        await this.assetService.updateFinalityAttempt(finalityAttemptId, {
+          status: "success",
+          confirmations: finalityResult?.numberOfConfirmations,
+          requiredConfirmations: finalityResult?.requiredConfirmations,
+          durationSeconds: Math.floor((Date.now() - startedAt) / 1000),
+        });
+      }
+
       await this.assetService.updateAssetStatus(assetIdNum, "published", {
         ual,
       });
@@ -557,6 +576,14 @@ export class QueueService {
       return {
         success: true,
         assetId: assetIdNum,
         ual,
       };
     } catch (error: any) {
+      if (finalityAttemptId) {
+        await this.assetService.updateFinalityAttempt(finalityAttemptId, {
+          status: "failed",
+          errorType: error?.name || "Error",
+          errorMessage: error?.message,
+          durationSeconds: Math.floor((Date.now() - startedAt) / 1000),
+        });
+      }
       logger.error(
         `❌ Finality check failed for asset ${assetIdNum}: ${error.message}`,
         { workerId, assetId: assetIdNum, error: error.message },