Source code for PrognosAIs.Pipeline

import argparse
import copy
import os
import shutil
import sys
import warnings

import PrognosAIs.IO.utils as IO_utils

from PrognosAIs.IO import ConfigLoader
from PrognosAIs.IO import Configs
from PrognosAIs.Model import Evaluators
from PrognosAIs.Model import Trainer
from PrognosAIs.Preprocessing import Preprocessors
from slurmpie import slurmpie


[docs]class Pipeline(object): def __init__( self, config_file: str, preprocess: bool = True, train: bool = True, evaluate: bool = True, samples_folder: str = None, ): self.path = os.path.dirname(os.path.realpath(__file__)) self.config_file = config_file self.config = ConfigLoader.ConfigLoader(config_file) self.input_folder = self.config.get_input_folder() self.output_folder = os.path.join( self.config.get_output_folder(), self.config.get_specific_output_folder() ) if os.path.exists(self.output_folder): temp_i = 0 original_folder = copy.deepcopy(self.output_folder) while os.path.exists(self.output_folder): self.output_folder = original_folder + "_" + str(temp_i) temp_i += 1 IO_utils.create_directory(self.output_folder) self.config_file = self.config.copy_config(self.output_folder) # Paths to the python scripts we need self.preprocessor = os.path.join(self.path, "Preprocessing", "Preprocessors.py") self.trainer = os.path.join(self.path, "Model", "Trainer.py") self.evaluator = os.path.join(self.path, "Model", "Evaluators.py") self.preprocess = preprocess self.train = train self.evaluate = evaluate self.preprocessing_config = self.config.get_preprocessings_settings() if "saving" in self.preprocessing_config: saving_config = Configs.saving_config(self.preprocessing_config["saving"]) print(saving_config) else: saving_config = Configs.saving_config(None) if samples_folder is None: self.samples_folder = os.path.join(self.output_folder, saving_config.out_dir_name) else: self.samples_folder = samples_folder
[docs] def start_local_pipeline(self): if self.preprocess: preprocessor = Preprocessors.BatchPreprocessor( self.input_folder, self.output_folder, self.preprocessing_config ) self.samples_folder = preprocessor.start() if self.train: trainer = Trainer.Trainer(self.config, self.samples_folder, self.output_folder) self.model_file = trainer.train_model() if self.train and self.evaluate: evaluator = Evaluators.Evaluator( self.model_file, self.samples_folder, self.config_file, self.output_folder, ) evaluator.evaluate() elif self.evaluate and not self.train: warnings.warn( "Cannot evaluate if no model is trained, not doing evaluation", SyntaxWarning )
[docs] def start_slurm_pipeline( self, preprocess_job: slurmpie.Job, train_job: slurmpie.Job, evaluate_job: slurmpie.Job ): save_name = self.config.get_save_name() self.model_file = os.path.join(self.output_folder, "MODEL", save_name + ".hdf5") preprocessing_command = """python -u {command} --config={config_file} --input={input_dir} --output={output_dir}""".format( command=self.preprocessor, config_file=self.config_file, input_dir=self.input_folder, output_dir=self.output_folder, ) training_command = "srun python {command} --config={config_file} --input={input_dir} --output={output_dir} --savename={savename}".format( command=self.trainer, config_file=self.config_file, input_dir=self.samples_folder, output_dir=self.output_folder, savename=save_name, ) evaluation_command = "python -u {command} --config={config_file} --input={input_dir} --output={output_dir} --model={model_file}".format( command=self.evaluator, config_file=self.config_file, input_dir=self.samples_folder, output_dir=self.output_folder, model_file=self.model_file, ) # Here we make the actual jobs prognosAIs_pipeline = slurmpie.Pipeline() preprocess_job.script_is_file = False if preprocess_job.script is not None: preprocess_job.script += preprocessing_command else: preprocess_job.script = preprocessing_command train_job.script_is_file = False if train_job.script is not None: train_job.script += training_command else: train_job.script = training_command evaluate_job.script_is_file = False if evaluate_job.script is not None: evaluate_job.script += evaluation_command else: evaluate_job.script = evaluation_command if self.preprocess: prognosAIs_pipeline.add(preprocess_job) if self.preprocess and self.train: prognosAIs_pipeline.add({"afterok": [train_job]}, parent_job=preprocess_job) elif self.train: prognosAIs_pipeline.add(train_job) if self.evaluate: if self.train: prognosAIs_pipeline.add({"afterok": [evaluate_job]}, parent_job=train_job) elif self.preprocess: prognosAIs_pipeline.add({"afterok": [evaluate_job]}, parent_job=preprocess_job) else: prognosAIs_pipeline.add(evaluate_job) prognosAIs_pipeline.submit() print("Submitted the PrognosAIs pipeline!")
[docs] @classmethod def init_from_sys_args(cls, args_in): parser = argparse.ArgumentParser(description="Start a PrognosAIs pipeline") parser.add_argument( "-c", "--config", required=True, help="The location of the PrognosAIs config file", metavar="configuration file", dest="config", type=str, ) parser.add_argument( "-p", "--preprocess", required=False, help="Whether the pipeline should include preprocessing", dest="preprocess", action="store_true", ) parser.add_argument( "-t", "--train", required=False, help="Whether the pipeline should include training", dest="train", action="store_true", ) parser.add_argument( "-e", "--evaluate", required=False, help="Whether the pipeline should include evaluation", dest="evaluate", action="store_true", ) args = parser.parse_args(args_in) if not args.preprocess and not args.train and not args.evaluate: args.preprocess = True args.train = True args.evaluate = True pipeline = cls(args.config, args.preprocess, args.train, args.evaluate) return pipeline
if __name__ == "__main__": pipeline = Pipeline.init_from_sys_args(sys.argv[1:]) pipeline.start_local_pipeline()