Source code for pyArango.jwauth

from base64 import b64decode
import time
import json as json_mod
import logging


import requests
from requests import exceptions as requests_exceptions


[docs]class JWTAuth(requests.auth.AuthBase): # Half a day before the actual expiration. REAUTH_TIME_INTERVEL = 43200 def __init__( self, username, password, urls, use_lock_for_reseting_jwt=False, max_retries=5 ): self.username = username self.password = password self.urls = urls self.lock_for_reseting_jwt = Lock() if use_lock_for_reseting_jwt else None self.__init_request_session(max_retries) self.__set_token() def __init_request_session(self, max_retries): self.max_retries = max_retries self.session = requests.Session() http = requests.adapters.HTTPAdapter(max_retries=max_retries) https = requests.adapters.HTTPAdapter(max_retries=max_retries) self.session.mount('http://', http) self.session.mount('https://', https) def __parse_token(self): decoded_token = b64decode(self.token.split('.')[1].encode()) return json_mod.loads(decoded_token.decode()) def __get_auth_token(self): request_data = '{"username":"%s","password":"%s"}' % (self.username, self.password) for connection_url in self.urls: try: response = self.session.post('%s/_open/auth' % connection_url, data=request_data) if response.ok: json_data = response.content if json_data: data_dict = json_mod.loads(json_data.decode("utf-8")) return data_dict.get('jwt') except requests_exceptions.ConnectionError: if connection_url is not self.urls[-1]: logging.critical("Unable to connect to %s trying another", connection_url) else: logging.critical("Unable to connect to any of the urls: %s", self.urls) raise def __set_token(self): self.token = self.__get_auth_token() self.parsed_token = \ self.__parse_token() if self.token is not None else {} self.token_last_updated = time.time() def reset_token(self): logging.warning("Reseting the token.") self.__set_token() def is_token_expired(self): return ( self.parsed_token.get("exp", 0) - time.time() < JWTAuth.REAUTH_TIME_INTERVEL ) def __call__(self, req): # Implement JWT authentication if self.is_token_expired(): if self.lock_for_reseting_jwt is not None: self.lock_for_reseting_jwt.acquire() if self.is_token_expired(): self.reset_token() if self.lock_for_reseting_jwt is not None: self.lock_for_reseting_jwt.release() req.headers['Authorization'] = 'Bearer %s' % self.token return req