Topic-Based Routing¶
PGMQ supports topic-based message routing with wildcard patterns, similar to AMQP topic exchanges. This feature allows you to route messages to multiple queues based on routing keys and pattern matching.
Overview¶
Topic routing in PGMQ works by binding patterns to queues. When a message is sent with a routing key, it is delivered to all queues whose patterns match that key. This enables flexible publish-subscribe patterns and content-based routing.
Key Features¶
- Wildcard patterns: Use
*(one segment) and#(zero or more segments) for flexible matching - Multiple queue delivery: A single message can be routed to multiple queues
- Pattern-based subscriptions: Queues subscribe to message types via patterns
- Dry-run testing: Test routing without sending messages using
test_routing() - Precompiled regex: Patterns are compiled to regex at bind time for fast matching
- Automatic cleanup: Bindings are removed when queues are dropped (CASCADE)
How It Works¶
Routing Keys and Patterns¶
Routing keys and patterns use dot-separated segments:
-- Routing keys (used when sending messages)
'logs.api.error'
'orders.created'
'user.signup.completed'
-- Patterns (used when binding to queues)
'logs.*' -- Matches exactly one segment after 'logs.'
'logs.#' -- Matches zero or more segments after 'logs.'
'*.error' -- Matches any single segment before '.error'
'#.error' -- Matches zero or more segments before '.error'
'logs.*.error' -- Matches 'logs.{any}.error'
Wildcards¶
| Wildcard | Meaning | Example Pattern | Matches | Does Not Match |
|---|---|---|---|---|
* |
Exactly one segment | logs.* |
logs.error, logs.info |
logs, logs.api.error |
# |
Zero or more segments | logs.# |
logs.error, logs.api.error |
logs (no dot after) |
Binding Patterns to Queues¶
-- Create queues
SELECT pgmq.create('all_logs');
SELECT pgmq.create('error_logs');
SELECT pgmq.create('api_errors');
-- Bind patterns
SELECT pgmq.bind_topic('logs.#', 'all_logs'); -- All logs
SELECT pgmq.bind_topic('logs.*.error', 'error_logs'); -- Error logs from any service
SELECT pgmq.bind_topic('logs.api.error', 'api_errors'); -- Only API errors
Sending Messages with Routing Keys¶
-- This message routes to: all_logs, error_logs, api_errors (3 queues)
SELECT pgmq.send_topic('logs.api.error', '{"message": "API failed"}');
-- This message routes to: all_logs, error_logs (2 queues)
SELECT pgmq.send_topic('logs.db.error', '{"message": "DB connection failed"}');
-- This message routes to: all_logs only (1 queue)
SELECT pgmq.send_topic('logs.api.info', '{"message": "Request received"}');
API Reference¶
Binding Functions¶
pgmq.bind_topic(pattern, queue_name)¶
Bind a pattern to a queue. Messages matching the pattern will be routed to this queue.
Parameters:
pattern(text): The wildcard pattern to match routing keysqueue_name(text): Name of the queue to receive matching messages
Returns: void
Example:
SELECT pgmq.bind_topic('orders.#', 'order_events');
SELECT pgmq.bind_topic('orders.*.failed', 'failed_orders');
Notes:
- Patterns are validated before binding
- Binding the same pattern to the same queue is idempotent (no error, no duplicate)
- One pattern can be bound to multiple queues
- One queue can have multiple pattern bindings
pgmq.unbind_topic(pattern, queue_name)¶
Remove a pattern binding from a queue.
Parameters:
pattern(text): The pattern to unbindqueue_name(text): Name of the queue
Returns: boolean - true if a binding was removed, false if no binding existed
Example:
SELECT pgmq.unbind_topic('orders.#', 'order_events');
pgmq.list_topic_bindings()¶
Returns all topic bindings across all queues.
Returns: Table with columns:
pattern(text): The wildcard patternqueue_name(text): Name of the queue receiving matching messagesbound_at(timestamp with time zone): When the binding was createdcompiled_regex(text): The internal regex used for matching
Example:
SELECT * FROM pgmq.list_topic_bindings();
-- Returns:
-- pattern | queue_name | bound_at | compiled_regex
-- -------------+-------------+----------------------------+-------------------
-- orders.# | all_orders | 2024-01-15 10:30:00.000000 | ^orders\..*$
-- logs.*.error | error_logs | 2024-01-15 10:29:00.000000 | ^logs\.[^.]+\.error$
Notes:
- Results are ordered by
bound_atDESC (most recent first), then bypatternandqueue_name - Use this to audit or debug your topic routing configuration
pgmq.list_topic_bindings(queue_name)¶
Returns all topic bindings for a specific queue.
Parameters:
queue_name(text): Name of the queue
Returns: Table with columns:
pattern(text): The wildcard patternqueue_name(text): Name of the queue (always matches the parameter)bound_at(timestamp with time zone): When the binding was createdcompiled_regex(text): The internal regex used for matching
Example:
SELECT * FROM pgmq.list_topic_bindings('order_events');
-- Returns:
-- pattern | queue_name | bound_at | compiled_regex
-- ----------------+---------------+----------------------------+---------------------
-- orders.# | order_events | 2024-01-15 10:30:00.000000 | ^orders\..*$
-- orders.*.failed | order_events | 2024-01-15 10:28:00.000000 | ^orders\.[^.]+\.failed$
Notes:
- Results are ordered by
bound_atDESC (most recent first), then bypattern - Returns empty result if queue has no bindings
queue_namecolumn is included for consistency with the no-argument version
Sending Functions¶
pgmq.send_topic(routing_key, msg, headers, delay)¶
Send a message to all queues whose patterns match the routing key.
Parameters:
routing_key(text): The routing key for pattern matchingmsg(jsonb): The message payloadheaders(jsonb): Optional message headersdelay(integer): Delay in seconds before message becomes visible
Returns: integer - Number of queues the message was sent to
Example:
-- Full signature
SELECT pgmq.send_topic('orders.created', '{"order_id": 123}', '{"priority": "high"}', 0);
-- Simplified versions
SELECT pgmq.send_topic('orders.created', '{"order_id": 123}');
SELECT pgmq.send_topic('orders.created', '{"order_id": 123}', 5); -- 5 second delay
Notes:
- If no patterns match the routing key, the message is not sent (returns 0)
- If multiple patterns match, the message is sent to all matching queues
- All sends succeed or all fail (transactional)
pgmq.send_batch_topic(routing_key, msgs, headers, delay)¶
Send multiple messages to all queues whose patterns match the routing key.
Parameters:
routing_key(text): The routing key for pattern matchingmsgs(jsonb[]): Array of message payloads to sendheaders(jsonb[]): Optional array of headers for each messagedelay(integer): Delay in seconds before messages become visibledelay(timestamp with time zone): Timestamp when messages become visible
Returns: Table with columns:
- queue_name (text): Name of the queue that received messages
- msg_id (bigint): ID of each message that was sent
Example:
-- Send 3 messages to all matching queues
SELECT * FROM pgmq.send_batch_topic(
'orders.created',
ARRAY[
'{"order_id": 1, "amount": 100}',
'{"order_id": 2, "amount": 200}',
'{"order_id": 3, "amount": 300}'
]::jsonb[]
);
-- Returns (if 2 queues match the pattern):
-- queue_name | msg_id
-- ------------------+--------
-- order_processor | 1
-- order_processor | 2
-- order_processor | 3
-- order_analytics | 1
-- order_analytics | 2
-- order_analytics | 3
-- With headers
SELECT * FROM pgmq.send_batch_topic(
'notifications.email',
ARRAY['{"to": "user1@example.com"}', '{"to": "user2@example.com"}']::jsonb[],
ARRAY['{"priority": "high"}', '{"priority": "normal"}']::jsonb[]
);
-- Simplified versions
SELECT * FROM pgmq.send_batch_topic('logs.info', ARRAY['{"msg": "test"}']::jsonb[]);
SELECT * FROM pgmq.send_batch_topic('alerts.critical', ARRAY['{"alert": "down"}']::jsonb[], 60); -- 60 second delay
-- With timestamp delay (visible in 1 hour)
SELECT * FROM pgmq.send_batch_topic(
'scheduled.tasks',
ARRAY['{"task": "backup"}']::jsonb[],
CURRENT_TIMESTAMP + INTERVAL '1 hour'
);
Notes:
- Each message in the batch is sent to all matching queues
- If the routing key matches 2 queues and you send 3 messages, you get 6 total messages (3 per queue)
- Headers array length must exactly match messages array length when provided (not NULL). Empty headers arrays will fail validation if msgs is not empty. To send without headers, omit the parameter or pass NULL
- If no patterns match the routing key, returns an empty result set
- All sends succeed or all fail (transactional)
Testing Functions¶
pgmq.test_routing(routing_key)¶
Test which queues would receive a message with the given routing key, without actually sending a message.
Parameters:
routing_key(text): The routing key to test
Returns: Table with columns:
pattern(text): The matching patternqueue_name(text): The queue that would receive the messagecompiled_regex(text): The internal regex used for matching
Example:
SELECT *
FROM pgmq.test_routing('logs.api.error');
-- Returns:
-- pattern | queue_name | compiled_regex
-- -------------+-------------+-------------------
-- logs.# | all_logs | ^logs\..*$
-- logs.*.error | error_logs | ^logs\.[^.]+\.error$
Validation Functions¶
pgmq.validate_routing_key(routing_key)¶
Validate that a routing key is well-formed.
Valid routing keys:
- Alphanumeric characters, dots, hyphens, and underscores
- Cannot start or end with a dot
- Cannot contain consecutive dots
- Cannot contain wildcards (
*or#) - Maximum 255 characters
Example:
SELECT pgmq.validate_routing_key('logs.api.error'); -- Returns true
SELECT pgmq.validate_routing_key('logs..error'); -- Raises exception
pgmq.validate_topic_pattern(pattern)¶
Validate that a topic pattern is well-formed.
Valid patterns:
- Same rules as routing keys, plus
*and#wildcards - Cannot have consecutive wildcards (
**,##,*#,#*)
Example:
SELECT pgmq.validate_topic_pattern('logs.*.error'); -- Returns true
SELECT pgmq.validate_topic_pattern('logs.**'); -- Raises exception
Usage Patterns¶
1. Log Aggregation¶
Route logs to different queues based on severity and service:
-- Create queues
SELECT pgmq.create('logs_all');
SELECT pgmq.create('logs_errors');
SELECT pgmq.create('logs_critical');
SELECT pgmq.create('api_logs');
-- Bind patterns
SELECT pgmq.bind_topic('#', 'logs_all'); -- All logs
SELECT pgmq.bind_topic('*.error', 'logs_errors'); -- All errors
SELECT pgmq.bind_topic('*.critical', 'logs_critical'); -- All critical
SELECT pgmq.bind_topic('api.#', 'api_logs');
-- API service logs
-- Send logs
SELECT pgmq.send_topic('api.error', '{"msg": "API error"}');
-- Routes to: logs_all, logs_errors, api_logs
SELECT pgmq.send_topic('db.critical', '{"msg": "DB down"}');
-- Routes to: logs_all, logs_critical
2. Event Broadcasting (Fanout)¶
Send events to all interested consumers:
-- Create consumer queues
SELECT pgmq.create('notifications');
SELECT pgmq.create('analytics');
SELECT pgmq.create('audit_log');
-- All queues subscribe to everything
SELECT pgmq.bind_topic('#', 'notifications');
SELECT pgmq.bind_topic('#', 'analytics');
SELECT pgmq.bind_topic('#', 'audit_log');
-- Any event goes to all queues
SELECT pgmq.send_topic('user.signup', '{"user_id": 123}');
-- Routes to all 3 queues
3. Direct Routing (Exact Match)¶
Route specific events to specific queues:
-- Create queues
SELECT pgmq.create('user_events');
SELECT pgmq.create('order_events');
-- Bind exact patterns (no wildcards)
SELECT pgmq.bind_topic('user.created', 'user_events');
SELECT pgmq.bind_topic('user.updated', 'user_events');
SELECT pgmq.bind_topic('user.deleted', 'user_events');
SELECT pgmq.bind_topic('order.placed', 'order_events');
SELECT pgmq.bind_topic('order.shipped', 'order_events');
-- Messages route to exactly one queue
SELECT pgmq.send_topic('user.created', '{"user_id": 1}');
-- Routes to: user_events only
4. Geographic Routing¶
Route messages based on region:
-- Create regional queues
SELECT pgmq.create('events_us');
SELECT pgmq.create('events_eu');
SELECT pgmq.create('events_global');
-- Bind regional patterns
SELECT pgmq.bind_topic('us.#', 'events_us');
SELECT pgmq.bind_topic('eu.#', 'events_eu');
SELECT pgmq.bind_topic('#', 'events_global');
-- Route by region
SELECT pgmq.send_topic('us.orders.created', '{"order_id": 100}');
-- Routes to: events_us, events_global
SELECT pgmq.send_topic('eu.users.signup', '{"user_id": 50}');
-- Routes to: events_eu, events_global
5. Multi-Tenant Processing¶
Route messages to tenant-specific and shared queues:
-- Create queues
SELECT pgmq.create('tenant_acme');
SELECT pgmq.create('tenant_globex');
SELECT pgmq.create('all_tenants');
-- Bind patterns
SELECT pgmq.bind_topic('acme.#', 'tenant_acme');
SELECT pgmq.bind_topic('globex.#', 'tenant_globex');
SELECT pgmq.bind_topic('#', 'all_tenants');
-- Route by tenant
SELECT pgmq.send_topic('acme.orders.new', '{"order": "data"}');
-- Routes to: tenant_acme, all_tenants
6. Batch Message Publishing¶
Send multiple messages at once using send_batch_topic:
-- Create notification queues
SELECT pgmq.create('email_queue');
SELECT pgmq.create('sms_queue');
SELECT pgmq.create('all_notifications');
-- Bind patterns
SELECT pgmq.bind_topic('notify.email.#', 'email_queue');
SELECT pgmq.bind_topic('notify.sms.#', 'sms_queue');
SELECT pgmq.bind_topic('notify.#', 'all_notifications');
-- Send batch of email notifications
SELECT * FROM pgmq.send_batch_topic(
'notify.email.welcome',
ARRAY[
'{"to": "user1@example.com", "name": "Alice"}',
'{"to": "user2@example.com", "name": "Bob"}',
'{"to": "user3@example.com", "name": "Carol"}'
]::jsonb[]
);
-- Routes 3 messages each to: email_queue and all_notifications (6 total messages)
-- Send batch with headers for priority handling
SELECT * FROM pgmq.send_batch_topic(
'notify.sms.alert',
ARRAY['{"phone": "+1234567890", "msg": "Alert!"}']::jsonb[],
ARRAY['{"priority": "urgent"}']::jsonb[]
);
-- Routes to: sms_queue and all_notifications with priority header
-- Send batch with delay (e.g., scheduled notifications)
SELECT * FROM pgmq.send_batch_topic(
'notify.email.reminder',
ARRAY[
'{"to": "user@example.com", "subject": "Reminder"}',
'{"to": "admin@example.com", "subject": "Daily report"}'
]::jsonb[],
3600 -- Delay 1 hour
);
-- Messages become visible in 1 hour
Performance Considerations¶
Pattern Compilation¶
Patterns are compiled to regular expressions when bound, not at send time:
-- View all patterns with their compiled regex
SELECT pattern, queue_name, compiled_regex
FROM pgmq.list_topic_bindings();
This means:
- Binding is slightly slower (regex compilation)
- Sending is fast (uses precompiled regex)
- Many bindings = more patterns to check per send
Indexing¶
The topic_bindings table has a covering index for efficient pattern scanning:
-- Index includes queue_name for index-only scans
CREATE INDEX idx_topic_bindings_covering
ON pgmq.topic_bindings (pattern) INCLUDE (queue_name);
Examples¶
See examples/topics.sql for comprehensive usage examples including:
- Wildcard differences (
*vs#) - Topic-based routing
- Fanout pattern (broadcast)
- Direct routing (exact match)
- Dry-run testing