Building a Kubernetes Egress Controller for Fine-Grained Outbound Traffic Control

Ajin Joseph

Dynamic Egress Controller for Kubernetes

Overview

This post covers the journey of implementing a dynamic egress controller in Kubernetes to securely manage outbound traffic from pods. The solution is built in Python with Kopf (the Kubernetes Operator Pythonic Framework), enabling hostname-based egress rules and cross-namespace communication.

Problem Statement

In Kubernetes, controlling outbound (egress) traffic is crucial for enforcing security policies. However, the native NetworkPolicy API only matches IP blocks and has no support for hostname-based rules, so maintaining static IP allowlists for external services is brittle: DNS records change, and requests get redirected.

Additionally, certain applications need to talk to other namespaces (e.g., backup systems or shared service layers), requiring secure cross-namespace communication.

Goals

  • Support dynamic hostname resolution.
  • Automatically generate and manage NetworkPolicy resources.
  • Support static IPs in egress rules.
  • Enable secure DNS resolution using kube-dns.
  • Allow traffic to specific Kubernetes namespaces.

Custom Resource: EgressPolicy

We introduced a new custom resource named EgressPolicy with the following structure:

apiVersion: platform.example.com/v1alpha1
kind: EgressPolicy
metadata:
  name: allow-external-services
  namespace: app-namespace
spec:
  refreshIntervalSeconds: 60
  destinations:
    - hostname: api.github.com
    - hostname: us1.datadoghq.com
    - ip: 8.8.8.8
  namespaces:
    - backup-tools
    - shared-services
  • hostname: Domains to be resolved periodically.
  • ip: Direct IP addresses to allow.
  • namespaces: Additional namespaces to allow egress access.
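
Before resolving anything, the controller has to split destinations into hostnames and literal IPs, validating the latter. A small standalone sketch of that classification step (the variable names here are illustrative, not the controller's own):

```python
import ipaddress

# Example destinations, mirroring the EgressPolicy spec above.
destinations = [
    {"hostname": "api.github.com"},
    {"hostname": "us1.datadoghq.com"},
    {"ip": "8.8.8.8"},
    {"ip": "not-an-ip"},  # invalid entries are skipped, as in the controller
]

hostnames, static_ips = [], []
for dest in destinations:
    if "hostname" in dest:
        hostnames.append(dest["hostname"])
    elif "ip" in dest:
        try:
            ipaddress.ip_address(dest["ip"])  # raises ValueError if invalid
            static_ips.append(dest["ip"])
        except ValueError:
            pass

print(hostnames)   # → ['api.github.com', 'us1.datadoghq.com']
print(static_ips)  # → ['8.8.8.8']
```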

Architecture

+--------------------+
|   EgressPolicy CRD |
+--------------------+
          |
          v
+--------------------+       +--------------------+
| Kopf Python        |<----->| Kubernetes API     |
| Controller         |       +--------------------+
+--------------------+
          |
          v
+----------------------------+
| Generated NetworkPolicies  |
+----------------------------+

Features

  • Written in Python using Kopf.
  • Listens to create, update, and delete events on EgressPolicy.
  • Maintains a TTL-based cache for resolved IPs.
  • Generates NetworkPolicy resources dynamically.
  • Adds kube-dns access for DNS resolution (UDP/TCP on port 53).
  • Allows specifying target namespaces for cross-namespace access.
  • Supports periodic refresh via refreshIntervalSeconds.
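
Each resolved or static IP ends up as a /32 ipBlock entry in the generated policy's egress rules. A minimal sketch of that translation, using plain dicts in place of the kubernetes client models (the helper name ips_to_egress_rule is illustrative):

```python
def ips_to_egress_rule(ip_list, port=443):
    """Translate a list of IPv4 addresses into a NetworkPolicy egress rule
    (expressed as a plain dict rather than kubernetes client objects)."""
    return {
        "to": [{"ipBlock": {"cidr": f"{ip}/32"}} for ip in ip_list],
        "ports": [{"protocol": "TCP", "port": port}],
    }

rule = ips_to_egress_rule(["140.82.112.6", "8.8.8.8"])
print(rule["to"][0]["ipBlock"]["cidr"])  # → 140.82.112.6/32
```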

Python Code

import kopf
import kubernetes
import socket
import ipaddress
import json
import threading
import http.server
from datetime import datetime, timedelta
from kubernetes.client.rest import ApiException

API_GROUP = "platform.example.com"
API_VERSION = "v1alpha1"
PLURAL = "egresspolicies"
TTL_DAYS = 7

# Load in-cluster config
kubernetes.config.load_incluster_config()
api = kubernetes.client.NetworkingV1Api()
core_api = kubernetes.client.CoreV1Api()

# Cache to track cumulative IPs per policy with timestamps
policy_ip_cache = {}  # key: namespace/name -> {ip: timestamp}

# Healthz HTTP server for readiness probe
def start_health_server():
    class HealthHandler(http.server.BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path == "/healthz":
                self.send_response(200)
                self.end_headers()
                self.wfile.write(b"OK")
            else:
                self.send_response(404)
                self.end_headers()
    server = http.server.HTTPServer(('0.0.0.0', 8080), HealthHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()

start_health_server()

def resolve_ips(hostname):
    try:
        return list({
            ip[4][0] for ip in socket.getaddrinfo(hostname, None)
            if ipaddress.ip_address(ip[4][0]).version == 4
        })
    except Exception as e:
        print(f"[ERROR] Failed to resolve {hostname}: {e}")
        return []

def build_network_policy(namespace, name, ip_list, ns_list):
    # kube-dns ClusterIP (cluster-specific); copy so the caller's list is not mutated
    dns_ip = "172.16.0.10"
    ip_list = list(ip_list) + [dns_ip]

    dns_rule = kubernetes.client.V1NetworkPolicyEgressRule(
        to=[kubernetes.client.V1NetworkPolicyPeer(
            namespace_selector=kubernetes.client.V1LabelSelector(
                match_labels={"kubernetes.io/metadata.name": "kube-system"}
            )
        )],
        ports=[
            kubernetes.client.V1NetworkPolicyPort(protocol="UDP", port=53),
            kubernetes.client.V1NetworkPolicyPort(protocol="TCP", port=53),
        ]
    )

    ip_rule = kubernetes.client.V1NetworkPolicyEgressRule(
        to=[
            kubernetes.client.V1NetworkPolicyPeer(
                ip_block=kubernetes.client.V1IPBlock(cidr=f"{ip}/32")
            ) for ip in ip_list
        ],
        ports=[
            kubernetes.client.V1NetworkPolicyPort(protocol="TCP", port=443)
        ]
    )

    namespace_rules = [
        kubernetes.client.V1NetworkPolicyEgressRule(
            to=[
                kubernetes.client.V1NetworkPolicyPeer(
                    namespace_selector=kubernetes.client.V1LabelSelector(
                        match_labels={"kubernetes.io/metadata.name": ns}
                    )
                )
            ]
        ) for ns in ns_list
    ]

    return kubernetes.client.V1NetworkPolicy(
        metadata=kubernetes.client.V1ObjectMeta(
            name=f"egress-{name}",
            namespace=namespace
        ),
        spec=kubernetes.client.V1NetworkPolicySpec(
            pod_selector=kubernetes.client.V1LabelSelector(match_labels={}),
            policy_types=["Egress"],
            egress=[dns_rule, ip_rule] + namespace_rules
        )
    )

def ensure_default_deny(namespace):
    deny_policy = kubernetes.client.V1NetworkPolicy(
        metadata=kubernetes.client.V1ObjectMeta(
            name="default-deny-egress",
            namespace=namespace
        ),
        spec=kubernetes.client.V1NetworkPolicySpec(
            pod_selector=kubernetes.client.V1LabelSelector(match_labels={}),
            policy_types=["Egress"],
            egress=[]
        )
    )
    try:
        api.create_namespaced_network_policy(namespace, deny_policy)
        print(f"[INFO] Created default deny egress policy in namespace '{namespace}'")
    except ApiException as e:
        if e.status == 409:
            print(f"[INFO] Default deny egress policy already exists in '{namespace}'")
        else:
            print(f"[ERROR] Failed to create default deny policy in '{namespace}': {e}")

def refresh_policy(spec, name, namespace, logger):
    destinations = spec.get('destinations', [])
    ns_targets = spec.get('namespaces', [])
    logger.info(f"Resolving destinations for EgressPolicy '{name}' in '{namespace}': {destinations}, namespaces: {ns_targets}")

    ensure_default_deny(namespace)

    key = f"{namespace}/{name}"
    now = datetime.utcnow()
    newly_resolved = {}

    for dest in destinations:
        if 'hostname' in dest:
            hostname = dest['hostname']
            ips = resolve_ips(hostname)
            for ip in ips:
                newly_resolved[ip] = now
            logger.info(f"Resolved hostname {hostname} to {ips}")
        elif 'ip' in dest:
            ip = dest['ip']
            try:
                ipaddress.ip_address(ip)
                newly_resolved[ip] = now
                logger.info(f"Using provided IP {ip} directly")
            except ValueError:
                logger.warning(f"Invalid IP address skipped: {ip}")

    # Merge with previously resolved IPs and prune entries older than TTL_DAYS.
    cached_ips = policy_ip_cache.get(key, {})
    cached_ips.update(newly_resolved)
    pruned_ips = {
        ip: ts for ip, ts in cached_ips.items()
        if now - ts <= timedelta(days=TTL_DAYS)
    }
    policy_ip_cache[key] = pruned_ips

    resolved_ips = list(pruned_ips.keys())
    policy = build_network_policy(namespace, name, resolved_ips, ns_targets)
    try:
        api.create_namespaced_network_policy(namespace, policy)
        logger.info(f"Created NetworkPolicy 'egress-{name}' with IPs: {resolved_ips} and namespaces: {ns_targets}")
    except ApiException as e:
        if e.status == 409:
            api.replace_namespaced_network_policy(f"egress-{name}", namespace, policy)
            logger.info(f"Updated existing NetworkPolicy 'egress-{name}'")
        else:
            logger.error(f"Failed to create/update NetworkPolicy: {e}")

    logger.info(json.dumps({
        "event": "egress-policy-update",
        "policy": name,
        "namespace": namespace,
        "ips": resolved_ips,
        "namespaces": ns_targets
    }))

@kopf.daemon(f"{API_GROUP}/{API_VERSION}", PLURAL)
def refresh_loop(stopped, spec, name, namespace, logger, **kwargs):
    # One daemon per EgressPolicy object. Kopf stops and restarts the daemon
    # when the object changes, so updates to refreshIntervalSeconds take
    # effect automatically; handlers cannot register @kopf.timer at runtime,
    # which is why a daemon loop is used for the periodic refresh.
    while not stopped:
        refresh_policy(spec, name, namespace, logger)
        interval = spec.get("refreshIntervalSeconds", 600)
        stopped.wait(interval)

@kopf.on.delete(f"{API_GROUP}/{API_VERSION}", PLURAL)
def cleanup_policy(name, namespace, logger, **kwargs):
    key = f"{namespace}/{name}"
    if key in policy_ip_cache:
        del policy_ip_cache[key]

    try:
        api.delete_namespaced_network_policy(f"egress-{name}", namespace)
        logger.info(f"Deleted NetworkPolicy 'egress-{name}' on CR deletion")
    except ApiException as e:
        if e.status != 404:
            logger.error(f"Failed to delete NetworkPolicy: {e}")
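
The resolution filter in resolve_ips can be sanity-checked offline by stubbing socket.getaddrinfo with unittest.mock; the function is repeated here so the sketch is self-contained:

```python
import ipaddress
import socket
from unittest import mock

def resolve_ips(hostname):
    # Same filter as the controller: keep unique IPv4 addresses only.
    return list({
        info[4][0] for info in socket.getaddrinfo(hostname, None)
        if ipaddress.ip_address(info[4][0]).version == 4
    })

# Fake getaddrinfo results: an IPv4, an IPv6, and a duplicate IPv4.
fake = [
    (socket.AF_INET, None, None, "", ("140.82.112.6", 0)),
    (socket.AF_INET6, None, None, "", ("2606:50c0::1", 0, 0, 0)),
    (socket.AF_INET, None, None, "", ("140.82.112.6", 0)),
]
with mock.patch.object(socket, "getaddrinfo", return_value=fake):
    print(resolve_ips("api.github.com"))  # → ['140.82.112.6']
```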

Best Practices

  • Use one EgressPolicy per use-case for better management.
  • Avoid mixing unrelated destinations in one policy.
  • Periodically review resolved IPs and refresh interval.
  • Use namespaces only when cross-namespace access is needed.
  • Keep shared or base controller resources minimal for network security.

โš ๏ธ Challenges I Faced

  • Dynamic DNS Resolution: FQDNs arenโ€™t natively supported in Kubernetes network policies, so I implemented a TTL-based IP resolution cache (7-day expiry).
  • Policy Overlap: Multiple CRs for the same namespace could lead to duplicated rules โ€” the controller merges them smartly.
  • DNS Egress: Every namespace is granted access to UDP/TCP 53 to kube-dns.kube-system to resolve FQDNs.
  • Controller Failures: Ensured graceful handling with retry and conflict resolution logic.
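
The TTL-based cache mentioned above boils down to pruning a dict of per-IP timestamps. A minimal sketch (prune_cache is an illustrative stand-in for the inline logic in the controller):

```python
from datetime import datetime, timedelta

TTL_DAYS = 7

def prune_cache(cached_ips, now):
    """Drop IPs whose last successful resolution is older than TTL_DAYS."""
    return {
        ip: ts for ip, ts in cached_ips.items()
        if now - ts <= timedelta(days=TTL_DAYS)
    }

now = datetime(2024, 6, 15)
cache = {
    "140.82.112.6": now - timedelta(days=1),   # fresh: kept
    "140.82.113.9": now - timedelta(days=10),  # stale: dropped
}
print(sorted(prune_cache(cache, now)))  # → ['140.82.112.6']
```

Keeping stale-but-recent IPs around avoids briefly breaking connections when a hostname rotates between a small set of addresses.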

🚀 Outcome

  • ✅ Every tenant namespace now has isolated egress boundaries
  • ✅ DNS-based destinations are dynamically managed
  • ✅ The platform team controls access centrally by applying CRs
  • ✅ Reduced risk of accidental data leaks

📦 Tech Stack

  • Python + Kopf
  • Kubernetes Custom Resource Definitions (CRD)
  • Calico CNI
  • DNS Resolver + TTL cache (Python socket + dnspython)
  • Azure Kubernetes Service (AKS)

📘 What's Next?

  • Per-destination refresh intervals (refreshIntervalSeconds is currently per-policy)
  • Integrate policy-violation alerting using Datadog
  • Optional integration with OPA/Gatekeeper for audit-level enforcement

🧠 Learnings

  • Writing custom controllers gives immense flexibility in Kubernetes
  • Egress is often overlooked in multi-tenant designs; don't ignore it
  • Kopf is fantastic for quick Python-based operators without boilerplate

๐Ÿ™‹โ€โ™‚๏ธ About Me

I'm a Cloud Solution Architect working with Azure, AKS, and secure SaaS platforms. Follow me for more deep dives on Kubernetes, cloud architecture, and DevOps.

Connect: LinkedIn | Blog: ajin-cloudjourney.hashnode.dev

#Kubernetes #AKS #Egress #DevSecOps #Python #Kopf #Calico #CloudArchitecture #MultiTenant
