SQLAlchemy connection pool within multiple threads and processes
- 7 minutes
- sqlalchemy, postgres, python, gunicorn
SQLAlchemy and Postgres are a very popular choice for Python applications that need a database. While using them in the context of a Python WSGI web application, I’ve often encountered the same kinds of bugs, related to connection pooling and SQLAlchemy’s default configuration. WSGI servers use multiple threads and/or processes for better performance, and using connection pools in this context requires understanding subtle but fundamental settings: getting them wrong can lead to very bad production errors.
SQLAlchemy’s documentation is very good and detailed on this subject, and I encourage you to read it. Meanwhile, in this blog post, I want to demonstrate with examples a couple of pitfalls I ran into, and suggest ways to fix them.
To run these examples, I used a Postgres instance launched with this docker command:
docker run --rm -d -p 5432:5432 postgres:11-alpine
How the default pooling works
The default pooling implementation is the QueuePool. When an engine is instantiated, a QueuePool is also instantiated. At this point, no connections are actually created: connections are only created when first used.
So when we do something like:
session = Session()
session.query(SomeObject).all()
session.close()
A connection is established using the underlying engine and added to the pool. The connection is then in an ‘idle’ state, because we haven’t yet reached pool_size, and it’s ready to be reused. A subsequent query using the same engine would reuse the same connection.
This means that if no concurrent connections are needed, only one connection will ever be open in the database. This is why SQLAlchemy uses this kind of connection pooling by default: it will not use any more resources than actually requested by the application.
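To make this behaviour visible, here is a minimal, self-contained sketch (the connection URL is illustrative, and checkedin() / checkedout() are counters exposed by the QueuePool):
# pool_lazy_example.py
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

engine = create_engine("postgresql://postgres:postgres@localhost/postgres")
Session = sessionmaker(bind=engine)

# Right after create_engine(): the pool exists but holds no connection yet.
print(engine.pool.checkedin(), engine.pool.checkedout())  # -> 0 0

session = Session()
session.execute(text("SELECT 1"))  # first use: a DBAPI connection is opened
session.close()                    # the connection goes back to the pool, 'idle'

# One connection is now checked in and ready to be reused.
print(engine.pool.checkedin(), engine.pool.checkedout())  # -> 1 0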
The important point is that Session, QueuePool and Engine are simple Python objects, which ultimately hold onto DBAPI connections (database connections). When the same connection is accessed by more than one of these objects, that is when things go wrong.
Using multiple threads
Threads in Python share memory: if an object is modified in one thread, the change is visible in the others. An object or a function is said to be thread-safe if it can be accessed or modified concurrently by different threads and still behave correctly.
Every pool implementation in SQLAlchemy is thread-safe, including the default QueuePool. This means that 2 threads requesting a connection simultaneously will check out 2 different connections. By extension, an engine is also thread-safe. Most of the time in a web application, you want to share the pool with all the threads in the same process, so that the maximum number of open connections doesn’t depend on the number of threads configured in your WSGI server.
Within a web application, it is common practice to associate the management of a session with the lifetime of the web request. See Using Thread-Local Scope with Web Applications.
However, neither the Connection object nor the Session object is thread-safe. Using either of them concurrently from multiple threads will lead to race conditions.
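Here is a minimal sketch of the safe pattern: share the engine (and therefore the pool) across threads, but give each thread its own Session (the connection URL is illustrative):
# threads_example.py
import threading

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

engine = create_engine("postgresql://postgres:postgres@localhost/postgres")
Session = sessionmaker(bind=engine)  # the sessionmaker itself can be shared

def worker():
    # Each thread creates its own Session, which checks out its own
    # connection from the shared, thread-safe pool.
    session = Session()
    session.execute(text("SELECT 1"))
    session.close()

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()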
How multiple processes can go wrong
Processes don’t share memory in Python (well, not before Python 3.8, but that’s another story). There is, however, an important thing to consider: when a process is forked, the default multiprocessing mode on Unix, every object in memory at that point is copied. This means that if an object holds a reference to a resource (say, a database connection), this reference is also copied, so 2 things can happen:
- The 2 processes will use the same connection concurrently, and the responses could get mixed up
- One process will close the connection, and the other will try to use it, leading to an exception being raised (see the sketch after this list)
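Here is a minimal sketch of that second failure mode, using psycopg2 directly and os.fork (Unix only; the connection URL is illustrative):
# fork_example.py
import os
import psycopg2

connection = psycopg2.connect("postgresql://postgres:postgres@localhost/postgres")

pid = os.fork()
if pid == 0:
    # Child process: it inherited a copy of the parent's socket. Closing the
    # connection here tells Postgres to terminate the backend for both sides.
    connection.close()
    os._exit(0)

os.waitpid(pid, 0)
try:
    # Parent process: the server side of this connection is now gone.
    cursor = connection.cursor()
    cursor.execute("SELECT 1")
except psycopg2.OperationalError as error:
    print(error)  # e.g. "server closed the connection unexpectedly"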
Therefore, it’s very important to make sure that no SQLAlchemy object is copied to multiple processes. Some WSGI servers have a configuration option where the WSGI app is created before the worker processes are forked: in gunicorn, for example, the preload argument does just that.
An example using flask and gunicorn
Here is a sample app that uses flask. Notice I explicitly set the pool_size to 5 and the max_overflow to 10, but these are the default arguments when nothing is provided to the create_engine function. This means that no more than 15 connections can be opened at the same time using this engine instance (with 5 of them staying idle when not in use, and 10 of them being discarded when released).
# gunicorn-example.py
from typing import Dict, List
from flask import Flask, jsonify
from sqlalchemy import create_engine, Integer, Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from psycopg2 import connect
host = "192.168.99.201"
user = "postgres"
password = "postgres"
db = "postgres"
db_url = f"postgresql://{user}:{password}@{host}/{db}"
engine = create_engine(
    db_url,
    pool_size=5,  # default in SQLAlchemy
    max_overflow=10,  # default in SQLAlchemy
    pool_timeout=1,  # raise an error faster than default
)
Session = sessionmaker(bind=engine)
Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)


Base.metadata.create_all(engine)


def get_connections() -> List[Dict]:
    """Return information about connections"""
    sql = """
        SELECT
            pid,
            state
        FROM pg_stat_activity
        WHERE datname = 'postgres'
        AND query NOT LIKE '%%FROM pg_stat_activity%%'
    """
    connection = connect(db_url)
    cursor = connection.cursor()
    cursor.execute(sql)
    connections = [
        {"pid": r[0], "state": r[1]} for r in cursor.fetchall()
    ]
    cursor.close()
    connection.close()
    return connections


def get_pool_info() -> Dict:
    """Get information about the current connections and pool"""
    return {
        "postgres connections": get_connections(),
        "pool id": id(engine.pool),
        "connections in current pool": (
            engine.pool.checkedin() + engine.pool.checkedout()
        ),
    }


app = Flask(__name__)

# Create a session and make a query.
# The connection created is now part of the pool.
session = Session()
session.query(User).all()
session.close()


@app.route("/pool")
def pool():
    return jsonify(get_pool_info())


@app.route("/make_query")
def make_query():
    session = Session()
    session.query(User).all()
    session.close()
    return jsonify(get_pool_info())


@app.route("/dispose")
def dispose():
    engine.pool.dispose()
    return jsonify(get_pool_info())
If we call this app using:
gunicorn --workers=2 gunicorn-example:app
the 2 gunicorn workers will not share pools or connections. The behavior is as expected: each worker gets its own pool, containing a single connection, because only one request is handled at a time by each worker.
But if we use:
gunicorn --workers=2 --preload gunicorn-example:app
the script is imported before the forking takes place. The fact that we made a query before forking is problematic, because the duplicated pools inside the 2 workers now both think they are responsible for the same connection.
When we request the /pool route, we get exactly the same result (with the same pool object id) for every call to our app, despite the fact that we have 2 workers:
{
    "connections in current pool": 1,
    "pool id": 139763488061256,
    "postgres connections": [
        {
            "pid": 1906,
            "state": "idle"
        }
    ]
}
Requesting /make_query reuses the same connection, no matter which gunicorn worker gets the request.
To imitate what would happen if a connection was discarded in the database, we can request the /dispose route. The connection gets released in the worker that received the request, and Postgres discards it. But the other worker still has a reference to it, so requesting /make_query on that worker will raise an exception that looks like:
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
This error will be intermittent, and hard to reproduce.
Here are the requests to make to recreate this error for this example:
import requests

url = "http://127.0.0.1:8000"
r = requests.get(f"{url}/dispose")

n_workers = 2
for _ in range(n_workers):
    r = requests.get(f"{url}/make_query")
    r.raise_for_status()
Takeaways and solutions
Ultimately, the important fact is that you must not use the same connection from multiple threads or processes. To make sure this doesn’t happen, here are a couple of things you might want to try:
- Notice when the engine creation takes place: you most likely want it to be created after forking and shared as a global object by all the threads of a worker process.
- Notice when forking takes place. This can depend on which libraries you are using.
- If your application forks itself, see if you can call the dispose() method on your engine before forking, which will release all of the connections in the pool.
- Use an event listener on checkout to verify that the connection is still in the same process it was created in, like the example given in the second method of the section Using Connection Pools with Multiprocessing (a sketch of this is shown after this list).
- Use a NullPool as a compromise, which creates and closes a DBAPI connection for every checkout (also shown in the sketch below).
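For reference, here is a minimal sketch of those last two options, adapted from the multiprocessing section of the SQLAlchemy pooling documentation (the connection URL is illustrative):
# pidguard_example.py
import os

from sqlalchemy import create_engine, event, exc
from sqlalchemy.pool import NullPool

db_url = "postgresql://postgres:postgres@localhost/postgres"
engine = create_engine(db_url)

@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
    # Remember which process created this DBAPI connection.
    connection_record.info["pid"] = os.getpid()

@event.listens_for(engine, "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):
    # If the connection was created in another process, invalidate it so the
    # pool opens a fresh one for the current process instead.
    pid = os.getpid()
    if connection_record.info["pid"] != pid:
        connection_record.connection = connection_proxy.connection = None
        raise exc.DisconnectionError(
            "Connection record belongs to pid %s, "
            "attempting to check out in pid %s"
            % (connection_record.info["pid"], pid)
        )

# The NullPool alternative: no pooling at all, a new DBAPI connection is
# opened for every checkout and closed when it is returned.
nullpool_engine = create_engine(db_url, poolclass=NullPool)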