diff --git a/docs/concepts.md b/docs/concepts.md index fc09ba9c1..fb167a981 100644 --- a/docs/concepts.md +++ b/docs/concepts.md @@ -36,15 +36,3 @@ Another challenge in building complex data-transformation codes is keeping track Maggma solves this by putting the configuration with the pipeline definition in JSON or YAML files. This is done using the `MSONable` pattern, which requires that any Maggma object (the databases and transformation steps) can convert itself to a python dictionary with it's configuration parameters in a process called serialization. These dictionaries can then be converted back to the origianl Maggma object without having to know what class it belonged. `MSONable` does this by injecting in `@class` and `@module` keys that tell it where to find the original python code for that Maggma object. -## Drone -Drone is a standardized class to synchronize local files and data in your database. It breaks down the process in 4 steps: - -1. `get_items` - - Given a folder path to a data folder, read all the files, and return a dictionary - that maps each RecordKey -> List of `RecordIdentifier` -2. `should_update_records` - - Given a list of `RecordIdentifier`, it query the database return a list of `RecordIdentifier` that requires update -3. `process_item` (from `Builder`) - - Given a single `RecordIdentifier`, return the data that it refers to and add meta data -4. `update_targets` - - updates the database given a list of data diff --git a/docs/getting_started/simple_drone.md b/docs/getting_started/simple_drone.md deleted file mode 100644 index a2645a027..000000000 --- a/docs/getting_started/simple_drone.md +++ /dev/null @@ -1,99 +0,0 @@ -# Simple Drone -Let's implement a Simple Drone example. - -The simple drone will sync database with a local file structure like below. You may find sample files [here](https://github.com/materialsproject/maggma/tree/main/tests/test_files) -``` -- data - - citation-1.bibtex - - citation-2.bibtex - - citation-3.bibtex - - citation-4.bibtex - - citation-5.bibtex - - citation-6.bibtex - - text-1.txt - - text-2.txt - - text-3.txt - - text-4.txt - - text-5.txt -``` - -Notice that the pattern here for computing the key is the number between `-` and its file extension. So we can go ahead and define the `compute_record_identifier_key` function that does exactly that - - def compute_record_identifier_key(self, doc: Document) -> str: - prefix, postfix = doc.name.split(sep="-", maxsplit=1) - ID, ftype = postfix.split(sep=".", maxsplit=1) - return ID - -Notice that these files are all in one single directory, we can simply read all these files and generate a list of `Document`. -A `Document` represent a FILE, a file that contains data, not a directory. - - def generate_documents(self, folder_path: Path) -> List[Document]: - files_paths = [folder_path / f for f in os.listdir(folder_path.as_posix())] - return [Document(path=fp, name=fp.name) for fp in files_paths] - -Now we need to organize these documents, or aka to build an association. So let's define a helper function called `organize_documents` - - def organize_documents(self, documents: List[Document]) -> Dict[str, List[Document]]: - log: Dict = dict() - for doc in documents: - key = self.compute_record_identifier_key(doc) - log[key] = log.get(key, []) + [doc] - return log - -We also want to have a way to compute `RecordIdentifier` when given a list of documents, so we overwrite the `compute_record_identifier` -Please note that `RecordIdentifier` comes with a `state_hash` field. This field is used to compare against the `state_hash` in the database -so that we can efficiently know which file has changed without compare byte by byte. `RecordIdentifier` comes with a default method of -computing `state_hash` using md5sum. You may modify it or simply use it by calling `recordIdentifier.compute_state_hash()` - - def compute_record_identifier(self, record_key: str, doc_list: List[Document]) -> RecordIdentifier: - """ - Compute meta data for this list of documents, and generate a RecordIdentifier object - :param record_key: record keys that indicate a record - :param doc_list: document on disk that this record include - :return: - RecordIdentifier that represent this doc_list - """ - recordIdentifier = RecordIdentifier( - last_updated=datetime.now(), documents=doc_list, record_key=record_key - ) - recordIdentifier.state_hash = recordIdentifier.compute_state_hash() - return recordIdentifier - -At this point, we have all the necessary components to overwrite the `read` function from the base `Drone` class. -We basically generate a list of documents, organize them, and then generate a list of `RecordIdentifier` - - def read(self, path: Path) -> List[RecordIdentifier]: - documents: List[Document] = self.generate_documents(folder_path=path) - log = self.organize_documents(documents=documents) - record_identifiers = [ - self.compute_record_identifier(record_key, doc_list) - for record_key, doc_list in log.items() - ] - return record_identifiers - -Lastly, if there's a file that needs to be updated, we want to extract the data and append some meta data. In our case, this is very simple. - - def compute_data(self, recordID: RecordIdentifier) -> Dict: - record = dict() - - for document in recordID.documents: - if "citations" in document.name: - with open(document.path.as_posix(), "r") as file: - s = file.read() - record["citations"] = s - - if "text" in document.name: - with open(document.path.as_posix(), "r") as file: - s = file.read() - record["text"] = s - return record -Now, you have a working SimpleBibDrone! You can use it like this: - - mongo_store = MongoStore( - database="drone_test", collection_name="drone_test", key="record_key" - ) - simple_path = Path.cwd() / "data" - simple_bib_drone = SimpleBibDrone(store=mongo_store, path=simple_path) - - simple_bib_drone.run() -For complete code, please visit [here](https://github.com/materialsproject/maggma/tree/main/tests/builders) diff --git a/docs/reference/core_drone.md b/docs/reference/core_drone.md deleted file mode 100644 index 668021e0b..000000000 --- a/docs/reference/core_drone.md +++ /dev/null @@ -1 +0,0 @@ -::: maggma.core.drone diff --git a/mkdocs.yml b/mkdocs.yml index 71465cdde..21e5b2fbf 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -16,13 +16,11 @@ nav: - Advanced Builders: getting_started/advanced_builder.md - Working with MapBuilder: getting_started/map_builder.md - Working with GroupBuilder: getting_started/group_builder.md - - Writing a Drone: getting_started/simple_drone.md - Reference: Core: Store: reference/core_store.md Builder: reference/core_builder.md Validator: reference/core_validator.md - Drone: reference/core_drone.md Stores: reference/stores.md Builders: reference/builders.md - Changelog: CHANGELOG.md diff --git a/src/maggma/core/drone.py b/src/maggma/core/drone.py deleted file mode 100644 index 0f0ce1049..000000000 --- a/src/maggma/core/drone.py +++ /dev/null @@ -1,223 +0,0 @@ -import hashlib -import os -from abc import abstractmethod -from datetime import datetime -from pathlib import Path, PosixPath -from typing import Dict, Iterable, List, Optional - -from pydantic import BaseModel, Field - -from maggma.core import Builder - - -class Document(BaseModel): - """ - Represent a file - """ - - path: PosixPath = Field(..., title="Path of this file") - name: str = Field(..., title="File name") - - -class RecordIdentifier(BaseModel): - """ - The meta data for a record - """ - - last_updated: datetime = Field( - ..., title="The time in which this record is last updated" - ) - documents: List[Document] = Field( - [], title="List of documents this RecordIdentifier indicate" - ) - record_key: str = Field( - ..., - title="Hash that uniquely define this record, can be inferred from each document inside", - ) - state_hash: Optional[str] = Field( - None, title="Hash of the state of the documents in this Record" - ) - - @property - def parent_directory(self) -> Path: - """ - root most directory that documnents in this record share - :return: - """ - paths = [doc.path.as_posix() for doc in self.documents] - parent_path = Path(os.path.commonprefix(paths)) - if not parent_path.is_dir(): - return parent_path.parent - - return parent_path - - def compute_state_hash(self) -> str: - """ - compute the hash of the state of the documents in this record - :param doc_list: list of documents - :return: - hash of the list of documents passed in - """ - digest = hashlib.md5() - for doc in self.documents: - digest.update(doc.name.encode()) - with open(doc.path.as_posix(), "rb") as file: - buf = file.read() - digest.update(buf) - return str(digest.hexdigest()) - - -class Drone(Builder): - """ - An abstract drone that handles operations with database, using Builder to support multi-threading - User have to implement all abstract methods to specify the data that they want to use to interact with this class - - Note: All embarrassingly parallel function should be in process_items - Note: The steps of a Drone can be divided into roughly 5 steps - - For sample usage, please see /docs/getting_started/core_drone.md - and example implementation is available in tests/builders/test_simple_bibdrone.py - """ - - def __init__(self, store, path: Path): - self.store = store - self.path = path - super().__init__(sources=[], targets=store) - - @abstractmethod - def compute_record_identifier_key(self, doc: Document) -> str: - """ - Compute the RecordIdentifier key that this document correspond to - - Args: - doc: document which the record identifier key will be inferred from - - Returns: - RecordIdentifiierKey - """ - raise NotImplementedError - - @abstractmethod - def read(self, path: Path) -> List[RecordIdentifier]: - """ - Given a folder path to a data folder, read all the files, and return a dictionary - that maps each RecordKey -> [RecordIdentifier] - - ** Note: require user to implement the function computeRecordIdentifierKey - - Args: - path: Path object that indicate a path to a data folder - - Returns: - List of Record Identifiers - """ - pass - - @abstractmethod - def compute_data(self, recordID: RecordIdentifier) -> Dict: - """ - User can specify what raw data they want to save from the Documents that this recordID refers to - - Args: - recordID: recordID that needs to be re-saved - - Returns: - Dictionary of NAME_OF_DATA -> DATA - ex: - for a recordID refering to 1, - { - "citation": cite.bibtex , - "text": data.txt - } - """ - pass - - def should_update_records( - self, record_identifiers: List[RecordIdentifier] - ) -> List[RecordIdentifier]: - """ - Batch query database by computing all the keys and send them at once - - Args: - record_identifiers: all the record_identifiers that need to fetch from database - - Returns: - List of recordIdentifiers representing data that needs to be updated - """ - cursor = self.store.query( - criteria={ - "record_key": {"$in": [r.record_key for r in record_identifiers]} - }, - properties=["record_key", "state_hash", "last_updated"], - ) - - not_exists = object() - db_record_log = {doc["record_key"]: doc["state_hash"] for doc in cursor} - to_update_list = [ - recordID.state_hash != db_record_log.get(recordID.record_key, not_exists) - for recordID in record_identifiers - ] - return [ - recordID - for recordID, to_update in zip(record_identifiers, to_update_list) - if to_update - ] - - def assimilate(self, path: Path) -> List[RecordIdentifier]: - """ - Function mainly for debugging purpose. It will - 1. read file in the path specified - 2. convert them into recordIdentifier - 3. return the converted recordIdentifiers - Args: - path: path in which files are read - - Returns: - list of record Identifiers - """ - record_identifiers: List[RecordIdentifier] = self.read(path=path) - return record_identifiers - - def get_items(self) -> Iterable: - """ - Read from the path that was given, compare against database files to find recordIDs that needs to be updated - - Returns: - RecordIdentifiers that needs to be updated - """ - self.logger.debug( - "Starting get_items in {} Builder".format(self.__class__.__name__) - ) - record_identifiers: List[RecordIdentifier] = self.read(path=self.path) - records_to_update = self.should_update_records(record_identifiers) - return records_to_update - - def update_targets(self, items: List): - """ - Receive a list of items to update, update the items - - Assume that each item are in the correct format - Args: - items: List of items to update - - Returns: - None - """ - if len(items) > 0: - self.logger.debug("Updating {} items".format(len(items))) - self.store.update(items) - else: - self.logger.debug("There are no items to update") - - def process_item(self, item: RecordIdentifier) -> Dict: # type: ignore - """ - compute the item to update - - Args: - item: item to update - - Returns: - result from expanding the item - """ - - return {**self.compute_data(recordID=item), **item.dict()} diff --git a/tests/builders/simple_bib_drone.py b/tests/builders/simple_bib_drone.py deleted file mode 100644 index 45e067787..000000000 --- a/tests/builders/simple_bib_drone.py +++ /dev/null @@ -1,125 +0,0 @@ -import os -from datetime import datetime -from pathlib import Path -from typing import Dict, List - -from maggma.core.drone import Document, Drone, RecordIdentifier - - -class SimpleBibDrone(Drone): - """ - Example implementation of Drone assuming that all data files are located in one folder and that their - file names indicate association - Ex: - - data - - example - - citation-1.bibtex - - citation-2.bibtex - - text-1.bibtex - ... - """ - - def __init__(self, store, path): - super().__init__(store=store, path=path) - - def compute_record_identifier( - self, record_key: str, doc_list: List[Document] - ) -> RecordIdentifier: - """ - Compute meta data for this list of documents, and generate a RecordIdentifier object - :param record_key: record keys that indicate a record - :param doc_list: document on disk that this record include - :return: - RecordIdentifier that represent this doc_list - """ - recordIdentifier = RecordIdentifier( - last_updated=datetime.now(), documents=doc_list, record_key=record_key - ) - recordIdentifier.state_hash = recordIdentifier.compute_state_hash() - return recordIdentifier - - def generate_documents(self, folder_path: Path) -> List[Document]: - """ - Generate documents by going over the current directory: - Note: Assumes that there's no folder in the current directory - :param folder_path: - :return: - """ - files_paths = [folder_path / f for f in os.listdir(folder_path.as_posix())] - return [Document(path=fp, name=fp.name) for fp in files_paths] - - def read(self, path: Path) -> List[RecordIdentifier]: - """ - Given a folder path to a data folder, read all the files, and return a dictionary - that maps each RecordKey -> [File Paths] - - ** Note: require user to implement the function computeRecordIdentifierKey - - :param path: Path object that indicate a path to a data folder - :return: - - """ - documents: List[Document] = self.generate_documents(folder_path=path) - log = self.organize_documents(documents=documents) - record_identifiers = [ - self.compute_record_identifier(record_key, doc_list) - for record_key, doc_list in log.items() - ] - return record_identifiers - - def compute_data(self, recordID: RecordIdentifier) -> Dict: - """ - return the mapping of NAME_OF_DATA -> DATA - - :param recordID: recordID that needs to be re-saved - :return: - Dictionary of NAME_OF_DATA -> DATA - ex: - for a recordID refering to 1, - { - "citation": cite.bibtex , - "text": data.txt - } - """ - record = dict() - - for document in recordID.documents: - if "citations" in document.name: - with open(document.path.as_posix(), "r") as file: - s = file.read() - record["citations"] = s - - if "text" in document.name: - with open(document.path.as_posix(), "r") as file: - s = file.read() - record["text"] = s - return record - - def compute_record_identifier_key(self, doc: Document) -> str: - """ - Compute the recordIdentifier key by interpreting the name - :param doc: - :return: - """ - prefix, postfix = doc.name.split(sep="-", maxsplit=1) - ID, ftype = postfix.split(sep=".", maxsplit=1) - return ID - - def organize_documents( - self, documents: List[Document] - ) -> Dict[str, List[Document]]: - """ - a dictionary that maps RecordIdentifierKey -> [File Paths] - ex: - 1 -> [cite.bibtex(Represented in Document), data.txt(Represented in Document)] - 2 -> [cite2.bibtex(Represented in Document), text-2.txt(Represented in Document)] - 3 -> [citations-3.bibtex(Represented in Document), ] - ... - :param documents: - :return: - """ - log: Dict = dict() - for doc in documents: - key = self.compute_record_identifier_key(doc) - log[key] = log.get(key, []) + [doc] - return log diff --git a/tests/builders/test_simple_bib_drone.py b/tests/builders/test_simple_bib_drone.py deleted file mode 100644 index a4686cb36..000000000 --- a/tests/builders/test_simple_bib_drone.py +++ /dev/null @@ -1,138 +0,0 @@ -from datetime import datetime -from pathlib import Path - -import pytest - -from maggma.stores import MemoryStore -from maggma.stores.mongolike import MongoStore - -from .simple_bib_drone import SimpleBibDrone - - -@pytest.fixture -def init_drone(test_dir): - """ - Initialize the drone, do not initialize the connection with the database - - :return: - initialized drone - """ - mongo_store = MongoStore( - database="drone_test", collection_name="drone_test", key="record_key" - ) - simple_path = test_dir / "simple_bib_example_data" - assert simple_path.exists(), f"{simple_path} not found" - simple_bib_drone = SimpleBibDrone(store=mongo_store, path=simple_path) - return simple_bib_drone - - -def test_read(init_drone: SimpleBibDrone): - """ - Test whether read function is correct - :param init_drone: un-connected simpleBibDrone instance - :return: - None - """ - list_record_id = init_drone.read(init_drone.path) - assert len(list_record_id) == 7 - state_hashes = [r.state_hash for r in list_record_id] - assert len(state_hashes) == len(list_record_id) # all record_id has hash - assert len((set(state_hashes))) == len(state_hashes) # all unique hashes - - num_docs = sum([len(r.documents) for r in list_record_id]) - assert num_docs == 12 - - -def test_record_id(init_drone: SimpleBibDrone): - """ - Test validity of RecordIdentifier - - :param init_drone: un-connected simpleBibDrone instance - :return: - None - """ - list_record_id = init_drone.read(init_drone.path) - record0 = list_record_id[0] - assert record0.parent_directory == init_drone.path - assert record0.last_updated < datetime.now() - assert len(record0.documents) > 0 - # state hash does not change when the file is not changed - assert record0.compute_state_hash() == record0.state_hash - - -def test_process_item(init_drone: SimpleBibDrone): - """ - Test whether data is expaneded correctly and whether meta data is added - :param init_drone: un-connected simpleBibDrone instance - :return: - None - """ - list_record_id = list(init_drone.read(init_drone.path)) - text_record = next( - d for d in list_record_id if any("text" in f.name for f in d.documents) - ) - data = init_drone.process_item(text_record) - assert "citations" in data - assert "text" in data - assert "record_key" in data - assert "last_updated" in data - assert "documents" in data - assert "state_hash" in data - - -def test_compute_record_identifier_key(init_drone: SimpleBibDrone): - list_record_id = init_drone.read(init_drone.path) - record0 = list_record_id[0] - doc0 = record0.documents[0] - assert record0.record_key == init_drone.compute_record_identifier_key(doc0) - - -def test_get_items(init_drone: SimpleBibDrone): - """ - This test might take a while - test whether get_items work correctly. - It should fetch from database all the files that needs to be updated - - :param init_drone: un-connected simpleBibDrone instance - :return: - None - """ - - init_drone.connect() - init_drone.run() # make sure the database is up-to-date - init_drone.connect() - assert sum([1 for _ in init_drone.get_items()]) == 0 - init_drone.finalize() - - init_drone.connect() - init_drone.store.remove_docs(criteria={}) # clears the database - assert sum([1 for _ in init_drone.get_items()]) == 7 - init_drone.finalize() - - -def test_assimilate(init_drone: SimpleBibDrone): - """ - Test whether assimilate file is correct - :param init_drone: un-connected simpleBibDrone instance - :return: - None - """ - record_ids = init_drone.assimilate(init_drone.path) - assert len(record_ids) == 7 - - -def test_compute_data(init_drone: SimpleBibDrone): - """ - test whether data is extracted as expected - - :param init_drone: un-connected simpleBibDrone instance - :return: - None - """ - list_record_id = list(init_drone.read(init_drone.path)) - text_record = next( - d for d in list_record_id if any("text" in f.name for f in d.documents) - ) - data = init_drone.process_item(text_record) - assert "citations" in data - assert "text" in data