Transactions
All write operations in pgmq are transactional by default thanks to the built-in decorators. You can also compose multiple operations into a single transaction by passing a connection object manually.
Transaction Decorators
Four decorators are provided, one for each client backend:
from pgmq.decorators import (
    transaction,                   # psycopg sync
    async_transaction,             # asyncpg
    sqlalchemy_transaction,        # SQLAlchemy sync
    sqlalchemy_async_transaction,  # SQLAlchemy async
)
How Decorators Work
- Check if conn is already provided in kwargs.
- If not, acquire a connection (and start a transaction) from the pool or engine.
- Inject conn into the function.
- Commit on success, rollback on exception.
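The four steps above can be sketched as a small decorator. This is an illustration only, not pgmq's actual source: `transaction_sketch`, `FakeQueue`, `FakePool`, and `FakeConnection` are hypothetical names invented here so the example runs without a database. The fake connection simply records "begin", "commit", and "rollback" events.

```python
import functools
from contextlib import contextmanager


class FakeConnection:
    """Hypothetical stand-in for a database connection; records events."""

    def __init__(self):
        self.events = []

    @contextmanager
    def transaction(self):
        self.events.append("begin")
        try:
            yield
        except Exception:
            self.events.append("rollback")
            raise
        else:
            self.events.append("commit")


class FakePool:
    """Hypothetical stand-in for a connection pool."""

    def __init__(self):
        self.connections = []

    @contextmanager
    def connection(self):
        conn = FakeConnection()
        self.connections.append(conn)
        yield conn


class FakeQueue:
    """Hypothetical queue client that only exposes a pool."""

    def __init__(self):
        self.pool = FakePool()


def transaction_sketch(func):
    """Illustrative decorator following the four steps (an assumption, not pgmq code)."""

    @functools.wraps(func)
    def wrapper(queue, *args, **kwargs):
        # Step 1: a caller-supplied conn means a transaction is already open.
        if kwargs.get("conn") is not None:
            return func(queue, *args, **kwargs)
        # Step 2: otherwise acquire a connection and start a transaction.
        with queue.pool.connection() as conn:
            with conn.transaction():
                # Step 3: inject conn into the wrapped function.
                kwargs["conn"] = conn
                # Step 4: the context manager commits on a normal exit
                # and rolls back if func raises.
                return func(queue, *args, **kwargs)

    return wrapper


queue = FakeQueue()


@transaction_sketch
def do_work(queue, conn=None):
    conn.events.append("work")
    return "done"


result = do_work(queue)
```

After the call, the single connection handed out by the fake pool has recorded a begin, the work, and a commit. Passing `conn=` explicitly skips steps 2 and 4, which is what makes the manual and nested patterns described later possible.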
Example: Automatic Transaction
from pgmq import PGMQueue
from pgmq.decorators import transaction

queue = PGMQueue()

@transaction
def send_and_read(queue, conn=None):
    queue.create_queue("tx_queue", conn=conn)
    msg_id = queue.send("tx_queue", {"hello": "world"}, conn=conn)
    msg = queue.read("tx_queue", vt=30, conn=conn)
    return msg

msg = send_and_read(queue)
Example: Async Transaction
from pgmq import AsyncPGMQueue
from pgmq.decorators import async_transaction

# The await statements below must run inside an async function or event loop.
queue = AsyncPGMQueue()
await queue.init()

@async_transaction
async def send_and_read(queue, conn=None):
    await queue.create_queue("tx_queue", conn=conn)
    msg_id = await queue.send("tx_queue", {"hello": "world"}, conn=conn)
    msg = await queue.read("tx_queue", vt=30, conn=conn)
    return msg

msg = await send_and_read(queue)
Manual Connection Passing
You can bypass decorators and manage transactions yourself by passing conn to each operation:
with queue.pool.connection() as conn:
    with conn.transaction():
        queue.send("my_queue", {"a": 1}, conn=conn)
        queue.send("my_queue", {"b": 2}, conn=conn)
For SQLAlchemy:
with queue.engine.begin() as conn:
    queue.send("my_queue", {"a": 1}, conn=conn)
    queue.send("my_queue", {"b": 2}, conn=conn)
Nested Transactions
Decorators detect when conn is already present and will not start a new transaction, allowing safe nesting:
@transaction
def outer(queue, conn=None):
    inner(queue, conn=conn)  # reuses the same connection

@transaction
def inner(queue, conn=None):
    queue.send("my_queue", {"nested": True}, conn=conn)
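One consequence of reusing the connection is worth spelling out: if the nested call joins the caller's transaction instead of starting its own, then work done in both functions should succeed or fail as a single unit. The sketch below models that with a toy in-memory store. It assumes the decorator behaves as described above; `tx_sketch`, `FakeTx`, and the `fail` flag are hypothetical names used only for illustration and are not part of pgmq.

```python
import functools


class FakeTx:
    """Hypothetical connection that buffers writes until commit."""

    def __init__(self):
        self.pending = []


def tx_sketch(func):
    """Illustrative decorator: opens a transaction only when conn is absent."""

    @functools.wraps(func)
    def wrapper(store, *args, conn=None, **kwargs):
        if conn is not None:
            # Nested call: join the caller's transaction, do not commit here.
            return func(store, *args, conn=conn, **kwargs)
        conn = FakeTx()
        # If func raises, the buffered writes are never applied ("rollback").
        result = func(store, *args, conn=conn, **kwargs)
        store.extend(conn.pending)  # "commit": apply all buffered writes
        return result

    return wrapper


@tx_sketch
def outer(store, fail=False, conn=None):
    conn.pending.append("outer write")
    inner(store, fail=fail, conn=conn)  # reuses the same connection


@tx_sketch
def inner(store, fail=False, conn=None):
    conn.pending.append("inner write")
    if fail:
        raise RuntimeError("inner failed")


committed = []
outer(committed)

rolled_back = []
try:
    outer(rolled_back, fail=True)
except RuntimeError:
    pass
```

In this model, `committed` ends up holding both writes, while `rolled_back` stays empty because the failure inside `inner` discards the write made by `outer` as well.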
Read-Only Operations
Methods that do not modify data (e.g., list_queues, metrics, test_routing) typically do not use the transaction decorator. You can still pass conn if you want them to run inside your managed transaction scope.