Source code for anyblok.mapper

# This file is a part of the AnyBlok project
#
#    Copyright (C) 2014 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
#    Copyright (C) 2019 Jean-Sebastien SUZANNE <js.suzanne@gmail.com>
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file,You can
# obtain one at http://mozilla.org/MPL/2.0/.
from sqlalchemy.schema import ForeignKey

from anyblok.common import anyblok_column_prefix

from .config import Configuration


def get_for(basekey, path, default=None):
    key = "%s.%s" % (basekey, ".".join(path))
    key = key.lower()

    if path in (["Model", "*"], ["Mixin", "*"]):
        return Configuration.get(key, default)

    res = Configuration.get(key, None)
    if res is not None:
        return res  # pragma: no cover

    new_path = path[:-1]
    if path[-1] == "*":
        new_path[-1] = "*"
    else:
        new_path.append("*")

    return get_for(basekey, new_path, default=default)


def get_schema_for(path, default=None):
    return get_for("db_schema", path, default=default)


def get_schema_prefix_for(path, default=None):
    return get_for("prefix_db_schema", path, default=default)


def get_schema_suffix_for(path, default=None):
    return get_for("suffix_db_schema", path, default=default)


def format_schema(schema, registry_name):
    path = registry_name.split(".")
    schema = get_schema_for(path, default=schema)
    if schema is not None:
        prefix = get_schema_prefix_for(path, default="")
        suffix = get_schema_suffix_for(path, default="")

        return prefix + schema + suffix

    return schema


[docs]class ModelReprException(Exception): """Exception for Model attribute"""
[docs]class ModelAttributeException(Exception): """Exception for Model attribute"""
[docs]class ModelAttributeAdapterException(Exception): """Exception for Model attribute adapter"""
[docs]class MapperException(AttributeError): """Simple Exception for Mapper"""
class FakeColumn: db_column_name = None def update_description(self, registry, model, res): pass class FakeRelationShip: def __init__(self, mapper): self.mapper = mapper def update_description(self, registry, model, res): pass
[docs]class ModelAttribute: """The Model attribute represente the using of a declared attribute, in the goal of get the real attribute after of the foreign_key:: ma = ModelAttribute('registry name', 'attribute name') """ def __repr__(self): return "%s => %s" % (self.model_name, self.attribute_name) def __str__(self): return "%s => %s" % (self.model_name, self.attribute_name) def __init__(self, model_name, attribute_name): if not model_name or not attribute_name: raise ModelAttributeException( "model_name (%s) and attribute_name (%s) are " "required" % (model_name, attribute_name) ) self.model_name = model_name self.attribute_name = attribute_name self._options = {}
[docs] def get_attribute(self, registry, usehybrid=True): """Return the assembled attribute, the model need to be assembled :param registry: instance of the registry :param usehybrid: if True return the hybrid property if exist :rtype: instance of the attribute :exceptions: ModelAttributeException """ if self.model_name not in registry.loaded_namespaces: raise ModelAttributeException( "Unknow model %r, maybe the model doesn't exist or is not" "assembled yet" % self.model_name ) Model = registry.get(self.model_name) if not hasattr(Model, self.attribute_name): raise ModelAttributeException( "Model %r has not get %r attribute" % (self.model_name, self.attribute_name) ) attribute_name = self.attribute_name if not usehybrid: if attribute_name in Model.hybrid_property_columns: attribute_name = anyblok_column_prefix + attribute_name return getattr(Model, attribute_name)
[docs] def get_type(self, registry): """Return the foreign key which represent the attribute in the data base :param registry: instance of the sqlalchemy ForeignKey :rtype: instance of the attribute """ Model = self.check_model_in_first_step(registry) column_name = self.check_column_in_first_step(registry, Model) col = registry.loaded_namespaces_first_step[self.model_name][ column_name ] return col
[docs] def get_fk_column(self, registry): """Return the foreign key which represent the attribute in the data base :param registry: instance of the sqlalchemy ForeignKey :rtype: instance of the attribute """ mapper = self.get_fk_mapper(registry) if mapper: return mapper.attribute_name return None
[docs] def get_fk_mapper(self, registry): """Return the foreign key which represent the attribute in the data base :param registry: instance of the sqlalchemy ForeignKey :rtype: instance of the attribute """ Model = self.check_model_in_first_step(registry) try: column_name = self.check_column_in_first_step(registry, Model) if Model[column_name].foreign_key: return Model[column_name].foreign_key except ModelAttributeException: pass return None
[docs] def get_fk(self, registry): """Return the foreign key which represent the attribute in the data base :param registry: instance of the sqlalchemy ForeignKey :rtype: instance of the attribute """ return ForeignKey(self.get_fk_name(registry), **self._options)
[docs] def options(self, **kwargs): """Add foreign key options to create the sqlalchemy ForeignKey :param **kwargs: options :rtype: the instance of the ModelAttribute """ self._options.update(kwargs) return self
[docs] def get_fk_name(self, registry, with_schema=True): """Return the name of the foreign key the need of foreign key may be before the creation of the model in the registry, so we must use the first step of assembling :param registry: instance of the registry :rtype: str of the foreign key (tablename.columnname) :exceptions: ModelAttributeException """ Model = self.check_model_in_first_step(registry) column_name = self.check_column_in_first_step(registry, Model) tablename = Model["__tablename__"] if Model[self.attribute_name].db_column_name: column_name = Model[self.attribute_name].db_column_name if with_schema and Model.get("__db_schema__"): return "%s.%s.%s" % (Model["__db_schema__"], tablename, column_name) return tablename + "." + column_name
[docs] def get_complete_name(self, registry): """Return the name of the foreign key the need of foreign key may be before the creation of the model in the registry, so we must use the first step of assembling :param registry: instance of the registry :rtype: str of the foreign key (modelname.columnname) :exceptions: ModelAttributeException """ Model = self.check_model_in_first_step(registry) column_name = self.check_column_in_first_step(registry, Model) modelname = self.model_name.replace(".", "") return modelname + "." + column_name
def get_fk_remote(self, registry): Model = self.check_model_in_first_step(registry) column_name = self.check_column_in_first_step(registry, Model) remote = Model[column_name].foreign_key if not remote: return None return remote.get_fk_name(registry) def get_complete_remote(self, registry): Model = self.check_model_in_first_step(registry) column_name = self.check_column_in_first_step(registry, Model) remote = Model[column_name].foreign_key if not remote: return None return remote.get_complete_name(registry) def add_fake_column(self, registry): Model = self.check_model_in_first_step(registry) if self.attribute_name in Model: return Model[self.attribute_name] = FakeColumn() def add_fake_relationship(self, registry, namespace, fieldname): Model = self.check_model_in_first_step(registry) if self.attribute_name in Model: return Model[self.attribute_name] = FakeRelationShip( ModelAttribute(namespace, fieldname) )
[docs] def get_column_name(self, registry): """Return the name of the column the need of foreign key may be before the creation of the model in the registry, so we must use the first step of assembling :param registry: instance of the registry :rtype: str of the foreign key (tablename.columnname) :exceptions: ModelAttributeException """ Model = self.check_model_in_first_step(registry) column_name = self.check_column_in_first_step(registry, Model) if hasattr(Model[self.attribute_name], "db_column_name"): if Model[self.attribute_name].db_column_name: column_name = Model[self.attribute_name].db_column_name return column_name
def check_model_in_first_step(self, registry): if self.model_name not in registry.loaded_namespaces_first_step: raise ModelAttributeException("Unknow model %r" % self.model_name) Model = registry.loaded_namespaces_first_step[self.model_name] if len(Model.keys()) == 3: # (__depends__, __db_schema__, __tablename__) # No column found, so is not an sql model raise ModelAttributeException( "The Model %r is not an SQL Model" % self.model_name ) return Model def check_column_in_first_step(self, registry, Model): if self.attribute_name not in Model: raise ModelAttributeException( "the Model %r has not got attribute %r" % (self.model_name, self.attribute_name) ) return self.attribute_name def is_declared(self, registry): Model = self.check_model_in_first_step(registry) if self.attribute_name not in Model: return False return True def native_type(self, registry): Model = self.check_model_in_first_step(registry) column_name = self.check_column_in_first_step(registry, Model) return Model[column_name].native_type(registry)
[docs]class ModelRepr: """Pseudo class to represent a model :: mr = ModelRepr('registry name') """ def __init__(self, model_name): if not model_name: raise ModelReprException("model_name (%s) is required" % model_name) self.model_name = model_name def __str__(self): return self.model_name
[docs] def check_model(self, registry): """Check if the model exist else raise an exception :param registry: instance of the registry :rtype: dict which represent the first step of the model :exceptions: ModelReprException """ if self.model_name not in registry.loaded_namespaces_first_step: raise ModelReprException("Model %r unexisting" % self.model_name) return registry.loaded_namespaces_first_step[self.model_name]
[docs] def tablename(self, registry, with_schema=True): """Return the real tablename of the Model :param registry: instance of the registry :rtype: string """ Model = self.check_model(registry) if with_schema and Model.get("__db_schema__"): return Model["__db_schema__"] + "." + Model["__tablename__"] return Model["__tablename__"]
[docs] def modelname(self, registry): """Return the real tablename of the Model :param registry: instance of the registry :rtype: string """ self.check_model(registry) return self.model_name.replace(".", "")
[docs] def primary_keys(self, registry): """Return the of the primary keys :param registry: instance of the registry :rtype: list of ModelAttribute """ from anyblok.column import Column Model = self.check_model(registry) pks = [] for k, v in Model.items(): if isinstance(v, Column): if v.kwargs.get("primary_key") is True: pks.append(ModelAttribute(self.model_name, k)) return pks
[docs] def foreign_keys_for(self, registry, remote_model): """Return the of the primary keys :param registry: instance of the registry :rtype: list of ModelAttribute """ from anyblok.column import Column Model = self.check_model(registry) fks = [] for k, v in Model.items(): if isinstance(v, Column): if v.foreign_key: if v.foreign_key.model_name == remote_model: fks.append(ModelAttribute(self.model_name, k)) return fks
[docs] def many2one_for(self, registry, remote_model): """Return the many2one links to the remote_model :param registry: instance of the registry :rtype: list of many2one field """ from anyblok.relationship import Many2One Model = self.check_model(registry) many2ones = [] for k, v in Model.items(): if isinstance(v, Many2One): if v.model.model_name == remote_model: many2ones.append((k, v)) return many2ones
[docs]def ModelAttributeAdapter(Model): """Return a ModelAttribute :param Model: ModelAttribute or string ('registry name'=>'attribute name') :rtype: instance of ModelAttribute :exceptions: ModelAttributeAdapterException """ if isinstance(Model, str): if "=>" not in Model: raise ModelAttributeAdapterException( "Wrong value %r impossible to find model and attribtue" % (Model) ) model, attribute = Model.split("=>") return ModelAttribute(model.strip(), attribute.strip()) else: return Model
[docs]def ModelAdapter(Model): """Return a ModelRepr :param Model: ModelRepr or string :rtype: instance of ModelRepr :exceptions: ModelAdapterException """ if isinstance(Model, str): return ModelRepr(Model) elif isinstance(Model, ModelRepr): return Model else: return ModelRepr(Model.__registry_name__)
[docs]class ModelMapper: sqlalchemy_known_events = [ "after_delete", "after_insert", "after_update", "append_result", "before_delete", "before_insert", "before_update", "create_instance", "expire", "first_init", "init", "load", "refresh", ] def __init__(self, mapper, event, *args, **kwargs): if isinstance(mapper, str): self.model = ModelRepr(mapper) elif isinstance(mapper, ModelRepr): self.model = mapper elif hasattr(mapper, "__registry_name__"): self.model = ModelRepr(mapper.__registry_name__) self.event = event self.args = args self.kwargs = kwargs @classmethod def capable(cls, mapper): if isinstance(mapper, str): return True elif isinstance(mapper, ModelRepr): return True elif hasattr(mapper, "__registry_name__"): return True return False def listen(self, method): if self.event in self.sqlalchemy_known_events: method.is_an_sqlalchemy_event_listener = True method.sqlalchemy_listener = self else: method.is_an_event_listener = True method.model = self.model.model_name method.event = self.event def mapper(self, registry, namespace, **kwargs): model = self.model if self.model.model_name.upper() == "SELF": model = ModelRepr(namespace) model.check_model(registry) return registry.get(model.model_name)
[docs]class ModelAttributeMapper: def __init__(self, mapper, event, *args, **kwargs): if isinstance(mapper, str): self.attribute = ModelAttribute(*mapper.split("=>")) elif isinstance(mapper, ModelAttribute): self.attribute = mapper self.event = event self.args = args self.kwargs = kwargs @classmethod def capable(cls, mapper): if isinstance(mapper, str): if "=>" in mapper: model, column = mapper.split("=>") if model and column: return True elif isinstance(mapper, ModelAttribute): return True return False def listen(self, method): method.is_an_sqlalchemy_event_listener = True method.sqlalchemy_listener = self def mapper(self, registry, namespace, usehybrid=True): attribute = self.attribute if self.attribute.model_name.upper() == "SELF": attribute = ModelAttribute( # pragma: no cover namespace, self.attribute.attribute_name ) return attribute.get_attribute(registry, usehybrid=usehybrid)
[docs]def MapperAdapter(mapper, *args, **kwargs): if ModelAttributeMapper.capable(mapper): return ModelAttributeMapper(mapper, *args, **kwargs) elif ModelMapper.capable(mapper): return ModelMapper(mapper, *args, **kwargs) else: raise MapperException("Unknow mapper type %r")