"""The phases subpackage implements a Python interface with the Phase
Objective-C classes as well as the infrastructure for pure phase thermodynamic calibration.
"""
from thermoengine import core
from thermoengine import model
from thermoengine import chem
import numpy as np
import scipy as sp
import pandas as pd
from os import path
import re
import copy
from pandas import DataFrame
from collections import OrderedDict
# specialized numerical imports
from scipy import stats, special, interpolate as interp, optimize as optim
from numpy import random
import numdifftools as nd
RGAS = 8.3144598
# __all__ = ['Database','Data','ParamModel','RxnModel']
__all__ = ['Database']
#===================================================
class Database:
"""
Calibration database model object.
Parameters
----------
rxn_data: pandas df
experimental data input
modelDB: str
choose thermodynamic model database (e.g., 'Berman' is a valid input)
reaction_library:
TOLsvd: int
Ndraw: int
ortho_scale: int
ratio that dictates level of rxn complexity (orthogonality/simplicity)
TOL: int
complexity of rxns
ignore_kinetics: boolean, default False
contaminant_phases: str list
rxn_trans_typ: ['logisitic']
Tscl_energy: 1.0
Returns
-------
rxn_svd: array of ints
matrix of valid, lineraly independent reactions
TODO
----
- get trusted data?
- priors?
"""
RXN_FACTORS = ['flux', 'seed', 'contaminant']
RXN_SCL_VALUES = [1, 1, 1]
RXN_DEFAULT_VALUES = [1, 1, 0]
PHASE_PARAM_LOOKUP = {'V0': 'V', 'S0': 'S', 'dH0': 'delta H'}
T0 = 300.0
P0 = 1.0
    def __init__(self, rxn_data, modelDB=None, reaction_library=None,
                 ignore_kinetics=False, contaminant_phases=None,
                 rxn_trans_typ='logistic', TOLsvd=1e-4, Ndraw=10,
                 ortho_scale=15, TOL=1e-10, phase_priors=None, rxn_priors=None,
                 ref_energy_phases=None):
if modelDB is None:
modelDB = model.Database()
self._init_rxns(rxn_data, modelDB, reaction_library, Ndraw, TOLsvd, ortho_scale, TOL)
self._init_params(phase_priors, rxn_priors, ref_energy_phases)
self.modelDB = modelDB
self.ignore_kinetics = ignore_kinetics
self.contaminant_phases = contaminant_phases
self.rxn_trans_typ = rxn_trans_typ
def _init_rxns(self, rxn_data, modelDB, reaction_library, Ndraw, TOLsvd, ortho_scale, TOL):
        rxn_coefs = None
        rxn_eqns = None  # Need to revisit this
        rxns = None  # TODO: construct Reaction objects (see commented block below)
        phases = None
if reaction_library is None:
#endmember_ids = [0,0]
#rxns = [modelDB.get_rxn(irxn_prop['phases'], endmember_ids,
#irxn_prop['coefs'])
#for irxn_prop in rxn_data.rxn_props]
#rxn_eqns = [irxn_prop['eqn'] for irxn_prop in rxn_data.rxn_props]
#phases = []
#for irxn in rxns:
#phases.extend(irxn.phases)
#phases = np.unique(phases)
            phase_symbols = chem.get_phase_symbols(rxn_data)
            rxn_svd_props = chem.calc_reaction_svd(phase_symbols, TOLsvd=TOLsvd)
            rxn_svd = rxn_svd_props['rxn_svd']
            Nbasis = len(rxn_svd)
            wtcoefs, costs, rxn_coefs_raw, wtcoefs_ortho = chem.get_rxns(
                rxn_svd, Ndraw=Ndraw, ortho_scale=ortho_scale,
                Nbasis=Nbasis, TOL=TOL)
            rxn_coefs = rxn_coefs_raw.copy()
            np.place(rxn_coefs, abs(rxn_coefs) < 1e-2, 0)
#rxns =
#endmember_ids = np.arange(0,len(rxn_coefs[0]))
phases = phase_symbols
else:
            assert False, ('User-defined reaction libraries (reaction_library '
                           'is not None) are not yet implemented.')
self.reaction_library = reaction_library
self.rxn_data = rxn_data
self.rxn_coefs = rxn_coefs
self.rxn_eqns = rxn_eqns
self.rxns = rxns
self.phases = phases
#self.endmember_ids = endmember_ids
#TK
def _init_params(self, phase_priors, rxn_priors,
ref_energy_phases):
self._param_values = []
self._param_names = []
self._param_scales = []
self._set_ref_energy_phases(ref_energy_phases)
self._init_rxn_params()
self._init_phase_params()
N = len(self._param_names)
self._param_values = np.array(self._param_values)
self._param_names = np.array(self._param_names)
self._param_scales = np.array(self._param_scales)
self._free_params = np.tile(False, (N,))
self._set_phase_priors(phase_priors, ref_energy_phases)
self._set_rxn_priors(rxn_priors)
    def _set_ref_energy_phases(self, ref_energy_phases):
        phases = self.phases
        # TK: phases may be plain symbols rather than phase objects when the
        # reaction basis is built from the SVD path; revisit this.
        assert phases is not None, (
            'phases must be initialized before setting the reference energy phases.'
        )
        phase_symbols = np.array(
            [iphs.abbrev for iphs in phases])
if ref_energy_phases is None:
ref_energy_phases = [phase_symbols[0]]
else:
assert np.all([ref_phs in phase_symbols
for ref_phs in ref_energy_phases]),(
'The ref_energy_phases provided '
'are not valid.'
)
H0_ref_phases = []
for phase_name in ref_energy_phases:
iref_phase, = phases[phase_symbols==phase_name]
H0_ref_phases.append(iref_phase)
self._ref_energy_phases = H0_ref_phases
def _get_ref_energy(self, phase):
ref_energy_phases = self._ref_energy_phases
        assert len(ref_energy_phases) == 1, (
            'Currently, only a single ref_energy_phase is implemented. '
            'Composition-dependent enthalpy references are not yet supported.'
        )
ref_phase = ref_energy_phases[0]
H0, = ref_phase.get_param_values(param_names=['delta H'])
return H0
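    # Note on the enthalpy parameterization used below: each calibrated phase
    # carries a 'dH0_P<i>' parameter defined as the enthalpy offset relative to
    # the reference phase, dH0_i = H0_i - H0_ref, so the reference phase itself
    # gets no dH0 parameter. A minimal sketch (values are hypothetical, J/mol):
    #
    #     H0_ref = -910700.0           # hypothetical reference enthalpy
    #     H0_i = -2021500.0            # some other phase
    #     dH0_i = H0_i - H0_ref        # stored (after scaling) as 'dH0_P<i>'
    #     H0_i_recovered = dH0_i + H0_ref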
def _set_phase_priors(self, phase_priors, ref_energy_phases):
phases = self.phases
phase_symbols = np.array(
[iphs.abbrev for iphs in phases])
prior_name = []
prior_avg = []
prior_error = []
for ind, iphs in enumerate(phases):
isym = iphs.abbrev
imask = phase_priors['phase']==isym
if np.any(imask):
ipriors = phase_priors[imask]
for iname, iavg, ierror in zip(
ipriors['param_name'],
ipriors['param_value'],
ipriors['param_error']):
                    if iname == 'H0':
                        iparam_name = 'dH0_P'+str(int(ind))
                        # TODO: adjust the prior dH0 value relative to the
                        # reference phase enthalpy here
else:
iparam_name = iname+'_P'+str(int(ind))
prior_name.append(iparam_name)
prior_avg.append(iavg)
prior_error.append(ierror)
self._prior_name = np.array(prior_name)
self._prior_avg = np.array(prior_avg)
self._prior_error = np.array(prior_error)
def _set_rxn_priors(self, rxn_priors):
pass
def _append_param(self, param_name, param_value, scale=1):
self._param_names.append(param_name)
self._param_values.append(param_value/scale)
self._param_scales.append(scale)
def _init_rxn_params(self):
rxns = self.rxns
for factor, scl_val in zip(self.RXN_FACTORS,
self.RXN_SCL_VALUES):
param_basename = 'alpha_'+factor+'_R'
self._append_param(param_basename+'*', 0.0)
for ind, irxn in enumerate(rxns):
self._append_param(param_basename+str(ind), 0.0)
def _init_phase_params(self):
T0 = self.T0
P0 = self.P0
def get_set_phase_param(param_name):
if param_name.startswith('dH0_P'):
H0, iphs = self._get_phase_param(param_name, return_phase=True)
H0_ref = self._get_ref_energy(iphs)
# val0 = (H0 - H0_ref)/1e3
val0 = (H0 - H0_ref)
scale = np.abs(val0)
else:
val0 = self._get_phase_param(param_name)
scale = np.abs(val0)
self._append_param(param_name, val0, scale=scale)
for ind, iphs in enumerate(self.phases):
get_set_phase_param('V0_P'+str(ind))
get_set_phase_param('S0_P'+str(ind))
if iphs not in self._ref_energy_phases:
get_set_phase_param('dH0_P'+str(ind))
def _split_param_name(self, param_name):
ind_sep = param_name.rfind('_')
param_typ, phs_key = param_name[0:ind_sep], param_name[ind_sep+1:]
return param_typ, phs_key
def _get_phase_param(self, param_name, return_phase=False):
param_typ, phs_key = self._split_param_name(param_name)
assert phs_key[0]=='P', (
'This parameter must be a phase parameter.'
)
phs_ind = int(phs_key[1:])
iphs = self.phases[phs_ind]
value = iphs.get_param_values(
param_names=self.PHASE_PARAM_LOOKUP[param_typ])[0]
if return_phase:
return value, iphs
else:
return value
def _update_phase_params(self):
phase_param_names = ['delta H', 'V', 'S']
calib_param_basenames = ['dH0_P', 'V0_P', 'S0_P']
def _add_set_params(basename, phase, calib_names, calib_values,
phase_param_names, phase_param_values):
phase_param_lookup = {'V0':'V', 'S0':'S', 'dH0':'delta H'}
mask = [iname.startswith(basename) for iname in calib_names]
if np.any(mask):
val = calib_values[mask]
else:
val = None
if val is not None and basename=='dH0':
H0_ref = self._get_ref_energy(phase)
# H0 = 1e3*(val + H0_ref)
H0 = val + H0_ref
val = H0
if val is not None:
phase_param_values.append(val)
phase_param_names.append(phase_param_lookup[basename])
pass
for ind, iphase in enumerate(self.phases):
icalib_param_names = self.get_param_group(kind='phase', id=ind)
# icalib_param_values = self.param_values(
# param_group=icalib_param_names, scale_params=False)
icalib_param_values = self.param_values(
param_group=icalib_param_names, scale_params=True)
iphase_param_names = []
iphase_param_values = []
_add_set_params('V0', iphase,
icalib_param_names, icalib_param_values,
iphase_param_names, iphase_param_values)
_add_set_params('S0', iphase,
icalib_param_names, icalib_param_values,
iphase_param_names, iphase_param_values)
_add_set_params('dH0', iphase,
icalib_param_names, icalib_param_values,
iphase_param_names, iphase_param_values)
iphase.set_param_values(param_names=iphase_param_names,
param_values=iphase_param_values)
pass
#===========================
def _get_param_group_index(self, param_group):
if param_group is None:
param_group = self.get_param_group(free=True)
else:
param_group = np.array(param_group)
loc = np.zeros(len(param_group), dtype=int)
for ind, name in enumerate(param_group):
iloc, = np.where(self._param_names==name)
loc[ind] = iloc
return loc
    def get_param_group(self, kind='all', id=None,
base=None, free=None):
"""
kind: ['all', 'phase', 'rxn']
id: [None, '*', int]
base: [None, str]
free: [None, True, False]
"""
param_names = self._param_names
        def _get_param_mask(symbol, id):
            base = r'.*_' + symbol
            if id is None:
                mask = np.array(
                    [re.match(base + r'[\*0-9]*', iname) is not None
                     for iname in param_names])
            elif id == '*':
                mask = np.array(
                    [re.match(base + r'\*', iname) is not None
                     for iname in param_names])
            else:
                mask = np.array(
                    [re.match(base + str(int(id)), iname) is not None
                     for iname in param_names])
            return mask
if kind == 'all':
mask = np.tile(True, param_names.size)
elif kind == 'phase':
mask = _get_param_mask('P', id)
elif kind == 'rxn':
mask = _get_param_mask('R', id)
else:
assert False, (
'That is not a valid param_group kind.'
)
if free is None:
pass
elif free:
mask = mask & self._free_params
else:
mask = mask & ~self._free_params
if base is not None:
mask = mask & np.array([name.startswith(base)
for name in param_names])
param_group = param_names[mask]
return param_group
def scale_params(self, param_values, param_group=None):
param_scales = self.param_scales(param_group=param_group)
scaled_param_values = param_values*param_scales
return scaled_param_values
    def unscale_params(self, scaled_param_values, param_group=None):
        param_scales = self.param_scales(param_group=param_group)
        param_values = scaled_param_values/param_scales
        return param_values
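    # Note on the scaling convention (a sketch; the 'dH0_P1' name and value are
    # hypothetical): internal parameter values are stored divided by their
    # scale, so scale_params maps internal -> physical values and
    # unscale_params maps physical -> internal values:
    #
    #     internal = self.unscale_params(np.array([-35000.0]), param_group=['dH0_P1'])
    #     physical = self.scale_params(internal, param_group=['dH0_P1'])
    #     # physical recovers -35000.0 (up to floating point)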
def param_names(self, param_group=None):
ind = self._get_param_group_index(param_group)
return self._param_names[ind]
def param_scales(self, param_group=None):
ind = self._get_param_group_index(param_group)
return self._param_scales[ind]
def param_values(self, param_group=None, scale_params=False):
ind = self._get_param_group_index(param_group)
param_values = self._param_values[ind]
if scale_params:
return self.scale_params(param_values, param_group=param_group)
else:
return param_values
def param_errors(self, param_group=None):
ind = self._get_param_group_index(param_group)
return self._param_errors[ind]
def set_param_values(self, param_values, param_group=None):
ind = self._get_param_group_index(param_group)
self._param_values[ind] = param_values
self._update_phase_params()
pass
def add_free_params(self, param_group):
ind = self._get_param_group_index(param_group)
self._free_params[ind] = True
def del_free_params(self, param_group):
ind = self._get_param_group_index(param_group)
self._free_params[ind] = False
pass
#===========================
def rxn_affinity(self):
rxns = self.rxns
rxn_data = self.rxn_data
P = rxn_data.conditions['P']
T = rxn_data.conditions['T']
rxn_id = rxn_data.rxn['rxn_id']
rxn_affinity = np.zeros(len(P))
for ind, (irxn_id, iT, iP) in enumerate(zip(rxn_id, T, P)):
irxn = rxns[irxn_id]
rxn_affinity[ind] = irxn.affinity(iT, iP)
return rxn_affinity
def rxn_affinity_error(self):
rxns = self.rxns
rxn_data = self.rxn_data
P = rxn_data.conditions['P']
T = rxn_data.conditions['T']
P_err = rxn_data.conditions['P_err']
T_err = rxn_data.conditions['T_err']
rxn_id = rxn_data.rxn['rxn_id']
affinity_err = np.zeros(len(P))
for ind, (irxn_id, iT, iP, iTerr, iPerr) in enumerate(zip(rxn_id, T, P, T_err, P_err)):
irxn = rxns[irxn_id]
irxn_vol = irxn.volume(iT, iP)
irxn_entropy = irxn.entropy(iT, iP)
affinity_err[ind] = np.sqrt(
(irxn_vol*iPerr)**2 + (irxn_entropy*iTerr)**2
)
return affinity_err
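    # The error propagation above follows from A = -Delta_G(T, P), so
    # dA/dP = -Delta_V and dA/dT = Delta_S, giving (to first order)
    #
    #     sigma_A = sqrt((Delta_V * sigma_P)**2 + (Delta_S * sigma_T)**2)
    #
    # which is what rxn_affinity_error computes for each experiment.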
def rxn_affinity_thresh(self):
rxns = self.rxns
rxn_data = self.rxn_data
P = rxn_data.conditions['P']
T = rxn_data.conditions['T']
P_err = rxn_data.conditions['P_err']
T_err = rxn_data.conditions['T_err']
rxn_id = rxn_data.rxn['rxn_id']
affinity_thresh = np.zeros(len(P))
if self.ignore_kinetics:
return affinity_thresh
# # get universal reaction parameters
# alpha_t_all = param_d['alpha_t_rxn_all']
# alpha_T_all = param_d['alpha_T_rxn_all']
# alpha_H2O_all = param_d['alpha_H2O_rxn_all']
# for ind,(rxn_eqn, rxn_obj) in enumerate(zip(rxn_eqn_l,rxn_l)):
# msk_rxn = dat_d['rxn']==rxn_eqn
# idGrxn_a = rxn_obj.get_rxn_gibbs_energy(dat_d['T'][msk_rxn],
# dat_d['P'][msk_rxn],
# peratom=True )
# # idVrxn_a = rxn_obj.get_rxn_volume(dat_d['T'][msk_rxn],
# # dat_d['P'][msk_rxn],
# # peratom=True )
# # idSrxn_a = rxn_obj.get_rxn_entropy(dat_d['T'][msk_rxn],
# # dat_d['P'][msk_rxn],
# # peratom=True )
# # get reaction-specific parameters
# alpha_0_rxn = param_d['alpha_0_rxn'+str(ind)]
# dalpha_t_rxn = param_d['dalpha_t_rxn'+str(ind)]
# dalpha_T_rxn = param_d['dalpha_T_rxn'+str(ind)]
# dalpha_H2O_rxn = param_d['dalpha_H2O_rxn'+str(ind)]
# logGth_a[msk_rxn] = alpha_0_rxn \
# + (alpha_t_all+dalpha_t_rxn)*dat_d['time'][msk_rxn] \
# + (alpha_T_all+dalpha_T_rxn)*dat_d['T'][msk_rxn] \
# + (alpha_H2O_all+dalpha_H2O_rxn)*dat_d['water'][msk_rxn]
# dGrxn_a[msk_rxn] = idGrxn_a
# # dVrxn_a[msk_rxn] = idVrxn_a
# # dSrxn_a[msk_rxn] = idSrxn_a
# Gth_a = Gthresh_scl*np.exp(logGth_a)
# # sigG_a = np.sqrt((dat_d['Perr']*dVrxn_a)**2+(dat_d['Terr']*dSrxn_a)**2)
# loglk_a = np.zeros(Gth_a.size)
# loglk_a[msk_rxndir] = iloglk_a
# # from IPython import embed; embed(); import ipdb; ipdb.set_trace()
# for ind, (irxn_id, iT, iP, iTerr, iPerr) in enumerate(zip(rxn_id, T, P, T_err, P_err)):
# irxn = rxns[irxn_id]
# irxn_vol = irxn.volume(iT, iP)
# irxn_entropy = irxn.entropy(iT, iP)
# affinity_err[ind] = np.sqrt(
# (irxn_vol*iPerr)**2 + (irxn_entropy*iTerr)**2
# )
return affinity_thresh
def eval_model_costfun(self, param_values, param_group=None,
full_output=False, kind='logistic'):
N = len(param_values)
# log_prior = -0.5*((param_values-np.array([29,-125, -3.5]))/10)**2
# log_prior = -0.5*((param_values-np.array([5.147, 4.412, 4.983]))/0.1)**2
# log_prior = -0.5*((param_values-np.array([1,1,1]))/0.02)**2
prior = np.sign(param_values)*np.ones(param_values.shape)
log_prior = -0.5*((param_values-prior)/0.02)**2
# param_values = 5*np.exp(param_values*1e-3)
self.set_param_values(param_values, param_group=param_group)
# What about trust values?
affinity = self.rxn_affinity()
affinity_err = self.rxn_affinity_error()
affinity_thresh = self.rxn_affinity_thresh()
rxn_dir = self.rxn_data.rxn['rxn_dir']
log_like = Stats.logprob_rxn_dir(rxn_dir, affinity, affinity_err,
affinity_thresh, kind=kind)
# log_prior = -0.5*((param_values-np.array([29,-125, -3.5]))/1)**2
log_like_tot = np.sum(log_like)
log_prior_tot = np.sum(log_prior)
# log_prior = self.eval_log_prior()
cost_val = np.hstack((-log_like, -log_prior))
cost_tot = - log_like_tot - log_prior_tot
if full_output:
output = OrderedDict()
output['cost_tot'] = cost_tot
output['cost_val'] = cost_val
output['log_like'] = log_like
output['log_prior'] = log_prior
output['log_prior_tot'] = log_prior_tot
output['log_like_tot'] = log_like_tot
output['affinity'] = affinity
output['affinity_err'] = affinity_err
output['affinity_thresh'] = affinity_thresh
return output
else:
return cost_tot
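    # Illustrative call (a sketch; `free` is assumed to be a previously
    # selected free-parameter group):
    #
    #     free = self.get_param_group(free=True)
    #     diag = self.eval_model_costfun(self.param_values(param_group=free),
    #                                    param_group=free, full_output=True)
    #     diag['cost_tot'], diag['affinity'], diag['log_like']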
def fit_model(self, param_group=None, full_output=False,
kind='logistic', method='Nelder-Mead'):
# Extract only trustworthy data
# self._dat_trust_d = self.extract_trust_data()
params0 = self.param_values(param_group=param_group)
# params0 = np.log(params0/5)/1e-3
model_cost0 = self.eval_model_costfun(
params0, param_group=param_group,
kind=kind, full_output=True)
# print(params0)
# print(model_cost0)
# param0_unscale_a = self.get_param_values(free_params)
# param0_a = self.unscale_params(param0_unscale_a, free_params)
# param0_tbl = self.get_param_table(param_nm_a=free_params)
# Precalculate approx Gibbs energy uncertainties
# sigG_trust_a = self.propagate_data_errors(param0_unscale_a,
# free_params=free_params)
# self._sigG_trust_a = sigG_trust_a
# costfun = lambda params: self.eval_model_costfun_scl(
# params0, free_params=free_params)
# lnprob_f = lambda param_a: -costfun(param_a)
costfun = lambda params: self.eval_model_costfun(
params, param_group=param_group, kind=kind, full_output=False)
        result = optim.minimize(costfun, params0, method=method,
                                options={'disp': True, 'maxiter': int(1e4)})
# set best-fit value
self.set_param_values(result.x, param_group=param_group)
return result
# def shift_one_param(shift,ind,mu_a=result.x,costfun=costfun):
# param_a = np.copy(mu_a)
# param_a[ind] += shift
# return costfun(param_a)
# # Create re-scaled-shifted function for hessian
# mu_a = result.x
# cost0 = costfun(mu_a)
# delx_param_scl = np.zeros(mu_a.shape)
# dcost_target=1
# for ind,param in enumerate(mu_a):
# del0 = 1e-2
# idelcostfun = lambda dx, ind=ind,target=dcost_target: \
# shift_one_param(dx,ind)-cost0-dcost_target
# delx = optim.fsolve(idelcostfun,del0)
# delx_param_scl[ind] = np.abs(delx)
# norm_costfun = lambda dx_a, shift_scl=delx_param_scl,\
# mu_a=mu_a,costfun=costfun: costfun(dx_a*shift_scl+mu_a)
# curv_scl_a = delx_param_scl*self.get_param_scl_values(free_params)
# scl_mat_a = core.make_scale_matrix(curv_scl_a)
# Hnorm_fun = nd.Hessian(norm_costfun,step = 1e-2)
# Hnorm_a = Hnorm_fun(np.zeros(mu_a.shape))
# covnorm_a = np.linalg.pinv(Hnorm_a)
# cov_a = covnorm_a*scl_mat_a
# try:
# err_a = np.sqrt(np.diag(cov_a))
# # print(cov_a)
# err_scl_a = core.make_scale_matrix(err_a)
# corr_a = cov_a/err_scl_a
# except:
# err_a = None
# corr_a = None
# # MCMC
# # ndim = len(free_params)
# # nwalkers = 10*ndim
# # walker_pos0_a = [result['x'] + 1e-4*np.random.randn(ndim)
# # for i in range(nwalkers)]
# # sampler = emcee.EnsembleSampler(nwalkers, ndim,lnprob_f)
# # sampler.run_mcmc(walker_pos0_a,10)
# # from IPython import embed; embed(); import ipdb; ipdb.set_trace()
# paramf_unscale_a = self.scale_params(result.x, free_params)
# self.set_param_values(paramf_unscale_a,free_params)
# self._fit_result_d = result
# if full_output:
# output_d = {}
# output_d['err_a'] = err_a
# output_d['corr_a'] = corr_a
# self._mu_a = paramf_unscale_a
# self._err_a = err_a
# self._corr_a = corr_a
# for key in self._param_error_d:
# self._param_error_d[key] = np.nan
# for key,err_val in zip(free_params,err_a):
# self._param_error_d[key] = err_val
# model_cost_d = self.eval_model_costfun(paramf_unscale_a,
# free_params=free_params,
# full_output=True)
# param_tbl = self.get_param_table(param_nm_a=free_params)
# param_all_tbl = self.get_param_table(typ='all')
# output_d['free_params'] = free_params
# output_d['costval0'] = model_cost0_d['cost_val']
# output_d['costdata0_df'] = model_cost0_d['cost_data_df']
# output_d['param0_tbl'] = param0_tbl
# output_d['costval'] = model_cost_d['cost_val']
# output_d['costdata_df'] = model_cost_d['cost_data_df']
# output_d['prior_df'] = model_cost_d['prior_df']
# output_d['param_tbl'] = param_tbl
# output_d['param_all_tbl'] = param_all_tbl
# output_d['result'] = result
# # output_d['param_d'] = copy.copy(self._param_d)
# # output_d['param0_a'] = param0_a
# # output_d['paramf_a'] = result.x
# # output_d['param0_unscl_a'] = param0_unscale_a
# # output_d['paramf_unscl_a'] = paramf_unscale_a
# return output_d
pass
#===================================================
class Stats:
@classmethod
def logprior_fun(cls, x, kind='studentt', dof=5):
if kind == 'studentt':
# Variance of student's t distribution is slightly larger than a
# normal (depending on dof). Thus the relative residual x must be
# scaled down to match the desired standard deviation.
const = np.sqrt(1.0*dof/(dof-2))
log_prob = stats.t.logpdf(x/const,dof)
        elif (kind == 'normal') | (kind == 'erf'):
            log_prob = stats.norm.logpdf(x)
return log_prob
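    # The rescaling above matches the prior's standard deviation to the target:
    # a Student's t distribution with dof degrees of freedom has variance
    # dof/(dof - 2), so dividing the residual by sqrt(dof/(dof - 2))
    # (for dof=5, sqrt(5/3) ~= 1.29) restores unit standard deviation.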
    @classmethod
    def rxn_trans_fun(cls, x, kind='logistic'):
if kind == 'logistic':
const = 1.8138 # pi/sqrt(3)
# F_a = 1.0/(1+np.exp(-const*x))
# Special optimized version of logistic function
prob = sp.special.expit(const*x)
elif (kind == 'normal') | (kind == 'erf'):
const = 0.70711 # 1/sqrt(2)
prob = 0.5*(1+sp.special.erf(const*x))
return prob
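    # The constants above rescale x so that both transition functions
    # correspond to a unit-variance distribution: a logistic CDF with unit
    # variance is expit(x * pi / sqrt(3)) (pi/sqrt(3) ~= 1.8138), and the
    # standard normal CDF is 0.5 * (1 + erf(x / sqrt(2))) (1/sqrt(2) ~= 0.70711).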
@classmethod
def rxn_logtrans_fun(cls, x, kind='logistic'):
if kind == 'logistic':
const = 1.8138 # pi/sqrt(3)
# Special optimized version of logistic function
log_prob = -np.logaddexp(0,-const*x)
elif (kind == 'normal') | (kind == 'erf'):
const = 0.70711 # 1/sqrt(2)
# Special optimized version of log-cdf for normal distribution
log_prob = sp.special.log_ndtr(x)
return log_prob
@classmethod
def logprob_rxn_dir(cls, rxn_dir, affinity, affinity_err, affinity_thresh,
kind='logistic'):
"""
rxndir = ['FWD', 'REV', 'NC', 'FWD?', 'REV?', 'NC?']
"""
shp = affinity.shape
x_fwd = (affinity-affinity_thresh)/affinity_err
x_rev = -(affinity+affinity_thresh)/affinity_err
ones = np.ones(shp)
zeros = np.zeros(shp)
log_prob = np.zeros(shp)
log_prob[rxn_dir=='FWD'] = cls.rxn_logtrans_fun(
x_fwd[rxn_dir=='FWD'], kind=kind)
log_prob[rxn_dir=='REV'] = cls.rxn_logtrans_fun(
x_rev[rxn_dir=='REV'], kind=kind)
log_prob[rxn_dir=='BIASED'] = 0.0
log_prob[rxn_dir=='FWD?'] = sp.special.logsumexp(
np.vstack((
cls.rxn_logtrans_fun(x_rev[rxn_dir=='FWD?'], kind=kind),
zeros[rxn_dir=='FWD?'])), axis=0,
b=np.vstack((-ones[rxn_dir=='FWD?'], +ones[rxn_dir=='FWD?'])))
log_prob[rxn_dir=='REV?'] = sp.special.logsumexp(
np.vstack((
cls.rxn_logtrans_fun(x_fwd[rxn_dir=='REV?'], kind=kind),
zeros[rxn_dir=='REV?'])), axis=0,
b=np.vstack((-ones[rxn_dir=='REV?'], +ones[rxn_dir=='REV?'])))
        log_prob[rxn_dir=='NC'] = sp.special.logsumexp(
            np.vstack((
                cls.rxn_logtrans_fun(x_fwd[rxn_dir=='NC'], kind=kind),
                cls.rxn_logtrans_fun(x_rev[rxn_dir=='NC'], kind=kind),
                zeros[rxn_dir=='NC'])), axis=0,
            b=np.vstack((-ones[rxn_dir=='NC'], -ones[rxn_dir=='NC'], +ones[rxn_dir=='NC'])))
log_prob[np.isnan(log_prob)] = -np.inf
return log_prob
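    # Illustrative sketch (all numbers are hypothetical and in the same energy
    # units as the affinities):
    #
    #     rxn_dir = np.array(['FWD', 'REV', 'NC', 'FWD?'])
    #     affinity = np.array([500.0, -800.0, 50.0, 200.0])
    #     affinity_err = np.full(4, 300.0)
    #     affinity_thresh = np.zeros(4)
    #     log_prob = Stats.logprob_rxn_dir(rxn_dir, affinity, affinity_err,
    #                                      affinity_thresh, kind='logistic')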
#===================================================
class Database_OLD:
def __init__(self, rxn_data, thermoDB=None, ignore_kinetics=False,
contaminant_phases=None, rxn_trans_typ='logistic',
Tscl_energy=1.0):
if thermoDB is None:
thermoDB = model.Database()
self.rxn_data = rxn_data
self.thermoDB = thermoDB
self.ignore_kinetics = ignore_kinetics
self.contaminant_phases = contaminant_phases
self.rxn_trans_typ = rxn_trans_typ
# self.datadir = DATADIR
self.Escl = 3.0/2*RGAS*Tscl_energy
# self.Gthresh_scl = 3.0/2*Rgas*Tthresh_scl
# self.Gthresh_scl = self.Escl
# dat_df, rxn_d_l, phasesym_l = self.filter_phase_rev_data(
# raw_df, mask_phs_l=mask_phs_l)
# self._dat_trust_d = self.extract_trust_data()
# self.load_exp_prior_data()
self._param_d = {}
        # NOTE: phasesym_l and rxn_d_l come from the commented-out
        # filter_phase_rev_data step above and are currently undefined here.
        self.init_model_phases(phasesym_l)
        self.init_model_rxns(rxn_d_l)
self.init_exp_priors()
self.init_param_scl()
param_error_d = self._param_d.copy()
for key in param_error_d:
param_error_d[key] = np.nan
self._param_error_d = param_error_d
param_name_a = np.array(list(self._param_d.keys()))
self._free_param_a = param_name_a
self.load_exp_prior_data()
# sigG_trust_a = self.propagate_data_errors(param0_unscale_a)
# self._sigG_trust_a = sigG_trust_a
self._sigG_trust_a = None
def load_exp_prior_data( self ):
datadir = self.datadir
filenm = 'ExpPriorData.xlsx'
parentpath = path.dirname(__file__)
pathnm = path.join(parentpath,datadir,filenm)
        exp_prior_data_df = pd.read_excel(pathnm, sheet_name=None)
# Cycle through sheets, each representing a different parameter
all_prior_df = pd.DataFrame()
for paramnm in exp_prior_data_df:
if (paramnm[0]=='<') & (paramnm[-1]=='>'):
# This sheet provides metadata (such as references) rather than
# actual priors
continue
data_df = exp_prior_data_df[paramnm]
param_s = pd.Series(np.tile(paramnm,data_df.shape[0]))
data_df['Param'] = param_s
prior_data_df = data_df[['Phase','Abbrev','Param','Trust',
'Data','Error','Ref']]
all_prior_df = all_prior_df.append(prior_data_df)
# phssym_a = data_df['Abbrev']
# val_a = data_df['Data']
# err_a = data_df['Error']
# trust_a = data_df['Trust']
# refID_a = data_df['RefID']
# for sym in phssym_a:
# prior_d = {'refID':refID_a,'phase_sym':phssym_a,'value':val_a,
# 'error':err_a,'trust':trust_a}
self.all_prior_df = all_prior_df
pass
def init_exp_priors(self):
all_prior_df = self.all_prior_df
exp_prior_df = pd.DataFrame()
for ind,phs in enumerate(self.phs_key):
prior_dat_phs_df = all_prior_df[all_prior_df['Abbrev'] == phs].copy()
param_name_s = pd.Series(prior_dat_phs_df['Param']+str(ind))
prior_dat_phs_df['Param'] = param_name_s
exp_prior_df = exp_prior_df.append(prior_dat_phs_df,ignore_index=True)
# print(prior_dat_phs_df)
self.exp_prior_df = exp_prior_df
pass
def init_param_scl(self):
param_name_a = np.array(list(self._param_d.keys()))
# Define the scale for each type of parameter
S0_param_keys_a = np.sort(param_name_a[np.char.startswith(param_name_a,'S0_phs')])
V0_param_keys_a = np.sort(param_name_a[np.char.startswith(param_name_a,'V0_phs')])
dH0_param_keys_a = np.sort(param_name_a[np.char.startswith(param_name_a,'dH0_phs')])
S0_scl_atom = np.mean(self.get_param_values(S0_param_keys_a)/self.Natom_a)
V0_scl_atom = np.mean(self.get_param_values(V0_param_keys_a)/self.Natom_a)
# dH0_a = self.get_param_values(dH0_param_keys_a)/self.Natom_a
# dH0_scl_atom = 3./2*Rgas*1e1
dH0_scl_atom = self.Escl
# alpha_T_scl = 1.0/1000 # 1/K
alpha_T_scl = 1.0/1.0 # 1/K
alpha_t_scl = 1.0
alpha_H2O_scl = 1.0
alpha_0_scl = 1.0
param_scl_d = {}
for ind,phs in enumerate(self.phase_l):
Natom=phs.props_d['Natom']
param_scl_d['S0_phs'+str(ind)] = S0_scl_atom*Natom
param_scl_d['V0_phs'+str(ind)] = V0_scl_atom*Natom
param_scl_d['dH0_phs'+str(ind)] = dH0_scl_atom*Natom
param_scl_d['alpha_t_rxn_all'] =alpha_t_scl
param_scl_d['alpha_T_rxn_all'] =alpha_T_scl
param_scl_d['alpha_H2O_rxn_all'] =alpha_H2O_scl
# logGth_rxn = log(3/2*Rgas*1) + alpha_i*X_i
rxn_l = self.rxn_l
for ind,rxn in enumerate(rxn_l):
# self._param_d['logGth0_rxn'+str(ind)] = -
param_scl_d['alpha_0_rxn'+str(ind)] = alpha_0_scl
param_scl_d['dalpha_t_rxn'+str(ind)] = alpha_t_scl
param_scl_d['dalpha_T_rxn'+str(ind)] = alpha_T_scl
param_scl_d['dalpha_H2O_rxn'+str(ind)] = alpha_H2O_scl
self.param_scl_d = param_scl_d
pass
def init_model_phases( self, phasesym_l ):
phase_l = [ self.thermoDB.new_phase(phasesym) for phasesym in phasesym_l]
Natom_a = [phs.props_d['Natom'] for phs in phase_l]
self._phasesym_l = phasesym_l
self.phase_l = phase_l
self.Natom_a = Natom_a
self.init_std_state_params()
def init_std_state_params(self):
phase_l = self.phase_l
phasesym_l = []
for ind,phs in enumerate(phase_l):
iparam_a = phs.get_param_values( param_names=['S','V','delta H'] )
phasesym_l.append(phs.props_d['abbrev'])
self._param_d['S0_phs'+str(ind)] = iparam_a[0]
self._param_d['V0_phs'+str(ind)] = iparam_a[1]
self._param_d['dH0_phs'+str(ind)] = iparam_a[2]
pass
def init_model_rxns(self, rxn_d_l):
rxn_obj_l = []
rxn_eqn_l = []
for rxn_d in rxn_d_l:
rxn_obj = self.thermoDB.new_rxn( rxn_d['reac_l'], rxn_d['prod_l'] )
rxn_obj_l.append(rxn_obj)
rxn_eqn_l.append(rxn_d['rxn_eqn'])
self.rxn_l = rxn_obj_l
self._rxn_eqn_l = rxn_eqn_l
self.init_rxn_params()
def init_rxn_params(self,dTthresh0=1.0):
self._param_d['alpha_t_rxn_all'] = 0.0
self._param_d['alpha_T_rxn_all'] = 0.0
self._param_d['alpha_H2O_rxn_all'] = 0.0
# logGth_rxn = log(3/2*Rgas*1) + alpha_i*X_i
rxn_l = self.rxn_l
for ind,rxn in enumerate(rxn_l):
# self._param_d['logGth0_rxn'+str(ind)] = -
self._param_d['alpha_0_rxn'+str(ind)] = 0.0
self._param_d['dalpha_t_rxn'+str(ind)] = 0.0
self._param_d['dalpha_T_rxn'+str(ind)] = 0.0
self._param_d['dalpha_H2O_rxn'+str(ind)] = 0.0
pass
@property
def param_d(self):
return copy.copy(self._param_d)
@property
def rxn_key(self):
return pd.Series(self._rxn_eqn_l)
@property
def phs_key(self):
return pd.Series(self._phasesym_l)
#########
def get_param_names(self, typ='all'):
# typ = ['all','free','rxn','phs','rxnadj','rxnall']
param_names_a = np.array(list(self._param_d.keys()))
if typ is not None:
if typ == 'all':
param_names_typ_a = param_names_a
elif typ == 'free':
param_names_typ_a = self._free_param_a
elif typ == 'rxn':
param_names_typ_a = \
param_names_a[np.char.rfind(param_names_a,'_rxn')>=0]
elif typ == 'rxnadj':
param_names_typ_a = \
param_names_a[np.char.startswith(param_names_a,'dalpha')]
elif typ == 'rxnall':
param_names_typ_a = \
param_names_a[np.char.rfind(param_names_a,'_rxn_all')>=0]
elif typ == 'phs':
param_names_typ_a = \
param_names_a[np.char.rfind(param_names_a,'_phs')>=0]
else:
assert False, typ+' is not a valid param typ for get_param_names.'
# Finally, sort params into sensible order
# msk_phs_a = np.char.find(param_names_typ_a,'_phs')>=0
# msk_rxn_all_a = np.char.find(param_names_typ_a,'_rxn_all')>=0
# msk_rxn_a = np.char.find(param_names_typ_a,'_rxn')>=0
msk_phs_a = np.array([istr.find('_phs')>=0 for istr in param_names_typ_a])
msk_rxn_all_a = np.array([istr.find('_rxn_all')>=0 for istr in param_names_typ_a])
msk_rxn_a = np.array([istr.find('_rxn')>=0 for istr in param_names_typ_a])
msk_rxn_a = msk_rxn_a*~msk_rxn_all_a
msk_other_a = ~np.any((msk_phs_a,msk_rxn_all_a,msk_rxn_a),axis=0)
param_names_sort_a = []
if(np.any(msk_phs_a)):
param_names_sort_a.extend(np.sort(param_names_typ_a[msk_phs_a]))
if(np.any(msk_rxn_all_a)):
param_names_sort_a.extend(np.sort(param_names_typ_a[msk_rxn_all_a]))
if(np.any(msk_rxn_a)):
param_names_sort_a.extend(np.sort(param_names_typ_a[msk_rxn_a]))
if(np.any(msk_other_a)):
param_names_sort_a.extend(np.sort(param_names_typ_a[msk_other_a]))
param_names_sort_a = np.array(param_names_sort_a)
return param_names_sort_a
def get_param_values(self, param_key_a, typ=None):
if typ is not None:
param_key_a = self.get_param_names(typ=typ)
param_a = []
for key in param_key_a:
param_a.append(self._param_d[key])
return np.array(param_a)
def get_param_errors(self, param_key_a, typ=None):
if typ is not None:
param_key_a = self.get_param_names(typ=typ)
error_a = []
for key in param_key_a:
error_a.append(self._param_error_d[key])
return np.array(error_a)
def get_param_scl_values(self, param_key_a, typ=None):
if typ is not None:
param_key_a = self.get_param_names(typ=typ)
param_scl_a = []
for key in param_key_a:
param_scl_a.append(self.param_scl_d[key])
return np.array(param_scl_a)
def get_param_table(self, param_nm_a=None, typ='all'):
if param_nm_a is None:
param_nm_a = self.get_param_names(typ=typ)
param_val_a = self.get_param_values(param_nm_a)
param_scl_a = self.get_param_scl_values(param_nm_a)
scaled_param_a = self.unscale_params(param_val_a, param_nm_a)
err_a = self.get_param_errors(param_nm_a)
param_tbl_d = {'name':param_nm_a,
'value':param_val_a,
'error':err_a,
'scale':param_scl_a,
'scaled value':scaled_param_a}
param_tbl_df = pd.DataFrame(param_tbl_d,columns=['name','value','error',
'scale','scaled value'])
return param_tbl_df
def set_param_values(self, param_val_a, param_key_a ):
# print(param_val_a)
for key, val in zip(param_key_a, param_val_a):
self._param_d[key] = val
if key.rfind('phs') >=0:
self.set_phaseobj_param(val,key)
pass
def set_phaseobj_param(self, param_val, param_key):
phsid = 'phs'
loc = param_key.rfind(phsid)
phs_ind = int(param_key[loc+len(phsid):])
iphs = self.phase_l[phs_ind]
# print(param_key)
# print(param_val)
if param_key.startswith('S0'):
iphs.set_param_values(param_names=['S'],param_values=[param_val])
# print(iphs.get_param_values(param_names=['S']))
elif param_key.startswith('V0'):
iphs.set_param_values(param_names=['V'],param_values=[param_val])
# print(iphs.get_param_values(param_names=['V']))
elif param_key.startswith('dH0'):
initval= iphs.get_param_values(param_names=['delta H'])
initgibbs = iphs.get_gibbs_energy(300,1)
iphs.set_param_values(param_names=['delta H'],param_values=[param_val])
setval = iphs.get_param_values(param_names=['delta H'])
setgibbs = iphs.get_gibbs_energy(300,1)
# print(initval,setval,setval-initval)
# print('%%%%%')
# print(initgibbs,setgibbs,setgibbs-initgibbs)
# print('============')
else:
assert False, param_key+' is not a valid phase parameter name.'
pass
def add_free_params(self, new_free_param_a, typ=None):
if typ is not None:
new_free_param_a = self.get_param_names(typ=typ)
free_param_a = np.hstack((self._free_param_a,new_free_param_a))
self._free_param_a = np.unique(free_param_a)
pass
def del_free_params(self, fix_param_a, typ=None ):
if typ is not None:
fix_param_a = self.get_param_names(typ=typ)
free_param_a = self._free_param_a
self._free_param_a = np.setdiff1d(free_param_a, fix_param_a)
pass
#########
def extract_trust_data(self):
"""
Convert units
"""
data_df = self.data_df
trust_msk = data_df['Trust']=='Yes'
# print(data_df[trust_msk])
# test_df = data_df[trust_msk].set_index('Rxn')
# print(test_df)
# NEED TO CAST as FLOAT!!
        num_dat_df = data_df[['P','P_err','T','T_err',
                              'equil_time']][trust_msk].astype(float)
P_a = 1e3*num_dat_df['P'].values
Perr_a = 1e3*num_dat_df['P_err'].values
T_a = 273.15+num_dat_df['T'].values
Terr_a = num_dat_df['T_err'].values
        # time_a = np.log10(num_dat_df['equil_time'].values)  # log-time option
        time_a = num_dat_df['equil_time'].values  # NOTE: using raw (linear) time
pubid_a = data_df['PubID'][trust_msk].values
run_num_a = data_df['Run Num.'][trust_msk].values
rxn_dir_a = data_df['rxn_dir'][trust_msk].values
rxn_a = data_df['rxn_studied'][trust_msk].values
# NOTE: need to fix Water flag to flux_amt
water_a = data_df['Water'][trust_msk].values
water_flag_a = 1.0*np.ones(water_a.shape)
water_flag_a[water_a=='dry'] = 0.0
water_flag_a[water_a=='trace'] = 0.0
data_trust_d = {}
data_trust_d['pubid'] = pubid_a
data_trust_d['run_num'] = run_num_a
data_trust_d['P'] = P_a
data_trust_d['Perr'] = Perr_a
data_trust_d['T'] = T_a
data_trust_d['Terr'] = Terr_a
data_trust_d['time'] = time_a
data_trust_d['rxn_dir'] = rxn_dir_a
data_trust_d['rxn'] = rxn_a
data_trust_d['water'] = water_flag_a
return data_trust_d
def fit_model(self, free_params=None, full_output=False,
method='Nelder-Mead'):
if free_params is None:
free_params = self.get_param_names(typ='free')
# if not np.any([startswith(fix_param,'dH0') for fix_param in fix_param_a]):
# assert False, 'User MUST fix some of the Std. State enthalpy params, dH0_X, relative to the elements.'
# Extract only trustworthy data
self._dat_trust_d = self.extract_trust_data()
param0_unscale_a = self.get_param_values(free_params)
param0_a = self.unscale_params(param0_unscale_a, free_params)
param0_tbl = self.get_param_table(param_nm_a=free_params)
# Precalculate approx Gibbs energy uncertainties
sigG_trust_a = self.propagate_data_errors(param0_unscale_a,
free_params=free_params)
self._sigG_trust_a = sigG_trust_a
model_cost0_d = self.eval_model_costfun(param0_unscale_a,
free_params=free_params,
full_output=True)
costfun = lambda param_a: self.eval_model_costfun_scl(param_a,
free_params=free_params)
lnprob_f = lambda param_a: -costfun(param_a)
# costfun = self.eval_model_costfun_scl( param_free_vals_scl_a,
# free_params=free_params )
# method = 'Nelder-Mead'
# method = 'BFGS'
result = optim.minimize(costfun,param0_a,method=method,
options={'disp':True,'maxiter':1e4})
def shift_one_param(shift,ind,mu_a=result.x,costfun=costfun):
param_a = np.copy(mu_a)
param_a[ind] += shift
return costfun(param_a)
# Create re-scaled-shifted function for hessian
mu_a = result.x
cost0 = costfun(mu_a)
delx_param_scl = np.zeros(mu_a.shape)
dcost_target=1
for ind,param in enumerate(mu_a):
del0 = 1e-2
idelcostfun = lambda dx, ind=ind,target=dcost_target: \
shift_one_param(dx,ind)-cost0-dcost_target
delx = optim.fsolve(idelcostfun,del0)
delx_param_scl[ind] = np.abs(delx)
norm_costfun = lambda dx_a, shift_scl=delx_param_scl,\
mu_a=mu_a,costfun=costfun: costfun(dx_a*shift_scl+mu_a)
curv_scl_a = delx_param_scl*self.get_param_scl_values(free_params)
scl_mat_a = core.make_scale_matrix(curv_scl_a)
Hnorm_fun = nd.Hessian(norm_costfun,step = 1e-2)
Hnorm_a = Hnorm_fun(np.zeros(mu_a.shape))
covnorm_a = np.linalg.pinv(Hnorm_a)
cov_a = covnorm_a*scl_mat_a
try:
err_a = np.sqrt(np.diag(cov_a))
# print(cov_a)
err_scl_a = core.make_scale_matrix(err_a)
corr_a = cov_a/err_scl_a
except:
err_a = None
corr_a = None
# MCMC
# ndim = len(free_params)
# nwalkers = 10*ndim
# walker_pos0_a = [result['x'] + 1e-4*np.random.randn(ndim)
# for i in range(nwalkers)]
# sampler = emcee.EnsembleSampler(nwalkers, ndim,lnprob_f)
# sampler.run_mcmc(walker_pos0_a,10)
# from IPython import embed; embed(); import ipdb; ipdb.set_trace()
paramf_unscale_a = self.scale_params(result.x, free_params)
self.set_param_values(paramf_unscale_a,free_params)
self._fit_result_d = result
if full_output:
output_d = {}
output_d['err_a'] = err_a
output_d['corr_a'] = corr_a
self._mu_a = paramf_unscale_a
self._err_a = err_a
self._corr_a = corr_a
for key in self._param_error_d:
self._param_error_d[key] = np.nan
for key,err_val in zip(free_params,err_a):
self._param_error_d[key] = err_val
model_cost_d = self.eval_model_costfun(paramf_unscale_a,
free_params=free_params,
full_output=True)
param_tbl = self.get_param_table(param_nm_a=free_params)
param_all_tbl = self.get_param_table(typ='all')
output_d['free_params'] = free_params
output_d['costval0'] = model_cost0_d['cost_val']
output_d['costdata0_df'] = model_cost0_d['cost_data_df']
output_d['param0_tbl'] = param0_tbl
output_d['costval'] = model_cost_d['cost_val']
output_d['costdata_df'] = model_cost_d['cost_data_df']
output_d['prior_df'] = model_cost_d['prior_df']
output_d['param_tbl'] = param_tbl
output_d['param_all_tbl'] = param_all_tbl
output_d['result'] = result
# output_d['param_d'] = copy.copy(self._param_d)
# output_d['param0_a'] = param0_a
# output_d['paramf_a'] = result.x
# output_d['param0_unscl_a'] = param0_unscale_a
# output_d['paramf_unscl_a'] = paramf_unscale_a
return output_d
pass
def posterior_draw(self, Ndraw=1):
param_free = self.get_param_names(typ='free')
mu_a = self._mu_a
err_a = self._err_a
corr_a = self._corr_a
err_scl_mat_a = core.make_scale_matrix(err_a)
cov_a = err_scl_mat_a*corr_a
pdraw_a = np.squeeze(random.multivariate_normal(mu_a, cov_a, Ndraw))
return pdraw_a
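    # The draw above rebuilds the covariance matrix from the stored 1-sigma
    # errors and correlation matrix (a sketch of the relationship used):
    #
    #     cov[i, j] = err[i] * err[j] * corr[i, j]
    #
    # which is what core.make_scale_matrix(err_a) * corr_a evaluates to
    # element-wise, assuming make_scale_matrix returns the outer product of its
    # argument with itself.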
def posterior_rxn_bound(self, Tlims, conf_lvl=0.68, Ndraw=100,
Nsamp=30, sampfac=3, convert_units=True):
Nrxn = len(self.rxn_key)
        Nsamptot = int(np.round(Nsamp*sampfac))
# T_bound_draw_a = np.zeros((Nrxn,Ndraw,Nsamptot))
P_bound_draw_a = np.zeros((Nrxn,Ndraw,Nsamptot))
T_a = np.linspace(Tlims[0],Tlims[1],Nsamptot)
free_param_nm_a= self.get_param_names(typ='free')
PT_triple_draw_a = np.zeros((2,Ndraw))
for ind in range(Ndraw):
pdraw_a = self.posterior_draw()
self.set_param_values(pdraw_a,free_param_nm_a)
for irxn,(rxn_eqn,rxn_obj) in enumerate(zip(self.rxn_key,self.rxn_l)):
iTP_bound_a = rxn_obj.trace_rxn_bound(Tlims=Tlims,Nsamp=Nsamp)
fun = interp.interp1d(iTP_bound_a[0],iTP_bound_a[1])
Pbnd_a = fun(T_a)
# T_bound_draw_a[irxn,ind,:] = T_a
P_bound_draw_a[irxn,ind,:] = Pbnd_a
T_tp,P_tp = self.rxn_l[0].get_simultaneous_rxn_cond(self.rxn_l[1])
PT_triple_draw_a[0,ind] = T_tp
PT_triple_draw_a[1,ind] = P_tp
if convert_units:
T_a -= 273.15
P_bound_draw_a/=1e3
PT_triple_draw_a[0] -= 273.15
PT_triple_draw_a[1] /= 1e3
posterior_rxn_d = {}
posterior_rxn_d['T_a'] = T_a
posterior_rxn_d['P_bound_draw_a'] = P_bound_draw_a
posterior_rxn_d['PT_triple_draw_a'] = PT_triple_draw_a
self.calc_rxn_bound_conf_lvl(posterior_rxn_d,conf_lvl=conf_lvl)
return posterior_rxn_d
def calc_rxn_bound_conf_lvl(self,posterior_rxn_d, conf_lvl=0.68):
T_a = posterior_rxn_d['T_a']
P_bound_draw_a = posterior_rxn_d['P_bound_draw_a']
PT_triple_draw_a = posterior_rxn_d['PT_triple_draw_a']
rxn_conf_bnd_a=np.percentile(P_bound_draw_a,
[50-0.5*100*conf_lvl,50+0.5*100*conf_lvl],
axis=1)
PT_triple_mean_a = np.mean(PT_triple_draw_a,axis=1)
PT_triple_cov_a = np.cov(PT_triple_draw_a)
posterior_rxn_d['rxn_conf_bnd_a'] = rxn_conf_bnd_a
posterior_rxn_d['PT_triple_mean_a'] = PT_triple_mean_a
posterior_rxn_d['PT_triple_cov_a'] = PT_triple_cov_a
pass
def scale_params(self, param_vals_scl_a, param_names_a):
param_scl_a = self.get_param_scl_values(param_names_a)
param_vals_a = param_vals_scl_a*param_scl_a
return param_vals_a
def unscale_params(self, param_vals_a, param_names_a):
param_scl_a = self.get_param_scl_values(param_names_a)
param_vals_scl_a = param_vals_a/param_scl_a
return param_vals_scl_a
def eval_model_costfun_scl(self, param_free_vals_scl_a, free_params=None):
if free_params is None:
free_params = self.get_param_names(typ='free')
# param_free_scl_a = thermoDB_mod.get_param_scl_values(free_params)
# param_free_vals_a = param_free_vals_scl_a*param_free_scl_a
param_free_vals_a = self.scale_params(param_free_vals_scl_a,free_params)
cost_fun = self.eval_model_costfun(param_free_vals_a,
free_params=free_params)
return cost_fun
def propagate_data_errors(self, param_free_vals_a, free_params=None):
if free_params is None:
free_params = self.get_param_names(typ='free')
if param_free_vals_a is None:
param_free_vals_a = self.get_param_values(free_params)
# Extract only trustworthy data
# if self._dat_trust_d is None:
# self._dat_trust_d = self.extract_trust_data()
self.set_param_values(param_free_vals_a,free_params)
# self.set_free_param_values(param_free_vals_a)
data_df = self.data_df
rxn_eqn_l = self._rxn_eqn_l
rxn_l = self.rxn_l
param_d = self._param_d
dat_d = self._dat_trust_d
# dat_d = self.extract_trust_data()
# print(P_a)
# print(P_a.reset_index())
Ndat = dat_d['P'].size
dVrxn_a = np.zeros(Ndat)
dSrxn_a = np.zeros(Ndat)
for ind,(rxn_eqn, rxn_obj) in enumerate(zip(rxn_eqn_l,rxn_l)):
msk_rxn = dat_d['rxn']==rxn_eqn
idVrxn_a = rxn_obj.get_rxn_volume(dat_d['T'][msk_rxn],
dat_d['P'][msk_rxn],
peratom=True )
idSrxn_a = rxn_obj.get_rxn_entropy(dat_d['T'][msk_rxn],
dat_d['P'][msk_rxn],
peratom=True )
dVrxn_a[msk_rxn] = idVrxn_a
dSrxn_a[msk_rxn] = idSrxn_a
sigG_trust_a = np.sqrt((dat_d['Perr']*dVrxn_a)**2+(dat_d['Terr']*dSrxn_a)**2)
return sigG_trust_a
def eval_model_costfun(self, param_free_vals_a, free_params=None,
full_output=False):
if free_params is None:
free_params = self.get_param_names(typ='free')
# Extract only trustworthy data
# if self._dat_trust_d is None:
# self._dat_trust_d = self.extract_trust_data()
self.set_param_values(param_free_vals_a,free_params)
# self.set_free_param_values(param_free_vals_a)
# Try to use precalculated approx Gibbs energy uncertainties
sigG_a = self._sigG_trust_a
if sigG_a is None:
sigG_a = self.propagate_data_errors(param_free_vals_a,
free_params=free_params)
self._sigG_trust_a = sigG_a
Gthresh_scl = self.Gthresh_scl
data_df = self.data_df
rxn_eqn_l = self._rxn_eqn_l
rxn_l = self.rxn_l
param_d = self._param_d
dat_d = self._dat_trust_d
# dat_d = self.extract_trust_data()
# print(P_a)
# print(P_a.reset_index())
Ndat = dat_d['P'].size
dGrxn_a = np.zeros(Ndat)
logGth_a = np.zeros(Ndat)
# dVrxn_a = np.zeros(Ndat)
# dSrxn_a = np.zeros(Ndat)
# get universal reaction parameters
alpha_t_all = param_d['alpha_t_rxn_all']
alpha_T_all = param_d['alpha_T_rxn_all']
alpha_H2O_all = param_d['alpha_H2O_rxn_all']
for ind,(rxn_eqn, rxn_obj) in enumerate(zip(rxn_eqn_l,rxn_l)):
msk_rxn = dat_d['rxn']==rxn_eqn
idGrxn_a = rxn_obj.get_rxn_gibbs_energy(dat_d['T'][msk_rxn],
dat_d['P'][msk_rxn],
peratom=True )
# idVrxn_a = rxn_obj.get_rxn_volume(dat_d['T'][msk_rxn],
# dat_d['P'][msk_rxn],
# peratom=True )
# idSrxn_a = rxn_obj.get_rxn_entropy(dat_d['T'][msk_rxn],
# dat_d['P'][msk_rxn],
# peratom=True )
# get reaction-specific parameters
alpha_0_rxn = param_d['alpha_0_rxn'+str(ind)]
dalpha_t_rxn = param_d['dalpha_t_rxn'+str(ind)]
dalpha_T_rxn = param_d['dalpha_T_rxn'+str(ind)]
dalpha_H2O_rxn = param_d['dalpha_H2O_rxn'+str(ind)]
logGth_a[msk_rxn] = alpha_0_rxn \
+ (alpha_t_all+dalpha_t_rxn)*dat_d['time'][msk_rxn] \
+ (alpha_T_all+dalpha_T_rxn)*dat_d['T'][msk_rxn] \
+ (alpha_H2O_all+dalpha_H2O_rxn)*dat_d['water'][msk_rxn]
dGrxn_a[msk_rxn] = idGrxn_a
# dVrxn_a[msk_rxn] = idVrxn_a
# dSrxn_a[msk_rxn] = idSrxn_a
Gth_a = Gthresh_scl*np.exp(logGth_a)
# sigG_a = np.sqrt((dat_d['Perr']*dVrxn_a)**2+(dat_d['Terr']*dSrxn_a)**2)
loglk_a = np.zeros(Gth_a.size)
for irxndir in self.rxn_dir_opt:
msk_rxndir = dat_d['rxn_dir']==irxndir
iNobs = np.sum(msk_rxndir==True)
if iNobs == 0:
continue
if irxndir=='FWD':
iloglk_a = self.logprob_fwd(dGrxn_a[msk_rxndir],
Gth_a[msk_rxndir],
sigG_a[msk_rxndir],
rxn_trans_typ=self.rxn_trans_typ)
elif irxndir=='REV':
iloglk_a = self.logprob_rev(dGrxn_a[msk_rxndir],
Gth_a[msk_rxndir],
sigG_a[msk_rxndir],
rxn_trans_typ=self.rxn_trans_typ)
elif irxndir=='FWD?':
iloglk_a = self.logprob_fwdq(dGrxn_a[msk_rxndir],
Gth_a[msk_rxndir],
sigG_a[msk_rxndir],
rxn_trans_typ=self.rxn_trans_typ)
            elif irxndir=='REV?':
iloglk_a = self.logprob_revq(dGrxn_a[msk_rxndir],
Gth_a[msk_rxndir],
sigG_a[msk_rxndir],
rxn_trans_typ=self.rxn_trans_typ)
elif irxndir=='NC':
iloglk_a = self.logprob_nc(dGrxn_a[msk_rxndir],
Gth_a[msk_rxndir],
sigG_a[msk_rxndir],
rxn_trans_typ=self.rxn_trans_typ)
loglk_a[msk_rxndir] = iloglk_a
msk_zeroprob_a = np.isinf(loglk_a) & (loglk_a<0)
logprior_a = self.eval_log_prior()
# logprob_a[msk_zeroprob_a] = -160
cost_val_a = np.hstack((-loglk_a,-logprior_a))
cost_val = np.sum(cost_val_a)
if full_output:
model_cost_d = {}
model_cost_d['cost_val'] = cost_val
logprior_a,prior_df = self.eval_log_prior(full_output=True)
cost_data_df = pd.DataFrame(dat_d)
cost_data_df['zeroprob'] = pd.Series(msk_zeroprob_a)
cost_data_df['log_lk'] = pd.Series(loglk_a)
# cost_data_df['PubID'] = pubid_a
# cost_data_df['run_num'] = run_num_a
# cost_data_df['cost_fun'] = cost_fun_a
# cost_data_df['rxn_dir'] = rxn_dir_a
cost_data_df['Gth'] = pd.Series(Gth_a)
cost_data_df['sigG'] = pd.Series(sigG_a)
cost_data_df['dGrxn'] = pd.Series(dGrxn_a)
cost_data_df['relG'] = pd.Series(dGrxn_a/sigG_a)
model_cost_d['cost_data_df'] = cost_data_df
model_cost_d['log_prior'] = pd.Series(logprior_a)
model_cost_d['prior_df'] = prior_df
return model_cost_d
else:
return cost_val
def eval_log_prior(self,typ='studentt',dof=5,full_output=False):
paramnm_s = self.exp_prior_df['Param']
trust_s = self.exp_prior_df['Trust']
val_data_s = self.exp_prior_df['Data']
err_s = self.exp_prior_df['Error']
log_prior_a = np.zeros(val_data_s.shape[0])
val_model_a = np.zeros(val_data_s.shape[0])
resid_a = np.zeros(val_data_s.shape[0])
for ind,(paramnm,trust,val_data,err) in \
enumerate(zip(paramnm_s,trust_s,val_data_s,err_s)):
val_mod = self.param_d[paramnm]
val_model_a[ind] = val_mod
x = (val_mod-val_data)/err
resid_a[ind] = x
if trust=='Yes':
log_prior_a[ind] = self.logprior_fun(x)
else:
log_prior_a[ind] = 0.0
if full_output:
            # Get new dataframe by copying columns
prior_df = self.exp_prior_df[['Param','Abbrev']].copy()
prior_df['Data'] = self.exp_prior_df['Data']
prior_df['Error'] = self.exp_prior_df['Error']
prior_df['Model'] = pd.Series(val_model_a)
prior_df['Resid'] = pd.Series(resid_a)
prior_df['Trust'] = self.exp_prior_df['Trust']
prior_df['log_prior'] = pd.Series(log_prior_a)
return log_prior_a, prior_df
else:
return log_prior_a
#===================================================
class PhaseStabilityData:
def __init__(self, exp_data, phase_wt_comp, phase_symbol_key, modelDB,
T_units='K', P_units='bars'):
# self._prune_missing_phases(exp_dataphase_wt_comp, modelDB)
self._init_exps(exp_data, phase_wt_comp, phase_symbol_key, modelDB,
T_units, P_units)
def _init_exps(self, exp_data, phs_wt_comp, phase_symbol_key, modelDB,
T_units, P_units):
        phase_symbol_key = chem.LEPR_phase_symbols
        # NOTE: the phase_symbol_key argument is currently overridden here. To
        # make this function general, the symbol key should instead be taken
        # from the caller, together with a comprehensive list of LEPR phase
        # symbols.
phase_stability_exps=[]
exp_index_invalid = []
for exp_idx in exp_data.index:
iP = exp_data.loc[exp_idx, 'P']
iT = exp_data.loc[exp_idx, 'T']
phase_symbols = []
phase_names = []
phase_wt_oxide_comp = []
for key in phs_wt_comp:
iphs_wt_comp = phs_wt_comp[key]
if exp_idx in iphs_wt_comp.index:
phase_names.append(key)
phase_symbols.append(phase_symbol_key[key])
phase_wt_oxide_comp.append(iphs_wt_comp.loc[exp_idx].values)
phase_wt_oxide_comp = np.array(phase_wt_oxide_comp)
            try:
                phase_stability_exp = PhaseStabilityExp(
                    iT, iP, 0, phase_symbols, phase_wt_oxide_comp, modelDB,
                    T_units=T_units, P_units=P_units)
                phase_stability_exps.append(phase_stability_exp)
            except:
                exp_index_invalid.append(exp_idx)
self._exp_index_invalid = exp_index_invalid
self._phase_stability_exps = phase_stability_exps
#phase_stability_info = OrderedDict()
#phase_stability_info['phase_symbols'] = self._phase_symbols = phase_symbols
#phase_stability_info['phases'] = phases
#phase_stability_info['phase_num'] = phase_num
#phase_stability_info['phase_wt_oxide_comp'] = phase_wt_oxide_comp
#phase_stability_info['phase_mol_oxide_comp'] = phase_mol_oxide_comp
#phase_stability_info['phase_mol_endmem_comp'] = phase_mol_endmem_comp
def calc_equil_rxn_affinities(self):
phase_stability_exps = self._phase_stability_exps
affinities = []
for iexp in phase_stability_exps:
iaffinity = iexp.calc_equil_rxn_affinities()
affinities.append(iaffinity)
return affinities
# def _prune_missing_phases(exp_data, phase_wt_comp, modelDB):
# self._modelDB = modelDB
# prune_phases = []
#
# for phase_sym in phase_wt_comp:
# try:
# phase = modelDB.get_phase(phase_sym)
#
# except:
# prune_phases.append(phase_sym)
# for idx in exp_data.index:
#===================================================
class PhaseStabilityExp:
"""
"""
def __init__(self, T, P, wt_oxide_comp,
phase_symbols, phase_wt_oxide_comp, modelDB,
T_err=None, P_err=None, wt_oxide_comp_err=None,
phase_wt_oxide_comp_err=None,
fO2_buffer=None, fO2_offset=0, fO2_err=None,
T_units='K', P_units='bars'):
        self._init_exp_cond(T, P, T_err, P_err,
                            fO2_buffer, fO2_offset, fO2_err, T_units, P_units)
self._init_phase_info(phase_symbols, phase_wt_oxide_comp, modelDB)
self._init_equil_rxns()
# calculate rxns with svd
#self._init_absent_rxns()
def _init_exp_cond(self, T, P, T_err, P_err,
fO2_buffer, fO2_offset, fO2_err, T_units, P_units):
        if T_units == 'C':
            T = T + 273.15
        if P_units == 'GPa':
            P = P * 10000
            if P_err is not None:
                P_err = P_err * 10000
self._P = P
self._T = T
self._P_err = P_err
self._T_err = T_err
self._fO2_buffer = fO2_buffer
self._fO2_offset = fO2_offset
self._fO2_err = fO2_err
def _init_phase_info(self, phase_symbols, phase_wt_oxide_comp, modelDB):
phase_wt_oxide_comp = np.array(phase_wt_oxide_comp)
phase_num = len(phase_symbols)
oxide_num = len(chem.OXIDE_ORDER)
assert phase_wt_oxide_comp.shape[0]==phase_num, (
'phase_wt_oxide_comp must define compositions '
'for every phase in phase_symbols.'
)
assert phase_wt_oxide_comp.shape[1]==oxide_num, (
'phase_wt_oxide_comp must define composition for every oxide '
'in standard order.'
)
phases = [modelDB.get_phase(phs_sym) for phs_sym in phase_symbols]
phase_mol_oxide_comp = chem.wt_to_mol_oxide(phase_wt_oxide_comp)
phase_mol_endmem_comp = {}
for phs_sym, phase, mol_oxide_comp in zip(
phase_symbols, phases, phase_mol_oxide_comp):
if phs_sym not in modelDB.phase_obj['pure']:
endmem_comp = phase.calc_endmember_comp(
mol_oxide_comp, method='intrinsic', output_residual=False,
normalize=True)
else:
continue
phase_mol_endmem_comp[phs_sym] = np.array(endmem_comp)
self._modelDB = modelDB
self._phase_symbols = phase_symbols
self._phases = phases
self._phase_num = phase_num
self._phase_wt_oxide_comp = phase_wt_oxide_comp
self._phase_mol_oxide_comp = phase_mol_oxide_comp
self._phase_mol_endmem_comp = phase_mol_endmem_comp
def _init_equil_rxns(self):
phases = self._phases
phase_symbols = self._phase_symbols
modelDB = self._modelDB
rxn_svd_props = chem.calc_reaction_svd(phase_symbols, TOLsvd=1e-4, modelDB=modelDB)
equil_rxn_coefs = rxn_svd_props['rxn_svd']
Nbasis=len(equil_rxn_coefs)
rxn_endmember_name = rxn_svd_props['all_endmember_name']
rxn_phase_symbols = rxn_svd_props['all_phase_symbol']
endmember_ids = rxn_svd_props['all_endmember_id']
equil_rxns = []
for irxn_coefs in equil_rxn_coefs:
irxn = modelDB.get_rxn(rxn_phase_symbols, endmember_ids, irxn_coefs, coefs_per_atom=True)
equil_rxns.append(irxn)
equil_rxn_num = len(equil_rxns)
all_endmem_names = rxn_endmember_name
self._all_endmem_names = all_endmem_names
self._equil_rxn_num = equil_rxn_num
self._equil_rxns = equil_rxns
self._equil_rxn_coefs = equil_rxn_coefs
def calc_equil_rxn_affinities(self):
phases = self._phases
equil_rxns = self._equil_rxns
equil_rxn_num = self._equil_rxn_num
T = self._T
P = self._P
phase_mol_endmem_comp = self._phase_mol_endmem_comp
# print(T)
# print(P)
# print(phase_mol_endmem_comp)
rxn_affinities = np.zeros(equil_rxn_num)
for ind, rxn in enumerate(equil_rxns):
rxn_affinities[ind] = rxn.affinity(T, P, mols=phase_mol_endmem_comp)
return rxn_affinities
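    # Illustrative sketch (experiment inputs are hypothetical; oxide wt% arrays
    # must follow chem.OXIDE_ORDER):
    #
    #     exp = PhaseStabilityExp(1100.0, 10000.0, 0, phase_symbols,
    #                             phase_wt_oxide_comp, modelDB,
    #                             T_units='K', P_units='bars')
    #     affinities = exp.calc_equil_rxn_affinities()
    #     # one affinity per linearly independent equilibrium reaction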
    def calc_absent_rxn_affinities(self):
        raise NotImplementedError()
#===================================================
class RxnData:
"""
TODO:
- filter trusted data
- get_data() - return only trusted info?
"""
RESULT_OPTS = ['+','-','=','+?','-?']
ERROR_DEFAULT = {'P':5, 'T':10, 'mol':1}
ERROR_TYPE = {'P':'%', 'T':'absolute', 'mol':'%'}
def __init__(self, input_data_sheets, error_default=None):
self._init_data_tables()
self.load_data(input_data_sheets)
def _init_data_tables(self):
self.reference = self._init_reference_data()
self.setup = self._init_setup_data()
self.conditions = self._init_conditions_data()
self.rxn = self._init_rxn_data()
self.comp = self._init_comp_data()
pass
def _init_reference_data(self):
reference = pd.DataFrame(columns=[
'pub_id', 'authors', 'date', 'title'])
return reference
def _init_setup_data(self):
setup = pd.DataFrame(columns=[
'pub_id','run_id',
'total_mass', 'contaminant_phases', 'contact_geom',
'container_aspect_ratio', 'grain_size', 'single_xtal_size',
'other_phase_frac', 'flux_type', 'flux_amt',
'init_reac_present', 'init_prod_present'])
return setup
def _init_conditions_data(self):
conditions = pd.DataFrame(columns=[
'P', 'P_err', 'T', 'T_err', 'equil_time', 'trust_conditions'])
return conditions
def _init_rxn_data(self):
rxn = pd.DataFrame(columns=[
'rxn_id', 'rxn_studied', 'init_rxn_progress', 'results', 'rxn_dir'])
return rxn
def _init_comp_data(self):
comp = OrderedDict()
return comp
def load_data(self, input_data):
self.input_data = input_data
self._load_reference_data(input_data)
self._load_setup_data(input_data)
self._load_conditions_data(input_data)
self._load_rxn_data(input_data)
pass
def _load_reference_data(self, input_data):
reference = self.reference
reference_cols = self.reference.columns
ref_data, ref_units = self._read_data_sheet(
input_data['reference'][reference_cols])
self.reference = reference.append(ref_data)[reference_cols]
pass
def _load_setup_data(self, input_data):
setup = self.setup
setup_cols = setup.columns
exp_conditions_cols = ['pub_id','run_id',
'container_aspect_ratio']
rxn_cols = ['contact_geom', 'total_mass',
'other_phase_frac', 'contaminant_phases',
'grain_size', 'single_xtal_size', 'flux_type', 'flux_amt',
'init_reac_present', 'init_prod_present']
isetup = pd.concat([
input_data['exp_conditions'][exp_conditions_cols],
input_data['rxn'][rxn_cols]], axis=1)
isetup_data, isetup_units = self._read_data_sheet(isetup)
self.setup = setup.append(isetup_data)[setup_cols]
pass
def _load_conditions_data(self, input_data):
conditions = self.conditions
conditions_cols = conditions.columns
iconditions_data, iconditions_units = self._read_data_sheet(
input_data['exp_conditions'][conditions_cols])
# Convert C to K
if iconditions_units['T'] == 'C':
iconditions_data['T'] += 273.15
if iconditions_units['P'] == 'kbar':
iconditions_data['P'] *= 1e3
iconditions_data['P_err'] *= 1e3
iconditions_data['trust_conditions'].fillna(value='Yes', inplace=True)
iconditions_data = self._set_default_error(iconditions_data)
self.conditions = conditions.append(iconditions_data)
pass
def _set_default_error(self, conditions_data):
ERROR_DEFAULT = self.ERROR_DEFAULT
ERROR_TYPE = self.ERROR_TYPE
for key in ['T', 'P']:
default_val = ERROR_DEFAULT[key]
default_typ = ERROR_TYPE[key]
# from IPython import embed; embed(); import ipdb; ipdb.set_trace()
mask = conditions_data[key+'_err'].isnull()
# print(mask)
if default_typ=='absolute':
ierr = default_val
elif default_typ=='%':
ival = conditions_data.loc[mask, key]
ival = np.maximum(ival, 1.0)
ierr = np.abs(ival*default_val/100)
conditions_data.loc[mask, key+'_err'] = ierr
return conditions_data
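    # Example of the defaults applied above (a sketch; the numbers follow
    # ERROR_DEFAULT / ERROR_TYPE): a missing T_err becomes 10 (absolute, K),
    # while a missing P_err becomes 5% of P, e.g.
    #
    #     P = 20000.0 bars  ->  P_err = 0.05 * 20000.0 = 1000.0 bars
    #
    # with P floored at 1.0 before taking the percentage.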
def _load_rxn_data(self, input_data):
rxn = self.rxn
rxn_cols = rxn.columns
rxn_input_cols = ['rxn_studied', 'init_rxn_progress', 'results']
rxn_input_data, rxn_input_units = self._read_data_sheet(
input_data['rxn'][rxn_input_cols])
# Redetermine rxn directions by combining ALL rxns reported
all_rxn_input_data = rxn[rxn_input_cols].append(rxn_input_data)
rxn_props, phases, rxn_ids = self._init_rxns(all_rxn_input_data)
rxn_dir = self._infer_rxn_dir(all_rxn_input_data, rxn_props, rxn_ids)
self.rxn = pd.concat((pd.Series(rxn_ids, name='rxn_id'),
all_rxn_input_data,
pd.Series(rxn_dir, name='rxn_dir')), axis=1)
self.rxn_num = len(rxn_props)
self.rxn_props = rxn_props
self.phases = phases
pass
def _load_comp_data(self, comp, input_data):
pass
def _init_phase_comp(self, component):
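# One table per composition component, holding the measured value and its
# error plus the corresponding initial (pre-run) value and error.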
phase_comp = pd.DataFrame(columns=[
component, component+'_err', 'init_'+component,
'init_'+component+'_err'])
self.comp[component] = phase_comp
pass
def _read_data_sheet(self, sheet):
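# Assumes the first row of every input sheet holds the units for each column;
# the actual data begin on the following row.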
units = sheet.loc[0]
data = sheet.loc[1:].reset_index(drop=True)
return data, units
def _init_rxns(self, rxn):
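# Build one property dict ('phases', 'coefs', 'eqn') per unique reaction
# equation and map every data row to its reaction index via rxn_ids; the
# returned phases array collects the unique phase symbols over all reactions.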
rxn_eqns = rxn['rxn_studied'].unique()
rxn_props = []
all_phases = []
rxn_ids = np.zeros(rxn.shape[0], dtype=int)
for ind, irxn_eqn in enumerate(rxn_eqns):
imask = rxn['rxn_studied']==irxn_eqn
rxn_ids[imask] = ind
iphs, icoef = self.extract_rxn_coefs(irxn_eqn)
all_phases.extend(iphs)  # extend (not append) so np.unique sees a flat list of symbols
irxn_prop = OrderedDict({'phases': iphs, 'coefs': icoef, 'eqn':irxn_eqn})
rxn_props.append(irxn_prop)
phases = np.unique(all_phases)
return rxn_props, phases, rxn_ids
def _infer_rxn_dir(self, rxn, rxn_props, rxn_ids):
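# Each reported phase change ('+' growth, '-' consumption, '=' no change) is
# turned into a reaction-direction vote using the sign of that phase's
# stoichiometric coefficient (reactants negative, products positive).
# Unanimous +1 votes -> 'FWD', unanimous -1 -> 'REV', all zero -> 'NC',
# mixed votes -> 'BIASED'; a trailing '?' flags calls supported only by
# uncertain ('+?'/'-?') observations.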
results = rxn['results']
rxn_dir = []
for iresult, irxn_id in zip(results, rxn_ids):
iphases, ichanges = self.extract_rxn_results(iresult)
irxn_prop = rxn_props[irxn_id]
irxn_phases = irxn_prop['phases']
irxn_coefs = irxn_prop['coefs']
irxn_dir = np.zeros(len(ichanges))
irxn_certain = np.tile(False, len(ichanges))
for ind, (ijphs, ijchange) in enumerate(zip(iphases, ichanges)):
ijdir = np.nan
ijcertain = False
if ijchange == '+':
ijdir = +1
ijcertain = True
elif ijchange == '-':
ijdir = -1
ijcertain = True
elif ijchange == '+?':
ijdir = +1
ijcertain = False
elif ijchange == '-?':
ijdir = -1
ijcertain = False
elif (ijchange=='='):
ijdir = 0
ijcertain = True
elif (ijchange=='=?'):
ijdir = 0
ijcertain = False
else:
ijdir = np.nan
ijcertain = False
# use only the sign of the stoichiometric coefficient so each vote is +/-1 (or 0)
ijdir *= np.sign(irxn_coefs[irxn_phases == ijphs][0])
irxn_dir[ind] = ijdir
irxn_certain[ind] = ijcertain
if np.all(irxn_dir==-1):
if np.any(irxn_certain):
rxn_dir.append('REV')
else:
rxn_dir.append('REV?')
elif np.all(irxn_dir==+1):
if np.any(irxn_certain):
rxn_dir.append('FWD')
else:
rxn_dir.append('FWD?')
elif np.all(irxn_dir==0):
rxn_dir.append('NC')
else:
rxn_dir.append('BIASED')
rxn_dir = np.array(rxn_dir)
return rxn_dir
@classmethod
def read_phase_rev_data(cls, filenm, sheetname=None):
# Read the file; pandas returns a dict of DataFrames when multiple sheets
# are requested, so concatenate them into a single table in that case
data_d = pd.read_excel(filenm, sheet_name=sheetname)
if isinstance(data_d, dict):
raw_df = pd.concat(data_d, ignore_index=True)
else:
raw_df = data_d
return raw_df
# Determine which reported values are actually bounds
# (entries prefixed with '>' or '<' rather than plain numbers)
@classmethod
def detect_bound(cls, colnm, df):
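# For example (hypothetical values): a 'P' entry of '>15' is returned as
# P = 15.0 with P_Bound = 'lower', while plain numeric entries keep an empty
# bound label.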
msk_lo = df[colnm].astype(object).str.startswith('>').fillna(value=False).astype(bool)
msk_hi = df[colnm].astype(object).str.startswith('<').fillna(value=False).astype(bool)
# NOTE: bound_ser must be a pandas Series (not a numpy array) and share df's
# index, otherwise the boolean-mask assignments below will misalign
bound_ser = pd.Series('', index=df.index)
bound_ser[msk_lo] = 'lower'
bound_ser[msk_hi] = 'upper'
bound_df = pd.DataFrame()
bound_df[colnm+'_Bound'] = bound_ser
bound_df[colnm] = df[colnm].copy()
bound_df.loc[msk_lo,colnm] = df.loc[msk_lo,colnm].astype(object).str.slice(start=1).astype(float)
bound_df.loc[msk_hi,colnm] = df.loc[msk_hi,colnm].astype(object).str.slice(start=1).astype(float)
return bound_df
def filter_phase_rev_data(self, raw_df, mask_phs_l=None):
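# Clean the raw reversal table: split bound-prefixed P/T/equil_time entries,
# rewrite each reaction string as its most common (parser-adopted) form,
# optionally drop reactions involving masked phases, and infer a reaction
# direction for every run. Returns the assembled data table, the list of
# parsed reaction dicts, and the unique phase symbols.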
P_df = self.detect_bound('P',raw_df)
T_df = self.detect_bound('T',raw_df)
time_df = self.detect_bound('equil_time',raw_df)
rxn_df = pd.DataFrame()
rxn_df['RxnPhases'] = raw_df[['rxn_studied']].applymap(model.Database._get_reaction_phase_str)
# Set the reaction equation to the most common string representation in the raw database
RxnPhases_uniq = rxn_df['RxnPhases'].unique()
rxn_df['rxn_studied'] = raw_df['rxn_studied']
for rxn_phs_str in RxnPhases_uniq:
this_reaction = raw_df['rxn_studied'][rxn_df['RxnPhases']==rxn_phs_str]
this_reaction = this_reaction.str.strip()
# Store the equation only as its most common variant
# NOTE: .iloc[0] is needed to obtain a scalar value (not a Series object)
rxn_df.loc[rxn_df['RxnPhases']==rxn_phs_str,'rxn_studied'] = this_reaction.mode().iloc[0]
rxn_d_l = []
phs_l = []
rxn_eqn_uniq = rxn_df['rxn_studied'].unique()
for rxn_eqn_str in rxn_eqn_uniq:
rxn_d = model.Database.parse_rxn( rxn_eqn_str )
# Rewrite the reaction equation using the adopted reaction direction
rxn_df.loc[rxn_df['rxn_studied']==rxn_eqn_str,'rxn_studied'] = rxn_d['rxn_eqn']
# Remove masked phases
if mask_phs_l is not None:
curr_rxn_phs_l = []
curr_rxn_phs_l.extend(rxn_d['reac_l'])
curr_rxn_phs_l.extend(rxn_d['prod_l'])
disallowed_phs_l = np.intersect1d( curr_rxn_phs_l, mask_phs_l )
if len(disallowed_phs_l)>0:
continue
phs_l.extend( rxn_d['reac_l'] )
phs_l.extend( rxn_d['prod_l'] )
rxn_d_l.append( rxn_d )
# Remove masked phases
trust_ser = raw_df['trust'].fillna(value='Yes')
if mask_phs_l is not None:
# Remove from phase list
phs_l = np.setdiff1d( phs_l, mask_phs_l )
# set Trust variable to No for rxns involving masked phase
for mask_phs in mask_phs_l:
trust_ser[rxn_df['RxnPhases'].str.contains(mask_phs)] = 'No'
phs_uniq_l = np.unique( phs_l )
# Determine Reaction Direction: ['FWD','REV','NC','FWD?','REV?','INV']
rxn_dir_l = []
for result, rxn_phs_str in zip(raw_df['results'],rxn_df['RxnPhases']):
rxn_dir_l.append(model.Database._get_rxn_dir(rxn_phs_str, result))
rxn_df['rxn_dir'] = pd.Series(rxn_dir_l)
dat_df = pd.concat((raw_df['pub_id'],raw_df['device'],
raw_df['run_id'],
time_df,raw_df['flux_amt'],
P_df,raw_df['P_err'],T_df,raw_df['T_err'],
rxn_df,trust_ser),axis=1)
return dat_df, rxn_d_l, phs_uniq_l
@classmethod
def extract_rxn_coefs(cls, rxn_eqn_str):
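# Illustrative example (the reaction string is only a placeholder):
#   extract_rxn_coefs('Ab = Jd + Qz')
# returns the phase symbols ['Ab', 'Jd', 'Qz'] with coefficients
# [-1., 1., 1.]; reactant coefficients are negative, product coefficients
# positive.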
eqn_split = re.split('=', rxn_eqn_str)
assert len(eqn_split) == 2, (
'rxn_eqn_str must contain exactly one = sign'
)
reac_str, prod_str = eqn_split
reac_str = str.strip(reac_str)
prod_str = str.strip(prod_str)
def extract_coefs(eqn_side_str):
eqn_terms_str = list(filter(None, re.split('[ +=]', eqn_side_str)))
coefs = []
phases_str = []
for iterm_str in eqn_terms_str:
# match a full (possibly decimal) leading coefficient rather than a single digit
match = re.match(r'^[0-9]*\.?[0-9]+', iterm_str)
if not match:
icoef = 1.0
iphs = iterm_str
else:
icoef = float(match.group())
iphs = iterm_str[match.end():]
coefs.append(icoef)
phases_str.append(iphs)
coefs = np.array(coefs)
return phases_str, coefs
phases_str_reac, coefs_reac = extract_coefs(reac_str)
phases_str_prod, coefs_prod = extract_coefs(prod_str)
phases_str = np.hstack((phases_str_reac, phases_str_prod))
coefs = np.hstack((-coefs_reac, +coefs_prod))
return phases_str, coefs
@classmethod
def extract_rxn_results(cls, result):
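# Illustrative example (the result string is only a placeholder):
#   extract_rxn_results('Jd +; Ab -')
# returns (['Jd', 'Ab'], ['+', '-']); each term is '<phase> <change symbol>'
# and terms are separated by ';'.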
result_terms = [iresult.strip() for iresult in result.split(';')]
phases = []
changes = []
for iterm in result_terms:
term_split = iterm.split()
assert len(term_split)==2, (
'phase and result symbols must be separated by a space.'
)
iphs, ichange = term_split
assert ichange in cls.RESULT_OPTS, (
'result is invalid. Every result must be selected from ' + str(cls.RESULT_OPTS)
)
phases.append(iphs)
changes.append(ichange)
return phases, changes
#===================================================
class ParamModel:
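"""Placeholder for the parameter model; not yet implemented."""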
pass
#===================================================