diff --git a/packages/cli/jest.config.js b/packages/cli/jest.config.js index 01c9d384b6..d50166408f 100644 --- a/packages/cli/jest.config.js +++ b/packages/cli/jest.config.js @@ -8,4 +8,5 @@ module.exports = { // There are test cases that change the process working directory, and that does // not work with multiple workers. maxWorkers: 1, + setupFilesAfterEnv: ['/tests/setup.js'], }; diff --git a/packages/cli/package.json b/packages/cli/package.json index 38b2593e72..0c0854f600 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -141,9 +141,11 @@ "pretty-bytes": "^5.6.0", "ps-node": "^0.1.6", "read-pkg-up": "^7.0.1", + "reflect-metadata": "^0.2.2", "semver": "^7.3.5", "sql-formatter": "^4.0.2", "strip-ansi": "^6.0.0", + "tsyringe": "^4.8.0", "validator": "^13.7.0", "w3c-xmlserializer": "^2.0.0", "yargs": "^17.1.1" diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index 3332170453..02e000fd02 100755 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -6,6 +6,8 @@ /* eslint-disable func-names */ /* eslint-disable max-classes-per-file */ +import 'reflect-metadata'; + const yargs = require('yargs'); const { promises: fsp, readFileSync } = require('fs'); const { queue } = require('async'); diff --git a/packages/cli/src/cmds/index/rpc.ts b/packages/cli/src/cmds/index/rpc.ts index 5a71f7c6d0..47b1ef87c9 100644 --- a/packages/cli/src/cmds/index/rpc.ts +++ b/packages/cli/src/cmds/index/rpc.ts @@ -10,7 +10,7 @@ import appmapFilter from '../../rpc/appmap/filter'; import { RpcHandler } from '../../rpc/rpc'; import metadata from '../../rpc/appmap/metadata'; import sequenceDiagram from '../../rpc/appmap/sequenceDiagram'; -import { explainHandler, explainStatusHandler, loadThreadHandler } from '../../rpc/explain/explain'; +import { explainHandler, explainStatusHandler } from '../../rpc/explain/explain'; import { buildNavieProvider, commonNavieArgsBuilder as navieBuilder } from '../navie'; import RPCServer from './rpcServer'; import appmapData from '../../rpc/appmap/data'; @@ -27,12 +27,22 @@ import { update } from '../../rpc/file/update'; import { INavieProvider } from '../../rpc/explain/navie/inavie'; import { navieMetadataV1, navieMetadataV2 } from '../../rpc/navie/metadata'; import { navieSuggestHandlerV1 } from '../../rpc/navie/suggest'; -import { initializeHistory } from '../../rpc/explain/navie/historyHelper'; -import History from '../../rpc/explain/navie/history'; -import { join } from 'path'; -import { homedir } from 'os'; import { navieWelcomeV2 } from '../../rpc/navie/welcome'; import { navieRegisterV1 } from '../../rpc/navie/register'; +import { navieThreadSendMessageHandler } from '../../rpc/navie/thread/handlers/sendMessage'; +import { + navieThreadPinItemHandler, + navieThreadUnpinItemHandler, +} from '../../rpc/navie/thread/handlers/pinItem'; +import { navieThreadQueryHandler } from '../../rpc/navie/thread/handlers/query'; +import NavieService from '../../rpc/navie/services/navieService'; +import { ThreadIndexService } from '../../rpc/navie/services/threadIndexService'; +import { container } from 'tsyringe'; +import ThreadService from '../../rpc/navie/services/threadService'; +import { + navieThreadAddMessageAttachmentHandler, + navieThreadRemoveMessageAttachmentHandler, +} from '../../rpc/navie/thread/handlers/messageAttachment'; export const command = 'rpc'; export const describe = 'Run AppMap JSON-RPC server'; @@ -55,6 +65,8 @@ type HandlerArguments = yargs.ArgumentsCamelCase< >; export function rpcMethods(navie: INavieProvider, codeEditor?: string): RpcHandler[] { + const threadService = container.resolve(ThreadService); + const threadIndexService = container.resolve(ThreadIndexService); return [ search(), appmapStatsV1(), @@ -65,7 +77,6 @@ export function rpcMethods(navie: INavieProvider, codeEditor?: string): RpcHandl sequenceDiagram(), explainHandler(navie, codeEditor), explainStatusHandler(), - loadThreadHandler(), update(navie), setConfigurationV1(), getConfigurationV1(), @@ -75,7 +86,13 @@ export function rpcMethods(navie: INavieProvider, codeEditor?: string): RpcHandl navieMetadataV2(), navieSuggestHandlerV1(navie), navieWelcomeV2(navie), - navieRegisterV1(codeEditor), + navieRegisterV1(threadService, codeEditor), + navieThreadSendMessageHandler(threadService), + navieThreadPinItemHandler(threadService), + navieThreadUnpinItemHandler(threadService), + navieThreadQueryHandler(threadIndexService), + navieThreadAddMessageAttachmentHandler(threadService), + navieThreadRemoveMessageAttachmentHandler(threadService), ]; } @@ -83,6 +100,10 @@ export const handler = async (argv: HandlerArguments) => { verbose(argv.verbose); const navie = buildNavieProvider(argv); + + ThreadIndexService.useDefault(); + NavieService.bindNavieProvider(navie); + let codeEditor: string | undefined = argv.codeEditor; if (!codeEditor) { codeEditor = detectCodeEditor(); @@ -92,12 +113,6 @@ export const handler = async (argv: HandlerArguments) => { loadConfiguration(false); await configureRpcDirectories(argv.directory); - { - const history = initializeHistory(); - const oldHistoryDir = join(homedir(), '.appmap', 'navie', 'history'); - await History.migrate(oldHistoryDir, history); - } - const rpcServer = new RPCServer(argv.port, rpcMethods(navie, codeEditor)); rpcServer.start(); }; diff --git a/packages/cli/src/cmds/index/rpcServer.ts b/packages/cli/src/cmds/index/rpcServer.ts index f3ec1bb2cf..0dd109ed29 100644 --- a/packages/cli/src/cmds/index/rpcServer.ts +++ b/packages/cli/src/cmds/index/rpcServer.ts @@ -11,6 +11,7 @@ import { Server } from 'http'; import shadowLocalhost from '../../lib/shadowLocalhost'; import { RpcCallback, RpcHandler, toJaysonRpcError } from '../../rpc/rpc'; +import { sseMiddleware } from '../../rpc/navie/thread/middleware'; const debug = makeDebug('appmap:rpcServer'); @@ -63,6 +64,7 @@ export default class RPCServer { const app = connect(); app.use(cors({ methods: ['POST'] })); app.use(jsonParser()); + app.use(sseMiddleware()); app.use(server.middleware()); const listener = app.listen(this.bindPort, 'localhost'); diff --git a/packages/cli/src/rpc/explain/explain.ts b/packages/cli/src/rpc/explain/explain.ts index 1b8a0c7a6b..8b6ad08f78 100644 --- a/packages/cli/src/rpc/explain/explain.ts +++ b/packages/cli/src/rpc/explain/explain.ts @@ -21,11 +21,8 @@ import configuration, { AppMapDirectory } from '../configuration'; import { getLLMConfiguration } from '../llmConfiguration'; import { RpcError, RpcHandler } from '../rpc'; import collectContext, { buildContextRequest } from './collect-context'; -import { initializeHistory } from './navie/historyHelper'; -import { ThreadAccessError } from './navie/ihistory'; import INavie, { INavieProvider } from './navie/inavie'; import reportFetchError from './navie/report-fetch-error'; -import Thread from './navie/thread'; import handleReview from './review'; const searchStatusByUserMessageId = new Map(); @@ -351,21 +348,6 @@ export function explainStatus(userMessageId: string): ExplainRpc.ExplainStatusRe return searchStatus; } -export async function loadThread(threadId: string): Promise { - const history = initializeHistory(); - - let thread: Thread; - try { - thread = await history.load(threadId); - } catch (e) { - if (e instanceof ThreadAccessError) throw new RpcError(404, `Thread ${threadId} not found`); - - throw e; - } - - return thread; -} - const explainHandler: ( navieProvider: INavieProvider, codeEditor: string | undefined @@ -426,16 +408,4 @@ const explainStatusHandler: () => RpcHandler< }; }; -const loadThreadHandler: () => RpcHandler< - ExplainRpc.LoadThreadOptions, - ExplainRpc.LoadThreadResponse -> = () => { - return { - name: ExplainRpc.ExplainThreadLoadFunctionName, - handler: async (options: ExplainRpc.LoadThreadOptions) => { - return loadThread(options.threadId); - }, - }; -}; - -export { explainHandler, explainStatusHandler, loadThreadHandler }; +export { explainHandler, explainStatusHandler }; diff --git a/packages/cli/src/rpc/explain/navie/history.ts b/packages/cli/src/rpc/explain/navie/history.ts deleted file mode 100644 index 3bda79faa1..0000000000 --- a/packages/cli/src/rpc/explain/navie/history.ts +++ /dev/null @@ -1,331 +0,0 @@ -import { join } from 'path'; -import Thread from './thread'; -import { mkdir, readdir, readFile, readlink, rm, symlink, writeFile } from 'fs/promises'; -import { warn } from 'console'; -import { Message, UserContext } from '@appland/navie'; -import { exists } from '../../../utils'; -import { OpenMode } from 'fs'; -import configuration from '../../configuration'; -import IHistory, { QuestionField, ResponseField, ThreadAccessError } from './ihistory'; - -export const THREAD_ID_REGEX = /^[0-9a-f]{4,16}(-[0-9a-f]{4,16}){3,6}$/; - -type SequenceFile = { timestamp: number; messageId: string }; - -const parseSequenceFile = (dir: string): SequenceFile => ({ - timestamp: parseInt(dir.split('.')[0]), - messageId: dir.split('.')[1], -}); - -export default class History implements IHistory { - private firstResponse = new Map(); - - public constructor(public readonly directory: string) {} - - async question( - threadId: string, - userMessageId: string, - question: string, - codeSelection: UserContext.Context | undefined, - prompt: string | undefined, - extensions: Record = { - question: 'txt', - codeSelection: 'txt', - prompt: 'md', - } - ) { - const threadDir = await this.ensureThreadDir(threadId); - const messageDir = await History.findOrCreateMessageDir(threadDir, userMessageId); - - const writeMessageFile = async (field: QuestionField, content: string) => { - const messageFile = join(messageDir, [field, extensions[field]].join('.')); - await writeFile(messageFile, content); - - const sequenceDir = join(threadDir, 'sequence'); - await mkdir(sequenceDir, { recursive: true }); - const sequenceFile = join( - sequenceDir, - [Date.now().toString(), field, extensions[field]].join('.') - ); - await History.createSymlinkIfNotExists( - join('..', 'messages', userMessageId, [field, extensions[field]].join('.')), - sequenceFile - ); - }; - - await writeMessageFile(QuestionField.Question, question); - if (codeSelection) - await writeMessageFile(QuestionField.CodeSelection, JSON.stringify(codeSelection)); - if (prompt) await writeMessageFile(QuestionField.Prompt, prompt); - - const date = new Date().toISOString().split('T')[0]; - const dateDir = join(this.directory, 'dates', date); - await mkdir(dateDir, { recursive: true }); - const dateThreadFile = join(dateDir, threadId); - await History.createSymlinkIfNotExists(threadDir, dateThreadFile); - } - - async token( - threadId: string, - userMessageId: string, - assistantMessageId: string, - token: string, - extensions: Record = { - answer: 'md', - assistantMessageId: 'txt', - } - ) { - const threadDir = await this.ensureThreadDir(threadId); - const messageDir = await History.findOrCreateMessageDir(threadDir, userMessageId); - const timestamp = Date.now(); - - const writeMessageFile = async ( - field: ResponseField, - content: string, - modeFlag: { flag: OpenMode | undefined } = { flag: 'w' } - ) => { - const messageFile = join(messageDir, [field, extensions[field]].join('.')); - await writeFile(messageFile, content, modeFlag); - - const sequenceDir = join(threadDir, 'sequence'); - await mkdir(sequenceDir, { recursive: true }); - const sequenceFile = join(sequenceDir, [timestamp, field, extensions[field]].join('.')); - await History.createSymlinkIfNotExists( - join('..', 'messages', userMessageId, [field, extensions[field]].join('.')), - sequenceFile - ); - }; - - if (!this.firstResponse.has(assistantMessageId)) { - this.firstResponse.set(assistantMessageId, timestamp); - - await writeMessageFile(ResponseField.AssistantMessageId, assistantMessageId); - await writeMessageFile(ResponseField.Answer, token, { flag: 'a' }); - } else { - const messageFile = join( - messageDir, - [ResponseField.Answer, extensions[ResponseField.Answer]].join('.') - ); - await writeFile(messageFile, token, { flag: 'a' }); - } - } - - async load(threadId: string): Promise { - const threadDir = join(this.directory, 'threads', threadId); - let projectDirectories: string[]; - try { - projectDirectories = (await readFile(join(threadDir, 'projectDirectories.txt'), 'utf-8')) - .split('\n') - .filter(Boolean); - } catch (e) { - throw History.threadAccessError(threadId, 'load', e); - } - - const messagesDir = join(this.directory, 'threads', threadId, 'messages'); - const sequenceDir = join(this.directory, 'threads', threadId, 'sequence'); - - let messageSequenceFiles: string[]; - try { - messageSequenceFiles = await readdir(sequenceDir); - } catch (e) { - throw History.threadAccessError(threadId, 'load', e); - } - - messageSequenceFiles.sort( - (a, b) => parseSequenceFile(a).timestamp - parseSequenceFile(b).timestamp - ); - const contentFileFieldName = (contentFile: string) => contentFile.split('.')[0]; - - const timestamp = - messageSequenceFiles.length > 0 - ? parseSequenceFile(messageSequenceFiles[0]).timestamp - : Date.now(); - const thread = new Thread(threadId, timestamp, projectDirectories); - - const userMessageIds = new Set(); - for (const sequenceFile of messageSequenceFiles) { - const sequenceFilePath = join(sequenceDir, sequenceFile); - // Resolve the file name that the symlink points to. - let messageFile: string; - try { - messageFile = await readlink(sequenceFilePath); - } catch (e) { - warn(e); - continue; - } - const messageFileTokens = messageFile.split('/'); - const userMessageId = messageFileTokens[messageFileTokens.length - 2]; - userMessageIds.add(userMessageId); - } - - for (const userMessageId of userMessageIds) { - const contentFiles = await readdir(join(messagesDir, userMessageId)); - let threadTimestamp: number | undefined; - - const readRecordFile = async (recordName: string): Promise => { - const matchingContentFile = contentFiles.find( - (file) => contentFileFieldName(file) === recordName - ); - if (!matchingContentFile) return undefined; - - const contentFile = join(messagesDir, userMessageId, matchingContentFile); - - // Read the file timestamp to use as the thread timestamp. - try { - const contentFileStat = await readFile(contentFile, 'utf-8'); - const messageTimestamp = parseInt(contentFileStat.split('.')[0]); - if (!threadTimestamp || messageTimestamp < threadTimestamp) - threadTimestamp = messageTimestamp; - } catch (e) { - throw History.threadAccessError(threadId, 'read', e); - } - - try { - return await readFile(contentFile, 'utf-8'); - } catch (e) { - throw History.threadAccessError(threadId, 'read', e); - } - }; - - const question = await readRecordFile('question'); - const rawCodeSelection = await readRecordFile('codeSelection'); - const codeSelection = rawCodeSelection ? JSON.parse(rawCodeSelection) : undefined; - const prompt = await readRecordFile('prompt'); - const assistantMessageId = await readRecordFile('assistantMessageId'); - const answer = await readRecordFile('answer'); - - if (userMessageId && question) - thread.question( - threadTimestamp ?? Date.now(), - userMessageId, - question, - codeSelection, - prompt - ); - if (userMessageId && assistantMessageId && answer) { - thread.answer(userMessageId, assistantMessageId, answer); - } - } - - return thread; - } - - // Message are stored within threadDir/message in subdirectories. Each subdirectory is named - // by joining the timestamp and the user message id with a period. This naming convention - // makes it easy to view and sort the messages in chronological order. - private static async findOrCreateMessageDir( - threadDir: string, - userMessageId: string - ): Promise { - // List message directories in the thread directory. - const messagesDir = join(threadDir, 'messages'); - await mkdir(messagesDir, { recursive: true }); - - let messageIds: string[]; - try { - messageIds = await readdir(messagesDir); - } catch (e) { - throw this.threadAccessError(threadDir, 'initialize storage for', e); - } - - // Find the message directory for the user message. - if (messageIds.includes(userMessageId)) { - return join(messagesDir, userMessageId); - } - - const messagePath = join(messagesDir, userMessageId); - await mkdir(messagePath, { recursive: true }); - return messagePath; - } - - private static async createSymlinkIfNotExists(target: string, path: string) { - try { - await readlink(path); - } catch (e) { - const err = e as Error & { code?: string }; - if (err.code === 'ENOENT') { - await symlink(target, path); - } else if (err.code === 'EEXIST') { - // Symlink already exists, do nothing. - } else { - throw err; - } - } - } - - private async ensureThreadDir(threadId: string): Promise { - const threadDir = join(this.directory, 'threads', threadId); - await mkdir(threadDir, { recursive: true }); - const projectDirectoriesFile = join(threadDir, 'projectDirectories.txt'); - if (!(await exists(projectDirectoriesFile))) { - const projectDirectories = configuration().projectDirectories; - await writeFile(projectDirectoriesFile, projectDirectories.join('\n')); - } - return threadDir; - } - - static threadAccessError(threadId: string, action: string, e: any): ThreadAccessError { - const err = e as Error & { code?: string }; - if (err.code === 'ENOENT') warn(`Thread ${threadId} not found`); - - return new ThreadAccessError(threadId, action, e instanceof Error ? e : undefined); - } - - // In the old-style history format, threads are stored in files named by their thread id - // and timestamp. In the new-style history format, threads are stored in files named by - // their thread id, and the thread file is symlinked to a file named for the date of the thread. - static async migrate( - oldDirectory: string, - history: IHistory, - options: { cleanup: boolean } = { cleanup: true } - ): Promise { - // List the contents of the old directory. Each one is a threadId directory, containing - // timestamped messages. - const threadIds = (await readdir(oldDirectory)).filter( - // Match UUIDs like 5278527e-c4ed-4e45-9fb6-372ed6a036f6 in the thread dir - // Being careful not to touch anything else, like the dates and threads directories that are - // created in the new history format. - (dir) => dir.match(THREAD_ID_REGEX) - ); - - const threadFileAsTimestamp = (threadFile: string) => parseInt(threadFile.split('.')[0]); - - for (const threadId of threadIds) { - warn(`[history] Migrating thread ${threadId} from old history format`); - - const oldThreadDir = join(oldDirectory, threadId); - const threadFiles = await readdir(oldThreadDir); - threadFiles.sort((a, b) => threadFileAsTimestamp(a) - threadFileAsTimestamp(b)); - - let lastUserMessageId: string | undefined; - for (const threadFile of threadFiles) { - const timestamp = threadFileAsTimestamp(threadFile); - // MessageIds are not available, so fake them with timestamps. - const messageId = timestamp.toString(); - const messageFile = join(oldThreadDir, threadFile); - const messageStr = await readFile(messageFile, 'utf-8'); - try { - const message = JSON.parse(messageStr) as Message; - if (message.role === 'user') { - lastUserMessageId = messageId; - // codeSelection and prompt are not available in the old format. - await history.question(threadId, messageId, message.content, undefined, undefined); - } else if (message.role === 'assistant' || message.role === 'system') { - if (lastUserMessageId) - await history.token(threadId, lastUserMessageId, messageId, message.content); - } - } catch (e) { - warn(`[history] Failed to parse message from ${messageFile}. Skipping.`); - warn(e); - } - - // Project directories are unknown, so overwrite the file contents with empty text. - const newThreadDir = join(history.directory, 'threads', threadId); - if (await exists(newThreadDir)) - await writeFile(join(newThreadDir, 'projectDirectories.txt'), ''); - } - - if (options.cleanup) await rm(oldThreadDir, { recursive: true }); - } - } -} diff --git a/packages/cli/src/rpc/explain/navie/historyHelper.ts b/packages/cli/src/rpc/explain/navie/historyHelper.ts deleted file mode 100644 index 7b3885d1ef..0000000000 --- a/packages/cli/src/rpc/explain/navie/historyHelper.ts +++ /dev/null @@ -1,35 +0,0 @@ -import { homedir } from 'os'; -import { mkdirSync, existsSync } from 'fs'; -import History from './history'; -import Thread from './thread'; -import { join } from 'path'; -import { warn } from 'console'; -import configuration from '../../configuration'; -import IHistory, { ThreadAccessError } from './ihistory'; -import HistoryWindows from './historyWindows'; - -export function initializeHistory(): IHistory { - const historyDir = join(homedir(), '.appmap', 'navie', 'history'); - if (!existsSync(historyDir)) { - mkdirSync(historyDir, { recursive: true }); - } - return process.platform === 'win32' ? new HistoryWindows(historyDir) : new History(historyDir); -} - -export async function loadThread(history: IHistory, threadId: string): Promise { - let thread: Thread; - - try { - thread = await history.load(threadId); - } catch (e) { - if (e instanceof ThreadAccessError) { - warn(`[remote-navie] Creating new thread ${threadId} (thread not found)`); - const projectDirectories = configuration().projectDirectories; - thread = new Thread(threadId, Date.now(), projectDirectories); - } else { - throw e; - } - } - - return thread; -} diff --git a/packages/cli/src/rpc/explain/navie/historyWindows.ts b/packages/cli/src/rpc/explain/navie/historyWindows.ts deleted file mode 100644 index a507bdf627..0000000000 --- a/packages/cli/src/rpc/explain/navie/historyWindows.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { ThreadAccessError } from './ihistory'; -import IHistory from './ihistory'; -import Thread from './thread'; - -export default class HistoryWindows implements IHistory { - constructor(public readonly directory: string) { - console.warn('History is currently disabled. It is not yet implemented on Windows.'); - } - - token(): void { - // Do nothing - } - - question(): void { - // Do nothing - } - - load(threadId: string): Thread { - throw new ThreadAccessError(threadId, 'load', new Error('Not implemented on Windows')); - } -} diff --git a/packages/cli/src/rpc/explain/navie/ihistory.ts b/packages/cli/src/rpc/explain/navie/ihistory.ts deleted file mode 100644 index c7114e0101..0000000000 --- a/packages/cli/src/rpc/explain/navie/ihistory.ts +++ /dev/null @@ -1,48 +0,0 @@ -import { UserContext } from '@appland/navie'; -import Thread from './thread'; - -export class ThreadAccessError extends Error { - constructor(public readonly threadId: string, public action: string, public cause?: Error) { - super(ThreadAccessError.errorMessage(threadId, action, cause)); - } - - static errorMessage(threadId: string, action: string, cause?: Error): string { - const messages = [`Failed to ${action} thread ${threadId}`]; - if (cause) messages.push(cause.message); - return messages.join(': '); - } -} - -export enum QuestionField { - Question = 'question', - CodeSelection = 'codeSelection', - Prompt = 'prompt', -} - -export enum ResponseField { - AssistantMessageId = 'assistantMessageId', - Answer = 'answer', -} - -export default interface IHistory { - readonly directory: string; - - token( - threadId: string, - userMessageId: string, - assistantMessageId: string, - token: string, - extensions?: Record - ): void | Promise; - - question( - threadId: string, - userMessageId: string, - question: string, - codeSelection: UserContext.Context | undefined, - prompt: string | undefined, - extensions?: Record - ): void | Promise; - - load(threadId: string): Thread | Promise; -} diff --git a/packages/cli/src/rpc/explain/navie/navie-local.ts b/packages/cli/src/rpc/explain/navie/navie-local.ts index 52ef2c6629..9ced8d4a8c 100644 --- a/packages/cli/src/rpc/explain/navie/navie-local.ts +++ b/packages/cli/src/rpc/explain/navie/navie-local.ts @@ -15,9 +15,10 @@ import { } from '@appland/client'; import reportFetchError from './report-fetch-error'; import assert from 'assert'; -import { initializeHistory, loadThread } from './historyHelper'; -import { THREAD_ID_REGEX } from './history'; import Trajectory from './trajectory'; +import { verbose } from '../../../utils'; +import { container } from 'tsyringe'; +import ThreadService from '../../navie/services/threadService'; const OPTION_SETTERS: Record< string, @@ -54,8 +55,6 @@ export default class LocalNavie extends EventEmitter implements INavie { // Sets a thread id to use with the request. // The caller is responsible for ensuring that the thread id is a unique, valid uuid. setThreadId(threadId: string) { - if (!THREAD_ID_REGEX.test(threadId)) throw new Error(`Invalid thread id: ${threadId}`); - this.assignedThreadId = threadId; } @@ -123,9 +122,6 @@ export default class LocalNavie extends EventEmitter implements INavie { )?.id ?? randomUUID(); } - const history = initializeHistory(); - const thread = await loadThread(history, threadId); - this.#reportConfigTelemetry(); log(`[local-navie] Processing question ${userMessageId} in thread ${threadId}`); this.emit('ack', userMessageId, threadId); @@ -137,9 +133,19 @@ export default class LocalNavie extends EventEmitter implements INavie { prompt, }; - await history.question(threadId, userMessageId, question, codeSelection, prompt); - const startTime = Date.now(); + let chatHistory: Navie.ChatHistory = []; + try { + const threadService = container.resolve(ThreadService); + const thread = await threadService.getThread(threadId); + chatHistory = thread.getChatHistory(); + } catch (e) { + if (verbose()) { + console.warn(`Failed to load thread ${threadId}: ${e}`); + } + // If the thread failed to load, history will be empty. + // This is typically expected if the thread is new. + } const navieFn = navie( clientRequest, @@ -147,7 +153,7 @@ export default class LocalNavie extends EventEmitter implements INavie { this.projectInfoProvider, this.helpProvider, this.navieOptions, - thread.messages + chatHistory ); let agentName: string | undefined; @@ -162,7 +168,6 @@ export default class LocalNavie extends EventEmitter implements INavie { const response = new Array(); for await (const token of navieFn.execute()) { response.push(token); - await history.token(threadId, userMessageId, agentMessageId, token); this.emit('token', token, agentMessageId); } const endTime = Date.now(); diff --git a/packages/cli/src/rpc/explain/navie/navie-remote.ts b/packages/cli/src/rpc/explain/navie/navie-remote.ts index 31a2a2f30d..e299f1e694 100644 --- a/packages/cli/src/rpc/explain/navie/navie-remote.ts +++ b/packages/cli/src/rpc/explain/navie/navie-remote.ts @@ -6,53 +6,17 @@ import { ContextV1, ContextV2, Help, ProjectInfo } from '@appland/navie'; import { verbose } from '../../../utils'; import { default as INavie } from './inavie'; import assert from 'assert'; -import { initializeHistory, loadThread } from './historyHelper'; -import Thread from './thread'; -import IHistory from './ihistory'; -export class RemtoteCallbackHandler { - thread: Thread | undefined; +export class RemoteCallbackHandler { userMessageId: string | undefined; assistantMessageId: string | undefined; - tokens: string[] = []; constructor( - private readonly history: IHistory, private readonly contextProvider: ContextV2.ContextProvider, private readonly projectInfoProvider: ProjectInfo.ProjectInfoProvider, private readonly helpProvider: Help.HelpProvider ) {} - async onAck( - assignedUserMessageId: string, - threadId: string, - question: string, - codeSelection?: string, - prompt?: string - ): Promise { - if (verbose()) - warn(`Explain received ack (userMessageId=${assignedUserMessageId}, threadId=${threadId})`); - - this.thread = await loadThread(this.history, threadId); - this.userMessageId = assignedUserMessageId; - await this.history.question(threadId, this.userMessageId, question, codeSelection, prompt); - } - - async onToken(token: string, messageId: string): Promise { - if (!this.assistantMessageId) this.assistantMessageId = messageId; - - this.tokens.push(token); - - if (this.thread && this.userMessageId) - await this.history.token( - this.thread.threadId, - this.userMessageId, - this.assistantMessageId, - token - ); - else warn(`[remote-navie] Received token but no thread is available to store it`); - } - async onRequestContext( data: Record ): Promise | unknown[]> { @@ -124,8 +88,6 @@ export class RemtoteCallbackHandler { } export default class RemoteNavie extends EventEmitter implements INavie { - private history = initializeHistory(); - constructor( private contextProvider: ContextV2.ContextProvider, private projectInfoProvider: ProjectInfo.ProjectInfoProvider, @@ -143,22 +105,18 @@ export default class RemoteNavie extends EventEmitter implements INavie { } async ask(threadId: string, question: string, codeSelection?: string, prompt?: string) { - const callbackHandler = new RemtoteCallbackHandler( - this.history, + const callbackHandler = new RemoteCallbackHandler( this.contextProvider, this.projectInfoProvider, this.helpProvider ); const onAck = async (userMessageId: string, threadId: string) => { - await callbackHandler.onAck(userMessageId, threadId, question, codeSelection, prompt); - this.emit('ack', userMessageId, threadId); }; const onToken = async (token: string, messageId: string): Promise => { this.emit('token', token, messageId); - await callbackHandler.onToken(token, messageId); }; const onRequestContext = async ( @@ -169,13 +127,11 @@ export default class RemoteNavie extends EventEmitter implements INavie { const onComplete = () => { callbackHandler.onComplete(); - this.emit('complete'); }; const onError = (err: Error) => { callbackHandler.onError(err); - this.emit('error', err); }; diff --git a/packages/cli/src/rpc/explain/navie/thread.ts b/packages/cli/src/rpc/explain/navie/thread.ts deleted file mode 100644 index 018b1334ce..0000000000 --- a/packages/cli/src/rpc/explain/navie/thread.ts +++ /dev/null @@ -1,122 +0,0 @@ -import { warn } from 'console'; - -import { ExplainRpc } from '@appland/rpc'; -import configuration from '../../configuration'; -import { getLLMConfiguration } from '../../llmConfiguration'; - -export type Message = ExplainRpc.Message; - -export type Question = ExplainRpc.Question; - -export type Answer = ExplainRpc.Answer; - -export class Exchange { - readonly question: Question; - answer?: Answer; - - constructor( - timestamp: number, - messageId: string, - question: string, - codeSelection?: string, - prompt?: string - ) { - this.question = { - timestamp, - messageId, - content: question, - role: 'user', - codeSelection, - prompt, - }; - } - - setAnswer(messageId: string, answer: string) { - this.answer = { - messageId, - content: answer, - role: 'assistant', - }; - } -} - -export type ThreadData = ExplainRpc.Thread; - -export default class Thread implements ThreadData { - public readonly exchanges: Exchange[] = []; - - public constructor( - public readonly threadId: string, - public readonly timestamp: number, - public readonly projectDirectories: string[] - ) {} - - get messages(): Message[] { - const result = new Array(); - for (const exchange of this.exchanges) { - result.push(exchange.question); - if (exchange.answer) result.push(exchange.answer); - } - return result; - } - - /** - * Gets the thread timestamp formatted as 'YYYY-mm-dd'. - */ - date() { - const date = new Date(this.timestamp); - return date.toISOString().split('T')[0]; - } - - question( - timestamp: number, - messageId: string, - question: string, - codeSelection?: string, - prompt?: string - ) { - const exchange = new Exchange(timestamp, messageId, question, codeSelection, prompt); - this.exchanges.push(exchange); - } - - answer(userMessageId: string, messageId: string, answer: string) { - const exchange = this.exchanges[this.exchanges.length - 1]; - if (!exchange) { - warn(`[history/thread] No question to answer for message ${messageId}`); - return; - } - if (exchange.question.messageId !== userMessageId) { - warn(`[history/thread] Received an answer to a different question than the last one asked`); - return; - } - exchange.setAnswer(messageId, answer); - } - - asJSON(): ThreadData { - return { - timestamp: this.timestamp, - projectDirectories: this.projectDirectories, - exchanges: this.exchanges, - }; - } - - static fromJSON(threadId: string, data: ThreadData): Thread { - const thread = new Thread(threadId, data.timestamp, data.projectDirectories); - for (const exchange of data.exchanges) { - thread.question( - exchange.question.timestamp, - exchange.question.messageId, - exchange.question.content, - exchange.question.codeSelection, - exchange.question.prompt - ); - if (exchange.answer) - thread.answer( - exchange.question.messageId, - exchange.answer.messageId, - exchange.answer.content - ); - } - return thread; - } -} diff --git a/packages/cli/src/rpc/explain/review.ts b/packages/cli/src/rpc/explain/review.ts index 3e202ed30e..8b4d46181e 100644 --- a/packages/cli/src/rpc/explain/review.ts +++ b/packages/cli/src/rpc/explain/review.ts @@ -10,7 +10,7 @@ import { getDiffLog, getWorkingDiff } from '../../lib/git'; export default async function handleReview( question: string, userContext?: UserContext.Context -): Promise<{ applied: boolean; userContext?: UserContext.Context }> { +): Promise<{ applied: boolean; userContext?: UserContext.ContextItem[] }> { const [mode] = question.split(/\s+/g); if (mode !== '@review') return { applied: false }; diff --git a/packages/cli/src/rpc/navie/register.ts b/packages/cli/src/rpc/navie/register.ts index 53e3f34a05..64bcf8cd30 100644 --- a/packages/cli/src/rpc/navie/register.ts +++ b/packages/cli/src/rpc/navie/register.ts @@ -12,8 +12,10 @@ import { RpcHandler } from '../rpc'; import { getLLMConfiguration } from '../llmConfiguration'; import detectAIEnvVar from '../../cmds/index/aiEnvVar'; import configuration from '../configuration'; +import ThreadService from './services/threadService'; export async function register( + threadService: ThreadService, codeEditor: string | undefined ): Promise { const modelParameters = { @@ -40,17 +42,19 @@ export async function register( if (codeEditor) projectParameters.codeEditor = codeEditor; const thread = await AI.createConversationThread({ modelParameters, projectParameters }); + threadService.registerThread(thread); return { thread }; } export function navieRegisterV1( + threadService: ThreadService, codeEditor?: string ): RpcHandler { return { name: NavieRpc.V1.Register.Method, handler: async () => { - return register(codeEditor); + return register(threadService, codeEditor); }, }; } diff --git a/packages/cli/src/rpc/navie/services/contextService.ts b/packages/cli/src/rpc/navie/services/contextService.ts new file mode 100644 index 0000000000..a7436fe61a --- /dev/null +++ b/packages/cli/src/rpc/navie/services/contextService.ts @@ -0,0 +1,69 @@ +import { ContextV2, Help, ProjectInfo } from '@appland/navie'; +import { basename } from 'path'; +import collectContext, { buildContextRequest } from '../../explain/collect-context'; +import collectProjectInfos from '../../../cmds/navie/projectInfo'; +import collectHelp from '../../../cmds/navie/help'; +import detectCodeEditor from '../../../lib/detectCodeEditor'; +import configuration from '../../configuration'; +import { autoInjectable } from 'tsyringe'; + +@autoInjectable() +export class ContextService { + private readonly codeEditor?: string; + + constructor() { + this.codeEditor = detectCodeEditor(); + } + + async searchContext(data: ContextV2.ContextRequest): Promise { + const { vectorTerms, tokenCount, labels = [] } = data; + const keywords = [...(vectorTerms || [])]; + const config = configuration(); + const appmapDirectories = await config + .appmapDirectories() + .then((dirs) => dirs.map((d) => d.directory)); + const projectDirectories = config.projectDirectories; + + if (keywords.length > 0) { + if ( + labels.find( + (label) => + (label.name === ContextV2.ContextLabelName.Architecture || + label.name === ContextV2.ContextLabelName.Overview) && + label.weight === ContextV2.ContextLabelWeight.High + ) + ) { + keywords.push('architecture', 'design', 'readme', 'about', 'overview'); + projectDirectories.forEach((dir) => keywords.push(basename(dir))); + } + } + + const charLimit = tokenCount * 3; + const contextRequest = buildContextRequest( + appmapDirectories, + projectDirectories, + undefined, + keywords, + charLimit, + data + ); + + const searchResult = await collectContext( + appmapDirectories, + projectDirectories, + charLimit, + contextRequest.vectorTerms, + contextRequest.request + ); + + return searchResult.context; + } + + async projectInfoContext(): Promise { + return collectProjectInfos(this.codeEditor); + } + + helpContext(data: Help.HelpRequest): Promise { + return collectHelp(data); + } +} diff --git a/packages/cli/src/rpc/navie/services/navieService.ts b/packages/cli/src/rpc/navie/services/navieService.ts new file mode 100644 index 0000000000..cd3122c021 --- /dev/null +++ b/packages/cli/src/rpc/navie/services/navieService.ts @@ -0,0 +1,75 @@ +import { ContextV2, Help, ProjectInfo } from '@appland/navie'; +import INavie, { INavieProvider } from '../../explain/navie/inavie'; +import { ContextService } from './contextService'; +import { randomUUID } from 'node:crypto'; +import EventEmitter from 'node:events'; +import { autoInjectable, container, inject } from 'tsyringe'; + +interface ContextEvent { + type: Type; + id: string; + request: Req; + response?: Res; + complete?: boolean; +} + +type ContextEvents = + | ContextEvent<'context', ContextV2.ContextRequest, ContextV2.ContextResponse> + | ContextEvent<'help', Help.HelpRequest, Help.HelpResponse> + | ContextEvent<'project-info', undefined, ProjectInfo.ProjectInfoResponse>; + +export interface ContextEmitter { + on(event: 'context', listener: (event: ContextEvents) => void): this; +} + +function contextEvent< + Req extends ContextV2.ContextRequest | Help.HelpRequest | ProjectInfo.ProjectInfoRequest, + Res extends ContextV2.ContextResponse | Help.HelpResponse | ProjectInfo.ProjectInfoResponse +>( + emitter: EventEmitter, + type: ContextEvents['type'], + fn: (req: Req) => Promise +): (req: Req) => Promise { + return async (req: Req) => { + const requestId = randomUUID(); + emitter.emit('context', { type, id: requestId, request: req } as ContextEvents); + const result = await fn(req); + emitter.emit('context', { + type, + id: requestId, + request: req, + response: result, + complete: true, + } as ContextEvents); + return result; + }; +} + +@autoInjectable() +export default class NavieService { + public static NAVIE_PROVIDER = 'INavieProvider'; + + constructor( + @inject(ContextService) private readonly contextService: ContextService, + @inject(NavieService.NAVIE_PROVIDER) public readonly navieProvider: INavieProvider + ) {} + + static bindNavieProvider(navieProvider?: INavieProvider) { + container.registerInstance(this.NAVIE_PROVIDER, navieProvider); + } + + getNavie(_navieProvider?: INavieProvider): [INavie, ContextEmitter] { + const navieProvider = _navieProvider ?? this.navieProvider; + const contextEmitter = new EventEmitter(); + const navie = navieProvider( + contextEvent(contextEmitter, 'context', this.contextService.searchContext.bind(this)), + contextEvent( + contextEmitter, + 'project-info', + this.contextService.projectInfoContext.bind(this) + ), + contextEvent(contextEmitter, 'help', this.contextService.helpContext.bind(this)) + ); + return [navie, contextEmitter as unknown as ContextEmitter]; + } +} diff --git a/packages/cli/src/rpc/navie/services/threadIndexService.ts b/packages/cli/src/rpc/navie/services/threadIndexService.ts new file mode 100644 index 0000000000..92a4d760be --- /dev/null +++ b/packages/cli/src/rpc/navie/services/threadIndexService.ts @@ -0,0 +1,175 @@ +import sqlite3 from 'better-sqlite3'; +import { homedir } from 'node:os'; +import { join } from 'node:path'; +import configuration from '../../configuration'; +import { container, inject, injectable, singleton } from 'tsyringe'; + +const INITIALIZE_SQL = ` +PRAGMA foreign_keys = ON; + +CREATE TABLE IF NOT EXISTS threads ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + uuid TEXT NOT NULL UNIQUE, + path TEXT NOT NULL, + title TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT uuid_format CHECK (length(uuid) = 36) +); + +CREATE INDEX IF NOT EXISTS idx_created_at ON threads (created_at); +CREATE INDEX IF NOT EXISTS idx_uuid ON threads (uuid); + +CREATE TABLE IF NOT EXISTS project_directories ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + path TEXT NOT NULL, + thread_id TEXT NOT NULL, + FOREIGN KEY(thread_id) REFERENCES threads(uuid) ON DELETE CASCADE, + UNIQUE (thread_id, path) +); + +CREATE INDEX IF NOT EXISTS idx_thread_id ON project_directories (thread_id); +`; + +const QUERY_INSERT_THREAD_SQL = `INSERT INTO threads (uuid, path, title) VALUES (?, ?, ?) +ON CONFLICT (uuid) DO UPDATE SET updated_at = CURRENT_TIMESTAMP, title = ?`; +const QUERY_DELETE_THREAD_SQL = `DELETE FROM threads WHERE uuid = ?`; +const QUERY_INSERT_PROJECT_DIRECTORY_SQL = `INSERT INTO project_directories (thread_id, path) VALUES (?, ?) +ON CONFLICT (thread_id, path) DO NOTHING`; + +interface QueryOptions { + uuid?: string; + maxCreatedAt?: Date; + orderBy?: 'created_at' | 'updated_at' | 'title'; + limit?: number; + offset?: number; + projectDirectories?: string[]; +} + +export interface ThreadIndexItem { + id: string; + path: string; + title: string; + created_at: string; + updated_at: string; +} + +/** + * A service for managing the conversation thread index. The thread index is a SQLite database that + * stores information about threads, such as their path and title. + */ +@singleton() +@injectable() +export class ThreadIndexService { + private queryInsert: sqlite3.Statement; + private queryDelete: sqlite3.Statement; + private queryInsertProjectDirectory: sqlite3.Statement; + + static readonly DEFAULT_DATABASE_PATH = join(homedir(), '.appmap', 'navie', 'thread-index.db'); + static readonly DATABASE = 'ThreadIndexDatabase'; + + constructor(@inject(ThreadIndexService.DATABASE) private readonly db: sqlite3.Database) { + this.db.exec(INITIALIZE_SQL); + + this.queryInsert = this.db.prepare(QUERY_INSERT_THREAD_SQL); + this.queryDelete = this.db.prepare(QUERY_DELETE_THREAD_SQL); + this.queryInsertProjectDirectory = this.db.prepare(QUERY_INSERT_PROJECT_DIRECTORY_SQL); + } + + /** + * Binds the database to a sqlite3 instance on disk at the default database path + */ + static useDefault() { + const db = new sqlite3(this.DEFAULT_DATABASE_PATH); + container.registerInstance(this.DATABASE, db); + } + + /** + * Indexes a thread with the given path and title. If the thread is already indexed, it will be + * updated. + * + * @param threadId The thread ID + * @param path The path to the thread + * @param title The title of the thread + */ + index(threadId: string, path: string, title?: string) { + const { projectDirectories } = configuration(); + this.db.transaction(() => { + this.queryInsert.run(threadId, path, title, title); + + // Project directories are written on every index update. + // + // This is likely not necessary but it'll handle the edge case where an additional project + // directory is added mid-way through a conversation. + projectDirectories.forEach((projectDirectory) => { + this.queryInsertProjectDirectory.run(threadId, projectDirectory); + }); + })(); + return; + } + + /** + * Deletes a thread from the index. + * + * @param threadId The thread ID + */ + delete(threadId: string) { + return this.queryDelete.run(threadId); + } + + /** + * Queries the index for threads. + * + * @param options The options to query with + * @returns The threads that match the query + */ + query(options: QueryOptions): ThreadIndexItem[] { + let queryString = `SELECT uuid as id, threads.path, title, created_at, updated_at FROM threads`; + const params: unknown[] = []; + if (options.uuid) { + queryString += ` WHERE uuid = ?`; + params.push(options.uuid); + } + if (options.maxCreatedAt) { + const appendKeyword = queryString.includes('WHERE') ? 'AND' : 'WHERE'; + queryString += ` ${appendKeyword} created_at > ?`; + params.push(options.maxCreatedAt.toISOString()); + } + if (options.orderBy) { + if (!['created_at', 'updated_at', 'title'].includes(options.orderBy)) { + throw new Error(`invalid orderBy option: ${options.orderBy}`); + } + // Note that this parameter is not escaped. It's validated directly above. + queryString += ` ORDER BY threads.${options.orderBy} DESC`; + } + if (options.limit) { + queryString += ` LIMIT ?`; + params.push(options.limit); + } + if (options.offset) { + if (!Number.isInteger(options.limit)) { + throw new Error(`offset cannot be used without a limit`); + } + queryString += ` OFFSET ?`; + params.push(options.offset); + } + if (options.projectDirectories) { + if (options.projectDirectories.length === 0) { + // If `projectDirectories` is an empty array, we want to return all threads that have no + // project directories associated with them. This is edge-casey, but this does occur in + // development. + // + // If you're looking to query for threads with any project directory, leave `projectDirectories` + // as `undefined`. + queryString += ` LEFT JOIN project_directories ON uuid = thread_id WHERE project_directories.path IS NULL`; + } else { + queryString += ` INNER JOIN project_directories ON uuid = thread_id WHERE project_directories.path IN (${options.projectDirectories + .map(() => '?') + .join(',')})`; + params.push(...options.projectDirectories); + } + } + const query = this.db.prepare(queryString); + return query.all(...params) as ThreadIndexItem[]; + } +} diff --git a/packages/cli/src/rpc/navie/services/threadService.ts b/packages/cli/src/rpc/navie/services/threadService.ts new file mode 100644 index 0000000000..8e148e17d8 --- /dev/null +++ b/packages/cli/src/rpc/navie/services/threadService.ts @@ -0,0 +1,91 @@ +import { Thread } from '../thread'; +import { ConversationThread } from '@appland/client'; +import { ThreadIndexService } from './threadIndexService'; +import { inject, injectable, singleton } from 'tsyringe'; +import { NavieThreadInitEvent, TimestampNavieEvent } from '../thread/events'; +import { readFile } from 'node:fs/promises'; +import NavieService from './navieService'; + +type ThreadId = string; + +/** + * A service for managing Navie conversation threads. Threads are stored in memory and can be + * retrieved from memory or loaded from disk if they are not in memory. + */ +@singleton() +@injectable() +export default class ThreadService { + private readonly memoryThreads = new Map(); + + constructor( + @inject(ThreadIndexService) private readonly threadIndexService: ThreadIndexService, + @inject(NavieService) private readonly navieService: NavieService + ) {} + + /** + * Returns a thread from memory or loads it from disk if it's not yet in memory. If the thread is + * not found, or the load fails, an error will be thrown. + * @param threadId the thread identifier to retrieve + * @returns the thread + */ + async getThread(threadId: string): Promise { + let thread = this.memoryThreads.get(threadId); + if (!thread) { + try { + thread = await this.loadFromDisk(threadId); + } catch (e) { + // misbehaving threads will be deleted from the index + // they probably no longer exist + this.threadIndexService.delete(threadId); + throw e; + } + this.memoryThreads.set(thread.conversationThread.id, thread); + } + return thread; + } + + async loadFromDisk(threadId: string): Promise { + const historyFilePath = Thread.getHistoryFilePath(threadId); + let initEvent: NavieThreadInitEvent | undefined; + const eventLog: TimestampNavieEvent[] = []; + + try { + const jsonLines = await readFile(historyFilePath, 'utf-8').then((data) => data.split('\n')); + for (const json of jsonLines) { + if (json.length === 0) continue; + try { + const event = JSON.parse(json) as TimestampNavieEvent; + if (!initEvent && event.type === 'thread-init') { + initEvent = event; + } + eventLog.push(event); + } catch (e) { + console.error('Failed to parse event', json, e); + } + } + } catch (e) { + throw new Error(`Failed to load history file ${historyFilePath}: ${String(e)}`); + } + + if (!initEvent) throw new Error('Thread init event not found'); + + return new Thread(initEvent.conversationThread, this.navieService, eventLog); + } + + /** + * Registers a thread with the service. This will initialize the thread and add it to memory. If + * the thread is already registered, an error will be thrown. + * + * @param conversationThread The thread to register + */ + registerThread(conversationThread: ConversationThread) { + if (this.memoryThreads.has(conversationThread.id)) { + throw new Error(`Thread ${conversationThread.id} is already registered`); + } + + const thread = new Thread(conversationThread, this.navieService); + thread.initialize(); + + this.memoryThreads.set(conversationThread.id, thread); + } +} diff --git a/packages/cli/src/rpc/navie/suggest.ts b/packages/cli/src/rpc/navie/suggest.ts index 93880446ba..56d28c2852 100644 --- a/packages/cli/src/rpc/navie/suggest.ts +++ b/packages/cli/src/rpc/navie/suggest.ts @@ -5,7 +5,7 @@ import { INavieProvider } from '../explain/navie/inavie'; // We don't want to support context lookups const NOP = () => Promise.resolve([]); -function getSuggestions( +export function getSuggestions( navieProvider: INavieProvider, threadId: string ): Promise { diff --git a/packages/cli/src/rpc/navie/thread/events.ts b/packages/cli/src/rpc/navie/thread/events.ts new file mode 100644 index 0000000000..6b785fcf69 --- /dev/null +++ b/packages/cli/src/rpc/navie/thread/events.ts @@ -0,0 +1,102 @@ +import { ConversationThread } from '@appland/client'; +import { ContextV2, Help, ProjectInfo } from '@appland/navie'; +import { NavieRpc } from '@appland/rpc'; + +export type Timestamp = { + time: number; +}; + +export type NavieThreadInitEvent = { + type: 'thread-init'; + conversationThread: ConversationThread; +}; +export type NavieTokenMetadataEvent = { + type: 'token-metadata'; + codeBlockId: string; + metadata: Record; +}; + +export type NavieTokenEvent = { + type: 'token'; + messageId: string; + token: string; + codeBlockId?: string; +}; + +export type NavieMessageEvent = { + type: 'message'; + role: 'system' | 'assistant' | 'user'; + messageId: string; + content: string; +}; + +export type NaviePromptSuggestionsEvent = { + type: 'prompt-suggestions'; + suggestions: NavieRpc.V1.Suggest.Response; + messageId: string; +}; + +export type NavieMessageCompleteEvent = { + type: 'message-complete'; + messageId: string; +}; + +export type NaviePinItemEvent = PinnedItem & { + type: 'pin-item'; +}; + +export type NavieUnpinItemEvent = PinnedItem & { + type: 'unpin-item'; +}; + +export type NavieErrorEvent = { + type: 'error'; + error: unknown; +}; + +export type NavieBeginContextSearchEvent = { + type: 'begin-context-search'; + contextType: 'help' | 'project-info' | 'context'; + id: string; + request?: Help.HelpRequest | ProjectInfo.ProjectInfoRequest | ContextV2.ContextRequest; +}; + +export type NavieCompleteContextSearchEvent = { + type: 'complete-context-search'; + id: string; + result?: Help.HelpResponse | ProjectInfo.ProjectInfoResponse | ContextV2.ContextResponse; +}; + +export type NavieAddMessageAttachmentEvent = { + type: 'add-message-attachment'; + attachmentId: string; + uri?: string; + content?: string; +}; + +export type NavieRemoveMessageAttachmentEvent = { + type: 'remove-message-attachment'; + attachmentId: string; +}; + +export type NavieEvent = + | NavieBeginContextSearchEvent + | NavieCompleteContextSearchEvent + | NavieErrorEvent + | NavieMessageEvent + | NavieMessageCompleteEvent + | NaviePinItemEvent + | NavieUnpinItemEvent + | NaviePromptSuggestionsEvent + | NavieThreadInitEvent + | NavieTokenEvent + | NavieTokenMetadataEvent + | NavieAddMessageAttachmentEvent + | NavieRemoveMessageAttachmentEvent; + +export type TimestampNavieEvent = Timestamp & NavieEvent; + +export type PinnedItem = { + uri?: string; + handle?: string; +}; diff --git a/packages/cli/src/rpc/navie/thread/handlers/messageAttachment.ts b/packages/cli/src/rpc/navie/thread/handlers/messageAttachment.ts new file mode 100644 index 0000000000..4bd851f245 --- /dev/null +++ b/packages/cli/src/rpc/navie/thread/handlers/messageAttachment.ts @@ -0,0 +1,36 @@ +import { NavieRpc } from '@appland/rpc'; +import ThreadService from '../../services/threadService'; +import { RpcError, RpcHandler } from '../../../rpc'; + +export function navieThreadAddMessageAttachmentHandler( + threadService: ThreadService +): RpcHandler< + NavieRpc.V1.Thread.AddMessageAttachment.Params, + NavieRpc.V1.Thread.AddMessageAttachment.Response +> { + return { + name: NavieRpc.V1.Thread.AddMessageAttachment.Method, + async handler({ threadId, uri, content }) { + if (!uri && !content) throw new RpcError(400, 'must specify at least one of uri or content'); + + const thread = await threadService.getThread(threadId); + const attachmentId = thread.addMessageAttachment(uri, content); + return { attachmentId }; + }, + }; +} + +export function navieThreadRemoveMessageAttachmentHandler( + threadService: ThreadService +): RpcHandler< + NavieRpc.V1.Thread.RemoveMessageAttachment.Params, + NavieRpc.V1.Thread.RemoveMessageAttachment.Response +> { + return { + name: NavieRpc.V1.Thread.RemoveMessageAttachment.Method, + async handler({ threadId, attachmentId }) { + const thread = await threadService.getThread(threadId); + thread.removeMessageAttachment(attachmentId); + }, + }; +} diff --git a/packages/cli/src/rpc/navie/thread/handlers/pinItem.ts b/packages/cli/src/rpc/navie/thread/handlers/pinItem.ts new file mode 100644 index 0000000000..951dbc15ff --- /dev/null +++ b/packages/cli/src/rpc/navie/thread/handlers/pinItem.ts @@ -0,0 +1,31 @@ +import { NavieRpc } from '@appland/rpc'; +import { RpcHandler } from '../../../rpc'; +import ThreadService from '../../services/threadService'; + +export function navieThreadPinItemHandler( + threadService: ThreadService +): RpcHandler { + return { + name: NavieRpc.V1.Thread.PinItem.Method, + async handler({ threadId, pinnedItem }) { + const thread = await threadService.getThread(threadId); + if (!thread) return; + + thread.pinItem(pinnedItem); + }, + }; +} + +export function navieThreadUnpinItemHandler( + threadService: ThreadService +): RpcHandler { + return { + name: NavieRpc.V1.Thread.UnpinItem.Method, + async handler({ threadId, pinnedItem }) { + const thread = await threadService.getThread(threadId); + if (!thread) return; + + thread.unpinItem(pinnedItem); + }, + }; +} diff --git a/packages/cli/src/rpc/navie/thread/handlers/query.ts b/packages/cli/src/rpc/navie/thread/handlers/query.ts new file mode 100644 index 0000000000..3b82afac4f --- /dev/null +++ b/packages/cli/src/rpc/navie/thread/handlers/query.ts @@ -0,0 +1,23 @@ +/* eslint-disable @typescript-eslint/no-unsafe-assignment, + @typescript-eslint/no-unsafe-member-access, + @typescript-eslint/no-unsafe-argument */ +import { NavieRpc } from '@appland/rpc'; +import { RpcHandler } from '../../../rpc'; +import { ThreadIndexService } from '../../services/threadIndexService'; + +export function navieThreadQueryHandler( + threadIndexService: ThreadIndexService +): RpcHandler { + return { + name: NavieRpc.V1.Thread.Query.Method, + handler: ({ threadId, maxCreatedAt, orderBy, limit, offset, projectDirectories }) => + threadIndexService.query({ + uuid: threadId, + maxCreatedAt: maxCreatedAt ? new Date(maxCreatedAt) : undefined, + orderBy, + limit, + offset, + projectDirectories, + }), + }; +} diff --git a/packages/cli/src/rpc/navie/thread/handlers/sendMessage.ts b/packages/cli/src/rpc/navie/thread/handlers/sendMessage.ts new file mode 100644 index 0000000000..642782fac5 --- /dev/null +++ b/packages/cli/src/rpc/navie/thread/handlers/sendMessage.ts @@ -0,0 +1,19 @@ +import { NavieRpc } from '@appland/rpc'; +import { RpcHandler } from '../../../rpc'; +import ThreadService from '../../services/threadService'; + +export function navieThreadSendMessageHandler( + threadService: ThreadService +): RpcHandler { + return { + name: NavieRpc.V1.Thread.SendMessage.Method, + async handler({ + threadId, + content, + userContext, + }): Promise { + const thread = await threadService.getThread(threadId); + await thread.sendMessage(content, userContext); + }, + }; +} diff --git a/packages/cli/src/rpc/navie/thread/handlers/subscribe.ts b/packages/cli/src/rpc/navie/thread/handlers/subscribe.ts new file mode 100644 index 0000000000..69a4d608a7 --- /dev/null +++ b/packages/cli/src/rpc/navie/thread/handlers/subscribe.ts @@ -0,0 +1,43 @@ +import { warn } from 'console'; +import type { Thread } from '..'; +import { randomUUID } from 'crypto'; +import ThreadService from '../../services/threadService'; +import { EventStream } from '../middleware'; + +export async function handler( + threadService: ThreadService, + eventStream: EventStream, + threadId: string, + nonce?: number, + replay?: boolean +) { + let thread: Thread; + try { + thread = await threadService.getThread(threadId); + } catch (e) { + warn(`Failed to load thread ${threadId}: ${e}`); + eventStream.send({ type: 'error', error: e, code: 'missing-thread' }); + return; + } + + const events = thread.getEvents(nonce); + if (replay) { + let lastEventTime = events[0]?.time ?? 0; + for (const event of events) { + await new Promise((resolve) => setTimeout(resolve, event.time - lastEventTime)); + eventStream.send(event); + lastEventTime = event.time; + } + } else { + events.forEach((e) => eventStream.send(e)); + } + + const clientId = randomUUID(); + thread.on('event', clientId, (event) => { + eventStream.send(event); + }); + + eventStream.on('close', () => { + thread.removeAllListeners(clientId); + }); +} diff --git a/packages/cli/src/rpc/navie/thread/index.ts b/packages/cli/src/rpc/navie/thread/index.ts new file mode 100644 index 0000000000..f885556dee --- /dev/null +++ b/packages/cli/src/rpc/navie/thread/index.ts @@ -0,0 +1,406 @@ +import { Navie, UserContext } from '@appland/navie'; +import { dirname, join } from 'path'; +import { EventEmitter } from 'stream'; +import { randomUUID } from 'crypto'; +import { ConversationThread } from '@appland/client'; +import { getSuggestions } from '../suggest'; +import { homedir } from 'os'; +import { mkdir, writeFile } from 'fs/promises'; + +import NavieService from '../services/navieService'; +import { + NavieAddMessageAttachmentEvent, + NavieEvent, + NavieMessageEvent, + PinnedItem, + TimestampNavieEvent, +} from './events'; +import { ThreadIndexService } from '../services/threadIndexService'; +import { container } from 'tsyringe'; +import { NavieRpc } from '@appland/rpc'; +import handleReview from '../../explain/review'; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type EventListener = (...args: any[]) => void; + +/** + * Converts the simplified context format back into the format expected by Navie. + * Why two formats? Because the difference between a `code-snippet` and a `code-selection` is too + * ambiguous, and `file` too specific. Ideally I'd like to migrate to use this static/dynamic format + * everywhere, but it works for now. - DB + */ +function convertContext( + context?: NavieRpc.V1.UserContext.ContextItem[] +): UserContext.ContextItem[] { + if (!context) return []; + return context.map((item) => { + if (item.type === 'static') { + return { type: 'code-snippet', content: item.content, location: item.id }; + } + return { type: 'file', location: item.uri }; + }); +} + +function convertMessageAttachmentToContextItem( + attachment: NavieAddMessageAttachmentEvent +): UserContext.ContextItem { + return { + type: 'code-snippet', + content: attachment.content ?? '', + location: attachment.uri, + }; +} + +export class Thread { + private eventEmitter = new EventEmitter(); + private listeners = new Map(); + private log: TimestampNavieEvent[] = []; + private codeBlockId: string | undefined; + private codeBlockLength: number | undefined; + private lastEventWritten = 0; + private lastTokenBeganCodeBlock = false; + private static readonly HISTORY_DIRECTORY = join(homedir(), '.appmap', 'navie', 'history'); + + constructor( + public readonly conversationThread: ConversationThread, + private readonly navieService: NavieService, + eventLog?: TimestampNavieEvent[] + ) { + if (eventLog?.length) { + this.log = eventLog; + this.lastEventWritten = this.log.length; + } + } + + initialize() { + this.logEvent({ type: 'thread-init', conversationThread: this.conversationThread }); + } + + private logEvent(event: NavieEvent) { + const timeStamped = { ...event, time: Date.now() }; + this.log.push(timeStamped); + this.eventEmitter.emit('event', timeStamped); + } + + private async emitSuggestions(messageId: string) { + const suggestions = await getSuggestions( + this.navieService.navieProvider, + this.conversationThread.id + ); + this.logEvent({ type: 'prompt-suggestions', suggestions, messageId }); + } + + static getHistoryFilePath(threadId: string) { + return join(Thread.HISTORY_DIRECTORY, `${threadId}.navie.jsonl`); + } + + on(event: 'event', clientId: string, listener: (event: NavieEvent) => void): this; + + /** + * Bind to the given event. A client identifier is used to associate a client with bound + * listeners. The listening client must call `removeAllListeners` when it disconnects. + * + * @param event the event to listen for + * @param clientId a unique identifier for the client + * @param listener the listener to call when the event is emitted + * @returns this + */ + on(event: string, clientId: string, listener: EventListener): this { + // eslint-disable-next-line @typescript-eslint/no-unsafe-call + this.eventEmitter.on(event, listener); + + // Keep track of listeners for each client + // When the client disconnects, we can remove all listeners for that client + let listeners = this.listeners.get(clientId); + if (!listeners) { + listeners = [listener]; + this.listeners.set(clientId, listeners); + } + listeners.push(listener); + + return this; + } + + /** + * Remove all listeners bound to the given client identifier. + * + * @param clientId the client identifier to remove + */ + removeAllListeners(clientId: string) { + const listeners = this.listeners.get(clientId); + if (!listeners) return; + + for (const listener of listeners) { + this.eventEmitter.removeListener('event', listener); + } + + this.listeners.delete(clientId); + } + + /** + * Flush the event log to disk. This is called automatically when a new message is finalized. + */ + private async flush() { + if (this.log.length === this.lastEventWritten) return; + + const historyFilePath = Thread.getHistoryFilePath(this.conversationThread.id); + const serialized = this.log.slice(this.lastEventWritten ?? 0).map((e) => JSON.stringify(e)); + + try { + await mkdir(dirname(historyFilePath), { recursive: true }); + await writeFile(historyFilePath, serialized.join('\n') + '\n', { flag: 'a' }); + this.lastEventWritten = this.log.length; + } catch (e) { + console.error('failed to write to history file', e); + } + + try { + let lastUserMessage: NavieMessageEvent | undefined; + for (let i = this.log.length - 1; i >= 0; i--) { + const e = this.log[i]; + if (e.type === 'message' && e.role === 'user') { + lastUserMessage = e; + break; + } + } + const title = lastUserMessage?.content.slice(0, 100); + const threadIndexService = container.resolve(ThreadIndexService); + threadIndexService.index(this.conversationThread.id, historyFilePath, title); + } catch (e) { + console.error('failed to update thread index', e); + } + } + + /** + * Pin an item in the thread. This will emit a `pin-item` event. This item won't actually be + * propagated to the backend unless sent by the client via `sendMessage`. This can change in the + * future. + * + * @param item the item to pin + */ + pinItem(item: PinnedItem) { + this.logEvent({ type: 'pin-item', ...item }); + } + + /** + * Unpin an item in the thread. This will emit a `unpin-item` event. + * + * @param item the item to unpin + */ + unpinItem(item: PinnedItem) { + this.logEvent({ type: 'unpin-item', ...item }); + } + + /** + * This function is responsible for pre-processing and token emission. It keeps track of active + * code blocks and strips out comments, instead emitting them as metadata. + * + * @param token The token to be processed and emitted + * @param messageId The message id associated with the token + */ + private onToken(token: string, messageId: string) { + const subTokens = token.split(/^(`{3,})\n?/gm); + for (let subToken of subTokens) { + if (subToken.length === 0) continue; + + const fileMatch = subToken.match(/^\s*?\n?/m); + if (fileMatch) { + this.codeBlockId = this.codeBlockId ?? randomUUID(); + this.logEvent({ + type: 'token-metadata', + codeBlockId: this.codeBlockId, + metadata: { + location: fileMatch[1], + }, + }); + + // Remove the file directive from the token + const index = fileMatch.index ?? 0; + subToken = subToken.slice(0, index) + subToken.slice(index + fileMatch[0].length); + if (subToken.length === 0) continue; + } + + const language = this.lastTokenBeganCodeBlock ? subToken.match(/^[^\s]+\n/) : null; + if (language && this.codeBlockId) { + this.logEvent({ + type: 'token-metadata', + codeBlockId: this.codeBlockId, + metadata: { + language: language[0].trim(), + }, + }); + } + + this.lastTokenBeganCodeBlock = false; + + let clearCodeBlock = false; + if (subToken.match(/^`{3,}/)) { + // Code block fences + if (this.codeBlockLength === undefined) { + this.codeBlockId = this.codeBlockId ?? randomUUID(); + this.codeBlockLength = subToken.length; + this.lastTokenBeganCodeBlock = true; + } else if (subToken.length === this.codeBlockLength) { + clearCodeBlock = true; + } + } + this.logEvent({ + type: 'token', + token: subToken, + messageId, + codeBlockId: this.codeBlockId, + }); + if (clearCodeBlock) { + this.codeBlockId = undefined; + this.codeBlockLength = undefined; + } + } + } + + /** + * Send a user message to the thread. This will emit a `message` event. This promise will resolve + * once the message has been acknowledged by the backend, before the message is completed. Message + * attachments need NOT be included in the `userContext` parameter. + * + * @param message the message to send + * @param codeSelection (optional) additional context to use, i.e., pinned items + */ + async sendMessage( + message: string, + userContext?: NavieRpc.V1.UserContext.ContextItem[] + ): Promise { + const [navie, contextEvents] = this.navieService.getNavie(); + + let context = convertContext(userContext); + context.push(...this.getMessageAttachments().map(convertMessageAttachmentToContextItem)); + + const { applied, userContext: newUserContext } = await handleReview(message, context); + if (applied && newUserContext) { + context = newUserContext; + } + + let responseId: string | undefined; + contextEvents.on('context', (event) => { + event.complete + ? this.logEvent({ type: 'complete-context-search', id: event.id, result: event.response }) + : this.logEvent({ + type: 'begin-context-search', + id: event.id, + request: event.request, + contextType: event.type, + }); + }); + return new Promise((resolve, reject) => { + navie + .on('ack', (userMessageId: string) => { + this.logEvent({ + type: 'message', + role: 'user', + messageId: userMessageId, + content: message, + }); + resolve(); + }) + .on('token', (token: string, messageId: string) => { + if (!responseId) responseId = messageId; + this.onToken(token, messageId); + }) + .on('error', (err: Error) => { + this.logEvent({ type: 'error', error: err }); + reject(err); + }) + .on('complete', () => { + if (!responseId) throw new Error('recieved complete without messageId'); + this.logEvent({ type: 'message-complete', messageId: responseId }); + this.flush() + .then(() => this.emitSuggestions(responseId!)) + .then(() => this.flush()) + .catch(console.error); + }) + .ask(this.conversationThread.id, message, context, undefined) + .catch(reject); + }); + } + + addMessageAttachment(uri?: string, content?: string): string { + const attachmentId = randomUUID(); + this.logEvent({ + type: 'add-message-attachment', + attachmentId, + uri, + content, + }); + return attachmentId; + } + + removeMessageAttachment(attachmentId: string) { + // There's no validation that the attachmentId is valid. + // This will be up to the client to figure out. + this.logEvent({ type: 'remove-message-attachment', attachmentId }); + } + + getMessageAttachments(): NavieAddMessageAttachmentEvent[] { + // Array.lastIndexOf is not supported until es2023. + let lastUserMessageIndex = -1; + for (let i = 0; i < this.log.length; ++i) { + const e = this.log[i]; + if (e.type === 'message' && e.role === 'user') { + lastUserMessageIndex = i; + } + } + + const attachments = new Map(); + for (let i = lastUserMessageIndex + 1; i < this.log.length; ++i) { + const e = this.log[i]; + if (e.type === 'add-message-attachment') { + attachments.set(e.attachmentId, e); + } else if (e.type === 'remove-message-attachment') { + attachments.delete(e.attachmentId); + } + } + + return Array.from(attachments.values()); + } + + /** + * Gets the events since the given nonce. If no nonce is given, it will return all events. + * + * @param sinceNonce The nonce to start from + * @returns The events since the given nonce + */ + getEvents(sinceNonce?: number): readonly TimestampNavieEvent[] { + return this.log.slice(sinceNonce ?? 0); + } + + /** + * Converts the event log into a chat history. Keep in mind that this processes every event in the + * log, so the return value should be cached when possible. + * + * @returns The chat history + */ + getChatHistory(): Navie.ChatHistory { + const chatHistory: Navie.ChatHistory = []; + const streamingMessages = new Map(); + for (const event of this.log) { + if (event.type === 'message') { + chatHistory.push({ role: event.role, content: event.content }); + } + if (event.type === 'token') { + let message = streamingMessages.get(event.messageId); + if (!message) { + message = { + role: 'assistant', + content: '', + }; + streamingMessages.set(event.messageId, message); + chatHistory.push(message); + } + message.content += event.token; + } + if (event.type === 'message-complete') { + streamingMessages.delete(event.messageId); + } + } + return chatHistory; + } +} diff --git a/packages/cli/src/rpc/navie/thread/middleware.ts b/packages/cli/src/rpc/navie/thread/middleware.ts new file mode 100644 index 0000000000..09da9fe95b --- /dev/null +++ b/packages/cli/src/rpc/navie/thread/middleware.ts @@ -0,0 +1,66 @@ +import { NavieRpc } from '@appland/rpc'; +import { IncomingMessage, NextFunction } from 'connect'; +import { ServerResponse } from 'http'; +import { container } from 'tsyringe'; +import ThreadService from '../services/threadService'; +import { handler } from './handlers/subscribe'; + +export class EventStream { + constructor(private readonly res: ServerResponse) { + this.prepareResponse(); + } + + private prepareResponse() { + this.res + .setHeader('Content-Type', 'text/event-stream; charset=utf-8') + .setHeader('Cache-Control', 'no-cache') + .setHeader('Connection', 'keep-alive'); + } + + on(event: 'close', listener: () => void): this; + on(event: string, listener: (...args: unknown[]) => void): this { + this.res.on(event, listener); + return this; + } + + send(event: Record) { + this.res.write(`data: ${JSON.stringify(event)}\n\n`, 'utf-8'); + } + + end() { + this.res.end(); + } +} + +/** + * Accepts a JSON RPC request and begins streaming events through the connection. This middleware + * is non-standard, and is only used for the `subscribe` command. + + * @returns a middleware function + */ +export function sseMiddleware() { + const threadService = container.resolve(ThreadService); + return ( + req: IncomingMessage & { + body?: { method?: string; params?: NavieRpc.V1.Thread.Subscribe.Params }; + }, + res: ServerResponse, + next: NextFunction + ) => { + if (req.body?.method === NavieRpc.V1.Thread.Subscribe.Method) { + const { params } = req.body; + if (!params?.threadId) { + res.writeHead(400); + res.end(); + return; + } + + const eventStream = new EventStream(res); + handler(threadService, eventStream, params.threadId, params?.nonce, params?.replay).catch( + next + ); + } else { + next(); + } + }; +} diff --git a/packages/cli/tests/setup.js b/packages/cli/tests/setup.js new file mode 100644 index 0000000000..0d8d1d05db --- /dev/null +++ b/packages/cli/tests/setup.js @@ -0,0 +1 @@ +require('reflect-metadata'); diff --git a/packages/cli/tests/unit/rpc/explain/LocalNavie.spec.ts b/packages/cli/tests/unit/rpc/explain/LocalNavie.spec.ts index 5449de28b0..3fc827969b 100644 --- a/packages/cli/tests/unit/rpc/explain/LocalNavie.spec.ts +++ b/packages/cli/tests/unit/rpc/explain/LocalNavie.spec.ts @@ -4,11 +4,8 @@ import EventEmitter from 'events'; import { navie as navieFn, Navie } from '@appland/navie'; import LocalNavie from '../../../../src/rpc/explain/navie/navie-local'; -import History from '../../../../src/rpc/explain/navie/history'; -import Thread from '../../../../src/rpc/explain/navie/thread'; jest.mock('@appland/navie'); -jest.mock('../../../../src/rpc/explain/navie/history'); describe('LocalNavie', () => { let navie: LocalNavie; @@ -40,104 +37,4 @@ describe('LocalNavie', () => { ); }); }); - - describe('ask', () => { - let thread: Thread; - let timestamp: number; - let projectDirectories: string[]; - - let mockHistoryLoad: jest.SpyInstance; - let mockHistoryQuestion: jest.SpyInstance; - let mockHistoryToken: jest.SpyInstance; - - beforeEach(() => { - timestamp = Date.now(); - projectDirectories = ['dir-1']; - thread = new Thread('the-thread-id', timestamp, projectDirectories); - - mockHistoryLoad = jest - .spyOn(History.prototype, 'load') - .mockImplementation(async (_threadId: string) => { - return thread; - }); - mockHistoryQuestion = jest.spyOn(History.prototype, 'question'); - mockHistoryToken = jest.spyOn(History.prototype, 'token'); - }); - - describe('when invoked with a threadId', () => { - let navieImpl: Navie.INavie; - let tokens: string[]; - const answer = ["It's", ' 42']; - - beforeEach(() => { - navieImpl = new EventEmitter() as unknown as Navie.INavie; - navieImpl.execute = jest.fn().mockImplementation((): AsyncIterable => { - return (async function* () { - for (const word of answer) { - yield word; - } - })(); - }); - - jest.mocked(navieFn).mockReturnValue(navieImpl); - - tokens = new Array(); - navie.on('token', (token: string) => tokens.push(token)); - }); - - const awaitResponse = (resolve: () => void) => { - navie.on('complete', () => { - expect(tokens.join('')).toEqual("It's 42"); - resolve(); - }); - }; - - it('uses the threadId and processes the question', async () => { - let allocatedThreadId: string; - navie.on('ack', (_messageId: string, threadId: string) => { - expect(threadId).toBe('the-thread-id'); - allocatedThreadId = threadId; - }); - - await new Promise((resolve) => { - awaitResponse(() => { - expect(mockHistoryLoad).toHaveBeenCalledWith(allocatedThreadId); - - resolve(); - }); - navie.ask('the-thread-id', 'What is the meaning of life?'); - }); - }); - - describe('when invoked without a threadId', () => { - it('should assign a thread id if not provided', async () => { - const mockThreadId = 'mock-thread-id'; - navie.setThreadId(mockThreadId); - - const question = 'What is the meaning of life?'; - await navie.ask(undefined, question); - - expect(mockHistoryLoad).toHaveBeenCalledWith(mockThreadId); - }); - - it('allocates a new threadId', async () => { - let allocatedThreadId: string; - - navie.on('ack', (_messageId: string, threadId: string) => { - expect(threadId).toBeDefined(); - allocatedThreadId = threadId; - }); - - await new Promise((resolve) => { - awaitResponse(() => { - expect(mockHistoryLoad).toHaveBeenCalledWith(allocatedThreadId); - - resolve(); - }); - navie.ask(undefined, 'What is the meaning of life?'); - }); - }); - }); - }); - }); }); diff --git a/packages/cli/tests/unit/rpc/explain/history.spec.ts b/packages/cli/tests/unit/rpc/explain/history.spec.ts deleted file mode 100644 index 625261016c..0000000000 --- a/packages/cli/tests/unit/rpc/explain/history.spec.ts +++ /dev/null @@ -1,165 +0,0 @@ -import { mkdtempSync, rmSync } from 'fs'; -import History from '../../../../src/rpc/explain/navie/history'; -import { join } from 'path'; -import { tmpdir } from 'os'; -import { readdir, readFile, readlink, rm } from 'fs/promises'; -import configuration from '../../../../src/rpc/configuration'; -import { exists, verbose } from '../../../../src/utils'; - -describe(History, () => { - const oldFixtureDir = join(__dirname, '..', '..', 'fixtures', 'history', 'oldFormat'); - let tempDir: string; - - const uuid1 = '9af0b3b0-4b3b-4b3b-4b3b-4b3b4b3b4b3b'; - const uuid2 = '9af0b3b0-4b3b-4b3b-4b3b-4b3b4b3b4b3c'; - - beforeEach(() => (tempDir = mkdtempSync(join(tmpdir(), 'history-test-')))); - if (!verbose()) afterEach(() => rmSync(tempDir, { recursive: true })); - - it('migrates old-style history to new-style history', async () => { - const history = new History(tempDir); - await History.migrate(oldFixtureDir, history, { cleanup: false }); - - // List the threads in the new-style history directory - const threads = await readdir(join(tempDir, 'threads')); - expect(threads).toEqual([uuid1, uuid2]); - - // List the symlinks by date in the new history directory - const dates = await readdir(join(tempDir, 'dates')); - const todaysDate = new Date().toISOString().split('T')[0]; - expect(dates).toEqual([todaysDate]); - - const threadIds = await readdir(join(tempDir, 'dates', todaysDate)); - expect(threadIds).toEqual([uuid1, uuid2]); - }); - - it('ignores old-style history with no messages', async () => { - const emptyOldFixtureDir = join(__dirname, '..', '..', 'fixtures', 'history', 'emptyOldFormat'); - const history = new History(tempDir); - await History.migrate(emptyOldFixtureDir, history, { cleanup: false }); - - expect(await exists(join(tempDir, 'threads'))).toBe(false); - expect(await exists(join(tempDir, 'dates'))).toBe(false); - }); - - it('ignores old-style history when messages contain unrecognized roles', async () => { - const oldFixtureDirWithUnrecognizedRoles = join( - __dirname, - '..', - '..', - 'fixtures', - 'history', - 'oldFormatWithUnrecognizedRoles' - ); - const history = new History(tempDir); - await History.migrate(oldFixtureDirWithUnrecognizedRoles, history, { cleanup: false }); - - expect(await exists(join(tempDir, 'threads'))).toBe(false); - expect(await exists(join(tempDir, 'dates'))).toBe(false); - }); - - describe('load', () => { - it('loads a thread from the new-style history', async () => { - const history = new History(tempDir); - - await history.question( - 'thread-1', - 'question-1', - 'question 1', - 'code selection 1', - 'prompt 1' - ); - await history.token('thread-1', 'question-1', 'answer-1', 'answer 1'); - - await history.question( - 'thread-1', - 'question-2', - 'question 2', - 'code selection 2', - 'prompt 2' - ); - await history.token('thread-1', 'question-2', 'answer-2', 'answer 2\n'); - await history.token('thread-1', 'question-2', 'answer-2', 'answer 3'); - - const thread = await history.load('thread-1'); - const timestamps = thread.exchanges.map((e) => e.question.timestamp); - const projectDirectories = configuration().projectDirectories; - expect(thread.projectDirectories).toEqual(projectDirectories); - expect(thread.exchanges).toEqual([ - { - question: { - role: 'user', - timestamp: timestamps[0], - messageId: 'question-1', - content: 'question 1', - prompt: 'prompt 1', - codeSelection: 'code selection 1', - }, - answer: { - role: 'assistant', - messageId: 'answer-1', - content: 'answer 1', - }, - }, - { - question: { - role: 'user', - timestamp: timestamps[1], - messageId: 'question-2', - content: 'question 2', - prompt: 'prompt 2', - codeSelection: 'code selection 2', - }, - answer: { role: 'assistant', messageId: 'answer-2', content: 'answer 2\nanswer 3' }, - }, - ]); - }); - - it('tests sequence of question() and token() calls and verifies filesystem output', async () => { - const history = new History(tempDir); - - await history.question( - 'thread-1', - 'question-1', - 'question 1', - 'code selection 1', - 'prompt 1' - ); - - await history.token('thread-1', 'question-1', 'assistant-1', 'token 1\n'); - await history.token('thread-1', 'question-1', 'assistant-1', 'token 2\n'); - await history.token('thread-1', 'question-1', 'assistant-1', 'token 3'); - - const messageDir = join(tempDir, 'threads', 'thread-1', 'messages', 'question-1'); - const sequenceDir = join(tempDir, 'threads', 'thread-1', 'sequence'); - const dateDir = join(tempDir, 'dates', new Date().toISOString().split('T')[0]); - - // Verify the existence of directories and files - expect(await exists(messageDir)).toBe(true); - expect(await exists(sequenceDir)).toBe(true); - expect(await exists(dateDir)).toBe(true); - - // Verify the content of the files - const questionFile = await readFile(join(messageDir, 'question.txt'), 'utf-8'); - expect(questionFile).toBe('question 1'); - - const tokenFile = await readFile(join(messageDir, 'answer.md'), 'utf-8'); - expect(tokenFile).toBe('token 1\ntoken 2\ntoken 3'); - - // Verify the symlinks - const sequenceFiles = await readdir(sequenceDir); - const sequenceFileNames = sequenceFiles.map((f) => f.split('.')[1]).sort(); - expect(sequenceFileNames).toEqual([ - 'answer', - 'assistantMessageId', - 'codeSelection', - 'prompt', - 'question', - ]); - for (const sequenceFile of sequenceFiles) { - const symlinkTarget = await readlink(join(sequenceDir, sequenceFile)); - expect(symlinkTarget).toContain('question-1'); - } - }); - }); -}); diff --git a/packages/cli/tests/unit/rpc/explain/historyHelper.spec.ts b/packages/cli/tests/unit/rpc/explain/historyHelper.spec.ts deleted file mode 100644 index 517af6a7c0..0000000000 --- a/packages/cli/tests/unit/rpc/explain/historyHelper.spec.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { mkdirSync, existsSync } from 'fs'; -import { join } from 'path'; -import { homedir } from 'os'; -import { initializeHistory } from '../../../../src/rpc/explain/navie/historyHelper'; - -// eslint-disable-next-line @typescript-eslint/no-unsafe-return -jest.mock('fs', () => ({ - ...jest.requireActual('fs'), - mkdirSync: jest.fn(), - existsSync: jest.fn(), -})); - -describe('historyHelper', () => { - beforeEach(() => jest.resetAllMocks()); - - it('creates history directory if it does not exist', () => { - const historyDir = join(homedir(), '.appmap', 'navie', 'history'); - (existsSync as jest.Mock).mockReturnValue(false); - - initializeHistory(); - - expect(mkdirSync).toHaveBeenCalledWith(historyDir, { recursive: true }); - }); - - it('does not create history directory if it already exists', () => { - (existsSync as jest.Mock).mockReturnValue(true); - - initializeHistory(); - - expect(mkdirSync).not.toHaveBeenCalled(); - }); -}); diff --git a/packages/cli/tests/unit/rpc/navie/services/contextService.spec.ts b/packages/cli/tests/unit/rpc/navie/services/contextService.spec.ts new file mode 100644 index 0000000000..efb1f52f46 --- /dev/null +++ b/packages/cli/tests/unit/rpc/navie/services/contextService.spec.ts @@ -0,0 +1,95 @@ +import { container } from 'tsyringe'; +import { ContextService } from '../../../../../src/rpc/navie/services/contextService'; + +jest.mock('../../../../../src/rpc/explain/collect-context.ts', () => { + return { + __esModule: true, + default: jest.fn().mockResolvedValue({ context: [] }), + buildContextRequest: jest.fn().mockReturnValue({}), + }; +}); + +jest.mock('../../../../../src/cmds/navie/projectInfo.ts', () => { + return { + __esModule: true, + default: jest.fn(), + }; +}); + +jest.mock('../../../../../src/cmds/navie/help.ts', () => { + return { + __esModule: true, + default: jest.fn(), + }; +}); + +jest.mock('../../../../../src/rpc/configuration.ts', () => { + return { + __esModule: true, + default: jest.fn().mockReturnValue({ + appmapDirectories: jest.fn().mockResolvedValue([]), + projectDirectories: [], + }), + }; +}); + +jest.mock('../../../../../src/lib/detectCodeEditor.ts', () => { + return { + __esModule: true, + default: jest.fn().mockReturnValue('test-editor'), + }; +}); + +import collectContext, { + buildContextRequest, +} from '../../../../../src/rpc/explain/collect-context'; +import collectProjectInfos from '../../../../../src/cmds/navie/projectInfo'; +import collectHelp from '../../../../../src/cmds/navie/help'; +import { Help } from '@appland/navie'; + +const mockedBuildContextRequest = jest.mocked(buildContextRequest); +const mockedCollectContext = jest.mocked(collectContext); +const mockedCollectProjectInfos = jest.mocked(collectProjectInfos); +const mockedCollectHelp = jest.mocked(collectHelp); + +describe('ContextService', () => { + let contextService: ContextService; + + beforeEach(() => { + container.reset(); + contextService = container.resolve(ContextService); + }); + + describe('searchContext', () => { + it('calls collectContext with the correct parameters', async () => { + const vectorTerms = ['foo', 'bar']; + const tokenCount = 10; + const data = { vectorTerms, tokenCount }; + await contextService.searchContext(data); + expect(mockedBuildContextRequest).toHaveBeenCalledWith( + expect.any(Array), + expect.any(Array), + undefined, + vectorTerms, + tokenCount * 3, + data + ); + expect(mockedCollectContext).toHaveBeenCalled(); + }); + }); + + describe('projectInfoContext', () => { + it('calls collectProjectInfos', async () => { + await contextService.projectInfoContext(); + expect(mockedCollectProjectInfos).toHaveBeenCalledWith('test-editor'); + }); + }); + + describe('helpContext', () => { + it('calls collectHelp', async () => { + const data: Help.HelpRequest = { type: 'help', vectorTerms: ['help'], tokenCount: 10 }; + await contextService.helpContext(data); + expect(mockedCollectHelp).toHaveBeenCalledWith(data); + }); + }); +}); diff --git a/packages/cli/tests/unit/rpc/navie/services/navieService.spec.ts b/packages/cli/tests/unit/rpc/navie/services/navieService.spec.ts new file mode 100644 index 0000000000..b2c6f80d9f --- /dev/null +++ b/packages/cli/tests/unit/rpc/navie/services/navieService.spec.ts @@ -0,0 +1,50 @@ +import { container } from 'tsyringe'; +import { INavieProvider } from '../../../../../src/rpc/explain/navie/inavie'; +import NavieService from '../../../../../src/rpc/navie/services/navieService'; +import EventEmitter from 'events'; + +describe('NavieService', () => { + let provider: INavieProvider; + let navieService: NavieService; + beforeEach(() => { + container.reset(); + provider = () => ({ + providerName: 'test', + setOption: jest.fn(), + ask: jest.fn(), + on: jest.fn(), + }); + NavieService.bindNavieProvider(provider); + navieService = container.resolve(NavieService); + }); + + describe('bindNavieProvider', () => { + it('should bind the provider', () => { + expect(container.resolve(NavieService).navieProvider).toBe(provider); + }); + }); + + describe('getNavie', () => { + it('accepts a custom provider', () => { + const providerName = 'custom-provider'; + const customProvider: INavieProvider = () => ({ + providerName, + setOption: jest.fn(), + ask: jest.fn(), + on: jest.fn(), + }); + const [navie] = navieService.getNavie(customProvider); + expect(navie.providerName).toBe(providerName); + }); + + it('returns a default provider if no custom provider is provided', () => { + const [navie] = navieService.getNavie(); + expect(navie.providerName).toBe('test'); + }); + + it('returns a context emitter', () => { + const [, contextEmitter] = navieService.getNavie(); + expect(contextEmitter).toBeInstanceOf(EventEmitter); + }); + }); +}); diff --git a/packages/cli/tests/unit/rpc/navie/services/threadIndexService.spec.ts b/packages/cli/tests/unit/rpc/navie/services/threadIndexService.spec.ts new file mode 100644 index 0000000000..948ff4883e --- /dev/null +++ b/packages/cli/tests/unit/rpc/navie/services/threadIndexService.spec.ts @@ -0,0 +1,195 @@ +/* eslint-disable @typescript-eslint/no-explicit-any, + @typescript-eslint/no-unsafe-assignment, + @typescript-eslint/no-unsafe-call, + @typescript-eslint/no-unsafe-member-access +*/ + +import { container } from 'tsyringe'; +import { + ThreadIndexItem, + ThreadIndexService, +} from '../../../../../src/rpc/navie/services/threadIndexService'; +import sqlite3 from 'better-sqlite3'; +import configuration from '../../../../../src/rpc/configuration'; + +describe('ThreadIndexService', () => { + let threadIndexService: ThreadIndexService; + let db: sqlite3.Database; + const threadId = '00000000-0000-0000-0000-000000000000'; + + beforeEach(() => { + container.reset(); + db = new sqlite3(':memory:'); + container.registerInstance(ThreadIndexService.DATABASE, db); + threadIndexService = container.resolve(ThreadIndexService); + }); + + describe('indexThread', () => { + it('indexes a thread', () => { + const path = 'example-thread-history.jsonl'; + const title = 'example title'; + threadIndexService.index(threadId, path, title); + + const result = db.prepare('SELECT * FROM threads WHERE id = ?').get(1); + expect(result).toEqual({ + id: 1, + uuid: threadId, + path, + title, + created_at: expect.any(String), + updated_at: expect.any(String), + }); + }); + + it('updates the thread if it already exists', () => { + const path = 'example-thread-history.jsonl'; + threadIndexService.index(threadId, path, 'old title'); + threadIndexService.index(threadId, path, 'new title'); + + expect(db.prepare('SELECT * FROM threads WHERE id = ?').get(1)).toEqual({ + id: 1, + uuid: threadId, + path, + title: 'new title', + created_at: expect.any(String), + updated_at: expect.any(String), + }); + }); + + it('indexes project directories', () => { + const path = 'example-thread-history.jsonl'; + const projectDirectories = ['/home/user/dev/applandinc/appmap-js', '/home/user/dev']; + configuration().projectDirectories = projectDirectories; + threadIndexService.index(threadId, path, 'title'); + const result = db.prepare('SELECT * FROM project_directories').all(); + expect(result).toEqual( + projectDirectories.map((path, i) => ({ + id: i + 1, + path, + thread_id: threadId, + })) + ); + }); + }); + describe('deleteThread', () => { + it('deletes a thread', () => { + const path = 'example-thread-history.jsonl'; + threadIndexService.index(threadId, path, 'title'); + threadIndexService.delete(threadId); + expect(db.prepare('SELECT * FROM threads').all()).toEqual([]); + }); + it('does nothing if the thread does not exist', () => { + expect(() => threadIndexService.delete(threadId)).not.toThrow(); + }); + }); + describe('query', () => { + const daysAgo = (days: number) => { + const d = new Date(); + d.setDate(d.getDate() - days); + return d; + }; + + const threads = [ + { + id: '00000000-0000-0000-0000-000000000001', + path: 'three-days-old.jsonl', + title: 'title', + createdAt: daysAgo(3), + updatedAt: new Date(), + projectDirectories: ['/home/user/dev/applandinc/appmap-golang'], + }, + { + id: '00000000-0000-0000-0000-000000000000', + path: 'one-day-old.jsonl', + title: 'title', + createdAt: daysAgo(1), + updatedAt: new Date(), + projectDirectories: ['/home/user/dev/applandinc/appmap-js'], + }, + ]; + + const toModel = (t: typeof threads[number]): ThreadIndexItem => { + const fixture: Partial = { ...t }; + delete fixture.projectDirectories; + + const thread = fixture as typeof threads[number]; + return { + id: thread.id, + path: thread.path, + title: thread.title, + created_at: t.createdAt.toISOString(), + updated_at: t.updatedAt.toISOString(), + }; + }; + + beforeEach(() => { + const insertThread = db.prepare( + 'INSERT INTO threads (uuid, path, title, created_at, updated_at) VALUES (?, ?, ?, ?, ?)' + ); + const insertProjectDirectory = db.prepare( + 'INSERT INTO project_directories (thread_id, path) VALUES (?, ?)' + ); + db.transaction(() => { + threads.forEach((thread) => { + insertThread.run( + thread.id, + thread.path, + thread.title, + thread.createdAt.toISOString(), + thread.updatedAt.toISOString() + ); + thread.projectDirectories.forEach((projectDirectory) => + insertProjectDirectory.run(thread.id, projectDirectory) + ); + }); + })(); + }); + + it('returns everything if no options are provided', () => { + expect(threadIndexService.query({})).toEqual(threads.map(toModel)); + }); + + it('looks up by uuid', () => { + expect(threadIndexService.query({ uuid: threads[1].id })).toEqual([toModel(threads[1])]); + }); + + it('looks up by max created at', () => { + expect(threadIndexService.query({ maxCreatedAt: daysAgo(2) })).toEqual([toModel(threads[1])]); + }); + + it('can look up by uuid and max created at', () => { + // This is an oddity, but its technically possible given the `QueryOptions` interface. + expect(threadIndexService.query({ uuid: threads[1].id, maxCreatedAt: daysAgo(2) })).toEqual([ + toModel(threads[1]), + ]); + }); + + it('orders by', () => { + const result = threadIndexService.query({ orderBy: 'created_at' }); + expect(result).toEqual([...threads].reverse().map(toModel)); + }); + + it('limits', () => { + const result = threadIndexService.query({ limit: 1 }); + expect(result).toEqual(threads.slice(0, 1).map(toModel)); + }); + + it('offsets', () => { + const result = threadIndexService.query({ limit: 1, offset: 1 }); + expect(result).toEqual(threads.slice(1).map(toModel)); + }); + + it('fails if offset is used without a limit', () => { + expect(() => threadIndexService.query({ offset: 1 })).toThrowError( + 'offset cannot be used without a limit' + ); + }); + + it('validates orderBy option', () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-argument + expect(() => threadIndexService.query({ orderBy: 'invalid' } as any)).toThrowError( + 'invalid orderBy option: invalid' + ); + }); + }); +}); diff --git a/packages/cli/tests/unit/rpc/navie/services/threadService.spec.ts b/packages/cli/tests/unit/rpc/navie/services/threadService.spec.ts new file mode 100644 index 0000000000..b7b0ca7249 --- /dev/null +++ b/packages/cli/tests/unit/rpc/navie/services/threadService.spec.ts @@ -0,0 +1,74 @@ +import { container } from 'tsyringe'; +import NavieService from '../../../../../src/rpc/navie/services/navieService'; +import { ThreadIndexService } from '../../../../../src/rpc/navie/services/threadIndexService'; +import ThreadService from '../../../../../src/rpc/navie/services/threadService'; +import { Thread } from '../../../../../src/rpc/navie/thread'; +import { ConversationThread } from '@appland/client'; +import { rm, writeFile } from 'node:fs/promises'; + +describe('ThreadService', () => { + let threadService: ThreadService; + let navieService: NavieService; + let threadIndexService: ThreadIndexService; + const threadId = 'example-thread-id'; + + beforeEach(() => { + container.reset(); + container; + + threadIndexService = { + delete: jest.fn(), + } as unknown as ThreadIndexService; + + navieService = {} as unknown as NavieService; + + container.registerInstance(ThreadIndexService, threadIndexService); + container.registerInstance(NavieService, navieService); + threadService = container.resolve(ThreadService); + }); + + afterEach(() => rm(Thread.getHistoryFilePath(threadId), { force: true, recursive: true })); + + describe('getThread', () => { + it('retrieves a thread from memory', async () => { + threadService.registerThread({ id: threadId } as unknown as ConversationThread); + await expect(threadService.getThread(threadId)).resolves.toBeInstanceOf(Thread); + }); + + it('loads a thread from disk', async () => { + const threadPath = Thread.getHistoryFilePath(threadId); + await writeFile( + threadPath, + JSON.stringify({ type: 'thread-init', conversationThread: { id: threadId } }) + ); + await expect(threadService.getThread(threadId)).resolves.toBeInstanceOf(Thread); + }); + + it('raises an error if no thread-init event is found', async () => { + const threadPath = Thread.getHistoryFilePath(threadId); + await writeFile(threadPath, JSON.stringify({})); + await expect(threadService.getThread(threadId)).rejects.toThrow( + 'Thread init event not found' + ); + }); + + it('raises an error if the thread is not found', async () => { + const result = threadService.getThread(threadId); + await Promise.all([ + expect(result).rejects.toBeInstanceOf(Error), + expect(result).rejects.toThrow('Failed to load history file'), + expect(result).rejects.toThrow('ENOENT'), + ]); + }); + }); + + describe('registerThread', () => { + it('fails if the thread is already registered', () => { + const conversationThread = { id: threadId } as unknown as ConversationThread; + threadService.registerThread(conversationThread); + expect(() => threadService.registerThread(conversationThread)).toThrow( + `Thread ${conversationThread.id} is already registered` + ); + }); + }); +}); diff --git a/packages/cli/tests/unit/rpc/navie/thread/handlers/messageAttachment.spec.ts b/packages/cli/tests/unit/rpc/navie/thread/handlers/messageAttachment.spec.ts new file mode 100644 index 0000000000..4e2cb68753 --- /dev/null +++ b/packages/cli/tests/unit/rpc/navie/thread/handlers/messageAttachment.spec.ts @@ -0,0 +1,54 @@ +import { NavieRpc } from '@appland/rpc'; +import ThreadService from '../../../../../../src/rpc/navie/services/threadService'; +import { + navieThreadAddMessageAttachmentHandler, + navieThreadRemoveMessageAttachmentHandler, +} from '../../../../../../src/rpc/navie/thread/handlers/messageAttachment'; + +describe('message attachment handlers', () => { + let threadService: ThreadService; + let mockThread: { + addMessageAttachment: jest.Mock; + removeMessageAttachment: jest.Mock; + }; + + beforeEach(() => { + mockThread = { + addMessageAttachment: jest.fn(), + removeMessageAttachment: jest.fn(), + }; + threadService = { + getThread: jest.fn().mockResolvedValue(mockThread), + } as unknown as ThreadService; + }); + + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-member-access + describe(NavieRpc.V1.Thread.AddMessageAttachment.Method, () => { + it('adds a message attachment', () => { + const threadId = 'example-thread-id'; + const uri = 'file://test.md'; + const { handler } = navieThreadAddMessageAttachmentHandler(threadService); + handler({ + threadId, + uri, + }); + expect(threadService.getThread).toHaveBeenCalledWith(threadId); + expect(mockThread.addMessageAttachment).toHaveBeenCalledWith(uri); + }); + }); + + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-member-access + describe(NavieRpc.V1.Thread.RemoveMessageAttachment.Method, () => { + it('removes a message attachment', () => { + const threadId = 'example-thread-id'; + const attachmentId = 'test-attachment-id'; + const { handler } = navieThreadRemoveMessageAttachmentHandler(threadService); + handler({ + threadId, + attachmentId, + }); + expect(threadService.getThread).toHaveBeenCalledWith(threadId); + expect(mockThread.removeMessageAttachment).toHaveBeenCalledWith(attachmentId); + }); + }); +}); diff --git a/packages/cli/tests/unit/rpc/navie/thread/handlers/pinItem.spec.ts b/packages/cli/tests/unit/rpc/navie/thread/handlers/pinItem.spec.ts new file mode 100644 index 0000000000..3fe5915a2e --- /dev/null +++ b/packages/cli/tests/unit/rpc/navie/thread/handlers/pinItem.spec.ts @@ -0,0 +1,54 @@ +import { NavieRpc } from '@appland/rpc'; +import ThreadService from '../../../../../../src/rpc/navie/services/threadService'; +import { + navieThreadPinItemHandler, + navieThreadUnpinItemHandler, +} from '../../../../../../src/rpc/navie/thread/handlers/pinItem'; + +describe('pin item handlers', () => { + let threadService: ThreadService; + let mockThread: { + pinItem: jest.Mock; + unpinItem: jest.Mock; + }; + + beforeEach(() => { + mockThread = { + pinItem: jest.fn(), + unpinItem: jest.fn(), + }; + threadService = { + getThread: jest.fn().mockResolvedValue(mockThread), + } as unknown as ThreadService; + }); + + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-member-access + describe(NavieRpc.V1.Thread.PinItem.Method, () => { + it('pins an item', () => { + const threadId = 'example-thread-id'; + const pinnedItem = { uri: 'file://test' }; + const { handler } = navieThreadPinItemHandler(threadService); + handler({ + threadId, + pinnedItem, + }); + expect(threadService.getThread).toHaveBeenCalledWith(threadId); + expect(mockThread.pinItem).toHaveBeenCalledWith(pinnedItem); + }); + }); + + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-member-access + describe(NavieRpc.V1.Thread.UnpinItem.Method, () => { + it('unpins an item', () => { + const threadId = 'example-thread-id'; + const pinnedItem = { uri: 'file://test' }; + const { handler } = navieThreadUnpinItemHandler(threadService); + handler({ + threadId, + pinnedItem, + }); + expect(threadService.getThread).toHaveBeenCalledWith(threadId); + expect(mockThread.unpinItem).toHaveBeenCalledWith(pinnedItem); + }); + }); +}); diff --git a/packages/cli/tests/unit/rpc/navie/thread/handlers/query.spec.ts b/packages/cli/tests/unit/rpc/navie/thread/handlers/query.spec.ts new file mode 100644 index 0000000000..6a0657879d --- /dev/null +++ b/packages/cli/tests/unit/rpc/navie/thread/handlers/query.spec.ts @@ -0,0 +1,46 @@ +import { NavieRpc } from '@appland/rpc'; +import { ThreadIndexService } from '../../../../../../src/rpc/navie/services/threadIndexService'; +import { navieThreadQueryHandler } from '../../../../../../src/rpc/navie/thread/handlers/query'; + +// eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-member-access +describe(NavieRpc.V1.Thread.Query.Method, () => { + let threadIndexService: { + query: jest.Mock; + }; + + beforeEach(() => { + threadIndexService = { + query: jest.fn().mockResolvedValue([]), + }; + }); + + it('passes the query to the thread index service', () => { + const threadId = 'example-thread-id'; + const maxCreatedAt = new Date().toISOString(); + const orderBy = 'created_at'; + const limit = 10; + const offset = 5; + const projectDirectories = ['/home/user/dev/applandinc/appmap-js']; + const { handler } = navieThreadQueryHandler( + threadIndexService as unknown as ThreadIndexService + ); + + handler({ + threadId, + maxCreatedAt, + orderBy, + limit, + offset, + projectDirectories, + }); + + expect(threadIndexService.query).toHaveBeenCalledWith({ + uuid: threadId, + maxCreatedAt: new Date(maxCreatedAt), + orderBy, + limit, + offset, + projectDirectories, + }); + }); +}); diff --git a/packages/cli/tests/unit/rpc/navie/thread/handlers/sendMessage.spec.ts b/packages/cli/tests/unit/rpc/navie/thread/handlers/sendMessage.spec.ts new file mode 100644 index 0000000000..d441bbc52b --- /dev/null +++ b/packages/cli/tests/unit/rpc/navie/thread/handlers/sendMessage.spec.ts @@ -0,0 +1,37 @@ +import { NavieRpc } from '@appland/rpc'; +import ThreadService from '../../../../../../src/rpc/navie/services/threadService'; +import { navieThreadSendMessageHandler } from '../../../../../../src/rpc/navie/thread/handlers/sendMessage'; + +// eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-member-access +describe(NavieRpc.V1.Thread.SendMessage.Method, () => { + let threadService: ThreadService; + let mockThread: { + sendMessage: jest.Mock; + }; + + beforeEach(() => { + mockThread = { + sendMessage: jest.fn(), + }; + threadService = { + getThread: jest.fn().mockResolvedValue(mockThread), + } as unknown as ThreadService; + }); + + it('propagates parameters as expected', () => { + const threadId = 'example-thread-id'; + const content = 'test content'; + const userContext: NavieRpc.V1.UserContext.ContextItem[] = [ + { type: 'static', content: 'test' }, + { type: 'dynamic', uri: 'file://test' }, + ]; + const { handler } = navieThreadSendMessageHandler(threadService); + handler({ + threadId, + content, + userContext, + }); + expect(threadService.getThread).toHaveBeenCalledWith(threadId); + expect(mockThread.sendMessage).toHaveBeenCalledWith(content, userContext); + }); +}); diff --git a/packages/cli/tests/unit/rpc/navie/thread/handlers/subscribe.spec.ts b/packages/cli/tests/unit/rpc/navie/thread/handlers/subscribe.spec.ts new file mode 100644 index 0000000000..e0c6d0dcb1 --- /dev/null +++ b/packages/cli/tests/unit/rpc/navie/thread/handlers/subscribe.spec.ts @@ -0,0 +1,122 @@ +import { NavieRpc } from '@appland/rpc'; +import { handler } from '../../../../../../src/rpc/navie/thread/handlers/subscribe'; +import ThreadService from '../../../../../../src/rpc/navie/services/threadService'; +import { EventStream } from '../../../../../../src/rpc/navie/thread/middleware'; + +// eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-member-access +describe(NavieRpc.V1.Thread.Subscribe.Method, () => { + let mockEventStream: { + on: jest.Mock; + send: jest.Mock; + end: jest.Mock; + }; + let threadService: { + getThread: jest.Mock; + }; + let mockThread: { + removeAllListeners: jest.Mock; + getEvents: jest.Mock; + on: jest.Mock; + }; + + beforeEach(() => { + mockEventStream = { + on: jest.fn(), + send: jest.fn(), + end: jest.fn(), + }; + mockThread = { + removeAllListeners: jest.fn(), + getEvents: jest.fn().mockReturnValue([]), + on: jest.fn(), + }; + threadService = { + getThread: jest.fn().mockResolvedValue(mockThread), + }; + }); + + it('replays events to the client', async () => { + const events = [ + { type: 'message', role: 'assistant', content: 'test' }, + { type: 'message', role: 'user', content: 'test' }, + { type: 'message', role: 'assistant', content: 'test' }, + ]; + mockThread.getEvents.mockReturnValue(events); + + const threadId = 'example-thread-id'; + await handler( + threadService as unknown as ThreadService, + mockEventStream as unknown as EventStream, + threadId, + undefined, + true + ); + + expect(threadService.getThread).toHaveBeenCalledWith(threadId); + expect(mockThread.on).toHaveBeenCalledWith('event', expect.any(String), expect.any(Function)); + expect(mockThread.getEvents).toHaveBeenCalledWith(undefined); + expect(mockEventStream.send.mock.calls.flat()).toStrictEqual(events); + }); + + it('streams new events to the client', async () => { + await handler( + threadService as unknown as ThreadService, + mockEventStream as unknown as EventStream, + 'example-thread-id', + undefined, + true + ); + + // This is the listener function via: `thread.on('event', clientId, listener)` + const bindListenerCalls = mockThread.on.mock.calls; + expect(bindListenerCalls.length).toBe(1); + expect(bindListenerCalls[0]).toStrictEqual(['event', expect.any(String), expect.any(Function)]); + + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const listener = bindListenerCalls[0][2] as (event: Record) => void; + const event = { type: 'message', role: 'assistant', content: 'test' }; + + expect(mockEventStream.send).not.toHaveBeenCalled(); + listener(event); + expect(mockEventStream.send).toHaveBeenCalledWith(event); + }); + + it('unbinds listeners when the stream is closed', async () => { + await handler( + threadService as unknown as ThreadService, + mockEventStream as unknown as EventStream, + 'example-thread-id', + undefined, + true + ); + + const onCloseCalls = mockEventStream.on.mock.calls; + expect(onCloseCalls.length).toBe(1); + expect(onCloseCalls[0]).toStrictEqual(['close', expect.any(Function)]); + + // This is the listener function via: `eventStream.on('close', listener)` + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const listener = onCloseCalls[0][1] as () => void; + listener(); + + expect(mockThread.removeAllListeners).toHaveBeenCalledWith(expect.any(String)); + }); + + it('emits an error if the thread cannot be loaded', async () => { + threadService.getThread.mockRejectedValue(new Error('test error')); + + await handler( + threadService as unknown as ThreadService, + mockEventStream as unknown as EventStream, + 'example-thread-id', + undefined, + true + ); + + expect(mockEventStream.send).toHaveBeenCalledWith({ + type: 'error', + error: new Error('test error'), + code: 'missing-thread', + }); + }); +}); diff --git a/packages/cli/tests/unit/rpc/navie/thread/index.spec.ts b/packages/cli/tests/unit/rpc/navie/thread/index.spec.ts new file mode 100644 index 0000000000..96cf935be7 --- /dev/null +++ b/packages/cli/tests/unit/rpc/navie/thread/index.spec.ts @@ -0,0 +1,675 @@ +/* eslint-disable @typescript-eslint/no-unsafe-assignment */ +import { ConversationThread } from '@appland/client'; +import { Thread } from '../../../../../src/rpc/navie/thread'; +import NavieService from '../../../../../src/rpc/navie/services/navieService'; +import { container } from 'tsyringe'; +import { getSuggestions } from '../../../../../src/rpc/navie/suggest'; +import { EventEmitter } from 'stream'; +import INavie, { INavieProvider } from '../../../../../src/rpc/explain/navie/inavie'; +import { homedir, tmpdir } from 'os'; +import { join } from 'path'; +import { chmod, mkdtemp, readFile, rm, writeFile } from 'fs/promises'; +import { exists } from '../../../../../src/utils'; +import { + NavieAddMessageAttachmentEvent, + NavieEvent, + NavieMessageEvent, + NavieTokenMetadataEvent, +} from '../../../../../src/rpc/navie/thread/events'; +import { ThreadIndexService } from '../../../../../src/rpc/navie/services/threadIndexService'; +import { randomUUID } from 'crypto'; +import handleReview from '../../../../../src/rpc/explain/review'; +import { NavieRpc } from '@appland/rpc'; + +const exampleSuggestion = { + command: '@test', + prompt: 'this is a test stub', + label: 'stubbed suggestion', + overallScore: 0.5, +}; +jest.mock('../../../../../src/rpc/navie/suggest', () => ({ + getSuggestions: jest.fn().mockImplementation(() => Promise.resolve([exampleSuggestion])), +})); + +jest.mock('../../../../../src/rpc/explain/review', () => ({ + __esModule: true, + default: jest.fn().mockResolvedValue({ applied: false }), +})); + +const mockGetSuggestions = jest.mocked(getSuggestions); +const mockHandleReview = jest.mocked(handleReview); + +// Casting a Thread to a ThreadPrivate allows us to access private members +// in test without exposing them to the public API. +type ThreadPrivate = Omit & { + log: typeof Thread.prototype['log']; + listeners: typeof Thread.prototype['listeners']; + logEvent: typeof Thread.prototype['logEvent']; + lastEventWritten: typeof Thread.prototype['lastEventWritten']; + flush: typeof Thread.prototype['flush']; + onToken: typeof Thread.prototype['onToken']; +}; + +describe('Thread', () => { + let thread: ThreadPrivate; + let conversationThread: ConversationThread; + let mockNavie: INavie; + let mockNavieProvider: INavieProvider; + let navieEventEmitter: EventEmitter; + + beforeEach(() => { + jest.clearAllMocks(); + navieEventEmitter = new EventEmitter(); + mockNavie = { + providerName: 'test', + setOption: jest.fn(), + ask: jest.fn().mockResolvedValue(undefined), + on: jest.fn().mockImplementation((event: string, listener: (...args: unknown[]) => void) => { + navieEventEmitter.on(event, listener); + return mockNavie; + }), + }; + mockNavieProvider = () => mockNavie; + NavieService.bindNavieProvider(mockNavieProvider); + const navieService = container.resolve(NavieService); + + conversationThread = { id: 'example-thread-id' } as unknown as ConversationThread; + thread = new Thread(conversationThread, navieService) as unknown as ThreadPrivate; + }); + + describe('initialize', () => { + it('writes a thread-init event containing the conversation thread', () => { + thread.initialize(); + + expect(thread.conversationThread).toBe(conversationThread); + expect(thread.log).toEqual([ + { + type: 'thread-init', + time: expect.any(Number), + conversationThread, + }, + ]); + }); + }); + + describe('logEvent', () => { + it('appends a timestamp to the event', () => { + thread.logEvent({ type: 'message', role: 'assistant', content: 'test', messageId: '1' }); + expect(thread.log[0].time).toBeLessThanOrEqual(Date.now().valueOf()); + }); + + it('emits an `event` event', () => { + const listener = jest.fn(); + thread.on('event', 'test-client', listener); + thread.logEvent({ type: 'message', role: 'assistant', content: 'test', messageId: '1' }); + expect(listener).toHaveBeenCalledWith({ + type: 'message', + role: 'assistant', + content: 'test', + messageId: '1', + time: expect.any(Number), + }); + }); + }); + + describe('emitSuggestions', () => { + it('emits suggestions upon completing an assistant message', async () => { + thread.flush = jest.fn().mockResolvedValue(undefined); + const result = thread.sendMessage('test message'); + + // Some async calls happen before event listeners are bound + setImmediate(() => navieEventEmitter.emit('ack', 'user-message-id')); + await expect(result).resolves.toBeUndefined(); + + navieEventEmitter.emit('token', 'hello world', 'assistant-message-id'); + navieEventEmitter.emit('complete'); + expect(thread.flush).toHaveBeenCalled(); + + // Flush is async, so we need to wait for it to complete + await new Promise((resolve) => setImmediate(resolve)); + expect(mockGetSuggestions).toHaveBeenCalledWith( + mockNavieProvider, + thread.conversationThread.id + ); + expect(thread.log[thread.log.length - 1]).toEqual({ + type: 'prompt-suggestions', + suggestions: [exampleSuggestion], + time: expect.any(Number), + messageId: 'assistant-message-id', + }); + }); + }); + + describe('getHistoryFilePath', () => { + it('returns the expected path', () => { + expect(Thread.getHistoryFilePath('example-thread-id')).toBe( + join(homedir(), '.appmap', 'navie', 'history', 'example-thread-id.navie.jsonl') + ); + }); + }); + + describe('removeAllListeners', () => { + it('unbinds all listeners for a given client identifier', () => { + const clientId = 'test-client'; + const numListeners = 5; + const listeners = Array.from({ length: numListeners }, () => jest.fn()); + listeners.forEach((listener) => thread.on('event', clientId, listener)); + + thread.removeAllListeners(clientId); + expect(thread.listeners.get(clientId)).toBeUndefined(); + + thread.logEvent({ type: 'message', role: 'assistant', content: 'test', messageId: '1' }); + listeners.forEach((listener) => expect(listener).not.toHaveBeenCalled()); + }); + }); + + describe('flush', () => { + const staticThread = Thread as unknown as { HISTORY_DIRECTORY: string }; + let expectedPath: string; + let originalHistoryPath: string; + let threadIndexService: ThreadIndexService; + let tmpDir: string; + + beforeEach(async () => { + container.reset(); + tmpDir = await mkdtemp(join(tmpdir(), 'appmap-test-')); + originalHistoryPath = staticThread.HISTORY_DIRECTORY; + staticThread.HISTORY_DIRECTORY = tmpDir; + expectedPath = Thread.getHistoryFilePath(thread.conversationThread.id); + threadIndexService = { + index: jest.fn(), + } as unknown as ThreadIndexService; + container.register(ThreadIndexService, { useValue: threadIndexService }); + }); + + afterEach(async () => { + container.reset(); + staticThread.HISTORY_DIRECTORY = originalHistoryPath; + await rm(tmpDir, { recursive: true, force: true }); + }); + + it('does nothing if the event log is empty', async () => { + await thread.flush(); + await expect(exists(expectedPath)).resolves.toBe(false); + }); + + it('does nothing if the event log is unchanged since the last flush', async () => { + thread.logEvent({ type: 'message', role: 'assistant', content: 'test', messageId: '1' }); + thread.lastEventWritten = thread.log.length; + + await thread.flush(); + await expect(exists(expectedPath)).resolves.toBe(false); + }); + + it('writes the event log to disk', async () => { + const messages: NavieEvent[] = Array.from({ length: 5 }, (i) => ({ + type: 'message', + role: 'assistant', + content: 'test', + messageId: String(i), + })); + messages.forEach((m) => thread.logEvent(m)); + + await thread.flush(); + + const events = await readFile(expectedPath, { encoding: 'utf-8' }) + .then((data) => data.split('\n').filter(Boolean)) + .then((lines) => lines.map((line) => JSON.parse(line) as Record)); + expect(events).toStrictEqual(messages.map((m) => ({ ...m, time: expect.any(Number) }))); + }); + + it('creates directories if they do not exist', async () => { + staticThread.HISTORY_DIRECTORY = join(tmpDir, 'does-not-exist', 'this-does-not-either'); + expectedPath = Thread.getHistoryFilePath(thread.conversationThread.id); + + // Write an event to the thread + thread.initialize(); + + await thread.flush(); + await expect(exists(expectedPath)).resolves.toBe(true); + }); + + it('updates the thread index with the last user message as the title', async () => { + const content = 'example title'; + thread.initialize(); + thread.logEvent({ type: 'message', role: 'user', content, messageId: '1' }); + await thread.flush(); + expect(threadIndexService.index).toHaveBeenCalledWith( + thread.conversationThread.id, + expectedPath, + content + ); + }); + + it('truncates the last user message if it is too long', async () => { + const content = 'a'.repeat(1000); + thread.initialize(); + thread.logEvent({ type: 'message', role: 'user', content, messageId: '1' }); + await thread.flush(); + expect(threadIndexService.index).toHaveBeenCalledWith( + thread.conversationThread.id, + expectedPath, + 'a'.repeat(100) + ); + }); + + it('does not raise an error if the thread index fails to update', async () => { + thread.initialize(); + threadIndexService.index = jest.fn().mockImplementation(() => { + throw new Error('test error'); + }); + await expect(thread.flush()).resolves.toBeUndefined(); + }); + + it('does not raise an error if the history file fails to write', async () => { + thread.initialize(); + await writeFile(expectedPath, ''); + await chmod(expectedPath, 0o400); + await expect(thread.flush()).resolves.toBeUndefined(); + await expect(readFile(expectedPath, { encoding: 'utf-8' })).resolves.toBe(''); + }); + + it('appends to an existing history file as expected', async () => { + thread.initialize(); + await thread.flush(); + + thread.logEvent({ type: 'message', role: 'assistant', content: 'test', messageId: '1' }); + await thread.flush(); + + const events = await readFile(expectedPath, { encoding: 'utf-8' }) + .then((data) => data.split('\n').filter(Boolean)) + .then((lines) => lines.map((line) => JSON.parse(line) as Record)); + expect(events).toStrictEqual([ + { + type: 'thread-init', + conversationThread: thread.conversationThread, + time: expect.any(Number), + }, + { + type: 'message', + role: 'assistant', + content: 'test', + messageId: '1', + time: expect.any(Number), + }, + ]); + }); + }); + + describe('pinItem', () => { + it('emits a `pin-item` event', () => { + const listener = jest.fn(); + const handle = randomUUID(); + thread.on('event', 'test-client', listener); + thread.pinItem({ handle }); + expect(listener).toHaveBeenCalledWith({ + type: 'pin-item', + handle, + time: expect.any(Number), + }); + + const uri = 'file://test-uri.md'; + thread.pinItem({ uri }); + expect(listener).toHaveBeenCalledWith({ + type: 'pin-item', + uri, + time: expect.any(Number), + }); + }); + }); + + describe('unpinItem', () => { + it('emits an `unpin-item` event', () => { + const listener = jest.fn(); + const handle = randomUUID(); + thread.on('event', 'test-client', listener); + thread.unpinItem({ handle }); + expect(listener).toHaveBeenCalledWith({ + type: 'unpin-item', + handle, + time: expect.any(Number), + }); + + const uri = 'file://test-uri.md'; + thread.unpinItem({ uri }); + expect(listener).toHaveBeenCalledWith({ + type: 'unpin-item', + uri, + time: expect.any(Number), + }); + }); + }); + + describe('onToken', () => { + it('emits tokens as `token` events', () => { + const listener = jest.fn(); + thread.on('event', 'test-client', listener); + thread.onToken('test token', 'test-message-id'); + expect(listener).toHaveBeenCalledWith({ + type: 'token', + token: 'test token', + messageId: 'test-message-id', + time: expect.any(Number), + }); + }); + + it('strips file comment directives and emits them as token-metadata events', () => { + const listener = jest.fn(); + thread.on('event', 'test-client', listener); + thread.onToken(``, 'test-message-id'); + expect(listener).toHaveBeenCalledWith({ + type: 'token-metadata', + codeBlockId: expect.any(String), + metadata: { + location: 'test-file.md', + }, + time: expect.any(Number), + }); + }); + + it('emits top level code block languages as token-metadata events', () => { + const listener = jest.fn(); + thread.on('event', 'test-client', listener); + thread.onToken(`\`\`\`ruby\n# test\n\`\`\``, 'test-message-id'); + expect(listener).toHaveBeenCalledWith({ + type: 'token-metadata', + codeBlockId: expect.any(String), + metadata: { + language: 'ruby', + }, + time: expect.any(Number), + }); + }); + + it('does not emit nested code block languages as token-metadata events', () => { + const listener = jest.fn(); + thread.on('event', 'test-client', listener); + thread.onToken( + ['````ruby', '```python', '# test', '```', '````'].join('\n'), + 'test-message-id' + ); + expect(listener).toHaveBeenCalledWith({ + type: 'token-metadata', + codeBlockId: expect.any(String), + metadata: { + language: 'ruby', + }, + time: expect.any(Number), + }); + expect(listener).not.toHaveBeenCalledWith({ + type: 'token-metadata', + codeBlockId: expect.any(String), + metadata: { + language: 'python', + }, + time: expect.any(Number), + }); + }); + + it('does not strip code fences from the token', () => { + const listener = jest.fn(); + thread.on('event', 'test-client', listener); + thread.onToken('```ruby\n# test\n```', 'test-message-id'); + expect(listener.mock.calls.flat()).toStrictEqual([ + { + type: 'token', + token: '```', + messageId: 'test-message-id', + time: expect.any(Number), + codeBlockId: expect.any(String), + }, + { + type: 'token-metadata', + codeBlockId: expect.any(String), + metadata: { + language: 'ruby', + }, + time: expect.any(Number), + }, + { + type: 'token', + token: 'ruby\n# test\n', + messageId: 'test-message-id', + time: expect.any(Number), + codeBlockId: expect.any(String), + }, + { + type: 'token', + token: '```', + messageId: 'test-message-id', + time: expect.any(Number), + codeBlockId: expect.any(String), + }, + ]); + }); + + it('strips file comment directives from the token stream', () => { + const listener = jest.fn(); + thread.on('event', 'test-client', listener); + thread.onToken(`\n# test\n`, 'test-message-id'); + expect(listener.mock.calls.flat()).toStrictEqual([ + { + type: 'token-metadata', + codeBlockId: expect.any(String), + metadata: { + location: 'test-file.md', + }, + time: expect.any(Number), + }, + { + type: 'token', + token: '# test\n', + messageId: 'test-message-id', + time: expect.any(Number), + codeBlockId: expect.any(String), + }, + ]); + }); + + it('handles cases where the file directive is inside a code block', () => { + const listener = jest.fn(); + thread.on('event', 'test-client', listener); + thread.onToken( + ['```ruby', '', '# test', '```'].join('\n'), + 'test-message-id' + ); + + const calls = listener.mock.calls.flat(); + // There should only be a single code block + const codeBlockId = (calls as NavieEvent[]) + .filter((e): e is NavieTokenMetadataEvent => e.type === 'token-metadata') + .find((e) => e.codeBlockId)?.codeBlockId; + + expect(codeBlockId).toBeDefined(); + expect(calls).toStrictEqual([ + { + type: 'token', + token: '```', + messageId: 'test-message-id', + time: expect.any(Number), + codeBlockId, + }, + { + type: 'token-metadata', + metadata: { + location: 'app/models/user.rb', + }, + time: expect.any(Number), + codeBlockId, + }, + { + type: 'token-metadata', + metadata: { + language: 'ruby', + }, + time: expect.any(Number), + codeBlockId, + }, + { + type: 'token', + token: 'ruby\n# test\n', + messageId: 'test-message-id', + time: expect.any(Number), + codeBlockId, + }, + { + type: 'token', + token: '```', + messageId: 'test-message-id', + time: expect.any(Number), + codeBlockId, + }, + ]); + }); + }); + + describe('sendMessage', () => { + it('has context mutated by `@review`', async () => { + const message = '@review my code'; + const context: NavieRpc.V1.UserContext.ContextItem[] = [{ type: 'static', content: 'test' }]; + const result = thread.sendMessage(message, context); + + await new Promise((resolve) => setImmediate(resolve)); + setImmediate(() => navieEventEmitter.emit('ack', 'user-message-id')); + + await expect(result).resolves.toBeUndefined(); + expect(mockHandleReview).toHaveBeenCalledWith(message, [ + { content: 'test', type: 'code-snippet', location: undefined }, + ]); + }); + + it('includes message attachments added since the last user message', async () => { + const content = 'test-content'; + const uri = 'test-uri'; + + thread.addMessageAttachment(uri, content); + const result = thread.sendMessage('test message'); + await new Promise((resolve) => setImmediate(resolve)); + setImmediate(() => navieEventEmitter.emit('ack', 'user-message-id')); + + await expect(result).resolves.toBeUndefined(); + expect(mockNavie.ask).toHaveBeenCalledWith( + thread.conversationThread.id, + 'test message', + [{ content, location: uri, type: 'code-snippet' }], + undefined + ); + }); + }); + + describe('addMessageAttachment', () => { + it('emits an `add-message-attachment` event', () => { + const listener = jest.fn(); + thread.on('event', 'test-client', listener); + const attachmentId = thread.addMessageAttachment('test-uri', 'test-content'); + expect(listener).toHaveBeenCalledWith({ + type: 'add-message-attachment', + attachmentId, + uri: 'test-uri', + content: 'test-content', + time: expect.any(Number), + }); + }); + + it('returns the attachment id', () => { + const result = thread.addMessageAttachment('test-uri', 'test-content'); + const [event] = thread.log; + expect(result).toBe((event as NavieAddMessageAttachmentEvent).attachmentId); + }); + }); + + describe('removeMessageAttachment', () => { + it('emits an `remove-message-attachment` event', () => { + const listener = jest.fn(); + const attachmentId = 'might-not-exist'; + thread.on('event', 'test-client', listener); + thread.removeMessageAttachment(attachmentId); + expect(listener).toHaveBeenCalledWith({ + type: 'remove-message-attachment', + attachmentId, + time: expect.any(Number), + }); + }); + }); + + describe('getMessageAttachments', () => { + it('returns an attachment that was added since the last user message', () => { + const content = 'test-content'; + const uri = 'test-uri'; + const attachmentId = thread.addMessageAttachment(uri, content); + expect(thread.getMessageAttachments()).toStrictEqual([ + { attachmentId, uri, content, time: expect.any(Number), type: 'add-message-attachment' }, + ]); + }); + + it('ignores attachments that were added before the last user message', () => { + thread.addMessageAttachment('test-attachment-id', 'test-uri'); + thread.logEvent({ type: 'message', role: 'user', content: 'test', messageId: '1' }); + expect(thread.getMessageAttachments()).toStrictEqual([]); + }); + + it('ignores attachments that were removed', () => { + const attachmentId = thread.addMessageAttachment('test-attachment-id', 'test-uri'); + thread.removeMessageAttachment(attachmentId); + expect(thread.getMessageAttachments()).toStrictEqual([]); + }); + }); + + describe('getEvents', () => { + const events: NavieEvent[] = [ + { type: 'message', role: 'assistant', content: 'test', messageId: '1' }, + { type: 'message', role: 'user', content: 'test', messageId: '2' }, + { type: 'message', role: 'assistant', content: 'test', messageId: '3' }, + { type: 'message', role: 'user', content: 'test', messageId: '4' }, + ]; + + beforeEach(() => { + events.forEach((e) => thread.logEvent(e)); + }); + + it('returns the full event log', () => { + expect(thread.getEvents()).toStrictEqual( + events.map((e) => ({ ...e, time: expect.any(Number) })) + ); + }); + + it('returns the event log since the given nonce', () => { + expect(thread.getEvents(2)).toStrictEqual( + events.slice(2).map((e) => ({ ...e, time: expect.any(Number) })) + ); + }); + }); + + describe('getChatHistory', () => { + it('returns the full chat history', () => { + const events: NavieMessageEvent[] = [ + { type: 'message', role: 'assistant', content: 'test', messageId: '1' }, + { type: 'message', role: 'user', content: 'test', messageId: '2' }, + { type: 'message', role: 'assistant', content: 'test', messageId: '3' }, + { type: 'message', role: 'user', content: 'test', messageId: '4' }, + ]; + events.forEach((e) => thread.logEvent(e)); + expect(thread.getChatHistory()).toStrictEqual( + events.map(({ role, content }) => ({ role, content })) + ); + }); + + it('reconstructs tokenized messages', () => { + const events: NavieEvent[] = [ + { type: 'message', role: 'user', content: 'hello', messageId: '1' }, + { type: 'token', token: 'hello', messageId: '2' }, + { type: 'token', token: ' ', messageId: '2' }, + { type: 'token', token: 'user', messageId: '2' }, + { type: 'token', token: '!', messageId: '2' }, + { type: 'message-complete', messageId: '2' }, + { type: 'message', role: 'user', content: ':)', messageId: '1' }, + ]; + events.forEach((e) => thread.logEvent(e)); + expect(thread.getChatHistory()).toStrictEqual([ + { role: 'user', content: 'hello' }, + { role: 'assistant', content: 'hello user!' }, + { role: 'user', content: ':)' }, + ]); + }); + }); +}); diff --git a/packages/cli/tsconfig.json b/packages/cli/tsconfig.json index d6ef3ce3c7..d03fd34dcf 100644 --- a/packages/cli/tsconfig.json +++ b/packages/cli/tsconfig.json @@ -2,6 +2,8 @@ "compilerOptions": { "allowJs": true, "esModuleInterop": true, + "emitDecoratorMetadata": true, + "experimentalDecorators": true, "lib": ["ES2019", "DOM"], "module": "commonjs", "moduleResolution": "node", diff --git a/packages/components/src/components/chat-search/PinnedItems.vue b/packages/components/src/components/chat-search/PinnedItems.vue index 08a70fdff3..7f71ef6546 100644 --- a/packages/components/src/components/chat-search/PinnedItems.vue +++ b/packages/components/src/components/chat-search/PinnedItems.vue @@ -12,13 +12,14 @@
{{ pin.content }}{{ getPinnedContent(pin.handle).content }}
@@ -33,18 +34,13 @@ import VMarkdownCodeSnippet from '@/components/chat/MarkdownCodeSnippet.vue'; import VMermaidDiagram from '@/components/chat/MermaidDiagram.vue'; import VVSCodeNotice from '@/components/chat-search/VSCodeNotice.vue'; import VIntelliJNotice from '@/components/chat-search/IntelliJNotice.vue'; +import { pinnedItemRegistry } from '@/lib/pinnedItems'; const EditorNoticeComponents = { vscode: VVSCodeNotice, intellij: VIntelliJNotice, }; -const PinnedContextComponents = { - 'code-snippet': VMarkdownCodeSnippet, - mermaid: VMermaidDiagram, - file: VFile, -}; - export default { name: 'v-pinned-items', @@ -77,8 +73,22 @@ export default { getNoticeComponent(): Vue.Component { return EditorNoticeComponents[this.editorType]; }, - getPinnedComponent({ type }: any): Vue.Component | undefined { - return PinnedContextComponents[type]; + getPinnedComponent(handle: number): Vue.Component | undefined { + const pinnedItem = pinnedItemRegistry.get(handle); + + // If it's not registered, it's an external file. + if (!pinnedItem) return VFile; + + const language = pinnedItem.metadata?.language; + if (language === 'mermaid') return VMermaidDiagram; + return VMarkdownCodeSnippet; + }, + getPinnedContent(handle: number): ObservableContent { + return pinnedItemRegistry.get(handle) ?? {}; + }, + getMetadata(pinnedItem: PinnedItem): Record { + const registryItem = pinnedItemRegistry.get(pinnedItem.handle); + return registryItem?.metadata ?? pinnedItem; }, unpin(handle: number) { this.$emit('pin', { handle, pinned: false }); diff --git a/packages/components/src/components/chat-search/StreamingMessageContent.ts b/packages/components/src/components/chat-search/StreamingMessageContent.ts index 3fd37b8cd8..0e392bef97 100644 --- a/packages/components/src/components/chat-search/StreamingMessageContent.ts +++ b/packages/components/src/components/chat-search/StreamingMessageContent.ts @@ -1,19 +1,7 @@ import Vue from 'vue'; -import VMarkdownCodeSnippet from '@/components/chat/MarkdownCodeSnippet.vue'; -import VMermaidDiagram from '@/components/chat/MermaidDiagram.vue'; +import VCodeFencedContent from '@/components/chat/CodeFencedContent.vue'; import VNextPromptButton from '@/components/chat/NextPromptButton.vue'; -function findCursorNode(node: Node): Node | undefined { - const children = Array.from(node.childNodes); - while (children.length) { - const child = children.pop() as Node; - const textNode = findCursorNode(child); - if (textNode) return textNode; - } - - if (node.textContent && node.textContent.trim() !== '') return node; -} - function getAttributeRecord(attrs: NamedNodeMap): Record { return Array.from(attrs).reduce((memo, attr) => { memo[attr.name] = attr.value; @@ -50,18 +38,7 @@ function buildNode(h: Vue.CreateElement, src: Element, $root: Vue): Vue.VNode | // HACK: Attributes are converted to props // This isn't a big deal now, but worth keeping in mind as a potential issue // in the future. - return h( - tag, - { props }, - children.map((c) => { - // HACK: Cursor is a special case given the way we're rendering code snippets. - // We write the source code directly into a slot to avoid the need to escape or - // encode the source text as a prop when rendering markdown. Thus, the cursor is - // rendered into it's own special slot. - const isCursor = typeof c === 'object' && c.data && c.data.class === 'cursor'; - return isCursor ? h('span', { slot: 'cursor', class: 'cursor' }) : c; - }) - ); + return h(tag, { props }, children); } return h( tag, @@ -85,6 +62,9 @@ function buildNode(h: Vue.CreateElement, src: Element, $root: Vue): Vue.VNode | ); } +/** + * This component is responsible for dynamically rendering HTML content containing Vue components. + */ export default Vue.extend({ name: 'v-streaming-message-content', props: { @@ -92,8 +72,7 @@ export default Vue.extend({ active: Boolean, }, components: { - VMarkdownCodeSnippet, - VMermaidDiagram, + VCodeFencedContent, VNextPromptButton, }, data() { @@ -105,14 +84,6 @@ export default Vue.extend({ render(h): Vue.VNode { const dom = this.parser.parseFromString(this.content.trim(), 'text/html'); - if (this.active) { - const textNode = findCursorNode(dom.body); - const cursor = dom.createElement('span'); - cursor.classList.add('cursor'); - const targetElement = textNode?.parentElement ?? dom.body; - targetElement.appendChild(cursor); - } - const children = []; for (let i = 0; i < dom.body.childNodes.length; i++) { const vnode = buildNode(h, dom.body.childNodes[i] as Element, this.$root); diff --git a/packages/components/src/components/chat/Chat.vue b/packages/components/src/components/chat/Chat.vue index 261226044d..40e3593664 100644 --- a/packages/components/src/components/chat/Chat.vue +++ b/packages/components/src/components/chat/Chat.vue @@ -29,7 +29,7 @@ @@ -94,13 +95,14 @@ export type CodeSelection = { }; export interface ITool { + id?: string; title: string; status?: string; complete?: boolean; } interface IMessage { - message: string; + tokens: (string | CodeBlockReference)[]; isUser: boolean; isError: boolean; complete?: boolean; @@ -111,6 +113,7 @@ interface IMessage { } class UserMessage implements IMessage { + public tokens: string[] = []; public readonly messageId = undefined; public readonly sentiment = undefined; public readonly isUser = true; @@ -119,22 +122,51 @@ class UserMessage implements IMessage { public readonly complete = true; public readonly codeSelections = []; - constructor(public content: string) {} + constructor(content: string) { + this.tokens.push(content); + } +} + +interface CodeBlockReference { + type: 'code-block'; + id: string; } +interface HiddenToken { + type: 'hidden'; + content: string; +} + +type Token = string | CodeBlockReference | HiddenToken; + class AssistantMessage implements IMessage { - public content = ''; + public tokens: Token[] = []; public sentiment = undefined; public complete = false; public readonly isUser = false; public readonly isError = false; public readonly tools = []; public readonly codeSelections = undefined; + public readonly promptSuggestions: undefined | NavieRpc.V1.Suggest.NextStep[] = undefined; + private readonly codeBlocks: CodeBlockReference[] = []; constructor(public messageId?: string) {} - append(token: string) { - Vue.set(this, 'content', [this.content, token].join('')); + append(token: Token) { + if (typeof token === 'object' && 'type' in token) { + if (token.type === 'code-block') { + if (this.codeBlocks.some((b) => b.id === token.id)) { + return; + } + this.codeBlocks.push(token); + } + } + + this.tokens.push(token); + } + + setPromptSuggestions(suggestions: NavieRpc.V1.Suggest.NextStep[]) { + Vue.set(this, 'promptSuggestions', suggestions); } } @@ -196,11 +228,13 @@ export default { email: { type: String, }, + threadId: { + type: String, + }, }, data() { return { messages: [] as IMessage[], - threadId: undefined as string | undefined, authorized: true, autoScrollTop: 0, enableScrollLog: false, // Auto-scroll can be tricky, so there is special logging to help debug it. @@ -226,44 +260,18 @@ export default { }, }, methods: { - restoreThread(threadId: string, thread: ExplainRpc.Thread) { - // In hindsight, the thread should have an id property. - this.threadId = threadId; - let populatedCodeSelection = false; - for (const exchange of thread.exchanges) { - if (exchange.question) { - // TODO: User message provides prompt, but the UI does not have a place for it. - const { content, codeSelection } = exchange.question; - const userMessage = this.addUserMessage(content); - if (codeSelection && !populatedCodeSelection) { - populatedCodeSelection = true; - // TODO: There's some mismatch here between what the UI shows & what's in the thread data. - userMessage.codeSelections = [codeSelection]; - } - } - if (exchange.answer) { - const { content } = exchange.answer; - const systemMessage = this.addSystemMessage(); - systemMessage.content = content; - systemMessage.complete = true; - } - } - }, getMessage(query: Partial): IMessage | undefined { return this.messages.find((m) => { return Object.keys(query).every((key) => m[key] === query[key]); }); }, // Creates-or-appends a message. - addToken(token: string, threadId: string, messageId: string) { - if (threadId !== this.threadId) return; - + addToken(token: string, messageId: string) { if (!messageId) console.warn('messageId is undefined'); - if (!threadId) console.warn('threadId is undefined'); let assistantMessage = this.getMessage({ messageId }); if (!assistantMessage) { - assistantMessage = new AssistantMessage(messageId); + assistantMessage = Vue.observable(new AssistantMessage(messageId)); this.messages.push(assistantMessage); } @@ -286,7 +294,7 @@ export default { return userMessage; }, addSystemMessage() { - const message = new AssistantMessage(); + const message = Vue.observable(new AssistantMessage()); this.messages.push(message); return message; }, @@ -307,26 +315,14 @@ export default { } }, async onSend(message: string) { - const userMessage = this.addUserMessage(message); - userMessage.codeSelections = this.codeSelections; - - this.sendMessage( - message, - this.codeSelections.map((s) => s.code), - this.appmaps - ); - - this.codeSelections = []; + this.sendMessage(message, [], this.appmaps); + this.$set(this, 'codeSelections', []); }, onStop() { this.$emit('stop'); }, onAck(_messageId: string, threadId: string) { this.setAuthorized(true); - if (threadId !== this.threadId) { - this.threadId = threadId; - this.$root.$emit('thread-id', threadId); - } }, scrollToBottom() { // Allow one tick to progress to allow any DOM changes to be applied @@ -382,6 +378,13 @@ export default { includeCodeSelection(codeSelection: CodeSelection) { this.codeSelections.push(codeSelection); }, + removeCodeSelection(attachmentId: string) { + this.$set( + this, + 'codeSelections', + this.codeSelections.filter((c) => c.attachmentId !== attachmentId) + ); + }, includeAppMap(appmap: string) { this.appmaps.push(appmap); }, @@ -392,6 +395,9 @@ export default { this.$refs.input.setInput(input); this.$refs.input.moveCursorToEnd(); }, + clear() { + this.$set(this, 'messages', []); + }, }, watch: { isChatting() { diff --git a/packages/components/src/components/chat/ChatInput.vue b/packages/components/src/components/chat/ChatInput.vue index 3cbc5039f3..75eea4f57e 100644 --- a/packages/components/src/components/chat/ChatInput.vue +++ b/packages/components/src/components/chat/ChatInput.vue @@ -23,13 +23,9 @@ data-cy="input-attachments" > @@ -282,7 +278,7 @@ $border-color: rgba(white, 0.333); display: flex; flex-direction: column; gap: 1rem; - padding: 1.5rem; + padding: 1rem 1.5rem 1.5rem 1.5rem; border-top: 1px solid $color-background-dark; box-shadow: 0 0 1rem 0rem $color-tile-shadow; border-radius: $border-radius $border-radius 0 0; diff --git a/packages/components/src/components/chat/CodeFencedContent.vue b/packages/components/src/components/chat/CodeFencedContent.vue new file mode 100644 index 0000000000..ddd1689ce9 --- /dev/null +++ b/packages/components/src/components/chat/CodeFencedContent.vue @@ -0,0 +1,37 @@ + + + diff --git a/packages/components/src/components/chat/CodeSelection.vue b/packages/components/src/components/chat/CodeSelection.vue index 5dc8424698..9d8570bd95 100644 --- a/packages/components/src/components/chat/CodeSelection.vue +++ b/packages/components/src/components/chat/CodeSelection.vue @@ -2,59 +2,38 @@
   
 
 
 
diff --git a/packages/components/src/components/chat/MermaidDiagram.vue b/packages/components/src/components/chat/MermaidDiagram.vue
index 231167bbe8..1a970f4dfa 100644
--- a/packages/components/src/components/chat/MermaidDiagram.vue
+++ b/packages/components/src/components/chat/MermaidDiagram.vue
@@ -3,6 +3,7 @@
     :title="title"
     :menu-items="menuItems"
     :handle="handle"
+    :is-reference="isReference"
     content-type="image"
     @expand="showModal"
     @pin="onPin"
@@ -37,7 +38,8 @@ import pako from 'pako';
 import { fromUint8Array } from 'js-base64';
 
 import type ContextContainerMenuItem from './ContextContainerMenuItem';
-import type { PinEvent, PinMermaid } from './PinEvent';
+import type { PinEvent } from './PinEvent';
+import stripCodeFences from '@/lib/stripCodeFences';
 
 mermaid.initialize({
   startOnLoad: false,
@@ -76,10 +78,11 @@ export default Vue.extend({
     },
   },
   data() {
+    let definition = stripCodeFences(this.$slots.default?.[0].text ?? '');
     return {
+      definition,
       id: `mermaid-${diagramId++}`,
       svg: undefined as string | undefined,
-      definition: this.$slots.default?.[0].text ?? '',
       modalVisible: false,
     };
   },
@@ -161,30 +164,19 @@ export default Vue.extend({
       navigator.clipboard.writeText(this.definition);
     },
     onPin({ pinned, handle }: PinEvent) {
-      const eventData: PinEvent & Partial = { pinned, handle };
-      if (pinned) {
-        eventData.type = 'mermaid';
-        eventData.content = this.definition;
-      }
-      this.$root.$emit('pin', eventData);
+      this.$root.$emit('pin', { pinned, handle });
     },
   },
   updated() {
     // Slots are not reactive unless written directly to the DOM.
     // Luckily for us, this method is called when the content within the slot changes.
-    this.definition = this.$slots.default?.[0].text ?? '';
+    this.definition = stripCodeFences(this.$slots.default?.[0].text ?? '');
   },
 });
 
 
 
+