FLask框架的简单介绍
Flask是一个基于Python开发并且依赖jinja2模板和Werkzeug WSGI服务的一个微型框架,对于Werkzeug本质是Socket服务端,其用于接收http请求并对请求进行预处理,然后触发Flask框架,开发人员基于Flask框架提供的功能对请求进行相应的处理,并返回给用户,如果要返回给用户复杂的内容时,需要借助jinja2模板来实现对模板的处理,即:将模板和数据进行渲染,将渲染后的字符串返回给用户浏览器。
“微”(micro) 并不表示你需要把整个 Web 应用塞进单个 Python 文件(虽然确实可以 ),也不意味着 Flask 在功能上有所欠缺。微框架中的“微”意味着 Flask 旨在保持核心简单而易于扩展。Flask 不会替你做出太多决策——比如使用何种数据库。而那些 Flask 所选择的——比如使用何种模板引擎——则很容易替换。除此之外的一切都由可由你掌握。如此,Flask 可以与您珠联璧合。
默认情况下,Flask 不包含数据库抽象层、表单验证,或是其它任何已有多种库可以胜任的功能。然而,Flask 支持用扩展来给应用添加这些功能,如同是 Flask 本身实现的一样。众多的扩展提供了数据库集成、表单验证、上传处理、各种各样的开放认证技术等功能。Flask 也许是“微小”的,但它已准备好在需求繁杂的生产环境中投入使用。
1
pip3 install flask
路由系统
@app.route('/login/',methods=["GET","POST"],endpoint="login",)
methods:支持的请求方式,endpoint:用于反向生成url(url_for)
from flask import Flask, url_for
app = Flask(__name__)
# 定义转换的类
from werkzeug.routing import BaseConverter
class RegexConverter(BaseConverter):
"""
自定义URL匹配正则表达式
"""
def __init__(self, map, regex):
super(RegexConverter, self).__init__(map)
self.regex = regex
def to_python(self, value):
"""
路由匹配时,匹配成功后传递给视图函数中参数的值
:param value:
:return:
"""
return int(value)
def to_url(self, value):
"""
使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
:param value:
:return:
"""
val = super(RegexConverter, self).to_url(value)
return val
# 添加到converts中
app.url_map.converters['re'] = RegexConverter
# 进行使用
@app.route('/index/<re("\d+"):nid>')
def index(nid):
url_for('xx', nid=123)
return "Index"
if __name__ == '__main__':
app.run()
扩展Flask的路由系统,让他支持正则
视图函数
#第一种
@app.route('/index', endpoint='xx')
def index(nid):
url_for('xx', nid=123)
return "Index"
#第二种
def index(nid):
url_for('xx', nid=123)
return "Index"
app.add_url_rule('/index', index)
FBV
def auth(func):
'''装饰器'''
def inner(*args, **kwargs):
result = func(*args, **kwargs)
return result
return inner
class IndexView(views.MethodView):
# methods = ['POST']
decorators = [auth, ] #添加装饰器
def get(self):
v = url_for('index')
print(v)
return "GET"
def post(self):
return "GET"
app.add_url_rule('/index', view_func=IndexView.as_view(name='index'))
if __name__ == '__main__':
app.run()
CBV
请求和响应
from flask import Flask
from flask import request
from flask import render_template
from flask import redirect
from flask import make_response
app = Flask(__name__)
@app.route('/login.html', methods=['GET', "POST"])
def login():
# 请求相关信息
# request.method
# request.args
# request.form
# request.values
# request.cookies
# request.headers
# request.path
# request.full_path
# request.script_root
# request.url
# request.base_url
# request.url_root
# request.host_url
# request.host
# request.files
# obj = request.files['the_file_name']
# obj.save('/var/www/uploads/' + secure_filename(f.filename))
# 响应相关信息
# return "字符串"
# return render_template('html模板路径',**{})
# return redirect('/index.html')
# response = make_response(render_template('index.html'))
# response是flask.wrappers.Response类型
# response.delete_cookie('key')
# response.set_cookie('key', 'value')
# response.headers['X-Something'] = 'A value'
# return response
return "内容"
if __name__ == '__main__':
app.run()
View Code
模板语言
Flask使用的是Jinja2模板,所以其语法和Django基本无差别
不过在django模板中执行函数或方法时,不用加括号就会自己执行,而Flask必须自己加括号才会执行。
flask中的Markup等价django的mark_safe
#第一种
@app.template_global()
def tag(a1, a2):
return a1 + a2 + 100
模板中的调用方式
{{tag(1,2)}}
#第二种
@app.template_filter()
def db(a1, a2, a3):
return a1 + a2 + a3
模板中的调用方式
{{ 1|db(2,3) }}
自定义标签的使用
闪现
from flask import Flask,flash,get_flashed_messages
用flash跟get_flashed_messages来实现(原理就是设置session和删除session)
扩展
@app.before_request
def process_request1():
print('process_request1')
@app.after_request
def process_response1(response):
print('process_response1')
return response
@app.before_request
def process_request2():
print('process_request2')
@app.after_request
def process_response2(response):
print('process_response2')
return response
伪中间件
配置文件
class BaseConfig(object):
'''
写共同的配置
'''
pass
class TestConfig(BaseConfig):
'''写自己独有的配置'''
DB = '127.0.0.1'
class DevConfig(BaseConfig):
'''写自己独有的配置'''
DB = '192.168.1.1'
class ProConfig(BaseConfig):
'''写自己独有的配置'''
DB = '47.18.1.1'
settings
配置方式 # 方式一: # app.config['SESSION_COOKIE_NAME'] = 'session_lvning' # 方式二: # app.config.from_pyfile('settings.py') # 方式三: # import os # os.environ['FLAKS-SETTINGS'] = 'settings.py' # app.config.from_envvar('FLAKS-SETTINGS') # 方式四: # app.config.from_object('settings.DevConfig')
蓝图
蓝图的功能就是将不同的功能放在不同的py文件中
例如:
order.py
from flask import Blueprint
order = Blueprint('order',__name__)
@order.route('/order')
def order():
return 'Order'
account.py
from flask import Blueprint,render_template
account = Blueprint('account',__name__)
@account.route('/login')
def login():
return render_template('login.html')
_init_.py
from flask import Flask
from .views import account
from .views import order
app = Flask(__name__)
app.register_blueprint(account.account)
app.register_blueprint(order.order)
数据库连接池
"""
为每个线程创建一个连接,thread.local实现。
"""
from DBUtils.PersistentDB import PersistentDB
import pymysql
POOL = PersistentDB(
creator=pymysql, # 使用链接数据库的模块
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
closeable=False,
# 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='pooldb',
charset='utf8'
)
def func():
# conn = SteadyDBConnection()
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
cursor.close()
conn.close() # 不是真的关闭,而是假的关闭。 conn = pymysql.connect() conn.close()
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
cursor.close()
conn.close()
import threading
for i in range(10):
t = threading.Thread(target=func)
t.start()
模式一
import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='pooldb',
charset='utf8'
)
def func():
# 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
# 否则
# 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
# 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
# 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
# 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
# PooledDedicatedDBConnection
conn = POOL.connection()
# print(th, '链接被拿走了', conn1._con)
# print(th, '池子里目前有', pool._idle_cache, '\r\n')
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
conn.close()
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
conn.close()
func()
模式二