"""
This submodule is a collection of storage drivers that can be used to store extracted model information
in different types of formats such as SQLite, JSON or API based databases.
"""
import json
import os
import re
import sqlite3
import sys
import requests
from revitron import Log
from time import time
from datetime import datetime
from abc import ABCMeta, abstractmethod
from collections import OrderedDict
def reTokenCallback(match):
return os.getenv(match.group(1))
def parseToken(token):
return re.sub(r'\{\{\s*(\w+)\s*\}\}', reTokenCallback, token)
[docs]class AbstractStorageDriver:
"""
The abstract storage driver is the base class for all storage driver classes.
"""
__metaclass__ = ABCMeta
[docs] def __init__(self, config):
"""
Init a new storage driver instance with a givenm configuration.
Args:
config (dict): The driver configuration
"""
self.config = config
self.timestamp = datetime.fromtimestamp(time()).strftime('%Y-%m-%d %H:%M:%S')
[docs] @abstractmethod
def add(self, dataProviderResults, modelSize):
"""
Add a new snapshot.
Args:
dataProviderResults (list): The list of
:class:`revitron.analyze.DataProviderResult` objects
modelSize (float): The local file's size in bytes
"""
pass
[docs]class DirectusAPI():
"""
The **DirectusAPI** class provides the needed tools to interact with the `Directus API <https://docs.directus.io/reference/introduction/>`_.
"""
[docs] def __init__(self, host, token, collection):
"""
Init a new API wrapper instance.
Args:
host (string): The API URL
token (string): The API token that is used for authentication
collection (string): The collection name
"""
self.host = host
self.token = token
self.collection = collection
@property
def _headers(self):
return {
'Accept': 'application/json',
'Authorization': 'Bearer {}'.format(self.token),
'Content-Type': 'application/json'
}
[docs] def get(self, endpoint, log=True):
"""
Get data from a given endpoint.
Args:
endpoint (string): The Directus API endpoint
log (bool, optional): Enable logging. Defaults to True.
Returns:
dict: The reponse dictionary
"""
response = requests.get(
'{}/{}'.format(self.host, endpoint),
headers=self._headers,
allow_redirects=True
)
try:
responseJson = response.json()
return responseJson['data']
except:
if log:
Log().error('Request has failed')
Log().error(response.json())
return None
[docs] def post(self, endpoint, data):
"""
Post data to a given enpoint.
Args:
endpoint (string): The endpoint
data (dict): The data dict
Returns:
dict: The reponse dictionary
"""
response = requests.post(
'{}/{}'.format(self.host, endpoint),
headers=self._headers,
data=json.dumps(data),
allow_redirects=True
)
try:
responseJson = response.json()
return responseJson['data']
except:
Log().error(data)
Log().error('Request has failed')
Log().error(response.json())
return None
[docs] def collectionExists(self):
"""
Test whether a collection exists.
Returns:
bool: True if the collection exists
"""
return self.get('collections/{}'.format(self.collection), False) is not None
[docs] def getFields(self):
"""
Get the fields list of a collection.
Returns:
list: The list of fields
"""
fields = []
data = self.get('fields/{}'.format(self.collection))
if data:
for item in data:
fields.append(item['field'])
return fields
[docs] def clearCache(self):
"""
Clear the Directus cache.
Returns:
dict: The response data
"""
return requests.post(
'{}/{}'.format(self.host, 'utils/cache/clear'), headers=self._headers
)
[docs] def createCollection(self):
"""
Create the collection.
Returns:
dict: The response data
"""
return self.post(
'collections', {
'collection': self.collection, 'schema': {}, 'meta': {
'icon': 'timeline'
}
}
)
[docs] def createField(self, name, dataType):
"""
Create a field in the collection.
Args:
name (string): The field name
dataType (string): The data type for the field
Returns:
dict: The response data
"""
data = {
'field': name,
'type': dataType.replace('real', 'float'),
'schema': {},
'meta': {
'icon': 'data_usage'
}
}
return self.post('fields/{}'.format(self.collection), data)
[docs]class DirectusStorageDriver(AbstractStorageDriver):
"""
This storage driver handles storing snapshots to in Directus using the Directus API.
"""
[docs] def __init__(self, config):
"""
Init a new Directus storage driver instance with a givenm configuration.
Args:
config (dict): The driver configuration
"""
try:
collection = 'snapshots__{}'.format(
re.sub(r'[^a-z0-9]+', '_', config['collection'].lower())
)
host = config['host'].rstrip('/')
token = parseToken(config['token'])
except:
Log().error('Invalid Directus configuration')
sys.exit(1)
self.api = DirectusAPI(host, token, collection)
self.collection = collection
self.timestamp = datetime.fromtimestamp(time()).strftime('%Y-%m-%dT%H:%M:%S')
def _createMissingFields(self, dataProviderResults):
remoteFields = self.api.getFields()
if 'model_size' not in remoteFields:
self.api.createField('model_size', 'float')
if 'timestamp' not in remoteFields:
self.api.createField('timestamp', 'timestamp')
for item in dataProviderResults:
if item.name not in remoteFields:
self.api.createField(item.name, item.dataType)
[docs] def add(self, dataProviderResults, modelSize):
"""
Send a POST request to the Directus API in order store a snapshot
Args:
dataProviderResults (list): The list of
:class:`revitron.analyze.DataProviderResult` objects
modelSize (float): The model size in bytes
"""
api = self.api
api.clearCache()
rowId = 1
remoteItems = api.get('items/{}?sort=-id'.format(self.collection), log=False)
if remoteItems:
maxId = max(row['id'] for row in remoteItems)
rowId = maxId + 1
if not api.collectionExists():
api.createCollection()
self._createMissingFields(dataProviderResults)
data = {}
data['id'] = rowId
data['model_size'] = modelSize
data['timestamp'] = self.timestamp
for item in dataProviderResults:
data[item.name] = item.value
api.post('items/{}'.format(self.collection), data)
api.clearCache()
[docs]class JSONStorageDriver(AbstractStorageDriver):
"""
This storage driver handles appending snapshots to JSON files.
"""
[docs] def add(self, dataProviderResults, modelSize):
"""
Add a new item to JSON file.
Args:
dataProviderResults (list): The list of
:class:`revitron.analyze.DataProviderResult` objects
modelSize (float): The local file's size in bytes
"""
try:
file = self.config['file']
except:
Log().error('No JSON file defined')
sys.exit(1)
try:
with open(file) as handle:
snapshots = json.load(handle, object_pairs_hook=OrderedDict)
except:
snapshots = []
data = OrderedDict()
data['timestamp'] = self.timestamp
data['model_size'] = modelSize
for item in dataProviderResults:
data[item.name] = item.value
snapshots.append(data)
with open(file, 'w') as handle:
json.dump(snapshots, handle, indent=2)
[docs]class SQLiteStorageDriver(AbstractStorageDriver):
"""
This storage driver handles the connection to the SQLite database as well as the actual
creation of the snapshots.
"""
[docs] def add(self, dataProviderResults, modelSize):
"""
Add a new row to the snapshots table.
Args:
dataProviderResults (list): The list of
:class:`revitron.analyze.DataProviderResult` objects
modelSize (float): The local file's size in bytes
"""
try:
file = self.config['file']
except:
Log().error('No SQLite database file defined')
sys.exit(1)
create = ''
insertColumns = ', model_size'
insertParams = ', :model_size'
data = dict()
data['model_size'] = modelSize
for item in dataProviderResults:
create = '{} {} {},'.format(create, item.name, item.dataType)
insertColumns = '{}, {}'.format(insertColumns, item.name)
insertParams = '{}, :{}'.format(insertParams, item.name)
data[item.name] = item.value
conn = sqlite3.connect(file)
cursor = conn.cursor()
cursor.execute(self._createTable.format(create))
cursor.execute(self._insertRow.format(insertColumns, insertParams), data)
conn.commit()
conn.close()
@property
def _createTable(self):
return """
CREATE TABLE IF NOT EXISTS snapshots(
id integer PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,{}
model_size real
)
"""
@property
def _insertRow(self):
return """
INSERT INTO snapshots(id{}) VALUES(null{})
"""