Building a Kubernetes Egress Controller for Fine-Grained Outbound Traffic Control

Dynamic Egress Controller for Kubernetes
Overview
This blog covers the journey of implementing a dynamic egress controller in Kubernetes to securely manage outbound traffic from pods. The solution is built using Python and Kopf (Kubernetes Operator Framework), enabling hostname-based egress rules and cross-namespace communication.
Problem Statement
In Kubernetes, controlling outbound traffic (egress) is crucial for enforcing security policies. However, managing static IPs for external services is complex due to DNS changes, redirections, and lack of native support for hostname-based policies in NetworkPolicies.
Additionally, certain applications need to talk to other namespaces (e.g., backup systems or shared service layers), requiring secure cross-namespace communication.
Goals
- Support dynamic hostname resolution.
- Automatically generate and manage
NetworkPolicy
resources. - Support static IPs in egress rules.
- Enable secure DNS resolution using kube-dns.
- Allow traffic to specific Kubernetes namespaces.
Custom Resource: EgressPolicy
We introduced a new custom resource named EgressPolicy
with the following structure:
apiVersion: platform.example.com/v1alpha1
kind: EgressPolicy
metadata:
name: allow-external-services
namespace: app-namespace
spec:
refreshIntervalSeconds: 60
destinations:
- hostname: api.github.com
- hostname: us1.datadoghq.com
- ip: 8.8.8.8
namespaces:
- backup-tools
- shared-services
hostname
: Domains to be resolved periodically.ip
: Direct IP addresses to allow.namespaces
: Additional namespaces to allow egress access.
Architecture
+--------------------+
| EgressPolicy CRD |
+--------------------+
|
v
+--------------------+ +--------------------+
| Kopf Python |<----->| Kubernetes API |
| Controller | +--------------------+
+--------------------+
|
v
+----------------------------+
| Generated NetworkPolicies |
+----------------------------+
Features
- Written in Python using Kopf.
- Listens to
create
,update
, anddelete
events onEgressPolicy
. - Maintains a TTL-based cache for resolved IPs.
- Generates
NetworkPolicy
resources dynamically. - Adds kube-dns access for DNS resolution (UDP/TCP on port 53).
- Allows specifying target namespaces for cross-namespace access.
- Supports periodic refresh via
refreshIntervalSeconds
.
Python Code
import kopf
import kubernetes
import socket
import time
import ipaddress
import json
import threading
import http.server
from datetime import datetime, timedelta
from kubernetes.client.rest import ApiException
API_GROUP = "platform.example.com"
API_VERSION = "v1alpha1"
PLURAL = "egresspolicies"
TTL_DAYS = 7
# Load in-cluster config
kubernetes.config.load_incluster_config()
api = kubernetes.client.NetworkingV1Api()
core_api = kubernetes.client.CoreV1Api()
# Cache to track cumulative IPs per policy with timestamps
policy_ip_cache = {} # key: namespace/name -> {ip: timestamp}
# Healthz HTTP server for readiness probe
def start_health_server():
class HealthHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/healthz":
self.send_response(200)
self.end_headers()
self.wfile.write(b"OK")
else:
self.send_response(404)
self.end_headers()
server = http.server.HTTPServer(('0.0.0.0', 8080), HealthHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
start_health_server()
def resolve_ips(hostname):
try:
return list({
ip[4][0] for ip in socket.getaddrinfo(hostname, None)
if ipaddress.ip_address(ip[4][0]).version == 4
})
except Exception as e:
print(f"[ERROR] Failed to resolve {hostname}: {e}")
return []
def build_network_policy(namespace, name, ip_list, ns_list):
dns_ip = "172.16.0.10"
ip_list.append(dns_ip)
dns_rule = kubernetes.client.V1NetworkPolicyEgressRule(
to=[kubernetes.client.V1NetworkPolicyPeer(
namespace_selector=kubernetes.client.V1LabelSelector(
match_labels={"kubernetes.io/metadata.name": "kube-system"}
)
)],
ports=[
kubernetes.client.V1NetworkPolicyPort(protocol="UDP", port=53),
kubernetes.client.V1NetworkPolicyPort(protocol="TCP", port=53),
]
)
ip_rule = kubernetes.client.V1NetworkPolicyEgressRule(
to=[
kubernetes.client.V1NetworkPolicyPeer(
ip_block=kubernetes.client.V1IPBlock(cidr=f"{ip}/32")
) for ip in ip_list
],
ports=[
kubernetes.client.V1NetworkPolicyPort(protocol="TCP", port=443)
]
)
namespace_rules = [
kubernetes.client.V1NetworkPolicyEgressRule(
to=[
kubernetes.client.V1NetworkPolicyPeer(
namespace_selector=kubernetes.client.V1LabelSelector(
match_labels={"kubernetes.io/metadata.name": ns}
)
)
]
) for ns in ns_list
]
return kubernetes.client.V1NetworkPolicy(
metadata=kubernetes.client.V1ObjectMeta(
name=f"egress-{name}",
namespace=namespace
),
spec=kubernetes.client.V1NetworkPolicySpec(
pod_selector=kubernetes.client.V1LabelSelector(match_labels={}),
policy_types=["Egress"],
egress=[dns_rule, ip_rule] + namespace_rules
)
)
def ensure_default_deny(namespace):
deny_policy = kubernetes.client.V1NetworkPolicy(
metadata=kubernetes.client.V1ObjectMeta(
name="default-deny-egress",
namespace=namespace
),
spec=kubernetes.client.V1NetworkPolicySpec(
pod_selector=kubernetes.client.V1LabelSelector(match_labels={}),
policy_types=["Egress"],
egress=[]
)
)
try:
api.create_namespaced_network_policy(namespace, deny_policy)
print(f"[INFO] Created default deny egress policy in namespace '{namespace}'")
except ApiException as e:
if e.status == 409:
print(f"[INFO] Default deny egress policy already exists in '{namespace}'")
else:
print(f"[ERROR] Failed to create default deny policy in '{namespace}': {e}")
@kopf.on.resume(f"{API_GROUP}/{API_VERSION}", PLURAL)
@kopf.on.update(f"{API_GROUP}/{API_VERSION}", PLURAL)
@kopf.on.create(f"{API_GROUP}/{API_VERSION}", PLURAL)
def start_timer(spec, name, namespace, logger, **kwargs):
interval = spec.get("refreshIntervalSeconds", 600)
logger.info(f"Starting or updating refresh timer for '{name}' every {interval} seconds.")
@kopf.timer(f"{API_GROUP}/{API_VERSION}", PLURAL, id=f"refresh-{name}", interval=interval, sharp=True)
def dynamic_timer(spec, name, namespace, logger, **_):
destinations = spec.get('destinations', [])
ns_targets = spec.get('namespaces', [])
logger.info(f"Resolving destinations for EgressPolicy '{name}' in '{namespace}': {destinations}, namespaces: {ns_targets}")
ensure_default_deny(namespace)
key = f"{namespace}/{name}"
now = datetime.utcnow()
newly_resolved = {}
for dest in destinations:
if 'hostname' in dest:
hostname = dest['hostname']
ips = resolve_ips(hostname)
for ip in ips:
newly_resolved[ip] = now
logger.info(f"Resolved hostname {hostname} to {ips}")
elif 'ip' in dest:
ip = dest['ip']
try:
ipaddress.ip_address(ip)
newly_resolved[ip] = now
logger.info(f"Using provided IP {ip} directly")
except ValueError:
logger.warning(f"Invalid IP address skipped: {ip}")
cached_ips = policy_ip_cache.get(key, {})
cached_ips.update(newly_resolved)
pruned_ips = {
ip: ts for ip, ts in cached_ips.items()
if now - ts <= timedelta(days=TTL_DAYS)
}
policy_ip_cache[key] = pruned_ips
resolved_ips = list(pruned_ips.keys())
policy = build_network_policy(namespace, name, resolved_ips, ns_targets)
try:
api.create_namespaced_network_policy(namespace, policy)
logger.info(f"Created NetworkPolicy 'egress-{name}' with IPs: {resolved_ips} and namespaces: {ns_targets}")
except ApiException as e:
if e.status == 409:
api.replace_namespaced_network_policy(f"egress-{name}", namespace, policy)
logger.info(f"Updated existing NetworkPolicy 'egress-{name}'")
else:
logger.error(f"Failed to create/update NetworkPolicy: {e}")
logger.info(json.dumps({
"event": "egress-policy-update",
"policy": name,
"namespace": namespace,
"ips": resolved_ips,
"namespaces": ns_targets
}))
@kopf.on.delete(f"{API_GROUP}/{API_VERSION}", PLURAL)
def cleanup_policy(name, namespace, logger, **kwargs):
key = f"{namespace}/{name}"
if key in policy_ip_cache:
del policy_ip_cache[key]
try:
api.delete_namespaced_network_policy(f"egress-{name}", namespace)
logger.info(f"Deleted NetworkPolicy 'egress-{name}' on CR deletion")
except ApiException as e:
if e.status != 404:
logger.error(f"Failed to delete NetworkPolicy: {e}")
Best Practices
- Use one
EgressPolicy
per use-case for better management. - Avoid mixing unrelated destinations in one policy.
- Periodically review resolved IPs and refresh interval.
- Use
namespaces
only when cross-namespace access is needed. - Keep shared or base controller resources minimal for network security.
โ ๏ธ Challenges I Faced
- Dynamic DNS Resolution: FQDNs arenโt natively supported in Kubernetes network policies, so I implemented a TTL-based IP resolution cache (7-day expiry).
- Policy Overlap: Multiple CRs for the same namespace could lead to duplicated rules โ the controller merges them smartly.
- DNS Egress: Every namespace is granted access to UDP/TCP 53 to
kube-dns.kube-system
to resolve FQDNs. - Controller Failures: Ensured graceful handling with retry and conflict resolution logic.
๐ Outcome
- โ Every tenant namespace now has isolated egress boundaries
- โ DNS-based destinations are dynamically managed
- โ Platform team controls access centrally by applying CRs
- โ Reduced risk of accidental data leaks
๐ฆ Tech Stack
- Python + Kopf
- Kubernetes Custom Resource Definitions (CRD)
- Calico CNI
- DNS Resolver + TTL cache (Python
socket
+dnspython
) - Azure Kubernetes Service (AKS)
๐ Whatโs Next?
- Add
refreshIntervalSeconds
to support per-policy re-resolution of DNS - Integrate policy violations alerting using Datadog
- Optional integration with OPA/Gatekeeper for audit-level enforcement
๐ง Learnings
- Writing custom controllers gives immense flexibility in Kubernetes
- Egress is often overlooked in multi-tenant designs โ donโt ignore it
- Kopf is fantastic for quick Python-based operators without boilerplate
๐โโ๏ธ About Me
I'm a Cloud Solution Architect working with Azure, AKS, and secure SaaS platforms. Follow me for more deep dives on Kubernetes, cloud architecture, and DevOps.
Connect: LinkedIn | Blog: ajin-cloudjourney.hashnode.dev
#Kubernetes #AKS #Egress #DevSecOps #Python #Kopf #Calico #CloudArchitecture #MultiTenant
Subscribe to my newsletter
Read articles from Ajin Joseph directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
