-
Notifications
You must be signed in to change notification settings - Fork 687
Open
Description
Problem description
Sometimes the data sent by the server does not enter the data event.
Reproduction steps
This is a streaming file download function. The server sends data to the client in chunks until IsLastBlock is encountered, indicating that data transmission is complete. However, sometimes on('data') is not triggered, causing some chunks to be lost, resulting in an incomplete file.
Environment
- Windows11
- Node v22.21.1
- Node install method:windows msi installer
- grpc@1.14.2
Additional context
code segment:
import log from "electron-log";
import fs from "fs";
import fsPromises from "fs/promises";
import path from "path";
import { PassThrough, pipeline } from "stream";
import { promisify } from "util";
import { createGrpcClient } from "../grpc/client.js";
import { DownloadChildMessage } from "../types/grpcChild.js";
const pipe = promisify(pipeline);
log.info("[Worker] Log Downloader Worker Started");
process.on("message", async (msg: DownloadChildMessage & { userDataPath: string; protoPath: string }) => {
const { type, ip, port, payload, taskId, instance, userDataPath, protoPath } = msg;
log.info(`[Worker] gRPC target: ${ip}:${port}`);
// 添加下载超时设置
const DOWNLOAD_TIMEOUT = 5 * 60 * 1000; // 5分钟超时
let timeoutId = setTimeout(() => {
log.error(`[Worker] Download timeout after ${DOWNLOAD_TIMEOUT}ms`);
process.send?.({ success: false, error: "Download timeout" });
process.exit(1);
}, DOWNLOAD_TIMEOUT);
try {
const client = createGrpcClient(ip, port, protoPath);
const input = { InputFileJson: JSON.stringify(payload) };
const tempDir = path.join(userDataPath, `downloads/logs/${taskId}`);
await fsPromises.mkdir(tempDir, { recursive: true });
const zipPath = path.join(
tempDir,
`${generateZipId(instance)}${type === "app" ? "_app" : ""}.zip`
);
const writeStream = fs.createWriteStream(zipPath);
// 监听写入流的错误事件
writeStream.on('error', (err) => {
log.error(`[Worker] Write stream error: ${err.message}`);
pass.destroy(err); // 将错误传播到pipeline
});
const pass = new PassThrough({ highWaterMark: 1024 * 1024 }); // 设置合适的缓冲区大小
let receivedChunks = 0;
let receivedBytes = 0;
let lastBlockReceived = false;
// pipeline 自动处理流和背压
const pipelinePromise = pipe(pass, writeStream)
.then(() => {
clearTimeout(timeoutId);
if (!lastBlockReceived) {
log.warn(`[Worker] Pipeline completed but IsLastBlock was never received. Received ${receivedChunks} chunks, ${receivedBytes} bytes.`);
}
log.info(`[Worker] Download complete -> ${zipPath}`);
process.send?.({ success: true, path: zipPath });
})
.catch(err => {
clearTimeout(timeoutId);
log.error(`[Worker] Pipeline failed: ${err.message}`);
process.send?.({ success: false, error: err.message });
})
.finally(() => {
// 确保进程退出
setTimeout(() => process.exit(0), 1000);
});
const call = type === "app" ? client.DownloadFile(input) : client.DownloadLog(input);
// 改进的背压处理机制
const handleBackpressure = (chunk: Buffer) => {
if (!pass.write(chunk)) {
// 缓冲区满,暂停gRPC流
call.pause();
log.debug(`[Worker] Backpressure: pausing gRPC stream. Buffer full.`);
// 等待drain事件再恢复
pass.once("drain", () => {
log.debug(`[Worker] Backpressure relieved: resuming gRPC stream.`);
call.resume();
});
}
};
call.on("data", (res: any) => {
receivedChunks++;
const chunkSize = res.FileBytes?.length || 0;
receivedBytes += chunkSize;
log.info(`[Worker] Received chunk ${receivedChunks}: ${chunkSize} bytes, IsLastBlock: ${res.IsLastBlock}`);
if (res.IsLastBlock) {
lastBlockReceived = true;
log.info(`[Worker] Final block received. Total: ${receivedChunks} chunks, ${receivedBytes} bytes`);
}
if (res.FileBytes?.length) {
handleBackpressure(res.FileBytes);
} else {
log.warn(`[Worker] Received chunk with empty FileBytes`);
}
});
call.on("end", () => {
log.info("[Worker] gRPC stream ended, closing PassThrough");
if (!lastBlockReceived) {
log.warn(`[Worker] Stream ended but IsLastBlock=true was not received. Received ${receivedChunks} chunks.`);
}
pass.end(); // 正常结束流
});
call.on("error", (err: any) => {
clearTimeout(timeoutId);
log.error(`[Worker] gRPC error: ${err.message}`);
pass.destroy(err); // 将错误传播到pipeline
});
call.on("status", (status: any) => {
log.info(`[Worker] gRPC status: ${JSON.stringify(status)}`);
});
// 等待pipeline完成
await pipelinePromise;
} catch (err: any) {
clearTimeout(timeoutId);
log.error(`[Worker] Fatal error: ${err.message}`);
process.send?.({ success: false, error: err.message });
setTimeout(() => process.exit(1), 1000);
}
});
function generateZipId(instance: any) {
const { HostName, ServerName, Version, ServiceInstanceName, Id } = instance;
return `${HostName}_${ServerName}_${Version}_${ServiceInstanceName}_${Id}`;
}
import grpc from "@grpc/grpc-js";
import protoLoader from "@grpc/proto-loader";
export function createGrpcClient(ip: string, port: number, protoPath: string) {
const packageDefinition = protoLoader.loadSync(protoPath, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true,
});
const proto = grpc.loadPackageDefinition(packageDefinition) as any;
const Service = proto.ServerMangerGrpc;
return new Service(
`${ip}:${port}`,
grpc.credentials.createInsecure()
);
}
import { fork } from "child_process";
import { app } from "electron";
import log from "electron-log";
import path, { dirname } from "path";
import { fileURLToPath } from "url";
import { getProtoPath } from "../grpc/protoPath.js";
import { DownloadChildMessage } from "../types/grpcChild.js";
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
export const downloadLogInChild = (
ip: string,
port: number,
payload: any,
taskId: string,
instance: any,
type: "log" | "app" = "log",
protoPath?: string
): Promise<string> => {
return new Promise((resolve, reject) => {
const child = fork(path.join(__dirname, "logDownloaderWorker.js"));
let resolved = false; // 防止多次解析
// 设置通信超时
const timeout = setTimeout(() => {
if (!resolved) {
log.error(`Download child process timeout for ${instance.HostName}`);
child.kill('SIGKILL');
reject(new Error("Child process timeout"));
resolved = true;
}
}, 10 * 60 * 1000); // 10分钟超时
child.on("message", (msg: { success: boolean; path?: string; error?: string }) => {
if (resolved) return;
clearTimeout(timeout);
resolved = true;
if (msg.success) {
log.info(`Download successful for ${instance.HostName}: ${msg.path}`);
resolve(msg.path!);
} else {
log.error(`Download failed for ${instance.HostName}: ${msg.error}`);
reject(new Error(msg.error || "Download failed"));
}
// 温和关闭子进程
setTimeout(() => {
if (child.connected) {
child.disconnect();
}
if (!child.killed) {
child.kill();
}
}, 1000);
});
child.on("error", (err) => {
if (resolved) return;
clearTimeout(timeout);
resolved = true;
log.error(`Child process error for ${instance.HostName}: ${err.message}`);
reject(err);
});
child.on("exit", (code, signal) => {
if (code !== 0 && !resolved) {
clearTimeout(timeout);
resolved = true;
log.error(`Child process exited abnormally for ${instance.HostName}: code ${code}, signal ${signal}`);
reject(new Error(`Child process exited with code ${code}`));
}
});
// 传入 Electron 路径
const userDataPath = app.getPath("userData");
const message: DownloadChildMessage & { userDataPath: string; protoPath: string } = {
ip, port, payload, taskId, instance, type,
userDataPath,
protoPath: protoPath || getProtoPath()
};
// 添加发送重试机制
const sendMessage = (attempt = 0) => {
if (attempt >= 3) {
log.error(`Failed to send message to child process after ${attempt} attempts`);
reject(new Error("Cannot communicate with child process"));
return;
}
if (child.connected) {
child.send(message, (err) => {
if (err) {
log.warn(`Failed to send message to child (attempt ${attempt + 1}): ${err.message}`);
setTimeout(() => sendMessage(attempt + 1), 1000);
}
});
} else {
setTimeout(() => sendMessage(attempt + 1), 1000);
}
};
sendMessage();
});
};log info:
Thank you very much, could you please help me figure out where the problem is?
Metadata
Metadata
Assignees
Labels
No labels