Manual VPC Flow Log analysis doesn’t scale. Production environments generate millions of entries. Connecting related events across time windows by hand takes hours. Raw IP addresses have no organizational context without separate lookups. I built investiGATOR to address all of that in a single tool.
The repo is at github.com/sam-fakhreddine/investiGATOR.
What it does
Twelve analysis types cover the most common threat patterns, including:
- SSH inbound traffic — brute force detection against your instance IPs
- External inbound and outbound — reconnaissance and potential exfiltration
- Top external flows by volume
- Sensitive port access — RDP (3389), SQL Server (1433), MySQL (3306), PostgreSQL (5432), MongoDB (27017), Redis (6379), Elasticsearch (9200)
- Overall traffic summary by protocol and action
- Rejected traffic patterns — security group effectiveness and blocked connection attempts
- Lateral movement indicators
Auto-discovery pulls EC2 instance IPs, VPC CIDR blocks, and CloudWatch log group names from your account, so you don’t have to provide them manually. WHOIS lookups run in batch against external IPs, giving you organization context for every external source or destination without a separate tool.
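To make the sensitive port check from the list above concrete, here is a minimal sketch. It assumes each flow log record is a dict keyed by the standard VPC Flow Log field names (`srcaddr`, `dstport`, `action`). The function name `sensitive_port_hits` and the output shape are illustrative, not investiGATOR's actual code.

```python
from collections import Counter
from typing import Any, Dict, Iterable, List, Tuple

# Ports taken from the list above; the service labels are for display only.
SENSITIVE_PORTS: Dict[int, str] = {
    3389: "RDP",
    1433: "SQL Server",
    3306: "MySQL",
    5432: "PostgreSQL",
    27017: "MongoDB",
    6379: "Redis",
    9200: "Elasticsearch",
}


def sensitive_port_hits(
    logs: Iterable[Dict[str, Any]],
) -> List[Tuple[Tuple[str, str, str], int]]:
    """Count flows per (source, service, action) whose destination port is sensitive."""
    counts: Counter = Counter()
    for log in logs:
        try:
            port = int(log.get("dstport", -1))
        except (TypeError, ValueError):
            continue  # skip records with a missing or non-numeric port such as "-"
        service = SENSITIVE_PORTS.get(port)
        if service is None:
            continue
        key = (log.get("srcaddr", "unknown"), service, log.get("action", "unknown"))
        counts[key] += 1
    # Highest counts first, so the noisiest sources surface at the top
    return counts.most_common()


# Hypothetical records using documentation-range IPs
sample = [
    {"srcaddr": "203.0.113.1", "dstport": "3389", "action": "REJECT"},
    {"srcaddr": "203.0.113.1", "dstport": "3389", "action": "REJECT"},
    {"srcaddr": "198.51.100.2", "dstport": "443", "action": "ACCEPT"},
    {"srcaddr": "192.0.2.3", "dstport": 6379, "action": "ACCEPT"},
]
hits = sensitive_port_hits(sample)
```

Keeping the port map as plain data means adding a service is a one-line change, and the same counting loop would work for any port-based analysis.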
Architecture
ARCHITECTURE = {
    'cli_interface': {
        'argument_parser': 'Command-line argument handling',
        'configuration_builder': 'Config generation from args',
        'analysis_runner': 'Orchestrates analysis execution'
    },
    'web_interface': {
        'fastapi_app': 'Modern web API with FastAPI',
        'analysis_service': 'Web-based analysis orchestration',
        'result_processors': 'Structured data for web display'
    },
    'analysis_engine': {
        'traffic_analyzers': '12 specialized analysis modules',
        'protocol_utils': 'Protocol name resolution',
        'whois_integration': 'External IP organization lookup'
    },
    'aws_integration': {
        'instance_discovery': 'EC2 instance information retrieval',
        'log_group_finder': 'VPC Flow Log group auto-discovery',
        'log_downloader': 'CloudWatch Logs integration'
    }
}
Each analysis type is a standalone analyzer class. Here’s the external inbound analyzer as an example:
from collections import defaultdict
from typing import Any, Dict, List


class ExternalInboundAnalyzer(BaseAnalyzer):
    @staticmethod
    def analyze(logs: List[Dict[str, Any]], config: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Analyze external inbound traffic with WHOIS context"""
        results = defaultdict(int)
        external_ips = set()
        for log in logs:
            srcaddr = log.get("srcaddr", "unknown")
            # Inbound to one of our instances, from a source outside the VPC
            if (log.get("dstaddr") in config["instance_ips"] and
                    not srcaddr.startswith(config["vpc_cidr_prefix"])):
                key = (srcaddr, log.get("action", "unknown"))
                results[key] += 1
                external_ips.add(srcaddr)
        # Resolve each distinct external IP once, after counting
        whois_cache = {ip: get_whois_info(ip)["org"] for ip in external_ips}
        return format_structured_results(results, whois_cache)
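The analyzer above treats a source as external when its address does not start with the VPC prefix string. If that prefix were something like `10.1` with no trailing dot, a string comparison would also match `10.10.0.5`. A stricter alternative is real CIDR membership using the standard library's `ipaddress` module. This is a sketch of that technique, not the project's code; `parse_networks` and `is_external` are hypothetical names.

```python
import ipaddress
from typing import Iterable, List, Union

Network = Union[ipaddress.IPv4Network, ipaddress.IPv6Network]


def parse_networks(cidrs: Iterable[str]) -> List[Network]:
    """Parse CIDR strings such as "10.1.0.0/16" once, up front."""
    return [ipaddress.ip_network(cidr) for cidr in cidrs]


def is_external(addr: str, vpc_networks: List[Network]) -> bool:
    """Return True when addr parses as an IP and falls outside every VPC network."""
    try:
        ip = ipaddress.ip_address(addr)
    except ValueError:
        # Values like "-" or "unknown" cannot be classified, so they are not flagged
        return False
    return not any(ip in net for net in vpc_networks)


# Hypothetical VPC range for illustration
vpc = parse_networks(["10.1.0.0/16"])
inside = is_external("10.1.4.7", vpc)
lookalike = is_external("10.10.0.5", vpc)
public = is_external("203.0.113.1", vpc)
```

Here `10.10.0.5` is classified as external even though it shares the leading characters `10.1` with the VPC range. A VPC with secondary CIDR blocks is handled by passing more than one network.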
WHOIS enrichment
The batch WHOIS lookup runs only against external IPs — anything outside the VPC CIDR — and caches results to minimize API calls.
from typing import Dict, Set


def _batch_whois_lookup(ips: Set[str], vpc_cidr_prefix: str) -> Dict[str, str]:
    """Batch WHOIS lookup for external IPs to reduce API calls"""
    # Drop in-VPC addresses first so only external IPs are looked up
    external_ips = {ip for ip in ips if is_external_ip(ip, vpc_cidr_prefix)}
    return {ip: get_whois_info(ip)["org"] for ip in external_ips}
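The caching side is not visible in the snippet above. One way it could work is a small wrapper that remembers every answer across analyses. This is a minimal sketch under that assumption: `CachedOrgLookup` is a hypothetical class, and `fake_lookup` stands in for a real WHOIS call so the example runs offline.

```python
from typing import Callable, Dict, Iterable, List


class CachedOrgLookup:
    """Wraps a WHOIS-style lookup so each IP is resolved at most once."""

    def __init__(self, lookup: Callable[[str], str]) -> None:
        self._lookup = lookup
        self._cache: Dict[str, str] = {}

    def resolve(self, ips: Iterable[str]) -> Dict[str, str]:
        result: Dict[str, str] = {}
        for ip in set(ips):  # de-duplicate within the batch
            if ip not in self._cache:
                try:
                    self._cache[ip] = self._lookup(ip)
                except Exception:
                    # Cache the failure too, so a dead lookup is not retried per analysis
                    self._cache[ip] = "Unknown"
            result[ip] = self._cache[ip]
        return result


calls: List[str] = []


def fake_lookup(ip: str) -> str:
    """Stand-in for a real WHOIS query; records each call for inspection."""
    calls.append(ip)
    return {"203.0.113.1": "Example Hosting"}.get(ip, "Unknown")


orgs = CachedOrgLookup(fake_lookup)
first = orgs.resolve(["203.0.113.1", "198.51.100.2", "203.0.113.1"])
second = orgs.resolve(["203.0.113.1"])  # served from the cache, no new call
```

Caching failures as "Unknown" is a design choice that trades freshness for speed. A long-running web process might want entries to expire instead.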
The SSH brute force output with WHOIS context looks like this:
Source IP         Action    Organization                    Count
-----------------------------------------------------------------
203.0.113.1       REJECT    Malicious Hosting                 156
198.51.100.2      REJECT    Unknown ISP                        89
192.0.2.3         ACCEPT    Corporate VPN                      12
That context matters in triage. “Malicious Hosting” with 156 rejected SSH attempts calls for a different response than “Corporate VPN” with 12 accepts.
Auto-discovery
Instance info is fetched from EC2 automatically:
import boto3


def get_instance_info(instance_id, region, profile):
    # Build the client from the named profile so multi-account lookups use the right credentials
    session = boto3.Session(profile_name=profile, region_name=region)
    ec2 = session.client('ec2')
    response = ec2.describe_instances(InstanceIds=[instance_id])
    instance = response['Reservations'][0]['Instances'][0]
    return {
        'private_ips': [instance['PrivateIpAddress']],
        'primary_ip': instance['PrivateIpAddress'],
        'vpc_id': instance['VpcId'],
        'vpc_cidr_prefix': get_vpc_cidr_prefix(instance['VpcId']),
        'region': region
    }
You pass an instance ID. Everything else is resolved from there.
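Log group discovery can follow the same pattern. The sketch below assumes the flow logs are attached at the VPC level and delivered to CloudWatch Logs. It uses the EC2 `describe_flow_logs` call with a `resource-id` filter. `find_flow_log_groups` is a hypothetical helper, and `_StubEc2` is a stand-in client so the example runs without AWS credentials.

```python
from typing import Any, Dict, List


def find_flow_log_groups(ec2_client: Any, vpc_id: str) -> List[str]:
    """Return CloudWatch log group names for flow logs attached to the VPC."""
    response = ec2_client.describe_flow_logs(
        Filters=[{"Name": "resource-id", "Values": [vpc_id]}]
    )
    groups = set()
    for flow_log in response.get("FlowLogs", []):
        # Flow logs can also deliver to S3 or Firehose; only CloudWatch ones name a group
        if (flow_log.get("LogDestinationType") == "cloud-watch-logs"
                and flow_log.get("LogGroupName")):
            groups.add(flow_log["LogGroupName"])
    return sorted(groups)


class _StubEc2:
    """Stand-in for a boto3 EC2 client, returning a canned response."""

    def describe_flow_logs(self, Filters: List[Dict[str, Any]]) -> Dict[str, Any]:
        return {
            "FlowLogs": [
                {"LogDestinationType": "cloud-watch-logs", "LogGroupName": "/vpc/flow-logs"},
                {"LogDestinationType": "s3"},
            ]
        }


groups = find_flow_log_groups(_StubEc2(), "vpc-0123456789abcdef0")
```

With real credentials, the client would come from `boto3.Session(profile_name=...).client("ec2")`. The sketch reads only the first page of results and ignores flow logs attached to individual subnets or network interfaces.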
Web interface
FastAPI serves a dashboard with calendar-based time range selection, AWS profile switching for multi-account environments, and JSON-formatted results with syntax highlighting.
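from typing import Optional
from fastapi import FastAPI, Form

app = FastAPI()

@app.post("/api/analyze")
async def analyze_logs(
    profile: str = Form(...),
    instance_id: str = Form(...),
    region: Optional[str] = Form(None),
    start_time: str = Form("24h"),
    end_time: str = Form("now"),
    analysis: str = Form("all")
):
    # Bundle the form fields for the service; the exact payload shape is assumed here
    request_data = {
        "profile": profile, "instance_id": instance_id, "region": region,
        "start_time": start_time, "end_time": end_time, "analysis": analysis,
    }
    service = AnalysisService()
    return await service.run_analysis(request_data)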
Installation and usage
git clone https://github.com/sam-fakhreddine/investiGATOR.git
cd investiGATOR
poetry install
CLI:
# SSH brute force investigation, last 24 hours
poetry run vpc-flow-investigator --instance-id i-0123456789abcdef0 --analysis ssh-inbound --start-time 24h
# Outbound external traffic, last week
poetry run vpc-flow-investigator --instance-id i-0123456789abcdef0 --analysis external-outbound --start-time 1w
# Sensitive port access, production profile, specific time range
poetry run vpc-flow-investigator --instance-id i-0123456789abcdef0 --profile production --analysis sensitive-ports --start-time "2024-01-01T00:00:00" --end-time "2024-01-02T00:00:00"
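The time arguments above mix relative values (`24h`, `1w`), the keyword `now`, and ISO timestamps. How investiGATOR parses them internally is not shown here, so this is a minimal sketch of one way to resolve all three forms. `resolve_time` is a hypothetical helper, the `d` unit is an assumption, and timestamps without an offset are treated as UTC.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import Optional

# "h" and "w" appear in the examples above; "d" is an assumed extra unit
_UNITS = {"h": "hours", "d": "days", "w": "weeks"}
_RELATIVE = re.compile(r"^(\d+)([hdw])$")


def resolve_time(value: str, now: Optional[datetime] = None) -> datetime:
    """Turn "now", a relative offset like "24h", or an ISO timestamp into a datetime."""
    now = now or datetime.now(timezone.utc)
    if value == "now":
        return now
    match = _RELATIVE.match(value)
    if match:
        amount, unit = int(match.group(1)), match.group(2)
        return now - timedelta(**{_UNITS[unit]: amount})
    parsed = datetime.fromisoformat(value)  # raises ValueError on anything else
    # Treat timestamps without an offset as UTC (an assumption of this sketch)
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)


# A fixed reference time keeps the example deterministic
reference = datetime(2024, 1, 2, tzinfo=timezone.utc)
day_ago = resolve_time("24h", reference)
week_ago = resolve_time("1w", reference)
absolute = resolve_time("2024-01-01T00:00:00", reference)
```

CloudWatch Logs queries take epoch milliseconds, so a resolved value would be converted with `int(dt.timestamp() * 1000)` before being sent.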
Web interface:
poetry run vpc-flow-web
# open http://localhost:8000
investiGATOR is most useful for incident response and threat hunting, where you need answers fast: SSH brute force sources, data exfiltration candidates, lateral movement patterns. For continuous monitoring, wire the analysis types into your security automation pipeline through the CLI.
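For the automation case, a scheduled job could shell out to the CLI. The sketch below uses only the flags shown in the usage examples. It assumes the CLI writes its results to stdout and exits non-zero on failure, neither of which is confirmed here. `build_command` and `run_analysis` are hypothetical helpers.

```python
import subprocess
from typing import List, Optional


def build_command(
    instance_id: str,
    analysis: str,
    start_time: str = "24h",
    profile: Optional[str] = None,
) -> List[str]:
    """Assemble the argv list for one analysis run."""
    cmd = [
        "poetry", "run", "vpc-flow-investigator",
        "--instance-id", instance_id,
        "--analysis", analysis,
        "--start-time", start_time,
    ]
    if profile:
        cmd += ["--profile", profile]
    return cmd


def run_analysis(cmd: List[str], timeout_s: int = 600) -> str:
    """Run the CLI and return its stdout; raises CalledProcessError on a non-zero exit."""
    completed = subprocess.run(
        cmd, capture_output=True, text=True, check=True, timeout=timeout_s
    )
    return completed.stdout


# Build (but do not execute) the command for a daily SSH check
cmd = build_command("i-0123456789abcdef0", "ssh-inbound", profile="production")
```

Passing the arguments as a list rather than a shell string avoids quoting problems with timestamps. `run_analysis(cmd)` would need to run from the cloned repository directory so that `poetry run` can find the project.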