Add GitOps Guardian application with multi-agent architecture for security and compliance reviews

This commit is contained in:
Mohamed Salah
2025-10-31 07:25:00 +02:00
parent 875cbda5e0
commit 88b70bc635
6 changed files with 1155 additions and 0 deletions

View File

@@ -0,0 +1,18 @@
# GitOps Guardian Configuration
# Required: GitHub Personal Access Token
# Create at: https://github.com/settings/tokens
# Required permissions: repo (read)
GITHUB_TOKEN=ghp_your_token_here
# Required: OpenAI API Key
# Get from: https://platform.openai.com/api-keys
# If already exported in ~/.bashrc, you can skip this
# OPENAI_API_KEY=sk-your_key_here
# Required: Repositories to monitor (comma-separated)
# Format: owner/repo,owner/repo2
GITOPS_REPOS=myorg/infra-gitops,myorg/helm-charts
# Optional: Scanning interval in seconds (default: 300)
# SCAN_INTERVAL=300

View File

@@ -0,0 +1,87 @@
# GitOps Guardian
AI-powered multi-agent system for automated security and compliance review of GitOps pull requests.
## What It Does
GitOps Guardian automatically reviews pull requests in GitOps repositories to identify security risks and compliance violations before changes reach production. It uses a multi-agent architecture that combines AI-powered security analysis with rule-based validation of Kubernetes best practices.
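For a quick sense of the pipeline, here is a minimal sketch that drives the agents directly from Python (the repository name is a placeholder; `app.py` wires the same classes together inside `GitOpsGuardian`):

```python
import os
from agents import GitOpsScannerAgent, SecurityAgent, ComplianceAgent, RiskEnsembleAgent

# Instantiate the four agents (app.py does this inside GitOpsGuardian.__init__)
scanner = GitOpsScannerAgent(os.environ["GITHUB_TOKEN"])
security = SecurityAgent(os.environ["OPENAI_API_KEY"])
compliance = ComplianceAgent(os.environ["GITHUB_TOKEN"])
ensemble = RiskEnsembleAgent()

for pr in scanner.scan(["myorg/infra-gitops"]):           # fetch open PRs
    assessment = ensemble.assess(security.review(pr),     # LLM-based security analysis
                                 compliance.review(pr))   # rule-based Kubernetes checks
    print(f"#{pr.number} {pr.title}: {assessment.risk_level.value} ({assessment.overall_risk:.2f})")
```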
## Key Features
- **Multi-Agent Architecture**: Four specialized agents (Scanner, Security, Compliance, Ensemble)
- **Security Detection**: Identifies hardcoded secrets, privileged containers, missing security contexts, insecure RBAC
- **Compliance Validation**: Flags `:latest` image tags, missing resource limits, and missing required labels
- **Risk Scoring**: 0-1 risk score with SAFE/REVIEW/RISKY classification
- **Interactive Dashboard**: Real-time Gradio web interface
- **Cost Tracking**: Monitor OpenAI API costs (~$0.0004 per PR)
- **Export & Comments**: Download results as JSON/CSV, post reviews to GitHub PRs
- **Historical Trends**: Track risk scores over time
## Requirements
- Python 3.10+
- GitHub Personal Access Token (repo read permissions)
- OpenAI API Key
- GitOps repositories with open pull requests
## Quick Start
1. Install dependencies:
```bash
pip install -r requirements.txt
```
2. Configure environment:
```bash
cp .env.example .env
# Edit .env with your tokens and repository list
```
3. Run the application:
```bash
python app.py
```
4. Open browser at `http://localhost:7860`
## Configuration
Create a `.env` file with:
```bash
GITHUB_TOKEN=ghp_your_token
OPENAI_API_KEY=sk_your_key
GITOPS_REPOS=owner/repo1,owner/repo2
```
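At startup the app loads these values with python-dotenv; a condensed view of what `GitOpsGuardian.__init__` in `app.py` does:

```python
import os
from dotenv import load_dotenv

load_dotenv()  # read .env from the current directory

github_token = os.getenv("GITHUB_TOKEN")
openai_api_key = os.getenv("OPENAI_API_KEY")  # may also be inherited from your shell environment
repos = [r.strip() for r in os.getenv("GITOPS_REPOS", "").split(",") if r.strip()]
if not (github_token and openai_api_key and repos):
    raise ValueError("GITHUB_TOKEN, OPENAI_API_KEY and GITOPS_REPOS must all be set")
```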
## Project Structure
```
gitops-guardian/
├── models.py # Pydantic data models
├── agents.py # Multi-agent system (4 agents)
├── app.py # Gradio UI & orchestration
├── requirements.txt # Dependencies
└── .env.example # Configuration template
```
## How It Works
1. **Scanner Agent** fetches open PRs from the configured GitHub repositories
2. **Security Agent** analyzes each diff with OpenAI GPT-4o-mini to find security issues
3. **Compliance Agent** validates Kubernetes manifests against best-practice rules
4. **Ensemble Agent** combines the two scores (60% security, 40% compliance) into an overall risk, as sketched below
5. Results are displayed in the dashboard with recommendations
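The scoring step is small enough to restate inline; this mirrors the weights and thresholds hard-coded in `RiskEnsembleAgent` (`agents.py`):

```python
SECURITY_WEIGHT, COMPLIANCE_WEIGHT = 0.6, 0.4

def classify(security_risk: float, compliance_risk: float) -> tuple[float, str]:
    risk = security_risk * SECURITY_WEIGHT + compliance_risk * COMPLIANCE_WEIGHT
    if risk < 0.3:
        return risk, "SAFE"    # safe to merge
    if risk < 0.7:
        return risk, "REVIEW"  # address issues before merging
    return risk, "RISKY"       # do not merge until resolved

risk, level = classify(0.8, 0.2)
print(f"{risk:.2f} {level}")   # 0.56 REVIEW
```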
## Week 8 Concepts Applied
- Multi-agent system with base Agent class and color-coded logging
- Ensemble pattern with weighted scoring
- External API integration (GitHub + OpenAI)
- Pydantic models for type safety
- Gradio web interface
- Persistent memory (JSON-based)
- Real-world DevOps use case
## Author
Salah - Week 8: Multi-Agent Systems - LLM Engineering Bootcamp 2025

View File

@@ -0,0 +1,407 @@
import os
import re
import json
import yaml
from typing import List, Dict
from openai import OpenAI
from github import Github
from models import (
PullRequest, SecurityScore, ComplianceScore, RiskAssessment,
SecurityIssue, ComplianceViolation, RiskLevel
)
class Agent:
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
BG_BLACK = '\033[40m'
RESET = '\033[0m'
name = "Agent"
color = WHITE
def log(self, message):
# Print a color-coded console log line prefixed with the agent name
print(f"{self.color}[{self.name}] {message}{self.RESET}")
class GitOpsScannerAgent(Agent):
name = "GitOps Scanner"
color = Agent.CYAN
def __init__(self, github_token):
self.github = Github(github_token)
def scan(self, repos, memory=None):
# Avoid a mutable default argument; memory holds URLs of PRs that were already reviewed
memory = memory or []
all_prs = []
for repo_name in repos:
try:
repo = self.github.get_repo(repo_name)
pulls = repo.get_pulls(state='open', sort='created', direction='desc')
for pr in pulls:
pr_url = pr.html_url
if pr_url in memory:
continue
files = pr.get_files()
diff_content = ""
files_changed = []
for file in files:
files_changed.append(file.filename)
if file.patch:
diff_content += f"\n\n--- {file.filename}\n{file.patch}"
pull_request = PullRequest(
repo=repo_name,
number=pr.number,
title=pr.title,
author=pr.user.login,
url=pr_url,
diff=diff_content,
files_changed=files_changed,
created_at=pr.created_at,
labels=[label.name for label in pr.labels]
)
all_prs.append(pull_request)
except Exception as e:
self.log(f"Failed to scan {repo_name}: {e}")
return all_prs
class SecurityAgent(Agent):
name = "Security Agent"
color = Agent.RED
def __init__(self, openai_api_key):
self.client = OpenAI(api_key=openai_api_key)
def review(self, pr):
system_prompt = """You are a security expert analyzing GitOps infrastructure changes.
Identify security issues in Kubernetes manifests, Helm charts, and infrastructure code.
Focus on:
1. Hardcoded secrets (AWS keys, passwords, tokens, API keys)
2. Insecure container configurations (privileged mode, hostNetwork)
3. Missing security contexts
4. Overly permissive RBAC
5. Exposed services without proper restrictions
6. Using :latest tags or insecure images
Respond in JSON format:
{
"risk_score": 0.0-1.0,
"issues": [
{
"type": "hardcoded_secret",
"severity": "critical|high|medium|low",
"description": "Found AWS access key",
"line_number": 15,
"file_path": "deployment.yaml",
"recommendation": "Use Kubernetes Secret instead"
}
],
"summary": "Brief summary of findings"
}
"""
user_prompt = f"""Analyze this GitOps pull request for security issues:
Title: {pr.title}
Files changed: {', '.join(pr.files_changed)}
Diff:
{pr.diff[:3000]}
Identify any security concerns."""
try:
response = self.client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
response_format={"type": "json_object"},
temperature=0.3
)
input_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens
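# gpt-4o-mini pricing used below: $0.150 per 1M input tokens, $0.600 per 1M output tokens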
cost = (input_tokens * 0.150 / 1_000_000) + (output_tokens * 0.600 / 1_000_000)
result = json.loads(response.choices[0].message.content)  # parse the JSON response safely instead of eval()
issues = []
for issue_dict in result.get('issues', []):
issue = SecurityIssue(
type=issue_dict.get('type', 'unknown'),
severity=issue_dict.get('severity', 'medium'),
line_number=issue_dict.get('line_number'),
file_path=issue_dict.get('file_path'),
description=issue_dict.get('description', ''),
recommendation=issue_dict.get('recommendation', '')
)
issues.append(issue)
risk_score = float(result.get('risk_score', 0.0))
summary = result.get('summary', 'No issues found')
return SecurityScore(
risk=risk_score,
issues=issues,
summary=summary,
confidence=0.85,
cost=cost
)
except Exception as e:
return SecurityScore(
risk=0.5,
issues=[],
summary=f"Error during analysis: {str(e)}",
confidence=0.3
)
class ComplianceAgent(Agent):
name = "Compliance Agent"
color = Agent.YELLOW
def __init__(self, github_token=None):
self.github_client = Github(github_token) if github_token else None
def review(self, pr):
violations = []
passed_checks = []
yaml_files = self._extract_yaml_files(pr.diff, pr.files_changed)
for file_path, content in yaml_files.items():
try:
docs = list(yaml.safe_load_all(content))
for doc in docs:
if not doc or not isinstance(doc, dict):
continue
image_violations = self._check_image_tags(doc, file_path)
violations.extend(image_violations)
if not image_violations:
passed_checks.append(f"Image tags OK in {file_path}")
limit_violations = self._check_resource_limits(doc, file_path)
violations.extend(limit_violations)
if not limit_violations:
passed_checks.append(f"Resource limits OK in {file_path}")
label_violations = self._check_labels(doc, file_path)
violations.extend(label_violations)
if not label_violations:
passed_checks.append(f"Labels OK in {file_path}")
security_violations = self._check_security_context(doc, file_path)
violations.extend(security_violations)
if not security_violations:
passed_checks.append(f"Security context OK in {file_path}")
except Exception as e:
self.log(f"Skipping unparseable YAML in {file_path}: {e}")
risk_score = min(1.0, len(violations) / 10.0)
summary = f"Found {len(violations)} violations, {len(passed_checks)} checks passed"
return ComplianceScore(
risk=risk_score,
violations=violations,
passed_checks=passed_checks,
summary=summary
)
def _extract_yaml_files(self, diff, files_changed):
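# Rebuild the post-change contents of each changed .yaml/.yml file from the diff
# (keep added and context lines, drop removed lines and hunk headers)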
yaml_files = {}
for file_path in files_changed:
if file_path.endswith(('.yaml', '.yml')):
lines = []
in_file = False
for line in diff.split('\n'):
if f"--- {file_path}" in line or f"+++ {file_path}" in line:
in_file = True
continue
if line.startswith('---') or line.startswith('+++'):
in_file = False
if in_file:
if not line.startswith('-') and not line.startswith('@@'):
clean_line = line[1:] if line.startswith('+') else line
lines.append(clean_line)
if lines:
yaml_files[file_path] = '\n'.join(lines)
return yaml_files
def _check_image_tags(self, doc, file_path):
violations = []
containers = self._get_containers(doc)
for container in containers:
image = container.get('image', '')
if ':latest' in image:
violations.append(ComplianceViolation(
rule="no_latest_tags",
severity="error",
file_path=file_path,
description=f"Container using :latest tag: {image}",
suggestion="Use semantic versioning (e.g., v1.2.3) or image digest"
))
elif ':' not in image:
violations.append(ComplianceViolation(
rule="explicit_tag_required",
severity="warning",
file_path=file_path,
description=f"Container missing explicit tag: {image}",
suggestion="Add explicit version tag"
))
return violations
def _check_resource_limits(self, doc, file_path):
violations = []
containers = self._get_containers(doc)
for container in containers:
resources = container.get('resources', {})
limits = resources.get('limits', {})
if not limits.get('cpu'):
violations.append(ComplianceViolation(
rule="cpu_limits_required",
severity="warning",
file_path=file_path,
description=f"Container '{container.get('name')}' missing CPU limits",
suggestion="Add resources.limits.cpu"
))
if not limits.get('memory'):
violations.append(ComplianceViolation(
rule="memory_limits_required",
severity="warning",
file_path=file_path,
description=f"Container '{container.get('name')}' missing memory limits",
suggestion="Add resources.limits.memory"
))
return violations
def _check_labels(self, doc, file_path):
violations = []
metadata = doc.get('metadata', {})
labels = metadata.get('labels', {})
required_labels = ['app', 'version']
for label in required_labels:
if label not in labels:
violations.append(ComplianceViolation(
rule="required_labels",
severity="warning",
file_path=file_path,
description=f"Missing required label: {label}",
suggestion=f"Add metadata.labels.{label}"
))
return violations
def _check_security_context(self, doc, file_path):
violations = []
containers = self._get_containers(doc)
for container in containers:
security_context = container.get('securityContext', {})
if security_context.get('privileged'):
violations.append(ComplianceViolation(
rule="no_privileged_containers",
severity="error",
file_path=file_path,
description=f"Container '{container.get('name')}' running in privileged mode",
suggestion="Remove privileged: true unless absolutely necessary"
))
if doc.get('kind') == 'Pod':
spec = doc.get('spec', {})
if spec.get('hostNetwork'):
violations.append(ComplianceViolation(
rule="no_host_network",
severity="error",
file_path=file_path,
description="Pod using host network",
suggestion="Remove hostNetwork: true unless required"
))
return violations
def _get_containers(self, doc):
containers = []
if doc.get('kind') in ['Deployment', 'StatefulSet', 'DaemonSet', 'Job', 'CronJob']:
spec = doc.get('spec', {})
template = spec.get('template', {})
pod_spec = template.get('spec', {})
containers = pod_spec.get('containers', [])
elif doc.get('kind') == 'Pod':
spec = doc.get('spec', {})
containers = spec.get('containers', [])
return containers
class RiskEnsembleAgent(Agent):
name = "Risk Ensemble"
color = Agent.GREEN
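# Security findings are weighted more heavily than compliance violations in the combined score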
SECURITY_WEIGHT = 0.6
COMPLIANCE_WEIGHT = 0.4
def __init__(self):
pass
def assess(self, security_score, compliance_score):
overall_risk = (
security_score.risk * self.SECURITY_WEIGHT +
compliance_score.risk * self.COMPLIANCE_WEIGHT
)
if overall_risk < 0.3:
risk_level = RiskLevel.SAFE
recommendation = "SAFE TO MERGE - No significant issues found"
elif overall_risk < 0.7:
risk_level = RiskLevel.REVIEW
recommendation = "REVIEW NEEDED - Address issues before merging"
else:
risk_level = RiskLevel.RISKY
recommendation = "HIGH RISK - Do not merge until critical issues are resolved"
confidence = (security_score.confidence + 0.9) / 2
return RiskAssessment(
overall_risk=overall_risk,
risk_level=risk_level,
security_score=security_score,
compliance_score=compliance_score,
recommendation=recommendation,
confidence=confidence
)

View File

@@ -0,0 +1,542 @@
import os
import json
from pathlib import Path
from datetime import datetime
from typing import List
from dotenv import load_dotenv
import gradio as gr
from models import PullRequest, ReviewResult, RiskLevel, SecurityIssue, ComplianceViolation, SecurityScore, ComplianceScore, RiskAssessment
from agents import GitOpsScannerAgent, SecurityAgent, ComplianceAgent, RiskEnsembleAgent
class GitOpsGuardian:
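# URLs of already-reviewed PRs are persisted here so repeated scans skip them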
MEMORY_FILE = "reviewed_prs.json"
def __init__(self):
load_dotenv()
self.github_token = os.getenv('GITHUB_TOKEN')
self.openai_api_key = os.getenv('OPENAI_API_KEY')
self.gitops_repos = [r.strip() for r in os.getenv('GITOPS_REPOS', '').split(',') if r.strip()]
if not self.github_token:
raise ValueError("GITHUB_TOKEN not found")
if not self.openai_api_key:
raise ValueError("OPENAI_API_KEY not found")
if not self.gitops_repos or self.gitops_repos == ['']:
raise ValueError("GITOPS_REPOS not found")
self.scanner = GitOpsScannerAgent(self.github_token)
self.security_agent = SecurityAgent(self.openai_api_key)
self.compliance_agent = ComplianceAgent(self.github_token)
self.ensemble_agent = RiskEnsembleAgent()
self.total_cost = 0.0
self.history_file = "review_history.json"
self.memory = self._load_memory()
def _load_memory(self):
if Path(self.MEMORY_FILE).exists():
try:
with open(self.MEMORY_FILE, 'r') as f:
data = json.load(f)
return data.get('reviewed_prs', [])
except:
return []
return []
def _save_memory(self):
try:
data = {
'reviewed_prs': self.memory,
'last_updated': datetime.now().isoformat()
}
with open(self.MEMORY_FILE, 'w') as f:
json.dump(data, f, indent=2)
except:
pass
def scan_prs(self):
prs = self.scanner.scan(self.gitops_repos, self.memory)
return prs
def review_pr(self, pr):
security_score = self.security_agent.review(pr)
compliance_score = self.compliance_agent.review(pr)
risk_assessment = self.ensemble_agent.assess(security_score, compliance_score)
result = ReviewResult(
pr=pr,
risk_assessment=risk_assessment,
reviewed_at=datetime.now()
)
if pr.url not in self.memory:
self.memory.append(pr.url)
self._save_memory()
self.total_cost += security_score.cost
self._save_history(result)
return result
def _save_history(self, result):
try:
history = []
if Path(self.history_file).exists():
with open(self.history_file, 'r') as f:
history = json.load(f)
history.append({
'pr_number': result.pr.number,
'pr_url': result.pr.url,
'repo': result.pr.repo,
'title': result.pr.title,
'author': result.pr.author,
'risk_score': result.risk_assessment.overall_risk,
'risk_level': result.risk_assessment.risk_level.value,
'security_risk': result.risk_assessment.security_score.risk,
'security_summary': result.risk_assessment.security_score.summary,
'compliance_risk': result.risk_assessment.compliance_score.risk,
'compliance_summary': result.risk_assessment.compliance_score.summary,
'security_issues_count': len(result.risk_assessment.security_score.issues),
'compliance_violations_count': len(result.risk_assessment.compliance_score.violations),
'cost': result.risk_assessment.security_score.cost,
'reviewed_at': result.reviewed_at.isoformat()
})
with open(self.history_file, 'w') as f:
json.dump(history, f, indent=2)
except:
pass
def post_pr_comment(self, pr, result):
try:
from github import Github
gh = Github(self.github_token)
repo = gh.get_repo(pr.repo)
pr_obj = repo.get_pull(pr.number)
comment = f"""## GitOps Guardian Security Review
**Risk Level:** {result.risk_assessment.risk_level.value} (Score: {result.risk_assessment.overall_risk:.2f})
### Security Analysis
- **Risk:** {result.risk_assessment.security_score.risk:.2f}
- **Issues Found:** {len(result.risk_assessment.security_score.issues)}
- **Summary:** {result.risk_assessment.security_score.summary}
### Compliance Check
- **Risk:** {result.risk_assessment.compliance_score.risk:.2f}
- **Violations:** {len(result.risk_assessment.compliance_score.violations)}
- **Summary:** {result.risk_assessment.compliance_score.summary}
### Recommendation
{result.risk_assessment.recommendation}
---
*Automated review by GitOps Guardian*
"""
pr_obj.create_issue_comment(comment)
return True
except:
return False
def review_all(self):
prs = self.scan_prs()
if not prs:
return []
results = []
for pr in prs:
try:
result = self.review_pr(pr)
results.append(result)
except:
pass
return results
def create_gradio_app():
try:
app = GitOpsGuardian()
except Exception as e:
raise
def load_historical_results():
historical_results = []
try:
if Path(app.history_file).exists():
with open(app.history_file, 'r') as f:
history = json.load(f)
for entry in history:
issues_count = entry.get('security_issues_count', 0)
violations_count = entry.get('compliance_violations_count', 0)
pr = PullRequest(
repo=entry['repo'],
number=entry['pr_number'],
title=entry.get('title', f"PR #{entry['pr_number']}"),
author=entry.get('author', 'unknown'),
url=entry['pr_url'],
diff="",
files_changed=[],
created_at=datetime.fromisoformat(entry['reviewed_at']),
labels=[]
)
security_issues = [
SecurityIssue(
type="historical",
severity="info",
description=f"Historical record ({issues_count} issues found)",
recommendation=""
)
] if issues_count > 0 else []
compliance_violations = [
ComplianceViolation(
rule="historical",
severity="info",
description=f"Historical record ({violations_count} violations found)",
recommendation=""
)
] if violations_count > 0 else []
security_score = SecurityScore(
risk=entry['security_risk'],
issues=security_issues,
summary=entry.get('security_summary', 'Historical review'),
cost=entry.get('cost', 0.0)
)
compliance_score = ComplianceScore(
risk=entry['compliance_risk'],
violations=compliance_violations,
passed_checks=[],
summary=entry.get('compliance_summary', 'Historical review')
)
risk_assessment = RiskAssessment(
overall_risk=entry['risk_score'],
risk_level=RiskLevel(entry['risk_level']),
security_score=security_score,
compliance_score=compliance_score,
recommendation="",
confidence=0.85
)
result = ReviewResult(
pr=pr,
risk_assessment=risk_assessment,
reviewed_at=datetime.fromisoformat(entry['reviewed_at'])
)
historical_results.append(result)
except:
pass
return historical_results
all_results = load_historical_results()
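# Single-element list so the checkbox callback below can mutate the flag captured by this closure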
post_comments_enabled = [False]
def export_json():
if not all_results:
return None
try:
export_data = [
{
'pr_number': r.pr.number,
'title': r.pr.title,
'repo': r.pr.repo,
'url': r.pr.url,
'risk_score': r.risk_assessment.overall_risk,
'risk_level': r.risk_assessment.risk_level.value,
'security_risk': r.risk_assessment.security_score.risk,
'compliance_risk': r.risk_assessment.compliance_score.risk,
'reviewed_at': r.reviewed_at.isoformat()
} for r in all_results
]
path = "gitops_guardian_export.json"
with open(path, 'w') as f:
json.dump(export_data, f, indent=2)
return path
except:
return None
def export_csv():
if not all_results:
return None
try:
import csv
path = "gitops_guardian_export.csv"
with open(path, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['PR', 'Title', 'Repo', 'Risk Score', 'Level', 'Security', 'Compliance', 'URL'])
for r in all_results:
writer.writerow([
r.pr.number, r.pr.title, r.pr.repo,
f"{r.risk_assessment.overall_risk:.2f}",
r.risk_assessment.risk_level.value,
f"{r.risk_assessment.security_score.risk:.2f}",
f"{r.risk_assessment.compliance_score.risk:.2f}",
r.pr.url
])
return path
except:
return None
def scan_and_review():
try:
results = app.review_all()
if not results:
if all_results:
table_data = format_table(all_results)
summary = "No new PRs found. Showing historical reviews below."
stats = format_stats(all_results)
return table_data, summary, stats
else:
return format_table([]), "No new PRs found", ""
if post_comments_enabled[0]:
for result in results:
app.post_pr_comment(result.pr, result)
all_results.extend(results)
table_data = format_table(all_results)
summary = format_summary(results)
stats = format_stats(all_results)
return table_data, summary, stats
except Exception as e:
error_msg = f"Error during scan: {str(e)}"
return [], error_msg, ""
def format_table(results):
table_data = []
for result in results:
pr = result.pr
risk = result.risk_assessment
if risk.risk_level == RiskLevel.SAFE:
emoji = "SAFE"
elif risk.risk_level == RiskLevel.REVIEW:
emoji = "REVIEW"
else:
emoji = "RISKY"
row = [
f"{emoji} #{pr.number}",
pr.title,
pr.repo,
f"{risk.overall_risk:.2f}",
risk.risk_level.value,
f"{risk.security_score.risk:.2f}",
f"{risk.compliance_score.risk:.2f}",
len(risk.security_score.issues),
len(risk.compliance_score.violations),
pr.url
]
table_data.append(row)
return table_data
def format_summary(results):
if not results:
return "No results"
summary = f"## Latest Scan Results\n\n"
summary += f"**Scanned**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
summary += f"**PRs Reviewed**: {len(results)}\n\n"
for result in results:
pr = result.pr
risk = result.risk_assessment
level_marker = "SAFE" if risk.risk_level == RiskLevel.SAFE else "REVIEW" if risk.risk_level == RiskLevel.REVIEW else "RISKY"
summary += f"### PR #{pr.number}: {pr.title}\n\n"
summary += f"**Risk**: {risk.overall_risk:.2f} ({level_marker})\n\n"
summary += f"**Security**: {risk.security_score.risk:.2f} - {risk.security_score.summary}\n\n"
if risk.security_score.issues:
summary += "**Security Issues**:\n"
for issue in risk.security_score.issues[:3]:
summary += f"- [{issue.severity}] {issue.description}\n"
summary += "\n"
summary += f"**Compliance**: {risk.compliance_score.risk:.2f} - {risk.compliance_score.summary}\n\n"
if risk.compliance_score.violations:
summary += "**Compliance Violations**:\n"
for violation in risk.compliance_score.violations[:3]:
summary += f"- [{violation.severity}] {violation.description}\n"
summary += "\n"
summary += f"**Recommendation**: {risk.recommendation}\n\n"
summary += "---\n\n"
return summary
def format_stats(results):
if not results:
return "No statistics available"
total = len(results)
safe = sum(1 for r in results if r.risk_assessment.risk_level == RiskLevel.SAFE)
review = sum(1 for r in results if r.risk_assessment.risk_level == RiskLevel.REVIEW)
risky = sum(1 for r in results if r.risk_assessment.risk_level == RiskLevel.RISKY)
avg_risk = sum(r.risk_assessment.overall_risk for r in results) / total
history_stats = ""
try:
if Path(app.history_file).exists():
with open(app.history_file, 'r') as f:
history = json.load(f)
if len(history) > 1:
recent_risks = [h['risk_score'] for h in history[-10:]]
trend = "Increasing" if recent_risks[-1] > recent_risks[0] else "Decreasing"
history_stats = f"\n**Historical Trend** (last 10): {trend}\n**Total Reviews**: {len(history)}"
except:
pass
stats = f"""## Overall Statistics
**Total PRs Reviewed**: {total}
**Risk Distribution**:
- Safe: {safe} ({safe/total*100:.1f}%)
- Review: {review} ({review/total*100:.1f}%)
- Risky: {risky} ({risky/total*100:.1f}%)
**Average Risk Score**: {avg_risk:.2f}
**Session Cost**: ${app.total_cost:.6f}
{history_stats}
**Repositories**: {', '.join(app.gitops_repos)}
"""
return stats
with gr.Blocks(title="GitOps Guardian", theme=gr.themes.Soft()) as interface:
gr.Markdown("""
# GitOps Guardian
**AI-Powered Pull Request Security & Compliance Review**
Multi-agent system that scans GitOps repositories and identifies security risks and compliance violations.
""")
with gr.Row():
scan_button = gr.Button("Scan Repositories & Review PRs", variant="primary", size="lg")
post_comment_checkbox = gr.Checkbox(label="Post results as PR comments", value=False)
with gr.Row():
export_json_btn = gr.Button("Export JSON", size="sm")
export_csv_btn = gr.Button("Export CSV", size="sm")
json_file = gr.File(label="Download JSON", visible=True)
csv_file = gr.File(label="Download CSV", visible=True)
gr.Markdown("## Review Results")
initial_table = format_table(all_results) if all_results else []
initial_summary = "Historical reviews loaded. Click 'Scan' to check for new PRs." if all_results else "Click 'Scan' to review PRs"
initial_stats = format_stats(all_results) if all_results else "No reviews yet"
with gr.Row():
pr_table = gr.Dataframe(
headers=[
"PR #", "Title", "Repo", "Risk Score", "Level",
"Security", "Compliance", "Security Issues",
"Violations", "URL"
],
value=initial_table,
label="Pull Requests",
wrap=True,
interactive=False,
column_widths=["5%", "20%", "15%", "8%", "8%", "8%", "8%", "8%", "8%", "12%"]
)
with gr.Row():
with gr.Column(scale=2):
summary_output = gr.Markdown(value=initial_summary, label="Detailed Results")
with gr.Column(scale=1):
stats_output = gr.Markdown(value=initial_stats, label="Statistics")
gr.Markdown("""
## How It Works
1. **Scanner Agent** - Fetches open PRs from configured GitOps repos
2. **Security Agent** - Analyzes for secrets and vulnerabilities using OpenAI
3. **Compliance Agent** - Validates Kubernetes best practices
4. **Ensemble Agent** - Combines scores into risk assessment
## Configuration
Set these in `.env`:
- `GITHUB_TOKEN` - GitHub personal access token
- `OPENAI_API_KEY` - OpenAI API key
- `GITOPS_REPOS` - Comma-separated repo names (owner/repo)
""")
def update_comment_setting(enabled):
# Flip the shared flag; this handler has no Gradio outputs wired to it
post_comments_enabled[0] = enabled
post_comment_checkbox.change(
fn=update_comment_setting,
inputs=[post_comment_checkbox],
outputs=[]
)
scan_button.click(
fn=scan_and_review,
inputs=[],
outputs=[pr_table, summary_output, stats_output]
)
export_json_btn.click(
fn=export_json,
inputs=[],
outputs=[json_file]
)
export_csv_btn.click(
fn=export_csv,
inputs=[],
outputs=[csv_file]
)
return interface
def main():
print("\nGitOps Guardian - AI-Powered PR Review System\n")
try:
interface = create_gradio_app()
interface.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True
)
except Exception as e:
print(f"Failed to start: {e}")
raise
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,83 @@
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
from enum import Enum
class RiskLevel(str, Enum):
SAFE = "SAFE"
REVIEW = "REVIEW"
RISKY = "RISKY"
class SecurityIssue(BaseModel):
type: str
severity: str
line_number: Optional[int] = None
file_path: Optional[str] = None
description: str
recommendation: str
class ComplianceViolation(BaseModel):
rule: str
severity: str
file_path: Optional[str] = None
description: str
suggestion: str
class SecurityScore(BaseModel):
risk: float
issues: List[SecurityIssue] = []
summary: str
confidence: float = 0.8
cost: float = 0.0
class ComplianceScore(BaseModel):
risk: float
violations: List[ComplianceViolation] = []
passed_checks: List[str] = []
summary: str
class RiskAssessment(BaseModel):
overall_risk: float
risk_level: RiskLevel
security_score: SecurityScore
compliance_score: ComplianceScore
recommendation: str
confidence: float = 0.8
class PullRequest(BaseModel):
repo: str
number: int
title: str
author: str
url: str
diff: str
files_changed: List[str] = []
created_at: datetime
labels: List[str] = []
class ReviewResult(BaseModel):
pr: PullRequest
risk_assessment: RiskAssessment
reviewed_at: datetime = Field(default_factory=datetime.now)
def to_summary(self):
risk = self.risk_assessment
level_marker = "SAFE" if risk.risk_level == RiskLevel.SAFE else "REVIEW" if risk.risk_level == RiskLevel.REVIEW else "RISKY"
summary = f"""PR #{self.pr.number}: {self.pr.title}
Overall Risk: {risk.overall_risk:.2f} - {level_marker}
Security: {risk.security_score.risk:.2f} - {risk.security_score.summary}
Compliance: {risk.compliance_score.risk:.2f} - {risk.compliance_score.summary}
Recommendation: {risk.recommendation}
"""
return summary.strip()

View File

@@ -0,0 +1,18 @@
# GitOps Guardian Dependencies
# Core
gradio>=4.0.0
pydantic>=2.0.0
python-dotenv
# AI/LLM
openai>=1.0.0
# GitHub
PyGithub
# YAML parsing
PyYAML
# Utilities
python-dateutil