当前位置: 亚洲城ca88 > ca88 > 正文

开垦进级Flask的底蕴,使用教程

时间:2019-05-18 00:55来源:ca88
​ Web程序开发中最重要的莫过于关系型数据库,即SQL数据库,另外文档数据库(如 mongodb)、键值对数据库(如redis)慢慢变得流行. 内容概要: Flask 是一个 python web micro framework。所谓微框

​ Web程序开发中最重要的莫过于关系型数据库,即SQL 数据库,另外文档数据库(如 mongodb)、键值对数据库(如 redis)慢慢变得流行.

内容概要:

Flask 是一个 python web micro framework。所谓微框架,主要是 flask 简洁与轻巧,自定义程度高。相比 django 更加轻量级。
之前一直折腾 django,得益于django 的 ORM 模式很好用,上手简单,使用方便。Flask里面没有原生的 orm,需要用到第三方的库,大名顶顶的 SQLALchemy正是一类 实现ORM的库。
下面简单介绍一下,Flask中如何使用sqlchemy (注意,这个和 flask-sqlalchemy 的使用还是有差别的,至于 flask-sqlalchemy 用法下一篇博客介绍。)。参考官方文档在 Flask 中使用 SQLAlchemy

原因 : 我们不直接使用这些数据库引擎提供的 Python 包,而是使用对象关系映射(Object-Relational Mapper, ORM)框架,是因为它将低层的数据库操作指令抽象成高层的面向对象操作。也就是说,如果我们直接使用数据库引擎,我们就要写 SQL 操作语句,但是,如果我们使用了 ORM 框架,我们对诸如表、文档此类的数据库实体就可以简化成对 Python 对象的操作。

  1. SQLAlchemy
  2. flsak-sqlalchemy
  3. flask-script
  4. flask-migrate
  5. Flask的目录结构

一 安装 SQLAlchemy 扩展
在 shell 命令下输入

(1) Flask - SQLAlchemy

Flask使用的ORM框架为 SQLAlchemy,数据库采用了URL指定,下面我们列举几种数据库引擎:

数据库引擎 URL指定
MySQL mysql://username:password@hostname/database
Postgres postgresql://username:password@hostname/database
SQLite (Unix) sqlite:////absolute/path/to/database
SQLite (Windows) sqlite:///c:/absolute/path/to/database

注意:

  1. username 和 password 表示登录数据库的用户名和密码
  2. hostname 表示 SQL 服务所在的主机,可以是本地主机(localhost)也可以是远程服务器
  3. database 表示要使用的数据库 , SQLite 数据库不需要使用服务器,它使用硬盘上的文件名作为 database

ORM使用的优点:

  1. 增加少sql的重复使用率
  2. 使表更加的可读性
  3. 可移植性

一、SQLAlchemy

pip install sqlchemy

(2) SQLAlchemy操作sql原生

安装操作数据库的模块

pip3 install pymysql

安装 flask-sqlalchemy

sudo pip3 install flask-sqlalchemy

配置路径

DB_URI = 'mysql pymysql://root:password@host:port/database'

下面先看下sqlalchemy操作的写法:

from sqlalchemy import create_engine

HOST = '127.0.0.1'
USERNAME = 'root'
PASSWORD = '123456'
DATABASE = 'demo'  #数据库名
PORT = 3306
DB_URI = 'mysql pymysql://{}:{}@{}:{}/{}'.format(USERNAME,PASSWORD,HOST,PORT,DATABASE)
#创建引擎
engine = create_engine(DB_URI)

with engine.connect() as db:
    data = db.execute('select * from user') #从user表中获取全部数据
    db.execute('delete from user where id=1')  #删除id=1的数据

1、概述

SQLAlchemy是一个ORM的框架,ORM就是关系对象映射,具体可以参照Django中的ORM。

作用:帮助我们使用类和对象快速实现数据库操作

数据库:

  -原生:MYSQLdb pymysql

  区别就是 MYSQLdb 不支持python3 pymysql 都支持

ORM框架

  SQLAlchemy

检测是否安装成功:

(3) 设计数据表

2、SQLAlchemy用法  

1、安装

pip3 install sqlalchemy

2、配置

图片 1图片 2

from sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Columnfrom sqlalchemy import Integer,String,Text,Date,DateTimefrom sqlalchemy import create_engineBase = declarative_base()class Users:    __tablename__ = 'users'    id = Column(Integer, primary_key=True)    name = Column(String, index=True, nullable=False)def create_all():    engine = create_engine(        "mysql pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",        max_overflow=0,  # 超过连接池大小外最多创建的连接        pool_size=5,  # 连接池大小        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收    )    Base.metadata.create_alldef drop_all():    engine = create_engine(        "mysql pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",        max_overflow=0,  # 超过连接池大小外最多创建的连接        pool_size=5,  # 连接池大小        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收    )    Base.metadata.drop_allif __name__ == '__main__':    create_all()

配置

3、增删改查的演示

图片 3图片 4

from duoduo.test import Usersfrom sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_engineengine = create_engine(        "mysql pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",        max_overflow=0,  # 超过连接池大小外最多创建的连接        pool_size=5,  # 连接池大小        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收    )SessionFactory = sessionmaker(bind=engine)#根据Users类对user表进行增删改查session=SessionFactory()#增加#单个# obj=Users(name='many qian')# session.add# session.commit()#多个# session.add_all([#         Users(name='大娃'),#         Users(name='二娃')# ])# session.commit()#查  所有# tes=session.query.all()# print #拿到的对象# for row in tes:#         print(row.id,row.name)# tes=session.query.filter(Users.id==3) #> < >= <=## for row in tes:#     print(row.id,row.name)## tes=session.query.filter(Users.id<=3).first()#> < >= <=## print(tes.id,tes.name)#删除# tes=session.query.filter(Users.id==3).delete() #> < >= <=# session.commit()#改# session.query.filter(Users.id ==4).update({Users.name:'三娃'})# session.query.filter(Users.id ==4).update({'name':'四娃'})# session.query.filter(Users.id ==4).update({'name':Users.name 'NICE'},synchronize_session=False)# session.commit()### session.close()

增删改查

4、单表常用操作

图片 5图片 6

# ############################## 其他常用 ################################ 1. 指定列# select id,name as cname from users;# result = session.query(Users.id,Users.name.label.all()# for item in result:#         print(item[0],item.id,item.cname)# 2. 默认条件and# session.query.filter(Users.id > 1, Users.name == 'duoduo').all()# 3. between# session.query.filter(Users.id.between, Users.name == 'duoduo').all()# 4. in# session.query.filter(Users.id.in_.all()# session.query.filter(~Users.id.in_.all()# 5. 子查询# session.query.filter(Users.id.in_(session.query.filter(Users.name=='duoduo'))).all()# 6. and 和 or# from sqlalchemy import and_, or_# session.query.filter(Users.id > 3, Users.name == 'duoduo').all()# session.query.filter(and_(Users.id > 3, Users.name == 'duoduo')).all()# session.query.filter(or_(Users.id < 2, Users.name == 'duoduo')).all()# session.query.filter(#     or_(#         Users.id < 2,#         and_(Users.name == 'duoduo', Users.id > 3),#         Users.extra != ""#     )).all()# 7. filter_by# session.query.filter_by(name='duoduo').all()# 8. 通配符# ret = session.query.filter(Users.name.like.all()   #%任何的东西# ret = session.query.filter(~Users.name.like.all()   #_ 只有一个字符# 9. 切片# result = session.query[1:2]# 10.排序# ret = session.query.order_by(Users.name.desc# ret = session.query.order_by(Users.name.desc(), Users.id.asc  #desc从大到小  asc从小到大排# 11. group byfrom sqlalchemy.sql import func# ret = session.query(#         Users.depart_id,#         func.count,# ).group_by(Users.depart_id).all()# for item in ret:#         print## from sqlalchemy.sql import func#进行二次筛选,只能用having# ret = session.query(#         Users.depart_id,#         func.count,# ).group_by(Users.depart_id).having(func.count >= 2).all()# for item in ret:#         print# 12.union 去重 和 union all 上下拼接不去重"""select id,name from usersUNIONselect id,name from users;"""# q1 = session.query(Users.name).filter(Users.id > 2)# q2 = session.query(Users.name).filter(Users.id > 2)# ret = q1.union## q1 = session.query(Users.name).filter(Users.id > 2)# q2 = session.query(Users.name).filter(Users.id > 2)# ret = q1.union_allsession.close()

单表常用操作

5、连表常用操作

在上面的配置里,重新搞两给表

from sqlalchemy import ForeignKeyfrom sqlalchemy.orm import relationship#按照上面配置的代码加上这些东西,配置的表删除 class Depart:    __tablename__ = 'depart'    id = Column(Integer, primary_key=True)    title = Column(String, index=True, nullable=False)class Users:    __tablename__ = 'users'    id = Column(Integer, primary_key=True)    name = Column(String, index=True, nullable=False)    depart_id = Column(Integer,ForeignKey("depart.id"))    #跟表结构无关    dp = relationship("Depart", backref='pers')

下面是实例演示:之前自己添加一点数据:

图片 7图片 8

#django的manytomany  在flask中两个ForeignKey完成from duoduo.test import Users,Depart,Student,Course,Student2Coursefrom sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_engineengine = create_engine(        "mysql pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",        max_overflow=0,  # 超过连接池大小外最多创建的连接        pool_size=5,  # 连接池大小        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收    )SessionFactory = sessionmaker(bind=engine)#根据Users类对user表进行增删改查session=SessionFactory()# #1、查询所有用户# # ret =session.query.all()# ## # for i in ret:# #     print(i.id,i.name,i.depart_id)#2、查询所有用户,所属部门名称# ret=session.query(Users.id,Users.name,Depart.title).join(Depart,User.depart_id==Depart.id).all()#  #这里有默认ForeignKey ,User.depart_id==Depart.id 可以不加也行# for i in ret:#     print(i.id ,i.name,i.title)#SELECT users.id AS users_id, users.name AS users_name, depart.title AS depart_title#FROM users INNER JOIN depart ON depart.id = users.depart_id# q=session.query(Users.id,Users.name,Depart.title).join# print#3、relation字段:查询所有用户 所有部门名称# ret=session.query.all()# for row in ret:#     print(row.id,row.name,row.depart_id,row.dp.title)#4、relation字段:查询销售部所有人员# obj=session.query.filter(Depart.title =='大娃').first()# # for i in obj.pers:# #     print(i.id,i.name,obj.title)#5、创建一个名称叫:IT部门,再在该部门中添加一个员工叫:多多#方式一:# d1=Depart(title='IT')# session.add# session.commit()## u1=Users(name='duoduo',depart_id=d1.id)# session.add# session.commit()#方式二:# u1=Users(name='多多1',dp=Depart(title='IT'))# session.add# session.commit()#6、创建一个部门叫王者荣耀,这个部门添加多个员工:亚瑟,后裔,貂蝉# d1=Depart(title='王者荣耀')# d1.pers=[Users(name='亚瑟'),Users(name='后裔'),Users(name='貂蝉')]## session.add# session.commit()#1、录入数据# session.add_all([#     Student(name='大娃'),#     Student(name='二娃'),#     Course(title='物理'),#     Course(title='化学'),## ])# session.add_all([#     Student2Course(student_id=2,course_id=1),#     # Student2Course(student_id=1,course_id=2)# ]# )#2、查每个人选了课程的名称,三张表进行关联# ret=session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).order_by(Student2Course.id.asc# for i in ret:#     print#3、'大娃'所选的所有课# ret=session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).filter(Student.name=='大娃').order_by(Student2Course.id.asc## for i in ret:#     print# obj=session.query.filter(Student.name=='大娃').first()# for item in obj.course_list:#     print(item.title)#4选了'化学'课程的所有人的名字# obj=session.query.filter(Course.title=='化学').first()# for item in obj.student_list:#     print(item.name)#创建一个课程,创建2个学,两个学生选新创建的课程# obj=Course(title='体育')# obj.student_list=[Student(name='五娃'),Student(name='六娃')]## session.add# session.commit()# session.close()

ForeignKey图片 9图片 10

class Student:    __tablename__ = 'student'    id = Column(Integer, primary_key=True)    name = Column(String, index=True, nullable=False)    course_list = relationship('Course', secondary='student2course', backref='student_list')class Course:    __tablename__ = 'course'    id = Column(Integer, primary_key=True)    title = Column(String, index=True, nullable=False)class Student2Course:    __tablename__ = 'student2course'    id = Column(Integer, primary_key=True, autoincrement=True)    student_id = Column(Integer, ForeignKey('student.id'))    course_id = Column(Integer, ForeignKey('course.id'))    __table_args__ = (        UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 联合唯一索引        # Index('ix_id_name', 'name', 'extra'),        # 联合索引    )

后面新建的三个表

连接的方式:

from duoduo.test import Users,Depart,Student,Course,Student2Coursefrom sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_engineengine = create_engine(        "mysql pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",        max_overflow=0,  # 超过连接池大小外最多创建的连接        pool_size=5,  # 连接池大小        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收    )SessionFactory = sessionmaker(bind=engine)#连接#第一种方式# #并发# def task():#     #去连接池获取一个连接#     session=SessionFactory()#     ret=session.query.all()#     print#     #将连接交还给连接池#     session.close()### from threading import Thread## for i in range:#     t=Thread(target=task)#     t.start()##第二种方式from sqlalchemy.orm import scoped_sessionsession=scoped_session(SessionFactory)def task():    ret=session.query.all()    print    session.remove()  #连接断开from threading import Threadfor i in range(20):    t=Thread(target=task)    t.start()

执行原生SQ:

from sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_enginefrom sqlalchemy.orm import scoped_sessionfrom models import Student,Course,Student2Courseengine = create_engine(        "mysql pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",        max_overflow=0,  # 超过连接池大小外最多创建的连接        pool_size=5,  # 连接池大小        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收    )SessionFactory = sessionmaker(bind=engine)session = scoped_session(SessionFactory)def task():    """"""    # 方式一:    """    # 查询    # cursor = session.execute('select * from users')    # result = cursor.fetchall()    # 添加    cursor = session.execute('INSERT INTO users VALUES', params={"value": 'duoduo'})    session.commit()    print(cursor.lastrowid)    """    # 方式二:    """    # conn = engine.raw_connection()    # cursor = conn.cursor()    # cursor.execute(    #     "select * from t1"    # )    # result = cursor.fetchall()    # cursor.close()    # conn.close()    """    # 将连接交还给连接池    session.remove()from threading import Threadfor i in range(20):    t = Thread(target=task)    t.start()
import sqlchemy
sqlchemy.__version__ # 0.10.1

1 字段类型

类型名 python中的类型 说明
Integer int 存储整形 32位
SmallInteger int 小整形 16为
BigInteger int 大整形
Float float 浮点数
String str 字符串 varchar
Text str 长文本
Boolean bool bool值
Date datetimedate 日期
Time datetime.time 时间
datetime datetime.datetime 时间日期

二、Flask的第三方组件

如果没有报错,则安装成功

2 可选条件

选项 说明
primary_key 主键, 如果设为True,表示主键
unique 唯一索引 ,如果设为True,这列唯一
index 常规索引, 如果设为True,创建索引,提升查询效率
nullable 是否可以为null 默认True
default 默认值

1、flask-sqlalchemy

a、先下载安装

pip3 install flask-sqlalchemy

b、项目下的__init__.py导入

#第一步  :导入并实例化SQLALchemyfrom flask_sqlalchemy import SQLAlchemydb=SQLAlchemy()#一定要在蓝图导入的上面,是全局变量
#也要导入表的models.py的所有表

c、初始化

db.init_app  #在注册蓝图的地方的下面

d、在配置文件中写入配置

# ##### SQLALchemy配置文件 #####    SQLALCHEMY_DATABASE_URI = "mysql pymysql://root:123@127.0.0.1:3306/duoduo?charset=utf8"    SQLALCHEMY_POOL_SIZE = 10    SQLALCHEMY_MAX_OVERFLOW = 5

e、创建models.py中的类

from sqlalchemy import Columnfrom sqlalchemy import Integer,String,Text,Date,DateTimefrom  import dbclass Users:  #继承的db.Model    __tablename__ = 'users'    id = Column(Integer, primary_key=True)    name = Column(String, index=True, nullable=False)    # depart_id = Column

f、生成表

from  import db,create_appapp = create_app()app_ctx = app.app_context() # app_ctx = app/gwith app_ctx: # __enter__,通过LocalStack放入Local中    db.create_all() # 调用LocalStack放入Local中获取app,再去app中获取配置

补充一个知识点:

class  Foo:    def __enter__:        print('进入')    def __exit__(self, exc_type, exc_val, exc_tb):        print('出来')obj =Foo()with obj:  #with Foo()    print('多多')#结果 #进入#多多#出来

g、基于ORM对数据库进行操作

from flask import Blueprintfrom  import dbfrom  import modelsus = Blueprint('us',__name__)@us.route('/index')def index():    # 使用SQLAlchemy在数据库中插入一条数据    # db.session.add(models.Users(name='多多',depart_id=1))     #插入数据    # db.session.commit()    # db.session.remove()    result = db.session.query(models.Users).all()    print    db.session.remove()    return 'Index'

这里面db.session.add的源码:

图片 11

图片 12

二 显式调用
所谓显示调用,就是免去操作 sqlchemy 和 python class 的映射操作,而是由代码直接完成。关于 sqlchemy的详细使用,再另外一篇博客介绍。移步 sqlchemy的使用教程。
新建一个 flask项目。项目结构如下:

(4)在flask中使用ORM模型

下面我们使用ORM模型

from flask import Flask
from flask_script import Manager
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql pymysql://root:20111673@127.0.0.1:3306/demo'
db = SQLAlchemy(app)  #

manager = Manager(app)

#创建User用户,表名为user
class User(db.Model):
    __table__name = 'user'
    id = db.Column(db.Integer,primary_key=True)
    username = db.Column(db.String(20),index=True)
    sex = db.Column(db.Boolean,default=True)
    info = db.Column(db.String(50))

# 定义一个视图函数
@app.route('/create')
def create():
    # db.drop_all()  #删除仅为模型表
    db.create_all()  #创建模型表
    return '创建成功'

if __name__ == '__main__':
    manager.run()

2、 flask-script

下载安装

pip3 install flask-script

功能:

  a、增加runsever

from  import create_appfrom flask_script import Managerapp = create_app()manager = Managerif __name__ == '__main__':    # app.run()    manager.run()

  b、位置传参

from  import create_appfrom flask_script import Managerapp = create_app()manager = Manager@manager.commanddef custom:    """    自定义命令    python manage.py custom 123    :param arg:    :return:    """    printif __name__ == '__main__':    # app.run()    manager.run()

  c、关键字传参  

from flask_script import Managerfrom  import create_appapp = create_app()manager = Manager@manager.option('-n', '--name', dest='name')@manager.option('-u', '--url', dest='url')def cmd(name, url):    """    自定义命令    执行: python manage.py  cmd -n qianduoduo -u http://www.baidu.com    :param name:    :param url:    :return:    """    print(name, url)if __name__ == '__main__':    # app.run()    manager.run()
(env)ghost@ghost-H61M-S2V-B3:~/project/flask/fsqlauto$ tree
.
├── app.py
├── database.py
├── models.py
├── static
└── templates

2 directories, 3 files

(5)增加数据

添加数据方式1

#方式1
# sqlalchemy默认开启了事务处理
@app.route('/insert/')
def insert():
    try:
        u = User(username='WANGWU',info='personal WANGWU message')
        db.session.add(u)  #添加数据对象
        db.session.commit()  #事务提交
    except:
        db.session.rollback()#事务回滚
    return '添加单条数据!'

@app.route('/insertMany/')
def insertMany():
    u1 = User(username='name1',info='personal name1 message')
    u2 = User(username='name2',info='personal name2 message')
    db.session.add_all([u1,u2]) #以add_all(数据对象列表)
    db.session.commit() #
    return '添加多条数据!'

添加数据方式2

#方式2
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True  #在app设置里开启自动提交
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False  #关闭数据追踪,避免内存资源浪费

@app.route('/insertMany/')
def insertMany():
    u1 = User(username='name1',info='personal name1 message')
    u2 = User(username='name2',info='personal name2 message')
    db.session.add_all([u1,u2])
    return '提交多条数据'

3、flask-migrate

安装

pip3 install flask-migrate

配置是依赖flask-script

from flask_script import Managerfrom flask_migrate import Migrate, MigrateCommandfrom  import create_appfrom  import dbapp = create_app()manager = ManagerMigrate"""# 数据库迁移命名    python manage.py db init   #只执行一次    python manage.py db migrate # makemirations    python manage.py db upgrade # migrate"""manager.add_command('db', MigrateCommand)if __name__ == '__main__':    # app.run()    manager.run()

我们先定义 数据库连接,操作 database.py

(6)更新与删除

# 类名.query  返回对应的查询集
# 类名.query.get(查询条件)  返回对应的查询对象
@app.route('/update/')
def update():
    u = User.query.get(1)
    u.username = 'update name'  #更新内容
    db.session.add(u)   #进行添加
    return 'update'

# 删除数据
@app.route('/delete/')
def delete():
    u = User.query.get(2)  #找到对应的查询集对象
    db.session.delete(u)  # 删除对应的u对象
    return 'delete id=2'

补充工具:(协同开发的保证开发的环境一致性)

1、pipreqs

找到项目使用的所有组件的版本

pip3 install  pipreqs

在项目中的命令行

pipreqs ./ --encoding=utf-8

在项目中找了文件requirements.txt

如何装那些模块?

pip install 的时候可以指定一个文件,他会自己读每一行,然后安装

pycharm 打开文件的时候也会提示下载

2、虚拟环境 (开发环境版本不能共存的)

安装

pip3 install virtualenv

创建虚拟环境

#找一个存放虚拟环境的文件夹virtualenv env1 --no-site-packages  创建环境activate  #激活环境deactivate # 退出环境#也可以用pycharm鼠标点点就好了,最新的版本就有虚拟环境goon功能#当我们需要特定的环境,可以在电脑上有多高开发环境
# -*- coding: utf-8 -*-

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base

engine = create_engine('sqlite:///./test.db', convert_unicode=True) # 创建数据库引擎( 当前目录下保存数据库文件) 
db_session = scoped_session(sessionmaker(autocommit=False,
                                         autoflush=False,
                                         bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()

def init_db():
    # 在这里导入所有的可能与定义模型有关的模块,这样他们才会合适地
    # 在 metadata 中注册。否则,您将不得不在第一次执行 init_db() 时
    # 先导入他们。
    import models
    Base.metadata.create_all(bind=engine)

(7) 拆分MVT

目录结构

project/
    manage.py  #启动项存放
    ext.py  #作为当前sqlalchemy扩展
    settings.py  #配置存放
    app/
        __init__.py
        models.py  #应用models.py
        views.py   #应用视图views.py
    templates/     #模板目录
    static/  #静态文件目录

ext.py SQLAlchemy扩展

from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()   #实例化db对象

蓝本view view.py视图函数

from flask import Blueprint
from .models import User
from ext import db
#创建蓝本view
view = Blueprint('view',__name__)
#定义视图函数
@view.route('/')
def index():
    return 'index'

@view.route('/insert/')
def insert():
    u = User(username='张三',info='个人信息')
    db.session.add(u)
    return 'insert success'

蓝本view models.py模型类

from ext import db  #导入db
#构建User模型类
class User(db.Model,Base):
    __table__name = 'user'
    id = db.Column(db.Integer,primary_key=True)
    username = db.Column(db.String(20),index=True)
    sex = db.Column(db.Boolean,default=True)
    info = db.Column(db.String(50))

manage.py启动项

from flask import Flask
from flask_script import Manager
from ext import db
import settings
from app.view import view

app = Flask(__name__)
#将系统配置项Config类加载到app
app.config.from_object(settings.Config)
#通过db对象将app初始化
db.init_app(app)
#将蓝图view注册进app
app.register_blueprint(view)
manager = Manager(app)

if __name__ == '__main__':
    manager.run()

setting.py配置文件

class Config:
    #设置mysql pymysql的连接
    SQLALCHEMY_DATABASE_URI = 'mysql pymysql://root:20111673@127.0.0.1:3306/demo'
    #加密设置
    SECRETE_KEY = 'secret_key'
    #关闭数据追踪
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    #开启提交
    SQLALCHEMY_COMMIT_ON_TEARDOWN = True

前面我们采用系统的每次自动提交session 即SQLALCHEMY_COMMIT_ON_TEARDOWN

但是如果想自己定义提交方式,同时不想传入关键字参数,那么该怎样入手呢?这里提供一种思路

init_db 方法创建数据库
然后定义我们的 models.py

(8) 自定义增删改类

我们对模型类进行了修改,models.py 内容如下:

from ext import db
#定义了base基类
class Base:
    def save(self):
        try:
            db.session.add(self)  #self实例化对象代表就是u对象
            db.session.commit()  
        except:
            db.session.rollback()
    #定义静态类方法接收List参数        
    @staticmethod
    def save_all(List):
        try:
            db.session.add_all(List)
            db.session.commit()
        except:
            db.session.rollback()
    #定义删除方法
    def delete(self):
        try:
            db.session.delete(self)  
            db.session.commit()  
        except:
            db.session.rollback()
#定义模型user类
class User(db.Model,Base):
    __table__name = 'user'
    id = db.Column(db.Integer,primary_key=True)
    username = db.Column(db.String(20),index=True)
    sex = db.Column(db.Boolean,default=True)
    info = db.Column(db.String(50))
    #
    def __init__(self,username='',info='',sex=True):
        self.username = username
        self.info = info
        self.sex = sex
#注意:
#原实例化:  u = User(username='张三',info='个人信息')
#现实例化: u = User('李四','李四个人信息')

在views.py中使用

from flask import Blueprint
from .models import User
from ext import db

view = Blueprint('view',__name__)

@view.route('/')
def index():
    return 'index'
#插入单条数据
@view.route('/insert/')
def insert():
    # u = User(username='test',info='default')
    u = User('xiaomeng','default')
    u.save()
    db.session.add(u)
    return 'insert success'
#保存多条数据
@view.route('/saveMany/')
def saveMany():
    u1 = User('zhan123','default123')
    u2 = User('li123','default message')
    User.save_all([u1,u2])
    return 'add many'
#删除数据
@view.route('/delete/')
def delete():
    u = User.query.get(1)  #获取查询集
    u.delete()
    return 'delete message'

其他都不做改变,基本思路是封装到类,通过多继承来实现方法的调用。

# -*- coding: utf-8 -*-

from sqlalchemy import Column, Integer, String
from database import Base

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True)
    email = Column(String(120), unique=True)

    def __init__(self, name=None, email=None):
        self.name = name
        self.email = email

    def __repr__(self):
        return '%s (%r, %r)' % (self.__class__.__name__, self.name, self.email)

定义了一个 python class ,实际上是映射了 users 表里.
最后就是我们的应用程序主入口

# -*- coding: utf-8 -*-

from flask import Flask
from database import init_db, db_session
from models import User

app = Flask(__name__)

@app.teardown_request
def shutdown_session(exception=None):
    db_session.remove()


@app.route('/')
def index():
    return 'hello world flask'

@app.route('/add/<name>/<email>')
def add(name, email):
    u = User(name=name, email=email)
    try:
        db_session.add(u)
        db_session.commit()
    except Exception, e:
        return 'wrong'
    return 'Add %s user successfully' % name

@app.route('/get/<name>')
def get(name):
    try:
        u = User.query.filter(User.name==name).first()
    except Exception, e:
        return 'there isnot %s' % name
    return 'hello %s' % u.name

if __name__ == '__main__':
    init_db()
    app.debug = True
    app.run()

需要注意,定义了一个 @app.teardown_request 装饰器.用查询完毕后关闭数据库,具体可以参考flask文档 数据库.
这里的主方法中 init_db 主要是初始化数据库,如果数据库存在就链接读取.
add 是数据库添加记录方法, get 正好是数据库查询,更多例子请参考 sqlchemy教程

三 手动实现 ORM
上面是自动实现 orm,如果是手动,相应的文件修改如下:
database.py

# -*- coding: utf-8 -*-

from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import scoped_session, sessionmaker


engine = create_engine('sqlite:///./test.db', convert_unicode=True)
metadata = MetaData()
db_session = scoped_session(sessionmaker(autocommit=False,
                                        autoflush=False,
                                        bind=engine))


def init_db():
    metadata.create_all(bind=engine)

models.py

from sqlalchemy import Table, Column, Integer, String
from sqlalchemy.orm import mapper
from database import metadata, db_session

class User(object):

    query = db_session.query_property()

    def __init__(self, name=None, email=None):
        self.name = name
        self.email = email

    def __repr__(self):
        return '<User %s>' % (self.name)

users = Table('users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50), unique=True),
    Column('email', String(120), unique=True)
)

mapper(User, users)

model 需要手动定义 table 并指定映射
app.py

# -*- coding: utf-8 -*-

import sqlite3
from flask import Flask
from database import *
from models import *

app = Flask(__name__)


@app.teardown_request
def teardown_request(exception=None):
    db_session.remove()

@app.route('/')
def index():
    return 'hello'

@app.route('/add/<name>/<email>')
def add(name, email):
    u = User(name, email)
    try:
        db_session.add(u)    
        db_session.commit()  
    except Exception, e:
        return 'wrong'

    return '%s add successful' % name

@app.route('/get/<name>')
def get(name):
    try:
        u = User.query.filter(User.name==name).first()
    except Exception, e:
        return 'there isnot %s' % name
    return 'hello %s' % u.name


if __name__ == '__main__':
    init_db()
    app.debug = True
    app.run()

更底层的操作就是 在 flask app 里面写 sql 查询,具体和 sqlalchemy 使用方法一致

注:本文转载自
http://rsj217.diandian.com/post/2014-01-09/40060722735

编辑:ca88 本文来源:开垦进级Flask的底蕴,使用教程

关键词: 亚洲城ca88