usability status: experimental
This project follows a Work → Right → Fast approach:
- make it work
- refine it
- optimize it
It's in the early stage, so the code may be rough/incomplete. Join us in building and improving it!
go install github.com/edgeflare/pgo@main # or make build or download from release pagePostgREST compatible REST API
pgo rest --config pkg/config/example.config.yamlSee godoc and pgo rest --help for more.
Benchmark results compared to PostgREST
| Metric | PGO REST | PostgREST | 
|---|---|---|
| VUs | 10,000 | 1,000 | 
| Requests/sec | 38,392 | 828 | 
| Avg Response Time | 241ms | 1.16s | 
| P95 Response Time | 299ms | 3.49s | 
| Error Rate | 0% | 0% | 
- Start Postgres, NATS, Kafka, MQTT broker and pgo pipeline as containers
git clone [email protected]:edgeflare/pgo.git
make image
# Set KAFKA_CFG_ADVERTISED_LISTENERS env var in docs/docker-compose.yaml to host IP for local access,
# as opposed to from within container network. adjust Kafka brokers IP in docs/pipeline-example.docker.yaml
make up # docker compose up- Postgres
- As a source: Create a test table, eg usersin source postgres database
PGUSER=postgres PGPASSWORD=secret PGHOST=localhost PGDATABASE=testdb psqlCREATE TABLE IF NOT EXISTS public.users (
  id INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  name TEXT
);
ALTER TABLE public.users REPLICA IDENTITY FULL;- As a sink
PGUSER=postgres PGPASSWORD=secret PGHOST=localhost PGDATABASE=testdb PGPORT=5431 psql- 
Create the same users table in sink database for mirroring. altering replica identity may not be needed in sink 
- 
Create a second table in sink database which stores transformed data 
CREATE SCHEMA IF NOT EXISTS another_schema;
CREATE TABLE IF NOT EXISTS  another_schema.transformed_users (
  uuid UUID DEFAULT gen_random_uuid(), -- because we're extracting only `name` field
  -- new_id INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, -- to handle UPDATE operations, primary key column type must match in source and sink
  new_name TEXT
);pgo caches the table schemas for simpler parsing of CDC events (rows). To update pgo cache with newly created tables,
either docker restart pgo_pgo_1 or NOTIFY it to reload cache by executing on database
NOTIFY pgo, 'reload schema';- Subscribe
- MQTT: /any/prefix/schemaName/tableName/operationtopic (testing with mosquitto client)
mosquitto_sub -t pgo/public/users/c # operation: c=create, u=update, d=delete, r=read- Kafka: topic convention is [prefix].[schema_name].[table_name].[operation]. use any kafka client egkaf
kaf consume pgo.public.users.c --follow # consume messages until program execution- NATS:
nats sub -s nats://localhost:4222 'pgo.public.users.>' # wildcard. includes all nested parts
# nats sub -s nats://localhost:4222 'pgo.public.users.c' # specific- INSERT(or update etc) into users table
INSERT INTO users (name) VALUES ('alice');
INSERT INTO users (name) VALUES ('bob');And notice NATS, MQTT, Kafka, postgres-sink, or debug peer's respective subscriber receiving the message. It's not Postgres only source. Other peers too can be sources (not all peers fully functional yet).
Clean up
make downIt's also possible to import functions, etc around
- net/http.Handler
- router
- middleware (authentication, logging, CORS, RequestID, ...)
- Postgres middleware attaches a pgxpool.Conn to request context for authorized user; useful for RLS
 
If you're curious, start by browsing the examples, skimming over any doc.go, *.md files.
Please see CONTRIBUTING.md.
Apache License 2.0
