This page provides examples of common policy scenarios you might want to implement in Formal. Each example includes the policy code and an explanation of how it works.

Basic Policy Examples

Block All Connections by Default

A foundational security pattern: deny all access by default.
package formal.v2

import future.keywords.if

default session := {
  "action": "block",
  "type": "block_with_formal_message",
  "reason": "Default deny policy"
}

Allow Only Admin Group

Combine default deny with explicit allow for specific groups.
package formal.v2

import future.keywords.if
import future.keywords.in

default session := {
  "action": "block",
  "type": "block_with_formal_message"
}

session := {
  "action": "allow",
  "reason": "User is in admin group"
} if {
  "admin" in input.user.groups
}
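
The same pattern can be extended to several groups. The following is a minimal sketch, not an official Formal example: the allowed_groups set and the group name data_engineering are hypothetical, and only input.user.groups is taken from the policy above.

```rego
package formal.v2

import future.keywords.if
import future.keywords.in

default session := {
  "action": "block",
  "type": "block_with_formal_message"
}

# Hypothetical group names; replace them with the groups defined in your directory.
allowed_groups := {"admin", "data_engineering"}

# Allow the session if the user belongs to at least one allowed group.
session := {
  "action": "allow",
  "reason": "User is in an allowed group"
} if {
  some group in input.user.groups
  group in allowed_groups
}
```

Keeping the group names in a single set means the allow rule itself does not change when a group is added or removed.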

Allow Machine User with End-User in Admin Group

Control access based on both the machine user and the actual person using it.
package formal.v2

import future.keywords.if
import future.keywords.in

default session := {
  "action": "block",
  "type": "block_with_formal_message"
}

session := {
  "action": "allow",
  "reason": "End-user is admin, machine user is authorized"
} if {
  "admin" in input.end_user.groups
  input.user.username == "idp:formal:machine:bi_tool"
  not input.end_user.email == "blocked@example.com"
}

Block Writes to Production

Prevent destructive operations in production environments.
package formal.v2

import future.keywords.if
import future.keywords.in

pre_request := {
  "action": "block",
  "type": "block_with_formal_message",
  "reason": "Write operations not allowed in production"
} if {
  input.resource.environment == "production"
  input.query.statement_type in ["INSERT", "UPDATE", "DELETE", "DROP"]
}

Advanced Scenarios

Redact Emails on Snowflake

Let’s say we want to redact all email columns that come back from our Snowflake database. As you will see, we are going to do this with multiple policies. The main reason is that a single policy can only return one pre_request or post_request stage at a time. Additionally, using separate policies allows for customized notification settings per policy.

Policy 1 – Mask emails

Let’s start by writing our first policy that will mask email columns.
package formal.v2

import future.keywords.if
import future.keywords.in

# Helper function to check if column is in a function
in_function(column) if {
  column.in_functions != null
}

# Mask emails
post_request := {
  "action": "mask",
  "type": "redact.partial",
  "sub_type": "email_mask_username",
  "columns": columns,
  "typesafe": "fallback_to_default"
} if {
  columns := [column |
    column := input.columns[_];
    column.data_label == "email_address"
    not in_function(column)
  ]
  columns != []
}
There’s a catch: the policy above deliberately skips email columns that went through a function (e.g., CONCAT), because we can’t guarantee the output format of the result. Let’s write a second policy to handle those columns.

Policy 2 – Hash function outputs

Here is a policy that will hash email columns that have been through a function, unless that function was an aggregate.
package formal.v2

import future.keywords.if
import future.keywords.in

# Helper function to check if column is in a function
in_function(column) if {
  column.in_functions != null
}

# Helper function to check if any function has the "aggregate" category
# and does not have the "semi_structured" category
in_aggregate(column) if {
  some func in column.in_functions
  "aggregate" in func.categories
  not "semi_structured" in func.categories
}

# Hash emails that went through a function, unless it was an aggregate
post_request := {
  "action": "mask",
  "type": "hash.no_salt",
  "columns": columns,
  "typesafe": "fallback_to_default"
} if {
  columns := [column |
    column := input.columns[_];
    column.data_label == "email_address"
    in_function(column)
    not in_aggregate(column)
  ]
  columns != []
}
Functions that also carry the “semi_structured” category (functions for semi-structured and structured data) are not treated as aggregates here, so their output is still hashed. Otherwise, something like ARRAY_AGG in Snowflake would allow someone to view the emails.

Policy 3 – Block queries that may try to escape our masking

What if an attacker tries to bypass our masking policy with advanced Snowflake features? Let’s block the query if we detect them.
package formal.v2

import future.keywords.if
import future.keywords.in

# Block queries that may try to escape our masking
pre_request := {
  "action": "block",
  "type": "block_with_formal_message",
} if {
  forbidden_substrings := [
    "CREATE", "SYSTEM$", "IDENTIFIER", "RESULT_SCAN", "PROCEDURE",
    "CLONE", "LATERAL", "REPLACE", "RENAME", "PIVOT", "RECURSIVE"
  ]
  some substring in forbidden_substrings
  contains(input.sql_query.query, substring)
}
Note that we’re not using post_request anymore, but pre_request: we want the request to be blocked before it’s executed.
Alternatively, we could have blocked CREATE queries by using input.sql_query.statement_type in another policy.
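
One caveat with the substring check in Policy 3: the contains built-in is case-sensitive, so a query written as create table ... would not match "CREATE" unless the query text has already been normalized. The following is a minimal sketch of a case-insensitive variant. It assumes that input.sql_query.query holds the query text as the client sent it, which this page does not confirm. The variable name normalized_query is hypothetical; upper is a standard Rego built-in.

```rego
package formal.v2

import future.keywords.if
import future.keywords.in

# Block queries that may try to escape our masking, regardless of letter case
pre_request := {
  "action": "block",
  "type": "block_with_formal_message"
} if {
  forbidden_substrings := [
    "CREATE", "SYSTEM$", "IDENTIFIER", "RESULT_SCAN", "PROCEDURE",
    "CLONE", "LATERAL", "REPLACE", "RENAME", "PIVOT", "RECURSIVE"
  ]

  # Assumption: the query may arrive in any case, so uppercase it once before matching.
  normalized_query := upper(input.sql_query.query)

  some substring in forbidden_substrings
  contains(normalized_query, substring)
}
```

Substring matching is intentionally broad: it also blocks queries that merely mention one of these words, for example in a column name or a call to the REPLACE string function.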

Policy 4 – Blocking Access to Specific Native Users

You can control which users can access specific native users when they attempt to connect using the @<native_user> syntax. This is useful for enforcing least-privilege access and preventing users from accessing highly privileged accounts. The following policy blocks the user john@joinformal.com from using the native user devops:
package formal.v2

import future.keywords.if
import future.keywords.in

session := { 
  "action": "block", 
  "type": "block_with_formal_message" 
} if {
  input.native_user == "devops"
  input.user.email == "john@joinformal.com"
}
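
Following the same pattern, the logic can be inverted so that only certain users may connect as a privileged native user. This is a sketch under stated assumptions: the set name devops_allowed_emails and the addresses in it are hypothetical, while input.native_user and input.user.email are the fields used in the policy above.

```rego
package formal.v2

import future.keywords.if
import future.keywords.in

# Hypothetical allowlist of users who may connect as the native user "devops".
devops_allowed_emails := {
  "alice@example.com",
  "bob@example.com"
}

# Block everyone who is not on the allowlist from using the native user "devops".
session := {
  "action": "block",
  "type": "block_with_formal_message",
  "reason": "Only allowlisted users may use the devops native user"
} if {
  input.native_user == "devops"
  not input.user.email in devops_allowed_emails
}
```

Because the rule is written with not ... in, a session whose user has no email in the input is also blocked, which keeps the policy on the restrictive side.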

Block Access to Stale Objects in AWS S3

This policy prevents access to S3 objects that were last modified more than 10 minutes ago. This is useful for ensuring data freshness and preventing access to outdated information.
package formal.v2

import future.keywords.if

# Helper function to check if a timestamp is older than 10 minutes
is_stale(timestamp) if {
  parsed_time := time.parse_rfc3339_ns(timestamp)
  current_time := time.now_ns()

  # 10 minutes = 10 * 60 * 1000000000 nanoseconds
  cutoff_time := current_time - (10 * 60 * 1000000000)

  parsed_time < cutoff_time
}

# Block access to stale objects in the local-bucket
post_request := { 
  "action": "block", 
  "type": "block_with_formal_message" 
} if {
  input.bucket.name == "local-bucket"
  input.bucket.action == "GetObject"
  is_stale(input.file.last_modified)
}
How it works:
  1. is_stale function: Compares the object’s last_modified timestamp with the current time minus 10 minutes
  2. Time calculation: Uses nanoseconds for precision (10 minutes = 600,000,000,000 nanoseconds)
  3. Blocking logic: Blocks GetObject requests to objects in local-bucket that are older than 10 minutes
  4. Post-request stage: Uses post_request to block after the request is processed but before returning results
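
The time calculation in step 2 can also be written with the threshold as a named constant, which makes the window easier to change. This is a sketch of an equivalent helper rather than a required form; the name max_age_ns is hypothetical, and the rest of the policy stays as shown above.

```rego
package formal.v2

import future.keywords.if

# Maximum allowed age in nanoseconds: 10 minutes = 10 * 60 * 1,000,000,000 ns.
max_age_ns := 10 * 60 * 1000000000

# An object is stale if the time elapsed since its last modification exceeds max_age_ns.
is_stale(timestamp) if {
  age_ns := time.now_ns() - time.parse_rfc3339_ns(timestamp)
  age_ns > max_age_ns
}
```

Comparing the elapsed time against max_age_ns is the same check as comparing the parsed timestamp against the current time minus 10 minutes; only the arrangement of the arithmetic differs.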
Use cases:
  • Prevent access to outdated configuration files
  • Ensure users only see recent data exports
  • Block access to temporary files that should have been cleaned up
The native user policy shown earlier (Policy 4) can be extended in a similar way to:
  • Block multiple users from specific native users
  • Allow only certain users to access privileged native users
  • Require additional authentication for certain native users

Rate Limiting Access to AWS S3

This policy blocks access to sensitive-bucket if the user accessed it more than five times in the last minute, across all Connector instances. The input.bucket.access_count field provides cluster-wide access counts that are automatically tracked and synchronized.
package formal.v2

import future.keywords.if

pre_request := { "action": "block", "type": "block_with_formal_message" } if {
  input.bucket.name == "sensitive-bucket"
  input.bucket.access_count.last_minute > 5
}
Clustering Required: If you’re running multiple Connector instances, you need to enable the instances to form a cluster. See the Clustering page for configuration details.

Best Practices

As these examples show, policies can quickly become extensive. When creating policies, keep these best practices in mind:
  1. Use the correct package name (formal.v2 for data policies, formal.app for permissions).
  2. Import required keywords (future.keywords.if, future.keywords.in).
  3. Use session, pre_request, and post_request stages accordingly.
  4. Test policies before deploying to production by using the Dry-run status.
For more information about policy evaluation and available input objects, see the Evaluation page.
We recommend that you write and version your policies in git, and deploy them with Terraform using our provider.