Refactor DevOps AI Assistant: Enhance knowledge base parsing and indexing, improve error handling, and update user interface elements

Mohamed Salah
2025-10-30 14:51:02 +03:00
parent aa3ddf2538
commit b9e8dc1870
2 changed files with 470 additions and 75 deletions

View File

@@ -1,6 +1,6 @@
import os
import gradio as gr
from devops_ai_assistance import create_assistant, DevOpsAIAssistant
from devops_ai_assistance import create_assistant
assistant = None
@@ -8,7 +8,6 @@ status_info = None
def initialize_assistant(kb_path: str):
"""Initialize the assistant with knowledge base"""
global assistant, status_info
try:
@@ -16,42 +15,41 @@ def initialize_assistant(kb_path: str):
if not kb_path:
return "Error: Please provide a valid knowledge base path"
print(f"\n🚀 Initializing with knowledge base: {kb_path}")
print(f"\nInitializing with knowledge base: {kb_path}")
assistant = create_assistant(kb_path)
status_info = assistant.get_status()
status_message = f"""
**DevOps AI Assistant Initialized Successfully!**
**DevOps AI Assistant Initialized Successfully**
📊 **Knowledge Base Statistics:**
**Knowledge Base Statistics:**
- Documents Loaded: {status_info['documents_loaded']}
- Chunks Created: {status_info['chunks_created']}
- Vectors in Store: {status_info['vectors_in_store']}
- Knowledge Base Path: {status_info['knowledge_base_path']}
🎯 **Ready to Answer Questions About:**
**Ready to Answer Questions About:**
- Kubernetes infrastructure configuration
- ArgoCD deployment manifests
- Helm charts and values
- Infrastructure as Code (IaC)
- DevOps best practices in your environment
- Infrastructure as Code
- DevOps best practices
Start by asking questions about your k8s cluster infrastructure!
Start by asking questions about your infrastructure!
"""
return status_message
except Exception as e:
error_msg = f"Error initializing assistant: {str(e)}"
print(f" {error_msg}")
return f" {error_msg}"
print(f"Error: {error_msg}")
return f"Error: {error_msg}"
def chat_with_assistant(message: str, history):
"""Chat function for the assistant"""
global assistant
if not assistant:
bot_response = "Assistant not initialized. Please provide a knowledge base path first."
bot_response = "Assistant not initialized. Please provide a knowledge base path first."
history.append((message, bot_response))
return history, ""
@@ -66,11 +64,11 @@ def chat_with_assistant(message: str, history):
sources_text = ""
if result.get('sources'):
sources_text = "\n\n📚 **Sources:**\n"
sources_text = "\n\n**Sources:**\n"
for i, source in enumerate(result['sources'], 1):
source_file = source.get('source', 'Unknown')
file_type = source.get('file_type', 'Unknown')
sources_text += f"\n{i}. **{source_file}** ({file_type})"
sources_text += f"\n{i}. {source_file} ({file_type})"
bot_response = answer + sources_text if sources_text else answer
@@ -82,16 +80,13 @@ def chat_with_assistant(message: str, history):
def create_interface():
"""Create the Gradio interface"""
global assistant
with gr.Blocks(title="DevOps AI Assistant") as interface:
gr.Markdown("# 🤖 DevOps AI Assistant")
gr.Markdown("Intelligent Q&A system for your Kubernetes infrastructure powered by RAG and LLM")
gr.Markdown("# DevOps AI Assistant")
gr.Markdown("Intelligent Q&A system for your infrastructure powered by RAG and LLM")
gr.Markdown("## 🔧 Configuration")
gr.Markdown("Enter the path to your GitOps repository (knowledge base) to initialize the assistant")
gr.Markdown("## Configuration")
gr.Markdown("Enter the path to your GitOps repository to initialize the assistant")
with gr.Row():
kb_path_input = gr.Textbox(
@@ -100,39 +95,38 @@ def create_interface():
lines=1,
value="/workspace/aau/repositories/infra-gitops/"
)
init_button = gr.Button("🚀 Initialize Assistant")
init_button = gr.Button("Initialize Assistant")
status_output = gr.Markdown(value="Waiting for initialization...")
status_output = gr.Markdown(value="Waiting for initialization...")
gr.Markdown("## 💬 Chat Interface")
gr.Markdown("## Chat Interface")
chatbot = gr.Chatbot(
label="Conversation",
height=500,
show_copy_button=True,
avatar_images=("👤", "🤖"),
bubble_full_width=False
)
with gr.Row():
msg_input = gr.Textbox(
label="Your Question",
placeholder="Ask about your k8s infrastructure, ArgoCD, Helm charts, etc...",
placeholder="Ask about your infrastructure, ArgoCD, Helm charts, etc...",
lines=2,
scale=5
)
send_button = gr.Button("Send 💬", scale=1)
send_button = gr.Button("Send", scale=1)
with gr.Row():
clear_button = gr.Button("🗑️ Clear Chat", scale=2)
clear_button = gr.Button("Clear Chat", scale=2)
with gr.Accordion("📋 Example Questions", open=False):
with gr.Accordion("Example Questions", open=False):
gr.Markdown("""
**Infrastructure & Deployment:**
- How is the Kubernetes cluster configured?
- What ArgoCD applications are deployed?
- How many ArgoCD applications?
- What is the repository structure?
- How many YAML files are there?
- Show me the Helm chart values for nginx
- What storage solutions are available?
**Monitoring & Observability:**
- How is Prometheus configured?
@@ -174,9 +168,8 @@ def create_interface():
def main():
"""Main entry point"""
print("\n" + "=" * 60)
print("🚀 DevOps AI Assistant - RAG System")
print("DevOps AI Assistant - RAG System")
print("=" * 60)
print("Starting Gradio server...")
print("\nAccess the application at: http://127.0.0.1:7860")

View File

@@ -1,12 +1,12 @@
import os
import re
from pathlib import Path
from typing import List, Optional
from typing import List, Optional, Dict, Any
import json
import tempfile
import shutil
from langchain_core.documents import Document
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
@@ -24,6 +24,146 @@ class DevOpsKnowledgeBase:
self.documents = []
self.chunks = []
self.temp_db_dir = None
self.indices = {}
self.structure = {}
def _parse_structured_content(self, content: str, file_path: Path) -> dict:
metadata = {}
try:
if file_path.suffix.lower() in ['.yaml', '.yml']:
import yaml
data = yaml.safe_load(content)
if isinstance(data, dict):
metadata['kind'] = data.get('kind')
metadata['api_version'] = data.get('apiVersion')
if 'metadata' in data and isinstance(data['metadata'], dict):
for key, value in data['metadata'].items():
if isinstance(value, (str, int, float, bool)):
metadata[f'meta_{key}'] = value
elif isinstance(value, dict):
for k, v in value.items():
if isinstance(v, (str, int, float, bool)):
metadata[f'meta_{key}_{k}'] = v
if 'spec' in data and isinstance(data['spec'], dict):
if 'project' in data['spec']:
metadata['project'] = data['spec']['project']
if 'destination' in data['spec'] and isinstance(data['spec']['destination'], dict):
if 'namespace' in data['spec']['destination']:
metadata['namespace'] = data['spec']['destination']['namespace']
elif file_path.suffix.lower() == '.json':
data = json.loads(content)
if isinstance(data, dict):
for key, value in data.items():
if isinstance(value, (str, int, float, bool)):
metadata[f'json_{key}'] = value
elif file_path.suffix.lower() in ['.tf', '.hcl']:
metadata['is_terraform'] = True
resources = re.findall(r'resource\s+"([^"]+)"\s+"([^"]+)"', content)
if resources:
metadata['terraform_resources'] = [r[0] for r in resources]
metadata['resource_count'] = len(resources)
modules = re.findall(r'module\s+"([^"]+)"', content)
if modules:
metadata['terraform_modules'] = modules
metadata['module_count'] = len(modules)
elif file_path.suffix.lower() == '.py':
metadata['is_code'] = True
metadata['language'] = 'python'
imports = re.findall(r'^(?:from|import)\s+(\S+)', content, re.MULTILINE)
classes = re.findall(r'^class\s+(\w+)', content, re.MULTILINE)
functions = re.findall(r'^def\s+(\w+)', content, re.MULTILINE)
if imports:
metadata['imports'] = imports[:10]
if classes:
metadata['classes'] = classes
metadata['class_count'] = len(classes)
if functions:
metadata['functions'] = functions[:20]
metadata['function_count'] = len(functions)
elif file_path.suffix.lower() in ['.js', '.ts']:
metadata['is_code'] = True
metadata['language'] = 'javascript' if file_path.suffix == '.js' else 'typescript'
imports = re.findall(r'import\s+.*\s+from\s+[\'"]([^\'"]+)[\'"]', content)
functions = re.findall(r'(?:function|const|let|var)\s+(\w+)\s*=?\s*(?:async\s*)?\(', content)
classes = re.findall(r'class\s+(\w+)', content)
if imports:
metadata['imports'] = imports[:10]
if classes:
metadata['classes'] = classes
metadata['class_count'] = len(classes)
if functions:
metadata['function_count'] = len(functions)
elif file_path.suffix.lower() in ['.go']:
metadata['is_code'] = True
metadata['language'] = 'go'
packages = re.findall(r'package\s+(\w+)', content)
if packages:
metadata['package'] = packages[0]
imports = re.findall(r'import\s+[\'"]([^\'"]+)[\'"]', content)
if imports:
metadata['imports'] = imports[:10]
except Exception:
pass  # best-effort parsing: a malformed file simply contributes no structured metadata
return metadata
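For illustration, a standalone sketch of the same YAML branch on a made-up ArgoCD Application manifest (the manifest contents and names are hypothetical, not from the repository):

    import yaml

    manifest = """
    apiVersion: argoproj.io/v1alpha1
    kind: Application
    metadata:
      name: nginx
    spec:
      project: default
      destination:
        namespace: web
    """
    data = yaml.safe_load(manifest)
    extracted = {
        'kind': data.get('kind'),                                # 'Application'
        'api_version': data.get('apiVersion'),                   # 'argoproj.io/v1alpha1'
        'meta_name': data['metadata']['name'],                   # 'nginx'
        'project': data['spec']['project'],                      # 'default'
        'namespace': data['spec']['destination']['namespace'],   # 'web'
    }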
def _extract_content_patterns(self, content: str) -> dict:
metadata = {}
content_lower = content.lower()
urls = re.findall(r'https?://[^\s<>"]+', content)
if urls:
metadata['has_urls'] = True
metadata['url_count'] = len(urls)
domains = []
for url in urls:
domain_match = re.findall(r'https?://([^/]+)', url)
if domain_match:
domains.append(domain_match[0])
if domains:
metadata['domains'] = list(set(domains))[:5]
ips = re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', content)
if ips:
metadata['has_ips'] = True
metadata['ip_count'] = len(set(ips))
versions = re.findall(r'\bv?\d+\.\d+(?:\.\d+)?(?:-[\w.]+)?\b', content)
if versions:
metadata['has_versions'] = True
patterns = {
'has_secrets': any(keyword in content_lower for keyword in ['password', 'secret', 'token', 'api_key', 'apikey']),
'has_monitoring': any(keyword in content_lower for keyword in ['prometheus', 'grafana', 'metrics', 'alert']),
'has_networking': any(keyword in content_lower for keyword in ['ingress', 'service', 'loadbalancer', 'route']),
'has_storage': any(keyword in content_lower for keyword in ['volume', 'pvc', 'storage', 'disk']),
'has_database': any(keyword in content_lower for keyword in ['postgres', 'mysql', 'redis', 'mongodb', 'database']),
'has_deployment': any(keyword in content_lower for keyword in ['deployment', 'statefulset', 'daemonset', 'replica']),
}
metadata.update({k: v for k, v in patterns.items() if v})
quoted_strings = re.findall(r'"([^"]{3,30})"', content)
if quoted_strings:
metadata['quoted_strings'] = list(set(quoted_strings))[:10]
return metadata
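A quick standalone check of the pattern extraction on a made-up snippet (the sample string and expected matches are illustrative only):

    import re

    content = 'url: "https://grafana.example.com/api"  # prometheus alert rule, chart v1.2.3'
    content_lower = content.lower()

    urls = re.findall(r'https?://[^\s<>"]+', content)                        # ['https://grafana.example.com/api']
    versions = re.findall(r'\bv?\d+\.\d+(?:\.\d+)?(?:-[\w.]+)?\b', content)  # ['v1.2.3']
    has_monitoring = any(k in content_lower for k in ['prometheus', 'grafana', 'metrics', 'alert'])  # True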
def load_documents(self) -> List[Document]:
self.documents = []
@@ -31,7 +171,7 @@ class DevOpsKnowledgeBase:
if not self.knowledge_base_path.exists():
raise ValueError(f"Knowledge base path does not exist: {self.knowledge_base_path}")
supported_extensions = {'.yaml', '.yml', '.md', '.txt', '.json'}
supported_extensions = {'.yaml', '.yml', '.md', '.txt', '.json', '.tf', '.hcl', '.py', '.js', '.ts', '.go', '.sh', '.rst'}
print(f"Loading documents from {self.knowledge_base_path}...")
@@ -43,14 +183,30 @@ class DevOpsKnowledgeBase:
if content and len(content) > 50:
relative_path = file_path.relative_to(self.knowledge_base_path)
doc = Document(
page_content=content,
metadata={
"source": str(relative_path),
"file_type": file_path.suffix.lower(),
"path": str(file_path)
}
)
parts = relative_path.parts
metadata = {
"source": str(relative_path),
"file_type": file_path.suffix.lower(),
"path": str(file_path),
"filename": file_path.stem,
"full_filename": file_path.name,
"char_count": len(content),
"word_count": len(content.split()),
"line_count": len(content.splitlines()),
"depth": len(parts) - 1,
"parent_dir": parts[-2] if len(parts) > 1 else "root",
"path_level_0": parts[0] if len(parts) > 0 else None,
"path_level_1": parts[1] if len(parts) > 1 else None,
"path_level_2": parts[2] if len(parts) > 2 else None,
"path_level_3": parts[3] if len(parts) > 3 else None,
"full_path_parts": list(parts),
}
metadata.update(self._parse_structured_content(content, file_path))
metadata.update(self._extract_content_patterns(content))
doc = Document(page_content=content, metadata=metadata)
self.documents.append(doc)
except Exception as e:
@@ -59,35 +215,235 @@ class DevOpsKnowledgeBase:
print(f"Loaded {len(self.documents)} documents")
return self.documents
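The path-derived fields added here are simple tuple slices of the relative path; a small sketch with a hypothetical repository path:

    from pathlib import Path

    relative_path = Path("apps/monitoring/prometheus/values.yaml")  # hypothetical file in the repo
    parts = relative_path.parts                                     # ('apps', 'monitoring', 'prometheus', 'values.yaml')

    path_metadata = {
        "depth": len(parts) - 1,                                # 3
        "parent_dir": parts[-2] if len(parts) > 1 else "root",  # 'prometheus'
        "path_level_0": parts[0] if len(parts) > 0 else None,   # 'apps'
        "path_level_1": parts[1] if len(parts) > 1 else None,   # 'monitoring'
    }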
def chunk_documents(self, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[Document]:
if not self.documents:
raise ValueError("No documents loaded. Call load_documents() first.")
def discover_structure(self) -> dict:
print("\nAuto-discovering repository structure...")
print(f"Splitting {len(self.documents)} documents into chunks...")
structure = {
'total_files': len(self.documents),
'by_file_type': {},
'by_depth': {},
'by_parent_dir': {},
'hierarchy': {},
'patterns': {}
}
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", " ", ""]
)
for doc in self.documents:
file_type = doc.metadata.get('file_type', 'unknown')
structure['by_file_type'][file_type] = structure['by_file_type'].get(file_type, 0) + 1
self.chunks = text_splitter.split_documents(self.documents)
print(f"Created {len(self.chunks)} chunks")
return self.chunks
depth = doc.metadata.get('depth', 0)
structure['by_depth'][depth] = structure['by_depth'].get(depth, 0) + 1
parent = doc.metadata.get('parent_dir', 'unknown')
structure['by_parent_dir'][parent] = structure['by_parent_dir'].get(parent, 0) + 1
path_parts = doc.metadata.get('full_path_parts', [])
current_level = structure['hierarchy']
for part in path_parts[:-1]:
if part not in current_level:
current_level[part] = {'_count': 0, '_children': {}}
current_level[part]['_count'] += 1
current_level = current_level[part]['_children']
structure['patterns'] = self._detect_patterns()
print(f"\nDiscovered Structure:")
print(f" Total files: {structure['total_files']}")
print(f"\n By file type:")
for ftype, count in sorted(structure['by_file_type'].items(), key=lambda x: x[1], reverse=True):
print(f" {ftype}: {count}")
print(f"\n By depth:")
for depth, count in sorted(structure['by_depth'].items()):
print(f" Level {depth}: {count} files")
print(f"\n Top-level directories:")
for dir_name, data in structure['hierarchy'].items():
print(f" {dir_name}/: {data['_count']} files")
if structure['patterns']:
print(f"\n Detected patterns:")
for pattern, count in structure['patterns'].items():
print(f" {pattern}: {count} files")
self.structure = structure
return structure
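A minimal sketch of the hierarchy-building loop on a few hypothetical paths (condensed with setdefault, same effect as the if-not-in version above):

    hierarchy = {}
    for parts in [("apps", "monitoring", "values.yaml"),
                  ("apps", "ingress", "ingress.yaml"),
                  ("docs", "README.md")]:
        level = hierarchy
        for part in parts[:-1]:  # directories only; the file name itself is dropped
            node = level.setdefault(part, {"_count": 0, "_children": {}})
            node["_count"] += 1
            level = node["_children"]

    # hierarchy["apps"]["_count"] == 2; its children are "monitoring" and "ingress"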
def _detect_patterns(self) -> dict:
patterns = {
'kubernetes_manifests': 0,
'terraform_files': 0,
'python_code': 0,
'javascript_code': 0,
'documentation': 0,
'configuration': 0,
}
for doc in self.documents:
if doc.metadata.get('kind') or doc.metadata.get('api_version'):
patterns['kubernetes_manifests'] += 1
if doc.metadata.get('is_terraform'):
patterns['terraform_files'] += 1
if doc.metadata.get('language') == 'python':
patterns['python_code'] += 1
if doc.metadata.get('language') in ['javascript', 'typescript']:
patterns['javascript_code'] += 1
if doc.metadata.get('file_type') in ['.md', '.rst', '.txt']:
patterns['documentation'] += 1
if doc.metadata.get('file_type') in ['.yaml', '.yml', '.json', '.toml']:
patterns['configuration'] += 1
return {k: v for k, v in patterns.items() if v > 0}
def create_dynamic_indices(self) -> dict:
print("\nCreating dynamic indices...")
indices = {
'by_path_level_0': {},
'by_path_level_1': {},
'by_path_level_2': {},
'by_path_level_3': {},
'by_file_type': {},
'by_kind': {},
'by_language': {},
'by_parent_dir': {},
'by_project': {},
'by_namespace': {},
'statistics': {
'total_documents': len(self.documents),
'total_chars': sum(d.metadata.get('char_count', 0) for d in self.documents),
'total_lines': sum(d.metadata.get('line_count', 0) for d in self.documents),
}
}
for doc in self.documents:
source = doc.metadata.get('source')
for level in range(4):
level_key = f'path_level_{level}'
index_key = f'by_{level_key}'
if level_value := doc.metadata.get(level_key):
if level_value not in indices[index_key]:
indices[index_key][level_value] = []
indices[index_key][level_value].append(source)
if file_type := doc.metadata.get('file_type'):
if file_type not in indices['by_file_type']:
indices['by_file_type'][file_type] = []
indices['by_file_type'][file_type].append(source)
if kind := doc.metadata.get('kind'):
if kind not in indices['by_kind']:
indices['by_kind'][kind] = []
indices['by_kind'][kind].append(source)
if language := doc.metadata.get('language'):
if language not in indices['by_language']:
indices['by_language'][language] = []
indices['by_language'][language].append(source)
if parent := doc.metadata.get('parent_dir'):
if parent not in indices['by_parent_dir']:
indices['by_parent_dir'][parent] = []
indices['by_parent_dir'][parent].append(source)
if project := doc.metadata.get('project'):
if project not in indices['by_project']:
indices['by_project'][project] = []
indices['by_project'][project].append(source)
if namespace := doc.metadata.get('namespace'):
if namespace not in indices['by_namespace']:
indices['by_namespace'][namespace] = []
indices['by_namespace'][namespace].append(source)
self.indices = indices
print(f"\nIndices Created:")
print(f" Total documents indexed: {indices['statistics']['total_documents']}")
print(f" Top-level paths: {len(indices['by_path_level_0'])}")
print(f" File types: {len(indices['by_file_type'])}")
if indices['by_kind']:
print(f" Kubernetes kinds: {len(indices['by_kind'])}")
if indices['by_language']:
print(f" Programming languages: {len(indices['by_language'])}")
return indices
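The indices are plain dicts of source lists, so downstream lookups stay trivial; a hypothetical example of how they can be queried:

    indices = {  # illustrative output of create_dynamic_indices()
        'by_kind': {'Application': ['apps/nginx/app.yaml', 'apps/grafana/app.yaml']},
        'by_path_level_0': {'apps': ['apps/nginx/app.yaml', 'apps/grafana/app.yaml'],
                            'docs': ['docs/README.md']},
    }

    argo_apps = indices['by_kind'].get('Application', [])        # every parsed ArgoCD Application manifest
    apps_dir_files = indices['by_path_level_0'].get('apps', [])  # everything under the top-level apps/ directory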
def chunk_documents_adaptive(self, documents: List[Document]) -> List[Document]:
print("\nAdaptive chunking based on file characteristics...")
all_chunks = []
strategies = {
'small_structured': [],
'large_structured': [],
'code_files': [],
'documentation': [],
'default': []
}
for doc in documents:
char_count = doc.metadata.get('char_count', 0)
file_type = doc.metadata.get('file_type', '')
if file_type in ['.yaml', '.yml', '.json', '.toml']:
if char_count < 2000:
strategies['small_structured'].append(doc)
else:
strategies['large_structured'].append(doc)
elif file_type in ['.py', '.js', '.go', '.java', '.ts', '.rs', '.sh']:
strategies['code_files'].append(doc)
elif file_type in ['.md', '.rst', '.txt']:
strategies['documentation'].append(doc)
else:
strategies['default'].append(doc)
chunk_configs = {
'small_structured': {'chunk_size': 2000, 'chunk_overlap': 100},
'large_structured': {'chunk_size': 1500, 'chunk_overlap': 200},
'code_files': {'chunk_size': 1200, 'chunk_overlap': 150},
'documentation': {'chunk_size': 1000, 'chunk_overlap': 200},
'default': {'chunk_size': 1000, 'chunk_overlap': 200}
}
for strategy_name, docs in strategies.items():
if not docs:
continue
config = chunk_configs[strategy_name]
splitter = RecursiveCharacterTextSplitter(
chunk_size=config['chunk_size'],
chunk_overlap=config['chunk_overlap'],
separators=["\n\n", "\n", " ", ""]
)
chunks = splitter.split_documents(docs)
for i, chunk in enumerate(chunks):
chunk.metadata['chunk_strategy'] = strategy_name
chunk.metadata['chunk_id'] = f"{strategy_name}_{i:04d}"
all_chunks.extend(chunks)
print(f" {strategy_name}: {len(docs)} docs → {len(chunks)} chunks")
self.chunks = all_chunks
print(f" Total: {len(all_chunks)} chunks created")
return all_chunks
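The routing between strategies depends only on file type and size; a compact sketch of that decision (same thresholds as above, the function name is ours):

    def pick_strategy(file_type: str, char_count: int) -> str:
        # Structured config splits by size; code and docs get dedicated chunk settings.
        if file_type in ('.yaml', '.yml', '.json', '.toml'):
            return 'small_structured' if char_count < 2000 else 'large_structured'
        if file_type in ('.py', '.js', '.go', '.java', '.ts', '.rs', '.sh'):
            return 'code_files'
        if file_type in ('.md', '.rst', '.txt'):
            return 'documentation'
        return 'default'

    assert pick_strategy('.yaml', 800) == 'small_structured'  # small manifest kept nearly whole (chunk_size 2000)
    assert pick_strategy('.py', 5000) == 'code_files'         # code split at 1200 chars with 150 overlap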
def initialize_embedding_model(self):
print(f"Initializing embedding model: {self.embedding_model_name}...")
print(f"\nInitializing embedding model: {self.embedding_model_name}...")
self.embedding_model = HuggingFaceEmbeddings(model_name=self.embedding_model_name)
print("Embedding model initialized")
def create_vectorstore(self) -> Chroma:
if not self.chunks:
raise ValueError("No chunks available. Call chunk_documents() first.")
raise ValueError("No chunks available. Call chunk_documents_adaptive() first.")
if not self.embedding_model:
raise ValueError("Embedding model not initialized. Call initialize_embedding_model() first.")
print("Creating vector store...")
print("\nCreating vector store...")
if self.temp_db_dir:
try:
@@ -95,7 +451,16 @@ class DevOpsKnowledgeBase:
except Exception:
pass  # ignore failures when removing the previous temporary vector store directory
self.temp_db_dir = tempfile.mkdtemp(prefix="devops_kb_")
self.temp_db_dir = tempfile.mkdtemp(prefix="devops_kb_v2_")
for chunk in self.chunks:
cleaned_metadata = {}
for key, value in chunk.metadata.items():
if value is not None and not isinstance(value, (list, dict)):
cleaned_metadata[key] = value
elif isinstance(value, list) and value:
cleaned_metadata[key] = str(value)
chunk.metadata = cleaned_metadata
self.vectorstore = Chroma.from_documents(
documents=self.chunks,
@@ -108,15 +473,20 @@ class DevOpsKnowledgeBase:
return self.vectorstore
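The cleanup before Chroma.from_documents matters because Chroma stores only scalar metadata values (str, int, float, bool); a quick illustration with hypothetical chunk metadata:

    raw = {
        'source': 'apps/nginx/app.yaml',
        'chunk_strategy': 'small_structured',
        'terraform_resources': None,   # None values are dropped
        'imports': ['os', 're'],       # non-empty lists are stringified
        'full_path_parts': [],         # empty lists are dropped
    }
    cleaned = {}
    for key, value in raw.items():
        if value is not None and not isinstance(value, (list, dict)):
            cleaned[key] = value
        elif isinstance(value, list) and value:
            cleaned[key] = str(value)
    # cleaned == {'source': 'apps/nginx/app.yaml', 'chunk_strategy': 'small_structured', 'imports': "['os', 're']"}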
def initialize(self):
print("Initializing DevOps Knowledge Base...")
print("=" * 60)
print("=" * 70)
print("Initializing DevOps Knowledge Base")
print("=" * 70)
self.load_documents()
self.chunk_documents()
self.discover_structure()
self.create_dynamic_indices()
self.chunk_documents_adaptive(self.documents)
self.initialize_embedding_model()
self.create_vectorstore()
print("\nKnowledge base initialized successfully!")
print("\n" + "=" * 70)
print("Knowledge base initialized successfully!")
print("=" * 70)
return self.vectorstore
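An end-to-end usage sketch of the new pipeline, assuming the constructor takes the knowledge base path (the path below is a placeholder):

    kb = DevOpsKnowledgeBase("/path/to/infra-gitops")  # hypothetical clone of the GitOps repo
    vectorstore = kb.initialize()  # load -> discover structure -> index -> adaptive chunking -> embed -> Chroma
    hits = vectorstore.similarity_search("How is Prometheus configured?", k=3)
    for doc in hits:
        print(doc.metadata.get("source"), doc.metadata.get("chunk_strategy"))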
@@ -129,7 +499,7 @@ class DevOpsAIAssistant:
self.llm = None
def setup(self):
print("Setting up DevOps AI Assistant...")
print("\nSetting up DevOps AI Assistant...")
self.vectorstore = self.knowledge_base.initialize()
@@ -137,7 +507,7 @@ class DevOpsAIAssistant:
if not api_key:
raise ValueError("OPENAI_API_KEY environment variable not set")
print("Initializing OpenAI LLM...")
print("\nInitializing OpenAI LLM...")
self.llm = ChatOpenAI(
model_name="gpt-4o-mini",
temperature=0.3,
@@ -152,7 +522,7 @@ class DevOpsAIAssistant:
)
print("Creating conversation chain...")
retriever = self.vectorstore.as_retriever(search_kwargs={"k": 5})
retriever = self.vectorstore.as_retriever(search_kwargs={"k": 10})
self.conversation_chain = ConversationalRetrievalChain.from_llm(
llm=self.llm,
@@ -162,7 +532,9 @@ class DevOpsAIAssistant:
verbose=False
)
print("\n" + "=" * 70)
print("DevOps AI Assistant ready!")
print("=" * 70)
return self
def ask(self, question: str) -> dict:
@@ -177,12 +549,26 @@ class DevOpsAIAssistant:
}
if result.get('source_documents'):
unique_sources = {}
for doc in result['source_documents']:
response["sources"].append({
"content": doc.page_content[:300],
"source": doc.metadata.get('source', 'Unknown'),
"file_type": doc.metadata.get('file_type', 'Unknown')
})
source = doc.metadata.get('source')
if source not in unique_sources:
path_info = "/".join([
doc.metadata.get('path_level_0', ''),
doc.metadata.get('path_level_1', ''),
doc.metadata.get('path_level_2', '')
]).strip('/')
unique_sources[source] = {
"content": doc.page_content[:300],
"source": source,
"file_type": doc.metadata.get('file_type', 'Unknown'),
"path_info": path_info,
"kind": doc.metadata.get('kind'),
"language": doc.metadata.get('language')
}
response["sources"] = list(unique_sources.values())
return response
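With deduplication, each source file appears once regardless of how many of its chunks were retrieved; an illustrative response shape (all values are placeholders):

    response = {
        "answer": "<LLM answer text>",
        "sources": [
            {"content": "<first 300 chars of the chunk>",
             "source": "monitoring/prometheus/values.yaml",
             "file_type": ".yaml",
             "path_info": "monitoring/prometheus",
             "kind": None,
             "language": None},
        ],
    }
    for i, src in enumerate(response["sources"], 1):
        print(f"{i}. {src['source']} ({src['file_type']})")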
@@ -192,7 +578,7 @@ class DevOpsAIAssistant:
doc_count = self.vectorstore._collection.count()
return {
status = {
"status": "ready",
"documents_loaded": len(self.knowledge_base.documents),
"chunks_created": len(self.knowledge_base.chunks),
@@ -200,6 +586,22 @@ class DevOpsAIAssistant:
"knowledge_base_path": str(self.knowledge_base.knowledge_base_path)
}
if self.knowledge_base.structure:
status["structure"] = {
"total_files": self.knowledge_base.structure['total_files'],
"file_types": len(self.knowledge_base.structure['by_file_type']),
"patterns": self.knowledge_base.structure['patterns']
}
if self.knowledge_base.indices:
status["indices"] = {
"path_levels": len(self.knowledge_base.indices['by_path_level_0']),
"kinds": len(self.knowledge_base.indices['by_kind']),
"languages": len(self.knowledge_base.indices['by_language'])
}
return status
def create_assistant(knowledge_base_path: str) -> DevOpsAIAssistant:
assistant = DevOpsAIAssistant(knowledge_base_path)