Query
Session的query函数会返回一个Query对象。query函数可以接受多种参数类型。可以是类,或者是类的instrumented descriptor。下面的这个例子取出了所有的User记录。
>>> for instance in session.query(User).order_by(User.id):
... print(instance.name, instance.fullname)
ed Ed Jones
wendy Wendy Williams
mary Mary Contrary
fred Fred Flinstone
Query也接受ORM-instrumented descriptors作为参数。当多个参数传入时,返回结果为以同样顺序排列的tuples
>>> for name, fullname in session.query(User.name, User.fullname):
... print(name, fullname)
ed Ed Jones
wendy Wendy Williams
mary Mary Contrary
fred Fred Flinstone
Query返回的tuples由KeyedTuple这个类提供,其成员除了用下标访问意外,还可以视为实例变量来获取。对应的变量的名称与被查询的类变量名称一样,如下例:
>>> for row in session.query(User, User.name).all():
... print(row.User, row.name)
<User(name='ed', fullname='Ed Jones', password='f8s7ccs')> ed
<User(name='wendy', fullname='Wendy Williams', password='foobar')> wendy
<User(name='mary', fullname='Mary Contrary', password='xxg527')> mary
<User(name='fred', fullname='Fred Flinstone', password='blah')> fred
你可以通过label()来制定descriptor对应实例变量的名称
>>> for row in session.query(User.name.label('name_label')).all():
... print(row.name_label)
ed
wendy
mary
fred
而对于类参数而言,要实现同样的定制需要使用aliased
>>> from sqlalchemy.orm import aliased
>>> user_alias = aliased(User, name='user_alias')
SQL>>> for row in session.query(user_alias, user_alias.name).all():
... print(row.user_alias)
<User(name='ed', fullname='Ed Jones', password='f8s7ccs')>
<User(name='wendy', fullname='Wendy Williams', password='foobar')>
<User(name='mary', fullname='Mary Contrary', password='xxg527')>
<User(name='fred', fullname='Fred Flinstone', password='blah')>
基本的查询操作除了上面这些之外,还包括OFFSET和LIMIT,这个可以通过Python的array slice来完成。
>>> for u in session.query(User).order_by(User.id)[1:3]:
... print(u)
<User(name='wendy', fullname='Wendy Williams', password='foobar')>
<User(name='mary', fullname='Mary Contrary', password='xxg527')>
上述过程实际上只涉及了整体取出的操作,而没有进行筛选,筛选常用的函数是filter_by和filter。其中后者比起前者要更灵活一些,你可以在后者的参数中使用python的运算符。
>>> for name, in session.query(User.name).\
... filter_by(fullname='Ed Jones'):
... print(name)
ed
>>> for name, in session.query(User.name).\
... filter(User.fullname=='Ed Jones'):
... print(name)
ed
注意Query对象是generative的,这意味你可以把他们串接起来调用,如下:
>>> for user in session.query(User).\
... filter(User.name=='ed').\
... filter(User.fullname=='Ed Jones'):
... print(user)
<User(name='ed', fullname='Ed Jones', password='f8s7ccs')>
串接的filter之间是与的关系。
常用的filter操作符
下面的这些操作符可以应用在filter函数中
equals
query.filter(User.name == 'ed')
not equals:
query.filter(User.name != 'ed')
LIKE
query.filter(User.name.like('%ed%'))
IN
query.filter(User.name.in_(['ed', 'wendy', 'jack']))
# works with query objects too:
query.filter(User.name.in_(
session.query(User.name).filter(User.name.like('%ed%'))
))
NOT IN
query.filter(~User.name.in_(['ed', 'wendy', 'jack']))
query.filter(User.id.notin_(['ed', 'wendy', 'jack']))
IS NULL
query.filter(User.name == None)
# alternatively, if pep8/linters are a concern
query.filter(User.name.is_(None))
IS NOT NULL
query.filter(User.name != None)
# alternatively, if pep8/linters are a concern
query.filter(User.name.isnot(None))
AND
# use and_()
from sqlalchemy import and_
query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones'))
# or send multiple expressions to .filter()
query.filter(User.name == 'ed', User.fullname == 'Ed Jones')
# or chain multiple filter()/filter_by() calls
query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')
OR
from sqlalchemy import or_
query.filter(or_(User.name == 'ed', User.name == 'wendy'))
MATCH
query.filter(User.name.match('wendy'))
返回列表(List)和单项(Scalar)
很多Query的方法执行了SQL命令并返回了取出的数据库结果。
all()返回一个列表:
>>> query = session.query(User).filter(User.name.like('%ed')).order_by(User.id)
SQL>>> query.all()
[<User(name='ed', fullname='Ed Jones', password='f8s7ccs')>,
<User(name='fred', fullname='Fred Flinstone', password='blah')>]
first()返回至多一个结果,而且以单项形式,而不是只有一个元素的tuple形式返回这个结果.
>>> query.first()
<User(name='ed', fullname='Ed Jones', password='f8s7ccs')>
one()返回且仅返回一个查询结果。当结果的数量不足一个或者多于一个时会报错。
>>> user = query.one()
Traceback (most recent call last):
...
MultipleResultsFound: Multiple rows were found for one()
没有查找到结果时:
>>> user = query.filter(User.id == 99).one()
Traceback (most recent call last):
...
NoResultFound: No row was found for one()
one_or_none():从名称可以看出,当结果数量为0时返回None, 多于1个时报错
scalar()和one()类似,但是返回单项而不是tuple
嵌入使用SQL
你可以在Query中通过text()使用SQL语句。例如:
>>> from sqlalchemy import text
>>> for user in session.query(User).\
... filter(text("id<224")).\
... order_by(text("id")).all():
... print(user.name)
ed
wendy
mary
fred
除了上面这种直接将参数写进字符串的方式外,你还可以通过params()方法来传递参数
>>> session.query(User).filter(text("id<:value and name=:name")).\
... params(value=224, name='fred').order_by(User.id).one()
<User(name='fred', fullname='Fred Flinstone', password='blah')>
并且,你可以直接使用完整的SQL语句,但是要注意将表名和列明写正确。
>>> session.query(User).from_statement(
... text("SELECT * FROM users where name=:name")).\
... params(name='ed').all()
[<User(name='ed', fullname='Ed Jones', password='f8s7ccs')>]
计数
Query定义了一个很方便的计数函数count()
>>> session.query(User).filter(User.name.like('%ed')).count()
SELECT count(*) AS count_1
FROM (SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
users.password AS users_password
FROM users
WHERE users.name LIKE ?) AS anon_1
('%ed',)
2
注意上面我们同时列出了实际的SQL指令。在SQLAlchemy中,我们总是将被计数的查询打包成一个子查询,然后对这个子查询进行计数。即便是最简单的SELECT count(*) FROM table,也会如此处理。为了更精细的控制计数过程,我们可以采用func.count()这个函数。
>>> from sqlalchemy import func
SQL>>> session.query(func.count(User.name), User.name).group_by(User.name).all()
SELECT count(users.name) AS count_1, users.name AS users_name
FROM users GROUP BY users.name
()
[(1, u'ed'), (1, u'fred'), (1, u'mary'), (1, u'wendy')]
为了实现最简单的SELECT count(*) FROM table,我们可以如下调用
>>> session.query(func.count('*')).select_from(User).scalar()
SELECT count(?) AS count_1
FROM users
('*',)
4
如果我们对User的主键进行计数,那么select_from也可以省略。
>>> session.query(func.count(User.id)).scalar()
SELECT count(users.id) AS count_1
FROM users
()
4