Mohammad Gufran Jahangir · February 21, 2026

You know that feeling when an incident starts with a sentence like:

  • “Why is this database publicly reachable?”
  • “Who opened 0.0.0.0/0 to SSH?”
  • “Why did this Kubernetes pod run as privileged?”
  • “Why did someone deploy a LoadBalancer in a dev namespace?”

Most “bad infra” isn’t malicious. It’s normal engineering drift:
new hires, rushed PRs, copy-paste manifests, unclear standards, and “just this once” exceptions that quietly become the default.

Policy as Code is how mature teams stop relying on memory and best intentions—and start enforcing guardrails the same way they enforce tests: automatically, consistently, and early.

This post teaches you OPA + Rego from zero, then shows real policies that block common infrastructure mistakes before they hit production.



What is Policy as Code (in plain English)?

Policy as Code means writing your rules (security, compliance, platform standards) as version-controlled code that runs automatically in pipelines and platforms.

Instead of a PDF that says:

“All S3 buckets must be private.”

You have a policy that fails the PR or blocks the deployment if someone creates a public bucket.
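
To make that concrete, here is a minimal sketch of such a rule. The flat input shape (`type`, `name`, `acl`) is a simplified assumption for illustration; real-world checks usually evaluate Terraform plan JSON, as shown later in this post.

```rego
package storage.guardrails

# Simplified, hypothetical input:
# {"type": "aws_s3_bucket", "name": "logs", "acl": "public-read"}
deny[msg] {
  input.type == "aws_s3_bucket"
  input.acl == "public-read"
  msg := sprintf("Bucket %q must not be public (acl=public-read)", [input.name])
}
```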

Why engineers love it (when done right)

  • It’s repeatable: same rule everywhere
  • It’s reviewable: PRs for rule changes
  • It’s testable: policy unit tests
  • It’s auditable: “who changed the guardrail, when, and why”
  • It shifts left: catches issues at PR time, not after a breach

Where OPA fits

What is OPA?

OPA (Open Policy Agent) is a general-purpose policy engine. You feed it “facts” (input data), and it returns a decision: allow/deny plus reasons.

OPA is used for:

  • Kubernetes admission control (block risky manifests)
  • Terraform plan checks (block risky infra changes)
  • API authorization (fine-grained access control)
  • CI/CD checks (stop policy violations before merge)

OPA evaluates policies written in Rego.


OPA in one picture (mental model)

Input (JSON) → OPA + Rego policies → Decision (allow/deny + messages)

  • Input is the thing you’re deciding about (a Kubernetes manifest, Terraform plan, API request)
  • Policy is your rules
  • Decision is what your platform does (fail PR, reject deploy, deny API call)

Rego basics (you only need a few concepts to get productive)

Rego can look “new” at first, but the core ideas are small:

1) package groups your rules

package security.kubernetes

2) input is the JSON you are evaluating

OPA always evaluates against an input object.

Example input (simplified):

{
  "kind": "Pod",
  "metadata": {"name": "api"},
  "spec": {"containers": [{"name": "app", "securityContext": {"privileged": true}}]}
}

3) Rules compute values (often booleans or sets)

A rule can define:

  • allow = true/false
  • deny[msg] (a set of messages explaining violations)

4) “Default deny” is the safe pattern

default allow := false
allow := true { ...conditions... }
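
Filled in with a concrete condition, that pattern might look like this (`input.user.role` is a made-up example field, not a standard one):

```rego
package example.authz

default allow := false

# Only admins are allowed; everything else falls back to the default.
allow := true {
  input.user.role == "admin"
}
```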

5) deny[msg] is the most practical pattern for infra guardrails

It lets you return multiple violations at once:

deny[msg] {
  some condition
  msg := "Explain what is wrong and how to fix it"
}

Your first real policy: block privileged containers (Kubernetes)

The goal

Prevent pods from running as privileged.

Rego policy

package kubernetes.guardrails

deny[msg] {
  input.kind == "Pod"
  some i
  c := input.spec.containers[i]
  c.securityContext.privileged == true
  msg := sprintf("Pod %q: container %q must not run privileged=true", [input.metadata.name, c.name])
}

Why this prevents bad infra

Privileged containers can access host devices, escalate privileges, and bypass isolation. Blocking it is a high-value, low-controversy guardrail.


Rego patterns you’ll reuse everywhere

Pattern A: require a field (like labels/tags)

Example: require owner and env labels in Kubernetes.

package kubernetes.guardrails

required_labels := {"owner", "env"}

deny[msg] {
  not input.metadata.labels
  msg := sprintf("%q: missing metadata.labels (required: %v)", [input.metadata.name, required_labels])
}

deny[msg] {
  k := required_labels[_]
  not input.metadata.labels[k]
  msg := sprintf("%q: missing label %q", [input.metadata.name, k])
}

This blocks “nobody owns this” deployments, which are a huge source of orphan resources and security gaps.


Pattern B: deny risky defaults (public exposure)

Example: block Service of type LoadBalancer in non-prod namespaces.

package kubernetes.guardrails

deny[msg] {
  input.kind == "Service"
  input.spec.type == "LoadBalancer"
  ns := input.metadata.namespace
  ns != "prod"
  msg := sprintf("Service %q in namespace %q: LoadBalancer not allowed outside prod", [input.metadata.name, ns])
}

This prevents accidental internet exposure in dev/test.


Pattern C: enforce resource requests/limits (prevent runaway costs)

Example: require CPU and memory requests/limits for every container.

package kubernetes.guardrails

deny[msg] {
  input.kind == "Deployment"
  some i
  c := input.spec.template.spec.containers[i]

  not c.resources.requests.cpu
  msg := sprintf("Deployment %q: container %q missing CPU request", [input.metadata.name, c.name])
}

deny[msg] {
  input.kind == "Deployment"
  some i
  c := input.spec.template.spec.containers[i]

  not c.resources.limits.memory
  msg := sprintf("Deployment %q: container %q missing memory limit", [input.metadata.name, c.name])
}

This prevents the classic “one container eats the node” problem and improves scheduling efficiency.


Policy as Code for Terraform (how it blocks risky infra before it exists)

Kubernetes policies block bad deployments. Terraform policies block bad infrastructure changes before they’re applied.

The common approach is:

  1. Generate a plan
  2. Convert plan to JSON
  3. Evaluate with OPA/Rego
  4. Fail the pipeline if policy violations exist

To keep this beginner-friendly, we’ll use a simplified plan-like input.

Example input (simplified Terraform plan JSON)

{
  "resources": [
    {
      "type": "aws_security_group_rule",
      "name": "ssh_ingress",
      "change": {
        "after": {
          "type": "ingress",
          "from_port": 22,
          "to_port": 22,
          "protocol": "tcp",
          "cidr_blocks": ["0.0.0.0/0"]
        }
      }
    }
  ]
}

Policy: block SSH from the entire internet

package terraform.guardrails

deny[msg] {
  r := input.resources[_]
  r.type == "aws_security_group_rule"
  after := r.change.after

  after.type == "ingress"
  after.from_port <= 22
  after.to_port >= 22

  "0.0.0.0/0" == after.cidr_blocks[_]

  msg := sprintf("Security group rule %q: SSH (22) open to 0.0.0.0/0 is not allowed", [r.name])
}

Why this prevents bad infra

This blocks one of the most common real-world misconfigurations that leads to scanning, brute force attempts, and breaches.


Policy: enforce mandatory tags (cloud governance + cost)

Example input:

{
  "resources": [
    {
      "type": "aws_instance",
      "name": "app_server",
      "change": {
        "after": {
          "tags": { "env": "prod", "owner": "team-payments" }
        }
      }
    }
  ]
}

Rego:

package terraform.guardrails

required_tags := {"env", "owner", "cost_center"}

deny[msg] {
  r := input.resources[_]
  after := r.change.after
  tags := after.tags

  tags == null
  msg := sprintf("%s.%s: missing tags (required: %v)", [r.type, r.name, required_tags])
}

deny[msg] {
  r := input.resources[_]
  t := required_tags[_]
  after := r.change.after
  tags := after.tags

  tags != null
  not tags[t]

  msg := sprintf("%s.%s: missing tag %q", [r.type, r.name, t])
}

This prevents “unowned infra” and improves cost allocation immediately.


Policy: require encryption (a compliance lifesaver)

Example: require encryption on storage resources (pattern shown generically):

package terraform.guardrails

deny[msg] {
  r := input.resources[_]
  r.type == "aws_ebs_volume"
  after := r.change.after

  not after.encrypted
  msg := sprintf("%s.%s: EBS volume must have encrypted=true", [r.type, r.name])
}

The magic: how Policy as Code stops bad infra at multiple stages

A mature setup enforces the same intent in 3 places:

1) In PR checks (fast feedback)

  • Fail the PR if Terraform/K8s manifests violate policy
  • Engineers fix issues while context is fresh

2) At deployment time (strong guardrails)

  • Kubernetes admission controller rejects risky objects
  • Even if something bypasses CI, the cluster still protects itself

3) After deployment (audit + drift detection)

  • Detect violations that already exist (legacy systems)
  • Create tickets or alerts to remediate gradually

This is how you go from “We hope people follow standards” to “The platform makes the safe path the easy path.”


Step-by-step: implement OPA guardrails in a practical way

Step 1 — Pick your first 5 “never again” rules

Start with rules that are:

  • high impact
  • low debate
  • easy to understand

Great starters:

  1. No 0.0.0.0/0 for SSH/RDP
  2. No privileged containers
  3. Require tags/labels: owner/env
  4. Require encryption on storage
  5. No public LoadBalancers outside prod (or without explicit annotation)

Step 2 — Create a policy repo (treat it like a product)

Recommended structure:

policies/
  kubernetes/
    guardrails.rego
    guardrails_test.rego
  terraform/
    guardrails.rego
    guardrails_test.rego

Step 3 — Add policy unit tests (this is where teams level up)

A tiny test example:

package kubernetes.guardrails_test

import data.kubernetes.guardrails.deny

test_privileged_container_denied {
  # "input" is reserved in Rego, so bind a local variable and override it with "with input as"
  inp := {
    "kind": "Pod",
    "metadata": {"name": "bad-pod"},
    "spec": {"containers": [{"name": "app", "securityContext": {"privileged": true}}]}
  }

  count(deny with input as inp) > 0
}

Tests make policies safe to change and reduce accidental breakage.

Step 4 — Roll out in “warn mode” first

For 2–4 weeks:

  • don’t block immediately
  • record violations
  • fix noisy rules
  • build trust

Then move the most stable policies into “enforce mode.”
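
One way to stage this, assuming your CI prints but does not fail on a separate `warn` set, is to keep the same rule body under `warn[msg]` first and promote it to `deny[msg]` later:

```rego
package kubernetes.guardrails

# Warn-only for now; move this body under deny[msg] once the rule is trusted.
warn[msg] {
  input.kind == "Pod"
  c := input.spec.containers[_]
  c.securityContext.privileged == true
  msg := sprintf("WARN: Pod %q container %q runs privileged (enforcement coming)", [input.metadata.name, c.name])
}
```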

Step 5 — Add exceptions the right way (so you don’t create policy chaos)

Instead of “just disable policy,” use scoped exceptions:

  • by namespace
  • by resource name pattern
  • by explicit annotation like policy.exception: approved-ticket-123

Example pattern (simplified):

is_exception {
  input.metadata.annotations["policy.exception"] != ""
}

deny[msg] {
  not is_exception
  # ...violation conditions...
  msg := "..."
}

Step 6 — Track what matters (policy KPIs)

  • Violation counts over time (should decrease)
  • Top violating teams/resources
  • Mean time to remediate violations
  • Exception count (should be small and reviewed)

Real-world examples of “bad infra” Policy as Code prevents

Example 1: “Open SSH to the world” never reaches prod

Without policy:

  • Terraform gets applied
  • scanners find port 22
  • incident ticket, blame, downtime

With policy:

  • PR fails with a clear message
  • engineer changes CIDR to VPN range or bastion SG
  • no incident, no drama

Example 2: “Privileged pod” never gets admitted

Without policy:

  • privileged pod runs
  • host access risk increases
  • security audit flags it later

With policy:

  • admission rejects it instantly with a human-readable reason

Example 3: “No tags” resources don’t get created

Without policy:

  • mystery bills
  • nobody owns cleanup
  • FinOps becomes detective work

With policy:

  • missing tags block creation
  • cost allocation stays clean

Common mistakes (and how to avoid them)

Mistake 1: Writing policies that are too strict too early

Fix: warn mode → learn → enforce gradually.

Mistake 2: Policies with unclear error messages

Fix: write messages that include what, where, and how to fix.

Bad message: denied by policy
Good message: Deployment payments-api: container app missing CPU request (set resources.requests.cpu)

Mistake 3: No owner for the policy repo

Fix: treat policies like platform code. Assign ownership, reviews, release notes.

Mistake 4: Too many exceptions

Fix: exceptions must be explicit, time-bound, and reviewed.
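
A sketch of what "time-bound" can look like in Rego, assuming a hypothetical `policy.exception-expires` annotation holding an RFC 3339 timestamp (the annotation name is an invented convention, not a standard):

```rego
package kubernetes.guardrails

# Exception counts only while the (hypothetical) expiry annotation is in the future.
is_active_exception {
  expiry := input.metadata.annotations["policy.exception-expires"]
  time.parse_rfc3339_ns(expiry) > time.now_ns()
}
```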

Mistake 5: Policies not tested

Fix: add policy unit tests for every rule—especially deny rules.


Cheatsheet: Rego essentials you’ll use daily

some i for looping

some i
c := input.spec.containers[i]

Membership checks

"0.0.0.0/0" == after.cidr_blocks[_]

Negation (missing fields)

not input.metadata.labels["owner"]

String formatting for clear messages

msg := sprintf("Resource %q missing owner label", [input.metadata.name])

FAQ (quick answers)

Is Policy as Code only for security?

No. It’s also for:

  • cost guardrails (no oversized instances in dev)
  • reliability standards (requests/limits required)
  • governance (tags, regions, naming)
  • compliance (encryption, retention rules)
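
As a sketch of the cost-guardrail idea (the allowlist and the `terraform.cost` package name are illustrative assumptions; tune them to your own standards):

```rego
package terraform.cost

nonprod_allowed_types := {"t3.micro", "t3.small", "t3.medium"}

deny[msg] {
  rc := input.resource_changes[_]
  rc.type == "aws_instance"
  a := rc.change.after
  a.tags.env != "prod"
  not nonprod_allowed_types[a.instance_type]
  msg := sprintf("AWS %s.%s: instance_type %q is oversized for non-prod (allowed: %v)", [rc.type, rc.name, a.instance_type, nonprod_allowed_types])
}
```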

Will it slow teams down?

If you start with clear rules + good messages + warn mode, it speeds teams up by preventing rework and incidents.

How many policies should we have?

Start with 5–10 high-impact policies, then grow slowly. Most orgs do better with fewer, stronger rules than 200 fragile ones.

Can engineers maintain it, or does it require a special team?

Engineers can maintain it—especially if you keep the rules simple and cover them with tests.


Final takeaway

Policy as Code is guardrails that don’t forget.
OPA/Rego turns standards into executable rules that protect your infra at PR time and at deploy time—before mistakes become outages or security incidents.


Policy Pack: Multi-Cloud Guardrails (OPA/Rego)

Supports: AWS + Azure + GCP + Kubernetes + Terraform

What you get in this pack

  • Kubernetes guardrails (block risky workloads before they run)
  • Terraform plan guardrails (block risky infra before it’s created)
  • Clear error messages engineers can fix fast
  • Policy tests so policies don’t break silently

Repo structure (recommended)

policies/
  kubernetes/
    guardrails.rego
    guardrails_test.rego
  terraform/
    aws.rego
    aws_test.rego
    azure.rego
    azure_test.rego
    gcp.rego
    gcp_test.rego

Part A — Kubernetes Guardrails (works with manifests or admission inputs)

These policies are written to work with either:

  • plain manifest JSON/YAML (input is the object), or
  • AdmissionReview style (input.review.object)

policies/kubernetes/guardrails.rego

package kubernetes.guardrails

# Helper: support both AdmissionReview (input.review.object) and raw manifests (input)
obj := input.review.object { input.review.object != null }
obj := input { not input.review.object }

kind := lower(obj.kind)

name := obj.metadata.name
namespace := obj.metadata.namespace
labels := obj.metadata.labels
annotations := obj.metadata.annotations

# Helper: get pod spec from common controllers
podspec := obj.spec { kind == "pod" }
podspec := obj.spec.template.spec { kind == "deployment" }
podspec := obj.spec.template.spec { kind == "statefulset" }
podspec := obj.spec.template.spec { kind == "daemonset" }
podspec := obj.spec.template.spec { kind == "replicaset" }
podspec := obj.spec.jobTemplate.spec.template.spec { kind == "cronjob" }

# ------------------------------------------------------------
# K8S-1: Block privileged containers
# ------------------------------------------------------------
deny[msg] {
  podspec
  some i
  c := podspec.containers[i]
  c.securityContext.privileged == true
  msg := sprintf("%s/%s: container %q must not run privileged=true", [namespace, name, c.name])
}

# ------------------------------------------------------------
# K8S-2: Require runAsNonRoot=true (pod or container level)
# ------------------------------------------------------------
deny[msg] {
  podspec
  not podspec.securityContext.runAsNonRoot
  some i
  c := podspec.containers[i]
  not c.securityContext.runAsNonRoot
  msg := sprintf("%s/%s: set securityContext.runAsNonRoot=true (pod-level or per container)", [namespace, name])
}

# ------------------------------------------------------------
# K8S-3: Block hostNetwork=true
# ------------------------------------------------------------
deny[msg] {
  podspec
  podspec.hostNetwork == true
  msg := sprintf("%s/%s: hostNetwork=true is not allowed", [namespace, name])
}

# ------------------------------------------------------------
# K8S-4: Block hostPath volumes (common escape hatch)
# ------------------------------------------------------------
deny[msg] {
  podspec
  some i
  v := podspec.volumes[i]
  v.hostPath.path != ""
  msg := sprintf("%s/%s: hostPath volume %q is not allowed (path=%q)", [namespace, name, v.name, v.hostPath.path])
}

# ------------------------------------------------------------
# K8S-5: Require resource requests + memory limit (stability + cost)
# ------------------------------------------------------------
deny[msg] {
  podspec
  some i
  c := podspec.containers[i]
  not c.resources.requests.cpu
  msg := sprintf("%s/%s: container %q missing resources.requests.cpu", [namespace, name, c.name])
}

deny[msg] {
  podspec
  some i
  c := podspec.containers[i]
  not c.resources.requests.memory
  msg := sprintf("%s/%s: container %q missing resources.requests.memory", [namespace, name, c.name])
}

deny[msg] {
  podspec
  some i
  c := podspec.containers[i]
  not c.resources.limits.memory
  msg := sprintf("%s/%s: container %q missing resources.limits.memory", [namespace, name, c.name])
}

# ------------------------------------------------------------
# K8S-6: Block 'latest' tag (or no tag) to improve reproducibility
# ------------------------------------------------------------
deny[msg] {
  podspec
  some i
  c := podspec.containers[i]
  image := c.image
  not contains(image, ":")
  msg := sprintf("%s/%s: container %q image %q must be pinned with a tag (no implicit latest)", [namespace, name, c.name, image])
}

deny[msg] {
  podspec
  some i
  c := podspec.containers[i]
  image := c.image
  contains(image, ":latest")
  msg := sprintf("%s/%s: container %q must not use :latest (pin a version tag)", [namespace, name, c.name])
}

# ------------------------------------------------------------
# K8S-7: Restrict Service type LoadBalancer outside prod
# Allow exception via annotation: policy.allow-loadbalancer="true"
# ------------------------------------------------------------
deny[msg] {
  kind == "service"
  obj.spec.type == "LoadBalancer"
  ns := obj.metadata.namespace
  ns != "prod"
  not (annotations["policy.allow-loadbalancer"] == "true")
  msg := sprintf("%s/%s: Service type LoadBalancer not allowed outside prod (add annotation policy.allow-loadbalancer=\"true\" for approved exception)", [ns, obj.metadata.name])
}

# ------------------------------------------------------------
# K8S-8: Require basic ownership labels (prevents orphan infra & mystery spend)
# ------------------------------------------------------------
required_labels := {"owner", "env"}

deny[msg] {
  not obj.metadata.labels
  msg := sprintf("%s/%s: missing labels (required: %v)", [namespace, name, required_labels])
}

deny[msg] {
  labels
  k := required_labels[_]
  not labels[k]
  msg := sprintf("%s/%s: missing label %q", [namespace, name, k])
}

policies/kubernetes/guardrails_test.rego

package kubernetes.guardrails_test

import data.kubernetes.guardrails.deny

test_privileged_container_denied {
  inp := {
    "kind": "Pod",
    "metadata": {"name": "bad", "namespace": "dev", "labels": {"owner":"team-a","env":"dev"}},
    "spec": {"containers": [{"name":"app","image":"nginx:1.25","securityContext":{"privileged":true}}]}
  }
  count(deny with input as inp) > 0
}

test_latest_tag_denied {
  inp := {
    "kind": "Deployment",
    "metadata": {"name": "api", "namespace": "dev", "labels": {"owner":"team-a","env":"dev"}},
    "spec": {"template": {"spec": {"containers": [{"name":"app","image":"repo/app:latest","resources":{"requests":{"cpu":"100m","memory":"128Mi"},"limits":{"memory":"256Mi"}}}]}}}
  }
  count(deny with input as inp) > 0
}

test_loadbalancer_in_dev_denied {
  inp := {
    "kind": "Service",
    "metadata": {"name": "svc", "namespace": "dev", "labels": {"owner":"team-a","env":"dev"}, "annotations": {}},
    "spec": {"type": "LoadBalancer"}
  }
  count(deny with input as inp) > 0
}

What this prevents (immediately):

  • privileged workloads, host networking, hostPath escapes
  • unbounded CPU/memory behavior and noisy neighbor issues
  • “latest tag” reproducibility problems
  • accidental public exposure via LoadBalancers
  • orphan resources with no owner/env labeling

Part B — Terraform Guardrails (evaluate plan JSON)

Expected Terraform input (standard practice)

  1. terraform plan -out tfplan.out
  2. terraform show -json tfplan.out > tfplan.json
  3. Evaluate tfplan.json with OPA

In Terraform plan JSON, the key area we use is:

  • input.resource_changes[_]

We’ll write cloud-specific rules because resource shapes differ.


B1) AWS Terraform policies

policies/terraform/aws.rego

package terraform.aws

# We return all policy failures as deny messages
deny[msg] { open_ssh_to_world[msg] }
deny[msg] { require_tags[msg] }
deny[msg] { ebs_must_be_encrypted[msg] }
deny[msg] { s3_public_access_block_required[msg] }
deny[msg] { prod_no_public_ip[msg] }
deny[msg] { restrict_regions[msg] }

# ------------- Helpers -------------
is_create_or_update(rc) {
  rc.change.actions[_] == "create"
}

is_create_or_update(rc) {
  rc.change.actions[_] == "update"
}

after(rc) := rc.change.after

# ------------- AWS-1: Block SSH open to world -------------
open_ssh_to_world[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  rc.type == "aws_security_group_rule"
  a := after(rc)
  a.type == "ingress"
  a.protocol == "tcp"
  a.from_port <= 22
  a.to_port >= 22
  a.cidr_blocks[_] == "0.0.0.0/0"
  msg := sprintf("AWS %s.%s: SSH (22) open to 0.0.0.0/0 is not allowed", [rc.type, rc.name])
}

# ------------- AWS-2: Require tags -------------
required_tags := {"env", "owner", "cost_center"}

require_tags[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  # Many AWS resources use `tags`
  a := after(rc)
  a.tags == null
  msg := sprintf("AWS %s.%s: missing tags (required: %v)", [rc.type, rc.name, required_tags])
}

require_tags[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  a := after(rc)
  a.tags != null
  k := required_tags[_]
  not a.tags[k]
  msg := sprintf("AWS %s.%s: missing tag %q", [rc.type, rc.name, k])
}

# ------------- AWS-3: EBS encryption required -------------
ebs_must_be_encrypted[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  rc.type == "aws_ebs_volume"
  a := after(rc)
  not a.encrypted
  msg := sprintf("AWS %s.%s: EBS volume must set encrypted=true", [rc.type, rc.name])
}

# ------------- AWS-4: S3 must block public access (account/standard control) -------------
s3_public_access_block_required[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  rc.type == "aws_s3_bucket_public_access_block"
  a := after(rc)
  not a.block_public_acls
  msg := sprintf("AWS %s.%s: block_public_acls must be true", [rc.type, rc.name])
}

s3_public_access_block_required[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  rc.type == "aws_s3_bucket_public_access_block"
  a := after(rc)
  not a.block_public_policy
  msg := sprintf("AWS %s.%s: block_public_policy must be true", [rc.type, rc.name])
}

# ------------- AWS-5: No public IP for prod EC2 -------------
prod_no_public_ip[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  rc.type == "aws_instance"
  a := after(rc)
  a.tags.env == "prod"
  a.associate_public_ip_address == true
  msg := sprintf("AWS %s.%s: prod instances must not associate a public IP", [rc.type, rc.name])
}

# ------------- AWS-6: Restrict regions (example allowlist) -------------
allowed_regions := {"us-east-1", "us-west-2"}

restrict_regions[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  # Many resources include provider config elsewhere; for a simple baseline,
  # enforce region tag as an organizational convention.
  a := after(rc)
  a.tags.region != null
  not allowed_regions[a.tags.region]
  msg := sprintf("AWS %s.%s: region %q not in allowed_regions %v (use allowed regions)", [rc.type, rc.name, a.tags.region, allowed_regions])
}

policies/terraform/aws_test.rego

package terraform.aws_test

import data.terraform.aws.deny

test_ssh_open_denied {
  inp := {"resource_changes":[
    {"type":"aws_security_group_rule","name":"ssh","change":{"actions":["create"],"after":{
      "type":"ingress","protocol":"tcp","from_port":22,"to_port":22,"cidr_blocks":["0.0.0.0/0"],
      "tags":{"env":"dev","owner":"team-a","cost_center":"cc1","region":"us-east-1"}
    }}}
  ]}
  count(deny with input as inp) > 0
}

B2) Azure Terraform policies

policies/terraform/azure.rego

package terraform.azure

deny[msg] { open_ssh_rdp_to_world[msg] }
deny[msg] { require_tags[msg] }
deny[msg] { storage_https_tls_required[msg] }
deny[msg] { prod_no_public_ip[msg] }
deny[msg] { restrict_locations[msg] }

is_create_or_update(rc) {
  rc.change.actions[_] == "create"
}

is_create_or_update(rc) {
  rc.change.actions[_] == "update"
}

after(rc) := rc.change.after

# -------- Azure-1: Block SSH/RDP open to world (NSG rules) --------
risky_port(a) { a.destination_port_range == "22" }
risky_port(a) { a.destination_port_range == "3389" }
risky_port(a) { a.destination_port_ranges[_] == "22" }
risky_port(a) { a.destination_port_ranges[_] == "3389" }

world_source(a) { a.source_address_prefix == "*" }
world_source(a) { a.source_address_prefix == "0.0.0.0/0" }
world_source(a) { a.source_address_prefixes[_] == "*" }
world_source(a) { a.source_address_prefixes[_] == "0.0.0.0/0" }

open_ssh_rdp_to_world[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  rc.type == "azurerm_network_security_rule"
  a := after(rc)

  # inbound allow
  lower(a.direction) == "inbound"
  lower(a.access) == "allow"

  # ports 22 or 3389, from world sources
  risky_port(a)
  world_source(a)

  msg := sprintf("Azure %s.%s: inbound allow for SSH/RDP from the internet is not allowed", [rc.type, rc.name])
}

# -------- Azure-2: Require tags on common resources --------
required_tags := {"env", "owner", "cost_center"}

require_tags[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  a := after(rc)
  a.tags == null
  msg := sprintf("Azure %s.%s: missing tags (required: %v)", [rc.type, rc.name, required_tags])
}

require_tags[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  a := after(rc)
  a.tags != null
  k := required_tags[_]
  not a.tags[k]
  msg := sprintf("Azure %s.%s: missing tag %q", [rc.type, rc.name, k])
}

# -------- Azure-3: Storage must enforce HTTPS + strong TLS --------
storage_https_tls_required[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  rc.type == "azurerm_storage_account"
  a := after(rc)
  a.enable_https_traffic_only != true
  msg := sprintf("Azure %s.%s: enable_https_traffic_only must be true", [rc.type, rc.name])
}

storage_https_tls_required[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  rc.type == "azurerm_storage_account"
  a := after(rc)
  # Common baseline
  a.min_tls_version != "TLS1_2"
  msg := sprintf("Azure %s.%s: min_tls_version must be TLS1_2", [rc.type, rc.name])
}

# -------- Azure-4: No public IP for prod workloads (Public IP resource or NIC attachment patterns) --------
prod_no_public_ip[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  rc.type == "azurerm_public_ip"
  a := after(rc)
  a.tags.env == "prod"
  msg := sprintf("Azure %s.%s: prod must not create public IPs unless explicitly approved", [rc.type, rc.name])
}

# -------- Azure-5: Restrict locations (example allowlist) --------
allowed_locations := {"eastus", "westus2"}

restrict_locations[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  a := after(rc)
  a.location != null
  not allowed_locations[lower(a.location)]
  msg := sprintf("Azure %s.%s: location %q not allowed (allowed: %v)", [rc.type, rc.name, a.location, allowed_locations])
}

policies/terraform/azure_test.rego

package terraform.azure_test

import data.terraform.azure.deny

test_nsg_ssh_world_denied {
  inp := {"resource_changes":[
    {"type":"azurerm_network_security_rule","name":"ssh","change":{"actions":["create"],"after":{
      "direction":"Inbound","access":"Allow","destination_port_range":"22","source_address_prefix":"0.0.0.0/0",
      "tags":{"env":"dev","owner":"team-a","cost_center":"cc1"}
    }}}
  ]}
  count(deny with input as inp) > 0
}

B3) GCP Terraform policies

policies/terraform/gcp.rego

package terraform.gcp

deny[msg] { open_ssh_rdp_to_world[msg] }
deny[msg] { require_labels[msg] }
deny[msg] { bucket_public_prevention_required[msg] }
deny[msg] { prod_no_external_ip[msg] }
deny[msg] { restrict_regions_zones[msg] }

is_create_or_update(rc) {
  rc.change.actions[_] == "create"
}

is_create_or_update(rc) {
  rc.change.actions[_] == "update"
}

after(rc) := rc.change.after

# -------- GCP-1: Block firewall rules exposing SSH/RDP --------
ssh_rdp_ports := {"22", "3389"}

open_ssh_rdp_to_world[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  rc.type == "google_compute_firewall"
  a := after(rc)

  # source ranges include the whole internet
  a.source_ranges[_] == "0.0.0.0/0"

  # allow block includes tcp 22 or 3389
  some i
  allow := a.allow[i]
  lower(allow.protocol) == "tcp"
  p := allow.ports[_]
  ssh_rdp_ports[p]

  msg := sprintf("GCP %s.%s: firewall allows SSH/RDP from 0.0.0.0/0 (not allowed)", [rc.type, rc.name])
}

# -------- GCP-2: Require labels (FinOps + ownership) --------
required_labels := {"env", "owner", "cost_center"}

require_labels[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  a := after(rc)
  a.labels == null
  msg := sprintf("GCP %s.%s: missing labels (required: %v)", [rc.type, rc.name, required_labels])
}

require_labels[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  a := after(rc)
  a.labels != null
  k := required_labels[_]
  not a.labels[k]
  msg := sprintf("GCP %s.%s: missing label %q", [rc.type, rc.name, k])
}

# -------- GCP-3: Storage bucket must prevent public access --------
bucket_public_prevention_required[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  rc.type == "google_storage_bucket"
  a := after(rc)
  a.public_access_prevention != "enforced"
  msg := sprintf("GCP %s.%s: public_access_prevention must be \"enforced\"", [rc.type, rc.name])
}

bucket_public_prevention_required[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  rc.type == "google_storage_bucket"
  a := after(rc)
  a.uniform_bucket_level_access != true
  msg := sprintf("GCP %s.%s: uniform_bucket_level_access must be true", [rc.type, rc.name])
}

# -------- GCP-4: Prod instances must not have external IP --------
prod_no_external_ip[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  rc.type == "google_compute_instance"
  a := after(rc)
  a.labels.env == "prod"

  # if access_config exists, it generally means an external IP is attached
  some i
  ni := a.network_interface[i]
  ni.access_config != null

  msg := sprintf("GCP %s.%s: prod compute instance must not have external IP (remove access_config)", [rc.type, rc.name])
}

# -------- GCP-5: Restrict regions/zones (example allowlist) --------
allowed_regions := {"us-central1", "us-west1"}

restrict_regions_zones[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  a := after(rc)
  a.region != null
  not allowed_regions[a.region]
  msg := sprintf("GCP %s.%s: region %q not allowed (allowed: %v)", [rc.type, rc.name, a.region, allowed_regions])
}

restrict_regions_zones[msg] {
  rc := input.resource_changes[_]
  is_create_or_update(rc)
  a := after(rc)
  a.zone != null
  # zone like us-central1-a => region is everything before the last "-" segment
  parts := split(a.zone, "-")
  r := concat("-", array.slice(parts, 0, count(parts) - 1))
  not allowed_regions[r]
  msg := sprintf("GCP %s.%s: zone %q not allowed (must be in regions: %v)", [rc.type, rc.name, a.zone, allowed_regions])
}

policies/terraform/gcp_test.rego

package terraform.gcp_test

import data.terraform.gcp.deny

test_gcp_firewall_ssh_world_denied {
  inp := {"resource_changes":[
    {"type":"google_compute_firewall","name":"ssh","change":{"actions":["create"],"after":{
      "source_ranges":["0.0.0.0/0"],
      "allow":[{"protocol":"tcp","ports":["22"]}],
      "labels":{"env":"dev","owner":"team-a","cost_center":"cc1"}
    }}}
  ]}
  count(deny with input as inp) > 0
}

Part C — How to run this (local + CI)

1) Run policy unit tests

opa test ./policies -v

2) Terraform: evaluate a plan

terraform plan -out tfplan.out
terraform show -json tfplan.out > tfplan.json

# AWS
opa eval -i tfplan.json -d policies/terraform/aws.rego "data.terraform.aws.deny"

# Azure
opa eval -i tfplan.json -d policies/terraform/azure.rego "data.terraform.azure.deny"

# GCP
opa eval -i tfplan.json -d policies/terraform/gcp.rego "data.terraform.gcp.deny"

In CI, you typically fail if the output deny set is non-empty.

3) Kubernetes: evaluate a manifest

If you have a manifest in JSON (or converted to JSON):

opa eval -i pod.json -d policies/kubernetes/guardrails.rego "data.kubernetes.guardrails.deny"

If the deny list is non-empty, block the change/deploy.


Why this set prevents “bad infra” (the real engineering impact)

It blocks the top incident starters:

  • Public SSH/RDP exposure (cloud firewall/NSG/SG rules)
  • Public IPs on prod VMs (common data-exfil path)
  • Public storage buckets (silent data leak risk)
  • Unencrypted storage (audit/compliance failures)
  • Privileged/host-level Kubernetes settings (container escape risk)
  • No requests/limits (node instability + cost spikes)
  • LoadBalancer everywhere (accidental exposure + waste)
  • No ownership tags/labels (orphan infra + mystery bills)

And it does it early — at PR/plan time — when fixes are cheap.

