RLS example 4
Jump to navigation
Jump to search
This page contains golang-migrate SQL migration files (written from scratch) and a Python test script that logs into Keycloak, obtains JWTs, and exercises the CRUD + RLS paths to prove the policies work.
migrations/001_init.up.sql
-- +goose Up
--
-- Initial schema for a PostgREST + Keycloak todo app protected by row-level
-- security (RLS).
--
-- Design notes:
--   * Every policy resolves the caller through app.jwt_user_id(), a STABLE
--     lookup of the JWT "sub" in app.users. Provisioning is done only by the
--     VOLATILE app.current_user_id() (exposed as /rpc/current_user_id),
--     because PostgreSQL refuses INSERT inside non-volatile functions.
--   * Policies never query an RLS-protected table directly. Cross-table and
--     self-referencing checks go through SECURITY DEFINER helpers; inline
--     sub-selects would make PostgreSQL raise
--     "infinite recursion detected in policy".
--   * The SECURITY DEFINER helpers rely on their owner (the role running this
--     migration, which also owns the tables) bypassing RLS. Do not enable
--     FORCE ROW LEVEL SECURITY on these tables without revisiting them.

-- === Base setup: extensions, schemas, roles ===
create extension if not exists pgcrypto; -- for gen_random_uuid()

create schema if not exists app;
create schema if not exists todo;

-- Roles model (owner of DB objects vs runtime role for PostgREST)
do $$
begin
    if not exists (select 1 from pg_roles where rolname = 'api_owner') then
        create role api_owner login;
    end if;
    if not exists (select 1 from pg_roles where rolname = 'web_anon') then
        create role web_anon nologin;
    end if;
end
$$;

grant usage on schema app, todo to web_anon;

-- === Identity mapping: internal UUID <-> external JWT sub ===
create table if not exists app.users (
    id               uuid primary key default gen_random_uuid(),
    external_subject text not null unique, -- JWT sub (opaque; not guaranteed to be a UUID)
    display_name     text,
    email            text,
    is_active        boolean not null default true,
    created_at       timestamptz not null default now()
);

-- Read-only resolution of the caller: returns the app.users.id of the active
-- user whose external_subject equals the "sub" claim of request.jwt.claims,
-- or NULL when there are no claims, no "sub", or no matching active user.
-- STABLE so it can be used cheaply inside RLS policies.
create or replace function app.jwt_user_id()
returns uuid
language sql
stable
security definer
-- Trusted schema first, pg_temp last, so temporary objects cannot shadow
-- anything this definer-rights function resolves.
set search_path = app, pg_temp
as $$
    select u.id
    from app.users as u
    where u.external_subject
          = (nullif(current_setting('request.jwt.claims', true), '')::jsonb ->> 'sub')
      and u.is_active;
$$;

revoke all on function app.jwt_user_id() from public;
grant execute on function app.jwt_user_id() to web_anon;

-- Resolve the caller and auto-provision an app.users row on first use.
-- Returns NULL when there are no claims, no "sub", or the matching user has
-- been deactivated. Must be VOLATILE because it may INSERT.
create or replace function app.current_user_id()
returns uuid
language plpgsql
volatile
security definer
set search_path = app, pg_temp
as $$
declare
    claims  jsonb;
    sub_txt text;
    uid     uuid;
begin
    claims := nullif(current_setting('request.jwt.claims', true), '')::jsonb;
    if claims is null then
        return null;
    end if;

    sub_txt := claims ->> 'sub';
    if sub_txt is null then
        return null;
    end if;

    -- Lookup existing active user
    select u.id
    into uid
    from app.users as u
    where u.external_subject = sub_txt
      and u.is_active;
    if uid is not null then
        return uid;
    end if;

    -- Auto-provision on first use from claims. ON CONFLICT keeps this from
    -- raising a unique violation when the subject already exists, either
    -- deactivated or inserted concurrently by another request.
    insert into app.users (external_subject, display_name, email)
    values (sub_txt, claims ->> 'name', claims ->> 'email')
    on conflict (external_subject) do nothing
    returning id into uid;

    if uid is null then
        -- Lost a race with a concurrent insert (or the user is inactive):
        -- re-read, still honouring is_active.
        select u.id
        into uid
        from app.users as u
        where u.external_subject = sub_txt
          and u.is_active;
    end if;

    return uid;
end
$$;

revoke all on function app.current_user_id() from public;
grant execute on function app.current_user_id() to web_anon;

-- === Groups & membership ===
create table if not exists app.groups (
    id         uuid primary key default gen_random_uuid(),
    name       text not null unique,
    created_by uuid not null references app.users(id) on delete restrict,
    created_at timestamptz not null default now()
);

create table if not exists app.group_members (
    group_id uuid not null references app.groups(id) on delete cascade,
    user_id  uuid not null references app.users(id) on delete cascade,
    role     text not null check (role in ('owner','manager','member')),
    added_at timestamptz not null default now(),
    primary key (group_id, user_id)
);

create index if not exists idx_group_members_user on app.group_members(user_id);
-- NOTE(review): group_id is the leading primary-key column, so this index is
-- likely redundant; kept so existing deployments stay unchanged.
create index if not exists idx_group_members_group on app.group_members(group_id);

-- True when the caller belongs to p_group_id; when p_roles is given the
-- caller's role must be one of them. SECURITY DEFINER so that policies can
-- consult app.group_members without re-entering that table's own policies.
create or replace function app.is_group_member(p_group_id uuid, p_roles text[] default null)
returns boolean
language sql
stable
security definer
set search_path = app, pg_temp
as $$
    select exists (
        select 1
        from app.group_members as gm
        where gm.group_id = p_group_id
          and gm.user_id = app.jwt_user_id()
          and (p_roles is null or gm.role = any (p_roles))
    );
$$;

revoke all on function app.is_group_member(uuid, text[]) from public;
grant execute on function app.is_group_member(uuid, text[]) to web_anon;

-- Automatically add the creator as owner of any newly created group.
-- SECURITY DEFINER: the group has no members yet, so the caller could not
-- pass gm_insert_mgr on their own.
create or replace function app.add_creator_as_owner()
returns trigger
language plpgsql
security definer
set search_path = app, pg_temp
as $$
begin
    insert into app.group_members (group_id, user_id, role)
    values (new.id, new.created_by, 'owner')
    on conflict do nothing;
    return new;
end
$$;

drop trigger if exists trg_groups_owner on app.groups;
create trigger trg_groups_owner
    after insert on app.groups
    for each row execute function app.add_creator_as_owner();

-- === Todos & shares ===
create table if not exists todo.todos (
    id         uuid primary key default gen_random_uuid(),
    owner_id   uuid not null references app.users(id) on delete restrict,
    title      text not null,
    body       text,
    is_done    boolean not null default false,
    created_at timestamptz not null default now(),
    updated_at timestamptz not null default now()
);

create index if not exists idx_todos_owner on todo.todos(owner_id);

create table if not exists todo.todo_group_shares (
    todo_id   uuid not null references todo.todos(id) on delete cascade,
    group_id  uuid not null references app.groups(id) on delete cascade,
    can_read  boolean not null default true,
    can_write boolean not null default false,
    can_del   boolean not null default false,
    primary key (todo_id, group_id)
);

-- NOTE(review): todo_id is the leading primary-key column, so this index is
-- likely redundant; kept so existing deployments stay unchanged.
create index if not exists idx_todo_shares_todo on todo.todo_group_shares(todo_id);
create index if not exists idx_todo_shares_group on todo.todo_group_shares(group_id);

-- True when the caller owns the given todo. Used by the share policies so
-- they do not re-enter the policies on todo.todos.
create or replace function todo.is_todo_owner(p_todo_id uuid)
returns boolean
language sql
stable
security definer
set search_path = app, todo, pg_temp
as $$
    select exists (
        select 1
        from todo.todos as t
        where t.id = p_todo_id
          and t.owner_id = app.jwt_user_id()
    );
$$;

revoke all on function todo.is_todo_owner(uuid) from public;
grant execute on function todo.is_todo_owner(uuid) to web_anon;

-- True when some group the caller belongs to holds a share on the todo with
-- the requested permission: 'read' -> can_read, 'write' -> can_write,
-- 'delete' -> can_del. Any other permission name yields false.
create or replace function todo.has_group_share(p_todo_id uuid, p_permission text)
returns boolean
language sql
stable
security definer
set search_path = app, todo, pg_temp
as $$
    select exists (
        select 1
        from todo.todo_group_shares as s
        inner join app.group_members as gm
            on gm.group_id = s.group_id
        where s.todo_id = p_todo_id
          and gm.user_id = app.jwt_user_id()
          and case p_permission
                  when 'read'   then s.can_read
                  when 'write'  then s.can_write
                  when 'delete' then s.can_del
                  else false
              end
    );
$$;

revoke all on function todo.has_group_share(uuid, text) from public;
grant execute on function todo.has_group_share(uuid, text) to web_anon;

-- Maintain updated_at on todos
create or replace function todo.bump_updated_at()
returns trigger
language plpgsql
as $$
begin
    new.updated_at := now();
    return new;
end
$$;

drop trigger if exists trg_todos_updated_at on todo.todos;
create trigger trg_todos_updated_at
    before update on todo.todos
    for each row execute function todo.bump_updated_at();

-- === RLS enablement ===
alter table app.users enable row level security;
alter table app.groups enable row level security;
alter table app.group_members enable row level security;
alter table todo.todos enable row level security;
alter table todo.todo_group_shares enable row level security;

-- Each policy is dropped first so this file stays re-runnable, like the
-- "if not exists" / "or replace" statements above.

-- app.users: each user can see/update themselves
-- NOTE(review): users_update_self lets a user rewrite any column of their own
-- row, including external_subject and is_active; consider column-level grants.
drop policy if exists users_select_self on app.users;
create policy users_select_self on app.users
    for select to web_anon
    using (id = app.jwt_user_id());

drop policy if exists users_update_self on app.users;
create policy users_update_self on app.users
    for update to web_anon
    using (id = app.jwt_user_id())
    with check (id = app.jwt_user_id());

-- app.groups
drop policy if exists groups_insert_any on app.groups;
create policy groups_insert_any on app.groups
    for insert to web_anon
    with check (created_by = app.jwt_user_id());

-- The creator is matched explicitly: INSERT ... RETURNING checks this policy
-- before the AFTER INSERT trigger has added the owner membership row.
drop policy if exists groups_select_member on app.groups;
create policy groups_select_member on app.groups
    for select to web_anon
    using (
        created_by = app.jwt_user_id()
        or app.is_group_member(id)
    );

drop policy if exists groups_update_owner on app.groups;
create policy groups_update_owner on app.groups
    for update to web_anon
    using (app.is_group_member(id, array['owner']))
    with check (app.is_group_member(id, array['owner']));

drop policy if exists groups_delete_owner on app.groups;
create policy groups_delete_owner on app.groups
    for delete to web_anon
    using (app.is_group_member(id, array['owner']));

-- app.group_members
-- NOTE(review): managers may grant or change any role, including 'owner';
-- tighten the WITH CHECK clauses if that is not intended.
drop policy if exists gm_select_member on app.group_members;
create policy gm_select_member on app.group_members
    for select to web_anon
    using (app.is_group_member(group_id));

drop policy if exists gm_insert_mgr on app.group_members;
create policy gm_insert_mgr on app.group_members
    for insert to web_anon
    with check (app.is_group_member(group_id, array['owner','manager']));

drop policy if exists gm_update_mgr on app.group_members;
create policy gm_update_mgr on app.group_members
    for update to web_anon
    using (app.is_group_member(group_id, array['owner','manager']))
    with check (app.is_group_member(group_id, array['owner','manager']));

drop policy if exists gm_delete_mgr_or_self on app.group_members;
create policy gm_delete_mgr_or_self on app.group_members
    for delete to web_anon
    using (
        user_id = app.jwt_user_id()
        or app.is_group_member(group_id, array['owner','manager'])
    );

-- todo.todos
drop policy if exists todos_select_owner_or_shared on todo.todos;
create policy todos_select_owner_or_shared on todo.todos
    for select to web_anon
    using (
        owner_id = app.jwt_user_id()
        or todo.has_group_share(id, 'read')
    );

drop policy if exists todos_insert_self on todo.todos;
create policy todos_insert_self on todo.todos
    for insert to web_anon
    with check (owner_id = app.jwt_user_id());

-- NOTE(review): a user holding a write share can also change owner_id, since
-- WITH CHECK only requires that the share still exists; restrict updatable
-- columns if ownership transfer by collaborators is not intended.
drop policy if exists todos_update_owner_or_shared on todo.todos;
create policy todos_update_owner_or_shared on todo.todos
    for update to web_anon
    using (
        owner_id = app.jwt_user_id()
        or todo.has_group_share(id, 'write')
    )
    with check (
        owner_id = app.jwt_user_id()
        or todo.has_group_share(id, 'write')
    );

drop policy if exists todos_delete_owner_or_shared on todo.todos;
create policy todos_delete_owner_or_shared on todo.todos
    for delete to web_anon
    using (
        owner_id = app.jwt_user_id()
        or todo.has_group_share(id, 'delete')
    );

-- todo.todo_group_shares
drop policy if exists shares_select_visible on todo.todo_group_shares;
create policy shares_select_visible on todo.todo_group_shares
    for select to web_anon
    using (
        todo.is_todo_owner(todo_id)
        or app.is_group_member(group_id)
    );

drop policy if exists shares_insert_todo_owner on todo.todo_group_shares;
create policy shares_insert_todo_owner on todo.todo_group_shares
    for insert to web_anon
    with check (todo.is_todo_owner(todo_id));

drop policy if exists shares_update_todo_owner on todo.todo_group_shares;
create policy shares_update_todo_owner on todo.todo_group_shares
    for update to web_anon
    using (todo.is_todo_owner(todo_id))
    with check (todo.is_todo_owner(todo_id));

drop policy if exists shares_delete_todo_owner on todo.todo_group_shares;
create policy shares_delete_todo_owner on todo.todo_group_shares
    for delete to web_anon
    using (todo.is_todo_owner(todo_id));

-- Base DML privileges (RLS gates real access)
grant select, insert, update, delete on app.users to web_anon;
grant select, insert, update, delete on app.groups to web_anon;
grant select, insert, update, delete on app.group_members to web_anon;
grant select, insert, update, delete on todo.todos to web_anon;
grant select, insert, update, delete on todo.todo_group_shares to web_anon;
migrations/001_init.down.sql
-- +goose Down
--
-- Reverses 001_init.up.sql. Objects are removed dependents-first; RLS
-- policies disappear together with the tables they are attached to.

-- Triggers and their functions
drop trigger if exists trg_todos_updated_at on todo.todos;
drop function if exists todo.bump_updated_at();

drop trigger if exists trg_groups_owner on app.groups;
drop function if exists app.add_creator_as_owner();

-- Tables that reference app.users
drop table if exists todo.todo_group_shares cascade;
drop table if exists todo.todos cascade;
drop table if exists app.group_members cascade;
drop table if exists app.groups cascade;

-- Identity resolution function, then the table it reads
drop function if exists app.current_user_id();
drop table if exists app.users cascade;

-- Schemas (cascade removes anything still left inside them)
drop schema if exists todo cascade;
drop schema if exists app cascade;

-- Roles are kept; only the runtime role's privileges on "public" are revoked.
-- Remove this block if roles are managed outside migrations.
do $$
begin
    perform 1 from pg_roles where rolname = 'web_anon';
    if found then
        revoke all on schema public from web_anon;
    end if;
end
$$;

-- Roles are not dropped by default, to avoid breaking external configuration
-- (e.g., PostgREST). To drop them as well, run:
--   drop role if exists web_anon;
--   drop role if exists api_owner;
test_keycloak_postgrest_rls.py
End-to-end validator that:
- logs in as two users (Alice & Bob) via Keycloak,
- auto-provisions app.users via /rpc/current_user_id,
- creates a group (auto-adds creator as owner),
- adds Bob as member,
- creates a todo,
- verifies read/write/delete controls via todo.todo_group_shares.
#!/usr/bin/env python3
"""
E2E RLS test:
- Requires a PostgREST API exposing schemas "app" and "todo" and using web_anon role.
- Keycloak must have users whose credentials you supply.
- Works with public client (no secret) or confidential client (with secret).

Tables are addressed as /<table>; the schema is selected per request with the
PostgREST "Accept-Profile" (GET/HEAD) or "Content-Profile" (writes) header.

ENV VARS:
  API_BASE="https://uapp-api.example.com"   # PostgREST base (no trailing slash)
  KC_BASE="https://auth.example.com"        # Keycloak base (no trailing slash)
  KC_REALM="myrealm"
  KC_CLIENT_ID="myclient"
  KC_CLIENT_SECRET=""                       # optional; leave empty for public client
  USER1="alice@example.com"
  PASS1="alice_password"
  USER2="bob@example.com"
  PASS2="bob_password"
  VERIFY_SSL="true"                         # "false" to skip TLS verify (dev only)

Run:
  python3 test_keycloak_postgrest_rls.py
"""
import os, sys, time, json, base64
import requests
from typing import Any, Dict, List, Optional

API_BASE = os.environ.get("API_BASE", "http://localhost:3000").rstrip("/")
KC_BASE = os.environ.get("KC_BASE", "http://localhost:8080").rstrip("/")
KC_REALM = os.environ.get("KC_REALM", "example")
KC_CLIENT_ID = os.environ.get("KC_CLIENT_ID", "postgrest-client")
KC_CLIENT_SECRET = os.environ.get("KC_CLIENT_SECRET", "")
USER1 = os.environ.get("USER1", "alice")
PASS1 = os.environ.get("PASS1", "alice")
USER2 = os.environ.get("USER2", "bob")
PASS2 = os.environ.get("PASS2", "bob")
VERIFY_SSL = os.environ.get("VERIFY_SSL", "true").lower() != "false"

# Seconds before an HTTP call is abandoned, so a hung server fails the test
# instead of blocking it forever.
REQUEST_TIMEOUT = 30

# Ask PostgREST to echo the rows a write touched. Under RLS, an UPDATE/DELETE
# whose rows are filtered out still answers 2xx, so the returned rows are the
# only reliable evidence of what actually changed.
RETURN_ROWS = {"Prefer": "return=representation"}

SESSION = requests.Session()


def kc_token(username: str, password: str) -> Dict[str, Any]:
    """Fetch a token set from Keycloak with the password grant.

    Raises RuntimeError when Keycloak answers with anything but HTTP 200.
    """
    url = f"{KC_BASE}/realms/{KC_REALM}/protocol/openid-connect/token"
    data = {
        "grant_type": "password",
        "client_id": KC_CLIENT_ID,
        "username": username,
        "password": password,
    }
    if KC_CLIENT_SECRET:
        data["client_secret"] = KC_CLIENT_SECRET
    r = SESSION.post(url, data=data, verify=VERIFY_SSL, timeout=REQUEST_TIMEOUT)
    if r.status_code != 200:
        raise RuntimeError(f"Keycloak token error {r.status_code}: {r.text}")
    return r.json()


def bearer(token: str) -> Dict[str, str]:
    """Build the Authorization header for a bearer token."""
    return {"Authorization": f"Bearer {token}"}


def postgrest_call(method: str, path: str, token: str,
                   schema: Optional[str] = None, **kwargs) -> requests.Response:
    """Send an authenticated request to PostgREST.

    path    -- path and query string relative to API_BASE, e.g. "/todos?id=eq.1"
    token   -- JWT access token sent as bearer credential
    schema  -- optional schema name; sent as Accept-Profile for GET/HEAD and
               as Content-Profile for every other method
    kwargs  -- forwarded to requests (json=, headers=, timeout=, ...)
    """
    url = f"{API_BASE}{path}"
    # Copy, so a headers dict shared by the caller is never mutated.
    headers = dict(kwargs.pop("headers", None) or {})
    headers.update(bearer(token))
    # Let PostgREST return exact counts on list calls
    headers.setdefault("Prefer", "count=exact")
    verb = method.upper()
    if schema:
        profile_header = "Accept-Profile" if verb in ("GET", "HEAD") else "Content-Profile"
        headers.setdefault(profile_header, schema)
    if "json" in kwargs and verb in ("POST", "PATCH"):
        headers.setdefault("Content-Type", "application/json")
    kwargs.setdefault("timeout", REQUEST_TIMEOUT)
    return SESSION.request(method=method, url=url, headers=headers, verify=VERIFY_SSL, **kwargs)


def jwt_sub(access_token: str) -> str:
    """Return the "sub" claim of a JWT, or "" if it cannot be decoded.

    The signature is NOT verified; the value is used for logging only, which
    is why every decoding error is deliberately swallowed.
    """
    try:
        payload_b64 = access_token.split(".")[1]
        # pad
        payload_b64 += "=" * ((4 - len(payload_b64) % 4) % 4)
        payload = json.loads(base64.urlsafe_b64decode(payload_b64))
        return payload.get("sub", "")
    except Exception:
        return ""


def rpc_current_user_id(token: str) -> str:
    """Resolve (and auto-provision) the internal user id for the token's user.

    Raises RuntimeError when the call fails or when no user id comes back,
    e.g. when the function returned NULL.
    """
    r = postgrest_call("POST", "/rpc/current_user_id", token, schema="app", json={})
    if r.status_code not in (200, 201):
        raise RuntimeError(f"/rpc/current_user_id failed: {r.status_code} {r.text}")
    # PostgREST returns either raw UUID or {"current_user_id": "..."} depending on cfg.
    try:
        value: Any = r.json()
    except ValueError:
        # text body?
        value = r.text.strip().strip('"')
    if isinstance(value, list):
        value = value[0] if value else None
    if isinstance(value, dict):
        value = value.get("current_user_id")
    # A JSON null (or its textual form) means the database resolved no user.
    if not isinstance(value, str) or not value or value == "null":
        raise RuntimeError(f"/rpc/current_user_id returned no user id: {r.text!r}")
    return value


def assert_ok(resp: requests.Response, msg="expected 2xx"):
    """Raise AssertionError unless the response status is 2xx."""
    if not (200 <= resp.status_code < 300):
        raise AssertionError(f"{msg}: got {resp.status_code} {resp.text}")


def _returned_rows(resp: requests.Response) -> List[Any]:
    """Rows echoed by a 2xx response; [] when the body is empty or not JSON."""
    try:
        body = resp.json()
    except ValueError:
        return []
    if body is None:
        return []
    return body if isinstance(body, list) else [body]


def assert_wrote(resp: requests.Response, msg: str) -> Dict[str, Any]:
    """Assert that a write succeeded AND touched at least one row; return the first row."""
    assert_ok(resp, msg)
    rows = _returned_rows(resp)
    if not rows:
        raise AssertionError(f"{msg}: request succeeded but no row was affected")
    return rows[0]


def assert_denied(resp: requests.Response, msg: str):
    """Assert that a write was blocked.

    Blocked means an auth-style rejection (401/403/404) or a 2xx response
    that affected no rows. Any other error status is a real failure and is
    reported instead of being mistaken for a denial.
    """
    if resp.status_code in (401, 403, 404):
        return
    assert_ok(resp, f"{msg} (unexpected error instead of a denial)")
    if _returned_rows(resp):
        raise AssertionError(msg)


def main():
    print(f"API_BASE={API_BASE}")
    print(f"KC_BASE={KC_BASE} realm={KC_REALM} client_id={KC_CLIENT_ID} secret={'set' if bool(KC_CLIENT_SECRET) else 'unset'}")
    print(f"VERIFY_SSL={VERIFY_SSL}")

    # 1) Login both users
    t1 = kc_token(USER1, PASS1)
    t2 = kc_token(USER2, PASS2)
    a1, a2 = t1["access_token"], t2["access_token"]
    print(f"Alice sub: {jwt_sub(a1)}")
    print(f"Bob sub: {jwt_sub(a2)}")

    # 2) Auto-provision/resolve current user IDs
    u1 = rpc_current_user_id(a1)
    u2 = rpc_current_user_id(a2)
    print(f"Alice user_id: {u1}")
    print(f"Bob user_id: {u2}")

    # 3) Alice creates a group (trigger makes her owner automatically)
    group_payload = {"name": f"testgrp-{int(time.time())}", "created_by": u1}
    r = postgrest_call("POST", "/groups", a1, schema="app",
                       headers=RETURN_ROWS, json=group_payload)
    group = assert_wrote(r, "create group")
    group_id = group["id"]
    print(f"Created group {group_id}")

    # 4) Alice adds Bob as member (allowed because Alice is owner via trigger)
    r = postgrest_call("POST", "/group_members", a1, schema="app",
                       json={"group_id": group_id, "user_id": u2, "role": "member"})
    assert_ok(r, "add bob to group")
    print("Bob added to group as member")

    # 5) Alice creates a todo
    todo_payload = {"owner_id": u1, "title": "Secret plan", "body": "Do not share yet"}
    r = postgrest_call("POST", "/todos", a1, schema="todo",
                       headers=RETURN_ROWS, json=todo_payload)
    todo = assert_wrote(r, "create todo")
    todo_id = todo["id"]
    print(f"Alice created todo {todo_id}")

    todo_path = f"/todos?id=eq.{todo_id}"
    share_path = f"/todo_group_shares?todo_id=eq.{todo_id}&group_id=eq.{group_id}"

    # 6) Bob cannot see it yet
    r = postgrest_call("GET", todo_path, a2, schema="todo")
    assert_ok(r, "bob read check")
    assert r.json() == [], "Bob should not see Alice's todo before share"
    print("Verified: Bob cannot read unshared todo")

    # 7) Alice shares read only with the group
    share_payload = {"todo_id": todo_id, "group_id": group_id,
                     "can_read": True, "can_write": False, "can_del": False}
    r = postgrest_call("POST", "/todo_group_shares", a1, schema="todo", json=share_payload)
    assert_ok(r, "share todo read-only")
    print("Todo shared (read-only) with group")

    # 8) Bob can now read, but not update
    r = postgrest_call("GET", todo_path, a2, schema="todo")
    assert_ok(r, "bob read shared todo")
    assert len(r.json()) == 1, "Bob should now see the shared todo"
    print("Verified: Bob can read shared todo")

    r = postgrest_call("PATCH", todo_path, a2, schema="todo",
                       headers=RETURN_ROWS, json={"title": "Bob tries edit"})
    assert_denied(r, "Bob should NOT be able to update with read-only share")
    # Cross-check through the owner that the row really is untouched.
    r = postgrest_call("GET", todo_path, a1, schema="todo")
    assert_ok(r, "alice re-reads todo")
    rows = r.json()
    assert len(rows) == 1 and rows[0]["title"] == "Secret plan", \
        "Todo title changed although Bob only had a read-only share"
    print("Verified: Bob cannot update with read-only share")

    # 9) Alice upgrades to can_write
    r = postgrest_call("PATCH", share_path, a1, schema="todo",
                       headers=RETURN_ROWS, json={"can_write": True})
    assert_wrote(r, "upgrade share to write")
    print("Share upgraded: can_write=True")

    # Bob updates successfully now
    r = postgrest_call("PATCH", todo_path, a2, schema="todo",
                       headers=RETURN_ROWS, json={"title": "Bob edited title"})
    assert_wrote(r, "bob updates after write share")
    print("Verified: Bob can update after write permission")

    # 10) Delete tests: Bob still cannot delete
    r = postgrest_call("DELETE", todo_path, a2, schema="todo", headers=RETURN_ROWS)
    assert_denied(r, "Bob should NOT be able to delete without can_del")
    r = postgrest_call("GET", todo_path, a1, schema="todo")
    assert_ok(r, "alice re-reads todo")
    assert len(r.json()) == 1, "Todo disappeared although Bob had no delete permission"
    print("Verified: Bob cannot delete without delete permission")

    # Alice grants delete
    r = postgrest_call("PATCH", share_path, a1, schema="todo",
                       headers=RETURN_ROWS, json={"can_del": True})
    assert_wrote(r, "upgrade share to delete")
    print("Share upgraded: can_del=True")

    # Bob deletes it now
    r = postgrest_call("DELETE", todo_path, a2, schema="todo", headers=RETURN_ROWS)
    assert_wrote(r, "bob deletes after delete share")
    print("Verified: Bob can delete with delete permission")

    print("\n✅ All RLS tests passed.")


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"\n❌ Test failed: {e}", file=sys.stderr)
        sys.exit(1)
Notes / wiring tips
- PostgREST
Ensure your PostgREST config includes:
db-anon-role = "web_anon"
db-schemas = "app,todo"
(add public only if you also want to expose objects that live there).
- The function lives in app, so /rpc/current_user_id is reachable only while app is listed in db-schemas; db-extra-search-path only extends the search path and does not create API endpoints.
- Keycloak
- The script uses Resource Owner Password Credentials for simplicity. On modern KC, make sure your client allows Direct Access Grants and your users are enabled.
- If you prefer authorization code + PKCE, you can swap the token function, but ROPC is easiest for headless tests.
- Auto-provision
/rpc/current_user_id both resolves and creates the app.users row for the current JWT sub — this bootstraps group ownership and todo ownership without any admin step.
- Idempotency
The test uses unique names via timestamp; you can re-run safely.
You could split into multiple migrations (e.g., 001_schemas.sql
, 002_rls.sql
, 003_triggers.sql
), say the word and I’ll fan it out cleanly.