RLS example 4
Jump to navigation
Jump to search
Go-Migrate SQL files (from scratch) and a Python test script that logs into Keycloak, grabs JWTs, and exercises CRUD + RLS paths to prove the policy works.
migrations/001_init.up.sql
-- +goose Up
-- === Base setup: extensions, schemas, roles ===
create extension if not exists pgcrypto; -- for gen_random_uuid()
create schema if not exists app;
create schema if not exists todo;
-- Roles model (owner of DB objects vs runtime role for PostgREST)
do $$
begin
if not exists (select 1 from pg_roles where rolname = 'api_owner') then
create role api_owner login;
end if;
if not exists (select 1 from pg_roles where rolname = 'web_anon') then
create role web_anon nologin;
end if;
end$$;
grant usage on schema app, todo to web_anon;
-- === Identity mapping: internal UUID ↔ external JWT sub ===
create table if not exists app.users (
id uuid primary key default gen_random_uuid(),
external_subject text not null unique, -- JWT sub (opaque; not guaranteed to be a UUID)
display_name text,
email text,
is_active boolean not null default true,
created_at timestamptz not null default now()
);
create or replace function app.current_user_id()
returns uuid
stable
security definer
set search_path = public, pg_temp, app
language plpgsql
as $$
declare
claims jsonb;
sub_txt text;
uid uuid;
begin
claims := nullif(current_setting('request.jwt.claims', true), '')::jsonb;
if claims is null then
return null;
end if;
sub_txt := claims->>'sub';
if sub_txt is null then
return null;
end if;
-- Lookup existing active user
select u.id into uid
from app.users u
where u.external_subject = sub_txt
and u.is_active;
if uid is not null then
return uid;
end if;
-- Auto-provision on first use from claims
insert into app.users (external_subject, display_name, email)
values (
sub_txt,
coalesce(claims->>'name', null),
coalesce(claims->>'email', null)
)
returning id into uid;
return uid;
end
$$;
revoke all on function app.current_user_id() from public;
grant execute on function app.current_user_id() to web_anon;
-- === Groups & membership ===
create table if not exists app.groups (
id uuid primary key default gen_random_uuid(),
name text not null unique,
created_by uuid not null references app.users(id) on delete restrict,
created_at timestamptz not null default now()
);
create table if not exists app.group_members (
group_id uuid not null references app.groups(id) on delete cascade,
user_id uuid not null references app.users(id) on delete cascade,
role text not null check (role in ('owner','manager','member')),
added_at timestamptz not null default now(),
primary key (group_id, user_id)
);
create index if not exists idx_group_members_user on app.group_members(user_id);
create index if not exists idx_group_members_group on app.group_members(group_id);
-- Automatically add the creator as owner of any newly created group
create or replace function app.add_creator_as_owner()
returns trigger language plpgsql as $$
begin
insert into app.group_members(group_id, user_id, role)
values (new.id, new.created_by, 'owner')
on conflict do nothing;
return new;
end$$;
drop trigger if exists trg_groups_owner on app.groups;
create trigger trg_groups_owner
after insert on app.groups
for each row execute function app.add_creator_as_owner();
-- === Todos & shares ===
create table if not exists todo.todos (
id uuid primary key default gen_random_uuid(),
owner_id uuid not null references app.users(id) on delete restrict,
title text not null,
body text,
is_done boolean not null default false,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create index if not exists idx_todos_owner on todo.todos(owner_id);
create table if not exists todo.todo_group_shares (
todo_id uuid not null references todo.todos(id) on delete cascade,
group_id uuid not null references app.groups(id) on delete cascade,
can_read boolean not null default true,
can_write boolean not null default false,
can_del boolean not null default false,
primary key (todo_id, group_id)
);
create index if not exists idx_todo_shares_todo on todo.todo_group_shares(todo_id);
create index if not exists idx_todo_shares_group on todo.todo_group_shares(group_id);
-- Maintain updated_at on todos
create or replace function todo.bump_updated_at()
returns trigger language plpgsql as $$
begin
new.updated_at := now();
return new;
end$$;
drop trigger if exists trg_todos_updated_at on todo.todos;
create trigger trg_todos_updated_at
before update on todo.todos
for each row execute function todo.bump_updated_at();
-- === RLS enablement ===
alter table app.users enable row level security;
alter table app.groups enable row level security;
alter table app.group_members enable row level security;
alter table todo.todos enable row level security;
alter table todo.todo_group_shares enable row level security;
-- app.users: each user can see/update themselves
create policy users_select_self on app.users
for select
to web_anon
using (id = app.current_user_id());
create policy users_update_self on app.users
for update
to web_anon
using (id = app.current_user_id())
with check (id = app.current_user_id());
-- app.groups
create policy groups_insert_any on app.groups
for insert
to web_anon
with check (created_by = app.current_user_id());
create policy groups_select_member on app.groups
for select
to web_anon
using (
exists (
select 1
from app.group_members gm
where gm.group_id = app.groups.id
and gm.user_id = app.current_user_id()
)
);
create policy groups_update_owner on app.groups
for update
to web_anon
using (
exists (
select 1 from app.group_members gm
where gm.group_id = app.groups.id
and gm.user_id = app.current_user_id()
and gm.role = 'owner'
)
)
with check (
exists (
select 1 from app.group_members gm
where gm.group_id = app.groups.id
and gm.user_id = app.current_user_id()
and gm.role = 'owner'
)
);
create policy groups_delete_owner on app.groups
for delete
to web_anon
using (
exists (
select 1 from app.group_members gm
where gm.group_id = app.groups.id
and gm.user_id = app.current_user_id()
and gm.role = 'owner'
)
);
-- app.group_members
create policy gm_select_member on app.group_members
for select
to web_anon
using (
exists (
select 1 from app.group_members gm2
where gm2.group_id = group_members.group_id
and gm2.user_id = app.current_user_id()
)
);
create policy gm_insert_mgr on app.group_members
for insert
to web_anon
with check (
exists (
select 1 from app.group_members gm2
where gm2.group_id = group_members.group_id
and gm2.user_id = app.current_user_id()
and gm2.role in ('owner','manager')
)
);
create policy gm_update_mgr on app.group_members
for update
to web_anon
using (
exists (
select 1 from app.group_members gm2
where gm2.group_id = group_members.group_id
and gm2.user_id = app.current_user_id()
and gm2.role in ('owner','manager')
)
)
with check (
exists (
select 1 from app.group_members gm2
where gm2.group_id = group_members.group_id
and gm2.user_id = app.current_user_id()
and gm2.role in ('owner','manager')
)
);
create policy gm_delete_mgr_or_self on app.group_members
for delete
to web_anon
using (
(user_id = app.current_user_id()) or
exists (
select 1 from app.group_members gm2
where gm2.group_id = group_members.group_id
and gm2.user_id = app.current_user_id()
and gm2.role in ('owner','manager')
)
);
-- todo.todos
create policy todos_select_owner_or_shared on todo.todos
for select
to web_anon
using (
owner_id = app.current_user_id()
or exists (
select 1
from todo.todo_group_shares s
join app.group_members gm
on gm.group_id = s.group_id
and gm.user_id = app.current_user_id()
where s.todo_id = todos.id
and s.can_read
)
);
create policy todos_insert_self on todo.todos
for insert
to web_anon
with check (owner_id = app.current_user_id());
create policy todos_update_owner_or_shared on todo.todos
for update
to web_anon
using (
owner_id = app.current_user_id()
or exists (
select 1
from todo.todo_group_shares s
join app.group_members gm
on gm.group_id = s.group_id
and gm.user_id = app.current_user_id()
where s.todo_id = todos.id
and s.can_write
)
)
with check (
owner_id = app.current_user_id()
or exists (
select 1
from todo.todo_group_shares s
join app.group_members gm
on gm.group_id = s.group_id
and gm.user_id = app.current_user_id()
where s.todo_id = todos.id
and s.can_write
)
);
create policy todos_delete_owner_or_shared on todo.todos
for delete
to web_anon
using (
owner_id = app.current_user_id()
or exists (
select 1
from todo.todo_group_shares s
join app.group_members gm
on gm.group_id = s.group_id
and gm.user_id = app.current_user_id()
where s.todo_id = todos.id
and s.can_del
)
);
-- todo.todo_group_shares
create policy shares_select_visible on todo.todo_group_shares
for select
to web_anon
using (
exists (select 1 from todo.todos t where t.id = todo_group_shares.todo_id and t.owner_id = app.current_user_id())
or exists (select 1 from app.group_members gm where gm.group_id = todo_group_shares.group_id and gm.user_id = app.current_user_id())
);
create policy shares_insert_todo_owner on todo.todo_group_shares
for insert
to web_anon
with check (
exists (select 1 from todo.todos t where t.id = todo_group_shares.todo_id and t.owner_id = app.current_user_id())
);
create policy shares_update_todo_owner on todo.todo_group_shares
for update
to web_anon
using (
exists (select 1 from todo.todos t where t.id = todo_group_shares.todo_id and t.owner_id = app.current_user_id())
)
with check (
exists (select 1 from todo.todos t where t.id = todo_group_shares.todo_id and t.owner_id = app.current_user_id())
);
create policy shares_delete_todo_owner on todo.todo_group_shares
for delete
to web_anon
using (
exists (select 1 from todo.todos t where t.id = todo_group_shares.todo_id and t.owner_id = app.current_user_id())
);
-- Base DML privileges (RLS gates real access)
grant select, insert, update, delete on app.users to web_anon;
grant select, insert, update, delete on app.groups to web_anon;
grant select, insert, update, delete on app.group_members to web_anon;
grant select, insert, update, delete on todo.todos to web_anon;
grant select, insert, update, delete on todo.todo_group_shares to web_anon;
migrations/001_init.down.sql
-- +goose Down
-- Drop in reverse dependency order
-- Policies automatically dropped with tables; explicit drops are harmless if they existed.
drop trigger if exists trg_todos_updated_at on todo.todos;
drop function if exists todo.bump_updated_at();
drop trigger if exists trg_groups_owner on app.groups;
drop function if exists app.add_creator_as_owner();
-- Tables
drop table if exists todo.todo_group_shares cascade;
drop table if exists todo.todos cascade;
drop table if exists app.group_members cascade;
drop table if exists app.groups cascade;
-- Functions
drop function if exists app.current_user_id();
-- Tables
drop table if exists app.users cascade;
-- Schemas
drop schema if exists todo cascade;
drop schema if exists app cascade;
-- (Optionally) keep roles; comment these if managed outside migrations
do $$
begin
if exists (select 1 from pg_roles where rolname = 'web_anon') then
revoke all on schema public from web_anon;
end if;
end$$;
-- Not dropping roles by default to avoid breaking external config (e.g., PostgREST).
-- Uncomment if desired:
-- drop role if exists web_anon;
-- drop role if exists api_owner;
test_keycloak_postgrest_rls.py
End-to-end validator that:
- logs in as two users (Alice & Bob) via Keycloak,
- auto-provisions
app.usersvia/rpc/current_user_id, - creates a group (auto-adds creator as owner),
- adds Bob as member,
- creates a todo,
- verifies read/write/delete controls via
todo.todo_group_shares.
#!/usr/bin/env python3
"""
E2E RLS test:
- Requires a PostgREST API exposing schemas "app" and "todo" and using web_anon role.
- Keycloak must have users whose credentials you supply.
- Works with public client (no secret) or confidential client (with secret).
ENV VARS:
API_BASE="https://uapp-api.example.com" # PostgREST base (no trailing slash)
KC_BASE="https://auth.example.com" # Keycloak base (no trailing slash)
KC_REALM="myrealm"
KC_CLIENT_ID="myclient"
KC_CLIENT_SECRET="" # optional; leave empty for public client
USER1="alice@example.com"
PASS1="alice_password"
USER2="bob@example.com"
PASS2="bob_password"
VERIFY_SSL="true" # "false" to skip TLS verify (dev only)
Run:
python3 test_keycloak_postgrest_rls.py
"""
import os, sys, time, json, base64
import requests
from typing import Dict, Any
API_BASE = os.environ.get("API_BASE", "http://localhost:3000").rstrip("/")
KC_BASE = os.environ.get("KC_BASE", "http://localhost:8080").rstrip("/")
KC_REALM = os.environ.get("KC_REALM", "example")
KC_CLIENT_ID = os.environ.get("KC_CLIENT_ID", "postgrest-client")
KC_CLIENT_SECRET = os.environ.get("KC_CLIENT_SECRET", "")
USER1 = os.environ.get("USER1", "alice")
PASS1 = os.environ.get("PASS1", "alice")
USER2 = os.environ.get("USER2", "bob")
PASS2 = os.environ.get("PASS2", "bob")
VERIFY_SSL = os.environ.get("VERIFY_SSL", "true").lower() != "false"
SESSION = requests.Session()
def kc_token(username: str, password: str) -> Dict[str, Any]:
url = f"{KC_BASE}/realms/{KC_REALM}/protocol/openid-connect/token"
data = {
"grant_type": "password",
"client_id": KC_CLIENT_ID,
"username": username,
"password": password,
}
if KC_CLIENT_SECRET:
data["client_secret"] = KC_CLIENT_SECRET
r = SESSION.post(url, data=data, verify=VERIFY_SSL)
if r.status_code != 200:
raise RuntimeError(f"Keycloak token error {r.status_code}: {r.text}")
return r.json()
def bearer(token: str) -> Dict[str,str]:
return {"Authorization": f"Bearer {token}"}
def postgrest_call(method: str, path: str, token: str, **kwargs) -> requests.Response:
url = f"{API_BASE}{path}"
headers = kwargs.pop("headers", {})
headers.update(bearer(token))
# Let PostgREST return exact counts on list calls
headers.setdefault("Prefer", "count=exact")
if "json" in kwargs and method.upper() in ("POST","PATCH"):
headers.setdefault("Content-Type", "application/json")
return SESSION.request(method=method, url=url, headers=headers, verify=VERIFY_SSL, **kwargs)
def jwt_sub(access_token: str) -> str:
# Decode JWT payload (no verify) for logging only
try:
payload_b64 = access_token.split(".")[1]
# pad
payload_b64 += "=" * ((4 - len(payload_b64) % 4) % 4)
payload = json.loads(base64.urlsafe_b64decode(payload_b64))
return payload.get("sub", "")
except Exception:
return ""
def rpc_current_user_id(token: str) -> str:
r = postgrest_call("POST", "/rpc/current_user_id", token, json={})
if r.status_code not in (200, 201):
raise RuntimeError(f"/rpc/current_user_id failed: {r.status_code} {r.text}")
# PostgREST returns either raw UUID or {"current_user_id": "..."} depending on cfg.
try:
j = r.json()
if isinstance(j, dict) and "current_user_id" in j:
return j["current_user_id"]
if isinstance(j, list) and j and "current_user_id" in j[0]:
return j[0]["current_user_id"]
if isinstance(j, str):
return j
except Exception:
pass
# text body?
return r.text.strip().strip('"')
def assert_ok(resp: requests.Response, msg="expected 2xx"):
if not (200 <= resp.status_code < 300):
raise AssertionError(f"{msg}: got {resp.status_code} {resp.text}")
def main():
print(f"API_BASE={API_BASE}")
print(f"KC_BASE={KC_BASE} realm={KC_REALM} client_id={KC_CLIENT_ID} secret={'set' if bool(KC_CLIENT_SECRET) else 'unset'}")
print(f"VERIFY_SSL={VERIFY_SSL}")
# 1) Login both users
t1 = kc_token(USER1, PASS1)
t2 = kc_token(USER2, PASS2)
a1, a2 = t1["access_token"], t2["access_token"]
print(f"Alice sub: {jwt_sub(a1)}")
print(f"Bob sub: {jwt_sub(a2)}")
# 2) Auto-provision/resolve current user IDs
u1 = rpc_current_user_id(a1)
u2 = rpc_current_user_id(a2)
print(f"Alice user_id: {u1}")
print(f"Bob user_id: {u2}")
# 3) Alice creates a group (trigger makes her owner automatically)
group_payload = {"name": f"testgrp-{int(time.time())}", "created_by": u1}
r = postgrest_call("POST", "/app.groups", a1, json=group_payload)
assert_ok(r, "create group")
group = r.json()[0]
group_id = group["id"]
print(f"Created group {group_id}")
# 4) Alice adds Bob as member (allowed because Alice is owner via trigger)
r = postgrest_call("POST", "/app.group_members", a1, json={"group_id": group_id, "user_id": u2, "role": "member"})
assert_ok(r, "add bob to group")
print("Bob added to group as member")
# 5) Alice creates a todo
todo_payload = {"owner_id": u1, "title": "Secret plan", "body": "Do not share yet"}
r = postgrest_call("POST", "/todo.todos", a1, json=todo_payload)
assert_ok(r, "create todo")
todo = r.json()[0]
todo_id = todo["id"]
print(f"Alice created todo {todo_id}")
# 6) Bob cannot see it yet
r = postgrest_call("GET", f"/todo.todos?id=eq.{todo_id}", a2)
assert_ok(r, "bob read check")
assert r.json() == [], "Bob should not see Alice's todo before share"
print("Verified: Bob cannot read unshared todo")
# 7) Alice shares read only with the group
share_payload = {"todo_id": todo_id, "group_id": group_id, "can_read": True, "can_write": False, "can_del": False}
r = postgrest_call("POST", "/todo.todo_group_shares", a1, json=share_payload)
assert_ok(r, "share todo read-only")
print("Todo shared (read-only) with group")
# 8) Bob can now read, but not update
r = postgrest_call("GET", f"/todo.todos?id=eq.{todo_id}", a2)
assert_ok(r, "bob read shared todo")
assert len(r.json()) == 1, "Bob should now see the shared todo"
print("Verified: Bob can read shared todo")
r = postgrest_call("PATCH", f"/todo.todos?id=eq.{todo_id}", a2, json={"title": "Bob tries edit"})
if 200 <= r.status_code < 300:
raise AssertionError("Bob should NOT be able to update with read-only share")
print("Verified: Bob cannot update with read-only share")
# 9) Alice upgrades to can_write
r = postgrest_call("PATCH", f"/todo.todo_group_shares?todo_id=eq.{todo_id}&group_id=eq.{group_id}", a1, json={"can_write": True})
assert_ok(r, "upgrade share to write")
print("Share upgraded: can_write=True")
# Bob updates successfully now
r = postgrest_call("PATCH", f"/todo.todos?id=eq.{todo_id}", a2, json={"title": "Bob edited title"})
assert_ok(r, "bob updates after write share")
print("Verified: Bob can update after write permission")
# 10) Delete tests: Bob still cannot delete
r = postgrest_call("DELETE", f"/todo.todos?id=eq.{todo_id}", a2)
if 200 <= r.status_code < 300:
raise AssertionError("Bob should NOT be able to delete without can_del")
print("Verified: Bob cannot delete without delete permission")
# Alice grants delete
r = postgrest_call("PATCH", f"/todo.todo_group_shares?todo_id=eq.{todo_id}&group_id=eq.{group_id}", a1, json={"can_del": True})
assert_ok(r, "upgrade share to delete")
print("Share upgraded: can_del=True")
# Bob deletes it now
r = postgrest_call("DELETE", f"/todo.todos?id=eq.{todo_id}", a2)
assert_ok(r, "bob deletes after delete share")
print("Verified: Bob can delete with delete permission")
print("\n✅ All RLS tests passed.")
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"\n❌ Test failed: {e}", file=sys.stderr)
sys.exit(1)
Notes / wiring tips
- PostgREST
Ensure your PostgREST config includes:
db-anon-role = "web_anon"db-schemas = "app,todo"(andpublicif you want to exposerpc/current_user_idwithout schema prefix)- If your function is under
app, call/rpc/current_user_idonly if it’s in a listed schema or exposed viadb-extra-search-path.
- Keycloak
- The script uses Resource Owner Password Credentials for simplicity. On modern KC, make sure your client allows Direct Access Grants and your users are enabled.
- If you prefer authorization code + PKCE, you can swap the token function, but ROPC is easiest for headless tests.
- Auto-provision
/rpc/current_user_idboth resolves and creates theapp.usersrow for the current JWTsub—this bootstraps group ownership and todo ownership without any admin step. - Idempotency The test uses unique names via timestamp; you can re-run safely.
You could split into multiple migrations (e.g., 001_schemas.sql, 002_rls.sql, 003_triggers.sql), say the word and I’ll fan it out cleanly.