Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 95 additions & 19 deletions interfaces/ASE_interface/abacuslite/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import tempfile
import unittest
from pathlib import Path
from typing import Dict, Optional, List, Tuple, Set
from typing import Dict, Optional, List

import numpy as np
from ase.calculators.genericfileio import (
Expand All @@ -54,6 +54,7 @@
read_input,
read_stru,
read_kpt,
species_group_indices,
write_input,
write_stru,
write_kpt
Expand Down Expand Up @@ -142,7 +143,7 @@ def version(self) -> str:
class AbacusTemplate(CalculatorTemplate):

implemented_properties = [
'energy', 'forces', 'stress', 'free_energy', 'magmom', 'dipole'
'energy', 'forces', 'stress', 'free_energy', 'magmom'
]
_label = 'abacus'

Expand Down Expand Up @@ -184,17 +185,13 @@ def get_free_energy_keywords(self) -> Dict[str, str]:
@staticmethod
def get_magmom_keywords(self) -> Dict[str, str]:
return {'nspin': '2'}

@staticmethod
def get_dipole_keywords(self) -> Dict[str, str]:
return {'esolver_type': 'tddft', 'out_dipole': '1'}

def get_property_keywords(self,
parameters: Dict[str, str],
properties: List[str]) -> Dict[str, str]:
'''Connect the relationship between the properties calculation and
the ABACUS keywords. May be more complicated in the future, therefore
it is better to have a seperate mapping function instead of
it is better to have a separate mapping function instead of
implementing in some other functions.

Parameters
Expand All @@ -204,17 +201,30 @@ def get_property_keywords(self,
properties : list of str
The list of properties to calculate
'''
# update the parameters with the keywords for the properties
# however, one should also consider that there may be the case that
# contradictory keywords are needed. In this kind of cases,
# we should raise a ValueError
param_cache_ = {}
def keyword_compare_value(value):
if isinstance(value, bool):
Comment thread
kirk0830 marked this conversation as resolved.
return '1' if value else '0'
if isinstance(value, (list, tuple, set)):
return ' '.join(str(i) for i in value)
return str(value)

param_cache_ = {
key: keyword_compare_value(value)
for key, value in parameters.items()
if value is not None
}

def counter(param_new: Dict[str, str]) -> Dict[str, str]:
info = 'desired properties required contradictory keywords'
info = 'desired properties or explicit parameters required contradictory keywords'
staged = {}
for k, v in param_new.items():
if k in param_cache_ and param_cache_[k] != v:
if v is None:
continue
normalized_value = keyword_compare_value(v)
if k in param_cache_ and param_cache_[k] != normalized_value:
raise ValueError(f'{info}: {k}={v} (now), {param_cache_[k]} (before)')
# if it is alright, pass through
staged[k] = normalized_value
param_cache_.update(staged)
return param_new

# update the parameters with the keywords for the properties
Expand Down Expand Up @@ -260,9 +270,9 @@ def write_input(self,

# STRU
_ = file_safe_backup(directory / parameters.get('stru_file', 'STRU'))
# reorder the atoms according to the alphabet. Keep the reverse map
# group atoms by first-occurrence species order. Keep the reverse map
# so that we will recover the order in function read_results()
ind = sorted(range(len(atoms)), key=lambda i: atoms[i].symbol)
ind = species_group_indices(atoms.get_chemical_symbols())
self.atomorder = sorted(range(len(atoms)), key=lambda i: ind[i]) # revmap
# then we write
_ = write_stru(atoms[ind],
Expand Down Expand Up @@ -297,7 +307,7 @@ def write_input(self,
# array, convert to the string spaced by whitespace
for k, v in parameters.items():
# if the v is iterable, convert to the string spaced by whitespace
if isinstance(v, (List, Tuple, Set)):
if isinstance(v, (list, tuple, set)):
parameters[k] = ' '.join(str(i) for i in v)
dst = directory / self.inputname
_ = file_safe_backup(dst)
Expand Down Expand Up @@ -663,5 +673,71 @@ def test_version_number_check(self):
self.assertFalse(switch_io_backend_version('v3.11.0-beta.2'))
self.assertFalse(switch_io_backend_version('v3.11.0'))

def test_property_keywords_reject_conflicting_user_parameters(self):
template = AbacusTemplate()
with self.assertRaises(ValueError):
template.get_property_keywords({'nspin': 1}, ['magmom'])

parameters = template.get_property_keywords({'nspin': 2}, ['magmom'])
self.assertEqual(str(parameters['nspin']), '2')

def test_property_keywords_accept_equivalent_boolean_user_parameters(self):
template = AbacusTemplate()

parameters = template.get_property_keywords(
{'cal_force': True, 'cal_stress': True},
['forces', 'stress']
)

self.assertEqual(str(parameters['cal_force']), '1')
self.assertEqual(str(parameters['cal_stress']), '1')

def test_property_keywords_reject_conflicting_boolean_user_parameters(self):
template = AbacusTemplate()

with self.assertRaises(ValueError):
template.get_property_keywords({'cal_force': False}, ['forces'])

with self.assertRaises(ValueError):
template.get_property_keywords({'cal_stress': False}, ['stress'])

def test_property_keywords_treat_string_values_as_scalars(self):
template = AbacusTemplate()
template.implemented_properties = ['probe']
template.get_probe_keywords = lambda parameters: {'custom_switch': 'true'}

parameters = template.get_property_keywords(
{'custom_switch': 'true'}, ['probe']
)

self.assertEqual(parameters['custom_switch'], 'true')

def test_property_keywords_compare_iterables_like_input_writer(self):
template = AbacusTemplate()
template.implemented_properties = ['probe']
template.get_probe_keywords = lambda parameters: {
'custom_vector': [1, 'true']
}

with self.assertRaises(ValueError):
template.get_property_keywords(
{'custom_vector': [True, 'true']}, ['probe']
)

def test_property_keywords_reject_conflicting_properties(self):
template = AbacusTemplate()
template.implemented_properties = ['prop_a', 'prop_b']
template.get_prop_a_keywords = lambda parameters: {'calculation': 'scf'}
template.get_prop_b_keywords = lambda parameters: {'calculation': 'md'}

with self.assertRaises(ValueError):
template.get_property_keywords({}, ['prop_a', 'prop_b'])

def test_dipole_property_is_not_implemented(self):
template = AbacusTemplate()
self.assertNotIn('dipole', template.implemented_properties)
with self.assertRaises(AssertionError):
template.get_property_keywords({}, ['dipole'])

if __name__ == '__main__':
unittest.main()
unittest.main()
134 changes: 111 additions & 23 deletions interfaces/ASE_interface/abacuslite/io/generalio.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import numpy as np
from ase.atoms import Atoms
from ase.build import bulk
from ase.constraints import FixAtoms, FixCartesian
from ase.data import chemical_symbols, atomic_masses
ATOM_MASS = dict(zip(chemical_symbols, atomic_masses.tolist()))

Expand Down Expand Up @@ -84,16 +85,17 @@ def file_safe_backup(fn: Path, suffix: str = 'bak'):
'''
assert isinstance(fn, Path)
where = fn.parent
prefix = f'{fn.name}.{suffix}.'
indexed_backups = []

# get the backup files
fbak = sorted(list(where.glob(f'{fn.name}.{suffix}.*')),
key=lambda p: int(p.name.split('.')[-1]))
if fbak:
# rename the elder by adding 1 to the suffix
for i, f in enumerate(fbak[::-1]): # reverse order, to avoid overwrite
j = len(fbak) - i + 1 #: STRU.bak.i -> STRU.bak.i+1
fname = f.name.replace(f'.{j}', f'.{j+1}')
f.rename(f.parent / fname)
for backup in where.glob(f'{fn.name}.{suffix}.*'):
index_text = backup.name.removeprefix(prefix)
if not re.fullmatch(r'0|[1-9][0-9]*', index_text):
continue
indexed_backups.append((int(index_text), backup))

for backup_index, backup in sorted(indexed_backups, key=lambda item: item[0], reverse=True):
backup.rename(backup.parent / f'{fn.name}.{suffix}.{backup_index + 1}')

# backup the latest file, if there is one
if fn.exists():
Expand Down Expand Up @@ -177,6 +179,29 @@ def _write_stru(job_dir, stru, fname='STRU'):

f.write('\n')

def species_group_indices(symbols: List[str]) -> List[int]:
"""Return indices grouped by first-occurrence species order."""
species_order = list(dict.fromkeys(symbols))
return [i for species in species_order for i, symbol in enumerate(symbols) if symbol == species]


def _constraint_mobility(atoms: Atoms) -> np.ndarray:
"""Return ABACUS mobility flags derived from ASE constraints."""
mobility = np.ones((len(atoms), 3), dtype=int)
for constraint in atoms.constraints:
if isinstance(constraint, FixAtoms):
mobility[constraint.get_indices()] = 0
elif isinstance(constraint, FixCartesian):
indices = np.asarray(constraint.get_indices(), dtype=int)
mask = np.asarray(constraint.mask, dtype=bool)
if mask.ndim == 1:
mobility[np.ix_(indices, np.where(mask)[0])] = 0
else:
for atom_index, atom_mask in zip(indices, mask):
mobility[atom_index, atom_mask] = 0
return mobility


def write_stru(stru: Atoms,
outdir: str,
pp_file: Optional[Dict[str, str]],
Expand Down Expand Up @@ -216,17 +241,20 @@ def write_stru(stru: Atoms,

elem = stru.get_chemical_symbols()
# ABACUS requires the atoms ranged species-by-species, therefore
# we need to sort the atoms by species
ind = np.argsort(elem)
# we need to group atoms by species. Preserve first-occurrence species
# order from ASE instead of forcing alphabetical order.
ind = species_group_indices(elem)
coords = stru.get_positions()[ind]
mobility = _constraint_mobility(stru)[ind]
elem = [elem[i] for i in ind]

# handle the atomic magnetic moment (issue #6516)
magmoms = np.array([stru[i].magmom for i in ind]).reshape(len(stru), -1) # ncol in [1, 3]
magmoms = [{} if abs(np.linalg.norm(m)) <= 1e-10
else {'mag': m[0] if len(m) == 1 else ('Cartesian', m.tolist())}
for m in magmoms]
elem_uniq, nat = np.unique(elem, return_counts=True)
elem_uniq = list(dict.fromkeys(elem))
nat = np.array([elem.count(e) for e in elem_uniq])
stru_dict = {
'coord_type': 'Cartesian',
'lat': {
Expand All @@ -244,7 +272,7 @@ def write_stru(stru: Atoms,
'atom': [
magmoms[j] | {
'coord': coords[j].tolist(), # coordinate
'm': [1, 1, 1], # mobility
'm': mobility[j].tolist(), # mobility
'v': [0.0, 0.0, 0.0], # velocity
} for j in range(np.sum(nat[:i]), np.sum(nat[:i+1]))
]
Expand Down Expand Up @@ -531,7 +559,6 @@ def _read_kline(raw: List[str]) -> Dict[str, Any]:
assert all(m for m in mymatch), \
'Invalid KPT file, expected the k-points to be in the format ' \
'"x y z n # comment"'
print(raw)
return {
'mode': 'line',
'coordinate': 'Cartesian' if raw[2].lower().endswith('cartesian') else 'Direct',
Expand Down Expand Up @@ -632,6 +659,25 @@ def test_input_io(self):
self.assertDictEqual(data, data_)
# will automatically delete the file after the context manager

def test_file_safe_backup_rotates_numbered_backups(self):
with tempfile.TemporaryDirectory() as tmpdir:
workdir = Path(tmpdir)
live = workdir / 'STRU'
live.write_text('live')
(workdir / 'STRU.bak.0').write_text('bak0')
(workdir / 'STRU.bak.1').write_text('bak1')
(workdir / 'STRU.bak.01').write_text('bak01')
(workdir / 'STRU.bak.note').write_text('note')

file_safe_backup(live)

self.assertFalse(live.exists())
self.assertEqual((workdir / 'STRU.bak.0').read_text(), 'live')
self.assertEqual((workdir / 'STRU.bak.1').read_text(), 'bak0')
self.assertEqual((workdir / 'STRU.bak.2').read_text(), 'bak1')
self.assertEqual((workdir / 'STRU.bak.01').read_text(), 'bak01')
self.assertEqual((workdir / 'STRU.bak.note').read_text(), 'note')

def test_stru_io(self):
from ase.units import Bohr, Angstrom
nacl = bulk('NaCl', 'rocksalt', a=5.64)
Expand Down Expand Up @@ -676,14 +722,56 @@ def test_stru_io(self):
self.assertEqual(a['m'], [1, 1, 1])
self.assertEqual(a['v'], [0.0, 0.0, 0.0])

self.assertEqual(stru_['species'][0]['symbol'], 'Cl')
self.assertEqual(stru_['species'][1]['symbol'], 'Na')
self.assertEqual(stru_['species'][0]['mass'], ATOM_MASS['Cl'])
self.assertEqual(stru_['species'][1]['mass'], ATOM_MASS['Na'])
self.assertEqual(stru_['species'][0]['pp_file'], 'Cl.pz-bhs.UPF')
self.assertEqual(stru_['species'][1]['pp_file'], 'Na.pz-bhs.UPF')
self.assertEqual(stru_['species'][0]['orb_file'], 'Cl_gga_6au_100Ry_2s2p1d.orb')
self.assertEqual(stru_['species'][1]['orb_file'], 'Na_gga_6au_100Ry_2s2p1d.orb')
self.assertEqual(stru_['species'][0]['symbol'], 'Na')
self.assertEqual(stru_['species'][1]['symbol'], 'Cl')
self.assertEqual(stru_['species'][0]['mass'], ATOM_MASS['Na'])
self.assertEqual(stru_['species'][1]['mass'], ATOM_MASS['Cl'])
self.assertEqual(stru_['species'][0]['pp_file'], 'Na.pz-bhs.UPF')
self.assertEqual(stru_['species'][1]['pp_file'], 'Cl.pz-bhs.UPF')
self.assertEqual(stru_['species'][0]['orb_file'], 'Na_gga_6au_100Ry_2s2p1d.orb')
self.assertEqual(stru_['species'][1]['orb_file'], 'Cl_gga_6au_100Ry_2s2p1d.orb')

def test_write_stru_preserves_first_occurrence_species_order(self):
atoms = Atoms(
symbols=['C', 'C', 'Pt', 'H', 'H'],
positions=np.zeros((5, 3)),
cell=np.eye(3),
)

with tempfile.TemporaryDirectory() as tmpdir:
write_stru(
atoms,
outdir=tmpdir,
pp_file={'C': 'C.upf', 'Pt': 'Pt.upf', 'H': 'H.upf'},
)
stru = read_stru(Path(tmpdir) / 'STRU')

self.assertEqual([s['symbol'] for s in stru['species']], ['C', 'Pt', 'H'])
self.assertEqual([s['natom'] for s in stru['species']], [2, 1, 2])

def test_write_stru_uses_ase_constraints_for_mobility(self):
atoms = Atoms(
symbols=['C', 'C', 'Pt', 'H'],
positions=np.zeros((4, 3)),
cell=np.eye(3),
)
atoms.set_constraint([
FixAtoms(indices=[0]),
FixCartesian(2, mask=[True, False, True]),
])

with tempfile.TemporaryDirectory() as tmpdir:
write_stru(
atoms,
outdir=tmpdir,
pp_file={'C': 'C.upf', 'Pt': 'Pt.upf', 'H': 'H.upf'},
)
stru = read_stru(Path(tmpdir) / 'STRU')

self.assertEqual(stru['species'][0]['atom'][0]['m'], [0, 0, 0])
self.assertEqual(stru['species'][0]['atom'][1]['m'], [1, 1, 1])
self.assertEqual(stru['species'][1]['atom'][0]['m'], [0, 1, 0])
self.assertEqual(stru['species'][2]['atom'][0]['m'], [1, 1, 1])

def test_kpt_io(self):
kpt = {
Expand Down Expand Up @@ -818,4 +906,4 @@ def test_write_stru_with_magmom(self):
self.assertEqual(stru['species'][2]['atom'][0]['mag'], ('Cartesian', [0.0, 0.0, 3.0]))

if __name__ == '__main__':
unittest.main()
unittest.main()
Loading
Loading