Django-style Database Routers in SQLAlchemy

January 11, 2012 at 06:50 PM | Code, SQLAlchemy

Updated August 2014 - the approach here really won't work very well if you are using transactions! See the notes inline.

As luck would have it, I'm currently assigned to work with some existing Django code. It's been quite a long time since I've needed to do so, but now that I'm back in it, I can start fleshing out how some of the use cases for Django can be approximated in SQLAlchemy.

Today it's something that's really quite simple - how to mimic the Multiple Databases example in Django's docs. The Django approach is to build a "router" object into the system, which can then decide on a query-by-query basis which of several databases should be used. For this, they provide an interface which includes db_for_read(), db_for_write(), allow_relation() and allow_syncdb(). In particular, db_for_read() and db_for_write() are used when emitting most SQL. These receive an argument called model, which the docs state is a "type", i.e. a class. Because the database determination is based on type, in SQLAlchemy we call this vertical partitioning - databases are partitioned along types.
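
For orientation, a router on the Django side implementing the behavior we're about to mimic looks roughly like the following - a condensed, hypothetical sketch adapted from Django's docs, using the database names we'll describe next, with the router class name being my own invention:

import random

class ExampleRouter(object):
    """Reads go to a random follower, writes go to the leader,
    and models from the "myapp" application are pinned to "other"."""

    def db_for_read(self, model, **hints):
        if model._meta.app_label == 'myapp':
            return 'other'
        return random.choice(['follower1', 'follower2'])

    def db_for_write(self, model, **hints):
        if model._meta.app_label == 'myapp':
            return 'other'
        return 'leader'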

The specific example here has the following behavior:

  1. Write operations occur on a database referred to as leader.
  2. Read operations are split among two different databases referred to as follower1 and follower2.
  3. Operations for a specific subset of classes denoted by myapp occur on a database referred to as other.

When using SQLAlchemy, we of course don't have the concept of "apps" as a delineating factor between different types. So we'll use the old-fashioned approach and just use the type itself, that is, what hierarchy it inherits from, to determine which subsystem of the application it hails from.

The SQLAlchemy Session makes all decisions about what Engine to use within the get_bind() method. This method is given pretty much the same information that db_for_read() and db_for_write() receive, where a Mapper is passed in, if available, and we can also check if the operation is a "write" just by checking if the Session is flushing.

Imports

We'll build up our example in the usual way, as a full script broken out. First the imports:

from sqlalchemy import Column, Integer, String, MetaData, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker, Session
from sqlalchemy.ext.declarative import declarative_base
import random
import shutil

Engine Registry

Then, some Engine instances. Note that, unlike Django, SQLAlchemy is not a framework, and doesn't have a settings.py file or an existing registry of engines. Assuming we can decide on a decent place to put our own registry, we just stick them in a dict:

engines = {
    'leader': create_engine('sqlite:///leader.db',
                            logging_name='leader'),
    'other': create_engine('sqlite:///other.db',
                           logging_name='other'),
    'follower1': create_engine('sqlite:///follower1.db',
                               logging_name='follower1'),
    'follower2': create_engine('sqlite:///follower2.db',
                               logging_name='follower2'),
}

The example here uses SQLite databases, so that it's quick and easy to actually run the script. However, as I'm not familiar with replication functionality in SQLite, we'll have to "fake" the part where "leader" replicates to "follower", just for the sake of example.

Routing

Next comes our implementation of get_bind(). We're building into a more open-ended space here, which can be a good or a bad thing, depending on where you're coming from. Instead of building two "router" classes with two db routing methods each, we just need one method with some conditionals:

class RoutingSession(Session):

    def get_bind(self, mapper=None, clause=None):
        # classes descending from OtherBase are destined for "other"
        if mapper and issubclass(mapper.class_, OtherBase):
            return engines['other']
        # writes - that is, anything within a flush - go to the leader
        elif self._flushing:
            return engines['leader']
        # all remaining reads are distributed randomly among the followers
        else:
            return engines[
                random.choice(['follower1', 'follower2'])
            ]

So above, we use a three-state conditional to make exactly those same choices the Django example does. When we want to detect the classes destined for the "other" engine, we look for a particular base class called OtherBase. We could just as easily do other kinds of checks here, such as checking for a particular attribute on the class.

We also look at a state variable called _flushing to determine operations destined for the leader. The Session._flushing flag is set as soon as the flush procedure begins, and remains on until the method completes. SQLAlchemy uses this flag mainly to prevent re-entrant calls to autoflush when it's already inside of the flush process.
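
As for those other kinds of checks, a variant that keys off a class-level attribute instead of inheritance might look like the following - __bind_key__ here is purely a convention of our own (the name is borrowed from Flask-SQLAlchemy), not something SQLAlchemy itself knows about:

class AttributeRoutingSession(Session):

    def get_bind(self, mapper=None, clause=None):
        # route on an optional class-level attribute, falling back to
        # the same leader/follower logic as before
        if mapper is not None:
            key = getattr(mapper.class_, '__bind_key__', None)
            if key is not None:
                return engines[key]
        if self._flushing:
            return engines['leader']
        return engines[random.choice(['follower1', 'follower2'])]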

That's all we need in the way of Session, for the moment. To wire this Session up into the standard scoped_session(sessionmaker(autocommit=True)) process, we can pass it into sessionmaker():

Session = scoped_session(sessionmaker(class_=RoutingSession, autocommit=True))

Note also that, depending on the replication system we have in place, using this approach may very well mean we can't use a transaction for our normal work. If we write some rows into the leader and don't commit our transaction, we won't see those rows if we try to SELECT from a follower, because the transaction is isolated from any other node until committed. Hence the session is set up with autocommit=True, which uses an ad-hoc transaction for each SELECT statement, and one for each flush operation.
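
To make that pitfall concrete, here's a sketch of what goes wrong inside one explicit transaction (this uses the Model1 class we'll define in the next section):

s = Session()
with s.begin():
    s.add(Model1(data='brand new'))
    s.flush()                 # INSERT runs on "leader", not yet committed
    s.query(Model1).all()     # SELECT runs on a follower - won't see the row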

Model Setup

Next, let's build up the "model". This is the part where we need to come up with an answer for Django's syncdb feature, which of course in SQLAlchemy is MetaData.create_all(). We now have two different schemas - one schema is shared between leader, follower1, and follower2, the other is destined for other. We'll split out our table metadata among these backends using two different MetaData() objects. To achieve that transparently, I'll use a trick I don't think people are quite aware of yet, which is to use abstract declarative bases.

Starting with a useful declarative base that will give us an id and a data column, as well as a decent __repr__():

class Base(object):
    id = Column(Integer, primary_key=True)
    data = Column(String(50))
    def __repr__(self):
        return "%s(id=%r, data=%r)" % (
            self.__class__.__name__,
            self.id, self.data
        )

Base = declarative_base(cls=Base)

We then split out Base into two subtypes, which won't be mapped. To tell declarative not to map these non-mixin, direct descendants of Base, we use a fairly new flag called __abstract__:

class DefaultBase(Base):
    __abstract__ = True
    metadata = MetaData()

class OtherBase(Base):
    __abstract__ = True
    metadata = MetaData()

and holy crap, look at that, a MetaData on each one! You can do that? Sure can - the classes we build on top of DefaultBase will put their Table objects into one MetaData, and the classes we build on top of OtherBase will put them into another. We've basically just replaced the .metadata attribute declarative sets up with our own - there's no magic. The DefaultBase and OtherBase classes are also not mapped and are transparent to the mapping mechanism.

Let's build out three "models" (I really prefer to say "class", let's see if I can get to the end of this post without complaining more about it...):

class Model1(DefaultBase):
    __tablename__ = 'model1'

class Model2(DefaultBase):
    __tablename__ = 'model2'

class Model3(OtherBase):
    __tablename__ = 'model3'

Bang. For those unfamiliar with declarative, because these objects ultimately descend from Base above, they will have an id and a data column available, where id is a surrogate primary key.
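
If you want to convince yourself where each Table object ended up, a quick sanity check - declarative places each class's Table into the MetaData of the abstract base it descends from:

assert Model1.__table__.metadata is DefaultBase.metadata
assert Model2.__table__.metadata is DefaultBase.metadata
assert Model3.__table__.metadata is OtherBase.metadata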

Table Creation

We use MetaData.create_all() here. Anything that's DefaultBase should have tables created in leader, follower1, follower2:

for eng in 'leader', 'follower1', 'follower2':
    DefaultBase.metadata.create_all(engines[eng])

OtherBase then goes to other:

OtherBase.metadata.create_all(engines['other'])
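
As a quick check of what landed where - Engine.table_names() works in this era of SQLAlchemy; newer versions would use the inspector instead:

print(engines['leader'].table_names())    # expect: model1, model2
print(engines['other'].table_names())     # expect: model3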

Usage

Now that the tables are built, we can show off rows getting sent to leader within an explicit transaction block:

s = Session()

with s.begin():
    s.add_all([
        Model1(data='m1_a'),
        Model2(data='m2_a'),
        Model1(data='m1_b'),
        Model2(data='m2_b'),
        Model3(data='m3_a'),
        Model3(data='m3_b'),
    ])

If we have SQL echoing on, we'd see transactions starting on each of leader and other, the requisite INSERT statements, then the two COMMIT calls. Note that the BEGIN for other doesn't occur until the unit of work actually comes across SQL destined for this engine:

INFO sqlalchemy.engine.base.Engine.leader BEGIN (implicit)
INFO sqlalchemy.engine.base.Engine.leader INSERT INTO model1 (data) VALUES (?)
INFO sqlalchemy.engine.base.Engine.leader ('m1_a',)
INFO sqlalchemy.engine.base.Engine.leader INSERT INTO model1 (data) VALUES (?)
INFO sqlalchemy.engine.base.Engine.leader ('m1_b',)
INFO sqlalchemy.engine.base.Engine.other BEGIN (implicit)
INFO sqlalchemy.engine.base.Engine.other INSERT INTO model3 (data) VALUES (?)
INFO sqlalchemy.engine.base.Engine.other ('m3_a',)
INFO sqlalchemy.engine.base.Engine.other INSERT INTO model3 (data) VALUES (?)
INFO sqlalchemy.engine.base.Engine.other ('m3_b',)
INFO sqlalchemy.engine.base.Engine.leader INSERT INTO model2 (data) VALUES (?)
INFO sqlalchemy.engine.base.Engine.leader ('m2_a',)
INFO sqlalchemy.engine.base.Engine.leader INSERT INTO model2 (data) VALUES (?)
INFO sqlalchemy.engine.base.Engine.leader ('m2_b',)
INFO sqlalchemy.engine.base.Engine.other COMMIT
INFO sqlalchemy.engine.base.Engine.leader COMMIT

Now let's query. Normally, if we had a PostgreSQL or MySQL replication environment set up, the data we write to leader would have been copied out to follower1 and follower2. So cover your eyes for a moment while we pretend:

##### PRETEND PRETEND PRETEND ######
# now let's pretend "leader" is replicating to "follower1", "follower2"
shutil.copy("leader.db", "follower1.db")
shutil.copy("leader.db", "follower2.db")
##### END PRETEND END PRETEND END PRETEND ######

(Note that SQLAlchemy as of 0.7 doesn't use a connection pool by default when it talks to SQLite - it opens a new connection to the file each time. As long as nobody's talking to the database, we can swap out the file.)

We can do some querying - SQL echoing is displayed inline here:

>>> print s.query(Model1).all()
INFO sqlalchemy.engine.base.Engine.follower2 BEGIN (implicit)
INFO sqlalchemy.engine.base.Engine.follower2 SELECT model1.id AS model1_id, model1.data AS model1_data
FROM model1
[Model1(id=1, data='m1_a'), Model1(id=2, data='m1_b')]

>>> print s.query(Model2).all()
INFO sqlalchemy.engine.base.Engine.follower1 BEGIN (implicit)
INFO sqlalchemy.engine.base.Engine.follower1 SELECT model2.id AS model2_id, model2.data AS model2_data
FROM model2
[Model2(id=1, data='m2_a'), Model2(id=2, data='m2_b')]

>>> print s.query(Model3).all()
INFO sqlalchemy.engine.base.Engine.other BEGIN (implicit)
INFO sqlalchemy.engine.base.Engine.other SELECT model3.id AS model3_id, model3.data AS model3_data
FROM model3
[Model3(id=1, data='m3_a'), Model3(id=2, data='m3_b')]

Manual Access

Django also includes a "manual" selection mode via the using() modifier on the QuerySet object. Let's illustrate a modified version of our RoutingSession with a similar method added. We add one more check in get_bind(), to look for a local attribute called _name - then we build a quick "generative" method that copies the state from one RoutingSession into another, so that the second session shares the same state:

class RoutingSession(Session):

    _name = None

    def get_bind(self, mapper=None, clause=None):
        # an explicit name set via using_bind() trumps everything
        if self._name:
            return engines[self._name]
        elif mapper and issubclass(mapper.class_, OtherBase):
            return engines['other']
        elif self._flushing:
            return engines['leader']
        else:
            return engines[
                random.choice(['follower1', 'follower2'])
            ]

    def using_bind(self, name):
        # copy this session's state onto a new one, with _name set
        s = RoutingSession()
        vars(s).update(vars(self))
        s._name = name
        return s

So assuming we plugged in the above Session, we can explicitly query the leader as:

m1 = Session().using_bind("leader").query(Model1).first()

Admittedly, the using_bind() method above might be something I should add some helpers for, like a @generative decorator similar to the one available for Query, so that the vars().update() approach is not needed.
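
Nothing like that exists in SQLAlchemy itself, but a minimal sketch of such a decorator, under the assumption that copying __dict__ is all the state transfer we need, might look like:

from functools import wraps

def generative(fn):
    # hypothetical helper: make a blank copy of the session, apply the
    # decorated method to the copy, and return the copy so calls can chain
    @wraps(fn)
    def decorate(self, *args, **kw):
        s = self.__class__.__new__(self.__class__)
        vars(s).update(vars(self))
        fn(s, *args, **kw)
        return s
    return decorate

class RoutingSession(Session):
    # ... get_bind() and _name as above ...

    @generative
    def using_bind(self, name):
        self._name = name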

I hope this example is useful, and sheds some light on how we do things in SQLAlchemy.

Download the example: sqlalchemy_multiple_dbs.py