import json
import types
from future.utils import with_metaclass
from enum import Enum
from . import consts as CONST
from .document import Document, Edge
from .theExceptions import ValidationError, SchemaViolation, CreationError, UpdateError, DeletionError, InvalidDocument, ExportError, DocumentNotFoundError, ArangoError, BulkOperationError, IndexError
from .query import SimpleQuery
from .index import Index
__all__ = ["Collection", "Edges", "Field", "DocumentCache", "CachedDoc", "Collection_metaclass", "getCollectionClass", "isCollection", "isDocumentCollection", "isEdgeCollection", "getCollectionClasses"]
class BulkMode(Enum):
    """Kind of operation currently buffered by a collection's bulk cache."""

    # Nothing is being buffered.
    NONE = 0
    # Buffered documents are new and get flushed by '_writeBatch'.
    INSERT = 1
    # Buffered documents are patches and get flushed by '_updateBatch'.
    UPDATE = 2
    # Buffered documents are removals and get flushed by '_removeBatch'.
    DELETE = 3
class CachedDoc(object):
    """A cached document.

    Node of the doubly linked chain kept by the document cache. It wraps
    'document', remembers its '_key' and links to its neighbours through
    'prev' and 'nextDoc'. Item access and every attribute the node does not
    define itself are forwarded to the wrapped document.
    """

    def __init__(self, document, prev, nextDoc):
        self.prev = prev
        self.document = document
        self.nextDoc = nextDoc
        self._key = document._key

    def __getitem__(self, k):
        return self.document[k]

    def __setitem__(self, k, v):
        self.document[k] = v

    def __getattr__(self, k):
        """Forward attributes that the node itself lacks to the wrapped document.

        '__getattr__' is only consulted after normal lookup failed, so the
        node's own attributes keep priority without a blanket 'except'.
        Raises 'AttributeError' when the document lacks the attribute too.
        """
        if k == "document":
            # 'document' itself is missing (partially built instance): do not
            # recurse through 'self.document' forever.
            raise AttributeError(k)
        return getattr(self.document, k)
class DocumentCache(object):
    """Document cache for collection, with insert, deletes and updates in O(1).

    Documents are kept in a dictionary ('cacheStore', keyed by '_key') and in a
    doubly linked chain of 'CachedDoc' nodes ordered from the most recently
    used ('head') to the least recently used ('tail'). When the cache is full
    the tail is evicted.
    """

    def __init__(self, cacheSize):
        self.cacheSize = cacheSize
        self.cacheStore = {}
        self.head = None
        self.tail = None

    def _unlink(self, node):
        """Detach 'node' from the chain, repairing both neighbours, head and tail."""
        if node.prev is None:
            self.head = node.nextDoc
        else:
            node.prev.nextDoc = node.nextDoc
        if node.nextDoc is None:
            self.tail = node.prev
        else:
            node.nextDoc.prev = node.prev
        node.prev = None
        node.nextDoc = None

    def _pushFront(self, node):
        """Insert a detached 'node' at the head of the chain."""
        node.prev = None
        node.nextDoc = self.head
        if self.head is None:
            self.tail = node
        else:
            self.head.prev = node
        self.head = node

    def _touch(self, node):
        """Mark 'node' as the most recently used one."""
        if node is not self.head:
            self._unlink(node)
            self._pushFront(node)

    def cache(self, doc):
        """Cache 'doc' (anything exposing '_key') and return its node.

        A key that is already cached is only moved to the head of the chain;
        the document stored for it is left untouched. Otherwise a new node is
        created at the head, evicting from the tail while the cache is full.
        """
        node = self.cacheStore.get(doc._key)
        if node is not None:
            self._touch(node)
            return node

        # Evict before inserting; works for any 'cacheSize', including 1.
        while self.tail is not None and len(self.cacheStore) >= self.cacheSize:
            evicted = self.tail
            self._unlink(evicted)
            del self.cacheStore[evicted._key]

        node = CachedDoc(doc, prev = None, nextDoc = None)
        self._pushFront(node)
        self.cacheStore[doc._key] = node
        return node

    def delete(self, _key):
        """Remove a document from the cache.

        Raises 'KeyError' if '_key' is not cached.
        """
        try:
            node = self.cacheStore.pop(_key)
        except KeyError:
            raise KeyError("Document with _key %s is not available in cache" % _key)
        self._unlink(node)

    def getChain(self):
        """Return a list of keys representing the chain of documents."""
        keys = []
        node = self.head
        while node is not None:
            keys.append(node._key)
            node = node.nextDoc
        return keys

    def stringify(self) -> str:
        """Return a pretty string of 'getChain()'."""
        return "<->".join(str(key) for key in self.getChain())

    def __getitem__(self, _key):
        """Return the cached node for '_key' and mark it as most recently used.

        Raises 'KeyError' if '_key' is not cached.
        """
        try:
            node = self.cacheStore[_key]
        except KeyError:
            raise KeyError("Document with _key %s is not available in cache" % _key)
        self._touch(node)
        return node

    def __repr__(self):
        return "[DocumentCache, size: %d, full: %d]" %(self.cacheSize, len(self.cacheStore))
class Field(object):
    """The class for defining pyArango fields."""

    def __init__(self, validators = None, default = None):
        """Store the validators and the default value of the field.

        'validators' must be a list of objects exposing 'validate(value)';
        a missing or empty value is replaced by a fresh empty list.
        'default' can also be a callable.
        """
        self.validators = validators if validators else []
        self.default = default

    def validate(self, value):
        """Run 'value' through every validator, in order.

        Return 'True' when all of them accept it; whatever a validator raises
        is propagated unchanged.
        """
        for validator in self.validators:
            validator.validate(value)
        return True

    def __str__(self):
        rendered = ', '.join(str(validator) for validator in self.validators)
        return "<Field, validators: '%s'>" % rendered
def getCollectionClass(name) -> type:
    """Return the collection class registered under 'name'.

    Thin module-level shortcut for 'Collection_metaclass.getCollectionClass';
    the result (expected to be a class, not a boolean) and any error raised
    for an unknown name come straight from the metaclass.
    """
    return Collection_metaclass.getCollectionClass(name)
def isCollection(name) -> bool:
    """Return 'True' or 'False' whether 'name' is the name of a collection.

    Thin module-level shortcut for 'Collection_metaclass.isCollection'.
    """
    return Collection_metaclass.isCollection(name)
def isDocumentCollection(name) -> bool:
    """Tell whether 'name' designates a document collection.

    Thin module-level shortcut for 'Collection_metaclass.isDocumentCollection'.
    """
    verdict = Collection_metaclass.isDocumentCollection(name)
    return verdict
def isEdgeCollection(name) -> bool:
    """Tell whether 'name' designates an edge collection.

    Thin module-level shortcut for 'Collection_metaclass.isEdgeCollection'.
    """
    verdict = Collection_metaclass.isEdgeCollection(name)
    return verdict
def getCollectionClasses() -> dict:
    """Return a dictionary of all defined collection classes.

    This is the 'collectionClasses' attribute of 'Collection_metaclass'
    itself, not a copy.
    """
    return Collection_metaclass.collectionClasses
class Collection(with_metaclass(Collection_metaclass, object)):
    """A document collection. Collections are meant to be instantiated by databases."""

    # Schema of the documents of the collection: maps a field name to a
    # 'Field', a nested dictionary of fields, or a list/tuple.
    # Subclasses override it to declare their own fields.
    _fields = {}

    # Validation switches; every check is disabled by default and foreign
    # (undeclared) fields are allowed.
    _validation = dict(
        on_save = False,
        on_set = False,
        on_load = False,
        allow_foreign_fields = True,
    )

    # Names accepted by 'validatePrivate' as private fields.
    arangoPrivates = ["_id", "_key", "_rev"]
def __init__(self, database, jsonData):
    """Bind the collection to 'database' and mirror 'jsonData' as attributes.

    Every key of 'jsonData' becomes an attribute of the instance (and may
    therefore override the class-derived 'name'). The cache starts disabled,
    the index registries empty and no bulk operation is in progress.
    """
    self.database = database
    self.connection = database.connection
    self.name = type(self).__name__

    for attribute, content in jsonData.items():
        setattr(self, attribute, content)

    self.documentCache = None
    self.documentClass = Document

    # One registry per index type, each mapping an index id to its 'Index'.
    knownTypes = ("primary", "hash", "skiplist", "persistent", "ttl", "geo", "fulltext")
    self.indexes = {indexType: {} for indexType in knownTypes}
    self.indexes_by_name = {}

    # Bulk state; see the '_*Batch' methods.
    self._isBulkInProgress = False
    self._bulkSize = 0
    self._bulkCache = []
    self._bulkMode = BulkMode.NONE
def getDefaultDocument(self, fields=None, dct=None):
    """Build the dictionary of default values described by a schema.

    'fields' defaults to 'self._fields'. Results are written into 'dct'
    (a new dictionary when omitted), which is also returned. Nested
    dictionaries are expanded recursively, lists and tuples become empty
    lists and a 'Field' contributes its default, calling it first when it is
    callable. Any other value raises 'ValueError'.
    """
    result = {} if dct is None else dct
    schema = self._fields if fields is None else fields

    for fieldName, spec in schema.items():
        if isinstance(spec, dict):
            result[fieldName] = self.getDefaultDocument(spec, None)
        elif isinstance(spec, (list, tuple)):
            result[fieldName] = []
        elif isinstance(spec, Field):
            default = spec.default
            result[fieldName] = default() if callable(default) else default
        else:
            raise ValueError("Field '%s' is of invalid type '%s'" % (fieldName, type(spec)) )
    return result
def getURL(self):
    """Return the URL of the collection: '<database URL>/collection/<name>'."""
    base = self.database.getURL()
    return "{}/collection/{}".format(base, self.name)
def getDocumentsURL(self):
    """Return the documents endpoint of the database: '<database URL>/document'."""
    base = self.database.getURL()
    return "{}/document".format(base)
def getIndexes(self):
    """Fill 'self.indexes' with all the indexes associated with the collection and return it.

    The indexes are fetched from '<database URL>/index'. 'self.indexes_by_name'
    is rebuilt from scratch with every index that carries a name.
    """
    self.indexes_by_name = {}
    url = "%s/index" % self.database.getURL()
    r = self.connection.session.get(url, params = {"collection": self.name})
    data = r.json()
    for ind in data["indexes"]:
        index = Index(collection = self, infos = ind)
        # The server may report a type that was not pre-seeded in '__init__'
        # (for instance the "edge" index of edge collections): register it
        # instead of failing with a KeyError.
        self.indexes.setdefault(ind["type"], {})[ind["id"]] = index
        if "name" in ind:
            self.indexes_by_name[ind["name"]] = index
    return self.indexes
def getIndex(self, name):
    """Return the index registered under 'name'.

    Raises 'IndexError' while no named index is known at all, and 'KeyError'
    when named indexes exist but none is called 'name'.
    """
    if not self.indexes_by_name:
        raise IndexError("named indices unsupported")
    return self.indexes_by_name[name]
def activateCache(self, cacheSize):
    """Activate the caching system with a fresh cache of 'cacheSize' documents.

    Cached documents are only available through the __getitem__ interface.
    """
    freshCache = DocumentCache(cacheSize)
    self.documentCache = freshCache
def deactivateCache(self):
    """Deactivate the caching system by dropping the current cache."""
    setattr(self, "documentCache", None)
def delete(self):
    """Delete the collection from the database.

    Raises 'DeletionError' when the server does not answer 200 or flags an
    error in its reply.
    """
    response = self.connection.session.delete(self.getURL())
    reply = response.json()
    if response.status_code != 200 or reply["error"]:
        raise DeletionError(reply["errorMessage"], reply)
def createDocument(self, initDict = None):
    """Create and return a new document of the collection.

    The document starts from the defaults of 'getDefaultDocument()'; values
    from 'initDict', when given, are applied on top of them.
    """
    content = self.getDefaultDocument()
    if initDict is not None:
        content.update(initDict)
    return self.documentClass(self, content)
def _writeBatch(self):
    """Send every buffered document to the server in a single POST.

    Does nothing when the buffer is empty. On success each buffered document
    receives its private fields and '_key' from the matching reply entry.
    The buffer is emptied once the reply has been processed.

    Raises 'UpdateError' if the current bulk mode is not INSERT or if the
    reply is not a JSON array, and 'BulkOperationError' (collecting every
    failed entry) if the server rejected at least one document.
    """
    if not self._bulkCache:
        return
    if self._bulkMode != BulkMode.INSERT:
        raise UpdateError("Mixed bulk operations not supported - have " + str(self._bulkMode))

    payload = []
    for d in self._bulkCache:
        if isinstance(d, dict):
            payload.append(json.dumps(d, default=str))
        else:
            try:
                payload.append(d.toJson())
            except Exception:
                # Best effort: fall back to a lenient dump of the raw store.
                payload.append(json.dumps(d.getStore(), default=str))
    payload = '[' + ','.join(payload) + ']'

    r = self.connection.session.post(self.getDocumentsURL(), params = self._batchParams, data = payload)
    data = r.json()
    if not isinstance(data, list):
        # The reply is formatted explicitly: concatenating the response
        # object to the message raised a TypeError instead of this error.
        raise UpdateError("expected reply to be a json array, got: %s" % (data,))

    bulkError = None
    for cached, xd in zip(self._bulkCache, data):
        if '_key' not in xd and 'error' in xd and 'errorNum' in xd:
            if bulkError is None:
                bulkError = BulkOperationError("saving failed")
            bulkError.addBulkError(ArangoError(xd), cached)
        else:
            cached.setPrivates(xd)
            cached._key = xd['_key']

    self._bulkCache = []
    if bulkError is not None:
        raise bulkError
def _saveBatch(self, document, params):
    """Buffer 'document' for a bulk insert.

    'params' replaces the request parameters used by the next flush. When the
    buffer reaches '_bulkSize' it is flushed through '_writeBatch' and the
    bulk mode returns to NONE. Raises 'UpdateError' if a bulk update or
    delete is already being buffered.
    """
    if self._bulkMode not in (BulkMode.NONE, BulkMode.INSERT):
        raise UpdateError("Mixed bulk operations not supported - have " + str(self._bulkMode))
    self._bulkMode = BulkMode.INSERT
    self._batchParams = params
    self._bulkCache.append(document)
    bufferIsFull = len(self._bulkCache) == self._bulkSize
    if bufferIsFull:
        self._writeBatch()
        self._bulkMode = BulkMode.NONE
def _updateBatch(self):
    """Send every buffered document to the server in a single PATCH.

    Does nothing when the buffer is empty. Documents whose collection has
    'on_save' validation enabled are validated before being serialized. On
    success each buffered document receives its private fields and '_key'
    from the matching reply entry. The buffer is emptied once the reply has
    been processed.

    Raises 'UpdateError' if the current bulk mode is not UPDATE or if the
    reply is not a JSON array, and 'BulkOperationError' (collecting every
    failed entry) if the server rejected at least one document.
    """
    if not self._bulkCache:
        return
    if self._bulkMode != BulkMode.UPDATE:
        raise UpdateError("Mixed bulk operations not supported - have " + str(self._bulkMode))

    payload = []
    for d in self._bulkCache:
        # NOTE(review): the result of this call was never used; the call is
        # kept in case 'getPatches' has side effects -- confirm and drop it.
        d._store.getPatches()

        if d.collection._validation['on_save']:
            d.validate()

        if isinstance(d, dict):
            payload.append(json.dumps(d, default=str))
        else:
            try:
                payload.append(d.toJson())
            except Exception:
                # Best effort: fall back to a lenient dump of the raw store.
                payload.append(json.dumps(d.getStore(), default=str))
    payload = '[' + ','.join(payload) + ']'

    r = self.connection.session.patch(self.getDocumentsURL(), params = self._batchParams, data = payload)
    data = r.json()
    if not isinstance(data, list):
        # The reply is formatted explicitly: concatenating 'dir(r)' to the
        # message raised a TypeError instead of this error.
        raise UpdateError("expected reply to be a json array, got: %s" % (data,))

    bulkError = None
    for cached, xd in zip(self._bulkCache, data):
        if '_key' not in xd and 'error' in xd and 'errorNum' in xd:
            if bulkError is None:
                bulkError = BulkOperationError("patching failed")
            bulkError.addBulkError(ArangoError(xd), str(cached))
        else:
            cached.setPrivates(xd)
            cached._key = xd['_key']

    self._bulkCache = []
    if bulkError is not None:
        raise bulkError
def _patchBatch(self, document, params):
    """Buffer 'document' for a bulk update.

    'params' replaces the request parameters used by the next flush. When the
    buffer reaches '_bulkSize' it is flushed through '_updateBatch' and the
    bulk mode returns to NONE. Raises 'UpdateError' if a bulk insert or
    delete is already being buffered.
    """
    if self._bulkMode not in (BulkMode.NONE, BulkMode.UPDATE):
        raise UpdateError("Mixed bulk operations not supported - have " + str(self._bulkMode))
    self._bulkMode = BulkMode.UPDATE
    self._batchParams = params
    self._bulkCache.append(document)
    bufferIsFull = len(self._bulkCache) == self._bulkSize
    if bufferIsFull:
        self._updateBatch()
        self._bulkMode = BulkMode.NONE
def _removeBatch(self):
    """Ask the server to remove every buffered document in a single DELETE.

    Does nothing when the buffer is empty. The request body is the JSON array
    of the buffered '_key' values. Every document the server removed is
    'reset' against this collection. The buffer is emptied once the reply has
    been processed.

    Raises 'UpdateError' if the current bulk mode is not DELETE or if the
    reply is not a JSON array, and 'BulkOperationError' (collecting every
    failed entry) if the server rejected at least one removal.
    """
    if not self._bulkCache:
        return
    if self._bulkMode != BulkMode.DELETE:
        # 'str()' is required: concatenating the enum member raised TypeError.
        raise UpdateError("Mixed bulk operations not supported - have " + str(self._bulkMode))

    # 'json.dumps' takes care of quoting and escaping the keys.
    payload = json.dumps([str(d['_key']) for d in self._bulkCache])

    r = self.connection.session.delete(self.getDocumentsURL() + "/" + self.name, params = self._batchParams, data = payload)
    data = r.json()
    if not isinstance(data, list):
        # The reply is formatted explicitly: concatenating the response
        # object to the message raised a TypeError instead of this error.
        raise UpdateError("expected reply to be a json array, got: %s" % (data,))

    bulkError = None
    for cached, xd in zip(self._bulkCache, data):
        if '_key' not in xd and 'error' in xd and 'errorNum' in xd:
            if bulkError is None:
                bulkError = BulkOperationError("deleting failed")
            bulkError.addBulkError(ArangoError(xd), cached)
        else:
            cached.reset(self)

    self._bulkCache = []
    if bulkError is not None:
        raise bulkError
def _deleteBatch(self, document, params):
    """Buffer 'document' for a bulk removal.

    'params' replaces the request parameters used by the next flush. When the
    buffer reaches '_bulkSize' it is flushed through '_removeBatch' and the
    bulk mode returns to NONE. Raises 'UpdateError' if a bulk insert or
    update is already being buffered.
    """
    if self._bulkMode not in (BulkMode.NONE, BulkMode.DELETE):
        raise UpdateError("Mixed bulk operations not supported - have " + str(self._bulkMode))
    self._bulkMode = BulkMode.DELETE
    self._batchParams = params
    self._bulkCache.append(document)
    bufferIsFull = len(self._bulkCache) == self._bulkSize
    if bufferIsFull:
        self._removeBatch()
        self._bulkMode = BulkMode.NONE
def _finalizeBatch(self):
    """Flush what is still buffered and leave bulk mode.

    The flush method matching the current bulk mode is called; nothing is
    sent when the mode is NONE. The bulk state is reset even when the flush
    raises, so a failed bulk operation cannot leave the collection stuck in
    bulk mode or leak unsent documents into a later one. The exception itself
    is propagated unchanged.
    """
    try:
        if self._bulkMode == BulkMode.INSERT:
            self._writeBatch()
        elif self._bulkMode == BulkMode.UPDATE:
            self._updateBatch()
        elif self._bulkMode == BulkMode.DELETE:
            self._removeBatch()
    finally:
        # Already empty after a completed flush; only matters on failure.
        self._bulkCache = []
        self._bulkSize = 0
        self._isBulkInProgress = False
        self._batchParams = None
        self._bulkMode = BulkMode.NONE
def importBulk(self, data, **addParams):
    """Import 'data' into the collection through '<database URL>/import'.

    'data' is serialized to JSON (values JSON cannot encode are stringified).
    The request targets this collection with 'type=auto'; 'addParams' are
    added to, and may override, these parameters. Return the decoded reply.
    Raises 'CreationError' when the server does not answer 201 or flags an
    error.
    """
    endpoint = "%s/import" % (self.database.getURL())
    body = json.dumps(data, default=str)
    query = {"collection": self.name, "type": "auto"}
    query.update(addParams)
    response = self.connection.session.post(endpoint, params = query, data = body)
    reply = response.json()
    if response.status_code != 201 or reply["error"]:
        raise CreationError(reply["errorMessage"], reply)
    return reply
def exportDocs( self, **data):
    """Export documents of the collection through '<database URL>/export'.

    The keyword arguments are sent as the JSON body of the request. Return
    the 'result' entry of the reply. Raises 'ExportError' when the server
    does not answer 201 or flags an error.
    """
    endpoint = "%s/export" % (self.database.getURL())
    query = {"collection": self.name}
    body = json.dumps(data)
    response = self.connection.session.post(endpoint, params = query, data = body)
    reply = response.json()
    if response.status_code != 201 or reply["error"]:
        raise ExportError( reply["errorMessage"], reply )
    return reply['result']
def ensureHashIndex(self, fields, unique = False, sparse = True, deduplicate = False, name = None):
    """Create a hash index if it does not already exist, then return it.

    The index is registered in 'self.indexes["hash"]' under its id and, when
    'name' is given, in 'self.indexes_by_name' as well.
    """
    creation = dict(type = "hash", fields = fields, unique = unique, sparse = sparse, deduplicate = deduplicate)
    if name:
        creation["name"] = name
    index = Index(self, creationData = creation)
    self.indexes["hash"][index.infos["id"]] = index
    if name:
        self.indexes_by_name[name] = index
    return index
def ensureSkiplistIndex(self, fields, unique = False, sparse = True, deduplicate = False, name = None):
    """Create a skiplist index if it does not already exist, then return it.

    The index is registered in 'self.indexes["skiplist"]' under its id and,
    when 'name' is given, in 'self.indexes_by_name' as well.
    """
    creation = dict(type = "skiplist", fields = fields, unique = unique, sparse = sparse, deduplicate = deduplicate)
    if name:
        creation["name"] = name
    index = Index(self, creationData = creation)
    self.indexes["skiplist"][index.infos["id"]] = index
    if name:
        self.indexes_by_name[name] = index
    return index
def ensurePersistentIndex(self, fields, unique = False, sparse = True, deduplicate = False, name = None):
    """Create a persistent index if it does not already exist, then return it.

    The index is registered in 'self.indexes["persistent"]' under its id and,
    when 'name' is given, in 'self.indexes_by_name' as well.
    """
    data = {
        "type" : "persistent",
        "fields" : fields,
        "unique" : unique,
        "sparse" : sparse,
        "deduplicate": deduplicate
    }
    if name:
        data["name"] = name
    ind = Index(self, creationData = data)
    # Was filed under "skiplist" (copy/paste slip), which hid persistent
    # indexes from anyone reading 'self.indexes["persistent"]'.
    self.indexes["persistent"][ind.infos["id"]] = ind
    if name:
        self.indexes_by_name[name] = ind
    return ind
def ensureTTLIndex(self, fields, expireAfter, unique = False, sparse = True, name = None):
    """Create a TTL index if it does not already exist, then return it.

    'expireAfter' is sent to the server unchanged. The index is registered in
    'self.indexes["ttl"]' under its id and, when 'name' is given, in
    'self.indexes_by_name' as well.
    """
    data = {
        "type" : "ttl",
        "fields" : fields,
        "unique" : unique,
        "sparse" : sparse,
        "expireAfter" : expireAfter
    }
    if name:
        data["name"] = name
    ind = Index(self, creationData = data)
    # Was filed under "skiplist" (copy/paste slip), which hid TTL indexes
    # from anyone reading 'self.indexes["ttl"]'.
    self.indexes["ttl"][ind.infos["id"]] = ind
    if name:
        self.indexes_by_name[name] = ind
    return ind
def ensureGeoIndex(self, fields, name = None):
    """Create a geo index if it does not already exist, then return it.

    The index is registered in 'self.indexes["geo"]' under its id and, when
    'name' is given, in 'self.indexes_by_name' as well.
    """
    creation = dict(type = "geo", fields = fields)
    if name:
        creation["name"] = name
    index = Index(self, creationData = creation)
    self.indexes["geo"][index.infos["id"]] = index
    if name:
        self.indexes_by_name[name] = index
    return index
def ensureFulltextIndex(self, fields, minLength = None, name = None):
    """Create a fulltext index if it does not already exist, then return it.

    'minLength' is only sent when it is not 'None'. The index is registered
    in 'self.indexes["fulltext"]' under its id and, when 'name' is given, in
    'self.indexes_by_name' as well.
    """
    creation = dict(type = "fulltext", fields = fields)
    if name:
        creation["name"] = name
    if minLength is not None:
        creation["minLength"] = minLength
    index = Index(self, creationData = creation)
    self.indexes["fulltext"][index.infos["id"]] = index
    if name:
        self.indexes_by_name[name] = index
    return index
def ensureIndex(self, index_type, fields, name=None, **index_args):
    """Create an index of any type.

    'index_args' are merged into the creation payload after 'type' and
    'fields' (so they may override them) and before 'name'. The index is
    registered in 'self.indexes[index_type]' under its id and, when 'name' is
    given, in 'self.indexes_by_name' as well. Return the index.
    """
    data = {
        "type" : index_type,
        "fields" : fields,
    }
    data.update(index_args)
    if name:
        data["name"] = name
    ind = Index(self, creationData = data)
    # 'index_type' may be a type that was not pre-seeded in '__init__';
    # register it instead of raising a KeyError once the index object has
    # already been created.
    self.indexes.setdefault(index_type, {})[ind.infos["id"]] = ind
    if name:
        self.indexes_by_name[name] = ind
    return ind
def restoreIndexes(self, indexes_dct=None):
    """Restore all previously removed indexes.

    'indexes_dct' maps an index type to a dictionary of indexes and defaults
    to 'self.indexes'. Every index that is not of type "primary" is handed to
    'ensureIndex' with its 'infos' ('fields' passed separately, everything
    else as keyword arguments).
    """
    if indexes_dct is None:
        indexes_dct = self.indexes

    # Iterate over snapshots: 'ensureIndex' registers what it creates in
    # 'self.indexes', which by default is the very dictionary being walked
    # and would otherwise change size during iteration.
    for typ in list(indexes_dct):
        if typ == "primary":
            continue
        for idx in list(indexes_dct[typ].values()):
            infos = dict(idx.infos)
            fields = infos.pop("fields")
            self.ensureIndex(typ, fields, **infos)
def validatePrivate(self, field, value):
    """Validate a private field value.

    Raises 'ValueError' when 'field' is not listed in 'arangoPrivates'. When
    the schema declares the field, 'value' goes through its validators (whose
    errors propagate). Return 'True' otherwise.
    """
    isPrivate = field in self.arangoPrivates
    if not isPrivate:
        raise ValueError("%s is not a private field of collection %s" % (field, self))

    schema = self._fields
    if field in schema:
        schema[field].validate(value)
    return True
@classmethod
def hasField(cls, fieldName):
    """Return 'True' or 'False' whether the collection has field 'fieldName' in its schema.

    Use the dot notation for the nested fields: address.street
    """
    node = cls._fields
    for part in fieldName.split("."):
        try:
            node = node[part]
        except (KeyError, TypeError):
            # KeyError: unknown name at this level. TypeError: the path goes
            # deeper than a leaf that cannot be subscripted, so the field
            # does not exist either.
            return False
    return True
def fetchDocument(self, key, rawResults = False, rev = None):
    """Fetch a document from the collection given its key.

    This function always goes straight to the db and bypasses the cache.
    If you want to take advantage of the cache use the '__getitem__' interface: collection[key]

    'rev', when given, is sent as the 'rev' request parameter. With
    'rawResults' the decoded reply is returned as is, otherwise it is wrapped
    in 'self.documentClass'. Any reply with a status of 400 or more raises
    'DocumentNotFoundError'.
    """
    url = "%s/%s/%s" % (self.getDocumentsURL(), self.name, key)
    session = self.connection.session
    if rev is None:
        response = session.get(url)
    else:
        response = session.get(url, params = {'rev' : rev})

    status = response.status_code
    if status >= 400:
        if status == 404:
            raise DocumentNotFoundError("Unable to find document with _key: %s" % key, response.json())
        raise DocumentNotFoundError("Unable to find document with _key: %s, response: %s" % (key, response.json()), response.json())

    content = response.json()
    if rawResults:
        return content
    return self.documentClass(self, content, on_load_validation=self._validation["on_load"])
def fetchByExample(self, exampleDict, batchSize, rawResults = False, **queryArgs):
    """Run a 'by-example' simple query and return it.

    'exampleDict' should be something like {'age' : 28}. 'batchSize' and any
    extra 'queryArgs' are forwarded to 'simpleQuery'.
    """
    query = self.simpleQuery('by-example', rawResults, example = exampleDict, batchSize = batchSize, **queryArgs)
    return query
def fetchFirstExample(self, exampleDict, rawResults = False):
    """Run a 'first-example' simple query and return it.

    'exampleDict' should be something like {'age' : 28}. The first document
    matching the example is returned still wrapped in a 'SimpleQuery' object.
    """
    query = self.simpleQuery('first-example', rawResults = rawResults, example = exampleDict)
    return query
def fetchAll(self, rawResults = False, **queryArgs):
    """Return all the documents in the collection, as an 'all' simple query.

    You can use the optional arguments 'skip' and 'limit'::

        fetchAll(limit = 3, skip = 10)
    """
    query = self.simpleQuery('all', rawResults = rawResults, **queryArgs)
    return query
def simpleQuery(self, queryType, rawResults = False, **queryArgs):
    """General interface for simple queries.

    'queryType' takes the arguments known to the ArangoDB, for instance: 'all' or 'by-example'.
    See the ArangoDB documentation for a list of valid 'queryType's.
    If 'rawResults' is set to 'True', the query will return dictionaries instead of 'Document' objects.
    """
    query = SimpleQuery(self, queryType, rawResults, **queryArgs)
    return query
def action(self, method, action, **params):
    """Call '<collection URL>/<action>' with the given HTTP method.

    Generic entry point for everything that has no dedicated method.
    'method' is the (case-insensitive) name of the session function to use,
    'params' become the request parameters. Return the decoded reply.
    """
    send = getattr(self.connection.session, method.lower())
    target = "/".join((self.getURL(), action))
    response = send(target, params = params)
    return response.json()
def bulkSave(self, docs, onDuplicate="error", **params):
    """Save many documents at once through '<database URL>/import'.

    Parameter 'docs' must be an iterable of documents or dictionaries.
    'params' are any parameters from the ArangoDB documentation; 'type',
    'onDuplicate' and 'collection' are always set by this function.

    Return 'True' when the server answers 201 without an 'error' entry,
    otherwise the number of documents created plus updated. Raises
    'UpdateError' if at least one document could not be saved.
    """
    lines = []
    for d in docs:
        if isinstance(d, dict):
            lines.append(json.dumps(d, default=str))
            continue
        try:
            line = d.toJson()
        except Exception:
            # Best effort: fall back to a lenient dump of the raw store.
            line = json.dumps(d.getStore(), default=str)
        lines.append(line)
    body = '\n'.join(lines)

    params.update(type = "documents", onDuplicate = onDuplicate, collection = self.name)

    endpoint = "%s/import" % self.database.getURL()
    response = self.connection.session.post(endpoint, params = params, data = body)
    reply = response.json()

    if response.status_code == 201 and "error" not in reply:
        return True
    if "errors" in reply and reply["errors"] > 0:
        raise UpdateError("%d documents could not be created" % reply["errors"], reply)
    if reply["error"]:
        raise UpdateError("Documents could not be created", reply)
    return reply["updated"] + reply["created"]
def bulkImport_json(self, filename, onDuplicate="error", formatType="auto", **params):
    """Bulk import from a file following the ArangoDB json format.

    The content of 'filename' is posted as is to '<database URL>/import'.
    'formatType' is sent as the 'type' parameter of the import; 'params' are
    additional request parameters. Raises 'UpdateError' when the server does
    not answer 201.
    """
    url = "%s/import" % self.database.getURL()
    params["onDuplicate"] = onDuplicate
    params["collection"] = self.name
    params["type"] = formatType
    with open(filename) as f:
        data = f.read()
    r = self.connection.session.post(url, params = params, data = data)
    if r.status_code != 201:
        raise UpdateError('Unable to bulk import JSON', r)
def bulkImport_values(self, filename, onDuplicate="error", **params):
    """Bulk import from a file following the ArangoDB key-value format.

    The content of 'filename' is posted as is to '<database URL>/import'.
    No 'type' parameter is set by this function; 'params' are additional
    request parameters. Raises 'UpdateError' when the server does not
    answer 201.
    """
    url = "%s/import" % self.database.getURL()
    params["onDuplicate"] = onDuplicate
    params["collection"] = self.name
    with open(filename) as f:
        data = f.read()
    r = self.connection.session.post(url, params = params, data = data)
    if r.status_code != 201:
        raise UpdateError('Unable to bulk import values', r)
def truncate(self):
    """Delete every document in the collection (PUT 'truncate' action) and return the reply."""
    reply = self.action('PUT', 'truncate')
    return reply
def empty(self):
    """Alias for 'truncate()'; returns whatever it returns."""
    outcome = self.truncate()
    return outcome
def load(self):
    """Load collection in memory (PUT 'load' action) and return the reply."""
    reply = self.action('PUT', 'load')
    return reply
def unload(self):
    """Unload collection from memory (PUT 'unload' action) and return the reply."""
    reply = self.action('PUT', 'unload')
    return reply
def revision(self):
    """Return the current revision: the 'revision' entry of the GET 'revision' action."""
    reply = self.action('GET', 'revision')
    return reply["revision"]
def properties(self):
    """Return the current properties: the whole reply of the GET 'properties' action."""
    reply = self.action('GET', 'properties')
    return reply
def checksum(self):
    """Return the current checksum: the 'checksum' entry of the GET 'checksum' action."""
    reply = self.action('GET', 'checksum')
    return reply["checksum"]
def count(self):
    """Return the number of documents in the collection: the 'count' entry of the GET 'count' action."""
    reply = self.action('GET', 'count')
    return reply["count"]
def getType(self):
    """Return a word describing the type of the collection ("document" or "edge") instead of a number.

    If you prefer the number it is in 'self.type'. Raises 'ValueError' for
    any other type.
    """
    labels = (
        (CONST.COLLECTION_DOCUMENT_TYPE, "document"),
        (CONST.COLLECTION_EDGE_TYPE, "edge"),
    )
    for code, label in labels:
        if self.type == code:
            return label
    raise ValueError("The collection is of Unknown type %s" % self.type)
def getStatus(self):
    """Return a word describing the status of the collection instead of a number.

    The word is one of: loading, loaded, deleted, unloaded, newborn. If you
    prefer the number it is in 'self.status'. Raises 'ValueError' for any
    other status.
    """
    labels = (
        (CONST.COLLECTION_LOADING_STATUS, "loading"),
        (CONST.COLLECTION_LOADED_STATUS, "loaded"),
        (CONST.COLLECTION_DELETED_STATUS, "deleted"),
        (CONST.COLLECTION_UNLOADED_STATUS, "unloaded"),
        (CONST.COLLECTION_NEWBORN_STATUS, "newborn"),
    )
    for code, label in labels:
        if self.status == code:
            return label
    raise ValueError("The collection has an Unknown status %s" % self.status)
def __len__(self):
    """Return the number of documents in the collection, as reported by 'count()'."""
    total = self.count()
    return total
def __repr__(self):
    """Describe the collection by name, id, type and status (both as words)."""
    described = (self.name, self.id, self.getType(), self.getStatus())
    return "ArangoDB collection name: %s, id: %s, type: %s, status: %s" % described
def __getitem__(self, key):
    """Return the document stored under 'key', going through the cache when it is active.

    Without a cache this is equivalent to 'fetchDocument(rawResults=False)'.
    With a cache, a hit returns the cached entry; a miss fetches the document
    from the db, caches it and returns the fetched document.
    """
    cache = self.documentCache
    if cache is None:
        return self.fetchDocument(key, rawResults = False)

    try:
        return cache[key]
    except KeyError:
        fetched = self.fetchDocument(key, rawResults = False)
        self.documentCache.cache(fetched)
        return fetched
def __contains__(self, key):
    """Return 'True' or 'False' whether the doc is in the collection.

    The document is fetched straight from the db; only
    'DocumentNotFoundError' is turned into 'False'.
    """
    try:
        self.fetchDocument(key, rawResults = False)
    except DocumentNotFoundError:
        return False
    return True
class SystemCollection(Collection):
    """For all collections with 'isSystem=True'."""

    def __init__(self, database, jsonData):
        """Initialize exactly like a regular 'Collection'."""
        super(SystemCollection, self).__init__(database, jsonData)
class Edges(Collection):
    """The default edge collection. All edge Collections must inherit from it."""

    # Edges carry the two vertex references on top of the usual private fields.
    arangoPrivates = ["_id", "_key", "_rev", "_to", "_from"]

    def __init__(self, database, jsonData):
        """This one is meant to be called by the database."""
        Collection.__init__(self, database, jsonData)
        self.documentClass = Edge
        self.edgesURL = "%s/edges/%s" % (self.database.getURL(), self.name)

    @classmethod
    def validateField(cls, fieldName, value):
        """Check if 'value' is valid for field 'fieldName'.

        If the validation fails, raise a 'SchemaViolation' or a 'ValidationError'.
        A 'SchemaViolation' on '_from' or '_to' is ignored and 'True' returned.
        For nested dicts ex: {address : { street: xxx} }, 'fieldName' can take the form 'address.street'.
        """
        # NOTE(review): the base implementation is called on 'Collection'
        # itself, not through 'cls' -- confirm that bypassing the fields
        # declared by edge subclasses is intended.
        try:
            return Collection.validateField(fieldName, value)
        except SchemaViolation as e:
            if fieldName in ("_from", "_to"):
                return True
            raise e

    def createEdge(self, initValues = None):
        """Create an edge populated with defaults; 'initValues' are applied on top."""
        return self.createDocument(initValues)

    def getInEdges(self, vertex, rawResults = False):
        """An alias for 'getEdges()' that returns only the in 'Edges'."""
        return self.getEdges(vertex, inEdges = True, outEdges = False, rawResults = rawResults)

    def getOutEdges(self, vertex, rawResults = False):
        """An alias for 'getEdges()' that returns only the out 'Edges'."""
        return self.getEdges(vertex, inEdges = False, outEdges = True, rawResults = rawResults)

    def getEdges(self, vertex, inEdges = True, outEdges = True, rawResults = False):
        """Return in, out, or both edges linked to a given document.

        Vertex can be either a 'Document' object or a string for an '_id'.
        If 'rawResults' is set to 'True', return the results just as fetched without any processing.
        Otherwise, return a list of Edge objects.
        Raises 'ValueError' for an unsupported vertex or when neither direction
        is requested, and 'CreationError' when the server does not answer 200.
        """
        if isinstance(vertex, Document):
            vId = vertex._id
        elif isinstance(vertex, (str, bytes)):
            vId = vertex
        else:
            raise ValueError("Vertex is neither a Document nor a String")

        if not (inEdges or outEdges):
            raise ValueError("inEdges, outEdges or both must have a boolean value")

        params = {"vertex" : vId}
        # No 'direction' at all means both directions.
        if not outEdges:
            params["direction"] = "in"
        elif not inEdges:
            params["direction"] = "out"

        r = self.connection.session.get(self.edgesURL, params = params)
        data = r.json()
        if r.status_code != 200:
            raise CreationError("Unable to return edges for vertex: %s" % vId, data)
        if rawResults:
            return data["edges"]
        return [Edge(self, e) for e in data["edges"]]
class BulkOperation(object):
    """Context manager that puts a collection in bulk mode.

    Entering flags the collection as bulk-in-progress with the requested
    batch size and yields the collection; leaving calls '_finalizeBatch()'
    on it, whether or not the body raised.
    """

    def __init__(self, collection, batchSize=100):
        self.coll = collection
        self.batchSize = batchSize

    def __enter__(self):
        target = self.coll
        target._isBulkInProgress = True
        target._bulkSize = self.batchSize
        return target

    def __exit__(self, type, value, traceback):
        self.coll._finalizeBatch()