# -*- coding: utf-8 -*-
# dlshelpers.py
import os, re, io, codecs
from warnings import warn
from pathlib import Path
from dateutil.parser import parse as parseDateTime
import zipfile
import bson
import pprint
from uuid import UUID
import numpy as np
import pandas as pd
from jupyter_analysis_tools.utils import isList
# transmission levels of ALV-CGS4F/8F given in measurement software, angles/detectors 1-4
transmissionLevels = np.array(((1., .1, .03, .01),)+((1., .3, .1, .03),)*3)
def angleToQ(deg, refrac, wavelen):
return 4.*np.pi*refrac/wavelen*np.sin(deg*np.pi/360.)
# constant for the experiment at each angle
def calcDLSconst(q, temp, visc):
    kB = 1.38066E-23 # Boltzmann constant, [kB] = Newton m / Kelvin = J / K
return kB*temp/(6*np.pi*visc)*q**2
def getDLSgammaSi(angle, refrac, wavelen, temp, visc):
return calcDLSconst(angleToQ(angle, refrac, wavelen), temp, visc)
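# Minimal usage sketch (not part of the original API): converting a hypothetical
# measured decay rate Gamma into a hydrodynamic radius via the Stokes-Einstein
# relation Gamma = q^2 * kB*T / (6*pi*eta*R). All numbers below are assumed
# example values (water-like solvent at 25 °C, 632.8 nm laser, 90° angle) and
# assume SI units for the wavelength [m] and viscosity [Pa s].
def _exampleHydrodynamicRadius(gamma=2000.):  # gamma: assumed decay rate in 1/s
    const = getDLSgammaSi(angle=90., refrac=1.332, wavelen=632.8e-9,
                          temp=298.15, visc=0.89e-3)  # = kB*T*q^2/(6*pi*eta)
    return const / gamma  # hydrodynamic radius in meters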
def parseValue(val):
val = val.strip('"')
try:
val = float(val)
except ValueError:
pass
return val
def bufferFromLines(ln):
assert isList(ln)
return io.StringIO("".join(ln))
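# Minimal sketch of the two small helpers above: parseValue() strips surrounding
# quotes and converts numeric fields to float while leaving other strings
# untouched; bufferFromLines() turns a list of lines into a file-like object
# for pandas. The literal values are assumed examples only.
def _exampleHelpers():
    assert parseValue('"25.01"') == 25.01
    assert parseValue('"Toluene"') == 'Toluene'
    return bufferFromLines(["a\tb\n", "1\t2\n"])  # file-like buffer of two lines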
def readDLSMetaASC(lines, encoding='cp1250'):
"""Reads the measurement settings (temperature, viscosity, refractive index and wavelength).
The *MeanCR* field is ignored because they were found to be either inconsistent with the
recorded count rate in the same file (type `ALV-7004/USB`) or they have the wrong ordering
compared to the recorded count rate in the same file (type `ALV-7004 CGS-8F Data`). The
means can be calculated easily from the count rate read by *readDLSDataASC()*.
**For examples**, please see the tests for this function in `tests/test_dlshelpers.py`.
"""
assert isList(lines)
meta = {'filetype': lines[0].strip()}
# section names to look for and their keys in the metadata dict
sections = [('corrStartLn', '"Correlation"'), ('crStartLn', '"Count Rate"'),
('monitorDiodeLn', 'Monitor Diode'), ('cum1StartLn', '"Cumulant 1.Order"'),
('cum2StartLn', '"Cumulant 2.Order"'), ('cum3StartLn', '"Cumulant 3.Order"'),
('stddevStartLn', '"StandardDeviation"')]
meta['sections'] = {key: -1 for key, _ in sections}
key, pattern = sections.pop(0)
for idx, line in enumerate(lines):
if pattern in line:
meta['sections'][key] = idx
if len(sections):
key, pattern = sections.pop(0)
else:
break
assert meta['sections']['corrStartLn'] > 0, \
'The mandatory "Correlation" section was not found!'
# read the recorded environment data, device settings and sample notes
df = pd.read_csv(bufferFromLines(lines), sep=':', encoding=encoding,
skiprows=1, nrows=meta['sections']['corrStartLn']-2,
index_col=0,
names=range(5), # also read lines containing more colons = multiple fields
na_filter=False, # don't replace empty fields with NaN
header=None,
#quotechar='"',
escapechar="\t", # filters any TAB whitespace
skipinitialspace=True # strips whitespace from fields outside of quotes
)
# parse floats and put datetimes back together
meta.update({name.strip():
parseValue(':'.join(field for field in df.T[name] if len(field)).strip())
for name in df.index
if "MeanCR" not in name
})
if meta['sections']['monitorDiodeLn'] > 0:
meta['monitorDiode'] = float(lines[meta['sections']['monitorDiodeLn']].split()[-1])
    # return the collected metadata dict
return meta
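# Minimal usage sketch (hypothetical file path; the metadata key names depend on
# the ALV software version, so 'Temperature [K]' is an assumption here): read the
# non-empty lines of an ASC file and parse its header into a dict.
def _exampleReadMeta(path="sample_0001.ASC"):
    with codecs.open(path, encoding='cp1250') as fh:
        lines = [ln for ln in fh.readlines() if len(ln.strip())]
    meta = readDLSMetaASC(lines)
    return meta['filetype'], meta.get('Temperature [K]')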
def readDLSDataASC(filename, showProgress=False, encoding='cp1250',
attenKeys={'key': 'abgeschwächt', 'detectorKey': 'detektor', 'levelKey': 'stufe'}):
"""
    *Mode*: The mode describes which channels (count rate columns) were used for auto- or
    cross-correlation, as given in
    [the specs sheet by ALV](https://www.alvgmbh.de/dwnload/alv7000specs.pdf).
    *corr*: Data recorded in `C-CH0/1+1/0` mode contain the cross-correlation of two channels
    in both directions in the first and second column. Both columns are averaged
    on load and the averaged column is prepended, indexed by the measurement
    angle, to be used for further processing (this improves data quality, see issue #1).
**For examples**, please see the tests for this function in `tests/test_dlshelpers.py`.
"""
if showProgress:
print('.', end="") # some progress output
filename = Path(filename)
filelines = []
with codecs.open(filename, encoding=encoding) as fh:
filelines = [ln for ln in fh.readlines() if len(ln.strip())] # filter empty lines
data = readDLSMetaASC(filelines)
data['filename'] = filename
data['timestamp'] = parseDateTime(data['Date']+' '+data['Time'])
# parsing the scattering angles
angles = [value for key,value in data.items() if key.startswith("Angle")]
data['angles'] = angles
# try to get the concentration from the memo field, TODO: write a test for this
data['memo'] = "".join([value for key,value in data.items() if key.startswith("SampMemo")])
#print(f"memostr: '{data['memo']}'")
data['memofields'] = [field.strip(' ,;') for field in data['memo'].split()]
try:
concentration = [float(field.split(':')[1])
for field in data['memofields'] if ':' in field][0]
concentration = 1/float(concentration)
except (IndexError, ValueError):
concentration = 1
data['concentration'] = concentration
# parse attenuation values
atten = [value for key,value in data.items() if key.startswith("Atten")]
    atten += [1.0] * (len(angles) - len(atten)) # pad the attenuation list to match the number of angles (only 3 values are stored in the file)
if any([(key in data['memo'].lower()) for key in (attenKeys['key'], attenKeys['detectorKey'])]):
attenManual = getAttenuationFromMemo(data['memo'], len(angles), **attenKeys)
if atten != attenManual:
warn("\n"f" Attenuation values specified in sample description: {attenManual}\n"
f" ('{data['memo']}'),\n"
f" differ from the values stored by software: {atten}!\n"
f" -> Using values from sample description: {attenManual}.", # 'cause they may contain 4 instead of 3 values
None, stacklevel=2)
atten = attenManual
data['attenuation'] = pd.DataFrame(np.array(atten).reshape(1,-1), columns=angles)
data = dict(sorted(data.items(), key=lambda x: x[0].lower()))
    # read a section of data columns from its header line until the beginning of the next section, or the end of the file
lineIndices = np.array([idx for key, idx in data['sections'].items()], dtype=int)
    def sectionEndLn(startLn):  # line index where the section starting at startLn ends (start of the next section or end of file)
followingSections = lineIndices[lineIndices > startLn]
return followingSections.min() if followingSections.size else len(filelines)
# read correlation data columns (mandatory)
startln, endln = data['sections']['corrStartLn']+1, sectionEndLn(data['sections']['corrStartLn'])
corr = pd.read_csv(bufferFromLines(filelines), encoding=encoding, sep=r'\s+',
index_col=0, header=None,
skiprows=startln, nrows=endln-startln)
if data['Mode'].strip() == "C-CH0/1+1/0": # cross-correlation of two channels in both directions
corr.insert(0, 0, corr.iloc[:,:2].mean(axis=1))
# adjust column names if more columns than angles were found
columns = angles
if len(corr.columns) > len(columns):
# repeat the last angle with suffix if there are more columns
columns = columns + ([columns[-1],] * (len(corr.columns) - len(columns)))
columns = [f"{int(angle)}_{idx}" if idx else angle for idx, angle in enumerate(columns)]
corr.rename(columns=dict(zip(corr.columns, columns)), inplace = True)
corr.index.names = ["tau"]
data['correlation'] = corr
# read the count rate data columns
if data['sections']['crStartLn'] > 0: # does not exist in an 'averaged' data file
startln, endln = data['sections']['crStartLn']+1, sectionEndLn(data['sections']['crStartLn'])
cr = pd.read_csv(bufferFromLines(filelines), encoding=encoding, sep=r'\s+',
#names=["time"]+columns,
header=None,
index_col=0,
skiprows=startln, nrows=endln-startln)
if data['Mode'].strip() == "C-CH0/1+1/0": # cross-correlation of two channels in both directions
cr.insert(0, 0, cr.iloc[:,:2].mean(axis=1))
cr.rename(columns=dict(zip(cr.columns, columns)), inplace = True)
cr.index.names = ["time"]
data['countrate'] = cr
# read cumulants from data
cumulants = []
for num in range(3):
cumulants.append(None)
secname = f"cum{num+1}StartLn"
if data['sections'][secname] <= 0:
continue
startln, endln = data['sections'][secname]+1, sectionEndLn(data['sections'][secname])
secname = filelines[startln-1].strip().strip('"')
cumulants[num] = pd.read_csv(bufferFromLines(filelines), encoding=encoding, sep=r'\t',
names=("value",), index_col=0, engine='python',
skiprows=startln, nrows=endln-startln)
if any([cumu is not None for cumu in cumulants]):
data['cumulants'] = cumulants
# read the standard deviation
stddevStartLn = data['sections']['stddevStartLn']
if stddevStartLn > 0:
startln, endln = stddevStartLn+1, sectionEndLn(stddevStartLn)
data['stddev'] = pd.read_csv(bufferFromLines(filelines), encoding=encoding, sep=r'\s+',
names=["tau", columns[0]], index_col=0,
skiprows=startln, nrows=endln-startln)
return data
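# Minimal usage sketch (hypothetical filename): the returned dict contains the
# keys assembled above, e.g. 'correlation', 'countrate', 'angles' and
# 'attenuation'. The correlation columns are indexed by the scattering angles.
def _exampleReadASC(path="toluene_0001.ASC"):
    data = readDLSDataASC(path)
    firstAngle = data['angles'][0]           # scattering angles from the header
    corr = data['correlation'][firstAngle]   # correlation data over lag time tau
    return corr, data.get('countrate')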
def getAttenuationFromMemo(memo, count=4, detectorKey='detektor', levelKey='stufe', key=None):
"""Extract manually specified attenuation of individual scattering angles from a given
measurement data files (ASC) *SampMemo* field.
Different notations and numbers of detectors are accepted (test cases):
>>> getAttenuationFromMemo("Toluolmessung für Statik,, Detektor 1 und 2, 1 Stufe abgeschwächt")
[0.1, 0.3, 1.0, 1.0]
>>> getAttenuationFromMemo("Toluolmessung für Statik,, Detektor 1 und 2, 1 Stufe abgeschwächt, Detektor 3 , 2 Stufen abgeschwächt")
[0.1, 0.3, 0.1, 1.0]
>>> getAttenuationFromMemo("Toluolmessung für Statik,, Detektor 1 und 2, 1 Stufe abgeschwächt, Detektor 3 und 4, 2 Stufen abgeschwächt")
[0.1, 0.3, 0.1, 0.1]
>>> getAttenuationFromMemo("Toluolmessung für Statik,, Detektor 1, 1 Stufe abgeschwächt, Detektor 3 , 2 Stufen abgeschwächt")
[0.1, 1.0, 0.1, 1.0]
>>> getAttenuationFromMemo("1:100 verdünnt mit Wasser, ungefiltert,, Detektor 1(10%) und 2(30%), 1 Stufe abgeschwächt")
[0.1, 0.3, 1.0, 1.0]
Table of transmission levels 1-4 (columns) as defined in the program for detectors 1-4 (rows):
>>> print(transmissionLevels)
[[1. 0.1 0.03 0.01]
[1. 0.3 0.1 0.03]
[1. 0.3 0.1 0.03]
[1. 0.3 0.1 0.03]]
"""
def getnumbers(text, idxoffset=0):
return ([(int(idx)+idxoffset, int(inner)/100 if len(inner) else 1.)
for idx, _, inner in re.findall(r"\b(\d)\b(\((\d+)%\))?", text)])
atten, detector = list(np.ones(count)), ()
for field in memo.split(','):
field = field.strip().lower()
#print(f"-> field '{field}':")
# extract detectors indices first, may contain transmission % already
if detectorKey in field:
detector = getnumbers(field, idxoffset=-1)
#print(detector)
# extract transmission level next, level indices map to hard coded transmission factor table
if levelKey in field:
level = getnumbers(field)
detector = [(d, transmissionLevels[d,level[d%len(level)][0]]) for d, _ in detector]
#print(detector)
for d, val in detector:
atten[d] = val
return atten
def convertAPKWentries(data):
"""Gets a data dictionary loaded from an APKW file and adds or converts field
values for compatibility with existing code."""
data['timestamp'] = data['MetaInfo']['StartTime']
data['Samplename'] = data['InputParameter']['Name']
data['memo'] = data['InputParameter']['Comment']
    data['memofields'] = [field.strip(' ,;') for field in data['memo'].split()]  # same as in readDLSDataASC()
data['concentration'] = 1
# use the ref. index value of the solvent
data['Refractive Index'] = data['InputParameter']['Solvent']['RefractiveIndex']
if data['Refractive Index'] == 0.:
# find the ref. index in look-up table if not set
data['Refractive Index'] = [entry['Value']
for entry in data['InputParameter']['Solvent']['RefractiveIndices']
if entry['Key']['Temperature'] == data['InputParameter']['Solvent']['Temperature']
and entry['Key']['Wavelength'] == data['InputParameter']['Solvent']['Wavelength']][0]
data['Wavelength [nm]'] = data['InputParameter']['Solvent']['Wavelength']*1e9
data['Temperature [K]'] = data['InputParameter']['Solvent']['Temperature']
data['Viscosity [cp]'] = data['InputParameter']['Solvent']['Viscosity']*1e3
if data['MetaInfo']['InstrumentType'] == 'Litesizer 100':
data['Angle [°]'] = 175.
data['angles'] = [data['Angle [°]']]
data['correlation'] = pd.DataFrame(data['RawData'][0]['CorrelationDataScaled'],
columns=data['angles'],
index=data['RawData'][0]['CorrelationDelayTimes'],)
data['correlation'] -= 1.
data['correlation'].index *= 1e3
data['correlation'].index.names = ["tau"]
data['countrate'] = pd.DataFrame(data['RawData'][0]['IntensityTrace'], columns=data['angles'])
data['countrate'] *= 10. # converted to match the exported XLS sheet data
del data['RawData'] # remove obsolete, duplicate data
data['Duration [s]'] = data['InputParameter']['MeasurementTime']
    # hard-coded count rate sampling times (0.1 s intervals); they are not stored anywhere in the file
data['countrate'].index = np.linspace(0.1, 0.1*data['countrate'][data['Angle [°]']].size,
data['countrate'][data['Angle [°]']].size)
data['attenuation'] = pd.DataFrame({data['Angle [°]']: (data['Attenuation'],)})
# extract data of the cumulant fit
rr = data['RecentRun']
data['cumulantfit'] = pd.DataFrame(zip(rr['CorrelationFitFunction'], rr['G2Residuals']),
columns=('CorrelationFitFunction','G2Residuals'),
index=rr['CumulantDelayTimes'])
data['cumulantfit']['CorrelationFitFunction'] -= 1.
data['cumulantfit'].index.names = ["tau"]
# remove obsolete, duplicate data
del data['RecentRun']['CorrelationFitFunction']
del data['RecentRun']['G2Residuals']
del data['RecentRun']['CumulantDelayTimes']
# extract reconstructed distribution
data['distribution'] = pd.DataFrame(zip(rr['ReconstructionIntensityDistribution'], rr['ReconstructionVolumeDistribution'], rr['ReconstructionNumberDistribution']),
columns=('intensity', 'volume', 'number'),
index=rr['ReconstructionRadiusNodes'])
data['distribution'].index.names = ["radius"]
# remove obsolete, duplicate data
del data['RecentRun']['ReconstructionIntensityDistribution']
del data['RecentRun']['ReconstructionVolumeDistribution']
del data['RecentRun']['ReconstructionNumberDistribution']
del data['RecentRun']['ReconstructionRadiusNodes']
return data
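# Minimal sketch using the fields set by convertAPKWentries() above: the
# reconstructed size distribution is a DataFrame indexed by radius with
# 'intensity', 'volume' and 'number' weighted columns.
def _examplePeakRadius(data):
    dist = data['distribution']
    return dist['intensity'].idxmax()  # radius at the intensity-weighted maximum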
def readDLSDataAPKW(filename):
"""Yields none or more unique dicts, one for each measurement,
stored in the given APKW file name."""
#print(filename)
filename = Path(filename)
if zipfile.is_zipfile(filename):
with zipfile.ZipFile(filename, 'r') as zf:
for zi in zf.infolist():
if zi.filename.startswith('measurement'):
#print(zi)
data = bson.loads(zf.read(zi))
#pprint.pprint(data)
if data['StorageStatus'] == 3 and data['UsedAngle'] > 0:
data['filename'] = filename / str(data['Id']) # remember it here
yield convertAPKWentries(data)
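# Minimal usage sketch (hypothetical filename): readDLSDataAPKW() is a generator,
# so it is typically consumed by a loop or a comprehension.
def _exampleReadAPKW(path="sample.apkw"):
    return [(d['Samplename'], d['timestamp']) for d in readDLSDataAPKW(path)]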
def readDLSData(files, *args, **kwargs):
"""Read DLS measurements from provided files, *.ASC or *.apkw are supported."""
    if not isinstance(files, (list, tuple)):
files = (files,)
# gather all measurement data from APKW files first
dirData = list({datadict['Id']: datadict
for fn in files
for datadict in readDLSDataAPKW(fn)}.values())
# try to read ASC files
if not len(dirData):
dirData = [readDLSDataASC(fn) for fn in files]
return dirData
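# Minimal usage sketch (hypothetical paths): readDLSData() accepts a single path
# or a list/tuple of paths and returns a list of measurement dicts.
def _exampleReadAll():
    measurements = readDLSData(["run1_0001.ASC", "run2_0001.ASC"])
    return [(m['filename'], m['angles']) for m in measurements]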
if __name__ == "__main__":
import doctest
doctest.testmod()