Merge pull request #16 from AD-SDL/adc
Initial Support for ADC
WardLT authored Sep 21, 2021
2 parents 0aebb08 + acea2f1 commit 53b5a6e
Showing 21 changed files with 354 additions and 202 deletions.
42 changes: 32 additions & 10 deletions README.md
@@ -18,12 +18,14 @@ which you can use to build the environment using `pip`:

`pip install -e .`

There is one exception: the Argonne Data Cloud SDK is not yet installable via pip.
You must clone it from https://github.com/AD-SDL/adc-rdm-sdk/ (private repo) and then install the package with [Poetry](https://python-poetry.org/).

## Running Polybot

The web service is built from three smaller services.

- Redis: Launch Redis as normal: `redis-server`
- Web Service: Launch FastAPI using Uvicorn: `uvicorn polybot.fastapi:app`
- AI planner: Launch using the CLI: `polybot planner opt_spec.json`

### Configuring PolyBot
@@ -57,19 +59,39 @@ polybot planner --planning-class planner:BOPlanner
```

Polybot uses [Colmena](http://colmena.rtfd.org/) to express planning algorithms.
At minimum, you will need to implement a ["Thinker"](https://colmena.readthedocs.io/en/latest/how-to.html#creating-a-thinker-application)
that defines the logic for controlling the robot.
Follow our guide to creating a ["Thinker"](https://colmena.readthedocs.io/en/latest/how-to.html#creating-a-thinker-application)
to define the logic for controlling the robot.
`BasePlanner` is a subclass of the Colmena `BaseThinker` class.
We briefly describe a few common tasks and how to implement them.

#### Responding to Data from Robot

The web service provides result events with the topic "robot," and you must
define logic for how to respond to new data becoming available.
We recommend either explicitly waiting on the results from this topic using the
[`self.queue.get_result`](https://colmena.readthedocs.io/en/latest/how-to.html#submitting-tasks) function
or using the [`@result_processor(topic='robot')`](https://colmena.readthedocs.io/en/latest/thinker.html#result-processing-agents)
decorator.
Typically, the function will end by sending a new task to the robot using the
[`polybot.robot.send_new_sample`](./polybot/robot.py) function.
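
As a rough sketch of that topic-based pattern (illustrative only; the class name and planning logic are placeholders, while `BasePlanner`, `send_new_sample`, and the Colmena `result_processor` decorator are the pieces named above):

```python
from colmena.models import Result
from colmena.thinker import result_processor

from polybot.planning import BasePlanner
from polybot.robot import send_new_sample


class TopicPlanner(BasePlanner):

    @result_processor(topic='robot')
    def robot_result_handler(self, result: Result):
        """Runs each time the web service publishes a result on the "robot" topic"""
        new_sample = ...  # Planning logic to be defined by you
        send_new_sample(new_sample)
```
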
We access results from the robot by subscribing to "study events" from the Argonne Data Cloud (ADC).
The ADC provides a service that sends messages over web sockets whenever a new sample is created.
Use this feature by first setting the ID of your study as the `ADC_STUDY_ID` environment variable and
then calling the `subscribe_to_study` function provided with this library.

A template "Thinker" application that uses these functions would be

```python
from colmena.thinker import agent

from polybot.planning import BasePlanner
from polybot.sample import subscribe_to_study
from polybot.robot import send_new_sample

class RobotPlanner(BasePlanner):

    @agent()
    def make_tasks(self):
        """This function will run as a thread when you start the planner"""
        for sample in subscribe_to_study():
            new_sample = ...  # Logic to be defined by you!
            send_new_sample(new_sample)
```

Note how we receive a new sample from the [`subscribe_to_study`](./polybot/sample.py) function
and then send a new task to the robot using the
[`polybot.robot.send_new_sample`](./polybot/robot.py) function.

#### Performing Computations on Remote Resources
1 change: 1 addition & 0 deletions environment.yml
@@ -10,6 +10,7 @@ dependencies:
- scikit-learn
- pandas==1.*
- pydantic
- tqdm
- pytest
- pytest-cov
- flake8
13 changes: 5 additions & 8 deletions example-planners/bayesian-optimization/README.md
@@ -9,17 +9,14 @@ complete very quickly (~10s for 3 million inference tasks).

Launch using `polybot planner -p planner:BOPlanner -t local_compute:make_task_server opt_spec.yaml`.

We recommend also launching the FastAPI service from this folder so that it uses the same environment variables.

## Tailoring to your system

First, edit the `.env` file in this folder to point to the proper address for the robot and a good path for storing samples.
First, edit the `.env` file in this folder to point to the proper address for the robot
and the Study ID for the collection of completed samples in the Argonne Data Cloud.

## Launching the Services

1. Start up Redis
2. Launch the HTTP service for receiving new samples. (Linux: `../../run.sh`, Windows: `..\..\run.bat`)
3. Launch the planner using the `polybot` command listed above
1. Start up Redis (e.g., run `redis-server` in another screen or terminal)
2. Launch the planner using the `polybot` command listed above

Make sure to record the IP address and port number for your HTTP service,
which are needed to configure where the robot should send new data.
The planner will then run until you kill it with <kbd>Ctrl</kbd>+<kbd>C</kbd>.
12 changes: 7 additions & 5 deletions example-planners/bayesian-optimization/planner.py
@@ -8,16 +8,17 @@
import numpy as np
from colmena.models import Result
from colmena.redis.queue import ClientQueues
from colmena.thinker import result_processor, agent
from colmena.thinker import agent
from sklearn.feature_selection import VarianceThreshold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.gaussian_process import GaussianProcessRegressor, kernels
from sklearn.model_selection import RepeatedKFold, cross_validate
from modAL.acquisition import EI

from polybot.config import settings
from polybot.robot import send_new_sample
from polybot.sample import load_samples
from polybot.sample import load_samples, subscribe_to_study
from polybot.planning import BasePlanner, OptimizationProblem


@@ -74,9 +75,10 @@ def startup(self):
        self.logger.info('Performing a cold-start')
        self.perform_bo()

    @result_processor(topic='robot')
    def robot_result_handler(self, _: Result):
        self.perform_bo()
    @agent()
    def robot_result_handler(self):
        for sample in subscribe_to_study():
            self.perform_bo()

    def perform_bo(self):
        # Make the output directory for results
2 changes: 1 addition & 1 deletion example-planners/bayesian-optimization/simulate-results.sh
File mode changed from 100644 to 100755 (now executable)
@@ -2,7 +2,7 @@

for f in `find 706data -name "*.json"`; do
# Send data to the web service
curl -H "Content-Type: application/json" -L --data @${f} http://127.0.0.1:8000/ingest
curl -H "Content-Type: application/json" -L --data @${f} http://127.0.0.1:5001/ingest
echo # So that we get newlines
sleep 15
done
141 changes: 141 additions & 0 deletions example-planners/bayesian-optimization/test_funcx.py
@@ -0,0 +1,141 @@
"""Example script for running the core methods of the FuncX planner"""
from argparse import ArgumentParser
from pathlib import Path
from typing import Tuple

from sklearn.gaussian_process import GaussianProcessRegressor, kernels
from sklearn.feature_selection import VarianceThreshold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from modAL.acquisition import EI
from yaml import SafeLoader
import numpy as np
import yaml

from planner import run_inference
from polybot.models import Sample
from polybot.planning import OptimizationProblem


def fit_model(opt_spec: OptimizationProblem, train_x: np.ndarray, train_y: np.ndarray) -> Pipeline:
"""Fit and test a model using the latest data
Args:
opt_spec: Configuration file for the optimization
train_x: Input columns
train_y: Output column
out_dir: Location to store the data
"""
# Create an initial RBF kernel, using the training set mean as a scaling parameter
kernel = train_y.mean() ** 2 * kernels.RBF(length_scale=1)

# TODO (wardlt): Make it clear where featurization would appear, as we are soon to introduce additives
# This will yield chemical degrees of freedom better captured using features of the additives rather
# than a new variable per additive
# Notes for now: Mol. Weight, Side Chain Length, and ... are the likely candidates

# Add a noise parameter based on user settings
noise = opt_spec.planner_options.get('noise_level', 0)
if noise < 0:
# Use standard deviation of the distribution of train_y will be the estimation of initial noise
# TODO (wardlt): Document where 3, 4, and 11 come from
noise_estimated = np.std(train_y) / 3
noise_lb = noise_estimated / 4
noise_ub = noise_estimated * 11

kernel_noise = kernels.WhiteKernel(noise_level=noise_estimated ** 2,
noise_level_bounds=(noise_lb ** 2, noise_ub ** 2))
kernel = kernel + kernel_noise
elif noise > 0:
kernel = kernel + kernels.WhiteKernel(noise ** 2, noise_level_bounds=(noise ** 2,) * 2)

# Train a GPR model
model = Pipeline([
('variance', VarianceThreshold()),
('scale', StandardScaler()),
('gpr', GaussianProcessRegressor(kernel))
])

# Train and save the model
model.fit(train_x, train_y)
print(f'Finished fitting the model on {len(train_x)} data points')
print(f'Optimized model: {model["gpr"].kernel_}')
return model


def generate_training_set(opt_spec: OptimizationProblem, sample_path) -> Tuple[np.ndarray, np.ndarray]:
"""Load in all of the previous samples to build a training set
Uses the inputs and outputs defined in the optimization specification
Returns:
- Input features
- Output variable
"""

# Get the name of the input columns
input_columns = opt_spec.search_template.input_columns

train_x = []
train_y = []
# Loop over samples in the training data
for path in Path(sample_path).rglob('*.json'):
sample = Sample.parse_file(path)
train_x.append([sample.inputs[c] for c in input_columns]) # Get only the needed input columns
train_y.append(sample.processed_output[opt_spec.output]) # Get the target output column

# Convert them to numpy and return
return np.array(train_x), np.array(train_y)


if __name__ == "__main__":
# Make some CLI arguments
parser = ArgumentParser()
parser.add_argument('--train-files', help='Path to the completed samples', default='samples')
parser.add_argument('--optimization-spec', help='Configuration file', default='opt_spec.yaml')
args = parser.parse_args()

# Load in the optimization configuration
with open(args.optimization_spec) as fp:
opt_spec = yaml.load(fp, Loader=SafeLoader)
opt_spec = OptimizationProblem.parse_obj(opt_spec)

# Get the training data
train_x, train_y = generate_training_set(opt_spec, args.train_files)
print(f'Loaded a training set of {len(train_x)} entries')

# Log-normalize conductivity
train_y = np.log(train_y)

# Fit a model and save the training records
model = fit_model(opt_spec, train_x, train_y)

# Create the search space
possible_options = opt_spec.search_template.generate_search_space_dataframe()
search_x = possible_options[opt_spec.search_template.input_columns]
print(f'Created {len(search_x)} samples to be evaluated')

# Perform the inference
# TODO (wardlt): This is the part that can be parallelized
chunk_size = opt_spec.planner_options.get('chunk_size')
search_y = []
search_std = []
for chunk in np.array_split(search_x, len(search_x) // chunk_size):
y_pred, y_std = run_inference(model, chunk)
search_y.append(y_pred)
search_std.append(y_std)
search_y = np.vstack(search_y)
search_std = np.vstack(search_std)

# Get the largest UCB
assert opt_spec.maximize, "The optimization requests minimization"
ei = EI(search_y, search_std, max_val=np.max(train_y), tradeoff=0.1)
best_ind = np.argmax(ei)
best_point = search_x.iloc[best_ind][opt_spec.search_template.input_columns]

# Make the sample and save it to disk
output = opt_spec.search_template.create_new_sample()
for p, x in zip(opt_spec.search_template.input_columns, best_point):
output.inputs[p] = x
with open('next_sample.json', 'w') as fp:
fp.write(output.json(indent=2))
16 changes: 13 additions & 3 deletions polybot/config.py
@@ -3,6 +3,7 @@
from typing import Optional, Tuple, List
from urllib.parse import urlparse

from adc.client import ADCClient
from colmena.redis.queue import ClientQueues, TaskServerQueues
from pydantic import BaseSettings, Field, HttpUrl, RedisDsn

@@ -12,14 +13,15 @@
class Settings(BaseSettings):
    """Settings for the web service"""

    # Sample handling
    sample_folder: Path = Field(_run_folder / "samples", description="Path in which to store the samples")
    # Configuration related to data storage on the Argonne Data Cloud
    adc_access_token: Optional[str] = Field(None, description='Token for accessing the Argonne Data Cloud')
    adc_study_id: Optional[str] = Field(None, description='Study ID associated with this experiment')

    # Logging
    log_name: Optional[str] = Field(None, description="Name of the log file. If not provided, logs will not be stored")
    log_size: int = Field(1, description="Maximum log size in MB")

    # Interface between FastAPI and planning services
    # Interface between the thinker and any remote compute processes
    redis_url: Optional[RedisDsn] = Field(None, description="URL of the redis service. Used to send messages "
                                                            "between web and planning services")

@@ -66,5 +68,13 @@ def make_server_queue(self) -> TaskServerQueues:
        hostname, port = self.redis_info
        return TaskServerQueues(hostname, port, name='polybot', topics=['robot'] + self.task_queues)

    def generate_adc_client(self) -> ADCClient:
        """Create an authenticated ADC client

        Returns:
            A client to the ADC that is ready to make queries
        """
        return ADCClient(self.adc_access_token)


settings = Settings()
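
A minimal usage sketch for this new client factory, assuming the `ADC_ACCESS_TOKEN` and `ADC_STUDY_ID` environment variables are set (pydantic's `BaseSettings` reads matching environment variables automatically); any further query calls come from the ADC SDK and are not shown here:

```python
from polybot.config import settings

# Assumes ADC_ACCESS_TOKEN (and ADC_STUDY_ID) are set in the environment or a .env file
adc_client = settings.generate_adc_client()

# The client is authenticated with settings.adc_access_token; the study that this
# deployment reports to is identified by settings.adc_study_id
print(f'Using ADC study: {settings.adc_study_id}')
```
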
33 changes: 0 additions & 33 deletions polybot/fastapi.py

This file was deleted.
