# Copyright 2025-2026 Onera
# This file is part of the Noda package
# SPDX-License-Identifier: GPL-3.0-or-later
"""Load thermodynamics and mobility data."""
import json
import tomllib
import re
import io
import pandas as pd
import noda.thermo_functions as tfu
import noda.utils as ut
from noda.paths import pkg_data_dir
[docs]
def get_user_data(data_dir, logger):
"""
Get user data from 'user_data.toml' file.
If 'user_data.toml' file is not found in the user data folder, use the
package-provided file instead.
Parameters
----------
data_dir : pathlib.Path
Path of data folder.
logger : :class:`log_utils.CustomLogger`
Logger.
Returns
-------
res : dict
Content of file.
"""
fpath = data_dir / 'user_data.toml'
if fpath.is_file():
with open(fpath, 'rb') as file:
res = tomllib.load(file)
else:
with open(pkg_data_dir / 'user_data.toml', 'rb') as file:
res = tomllib.load(file)
msg = (f"No 'user_data.toml' file found in {data_dir}. Using "
"'user_data.toml' file from package installation directory "
f"instead ({pkg_data_dir}).")
logger.warning(msg)
return res
[docs]
def get_partial_molar_volume(databases, db_register, comps, default, logger):
    """
    Collect the partial molar volume of each component and of 'pore'.

    The database is resolved by :func:`get_database`: user input is either
    a database name present in the register, or directly a dict with species
    as keys. Any species without a value in the database (including the case
    where no database is specified) gets the default value, and a message is
    logged.

    Parameters
    ----------
    databases : dict
        Databases in input configuration.
    db_register : dict
        Molar volume databases in 'user_data.toml'.
    comps : list of str
        System components.
    default :
        Default value.
    logger : :class:`log_utils.CustomLogger`
        Logger.

    Raises
    ------
    :class:`utils.UserInputError`
        If a value is not a float (for 'Va' and 'pore', the string 'local'
        is accepted as well).

    Returns
    -------
    res : dict
        ``{k: V_k for k in comps + ['pore']}``

    """
    name, dct = get_database('partial_molar_volume', databases, db_register)
    res = {}
    for species in comps + ['pore']:
        special = species in ('Va', 'pore')
        try:
            res[species] = dct[species]
        except KeyError:
            res[species] = default
            msg = (f"Partial molar volume database '{name}' contains no data "
                   f"for {species}. Using system-wide default value "
                   f"({default} m3/mol) instead.")
            # Level 30 when the user explicitly chose a database and a
            # regular species is missing from it, level 18 otherwise.
            user_db_incomplete = ('partial_molar_volume' in databases
                                  and not special)
            logger.log_wrapper(30 if user_db_incomplete else 18, msg)

        # The check applies to database values and to the default alike.
        value = res[species]
        if special:
            valid = isinstance(value, float) or value == 'local'
            expected = "a float or string 'local'"
        else:
            valid = isinstance(value, float)
            expected = "a float"
        if not valid:
            msg = (f"Invalid entry for species {species} in partial molar "
                   f"volume database '{name}' (found '{value}', should be "
                   f"{expected}).")
            raise ut.UserInputError(msg) from None
    return res
[docs]
def get_database(key, databases, db_register):
    """
    Get database name and database as a dictionary.

    * If user input is a dict : return this dict (name is None)
    * If user input is a database name : get the database from registered
      databases
    * If no user input : return empty dict (name is None)

    Parameters
    ----------
    key : str
        Database type, either 'partial_molar_volume' or
        'vacancy_formation_energy'.
    databases : dict
        Databases provided in input configuration.
    db_register : dict
        Databases registered in 'user_data.toml'.

    Returns
    -------
    name : str or None
        Name of database, None when the database is not a registered one.
    dct : dict
        Database parameters as a dict.

    Raises
    ------
    utils.UserInputError
        If database is not found in 'user_data.toml'.

    """
    if key not in databases:
        return None, {}

    entry = databases[key]
    if isinstance(entry, dict):
        # Database given inline in the input configuration.
        return None, entry

    try:
        dct = db_register[entry]
    except KeyError:
        # Every fragment that interpolates a value must be an f-string,
        # otherwise the placeholder is printed literally.
        msg = (f"Unknown {key} database '{entry}'. "
               f"Please check the '{key}' table in "
               "your 'user_data.toml' file")
        raise ut.UserInputError(msg) from None
    return entry, dct
# =============================================================================
# Thermodynamic parameters
[docs]
def get_thermo_from_file(fpath, phase, comps, TK, logger):
    """
    Build the Gibbs free energy parameters from a database file.

    The database is read from a csv or spreadsheet file. If the file does
    not exist at the given path, a file of the same name is looked up in the
    package data directory and used instead (with a warning).

    Parameters
    ----------
    fpath : pathlib.Path
        Path of file with thermodynamic database.
    phase : str
        Name of metal phase.
    comps : list of str
        System components.
    TK : float
        Temperature in Kelvin.
    logger : :class:`log_utils.CustomLogger`
        Logger.

    Raises
    ------
    utils.UserInputError
        If file is not found.
    utils.UserInputError
        If element not found in database.

    Returns
    -------
    p : dict of floats
        Thermodynamic parameters arranged as follows:

        | ``A: G_A for A in endmembers``
        | ``AB: [L0, L1] for AB in binary subsystems``
        | ``ABC: [L0, L1] for ABC in ternary subsystems``

    """
    if not fpath.exists():
        fname = fpath.name
        data_dir = fpath.parent
        fallback = pkg_data_dir / fname
        if not fallback.exists():
            msg = (f"Thermodynamic database file '{fname}' not found in "
                   f"{data_dir} or in package installation directory. \n"
                   "Please provide a thermodynamic database file.")
            raise ut.UserInputError(msg) from None
        msg = (f"Thermodynamic database file '{fname}' not found in "
               f"{data_dir}. Using file from package installation "
               f"directory instead ({pkg_data_dir}).")
        logger.warning(msg)
        fpath = fallback

    # Any suffix other than '.csv' is handled as a spreadsheet.
    reader_is_csv = fpath.suffix == '.csv'
    if reader_is_csv:
        dct = get_thermo_from_csv(fpath, comps)
    else:
        dct = get_thermo_from_spreadsheet(fpath)

    # Lowercase column names add flexibility in user database files.
    for sheet in ('Elements', 'Interactions'):
        dct[sheet] = sanitize_dataframe(dct[sheet]).rename(str.lower, axis=1)

    element_columns = dct['Elements'].columns
    for element in comps:
        if element.lower() not in element_columns:
            msg = (f"Element {element} not found in thermodynamic database "
                   f"'{fpath}'.")
            raise ut.UserInputError(msg) from None

    G0 = process_elements_parameters(dct['Elements'], comps, TK, phase)
    interactions = process_interaction_parameters(dct['Interactions'],
                                                  comps,
                                                  logger)
    # Only the 'L' variable is evaluated here.
    L = make_L_isotherm(interactions['L'], TK)
    return {**G0, **L}
[docs]
def get_thermo_from_csv(fpath, comps):
    """
    Read the 'Elements' and 'Interactions' tables of a csv database.

    The sanitized file content is split on the 'Elements' and 'Interactions'
    markers; the text following each marker is parsed as a separate table.

    Parameters
    ----------
    fpath : pathlib.Path
        Path of file with thermodynamic database.
    comps : list of str
        System components.

    Returns
    -------
    dct : dict of pd.DataFrames
        Thermodynamic parameters,

        | ``'Elements': parameters related to pure elements``
        | ``'Interactions' : interactions parameters``

    """
    content = get_csv_as_string(fpath)
    sections = re.split('Elements,*|Interactions,*', content)
    dct = {}
    # Only the first len(comps) + 1 columns of the elements table are read,
    # the first one being used as index.
    n_cols = len(comps) + 1
    dct['Elements'] = pd.read_csv(io.StringIO(sections[1]),
                                  index_col=0,
                                  usecols=range(n_cols),
                                  comment='#')
    dct['Interactions'] = pd.read_csv(io.StringIO(sections[2]), comment='#')
    return dct
[docs]
def get_thermo_from_spreadsheet(fpath):
    """
    Read the 'Elements' and 'Interactions' sheets of a spreadsheet database.

    File in ods, xls or xlsx format. Requires an external dependency:

    ====== ============
    format package name
    ====== ============
    xls    xlrd
    xlsx   openpyxl
    ods    odfpy
    ====== ============

    Parameters
    ----------
    fpath : pathlib.Path
        Path of file with thermodynamic database.

    Returns
    -------
    dct : dict of pd.DataFrames
        Thermodynamic parameters,

        | ``'Elements': parameters related to pure elements``
        | ``'Interactions' : interactions parameters``

    """
    # Sheet-specific reading options: the first column of the 'Elements'
    # sheet is used as index.
    sheet_options = {'Elements': {'index_col': 0}, 'Interactions': {}}
    return {sheet: pd.read_excel(fpath, sheet_name=sheet, comment='#',
                                 **options)
            for sheet, options in sheet_options.items()}
[docs]
def process_elements_parameters(df, comps, TK, phase):
    """
    Evaluate the Gibbs free energy of the endmembers at temperature TK.

    Data is in the form of G - H_SER. See Dinsdale 1991 [#Dinsdale_1991]_.

    Parameters
    ----------
    df : pd.DataFrame
        Parameters from :func:`get_thermo_from_csv` or
        :func:`get_thermo_from_spreadsheet`.
    comps : list of str
        System components.
    TK : float
        Temperature in Kelvin.
    phase : str
        Name of metal phase.

    Returns
    -------
    dict
        Thermodynamic parameters,
        ``A: G_A for A in endmembers``

    """
    # Column names are passed through the symbol formatter so that they can
    # be looked up with the names used in comps.
    params = {}
    for column, values in df.to_dict().items():
        params[ut.format_element_symbol(column)] = values
    return {element: tfu.G0_fun(params[element], TK, phase)
            for element in comps}
[docs]
def process_interaction_parameters(df, comps, logger):
    """
    Process interaction parameters.

    The parameters belong to the following categories (variables) depending
    on the quantity they are related to:

    * L : Gibbs free energy
    * Tc : critical temperature
    * beta : magnetism

    The parameters correspond to:

    * binary interactions (orders 0 and 1)
    * ternary interactions (order 0, with `L1 = 0` for compatibility)

    They are given as:

    * order 0: A and B in `L0 = a + b*T`
    * order 1: C and D in `L1 = c + d*T`

    If a variable is not included in the input file, all parameters are set to
    0 for this variable. The entries then have the same layout as those
    produced by :func:`unit_process_interactions`, so that they can be
    consumed the same way (for instance by :func:`make_L_isotherm`).

    Parameters
    ----------
    df : pd.DataFrame
        Parameters from :func:`get_thermo_from_csv` or
        :func:`get_thermo_from_spreadsheet`.
    comps : list of str
        System components.
    logger : :class:`log_utils.CustomLogger`
        Logger.

    Returns
    -------
    di: dict
        Thermodynamic interaction parameters,
        ``{var: subdi for var in ['L', 'Tc', 'beta']}``
        where subdi is result of :func:`unit_process_interactions`.

    """
    solvents = ut.make_combinations(comps)['mix']
    di = {}
    for var in ['L', 'Tc', 'beta']:
        sub_df = df.loc[df['variable'] == var]
        if sub_df.empty:
            # One coefficient dict per subsystem (not a bare 0), matching the
            # per-row layout returned by unit_process_interactions.
            di[var] = {s: {'variable': var, 'a': 0, 'b': 0, 'c': 0, 'd': 0}
                       for s in solvents}
            # Same policy as unit_process_interactions: no warning for Tc and
            # beta, which are not always available.
            if var not in ['Tc', 'beta']:
                for s in solvents:
                    msg = f"{s} interaction parameters for variable '{var}' "
                    msg += "are missing. Using 0 as default."
                    logger.warning(msg)
        else:
            sub_df = sub_df.set_index('solvent')
            di[var] = unit_process_interactions(sub_df, solvents, logger)
    return di
[docs]
def unit_process_interactions(df, solvents, logger):
    """
    Convert the interaction parameters of one variable to a dict.

    Sanitary checks are applied along the way. For each subsystem in
    solvents, the number of dataframe rows whose label is a permutation of
    the subsystem decides what is done:

    * none: add a row with all interaction parameters set to 0
    * one: use this row, relabelled with the subsystem name if needed
    * several: raise exception.

    Parameters
    ----------
    df : dataframe
        Thermodynamic interaction parameters for one variable, indexed by
        solvent (constituents of subsystem concatenated to string). Columns:

        * variable : either of L, Tc or beta (see
          :func:`process_interaction_parameters`).
        * a, b, c, d : interaction parameters, with

          * order 0 = a + b*T
          * order 1 = c + d*T

    solvents : list of str
        Binary and ternary subsystems, concatenated to strings.
    logger : :class:`log_utils.CustomLogger`
        Logger.

    Raises
    ------
    utils.UserInputError
        If several equivalent subsystems (ie permutations of the same
        subsystem) are present in the database.

    Returns
    -------
    di_reduced : dict of dicts
        Thermodynamic interaction parameters, one entry per subsystem in
        solvents, each holding the row content (including a, b, c, d).

    """
    variable = df['variable'].values[0]
    # Tc and beta parameters are not always available and are not used in
    # Noda, so missing values are not reported for these variables.
    silent = variable in ['Tc', 'beta']

    for subsystem in solvents:
        equivalents = ut.make_permutations_samesize(subsystem)
        matches = [label for label in df.index if label in equivalents]

        if len(matches) > 1:
            msg = (f"{subsystem} interaction parameters for {variable}\n"
                   f"Several equivalent solvents given: {matches}")
            raise ut.UserInputError(msg) from None

        if not matches:
            df.loc[subsystem] = [variable, 0, 0, 0, 0]
            if not silent:
                msg = (f"{subsystem} interaction parameters for variable "
                       f"'{variable}' are missing. Using 0 as default.")
                logger.warning(msg)
            continue

        found = matches[0]
        if found == subsystem:
            continue

        # The row is labelled with a permutation of the subsystem name.
        df = df.rename(index={found: subsystem})
        if len(equivalents) == 2:
            # Binary subsystem with the two elements reversed: since
            #   G = ... + x_i*x_j*(L0_ij + L1_ij*(x_i - x_j)) + ...
            # with L1_ij = c + d*T, the order 1 parameters change sign.
            # Ternary subsystems have no order 1 parameters for the moment,
            # so the order of elements does not matter there.
            df.loc[subsystem, 'c'] *= -1
            df.loc[subsystem, 'd'] *= -1

    rows = df.to_dict(orient='index')
    return {subsystem: rows[subsystem] for subsystem in solvents}
[docs]
def make_L_isotherm(L, T):
    """
    Compute the order 0 and order 1 interaction parameters at temperature T.

    Parameters
    ----------
    L : dict
        Interaction parameters,
        ``{k: {letter: val for letter in 'abcd'} for k in solvents}``.
    T : float or int
        Temperature in Kelvin.

    Returns
    -------
    res : dict of lists
        Interaction parameters, ``{k: [L0, L1] for k in solvents}``, with
        ``L0 = a + b*T`` and ``L1 = c + d*T``.

    """
    res = {}
    for solvent, coeffs in L.items():
        order0 = coeffs['a'] + coeffs['b']*T
        order1 = coeffs['c'] + coeffs['d']*T
        res[solvent] = [order0, order1]
    return res
# =============================================================================
# Mobility parameters
[docs]
def get_mob_from_file(fpath, comps, TK, logger):
    """
    Get mobility parameters from input file.

    File formats currently supported:

    * xls, xlsx, ods or csv: database from literature
    * json: database from OPTIMOB.

    If the file does not exist at the given path, a file of the same name is
    looked up in the package data directory and used instead (with a
    warning).

    Parameters
    ----------
    fpath : pathlib.Path
        Path of file with mobility database.
    comps : list of str
        System components.
    TK : float
        Temperature in Kelvin.
    logger : :class:`log_utils.CustomLogger`
        Logger.

    Raises
    ------
    utils.UserInputError
        If file is not found or file format not accepted.

    Returns
    -------
    p : dict of dicts
        ``{i: subdict for i in comps}``
        subdict: ``{j: val for j in subsystems}``.

    """
    if not fpath.exists():
        fname = fpath.name
        data_dir = fpath.parents[0]
        pkg_fpath = pkg_data_dir / fname
        if pkg_fpath.exists():
            fpath = pkg_fpath
            msg = (f"Mobility database file '{fname}' not found in "
                   f"{data_dir}. Using file from package installation "
                   f"directory instead ({pkg_data_dir}).")
            logger.warning(msg)
        else:
            msg = (f"Mobility database file '{fname}' not found in "
                   f"{data_dir} or in package installation directory. \n"
                   "Please provide a mobility database file.")
            raise ut.UserInputError(msg) from None

    ext = fpath.suffix
    if ext in ['.xls', '.xlsx', '.ods', '.csv']:
        p = get_mob_from_spreadsheet(fpath, comps, TK)
    elif ext == '.json':
        p = get_mob_from_json(fpath, comps, TK)
    else:
        # The suffix already includes the leading dot.
        msg = f'Input file format ({ext}) not accepted.'
        raise ut.UserInputError(msg) from None
    return p
[docs]
def get_mob_from_spreadsheet(fpath, comps, TK):
    """
    Get mobility parameters.

    Data retrieved from csv or spreadsheet file. The latter requires an
    external dependency:

    ====== ============
    format package name
    ====== ============
    xls    xlrd
    xlsx   openpyxl
    ods    odfpy
    ====== ============

    Column names are matched case-insensitively ('Solute' and 'solute' are
    both accepted).

    Parameters
    ----------
    fpath : pathlib.Path
        Path of file with mobility database.
    comps : list of str
        System components.
    TK : float
        Temperature in Kelvin.

    Returns
    -------
    p : dict of dicts
        ``{i: subdict for i in comps}``
        subdict: ``{j: a + b*TK for j in subsystems}``

    """
    if fpath.suffix == '.csv':
        csv_string = get_csv_as_string(fpath)
        df = pd.read_csv(io.StringIO(csv_string), comment='#')
    else:
        df = pd.read_excel(fpath, comment='#')
    df = sanitize_dataframe(df)
    # Make column names lowercase to add flexibility in user database files.
    # Use dropna first to avoid TypeError (nan due to empty column in
    # database). This must happen before any column is accessed by name,
    # otherwise a capitalized header such as 'Solute' is not found.
    df = df.dropna(how='all', axis='columns').rename(str.lower, axis="columns")
    df['solute'] = df['solute'].apply(ut.format_element_symbol)

    solvents = ut.make_combinations(comps)['all']
    p = {}
    for i in comps:
        p[i] = {}
        for j in solvents:
            redf = get_reduced_df(df, j, i)
            a = redf['a'].values[0]
            b = redf['b'].values[0]
            p[i][j] = a + b*TK
    return p
[docs]
def get_mob_from_json(fpath, comps, TK):
    """
    Get mobility parameters from json file.

    The file is expected to contain entries whose key includes 'exp ', each
    with a 'TC' temperature, and a 'popt' entry holding the parameters as
    ``popt[solute][solvent]['value']``.

    Parameters
    ----------
    fpath : pathlib.Path
        Path of file with mobility database.
    comps : list of str
        System components.
    TK : float
        Temperature in Kelvin.

    Raises
    ------
    utils.UserInputError
        If the file contains no 'exp ' entry, if the 'exp ' entries do not
        all have the same temperature, or if this temperature is not
        compatible with TK.

    Returns
    -------
    p : dict of dicts
        ``{i: subdict for i in comps}``
        subdict: ``{j: L0 for j in subsystems}``

    """
    with open(fpath, 'r', encoding='utf-8') as f:
        di_full = json.load(f)

    # Check that temperature is correct
    TC_file_list = [entry['TC'] for key, entry in di_full.items()
                    if 'exp ' in key]
    if not TC_file_list:
        msg = (f"No 'exp ' entry found in mobility database '{fpath}': "
               "cannot check the temperature of the optimized parameters.")
        raise ut.UserInputError(msg) from None
    TC_file = TC_file_list[0]
    # Explicit check rather than assert, which is skipped with python -O.
    if any(val != TC_file for val in TC_file_list):
        msg = (f"Inconsistent temperatures found in mobility database "
               f"'{fpath}': {TC_file_list}")
        raise ut.UserInputError(msg) from None
    TC = TK - 273.15
    # Tolerance absorbs the floating-point noise of the Kelvin to Celsius
    # conversion; an exact comparison can fail on matching temperatures.
    if abs(TC - round(TC_file, 0)) > 1e-6:
        msg = f'Optimized parameters are valid at {TC_file} *C, '
        msg += f'not compatible with simulation at {TC} *C.'
        raise ut.UserInputError(msg) from None

    # Get nested dicts with parameters
    di = di_full["popt"]

    # Keep values for required solute-solvent combinations
    solvents = ut.make_combinations(comps)['all']
    p = {}
    for i in comps:
        p[i] = {}
        for j in solvents:
            p[i][j] = di[i][j]['value']
    return p
[docs]
def get_reduced_df(df, solvent, solute):
    """
    Select the row holding mobility data for solute in solvent.

    Solvent names are compared case-insensitively, and any permutation of
    the solvent constituents is accepted.

    Parameters
    ----------
    df : dataframe
        Parameters to compute mobility of solutes in solvents.
    solvent : str
        Solvent of interest (constituents concatenated to string).
    solute : str
        Solute of interest.

    Raises
    ------
    utils.UserInputError
        If no matching row is found, or if several equivalent solvents (ie
        permutations of the same solvent) are present in the df.

    Returns
    -------
    res : dataframe
        Reduced dataframe.

    """
    accepted = [name.lower()
                for name in ut.make_permutations_samesize(solvent)]
    solute_matches = df.solute == solute
    solvent_matches = df.solvent.apply(str.lower).isin(accepted)
    res = df[solute_matches & solvent_matches]

    n_rows = len(res.index)
    if n_rows == 0:
        msg = f"Mobility parameters for {solute} in {solvent} are missing. "
        raise ut.UserInputError(msg) from None
    if n_rows > 1:
        msg = (f"Mobility parameters for {solute} in {solvent}. "
               f"Several equivalent solvents given: {res.solvent.values}")
        raise ut.UserInputError(msg) from None
    return res
[docs]
def get_csv_as_string(fpath):
    """Read csv file, clean up its lines and return them as one string."""
    kept = []
    with open(fpath, 'r', encoding='utf-8') as file:
        for raw_line in file:
            # Use ',' as the only separator before loading with pd.read_csv.
            # This avoids sep=',|;', which triggers engine='Python' and a
            # different line counting behavior.
            line = raw_line.replace(';', ',')
            # Skip lines that are empty or made of separators only.
            if set(line.strip()) <= {','}:
                continue
            # Remove the quote before # (can be inserted when converting to
            # csv).
            kept.append(line.replace('"#', '#'))
    return ''.join(kept)
[docs]
def sanitize_dataframe(df):
    """
    Get rid of empty lines and columns and rebuild if first line was empty.

    pd.read_excel uses the first line as header even if it is empty (then
    columns are named 'Unnamed N'). Empty lines are deleted first to make
    sure the line used as header when rebuilding is not empty.

    Deleting empty columns is needed to apply str.lower on column names (see
    :func:`get_thermo_from_file`).

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe as loaded from the database file.

    Returns
    -------
    df : pd.DataFrame
        Dataframe without empty lines and columns. If all remaining column
        names start with 'Unnamed', the first remaining line is used as
        header. A dataframe left with no line or no column is returned as
        is.

    """
    df = df.dropna(how='all', axis=0).dropna(how='all', axis=1)
    # Nothing to rebuild from when no line or column is left (all() would be
    # vacuously true and df.iloc[0] would fail).
    if df.empty:
        return df
    # Column labels are not necessarily strings (e.g. numeric headers); such
    # labels are regular headers, not pandas placeholders.
    if all(isinstance(x, str) and x.startswith('Unnamed')
           for x in df.columns):
        df = pd.DataFrame(df.values[1:], columns=df.iloc[0],
                          index=df.index[1:])
    return df