Files
LLM_Engineering_OLD/week8/community_contributions/salah/gitops-guardian/agents.py

408 lines
13 KiB
Python

import json
import logging
import os
import re
from typing import List, Dict

import yaml
from github import Github
from openai import OpenAI

from models import (
    PullRequest, SecurityScore, ComplianceScore, RiskAssessment,
    SecurityIssue, ComplianceViolation, RiskLevel
)
class Agent:
    """Base class shared by all agents in this module.

    Provides ANSI colour escape codes, a display ``name``/``color`` pair that
    subclasses override, and a ``log`` helper that tags messages with them.
    """

    # ANSI foreground colour escape sequences.
    RED = '\033[31m'
    GREEN = '\033[32m'
    YELLOW = '\033[33m'
    BLUE = '\033[34m'
    MAGENTA = '\033[35m'
    CYAN = '\033[36m'
    WHITE = '\033[37m'

    # ANSI background colour and attribute reset.
    BG_BLACK = '\033[40m'
    RESET = '\033[0m'

    # Defaults used by ``log``; concrete agents override both.
    name = "Agent"
    color = WHITE

    def log(self, message):
        """Emit *message* as an INFO log record tagged with this agent.

        The record text is ``[<name>] <message>`` wrapped in the agent's
        colour codes. It goes to this module's logger, so nothing is printed
        unless the application has configured logging at INFO or lower.

        Args:
            message: Text to report.

        Returns:
            None.
        """
        text = f"{self.BG_BLACK}{self.color}[{self.name}] {message}{self.RESET}"
        logging.getLogger(__name__).info(text)
class GitOpsScannerAgent(Agent):
    """Collects open pull requests from a list of GitHub repositories."""

    name = "GitOps Scanner"
    color = Agent.CYAN

    def __init__(self, github_token):
        """Create the GitHub client used for all lookups.

        Args:
            github_token: Token passed to ``Github(...)``.
        """
        self.github = Github(github_token)

    def scan(self, repos, memory=None):
        """Return a ``PullRequest`` for every open PR not already in *memory*.

        Repositories are processed independently: if anything fails while
        reading one repository, the failure is logged as a warning, the PRs
        already collected are kept, and scanning continues with the next
        repository.

        Args:
            repos: Iterable of repository names accepted by ``get_repo``.
            memory: Optional container of PR URLs to skip. Only membership
                (``in``) is used; it is never modified.

        Returns:
            List of ``PullRequest`` objects, newest first within each repo.
        """
        # ``None`` instead of a shared mutable ``[]`` default.
        already_seen = () if memory is None else memory
        all_prs = []
        for repo_name in repos:
            try:
                repo = self.github.get_repo(repo_name)
                pulls = repo.get_pulls(state='open', sort='created', direction='desc')
                for pr in pulls:
                    if pr.html_url in already_seen:
                        continue
                    all_prs.append(self._to_pull_request(repo_name, pr))
            except Exception as e:
                # Best effort: a broken repository must not abort the scan,
                # but the failure should be visible rather than swallowed.
                logging.getLogger(__name__).warning(
                    "Failed to scan repository %s: %s", repo_name, e
                )
        return all_prs

    def _to_pull_request(self, repo_name, pr):
        """Convert a GitHub pull request object into a ``PullRequest``."""
        # Materialise once: the file list is used for both names and patches.
        files = list(pr.get_files())
        # Files without a patch contribute a name but no diff text.
        diff_content = "".join(
            f"\n\n--- {changed.filename}\n{changed.patch}"
            for changed in files
            if changed.patch
        )
        return PullRequest(
            repo=repo_name,
            number=pr.number,
            title=pr.title,
            author=pr.user.login,
            url=pr.html_url,
            diff=diff_content,
            files_changed=[changed.filename for changed in files],
            created_at=pr.created_at,
            labels=[label.name for label in pr.labels]
        )
class SecurityAgent(Agent):
    """Asks an OpenAI chat model to review a pull request for security issues."""

    name = "Security Agent"
    color = Agent.RED

    # Model and request settings.
    MODEL = "gpt-4o-mini"
    # Only the first MAX_DIFF_CHARS characters of the diff are sent.
    MAX_DIFF_CHARS = 3000
    # Prices in USD per one million tokens, used for the cost estimate.
    INPUT_PRICE_PER_MILLION = 0.150
    OUTPUT_PRICE_PER_MILLION = 0.600

    SYSTEM_PROMPT = """You are a security expert analyzing GitOps infrastructure changes.
Identify security issues in Kubernetes manifests, Helm charts, and infrastructure code.
Focus on:
1. Hardcoded secrets (AWS keys, passwords, tokens, API keys)
2. Insecure container configurations (privileged mode, hostNetwork)
3. Missing security contexts
4. Overly permissive RBAC
5. Exposed services without proper restrictions
6. Using :latest tags or insecure images
Respond in JSON format:
{
"risk_score": 0.0-1.0,
"issues": [
{
"type": "hardcoded_secret",
"severity": "critical|high|medium|low",
"description": "Found AWS access key",
"line_number": 15,
"file_path": "deployment.yaml",
"recommendation": "Use Kubernetes Secret instead"
}
],
"summary": "Brief summary of findings"
}
"""

    def __init__(self, openai_api_key):
        """Create the OpenAI client.

        Args:
            openai_api_key: API key passed to ``OpenAI(api_key=...)``.
        """
        self.client = OpenAI(api_key=openai_api_key)

    def review(self, pr):
        """Run the model-based security review for one pull request.

        Args:
            pr: Object exposing ``title``, ``files_changed`` and ``diff``.

        Returns:
            A ``SecurityScore``. On success it carries the model's risk score,
            parsed issues, summary and the estimated request cost. If the
            request or the parsing fails, a fallback score with ``risk=0.5``,
            no issues, ``confidence=0.3`` and the error text as summary is
            returned instead of raising.
        """
        user_prompt = self._build_user_prompt(pr)
        try:
            response = self.client.chat.completions.create(
                model=self.MODEL,
                messages=[
                    {"role": "system", "content": self.SYSTEM_PROMPT},
                    {"role": "user", "content": user_prompt}
                ],
                response_format={"type": "json_object"},
                temperature=0.3
            )
            input_tokens = response.usage.prompt_tokens
            output_tokens = response.usage.completion_tokens
            cost = (
                (input_tokens * self.INPUT_PRICE_PER_MILLION / 1_000_000)
                + (output_tokens * self.OUTPUT_PRICE_PER_MILLION / 1_000_000)
            )

            # SECURITY: the reply is shaped by untrusted PR content, so it is
            # parsed strictly as JSON. This previously used eval(), which
            # would execute arbitrary expressions and also fails on the JSON
            # literals true/false/null.
            result = json.loads(response.choices[0].message.content)
            if not isinstance(result, dict):
                raise ValueError("model response is not a JSON object")

            return SecurityScore(
                risk=float(result.get('risk_score', 0.0)),
                issues=self._parse_issues(result),
                summary=result.get('summary', 'No issues found'),
                confidence=0.85,
                cost=cost
            )
        except Exception as e:
            logging.getLogger(__name__).warning(
                "Security review failed for %r: %s", getattr(pr, 'title', None), e
            )
            return SecurityScore(
                risk=0.5,
                issues=[],
                summary=f"Error during analysis: {str(e)}",
                confidence=0.3
            )

    def _build_user_prompt(self, pr):
        """Format the user message describing *pr* (diff is truncated)."""
        return f"""Analyze this GitOps pull request for security issues:
Title: {pr.title}
Files changed: {', '.join(pr.files_changed)}
Diff:
{pr.diff[:self.MAX_DIFF_CHARS]}
Identify any security concerns."""

    @staticmethod
    def _parse_issues(result):
        """Turn the ``issues`` list of the model reply into ``SecurityIssue``s."""
        # ``or []`` tolerates an explicit ``"issues": null`` in the reply.
        return [
            SecurityIssue(
                type=issue_dict.get('type', 'unknown'),
                severity=issue_dict.get('severity', 'medium'),
                line_number=issue_dict.get('line_number'),
                file_path=issue_dict.get('file_path'),
                description=issue_dict.get('description', ''),
                recommendation=issue_dict.get('recommendation', '')
            )
            for issue_dict in (result.get('issues') or [])
        ]
class ComplianceAgent(Agent):
    """Rule-based compliance checks on the YAML manifests touched by a PR.

    The manifests are reconstructed from the PR diff (added and context
    lines), so only the parts of each file present in the diff are checked.
    """

    name = "Compliance Agent"
    color = Agent.YELLOW

    # Labels every document must carry under metadata.labels.
    REQUIRED_LABELS = ('app', 'version')
    # Kinds whose pod spec lives at spec.template.spec.
    _TEMPLATED_KINDS = ('Deployment', 'StatefulSet', 'DaemonSet', 'Job')

    def __init__(self, github_token=None):
        """Optionally create a GitHub client.

        Args:
            github_token: If truthy, ``self.github_client`` is a ``Github``
                client built from it; otherwise it is ``None``.
        """
        self.github_client = Github(github_token) if github_token else None

    def review(self, pr):
        """Run every compliance check on each YAML document in the PR.

        Args:
            pr: Object exposing ``diff`` and ``files_changed``.

        Returns:
            A ``ComplianceScore`` whose risk is ``violations / 10`` capped at
            1.0. Files that cannot be parsed or checked are logged as a
            warning and contribute nothing to the score.
        """
        checks = (
            (self._check_image_tags, "Image tags OK"),
            (self._check_resource_limits, "Resource limits OK"),
            (self._check_labels, "Labels OK"),
            (self._check_security_context, "Security context OK"),
        )
        violations = []
        passed_checks = []
        yaml_files = self._extract_yaml_files(pr.diff, pr.files_changed)
        for file_path, content in yaml_files.items():
            try:
                # Parse all documents up front so a syntax error anywhere in
                # the file skips the file as a whole.
                docs = list(yaml.safe_load_all(content))
                for doc in docs:
                    if not doc or not isinstance(doc, dict):
                        continue
                    for check, ok_message in checks:
                        found = check(doc, file_path)
                        violations.extend(found)
                        if not found:
                            passed_checks.append(f"{ok_message} in {file_path}")
            except Exception as e:
                # Best effort (e.g. templated or partial YAML), but make the
                # skipped file visible instead of silently treating it as OK.
                logging.getLogger(__name__).warning(
                    "Skipping compliance checks for %s: %s", file_path, e
                )
        risk_score = min(1.0, len(violations) / 10.0)
        summary = f"Found {len(violations)} violations, {len(passed_checks)} checks passed"
        return ComplianceScore(
            risk=risk_score,
            violations=violations,
            passed_checks=passed_checks,
            summary=summary
        )

    def _extract_yaml_files(self, diff, files_changed):
        """Rebuild the post-change text of each changed YAML file from *diff*.

        A section starts at a ``--- <path>`` or ``+++ <path>`` header line
        (an optional ``a/`` or ``b/`` prefix is accepted) and ends at the next
        header. Within a section, removed lines, hunk headers and
        ``\\ No newline`` markers are dropped; the one-character ``+``/space
        prefix is stripped from added and context lines so both keep their
        original indentation.

        Args:
            diff: Concatenated diff text for the PR.
            files_changed: Paths changed by the PR.

        Returns:
            Dict mapping each ``.yaml``/``.yml`` path that yielded at least
            one line to its reconstructed content, in ``files_changed`` order.
        """
        collected = {
            path: [] for path in files_changed
            if path.endswith(('.yaml', '.yml'))
        }
        current = None
        for line in diff.split('\n'):
            # Require the space after the marker: a removed YAML separator
            # shows up as '----' and must not be read as a file header.
            if line.startswith(('--- ', '+++ ')):
                current = self._header_target(line, collected)
                continue
            if current is None:
                continue
            if line.startswith(('-', '@@', '\\')):
                continue
            collected[current].append(line[1:] if line[:1] in ('+', ' ') else line)
        return {path: '\n'.join(lines) for path, lines in collected.items() if lines}

    @staticmethod
    def _header_target(line, wanted):
        """Return the path in *wanted* named by a diff header line, or None."""
        target = line[4:].strip()
        if target in wanted:
            return target
        if target[:2] in ('a/', 'b/') and target[2:] in wanted:
            return target[2:]
        return None

    @staticmethod
    def _as_dict(value):
        """Return *value* if it is a dict, else an empty dict (handles null)."""
        return value if isinstance(value, dict) else {}

    def _check_image_tags(self, doc, file_path):
        """Flag containers using ``:latest`` or no explicit tag."""
        violations = []
        for container in self._get_containers(doc):
            image = str(container.get('image') or '')
            if ':latest' in image:
                violations.append(ComplianceViolation(
                    rule="no_latest_tags",
                    severity="error",
                    file_path=file_path,
                    description=f"Container using :latest tag: {image}",
                    suggestion="Use semantic versioning (e.g., v1.2.3) or image digest"
                ))
            # Look only after the last '/', so a registry port such as
            # 'registry:5000/app' is not mistaken for a tag.
            elif ':' not in image.rsplit('/', 1)[-1]:
                violations.append(ComplianceViolation(
                    rule="explicit_tag_required",
                    severity="warning",
                    file_path=file_path,
                    description=f"Container missing explicit tag: {image}",
                    suggestion="Add explicit version tag"
                ))
        return violations

    def _check_resource_limits(self, doc, file_path):
        """Flag containers without CPU or memory limits."""
        violations = []
        for container in self._get_containers(doc):
            resources = self._as_dict(container.get('resources'))
            limits = self._as_dict(resources.get('limits'))
            if not limits.get('cpu'):
                violations.append(ComplianceViolation(
                    rule="cpu_limits_required",
                    severity="warning",
                    file_path=file_path,
                    description=f"Container '{container.get('name')}' missing CPU limits",
                    suggestion="Add resources.limits.cpu"
                ))
            if not limits.get('memory'):
                violations.append(ComplianceViolation(
                    rule="memory_limits_required",
                    severity="warning",
                    file_path=file_path,
                    description=f"Container '{container.get('name')}' missing memory limits",
                    suggestion="Add resources.limits.memory"
                ))
        return violations

    def _check_labels(self, doc, file_path):
        """Flag documents missing any of ``REQUIRED_LABELS``."""
        metadata = self._as_dict(doc.get('metadata'))
        labels = self._as_dict(metadata.get('labels'))
        return [
            ComplianceViolation(
                rule="required_labels",
                severity="warning",
                file_path=file_path,
                description=f"Missing required label: {label}",
                suggestion=f"Add metadata.labels.{label}"
            )
            for label in self.REQUIRED_LABELS
            if label not in labels
        ]

    def _check_security_context(self, doc, file_path):
        """Flag privileged containers and pods using the host network."""
        violations = []
        for container in self._get_containers(doc):
            security_context = self._as_dict(container.get('securityContext'))
            if security_context.get('privileged'):
                violations.append(ComplianceViolation(
                    rule="no_privileged_containers",
                    severity="error",
                    file_path=file_path,
                    description=f"Container '{container.get('name')}' running in privileged mode",
                    suggestion="Remove privileged: true unless absolutely necessary"
                ))
        # Checked on the resolved pod spec so workload templates
        # (Deployment, CronJob, ...) are covered, not only bare Pods.
        if self._get_pod_spec(doc).get('hostNetwork'):
            violations.append(ComplianceViolation(
                rule="no_host_network",
                severity="error",
                file_path=file_path,
                description="Pod using host network",
                suggestion="Remove hostNetwork: true unless required"
            ))
        return violations

    def _get_pod_spec(self, doc):
        """Return the pod spec dict of a supported kind, or ``{}``."""
        kind = doc.get('kind')
        spec = self._as_dict(doc.get('spec'))
        if kind == 'Pod':
            return spec
        if kind == 'CronJob':
            # CronJob nests the Job: spec.jobTemplate.spec.template.spec
            spec = self._as_dict(self._as_dict(spec.get('jobTemplate')).get('spec'))
        elif kind not in self._TEMPLATED_KINDS:
            return {}
        return self._as_dict(self._as_dict(spec.get('template')).get('spec'))

    def _get_containers(self, doc):
        """Return the list of container dicts declared by *doc* (may be empty)."""
        containers = self._get_pod_spec(doc).get('containers')
        if not isinstance(containers, list):
            return []
        return [entry for entry in containers if isinstance(entry, dict)]
class RiskEnsembleAgent(Agent):
    """Combines the security and compliance scores into one assessment."""

    name = "Risk Ensemble"
    color = Agent.GREEN

    # Relative weight of each score in the overall risk.
    SECURITY_WEIGHT = 0.6
    COMPLIANCE_WEIGHT = 0.4

    def __init__(self):
        """No state to initialise."""

    @staticmethod
    def _classify(overall_risk):
        """Map an overall risk value to ``(RiskLevel, recommendation)``.

        Below 0.3 is SAFE, below 0.7 is REVIEW, anything else is RISKY.
        """
        if overall_risk < 0.3:
            return RiskLevel.SAFE, "SAFE TO MERGE - No significant issues found"
        if overall_risk < 0.7:
            return RiskLevel.REVIEW, "REVIEW NEEDED - Address issues before merging"
        return (
            RiskLevel.RISKY,
            "HIGH RISK - Do not merge until critical issues are resolved",
        )

    def assess(self, security_score, compliance_score):
        """Build a ``RiskAssessment`` from the two component scores.

        The overall risk is the weighted sum of both ``risk`` values. The
        confidence is the mean of the security score's confidence and a fixed
        0.9.

        Args:
            security_score: Object exposing ``risk`` and ``confidence``.
            compliance_score: Object exposing ``risk``.

        Returns:
            The resulting ``RiskAssessment``.
        """
        weighted_security = security_score.risk * self.SECURITY_WEIGHT
        weighted_compliance = compliance_score.risk * self.COMPLIANCE_WEIGHT
        overall_risk = weighted_security + weighted_compliance

        risk_level, recommendation = self._classify(overall_risk)
        confidence = (security_score.confidence + 0.9) / 2

        return RiskAssessment(
            overall_risk=overall_risk,
            risk_level=risk_level,
            security_score=security_score,
            compliance_score=compliance_score,
            recommendation=recommendation,
            confidence=confidence
        )