Source code for anyblok.column

# This file is a part of the AnyBlok project
#
#    Copyright (C) 2016 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
#    Copyright (C) 2017 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
#    Copyright (C) 2018 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
#    Copyright (C) 2019 Jean-Sebastien SUZANNE <js.suzanne@gmail.com>
#    Copyright (C) 2020 Jean-Sebastien SUZANNE <js.suzanne@gmail.com>
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at http://mozilla.org/MPL/2.0/.
import decimal
import time
from base64 import b64decode, b64encode
from datetime import date, datetime, timedelta
from hashlib import md5
from inspect import ismethod
from json import dumps, loads
from logging import getLogger

import pytz
from dateutil.parser import parse
from sqlalchemy import CheckConstraint, types
from sqlalchemy.schema import Column as SA_Column
from sqlalchemy.schema import Sequence as SA_Sequence
from sqlalchemy_utils import JSONType
from sqlalchemy_utils.types.color import ColorType
from sqlalchemy_utils.types.email import EmailType
from sqlalchemy_utils.types.encrypted.encrypted_type import StringEncryptedType
from sqlalchemy_utils.types.password import Password as SAU_PWD
from sqlalchemy_utils.types.password import PasswordType
from sqlalchemy_utils.types.phone_number import PhoneNumberType
from sqlalchemy_utils.types.scalar_coercible import ScalarCoercible
from sqlalchemy_utils.types.url import URLType
from sqlalchemy_utils.types.uuid import UUIDType

from anyblok.config import Configuration

from .common import sgdb_in
from .field import Field, FieldException
from .mapper import ModelAttribute, ModelAttributeAdapter

# Optional dependency: both names default to None and are only replaced when
# the import below succeeds.
pycountry = None
python_pycountry_type = None
try:
    import pycountry

    # Load the countries data if it has not been loaded yet, before reading
    # its data class.
    if not pycountry.countries._is_loaded:
        pycountry.countries._load()

    python_pycountry_type = pycountry.countries.data_class
except ImportError:
    # pycountry is not installed: keep the None placeholders set above.
    pass


logger = getLogger(__name__)


def wrap_default(registry, namespace, default_val):
    """Build a lazy resolver for a default value given as a string

    The string is either the default value itself or the name of a bound
    method of the model (``inspect.ismethod``) which computes the default.

    :param registry: the current registry
    :param namespace: the namespace of the model
    :param default_val: the default value, or the name of the model method
        returning it
    :return: a callable without argument which resolves the default value
    """

    def wrapper():
        """Resolve the default value

        :return: the result of the model method named ``default_val`` when
            it exists and no column or field has the same name, otherwise
            ``default_val`` itself
        """
        model = registry.get(namespace)
        if not hasattr(model, default_val):
            return default_val

        candidate = getattr(model, default_val)
        if not ismethod(candidate):
            logger.warning(
                "The attribute %r is already declared as a default "
                "value on the Model %r, an instance method with the same "
                "name already exists" % (default_val, namespace)
            )
            return default_val

        # columns are checked before fields, a warning is emitted for the
        # first conflict found
        if default_val in model.loaded_columns:
            logger.warning(
                "The attribute %r is already declared as a default "
                "value on the Model %r, a column with the same name "
                "already exists" % (default_val, namespace)
            )
            return default_val

        if default_val in model.loaded_fields:
            logger.warning(
                "The attribute %r is already declared as a default "
                "value on the Model %r, a field with the same name "
                "already exists" % (default_val, namespace)
            )
            return default_val

        return candidate()

    return wrapper


class ColumnDefaultValue:
    """Hold a factory which builds the default callable of a column"""

    def __init__(self, callable):
        """Store the factory

        :param callable: function called with ``(registry, namespace,
            fieldname, properties)``
        """
        self.callable = callable

    def get_default_callable(self, registry, namespace, fieldname, properties):
        """Call the stored factory and return its result

        :param registry: the current registry
        :param namespace: the namespace of the model
        :param fieldname: the fieldname of the model
        :param properties: the properties of the model
        :return: default callable
        """
        factory = self.callable
        return factory(registry, namespace, fieldname, properties)


class CompareType:
    """Registry of the checks comparing a column type with the type of the
    column targeted by its foreign key
    """

    # list of (class of type1, class of type2, function), shared by design
    comparators = []

    @classmethod
    def default_comparator(cls, col1, type1, col2, type2):
        """Check used when no specific comparator is registered

        :param col1: description of the first column
        :param type1: type of the first column
        :param col2: description of the second column
        :param type2: type of the second column
        :exception FieldException: both types are not of the same class
        """
        if type1.__class__ is type2.__class__:
            return

        message = (
            "You can't add a foreign key using columns with different "
            "types {model1!s}.{col1!s}` pointing to `{model2!s}.{col2!s}` "
            "have different types  {type1!r} -> {type2!r}"
        )
        raise FieldException(
            message.format(
                model1=col1.model_name,
                col1=col1.attribute_name,
                model2=col2.model_name,
                col2=col2.attribute_name,
                type1=type1.__class__,
                type2=type2.__class__,
            )
        )

    @classmethod
    def add_comparator(cls, type1, type2):
        """Decorator registering a comparator for a couple of type classes

        :param type1: class of the first type
        :param type2: class of the second type
        :return: decorator returning the decorated function unchanged
        """

        def wrapper(funct):
            entry = (type1, type2, funct)
            cls.comparators.append(entry)
            return funct

        return wrapper

    @classmethod
    def validate(cls, col1, type1, col2, type2):
        """Run the first registered comparator matching both type classes,
        or the default comparator when none matches

        :param col1: description of the first column
        :param type1: type of the first column
        :param col2: description of the second column
        :param type2: type of the second column
        """
        for expected1, expected2, check in cls.comparators:
            matched = (
                type1.__class__ is expected1 and type2.__class__ is expected2
            )
            if matched:
                check(col1, type1, col2, type2)
                return

        cls.default_comparator(col1, type1, col2, type2)


class NoDefaultValue:
    """Marker class meaning that no default value is defined"""


class Column(Field):
    """Column class

    This class can't be instantiated, the subclasses must define the
    attribute ``sqlalchemy_type``
    """

    use_hybrid_property = True
    foreign_key = None
    sqlalchemy_type = None
    type = None

    autodoc_omit_property_values = Field.autodoc_omit_property_values.union(
        (
            ("foreign_key", None),
            ("DB column name", None),
            ("default", None),
            ("is crypted", False),
        )
    )

    def __init__(self, *args, **kwargs):
        """Initialize the column

        The keys ``foreign_key``, ``sequence``, ``db_column_name``,
        ``default`` and ``encrypt_key`` are removed from the kwargs and
        stored on the column, ``type_`` is dropped. The other parameters
        are given to the parent class.

        :param label: label of this field
        :type label: str
        """
        self.forbid_instance(Column)
        assert self.sqlalchemy_type

        kwargs.pop("type_", None)

        if "foreign_key" in kwargs:
            self.foreign_key = ModelAttributeAdapter(kwargs.pop("foreign_key"))

        self.sequence = None
        if "sequence" in kwargs:
            self.sequence = SA_Sequence(kwargs.pop("sequence"))

        self.db_column_name = kwargs.pop("db_column_name", None)
        self.default_val = kwargs.pop("default", NoDefaultValue)
        self.encrypt_key = kwargs.pop("encrypt_key", None)

        super(Column, self).__init__(*args, **kwargs)

    def autodoc_get_properties(self):
        """Return properties list for autodoc

        :return: autodoc properties
        """
        properties = super(Column, self).autodoc_get_properties()
        properties["foreign_key"] = self.foreign_key
        properties["DB column name"] = self.db_column_name
        properties["default"] = self.default_val
        properties["is crypted"] = bool(self.encrypt_key)
        return properties

    def native_type(self, registry):
        """Return the native SqlAlchemy type

        :param registry:
        :rtype: sqlalchemy native type
        """
        return self.sqlalchemy_type

    def format_foreign_key(self, registry, namespace, fieldname, args, kwargs):
        """Format a foreign key

        When a foreign key is defined, the types of both columns are
        validated, the foreign key is appended to the args and
        ``kwargs["info"]`` is updated in place.

        :param registry: the current registry
        :param namespace: the namespace of the model
        :param fieldname: the fieldname of the model
        :param args: positional arguments of the sqlalchemy column
        :param kwargs: named arguments of the sqlalchemy column, the key
            ``info`` must exist
        :return: the args, extended when a foreign key is defined
        """
        fk = self.foreign_key
        if not fk:
            return args

        CompareType.validate(
            ModelAttribute(namespace, fieldname),
            self,
            fk,
            fk.get_type(registry),
        )
        args = args + (fk.get_fk(registry),)
        kwargs["info"].update(
            {
                "foreign_key": fk.get_fk_name(registry),
                "remote_model": fk.model_name,
            }
        )
        return args

    def get_sqlalchemy_mapping(
        self, registry, namespace, fieldname, properties
    ):
        """Return the instance of the real field

        :param registry: current registry
        :param namespace: name of the model
        :param fieldname: name of the field
        :param properties: known properties of the model
        :rtype: sqlalchemy column instance
        """
        self.format_label(fieldname)

        kwargs = self.kwargs.copy()
        kwargs.setdefault("info", {})
        args = self.format_foreign_key(
            registry, namespace, fieldname, self.args, kwargs
        )
        kwargs["info"]["label"] = self.label

        if self.sequence:
            args = (self.sequence,) + args

        default = self.default_val
        if default is not NoDefaultValue:
            if isinstance(default, str):
                # a string may be the name of a method of the model
                default = wrap_default(registry, namespace, default)
            elif isinstance(default, ColumnDefaultValue):
                default = default.get_default_callable(
                    registry, namespace, fieldname, properties
                )

            kwargs["default"] = default

        column_type = self.native_type(registry)
        if self.encrypt_key:
            column_type = self.get_encrypt_key_type(
                registry,
                column_type,
                self.format_encrypt_key(registry, namespace),
            )

        return SA_Column(
            self.db_column_name or fieldname, column_type, *args, **kwargs
        )

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Wrap the type in an encrypted type

        :param registry: the current registry
        :param sqlalchemy_type: the type to encrypt
        :param encrypt_key: the key, or a callable returning the key
        :return: the encrypted type
        """
        encrypted = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ["MySQL", "MariaDB"]):
            encrypted.impl = types.String(64)

        return encrypted

    def format_encrypt_key(self, registry, namespace):
        """Format and return the encryption key

        :param registry: the current registry
        :param namespace: the namespace of the model
        :return: callable returning the encrypt key
        :exception FieldException: ``encrypt_key`` is True and the
            configuration defines no ``default_encrypt_key``
        """
        encrypt_key = self.encrypt_key
        if encrypt_key is True:
            encrypt_key = Configuration.get("default_encrypt_key")

        if not encrypt_key:
            raise FieldException(  # pragma: no cover
                "No encrypt_key defined in the configuration"
            )

        def wrapper():
            """Return the key, computed by the model method of this name
            when it exists and no column or field has the same name

            :return: the encrypt key
            """
            model = registry.get(namespace)
            if hasattr(model, encrypt_key):
                candidate = getattr(model, encrypt_key)
                if (
                    ismethod(candidate)
                    and encrypt_key not in model.loaded_columns
                    and encrypt_key not in model.loaded_fields
                ):
                    return candidate()

            return encrypt_key

        return wrapper

    def must_be_declared_as_attr(self):
        """Return True if the column have a foreign key to a remote column"""
        return self.foreign_key is not None
class ForbiddenPrimaryKey:
    """Mixin to forbid primary key on column type"""

    def get_sqlalchemy_mapping(
        self, registry, namespace, fieldname, properties
    ):
        """Refuse the mapping of a column declared with ``primary_key=True``

        :param registry: the current registry
        :param namespace: the namespace of the model
        :param fieldname: the fieldname of the model
        :param properties: the properties of the model
        :return: the mapping built by the next class
        :exception FieldException: the column is declared as primary key
        """
        declared_as_pk = self.kwargs.get("primary_key") is True
        if not declared_as_pk:
            return super().get_sqlalchemy_mapping(
                registry, namespace, fieldname, properties
            )

        raise FieldException(
            f"{self.__class__} column `{namespace}.{fieldname}` "
            "are not allowed as primary key"
        )
class Integer(Column):
    """Integer column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Integer


        @Declarations.register(Declarations.Model)
        class Test:

            x = Integer(default=1)

    A primary key is auto incremented unless ``autoincrement`` is given.
    """

    sqlalchemy_type = types.Integer

    def __init__(self, *args, **kwargs):
        super(Integer, self).__init__(*args, **kwargs)
        if self.kwargs.get("primary_key") is True:
            # an explicit autoincrement value is never replaced
            self.kwargs.setdefault("autoincrement", True)
class BigInteger(Column):
    """Big integer column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import BigInteger


        @Declarations.register(Declarations.Model)
        class Test:

            x = BigInteger(default=1)

    """

    sqlalchemy_type = types.BigInteger
class Boolean(Column):
    """Boolean column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Boolean


        @Declarations.register(Declarations.Model)
        class Test:

            x = Boolean(default=True)

    """

    sqlalchemy_type = types.Boolean
class Float(ForbiddenPrimaryKey, Column):
    """Float column, it can not be a primary key

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Float


        @Declarations.register(Declarations.Model)
        class Test:

            x = Float(default=1.0)

    """

    sqlalchemy_type = types.Float


# *process_result_value* is added on the class *DECIMAL*, because this method
# is necessary to encrypt the column
types.DECIMAL.process_result_value = lambda self, value, dialect: value
class Decimal(ForbiddenPrimaryKey, Column):
    """Decimal column, it can not be a primary key

    ::

        from decimal import Decimal as D
        from anyblok.declarations import Declarations
        from anyblok.column import Decimal


        @Declarations.register(Declarations.Model)
        class Test:

            x = Decimal(default=D('1.1'))

    """

    sqlalchemy_type = types.DECIMAL

    def setter_format_value(self, value):
        """Format the given value to decimal if needed

        An encrypted column stores the value as a string.

        :param value:
        :return: decimal value, str for an encrypted column, or None
        """
        if value is None:
            return value

        if self.encrypt_key:
            return str(value)

        if isinstance(value, decimal.Decimal):
            return value

        return decimal.Decimal(value)

    def getter_format_value(self, value):
        """Convert the value of an encrypted column back to decimal

        :param value:
        :return: decimal value or None
        """
        if value is None:
            return None  # pragma: no cover

        return decimal.Decimal(value) if self.encrypt_key else value
class Date(Column):
    """Date column

    ::

        from datetime import date
        from anyblok.declarations import Declarations
        from anyblok.column import Date


        @Declarations.register(Declarations.Model)
        class Test:

            x = Date(default=date.today())

    """

    sqlalchemy_type = types.Date
def convert_string_to_datetime(value):
    """Convert a given value to datetime

    :param value: None, datetime, date or str
    :return: datetime value, a date is completed with the minimal time and
        a str is parsed with ``dateutil``; None is returned as is
    :exception FieldException: the type of the value is not supported
    """
    if value is None:
        return None

    # datetime must be tested before date, because it is a subclass of date
    if isinstance(value, datetime):
        return value

    if isinstance(value, date):
        return datetime.combine(value, datetime.min.time())

    if isinstance(value, str):
        return parse(value)

    # the tuple protects the formatting when the value is itself a tuple
    raise FieldException(
        "We can't convert this value %s to datetime" % (value,)
    )


def add_timezone_on_datetime(dt, default_timezone):
    """Convert a datetime considering the default timezone

    :param dt: datetime or None
    :param default_timezone: timezone used to localize a naive datetime
    :return: the datetime, localized if it had no timezone, or None
    """
    if dt is not None and dt.tzinfo is None:
        dt = default_timezone.localize(dt)

    return dt


class DateTimeType(types.TypeDecorator):
    """SQLAlchemy type of the DateTime column, with timezone"""

    impl = types.DateTime(timezone=True)
    cache_ok = True

    def __init__(self, field):
        """Keep the field to read its timezone and its encryption setting

        :param field: the column using this type
        """
        self.default_timezone = field.default_timezone
        self.field = field

    def process_bind_param(self, value, engine):
        """Convert the value to a timezone aware datetime

        :param value: None, datetime, date or str
        :param engine:
        :return: datetime, or its ISO format for an encrypted column
        """
        value = convert_string_to_datetime(value)
        value = add_timezone_on_datetime(value, self.default_timezone)
        # None has no ISO format, it is returned as is
        if self.field.encrypt_key and value is not None:
            return value.isoformat()

        return value

    def process_result_value(self, value, dialect):
        """Convert the stored value of an encrypted column to datetime

        :param value:
        :param dialect:
        :return: datetime or None
        """
        if self.field.encrypt_key:
            return convert_string_to_datetime(value)

        return value

    @property
    def python_type(self):
        return datetime  # pragma: no cover
class DateTime(Column):
    """DateTime column

    ::

        from datetime import datetime
        from anyblok.declarations import Declarations
        from anyblok.column import DateTime


        @Declarations.register(Declarations.Model)
        class Test:

            x = DateTime(default=datetime.now)

    """

    def __init__(self, *args, **kwargs):
        """Initialize the column

        :param auto_update: stored on the column, False by default
        :param default_timezone: timezone or name of the timezone, by
            default the value of ``default_timezone`` in the configuration,
            and ``time.tzname[0]`` if the configuration defines nothing
        """
        self.auto_update = kwargs.pop("auto_update", False)
        default_timezone = kwargs.pop(
            "default_timezone", Configuration.get("default_timezone")
        )
        if not default_timezone:
            default_timezone = time.tzname[0]

        if isinstance(default_timezone, str):
            default_timezone = pytz.timezone(default_timezone)

        self.default_timezone = default_timezone
        # the type reads the timezone on the field, it must be set before
        self.sqlalchemy_type = DateTimeType(self)
        super(DateTime, self).__init__(*args, **kwargs)

    def setter_format_value(self, value):
        """Return converted and formatted value

        :param value: None, datetime, date or str
        :return: timezone aware datetime or None
        """
        value = convert_string_to_datetime(value)
        return add_timezone_on_datetime(value, self.default_timezone)

    def getter_format_value(self, value):
        """Return converted and formatted value

        :param value: None, datetime, date or str
        :return: timezone aware datetime or None
        """
        value = convert_string_to_datetime(value)
        return add_timezone_on_datetime(value, self.default_timezone)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        # super(DateTime, ...) and not super(Column, ...): the properties
        # added by Column must not be skipped
        res = super(DateTime, self).autodoc_get_properties()
        res["is auto updated"] = self.auto_update
        if self.default_timezone:
            res["default timezone"] = self.default_timezone

        return res
class TimeStamp(DateTime):
    """TimeStamp column

    ::

        from datetime import datetime
        from anyblok.declarations import Declarations
        from anyblok.column import TimeStamp


        @Declarations.register(Declarations.Model)
        class Test:

            x = TimeStamp(default=datetime.now)

    """

    def __init__(self, *args, **kwargs):
        super(TimeStamp, self).__init__(*args, **kwargs)
        # replace the type defined by the DateTime column
        self.sqlalchemy_type = types.TIMESTAMP(timezone=True)

    def getter_format_value(self, value):
        """Return converted and formatted value

        :param value: None, datetime, date or str
        :return: timezone aware datetime or None
        """
        converted = convert_string_to_datetime(value)
        return add_timezone_on_datetime(converted, self.default_timezone)
class Time(Column):
    """Time column

    ::

        from datetime import time
        from anyblok.declarations import Declarations
        from anyblok.column import Time


        @Declarations.register(Declarations.Model)
        class Test:

            x = Time(default=time())

    """

    sqlalchemy_type = types.Time
class Interval(Column):
    """Datetime interval column

    ::

        from datetime import timedelta
        from anyblok.declarations import Declarations
        from anyblok.column import Interval


        @Declarations.register(Declarations.Model)
        class Test:

            x = Interval(default=timedelta(days=5))

    """

    sqlalchemy_type = types.Interval

    def native_type(self, registry):
        """Return the native SqlAlchemy type

        :param registry:
        :return: a VARCHAR(1024) for an encrypted column, which stores the
            interval as a JSON string, else the Interval type
        """
        if self.encrypt_key:
            return types.VARCHAR(1024)

        return self.sqlalchemy_type

    def setter_format_value(self, value):
        """Serialize the interval in JSON for an encrypted column

        :param value: timedelta or None
        :return: JSON string for an encrypted column, else the value
        """
        # None has no days / seconds / microseconds, it is returned as is
        if self.encrypt_key and value is not None:
            value = dumps(
                {
                    part: getattr(value, part)
                    for part in ("days", "seconds", "microseconds")
                }
            )

        return value

    def getter_format_value(self, value):
        """Build the interval from the JSON of an encrypted column

        :param value: JSON string for an encrypted column, or None
        :return: timedelta or None
        """
        # None can not be parsed by json, it is returned as is
        if self.encrypt_key and value is not None:
            value = timedelta(**loads(value))

        return value
class StringType(types.TypeDecorator):
    """SQLAlchemy type of the String column, False is saved as ''"""

    impl = types.String
    cache_ok = True

    def process_bind_param(self, value, engine):
        """Replace False by the empty string

        :param value:
        :param engine:
        :return: '' if the value is False, else the value
        """
        return "" if value is False else value

    def process_result_value(self, value, dialect):
        """Return the value as is"""
        return value
class String(Column):
    """String column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import String


        @Declarations.register(Declarations.Model)
        class Test:

            x = String(default='test')

    """

    def __init__(self, *args, **kwargs):
        """Initialize the column

        :param size: length of the string, 64 by default
        """
        size = kwargs.pop("size", 64)
        kwargs.pop("type_", None)
        self.size = size
        self.sqlalchemy_type = StringType(size)
        super(String, self).__init__(*args, **kwargs)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        properties = super(String, self).autodoc_get_properties()
        properties["size"] = self.size
        return properties

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Wrap the type in an encrypted type

        :param registry: the current registry
        :param sqlalchemy_type: the type to encrypt
        :param encrypt_key: the key, or a callable returning the key
        :return: the encrypted type
        """
        encrypted = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ["MySQL", "MariaDB"]):
            # at least 64 characters, whatever the size of the column
            encrypted.impl = types.String(max(self.size, 64))

        return encrypted
class Enum(Column):
    """Enum column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Enum
        import enum


        class MyEnumClass(enum.Enum):
            one = 1
            two = 2
            three = 3


        @Declarations.register(Declarations.Model)
        class Test:

            x = Enum(enum_cls=MyEnumClass, default=MyEnumClass.one)

    enum_cls should be an enum class
    """

    def __init__(self, *args, **kwargs):
        """Initialize the column

        :param enum_cls: required, the enum class
        """
        enum_cls = kwargs.pop("enum_cls")
        self.enum_cls = enum_cls
        self.sqlalchemy_type = types.Enum(enum_cls)
        super(Enum, self).__init__(*args, **kwargs)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        properties = super(Enum, self).autodoc_get_properties()
        properties["enum_cls"] = repr(self.enum_cls)
        return properties


class MsSQLPasswordType(PasswordType):
    """Password type returned by the Password column for MsSQL"""

    impl = types.VARCHAR(1024)

    def load_dialect_impl(self, dialect):
        """Return a VARCHAR of the length of the password type

        :param dialect:
        :return: the type descriptor given by the dialect
        """
        varchar = types.VARCHAR(self.length)
        return dialect.type_descriptor(varchar)
class Password(Column):
    """Password column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Password


        @Declarations.register(Declarations.Model)
        class Test:

            x = Password(crypt_context={'schemes': ['md5_crypt']})

        =========================================

        test = Test.insert()
        test.x = 'mypassword'

        test.x
        ==> Password object with encrypt value, the value can not be read

        test.x == 'mypassword'
        ==> True

    .. warning::

        the column type Password can not be querying::

            Test.query().filter(Test.x == 'mypassword').count()
            ==> 0

    """

    def __init__(self, *args, **kwargs):
        """Initialize the column

        :param size: max length of the stored value, 64 by default
        :param crypt_context: dict of the parameters given to the
            password type
        :exception FieldException: a foreign key is defined
        """
        self.size = kwargs.pop("size", 64)
        self.crypt_context = kwargs.pop("crypt_context", {})
        kwargs.pop("type_", None)

        if "foreign_key" in kwargs:
            raise FieldException("Column Password can not have a foreign key")

        self.sqlalchemy_type = PasswordType(
            max_length=self.size, **self.crypt_context
        )
        super(Password, self).__init__(*args, **kwargs)

    def setter_format_value(self, value):
        """Return formatted value

        :param value: the password to hash
        :return: Password object built with the hash of the value
        """
        context = self.sqlalchemy_type.context
        hashed = context.hash(value).encode("utf8")
        return SAU_PWD(hashed, context=context)

    def native_type(self, registry):
        """Return the native SqlAlchemy type"""
        if not sgdb_in(registry.engine, ["MsSQL"]):
            return self.sqlalchemy_type

        return MsSQLPasswordType(max_length=self.size, **self.crypt_context)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        properties = super(Password, self).autodoc_get_properties()
        properties["size"] = self.size
        properties["Crypt context"] = self.crypt_context
        return properties
class TextType(types.TypeDecorator):
    """SQLAlchemy type of the Text column, False is saved as ''"""

    impl = types.Text
    cache_ok = True

    def process_bind_param(self, value, engine):
        """Replace False by the empty string

        :param value:
        :param engine:
        :return: '' if the value is False, else the value
        """
        return "" if value is False else value

    def process_result_value(self, value, dialect):
        """Return the value as is"""
        return value
class Text(Column):
    """Text column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Text


        @Declarations.register(Declarations.Model)
        class Test:

            x = Text(default='test')

    """

    sqlalchemy_type = TextType

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Wrap the type in an encrypted type

        :param registry: the current registry
        :param sqlalchemy_type: the type to encrypt
        :param encrypt_key: the key, or a callable returning the key
        :return: the encrypted type
        """
        encrypted = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ["MySQL", "MariaDB"]):
            encrypted.impl = types.Text()

        return encrypted
class StrSelection(str):
    """Class representing the data of one column Selection"""

    # JSON of the dict of selections, or of the name of the model method
    # returning the selections
    selections = dumps({})
    registry = None
    namespace = None

    def get_selections(self):
        """Return a dict of selections

        :return: selections dict, None if the decoded selections are
            neither a dict nor a str
        """
        decoded = loads(self.selections)
        if isinstance(decoded, dict):
            return decoded

        if isinstance(decoded, str):
            # the selections are given by a method of the model
            model = self.registry.get(self.namespace)
            return dict(getattr(model, decoded)())

        return None

    def validate(self):
        """Validate if the key is in the selections

        :return: True or False
        """
        key = str.__str__(self)
        return key in self.get_selections().keys()

    @property
    def label(self):
        """Return the label corresponding to the selection key

        :return: the label
        """
        key = str.__str__(self)
        return self.get_selections()[key]
class SelectionType(types.TypeDecorator):
    """Generic type for Column Selection"""

    impl = types.String
    cache_ok = True

    def __init__(self, selections, size, registry=None, namespace=None):
        """Initialize the type

        :param selections: dict, list or tuple of (key, label), or the name
            of the model method returning the selections
        :param size: max length of the keys
        :param registry: the current registry
        :param namespace: the namespace of the model
        :exception FieldException: wrong type of selections or of key
        :exception Exception: a key is longer than the size
        """
        super(SelectionType, self).__init__(length=size)
        self.size = size
        if isinstance(selections, (dict, str)):
            self.selections = selections
        elif isinstance(selections, (list, tuple)):
            self.selections = dict(selections)
        else:
            raise FieldException(  # pragma: no cover
                "selection wainting 'dict', get %r" % type(selections)
            )

        if isinstance(self.selections, dict):
            for k in self.selections:
                if not isinstance(k, str):
                    raise FieldException("The key must be a str")

                # the limit is the size of the column, not a fixed 64
                if size is not None and len(k) > size:
                    raise Exception(  # pragma: no cover
                        "%r is too long %r, waiting max %s or use size arg"
                        % (k, len(k), size)
                    )

        # the selections are kept as JSON, a str (method name) included
        self.selections = dumps(self.selections)
        self._StrSelection = type(
            "StrSelection",
            (StrSelection,),
            {
                "selections": self.selections,
                "registry": registry,
                "namespace": namespace,
            },
        )

    @property
    def python_type(self):
        """Return the StrSelection subclass built for these selections"""
        return self._StrSelection

    def process_bind_param(self, value, engine):
        """Convert the value with the python type, None is kept as is"""
        if value is not None:
            value = self.python_type(value)

        return value

    def process_result_value(self, value, dialect):
        """Return the value as is"""
        return value
class Selection(Column):
    """Selection column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Selection


        @Declarations.register(Declarations.Model)
        class Test:
            STATUS = (
                (u'draft', u'Draft'),
                (u'done', u'Done'),
            )

            x = Selection(selections=STATUS, size=64, default=u'draft')

    """

    def __init__(self, *args, **kwargs):
        """Initialize the column

        :param selections: dict, list or tuple of (key, label), or the name
            of the model method returning the selections
        :param size: max length of the keys, 64 by default
        """
        self.selections = kwargs.pop("selections", tuple())
        self.size = kwargs.pop("size", 64)
        # the real type is built by ``get_sqlalchemy_mapping``, this value
        # only satisfies the assert of the Column class
        self.sqlalchemy_type = "tmp value for assert"
        super(Selection, self).__init__(*args, **kwargs)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        properties = super(Selection, self).autodoc_get_properties()
        properties["selections"] = self.selections
        properties["size"] = self.size
        return properties

    def getter_format_value(self, value):
        """Return formatted value

        :param value:
        :return: the value converted with the python type, or None
        """
        if value is None:
            return None

        return self.sqlalchemy_type.python_type(value)

    def setter_format_value(self, value):
        """Return value or raise exception if the given value is invalid

        :param value:
        :exception FieldException: the value is not in the selections
        :return: the given value
        """
        if value is None:
            return value

        candidate = self.sqlalchemy_type.python_type(value)
        if candidate.validate():
            return value

        raise FieldException(
            "%r is not in the selections (%s)"
            % (value, ", ".join(candidate.get_selections()))
        )

    def get_sqlalchemy_mapping(
        self, registry, namespace, fieldname, properties
    ):
        """Return sqlalchemy mapping

        :param registry: the current registry
        :param namespace: the namespace of the model
        :param fieldname: the fieldname of the model
        :param properties: the properties of the model
        :return: instance of the real field
        """
        self.sqlalchemy_type = SelectionType(
            self.selections, self.size, registry=registry, namespace=namespace
        )
        return super(Selection, self).get_sqlalchemy_mapping(
            registry, namespace, fieldname, properties
        )

    def update_description(self, registry, model, res):
        """Update model description

        :param registry: the current registry
        :param model: the namespace of the model
        :param res: the description, updated in place
        """
        super(Selection, self).update_description(registry, model, res)
        selection_type = SelectionType(
            self.selections, self.size, registry=registry, namespace=model
        )
        values = selection_type._StrSelection().get_selections()
        res["selections"] = list(values.items())

    def must_be_copied_before_declaration(self):
        """Return True if selections is an instance of str.

        In the case of the field selection is a mixin, it must be copied
        or the selection method can fail
        """
        return isinstance(self.selections, str)

    def update_properties(self, registry, namespace, fieldname, properties):
        """Update column properties

        :param registry: the current registry
        :param namespace: the namespace of the model
        :param fieldname: the fieldname of the model
        :param properties: the properties of the model
        """
        super(Selection, self).update_properties(
            registry, namespace, fieldname, properties
        )
        # the fieldname is needed later to name the check constraint
        self.fieldname = fieldname
        properties["add_in_table_args"].append(self)

    def update_table_args(self, registry, Model):
        """Return check constraints to limit the value

        :param registry:
        :param Model:
        :return: list of checkConstraint
        """
        if self.encrypt_key:
            # dont add constraint because the state is crypted and nobody
            # can add new entry
            return []

        if sgdb_in(registry.engine, ["MariaDB", "MsSQL", "MySQL"]):
            # No check constraint in MariaDB
            return []

        selections = loads(self.sqlalchemy_type.selections)
        if isinstance(selections, dict):
            keys = selections.keys()
        else:
            # the selections are given by a method of the model
            keys = getattr(Model, selections)()
            if isinstance(keys, (list, tuple)):
                keys = dict(keys)

            keys = keys.keys()

        if len(keys) > 1:
            constraint = """"%s" in ('%s')""" % (
                self.fieldname,
                "', '".join(keys),
            )
        elif keys:
            constraint = """"%s" = '%s'""" % (self.fieldname, list(keys)[0])
        else:
            constraint = None

        if not constraint:
            return []

        # the name depends on the sorted keys
        digest = md5()
        digest.update(str(sorted(keys)).encode("utf-8"))
        name = self.fieldname + "_" + digest.hexdigest() + "_types"
        return [CheckConstraint(constraint, name=name)]

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Wrap the type in an encrypted type

        :param registry: the current registry
        :param sqlalchemy_type: the type to encrypt
        :param encrypt_key: the key, or a callable returning the key
        :return: the encrypted type
        """
        encrypted = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ["MySQL", "MariaDB"]):
            encrypted.impl = types.String(max(self.size, 64))

        return encrypted


# *process_result_value* is added on the class *JSON*, because this method
# is necessary to encrypt the column
types.JSON.process_result_value = lambda self, value, dialect: value
class Json(Column):
    """JSON column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Json


        @Declarations.register(Declarations.Model)
        class Test:

            x = Json()

    """

    sqlalchemy_type = types.JSON(none_as_null=True)

    def native_type(self, registry):
        """Return the native SqlAlchemy type"""
        if not sgdb_in(registry.engine, ["MariaDB", "MsSQL"]):
            return self.sqlalchemy_type

        return JSONType

    def setter_format_value(self, value):
        """Serialize the value in JSON for an encrypted column

        :param value:
        :return: JSON string for an encrypted column, else the value
        """
        return dumps(value) if self.encrypt_key else value

    def getter_format_value(self, value):
        """Parse the JSON string of an encrypted column

        :param value:
        :return: the decoded value, or None
        """
        if value is None:
            return None

        return loads(value) if self.encrypt_key else value

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Wrap the type in an encrypted type

        :param registry: the current registry
        :param sqlalchemy_type: the type to encrypt
        :param encrypt_key: the key, or a callable returning the key
        :return: the encrypted type
        """
        encrypted = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ["MySQL", "MariaDB"]):
            encrypted.impl = types.Text()

        return encrypted
class LargeBinary(Column):
    """Large binary column

    ::

        from os import urandom
        from anyblok.declarations import Declarations
        from anyblok.column import LargeBinary


        blob = urandom(100000)


        @Declarations.register(Declarations.Model)
        class Test:

            x = LargeBinary(default=blob)

    """

    sqlalchemy_type = types.LargeBinary

    def native_type(self, registry):
        """Return the native SqlAlchemy type

        :param registry:
        :return: Text for an encrypted column, which stores the binary in
            base64, else the LargeBinary type
        """
        if self.encrypt_key:
            return types.Text

        return self.sqlalchemy_type

    def setter_format_value(self, value):
        """Encode the binary in base64 for an encrypted column

        :param value: bytes or None
        :return: base64 str for an encrypted column, else the value
        """
        # None can not be encoded, it is returned as is
        if self.encrypt_key and value is not None:
            value = b64encode(value).decode("utf-8")

        return value

    def getter_format_value(self, value):
        """Decode the base64 of an encrypted column

        :param value: base64 str for an encrypted column, or None
        :return: bytes or None
        """
        # None can not be decoded, it is returned as is
        if self.encrypt_key and value is not None:
            value = b64decode(value.encode("utf-8"))

        return value

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Wrap the type in an encrypted type

        :param registry: the current registry
        :param sqlalchemy_type: the type to encrypt
        :param encrypt_key: the key, or a callable returning the key
        :return: the encrypted type
        """
        sqlalchemy_type = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ["MySQL", "MariaDB"]):
            sqlalchemy_type.impl = types.Text()

        return sqlalchemy_type
class Sequence(String):
    """Sequence column

    ::

        from anyblok.column import Sequence


        @Declarations.register(Declarations.Model)
        class Test:

            x = Sequence()

    If you wish ensure no gap in the sequence::

        from anyblok.column import Sequence


        @Declarations.register(Declarations.Model)
        class Test:

            x = Sequence(no_gap=True, code="SO", formater="{code}-{seq:06d}")

    .. warning::

        Keep in mind `no_gap=True` will raise an
        `sqlalchemy.exc.OperationalError:
        (psycopg2.errors.LockNotAvailable)` exception in case a concurrent
        transaction do not release the lock while getting the next value.

    usage with `no_gap=True`::

        >>> Test.insert().x
        "SO-000001"
        >>> Test.insert().x
        "SO-000002"
        >>> registry.rollback()
        >>> Test.insert().x
        "SO-000001"

    """

    def __init__(self, *args, **kwargs):
        """Initialize the column

        :param code: code of the sequence, None by default
        :param start: stored on the column, 1 by default
        :param formater: formater of the sequence, None by default
        :param no_gap: None by default
        :exception FieldException: a foreign key or a default is defined
        """
        if "foreign_key" in kwargs:
            raise FieldException(
                "Sequence column can not define a foreign key %r"
                % kwargs["foreign_key"]
            )

        if "default" in kwargs:
            raise FieldException("Sequence column can not define a default value")

        # the default value is always the next value of the sequence
        kwargs["default"] = ColumnDefaultValue(self.wrap_default)
        self.code = kwargs.pop("code", None)
        self.start = kwargs.pop("start", 1)
        self.formater = kwargs.pop("formater", None)
        self.no_gap = kwargs.pop("no_gap", None)
        super(Sequence, self).__init__(*args, **kwargs)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        properties = super(Sequence, self).autodoc_get_properties()
        properties["formater"] = self.formater
        properties["no_gap"] = self.no_gap
        return properties

    def wrap_default(self, registry, namespace, fieldname, properties):
        """Return default wrapper

        The description of the sequence is appended to
        ``registry._need_sequence_to_create_if_not_exist``.

        :param registry: the current registry
        :param namespace: the namespace of the model
        :param fieldname: the fieldname of the model
        :param properties: the properties of the model
        :return: callable returning the next value of the sequence
        """
        pending = "_need_sequence_to_create_if_not_exist"
        if getattr(registry, pending, None) is None:
            # the attribute is missing or None
            setattr(registry, pending, [])

        code = self.code if self.code else "%s=>%s" % (namespace, fieldname)
        # NOTE: ``self.start`` is not part of the description
        registry._need_sequence_to_create_if_not_exist.append(
            {"code": code, "formater": self.formater, "no_gap": self.no_gap}
        )

        def default_value(self, *args, **kwargs):
            """Return next sequence value

            :return: the next value of the sequence identified by the code
            """
            return registry.System.Sequence.nextvalBy(code=code)

        return default_value
class Color(Column):
    """Color column.
    `See colour package on pypi <https://pypi.python.org/pypi/colour/>`_

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Color


        @Declarations.register(Declarations.Model)
        class Test:

            x = Color(default='green')

    """

    def __init__(self, *args, **kwargs):
        """Initialize the column

        :param size: max length of the color, 20 by default
        """
        size = kwargs.pop("size", 20)
        kwargs.pop("type_", None)
        self.max_length = size
        self.sqlalchemy_type = ColorType(size)
        super(Color, self).__init__(*args, **kwargs)

    def setter_format_value(self, value):
        """Format the given value

        :param value:
        :return: a str is converted with the python type of the column,
            another value is returned as is
        """
        if not isinstance(value, str):
            return value

        return self.sqlalchemy_type.python_type(value)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        properties = super(Color, self).autodoc_get_properties()
        properties["size"] = self.max_length
        return properties

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Wrap the type in an encrypted type

        :param registry: the current registry
        :param sqlalchemy_type: the type to encrypt
        :param encrypt_key: the key, or a callable returning the key
        :return: the encrypted type
        """
        encrypted = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ["MySQL", "MariaDB"]):
            encrypted.impl = types.String(max(self.max_length, 64))

        return encrypted
class UUID(Column):
    """UUID column

    ::

        from anyblok.column import UUID


        @Declarations.register(Declarations.Model)
        class Test:

            x = UUID()

    """

    def __init__(self, *args, **kwargs):
        """Build the column.

        ``binary`` and ``native`` are forwarded to ``UUIDType`` only when
        they are given; the matching attributes stay ``None`` otherwise.
        """
        uuid_kwargs = {
            option: kwargs.pop(option)
            for option in ("binary", "native")
            if option in kwargs
        }
        self.binary = uuid_kwargs.get("binary")
        self.native = uuid_kwargs.get("native")
        self.sqlalchemy_type = UUIDType(**uuid_kwargs)
        super().__init__(*args, **kwargs)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties, completed with ``binary`` and
            ``native``
        """
        properties = super().autodoc_get_properties()
        properties["binary"] = self.binary
        properties["native"] = self.native
        return properties


URLType.cache_ok = True  # waiting fix from sqlalchemy_utils


class URL(Column):
    """URL column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import URL


        @Declarations.register(Declarations.Model)
        class Test:

            x = URL(default='doc.anyblok.org')

    """

    sqlalchemy_type = URLType

    def setter_format_value(self, value):
        """Return formatted url value

        :param value: the value to store
        :return: a ``furl`` instance when the value is a ``str``, else the
            value unchanged
        """
        from furl import furl

        # None is not a str, so it falls through untouched
        if isinstance(value, str):
            return furl(value)

        return value
class PhoneNumber(Column):
    """PhoneNumber column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import PhoneNumber


        @Declarations.register(Declarations.Model)
        class Test:

            x = PhoneNumber(default='+120012301')

    .. note:: ``phonenumbers`` >= **8.9.5** distribution is required

    """

    def __init__(self, region="FR", max_length=20, *args, **kwargs):
        """Build the column.

        :param region: region given to ``PhoneNumberType`` and used when a
            ``str`` value is converted
        :param max_length: maximum length given to ``PhoneNumberType``
        """
        # the SQL type is always a PhoneNumberType, a given type_ is ignored
        kwargs.pop("type_", None)
        self.region = region
        self.max_length = max_length
        self.sqlalchemy_type = PhoneNumberType(
            region=region, max_length=max_length
        )
        super().__init__(*args, **kwargs)

    def setter_format_value(self, value):
        """Return formatted phone number value

        :param value: the value to store
        :return: the value converted with the python type of the
            SQLAlchemy type when it is a non empty ``str``, else the value
            unchanged
        """
        if not (value and isinstance(value, str)):
            return value

        return self.sqlalchemy_type.python_type(value, self.region)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties, completed with ``region`` and
            ``max_length``
        """
        properties = super().autodoc_get_properties()
        properties["region"] = self.region
        properties["max_length"] = self.max_length
        return properties

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Return the encrypted flavour of the SQLAlchemy type

        :param registry: the current registry
        :param sqlalchemy_type: the type to wrap
        :param encrypt_key: the key given to ``StringEncryptedType``
        :return: the ``StringEncryptedType`` instance
        """
        encrypted = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ["MySQL", "MariaDB"]):
            # on these engines the implementation is a sized String,
            # at least 64 characters long
            encrypted.impl = types.String(max(self.max_length, 64))

        return encrypted


# *process_result_value* is added to the class *EmailType* because this
# method is necessary to encrypt the column; it returns the value as is.
EmailType.process_result_value = lambda self, value, dialect: value
EmailType.cache_ok = True  # waiting fix from sqlalchemy_utils
class Email(Column):
    """Email column

    ::

        from anyblok.column import Email


        @Declarations.register(Declarations.Model)
        class Test:

            x = Email()

    """

    sqlalchemy_type = EmailType

    def setter_format_value(self, value):
        """Return formatted email value

        :param value: the value to store
        :return: the lower-cased value, or ``None`` when no value is given
        """
        if value is None:
            return value  # pragma: no cover

        return value.lower()
class CountryType(types.TypeDecorator, ScalarCoercible):
    """Generic type for Column Country

    The value is stored as a 3 character unicode string (the ``alpha_3``
    code) and read back as a pycountry object.
    """

    impl = types.Unicode(3)
    cache_ok = True
    python_type = python_pycountry_type

    def process_bind_param(self, value, dialect):
        """Return the value sent to the database

        :param value: the python value
        :param dialect: the dialect in use (not used here)
        :return: the ``alpha_3`` attribute of a non empty ``python_type``
            instance, else the value unchanged
        """
        # keep the truthiness test first: it short-circuits the isinstance
        is_country = value and isinstance(value, self.python_type)
        return value.alpha_3 if is_country else value

    def process_result_value(self, value, dialect):
        """Return the python value for a value read from the database

        :param value: the stored value
        :param dialect: the dialect in use (not used here)
        :return: the coerced value
        """
        return self._coerce(value)

    def _coerce(self, value):
        """Return the country matching an ``alpha_3`` code

        :param value: ``None``, a ``python_type`` instance or a code
        :return: the value unchanged for ``None`` or a ``python_type``
            instance, else the result of the pycountry lookup by
            ``alpha_3``
        """
        if value is None or isinstance(value, self.python_type):
            return value  # pragma: no cover

        return pycountry.countries.get(alpha_3=value)
class Country(Column):
    """Country column.

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Country
        from pycountry import countries


        @Declarations.register(Declarations.Model)
        class Test:

            x = Country(default=countries.get(alpha_2='FR'))

    """

    sqlalchemy_type = CountryType

    def __init__(self, mode="alpha_2", *args, **kwargs):
        """Build the column.

        :param mode: name of the pycountry attribute used as key of the
            choices and to look a country up in ``setter_format_value``
        :exception: FieldException if the ``pycountry`` package is missing
        """
        self.mode = mode
        if pycountry is None:
            raise FieldException(  # pragma: no cover
                "'pycountry' package is required for use 'CountryType'"
            )

        choices = {}
        for country in pycountry.countries:
            choices[getattr(country, mode)] = country.name

        self.choices = choices
        super().__init__(*args, **kwargs)

    def setter_format_value(self, value):
        """Return formatted country value

        :param value: the value to store
        :return: the value unchanged when it is empty or already a
            country, else the country found by pycountry
        """
        if not value or isinstance(value, self.sqlalchemy_type.python_type):
            return value

        # the generic lookup gives the fallback used when the search by
        # ``mode`` does not match
        fallback = pycountry.countries.lookup(value)
        criteria = {self.mode: value, "default": fallback}
        return pycountry.countries.get(**criteria)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties, completed with ``mode`` and
            ``choices``
        """
        properties = super().autodoc_get_properties()
        properties["mode"] = self.mode
        properties["choices"] = self.choices
        return properties

    def update_properties(self, registry, namespace, fieldname, properties):
        """Update column properties

        :param registry: the current registry
        :param namespace: the namespace of the model
        :param fieldname: the fieldname of the model
        :param properties: the properties of the model
        """
        super().update_properties(registry, namespace, fieldname, properties)
        # the fieldname is needed later by update_table_args
        self.fieldname = fieldname
        properties["add_in_table_args"].append(self)

    def update_table_args(self, registry, Model):
        """Return check constraints to limit the value

        :param registry: the current registry
        :param Model: the model (not used here)
        :return: list of checkConstraint, empty when the column is
            encrypted or when the engine is MariaDB or MsSQL
        """
        if self.encrypt_key:
            # no constraint: the stored value is encrypted, so it can not
            # be compared with the list of codes
            return []

        if sgdb_in(registry.engine, ["MariaDB", "MsSQL"]):
            # no check constraint is added for these engines
            return []

        codes = [country.alpha_3 for country in pycountry.countries]
        # the constraint lists the codes in pycountry order ...
        constraint = """"%s" in ('%s')""" % (self.fieldname, "', '".join(codes))
        # ... while the constraint name hashes the sorted list
        digest = md5(str(sorted(codes)).encode("utf-8")).hexdigest()
        name = "_".join((self.fieldname, digest, "types"))
        return [CheckConstraint(constraint, name=name)]
def _raise_string_size_mismatch(col1, type1, size1, col2, type2, size2):
    """Raise the FieldException reporting two columns of different sizes.

    :param col1: the column holding the foreign key
    :param type1: the type of ``col1``
    :param size1: the size of ``col1``
    :param col2: the column targeted by the foreign key
    :param type2: the type of ``col2``
    :param size2: the size of ``col2``
    :exception: FieldException, always
    """
    raise FieldException(
        "You can't add a foreign key using based String columns with "
        "different size `{model1!s}.{col1!s}` pointing to "
        "`{model2!s}.{col2!s}` have different sizes {type1!r}({size1:d}) "
        # both sides are rendered the same way: type(size)
        "-> {type2!r}({size2:d})".format(
            model1=col1.model_name,
            col1=col1.attribute_name,
            model2=col2.model_name,
            col2=col2.attribute_name,
            type1=type1.__class__,
            type2=type2.__class__,
            size1=size1,
            size2=size2,
        )
    )


@CompareType.add_comparator(String, String)
@CompareType.add_comparator(String, Selection)
@CompareType.add_comparator(String, Sequence)
def compare_strings(col1, type1, col2, type2):
    """Check that both columns have the same ``size``.

    :param col1: the column holding the foreign key
    :param type1: the type of ``col1``
    :param col2: the column targeted by the foreign key
    :param type2: the type of ``col2``
    :exception: FieldException if the sizes differ
    """
    if type1.size != type2.size:
        _raise_string_size_mismatch(
            col1, type1, type1.size, col2, type2, type2.size
        )


@CompareType.add_comparator(String, Color)
def compare_string_to_color(col1, type1, col2, type2):
    """Check that the String ``size`` equals the Color ``max_length``.

    :param col1: the String column holding the foreign key
    :param type1: the type of ``col1``
    :param col2: the Color column targeted by the foreign key
    :param type2: the type of ``col2``
    :exception: FieldException if the sizes differ
    """
    if type1.size != type2.max_length:
        _raise_string_size_mismatch(
            col1, type1, type1.size, col2, type2, type2.max_length
        )