目录
一、认证(补充的一个点)
认证请求头
1 #!/usr/bin/env python
2 # -*- coding:utf-8 -*-
3 from rest_framework.views import APIView
4 from rest_framework.response import Response
5 from rest_framework.authentication import BaseAuthentication
6 from rest_framework.permissions import BasePermission
7
8 from rest_framework.request import Request
9 from rest_framework import exceptions
10
11 token_list = [
12 'sfsfss123kuf3j123',
13 'asijnfowerkkf9812',
14 ]
15
16
17 class TestAuthentication(BaseAuthentication):
18 def authenticate(self, request):
19 """
20 用户认证,如果验证成功后返回元组: (用户,用户Token)
21 :param request:
22 :return:
23 None,表示跳过该验证;
24 如果跳过了所有认证,默认用户和Token和使用配置文件进行设置
25 self._authenticator = None
26 if api_settings.UNAUTHENTICATED_USER:
27 self.user = api_settings.UNAUTHENTICATED_USER() # 默认值为:匿名用户
28 else:
29 self.user = None
30
31 if api_settings.UNAUTHENTICATED_TOKEN:
32 self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默认值为:None
33 else:
34 self.auth = None
35 (user,token)表示验证通过并设置用户名和Token;
36 AuthenticationFailed异常
37 """
38 val = request.query_params.get('token')
39 if val not in token_list:
40 raise exceptions.AuthenticationFailed("用户认证失败")
41
42 return ('登录用户', '用户token')
43
44 def authenticate_header(self, request):
45 """
46 Return a string to be used as the value of the `WWW-Authenticate`
47 header in a `401 Unauthenticated` response, or `None` if the
48 authentication scheme should return `403 Permission Denied` responses.
49 """
50 pass
51
52
53 class TestPermission(BasePermission):
54 message = "权限验证失败"
55
56 def has_permission(self, request, view):
57 """
58 判断是否有权限访问当前请求
59 Return `True` if permission is granted, `False` otherwise.
60 :param request:
61 :param view:
62 :return: True有权限;False无权限
63 """
64 if request.user == "管理员":
65 return True
66
67 # GenericAPIView中get_object时调用
68 def has_object_permission(self, request, view, obj):
69 """
70 视图继承GenericAPIView,并在其中使用get_object时获取对象时,触发单独对象权限验证
71 Return `True` if permission is granted, `False` otherwise.
72 :param request:
73 :param view:
74 :param obj:
75 :return: True有权限;False无权限
76 """
77 if request.user == "管理员":
78 return True
79
80
81 class TestView(APIView):
82 # 认证的动作是由request.user触发
83 authentication_classes = [TestAuthentication, ]
84
85 # 权限
86 # 循环执行所有的权限
87 permission_classes = [TestPermission, ]
88
89 def get(self, request, *args, **kwargs):
90 # self.dispatch
91 print(request.user)
92 print(request.auth)
93 return Response('GET请求,响应内容')
94
95 def post(self, request, *args, **kwargs):
96 return Response('POST请求,响应内容')
97
98 def put(self, request, *args, **kwargs):
99 return Response('PUT请求,响应内容')
views.py
1 #
2 class MyAuthtication(BasicAuthentication):
3 def authenticate(self, request):
4 token = request.query_params.get('token') #注意是没有GET的,用query_params表示
5 if token == 'zxxzzxzc':
6 return ('uuuuuu','afsdsgdf') #返回user,auth
7 # raise AuthenticationFailed('认证错误') #只要抛出认证错误这样的异常就会去执行下面的函数
8 raise APIException('认证错误')
9 def authenticate_header(self, request): #认证不成功的时候执行
10 return 'Basic reala="api"'
11
12 class UserView(APIView):
13 authentication_classes = [MyAuthtication,]
14 def get(self,request,*args,**kwargs):
15 print(request.user)
16 print(request.auth)
17 return Response('用户列表')
自定义认证功能
二、权限
1、需求:Host是匿名用户和用户都能访问 #匿名用户的request.user = none;User只有注册用户能访问
1 from app03 import views
2 from django.conf.urls import url
3 urlpatterns = [
4 # django rest framework
5 url('^auth/', views.AuthView.as_view()),
6 url(r'^hosts/', views.HostView.as_view()),
7 url(r'^users/', views.UsersView.as_view()),
8 url(r'^salary/', views.SalaryView.as_view()),
9 ]
urls.py
1 from django.shortcuts import render
2 from rest_framework.views import APIView #继承的view
3 from rest_framework.response import Response #友好的返回
4 from rest_framework.authentication import BaseAuthentication #认证的类
5 from rest_framework.authentication import BasicAuthentication
6 from app01 import models
7 from rest_framework import exceptions
8 from rest_framework.permissions import AllowAny #权限在这个类里面
9 from rest_framework.throttling import BaseThrottle,SimpleRateThrottle
10 # Create your views here.
11 # +++++++++++++++认证类和权限类========================
12 class MyAuthentication(BaseAuthentication):
13 def authenticate(self, request):
14 token = request.query_params.get('token')
15 obj = models.UserInfo.objects.filter(token=token).first()
16 if obj : #如果认证成功,返回用户名和auth
17 return (obj.username,obj)
18 return None #如果没有认证成功就不处理,进行下一步
19
20 def authenticate_header(self, request):
21 pass
22
23 class MyPermission(object):
24 message = '无权访问'
25 def has_permission(self,request,view): #has_permission里面的self是view视图对象
26 if request.user:
27 return True #如果不是匿名用户就说明有权限
28 return False #否则无权限
29
30 class AdminPermission(object):
31 message = '无权访问'
32 def has_permission(self, request, view): # has_permission里面的self是view视图对象
33 if request.user=='haiyun':
34 return True # 返回True表示有权限
35 return False #返回False表示无权限
36
37 # +++++++++++++++++++++++++++
38 class AuthView(APIView):
39 authentication_classes = [] #认证页面不需要认证
40
41 def get(self,request):
42 self.dispatch
43 return '认证列表'
44
45 class HostView(APIView):
46 '''需求:
47 Host是匿名用户和用户都能访问 #匿名用户的request.user = none
48 User只有注册用户能访问
49 '''
50 authentication_classes = [MyAuthentication,]
51 permission_classes = [] #都能访问就没必要设置权限了
52 def get(self,request):
53 print(request.user)
54 print(request.auth)
55 return Response('主机列表')
56
57 class UsersView(APIView):
58 '''用户能访问,request.user里面有值'''
59 authentication_classes = [MyAuthentication,]
60 permission_classes = [MyPermission,]
61 def get(self,request):
62 print(request.user,'111111111')
63 return Response('用户列表')
64
65 def permission_denied(self, request, message=None):
66 """
67 If request is not permitted, determine what kind of exception to raise.
68 """
69 if request.authenticators and not request.successful_authenticator:
70 '''如果没有通过认证,并且权限中return False了,就会报下面的这个异常了'''
71 raise exceptions.NotAuthenticated(detail='无权访问')
72 raise exceptions.PermissionDenied(detail=message)
views.py
1 class SalaryView(APIView):
2 '''用户能访问'''
3 message ='无权访问'
4 authentication_classes = [MyAuthentication,] #验证是不是用户
5 permission_classes = [MyPermission,AdminPermission,] #再看用户有没有权限,如果有权限在判断有没有管理员的权限
6 def get(self,request):
7 return Response('薪资列表')
8
9 def permission_denied(self, request, message=None):
10 """
11 If request is not permitted, determine what kind of exception to raise.
12 """
13 if request.authenticators and not request.successful_authenticator:
14 '''如果没有通过认证,并且权限中return False了,就会报下面的这个异常了'''
15 raise exceptions.NotAuthenticated(detail='无权访问')
16 raise exceptions.PermissionDenied(detail=message)
认证和权限配合使用
如果遇上这样的,还可以自定制,参考源码
def check_permissions(self, request):
"""
Check if the request should be permitted.
Raises an appropriate exception if the request is not permitted.
"""
for permission in self.get_permissions():
#循环每一个permission对象,调用has_permission
#如果False,则抛出异常
#True 说明有权访问
if not permission.has_permission(request, self):
self.permission_denied(
request, message=getattr(permission, 'message', None)
)
def permission_denied(self, request, message=None):
"""
If request is not permitted, determine what kind of exception to raise.
"""
if request.authenticators and not request.successful_authenticator:
'''如果没有通过认证,并且权限中return False了,就会报下面的这个异常了'''
raise exceptions.NotAuthenticated()
raise exceptions.PermissionDenied(detail=message)
那么我们可以重写permission_denied这个方法,如下:
1 class UsersView(APIView):
2 '''用户能访问,request.user里面有值'''
3 authentication_classes = [MyAuthentication,]
4 permission_classes = [MyPermission,]
5 def get(self,request):
6 return Response('用户列表')
7
8 def permission_denied(self, request, message=None):
9 """
10 If request is not permitted, determine what kind of exception to raise.
11 """
12 if request.authenticators and not request.successful_authenticator:
13 '''如果没有通过认证,并且权限中return False了,就会报下面的这个异常了'''
14 raise exceptions.NotAuthenticated(detail='无权访问')
15 raise exceptions.PermissionDenied(detail=message)
views.py
2. 全局使用
上述操作中均是对单独视图进行特殊配置,如果想要对全局进行配置,则需要再配置文件中写入即可。
1 REST_FRAMEWORK = {
2 'UNAUTHENTICATED_USER': None,
3 'UNAUTHENTICATED_TOKEN': None, #将匿名用户设置为None
4 "DEFAULT_AUTHENTICATION_CLASSES": [
5 "app01.utils.MyAuthentication",
6 ],
7 'DEFAULT_PERMISSION_CLASSES':[
8 "app03.utils.MyPermission",#设置路径,
9 ]
10 }
settings.py
1 class AuthView(APIView):
2 authentication_classes = [] #认证页面不需要认证
3
4 def get(self,request):
5 self.dispatch
6 return '认证列表'
7
8 class HostView(APIView):
9 '''需求:
10 Host是匿名用户和用户都能访问 #匿名用户的request.user = none
11 User只有注册用户能访问
12 '''
13 authentication_classes = [MyAuthentication,]
14 permission_classes = [] #都能访问就没必要设置权限了
15 def get(self,request):
16 print(request.user)
17 print(request.auth)
18 return Response('主机列表')
19
20 class UsersView(APIView):
21 '''用户能访问,request.user里面有值'''
22 authentication_classes = [MyAuthentication,]
23 permission_classes = [MyPermission,]
24 def get(self,request):
25 print(request.user,'111111111')
26 return Response('用户列表')
27
28 def permission_denied(self, request, message=None):
29 """
30 If request is not permitted, determine what kind of exception to raise.
31 """
32 if request.authenticators and not request.successful_authenticator:
33 '''如果没有通过认证,并且权限中return False了,就会报下面的这个异常了'''
34 raise exceptions.NotAuthenticated(detail='无权访问')
35 raise exceptions.PermissionDenied(detail=message)
36
37
38 class SalaryView(APIView):
39 '''用户能访问'''
40 message ='无权访问'
41 authentication_classes = [MyAuthentication,] #验证是不是用户
42 permission_classes = [MyPermission,AdminPermission,] #再看用户有没有权限,如果有权限在判断有没有管理员的权限
43 def get(self,request):
44 return Response('薪资列表')
45
46 def permission_denied(self, request, message=None):
47 """
48 If request is not permitted, determine what kind of exception to raise.
49 """
50 if request.authenticators and not request.successful_authenticator:
51 '''如果没有通过认证,并且权限中return False了,就会报下面的这个异常了'''
52 raise exceptions.NotAuthenticated(detail='无权访问')
53 raise exceptions.PermissionDenied(detail=message)
Views.py
三、限流
1、为什么要限流呢?
答:防爬
2、限制访问频率源码分析
1 self.check_throttles(request)
self.check_throttles(request)
1 def check_throttles(self, request):
2 """
3 Check if request should be throttled.
4 Raises an appropriate exception if the request is throttled.
5 """
6 for throttle in self.get_throttles():
7 #循环每一个throttle对象,执行allow_request方法
8 # allow_request:
9 #返回False,说明限制访问频率
10 #返回True,说明不限制,通行
11 if not throttle.allow_request(request, self):
12 self.throttled(request, throttle.wait())
13 #throttle.wait()表示还要等多少秒就能访问了
check_throttles
1 def get_throttles(self):
2 """
3 Instantiates and returns the list of throttles that this view uses.
4 """
5 #返回对象
6 return [throttle() for throttle in self.throttle_classes]
get_throttles
1 throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES
找到类,可自定制类throttle_classes
1 class BaseThrottle(object):
2 """
3 Rate throttling of requests.
4 """
5
6 def allow_request(self, request, view):
7 """
8 Return `True` if the request should be allowed, `False` otherwise.
9 """
10 raise NotImplementedError('.allow_request() must be overridden')
11
12 def get_ident(self, request):
13 """
14 Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
15 if present and number of proxies is > 0. If not use all of
16 HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
17 """
18 xff = request.META.get('HTTP_X_FORWARDED_FOR')
19 remote_addr = request.META.get('REMOTE_ADDR')
20 num_proxies = api_settings.NUM_PROXIES
21
22 if num_proxies is not None:
23 if num_proxies == 0 or xff is None:
24 return remote_addr
25 addrs = xff.split(',')
26 client_addr = addrs[-min(num_proxies, len(addrs))]
27 return client_addr.strip()
28
29 return ''.join(xff.split()) if xff else remote_addr
30
31 def wait(self):
32 """
33 Optionally, return a recommended number of seconds to wait before
34 the next request.
35 """
36 return None
BaseThrottle
1 zz
也可以重写allow_request方法
1 def throttled(self, request, wait):
2 """
3 If request is throttled, determine what kind of exception to raise.
4 """
5 raise exceptions.Throttled(wait)
可自定制返回的错误信息throttled
1 class Throttled(APIException):
2 status_code = status.HTTP_429_TOO_MANY_REQUESTS
3 default_detail = _('Request was throttled.')
4 extra_detail_singular = 'Expected available in {wait} second.'
5 extra_detail_plural = 'Expected available in {wait} seconds.'
6 default_code = 'throttled'
7
8 def __init__(self, wait=None, detail=None, code=None):
9 if detail is None:
10 detail = force_text(self.default_detail)
11 if wait is not None:
12 wait = math.ceil(wait)
13 detail = ' '.join((
14 detail,
15 force_text(ungettext(self.extra_detail_singular.format(wait=wait),
16 self.extra_detail_plural.format(wait=wait),
17 wait))))
18 self.wait = wait
19 super(Throttled, self).__init__(detail, code)
raise exceptions.Throttled(wait)错误信息详情
下面来看看最简单的从源码中分析的示例,这只是举例说明了一下
1 from django.conf.urls import url
2 from app04 import views
3 urlpatterns = [
4 url('limit/',views.LimitView.as_view()),
5
6 ]
urls.py
1 from django.shortcuts import render
2 from rest_framework.views import APIView
3 from rest_framework.response import Response
4 from rest_framework import exceptions
5 # from rest_framewor import
6 # Create your views here.
7 class MyThrottle(object):
8 def allow_request(self,request,view):
9 #返回False,限制
10 #返回True,不限制
11 pass
12 def wait(self):
13 return 1000
14
15
16 class LimitView(APIView):
17 authentication_classes = [] #不让认证用户
18 permission_classes = [] #不让验证权限
19 throttle_classes = [MyThrottle, ]
20 def get(self,request):
21 # self.dispatch
22 return Response('控制访问频率示例')
23
24 def throttled(self, request, wait):
25 '''可定制方法设置中文错误'''
26 # raise exceptions.Throttled(wait)
27 class MyThrottle(exceptions.Throttled):
28 default_detail = '请求被限制'
29 extra_detail_singular = 'Expected available in {wait} second.'
30 extra_detail_plural = 'Expected available in {wait} seconds.'
31 default_code = '还需要再等{wait}秒'
32 raise MyThrottle(wait)
views.py
3、需求:对匿名用户进行限制,每个用户一分钟允许访问10次(只针对用户来说)
a、基于用户IP限制访问频率
流程分析:
- 先获取用户信息,如果是匿名用户,获取IP。如果不是匿名用户就可以获取用户名。
- 获取匿名用户IP,在request里面获取,比如IP= 1.1.1.1。
- 吧获取到的IP添加到到recode字典里面,需要在添加之前先限制一下。
- 如果时间间隔大于60秒,说明时间久远了,就把那个时间给剔除 了pop。在timelist列表里面现在留的是有效的访问时间段。
- 然后判断他的访问次数超过了10次没有,如果超过了时间就return False。
- 美中不足的是时间是固定的,我们改变他为动态的:列表里面最开始进来的时间和当前的时间进行比较,看需要等多久。
具体实现:
1 from django.shortcuts import render
2 from rest_framework.views import APIView
3 from rest_framework.response import Response
4 from rest_framework import exceptions
5 from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制访问频率
6 import time
7 # Create your views here.
8 RECORD = {}
9 class MyThrottle(BaseThrottle):
10
11 def allow_request(self,request,view):
12 '''对匿名用户进行限制,每个用户一分钟访问10次 '''
13 ctime = time.time()
14 ip = '1.1.1.1'
15 if ip not in RECORD:
16 RECORD[ip] = [ctime]
17 else:
18 #[152042123,15204212,3152042,123152042123]
19 time_list = RECORD[ip] #获取ip里面的值
20 while True:
21 val = time_list[-1]#取出最后一个时间,也就是访问最早的时间
22 if (ctime-60)>val: #吧时间大于60秒的给剔除了
23 time_list.pop()
24 #剔除了之后timelist里面就是有效的时间了,在进行判断他的访问次数是不是超过10次
25 else:
26 break
27 if len(time_list) >10:
28 return False # 返回False,限制
29 time_list.insert(0, ctime)
30 return True #返回True,不限制
31
32 def wait(self):
33 ctime = time.time()
34 first_in_time = RECORD['1.1.1.1'][-1]
35 wt = 60-(ctime-first_in_time)
36 return wt
37
38
39 class LimitView(APIView):
40 authentication_classes = [] #不让认证用户
41 permission_classes = [] #不让验证权限
42 throttle_classes = [MyThrottle, ]
43 def get(self,request):
44 # self.dispatch
45 return Response('控制访问频率示例')
46
47 def throttled(self, request, wait):
48 '''可定制方法设置中文错误'''
49 # raise exceptions.Throttled(wait)
50 class MyThrottle(exceptions.Throttled):
51 default_detail = '请求被限制'
52 extra_detail_singular = 'Expected available in {wait} second.'
53 extra_detail_plural = 'Expected available in {wait} seconds.'
54 default_code = '还需要再等{wait}秒'
55 raise MyThrottle(wait)
views初级版本
1 # from django.shortcuts import render
2 # from rest_framework.views import APIView
3 # from rest_framework.response import Response
4 # from rest_framework import exceptions
5 # from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制访问频率
6 # import time
7 # # Create your views here.
8 # RECORD = {}
9 # class MyThrottle(BaseThrottle):
10 #
11 # def allow_request(self,request,view):
12 # '''对匿名用户进行限制,每个用户一分钟访问10次 '''
13 # ctime = time.time()
14 # ip = '1.1.1.1'
15 # if ip not in RECORD:
16 # RECORD[ip] = [ctime]
17 # else:
18 # #[152042123,15204212,3152042,123152042123]
19 # time_list = RECORD[ip] #获取ip里面的值
20 # while True:
21 # val = time_list[-1]#取出最后一个时间,也就是访问最早的时间
22 # if (ctime-60)>val: #吧时间大于60秒的给剔除了
23 # time_list.pop()
24 # #剔除了之后timelist里面就是有效的时间了,在进行判断他的访问次数是不是超过10次
25 # else:
26 # break
27 # if len(time_list) >10:
28 # return False # 返回False,限制
29 # time_list.insert(0, ctime)
30 # return True #返回True,不限制
31 #
32 # def wait(self):
33 # ctime = time.time()
34 # first_in_time = RECORD['1.1.1.1'][-1]
35 # wt = 60-(ctime-first_in_time)
36 # return wt
37 #
38 #
39 # class LimitView(APIView):
40 # authentication_classes = [] #不让认证用户
41 # permission_classes = [] #不让验证权限
42 # throttle_classes = [MyThrottle, ]
43 # def get(self,request):
44 # # self.dispatch
45 # return Response('控制访问频率示例')
46 #
47 # def throttled(self, request, wait):
48 # '''可定制方法设置中文错误'''
49 # # raise exceptions.Throttled(wait)
50 # class MyThrottle(exceptions.Throttled):
51 # default_detail = '请求被限制'
52 # extra_detail_singular = 'Expected available in {wait} second.'
53 # extra_detail_plural = 'Expected available in {wait} seconds.'
54 # default_code = '还需要再等{wait}秒'
55 # raise MyThrottle(wait)
56
57
58
59 from django.shortcuts import render
60 from rest_framework.views import APIView
61 from rest_framework.response import Response
62 from rest_framework import exceptions
63 from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制访问频率
64 import time
65 # Create your views here.
66 RECORD = {}
67 class MyThrottle(BaseThrottle):
68
69 def allow_request(self,request,view):
70 '''对匿名用户进行限制,每个用户一分钟访问10次 '''
71 ctime = time.time()
72 self.ip =self.get_ident(request)
73 if self.ip not in RECORD:
74 RECORD[self.ip] = [ctime]
75 else:
76 #[152042123,15204212,3152042,123152042123]
77 time_list = RECORD[self.ip] #获取ip里面的值
78 while True:
79 val = time_list[-1]#取出最后一个时间,也就是访问最早的时间
80 if (ctime-60)>val: #吧时间大于60秒的给剔除了
81 time_list.pop()
82 #剔除了之后timelist里面就是有效的时间了,在进行判断他的访问次数是不是超过10次
83 else:
84 break
85 if len(time_list) >10:
86 return False # 返回False,限制
87 time_list.insert(0, ctime)
88 return True #返回True,不限制
89
90 def wait(self):
91 ctime = time.time()
92 first_in_time = RECORD[self.ip][-1]
93 wt = 60-(ctime-first_in_time)
94 return wt
95
96
97 class LimitView(APIView):
98 authentication_classes = [] #不让认证用户
99 permission_classes = [] #不让验证权限
100 throttle_classes = [MyThrottle, ]
101 def get(self,request):
102 # self.dispatch
103 return Response('控制访问频率示例')
104
105 def throttled(self, request, wait):
106 '''可定制方法设置中文错误'''
107 # raise exceptions.Throttled(wait)
108 class MyThrottle(exceptions.Throttled):
109 default_detail = '请求被限制'
110 extra_detail_singular = 'Expected available in {wait} second.'
111 extra_detail_plural = 'Expected available in {wait} seconds.'
112 default_code = '还需要再等{wait}秒'
113 raise MyThrottle(wait)
稍微做了改动
b、用resetframework内部的限制访问频率(利于Django缓存)
源码分析:
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制访问频率
1 class BaseThrottle(object):
2 """
3 Rate throttling of requests.
4 """
5
6 def allow_request(self, request, view):
7 """
8 Return `True` if the request should be allowed, `False` otherwise.
9 """
10 raise NotImplementedError('.allow_request() must be overridden')
11
12 def get_ident(self, request): #唯一标识
13 """
14 Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
15 if present and number of proxies is > 0. If not use all of
16 HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
17 """
18 xff = request.META.get('HTTP_X_FORWARDED_FOR')
19 remote_addr = request.META.get('REMOTE_ADDR') #获取IP等
20 num_proxies = api_settings.NUM_PROXIES
21
22 if num_proxies is not None:
23 if num_proxies == 0 or xff is None:
24 return remote_addr
25 addrs = xff.split(',')
26 client_addr = addrs[-min(num_proxies, len(addrs))]
27 return client_addr.strip()
28
29 return ''.join(xff.split()) if xff else remote_addr
30
31 def wait(self):
32 """
33 Optionally, return a recommended number of seconds to wait before
34 the next request.
35 """
36 return None
BaseThrottle相当于一个抽象类
1 class SimpleRateThrottle(BaseThrottle):
2 """
3 一个简单的缓存实现,只需要` get_cache_key() `。被覆盖。
4 速率(请求/秒)是由视图上的“速率”属性设置的。类。该属性是一个字符串的形式number_of_requests /期。
5 周期应该是:(的),“秒”,“M”,“min”,“h”,“小时”,“D”,“一天”。
6 以前用于节流的请求信息存储在高速缓存中。
7 A simple cache implementation, that only requires `.get_cache_key()`
8 to be overridden.
9
10 The rate (requests / seconds) is set by a `rate` attribute on the View
11 class. The attribute is a string of the form 'number_of_requests/period'.
12
13 Period should be one of: ('s', 'sec', 'm', 'min', 'h', 'hour', 'd', 'day')
14
15 Previous request information used for throttling is stored in the cache.
16 """
17 cache = default_cache
18 timer = time.time
19 cache_format = 'throttle_%(scope)s_%(ident)s'
20 scope = None
21 THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES
22
23 def __init__(self):
24 if not getattr(self, 'rate', None):
25 self.rate = self.get_rate()
26 self.num_requests, self.duration = self.parse_rate(self.rate)
27
28 def get_cache_key(self, request, view):#这个相当于是一个半成品,我们可以来补充它
29 """
30 Should return a unique cache-key which can be used for throttling.
31 Must be overridden.
32
33 May return `None` if the request should not be throttled.
34 """
35 raise NotImplementedError('.get_cache_key() must be overridden')
36
37 def get_rate(self):
38 """
39 Determine the string representation of the allowed request rate.
40 """
41 if not getattr(self, 'scope', None):
42 msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
43 self.__class__.__name__)
44 raise ImproperlyConfigured(msg)
45
46 try:
47 return self.THROTTLE_RATES[self.scope]
48 except KeyError:
49 msg = "No default throttle rate set for '%s' scope" % self.scope
50 raise ImproperlyConfigured(msg)
51
52 def parse_rate(self, rate):
53 """
54 Given the request rate string, return a two tuple of:
55 <allowed number of requests>, <period of time in seconds>
56 """
57 if rate is None:
58 return (None, None)
59 num, period = rate.split('/')
60 num_requests = int(num)
61 duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
62 return (num_requests, duration)
63
64 #1、一进来会先执行他,
65 def allow_request(self, request, view):
66 """
67 Implement the check to see if the request should be throttled.
68
69 On success calls `throttle_success`.
70 On failure calls `throttle_failure`.
71 """
72 if self.rate is None:
73 return True
74
75 self.key = self.get_cache_key(request, view) #2、执行get_cache_key,这里的self.key就相当于我们举例ip
76 if self.key is None:
77 return True
78
79 self.history = self.cache.get(self.key, []) #3、得到的key,默认是一个列表,赋值给了self.history,
80 # 这时候self.history就是每一个ip对应的访问记录
81 self.now = self.timer()
82
83 # Drop any requests from the history which have now passed the
84 # throttle duration
85 while self.history and self.history[-1] <= self.now - self.duration:
86 self.history.pop()
87 if len(self.history) >= self.num_requests:
88 return self.throttle_failure()
89 return self.throttle_success()
90
91 def throttle_success(self):
92 """
93 Inserts the current request's timestamp along with the key
94 into the cache.
95 """
96 self.history.insert(0, self.now)
97 self.cache.set(self.key, self.history, self.duration)
98 return True
99
100 def throttle_failure(self):
101 """
102 Called when a request to the API has failed due to throttling.
103 """
104 return False
105
106 def wait(self):
107 """
108 Returns the recommended next request time in seconds.
109 """
110 if self.history:
111 remaining_duration = self.duration - (self.now - self.history[-1])
112 else:
113 remaining_duration = self.duration
114
115 available_requests = self.num_requests - len(self.history) + 1
116 if available_requests <= 0:
117 return None
118
119 return remaining_duration / float(available_requests)
SimpleRateThrottle
请求一进来会先执行SimpleRateThrottle这个类的构造方法
1 def __init__(self):
2 if not getattr(self, 'rate', None):
3 self.rate = self.get_rate() #点进去看到需要些一个scope ,2/m
4 self.num_requests, self.duration = self.parse_rate(self.rate)
__init__
1 def get_rate(self):
2 """
3 Determine the string representation of the allowed request rate.
4 """
5 if not getattr(self, 'scope', None): #检测必须有scope,没有就报错了
6 msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
7 self.__class__.__name__)
8 raise ImproperlyConfigured(msg)
9
10 try:
11 return self.THROTTLE_RATES[self.scope]
12 except KeyError:
13 msg = "No default throttle rate set for '%s' scope" % self.scope
14 raise ImproperlyConfigured(msg)
get_rate
1 def parse_rate(self, rate):
2 """
3 Given the request rate string, return a two tuple of:
4 <allowed number of requests>, <period of time in seconds>
5 """
6 if rate is None:
7 return (None, None)
8 num, period = rate.split('/')
9 num_requests = int(num)
10 duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
11 return (num_requests, duration)
parse_rate
1 #2、接下来会先执行他,
2 def allow_request(self, request, view):
3 """
4 Implement the check to see if the request should be throttled.
5
6 On success calls `throttle_success`.
7 On failure calls `throttle_failure`.
8 """
9 if self.rate is None:
10 return True
11
12 self.key = self.get_cache_key(request, view) #2、执行get_cache_key,这里的self.key就相当于我们举例ip
13 if self.key is None:
14 return True #不限制
15 # [114521212,11452121211,45212121145,21212114,521212]
16 self.history = self.cache.get(self.key, []) #3、得到的key,默认是一个列表,赋值给了self.history,
17 # 这时候self.history就是每一个ip对应的访问记录
18 self.now = self.timer()
19
20 # Drop any requests from the history which have now passed the
21 # throttle duration
22 while self.history and self.history[-1] <= self.now - self.duration:
23 self.history.pop()
24 if len(self.history) >= self.num_requests:
25 return self.throttle_failure()
26 return self.throttle_success()
allow_request
1 def wait(self):
2 """
3 Returns the recommended next request time in seconds.
4 """
5 if self.history:
6 remaining_duration = self.duration - (self.now - self.history[-1])
7 else:
8 remaining_duration = self.duration
9
10 available_requests = self.num_requests - len(self.history) + 1
11 if available_requests <= 0:
12 return None
13
14 return remaining_duration / float(available_requests)
wait
代码实现:
1 ###########用resetframework内部的限制访问频率##############
2 class MySimpleRateThrottle(SimpleRateThrottle):
3 scope = 'xxx'
4 def get_cache_key(self, request, view):
5 return self.get_ident(request) #返回唯一标识IP
6
7 class LimitView(APIView):
8 authentication_classes = [] #不让认证用户
9 permission_classes = [] #不让验证权限
10 throttle_classes = [MySimpleRateThrottle, ]
11 def get(self,request):
12 # self.dispatch
13 return Response('控制访问频率示例')
14
15 def throttled(self, request, wait):
16 '''可定制方法设置中文错误'''
17 # raise exceptions.Throttled(wait)
18 class MyThrottle(exceptions.Throttled):
19 default_detail = '请求被限制'
20 extra_detail_singular = 'Expected available in {wait} second.'
21 extra_detail_plural = 'Expected available in {wait} seconds.'
22 default_code = '还需要再等{wait}秒'
23 raise MyThrottle(wait)
views.py
记得在settings里面配置
1 REST_FRAMEWORK = {
2 'UNAUTHENTICATED_USER': None,
3 'UNAUTHENTICATED_TOKEN': None, #将匿名用户设置为None
4 "DEFAULT_AUTHENTICATION_CLASSES": [
5 "app01.utils.MyAuthentication",
6 ],
7 'DEFAULT_PERMISSION_CLASSES':[
8 # "app03.utils.MyPermission",#设置路径,
9 ],
10 'DEFAULT_THROTTLE_RATES':{
11 'xxx':'2/minute' #2分钟
12 }
13 }
14
15 #缓存:放在文件
16 CACHES = {
17 'default': {
18 'BACKEND': 'django.core.cache.backends.filebased.FileBasedCache',
19 'LOCATION': 'cache', #文件路径
20 }
21 }
settings.py
4、对匿名用户进行限制,每个用户1分钟允许访问5次,对于登录的普通用户1分钟访问10次,VIP用户一分钟访问20次
- 比如首页可以匿名访问
- #先认证,只有认证了才知道是不是匿名的,
- #权限登录成功之后才能访问, ,index页面就不需要权限了
- If request.user #判断登录了没有
1 from django.contrib import admin
2
3 from django.conf.urls import url, include
4 from app05 import views
5
6 urlpatterns = [
7 url('index/',views.IndexView.as_view()),
8 url('manage/',views.ManageView.as_view()),
9 ]
urls.py
1 from django.shortcuts import render
2 from rest_framework.views import APIView
3 from rest_framework.response import Response
4 from rest_framework.authentication import BaseAuthentication #认证需要
5 from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限流处理
6 from rest_framework.permissions import BasePermission
7 from rest_framework import exceptions
8 from app01 import models
9 # Create your views here.
10 ###############3##认证#####################
11 class MyAuthentcate(BaseAuthentication):
12 '''检查用户是否存在,如果存在就返回user和auth,如果没有就返回'''
13 def authenticate(self, request):
14 token = request.query_params.get('token')
15 obj = models.UserInfo.objects.filter(token=token).first()
16 if obj:
17 return (obj.username,obj.token)
18 return None #表示我不处理
19
20 ##################权限#####################
21 class MyPermission(BasePermission):
22 message='无权访问'
23 def has_permission(self, request, view):
24 if request.user:
25 return True #true表示有权限
26 return False #false表示无权限
27
28 class AdminPermission(BasePermission):
29 message = '无权访问'
30
31 def has_permission(self, request, view):
32 if request.user=='haiyan':
33 return True # true表示有权限
34 return False # false表示无权限
35
36 ############3#####限流##################3##
37 class AnonThrottle(SimpleRateThrottle):
38 scope = 'wdp_anon' #相当于设置了最大的访问次数和时间
39 def get_cache_key(self, request, view):
40 if request.user:
41 return None #返回None表示我不限制,登录用户我不管
42 #匿名用户
43 return self.get_ident(request) #返回一个唯一标识IP
44
45 class UserThrottle(SimpleRateThrottle):
46 scope = 'wdp_user'
47 def get_cache_key(self, request, view):
48 #登录用户
49 if request.user:
50 return request.user
51 return None #返回NOne表示匿名用户我不管
52
53
54 ##################视图#####################
55 #首页支持匿名访问,
56 #无需要登录就可以访问
57 class IndexView(APIView):
58 authentication_classes = [MyAuthentcate,] #认证判断他是不是匿名用户
59 permission_classes = [] #一般主页就不需要权限验证了
60 throttle_classes = [AnonThrottle,UserThrottle,] #对匿名用户和普通用户的访问限制
61
62 def get(self,request):
63 # self.dispatch
64 return Response('访问首页')
65
66 def throttled(self, request, wait):
67 '''可定制方法设置中文错误'''
68
69 # raise exceptions.Throttled(wait)
70 class MyThrottle(exceptions.Throttled):
71 default_detail = '请求被限制'
72 extra_detail_singular = 'Expected available in {wait} second.'
73 extra_detail_plural = 'Expected available in {wait} seconds.'
74 default_code = '还需要再等{wait}秒'
75
76 raise MyThrottle(wait)
77
78 #需登录就可以访问
79 class ManageView(APIView):
80 authentication_classes = [MyAuthentcate, ] # 认证判断他是不是匿名用户
81 permission_classes = [MyPermission,] # 一般主页就不需要权限验证了
82 throttle_classes = [AnonThrottle, UserThrottle, ] # 对匿名用户和普通用户的访问限制
83
84 def get(self, request):
85 # self.dispatch
86 return Response('管理人员访问页面')
87
88 def throttled(self, request, wait):
89 '''可定制方法设置中文错误'''
90
91 # raise exceptions.Throttled(wait)
92 class MyThrottle(exceptions.Throttled):
93 default_detail = '请求被限制'
94 extra_detail_singular = 'Expected available in {wait} second.'
95 extra_detail_plural = 'Expected available in {wait} seconds.'
96 default_code = '还需要再等{wait}秒'
97
98 raise MyThrottle(wait)
views.py
四、总结
1、认证:就是检查用户是否存在;如果存在返回(request.user,request.auth);不存在request.user/request.auth=None
2、权限:进行职责的划分
3、限制访问频率
认证
- 类:authenticate/authenticate_header ##验证不成功的时候执行的
- 返回值:
- return None,
- return (user,auth),
- raise 异常
- 配置:
- 视图:
class IndexView(APIView):
authentication_classes = [MyAuthentication,]
- 全局:
REST_FRAMEWORK = {
'UNAUTHENTICATED_USER': None,
'UNAUTHENTICATED_TOKEN': None,
"DEFAULT_AUTHENTICATION_CLASSES": [
# "app02.utils.MyAuthentication",
],
}
权限
- 类:has_permission/has_object_permission
- 返回值:
- True、#有权限
- False、#无权限
- exceptions.PermissionDenied(detail="错误信息") #异常自己随意,想抛就抛,错误信息自己指定
- 配置:
- 视图:
class IndexView(APIView):
permission_classes = [MyPermission,]
- 全局:
REST_FRAMEWORK = {
"DEFAULT_PERMISSION_CLASSES": [
# "app02.utils.MyAuthentication",
],
}
限流
- 类:allow_request/wait PS: scope = "wdp_user"
- 返回值: return True、#不限制 return False #限制
- 配置:
- 视图:
class IndexView(APIView):
throttle_classes=[AnonThrottle,UserThrottle,]
def get(self,request,*args,**kwargs):
self.dispatch
return Response('访问首页')
- 全局
REST_FRAMEWORK = {
"DEFAULT_THROTTLE_CLASSES":[
],
'DEFAULT_THROTTLE_RATES':{
'wdp_anon':'5/minute',
'wdp_user':'10/minute',
}
}