import { TFile } from 'obsidian';
import { IndexingQueueItem, IndexingProgress, EmbeddingProviderInterface } from '../types';
import { ExtractorManager } from '../extractors';
import { HybridChunker } from '../chunking/chunker';
import { CollectionManager as QdrantCollectionManager } from '../qdrant/collection';
import { generateDeterministicUUID } from '../utils/hash';

export class IndexingQueue {
    private queue: IndexingQueueItem[] = [];
    private isProcessing = false;
    private progress: IndexingProgress;
    private extractorManager: ExtractorManager;
    private chunker: HybridChunker;
    private embeddingProvider: EmbeddingProviderInterface;
    private collectionManager: QdrantCollectionManager;
    private onProgressUpdate?: (progress: IndexingProgress) => void;
    private onError?: (error: string) => void;
    private maxConcurrency = 3; // maximum number of files processed in parallel
    private batchSize = 10; // number of items taken off the queue per batch

    constructor(
        extractorManager: ExtractorManager,
        chunker: HybridChunker,
        embeddingProvider: EmbeddingProviderInterface,
        collectionManager: QdrantCollectionManager
    ) {
        this.extractorManager = extractorManager;
        this.chunker = chunker;
        this.embeddingProvider = embeddingProvider;
        this.collectionManager = collectionManager;
        this.progress = {
            totalFiles: 0,
            processedFiles: 0,
            totalChunks: 0,
            processedChunks: 0,
            errors: [],
            isRunning: false
        };
    }

    /**
     * Add files to the indexing queue
     */
    addFiles(files: TFile[], action: 'create' | 'update' | 'delete' = 'update'): void {
        for (const file of files) {
            this.addFile(file, action);
        }
    }

    /**
     * Add a single file to the indexing queue
     */
    addFile(file: TFile, action: 'create' | 'update' | 'delete' = 'update'): void {
        // Higher value = processed sooner: create (2) > update (1) > delete (0)
        const priority = action === 'delete' ? 0 : action === 'create' ? 2 : 1;
        const item: IndexingQueueItem = { file, action, priority };

        // Replace any pending entry for the same file so only the latest action runs
        this.queue = this.queue.filter(existing => existing.file.path !== file.path);
        this.queue.push(item);

        // Sort by priority, highest first (Array.prototype.sort is stable, so
        // items with equal priority keep their insertion order)
        this.queue.sort((a, b) => b.priority - a.priority);
    }

    /**
     * Start processing the queue
     */
    async startProcessing(): Promise<void> {
        if (this.isProcessing) {
            return;
        }

        this.isProcessing = true;
        this.progress.isRunning = true;
        this.progress.totalFiles = this.queue.length;
        this.progress.processedFiles = 0;
        // Reset chunk counters too, so they describe only the current run
        this.progress.totalChunks = 0;
        this.progress.processedChunks = 0;
        this.progress.errors = [];
        this.updateProgress();

        try {
            await this.processQueue();
        } finally {
            this.isProcessing = false;
            this.progress.isRunning = false;
            this.updateProgress();
        }
    }

    /**
     * Stop processing the queue. The batch already in flight is allowed to
     * finish; items still in the queue are kept for the next run.
     */
    stopProcessing(): void {
        this.isProcessing = false;
        this.progress.isRunning = false;
        this.updateProgress();
    }

    /**
     * Clear the queue
     */
    clearQueue(): void {
        this.queue = [];
        this.progress.totalFiles = 0;
        this.progress.processedFiles = 0;
        this.updateProgress();
    }

    /**
     * Get current progress (a shallow copy, so callers cannot mutate internal state)
     */
    getProgress(): IndexingProgress {
        return { ...this.progress };
    }

    /**
     * Set progress update callback
     */
    setProgressCallback(callback: (progress: IndexingProgress) => void): void {
        this.onProgressUpdate = callback;
    }

    /**
     * Set error callback
     */
    setErrorCallback(callback: (error: string) => void): void {
        this.onError = callback;
    }

    /**
     * Drain the queue one batch at a time; stopProcessing() takes effect
     * between batches.
     */
    private async processQueue(): Promise<void> {
        while (this.queue.length > 0 && this.isProcessing) {
            const batch = this.queue.splice(0, this.batchSize);
            await this.processBatch(batch);
        }
    }

    /**
     * Process a batch in groups of at most `maxConcurrency` items, so no more
     * than that many files are extracted and embedded at the same time.
     */
    private async processBatch(batch: IndexingQueueItem[]): Promise<void> {
        for (let i = 0; i < batch.length; i += this.maxConcurrency) {
            const group = batch.slice(i, i + this.maxConcurrency);
            await Promise.allSettled(group.map(item => this.processItem(item)));
        }
    }

    /**
     * Index or delete a single file, recording (rather than rethrowing) any error
     */
    private async processItem(item: IndexingQueueItem): Promise<void> {
        try {
            this.progress.currentFile = item.file.path;
            this.updateProgress();

            if (item.action === 'delete') {
                await this.deleteFile(item.file);
            } else {
                await this.indexFile(item.file);
            }

            this.progress.processedFiles++;
            this.updateProgress();
        } catch (error) {
            const errorMessage = `Failed to process ${item.file.path}: ${error}`;
            this.progress.errors.push(errorMessage);
            this.onError?.(errorMessage);
            console.error(errorMessage, error);
            this.updateProgress();
        }
    }

    /**
     * Extract, chunk, embed and upsert one file into Qdrant
     */
    private async indexFile(file: TFile): Promise<void> {
        try {
            // Check if file can be handled
            if (!this.extractorManager.canHandle(file)) {
                console.log(`Skipping file ${file.path} - no suitable extractor`);
                return;
            }

            // Extract content
            const extractedContent = await this.extractorManager.extract(file);
            if (!extractedContent.text.trim()) {
                console.log(`Skipping file ${file.path} - no text content`);
                return;
            }

            // Chunk content
            const chunks = await this.chunker.chunk(extractedContent);
            if (chunks.length === 0) {
                console.log(`Skipping file ${file.path} - no chunks created`);
                return;
            }

            this.progress.totalChunks += chunks.length;

            // Extract chunk texts and generate one embedding per chunk
            const texts = chunks.map(chunk =>
                extractedContent.text.substring(chunk.chunk_start, chunk.chunk_end)
            );
            const embeddings = await this.embeddingProvider.embed(texts);
            if (embeddings.length !== texts.length) {
                throw new Error(
                    `Embedding provider returned ${embeddings.length} vectors for ${texts.length} chunks`
                );
            }

            // Get model name from the embedding provider
            const modelName = this.embeddingProvider.getName();

            // Prepare points for Qdrant
            const points = chunks.map((chunk, index) => ({
                id: this.generatePointId(file, chunk.chunk_index),
                vector: embeddings[index],
                metadata: {
                    ...chunk,
                    model: modelName, // record which model produced this vector
                    chunk_text: texts[index] // include the actual text content
                }
            }));

            // Index in Qdrant
            await this.collectionManager.indexChunks(points);

            this.progress.processedChunks += chunks.length;
            this.updateProgress();
        } catch (error) {
            throw new Error(`Failed to index file ${file.path}: ${error}`);
        }
    }

    /**
     * Remove every chunk belonging to a file from Qdrant
     */
    private async deleteFile(file: TFile): Promise<void> {
        try {
            await this.collectionManager.deleteFileChunks(file.path);
        } catch (error) {
            throw new Error(`Failed to delete file ${file.path}: ${error}`);
        }
    }

    private generatePointId(file: TFile, chunkIndex: number): string {
        // Generate a deterministic UUID based on file path and chunk index.
        // This ensures the same file+chunk always gets the same ID, so
        // re-indexing a file overwrites its existing points.
        const idString = `${file.path}:${chunkIndex}`;
        return generateDeterministicUUID(idString);
    }

    private updateProgress(): void {
        this.onProgressUpdate?.(this.getProgress());
    }

    /**
     * Get queue statistics
     */
    getQueueStats(): {
        queueLength: number;
        isProcessing: boolean;
        estimatedTimeRemaining: number; // milliseconds
    } {
        const averageTimePerFile = 2000; // 2 seconds per file (rough estimate)
        const estimatedTimeRemaining = this.queue.length * averageTimePerFile;

        return {
            queueLength: this.queue.length,
            isProcessing: this.isProcessing,
            estimatedTimeRemaining
        };
    }

    /**
     * Get files in queue by action type
     */
    getFilesByAction(action: 'create' | 'update' | 'delete'): TFile[] {
        return this.queue
            .filter(item => item.action === action)
            .map(item => item.file);
    }

    /**
     * Remove files from queue
     */
    removeFiles(filePaths: string[]): void {
        this.queue = this.queue.filter(item => !filePaths.includes(item.file.path));
        this.progress.totalFiles = this.queue.length;
        this.updateProgress();
    }
}
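
/*
 * Example usage (a sketch only, not part of this module's API). It assumes the
 * caller already holds an Obsidian `app` object and constructed
 * ExtractorManager, HybridChunker, embedding provider and Qdrant
 * CollectionManager instances; the variable names below are hypothetical.
 *
 *   const queue = new IndexingQueue(extractorManager, chunker, embeddingProvider, collectionManager);
 *   queue.setProgressCallback(p => console.log(`${p.processedFiles}/${p.totalFiles} files`));
 *   queue.setErrorCallback(msg => console.warn(msg));
 *
 *   // Queue every Markdown note for indexing, then drain the queue.
 *   // Re-queuing a path replaces its pending entry, so only the latest action runs.
 *   queue.addFiles(app.vault.getMarkdownFiles(), 'create');
 *   await queue.startProcessing();
 */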