<?xml version="1.0"?>
<feed xmlns="http://www.w3.org/2005/Atom" xml:lang="en">
	<id>https://tech.uvoo.io/index.php?action=history&amp;feed=atom&amp;title=RLS_example_4</id>
	<title>RLS example 4 - Revision history</title>
	<link rel="self" type="application/atom+xml" href="https://tech.uvoo.io/index.php?action=history&amp;feed=atom&amp;title=RLS_example_4"/>
	<link rel="alternate" type="text/html" href="https://tech.uvoo.io/index.php?title=RLS_example_4&amp;action=history"/>
	<updated>2026-04-24T04:39:42Z</updated>
	<subtitle>Revision history for this page on the wiki</subtitle>
	<generator>MediaWiki 1.35.2</generator>
	<entry>
		<id>https://tech.uvoo.io/index.php?title=RLS_example_4&amp;diff=5650&amp;oldid=prev</id>
		<title>Busk: Created page with &quot;**Go-Migrate SQL files** (from scratch) and a **Python test script** that logs into Keycloak, grabs JWTs, and exercises CRUD + RLS paths to prove the policy works.  ---  # mig...&quot;</title>
		<link rel="alternate" type="text/html" href="https://tech.uvoo.io/index.php?title=RLS_example_4&amp;diff=5650&amp;oldid=prev"/>
		<updated>2025-09-18T15:35:01Z</updated>

		<summary type="html">&lt;p&gt;Created page with &amp;quot;**Go-Migrate SQL files** (from scratch) and a **Python test script** that logs into Keycloak, grabs JWTs, and exercises CRUD + RLS paths to prove the policy works.  ---  # mig...&amp;quot;&lt;/p&gt;
&lt;p&gt;&lt;b&gt;New page&lt;/b&gt;&lt;/p&gt;&lt;div&gt;**Go-Migrate SQL files** (from scratch) and a **Python test script** that logs into Keycloak, grabs JWTs, and exercises CRUD + RLS paths to prove the policy works.&lt;br /&gt;
&lt;br /&gt;
---&lt;br /&gt;
&lt;br /&gt;
# migrations/001\_init.up.sql&lt;br /&gt;
&lt;br /&gt;
```sql&lt;br /&gt;
-- +goose Up&lt;br /&gt;
-- === Base setup: extensions, schemas, roles ===&lt;br /&gt;
create extension if not exists pgcrypto;  -- for gen_random_uuid()&lt;br /&gt;
&lt;br /&gt;
create schema if not exists app;&lt;br /&gt;
create schema if not exists todo;&lt;br /&gt;
&lt;br /&gt;
-- Roles model (owner of DB objects vs runtime role for PostgREST)&lt;br /&gt;
do $$&lt;br /&gt;
begin&lt;br /&gt;
  if not exists (select 1 from pg_roles where rolname = 'api_owner') then&lt;br /&gt;
    create role api_owner login;&lt;br /&gt;
  end if;&lt;br /&gt;
  if not exists (select 1 from pg_roles where rolname = 'web_anon') then&lt;br /&gt;
    create role web_anon nologin;&lt;br /&gt;
  end if;&lt;br /&gt;
end$$;&lt;br /&gt;
&lt;br /&gt;
grant usage on schema app, todo to web_anon;&lt;br /&gt;
&lt;br /&gt;
-- === Identity mapping: internal UUID ↔ external JWT sub ===&lt;br /&gt;
create table if not exists app.users (&lt;br /&gt;
  id                uuid primary key default gen_random_uuid(),&lt;br /&gt;
  external_subject  text not null unique, -- JWT sub (opaque; not guaranteed to be a UUID)&lt;br /&gt;
  display_name      text,&lt;br /&gt;
  email             text,&lt;br /&gt;
  is_active         boolean not null default true,&lt;br /&gt;
  created_at        timestamptz not null default now()&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
create or replace function app.current_user_id()&lt;br /&gt;
returns uuid&lt;br /&gt;
stable&lt;br /&gt;
security definer&lt;br /&gt;
set search_path = public, pg_temp, app&lt;br /&gt;
language plpgsql&lt;br /&gt;
as $$&lt;br /&gt;
declare&lt;br /&gt;
  claims   jsonb;&lt;br /&gt;
  sub_txt  text;&lt;br /&gt;
  uid      uuid;&lt;br /&gt;
begin&lt;br /&gt;
  claims := nullif(current_setting('request.jwt.claims', true), '')::jsonb;&lt;br /&gt;
  if claims is null then&lt;br /&gt;
    return null;&lt;br /&gt;
  end if;&lt;br /&gt;
&lt;br /&gt;
  sub_txt := claims-&amp;gt;&amp;gt;'sub';&lt;br /&gt;
  if sub_txt is null then&lt;br /&gt;
    return null;&lt;br /&gt;
  end if;&lt;br /&gt;
&lt;br /&gt;
  -- Lookup existing active user&lt;br /&gt;
  select u.id into uid&lt;br /&gt;
  from app.users u&lt;br /&gt;
  where u.external_subject = sub_txt&lt;br /&gt;
    and u.is_active;&lt;br /&gt;
&lt;br /&gt;
  if uid is not null then&lt;br /&gt;
    return uid;&lt;br /&gt;
  end if;&lt;br /&gt;
&lt;br /&gt;
  -- Auto-provision on first use from claims&lt;br /&gt;
  insert into app.users (external_subject, display_name, email)&lt;br /&gt;
  values (&lt;br /&gt;
    sub_txt,&lt;br /&gt;
    coalesce(claims-&amp;gt;&amp;gt;'name', null),&lt;br /&gt;
    coalesce(claims-&amp;gt;&amp;gt;'email', null)&lt;br /&gt;
  )&lt;br /&gt;
  returning id into uid;&lt;br /&gt;
&lt;br /&gt;
  return uid;&lt;br /&gt;
end&lt;br /&gt;
$$;&lt;br /&gt;
&lt;br /&gt;
revoke all on function app.current_user_id() from public;&lt;br /&gt;
grant execute on function app.current_user_id() to web_anon;&lt;br /&gt;
&lt;br /&gt;
-- === Groups &amp;amp; membership ===&lt;br /&gt;
create table if not exists app.groups (&lt;br /&gt;
  id          uuid primary key default gen_random_uuid(),&lt;br /&gt;
  name        text not null unique,&lt;br /&gt;
  created_by  uuid not null references app.users(id) on delete restrict,&lt;br /&gt;
  created_at  timestamptz not null default now()&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
create table if not exists app.group_members (&lt;br /&gt;
  group_id  uuid not null references app.groups(id) on delete cascade,&lt;br /&gt;
  user_id   uuid not null references app.users(id) on delete cascade,&lt;br /&gt;
  role      text not null check (role in ('owner','manager','member')),&lt;br /&gt;
  added_at  timestamptz not null default now(),&lt;br /&gt;
  primary key (group_id, user_id)&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
create index if not exists idx_group_members_user on app.group_members(user_id);&lt;br /&gt;
create index if not exists idx_group_members_group on app.group_members(group_id);&lt;br /&gt;
&lt;br /&gt;
-- Automatically add the creator as owner of any newly created group&lt;br /&gt;
create or replace function app.add_creator_as_owner()&lt;br /&gt;
returns trigger language plpgsql as $$&lt;br /&gt;
begin&lt;br /&gt;
  insert into app.group_members(group_id, user_id, role)&lt;br /&gt;
  values (new.id, new.created_by, 'owner')&lt;br /&gt;
  on conflict do nothing;&lt;br /&gt;
  return new;&lt;br /&gt;
end$$;&lt;br /&gt;
&lt;br /&gt;
drop trigger if exists trg_groups_owner on app.groups;&lt;br /&gt;
create trigger trg_groups_owner&lt;br /&gt;
after insert on app.groups&lt;br /&gt;
for each row execute function app.add_creator_as_owner();&lt;br /&gt;
&lt;br /&gt;
-- === Todos &amp;amp; shares ===&lt;br /&gt;
create table if not exists todo.todos (&lt;br /&gt;
  id         uuid primary key default gen_random_uuid(),&lt;br /&gt;
  owner_id   uuid not null references app.users(id) on delete restrict,&lt;br /&gt;
  title      text not null,&lt;br /&gt;
  body       text,&lt;br /&gt;
  is_done    boolean not null default false,&lt;br /&gt;
  created_at timestamptz not null default now(),&lt;br /&gt;
  updated_at timestamptz not null default now()&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
create index if not exists idx_todos_owner on todo.todos(owner_id);&lt;br /&gt;
&lt;br /&gt;
create table if not exists todo.todo_group_shares (&lt;br /&gt;
  todo_id   uuid not null references todo.todos(id) on delete cascade,&lt;br /&gt;
  group_id  uuid not null references app.groups(id) on delete cascade,&lt;br /&gt;
  can_read  boolean not null default true,&lt;br /&gt;
  can_write boolean not null default false,&lt;br /&gt;
  can_del   boolean not null default false,&lt;br /&gt;
  primary key (todo_id, group_id)&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
create index if not exists idx_todo_shares_todo on todo.todo_group_shares(todo_id);&lt;br /&gt;
create index if not exists idx_todo_shares_group on todo.todo_group_shares(group_id);&lt;br /&gt;
&lt;br /&gt;
-- Maintain updated_at on todos&lt;br /&gt;
create or replace function todo.bump_updated_at()&lt;br /&gt;
returns trigger language plpgsql as $$&lt;br /&gt;
begin&lt;br /&gt;
  new.updated_at := now();&lt;br /&gt;
  return new;&lt;br /&gt;
end$$;&lt;br /&gt;
&lt;br /&gt;
drop trigger if exists trg_todos_updated_at on todo.todos;&lt;br /&gt;
create trigger trg_todos_updated_at&lt;br /&gt;
before update on todo.todos&lt;br /&gt;
for each row execute function todo.bump_updated_at();&lt;br /&gt;
&lt;br /&gt;
-- === RLS enablement ===&lt;br /&gt;
alter table app.users                 enable row level security;&lt;br /&gt;
alter table app.groups                enable row level security;&lt;br /&gt;
alter table app.group_members         enable row level security;&lt;br /&gt;
alter table todo.todos                enable row level security;&lt;br /&gt;
alter table todo.todo_group_shares    enable row level security;&lt;br /&gt;
&lt;br /&gt;
-- app.users: each user can see/update themselves&lt;br /&gt;
create policy users_select_self on app.users&lt;br /&gt;
for select&lt;br /&gt;
to web_anon&lt;br /&gt;
using (id = app.current_user_id());&lt;br /&gt;
&lt;br /&gt;
create policy users_update_self on app.users&lt;br /&gt;
for update&lt;br /&gt;
to web_anon&lt;br /&gt;
using (id = app.current_user_id())&lt;br /&gt;
with check (id = app.current_user_id());&lt;br /&gt;
&lt;br /&gt;
-- app.groups&lt;br /&gt;
create policy groups_insert_any on app.groups&lt;br /&gt;
for insert&lt;br /&gt;
to web_anon&lt;br /&gt;
with check (created_by = app.current_user_id());&lt;br /&gt;
&lt;br /&gt;
create policy groups_select_member on app.groups&lt;br /&gt;
for select&lt;br /&gt;
to web_anon&lt;br /&gt;
using (&lt;br /&gt;
  exists (&lt;br /&gt;
    select 1&lt;br /&gt;
    from app.group_members gm&lt;br /&gt;
    where gm.group_id = app.groups.id&lt;br /&gt;
      and gm.user_id = app.current_user_id()&lt;br /&gt;
  )&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
create policy groups_update_owner on app.groups&lt;br /&gt;
for update&lt;br /&gt;
to web_anon&lt;br /&gt;
using (&lt;br /&gt;
  exists (&lt;br /&gt;
    select 1 from app.group_members gm&lt;br /&gt;
    where gm.group_id = app.groups.id&lt;br /&gt;
      and gm.user_id = app.current_user_id()&lt;br /&gt;
      and gm.role = 'owner'&lt;br /&gt;
  )&lt;br /&gt;
)&lt;br /&gt;
with check (&lt;br /&gt;
  exists (&lt;br /&gt;
    select 1 from app.group_members gm&lt;br /&gt;
    where gm.group_id = app.groups.id&lt;br /&gt;
      and gm.user_id = app.current_user_id()&lt;br /&gt;
      and gm.role = 'owner'&lt;br /&gt;
  )&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
create policy groups_delete_owner on app.groups&lt;br /&gt;
for delete&lt;br /&gt;
to web_anon&lt;br /&gt;
using (&lt;br /&gt;
  exists (&lt;br /&gt;
    select 1 from app.group_members gm&lt;br /&gt;
    where gm.group_id = app.groups.id&lt;br /&gt;
      and gm.user_id = app.current_user_id()&lt;br /&gt;
      and gm.role = 'owner'&lt;br /&gt;
  )&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
-- app.group_members&lt;br /&gt;
create policy gm_select_member on app.group_members&lt;br /&gt;
for select&lt;br /&gt;
to web_anon&lt;br /&gt;
using (&lt;br /&gt;
  exists (&lt;br /&gt;
    select 1 from app.group_members gm2&lt;br /&gt;
    where gm2.group_id = group_members.group_id&lt;br /&gt;
      and gm2.user_id = app.current_user_id()&lt;br /&gt;
  )&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
create policy gm_insert_mgr on app.group_members&lt;br /&gt;
for insert&lt;br /&gt;
to web_anon&lt;br /&gt;
with check (&lt;br /&gt;
  exists (&lt;br /&gt;
    select 1 from app.group_members gm2&lt;br /&gt;
    where gm2.group_id = group_members.group_id&lt;br /&gt;
      and gm2.user_id = app.current_user_id()&lt;br /&gt;
      and gm2.role in ('owner','manager')&lt;br /&gt;
  )&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
create policy gm_update_mgr on app.group_members&lt;br /&gt;
for update&lt;br /&gt;
to web_anon&lt;br /&gt;
using (&lt;br /&gt;
  exists (&lt;br /&gt;
    select 1 from app.group_members gm2&lt;br /&gt;
    where gm2.group_id = group_members.group_id&lt;br /&gt;
      and gm2.user_id = app.current_user_id()&lt;br /&gt;
      and gm2.role in ('owner','manager')&lt;br /&gt;
  )&lt;br /&gt;
)&lt;br /&gt;
with check (&lt;br /&gt;
  exists (&lt;br /&gt;
    select 1 from app.group_members gm2&lt;br /&gt;
    where gm2.group_id = group_members.group_id&lt;br /&gt;
      and gm2.user_id = app.current_user_id()&lt;br /&gt;
      and gm2.role in ('owner','manager')&lt;br /&gt;
  )&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
create policy gm_delete_mgr_or_self on app.group_members&lt;br /&gt;
for delete&lt;br /&gt;
to web_anon&lt;br /&gt;
using (&lt;br /&gt;
  (user_id = app.current_user_id()) or&lt;br /&gt;
  exists (&lt;br /&gt;
    select 1 from app.group_members gm2&lt;br /&gt;
    where gm2.group_id = group_members.group_id&lt;br /&gt;
      and gm2.user_id = app.current_user_id()&lt;br /&gt;
      and gm2.role in ('owner','manager')&lt;br /&gt;
  )&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
-- todo.todos&lt;br /&gt;
create policy todos_select_owner_or_shared on todo.todos&lt;br /&gt;
for select&lt;br /&gt;
to web_anon&lt;br /&gt;
using (&lt;br /&gt;
  owner_id = app.current_user_id()&lt;br /&gt;
  or exists (&lt;br /&gt;
    select 1&lt;br /&gt;
    from todo.todo_group_shares s&lt;br /&gt;
    join app.group_members gm&lt;br /&gt;
      on gm.group_id = s.group_id&lt;br /&gt;
     and gm.user_id  = app.current_user_id()&lt;br /&gt;
    where s.todo_id = todos.id&lt;br /&gt;
      and s.can_read&lt;br /&gt;
  )&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
create policy todos_insert_self on todo.todos&lt;br /&gt;
for insert&lt;br /&gt;
to web_anon&lt;br /&gt;
with check (owner_id = app.current_user_id());&lt;br /&gt;
&lt;br /&gt;
create policy todos_update_owner_or_shared on todo.todos&lt;br /&gt;
for update&lt;br /&gt;
to web_anon&lt;br /&gt;
using (&lt;br /&gt;
  owner_id = app.current_user_id()&lt;br /&gt;
  or exists (&lt;br /&gt;
    select 1&lt;br /&gt;
    from todo.todo_group_shares s&lt;br /&gt;
    join app.group_members gm&lt;br /&gt;
      on gm.group_id = s.group_id&lt;br /&gt;
     and gm.user_id  = app.current_user_id()&lt;br /&gt;
    where s.todo_id = todos.id&lt;br /&gt;
      and s.can_write&lt;br /&gt;
  )&lt;br /&gt;
)&lt;br /&gt;
with check (&lt;br /&gt;
  owner_id = app.current_user_id()&lt;br /&gt;
  or exists (&lt;br /&gt;
    select 1&lt;br /&gt;
    from todo.todo_group_shares s&lt;br /&gt;
    join app.group_members gm&lt;br /&gt;
      on gm.group_id = s.group_id&lt;br /&gt;
     and gm.user_id  = app.current_user_id()&lt;br /&gt;
    where s.todo_id = todos.id&lt;br /&gt;
      and s.can_write&lt;br /&gt;
  )&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
create policy todos_delete_owner_or_shared on todo.todos&lt;br /&gt;
for delete&lt;br /&gt;
to web_anon&lt;br /&gt;
using (&lt;br /&gt;
  owner_id = app.current_user_id()&lt;br /&gt;
  or exists (&lt;br /&gt;
    select 1&lt;br /&gt;
    from todo.todo_group_shares s&lt;br /&gt;
    join app.group_members gm&lt;br /&gt;
      on gm.group_id = s.group_id&lt;br /&gt;
     and gm.user_id  = app.current_user_id()&lt;br /&gt;
    where s.todo_id = todos.id&lt;br /&gt;
      and s.can_del&lt;br /&gt;
  )&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
-- todo.todo_group_shares&lt;br /&gt;
create policy shares_select_visible on todo.todo_group_shares&lt;br /&gt;
for select&lt;br /&gt;
to web_anon&lt;br /&gt;
using (&lt;br /&gt;
  exists (select 1 from todo.todos t where t.id = todo_group_shares.todo_id and t.owner_id = app.current_user_id())&lt;br /&gt;
  or exists (select 1 from app.group_members gm where gm.group_id = todo_group_shares.group_id and gm.user_id = app.current_user_id())&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
create policy shares_insert_todo_owner on todo.todo_group_shares&lt;br /&gt;
for insert&lt;br /&gt;
to web_anon&lt;br /&gt;
with check (&lt;br /&gt;
  exists (select 1 from todo.todos t where t.id = todo_group_shares.todo_id and t.owner_id = app.current_user_id())&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
create policy shares_update_todo_owner on todo.todo_group_shares&lt;br /&gt;
for update&lt;br /&gt;
to web_anon&lt;br /&gt;
using (&lt;br /&gt;
  exists (select 1 from todo.todos t where t.id = todo_group_shares.todo_id and t.owner_id = app.current_user_id())&lt;br /&gt;
)&lt;br /&gt;
with check (&lt;br /&gt;
  exists (select 1 from todo.todos t where t.id = todo_group_shares.todo_id and t.owner_id = app.current_user_id())&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
create policy shares_delete_todo_owner on todo.todo_group_shares&lt;br /&gt;
for delete&lt;br /&gt;
to web_anon&lt;br /&gt;
using (&lt;br /&gt;
  exists (select 1 from todo.todos t where t.id = todo_group_shares.todo_id and t.owner_id = app.current_user_id())&lt;br /&gt;
);&lt;br /&gt;
&lt;br /&gt;
-- Base DML privileges (RLS gates real access)&lt;br /&gt;
grant select, insert, update, delete on app.users                  to web_anon;&lt;br /&gt;
grant select, insert, update, delete on app.groups                 to web_anon;&lt;br /&gt;
grant select, insert, update, delete on app.group_members          to web_anon;&lt;br /&gt;
grant select, insert, update, delete on todo.todos                 to web_anon;&lt;br /&gt;
grant select, insert, update, delete on todo.todo_group_shares     to web_anon;&lt;br /&gt;
```&lt;br /&gt;
&lt;br /&gt;
---&lt;br /&gt;
&lt;br /&gt;
# migrations/001\_init.down.sql&lt;br /&gt;
&lt;br /&gt;
```sql&lt;br /&gt;
-- +goose Down&lt;br /&gt;
-- Drop in reverse dependency order&lt;br /&gt;
&lt;br /&gt;
-- Policies automatically dropped with tables; explicit drops are harmless if they existed.&lt;br /&gt;
drop trigger if exists trg_todos_updated_at on todo.todos;&lt;br /&gt;
drop function if exists todo.bump_updated_at();&lt;br /&gt;
&lt;br /&gt;
drop trigger if exists trg_groups_owner on app.groups;&lt;br /&gt;
drop function if exists app.add_creator_as_owner();&lt;br /&gt;
&lt;br /&gt;
-- Tables&lt;br /&gt;
drop table if exists todo.todo_group_shares cascade;&lt;br /&gt;
drop table if exists todo.todos cascade;&lt;br /&gt;
&lt;br /&gt;
drop table if exists app.group_members cascade;&lt;br /&gt;
drop table if exists app.groups cascade;&lt;br /&gt;
&lt;br /&gt;
-- Functions&lt;br /&gt;
drop function if exists app.current_user_id();&lt;br /&gt;
&lt;br /&gt;
-- Tables&lt;br /&gt;
drop table if exists app.users cascade;&lt;br /&gt;
&lt;br /&gt;
-- Schemas&lt;br /&gt;
drop schema if exists todo cascade;&lt;br /&gt;
drop schema if exists app cascade;&lt;br /&gt;
&lt;br /&gt;
-- (Optionally) keep roles; comment these if managed outside migrations&lt;br /&gt;
do $$&lt;br /&gt;
begin&lt;br /&gt;
  if exists (select 1 from pg_roles where rolname = 'web_anon') then&lt;br /&gt;
    revoke all on schema public from web_anon;&lt;br /&gt;
  end if;&lt;br /&gt;
end$$;&lt;br /&gt;
&lt;br /&gt;
-- Not dropping roles by default to avoid breaking external config (e.g., PostgREST).&lt;br /&gt;
-- Uncomment if desired:&lt;br /&gt;
-- drop role if exists web_anon;&lt;br /&gt;
-- drop role if exists api_owner;&lt;br /&gt;
```&lt;br /&gt;
&lt;br /&gt;
---&lt;br /&gt;
&lt;br /&gt;
# test\_keycloak\_postgrest\_rls.py&lt;br /&gt;
&lt;br /&gt;
End-to-end validator that:&lt;br /&gt;
&lt;br /&gt;
* logs in as two users (Alice &amp;amp; Bob) via Keycloak,&lt;br /&gt;
* auto-provisions `app.users` via `/rpc/current_user_id`,&lt;br /&gt;
* creates a group (auto-adds creator as owner),&lt;br /&gt;
* adds Bob as member,&lt;br /&gt;
* creates a todo,&lt;br /&gt;
* verifies read/write/delete controls via `todo.todo_group_shares`.&lt;br /&gt;
&lt;br /&gt;
```python&lt;br /&gt;
#!/usr/bin/env python3&lt;br /&gt;
&amp;quot;&amp;quot;&amp;quot;&lt;br /&gt;
E2E RLS test:&lt;br /&gt;
- Requires a PostgREST API exposing schemas &amp;quot;app&amp;quot; and &amp;quot;todo&amp;quot; and using web_anon role.&lt;br /&gt;
- Keycloak must have users whose credentials you supply.&lt;br /&gt;
- Works with public client (no secret) or confidential client (with secret).&lt;br /&gt;
&lt;br /&gt;
ENV VARS:&lt;br /&gt;
  API_BASE=&amp;quot;https://uapp-api.example.com&amp;quot;     # PostgREST base (no trailing slash)&lt;br /&gt;
  KC_BASE=&amp;quot;https://auth.example.com&amp;quot;          # Keycloak base (no trailing slash)&lt;br /&gt;
  KC_REALM=&amp;quot;myrealm&amp;quot;&lt;br /&gt;
  KC_CLIENT_ID=&amp;quot;myclient&amp;quot;&lt;br /&gt;
  KC_CLIENT_SECRET=&amp;quot;&amp;quot;                          # optional; leave empty for public client&lt;br /&gt;
  USER1=&amp;quot;alice@example.com&amp;quot;&lt;br /&gt;
  PASS1=&amp;quot;alice_password&amp;quot;&lt;br /&gt;
  USER2=&amp;quot;bob@example.com&amp;quot;&lt;br /&gt;
  PASS2=&amp;quot;bob_password&amp;quot;&lt;br /&gt;
  VERIFY_SSL=&amp;quot;true&amp;quot;                            # &amp;quot;false&amp;quot; to skip TLS verify (dev only)&lt;br /&gt;
&lt;br /&gt;
Run:&lt;br /&gt;
  python3 test_keycloak_postgrest_rls.py&lt;br /&gt;
&amp;quot;&amp;quot;&amp;quot;&lt;br /&gt;
&lt;br /&gt;
import os, sys, time, json, base64&lt;br /&gt;
import requests&lt;br /&gt;
from typing import Dict, Any&lt;br /&gt;
&lt;br /&gt;
API_BASE      = os.environ.get(&amp;quot;API_BASE&amp;quot;, &amp;quot;http://localhost:3000&amp;quot;).rstrip(&amp;quot;/&amp;quot;)&lt;br /&gt;
KC_BASE       = os.environ.get(&amp;quot;KC_BASE&amp;quot;, &amp;quot;http://localhost:8080&amp;quot;).rstrip(&amp;quot;/&amp;quot;)&lt;br /&gt;
KC_REALM      = os.environ.get(&amp;quot;KC_REALM&amp;quot;, &amp;quot;example&amp;quot;)&lt;br /&gt;
KC_CLIENT_ID  = os.environ.get(&amp;quot;KC_CLIENT_ID&amp;quot;, &amp;quot;postgrest-client&amp;quot;)&lt;br /&gt;
KC_CLIENT_SECRET = os.environ.get(&amp;quot;KC_CLIENT_SECRET&amp;quot;, &amp;quot;&amp;quot;)&lt;br /&gt;
USER1         = os.environ.get(&amp;quot;USER1&amp;quot;, &amp;quot;alice&amp;quot;)&lt;br /&gt;
PASS1         = os.environ.get(&amp;quot;PASS1&amp;quot;, &amp;quot;alice&amp;quot;)&lt;br /&gt;
USER2         = os.environ.get(&amp;quot;USER2&amp;quot;, &amp;quot;bob&amp;quot;)&lt;br /&gt;
PASS2         = os.environ.get(&amp;quot;PASS2&amp;quot;, &amp;quot;bob&amp;quot;)&lt;br /&gt;
VERIFY_SSL    = os.environ.get(&amp;quot;VERIFY_SSL&amp;quot;, &amp;quot;true&amp;quot;).lower() != &amp;quot;false&amp;quot;&lt;br /&gt;
&lt;br /&gt;
SESSION = requests.Session()&lt;br /&gt;
&lt;br /&gt;
def kc_token(username: str, password: str) -&amp;gt; Dict[str, Any]:&lt;br /&gt;
    url = f&amp;quot;{KC_BASE}/realms/{KC_REALM}/protocol/openid-connect/token&amp;quot;&lt;br /&gt;
    data = {&lt;br /&gt;
        &amp;quot;grant_type&amp;quot;: &amp;quot;password&amp;quot;,&lt;br /&gt;
        &amp;quot;client_id&amp;quot;: KC_CLIENT_ID,&lt;br /&gt;
        &amp;quot;username&amp;quot;: username,&lt;br /&gt;
        &amp;quot;password&amp;quot;: password,&lt;br /&gt;
    }&lt;br /&gt;
    if KC_CLIENT_SECRET:&lt;br /&gt;
        data[&amp;quot;client_secret&amp;quot;] = KC_CLIENT_SECRET&lt;br /&gt;
    r = SESSION.post(url, data=data, verify=VERIFY_SSL)&lt;br /&gt;
    if r.status_code != 200:&lt;br /&gt;
        raise RuntimeError(f&amp;quot;Keycloak token error {r.status_code}: {r.text}&amp;quot;)&lt;br /&gt;
    return r.json()&lt;br /&gt;
&lt;br /&gt;
def bearer(token: str) -&amp;gt; Dict[str,str]:&lt;br /&gt;
    return {&amp;quot;Authorization&amp;quot;: f&amp;quot;Bearer {token}&amp;quot;}&lt;br /&gt;
&lt;br /&gt;
def postgrest_call(method: str, path: str, token: str, **kwargs) -&amp;gt; requests.Response:&lt;br /&gt;
    url = f&amp;quot;{API_BASE}{path}&amp;quot;&lt;br /&gt;
    headers = kwargs.pop(&amp;quot;headers&amp;quot;, {})&lt;br /&gt;
    headers.update(bearer(token))&lt;br /&gt;
    # Let PostgREST return exact counts on list calls&lt;br /&gt;
    headers.setdefault(&amp;quot;Prefer&amp;quot;, &amp;quot;count=exact&amp;quot;)&lt;br /&gt;
    if &amp;quot;json&amp;quot; in kwargs and method.upper() in (&amp;quot;POST&amp;quot;,&amp;quot;PATCH&amp;quot;):&lt;br /&gt;
        headers.setdefault(&amp;quot;Content-Type&amp;quot;, &amp;quot;application/json&amp;quot;)&lt;br /&gt;
    return SESSION.request(method=method, url=url, headers=headers, verify=VERIFY_SSL, **kwargs)&lt;br /&gt;
&lt;br /&gt;
def jwt_sub(access_token: str) -&amp;gt; str:&lt;br /&gt;
    # Decode JWT payload (no verify) for logging only&lt;br /&gt;
    try:&lt;br /&gt;
        payload_b64 = access_token.split(&amp;quot;.&amp;quot;)[1]&lt;br /&gt;
        # pad&lt;br /&gt;
        payload_b64 += &amp;quot;=&amp;quot; * ((4 - len(payload_b64) % 4) % 4)&lt;br /&gt;
        payload = json.loads(base64.urlsafe_b64decode(payload_b64))&lt;br /&gt;
        return payload.get(&amp;quot;sub&amp;quot;, &amp;quot;&amp;quot;)&lt;br /&gt;
    except Exception:&lt;br /&gt;
        return &amp;quot;&amp;quot;&lt;br /&gt;
&lt;br /&gt;
def rpc_current_user_id(token: str) -&amp;gt; str:&lt;br /&gt;
    r = postgrest_call(&amp;quot;POST&amp;quot;, &amp;quot;/rpc/current_user_id&amp;quot;, token, json={})&lt;br /&gt;
    if r.status_code not in (200, 201):&lt;br /&gt;
        raise RuntimeError(f&amp;quot;/rpc/current_user_id failed: {r.status_code} {r.text}&amp;quot;)&lt;br /&gt;
    # PostgREST returns either raw UUID or {&amp;quot;current_user_id&amp;quot;: &amp;quot;...&amp;quot;} depending on cfg.&lt;br /&gt;
    try:&lt;br /&gt;
        j = r.json()&lt;br /&gt;
        if isinstance(j, dict) and &amp;quot;current_user_id&amp;quot; in j:&lt;br /&gt;
            return j[&amp;quot;current_user_id&amp;quot;]&lt;br /&gt;
        if isinstance(j, list) and j and &amp;quot;current_user_id&amp;quot; in j[0]:&lt;br /&gt;
            return j[0][&amp;quot;current_user_id&amp;quot;]&lt;br /&gt;
        if isinstance(j, str):&lt;br /&gt;
            return j&lt;br /&gt;
    except Exception:&lt;br /&gt;
        pass&lt;br /&gt;
    # text body?&lt;br /&gt;
    return r.text.strip().strip('&amp;quot;')&lt;br /&gt;
&lt;br /&gt;
def assert_ok(resp: requests.Response, msg=&amp;quot;expected 2xx&amp;quot;):&lt;br /&gt;
    if not (200 &amp;lt;= resp.status_code &amp;lt; 300):&lt;br /&gt;
        raise AssertionError(f&amp;quot;{msg}: got {resp.status_code} {resp.text}&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
def main():&lt;br /&gt;
    print(f&amp;quot;API_BASE={API_BASE}&amp;quot;)&lt;br /&gt;
    print(f&amp;quot;KC_BASE={KC_BASE} realm={KC_REALM} client_id={KC_CLIENT_ID} secret={'set' if bool(KC_CLIENT_SECRET) else 'unset'}&amp;quot;)&lt;br /&gt;
    print(f&amp;quot;VERIFY_SSL={VERIFY_SSL}&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
    # 1) Login both users&lt;br /&gt;
    t1 = kc_token(USER1, PASS1)&lt;br /&gt;
    t2 = kc_token(USER2, PASS2)&lt;br /&gt;
    a1, a2 = t1[&amp;quot;access_token&amp;quot;], t2[&amp;quot;access_token&amp;quot;]&lt;br /&gt;
    print(f&amp;quot;Alice sub: {jwt_sub(a1)}&amp;quot;)&lt;br /&gt;
    print(f&amp;quot;Bob   sub: {jwt_sub(a2)}&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
    # 2) Auto-provision/resolve current user IDs&lt;br /&gt;
    u1 = rpc_current_user_id(a1)&lt;br /&gt;
    u2 = rpc_current_user_id(a2)&lt;br /&gt;
    print(f&amp;quot;Alice user_id: {u1}&amp;quot;)&lt;br /&gt;
    print(f&amp;quot;Bob   user_id: {u2}&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
    # 3) Alice creates a group (trigger makes her owner automatically)&lt;br /&gt;
    group_payload = {&amp;quot;name&amp;quot;: f&amp;quot;testgrp-{int(time.time())}&amp;quot;, &amp;quot;created_by&amp;quot;: u1}&lt;br /&gt;
    r = postgrest_call(&amp;quot;POST&amp;quot;, &amp;quot;/app.groups&amp;quot;, a1, json=group_payload)&lt;br /&gt;
    assert_ok(r, &amp;quot;create group&amp;quot;)&lt;br /&gt;
    group = r.json()[0]&lt;br /&gt;
    group_id = group[&amp;quot;id&amp;quot;]&lt;br /&gt;
    print(f&amp;quot;Created group {group_id}&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
    # 4) Alice adds Bob as member (allowed because Alice is owner via trigger)&lt;br /&gt;
    r = postgrest_call(&amp;quot;POST&amp;quot;, &amp;quot;/app.group_members&amp;quot;, a1, json={&amp;quot;group_id&amp;quot;: group_id, &amp;quot;user_id&amp;quot;: u2, &amp;quot;role&amp;quot;: &amp;quot;member&amp;quot;})&lt;br /&gt;
    assert_ok(r, &amp;quot;add bob to group&amp;quot;)&lt;br /&gt;
    print(&amp;quot;Bob added to group as member&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
    # 5) Alice creates a todo&lt;br /&gt;
    todo_payload = {&amp;quot;owner_id&amp;quot;: u1, &amp;quot;title&amp;quot;: &amp;quot;Secret plan&amp;quot;, &amp;quot;body&amp;quot;: &amp;quot;Do not share yet&amp;quot;}&lt;br /&gt;
    r = postgrest_call(&amp;quot;POST&amp;quot;, &amp;quot;/todo.todos&amp;quot;, a1, json=todo_payload)&lt;br /&gt;
    assert_ok(r, &amp;quot;create todo&amp;quot;)&lt;br /&gt;
    todo = r.json()[0]&lt;br /&gt;
    todo_id = todo[&amp;quot;id&amp;quot;]&lt;br /&gt;
    print(f&amp;quot;Alice created todo {todo_id}&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
    # 6) Bob cannot see it yet&lt;br /&gt;
    r = postgrest_call(&amp;quot;GET&amp;quot;, f&amp;quot;/todo.todos?id=eq.{todo_id}&amp;quot;, a2)&lt;br /&gt;
    assert_ok(r, &amp;quot;bob read check&amp;quot;)&lt;br /&gt;
    assert r.json() == [], &amp;quot;Bob should not see Alice's todo before share&amp;quot;&lt;br /&gt;
    print(&amp;quot;Verified: Bob cannot read unshared todo&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
    # 7) Alice shares read only with the group&lt;br /&gt;
    share_payload = {&amp;quot;todo_id&amp;quot;: todo_id, &amp;quot;group_id&amp;quot;: group_id, &amp;quot;can_read&amp;quot;: True, &amp;quot;can_write&amp;quot;: False, &amp;quot;can_del&amp;quot;: False}&lt;br /&gt;
    r = postgrest_call(&amp;quot;POST&amp;quot;, &amp;quot;/todo.todo_group_shares&amp;quot;, a1, json=share_payload)&lt;br /&gt;
    assert_ok(r, &amp;quot;share todo read-only&amp;quot;)&lt;br /&gt;
    print(&amp;quot;Todo shared (read-only) with group&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
    # 8) Bob can now read, but not update&lt;br /&gt;
    r = postgrest_call(&amp;quot;GET&amp;quot;, f&amp;quot;/todo.todos?id=eq.{todo_id}&amp;quot;, a2)&lt;br /&gt;
    assert_ok(r, &amp;quot;bob read shared todo&amp;quot;)&lt;br /&gt;
    assert len(r.json()) == 1, &amp;quot;Bob should now see the shared todo&amp;quot;&lt;br /&gt;
    print(&amp;quot;Verified: Bob can read shared todo&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
    r = postgrest_call(&amp;quot;PATCH&amp;quot;, f&amp;quot;/todo.todos?id=eq.{todo_id}&amp;quot;, a2, json={&amp;quot;title&amp;quot;: &amp;quot;Bob tries edit&amp;quot;})&lt;br /&gt;
    if 200 &amp;lt;= r.status_code &amp;lt; 300:&lt;br /&gt;
        raise AssertionError(&amp;quot;Bob should NOT be able to update with read-only share&amp;quot;)&lt;br /&gt;
    print(&amp;quot;Verified: Bob cannot update with read-only share&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
    # 9) Alice upgrades to can_write&lt;br /&gt;
    r = postgrest_call(&amp;quot;PATCH&amp;quot;, f&amp;quot;/todo.todo_group_shares?todo_id=eq.{todo_id}&amp;amp;group_id=eq.{group_id}&amp;quot;, a1, json={&amp;quot;can_write&amp;quot;: True})&lt;br /&gt;
    assert_ok(r, &amp;quot;upgrade share to write&amp;quot;)&lt;br /&gt;
    print(&amp;quot;Share upgraded: can_write=True&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
    # Bob updates successfully now&lt;br /&gt;
    r = postgrest_call(&amp;quot;PATCH&amp;quot;, f&amp;quot;/todo.todos?id=eq.{todo_id}&amp;quot;, a2, json={&amp;quot;title&amp;quot;: &amp;quot;Bob edited title&amp;quot;})&lt;br /&gt;
    assert_ok(r, &amp;quot;bob updates after write share&amp;quot;)&lt;br /&gt;
    print(&amp;quot;Verified: Bob can update after write permission&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
    # 10) Delete tests: Bob still cannot delete&lt;br /&gt;
    r = postgrest_call(&amp;quot;DELETE&amp;quot;, f&amp;quot;/todo.todos?id=eq.{todo_id}&amp;quot;, a2)&lt;br /&gt;
    if 200 &amp;lt;= r.status_code &amp;lt; 300:&lt;br /&gt;
        raise AssertionError(&amp;quot;Bob should NOT be able to delete without can_del&amp;quot;)&lt;br /&gt;
    print(&amp;quot;Verified: Bob cannot delete without delete permission&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
    # Alice grants delete&lt;br /&gt;
    r = postgrest_call(&amp;quot;PATCH&amp;quot;, f&amp;quot;/todo.todo_group_shares?todo_id=eq.{todo_id}&amp;amp;group_id=eq.{group_id}&amp;quot;, a1, json={&amp;quot;can_del&amp;quot;: True})&lt;br /&gt;
    assert_ok(r, &amp;quot;upgrade share to delete&amp;quot;)&lt;br /&gt;
    print(&amp;quot;Share upgraded: can_del=True&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
    # Bob deletes it now&lt;br /&gt;
    r = postgrest_call(&amp;quot;DELETE&amp;quot;, f&amp;quot;/todo.todos?id=eq.{todo_id}&amp;quot;, a2)&lt;br /&gt;
    assert_ok(r, &amp;quot;bob deletes after delete share&amp;quot;)&lt;br /&gt;
    print(&amp;quot;Verified: Bob can delete with delete permission&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
    print(&amp;quot;\n✅ All RLS tests passed.&amp;quot;)&lt;br /&gt;
&lt;br /&gt;
if __name__ == &amp;quot;__main__&amp;quot;:&lt;br /&gt;
    try:&lt;br /&gt;
        main()&lt;br /&gt;
    except Exception as e:&lt;br /&gt;
        print(f&amp;quot;\n❌ Test failed: {e}&amp;quot;, file=sys.stderr)&lt;br /&gt;
        sys.exit(1)&lt;br /&gt;
```&lt;br /&gt;
&lt;br /&gt;
---&lt;br /&gt;
&lt;br /&gt;
## Notes / wiring tips&lt;br /&gt;
&lt;br /&gt;
* **PostgREST**&lt;br /&gt;
  Ensure your PostgREST config includes:&lt;br /&gt;
&lt;br /&gt;
  * `db-anon-role = &amp;quot;web_anon&amp;quot;`&lt;br /&gt;
  * `db-schemas = &amp;quot;app,todo&amp;quot;` (and `public` if you want to expose `rpc/current_user_id` without schema prefix)&lt;br /&gt;
  * If your function is under `app`, call `/rpc/current_user_id` only if it’s in a listed schema or exposed via `db-extra-search-path`.&lt;br /&gt;
&lt;br /&gt;
* **Keycloak**&lt;br /&gt;
&lt;br /&gt;
  * The script uses **Resource Owner Password Credentials** for simplicity. On modern KC, make sure your client allows Direct Access Grants and your users are enabled.&lt;br /&gt;
  * If you prefer **authorization code + PKCE**, you can swap the token function, but ROPC is easiest for headless tests.&lt;br /&gt;
&lt;br /&gt;
* **Auto-provision**&lt;br /&gt;
  `/rpc/current_user_id` both resolves and creates the `app.users` row for the current JWT `sub`—this bootstraps group ownership and todo ownership without any admin step.&lt;br /&gt;
&lt;br /&gt;
* **Idempotency**&lt;br /&gt;
  The test uses unique names via timestamp; you can re-run safely.&lt;br /&gt;
&lt;br /&gt;
---&lt;br /&gt;
&lt;br /&gt;
You could split into multiple migrations (e.g., `001_schemas.sql`, `002_rls.sql`, `003_triggers.sql`), say the word and I’ll fan it out cleanly.&lt;/div&gt;</summary>
		<author><name>Busk</name></author>
	</entry>
</feed>