Source code for anyblok.model

# This file is a part of the AnyBlok project
#
#    Copyright (C) 2014 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
#    Copyright (C) 2017 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
#    Copyright (C) 2018 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
#    Copyright (C) 2019 Jean-Sebastien SUZANNE <js.suzanne@gmail.com>
#    Copyright (C) 2021 Jean-Sebastien SUZANNE <js.suzanne@gmail.com>
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file,You can
# obtain one at http://mozilla.org/MPL/2.0/.
import inspect
from copy import deepcopy

from sqlalchemy import inspection
from sqlalchemy.orm import declared_attr
from texttable import Texttable

from anyblok import Declarations
from anyblok.column import Column
from anyblok.common import TypeList, anyblok_column_prefix
from anyblok.field import Field, FieldException
from anyblok.mapper import ModelAttribute, format_schema
from anyblok.registry import RegistryManager
from anyblok.relationship import RelationShip

from .exceptions import ModelException
from .factory import ModelFactory, has_sql_fields
from .plugins import get_model_plugins


def has_sqlalchemy_fields(base):
    for p in base.__dict__.keys():
        attr = base.__dict__[p]
        if inspection.inspect(attr, raiseerr=False) is not None:
            return True

    return False


def is_in_mro(cls, attr):
    return cls in attr.__class__.__mro__


def get_fields(
    base,
    without_relationship=False,
    only_relationship=False,
    without_column=False,
):
    """Return the fields for a model

    :param base: Model Class
    :param without_relationship: Do not return the relationship field
    :param only_relationship: return only the relationship field
    :param without_column: Do not return the column field
    :rtype: dict with name of the field in key and instance of Field in value
    """
    fields = {}
    for p in base.__dict__:
        if p.startswith("__"):
            continue

        try:
            attr = getattr(base, p)
            if hasattr(attr, "__class__"):
                if without_relationship and is_in_mro(RelationShip, attr):
                    continue

                if without_column and is_in_mro(Column, attr):
                    continue

                if only_relationship and not is_in_mro(RelationShip, attr):
                    continue

                if is_in_mro(Field, attr):
                    fields[p] = attr

        except FieldException:  # pragma: no cover
            pass

    return fields


def autodoc_fields(declaration_cls, model_cls):  # pragma: no cover
    """Produces autodocumentation table for the fields.

    Exposed as a function in order to be reusable by a simple export,
    e.g., from anyblok.mixin.
    """
    if not has_sql_fields([model_cls]):
        return ""

    rows = [["Fields", ""]]
    rows.extend([x, y.autodoc()] for x, y in get_fields(model_cls).items())
    table = Texttable(max_width=0)
    table.set_cols_valign(["m", "t"])
    table.add_rows(rows)
    return table.draw() + "\n\n"


def update_factory(kwargs):
    if "factory" in kwargs:
        kwargs["__model_factory__"] = kwargs.pop("factory")


[docs]@Declarations.add_declaration_type( isAnEntry=True, pre_assemble="pre_assemble_callback", assemble="assemble_callback", initialize="initialize_callback", ) class Model: """The Model class is used to define or inherit an SQL table. Add new model class:: @Declarations.register(Declarations.Model) class MyModelclass: pass Remove a model class:: Declarations.unregister(Declarations.Model.MyModelclass, MyModelclass) There are three Model families: * No SQL Model: These models have got any field, so any table * SQL Model: * SQL View Model: it is a model mapped with a SQL View, the insert, update delete method are forbidden by the database Each model has a: * registry name: compose by the parent + . + class model name * table name: compose by the parent + '_' + class model name The table name can be overloaded by the attribute tablename. the wanted value are a string (name of the table) of a model in the declaration. ..warning:: Two models can have the same table name, both models are mapped on the table. But they must have the same column. """ autodoc_anyblok_kwargs = True autodoc_anyblok_bases = True autodoc_anyblok_fields = True @classmethod def pre_assemble_callback(cls, registry): plugins_by = {} for plugin in get_model_plugins(registry): for attr, func in inspect.getmembers( plugin, predicate=inspect.ismethod ): by = plugins_by.setdefault(attr, []) by.append(func) def call_plugins(method, *args, **kwargs): """call the method on each plugin""" for func in plugins_by.get(method, []): func(*args, **kwargs) registry.call_plugins = call_plugins
[docs] @classmethod def register(self, parent, name, cls_, **kwargs): """add new sub registry in the registry :param parent: Existing global registry :param name: Name of the new registry to add it :param cls_: Class Interface to add in registry """ _registryname = parent.__registry_name__ + "." + name if "tablename" in kwargs: tablename = kwargs.pop("tablename") if not isinstance(tablename, str): tablename = tablename.__tablename__ elif hasattr(parent, name): tablename = getattr(parent, name).__tablename__ else: if parent is Declarations or parent is Declarations.Model: tablename = name.lower() elif hasattr(parent, "__tablename__"): tablename = parent.__tablename__ tablename += "_" + name.lower() if not hasattr(parent, name): p = { "__tablename__": tablename, "__registry_name__": _registryname, "use": lambda x: ModelAttribute(_registryname, x), } ns = type(name, tuple(), p) setattr(parent, name, ns) if parent is Declarations: return # pragma: no cover kwargs["__registry_name__"] = _registryname kwargs["__tablename__"] = tablename update_factory(kwargs) RegistryManager.add_entry_in_register( "Model", _registryname, cls_, **kwargs ) setattr(cls_, "__anyblok_kwargs__", kwargs)
[docs] @classmethod def unregister(self, entry, cls_): """Remove the Interface from the registry :param entry: entry declaration of the model where the ``cls_`` must be removed :param cls_: Class Interface to remove in registry """ RegistryManager.remove_in_register(cls_)
[docs] @classmethod def declare_field( cls, registry, name, field, namespace, properties, transformation_properties, ): """Declare the field/column/relationship to put in the properties of the model :param registry: the current registry :param name: name of the field / column or relationship :param field: the declaration field / column or relationship :param namespace: the namespace of the model :param properties: the properties of the model """ if name in properties["loaded_columns"]: return if field.must_be_copied_before_declaration(): field = deepcopy(field) attr_name = name if field.use_hybrid_property: attr_name = anyblok_column_prefix + name if field.must_be_declared_as_attr(): # All the declaration are seen as mixin for sqlalchemy # some of them need de be defered for the initialisation # cause of the mixin as relation ship and column with foreign key def wrapper(cls): return field.get_sqlalchemy_mapping( registry, namespace, name, properties ) properties[attr_name] = declared_attr(wrapper) properties[attr_name].anyblok_field = field else: properties[attr_name] = field.get_sqlalchemy_mapping( registry, namespace, name, properties ) if field.use_hybrid_property: properties[name] = field.get_property( registry, namespace, name, properties ) properties[name].sqla_column = properties[attr_name] properties["hybrid_property_columns"].append(name) def field_description(): return registry.get(namespace).fields_description(name)[name] def from_model(): return registry.get(namespace) properties[name].anyblok_field_name = name properties[name].anyblok_registry_name = namespace properties[name].field_description = field_description properties[name].from_model = from_model registry.call_plugins( "declare_field", name, field, namespace, properties, transformation_properties, ) properties["loaded_columns"].append(name) field.update_properties(registry, namespace, name, properties)
[docs] @classmethod def transform_base(cls, registry, namespace, base, properties): """Detect specific declaration which must define by registry :param registry: the current registry :param namespace: the namespace of the model :param base: One of the base of the model :param properties: the properties of the model :rtype: new base """ new_type_properties = {} for attr, method in inspect.getmembers(base): if attr in ("registry", "anyblok", "_sa_registry"): continue if attr.startswith("__"): continue registry.call_plugins( "transform_base_attribute", attr, method, namespace, base, properties, new_type_properties, ) registry.call_plugins( "transform_base", namespace, base, properties, new_type_properties ) if new_type_properties: return [type(namespace, (), new_type_properties), base] return [base]
[docs] @classmethod def insert_in_bases( cls, registry, namespace, bases, transformation_properties, properties ): """Add in the declared namespaces new base. :param registry: the current registry :param namespace: the namespace of the model :param base: One of the base of the model :param transformation_properties: the properties of the model :param properties: assembled attributes of the namespace """ new_base = type(namespace, (), {}) bases.insert(0, new_base) registry.call_plugins( "insert_in_bases", new_base, namespace, properties, transformation_properties, )
@classmethod def raise_if_has_sqlalchemy(cls, base): if has_sqlalchemy_fields(base): raise ModelException( "the base %r have an SQLAlchemy attribute" % base )
[docs] @classmethod def load_namespace_first_step(cls, registry, namespace): """Return the properties of the declared bases for a namespace. This is the first step because some actions need to known all the properties :param registry: the current registry :param namespace: the namespace of the model :rtype: dict of the known properties """ if namespace in registry.loaded_namespaces_first_step: return registry.loaded_namespaces_first_step[namespace] properties = { "__depends__": set(), "__db_schema__": format_schema(None, namespace), } ns = registry.loaded_registries[namespace] for b in ns["bases"]: cls.raise_if_has_sqlalchemy(b) for b_ns in b.__anyblok_bases__: if b_ns.__registry_name__.startswith("Model."): properties["__depends__"].add(b_ns.__registry_name__) ps = cls.load_namespace_first_step( registry, b_ns.__registry_name__ ) ps = ps.copy() ps.update(properties) properties.update(ps) fields = get_fields(b) for p, f in fields.items(): if p not in properties: properties[p] = f if hasattr(b, "__db_schema__"): properties["__db_schema__"] = format_schema( b.__db_schema__, namespace ) if "__tablename__" in ns["properties"]: properties["__tablename__"] = ns["properties"]["__tablename__"] registry.loaded_namespaces_first_step[namespace] = properties return properties
@classmethod def apply_inheritance_base( cls, registry, namespace, ns, bases, realregistryname, properties, transformation_properties, ): # remove doublon for b in ns["bases"]: if b in bases: continue kwargs = {"namespace": realregistryname} if realregistryname else {} bases.append(b, **kwargs) if b.__doc__ and "__doc__" not in properties: properties["__doc__"] = b.__doc__ for b_ns in b.__anyblok_bases__: brn = b_ns.__registry_name__ if brn in registry.loaded_registries["Mixin_names"]: tp = transformation_properties if realregistryname: bs, ps = cls.load_namespace_second_step( registry, brn, realregistryname=realregistryname, transformation_properties=tp, ) else: bs, ps = cls.load_namespace_second_step( registry, brn, realregistryname=namespace, transformation_properties=tp, ) elif brn in registry.loaded_registries["Model_names"]: bs, ps = cls.load_namespace_second_step(registry, brn) else: raise ModelException( # pragma: no cover "You have not to inherit the %r " "Only the 'Mixin' and %r types are allowed" % (brn, cls.__name__) ) bases += bs @classmethod def init_core_properties_and_bases(cls, registry, bases, properties): properties["loaded_columns"] = [] properties["hybrid_property_columns"] = [] properties["loaded_fields"] = {} properties["__model_factory__"].insert_core_bases(bases, properties) @classmethod def declare_all_fields( cls, registry, namespace, bases, properties, transformation_properties ): # do in the first time the fields and columns # because for the relationship on the same model # the primary keys must exist before the relationship # load all the base before do relationship because primary key # can be come from inherit for b in bases: for p, f in get_fields(b, without_relationship=True).items(): cls.declare_field( registry, p, f, namespace, properties, transformation_properties, ) for b in bases: for p, f in get_fields(b, only_relationship=True).items(): cls.declare_field( registry, p, f, namespace, properties, transformation_properties, ) @classmethod def apply_existing_table( cls, registry, namespace, tablename, properties, bases, transformation_properties, ): if "__tablename__" in properties: del properties["__tablename__"] for t in registry.loaded_namespaces.keys(): m = registry.loaded_namespaces[t] if m.is_sql: if getattr(m, "__tablename__"): if m.__tablename__ == tablename: properties["__table__"] = m.__table__ tablename = namespace.replace(".", "_").lower() for b in bases: for p, f in get_fields( b, without_relationship=True, without_column=True ).items(): cls.declare_field( registry, p, f, namespace, properties, transformation_properties, )
[docs] @classmethod def load_namespace_second_step( cls, registry, namespace, realregistryname=None, transformation_properties=None, ): """Return the bases and the properties of the namespace :param registry: the current registry :param namespace: the namespace of the model :param realregistryname: the name of the model if the namespace is a mixin :rtype: the list od the bases and the properties :exception: ModelException """ if namespace in registry.loaded_namespaces: return [registry.loaded_namespaces[namespace]], {} if transformation_properties is None: transformation_properties = {} bases = TypeList(cls, registry, namespace, transformation_properties) ns = registry.loaded_registries[namespace] properties = ns["properties"].copy() first_step = registry.loaded_namespaces_first_step[namespace] properties["__depends__"] = first_step["__depends__"] properties["__db_schema__"] = first_step.get("__db_schema__", None) registry.call_plugins( "initialisation_tranformation_properties", properties, transformation_properties, ) properties["__model_factory__"] = properties.get( "__model_factory__", ModelFactory )(registry) cls.apply_inheritance_base( registry, namespace, ns, bases, realregistryname, properties, transformation_properties, ) if namespace in registry.loaded_registries["Model_names"]: tablename = properties["__tablename__"] modelname = namespace.replace(".", "") cls.init_core_properties_and_bases(registry, bases, properties) if tablename in registry.declarativebase.metadata.tables: cls.apply_existing_table( registry, namespace, tablename, properties, bases, transformation_properties, ) else: cls.declare_all_fields( registry, namespace, bases, properties, transformation_properties, ) bases.append(registry.registry_base) cls.insert_in_bases( registry, namespace, bases, transformation_properties, properties, ) bases = [ properties["__model_factory__"].build_model( modelname, bases, properties ) ] properties = {} registry.add_in_registry(namespace, bases[0]) registry.loaded_namespaces[namespace] = bases[0] registry.call_plugins( "after_model_construction", bases[0], namespace, transformation_properties, ) return bases, properties
[docs] @classmethod def assemble_callback(cls, registry): """Assemble callback is called to assemble all the Model from the installed bloks :param registry: registry to update """ registry.loaded_namespaces_first_step = {} registry.loaded_views = {} # get all the information to create a namespace for namespace in registry.loaded_registries["Model_names"]: cls.load_namespace_first_step(registry, namespace) # create the namespace with all the information come from first # step for namespace in registry.loaded_registries["Model_names"]: cls.load_namespace_second_step(registry, namespace)
[docs] @classmethod def initialize_callback(cls, registry): """initialize callback is called after assembling all entries This callback updates the database information about * Model * Column * RelationShip :param registry: registry to update """ for Model in registry.loaded_namespaces.values(): Model.initialize_model() if not registry.loadwithoutmigration: Model.clear_all_model_caches() if registry.loadwithoutmigration: return False Blok = registry.System.Blok if not registry.withoutautomigration: registry.update_blok_list() bloks = Blok.list_by_state("touninstall") Blok.uninstall_all(*bloks) return Blok.apply_state(*registry.ordered_loaded_bloks)