From fb91f203a8461271f295f99bc04cda8ab7f72430 Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Tue, 5 Sep 2023 19:04:16 +0200 Subject: [PATCH 01/19] Tests to skip module (MCCD PSF) if output file exists --- shapepipe/modules/mccd_fit_val_runner.py | 16 ++++++++++++---- shapepipe/modules/ngmix_package/ngmix.py | 2 ++ shapepipe/pipeline/file_handler.py | 6 ++++-- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/shapepipe/modules/mccd_fit_val_runner.py b/shapepipe/modules/mccd_fit_val_runner.py index cb959e31c..f88a526f0 100644 --- a/shapepipe/modules/mccd_fit_val_runner.py +++ b/shapepipe/modules/mccd_fit_val_runner.py @@ -6,6 +6,8 @@ """ +import os + import mccd from shapepipe.modules.mccd_package import shapepipe_auxiliary_mccd as aux_mccd @@ -50,6 +52,16 @@ def mccd_fit_val_runner( if mccd_mode == 'FIT_VALIDATION': + # Fitted model is found in the output directory + mccd_model_path = output_dir + fit_saving_name + file_number_string \ + + '.npy' + + if os.path.exists(mccd_model_path): + w_log.info( + f"output file {mccd_model_path} already exists, skipping" + ) + return None, None + aux_mccd.mccd_fit_pipeline( trainstar_path=trainstar_path, file_number_string=file_number_string, @@ -60,10 +72,6 @@ def mccd_fit_val_runner( w_log=w_log, ) - # Fitted model is found in the output directory - mccd_model_path = output_dir + fit_saving_name + file_number_string \ - + '.npy' - aux_mccd.mccd_validation_pipeline( teststar_path=teststar_path, mccd_model_path=mccd_model_path, diff --git a/shapepipe/modules/ngmix_package/ngmix.py b/shapepipe/modules/ngmix_package/ngmix.py index 7cb614a42..70ad45ae1 100644 --- a/shapepipe/modules/ngmix_package/ngmix.py +++ b/shapepipe/modules/ngmix_package/ngmix.py @@ -809,6 +809,8 @@ def do_ngmix_metacal( psf_obs = Observation(psfs[n_e], jacobian=psf_jacob) + ## MKDEBUG TODO usg sigma_to_... + ## psf_T = psfs_sigma[n_e] * 1.17741 * pixel_scale weight_map = np.copy(weights[n_e]) diff --git a/shapepipe/pipeline/file_handler.py b/shapepipe/pipeline/file_handler.py index b7479f6f3..821da6da0 100644 --- a/shapepipe/pipeline/file_handler.py +++ b/shapepipe/pipeline/file_handler.py @@ -93,7 +93,7 @@ def run_dir(self): @run_dir.setter def run_dir(self, value): - self._run_dir = self.check_dir(value, check_exists=True) + self._run_dir = self.check_dir(value, check_exists=False) @property def _input_dir(self): @@ -188,7 +188,7 @@ def mkdir(cls, dir_name): Directory name with full path """ - cls.check_dir(dir_name, check_exists=True) + cls.check_dir(dir_name, check_exists=False) mkdir(dir_name) @staticmethod @@ -376,6 +376,7 @@ def create_global_run_dirs(self): This method creates the pipeline output directories for a given run. """ + # MKDEBUG: Error occured here self.run_dir = self.setpath(self._output_dir, self._run_name) self._log_dir = self.setpath(self.run_dir, 'logs') self._tmp_dir = self.setpath(self.run_dir, 'tmp') @@ -386,6 +387,7 @@ def create_global_run_dirs(self): self.run_dir, ) + # MKDBUG: Error occured here self.mkdir(self.run_dir) self.mkdir(self._log_dir) self.mkdir(self._tmp_dir) From a93546e5f3f47fe3530746a8da0d54badc5221bd Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Sun, 17 Sep 2023 15:46:27 +0200 Subject: [PATCH 02/19] n_smp for more jobs; N_EPOCH bugs; ngmix checking for existing output --- example/cfis/config_tile_MiViSmVi.ini | 2 +- scripts/sh/job_sp.bash | 4 +-- .../mccd_package/mccd_interpolation_script.py | 3 +- shapepipe/modules/ngmix_package/ngmix.py | 21 ++++-------- shapepipe/modules/ngmix_runner.py | 33 +++++++++++-------- 5 files changed, 31 insertions(+), 32 deletions(-) diff --git a/example/cfis/config_tile_MiViSmVi.ini b/example/cfis/config_tile_MiViSmVi.ini index 6198c324c..7dcbe6c50 100644 --- a/example/cfis/config_tile_MiViSmVi.ini +++ b/example/cfis/config_tile_MiViSmVi.ini @@ -47,7 +47,7 @@ OUTPUT_DIR = $SP_RUN/output [JOB] # Batch size of parallel processing (optional), default is 1, i.e. run all jobs in serial -SMP_BATCH_SIZE = 12 +SMP_BATCH_SIZE = 1 # Timeout value (optional), default is None, i.e. no timeout limit applied TIMEOUT = 96:00:00 diff --git a/scripts/sh/job_sp.bash b/scripts/sh/job_sp.bash index 15cb1428a..3e0169bbd 100755 --- a/scripts/sh/job_sp.bash +++ b/scripts/sh/job_sp.bash @@ -420,7 +420,7 @@ if [[ $do_job != 0 ]]; then ### Star detection, selection, PSF model. setools can exit with an error for CCD with insufficient stars, ### the script should continue STOP=0 - command_sp "shapepipe_run -c $SP_CONFIG/config_tile_Sx_exp_${psf}.ini" "Run shapepipe (tile detection, exp $psf)" + command_cfg_shapepipe "config_tile_Sx_exp_${psf}.ini" "Run shapepipe (tile detection, exp $psf)" $n_smp STOP=1 fi @@ -432,7 +432,7 @@ if [[ $do_job != 0 ]]; then ### PSF model letter: 'P' (psfex) or 'M' (mccd) letter=${psf:0:1} Letter=${letter^} - command_sp "shapepipe_run -c $SP_CONFIG/config_tile_${Letter}iViSmVi.ini" "Run shapepipe (tile PsfInterp=$Letter}: up to ngmix+galsim)" + command_cfg_shapepipe "config_tile_${Letter}iViSmVi.ini" "Run shapepipe (tile PsfInterp=$Letter}: up to ngmix+galsim)" $n_smp fi diff --git a/shapepipe/modules/mccd_package/mccd_interpolation_script.py b/shapepipe/modules/mccd_package/mccd_interpolation_script.py index 40e644e8b..b9d4564df 100644 --- a/shapepipe/modules/mccd_package/mccd_interpolation_script.py +++ b/shapepipe/modules/mccd_package/mccd_interpolation_script.py @@ -394,8 +394,7 @@ def _interpolate_me(self): cat.open() all_id = np.copy(cat.get_data()['NUMBER']) - key_ne = 'N_EPOCH' - if key_ne not in cat.get_data(): + if key_ne not in cat.get_data().dtype.names: raise KeyError( f'Key {key_ne} not found in input galaxy catalogue, needed for' + ' PSF interpolation to multi-epoch data; run previous module' diff --git a/shapepipe/modules/ngmix_package/ngmix.py b/shapepipe/modules/ngmix_package/ngmix.py index 70ad45ae1..aff023384 100644 --- a/shapepipe/modules/ngmix_package/ngmix.py +++ b/shapepipe/modules/ngmix_package/ngmix.py @@ -30,10 +30,8 @@ class Ngmix(object): ---------- input_file_list : list Input files - output_dir : str - Output directory - file_number_string : str - File numbering scheme + output_path : str + Output file path zero_point : float Photometric zero point pixel_scale : float @@ -59,7 +57,7 @@ class Ngmix(object): def __init__( self, input_file_list, - output_dir, + output_path, file_number_string, zero_point, pixel_scale, @@ -82,8 +80,7 @@ def __init__( self._weight_vignet_path = input_file_list[4] self._flag_vignet_path = input_file_list[5] - self._output_dir = output_dir - self._file_number_string = file_number_string + self._output_path = output_path self._zero_point = zero_point self._pixel_scale = pixel_scale @@ -94,8 +91,8 @@ def __init__( self._w_log = w_log - # Initiatlise random generator - seed = int(''.join(re.findall(r'\d+', self._file_number_string))) + # Initiatlise random generator using image ID number + seed = int(''.join(re.findall(r'\d+', self._output_path))) np.random.seed(seed) self._w_log.info(f'Random generator initialisation seed = {seed}') @@ -314,12 +311,8 @@ def save_results(self, output_dict): Dictionary containing the results """ - output_name = ( - f'{self._output_dir}/ngmix{self._file_number_string}.fits' - ) - f = file_io.FITSCatalogue( - output_name, + self._output_path, open_mode=file_io.BaseCatalogue.OpenMode.ReadWrite ) diff --git a/shapepipe/modules/ngmix_runner.py b/shapepipe/modules/ngmix_runner.py index 432008793..9b7187fee 100644 --- a/shapepipe/modules/ngmix_runner.py +++ b/shapepipe/modules/ngmix_runner.py @@ -52,21 +52,28 @@ def ngmix_runner( id_obj_min = config.getint(module_config_sec, 'ID_OBJ_MIN') id_obj_max = config.getint(module_config_sec, 'ID_OBJ_MAX') - # Initialise class instance - ngmix_inst = Ngmix( - input_file_list, - run_dirs['output'], - file_number_string, - zero_point, - pixel_scale, - f_wcs_path, - w_log, - id_obj_min=id_obj_min, - id_obj_max=id_obj_max, + output_path = ( + f"{run_dirs['output'}/ngmix{file_number_string}.fits" ) + if os.path.exists(output_path): + w_log.info( + f"output file {output_path} already exists, skipping" + ) + else: + # Initialise class instance + ngmix_inst = Ngmix( + input_file_list, + output_path, + zero_point, + pixel_scale, + f_wcs_path, + w_log, + id_obj_min=id_obj_min, + id_obj_max=id_obj_max, + ) - # Process ngmix shape measurement and metacalibration - ngmix_inst.process() + # Process ngmix shape measurement and metacalibration + ngmix_inst.process() # No return objects return None, None From 1ed596c2c9f94cb75b2408f8caef261915fc5b80 Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Sun, 24 Sep 2023 18:04:50 +0200 Subject: [PATCH 03/19] numbering scheme with re pattern: copied, not changed --- example/cfis/config_tile_Ng_template.ini | 4 ++- .../mccd_package/mccd_interpolation_script.py | 1 + shapepipe/modules/ngmix_package/ngmix.py | 1 - shapepipe/modules/ngmix_runner.py | 4 ++- shapepipe/pipeline/file_handler.py | 29 ++++++++++++------- 5 files changed, 25 insertions(+), 14 deletions(-) diff --git a/example/cfis/config_tile_Ng_template.ini b/example/cfis/config_tile_Ng_template.ini index f9d733d71..91a8626c4 100644 --- a/example/cfis/config_tile_Ng_template.ini +++ b/example/cfis/config_tile_Ng_template.ini @@ -62,7 +62,9 @@ FILE_PATTERN = sexcat, image_vignet, background_vignet, galaxy_psf, weight_vigne FILE_EXT = .fits, .sqlite, .sqlite, .sqlite, .sqlite, .sqlite # NUMBERING_SCHEME (optional) string with numbering pattern for input files -NUMBERING_SCHEME = -000-000 +#NUMBERING_SCHEME = \d{0}-218-\d{3} +NUMBERING_SCHEME = \d{0}-\d{3}-\d{2}3 +#NUMBERING_SCHEME = -000-000 # Multi-epoch mode: Path to file with single-exposure WCS header information LOG_WCS = $SP_RUN/output/log_exp_headers.sqlite diff --git a/shapepipe/modules/mccd_package/mccd_interpolation_script.py b/shapepipe/modules/mccd_package/mccd_interpolation_script.py index b9d4564df..84ccb02a4 100644 --- a/shapepipe/modules/mccd_package/mccd_interpolation_script.py +++ b/shapepipe/modules/mccd_package/mccd_interpolation_script.py @@ -394,6 +394,7 @@ def _interpolate_me(self): cat.open() all_id = np.copy(cat.get_data()['NUMBER']) + key_ne = 'N_EPOCH' if key_ne not in cat.get_data().dtype.names: raise KeyError( f'Key {key_ne} not found in input galaxy catalogue, needed for' diff --git a/shapepipe/modules/ngmix_package/ngmix.py b/shapepipe/modules/ngmix_package/ngmix.py index aff023384..13ba79e06 100644 --- a/shapepipe/modules/ngmix_package/ngmix.py +++ b/shapepipe/modules/ngmix_package/ngmix.py @@ -58,7 +58,6 @@ def __init__( self, input_file_list, output_path, - file_number_string, zero_point, pixel_scale, f_wcs_path, diff --git a/shapepipe/modules/ngmix_runner.py b/shapepipe/modules/ngmix_runner.py index 9b7187fee..83ef2a670 100644 --- a/shapepipe/modules/ngmix_runner.py +++ b/shapepipe/modules/ngmix_runner.py @@ -6,6 +6,8 @@ """ +import os + from shapepipe.modules.module_decorator import module_runner from shapepipe.modules.ngmix_package.ngmix import Ngmix @@ -53,7 +55,7 @@ def ngmix_runner( id_obj_max = config.getint(module_config_sec, 'ID_OBJ_MAX') output_path = ( - f"{run_dirs['output'}/ngmix{file_number_string}.fits" + f"{run_dirs['output']}/ngmix{file_number_string}.fits" ) if os.path.exists(output_path): w_log.info( diff --git a/shapepipe/pipeline/file_handler.py b/shapepipe/pipeline/file_handler.py index 821da6da0..735fd766f 100644 --- a/shapepipe/pipeline/file_handler.py +++ b/shapepipe/pipeline/file_handler.py @@ -807,16 +807,22 @@ def _generate_re_pattern(match_pattern): if not isinstance(match_pattern, str): TypeError('Match pattern must be a string.') - chars = [char for char in match_pattern if not char.isalnum()] - split_pattern = '|'.join(chars).replace('.', r'\.') - chars = [f'\\{char}' for char in chars] + [''] - num_length = [ - f'\\d{{{len(digits)}}}' - for digits in re.split(split_pattern, match_pattern) - ] - re_pattern = r''.join( - [a for b in zip(num_length, chars) for a in b] - ).replace('{1}', '+') + if re.search("\\\\", match_pattern) is not None: + # Regular expression on input: Use pattern + re_pattern = match_pattern + else: + # Generate pattern from input + chars = [char for char in match_pattern if not char.isalnum()] + split_pattern = '|'.join(chars).replace('.', r'\.') + chars = [f'\\{char}' for char in chars] + [''] + digit_list = re.split(split_pattern, match_pattern) + num_length = [ + f'\\d{{{len(digits)}}}' + for digits in re.split(split_pattern, match_pattern) + ] + re_pattern = r''.join( + [a for b in zip(num_length, chars) for a in b] + ).replace('{1}', '+') return re.compile(re_pattern) @@ -972,11 +978,12 @@ def _save_num_patterns( if not found_match: raise RuntimeError( - f'Could not match numbering scheme "{self._numbering_scheme}" ' + f'Could not match numbering scheme "{re_pattern}" ' + f'to any of the input files matching "{pattern}" and ' + f'"{ext}" in the directories {dir_list}.' ) + elem = shared.check_duplicate(final_file_list) if elem != '': raise RuntimeError( From ba4aa19d5d71d3bb8f7f72df247da3a5a420fea9 Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Sun, 24 Sep 2023 18:05:19 +0200 Subject: [PATCH 04/19] numbering scheme with re pattern: copied, not changed --- example/cfis/config_tile_Ng_template.ini | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/example/cfis/config_tile_Ng_template.ini b/example/cfis/config_tile_Ng_template.ini index 91a8626c4..1dd3ca8e3 100644 --- a/example/cfis/config_tile_Ng_template.ini +++ b/example/cfis/config_tile_Ng_template.ini @@ -62,9 +62,8 @@ FILE_PATTERN = sexcat, image_vignet, background_vignet, galaxy_psf, weight_vigne FILE_EXT = .fits, .sqlite, .sqlite, .sqlite, .sqlite, .sqlite # NUMBERING_SCHEME (optional) string with numbering pattern for input files -#NUMBERING_SCHEME = \d{0}-218-\d{3} -NUMBERING_SCHEME = \d{0}-\d{3}-\d{2}3 -#NUMBERING_SCHEME = -000-000 +#NUMBERING_SCHEME = \d{0}-\d{3}-\d{2}3 +NUMBERING_SCHEME = -000-000 # Multi-epoch mode: Path to file with single-exposure WCS header information LOG_WCS = $SP_RUN/output/log_exp_headers.sqlite From 776943ccfb2cfdbadcd6299b9e937c7596e64d42 Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Sun, 24 Sep 2023 18:42:22 +0200 Subject: [PATCH 05/19] removed galsim from job script message --- scripts/sh/job_sp.bash | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/sh/job_sp.bash b/scripts/sh/job_sp.bash index 3e0169bbd..89692703e 100755 --- a/scripts/sh/job_sp.bash +++ b/scripts/sh/job_sp.bash @@ -464,7 +464,7 @@ if [[ $do_job != 0 ]]; then ### Shapes, run $nsh_jobs parallel processes VERBOSE=0 for k in $(seq 1 $nsh_jobs); do - command_sp "shapepipe_run -c $SP_CONFIG_MOD/config_tile_Ng${k}u.ini" "Run shapepipe (tile: ngmix+galsim $k)" & + command_sp "shapepipe_run -c $SP_CONFIG_MOD/config_tile_Ng${k}u.ini" "Run shapepipe (tile: ngmix $k)" & done wait VERBOSE=1 From 3216eb635d7d95b0f9841c8bdda7aadaf5e4ecf4 Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Mon, 16 Oct 2023 15:15:48 +0200 Subject: [PATCH 06/19] Fixed import typo --- example/cfis/config_tile_Ng_template.ini | 6 +++--- shapepipe/modules/make_cat_package/make_cat.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/example/cfis/config_tile_Ng_template.ini b/example/cfis/config_tile_Ng_template.ini index 1dd3ca8e3..7fa91a83a 100644 --- a/example/cfis/config_tile_Ng_template.ini +++ b/example/cfis/config_tile_Ng_template.ini @@ -44,7 +44,7 @@ OUTPUT_DIR = $SP_RUN/output [JOB] # Batch size of parallel processing (optional), default is 1, i.e. run all jobs in serial -SMP_BATCH_SIZE = 1 +SMP_BATCH_SIZE = 8 # Timeout value (optional), default is None, i.e. no timeout limit applied TIMEOUT = 96:00:00 @@ -62,8 +62,8 @@ FILE_PATTERN = sexcat, image_vignet, background_vignet, galaxy_psf, weight_vigne FILE_EXT = .fits, .sqlite, .sqlite, .sqlite, .sqlite, .sqlite # NUMBERING_SCHEME (optional) string with numbering pattern for input files -#NUMBERING_SCHEME = \d{0}-\d{3}-\d{2}3 -NUMBERING_SCHEME = -000-000 +NUMBERING_SCHEME = \d{0}-\d{3}-\d{2}5 +#NUMBERING_SCHEME = -000-000 # Multi-epoch mode: Path to file with single-exposure WCS header information LOG_WCS = $SP_RUN/output/log_exp_headers.sqlite diff --git a/shapepipe/modules/make_cat_package/make_cat.py b/shapepipe/modules/make_cat_package/make_cat.py index 6bbe26648..4e30e3552 100644 --- a/shapepipe/modules/make_cat_package/make_cat.py +++ b/shapepipe/modules/make_cat_package/make_cat.py @@ -16,7 +16,7 @@ from sqlitedict import SqliteDict from shapepipe.pipeline import file_io -from shapepipe.utitities import galaxy +from shapepipe.utilities import galaxy def prepare_final_cat_file(output_path, file_number_string): From eeadfbf0628f67bad1f4a6444648c863edc2a57f Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Wed, 18 Oct 2023 11:26:13 +0200 Subject: [PATCH 07/19] ngmix template --- example/cfis/config_tile_Ng_template.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/cfis/config_tile_Ng_template.ini b/example/cfis/config_tile_Ng_template.ini index 7fa91a83a..fc80fe453 100644 --- a/example/cfis/config_tile_Ng_template.ini +++ b/example/cfis/config_tile_Ng_template.ini @@ -62,7 +62,7 @@ FILE_PATTERN = sexcat, image_vignet, background_vignet, galaxy_psf, weight_vigne FILE_EXT = .fits, .sqlite, .sqlite, .sqlite, .sqlite, .sqlite # NUMBERING_SCHEME (optional) string with numbering pattern for input files -NUMBERING_SCHEME = \d{0}-\d{3}-\d{2}5 +NUMBERING_SCHEME = \d{0}-\d{3}-\d{2}1 #NUMBERING_SCHEME = -000-000 # Multi-epoch mode: Path to file with single-exposure WCS header information From ee90dc1319af5eb1cf90390c64b0dbec86805d03 Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Sat, 28 Oct 2023 08:05:18 +0200 Subject: [PATCH 08/19] testing openmpi 5.0.0 on candide --- environment.yml | 2 -- example/cfis/config_tile_Ng_template.ini | 8 +++--- example/config.ini | 2 +- example/output/.gitkeep | 0 example/pbs/candide_mpi.sh | 30 ++++++++++++++++++---- example/pbs/config_mpi.ini | 6 ++--- shapepipe/modules/ngmix_package/ngmix.py | 3 ++- shapepipe/modules/python_example_runner.py | 2 -- shapepipe/pipeline/file_handler.py | 3 +++ 9 files changed, 38 insertions(+), 18 deletions(-) delete mode 100644 example/output/.gitkeep diff --git a/environment.yml b/environment.yml index 9279402ef..83cf5918f 100644 --- a/environment.yml +++ b/environment.yml @@ -17,11 +17,9 @@ dependencies: - pandas==1.4.1 - pip: - cs_util==0.0.5 - - mccd==1.2.3 - modopt==1.6.0 - PyQt5==5.15.6 - pyqtgraph==0.12.4 - - python-pysap==0.0.6 - reproject==0.8 - sip_tpv==1.1 - sf_tools==2.0.4 diff --git a/example/cfis/config_tile_Ng_template.ini b/example/cfis/config_tile_Ng_template.ini index fc80fe453..9b7d5ca69 100644 --- a/example/cfis/config_tile_Ng_template.ini +++ b/example/cfis/config_tile_Ng_template.ini @@ -11,7 +11,7 @@ VERBOSE = True RUN_NAME = run_sp_tile_ngmix_NgXu # Add date and time to RUN_NAME, optional, default: False -RUN_DATETIME = False +RUN_DATETIME = True ## ShapePipe execution options @@ -47,7 +47,7 @@ OUTPUT_DIR = $SP_RUN/output SMP_BATCH_SIZE = 8 # Timeout value (optional), default is None, i.e. no timeout limit applied -TIMEOUT = 96:00:00 +TIMEOUT = 40:00:00 ## Module options @@ -62,8 +62,8 @@ FILE_PATTERN = sexcat, image_vignet, background_vignet, galaxy_psf, weight_vigne FILE_EXT = .fits, .sqlite, .sqlite, .sqlite, .sqlite, .sqlite # NUMBERING_SCHEME (optional) string with numbering pattern for input files -NUMBERING_SCHEME = \d{0}-\d{3}-\d{2}1 -#NUMBERING_SCHEME = -000-000 +#NUMBERING_SCHEME = \d{0}-\d{3}-\d{2}9 +NUMBERING_SCHEME = -000-000 # Multi-epoch mode: Path to file with single-exposure WCS header information LOG_WCS = $SP_RUN/output/log_exp_headers.sqlite diff --git a/example/config.ini b/example/config.ini index 49541f944..51d1479b5 100644 --- a/example/config.ini +++ b/example/config.ini @@ -14,7 +14,7 @@ # MODULE (required) must be a valid module runner name (or a comma separated list of names) MODULE = python_example_runner, serial_example_runner, execute_example_runner, python_example_runner, execute_example_runner # MODE (optional) options are smp or mpi, default is smp -; MODE = mpi +MODE = mpi ## ShapePipe file handling options [FILE] diff --git a/example/output/.gitkeep b/example/output/.gitkeep deleted file mode 100644 index e69de29bb..000000000 diff --git a/example/pbs/candide_mpi.sh b/example/pbs/candide_mpi.sh index 0abbbb7f4..b32e85783 100644 --- a/example/pbs/candide_mpi.sh +++ b/example/pbs/candide_mpi.sh @@ -18,7 +18,7 @@ #PBS -l walltime=00:05:00 # Request number of cores (e.g. 2 from 2 different machines) -#PBS -l nodes=2:ppn=2 +#PBS -l nodes=4:ppn=2 # Full path to environment export SPENV="$HOME/.conda/envs/shapepipe" @@ -27,14 +27,34 @@ export SPENV="$HOME/.conda/envs/shapepipe" export SPDIR="$HOME/shapepipe" # Load modules -module load intelpython/3 -module load openmpi/4.0.5 +module remove gcc +module load gcc/9.3.0 +module load intelpython/3-2023.1.0 +module load openmpi/5.0.0 # Activate conda environment source activate $SPENV -# Run ShapePipe using full paths to executables -$SPENV/bin/mpiexec --map-by node $SPENV/bin/shapepipe_run -c $SPDIR/example/config_mpi.ini +# Other options to test +# -map-by + +if [ -f "$PBS_NODEFILE" ]; then + NSLOTS=`cat $PBS_NODEFILE | wc -l` + echo "Using $NSLOTS CPUs from PBS_NODEFILE $PBS_NODEFILE" +else + NSLOTS=8 + echo "Using $NSLOTS CPUs set by hand" +fi + +# Test: print hostname. +# Only version 5.0.0 downloaded from the web recognised the --mca argument +#/home/mkilbing/bin/mpirun -np $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 hostname + +/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -map-by $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 hostname +/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -map-by $NSLOTS $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini +#/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -np $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini + +#/home/mkilbing/bin/mpirun -np $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini # Return exit code exit 0 diff --git a/example/pbs/config_mpi.ini b/example/pbs/config_mpi.ini index bb2b8f95d..cd41c9ea3 100644 --- a/example/pbs/config_mpi.ini +++ b/example/pbs/config_mpi.ini @@ -2,7 +2,7 @@ ## ShapePipe execution options [EXECUTION] -MODULE = python_example, serial_example, execute_example +MODULE = python_example_runner, serial_example_runner, execute_example_runner MODE = mpi ## ShapePipe file handling options @@ -15,8 +15,8 @@ OUTPUT_DIR = $SPDIR/example/output TIMEOUT = 00:01:35 ## Module options -[PYTHON_EXAMPLE] +[PYTHON_EXAMPLE_RUNNER] MESSAGE = The obtained value is: -[SERIAL_EXAMPLE] +[SERIAL_EXAMPLE_RUNNER] ADD_INPUT_DIR = $SPDIR/example/data/numbers, $SPDIR/example/data/letters diff --git a/shapepipe/modules/ngmix_package/ngmix.py b/shapepipe/modules/ngmix_package/ngmix.py index 13ba79e06..2a1ed73dc 100644 --- a/shapepipe/modules/ngmix_package/ngmix.py +++ b/shapepipe/modules/ngmix_package/ngmix.py @@ -91,7 +91,8 @@ def __init__( self._w_log = w_log # Initiatlise random generator using image ID number - seed = int(''.join(re.findall(r'\d+', self._output_path))) + #seed = int(''.join(re.findall(r'\d+', self._output_path))) + seed = 6121975 np.random.seed(seed) self._w_log.info(f'Random generator initialisation seed = {seed}') diff --git a/shapepipe/modules/python_example_runner.py b/shapepipe/modules/python_example_runner.py index d468c10e0..fc5565ecc 100644 --- a/shapepipe/modules/python_example_runner.py +++ b/shapepipe/modules/python_example_runner.py @@ -19,10 +19,8 @@ 'astropy', 'galsim', 'joblib', - 'mccd', 'ngmix', 'pandas', - 'pysap', 'scipy', 'sf_tools', 'sip_tpv', diff --git a/shapepipe/pipeline/file_handler.py b/shapepipe/pipeline/file_handler.py index 735fd766f..30d37ad97 100644 --- a/shapepipe/pipeline/file_handler.py +++ b/shapepipe/pipeline/file_handler.py @@ -1186,10 +1186,13 @@ def _save_process_list( run_method, ) + print("MKDEBUG save process mmap", self.process_mmap) np.save(self.process_mmap, np.array(process_list)) del process_list + print("MKDEBUG load process mmap", self.process_mmap) self.process_list = np.load(self.process_mmap, mmap_mode='r') + print("MKDEBUG load process mmap done") self.missed = [] From 22c2f56606bd0df66b1b530b5d393ba4824e697c Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Tue, 31 Oct 2023 11:56:14 +0100 Subject: [PATCH 09/19] Testing MPI on candide; errors with process list --- environment.yml | 2 +- example/cfis/config_tile_Ng_template.ini | 2 +- example/pbs/candide_mpi.sh | 11 ++++++--- shapepipe/modules/ngmix_package/ngmix.py | 11 +++++++-- shapepipe/modules/ngmix_runner.py | 3 +++ shapepipe/pipeline/file_handler.py | 29 +++++++++++++++++++++++- shapepipe/pipeline/job_handler.py | 5 ++++ shapepipe/pipeline/mpi_run.py | 11 +++++++++ shapepipe/run.py | 9 +++++--- 9 files changed, 72 insertions(+), 11 deletions(-) diff --git a/environment.yml b/environment.yml index 83cf5918f..2d3317523 100644 --- a/environment.yml +++ b/environment.yml @@ -4,7 +4,7 @@ channels: dependencies: - python=3.9 - pip>=21.2.4 - - numpy==1.21.6 + - numpy==1.22 - astropy==5.0 - automake==1.16.2 - autoconf==2.69 diff --git a/example/cfis/config_tile_Ng_template.ini b/example/cfis/config_tile_Ng_template.ini index 9b7d5ca69..ff5c92c72 100644 --- a/example/cfis/config_tile_Ng_template.ini +++ b/example/cfis/config_tile_Ng_template.ini @@ -11,7 +11,7 @@ VERBOSE = True RUN_NAME = run_sp_tile_ngmix_NgXu # Add date and time to RUN_NAME, optional, default: False -RUN_DATETIME = True +RUN_DATETIME = False ## ShapePipe execution options diff --git a/example/pbs/candide_mpi.sh b/example/pbs/candide_mpi.sh index b32e85783..ee09e1935 100644 --- a/example/pbs/candide_mpi.sh +++ b/example/pbs/candide_mpi.sh @@ -18,7 +18,7 @@ #PBS -l walltime=00:05:00 # Request number of cores (e.g. 2 from 2 different machines) -#PBS -l nodes=4:ppn=2 +#PBS -l nodes=2:ppn=2 # Full path to environment export SPENV="$HOME/.conda/envs/shapepipe" @@ -50,9 +50,14 @@ fi # Only version 5.0.0 downloaded from the web recognised the --mca argument #/home/mkilbing/bin/mpirun -np $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 hostname -/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -map-by $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 hostname -/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -map-by $NSLOTS $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini +#/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -map-by node --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 hostname +#/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -map-by node $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini +#/home/mkilbing/bin/mpirun -map-by node $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini +#/home/mkilbing/bin/mpirun -n $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini +#$SPENV/bin/mpiexec -n $NSLOTS $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini #/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -np $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini +/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -np $NSLOTS hostname +/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -np $NSLOTS $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini #/home/mkilbing/bin/mpirun -np $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini diff --git a/shapepipe/modules/ngmix_package/ngmix.py b/shapepipe/modules/ngmix_package/ngmix.py index 2a1ed73dc..7d30eef07 100644 --- a/shapepipe/modules/ngmix_package/ngmix.py +++ b/shapepipe/modules/ngmix_package/ngmix.py @@ -7,6 +7,7 @@ """ import re +import os import galsim import ngmix @@ -91,8 +92,12 @@ def __init__( self._w_log = w_log # Initiatlise random generator using image ID number - #seed = int(''.join(re.findall(r'\d+', self._output_path))) - seed = 6121975 + basename = os.path.basename(self._output_path) + print("MKDEBUG output_path basename ", basename) + print(''.join(re.findall(r'\d+', basename))) + seed = int(''.join(re.findall(r'\d+', basename))) + print(seed) + #seed = 6121975 np.random.seed(seed) self._w_log.info(f'Random generator initialisation seed = {seed}') @@ -311,6 +316,8 @@ def save_results(self, output_dict): Dictionary containing the results """ + if os.path.exists(self._output_path): + raise IOError(f"Output file {self._output_path} already exists") f = file_io.FITSCatalogue( self._output_path, open_mode=file_io.BaseCatalogue.OpenMode.ReadWrite diff --git a/shapepipe/modules/ngmix_runner.py b/shapepipe/modules/ngmix_runner.py index 83ef2a670..9b5cc982d 100644 --- a/shapepipe/modules/ngmix_runner.py +++ b/shapepipe/modules/ngmix_runner.py @@ -63,6 +63,9 @@ def ngmix_runner( ) else: # Initialise class instance + w_log.info( + f"Processing data for output file {output_path}" + ) ngmix_inst = Ngmix( input_file_list, output_path, diff --git a/shapepipe/pipeline/file_handler.py b/shapepipe/pipeline/file_handler.py index 30d37ad97..e5289536f 100644 --- a/shapepipe/pipeline/file_handler.py +++ b/shapepipe/pipeline/file_handler.py @@ -18,6 +18,10 @@ from shapepipe.pipeline import shared from shapepipe.utilities.file_system import mkdir +from mpi4py import MPI +import datetime +import time + class FileHandler(object): """File Handler. @@ -930,6 +934,7 @@ def _save_num_patterns( del file_list break + print("MKDEBUG save_num_pattern: path = ", path) if not true_file_list: raise RuntimeError( f'No files found matching "{pattern}" and "{ext}" in the ' @@ -995,6 +1000,11 @@ def _save_num_patterns( ) # Save file list + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + size = comm.Get_size() + now = datetime.datetime.now() + print(f"MKDEBUG save_num_patterns: save file list {output_file}, rank={rank}, size={size} time={now.time()}") np.save(output_file, np.array(final_file_list)) del true_file_list, final_file_list @@ -1015,7 +1025,21 @@ def _save_match_patterns(output_file, mmap_list): List of memory maps """ - num_pattern_list = [np.load(mmap, mmap_mode='r') for mmap in mmap_list] + #num_pattern_list = [np.load(mmap, mmap_mode='r') for mmap in mmap_list] + num_pattern_list = [] + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + size = comm.Get_size() + for mmap in mmap_list: + now = datetime.datetime.now() + print(f"MKDEBUG load mmap {mmap}, rank={rank}, size={size} time={now.time()}") + if not os.path.exists(mmap): + n_sec = 5 + print(f"MKDEBUG waiting {n_sec}...") + time.sleep(n_sec) + if not os.path.exists(mmap): + print("MKDEBUG still not found") + num_pattern_list.append(np.load(mmap, mmap_mode="r")) np.save( output_file, @@ -1223,6 +1247,8 @@ def _get_module_input_files(self, module, run_name): num_scheme = self._module_dict[module][run_name]['numbering_scheme'] run_method = self._module_dict[module][run_name]['run_method'] + print("MKDEBUG call _save_process_list, dir_list =", dir_list) + print("MKDEBUG tmp dir = ", self._tmp_dir, os.path.exists(self._tmp_dir)) self._save_process_list( dir_list, pattern_list, @@ -1273,6 +1299,7 @@ def set_up_module(self, module): self._set_module_properties(module, run_name) self._create_module_run_dirs(module, run_name) self._set_module_input_dir(module, run_name) + print("MKDEBUG call _get_module_input_files") self._get_module_input_files(module, run_name) def get_worker_log_name(self, module, file_number_string): diff --git a/shapepipe/pipeline/job_handler.py b/shapepipe/pipeline/job_handler.py index d6baebd89..28d450a15 100644 --- a/shapepipe/pipeline/job_handler.py +++ b/shapepipe/pipeline/job_handler.py @@ -15,6 +15,7 @@ from shapepipe.pipeline.worker_handler import WorkerHandler +from mpi4py import MPI class JobHandler(object): """Job Handler. @@ -78,6 +79,10 @@ def __init__( self._log_job_parameters() # Set up module in file handler + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + size = comm.Get_size() + print(f"MKDEBUG set_up_module, rank = {rank}, size = {size}") self.filehd.set_up_module(self._module) # Set the total number of processes diff --git a/shapepipe/pipeline/mpi_run.py b/shapepipe/pipeline/mpi_run.py index c92d943c8..2bd88bcb1 100644 --- a/shapepipe/pipeline/mpi_run.py +++ b/shapepipe/pipeline/mpi_run.py @@ -36,6 +36,7 @@ def submit_mpi_jobs( timeout, run_dirs, module_runner, + module_config_sec, worker_log, verbose, ): @@ -51,12 +52,22 @@ def submit_mpi_jobs( w_log_name = worker_log(module_runner.__name__, process[0]) wh = WorkerHandler(verbose=verbose) + print("MKDEBUG in submit_mpi_job") + print("MKDEBUG process[1:] = ", process[1:]) + print("MKDEBUG process[0] = ", process[0]) + print("MKDEBUG w_log_name = ", w_log_name) + print("MKDEBUG run_dirs = ", run_dirs) + print("MKDEBUG config = ", config) + print("MKDEBUG timeout = ", timeout) + print("MKDEBUG module_runner = ", module_runner) + print("MKDEBUG module_config_sec = ", module_config_sec) result.append(wh.worker( process[1:], process[0], w_log_name, run_dirs, config, + module_config_sec, timeout, module_runner )) diff --git a/shapepipe/run.py b/shapepipe/run.py index 2443eab50..7ae36aa10 100644 --- a/shapepipe/run.py +++ b/shapepipe/run.py @@ -405,6 +405,7 @@ def run_mpi(pipe, comm): # Get file handler objects run_dirs = jh.filehd.module_run_dirs module_runner = jh.filehd.module_runners[module] + module_config_sec = jh.filehd.get_module_config_sec(module) worker_log = jh.filehd.get_worker_log_name # Define process list process_list = jh.filehd.process_list @@ -412,8 +413,8 @@ def run_mpi(pipe, comm): jobs = split_mpi_jobs(process_list, comm.size) del process_list else: - job_type = module_runner = worker_log = timeout = \ - jobs = run_dirs = None + job_type = module_runner = module_config_sec = worker_log = \ + timeout = jobs = run_dirs = None # Broadcast job type to all nodes job_type = comm.bcast(job_type, root=0) @@ -424,6 +425,7 @@ def run_mpi(pipe, comm): run_dirs = comm.bcast(run_dirs, root=0) module_runner = comm.bcast(module_runner, root=0) + module_config_sec = comm.bcast(module_config_sec, root=0) worker_log = comm.bcast(worker_log, root=0) timeout = comm.bcast(timeout, root=0) jobs = comm.scatter(jobs, root=0) @@ -436,6 +438,7 @@ def run_mpi(pipe, comm): timeout, run_dirs, module_runner, + module_config_sec, worker_log, verbose ), @@ -443,7 +446,7 @@ def run_mpi(pipe, comm): ) # Delete broadcast objects - del module_runner, worker_log, timeout, jobs + del module_runner, module_config_sec, worker_log, timeout, jobs # Finish up parallel jobs if master: From 27e9bbbf0afd59243b63197d10a9d504bca3c61d Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Tue, 31 Oct 2023 12:11:00 +0100 Subject: [PATCH 10/19] Updated MPI setting and candide job --- example/pbs/candide_mpi.sh | 12 ------ shapepipe/pipeline/file_handler.py | 67 +++++++----------------------- shapepipe/pipeline/job_handler.py | 5 --- shapepipe/pipeline/mpi_run.py | 11 ----- 4 files changed, 14 insertions(+), 81 deletions(-) diff --git a/example/pbs/candide_mpi.sh b/example/pbs/candide_mpi.sh index ee09e1935..cda3a9411 100644 --- a/example/pbs/candide_mpi.sh +++ b/example/pbs/candide_mpi.sh @@ -46,20 +46,8 @@ else echo "Using $NSLOTS CPUs set by hand" fi -# Test: print hostname. -# Only version 5.0.0 downloaded from the web recognised the --mca argument -#/home/mkilbing/bin/mpirun -np $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 hostname - -#/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -map-by node --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 hostname -#/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -map-by node $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini -#/home/mkilbing/bin/mpirun -map-by node $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini -#/home/mkilbing/bin/mpirun -n $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini -#$SPENV/bin/mpiexec -n $NSLOTS $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini -#/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -np $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini /softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -np $NSLOTS hostname /softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -np $NSLOTS $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini -#/home/mkilbing/bin/mpirun -np $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini - # Return exit code exit 0 diff --git a/shapepipe/pipeline/file_handler.py b/shapepipe/pipeline/file_handler.py index e5289536f..b7479f6f3 100644 --- a/shapepipe/pipeline/file_handler.py +++ b/shapepipe/pipeline/file_handler.py @@ -18,10 +18,6 @@ from shapepipe.pipeline import shared from shapepipe.utilities.file_system import mkdir -from mpi4py import MPI -import datetime -import time - class FileHandler(object): """File Handler. @@ -97,7 +93,7 @@ def run_dir(self): @run_dir.setter def run_dir(self, value): - self._run_dir = self.check_dir(value, check_exists=False) + self._run_dir = self.check_dir(value, check_exists=True) @property def _input_dir(self): @@ -192,7 +188,7 @@ def mkdir(cls, dir_name): Directory name with full path """ - cls.check_dir(dir_name, check_exists=False) + cls.check_dir(dir_name, check_exists=True) mkdir(dir_name) @staticmethod @@ -380,7 +376,6 @@ def create_global_run_dirs(self): This method creates the pipeline output directories for a given run. """ - # MKDEBUG: Error occured here self.run_dir = self.setpath(self._output_dir, self._run_name) self._log_dir = self.setpath(self.run_dir, 'logs') self._tmp_dir = self.setpath(self.run_dir, 'tmp') @@ -391,7 +386,6 @@ def create_global_run_dirs(self): self.run_dir, ) - # MKDBUG: Error occured here self.mkdir(self.run_dir) self.mkdir(self._log_dir) self.mkdir(self._tmp_dir) @@ -811,22 +805,16 @@ def _generate_re_pattern(match_pattern): if not isinstance(match_pattern, str): TypeError('Match pattern must be a string.') - if re.search("\\\\", match_pattern) is not None: - # Regular expression on input: Use pattern - re_pattern = match_pattern - else: - # Generate pattern from input - chars = [char for char in match_pattern if not char.isalnum()] - split_pattern = '|'.join(chars).replace('.', r'\.') - chars = [f'\\{char}' for char in chars] + [''] - digit_list = re.split(split_pattern, match_pattern) - num_length = [ - f'\\d{{{len(digits)}}}' - for digits in re.split(split_pattern, match_pattern) - ] - re_pattern = r''.join( - [a for b in zip(num_length, chars) for a in b] - ).replace('{1}', '+') + chars = [char for char in match_pattern if not char.isalnum()] + split_pattern = '|'.join(chars).replace('.', r'\.') + chars = [f'\\{char}' for char in chars] + [''] + num_length = [ + f'\\d{{{len(digits)}}}' + for digits in re.split(split_pattern, match_pattern) + ] + re_pattern = r''.join( + [a for b in zip(num_length, chars) for a in b] + ).replace('{1}', '+') return re.compile(re_pattern) @@ -934,7 +922,6 @@ def _save_num_patterns( del file_list break - print("MKDEBUG save_num_pattern: path = ", path) if not true_file_list: raise RuntimeError( f'No files found matching "{pattern}" and "{ext}" in the ' @@ -983,12 +970,11 @@ def _save_num_patterns( if not found_match: raise RuntimeError( - f'Could not match numbering scheme "{re_pattern}" ' + f'Could not match numbering scheme "{self._numbering_scheme}" ' + f'to any of the input files matching "{pattern}" and ' + f'"{ext}" in the directories {dir_list}.' ) - elem = shared.check_duplicate(final_file_list) if elem != '': raise RuntimeError( @@ -1000,11 +986,6 @@ def _save_num_patterns( ) # Save file list - comm = MPI.COMM_WORLD - rank = comm.Get_rank() - size = comm.Get_size() - now = datetime.datetime.now() - print(f"MKDEBUG save_num_patterns: save file list {output_file}, rank={rank}, size={size} time={now.time()}") np.save(output_file, np.array(final_file_list)) del true_file_list, final_file_list @@ -1025,21 +1006,7 @@ def _save_match_patterns(output_file, mmap_list): List of memory maps """ - #num_pattern_list = [np.load(mmap, mmap_mode='r') for mmap in mmap_list] - num_pattern_list = [] - comm = MPI.COMM_WORLD - rank = comm.Get_rank() - size = comm.Get_size() - for mmap in mmap_list: - now = datetime.datetime.now() - print(f"MKDEBUG load mmap {mmap}, rank={rank}, size={size} time={now.time()}") - if not os.path.exists(mmap): - n_sec = 5 - print(f"MKDEBUG waiting {n_sec}...") - time.sleep(n_sec) - if not os.path.exists(mmap): - print("MKDEBUG still not found") - num_pattern_list.append(np.load(mmap, mmap_mode="r")) + num_pattern_list = [np.load(mmap, mmap_mode='r') for mmap in mmap_list] np.save( output_file, @@ -1210,13 +1177,10 @@ def _save_process_list( run_method, ) - print("MKDEBUG save process mmap", self.process_mmap) np.save(self.process_mmap, np.array(process_list)) del process_list - print("MKDEBUG load process mmap", self.process_mmap) self.process_list = np.load(self.process_mmap, mmap_mode='r') - print("MKDEBUG load process mmap done") self.missed = [] @@ -1247,8 +1211,6 @@ def _get_module_input_files(self, module, run_name): num_scheme = self._module_dict[module][run_name]['numbering_scheme'] run_method = self._module_dict[module][run_name]['run_method'] - print("MKDEBUG call _save_process_list, dir_list =", dir_list) - print("MKDEBUG tmp dir = ", self._tmp_dir, os.path.exists(self._tmp_dir)) self._save_process_list( dir_list, pattern_list, @@ -1299,7 +1261,6 @@ def set_up_module(self, module): self._set_module_properties(module, run_name) self._create_module_run_dirs(module, run_name) self._set_module_input_dir(module, run_name) - print("MKDEBUG call _get_module_input_files") self._get_module_input_files(module, run_name) def get_worker_log_name(self, module, file_number_string): diff --git a/shapepipe/pipeline/job_handler.py b/shapepipe/pipeline/job_handler.py index 28d450a15..d6baebd89 100644 --- a/shapepipe/pipeline/job_handler.py +++ b/shapepipe/pipeline/job_handler.py @@ -15,7 +15,6 @@ from shapepipe.pipeline.worker_handler import WorkerHandler -from mpi4py import MPI class JobHandler(object): """Job Handler. @@ -79,10 +78,6 @@ def __init__( self._log_job_parameters() # Set up module in file handler - comm = MPI.COMM_WORLD - rank = comm.Get_rank() - size = comm.Get_size() - print(f"MKDEBUG set_up_module, rank = {rank}, size = {size}") self.filehd.set_up_module(self._module) # Set the total number of processes diff --git a/shapepipe/pipeline/mpi_run.py b/shapepipe/pipeline/mpi_run.py index 2bd88bcb1..c92d943c8 100644 --- a/shapepipe/pipeline/mpi_run.py +++ b/shapepipe/pipeline/mpi_run.py @@ -36,7 +36,6 @@ def submit_mpi_jobs( timeout, run_dirs, module_runner, - module_config_sec, worker_log, verbose, ): @@ -52,22 +51,12 @@ def submit_mpi_jobs( w_log_name = worker_log(module_runner.__name__, process[0]) wh = WorkerHandler(verbose=verbose) - print("MKDEBUG in submit_mpi_job") - print("MKDEBUG process[1:] = ", process[1:]) - print("MKDEBUG process[0] = ", process[0]) - print("MKDEBUG w_log_name = ", w_log_name) - print("MKDEBUG run_dirs = ", run_dirs) - print("MKDEBUG config = ", config) - print("MKDEBUG timeout = ", timeout) - print("MKDEBUG module_runner = ", module_runner) - print("MKDEBUG module_config_sec = ", module_config_sec) result.append(wh.worker( process[1:], process[0], w_log_name, run_dirs, config, - module_config_sec, timeout, module_runner )) From c380cd49b9cc3d0aa82340093ad673df26d24e26 Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Tue, 31 Oct 2023 12:14:05 +0100 Subject: [PATCH 11/19] ngmix runner reset to develop --- shapepipe/modules/mccd_fit_val_runner.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/shapepipe/modules/mccd_fit_val_runner.py b/shapepipe/modules/mccd_fit_val_runner.py index f88a526f0..cb959e31c 100644 --- a/shapepipe/modules/mccd_fit_val_runner.py +++ b/shapepipe/modules/mccd_fit_val_runner.py @@ -6,8 +6,6 @@ """ -import os - import mccd from shapepipe.modules.mccd_package import shapepipe_auxiliary_mccd as aux_mccd @@ -52,16 +50,6 @@ def mccd_fit_val_runner( if mccd_mode == 'FIT_VALIDATION': - # Fitted model is found in the output directory - mccd_model_path = output_dir + fit_saving_name + file_number_string \ - + '.npy' - - if os.path.exists(mccd_model_path): - w_log.info( - f"output file {mccd_model_path} already exists, skipping" - ) - return None, None - aux_mccd.mccd_fit_pipeline( trainstar_path=trainstar_path, file_number_string=file_number_string, @@ -72,6 +60,10 @@ def mccd_fit_val_runner( w_log=w_log, ) + # Fitted model is found in the output directory + mccd_model_path = output_dir + fit_saving_name + file_number_string \ + + '.npy' + aux_mccd.mccd_validation_pipeline( teststar_path=teststar_path, mccd_model_path=mccd_model_path, From 13940f5bc8ed7625fe14441d9bcc9604d8479242 Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Tue, 31 Oct 2023 12:14:51 +0100 Subject: [PATCH 12/19] ngmix script reset to develop --- shapepipe/modules/ngmix_package/ngmix.py | 32 +++++++++++------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/shapepipe/modules/ngmix_package/ngmix.py b/shapepipe/modules/ngmix_package/ngmix.py index 7d30eef07..7cb614a42 100644 --- a/shapepipe/modules/ngmix_package/ngmix.py +++ b/shapepipe/modules/ngmix_package/ngmix.py @@ -7,7 +7,6 @@ """ import re -import os import galsim import ngmix @@ -31,8 +30,10 @@ class Ngmix(object): ---------- input_file_list : list Input files - output_path : str - Output file path + output_dir : str + Output directory + file_number_string : str + File numbering scheme zero_point : float Photometric zero point pixel_scale : float @@ -58,7 +59,8 @@ class Ngmix(object): def __init__( self, input_file_list, - output_path, + output_dir, + file_number_string, zero_point, pixel_scale, f_wcs_path, @@ -80,7 +82,8 @@ def __init__( self._weight_vignet_path = input_file_list[4] self._flag_vignet_path = input_file_list[5] - self._output_path = output_path + self._output_dir = output_dir + self._file_number_string = file_number_string self._zero_point = zero_point self._pixel_scale = pixel_scale @@ -91,13 +94,8 @@ def __init__( self._w_log = w_log - # Initiatlise random generator using image ID number - basename = os.path.basename(self._output_path) - print("MKDEBUG output_path basename ", basename) - print(''.join(re.findall(r'\d+', basename))) - seed = int(''.join(re.findall(r'\d+', basename))) - print(seed) - #seed = 6121975 + # Initiatlise random generator + seed = int(''.join(re.findall(r'\d+', self._file_number_string))) np.random.seed(seed) self._w_log.info(f'Random generator initialisation seed = {seed}') @@ -316,10 +314,12 @@ def save_results(self, output_dict): Dictionary containing the results """ - if os.path.exists(self._output_path): - raise IOError(f"Output file {self._output_path} already exists") + output_name = ( + f'{self._output_dir}/ngmix{self._file_number_string}.fits' + ) + f = file_io.FITSCatalogue( - self._output_path, + output_name, open_mode=file_io.BaseCatalogue.OpenMode.ReadWrite ) @@ -809,8 +809,6 @@ def do_ngmix_metacal( psf_obs = Observation(psfs[n_e], jacobian=psf_jacob) - ## MKDEBUG TODO usg sigma_to_... - ## psf_T = psfs_sigma[n_e] * 1.17741 * pixel_scale weight_map = np.copy(weights[n_e]) From 0bec31883f57ae6d3c6d6310a4856793cd729a96 Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Tue, 31 Oct 2023 12:16:16 +0100 Subject: [PATCH 13/19] mccd and pysap dependencies added back in to example --- shapepipe/modules/python_example_runner.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/shapepipe/modules/python_example_runner.py b/shapepipe/modules/python_example_runner.py index fc5565ecc..d468c10e0 100644 --- a/shapepipe/modules/python_example_runner.py +++ b/shapepipe/modules/python_example_runner.py @@ -19,8 +19,10 @@ 'astropy', 'galsim', 'joblib', + 'mccd', 'ngmix', 'pandas', + 'pysap', 'scipy', 'sf_tools', 'sip_tpv', From 691daf30f1150593fc59f2b2d512a085d7c2d9ff Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Tue, 31 Oct 2023 12:21:09 +0100 Subject: [PATCH 14/19] submit run added missing arg --- shapepipe/pipeline/mpi_run.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/shapepipe/pipeline/mpi_run.py b/shapepipe/pipeline/mpi_run.py index c92d943c8..5baf84227 100644 --- a/shapepipe/pipeline/mpi_run.py +++ b/shapepipe/pipeline/mpi_run.py @@ -36,6 +36,7 @@ def submit_mpi_jobs( timeout, run_dirs, module_runner, + module_config_sec, worker_log, verbose, ): @@ -57,6 +58,7 @@ def submit_mpi_jobs( w_log_name, run_dirs, config, + module_config_sec, timeout, module_runner )) From 37127345a8ebae0792b434b80a7438a31d1a7dd5 Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Tue, 31 Oct 2023 16:27:09 +0100 Subject: [PATCH 15/19] config mpi --- example/pbs/candide_mpi.sh | 15 +++++++++++---- .../merge_sep_cats_package/merge_sep_cats.py | 8 ++++++++ shapepipe/run.py | 2 ++ 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/example/pbs/candide_mpi.sh b/example/pbs/candide_mpi.sh index cda3a9411..2875bd2ec 100644 --- a/example/pbs/candide_mpi.sh +++ b/example/pbs/candide_mpi.sh @@ -27,7 +27,6 @@ export SPENV="$HOME/.conda/envs/shapepipe" export SPDIR="$HOME/shapepipe" # Load modules -module remove gcc module load gcc/9.3.0 module load intelpython/3-2023.1.0 module load openmpi/5.0.0 @@ -42,12 +41,20 @@ if [ -f "$PBS_NODEFILE" ]; then NSLOTS=`cat $PBS_NODEFILE | wc -l` echo "Using $NSLOTS CPUs from PBS_NODEFILE $PBS_NODEFILE" else - NSLOTS=8 + NSLOTS=4 echo "Using $NSLOTS CPUs set by hand" fi -/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -np $NSLOTS hostname -/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -np $NSLOTS $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini +# Creates #node output dirs +MPI_CMD=/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun +MPI_ARGS="-np $NSLOTS" + +#MPI_CMD=$SPENV/bin/mpiexec +#MPI_CMD=$HOME/bin/mpiexec +#MPI_ARGS=-map-by + +${MPI_CMD} ${MPI_ARGS} hostname +${MPI_CMD} ${MPI_ARGS} $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini # Return exit code exit 0 diff --git a/shapepipe/modules/merge_sep_cats_package/merge_sep_cats.py b/shapepipe/modules/merge_sep_cats_package/merge_sep_cats.py index 85f7e194f..350ea65c0 100644 --- a/shapepipe/modules/merge_sep_cats_package/merge_sep_cats.py +++ b/shapepipe/modules/merge_sep_cats_package/merge_sep_cats.py @@ -89,6 +89,14 @@ def process(self): list_col_name = cat0.get_col_names() cat0.close() + # Check that exactly 6 (1 primary + secondary HDUs) exist + if len(list_ext_name) != 6: + msg = f"Number of HDUs is {len(list_ext_name)}, expected 6" + #raise ValueError(msg) + print(f"MKDEBUG warning: {msg}; setting to 6") + list_ext_name = list_ext_name[:6] + list_col_name = list_col_name[:6] + # Create empty dictionary # data dimension = n_extension x n_column x n_obj data = {} diff --git a/shapepipe/run.py b/shapepipe/run.py index 7ae36aa10..991b31740 100644 --- a/shapepipe/run.py +++ b/shapepipe/run.py @@ -477,12 +477,14 @@ def run(*args): if import_mpi: comm = MPI.COMM_WORLD + print(f"MKDEBUG comm = {comm}") master = comm.rank == 0 else: master = True if master: pipe = ShapePipe() + print("MKDEBUG set_up (-> mkdir out)") pipe.set_up() mode = pipe.mode else: From ddf4091ee960e1ca1cb58d0f34d6410dc524490d Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Mon, 6 Nov 2023 13:27:19 +0100 Subject: [PATCH 16/19] removed debug prints --- shapepipe/pipeline/file_handler.py | 14 -------------- shapepipe/pipeline/job_handler.py | 5 ----- 2 files changed, 19 deletions(-) diff --git a/shapepipe/pipeline/file_handler.py b/shapepipe/pipeline/file_handler.py index e5289536f..fe9129402 100644 --- a/shapepipe/pipeline/file_handler.py +++ b/shapepipe/pipeline/file_handler.py @@ -18,9 +18,6 @@ from shapepipe.pipeline import shared from shapepipe.utilities.file_system import mkdir -from mpi4py import MPI -import datetime -import time class FileHandler(object): @@ -1000,11 +997,6 @@ def _save_num_patterns( ) # Save file list - comm = MPI.COMM_WORLD - rank = comm.Get_rank() - size = comm.Get_size() - now = datetime.datetime.now() - print(f"MKDEBUG save_num_patterns: save file list {output_file}, rank={rank}, size={size} time={now.time()}") np.save(output_file, np.array(final_file_list)) del true_file_list, final_file_list @@ -1027,15 +1019,9 @@ def _save_match_patterns(output_file, mmap_list): """ #num_pattern_list = [np.load(mmap, mmap_mode='r') for mmap in mmap_list] num_pattern_list = [] - comm = MPI.COMM_WORLD - rank = comm.Get_rank() - size = comm.Get_size() for mmap in mmap_list: - now = datetime.datetime.now() - print(f"MKDEBUG load mmap {mmap}, rank={rank}, size={size} time={now.time()}") if not os.path.exists(mmap): n_sec = 5 - print(f"MKDEBUG waiting {n_sec}...") time.sleep(n_sec) if not os.path.exists(mmap): print("MKDEBUG still not found") diff --git a/shapepipe/pipeline/job_handler.py b/shapepipe/pipeline/job_handler.py index 28d450a15..d6baebd89 100644 --- a/shapepipe/pipeline/job_handler.py +++ b/shapepipe/pipeline/job_handler.py @@ -15,7 +15,6 @@ from shapepipe.pipeline.worker_handler import WorkerHandler -from mpi4py import MPI class JobHandler(object): """Job Handler. @@ -79,10 +78,6 @@ def __init__( self._log_job_parameters() # Set up module in file handler - comm = MPI.COMM_WORLD - rank = comm.Get_rank() - size = comm.Get_size() - print(f"MKDEBUG set_up_module, rank = {rank}, size = {size}") self.filehd.set_up_module(self._module) # Set the total number of processes From 3fb5a25ffcb288e0edc6a6810046ad5d9e5dd7fa Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Mon, 6 Nov 2023 16:11:10 +0100 Subject: [PATCH 17/19] mpi4py upgraded to 3.1.5 --- example/pbs/candide_mpi.sh | 4 ---- install_shapepipe | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/example/pbs/candide_mpi.sh b/example/pbs/candide_mpi.sh index 2875bd2ec..390756308 100644 --- a/example/pbs/candide_mpi.sh +++ b/example/pbs/candide_mpi.sh @@ -49,10 +49,6 @@ fi MPI_CMD=/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun MPI_ARGS="-np $NSLOTS" -#MPI_CMD=$SPENV/bin/mpiexec -#MPI_CMD=$HOME/bin/mpiexec -#MPI_ARGS=-map-by - ${MPI_CMD} ${MPI_ARGS} hostname ${MPI_CMD} ${MPI_ARGS} $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini diff --git a/install_shapepipe b/install_shapepipe index c370d91d0..5af39c158 100755 --- a/install_shapepipe +++ b/install_shapepipe @@ -22,7 +22,7 @@ last_update="08/03/22" # Conda package versions fftw_ver="3.3.10" libpng_ver="1.6.37" -mpi4py_ver="3.1.3" +mpi4py_ver="3.1.5" openblas_ver="0.3.18" # SExtractor Package From 6a764545d1291959113ed97a8977adf45dced3c8 Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Mon, 6 Nov 2023 16:38:36 +0100 Subject: [PATCH 18/19] changed warning print output --- shapepipe/modules/merge_sep_cats_package/merge_sep_cats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shapepipe/modules/merge_sep_cats_package/merge_sep_cats.py b/shapepipe/modules/merge_sep_cats_package/merge_sep_cats.py index 350ea65c0..83d95f567 100644 --- a/shapepipe/modules/merge_sep_cats_package/merge_sep_cats.py +++ b/shapepipe/modules/merge_sep_cats_package/merge_sep_cats.py @@ -93,7 +93,7 @@ def process(self): if len(list_ext_name) != 6: msg = f"Number of HDUs is {len(list_ext_name)}, expected 6" #raise ValueError(msg) - print(f"MKDEBUG warning: {msg}; setting to 6") + print(f"warning: {msg}; setting to 6") list_ext_name = list_ext_name[:6] list_col_name = list_col_name[:6] From 7ac5042dc5328ec25f95ce1a35d402bd0228f90b Mon Sep 17 00:00:00 2001 From: Martin Kilbinger Date: Mon, 6 Nov 2023 16:40:00 +0100 Subject: [PATCH 19/19] ngmix_runner checked out from develop --- shapepipe/modules/ngmix_runner.py | 38 +++++++++++-------------------- 1 file changed, 13 insertions(+), 25 deletions(-) diff --git a/shapepipe/modules/ngmix_runner.py b/shapepipe/modules/ngmix_runner.py index 9b5cc982d..432008793 100644 --- a/shapepipe/modules/ngmix_runner.py +++ b/shapepipe/modules/ngmix_runner.py @@ -6,8 +6,6 @@ """ -import os - from shapepipe.modules.module_decorator import module_runner from shapepipe.modules.ngmix_package.ngmix import Ngmix @@ -54,31 +52,21 @@ def ngmix_runner( id_obj_min = config.getint(module_config_sec, 'ID_OBJ_MIN') id_obj_max = config.getint(module_config_sec, 'ID_OBJ_MAX') - output_path = ( - f"{run_dirs['output']}/ngmix{file_number_string}.fits" + # Initialise class instance + ngmix_inst = Ngmix( + input_file_list, + run_dirs['output'], + file_number_string, + zero_point, + pixel_scale, + f_wcs_path, + w_log, + id_obj_min=id_obj_min, + id_obj_max=id_obj_max, ) - if os.path.exists(output_path): - w_log.info( - f"output file {output_path} already exists, skipping" - ) - else: - # Initialise class instance - w_log.info( - f"Processing data for output file {output_path}" - ) - ngmix_inst = Ngmix( - input_file_list, - output_path, - zero_point, - pixel_scale, - f_wcs_path, - w_log, - id_obj_min=id_obj_min, - id_obj_max=id_obj_max, - ) - # Process ngmix shape measurement and metacalibration - ngmix_inst.process() + # Process ngmix shape measurement and metacalibration + ngmix_inst.process() # No return objects return None, None