RLS example 4

From UVOO Tech Wiki
Jump to navigation Jump to search

Go-Migrate SQL files (from scratch) and a Python test script that logs into Keycloak, grabs JWTs, and exercises CRUD + RLS paths to prove the policy works.


migrations/001_init.up.sql

-- +goose Up
-- === Base setup: extensions, schemas, roles ===
create extension if not exists pgcrypto;  -- for gen_random_uuid()

create schema if not exists app;
create schema if not exists todo;

-- Roles model (owner of DB objects vs runtime role for PostgREST)
do $$
begin
  if not exists (select 1 from pg_roles where rolname = 'api_owner') then
    create role api_owner login;
  end if;
  if not exists (select 1 from pg_roles where rolname = 'web_anon') then
    create role web_anon nologin;
  end if;
end$$;

grant usage on schema app, todo to web_anon;

-- === Identity mapping: internal UUID ↔ external JWT sub ===
create table if not exists app.users (
  id                uuid primary key default gen_random_uuid(),
  external_subject  text not null unique, -- JWT sub (opaque; not guaranteed to be a UUID)
  display_name      text,
  email             text,
  is_active         boolean not null default true,
  created_at        timestamptz not null default now()
);

create or replace function app.current_user_id()
returns uuid
stable
security definer
set search_path = public, pg_temp, app
language plpgsql
as $$
declare
  claims   jsonb;
  sub_txt  text;
  uid      uuid;
begin
  claims := nullif(current_setting('request.jwt.claims', true), '')::jsonb;
  if claims is null then
    return null;
  end if;

  sub_txt := claims->>'sub';
  if sub_txt is null then
    return null;
  end if;

  -- Lookup existing active user
  select u.id into uid
  from app.users u
  where u.external_subject = sub_txt
    and u.is_active;

  if uid is not null then
    return uid;
  end if;

  -- Auto-provision on first use from claims
  insert into app.users (external_subject, display_name, email)
  values (
    sub_txt,
    coalesce(claims->>'name', null),
    coalesce(claims->>'email', null)
  )
  returning id into uid;

  return uid;
end
$$;

revoke all on function app.current_user_id() from public;
grant execute on function app.current_user_id() to web_anon;

-- === Groups & membership ===
create table if not exists app.groups (
  id          uuid primary key default gen_random_uuid(),
  name        text not null unique,
  created_by  uuid not null references app.users(id) on delete restrict,
  created_at  timestamptz not null default now()
);

create table if not exists app.group_members (
  group_id  uuid not null references app.groups(id) on delete cascade,
  user_id   uuid not null references app.users(id) on delete cascade,
  role      text not null check (role in ('owner','manager','member')),
  added_at  timestamptz not null default now(),
  primary key (group_id, user_id)
);

create index if not exists idx_group_members_user on app.group_members(user_id);
create index if not exists idx_group_members_group on app.group_members(group_id);

-- Automatically add the creator as owner of any newly created group
create or replace function app.add_creator_as_owner()
returns trigger language plpgsql as $$
begin
  insert into app.group_members(group_id, user_id, role)
  values (new.id, new.created_by, 'owner')
  on conflict do nothing;
  return new;
end$$;

drop trigger if exists trg_groups_owner on app.groups;
create trigger trg_groups_owner
after insert on app.groups
for each row execute function app.add_creator_as_owner();

-- === Todos & shares ===
create table if not exists todo.todos (
  id         uuid primary key default gen_random_uuid(),
  owner_id   uuid not null references app.users(id) on delete restrict,
  title      text not null,
  body       text,
  is_done    boolean not null default false,
  created_at timestamptz not null default now(),
  updated_at timestamptz not null default now()
);

create index if not exists idx_todos_owner on todo.todos(owner_id);

create table if not exists todo.todo_group_shares (
  todo_id   uuid not null references todo.todos(id) on delete cascade,
  group_id  uuid not null references app.groups(id) on delete cascade,
  can_read  boolean not null default true,
  can_write boolean not null default false,
  can_del   boolean not null default false,
  primary key (todo_id, group_id)
);

create index if not exists idx_todo_shares_todo on todo.todo_group_shares(todo_id);
create index if not exists idx_todo_shares_group on todo.todo_group_shares(group_id);

-- Maintain updated_at on todos
create or replace function todo.bump_updated_at()
returns trigger language plpgsql as $$
begin
  new.updated_at := now();
  return new;
end$$;

drop trigger if exists trg_todos_updated_at on todo.todos;
create trigger trg_todos_updated_at
before update on todo.todos
for each row execute function todo.bump_updated_at();

-- === RLS enablement ===
alter table app.users                 enable row level security;
alter table app.groups                enable row level security;
alter table app.group_members         enable row level security;
alter table todo.todos                enable row level security;
alter table todo.todo_group_shares    enable row level security;

-- app.users: each user can see/update themselves
create policy users_select_self on app.users
for select
to web_anon
using (id = app.current_user_id());

create policy users_update_self on app.users
for update
to web_anon
using (id = app.current_user_id())
with check (id = app.current_user_id());

-- app.groups
create policy groups_insert_any on app.groups
for insert
to web_anon
with check (created_by = app.current_user_id());

create policy groups_select_member on app.groups
for select
to web_anon
using (
  exists (
    select 1
    from app.group_members gm
    where gm.group_id = app.groups.id
      and gm.user_id = app.current_user_id()
  )
);

create policy groups_update_owner on app.groups
for update
to web_anon
using (
  exists (
    select 1 from app.group_members gm
    where gm.group_id = app.groups.id
      and gm.user_id = app.current_user_id()
      and gm.role = 'owner'
  )
)
with check (
  exists (
    select 1 from app.group_members gm
    where gm.group_id = app.groups.id
      and gm.user_id = app.current_user_id()
      and gm.role = 'owner'
  )
);

create policy groups_delete_owner on app.groups
for delete
to web_anon
using (
  exists (
    select 1 from app.group_members gm
    where gm.group_id = app.groups.id
      and gm.user_id = app.current_user_id()
      and gm.role = 'owner'
  )
);

-- app.group_members
create policy gm_select_member on app.group_members
for select
to web_anon
using (
  exists (
    select 1 from app.group_members gm2
    where gm2.group_id = group_members.group_id
      and gm2.user_id = app.current_user_id()
  )
);

create policy gm_insert_mgr on app.group_members
for insert
to web_anon
with check (
  exists (
    select 1 from app.group_members gm2
    where gm2.group_id = group_members.group_id
      and gm2.user_id = app.current_user_id()
      and gm2.role in ('owner','manager')
  )
);

create policy gm_update_mgr on app.group_members
for update
to web_anon
using (
  exists (
    select 1 from app.group_members gm2
    where gm2.group_id = group_members.group_id
      and gm2.user_id = app.current_user_id()
      and gm2.role in ('owner','manager')
  )
)
with check (
  exists (
    select 1 from app.group_members gm2
    where gm2.group_id = group_members.group_id
      and gm2.user_id = app.current_user_id()
      and gm2.role in ('owner','manager')
  )
);

create policy gm_delete_mgr_or_self on app.group_members
for delete
to web_anon
using (
  (user_id = app.current_user_id()) or
  exists (
    select 1 from app.group_members gm2
    where gm2.group_id = group_members.group_id
      and gm2.user_id = app.current_user_id()
      and gm2.role in ('owner','manager')
  )
);

-- todo.todos
create policy todos_select_owner_or_shared on todo.todos
for select
to web_anon
using (
  owner_id = app.current_user_id()
  or exists (
    select 1
    from todo.todo_group_shares s
    join app.group_members gm
      on gm.group_id = s.group_id
     and gm.user_id  = app.current_user_id()
    where s.todo_id = todos.id
      and s.can_read
  )
);

create policy todos_insert_self on todo.todos
for insert
to web_anon
with check (owner_id = app.current_user_id());

create policy todos_update_owner_or_shared on todo.todos
for update
to web_anon
using (
  owner_id = app.current_user_id()
  or exists (
    select 1
    from todo.todo_group_shares s
    join app.group_members gm
      on gm.group_id = s.group_id
     and gm.user_id  = app.current_user_id()
    where s.todo_id = todos.id
      and s.can_write
  )
)
with check (
  owner_id = app.current_user_id()
  or exists (
    select 1
    from todo.todo_group_shares s
    join app.group_members gm
      on gm.group_id = s.group_id
     and gm.user_id  = app.current_user_id()
    where s.todo_id = todos.id
      and s.can_write
  )
);

create policy todos_delete_owner_or_shared on todo.todos
for delete
to web_anon
using (
  owner_id = app.current_user_id()
  or exists (
    select 1
    from todo.todo_group_shares s
    join app.group_members gm
      on gm.group_id = s.group_id
     and gm.user_id  = app.current_user_id()
    where s.todo_id = todos.id
      and s.can_del
  )
);

-- todo.todo_group_shares
create policy shares_select_visible on todo.todo_group_shares
for select
to web_anon
using (
  exists (select 1 from todo.todos t where t.id = todo_group_shares.todo_id and t.owner_id = app.current_user_id())
  or exists (select 1 from app.group_members gm where gm.group_id = todo_group_shares.group_id and gm.user_id = app.current_user_id())
);

create policy shares_insert_todo_owner on todo.todo_group_shares
for insert
to web_anon
with check (
  exists (select 1 from todo.todos t where t.id = todo_group_shares.todo_id and t.owner_id = app.current_user_id())
);

create policy shares_update_todo_owner on todo.todo_group_shares
for update
to web_anon
using (
  exists (select 1 from todo.todos t where t.id = todo_group_shares.todo_id and t.owner_id = app.current_user_id())
)
with check (
  exists (select 1 from todo.todos t where t.id = todo_group_shares.todo_id and t.owner_id = app.current_user_id())
);

create policy shares_delete_todo_owner on todo.todo_group_shares
for delete
to web_anon
using (
  exists (select 1 from todo.todos t where t.id = todo_group_shares.todo_id and t.owner_id = app.current_user_id())
);

-- Base DML privileges (RLS gates real access)
grant select, insert, update, delete on app.users                  to web_anon;
grant select, insert, update, delete on app.groups                 to web_anon;
grant select, insert, update, delete on app.group_members          to web_anon;
grant select, insert, update, delete on todo.todos                 to web_anon;
grant select, insert, update, delete on todo.todo_group_shares     to web_anon;

migrations/001_init.down.sql

-- +goose Down
-- Drop in reverse dependency order

-- Policies automatically dropped with tables; explicit drops are harmless if they existed.
drop trigger if exists trg_todos_updated_at on todo.todos;
drop function if exists todo.bump_updated_at();

drop trigger if exists trg_groups_owner on app.groups;
drop function if exists app.add_creator_as_owner();

-- Tables
drop table if exists todo.todo_group_shares cascade;
drop table if exists todo.todos cascade;

drop table if exists app.group_members cascade;
drop table if exists app.groups cascade;

-- Functions
drop function if exists app.current_user_id();

-- Tables
drop table if exists app.users cascade;

-- Schemas
drop schema if exists todo cascade;
drop schema if exists app cascade;

-- (Optionally) keep roles; comment these if managed outside migrations
do $$
begin
  if exists (select 1 from pg_roles where rolname = 'web_anon') then
    revoke all on schema public from web_anon;
  end if;
end$$;

-- Not dropping roles by default to avoid breaking external config (e.g., PostgREST).
-- Uncomment if desired:
-- drop role if exists web_anon;
-- drop role if exists api_owner;

test_keycloak_postgrest_rls.py

End-to-end validator that:

  • logs in as two users (Alice & Bob) via Keycloak,
  • auto-provisions app.users via /rpc/current_user_id,
  • creates a group (auto-adds creator as owner),
  • adds Bob as member,
  • creates a todo,
  • verifies read/write/delete controls via todo.todo_group_shares.
#!/usr/bin/env python3
"""
E2E RLS test:
- Requires a PostgREST API exposing schemas "app" and "todo" and using web_anon role.
- Keycloak must have users whose credentials you supply.
- Works with public client (no secret) or confidential client (with secret).

ENV VARS:
  API_BASE="https://uapp-api.example.com"     # PostgREST base (no trailing slash)
  KC_BASE="https://auth.example.com"          # Keycloak base (no trailing slash)
  KC_REALM="myrealm"
  KC_CLIENT_ID="myclient"
  KC_CLIENT_SECRET=""                          # optional; leave empty for public client
  USER1="alice@example.com"
  PASS1="alice_password"
  USER2="bob@example.com"
  PASS2="bob_password"
  VERIFY_SSL="true"                            # "false" to skip TLS verify (dev only)

Run:
  python3 test_keycloak_postgrest_rls.py
"""

import os, sys, time, json, base64
import requests
from typing import Dict, Any

API_BASE      = os.environ.get("API_BASE", "http://localhost:3000").rstrip("/")
KC_BASE       = os.environ.get("KC_BASE", "http://localhost:8080").rstrip("/")
KC_REALM      = os.environ.get("KC_REALM", "example")
KC_CLIENT_ID  = os.environ.get("KC_CLIENT_ID", "postgrest-client")
KC_CLIENT_SECRET = os.environ.get("KC_CLIENT_SECRET", "")
USER1         = os.environ.get("USER1", "alice")
PASS1         = os.environ.get("PASS1", "alice")
USER2         = os.environ.get("USER2", "bob")
PASS2         = os.environ.get("PASS2", "bob")
VERIFY_SSL    = os.environ.get("VERIFY_SSL", "true").lower() != "false"

SESSION = requests.Session()

def kc_token(username: str, password: str) -> Dict[str, Any]:
    url = f"{KC_BASE}/realms/{KC_REALM}/protocol/openid-connect/token"
    data = {
        "grant_type": "password",
        "client_id": KC_CLIENT_ID,
        "username": username,
        "password": password,
    }
    if KC_CLIENT_SECRET:
        data["client_secret"] = KC_CLIENT_SECRET
    r = SESSION.post(url, data=data, verify=VERIFY_SSL)
    if r.status_code != 200:
        raise RuntimeError(f"Keycloak token error {r.status_code}: {r.text}")
    return r.json()

def bearer(token: str) -> Dict[str,str]:
    return {"Authorization": f"Bearer {token}"}

def postgrest_call(method: str, path: str, token: str, **kwargs) -> requests.Response:
    url = f"{API_BASE}{path}"
    headers = kwargs.pop("headers", {})
    headers.update(bearer(token))
    # Let PostgREST return exact counts on list calls
    headers.setdefault("Prefer", "count=exact")
    if "json" in kwargs and method.upper() in ("POST","PATCH"):
        headers.setdefault("Content-Type", "application/json")
    return SESSION.request(method=method, url=url, headers=headers, verify=VERIFY_SSL, **kwargs)

def jwt_sub(access_token: str) -> str:
    # Decode JWT payload (no verify) for logging only
    try:
        payload_b64 = access_token.split(".")[1]
        # pad
        payload_b64 += "=" * ((4 - len(payload_b64) % 4) % 4)
        payload = json.loads(base64.urlsafe_b64decode(payload_b64))
        return payload.get("sub", "")
    except Exception:
        return ""

def rpc_current_user_id(token: str) -> str:
    r = postgrest_call("POST", "/rpc/current_user_id", token, json={})
    if r.status_code not in (200, 201):
        raise RuntimeError(f"/rpc/current_user_id failed: {r.status_code} {r.text}")
    # PostgREST returns either raw UUID or {"current_user_id": "..."} depending on cfg.
    try:
        j = r.json()
        if isinstance(j, dict) and "current_user_id" in j:
            return j["current_user_id"]
        if isinstance(j, list) and j and "current_user_id" in j[0]:
            return j[0]["current_user_id"]
        if isinstance(j, str):
            return j
    except Exception:
        pass
    # text body?
    return r.text.strip().strip('"')

def assert_ok(resp: requests.Response, msg="expected 2xx"):
    if not (200 <= resp.status_code < 300):
        raise AssertionError(f"{msg}: got {resp.status_code} {resp.text}")

def main():
    print(f"API_BASE={API_BASE}")
    print(f"KC_BASE={KC_BASE} realm={KC_REALM} client_id={KC_CLIENT_ID} secret={'set' if bool(KC_CLIENT_SECRET) else 'unset'}")
    print(f"VERIFY_SSL={VERIFY_SSL}")

    # 1) Login both users
    t1 = kc_token(USER1, PASS1)
    t2 = kc_token(USER2, PASS2)
    a1, a2 = t1["access_token"], t2["access_token"]
    print(f"Alice sub: {jwt_sub(a1)}")
    print(f"Bob   sub: {jwt_sub(a2)}")

    # 2) Auto-provision/resolve current user IDs
    u1 = rpc_current_user_id(a1)
    u2 = rpc_current_user_id(a2)
    print(f"Alice user_id: {u1}")
    print(f"Bob   user_id: {u2}")

    # 3) Alice creates a group (trigger makes her owner automatically)
    group_payload = {"name": f"testgrp-{int(time.time())}", "created_by": u1}
    r = postgrest_call("POST", "/app.groups", a1, json=group_payload)
    assert_ok(r, "create group")
    group = r.json()[0]
    group_id = group["id"]
    print(f"Created group {group_id}")

    # 4) Alice adds Bob as member (allowed because Alice is owner via trigger)
    r = postgrest_call("POST", "/app.group_members", a1, json={"group_id": group_id, "user_id": u2, "role": "member"})
    assert_ok(r, "add bob to group")
    print("Bob added to group as member")

    # 5) Alice creates a todo
    todo_payload = {"owner_id": u1, "title": "Secret plan", "body": "Do not share yet"}
    r = postgrest_call("POST", "/todo.todos", a1, json=todo_payload)
    assert_ok(r, "create todo")
    todo = r.json()[0]
    todo_id = todo["id"]
    print(f"Alice created todo {todo_id}")

    # 6) Bob cannot see it yet
    r = postgrest_call("GET", f"/todo.todos?id=eq.{todo_id}", a2)
    assert_ok(r, "bob read check")
    assert r.json() == [], "Bob should not see Alice's todo before share"
    print("Verified: Bob cannot read unshared todo")

    # 7) Alice shares read only with the group
    share_payload = {"todo_id": todo_id, "group_id": group_id, "can_read": True, "can_write": False, "can_del": False}
    r = postgrest_call("POST", "/todo.todo_group_shares", a1, json=share_payload)
    assert_ok(r, "share todo read-only")
    print("Todo shared (read-only) with group")

    # 8) Bob can now read, but not update
    r = postgrest_call("GET", f"/todo.todos?id=eq.{todo_id}", a2)
    assert_ok(r, "bob read shared todo")
    assert len(r.json()) == 1, "Bob should now see the shared todo"
    print("Verified: Bob can read shared todo")

    r = postgrest_call("PATCH", f"/todo.todos?id=eq.{todo_id}", a2, json={"title": "Bob tries edit"})
    if 200 <= r.status_code < 300:
        raise AssertionError("Bob should NOT be able to update with read-only share")
    print("Verified: Bob cannot update with read-only share")

    # 9) Alice upgrades to can_write
    r = postgrest_call("PATCH", f"/todo.todo_group_shares?todo_id=eq.{todo_id}&group_id=eq.{group_id}", a1, json={"can_write": True})
    assert_ok(r, "upgrade share to write")
    print("Share upgraded: can_write=True")

    # Bob updates successfully now
    r = postgrest_call("PATCH", f"/todo.todos?id=eq.{todo_id}", a2, json={"title": "Bob edited title"})
    assert_ok(r, "bob updates after write share")
    print("Verified: Bob can update after write permission")

    # 10) Delete tests: Bob still cannot delete
    r = postgrest_call("DELETE", f"/todo.todos?id=eq.{todo_id}", a2)
    if 200 <= r.status_code < 300:
        raise AssertionError("Bob should NOT be able to delete without can_del")
    print("Verified: Bob cannot delete without delete permission")

    # Alice grants delete
    r = postgrest_call("PATCH", f"/todo.todo_group_shares?todo_id=eq.{todo_id}&group_id=eq.{group_id}", a1, json={"can_del": True})
    assert_ok(r, "upgrade share to delete")
    print("Share upgraded: can_del=True")

    # Bob deletes it now
    r = postgrest_call("DELETE", f"/todo.todos?id=eq.{todo_id}", a2)
    assert_ok(r, "bob deletes after delete share")
    print("Verified: Bob can delete with delete permission")

    print("\n✅ All RLS tests passed.")

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"\n❌ Test failed: {e}", file=sys.stderr)
        sys.exit(1)

Notes / wiring tips

  • PostgREST Ensure your PostgREST config includes:
    • db-anon-role = "web_anon"
    • db-schemas = "app,todo" (and public if you want to expose rpc/current_user_id without schema prefix)
    • If your function is under app, call /rpc/current_user_id only if it’s in a listed schema or exposed via db-extra-search-path.
  • Keycloak
    • The script uses Resource Owner Password Credentials for simplicity. On modern KC, make sure your client allows Direct Access Grants and your users are enabled.
    • If you prefer authorization code + PKCE, you can swap the token function, but ROPC is easiest for headless tests.
  • Auto-provision /rpc/current_user_id both resolves and creates the app.users row for the current JWT sub—this bootstraps group ownership and todo ownership without any admin step.
  • Idempotency The test uses unique names via timestamp; you can re-run safely.

You could split into multiple migrations (e.g., 001_schemas.sql, 002_rls.sql, 003_triggers.sql), say the word and I’ll fan it out cleanly.