"""The Model module of ThermoEngine implements a Python interface with the Phase
objective-C classes as well as the infrastructure for pure phase thermodynamic
calibration. The module contains methods that allow for loading and selection of built-in
thermodynamic databases.
"""
from thermoengine import core
from thermoengine import chem
from thermoengine import phases
DATADIR = 'data/databases'
import numpy as np
import pandas as pd
import re
from os import path
from collections import OrderedDict
import warnings
import deprecation
import sys
#warnings.filterwarnings('error', category=DeprecationWarning)
warnings.filterwarnings('module', category=DeprecationWarning)
__all__ = ['Database']
#===================================================
[docs]class Database:
"""
Thermodynamic database model object
Simple access to many competing thermodynamic database models. A wide
variety of published models are available. Models are defined using
low-level code together with list of implemented Pure and Solution phases.
Parameters
----------
database: {'Berman', 'Stixrude', 'HollandAndPowell', 'CoderModule'}
Chosen thermodynamic database (str ID)
liq_mod: {'v1.0', 'v1.1', 'v1.2', 'pMELTS'}
Chosen version of liquid model (str ID)
calib: {True, False}
Access calibration code or code optimized for speed (bool)
phase_tuple: {None}
A tuple that is set if database is 'CoderModule'. The first element is a
string corresponding to the name of the module. The second element is a
dictionary of abbreviations (keys) and a list of [ClassName, PhaseType]
(values) containing phases coded by the module. The abbreviations must
be consistent with the standard list in PurePhasdeList.csv or
SolutionPhaseList.csv. The ClassNames are implementation-specific.
PhaseType is either 'pure' or 'solution'.
Methods
-------
disable_gibbs_energy_reference_state
enable_gibbs_energy_reference_state
get_assemblage
get_phase
get_rxn
redox_buffer
redox_state
Attributes
----------
calib
coder_module
database
liq_mod
phase_info
phase_tuple
phases
Examples
--------
Retrieve a copy of the Berman/MELTS database.
>>> model.Database()
Retrieve a copy of the Stixrude database.
>>> model.Database(database='Stixrude')
Retrieve a copy of the *Berman* database generated by the Coder module in
ThermoEngine that contains the pure (stoichiometric) phase class
*Potassium_Feldspar* with code generation that includes parameter
calibration methods.
>>> modelDB = model.Database(database="CoderModule", calib=True,
phase_tuple=('berman', {'Or':['Potassium_Feldspar','pure']}))
Retrieve a copy of the *Simple_Solution* database generated by the coder
module in ThermoEngine that contains the solution phase class *Feldspar*
with code generation that includes parameter calibration methods.
>>> modelDB = model.Database(database="CoderModule", calib=True,
phase_tuple=('Simple_Solution', {'Fsp':['Feldspar','solution']}))
Notes
-----
The database maintains a single copy of each phase object, which is shared
among all processes. Thus, any changes to phase models (through
calibration, for example) automatically propagate across all uses of the
database.
Deprecated methods
- phase_attributes
- phase_details
- phase_obj
- phase_props
NEEDS UPDATING
- disable_gibbs_energy_reference_state
- enable_gibbs_energy_reference_state
"""
def __init__(self, database='Berman', liq_mod='v1.0', calib=True,
phase_tuple=None):
# Store database name at top-level for convenience
# load active phase list
# self._init_phase_details()
# self._init_active_phases(liq_mod=liq_mod)
# self._init_phase_attributes()
# Read in allowable phases
global_phase_info, global_info_files = phases.get_phase_info()
if phase_tuple and database == 'CoderModule':
coder_module = phase_tuple[0]
phase_dict = phase_tuple[1]
else:
coder_module = None
phase_dict = None
active_pure_filename, active_pure_phases = (
self._load_validate_info('PurePhases.csv', database, 'pure',
global_phase_info, global_info_files,
phase_dict=phase_dict)
)
active_soln_filename, active_soln_phases = (
self._load_validate_info('SolutionPhases.csv', database, 'solution',
global_phase_info, global_info_files,
phase_dict=phase_dict)
)
objDB = self._init_database(database, 'solution', active_soln_phases,
liq_mod=liq_mod, calib=calib,
coder_module=coder_module)
objDB = self._init_database(database, 'pure', active_pure_phases,
liq_mod=liq_mod, objDB=objDB, calib=calib,
coder_module=coder_module)
phase_info = pd.DataFrame(columns=['abbrev', 'phase_name', 'formula',
'phase_type', 'endmember_num'])
for iabbrev, iphs in objDB.items():
iphs_info = {
'abbrev': iphs.abbrev,
'phase_name': iphs.phase_name,
'formula': iphs.formula,
'phase_type': iphs.phase_type,
'endmember_num': iphs.endmember_num
}
phase_info = phase_info.append(iphs_info, ignore_index=True)
self._database = database
self._liq_mod = liq_mod
self._phases = objDB
self._phase_info = phase_info
self._calib = calib
self._coder_module = coder_module
self._phase_tuple = phase_tuple
pass
def _load_validate_info(self, basename, database, phase_type,
global_phase_info, global_info_files,
phase_dict=None):
# load
phase_info = global_phase_info[phase_type]
if database != 'CoderModule':
active_phases, active_filename = self._read_database_phase_file(
basename, database)
elif phase_dict:
data = []
for key,value in phase_dict.items():
if value[1] == phase_type:
data.append([key, value[0], None])
active_phases = pd.DataFrame(data,
columns=['Abbrev', 'ClassName', 'FormulaOveride'])
active_filename = 'CoderModule'
return active_filename, active_phases
else:
print ('If database is set to "CoderModule" then the second ' +
'argument of "phase_tuple" must contain a dictionary of ' +
'phase Abbreviations:ClassNames that are available in the ' +
'module.')
# validate
abbrev_valid = active_phases['Abbrev'].isin(phase_info['Abbrev'])
err_msg = (
'The {phase_type} phase library defined in '
'{filename} contains some invalid phase '
'abbreviations, shown below: \n\n'
'{invalid_phases}\n\n'
'Check that the abbreviations conform to the '
'list given in: "{phase_info_file}"')
invalid_phases = str(active_phases[~abbrev_valid])
phase_info_file = global_info_files[phase_type]
assert abbrev_valid.all(), (
err_msg.format(phase_type=phase_type,
filename=active_filename,
invalid_phases=invalid_phases,
phase_info_file=phase_info_file)
)
return active_filename, active_phases
def _read_database_phase_file(self, basefilenm, database):
# database = self.phase_details['database']
# phase_details = self.phase_details
parentpath = path.dirname(__file__)
filename = database+basefilenm
pathname = path.join(parentpath, DATADIR, filename)
active_phases = pd.read_csv(pathname)
return active_phases, pathname
def _init_database(self, database, phase_type, active_phases,
fixH2O=True, liq_mod=None, objDB=None, calib=None,
coder_module=None):
# Create and store class for each phase
# propsDB = OrderedDict()
if objDB is None:
objDB = OrderedDict()
phase_ptr = None
for indx, phase_info in active_phases.iterrows():
abbrev = phase_info['Abbrev']
classnm = phase_info['ClassName']
if database == 'CoderModule':
phase_ptr = self._init_phase_coder(coder_module, classnm,
abbrev, phase_type, calib=calib)
else:
phase_ptr = self._init_phase_obj(classnm+database, abbrev,
phase_type, calib=calib)
try:
objDB[abbrev] = phase_ptr
except:
print('{classnm} is not a valid ClassName for '
'the {database} database.'.\
format(classnm=classnm, database=database))
if database != 'CoderModule':
objDB = self._load_special_phases(phase_type, fixH2O, liq_mod, objDB)
return objDB
def _load_special_phases(self, phase_type, fixH2O, liq_mod, objDB):
if fixH2O and (phase_type=='pure'):
H2O_classnm = 'GenericH2O'
H2O_phase_obj = self._init_phase_obj(
H2O_classnm, 'H2O', phase_type, calib=False)
# propsDB['H2O'] = H2O_phase_obj.props
objDB['H2O'] = H2O_phase_obj
if (liq_mod is not None) and (phase_type=='solution'):
if liq_mod=='v1.0':
liq_classnm = 'LiquidMelts'
elif liq_mod=='v1.1':
liq_classnm = 'LiquidMeltsPlusOldH2OandNewCO2'
elif liq_mod=='v1.2':
liq_classnm = 'LiquidMeltsPlusCO2'
elif liq_mod=='pMELTS':
liq_classnm = 'LiquidpMelts'
else:
assert False, (
'Chosen liq_mod is not valid'
)
liq_phase_obj = self._init_phase_obj(
liq_classnm, 'Liq', phase_type, calib=True)
# propsDB['Liq'] = liq_phase_obj.props
objDB['Liq'] = liq_phase_obj
if (phase_type=='solution'):
alloy_solid_classnm = 'AlloySolid'
alloy_liquid_classnm = 'AlloyLiquid'
MtlS_phase_obj = self._init_phase_obj(
alloy_solid_classnm, 'MtlS', phase_type, calib=False)
MtlL_phase_obj = self._init_phase_obj(
alloy_liquid_classnm, 'MtlL', phase_type, calib=False)
# propsDB['H2O'] = H2O_phase_obj.props
objDB['MtlS'] = MtlS_phase_obj
objDB['MtlL'] = MtlL_phase_obj
# return propsDB, objDB
return objDB
#########################################################
def _init_phase_obj(self, classname, abbrev, phase_type, calib=None):
if phase_type=='pure':
if calib is None:
phase_obj = phases.PurePhase(classname, abbrev)
else:
phase_obj = phases.PurePhase(classname, abbrev, calib=calib)
elif phase_type=='solution':
if calib is None:
phase_obj = phases.SolutionPhase(classname, abbrev)
else:
phase_obj = phases.SolutionPhase(classname, abbrev, calib=calib)
return phase_obj
def _init_phase_coder(self, coder_module, classnm, abbrev, phase_type,
calib=False):
ptr_name = 'cy_' + classnm + '_' + coder_module.split('.')[-1] + '_' + (
'calib_' if calib else '')
if phase_type=='pure':
if calib is None:
phase_ptr = phases.PurePhase(ptr_name, abbrev, source='coder',
coder_module=coder_module)
else:
phase_ptr = phases.PurePhase(ptr_name, abbrev, calib=calib,
source='coder', coder_module=coder_module)
elif phase_type=='solution':
if calib is None:
phase_ptr = phases.SolutionPhase(ptr_name, abbrev,
source='coder', coder_module=coder_module)
else:
phase_ptr = phases.SolutionPhase(ptr_name, abbrev, calib=calib,
source='coder', coder_module=coder_module)
return phase_ptr
@property
def liq_mod(self):
"""
Name of current liquid model
Refers to version of MELTS.
"""
return self._liq_mod
@property
def database(self):
"""
Name of current database
"""
return self._database
@property
def phases(self):
"""
Dictionary of phase objects
Phase objects stored under official abbreviation. See phase_info for
information on each phase.
"""
return self._phases
@property
def phase_info(self):
"""
Phase info table for all members of current database
Basic phase information stored in pandas dataframe with columns:
'abbrev' - Official phase abbreviation
'phase_name' - Full phase name
'formula' - Chemical formula (or generic formula for solution phases)
'phase_type' - Solution or pure
'endmember_num' - Number of endmembers (1 for pure phases)
"""
return self._phase_info
@property
def calib(self):
"""
The code base for this phase implements model calibration functions.
Returns
-------
bool
"""
return self._calib
@property
def coder_module(self):
"""
Module name of Coder-generated database
Returns
-------
str
Name of Coder module
"""
return self._coder_module
@property
def phase_tuple(self):
"""
Dictionary of phases in a Coder-generated database
Returns
-------
tuple of strs
(1) str of coder_module
(2) dict of phases coded in the coder_module. *key* is abbreviation.
*value* is a two-component list of phase_name and phase_type;
*phase_type* is either 'pure' or 'solution'.
"""
return self._phase_tuple
@property
@deprecation.deprecated(
deprecated_in="1.0", removed_in="2.0",
details="Use phase_info instead.")
def phase_details(self):
# deprecated_in="1.0", removed_in="2.0",
# current_version=__version__,
# warnings.warn(
# "phase_details is deprecated, use phase_info instead.",
# DeprecationWarning)
pass
@property
@deprecation.deprecated(
deprecated_in="1.0", removed_in="2.0",
details=(
"For basic phase properties, use phase_info instead.\n"
"For detailed phase properties, retrieve them directly "
"from the desired phase object stored in phases." ))
def phase_attributes(self):
# warnings.warn((
# "'phase_attributes' is deprecated.\n"
# "For basic phase properties, use 'phase_info' instead.\n"
# "For detailed phase properties, retrieve them directly "
# "from the desired phase object stored in 'phases'."),
# DeprecationWarning)
pass
@property
@deprecation.deprecated(
deprecated_in="1.0", removed_in="2.0",
details=(
"Direct access to the non-Python phase calculation object is "
"not recommended. Instead, use the Python interface provided by "
"the desired phase stored in phases."))
def phase_obj(self):
# warnings.warn((
# "'phase_obj' is deprecated.\n"
# "Direct access to the non-python phase calculation object is "
# "not recommended. Instead, use the python interface provided by "
# "the desired phase stored in 'phases'."),
# DeprecationWarning)
pass
@property
@deprecation.deprecated(
deprecated_in="1.0", removed_in="2.0",
details=(
"Instead retrieve properties directly "
"from the desired phase object stored in phases."))
def phase_props(self):
# warnings.warn((
# "'phase_props' is deprecated.\n"
# "Instead retrieve properties directly "
# "from the desired phase object stored in 'phases'."),
# DeprecationWarning)
pass
##################
[docs] def enable_gibbs_energy_reference_state(self):
"""
Enable Helgeson convention of Gibbs energy.
Use Helgeson (SUPCRT) convention of Gibbs free energy
of formation rather than enthalpy of formation at Tr, Pr.
"""
# call method on any phase class (automatically applied to all)
# next(iter(self.phases.values()))
for phase in self.phases.values():
phase.enable_gibbs_energy_reference_state()
[docs] def disable_gibbs_energy_reference_state(self):
"""
Disable Helgeson convention of Gibbs energy.
Use standard enthalpy of formation at Tr, Pr as reference, rather
than Helgeson (SUPCRT) convention of Gibbs free energy of formation.
"""
# call method on any phase class (automatically applied to all)
# next(iter(self.phases.values()))
for phase in self.phases.values():
phase.disable_gibbs_energy_reference_state()
[docs] def get_phase_obj(self, phasesym_l):
"""
Get a phase that is coded in Objective-C by symbol.
Parameters
----------
phasesym_l : []
A list of abbreviations of desired phases.
Returns
-------
phase_obj_l
List of phase objects corresponding to desired phases.
"""
phase_details = self.phase_details
phase_obj_l = []
for phasesym in phasesym_l:
try:
# phase_obj_l.append(phase_details['purephase_obj_d'][phasesym])
phase_obj_l.append(phase_details['pure_props'][phasesym]['obj'])
except:
assert False, '"'+phasesym+'" is not a valid phase abbreviation. Try again.'
return phase_obj_l
[docs] def get_phase(self, phase_symbol):
"""
Get phase from current database by symbol.
Parameters
----------
phase_symbol : str
Abbreviation of desired phase. Must use correct ThermoEngine format.
Returns
-------
phase : obj
Phase object corresponding to desired phase.
Notes
-----
The phase object retrieved by this method remains tied to the Database
(i.e., it points to a single copy stored in the Database).
Any changes made to this phase (e.g., through calibration) thus
propagate to the entire Database.
Examples
--------
phase = modelDB.get_phase('Aeg')
sym_list = ['Aeg', 'Ky', 'Sil', 'Qtz']
phase_list = [modelDB.get_phase(sym) for sym in sym_list]
"""
# warn_val = warnings.warn(
# "phase_details is deprecated, use phase_info instead.",
# DeprecationWarning)
phases = self.phases
try:
return phases[phase_symbol]
except KeyError:
err_msg = (
'"{phase_symbol}" is not a valid '
'phase symbol for this database. '
'Select from the following available '
'phase symbol keys: {allowedsyms}'
)
phase_list = (self.phase_tuple[1]).keys()
raise KeyError(
err_msg.format(
phase_symbol=phase_symbol,
allowedsyms=phase_list)
)
[docs] def warn_test(self):
"""
This method is called when some function is deprecated.
"""
warnings.filterwarnings('error', category=DeprecationWarning)
print('hello')
warnings.warn("Big old warning test", DeprecationWarning)
print(sys.stderr)
print('world')
[docs] def get_assemblage(self, phase_symbols):
"""
Get phase assemblage from current database by symbol.
An assemblage represents a set of coexisting phases.
Parameters
----------
phase_symbols : list of strings
List of abbreviations of coexisting phases.
Must use correct ThermoEngine format.
Returns
-------
assemblage : obj
Assemblage object corresponding to coexisting phases.
Examples
--------
sym_list = ['Ky', 'Sil', 'Qtz']
assemblage = modelDB.get_assemblage(sym_list)
"""
phase_objs = []
for phase_symbol in phase_symbols:
phase_obj = self.get_phase(phase_symbol)
phase_objs.append(phase_obj)
assemblage = phases.Assemblage(phase_objs, obj_is_classnm=False)
return assemblage
[docs] def get_rxn(self, phase_symbols, endmember_ids, rxn_coefs,
coefs_per_atom=False):
"""
Get an endmember reaction from current database.
A reaction is represented as a stoichiometrically balanced
exchange of atoms between a set of endmember (or pure) phases.
Parameters
----------
phase_symbols : list of strings
List of abbreviations of reacting phases
endmember_ids : list of ints
List of integers representing endmember ID number for each phase
rxn_coefs : array
Array of reaction coefficients. Positive values are products;
negative values are reactants. Coefficients should be
stoichiometrically balanced.
coefs_per_atom : bool, default False
If False, rxn coefficients are defined per formula unit of each
endmember.
If True, coefficients are given on per atom basis. Thus they are
independent of the formula unit definition.
This is useful to avoid mistakes for phases with multiple standard
formula definitions (e.g., En is often given as MgSiO3 OR Mg2Si2O6).
Returns
-------
rxn : obj
rxn object for reacting endmember phases
Examples
--------
phase_symbols = ['Per', 'Qz', 'Cpx']
endmember_ids = [0, 0, 1]
rxn_coefs = [2, 2, 1]
rxn = modelDB.get_rxn(phase_symbols, endmember_ids, rxn_coefs)
# rxn_coefs defined per atom basis
rxn_coefs = [2, 3, 5]
rxn = modelDB.get_rxn(phase_symbols, endmember_ids, rxn_coefs,
coefs_per_atom=True)
"""
assert (
(len(endmember_ids) == len(phase_symbols)) and (len(phase_symbols) == len(rxn_coefs))), 'phase_symbols, endmember_ids, and rxn_coefs must all be equal'
phase_objs = []
for phase_symbol in phase_symbols:
phase_objs.append(self.get_phase(phase_symbol))
# from IPython import embed;embed();import ipdb;ipdb.set_trace()
rxn = phases.Rxn(phase_objs, endmember_ids, rxn_coefs,
coefs_per_atom=coefs_per_atom)
return rxn
def _redox_state_Kress91(self, T, P, oxide_comp, logfO2=None):
"""
Fe redox model of Kress and Carmichael 1991
Calculate ln(Fe2O3/FeO) ratio given lnfO2, T, P, bulk composition.
Alternatively, can predict lnfO2 values given measured ferric & ferrous comp.
Parameters
----------
T : double (array)
temperature in Kelvin
P : double (array)
pressure in bars
oxide_comp : double array (matrix)
molar oxide composition in standard order. Either measured FeO and Fe2O3 are
provided, or total iron reported as FeO (e.g. FeO*)
logfO2 : double (array), default None
If provided, the measured logfO2 value is used to predict the ln(Fe2O3/FeO).
Otherwise, reported FeO and Fe2O3 values are used to predict logfO2.
Returns
-------
output : double (array)
Output depends on whether logfO2 values are provided.
ln_Fe_oxide_ratio : If logfO2 values are given, return log ferric/ferrous ratio of melt.
logfO2 : If not, return predicted logfO2, given measured ferric and ferrous content of melt.
"""
predict_fO2 = False
if logfO2 is None:
predict_fO2 = True
OXIDES = chem.OXIDE_ORDER
# ['SiO2', 'TiO2', 'Al2O3', 'Fe2O3', 'Cr2O3', 'FeO', 'MnO', 'MgO',
# 'NiO', 'CoO', 'CaO', 'Na2O', 'K2O', 'P2O5', 'H2O', 'CO2']
T0 = 1673.15 # K
a = 0.196
b = 1.1492e4 # K
c = -6.675
e = -3.364
f = -7.01e-7 * 1.0e5 # K/bar
g = -1.54e-10 * 1.0e5 # 1/bar
h = 3.85e-17 * 1.0e5 * 1.0e5 # K/bar^2
# dAl2O3 = -2.243
# dFeO = -1.828
# dCaO = 3.201
# dNa2O = 5.854
# dK2O = 6.215
mol_oxides = np.array(oxide_comp.copy())
# from IPython import embed; embed()
XFeO_equiv = mol_oxides[:, OXIDES=='FeO'] + 2*mol_oxides[:, OXIDES=='Fe2O3']
# print(mol_oxides.shape)
# print(XFeO_equiv.shape)
if predict_fO2:
ln_Fe_oxide_ratio = np.squeeze(np.log(mol_oxides[:, OXIDES=='Fe2O3']/mol_oxides[:, OXIDES=='FeO']))
# display(ln_Fe_oxide_ratio)
mol_oxides[:, OXIDES=='FeO'] = XFeO_equiv
mol_oxides[:, OXIDES=='Fe2O3'] = 0.0
if mol_oxides.ndim==2:
mol_oxide_tot = np.sum(mol_oxides, axis=1)
mol_oxides /= mol_oxide_tot[:,np.newaxis]
elif mol_oxides.ndim==1:
mol_oxide_tot = np.sum(mol_oxides)
mol_oxides /= mol_oxide_tot
else:
assert False, 'mol_oxides must be either an array of compositions, or a matrix for many experiments'
d = np.zeros(len(OXIDES))
d[OXIDES=='Al2O3'] = -2.243
d[OXIDES=='FeO'] = -1.828
d[OXIDES=='CaO'] = +3.201
d[OXIDES=='Na2O'] = +5.854
d[OXIDES=='K2O'] = +6.215
atm_terms = b/T + c + e*(1.0-T0/T - np.log(T/T0))
press_terms = f*P/T + g*(T-T0)*P/T+ h*P*P/T
comp_terms = np.dot(mol_oxides, d)
if not predict_fO2:
lnfO2 = logfO2/np.log10(np.exp(1))
ln_Fe_oxide_ratio = a*lnfO2 + atm_terms + press_terms + comp_terms
return ln_Fe_oxide_ratio
else:
# print(ln_Fe_oxide_ratio )
# print((atm_terms + press_terms + comp_terms))
lnfO2 = (ln_Fe_oxide_ratio -
(atm_terms + press_terms + comp_terms))/a
logfO2 = lnfO2*np.log10(np.exp(1))
return logfO2
[docs] def redox_state(self, T, P, oxide_comp=None, logfO2=None,
phase_of_interest=None, method=None):
"""
Parameters
----------
T : array-like
Temperature in Kelvin.
P : array-like
Pressure in bars.
oxide_comp : dict of arrays, optional
Molar oxide composition of each phase.
logfO2 : double (array), optional
Base 10 logfO2 values with fO2 in bars
phase_of_interest : {'Liq', 'Spl'}
Abbreviation of redox-sensitive phase used to determine
redox state.
method : {'consistent', 'coexist', 'stoich', 'Kress91'}
'consistent' :
'coexist' :
'stoich' :
'Kress91' :
"""
def not_implemented_error(method, phase_of_interest):
raise NotImplementedError(
'The method "'+method+'" is not implemented ' +
'for phase_of_interest "'+phase_of_interest+'".')
output = None
if phase_of_interest=='Liq':
if method=='Kress91':
if oxide_comp['Liq'].ndim==1:
oxide_comp['Liq'] = oxide_comp['Liq'][np.newaxis, :]
liq_oxide_comp = oxide_comp['Liq']
# print(liq_oxide_comp.shape)
output = self._redox_state_Kress91(
T, P, liq_oxide_comp, logfO2=logfO2)
if logfO2 is None:
logfO2 = output
else:
ln_Fe_oxide_ratio = output
Fe_oxide_ratio = np.exp(ln_Fe_oxide_ratio)
ind_FeO = np.where(chem.OXIDE_ORDER=='FeO')[0][0]
ind_Fe2O3 = np.where(chem.OXIDE_ORDER=='Fe2O3')[0][0]
XFeO = liq_oxide_comp[:, ind_FeO]
XFe2O3 = liq_oxide_comp[:, ind_Fe2O3]
XFeOs = XFeO + 2*XFe2O3
XFeO = XFeOs/(1+2*Fe_oxide_ratio)
XFe2O3 = 0.5*(XFeOs - XFeO)
oxide_comp['Liq'][:, ind_FeO] = XFeO
oxide_comp['Liq'][:, ind_Fe2O3] = XFe2O3
else:
not_implemented_error(method, phase_of_interest)
elif phase_of_interest=='Spl':
not_implemented_error(method, phase_of_interest)
else:
not_implemented_error(method, phase_of_interest)
return output
[docs] def redox_buffer(self, T, P, buffer=None, method=None,
ignore_lims=True):
"""
Calculate logfO2 values for common oxygen buffers.
Parameters
----------
T : double (array)
Temperature in Kelvin
P : double (array)
Pressure in bars
buffer : {'IW', 'IM', 'NNO', 'Co+CoO', 'Cu+Cu2O', ('HM'/'MH'),
('MW'/'WM'), 'MMO', 'CCO', ('QFM'/'FMQ'), 'QIF'}
Models of common oxygen fugacity buffer systems with sources.
'IW' : Iron-Wustite [1?,2,3]
'IM' : Iron-Magnetite [1?,3]
'NNO' : Nickel-Nickel Oxide [2,3]
'Co+CoO' : Cobalt-Cobalt Oxide [3]
'Cu+Cu2O' : Copper-Copper Oxide [2]
'HM' or 'MH' : Magnetite-Hematite [1,2,3]
'MW' or 'WM' : Magnetite-Wustite [1?,2,3]
'MMO' : MnO-Mn3O4 [2]
'CCO' : Graphite-CO-CO2 [2]
'QFM' or 'FMQ' : Quartz-Fayalite-Magnetite [1,2,3]
'QIF' : Quartz-Iron-Fayalite [1?,3]
method : {'consistent', 'LEPR', 'Frost1991'}
Method used to calculate redox buffer value. Default depends on
buffer availability (see above), in preference order
{'consistent', 'LEPR', 'Frost1991'}.
[1] 'consistent' : Directly evaluate chemical potentials for each
phase using current thermodynamic database
[2] 'LEPR' : Empirical expressions constructed for LEPR database
(Hirschmann et al. 2008) based on previously published studies
[3] 'Frost91' : Empirical expressions from review article
Frost 1991
Returns
-------
logfO2 : double (array)
Base 10 logfO2 values with fO2 in bars
Publication Sources
-------------------
[1] consistent method based on MELTS thermodynamic database,
incorporating Berman 1988 and ???
[2] M. Hirschmann et al. (2008) Library of Experimental Phase Relations
(LEPR): A database and Web portal for experimental magmatic
phase equilibria data
[3] B. R. Frost (1991) Introduction to oxygen fugacity and
its petrologic importance
"""
BUFFER_OPTS = ['IW', 'IM', 'NNO', 'Co+CoO', 'Cu+Cu2O', 'HM', 'MH',
'MW', 'WM', 'MMO', 'CCO', 'QFM', 'FMQ', 'QIF']
assert buffer in BUFFER_OPTS, (
'Selected buffer ' + buffer + ' is not available. Please select from ' + str(BUFFER_OPTS)
)
def not_implemented_error(method, buffer):
raise NotImplementedError(
'The method "'+method+'" is not implimented ' +
'for the redox buffer "'+buffer+'".')
if buffer=='IW':
method = 'LEPR' if method is None else method
if method=='consistent':
not_implemented_error(method, buffer)
elif method=='LEPR':
logfO2 = self._empirical_redox_buffer(
T, P, A=-28776.8, B=14.057, C=.055, D=-.8853)
elif method=='Frost91':
logfO2 = self._empirical_redox_buffer(
T, P, A=-27489, B=6.702, C=.055, ignore_lims=ignore_lims,
Tlims=np.array([565, 1200])+273.15)
else:
not_implemented_error(method, buffer)
elif buffer=='IM':
method = 'Frost91' if method is None else method
if method=='consistent':
not_implemented_error(method, buffer)
elif method=='LEPR':
not_implemented_error(method, buffer)
elif method=='Frost91':
logfO2 = self._empirical_redox_buffer(
T, P, A=-28690.6, B=8.13, C=.056,
ignore_lims=ignore_lims,
Tlims=np.array([300, 565])+273.15)
else:
not_implemented_error(method, buffer)
elif buffer=='NNO':
method = 'LEPR' if method is None else method
if method=='consistent':
not_implemented_error(method, buffer)
elif method=='LEPR':
logfO2 = self._empirical_redox_buffer(
T, P, A=-25018.7, B=12.981, C=0.046, D=-0.5117)
elif method=='Frost91':
logfO2 = self._empirical_redox_buffer(
T, P, A=-24930, B=9.36, C=.046, ignore_lims=ignore_lims,
Tlims=np.array([600, 1200])+273.15)
else:
not_implemented_error(method, buffer)
elif buffer=='Co+CoO':
method = 'Frost91' if method is None else method
if method=='consistent':
not_implemented_error(method, buffer)
elif method=='LEPR':
not_implemented_error(method, buffer)
elif method=='Frost91':
logfO2 = self._empirical_redox_buffer(
T, P, A=-24332.6, B=7.295, C=0.052,
ignore_lims=ignore_lims,
Tlims=np.array([600, 1200])+273.15)
else:
not_implemented_error(method, buffer)
elif buffer=='Cu+Cu2O':
method = 'LEPR' if method is None else method
if method=='consistent':
not_implemented_error(method, buffer)
elif method=='LEPR':
logfO2 = self._empirical_redox_buffer(
T, P, A=-18162.2, B=12.855, C=0, D=-.6741)
elif method=='Frost91':
not_implemented_error(method, buffer)
else:
not_implemented_error(method, buffer)
elif buffer in ['HM', 'MH']:
method = 'consistent' if method is None else method
if method=='consistent':
logfO2 = self._consistent_redox_buffer_HM(T, P)
elif method=='LEPR':
logfO2 = self._empirical_redox_buffer(
T, P, A=-25632, B=14.620, C=0.019)
elif method=='Frost91':
logfO2_T1 = self._empirical_redox_buffer(
T, P, A=-25497.5, B=14.33, C=.019,
ignore_lims=ignore_lims,
Tlims=np.array([300, 573])+273.15)
logfO2_T2 = self._empirical_redox_buffer(
T, P, A=-26452.6, B=15.455, C=.019,
Tlims=np.array([573, 682])+273.15,
ignore_lims=ignore_lims)
logfO2_T3 = self._empirical_redox_buffer(
T, P, A=-25700.6, B=14.558, C=.019,
Tlims=np.array([682, 1100])+273.15,
ignore_lims=ignore_lims)
logfO2 = np.vstack((logfO2_T1, logfO2_T2, logfO2_T3))
logfO2 = np.nanmean(logfO2, axis=0)
else:
not_implemented_error(method, buffer)
elif buffer in ['MW', 'WM']:
method = 'LEPR' if method is None else method
if method=='consistent':
not_implemented_error(method, buffer)
elif method=='LEPR':
logfO2 = self._empirical_redox_buffer(
T, P, A=-30396, B=-3.427, C=0.083, D=2.0236)
elif method=='Frost91':
logfO2 = self._empirical_redox_buffer(
T, P, A=-32807, B=13.012, C=.083,
Tlims=np.array([565, 1200])+273.15,
ignore_lims=ignore_lims)
else:
not_implemented_error(method, buffer)
elif buffer=='MMO':
method = 'LEPR' if method is None else method
if method=='consistent':
not_implemented_error(method, buffer)
elif method=='LEPR':
logfO2 = self._empirical_redox_buffer(
T, P, A=-29420.7, B=92.025, C=0, D=-11.517)
elif method=='Frost91':
not_implemented_error(method, buffer)
else:
not_implemented_error(method, buffer)
elif buffer=='CCO':
method = 'LEPR' if method is None else method
if method=='consistent':
not_implemented_error(method, buffer)
elif method=='LEPR':
logfO2 = self._empirical_redox_buffer(
T, P, A=-21803, B=4.325, C=0.171)
elif method=='Frost91':
not_implemented_error(method, buffer)
else:
not_implemented_error(method, buffer)
elif buffer in ['QFM', 'FMQ']:
method = 'consistent' if method is None else method
if method=='consistent':
logfO2 = self._consistent_redox_buffer_QFM(T, P)
elif method=='LEPR':
logfO2 = self._empirical_redox_buffer(
T, P, A=-30686, B=82.760, C=.094, D=-10.620)
elif method=='Frost91':
logfO2 = self._empirical_redox_buffer(
T, P, A=0, B=0, C=0, ignore_lims=ignore_lims,
Tlims=np.array([0, 0])+273.15)
logfO2_T1 = self._empirical_redox_buffer(
T, P, A=-26455.3, B=10.344, C=.092,
Tlims=np.array([400, 573])+273.15,
ignore_lims=ignore_lims)
logfO2_T2 = self._empirical_redox_buffer(
T, P, A=-25096.3, B=8.735, C=.110,
Tlims=np.array([573, 1200])+273.15,
ignore_lims=ignore_lims)
logfO2 = np.vstack((logfO2_T1, logfO2_T2))
logfO2 = np.nanmean(logfO2, axis=0)
else:
not_implemented_error(method, buffer)
elif buffer=='QIF':
method = 'Frost91' if method is None else method
if method=='consistent':
not_implemented_error(method, buffer)
elif method=='LEPR':
not_implemented_error(method, buffer)
elif method=='Frost91':
logfO2_T1 = self._empirical_redox_buffer(
T, P, A=-29435.7, B=7.391, C=.044,
Tlims=np.array([150, 573])+273.15,
ignore_lims=ignore_lims)
logfO2_T2 = self._empirical_redox_buffer(
T, P, A=-29520.8, B=7.492, C=.05,
Tlims=np.array([573, 1200])+273.15,
ignore_lims=ignore_lims)
logfO2 = np.vstack((logfO2_T1, logfO2_T2))
logfO2 = np.nanmean(logfO2, axis=0)
else:
not_implemented_error(method, buffer)
return logfO2
def _O2_chem_potential(self, T, P):
Tref = 298.15
Cp_k0 = 23.10248
Cp_k1 = 804.8876
Cp_k2 = 1762835.0
Cp_k3 = 0.0
Cp_l1 = 18172.91960
Cp_Tt = 0.002676
Hs = (23.10248*(T-Tref) + 2.0*804.8876*(np.sqrt(T)-np.sqrt(Tref))
- 1762835.0*(1.0/T-1.0/Tref) - 18172.91960*np.log(T/Tref)
+ 0.5*0.002676*(T*T-Tref*Tref))
Ss = (205.15 + 23.10248*np.log(T/Tref)
- 2.0*804.8876*(1.0/np.sqrt(T)-1.0/np.sqrt(Tref))
- 0.5*1762835.0*(1.0/(T*T)-1.0/(Tref*Tref))
+ 18172.91960*(1.0/T-1.0/Tref) + 0.002676*(T-Tref))
mu_O2 = Hs - T*Ss
return mu_O2
def _consistent_redox_buffer_QFM(self, T, P):
mu_O2 = self._O2_chem_potential(T, P)
mu_Qz = self.get_phase('Qz').chem_potential(T, P)
mu_Fa = self.get_phase('Fa').chem_potential(T, P)
mu_Mag = self.get_phase('Mag').chem_potential(T, P)
dGr0 = 2*mu_Mag + 3*mu_Qz - 3*mu_Fa - mu_O2
logfO2 = 1/(2.303*8.314*T)*dGr0
return logfO2
def _consistent_redox_buffer_HM(self, T, P):
mu_O2 = self._O2_chem_potential(T, P)
mu_Hem = self.get_phase('Hem').chem_potential(T, P)
mu_Mag = self.get_phase('Mag').chem_potential(T, P)
dGr0 = 6*mu_Hem - 4*mu_Mag - mu_O2
logfO2 = 1/(2.303*8.314*T)*dGr0
return logfO2
def _empirical_redox_buffer(self, T, P, A=0, B=0, C=0, D=0,
ignore_lims=True, Tlims=None):
logfO2 = A/T + B + C*(P-1)/T + D*np.log(T)
if (not ignore_lims) and (Tlims is not None):
logfO2[T<Tlims[0]] = np.nan
logfO2[T>=Tlims[1]] = np.nan
return logfO2
@classmethod
def parse_rxn(cls, rxn_eqn_str, rxn_result_str=None, sort=True ):
rxn_phs_str, rxn_eqn_str = cls._get_reaction_phase_str( rxn_eqn_str, sort=sort, full_output=True)
reac_l, prod_l = cls._get_reaction_phases( rxn_phs_str )
if rxn_result_str is not None:
rxn_dir = cls._get_rxn_dir(rxn_phs_str, rxn_result_str)
else:
rxn_dir = None
rxn_d = {}
rxn_d['rxn_eqn'] = rxn_eqn_str
rxn_d['rxn_phs_str'] = rxn_phs_str
rxn_d['reac_l'] = reac_l
rxn_d['prod_l'] = prod_l
rxn_d['rxn_dir'] = rxn_dir
return rxn_d
@classmethod
def _parse_rxn_result(cls, result_str):
# Remove surrounding whitespace
result_str = result_str.strip()
if result_str == 'NC':
phs_l = None
obs_l = result_str
return phs_l, obs_l
parse_result = re.compile(r'\w+\s*[+-?]+\s*')
phs_result_l = parse_result.findall(result_str)
phs_a = []
obs_a = []
for iphs_result in phs_result_l:
ires = iphs_result.strip().split()
phs_a.append(ires[0])
obs_a.append(ires[1])
phs_a = np.array(phs_a)
obs_a = np.array(obs_a)
return phs_a, obs_a
@classmethod
def _get_rxn_dir(cls, rxn_phs_str, result):
# Determine Reaction Direction: ['FWD','REV','NC','FWD?','REV?','INV']
# print('rxn_phs_str = ',rxn_phs_str)
# print('results = ',result)
# print('===========')
reac_l, prod_l = cls._get_reaction_phases(rxn_phs_str)
phs_a, obs_a = cls._parse_rxn_result(result)
if phs_a is None:
rxn_dir = 'NC'
return rxn_dir
fwd_cnt = 0
fwd_maybe_cnt = 0
rev_cnt = 0
rev_maybe_cnt = 0
for reac in reac_l:
if reac not in phs_a:
continue
obs = obs_a[phs_a==reac]
if obs == '-':
fwd_cnt += 1
elif obs == '-?':
fwd_maybe_cnt +=1
elif obs == '+':
rev_cnt += 1
elif obs == '+?':
rev_maybe_cnt += 1
elif obs == '?':
# Do nothing
pass
else:
# print('*****************')
# print('reac = ',reac)
# print('prod = ',prod)
# print('obs = ', obs)
# print('prod_l = ',prod_l)
# print('reac_l = ',reac_l)
# print('phs_a = ',phs_a)
# print('obs_a = ',obs_a)
# print('*****************')
assert False, obs + ' is not a supported Results code.'
for prod in prod_l:
if prod not in phs_a:
continue
obs = obs_a[phs_a==prod]
if obs == '+':
fwd_cnt += 1
elif obs == '+?':
fwd_maybe_cnt +=1
elif obs == '-':
rev_cnt += 1
elif obs == '-?':
rev_maybe_cnt += 1
elif obs == '?':
# Do nothing
pass
else:
# print('*****************')
# print('reac = ',reac)
# print('prod = ',prod)
# print('obs = ', obs)
# print('prod_l = ',prod_l)
# print('reac_l = ',reac_l)
# print('phs_a = ',phs_a)
# print('obs_a = ',obs_a)
# print('*****************')
assert False, obs + ' is not a supported Results code.'
# Reaction Direction Options: ['FWD','REV','NC','FWD?','REV?','INV']
if (fwd_cnt==0)&(fwd_maybe_cnt==0)&(rev_cnt==0)&(rev_maybe_cnt==0):
rxn_dir = 'INV'
if fwd_maybe_cnt > 0:
rxn_dir = 'FWD?'
if rev_maybe_cnt > 0:
rxn_dir = 'REV?'
if (fwd_maybe_cnt > 0) & (rev_maybe_cnt > 0):
rxn_dir = 'NC'
if fwd_cnt > 0:
rxn_dir = 'FWD'
if rev_cnt > 0:
rxn_dir = 'REV'
if (fwd_cnt > 0) & (rev_cnt > 0):
rxn_dir = 'INV'
return rxn_dir
@classmethod
def _get_reaction_phases(cls, rxn_phs_str):
reac_str, prod_str = str.split(rxn_phs_str,':')
reac_l = reac_str.split(',')
prod_l = prod_str.split(',')
return reac_l, prod_l
@classmethod
def _split_phases(cls, phs_combo_str):
return [re.sub('^[0-9]','',phs.strip()) \
for phs in phs_combo_str.split('+')]
@classmethod
def _get_reaction_phase_str(cls, rxn_eqn_str, sort=True, full_output=False ):
reac_str, prod_str = str.split(rxn_eqn_str,'=')
reac_str = str.strip(reac_str)
prod_str = str.strip(prod_str)
reac_l = cls._split_phases(reac_str)
prod_l = cls._split_phases(prod_str)
if sort:
reac_l.sort()
prod_l.sort()
first_phs = [reac_l[0],prod_l[0]]
first_phs.sort()
if first_phs[0] == prod_l[0]:
reac_l, prod_l = prod_l, reac_l
rxn_eqn_str = prod_str + ' = ' + reac_str
rxn_phs_str = ''
for phs in reac_l[:-1]:
rxn_phs_str += phs+','
rxn_phs_str += reac_l[-1]
rxn_phs_str += ':'
for phs in prod_l[:-1]:
rxn_phs_str += phs+','
rxn_phs_str += prod_l[-1]
if full_output:
return rxn_phs_str, rxn_eqn_str
else:
return rxn_phs_str
#===================================================