bruker opus reader new algorithm

* reading all data from spectrum
* reading spectra we were not able to parse in past
* new output data format
* all spectra kinds interpolation
This commit is contained in:
t2 2019-08-27 16:11:06 +02:00
parent d2a22afde7
commit 18b3714a3e
16 changed files with 297 additions and 278 deletions

View File

@ -15,47 +15,22 @@ from brukeropusreader import read_file
opus_data = read_file('opus_file.0')
```
`OpusData` consists of three fields `wave_nums`, `spectrum` and `meta`:
`OpusData` is a dict consisting of all fields found in opus file:
```
print(f'Dimension of data: '
f'{len(opus_data.wave_nums)}')
print(f'Parsed fields: '
f'{list(opus_data.keys())}')
print(f'Spectrum range: ['
f'{min(opus_data.spectrum)}; '
f'{max(opus_data.spectrum)}]')
print(f'Metadata: '
f'{opus_data.meta}')
print(f'Absorption spectrum: '
f'{opus_data["AB"]}'
```
Spectrum can be plotted with `matplotlib` library:
```
plt.plot(opus_data.wave_nums, opus_data.spectrum)
plt.show()
```
For full code see [example](example.py).
## Structure of OPUS files
OPUS files consist of several series of spectra.
Each series is described by a few parameters:
## Algorithm
Algorithm taken from
https://bitbucket.org/hirschbeutel/ono/src/default/ono/bruker_opus_filereader.py
- NPT (number of points)
- FXV (value of first wavelength)
- LXV (value of last wavelength)
- END (address of spectra series)
These parameters are found by searching for ASCII strings in the binary files.
After finding a match with one of these parameters, we move the pointer a few bytes further to read the values.
Unfortunately, there is no published open standard describing how much further the pointer should be moved.
We empirically checked that this shift factor is 8 bytes for NPT, FXV, and LXV, and 12 bytes for END.
In addition, each file contains some metadata about the hardware used for measurement.
## Known issues
As Bruker's OPUS file format is not described openly, we do not know its exact structure. One problem is, given only a few series, how to decide which are absorption spectra? Our solution, empirically developed, is as follows:
1. Remove broken series (such as ones where FXV > LXV, missing NPT information, etc.)
2. Remove interferograms. (See [simplerspec](https://github.com/philipp-baumann/simplerspec).) Interferograms have a starting value of 0.
3. If after these two steps we still have more than one series left, choose the one with highest average value. We empirically verified that other series are usually random noise with values near 0.
Author: @twagner
## Contact
For developer issues, please start a ticket in Github.

View File

@ -1,7 +1,7 @@
from .opus_parser import read_file
from .opus_data import OpusData
from .opus_reader import read_file
name = 'brukeropusreader'
name = "brukeropusreader"
__all__ = ['OpusData', 'read_file']
__version__ = '1.2'
__all__ = ["read_file", "OpusData"]
__version__ = "1.3"

View File

@ -0,0 +1,73 @@
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Tuple
from brukeropusreader.block.parser import parse_series, parse_param, parse_text
class UnknownBlockType(Exception):
pass
BLOCK_0 = defaultdict(
lambda: ("Text Information", parse_text),
{
8: ("Info Block", parse_param),
104: ("History", parse_text),
152: ("Curve Fit", parse_text),
168: ("Signature", parse_text),
240: ("Integration Method", parse_text),
},
)
BLOCK_7 = {4: "ScSm", 8: "IgSm", 12: "PhSm"}
BLOCK_11 = {4: "ScRf", 8: "IgRf"}
BLOCK_23 = {
4: "ScSm Data Parameter",
8: "IgSm Data Parameter",
12: "PhSm Data Parameter",
}
BLOCK_27 = {4: "ScRf Data Parameter", 8: "IgRf Data Parameter"}
DIFFERENT_BLOCKS = {
31: "AB Data Parameter",
32: "Instrument",
40: "Instrument (Rf)",
48: "Acquisition",
56: "Acquisition (Rf)",
64: "Fourier Transformation",
72: "Fourier Transformation (Rf)",
96: "Optik",
104: "Optik (Rf)",
160: "Sample",
}
@dataclass
class BlockMeta:
data_type: int
channel_type: int
text_type: int
chunk_size: int
offset: int
def get_name_and_parser(self) -> Tuple[str, Callable]:
if self.data_type == 0:
return BLOCK_0[self.text_type]
elif self.data_type == 7:
return BLOCK_7[self.channel_type], parse_series
elif self.data_type == 11:
return BLOCK_11[self.channel_type], parse_series
elif self.data_type == 15:
return "AB", parse_series
elif self.data_type == 23:
return BLOCK_23[self.channel_type], parse_param
elif self.data_type == 27:
return BLOCK_27[self.channel_type], parse_param
elif self.data_type in DIFFERENT_BLOCKS.keys():
return DIFFERENT_BLOCKS[self.data_type], parse_param
else:
raise UnknownBlockType()

View File

@ -0,0 +1,54 @@
from brukeropusreader.constants import (
UNSIGNED_SHORT,
PARAM_TYPES,
INT,
DOUBLE,
NULL_BYTE,
ENCODING_LATIN,
ENCODING_UTF,
NULL_STR)
from brukeropusreader.opus_reader import read_chunk
from struct import unpack, error
import numpy as np
def parse_param(data: bytes, block_meta):
cursor = 0
chunk = read_chunk(data, block_meta)
params = {}
while True:
param_name = chunk[cursor : cursor + 3].decode(ENCODING_UTF)
if param_name == "END":
break
type_index = unpack(UNSIGNED_SHORT, chunk[cursor + 4 : cursor + 6])[0]
param_type = PARAM_TYPES[type_index]
param_size = unpack(UNSIGNED_SHORT, chunk[cursor + 6 : cursor + 8])[0]
param_bytes = chunk[cursor + 8 : cursor + 8 + 2 * param_size]
if param_type == "int":
try:
param_val = unpack(INT, param_bytes)[0]
except error:
return params
elif param_type == "float":
param_val = unpack(DOUBLE, param_bytes)[0]
elif param_type == "str":
p_end = param_bytes.find(NULL_BYTE)
param_val = param_bytes[:p_end].decode(ENCODING_LATIN)
else:
param_val = param_bytes
params[param_name] = param_val
cursor = cursor + 8 + 2 * param_size
return params
def parse_text(data: bytes, block_meta):
chunk = read_chunk(data, block_meta)
return chunk.decode(ENCODING_LATIN).strip(NULL_STR)
def parse_series(data: bytes, block_meta):
chunk = read_chunk(data, block_meta)
fmt = f"<{block_meta.chunk_size}f"
values = unpack(fmt, chunk)
return np.array(values)

View File

@ -1,17 +0,0 @@
from configparser import ConfigParser
class Config(object):
def __init__(self, wave_start=None, wave_end=None, wave_size=None):
self.wave_start = wave_start
self.wave_end = wave_end
self.iwsize = wave_size
@staticmethod
def read_conf_from_file(path):
config = ConfigParser()
config.read(path)
wave_start = float(config.get('mpa', 'start_wn'))
wave_end = float(config.get('mpa', 'end_wn'))
wave_size = int(config.get('mpa', 'num_wn'))
return Config(wave_start, wave_end, wave_size)

View File

@ -0,0 +1,13 @@
HEADER_LEN = 504
UNSIGNED_INT = "<I"
UNSIGNED_CHAR = "<B"
UNSIGNED_SHORT = "<H"
INT = "<i"
DOUBLE = "<d"
NULL_BYTE = b"\x00"
NULL_STR = "\x00"
FIRST_CURSOR_POSITION = 24
META_BLOCK_SIZE = 12
ENCODING_LATIN = "latin-1"
ENCODING_UTF = "utf-8"
PARAM_TYPES = {0: "int", 1: "float", 2: "str", 3: "str", 4: "str"}

View File

@ -2,19 +2,21 @@ import numpy as np
from scipy.interpolate import interp1d
class OpusData:
def __init__(self, wave_nums, spectrum, meta=None):
self.wave_nums = wave_nums
self.spectrum = spectrum
self.meta = meta
class OpusData(dict):
def get_range(self, spec_name="AB", wavenums=True):
param_key = f"{spec_name} Data Parameter"
fxv = self[param_key]["FXV"]
lxv = self[param_key]["LXV"]
npt = self[param_key]["NPT"]
x_no_unit = np.linspace(fxv, lxv, npt)
if wavenums:
return x_no_unit
else:
10_000_000 / x_no_unit
def interpolate(self, start, stop, num):
xav, yav = self.wave_nums, self.spectrum
iwave_nums = np.linspace(start,
stop,
num)
f2 = interp1d(xav, yav,
kind='cubic',
assume_sorted=True,
fill_value='extrapolate')
def interpolate(self, start, stop, num, spec_name="AB"):
xav = self.get_range(spec_name=spec_name)
yav = self[spec_name]
iwave_nums = np.linspace(start, stop, num)
f2 = interp1d(xav, yav, kind="cubic", fill_value="extrapolate")
return iwave_nums, f2(iwave_nums)

View File

@ -0,0 +1,65 @@
from typing import List
from brukeropusreader.block.data import BlockMeta, UnknownBlockType
from brukeropusreader.constants import (
HEADER_LEN,
FIRST_CURSOR_POSITION,
META_BLOCK_SIZE,
)
from brukeropusreader.opus_data import OpusData
from brukeropusreader.opus_reader import (
read_data_type,
read_channel_type,
read_text_type,
read_chunk_size,
read_offset,
)
def read_file(file_path: str) -> OpusData:
with open(file_path, "rb") as opus_file:
data = opus_file.read()
meta_data = parse_meta(data)
opus_data = parse_data(data, meta_data)
return opus_data
def parse_meta(data: bytes) -> List[BlockMeta]:
header = data[:HEADER_LEN]
spectra_meta = []
cursor = FIRST_CURSOR_POSITION
while True:
if cursor + META_BLOCK_SIZE > HEADER_LEN:
break
data_type = read_data_type(header, cursor)
channel_type = read_channel_type(header, cursor)
text_type = read_text_type(header, cursor)
chunk_size = read_chunk_size(header, cursor)
offset = read_offset(header, cursor)
if offset <= 0:
break
block_meta = BlockMeta(data_type, channel_type,
text_type, chunk_size, offset)
spectra_meta.append(block_meta)
next_offset = offset + 4 * chunk_size
if next_offset >= len(data):
break
cursor += META_BLOCK_SIZE
return spectra_meta
def parse_data(data: bytes, blocks_meta: List[BlockMeta]) -> OpusData:
opus_data = OpusData()
for block_meta in blocks_meta:
try:
name, parser = block_meta.get_name_and_parser()
except UnknownBlockType:
continue
parsed_data = parser(data, block_meta)
opus_data[name] = parsed_data
return opus_data

View File

@ -1,155 +1,39 @@
import numpy as np
from struct import unpack_from
from .utils import find_all
from .opus_data import OpusData
from itertools import product
from struct import unpack
from brukeropusreader.constants import UNSIGNED_INT, UNSIGNED_CHAR
class NoAbsorbanceSpectra(Exception):
pass
def read_data_type(header: bytes, cursor: int) -> int:
p1 = cursor
p2 = cursor + 1
return unpack(UNSIGNED_CHAR, header[p1:p2])[0]
def read_file(filepath):
with open(filepath, 'rb') as f:
buff = f.read()
# Reading all waves and spectra
fxv_spc, spc, wavenumbers = read_all_spectra(buff)
# Choose best ab spectra
ab_spectra, ab_wavenumbers = choose_ab(fxv_spc, spc, wavenumbers)
wave_num_abs_pair = reversed(list(zip(ab_wavenumbers, ab_spectra)))
meta = get_metadata(buff)
wave_nums, spectrum = list(zip(*wave_num_abs_pair))
return OpusData(wave_nums, spectrum, meta=meta)
def read_channel_type(header: bytes, cursor: int) -> int:
p1 = cursor + 1
p2 = cursor + 2
return unpack(UNSIGNED_CHAR, header[p1:p2])[0]
def choose_ab(fxv_spc, spc, wavenumbers):
# Removing interferograms
which_ig = np.where(fxv_spc == 0)[0]
not_ig = np.setdiff1d(range(len(fxv_spc)), which_ig)
# Removing single channel spectra
# (heuristics are empirically derived)
ab = []
for x in not_ig:
if np.average(spc[x]) > 0.25:
ab.append(x)
if len(ab) > 1:
spc_avg = [np.average(spc[x]) for x in ab]
max_avg_index = spc_avg.index(max(spc_avg))
ab_p = ab[max_avg_index]
elif len(ab) == 1:
ab_p = ab[0]
else:
raise NoAbsorbanceSpectra()
ab_spectra = spc[ab_p]
ab_wavenumbers = wavenumbers[ab[0]]
return ab_spectra, ab_wavenumbers
def read_text_type(header: bytes, cursor: int) -> int:
p1 = cursor + 2
p2 = cursor + 3
return unpack(UNSIGNED_CHAR, header[p1:p2])[0]
def keyword_positions(buff):
end = np.array(list(find_all(b'END', buff))) + 12
npt_all = np.array(list(find_all(b'NPT', buff))) + 8
fxv_all = np.array(list(find_all(b'FXV', buff))) + 8
lxv_all = np.array(list(find_all(b'LXV', buff))) + 8
return end, npt_all, fxv_all, lxv_all
def read_chunk_size(header: bytes, cursor: int) -> int:
p1 = cursor + 4
p2 = cursor + 8
return unpack(UNSIGNED_INT, header[p1:p2])[0]
def filter_unpaired(fxv_all, lxv_all):
if len(fxv_all) != len(lxv_all):
prod = product(fxv_all, lxv_all)
corr_adr = list(zip(*filter(lambda d: (d[1] - d[0]) == 16, prod)))
fxv_all = np.array(corr_adr[0])
lxv_all = np.array(corr_adr[1])
return fxv_all, lxv_all
def read_offset(header: bytes, cursor: int) -> int:
p1 = cursor + 8
p2 = cursor + 12
return unpack(UNSIGNED_INT, header[p1:p2])[0]
def read_all_spectra(buff):
end, npt_all, fxv_all, lxv_all = keyword_positions(buff)
fxv_all, lxv_all = filter_unpaired(fxv_all, lxv_all)
# Number of wavepoints
npt = [unpack_from('<i', buff, adr)[0] for adr in npt_all]
# 'end_spc is vector of offsets where spectra start'
end_spc = end[np.where(np.diff(end) > 4 * min(npt))]
spc_param_list = {'npt': npt_all, 'fxv': fxv_all, 'lxv': lxv_all}
# Filtering some corrupted series
param_spc = filter_spc_params(end_spc, spc_param_list, npt_all)
# Number of points in correct spectra
npt_spc = [unpack_from('<i', buff, adr)[0] for adr in param_spc['npt']]
npt_spc = np.array(npt_spc)
mask = npt_spc > 0
for key in param_spc.keys():
param_spc[key] = param_spc[key][mask]
npt_spc = npt_spc[mask]
def read_spec(x):
return np.array(unpack_from('<' + str(x[1]) + 'f', buff, x[0] - 4))
def read_waves(x):
return unpack_from('<2d', buff, x)[0]
spc = list(map(read_spec, zip(param_spc['end'], npt_spc)))
fxv_spc = np.array([read_waves(x) for x in param_spc['fxv']])
lxv_spc = [unpack_from('<2d', buff, x)[0] for x in param_spc['lxv']]
wavenumbers = generate_wavelengths(lxv_spc, fxv_spc, npt_spc)
return fxv_spc, spc, wavenumbers
def generate_wavelengths(lxv_spc, fxv_spc, npt_spc):
wavenumbers = []
for lx, fx, npt1 in zip(lxv_spc, fxv_spc, npt_spc):
ratio = (fx - lx) / (npt1 - 1)
arr = np.flipud(np.arange(lx, fx + ratio, ratio))
wavenumbers.append(arr)
return wavenumbers
def find_key(buff, key):
hit = buff.find(key) + 8
value = unpack_from('2000s', buff, hit)[0]
value = value[:value.find(b'\x00')]
return value
def get_metadata(buff):
meta = {}
all_keys = ['INS', 'SRC', 'DAT', 'SNM', 'TIM', 'SFM']
for k in all_keys:
keystr = k
value = find_key(buff, keystr.encode('utf-8'))
meta[k] = value
return meta
def filter_spc_params(end_spc, spc_param_list, npt_all):
def indexes_of_valid_series(arr):
return list(arr).index(min(filter(lambda x: x > 0, arr)))
new_end = []
new_fxv = []
new_lxv = []
fxv_spc = spc_param_list['fxv']
lxv_spc = spc_param_list['lxv']
for npy in npt_all:
end_diff = npy - end_spc
lxv_diff = lxv_spc - npy
fxv_diff = fxv_spc - npy
lxv_tmp = indexes_of_valid_series(lxv_diff)
fxv_tmp = indexes_of_valid_series(fxv_diff)
end_tmp = indexes_of_valid_series(end_diff)
new_end.append(end_spc[end_tmp])
new_fxv.append(fxv_spc[fxv_tmp])
new_lxv.append(lxv_spc[lxv_tmp])
spc_param_list['end'] = np.array(new_end)
spc_param_list['lxv'] = np.array(new_lxv)
spc_param_list['fxv'] = np.array(new_fxv)
return spc_param_list
def read_chunk(data: bytes, block_meta):
p1 = block_meta.offset
p2 = p1 + 4 * block_meta.chunk_size
return data[p1:p2]

View File

@ -1,9 +0,0 @@
# function to find all occurrences of substring in string
def find_all(sub, a_str):
start = 0
while True:
start = a_str.find(sub, start)
if start == -1:
return
yield start
start += len(sub) # use start += 1 to find overlapping matches

View File

@ -1,34 +1,36 @@
from brukeropusreader import read_file
import sys
import argparse
import matplotlib.pyplot as plt
import sys
from brukeropusreader import read_file
def main(path_to_file):
print(f'Reading opus file from path'
f'{path_to_file}')
print(f"Reading opus file from path" f"{path_to_file}")
opus_data = read_file(path_to_file)
print(f'Dimension of data: '
f'{len(opus_data.wave_nums)}')
print(f"Data fields: " f"{list(opus_data.keys())}")
print(f'Spectrum range: ['
f'{min(opus_data.spectrum)}; '
f'{max(opus_data.spectrum)}]')
ab_x = opus_data.get_range("AB")
print(f"Absorption spectrum range: " f"{ab_x[0]} {ab_x[-1]}")
print(f"Absorption elements num: " f'{len(opus_data["AB"])}')
print(f'Metadata: '
f'{opus_data.meta}')
try:
import matplotlib.pyplot as plt
plt.plot(opus_data.wave_nums, opus_data.spectrum)
plt.title(f'Spectrum {path_to_file}')
print("Plotting AB")
plt.plot(opus_data.get_range("AB"), opus_data["AB"])
plt.show()
print("Plotting interpolated AB")
plt.plot(*opus_data.interpolate(ab_x[0], ab_x[-1], 100))
plt.show()
if __name__ == '__main__':
except ImportError:
print(f"Install matplotlib to plot spectra")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("opus_path",
help="Path to opus file",
action="store")
parser.add_argument("opus_path", help="Path to opus file", action="store")
args = parser.parse_args()
main(sys.argv[1])

View File

@ -1,3 +1,3 @@
numpy
scipy
matplotlib

View File

@ -2,13 +2,7 @@
# This file is autogenerated by pip-compile
# To update, run:
#
# pip-compile --output-file requirements.txt requirements.in
# pip-compile --output-file=requirements.txt requirements.in
#
cycler==0.10.0 # via matplotlib
kiwisolver==1.0.1 # via matplotlib
matplotlib==3.0.2
numpy==1.16.1
pyparsing==2.3.1 # via matplotlib
python-dateutil==2.8.0 # via matplotlib
scipy==1.2.1
six==1.12.0 # via cycler, python-dateutil

View File

@ -1,14 +1,15 @@
from distutils.core import setup
setup(name='brukeropusreader',
version='1.2.1',
description='Bruker OPUS File Reader',
author='QED',
author_email='brukeropusreader-dev@qed.ai',
packages=['brukeropusreader'],
setup(
name="brukeropusreader",
version="1.3.0",
description="Bruker OPUS File Reader",
author="QED",
author_email="brukeropusreader-dev@qed.ai",
packages=["brukeropusreader"],
python_requires=">=3",
install_requires=['numpy>=1.13.3', 'scipy>=0.19.1', 'matplotlib>=3.0.2'],
install_requires=["numpy>=1.13.3", "scipy>=0.19.1"],
license="GPLv3",
url="https://github.com/qedsoftware/brukeropusreader"
)
url="https://github.com/qedsoftware/brukeropusreader",
)

View File

@ -1,18 +0,0 @@
import unittest
from brukeropusreader.utils import find_all
class MyTestCase(unittest.TestCase):
def test_find_existing(self):
self.assertEqual(list(find_all('test', 'somelongtestwie')), [8])
def test_not_existing(self):
self.assertEqual(list(find_all('test', 'nothinghere')), [])
def test_three(self):
self.assertEqual(list(find_all('txt', 'txtxttxtastxt')), [0, 5, 10])
if __name__ == '__main__':
unittest.main()