Files
blackroad-operating-system/scripts/cloudflare/sync_dns.py
Claude 5305790504 Add Cloudflare DNS migration infrastructure and automation
Implements Action #1 from NEXT_ACTIONS_ALEXA.md: Complete Cloudflare DNS
migration tooling and documentation for BlackRoad domains.

New Features:
- Automated DNS sync script (scripts/cloudflare/sync_dns.py)
  * Syncs DNS records from ops/domains.yaml to Cloudflare API
  * Supports dry-run mode for safe previewing
  * Handles CNAME, A, MX, and TXT records
  * Colored output for easy scanning

- DNS validation script (scripts/cloudflare/validate_dns.py)
  * Verifies DNS resolution and propagation
  * Checks SSL certificate validity
  * Tests HTTP/HTTPS accessibility and redirects
  * Supports checking multiple domains

- GitHub Actions workflow (.github/workflows/sync-cloudflare-dns.yml)
  * Automatically syncs DNS on ops/domains.yaml changes
  * Includes dry-run validation step
  * Manual trigger support via workflow_dispatch

Documentation:
- Comprehensive migration guide (docs/CLOUDFLARE_MIGRATION_GUIDE.md)
  * Step-by-step instructions for migrating from GoDaddy to Cloudflare
  * Covers all 10 BlackRoad domains
  * Includes troubleshooting and rollback procedures
  * Estimated 2-4 hours total migration time

- Scripts README (scripts/cloudflare/README.md)
  * Installation and usage instructions
  * API token setup guide
  * Integration examples for CI/CD

Configuration:
- Updated backend/.env.example with Cloudflare variables:
  * CLOUDFLARE_API_TOKEN
  * CLOUDFLARE_ACCOUNT_ID
  * CLOUDFLARE_ZONE_ID
  * CLOUDFLARE_EMAIL

- Python dependencies (scripts/cloudflare/requirements.txt)
  * requests, PyYAML, dnspython, colorama

Benefits of Cloudflare Migration:
- Free SSL certificates with automatic renewal
- Global CDN for faster page loads
- DDoS protection and Web Application Firewall
- Better DNS performance via anycast network
- Advanced features (Workers, Zero Trust, edge functions)
- Superior analytics and security insights

Next Steps:
1. Set up Cloudflare account and get API token
2. Add blackroad.systems domain to Cloudflare
3. Update nameservers at GoDaddy
4. Run sync script to configure DNS records
5. Verify migration with validation script
6. Repeat for remaining 9 domains

Related:
- Implements infra/cloudflare/CLOUDFLARE_DNS_BLUEPRINT.md
- Addresses ops/DOMAIN_FIX_GUIDE.md recommendations
- Part of Phase 1 Q1 infrastructure foundation

Files Added:
- scripts/cloudflare/sync_dns.py (352 lines)
- scripts/cloudflare/validate_dns.py (387 lines)
- scripts/cloudflare/README.md
- scripts/cloudflare/requirements.txt
- docs/CLOUDFLARE_MIGRATION_GUIDE.md (867 lines)
- .github/workflows/sync-cloudflare-dns.yml

Files Modified:
- backend/.env.example (added Cloudflare env vars)
2025-11-18 02:43:48 +00:00

360 lines
11 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Sync DNS records from ops/domains.yaml to Cloudflare
This script automates the migration and synchronization of DNS records
from the domain configuration file to Cloudflare. It handles:
- Creating new DNS records
- Updating existing DNS records
- Detecting and reporting configuration drift
Usage:
export CF_API_TOKEN="your-cloudflare-api-token"
export CF_ZONE_ID="your-zone-id" # For blackroad.systems
python scripts/cloudflare/sync_dns.py
Or with command-line arguments:
python scripts/cloudflare/sync_dns.py --zone-id <zone_id> --token <token>
Requirements:
pip install requests pyyaml colorama
"""
import os
import sys
import argparse
import yaml
import requests
from typing import Dict, List, Optional
from datetime import datetime
try:
from colorama import init, Fore, Style
init()
HAS_COLOR = True
except ImportError:
HAS_COLOR = False
# Fallback to no colors
class Fore:
GREEN = RED = YELLOW = CYAN = RESET = ""
class Style:
BRIGHT = RESET_ALL = ""
# Configuration
CF_API_BASE = "https://api.cloudflare.com/client/v4"
DOMAINS_FILE = "ops/domains.yaml"
def print_status(message: str, status: str = "info"):
"""Print colored status messages"""
if status == "success":
prefix = f"{Fore.GREEN}{Fore.RESET}"
elif status == "error":
prefix = f"{Fore.RED}{Fore.RESET}"
elif status == "warning":
prefix = f"{Fore.YELLOW}{Fore.RESET}"
else:
prefix = f"{Fore.CYAN}{Fore.RESET}"
print(f"{prefix} {message}")
def get_api_headers(api_token: str) -> Dict[str, str]:
"""Get headers for Cloudflare API requests"""
return {
"Authorization": f"Bearer {api_token}",
"Content-Type": "application/json"
}
def load_domains() -> Dict:
"""Load domain configuration from ops/domains.yaml"""
try:
with open(DOMAINS_FILE) as f:
return yaml.safe_load(f)
except FileNotFoundError:
print_status(f"Error: {DOMAINS_FILE} not found", "error")
sys.exit(1)
except yaml.YAMLError as e:
print_status(f"Error parsing {DOMAINS_FILE}: {e}", "error")
sys.exit(1)
def get_existing_records(zone_id: str, api_token: str) -> List[Dict]:
"""Fetch all DNS records for a zone"""
url = f"{CF_API_BASE}/zones/{zone_id}/dns_records"
headers = get_api_headers(api_token)
all_records = []
page = 1
per_page = 100
while True:
params = {"page": page, "per_page": per_page}
response = requests.get(url, headers=headers, params=params)
if response.status_code != 200:
print_status(f"Error fetching DNS records: {response.text}", "error")
sys.exit(1)
data = response.json()
if not data.get("success"):
print_status(f"API error: {data.get('errors')}", "error")
sys.exit(1)
records = data.get("result", [])
all_records.extend(records)
# Check if there are more pages
result_info = data.get("result_info", {})
if page * per_page >= result_info.get("total_count", 0):
break
page += 1
return all_records
def create_dns_record(zone_id: str, api_token: str, record: Dict) -> Dict:
"""Create a DNS record"""
url = f"{CF_API_BASE}/zones/{zone_id}/dns_records"
headers = get_api_headers(api_token)
response = requests.post(url, headers=headers, json=record)
if response.status_code not in [200, 201]:
print_status(f"Error creating DNS record: {response.text}", "error")
return None
data = response.json()
if not data.get("success"):
print_status(f"API error: {data.get('errors')}", "error")
return None
return data.get("result")
def update_dns_record(zone_id: str, api_token: str, record_id: str, record: Dict) -> Dict:
"""Update a DNS record"""
url = f"{CF_API_BASE}/zones/{zone_id}/dns_records/{record_id}"
headers = get_api_headers(api_token)
response = requests.put(url, headers=headers, json=record)
if response.status_code != 200:
print_status(f"Error updating DNS record: {response.text}", "error")
return None
data = response.json()
if not data.get("success"):
print_status(f"API error: {data.get('errors')}", "error")
return None
return data.get("result")
def normalize_record_name(name: str, zone_name: str) -> str:
"""Normalize record name for comparison
Cloudflare returns full domain names (e.g., 'blackroad.systems' or 'www.blackroad.systems')
while config may use '@' for apex or just subdomain names.
"""
if name == "@":
return zone_name
elif not name.endswith(zone_name):
return f"{name}.{zone_name}"
return name
def records_match(config_record: Dict, cf_record: Dict, zone_name: str) -> bool:
"""Check if a config record matches a Cloudflare record"""
config_name = normalize_record_name(config_record.get("name", ""), zone_name)
cf_name = cf_record.get("name", "")
return (
config_record.get("type") == cf_record.get("type") and
config_name == cf_name and
config_record.get("content") == cf_record.get("content")
)
def sync_records(zone_id: str, api_token: str, zone_name: str, dry_run: bool = False):
"""Sync DNS records from domains.yaml to Cloudflare"""
print_status(f"Starting DNS sync for zone: {zone_name}")
print_status(f"Zone ID: {zone_id}")
if dry_run:
print_status("DRY RUN MODE - No changes will be made", "warning")
# Load configuration
config = load_domains()
# Get existing records from Cloudflare
print_status("Fetching existing DNS records from Cloudflare...")
existing = get_existing_records(zone_id, api_token)
print_status(f"Found {len(existing)} existing DNS records")
# Build index of existing records
existing_index = {}
for record in existing:
key = f"{record['type']}:{record['name']}"
existing_index[key] = record
# Process domains from config
created = 0
updated = 0
skipped = 0
errors = 0
for domain in config.get("domains", []):
# Only process domains configured for Cloudflare DNS mode
if domain.get("provider") != "cloudflare" or domain.get("mode") != "dns":
continue
# Skip if no record config
if "record" not in domain:
print_status(f"Skipping {domain.get('name')}: No record configuration", "warning")
continue
# Extract domain name (handle both root and subdomain)
domain_name = domain.get("name", "")
# Build record data
record_config = domain["record"]
record_type = record_config.get("type", "CNAME")
record_value = record_config.get("value", "")
# Determine record name for Cloudflare
# For root domains matching zone name, use "@"
if domain_name == zone_name:
record_name = "@"
else:
record_name = domain_name
record_data = {
"type": record_type,
"name": record_name,
"content": record_value,
"ttl": record_config.get("ttl", 1), # 1 = Auto
"proxied": record_config.get("proxied", True)
}
# For MX records, add priority
if record_type == "MX":
record_data["priority"] = record_config.get("priority", 10)
# Build key for lookup
full_name = normalize_record_name(record_name, zone_name)
key = f"{record_type}:{full_name}"
# Check if record exists
if key in existing_index:
existing_record = existing_index[key]
# Check if update is needed
needs_update = (
existing_record.get("content") != record_value or
existing_record.get("proxied") != record_data.get("proxied")
)
if needs_update:
print_status(f"Updating: {key} -> {record_value}", "warning")
if not dry_run:
result = update_dns_record(zone_id, api_token, existing_record["id"], record_data)
if result:
updated += 1
print_status(f" Updated successfully", "success")
else:
errors += 1
else:
print_status(f" [DRY RUN] Would update", "info")
updated += 1
else:
print_status(f"Unchanged: {key}", "info")
skipped += 1
else:
# Create new record
print_status(f"Creating: {key} -> {record_value}", "warning")
if not dry_run:
result = create_dns_record(zone_id, api_token, record_data)
if result:
created += 1
print_status(f" Created successfully", "success")
else:
errors += 1
else:
print_status(f" [DRY RUN] Would create", "info")
created += 1
# Summary
print("\n" + "="*60)
print_status("DNS Sync Complete!", "success")
print("="*60)
print(f" {Fore.GREEN}Created:{Fore.RESET} {created}")
print(f" {Fore.YELLOW}Updated:{Fore.RESET} {updated}")
print(f" {Fore.CYAN}Unchanged:{Fore.RESET} {skipped}")
print(f" {Fore.RED}Errors:{Fore.RESET} {errors}")
print("="*60)
if dry_run:
print_status("This was a DRY RUN - no actual changes were made", "warning")
print_status("Run without --dry-run to apply changes", "info")
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(
description="Sync DNS records from ops/domains.yaml to Cloudflare"
)
parser.add_argument(
"--token",
help="Cloudflare API token (or set CF_API_TOKEN env var)"
)
parser.add_argument(
"--zone-id",
help="Cloudflare zone ID (or set CF_ZONE_ID env var)"
)
parser.add_argument(
"--zone-name",
default="blackroad.systems",
help="Zone name (default: blackroad.systems)"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be changed without making actual changes"
)
args = parser.parse_args()
# Get credentials
api_token = args.token or os.getenv("CF_API_TOKEN")
zone_id = args.zone_id or os.getenv("CF_ZONE_ID")
if not api_token:
print_status("Error: CF_API_TOKEN environment variable or --token argument required", "error")
print_status("Get your token at: https://dash.cloudflare.com/profile/api-tokens", "info")
sys.exit(1)
if not zone_id:
print_status("Error: CF_ZONE_ID environment variable or --zone-id argument required", "error")
print_status("Find your zone ID in the Cloudflare dashboard", "info")
sys.exit(1)
# Run sync
try:
sync_records(zone_id, api_token, args.zone_name, dry_run=args.dry_run)
except KeyboardInterrupt:
print("\n")
print_status("Interrupted by user", "warning")
sys.exit(1)
except Exception as e:
print_status(f"Unexpected error: {e}", "error")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()