Source code for parallelizer

#This file is part of the PyPhase software.
#
#Copyright (c) Max Langer (2019) 
#
#max.langer@creatis.insa-lyon.fr
#
#This software is a computer program whose purpose is to allow development,
#implementation, and deployment of phase retrieval algorithms.
#
#This software is governed by the CeCILL  license under French law and
#abiding by the rules of distribution of free software.  You can  use, 
#modify and/ or redistribute the software under the terms of the CeCILL
#license as circulated by CEA, CNRS and INRIA at the following URL
#"http://www.cecill.info". 
#
#As a counterpart to the access to the source code and  rights to copy,
#modify and redistribute granted by the license, users are provided only
#with a limited warranty  and the software's author,  the holder of the
#economic rights,  and the successive licensors  have only  limited
#liability. 
#
#In this respect, the user's attention is drawn to the risks associated
#with loading,  using,  modifying and/or developing or reproducing the
#software by the user in light of its specific status of free software,
#that may mean  that it is complicated to manipulate,  and  that  also
#therefore means  that it is reserved for developers  and  experienced
#professionals having in-depth computer knowledge. Users are therefore
#encouraged to load and test the software's suitability as regards their
#requirements in conditions enabling the security of their systems and/or 
#data to be ensured and,  more generally, to use and operate it in the 
#same conditions as regards security. 
#
#The fact that you are presently reading this means that you have had
#knowledge of the CeCILL license and that you accept its terms.

import os
from math import *
import glob
#import utilities
import time
from pyphase.config import *
import inspect
from functools import wraps
import pathlib
import pickle
import numpy as np

def Serial(func):
    """Decorator for serial processing.

    Parallelisation decorators work on functions with signature
    (self, *, dataset, projections). This one adds no parallelism: the
    decorated function is simply called in the current process with the
    arguments it was given.

    Args:
        func: The function to decorate.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        ``func`` and returns whatever ``func`` returns.
    """
    @wraps(func)
    def wrapper(*argv, **kwargs):
        # Propagate the result; previously it was silently discarded and the
        # decorated function always returned None.
        return func(*argv, **kwargs)
    return wrapper
def SLURM(func):
    """Decorator for parallelisation on SLURM.

    The wrapper does not call ``func`` in the current process. Instead it:

    1. pickles the parent object (first positional argument) together with
       the remaining keyword arguments into ``<cwd>/.pyphase/SLURM/``;
    2. writes a Python worker script that reloads that pickle and calls the
       undecorated function (``__wrapped__``) on a sub-range of projections
       given on its command line;
    3. writes a batch script that splits the inclusive range
       ``projections[0]..projections[1]`` into contiguous chunks and starts
       one ``srun`` per chunk;
    4. submits the batch script with ``sbatch`` and blocks until the job
       creates its ``.sig`` signal file.

    The names ``number_of_nodes``, ``number_of_cores``, ``memory_per_node``
    and ``min_memory_per_core`` must be available in module scope
    (presumably provided by the ``pyphase.config`` star import -- confirm).

    Args:
        func: Method to parallelise. It is called by the workers as
            ``obj.<name>.__wrapped__(obj, projections=[start, end], **kwargs)``.

    Returns:
        A wrapper taking the parent object as first positional argument and
        a mandatory ``projections`` keyword argument (``[first, last]``,
        both inclusive). The wrapper returns None.

    Raises:
        KeyError: If the ``projections`` keyword argument is missing.
        ValueError: If the projection range is empty or no CPU is available.
        RuntimeError: If the job could not be submitted with ``sbatch``.
    """
    @wraps(func)
    def wrapper(*argv, **kwargs):
        # Imported locally so this block does not depend on a new
        # module-level import.
        import subprocess

        print('SLURM')
        job_number = str(np.random.randint(0, 10**9))
        projections = kwargs.pop('projections')  # Necessary argument
        parent_object = argv[0]

        # Work out the job layout first so that invalid input fails before
        # any temporary file is written.
        # Use the number of CPU corresponding to memory needs
        number_of_CPU_per_node = min(
            number_of_nodes * number_of_cores,
            int(floor(memory_per_node / min_memory_per_core)))
        tasks = int(projections[1]) - int(projections[0]) + 1
        number_of_CPU = min(number_of_CPU_per_node * number_of_nodes, tasks)
        if number_of_CPU < 1:
            # Previously this surfaced as an opaque ZeroDivisionError.
            raise ValueError(
                "Nothing to distribute: {} projection(s) on {} CPU(s) per "
                "node".format(tasks, number_of_CPU_per_node))
        # Worker i handles start..start+step+(i<remainder), both inclusive,
        # i.e. tasks // number_of_CPU items plus one extra for the first
        # `remainder` workers.
        step = tasks // number_of_CPU - 1
        remainder = tasks % number_of_CPU

        current_dir = os.getcwd()
        tmpdir = '/.pyphase/SLURM/'
        work_dir = current_dir + tmpdir
        pathlib.Path(work_dir).mkdir(parents=True, exist_ok=True)

        # Serialise the call context for the workers. The pickle is written
        # and read back by this code only; never point the worker at a pickle
        # from an untrusted source.
        serialisation_path = work_dir + 'pyphase_' + job_number + '.pickle'
        with open(serialisation_path, 'wb') as f:
            pickle.dump([parent_object, kwargs], f,
                        protocol=pickle.HIGHEST_PROTOCOL)

        # Create a "worker" script that just calls the undecorated function
        worker_file_name = work_dir + 'pyphase_worker_' + job_number + '.py'
        worker_lines = [
            "import sys",
            "import dataset",
            "import phaseretrieval",
            "import pickle",
            "with open('{}', 'rb') as f:".format(serialisation_path),
            "    stream = pickle.load(f)",
            "projections = [int(sys.argv[1]), int(sys.argv[2])]",
            "print(str(sys.argv))",
            "stream[0].{}.__wrapped__(stream[0], projections=projections, "
            "**stream[1])".format(func.__name__),
        ]
        with open(worker_file_name, 'w') as worker_file:
            worker_file.write("\n".join(worker_lines) + "\n")

        # Create a SLURM batch file
        slurm_batch_file_name = work_dir + 'pyphase_slurm_' + job_number + '.sh'
        sigfile_path = work_dir + 'pyphase_' + job_number + '.sig'
        batch_lines = [
            # The loop below uses "(( ))" and "&>", which are bash
            # extensions, so the script must not be run by a plain POSIX sh.
            "#!/bin/bash",
            "#SBATCH -N {}".format(number_of_nodes),
            "#SBATCH --mem-per-cpu {}".format(min_memory_per_core),
            "#SBATCH --tasks-per-node {}".format(number_of_CPU_per_node),
            "#SBATCH -t 20:00:00",
            "#SBATCH -J pyphase",
            "#SBATCH -o {}pyphase_%j.out".format(work_dir),
            "#SBATCH -e {}pyphase_%j.out".format(work_dir),
            "cat $0",
            "export tasks={}".format(tasks),
            "export start={}".format(projections[0]),
            "export step={}".format(step),
            "export remainder={}".format(remainder),
            "for (( i=0; i<{}; i++ ))".format(number_of_CPU),
            "do",
            "export end=$(( start+step+(i<remainder) ))",
            "srun -Q --exclusive -n 1 -N 1 python {} $start $end &> "
            "{}pyphase_worker_${{SLURM_JOB_ID}}_${{i}} &".format(
                worker_file_name, work_dir),
            "echo \"python {} $start $end\"".format(worker_file_name),
            "export start=$(( end+1 ))",
            "sleep 1",
            "done",
            "wait",
            # The signal file tells the waiting parent process the job is done.
            "touch {}".format(sigfile_path),
        ]
        with open(slurm_batch_file_name, 'w') as slurm_batch_file:
            slurm_batch_file.write("\n".join(batch_lines) + "\n")

        # Submit. An argument list (no shell) keeps paths containing spaces
        # or shell metacharacters intact.
        submit_command = ["sbatch", slurm_batch_file_name]
        print("Executing: " + " ".join(submit_command))
        try:
            submission = subprocess.run(submit_command)
        except OSError as err:
            raise RuntimeError("Could not run sbatch: {}".format(err)) from err
        if submission.returncode != 0:
            # Without this check a failed submission would leave the loop
            # below waiting forever for a signal file that never appears.
            raise RuntimeError(
                "sbatch exited with status {}".format(submission.returncode))

        # Block until the batch job signals completion, then clean up.
        while not os.path.exists(sigfile_path):
            time.sleep(1)
        os.remove(sigfile_path)
        os.remove(serialisation_path)

    return wrapper
# pass
class OAR:
    """Legacy class for parallellisation on OAR. Will be reimplemented as decorator"""

    # operator -> (dataset attribute holding the output file prefix,
    #              keyword argument forwarded as a command line option)
    _OPERATORS = {
        'retrieve': ('phase_prefix', 'algorithm'),
        'retrieve_difference': ('update_prefix', 'algorithm'),
        'propagate': ('propagated_prefix', 'algorithm'),  # TODO:surely needs distance somehow?
        'difference': ('difference_prefix', 'distance'),
    }

    def __init__(self):
        # Number of lines written to the OAR array parameter file.
        self.cores = 100
        self.executable_name = 'pyphase'
        # The executable is looked up next to this source file.
        self.path = os.path.split(os.path.realpath(__file__))[0]
        self.executable = os.path.join(self.path, self.executable_name)

    def WriteOarFiles(self, DS):
        """Placeholder; does nothing."""
        pass

    def _command_parts(self, dataset, operator, options):
        """Return (file prefix, command prefix, command postfix) for operator."""
        try:
            prefix_attribute, option_name = self._OPERATORS[operator]
        except KeyError:
            raise ValueError("Unknown operator: {!r}".format(operator)) from None
        fname = getattr(dataset, prefix_attribute)
        command_prefix = '{} {}'.format(operator, dataset.name)
        command_postfix = ''
        if option_name in options:
            command_postfix += ' --{} {}'.format(option_name,
                                                 options.get(option_name))
        return fname, command_prefix, command_postfix

    @staticmethod
    def _report_progress(label, done, total):
        """Report progress via ``utilities.update`` when that module is in scope."""
        # ``import utilities`` is commented out at the top of this file, so
        # only use it if something else brought it into module scope.
        reporter = globals().get('utilities')
        if reporter is not None:
            reporter.update(label, done, total)
        else:
            print('{}: {}/{}'.format(label, done, total), end='\r')

    #TODO: Don't like this implementation at all. Should probably be a decorator?
    def Launch(self, dataset, operator, **kwargs):  # algorithm='', parameter='', distance=''
        """Submit ``operator`` on ``dataset`` as an OAR array job and wait for it.

        Writes ``<prefix>_.oar`` (job script) and ``<prefix>_.params`` (one
        command line per array element), deletes matching ``.edf`` files,
        submits the job with ``oarsub`` and then polls once per second until
        at least ``dataset.nbprojections`` matching ``.edf`` files exist.

        Args:
            dataset: Object providing ``name``, ``nbprojections`` and the
                prefix attribute for the operator (``phase_prefix``,
                ``update_prefix``, ``propagated_prefix`` or
                ``difference_prefix``).
            operator: One of ``'retrieve'``, ``'retrieve_difference'``,
                ``'propagate'`` or ``'difference'``.
            **kwargs: ``algorithm`` is forwarded as ``--algorithm`` for all
                operators except ``'difference'``, which forwards
                ``distance`` as ``--distance``. ``distance`` is also part of
                the ``.edf`` file name pattern for ``'difference'`` and
                ``'propagate'``.

        Raises:
            ValueError: If ``operator`` is not one of the supported names.
            RuntimeError: If the job could not be submitted with ``oarsub``.
        """
        # Imported locally so this block does not depend on a new
        # module-level import.
        import subprocess

        #TODO: Don't like this solution at all, needs reimplementation... this is not elegant
        #TODO: standardised file names...oars
        fname, command_prefix, command_postfix = self._command_parts(
            dataset, operator, kwargs)
        oar_script = fname + '_.oar'
        params_file = fname + '_.params'

        if os.path.exists(oar_script):
            os.remove(oar_script)
        with open(oar_script, 'w') as oar_file:
            print("#!/bin/bash", file=oar_file)
            print("#OAR -l {mem_core_mb>=8000}/core=1,walltime=24", file=oar_file)
            print("#OAR -l {mem_core_mb>=4000 and mem_core_mb <8000}/nodes=1/core=2,walltime=48", file=oar_file)
            print("#OAR --array-param-file {}".format(params_file), file=oar_file)
            print("#OAR --name {}".format(dataset.name), file=oar_file)
            print("#OAR --type besteffort", file=oar_file)
            print("#OAR --type idempotent", file=oar_file)
            print("{} $@".format(self.executable), file=oar_file)

        if os.path.exists(params_file):
            os.remove(params_file)
        with open(params_file, 'w') as oar_file:
            # NOTE(review): when nbprojections is not a multiple of
            # self.cores the last intervals extend past nbprojections --
            # confirm that the executable clamps the range.
            interval = ceil(dataset.nbprojections / self.cores)
            for x in range(self.cores):
                print(command_prefix
                      + ' -p {} {} '.format(x * interval, (x + 1) * interval)
                      + command_postfix, file=oar_file)

        # Output files of these operators carry the distance in their name.
        if operator in ('difference', 'propagate'):
            distance = str(kwargs.get('distance'))
            edf_pattern = fname + '_' + distance + "_????.edf"
            label = operator.capitalize() + ' Distance ' + distance
        else:
            edf_pattern = fname + "_????.edf"
            label = operator.capitalize()

        # remove edf files before launching
        for file in glob.glob(edf_pattern):
            os.remove(file)

        os.chmod(oar_script, 0o777)
        os.chmod(params_file, 0o777)

        # Submit. An argument list (no shell) keeps prefixes containing
        # spaces or shell metacharacters intact.
        print("Executing: " + "oarsub -S " + oar_script + " > /dev/null")
        try:
            submission = subprocess.run(["oarsub", "-S", oar_script],
                                        stdout=subprocess.DEVNULL)
        except OSError as err:
            raise RuntimeError("Could not run oarsub: {}".format(err)) from err
        if submission.returncode != 0:
            # Without this check a failed submission would leave the loop
            # below waiting forever for files that never appear.
            raise RuntimeError(
                "oarsub exited with status {}".format(submission.returncode))

        # wait for job to finish (track number of edf's)
        #TODO: Needs proper file name handling
        nbprojections = dataset.nbprojections
        edfCounter = 0
        while edfCounter < nbprojections:
            self._report_progress(label, edfCounter, nbprojections)
            edfCounter = len(glob.glob(edf_pattern))
            time.sleep(1)
        self._report_progress(label, edfCounter, nbprojections)
# pass