SQLAlchemy connection pool within multiple threads and processes

SQLAlchemy and Postgres are a very popular choice for python applications that need a database. While using them in the context of a python WSGI web application, I’ve often run into the same kinds of bugs, all related to connection pooling with SQLAlchemy’s default configuration. WSGI servers use multiple threads and/or processes for better performance, and using connection pools in this context requires understanding some subtle but fundamental settings; getting them wrong can lead to very bad production errors.

SQLAlchemy’s documentation is very good and detailed on this subject, and I encourage you to read it. Meanwhile, in this blog post, I want to demonstrate with examples a couple of pitfalls I ran into, and suggest ways to fix them.

To run these examples, I used a Postgres instance launched with this docker command:

docker run --rm -d -p 5432:5432 postgres:11-alpine

How the default pooling works

The default pooling implementation is the QueuePool. When an engine is instantiated, a QueuePool is also instantiated. At this point, no connections are actually created: connections are only created when first used.

So when we do something like:

session = Session()
session.query(SomeObject).all()
session.close()

A connection is established using the underlying engine and added to the pool. The connection is then kept in the pool in the ‘idle’ state (that is how Postgres reports it), because we haven’t yet reached pool_size, and it’s ready to be reused. A subsequent query using the same engine would reuse the same connection.

This means that if no concurrent connections are needed, only one connection will ever be open in the database. This is why SQLAlchemy uses this kind of connection pooling by default: it will not use any more resources than actually requested by the application.
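
To make this concrete, here is a small sketch (mine, not part of the original snippets) that watches the pool counters around the first query. It assumes a reachable Postgres like the one started with the docker command above; adjust the connection URL to your setup.

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Hypothetical local instance; adjust host and credentials to your setup.
engine = create_engine("postgresql://postgres:postgres@localhost/postgres")
Session = sessionmaker(bind=engine)

print(engine.pool.checkedin(), engine.pool.checkedout())  # 0 0: no connection opened yet

session = Session()
session.execute(text("SELECT 1"))  # first use: a DBAPI connection is created
session.close()                    # ...and returned to the pool, not closed

print(engine.pool.checkedin(), engine.pool.checkedout())  # 1 0: one idle connection, ready to be reused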

The important point is that Session, QueuePool and Engine are simple python objects, which ultimately hold onto DBAPI connections (database connections). When the same DBAPI connection ends up being held by more than one of these objects at the same time, things go wrong.

Using multiple threads

Threads in python share memory: if an object is modified in one thread, the change is visible in the others. An object or a function is said to be thread-safe if concurrent access or modification by different threads is possible and the behavior is still correct.

Every pool implementation in SQLAlchemy is thread-safe, including the default QueuePool. This means that 2 threads requesting a connection simultaneously will check out 2 different connections. By extension, an engine is also thread-safe. Most of the time in a web application, you want to share the pool with all the threads in the same process, so that the maximum number of open connections doesn’t depend on the number of threads configured in your WSGI server.
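
As a quick illustration (my own sketch, reusing the engine from the sketch above), two threads that hold a connection at the same time each get their own, and the pool ends up holding both:

import threading

barrier = threading.Barrier(2)

def use_connection():
    with engine.connect():  # checks a DBAPI connection out of the shared pool
        barrier.wait()       # make sure both threads hold a connection at once

threads = [threading.Thread(target=use_connection) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(engine.pool.checkedin())  # 2: each thread got its own connection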

Within a web application, it is common practice to associate the management of a session with the lifetime of the web request. See Using Thread-Local Scope with Web Applications.
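
In practice, that pattern often looks like the following sketch (mine, using a Flask hook like in the example further down; the exact hook depends on your framework):

from sqlalchemy.orm import scoped_session, sessionmaker

# scoped_session gives each thread its own Session, so request handlers can
# use it safely even when the WSGI server runs them in multiple threads.
Session = scoped_session(sessionmaker(bind=engine))

@app.teardown_appcontext
def remove_session(exception=None):
    # flask calls this at the end of every request: close this thread's
    # Session and return its connection to the pool
    Session.remove()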

However, neither the Connection object nor the Session object is thread-safe. Concurrent use of either one from multiple threads will lead to race conditions.

How multiple processes can go wrong

Processes don’t share memory in python (well, not before python 3.8, but that’s another story). There is, however, an important thing to consider: when a process is forked, the default multiprocessing start method on Unix, every object in memory at that point is copied. This means that if an object holds a reference to a resource (say, a database connection), this reference is also copied, so 2 things can happen (a rough sketch follows the list):

  • The 2 processes will use the same connection concurrently, and the responses could get mixed up
  • One process will close the connection, and the other will try to use it, leading to an exception being raised
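
Here is a rough sketch of the second failure mode (mine, not from the original app), reusing the engine from the sketches above: a connection created before the fork is shared by both processes, and closing it in one breaks it in the other.

import os

from sqlalchemy import text

with engine.connect() as conn:
    conn.execute(text("SELECT 1"))  # a DBAPI connection now sits in the pool

if os.fork() == 0:
    # child process: its copy of the pool closes the *same* underlying socket
    engine.dispose()
    os._exit(0)

os.wait()  # wait for the child to finish first
with engine.connect() as conn:
    # the parent reuses the now-dead connection and gets an OperationalError
    conn.execute(text("SELECT 1"))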

Therefore, it’s very important to make sure that SQLAlchemy objects holding connections are not copied into multiple processes. Some WSGI servers have a configuration where the WSGI app is created before the worker processes are forked. For example, in gunicorn, the preload argument does just that, as the following example shows.

An example using flask and gunicorn

Here is a sample app that uses flask. Notice I explicitly set pool_size to 5 and max_overflow to 10, but these are the default values when nothing is passed to the create_engine function. This means that no more than 15 connections can be open at the same time with this engine instance: up to 5 of them are kept idle in the pool when not in use, and the 10 overflow connections are closed when released.

# gunicorn-example.py

from typing import Dict, List

from flask import Flask, jsonify
from sqlalchemy import create_engine, Integer, Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from psycopg2 import connect


host = "192.168.99.201"
user = "postgres"
password = "postgres"
db = "postgres"
db_url = f"postgresql://{user}:{password}@{host}/{db}"

engine = create_engine(
    db_url,
    pool_size=5,     # default in SQLAlchemy
    max_overflow=10, # default in SQLAlchemy
    pool_timeout=1,  # raise an error faster than default
)
Session = sessionmaker(bind=engine)

Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)

Base.metadata.create_all(engine)


def get_connections() -> List[Dict]:
    """Return information about connections"""
    sql = """
    SELECT
        pid,
        state
    FROM pg_stat_activity
    WHERE datname = 'postgres'
    AND query NOT LIKE '%%FROM pg_stat_activity%%'
    """
    connection = connect(db_url)
    cursor = connection.cursor()
    cursor.execute(sql)
    connections = [
        {"pid": r[0], "state": r[1]} for r in cursor.fetchall()
    ]
    cursor.close()
    connection.close()
    return connections


def get_pool_info() -> Dict:
    """Get information about the current connections and pool"""
    return {
        "postgres connections": get_connections(),
        "pool id": id(engine.pool),
        "connections in current pool": (
            engine.pool.checkedin() + engine.pool.checkedout()
        ),
    }


app = Flask(__name__)

# Create a session and make a query.
# The connection created is now part of the pool.
session = Session()
session.query(User).all()
session.close()


@app.route("/pool")
def pool():
    return jsonify(get_pool_info())


@app.route("/make_query")
def make_query():
    session = Session()
    session.query(User).all()
    session.close()
    return jsonify(get_pool_info())


@app.route("/dispose")
def dispose():
    engine.pool.dispose()
    return jsonify(get_pool_info())

If we launch this app using:

gunicorn --workers=2 gunicorn-example:app

the 2 gunicorn workers will not share pools or connections. The behavior is as expected: each worker gets its own pool, containing a single connection, because only one request at a time is handled by each worker.

But if we use:

gunicorn --workers=2 --preload gunicorn-example:app

the script is imported before the forking takes place. The fact that we made a query before forking is problematic, because the duplicated pool inside each of the 2 workers now thinks it is responsible for the same connection.

When we request the /pool route, we get exactly the same result (with the same pool object id) for every call to our app, despite the fact that we have 2 workers:

{
  "connections in current pool": 1,
  "pool id": 139763488061256,
  "postgres connections": [
    {
      "pid": 1906,
      "state": "idle"
    }
  ]
}

Requesting /make_query reuses the same connection, no matter which gunicorn worker gets the request.

To imitate what would happen if a connection was discarded on the database side, we can request the /dispose route. The connection gets released in the worker that received the request, and postgres discards it. But the other worker still holds a reference to it. So requesting /make_query through that worker will raise an exception that looks like:

sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.

This error will be intermittent, and hard to reproduce.

Here are the requests to make to recreate this error for this example:

import requests


url = "http://127.0.0.1:8000"

r = requests.get(f"{url}/dispose")
n_workers = 2
for _ in range(n_workers):
    r = requests.get(f"{url}/make_query")
    r.raise_for_status()

Takeaways and solutions

Ultimately, the important rule is that you must not use the same connection from multiple threads or processes. To make sure this doesn’t happen, here are a couple of things you might want to try:

  • Notice when the engine creation takes place: you most likely want it created after forking, and shared as a global object by all the threads of each process.
  • Notice when the forking takes place. This can depend on which libraries you are using.
  • If your application forks itself, see if you can call the dispose() method on your engine before forking; this releases all of the connections held in the pool.
  • Use an event listener on checkout to verify that the connection is still in the same process it was created in, like the example given as the second method in the section Using Connection Pools with Multiprocessing (sketched below).
  • Use a NullPool as a compromise, which creates and closes the DB-API connection every time.
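
To make the last two points more concrete, here is a rough sketch of the pid check described in the SQLAlchemy documentation, reusing the db_url from the example above: the pid of the process that created each DBAPI connection is recorded on connect, and the checkout is refused (forcing the pool to open a fresh connection) when it happens in a different process.

import os

from sqlalchemy import create_engine, event, exc
from sqlalchemy.pool import NullPool

engine = create_engine(db_url)

@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
    # remember which process created this DBAPI connection
    connection_record.info["pid"] = os.getpid()

@event.listens_for(engine, "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):
    pid = os.getpid()
    if connection_record.info["pid"] != pid:
        # created in another process: invalidate the record so the pool
        # opens a brand new connection instead of handing out the shared one
        connection_record.connection = connection_proxy.connection = None
        raise exc.DisconnectionError(
            "Connection record belongs to pid %s, attempting to check out in pid %s"
            % (connection_record.info["pid"], pid)
        )

# The NullPool compromise: never keep connections around between uses.
# engine = create_engine(db_url, poolclass=NullPool)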