# Source code for iota2.task_launcher

#!/usr/bin/env python3
# =========================================================================
#   Program:   iota2
#
#   Copyright (c) CESBIO. All rights reserved.
#
#   See LICENSE for details.
#
#   This software is distributed WITHOUT ANY WARRANTY; without even
#   the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
#   PURPOSE.  See the above copyright notices for more information.
#
# =========================================================================
"""Task launcher module"""
import argparse
import logging
import os
import sys

import dill

# Module-level handle on the logger named "distributed.worker".
LOGGER = logging.getLogger("distributed.worker")


def set_working_dir_parameter(t_kwargs: dict, worker_working_dir: str) -> dict:
    """Point every working-directory argument at the worker's directory.

    Only keys already present in ``t_kwargs`` are overridden; no key is
    ever added. The input dictionary is left untouched: a shallow copy is
    modified and returned.

    Parameters
    ----------
    t_kwargs :
        keyword arguments of the task to launch
    worker_working_dir :
        value substituted for every recognised working-directory argument

    Returns
    -------
    dict
        shallow copy of ``t_kwargs`` with the recognised keys replaced
    """
    # Spellings under which a working directory may be passed to a task.
    working_dir_names = (
        "working_directory",
        "pathWd",
        "working_dir",
        "path_wd",
        "WORKING_DIR",
    )
    new_t_kwargs = t_kwargs.copy()
    for working_dir_name in working_dir_names:
        if working_dir_name in new_t_kwargs:
            new_t_kwargs[working_dir_name] = worker_working_dir
    return new_t_kwargs
def task_launcher(func_pkl: str) -> None:
    """Load a dill-serialized task and run it.

    Useful to carry a function and its keyword arguments across a cluster.

    Parameters
    ----------
    func_pkl :
        path to a file serialized with dill that holds the tuple
        ``(function, function kwargs, logger level)``
    """
    # NOTE: dill, like pickle, can execute arbitrary code while loading;
    # only feed this launcher files coming from a trusted source.
    with open(func_pkl, "rb") as payload_file:
        task, task_kwargs, log_level = dill.load(payload_file)

    logging.getLogger("distributed.worker").setLevel(log_level)

    # When TMPDIR is set, working-directory arguments are redirected to it.
    scratch_dir = os.environ.get("TMPDIR")
    if scratch_dir is None:
        call_kwargs = task_kwargs
    else:
        call_kwargs = set_working_dir_parameter(task_kwargs, scratch_dir)
    task(**call_kwargs)
def main() -> int:
    """Command-line entry point of the task launcher.

    Reads the mandatory ``-dill_file`` option, launches the task stored in
    that file and returns 0.
    """
    cli = argparse.ArgumentParser(description="launch function thanks to kwargs")
    cli.add_argument(
        "-dill_file",
        required=True,
        dest="dill_file",
        help="function file pickled",
    )
    options = cli.parse_args()
    task_launcher(options.dill_file)
    return 0
# Run the launcher when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)