import uuid
import json as json_mod
from datetime import datetime
import requests
from .action import ConnectionAction
from .database import Database, DBHandle
from .theExceptions import CreationError, ConnectionError
from .users import Users
from .ca_certificate import CA_Certificate
from json.decoder import JSONDecodeError
[docs]class JsonHook(object):
"""This one replaces requests' original json() function. If a call to json() fails, it will print a message with the request content"""
def __init__(self, ret):
self.ret = ret
self.ret.json_originalFct = self.ret.json
def __call__(self, *args, **kwargs):
try:
return self.ret.json_originalFct(*args, **kwargs)
except Exception as e:
print( "Unable to get json for request: %s. Content: %s" % (self.ret.url, self.ret.content) )
raise e
[docs]class AikidoSession:
"""Magical Aikido being that you probably do not need to access directly
that deflects every http request to requests in the most graceful way.
It will also save basic stats on requests in it's attribute '.log'.
"""
class Holder(object):
def __init__(self, fct, auth, max_conflict_retries=5, verify=True, timeout=30):
self.fct = fct
self.auth = auth
self.max_conflict_retries = max_conflict_retries
if not isinstance(verify, bool) and not isinstance(verify, CA_Certificate) and not not isinstance(verify, str) :
raise ValueError("'verify' argument can only be of type: bool, CA_Certificate or str ")
self.verify = verify
self.timeout = timeout
def __call__(self, *args, **kwargs):
if self.auth:
kwargs["auth"] = self.auth
if isinstance(self.verify, CA_Certificate):
kwargs["verify"] = self.verify.get_file_path()
else :
kwargs["verify"] = self.verify
kwargs["timeout"] = self.timeout
try:
do_retry = True
retry = 0
while do_retry and retry < self.max_conflict_retries:
ret = self.fct(*args, **kwargs)
do_retry = ret.status_code == 1200
try :
data = ret.json()
do_retry = do_retry or ("errorNum" in data and data["errorNum"] == 1200)
except JSONDecodeError:
pass
retry += 1
except:
print ("===\nUnable to establish connection, perhaps arango is not running.\n===")
raise
if len(ret.content) < 1:
raise ConnectionError("Empty server response", ret.url, ret.status_code, ret.content)
elif ret.status_code == 401:
raise ConnectionError("Unauthorized access, you must supply a (username, password) with the correct credentials", ret.url, ret.status_code, ret.content)
ret.json = JsonHook(ret)
return ret
def __init__(
self,
username,
password,
verify=True,
max_conflict_retries=5,
max_retries=5,
single_session=True,
log_requests=False,
pool_maxsize=10,
timeout=30,
):
if username:
self.auth = (username, password)
else:
self.auth = None
self.pool_maxsize = pool_maxsize
self.verify = verify
self.max_retries = max_retries
self.log_requests = log_requests
self.max_conflict_retries = max_conflict_retries
self.timeout = timeout
self.session = None
if single_session:
self.session = self._make_session()
if log_requests:
self.log = {}
self.log["nb_request"] = 0
self.log["requests"] = {}
def _make_session(self):
session = requests.Session()
kwargs = {
'max_retries': self.max_retries,
'pool_connections': self.pool_maxsize,
'pool_maxsize': self.pool_maxsize,
#'pool_block': True # We don't want to lose connections
}
http = requests.adapters.HTTPAdapter(**kwargs)
https = requests.adapters.HTTPAdapter(**kwargs)
session.mount('http://', http)
session.mount('https://', https)
return session
def __getattr__(self, request_function_name):
if self.session is not None:
session = self.session
else:
session = self._make_session()
try:
request_function = getattr(session, request_function_name)
except AttributeError:
raise AttributeError("Attribute '%s' not found (no Aikido move available)" % request_function_name)
auth = object.__getattribute__(self, "auth")
verify = object.__getattribute__(self, "verify")
timeout = object.__getattribute__(self, "timeout")
if self.log_requests:
log = object.__getattribute__(self, "log")
log["nb_request"] += 1
log["requests"][request_function.__name__] += 1
return AikidoSession.Holder(request_function, auth, max_conflict_retries=self.max_conflict_retries, verify=verify, timeout=timeout)
def disconnect(self):
pass
[docs]class Connection(object):
"""This is the entry point in pyArango and directly handles databases.
@param arangoURL: can be either a string url or a list of string urls to different coordinators
@param use_grequests: allows for running concurent requets.
Parameters
----------
arangoURL: list or str
list of urls or url for connecting to the db
username: str
for credentials
password: str
for credentials
verify: bool
check the validity of the CA certificate
verbose: bool
flag for addictional prints during run
statsdClient: instance
statsd instance
reportFileName: str
where to save statsd report
loadBalancing: str
type of load balancing between collections
use_grequests: bool
parallelise requests using gevents. Use with care as gevents monkey patches python, this could have unintended concequences on other packages
use_jwt_authentication: bool
use JWT authentication
use_lock_for_reseting_jwt: bool
use lock for reseting gevents authentication
max_retries: int
max number of retries for a request
max_conflict_retries: int
max number of requests for a conflict error (1200 arangodb error). Does not work with gevents (grequests),
pool_maxsize: int
max number of open connections. (Not intended for grequest)
timeout: int
number of seconds to wait on a hanging connection before giving up
"""
LOAD_BLANCING_METHODS = {'round-robin', 'random'}
def __init__(
self,
arangoURL='http://127.0.0.1:8529',
username=None,
password=None,
verify=True,
verbose=False,
statsdClient=None,
reportFileName=None,
loadBalancing="round-robin",
use_grequests=False,
use_jwt_authentication=False,
use_lock_for_reseting_jwt=True,
max_retries=5,
max_conflict_retries=5,
pool_maxsize=10,
timeout=30
):
if loadBalancing not in Connection.LOAD_BLANCING_METHODS:
raise ValueError("loadBalancing should be one of : %s, got %s" % (Connection.LOAD_BLANCING_METHODS, loadBalancing) )
self.pool_maxsize = pool_maxsize
self.loadBalancing = loadBalancing
self.currentURLId = 0
self.username = username
self.use_grequests = use_grequests
self.use_jwt_authentication = use_jwt_authentication
self.use_lock_for_reseting_jwt = use_lock_for_reseting_jwt
self.max_retries = max_retries
self.max_conflict_retries = max_conflict_retries
self.action = ConnectionAction(self)
self.timeout = timeout
self.databases = {}
self.verbose = verbose
if isinstance(arangoURL, str):
self.arangoURL = [arangoURL]
else:
self.arangoURL = arangoURL
for i, url in enumerate(self.arangoURL):
if url[-1] == "/":
self.arangoURL[i] = url[:-1]
self.identifier = None
self.startTime = None
self.session = None
self.resetSession(username, password, verify)
self.users = Users(self)
if reportFileName != None:
self.reportFile = open(reportFileName, 'a')
else:
self.reportFile = None
self.statsdc = statsdClient
self.reload()
[docs] def getEndpointURL(self):
"""return an endpoint url applying load balacing strategy"""
if self.loadBalancing == "round-robin":
url = self.arangoURL[self.currentURLId]
self.currentURLId = (self.currentURLId + 1) % len(self.arangoURL)
return url
elif self.loadBalancing == "random":
import random
return random.choice(self.arangoURL)
[docs] def getURL(self):
"""return an URL for the connection"""
return '%s/_api' % self.getEndpointURL()
[docs] def getDatabasesURL(self):
"""return an URL to the databases"""
if not self.session.auth:
return '%s/database/user' % self.getURL()
else:
return '%s/user/%s/database' % (self.getURL(), self.username)
[docs] def updateEndpoints(self, coordinatorURL = None):
"""udpdates the list of available endpoints from the server"""
raise NotImplementedError("Not done yet.")
def disconnectSession(self):
if self.session:
self.session.disconnect()
[docs] def getVersion(self):
"""fetches the arangodb server version"""
r = self.session.get(self.getURL() + "/version")
data = r.json()
if r.status_code == 200 and not "error" in data:
return data
else:
raise CreationError(data["errorMessage"], data)
def create_aikido_session(
self,
username,
password,
verify
) -> AikidoSession:
return AikidoSession(
username=username,
password=password,
verify=verify,
single_session=True,
max_conflict_retries=self.max_conflict_retries,
max_retries=self.max_retries,
log_requests=False,
pool_maxsize=self.pool_maxsize,
timeout=self.timeout
)
def create_grequest_session(
self,
username,
password,
verify
):
from .gevent_session import AikidoSession_GRequests
return AikidoSession_GRequests(
username, password, self.arangoURL,
self.use_jwt_authentication,
self.use_lock_for_reseting_jwt,
self.max_retries,
verify
)
[docs] def resetSession(self, username=None, password=None, verify=True):
"""resets the session"""
self.disconnectSession()
if self.use_grequests:
self.session = self.create_grequest_session(
username,
password,
verify
)
else:
self.session = self.create_aikido_session(
username,
password,
verify
)
[docs] def reload(self):
"""Reloads the database list.
Because loading a database triggers the loading of all collections and graphs within,
only handles are loaded when this function is called. The full databases are loaded on demand when accessed
"""
r = self.session.get(self.getDatabasesURL())
data = r.json()
if r.status_code == 200 and not data["error"]:
self.databases = {}
for dbName in data["result"]:
if dbName not in self.databases:
self.databases[dbName] = DBHandle(self, dbName)
else:
raise ConnectionError(data["errorMessage"], self.getDatabasesURL(), r.status_code, r.content)
[docs] def createDatabase(self, name, **dbArgs):
"use dbArgs for arguments other than name. for a full list of arguments please have a look at arangoDB's doc"
dbArgs['name'] = name
payload = json_mod.dumps(dbArgs, default=str)
url = self.getURL() + "/database"
r = self.session.post(url, data = payload)
data = r.json()
if r.status_code == 201 and not data["error"]:
db = Database(self, name)
self.databases[name] = db
return self.databases[name]
else:
raise CreationError(data["errorMessage"], r.content)
[docs] def hasDatabase(self, name):
"""returns true/false wether the connection has a database by the name of 'name'"""
return name in self.databases
def __contains__(self, name):
"""Alias for hasDatabase"""
return self.hasDatabase(name)
def __getitem__(self, dbName):
"""Collection[dbName] returns a database by the name of 'dbName', raises a KeyError if not found"""
try:
return self.databases[dbName]
except KeyError:
self.reload()
try:
return self.databases[dbName]
except KeyError:
raise KeyError("Can't find any database named : %s" % dbName)
def reportStart(self, name):
if self.statsdc != None:
self.identifier = str(uuid.uuid5(uuid.NAMESPACE_DNS, name))[-6:]
if self.reportFile != None:
self.reportFile.write("[%s]: %s\n" % (self.identifier, name))
self.reportFile.flush()
self.startTime = datetime.now()
def reportItem(self):
if self.statsdc != None:
diff = datetime.now() - self.startTime
microsecs = (diff.total_seconds() * (1000 ** 2) ) + diff.microseconds
self.statsdc.timing("pyArango_" + self.identifier, int(microsecs))