Source code for diffractem.io

import h5py
import numpy as np
import pandas as pd
from dask import array as da
from collections import defaultdict
import dask.diagnostics
import os.path
from diffractem import normalize_names
import warnings
from typing import Union
from glob import glob


[docs]def expand_files(file_list: Union[str, list], scan_shots=False, validate=False): def remove_bs(fns): return [fn.replace('\\', '/') for fn in fns] if isinstance(file_list, list) or isinstance(file_list, tuple): fl = remove_bs(file_list) if scan_shots: fl = pd.DataFrame(fl, columns=['file']) elif isinstance(file_list, str) and file_list.endswith('.lst'): if scan_shots: fl = pd.read_csv(file_list, sep=' ', header=None, engine='python', names=['file', 'Event']) fl['file'] = remove_bs(fl['file']) if fl.Event.isna().all(): fl.drop('Event', axis=1, inplace=True) else: fl = [] for s in open(file_list, 'r').readlines(): if '//' in s: raise RuntimeError('Shot identifier found in list file. You may want to set scan_shots=True') fl.append(s.split(' ', 1)[0].strip()) fl = remove_bs(fl) elif isinstance(file_list, str) and (file_list.endswith('.h5') or file_list.endswith('.nxs')): fl = remove_bs(sorted(glob(file_list))) if scan_shots: fl = pd.DataFrame(fl, columns=['file']) else: raise TypeError('file_list must be a list file, single or glob pattern of h5/nxs files, or a list of filenames') if (not scan_shots) and (not len(fl) == len(set(fl))): raise ValueError('File identifiers are not unique, most likely because the file names are not.') if validate: if scan_shots: raise ValueError('Validation is only allowed if scan_shot=False.') valid_files = [] for r in fl: try: with h5py.File(r, 'r') as fh: for k in fh.keys(): if (f'/{k}/shots' in fh) and (f'/{k}/map/features' in fh) and (f'/{k}/data' in fh): # print(r,': file validated!') valid_files.append(r) else: print(r, k, ': invalid file/subset!') except (OSError, IOError) as err: print('Could not open file', r, 'for validation because:') print(err) return valid_files else: return fl
[docs]def dict_to_h5(grp, data, exclude=()): """ Write dictionary into HDF group (or file) object :param grp: HDF group or file object :param data: dictionary to be written into HDF5 :param exclude: dataset or group names to be excluded :return: """ for k, v in data.items(): nk = normalize_names(k) if k in exclude: continue elif isinstance(v, dict): dict_to_h5(grp.require_group(nk), v, exclude=exclude) else: if nk in grp.keys(): grp[nk][...] = v else: grp.create_dataset(nk, data=v)
[docs]def h5_to_dict(grp, exclude=('data', 'image'), max_len=100): """ Get dictionary from HDF group (or file) object :param grp: HDF group or file :param exclude: (sub-)group or dataset names to be excluded; by default 'data' and 'image :param max_len: maximum length of data field to be included (along first direction) :return: dictionary corresponding to HDF group """ d = {} for k, v in grp.items(): if k in exclude: continue if isinstance(v, h5py.Group): d[k] = h5_to_dict(v, exclude=exclude, max_len=max_len) elif isinstance(v, h5py.Dataset): if (len(v.shape) > 0) and (len(v) > max_len): print('Skipping', v.shape, len(v), max_len, v) continue d[k] = v.value return d
[docs]def make_master_h5(file_list, file_name=None, abs_path=False, local_group='/', remote_group='/entry', verbose=False): fns, ids = expand_files(file_list, True) if isinstance(file_list, str) and file_list.endswith('.lst'): if file_name is None: file_name = file_list.rsplit('.', 1)[0] + '.h5' else: if file_name is None: raise ValueError('Please provide output file name explicitly, if input is not a file list.') f = h5py.File(file_name, 'w') try: subsets = [] for fn, id in zip(fns, ids): subset = id if subset in subsets: raise KeyError('File names are not unique!') else: subsets.append(subset) if abs_path: fn2 = os.getcwd() + '/' + fn else: fn2 = fn if not os.path.isfile(fn2): raise FileNotFoundError(f'File {fn2} present in {file_list} not found!') if verbose: print(f'Referencing file {fn2} as {subset}') if local_group != '/': f.require_group(local_group) f[local_group + '/' + subset] = h5py.ExternalLink(fn2, remote_group) except Exception as err: f.close() os.remove(file_name) raise err f.close() return file_name