ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

drf(十一) jwt的原理及使用

2022-04-12 17:32:20  阅读:270  来源: 互联网

标签:十一 jwt json payload header token options drf


JWT的原理及使用

介绍:

# jwt 一般用于用户认证(前后端分离,微信小程序,uniapp)的开发
json web token

认证流程。

image-20220409095343973

1. 区别

  • 传统认证

    用户登录,服务端返回token,并将token保存在服务端
    以后用户再来访问,需要携带token,服务端获取token后,再去数据库中获取token进行校验。
    
  • jwt认证

    用户登录,服务端返回一个token(服务端不保存)
    以后用户再来访问,需要携带token,服务端获取token后,再做token校验
    
    优势:相较于传统的token相比,它无需在服务端保存token
    

2. jwt 实现原理

jwt官网示例

image-20220409102254112

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

注意:jwt 生成的 token 是由三段字符串组成,并使用.进行连接

  • 第一段字符串,Header 内部包含算法/token 类型

    json 转化为字符串,然后做base64 url加密(base64url加密;+_)

    {
      "alg": "HS256",
      "typ": "JWT"
    }
    
  • 第二段字符串,pyload 自定义值。

    json 转化为字符串,然后做base64 url加密(base64url加密;+_)

    {
      "id": "1234567890",  //可以传入用户id
      "name": "John Doe", // 可以自定义值,用户名
      "iat": 1516239022 // 失效时间
    }
    
    /*注:一般不用传入用户密码,否则有泄露的危险。*/
    
  • 第三段字符串

    第一步:将第一步和第二步的密文进行拼接
    eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ
    第二步:对前两部分的密文进行HS256加密 + 加盐
    第三步:对HS256加密的密文再做base64url加密
    
  • 以后用户再来访问的时候,需要携带 token,后端对 token 进行校验。

    • 获取token

    • 第一步:对 token 进行切割

      eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
      
    • 第二步:对第二段进行base64url解密,并获取 payload 信息,检测 token 是否已经超时。

      {
        "id": "1234567890",  //可以传入用户id
        "name": "John Doe", // 可以自定义值,用户名
        "iat": 1516239022 // 超时时间
      }
      
    • 第三步:把第1,2 段的内容拼接再次执行 HASH256加密。

      第一步:将第一步和第二步的密文进行拼接
      eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ
      第二步:对前两部分的密文进行HS256加密 + 加盐
      
      密文=base64解密(SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c)
      如果相等,表示token 未被修改过(认证通过)
      
    • 说明:请保证的保密性。

3. 脚本使用

3.1 jwt 加密使用

import datetime

import jwt
from jwt import exceptions

SALT="ABCDEF"
def create_token():
    # 构造headers
    headers={
        "alg": "HS256",
        "typ": "JWT"
    }
    pyload={
        'id':1,
        'name':'ziqingbaojian',
        'iat':datetime.datetime.now()+datetime.timedelta(days=1) #有效期一天
    }
    result=jwt.encode(payload=pyload,key=SALT,algorithm="HS256",headers=headers)
    return result

image-20220411195821338

3.2 jwt 解密

def get_payload(token):
    '''
    根据token获取payload
    :param token:
    :return:
    '''
    try:
        verified_payload = jwt.decode(token, SALT, ['HS256'])
        return verified_payload
    except exceptions.DecodeError:
        print("token 认证失败")
    except exceptions.ExpiredSignatureError:
        print("token 已失效")
    except exceptions.InvalidTokenError:
        print("非法的token")
        
if __name__ == '__main__':
    token=create_token()
    print(token) #生成 token
    payload=get_payload(token)
    print(payload) # 生成解密数据

image-20220412151929048

4. jwt源码浅读

说明:原理在上述已经介绍完毕

# 使用encode方法完成加密。
result = jwt.encode(payload=payload, key=SALT, algorithm="HS256", headers=headers)

image-20220412084412155

class PyJwT():
    pass
_jwt_global_obj = PyJWT()
encode = _jwt_global_obj.encode
decode_complete = _jwt_global_obj.decode_complete
decode = _jwt_global_obj.decode

查看源码得知,_jwt_global_obj 使用了单利模式的设计模式。encode 调用对象方法。

def encode(
    self,
    payload: Dict[str, Any],
    key: str,
    algorithm: Optional[str] = "HS256",
    headers: Optional[Dict] = None,
    json_encoder: Optional[Type[json.JSONEncoder]] = None,
) -> str:
    # Check that we get a mapping
    if not isinstance(payload, Mapping):
        raise TypeError(
            "Expecting a mapping object, as JWT only supports "
            "JSON objects as payloads."
        )

    # Payload
    payload = payload.copy()
    for time_claim in ["exp", "iat", "nbf"]:
        # 将datetime转换为已知时间格式声明中的intDate值
        # Convert datetime to a intDate value in known time-format claims
        '''因此,时间的有效值键可以是这三个值'''
        if isinstance(payload.get(time_claim), datetime):
            payload[time_claim] = timegm(payload[time_claim].utctimetuple())

    json_payload = json.dumps(
        payload, separators=(",", ":"), cls=json_encoder
    ).encode("utf-8")

    return api_jws.encode(json_payload, key, algorithm, headers, json_encoder) #执行 api_jws.encode 方法。

image-20220412152917583

def encode(
    self,
    payload: bytes,
    key: str,
    algorithm: Optional[str] = "HS256",
    headers: Optional[Dict] = None,
    json_encoder: Optional[Type[json.JSONEncoder]] = None,
) -> str:
    segments = []

    if algorithm is None:
        algorithm = "none"

    # Prefer headers["alg"] if present to algorithm parameter.
    if headers and "alg" in headers and headers["alg"]:
        algorithm = headers["alg"]

    # Header
    header = {"typ": self.header_typ, "alg": algorithm} # 默认头部信息

    if headers:
        self._validate_headers(headers)
        header.update(headers)
        if not header["typ"]:
            del header["typ"]

    json_header = json.dumps(
        header, separators=(",", ":"), cls=json_encoder
    ).encode() # 使用json格式化加密头

    segments.append(base64url_encode(json_header)) # 使用 base64url进行加密
    segments.append(base64url_encode(payload))#使用base64url加密payload数据部分
	# 将两次结果存储到 segments 列表
    
    # Segments
    signing_input = b".".join(segments) # 使用`.`进行拼接两端字符串,作为待加密的字符串
    try:
        alg_obj = self._algorithms[algorithm] # 获取加密算法的类型
        key = alg_obj.prepare_key(key)# 加盐
        signature = alg_obj.sign(signing_input, key) # 使用加密算法加盐,并生成密文

    except KeyError as e:
        if not has_crypto and algorithm in requires_cryptography:
            raise NotImplementedError(
                "Algorithm '%s' could not be found. Do you have cryptography "
                "installed?" % algorithm
            ) from e
        else:
            raise NotImplementedError("Algorithm not supported") from e

    segments.append(base64url_encode(signature)) # 将base64url加密密文,并添加到列表中

    encoded_string = b".".join(segments) #是用`.`将三段密文进行拼接

    return encoded_string.decode("utf-8") # 将结果进行返回。

解密源码

def decode(
    self,
    jwt: str,
    key: str = "",
    algorithms: List[str] = None,
    options: Dict = None,
    **kwargs,
) -> Dict[str, Any]:
    # 解密时需要传入的参数。
    
    # 执行方法decode_complete
    decoded = self.decode_complete(jwt, key, algorithms, options, **kwargs)
    return decoded["payload"] #返回结果字典中的值

decode_complete()

def decode_complete(
    self,
    jwt: str,
    key: str = "",
    algorithms: List[str] = None,
    options: Dict = None,
    **kwargs,
) -> Dict[str, Any]:
    
    # 将verify_signature赋值为True
    if options is None: # 空字典赋值
        # 验证签名
        options = {"verify_signature": True}
    else:
        options.setdefault("verify_signature", True)

    if not options["verify_signature"]:
        options.setdefault("verify_exp", False)
        options.setdefault("verify_nbf", False)
        options.setdefault("verify_iat", False)
        options.setdefault("verify_aud", False)
        options.setdefault("verify_iss", False)

    if options["verify_signature"] and not algorithms:
        raise DecodeError(
            'It is required that you pass in a value for the "algorithms" argument when calling decode().'
        )
	# 执行该方法decode_complete
    decoded = api_jws.decode_complete(
        jwt,
        key=key,
        algorithms=algorithms,
        options=options,
        **kwargs,
    )

    try:
        payload = json.loads(decoded["payload"]) #使用json解析数据。
    except ValueError as e:
        raise DecodeError("Invalid payload string: %s" % e)
    if not isinstance(payload, dict):
        raise DecodeError("Invalid payload string: must be a json object")

    merged_options = {**self.options, **options}
    self._validate_claims(payload, merged_options, **kwargs)

    decoded["payload"] = payload
    return decoded

decode_complete()

def decode_complete(
    self,
    jwt: str,
    key: str = "",
    algorithms: List[str] = None,
    options: Dict = None,
    **kwargs,
) -> Dict[str, Any]:
    if options is None:
        options = {}
    merged_options = {**self.options, **options}
    verify_signature = merged_options["verify_signature"]

    if verify_signature and not algorithms:
        raise DecodeError(
            'It is required that you pass in a value for the "algorithms" argument when calling decode().'
        )
	# 执行私有返回数据
    payload, signing_input, header, signature = self._load(jwt)

    if verify_signature:
        self._verify_signature(signing_input, header, signature, key, algorithms)

    return {
        "payload": payload,
        "header": header,
        "signature": signature,
    }

_load();

def _load(self, jwt):
    if isinstance(jwt, str):
        jwt = jwt.encode("utf-8") #转换编码

    if not isinstance(jwt, bytes):
        raise DecodeError(f"Invalid token type. Token must be a {bytes}")

    try:
        # 使用字符串分割(按照`.`),从最后一个点分割,获取到两个值;分别是一二部分和三部分
        signing_input, crypto_segment = jwt.rsplit(b".", 1)
        
        # 对前一个分割的前一部分进行再次分割,得到前两部分的密文
        header_segment, payload_segment = signing_input.split(b".", 1)
    except ValueError as err:
        raise DecodeError("Not enough segments") from err

    try:
        header_data = base64url_decode(header_segment) #使用base64url解密请求头
    except (TypeError, binascii.Error) as err:
        raise DecodeError("Invalid header padding") from err

    try:
        header = json.loads(header_data) # json解析
    except ValueError as e:
        raise DecodeError("Invalid header string: %s" % e) from e

    if not isinstance(header, Mapping):
        raise DecodeError("Invalid header string: must be a json object")

    try:
        payload = base64url_decode(payload_segment) #解密第二部分。自定义值的部分
    except (TypeError, binascii.Error) as err:
        raise DecodeError("Invalid payload padding") from err

    try:
        signature = base64url_decode(crypto_segment) 
        #将第三部分的密文进行base64url解密,得到HS256加密的密文
    except (TypeError, binascii.Error) as err:
        raise DecodeError("Invalid crypto padding") from err

    return (payload, signing_input, header, signature) #返回结果

补充:字符串分割

image-20220412160141439

rsplit()从右(尾部)面开始分割,第一个点作为分割元素

5.drf中使用jwt认证

5.1 视图函数

from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
# Create your views here.
from app01 import models
from appjwt.utils import jwt_auth

class JwtView(APIView):
    authentication_classes = [] #注:登录函数不需要进行验证。
    def post(self,request,*args,**kwargs):
        ret = {'code':1000,'msg':None} #初始化返回值
        try:
            user = request._request.POST.get('username')
            pwd = request._request.POST.get('password')
            # 往数据库查询参数
            obj = models.UserInfo.objects.filter(username=user,password=pwd).first()
            if not obj:# 用户不存在
                ret['code'] = 1001
                ret['msg'] = "用户名或密码错误"
            # 为登录用户创建token
            payload={
                "id":obj.id,
                "name":obj.username
            }
            token = jwt_auth.create_token(payload)# 使用默认的失效时间
            ret['token'] = token
        except Exception as e:
            ret['code'] = 1002
            ret['msg'] = '请求异常'
        return Response(ret)

5.2 认证类

说明:将解密 jwt 的代码封装到认证类中,继承 drf 中的认证类。

import jwt
from jwt import exceptions
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from django.conf import settings

class JwtAuthentication(BaseAuthentication):
    def authenticate(self, request):
        token=request.query_params.get("token")
        SALT=settings.SECRET_KEY
        try:
            verified_payload = jwt.decode(token, SALT, ['HS256'])
        except exceptions.DecodeError:
            raise AuthenticationFailed({"1000":"token 认证失败"})
        except exceptions.ExpiredSignatureError:
            raise AuthenticationFailed({'1001':"token 已失效"})
        except exceptions.InvalidTokenError:
            raise AuthenticationFailed({"1002":"非法token"})
        return (verified_payload,token)

    def authenticate_header(self, request):
        pass

5.3 生成token的文件

import datetime

import jwt

from django.conf import settings

SALT=settings.SECRET_KEY
def create_token(payload,timeout=1):
    # 构造headers
    headers={
        "alg": "HS256",
        "typ": "JWT"
    }
    # 构造payload,默认失效时间是一分钟
    payload['exp']=datetime.datetime.now()+datetime.timedelta(minutes=timeout)
    token=jwt.encode(payload=payload,key=SALT,algorithm="HS256",headers=headers)
    return token

配置文件

REST_FRAMEWORK={
    "DEFAULT_AUTHENTICATION_CLASSES":['appjwt.utils.auth.JwtAuthentication',], #全局认证类
    
    
    
    # "UNAUTHENTICATED_USER":None, # 匿名,request.user = None
    # "UNAUTHENTICATED_TOKEN":None,
    # "DEFAULT_PERMISSION_CLASSES":['app01.utils.permission.MyPermission',],
    # "DEFAULT_THROTTLE_CLASSES":['app01.utils.throttle.MyThrottle',],# 匿名用户不能在全局配置需要为登录功能单独添加
    "DEFAULT_THROTTLE_RATES":{
        "visit":'3/m',#一分钟三次,匿名用户
        "loginuser":'10/m',# 登录成功,一分钟10次
    },
    "PAGE_SIZE":2,
    "DEFAULT_VERSIONING_CLASS":"rest_framework.versioning.URLPathVersioning",
    "DEFAULT_VERSION":'v1',
    "ALLOWED_VERSIONS":['v1','v2'], #允许的版本号
    "VERSION_PARAM":"version",# 这个参数应该和 路由中的名称相同version/
    "DEFAULT_PARSER_CLASSES":['rest_framework.parsers.JSONParser','rest_framework.parsers.FormParser'],
    "DEFAULT_RENDERER_CLASSES":['rest_framework.renderers.JSONRenderer','rest_framework.renderers.BrowsableAPIRenderer']
}

5.4 使用效果

登录生成 token

image-20220412170236012

image-20220412170249871

根据token查询结果

image-20220412171215388

6.扩展

pip install djangorestframework-jwt # 内部仍然调用的pyjwt

说明:djangorestframework-jwt 的使用仅限制在 drf 中,而 pyjwt 可以使用在任何框架中使用范围较广。

继续努力,终成大器!

标签:十一,jwt,json,payload,header,token,options,drf
来源: https://www.cnblogs.com/Blogwj123/p/16136125.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有