ding.interaction.base.app¶
ding.interaction.base.app
¶
Full Source Code
../ding/interaction/base/app.py
1import json 2from enum import IntEnum, unique 3from functools import wraps 4from typing import Mapping, Any, Type, Optional, Tuple, Union, Iterable, Callable 5 6import flask 7import requests 8from flask import jsonify 9 10 11@unique 12class CommonErrorCode(IntEnum): 13 SUCCESS = 0 14 COMMON_FAILURE = 1 15 16 17def flask_response( 18 success: bool, 19 data: Optional[Mapping[str, Any]] = None, 20 message: Optional[str] = None, 21 code: Optional[int] = None, 22): 23 return jsonify( 24 { 25 'success': success, 26 'code': 0 if success else (code or CommonErrorCode.COMMON_FAILURE), 27 'message': (message or 'Success.') if success else (message or 'Failed.'), 28 'data': data, 29 } 30 ) 31 32 33def success_response(data: Optional[Mapping[str, Any]] = None, message: Optional[str] = None): 34 return flask_response( 35 success=True, 36 code=CommonErrorCode.SUCCESS, 37 message=message, 38 data=data, 39 ) 40 41 42def failure_response( 43 code: Optional[int] = None, message: Optional[str] = None, data: Optional[Mapping[str, Any]] = None 44): 45 return flask_response( 46 success=False, 47 code=code or CommonErrorCode.COMMON_FAILURE, 48 message=message, 49 data=data, 50 ) 51 52 53_RESPONSE_VALUE_TYPE = Tuple[int, bool, int, str, Mapping[str, Any]] 54 55 56def get_values_from_response(response: Union[requests.Response, flask.Response]) -> _RESPONSE_VALUE_TYPE: 57 status_code = response.status_code 58 59 _content = response.content if hasattr(response, 'content') else response.data 60 _json = json.loads(_content.decode()) 61 success, code, message, data = _json['success'], _json['code'], _json.get('message', ''), _json.get('data', {}) 62 63 return status_code, success, code, message, data 64 65 66class ResponsibleException(Exception): 67 68 def __init__( 69 self, 70 code: int = CommonErrorCode.COMMON_FAILURE, 71 message: Optional[str] = None, 72 data: Optional[Mapping[str, Any]] = None, 73 status_code: int = 400 74 ): 75 Exception.__init__(self, message) 76 self.__code = code 77 self.__message = message 78 self.__data = data or {} 79 self.__status_code = status_code 80 81 def get_response(self): 82 return failure_response(self.__code, self.__message, self.__data), self.__status_code 83 84 85def responsible(classes: Iterable[Type[ResponsibleException]] = None): 86 if classes is None: 87 classes = (ResponsibleException, ) 88 89 def _decorator(func: Callable[..., Any]) -> Callable[..., Any]: 90 91 @wraps(func) 92 def _func(*args, **kwargs): 93 try: 94 ret = func(*args, **kwargs) 95 except tuple(classes) as err: 96 return err.get_response() 97 else: 98 return ret 99 100 return _func 101 102 return _decorator