
investiGATOR: VPC Flow Log analysis that doesn't require manual correlation

Manual VPC Flow Log analysis doesn’t scale. Production environments generate millions of entries. Connecting related events across time windows by hand takes hours. Raw IP addresses have no organizational context without separate lookups. I built investiGATOR to address all of that in a single tool.

The repo is at github.com/sam-fakhreddine/investiGATOR.

What it does

Twelve analysis types cover the most common threat patterns, including SSH inbound attempts, external inbound and outbound traffic, and sensitive port access.

Auto-discovery pulls EC2 instance IPs, VPC CIDR blocks, and CloudWatch log group names from your account, so you don’t have to provide them manually. WHOIS lookups run in batch against external IPs, giving you organization context for every external source or destination without a separate tool.

Architecture

ARCHITECTURE = {
    'cli_interface': {
        'argument_parser': 'Command-line argument handling',
        'configuration_builder': 'Config generation from args',
        'analysis_runner': 'Orchestrates analysis execution'
    },
    'web_interface': {
        'fastapi_app': 'Modern web API with FastAPI',
        'analysis_service': 'Web-based analysis orchestration',
        'result_processors': 'Structured data for web display'
    },
    'analysis_engine': {
        'traffic_analyzers': '12 specialized analysis modules',
        'protocol_utils': 'Protocol name resolution',
        'whois_integration': 'External IP organization lookup'
    },
    'aws_integration': {
        'instance_discovery': 'EC2 instance information retrieval',
        'log_group_finder': 'VPC Flow Log group auto-discovery',
        'log_downloader': 'CloudWatch Logs integration'
    }
}

Each analysis type is a standalone analyzer class. Here’s the external inbound analyzer as an example:

from collections import defaultdict
from typing import Any, Dict, List

class ExternalInboundAnalyzer(BaseAnalyzer):
    @staticmethod
    def analyze(logs: List[Dict[str, Any]], config: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Analyze external inbound traffic with WHOIS context"""
        results = defaultdict(int)  # (source IP, action) -> number of flow records
        external_ips = set()

        for log in logs:
            srcaddr = log.get("srcaddr", "unknown")
            # Inbound: the destination is one of our instances.
            # External: the source does not start with the VPC CIDR prefix.
            if (log.get("dstaddr") in config["instance_ips"] and
                    not srcaddr.startswith(config["vpc_cidr_prefix"])):
                key = (srcaddr, log.get("action", "unknown"))
                results[key] += 1
                external_ips.add(srcaddr)

        # Resolve each distinct source once, however many records it appears in
        whois_cache = {ip: get_whois_info(ip)["org"] for ip in external_ips}

        return format_structured_results(results, whois_cache)
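The analyzer reads each record as a dict keyed by flow log field names (`srcaddr`, `dstaddr`, `action`). As a rough illustration of where such dicts could come from, here is a minimal sketch that splits one record in the default (version 2) VPC Flow Log format. The function name `parse_flow_log_line` and the underscore-style keys are assumptions for this example, not the project's actual parser.

```python
from typing import Dict, Optional

# Field order of the default (version 2) VPC Flow Log record format.
# Keys use underscores instead of AWS's hyphens (an assumption for this sketch).
DEFAULT_FIELDS = [
    "version", "account_id", "interface_id", "srcaddr", "dstaddr",
    "srcport", "dstport", "protocol", "packets", "bytes",
    "start", "end", "action", "log_status",
]


def parse_flow_log_line(line: str) -> Optional[Dict[str, str]]:
    """Split one space-delimited record into a dict; return None if malformed."""
    parts = line.split()
    if len(parts) != len(DEFAULT_FIELDS):
        return None
    return dict(zip(DEFAULT_FIELDS, parts))


# Sample record built from documentation-range addresses, not real traffic.
record = parse_flow_log_line(
    "2 123456789012 eni-0a1b2c3d 203.0.113.1 10.0.1.5 "
    "49152 22 6 10 840 1700000000 1700000060 REJECT OK"
)
```

All values stay as strings here; a real pipeline would likely convert ports, byte counts, and timestamps to integers before analysis.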

WHOIS enrichment

The batch WHOIS lookup runs only against external IPs — anything outside the VPC CIDR — and caches results to minimize API calls.

def _batch_whois_lookup(ips: Set[str], vpc_cidr_prefix: str) -> Dict[str, str]:
    """Batch WHOIS lookup for external IPs to reduce API calls"""
    external_ips = {ip for ip in ips if is_external_ip(ip, vpc_cidr_prefix)}
    return {ip: get_whois_info(ip)["org"] for ip in external_ips}
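For readers who want to experiment with the same idea outside the tool, here is a hedged sketch of the external-IP filter plus a cached lookup. It differs from the listing above in two ways that are assumptions of this example: it tests membership against a full CIDR block with the standard `ipaddress` module rather than a string prefix, and it adds caching with `functools.lru_cache`. The names `is_outside_cidr`, `make_cached_lookup`, `batch_org_lookup`, and `fake_whois` are hypothetical.

```python
import ipaddress
from functools import lru_cache
from typing import Callable, Dict, Iterable


def is_outside_cidr(ip: str, vpc_cidr: str) -> bool:
    """True when ip parses as an address and falls outside vpc_cidr."""
    try:
        return ipaddress.ip_address(ip) not in ipaddress.ip_network(vpc_cidr)
    except ValueError:
        # Placeholders such as '-' or 'unknown' are not worth a lookup.
        return False


def make_cached_lookup(lookup: Callable[[str], str],
                       maxsize: int = 4096) -> Callable[[str], str]:
    """Wrap a lookup function so repeated IPs are answered from memory."""
    return lru_cache(maxsize=maxsize)(lookup)


def batch_org_lookup(ips: Iterable[str], vpc_cidr: str,
                     lookup: Callable[[str], str]) -> Dict[str, str]:
    """Resolve each distinct external IP once."""
    return {ip: lookup(ip) for ip in set(ips) if is_outside_cidr(ip, vpc_cidr)}


# Stand-in for a real WHOIS call so the sketch runs offline.
calls = []

def fake_whois(ip: str) -> str:
    calls.append(ip)
    return "Example Org"

cached = make_cached_lookup(fake_whois)
first = batch_org_lookup(["203.0.113.1", "10.0.1.5", "-"], "10.0.0.0/20", cached)
second = batch_org_lookup(["203.0.113.1", "10.0.17.9"], "10.0.0.0/20", cached)
```

With a `/20` block, `10.0.17.9` counts as external even though it shares the `10.0.` string prefix with the VPC, which is the main reason to compare against the network rather than a prefix. The second batch triggers only one new lookup because `203.0.113.1` is already cached.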

The SSH brute force output with WHOIS context looks like this:

Source IP       Action   Organization        Count
--------------------------------------------------
203.0.113.1     REJECT   Malicious Hosting     156
198.51.100.2    REJECT   Unknown ISP            89
192.0.2.3       ACCEPT   Corporate VPN          12

That context matters in triage. “Malicious Hosting” with 156 rejected SSH attempts calls for a different response than “Corporate VPN” with 12 accepts.

Auto-discovery

Instance info is fetched from EC2 automatically:

import boto3

def get_instance_info(instance_id, region, profile):
    # Build the client from the named profile so lookups hit the intended account
    session = boto3.Session(profile_name=profile, region_name=region)
    ec2 = session.client('ec2')

    response = ec2.describe_instances(InstanceIds=[instance_id])
    instance = response['Reservations'][0]['Instances'][0]

    return {
        'private_ips': [instance['PrivateIpAddress']],
        'primary_ip': instance['PrivateIpAddress'],
        'vpc_id': instance['VpcId'],
        'vpc_cidr_prefix': get_vpc_cidr_prefix(instance['VpcId']),
        'region': region
    }

You pass an instance ID. Everything else is resolved from there.

Web interface

FastAPI serves a dashboard with calendar-based time range selection, AWS profile switching for multi-account environments, and JSON-formatted results with syntax highlighting.

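@app.post("/api/analyze")
async def analyze_logs(
    profile: str = Form(...),
    instance_id: str = Form(...),
    region: Optional[str] = Form(None),
    start_time: str = Form("24h"),
    end_time: str = Form("now"),
    analysis: str = Form("all")
):
    # Collect the form fields into one payload for the service
    # (the exact shape AnalysisService expects is assumed here)
    request_data = {
        "profile": profile,
        "instance_id": instance_id,
        "region": region,
        "start_time": start_time,
        "end_time": end_time,
        "analysis": analysis,
    }
    service = AnalysisService()
    return await service.run_analysis(request_data)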

Installation and usage

git clone https://github.com/sam-fakhreddine/investiGATOR.git
cd investiGATOR
poetry install

CLI:

# SSH brute force investigation, last 24 hours
poetry run vpc-flow-investigator --instance-id i-0123456789abcdef0 --analysis ssh-inbound --start-time 24h

# Outbound external traffic, last week
poetry run vpc-flow-investigator --instance-id i-0123456789abcdef0 --analysis external-outbound --start-time 1w

# Sensitive port access, production profile, specific time range
poetry run vpc-flow-investigator --instance-id i-0123456789abcdef0 --profile production --analysis sensitive-ports --start-time "2024-01-01T00:00:00" --end-time "2024-01-02T00:00:00"
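The examples above mix relative values (`24h`, `1w`) with absolute ISO timestamps. One way such a value could be resolved to a concrete point in time is sketched below. This is an illustration, not the tool's parser: the helper name `parse_time_spec`, the extra `m` and `d` units, and the choice to treat timestamps without a zone as UTC are all assumptions.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import Optional

# Unit suffix -> timedelta keyword. Only 'h' and 'w' appear in the examples;
# 'm' and 'd' are assumed for this sketch.
_UNITS = {"m": "minutes", "h": "hours", "d": "days", "w": "weeks"}


def parse_time_spec(spec: str, now: Optional[datetime] = None) -> datetime:
    """Resolve 'now', a relative offset like '24h', or an ISO 8601 timestamp."""
    now = now or datetime.now(timezone.utc)
    if spec == "now":
        return now
    match = re.fullmatch(r"(\d+)([mhdw])", spec)
    if match:
        amount, unit = int(match.group(1)), match.group(2)
        return now - timedelta(**{_UNITS[unit]: amount})
    parsed = datetime.fromisoformat(spec)  # raises ValueError on bad input
    # Assumption: timestamps without a zone are interpreted as UTC.
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)


# A fixed reference time keeps the example deterministic.
reference = datetime(2024, 1, 8, tzinfo=timezone.utc)
day_ago = parse_time_spec("24h", reference)
week_ago = parse_time_spec("1w", reference)
absolute = parse_time_spec("2024-01-01T00:00:00", reference)
```

Passing `now` explicitly makes the relative forms easy to test and keeps the start and end of a window anchored to the same instant.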

Web interface:

poetry run vpc-flow-web
# open http://localhost:8000

investiGATOR is most useful for incident response and threat hunting, where you need answers fast: SSH brute force sources, data exfiltration candidates, lateral movement patterns. For continuous monitoring, wire the analysis types into your security automation pipeline through the CLI.
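A minimal sketch of what that wiring might look like from a Python job is shown below. It uses only the flags from the CLI examples (`--instance-id`, `--analysis`, `--start-time`, `--profile`). The helper names `build_command` and `run_analysis` are hypothetical, and because the CLI's output format is not described here, the sketch returns raw stdout rather than parsing it.

```python
import subprocess
from typing import List, Optional


def build_command(instance_id: str, analysis: str,
                  start_time: str = "24h",
                  profile: Optional[str] = None) -> List[str]:
    """Assemble the CLI invocation as an argument list (no shell quoting needed)."""
    cmd = [
        "poetry", "run", "vpc-flow-investigator",
        "--instance-id", instance_id,
        "--analysis", analysis,
        "--start-time", start_time,
    ]
    if profile:
        cmd += ["--profile", profile]
    return cmd


def run_analysis(instance_id: str, analysis: str,
                 start_time: str = "24h",
                 profile: Optional[str] = None) -> str:
    """Run one analysis and return its stdout; raises CalledProcessError on failure."""
    result = subprocess.run(
        build_command(instance_id, analysis, start_time, profile),
        capture_output=True, text=True, check=True,
    )
    return result.stdout


# Building the command has no side effects, so it is safe to inspect or log.
cmd = build_command("i-0123456789abcdef0", "ssh-inbound", profile="production")
```

A scheduler could call `run_analysis` per instance and forward the output to whatever alerting system is already in place. It needs to run from the repository directory so that `poetry run` can find the project.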