#!/usr/bin/env python3
# =========================================================================
# Program: iota2
#
# Copyright (c) CESBIO. All rights reserved.
#
# See LICENSE for details.
#
# This software is distributed WITHOUT ANY WARRANTY; without even
# the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
# PURPOSE. See the above copyright notices for more information.
#
# =========================================================================
"""Definition of classes for configuration parameters"""
import warnings
from collections.abc import Generator
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from pydantic import BaseModel, Extra, root_validator
from pydantic.fields import ModelField
import iota2.common.i2_constants as i2_const
I2_CONST = i2_const.Iota2Constants()
@dataclass
class TermColors:
    """ANSI escape sequences used to colour and style terminal output.

    Note
    ----
    None of the attributes below is annotated, so ``@dataclass`` registers
    no instance field: they are plain class-level string constants and are
    meant to be read directly on the class (``TermColors.FAIL``).
    """

    # Foreground colours (ANSI SGR "bright" range).
    HEADER = "\033[95m"  # bright magenta
    OKBLUE = "\033[94m"  # bright blue
    OKCYAN = "\033[96m"  # bright cyan
    OKGREEN = "\033[92m"  # bright green
    WARNING = "\033[93m"  # bright yellow
    FAIL = "\033[91m"  # bright red
    # Text attributes.
    ENDC = "\033[0m"  # reset every colour / attribute
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"
class ConfigError(Exception):
    """Exception raised for errors related to the iota2 configuration file.

    The exception text is the caller's message prefixed with
    ``iota2 configuration file error :`` and wrapped between
    ``TermColors.FAIL`` and ``TermColors.ENDC``. The raw message stays
    available through the ``msg`` attribute.
    """

    def __init__(self, msg: str, blinking: bool = False) -> None:
        """Build the decorated error message.

        Parameters
        ----------
        msg
            description of the configuration problem
        blinking
            if True, the message is prefixed with the ``\\033[5m`` escape
            sequence (ANSI "blink" attribute)
        """
        self.msg = msg
        pref_blink = "\033[5m" if blinking else ""
        super().__init__(
            f"{pref_blink}"
            f"{TermColors.FAIL} "
            f"iota2 configuration file error : {msg}{TermColors.ENDC}"
        )
class ConfigParamWarning(Warning):
    """Warning category for remarks about the iota2 configuration file.

    The warning text is the caller's message, quoted, prefixed with
    ``iota2 configuration file warning :`` and wrapped between
    ``TermColors.OKBLUE`` and ``TermColors.ENDC``. The raw message stays
    available through the ``msg`` attribute.
    """

    def __init__(self, msg: str) -> None:
        """Build the decorated warning message.

        Parameters
        ----------
        msg
            text of the warning
        """
        self.msg = msg
        super().__init__(
            f"{TermColors.OKBLUE} iota2 configuration file warning : "
            f"'{msg}'{TermColors.ENDC}"
        )
class ConfigNotRecognisedParamWarning(Warning):
    """Warning category for configuration parameters unknown to iota2.

    The warning text is the parameter name, quoted, prefixed with
    ``iota2 configuration file warning :``, followed by
    ``parameter not recognised`` and wrapped between ``TermColors.OKBLUE``
    and ``TermColors.ENDC``. The raw name stays available through the
    ``msg`` attribute.
    """

    def __init__(self, msg: str) -> None:
        """Build the decorated warning message.

        Parameters
        ----------
        msg
            name of the parameter which is not recognised
        """
        self.msg = msg
        super().__init__(
            f"{TermColors.OKBLUE} iota2 configuration file warning : "
            f"'{msg}' parameter not recognised{TermColors.ENDC}"
        )
class FileParameter(str):
    """String type for parameters which must point to an existing file.

    Implements the pydantic "custom data type" protocol, see
    https://field-idempotency--pydantic-docs.netlify.app/usage/types/#custom-data-types
    """

    @classmethod
    def __get_validators__(cls) -> Generator:
        """Yield the validators pydantic applies to fields of this type."""
        yield cls.validate

    @classmethod
    def validate(cls, value: Any, field: ModelField) -> str:
        """Check if the value is a string and if the file exists.

        Parameters
        ----------
        value
            user supplied value of the parameter
        field
            pydantic field being validated, only its ``name`` is used to
            build the error messages

        Raises
        ------
        ConfigError
            if ``value`` is not a string or if it does not exist on disk

        Note
        ----
        Existence is tested with ``Path.exists()``, therefore any existing
        filesystem entry (not only a regular file) is accepted.
        """
        if not isinstance(value, str):
            # both fragments must be f-strings, otherwise the placeholder
            # '{type(value)}' is displayed verbatim to the user
            raise ConfigError(
                f"The parameter '{field.name}' must be a string, "
                f"type detected : {type(value)}"
            )
        if not Path(value).exists():
            raise ConfigError(
                f"The parameter '{field.name}' get the file : '{value}' "
                "which does not exists"
            )
        return value
class PathParameter(str):
    """String type for parameters which must point to an existing path.

    Implements the pydantic "custom data type" protocol, see
    https://field-idempotency--pydantic-docs.netlify.app/usage/types/#custom-data-types
    """

    @classmethod
    def __get_validators__(cls) -> Generator:
        """Yield the validators pydantic applies to fields of this type."""
        yield cls.validate

    @classmethod
    def validate(cls, value: Any, field: ModelField) -> str:
        """Check if the value is a string and if the path exists.

        Parameters
        ----------
        value
            user supplied value of the parameter
        field
            pydantic field being validated, only its ``name`` is used to
            build the error messages

        Raises
        ------
        ConfigError
            if ``value`` is not a string or if it does not exist on disk
        """
        if not isinstance(value, str):
            # both fragments must be f-strings, otherwise the placeholder
            # '{type(value)}' is displayed verbatim to the user
            raise ConfigError(
                f"The parameter '{field.name}' must be a string, "
                f"type detected : {type(value)}"
            )
        if not Path(value).exists():
            raise ConfigError(
                f"The parameter '{field.name}' get the path : '{value}' "
                "which does not exists"
            )
        return value
class Iota2ParamSection(BaseModel):
    """
    Iota2 parameter section base class.

    It ensures:
        - unrecognized fields (print a warning)
        - deactivate non-mandatory parameters (regarding builders)
    """

    class Config:  # pylint: disable=missing-class-docstring, too-few-public-methods
        # Undeclared keys are kept on the model instead of being rejected;
        # ``unrecognized_fields`` below only warns about them.
        extra = Extra.allow

    @root_validator(pre=True)
    @classmethod
    def unrecognized_fields(
        cls, values: dict  # pylint: disable=used-before-assignment
    ) -> dict:
        """
        Check if the user supplied parameter is known by iota2.

        If it is not, a ``ConfigNotRecognisedParamWarning`` is emitted.
        The values are returned untouched.

        Note
        ----
        inspired from
        https://stackoverflow.com/questions/69617489/can-i-get-incoming-extra-fields-from-pydantic
        """
        known_field_names = {
            field.alias for field in cls.__fields__.values() if field.alias != "extra"
        }
        for field_name in values:
            if field_name not in known_field_names:
                warnings.warn(field_name, ConfigNotRecognisedParamWarning)
        return values

    @root_validator(pre=True)
    @classmethod
    def deactivate_fields(cls, values: dict) -> dict:
        """Deactivate non-mandatory parameters (regarding builders).

        A field declaring a non-empty ``mandatory_on_builders`` entry in the
        model schema is flagged as required only if at least one of the
        current builders (the schema default of the builders section) is
        listed in it. Fields without this entry are left untouched.

        Note
        ----
        The ``required`` flag is written on the class-level field
        definitions, it is therefore shared by every instance of the class.
        """
        # the schema does not depend on the loop below: fetch it only once
        properties = cls.schema()["properties"]
        current_builders = properties[I2_CONST.i2_builders_section]["default"]
        for field_name, model_field in cls.__fields__.items():
            mandatory_on_builders = properties[field_name].get(
                "mandatory_on_builders", {}
            )
            if mandatory_on_builders:
                model_field.required = any(
                    builder in mandatory_on_builders for builder in current_builders
                )
        return values

    def dict(self, **kwargs: Any) -> dict:
        """Hide the builders section.

        Accepts the same keyword arguments as ``BaseModel.dict``. The
        builders section is always excluded from the output; an ``exclude``
        value supplied by the caller (set or mapping) is merged with it
        instead of being discarded.
        """
        builders_section = I2_CONST.i2_builders_section
        hidden_fields: set[str] = (
            {builders_section} if builders_section in self.__fields__ else set()
        )
        requested = kwargs.get("exclude")
        if not requested:
            kwargs["exclude"] = hidden_fields
        elif hasattr(requested, "keys"):
            # mapping form of 'exclude': Ellipsis excludes the whole field
            kwargs["exclude"] = {**requested, **dict.fromkeys(hidden_fields, ...)}
        else:
            kwargs["exclude"] = set(requested) | hidden_fields
        out: dict = super().dict(**kwargs)
        return out

    @classmethod
    def add_fields(cls, **field_definitions: Any) -> None:
        """Add fields to an existing BaseModel.

        Each keyword is a field name; its value is either a
        ``(<type>, <default>)`` tuple or a bare default value.

        Tribute to https://github.com/samuelcolvin/pydantic/issues/1937

        Raises
        ------
        ValueError
            if a tuple definition does not contain exactly two items

        Note
        ----
        If the field already exists, it will be overwritten
        """
        new_fields: dict[str, ModelField] = {}
        new_annotations: dict[str, type | None] = {}
        for f_name, f_def in field_definitions.items():
            if isinstance(f_def, tuple):
                try:
                    f_annotation, f_value = f_def
                except ValueError as err:
                    raise ValueError(
                        "field definitions should either be "
                        "a tuple of (<type>, <default>) or just a "
                        "default value, unfortunately this means tuples as "
                        "default values are not allowed"
                    ) from err
            else:
                f_annotation, f_value = None, f_def
            if f_annotation:
                new_annotations[f_name] = f_annotation
            new_fields[f_name] = ModelField.infer(
                name=f_name,
                value=f_value,
                annotation=f_annotation,
                class_validators=None,
                config=cls.__config__,
            )
        # remove before update
        for field_name in new_fields:
            cls.__fields__.pop(field_name, None)
        for annotation_name in new_annotations:
            cls.__annotations__.pop(annotation_name, None)  # pylint: disable=no-member
        cls.__fields__.update(new_fields)
        cls.__annotations__.update(new_annotations)  # pylint: disable=no-member
        if new_fields:
            # The default of the builders section stored in the schema is
            # replaced by the default of the last added field.
            # NOTE(review): this happens whatever the name of the added
            # field is; presumably add_fields is only called with the
            # builders section itself -- confirm against callers.
            *_, last_field = new_fields.values()
            cls.schema()["properties"][I2_CONST.i2_builders_section][
                "default"
            ] = last_field.default