import abc
import logging
import typing
from datetime import datetime
from divera.api.v2.pull import All as _All
# [docs]
class Model(abc.ABC):
    """Abstract base class for objects backed by a raw ``data`` dictionary.

    Values are normally exposed through attributes/properties defined on
    subclasses.  Any key of ``data`` that has no such attribute can still be
    read with attribute syntax (see :meth:`__getattribute__`).

    Subclasses must implement :meth:`get` and :meth:`get_all`.
    """

    # Name of the attribute that identifies an instance.  ``__repr__`` and
    # ``__eq__`` also accept a callable here, which is then called with the
    # instance as its only argument.
    sorting_key: str = 'id'

    def __init__(self,
                 data: typing.Optional[dict] = None,
                 ):
        """Store ``data`` on the instance; ``None`` becomes a new empty dict."""
        super().__init__()
        if data is None:
            data = {}
        self.data = data

    def __getattribute__(self, item):
        """Look up ``item`` as an attribute, falling back to ``self.data[item]``.

        The fallback covers values that are present in ``data`` but are not
        (yet) exposed as a property, e.g. when the library and the data it
        receives are not on the same version.

        :raises AttributeError: if ``item`` is neither an attribute nor a key
            of ``data``.
        """
        try:
            # Try the usual way
            return super().__getattribute__(item)
        except AttributeError as original_error:
            try:
                # Fetch ``data`` through the parent implementation so that a
                # missing ``data`` attribute cannot recurse into this method.
                data = super().__getattribute__('data')
                res = data[item]
            except (AttributeError, KeyError):
                # Re-raise the original error instead of returning it, so
                # that ``hasattr`` / ``getattr(..., default)`` work as usual.
                raise original_error from None
            logging.debug(
                "%s.%s has been accessed, but is not available as property yet.",
                type(self).__name__,
                item,
            )
            return res

    def _sorting_value(self):
        """Return the identifying value selected by ``sorting_key``."""
        key = self.sorting_key
        if callable(key):
            return key(self)
        return self.__getattribute__(key)

    def __eq__(self, other):
        """Compare two models by their identifying value.

        ``NotImplemented`` is returned when ``other`` is not a ``Model`` or
        when either side has no identifying value, so that Python falls back
        to its default comparison instead of raising.
        """
        if not isinstance(other, Model):
            return NotImplemented
        try:
            return self._sorting_value() == other._sorting_value()
        except AttributeError:
            return NotImplemented

    def __repr__(self):
        """Return ``<ClassName key "label">``.

        ``label`` is the ``title`` (or, failing that, the ``name``) if the
        respective key exists in ``data`` and its value is not ``None``;
        otherwise the label is omitted.
        """
        if callable(self.sorting_key):
            ident = self.sorting_key(self)
        else:
            # ``get`` keeps repr usable for instances without identifier.
            ident = self.data.get(self.sorting_key)

        label = ''
        for field in ('title', 'name'):
            if field in self.data:
                value = getattr(self, field)
                if value is not None:
                    label = f'"{value}"'
                    break

        return '<' + ' '.join([type(self).__name__, str(ident), label]).strip() + '>'

    def __sub__(self, other):
        """Return the keys of ``self.data`` whose values differ in ``other.data``.

        A key that is missing from ``other.data`` counts as different.  Keys
        that only exist in ``other.data`` are not reported.
        """
        missing = object()
        return [
            key
            for key, value in self.data.items()
            if value != other.data.get(key, missing)
        ]

    @abc.abstractmethod
    def get(
            self,
            object_id=None,
    ):
        """Abstract; subclasses must provide the implementation."""
        pass

    @staticmethod
    @abc.abstractmethod
    def get_all(
    ):
        """Abstract; subclasses must provide the implementation."""
        pass
# [docs]
class APIModel(Model, abc.ABC):
    """Abstract model exposing the keys of ``self.data`` as read-only properties.

    Every property subscripts ``self.data`` with its own name, so accessing a
    property whose key is absent raises ``KeyError``.  The timestamp
    properties (``date``, ``ts_publish``, ``ts_create``, ``ts_update``)
    convert the stored value with ``datetime.fromtimestamp`` and return
    ``None`` when the stored value is ``None``.
    """

    def _timestamp(self, key: str) -> typing.Optional[datetime]:
        """Convert ``self.data[key]`` to a ``datetime``, keeping ``None`` as is."""
        raw = self.data[key]
        if raw is None:
            return None
        return datetime.fromtimestamp(raw)

    @property
    def id(self) -> int:
        return self.data['id']

    @property
    def foreign_id(self) -> str:
        return self.data['foreign_id']

    @property
    def author_id(self) -> int:
        return self.data['author_id']

    @property
    def date(self) -> typing.Optional[datetime]:
        return self._timestamp('date')

    @property
    def title(self) -> str:
        return self.data['title']

    @property
    def text(self) -> str:
        return self.data['text']

    @property
    def address(self) -> str:
        return self.data['address']

    @property
    def lat(self) -> float:
        return self.data['lat']

    @property
    def lng(self) -> float:
        return self.data['lng']

    @property
    def cluster(self) -> list:
        return self.data['cluster']

    @property
    def group(self) -> list:
        return self.data['group']

    @property
    def user_cluster_relation(self) -> list:
        return self.data['user_cluster_relation']

    @property
    def private_mode(self) -> bool:
        return self.data['private_mode']

    @property
    def notification_type(self) -> int:
        return self.data['notification_type']

    @property
    def new(self) -> bool:
        return self.data['new']

    @property
    def editable(self) -> bool:
        return self.data['editable']

    @property
    def answerable(self) -> bool:
        return self.data['answerable']

    @property
    def hidden(self) -> bool:
        return self.data['hidden']

    @property
    def deleted(self) -> bool:
        return self.data['deleted']

    @property
    def ucr_self_addressed(self) -> bool:
        return self.data['ucr_self_addressed']

    @property
    def count_recipients(self) -> int:
        return self.data['count_recipients']

    @property
    def count_read(self) -> int:
        return self.data['count_read']

    @property
    def ts_publish(self) -> typing.Optional[datetime]:
        return self._timestamp('ts_publish')

    @property
    def ts_create(self) -> typing.Optional[datetime]:
        return self._timestamp('ts_create')

    @property
    def ts_update(self) -> typing.Optional[datetime]:
        return self._timestamp('ts_update')

    @property
    def cluster_id(self) -> int:
        return self.data['cluster_id']

    @property
    def message_channel_id(self) -> int:
        return self.data['message_channel_id']

    @property
    def message_channel(self) -> bool:
        return self.data['message_channel']

    @property
    def attachment_count(self) -> int:
        return self.data['attachment_count']

    @property
    def ucr_addressed(self) -> list:
        return self.data['ucr_addressed']

    @property
    def ucr_read(self) -> list:
        return self.data['ucr_read']

    def __eq__(self, other):
        """Two API models are equal when their ``id`` values are equal.

        ``NotImplemented`` is returned for operands without an ``id``
        attribute so that comparing with unrelated objects does not raise.
        """
        try:
            other_id = other.id
        except AttributeError:
            return NotImplemented
        return self.id == other_id

    def __sub__(self, other):
        """Return the keys of ``self.data`` whose values differ in ``other.data``.

        A key that is missing from ``other.data`` counts as different.  Keys
        that only exist in ``other.data`` are not reported.
        """
        missing = object()
        return [
            key
            for key, value in self.data.items()
            if value != other.data.get(key, missing)
        ]
# [docs]
class AlarmAndEvent(APIModel, abc.ABC):
    """Abstract base adding answer-related properties on top of ``APIModel``.

    Each property subscripts ``self.data`` with its own name and returns the
    stored value unchanged.
    """

    @property
    def custom_answers(self) -> bool:
        """Value stored under ``'custom_answers'``."""
        payload = self.data
        return payload['custom_answers']

    @property
    def ts_response(self) -> datetime:
        """Value stored under ``'ts_response'``.

        NOTE(review): annotated as ``datetime`` but the stored value is
        returned without any conversion - confirm which one is intended.
        """
        payload = self.data
        return payload['ts_response']

    @property
    def ucr_answered(self) -> dict:
        """Value stored under ``'ucr_answered'``."""
        payload = self.data
        return payload['ucr_answered']
# [docs]
class PullModel(Model, abc.ABC):
    """
    Parent class for all models that get their data from pull_data
    """
    # Keys to follow, in order, from the pull result down to the mapping that
    # holds the raw objects of this model.
    path: typing.List = [
    ]
    # Keys to follow from the pull result down to the collection whose
    # iteration order defines the order of the objects; ``None`` if there is
    # no such collection.
    path_to_sorting: typing.Optional[typing.List] = None

    @staticmethod
    def _resolve_path(result, path):
        """Follow ``path`` key by key into ``result`` and return what is found."""
        node = result
        for step in path:
            node = node[step]
        return node

    @staticmethod
    def _instantiate(model, data, key):
        """Create a ``model`` instance from ``data[key]``.

        The mapping key is added as integer ``'id'`` unless the raw entry
        already has an ``'id'`` or already contains the key among its values.
        """
        raw = data[key]
        extra = {}
        if 'id' not in raw and key not in raw.values():
            extra['id'] = int(key)
        return model(data={**raw, **extra})

    @staticmethod
    def _value_by_key(obj, key):
        """Return the value ``key`` selects on ``obj``.

        ``key`` is either an attribute name or a callable taking ``obj``.
        """
        if callable(key):
            return key(obj)
        return obj.__getattribute__(key)

    def get(
            self,
            object_id=None,
    ):
        """Build a pull request whose result resolves to the fresh copy of ``self``.

        ``object_id`` is currently not used; the object is matched on its own
        ``'id'`` if ``self.data`` has one, otherwise on ``sorting_key``.

        :return: the request object; its ``process_result`` callback returns
            the single matching object and raises ``ValueError`` if none or
            more than one object matches.
        """
        request = _All(
        )

        def process_result(result):
            model = self.__class__
            data = PullModel._resolve_path(result, model.path)
            if model.path_to_sorting is None:
                # Without a sorting collection every raw object is a candidate.
                candidate_keys = data
            else:
                candidate_keys = PullModel._resolve_path(result, model.path_to_sorting)

            all_objects = [
                PullModel._instantiate(model, data, key)
                for key in candidate_keys
            ]

            key = (
                'id'
                if 'id' in self.data else
                self.sorting_key
            )
            relevant_objects = [
                pm
                for pm in all_objects
                if PullModel._value_by_key(pm, key) == PullModel._value_by_key(self, key)
            ]

            if len(relevant_objects) == 1:
                return relevant_objects[0]
            elif len(relevant_objects) > 1:
                raise ValueError('There should only be one object')
            else:
                raise ValueError('Object could not be found')

        request.process_result = process_result
        return request

    @staticmethod
    def get_all(
            model: typing.Type,
    ):
        """Build a pull request whose result resolves to all objects of ``model``.

        :param model: class providing ``path``, ``path_to_sorting`` and
            ``sorting_key`` and accepting a ``data`` keyword argument.
        :return: the request object; its ``process_result`` callback returns
            the list of objects.  With ``path_to_sorting`` set, the list
            follows the order of that collection and contains only objects
            listed there; if the collection names an unknown id, a warning is
            logged and the complete list is returned in its original order.
            Without ``path_to_sorting`` the list is sorted by ``sorting_key``.
        """
        request = _All(
        )

        def process_result(result):
            data = PullModel._resolve_path(result, model.path)
            all_objects = [
                PullModel._instantiate(model, data, key)
                for key in data
            ]

            if model.path_to_sorting is None:
                return sorted(
                    all_objects,
                    key=lambda obj: PullModel._value_by_key(obj, obj.sorting_key),
                )

            sorting = PullModel._resolve_path(result, model.path_to_sorting)

            # Work on copies so that the complete list is still available if
            # the ordering cannot be applied.
            remaining = list(all_objects)
            remaining_values = [
                PullModel._value_by_key(obj, obj.sorting_key)
                for obj in remaining
            ]
            objects_sorted = []
            for ident in sorting:
                try:
                    index = remaining_values.index(ident)
                except ValueError:
                    logging.warning('List contains an id that could not be found')
                    return all_objects
                # Keep both lists aligned while moving the object over.
                remaining_values.pop(index)
                objects_sorted.append(remaining.pop(index))
            return objects_sorted

        request.process_result = process_result
        return request