ENKI

Source code for model

"""The Model module of ThermoEngine implements a Python interface with the Phase
objective-C classes as well as the infrastructure for pure phase thermodynamic
calibration. The module contains methods that allow for loading and selection of built-in
thermodynamic databases.

"""

from thermoengine import core
from thermoengine import chem
from thermoengine import phases
DATADIR = 'data/databases'

import numpy as np
import pandas as pd
import re
from os import path
from collections import OrderedDict
import warnings
import deprecation
import sys

#warnings.filterwarnings('error', category=DeprecationWarning)
warnings.filterwarnings('module', category=DeprecationWarning)

__all__ = ['Database']

#===================================================
[docs]class Database: """ Thermodynamic database model object Simple access to many competing thermodynamic database models. A wide variety of published models are available. Models are defined using low-level code together with list of implemented Pure and Solution phases. Parameters ---------- database: {'Berman', 'Stixrude', 'HollandAndPowell', 'CoderModule'} Chosen thermodynamic database (str ID) liq_mod: {'v1.0', 'v1.1', 'v1.2', 'pMELTS'} Chosen version of liquid model (str ID) calib: {True, False} Access calibration code or code optimized for speed (bool) phase_tuple: {None} A tuple that is set if database is 'CoderModule'. The first element is a string corresponding to the name of the module. The second element is a dictionary of abbreviations (keys) and a list of [ClassName, PhaseType] (values) containing phases coded by the module. The abbreviations must be consistent with the standard list in PurePhasdeList.csv or SolutionPhaseList.csv. The ClassNames are implementation-specific. PhaseType is either 'pure' or 'solution'. Methods ------- disable_gibbs_energy_reference_state enable_gibbs_energy_reference_state get_assemblage get_phase get_rxn redox_buffer redox_state Attributes ---------- calib coder_module database liq_mod phase_info phase_tuple phases Examples -------- Retrieve a copy of the Berman/MELTS database. >>> model.Database() Retrieve a copy of the Stixrude database. >>> model.Database(database='Stixrude') Retrieve a copy of the *Berman* database generated by the Coder module in ThermoEngine that contains the pure (stoichiometric) phase class *Potassium_Feldspar* with code generation that includes parameter calibration methods. >>> modelDB = model.Database(database="CoderModule", calib=True, phase_tuple=('berman', {'Or':['Potassium_Feldspar','pure']})) Retrieve a copy of the *Simple_Solution* database generated by the coder module in ThermoEngine that contains the solution phase class *Feldspar* with code generation that includes parameter calibration methods. >>> modelDB = model.Database(database="CoderModule", calib=True, phase_tuple=('Simple_Solution', {'Fsp':['Feldspar','solution']})) Notes ----- The database maintains a single copy of each phase object, which is shared among all processes. Thus, any changes to phase models (through calibration, for example) automatically propagate across all uses of the database. Deprecated methods - phase_attributes - phase_details - phase_obj - phase_props NEEDS UPDATING - disable_gibbs_energy_reference_state - enable_gibbs_energy_reference_state """ def __init__(self, database='Berman', liq_mod='v1.0', calib=True, phase_tuple=None): # Store database name at top-level for convenience # load active phase list # self._init_phase_details() # self._init_active_phases(liq_mod=liq_mod) # self._init_phase_attributes() # Read in allowable phases global_phase_info, global_info_files = phases.get_phase_info() if phase_tuple and database == 'CoderModule': coder_module = phase_tuple[0] phase_dict = phase_tuple[1] else: coder_module = None phase_dict = None active_pure_filename, active_pure_phases = ( self._load_validate_info('PurePhases.csv', database, 'pure', global_phase_info, global_info_files, phase_dict=phase_dict) ) active_soln_filename, active_soln_phases = ( self._load_validate_info('SolutionPhases.csv', database, 'solution', global_phase_info, global_info_files, phase_dict=phase_dict) ) objDB = self._init_database(database, 'solution', active_soln_phases, liq_mod=liq_mod, calib=calib, coder_module=coder_module) objDB = self._init_database(database, 'pure', active_pure_phases, liq_mod=liq_mod, objDB=objDB, calib=calib, coder_module=coder_module) phase_info = pd.DataFrame(columns=['abbrev', 'phase_name', 'formula', 'phase_type', 'endmember_num']) for iabbrev, iphs in objDB.items(): iphs_info = { 'abbrev': iphs.abbrev, 'phase_name': iphs.phase_name, 'formula': iphs.formula, 'phase_type': iphs.phase_type, 'endmember_num': iphs.endmember_num } phase_info = phase_info.append(iphs_info, ignore_index=True) self._database = database self._liq_mod = liq_mod self._phases = objDB self._phase_info = phase_info self._calib = calib self._coder_module = coder_module self._phase_tuple = phase_tuple pass def _load_validate_info(self, basename, database, phase_type, global_phase_info, global_info_files, phase_dict=None): # load phase_info = global_phase_info[phase_type] if database != 'CoderModule': active_phases, active_filename = self._read_database_phase_file( basename, database) elif phase_dict: data = [] for key,value in phase_dict.items(): if value[1] == phase_type: data.append([key, value[0], None]) active_phases = pd.DataFrame(data, columns=['Abbrev', 'ClassName', 'FormulaOveride']) active_filename = 'CoderModule' return active_filename, active_phases else: print ('If database is set to "CoderModule" then the second ' + 'argument of "phase_tuple" must contain a dictionary of ' + 'phase Abbreviations:ClassNames that are available in the ' + 'module.') # validate abbrev_valid = active_phases['Abbrev'].isin(phase_info['Abbrev']) err_msg = ( 'The {phase_type} phase library defined in ' '{filename} contains some invalid phase ' 'abbreviations, shown below: \n\n' '{invalid_phases}\n\n' 'Check that the abbreviations conform to the ' 'list given in: "{phase_info_file}"') invalid_phases = str(active_phases[~abbrev_valid]) phase_info_file = global_info_files[phase_type] assert abbrev_valid.all(), ( err_msg.format(phase_type=phase_type, filename=active_filename, invalid_phases=invalid_phases, phase_info_file=phase_info_file) ) return active_filename, active_phases def _read_database_phase_file(self, basefilenm, database): # database = self.phase_details['database'] # phase_details = self.phase_details parentpath = path.dirname(__file__) filename = database+basefilenm pathname = path.join(parentpath, DATADIR, filename) active_phases = pd.read_csv(pathname) return active_phases, pathname def _init_database(self, database, phase_type, active_phases, fixH2O=True, liq_mod=None, objDB=None, calib=None, coder_module=None): # Create and store class for each phase # propsDB = OrderedDict() if objDB is None: objDB = OrderedDict() phase_ptr = None for indx, phase_info in active_phases.iterrows(): abbrev = phase_info['Abbrev'] classnm = phase_info['ClassName'] if database == 'CoderModule': phase_ptr = self._init_phase_coder(coder_module, classnm, abbrev, phase_type, calib=calib) else: phase_ptr = self._init_phase_obj(classnm+database, abbrev, phase_type, calib=calib) try: objDB[abbrev] = phase_ptr except: print('{classnm} is not a valid ClassName for ' 'the {database} database.'.\ format(classnm=classnm, database=database)) if database != 'CoderModule': objDB = self._load_special_phases(phase_type, fixH2O, liq_mod, objDB) return objDB def _load_special_phases(self, phase_type, fixH2O, liq_mod, objDB): if fixH2O and (phase_type=='pure'): H2O_classnm = 'GenericH2O' H2O_phase_obj = self._init_phase_obj( H2O_classnm, 'H2O', phase_type, calib=False) # propsDB['H2O'] = H2O_phase_obj.props objDB['H2O'] = H2O_phase_obj if (liq_mod is not None) and (phase_type=='solution'): if liq_mod=='v1.0': liq_classnm = 'LiquidMelts' elif liq_mod=='v1.1': liq_classnm = 'LiquidMeltsPlusOldH2OandNewCO2' elif liq_mod=='v1.2': liq_classnm = 'LiquidMeltsPlusCO2' elif liq_mod=='pMELTS': liq_classnm = 'LiquidpMelts' else: assert False, ( 'Chosen liq_mod is not valid' ) liq_phase_obj = self._init_phase_obj( liq_classnm, 'Liq', phase_type, calib=True) # propsDB['Liq'] = liq_phase_obj.props objDB['Liq'] = liq_phase_obj if (phase_type=='solution'): alloy_solid_classnm = 'AlloySolid' alloy_liquid_classnm = 'AlloyLiquid' MtlS_phase_obj = self._init_phase_obj( alloy_solid_classnm, 'MtlS', phase_type, calib=False) MtlL_phase_obj = self._init_phase_obj( alloy_liquid_classnm, 'MtlL', phase_type, calib=False) # propsDB['H2O'] = H2O_phase_obj.props objDB['MtlS'] = MtlS_phase_obj objDB['MtlL'] = MtlL_phase_obj # return propsDB, objDB return objDB ######################################################### def _init_phase_obj(self, classname, abbrev, phase_type, calib=None): if phase_type=='pure': if calib is None: phase_obj = phases.PurePhase(classname, abbrev) else: phase_obj = phases.PurePhase(classname, abbrev, calib=calib) elif phase_type=='solution': if calib is None: phase_obj = phases.SolutionPhase(classname, abbrev) else: phase_obj = phases.SolutionPhase(classname, abbrev, calib=calib) return phase_obj def _init_phase_coder(self, coder_module, classnm, abbrev, phase_type, calib=False): ptr_name = 'cy_' + classnm + '_' + coder_module.split('.')[-1] + '_' + ( 'calib_' if calib else '') if phase_type=='pure': if calib is None: phase_ptr = phases.PurePhase(ptr_name, abbrev, source='coder', coder_module=coder_module) else: phase_ptr = phases.PurePhase(ptr_name, abbrev, calib=calib, source='coder', coder_module=coder_module) elif phase_type=='solution': if calib is None: phase_ptr = phases.SolutionPhase(ptr_name, abbrev, source='coder', coder_module=coder_module) else: phase_ptr = phases.SolutionPhase(ptr_name, abbrev, calib=calib, source='coder', coder_module=coder_module) return phase_ptr @property def liq_mod(self): """ Name of current liquid model Refers to version of MELTS. """ return self._liq_mod @property def database(self): """ Name of current database """ return self._database @property def phases(self): """ Dictionary of phase objects Phase objects stored under official abbreviation. See phase_info for information on each phase. """ return self._phases @property def phase_info(self): """ Phase info table for all members of current database Basic phase information stored in pandas dataframe with columns: 'abbrev' - Official phase abbreviation 'phase_name' - Full phase name 'formula' - Chemical formula (or generic formula for solution phases) 'phase_type' - Solution or pure 'endmember_num' - Number of endmembers (1 for pure phases) """ return self._phase_info @property def calib(self): """ The code base for this phase implements model calibration functions. Returns ------- bool """ return self._calib @property def coder_module(self): """ Module name of Coder-generated database Returns ------- str Name of Coder module """ return self._coder_module @property def phase_tuple(self): """ Dictionary of phases in a Coder-generated database Returns ------- tuple of strs (1) str of coder_module (2) dict of phases coded in the coder_module. *key* is abbreviation. *value* is a two-component list of phase_name and phase_type; *phase_type* is either 'pure' or 'solution'. """ return self._phase_tuple @property @deprecation.deprecated( deprecated_in="1.0", removed_in="2.0", details="Use phase_info instead.") def phase_details(self): # deprecated_in="1.0", removed_in="2.0", # current_version=__version__, # warnings.warn( # "phase_details is deprecated, use phase_info instead.", # DeprecationWarning) pass @property @deprecation.deprecated( deprecated_in="1.0", removed_in="2.0", details=( "For basic phase properties, use phase_info instead.\n" "For detailed phase properties, retrieve them directly " "from the desired phase object stored in phases." )) def phase_attributes(self): # warnings.warn(( # "'phase_attributes' is deprecated.\n" # "For basic phase properties, use 'phase_info' instead.\n" # "For detailed phase properties, retrieve them directly " # "from the desired phase object stored in 'phases'."), # DeprecationWarning) pass @property @deprecation.deprecated( deprecated_in="1.0", removed_in="2.0", details=( "Direct access to the non-Python phase calculation object is " "not recommended. Instead, use the Python interface provided by " "the desired phase stored in phases.")) def phase_obj(self): # warnings.warn(( # "'phase_obj' is deprecated.\n" # "Direct access to the non-python phase calculation object is " # "not recommended. Instead, use the python interface provided by " # "the desired phase stored in 'phases'."), # DeprecationWarning) pass @property @deprecation.deprecated( deprecated_in="1.0", removed_in="2.0", details=( "Instead retrieve properties directly " "from the desired phase object stored in phases.")) def phase_props(self): # warnings.warn(( # "'phase_props' is deprecated.\n" # "Instead retrieve properties directly " # "from the desired phase object stored in 'phases'."), # DeprecationWarning) pass ##################
[docs] def enable_gibbs_energy_reference_state(self): """ Enable Helgeson convention of Gibbs energy. Use Helgeson (SUPCRT) convention of Gibbs free energy of formation rather than enthalpy of formation at Tr, Pr. """ # call method on any phase class (automatically applied to all) # next(iter(self.phases.values())) for phase in self.phases.values(): phase.enable_gibbs_energy_reference_state()
[docs] def disable_gibbs_energy_reference_state(self): """ Disable Helgeson convention of Gibbs energy. Use standard enthalpy of formation at Tr, Pr as reference, rather than Helgeson (SUPCRT) convention of Gibbs free energy of formation. """ # call method on any phase class (automatically applied to all) # next(iter(self.phases.values())) for phase in self.phases.values(): phase.disable_gibbs_energy_reference_state()
[docs] def get_phase_obj(self, phasesym_l): """ Get a phase that is coded in Objective-C by symbol. Parameters ---------- phasesym_l : [] A list of abbreviations of desired phases. Returns ------- phase_obj_l List of phase objects corresponding to desired phases. """ phase_details = self.phase_details phase_obj_l = [] for phasesym in phasesym_l: try: # phase_obj_l.append(phase_details['purephase_obj_d'][phasesym]) phase_obj_l.append(phase_details['pure_props'][phasesym]['obj']) except: assert False, '"'+phasesym+'" is not a valid phase abbreviation. Try again.' return phase_obj_l
[docs] def get_phase(self, phase_symbol): """ Get phase from current database by symbol. Parameters ---------- phase_symbol : str Abbreviation of desired phase. Must use correct ThermoEngine format. Returns ------- phase : obj Phase object corresponding to desired phase. Notes ----- The phase object retrieved by this method remains tied to the Database (i.e., it points to a single copy stored in the Database). Any changes made to this phase (e.g., through calibration) thus propagate to the entire Database. Examples -------- phase = modelDB.get_phase('Aeg') sym_list = ['Aeg', 'Ky', 'Sil', 'Qtz'] phase_list = [modelDB.get_phase(sym) for sym in sym_list] """ # warn_val = warnings.warn( # "phase_details is deprecated, use phase_info instead.", # DeprecationWarning) phases = self.phases try: return phases[phase_symbol] except KeyError: err_msg = ( '"{phase_symbol}" is not a valid ' 'phase symbol for this database. ' 'Select from the following available ' 'phase symbol keys: {allowedsyms}' ) phase_list = (self.phase_tuple[1]).keys() raise KeyError( err_msg.format( phase_symbol=phase_symbol, allowedsyms=phase_list) )
[docs] def warn_test(self): """ This method is called when some function is deprecated. """ warnings.filterwarnings('error', category=DeprecationWarning) print('hello') warnings.warn("Big old warning test", DeprecationWarning) print(sys.stderr) print('world')
[docs] def get_assemblage(self, phase_symbols): """ Get phase assemblage from current database by symbol. An assemblage represents a set of coexisting phases. Parameters ---------- phase_symbols : list of strings List of abbreviations of coexisting phases. Must use correct ThermoEngine format. Returns ------- assemblage : obj Assemblage object corresponding to coexisting phases. Examples -------- sym_list = ['Ky', 'Sil', 'Qtz'] assemblage = modelDB.get_assemblage(sym_list) """ phase_objs = [] for phase_symbol in phase_symbols: phase_obj = self.get_phase(phase_symbol) phase_objs.append(phase_obj) assemblage = phases.Assemblage(phase_objs, obj_is_classnm=False) return assemblage
[docs] def get_rxn(self, phase_symbols, endmember_ids, rxn_coefs, coefs_per_atom=False): """ Get an endmember reaction from current database. A reaction is represented as a stoichiometrically balanced exchange of atoms between a set of endmember (or pure) phases. Parameters ---------- phase_symbols : list of strings List of abbreviations of reacting phases endmember_ids : list of ints List of integers representing endmember ID number for each phase rxn_coefs : array Array of reaction coefficients. Positive values are products; negative values are reactants. Coefficients should be stoichiometrically balanced. coefs_per_atom : bool, default False If False, rxn coefficients are defined per formula unit of each endmember. If True, coefficients are given on per atom basis. Thus they are independent of the formula unit definition. This is useful to avoid mistakes for phases with multiple standard formula definitions (e.g., En is often given as MgSiO3 OR Mg2Si2O6). Returns ------- rxn : obj rxn object for reacting endmember phases Examples -------- phase_symbols = ['Per', 'Qz', 'Cpx'] endmember_ids = [0, 0, 1] rxn_coefs = [2, 2, 1] rxn = modelDB.get_rxn(phase_symbols, endmember_ids, rxn_coefs) # rxn_coefs defined per atom basis rxn_coefs = [2, 3, 5] rxn = modelDB.get_rxn(phase_symbols, endmember_ids, rxn_coefs, coefs_per_atom=True) """ assert ( (len(endmember_ids) == len(phase_symbols)) and (len(phase_symbols) == len(rxn_coefs))), 'phase_symbols, endmember_ids, and rxn_coefs must all be equal' phase_objs = [] for phase_symbol in phase_symbols: phase_objs.append(self.get_phase(phase_symbol)) # from IPython import embed;embed();import ipdb;ipdb.set_trace() rxn = phases.Rxn(phase_objs, endmember_ids, rxn_coefs, coefs_per_atom=coefs_per_atom) return rxn
def _redox_state_Kress91(self, T, P, oxide_comp, logfO2=None): """ Fe redox model of Kress and Carmichael 1991 Calculate ln(Fe2O3/FeO) ratio given lnfO2, T, P, bulk composition. Alternatively, can predict lnfO2 values given measured ferric & ferrous comp. Parameters ---------- T : double (array) temperature in Kelvin P : double (array) pressure in bars oxide_comp : double array (matrix) molar oxide composition in standard order. Either measured FeO and Fe2O3 are provided, or total iron reported as FeO (e.g. FeO*) logfO2 : double (array), default None If provided, the measured logfO2 value is used to predict the ln(Fe2O3/FeO). Otherwise, reported FeO and Fe2O3 values are used to predict logfO2. Returns ------- output : double (array) Output depends on whether logfO2 values are provided. ln_Fe_oxide_ratio : If logfO2 values are given, return log ferric/ferrous ratio of melt. logfO2 : If not, return predicted logfO2, given measured ferric and ferrous content of melt. """ predict_fO2 = False if logfO2 is None: predict_fO2 = True OXIDES = chem.OXIDE_ORDER # ['SiO2', 'TiO2', 'Al2O3', 'Fe2O3', 'Cr2O3', 'FeO', 'MnO', 'MgO', # 'NiO', 'CoO', 'CaO', 'Na2O', 'K2O', 'P2O5', 'H2O', 'CO2'] T0 = 1673.15 # K a = 0.196 b = 1.1492e4 # K c = -6.675 e = -3.364 f = -7.01e-7 * 1.0e5 # K/bar g = -1.54e-10 * 1.0e5 # 1/bar h = 3.85e-17 * 1.0e5 * 1.0e5 # K/bar^2 # dAl2O3 = -2.243 # dFeO = -1.828 # dCaO = 3.201 # dNa2O = 5.854 # dK2O = 6.215 mol_oxides = np.array(oxide_comp.copy()) # from IPython import embed; embed() XFeO_equiv = mol_oxides[:, OXIDES=='FeO'] + 2*mol_oxides[:, OXIDES=='Fe2O3'] # print(mol_oxides.shape) # print(XFeO_equiv.shape) if predict_fO2: ln_Fe_oxide_ratio = np.squeeze(np.log(mol_oxides[:, OXIDES=='Fe2O3']/mol_oxides[:, OXIDES=='FeO'])) # display(ln_Fe_oxide_ratio) mol_oxides[:, OXIDES=='FeO'] = XFeO_equiv mol_oxides[:, OXIDES=='Fe2O3'] = 0.0 if mol_oxides.ndim==2: mol_oxide_tot = np.sum(mol_oxides, axis=1) mol_oxides /= mol_oxide_tot[:,np.newaxis] elif mol_oxides.ndim==1: mol_oxide_tot = np.sum(mol_oxides) mol_oxides /= mol_oxide_tot else: assert False, 'mol_oxides must be either an array of compositions, or a matrix for many experiments' d = np.zeros(len(OXIDES)) d[OXIDES=='Al2O3'] = -2.243 d[OXIDES=='FeO'] = -1.828 d[OXIDES=='CaO'] = +3.201 d[OXIDES=='Na2O'] = +5.854 d[OXIDES=='K2O'] = +6.215 atm_terms = b/T + c + e*(1.0-T0/T - np.log(T/T0)) press_terms = f*P/T + g*(T-T0)*P/T+ h*P*P/T comp_terms = np.dot(mol_oxides, d) if not predict_fO2: lnfO2 = logfO2/np.log10(np.exp(1)) ln_Fe_oxide_ratio = a*lnfO2 + atm_terms + press_terms + comp_terms return ln_Fe_oxide_ratio else: # print(ln_Fe_oxide_ratio ) # print((atm_terms + press_terms + comp_terms)) lnfO2 = (ln_Fe_oxide_ratio - (atm_terms + press_terms + comp_terms))/a logfO2 = lnfO2*np.log10(np.exp(1)) return logfO2
[docs] def redox_state(self, T, P, oxide_comp=None, logfO2=None, phase_of_interest=None, method=None): """ Parameters ---------- T : array-like Temperature in Kelvin. P : array-like Pressure in bars. oxide_comp : dict of arrays, optional Molar oxide composition of each phase. logfO2 : double (array), optional Base 10 logfO2 values with fO2 in bars phase_of_interest : {'Liq', 'Spl'} Abbreviation of redox-sensitive phase used to determine redox state. method : {'consistent', 'coexist', 'stoich', 'Kress91'} 'consistent' : 'coexist' : 'stoich' : 'Kress91' : """ def not_implemented_error(method, phase_of_interest): raise NotImplementedError( 'The method "'+method+'" is not implemented ' + 'for phase_of_interest "'+phase_of_interest+'".') output = None if phase_of_interest=='Liq': if method=='Kress91': if oxide_comp['Liq'].ndim==1: oxide_comp['Liq'] = oxide_comp['Liq'][np.newaxis, :] liq_oxide_comp = oxide_comp['Liq'] # print(liq_oxide_comp.shape) output = self._redox_state_Kress91( T, P, liq_oxide_comp, logfO2=logfO2) if logfO2 is None: logfO2 = output else: ln_Fe_oxide_ratio = output Fe_oxide_ratio = np.exp(ln_Fe_oxide_ratio) ind_FeO = np.where(chem.OXIDE_ORDER=='FeO')[0][0] ind_Fe2O3 = np.where(chem.OXIDE_ORDER=='Fe2O3')[0][0] XFeO = liq_oxide_comp[:, ind_FeO] XFe2O3 = liq_oxide_comp[:, ind_Fe2O3] XFeOs = XFeO + 2*XFe2O3 XFeO = XFeOs/(1+2*Fe_oxide_ratio) XFe2O3 = 0.5*(XFeOs - XFeO) oxide_comp['Liq'][:, ind_FeO] = XFeO oxide_comp['Liq'][:, ind_Fe2O3] = XFe2O3 else: not_implemented_error(method, phase_of_interest) elif phase_of_interest=='Spl': not_implemented_error(method, phase_of_interest) else: not_implemented_error(method, phase_of_interest) return output
[docs] def redox_buffer(self, T, P, buffer=None, method=None, ignore_lims=True): """ Calculate logfO2 values for common oxygen buffers. Parameters ---------- T : double (array) Temperature in Kelvin P : double (array) Pressure in bars buffer : {'IW', 'IM', 'NNO', 'Co+CoO', 'Cu+Cu2O', ('HM'/'MH'), ('MW'/'WM'), 'MMO', 'CCO', ('QFM'/'FMQ'), 'QIF'} Models of common oxygen fugacity buffer systems with sources. 'IW' : Iron-Wustite [1?,2,3] 'IM' : Iron-Magnetite [1?,3] 'NNO' : Nickel-Nickel Oxide [2,3] 'Co+CoO' : Cobalt-Cobalt Oxide [3] 'Cu+Cu2O' : Copper-Copper Oxide [2] 'HM' or 'MH' : Magnetite-Hematite [1,2,3] 'MW' or 'WM' : Magnetite-Wustite [1?,2,3] 'MMO' : MnO-Mn3O4 [2] 'CCO' : Graphite-CO-CO2 [2] 'QFM' or 'FMQ' : Quartz-Fayalite-Magnetite [1,2,3] 'QIF' : Quartz-Iron-Fayalite [1?,3] method : {'consistent', 'LEPR', 'Frost1991'} Method used to calculate redox buffer value. Default depends on buffer availability (see above), in preference order {'consistent', 'LEPR', 'Frost1991'}. [1] 'consistent' : Directly evaluate chemical potentials for each phase using current thermodynamic database [2] 'LEPR' : Empirical expressions constructed for LEPR database (Hirschmann et al. 2008) based on previously published studies [3] 'Frost91' : Empirical expressions from review article Frost 1991 Returns ------- logfO2 : double (array) Base 10 logfO2 values with fO2 in bars Publication Sources ------------------- [1] consistent method based on MELTS thermodynamic database, incorporating Berman 1988 and ??? [2] M. Hirschmann et al. (2008) Library of Experimental Phase Relations (LEPR): A database and Web portal for experimental magmatic phase equilibria data [3] B. R. Frost (1991) Introduction to oxygen fugacity and its petrologic importance """ BUFFER_OPTS = ['IW', 'IM', 'NNO', 'Co+CoO', 'Cu+Cu2O', 'HM', 'MH', 'MW', 'WM', 'MMO', 'CCO', 'QFM', 'FMQ', 'QIF'] assert buffer in BUFFER_OPTS, ( 'Selected buffer ' + buffer + ' is not available. Please select from ' + str(BUFFER_OPTS) ) def not_implemented_error(method, buffer): raise NotImplementedError( 'The method "'+method+'" is not implimented ' + 'for the redox buffer "'+buffer+'".') if buffer=='IW': method = 'LEPR' if method is None else method if method=='consistent': not_implemented_error(method, buffer) elif method=='LEPR': logfO2 = self._empirical_redox_buffer( T, P, A=-28776.8, B=14.057, C=.055, D=-.8853) elif method=='Frost91': logfO2 = self._empirical_redox_buffer( T, P, A=-27489, B=6.702, C=.055, ignore_lims=ignore_lims, Tlims=np.array([565, 1200])+273.15) else: not_implemented_error(method, buffer) elif buffer=='IM': method = 'Frost91' if method is None else method if method=='consistent': not_implemented_error(method, buffer) elif method=='LEPR': not_implemented_error(method, buffer) elif method=='Frost91': logfO2 = self._empirical_redox_buffer( T, P, A=-28690.6, B=8.13, C=.056, ignore_lims=ignore_lims, Tlims=np.array([300, 565])+273.15) else: not_implemented_error(method, buffer) elif buffer=='NNO': method = 'LEPR' if method is None else method if method=='consistent': not_implemented_error(method, buffer) elif method=='LEPR': logfO2 = self._empirical_redox_buffer( T, P, A=-25018.7, B=12.981, C=0.046, D=-0.5117) elif method=='Frost91': logfO2 = self._empirical_redox_buffer( T, P, A=-24930, B=9.36, C=.046, ignore_lims=ignore_lims, Tlims=np.array([600, 1200])+273.15) else: not_implemented_error(method, buffer) elif buffer=='Co+CoO': method = 'Frost91' if method is None else method if method=='consistent': not_implemented_error(method, buffer) elif method=='LEPR': not_implemented_error(method, buffer) elif method=='Frost91': logfO2 = self._empirical_redox_buffer( T, P, A=-24332.6, B=7.295, C=0.052, ignore_lims=ignore_lims, Tlims=np.array([600, 1200])+273.15) else: not_implemented_error(method, buffer) elif buffer=='Cu+Cu2O': method = 'LEPR' if method is None else method if method=='consistent': not_implemented_error(method, buffer) elif method=='LEPR': logfO2 = self._empirical_redox_buffer( T, P, A=-18162.2, B=12.855, C=0, D=-.6741) elif method=='Frost91': not_implemented_error(method, buffer) else: not_implemented_error(method, buffer) elif buffer in ['HM', 'MH']: method = 'consistent' if method is None else method if method=='consistent': logfO2 = self._consistent_redox_buffer_HM(T, P) elif method=='LEPR': logfO2 = self._empirical_redox_buffer( T, P, A=-25632, B=14.620, C=0.019) elif method=='Frost91': logfO2_T1 = self._empirical_redox_buffer( T, P, A=-25497.5, B=14.33, C=.019, ignore_lims=ignore_lims, Tlims=np.array([300, 573])+273.15) logfO2_T2 = self._empirical_redox_buffer( T, P, A=-26452.6, B=15.455, C=.019, Tlims=np.array([573, 682])+273.15, ignore_lims=ignore_lims) logfO2_T3 = self._empirical_redox_buffer( T, P, A=-25700.6, B=14.558, C=.019, Tlims=np.array([682, 1100])+273.15, ignore_lims=ignore_lims) logfO2 = np.vstack((logfO2_T1, logfO2_T2, logfO2_T3)) logfO2 = np.nanmean(logfO2, axis=0) else: not_implemented_error(method, buffer) elif buffer in ['MW', 'WM']: method = 'LEPR' if method is None else method if method=='consistent': not_implemented_error(method, buffer) elif method=='LEPR': logfO2 = self._empirical_redox_buffer( T, P, A=-30396, B=-3.427, C=0.083, D=2.0236) elif method=='Frost91': logfO2 = self._empirical_redox_buffer( T, P, A=-32807, B=13.012, C=.083, Tlims=np.array([565, 1200])+273.15, ignore_lims=ignore_lims) else: not_implemented_error(method, buffer) elif buffer=='MMO': method = 'LEPR' if method is None else method if method=='consistent': not_implemented_error(method, buffer) elif method=='LEPR': logfO2 = self._empirical_redox_buffer( T, P, A=-29420.7, B=92.025, C=0, D=-11.517) elif method=='Frost91': not_implemented_error(method, buffer) else: not_implemented_error(method, buffer) elif buffer=='CCO': method = 'LEPR' if method is None else method if method=='consistent': not_implemented_error(method, buffer) elif method=='LEPR': logfO2 = self._empirical_redox_buffer( T, P, A=-21803, B=4.325, C=0.171) elif method=='Frost91': not_implemented_error(method, buffer) else: not_implemented_error(method, buffer) elif buffer in ['QFM', 'FMQ']: method = 'consistent' if method is None else method if method=='consistent': logfO2 = self._consistent_redox_buffer_QFM(T, P) elif method=='LEPR': logfO2 = self._empirical_redox_buffer( T, P, A=-30686, B=82.760, C=.094, D=-10.620) elif method=='Frost91': logfO2 = self._empirical_redox_buffer( T, P, A=0, B=0, C=0, ignore_lims=ignore_lims, Tlims=np.array([0, 0])+273.15) logfO2_T1 = self._empirical_redox_buffer( T, P, A=-26455.3, B=10.344, C=.092, Tlims=np.array([400, 573])+273.15, ignore_lims=ignore_lims) logfO2_T2 = self._empirical_redox_buffer( T, P, A=-25096.3, B=8.735, C=.110, Tlims=np.array([573, 1200])+273.15, ignore_lims=ignore_lims) logfO2 = np.vstack((logfO2_T1, logfO2_T2)) logfO2 = np.nanmean(logfO2, axis=0) else: not_implemented_error(method, buffer) elif buffer=='QIF': method = 'Frost91' if method is None else method if method=='consistent': not_implemented_error(method, buffer) elif method=='LEPR': not_implemented_error(method, buffer) elif method=='Frost91': logfO2_T1 = self._empirical_redox_buffer( T, P, A=-29435.7, B=7.391, C=.044, Tlims=np.array([150, 573])+273.15, ignore_lims=ignore_lims) logfO2_T2 = self._empirical_redox_buffer( T, P, A=-29520.8, B=7.492, C=.05, Tlims=np.array([573, 1200])+273.15, ignore_lims=ignore_lims) logfO2 = np.vstack((logfO2_T1, logfO2_T2)) logfO2 = np.nanmean(logfO2, axis=0) else: not_implemented_error(method, buffer) return logfO2
def _O2_chem_potential(self, T, P): Tref = 298.15 Cp_k0 = 23.10248 Cp_k1 = 804.8876 Cp_k2 = 1762835.0 Cp_k3 = 0.0 Cp_l1 = 18172.91960 Cp_Tt = 0.002676 Hs = (23.10248*(T-Tref) + 2.0*804.8876*(np.sqrt(T)-np.sqrt(Tref)) - 1762835.0*(1.0/T-1.0/Tref) - 18172.91960*np.log(T/Tref) + 0.5*0.002676*(T*T-Tref*Tref)) Ss = (205.15 + 23.10248*np.log(T/Tref) - 2.0*804.8876*(1.0/np.sqrt(T)-1.0/np.sqrt(Tref)) - 0.5*1762835.0*(1.0/(T*T)-1.0/(Tref*Tref)) + 18172.91960*(1.0/T-1.0/Tref) + 0.002676*(T-Tref)) mu_O2 = Hs - T*Ss return mu_O2 def _consistent_redox_buffer_QFM(self, T, P): mu_O2 = self._O2_chem_potential(T, P) mu_Qz = self.get_phase('Qz').chem_potential(T, P) mu_Fa = self.get_phase('Fa').chem_potential(T, P) mu_Mag = self.get_phase('Mag').chem_potential(T, P) dGr0 = 2*mu_Mag + 3*mu_Qz - 3*mu_Fa - mu_O2 logfO2 = 1/(2.303*8.314*T)*dGr0 return logfO2 def _consistent_redox_buffer_HM(self, T, P): mu_O2 = self._O2_chem_potential(T, P) mu_Hem = self.get_phase('Hem').chem_potential(T, P) mu_Mag = self.get_phase('Mag').chem_potential(T, P) dGr0 = 6*mu_Hem - 4*mu_Mag - mu_O2 logfO2 = 1/(2.303*8.314*T)*dGr0 return logfO2 def _empirical_redox_buffer(self, T, P, A=0, B=0, C=0, D=0, ignore_lims=True, Tlims=None): logfO2 = A/T + B + C*(P-1)/T + D*np.log(T) if (not ignore_lims) and (Tlims is not None): logfO2[T<Tlims[0]] = np.nan logfO2[T>=Tlims[1]] = np.nan return logfO2 @classmethod def parse_rxn(cls, rxn_eqn_str, rxn_result_str=None, sort=True ): rxn_phs_str, rxn_eqn_str = cls._get_reaction_phase_str( rxn_eqn_str, sort=sort, full_output=True) reac_l, prod_l = cls._get_reaction_phases( rxn_phs_str ) if rxn_result_str is not None: rxn_dir = cls._get_rxn_dir(rxn_phs_str, rxn_result_str) else: rxn_dir = None rxn_d = {} rxn_d['rxn_eqn'] = rxn_eqn_str rxn_d['rxn_phs_str'] = rxn_phs_str rxn_d['reac_l'] = reac_l rxn_d['prod_l'] = prod_l rxn_d['rxn_dir'] = rxn_dir return rxn_d @classmethod def _parse_rxn_result(cls, result_str): # Remove surrounding whitespace result_str = result_str.strip() if result_str == 'NC': phs_l = None obs_l = result_str return phs_l, obs_l parse_result = re.compile(r'\w+\s*[+-?]+\s*') phs_result_l = parse_result.findall(result_str) phs_a = [] obs_a = [] for iphs_result in phs_result_l: ires = iphs_result.strip().split() phs_a.append(ires[0]) obs_a.append(ires[1]) phs_a = np.array(phs_a) obs_a = np.array(obs_a) return phs_a, obs_a @classmethod def _get_rxn_dir(cls, rxn_phs_str, result): # Determine Reaction Direction: ['FWD','REV','NC','FWD?','REV?','INV'] # print('rxn_phs_str = ',rxn_phs_str) # print('results = ',result) # print('===========') reac_l, prod_l = cls._get_reaction_phases(rxn_phs_str) phs_a, obs_a = cls._parse_rxn_result(result) if phs_a is None: rxn_dir = 'NC' return rxn_dir fwd_cnt = 0 fwd_maybe_cnt = 0 rev_cnt = 0 rev_maybe_cnt = 0 for reac in reac_l: if reac not in phs_a: continue obs = obs_a[phs_a==reac] if obs == '-': fwd_cnt += 1 elif obs == '-?': fwd_maybe_cnt +=1 elif obs == '+': rev_cnt += 1 elif obs == '+?': rev_maybe_cnt += 1 elif obs == '?': # Do nothing pass else: # print('*****************') # print('reac = ',reac) # print('prod = ',prod) # print('obs = ', obs) # print('prod_l = ',prod_l) # print('reac_l = ',reac_l) # print('phs_a = ',phs_a) # print('obs_a = ',obs_a) # print('*****************') assert False, obs + ' is not a supported Results code.' for prod in prod_l: if prod not in phs_a: continue obs = obs_a[phs_a==prod] if obs == '+': fwd_cnt += 1 elif obs == '+?': fwd_maybe_cnt +=1 elif obs == '-': rev_cnt += 1 elif obs == '-?': rev_maybe_cnt += 1 elif obs == '?': # Do nothing pass else: # print('*****************') # print('reac = ',reac) # print('prod = ',prod) # print('obs = ', obs) # print('prod_l = ',prod_l) # print('reac_l = ',reac_l) # print('phs_a = ',phs_a) # print('obs_a = ',obs_a) # print('*****************') assert False, obs + ' is not a supported Results code.' # Reaction Direction Options: ['FWD','REV','NC','FWD?','REV?','INV'] if (fwd_cnt==0)&(fwd_maybe_cnt==0)&(rev_cnt==0)&(rev_maybe_cnt==0): rxn_dir = 'INV' if fwd_maybe_cnt > 0: rxn_dir = 'FWD?' if rev_maybe_cnt > 0: rxn_dir = 'REV?' if (fwd_maybe_cnt > 0) & (rev_maybe_cnt > 0): rxn_dir = 'NC' if fwd_cnt > 0: rxn_dir = 'FWD' if rev_cnt > 0: rxn_dir = 'REV' if (fwd_cnt > 0) & (rev_cnt > 0): rxn_dir = 'INV' return rxn_dir @classmethod def _get_reaction_phases(cls, rxn_phs_str): reac_str, prod_str = str.split(rxn_phs_str,':') reac_l = reac_str.split(',') prod_l = prod_str.split(',') return reac_l, prod_l @classmethod def _split_phases(cls, phs_combo_str): return [re.sub('^[0-9]','',phs.strip()) \ for phs in phs_combo_str.split('+')] @classmethod def _get_reaction_phase_str(cls, rxn_eqn_str, sort=True, full_output=False ): reac_str, prod_str = str.split(rxn_eqn_str,'=') reac_str = str.strip(reac_str) prod_str = str.strip(prod_str) reac_l = cls._split_phases(reac_str) prod_l = cls._split_phases(prod_str) if sort: reac_l.sort() prod_l.sort() first_phs = [reac_l[0],prod_l[0]] first_phs.sort() if first_phs[0] == prod_l[0]: reac_l, prod_l = prod_l, reac_l rxn_eqn_str = prod_str + ' = ' + reac_str rxn_phs_str = '' for phs in reac_l[:-1]: rxn_phs_str += phs+',' rxn_phs_str += reac_l[-1] rxn_phs_str += ':' for phs in prod_l[:-1]: rxn_phs_str += phs+',' rxn_phs_str += prod_l[-1] if full_output: return rxn_phs_str, rxn_eqn_str else: return rxn_phs_str
#===================================================