#This file is part of the PyPhase software.alpha
#
#Copyright (c) Max Langer (2019)
#
#max.langer@creatis.insa-lyon.fr
#
#This software is a computer program whose purpose is to allow development,
#implementation, and deployment of phase retrieval algorihtms.
#
#This software is governed by the CeCILL license under French law and
#abiding by the rules of distribution of free software. You can use,
#modify and/ or redistribute the software under the terms of the CeCILL
#license as circulated by CEA, CNRS and INRIA at the following URL
#"http://www.cecill.info".
#
#As a counterpart to the access to the source code and rights to copy,
#modify and redistribute granted by the license, users are provided only
#with a limited warranty and the software's author, the holder of the
#economic rights, and the successive licensors have only limited
#liability.
#
#In this respect, the user's attention is drawn to the risks associated
#with loading, using, modifying and/or developing or reproducing the
#software by the user in light of its specific status of free software,
#that may mean that it is complicated to manipulate, and that also
#therefore means that it is reserved for developers and experienced
#professionals having in-depth computer knowledge. Users are therefore
#encouraged to load and test the software's suitability as regards their
#requirements in conditions enabling the security of their systems and/or
#data to be ensured and, more generally, to use and operate it in the
#same conditions as regards security.
#
#The fact that you are presently reading this means that you have had
#knowledge of the CeCILL license and that you accept its terms.
import os
import os.path
import sys
import numpy as np
import yaml
import re
#from sortedcontainers import SortedList
from vendor.EdfFile import EdfFile
import pickle
from math import *
import time
from xml.dom import minidom
import matplotlib.pyplot as pyplot
import scipy.ndimage as ndimage
from pathlib import Path
import h5py
if os.name == 'posix':
import fcntl #TODO:from importlib import reload # python 2.7 does not require this
from pyphase.config import *
import vendor.pyelastix as pyelastix
[docs]class Dataset:
"""
Abstract class representing a set of recorded images, acquisition
parameters, reconstructed images and any intermediary images.
Attributes
----------
"""
def __init__(self, name: str, path: str='.', version: str='master'):
"""
Template for initialisation of a Dataset.
Parameters
----------
name : str
Name of the dataset. Corresponds to the name of the file(s) or
directory(ies) of the recorded data.
path : str, optional
Path to the file(s) or directory(ies) of the recorded data
version : str, optional.
Version of dataset file. To permit tests of different alignment
and reconstruction parameters.
"""
pass
def _initialise(self):
self.motion_corrected = 0
self.aligned = 0
# TODO: Should erase files also
self.populate()
self._write_parameter_file()
[docs] def align_projection(self, *, projection=0, save_projection=False):
"""
Aligns images at different positions at one projection angle
Aligns (registers) images taken at different positions using the
default registration algorithm (pyphase.registrator).
Parameters
----------
projection : int, optional
Number of projection to register.
save_projection : bool, optional
Save aligned images to file (creates new data files)
"""
#%% Images read should not be aligned if alignent exist.
# TODO: necessary?
correct_alignment = self.correct_alignment
self.correct_alignment = 0
#%% Read intensities
I_D = np.zeros((len(self.position), self.ny, self.nx))
for N in self.position:
I_D[N]= self.get_projection(projection=projection, position=N, aligned=False, pad=False)
#%% register images
alignment = np.zeros((len(self.position),registrator.number_of_parameters)) #TODO: check existance
for N in self.position:
if N != self.reference_position:
print('Aligning position '+ str(N))
field, im1reg, transform_parameters = registrator.register(I_D[N], I_D[self.reference_position])
alignment[N] = transform_parameters
else:
im1reg = I_D[self.reference_position] #TODO: Maybe not the safest to take im1reg but rather transform image?
if save_projection: #TODO: This should be generalised somehow I suspect
aligned_filename = self.path + '/' + self.name + '_aligned' + '_dist' + str(N) + '.h5'
with h5py.File(aligned_filename, 'a') as f:
if not 'data' in f.keys():
f.create_group(self.cpr_prefix)
if str(projection).zfill(6) in f[self.cpr_prefix].keys():
f[self.cpr_prefix+str(projection).zfill(6)][:] = im1reg
else:
f.create_dataset(self.cpr_prefix+str(projection).zfill(6), data=im1reg)
#%% Store alignment
#TODO: Check for length of trasnform parameterrs
#TODO: Quick and dirty implementation wait for write access parallel
while True:
try:
with h5py.File(self.dataset_filename, 'a') as f:
if 'alignment' in f.keys():
f['alignment'][projection] = alignment
else:
f.create_dataset('alignment', data=np.zeros((self.number_of_projections, len(self.position), registrator.number_of_parameters)))
f['alignment'][projection] = alignment
break # Success!
except OSError:
time.sleep(1) # Wait a bit
self.correct_alignment = correct_alignment
[docs] def get_alignment(self, *, projection, position):
"""
Get transform parameters for alignment.
Returns
-------
transform_parameters : numpy array
Array of transform parameters. Size depends on transform used.
"""
with h5py.File(self.dataset_filename, 'r') as f:
transform_parameters = f['alignment'][projection, position, :]
return transform_parameters
[docs] def write_image(self, *, image, projection, projection_type='phase'):
# TODO: raise error where needed! number of args, args type...
''' Saves images into file.
Parameters
----------
image : ndarray
An ndarray of the image to save.
projection : int
The number of the projection to be saved.
proj_type : str
A string containing the prefix for the name of the file to be saved.
position: int
Number of the position of the projection (for propagation).
Returns
-------
None
Saves the images into the file ``[prefix]_[projection_number].edf`` or
``[prefix]_[distance]_[projection_number].edf`` into the ``name_`` directory.
'''
if projection_type.lower() == 'phase':
prefix = self.phase_prefix
elif projection_type.lower() == 'attenuation':
prefix = self.attenuation_prefix
# elif proj_type.lower() == 'phase update':
# pref = self.update_prefix
# elif proj_type.lower() == 'attenuation update':
# pref = self.attenuation_update_prefix
# elif proj_type.lower() == 'intensity':
# pref = self.propagated_prefix
else:
raise TypeError("proj_type must be one of the following: 'phase', 'attenuation', 'phase update', 'attenuation update', 'intensity'" )
projection_name = str(projection).zfill(6)
# TODO: Initial file locking implementation for parallelisation on LINUX/SLURM
lock_filename = self.dataset_filename.with_suffix(self.dataset_filename.suffix+'.lock')
try:
lock_file = open(lock_filename, 'a') # Create a lock file next to dataset file (TODO: Consider making invisible at least)
if os.name == 'posix': #TODO: quick fix for lock file. Shouldn be necessary on windows due to no such parallelism? WHat about future multithread?
fcntl.lockf(lock_file, fcntl.LOCK_EX) # Acquire a lockf lock (holds inter-process but not intra?)
with h5py.File(self.dataset_filename, 'a') as f:
data = f.require_dataset(prefix+projection_name, shape=image.shape, dtype=image.dtype)
data[:] = image
lock_file.close() # Release lock
except OSError:
time.sleep(1) # Wait a bit
[docs] @Parallelize # TODO:How to get to reload from config?
def align_projections(self, *, projections):
"""
Align a series of projections.
Parameters
----------
projections : tuple of ints
In the form [start, end]
"""
for projection in range(projections[0], projections[1]+1):
self.align_projection(projection=projection)
[docs] def get_image(self, *, projection, image_type='phase', Fourier=False, pad=False):
"""Get reconstructed images (phase by default)
Parameters
----------
projection : int
Number of the projection.
image_type : str, optional
Can be phase or attenuation.
Fourier : bool
Return Fourier transform of image.
pad : bool
Return padded image.
Returns
-------
image : ndarray
"""
#TODO: handling majuscule minuscule
# if 'distance' in kwargs:
# distance = kwargs.get('distance') #TODO: Needs error handling
if image_type == 'attenuation':
prefix = self.attenuation_prefix
# elif 'propagated' in argv:
# fname = self.propagated_prefix.as_posix()+'_'+str(distance)+'_'+str(projection).zfill(4)+'.edf'
# elif 'difference' in argv:
# fname = self.difference_prefix.as_posix()+'_'+str(distance)+'_'+str(projection).zfill(4)+'.edf'
# elif 'prior' in argv:
# fname = self.prior_forward_prefix.as_posix()+('_'+str(projection).zfill(4)+'.edf')
else:
prefix = self.phase_prefix #TODO: separate file type...
with h5py.File(self.dataset_filename, 'r') as f:
image=f[prefix+str(projection).zfill(6)][:]
if Fourier:
#TODO: Refactor into an FT function in utilities (f ex)
return np.fft.fft2(Utilities.resize(image, (self.nfx,self.nfy)))
elif pad:
image = Utilities.resize(image, (self.nfx,self.nfy))
return image
[docs]class Nanomax(Dataset):
"""Class for raw Nanomax data sets.
Attributes
----------
ztot : float
Focus to detector distance in m.
path : str
Path to dataset.
version : str
Version of Dataset backend structure, e.g. 'TIEHOM'
name : str
Name of dataset
frames_per_position : int
Number of frames per position.
initial_frame : int
Number of the initial frame.
projection_prefix : str
HDF5 path to projection data.
reference_position : int
Position number as reference for alignment (usually highest resolution)
diode_name : str
Name of diode (HDF path) for normalisation
darkfield_prefix : str
HDF5 path prefix to darkfields
flatfield_prefix : str
HDF5 path prefix to flatfields
cpr_prefix : str
HDF5 path prefix to flatfield corrected images
phase_prefix : str
HDF5 path prefix to phase maps
attenuation_prefix : str
HDF5 path prefix to attenuation images
aligned : int
Flag if dataset is aligned or not.
"""
def __init__(self, name: str,path: str='.', version: str= 'ḿaster'):
self.ztot = 1.123 # Focus to detector distance in m
self.path = path
self.version = version
self.name = name
self.data_basename = self.path + '/' + self.name + '/' #TODO: Attribute?
self.dataset_filename = pyphase_path / (self.name + '_' + self.version + '.h5') #TODO: Attribute?
self.frames_per_position = 4
self.initial_frame = 0
self.projection_prefix = 'entry/measurement/andor/frames/'
self.reference_position = 0
self.correct_alignment = 1
self.measurement_prefix = 'entry/snapshot/'
self.pixel_size_filename = path + '/' 'pixelsizes.txt'
self.diode_name = 'entry/measurement/alba2/1'
self.darkfield_prefix = 'reference_images/darkfield/'
self.flatfield_prefix = 'reference_images/flatfield/'
self.cpr_prefix = 'data/cpr/'
self.phase_prefix = '/phase/'
self.attenuation_prefix = '/attenuation/'
self.aligned=0 # TODO: Should be bool
super().__init__(name, path, version)
#TODO: Quick and dirty concession to parallel requirements
while True:
try:
with h5py.File(self.dataset_filename, 'a') as dataset_file:
if 'scan_parameters' in dataset_file.keys():
#TODO: initialise must override reading of parameter file
#self.ReadParameters()
data = dataset_file['scan_parameters']
self.energy = data['energy'][()]
self.effective_distance = data['effective_distance'][()]
self.pixel_size_x = data['pixel_size_x'][()]
self.pixel_size_y = data['pixel_size_y'][()]
self.number_of_projections = data['number_of_projections'][()]
self.nx = data['nx'][()]
self.ny = data['ny'][()]
else:
#self.Initialise()
#TODO: STub. Should take values from the data h5 file
with open(self.pixel_size_filename) as pixel_size_file:
lines = pixel_size_file.readlines()
del lines[0] # remove header line
self.pixel_size_x = np.zeros(len(lines))
self.pixel_size_y = np.zeros_like(self.pixel_size_x)
z1 = np.zeros_like(self.pixel_size_x) # Focus to sample distance (given in um in text files)
for n, line in enumerate(lines):
line = line.split()
z1[n] = float(line[0])/1e6 # Focus to sample distance (given in um in text files)
self.pixel_size_x[n] = float(line[1])
self.pixel_size_y[n] = float(line[2])
pass
z2 = self.ztot - z1 # Sample to detector distance
self.effective_distance = z1*z2 / (z1+z2)
# Read parameters from data file
projection_data_filename = self.data_basename + str(self.initial_frame+2).zfill(6) + '.h5'
with h5py.File(projection_data_filename, 'r') as data_file:
self.energy = float(data_file[self.measurement_prefix+'energy'][:]) / 1e3 # eV -> keV
self.number_of_projections = len(data_file[self.projection_prefix])
self.nx = data_file[self.projection_prefix][:].shape[2]
self.ny = data_file[self.projection_prefix][:].shape[1]
# Calculate darks & flats
print('Calculating darkfield and flatfield images')
for position in range(len(self.effective_distance)):
file_base_number = self.initial_frame + position*self.frames_per_position
# Calculate darks
dark_data_filename = self.data_basename + str(file_base_number).zfill(6) + '.h5'
with h5py.File(dark_data_filename, 'r') as data_file:
darkfields = data_file[self.projection_prefix][:]
weights = data_file[self.diode_name][:] # Read diode measurements
darkfield = np.average(darkfields,axis=0,weights=weights)
dataset_dark = dataset_file.require_dataset(self.darkfield_prefix + str(position).zfill(6), darkfield.shape, darkfield.dtype)
dataset_dark[:] = darkfield
# Calculate flats
# TODO: not cleanly implemented wrt intermediate flats (but YAGNI)
flat_beginning_data_filename = self.data_basename + str(file_base_number+1).zfill(6) + '.h5'
flatfield = np.zeros((2, self.ny, self.nx)) # TODO: Adapt to flexible number of flats (but YAGNI)
with h5py.File(flat_beginning_data_filename, 'r') as data_file:
flatfields = data_file[self.projection_prefix][:]
weights = data_file[self.diode_name][:]
flatfield[0] = np.average(flatfields, axis=0, weights=weights)
flat_end_data_filename = self.data_basename + str(file_base_number+3).zfill(6) + '.h5'
with h5py.File(flat_end_data_filename, 'r') as data_file:
flatfields = data_file[self.projection_prefix][:]
weights = data_file[self.diode_name][:]
flatfield[1] = np.average(flatfields, axis=0, weights=weights)
dataset_flat = dataset_file.require_dataset(self.flatfield_prefix + str(position).zfill(6), flatfield.shape, flatfield.dtype)
dataset_flat[:] = flatfield
g = dataset_file.create_group('scan_parameters')
g.create_dataset('energy', data=self.energy)
g.create_dataset('effective_distance', data=self.effective_distance)
g.create_dataset('pixel_size_x', data=self.pixel_size_x)
g.create_dataset('pixel_size_y', data=self.pixel_size_y)
g.create_dataset('number_of_projections', data=self.number_of_projections)
g.create_dataset('nx', data=self.nx)
g.create_dataset('ny', data=self.ny)
if 'alignment' in dataset_file.keys():
self.aligned = 1
pass #With
break # Success!
except OSError:
time.sleep(1) # Wait a bit
self.padding = 2
self.position=np.arange(len(self.effective_distance))
self.magnification_x = self.pixel_size_x[0] / self.pixel_size_x
self.magnification_y = self.pixel_size_x[0] / self.pixel_size_x
self.pixel_size = np.array([self.pixel_size_x[0], self.pixel_size_y[0]])
self.reference_interval = self.number_of_projections
@property
def nfx(self):
return self.padding * self.nx
@property
def nfy(self):
return self.padding * self.ny
[docs] def get_projection(self, *, projection, position, pad=True, magnification=True, aligned=True, Fourier=False):
"""
Read one recorded image.
Parameters
----------
projection : int
Number of projection to read.
position : int
Number of positiion ("distance") to read.
pad : bool, optional
Pads the image.
magnification : bool, optional
Brings the image to the magnification of reference_position.
aligned : bool, optional
Corrects alignment (requires alignment of projections).
Fourier : bool
Returns the Fourier transform of the image.
"""
# TODO: consider renaming just projection() to get data.projection(1,23)
# Nanomax data: frame inital_frame=dark, initial_frame+1=flat_beginning, initial_frame+2=projections, initial_frame+3=flat_end
projection_name = str(projection).zfill(6)
position_name = str(position).zfill(6)
#TODO: Quick and nasty concession to parallel computing
while True:
try:
with h5py.File(self.dataset_filename, 'r') as f:
dark_image = f[self.darkfield_prefix+position_name][:] # Read projection
flatfield_image = f[self.flatfield_prefix+position_name][:] # Read projection
break # Success!
except OSError:
time.sleep(1) # Wait a bit
file_base_number = self.initial_frame + position*self.frames_per_position + 2
#TODO: Quick and nasty concession to parallel computing
while True:
try:
with h5py.File(self.data_basename + str(file_base_number).zfill(6) + '.h5', 'r') as f:
projection_image = f[self.projection_prefix][projection] # Read projection
break # Success!
except OSError:
time.sleep(1) # Wait a bit
reference_index_1 = (self.reference_interval * (projection // self.reference_interval))
reference_index_2 = min(reference_index_1+self.reference_interval, self.number_of_projections)
# Weights flat fields
weight_1 = (reference_index_2-projection)/max(reference_index_2-reference_index_1, 1)
weight_2 = 1 - weight_1
# Calculate flat field
reference_image = weight_1*flatfield_image[0] + weight_2*flatfield_image[1]
# Calculate flat and dark field corrected projecion
projection_image = (projection_image-dark_image) / (reference_image-dark_image)
# Zoom images to same scale, corresponding to highest magnification
if position != self.reference_position and magnification: #TODO: should implement distance_number, but YAGN?
projection_image = ndimage.zoom(projection_image, [1/self.magnification_y[position], 1/self.magnification_x[position]], mode='nearest') # Zoom to highest magnification TODO: combine w/ alignment?
# Correct alignment
if aligned and self.aligned and position != self.reference_position:
transform_parameters = self.get_alignment(projection=projection, position=position) # Get alignment parameters TODO: add zoom factor here?
projection_image = registrator.apply_transformation(projection_image, transform_parameters)
if Fourier:
projection_image = Utilities.resize(projection_image, (self.nfy, self.nfx))
return np.fft.fft2(projection_image)
else:
if pad:
return Utilities.resize(projection_image, (self.nfy, self.nfx))
else:
return Utilities.resize(projection_image, (self.ny, self.nx))
[docs]class NanomaxPreprocessed(Dataset):
"""
Class for preprocessed Nanomax datasets.
Attributes
----------
ztot : float
Focus to detector distance in m.
path : str
Path to dataset.
version : str
Version of Dataset backend structure, e.g. 'TIEHOM'
name : str
Name of dataset
correct_alignent : int
Flag to correct alignment.
aligned: int
Flag for dataset aligned.
phase_prefix : str
HDF5 path prefix to phase maps
attenuation_prefix : str
HDF5 path prefix to attenuation images
projection_prefix : str
HDF5 path to corrected projections
reference_position : int
Position number as reference for alignment (usually highest resolution)
energy : float
Energy in keV
nx, ny : int
Number of pixels, horizontal, vertical
padding : int
Padding factor
detector_pixel_size : float
The detector pixel size in µm
alpha : tuple of floats
Regularization parameter in the form [LF, HF]
"""
def __init__(self, name: str, path: str='.', version: str='master'):
self.ztot = 1.123 # Focus to detector distance in m
self.path = path
self.version = version
self.name = name
self.correct_alignment = 1 #TODO: should be bool
self.aligned = 0 # TODO: inelegant solution, and should be bool
# TODO: Pre-defined parameters for the moment. Should be stored in h5 files
self.projection_prefix = '/data/cpr/'
self.phase_prefix = '/phase/'
self.attenuation_prefix = '/attenuation/'
self.cpr_prefix = '/data/cpr/'
self.reference_position = 0
self.energy = 13
self.nx = 2048
self.ny = 2048
self.padding=2
self.detector_pixel_size = 0
self.alpha = [-8, -10] #TODO: HOw should this be handled really? The reconstruction parameters (alg, params, ... should definitely be in dataset, associated to a version even!)
@property
def nfx(self):
"""Number of pixels in Fourier domain, horizontal, calculated from
nx and padding"""
return self.padding * self.nx
@property
def nfy(self):
"""Number of pixels in Fourier domain, vertical, calculated from
ny and padding"""
return self.padding * self.ny
@property
def Lambda(self):
"""Wavelength in m, calculated from energy."""
return 12.4e-10 / self.energy
[docs]class NanomaxPreprocessedTomo(NanomaxPreprocessed):
"""
Class for preprocessed Nanomax tomographic datasets.
Attributes
----------
data_basename : str
File prefix to projection data files.
magnification_x,y : numpy array
Magnification factors for each position, horizontal, vertical
pixel_size_x,y : numpy array
Effective pixel size for each position
pixel_size : numpy array
Effective pixel size at reconstruction position
"""
def __init__(self, name: str, path: str='.', version: str='master'):
#TODO: Pixel sizes must be calculated on the fly, but not possible at the moment
self.path = path
self.version = version
self.name = name
self.data_basename = self.path + '/' + self.name + '_dist'
self.dataset_filename = pyphase_path / (self.name + '_' + self.version + '.h5')
super().__init__(name, path, version)
with h5py.File(self.dataset_filename, 'a') as f:
if 'scan_parameters' in f.keys():
#TODO: initialise must override reading of parameter file
#self.ReadParameters()
data = f['scan_parameters']
self.energy = data['energy'][()]
self.effective_distance = data['effective_distance'][()]
self.pixel_size_x = data['pixel_size_x'][()]
self.pixel_size_y = data['pixel_size_y'][()]
self.number_of_projections = data['number_of_projections'][()]
else:
#self.Initialise()
#TODO: Stub. Should take values from the data h5 file
self.pixel_size_filename = path + '/' 'pixelsizes.txt'
with open(self.pixel_size_filename) as pixel_size_file:
lines = pixel_size_file.readlines()
del lines[0] # remove header line
self.pixel_size_x = np.zeros(len(lines))
self.pixel_size_y = np.zeros_like(self.pixel_size_x)
z1 = np.zeros_like(self.pixel_size_x) # Focus to sample distance (given in um in text files)
for n, line in enumerate(lines):
line = line.split()
z1[n] = float(line[0])/1e6 # Focus to sample distance (given in um in text files)
self.pixel_size_x[n] = float(line[1])
self.pixel_size_y[n] = float(line[2])
pass
z2 = self.ztot - z1 # Sample to detector distance
self.effective_distance = z1*z2 / (z1+z2)
g = f.create_group('scan_parameters')
g.create_dataset('energy', data=self.energy)
g.create_dataset('effective_distance', data=self.effective_distance)
g.create_dataset('pixel_size_x', data=self.pixel_size_x)
g.create_dataset('pixel_size_y', data=self.pixel_size_y)
with h5py.File(self.data_basename+str(0)+'.h5', 'r') as data_file:
self.number_of_projections = len(data_file[self.projection_prefix])
g.create_dataset('number_of_projections', data=self.number_of_projections)
if 'alignment' in f.keys():
self.aligned = 1
self.position=np.arange(len(self.effective_distance))
self.magnification_x = self.pixel_size_x[0] / self.pixel_size_x
self.magnification_y = self.pixel_size_x[0] / self.pixel_size_x
self.pixel_size = np.array([self.pixel_size_x[0], self.pixel_size_y[0]])
[docs] def get_projection(self, *, projection, position, pad=True, magnification=True, aligned=True, Fourier=False):
"""
Read one recorded image.
Parameters
----------
projection : int
Number of projection to read.
position : int
Number of positiion ("distance") to read.
pad : bool, optional
Pads the image.
magnification : bool, optional
Brings the image to the magnification of reference_position.
aligned : bool, optional
Corrects alignment (requires alignment of projections).
Fourier : bool
Returns the Fourier transform of the image.
"""
#TODO: consider renaming just projection() to get data.projection(1,23)
data_filename = self.data_basename + str(position) + '.h5'
projection_name = str(projection).zfill(6)
with h5py.File(data_filename, 'r') as f:
projection_image = f[self.projection_prefix+projection_name][:] # Read projection
if position != self.reference_position and magnification: #TODO: should implement distance_number, but YAGN?
projection_image = ndimage.zoom(projection_image, [1/self.magnification_y[position], 1/self.magnification_x[position]], mode='nearest') # Zoom to highest magnification TODO: combine w/ alignment?
if aligned and self.aligned and position != self.reference_position:
# TODO: Currently using similarity transform. It would be nice to be able to change transforms:
# TODO: Translation, Euler, Similarity, Affine... and handle the different length of parameter vector
transform_parameters = self.get_alignment(projection=projection, position=position) # Get alignment parameters TODO: add zoom factor here?
#projection_image = ndimage.zoom(projection_image, magnification_factor/transform_parameters[0])
#projection_image = ndimage.rotate(projection_image, np.degrees(transform_parameters[1]))
#projection_image = ndimage.shift(projection_image, [-transform_parameters[3], -transform_parameters[2]])
projection_image = registrator.apply_transformation(projection_image, transform_parameters)
if Fourier:
projection_image = Utilities.resize(projection_image, (self.nfy, self.nfx))
return np.fft.fft2(projection_image)
else:
if pad:
return Utilities.resize(projection_image, (self.nfy, self.nfx))
else:
return Utilities.resize(projection_image, (self.ny, self.nx))
[docs]class NanomaxPreprocessed2D(NanomaxPreprocessed):
"""
Class for single projection Nanomax data.
"""
def __init__(self, name: str, path: str='.', version: str='master'):
self.number_of_projections = 1 # Single projection data set
super().__init__(name, path, version)
# Define file access parameters
self.data_filename = self.path + '/' + self.name + '.h5'
self.dataset_filename = pyphase_path / (self.name + '_' + self.version + '.h5')
self.projection_prefix = '/data/cpr/'
self.phase_prefix = '/phase/'
self.attenuation_prefix = '/attenuation/'
# Read scan parameters
with h5py.File(self.dataset_filename, 'a') as f:
# If the dataset is already created, read from the file
if 'scan_parameters' in f.keys():
#TODO: initialise must override reading of parameter file
#self.ReadParameters()
data = f['scan_parameters']
self.energy = data['energy'][()]
self.effective_distance = data['effective_distance'][()]
self.pixel_size_x = data['pixel_size_x'][()]
self.pixel_size_y = data['pixel_size_y'][()]
self.number_of_projections = data['number_of_projections'][()]
else:
#Else, read from the pixelsizes.txt (temporary solution? To be defined with Nanomax)
#TODO: Stub. Should take values from the data h5 file
self.pixel_size_filename = path + '/' 'pixelsizes.txt'
with open(self.pixel_size_filename) as pixel_size_file:
lines = pixel_size_file.readlines()
del lines[0] # remove header line
self.pixel_size_x = np.zeros(len(lines))
self.pixel_size_y = np.zeros_like(self.pixel_size_x)
z1 = np.zeros_like(self.pixel_size_x) # Focus to sample distance (given in um in text files)
for n, line in enumerate(lines):
line = line.split()
z1[n] = float(line[0])/1e6 # Focus to sample distance (given in um in text files)
self.pixel_size_x[n] = float(line[1])
self.pixel_size_y[n] = float(line[2])
pass
# Calculate effective distances
z2 = self.ztot - z1 # Sample to detector distance
self.effective_distance = z1*z2 / (z1+z2)
# Create the hdf5 data
g = f.create_group('scan_parameters')
g.create_dataset('energy', data=self.energy)
g.create_dataset('effective_distance', data=self.effective_distance)
g.create_dataset('pixel_size_x', data=self.pixel_size_x)
g.create_dataset('pixel_size_y', data=self.pixel_size_y)
g.create_dataset('number_of_projections', data=self.number_of_projections)
if 'alignment' in f.keys():
self.aligned = 1
# Calculate magnifications and effective pixel size
self.magnification_x = self.pixel_size_x[0] / self.pixel_size_x
self.magnification_y = self.pixel_size_x[0] / self.pixel_size_x
self.pixel_size = np.array([self.pixel_size_x[0], self.pixel_size_y[0]])
self.position=np.arange(len(self.effective_distance))
pass # __init__
[docs] def get_projection(self, *, projection, position, pad=True, magnification=True, aligned=True, Fourier=False):
"""
Read one recorded image.
Parameters
----------
projection : int
Number of projection to read.
position : int
Number of positiion ("distance") to read.
pad : bool, optional
Pads the image.
magnification : bool, optional
Brings the image to the magnification of reference_position.
aligned : bool, optional
Corrects alignment (requires alignment of projections).
Fourier : bool
Returns the Fourier transform of the image.
"""
#TODO: consider renaming just projection() to get data.projection(1,23)
#TODO: Possibility non-corrected?
projection_name = str(position).zfill(6) # Constitute projection name in Nanomax dataset
with h5py.File(self.data_filename, 'r') as f:
projection_image = f[self.projection_prefix+projection_name][:] # Read projection
if position != self.reference_position and magnification: #TODO: should implement distance_number, but YAGN?
projection_image = ndimage.zoom(projection_image, [1/self.magnification_y[position], 1/self.magnification_x[position]], mode='nearest') # Zoom to reference_plane magnification TODO: combine w/ alignment?
if aligned and position != self.reference_position: # COrrect alignment
# TODO: Currently using similarity transform. It would be nice to be able to change transforms
# TODO: Translation, Euler, Similarity, Affine... and handle the different length of parameter vector
transform_parameters = self.get_alignment(projection=projection, position=position) # Get alignment parameters TODO: add zoom factor here?
#projection_image = ndimage.zoom(projection_image, magnification_factor/transform_parameters[0])
#projection_image = ndimage.rotate(projection_image, np.degrees(transform_parameters[1]))
#projection_image = ndimage.shift(projection_image, [-transform_parameters[3], -transform_parameters[2]])
projection_image = registrator.apply_transformation(projection_image, transform_parameters)
if Fourier: # Pad and Fourier transform
projection_image = Utilities.resize(projection_image, (self.nfy, self.nfx))
return np.fft.fft2(projection_image)
else:
if pad: # Pad
return Utilities.resize(projection_image, (self.nfy, self.nfx))
else:
return Utilities.resize(projection_image, (self.ny, self.nx))
[docs]class ESRF():
"""Legacy code for ESRF datasets.
Note
----
Will be aligned with Nanomax functionality.
"""
def __init__(self, name: str, path: str='.', version: str='master'):
self.registrator = Utilities.ElastixRigid()
self.path=Path(path) # TODO: OR shoud it take a path object already?
self.name=name # name of sample
self.version=version
self.axis_position = np.array([])
self.position = np.array([]) # in m
self.distance_source_sample= 0
self.energy = 0 # in keV
self.nD = 0
self.nbprojections = 0
self.position_number = []
self.nx = 0
self.ny = 0
self.detector_pixel_size = 0
self.reference_interval = 0
self.reference_position = 0
self.scan_range = 0
self.delta_beta = 0
#TODO: Some of these things should surely go in some kind of configuration file
self.phase_tag='phase'
self.delta_tag='delta'
self.beta_tag='beta'
self.measured_beta_tag='measured_beta'
self.forward_tag='forward'
self.attenuation_tag='attenuation'
self.measured_attenuation_tag='measured_attenuation'
self.propagated_tag='propagated'
self.update_tag='update'
self.prior_tag='prior'
self.attenuation_update_tag='attenuation_update'
self.difference_tag='difference'
self.parameter_filename='parameters.yml'
self.dataset_parameter_filename = self.path / pyphase_path / name / self.version / self.parameter_filename
if (self.dataset_parameter_filename.exists()):
#TODO: initialise must override reading of parameter file
self._read_parameter_file()
else:
self.dataset_parameter_filename.parents[0].mkdir(parents=True)
self.dataset_parameter_filename.touch()
self._initialise()
self.preprocessing=1
self.correct_alignment=1
self.correct_motion=1
self.padding=2 #TODO: should be a parameter/in parameter file (in each version?)
self.magnification = 1 #TODO: should be a parameter/in parameter file (in each version?)
# Pixel size & magnification (from parameter file)
self.magnification_factor = (self.distance_source_sample+self.position*1e-3) / self.distance_source_sample
self.max_magnification_factor = max(self.magnification_factor)
self.pixel_size = self.detector_pixel_size / self.max_magnification_factor
# File access
# Basix paths
self.data_prefix = self.path / self.name
self.result_prefix = self.path / pyphase_path / self.name / self.version
# self.vol_prefix = self.path+'/volfloat/' #TODO: Should change, volumes should be stored either directly under version path or in their own directories
# Phase
self.phase_name = self.name+'_'+self.version # TODO necessary? Good to mark data set & version in file names?
self.phase_prefix = self.result_prefix / self.phase_tag / self.phase_name
# Delta
volume_suffix = '.vol' #TODO: This will be dependent on the internal format
self.delta_filename = (self.result_prefix / self.delta_tag / self.phase_name).with_suffix(volume_suffix)
# Projected Delta
self.phase_forward_name = self.phase_name+'_'+self.delta_tag+'_'+self.forward_tag # Projection of delta
self.phase_forward_prefix = self.result_prefix / (self.delta_tag+'_'+self.forward_tag) / self.phase_forward_name
# Measured attenuation / shortest distance
self.measured_attenuation_name = self.name+'_'+str(self.position_number[0])
self.measured_attenuation_prefix = self.path / self.measured_attenuation_name / self.measured_attenuation_name
# measured beta
self.measured_beta_filename = (self.result_prefix / self.measured_attenuation_name).with_suffix(volume_suffix)
# Projected measured beta
self.measured_attenuation_forward_name = self.measured_attenuation_name+'_'+self.forward_tag
self.measured_attenuation_forward_prefix = self.result_prefix / (self.measured_beta_tag+'_'+self.forward_tag) / self.measured_attenuation_forward_name
# retrieved attenuation
self.attenuation_name = self.phase_name+'_'+self.attenuation_tag
self.attenuation_prefix = self.result_prefix / self.attenuation_tag / self.attenuation_name
# reconstructed beta
self.beta_filename = (self.result_prefix / self.attenuation_name).with_suffix(volume_suffix)
# projected reconstructed beta
self.attenuation_forward_name = self.attenuation_name+'_'+self.forward_tag
self.attenuation_forward_prefix = self.result_prefix / (self.beta_tag+'_'+self.forward_tag) / self.attenuation_forward_name
# prior
self.prior_name=self.phase_name+'_'+self.prior_tag
self.prior_prefix= self.result_prefix / self.prior_name
self.prior_vol_filename = (self.result_prefix / self.prior_tag / self.prior_name).with_suffix(volume_suffix)
self.prior_forward_name = self.prior_name+'_'+self.forward_tag
self.prior_forward_prefix = self.result_prefix / (self.prior_tag+'_'+self.forward_tag) / self.prior_forward_name
# Propagated, updates
self.propagated_prefix=self.result_prefix / self.propagated_tag / (self.phase_name+'_'+self.propagated_tag)
self.update_name = self.phase_name+'_'+self.update_tag
self.update_prefix = self.result_prefix / self.update_tag / self.update_name
self.update_vol_filename = (self.result_prefix / self.update_tag / self.update_name).with_suffix(volume_suffix)
self.attenuation_update_name=self.phase_name+'_'+self.attenuation_update_tag
self.attenuation_update_prefix= self.result_prefix / self.attenuation_update_tag / self.attenuation_update_name
self.attenuation_update_vol_filename = (self.result_prefix / self.attenuation_update_tag / self.attenuation_update_name).with_suffix(volume_suffix)
self.difference_name = self.phase_name + self.difference_tag
self.difference_prefix = self.result_prefix / self.difference_tag / self.difference_name
self.lengthscale=10e-6 #TODO: Dataset should surely not have lengthscale?
# TODO: there should be the possibility to use different parameter files ofc
# TODO: Add argument for parameter file\
# TODO: If no parameter file is given, check if the standard one exists, otherwise populate from info files
#
self.angle_increment=self.scan_range/self.nbprojections
self.Lambda = 12.4e-10 / self.energy # wavelength (in m?)
self.effective_distance = (self.distance_source_sample * self.position) / (self.distance_source_sample + self.position)
self.Fresnel_number = self.Lambda * self.effective_distance / (self.lengthscale ** 2)
self.position_number_no_reference_position = self.position_number.copy()
self.position_number_no_reference_position.remove(self.position_number[self.reference_position])
# Check if shifts are done (i.e. file shifts.pickle exists)
self.shift_filename = self.result_prefix / 'shifts.pickle'
self.shift_polynomial_x=[0 for x in range (self.nD)]
self.shift_polynomial_y=[0 for x in range (self.nD)]
self.shift_polynomial_y=[0 for x in range (self.nD)]
self.shift_polynomial_order=[4, 2, 2]
if os.path.isfile(self.shift_filename):
self.aligned=1
with open(self.shift_filename, 'rb') as f:
self.shifts=pickle.load(f)
self.fit_alignment()
else:
self.shifts=SortedList() # entry: []
self.aligned=0
self.motion_filename=path+'/'+name+'_/motion.pickle'
if os.path.isfile(self.motion_filename):
self.motion_corrected=1
with open(self.motion_filename, 'rb') as f:
self.motion_correlations = pickle.load(f)
else:
self.motion_correlations = [ [] for i in range(self.nD) ]
self.motion_corrected=0
def _initialise(self):
self.motion_corrected = 0
self.aligned = 0
# TODO: Should erase files also
self.populate()
self._write_parameter_file()
@property
def nfx(self):
return self.padding * self.nx
@property
def nfy(self):
return self.padding * self.ny
[docs] def get_projection(self, *, projection, position, pad=True, Fourier=False, aligned=True, magnification=True):
#TODO: consider renaming just projection() to get data.projection(1,23)
# For integration with rest, necessary(?) conversion from position 0 to 1
projection_prefix = self.path / (self.name + '_' + str(self.position_number[position]) + '_/')
projection_filename = projection_prefix / (self.name + '_' + str(self.position_number[position]) + '_' + str(projection).zfill(4) + '.edf')
projection_image = EdfFile(projection_filename).GetData(0)
xpad = (self.nfx-self.nx)//2
ypad = (self.nfy-self.ny)//2
if self.preprocessing:
dark_filename = projection_prefix / 'dark.edf'
#print(dark_filename)
dark_image = EdfFile(dark_filename).GetData(0)
# Get previous ff
reference_index_1 = (self.reference_interval * (projection // self.reference_interval))
reference_index_2 = min(reference_index_1+self.reference_interval, self.nbprojections)
# Weights flat fields
weight_1 = (reference_index_2-projection)/max(reference_index_2-reference_index_1, 1)
weight_2 = 1 - weight_1
# Open 'previous' flat
reference_filename_1 = projection_prefix / ('refHST' + str(reference_index_1).zfill(4) + '.edf')
#print(reference_filename_1)
reference_image_1 = EdfFile(reference_filename_1).GetData(0) - dark_image
# open 'next' flat
reference_filename_2 = projection_prefix / ('refHST' + str(reference_index_2).zfill(4) + '.edf')
#print(reference_filename_2)
reference_image_2 = EdfFile(reference_filename_2).GetData(0) - dark_image
# reference for current projection
reference_image = weight_1*reference_image_1 + weight_2*reference_image_2
# Open projection image
projection_image = (projection_image-dark_image) / reference_image
if self.magnification and magnification:
if self.magnification_factor[position] != self.max_magnification_factor: # Unless its the highest magnification
projection_image = ndimage.zoom(projection_image, self.max_magnification_factor/self.magnification_factor[position])
#TODO: Simple approach first. Magnification and center should perhaps be calculated in shifts?
correction = np.zeros(3)
projection_image = np.pad(projection_image, ((ypad, ypad), (xpad, xpad)), 'edge') #TODO: watch out with magnification
if self.correct_motion and self.motion_corrected and aligned:
#Apply motion
#TODO: Is there any reason to not apply motion correction to reconstructed image like this?
#TODO: Should be better to just motion correct the reference plane no?
motion = self.get_motion(projection, self.reference_position)
correction[0] = motion[0] * self.max_magnification_factor/self.magnification_factor[self.reference_position]
correction[1] = motion[1] * self.max_magnification_factor/self.magnification_factor[self.reference_position]
if self.correct_alignment and self.aligned and aligned:
correction = correction + self.get_alignment(projection, position)
# TODO: Why am I not using freq. variables correctly? investigate
# TODO: Handle padding somehow/correcly
if sum(correction*correction)**0.5 > 0.001:
f = np.arange(0, self.nfx//2, 1)
f = f * ((correction[0])*2*pi / self.nfx)
f = np.exp(1j*f)
#f = np.cos(f)-1j*np.sin(f)
f = np.hstack((f, np.conj(f[::-1])))
f = np.tile(f, (self.nfy, 1))
g = np.arange(0, self.nfy//2, 1)
g = g * ((correction[1]) * 2 * pi / self.nfy)
g = np.exp(1j*g)
#g = np.cos(g)-1j*np.sin(g)
g = np.hstack((g, np.conj(g[::-1])))
g = np.tile(g, (self.nfx, 1))
g = g.transpose()
t = f * g
projection_image_FT = np.fft.fft2(projection_image) * t
else:
projection_image_FT = np.fft.fft2(projection_image)
projection_image = np.real(np.fft.ifft2(projection_image_FT))
projection_image = projection_image[self.ny//2:-self.ny//2, self.nx//2:-self.nx//2]
# watch out how to keep track... corrected image is already in FD
#fig = pyplot.imshow(np.real(np.fft.ifft2(projection_image)))
#pyplot.colorbar()
if Fourier:
return projection_image_FT
else:
if pad:
projection_image = np.pad(projection_image, ((ypad, ypad), (xpad, xpad)), 'edge')
return projection_image
[docs] def preprocess(self):
"""Runs all preprocessing on the dataset: axis positions, motion,
reference position, alignment"""
self.calculate_axis_positions() # TODO: Verify order of shifts, motion, rotaxs
self.calculate_motion()
self.calculate_reference_position()
self.align()
self.fit_alignment()
#self.display_alignment() #TODO: does not work for the moment
self._write_parameter_file()
def _write_parameter_file(self):
"""Writes current state to the associated parameter file"""
parameters = dict(reference_position=self.reference_position,
nD=self.nD,
nbprojections=self.nbprojections,
position_number=self.position_number,
position=self.position.tolist(),
dss=self.distance_source_sample,
energy=self.energy,
detector_pixel_size=self.detector_pixel_size,
nx=self.nx,
ny=self.ny,
axis_position=self.axis_position.tolist(),
reference_interval=self.reference_interval,
scan_range=self.scan_range,
version=self.version,
alpha=self.alpha.tolist(),
delta_beta=self.delta_beta)
with open(self.dataset_parameter_filename, 'w') as f:
yaml.dump(parameters, f)
pass
def _read_parameter_file(self):
"""Reads the associated parameter file."""
with open(self.dataset_parameter_filename, 'r') as f:
parameters=yaml.safe_load(f)
self.reference_position=parameters['reference_position']
self.nD=parameters['nD']
self.nbprojections=parameters['nbprojections']
self.position_number=parameters['position_number']
self.position=np.array(parameters['position'])
self.distance_source_sample=parameters['dss']
self.energy=parameters['energy']
self.detector_pixel_size=parameters['detector_pixel_size']
self.nx=parameters['nx']
self.ny=parameters['ny']
self.axis_position=np.array(parameters['axis_position'])
self.reference_interval=parameters['reference_interval']
self.scan_range=parameters['scan_range']
self.version=parameters['version']
self.alpha=np.array(parameters['alpha'])
self.delta_beta=parameters['delta_beta']
pass
[docs] def populate(self):
"""Tries to find the dataset parameters in the accompanying *.info* and *xml* files.
It is called when a parameter file ``pyphase_parameter.yml`` is not found.
Returns
-------
self
Returns *self* with the updated attributes.
"""
# self.N # projection angles
# Checks number of directories named 'name_%d_'
print(self.path)
type(self.path)
directories = os.listdir(self.path)
name_regex = r"%s(\_)(\d)(\_)" % self.name #TODO: Too unreadable. Avoid cryptic code
data_dirs = [dir for dir in directories if re.match(name_regex, dir)]
self.alpha = np.array([-8, -10]) #TODO: default value... should this really go here?
self.nD = len(data_dirs) # number of distances
self.position_number = np.arange(1, self.nD + 1).tolist()
#TODO: Not nice, should be based on the generic paths
fname_info = self.path / (self.name + '_1_') / (self.name + '_1_' + '.info')
fname_xml = fname_info.with_suffix('.xml')
print(fname_info)
print(fname_xml)
if fname_xml.exists():
xml_file = minidom.parse(str(fname_xml))
dists = []
for dist in range(1, self.nD + 1):
fname = self.path / (self.name + '_' + str(dist) + '_') / (self.name + '_' + str(dist) + '_' + '.xml')
print(fname)
xml_f = minidom.parse(str(fname))
distance = float(xml_f.getElementsByTagName('distance')[0].firstChild.data)
dists.append(distance)
self.reference_position = 0
self.nbprojections = int(xml_file.getElementsByTagName('tomo_N')[0].firstChild.data)
self.position = np.array(dists) / 1000
self.distance_source_sample = 144
self.energy = float( xml_file.getElementsByTagName('energy')[0].firstChild.data)
self.detector_pixel_size = float(xml_file.getElementsByTagName('pixelSize')[0].firstChild.data)
self.nx = int(xml_file.getElementsByTagName('DIM_1')[0].firstChild.data)
self.ny = int(xml_file.getElementsByTagName('DIM_2')[0].firstChild.data)
self.reference_interval = int(xml_file.getElementsByTagName('ref_On')[0].firstChild.data)
self.scan_range = int(xml_file.getElementsByTagName('scanRange')[0].firstChild.data)
elif fname_info.exists():
# Adding file information into a dictionary
parameter_dict = {}
with open(fname_info) as file:
for line in file:
listedline = line.strip().split('=')
parameter_dict[listedline[0]] = listedline[1].strip()
#self.reference_distance = 1 # stationary plane in registration #D# TODO: Where to find this?
self.nbprojections = int(parameter_dict['TOMO_N'])
# Reading distances from the .info files
dists = []
for dist in range(1, self.nD + 1):
fname = elf.path / (self.name + '_' + str(dist) + '_') / (self.name + '_' + str(dist) + '_' + '.info')
with open(fname) as file:
for i, line in enumerate(file):
if i == 1: # 'distance' is in the second line
listedline = line.strip().split('=')
distance = float(listedline[1].strip()) * 0.01
dists.append(distance)
self.position = np.array(dists) / 1000
self.distance_source_sample = 145 # Distance Source - Sample TODO: Where to find this?
self.energy = float(parameter_dict['Energy']) # keV
self.detector_pixel_size = float(parameter_dict['Optic used'])
self.nx = int(parameter_dict['Dim_1'])
self.ny = int(parameter_dict['Dim_2'])
self.reference_interval = int(parameter_dict['REF_ON'])
self.scan_range = int(parameter_dict['ScanRange'])
else: raise Exception('Supporting files '+ self.name +'_1_.info and ' + self.name +'_1_.xml not found.')
[docs] def GetSinogram(self, distance=1):
"""Return projections as sinogram
Parameters
----------
distance : int
Number of the distance of the projection.
Returns
-------
sinogram
Returns projections stacked as sinogram
"""
#TODO: RIT/YAGNI. Add functionality as needed
#TODO: Probably has to be able to split over lines for serialisation/parallellisation
#Allocate a 3D numpy array of correct dimensions
sinogram = np.empty([self.nx, self.ny, self.nbprojections])
#Load projections one by one
for projection in range(self.nbprojections):
sinogram[:,:,projection]=self.get_projection(projection, self.position_number[distance])
#return
return sinogram
[docs] def get_image(self, *, projection, image_type='phase', Fourier=False, pad=False):
"""Get reconstructed images (phase by default)
Parameters
----------
projection : int
Number of the projection.
image_type : str, optional
Can be phase or attenuation.
Fourier : bool
Return Fourier transform of image.
pad : bool
Return padded image.
Returns
-------
image : ndarray
"""
if image_type.lower() == 'attenuation':
fname = self.attenuation_prefix.as_posix()+'_'+str(projection).zfill(4)+'.edf'
# elif 'propagated' in argv:
# fname = self.propagated_prefix.as_posix()+'_'+str(distance)+'_'+str(projection).zfill(4)+'.edf'
# elif 'difference' in argv:
# fname = self.difference_prefix.as_posix()+'_'+str(distance)+'_'+str(projection).zfill(4)+'.edf'
# elif 'prior' in argv:
# fname = self.prior_forward_prefix.as_posix()+('_'+str(projection).zfill(4)+'.edf')
elif image_type.lower() == 'phase':
fname = self.phase_prefix.as_posix()+'_'+str(projection).zfill(4)+'.edf' #TODO: separate file type...
else:
raise TypeError("proj_type must be one of the following: 'phase', 'attenuation'" )
image = EdfFile(fname).GetData(0)
xpad = (self.nfx-self.nx)//2
ypad = (self.nfy-self.ny)//2
if Fourier:
#TODO: Refactor into an FT function in utilities (f ex)
return np.fft.fft2(np.pad(image, ((ypad, ypad), (xpad, xpad)), 'edge'))
else:
if pad:
image = np.pad(image, ((ypad, ypad), (xpad, xpad)), 'edge')
return image
[docs] def calculate_axis_position(self, distance):
"""Calculates the axis position at one position"""
# During/after scan
# 180/360
correct_alignment = self.correct_alignment
self.correct_alignment = 0
correct_motion = self.correct_motion
self.correct_motion = 0
if self.scan_range == 360:
projection_0 = self.nbprojections + 4
projection_180 = self.nbprojections + 2
else:
projection_0 = self.nbprojections + 2
projection_180 = self.nbprojections
im0 = self.get_projection(projection=projection_0, position=distance)
im180 = np.fliplr(self.get_projection(projection=projection_180, position=distance))
field, image_transormed, transform_parameters = self.registrator.register(im0, im180)
offset = field[0][0][0]/2
print("Rotation axis offset: {}".format(offset))
self.axis_position[distance] = self.nx/2 + offset
print("Axis position: {}".format(self.axis_position[distance]))
self.correct_alignment = correct_alignment
self.correct_motion = correct_motion
[docs] def calculate_axis_positions(self):
"""Calculates all axis positions."""
self.axis_position = np.zeros(self.nD)
for distance in range(self.nD):
self.calculate_axis_position(distance)
self._write_parameter_file()
[docs] def calculate_motion(self):
"""Estimates motion in the scans based on supplementary images."""
# define reference positions
self.motion_correlations = [ [] for i in range(self.nD) ]
self.motion_corrected = 0
correct_alignment = self.correct_alignment
self.correct_alignment = 0
reference_positions = range(0,self.scan_range, 90)
reference_position_indices = range(self.nbprojections+int(self.scan_range/90), self.nbprojections, -1)
image = []
reference = []
for Distance in range(self.nD):
print("Calculating motion position {}".format(str(Distance)))
for index, position in enumerate(reference_positions):
# Get reference projecions and scan projections at reference positions
image_index = self.nbprojections * (position/self.scan_range)
reference = self.get_projection(projection=reference_position_indices[index], position=Distance)
image = self.get_projection(projection=floor(image_index), position=Distance)
if image_index != floor(image_index):
print("Interpolating projection for motion estimation")
image2 = self.get_projection(floor(image_index)+1, Distance)
image = (floor(image_index)+1-image_index)*image + (image_index-floor(image_index))*image2
# calculate correlations
field, aligned_image, transform_parameters = self.registrator.register(image, reference)
# Will create a list of lists with [distance][angle (corresponding to ref positions, 0=0, 1=90, ...)][dim (0=x,1=y,...)]
self.motion_correlations[Distance].append([field[0][0][0], field[1][0][0]])
# save motions
with open(self.motion_filename, 'wb') as f:
pickle.dump(self.motion_correlations, f, pickle.HIGHEST_PROTOCOL)
self.motion_corrected=1
self.correct_alignment = correct_alignment
pass
[docs] def get_motion(self, projection, distance):
"""Returns estimation motion at one projection and position."""
#TODO: disgusting, needs cleaning
# fit motion with sinusoids (?)
#TODO: Should be called from get_projection no???
if self.correct_motion and self.motion_corrected:
reference_positions = np.arange(0, self.scan_range, 90)
projections_all = np.arange(self.nbprojections+len(reference_positions)+1)
polynomial_order_h = min(floor(len(reference_positions)/2), 3)
polynomial_order_v = min(len(reference_positions), 3)
arg = np.array(reference_positions)/self.scan_range - 1 # TODO: to be extended in case of more images a la id16a?
arg_all = projections_all/self.nbprojections - 1
corr=np.array(self.motion_correlations[distance-1])
# Vertical fit
M = np.tile(arg, [polynomial_order_v, 1])
for k in range(1, polynomial_order_v):
M[k,:] = M[k,:]*M[k-1,:]
M = -np.flipud(M) #TODO: could certainly be more elegant...
coefficients = np.dot(np.linalg.pinv(M).T, corr[:,1])
# subtract mean from vertical motion
ref_v = np.polyval(np.append(coefficients,0), arg_all)
ref_v_mean = np.mean(ref_v);
ref_v -= ref_v_mean;
ref_v_meas = -corr[:,1]-ref_v_mean
# Horizontal fit
M = np.tile(arg, [polynomial_order_h, 1])
for k in range(1, polynomial_order_h):
M[k,:] = M[k,:]*M[k-1,:]
M = -np.flipud(M) #TODO: could certainly be more elegant...
M = np.append(M*np.tile(np.cos((np.pi/180)*reference_positions),[polynomial_order_h,1]),
M*np.tile(np.sin((np.pi/180)*reference_positions),[polynomial_order_h,1]),axis=0)
coefficients = np.dot(np.linalg.pinv(M).T, corr[:,0])
sx_cal = np.polyval(np.append(coefficients[0:polynomial_order_h], 0), arg_all);
sy_cal = np.polyval(np.append(coefficients[polynomial_order_h:], 0), arg_all);
arg = projections_all*(self.scan_range/180*pi/self.nbprojections);
reference_position_indices = self.nbprojections + np.arange(int(self.scan_range/90), 0, -1)
arg[reference_position_indices] = np.array(reference_positions)*(pi/180)
M = np.vstack((np.cos(arg), np.sin(arg)));
ref_h = sx_cal*M[0,:] + sy_cal*M[1,:]
#Fit with sinusoid
coefficients = np.dot(np.linalg.pinv(M).T,ref_h)
sx_cal -= coefficients[0];
sy_cal -= coefficients[1];
ref_h -= np.dot(coefficients,M);
index_rots = np.round(np.array(reference_positions)*(self.nbprojections/self.scan_range))
ref_h_meas=-corr[:,0]
ref_h_meas -= np.dot(coefficients, M[:,index_rots.astype(int)])
#pyplot.figure()
#pyplot.plot(projections_all, ref_v, 'b', index_rots, ref_v_meas, 'bx', projections_all, ref_h, 'r', index_rots, ref_h_meas, 'rx')
return ref_h[projection], ref_v[projection]
else:
return 0, 0
[docs] def calculate_reference_position(self):
"""Calculates and sets reference position based on motion"""
#TODO: add check if motion is calculated (self.motion_correlations)
#TODO: manipulation of the motion list is quite roundabout
correlation_amplitudes = []
for index in range(self.nD):
tmp = [item for sublist in self.motion_correlations[index] for item in sublist] #flatten list
tmp = [x**2 for x in tmp] #square elements
correlation_amplitudes.append(sqrt(sum(tmp)))
self.reference_position = correlation_amplitudes.index(min(correlation_amplitudes))
print('Reference position: {}'.format(self.reference_position))
self._write_parameter_file()
# def IsAligned(self):
# return self.aligned
# def ClearShifts(self, D):
# """clear shifts. Either 1 distance or D='all'"""
# pass
[docs] def fit_alignment(self):
"""Fits measured alignment parameters with polynomials"""
nb_shifts=len(self.shifts)
projection=[0 for x in range(nb_shifts)]
x=[[0 for x in range(nb_shifts)] for y in range(self.nD)]
y=[[0 for x in range(nb_shifts)] for y in range(self.nD)]
theta=[[0 for x in range(nb_shifts)] for y in range(self.nD)]
for p_i in range(nb_shifts):
projection[p_i]=self.shifts[p_i][0]
for d_i in range(self.nD):
for p_i in range(nb_shifts):
x[d_i][p_i] = self.shifts[p_i][1][d_i][0]
y[d_i][p_i] = self.shifts[p_i][1][d_i][1]
#theta[dndx][projndx]=self.shifts[projndx][1][dndx][2]
for d_i in range(self.nD):
self.shift_polynomial_x[d_i]=np.polyfit(projection, x[d_i], self.shift_polynomial_order[0])
self.shift_polynomial_y[d_i]=np.polyfit(projection, y[d_i], self.shift_polynomial_order[1])
pass
[docs] def display_alignment(self, projection=0):
"""Displays alignment and fit."""
# TODO: quite a lot of duplicate code with FitShifts
nb_shifts=len(self.shifts)
projection_range=[0 for x in range(nb_shifts)]
x=[[0 for x in range(nb_shifts)] for y in range(self.nD)]
y=[[0 for x in range(nb_shifts)] for y in range(self.nD)]
theta=[[0 for x in range(nb_shifts)] for y in range(self.nD)]
X=np.linspace(0, self.nbprojections-1, self.nbprojections)
for d_i in range(self.nD):
for p_i in range(nb_shifts):
x[d_i][p_i] = self.shifts[p_i][1][d_i][0]
y[d_i][p_i] = self.shifts[p_i][1][d_i][1]
#theta[dndx][projndx]=self.shifts[projndx][1][dndx][2]
for p_i in range(nb_shifts):
projection_range[p_i]=self.shifts[p_i][0]
pyplot.figure()
for index, distance in enumerate(self.position_number_no_reference_position):
pyplot.subplot(len(self.position_number_no_reference_position), 2, 2*(index)+1)
pyplot.plot(projection_range, x[distance-1], 'rx', X, np.polyval(self.shift_polynomial_x[distance-1], X), 'b')
pyplot.title('Distance '+str(distance))
for index, distance in enumerate(self.position_number_no_reference_position):
#yi=np.polyfit(projection_range, y[distance], self.shift_polynomial_order[1])
pyplot.subplot(len(self.position_number_no_reference_position), 2, 2*(index)+2)
pyplot.plot(projection_range, y[distance-1], 'rx', X, np.polyval(self.shift_polynomial_y[distance-1], X), 'b')
pyplot.title('Distance '+str(distance))
pyplot.subplots_adjust(hspace=.6, left=.07, bottom=0.05, right=.98, top=.95)
projection_images = np.zeros([self.ny, self.nx, self.nD])
for distance in range(1, self.nD+1):
projection_images[:,:,distance-1] = self.get_projection(projection=projection, position=distance)
viewer.display_stack(projection_images)
# def SetShift(self, projection, distance, x, y, theta=0):
# pass
[docs] def get_alignment(self, projection, distance):
"""Returns the transform parameters for alignment at one projection
and position"""
#TODO: stub
# if distance == self.reference_distance or self.correct_alignment == 0:
# shift_x = 0
# shift_y = 0
# shift_theta = 0
# else:
shift_x = np.polyval(self.shift_polynomial_x[distance-1], projection)
shift_y = np.polyval(self.shift_polynomial_y[distance-1], projection)
shift_theta = 0
return [shift_x, shift_y, shift_theta]
[docs] def align(self, interval=100):
"""
Align(self, RA, interval=100)
Align complete data set.
TODO: Should probably check for the last projection also so that it is
always included
"""
self.position_number_no_reference_position = self.position_number.copy()
self.position_number_no_reference_position.remove(self.reference_position)
correct_alignment = self.correct_alignment
self.correct_alignment = 0
correct_motion = self.correct_motion
self.correct_motion=0
self.shifts = SortedList()
for projection in range(0, self.nbprojections, interval):
text = "\r" + "Aligning projection {}".format(projection)
sys.stdout.write(text)
sys.stdout.flush()
self.align_projection(projection)
# Pickle the 'data' dictionary using the highest protocol available.
with open(self.shift_filename, 'wb') as f:
pickle.dump(self.shifts, f, pickle.HIGHEST_PROTOCOL)
self.aligned=1
self.correct_alignment = correct_alignment
self.correct_motion = correct_motion
[docs] def align_projection(self, projection):
"""
projection = number of projection to register
"""
correct_alignment = self.correct_alignment
self.correct_alignment = 0
im=[]
# read images
for position in range(self.nD):
im.append(self.get_projection(projection=projection, position=position))
# register (should be done with RA)
tmp=[]
for ndx in range(self.nD):
if ndx != self.reference_position:
field, aligned_image, transform_parameters = registrator.register(im[ndx], im[self.reference_position])
tmp.append([field[0][0][0], field[1][0][0]])
else:
tmp.append([0, 0])
# store shifts
self.shifts.add([projection, tmp])
with open(self.shift_filename, 'wb') as f:
pickle.dump(self.shifts, f, pickle.HIGHEST_PROTOCOL)
self.correct_alignment = correct_alignment
#TODO: Difference just shouldn't be here..
# def DifferenceProjection(self, projection, distance):
# # Calculate difference measured/calculated intensity
# # TODO: Should eventually be an option if it's tomo or not?
# # TODO: Should this really go here?
# # Open measured
# measured_projection = self.get_projection(projection, distance)
# # Get calculated
# calculated_projection = self.get_image(projection, 'propagated', distance)
# # Calculate difference (correct order?)
# difference_image = measured_projection - calculated_projection
# # Save difference
# #TODO: IMPORTANT: USE DATASET TO READ/WRITE IMAGES!!!
# fname = self.path + '/' + self.name + '_1_/' + self.name + '_1_0000.edf' # TODO:Fulhack deluxe to get edfheader. Whatever. Stub/spike. Fix later
# imEDF = EdfFile(fname)
# EDFHeader = imEDF.GetHeader(0)
# time.sleep(1) #TODO: This is ridiculous, how can this be necessary? What is causing this writing problem?
# # TODO: the difference files should be handled in constructor as well I suppose
# fname = self.difference_prefix + '_' + str(distance) + '_' + str(projection).zfill(4) + '.edf'
# EDF = EdfFile(fname)
# # TODO: verify what should go into the header...
# EDF.WriteImage(EDFHeader, difference_image, 0, "Float")
# def DifferenceProjections(self, projections, distance):
# if projections[0] > projections[1]:
# projections[1]=projections[0]
# for projection in range(projections[0], projections[1]+1):
# self.DifferenceProjection(projection, distance)
# def Difference(self):
# #TODO: Needs to be parallelised... how? Come up with something...
# parallelizer = Parallelizer.OAR()
# for distance in range(1, self.nD+1):
# parallelizer.Launch(self, 'difference', distance=distance)
# def UpdatePhase(self):
# # add update to current solution
# current_solution = np.memmap(self.phase_vol_filename, dtype=np.float32)
# update = np.memmap(self.update_vol_filename, dtype=np.float32)
# current_solution += update
# del current_solution, update
# def UpdateAttenuation(self):
# current_solution = np.memmap(self.retrieved_attenuation_vol_filename, dtype=np.float32)
# update = np.memmap(self.attenuation_update_vol_filename, dtype=np.float32)
# current_solution -= update
# del current_solution, update
# def write_image(self, data, proj_type, projection_number, *args):
[docs] def write_image(self, *, image, projection, projection_type='phase'):
# TODO: raise error where needed! number of args, args type...
''' Saves images into file.
Parameters
----------
data : ndarray
An ndarray of the image to save.
projection_number : int
The number of the projection to be saved.
proj_type : str
A string containing the prefix for the name of the file to be saved.
args: int
Number of the distance of the projection.
Returns
-------
None
Saves the images into the file ``[prefix]_[projection_number].edf`` or
``[prefix]_[distance]_[projection_number].edf`` into the ``name_`` directory.
'''
if projection_type.lower() == 'phase':
pref = self.phase_prefix
elif projection_type.lower() == 'attenuation':
pref = self.attenuation_prefix
# elif projection_type.lower() == 'phase update':
# pref = self.update_prefix
# elif projection_type.lower() == 'attenuation update':
# pref = self.attenuation_update_prefix
# elif projection_type.lower() == 'intensity':
# pref = self.propagated_prefix
else:
raise TypeError("proj_type must be one of the following: 'phase', 'attenuation'" )
if not pref.parent.exists():
pref.parent.mkdir(parents=True)
# if len(args) != 0:
# distance = str(args[0]) + '_'
# else:
# distance = ''
#
fname = self.path / (self.name + '_1_/' + self.name + '_1_0000.edf') # TODO:Fulhack deluxe
imEDF = EdfFile(fname)
EDFHeader = imEDF.GetHeader(0)
#fname = pref.as_posix() + '_' + position + str(projection).zfill(4) + '.edf'
fname = pref.as_posix() + '_' + str(projection).zfill(4) + '.edf'
EDF = EdfFile(fname)
EDF.WriteImage(EDFHeader, image, 0, "Float")
#D# TODO: These methods are to be used in propagator.py (for later)
#TODO: Seems like a mistake, since get_image already exists... I guess both could exist? Ev. Refactor
# def get_attenuation(self, projection_number):
# ''' Reads attenuation information from file and retruns it as an ndarray.
# Parameters
# ----------
# projection_number : int
# The number of the projection to get.
# Returns
# -------
# ndarray
# Array containing attenuation information.
# '''
# projection = projection_number
# fname = self.retrieved_attenuation_forward_prefix + '_' + str(projection).zfill(4) + '.edf'
# imEDF = EdfFile(fname)
# attenuation = imEDF.GetData(0)
# return attenuation
# def get_phase(self, projection_number):
# ''' Reads phase information from file and retruns it as an ndarray.
# Parameters
# ----------
# projection_number : int
# The number of the projection to get.
# Returns
# -------
# ndarray
# Array containing phase information.
# '''
# projection = projection_number
# fname = self.phase_forward_prefix + '_' + str(projection).zfill(4) + '.edf'
# imEDF = EdfFile(fname)
# phase = imEDF.GetData(0)
# return phase
#class ESRF(Dataset):
#maybe necessary at some point
# pass
#class ESRFID19(Dataset):
# pass
#class ESRFID16A(Dataset):
# def __init__(self):
# super().__init__()
# def populate(self):
# pass