diff --git a/README.md b/README.md
index fb6f2a0..f7cae51 100644
--- a/README.md
+++ b/README.md
@@ -18,12 +18,14 @@ which you can use to build the environment using `pip`:
 `pip install -e .`
+There is one exception: The Argonne Data Cloud SDK is not yet installable via `pip`.
+You must clone it from https://github.com/AD-SDL/adc-rdm-sdk/ (private repo) and then install the package with [Poetry](https://python-poetry.org/).
+
 ## Running Polybot
-The web service is built from three smaller services.
+Polybot is built from two smaller services.
 - Redis: Launch Redis as normal: `redis-server`
-- Web Service: Launch FastAPI using Uvicorn: `uvicorn polybot.fastapi:app`
 - AI planner: Launch using the CLI: `polybot planner opt_spec.json`
 ### Configuring PolyBot
@@ -57,19 +59,39 @@ polybot planner --planning-class planner:BOPlanner
 ```
 Polybot uses [Colmena](http://colmena.rtfd.org/) to express planning algorithms.
-At minimum, you will need to implement a ["Thinker"](https://colmena.readthedocs.io/en/latest/how-to.html#creating-a-thinker-application)
-that defines the logic for controlling the robot.
+Follow our guide to creating a ["Thinker"](https://colmena.readthedocs.io/en/latest/how-to.html#creating-a-thinker-application)
+that defines the logic for controlling the robot.
+`BasePlanner` is a subclass of the Colmena `BaseThinker` class.
 We briefly describe a few common tasks and how to implement them.
 #### Responding to Data from Robot
-The web service provides result events with the topic "robot," and you must
-define logic for how to respond to new data becoming available.
-We recommend either explicitly waiting on the results from this topic using the
-[`self.queue.get_result`](https://colmena.readthedocs.io/en/latest/how-to.html#submitting-tasks) function
-or using the [`@result_processor(topic='robot')`](https://colmena.readthedocs.io/en/latest/thinker.html#result-processing-agents)
-decorate.
-Typically, the function will end by sending a new task to the robot using the
+We access results from the robot by subscribing to "study events" from the Argonne Data Cloud (ADC).
+The ADC provides a service that sends a message over web sockets whenever a new sample is created.
+Use this feature by first setting the ID for your study as the `ADC_STUDY_ID` environment variable and
+then using the `subscribe_to_study` function provided with this library.
+
+A template "Thinker" application that uses these functions would be:
+
+```python
+from colmena.thinker import agent
+
+from polybot.planning import BasePlanner
+from polybot.sample import subscribe_to_study
+from polybot.robot import send_new_sample
+
+class RobotPlanner(BasePlanner):
+
+    @agent()
+    def make_tasks(self):
+        """This function will run as a thread when you start the planner"""
+        for sample in subscribe_to_study():
+            new_sample = ... # Logic to be defined by you!
+            send_new_sample(new_sample)
+```
+
+Note how we receive a new sample from the [`subscribe_to_study`](./polybot/sample.py) function
+and then send a new task to the robot using the
+[`polybot.robot.send_new_sample`](./polybot/robot.py) function.
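+
+For reference, here is a minimal sketch of supplying the ADC connection settings before starting the planner.
+The values are placeholders you replace with your own; the variable names follow the settings defined in [`polybot/config.py`](./polybot/config.py).
+
+```python
+import os
+
+# Placeholders: substitute the study ID and access token for your experiment
+os.environ["ADC_STUDY_ID"] = "<your-study-id>"
+os.environ["ADC_ACCESS_TOKEN"] = "<your-access-token>"
+
+# The settings object picks these up; subscribe_to_study() uses them
+# to authenticate with the ADC and watch your study for new samples
+from polybot.config import settings
+adc_client = settings.generate_adc_client()
+```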
 #### Performing Computations on Remote Resources
diff --git a/environment.yml b/environment.yml
index 9ff7506..11f12bd 100644
--- a/environment.yml
+++ b/environment.yml
@@ -10,6 +10,7 @@ dependencies:
 - scikit-learn
 - pandas==1.*
 - pydantic
+ - tqdm
 - pytest
 - pytest-cov
 - flake8
diff --git a/example-planners/bayesian-optimization/README.md b/example-planners/bayesian-optimization/README.md
index d39d903..7199f6c 100644
--- a/example-planners/bayesian-optimization/README.md
+++ b/example-planners/bayesian-optimization/README.md
@@ -9,17 +9,14 @@ complete very quickly (~10s for 3 million inference tasks).
 Launch using `polybot planner -p planner:BOPlanner -t local_compute:make_task_server opt_spec.yaml`.
-We recommend also launching the FastAPI services from this folder as well so that it uses the same environment variables.
 ## Tailoring to your system
-First, edit the `.env` file in this folder to point to the proper address for the robot and a good path for storing samples.
+First, edit the `.env` file in this folder to set the proper address for the robot
+and the Study ID for the collection of completed samples in the Argonne Data Cloud.
 ## Launching the Services
-1. Start up Redis
-2. Launch the HTTP service for receiving new samples. (Linux: `../../run.sh`, Windows: `..\..\run.bat`)
-3. Launch the planner using the `polybot` command listed above
+1. Start up Redis (e.g., run `redis-server` in another screen or terminal)
+2. Launch the planner using the `polybot` command listed above
-Make sure to record the IP address and port number for your HTTP service,
-which are needed to configure where the robot should send new data.
+The planner will then run until you kill it with Ctrl+C.
diff --git a/example-planners/bayesian-optimization/planner.py b/example-planners/bayesian-optimization/planner.py
index 3893997..215419a 100644
--- a/example-planners/bayesian-optimization/planner.py
+++ b/example-planners/bayesian-optimization/planner.py
@@ -8,7 +8,7 @@ import numpy as np
 from colmena.models import Result
 from colmena.redis.queue import ClientQueues
-from colmena.thinker import result_processor, agent
+from colmena.thinker import agent
 from sklearn.feature_selection import VarianceThreshold
 from sklearn.pipeline import Pipeline
 from sklearn.preprocessing import StandardScaler
@@ -16,8 +16,9 @@ from sklearn.model_selection import RepeatedKFold, cross_validate
 from modAL.acquisition import EI
+from polybot.config import settings
 from polybot.robot import send_new_sample
-from polybot.sample import load_samples
+from polybot.sample import load_samples, subscribe_to_study
 from polybot.planning import BasePlanner, OptimizationProblem
@@ -74,9 +75,10 @@ def startup(self):
         self.logger.info('Performing a cold-start')
         self.perform_bo()
-    @result_processor(topic='robot')
-    def robot_result_handler(self, _: Result):
-        self.perform_bo()
+    @agent()
+    def robot_result_handler(self):
+        for sample in subscribe_to_study():
+            self.perform_bo()
     def perform_bo(self):
         # Make the output directory for results
diff --git a/example-planners/bayesian-optimization/simulate-results.sh b/example-planners/bayesian-optimization/simulate-results.sh
old mode 100644
new mode 100755
index e44b357..0c67928
--- a/example-planners/bayesian-optimization/simulate-results.sh
+++ b/example-planners/bayesian-optimization/simulate-results.sh
@@ -2,7 +2,7 @@ for f in `find 706data -name "*.json"`; do
 # Send data to the web service
- curl -H "Content-Type: application/json" -L --data @${f} http://127.0.0.1:8000/ingest
+ curl 
-H "Content-Type: application/json" -L --data @${f} http://127.0.0.1:5001/ingest echo # So that we get newlines sleep 15 done diff --git a/example-planners/bayesian-optimization/test_funcx.py b/example-planners/bayesian-optimization/test_funcx.py new file mode 100644 index 0000000..981eb48 --- /dev/null +++ b/example-planners/bayesian-optimization/test_funcx.py @@ -0,0 +1,141 @@ +"""Example script for running the core methods of the FuncX planner""" +from argparse import ArgumentParser +from pathlib import Path +from typing import Tuple + +from sklearn.gaussian_process import GaussianProcessRegressor, kernels +from sklearn.feature_selection import VarianceThreshold +from sklearn.preprocessing import StandardScaler +from sklearn.pipeline import Pipeline +from modAL.acquisition import EI +from yaml import SafeLoader +import numpy as np +import yaml + +from planner import run_inference +from polybot.models import Sample +from polybot.planning import OptimizationProblem + + +def fit_model(opt_spec: OptimizationProblem, train_x: np.ndarray, train_y: np.ndarray) -> Pipeline: + """Fit and test a model using the latest data + + Args: + opt_spec: Configuration file for the optimization + train_x: Input columns + train_y: Output column + out_dir: Location to store the data + """ + # Create an initial RBF kernel, using the training set mean as a scaling parameter + kernel = train_y.mean() ** 2 * kernels.RBF(length_scale=1) + + # TODO (wardlt): Make it clear where featurization would appear, as we are soon to introduce additives + # This will yield chemical degrees of freedom better captured using features of the additives rather + # than a new variable per additive + # Notes for now: Mol. Weight, Side Chain Length, and ... are the likely candidates + + # Add a noise parameter based on user settings + noise = opt_spec.planner_options.get('noise_level', 0) + if noise < 0: + # Use standard deviation of the distribution of train_y will be the estimation of initial noise + # TODO (wardlt): Document where 3, 4, and 11 come from + noise_estimated = np.std(train_y) / 3 + noise_lb = noise_estimated / 4 + noise_ub = noise_estimated * 11 + + kernel_noise = kernels.WhiteKernel(noise_level=noise_estimated ** 2, + noise_level_bounds=(noise_lb ** 2, noise_ub ** 2)) + kernel = kernel + kernel_noise + elif noise > 0: + kernel = kernel + kernels.WhiteKernel(noise ** 2, noise_level_bounds=(noise ** 2,) * 2) + + # Train a GPR model + model = Pipeline([ + ('variance', VarianceThreshold()), + ('scale', StandardScaler()), + ('gpr', GaussianProcessRegressor(kernel)) + ]) + + # Train and save the model + model.fit(train_x, train_y) + print(f'Finished fitting the model on {len(train_x)} data points') + print(f'Optimized model: {model["gpr"].kernel_}') + return model + + +def generate_training_set(opt_spec: OptimizationProblem, sample_path) -> Tuple[np.ndarray, np.ndarray]: + """Load in all of the previous samples to build a training set + + Uses the inputs and outputs defined in the optimization specification + + Returns: + - Input features + - Output variable + """ + + # Get the name of the input columns + input_columns = opt_spec.search_template.input_columns + + train_x = [] + train_y = [] + # Loop over samples in the training data + for path in Path(sample_path).rglob('*.json'): + sample = Sample.parse_file(path) + train_x.append([sample.inputs[c] for c in input_columns]) # Get only the needed input columns + train_y.append(sample.processed_output[opt_spec.output]) # Get the target output column + + # Convert them to 
numpy and return + return np.array(train_x), np.array(train_y) + + +if __name__ == "__main__": + # Make some CLI arguments + parser = ArgumentParser() + parser.add_argument('--train-files', help='Path to the completed samples', default='samples') + parser.add_argument('--optimization-spec', help='Configuration file', default='opt_spec.yaml') + args = parser.parse_args() + + # Load in the optimization configuration + with open(args.optimization_spec) as fp: + opt_spec = yaml.load(fp, Loader=SafeLoader) + opt_spec = OptimizationProblem.parse_obj(opt_spec) + + # Get the training data + train_x, train_y = generate_training_set(opt_spec, args.train_files) + print(f'Loaded a training set of {len(train_x)} entries') + + # Log-normalize conductivity + train_y = np.log(train_y) + + # Fit a model and save the training records + model = fit_model(opt_spec, train_x, train_y) + + # Create the search space + possible_options = opt_spec.search_template.generate_search_space_dataframe() + search_x = possible_options[opt_spec.search_template.input_columns] + print(f'Created {len(search_x)} samples to be evaluated') + + # Perform the inference + # TODO (wardlt): This is the part that can be parallelized + chunk_size = opt_spec.planner_options.get('chunk_size') + search_y = [] + search_std = [] + for chunk in np.array_split(search_x, len(search_x) // chunk_size): + y_pred, y_std = run_inference(model, chunk) + search_y.append(y_pred) + search_std.append(y_std) + search_y = np.vstack(search_y) + search_std = np.vstack(search_std) + + # Get the largest UCB + assert opt_spec.maximize, "The optimization requests minimization" + ei = EI(search_y, search_std, max_val=np.max(train_y), tradeoff=0.1) + best_ind = np.argmax(ei) + best_point = search_x.iloc[best_ind][opt_spec.search_template.input_columns] + + # Make the sample and save it to disk + output = opt_spec.search_template.create_new_sample() + for p, x in zip(opt_spec.search_template.input_columns, best_point): + output.inputs[p] = x + with open('next_sample.json', 'w') as fp: + fp.write(output.json(indent=2)) diff --git a/polybot/config.py b/polybot/config.py index f7c8f8d..ed42db7 100644 --- a/polybot/config.py +++ b/polybot/config.py @@ -3,6 +3,7 @@ from typing import Optional, Tuple, List from urllib.parse import urlparse +from adc.client import ADCClient from colmena.redis.queue import ClientQueues, TaskServerQueues from pydantic import BaseSettings, Field, HttpUrl, RedisDsn @@ -12,14 +13,15 @@ class Settings(BaseSettings): """Settings for the web service""" - # Sample handling - sample_folder: Path = Field(_run_folder / "samples", description="Path in which to store the samples") + # Configuration related to data storage on the Argonne Data Cloud + adc_access_token: Optional[str] = Field(None, description='Token for accessing the Argonne Data Cloud') + adc_study_id: Optional[str] = Field(None, description='Study ID associated with this experiment') # Logging log_name: Optional[str] = Field(None, description="Name of the log file. If not provided, logs will not be stored") log_size: int = Field(1, description="Maximum log size in MB") - # Interface between FastAPI and planning services + # Interface between the thinker and any remote compute processes redis_url: Optional[RedisDsn] = Field(None, description="URL of the redis service. 
Used to send messages " "between web and planning services")
@@ -66,5 +68,13 @@ def make_server_queue(self) -> TaskServerQueues:
         hostname, port = self.redis_info
         return TaskServerQueues(hostname, port, name='polybot', topics=['robot'] + self.task_queues)
+    def generate_adc_client(self) -> ADCClient:
+        """Create an authenticated ADC client
+
+        Returns:
+            A client to the ADC that is ready to make queries
+        """
+        return ADCClient(self.adc_access_token)
+
 settings = Settings()
diff --git a/polybot/fastapi.py b/polybot/fastapi.py
deleted file mode 100644
index 21b325f..0000000
--- a/polybot/fastapi.py
+++ /dev/null
@@ -1,33 +0,0 @@
-"""Routines for creating the REST interface"""
-from logging.handlers import RotatingFileHandler
-from datetime import datetime
-import logging
-
-from fastapi import FastAPI
-
-from .views import ingest
-from .config import settings
-
-logger = logging.getLogger(__name__)
-_start_time = datetime.now()
-
-# Build the application
-app = FastAPI()
-app.include_router(ingest.router)
-
-# Make the output directory
-settings.sample_folder.mkdir(exist_ok=True)
-logger.info(f'Saving output files to {settings.sample_folder.absolute()}')
-
-# Define the logging, if desired
-if settings.log_name is not None:
-    logging.basicConfig(
-        level=logging.INFO,
-        format='%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s',
-        handlers=[RotatingFileHandler(settings.log_name, mode='a',
-                                      maxBytes=1024 * 1024 * settings.log_size, backupCount=1)])
-
-
-@app.get("/")
-def home():
-    return {"msg": f"System has been online since {_start_time.isoformat()}"}
diff --git a/polybot/planning.py b/polybot/planning.py
index 64baebd..1a75ee8 100644
--- a/polybot/planning.py
+++ b/polybot/planning.py
@@ -10,11 +10,11 @@ import requests
 from colmena.redis.queue import ClientQueues, TaskServerQueues
 from colmena.task_server import ParslTaskServer
-from colmena.thinker import BaseThinker, result_processor
-from colmena.models import Result
+from colmena.thinker import BaseThinker, agent
 from parsl import Config, ThreadPoolExecutor
 from pydantic import BaseModel, Field, AnyHttpUrl
+from polybot.sample import subscribe_to_study
 from polybot.models import SampleTemplate
 from polybot.robot import send_new_sample
@@ -60,12 +60,14 @@ class BasePlanner(BaseThinker):
     :class:`OptimizationProblem` JSON document.
     There are no requirements on how you implement the planning algorithm, but you may at least want an agent
-    waiting for results with the "robot" topic. For example,
+    that subscribes to results from the Argonne Data Cloud via `subscribe_to_study`. For example,
     .. code: python
-        @result_processor(topic='robot')
-        def robot_result_handler(self, _: Result):
-            output = self.opt_spec.get_sample_template()
-            send_send_new_sample(output)
+        @agent()
+        def robot_planner(self):
+            for sample in subscribe_to_study():
+                output = self.opt_spec.get_sample_template()
+                send_new_sample(output)
@@ -79,21 +81,20 @@ def __init__(self, queues: ClientQueues, opt_spec: OptimizationProblem, daemon:
 class RandomPlanner(BasePlanner):
     """Submit a randomly-selected point from the search space each time a new result is completed"""
-    @result_processor(topic='robot')
-    def robot_result_handler(self, _: Result):
-        """Generate a new task to be run on the robot after one completes
-
-        Args:
-            _: Result that is not actually used for now. 
- """ - # Make a choice for each variable - output = self.opt_spec.search_template.create_new_sample() - for key, acceptable_values in self.opt_spec.search_template.list_acceptable_input_values().items(): - output.inputs[key] = random.choice(acceptable_values) - - # Send it to the robot - send_new_sample(output) - return + @agent() + def robot_result_handler(self): + """Generate a new task to be run on the robot after one completes""" + # Wait a result to complete + for sample in subscribe_to_study(): + self.logger.info(f'Received new sample: {sample.ID}') + + # Make a choice for each variable + output = self.opt_spec.search_template.create_new_sample() + for key, acceptable_values in self.opt_spec.search_template.list_acceptable_input_values().items(): + output.inputs[key] = random.choice(acceptable_values) + + # Send it to the robot + send_new_sample(output) def _execute(f: Callable): diff --git a/polybot/sample.py b/polybot/sample.py index fba5877..f431f50 100644 --- a/polybot/sample.py +++ b/polybot/sample.py @@ -6,6 +6,7 @@ import logging from typing import Iterator +from requests import get from .config import settings from .models import Sample @@ -14,23 +15,27 @@ logger = logging.getLogger(__name__) -def save_sample(sample: Sample, overwrite: bool = True): - """Save a sample +def subscribe_to_study() -> Iterator[Sample]: + """Subscribe to the "new sample" created event feed - Args: - sample: Sample to be saved - overwrite: Whether overwriting existing files + Yields: + Latest samples as they are created """ - path = settings.sample_folder / f"{sample.ID}.json" - if path.exists(): - if overwrite: - logger.warning(f'Overwriting file at {sample.ID}') - else: - raise ValueError(f"File already exists. Set overwrite=True, if you want to remove it. Path: {path}") - with open(path, 'w') as fp: - fp.write(sample.json(indent=2)) - logger.info(f'Wrote {sample.ID} to {path}') + # Query to get the list of samples in the study + adc_client = settings.generate_adc_client() + if settings.adc_study_id is None: + raise ValueError('The ADC study id is not set. Set your ADC_STUDY_ID environment variable.') + + for event in adc_client.subscribe_to_study(settings.adc_study_id): + # Check that we have the right type of event + if "newSample" not in event: + logger.debug('Event type was not "newSample"') + continue + + # Get the sample information + sample = event["newSample"]["sample"] + yield _parse_sample(sample) def load_samples() -> Iterator[Sample]: @@ -40,8 +45,27 @@ def load_samples() -> Iterator[Sample]: Samples in no prescribed order """ - for path in settings.sample_folder.glob("*.json"): - try: - yield Sample.parse_file(path) - except BaseException: - continue + # Query to get the list of samples in the study + adc_client = settings.generate_adc_client() + if settings.adc_study_id is None: + raise ValueError('The ADC study id is not set. 
Set your ADC_STUDY_ID environment variable.') + study_info = adc_client.get_study(settings.adc_study_id) + + for sample in study_info['study']['samples']: + yield _parse_sample(sample) + + +def _parse_sample(sample_record: dict) -> Sample: + """Create a Sample object given a sample record from ADC + + Args: + sample_record: Sample information record from ADC + Returns: + Sample object in the format used by `pol + """ + # Pull down the JSON associated with each sample + json_url = sample_record['url'] + sample_data = get(json_url, verify=False).json() + + # Parse as a sample object + return Sample.parse_obj(sample_data) diff --git a/polybot/tests/conftest.py b/polybot/tests/conftest.py index d6af697..2aef5dc 100644 --- a/polybot/tests/conftest.py +++ b/polybot/tests/conftest.py @@ -1,10 +1,9 @@ from pathlib import Path -from shutil import rmtree +import json from pytest import fixture -from fastapi.testclient import TestClient +from pytest_mock import MockerFixture -from polybot.fastapi import app from polybot.models import Sample, SampleTemplate from polybot.config import settings @@ -15,13 +14,6 @@ @fixture(autouse=True) def test_settings(): - # Redirect the sample folder - sample_dir = _test_dir / "test-samples" - if sample_dir.is_dir(): - rmtree(sample_dir) - sample_dir.mkdir() - settings.sample_folder = sample_dir - # Set up the test Redis service settings.redis_url = "rediss://localhost" @@ -36,6 +28,22 @@ def example_template() -> SampleTemplate: return SampleTemplate.parse_file(file_path / 'example-template.json') +@fixture +def example_sample() -> dict: + """Get an example sample response from ADC with a fresh token to access S3""" + client = settings.generate_adc_client() + assert settings.adc_study_id is not None, "Missing ADC_STUDY_ID environmental variable" + study_response = client.get_study(settings.adc_study_id) + return study_response['study']['samples'][0] + + @fixture() -def fastapi_client() -> TestClient: - return TestClient(app) +def mock_subscribe(example_sample, mocker: MockerFixture): + # Make a fake subscription response + ex_record = json.loads(file_path.joinpath('example-adc-subscription-event.json').read_text()) + ex_record['newSample']['sample'] = example_sample # Gives a sample with an active data URL + + # Mock the event generator + def _fake_subscribe(*args, **kwargs): + yield ex_record + mocker.patch('polybot.config.ADCClient.subscribe_to_study', new=_fake_subscribe) diff --git a/polybot/tests/files/example-adc-subscription-event.json b/polybot/tests/files/example-adc-subscription-event.json new file mode 100644 index 0000000..8d094a2 --- /dev/null +++ b/polybot/tests/files/example-adc-subscription-event.json @@ -0,0 +1,59 @@ +{ + "newSample": { + "study": { + "id": "U3R1ZHlOb2RlOjU=", + "name": "polybot-ai-test", + "description": "Test study for developing the Polybot AI planning service.", + "keywords": [], + "startDate": null, + "status": "NEW", + "created": "2021-08-30T16:54:41.706000+00:00", + "updated": "2021-08-30T16:54:42.127000+00:00", + "permissions": [ + { + "user": { + "id": "VXNlck5vZGU6NQ==", + "name": "", + "email": "loganw@uchicago.edu" + }, + "level": "ADMIN" + } + ], + "investigations": [], + "samples": [ + { + "id": "U2FtcGxlTm9kZToxNQ==", + "name": "000286e59a", + "url": "https://s3-stage.discoverycloud.anl.gov/adc-test/samples/000286e59a.json?AWSAccessKeyId=73cf579b511a35c476f338ef1413397d&Signature=9sP9Jlc0paJntFNjj2fYImpaEgw%3D&Expires=1630445602", + "keywords": [], + "created": "2021-08-30T17:48:13.570686+00:00", + "updated": 
"2021-08-30T17:48:13.570717+00:00" + }, + { + "id": "U2FtcGxlTm9kZToxNg==", + "name": "000286e59a", + "url": "https://s3-stage.discoverycloud.anl.gov/adc-test/samples/000286e59a_UcWJRrA.json?AWSAccessKeyId=73cf579b511a35c476f338ef1413397d&Signature=Kvz%2BTKx8pzusLojlY4Qgu%2FMqf1E%3D&Expires=1630445602", + "keywords": [], + "created": "2021-08-31T20:33:22.241986+00:00", + "updated": "2021-08-31T20:33:22.242018+00:00" + } + ] + }, + "sample": { + "id": "U2FtcGxlTm9kZToxNg==", + "name": "000286e59a", + "user": { + "id": "VXNlck5vZGU6NQ==", + "name": "", + "email": "loganw@uchicago.edu" + }, + "keywords": [], + "parent": null, + "created": "2021-08-31T20:33:22.241000+00:00", + "updated": "2021-08-31T20:33:22.242000+00:00", + "url": "https://s3-stage.discoverycloud.anl.gov/adc-test/samples/000286e59a_UcWJRrA.json?AWSAccessKeyId=73cf579b511a35c476f338ef1413397d&Signature=Kvz%2BTKx8pzusLojlY4Qgu%2FMqf1E%3D&Expires=1630445602" + }, + "source": null + } +} + diff --git a/polybot/tests/test_planner.py b/polybot/tests/test_planner.py index 1721d16..37ca6c4 100644 --- a/polybot/tests/test_planner.py +++ b/polybot/tests/test_planner.py @@ -1,6 +1,7 @@ """Make sure the planning system works""" import json import logging +import time from time import sleep from colmena.models import Result @@ -34,34 +35,20 @@ def json(self): assert isinstance(opt.search_template, SampleTemplate) -def test_generate(mocker: MockerFixture, opt_config, fastapi_client, example_sample, caplog): +def test_generate(mocker: MockerFixture, mock_subscribe, opt_config, example_sample, caplog): # Mock the send_new_sample in the planning library fake_robot = mocker.patch('polybot.planning.send_new_sample') # Make the planner client_q = settings.make_client_queue() - server_q = settings.make_server_queue() planner = RandomPlanner(client_q, opt_config, daemon=True) # Launch it as a Thread planner.start() + time.sleep(5) # Wait for the thread to start up and the subscription to download a sample try: - # Test sending in a new result - server_q.send_result(Result(((0,), {})), topic='robot') - sleep(1) # For the other thread to catch up - # Make sure the server responded by sending a record to the "robot" assert fake_robot.call_count == 1, planner.is_alive() - - # Test sending via the REST API - caplog.clear() - with caplog.at_level(logging.INFO): - res = fastapi_client.post("/ingest", json=example_sample.dict(), allow_redirects=True) - assert res.status_code == 200 - sleep(2) # Multiple threads have to complete - - # Make sure the server responded by sending a record to the "robot" - assert fake_robot.call_count == 2, planner.is_alive() finally: # Kill the planning service planner.done.set() diff --git a/polybot/tests/test_samples.py b/polybot/tests/test_samples.py index 989f58b..8f70550 100644 --- a/polybot/tests/test_samples.py +++ b/polybot/tests/test_samples.py @@ -1,28 +1,16 @@ from pathlib import Path -from pytest import fixture, raises +from polybot.models import Sample +from polybot.sample import load_samples, subscribe_to_study -from polybot.sample import save_sample, load_samples -from polybot.config import settings +_my_path = Path(__file__).parent -@fixture(autouse=True) -def sample_dir(tmpdir): - settings.sample_folder = Path(tmpdir) - - -def test_save(example_sample): - save_sample(example_sample) - assert settings.sample_folder.joinpath(f'{example_sample.ID}.json').is_file() - save_sample(example_sample, overwrite=True) - with raises(ValueError): - save_sample(example_sample, overwrite=False) +def 
test_subscribe(mock_subscribe): + sample = next(subscribe_to_study()) + assert isinstance(sample, Sample) def test_load(example_sample): - test_save(example_sample) - with open(settings.sample_folder / "test.json", 'w') as fp: - fp.write('Junk') samples = list(load_samples()) - assert len(samples) == 1 - assert samples[0].ID == example_sample.ID + assert len(samples) >= 1 diff --git a/polybot/tests/test_server.py b/polybot/tests/test_server.py deleted file mode 100644 index b063724..0000000 --- a/polybot/tests/test_server.py +++ /dev/null @@ -1,14 +0,0 @@ -"""Tests for the REST API""" - - -def test_home(fastapi_client): - res = fastapi_client.get("/") - assert res.status_code == 200 - - -def test_upload(fastapi_client): - res = fastapi_client.post('/ingest', json={'ID': '1'*10}, allow_redirects=True) - assert res.status_code == 200 - reply = res.json() - assert reply['success'] - assert reply['sample'] == '1' * 10 diff --git a/polybot/views/__init__.py b/polybot/views/__init__.py deleted file mode 100644 index 0a08b3e..0000000 --- a/polybot/views/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Routes for the server""" diff --git a/polybot/views/ingest.py b/polybot/views/ingest.py deleted file mode 100644 index 5dbbb1b..0000000 --- a/polybot/views/ingest.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Routes related to ingesting data from the robot""" -import logging - -from fastapi import APIRouter -from colmena.models import Result - -from polybot.models import Sample -from polybot.config import settings -from polybot.sample import save_sample - -logger = logging.getLogger(__name__) -router = APIRouter(prefix='/ingest') - - -@router.post("/") -def upload_data(sample: Sample): - """Intake a file from the robot and save it to disk""" - logger.info(f'Received sample {sample.ID}') - - # Save it to disk - save_sample(sample) - - # Send the result to the planning service - # We use Colmena-formatted messages in a Redis queue - if settings.redis_url is not None: - queue = settings.make_server_queue() - result = Result(((None,), {})) - result.set_result(sample, 0) - queue.send_result( - result, topic='robot' - ) - logger.info('Sent result to the planning service') - - return {'success': True, 'sample': sample.ID} diff --git a/requirements.txt b/requirements.txt index 834ffd2..3b8036a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,9 +2,7 @@ matplotlib>3 pandas>=1 pydantic>=1.5 requests>=2.24 -fastapi>=0.65.2 colmena>=0.1.0 -pytest>=6.2.4 polybot>=0.0.1 pyyaml>=5.4.1 numpy>=1.20.2 diff --git a/run.bat b/run.bat deleted file mode 100644 index 3977233..0000000 --- a/run.bat +++ /dev/null @@ -1 +0,0 @@ -uvicorn polybot.fastapi:app --port 5001 --host 0.0.0.0 diff --git a/run.sh b/run.sh deleted file mode 100644 index 3977233..0000000 --- a/run.sh +++ /dev/null @@ -1 +0,0 @@ -uvicorn polybot.fastapi:app --port 5001 --host 0.0.0.0 diff --git a/test-requirements.txt b/test-requirements.txt index 2bee210..5db6d7c 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,7 +1,5 @@ -pytest>=5.2 +pytest>=6.2.4 pytest-cov pytest-mock scikit-learn -uvicorn[standard] -python-multipart flake8
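
The updated test fixtures in `polybot/tests/conftest.py` pull a live sample record from the Argonne Data Cloud, so running the suite requires ADC credentials in the environment. A minimal sketch of a local run (the values are placeholders; the variable names follow the settings in `polybot/config.py`):

```python
"""Sketch: run the updated polybot test suite with ADC credentials set."""
import os
import pytest

# Placeholders -- use a study that your access token can read
os.environ["ADC_ACCESS_TOKEN"] = "<your-access-token>"
os.environ["ADC_STUDY_ID"] = "<your-study-id>"

# Equivalent to running `pytest polybot/tests` from the repository root
raise SystemExit(pytest.main(["polybot/tests"]))
```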