From d0f71e336cb46682f66ad3352c0c93292a0a2bec Mon Sep 17 00:00:00 2001 From: t2 Date: Tue, 11 Jul 2017 10:28:19 +0200 Subject: [PATCH] Extracting spectras with highest average Change-Id: I5422623d2e1f1a338e53591f1166056199b6dd50 --- .gitignore | 2 + README.md | 21 +++ brukeropusreader/__init__.py | 0 brukeropusreader/configuration.py | 17 +++ brukeropusreader/opus_data.py | 48 +++++++ brukeropusreader/opus_reader.py | 207 ++++++++++++++++++++++++++++++ brukeropusreader/utils.py | 9 ++ conf/opus.conf | 24 ++++ get_bart_result.R | 6 + requirements.txt | 2 + tests/__init__.py | 0 tests/config_reading.py | 25 ++++ tests/find_all.py | 18 +++ 13 files changed, 379 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 brukeropusreader/__init__.py create mode 100644 brukeropusreader/configuration.py create mode 100644 brukeropusreader/opus_data.py create mode 100644 brukeropusreader/opus_reader.py create mode 100644 brukeropusreader/utils.py create mode 100644 conf/opus.conf create mode 100644 get_bart_result.R create mode 100644 requirements.txt create mode 100644 tests/__init__.py create mode 100644 tests/config_reading.py create mode 100644 tests/find_all.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..85cc01c --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.pyc +*.csv \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..487b9ca --- /dev/null +++ b/README.md @@ -0,0 +1,21 @@ +## Bruker OPUS binary files reader +Python scripts in this project allow to read Bruker OPUS propriertary files. You can provide scripts with config file(example in conf/) to perform interpolation of spectra. + +## Usage +You can run function brukeropusreader.opus_reader.opus reader to read opus file. + +## Structure of OPUS files +OPUS file always consist of spectrum series. Each series is described by few parameters: NPT (number of points), FXV (value of first wavelength), LXV (value of last wavelength), END (address of spectra series). + +This parameters can be found by searching for particular string in binary file(in ASCII). After founding occurence one have to move pointer few bytes further to read value. +It is not established how far forward pointer should be moved. We empirically checked that is is 12 for end and 8 for npt, fxv, lxv. +Beyond that, each file contains some metadata about hardware used for measurement. + +## Controversies +Bruker OPUS is proprieratry file, therefore we don't know exactly what is its structure. We can just guess it. The main problem is - having few series how to decide which one is absorption spectra? +Our solution (empirically developd) is: +* Removed broken series (ones with fxv greater than lxv, without npt information, etc...) +* Filter out interferrograms - taken from https://github.com/philipp-baumann/simplerspec. Interferrograms have starting value 0. We don't need them. +* If after these two steps we still have more than one spectrum left we can choose one with highest average. We empirically checked that others are usually random noise with values near 0. + + diff --git a/brukeropusreader/__init__.py b/brukeropusreader/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/brukeropusreader/configuration.py b/brukeropusreader/configuration.py new file mode 100644 index 0000000..420822d --- /dev/null +++ b/brukeropusreader/configuration.py @@ -0,0 +1,17 @@ +from configparser import ConfigParser + + +class Config(object): + def __init__(self, wave_start=None, wave_end=None, wave_size=None): + self.wave_start = wave_start + self.wave_end = wave_end + self.iwsize = wave_size + + @staticmethod + def read_conf_from_file(path): + config = ConfigParser() + config.read(path) + wave_start = float(config.get('mpa', 'start_wn')) + wave_end = float(config.get('mpa', 'end_wn')) + wave_size = int(config.get('mpa', 'num_wn')) + return Config(wave_start, wave_end, wave_size) diff --git a/brukeropusreader/opus_data.py b/brukeropusreader/opus_data.py new file mode 100644 index 0000000..3e8decc --- /dev/null +++ b/brukeropusreader/opus_data.py @@ -0,0 +1,48 @@ +import matplotlib.pyplot as plt +import csv +import numpy as np +from numpy import linalg + + +class NoInterpolatedDataException(Exception): + pass + + +class OpusData(object): + def __init__(self, raw_data, interp=None, meta=None): + self.raw_data = raw_data + self.interpolated_data = interp + self.meta = meta + + def plot_raw(self): + plt.plot(self.raw_data[0], self.raw_data[1], 'o', markeredgewidth=1, + markeredgecolor='r', markerfacecolor='None') + plt.show() + + def plot_interpolated(self): + if self.interpolated_data: + plt.plot(self.interpolated_data[0], self.interpolated_data[1], 'o') + plt.show() + else: + raise NoInterpolatedDataException() + + def plot_data(self): + plt.plot(self.raw_data[0], self.raw_data[1], 'bo') + if self.interpolated_data: + plt.plot(self.interpolated_data[0], + self.interpolated_data[1], + 'ro') + plt.show() + + def compare_with(self, file_path): + with open(file_path, 'rb') as csv_file: + csv_reader = csv.reader(csv_file) + x_r = np.array(map(lambda x: float(x[1:]), next(csv_reader)[2:])) + y_r = np.array(map(lambda x: float(x), next(csv_reader)[2:])) + dist = linalg.norm(y_r-self.interpolated_data[1], ord=1) + print "L1 distance between rbart and python versions:", dist + plt.plot(x_r, y_r, 'bo') + plt.plot(self.interpolated_data[0], + self.interpolated_data[1], + 'ro') + plt.show() diff --git a/brukeropusreader/opus_reader.py b/brukeropusreader/opus_reader.py new file mode 100644 index 0000000..096d754 --- /dev/null +++ b/brukeropusreader/opus_reader.py @@ -0,0 +1,207 @@ +import numpy as np +from struct import unpack_from +from scipy.interpolate import interp1d +from utils import find_all +from opus_data import OpusData +from itertools import product + + +class NoAbsorbanceSpectra(Exception): + pass + + +class WrongWavelengthsInConf(Exception): + pass + + +def opus_reader(filepath, conf, interpolate=True): + with open(filepath, 'rb') as f: + buff = f.read() + + # Reading all waves and spectras + fxv_spc, spc, wavenumbers = read_all_spectras(buff) + # Choose best ab spectra + ab_spectra, ab_wavenumbers = choose_ab(fxv_spc, spc, wavenumbers) + + pairs = reversed(zip(ab_wavenumbers, ab_spectra)) + + # AB spectra and wave extracted, now interpolating + if interpolate: + iwavenumber, yi = interpolate_spectra(pairs, ab_wavenumbers, conf) + + # Spectra succesfully interpolated, now extracting metadata + meta = get_meta_data(buff) + + if interpolate: + return OpusData(zip(*pairs), interp=(iwavenumber, yi), meta=meta) + else: + return OpusData(zip(*pairs), meta=meta) + + +def choose_ab(fxv_spc, spc, wavenumbers): + # Filtering interferograms - we don't need them + which_ig = np.where(fxv_spc == 0)[0] + not_ig = np.setdiff1d(range(len(fxv_spc)), which_ig) + + # Filtering single channel spectras - that's just guessing, but it works! + ab = [] + for x in not_ig: + if np.average(spc[x]) > 0.25: + ab.append(x) + if len(ab) > 1: + spc_avg = map(lambda x: np.average(spc[x]), ab) + max_avg_index = spc_avg.index(max(spc_avg)) + ab_p = ab[max_avg_index] + elif len(ab) == 1: + ab_p = ab[0] + else: + raise NoAbsorbanceSpectra() + + ab_spectra = spc[ab_p] + ab_wavenumbers = wavenumbers[ab[0]] + return ab_spectra, ab_wavenumbers + + +def keyword_positions(buff): + end = np.array(list(find_all("END", buff))) + 12 + npt_all = np.array(list(find_all("NPT", buff))) + 8 + fxv_all = np.array(list(find_all("FXV", buff))) + 8 + lxv_all = np.array(list(find_all("LXV", buff))) + 8 + return end, npt_all, fxv_all, lxv_all + + +def filter_unpaired(fxv_all, lxv_all): + if len(fxv_all) != len(lxv_all): + prod = product(fxv_all, lxv_all) + correct_adressess = zip(*filter(lambda d: (d[1] - d[0]) == 16, prod)) + fxv_all = np.array(correct_adressess[0]) + lxv_all = np.array(correct_adressess[1]) + return fxv_all, lxv_all + + +def read_all_spectras(buff): + end, npt_all, fxv_all, lxv_all = keyword_positions(buff) + fxv_all, lxv_all = filter_unpaired(fxv_all, lxv_all) + + # Number of wavepoints + npt = [unpack_from(" 4 * min(npt))] + spc_param_list = {'npt': npt_all, 'fxv': fxv_all, 'lxv': lxv_all} + + # Filtering some corrupted series + param_spc = filter_spc_params(end_spc, spc_param_list, npt_all) + # Number of points in correct spectras + npt_spc = [unpack_from(" 0 + for key in param_spc.keys(): + param_spc[key] = param_spc[key][mask] + npt_spc = npt_spc[mask] + + def read_spec(x): + return np.array(unpack_from("<" + str(x[1]) + "f", buff, x[0] - 4)) + + def read_waves(x): + return unpack_from("<2d", buff, x)[0] + + spc = map(read_spec, zip(param_spc['end'], npt_spc)) + fxv_spc = np.array(map(read_waves, param_spc["fxv"])) + lxv_spc = map(lambda x: unpack_from("<2d", buff, x)[0], param_spc["lxv"]) + wavenumbers = generate_wavelengths(lxv_spc, fxv_spc, npt_spc) + + return fxv_spc, spc, wavenumbers + + +def generate_wavelengths(lxv_spc, fxv_spc, npt_spc): + wavenumbers = [] + for lx, fx, npt1 in zip(lxv_spc, fxv_spc, npt_spc): + ratio = (fx - lx) / (npt1 - 1) + arr = np.flipud(np.arange(lx, fx + ratio, ratio)) + wavenumbers.append(arr) + return wavenumbers + + +def generate_interpolated_wavenumbers(conf): + wunit = (conf.wave_start - conf.wave_end) / (float(conf.iwsize - 1)) + a = conf.wave_start + wunit + iwavenumber = [0] * conf.iwsize + for i in xrange(len(iwavenumber)): + a -= wunit + iwavenumber[i] = a + return iwavenumber + + +def interpolate_spectra(pairs, ab_wavenumbers, conf): + iwavenumber = generate_interpolated_wavenumbers(conf) + + xav, yav = zip(*pairs)[0], zip(*pairs)[1] + + xa_min = min(ab_wavenumbers) + xa_max = max(ab_wavenumbers) + n_interp = conf.iwsize + yi = [] + + f2 = interp1d(xav, yav, kind='cubic', assume_sorted=True) + + for k in range(n_interp): + if iwavenumber[k] < xa_min: + if k == n_interp - 1: + yi.append(yav[0]) + else: + print("Wrong wavelenghts for interpolating data") + elif iwavenumber[k] > xa_max: + if k == 0: + yi.append(yav[-1]) + else: + raise WrongWavelengthsInConf() + else: + yi.append(f2(iwavenumber[k])) + return iwavenumber, yi + + +def get_meta_data(buff): + # Getting source of instruments + all_ins = tuple(find_all('INS', buff)) + inst = unpack_from("<3s", buff, all_ins[-1] + 8)[0] + # Getting source of infrared + all_src = tuple(find_all('SRC', buff)) + src = unpack_from("<3s", buff, all_src[-1] + 5)[0] + + dat = buff.find('DAT') + 8 + scandate = unpack_from("10s", buff, dat)[0] + + snm = buff.find('SNM') + 8 + snm_lab_material = unpack_from("22s", buff, snm)[0] + + meta = {'ins': inst, + 'src': src, + 'date': scandate, + 'snm': snm_lab_material} + return meta + + +def filter_spc_params(end_spc, spc_param_list, npt_all): + def indexes_of_valid_series(arr): + return list(arr).index(min(filter(lambda x: x > 0, arr))) + new_end = [] + new_fxv = [] + new_lxv = [] + fxv_spc = spc_param_list['fxv'] + lxv_spc = spc_param_list['lxv'] + for npy in npt_all: + end_diff = npy - end_spc + lxv_diff = lxv_spc - npy + fxv_diff = fxv_spc - npy + lxv_tmp = indexes_of_valid_series(lxv_diff) + fxv_tmp = indexes_of_valid_series(fxv_diff) + end_tmp = indexes_of_valid_series(end_diff) + new_end.append(end_spc[end_tmp]) + new_fxv.append(fxv_spc[fxv_tmp]) + new_lxv.append(lxv_spc[lxv_tmp]) + spc_param_list['end'] = np.array(new_end) + spc_param_list['lxv'] = np.array(new_lxv) + spc_param_list['fxv'] = np.array(new_fxv) + + return spc_param_list diff --git a/brukeropusreader/utils.py b/brukeropusreader/utils.py new file mode 100644 index 0000000..db85fdd --- /dev/null +++ b/brukeropusreader/utils.py @@ -0,0 +1,9 @@ +# function to find all occurrences of substring in string +def find_all(sub, a_str): + start = 0 + while True: + start = a_str.find(sub, start) + if start == -1: + return + yield start + start += len(sub) # use start += 1 to find overlapping matches diff --git a/conf/opus.conf b/conf/opus.conf new file mode 100644 index 0000000..e925acf --- /dev/null +++ b/conf/opus.conf @@ -0,0 +1,24 @@ +[main] +avg_dir = avg +avg_suffix = _avg.csv + +[htsxt] +start_wn = 7497.964 +end_wn = 599.76 +num_wn = 3578 + +[alpha_kbr] +start_wn = 3998.12872 +end_wn = 399.387991 +num_wn = 2542 + +[alpha_znse] +start_wn = 3996.4810 +end_wn = 499.8151 +num_wn = 1714 + +[mpa] +start_wn = 12489.456160 +end_wn = 3594.86 +num_wn = 1154 + diff --git a/get_bart_result.R b/get_bart_result.R new file mode 100644 index 0000000..cb435d5 --- /dev/null +++ b/get_bart_result.R @@ -0,0 +1,6 @@ +library(rbart) + +save.soil <- function(opus.file.path, conf.path, output.file='result.csv', output.path='./'){ + df = read.opus(opus.file.path, conf.path) + write.csv(df, paste(output.path, output.file, sep="")) +} \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6bad103 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +numpy +scipy diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/config_reading.py b/tests/config_reading.py new file mode 100644 index 0000000..cbd05cb --- /dev/null +++ b/tests/config_reading.py @@ -0,0 +1,25 @@ +from unittest import TestCase, main +from brukeropusreader.configuration import Config +from configparser import NoSectionError + + +class ConfigurationTest(TestCase): + + def test_loading_file(self): + try: + self.data = Config.read_conf_from_file("../conf/opus.conf") + except NoSectionError: + self.fail("Cannot load configuration file") + + def test_correct_max_wave(self): + self.assertEqual(self.data.wave_start, 12489.456160) + + def test_correct_min_wave(self): + self.assertEqual(self.data.wave_end, 3594.86) + + def test_correct_wave_len(self): + self.assertEqual(self.data.iwsize, 1154) + + +if __name__ == '__main__': + main() diff --git a/tests/find_all.py b/tests/find_all.py new file mode 100644 index 0000000..f92acab --- /dev/null +++ b/tests/find_all.py @@ -0,0 +1,18 @@ +import unittest +from brukeropusreader.utils import find_all + + +class MyTestCase(unittest.TestCase): + + def test_find_existing(self): + self.assertEqual(list(find_all('test', 'somelongtestwie')), [8]) + + def test_not_existing(self): + self.assertEqual(list(find_all('test', 'nothinghere')), []) + + def test_three(self): + self.assertEqual(list(find_all('txt', 'txtxttxtastxt')), [0, 5, 10]) + + +if __name__ == '__main__': + unittest.main()