Add distributed MNIST example #1

Merged · 3 commits · Sep 13, 2018
1 change: 1 addition & 0 deletions .gitignore
@@ -22,3 +22,4 @@ build/
 dependency-reduced-pom.xml
 out/
 target
+tony-final.xml
2 changes: 1 addition & 1 deletion tony-core/src/main/java/com/linkedin/tony/TonyClient.java
@@ -235,7 +235,7 @@ private boolean init(String[] args) throws ParseException {

     tonyConf.addResource(Constants.TONY_DEFAULT_XML);
     if (cliParser.hasOption("conf_file")) {
-      tonyConf.addResource(cliParser.getOptionValue("conf_file"));
+      tonyConf.addResource(new Path(cliParser.getOptionValue("conf_file")));
     } else {
       tonyConf.addResource(Constants.TONY_XML);
     }
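
Context for this one-line change: in Hadoop's `Configuration` API, `addResource(String)` treats its argument as a classpath resource name, whereas `addResource(Path)` reads the file directly from the filesystem, which is what a user-supplied `--conf_file` path needs. A minimal illustration (file names here are hypothetical):

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

Configuration conf = new Configuration();
conf.addResource("tony-default.xml");              // resolved against the classpath
conf.addResource(new Path("/tmp/tony-test.xml"));  // read directly from the given path
```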
61 changes: 61 additions & 0 deletions tony-examples/README.md
@@ -0,0 +1,61 @@
### Running Examples

To run the examples here, you need to:

* build a Python virtual environment with TensorFlow 1.9.0 installed
* install Hadoop 3.1.1+

If you don't have security enabled, you'll also need to provide a custom config file with security turned off.


### Building a Python virtual environment with TensorFlow

TonY requires a Python virtual environment zip with TensorFlow and any needed Python libraries already installed.

```
wget https://files.pythonhosted.org/packages/33/bc/fa0b5347139cd9564f0d44ebd2b147ac97c36b2403943dbee8a25fd74012/virtualenv-16.0.0.tar.gz
tar xf virtualenv-16.0.0.tar.gz

# Make sure to install using Python 3, as TensorFlow only provides Python 3 artifacts
python virtualenv-16.0.0/virtualenv.py venv
. venv/bin/activate
pip install tensorflow==1.9.0
zip -r venv.zip venv
```


### Installing Hadoop

TonY only requires YARN, not HDFS. Please see the [open-source documentation](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html) on how to set YARN up.
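
For a quick single-node test cluster, the linked guide amounts to placing a minimal `yarn-site.xml` in your `HADOOP_CONF_DIR`. A sketch (the property value below is the illustrative default from the Hadoop docs, not a TonY requirement):

```
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>
```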


### Disabling security

If your Hadoop cluster is not running with security enabled (e.g., for local testing), you can disable security by creating a config file as follows:

```
<configuration>
<property>
<name>tony.application.insecure-mode</name>
<value>true</value>
</property>
</configuration>
```

For the instructions below, we assume this file is named `tony-test.xml`.


### Running an example

Once you've installed Hadoop and built your Python virtual environment zip, you can run an example as follows:

```
./gradlew :tony-cli:build

java -cp `hadoop classpath`:/path/to/TonY/tony-cli/build/libs/tony-cli-x.x.x-all.jar com.linkedin.tony.cli.ClusterSubmitter \
--python_venv=/path/to/venv.zip \
--src_dir=/path/to/TonY/tony-examples/mnist \
--executes=/path/to/TonY/tony-examples/mnist/mnist_distributed.py \
--conf_file=/path/to/tony-test.xml \
--python_binary_path=venv/bin/python
```
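
Under the hood, TonY passes each task a `CLUSTER_SPEC` environment variable describing the allocated hosts, which `mnist_distributed.py` (below) parses into a `tf.train.ClusterSpec`. Illustratively, it is a JSON string of the form (hostnames and ports here are made up):

```
{
  "ps": ["ps-host-0:33311"],
  "worker": ["worker-host-0:33312", "worker-host-1:33313"]
}
```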
Empty file added tony-examples/mnist/__init__.py
246 changes: 246 additions & 0 deletions tony-examples/mnist/mnist_distributed.py
@@ -0,0 +1,246 @@
# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""A deep MNIST classifier using convolutional layers.

This example was adapted from
https://github.com/tensorflow/tensorflow/blob/master/tensorflow/examples/tutorials/mnist/mnist_deep.py.

Each worker reads the full MNIST dataset and asynchronously trains a CNN with dropout, using the Adam optimizer
to update the model parameters on shared parameter servers.

The current training accuracy is printed out after every 100 steps.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorflow.examples.tutorials.mnist import input_data
from threading import Thread

import json
import logging
import os
import sys
import tensorboard.main as tb_main
import tensorflow as tf

# Environment variable containing port to launch TensorBoard on, set by TonY.
TB_PORT_ENV_VAR = 'TB_PORT'
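
# Other environment variables set by TonY and read in main() below:
# CLUSTER_SPEC (a JSON string mapping job names to lists of host:port
# addresses), JOB_NAME ('ps' or 'worker'), and TASK_INDEX (this task's
# index within its job).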

# Input/output directories
tf.flags.DEFINE_string('data_dir', '/tmp/tensorflow/mnist/input_data',
                       'Directory for storing input data')
tf.flags.DEFINE_string('working_dir', '/tmp/tensorflow/mnist/working_dir',
                       'Directory under which events and output will be stored (in separate subdirectories).')

# Training parameters
tf.flags.DEFINE_integer("steps", 2500, "The number of training steps to execute.")
tf.flags.DEFINE_integer("batch_size", 64, "The batch size per step.")

FLAGS = tf.flags.FLAGS


def deepnn(x):
  """deepnn builds the graph for a deep net for classifying digits.

  Args:
    x: an input tensor with the dimensions (N_examples, 784), where 784 is the
    number of pixels in a standard MNIST image.

  Returns:
    A tuple (y, keep_prob). y is a tensor of shape (N_examples, 10), with values
    equal to the logits of classifying the digit into one of 10 classes (the
    digits 0-9). keep_prob is a scalar placeholder for the probability of
    dropout.
  """
  # Reshape to use within a convolutional neural net.
  # Last dimension is for "features" - there is only one here, since images are
  # grayscale -- it would be 3 for an RGB image, 4 for RGBA, etc.
  with tf.name_scope('reshape'):
    x_image = tf.reshape(x, [-1, 28, 28, 1])

  # First convolutional layer - maps one grayscale image to 32 feature maps.
  with tf.name_scope('conv1'):
    W_conv1 = weight_variable([5, 5, 1, 32])
    b_conv1 = bias_variable([32])
    h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)

  # Pooling layer - downsamples by 2X.
  with tf.name_scope('pool1'):
    h_pool1 = max_pool_2x2(h_conv1)

  # Second convolutional layer -- maps 32 feature maps to 64.
  with tf.name_scope('conv2'):
    W_conv2 = weight_variable([5, 5, 32, 64])
    b_conv2 = bias_variable([64])
    h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)

  # Second pooling layer.
  with tf.name_scope('pool2'):
    h_pool2 = max_pool_2x2(h_conv2)

  # Fully connected layer 1 -- after two rounds of downsampling, our 28x28 image
  # is down to 7x7x64 feature maps -- maps this to 1024 features.
  with tf.name_scope('fc1'):
    W_fc1 = weight_variable([7 * 7 * 64, 1024])
    b_fc1 = bias_variable([1024])

    h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * 64])
    h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)

  # Dropout - controls the complexity of the model, prevents co-adaptation of
  # features.
  with tf.name_scope('dropout'):
    keep_prob = tf.placeholder(tf.float32)
    h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

  # Map the 1024 features to 10 classes, one for each digit
  with tf.name_scope('fc2'):
    W_fc2 = weight_variable([1024, 10])
    b_fc2 = bias_variable([10])

    y_conv = tf.matmul(h_fc1_drop, W_fc2) + b_fc2
  return y_conv, keep_prob


def conv2d(x, W):
  """conv2d returns a 2d convolution layer with full stride."""
  return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')


def max_pool_2x2(x):
  """max_pool_2x2 downsamples a feature map by 2X."""
  return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                        strides=[1, 2, 2, 1], padding='SAME')


def weight_variable(shape):
  """weight_variable generates a weight variable of a given shape."""
  initial = tf.truncated_normal(shape, stddev=0.1)
  return tf.Variable(initial)


def bias_variable(shape):
  """bias_variable generates a bias variable of a given shape."""
  initial = tf.constant(0.1, shape=shape)
  return tf.Variable(initial)


def create_model():
  """Creates our model and returns the target nodes to be run or populated"""
  # Create the model
  x = tf.placeholder(tf.float32, [None, 784])

  # Define loss and optimizer
  y_ = tf.placeholder(tf.int64, [None])

  # Build the graph for the deep net
  y_conv, keep_prob = deepnn(x)

  with tf.name_scope('loss'):
    cross_entropy = tf.losses.sparse_softmax_cross_entropy(labels=y_, logits=y_conv)
    cross_entropy = tf.reduce_mean(cross_entropy)

  global_step = tf.train.get_or_create_global_step()
  with tf.name_scope('adam_optimizer'):
    train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy, global_step=global_step)

  with tf.name_scope('accuracy'):
    correct_prediction = tf.equal(tf.argmax(y_conv, 1), y_)
    correct_prediction = tf.cast(correct_prediction, tf.float32)
    accuracy = tf.reduce_mean(correct_prediction)

  tf.summary.scalar('cross_entropy_loss', cross_entropy)
  tf.summary.scalar('accuracy', accuracy)

  merged = tf.summary.merge_all()

  return x, y_, keep_prob, global_step, train_step, accuracy, merged


def start_tensorboard(checkpoint_dir):
  FLAGS.logdir = checkpoint_dir
  if TB_PORT_ENV_VAR in os.environ:
    FLAGS.port = os.environ[TB_PORT_ENV_VAR]

  tb_thread = Thread(target=tb_main.run_main)
  tb_thread.daemon = True

  logging.info("Starting TensorBoard with --logdir=" + checkpoint_dir + " in daemon thread...")
  tb_thread.start()


def main(_):
  logging.getLogger().setLevel(logging.INFO)

  cluster_spec_str = os.environ["CLUSTER_SPEC"]
  cluster_spec = json.loads(cluster_spec_str)
  ps_hosts = cluster_spec['ps']
  worker_hosts = cluster_spec['worker']

  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # Create and start a server for the local task.
  job_name = os.environ["JOB_NAME"]
  task_index = int(os.environ["TASK_INDEX"])
  server = tf.train.Server(cluster, job_name=job_name, task_index=task_index)

  if job_name == "ps":
    server.join()
  elif job_name == "worker":
    # Create our model graph. Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % task_index,
        cluster=cluster)):
      features, labels, keep_prob, global_step, train_step, accuracy, merged = create_model()

    if task_index == 0:  # chief worker
      tf.gfile.MakeDirs(FLAGS.working_dir)
      start_tensorboard(FLAGS.working_dir)

    # The StopAtStepHook handles stopping after running given steps.
    hooks = [tf.train.StopAtStepHook(num_steps=FLAGS.steps)]

    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(task_index == 0),
                                           checkpoint_dir=FLAGS.working_dir,
                                           hooks=hooks) as sess:
      # Import data
      logging.info('Extracting and loading input data...')
      mnist = input_data.read_data_sets(FLAGS.data_dir)

      # Train
      logging.info('Starting training')
      i = 0
      while not sess.should_stop():
        batch = mnist.train.next_batch(FLAGS.batch_size)
        if i % 100 == 0:
          step, _, train_accuracy = sess.run([global_step, train_step, accuracy],
                                             feed_dict={features: batch[0], labels: batch[1], keep_prob: 1.0})
          logging.info('Step %d, training accuracy: %g' % (step, train_accuracy))
        else:
          sess.run([global_step, train_step],
                   feed_dict={features: batch[0], labels: batch[1], keep_prob: 0.5})
        i += 1

    logging.info('Done training!')
    sys.exit()


if __name__ == '__main__':
  tf.app.run()