import pandas as pd
from io import StringIO
import numpy as np
import re
from typing import Union, Optional
BEGIN_GEOM = '----- Begin geometry file -----'
END_GEOM = '----- End geometry file -----'
BEGIN_CELL = '----- Begin unit cell -----'
END_CELL = '----- End unit cell -----'
BEGIN_CHUNK = '----- Begin chunk -----'
END_CHUNK = '----- End chunk -----'
BEGIN_CRYSTAL = '--- Begin crystal'
END_CRYSTAL = '--- End crystal'
BEGIN_PEAKS = 'Peaks from peak search'
END_PEAKS = 'End of peak list'
BEGIN_REFLECTIONS = 'Reflections measured after indexing'
END_REFLECTIONS = 'End of reflections'
HEAD = 'CrystFEL stream format {}.{}'.format(2, 3)
GENERATOR = 'Generated by diffractem StreamParser'
PEAK_COLUMNS = ['fs/px', 'ss/px', '(1/d)/nm^-1', 'Intensity', 'Panel']
REFLECTION_COLUMNS = ['h', 'k', 'l', 'I', 'Sigma(I)', 'Peak', 'Background', 'fs/px', 'ss/px', 'Panel']
ID_FIELDS = ['file', 'Event', 'serial']
[docs]def make_substream(stream: 'StreamParser', Ncryst: int, seed: Optional[int] = None,
filename: Optional[str] = None, query: Optional[str] = None):
"""Write a stream file containing a sub-set of events to a new stream file.
Args:
stream (StreamParser): StreamParser object holding the original stream
Ncryst (int): Number of events to sample
seed (Optional[int], optional): Seed of the random generator. Defaults to None.
filename (Optional[str], optional): Output stream filename. Defaults to filename of original stream,
with '-N_{Ncryst}' appended, where {Ncryst} is the number of sampled crystals.
query (str, optional): query to pre-select events. A sensible choice might be to
only pick indexed events by setting query='indexed_by != "none"'. Defaults to None.
Returns:
[type]: [description]
"""
fn2 = 'subsets/' + stream.filename.rsplit('.',1)[0] + f'-N_{Ncryst}.stream' \
if filename is None else filename
sel = stream.shots if query is None else stream.shots.query(query)
sel = sel.sample(n=Ncryst, random_state=seed)
sel.sort_values(by='first_line', ascending=False, inplace=True)
first = list(sel.first_line)
last = list(sel.last_line)
first.append(0)
last.append(stream.shots.first_line.min() - 1)
copying = False
section = (first.pop(), last.pop())
with open(stream.filename,'r') as fh_from, open(fn2,'w') as fh_to:
for ln, l in enumerate(fh_from):
if not copying:
if ln == section[0]:
copying = True
#print(section[0], ln)
if copying:
fh_to.write(l)
if ln == section[1]:
copying = False
try:
section = (first.pop(), last.pop())
except IndexError:
break
print('Wrote subset with', len(sel), 'events to', fn2)
return fn2
[docs]def augment_stream(streamname: str, outfile:str, new_fields: Union[pd.DataFrame, dict], where: str = 'chunk'):
"""Add new fields to chunk headers in the stream file, which can then be used for chopping or filtering.
Somewhat similar to indexamajig's "include-hdf5-field" option, just *after* the fact.
Args:
streamname (str): Name of stream file
new_fields (pd.DataFrame): pandas DataFrame with index matching the file and Event of the stream file
and columns matching the additional fields to be added
"""
chunk_init = False
found_fn = ''
found_event = ''
with open(streamname, 'r') as fh_in, open(outfile, 'w') as fh:
for ln, l in enumerate(fh_in):
if not chunk_init and l.startswith(BEGIN_CHUNK):
# print('new chunk')
chunk_init = True
file_init = False
event_init = False
found_event = ''
cols = list(new_fields.keys())
elif chunk_init and l.startswith('Image filename:'):
found_fn = l.split(': ')[-1].strip()
# print(found_fn)
file_init = True
elif chunk_init and l.startswith('Event:'):
found_event = l.split(': ')[-1].strip()
# print(found_event)
event_init = True
elif chunk_init and event_init and file_init and \
l.startswith(BEGIN_REFLECTIONS if where=='crystal' else BEGIN_PEAKS):
# now is the time to insert the new stuff
# print(found_fn, found_event)
# print(chunk_init, event_init)
for k, v in new_fields.loc[(found_fn, found_event),:].iteritems():
# print(v)
fh.write(f'{k} = {v}\n')
elif chunk_init and l.startswith(END_CHUNK):
chunk_init = False
fh.write(l)
[docs]def chop_stream(streamname: str, id_list: list, id_field: str = 'hdf5/%/shots/frame',
id_suffix: str = 'frame', fn_contains: str = None):
"""Chops a stream file into sub-streams containing only shots with a specific value of
a defined field, which must be in the chunk header. Useful e.g. for chopping into aggregation
frames, different sample grids, runs with different rotation angles etc.
If you just want to *select* a sub-set of a stream file instead of chopping it up into many parts,
consider using the stream_grep script included with CrystFEL, which is way faster and more flexible.
Args:
streamname (str): Stream file name
id_list (str): List of values of the ID variable which you want to have in the final files.
id_field (str): Field in chunk data to select by. Defaults to 'hdf5/%/shots/frame'.
id_appendix (str): Appendix to be applied to the output stream file names. Defaults to 'frame'.
"""
outfiles = {}
for fnum in id_list:
outfiles[fnum] = open(streamname.rsplit('.', 1)[0] + f'-{id_suffix}{fnum}.stream', 'w')
chunk_init = False
chunk_string = ''
value = -1
with open(streamname, 'r') as fh_in:
for ln, l in enumerate(fh_in):
if not chunk_init and l.startswith(BEGIN_CHUNK):
chunk_init = True
chunk_string += l
value = None
include_file = True
elif chunk_init and (fn_contains is not None) and l.startswith('Image filename'):
found_fn = l.split(': ')[-1].strip()
include_file = fn_contains in found_fn
chunk_string += l
elif chunk_init and l.startswith(id_field):
found_value = l.rsplit('=',1)[-1].strip()
found_value = parse_str_val(found_value)
chunk_string += l
value = found_value if found_value in id_list else None
elif chunk_init and l.startswith(END_CHUNK):
chunk_init = False
chunk_string += l
#print(frame)
if (value is not None) and include_file:
#print(chunk_string)
outfiles[value].write(chunk_string)
chunk_string = ''
elif chunk_init:
chunk_string += l
elif not chunk_init:
# no chunk initialized, write to all files
for _, fh in outfiles.items():
fh.write(l)
else:
raise RuntimeError('This should not happen?! Please debug me.')
[docs]def parse_str_val(input: str):
try:
return int(input.strip())
except ValueError:
try:
return float(input.strip())
except:
return input.strip()
[docs]class StreamParser:
def __init__(self, filename, parse_now=True, serial_offset=-1, new_folder=None):
self.merge_shot = False
self.command = ''
self._cell_string = []
self._geometry_string = []
self._peaks = pd.DataFrame()
self._indexed = pd.DataFrame()
self._shots = pd.DataFrame()
self._crystals = pd.DataFrame()
self._parsed_lines = 0
self._total_lines = 0
self.filename = filename
self.serial_offset = serial_offset
if parse_now:
self.parse(new_folder)
@property
def geometry(self):
"""
:return: geometry section as dictionary
"""
g = {}
for l in self._geometry_string:
if l.startswith(';'):
continue
if '=' not in l:
continue
k, v = l.split(';')[0].split('=', 1)
g[k.strip()] = parse_str_val(v)
return g
@property
def cell(self):
"""
:return: cell section as dictionary
"""
c = {}
for l in self._cell_string:
if '=' not in l:
continue
k, v = l.split('=', 1)
try:
c[k.strip()] = float(v)
except ValueError:
c[k.strip()] = v.strip()
return c
@property
def options(self):
"""
:return: crystfel call options (ONLY -- ones) as dict
"""
o = {}
for opt in re.findall('--\S+', self.command):
if '=' in opt:
k, v = opt[2:].split('=', 1)
try:
o[k.strip()] = int(v)
except ValueError:
try:
o[k.strip()] = float(v)
except ValueError:
o[k.strip()] = v.strip()
else:
o[opt[2:].strip()] = None
return o
@property
def indexed(self):
return self._indexed
@property
def peaks(self):
return self._peaks
@property
def shots(self):
return self._shots.merge(self._crystals, on=ID_FIELDS, how='left')
@property
def input_file(self):
return self.command.split('-i ')[1].split(' -')[0].strip()
@property
def files(self):
return list(self.shots.file.unique())
@property
def num_crystals(self):
return len(self._crystals)
@property
def num_shots(self):
return len(self._shots)
[docs] def parse(self, new_folder):
linedat_peak = StringIO()
linedat_index = StringIO()
shotlist = []
crystallist = []
init_peak = False
init_index = False
init_geom = False
init_cell = False
init_crystal_info = False
init_chunk = False
shotdat = {'Event': None, 'shot_in_subset': None, 'subset': None,
'file': None, 'serial': None}
crystal_info = {}
idstr = None
self._parsed_lines = 0
self._total_lines = 0
skip = False
# lines are queried for their meaning. Lines belonging to tables are appended to StringIO virtual files,
# which are then read into pandas data frames at the very end. The order of Queries is chosen to optimize
# performance, that is, the table lines (most frequent) come first.
with open(self.filename) as fh:
for ln, l in enumerate(fh):
self._parsed_lines += 1
self._total_lines += 1
if skip:
skip = False
continue
# EVENT CHUNKS
# Actual parsing (indexed peaks)
if init_index and END_REFLECTIONS in l:
init_index = False
elif init_index:
linedat_index.write(
' '.join([l.strip(), str(ln), idstr, '\n']))
# Actual parsing (found peaks)
elif init_peak and END_PEAKS in l:
init_peak = False
elif init_peak:
linedat_peak.write(
' '.join([l.strip(), str(ln), idstr, '\n']))
# Required info at chunk head
elif BEGIN_CHUNK in l:
shotdat = {'Event': '_', 'shot_in_subset': -1, 'subset': '_',
'file': '', 'serial': -1, 'first_line': ln, 'last_line': -1}
init_chunk = True
elif END_CHUNK in l:
shotdat['last_line'] = ln
shotlist.append(shotdat)
shotdat = {'Event': None, 'shot_in_subset': None, 'subset': None,
'file': None, 'serial': None, 'first_line': None, 'last_line': None}
init_chunk = False
elif 'Event:' in l:
shotdat['Event'] = l.split(': ')[-1].strip()
dummy_shot = shotdat['Event'].split('//')[-1]
if dummy_shot in ['_', '']:
shotdat['shot_in_subset'] = 0
else:
shotdat['shot_in_subset'] = int(shotdat['Event'].split('//')[-1])
shotdat['subset'] = shotdat['Event'].split('//')[0].strip()
elif 'Image filename:' in l:
shotdat['file'] = l.split(':')[-1].strip()
if new_folder is not None:
shotdat['file'] = new_folder + '/' + shotdat['file'].rsplit('/', 1)[-1]
elif 'Image serial number:' in l:
shotdat['serial'] = int(l.split(': ')[1]) + self.serial_offset
elif (' = ' in l) and (not init_crystal_info) and init_chunk: # optional shot info
k, v = l.split(' = ', 1)
shotdat[k.strip()] = parse_str_val(v)
# Table parsing activation for found peaks
elif (None not in shotdat.values()) and (BEGIN_PEAKS in l):
skip = True # skip the column header line
init_peak = True
idstr = ' '.join([shotdat['file'], shotdat['Event'], str(shotdat['serial'])])
# Table parsing activation for indexing
elif (None not in shotdat.values()) and (BEGIN_REFLECTIONS in l):
skip = True
init_index = True
idstr = ' '.join([shotdat['file'], shotdat['Event'], str(shotdat['serial'])])
# Additional information from indexing
elif BEGIN_CRYSTAL in l:
crystal_info = {k: shotdat[k] for k in ID_FIELDS}
init_crystal_info = True
elif END_CRYSTAL in l:
crystallist.append(crystal_info)
crystal_info = {}
init_crystal_info = False
elif 'Cell parameters' in l:
for k, v in zip(['a', 'b', 'c', 'dummy', 'al', 'be', 'ga'], l.split(' ')[2:9]):
if k == 'dummy':
continue
crystal_info[k] = float(v)
elif 'astar' in l:
crystal_info.update(
{k: float(v) for k, v in zip(['astar_x', 'astar_y', 'astar_z'], l.split(' ')[2:5])})
elif 'bstar' in l:
crystal_info.update(
{k: float(v) for k, v in zip(['bstar_x', 'bstar_y', 'bstar_z'], l.split(' ')[2:5])})
elif 'cstar' in l:
crystal_info.update(
{k: float(v) for k, v in zip(['cstar_x', 'cstar_y', 'cstar_z'], l.split(' ')[2:5])})
elif 'diffraction_resolution_limit' in l:
crystal_info['diff_limit'] = float(l.rsplit(' nm', 1)[0].rsplit('= ', 1)[-1])
elif 'predict_refine/det_shift' in l:
crystal_info['xshift'] = float(l.split(' ')[3])
crystal_info['yshift'] = float(l.split(' ')[6])
continue
elif (' = ' in l) and init_crystal_info and init_chunk: # optional shot info
k, v = l.split(' = ', 1)
crystal_info[k.strip()] = parse_str_val(v)
# CALL STRING
elif 'indexamajig' in l:
self.command = l
# GEOMETRY FILE
elif init_geom and (END_GEOM in l):
init_geom = False
elif init_geom:
self._geometry_string.append(l.strip())
elif BEGIN_GEOM in l:
init_geom = True
# CELL FILE
elif init_cell and (END_CELL in l):
init_cell = False
elif init_cell:
self._cell_string.append(l.strip())
elif BEGIN_CELL in l:
init_cell = True
else:
self._parsed_lines -= 1
# Now convert to pandas data frames
linedat_index.seek(0)
linedat_peak.seek(0)
self._peaks = pd.read_csv(linedat_peak, delim_whitespace=True, header=None,
names=PEAK_COLUMNS + ['stream_line', 'file', 'Event', 'serial']
).sort_values('serial').reset_index().sort_values(['serial', 'index']).reset_index(
drop=True).drop('index', axis=1)
self._indexed = pd.read_csv(linedat_index, delim_whitespace=True, header=None,
names=REFLECTION_COLUMNS + ['stream_line', 'file', 'Event', 'serial']
).sort_values('serial').reset_index().sort_values(['serial', 'index']).reset_index(
drop=True).drop('index', axis=1)
self._shots = pd.DataFrame(shotlist).sort_values('serial').reset_index(drop=True)
if crystallist:
self._crystals = pd.DataFrame(crystallist).sort_values('serial').reset_index(drop=True)
else:
self._crystals = pd.DataFrame(columns=ID_FIELDS)
[docs] def write(self, filename, include_peaks=True, include_indexed=True, include_geom=True, include_cell=True):
from tqdm import tqdm
with open(filename, 'w') as fh:
fh.write(HEAD+'\n'+GENERATOR+'\n'+self.command+'\n')
if include_geom:
fh.write(BEGIN_GEOM+'\n'+'\n'.join(self._geometry_string)+'\n'+END_GEOM + '\n')
if include_cell:
fh.write(BEGIN_CELL + '\n' + '\n'.join(self._cell_string) + '\n' + END_CELL + '\n')
for ii, shot in tqdm(self._shots.iterrows(), total=len(self._shots)):
fh.write(BEGIN_CHUNK + '\n')
fh.write(f'Image filename: {shot.file}\n')
fh.write(f'Event: {shot.Event}\n')
fh.write(f'Image serial number: {shot.serial - self.serial_offset}\n')
keys = set(shot.keys()).difference(
{'Event', 'file', 'serial', 'shot_in_subset', 'subset'})
for k in keys:
fh.write(f'{k} = {shot[k]}\n')
if include_peaks:
fh.write(BEGIN_PEAKS + '\n')
self._peaks.loc[self._peaks.serial == shot.serial, PEAK_COLUMNS].to_csv(
fh, sep=' ', index=False, na_rep='-nan')
fh.write(END_PEAKS + '\n')
crystals = self._crystals.loc[self._crystals.serial == shot.serial, :]
for cid, crs in crystals.iterrows():
fh.write(BEGIN_CRYSTAL + '\n')
fh.write(f'Cell parameters {crs.a} {crs.b} {crs.c} nm, {crs.al} {crs.be} {crs.ga} deg\n')
fh.write(f'astar = {crs.astar_x} {crs.astar_y} {crs.astar_z} nm^-1\n')
fh.write(f'bstar = {crs.bstar_x} {crs.bstar_y} {crs.bstar_z} nm^-1\n')
fh.write(f'cstar = {crs.cstar_x} {crs.cstar_y} {crs.cstar_z} nm^-1\n')
fh.write(f'diffraction_resolution_limit = {crs.diff_limit} nm^-1 or {10 / crs.diff_limit} A\n')
fh.write(f'predict_refine/det_shift x = {crs.xshift} y = {crs.yshift} mm\n')
keys = set(crs.keys()).difference(
{'Event', 'file', 'serial', 'shot_in_subset', 'subset',
'a', 'b', 'c', 'al', 'be', 'ga',
'astar_x', 'astar_y', 'astar_z',
'bstar_x', 'bstar_y', 'bstar_z',
'cstar_x', 'cstar_y', 'cstar_z',
'diff_limit', 'xshift', 'yshift'})
for k in keys:
fh.write(f'{k} = {crs[k]}\n')
if include_indexed:
fh.write(BEGIN_REFLECTIONS + '\n')
self._indexed.loc[self._indexed.serial == shot.serial, REFLECTION_COLUMNS].to_csv(
fh, sep=' ', index=False, na_rep='-nan')
fh.write(END_REFLECTIONS + '\n')
fh.write(END_CRYSTAL + '\n')
fh.write(END_CHUNK + '\n')
[docs] def change_path(self, new_folder=None, old_pattern=None, new_pattern=None):
for df in [self._crystals, self._shots, self._indexed, self._peaks]:
if (new_folder is not None) and (old_pattern is not None):
df.file = new_folder + '/' + \
df.file.str.rsplit('/', 1, True).iloc[:, -1].str.replace(old_pattern, new_pattern)
elif old_pattern is not None:
df.file = df.file.str.replace(old_pattern, new_pattern)
elif new_folder is not None:
df.file = new_folder + '/' + df.file.str.rsplit('/', 1, True).iloc[:, -1]