Source code for dataset

#This file is part of the PyPhase software.alpha
#
#Copyright (c) Max Langer (2019) 
#
#max.langer@creatis.insa-lyon.fr
#
#This software is a computer program whose purpose is to allow development,
#implementation, and deployment of phase retrieval algorihtms.
#
#This software is governed by the CeCILL  license under French law and
#abiding by the rules of distribution of free software.  You can  use, 
#modify and/ or redistribute the software under the terms of the CeCILL
#license as circulated by CEA, CNRS and INRIA at the following URL
#"http://www.cecill.info". 
#
#As a counterpart to the access to the source code and  rights to copy,
#modify and redistribute granted by the license, users are provided only
#with a limited warranty  and the software's author,  the holder of the
#economic rights,  and the successive licensors  have only  limited
#liability. 
#
#In this respect, the user's attention is drawn to the risks associated
#with loading,  using,  modifying and/or developing or reproducing the
#software by the user in light of its specific status of free software,
#that may mean  that it is complicated to manipulate,  and  that  also
#therefore means  that it is reserved for developers  and  experienced
#professionals having in-depth computer knowledge. Users are therefore
#encouraged to load and test the software's suitability as regards their
#requirements in conditions enabling the security of their systems and/or 
#data to be ensured and,  more generally, to use and operate it in the 
#same conditions as regards security. 
#
#The fact that you are presently reading this means that you have had
#knowledge of the CeCILL license and that you accept its terms.

import os
import os.path
import sys
import numpy as np
import yaml
import re
#from sortedcontainers import SortedList
from vendor.EdfFile import EdfFile
import pickle
from math import *
import time
from xml.dom import minidom
import matplotlib.pyplot as pyplot
import scipy.ndimage as ndimage
from pathlib import Path
import h5py
if os.name == 'posix':
    import fcntl #TODO:from importlib import reload # python 2.7 does not require this
from pyphase.config import *
import vendor.pyelastix as pyelastix

[docs]class Dataset(object): """ Abstract class representing a set of recorded images, acquisition parameters, reconstructed images and any intermediary images. Attributes ---------- """ def __init__(self, name: str, path: str='.', version: str='master'): """ Initialisation for all Dataset objects. Parameters ---------- name : str Name of the dataset. Corresponds to the name of the file(s) or directory(ies) of the recorded data. path : str, optional Path to the file(s) or directory(ies) of the recorded data version : str, optional. Version of dataset file. To permit tests of different alignment and reconstruction parameters. """ super().__init__(name, path, version) # Necessary for multiple inheritance to work ## Mandatory parameters that will not necessarily exist in source data goes here self.reference_interval = 1 self.reference_position = 1 if self.backend_initialized: #TODO: How should this be done... shouldn front end parameters always be read? and only read backend parameters if they exist/if they have been modified? self.read_parameters_backend() else: self.read_parameters_frontend() self.initialize_backend() # self.z2 = self.ztot - self.z1 # Sample to detector distance #TODO: Would be nice to check which z1 or z2 is defined # self.effective_distance = self.z1*self.z2 / (self.z1+self.z2) #TODO: also needs x/y component? ItÅ› actually this that should be saved? self.position = np.arange(len(self.effective_distance)) self.magnification_x = self.pixel_size_x[0] / self.pixel_size_x #TODO REFACTOR: This can't go here as it's not the same order for ESRF and NanoMax self.magnification_y = self.pixel_size_x[0] / self.pixel_size_x #TODO REFACTOR: this is relative magnification wrt highest resolution! self.pixel_size = np.array([self.pixel_size_x[0], self.pixel_size_y[0]]) self.correct_alignment = True self.aligned = False # TODO: inelegant solution, and should be bool # self.projection_prefix = '/data/cpr/' # self.phase_prefix = '/phase/' #Refactor: remove (moved to backend) # self.attenuation_prefix = '/attenuation/' #Refactor: remove (moved to backend) # self.reference_position = 0 Refactor: moved to Dataset # self.energy = 13 #TODO: hard coded? Not provided in nanomax datafiles? OK, change by hand by user ftm self.path = path self.version = version self.name = name # if backend_name == 'HDF5': #TODO: Quick and dirty solution, should be done much more elegant eg with a mapping and register decorators # self.backend = HDF5(name, path, version) # elif backend_name == 'EDF': # self.backend = EDF(name, path, version) # self.nx = 2048 # self.ny = 2048 self.padding = 2 self.detector_pixel_size = 0 # self.reference_position = 0 self.alpha = [-8, -10] #TODO: HOw should this be handled really? The reconstruction parameters (alg, params, ... should definitely be in dataset, associated to a version even!) # def _initialise(self): # self.motion_corrected = 0 # self.aligned = 0 # # TODO: Should erase files also # self.populate() # self._write_parameter_file() @property def nfx(self): return self.padding * self.nx @property def nfy(self): return self.padding * self.ny @property def Lambda(self): """Wavelength in m, calculated from energy.""" return 12.4e-10 / self.energy
[docs] def align_projection(self, *, projection=0, save_projection=False): """ Aligns images at different positions at one projection angle Aligns (registers) images taken at different positions using the default registration algorithm (pyphase.registrator). Parameters ---------- projection : int, optional Number of projection to register. save_projection : bool, optional Save aligned images to file (creates new data files) """ #%% Images read should not be aligned if alignent exist. # TODO: necessary? correct_alignment = self.correct_alignment self.correct_alignment = 0 #%% Read intensities I_D = np.zeros((len(self.position), self.ny, self.nx)) for N in self.position: I_D[N]= self.get_projection(projection=projection, position=N, aligned=False, pad=False) #%% register images alignment = np.zeros((len(self.position),registrator.number_of_parameters)) #TODO: check existance for N in self.position: if N != self.reference_position: print('Aligning position '+ str(N)) field, im1reg, transform_parameters = registrator.register(I_D[N], I_D[self.reference_position]) alignment[N] = transform_parameters else: im1reg = I_D[self.reference_position] #TODO: Maybe not the safest to take im1reg but rather transform image? if save_projection: #TODO: This should be generalised somehow I suspect aligned_filename = self.path + '/' + self.name + '_aligned' + '_dist' + str(N) + '.h5' with h5py.File(aligned_filename, 'a') as f: if not 'data' in f.keys(): f.create_group(self.cpr_prefix) if str(projection).zfill(6) in f[self.cpr_prefix].keys(): f[self.cpr_prefix+str(projection).zfill(6)][:] = im1reg else: f.create_dataset(self.cpr_prefix+str(projection).zfill(6), data=im1reg) #%% Store alignment #TODO: Check for length of trasnform parameterrs #TODO: Quick and dirty implementation wait for write access parallel while True: try: with h5py.File(self.dataset_filename, 'a') as f: if 'alignment' in f.keys(): f['alignment'][projection] = alignment else: f.create_dataset('alignment', data=np.zeros((self.number_of_projections, len(self.position), registrator.number_of_parameters))) f['alignment'][projection] = alignment break # Success! except OSError: time.sleep(1) # Wait a bit self.correct_alignment = correct_alignment
[docs] def get_alignment(self, *, projection, position): """ Get transform parameters for alignment. Returns ------- transform_parameters : numpy array Array of transform parameters. Size depends on transform used. """ with h5py.File(self.dataset_filename, 'r') as f: transform_parameters = f['alignment'][projection, position, :] return transform_parameters
[docs] @Parallelize # TODO:How to get to reload from config? def align_projections(self, *, projections): """ Align a series of projections. Parameters ---------- projections : tuple of ints In the form [start, end] """ for projection in range(projections[0], projections[1]+1): self.align_projection(projection=projection)
########## BACKEND ###########
[docs]class Backend(): def __init__(self, name: str, path: str='.', version: str='master'): super().__init__() self.path = path self.version = version self.name = name
[docs]class HDF5(Backend): def __init__(self, name: str, path: str='.', version: str='master'): super().__init__(name, path, version) self.dataset_filename = pyphase_path / (self.name + '_' + self.version + '.h5') #TODO: Attribute? self.phase_prefix = '/phase/' self.attenuation_prefix = '/attenuation/' self.energy = 13 # keV TODO: Energy must be communicated in the original data!
[docs] def read_parameters_backend(self): with h5py.File(self.dataset_filename, 'a') as f: if 'scan_parameters' in f.keys(): #TODO: initialise must override reading of parameter file #self.ReadParameters() data = f['scan_parameters'] self.energy = data['energy'][()] self.z1 = data['z1'][()] self.pixel_size_x = data['pixel_size_x'][()] self.pixel_size_y = data['pixel_size_y'][()] self.number_of_projections = data['number_of_projections'][()] self.reference_interval = data['reference_interval'][()] self.effective_distance = data['effective_distance'][()] self.nx = data[('nx')][()] self.ny = data[('ny')][()] if 'alignment' in f.keys(): self.aligned = True
[docs] def initialize_backend(self): #self.Initialise() #TODO: Stub. Should take values from the data h5 file #TODO: Needs to be updated so that variables z1, z2, ztot, mag, pixsize, eff dist are interdependent with h5py.File(self.dataset_filename, 'a') as f: g = f.create_group('scan_parameters') g.create_dataset('energy', data=self.energy) g.create_dataset('z1', data=self.z1) g.create_dataset('pixel_size_x', data=self.pixel_size_x) g.create_dataset('pixel_size_y', data=self.pixel_size_y) g.create_dataset('number_of_projections', data=self.number_of_projections) g.create_dataset('reference_interval', data=self.reference_interval) g.create_dataset('effective_distance', data=self.effective_distance) g.create_dataset('nx', data=self.nx) g.create_dataset('ny', data=self.ny)
@property def backend_initialized(self): return self.dataset_filename.exists()
[docs] def get_image(self, *, projection, image_type='phase', Fourier=False, pad=False): """Get reconstructed images (phase by default) Parameters ---------- projection : int Number of the projection. image_type : str, optional Can be phase or attenuation. Fourier : bool Return Fourier transform of image. pad : bool Return padded image. Returns ------- image : ndarray """ #TODO: handling majuscule minuscule # if 'distance' in kwargs: # distance = kwargs.get('distance') #TODO: Needs error handling if image_type == 'attenuation': prefix = self.attenuation_prefix # elif 'propagated' in argv: # fname = self.propagated_prefix.as_posix()+'_'+str(distance)+'_'+str(projection).zfill(4)+'.edf' # elif 'difference' in argv: # fname = self.difference_prefix.as_posix()+'_'+str(distance)+'_'+str(projection).zfill(4)+'.edf' # elif 'prior' in argv: # fname = self.prior_forward_prefix.as_posix()+('_'+str(projection).zfill(4)+'.edf') elif image_type == 'phase': prefix = self.phase_prefix #TODO: separate file type... elif image_type == 'dark': with h5py.File(self.dataset_filename, 'r') as dark: image = dark['darkfield'][:] return image elif image_type == 'flat': with h5py.File(self.dataset_filename, 'r') as flat: image = flat['flatfield'][:] return image with h5py.File(self.dataset_filename, 'r') as f: image=f[prefix+str(projection).zfill(6)][:] if Fourier: #TODO: Refactor into an FT function in utilities (f ex) return np.fft.fft2(Utilities.resize(image, (self.nfx, self.nfy))) elif pad: image = Utilities.resize(image, (self.nfx, self.nfy)) return image
[docs] def write_image(self, *, image, projection, projection_type='phase'): # TODO: raise error where needed! number of args, args type... ''' Saves images into file. Parameters ---------- image : ndarray An ndarray of the image to save. projection : int The number of the projection to be saved. proj_type : str A string containing the prefix for the name of the file to be saved. position: int Number of the position of the projection (for propagation). Returns ------- None Saves the images into the file ``[prefix]_[projection_number].edf`` or ``[prefix]_[distance]_[projection_number].edf`` into the ``name_`` directory. ''' if projection_type.lower() == 'phase': prefix = self.phase_prefix elif projection_type.lower() == 'attenuation': prefix = self.attenuation_prefix # elif proj_type.lower() == 'phase update': # pref = self.update_prefix # elif proj_type.lower() == 'attenuation update': # pref = self.attenuation_update_prefix # elif proj_type.lower() == 'intensity': # pref = self.propagated_prefix elif projection_type.lower() == 'flatfield': with h5py.File(self.dataset_filename, 'a') as d: data = d.require_dataset('flatfield', shape=image.shape, dtype=image.dtype) data[:] = image elif projection_type.lower() == 'darkfield': with h5py.File(self.dataset_filename, 'a') as f: data = f.require_dataset('darkfield', shape=image.shape, dtype= image.dtype) data[:] = image else: raise TypeError("proj_type must be one of the following: 'phase', 'attenuation', 'phase update', 'attenuation update', 'intensity'" ) if projection_type.lower() == 'phase' or projection_type == 'attenuation': projection_name = str(projection).zfill(6) # TODO: Initial file locking implementation for parallelisation on LINUX/SLURM lock_filename = self.dataset_filename.with_suffix(self.dataset_filename.suffix+'.lock') while True: try: lock_file = open(lock_filename, 'a') # Create a lock file next to dataset file (TODO: Consider making invisible at least) if os.name == 'posix': #TODO: quick fix for lock file. Shouldn be necessary on windows due to no such parallelism? WHat about future multithread? fcntl.lockf(lock_file, fcntl.LOCK_EX) # Acquire a lockf lock (holds inter-process but not intra?) with h5py.File(self.dataset_filename, 'a') as f: data = f.require_dataset(prefix+projection_name, shape=image.shape, dtype=image.dtype) data[:] = image lock_file.close() # Release lock break except OSError: print("File already locked, waiting for access.") time.sleep(1) # Wait a bit
[docs] def write_flat(self, flatfield, position): """Writes a tensor of flat field images to file""" print("STUB: Writing flat") while True: try: with h5py.File(self.dataset_filename, 'r') as dataset_file: dataset_flat = dataset_file.require_dataset(self.flatfield_prefix + str(position).zfill(6), flatfield.shape, flatfield.dtype) dataset_flat[:] = flatfield break except OSError: print("File already locked, waiting for access.") time.sleep(1) # Wait a bit
[docs] def write_dark(self, darkfield, position): """Writes a tensor of dark images to file""" print("STUB: writing dark") while True: try: with h5py.File(self.dataset_filename, 'r') as dataset_file: dataset_dark = dataset_file.require_dataset(self.darkfield_prefix + str(position).zfill(6), darkfield.shape, darkfield.dtype) dataset_dark[:] = darkfield break except OSError: print("File already locked, waiting for access.") time.sleep(1) # Wait a bit
@property def preprocessing_initialised(self): print("STUB: preprocessing initialised?") return False
[docs]class EDF(Backend): def __init__(self, name: str, path: str='.', version: str='master'): super().__init__(name, path, version) self.parameter_filename = 'parameters.yml' #TODO: OK, this should not be done with an .yml file, in the EDF backend, this should be done with ESRF style .info/.xml self.dataset_parameter_filename = self.path / pyphase_path / name / self.version / self.parameter_filename self.result_prefix = self.path / pyphase_path / self.name / self.version # Phase self.phase_tag='phase' self.phase_name = self.name+'_'+self.version # TODO necessary? Good to mark data set & version in file names? self.phase_prefix = self.result_prefix / self.phase_tag / self.phase_name # retrieved attenuation self.attenuation_tag='attenuation' self.attenuation_name = self.phase_name+'_'+self.attenuation_tag self.attenuation_prefix = self.result_prefix / self.attenuation_tag / self.attenuation_name
[docs] def initialize_backend(self): #TODO: initialise must override reading of parameter file self.dataset_parameter_filename.parents[0].mkdir(parents=True) self.dataset_parameter_filename.touch() self._write_parameter_file() pass
def _write_parameter_file(self): """Write current state to the associated parameter file.""" #TODO: What really needs to go here? Everything necessary for parallel exeution, and everything that can be modified? But yml is appropriate. The question is if par files should be written? But this could be a separate script? #TODO: Number of projections is fucked up, it has to be corrected with the number of movement correction images (this is known a priori) parameters = dict(reference_position=self.reference_position, nD=self.nD, number_of_projections=self.number_of_projections, position_number=self.position_number, #z2=self.z2.tolist(), #ztot=self.ztot, effective_distance=self.effective_distance.tolist(), energy=self.energy, #detector_pixel_size=self.detector_pixel_size, pixel_size_x = self.pixel_size_x.tolist(), pixel_size_y = self.pixel_size_y.tolist(), nx=self.nx, ny=self.ny, #axis_position=self.axis_position.tolist(), reference_interval=self.reference_interval, #scan_range=self.scan_range, version=self.version) #alpha=self.alpha.tolist(), #delta_beta=self.delta_beta) with open(self.dataset_parameter_filename, 'w') as f: yaml.dump(parameters, f) pass @property def backend_initialized(self): return self.dataset_parameter_filename.exists()
[docs] def read_parameters_backend(self): """Reads the associated backend parameter file.""" with open(self.dataset_parameter_filename, 'r') as f: parameters = yaml.safe_load(f) self.reference_position=parameters['reference_position'] self.nD=parameters['nD'] self.number_of_projections=parameters['number_of_projections'] self.position_number=parameters['position_number'] self.effective_distance=np.array(parameters['effective_distance']) #self.z1=np.array(parameters['z1']) #self.distance_source_sample=parameters['dss'] self.energy=parameters['energy'] #self.detector_pixel_size=parameters['detector_pixel_size'] self.pixel_size_x=np.array(parameters['pixel_size_x']) self.pixel_size_y=np.array(parameters['pixel_size_y']) self.nx=parameters['nx'] self.ny=parameters['ny'] #self.axis_position=np.array(parameters['axis_position']) self.reference_interval=parameters['reference_interval'] #self.scan_range=parameters['scan_range'] self.version=parameters['version'] #self.alpha=np.array(parameters['alpha']) #self.delta_beta=parameters['delta_beta'] # TODO: If exists shift.pickle, assume aligned? pass
def get_image(self, *, projection, image_type='phase', Fourier=False, pad=False): """Get reconstructed images (phase by default) Parameters ---------- projection : int Number of the projection. image_type : str, optional Can be phase or attenuation. Fourier : bool Return Fourier transform of image. pad : bool Return padded image. Returns ------- image : ndarray """ if image_type.lower() == 'attenuation': fname = self.attenuation_prefix.as_posix()+'_'+str(projection).zfill(4)+'.edf' elif image_type.lower() == 'phase': fname = self.phase_prefix.as_posix()+'_'+str(projection).zfill(4)+'.edf' #TODO: separate file type... else: raise TypeError("proj_type must be one of the following: 'phase', 'attenuation'" ) image = EdfFile(fname).GetData(0) xpad = (self.nfx-self.nx)//2 ypad = (self.nfy-self.ny)//2 if Fourier: #TODO: Refactor into an FT function in utilities (f ex) return np.fft.fft2(np.pad(image, ((ypad, ypad), (xpad, xpad)), 'edge')) else: if pad: image = np.pad(image, ((ypad, ypad), (xpad, xpad)), 'edge') return image
[docs] def write_image(self, *, image, projection, projection_type='phase'): ''' Saves images into file. Parameters ---------- data : ndarray An ndarray of the image to save. projection_number : int The number of the projection to be saved. proj_type : str A string containing the prefix for the name of the file to be saved. args: int Number of the distance of the projection. Returns ------- None Saves the images into the file ``[prefix]_[projection_number].edf`` or ``[prefix]_[distance]_[projection_number].edf`` into the ``name_`` directory. ''' # TODO: raise error where needed! number of args, args type... if projection_type.lower() == 'phase': pref = self.phase_prefix elif projection_type.lower() == 'attenuation': pref = self.attenuation_prefix # elif projection_type.lower() == 'phase update': # pref = self.update_prefix # elif projection_type.lower() == 'attenuation update': # pref = self.attenuation_update_prefix # elif projection_type.lower() == 'intensity': # pref = self.propagated_prefix else: raise TypeError("proj_type must be one of the following: 'phase', 'attenuation'" ) if not pref.parent.exists(): pref.parent.mkdir(parents=True) # if len(args) != 0: # distance = str(args[0]) + '_' # else: # distance = '' # fname = self.path + '/' + (self.name + '_1_/' + self.name + '_1_0000.edf') # TODO: Fulhack DeLuxe (tm) imEDF = EdfFile(fname) EDFHeader = imEDF.GetHeader(0) #fname = pref.as_posix() + '_' + position + str(projection).zfill(4) + '.edf' fname = pref.as_posix() + '_' + str(projection).zfill(4) + '.edf' EDF = EdfFile(fname) EDF.WriteImage(EDFHeader, image, 0, "Float")
[docs] def get_image(self, *, projection, image_type='phase', Fourier=False, pad=True): """Get reconstructed images (phase by default) Parameters ---------- projection : int Number of the projection. image_type : str, optional Can be phase or attenuation. Fourier : bool Return Fourier transform of image. pad : bool Return padded image. Returns ------- image : ndarray """ if image_type.lower() == 'attenuation': fname = self.attenuation_prefix.as_posix()+'_'+str(projection).zfill(4)+'.edf' # elif 'propagated' in argv: # fname = self.propagated_prefix.as_posix()+'_'+str(distance)+'_'+str(projection).zfill(4)+'.edf' # elif 'difference' in argv: # fname = self.difference_prefix.as_posix()+'_'+str(distance)+'_'+str(projection).zfill(4)+'.edf' # elif 'prior' in argv: # fname = self.prior_forward_prefix.as_posix()+('_'+str(projection).zfill(4)+'.edf') elif image_type.lower() == 'phase': fname = self.phase_prefix.as_posix()+'_'+str(projection).zfill(4)+'.edf' #TODO: separate file type... else: raise TypeError("proj_type must be one of the following: 'phase', 'attenuation'" ) image = EdfFile(fname).GetData(0) xpad = (self.nfx-self.nx)//2 ypad = (self.nfy-self.ny)//2 if Fourier: #TODO: Refactor into an FT function in utilities (f ex) return np.fft.fft2(np.pad(image, ((ypad, ypad), (xpad, xpad)), 'edge')) else: if pad: image = np.pad(image, ((ypad, ypad), (xpad, xpad)), 'edge') return image
#backend=EDF #TODO: How do I get this into config... from dataset import * and __all__? backend = HDF5
[docs]class Nanomax(Dataset, backend): """Class for raw Nanomax data sets. Attributes ---------- ztot : float Focus to detector distance in m. path : str Path to dataset. version : str Version of Dataset backend structure, e.g. 'TIEHOM' name : str Name of dataset frames_per_position : int Number of frames per position. initial_frame : int Number of the initial frame. projection_prefix : str HDF5 path to projection data. reference_position : int Position number as reference for alignment (usually highest resolution) diode_name : str Name of diode (HDF path) for normalisation darkfield_prefix : str HDF5 path prefix to darkfields flatfield_prefix : str HDF5 path prefix to flatfields cpr_prefix : str HDF5 path prefix to flatfield corrected images phase_prefix : str HDF5 path prefix to phase maps attenuation_prefix : str HDF5 path prefix to attenuation images aligned : int Flag if dataset is aligned or not. """ def __init__(self, name: str,path: str='.', version: str= 'master'): #TODO: Create anoter intermediate class Nanomax, declinasions NanomaxRaw, NanomaxPreprocessed... self.ztot = 1.123 # Focus to detector distance in m, current NanoMax setup not given in data files atm self.pixel_size_filename = path + '/' 'pixelsizes.txt' self.data_basename = path + '/' + name + '/' #TODO: Find appropriate place for this... # self.path = path # self.version = version # self.name = name self.frames_per_position = 4 self.initial_frame = 0 self.projection_prefix = 'entry/measurement/andor/frames/' self.reference_position = 0 self.correct_alignment = 1 self.measurement_prefix = 'entry/snapshot/' self.pixel_size_filename = path + '/' 'pixelsizes.txt' self.diode_name = 'entry/measurement/alba2/1' self.darkfield_prefix = 'reference_images/darkfield/' self.flatfield_prefix = 'reference_images/flatfield/' self.cpr_prefix = 'data/cpr/' # self.phase_prefix = '/phase/' # self.attenuation_prefix = '/attenuation/' self.aligned=False super().__init__(name, path, version) if not self.preprocessing_initialised: # Calculate darks & flats print('Calculating darkfield and flatfield images') for position in range(len(self.effective_distance)): file_base_number = self.initial_frame + position*self.frames_per_position # Calculate darks dark_data_filename = self.data_basename + str(file_base_number).zfill(6) + '.h5' with h5py.File(dark_data_filename, 'r') as data_file: darkfields = data_file[self.projection_prefix][:] weights = data_file[self.diode_name][:] # Read diode measurements darkfield = np.average(darkfields,axis=0,weights=weights) self.write_dark(darkfield, position) # Calculate flats # TODO: not cleanly implemented wrt intermediate flats (but YAGNI) flat_beginning_data_filename = self.data_basename + str(file_base_number+1).zfill(6) + '.h5' flatfield = np.zeros((2, self.ny, self.nx)) # TODO: Adapt to flexible number of flats (but YAGNI) with h5py.File(flat_beginning_data_filename, 'r') as data_file: flatfields = data_file[self.projection_prefix][:] weights = data_file[self.diode_name][:] flatfield[0] = np.average(flatfields, axis=0, weights=weights) flat_end_data_filename = self.data_basename + str(file_base_number+3).zfill(6) + '.h5' with h5py.File(flat_end_data_filename, 'r') as data_file: flatfields = data_file[self.projection_prefix][:] weights = data_file[self.diode_name][:] flatfield[1] = np.average(flatfields, axis=0, weights=weights) #dataset_flat = dataset_file.require_dataset(self.flatfield_prefix + str(position).zfill(6), flatfield.shape, flatfield.dtype) #dataset_flat[:] = flatfield self.write_flat(flatfield, position)
[docs] def read_parameters_frontend(self): #TODO: STub. Should take values from the data h5 file with open(self.pixel_size_filename) as pixel_size_file: lines = pixel_size_file.readlines() del lines[0] # remove header line self.nD = len(lines) self.pixel_size_x = np.zeros(len(lines)) self.pixel_size_y = np.zeros_like(self.pixel_size_x) self.z1 = np.zeros_like(self.pixel_size_x) # Focus to sample distance (given in um in text files) for n, line in enumerate(lines): line = line.split() self.z1[n] = float(line[0])/1e6 # Focus to sample distance (given in um in text files) self.pixel_size_x[n] = float(line[1]) self.pixel_size_y[n] = float(line[2]) pass # Read parameters from data file projection_data_filename = self.data_basename + str(self.initial_frame+2).zfill(6) + '.h5' with h5py.File(projection_data_filename, 'r') as data_file: self.energy = float(data_file[self.measurement_prefix+'energy'][:]) / 1e3 # eV -> keV self.number_of_projections = len(data_file[self.projection_prefix]) self.nx = data_file[self.projection_prefix][:].shape[2] self.ny = data_file[self.projection_prefix][:].shape[1] self.z2 = self.ztot - self.z1 # Sample to detector distance #TODO: Would be nice to check which z1 or z2 is defined self.effective_distance = self.z1*self.z2 / (self.z1+self.z2) #TODO: also needs x/y component? ItÅ› actually this that should be saved?
[docs] def get_projection(self, *, projection, position, pad=True, magnification=True, aligned=True, Fourier=False): """ Read one recorded image. Parameters ---------- projection : int Number of projection to read. position : int Number of positiion ("distance") to read. pad : bool, optional Pads the image. magnification : bool, optional Brings the image to the magnification of reference_position. aligned : bool, optional Corrects alignment (requires alignment of projections). Fourier : bool Returns the Fourier transform of the image. """ # TODO: consider renaming just projection() to get data.projection(1,23) # Nanomax data: frame inital_frame=dark, initial_frame+1=flat_beginning, initial_frame+2=projections, initial_frame+3=flat_end projection_name = str(projection).zfill(6) position_name = str(position).zfill(6) #TODO: Quick and nasty concession to parallel computing while True: try: with h5py.File(self.dataset_filename, 'r') as f: dark_image = f[self.darkfield_prefix+position_name][:] # Read projection flatfield_image = f[self.flatfield_prefix+position_name][:] # Read projection break # Success! except OSError: time.sleep(1) # Wait a bit file_base_number = self.initial_frame + position*self.frames_per_position + 2 #TODO: Quick and nasty concession to parallel computing while True: try: with h5py.File(self.data_basename + str(file_base_number).zfill(6) + '.h5', 'r') as f: projection_image = f[self.projection_prefix][projection] # Read projection break # Success! except OSError: time.sleep(1) # Wait a bit reference_index_1 = (self.reference_interval * (projection // self.reference_interval)) reference_index_2 = min(reference_index_1+self.reference_interval, self.number_of_projections) # Weights flat fields weight_1 = (reference_index_2-projection)/max(reference_index_2-reference_index_1, 1) weight_2 = 1 - weight_1 # Calculate flat field reference_image = weight_1*flatfield_image[0] + weight_2*flatfield_image[1] # Calculate flat and dark field corrected projecion projection_image = (projection_image-dark_image) / (reference_image-dark_image) # Zoom images to same scale, corresponding to highest magnification if position != self.reference_position and magnification: #TODO: should implement distance_number, but YAGN? projection_image = ndimage.zoom(projection_image, [1/self.magnification_y[position], 1/self.magnification_x[position]], mode='nearest') # Zoom to highest magnification TODO: combine w/ alignment? # Correct alignment if aligned and self.aligned and position != self.reference_position: transform_parameters = self.get_alignment(projection=projection, position=position) # Get alignment parameters TODO: add zoom factor here? projection_image = registrator.apply_transformation(projection_image, transform_parameters) if Fourier: projection_image = Utilities.resize(projection_image, (self.nfy, self.nfx)) return np.fft.fft2(projection_image) else: if pad: return Utilities.resize(projection_image, (self.nfy, self.nfx)) else: return Utilities.resize(projection_image, (self.ny, self.nx))
[docs]class Elettra(Dataset, backend): """Class for raw Elettra Synchrotron data sets. Attributes ---------- ztot : float Focus to detector distance in m. path : str Path to dataset. version : str Version of Dataset backend structure, e.g. 'TIEHOM' name : str Name of dataset projection_prefix : str HDF5 path to projection data. reference_position : int Position number as reference for alignment (usually highest resolution) darkfield_prefix : str HDF5 path prefix to darkfields flatfield_prefix : str HDF5 path prefix to flatfields aligned : int Flag if dataset is aligned or not. """ def __init__(self, name: str, path: str = '.', version: str = 'ḿaster'): self.path = path self.version = version self.name = name self.data_basename = self.path + '/' + self.name # TODO: Attribute? self.dataset_filename = pyphase_path / (self.name + '_' + self.version + '.tdf') # TODO: Attribute? self.projection_prefix = 'exchange/data' self.correct_alignment = 1 self.ztot = 10 # Focus to detector distance in m #self.z2 = 'entry/instrument/detector/distance/' # Sample to detector distance in mm self.measurement_prefix = 'provenance/detector_output/' self.pixel_size_filename = path + '/' 'pixelsize.txt' self.darkfield_prefix = 'exchange/data_dark/' self.flatfield_prefix = 'exchange/data_white/' self.aligned = 0 # TODO: Should be bool super().__init__(name, path, version) self.reference_position = 0
[docs] def read_parameters_frontend(self): if os.path.isfile(self.pixel_size_filename): print('Text file with scan parameters found!') # self.Initialise() # TODO: STub. Should take values from the data h5 file with open(self.pixel_size_filename) as pixel_size_file: lines = pixel_size_file.readlines() del lines[0] # remove header line self.nD = len(lines) self.position_number = np.arange(1, self.nD + 1).tolist() self.pixel_size_x = np.zeros(len(lines)) self.pixel_size_y = np.zeros_like(self.pixel_size_x) self.z2 = np.zeros_like(self.pixel_size_x) for n, line in enumerate(lines): line = line.split() self.pixel_size_x[n] = float(line[1]) self.pixel_size_y[n] = float(line[2]) self.z2[n] = float(line[0]) self.energy = float(line[3]) pass else: print('No text file with scan parameters found!\nEnter required parameters:\n') self.energy = float(input('Insert energy in KeV: ')) self.pixel_size_x = float(input('Insert pixel size x in um: ')) self.pixel_size_y = float(input('Insert pixel size y in um: ')) self.z2 = float(input('Insert Sample-Detector distance in cm: ')) self.z2 = self.z2 / 1e3 #z2 in m self.z1 = self.ztot - self.z2 # Fucus to sample distance in m self.effective_distance = self.z1 * self.z2 / (self.z1 + self.z2) # Read parameters from data file projection_data_filename = self.data_basename + '.tdf' with h5py.File(projection_data_filename, 'r') as data_file: self.number_of_projections = data_file[self.projection_prefix].shape[1] self.nx = data_file[self.projection_prefix].shape[2] self.ny = data_file[self.projection_prefix].shape[0] # Calculate dark and flat references dark_images = data_file[self.darkfield_prefix][:] flat_images = data_file[self.flatfield_prefix][:] dark_image = np.median(dark_images, axis=1) flat_image = np.median(flat_images, axis=1) HDF5.write_image(self, image=dark_image, projection=0, projection_type='darkfield') HDF5.write_image(self, image=flat_image, projection=0, projection_type='flatfield')
[docs] def get_projection(self, *, projection, position=0, pad=True, magnification=True, aligned=True, Fourier=False): """ Read one recorded image. Parameters ---------- projection : int Number of projection to read. position : int Number of positiion ("distance") to read. pad : bool, optional Pads the image. magnification : bool, optional Brings the image to the magnification of reference_position. aligned : bool, optional Corrects alignment (requires alignment of projections). Fourier : bool Returns the Fourier transform of the image. """ while True: try: with h5py.File(self.data_basename + '.tdf', 'r') as f: projection_image = f[self.projection_prefix][:, projection, :] # Read projection break # Success! except OSError: time.sleep(1) # Wait a bit flat_image = self.get_image(projection=0, image_type='flat') dark_image = self.get_image(projection=0, image_type='dark') projection_image = (projection_image-dark_image) / (flat_image-dark_image) # Zoom images to same scale, corresponding to highest magnification if position != self.reference_position and magnification: # TODO: should implement distance_number, but YAGN? projection_image = ndimage.zoom(projection_image, [1 / self.magnification_y[position], 1 / self.magnification_x[position]], mode='nearest') # Zoom to highest magnification TODO: combine w/ alignment? # Correct alignment if aligned and self.aligned and position != self.reference_position: transform_parameters = self.get_alignment(projection=projection, position=position) # Get alignment parameters TODO: add zoom factor here? projection_image = registrator.apply_transformation(projection_image, transform_parameters) if Fourier: projection_image = Utilities.resize(projection_image, (self.nfy, self.nfx)) return np.fft.fft2(projection_image) else: if pad: return Utilities.resize(projection_image, (self.nfy, self.nfx)) else: return Utilities.resize(projection_image, (self.ny, self.nx))
[docs]class NanomaxPreprocessed(Dataset, backend): """ Class for preprocessed Nanomax datasets. Attributes ---------- ztot : float Focus to detector distance in m. path : str Path to dataset. version : str Version of Dataset backend structure, e.g. 'TIEHOM' name : str Name of dataset correct_alignent : int Flag to correct alignment. aligned: int Flag for dataset aligned. phase_prefix : str HDF5 path prefix to phase maps attenuation_prefix : str HDF5 path prefix to attenuation images projection_prefix : str HDF5 path to corrected projections reference_position : int Position number as reference for alignment (usually highest resolution) energy : float Energy in keV nx, ny : int Number of pixels, horizontal, vertical padding : int Padding factor detector_pixel_size : float The detector pixel size in µm alpha : tuple of floats Regularization parameter in the form [LF, HF] """ def __init__(self, name: str, path: str='.', version: str='master'): self.ztot = 1.123 # Focus to detector distance in m # TODOL this is hard coded for the moment, since no value is provided in the nanomax data files self.data_basename = path + '/' + name + '_dist' self.pixel_size_filename = path + '/' 'pixelsizes.txt' self.projection_prefix = '/data/cpr/' self.cpr_prefix = '/data/cpr/' super().__init__(name, path, version)
[docs] def read_parameters_frontend(self): #self.Initialise() #TODO: Stub. Should take values from the data h5 file self.energy=13 #TODO: Request this be included in data files with open(self.pixel_size_filename) as pixel_size_file: lines = pixel_size_file.readlines() del lines[0] # remove header line self.nD = len(lines) self.position_number = np.arange(1, self.nD + 1).tolist() self.pixel_size_x = np.zeros(len(lines)) self.pixel_size_y = np.zeros_like(self.pixel_size_x) self.z1 = np.zeros_like(self.pixel_size_x) # Focus to sample distance (given in um in text files) for n, line in enumerate(lines): line = line.split() self.z1[n] = float(line[0])/1e6 # Focus to sample distance (given in um in text files) self.pixel_size_x[n] = float(line[1]) self.pixel_size_y[n] = float(line[2]) pass self.z2 = self.ztot - self.z1 # Sample to detector distance #TODO: Should be attributes I guess? self.effective_distance = self.z1*self.z2 / (self.z1+self.z2)
[docs]class NanomaxPreprocessedTomo(NanomaxPreprocessed, backend): """ Class for preprocessed Nanomax tomographic datasets. Attributes ---------- data_basename : str File prefix to projection data files. magnification_x,y : numpy array Magnification factors for each position, horizontal, vertical pixel_size_x,y : numpy array Effective pixel size for each position pixel_size : numpy array Effective pixel size at reconstruction position """ def __init__(self, name: str, path: str='.', version: str='master'): #TODO: Pixel sizes must be calculated on the fly, but not possible at the moment super(NanomaxPreprocessedTomo,self).__init__(name, path, version)
[docs] def read_parameters_frontend(self): super().read_parameters_frontend() with h5py.File(self.data_basename+str(0)+'.h5', 'r') as data_file: self.number_of_projections = len(data_file[self.projection_prefix])
[docs] def get_projection(self, *, projection, position, pad=True, magnification=True, aligned=True, Fourier=False): """ Read one recorded image. Parameters ---------- projection : int Number of projection to read. position : int Number of positiion ("distance") to read. pad : bool, optional Pads the image. magnification : bool, optional Brings the image to the magnification of reference_position. aligned : bool, optional Corrects alignment (requires alignment of projections). Fourier : bool Returns the Fourier transform of the image. """ #TODO: consider renaming just projection() to get data.projection(1,23) data_filename = self.data_basename + str(position) + '.h5' projection_name = str(projection).zfill(6) with h5py.File(data_filename, 'r') as f: projection_image = f[self.projection_prefix+projection_name][:].astype('float64') # Read projection if position != self.reference_position and magnification: #TODO: should implement distance_number, but YAGN? projection_image = ndimage.zoom(projection_image, [1/self.magnification_y[position], 1/self.magnification_x[position]], mode='nearest') # Zoom to highest magnification TODO: combine w/ alignment? if aligned and self.aligned and position != self.reference_position: # TODO: Currently using similarity transform. It would be nice to be able to change transforms: # TODO: Translation, Euler, Similarity, Affine... and handle the different length of parameter vector transform_parameters = self.get_alignment(projection=projection, position=position) # Get alignment parameters TODO: add zoom factor here? #projection_image = ndimage.zoom(projection_image, magnification_factor/transform_parameters[0]) #projection_image = ndimage.rotate(projection_image, np.degrees(transform_parameters[1])) #projection_image = ndimage.shift(projection_image, [-transform_parameters[3], -transform_parameters[2]]) projection_image = registrator.apply_transformation(projection_image, transform_parameters) if Fourier: projection_image = Utilities.resize(projection_image, (self.nfy, self.nfx)) return np.fft.fft2(projection_image) else: if pad: return Utilities.resize(projection_image, (self.nfy, self.nfx)) else: return Utilities.resize(projection_image, (self.ny, self.nx))
[docs]class NanomaxPreprocessed2D(NanomaxPreprocessed, backend): """ Class for single projection Nanomax data. """ def __init__(self, name: str, path: str='.', version: str='master'): self.projection_prefix = '/data/cpr/' super().__init__(name, path, version) self.data_filename = self.path + '/' + self.name + '.h5'
[docs] def read_parameters_frontend(self): super().read_parameters_frontend() self.number_of_projections = 1
[docs] def get_projection(self, *, projection, position, pad=True, magnification=True, aligned=True, Fourier=False): """ Read one recorded image. Parameters ---------- projection : int Number of projection to read. position : int Number of positiion ("distance") to read. pad : bool, optional Pads the image. magnification : bool, optional Brings the image to the magnification of reference_position. aligned : bool, optional Corrects alignment (requires alignment of projections). Fourier : bool Returns the Fourier transform of the image. """ #TODO: consider renaming just projection() to get data.projection(1,23) #TODO: Possibility non-corrected? projection_name = str(position).zfill(6) # Constitute projection name in Nanomax dataset with h5py.File(self.data_filename, 'r') as f: projection_image = f[self.projection_prefix+projection_name][:] # Read projection if position != self.reference_position and magnification: #TODO: should implement distance_number, but YAGN? projection_image = ndimage.zoom(projection_image, [1/self.magnification_y[position], 1/self.magnification_x[position]], mode='nearest') # Zoom to reference_plane magnification TODO: combine w/ alignment? if aligned and position != self.reference_position: # COrrect alignment # TODO: Currently using similarity transform. It would be nice to be able to change transforms # TODO: Translation, Euler, Similarity, Affine... and handle the different length of parameter vector transform_parameters = self.get_alignment(projection=projection, position=position) # Get alignment parameters TODO: add zoom factor here? projection_image = registrator.apply_transformation(projection_image, transform_parameters) if Fourier: # Pad and Fourier transform projection_image = Utilities.resize(projection_image, (self.nfy, self.nfx)) return np.fft.fft2(projection_image) else: if pad: # Pad return Utilities.resize(projection_image, (self.nfy, self.nfx)) else: return Utilities.resize(projection_image, (self.ny, self.nx))
#class ESRF_dev(Dataset, backend): # def __init__(self, name: str, path: str='.', version: str='master'): # super().__init__(name, path, version)
[docs]class ESRF_dev(Dataset, backend): def __init__(self, name: str, path: str='.', version: str='master'): super().__init__(name, path, version)
[docs] def read_parameters_frontend(self): # Checks number of directories named 'name_%d_' print(self.path) type(self.path) # Goes through all directories to find distances. TODO: Should use name rather. Get rid of the cryptic regex directories = os.listdir(self.path) name_regex = r"%s(\_)(\d)(\_)" % self.name #TODO: Too unreadable. Avoid cryptic code data_dirs = [dir for dir in directories if re.match(name_regex, dir)] # Get number of positions self.nD = len(data_dirs) # number of distances self.position_number = np.arange(1, self.nD + 1).tolist() # Info file names. TODO: Not nice, should be based on the generic paths fname_info = self.path + '/' + self.name + '_1_' + '/' + self.name + '_1_' + '.info' fname_xml = self.path + '/' + self.name + '_1_' + '/' + self.name + '_1_' + '.xml' #TODO: OK you actually need both files, detector ps from info, z1 from xml... #TODO: For the moment, implement isotropic pixels and equal prop distance. print(fname_info) self.reference_position = 0 #TODO: Should this really go here? self.z1 = np.zeros(self.nD) if Path(fname_info).exists() and Path(fname_xml).exists(): # Read first distance info file parameter_dict = {} with open(fname_info) as file: for line in file: listedline = line.strip().split('=') parameter_dict[listedline[0]] = listedline[1].strip() self.number_of_projections = int(parameter_dict['TOMO_N']) self.energy = float(parameter_dict['Energy']) # keV self.detector_pixel_size = float(parameter_dict['Optic_used']) self.nx = int(parameter_dict['Dim_1']) self.ny = int(parameter_dict['Dim_2']) self.reference_interval = int(parameter_dict['REF_ON']) self.scan_range = int(parameter_dict['ScanRange']) xml_file = minidom.parse(str(fname_xml)) #TODO: No better way to do this? for position in range(self.nD): #TODO: why this nested reading? Must be simplified fname = self.path + '/' + self.name + '_' + str(position+1) + '_/' + self.name + '_' + str(position+1) + '_' + '.xml' print(fname) xml_f = minidom.parse(str(fname)) self.z1[position] = float(xml_f.getElementsByTagName('sourceSampleDistance')[0].firstChild.data)/1000 else: raise Exception('Supporting files '+ self.name +'_1_.info and ' + self.name +'_1_.xml not found.') self.z2 = self.ztot - self.z1 self.absolute_magnification = (self.z1+self.z2)/self.z1 self.effective_distance = (self.z1*self.z2)/(self.z1+self.z2) self.pixel_size_x = self.detector_pixel_size/self.absolute_magnification self.pixel_size_y = self.pixel_size_x # TODO: non-isotropic pixels self.position = np.arange(self.nD)
# Calculation of magnification and pixel size. TODO: As opposed to NanoMax, pixel size is not measured. # TODO: With how the different numbers are stored (or not...) it's fairly technical to extract & calculate the x/y ps & mag. # TODO: I keep it isotropic for the time being # TODO: WHat needs to be done is use the sample offsets to calculate z, mag, pixsize #self.sample_offset_x = 1.150422805517745 #self.sample_offset_y = 1.128909237475981 #z1x = (raw_positions - self.sample_offset_x) / 1000 # raw position 1: 87.4573 #z1y = (raw_positions - self.sample_offset_y) / 1000 # TODO: For refactoring: # #TODO: STub. Should take values from the data h5 file # with open(self.pixel_size_filename) as pixel_size_file: # lines = pixel_size_file.readlines() # del lines[0] # remove header line # self.pixel_size_x = np.zeros(len(lines)) # self.pixel_size_y = np.zeros_like(self.pixel_size_x) # self.z1 = np.zeros_like(self.pixel_size_x) # Focus to sample distance (given in um in text files) # for n, line in enumerate(lines): # line = line.split() # self.z1[n] = float(line[0])/1e6 # Focus to sample distance (given in um in text files) # self.pixel_size_x[n] = float(line[1]) # self.pixel_size_y[n] = float(line[2]) # pass # self.z2 = self.ztot - self.z1 # Sample to detector distance # self.effective_distance = self.z1*self.z2 / (self.z1+self.z2) # # Read parameters from data file # projection_data_filename = self.data_basename + str(self.initial_frame+2).zfill(6) + '.h5' # with h5py.File(projection_data_filename, 'r') as data_file: # self.energy = float(data_file[self.measurement_prefix+'energy'][:]) / 1e3 # eV -> keV # self.number_of_projections = len(data_file[self.projection_prefix]) # self.nx = data_file[self.projection_prefix][:].shape[2] # self.ny = data_file[self.projection_prefix][:].shape[1]
[docs] def get_projection(self, *, projection, position, pad=True, Fourier=False, aligned=True, magnification=True): #TODO: consider renaming just projection() to get data.projection(1,23) # For integration with rest, necessary(?) conversion from position 0 to 1 #TODO: takes position_number for ESRF data no, so no +1?! projection_prefix = self.path + '/' + (self.name + '_' + str(position) + '_') projection_filename = projection_prefix + '/' + (self.name + '_' + str(position) + '_' + str(projection).zfill(4) + '.edf') projection_image = EdfFile(projection_filename).GetData(0) xpad = (self.nfx-self.nx)//2 ypad = (self.nfy-self.ny)//2 # Preprocessing dark_filename = projection_prefix + '/' + 'dark.edf' dark_image = EdfFile(dark_filename).GetData(0) # Get previous flatfield reference_index_1 = (self.reference_interval * (projection // self.reference_interval)) reference_index_2 = min(reference_index_1+self.reference_interval, self.number_of_projections) # Weights flat fields weight_1 = (reference_index_2-projection)/max(reference_index_2-reference_index_1, 1) weight_2 = 1 - weight_1 # Open 'previous' flat reference_filename_1 = projection_prefix + '/' + ('refHST' + str(reference_index_1).zfill(4) + '.edf') reference_image_1 = EdfFile(reference_filename_1).GetData(0) - dark_image # open 'next' flat reference_filename_2 = projection_prefix + '/' + ('refHST' + str(reference_index_2).zfill(4) + '.edf') reference_image_2 = EdfFile(reference_filename_2).GetData(0) - dark_image # reference for current projection reference_image = weight_1*reference_image_1 + weight_2*reference_image_2 # Open projection image projection_image = (projection_image-dark_image) / reference_image if position != self.reference_position and magnification: #TODO: should implement distance_number, but YAGN? projection_image = ndimage.zoom(projection_image, [1/self.magnification_y[position], 1/self.magnification_x[position]], mode='nearest') # Zoom to reference_plane magnification TODO: combine w/ alignment? if aligned and self.aligned and position != self.reference_position: # COrrect alignment # TODO: Currently using similarity transform. It would be nice to be able to change transforms # TODO: Translation, Euler, Similarity, Affine... and handle the different length of parameter vector transform_parameters = self.get_alignment(projection=projection, position=position) # Get alignment parameters TODO: add zoom factor here? projection_image = registrator.apply_transformation(projection_image, transform_parameters) if Fourier: # Pad and Fourier transform projection_image = Utilities.resize(projection_image, (self.nfy, self.nfx)) return np.fft.fft2(projection_image) else: if pad: # Pad return Utilities.resize(projection_image, (self.nfy, self.nfx)) else: return Utilities.resize(projection_image, (self.ny, self.nx))
[docs]class ID16B(ESRF_dev, backend): def __init__(self, name: str, path: str='.', version: str='master'): self.ztot = 0.5655134742720727 #TODO: From old ID16B dataset (Zr73B) super().__init__(name, path, version) pass
[docs]class ID19(ESRF_dev, backend): def __init__(self, name: str, path: str='.', version: str='master'): self.z1 = 144 # TODO: I guess there is simply no other way than hard coding? super().__init__(name, path, version)
[docs] def read_parameters_frontend(self): # Checks number of directories named 'name_%d_' print(self.path) type(self.path) # Goes through all directories to find distances. TODO: Should use name rather. Get rid of the cryptic regex directories = os.listdir(self.path) name_regex = r"%s(\_)(\d)(\_)" % self.name #TODO: Too unreadable. Avoid cryptic code data_dirs = [dir for dir in directories if re.match(name_regex, dir)] # Get number of positions self.nD = len(data_dirs) # number of distances self.position_number = np.arange(1, self.nD + 1).tolist() # Info file names. TODO: Not nice, should be based on the generic paths fname_info = self.path + '/' + self.name + '_1_' + '/' + self.name + '_1_' + '.info' fname_xml = self.path + '/' + self.name + '_1_' + '/' + self.name + '_1_' + '.xml' #TODO: OK you actually need both files, detector ps from info, z1 from xml... #TODO: For the moment, implement isotropic pixels and equal prop distance. print(fname_info) self.reference_position = 0 #TODO: Should this really go here? if Path(fname_info).exists() and Path(fname_xml).exists(): # Read first distance info file parameter_dict = {} with open(fname_info) as file: for line in file: listedline = line.strip().split('=') parameter_dict[listedline[0]] = listedline[1].strip() self.number_of_projections = int(parameter_dict['TOMO_N']) self.energy = float(parameter_dict['Energy']) # keV self.detector_pixel_size = float(parameter_dict['Optic_used']) #TODO: watch out, might be a space instead of underscore self.nx = int(parameter_dict['Dim_1']) self.ny = int(parameter_dict['Dim_2']) self.reference_interval = int(parameter_dict['REF_ON']) self.scan_range = int(parameter_dict['ScanRange']) self.z2 = np.zeros(self.nD) xml_file = minidom.parse(str(fname_xml)) #TODO: No better way to do this? for position in range(self.nD): #TODO: why this nested reading? Must be simplified fname = self.path + '/' + self.name + '_' + str(position+1) + '_/' + self.name + '_' + str(position+1) + '_' + '.xml' print(fname) xml_f = minidom.parse(str(fname)) self.z2[position] = float(xml_f.getElementsByTagName('distance')[0].firstChild.data)/1000 #TODO: Is this the only difference to ID16B? else: raise Exception('Supporting files '+ self.name +'_1_.info and ' + self.name +'_1_.xml not found.') self.ztot = self.z1 + self.z2 self.absolute_magnification = (self.z1+self.z2)/self.z1 #TODO: Refactor: this also is different ID19/ID16B self.effective_distance = (self.z1*self.z2)/(self.z1+self.z2) #TODO: Is it not effective distance and magnification that should be stored then?! self.pixel_size_x = self.detector_pixel_size/self.absolute_magnification self.pixel_size_y = self.pixel_size_x # TODO: non-isotropic pixels self.position = np.arange(self.nD)
# Calculation of magnification and pixel size. TODO: As opposed to NanoMax, pixel size is not measured. # TODO: With how the different numbers are stored (or not...) it's fairly technical to extract & calculate the x/y ps & mag. # TODO: I keep it isotropic for the time being # TODO: WHat needs to be done is use the sample offsets to calculate z, mag, pixsize #self.sample_offset_x = 1.150422805517745 #self.sample_offset_y = 1.128909237475981 #z1x = (raw_positions - self.sample_offset_x) / 1000 # raw position 1: 87.4573 #z1y = (raw_positions - self.sample_offset_y) / 1000
[docs]class ESRF(): """Legacy code for ESRF datasets. Note ---- Will be aligned with Nanomax functionality. """ def __init__(self, name: str, path: str='.', version: str='master'): self.registrator = Utilities.ElastixRigid() self.path=Path(path) # TODO: OR shoud it take a path object already? self.name=name # name of sample self.version=version self.axis_position = np.array([]) self.position = np.array([]) # in m self.distance_source_sample= 0 self.energy = 0 # in keV self.nD = 0 self.nbprojections = 0 self.position_number = [] self.nx = 0 self.ny = 0 self.detector_pixel_size = 0 self.reference_interval = 0 self.reference_position = 0 self.scan_range = 0 self.delta_beta = 0 #TODO: Some of these things should surely go in some kind of configuration file self.phase_tag='phase' self.delta_tag='delta' self.beta_tag='beta' self.measured_beta_tag='measured_beta' self.forward_tag='forward' self.attenuation_tag='attenuation' self.measured_attenuation_tag='measured_attenuation' self.propagated_tag='propagated' self.update_tag='update' self.prior_tag='prior' self.attenuation_update_tag='attenuation_update' self.difference_tag='difference' self.parameter_filename='parameters.yml' self.dataset_parameter_filename = self.path / pyphase_path / name / self.version / self.parameter_filename if (self.dataset_parameter_filename.exists()): #TODO: initialise must override reading of parameter file self._read_parameter_file() else: self.dataset_parameter_filename.parents[0].mkdir(parents=True) self.dataset_parameter_filename.touch() self._initialise() self.preprocessing=1 self.correct_alignment=1 self.correct_motion=1 self.padding=2 #TODO: should be a parameter/in parameter file (in each version?) self.magnification = 1 #TODO: should be a parameter/in parameter file (in each version?) # Pixel size & magnification (from parameter file) self.magnification_factor = (self.distance_source_sample+self.position*1e-3) / self.distance_source_sample self.max_magnification_factor = max(self.magnification_factor) self.pixel_size = self.detector_pixel_size / self.max_magnification_factor # File access # Basix paths self.data_prefix = self.path / self.name self.result_prefix = self.path / pyphase_path / self.name / self.version # self.vol_prefix = self.path+'/volfloat/' #TODO: Should change, volumes should be stored either directly under version path or in their own directories # Phase self.phase_name = self.name+'_'+self.version # TODO necessary? Good to mark data set & version in file names? self.phase_prefix = self.result_prefix / self.phase_tag / self.phase_name # Delta volume_suffix = '.vol' #TODO: This will be dependent on the internal format self.delta_filename = (self.result_prefix / self.delta_tag / self.phase_name).with_suffix(volume_suffix) # Projected Delta self.phase_forward_name = self.phase_name+'_'+self.delta_tag+'_'+self.forward_tag # Projection of delta self.phase_forward_prefix = self.result_prefix / (self.delta_tag+'_'+self.forward_tag) / self.phase_forward_name # Measured attenuation / shortest distance self.measured_attenuation_name = self.name+'_'+str(self.position_number[0]) self.measured_attenuation_prefix = self.path / self.measured_attenuation_name / self.measured_attenuation_name # measured beta self.measured_beta_filename = (self.result_prefix / self.measured_attenuation_name).with_suffix(volume_suffix) # Projected measured beta self.measured_attenuation_forward_name = self.measured_attenuation_name+'_'+self.forward_tag self.measured_attenuation_forward_prefix = self.result_prefix / (self.measured_beta_tag+'_'+self.forward_tag) / self.measured_attenuation_forward_name # retrieved attenuation self.attenuation_name = self.phase_name+'_'+self.attenuation_tag self.attenuation_prefix = self.result_prefix / self.attenuation_tag / self.attenuation_name # reconstructed beta self.beta_filename = (self.result_prefix / self.attenuation_name).with_suffix(volume_suffix) # projected reconstructed beta self.attenuation_forward_name = self.attenuation_name+'_'+self.forward_tag self.attenuation_forward_prefix = self.result_prefix / (self.beta_tag+'_'+self.forward_tag) / self.attenuation_forward_name # prior self.prior_name=self.phase_name+'_'+self.prior_tag self.prior_prefix= self.result_prefix / self.prior_name self.prior_vol_filename = (self.result_prefix / self.prior_tag / self.prior_name).with_suffix(volume_suffix) self.prior_forward_name = self.prior_name+'_'+self.forward_tag self.prior_forward_prefix = self.result_prefix / (self.prior_tag+'_'+self.forward_tag) / self.prior_forward_name # Propagated, updates self.propagated_prefix=self.result_prefix / self.propagated_tag / (self.phase_name+'_'+self.propagated_tag) self.update_name = self.phase_name+'_'+self.update_tag self.update_prefix = self.result_prefix / self.update_tag / self.update_name self.update_vol_filename = (self.result_prefix / self.update_tag / self.update_name).with_suffix(volume_suffix) self.attenuation_update_name=self.phase_name+'_'+self.attenuation_update_tag self.attenuation_update_prefix= self.result_prefix / self.attenuation_update_tag / self.attenuation_update_name self.attenuation_update_vol_filename = (self.result_prefix / self.attenuation_update_tag / self.attenuation_update_name).with_suffix(volume_suffix) self.difference_name = self.phase_name + self.difference_tag self.difference_prefix = self.result_prefix / self.difference_tag / self.difference_name self.lengthscale=10e-6 #TODO: Dataset should surely not have lengthscale? # TODO: there should be the possibility to use different parameter files ofc # TODO: Add argument for parameter file\ # TODO: If no parameter file is given, check if the standard one exists, otherwise populate from info files # self.angle_increment=self.scan_range/self.nbprojections self.Lambda = 12.4e-10 / self.energy # wavelength (in m?) self.effective_distance = (self.distance_source_sample * self.position) / (self.distance_source_sample + self.position) self.Fresnel_number = self.Lambda * self.effective_distance / (self.lengthscale ** 2) self.position_number_no_reference_position = self.position_number.copy() self.position_number_no_reference_position.remove(self.position_number[self.reference_position]) # Check if shifts are done (i.e. file shifts.pickle exists) self.shift_filename = self.result_prefix / 'shifts.pickle' self.shift_polynomial_x=[0 for x in range (self.nD)] self.shift_polynomial_y=[0 for x in range (self.nD)] self.shift_polynomial_y=[0 for x in range (self.nD)] self.shift_polynomial_order=[4, 2, 2] if os.path.isfile(self.shift_filename): self.aligned=True with open(self.shift_filename, 'rb') as f: self.shifts=pickle.load(f) self.fit_alignment() else: self.shifts=SortedList() # entry: [] self.aligned=False self.motion_filename=path+'/'+name+'_/motion.pickle' if os.path.isfile(self.motion_filename): self.motion_corrected=1 with open(self.motion_filename, 'rb') as f: self.motion_correlations = pickle.load(f) else: self.motion_correlations = [ [] for i in range(self.nD) ] self.motion_corrected=0 def _initialise(self): self.motion_corrected = 0 self.aligned = False # TODO: Should erase files also self.populate() self._write_parameter_file() @property def nfx(self): return self.padding * self.nx @property def nfy(self): return self.padding * self.ny
[docs] def get_projection(self, *, projection, position, pad=True, Fourier=False, aligned=True, magnification=True): #TODO: consider renaming just projection() to get data.projection(1,23) # For integration with rest, necessary(?) conversion from position 0 to 1 projection_prefix = self.path / (self.name + '_' + str(self.position_number[position]) + '_/') projection_filename = projection_prefix / (self.name + '_' + str(self.position_number[position]) + '_' + str(projection).zfill(4) + '.edf') projection_image = EdfFile(projection_filename).GetData(0) xpad = (self.nfx-self.nx)//2 ypad = (self.nfy-self.ny)//2 if self.preprocessing: dark_filename = projection_prefix / 'dark.edf' #print(dark_filename) dark_image = EdfFile(dark_filename).GetData(0) # Get previous ff reference_index_1 = (self.reference_interval * (projection // self.reference_interval)) reference_index_2 = min(reference_index_1+self.reference_interval, self.nbprojections) # Weights flat fields weight_1 = (reference_index_2-projection)/max(reference_index_2-reference_index_1, 1) weight_2 = 1 - weight_1 # Open 'previous' flat reference_filename_1 = projection_prefix / ('refHST' + str(reference_index_1).zfill(4) + '.edf') #print(reference_filename_1) reference_image_1 = EdfFile(reference_filename_1).GetData(0) - dark_image # open 'next' flat reference_filename_2 = projection_prefix / ('refHST' + str(reference_index_2).zfill(4) + '.edf') #print(reference_filename_2) reference_image_2 = EdfFile(reference_filename_2).GetData(0) - dark_image # reference for current projection reference_image = weight_1*reference_image_1 + weight_2*reference_image_2 # Open projection image projection_image = (projection_image-dark_image) / reference_image if self.magnification and magnification: if self.magnification_factor[position] != self.max_magnification_factor: # Unless its the highest magnification projection_image = ndimage.zoom(projection_image, self.max_magnification_factor/self.magnification_factor[position]) #TODO: Simple approach first. Magnification and center should perhaps be calculated in shifts? correction = np.zeros(3) projection_image = np.pad(projection_image, ((ypad, ypad), (xpad, xpad)), 'edge') #TODO: watch out with magnification if self.correct_motion and self.motion_corrected and aligned: #Apply motion #TODO: Is there any reason to not apply motion correction to reconstructed image like this? #TODO: Should be better to just motion correct the reference plane no? motion = self.get_motion(projection, self.reference_position) correction[0] = motion[0] * self.max_magnification_factor/self.magnification_factor[self.reference_position] correction[1] = motion[1] * self.max_magnification_factor/self.magnification_factor[self.reference_position] if self.correct_alignment and self.aligned and aligned: correction = correction + self.get_alignment(projection, position) # TODO: Why am I not using freq. variables correctly? investigate # TODO: Handle padding somehow/correcly if sum(correction*correction)**0.5 > 0.001: f = np.arange(0, self.nfx//2, 1) f = f * ((correction[0])*2*pi / self.nfx) f = np.exp(1j*f) #f = np.cos(f)-1j*np.sin(f) f = np.hstack((f, np.conj(f[::-1]))) f = np.tile(f, (self.nfy, 1)) g = np.arange(0, self.nfy//2, 1) g = g * ((correction[1]) * 2 * pi / self.nfy) g = np.exp(1j*g) #g = np.cos(g)-1j*np.sin(g) g = np.hstack((g, np.conj(g[::-1]))) g = np.tile(g, (self.nfx, 1)) g = g.transpose() t = f * g projection_image_FT = np.fft.fft2(projection_image) * t else: projection_image_FT = np.fft.fft2(projection_image) projection_image = np.real(np.fft.ifft2(projection_image_FT)) projection_image = projection_image[self.ny//2:-self.ny//2, self.nx//2:-self.nx//2] # watch out how to keep track... corrected image is already in FD #fig = pyplot.imshow(np.real(np.fft.ifft2(projection_image))) #pyplot.colorbar() if Fourier: return projection_image_FT else: if pad: projection_image = np.pad(projection_image, ((ypad, ypad), (xpad, xpad)), 'edge') return projection_image
[docs] def preprocess(self): """Runs all preprocessing on the dataset: axis positions, motion, reference position, alignment""" self.calculate_axis_positions() # TODO: Verify order of shifts, motion, rotaxs self.calculate_motion() self.calculate_reference_position() self.align() self.fit_alignment() #self.display_alignment() #TODO: does not work for the moment self._write_parameter_file()
def _write_parameter_file(self): """Writes current state to the associated parameter file""" parameters = dict(reference_position=self.reference_position, nD=self.nD, nbprojections=self.nbprojections, position_number=self.position_number, position=self.position.tolist(), dss=self.distance_source_sample, energy=self.energy, detector_pixel_size=self.detector_pixel_size, nx=self.nx, ny=self.ny, axis_position=self.axis_position.tolist(), reference_interval=self.reference_interval, scan_range=self.scan_range, version=self.version, alpha=self.alpha.tolist(), delta_beta=self.delta_beta) with open(self.dataset_parameter_filename, 'w') as f: yaml.dump(parameters, f) pass def _read_parameter_file(self): """Reads the associated parameter file.""" with open(self.dataset_parameter_filename, 'r') as f: parameters=yaml.safe_load(f) self.reference_position=parameters['reference_position'] self.nD=parameters['nD'] self.nbprojections=parameters['nbprojections'] self.position_number=parameters['position_number'] self.position=np.array(parameters['position']) self.distance_source_sample=parameters['dss'] self.energy=parameters['energy'] self.detector_pixel_size=parameters['detector_pixel_size'] self.nx=parameters['nx'] self.ny=parameters['ny'] self.axis_position=np.array(parameters['axis_position']) self.reference_interval=parameters['reference_interval'] self.scan_range=parameters['scan_range'] self.version=parameters['version'] self.alpha=np.array(parameters['alpha']) self.delta_beta=parameters['delta_beta'] pass
[docs] def populate(self): """Tries to find the dataset parameters in the accompanying *.info* and *xml* files. It is called when a parameter file ``pyphase_parameter.yml`` is not found. Returns ------- self Returns *self* with the updated attributes. """ # self.N # projection angles # Checks number of directories named 'name_%d_' print(self.path) type(self.path) directories = os.listdir(self.path) name_regex = r"%s(\_)(\d)(\_)" % self.name #TODO: Too unreadable. Avoid cryptic code data_dirs = [dir for dir in directories if re.match(name_regex, dir)] self.alpha = np.array([-8, -10]) #TODO: default value... should this really go here? self.nD = len(data_dirs) # number of distances self.position_number = np.arange(1, self.nD + 1).tolist() #TODO: Not nice, should be based on the generic paths fname_info = self.path / (self.name + '_1_') / (self.name + '_1_' + '.info') fname_xml = fname_info.with_suffix('.xml') print(fname_info) print(fname_xml) if fname_xml.exists(): xml_file = minidom.parse(str(fname_xml)) dists = [] for dist in range(1, self.nD + 1): fname = self.path / (self.name + '_' + str(dist) + '_') / (self.name + '_' + str(dist) + '_' + '.xml') print(fname) xml_f = minidom.parse(str(fname)) distance = float(xml_f.getElementsByTagName('distance')[0].firstChild.data) dists.append(distance) self.reference_position = 0 self.nbprojections = int(xml_file.getElementsByTagName('tomo_N')[0].firstChild.data) self.position = np.array(dists) / 1000 self.distance_source_sample = 144 self.energy = float( xml_file.getElementsByTagName('energy')[0].firstChild.data) self.detector_pixel_size = float(xml_file.getElementsByTagName('pixelSize')[0].firstChild.data) self.nx = int(xml_file.getElementsByTagName('DIM_1')[0].firstChild.data) self.ny = int(xml_file.getElementsByTagName('DIM_2')[0].firstChild.data) self.reference_interval = int(xml_file.getElementsByTagName('ref_On')[0].firstChild.data) self.scan_range = int(xml_file.getElementsByTagName('scanRange')[0].firstChild.data) elif fname_info.exists(): # Adding file information into a dictionary parameter_dict = {} with open(fname_info) as file: for line in file: listedline = line.strip().split('=') parameter_dict[listedline[0]] = listedline[1].strip() #self.reference_distance = 1 # stationary plane in registration #D# TODO: Where to find this? self.nbprojections = int(parameter_dict['TOMO_N']) # Reading distances from the .info files dists = [] for dist in range(1, self.nD + 1): fname = elf.path / (self.name + '_' + str(dist) + '_') / (self.name + '_' + str(dist) + '_' + '.info') with open(fname) as file: for i, line in enumerate(file): if i == 1: # 'distance' is in the second line listedline = line.strip().split('=') distance = float(listedline[1].strip()) * 0.01 dists.append(distance) self.position = np.array(dists) / 1000 self.distance_source_sample = 145 # Distance Source - Sample TODO: Where to find this? self.energy = float(parameter_dict['Energy']) # keV self.detector_pixel_size = float(parameter_dict['Optic used']) self.nx = int(parameter_dict['Dim_1']) self.ny = int(parameter_dict['Dim_2']) self.reference_interval = int(parameter_dict['REF_ON']) self.scan_range = int(parameter_dict['ScanRange']) else: raise Exception('Supporting files '+ self.name +'_1_.info and ' + self.name +'_1_.xml not found.')
[docs] def GetSinogram(self, distance=1): """Return projections as sinogram Parameters ---------- distance : int Number of the distance of the projection. Returns ------- sinogram Returns projections stacked as sinogram """ #TODO: RIT/YAGNI. Add functionality as needed #TODO: Probably has to be able to split over lines for serialisation/parallellisation #Allocate a 3D numpy array of correct dimensions sinogram = np.empty([self.nx, self.ny, self.nbprojections]) #Load projections one by one for projection in range(self.nbprojections): sinogram[:,:,projection]=self.get_projection(projection, self.position_number[distance]) #return return sinogram
[docs] def get_image(self, *, projection, image_type='phase', Fourier=False, pad=False): """Get reconstructed images (phase by default) Parameters ---------- projection : int Number of the projection. image_type : str, optional Can be phase or attenuation. Fourier : bool Return Fourier transform of image. pad : bool Return padded image. Returns ------- image : ndarray """ if image_type.lower() == 'attenuation': fname = self.attenuation_prefix.as_posix()+'_'+str(projection).zfill(4)+'.edf' # elif 'propagated' in argv: # fname = self.propagated_prefix.as_posix()+'_'+str(distance)+'_'+str(projection).zfill(4)+'.edf' # elif 'difference' in argv: # fname = self.difference_prefix.as_posix()+'_'+str(distance)+'_'+str(projection).zfill(4)+'.edf' # elif 'prior' in argv: # fname = self.prior_forward_prefix.as_posix()+('_'+str(projection).zfill(4)+'.edf') elif image_type.lower() == 'phase': fname = self.phase_prefix.as_posix()+'_'+str(projection).zfill(4)+'.edf' #TODO: separate file type... else: raise TypeError("proj_type must be one of the following: 'phase', 'attenuation'" ) image = EdfFile(fname).GetData(0) xpad = (self.nfx-self.nx)//2 ypad = (self.nfy-self.ny)//2 if Fourier: #TODO: Refactor into an FT function in utilities (f ex) return np.fft.fft2(np.pad(image, ((ypad, ypad), (xpad, xpad)), 'edge')) else: if pad: image = np.pad(image, ((ypad, ypad), (xpad, xpad)), 'edge') return image
[docs] def calculate_axis_position(self, distance): """Calculates the axis position at one position""" # During/after scan # 180/360 correct_alignment = self.correct_alignment self.correct_alignment = 0 correct_motion = self.correct_motion self.correct_motion = 0 if self.scan_range == 360: projection_0 = self.nbprojections + 4 projection_180 = self.nbprojections + 2 else: projection_0 = self.nbprojections + 2 projection_180 = self.nbprojections im0 = self.get_projection(projection=projection_0, position=distance) im180 = np.fliplr(self.get_projection(projection=projection_180, position=distance)) field, image_transormed, transform_parameters = self.registrator.register(im0, im180) offset = field[0][0][0]/2 print("Rotation axis offset: {}".format(offset)) self.axis_position[distance] = self.nx/2 + offset print("Axis position: {}".format(self.axis_position[distance])) self.correct_alignment = correct_alignment self.correct_motion = correct_motion
[docs] def calculate_axis_positions(self): """Calculates all axis positions.""" self.axis_position = np.zeros(self.nD) for distance in range(self.nD): self.calculate_axis_position(distance) self._write_parameter_file()
[docs] def calculate_motion(self): """Estimates motion in the scans based on supplementary images.""" # define reference positions self.motion_correlations = [ [] for i in range(self.nD) ] self.motion_corrected = 0 correct_alignment = self.correct_alignment self.correct_alignment = 0 reference_positions = range(0,self.scan_range, 90) reference_position_indices = range(self.nbprojections+int(self.scan_range/90), self.nbprojections, -1) image = [] reference = [] for Distance in range(self.nD): print("Calculating motion position {}".format(str(Distance))) for index, position in enumerate(reference_positions): # Get reference projecions and scan projections at reference positions image_index = self.nbprojections * (position/self.scan_range) reference = self.get_projection(projection=reference_position_indices[index], position=Distance) image = self.get_projection(projection=floor(image_index), position=Distance) if image_index != floor(image_index): print("Interpolating projection for motion estimation") image2 = self.get_projection(floor(image_index)+1, Distance) image = (floor(image_index)+1-image_index)*image + (image_index-floor(image_index))*image2 # calculate correlations field, aligned_image, transform_parameters = self.registrator.register(image, reference) # Will create a list of lists with [distance][angle (corresponding to ref positions, 0=0, 1=90, ...)][dim (0=x,1=y,...)] self.motion_correlations[Distance].append([field[0][0][0], field[1][0][0]]) # save motions with open(self.motion_filename, 'wb') as f: pickle.dump(self.motion_correlations, f, pickle.HIGHEST_PROTOCOL) self.motion_corrected=1 self.correct_alignment = correct_alignment pass
[docs] def get_motion(self, projection, distance): """Returns estimation motion at one projection and position.""" #TODO: disgusting, needs cleaning # fit motion with sinusoids (?) #TODO: Should be called from get_projection no??? if self.correct_motion and self.motion_corrected: reference_positions = np.arange(0, self.scan_range, 90) projections_all = np.arange(self.nbprojections+len(reference_positions)+1) polynomial_order_h = min(floor(len(reference_positions)/2), 3) polynomial_order_v = min(len(reference_positions), 3) arg = np.array(reference_positions)/self.scan_range - 1 # TODO: to be extended in case of more images a la id16a? arg_all = projections_all/self.nbprojections - 1 corr=np.array(self.motion_correlations[distance-1]) # Vertical fit M = np.tile(arg, [polynomial_order_v, 1]) for k in range(1, polynomial_order_v): M[k,:] = M[k,:]*M[k-1,:] M = -np.flipud(M) #TODO: could certainly be more elegant... coefficients = np.dot(np.linalg.pinv(M).T, corr[:,1]) # subtract mean from vertical motion ref_v = np.polyval(np.append(coefficients,0), arg_all) ref_v_mean = np.mean(ref_v); ref_v -= ref_v_mean; ref_v_meas = -corr[:,1]-ref_v_mean # Horizontal fit M = np.tile(arg, [polynomial_order_h, 1]) for k in range(1, polynomial_order_h): M[k,:] = M[k,:]*M[k-1,:] M = -np.flipud(M) #TODO: could certainly be more elegant... M = np.append(M*np.tile(np.cos((np.pi/180)*reference_positions),[polynomial_order_h,1]), M*np.tile(np.sin((np.pi/180)*reference_positions),[polynomial_order_h,1]),axis=0) coefficients = np.dot(np.linalg.pinv(M).T, corr[:,0]) sx_cal = np.polyval(np.append(coefficients[0:polynomial_order_h], 0), arg_all); sy_cal = np.polyval(np.append(coefficients[polynomial_order_h:], 0), arg_all); arg = projections_all*(self.scan_range/180*pi/self.nbprojections); reference_position_indices = self.nbprojections + np.arange(int(self.scan_range/90), 0, -1) arg[reference_position_indices] = np.array(reference_positions)*(pi/180) M = np.vstack((np.cos(arg), np.sin(arg))); ref_h = sx_cal*M[0,:] + sy_cal*M[1,:] #Fit with sinusoid coefficients = np.dot(np.linalg.pinv(M).T,ref_h) sx_cal -= coefficients[0]; sy_cal -= coefficients[1]; ref_h -= np.dot(coefficients,M); index_rots = np.round(np.array(reference_positions)*(self.nbprojections/self.scan_range)) ref_h_meas=-corr[:,0] ref_h_meas -= np.dot(coefficients, M[:,index_rots.astype(int)]) #pyplot.figure() #pyplot.plot(projections_all, ref_v, 'b', index_rots, ref_v_meas, 'bx', projections_all, ref_h, 'r', index_rots, ref_h_meas, 'rx') return ref_h[projection], ref_v[projection] else: return 0, 0
[docs] def calculate_reference_position(self): """Calculates and sets reference position based on motion""" #TODO: add check if motion is calculated (self.motion_correlations) #TODO: manipulation of the motion list is quite roundabout correlation_amplitudes = [] for index in range(self.nD): tmp = [item for sublist in self.motion_correlations[index] for item in sublist] #flatten list tmp = [x**2 for x in tmp] #square elements correlation_amplitudes.append(sqrt(sum(tmp))) self.reference_position = correlation_amplitudes.index(min(correlation_amplitudes)) print('Reference position: {}'.format(self.reference_position)) self._write_parameter_file()
# def IsAligned(self): # return self.aligned # def ClearShifts(self, D): # """clear shifts. Either 1 distance or D='all'""" # pass
[docs] def fit_alignment(self): """Fits measured alignment parameters with polynomials""" nb_shifts=len(self.shifts) projection=[0 for x in range(nb_shifts)] x=[[0 for x in range(nb_shifts)] for y in range(self.nD)] y=[[0 for x in range(nb_shifts)] for y in range(self.nD)] theta=[[0 for x in range(nb_shifts)] for y in range(self.nD)] for p_i in range(nb_shifts): projection[p_i]=self.shifts[p_i][0] for d_i in range(self.nD): for p_i in range(nb_shifts): x[d_i][p_i] = self.shifts[p_i][1][d_i][0] y[d_i][p_i] = self.shifts[p_i][1][d_i][1] #theta[dndx][projndx]=self.shifts[projndx][1][dndx][2] for d_i in range(self.nD): self.shift_polynomial_x[d_i]=np.polyfit(projection, x[d_i], self.shift_polynomial_order[0]) self.shift_polynomial_y[d_i]=np.polyfit(projection, y[d_i], self.shift_polynomial_order[1]) pass
[docs] def display_alignment(self, projection=0): """Displays alignment and fit.""" # TODO: quite a lot of duplicate code with FitShifts nb_shifts=len(self.shifts) projection_range=[0 for x in range(nb_shifts)] x=[[0 for x in range(nb_shifts)] for y in range(self.nD)] y=[[0 for x in range(nb_shifts)] for y in range(self.nD)] theta=[[0 for x in range(nb_shifts)] for y in range(self.nD)] X=np.linspace(0, self.nbprojections-1, self.nbprojections) for d_i in range(self.nD): for p_i in range(nb_shifts): x[d_i][p_i] = self.shifts[p_i][1][d_i][0] y[d_i][p_i] = self.shifts[p_i][1][d_i][1] #theta[dndx][projndx]=self.shifts[projndx][1][dndx][2] for p_i in range(nb_shifts): projection_range[p_i]=self.shifts[p_i][0] pyplot.figure() for index, distance in enumerate(self.position_number_no_reference_position): pyplot.subplot(len(self.position_number_no_reference_position), 2, 2*(index)+1) pyplot.plot(projection_range, x[distance-1], 'rx', X, np.polyval(self.shift_polynomial_x[distance-1], X), 'b') pyplot.title('Distance '+str(distance)) for index, distance in enumerate(self.position_number_no_reference_position): #yi=np.polyfit(projection_range, y[distance], self.shift_polynomial_order[1]) pyplot.subplot(len(self.position_number_no_reference_position), 2, 2*(index)+2) pyplot.plot(projection_range, y[distance-1], 'rx', X, np.polyval(self.shift_polynomial_y[distance-1], X), 'b') pyplot.title('Distance '+str(distance)) pyplot.subplots_adjust(hspace=.6, left=.07, bottom=0.05, right=.98, top=.95) projection_images = np.zeros([self.ny, self.nx, self.nD]) for distance in range(1, self.nD+1): projection_images[:,:,distance-1] = self.get_projection(projection=projection, position=distance) viewer.display_stack(projection_images)
# def SetShift(self, projection, distance, x, y, theta=0): # pass
[docs] def get_alignment(self, projection, distance): """Returns the transform parameters for alignment at one projection and position""" #TODO: stub # if distance == self.reference_distance or self.correct_alignment == 0: # shift_x = 0 # shift_y = 0 # shift_theta = 0 # else: shift_x = np.polyval(self.shift_polynomial_x[distance-1], projection) shift_y = np.polyval(self.shift_polynomial_y[distance-1], projection) shift_theta = 0 return [shift_x, shift_y, shift_theta]
[docs] def align(self, interval=100): """ Align(self, RA, interval=100) Align complete data set. TODO: Should probably check for the last projection also so that it is always included """ self.position_number_no_reference_position = self.position_number.copy() self.position_number_no_reference_position.remove(self.reference_position) correct_alignment = self.correct_alignment self.correct_alignment = 0 correct_motion = self.correct_motion self.correct_motion=0 self.shifts = SortedList() for projection in range(0, self.nbprojections, interval): text = "\r" + "Aligning projection {}".format(projection) sys.stdout.write(text) sys.stdout.flush() self.align_projection(projection) # Pickle the 'data' dictionary using the highest protocol available. with open(self.shift_filename, 'wb') as f: pickle.dump(self.shifts, f, pickle.HIGHEST_PROTOCOL) self.aligned=True self.correct_alignment = correct_alignment self.correct_motion = correct_motion
[docs] def align_projection(self, projection): """ projection = number of projection to register """ correct_alignment = self.correct_alignment self.correct_alignment = 0 im=[] # read images for position in range(self.nD): im.append(self.get_projection(projection=projection, position=position)) # register (should be done with RA) tmp=[] for ndx in range(self.nD): if ndx != self.reference_position: field, aligned_image, transform_parameters = registrator.register(im[ndx], im[self.reference_position]) tmp.append([field[0][0][0], field[1][0][0]]) else: tmp.append([0, 0]) # store shifts self.shifts.add([projection, tmp]) with open(self.shift_filename, 'wb') as f: pickle.dump(self.shifts, f, pickle.HIGHEST_PROTOCOL) self.correct_alignment = correct_alignment
#TODO: Difference just shouldn't be here.. # def DifferenceProjection(self, projection, distance): # # Calculate difference measured/calculated intensity # # TODO: Should eventually be an option if it's tomo or not? # # TODO: Should this really go here? # # Open measured # measured_projection = self.get_projection(projection, distance) # # Get calculated # calculated_projection = self.get_image(projection, 'propagated', distance) # # Calculate difference (correct order?) # difference_image = measured_projection - calculated_projection # # Save difference # #TODO: IMPORTANT: USE DATASET TO READ/WRITE IMAGES!!! # fname = self.path + '/' + self.name + '_1_/' + self.name + '_1_0000.edf' # TODO:Fulhack deluxe to get edfheader. Whatever. Stub/spike. Fix later # imEDF = EdfFile(fname) # EDFHeader = imEDF.GetHeader(0) # time.sleep(1) #TODO: This is ridiculous, how can this be necessary? What is causing this writing problem? # # TODO: the difference files should be handled in constructor as well I suppose # fname = self.difference_prefix + '_' + str(distance) + '_' + str(projection).zfill(4) + '.edf' # EDF = EdfFile(fname) # # TODO: verify what should go into the header... # EDF.WriteImage(EDFHeader, difference_image, 0, "Float") # def DifferenceProjections(self, projections, distance): # if projections[0] > projections[1]: # projections[1]=projections[0] # for projection in range(projections[0], projections[1]+1): # self.DifferenceProjection(projection, distance) # def Difference(self): # #TODO: Needs to be parallelised... how? Come up with something... # parallelizer = Parallelizer.OAR() # for distance in range(1, self.nD+1): # parallelizer.Launch(self, 'difference', distance=distance) # def UpdatePhase(self): # # add update to current solution # current_solution = np.memmap(self.phase_vol_filename, dtype=np.float32) # update = np.memmap(self.update_vol_filename, dtype=np.float32) # current_solution += update # del current_solution, update # def UpdateAttenuation(self): # current_solution = np.memmap(self.retrieved_attenuation_vol_filename, dtype=np.float32) # update = np.memmap(self.attenuation_update_vol_filename, dtype=np.float32) # current_solution -= update # del current_solution, update # def write_image(self, data, proj_type, projection_number, *args):
[docs] def write_image(self, *, image, projection, projection_type='phase'): # TODO: raise error where needed! number of args, args type... ''' Saves images into file. Parameters ---------- data : ndarray An ndarray of the image to save. projection_number : int The number of the projection to be saved. proj_type : str A string containing the prefix for the name of the file to be saved. args: int Number of the distance of the projection. Returns ------- None Saves the images into the file ``[prefix]_[projection_number].edf`` or ``[prefix]_[distance]_[projection_number].edf`` into the ``name_`` directory. ''' if projection_type.lower() == 'phase': pref = self.phase_prefix elif projection_type.lower() == 'attenuation': pref = self.attenuation_prefix # elif projection_type.lower() == 'phase update': # pref = self.update_prefix # elif projection_type.lower() == 'attenuation update': # pref = self.attenuation_update_prefix # elif projection_type.lower() == 'intensity': # pref = self.propagated_prefix else: raise TypeError("proj_type must be one of the following: 'phase', 'attenuation'" ) if not pref.parent.exists(): pref.parent.mkdir(parents=True) # if len(args) != 0: # distance = str(args[0]) + '_' # else: # distance = '' # fname = self.path / (self.name + '_1_/' + self.name + '_1_0000.edf') # TODO:Fulhack deluxe imEDF = EdfFile(fname) EDFHeader = imEDF.GetHeader(0) #fname = pref.as_posix() + '_' + position + str(projection).zfill(4) + '.edf' fname = pref.as_posix() + '_' + str(projection).zfill(4) + '.edf' EDF = EdfFile(fname) EDF.WriteImage(EDFHeader, image, 0, "Float")
#D# TODO: These methods are to be used in propagator.py (for later) #TODO: Seems like a mistake, since get_image already exists... I guess both could exist? Ev. Refactor # def get_attenuation(self, projection_number): # ''' Reads attenuation information from file and retruns it as an ndarray. # Parameters # ---------- # projection_number : int # The number of the projection to get. # Returns # ------- # ndarray # Array containing attenuation information. # ''' # projection = projection_number # fname = self.retrieved_attenuation_forward_prefix + '_' + str(projection).zfill(4) + '.edf' # imEDF = EdfFile(fname) # attenuation = imEDF.GetData(0) # return attenuation # def get_phase(self, projection_number): # ''' Reads phase information from file and retruns it as an ndarray. # Parameters # ---------- # projection_number : int # The number of the projection to get. # Returns # ------- # ndarray # Array containing phase information. # ''' # projection = projection_number # fname = self.phase_forward_prefix + '_' + str(projection).zfill(4) + '.edf' # imEDF = EdfFile(fname) # phase = imEDF.GetData(0) # return phase #class ESRF(Dataset): #maybe necessary at some point # pass #class ESRFID19(Dataset): # pass #class ESRFID16A(Dataset): # def __init__(self): # super().__init__() # def populate(self): # pass