You know that feeling when an incident starts with a sentence like:
- “Why is this database publicly reachable?”
- “Who opened 0.0.0.0/0 to SSH?”
- “Why did this Kubernetes pod run as privileged?”
- “Why did someone deploy a LoadBalancer in a dev namespace?”
Most “bad infra” isn’t malicious. It’s normal engineering drift:
new hires, rushed PRs, copy-paste manifests, unclear standards, and “just this once” exceptions that quietly become the default.
Policy as Code is how mature teams stop relying on memory and best intentions—and start enforcing guardrails the same way they enforce tests: automatically, consistently, and early.
This post teaches you OPA + Rego from zero, then shows real policies that block common infrastructure mistakes before they hit production.
What is Policy as Code (in plain English)?
Policy as Code means writing your rules (security, compliance, platform standards) as version-controlled code that runs automatically in pipelines and platforms.
Instead of a PDF that says:
“All S3 buckets must be private.”
You have a policy that fails the PR or blocks the deployment if someone creates a public bucket.
Why engineers love it (when done right)
- It’s repeatable: same rule everywhere
- It’s reviewable: PRs for rule changes
- It’s testable: policy unit tests
- It’s auditable: “who changed the guardrail, when, and why”
- It shifts left: catches issues at PR time, not after a breach
Where OPA fits
What is OPA?
OPA (Open Policy Agent) is a general-purpose policy engine. You feed it “facts” (input data), and it returns a decision: allow/deny plus reasons.
OPA is used for:
- Kubernetes admission control (block risky manifests)
- Terraform plan checks (block risky infra changes)
- API authorization (fine-grained access control)
- CI/CD checks (stop policy violations before merge)
OPA evaluates policies written in Rego.
OPA in one picture (mental model)
Input (JSON) → OPA + Rego policies → Decision (allow/deny + messages)
- Input is the thing you’re deciding about (a Kubernetes manifest, Terraform plan, API request)
- Policy is your rules
- Decision is what your platform does (fail PR, reject deploy, deny API call)
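Before touching Rego, it can help to see that contract as ordinary code. The sketch below is a toy stand-in for OPA written in Python (not OPA's real API): rules are functions from input to violation messages, and the decision is simply “did any rule produce a message?”

```python
def privileged_rule(obj):
    """Yield a message for each privileged container (toy example rule)."""
    for c in obj.get("spec", {}).get("containers", []):
        if c.get("securityContext", {}).get("privileged") is True:
            yield f"container {c['name']!r} must not run privileged"

def decide(obj, rules):
    """Collect all violation messages; allow only if there are none."""
    violations = [msg for rule in rules for msg in rule(obj)]
    return {"allow": not violations, "deny": violations}

pod = {
    "kind": "Pod",
    "metadata": {"name": "api"},
    "spec": {"containers": [{"name": "app", "securityContext": {"privileged": True}}]},
}
print(decide(pod, [privileged_rule]))
# {'allow': False, 'deny': ["container 'app' must not run privileged"]}
```

That is the whole mental model: OPA evaluates your rules against the input and hands your platform a structured decision to act on.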
Rego basics (you only need a few concepts to get productive)
Rego can look “new” at first, but the core ideas are small:
1) package groups your rules
package security.kubernetes
2) input is the JSON you are evaluating
OPA always evaluates against an input object.
Example input (simplified):
{
"kind": "Pod",
"metadata": {"name": "api"},
"spec": {"containers": [{"name": "app", "securityContext": {"privileged": true}}]}
}
3) Rules compute values (often booleans or sets)
A rule can define:
- allow (true or false)
- deny[msg] (a set of messages explaining violations)
4) “Default deny” is the safe pattern
default allow := false
allow := true { ...conditions... }
5) deny[msg] is the most practical pattern for infra guardrails
It lets you return multiple violations at once:
deny[msg] {
some condition
msg := "Explain what is wrong and how to fix it"
}
Your first real policy: block privileged containers (Kubernetes)
The goal
Prevent pods from running as privileged.
Rego policy
package kubernetes.guardrails
deny[msg] {
input.kind == "Pod"
some i
c := input.spec.containers[i]
c.securityContext.privileged == true
msg := sprintf("Pod %q: container %q must not run privileged=true", [input.metadata.name, c.name])
}
Why this prevents bad infra
Privileged containers can access host devices, escalate privileges, and bypass isolation. Blocking it is a high-value, low-controversy guardrail.
Rego patterns you’ll reuse everywhere
Pattern A: require a field (like labels/tags)
Example: require owner and env labels in Kubernetes.
package kubernetes.guardrails
required_labels := {"owner", "env"}
deny[msg] {
# a missing labels field is undefined in Rego, not null, so test with `not`
not input.metadata.labels
msg := sprintf("%q: missing metadata.labels (required: %v)", [input.metadata.name, required_labels])
}
deny[msg] {
k := required_labels[_]
not input.metadata.labels[k]
msg := sprintf("%q: missing label %q", [input.metadata.name, k])
}
This blocks “nobody owns this” deployments, which are a huge source of orphan resources and security gaps.
Pattern B: deny risky defaults (public exposure)
Example: block Service of type LoadBalancer in non-prod namespaces.
package kubernetes.guardrails
deny[msg] {
input.kind == "Service"
input.spec.type == "LoadBalancer"
ns := input.metadata.namespace
ns != "prod"
msg := sprintf("Service %q in namespace %q: LoadBalancer not allowed outside prod", [input.metadata.name, ns])
}
This prevents accidental internet exposure in dev/test.
Pattern C: enforce resource requests/limits (prevent runaway costs)
Example: require CPU and memory requests/limits for every container.
package kubernetes.guardrails
deny[msg] {
input.kind == "Deployment"
some i
c := input.spec.template.spec.containers[i]
not c.resources.requests.cpu
msg := sprintf("Deployment %q: container %q missing CPU request", [input.metadata.name, c.name])
}
deny[msg] {
input.kind == "Deployment"
some i
c := input.spec.template.spec.containers[i]
not c.resources.limits.memory
msg := sprintf("Deployment %q: container %q missing memory limit", [input.metadata.name, c.name])
}
This prevents the classic “one container eats the node” problem and improves scheduling efficiency.
Policy as Code for Terraform (how it blocks risky infra before it exists)
Kubernetes policies block bad deployments. Terraform policies block bad infrastructure changes before they’re applied.
The common approach is:
- Generate a plan
- Convert plan to JSON
- Evaluate with OPA/Rego
- Fail the pipeline if policy violations exist
To keep this beginner-friendly, we’ll use a simplified plan-like input.
Example input (simplified Terraform plan JSON)
{
"resources": [
{
"type": "aws_security_group_rule",
"name": "ssh_ingress",
"change": {
"after": {
"type": "ingress",
"from_port": 22,
"to_port": 22,
"protocol": "tcp",
"cidr_blocks": ["0.0.0.0/0"]
}
}
}
]
}
Policy: block SSH from the entire internet
package terraform.guardrails
deny[msg] {
r := input.resources[_]
r.type == "aws_security_group_rule"
after := r.change.after
after.type == "ingress"
after.from_port <= 22
after.to_port >= 22
"0.0.0.0/0" == after.cidr_blocks[_]
msg := sprintf("Security group rule %q: SSH (22) open to 0.0.0.0/0 is not allowed", [r.name])
}
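Two details are worth noticing: the `from_port <= 22` / `to_port >= 22` pair catches any range that spans port 22 (such as 0–1024), not just an explicit port-22 rule, and the `[_]` iteration checks every CIDR in the list. The same logic in Python, as a hypothetical helper for intuition only:

```python
def opens_ssh_to_world(rule):
    """True if an ingress rule's port range spans 22 and allows the whole internet."""
    return (
        rule["type"] == "ingress"
        and rule["from_port"] <= 22 <= rule["to_port"]
        and "0.0.0.0/0" in rule["cidr_blocks"]
    )

# A rule opening a wide range is caught, not only an explicit port-22 rule:
wide_open = {"type": "ingress", "from_port": 0, "to_port": 1024,
             "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"]}
vpn_only = {"type": "ingress", "from_port": 22, "to_port": 22,
            "protocol": "tcp", "cidr_blocks": ["10.8.0.0/16"]}
print(opens_ssh_to_world(wide_open))  # True
print(opens_ssh_to_world(vpn_only))   # False
```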
Why this prevents bad infra
This blocks one of the most common real-world misconfigurations that leads to scanning, brute force attempts, and breaches.
Policy: enforce mandatory tags (cloud governance + cost)
Example input:
{
"resources": [
{
"type": "aws_instance",
"name": "app_server",
"change": {
"after": {
"tags": { "env": "prod", "owner": "team-payments" }
}
}
}
]
}
Rego:
package terraform.guardrails
required_tags := {"env", "owner", "cost_center"}
deny[msg] {
r := input.resources[_]
after := r.change.after
tags := after.tags
tags == null
msg := sprintf("%s.%s: missing tags (required: %v)", [r.type, r.name, required_tags])
}
deny[msg] {
r := input.resources[_]
t := required_tags[_]
after := r.change.after
tags := after.tags
tags != null
not tags[t]
msg := sprintf("%s.%s: missing tag %q", [r.type, r.name, t])
}
This prevents “unowned infra” and improves cost allocation immediately.
Policy: require encryption (a compliance lifesaver)
Example: require encryption on storage resources (pattern shown generically):
package terraform.guardrails
deny[msg] {
r := input.resources[_]
r.type == "aws_ebs_volume"
after := r.change.after
not after.encrypted
msg := sprintf("%s.%s: EBS volume must have encrypted=true", [r.type, r.name])
}
The magic: how Policy as Code stops bad infra at multiple stages
A mature setup enforces the same intent in 3 places:
1) In PR checks (fast feedback)
- Fail the PR if Terraform/K8s manifests violate policy
- Engineers fix issues while context is fresh
2) At deployment time (strong guardrails)
- Kubernetes admission controller rejects risky objects
- Even if something bypasses CI, the cluster still protects itself
3) After deployment (audit + drift detection)
- Detect violations that already exist (legacy systems)
- Create tickets or alerts to remediate gradually
This is how you go from “We hope people follow standards” to “The platform makes the safe path the easy path.”
Step-by-step: implement OPA guardrails in a practical way
Step 1 — Pick your first 5 “never again” rules
Start with rules that are:
- high impact
- low debate
- easy to understand
Great starters:
- No 0.0.0.0/0 for SSH/RDP
- No privileged containers
- Require tags/labels: owner/env
- Require encryption on storage
- No public LoadBalancers outside prod (or without explicit annotation)
Step 2 — Create a policy repo (treat it like a product)
Recommended structure:
policies/
kubernetes/
guardrails.rego
guardrails_test.rego
terraform/
guardrails.rego
guardrails_test.rego
Step 3 — Add policy unit tests (this is where teams level up)
A tiny test example:
package kubernetes.guardrails_test
import data.kubernetes.guardrails.deny
# `input` is reserved in Rego, so bind the test document to another name
# and inject it with `with`:
test_privileged_container_denied {
	inp := {
		"kind": "Pod",
		"metadata": {"name": "bad-pod"},
		"spec": {"containers": [{"name": "app", "securityContext": {"privileged": true}}]}
	}
	count(deny) > 0 with input as inp
}
Tests make policies safe to change and reduce accidental breakage.
Step 4 — Roll out in “warn mode” first
For 2–4 weeks:
- don’t block immediately
- record violations
- fix noisy rules
- build trust
Then move the most stable policies into “enforce mode.”
Step 5 — Add exceptions the right way (so you don’t create policy chaos)
Instead of “just disable policy,” use scoped exceptions:
- by namespace
- by resource name pattern
- by explicit annotation like policy.exception: approved-ticket-123
Example pattern (simplified):
is_exception {
input.metadata.annotations["policy.exception"] != ""
}
deny[msg] {
not is_exception
# ...violation conditions...
msg := "..."
}
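In plain Python, the control flow looks like this (a sketch only; the privileged check stands in for whatever rule you are guarding):

```python
def is_exception(obj):
    """True if the object carries a non-empty policy.exception annotation."""
    annotations = obj.get("metadata", {}).get("annotations", {})
    return annotations.get("policy.exception", "") != ""

def deny(obj):
    """Return violation messages, skipping objects with an approved exception."""
    msgs = []
    for c in obj.get("spec", {}).get("containers", []):
        if c.get("securityContext", {}).get("privileged") and not is_exception(obj):
            msgs.append(f"container {c['name']}: privileged not allowed")
    return msgs

bad = {"metadata": {"name": "p", "annotations": {}},
       "spec": {"containers": [{"name": "app", "securityContext": {"privileged": True}}]}}
approved = {"metadata": {"name": "p",
                         "annotations": {"policy.exception": "approved-ticket-123"}},
            "spec": bad["spec"]}
print(deny(bad))       # ['container app: privileged not allowed']
print(deny(approved))  # []
```

Because the exception is an explicit annotation, it shows up in the manifest, in diffs, and in audits — which is exactly what you want from an exception.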
Step 6 — Track what matters (policy KPIs)
- Violation counts over time (should decrease)
- Top violating teams/resources
- Mean time to remediate violations
- Exception count (should be small and reviewed)
Real-world examples of “bad infra” Policy as Code prevents
Example 1: “Open SSH to the world” never reaches prod
Without policy:
- Terraform gets applied
- scanners find port 22
- incident ticket, blame, downtime
With policy:
- PR fails with a clear message
- engineer changes CIDR to VPN range or bastion SG
- no incident, no drama
Example 2: “Privileged pod” never gets admitted
Without policy:
- privileged pod runs
- host access risk increases
- security audit flags it later
With policy:
- admission rejects it instantly with a human-readable reason
Example 3: “No tags” resources don’t get created
Without policy:
- mystery bills
- nobody owns cleanup
- FinOps becomes detective work
With policy:
- missing tags block creation
- cost allocation stays clean
Common mistakes (and how to avoid them)
Mistake 1: Writing policies that are too strict too early
Fix: warn mode → learn → enforce gradually.
Mistake 2: Policies with unclear error messages
Fix: write messages that include what, where, and how to fix.
Bad message: denied by policy
Good message: Deployment payments-api: container app missing CPU request (set resources.requests.cpu)
Mistake 3: No owner for the policy repo
Fix: treat policies like platform code. Assign ownership, reviews, release notes.
Mistake 4: Too many exceptions
Fix: exceptions must be explicit, time-bound, and reviewed.
Mistake 5: Policies not tested
Fix: add policy unit tests for every rule—especially deny rules.
Cheatsheet: Rego essentials you’ll use daily
some i for looping
some i
c := input.spec.containers[i]
Membership checks
"0.0.0.0/0" == after.cidr_blocks[_]
Negation (missing fields)
not input.metadata.labels["owner"]
String formatting for clear messages
msg := sprintf("Resource %q missing owner label", [input.metadata.name])
FAQ (quick answers)
Is Policy as Code only for security?
No. It’s also for:
- cost guardrails (no oversized instances in dev)
- reliability standards (requests/limits required)
- governance (tags, regions, naming)
- compliance (encryption, retention rules)
Will it slow teams down?
If you start with clear rules + good messages + warn mode, it speeds teams up by preventing rework and incidents.
How many policies should we have?
Start with 5–10 high-impact policies, then grow slowly. Most orgs do better with fewer, stronger rules than 200 fragile ones.
Can engineers maintain it, or does it require a special team?
Engineers can maintain it—especially if you keep the rules simple and cover them with tests.
Final takeaway
Policy as Code is guardrails that don’t forget.
OPA/Rego turns standards into executable rules that protect your infra at PR time and at deploy time—before mistakes become outages or security incidents.
Policy Pack: Multi-Cloud Guardrails (OPA/Rego)
Supports: AWS + Azure + GCP + Kubernetes + Terraform
What you get in this pack
- Kubernetes guardrails (block risky workloads before they run)
- Terraform plan guardrails (block risky infra before it’s created)
- Clear error messages engineers can fix fast
- Policy tests so policies don’t break silently
Repo structure (recommended)
policies/
kubernetes/
guardrails.rego
guardrails_test.rego
terraform/
aws.rego
aws_test.rego
azure.rego
azure_test.rego
gcp.rego
gcp_test.rego
Part A — Kubernetes Guardrails (works with manifests or admission inputs)
These policies are written to work with either:
- plain manifest JSON/YAML (input is the object), or
- AdmissionReview style (input.review.object)
policies/kubernetes/guardrails.rego
package kubernetes.guardrails
# Helper: support both AdmissionReview (input.review.object) and raw manifests (input)
# (plain `=` allows multiple, mutually exclusive definitions; `:=` forbids redefinition)
obj = input.review.object { input.review.object != null }
obj = input { not input.review.object }
kind := lower(obj.kind)
name := obj.metadata.name
namespace := obj.metadata.namespace
labels := obj.metadata.labels
annotations := obj.metadata.annotations
# Helper: get pod spec from common controllers
podspec = obj.spec { kind == "pod" }
podspec = obj.spec.template.spec { kind == "deployment" }
podspec = obj.spec.template.spec { kind == "statefulset" }
podspec = obj.spec.template.spec { kind == "daemonset" }
podspec = obj.spec.template.spec { kind == "replicaset" }
podspec = obj.spec.jobTemplate.spec.template.spec { kind == "cronjob" }
# ------------------------------------------------------------
# K8S-1: Block privileged containers
# ------------------------------------------------------------
deny[msg] {
podspec
some i
c := podspec.containers[i]
c.securityContext.privileged == true
msg := sprintf("%s/%s: container %q must not run privileged=true", [namespace, name, c.name])
}
# ------------------------------------------------------------
# K8S-2: Require runAsNonRoot=true (pod or container level)
# ------------------------------------------------------------
deny[msg] {
podspec
not podspec.securityContext.runAsNonRoot
some i
c := podspec.containers[i]
not c.securityContext.runAsNonRoot
msg := sprintf("%s/%s: set securityContext.runAsNonRoot=true (pod-level or per container)", [namespace, name])
}
# ------------------------------------------------------------
# K8S-3: Block hostNetwork=true
# ------------------------------------------------------------
deny[msg] {
podspec
podspec.hostNetwork == true
msg := sprintf("%s/%s: hostNetwork=true is not allowed", [namespace, name])
}
# ------------------------------------------------------------
# K8S-4: Block hostPath volumes (common escape hatch)
# ------------------------------------------------------------
deny[msg] {
podspec
some i
v := podspec.volumes[i]
v.hostPath.path != ""
msg := sprintf("%s/%s: hostPath volume %q is not allowed (path=%q)", [namespace, name, v.name, v.hostPath.path])
}
# ------------------------------------------------------------
# K8S-5: Require resource requests + memory limit (stability + cost)
# ------------------------------------------------------------
deny[msg] {
podspec
some i
c := podspec.containers[i]
not c.resources.requests.cpu
msg := sprintf("%s/%s: container %q missing resources.requests.cpu", [namespace, name, c.name])
}
deny[msg] {
podspec
some i
c := podspec.containers[i]
not c.resources.requests.memory
msg := sprintf("%s/%s: container %q missing resources.requests.memory", [namespace, name, c.name])
}
deny[msg] {
podspec
some i
c := podspec.containers[i]
not c.resources.limits.memory
msg := sprintf("%s/%s: container %q missing resources.limits.memory", [namespace, name, c.name])
}
# ------------------------------------------------------------
# K8S-6: Block 'latest' tag (or no tag) to improve reproducibility
# ------------------------------------------------------------
deny[msg] {
podspec
some i
c := podspec.containers[i]
image := c.image
not contains(image, ":")
msg := sprintf("%s/%s: container %q image %q must be pinned with a tag (no implicit latest)", [namespace, name, c.name, image])
}
deny[msg] {
podspec
some i
c := podspec.containers[i]
image := c.image
contains(image, ":latest")
msg := sprintf("%s/%s: container %q must not use :latest (pin a version tag)", [namespace, name, c.name])
}
# ------------------------------------------------------------
# K8S-7: Restrict Service type LoadBalancer outside prod
# Allow exception via annotation: policy.allow-loadbalancer="true"
# ------------------------------------------------------------
deny[msg] {
kind == "service"
obj.spec.type == "LoadBalancer"
ns := obj.metadata.namespace
ns != "prod"
not annotations["policy.allow-loadbalancer"] == "true"
msg := sprintf("%s/%s: Service type LoadBalancer not allowed outside prod (add annotation policy.allow-loadbalancer=\"true\" for approved exception)", [ns, obj.metadata.name])
}
# ------------------------------------------------------------
# K8S-8: Require basic ownership labels (prevents orphan infra & mystery spend)
# ------------------------------------------------------------
required_labels := {"owner", "env"}
deny[msg] {
	# a missing labels field is undefined in Rego, not null, so test with `not`
	not obj.metadata.labels
	msg := sprintf("%s/%s: missing labels (required: %v)", [namespace, name, required_labels])
}
deny[msg] {
	obj.metadata.labels
	k := required_labels[_]
	not labels[k]
	msg := sprintf("%s/%s: missing label %q", [namespace, name, k])
}
policies/kubernetes/guardrails_test.rego
package kubernetes.guardrails_test
import data.kubernetes.guardrails.deny
# `input` is reserved in Rego; bind each test document to `inp` and inject it with `with`.
test_privileged_container_denied {
	inp := {
		"kind": "Pod",
		"metadata": {"name": "bad", "namespace": "dev", "labels": {"owner":"team-a","env":"dev"}},
		"spec": {"containers": [{"name":"app","image":"nginx:1.25","securityContext":{"privileged":true}}]}
	}
	count(deny) > 0 with input as inp
}
test_latest_tag_denied {
	inp := {
		"kind": "Deployment",
		"metadata": {"name": "api", "namespace": "dev", "labels": {"owner":"team-a","env":"dev"}},
		"spec": {"template": {"spec": {"containers": [{"name":"app","image":"repo/app:latest","resources":{"requests":{"cpu":"100m","memory":"128Mi"},"limits":{"memory":"256Mi"}}}]}}}
	}
	count(deny) > 0 with input as inp
}
test_loadbalancer_in_dev_denied {
	inp := {
		"kind": "Service",
		"metadata": {"name": "svc", "namespace": "dev", "labels": {"owner":"team-a","env":"dev"}, "annotations": {}},
		"spec": {"type": "LoadBalancer"}
	}
	count(deny) > 0 with input as inp
}
What this prevents (immediately):
- privileged workloads, host networking, hostPath escapes
- unbounded CPU/memory behavior and noisy neighbor issues
- “latest tag” reproducibility problems
- accidental public exposure via LoadBalancers
- orphan resources with no owner/env labeling
Part B — Terraform Guardrails (evaluate plan JSON)
Expected Terraform input (standard practice)
- Generate the plan: terraform plan -out tfplan.out
- Convert it to JSON: terraform show -json tfplan.out > tfplan.json
- Evaluate tfplan.json with OPA
In Terraform plan JSON, the key area we use is:
input.resource_changes[_]
We’ll write cloud-specific rules because resource shapes differ.
B1) AWS Terraform policies
policies/terraform/aws.rego
package terraform.aws

# We return all policy failures as deny messages
deny[msg] { open_ssh_to_world[msg] }
deny[msg] { require_tags[msg] }
deny[msg] { ebs_must_be_encrypted[msg] }
deny[msg] { s3_public_access_block_required[msg] }
deny[msg] { prod_no_public_ip[msg] }
deny[msg] { restrict_regions[msg] }

# ------------- Helpers -------------
# Rego has no inline `or`, so disjunction is written as multiple rule bodies.
is_create_or_update(rc) {
	rc.change.actions[_] == "create"
}
is_create_or_update(rc) {
	rc.change.actions[_] == "update"
}
after(rc) := rc.change.after

# ------------- AWS-1: Block SSH open to world -------------
open_ssh_to_world[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	rc.type == "aws_security_group_rule"
	a := after(rc)
	a.type == "ingress"
	a.protocol == "tcp"
	a.from_port <= 22
	a.to_port >= 22
	a.cidr_blocks[_] == "0.0.0.0/0"
	msg := sprintf("AWS %s.%s: SSH (22) open to 0.0.0.0/0 is not allowed", [rc.type, rc.name])
}

# ------------- AWS-2: Require tags -------------
required_tags := {"env", "owner", "cost_center"}
require_tags[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	# Many AWS resources use `tags`
	a := after(rc)
	a.tags == null
	msg := sprintf("AWS %s.%s: missing tags (required: %v)", [rc.type, rc.name, required_tags])
}
require_tags[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	a := after(rc)
	a.tags != null
	k := required_tags[_]
	not a.tags[k]
	msg := sprintf("AWS %s.%s: missing tag %q", [rc.type, rc.name, k])
}

# ------------- AWS-3: EBS encryption required -------------
ebs_must_be_encrypted[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	rc.type == "aws_ebs_volume"
	a := after(rc)
	not a.encrypted
	msg := sprintf("AWS %s.%s: EBS volume must set encrypted=true", [rc.type, rc.name])
}

# ------------- AWS-4: S3 must block public access (account/standard control) -------------
s3_public_access_block_required[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	rc.type == "aws_s3_bucket_public_access_block"
	a := after(rc)
	not a.block_public_acls
	msg := sprintf("AWS %s.%s: block_public_acls must be true", [rc.type, rc.name])
}
s3_public_access_block_required[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	rc.type == "aws_s3_bucket_public_access_block"
	a := after(rc)
	not a.block_public_policy
	msg := sprintf("AWS %s.%s: block_public_policy must be true", [rc.type, rc.name])
}

# ------------- AWS-5: No public IP for prod EC2 -------------
prod_no_public_ip[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	rc.type == "aws_instance"
	a := after(rc)
	a.tags.env == "prod"
	a.associate_public_ip_address == true
	msg := sprintf("AWS %s.%s: prod instances must not associate a public IP", [rc.type, rc.name])
}

# ------------- AWS-6: Restrict regions (example allowlist) -------------
allowed_regions := {"us-east-1", "us-west-2"}
restrict_regions[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	# Many resources carry region in provider config elsewhere; as a simple baseline,
	# enforce a region tag as an organizational convention.
	a := after(rc)
	a.tags.region != null
	not allowed_regions[a.tags.region]
	msg := sprintf("AWS %s.%s: region %q not in allowed_regions %v (use allowed regions)", [rc.type, rc.name, a.tags.region, allowed_regions])
}
policies/terraform/aws_test.rego
package terraform.aws_test
import data.terraform.aws.deny
test_ssh_open_denied {
	inp := {"resource_changes":[
		{"type":"aws_security_group_rule","name":"ssh","change":{"actions":["create"],"after":{
			"type":"ingress","protocol":"tcp","from_port":22,"to_port":22,"cidr_blocks":["0.0.0.0/0"],
			"tags":{"env":"dev","owner":"team-a","cost_center":"cc1","region":"us-east-1"}
		}}}
	]}
	count(deny) > 0 with input as inp
}
B2) Azure Terraform policies
policies/terraform/azure.rego
package terraform.azure

deny[msg] { open_ssh_rdp_to_world[msg] }
deny[msg] { require_tags[msg] }
deny[msg] { storage_https_tls_required[msg] }
deny[msg] { prod_no_public_ip[msg] }
deny[msg] { restrict_locations[msg] }

is_create_or_update(rc) {
	rc.change.actions[_] == "create"
}
is_create_or_update(rc) {
	rc.change.actions[_] == "update"
}
after(rc) := rc.change.after

# -------- Azure-1: Block SSH/RDP open to world (NSG rules) --------
# Rego has no inline `or`; each disjunct becomes its own rule body.
risky_port(a) { a.destination_port_range == "22" }
risky_port(a) { a.destination_port_range == "3389" }
risky_port(a) { a.destination_port_ranges[_] == "22" }
risky_port(a) { a.destination_port_ranges[_] == "3389" }
world_source(a) { a.source_address_prefix == "*" }
world_source(a) { a.source_address_prefix == "0.0.0.0/0" }
world_source(a) { a.source_address_prefixes[_] == "*" }
world_source(a) { a.source_address_prefixes[_] == "0.0.0.0/0" }
open_ssh_rdp_to_world[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	rc.type == "azurerm_network_security_rule"
	a := after(rc)
	# inbound allow
	lower(a.direction) == "inbound"
	lower(a.access) == "allow"
	risky_port(a)
	world_source(a)
	msg := sprintf("Azure %s.%s: inbound allow for SSH/RDP from the internet is not allowed", [rc.type, rc.name])
}

# -------- Azure-2: Require tags on common resources --------
required_tags := {"env", "owner", "cost_center"}
require_tags[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	a := after(rc)
	a.tags == null
	msg := sprintf("Azure %s.%s: missing tags (required: %v)", [rc.type, rc.name, required_tags])
}
require_tags[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	a := after(rc)
	a.tags != null
	k := required_tags[_]
	not a.tags[k]
	msg := sprintf("Azure %s.%s: missing tag %q", [rc.type, rc.name, k])
}

# -------- Azure-3: Storage must enforce HTTPS + strong TLS --------
storage_https_tls_required[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	rc.type == "azurerm_storage_account"
	a := after(rc)
	a.enable_https_traffic_only != true
	msg := sprintf("Azure %s.%s: enable_https_traffic_only must be true", [rc.type, rc.name])
}
storage_https_tls_required[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	rc.type == "azurerm_storage_account"
	a := after(rc)
	# Common baseline
	a.min_tls_version != "TLS1_2"
	msg := sprintf("Azure %s.%s: min_tls_version must be TLS1_2", [rc.type, rc.name])
}

# -------- Azure-4: No public IP for prod workloads (Public IP resource or NIC attachment patterns) --------
prod_no_public_ip[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	rc.type == "azurerm_public_ip"
	a := after(rc)
	a.tags.env == "prod"
	msg := sprintf("Azure %s.%s: prod must not create public IPs unless explicitly approved", [rc.type, rc.name])
}

# -------- Azure-5: Restrict locations (example allowlist) --------
allowed_locations := {"eastus", "westus2"}
restrict_locations[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	a := after(rc)
	a.location != null
	not allowed_locations[lower(a.location)]
	msg := sprintf("Azure %s.%s: location %q not allowed (allowed: %v)", [rc.type, rc.name, a.location, allowed_locations])
}
policies/terraform/azure_test.rego
package terraform.azure_test
import data.terraform.azure.deny
test_nsg_ssh_world_denied {
	inp := {"resource_changes":[
		{"type":"azurerm_network_security_rule","name":"ssh","change":{"actions":["create"],"after":{
			"direction":"Inbound","access":"Allow","destination_port_range":"22","source_address_prefix":"0.0.0.0/0",
			"tags":{"env":"dev","owner":"team-a","cost_center":"cc1"}
		}}}
	]}
	count(deny) > 0 with input as inp
}
B3) GCP Terraform policies
policies/terraform/gcp.rego
package terraform.gcp

deny[msg] { open_ssh_rdp_to_world[msg] }
deny[msg] { require_labels[msg] }
deny[msg] { bucket_public_prevention_required[msg] }
deny[msg] { prod_no_external_ip[msg] }
deny[msg] { restrict_regions_zones[msg] }

is_create_or_update(rc) {
	rc.change.actions[_] == "create"
}
is_create_or_update(rc) {
	rc.change.actions[_] == "update"
}
after(rc) := rc.change.after

# -------- GCP-1: Block firewall rules exposing SSH/RDP --------
risky_ports := {"22", "3389"}
open_ssh_rdp_to_world[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	rc.type == "google_compute_firewall"
	a := after(rc)
	# source ranges include the whole internet
	a.source_ranges[_] == "0.0.0.0/0"
	# an allow block includes tcp 22 or 3389
	allow := a.allow[_]
	lower(allow.protocol) == "tcp"
	risky_ports[allow.ports[_]]
	msg := sprintf("GCP %s.%s: firewall allows SSH/RDP from 0.0.0.0/0 (not allowed)", [rc.type, rc.name])
}

# -------- GCP-2: Require labels (FinOps + ownership) --------
required_labels := {"env", "owner", "cost_center"}
require_labels[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	a := after(rc)
	a.labels == null
	msg := sprintf("GCP %s.%s: missing labels (required: %v)", [rc.type, rc.name, required_labels])
}
require_labels[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	a := after(rc)
	a.labels != null
	k := required_labels[_]
	not a.labels[k]
	msg := sprintf("GCP %s.%s: missing label %q", [rc.type, rc.name, k])
}

# -------- GCP-3: Storage bucket must prevent public access --------
bucket_public_prevention_required[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	rc.type == "google_storage_bucket"
	a := after(rc)
	a.public_access_prevention != "enforced"
	msg := sprintf("GCP %s.%s: public_access_prevention must be \"enforced\"", [rc.type, rc.name])
}
bucket_public_prevention_required[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	rc.type == "google_storage_bucket"
	a := after(rc)
	a.uniform_bucket_level_access != true
	msg := sprintf("GCP %s.%s: uniform_bucket_level_access must be true", [rc.type, rc.name])
}

# -------- GCP-4: Prod instances must not have external IP --------
prod_no_external_ip[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	rc.type == "google_compute_instance"
	a := after(rc)
	a.labels.env == "prod"
	# a non-empty access_config generally means an external IP is attached
	ni := a.network_interface[_]
	count(ni.access_config) > 0
	msg := sprintf("GCP %s.%s: prod compute instance must not have external IP (remove access_config)", [rc.type, rc.name])
}

# -------- GCP-5: Restrict regions/zones (example allowlist) --------
allowed_regions := {"us-central1", "us-west1"}
restrict_regions_zones[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	a := after(rc)
	a.region != null
	not allowed_regions[a.region]
	msg := sprintf("GCP %s.%s: region %q not allowed (allowed: %v)", [rc.type, rc.name, a.region, allowed_regions])
}
restrict_regions_zones[msg] {
	rc := input.resource_changes[_]
	is_create_or_update(rc)
	a := after(rc)
	a.zone != null
	# zone like us-central1-a => region is everything before the last "-"
	parts := split(a.zone, "-")
	r := concat("-", array.slice(parts, 0, count(parts) - 1))
	not allowed_regions[r]
	msg := sprintf("GCP %s.%s: zone %q not allowed (must be in regions: %v)", [rc.type, rc.name, a.zone, allowed_regions])
}
policies/terraform/gcp_test.rego
package terraform.gcp_test
import data.terraform.gcp.deny
test_gcp_firewall_ssh_world_denied {
	inp := {"resource_changes":[
		{"type":"google_compute_firewall","name":"ssh","change":{"actions":["create"],"after":{
			"source_ranges":["0.0.0.0/0"],
			"allow":[{"protocol":"tcp","ports":["22"]}],
			"labels":{"env":"dev","owner":"team-a","cost_center":"cc1"}
		}}}
	]}
	count(deny) > 0 with input as inp
}
Part C — How to run this (local + CI)
1) Run policy unit tests
opa test ./policies -v
2) Terraform: evaluate a plan
terraform plan -out tfplan.out
terraform show -json tfplan.out > tfplan.json
# AWS
opa eval -i tfplan.json -d policies/terraform/aws.rego "data.terraform.aws.deny"
# Azure
opa eval -i tfplan.json -d policies/terraform/azure.rego "data.terraform.azure.deny"
# GCP
opa eval -i tfplan.json -d policies/terraform/gcp.rego "data.terraform.gcp.deny"
In CI, you typically fail if the output deny set is non-empty.
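A minimal CI gate can wrap those commands: run opa eval, parse the JSON result, and exit non-zero when the deny set is non-empty. The script below assumes opa eval's standard output shape (result → expressions → value) and that opa is on PATH; treat the wiring as a sketch to adapt, not a finished tool:

```python
import json
import subprocess
import sys

def deny_messages(opa_eval_json):
    """Extract the deny set from `opa eval --format json` output."""
    msgs = []
    for result in opa_eval_json.get("result", []):
        for expr in result.get("expressions", []):
            msgs.extend(expr.get("value", []))
    return msgs

def gate(plan_file, policy_dir, query):
    """Return a CI exit code: 0 if no violations, 1 otherwise."""
    out = subprocess.run(
        ["opa", "eval", "--format", "json", "-i", plan_file, "-d", policy_dir, query],
        capture_output=True, text=True, check=True,
    )
    msgs = deny_messages(json.loads(out.stdout))
    for m in msgs:
        print(f"POLICY VIOLATION: {m}", file=sys.stderr)
    return 1 if msgs else 0

# Usage in a CI step (assumes tfplan.json exists and opa is installed):
#   sys.exit(gate("tfplan.json", "policies/terraform", "data.terraform.aws.deny"))
```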
3) Kubernetes: evaluate a manifest
If you have a manifest in JSON (or converted to JSON):
opa eval -i pod.json -d policies/kubernetes/guardrails.rego "data.kubernetes.guardrails.deny"
If the deny list is non-empty, block the change/deploy.
Why this set prevents “bad infra” (the real engineering impact)
It blocks the top incident starters:
- Public SSH/RDP exposure (cloud firewall/NSG/SG rules)
- Public IPs on prod VMs (common data-exfil path)
- Public storage buckets (silent data leak risk)
- Unencrypted storage (audit/compliance failures)
- Privileged/host-level Kubernetes settings (container escape risk)
- No requests/limits (node instability + cost spikes)
- LoadBalancer everywhere (accidental exposure + waste)
- No ownership tags/labels (orphan infra + mystery bills)
And it does it early — at PR/plan time — when fixes are cheap.