diff --git a/week5/community-contributions/salah/devops-ai-assistance/app.py b/week5/community-contributions/salah/devops-ai-assistance/app.py
new file mode 100644
index 0000000..9a3588b
--- /dev/null
+++ b/week5/community-contributions/salah/devops-ai-assistance/app.py
@@ -0,0 +1,189 @@
+import os
+import gradio as gr
+from devops_ai_assistance import create_assistant
+
+
+assistant = None
+status_info = None
+
+
+def initialize_assistant(kb_path: str):
+    global assistant, status_info
+
+    try:
+        kb_path = kb_path.strip()
+        if not kb_path:
+            return "Error: Please provide a valid knowledge base path"
+
+        print(f"\nInitializing with knowledge base: {kb_path}")
+        assistant = create_assistant(kb_path)
+        status_info = assistant.get_status()
+
+        status_message = f"""
+**DevOps AI Assistant Initialized Successfully**
+
+**Knowledge Base Statistics:**
+- Documents Loaded: {status_info['documents_loaded']}
+- Chunks Created: {status_info['chunks_created']}
+- Vectors in Store: {status_info['vectors_in_store']}
+- Knowledge Base Path: {status_info['knowledge_base_path']}
+
+**Ready to Answer Questions About:**
+- Kubernetes infrastructure configuration
+- ArgoCD deployment manifests
+- Helm charts and values
+- Infrastructure as Code
+- DevOps best practices
+
+Start by asking questions about your infrastructure!
+"""
+        return status_message
+
+    except Exception as e:
+        error_msg = f"Error initializing assistant: {str(e)}"
+        print(f"Error: {error_msg}")
+        return f"Error: {error_msg}"
+
+
+def chat_with_assistant(message: str, history):
+    global assistant
+
+    if not assistant:
+        bot_response = "Assistant not initialized. Please provide a knowledge base path first."
+        history.append((message, bot_response))
+        return history, ""
+
+    if not message.strip():
+        bot_response = "Please enter a question about your DevOps infrastructure."
+        history.append((message, bot_response))
+        return history, ""
+
+    try:
+        result = assistant.ask(message)
+        answer = result.get('answer', '')
+
+        sources_text = ""
+        if result.get('sources'):
+            sources_text = "\n\n**Sources:**\n"
+            for i, source in enumerate(result['sources'], 1):
+                source_file = source.get('source', 'Unknown')
+                file_type = source.get('file_type', 'Unknown')
+                sources_text += f"\n{i}. {source_file} ({file_type})"
+
+        bot_response = answer + sources_text if sources_text else answer
+
+    except Exception as e:
+        bot_response = f"Error processing question: {str(e)}"
+
+    history.append((message, bot_response))
+    return history, ""
+
+
+def create_interface():
+    with gr.Blocks(title="DevOps AI Assistant") as interface:
+
+        gr.Markdown("# DevOps AI Assistant")
+        gr.Markdown("Intelligent Q&A system for your infrastructure powered by RAG and LLM")
+
+        gr.Markdown("## Configuration")
+        gr.Markdown("Enter the path to your GitOps repository to initialize the assistant")
+
+        with gr.Row():
+            kb_path_input = gr.Textbox(
+                label="Knowledge Base Path",
+                placeholder="/workspace/aau/repositories/infra-gitops/",
+                lines=1,
+                value="/workspace/aau/repositories/infra-gitops/"
+            )
+            init_button = gr.Button("Initialize Assistant")
+
+        status_output = gr.Markdown(value="Waiting for initialization...")
+
+        gr.Markdown("## Chat Interface")
+
+        chatbot = gr.Chatbot(
+            label="Conversation",
+            height=500,
+            show_copy_button=True,
+            bubble_full_width=False
+        )
+
+        with gr.Row():
+            msg_input = gr.Textbox(
+                label="Your Question",
+                placeholder="Ask about your infrastructure, ArgoCD, Helm charts, etc...",
+                lines=2,
+                scale=5
+            )
+            send_button = gr.Button("Send", scale=1)
+
+        with gr.Row():
+            clear_button = gr.Button("Clear Chat", scale=2)
+
+        with gr.Accordion("Example Questions", open=False):
+            gr.Markdown("""
+**Infrastructure & Deployment:**
+- How many ArgoCD applications?
+- What is the repository structure?
+- How many YAML files are there?
+- Show me the Helm chart values for nginx
+
+**Monitoring & Observability:**
+- How is Prometheus configured?
+- What monitoring exporters are installed?
+- Tell me about the metrics server setup
+
+**Security & Access:**
+- How are RBAC policies configured?
+- What authentication methods are used?
+- Explain the network policies
+
+**DevOps Practices:**
+- What is the deployment pipeline?
+- How are secrets managed?
+- Show me the backup strategy
+            """)
+
+        init_button.click(
+            initialize_assistant,
+            inputs=[kb_path_input],
+            outputs=[status_output]
+        )
+
+        msg_input.submit(
+            chat_with_assistant,
+            inputs=[msg_input, chatbot],
+            outputs=[chatbot, msg_input]
+        )
+
+        send_button.click(
+            chat_with_assistant,
+            inputs=[msg_input, chatbot],
+            outputs=[chatbot, msg_input]
+        )
+
+        clear_button.click(lambda: [], outputs=chatbot)
+
+    return interface
+
+
+def main():
+    print("\n" + "=" * 60)
+    print("DevOps AI Assistant - RAG System")
+    print("=" * 60)
+    print("Starting Gradio server...")
+    print("\nAccess the application at: http://127.0.0.1:7860")
+    print("=" * 60 + "\n")
+
+    interface = create_interface()
+    interface.launch(
+        server_name="0.0.0.0",
+        server_port=7860,
+        share=False,
+        show_error=True,
+        show_api=False
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/week5/community-contributions/salah/devops-ai-assistance/devops_ai_assistance.py b/week5/community-contributions/salah/devops-ai-assistance/devops_ai_assistance.py
new file mode 100644
index 0000000..a4adf2c
--- /dev/null
+++ b/week5/community-contributions/salah/devops-ai-assistance/devops_ai_assistance.py
@@ -0,0 +1,609 @@
+import os
+import re
+from pathlib import Path
+from typing import List, Optional, Dict, Any
+import json
+import tempfile
+import shutil
+
+from langchain_core.documents import Document
+from langchain_text_splitters import RecursiveCharacterTextSplitter
+from langchain_huggingface import HuggingFaceEmbeddings
+from langchain_community.vectorstores import Chroma
+from langchain_openai import ChatOpenAI
+from langchain_classic.memory import ConversationBufferMemory
+from langchain_classic.chains import ConversationalRetrievalChain
+
+
+class DevOpsKnowledgeBase:
+    def __init__(self, knowledge_base_path: str, embedding_model: str = "all-MiniLM-L6-v2"):
+        self.knowledge_base_path = Path(knowledge_base_path)
+        self.embedding_model_name = embedding_model
+        self.embedding_model = None
+        self.vectorstore = None
+        self.documents = []
+        self.chunks = []
+        self.temp_db_dir = None
+        self.indices = {}
+        self.structure = {}
+
+    def _parse_structured_content(self, content: str, file_path: Path) -> dict:
+        metadata = {}
+
+        try:
+            if file_path.suffix.lower() in ['.yaml', '.yml']:
+                import yaml
+                data = yaml.safe_load(content)
+                if isinstance(data, dict):
+                    metadata['kind'] = data.get('kind')
+                    metadata['api_version'] = data.get('apiVersion')
+
+                    if 'metadata' in data and isinstance(data['metadata'], dict):
+                        for key, value in data['metadata'].items():
+                            if isinstance(value, (str, int, float, bool)):
+                                metadata[f'meta_{key}'] = value
+                            elif isinstance(value, dict):
+                                for k, v in value.items():
+                                    if isinstance(v, (str, int, float, bool)):
+                                        metadata[f'meta_{key}_{k}'] = v
+
+                    if 'spec' in data and isinstance(data['spec'], dict):
+                        if 'project' in data['spec']:
+                            metadata['project'] = data['spec']['project']
+                        if 'destination' in data['spec'] and isinstance(data['spec']['destination'], dict):
+                            if 'namespace' in data['spec']['destination']:
+                                metadata['namespace'] = data['spec']['destination']['namespace']
+
+            elif file_path.suffix.lower() == '.json':
+                data = json.loads(content)
+                if isinstance(data, dict):
+                    for key, value in data.items():
+                        if isinstance(value, (str, int, float, bool)):
+                            metadata[f'json_{key}'] = value
+
+            elif file_path.suffix.lower() in ['.tf', '.hcl']:
+                metadata['is_terraform'] = True
+                resources = re.findall(r'resource\s+"([^"]+)"\s+"([^"]+)"', content)
+                if resources:
+                    metadata['terraform_resources'] = [r[0] for r in resources]
+                    metadata['resource_count'] = len(resources)
+
+                modules = re.findall(r'module\s+"([^"]+)"', content)
+                if modules:
+                    metadata['terraform_modules'] = modules
+                    metadata['module_count'] = len(modules)
+
+            elif file_path.suffix.lower() == '.py':
+                metadata['is_code'] = True
+                metadata['language'] = 'python'
+
+                imports = re.findall(r'^(?:from|import)\s+(\S+)', content, re.MULTILINE)
+                classes = re.findall(r'^class\s+(\w+)', content, re.MULTILINE)
+                functions = re.findall(r'^def\s+(\w+)', content, re.MULTILINE)
+
+                if imports:
+                    metadata['imports'] = imports[:10]
+                if classes:
+                    metadata['classes'] = classes
+                    metadata['class_count'] = len(classes)
+                if functions:
+                    metadata['functions'] = functions[:20]
+                    metadata['function_count'] = len(functions)
+
+            elif file_path.suffix.lower() in ['.js', '.ts']:
+                metadata['is_code'] = True
+                metadata['language'] = 'javascript' if file_path.suffix == '.js' else 'typescript'
+
+                imports = re.findall(r'import\s+.*\s+from\s+[\'"]([^\'"]+)[\'"]', content)
+                functions = re.findall(r'(?:function|const|let|var)\s+(\w+)\s*=?\s*(?:async\s*)?\(', content)
+                classes = re.findall(r'class\s+(\w+)', content)
+
+                if imports:
+                    metadata['imports'] = imports[:10]
+                if classes:
+                    metadata['classes'] = classes
+                    metadata['class_count'] = len(classes)
+                if functions:
+                    metadata['function_count'] = len(functions)
+
+            elif file_path.suffix.lower() in ['.go']:
+                metadata['is_code'] = True
+                metadata['language'] = 'go'
+
+                packages = re.findall(r'package\s+(\w+)', content)
+                if packages:
+                    metadata['package'] = packages[0]
+
+                imports = re.findall(r'import\s+[\'"]([^\'"]+)[\'"]', content)
+                if imports:
+                    metadata['imports'] = imports[:10]
+
+        except Exception as e:
+            pass
+
+        return metadata
+
+    def _extract_content_patterns(self, content: str) -> dict:
+        metadata = {}
+        content_lower = content.lower()
+
+        urls = re.findall(r'https?://[^\s<>"]+', content)
+        if urls:
+            metadata['has_urls'] = True
+            metadata['url_count'] = len(urls)
+            domains = []
+            for url in urls:
+                domain_match = re.findall(r'https?://([^/]+)', url)
+                if domain_match:
+                    domains.append(domain_match[0])
+            if domains:
+                metadata['domains'] = list(set(domains))[:5]
+
+        ips = re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', content)
+        if ips:
+            metadata['has_ips'] = True
+            metadata['ip_count'] = len(set(ips))
+
+        versions = re.findall(r'\bv?\d+\.\d+(?:\.\d+)?(?:-[\w.]+)?\b', content)
+        if versions:
+            metadata['has_versions'] = True
+
+        patterns = {
+            'has_secrets': any(keyword in content_lower for keyword in ['password', 'secret', 'token', 'api_key', 'apikey']),
+            'has_monitoring': any(keyword in content_lower for keyword in ['prometheus', 'grafana', 'metrics', 'alert']),
+            'has_networking': any(keyword in content_lower for keyword in ['ingress', 'service', 'loadbalancer', 'route']),
+            'has_storage': any(keyword in content_lower for keyword in ['volume', 'pvc', 'storage', 'disk']),
+            'has_database': any(keyword in content_lower for keyword in ['postgres', 'mysql', 'redis', 'mongodb', 'database']),
+            'has_deployment': any(keyword in content_lower for keyword in ['deployment', 'statefulset', 'daemonset', 'replica']),
+        }
+
+        metadata.update({k: v for k, v in patterns.items() if v})
+
+        quoted_strings = re.findall(r'"([^"]{3,30})"', content)
+        if quoted_strings:
+            metadata['quoted_strings'] = list(set(quoted_strings))[:10]
+
+        return metadata
+
+    def load_documents(self) -> List[Document]:
+        self.documents = []
+
+        if not self.knowledge_base_path.exists():
+            raise ValueError(f"Knowledge base path does not exist: {self.knowledge_base_path}")
+
+        supported_extensions = {'.yaml', '.yml', '.md', '.txt', '.json', '.tf', '.hcl', '.py', '.js', '.ts', '.go', '.sh', '.rst'}
+
+        print(f"Loading documents from {self.knowledge_base_path}...")
+
+        for file_path in self.knowledge_base_path.rglob("*"):
+            if file_path.is_file() and file_path.suffix.lower() in supported_extensions:
+                try:
+                    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
+                        content = f.read().strip()
+
+                    if content and len(content) > 50:
+                        relative_path = file_path.relative_to(self.knowledge_base_path)
+                        parts = relative_path.parts
+
+                        metadata = {
+                            "source": str(relative_path),
+                            "file_type": file_path.suffix.lower(),
+                            "path": str(file_path),
+                            "filename": file_path.stem,
+                            "full_filename": file_path.name,
+                            "char_count": len(content),
+                            "word_count": len(content.split()),
+                            "line_count": len(content.splitlines()),
+                            "depth": len(parts) - 1,
+                            "parent_dir": parts[-2] if len(parts) > 1 else "root",
+                            "path_level_0": parts[0] if len(parts) > 0 else None,
+                            "path_level_1": parts[1] if len(parts) > 1 else None,
+                            "path_level_2": parts[2] if len(parts) > 2 else None,
+                            "path_level_3": parts[3] if len(parts) > 3 else None,
+                            "full_path_parts": list(parts),
+                        }
+
+                        metadata.update(self._parse_structured_content(content, file_path))
+                        metadata.update(self._extract_content_patterns(content))
+
+                        doc = Document(page_content=content, metadata=metadata)
+                        self.documents.append(doc)
+
+                except Exception as e:
+                    print(f"Skipped {file_path.name}: {str(e)}")
+
+        print(f"Loaded {len(self.documents)} documents")
+        return self.documents
+
+    def discover_structure(self) -> dict:
+        print("\nAuto-discovering repository structure...")
+
+        structure = {
+            'total_files': len(self.documents),
+            'by_file_type': {},
+            'by_depth': {},
+            'by_parent_dir': {},
+            'hierarchy': {},
+            'patterns': {}
+        }
+
+        for doc in self.documents:
+            file_type = doc.metadata.get('file_type', 'unknown')
+            structure['by_file_type'][file_type] = structure['by_file_type'].get(file_type, 0) + 1
+
+            depth = doc.metadata.get('depth', 0)
+            structure['by_depth'][depth] = structure['by_depth'].get(depth, 0) + 1
+
+            parent = doc.metadata.get('parent_dir', 'unknown')
+            structure['by_parent_dir'][parent] = structure['by_parent_dir'].get(parent, 0) + 1
+
+            path_parts = doc.metadata.get('full_path_parts', [])
+            current_level = structure['hierarchy']
+            for part in path_parts[:-1]:
+                if part not in current_level:
+                    current_level[part] = {'_count': 0, '_children': {}}
+                current_level[part]['_count'] += 1
+                current_level = current_level[part]['_children']
+
+        structure['patterns'] = self._detect_patterns()
+
+        print(f"\nDiscovered Structure:")
+        print(f" Total files: {structure['total_files']}")
+        print(f"\n By file type:")
+        for ftype, count in sorted(structure['by_file_type'].items(), key=lambda x: x[1], reverse=True):
+            print(f" {ftype}: {count}")
+
+        print(f"\n By depth:")
+        for depth, count in sorted(structure['by_depth'].items()):
+            print(f" Level {depth}: {count} files")
+
+        print(f"\n Top-level directories:")
+        for dir_name, data in structure['hierarchy'].items():
+            print(f" {dir_name}/: {data['_count']} files")
+
+        if structure['patterns']:
+            print(f"\n Detected patterns:")
+            for pattern, count in structure['patterns'].items():
+                print(f" {pattern}: {count} files")
+
+        self.structure = structure
+        return structure
+
+    def _detect_patterns(self) -> dict:
+        patterns = {
+            'kubernetes_manifests': 0,
+            'terraform_files': 0,
+            'python_code': 0,
+            'javascript_code': 0,
+            'documentation': 0,
+            'configuration': 0,
+        }
+
+        for doc in self.documents:
+            if doc.metadata.get('kind') or doc.metadata.get('api_version'):
+                patterns['kubernetes_manifests'] += 1
+            if doc.metadata.get('is_terraform'):
+                patterns['terraform_files'] += 1
+            if doc.metadata.get('language') == 'python':
+                patterns['python_code'] += 1
+            if doc.metadata.get('language') in ['javascript', 'typescript']:
+                patterns['javascript_code'] += 1
+            if doc.metadata.get('file_type') in ['.md', '.rst', '.txt']:
+                patterns['documentation'] += 1
+            if doc.metadata.get('file_type') in ['.yaml', '.yml', '.json', '.toml']:
+                patterns['configuration'] += 1
+
+        return {k: v for k, v in patterns.items() if v > 0}
+
+    def create_dynamic_indices(self) -> dict:
+        print("\nCreating dynamic indices...")
+
+        indices = {
+            'by_path_level_0': {},
+            'by_path_level_1': {},
+            'by_path_level_2': {},
+            'by_path_level_3': {},
+            'by_file_type': {},
+            'by_kind': {},
+            'by_language': {},
+            'by_parent_dir': {},
+            'by_project': {},
+            'by_namespace': {},
+            'statistics': {
+                'total_documents': len(self.documents),
+                'total_chars': sum(d.metadata.get('char_count', 0) for d in self.documents),
+                'total_lines': sum(d.metadata.get('line_count', 0) for d in self.documents),
+            }
+        }
+
+        for doc in self.documents:
+            source = doc.metadata.get('source')
+
+            for level in range(4):
+                level_key = f'path_level_{level}'
+                index_key = f'by_{level_key}'
+                if level_value := doc.metadata.get(level_key):
+                    if level_value not in indices[index_key]:
+                        indices[index_key][level_value] = []
+                    indices[index_key][level_value].append(source)
+
+            if file_type := doc.metadata.get('file_type'):
+                if file_type not in indices['by_file_type']:
+                    indices['by_file_type'][file_type] = []
+                indices['by_file_type'][file_type].append(source)
+
+            if kind := doc.metadata.get('kind'):
+                if kind not in indices['by_kind']:
+                    indices['by_kind'][kind] = []
+                indices['by_kind'][kind].append(source)
+
+            if language := doc.metadata.get('language'):
+                if language not in indices['by_language']:
+                    indices['by_language'][language] = []
+                indices['by_language'][language].append(source)
+
+            if parent := doc.metadata.get('parent_dir'):
+                if parent not in indices['by_parent_dir']:
+                    indices['by_parent_dir'][parent] = []
+                indices['by_parent_dir'][parent].append(source)
+
+            if project := doc.metadata.get('project'):
+                if project not in indices['by_project']:
+                    indices['by_project'][project] = []
+                indices['by_project'][project].append(source)
+
+            if namespace := doc.metadata.get('namespace'):
+                if namespace not in indices['by_namespace']:
+                    indices['by_namespace'][namespace] = []
+                indices['by_namespace'][namespace].append(source)
+
+        self.indices = indices
+
+        print(f"\nIndices Created:")
+        print(f" Total documents indexed: {indices['statistics']['total_documents']}")
+        print(f" Top-level paths: {len(indices['by_path_level_0'])}")
+        print(f" File types: {len(indices['by_file_type'])}")
+        if indices['by_kind']:
+            print(f" Kubernetes kinds: {len(indices['by_kind'])}")
+        if indices['by_language']:
+            print(f" Programming languages: {len(indices['by_language'])}")
+
+        return indices
+
+    def chunk_documents_adaptive(self, documents: List[Document]) -> List[Document]:
+        print("\nAdaptive chunking based on file characteristics...")
+
+        all_chunks = []
+
+        strategies = {
+            'small_structured': [],
+            'large_structured': [],
+            'code_files': [],
+            'documentation': [],
+            'default': []
+        }
+
+        for doc in documents:
+            char_count = doc.metadata.get('char_count', 0)
+            file_type = doc.metadata.get('file_type', '')
+
+            if file_type in ['.yaml', '.yml', '.json', '.toml']:
+                if char_count < 2000:
+                    strategies['small_structured'].append(doc)
+                else:
+                    strategies['large_structured'].append(doc)
+            elif file_type in ['.py', '.js', '.go', '.java', '.ts', '.rs', '.sh']:
+                strategies['code_files'].append(doc)
+            elif file_type in ['.md', '.rst', '.txt']:
+                strategies['documentation'].append(doc)
+            else:
+                strategies['default'].append(doc)
+
+        chunk_configs = {
+            'small_structured': {'chunk_size': 2000, 'chunk_overlap': 100},
+            'large_structured': {'chunk_size': 1500, 'chunk_overlap': 200},
+            'code_files': {'chunk_size': 1200, 'chunk_overlap': 150},
+            'documentation': {'chunk_size': 1000, 'chunk_overlap': 200},
+            'default': {'chunk_size': 1000, 'chunk_overlap': 200}
+        }
+
+        for strategy_name, docs in strategies.items():
+            if not docs:
+                continue
+
+            config = chunk_configs[strategy_name]
+            splitter = RecursiveCharacterTextSplitter(
+                chunk_size=config['chunk_size'],
+                chunk_overlap=config['chunk_overlap'],
+                separators=["\n\n", "\n", " ", ""]
+            )
+
+            chunks = splitter.split_documents(docs)
+
+            for i, chunk in enumerate(chunks):
+                chunk.metadata['chunk_strategy'] = strategy_name
+                chunk.metadata['chunk_id'] = f"{strategy_name}_{i:04d}"
+
+            all_chunks.extend(chunks)
+            print(f" {strategy_name}: {len(docs)} docs → {len(chunks)} chunks")
+
+        self.chunks = all_chunks
+        print(f" Total: {len(all_chunks)} chunks created")
+        return all_chunks
+
+    def initialize_embedding_model(self):
+        print(f"\nInitializing embedding model: {self.embedding_model_name}...")
+        self.embedding_model = HuggingFaceEmbeddings(model_name=self.embedding_model_name)
+        print("Embedding model initialized")
+
+    def create_vectorstore(self) -> Chroma:
+        if not self.chunks:
+            raise ValueError("No chunks available. Call chunk_documents_adaptive() first.")
+
+        if not self.embedding_model:
+            raise ValueError("Embedding model not initialized. Call initialize_embedding_model() first.")
+
+        print("\nCreating vector store...")
+
+        if self.temp_db_dir:
+            try:
+                shutil.rmtree(self.temp_db_dir)
+            except:
+                pass
+
+        self.temp_db_dir = tempfile.mkdtemp(prefix="devops_kb_v2_")
+
+        for chunk in self.chunks:
+            cleaned_metadata = {}
+            for key, value in chunk.metadata.items():
+                if value is not None and not isinstance(value, (list, dict)):
+                    cleaned_metadata[key] = value
+                elif isinstance(value, list) and value:
+                    cleaned_metadata[key] = str(value)
+            chunk.metadata = cleaned_metadata
+
+        self.vectorstore = Chroma.from_documents(
+            documents=self.chunks,
+            embedding=self.embedding_model,
+            persist_directory=self.temp_db_dir
+        )
+
+        doc_count = self.vectorstore._collection.count()
+        print(f"Vector store created with {doc_count} documents")
+        return self.vectorstore
+
+    def initialize(self):
+        print("=" * 70)
+        print("Initializing DevOps Knowledge Base")
+        print("=" * 70)
+
+        self.load_documents()
+        self.discover_structure()
+        self.create_dynamic_indices()
+        self.chunk_documents_adaptive(self.documents)
+        self.initialize_embedding_model()
+        self.create_vectorstore()
+
+        print("\n" + "=" * 70)
+        print("Knowledge base initialized successfully!")
+        print("=" * 70)
+        return self.vectorstore
+
+
+class DevOpsAIAssistant:
+    def __init__(self, knowledge_base_path: str, embedding_model: str = "all-MiniLM-L6-v2"):
+        self.knowledge_base = DevOpsKnowledgeBase(knowledge_base_path, embedding_model)
+        self.vectorstore = None
+        self.conversation_chain = None
+        self.memory = None
+        self.llm = None
+
+    def setup(self):
+        print("\nSetting up DevOps AI Assistant...")
+
+        self.vectorstore = self.knowledge_base.initialize()
+
+        api_key = os.getenv('OPENAI_API_KEY')
+        if not api_key:
+            raise ValueError("OPENAI_API_KEY environment variable not set")
+
+        print("\nInitializing OpenAI LLM...")
+        self.llm = ChatOpenAI(
+            model_name="gpt-4o-mini",
+            temperature=0.3,
+            api_key=api_key
+        )
+
+        print("Setting up conversation memory...")
+        self.memory = ConversationBufferMemory(
+            memory_key="chat_history",
+            return_messages=True,
+            output_key='answer'
+        )
+
+        print("Creating conversation chain...")
+        retriever = self.vectorstore.as_retriever(search_kwargs={"k": 10})
+
+        self.conversation_chain = ConversationalRetrievalChain.from_llm(
+            llm=self.llm,
+            retriever=retriever,
+            memory=self.memory,
+            return_source_documents=True,
+            verbose=False
+        )
+
+        print("\n" + "=" * 70)
+        print("DevOps AI Assistant ready!")
+        print("=" * 70)
+        return self
+
+    def ask(self, question: str) -> dict:
+        if not self.conversation_chain:
+            raise ValueError("Assistant not initialized. Call setup() first.")
+
+        result = self.conversation_chain.invoke({"question": question})
+
+        response = {
+            "answer": result.get('answer', ''),
+            "sources": []
+        }
+
+        if result.get('source_documents'):
+            unique_sources = {}
+            for doc in result['source_documents']:
+                source = doc.metadata.get('source')
+                if source not in unique_sources:
+                    path_info = "/".join([
+                        doc.metadata.get('path_level_0', ''),
+                        doc.metadata.get('path_level_1', ''),
+                        doc.metadata.get('path_level_2', '')
+                    ]).strip('/')
+
+                    unique_sources[source] = {
+                        "content": doc.page_content[:300],
+                        "source": source,
+                        "file_type": doc.metadata.get('file_type', 'Unknown'),
+                        "path_info": path_info,
+                        "kind": doc.metadata.get('kind'),
+                        "language": doc.metadata.get('language')
+                    }
+
+            response["sources"] = list(unique_sources.values())
+
+        return response
+
+    def get_status(self) -> dict:
+        if not self.vectorstore:
+            return {"status": "not_initialized"}
+
+        doc_count = self.vectorstore._collection.count()
+
+        status = {
+            "status": "ready",
+            "documents_loaded": len(self.knowledge_base.documents),
+            "chunks_created": len(self.knowledge_base.chunks),
+            "vectors_in_store": doc_count,
+            "knowledge_base_path": str(self.knowledge_base.knowledge_base_path)
+        }
+
+        if self.knowledge_base.structure:
+            status["structure"] = {
+                "total_files": self.knowledge_base.structure['total_files'],
+                "file_types": len(self.knowledge_base.structure['by_file_type']),
+                "patterns": self.knowledge_base.structure['patterns']
+            }
+
+        if self.knowledge_base.indices:
+            status["indices"] = {
+                "path_levels": len(self.knowledge_base.indices['by_path_level_0']),
+                "kinds": len(self.knowledge_base.indices['by_kind']),
+                "languages": len(self.knowledge_base.indices['by_language'])
+            }
+
+        return status
+
+
+def create_assistant(knowledge_base_path: str) -> DevOpsAIAssistant:
+    assistant = DevOpsAIAssistant(knowledge_base_path)
+    assistant.setup()
+    return assistant
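
A minimal usage sketch (illustrative only, not part of the diff above), assuming OPENAI_API_KEY is exported, the dependencies imported by the new modules are installed, and a GitOps checkout exists at the placeholder path:

    # Hypothetical example of driving the assistant from a plain script
    # instead of the Gradio UI; the repository path below is a placeholder.
    from devops_ai_assistance import create_assistant

    assistant = create_assistant("/path/to/infra-gitops/")  # loads, chunks, and indexes the repo
    result = assistant.ask("How many ArgoCD applications are defined?")

    print(result["answer"])
    for source in result["sources"]:
        print(f"- {source['source']} ({source['file_type']})")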