diff --git a/README.md b/README.md
index fb6f2a0..f7cae51 100644
--- a/README.md
+++ b/README.md
@@ -18,12 +18,14 @@ which you can use to build the environment using `pip`:
`pip install -e .`
+There is one exception: the Argonne Data Cloud (ADC) SDK is not yet installable with pip.
+You must clone it from https://github.com/AD-SDL/adc-rdm-sdk/ (private repo) and then install the package with [Poetry](https://python-poetry.org/).
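+
+For example, a typical clone-and-install might look like the following sketch (adjust the checkout location and any Poetry options to your setup):
+
+```bash
+# Clone the private SDK repository (requires access to the repository)
+git clone https://github.com/AD-SDL/adc-rdm-sdk/
+cd adc-rdm-sdk
+
+# Install the SDK and its dependencies into the current environment
+poetry install
+```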
+
## Running Polybot
-The web service is built from three smaller services.
+Polybot is built from two smaller services, each launched as shown below:
- Redis: Launch Redis as normal: `redis-server`
-- Web Service: Launch FastAPI using Uvicorn: `uvicorn polybot.fastapi:app`
- AI planner: Launch using the CLI: `polybot planner opt_spec.json`
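+
+For example, a minimal local launch (a sketch; assumes Redis is installed and an `opt_spec.json` describing your optimization problem is in the working directory) is:
+
+```bash
+# Start Redis in one terminal (or as a background service)
+redis-server
+
+# Start the AI planner in another terminal
+polybot planner opt_spec.json
+```
+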
### Configuring PolyBot
@@ -57,19 +59,39 @@ polybot planner --planning-class planner:BOPlanner
```
Polybot uses [Colmena](http://colmena.rtfd.org/) to express planning algorithms.
-At minimum, you will need to implement a ["Thinker"](https://colmena.readthedocs.io/en/latest/how-to.html#creating-a-thinker-application)
-that defines the logic for controlling the robot.
+Follow the Colmena guide to creating a ["Thinker"](https://colmena.readthedocs.io/en/latest/how-to.html#creating-a-thinker-application)
+that defines the logic for controlling the robot.
+`BasePlanner` is a subclass of the Colmena `BaseThinker` class.
We briefly describe a few common tasks and how to implement them.
#### Responding to Data from Robot
-The web service provides result events with the topic "robot," and you must
-define logic for how to respond to new data becoming available.
-We recommend either explicitly waiting on the results from this topic using the
-[`self.queue.get_result`](https://colmena.readthedocs.io/en/latest/how-to.html#submitting-tasks) function
-or using the [`@result_processor(topic='robot')`](https://colmena.readthedocs.io/en/latest/thinker.html#result-processing-agents)
-decorate.
-Typically, the function will end by sending a new task to the robot using the
+We access results from the robot by subscribing to "study events" from the Argonne Data Cloud (ADC).
+The ADC provides a web-socket service that sends a message whenever a new sample is created in a study.
+Use this feature by first setting the ID of your study as the `ADC_STUDY_ID` environment variable and
+then calling the `subscribe_to_study` function provided with this library.
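+
+For example, on Linux or macOS you might set the variables in your shell before launching the planner (a sketch; the values are placeholders, and the variable names assume pydantic's default mapping of the fields in `polybot/config.py`):
+
+```bash
+# Token used to authenticate against the Argonne Data Cloud
+export ADC_ACCESS_TOKEN="<your ADC access token>"
+
+# Study whose "new sample" events the planner should receive
+export ADC_STUDY_ID="<your ADC study id>"
+```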
+
+A template "Thinker" application that uses these functions would be
+
+```python
+from colmena.thinker import agent
+
+from polybot.planning import BasePlanner
+from polybot.sample import subscribe_to_study
+from polybot.robot import send_new_sample
+
+class RobotPlanner(BasePlanner):
+
+ @agent()
+ def make_tasks(self):
+ """This function will run as a thread when you start the planner"""
+ for sample in subscribe_to_study():
+ new_sample = ... # Logic to be defined by you!
+ send_new_sample(new_sample)
+```
+
+Note how we receive a new sample from the [`subscribe_to_study`](./polybot/sample.py) function
+and then send a new task to the robot using the
[`polybot.robot.send_new_sample`](./polybot/robot.py) function.
#### Performing Computations on Remote Resources
diff --git a/environment.yml b/environment.yml
index 9ff7506..11f12bd 100644
--- a/environment.yml
+++ b/environment.yml
@@ -10,6 +10,7 @@ dependencies:
- scikit-learn
- pandas==1.*
- pydantic
+ - tqdm
- pytest
- pytest-cov
- flake8
diff --git a/example-planners/bayesian-optimization/README.md b/example-planners/bayesian-optimization/README.md
index d39d903..7199f6c 100644
--- a/example-planners/bayesian-optimization/README.md
+++ b/example-planners/bayesian-optimization/README.md
@@ -9,17 +9,14 @@ complete very quickly (~10s for 3 million inference tasks).
Launch using `polybot planner -p planner:BOPlanner -t local_compute:make_task_server opt_spec.yaml`.
-We recommend also launching the FastAPI services from this folder as well so that it uses the same environment variables.
-
## Tailoring to your system
-First, edit the `.env` file in this folder to point to the proper address for the robot and a good path for storing samples.
+First, edit the `.env` file in this folder to point to the proper address for the robot
+and to set the Study ID of the collection of completed samples in the Argonne Data Cloud.
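+
+A minimal set of entries might look like the following sketch (the ADC variable names follow the fields in `polybot/config.py`; the values are placeholders, and the variable holding the robot address should match whatever name your configuration already defines for it):
+
+```
+ADC_ACCESS_TOKEN=<your ADC access token>
+ADC_STUDY_ID=<your ADC study id>
+```
+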
## Launching the Services
-1. Start up Redis
-2. Launch the HTTP service for receiving new samples. (Linux: `../../run.sh`, Windows: `..\..\run.bat`)
-3. Launch the planner using the `polybot` command listed above
+1. Start up Redis (e.g., run `redis-server` in another screen or terminal)
+2. Launch the planner using the `polybot` command listed above
-Make sure to record the IP address and port number for your HTTP service,
-which are needed to configure where the robot should send new data.
+The planner will then run until you kill it with Ctrl+C.
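+
+For reference, a complete launch sequence might look like the following sketch (run from this folder, with Redis and the `polybot` package installed):
+
+```bash
+# 1. Start Redis in a separate terminal or screen session
+redis-server
+
+# 2. Start the planner with the local task server (local_compute:make_task_server)
+polybot planner -p planner:BOPlanner -t local_compute:make_task_server opt_spec.yaml
+```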
diff --git a/example-planners/bayesian-optimization/planner.py b/example-planners/bayesian-optimization/planner.py
index 3893997..215419a 100644
--- a/example-planners/bayesian-optimization/planner.py
+++ b/example-planners/bayesian-optimization/planner.py
@@ -8,7 +8,7 @@
import numpy as np
from colmena.models import Result
from colmena.redis.queue import ClientQueues
-from colmena.thinker import result_processor, agent
+from colmena.thinker import agent
from sklearn.feature_selection import VarianceThreshold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
@@ -16,8 +16,9 @@
from sklearn.model_selection import RepeatedKFold, cross_validate
from modAL.acquisition import EI
+from polybot.config import settings
from polybot.robot import send_new_sample
-from polybot.sample import load_samples
+from polybot.sample import load_samples, subscribe_to_study
from polybot.planning import BasePlanner, OptimizationProblem
@@ -74,9 +75,10 @@ def startup(self):
self.logger.info('Performing a cold-start')
self.perform_bo()
- @result_processor(topic='robot')
- def robot_result_handler(self, _: Result):
- self.perform_bo()
+ @agent()
+ def robot_result_handler(self):
+ for sample in subscribe_to_study():
+ self.perform_bo()
def perform_bo(self):
# Make the output directory for results
diff --git a/example-planners/bayesian-optimization/simulate-results.sh b/example-planners/bayesian-optimization/simulate-results.sh
old mode 100644
new mode 100755
index e44b357..0c67928
--- a/example-planners/bayesian-optimization/simulate-results.sh
+++ b/example-planners/bayesian-optimization/simulate-results.sh
@@ -2,7 +2,7 @@
for f in `find 706data -name "*.json"`; do
# Send data to the web service
- curl -H "Content-Type: application/json" -L --data @${f} http://127.0.0.1:8000/ingest
+ curl -H "Content-Type: application/json" -L --data @${f} http://127.0.0.1:5001/ingest
echo # So that we get newlines
sleep 15
done
diff --git a/example-planners/bayesian-optimization/test_funcx.py b/example-planners/bayesian-optimization/test_funcx.py
new file mode 100644
index 0000000..981eb48
--- /dev/null
+++ b/example-planners/bayesian-optimization/test_funcx.py
@@ -0,0 +1,141 @@
+"""Example script for running the core methods of the FuncX planner"""
+from argparse import ArgumentParser
+from pathlib import Path
+from typing import Tuple
+
+from sklearn.gaussian_process import GaussianProcessRegressor, kernels
+from sklearn.feature_selection import VarianceThreshold
+from sklearn.preprocessing import StandardScaler
+from sklearn.pipeline import Pipeline
+from modAL.acquisition import EI
+from yaml import SafeLoader
+import numpy as np
+import yaml
+
+from planner import run_inference
+from polybot.models import Sample
+from polybot.planning import OptimizationProblem
+
+
+def fit_model(opt_spec: OptimizationProblem, train_x: np.ndarray, train_y: np.ndarray) -> Pipeline:
+ """Fit and test a model using the latest data
+
+ Args:
+ opt_spec: Configuration file for the optimization
+ train_x: Input columns
+ train_y: Output column
+    Returns:
+        Pipeline fit to the training data
+    """
+ # Create an initial RBF kernel, using the training set mean as a scaling parameter
+ kernel = train_y.mean() ** 2 * kernels.RBF(length_scale=1)
+
+ # TODO (wardlt): Make it clear where featurization would appear, as we are soon to introduce additives
+ # This will yield chemical degrees of freedom better captured using features of the additives rather
+ # than a new variable per additive
+ # Notes for now: Mol. Weight, Side Chain Length, and ... are the likely candidates
+
+ # Add a noise parameter based on user settings
+ noise = opt_spec.planner_options.get('noise_level', 0)
+ if noise < 0:
+        # Use the standard deviation of train_y to estimate the initial noise level
+ # TODO (wardlt): Document where 3, 4, and 11 come from
+ noise_estimated = np.std(train_y) / 3
+ noise_lb = noise_estimated / 4
+ noise_ub = noise_estimated * 11
+
+ kernel_noise = kernels.WhiteKernel(noise_level=noise_estimated ** 2,
+ noise_level_bounds=(noise_lb ** 2, noise_ub ** 2))
+ kernel = kernel + kernel_noise
+ elif noise > 0:
+ kernel = kernel + kernels.WhiteKernel(noise ** 2, noise_level_bounds=(noise ** 2,) * 2)
+
+ # Train a GPR model
+ model = Pipeline([
+ ('variance', VarianceThreshold()),
+ ('scale', StandardScaler()),
+ ('gpr', GaussianProcessRegressor(kernel))
+ ])
+
+ # Train and save the model
+ model.fit(train_x, train_y)
+ print(f'Finished fitting the model on {len(train_x)} data points')
+ print(f'Optimized model: {model["gpr"].kernel_}')
+ return model
+
+
+def generate_training_set(opt_spec: OptimizationProblem, sample_path) -> Tuple[np.ndarray, np.ndarray]:
+ """Load in all of the previous samples to build a training set
+
+ Uses the inputs and outputs defined in the optimization specification
+
+    Args:
+        opt_spec: Specification of the optimization problem
+        sample_path: Directory containing the completed samples as JSON files
+    Returns:
+ - Input features
+ - Output variable
+ """
+
+ # Get the name of the input columns
+ input_columns = opt_spec.search_template.input_columns
+
+ train_x = []
+ train_y = []
+ # Loop over samples in the training data
+ for path in Path(sample_path).rglob('*.json'):
+ sample = Sample.parse_file(path)
+ train_x.append([sample.inputs[c] for c in input_columns]) # Get only the needed input columns
+ train_y.append(sample.processed_output[opt_spec.output]) # Get the target output column
+
+ # Convert them to numpy and return
+ return np.array(train_x), np.array(train_y)
+
+
+if __name__ == "__main__":
+ # Make some CLI arguments
+ parser = ArgumentParser()
+ parser.add_argument('--train-files', help='Path to the completed samples', default='samples')
+ parser.add_argument('--optimization-spec', help='Configuration file', default='opt_spec.yaml')
+ args = parser.parse_args()
+
+ # Load in the optimization configuration
+ with open(args.optimization_spec) as fp:
+ opt_spec = yaml.load(fp, Loader=SafeLoader)
+ opt_spec = OptimizationProblem.parse_obj(opt_spec)
+
+ # Get the training data
+ train_x, train_y = generate_training_set(opt_spec, args.train_files)
+ print(f'Loaded a training set of {len(train_x)} entries')
+
+ # Log-normalize conductivity
+ train_y = np.log(train_y)
+
+ # Fit a model and save the training records
+ model = fit_model(opt_spec, train_x, train_y)
+
+ # Create the search space
+ possible_options = opt_spec.search_template.generate_search_space_dataframe()
+ search_x = possible_options[opt_spec.search_template.input_columns]
+ print(f'Created {len(search_x)} samples to be evaluated')
+
+ # Perform the inference
+ # TODO (wardlt): This is the part that can be parallelized
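+    # Note: this assumes 'chunk_size' is set in the planner_options of the optimization spec;
+    # .get() returns None otherwise and the array_split below would fail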
+ chunk_size = opt_spec.planner_options.get('chunk_size')
+ search_y = []
+ search_std = []
+ for chunk in np.array_split(search_x, len(search_x) // chunk_size):
+ y_pred, y_std = run_inference(model, chunk)
+ search_y.append(y_pred)
+ search_std.append(y_std)
+ search_y = np.vstack(search_y)
+ search_std = np.vstack(search_std)
+
+    # Select the point with the largest expected improvement (EI)
+ assert opt_spec.maximize, "The optimization requests minimization"
+ ei = EI(search_y, search_std, max_val=np.max(train_y), tradeoff=0.1)
+ best_ind = np.argmax(ei)
+ best_point = search_x.iloc[best_ind][opt_spec.search_template.input_columns]
+
+ # Make the sample and save it to disk
+ output = opt_spec.search_template.create_new_sample()
+ for p, x in zip(opt_spec.search_template.input_columns, best_point):
+ output.inputs[p] = x
+ with open('next_sample.json', 'w') as fp:
+ fp.write(output.json(indent=2))
diff --git a/polybot/config.py b/polybot/config.py
index f7c8f8d..ed42db7 100644
--- a/polybot/config.py
+++ b/polybot/config.py
@@ -3,6 +3,7 @@
from typing import Optional, Tuple, List
from urllib.parse import urlparse
+from adc.client import ADCClient
from colmena.redis.queue import ClientQueues, TaskServerQueues
from pydantic import BaseSettings, Field, HttpUrl, RedisDsn
@@ -12,14 +13,15 @@
class Settings(BaseSettings):
"""Settings for the web service"""
- # Sample handling
- sample_folder: Path = Field(_run_folder / "samples", description="Path in which to store the samples")
+ # Configuration related to data storage on the Argonne Data Cloud
+ adc_access_token: Optional[str] = Field(None, description='Token for accessing the Argonne Data Cloud')
+ adc_study_id: Optional[str] = Field(None, description='Study ID associated with this experiment')
# Logging
log_name: Optional[str] = Field(None, description="Name of the log file. If not provided, logs will not be stored")
log_size: int = Field(1, description="Maximum log size in MB")
- # Interface between FastAPI and planning services
+ # Interface between the thinker and any remote compute processes
redis_url: Optional[RedisDsn] = Field(None, description="URL of the redis service. Used to send messages "
"between web and planning services")
@@ -66,5 +68,13 @@ def make_server_queue(self) -> TaskServerQueues:
hostname, port = self.redis_info
return TaskServerQueues(hostname, port, name='polybot', topics=['robot'] + self.task_queues)
+ def generate_adc_client(self) -> ADCClient:
+ """Create an authenticated ADC client
+
+ Returns:
+ A client to the ADC that is ready to make queries
+ """
+ return ADCClient(self.adc_access_token)
+
settings = Settings()
diff --git a/polybot/fastapi.py b/polybot/fastapi.py
deleted file mode 100644
index 21b325f..0000000
--- a/polybot/fastapi.py
+++ /dev/null
@@ -1,33 +0,0 @@
-"""Routines for creating the REST interface"""
-from logging.handlers import RotatingFileHandler
-from datetime import datetime
-import logging
-
-from fastapi import FastAPI
-
-from .views import ingest
-from .config import settings
-
-logger = logging.getLogger(__name__)
-_start_time = datetime.now()
-
-# Build the application
-app = FastAPI()
-app.include_router(ingest.router)
-
-# Make the output directory
-settings.sample_folder.mkdir(exist_ok=True)
-logger.info(f'Saving output files to {settings.sample_folder.absolute()}')
-
-# Define the logging, if desired
-if settings.log_name is not None:
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s',
- handlers=[RotatingFileHandler(settings.log_name, mode='a',
- maxBytes=1024 * 1024 * settings.log_size, backupCount=1)])
-
-
-@app.get("/")
-def home():
- return {"msg": f"System has been online since {_start_time.isoformat()}"}
diff --git a/polybot/planning.py b/polybot/planning.py
index 64baebd..1a75ee8 100644
--- a/polybot/planning.py
+++ b/polybot/planning.py
@@ -10,11 +10,11 @@
import requests
from colmena.redis.queue import ClientQueues, TaskServerQueues
from colmena.task_server import ParslTaskServer
-from colmena.thinker import BaseThinker, result_processor
-from colmena.models import Result
+from colmena.thinker import BaseThinker, agent
from parsl import Config, ThreadPoolExecutor
from pydantic import BaseModel, Field, AnyHttpUrl
+from polybot.sample import subscribe_to_study
from polybot.models import SampleTemplate
from polybot.robot import send_new_sample
@@ -60,12 +60,14 @@ class BasePlanner(BaseThinker):
:class:`OptimizationProblem` JSON document.
There are no requirements on how you implement the planning algorithm, but you may at least want an agent
- waiting for results with the "robot" topic. For example,
+    that subscribes to new samples from the Argonne Data Cloud "subscribe_to_study" feed. For example,
.. code: python
- @result_processor(topic='robot')
- def robot_result_handler(self, _: Result):
-            output = self.opt_spec.get_sample_template()
-            send_send_new_sample(output)
+        @agent()
+        def robot_result_handler(self):
+            for _ in subscribe_to_study():
+                output = self.opt_spec.get_sample_template()
+                send_new_sample(output)
@@ -79,21 +81,20 @@ def __init__(self, queues: ClientQueues, opt_spec: OptimizationProblem, daemon:
class RandomPlanner(BasePlanner):
"""Submit a randomly-selected point from the search space each time a new result is completed"""
- @result_processor(topic='robot')
- def robot_result_handler(self, _: Result):
- """Generate a new task to be run on the robot after one completes
-
- Args:
- _: Result that is not actually used for now.
- """
- # Make a choice for each variable
- output = self.opt_spec.search_template.create_new_sample()
- for key, acceptable_values in self.opt_spec.search_template.list_acceptable_input_values().items():
- output.inputs[key] = random.choice(acceptable_values)
-
- # Send it to the robot
- send_new_sample(output)
- return
+ @agent()
+ def robot_result_handler(self):
+ """Generate a new task to be run on the robot after one completes"""
+ # Wait a result to complete
+ for sample in subscribe_to_study():
+ self.logger.info(f'Received new sample: {sample.ID}')
+
+ # Make a choice for each variable
+ output = self.opt_spec.search_template.create_new_sample()
+ for key, acceptable_values in self.opt_spec.search_template.list_acceptable_input_values().items():
+ output.inputs[key] = random.choice(acceptable_values)
+
+ # Send it to the robot
+ send_new_sample(output)
def _execute(f: Callable):
diff --git a/polybot/sample.py b/polybot/sample.py
index fba5877..f431f50 100644
--- a/polybot/sample.py
+++ b/polybot/sample.py
@@ -6,6 +6,7 @@
import logging
from typing import Iterator
+from requests import get
from .config import settings
from .models import Sample
@@ -14,23 +15,27 @@
logger = logging.getLogger(__name__)
-def save_sample(sample: Sample, overwrite: bool = True):
- """Save a sample
+def subscribe_to_study() -> Iterator[Sample]:
+ """Subscribe to the "new sample" created event feed
- Args:
- sample: Sample to be saved
- overwrite: Whether overwriting existing files
+ Yields:
+ Latest samples as they are created
"""
- path = settings.sample_folder / f"{sample.ID}.json"
- if path.exists():
- if overwrite:
- logger.warning(f'Overwriting file at {sample.ID}')
- else:
- raise ValueError(f"File already exists. Set overwrite=True, if you want to remove it. Path: {path}")
- with open(path, 'w') as fp:
- fp.write(sample.json(indent=2))
- logger.info(f'Wrote {sample.ID} to {path}')
+    # Create an authenticated client and make sure a study has been configured
+ adc_client = settings.generate_adc_client()
+ if settings.adc_study_id is None:
+ raise ValueError('The ADC study id is not set. Set your ADC_STUDY_ID environment variable.')
+
+ for event in adc_client.subscribe_to_study(settings.adc_study_id):
+ # Check that we have the right type of event
+ if "newSample" not in event:
+ logger.debug('Event type was not "newSample"')
+ continue
+
+ # Get the sample information
+ sample = event["newSample"]["sample"]
+ yield _parse_sample(sample)
def load_samples() -> Iterator[Sample]:
@@ -40,8 +45,27 @@ def load_samples() -> Iterator[Sample]:
Samples in no prescribed order
"""
- for path in settings.sample_folder.glob("*.json"):
- try:
- yield Sample.parse_file(path)
- except BaseException:
- continue
+ # Query to get the list of samples in the study
+ adc_client = settings.generate_adc_client()
+ if settings.adc_study_id is None:
+ raise ValueError('The ADC study id is not set. Set your ADC_STUDY_ID environment variable.')
+ study_info = adc_client.get_study(settings.adc_study_id)
+
+ for sample in study_info['study']['samples']:
+ yield _parse_sample(sample)
+
+
+def _parse_sample(sample_record: dict) -> Sample:
+ """Create a Sample object given a sample record from ADC
+
+ Args:
+ sample_record: Sample information record from ADC
+ Returns:
+        Sample object in the format used by `polybot`
+ """
+ # Pull down the JSON associated with each sample
+ json_url = sample_record['url']
+ sample_data = get(json_url, verify=False).json()
+
+ # Parse as a sample object
+ return Sample.parse_obj(sample_data)
diff --git a/polybot/tests/conftest.py b/polybot/tests/conftest.py
index d6af697..2aef5dc 100644
--- a/polybot/tests/conftest.py
+++ b/polybot/tests/conftest.py
@@ -1,10 +1,9 @@
from pathlib import Path
-from shutil import rmtree
+import json
from pytest import fixture
-from fastapi.testclient import TestClient
+from pytest_mock import MockerFixture
-from polybot.fastapi import app
from polybot.models import Sample, SampleTemplate
from polybot.config import settings
@@ -15,13 +14,6 @@
@fixture(autouse=True)
def test_settings():
- # Redirect the sample folder
- sample_dir = _test_dir / "test-samples"
- if sample_dir.is_dir():
- rmtree(sample_dir)
- sample_dir.mkdir()
- settings.sample_folder = sample_dir
-
# Set up the test Redis service
settings.redis_url = "rediss://localhost"
@@ -36,6 +28,22 @@ def example_template() -> SampleTemplate:
return SampleTemplate.parse_file(file_path / 'example-template.json')
+@fixture
+def example_sample() -> dict:
+ """Get an example sample response from ADC with a fresh token to access S3"""
+ client = settings.generate_adc_client()
+    assert settings.adc_study_id is not None, "Missing ADC_STUDY_ID environment variable"
+ study_response = client.get_study(settings.adc_study_id)
+ return study_response['study']['samples'][0]
+
+
@fixture()
-def fastapi_client() -> TestClient:
- return TestClient(app)
+def mock_subscribe(example_sample, mocker: MockerFixture):
+ # Make a fake subscription response
+ ex_record = json.loads(file_path.joinpath('example-adc-subscription-event.json').read_text())
+ ex_record['newSample']['sample'] = example_sample # Gives a sample with an active data URL
+
+ # Mock the event generator
+ def _fake_subscribe(*args, **kwargs):
+ yield ex_record
+ mocker.patch('polybot.config.ADCClient.subscribe_to_study', new=_fake_subscribe)
diff --git a/polybot/tests/files/example-adc-subscription-event.json b/polybot/tests/files/example-adc-subscription-event.json
new file mode 100644
index 0000000..8d094a2
--- /dev/null
+++ b/polybot/tests/files/example-adc-subscription-event.json
@@ -0,0 +1,59 @@
+{
+ "newSample": {
+ "study": {
+ "id": "U3R1ZHlOb2RlOjU=",
+ "name": "polybot-ai-test",
+ "description": "Test study for developing the Polybot AI planning service.",
+ "keywords": [],
+ "startDate": null,
+ "status": "NEW",
+ "created": "2021-08-30T16:54:41.706000+00:00",
+ "updated": "2021-08-30T16:54:42.127000+00:00",
+ "permissions": [
+ {
+ "user": {
+ "id": "VXNlck5vZGU6NQ==",
+ "name": "",
+ "email": "loganw@uchicago.edu"
+ },
+ "level": "ADMIN"
+ }
+ ],
+ "investigations": [],
+ "samples": [
+ {
+ "id": "U2FtcGxlTm9kZToxNQ==",
+ "name": "000286e59a",
+ "url": "https://s3-stage.discoverycloud.anl.gov/adc-test/samples/000286e59a.json?AWSAccessKeyId=73cf579b511a35c476f338ef1413397d&Signature=9sP9Jlc0paJntFNjj2fYImpaEgw%3D&Expires=1630445602",
+ "keywords": [],
+ "created": "2021-08-30T17:48:13.570686+00:00",
+ "updated": "2021-08-30T17:48:13.570717+00:00"
+ },
+ {
+ "id": "U2FtcGxlTm9kZToxNg==",
+ "name": "000286e59a",
+ "url": "https://s3-stage.discoverycloud.anl.gov/adc-test/samples/000286e59a_UcWJRrA.json?AWSAccessKeyId=73cf579b511a35c476f338ef1413397d&Signature=Kvz%2BTKx8pzusLojlY4Qgu%2FMqf1E%3D&Expires=1630445602",
+ "keywords": [],
+ "created": "2021-08-31T20:33:22.241986+00:00",
+ "updated": "2021-08-31T20:33:22.242018+00:00"
+ }
+ ]
+ },
+ "sample": {
+ "id": "U2FtcGxlTm9kZToxNg==",
+ "name": "000286e59a",
+ "user": {
+ "id": "VXNlck5vZGU6NQ==",
+ "name": "",
+ "email": "loganw@uchicago.edu"
+ },
+ "keywords": [],
+ "parent": null,
+ "created": "2021-08-31T20:33:22.241000+00:00",
+ "updated": "2021-08-31T20:33:22.242000+00:00",
+ "url": "https://s3-stage.discoverycloud.anl.gov/adc-test/samples/000286e59a_UcWJRrA.json?AWSAccessKeyId=73cf579b511a35c476f338ef1413397d&Signature=Kvz%2BTKx8pzusLojlY4Qgu%2FMqf1E%3D&Expires=1630445602"
+ },
+ "source": null
+ }
+}
+
diff --git a/polybot/tests/test_planner.py b/polybot/tests/test_planner.py
index 1721d16..37ca6c4 100644
--- a/polybot/tests/test_planner.py
+++ b/polybot/tests/test_planner.py
@@ -1,6 +1,7 @@
"""Make sure the planning system works"""
import json
import logging
+import time
from time import sleep
from colmena.models import Result
@@ -34,34 +35,20 @@ def json(self):
assert isinstance(opt.search_template, SampleTemplate)
-def test_generate(mocker: MockerFixture, opt_config, fastapi_client, example_sample, caplog):
+def test_generate(mocker: MockerFixture, mock_subscribe, opt_config, example_sample, caplog):
# Mock the send_new_sample in the planning library
fake_robot = mocker.patch('polybot.planning.send_new_sample')
# Make the planner
client_q = settings.make_client_queue()
- server_q = settings.make_server_queue()
planner = RandomPlanner(client_q, opt_config, daemon=True)
# Launch it as a Thread
planner.start()
+ time.sleep(5) # Wait for the thread to start up and the subscription to download a sample
try:
- # Test sending in a new result
- server_q.send_result(Result(((0,), {})), topic='robot')
- sleep(1) # For the other thread to catch up
-
# Make sure the server responded by sending a record to the "robot"
assert fake_robot.call_count == 1, planner.is_alive()
-
- # Test sending via the REST API
- caplog.clear()
- with caplog.at_level(logging.INFO):
- res = fastapi_client.post("/ingest", json=example_sample.dict(), allow_redirects=True)
- assert res.status_code == 200
- sleep(2) # Multiple threads have to complete
-
- # Make sure the server responded by sending a record to the "robot"
- assert fake_robot.call_count == 2, planner.is_alive()
finally:
# Kill the planning service
planner.done.set()
diff --git a/polybot/tests/test_samples.py b/polybot/tests/test_samples.py
index 989f58b..8f70550 100644
--- a/polybot/tests/test_samples.py
+++ b/polybot/tests/test_samples.py
@@ -1,28 +1,16 @@
from pathlib import Path
-from pytest import fixture, raises
+from polybot.models import Sample
+from polybot.sample import load_samples, subscribe_to_study
-from polybot.sample import save_sample, load_samples
-from polybot.config import settings
+_my_path = Path(__file__).parent
-@fixture(autouse=True)
-def sample_dir(tmpdir):
- settings.sample_folder = Path(tmpdir)
-
-
-def test_save(example_sample):
- save_sample(example_sample)
- assert settings.sample_folder.joinpath(f'{example_sample.ID}.json').is_file()
- save_sample(example_sample, overwrite=True)
- with raises(ValueError):
- save_sample(example_sample, overwrite=False)
+def test_subscribe(mock_subscribe):
+ sample = next(subscribe_to_study())
+ assert isinstance(sample, Sample)
def test_load(example_sample):
- test_save(example_sample)
- with open(settings.sample_folder / "test.json", 'w') as fp:
- fp.write('Junk')
samples = list(load_samples())
- assert len(samples) == 1
- assert samples[0].ID == example_sample.ID
+ assert len(samples) >= 1
diff --git a/polybot/tests/test_server.py b/polybot/tests/test_server.py
deleted file mode 100644
index b063724..0000000
--- a/polybot/tests/test_server.py
+++ /dev/null
@@ -1,14 +0,0 @@
-"""Tests for the REST API"""
-
-
-def test_home(fastapi_client):
- res = fastapi_client.get("/")
- assert res.status_code == 200
-
-
-def test_upload(fastapi_client):
- res = fastapi_client.post('/ingest', json={'ID': '1'*10}, allow_redirects=True)
- assert res.status_code == 200
- reply = res.json()
- assert reply['success']
- assert reply['sample'] == '1' * 10
diff --git a/polybot/views/__init__.py b/polybot/views/__init__.py
deleted file mode 100644
index 0a08b3e..0000000
--- a/polybot/views/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-"""Routes for the server"""
diff --git a/polybot/views/ingest.py b/polybot/views/ingest.py
deleted file mode 100644
index 5dbbb1b..0000000
--- a/polybot/views/ingest.py
+++ /dev/null
@@ -1,34 +0,0 @@
-"""Routes related to ingesting data from the robot"""
-import logging
-
-from fastapi import APIRouter
-from colmena.models import Result
-
-from polybot.models import Sample
-from polybot.config import settings
-from polybot.sample import save_sample
-
-logger = logging.getLogger(__name__)
-router = APIRouter(prefix='/ingest')
-
-
-@router.post("/")
-def upload_data(sample: Sample):
- """Intake a file from the robot and save it to disk"""
- logger.info(f'Received sample {sample.ID}')
-
- # Save it to disk
- save_sample(sample)
-
- # Send the result to the planning service
- # We use Colmena-formatted messages in a Redis queue
- if settings.redis_url is not None:
- queue = settings.make_server_queue()
- result = Result(((None,), {}))
- result.set_result(sample, 0)
- queue.send_result(
- result, topic='robot'
- )
- logger.info('Sent result to the planning service')
-
- return {'success': True, 'sample': sample.ID}
diff --git a/requirements.txt b/requirements.txt
index 834ffd2..3b8036a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,9 +2,7 @@ matplotlib>3
pandas>=1
pydantic>=1.5
requests>=2.24
-fastapi>=0.65.2
colmena>=0.1.0
-pytest>=6.2.4
polybot>=0.0.1
pyyaml>=5.4.1
numpy>=1.20.2
diff --git a/run.bat b/run.bat
deleted file mode 100644
index 3977233..0000000
--- a/run.bat
+++ /dev/null
@@ -1 +0,0 @@
-uvicorn polybot.fastapi:app --port 5001 --host 0.0.0.0
diff --git a/run.sh b/run.sh
deleted file mode 100644
index 3977233..0000000
--- a/run.sh
+++ /dev/null
@@ -1 +0,0 @@
-uvicorn polybot.fastapi:app --port 5001 --host 0.0.0.0
diff --git a/test-requirements.txt b/test-requirements.txt
index 2bee210..5db6d7c 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -1,7 +1,5 @@
-pytest>=5.2
+pytest>=6.2.4
pytest-cov
pytest-mock
scikit-learn
-uvicorn[standard]
-python-multipart
flake8