Source code for divera.models

import abc
import logging
import typing
from datetime import datetime

from divera.api.v2.pull import All as _All


[docs] class Model(abc.ABC): sorting_key: str = 'id' def __init__(self, data: dict = None, ): super().__init__() if data is None: data = {} self.data = data def __getattribute__(self, item): try: # Try the usual way return super().__getattribute__(item) except AttributeError as original_error: try: # There is a chance for the value to be included in `self.data`, but not as `self.<<attribute>>`. # This might happen if the version of either the instance # or the library is not up-to-date with the other. res = self.data[item] logging.debug( f"{self.__class__.__name__}.{item} has been accessed, but is not available as property yet." ) return res except KeyError: return original_error def __eq__(self, other): return self.__getattribute__(self.sorting_key) == other.__getattribute__(other.sorting_key) def __repr__(self): res = '<' + ' '.join([ type(self).__name__, str(self.sorting_key(self) if callable(self.sorting_key) else self.data[self.sorting_key]), f'"{self.title}"' if 'title' in self.data and self.title is not None else f'"{self.name}"' if 'name' in self.data and self.name is not None else '', ]).strip() + '>' return res def __sub__(self, other): different_attributes = [] for x in self.data.keys(): if self.data[x] != other.data[x]: different_attributes.append(x) return different_attributes
[docs] @abc.abstractmethod def get( self, object_id=None, ): pass
[docs] @staticmethod @abc.abstractmethod def get_all( ): pass
[docs] class APIModel(Model, abc.ABC): @property def id(self) -> int: return self.data['id'] @property def foreign_id(self) -> str: return self.data['foreign_id'] @property def author_id(self) -> int: return self.data['author_id'] @property def date(self) -> datetime: if self.data['date'] is not None: return datetime.fromtimestamp(self.data['date']) @property def title(self) -> str: return self.data['title'] @property def text(self) -> str: return self.data['text'] @property def address(self) -> str: return self.data['address'] @property def lat(self) -> float: return self.data['lat'] @property def lng(self) -> float: return self.data['lng'] @property def cluster(self) -> list: return self.data['cluster'] @property def group(self) -> list: return self.data['group'] @property def user_cluster_relation(self) -> list: return self.data['user_cluster_relation'] @property def private_mode(self) -> bool: return self.data['private_mode'] @property def notification_type(self) -> int: return self.data['notification_type'] @property def new(self) -> bool: return self.data['new'] @property def editable(self) -> bool: return self.data['editable'] @property def answerable(self) -> bool: return self.data['answerable'] @property def hidden(self) -> bool: return self.data['hidden'] @property def deleted(self) -> bool: return self.data['deleted'] @property def ucr_self_addressed(self) -> bool: return self.data['ucr_self_addressed'] @property def count_recipients(self) -> int: return self.data['count_recipients'] @property def count_read(self) -> int: return self.data['count_read'] @property def ts_publish(self) -> datetime: if self.data['ts_publish'] is not None: return datetime.fromtimestamp(self.data['ts_publish']) @property def ts_create(self) -> datetime: if self.data['ts_create'] is not None: return datetime.fromtimestamp(self.data['ts_create']) @property def ts_update(self) -> datetime: if self.data['ts_update'] is not None: return datetime.fromtimestamp(self.data['ts_update']) @property def cluster_id(self) -> int: return self.data['cluster_id'] @property def message_channel_id(self) -> int: return self.data['message_channel_id'] @property def message_channel(self) -> bool: return self.data['message_channel'] @property def attachment_count(self) -> int: return self.data['attachment_count'] @property def ucr_addressed(self) -> list: return self.data['ucr_addressed'] @property def ucr_read(self) -> list: return self.data['ucr_read'] def __eq__(self, other): return self.id == other.id def __sub__(self, other): different_attributes = [] for x in self.data.keys(): if self.data[x] != other.data[x]: different_attributes.append(x) return different_attributes
[docs] class AlarmAndEvent(APIModel, abc.ABC): @property def custom_answers(self) -> bool: return self.data['custom_answers'] @property def ts_response(self) -> datetime: return self.data['ts_response'] @property def ucr_answered(self) -> dict: return self.data['ucr_answered']
[docs] class PullModel(Model, abc.ABC): """ Parent class for all models that get their data from pull_data """ path: typing.List = [ ] path_to_sorting: typing.List = None
[docs] def get( self, object_id=None, ): request = _All( ) def process_result(result): data = result for p in self.__class__.path: data = data[p] sorting_data = result for p in self.__class__.path_to_sorting: sorting_data = sorting_data[p] all_objects = [ self.__class__( data={ **data[key], **( { 'id': int(key), } if 'id' not in data[key] and key not in data[key].values() else {} ) }, ) for key in sorting_data ] key = ( 'id' if 'id' in self.data else self.sorting_key ) relevant_objects = [ pm for pm in all_objects if pm.__getattribute__(key) == self.__getattribute__(key) ] if len(relevant_objects) == 1: return relevant_objects[0] elif len(relevant_objects) > 1: raise ValueError('There should only be one object') else: raise ValueError('Object could not be found') request.process_result = process_result return request
[docs] @staticmethod def get_all( model: typing.Type, ): request = _All( ) def process_result(result): data = result for p in model.path: data = data[p] all_objects = [ model( data={ **data[key], **( { 'id': int(key) } if 'id' not in data[key] and key not in data[key].values() else {} ) }, ) for key in data ] def get_by_sorting_key(obj, key): # automatically handle lambdas and str as self.sorting_key if callable(key): return key(obj) else: return obj.__getattribute__(key) if model.path_to_sorting is not None: sorting = result for p in model.path_to_sorting: sorting = sorting[p] objects_sorted = [] for ident in sorting: # Pick the item from wherever it is located the original list and append it to the output l = [ get_by_sorting_key(obj, obj.sorting_key) for obj in all_objects ] try: index = l.index(ident) except ValueError as e: logging.warning('List contains an id that could not be found') objects_sorted = all_objects break objects_sorted.append(all_objects.pop(index)) else: objects_sorted = sorted( all_objects, key=lambda x: get_by_sorting_key(x, x.sorting_key), ) all_objects = objects_sorted return all_objects request.process_result = process_result return request