支持多种数据库后端

发布时间:2025-06-24 18:50:57  作者:北方职教升学中心  阅读量:625


所以综上,scoped session通过封装context管理,让session的生命周期与当前上下文(线程、

fromsqlalchemy.orm importsessionmaker,scoped_sessionsession_factory =sessionmaker(bind=engine)Session =scoped_session(session_factory)defget_user():session =Session()returnsession.query(User).first()

这里的Session是一个scoped session类。
这种错误的常见原因包括:

  1. 应用程序进程崩溃或异常退出,没有正常关闭数据库连接。

    三、

    PostgreSQL 和 MySQL 自动提交区别

    PostgreSQL 的默认设置是自动提交关闭的,每次事务需要显式地通过 COMMIT 提交。

  2. 数据库连接泄露,占用了所有可用连接。
  3. 检查客户端应用程序代码,确保正确处理超时、
  4. db_session.execute() 直接执行语句并返回结果。关于SQLAlchemy自动提交事务
    • PostgreSQL 和 MySQL 自动提交区别
    • 查看PostgreSQL当前连接是否开启自动提交
  5. 五、
  6. 性能不同:
    • 对简单查询,db_session.execute()往往更快。
      MySQL的事务autocommit标志是session级的。
    • 在数据库端执行以下命令,手动终止该占用连接的事务:
SELECT pg_terminate_backend(pid)FROM pg_stat_activity WHERE state ='idle in transaction';

你执行这个语句后,处在 idle in transaction 状态的后端进程将被强制终止,相关的事务也会被回滚。

  • 复杂查询: SQLAlchemy提供丰富的查询方式,如过滤、

    SQLAlchemy 使用场景

    SQLAlchemy是一个强大的Python ORM框架,主要应用于以下场景:

    1. 数据库访问和操作:SQLAlchemy提供了高层抽象来操作数据库,可以避免写原生SQL语句。【重要】会话实例多线程环境使用
      • 单线程使用实例
      • 多线程实例:使用线程局部存储来管理会话threading.local() 方式
      • 多线程实例:使用scoped_session
      • [重要]选择scoped_session 方式还是threading.local() 方式
    2. 四、
    3. 大数据集查询:基于Pagination实现数据分页,避免大量数据查询内存溢出。

      是否可以直接查询PostgreSQL的autocommit状态,而不通过 default_transaction_isolation 这种间接的方式。我们可以在会话关闭后,使用这个查询加上过滤条件等再次执行,产生新结果。许多大型Python项目都选择使用SQLAlchemy作为ORM框架。

      终止后端进程后,还需要几个后续步骤:

      1. 再次执行同样的查询,检查是否还有处在 idle in transaction 状态的进程:
      SELECT * FROM pg_stat_activity WHERE state ='idle in transaction';
      1. 分析日志,找到未提交事务的根本原因,比如编程错误、
      2. 多数据库支持:支持Postgres、
      3. 不同的框架可以集成scoped session到自己的上下文。但是从版本 1.4 开始,这个参数就不再支持了。
      fromsqlalchemy importcreate_enginefromsqlalchemy.orm importscoped_session,sessionmakerengine =create_engine(SQLALCHEMY_DATABASE_URI)db_session =scoped_session(sessionmaker(autoflush=False,bind=engine))
      1. 使用sql 或者 数据模型

      这里先举例模型方式:
      在SQLAlchemy中,declarative_base()用来创建一个基类,这个基类会为所有继承它的子类提供declarative的ORM功能。什么是SQLAlchemy

      官网:https://www.sqlalchemy.org/

      SQLAlchemy是一个基于Python实现的SQL工具包和ORM框架,提供了高层抽象来管理数据库交互。SQLAlchemy使用

      • SQLAlchemy根据模型查询
      • SQLAlchemy SQL 格式化的方式
      • db_session.query和 db_session.execute区别
        • 实测demo 总结:让我们留意一下SQLAlchemy 的 lazy loading 特性
      • scoped session
        • 关于scoped_session自动提交事务
      • 参数autoflush=False 的工作机制
    4. 三、**因此当你调用db_session.execute(sql)时,会默认复用当前session的事务来执行,而这个session已经在一个未提交的事务中了。支持多种数据库后端。因为autocommit实际上不是一个配置参数,而是个概念,是由default_transaction_isolation参数决定的。
      所以,简单来说:
    • db_session.query() 是面向对象的查询方式,更灵活,但复杂查询可能有性能问题。【重要】会话实例多线程环境使用

      单线程使用实例

      importosfromdotenv importload_dotenvfromsqlalchemy importcreate_enginefromsqlalchemy.orm importsessionmakerfrompathlib importPathroot_path =Path(__file__).parent.parentdotenv_path =root_path /".env"load_dotenv(dotenv_path)DATABASE_URL =os.environ.get("DATABASE_URL")print(DATABASE_URL)classDatabaseUtils:_db_engine =None_session =None@classmethoddefget_db_engine(cls):ifcls._db_engine isNone:cls._db_engine =create_engine(DATABASE_URL)returncls._db_engine    @classmethoddefget_session(cls):ifcls._session isNone:Session =sessionmaker(bind=cls.get_db_engine())cls._session =Session()returncls._session

      这种session在多线程环境下存在问题,比如并发的执行2个sql,两个都使用 with DatabaseUtils.get_session() as session:方式,其中一个执行完毕,会自动关闭session,这时候另个线程就会报错:

      Method 'close()'can't be called here; method 'commit()' is already inprogress and this would cause an unexpected state change to <SessionTransactionState.CLOSED: 5>(Background on this error at: https://sqlalche.me/e/20/isce)

      因此,我们需要多线程方案。这个连接下的所有事务操作默认都是非自动提交的。

    • 设置自动提交模式会改变连接的默认提交行为。
      之所以我们常用 default_transaction_isolation 来判断,是因为这两个设置在PostgreSQL内部是耦合的:

      • read committed 表示关闭了autocommit
      • read uncommitted 表示开启了autocommit
        所以default_transaction_isolation等于read committed的时候,就可以确定autocommit是关闭的。比如网络闪断,客户端机器宕机等。
      • 网络连接异常中断。

        1. 使用参数绑定的方式,而不是%格式化:
        sql =text('SELECT id FROM :schema.table...')params ={'schema':'my_schema'}result =session.execute(sql,params)

        总结:

        1. sql文本内不要再使用% formatting
        2. 使用f-string或参数绑定格式化
        3. 创建text对象时直接构建完整sql文本

        参数绑定也可以提高安全性,避免SQL注入\

        db_session.query和 db_session.execute区别

        在SQLAlchemy中,db_session.query()db_session.execute()主要有以下几点区别:

        1. 返回值不同:
          • db_session.query() 返回一个Query对象,可以用于构建查询并最终获取结果。

          scoped session

          使用 db_session.query()时,为了避免访问 detached 实例,我们希望延迟关闭会话 db_session。

          • MySQL 的默认设置是自动提交开启的,每条语句会自动提交事务。资源、

            没有办法直接用SHOW或者SELECT方式获取autocommit的状态。其特点是:

            1. 每个线程或请求都会自动创建一个新的session实例,避免同一个session跨线程/请求使用。
            2. 这样就可以不需要我们手动管理session的生命周期。
            3. 合理配置数据库连接池,避免连接泄露。

              使用scoped_session,这个 session 实例绑定到线程/请求,在其线程结束时自动关闭.

              关于scoped_session自动提交事务

              scoped_session 的工作原理是:

              • 为每个线程或请求创建一个新的 session 实例
              • 这个 session 实例绑定到线程/请求,在其结束时自动关闭
              • 开发者只需要使用 Session 类即可,不需要手动关闭
                也就是说,每个线程/请求都会有一个独立的 session 实例,这个实例不会自动提交。

            SQLAlchemy中的session.commit()内部会自动调用session.flush()来保证所有pending的变更都被持久化。
            直接查询autocommit的参数在PostgreSQL中是不支持的。

            总结:
            方法1: 我们只调commmit,这里不调colse(),需要其他地方,只有在确定不用的情况下才调close。也就是说,一个会话内全部操作默认都是自动提交的。

            SQLAlchemy功能强大,可以省去很多手动管理数据库连接、

          您的问题提到了MySQL中的自动提交模式,这和PostgreSQL中的事务处理模式有些不同。

          导致这个错误的典型代码如:

          engine =create_engine(URL,autocommit=True)

          要修复这个错误,需要移除 autocommit 参数,改为手动管理事务:

          1. 去掉 autocommit=True
          engine =create_engine(URL)
          1. 在代码中手动提交事务:
          withengine.begin()asconn:conn.execute(...)conn.commit()
          1. 或者开启自动提交模式:
          connection =engine.connect()connection.autocommit =True

          总之,Engine 对象不再支持 autocommit 参数。关于SQLAlchemy自动提交事务

          背景:
          工作发现使用,出现很多 sqlalchemy出现很多未提交事务 情况。事务提交等情况。

        2. 对复杂查询,db_session.query()可以通过ORM特性进行优化。

          文章目录

          • python常用库之数据库orm框架之SQLAlchemy
            • 一、访问实例属性之前,先显式提交事务:
        db_session.query(...)db_session.commit()# 提交事务obj.some_attr # 访问属性

        这可以先释放锁,同时实例也绑定到会话中。

        总结:PostgreSQL的autocommit属性是针对每个事务的,而不是整个会话。
        MySQL中的自动提交(autocommit)是作用于整个数据库连接会话的,主要体现在:

        1. 新建立的连接默认是自动提交模式(autocommit=on)。如果连接是非自动提交的,那么这个事务也是非自动提交的。objs =db_session.execute(sql,{"userid":userid}).fetchall()print(objs)

          **每个session默认都是在一个事务中,不会自动提交。

          SQLAlchemy 的设计是为了最大程度兼容不同数据库的行为,所以从 1.4 版本开始采用关闭自动提交作为默认值,这更符合 PostgreSQL 和一些其他数据库的行为。

          1. 使用scoped session,可以避免手动 close db_session。
          2. 但执行查询并获取实例,实际上已经加载了具体的数据,所以会依赖会话提供的数据状态。
          3. 异步查询:基于Greenlet等实现异步查询,提高查询效率。

          这与 SQLAlchemy 的 lazy loading 特性有关 - 查询只有在需要时才执行和加载实例。推荐的方式还是手动提交控制事务。

        2. db_session.execute() 只能执行简单的SQL语句查询。工作遇到的问题

          1. pg报错:unexpected EOF on client connection

          在 PostgreSQL 中,“unexpected EOF on client connection” 这个错误通常表示客户端应用程序异常中断了与数据库的连接,导致有一个未完成的打开事务状态。
          如果在获取到 user 对象后立即关闭会话,然后访问 user.name,会发生 detached实例错误。

        3. 客户端没有正确处理查询超时或者服务器重启的情况。

      需要特别注意的点:使用 db_session.query() 后获取的 ORM 对象实例,在访问其属性之前,需要确保与该查询关联的 db_session 没有关闭或失效。
      注:pg_terminate_backend() 函数的返回值 ‘t’ 表示终止后端成功。Oracle等主流数据库。invalidate该session。

  • 这会针对当前线程的 session 实例开启自动提交。只是在commit之外的其他情况下,需要手动调用flush。
  • 连接的自动提交模式作用于该连接下执行的所有事务。
  • 查询方式不同:
    • db_session.query() 通过ORM构建查询。

    四、这就要求我们必须显式地提交或回滚事务来结束一个事务。
    TblObject = declarative_base() 的作用是:
    3. 创建了一个名为TblObject的基类
    4. 这个TblObject具有declarative的功能
    5. 后续定义的模型类可以继承这个TblObject基类来使用declarative

    fromsqlalchemy.orm importdeclarative_basefromsqlalchemy importColumn,Text,Integer,String,VARCHAR,TIMESTAMP,BOOLEAN,Float,textTblObject =declarative_base()classUser(TblObject):__tablename__ ='table_user'__table_args__ =({"schema":"public"})id=Column(Integer,primary_key=True)name =Column(String)query =db_session.query(User)user =query.first()print(user.name)

    SQLAlchemy SQL 格式化的方式

    1. 将sql语句本身不包含参数格式化,直接作为文本构建完整的sql:
      要想实现变量替换,需要使用f-string的格式
    myschema ='xxxx'sql =text(f'SELECT id FROM {myschema }.table...')

    注意看字符串前面有个f。

    这与直接调用begin()开始一个新事务其实效果是一样的。

  • 事务控制: 通过Session管理数据库会话和事务。分组、
    在会话关闭后,这两种情况的行为不同:

    1. 对于仅构造查询的 db_session.query(User),由于没有结果产生,所以不会有 detached 实例的问题。
    2. db_session.execute() 直接返回行数据组成的列表。
    3. 由于编程错误或者资源问题,客户端忘记提交/回滚一个长时间运行的事务。
    4. db_session.execute() 通过原生SQL语句查询。
    5. 五、当开始一个新的事务时,这个事务继承了连接的非自动提交属性。

      autoflush=False需要手动flush,但不影响commit的自动flush行为。

    6. 工具集成:如数据迁移工具Alembic,可以实现Schema版本控制和迁移。
    7. 使用完session后不需要关闭它,scoped session会在当前上下文退出后自动关闭、需要通过上述方式自行控制事务提交。

      autoflush=False的情况下:

      • 正常的增删改操作不会自动flush
      • 调用commit会触发强制flush以持久化变更
        这是SQLAlchemy的一种保护机制,来确保在事务提交时不会丢失还未flush的变更。但这样就无法及时提交事务,可能会导致锁表问题。

      • 如果你的应用是独立的后台任务处理程序,或者需要更细粒度的线程隔离,那么使用 threading.local() 可能更合适。

      • 开始一个新的事务时,这个事务会继承连接当前的autocommit状态。什么是SQLAlchemy
        • SQLAlchemy 使用场景
      • 二、工作遇到的问题
        • 1. pg报错:unexpected EOF on client connection
        • 2. 报错:sqlalchemy.exc.ArgumentError: autocommit=True is no longer supported