Source code for diffractem.proc_peaks

# Friedel-pair refinement
from scipy.optimize import least_squares
import numpy as np
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, ALL_COMPLETED
from multiprocessing import current_process
from typing import Optional
from .pre_proc_opts import PreProcOpts
from . import tools, proc2d
from warnings import warn


def _ctr_from_pks(pkl: np.ndarray, p0: np.ndarray,
                  int_weight: bool = False, sigma: float = 2.0, bound: float = 5.0, label: str = None):
    """Gets the refined peak center position from a list of peaks containing Friedel mates
    
    Arguments:
        pkl {np.ndarray} -- [List of peaks, with x and y values in 0th and 1st column, optionally intensity on 2nd]
        p0 {np.ndarray} -- [Initial position]
    
    Keyword Arguments:
        int_weight {bool} -- [weight peaks by their intensity] (default: {False})
        sigma {float} -- [assumed peak rms radius for matching] (default: {2.0})
        bound {float} -- [maximum shift] (default: {5.0})
        label {str} -- [label to be returned in output] (default: {None})
    
    Returns:
        [tuple] -- [refined position, inverse cost function, label]
    """
    if int_weight:
        corr = lambda p: np.sum(np.matmul(pkl[:, 2:3], pkl[:, 2:3].T)
                                * np.exp(-((pkl[:, 0:1] + pkl[:, 0:1].T - 2 * p[0]) ** 2
                                           + (pkl[:, 1:2] + pkl[:, 1:2].T - 2 * p[1]) ** 2) / (2 * sigma ** 2))) \
                         / np.sum(np.matmul(pkl[:, 2:3], pkl[:, 2:3].T))
    else:
        corr = lambda p: np.sum(np.exp(-((pkl[:, 0:1] + pkl[:, 0:1].T - 2 * p[0]) ** 2
                                         + (pkl[:, 1:2] + pkl[:, 1:2].T - 2 * p[1]) ** 2) / (2 * sigma ** 2))) \
                         / (2*pkl.shape[0])

    fun = lambda p: 1 / max(corr(p), 1e-10)  # prevent infs
    if np.isnan(fun(p0)):
        return p0, np.nan, label
    else:
        lsq = least_squares(fun, p0, bounds=(p0 - bound, p0 + bound))
        return lsq.x - 0.5, 1 / lsq.cost, label # -0.5 changes from CrystFEL-like to pixel-center convention


[docs]def center_friedel(peaks: pd.DataFrame, shots: Optional[pd.DataFrame] = None, p0=(778, 308), colnames=('fs/px', 'ss/px'), sigma=2, minpeaks=4, maxres: Optional[float] = None): """[Center refinement of diffraction patterns from a list of peaks, assuming the presence of a significant number of Friedel mates.] Arguments: peaks {[pd.DataFrame]} -- [peaks list for entire data set, as returned by StreamParser. CrystFEL convention!] Keyword Arguments: shots {[pd.DataFrame]} -- [shot list of data set, optional] (default: {None}) p0 {tuple} -- [starting position for center search] (default: {(778, 308)}) colnames {tuple} -- [column names for x and y coordinate] (default: {('fs/px', 'ss/px')}) sigma {int} -- [peak rms radius (determines 'sharpness' of matching)] (default: {2}) minpeaks {int} -- [minimum peak number to try matching] (default: {4}) maxres {int} -- [maximum radius of peaks to still be considered] (default: {None}) """ colnames = list(colnames) p0 = np.array(p0) if current_process().daemon: print('Danger, its a Daemon.') with ProcessPoolExecutor() as p: futures = [] for grp, pks in peaks.groupby(['file', 'Event']): pkl = pks.loc[:, colnames].values rsq = (pkl[:, 0] - p0[0]) ** 2 + (pkl[:, 1] - p0[1]) ** 2 if maxres is not None: pkl = pkl[rsq < maxres ** 2, :] if (minpeaks is None) or pkl.shape[0] > minpeaks: futures.append(p.submit(_ctr_from_pks, pkl, p0, sigma=sigma, label=grp)) wait(futures, return_when=ALL_COMPLETED) if len(futures) == 0: cpos = shots[['file', 'Event']].copy() cpos['beam_x'] = p0[0] cpos['beam_y'] = p0[1] cpos['friedel_cost'] = np.nan return cpos # reformat result into a dataframe cpos = pd.concat([pd.DataFrame(data=np.array([t.result()[2] for t in futures if t.exception() is None]), columns=['file', 'Event']), pd.DataFrame(data=np.array([t.result()[0] for t in futures if t.exception() is None]), columns=['beam_x', 'beam_y']), pd.DataFrame(data=np.array([t.result()[1] for t in futures if t.exception() is None]), columns=['friedel_cost'])], axis=1) if shots is not None: # include shots that were not present in the peaks table cpos = shots[['file', 'Event']].merge(cpos, on=['file', 'Event'], how='left'). \ fillna({'beam_x': p0[0], 'beam_y': p0[1]}) return cpos
[docs]def get_acf(npk, x, y, I=None, roi_length=512, output_radius=256, oversample=4, radial=True, px_ang=None, execution='processes'): """Gets the autocorrelation/pair correlation function of Bragg peak positions, optionally with intensity weighting. It is important to set the computation region properly (i.e., the maximum peak positions from the center to take into account), as this affects computation speed and impact of non-paraxiality at larger angles. It can be defined using the `roi_length` argument. Peaks must be given in CXI format. Args: npk (np.ndarray, int): number of peaks x (np.ndarray): x-coordinates of peaks, *relative to pattern center* y (np.ndarray): y-coordinates of peaks, *relative to pattern center* I ([type], optional): peak intensities. Set to 1 if None. Defaults to None. roi_length (int, optional): edge length of the region around the image center that is used for the computation. Defaults to 512. output_radius (int, optional): maximum included radius of the output ACF. The size of the 2D output will be 2*output_radius*oversample, the size of the radial average will be output_radius*oversample. Defaults to 600. oversample (int, optional): oversampling, that is, by how much smaller the bin sizes of the output are than that of the input (usually the pixels). Defaults to 4. radial (bool, optional): compute the radial average of the ACF. Defaults to True. px_ang (double, optional): diffraction angle corresponding to a distance of 1 pixel from the center, given in rad (practically: detector pixel size/cam length). If given, non-paraxiality of the geometry is corrected (not tested well yet). Defaults to None. execution (str, optional): way of parallelism if a stack of pattern peak data is supplied. Can be 'single-threaded', 'threads', 'processes'. Returns: np.ndarray: 2D autocorrelation function. Length will be 2 * oversample * output_range np.ndarray: 1D radial sum (None for radial=False). Length will be oversample * output_ramge """ from numpy import fft from itertools import repeat # if a stack of pattern data is supplied, call recursively on single shots if isinstance(npk, np.ndarray) and len(npk) > 1: _all_args = zip(npk, x, y, repeat(None) if I is None else I) _kwargs = {'roi_length': roi_length, 'output_radius': output_radius, 'oversample': oversample, 'radial': radial, 'px_ang': px_ang} if execution == 'single-threaded': res = [get_acf(*_args, **_kwargs) for _args in _all_args] else: with (ProcessPoolExecutor() if execution=='processes' else ThreadPoolExecutor()) as exc: ftrs = [exc.submit(get_acf, *_args, **_kwargs) for _args in _all_args] wait(ftrs, return_when='FIRST_EXCEPTION'); # for ftr in ftrs: # if ftr.exception() is not None: # raise ftr.exception() res = [f.result() for f in ftrs] return (np.stack(stk) for stk in zip(*res)) sz = roi_length * oversample rng = output_radius * oversample if rng > sz//2-1: raise ValueError(f'Maximum output range is {roi_length//2-1}.') if px_ang is not None: t_par = (x[:npk]**2 + y[:npk]**2)**.5 * px_ang acorr = 2*np.sin(np.arctan(t_par)/2) / t_par else: acorr = 1 pkx = (oversample * acorr * x[:npk]).round().astype(int) + sz//2 pky = (oversample * acorr * y[:npk]).round().astype(int) + sz//2 pkI = None if I is None else I[:npk] valid = (pkx >= 0) & (pkx < sz) & (pky >= 0) & (pky < sz) pkx, pky, pkI = pkx[valid], pky[valid], 1 if I is None else pkI[valid] dense = np.zeros((sz, sz), dtype=np.float if I is None else np.uint8) dense[pky, pkx] = pkI if I is not None else 1 # print(f'{dense.shape}, {rng}, {sz}') acf = fft.ifft2(np.abs(fft.fft2(dense))**2) acf = fft.ifftshift(acf).real if I is None: # if no intensities were given, the result is (should be) # integer, up to numerical noise acf = acf.round().astype(np.uint8) if acf[sz//2, sz//2] != sum(valid): warn(f'Autocorrelation center pixel ({acf[sz//2, sz//2]}) does not equal the peak number ({sum(valid)})!') acf[sz//2, sz//2] = 0 # remove self-correlation (which will be equal to the peak number) if radial: rad = proc2d.radial_proj(acf, min_size=rng, max_size=rng, my_func=np.sum, x0=sz//2, y0=sz//2) else: rad = None return acf[sz//2-rng:sz//2+rng, sz//2-rng:sz//2+rng], rad
[docs]def get_pk_data(n_pk: np.ndarray, pk_x: np.ndarray, pk_y: np.ndarray, ctr_x: np.ndarray, ctr_y: np.ndarray, pk_I: Optional[np.ndarray] = None, opts: Optional[PreProcOpts] = None, peakmask=None, return_vec=True, pxs=None, clen=None, wl=None, el_rat=None, el_ang=None): if peakmask is None: peakmask = np.ones_like(pk_x, dtype=np.float) for N, row in zip(n_pk, peakmask): row[N:] = np.nan if opts is not None: pxs = opts.pixel_size if pxs is None else pxs clen = opts.cam_length if clen is None else clen wl = opts.wavelength if wl is None else wl el_rat = opts.ellipse_ratio if el_rat is None else el_rat el_ang = opts.ellipse_angle if el_ang is None else el_ang # assert (np.nansum(peakmask, axis=1) == n_pk).all() pk_xr, pk_yr = pk_x - ctr_x.reshape(-1,1), pk_y - ctr_y.reshape(-1,1) pk_xr, pk_yr = pk_xr * peakmask, pk_yr * peakmask # ellipticity correction if el_rat is not None and (el_rat != 1): c, s = np.cos(np.pi/180*el_ang), np.sin(np.pi/180*el_ang) pk_xrc, pk_yrc = 1/el_rat**.5*(c*pk_xr - s*pk_yr), el_rat**.5*(s*pk_xr + c*pk_yr) pk_xrc, pk_yrc = c*pk_xrc + s*pk_yrc, - s*pk_xrc + c*pk_yrc else: pk_xrc, pk_yrc = pk_xr, pk_yr res = {'peakXPosRaw': pk_x, 'peakYPosRaw': pk_y, 'peakXPosRel': pk_xr, 'peakYPosRel': pk_yr, 'peakXPosCor': pk_xrc, 'peakYPosCor': pk_yrc, 'nPeaks': n_pk} if pk_I is not None: res['peakTotalIntensity'] = pk_I if return_vec: if (pxs is None) or (clen is None) or (wl is None): raise ValueError('Cannot return angle parameters without pxs, clen, wl.') pk_r = (pk_xrc**2 + pk_yrc**2)**.5 pk_tt = np.arctan(pxs * pk_r / clen) pk_az = np.arctan2(pk_yrc, pk_xrc) pk_d = wl/(2*np.sin(pk_tt/2)) res.update({'peakTwoTheta': pk_tt, 'peakAzimuth': pk_az, 'peakD': pk_d}) return res
[docs]class Cell(object): """ Partially taken from the PyFAI package, with some simplifications and speed enhancements for d-spacing calculation, as well as a new refinement function. Calculates d-spacings and cell volume as described in: http://geoweb3.princeton.edu/research/MineralPhy/xtalgeometry.pdf """ lattices = ["cubic", "tetragonal", "hexagonal", "rhombohedral", "orthorhombic", "monoclinic", "triclinic"] ctr_types = {"P": "Primitive", "I": "Body centered", "F": "Face centered", "C": "Side centered", "R": "Rhombohedral"} def __init__(self, a=1, b=1, c=1, alpha=90, beta=90, gamma=90, lattice_type="triclinic", centering="P", unique_axis="c", d_min=2): """Constructor of the Cell class: Crystallographic units are Angstrom for distances and degrees for angles :param a,b,c: unit cell length in Angstrom :param alpha, beta, gamma: unit cell angle in degrees :param lattice: "cubic", "tetragonal", "hexagonal", "rhombohedral", "orthorhombic", "monoclinic", "triclinic" :param lattice_type: P, I, F, C or R """ self.a = a self.b = b self.c = c self.alpha = alpha self.beta = beta self.gamma = gamma self.lattice_type = lattice_type if lattice_type in self.lattices else "triclinic" self.unique_axis = unique_axis self._volume = None self.selection_rules = [] "contains a list of functions returning True(allowed)/False(forbidden)/None(unknown)" self.centering = centering self.hkl = None self._d_min = d_min self.init_hkl(d_min) def __repr__(self, *args, **kwargs): return "%s %s cell (unique %s) a=%.4f b=%.4f c=%.4f alpha=%.3f beta=%.3f gamma=%.3f" % \ (self.ctr_types[self.centering], self.lattice_type, self.unique_axis, self.a, self.b, self.c, self.alpha, self.beta, self.gamma)
[docs] @classmethod def cubic(cls, a, centering="P"): """Factory for cubic lattice_types :param a: unit cell length """ a = float(a) self = cls(a, a, a, 90, 90, 90, lattice_type="cubic", centering=centering) return self
[docs] @classmethod def tetragonal(cls, a, c, centering="P"): """Factory for tetragonal lattice_types :param a: unit cell length :param c: unit cell length """ a = float(a) self = cls(a, a, float(c), 90, 90, 90, lattice_type="tetragonal", centering=centering) return self
[docs] @classmethod def orthorhombic(cls, a, b, c, centering="P"): """Factory for orthorhombic lattice_types :param a: unit cell length :param b: unit cell length :param c: unit cell length """ self = cls(float(a), float(b), float(c), 90, 90, 90, lattice_type="orthorhombic", centering=centering) return self
[docs] @classmethod def hexagonal(cls, a, c, centering="P"): """Factory for hexagonal lattice_types :param a: unit cell length :param c: unit cell length """ a = float(a) self = cls(a, a, float(c), 90, 90, 120, lattice_type="hexagonal", centering=centering) return self
[docs] @classmethod def monoclinic(cls, a, b, c, beta, centering="P"): """Factory for hexagonal lattice_types :param a: unit cell length :param b: unit cell length :param c: unit cell length :param beta: unit cell angle """ self = cls(float(a), float(b), float(c), 90, float(beta), 90, centering=centering, lattice_type="monoclinic", unique_axis='b') return self
[docs] @classmethod def rhombohedral(cls, a, alpha, centering="P"): """Factory for hexagonal lattice_types :param a: unit cell length :param alpha: unit cell angle """ a = float(a) alpha = float(a) self = cls(a, a, a, alpha, alpha, alpha, lattice_type="rhombohedral", centering=centering) return self
[docs] @classmethod def diamond(cls, a): """Factory for Diamond type FCC like Si and Ge :param a: unit cell length """ self = cls.cubic(a, centering="F") self.selection_rules.append(lambda h, k, l: not((h % 2 == 0) and (k % 2 == 0) and (l % 2 == 0) and ((h + k + l) % 4 != 0))) return self
[docs] @classmethod def triclinic(cls, a, b, c, alpha, beta, gamma, centering="P"): a, b, c, alpha, beta, gamma = (float(p) for p in [a, b, c, alpha, beta, gamma]) self = cls(a, b, c, alpha, beta, gamma, lattice_type='triclinic', centering='P') return self
@property def volume(self): if self._volume is None: self._volume = self.a * self.b * self.c if self.lattice_type not in ["cubic", "tetragonal", "orthorhombic"]: cosa = np.cos(self.alpha * np.pi / 180.) cosb = np.cos(self.beta * np.pi / 180.) cosg = np.cos(self.gamma * np.pi / 180.) self._volume *= np.sqrt(1 - cosa ** 2 - cosb ** 2 - cosg ** 2 + 2 * cosa * cosb * cosg) return self._volume @property def centering(self): return self._centering @centering.setter def centering(self, centering): self._centering = centering if centering in self.ctr_types else "P" self.selection_rules = [lambda h, k, l: ~((h == 0) & (k == 0) & (l == 0))] if self._centering == "I": self.selection_rules.append(lambda h, k, l: (h + k + l) % 2 == 0) if self._centering == "F": self.selection_rules.append(lambda h, k, l: np.isin(h % 2 + k % 2 + l % 2, (0, 3))) if self._centering == "R": self.selection_rules.append(lambda h, k, l: ((h - k + l) % 3 == 0)) if self._centering == "C": self.selection_rules.append(lambda h, k, l: ((h + k) % 2 == 0))
[docs] def init_hkl(self, d_min: float = 5.): """Sets up a grid with valid Miller indices for this lattice. Useful to pre-compute the indices before running any optimization, which speeds up the computation. Args: d_min (float, optional): Minimum d-spacing, in A. Defaults to 5. """ hmax = int(np.ceil(self.a / d_min)) kmax = int(np.ceil(self.b / d_min)) lmax = int(np.ceil(self.c / d_min)) hkl = np.mgrid[-hmax:hmax+1, -kmax:kmax+1, -lmax:lmax+1] valid = np.stack([r(*hkl) for r in self.selection_rules], axis=0).all(axis=0) self.hkl = tuple(H[valid].ravel() for H in hkl) d = self.d(d_min=None) self.hkl = tuple(H[d >= d_min] for H in self.hkl) self._d_min = d_min
[docs] def d(self, d_min=None, unique=False, a=None, b=None, c=None, alpha=None, beta=None, gamma=None): """Calculates d-spacings for the cell. Cell parameters can transiently be changed, which does *not* affect the values stored with the cell object. This is useful in the context of optimization. Args: d_min (float, optional): Minimum d-spacing. If None, uses the stored value of the object which can be set using init_hkl. Leaving it at None significantly speeds up the computation, which is recommended for refinements. Defaults to None. unique (bool, optional): if True, only unique d-spacings are returned. Otherwise, all spacings are returned which are ordered the same way as in the object's hkl attribute. Defaults to False. a (float, optional): Temporary cell length. Defaults to None. b (float, optional): Temporary cell length. Defaults to None. c (float, optional): Temporary cell length. Defaults to None. alpha (float, optional): Temporary cell angle. Defaults to None. beta (float, optional): Temporary cell angle. Defaults to None. gamma (float, optional): Temporary cell angle. Defaults to None. Returns: np.array: Array of d-spacings """ a = self.a if a is None else a b = self.b if b is None else b c = self.c if c is None else c alpha = self.alpha if alpha is None else alpha beta = self.beta if beta is None else beta gamma = self.gamma if gamma is None else gamma if (d_min is not None) and (d_min != self._d_min): self.init_hkl(d_min) h, k, l = self.hkl if self.lattice_type in ["cubic", "tetragonal", "orthorhombic"]: invd2 = (h / a) ** 2 + (k / b) ** 2 + (l / c) ** 2 else: cosa, sina = np.cos(alpha * np.pi / 180), np.sin(alpha * np.pi / 180) cosb, sinb = np.cos(beta * np.pi / 180), np.sin(beta * np.pi / 180) cosg, sing = np.cos(gamma * np.pi / 180), np.sin(gamma * np.pi / 180) S11 = (b * c * sina) ** 2 S22 = (a * c * sinb) ** 2 S33 = (a * b * sing) ** 2 S12 = a * b * c * c * (cosa * cosb - cosg) S23 = a * a * b * c * (cosb * cosg - cosa) S13 = a * b * b * c * (cosg * cosa - cosb) invd2 = (S11 * h * h + S22 * k * k + S33 * l * l + 2 * S12 * h * k + 2 * S23 * k * l + 2 * S13 * h * l) invd2 /= (self.volume) ** 2 return np.sqrt(1 / (np.unique(invd2) if unique else invd2))
d_spacing = d
[docs] def export(self, filename='refined.cell'): from textwrap import dedent """Exports the cell to a CrystFEL cell file. Args: filename (str, optional): Cell file name. Defaults to 'refined.cell'. """ cellfile = dedent(f''' CrystFEL unit cell file version 1.0 lattice_type = {self.lattice_type} centering = {self.centering} unique_axis = {self.unique_axis} a = {self.a:.3f} A b = {self.b:.3f} A c = {self.c:.3f} A al = {self.alpha:.2f} deg be = {self.beta:.2f} deg ga = {self.gamma:.2f} deg ''').strip() with open(filename, 'w') as fh: fh.write(cellfile)
[docs] def refine_powder(self, svec, pattern, method='distance', fill=0.1, min_prom=0., min_height=0., weights='prom', length_bound=2., angle_bound=3., **kwargs): """Refine unit cell parameters against a powder pattern. The refinement is done using a least-squares fit, where you can pick three different cost functions: * 'distance': the positions of the peaks in the powder pattern are detected. For each peak, the distance to the closest d-spacing is computed. * 'xcorr': the inverse values of the powder pattern at the d-spacings are computed. * 'derivative': the derivative of the powder pattern at the d-spacings are computed Depending on the chosen method, further parameters can be set. The function returns a new Cell object with refined parameters, and a structure with some useful information. Args: svec (np.ndarray): scattering vector (x-axis) of the powder pattern, expressed in inverse nanometer (not angstrom - following CrystFEL convention). pattern (np.ndarray): powder pattern at values svec (y-axis) method (str, optional): Cost function. See description. Defaults to 'distance'. fill (float, optional): Fill value for out-of-range or zero-count s-vectors if method is 'derivative' or 'xcorr'. Defaults to 0.1. min_prom (float, optional): Minimum prominence of peaks (that is, height relative to its vicinity) if method is 'distance'. Increase if too many small peaks are spuriously detected. Usually it is a good idea. Defaults to 0. min_height (float, optional): Minimum peak height to be detected. Usually min_prom is the better parameter. Defaults to 0. weights (str, optional): Weights of the peaks for the least-squares optimization if method is 'derivative'. Can be 'prom' or 'height'. Defaults to 'prom'. length_bound (float, optional): Bound range for cell lengths, in A. Defaults to 2. angle_bound (float, optional): Bound range for cell angles. Defaults to 3. **kwargs: Further arguments will be passed on to scipy.least_sqaures Returns: tuple: 2-Tuple of a new Cell object with the refined parameters, and a structure with useful information from the optimization, including the peak positions and heights if method was 'distance'. """ from scipy.interpolate import interp1d from scipy.optimize import least_squares # find out which parameters should be optimized if self.lattice_type == 'triclinic': parameters = ['a', 'b', 'c', 'alpha', 'beta', 'gamma'] elif self.lattice_type == 'monoclinic': parameters = ['a', 'b', 'c', 'beta'] elif self.lattice_type == 'orthorhombic': parameters = ['a', 'b', 'c'] elif self.lattice_type == 'tetragonal': parameters = ['a', 'c'] elif self.lattice_type == 'cubic': parameters = ['a'] elif self.lattice_type == 'hexagonal': parameters = ['a', 'c'] elif self.lattice_type == 'rhombohedral': parameters = ['a', 'alpha'] else: raise Exception(f'This should not happen (lattice type is set to {self.lattice_type}). Yell at Robert.') _, unique_pos = np.unique(self.d(), return_index=True) # unique d-spacings p0 = [getattr(self, cpar) for cpar in parameters] dsp = lambda p: self.d(**{cpar: p[ii] for ii, cpar in enumerate(parameters)})[unique_pos] bounds = ([getattr(self, cpar) - (length_bound if cpar in 'abc' else angle_bound) for cpar in parameters], [getattr(self, cpar) + (length_bound if cpar in 'abc' else angle_bound) for cpar in parameters]) if method == 'xcorr': cost_profile = interp1d(svec, 1/np.where(pattern != 0, pattern, fill), bounds_error=False, fill_value=fill) cost = lambda p: cost_profile(10/dsp(p)) pk_pos = pk_height = pk_prom = [] elif method == 'derivative': cost_profile = interp1d(svec[1:]/2+svec[:-1]/2, np.diff(pattern), bounds_error=False, fill_value=0) cost = lambda p: cost_profile(10/dsp(p)) pk_pos = pk_height = pk_prom = [] elif method == 'distance': from scipy.signal import find_peaks lim = 0.95 * 10/dsp(p0).min() pkdat = find_peaks(pattern[svec < lim], height=min_height, prominence=min_prom) pk_pos = svec[pkdat[0]] pk_height = pkdat[1]['peak_heights'] pk_prom = pkdat[1]['prominences'] w = pk_prom if weights == 'prom' else 1 w = pk_height if weights == 'height' else 1 cost = lambda p: 100*w * (np.abs(10/dsp(p).reshape(1,-1) - pk_pos.reshape(-1,1))).min(axis=1) else: raise ValueError(f'Unknown refinement method {method}') cost_init = 0.5 * (cost(p0)**2).sum() lsq = least_squares(cost, p0, bounds=bounds, **kwargs) # return a new cell with the optimized parameters C_ref = getattr(self, self.lattice_type)(centering=self._centering, **{cpar: lsq.x[ii] for ii, cpar in enumerate(parameters)}) C_ref.selection_rulse = self.selection_rules C_ref.init_hkl(self._d_min) info = {'lsq_result': lsq, 'initial_cost': cost_init} if method == 'distance': info.update({'peak_position': pk_pos, 'peak_height': pk_height, 'peak_prominence': pk_prom}) if not lsq.success: warn('Powder refinement did not converge!') return C_ref, info