Skip to content

Commit

Permalink
Merge pull request #278 from materialsproject/source_loader
Browse files Browse the repository at this point in the history
Add custom source loading
  • Loading branch information
shyamd committed Sep 14, 2020
2 parents 6ba742b + f10552d commit af978d6
Show file tree
Hide file tree
Showing 17 changed files with 414 additions and 16 deletions.
40 changes: 40 additions & 0 deletions docs/getting_started/running_builders.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,46 @@ There are progress bars for each of the three steps, which lets you understand w
The `url` argument takes a fully qualified url including protocol. `tcp` is recommended:
Example: `tcp://127.0.0.1:8080`


## Running Scripts

`mrun` has the ability to run Builders defined in python scripts or in jupyter-notebooks.

The only requirements are:

1. The builder file has to be in a sub-directory from where `mrun` is called.
2. The builders you want to run are in a variable called `__builder__` or `__builders__`

`mrun` will run the whole python/jupyter file, grab the builders in these variables and adds these builders to the builder queue.

Assuming you have a builder in a python file: `my_builder.py`
``` python
class MultiplyBuilder(Builder):
"""
Simple builder that multiplies the "a" sub-document by pre-set value
"""
...
__builder__ = MultiplyBuilder(source_store,target_store,multiplier=3)
```

You can use `mrun` to run this builder and parallelize for you:
``` shell
mrun -n 2 -v my_builder.py
```


## Running multiple builders

`mrun` can run multiple builders. You can have multiple builders in a single file: `json`, `python`, or `jupyter-notebook`. Or you can chain multiple files in the order you want to run them:
``` shell
mrun -n 32 -vv my_first_builder.json builder_2_and_3.py last_builder.ipynb
```

`mrun` will then execute the builders in these files in order.


## Reporting Build State

`mrun` has the ability to report the status of the build pipeline to a user-provided `Store`. To do this, you first have to save the `Store` as a JSON or YAML file. Then you can use the `-r` option to give this to `mrun`. It will then periodicially add documents to the `Store` for one of 3 different events:
Expand Down
2 changes: 1 addition & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ nav:
- Getting Started:
- Using Stores: getting_started/stores.md
- Writing a Builder: getting_started/simple_builder.md
- Advanced Builders: getting_started/advanced_builder.md
- Running a Builder Pipeline: getting_started/running_builders.md
- Advanced Builders: getting_started/advanced_builder.md
- Working with MapBuilder: getting_started/map_builder.md
- Working with GroupBuilder: getting_started/group_builder.md
- Writing a Drone: getting_started/simple_drone.md
Expand Down
3 changes: 3 additions & 0 deletions requirements-optional.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
uvicorn==0.11.8
boto3==1.14.60
hvac==0.10.5
IPython==7.16.1
nbformat==5.0.7
regex==2020.6.8
9 changes: 9 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,12 @@ ignore_missing_imports = True

[mypy-sshtunnel.*]
ignore_missing_imports = True

[mypy-IPython.*]
ignore_missing_imports = True

[mypy-nbformat.*]
ignore_missing_imports = True

[mypy-regex.*]
ignore_missing_imports = True
6 changes: 5 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@
"sshtunnel>=0.1.5",
"msgpack-python>=0.5.6",
],
extras_require={"vault": ["hvac>=0.9.5"], "S3": ["boto3==1.14.60"]},
extras_require={
"vault": ["hvac>=0.9.5"],
"S3": ["boto3>=1.14.56"],
"notebook_runner": ["IPython>=7.16", "nbformat>=5.0", "regex>=2020.6"],
},
classifiers=[
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
Expand Down
23 changes: 17 additions & 6 deletions src/maggma/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import asyncio
import logging
import sys
from itertools import chain

import click
Expand All @@ -12,8 +13,11 @@
from maggma.cli.distributed import master, worker
from maggma.cli.multiprocessing import multi
from maggma.cli.serial import serial
from maggma.cli.source_loader import ScriptFinder, load_builder_from_source
from maggma.utils import ReportingHandler, TqdmLoggingHandler

sys.meta_path.append(ScriptFinder())


@click.command()
@click.argument("builders", nargs=-1, type=click.Path(exists=True))
Expand Down Expand Up @@ -56,9 +60,16 @@ def run(builders, verbosity, reporting_store, num_workers, url, num_chunks):
ch.setFormatter(formatter)
root.addHandler(ch)

builders = [loadfn(b) for b in builders]
builders = [b if isinstance(b, list) else [b] for b in builders]
builders = list(chain.from_iterable(builders))
builder_objects = []

for b in builders:
if str(b).endswith(".py") or str(b).endswith(".ipynb"):
builder_objects.append(load_builder_from_source(b))
else:
builder_objects.append(loadfn(b))

builder_objects = [b if isinstance(b, list) else [b] for b in builder_objects]
builder_objects = list(chain.from_iterable(builder_objects))

if reporting_store:
reporting_store = loadfn(reporting_store)
Expand All @@ -67,14 +78,14 @@ def run(builders, verbosity, reporting_store, num_workers, url, num_chunks):
if url:
if num_chunks > 0:
# Master
asyncio.run(master(url, builders, num_chunks))
asyncio.run(master(url, builder_objects, num_chunks))
else:
# worker
asyncio.run(worker(url, num_workers))
else:
if num_workers == 1:
for builder in builders:
for builder in builder_objects:
serial(builder)
else:
for builder in builders:
for builder in builder_objects:
asyncio.run(multi(builder, num_workers))
5 changes: 4 additions & 1 deletion src/maggma/cli/multiprocessing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python
# coding utf-8

import multiprocessing
from asyncio import (
FIRST_COMPLETED,
BoundedSemaphore,
Expand Down Expand Up @@ -157,7 +158,9 @@ async def multi(builder, num_workers):

builder.connect()
cursor = builder.get_items()
executor = ProcessPoolExecutor(num_workers)
executor = ProcessPoolExecutor(
num_workers, mp_context=multiprocessing.get_context("spawn")
)

# Gets the total number of items to process by priming
# the cursor
Expand Down
178 changes: 178 additions & 0 deletions src/maggma/cli/source_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import importlib.util
import sys
from glob import glob
from importlib.abc import Loader, MetaPathFinder
from importlib.machinery import ModuleSpec, SourceFileLoader
from pathlib import Path
from typing import List


from maggma.core import Builder

try:
import nbformat
from IPython import get_ipython
from IPython.core.interactiveshell import InteractiveShell
from regex import match
except ModuleNotFoundError:
pass

_BASENAME = "maggma.cli.sources"


class ScriptFinder(MetaPathFinder):
"""
Special Finder designed to find custom script builders
"""

@classmethod
def find_spec(cls, fullname, path, target=None):
if not (str(fullname).startswith(f"{_BASENAME}.")):
return None

# The last module is what we want to find the path for
sub_path = str(fullname).split(".")[-1]
segments = sub_path.split("_")

file_path = next(find_matching_file(segments))

if file_path is None:
return None

return spec_from_source(file_path)


class NotebookLoader(Loader):
"""Module Loader for Jupyter Notebooks or Source Files"""

def __init__(self, name=None, path=None):

self.shell = InteractiveShell.instance()

self.name = name
self.path = path

def create_module(self, spec):
return None

def exec_module(self, module):

module.__dict__["get_ipython"] = get_ipython
module.__path__ = self.path

# load the notebook object
with open(self.path, "r", encoding="utf-8") as f:
nb = nbformat.read(f, 4)

# extra work to ensure that magics that would affect the user_ns
# actually affect the notebook module's ns
save_user_ns = self.shell.user_ns
self.shell.user_ns = module.__dict__

try:
for cell in nb.cells:
if cell.cell_type == "code":
# transform the input to executable Python
code = self.shell.input_transformer_manager.transform_cell(
cell.source
)
# run the code in themodule
exec(code, module.__dict__)
finally:
self.shell.user_ns = save_user_ns
return module


def spec_from_source(file_path: str) -> ModuleSpec:
"""
Returns a ModuleSpec from a filepath for importlib loading
Specialized for loading python source files and notebooks into
a temporary maggma cli package to run as a builder
"""
file_path_obj = Path(file_path).resolve().relative_to(Path(".").resolve())
file_path_str = str(file_path_obj)

if file_path_obj.parts[-1][-3:] == ".py":
# Gets module name from the filename without the .py extension
module_name = "_".join(file_path_obj.parts).replace(" ", "_").replace(".py", "")

spec = ModuleSpec(
name=f"{_BASENAME}.{module_name}",
loader=SourceFileLoader(
fullname=f"{_BASENAME}.{module_name}", path=file_path_str
),
origin=file_path_str,
)
# spec._set_fileattr = True
elif file_path_obj.parts[-1][-6:] == ".ipynb":
# Gets module name from the filename without the .ipnb extension
module_name = (
"_".join(file_path_obj.parts).replace(" ", "_").replace(".ipynb", "")
)

spec = ModuleSpec(
name=f"{_BASENAME}.{module_name}",
loader=NotebookLoader(
name=f"{_BASENAME}.{module_name}", path=file_path_str
),
origin=file_path_str,
)
# spec._set_fileattr = True
else:
raise Exception(
"Can't load {file_path}. Must provide a python source file such as a .py or .ipynb file"
)

return spec


def load_builder_from_source(file_path: str) -> List[Builder]:
"""
Loads Maggma Builders from a Python source file
"""
file_path = str(Path(file_path).resolve())
spec = spec_from_source(file_path)
module_object = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module_object) # type: ignore

sys.modules[spec.name] = module_object

if hasattr(module_object, "__builders__"):
return getattr(module_object, "__builders__")
elif hasattr(module_object, "__builder__"):
return getattr(module_object, "__builder__")
else:
raise Exception(
f"No __builders__ or __builder__ attribute found in {file_path}"
)


def find_matching_file(segments, curr_path="./"):
"""
Finds file that has the right sequence of segments
in the path relative to the current path
Requires all segments match the file path
"""

# If we've gotten to the end of the segment match check to see if a file exists
if len(segments) == 0:
if Path(curr_path + ".py").exists():
yield curr_path + ".py"
if Path(curr_path + ".ipynb").exists():
yield curr_path + ".ipynb"
else:
# Recurse down the segment tree some more
current_segment = segments[0]
remainder = segments[1:]

re = fr"({curr_path}[\s_]*{current_segment})"
pos_matches = [match(re, pos_path) for pos_path in glob(curr_path + "*")]
pos_matches = {pmatch.group(1) for pmatch in pos_matches if pmatch}
for new_path in pos_matches:
if Path(new_path).exists() and Path(new_path).is_dir:
for sub_match in find_matching_file(
remainder, curr_path=new_path + "/"
):
yield sub_match
for sub_match in find_matching_file(remainder, curr_path=new_path):
yield sub_match
1 change: 1 addition & 0 deletions src/maggma/cli/sources/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
""" Dummy module to allow for loading dynamic source files """
2 changes: 1 addition & 1 deletion src/maggma/stores/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
"""

import threading
import warnings
import zlib
from concurrent.futures import wait
from concurrent.futures.thread import ThreadPoolExecutor
import warnings
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union

import msgpack # type: ignore
Expand Down
3 changes: 1 addition & 2 deletions src/maggma/stores/gridfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import json
import zlib
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple, Union
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union

import gridfs
from monty.dev import deprecated
Expand All @@ -21,7 +21,6 @@
from maggma.core import Sort, Store
from maggma.stores.mongolike import MongoStore


# https://github.com/mongodb/specifications/
# blob/master/source/gridfs/gridfs-spec.rst#terms
# (Under "Files collection document")
Expand Down
10 changes: 8 additions & 2 deletions src/maggma/stores/mongolike.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,14 @@ def query(
if isinstance(properties, list):
properties = {p: 1 for p in properties}

sort_list = [(k, Sort(v).value) if isinstance(v, int) else (k, v.value)
for k, v in sort.items()] if sort else None
sort_list = (
[
(k, Sort(v).value) if isinstance(v, int) else (k, v.value)
for k, v in sort.items()
]
if sort
else None
)

for d in self._collection.find(
filter=criteria,
Expand Down
Loading

0 comments on commit af978d6

Please sign in to comment.