107 lines
3.1 KiB
Python
107 lines
3.1 KiB
Python
"""
|
|
Define an Abstract Base Class (ABC) for models
|
|
"""
|
|
from decimal import Decimal
|
|
|
|
from sqlalchemy.sql.expression import and_
|
|
from sqlalchemy.ext.hybrid import hybrid_property
|
|
|
|
from sqlservice import ModelBase
|
|
|
|
from pyjeeves import logging
|
|
|
|
from . import db
|
|
|
|
# Module-level logger. `logging` here is the project's `pyjeeves.logging`
# module imported above; the logger name is this module's dotted name
# prefixed with "PyJeeves.".
logger = logging.getLogger("PyJeeves." + __name__)
|
|
|
|
|
|
class RawBaseModel(ModelBase):
    """Base class for raw models: shared base filter, dict helpers, persistence.

    Every query filter built through :meth:`_base_filters` is restricted to
    ``ForetagKod == 1``. Serialization goes through :meth:`to_dict`, and the
    persistence helpers operate on ``db.raw_session``.

    Class attributes that subclasses may override:
        __to_dict_filter__: keys removed from the :meth:`to_dict` result.
        __to_dict_only__: when non-empty, the only keys kept by
            :meth:`to_dict`; takes precedence over ``__to_dict_filter__``.
        __column_map__: maps a key to the name it is exposed under; applied
            by :meth:`to_dict` only when ``__to_dict_only__`` is in effect.
    """

    __to_dict_filter__ = []
    __to_dict_only__ = ()
    __column_map__ = {}

    __table_args__ = {
        'extend_existing': True,
    }

    # NOTE(review): presumably consumed by ModelBase.to_dict() to adapt values
    # by type -- confirm against the sqlservice version in use.
    __dict_args__ = {
        'adapters': {
            # datetime: lambda value, col, *_: value.strftime('%Y-%m-%d'),
            # Decimals are rendered as strings with two fraction digits.
            Decimal: lambda value, col, *_: "{:.2f}".format(value),
        },
    }

    @classmethod
    def _base_filters(cls, obj, filters=None):
        """Return the base filter for ALL queries, AND-ed with ``filters``.

        Define this method on a model to add more filtering; otherwise leave
        it out and this implementation is used.

        Args:
            obj: Object exposing a ``ForetagKod`` column attribute.
            filters: Optional additional SQL expression. ``None`` (the
                default) means "no extra filtering".

        Returns:
            The SQLAlchemy expression ``obj.ForetagKod == 1``, combined with
            ``filters`` via ``and_`` when one is supplied.
        """
        company_filter = obj.ForetagKod == 1
        # The default is None rather than and_(): an argument-less and_() would
        # be built once at definition time and is deprecated by SQLAlchemy 1.4+.
        if filters is None:
            return and_(company_filter)
        return and_(company_filter, filters)

    def _map_columns(self, key):
        """Return the exposed name for ``key`` per ``__column_map__``.

        Keys without an entry in ``__column_map__`` are returned unchanged.
        """
        return self.__column_map__.get(key, key)

    def descriptors_to_dict(self):
        """Return a ``dict`` that maps data loaded in :attr:`__dict__` to this
        model's descriptors. The data contained in :attr:`__dict__` represents
        the model's state that has been loaded from the database. Accessing
        values in :attr:`__dict__` will prevent SQLAlchemy from issuing
        database queries for any ORM data that hasn't been loaded from the
        database already.

        In addition, every descriptor that is a ``hybrid_property`` is
        evaluated with ``getattr`` and included; a value of the same key found
        in :attr:`__dict__` takes precedence.

        Note:
            The ``dict`` returned will contain model instances for any
            relationship data that is loaded. To get a ``dict`` containing all
            non-ORM objects, use :meth:`to_dict`.

        Returns:
            dict
        """
        descriptors = self.descriptors()

        # Expose the hybrid_property extension. The explicit .keys()/.get()
        # calls are kept on purpose: the container comes from ModelBase and
        # its plain-iteration semantics are not visible from here.
        hybrid_values = {
            key: getattr(self, key)
            for key in descriptors.keys()
            if isinstance(descriptors.get(key), hybrid_property)
        }

        # ... and return all loaded items that are included in descriptors.
        loaded_values = {
            key: value
            for key, value in self.__dict__.items()
            if key in descriptors
        }

        return {**hybrid_values, **loaded_values}

    def to_dict(self):
        """Serialize the model, honouring the ``__to_dict_*__`` settings.

        Returns:
            dict: If ``__to_dict_only__`` is non-empty, only those keys, each
            renamed through :meth:`_map_columns`. Otherwise the parent's
            ``to_dict()`` result with every key listed in
            ``__to_dict_filter__`` removed (keys are not renamed here).
        """
        rv = super().to_dict()

        only = self.__to_dict_only__
        if only:
            return {
                self._map_columns(key): value
                for key, value in rv.items()
                if key in only
            }

        for _filter in self.__to_dict_filter__:
            # A filtered key may legitimately be missing from the serialized
            # data; that must not raise KeyError.
            rv.pop(_filter, None)

        return rv

    def merge(self):
        """Merge this instance into ``db.raw_session`` and return ``self``.

        Note that ``self`` is returned, not the value returned by
        ``session.merge``.
        """
        db.raw_session.merge(self)
        return self

    def commit(self):
        """Commit ``db.raw_session``."""
        db.raw_session.commit()

    def save(self):
        """Add this instance to ``db.raw_session``, commit, and return ``self``."""
        db.raw_session.add(self)
        db.raw_session.commit()
        return self

    def delete(self):
        """Delete this instance through ``db.raw_session`` and commit."""
        db.raw_session.delete(self)
        db.raw_session.commit()
|