
Second Day - 30/7
Deployed this website to alexmacresearch.org!
‘Red Teaming’ Prototype
Looking at pre-commit hooks, I can see that hooks exist which can inspect changes to files in a repository before they are committed. There are even some very useful ready-made hooks (detect-aws-credentials and detect-private-key) that provide some red-teaming capability out of the box.
For notifications, there are a few options we can try (using all of them is possible, but could be a bit cumbersome):
- Email Notifications
- Local Mac Notifications
- Slack/Teams Notifications
The local Mac notification just uses a terminal command, while the email and Slack notifications would require an extra API call or webhook. While those are definitely possible, I will focus on the local Mac notification for now and build on it with improved features later. I just want a prototype built today.
Thinking about the whole design, I would most likely design it like this:
- A Git pre-commit hook watches for changes in any repository being worked on
- The MCP server is sent the GitHub repository and the changed file
- Using the Claude API, it analyses the code for vulnerabilities
- The result is sent back to the MCP server, which runs the terminal command to send you a notification (if there is a genuine security vulnerability)
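The four steps above could be wired together roughly as follows. This is a minimal sketch rather than the final design: `analyse` is a hypothetical stand-in for the Claude API call (here it only flags lines that look like a hardcoded password), and `staged_files`, `notify_command` and `run_hook` are illustrative names. The notification command is printed instead of executed so the sketch also runs on non-Mac machines.

```python
import subprocess
from typing import Callable, Dict, List


def staged_files() -> List[str]:
    """Return paths staged for commit (step 1: what the pre-commit hook sees)."""
    out = subprocess.run(
        ["git", "diff", "--cached", "--name-only"],
        capture_output=True, text=True, check=True,
    ).stdout
    return [line for line in out.splitlines() if line]


def analyse(source: str) -> List[Dict[str, str]]:
    """Hypothetical stand-in for steps 2-3 (MCP server plus Claude API analysis)."""
    findings = []
    for number, line in enumerate(source.splitlines(), 1):
        if "password" in line.lower() and "=" in line:
            findings.append({"line": str(number),
                             "description": "Possible hardcoded password"})
    return findings


def notify_command(message: str) -> List[str]:
    """Build the macOS notification command (step 4) without running it."""
    safe = message.replace("\\", "\\\\").replace('"', '\\"')
    script = f'display notification "{safe}" with title "Security Alert"'
    return ["osascript", "-e", script]


def run_hook(files: Dict[str, str], analyser: Callable = analyse) -> int:
    """Return 1 (block the commit) if any file has findings, else 0."""
    flagged = {name: analyser(text) for name, text in files.items()}
    flagged = {name: found for name, found in flagged.items() if found}
    for name in flagged:
        print(" ".join(notify_command(f"Issue found in {name}")))
    return 1 if flagged else 0
```

In a real hook, `files` would be built by reading each path returned by `staged_files()`, and the return value of `run_hook` would be passed to `sys.exit` so that a non-zero result stops the commit.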
As I would like to use an MCP server for this, it would obviously need access to a few tools in order for this to work:
- File system access
- Terminal use
- Network access for some of the notifications
I have set up a small repository for the initial stages of making this (see here). This is where I will be vibe coding the plan I set out.
This is the rudimentary scanning file.
#!/usr/bin/env python3
"""
Security Scanner - A comprehensive security analysis tool
Integrates with MCP server for automated scanning and monitoring
"""
import os
import sys
import json
import subprocess
import argparse
from pathlib import Path
from typing import List, Dict, Any
import re


class SecurityScanner:
    def __init__(self):
        self.issues = []
        # Each entry is (regex, severity, description)
        self.patterns = {
            'secrets': [
                (r'(?:password|pwd|pass)\s*[:=]\s*[\'"][^\'"]+[\'"]', 'high', 'Hardcoded password'),
                (r'(?:api[_-]?key|apikey)\s*[:=]\s*[\'"][^\'"]+[\'"]', 'critical', 'API key in code'),
                (r'(?:secret|token)\s*[:=]\s*[\'"][^\'"]+[\'"]', 'high', 'Secret token in code'),
                (r'sk-[a-zA-Z0-9]{48}', 'critical', 'OpenAI API key'),
                (r'ghp_[a-zA-Z0-9]{36}', 'critical', 'GitHub personal access token'),
                (r'xoxb-[0-9]{11}-[0-9]{11}-[a-zA-Z0-9]{24}', 'critical', 'Slack bot token'),
            ],
            'vulnerabilities': [
                (r'eval\s*\(', 'high', 'Use of eval() function'),
                (r'exec\s*\(', 'high', 'Use of exec() function'),
                (r'os\.system\s*\(', 'high', 'OS command execution'),
                (r'subprocess\.call\s*\(.*shell\s*=\s*True', 'high', 'Shell injection risk'),
                (r'input\s*\([^)]*\)', 'medium', 'Use of input() function'),
                (r'pickle\.loads?\s*\(', 'high', 'Unsafe pickle deserialization'),
                (r'yaml\.load\s*\(', 'medium', 'Unsafe YAML loading'),
                (r'sql.*\+.*%', 'high', 'Potential SQL injection'),
            ]
        }
    def scan_file(self, file_path: Path) -> List[Dict[str, Any]]:
        """Scan a single file for security issues"""
        issues = []
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            lines = content.split('\n')
            # Check for secrets
            for pattern, severity, description in self.patterns['secrets']:
                for line_num, line in enumerate(lines, 1):
                    if re.search(pattern, line, re.IGNORECASE):
                        issues.append({
                            'type': 'secret',
                            'severity': severity,
                            'file': str(file_path),
                            'line': line_num,
                            'description': description,
                            'recommendation': 'Move secrets to environment variables'
                        })
            # Check for vulnerabilities
            for pattern, severity, description in self.patterns['vulnerabilities']:
                for line_num, line in enumerate(lines, 1):
                    if re.search(pattern, line, re.IGNORECASE):
                        issues.append({
                            'type': 'vulnerability',
                            'severity': severity,
                            'file': str(file_path),
                            'line': line_num,
                            'description': description,
                            'recommendation': 'Review and use safer alternatives'
                        })
        except Exception as e:
            print(f"Error scanning {file_path}: {e}")
        return issues
    def scan_directory(self, directory: Path, extensions: List[str] = None) -> Dict[str, Any]:
        """Scan a directory for security issues"""
        if extensions is None:
            extensions = ['.py', '.js', '.ts', '.java', '.php', '.rb', '.go', '.rs']
        all_issues = []
        scanned_files = 0
        for file_path in directory.rglob('*'):
            # Skip hidden paths and node_modules; only scan supported extensions
            if (file_path.is_file() and
                    file_path.suffix in extensions and
                    not any(part.startswith('.') for part in file_path.parts) and
                    'node_modules' not in file_path.parts):
                file_issues = self.scan_file(file_path)
                all_issues.extend(file_issues)
                scanned_files += 1
        # Create summary
        summary = {
            'total': len(all_issues),
            'critical': len([i for i in all_issues if i['severity'] == 'critical']),
            'high': len([i for i in all_issues if i['severity'] == 'high']),
            'medium': len([i for i in all_issues if i['severity'] == 'medium']),
            'low': len([i for i in all_issues if i['severity'] == 'low']),
        }
        return {
            'issues': all_issues,
            'summary': summary,
            'scanned_files': scanned_files,
            'timestamp': subprocess.check_output(['date', '-Iseconds']).decode().strip()
        }
    def scan_git_staged(self) -> Dict[str, Any]:
        """Scan only git staged files"""
        try:
            # Get staged files
            result = subprocess.run(['git', 'diff', '--cached', '--name-only'],
                                    capture_output=True, text=True, check=True)
            staged_files = result.stdout.strip().split('\n')
            if not staged_files or staged_files == ['']:
                return {
                    'issues': [],
                    'summary': {'total': 0, 'critical': 0, 'high': 0, 'medium': 0, 'low': 0},
                    'scanned_files': 0,
                    'message': 'No staged files to scan'
                }
            all_issues = []
            scanned_files = 0
            for file_name in staged_files:
                file_path = Path(file_name)
                # Deleted files are staged too, so check the path still exists
                if file_path.exists() and file_path.is_file():
                    file_issues = self.scan_file(file_path)
                    all_issues.extend(file_issues)
                    scanned_files += 1
            summary = {
                'total': len(all_issues),
                'critical': len([i for i in all_issues if i['severity'] == 'critical']),
                'high': len([i for i in all_issues if i['severity'] == 'high']),
                'medium': len([i for i in all_issues if i['severity'] == 'medium']),
                'low': len([i for i in all_issues if i['severity'] == 'low']),
            }
            return {
                'issues': all_issues,
                'summary': summary,
                'scanned_files': scanned_files,
                'timestamp': subprocess.check_output(['date', '-Iseconds']).decode().strip()
            }
        except subprocess.CalledProcessError as e:
            return {
                'error': f'Git command failed: {e}',
                'issues': [],
                'summary': {'total': 0, 'critical': 0, 'high': 0, 'medium': 0, 'low': 0},
                'scanned_files': 0
            }
    def generate_report(self, results: Dict[str, Any], output_format: str = 'json') -> str:
        """Generate a report in the specified format"""
        if output_format == 'json':
            return json.dumps(results, indent=2)
        elif output_format == 'text':
            report = []
            report.append("=" * 60)
            report.append("SECURITY SCAN REPORT")
            report.append("=" * 60)
            report.append(f"Timestamp: {results.get('timestamp', 'N/A')}")
            report.append(f"Files Scanned: {results.get('scanned_files', 0)}")
            report.append("")
            summary = results.get('summary', {})
            report.append("SUMMARY:")
            report.append(f" Total Issues: {summary.get('total', 0)}")
            report.append(f" Critical: {summary.get('critical', 0)}")
            report.append(f" High: {summary.get('high', 0)}")
            report.append(f" Medium: {summary.get('medium', 0)}")
            report.append(f" Low: {summary.get('low', 0)}")
            report.append("")
            if results.get('issues'):
                report.append("ISSUES FOUND:")
                report.append("-" * 40)
                for issue in results['issues']:
                    report.append(f"[{issue['severity'].upper()}] {issue['description']}")
                    report.append(f" File: {issue['file']}:{issue.get('line', '?')}")
                    report.append(f" Type: {issue['type']}")
                    report.append(f" Recommendation: {issue['recommendation']}")
                    report.append("")
            return '\n'.join(report)
        # Fall back to JSON for any unrecognised format
        return json.dumps(results, indent=2)
def main():
    parser = argparse.ArgumentParser(description='Security Scanner')
    parser.add_argument('path', nargs='?', default='.', help='Path to scan (default: current directory)')
    parser.add_argument('--staged', action='store_true', help='Scan only git staged files')
    parser.add_argument('--format', choices=['json', 'text'], default='json', help='Output format')
    parser.add_argument('--output', '-o', help='Output file (default: stdout)')
    args = parser.parse_args()
    scanner = SecurityScanner()
    if args.staged:
        results = scanner.scan_git_staged()
    else:
        scan_path = Path(args.path)
        if not scan_path.exists():
            print(f"Error: Path '{scan_path}' does not exist", file=sys.stderr)
            sys.exit(1)
        results = scanner.scan_directory(scan_path)
    report = scanner.generate_report(results, args.format)
    if args.output:
        with open(args.output, 'w') as f:
            f.write(report)
        print(f"Report saved to {args.output}")
    else:
        print(report)
    # Exit with error code if critical or high severity issues found
    summary = results.get('summary', {})
    if summary.get('critical', 0) > 0 or summary.get('high', 0) > 0:
        sys.exit(1)


if __name__ == '__main__':
    main()
This has some very good security features:
- Secrets Detection:
  - Hardcoded passwords - Pattern: password|pwd|pass with string values
  - API keys - Pattern: api_key|apikey with string values
  - Secret tokens - Pattern: secret|token with string values
  - OpenAI API keys - Pattern: sk-[a-zA-Z0-9]{48}
  - GitHub personal access tokens - Pattern: ghp_[a-zA-Z0-9]{36}
  - Slack bot tokens - Pattern: xoxb-[0-9]{11}-[0-9]{11}-[a-zA-Z0-9]{24}
- Vulnerability Detection:
  - Code injection - eval() and exec() function usage
  - OS command execution - os.system() calls
  - Shell injection - subprocess.call() with shell=True
  - Unsafe input - input() function usage
  - Unsafe deserialization - pickle.loads() calls
  - Unsafe YAML loading - yaml.load() calls
  - SQL injection - Basic SQL concatenation patterns
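To see how these line-based regular expressions behave, the short check below runs two of the scanner's patterns against sample lines. The sample strings and the `matches` helper are made up for illustration. It also shows a limitation of this approach: the patterns match text rather than meaning, so a commented-out call is still flagged.

```python
import re

# Two patterns copied from the scanner's pattern table.
PASSWORD = r'(?:password|pwd|pass)\s*[:=]\s*[\'"][^\'"]+[\'"]'
SHELL_TRUE = r'subprocess\.call\s*\(.*shell\s*=\s*True'


def matches(pattern: str, line: str) -> bool:
    """Mirror the scanner's check: case-insensitive search on one line."""
    return re.search(pattern, line, re.IGNORECASE) is not None


samples = [
    'db_password = "hunter2"',               # literal secret
    'password = os.environ["DB_PASSWORD"]',  # read from the environment
    '# subprocess.call(cmd, shell=True)',    # commented-out code
]
for line in samples:
    print(matches(PASSWORD, line), matches(SHELL_TRUE, line), line)
```

The first sample is reported as a hardcoded password and the second is not, which is the intended behaviour. The third is reported as a shell injection risk even though it is only a comment, so some false positives should be expected.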
It also classifies each finding by severity:
- Critical - API keys, tokens that could cause immediate damage
- High - Code injection, command execution, hardcoded passwords
- Medium - Unsafe input handling, YAML loading
- Low - Currently none defined
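The severity counts are what decide whether a commit is blocked. As a rough sketch of that logic, the summary could be built with a `Counter` instead of four separate list comprehensions. The names `summarise` and `should_block` are hypothetical and are not part of the scanner.

```python
from collections import Counter
from typing import Dict, Iterable

SEVERITIES = ("critical", "high", "medium", "low")


def summarise(issues: Iterable[Dict[str, str]]) -> Dict[str, int]:
    """Count issues per severity level, plus an overall total."""
    counts = Counter(issue["severity"] for issue in issues)
    summary = {"total": sum(counts.values())}
    summary.update({level: counts.get(level, 0) for level in SEVERITIES})
    return summary


def should_block(summary: Dict[str, int]) -> bool:
    """Same rule the scanner uses for its exit code: block on critical or high."""
    return summary["critical"] > 0 or summary["high"] > 0


found = [{"severity": "high"}, {"severity": "medium"}, {"severity": "medium"}]
print(summarise(found))
print(should_block(summarise(found)))
```

A commit with only medium findings would pass under this rule, which matches the scanner's exit code behaviour.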
This needs an MCP server in order to scan what is being changed in files. At the moment I can only get it to work locally on files I have made, but I have started to add GitHub webhooks to extend this to all the repositories that I manage.
"""
MCP Server for Automated Security Scanning
Monitors git repo, triggers scanner, and executes security actions
Includes GitHub webhook integration for scanning commits across all repositories
"""
import os
import subprocess
import json
import requests
import hashlib
import hmac
import base64
import re
from flask import Flask, request, jsonify
from pathlib import Path

# Load environment variables from .env file if it exists
if os.path.exists('.env'):
    with open('.env', 'r') as f:
        for line in f:
            line = line.strip()
            # Skip blanks, comments, and lines without a KEY=VALUE shape
            if line and not line.startswith('#') and '=' in line:
                key, value = line.split('=', 1)
                os.environ[key.strip()] = value.strip()

app = Flask(__name__)
SCANNER_PATH = 'scanner.py'  # Path to your scanner
REPO_PATH = '.'  # Path to your repo
# GitHub configuration
GITHUB_TOKEN = os.environ.get('GITHUB_TOKEN')  # Set this in your environment
GITHUB_USERNAME = 'amacca1'
WEBHOOK_SECRET = os.environ.get('WEBHOOK_SECRET', 'your-webhook-secret')  # Set this in your environment
# Utility: Run scanner and parse results
def run_scanner(scan_staged=True):
    cmd = ['python3', SCANNER_PATH]
    if scan_staged:
        cmd.append('--staged')
    result = subprocess.run(cmd, capture_output=True, text=True)
    try:
        output = json.loads(result.stdout)
    except Exception:
        output = {'error': 'Failed to parse scanner output', 'raw': result.stdout}
    return output


# Utility: Send Mac notification
def send_mac_notification(message):
    # Escape backslashes and double quotes so the message cannot break out of the AppleScript string
    safe_message = message.replace('\\', '\\\\').replace('"', '\\"')
    result = subprocess.run(
        ['osascript', '-e', f'display notification "{safe_message}" with title "Security Alert"'],
        capture_output=True, text=True)
    if result.returncode != 0:
        print("Notification error:", result.stderr)


# Utility: Log incident
def log_incident(data):
    with open('security_incidents.log', 'a') as f:
        f.write(json.dumps(data) + '\n')


# Utility: Send email (example using mail)
def send_email(subject, body, to='alexcomp2@outlook.com'):
    subprocess.run(['mail', '-s', subject, to], input=body, text=True)
# Endpoint: Trigger scan (simulate git event)
@app.route('/scan', methods=['POST'])
def scan():
    # Tolerate a missing or non-JSON body and fall back to scanning staged files
    body = request.get_json(silent=True) or {}
    scan_staged = body.get('staged', True)
    results = run_scanner(scan_staged)
    summary = results.get('summary', {})
    if summary.get('critical', 0) > 0 or summary.get('high', 0) > 0:
        send_mac_notification('Critical/High vulnerabilities found!')
        log_incident(results)
        send_email('Security Alert', json.dumps(results, indent=2))
        return jsonify({'status': 'fail', 'results': results}), 400
    return jsonify({'status': 'ok', 'results': results})
# Utility: Verify GitHub webhook signature
def verify_github_signature(payload_body, signature_header):
    """Verify that the payload was sent from GitHub by validating SHA256"""
    if not signature_header:
        return False
    hash_object = hmac.new(
        WEBHOOK_SECRET.encode('utf-8'),
        msg=payload_body,
        digestmod=hashlib.sha256
    )
    expected_signature = "sha256=" + hash_object.hexdigest()
    # Constant-time comparison to avoid leaking information through timing
    return hmac.compare_digest(expected_signature, signature_header)


# Utility: Get file content from GitHub API
def get_file_content_from_github(repo_full_name, file_path, commit_sha):
    """Get file content from GitHub API"""
    headers = {
        'Authorization': f'token {GITHUB_TOKEN}',
        'Accept': 'application/vnd.github.v3+json'
    }
    url = f'https://api.github.com/repos/{repo_full_name}/contents/{file_path}?ref={commit_sha}'
    # A timeout stops a slow API call from hanging the webhook handler
    response = requests.get(url, headers=headers, timeout=10)
    if response.status_code == 200:
        content_data = response.json()
        if content_data.get('encoding') == 'base64':
            return base64.b64decode(content_data['content']).decode('utf-8', errors='ignore')
    return None
# Utility: Scan modified files from a GitHub commit
def scan_github_files(repo_full_name, commit_sha, modified_files):
    """Scan modified files from a GitHub commit"""
    issues = []
    scanned_files = 0
    # File extensions to scan
    extensions = ['.py', '.js', '.ts', '.java', '.php', '.rb', '.go', '.rs']
    for file_info in modified_files:
        file_path = file_info['filename']
        file_ext = Path(file_path).suffix
        # Skip if not a supported file type
        if file_ext not in extensions:
            continue
        # Skip deleted files
        if file_info['status'] == 'removed':
            continue
        # Get file content from GitHub
        content = get_file_content_from_github(repo_full_name, file_path, commit_sha)
        if not content:
            continue
        # Scan the content using the same patterns from SecurityScanner
        file_issues = scan_content(content, file_path)
        issues.extend(file_issues)
        scanned_files += 1
    # Create summary
    summary = {
        'total': len(issues),
        'critical': len([i for i in issues if i['severity'] == 'critical']),
        'high': len([i for i in issues if i['severity'] == 'high']),
        'medium': len([i for i in issues if i['severity'] == 'medium']),
        'low': len([i for i in issues if i['severity'] == 'low']),
    }
    return {
        'issues': issues,
        'summary': summary,
        'scanned_files': scanned_files,
        'repository': repo_full_name,
        'commit': commit_sha,
        'timestamp': subprocess.check_output(['date', '-Iseconds']).decode().strip()
    }
# Utility: Scan file content using the same patterns as SecurityScanner
def scan_content(content, file_path):
    """Scan file content using the same patterns as SecurityScanner"""
    issues = []
    lines = content.split('\n')
    # Same patterns as in SecurityScanner
    patterns = {
        'secrets': [
            (r'(?:password|pwd|pass)\s*[:=]\s*[\'"][^\'"]+[\'"]', 'high', 'Hardcoded password'),
            (r'(?:api[_-]?key|apikey)\s*[:=]\s*[\'"][^\'"]+[\'"]', 'critical', 'API key in code'),
            (r'(?:secret|token)\s*[:=]\s*[\'"][^\'"]+[\'"]', 'high', 'Secret token in code'),
            (r'sk-[a-zA-Z0-9]{48}', 'critical', 'OpenAI API key'),
            (r'ghp_[a-zA-Z0-9]{36}', 'critical', 'GitHub personal access token'),
            (r'xoxb-[0-9]{11}-[0-9]{11}-[a-zA-Z0-9]{24}', 'critical', 'Slack bot token'),
        ],
        'vulnerabilities': [
            (r'eval\s*\(', 'high', 'Use of eval() function'),
            (r'exec\s*\(', 'high', 'Use of exec() function'),
            (r'os\.system\s*\(', 'high', 'OS command execution'),
            (r'subprocess\.call\s*\(.*shell\s*=\s*True', 'high', 'Shell injection risk'),
            (r'input\s*\([^)]*\)', 'medium', 'Use of input() function'),
            (r'pickle\.loads?\s*\(', 'high', 'Unsafe pickle deserialization'),
            (r'yaml\.load\s*\(', 'medium', 'Unsafe YAML loading'),
            (r'sql.*\+.*%', 'high', 'Potential SQL injection'),
        ]
    }
    # Check for secrets
    for pattern, severity, description in patterns['secrets']:
        for line_num, line in enumerate(lines, 1):
            if re.search(pattern, line, re.IGNORECASE):
                issues.append({
                    'type': 'secret',
                    'severity': severity,
                    'file': file_path,
                    'line': line_num,
                    'description': description,
                    'recommendation': 'Move secrets to environment variables'
                })
    # Check for vulnerabilities
    for pattern, severity, description in patterns['vulnerabilities']:
        for line_num, line in enumerate(lines, 1):
            if re.search(pattern, line, re.IGNORECASE):
                issues.append({
                    'type': 'vulnerability',
                    'severity': severity,
                    'file': file_path,
                    'line': line_num,
                    'description': description,
                    'recommendation': 'Review and use safer alternatives'
                })
    return issues
# New endpoint: GitHub webhook for monitoring all repositories
@app.route('/webhook/github', methods=['POST'])
def github_webhook():
    """Handle GitHub webhook events for push commits"""
    # Verify webhook signature
    signature = request.headers.get('X-Hub-Signature-256')
    if not verify_github_signature(request.data, signature):
        return jsonify({'error': 'Invalid signature'}), 403
    event_type = request.headers.get('X-GitHub-Event')
    if event_type == 'push':
        payload = request.json
        # Only process pushes to amacca1 repositories
        repo_full_name = payload['repository']['full_name']
        if not repo_full_name.startswith(f'{GITHUB_USERNAME}/'):
            return jsonify({'message': 'Ignored - not amacca1 repository'}), 200
        # Get commit information
        commits = payload['commits']
        for commit in commits:
            commit_sha = commit['id']
            modified_files = commit.get('modified', []) + commit.get('added', [])
            # Convert to format expected by scan_github_files
            file_list = [{'filename': f, 'status': 'modified'} for f in modified_files]
            # Scan the commit
            results = scan_github_files(repo_full_name, commit_sha, file_list)
            # If critical or high vulnerabilities found, take action
            summary = results.get('summary', {})
            if summary.get('critical', 0) > 0 or summary.get('high', 0) > 0:
                # Enhanced notification message with repository info
                message = f'Critical/High vulnerabilities found in {repo_full_name}!'
                send_mac_notification(message)
                # Log incident with GitHub context
                log_incident({
                    **results,
                    'source': 'github_webhook',
                    'commit_url': f"https://github.com/{repo_full_name}/commit/{commit_sha}"
                })
                # Enhanced email with GitHub context
                email_body = (
                    "Security Alert: Vulnerabilities detected in GitHub repository\n"
                    f"Repository: {repo_full_name}\n"
                    f"Commit: {commit_sha}\n"
                    f"Commit URL: https://github.com/{repo_full_name}/commit/{commit_sha}\n"
                    "Scan Results:\n"
                    f"{json.dumps(results, indent=2)}\n"
                )
                send_email(f'Security Alert - {repo_full_name}', email_body)
        return jsonify({'message': 'Webhook processed successfully'}), 200
    return jsonify({'message': 'Event type not supported'}), 200
# Example: Poll for git changes (could be run in background)
def poll_git_changes():
    # This is a stub. In production, use webhooks or polling logic.
    pass


if __name__ == '__main__':
    app.run(port=5001)
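To exercise the webhook endpoint locally without waiting for GitHub, a test payload needs the same HMAC-SHA256 signature that the server checks. The sketch below builds one. The secret `test-secret`, the repository name and the helper names `sign_payload` and `is_valid` are placeholders for illustration only.

```python
import hashlib
import hmac
import json


def sign_payload(secret: str, body: bytes) -> str:
    """Build the value GitHub would put in the X-Hub-Signature-256 header."""
    digest = hmac.new(secret.encode("utf-8"), msg=body,
                      digestmod=hashlib.sha256).hexdigest()
    return "sha256=" + digest


def is_valid(secret: str, body: bytes, header: str) -> bool:
    """Recompute the signature and compare in constant time."""
    return hmac.compare_digest(sign_payload(secret, body), header)


# Placeholder push payload with no commits, just enough to sign.
body = json.dumps({
    "repository": {"full_name": "example-user/example-repo"},
    "commits": [],
}).encode("utf-8")
header = sign_payload("test-secret", body)

print(is_valid("test-secret", body, header))
print(is_valid("wrong-secret", body, header))
```

Posting `body` to `/webhook/github` on port 5001 with the headers `X-Hub-Signature-256: <header>`, `X-GitHub-Event: push` and `Content-Type: application/json` should then pass the signature check, provided the server's `WEBHOOK_SECRET` is set to the same value used here.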
Looking at Cerebras
Cerebras claims that it ‘runs the latest AI models 20x faster than ChatGPT’, which means that the majority of tasks run on it feel near-instant. It does this with a vast datacentre of supercomputers. These supercomputers are built around the Wafer-Scale Engine (WSE) chip, which is both larger and more powerful than a traditional GPU. It also means that the entire AI model can be kept on the chip, so there are fewer of the latency and bandwidth problems that would normally slow down a typical system.
Each WSE chip has 44GB (!!) of SRAM, compared to the MBs present in a typical on-chip cache.
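To get a feel for what 44GB of on-chip memory means, here is some back-of-the-envelope arithmetic. It assumes decimal gigabytes and counts model weights only, ignoring activations and any other working memory, so it is an upper bound rather than a real capacity figure. The function name `max_parameters` is made up for this sketch.

```python
SRAM_BYTES = 44 * 10**9  # 44 GB of on-chip SRAM, taken from the figure above


def max_parameters(bytes_per_weight: int) -> int:
    """Upper bound on weights that fit in SRAM at a given precision."""
    return SRAM_BYTES // bytes_per_weight


for name, size in (("16-bit", 2), ("8-bit", 1)):
    billions = max_parameters(size) / 1e9
    print(f"{name} weights: about {billions:.0f} billion parameters")
```

Under these assumptions, a single chip could hold roughly 22 billion parameters at 16-bit precision or 44 billion at 8-bit.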
Simon Willison’s weblog
Having a look at Simon Willison’s weblog, OpenAI have introduced a new ChatGPT feature called ‘study mode’. This is very useful, as it goes some way towards bridging the gap between actually learning and the way people currently use ChatGPT, which is essentially demanding answers from generative AI and then copy-pasting them. The new mode purposefully does not do the work for the user, but instead helps and prompts them (at the right level) to reach the answer themselves, and after each section it checks and reinforces the ideas.
Here is the system prompt:
STRICT RULES
Be an approachable-yet-dynamic teacher, who helps the user learn by guiding them through their studies.
Get to know the user. If you don't know their goals or grade level, ask the user before diving in. (Keep this lightweight!) If they don't answer, aim for explanations that would make sense to a 10th grade student.
Build on existing knowledge. Connect new ideas to what the user already knows.
Guide users, don't just give answers. Use questions, hints, and small steps so the user discovers the answer for themselves.
Check and reinforce. After hard parts, confirm the user can restate or use the idea. Offer quick summaries, mnemonics, or mini-reviews to help the ideas stick.
Vary the rhythm. Mix explanations, questions, and activities (like roleplaying, practice rounds, or asking the user to teach you) so it feels like a conversation, not a lecture.
Above all: DO NOT DO THE USER'S WORK FOR THEM. Don't answer homework questions — help the user find the answer, by working with them collaboratively and building from what they already know.
[...]
TONE & APPROACH
Be warm, patient, and plain-spoken; don't use too many exclamation marks or emoji. Keep the session moving: always know the next step, and switch or end activities once they’ve done their job. And be brief — don't ever send essay-length responses. Aim for a good back-and-forth.
This new style of mixing explanation, questions and activities will genuinely help new users learn and improve in their subject of choice. Because it can tailor itself to the user's needs, it allows a good back-and-forth that will inform and supplement normal teaching.
While I do not think this will replace teaching or tutoring in any major way, it gives people an easier entry point for learning about a topic rather than having the answers handed to them, as happened with the previous style of generative AI.