Extracting spectra with highest average

Commit d0f71e336c by t2, 2017-07-11 10:28:19 +02:00
Change-Id: I5422623d2e1f1a338e53591f1166056199b6dd50
13 changed files with 379 additions and 0 deletions

.gitignore (new file)
*.pyc
*.csv

README.md (new file)
## Bruker OPUS binary files reader
The Python scripts in this project allow reading Bruker OPUS proprietary files. You can provide the scripts with a config file (example in conf/) to perform interpolation of the spectra.
## Usage
Call the function brukeropusreader.opus_reader.opus_reader to read an OPUS file, as in the example below.
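A minimal usage sketch (the input path is a placeholder; the config path points to the example shipped in conf/):

```python
from brukeropusreader.configuration import Config
from brukeropusreader.opus_reader import opus_reader

# Interpolation parameters are read from the [mpa] section of the config file
conf = Config.read_conf_from_file('conf/opus.conf')

# Parse the OPUS file and get an OpusData object with raw and interpolated spectra
data = opus_reader('path/to/measurement.0', conf)
data.plot_data()
```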
## Structure of OPUS files
An OPUS file always consists of series of spectra. Each series is described by a few parameters: NPT (number of points), FXV (value of the first wavelength), LXV (value of the last wavelength) and END (address of the spectral series).
These parameters can be found by searching the binary file for the corresponding ASCII string. After finding an occurrence, one has to move the pointer a few bytes further to read the value.
It is not documented how far forward the pointer should be moved. We empirically determined that it is 12 bytes for END and 8 bytes for NPT, FXV and LXV (see the sketch below).
Beyond that, each file contains some metadata about the hardware used for the measurement.
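As an illustration, reading the first NPT/FXV/LXV values could look roughly like this (the file path is a placeholder; the offsets are the empirical ones given above):

```python
from struct import unpack_from

with open('path/to/measurement.0', 'rb') as f:  # placeholder path
    buff = f.read()

# NPT: little-endian int stored 8 bytes after the "NPT" marker
npt = unpack_from('<i', buff, buff.find(b'NPT') + 8)[0]

# FXV/LXV: little-endian doubles stored 8 bytes after their markers
fxv = unpack_from('<d', buff, buff.find(b'FXV') + 8)[0]
lxv = unpack_from('<d', buff, buff.find(b'LXV') + 8)[0]
```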
## Controversies
Bruker OPUS is a proprietary format, so we do not know its exact structure; we can only guess it. The main problem is: given several series, how do we decide which one is the absorbance spectrum?
Our (empirically developed) solution, sketched in code after this list, is:
* Remove broken series (ones with FXV greater than LXV, without NPT information, etc.).
* Filter out interferograms (an idea taken from https://github.com/philipp-baumann/simplerspec) - interferograms start with the value 0 and we do not need them.
* If more than one spectrum is still left after these two steps, choose the one with the highest average. We empirically checked that the others are usually random noise with values near 0.
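A simplified sketch of this heuristic (pick_ab_spectrum is a hypothetical helper; the full logic lives in choose_ab in opus_reader.py):

```python
import numpy as np

def pick_ab_spectrum(fxv_spc, spc):
    # fxv_spc: first wavelength of each series, spc: list of spectra (numpy arrays)
    # Drop interferograms (first wavelength 0) and near-zero noise series
    candidates = [i for i in range(len(fxv_spc))
                  if fxv_spc[i] != 0 and np.average(spc[i]) > 0.25]
    if not candidates:
        raise ValueError("No absorbance spectrum found")
    # Of the remaining candidates, keep the one with the highest average
    return max(candidates, key=lambda i: np.average(spc[i]))
```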

brukeropusreader/__init__.py (new file, empty)
brukeropusreader/configuration.py (new file)
from configparser import ConfigParser


class Config(object):
    def __init__(self, wave_start=None, wave_end=None, wave_size=None):
        self.wave_start = wave_start
        self.wave_end = wave_end
        self.iwsize = wave_size

    @staticmethod
    def read_conf_from_file(path):
        # Interpolation parameters are read from the [mpa] section of the config file
        config = ConfigParser()
        config.read(path)
        wave_start = float(config.get('mpa', 'start_wn'))
        wave_end = float(config.get('mpa', 'end_wn'))
        wave_size = int(config.get('mpa', 'num_wn'))
        return Config(wave_start, wave_end, wave_size)
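For reference, a quick usage sketch of this class; the printed values correspond to the [mpa] section of conf/opus.conf added in this commit:

conf = Config.read_conf_from_file('conf/opus.conf')
print(conf.wave_start, conf.wave_end, conf.iwsize)  # 12489.45616 3594.86 1154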

brukeropusreader/opus_data.py (new file)
import csv

import matplotlib.pyplot as plt
import numpy as np
from numpy import linalg


class NoInterpolatedDataException(Exception):
    pass


class OpusData(object):
    def __init__(self, raw_data, interp=None, meta=None):
        # raw_data: (wavenumbers, values); interp: interpolated (wavenumbers, values)
        self.raw_data = raw_data
        self.interpolated_data = interp
        self.meta = meta

    def plot_raw(self):
        plt.plot(self.raw_data[0], self.raw_data[1], 'o', markeredgewidth=1,
                 markeredgecolor='r', markerfacecolor='None')
        plt.show()

    def plot_interpolated(self):
        if self.interpolated_data:
            plt.plot(self.interpolated_data[0], self.interpolated_data[1], 'o')
            plt.show()
        else:
            raise NoInterpolatedDataException()

    def plot_data(self):
        plt.plot(self.raw_data[0], self.raw_data[1], 'bo')
        if self.interpolated_data:
            plt.plot(self.interpolated_data[0],
                     self.interpolated_data[1],
                     'ro')
        plt.show()

    def compare_with(self, file_path):
        # Compare the interpolated spectrum with a CSV exported by the R (rbart) reader
        with open(file_path, 'r') as csv_file:
            csv_reader = csv.reader(csv_file)
            # First row: wavenumbers (skip the leading character of each header cell),
            # second row: spectrum values; the first two columns are ignored
            x_r = np.array([float(x[1:]) for x in next(csv_reader)[2:]])
            y_r = np.array([float(x) for x in next(csv_reader)[2:]])
            dist = linalg.norm(y_r - self.interpolated_data[1], ord=1)
            print("L1 distance between rbart and python versions:", dist)
            plt.plot(x_r, y_r, 'bo')
            plt.plot(self.interpolated_data[0],
                     self.interpolated_data[1],
                     'ro')
            plt.show()

brukeropusreader/opus_reader.py (new file)
from itertools import product
from struct import unpack_from

import numpy as np
from scipy.interpolate import interp1d

from brukeropusreader.opus_data import OpusData
from brukeropusreader.utils import find_all


class NoAbsorbanceSpectra(Exception):
    pass


class WrongWavelengthsInConf(Exception):
    pass


def opus_reader(filepath, conf, interpolate=True):
    with open(filepath, 'rb') as f:
        buff = f.read()
    # Reading all waves and spectra
    fxv_spc, spc, wavenumbers = read_all_spectras(buff)
    # Choose the best AB spectrum
    ab_spectra, ab_wavenumbers = choose_ab(fxv_spc, spc, wavenumbers)
    # Materialise the (wavenumber, value) pairs so they can be reused below
    pairs = list(zip(ab_wavenumbers, ab_spectra))[::-1]
    # AB spectrum and waves extracted, now extracting metadata
    meta = get_meta_data(buff)
    if interpolate:
        # Spectrum successfully extracted, now interpolating
        iwavenumber, yi = interpolate_spectra(pairs, ab_wavenumbers, conf)
        return OpusData(list(zip(*pairs)), interp=(iwavenumber, yi), meta=meta)
    return OpusData(list(zip(*pairs)), meta=meta)


def choose_ab(fxv_spc, spc, wavenumbers):
    # Filtering interferograms - we don't need them (their first wavenumber is 0)
    which_ig = np.where(fxv_spc == 0)[0]
    not_ig = np.setdiff1d(range(len(fxv_spc)), which_ig)
    # Filtering single channel spectra - that's just guessing, but it works!
    ab = [x for x in not_ig if np.average(spc[x]) > 0.25]
    if len(ab) > 1:
        # Several candidates left - pick the one with the highest average
        spc_avg = [np.average(spc[x]) for x in ab]
        ab_p = ab[spc_avg.index(max(spc_avg))]
    elif len(ab) == 1:
        ab_p = ab[0]
    else:
        raise NoAbsorbanceSpectra()
    ab_spectra = spc[ab_p]
    # Wavenumbers must correspond to the chosen spectrum
    ab_wavenumbers = wavenumbers[ab_p]
    return ab_spectra, ab_wavenumbers


def keyword_positions(buff):
    # Empirically determined offsets: the value starts 12 bytes after "END", 8 bytes after the others
    end = np.array(list(find_all(b"END", buff))) + 12
    npt_all = np.array(list(find_all(b"NPT", buff))) + 8
    fxv_all = np.array(list(find_all(b"FXV", buff))) + 8
    lxv_all = np.array(list(find_all(b"LXV", buff))) + 8
    return end, npt_all, fxv_all, lxv_all


def filter_unpaired(fxv_all, lxv_all):
    # Keep only FXV/LXV positions that form a pair (the LXV value lies 16 bytes after FXV)
    if len(fxv_all) != len(lxv_all):
        prod = product(fxv_all, lxv_all)
        correct_addresses = list(zip(*[d for d in prod if (d[1] - d[0]) == 16]))
        fxv_all = np.array(correct_addresses[0])
        lxv_all = np.array(correct_addresses[1])
    return fxv_all, lxv_all


def read_all_spectras(buff):
    end, npt_all, fxv_all, lxv_all = keyword_positions(buff)
    fxv_all, lxv_all = filter_unpaired(fxv_all, lxv_all)
    # Number of wavepoints
    npt = [unpack_from("<i", buff, adr)[0] for adr in npt_all]
    # end_spc is a vector of offsets where spectra start
    end_spc = end[np.where(np.diff(end) > 4 * min(npt))]
    spc_param_list = {'npt': npt_all, 'fxv': fxv_all, 'lxv': lxv_all}
    # Filtering some corrupted series
    param_spc = filter_spc_params(end_spc, spc_param_list, npt_all)
    # Number of points in correct spectra
    npt_spc = np.array([unpack_from("<i", buff, adr)[0] for adr in param_spc["npt"]])
    mask = npt_spc > 0
    for key in param_spc.keys():
        param_spc[key] = param_spc[key][mask]
    npt_spc = npt_spc[mask]

    def read_spec(x):
        return np.array(unpack_from("<" + str(x[1]) + "f", buff, x[0] - 4))

    def read_waves(x):
        return unpack_from("<2d", buff, x)[0]

    spc = [read_spec(x) for x in zip(param_spc['end'], npt_spc)]
    fxv_spc = np.array([read_waves(x) for x in param_spc["fxv"]])
    lxv_spc = [unpack_from("<2d", buff, x)[0] for x in param_spc["lxv"]]
    wavenumbers = generate_wavelengths(lxv_spc, fxv_spc, npt_spc)
    return fxv_spc, spc, wavenumbers


def generate_wavelengths(lxv_spc, fxv_spc, npt_spc):
    # Build a wavenumber axis for each series, going from FXV down to LXV
    wavenumbers = []
    for lx, fx, npt1 in zip(lxv_spc, fxv_spc, npt_spc):
        ratio = (fx - lx) / (npt1 - 1)
        arr = np.flipud(np.arange(lx, fx + ratio, ratio))
        wavenumbers.append(arr)
    return wavenumbers


def generate_interpolated_wavenumbers(conf):
    # Evenly spaced wavenumber grid from conf.wave_start down to conf.wave_end
    wunit = (conf.wave_start - conf.wave_end) / float(conf.iwsize - 1)
    a = conf.wave_start + wunit
    iwavenumber = [0] * conf.iwsize
    for i in range(len(iwavenumber)):
        a -= wunit
        iwavenumber[i] = a
    return iwavenumber


def interpolate_spectra(pairs, ab_wavenumbers, conf):
    iwavenumber = generate_interpolated_wavenumbers(conf)
    xav, yav = zip(*pairs)
    xa_min = min(ab_wavenumbers)
    xa_max = max(ab_wavenumbers)
    n_interp = conf.iwsize
    yi = []
    f2 = interp1d(xav, yav, kind='cubic', assume_sorted=True)
    for k in range(n_interp):
        if iwavenumber[k] < xa_min:
            # Requested wavenumber below the measured range - only tolerated at the last point
            if k == n_interp - 1:
                yi.append(yav[0])
            else:
                print("Wrong wavelengths for interpolating data")
        elif iwavenumber[k] > xa_max:
            # Requested wavenumber above the measured range - only tolerated at the first point
            if k == 0:
                yi.append(yav[-1])
            else:
                raise WrongWavelengthsInConf()
        else:
            yi.append(f2(iwavenumber[k]))
    return iwavenumber, yi


def get_meta_data(buff):
    # Instrument type
    all_ins = tuple(find_all(b'INS', buff))
    inst = unpack_from("<3s", buff, all_ins[-1] + 8)[0].decode('utf-8', errors='ignore')
    # Source of infrared <NIR/MIR>
    all_src = tuple(find_all(b'SRC', buff))
    src = unpack_from("<3s", buff, all_src[-1] + 5)[0].decode('utf-8', errors='ignore')
    # Scan date
    dat = buff.find(b'DAT') + 8
    scandate = unpack_from("10s", buff, dat)[0].decode('utf-8', errors='ignore')
    # Sample name / lab material
    snm = buff.find(b'SNM') + 8
    snm_lab_material = unpack_from("22s", buff, snm)[0].decode('utf-8', errors='ignore')
    meta = {'ins': inst,
            'src': src,
            'date': scandate,
            'snm': snm_lab_material}
    return meta


def filter_spc_params(end_spc, spc_param_list, npt_all):
    # For every NPT position, pick the nearest END address before it and the
    # nearest FXV/LXV addresses after it (smallest positive difference)
    def indexes_of_valid_series(arr):
        return list(arr).index(min(x for x in arr if x > 0))

    new_end = []
    new_fxv = []
    new_lxv = []
    fxv_spc = spc_param_list['fxv']
    lxv_spc = spc_param_list['lxv']
    for npy in npt_all:
        end_diff = npy - end_spc
        lxv_diff = lxv_spc - npy
        fxv_diff = fxv_spc - npy
        lxv_tmp = indexes_of_valid_series(lxv_diff)
        fxv_tmp = indexes_of_valid_series(fxv_diff)
        end_tmp = indexes_of_valid_series(end_diff)
        new_end.append(end_spc[end_tmp])
        new_fxv.append(fxv_spc[fxv_tmp])
        new_lxv.append(lxv_spc[lxv_tmp])
    spc_param_list['end'] = np.array(new_end)
    spc_param_list['lxv'] = np.array(new_lxv)
    spc_param_list['fxv'] = np.array(new_fxv)
    return spc_param_list

brukeropusreader/utils.py (new file)
# Function to find all occurrences of a substring in a string (works for bytes too)
def find_all(sub, a_str):
    start = 0
    while True:
        start = a_str.find(sub, start)
        if start == -1:
            return
        yield start
        start += len(sub)  # use start += 1 to find overlapping matches

conf/opus.conf (new file)
[main]
avg_dir = avg
avg_suffix = _avg.csv
[htsxt]
start_wn = 7497.964
end_wn = 599.76
num_wn = 3578
[alpha_kbr]
start_wn = 3998.12872
end_wn = 399.387991
num_wn = 2542
[alpha_znse]
start_wn = 3996.4810
end_wn = 499.8151
num_wn = 1714
[mpa]
start_wn = 12489.456160
end_wn = 3594.86
num_wn = 1154

get_bart_result.R (new file)
library(rbart)

save.soil <- function(opus.file.path, conf.path, output.file='result.csv', output.path='./'){
  df = read.opus(opus.file.path, conf.path)
  write.csv(df, paste(output.path, output.file, sep=""))
}
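
To check the Python reader against the CSV written by save.soil() above, OpusData.compare_with can be pointed at that file (paths are placeholders, and the CSV is assumed to use the layout compare_with expects):

from brukeropusreader.configuration import Config
from brukeropusreader.opus_reader import opus_reader

conf = Config.read_conf_from_file('conf/opus.conf')
data = opus_reader('path/to/measurement.0', conf)
# Prints the L1 distance between the two interpolated spectra and plots both
data.compare_with('result.csv')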

requirements.txt (new file)
numpy
scipy
matplotlib

tests/__init__.py (new file, empty)

tests/config_reading.py (new file)
from configparser import NoSectionError
from unittest import TestCase, main

from brukeropusreader.configuration import Config


class ConfigurationTest(TestCase):
    def setUp(self):
        # unittest creates a fresh instance per test, so load the config here
        # rather than in a separate test method
        try:
            self.data = Config.read_conf_from_file("../conf/opus.conf")
        except NoSectionError:
            self.fail("Cannot load configuration file")

    def test_correct_max_wave(self):
        self.assertEqual(self.data.wave_start, 12489.456160)

    def test_correct_min_wave(self):
        self.assertEqual(self.data.wave_end, 3594.86)

    def test_correct_wave_len(self):
        self.assertEqual(self.data.iwsize, 1154)


if __name__ == '__main__':
    main()

tests/find_all.py (new file)
import unittest

from brukeropusreader.utils import find_all


class MyTestCase(unittest.TestCase):
    def test_find_existing(self):
        self.assertEqual(list(find_all('test', 'somelongtestwie')), [8])

    def test_not_existing(self):
        self.assertEqual(list(find_all('test', 'nothinghere')), [])

    def test_three(self):
        self.assertEqual(list(find_all('txt', 'txtxttxtastxt')), [0, 5, 10])


if __name__ == '__main__':
    unittest.main()