diff --git a/emmet-cli/emmet/cli/db.py b/emmet-cli/emmet/cli/db.py
new file mode 100644
index 0000000000..49479950ee
--- /dev/null
+++ b/emmet-cli/emmet/cli/db.py
@@ -0,0 +1,310 @@
+"""Instantiate database objects for the emmet CLI."""
+from __future__ import annotations
+from bson import ObjectId
+import json
+import logging
+from maggma.core import Store
+from maggma.stores import GridFSStore, MongoStore, MongoURIStore, S3Store
+from monty.json import jsanitize, MontyDecoder, MontyEncoder
+from pymongo import ReturnDocument
+from typing import Literal, TYPE_CHECKING, Union, Optional
+import zlib
+
+from emmet.core.utils import utcnow
+
+if TYPE_CHECKING:
+    from typing import Any
+
+logger = logging.getLogger("emmet")
+
+
+class TaskStore:
+    """Store task documents and their large objects (GridFS or S3)."""
+
+    _get_store_from_type: dict[str, type[Store]] = {
+        "mongo": MongoStore,
+        "s3": S3Store,
+        "gridfs": GridFSStore,
+        "mongo_uri": MongoURIStore,
+    }
+
+    _object_names: tuple[str, ...] = (
+        "dos",
+        "bandstructure",
+        "chgcar",
+        "locpot",
+        "aeccar0",
+        "aeccar1",
+        "aeccar2",
+        "elfcar",
+    )
+
+    def __init__(
+        self,
+        store_kwargs: dict,
+        store_type: Optional[Literal["mongo", "s3", "gridfs", "mongo_uri"]] = None,
+    ) -> None:
+        self._store_kwargs = store_kwargs
+        self._store_type = store_type
+
+        if all(
+            store_kwargs.get(k)
+            for k in (
+                "@module",
+                "@class",
+            )
+        ):
+            self.store = MontyDecoder().process_decoded(store_kwargs)
+
+        elif store_type and self._get_store_from_type.get(store_type):
+            store = self._get_store_from_type[store_type]
+            store_kwargs = {
+                k: v
+                for k, v in store_kwargs.items()
+                if k
+                in Store.__init__.__code__.co_varnames
+                + store.__init__.__code__.co_varnames
+            }
+            self.store = store(**store_kwargs)
+        else:
+            raise ValueError("TaskStore cannot construct desired store!")
+
+        self.store.connect()
+        self.db = self.store._coll
+        # look up the tasks collection from the original (unfiltered) kwargs,
+        # falling back to collection_name if no separate "collection" was given
+        self.collection = self.db[
+            self._store_kwargs.get(
+                "collection", self._store_kwargs.get("collection_name")
+            )
+        ]
+
+        self.large_data_store = None
+        if isinstance(self.store, (MongoStore, MongoURIStore)):
+            gridfs_store_kwargs = store_kwargs.copy()
+            gridfs_store_kwargs["collection_name"] = self._store_kwargs.get(
+                "gridfs_collection", gridfs_store_kwargs["collection_name"]
+            )
+            self.large_data_store = GridFSStore(**gridfs_store_kwargs)
+
+        elif isinstance(self.store, S3Store):
+            self.large_data_store = self.store
+
+        if self.large_data_store:
+            self.large_data_store.connect()
+            self.large_data_db = self.large_data_store._coll
+
+    @classmethod
+    def from_db_file(cls, db_file) -> TaskStore:
+        from monty.serialization import loadfn
+
+        store_kwargs = loadfn(db_file, cls=None)
+        if store_kwargs.get("collection") and not store_kwargs.get("collection_name"):
+            store_kwargs["collection_name"] = store_kwargs["collection"]
+
+        store_kwargs.pop("aliases", None)
+
+        if not all(store_kwargs.get(key) for key in ("username", "password")):
+            for mode in ("admin", "readonly"):
+                if all(
+                    store_kwargs.get(f"{mode}_{key}") for key in ("user", "password")
+                ):
+                    store_kwargs["username"] = store_kwargs[f"{mode}_user"]
+                    store_kwargs["password"] = store_kwargs[f"{mode}_password"]
+                    break
+
+        return cls(store_kwargs, store_type="mongo")
+
+    def insert(
+        self, dct: dict, update_duplicates: bool = True
+    ) -> Optional[Union[int, str]]:
+        """
+        Insert the task document into the database collection.
+
+        Args:
+            dct (dict): the task document
+            update_duplicates (bool): whether to update an existing duplicate
+        """
+
+        result = self.collection.find_one(
+            {"dir_name": dct["dir_name"]}, ["dir_name", "task_id"]
+        )
+        if result is None or update_duplicates:
+            dct["last_updated"] = utcnow()
+            if result is None:
+                logger.info("No duplicate!")
+                if ("task_id" not in dct) or (not dct["task_id"]):
+                    dct["task_id"] = self.db.counter.find_one_and_update(
+                        {"_id": "taskid"},
+                        {"$inc": {"c": 1}},
+                        return_document=ReturnDocument.AFTER,
+                    )["c"]
+                logger.info(
+                    f"Inserting {dct['dir_name']} with taskid = {dct['task_id']}"
+                )
+            elif update_duplicates:
+                dct["task_id"] = result["task_id"]
+                logger.info(
+                    f"Updating {dct['dir_name']} with taskid = {dct['task_id']}"
+                )
+            dct = jsanitize(dct, allow_bson=True)
+            self.collection.update_one(
+                {"dir_name": dct["dir_name"]}, {"$set": dct}, upsert=True
+            )
+            return dct["task_id"]
+
+        else:
+            logger.info(f"Skipping duplicate {dct['dir_name']}")
+            return None
+
+    def insert_task(self, task_doc: dict) -> int:
+        """
+        Insert a task document into the database.
+
+        Handles putting DOS, band structure and charge density into GridFS
+        (or the configured S3 store) as needed. During testing, a fraction of
+        runs on some clusters had corrupted AECCAR files even when everything
+        else about the calculation looked OK, so a quick check is done and the
+        AECCARs are only recorded if they are valid.
+
+        Args:
+            task_doc (dict): the task document
+        Returns:
+            int: task_id of the inserted document
+        """
+
+        big_data_to_store = {}
+
+        def extract_from_calcs_reversed(obj_key: str) -> Any:
+            """
+            Pull the data under calcs_reversed.0.<obj_key> out of the task
+            document so it can be stored in GridFS or a Maggma store.
+
+            Args:
+                obj_key: Key of the data in calcs_reversed.0 to store
+            """
+            calcs_r_data = task_doc["calcs_reversed"][0][obj_key]
+
+            # remove the big object from all calcs_reversed entries; this catches
+            # situations where the data was added to more than one calc.
+            for i_calcs in range(len(task_doc["calcs_reversed"])):
+                if obj_key in task_doc["calcs_reversed"][i_calcs]:
+                    del task_doc["calcs_reversed"][i_calcs][obj_key]
+            return calcs_r_data
+
+        # drop the data from the task document and keep it in a separate
+        # dictionary (big_data_to_store)
+        if self.large_data_store and task_doc.get("calcs_reversed"):
+            for data_key in self._object_names:
+                if data_key in task_doc["calcs_reversed"][0]:
+                    big_data_to_store[data_key] = extract_from_calcs_reversed(data_key)
+
+        # insert the task document
+        t_id = self.insert(task_doc)
+
+        if "calcs_reversed" in task_doc:
+            # upload the data to big-object storage and store the reference
+            # to that location in the task database
+            for data_key, data_val in big_data_to_store.items():
+                fs_id_, compression_type_ = self.insert_object(
+                    dct=data_val,
+                    collection=f"{data_key}_fs",
+                    task_id=t_id,
+                )
+                self.collection.update_one(
+                    {"task_id": t_id},
+                    {
+                        "$set": {
+                            f"calcs_reversed.0.{data_key}_compression": compression_type_
+                        }
+                    },
+                )
+                self.collection.update_one(
+                    {"task_id": t_id},
+                    {"$set": {f"calcs_reversed.0.{data_key}_fs_id": fs_id_}},
+                )
+        return t_id
+
+    def insert_object(self, *args, **kwargs) -> tuple[str, Optional[str]]:
+        """Insert an object into big-object storage.
+
+        Uses GridFS when the large data store is a GridFSStore; otherwise the
+        object is written to the configured Maggma store.
+
+        Returns:
+            fs_id: the id of the stored object
+            compression_type: the compression method applied to the stored object
+        """
+        if isinstance(self.large_data_store, GridFSStore):
+            return self.insert_gridfs(*args, **kwargs)
+        else:
+            return self.insert_maggma_store(*args, **kwargs)
+
+    def insert_gridfs(
+        self,
+        dct: dict,
+        compression_type: Optional[Literal["zlib"]] = "zlib",
+        oid: Optional[Union[str, ObjectId]] = None,
+        task_id: Optional[Union[int, str]] = None,
+        collection: Optional[str] = None,
+    ) -> tuple[str, Optional[str]]:
+        """
+        Insert the given document into GridFS.
+
+        Args:
+            dct (dict): the document
+            compression_type ("zlib" or None): compression to apply before storing
+            oid (str or ObjectId): the _id of the file; if specified, it must not
+                already exist in GridFS
+            task_id (int or str): the task_id to store in the GridFS metadata
+            collection (str): accepted for signature compatibility with
+                insert_maggma_store; the GridFS bucket is fixed when the store
+                is constructed
+        Returns:
+            file id, the type of compression used.
+        """
+        oid = oid or ObjectId()
+        if isinstance(oid, ObjectId):
+            oid = str(oid)
+
+        # always perform the string conversion when inserting directly to gridfs
+        dct = json.dumps(dct, cls=MontyEncoder)
+        if compression_type == "zlib":
+            d = zlib.compress(dct.encode())
+        else:
+            d = dct.encode()
+
+        metadata = {"compression": compression_type}
+        if task_id:
+            metadata["task_id"] = task_id
+        # Putting task id in the metadata subdocument as per mongo specs:
+        # https://github.com/mongodb/specifications/blob/master/source/gridfs/gridfs-spec.rst#terms
+        fs_id = self.large_data_db.put(d, _id=oid, metadata=metadata)
+
+        return fs_id, compression_type
+
+    def insert_maggma_store(
+        self,
+        dct: Any,
+        collection: str,
+        oid: Optional[Union[str, ObjectId]] = None,
+        task_id: Optional[Any] = None,
+    ) -> tuple[str, Optional[str]]:
+        """
+        Insert the given document into a Maggma store.
+
+        Args:
+            dct (Any): the document to be stored
+            collection (str): the name prefix for the maggma store
+            oid (str, ObjectId, None): the _id of the file; if specified, it must
+                not already exist in the store
+            task_id (int or str): the task_id to store in the document metadata
+        Returns:
+            file id, the type of compression used.
+        """
+        oid = oid or ObjectId()
+        if isinstance(oid, ObjectId):
+            oid = str(oid)
+
+        compression_type = None
+        if getattr(self.large_data_store, "compression", False):
+            compression_type = "zlib"
+
+        doc = {
+            "fs_id": oid,
+            "maggma_store_type": self.large_data_store.__class__.__name__,
+            "compression": compression_type,
+            "data": dct,
+        }
+
+        search_keys = ["fs_id"]
+
+        if task_id is not None:
+            search_keys.append("task_id")
+            doc["task_id"] = str(task_id)
+        elif isinstance(dct, dict) and "task_id" in dct:
+            search_keys.append("task_id")
+            doc["task_id"] = str(dct["task_id"])
+
+        # write the blob to the large data store, not the tasks collection
+        self.large_data_store.update([doc], search_keys)
+
+        return oid, compression_type
diff --git a/emmet-cli/emmet/cli/utils.py b/emmet-cli/emmet/cli/utils.py
index 973e6251f0..afad36ab9c 100644
--- a/emmet-cli/emmet/cli/utils.py
+++ b/emmet-cli/emmet/cli/utils.py
@@ -5,7 +5,6 @@
 import shutil
 import stat
 from collections import defaultdict
-from datetime import datetime
 from enum import Enum
 from fnmatch import fnmatch
 from glob import glob
@@ -14,16 +13,16 @@
 import click
 import mgzip
 from botocore.exceptions import EndpointConnectionError
-from atomate.vasp.database import VaspCalcDb
-from atomate.vasp.drones import VaspDrone
 from dotty_dict import dotty
 from fireworks.fw_config import FW_BLOCK_FORMAT
 from mongogrant.client import Client
 from pymatgen.core import Structure
 from pymatgen.util.provenance import StructureNL
 from pymongo.errors import DocumentTooLarge
-from emmet.core.vasp.task_valid import TaskDocument
+from emmet.core.tasks import TaskDoc
+from emmet.core.utils import utcnow
 from emmet.core.vasp.validation import ValidationDoc
+from emmet.cli.db import TaskStore
 from pymatgen.entries.compatibility import MaterialsProject2020Compatibility
 
 from emmet.cli import SETTINGS
@@ -65,7 +64,7 @@ def ensure_indexes(indexes, colls):
 
 
 def calcdb_from_mgrant(spec_or_dbfile):
     if os.path.exists(spec_or_dbfile):
-        return VaspCalcDb.from_db_file(spec_or_dbfile)
+        return TaskStore.from_db_file(spec_or_dbfile)
     client = Client()
     role = "rw"  # NOTE need write access to source to ensure indexes
@@ -73,14 +72,16 @@ def calcdb_from_mgrant(spec_or_dbfile):
     auth = client.get_auth(host, dbname_or_alias, role)
     if auth is None:
         raise Exception("No valid auth credentials available!")
-    return VaspCalcDb(
-        auth["host"],
-        27017,
-        auth["db"],
-        "tasks",
-        auth["username"],
-        auth["password"],
-        authSource=auth["db"],
+    return TaskStore(
+        store_kwargs={
+            "host": auth["host"],
+            "port": 27017,
+            "database": auth["db"],
+            "collection": "tasks",
+            "collection_name": "tasks",
+            "username": auth["username"],
+            "password": auth["password"],
+            "auth_source": auth["db"],
+        },
+        store_type="mongo",
     )
 
 
@@ -149,7 +150,7 @@ def get_subdir(dn):
 
 
 def get_timestamp_dir(prefix="launcher"):
-    time_now = datetime.utcnow().strftime(FW_BLOCK_FORMAT)
+    time_now = utcnow().strftime(FW_BLOCK_FORMAT)
     return "_".join([prefix, time_now])
 
 
@@ -382,11 +383,7 @@ def parse_vasp_dirs(vaspdirs, tag, task_ids, snl_metas):  # noqa: C901
     projection = {"tags": 1, "task_id": 1}
     # projection = {"tags": 1, "task_id": 1, "calcs_reversed": 1}
     count = 0
-    drone = VaspDrone(
-        additional_fields={"tags": tags},
-        store_volumetric_data=ctx.params["store_volumetric_data"],
-        runs=ctx.params["runs"],
-    )
+
     # fs_keys = ["bandstructure", "dos", "chgcar", "locpot", "elfcar"]
     # for i in range(3):
     #     fs_keys.append(f"aeccar{i}")
@@ -413,13 +410,7 @@ def parse_vasp_dirs(vaspdirs, tag, task_ids, snl_metas):  # noqa: C901
             logger.warning(f"{name} {launcher} already parsed -> would remove.")
             continue
 
-        try:
-            task_doc = drone.assimilate(vaspdir)
-        except Exception as ex:
-            logger.error(f"Failed to assimilate {vaspdir}: {ex}")
-            continue
-
-        task_doc["sbxn"] = sbxn
+        additional_fields = {"sbxn": sbxn, "tags": list(tags)}
         snl_metas_avail = isinstance(snl_metas, dict)
         task_id = (
             task_ids.get(launcher) if manual_taskid else task_ids[chunk_idx][count]
         )
@@ -429,7 +420,7 @@ def parse_vasp_dirs(vaspdirs, tag, task_ids, snl_metas):  # noqa: C901
             logger.error(f"Unable to determine task_id for {launcher}")
             continue
 
-        task_doc["task_id"] = task_id
+        additional_fields["task_id"] = task_id
         logger.info(f"Using {task_id} for {launcher}.")
 
         if docs:
@@ -437,17 +428,23 @@ def parse_vasp_dirs(vaspdirs, tag, task_ids, snl_metas):  # noqa: C901
             # (run through set to implicitly remove duplicate tags)
             if docs[0]["tags"]:
                 existing_tags = list(set(docs[0]["tags"]))
-                task_doc["tags"] += existing_tags
+                additional_fields["tags"] += existing_tags
                 logger.info(f"Adding existing tags {existing_tags} to {tags}.")
 
         try:
-            task_document = TaskDocument(**task_doc)
-        except Exception as exc:
-            logger.error(f"Unable to construct a valid TaskDocument: {exc}")
+            task_doc = TaskDoc.from_directory(
+                dir_name=vaspdir,
+                additional_fields=additional_fields,
+                volumetric_files=ctx.params["store_volumetric_data"],
+                task_names=ctx.params["runs"],
+                run_bader="bader_analysis.json.gz",
+            )
+        except Exception as ex:
+            logger.error(f"Failed to build a TaskDoc from {vaspdir}: {ex}")
             continue
 
         try:
-            validation_doc = ValidationDoc.from_task_doc(task_document)
+            validation_doc = ValidationDoc.from_task_doc(task_doc)
         except Exception as exc:
             logger.error(f"Unable to construct a valid ValidationDoc: {exc}")
             continue
@@ -461,7 +458,7 @@ def parse_vasp_dirs(vaspdirs, tag, task_ids, snl_metas):  # noqa: C901
 
         try:
             entry = MaterialsProject2020Compatibility().process_entry(
-                task_document.structure_entry
+                task_doc.structure_entry
             )
         except Exception as exc:
             logger.error(f"Unable to apply corrections: {exc}")
@@ -479,7 +476,7 @@ def parse_vasp_dirs(vaspdirs, tag, task_ids, snl_metas):  # noqa: C901
             if references:
                 kwargs["references"] = references
 
-            struct = Structure.from_dict(task_doc["input"]["structure"])
+            struct = task_doc.input.structure
             snl = StructureNL(struct, authors, **kwargs)
             snl_dct = snl.as_dict()
             snl_dct.update(get_meta_from_structure(struct))
@@ -488,7 +485,7 @@ def parse_vasp_dirs(vaspdirs, tag, task_ids, snl_metas):  # noqa: C901
             logger.info(f"Created SNL object for {snl_id}.")
 
         if run:
-            if task_doc["state"] == "successful":
+            if task_doc.state == "successful":
                 if docs and no_dupe_check:
                     # new_calc = task_doc["calcs_reversed"][0]
                     # existing_calc = docs[0]["calcs_reversed"][0]
@@ -515,12 +512,12 @@ def parse_vasp_dirs(vaspdirs, tag, task_ids, snl_metas):  # noqa: C901
                 #     return count  # TODO remove
 
                 try:
-                    target.insert_task(task_doc, use_gridfs=True)
+                    target.insert_task(task_doc.model_dump())
                 except EndpointConnectionError as exc:
                     logger.error(f"Connection failed for {task_id}: {exc}")
                     continue
                 except DocumentTooLarge:
-                    output = dotty(task_doc["calcs_reversed"][0]["output"])
+                    output = dotty(task_doc.calcs_reversed[0].output.model_dump())
                     pop_keys = [
                         "normalmode_eigenvecs",
                         "force_constants",
diff --git a/emmet-cli/requirements.txt b/emmet-cli/requirements.txt
index f00dd34997..c9d87bb0c7 100644
--- a/emmet-cli/requirements.txt
+++ b/emmet-cli/requirements.txt
@@ -6,7 +6,6 @@ oauth2client==4.1.3
 google-api-python-client==1.8.0
 bravado==10.6.0
 zipstream-new==1.1.7
-atomate==0.9.4
 mongogrant==0.3.1
 colorama==0.4.3
 mgzip==0.2.1
diff --git a/emmet-cli/setup.py b/emmet-cli/setup.py
index 8c29e48d8b..3fbb677e39 100644
--- a/emmet-cli/setup.py
+++ b/emmet-cli/setup.py
@@ -18,7 +18,6 @@
         "click",
         "colorama",
         "mongogrant",
-        "atomate",
         "mgzip",
         "slurmpy",
         "github3.py",
diff --git a/emmet-core/emmet/core/vasp/calculation.py b/emmet-core/emmet/core/vasp/calculation.py
index 9eeb5a7647..d76360e3b1 100644
--- a/emmet-core/emmet/core/vasp/calculation.py
+++ b/emmet-core/emmet/core/vasp/calculation.py
@@ -7,6 +7,7 @@
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple, Union
 
+from monty.serialization import loadfn
 import numpy as np
 from pydantic import BaseModel, Extra, Field
 from pymatgen.command_line.bader_caller import bader_analysis_from_path
@@ -648,7 +649,7 @@
         parse_dos: Union[str, bool] = False,
         parse_bandstructure: Union[str, bool] = False,
         average_locpot: bool = True,
-        run_bader: bool = False,
+        run_bader: Union[bool, str] = False,
         run_ddec6: Union[bool, str] = False,
         strip_bandstructure_projections: bool = False,
         strip_dos_projections: bool = False,
@@ -701,13 +702,19 @@
         average_locpot
             Whether to store the average of the LOCPOT along the crystal axes.
-        run_bader : bool = False
-            Whether to run bader on the charge density.
+        run_bader : Union[bool, str] = False
+            If a bool: whether to run bader on the charge density.
+            If a str: the path to a file containing the output of
+            `pymatgen.command_line.bader_caller.bader_analysis_from_path`.
         run_ddec6 : Union[bool , str] = False
-            Whether to run DDEC6 on the charge density. If a string, it's interpreted
-            as the path to the atomic densities directory. Can also be set via the
-            DDEC6_ATOMIC_DENSITIES_DIR environment variable. The files are available at
-            https://sourceforge.net/projects/ddec/files.
+            If a bool: whether to run DDEC6 on the charge density.
+            If a str, it's interpreted as either
+            (1) the path to the atomic densities directory.
+                Can also be set via the DDEC6_ATOMIC_DENSITIES_DIR
+                environment variable. The files are available at
+                https://sourceforge.net/projects/ddec/files.
+            (2) the path to a file containing the `summary` attribute of
+                `pymatgen.command_line.chargemol_caller.ChargemolAnalysis`.
         strip_dos_projections
             Whether to strip the element and site projections from the density of
             states. This can help reduce the size of DOS objects in systems with many
@@ -769,13 +776,22 @@
             vasp_objects[VaspObject.BANDSTRUCTURE] = bandstructure  # type: ignore
 
         bader = None
-        if run_bader and VaspObject.CHGCAR in output_file_paths:
+        if isinstance(run_bader, (str, Path)) and Path(run_bader).is_file():
+            # Load pre-computed bader analysis from file
+            bader = loadfn(str(run_bader))
+        elif run_bader and VaspObject.CHGCAR in output_file_paths:
             suffix = "" if task_name == "standard" else f".{task_name}"
             bader = bader_analysis_from_path(dir_name, suffix=suffix)
 
         ddec6 = None
-        if run_ddec6 and VaspObject.CHGCAR in output_file_paths:
-            densities_path = run_ddec6 if isinstance(run_ddec6, (str, Path)) else None
+        if isinstance(run_ddec6, (str, Path)) and Path(run_ddec6).is_file():
+            # Load pre-computed DDEC6 analysis from file
+            ddec6 = loadfn(str(run_ddec6))
+        elif run_ddec6 and VaspObject.CHGCAR in output_file_paths:
+            # Compute DDEC6 analysis on the fly
+            densities_path = None
+            if isinstance(run_ddec6, (str, Path)) and Path(run_ddec6).is_dir():
+                densities_path = run_ddec6
             ddec6 = ChargemolAnalysis(
                 path=dir_name, atomic_densities_path=densities_path
             ).summary
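
Reviewer note: a minimal usage sketch of the new TaskStore, for trying this change locally. All connection values below are placeholders rather than anything from this PR, the calculation directory is hypothetical, and the target database is assumed to already contain the `counter` collection that `insert()` increments for new task ids.

    from emmet.core.tasks import TaskDoc
    from emmet.cli.db import TaskStore

    # Placeholder connection details -- substitute a real deployment's values.
    store = TaskStore(
        store_kwargs={
            "host": "mongodb.example.com",
            "port": 27017,
            "database": "mp_dev",
            "collection": "tasks",
            "collection_name": "tasks",
            "username": "writer",
            "password": "secret",
        },
        store_type="mongo",
    )

    # Build a TaskDoc from a (hypothetical) VASP run directory and insert it.
    # insert_task expects a plain dict, as in parse_vasp_dirs above.
    task_doc = TaskDoc.from_directory("block_2024-01-01/launcher_0001")
    task_id = store.insert_task(task_doc.model_dump())
    print("stored task", task_id)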
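
Reviewer note: a sketch of the new run_bader-as-path behaviour added to Calculation.from_vasp_files, assuming (as the parse_vasp_dirs change above does) that the keyword is forwarded through TaskDoc.from_directory. The directory and file names are placeholders; only bader_analysis_from_path and dumpfn from pymatgen/monty are relied on here.

    from pathlib import Path

    from monty.serialization import dumpfn
    from pymatgen.command_line.bader_caller import bader_analysis_from_path

    from emmet.core.tasks import TaskDoc

    calc_dir = Path("launcher_0001")  # hypothetical VASP run directory

    # Pre-compute the bader summary once (wherever the `bader` executable is
    # available) and cache it next to the calculation outputs.
    bader_summary = bader_analysis_from_path(str(calc_dir))
    dumpfn(bader_summary, calc_dir / "bader_analysis.json.gz")

    # Later, build the TaskDoc without re-running bader: because the path points
    # at an existing file, the new branch in Calculation.from_vasp_files loads
    # the cached summary instead of calling bader again.
    task_doc = TaskDoc.from_directory(
        dir_name=calc_dir,
        run_bader=str(calc_dir / "bader_analysis.json.gz"),
    )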