PyJeeves/pyjeeves/connector.py
Marcus Lindvall 0af38e286e Stored procedure helpers. Order repo and model. More SQLService updates.
* A generic stored procedure helper added, and support for calling them.
* Order and OrderItem tables added, including helpers and calls to SP for creation and updates.
* Minor updates to other repositories.
2019-08-30 12:09:10 +02:00

108 lines
3.3 KiB
Python

# -*- coding: utf-8 -*-
"""
pyjeeves
~~~~~~~~~~~~~~~
Global objects
"""
from pyjeeves import logging, config
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
# from sqlalchemy.orm.exc import UnmappedClassError
from pymssql import OperationalError
from sqlservice import SQLClient, SQLQuery
logger = logging.getLogger("PyJeeves." + __name__)
class BaseFilterQuery(SQLQuery):
    """Query class that adds a model's ``_base_filters`` criterion when present.

    ``__iter__`` and ``from_self`` route through :meth:`private`, which
    appends the criterion returned by the model's ``_base_filters`` hook if
    the model defines one. ``get`` goes through ``populate_existing()``.
    """

    def get(self, ident):
        """Return ``SQLQuery.get`` for ``ident`` on a ``populate_existing()`` query."""
        # Author's note: this override is optional; it is meant to make the
        # lookup check the database rather than reuse the identity map.
        refreshed = self.populate_existing()
        return SQLQuery.get(refreshed, ident)

    def __iter__(self):
        """Iterate over the query returned by :meth:`private`."""
        restricted = self.private()
        return SQLQuery.__iter__(restricted)

    def from_self(self, *ent):
        """Call ``SQLQuery.from_self`` on the query returned by :meth:`private`."""
        # Author's note: applying the criterion here is what makes it take
        # effect for count() and similar calls.
        restricted = self.private()
        return SQLQuery.from_self(restricted, *ent)

    def private(self):
        """Return this query with the model's base filters applied, if any.

        The query is returned unchanged when ``_mapper_zero()`` is falsy or
        when the resolved model has no ``_base_filters`` attribute.
        """
        mapper_zero = self._mapper_zero()
        if not mapper_zero:
            return self

        # Sometimes a plain model class is returned instead of a mapper; in
        # that case fall back to the object itself and its type.
        try:
            model = obj = mapper_zero.class_
        except Exception:
            obj = mapper_zero
            model = mapper_zero.__class__

        if not hasattr(model, '_base_filters'):
            return self

        unchecked = self.enable_assertions(False)
        return unchecked.filter(model._base_filters(obj))
class DBConnector(object):
    """Create and hold the SQLAlchemy sessions/engines used by PyJeeves.

    Attributes set by ``__init__`` depending on ``enabled_clients``:

    * ``raw`` / ``raw_engine`` -- the pair returned by :meth:`raw_session`,
      or ``{}`` / ``None`` when the ``'raw'`` client is not enabled.
    * ``meta`` -- the scoped session returned by :meth:`meta_session`, or
      ``{}`` when the ``'meta'`` client is not enabled.
    """

    def __init__(self, enabled_clients=('raw',), metadata=None):
        """Initialise the requested database clients.

        :param enabled_clients: container of client names to set up; the
            names ``'raw'`` and ``'meta'`` are recognised.
        :param metadata: accepted for backward compatibility; not used by
            this class.
        """
        logger.info("Creating engines and sessionmakers")

        if 'raw' in enabled_clients:
            self.raw, self.raw_engine = self.raw_session()
        else:
            # An empty dict cannot be unpacked into two names, so the
            # disabled case assigns both attributes explicitly.
            self.raw, self.raw_engine = {}, None

        self.meta = self.meta_session() if 'meta' in enabled_clients else {}

    def callproc(self, procedure="", params=None):
        """Call a stored procedure on the raw engine and commit.

        :param procedure: name of the stored procedure to call.
        :param params: sequence of parameters handed to the cursor's
            ``callproc()``; defaults to an empty list.
        :returns: the rows fetched after advancing with ``nextset()``; if
            that raises ``OperationalError`` the value returned by the
            cursor's ``callproc()`` is returned instead.
        """
        if params is None:
            params = []

        conn = self.raw_engine.raw_connection()
        # The connection is closed in ``finally`` *after* the cursor context
        # has exited, so the cursor is never torn down on a closed
        # connection and the connection is released on every error path.
        try:
            with conn.cursor() as cursor:
                retval = cursor.callproc(procedure, params)
                try:
                    cursor.nextset()
                    retval = cursor.fetchall()
                except OperationalError:
                    logger.debug("Executed statement has no resultset")
                conn.commit()
        finally:
            conn.close()

        return retval

    def execute(self, operation=""):
        """Run ``operation`` on the raw engine and return all fetched rows.

        The cursor is opened with ``as_dict=True``. No commit is issued.

        :param operation: statement passed to the cursor's ``execute()``.
        :returns: the result of the cursor's ``fetchall()``.
        """
        conn = self.raw_engine.raw_connection()
        # Same ordering as in callproc(): cursor first, connection last.
        try:
            with conn.cursor(as_dict=True) as cursor:
                cursor.execute(operation)
                results = cursor.fetchall()
        finally:
            conn.close()

        return results

    def raw_session(self):
        """Build the SQLClient for the ``raw`` database.

        The URI is formatted from ``config.config['databases']['raw']``
        (keys ``user``, ``pw``, ``host``, ``port``, ``db``).

        :returns: tuple ``(session, engine)`` taken from the SQLClient.
        """
        uri = 'mssql+pymssql://{user}:{pw}@{host}:{port}/{db}?charset=utf8'.format(
            **config.config['databases']['raw'])

        sql_client_config = {'SQL_DATABASE_URI': uri}
        db = SQLClient(sql_client_config, query_class=BaseFilterQuery)

        return db.session, db.engine

    def meta_session(self):
        """Build a scoped session for the ``meta`` database.

        The URI is formatted from ``config.config['databases']['meta']``
        (keys ``user``, ``pw``, ``host``, ``port``, ``db``).

        :returns: a ``scoped_session`` bound to the newly created engine.
        """
        meta_engine = create_engine(
            'mysql+pymysql://{user}:{pw}@{host}:{port}/{db}?charset=utf8mb4'.format(
                **config.config['databases']['meta']))

        return scoped_session(sessionmaker(bind=meta_engine))