# This file is a part of the AnyBlok project
#
# Copyright (C) 2016 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
# Copyright (C) 2017 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
# Copyright (C) 2018 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
# Copyright (C) 2019 Jean-Sebastien SUZANNE <js.suzanne@gmail.com>
# Copyright (C) 2020 Jean-Sebastien SUZANNE <js.suzanne@gmail.com>
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file,You can
# obtain one at http://mozilla.org/MPL/2.0/.
from base64 import b64encode, b64decode
from .field import Field, FieldException
from .mapper import ModelAttributeAdapter, ModelAttribute
from sqlalchemy.schema import Sequence as SA_Sequence, Column as SA_Column
from sqlalchemy import types, CheckConstraint
from sqlalchemy_utils.types.color import ColorType
from sqlalchemy_utils.types.encrypted.encrypted_type import StringEncryptedType
from sqlalchemy_utils.types.password import PasswordType, Password as SAU_PWD
from sqlalchemy_utils.types.uuid import UUIDType
from sqlalchemy_utils.types.url import URLType
from sqlalchemy_utils.types.phone_number import PhoneNumberType
from sqlalchemy_utils.types.email import EmailType
from sqlalchemy_utils.types.scalar_coercible import ScalarCoercible
from sqlalchemy_utils import JSONType
from datetime import datetime, date, timedelta
from dateutil.parser import parse
from inspect import ismethod
from anyblok.config import Configuration
from .common import sgdb_in
from json import dumps, loads
import time
import pytz
import decimal
from logging import getLogger
from hashlib import md5
# ``pycountry`` is an optional dependency: both names keep the value ``None``
# when the package can not be imported.
pycountry = None
python_pycountry_type = None
try:
    import pycountry

    # Load the countries database explicitly when it is not loaded yet,
    # before ``data_class`` is read from it.
    # NOTE(review): relies on the private ``_is_loaded`` / ``_load`` API of
    # pycountry -- confirm against the supported pycountry versions.
    if not pycountry.countries._is_loaded:
        pycountry.countries._load()

    python_pycountry_type = pycountry.countries.data_class
except ImportError:
    pass
logger = getLogger(__name__)
def wrap_default(registry, namespace, default_val):
    """Build the callable used as default for a column declared with a
    ``str`` default

    :param registry: the current registry
    :param namespace: the namespace of the model
    :param default_val: name of an attribute of the model, or a plain value
    :return: callable without argument which gives the default value
    """

    def wrapper():
        """Resolve the default value

        :return: the result of the bound method named ``default_val`` on
            the model when it exists and is not hidden by a column or a
            field, else ``default_val`` itself
        """
        model = registry.get(namespace)
        if not hasattr(model, default_val):
            return default_val

        candidate = getattr(model, default_val)
        if not ismethod(candidate):
            logger.warning(
                "The attribute %r is already declared as a default "
                "value on the Model %r, an instance method with the same "
                "name already exists" % (default_val, namespace))
            return default_val

        if default_val in model.loaded_columns:
            logger.warning(
                "The attribute %r is already declared as a default "
                "value on the Model %r, a column with the same name "
                "already exists" % (default_val, namespace))
            return default_val

        if default_val in model.loaded_fields:
            logger.warning(
                "The attribute %r is already declared as a default "
                "value on the Model %r, a field with the same name "
                "already exists" % (default_val, namespace))
            return default_val

        return candidate()

    return wrapper
class ColumnDefaultValue:
    """Wrap a factory which builds the default callable of a column

    :param callable: function called with ``(registry, namespace,
        fieldname, properties)``
    """

    def __init__(self, callable):
        self.callable = callable

    def get_default_callable(self, registry, namespace, fieldname, properties):
        """Call the wrapped factory and give its result

        :param registry: the current registry
        :param namespace: the namespace of the model
        :param fieldname: the fieldname of the model
        :param properties: the properties of the model
        :return: what the wrapped factory returns
        """
        factory = self.callable
        return factory(registry, namespace, fieldname, properties)
class CompareType:
    """Registry of the functions which validate that two columns can be
    linked by a foreign key"""

    # shared list of ``(class1, class2, function)`` filled by
    # ``add_comparator``
    comparators = []

    @classmethod
    def default_comparator(cls, col1, type1, col2, type2):
        """Refuse two columns whose types are not of the same class

        :param col1: description of the first column
        :param type1: type of the first column
        :param col2: description of the second column
        :param type2: type of the second column
        :exception: FieldException
        """
        if type1.__class__ is type2.__class__:
            return

        message = (
            "You can't add a foreign key using columns with different "
            "types {model1!s}.{col1!s}` pointing to `{model2!s}.{col2!s}` "
            "have different types {type1!r} -> {type2!r}"
        ).format(
            model1=col1.model_name,
            col1=col1.attribute_name,
            model2=col2.model_name,
            col2=col2.attribute_name,
            type1=type1.__class__,
            type2=type2.__class__
        )
        raise FieldException(message)

    @classmethod
    def add_comparator(cls, type1, type2):
        """Decorator which registers a function for a couple of classes

        :param type1: class of the first type
        :param type2: class of the second type
        :return: the decorator, it gives back the function unchanged
        """

        def wrapper(funct):
            entry = (type1, type2, funct)
            cls.comparators.append(entry)
            return funct

        return wrapper

    @classmethod
    def validate(cls, col1, type1, col2, type2):
        """Call the first registered function matching the classes of both
        types, or ``default_comparator`` when none matches

        :param col1: description of the first column
        :param type1: type of the first column
        :param col2: description of the second column
        :param type2: type of the second column
        """
        for expected1, expected2, comparator in cls.comparators:
            matched = (type1.__class__ is expected1
                       and type2.__class__ is expected2)
            if matched:
                comparator(col1, type1, col2, type2)
                return

        cls.default_comparator(col1, type1, col2, type2)
class NoDefaultValue:
    """Marker class used when no default value is given to a column"""
class Column(Field):
    """Column class

    This class can't be instantiated
    """

    use_hybrid_property = True
    foreign_key = None
    sqlalchemy_type = None
    type = None

    def __init__(self, *args, **kwargs):
        """Initialize the column

        The keys ``type_``, ``foreign_key``, ``sequence``,
        ``db_column_name``, ``default`` and ``encrypt_key`` are removed from
        the keyword arguments before they are given to the parent class.

        :param label: label of this field
        :type label: str
        """
        self.forbid_instance(Column)
        assert self.sqlalchemy_type

        # the type is defined by the class, a given ``type_`` is dropped
        kwargs.pop('type_', None)

        if 'foreign_key' in kwargs:
            self.foreign_key = ModelAttributeAdapter(kwargs.pop('foreign_key'))

        self.sequence = None
        if 'sequence' in kwargs:
            self.sequence = SA_Sequence(kwargs.pop('sequence'))

        self.db_column_name = kwargs.pop('db_column_name', None)
        self.default_val = kwargs.pop('default', NoDefaultValue)
        self.encrypt_key = kwargs.pop('encrypt_key', None)
        super(Column, self).__init__(*args, **kwargs)

    def autodoc_get_properties(self):
        """Return properties list for autodoc

        :return: autodoc properties
        """
        properties = super(Column, self).autodoc_get_properties()
        properties['foreign_key'] = self.foreign_key
        properties['DB column name'] = self.db_column_name
        properties['default'] = self.default_val
        properties['is crypted'] = bool(self.encrypt_key)
        return properties

    autodoc_omit_property_values = Field.autodoc_omit_property_values.union((
        ('foreign_key', None),
        ('DB column name', None),
        ('default', None),
        ('is crypted', False),
    ))

    def native_type(self, registry):
        """Return the native SqlAlchemy type

        :param registry:
        :rtype: sqlalchemy native type
        """
        return self.sqlalchemy_type

    def get_sqlalchemy_mapping(self, registry, namespace, fieldname,
                               properties):
        """Return the instance of the real field

        :param registry: current registry
        :param namespace: name of the model
        :param fieldname: name of the field
        :param properties: known properties of the model
        :rtype: sqlalchemy column instance
        """
        self.format_label(fieldname)

        kwargs = self.kwargs.copy()
        kwargs.setdefault('info', {})
        args = self.format_foreign_key(
            registry, namespace, fieldname, self.args, kwargs)
        kwargs['info']['label'] = self.label

        if self.sequence:
            args = (self.sequence,) + args

        # the name of the field is the name of the column in the table,
        # except if another name was given
        db_column_name = self.db_column_name or fieldname

        default = self.default_val
        if default is not NoDefaultValue:
            if isinstance(default, str):
                default = wrap_default(registry, namespace, default)
            elif isinstance(default, ColumnDefaultValue):
                default = default.get_default_callable(
                    registry, namespace, fieldname, properties)

            kwargs['default'] = default

        sqlalchemy_type = self.native_type(registry)
        if self.encrypt_key:
            sqlalchemy_type = self.get_encrypt_key_type(
                registry, sqlalchemy_type,
                self.format_encrypt_key(registry, namespace))

        return SA_Column(db_column_name, sqlalchemy_type, *args, **kwargs)

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Wrap the type in a ``StringEncryptedType``

        :param registry: the current registry
        :param sqlalchemy_type: the type to encrypt
        :param encrypt_key: the key used to encrypt
        :return: the encrypted type
        """
        encrypted = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ['MySQL', 'MariaDB']):
            # a sized string is used as implementation for these databases
            encrypted.impl = types.String(64)

        return encrypted

    def must_be_declared_as_attr(self):
        """Return True if the column have a foreign key to a remote column """
        return self.foreign_key is not None
class Integer(Column):
    """Integer column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Integer


        @Declarations.register(Declarations.Model)
        class Test:

            x = Integer(default=1)

    """

    sqlalchemy_type = types.Integer

    def __init__(self, *args, **kwargs):
        """Initialize the column, a primary key is auto incremented except
        if ``autoincrement`` is given explicitly
        """
        super(Integer, self).__init__(*args, **kwargs)
        if self.kwargs.get('primary_key') is True:
            self.kwargs.setdefault('autoincrement', True)
class BigInteger(Column):
    """Big integer column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import BigInteger


        @Declarations.register(Declarations.Model)
        class Test:

            x = BigInteger(default=1)

    """
    # SQLAlchemy type used to build the column
    sqlalchemy_type = types.BigInteger
class Boolean(Column):
    """Boolean column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Boolean


        @Declarations.register(Declarations.Model)
        class Test:

            x = Boolean(default=True)

    """
    # SQLAlchemy type used to build the column
    sqlalchemy_type = types.Boolean
class Float(Column):
    """Float column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Float


        @Declarations.register(Declarations.Model)
        class Test:

            x = Float(default=1.0)

    """
    # SQLAlchemy type used to build the column
    sqlalchemy_type = types.Float
"""
Add *process_result_value* to the class *DECIMAL*, because
this method is required to encrypt the column
"""
# Monkeypatch of the SQLAlchemy class: the added method gives the value back
# unchanged.
types.DECIMAL.process_result_value = lambda self, value, dialect: value
class Decimal(Column):
    """Decimal column

    ::

        from decimal import Decimal as D
        from anyblok.declarations import Declarations
        from anyblok.column import Decimal


        @Declarations.register(Declarations.Model)
        class Test:

            x = Decimal(default=D('1.1'))

    """

    sqlalchemy_type = types.DECIMAL

    def getter_format_value(self, value):
        """Return the value read on the column

        :param value: the stored value
        :return: ``None`` for ``None``, a ``decimal.Decimal`` built from
            the value when the column is encrypted, else the value
        """
        if value is None:
            return None  # pragma: no cover

        return decimal.Decimal(value) if self.encrypt_key else value
class Date(Column):
    """Date column

    ::

        from datetime import date
        from anyblok.declarations import Declarations
        from anyblok.column import Date


        @Declarations.register(Declarations.Model)
        class Test:

            x = Date(default=date.today())

    """
    # SQLAlchemy type used to build the column
    sqlalchemy_type = types.Date
def convert_string_to_datetime(value):
    """Convert a given value to datetime

    :param value: ``None``, a ``datetime``, a ``date`` or a ``str``
    :return: ``None`` for ``None``, the value itself for a ``datetime``, the
        ``date`` at midnight for a ``date``, the parsed value for a ``str``
    :exception: FieldException if the type of the value is not supported
    """
    if value is None:
        return None

    # ``datetime`` is tested first because it is a subclass of ``date``
    if isinstance(value, datetime):
        return value

    if isinstance(value, date):
        return datetime.combine(value, datetime.min.time())

    if isinstance(value, str):
        return parse(value)

    # the tuple protects the formatting when the value is itself a tuple
    raise FieldException(
        "We can't convert this value %s to datetime" % (value,))
def add_timezone_on_datetime(dt, default_timezone):
    """Give a timezone to a datetime which has none

    :param dt: the datetime, may be ``None``
    :param default_timezone: object whose ``localize`` method is called
        with a datetime without timezone
    :return: ``dt`` unchanged if it is ``None`` or already has a timezone,
        else the result of ``default_timezone.localize(dt)``
    """
    if dt is None or dt.tzinfo is not None:
        return dt

    return default_timezone.localize(dt)
class DateTimeType(types.TypeDecorator):
    """Type of the column DateTime: the bound values are converted to
    datetime with a timezone"""

    impl = types.DateTime(timezone=True)

    def __init__(self, field):
        """
        :param field: the column which uses this type, its
            ``default_timezone`` and ``encrypt_key`` are read
        """
        self.default_timezone = field.default_timezone
        self.field = field

    def process_bind_param(self, value, engine):
        """Convert the value before it is sent to the database

        :param value: ``None``, ``datetime``, ``date`` or ``str``
        :param engine: not used
        :return: ``None`` for ``None``, the ISO 8601 string of the datetime
            when the field is encrypted, else the datetime with a timezone
        """
        value = convert_string_to_datetime(value)
        value = add_timezone_on_datetime(value, self.default_timezone)
        # ``None`` has no ``isoformat``: an empty encrypted value is given
        # as it is
        if value is not None and self.field.encrypt_key:
            return value.isoformat()

        return value

    def process_result_value(self, value, dialect):
        """Convert the value read from the database

        :param value: the stored value
        :param dialect: not used
        :return: the value converted to datetime when the field is
            encrypted, else the value
        """
        if self.field.encrypt_key:
            return convert_string_to_datetime(value)

        return value

    @property
    def python_type(self):
        return datetime  # pragma: no cover
class DateTime(Column):
    """DateTime column

    ::

        from datetime import datetime
        from anyblok.declarations import Declarations
        from anyblok.column import DateTime


        @Declarations.register(Declarations.Model)
        class Test:

            x = DateTime(default=datetime.now)

    """

    def __init__(self, *args, **kwargs):
        """Initialize the column

        :param auto_update: stored on the column, ``False`` by default
        :param default_timezone: timezone, or name of the timezone, given
            to the datetimes which have none
        """
        self.auto_update = kwargs.pop('auto_update', False)
        default_timezone = kwargs.pop(
            'default_timezone',
            Configuration.get('default_timezone')
        )
        # neither the argument nor the configuration gives a timezone:
        # use the first name of ``time.tzname``
        if not default_timezone:
            default_timezone = time.tzname[0]

        if isinstance(default_timezone, str):
            default_timezone = pytz.timezone(default_timezone)

        self.default_timezone = default_timezone
        self.sqlalchemy_type = DateTimeType(self)
        super(DateTime, self).__init__(*args, **kwargs)

    def getter_format_value(self, value):
        """Return the value as a datetime with a timezone

        :param value: ``None``, ``datetime``, ``date`` or ``str``
        :return: ``None`` or datetime
        """
        value = convert_string_to_datetime(value)
        return add_timezone_on_datetime(value, self.default_timezone)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        # ``super(DateTime, self)``, as in the other columns, keeps the
        # properties added by ``Column`` (foreign key, default, ...)
        res = super(DateTime, self).autodoc_get_properties()
        res['is auto updated'] = self.auto_update
        if self.default_timezone:
            res['default timezone'] = self.default_timezone

        return res
class TimeStamp(DateTime):
    """TimeStamp column

    ::

        from datetime import datetime
        from anyblok.declarations import Declarations
        from anyblok.column import TimeStamp


        @Declarations.register(Declarations.Model)
        class Test:

            x = TimeStamp(default=datetime.now)

    """

    def __init__(self, *args, **kwargs):
        """Initialize the column as a DateTime, then replace its type by a
        ``TIMESTAMP`` with timezone
        """
        super(TimeStamp, self).__init__(*args, **kwargs)
        self.sqlalchemy_type = types.TIMESTAMP(timezone=True)

    def getter_format_value(self, value):
        """Return the value as a datetime with a timezone

        :param value: ``None``, ``datetime``, ``date`` or ``str``
        :return: ``None`` or datetime
        """
        converted = convert_string_to_datetime(value)
        return add_timezone_on_datetime(converted, self.default_timezone)
class Time(Column):
    """Time column

    ::

        from datetime import time
        from anyblok.declarations import Declarations
        from anyblok.column import Time


        @Declarations.register(Declarations.Model)
        class Test:

            x = Time(default=time())

    """
    # SQLAlchemy type used to build the column
    sqlalchemy_type = types.Time
class Interval(Column):
    """Datetime interval column

    ::

        from datetime import timedelta
        from anyblok.declarations import Declarations
        from anyblok.column import Interval


        @Declarations.register(Declarations.Model)
        class Test:

            x = Interval(default=timedelta(days=5))

    """

    sqlalchemy_type = types.Interval

    def native_type(self, registry):
        """Return the native SqlAlchemy type

        :param registry: not used
        :return: ``VARCHAR(1024)`` when the column is encrypted, because
            the interval is then stored as a JSON string, else the
            interval type
        """
        if self.encrypt_key:
            return types.VARCHAR(1024)

        return self.sqlalchemy_type

    def setter_format_value(self, value):
        """Format the value before it is stored

        :param value: ``timedelta`` or ``None``
        :return: JSON string of the days, seconds and microseconds when the
            column is encrypted and the value is not ``None``, else the
            value
        """
        # ``None`` has no days / seconds / microseconds to serialize
        if self.encrypt_key and value is not None:
            value = dumps({
                x: getattr(value, x)
                for x in ['days', 'seconds', 'microseconds']})

        return value

    def getter_format_value(self, value):
        """Format the value read on the column

        :param value: the stored value
        :return: ``timedelta`` built from the JSON string when the column
            is encrypted and the value is not ``None``, else the value
        """
        # ``None`` can not be parsed by ``loads``
        if self.encrypt_key and value is not None:
            value = timedelta(**loads(value))

        return value
class StringType(types.TypeDecorator):
    """Type of the column String, ``False`` is stored as an empty string"""

    impl = types.String

    def process_bind_param(self, value, engine):
        """Return an empty string for ``False``, else the value unchanged"""
        return '' if value is False else value

    def process_result_value(self, value, dialect):
        """Return the value unchanged"""
        return value
class String(Column):
    """String column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import String


        @Declarations.register(Declarations.Model)
        class Test:

            x = String(default='test')

    """

    def __init__(self, *args, **kwargs):
        """Initialize the column

        :param size: size of the string, 64 by default
        """
        kwargs.pop('type_', None)
        size = kwargs.pop('size', 64)
        self.size = size
        self.sqlalchemy_type = StringType(size)
        super(String, self).__init__(*args, **kwargs)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        properties = super(String, self).autodoc_get_properties()
        properties['size'] = self.size
        return properties

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Wrap the type in a ``StringEncryptedType``

        :param registry: the current registry
        :param sqlalchemy_type: the type to encrypt
        :param encrypt_key: the key used to encrypt
        :return: the encrypted type
        """
        encrypted = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ['MySQL', 'MariaDB']):
            # sized string of at least 64 characters for these databases
            length = max(self.size, 64)
            encrypted.impl = types.String(length)

        return encrypted
class Enum(Column):
    """Enum column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Enum
        import enum


        class MyEnumClass(enum.Enum):
            one = 1
            two = 2
            three = 3


        @Declarations.register(Declarations.Model)
        class Test:

            x = Enum(enum_cls=MyEnumClass, default='test')

    enum_cls should be an enum class
    """

    def __init__(self, *args, **kwargs):
        """Initialize the column

        :param enum_cls: required, given to the SQLAlchemy ``Enum`` type
        """
        enum_cls = kwargs.pop('enum_cls')
        self.enum_cls = enum_cls
        self.sqlalchemy_type = types.Enum(enum_cls)
        super(Enum, self).__init__(*args, **kwargs)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        properties = super(Enum, self).autodoc_get_properties()
        properties['enum_cls'] = repr(self.enum_cls)
        return properties
class MsSQLPasswordType(PasswordType):
    """Password type whose implementation is a ``VARCHAR``, it is used by
    the column Password for MsSQL"""

    impl = types.VARCHAR(1024)

    def load_dialect_impl(self, dialect):
        """Return the ``VARCHAR`` descriptor of the dialect, sized with the
        length of the type"""
        varchar = types.VARCHAR(self.length)
        return dialect.type_descriptor(varchar)
class Password(Column):
    """Password column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Password


        @Declarations.register(Declarations.Model)
        class Test:

            x = Password(crypt_context={'schemes': ['md5_crypt']})

        =========================================

        test = Test.insert()
        test.x = 'mypassword'

        test.x
        ==> Password object with encrypt value, the value can not be read

        test.x == 'mypassword'
        ==> True

    .. warning::

        the column type Password can not be querying::

            Test.query().filter(Test.x == 'mypassword').count()
            ==> 0

    """

    def __init__(self, *args, **kwargs):
        """Initialize the column

        :param size: max length of the stored value, 64 by default
        :param crypt_context: dict of keyword arguments given to the
            password type
        :exception: FieldException if a foreign key is given
        """
        size = kwargs.pop('size', 64)
        crypt_context = kwargs.pop('crypt_context', {})
        kwargs.pop('type_', None)
        self.size = size
        self.crypt_context = crypt_context

        if 'foreign_key' in kwargs:
            raise FieldException("Column Password can not have a foreign key")

        self.sqlalchemy_type = PasswordType(max_length=size, **crypt_context)
        super(Password, self).__init__(*args, **kwargs)

    def native_type(self, registry):
        """ Return the native SqlAlchemy type """
        if not sgdb_in(registry.engine, ['MsSQL']):
            return self.sqlalchemy_type

        return MsSQLPasswordType(max_length=self.size, **self.crypt_context)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        properties = super(Password, self).autodoc_get_properties()
        properties['size'] = self.size
        properties['Crypt context'] = self.crypt_context
        return properties
class TextType(types.TypeDecorator):
    """Type of the column Text, ``False`` is stored as an empty string"""

    impl = types.Text

    def process_bind_param(self, value, engine):
        """Return an empty string for ``False``, else the value unchanged"""
        return '' if value is False else value

    def process_result_value(self, value, dialect):
        """Return the value unchanged"""
        return value
class Text(Column):
    """Text column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Text


        @Declarations.register(Declarations.Model)
        class Test:

            x = Text(default='test')

    """

    sqlalchemy_type = TextType

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Wrap the type in a ``StringEncryptedType``

        :param registry: the current registry
        :param sqlalchemy_type: the type to encrypt
        :param encrypt_key: the key used to encrypt
        :return: the encrypted type
        """
        encrypted = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ['MySQL', 'MariaDB']):
            # text implementation for these databases
            encrypted.impl = types.Text()

        return encrypted
class StrSelection(str):
    """Class representing the data of one column Selection """

    # JSON string of a dict ``{key: label}`` or of the name of a method of
    # the model
    selections = dumps({})
    registry = None
    namespace = None

    def get_selections(self):
        """Return a dict of selections

        :return: the decoded dict, or the dict built from the result of
            the method of the model when the selections are a name
        """
        decoded = loads(self.selections)
        if isinstance(decoded, dict):
            return decoded

        if isinstance(decoded, str):
            model = self.registry.get(self.namespace)
            method = getattr(model, decoded)
            return dict(method())

    def validate(self):
        """Validate if the key is in the selections

        :return: True or False
        """
        key = str.__str__(self)
        return key in self.get_selections().keys()

    @property
    def label(self):
        """Return the label corresponding to the selection key

        :return: the label
        """
        key = str.__str__(self)
        available = self.get_selections()
        return available[key]
class SelectionType(types.TypeDecorator):
    """Generic type for Column Selection """

    impl = types.String

    def __init__(self, selections, size, registry=None, namespace=None):
        """
        :param selections: dict ``{key: label}``, list or tuple of
            ``(key, label)``, or name of a method of the model
        :param size: size of the string column, also the max length of
            the keys
        :param registry: the current registry
        :param namespace: the namespace of the model
        :exception: FieldException if the selections or a key have a wrong
            type, Exception if a key is longer than ``size``
        """
        super(SelectionType, self).__init__(length=size)
        self.size = size
        if isinstance(selections, (dict, str)):
            self.selections = selections
        elif isinstance(selections, (list, tuple)):
            self.selections = dict(selections)
        else:
            raise FieldException(  # pragma: no cover
                "selection wainting 'dict', get %r" % type(selections))

        if isinstance(self.selections, dict):
            for k in self.selections.keys():
                if not isinstance(k, str):
                    raise FieldException('The key must be a str')

                # the limit is the size of the column, the one reported by
                # the message, not a fixed length
                if len(k) > size:
                    raise Exception(  # pragma: no cover
                        '%r is too long %r, waiting max %s or use size arg' % (
                            k, len(k), size))

        # stored as a JSON string, decoded again by ``StrSelection``
        self.selections = dumps(self.selections)
        self._StrSelection = type('StrSelection', (StrSelection,),
                                  {'selections': self.selections,
                                   'registry': registry,
                                   'namespace': namespace})

    @property
    def python_type(self):
        """The subclass of ``StrSelection`` built for these selections"""
        return self._StrSelection

    def process_bind_param(self, value, engine):
        """Convert the value to the ``python_type``, except ``None``"""
        if value is not None:
            value = self.python_type(value)

        return value

    def process_result_value(self, value, dialect):
        """Return the value unchanged"""
        return value
class Selection(Column):
    """Selection column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Selection


        @Declarations.register(Declarations.Model)
        class Test:
            STATUS = (
                (u'draft', u'Draft'),
                (u'done', u'Done'),
            )

            x = Selection(selections=STATUS, size=64, default=u'draft')

    """

    def __init__(self, *args, **kwargs):
        """Initialize the column

        :param selections: the available entries, empty tuple by default
        :param size: size of the column, 64 by default
        """
        self.selections = kwargs.pop('selections', tuple())
        self.size = kwargs.pop('size', 64)
        # placeholder for the assert of ``Column.__init__``, the real type
        # is built by ``get_sqlalchemy_mapping``
        self.sqlalchemy_type = 'tmp value for assert'
        super(Selection, self).__init__(*args, **kwargs)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        properties = super(Selection, self).autodoc_get_properties()
        properties['selections'] = self.selections
        properties['size'] = self.size
        return properties

    def get_sqlalchemy_mapping(self, registry, namespace, fieldname,
                               properties):
        """Return sqlalchmy mapping

        :param registry: the current registry
        :param namespace: the namespace of the model
        :param fieldname: the fieldname of the model
        :param properties: the properties of the model
        :return: instance of the real field
        """
        self.sqlalchemy_type = SelectionType(
            self.selections, self.size, registry=registry, namespace=namespace)
        return super(Selection, self).get_sqlalchemy_mapping(
            registry, namespace, fieldname, properties)

    def update_description(self, registry, model, res):
        """Update model description

        :param registry: the current registry
        :param model: the namespace of the model
        :param res: the description, the list of the ``(key, label)`` is
            added under the key ``selections``
        """
        super(Selection, self).update_description(registry, model, res)
        selection_type = SelectionType(
            self.selections, self.size, registry=registry, namespace=model)
        entries = selection_type._StrSelection().get_selections()
        res['selections'] = list(entries.items())

    def must_be_copied_before_declaration(self):
        """Return True if selections is an instance of str.

        In the case of the field selection is a mixin, it must be copied or the
        selection method can fail
        """
        return isinstance(self.selections, str)

    def update_properties(self, registry, namespace, fieldname, properties):
        """Update column properties

        :param registry: the current registry
        :param namespace: the namespace of the model
        :param fieldname: the fieldname of the model
        :param properties: the properties of the model
        """
        super(Selection, self).update_properties(registry, namespace,
                                                 fieldname, properties)
        self.fieldname = fieldname
        properties['add_in_table_args'].append(self)

    def update_table_args(self, registry, Model):
        """Return check constraints to limit the value

        :param registry: the current registry
        :param Model: the model, used when the selections are the name of
            a method
        :return: list of checkConstraint
        """
        if self.encrypt_key:
            # dont add constraint because the state is crypted and nobody
            # can add new entry
            return []

        if sgdb_in(registry.engine, ['MariaDB', 'MsSQL']):
            # no check constraint is added for these databases
            return []

        selections = loads(self.sqlalchemy_type.selections)
        if isinstance(selections, dict):
            keys = selections.keys()
        else:
            entries = getattr(Model, selections)()
            if isinstance(entries, (list, tuple)):
                entries = dict(entries)

            keys = entries.keys()

        count = len(keys)
        if count == 0:
            return []

        if count > 1:
            constraint = """"%s" in ('%s')""" % (
                self.fieldname, "', '".join(keys))
        else:
            constraint = """"%s" = '%s'""" % (self.fieldname, list(keys)[0])

        # the name of the constraint depends on the sorted keys
        digest = md5(str(sorted(keys)).encode('utf-8')).hexdigest()
        name = self.fieldname + '_' + digest + '_types'
        return [CheckConstraint(constraint, name=name)]

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Wrap the type in a ``StringEncryptedType``

        :param registry: the current registry
        :param sqlalchemy_type: the type to encrypt
        :param encrypt_key: the key used to encrypt
        :return: the encrypted type
        """
        encrypted = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ['MySQL', 'MariaDB']):
            # sized string of at least 64 characters for these databases
            length = max(self.size, 64)
            encrypted.impl = types.String(length)

        return encrypted
"""
Add *process_result_value* to the class *JSON*, because
this method is required to encrypt the column
"""
# Monkeypatch of the SQLAlchemy class: the added method gives the value back
# unchanged.
types.JSON.process_result_value = lambda self, value, dialect: value
class Json(Column):
    """JSON column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Json


        @Declarations.register(Declarations.Model)
        class Test:

            x = Json()

    """

    sqlalchemy_type = types.JSON(none_as_null=True)

    def native_type(self, registry):
        """ Return the native SqlAlchemy type """
        if not sgdb_in(registry.engine, ['MariaDB', 'MsSQL']):
            return self.sqlalchemy_type

        return JSONType

    def setter_format_value(self, value):
        """Return the JSON string of the value when the column is
        encrypted, else the value"""
        return dumps(value) if self.encrypt_key else value

    def getter_format_value(self, value):
        """Return the decoded JSON string when the column is encrypted and
        the value is not ``None``, else the value"""
        if value is not None and self.encrypt_key:
            return loads(value)

        return value

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Wrap the type in a ``StringEncryptedType``

        :param registry: the current registry
        :param sqlalchemy_type: the type to encrypt
        :param encrypt_key: the key used to encrypt
        :return: the encrypted type
        """
        encrypted = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ['MySQL', 'MariaDB']):
            # text implementation for these databases
            encrypted.impl = types.Text()

        return encrypted
class LargeBinary(Column):
    """Large binary column

    ::

        from os import urandom
        from anyblok.declarations import Declarations
        from anyblok.column import LargeBinary


        blob = urandom(100000)


        @Declarations.register(Declarations.Model)
        class Test:

            x = LargeBinary(default=blob)

    """

    sqlalchemy_type = types.LargeBinary

    def native_type(self, registry):
        """Return the native SqlAlchemy type

        :param registry: not used
        :return: ``Text`` when the column is encrypted, because the binary
            is then stored as a base64 string, else the binary type
        """
        if self.encrypt_key:
            return types.Text

        return self.sqlalchemy_type

    def setter_format_value(self, value):
        """Format the value before it is stored

        :param value: ``bytes`` or ``None``
        :return: base64 string of the value when the column is encrypted
            and the value is not ``None``, else the value
        """
        # ``None`` can not be encoded by ``b64encode``
        if self.encrypt_key and value is not None:
            value = b64encode(value).decode('utf-8')

        return value

    def getter_format_value(self, value):
        """Format the value read on the column

        :param value: the stored value
        :return: ``bytes`` decoded from the base64 string when the column
            is encrypted and the value is not ``None``, else the value
        """
        # ``None`` has no ``encode`` method
        if self.encrypt_key and value is not None:
            value = b64decode(value.encode('utf-8'))

        return value

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Wrap the type in a ``StringEncryptedType``

        :param registry: the current registry
        :param sqlalchemy_type: the type to encrypt
        :param encrypt_key: the key used to encrypt
        :return: the encrypted type
        """
        sqlalchemy_type = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ['MySQL', 'MariaDB']):
            # text implementation for these databases
            sqlalchemy_type.impl = types.Text()

        return sqlalchemy_type
class Sequence(String):
    """Sequence column

    ::

        from anyblok.column import Sequence


        @Declarations.register(Declarations.Model)
        class Test:

            x = Sequence()

    """

    def __init__(self, *args, **kwargs):
        """Initialize the column

        :param code: code of the sequence, built from the namespace and the
            fieldname when it is not given
        :param start: given to the sequence to create, 1 by default
        :param formater: given to the sequence to create
        :exception: FieldException if a foreign key or a default is given
        """
        if 'foreign_key' in kwargs:
            raise FieldException("Sequence column can not define a foreign key"
                                 " %r" % kwargs['foreign_key'])

        if 'default' in kwargs:
            raise FieldException("Sequence column can not define a default "
                                 "value")

        # the default value is always the next value of the sequence
        kwargs['default'] = ColumnDefaultValue(self.wrap_default)
        self.code = kwargs.pop('code', None)
        self.start = kwargs.pop('start', 1)
        self.formater = kwargs.pop('formater', None)
        super(Sequence, self).__init__(*args, **kwargs)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        properties = super(Sequence, self).autodoc_get_properties()
        properties['formater'] = self.formater
        return properties

    def wrap_default(self, registry, namespace, fieldname, properties):
        """Return default wrapper

        The description of the sequence is also added to the list
        ``_need_sequence_to_create_if_not_exist`` of the registry.

        :param registry: the current registry
        :param namespace: the namespace of the model
        :param fieldname: the fieldname of the model
        :param properties: the properties of the model, not used
        :return: callable which gives the next value of the sequence
        """
        pending = getattr(
            registry, '_need_sequence_to_create_if_not_exist', None)
        if pending is None:
            # the attribute is missing or was reset
            registry._need_sequence_to_create_if_not_exist = []

        code = self.code if self.code else "%s=>%s" % (namespace, fieldname)
        description = {
            'code': code, 'formater': self.formater, 'start': self.start}
        registry._need_sequence_to_create_if_not_exist.append(description)

        def default_value():
            """Return next sequence value

            :return: the next value of the sequence identified by its code
            """
            return registry.System.Sequence.nextvalBy(code=code)

        return default_value
class Color(Column):
    """Color column.
    `See colour package on pypi <https://pypi.python.org/pypi/colour/>`_

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Color


        @Declarations.register(Declarations.Model)
        class Test:

            x = Color(default='green')

    """

    def __init__(self, *args, **kwargs):
        """Initialize the column

        :param size: max length given to the color type, 20 by default
        """
        kwargs.pop('type_', None)
        max_length = kwargs.pop('size', 20)
        self.max_length = max_length
        self.sqlalchemy_type = ColorType(max_length)
        super(Color, self).__init__(*args, **kwargs)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        properties = super(Color, self).autodoc_get_properties()
        properties['size'] = self.max_length
        return properties

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Wrap the type in a ``StringEncryptedType``

        :param registry: the current registry
        :param sqlalchemy_type: the type to encrypt
        :param encrypt_key: the key used to encrypt
        :return: the encrypted type
        """
        encrypted = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ['MySQL', 'MariaDB']):
            # sized string of at least 64 characters for these databases
            length = max(self.max_length, 64)
            encrypted.impl = types.String(length)

        return encrypted
class UUID(Column):
    """UUID column

    ::

        from anyblok.column import UUID


        @Declarations.register(Declarations.Model)
        class Test:

            x = UUID()

    """

    def __init__(self, *args, **kwargs):
        """Initialize the column

        :param binary: given to the uuid type if present
        :param native: given to the uuid type if present
        """
        self.binary = None
        self.native = None
        # only the given options are forwarded to the uuid type
        uuid_kwargs = {
            option: kwargs.pop(option)
            for option in ('binary', 'native')
            if option in kwargs
        }
        for option, given in uuid_kwargs.items():
            setattr(self, option, given)

        self.sqlalchemy_type = UUIDType(**uuid_kwargs)
        super(UUID, self).__init__(*args, **kwargs)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        properties = super(UUID, self).autodoc_get_properties()
        properties['binary'] = self.binary
        properties['native'] = self.native
        return properties
class URL(Column):
    """URL column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import URL


        @Declarations.register(Declarations.Model)
        class Test:

            x = URL(default='doc.anyblok.org')

    """

    sqlalchemy_type = URLType

    def setter_format_value(self, value):
        """Return formatted url value

        :param value: the url
        :return: a ``furl`` instance built from the value when it is a
            ``str``, else the value unchanged
        """
        from furl import furl

        # ``None`` is not a ``str``, it is given back as it is
        if isinstance(value, str):
            return furl(value)

        return value
class PhoneNumber(Column):
    """PhoneNumber column

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import PhoneNumber


        @Declarations.register(Declarations.Model)
        class Test:

            x = PhoneNumber(default='+120012301')

    .. note:: ``phonenumbers`` >= **8.9.5** distribution is required

    """

    def __init__(self, region='FR', max_length=20, *args, **kwargs):
        """Initialize the column

        :param region: region given to the phone number type
        :param max_length: max length given to the phone number type
        """
        kwargs.pop('type_', None)
        self.region = region
        self.max_length = max_length
        self.sqlalchemy_type = PhoneNumberType(
            region=region, max_length=max_length)
        super(PhoneNumber, self).__init__(*args, **kwargs)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        properties = super(PhoneNumber, self).autodoc_get_properties()
        properties['region'] = self.region
        properties['max_length'] = self.max_length
        return properties

    def get_encrypt_key_type(self, registry, sqlalchemy_type, encrypt_key):
        """Wrap the type in a ``StringEncryptedType``

        :param registry: the current registry
        :param sqlalchemy_type: the type to encrypt
        :param encrypt_key: the key used to encrypt
        :return: the encrypted type
        """
        encrypted = StringEncryptedType(sqlalchemy_type, encrypt_key)
        if sgdb_in(registry.engine, ['MySQL', 'MariaDB']):
            # sized string of at least 64 characters for these databases
            length = max(self.max_length, 64)
            encrypted.impl = types.String(length)

        return encrypted
"""
Add *process_result_value* to the class *EmailType*, because
this method is required to encrypt the column
"""
# Monkeypatch of the sqlalchemy_utils class: the added method gives the value
# back unchanged.
EmailType.process_result_value = lambda self, value, dialect: value
class Email(Column):
    """Email column

    ::

        from anyblok.column import Email


        @Declarations.register(Declarations.Model)
        class Test:

            x = Email()

    """
    # sqlalchemy_utils type used to build the column
    sqlalchemy_type = EmailType
class CountryType(types.TypeDecorator, ScalarCoercible):
    """Generic type for Column Country """

    impl = types.Unicode(3)
    python_type = python_pycountry_type

    def process_bind_param(self, value, dialect):
        """Return the ``alpha_3`` code of a country object, else the value
        unchanged"""
        is_country = value and isinstance(value, self.python_type)
        return value.alpha_3 if is_country else value

    def process_result_value(self, value, dialect):
        """Return the coerced value read from the database"""
        return self._coerce(value)

    def _coerce(self, value):
        """Return the country whose ``alpha_3`` code is the value, ``None``
        and the country objects are given back unchanged"""
        if value is None or isinstance(value, self.python_type):
            return value  # pragma: no cover

        return pycountry.countries.get(alpha_3=value)
class Country(Column):
    """Country column.

    ::

        from anyblok.declarations import Declarations
        from anyblok.column import Country
        from pycountry import countries


        @Declarations.register(Declarations.Model)
        class Test:

            x = Country(default=countries.get(alpha_2='FR'))

    """

    sqlalchemy_type = CountryType

    def __init__(self, mode='alpha_2', *args, **kwargs):
        """Initialize the column

        :param mode: attribute of the countries used as key of the choices
        :exception: FieldException if ``pycountry`` is not installed
        """
        self.mode = mode
        if pycountry is None:
            raise FieldException(  # pragma: no cover
                "'pycountry' package is required for use 'CountryType'")

        choices = {}
        for country in pycountry.countries:
            choices[getattr(country, mode)] = country.name

        self.choices = choices
        super(Country, self).__init__(*args, **kwargs)

    def autodoc_get_properties(self):
        """Return properties for autodoc

        :return: autodoc properties
        """
        properties = super(Country, self).autodoc_get_properties()
        properties['mode'] = self.mode
        properties['choices'] = self.choices
        return properties

    def update_properties(self, registry, namespace, fieldname, properties):
        """Update column properties

        :param registry: the current registry
        :param namespace: the namespace of the model
        :param fieldname: the fieldname of the model
        :param properties: the properties of the model
        """
        super(Country, self).update_properties(registry, namespace,
                                               fieldname, properties)
        self.fieldname = fieldname
        properties['add_in_table_args'].append(self)

    def update_table_args(self, registry, Model):
        """Return check constraints to limit the value

        :param registry: the current registry
        :param Model: not used
        :return: list of checkConstraint
        """
        if self.encrypt_key:
            # dont add constraint because the state is crypted and nobody
            # can add new entry
            return []

        if sgdb_in(registry.engine, ['MariaDB', 'MsSQL']):
            # no check constraint is added for these databases
            return []

        codes = [country.alpha_3 for country in pycountry.countries]
        # the constraint keeps the order of pycountry, only the name of the
        # constraint depends on the sorted codes
        constraint = """"%s" in ('%s')""" % (self.fieldname, "', '".join(codes))
        digest = md5(str(sorted(codes)).encode('utf-8')).hexdigest()
        name = self.fieldname + '_' + digest + '_types'
        return [CheckConstraint(constraint, name=name)]
@CompareType.add_comparator(String, String)
@CompareType.add_comparator(String, Selection)
@CompareType.add_comparator(String, Sequence)
def compare_strings(col1, type1, col2, type2):
    """Refuse a foreign key between two columns whose sizes are different

    :param col1: description of the first column
    :param type1: the first column, its ``size`` is read
    :param col2: description of the second column
    :param type2: the second column, its ``size`` is read
    :exception: FieldException
    """
    if type1.size == type2.size:
        return

    details = dict(
        model1=col1.model_name,
        col1=col1.attribute_name,
        model2=col2.model_name,
        col2=col2.attribute_name,
        type1=type1.__class__,
        type2=type2.__class__,
        size1=type1.size,
        size2=type2.size
    )
    raise FieldException(
        "You can't add a foreign key using based String columns with "
        "different size `{model1!s}.{col1!s}` pointing to "
        "`{model2!s}.{col2!s}` have different sizes {type1!r}({size1:d}) "
        "-> ({type2!r}){size2:d}".format(**details)
    )
@CompareType.add_comparator(String, Color)
def compare_string_to_color(col1, type1, col2, type2):
    """Refuse a foreign key between a String and a Color whose sizes are
    different

    :param col1: description of the first column
    :param type1: the String column, its ``size`` is read
    :param col2: description of the second column
    :param type2: the Color column, its ``max_length`` is read
    :exception: FieldException
    """
    if type1.size == type2.max_length:
        return

    details = dict(
        model1=col1.model_name,
        col1=col1.attribute_name,
        model2=col2.model_name,
        col2=col2.attribute_name,
        type1=type1.__class__,
        type2=type2.__class__,
        size1=type1.size,
        size2=type2.max_length
    )
    raise FieldException(
        "You can't add a foreign key using based String columns with "
        "different size `{model1!s}.{col1!s}` pointing to "
        "`{model2!s}.{col2!s}` have different sizes {type1!r}({size1:d}) "
        "-> ({type2!r}){size2:d}".format(**details)
    )