In progress: Jeeves communication is now based on SQLAlchemy

This commit is contained in:
Marcus Lindvall 2019-04-05 16:50:38 +02:00
parent 0fdc029153
commit 28726fee01
21 changed files with 637 additions and 78 deletions

View file

@ -0,0 +1,6 @@
# from logging.config import dictConfig
# import config
import logging

70
pyjeeves/config.py Normal file
View file

@ -0,0 +1,70 @@
from logging.config import dictConfig
import os
import yaml
defaults = {
'alembic': {
'script_location': 'migrations',
'sqlalchemy.url': ''},
'databases': {
'meta': {},
'raw': {}},
'logging': {
'formatters': {
'simpleFormatter': {
'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'}},
'handlers': {
'consoleHandler': {
'class': 'logging.StreamHandler',
'formatter': 'simpleFormatter',
'level': 'DEBUG',
'stream': 'ext://sys.stdout'},
'fileHandler': {
'class': 'logging.FileHandler',
'filename': 'pyjeeves.log',
'formatter': 'simpleFormatter',
'level': 'INFO'}},
'loggers': {
'PyJeeves': {'handlers': ['fileHandler'],
'level': 'DEBUG'},
'alembic': {'handlers': ['fileHandler'],
'level': 'INFO'},
'sqlalchemy': {'handlers': ['fileHandler'],
'level': 'WARN',
'qualname': 'sqlalchemy.engine'}},
'root': {'handlers': ['consoleHandler'], 'level': 'DEBUG'},
'version': 1},
'sync_interval': 60}
try:
with open("config.yml", 'r') as ymlfile:
file_config = yaml.load(ymlfile, Loader=yaml.FullLoader)
config = {**defaults, **file_config} # Syntax introduced in Python 3.5
except IOError as e:
pass
dictConfig(config['logging'])
config['debug'] = os.getenv('ENVIRONEMENT') == 'DEV'
DB_CONTAINER = os.getenv('APPLICATION_DB_CONTAINER', 'db')
config['databases']['raw'] = {
'user': os.getenv('JEEVES_USER', 'jvsdbo'),
'pw': os.getenv('JEEVES_PW', ''),
'host': os.getenv('JEEVES_HOST', ''),
'port': os.getenv('JEEVES_PORT', 1433),
'db': os.getenv('JEEVES_DB', ''),
}
config['databases']['meta'] = {
'user': os.getenv('META_MYSQL_USER', 'pyjeeves'),
'pw': os.getenv('META_MYSQL_PW', ''),
'host': os.getenv('META_MYSQL_HOST', DB_CONTAINER),
'port': os.getenv('META_MYSQL_PORT', 3306),
'db': os.getenv('META_MYSQL_DB', 'pyjeeves'),
}
# DB_URI = 'postgresql://%(user)s:%(pw)s@%(host)s:%(port)s/%(db)s' % POSTGRES
config['alembic']['sqlalchemy.url'] = (
'mysql+pymysql://%(user)s:%(pw)s@%(host)s:%(port)s/%(db)s?charset=utf8mb4' %
config['databases']['meta'])

142
pyjeeves/connector.py Normal file
View file

@ -0,0 +1,142 @@
# -*- coding: utf-8 -*-
"""
pyjeeves
~~~~~~~~~~~~~~~
Global objects
"""
from pyjeeves import logging, config
from weakref import WeakValueDictionary
from sqlalchemy import create_engine, orm
from sqlalchemy.orm import sessionmaker, scoped_session, Query, aliased
from sqlalchemy.orm.exc import UnmappedClassError
from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
logger = logging.getLogger("PyJeeves." + __name__)
class BaseFilterQuery(Query):
def get(self, ident):
# Override get() so that the flag is always checked in the
# DB as opposed to pulling from the identity map. - this is optional.
return Query.get(self.populate_existing(), ident)
def __iter__(self):
return Query.__iter__(self.private())
def from_self(self, *ent):
# Override from_self() to automatically apply
# the criterion to. this works with count() and
# others.
return Query.from_self(self.private(), *ent)
def private(self):
# Fetch the model name and column list and apply model-specific base filters
mzero = self._mapper_zero()
if mzero:
# Sometimes a plain model class will be fetched instead of mzero
try:
model = mzero.class_
obj = mzero.class_
except Exception:
model = mzero.__class__
obj = mzero
if hasattr(model, '_base_filters'):
return self.enable_assertions(False).filter(model._base_filters(obj))
return self
class Model(object):
"""Baseclass for custom user models."""
#: the query class used. The :attr:`query` attribute is an instance
#: of this class. By default a :class:`BaseQuery` is used.
query_class = BaseFilterQuery
#: an instance of :attr:`query_class`. Can be used to query the
#: database for instances of this model.
query = None
class MetaBaseModel(DeclarativeMeta):
""" Define a metaclass for the BaseModel
Implement `__getitem__` for managing aliases """
def __init__(cls, *args):
super().__init__(*args)
cls.aliases = WeakValueDictionary()
def __getitem__(cls, key):
try:
alias = cls.aliases[key]
except KeyError:
alias = aliased(cls)
cls.aliases[key] = alias
return alias
class _QueryProperty(object):
def __init__(self, sa):
self.sa = sa
def __get__(self, obj, type):
try:
mapper = orm.class_mapper(type)
if mapper:
if type.__module__ == 'pyjeeves.models.raw':
return type.query_class(mapper, session=self.sa.raw_session())
else:
return type.query_class(mapper, session=self.sa.meta_session())
except UnmappedClassError:
return None
class DBConnector(object):
"""This class is used to control the SQLAlchemy integration"""
def __init__(self, enabled_sessions=['raw'], metadata=None):
logger.info("Creating engines and sessionmakers")
self.raw_session, self.meta_session = self.create_scoped_session(enabled_sessions)
self.Model = self.make_declarative_base(metadata)
# self.Query = Query
@property
def metadata(self):
"""Returns the metadata"""
return self.Model.metadata
# @property
# def _config(self):
# """Returns the configuration"""
# return config()
def make_declarative_base(self, metadata=None):
"""Creates the declarative base."""
base = declarative_base(cls=Model, name='Model',
metadata=metadata,
metaclass=MetaBaseModel)
base.query = _QueryProperty(self)
return base
def create_scoped_session(self, sessions=[]):
RawSession, MetaSession = None, None
if 'raw' in sessions:
raw_engine = create_engine(
'mssql+pymssql://{user}:{pw}@{host}:{port}/{db}?charset=utf8'.format(
**config.config['databases']['raw']),
implicit_returning=False)
RawSession = scoped_session(sessionmaker(bind=raw_engine))
if 'meta' in sessions:
meta_engine = create_engine(
'mysql+pymysql://{user}:{pw}@{host}:{port}/{db}?charset=utf8mb4'.format(
**config.config['databases']['meta']))
MetaSession = scoped_session(sessionmaker(bind=meta_engine))
return RawSession, MetaSession

View file

@ -1,26 +0,0 @@
# -*- coding: utf-8 -*-
from sqlalchemy import create_engine
from sqlalchemy.orm.session import Session
from models.jvsmodels import Base
class MySQLSession(Session):
"""docstring for MySQLSession"""
def __init__(self, settings):
self.engine = create_engine(
'mysql+pymysql://{user}:{passwd}@{host}:{port}/{db}?charset=utf8mb4'.format(**settings))
super(MySQLSession, self).__init__(bind=self.engine)
def create_db(self):
Base.metadata.create_all(self.engine)
if __name__ == '__main__':
import yaml
with open("config.yml", 'r') as ymlfile:
cfg = yaml.load(ymlfile)
session = MySQLSession(cfg['mysql'])
session.create_db()

12
pyjeeves/db_meta.py Normal file
View file

@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
from pyjeeves import meta_engine
from models.meta import Base
if __name__ == '__main__':
from pyjeeves import logging
logger = logging.getLogger("PyJeeves." + __name__)
logger.info("Creating meta database")
Base.metadata.create_all(meta_engine)

View file

@ -7,14 +7,14 @@
"""
import pymssql
import datetime
import logging
from pyjeeves import logging, config
class JvsQuery():
"""JvsQuery based on http://pymssql.org/en/stable/ """
def __init__(self, settings):
def __init__(self):
super(JvsQuery, self).__init__()
self.settings = settings
self.settings = config['jeeves_db']
self.logger = logging.getLogger("PyJeeves.jvsquery")
def _execute(self, query="", params=(), iterator=True):

View file

@ -1,19 +1,19 @@
# -*- coding: utf-8 -*-
import pprint
import yaml
import signal
import sys
import logging
import logging.config
# import logging
# import logging.config
from alembic.config import Config
from alembic import command
from pyjeeves.connector import DBConnector
from pyjeeves import config
from process import Process
from jvsquery import JvsQuery
from db import MySQLSession
from utils import TaskThread
@ -25,8 +25,9 @@ class SyncTread(TaskThread):
def __init__(self, config):
super(SyncTread, self).__init__()
jvs_query = JvsQuery(config['jeeves_db'])
db_session = MySQLSession(config['mysql'])
# Use RawSession instead...
jvs_query = JvsQuery()
None, db_session = DBConnector.create_scoped_session(['meta'])
self.process = Process(jvs_query, db_session)
self.logger = logging.getLogger("PyJeeves.SyncTread")
@ -38,16 +39,15 @@ class SyncTread(TaskThread):
if __name__ == '__main__':
with open("config.yml", 'r') as ymlfile:
cfg = yaml.load(ymlfile)
logging.config.dictConfig(cfg['logging'])
from pyjeeves import logging
# logging.config.dictConfig(config['logging'])
logger = logging.getLogger("PyJeeves")
logger.info("Running migrations")
alembic_cfg = Config()
for k in cfg['alembic']:
alembic_cfg.set_main_option(k, cfg['alembic'][k])
for k in config['alembic']:
alembic_cfg.set_main_option(k, config['alembic'][k])
command.upgrade(alembic_cfg, "head")
logger.info("Application started")
@ -59,9 +59,9 @@ if __name__ == '__main__':
signal.signal(signal.SIGINT, sigterm_handler)
signal.signal(signal.SIGTERM, sigterm_handler)
sync_thread = SyncTread(cfg)
sync_thread = SyncTread()
try:
sync_thread.setInterval(cfg['sync_interval'])
sync_thread.setInterval(config['sync_interval'])
sync_thread.start()
sync_thread.join()
finally:

View file

@ -3,7 +3,8 @@
pyjeeves.models
~~~~~~~~~~~~~~~
consolodated models module
models for databases
"""
from pyjeeves.connector import DBConnector
from .jvsmodels import * # noqa
db = DBConnector()

112
pyjeeves/models/abc.py Normal file
View file

@ -0,0 +1,112 @@
"""
Define an Abstract Base Class (ABC) for models
"""
from datetime import datetime
from decimal import Decimal
from sqlalchemy import inspect
from sqlalchemy.sql.expression import and_
from sqlalchemy.orm.collections import InstrumentedList
from pyjeeves import logging
from . import db
logger = logging.getLogger("PyJeeves." + __name__)
class RawBaseModel():
""" Generalize __init__, __repr__ and to_json
Based on the models columns , ForetagKod=1"""
print_only = () # First filter
print_filter = () # Second filter
to_json_filter = () # Only json filter
column_map = {}
__table_args__ = {
'extend_existing': True
}
@classmethod
def _base_filters(self, obj, filters=and_()):
# This method provides base filtering, additional filtering can be done in subclasses
# Add this method to your model if you want more filtering, otherwise leave it out
# import and_ from sqlalchemy package
# this is a base filter for ALL queries
return and_(
obj.ForetagKod == 1,
filters
)
def __repr__(self):
""" Define a base way to print models
Columns inside `print_filter` are excluded """
return '%s(%s)' % (self.__class__.__name__, {
column: value
for column, value in self._to_dict().items()
if column not in self.print_filter
})
@staticmethod
def _to_json_types(value):
if isinstance(value, datetime):
return value.strftime('%Y-%m-%d')
if isinstance(value, Decimal):
return "%.2f" % value
try:
if isinstance(value, InstrumentedList):
return [x.json for x in value]
if type(value).__module__ != 'builtins': # Perhaps == builtin?
return value.json
except AttributeError:
logger.debug(str(type(value)) + " was not converted to jsonifyable type")
return None
return value
@property
def json(self):
""" Define a base way to jsonify models
Columns inside `to_json_filter` are excluded
Columns inside `to_json_only_filter` are only included """
return {
column: RawBaseModel._to_json_types(value)
# if not isinstance(value, datetime) else value.strftime('%Y-%m-%d')
# if type(value).__module__ != self.__module__ # Perhaps == builtin?
# else value.json # Convert instances to json if same module
for column, value in self._to_dict().items()
if column not in self.to_json_filter
}
def _to_dict(self):
""" This would more or less be the same as a `to_json`
But putting it in a "private" function
Allows to_json to be overriden without impacting __repr__
Or the other way around
And to add filter lists """
return {
self._map_columns(column.key): getattr(self, column.key)
for column in inspect(self.__class__).attrs
if not self.print_only or column.key in self.print_only
}
def _map_columns(self, key):
if key in self.column_map:
return self.column_map[key]
return key
def merge(self):
db.raw_session.merge(self)
return self
def commit(self):
db.raw_session.commit()
def save(self):
db.raw_session.add(self)
db.raw_session.commit()
return self
def delete(self):
db.raw_session.delete(self)
db.raw_session.commit()

View file

@ -3,7 +3,7 @@
pyjeeves.models
~~~~~~~~~~~~~~~~~~~~~~
Jeeves data models
Jeeves meta data models
"""
from sqlalchemy.ext.declarative import declarative_base

104
pyjeeves/models/raw.py Normal file
View file

@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
"""
pyjeeves.models
~~~~~~~~~~~~~~~~~~~~~~
Jeeves raw data models
"""
# from sqlalchemy import Column, String
from sqlalchemy.schema import MetaData, ForeignKey, Column
from sqlalchemy.orm import relationship
from sqlalchemy.types import Integer, String
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.sql.expression import and_
# from pyjeeves.session import raw_engine
from . import db
from pyjeeves import logging
from .abc import RawBaseModel
logger = logging.getLogger("PyJeeves." + __name__)
logger.info("Reading Jeeves DB structure")
meta = MetaData()
meta.reflect(bind=db.raw_session.connection(), only=['ar', 'ars', 'fr', 'kus', 'oh', 'lp', 'vg'])
# Table('fr', meta, implicit_returning=False)
Base = automap_base(cls=db.Model, name='Model', metadata=meta)
class CommodityGroup(Base, RawBaseModel):
__tablename__ = 'vg'
column_map = {'VaruGruppKod': 'CommodityGroupNumber', 'VaruGruppBeskr': 'CommodityGroupName'}
print_only = ('VaruGruppKod', 'VaruGruppBeskr')
print_filter = ('Articles', 'articles_collection')
# to_json_filter = ('Articles', 'articles_collection')
class ArticleBalance(Base, RawBaseModel):
__tablename__ = 'ars'
column_map = {'LagSaldo': 'Balance',
'LagResAnt': 'ReservedBalance',
'LagsaldoAltEnh': 'BalanceAlternative',
'LagResAntAltEnh': 'ReservedAlternativeBalance',
'LagStalle': 'StorageLocationNumber'}
print_only = ('LagSaldo',
'LagResAnt',
'LagsaldoAltEnh',
'LagResAntAltEnh',
'LagStalle')
# print_filter = ('Articles', 'articles_collection')
# to_json_filter = ('Articles', 'articles_collection')
ArtNr = Column(Integer, ForeignKey('ar.ArtNr'), primary_key=True)
class Articles(Base, RawBaseModel):
__tablename__ = 'ar'
column_map = {'ArtNr': 'ArticleNumber',
'ArtBeskr': 'ArticleName',
'LagSaldoArtikel': 'Balance',
'EnhetsKod': 'Unit',
'ArtListPris': 'ListPrice'}
print_only = (
'ArtNr',
'ArtBeskr',
'CommodityGroup',
'ArticleBalance',
'EnhetsKod',
'LagSaldoArtikel',
'RowCreatedDt',
'ArtListPris')
ArtNr = Column(Integer, primary_key=True)
VaruGruppKod = Column(Integer, ForeignKey('vg.VaruGruppKod'), primary_key=True)
CommodityGroup = relationship(CommodityGroup)
ArticleBalance = relationship(ArticleBalance)
@classmethod
def _base_filters(self, obj):
return RawBaseModel._base_filters(
obj,
and_(obj.LagTyp == 0)
)
class Companies(Base, RawBaseModel):
__tablename__ = 'fr'
column_map = {'FtgNr': 'CompanyNumber', 'FtgNamn': 'CompanyName'}
print_only = ('CompanyNumber', 'CompanyName')
FtgNr = Column(String, primary_key=True)
Base.prepare()
# Base companies for cusomters and suppliers
Customers = Base.classes.kus # Customer information
Orders = Base.classes.oh # Orders by customers
DelivLoc = Base.classes.lp # Connections between a delivery company and customer company

View file

@ -7,7 +7,7 @@
"""
from models import Articles, Customers, InvoiceRows, OrderRows
from models.meta import Articles, Customers, InvoiceRows, OrderRows
from sqlalchemy import desc
from sqlalchemy.inspection import inspect
@ -19,9 +19,10 @@ class Process():
"""docstring for Process"""
def __init__(self, jvs_query, db_session):
super(Process, self).__init__()
# Refactor code to use RawSession instead of raw jvs queries
self.query = jvs_query
self.session = db_session
self.logger = logging.getLogger("PyJeeves.process")
self.logger = logging.getLogger("PyJeeves." + __name__)
def _update_model(self, model, kwargs):
for k, v in kwargs.items():
@ -47,6 +48,7 @@ class Process():
if _data:
self.logger.info("Syncing %s" % jvs_tbl)
nth_item = 0
for item in _data:
_filter_kwargs = {k: item.get(k) for k in _p_keys}
_existing = self.session.query(model).\
@ -57,6 +59,12 @@ class Process():
else:
_new = model(**item)
self.session.add(_new)
if nth_item % 1000 == 0 and nth_item != 0:
self.session.commit()
nth_item += 1
self.session.commit()
else:
self.logger.info("No sync made for %s" % jvs_tbl)
@ -65,4 +73,3 @@ class Process():
self._sync_model(Articles, 'Articles')
self._sync_model(InvoiceRows, 'InvoiceRows')
self._sync_model(OrderRows, 'OrderRows')
self.session.commit()

View file

@ -0,0 +1,2 @@
from .location import Location
from .article import Article

View file

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
from pyjeeves.models.raw import Articles
from sqlalchemy.sql.expression import and_
from pyjeeves import logging
logger = logging.getLogger("PyJeeves." + __name__)
# Relocate Jeeves modules to separate folder and let a "master" module handle imports, and setup.
class Article():
"""Handles dispatch locations in Jeeves"""
def __init__(self):
super(Article, self).__init__()
@staticmethod
def get(art_no):
""" Query an article by number """
return Articles.query.filter_by(
ArtNr=art_no
).one()
@staticmethod
def get_all(filter_=and_(Articles.ItemStatusCode == 0, Articles.ArtKod != 2)):
# .filter_by(ItemStatusCode=0, ArtKod=2)
return Articles.query.filter(filter_).all()
if __name__ == '__main__':
# print([column.key for column in Companies.__table__.columns])
logger.info("Starting TEST")
# session = RawSession()
logger.info("Testing gettings an article")
# c1 = session.query(Companies).filter_by(FtgNr="179580").first()
# print(Articles)
c1 = Articles.query.filter_by(ArtNr="2103").first()
print(c1)
logger.info(c1.json)
print (
len(Article.get_all())
)

View file

@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
from pyjeeves.models.raw import Companies, DelivLoc
from pyjeeves import logging
logger = logging.getLogger("PyJeeves." + __name__)
# Relocate Jeeves modules to separate folder and let a "master" module handle imports, and setup.
class Location():
"""Handles dispatch locations in Jeeves"""
def __init__(self):
super(Location, self).__init__()
self.associated_company = '' # Company with new/existing locations
self._deliv_locs = [] # List of locations to be connected
def _connect_deliv_loc(self, ftgnr, description, code):
if self.associated_company == '':
raise
if len(description) > 36:
logger.warn("Truncated description %s", (description))
description = description[:36]
_deliv_loc = DelivLoc(
FtgNr=self.associated_company, OrdLevPlats1=ftgnr,
OrdLevPlBeskr=description, ForetagKod=1)
self._deliv_locs.append(_deliv_loc)
# self.session.merge(_deliv_loc)
return _deliv_loc
def create_lev_location(self, ftgnr='', name='', address='',
postal_code='', city='', gln='', invoice_ref='', phone=''):
_loc = Companies(
FtgNr=str(ftgnr), FtgNamn=name, FtgPostadr5=address,
FtgLevPostNr=postal_code, FtgPostLevAdr3=city,
EAN_Loc_Code=gln, FtgPostAdr1=invoice_ref, ComNr=phone,
ForetagKod=1)
# logger.debug("Adding company to location session")
# with self.session.no_autoflush:
# # self.session.merge(_loc) # "merge" updates if existing location exists.
_deliv_loc = self._connect_deliv_loc(ftgnr, name, gln)
return _loc, _deliv_loc
def save_locations(self):
logger.debug("Committing all location changes")
# self.session.commit() # Location company needs to be created in order to connect them.
for deliv_loc in self._deliv_locs:
deliv_loc.merge()
# self.session.merge(deliv_loc) # Create "connnections" between Customer and Location.
Companies.commit()
# self.session.commit()
if __name__ == '__main__':
# print([column.key for column in Companies.__table__.columns])
logger.info("Starting TEST")
# session = RawSession()
logger.info("Testing gettings a company")
# c1 = session.query(Companies).filter_by(FtgNr="179580").first()
print(Companies)
c1 = Companies.query.filter_by(FtgNr="179580").first()
logger.info(c1.json)
# RawSession.remove()
# from sqlalchemy.inspection import inspect
# print (inspect(Companies).columns.items())