# Source code listing (title line left over from the rendered documentation page; the module name is missing).

# This file is a part of the AnyBlok project
#    Copyright (C) 2015 Jean-Sebastien SUZANNE <>
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at http://mozilla.org/MPL/2.0/.
from anyblok.declarations import Declarations, hybrid_method
from anyblok.column import String, Json
from .exceptions import IOMappingCheckException, IOMappingSetException
from logging import getLogger
# Module-level logger, named after this module.
logger = getLogger(__name__)

# Short aliases for the AnyBlok declaration entry points used below.
register = Declarations.register
Model = Declarations.Model

@register(Model.IO)
class Mapping:
    """Link an external key to the primary keys of a model's record.

    A mapping is identified by the pair (``model``, ``key``) and stores the
    primary keys of the record it points to in a JSON column.
    """

    # External key of the mapping; part of the primary key.
    key = String(primary_key=True)
    # Name of the mapped model (references ``Model.System.Model.name``);
    # part of the primary key.
    model = String(primary_key=True,
                   foreign_key=Model.System.Model.use('name'))
    # Primary keys of the mapped record, used as a ``{name: value}`` dict.
    primary_key = Json(nullable=False)
    # Optional blok name (references ``Model.System.Blok.name``).
    blokname = String(label="Blok name",
                      foreign_key=Model.System.Blok.use('name'))

    @hybrid_method
    def filter_by_model_and_key(self, model, key):
        """SQLAlchemy hybrid method to filter by model and key

        :param model: model of the mapping
        :param key: external key of the mapping
        """
        return (self.model == model) & (self.key == key)

    @hybrid_method
    def filter_by_model_and_keys(self, model, *keys):
        """SQLAlchemy hybrid method to filter by model and several keys

        :param model: model of the mapping
        :param keys: external keys of the mappings
        """
        return (self.model == model) & self.key.in_(keys)

    def remove_element(self, byquery=False):
        """Delete the record this mapping points to

        The record is loaded from the stored primary keys and deleted with
        ``remove_mapping=False``; the mapping row itself is not touched
        here.

        :param byquery: forwarded to the ``delete`` method of the record
        """
        val = self.registry.get(self.model).from_primary_keys(
            **self.primary_key)
        logger.info("Remove entity for %r.%r: %r", self.model, self.key, val)
        val.delete(byquery=byquery, remove_mapping=False)

    @classmethod
    def multi_delete(cls, model, *keys, **kwargs):
        """Delete all the keys for this model

        :param model: model of the mapping
        :param keys: external keys of the mappings to delete
        :param mapping_only: keyword only, boolean (True by default), if
            False the mapped records are deleted too
        :param byquery: keyword only, boolean (False by default), forwarded
            to ``remove_element``
        :rtype: int, number of mappings removed (0 if none matched)
        """
        mapping_only = kwargs.get('mapping_only', True)
        byquery = kwargs.get('byquery', False)
        query = cls.query()
        query = query.filter(cls.filter_by_model_and_keys(model, *keys))
        count = query.count()
        if count:
            if not mapping_only:
                query.all().remove_element(byquery=byquery)

            query.delete(synchronize_session='fetch', remove_mapping=False)
            cls.registry.expire_all()
            return count

        return 0

    @classmethod
    def delete(cls, model, key, mapping_only=True, byquery=False):
        """Delete the key for this model

        :param model: model of the mapping
        :param key: string of the key
        :param mapping_only: boolean (True by default), if False the mapped
            record is deleted too
        :param byquery: boolean (False by default), forwarded to
            ``remove_element``
        :rtype: int, number of mappings removed (0 if none matched)
        """
        query = cls.query()
        query = query.filter(cls.filter_by_model_and_key(model, key))
        count = query.count()
        if count:
            if not mapping_only:
                # Remove the mapped record first, while the mapping still
                # gives access to its primary keys.
                query.first().remove_element(byquery=byquery)

            # The mapping itself is always removed, as in ``multi_delete``.
            query.delete(remove_mapping=False)
            return count

        return 0

    @classmethod
    def get_mapping_primary_keys(cls, model, key):
        """return primary key for a model and an external key

        :param model: model of the mapping
        :param key: string of the key
        :rtype: dict primary key: value or None
        :exception: IOMappingCheckException
        """
        query = cls.query()
        query = query.filter(cls.filter_by_model_and_key(model, key))
        if query.count():
            pks = query.first().primary_key
            # Make sure the stored keys still cover the model's primary keys.
            cls.check_primary_keys(model, *pks.keys())
            return pks

        return None

    @classmethod
    def check_primary_keys(cls, model, *pks):
        """check if all the primary keys of the model are present in pks

        :param model: model to check
        :param pks: names of the primary keys to check
        :exception: IOMappingCheckException
        """
        for pk in cls.get_model(model).get_primary_keys():
            if pk not in pks:
                raise IOMappingCheckException(
                    "No primary key %r found in %r for model %r" % (
                        pk, pks, model))

    @classmethod
    def set_primary_keys(cls, model, key, pks, raiseifexist=True,
                         blokname=None):
        """Add or update a mapping with a model and an external key

        :param model: model to check
        :param key: string of the key
        :param pks: dict of the primary key to save
        :param raiseifexist: boolean (True by default), if True and the entry
            exist then an exception is raised
        :param blokname: name of the blok where come from the mapping
        :exception: IOMappingSetException
        """
        if cls.get_mapping_primary_keys(model, key):
            if raiseifexist:
                raise IOMappingSetException(
                    "One value found for model %r and key %r" % (model, key))

            # Replace the existing entry: drop it before inserting anew.
            cls.delete(model, key)

        if not pks:
            raise IOMappingSetException(
                "No value to save %r for model %r and key %r" % (
                    pks, model, key))

        cls.check_primary_keys(model, *pks.keys())
        vals = dict(model=model, key=key, primary_key=pks)
        if blokname is not None:
            vals['blokname'] = blokname

        return cls.insert(**vals)

    @classmethod
    def set(cls, key, instance, raiseifexist=True, blokname=None):
        """Add or update a mapping between an external key and an instance

        The model name and the primary keys are read from the instance.

        :param key: string of the key
        :param instance: instance of the model to save
        :param raiseifexist: boolean (True by default), if True and the entry
            exist then an exception is raised
        :param blokname: name of the blok where come from the mapping
        :exception: IOMappingSetException
        """
        pks = instance.to_primary_keys()
        return cls.set_primary_keys(instance.__registry_name__, key, pks,
                                    blokname=blokname,
                                    raiseifexist=raiseifexist)

    @classmethod
    def get(cls, model, key):
        """return instance of the model with this external key

        :param model: model of the mapping
        :param key: string of the key
        :rtype: instance of the model, or None if no mapping exists
        """
        pks = cls.get_mapping_primary_keys(model, key)
        if pks is None:
            return None

        return cls.get_model(model).from_primary_keys(**pks)

    @classmethod
    def get_from_model_and_primary_keys(cls, model, pks):
        """return the mapping of a model which stores these primary keys

        The stored primary keys are compared in Python, mapping by mapping.

        :param model: model of the mapping
        :param pks: dict of the primary keys to look for
        :rtype: mapping or None
        """
        query = cls.query().filter(cls.model == model)
        for mapping in query.all():
            if mapping.primary_key == pks:
                return mapping

        return None

    @classmethod
    def get_from_entry(cls, entry):
        """return the mapping which points to this entry

        :param entry: instance of a model
        :rtype: mapping or None
        """
        return cls.get_from_model_and_primary_keys(
            entry.__registry_name__, entry.to_primary_keys())

    @classmethod
    def __get_models(cls, models):
        """Return models name

        if models is None: return all the existing model names
        if models is a single value: wrap it in a list
        if models contains model classes or instances: convert them to
        their registry name

        :params models: list of model
        """
        if models is None:
            models = cls.registry.System.Model.query().all().name
        elif not isinstance(models, (list, tuple)):
            models = [models]

        return [m.__registry_name__ if hasattr(m, '__registry_name__') else m
                for m in models]

    @classmethod
    def clean(cls, bloknames=None, models=None):
        """Clean all mapping with removed object linked::

            Mapping.clean(bloknames=['My blok'])

        .. warning::

            For filter only the no blokname::

                Mapping.clean(bloknames=[None])

        :params bloknames: filter by blok, keep the order to remove the
            mapping
        :params models: filter by model, keep the order to remove the mapping
        :rtype: int, number of mappings removed
        """
        if bloknames is None:
            # Every known blok, plus the mappings without any blok name.
            bloknames = cls.registry.System.Blok.query().all().name + [None]
        elif not isinstance(bloknames, (list, tuple)):
            bloknames = [bloknames]

        models = cls.__get_models(models)
        removed = 0
        for blokname in bloknames:
            for model in models:
                query = cls.query().filter_by(blokname=blokname, model=model)
                for key in query.all().key:
                    if cls.get(model, key) is None:
                        cls.delete(model, key)
                        removed += 1

        return removed

    @classmethod
    def delete_for_blokname(cls, blokname, models=None, byquery=False):
        """Delete the mappings of a blok together with the mapped records::

            Mapping.delete_for_blokname('My blok')

        .. warning::

            For filter only the no blokname::

                Mapping.delete_for_blokname(None)

        Mappings for which ``get`` returns no record are left untouched.

        :params blokname: filter by blok
        :params models: filter by model, keep the order to remove the mapping
        :params byquery: forwarded to ``delete``
        :rtype: int, number of mappings removed
        """
        models = cls.__get_models(models)
        removed = 0
        for model in models:
            query = cls.query().filter_by(blokname=blokname, model=model)
            for key in query.all().key:
                if cls.get(model, key):
                    cls.delete(model, key, mapping_only=False,
                               byquery=byquery)
                    cls.registry.flush()
                    removed += 1

        return removed