Some Notes About SQLAlchemy

近期在收集处理大量的数据,通常是把 XML/文本格式的数据转为关系型,结构化的的数据,能够方便导出直接进行分析,尤其是导出能直接被 pandas 分析的格式(csv/json)。因此需要使用一个在 Python 下的 mysql 处理库,既然找了处理库,也干脆一步到位,直接选一个支持 ORM 的库。于是,就选择了 SQLAlchemy。用了一段时间后,写个记录来记下这段时间用到的常用的东西。

SQLAlchemy 是 Python 编程语言下的一款开源软件。提供了 SQL 工具包及对象关系映射(ORM)工具,主要能满足一下的需求:

  • 提供对数据库的常用操作:增删改查
  • 提供 ORM 功能,可以让操作的单元变为对象,而不用写 sql 语句进行硬编码/解码解析数据

数据库连接

SQLalchemy 不能支持操作数据库,因而需要安装额外的数据库驱动,对于不同的数据库与驱动,有着不一样的配置 URI,总体格式为dbms://user:pwd@host/dbname。这里用的 Mysql+mysqlconnector,具体的连接代码如下

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

sql_connection = 'mysql+mysqlconnector://root:pwd@localhost:3306/database'
engine = create_engine(sql_connection)

# 执行 sql 事务需要使用会话
DBsession = sessionmaker(bind=engine)
session = DBsession()

# 结束前需要关闭 session
session.close()

结构定义

在 SQLalchemy 中,ORM 通过定义对象进行数据库 model 的绑定。


from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relation, sessionmaker

Base = declarative_base()
 
class Movie(Base):
    __tablename__ = 'movies'
 
    id = Column(Integer, primary_key=True)
    title = Column(String(255), nullable=False)
    year = Column(Integer)
    directed_by = Column(Integer)

 
    def __init__(self, title=None, year=None):
        self.title = title
        self.year = year
  • ORM 的 model 类通过继承 declarative_base 进行定义,通过__tablename__绑定到对应的 table
  • 每一个 Column 属性为表中的一列,需要设置类型,类型必须要与数据库中列的类型对应,常用类型有 Integer,Float, String(对应 varchar),Text
  • 必须设置一个属性/列为primary_key
  • 可以通过Base.metadata.create_all(engine)创建 table

数据操作

数据使用 ORM 进行操作,操作的基本单位为对象(也可以直接使用 sql 语句,但对于常用操作来说没有必要)。除查询外,对数据库有修改的操作都需要 commit 事务

插入

m1 = Movie("Star Trek", 2009)
session.add(m1)
session.commit()
  1. 创建待插入的对象,添加相应的数据属性
  2. 添加进 session 中
  3. 提交 session,提交事务

查询

movies = session.query(Movie).all() # 获取所有数据
movie = session.query(Movie).get(movie_id) # 获取单个记录
filter_movie = session.query(Movie).filter(Movie.id == 1).one()
  • 查询中 query 的参数为查询的 table/返回的对象
  • filter 等于 where 语句,可以对查询结果进行进一步筛选,one 返回唯一行,all 则返回所有行

更新

查询出来的数据,修改属性后重新提交事务即可。

movie = session.query(Movie).get(id)
movie.year = 1999
session.commit()

删除

movie = session.query(Movie).get(id)
session.delete()
session.commit()

some problems

大批量数据插入

在使用的过程中,遇到一次需要插入三十多万条数据到数据库的情况。在此之前,我一直都是插完数据再一起 commit 的(几千/几百条),因为每次 commit 需要写入数据库,会比较慢(磁盘 IO 制约)。 但这次数量过多的话,提交的时候出现错误,提示 mysql 的连接丢失。估计是写入的时间太多,时间太长超出了其限制,进而断开了连接。于是我选择批量 commit,在添加一定数量数据后提交一次(5000/1000/500/100/10),但是没有成功,插入一定量的记录后,提示某一条插入的记录的某一列过长(尽管我那一列是不限长度的 Text),显然有问题。 上网搜索批量数据插入的方法,但内容基本都是千篇一律,全部都是转载/翻译 StackOverflow 上的一篇 回答。给出了两种方法

  • bulk_save_objects(objects)
  • add_all()

都没有效果,最终还是采取了逐条添加并提交的方法,虽然慢了点,但是能 work。

写这件事只是为了吐槽一下现在用 Google 搜索一些具体的技术问题是,内容严重同质化的现象,大家都抄来抄去,转来转去。像这一次的搜索,最开始的 source 是在 StackOverflow 上的提问,然后有网站翻译成了中文并发布(甚至初创,然后就流传于各个中文网站了,一搜,全是这些内容相同的网页,属实恶心。