Source code for iota2.configuration_files.sections.cfg_utils

# !/usr/bin/env python3

# =========================================================================
#   Program:   iota2
#
#   Copyright (c) CESBIO. All rights reserved.
#
#   See LICENSE for details.
#
#   This software is distributed WITHOUT ANY WARRANTY; without even
#   the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
#   PURPOSE.  See the above copyright notices for more information.
#
# =========================================================================
"""Definition of classes for configuration parameters"""

import warnings
from collections.abc import Generator
from dataclasses import dataclass
from pathlib import Path
from typing import Any

from pydantic import BaseModel, Extra, root_validator
from pydantic.fields import ModelField

import iota2.common.i2_constants as i2_const

# Module-level constants object; this module reads ``i2_builders_section`` from it.
I2_CONST = i2_const.Iota2Constants()


@dataclass
class TermColors:
    """ANSI escape sequences used to colour and style terminal output.

    The names below carry no type annotation, so they are plain class
    attributes (read them as ``TermColors.FAIL``), not dataclass fields.
    """

    # foreground colours
    HEADER = "\033[95m"
    OKBLUE = "\033[94m"
    OKCYAN = "\033[96m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"

    # text attributes
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"

    # reset: ends any colour / attribute started above
    ENDC = "\033[0m"
class ConfigError(Exception):
    """Exception raised for a problem in the iota2 configuration file."""

    def __init__(self, msg: str, blinking: bool = False) -> None:
        """Build the coloured error text handed to ``Exception``.

        Parameters
        ----------
        msg
            description of the problem; kept unmodified in ``self.msg``
        blinking
            if True, the text is prefixed with the blink escape sequence
        """
        self.msg = msg
        blink_prefix = "\033[5m" if blinking else ""
        full_text = (
            f"{blink_prefix}{TermColors.FAIL} "
            f"iota2 configuration file error : {msg}{TermColors.ENDC}"
        )
        super().__init__(full_text)
class ConfigParamWarning(Warning):
    """Warning category for a remark about the iota2 configuration file."""

    def __init__(self, msg: str) -> None:
        """Build the coloured warning text handed to ``Warning``.

        Parameters
        ----------
        msg
            text of the warning; kept unmodified in ``self.msg`` and
            quoted inside the displayed message
        """
        self.msg = msg
        full_text = (
            f"{TermColors.OKBLUE} iota2 configuration file warning : "
            f"'{msg}'{TermColors.ENDC}"
        )
        super().__init__(full_text)
class ConfigNotRecognisedParamWarning(Warning):
    """Warning category for a parameter name that is not recognised."""

    def __init__(self, msg: str) -> None:
        """Build the coloured warning text handed to ``Warning``.

        Parameters
        ----------
        msg
            the value reported as not recognised (the parameter name);
            kept unmodified in ``self.msg`` and quoted inside the
            displayed message
        """
        self.msg = msg
        full_text = (
            f"{TermColors.OKBLUE} iota2 configuration file warning : "
            f"'{msg}' parameter not recognised{TermColors.ENDC}"
        )
        super().__init__(full_text)
class FileParameter(str):
    """String parameter that must point to an existing file.

    Implemented as a pydantic custom data type, see
    https://field-idempotency--pydantic-docs.netlify.app/usage/types/#custom-data-types
    """

    @classmethod
    def __get_validators__(cls) -> Generator:
        """Yield the validators pydantic must run for this type."""
        yield cls.validate

    @classmethod
    def validate(cls, value: Any, field: ModelField) -> str:
        """Check if the value is a string and if the file exists.

        Parameters
        ----------
        value
            raw value read from the configuration file
        field
            pydantic field being validated; only its name is used, to
            build the error messages

        Returns
        -------
        str
            ``value`` itself, unchanged

        Raises
        ------
        ConfigError
            if ``value`` is not a string or does not exist on disk
        """
        if not isinstance(value, str):
            # both literals must be f-strings, otherwise the detected type
            # is printed as the raw text "{type(value)}"
            raise ConfigError(
                f"The parameter '{field.name}' must be a string, "
                f"type detected : {type(value)}"
            )
        # NOTE: Path.exists() is also true for a directory; no is_file()
        # check is performed here.
        if not Path(value).exists():
            raise ConfigError(
                f"The parameter '{field.name}' get the file : '{value}' "
                "which does not exists"
            )
        return value
class PathParameter(str):
    """String parameter that must point to an existing path.

    Implemented as a pydantic custom data type, see
    https://field-idempotency--pydantic-docs.netlify.app/usage/types/#custom-data-types
    """

    @classmethod
    def __get_validators__(cls) -> Generator:
        """Yield the validators pydantic must run for this type."""
        yield cls.validate

    @classmethod
    def validate(cls, value: Any, field: ModelField) -> str:
        """Check if the value is a string and if the path exists.

        Parameters
        ----------
        value
            raw value read from the configuration file
        field
            pydantic field being validated; only its name is used, to
            build the error messages

        Returns
        -------
        str
            ``value`` itself, unchanged

        Raises
        ------
        ConfigError
            if ``value`` is not a string or does not exist on disk
        """
        if not isinstance(value, str):
            # both literals must be f-strings, otherwise the detected type
            # is printed as the raw text "{type(value)}"
            raise ConfigError(
                f"The parameter '{field.name}' must be a string, "
                f"type detected : {type(value)}"
            )
        if not Path(value).exists():
            raise ConfigError(
                f"The parameter '{field.name}' get the path : '{value}' "
                "which does not exists"
            )
        return value
class Iota2ParamSection(BaseModel):
    """
    Iota2 parameter section base class.

    It ensures:
        - unrecognized fields are reported (a warning is emitted)
        - non-mandatory parameters are deactivated (regarding builders)
    """

    class Config:
        # pylint: disable=missing-class-docstring, too-few-public-methods
        # parameters that are not declared on the model are accepted
        extra = Extra.allow

    @root_validator(pre=True)
    @classmethod
    def unrecognized_fields(
        cls, values: dict  # pylint: disable=used-before-assignment
    ) -> dict:
        """
        Check if the user supplied parameter is known by iota2.

        If it is not, a ``ConfigNotRecognisedParamWarning`` carrying the
        parameter name is emitted. ``values`` is returned unchanged.

        Note
        ----
        inspired from
        https://stackoverflow.com/questions/69617489/can-i-get-incoming-extra-fields-from-pydantic
        """
        known_names = {
            field.alias for field in cls.__fields__.values() if field.alias != "extra"
        }
        for field_name in values:
            if field_name not in known_names:
                warnings.warn(field_name, ConfigNotRecognisedParamWarning)
        return values

    @root_validator(pre=True)
    @classmethod
    def deactivate_fields(cls, values: dict) -> dict:
        """Deactivate non-mandatory parameters (regarding builders).

        The current builders are the schema default of the builders
        section. Every field whose schema entry has a non-empty
        ``mandatory_on_builders`` is flagged as required when at least one
        current builder is listed there, and as not required otherwise.
        Fields without that entry are left untouched.

        The flag is written on the class-level ``__fields__``; ``values``
        is returned unchanged.
        """
        # look the schema up once instead of once per field
        properties = cls.schema()["properties"]
        current_builders = properties[I2_CONST.i2_builders_section]["default"]
        for field_name, model_field in cls.__fields__.items():
            mandatory_on_builders = properties[field_name].get(
                "mandatory_on_builders", {}
            )
            if not mandatory_on_builders:
                continue
            model_field.required = any(
                builder in mandatory_on_builders for builder in current_builders
            )
        return values

    def dict(self, **kwargs: set) -> dict:
        """Hide the builders section.

        All keyword arguments are forwarded to ``BaseModel.dict`` except
        ``exclude``, which is always set to the builders section name (when
        the model declares that field).
        """
        hidden_fields: set[str] = {
            attribute_name
            for attribute_name in self.__fields__
            if attribute_name == I2_CONST.i2_builders_section
        }
        # NOTE(review): a caller-supplied ``exclude`` is replaced, not merged
        kwargs["exclude"] = hidden_fields
        out: dict = super().dict(**kwargs)
        return out

    @classmethod
    def add_fields(cls, **field_definitions: Any) -> None:
        """Add fields to an existing BaseModel.

        Each keyword is a field name; its value is either a
        ``(<type>, <default>)`` tuple or just a default value.

        Tribute to https://github.com/samuelcolvin/pydantic/issues/1937

        Note
        ----
        If the field already exists, it will be overwritten

        Raises
        ------
        Exception
            if a tuple definition cannot be unpacked into exactly
            ``(<type>, <default>)``
        """
        new_fields: dict[str, ModelField] = {}
        new_annotations: dict[str, type | None] = {}

        for f_name, f_def in field_definitions.items():
            if isinstance(f_def, tuple):
                try:
                    f_annotation, f_value = f_def
                except ValueError as err:
                    raise Exception(
                        "field definitions should either be "
                        "a tuple of (<type>, <default>) or just a "
                        "default value, unfortunately this means tuples as "
                        "default values are not allowed"
                    ) from err
            else:
                f_annotation, f_value = None, f_def

            if f_annotation:
                new_annotations[f_name] = f_annotation

            new_fields[f_name] = ModelField.infer(
                name=f_name,
                value=f_value,
                annotation=f_annotation,
                class_validators=None,
                config=cls.__config__,
            )

        # remove before update
        for field_name in new_fields:
            cls.__fields__.pop(field_name, None)
        for annotation_name in new_annotations:
            cls.__annotations__.pop(annotation_name, None)  # pylint: disable=no-member

        cls.__fields__.update(new_fields)
        cls.__annotations__.update(new_annotations)  # pylint: disable=no-member

        # NOTE(review): the builders-section default is overwritten with the
        # default of every added field in turn, so the last one wins. This
        # only persists if ``cls.schema()`` hands back a shared (cached)
        # dict - confirm against the pydantic version in use.
        for field_meta in new_fields.values():
            cls.schema()["properties"][I2_CONST.i2_builders_section][
                "default"
            ] = field_meta.default