From b9e8dc18702f76eedcc950dfc11271b36a731e89 Mon Sep 17 00:00:00 2001 From: Mohamed Salah Date: Thu, 30 Oct 2025 14:51:02 +0300 Subject: [PATCH] Refactor DevOps AI Assistant: Enhance knowledge base parsing and indexing, improve error handling, and update user interface elements --- .../salah/devops-ai-assistance/app.py | 63 +-- .../devops_ai_assistance.py | 482 ++++++++++++++++-- 2 files changed, 470 insertions(+), 75 deletions(-) diff --git a/week5/community-contributions/salah/devops-ai-assistance/app.py b/week5/community-contributions/salah/devops-ai-assistance/app.py index cf19d2f..9a3588b 100644 --- a/week5/community-contributions/salah/devops-ai-assistance/app.py +++ b/week5/community-contributions/salah/devops-ai-assistance/app.py @@ -1,6 +1,6 @@ import os import gradio as gr -from devops_ai_assistance import create_assistant, DevOpsAIAssistant +from devops_ai_assistance import create_assistant assistant = None @@ -8,7 +8,6 @@ status_info = None def initialize_assistant(kb_path: str): - """Initialize the assistant with knowledge base""" global assistant, status_info try: @@ -16,42 +15,41 @@ def initialize_assistant(kb_path: str): if not kb_path: return "Error: Please provide a valid knowledge base path" - print(f"\nšŸš€ Initializing with knowledge base: {kb_path}") + print(f"\nInitializing with knowledge base: {kb_path}") assistant = create_assistant(kb_path) status_info = assistant.get_status() status_message = f""" -āœ… **DevOps AI Assistant Initialized Successfully!** +**DevOps AI Assistant Initialized Successfully** -šŸ“Š **Knowledge Base Statistics:** +**Knowledge Base Statistics:** - Documents Loaded: {status_info['documents_loaded']} - Chunks Created: {status_info['chunks_created']} - Vectors in Store: {status_info['vectors_in_store']} - Knowledge Base Path: {status_info['knowledge_base_path']} -šŸŽÆ **Ready to Answer Questions About:** +**Ready to Answer Questions About:** - Kubernetes infrastructure configuration - ArgoCD deployment manifests - Helm charts and values -- Infrastructure as Code (IaC) -- DevOps best practices in your environment +- Infrastructure as Code +- DevOps best practices -Start by asking questions about your k8s cluster infrastructure! +Start by asking questions about your infrastructure! """ return status_message except Exception as e: error_msg = f"Error initializing assistant: {str(e)}" - print(f"āŒ {error_msg}") - return f"āŒ {error_msg}" + print(f"Error: {error_msg}") + return f"Error: {error_msg}" def chat_with_assistant(message: str, history): - """Chat function for the assistant""" global assistant if not assistant: - bot_response = "āŒ Assistant not initialized. Please provide a knowledge base path first." + bot_response = "Assistant not initialized. Please provide a knowledge base path first." history.append((message, bot_response)) return history, "" @@ -66,11 +64,11 @@ def chat_with_assistant(message: str, history): sources_text = "" if result.get('sources'): - sources_text = "\n\nšŸ“š **Sources:**\n" + sources_text = "\n\n**Sources:**\n" for i, source in enumerate(result['sources'], 1): source_file = source.get('source', 'Unknown') file_type = source.get('file_type', 'Unknown') - sources_text += f"\n{i}. **{source_file}** ({file_type})" + sources_text += f"\n{i}. 
{source_file} ({file_type})" bot_response = answer + sources_text if sources_text else answer @@ -82,16 +80,13 @@ def chat_with_assistant(message: str, history): def create_interface(): - """Create the Gradio interface""" - global assistant - with gr.Blocks(title="DevOps AI Assistant") as interface: - gr.Markdown("# šŸ¤– DevOps AI Assistant") - gr.Markdown("Intelligent Q&A system for your Kubernetes infrastructure powered by RAG and LLM") + gr.Markdown("# DevOps AI Assistant") + gr.Markdown("Intelligent Q&A system for your infrastructure powered by RAG and LLM") - gr.Markdown("## šŸ”§ Configuration") - gr.Markdown("Enter the path to your GitOps repository (knowledge base) to initialize the assistant") + gr.Markdown("## Configuration") + gr.Markdown("Enter the path to your GitOps repository to initialize the assistant") with gr.Row(): kb_path_input = gr.Textbox( @@ -100,39 +95,38 @@ def create_interface(): lines=1, value="/workspace/aau/repositories/infra-gitops/" ) - init_button = gr.Button("šŸš€ Initialize Assistant") + init_button = gr.Button("Initialize Assistant") - status_output = gr.Markdown(value="ā³ Waiting for initialization...") + status_output = gr.Markdown(value="Waiting for initialization...") - gr.Markdown("## šŸ’¬ Chat Interface") + gr.Markdown("## Chat Interface") chatbot = gr.Chatbot( label="Conversation", height=500, show_copy_button=True, - avatar_images=("šŸ‘¤", "šŸ¤–"), bubble_full_width=False ) with gr.Row(): msg_input = gr.Textbox( label="Your Question", - placeholder="Ask about your k8s infrastructure, ArgoCD, Helm charts, etc...", + placeholder="Ask about your infrastructure, ArgoCD, Helm charts, etc...", lines=2, scale=5 ) - send_button = gr.Button("Send šŸ’¬", scale=1) + send_button = gr.Button("Send", scale=1) with gr.Row(): - clear_button = gr.Button("šŸ—‘ļø Clear Chat", scale=2) + clear_button = gr.Button("Clear Chat", scale=2) - with gr.Accordion("šŸ“‹ Example Questions", open=False): + with gr.Accordion("Example Questions", open=False): gr.Markdown(""" **Infrastructure & Deployment:** -- How is the Kubernetes cluster configured? -- What ArgoCD applications are deployed? +- How many ArgoCD applications? +- What is the repository structure? +- How many YAML files are there? - Show me the Helm chart values for nginx -- What storage solutions are available? **Monitoring & Observability:** - How is Prometheus configured? 
@@ -174,9 +168,8 @@ def create_interface(): def main(): - """Main entry point""" print("\n" + "=" * 60) - print("šŸš€ DevOps AI Assistant - RAG System") + print("DevOps AI Assistant - RAG System") print("=" * 60) print("Starting Gradio server...") print("\nAccess the application at: http://127.0.0.1:7860") diff --git a/week5/community-contributions/salah/devops-ai-assistance/devops_ai_assistance.py b/week5/community-contributions/salah/devops-ai-assistance/devops_ai_assistance.py index 24242a9..a4adf2c 100644 --- a/week5/community-contributions/salah/devops-ai-assistance/devops_ai_assistance.py +++ b/week5/community-contributions/salah/devops-ai-assistance/devops_ai_assistance.py @@ -1,12 +1,12 @@ import os +import re from pathlib import Path -from typing import List, Optional +from typing import List, Optional, Dict, Any import json import tempfile import shutil from langchain_core.documents import Document -from langchain_community.document_loaders import DirectoryLoader, TextLoader from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_huggingface import HuggingFaceEmbeddings from langchain_community.vectorstores import Chroma @@ -24,6 +24,146 @@ class DevOpsKnowledgeBase: self.documents = [] self.chunks = [] self.temp_db_dir = None + self.indices = {} + self.structure = {} + + def _parse_structured_content(self, content: str, file_path: Path) -> dict: + metadata = {} + + try: + if file_path.suffix.lower() in ['.yaml', '.yml']: + import yaml + data = yaml.safe_load(content) + if isinstance(data, dict): + metadata['kind'] = data.get('kind') + metadata['api_version'] = data.get('apiVersion') + + if 'metadata' in data and isinstance(data['metadata'], dict): + for key, value in data['metadata'].items(): + if isinstance(value, (str, int, float, bool)): + metadata[f'meta_{key}'] = value + elif isinstance(value, dict): + for k, v in value.items(): + if isinstance(v, (str, int, float, bool)): + metadata[f'meta_{key}_{k}'] = v + + if 'spec' in data and isinstance(data['spec'], dict): + if 'project' in data['spec']: + metadata['project'] = data['spec']['project'] + if 'destination' in data['spec'] and isinstance(data['spec']['destination'], dict): + if 'namespace' in data['spec']['destination']: + metadata['namespace'] = data['spec']['destination']['namespace'] + + elif file_path.suffix.lower() == '.json': + data = json.loads(content) + if isinstance(data, dict): + for key, value in data.items(): + if isinstance(value, (str, int, float, bool)): + metadata[f'json_{key}'] = value + + elif file_path.suffix.lower() in ['.tf', '.hcl']: + metadata['is_terraform'] = True + resources = re.findall(r'resource\s+"([^"]+)"\s+"([^"]+)"', content) + if resources: + metadata['terraform_resources'] = [r[0] for r in resources] + metadata['resource_count'] = len(resources) + + modules = re.findall(r'module\s+"([^"]+)"', content) + if modules: + metadata['terraform_modules'] = modules + metadata['module_count'] = len(modules) + + elif file_path.suffix.lower() == '.py': + metadata['is_code'] = True + metadata['language'] = 'python' + + imports = re.findall(r'^(?:from|import)\s+(\S+)', content, re.MULTILINE) + classes = re.findall(r'^class\s+(\w+)', content, re.MULTILINE) + functions = re.findall(r'^def\s+(\w+)', content, re.MULTILINE) + + if imports: + metadata['imports'] = imports[:10] + if classes: + metadata['classes'] = classes + metadata['class_count'] = len(classes) + if functions: + metadata['functions'] = functions[:20] + metadata['function_count'] = len(functions) + + elif 
file_path.suffix.lower() in ['.js', '.ts']: + metadata['is_code'] = True + metadata['language'] = 'javascript' if file_path.suffix == '.js' else 'typescript' + + imports = re.findall(r'import\s+.*\s+from\s+[\'"]([^\'"]+)[\'"]', content) + functions = re.findall(r'(?:function|const|let|var)\s+(\w+)\s*=?\s*(?:async\s*)?\(', content) + classes = re.findall(r'class\s+(\w+)', content) + + if imports: + metadata['imports'] = imports[:10] + if classes: + metadata['classes'] = classes + metadata['class_count'] = len(classes) + if functions: + metadata['function_count'] = len(functions) + + elif file_path.suffix.lower() in ['.go']: + metadata['is_code'] = True + metadata['language'] = 'go' + + packages = re.findall(r'package\s+(\w+)', content) + if packages: + metadata['package'] = packages[0] + + imports = re.findall(r'import\s+[\'"]([^\'"]+)[\'"]', content) + if imports: + metadata['imports'] = imports[:10] + + except Exception as e: + pass + + return metadata + + def _extract_content_patterns(self, content: str) -> dict: + metadata = {} + content_lower = content.lower() + + urls = re.findall(r'https?://[^\s<>"]+', content) + if urls: + metadata['has_urls'] = True + metadata['url_count'] = len(urls) + domains = [] + for url in urls: + domain_match = re.findall(r'https?://([^/]+)', url) + if domain_match: + domains.append(domain_match[0]) + if domains: + metadata['domains'] = list(set(domains))[:5] + + ips = re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', content) + if ips: + metadata['has_ips'] = True + metadata['ip_count'] = len(set(ips)) + + versions = re.findall(r'\bv?\d+\.\d+(?:\.\d+)?(?:-[\w.]+)?\b', content) + if versions: + metadata['has_versions'] = True + + patterns = { + 'has_secrets': any(keyword in content_lower for keyword in ['password', 'secret', 'token', 'api_key', 'apikey']), + 'has_monitoring': any(keyword in content_lower for keyword in ['prometheus', 'grafana', 'metrics', 'alert']), + 'has_networking': any(keyword in content_lower for keyword in ['ingress', 'service', 'loadbalancer', 'route']), + 'has_storage': any(keyword in content_lower for keyword in ['volume', 'pvc', 'storage', 'disk']), + 'has_database': any(keyword in content_lower for keyword in ['postgres', 'mysql', 'redis', 'mongodb', 'database']), + 'has_deployment': any(keyword in content_lower for keyword in ['deployment', 'statefulset', 'daemonset', 'replica']), + } + + metadata.update({k: v for k, v in patterns.items() if v}) + + quoted_strings = re.findall(r'"([^"]{3,30})"', content) + if quoted_strings: + metadata['quoted_strings'] = list(set(quoted_strings))[:10] + + return metadata def load_documents(self) -> List[Document]: self.documents = [] @@ -31,7 +171,7 @@ class DevOpsKnowledgeBase: if not self.knowledge_base_path.exists(): raise ValueError(f"Knowledge base path does not exist: {self.knowledge_base_path}") - supported_extensions = {'.yaml', '.yml', '.md', '.txt', '.json'} + supported_extensions = {'.yaml', '.yml', '.md', '.txt', '.json', '.tf', '.hcl', '.py', '.js', '.ts', '.go', '.sh', '.rst'} print(f"Loading documents from {self.knowledge_base_path}...") @@ -43,14 +183,30 @@ class DevOpsKnowledgeBase: if content and len(content) > 50: relative_path = file_path.relative_to(self.knowledge_base_path) - doc = Document( - page_content=content, - metadata={ - "source": str(relative_path), - "file_type": file_path.suffix.lower(), - "path": str(file_path) - } - ) + parts = relative_path.parts + + metadata = { + "source": str(relative_path), + "file_type": file_path.suffix.lower(), + "path": str(file_path), + 
"filename": file_path.stem, + "full_filename": file_path.name, + "char_count": len(content), + "word_count": len(content.split()), + "line_count": len(content.splitlines()), + "depth": len(parts) - 1, + "parent_dir": parts[-2] if len(parts) > 1 else "root", + "path_level_0": parts[0] if len(parts) > 0 else None, + "path_level_1": parts[1] if len(parts) > 1 else None, + "path_level_2": parts[2] if len(parts) > 2 else None, + "path_level_3": parts[3] if len(parts) > 3 else None, + "full_path_parts": list(parts), + } + + metadata.update(self._parse_structured_content(content, file_path)) + metadata.update(self._extract_content_patterns(content)) + + doc = Document(page_content=content, metadata=metadata) self.documents.append(doc) except Exception as e: @@ -59,35 +215,235 @@ class DevOpsKnowledgeBase: print(f"Loaded {len(self.documents)} documents") return self.documents - def chunk_documents(self, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[Document]: - if not self.documents: - raise ValueError("No documents loaded. Call load_documents() first.") + def discover_structure(self) -> dict: + print("\nAuto-discovering repository structure...") - print(f"Splitting {len(self.documents)} documents into chunks...") + structure = { + 'total_files': len(self.documents), + 'by_file_type': {}, + 'by_depth': {}, + 'by_parent_dir': {}, + 'hierarchy': {}, + 'patterns': {} + } - text_splitter = RecursiveCharacterTextSplitter( - chunk_size=chunk_size, - chunk_overlap=chunk_overlap, - separators=["\n\n", "\n", " ", ""] - ) + for doc in self.documents: + file_type = doc.metadata.get('file_type', 'unknown') + structure['by_file_type'][file_type] = structure['by_file_type'].get(file_type, 0) + 1 - self.chunks = text_splitter.split_documents(self.documents) - print(f"Created {len(self.chunks)} chunks") - return self.chunks + depth = doc.metadata.get('depth', 0) + structure['by_depth'][depth] = structure['by_depth'].get(depth, 0) + 1 + + parent = doc.metadata.get('parent_dir', 'unknown') + structure['by_parent_dir'][parent] = structure['by_parent_dir'].get(parent, 0) + 1 + + path_parts = doc.metadata.get('full_path_parts', []) + current_level = structure['hierarchy'] + for part in path_parts[:-1]: + if part not in current_level: + current_level[part] = {'_count': 0, '_children': {}} + current_level[part]['_count'] += 1 + current_level = current_level[part]['_children'] + + structure['patterns'] = self._detect_patterns() + + print(f"\nDiscovered Structure:") + print(f" Total files: {structure['total_files']}") + print(f"\n By file type:") + for ftype, count in sorted(structure['by_file_type'].items(), key=lambda x: x[1], reverse=True): + print(f" {ftype}: {count}") + + print(f"\n By depth:") + for depth, count in sorted(structure['by_depth'].items()): + print(f" Level {depth}: {count} files") + + print(f"\n Top-level directories:") + for dir_name, data in structure['hierarchy'].items(): + print(f" {dir_name}/: {data['_count']} files") + + if structure['patterns']: + print(f"\n Detected patterns:") + for pattern, count in structure['patterns'].items(): + print(f" {pattern}: {count} files") + + self.structure = structure + return structure + + def _detect_patterns(self) -> dict: + patterns = { + 'kubernetes_manifests': 0, + 'terraform_files': 0, + 'python_code': 0, + 'javascript_code': 0, + 'documentation': 0, + 'configuration': 0, + } + + for doc in self.documents: + if doc.metadata.get('kind') or doc.metadata.get('api_version'): + patterns['kubernetes_manifests'] += 1 + if 
doc.metadata.get('is_terraform'): + patterns['terraform_files'] += 1 + if doc.metadata.get('language') == 'python': + patterns['python_code'] += 1 + if doc.metadata.get('language') in ['javascript', 'typescript']: + patterns['javascript_code'] += 1 + if doc.metadata.get('file_type') in ['.md', '.rst', '.txt']: + patterns['documentation'] += 1 + if doc.metadata.get('file_type') in ['.yaml', '.yml', '.json', '.toml']: + patterns['configuration'] += 1 + + return {k: v for k, v in patterns.items() if v > 0} + + def create_dynamic_indices(self) -> dict: + print("\nCreating dynamic indices...") + + indices = { + 'by_path_level_0': {}, + 'by_path_level_1': {}, + 'by_path_level_2': {}, + 'by_path_level_3': {}, + 'by_file_type': {}, + 'by_kind': {}, + 'by_language': {}, + 'by_parent_dir': {}, + 'by_project': {}, + 'by_namespace': {}, + 'statistics': { + 'total_documents': len(self.documents), + 'total_chars': sum(d.metadata.get('char_count', 0) for d in self.documents), + 'total_lines': sum(d.metadata.get('line_count', 0) for d in self.documents), + } + } + + for doc in self.documents: + source = doc.metadata.get('source') + + for level in range(4): + level_key = f'path_level_{level}' + index_key = f'by_{level_key}' + if level_value := doc.metadata.get(level_key): + if level_value not in indices[index_key]: + indices[index_key][level_value] = [] + indices[index_key][level_value].append(source) + + if file_type := doc.metadata.get('file_type'): + if file_type not in indices['by_file_type']: + indices['by_file_type'][file_type] = [] + indices['by_file_type'][file_type].append(source) + + if kind := doc.metadata.get('kind'): + if kind not in indices['by_kind']: + indices['by_kind'][kind] = [] + indices['by_kind'][kind].append(source) + + if language := doc.metadata.get('language'): + if language not in indices['by_language']: + indices['by_language'][language] = [] + indices['by_language'][language].append(source) + + if parent := doc.metadata.get('parent_dir'): + if parent not in indices['by_parent_dir']: + indices['by_parent_dir'][parent] = [] + indices['by_parent_dir'][parent].append(source) + + if project := doc.metadata.get('project'): + if project not in indices['by_project']: + indices['by_project'][project] = [] + indices['by_project'][project].append(source) + + if namespace := doc.metadata.get('namespace'): + if namespace not in indices['by_namespace']: + indices['by_namespace'][namespace] = [] + indices['by_namespace'][namespace].append(source) + + self.indices = indices + + print(f"\nIndices Created:") + print(f" Total documents indexed: {indices['statistics']['total_documents']}") + print(f" Top-level paths: {len(indices['by_path_level_0'])}") + print(f" File types: {len(indices['by_file_type'])}") + if indices['by_kind']: + print(f" Kubernetes kinds: {len(indices['by_kind'])}") + if indices['by_language']: + print(f" Programming languages: {len(indices['by_language'])}") + + return indices + + def chunk_documents_adaptive(self, documents: List[Document]) -> List[Document]: + print("\nAdaptive chunking based on file characteristics...") + + all_chunks = [] + + strategies = { + 'small_structured': [], + 'large_structured': [], + 'code_files': [], + 'documentation': [], + 'default': [] + } + + for doc in documents: + char_count = doc.metadata.get('char_count', 0) + file_type = doc.metadata.get('file_type', '') + + if file_type in ['.yaml', '.yml', '.json', '.toml']: + if char_count < 2000: + strategies['small_structured'].append(doc) + else: + strategies['large_structured'].append(doc) + 
elif file_type in ['.py', '.js', '.go', '.java', '.ts', '.rs', '.sh']: + strategies['code_files'].append(doc) + elif file_type in ['.md', '.rst', '.txt']: + strategies['documentation'].append(doc) + else: + strategies['default'].append(doc) + + chunk_configs = { + 'small_structured': {'chunk_size': 2000, 'chunk_overlap': 100}, + 'large_structured': {'chunk_size': 1500, 'chunk_overlap': 200}, + 'code_files': {'chunk_size': 1200, 'chunk_overlap': 150}, + 'documentation': {'chunk_size': 1000, 'chunk_overlap': 200}, + 'default': {'chunk_size': 1000, 'chunk_overlap': 200} + } + + for strategy_name, docs in strategies.items(): + if not docs: + continue + + config = chunk_configs[strategy_name] + splitter = RecursiveCharacterTextSplitter( + chunk_size=config['chunk_size'], + chunk_overlap=config['chunk_overlap'], + separators=["\n\n", "\n", " ", ""] + ) + + chunks = splitter.split_documents(docs) + + for i, chunk in enumerate(chunks): + chunk.metadata['chunk_strategy'] = strategy_name + chunk.metadata['chunk_id'] = f"{strategy_name}_{i:04d}" + + all_chunks.extend(chunks) + print(f" {strategy_name}: {len(docs)} docs → {len(chunks)} chunks") + + self.chunks = all_chunks + print(f" Total: {len(all_chunks)} chunks created") + return all_chunks def initialize_embedding_model(self): - print(f"Initializing embedding model: {self.embedding_model_name}...") + print(f"\nInitializing embedding model: {self.embedding_model_name}...") self.embedding_model = HuggingFaceEmbeddings(model_name=self.embedding_model_name) print("Embedding model initialized") def create_vectorstore(self) -> Chroma: if not self.chunks: - raise ValueError("No chunks available. Call chunk_documents() first.") + raise ValueError("No chunks available. Call chunk_documents_adaptive() first.") if not self.embedding_model: raise ValueError("Embedding model not initialized. 
Call initialize_embedding_model() first.") - print("Creating vector store...") + print("\nCreating vector store...") if self.temp_db_dir: try: @@ -95,7 +451,16 @@ class DevOpsKnowledgeBase: except: pass - self.temp_db_dir = tempfile.mkdtemp(prefix="devops_kb_") + self.temp_db_dir = tempfile.mkdtemp(prefix="devops_kb_v2_") + + for chunk in self.chunks: + cleaned_metadata = {} + for key, value in chunk.metadata.items(): + if value is not None and not isinstance(value, (list, dict)): + cleaned_metadata[key] = value + elif isinstance(value, list) and value: + cleaned_metadata[key] = str(value) + chunk.metadata = cleaned_metadata self.vectorstore = Chroma.from_documents( documents=self.chunks, @@ -108,15 +473,20 @@ class DevOpsKnowledgeBase: return self.vectorstore def initialize(self): - print("Initializing DevOps Knowledge Base...") - print("=" * 60) + print("=" * 70) + print("Initializing DevOps Knowledge Base") + print("=" * 70) self.load_documents() - self.chunk_documents() + self.discover_structure() + self.create_dynamic_indices() + self.chunk_documents_adaptive(self.documents) self.initialize_embedding_model() self.create_vectorstore() - print("\nKnowledge base initialized successfully!") + print("\n" + "=" * 70) + print("Knowledge base initialized successfully!") + print("=" * 70) return self.vectorstore @@ -129,7 +499,7 @@ class DevOpsAIAssistant: self.llm = None def setup(self): - print("Setting up DevOps AI Assistant...") + print("\nSetting up DevOps AI Assistant...") self.vectorstore = self.knowledge_base.initialize() @@ -137,7 +507,7 @@ class DevOpsAIAssistant: if not api_key: raise ValueError("OPENAI_API_KEY environment variable not set") - print("Initializing OpenAI LLM...") + print("\nInitializing OpenAI LLM...") self.llm = ChatOpenAI( model_name="gpt-4o-mini", temperature=0.3, @@ -152,7 +522,7 @@ class DevOpsAIAssistant: ) print("Creating conversation chain...") - retriever = self.vectorstore.as_retriever(search_kwargs={"k": 5}) + retriever = self.vectorstore.as_retriever(search_kwargs={"k": 10}) self.conversation_chain = ConversationalRetrievalChain.from_llm( llm=self.llm, @@ -162,7 +532,9 @@ class DevOpsAIAssistant: verbose=False ) + print("\n" + "=" * 70) print("DevOps AI Assistant ready!") + print("=" * 70) return self def ask(self, question: str) -> dict: @@ -177,12 +549,26 @@ class DevOpsAIAssistant: } if result.get('source_documents'): + unique_sources = {} for doc in result['source_documents']: - response["sources"].append({ - "content": doc.page_content[:300], - "source": doc.metadata.get('source', 'Unknown'), - "file_type": doc.metadata.get('file_type', 'Unknown') - }) + source = doc.metadata.get('source') + if source not in unique_sources: + path_info = "/".join([ + doc.metadata.get('path_level_0', ''), + doc.metadata.get('path_level_1', ''), + doc.metadata.get('path_level_2', '') + ]).strip('/') + + unique_sources[source] = { + "content": doc.page_content[:300], + "source": source, + "file_type": doc.metadata.get('file_type', 'Unknown'), + "path_info": path_info, + "kind": doc.metadata.get('kind'), + "language": doc.metadata.get('language') + } + + response["sources"] = list(unique_sources.values()) return response @@ -192,7 +578,7 @@ class DevOpsAIAssistant: doc_count = self.vectorstore._collection.count() - return { + status = { "status": "ready", "documents_loaded": len(self.knowledge_base.documents), "chunks_created": len(self.knowledge_base.chunks), @@ -200,6 +586,22 @@ class DevOpsAIAssistant: "knowledge_base_path": 
str(self.knowledge_base.knowledge_base_path) } + if self.knowledge_base.structure: + status["structure"] = { + "total_files": self.knowledge_base.structure['total_files'], + "file_types": len(self.knowledge_base.structure['by_file_type']), + "patterns": self.knowledge_base.structure['patterns'] + } + + if self.knowledge_base.indices: + status["indices"] = { + "path_levels": len(self.knowledge_base.indices['by_path_level_0']), + "kinds": len(self.knowledge_base.indices['by_kind']), + "languages": len(self.knowledge_base.indices['by_language']) + } + + return status + def create_assistant(knowledge_base_path: str) -> DevOpsAIAssistant: assistant = DevOpsAIAssistant(knowledge_base_path)
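
A minimal usage sketch of the refactored module, separate from the Gradio UI in app.py; it assumes OPENAI_API_KEY is already exported and that the default GitOps checkout path from app.py exists locally (adjust the path as needed):

    # Sketch only: exercises the same public calls app.py uses
    # (create_assistant, get_status, ask); the path below is the
    # default from app.py and is an assumption about the local setup.
    from devops_ai_assistance import create_assistant

    assistant = create_assistant("/workspace/aau/repositories/infra-gitops/")

    # Summary of what was loaded and indexed
    status = assistant.get_status()
    print(f"Documents: {status['documents_loaded']}, "
          f"chunks: {status['chunks_created']}, "
          f"vectors: {status['vectors_in_store']}")

    # Ask a question and list the source files behind the answer
    result = assistant.ask("How is Prometheus configured?")
    print(result["answer"])
    for source in result["sources"]:
        print(f"- {source['source']} ({source['file_type']})")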