2bit gradient compression (apache#8662)
* update two bit compression

* Update trainer.py

* Update test_operator.py

* update two bit compression

* update two bit compression

* update two bit compression

* update

* update

* update two bit compression

* update two bit compression

* update two bit compression

* update two bit compression

* update two bit compression

* update two bit compression

* update two bit compression

* update two bit compression

* update two bit compression

* update two bit compression

* update two bit compression

* update two bit compression

* Update comm.h

* add original size in compressed array

* update comm.h

* update distributed training

* update distributed training

* Update ndarray_function.cu

* Update kvstore_dist.h

* Update kvstore_dist.h

* update

* update

* update

* fix bug

* fix

* add GC test

* fix bug in push

* fix push and pull

* fix

* fix

* uncompiled

* kvstore dist changes. added cpp_package. changed strtof function calls

* fix usage of keys in dict

* fix push and pull

* fix

* fix_test

* fix_test

* fix_test

* add print statements

* more print statements and move send command to server

* set compress handling

* kvstore dist changes

* working kvstore push and pull. not sure if I committed that. from this commit removing mutable variable changes for residual array gives working push and pull

* cleanup test

* debug prints

* working kvstore dist. includes mutation of inputs and setting threshold array dtype properly

* fix operator

* kvstore dist changes

* fix compress kvstore issues. non compress is broken

* fix sparse push issue

* fix read lock issue

* optimizer is the only issue now?

* fix all issues with gc dist

* fix read lock issue

* pushing sharded data works

* works most times. sometimes val instead of 0 has parts of 1 or 1.5...

* fix read lock issue

* prev commit fixed seg fault issue on pull without push in a server

* add waittowrite to fix pull before push problems

* refactor quantizing for sharded data

* redo break up of data across servers, clearer split

* refactor to use param for thresholds.
also cleans up code

* Added many checks for 0

* cmake changes

* formatting issues for easier merge

* fix rate

* fix compilation errors after merge

* fix compile error and ndarray thresholds in dequantize

* fix compile error and ndarray thresholds in dequantize

* fix compile error

* fix compile error, and add comments

* update operator comments

* comment checks

* comment checks

* compile error

* working on local kvstore compress test

* fix module api compressparams, and change quantize tblob to inside engine

* 2bit arg wrong kvstore

* remove log

* fix gpu dequantize and tests

* fix seg fault in quantize and test indent

* tests print more info
order of params corrected

* assert almost equal

* more debug stuff
correct profiler message

* intermediate test rewrite

* small change in pushing op to engine

* fix concurrency of quantization

* wait on kernel

* updated tests and removed prints

* comment unnecessary stuff

* fix test

* remove print

* Update dist_sync_kvstore.py

fix random dist sync test

* remove slow kernel launch init

* cleanup

* undo changes in submodule

* submodule reset

* remove files

* undo changes unrelated to project

* undo changes unrelated to project

* Comments and cleanup.
Remaining are src/kvstore, src/operator and tests

* more cleanup and comments

* comments for tests

* lint changes and comments

* speed up operator test by reducing asnumpy() calls

* random data for test_kvstore_local

* fix variable confusion error in test

* fix randomized data test for local kvstore

* add nrepeat for test_kvstore

* change keys after merge from master introduced same keys

* correct test which fails because grad changes

* change to bit ops

* change to bit ops

* use bit array and revert sign changes

* correct bits setting to 10 as 2

* remove switch in dequantize

* image classification example changes and remove cpp-api

* merge all quantize, and new type in dist server

* fix ndarray dequantize

* debug stuff

* fix bug

* trying merge dequantize

* Framework and validation tests for operator validation and performance-testing in C++
Normally used for gtest tests.

* Remove obsolete file

* Fix compile error for non-CUDA build

* tweaks in quantize

* Allow for no backward pass

* Remove unused var

* making quantize all compatible as operators

* separate mshadow and loop operators

* working profiler, dequantize mshadow is slow

* fix mshadow dequantize

* fix quantize call by kvdist

* making quantize all compatible as operators

* add profile to measure.py

* minor profiler changes

* timing print in cpp operator

* time quantize

* saving data feature added

* cleanup test

* small updates

* cleanup

* minor fix

* passing additional environment variables through launch.py

* update local test

* update dmlc with pass-env

* fix launch pass env issue

* update with pass-env changes

* fix operator increment of block, remove unnecessary commented code

* fix operator increment of block, remove unnecessary commented code

* fix operator increment of block, remove unnecessary commented code

* fix operator increment of block, remove unnecessary commented code

* bring back quantize

Signed-off-by: Rahul <[email protected]>

* fix test

* fix bug with increment of char pointer

* fix bug with increment of char pointer

* debug module

* update test

* comment all debug statements

* change init to normal for now

* remove debug changes

* reorg to create gc class, add delayed start to gc, untested: causing segfault

* redo header files

* remove ps

* remove unused header

* fix compile issues

* remove multiple delete of gc

* add expected to local kvstore test

* fix operator compile issues

* fix operator compile issues

* fix operator compile and link issues

* remove gc.cpp

* add split function

* move setting of active gc

* move all to gc.cpp, compile works for cpu

* WIP gpu compile

* compiles and links on both cpu and gpu

* move prototypes to header

* add split function

* undo changes from master

* remove cpp perf quantize

* undo more changes

* add inactive function so that multiple kvstore dist inits have no compression
fix tests

* undo some formatting changes

* make sharding same when inactive and active

* remove counts and get_active_type

* remove print

* add train caltech

* increase size of mlp

* update to alexa mlp

* pass-env changes

* add bucketing module compression

* attempts for alexnet training

* prepare for merge

* fix lint issues

* fix lint issues

* remove caltech

* address some comments: shared_ptr, documentation, indentation, new functions, check_eq

* move header

* include header corrected

* include header corrected

* indents, documentation and test update

* lint

* pylint

* rename class, fix local kvstore test, remove confusing active method

* fix importing of compute expected in test_kvstore

* fix bug in device kvstore

* remove active comment in pull

* docstring

* use dmlc params, enums,

Signed-off-by: Rahul <[email protected]>

* doc updates

Signed-off-by: Rahul <[email protected]>

* lint

Signed-off-by: Rahul <[email protected]>

* typo

Signed-off-by: Rahul <[email protected]>

* rename field to type

Signed-off-by: Rahul <[email protected]>

* fix distributed kvstore stopping issue.
frontend was sending command with id=stopServer in old enum

Signed-off-by: Rahul <[email protected]>

* Trigger CI

* trigger CI
rahul003 authored and cjolivier01 committed Nov 19, 2017
1 parent a8f79bc commit a499f89
Showing 21 changed files with 1,501 additions and 167 deletions.
44 changes: 26 additions & 18 deletions example/image-classification/common/fit.py
@@ -103,6 +103,11 @@ def add_fit_args(parser):
help='1 means test reading speed without training')
train.add_argument('--dtype', type=str, default='float32',
help='precision: float32 or float16')
train.add_argument('--gc-type', type=str, default='none',
help='type of gradient compression to use, \
takes `2bit` or `none` for now')
train.add_argument('--gc-threshold', type=float, default=0.5,
help='threshold for 2bit gradient compression')
return train

def fit(args, network, data_loader, **kwargs):
@@ -114,6 +119,9 @@ def fit(args, network, data_loader, **kwargs):
"""
# kvstore
kv = mx.kvstore.create(args.kv_store)
if args.gc_type != 'none':
kv.set_gradient_compression({'type': args.gc_type,
'threshold': args.gc_threshold})

# logging
head = '%(asctime)-15s Node[' + str(kv.rank) + '] %(message)s'
@@ -162,10 +170,10 @@ def fit(args, network, data_loader, **kwargs):

lr_scheduler = lr_scheduler
optimizer_params = {
'learning_rate': lr,
'wd' : args.wd,
'lr_scheduler': lr_scheduler,
'multi_precision': True}
'learning_rate': lr,
'wd' : args.wd,
'lr_scheduler': lr_scheduler,
'multi_precision': True}

# Only a limited number of optimizers have 'momentum' property
has_momentum = {'sgd', 'dcasgd', 'nag'}
@@ -195,17 +203,17 @@ def fit(args, network, data_loader, **kwargs):

# run
model.fit(train,
begin_epoch = args.load_epoch if args.load_epoch else 0,
num_epoch = args.num_epochs,
eval_data = val,
eval_metric = eval_metrics,
kvstore = kv,
optimizer = args.optimizer,
optimizer_params = optimizer_params,
initializer = initializer,
arg_params = arg_params,
aux_params = aux_params,
batch_end_callback = batch_end_callbacks,
epoch_end_callback = checkpoint,
allow_missing = True,
monitor = monitor)
begin_epoch = args.load_epoch if args.load_epoch else 0,
num_epoch = args.num_epochs,
eval_data = val,
eval_metric = eval_metrics,
kvstore = kv,
optimizer = args.optimizer,
optimizer_params = optimizer_params,
initializer = initializer,
arg_params = arg_params,
aux_params = aux_params,
batch_end_callback = batch_end_callbacks,
epoch_end_callback = checkpoint,
allow_missing = True,
monitor = monitor)
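
The new flags feed directly into the kvstore before training starts. Below is a minimal, self-contained sketch of that flow; the argument parser is a stripped-down stand-in for add_fit_args, and the --kv-store flag with its 'device' default is an assumption rather than part of this diff.

# Sketch of the fit.py flow above: parse the gradient-compression flags,
# create the kvstore, and enable compression before calling model.fit.
import argparse
import mxnet as mx

parser = argparse.ArgumentParser()
parser.add_argument('--kv-store', type=str, default='device')    # assumed flag/default
parser.add_argument('--gc-type', type=str, default='none')       # '2bit' or 'none'
parser.add_argument('--gc-threshold', type=float, default=0.5)
args = parser.parse_args(['--gc-type', '2bit'])

kv = mx.kvstore.create(args.kv_store)
if args.gc_type != 'none':
    kv.set_gradient_compression({'type': args.gc_type,
                                 'threshold': args.gc_threshold})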
1 change: 0 additions & 1 deletion example/rnn/lstm_bucketing.py
@@ -48,7 +48,6 @@
parser.add_argument('--disp-batches', type=int, default=50,
help='show progress for every n batches')


def tokenize_text(fname, vocab=None, invalid_label=-1, start_label=0):
if not os.path.isfile(fname):
raise IOError("Please use get_ptb_data.sh to download requied file (data/ptb.train.txt)")
13 changes: 13 additions & 0 deletions include/mxnet/c_api.h
@@ -1549,6 +1549,19 @@ MXNET_DLL int MXInitPSEnv(mx_uint num_vars,
*/
MXNET_DLL int MXKVStoreCreate(const char *type,
KVStoreHandle *out);

/*!
* \brief Set parameters to use low-bit compressed gradients
* \param handle handle to the kvstore
* \param num_params number of compression parameters
* \param keys keys for compression parameters
* \param vals values for compression parameters
* \return 0 when success, -1 when failure happens
*/
MXNET_DLL int MXKVStoreSetGradientCompression(KVStoreHandle handle,
mx_uint num_params,
const char** keys,
const char** vals);

/*!
* \brief Delete a KVStore handle.
* \param handle handle to the kvstore
15 changes: 15 additions & 0 deletions include/mxnet/kvstore.h
@@ -30,6 +30,7 @@
#include <string>
#include <functional>
#include <atomic>
#include "../../src/kvstore/gradient_compression.h"
#include "./ndarray.h"
#if MXNET_USE_DIST_KVSTORE
#include "ps/ps.h"
@@ -64,6 +65,14 @@ class KVStore {
*/
inline const std::string& type() { return type_; }

/**
* \brief Set parameters to use low-bit compressed gradients
* \param kwargs key-value pairs of compression parameters,
*        e.g. the type of compression and the threshold for 2bit compression
*/
virtual void SetGradientCompression(const std::vector<std::pair<std::string, std::string> >
& kwargs) = 0;

/*!
* \brief Initialize a list of key-value pair to the store.
*
@@ -387,6 +396,12 @@ class KVStore {
*/
std::string type_;

/** \brief Gradient compression object starts with GC_NONE mode
* Used if SetGradientCompression sets the type.
* Currently there is no support for un-setting gradient compression
*/
std::shared_ptr<kvstore::GradientCompression> gradient_compression_;

/**
* \brief whether to do barrier when finalize
*/
12 changes: 10 additions & 2 deletions python/mxnet/gluon/trainer.py
@@ -44,14 +44,20 @@ class Trainer(object):
kvstore : str or KVStore
kvstore type for multi-gpu and distributed training. See help on
:any:`mxnet.kvstore.create` for more information.
compression_params : dict
Specifies type of gradient compression and additional arguments depending
on the type of compression being used. For example, 2bit compression requires a threshold.
Arguments would then be {'type':'2bit', 'threshold':0.5}
See mxnet.KVStore.set_gradient_compression method for more details on gradient compression.
Properties
----------
learning_rate: float
The current learning rate of the optimizer. Given an Optimizer object
optimizer, its learning rate can be accessed as optimizer.learning_rate.
"""
def __init__(self, params, optimizer, optimizer_params=None, kvstore='device'):
def __init__(self, params, optimizer, optimizer_params=None, kvstore='device',
compression_params=None):
if isinstance(params, (dict, ParameterDict)):
params = list(params.values())
if not isinstance(params, (list, tuple)):
@@ -65,7 +71,7 @@ def __init__(self, params, optimizer, optimizer_params=None, kvstore='device'):
"First argument must be a list or dict of Parameters, " \
"got list of %s."%(type(param)))
self._params.append(param)

self._compression_params = compression_params
optimizer_params = optimizer_params if optimizer_params else {}
self._scale = optimizer_params.get('rescale_grad', 1.0)
self._contexts = self._check_contexts()
@@ -104,6 +110,8 @@ def _init_kvstore(self):
kvstore, update_on_kvstore = _create_kvstore(self._kvstore, len(self._contexts),
arg_arrays)
if kvstore:
if self._compression_params:
kvstore.set_gradient_compression(self._compression_params)
if 'dist' in kvstore.type:
update_on_kvstore = False
for i, param in enumerate(self._params):
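
In Gluon, the compression_params dictionary added above is simply forwarded to the kvstore when the Trainer first initializes it. A minimal usage sketch follows; the one-layer network and SGD settings are placeholders, not part of this commit.

# Sketch: pass compression_params to gluon.Trainer; for '2bit' the only
# extra argument is a positive float threshold (0.5 by default).
import mxnet as mx
from mxnet import gluon

net = gluon.nn.Dense(10)
net.initialize(mx.init.Xavier())
trainer = gluon.Trainer(net.collect_params(), 'sgd',
                        optimizer_params={'learning_rate': 0.1},
                        kvstore='device',
                        compression_params={'type': '2bit', 'threshold': 0.5})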
62 changes: 62 additions & 0 deletions python/mxnet/kvstore.py
@@ -64,6 +64,16 @@ def _ctype_key_value(keys, vals):
else c_array_buf(ctypes.c_int, array('i', [keys] * len(vals)))
return (c_keys, c_handle_array(vals), use_str_keys)

def _ctype_dict(param_dict):
"""
Returns ctype arrays for keys and values(converted to strings) in a dictionary
"""
assert(isinstance(param_dict, dict)), \
"unexpected type for param_dict: " + str(type(param_dict))
c_keys = c_array(ctypes.c_char_p, [c_str(k) for k in param_dict.keys()])
c_vals = c_array(ctypes.c_char_p, [c_str(str(v)) for v in param_dict.values()])
return (c_keys, c_vals)

def _updater_wrapper(updater):
"""A wrapper for the user-defined handle."""
def updater_handle(key, lhs_handle, rhs_handle, _):
@@ -350,6 +360,58 @@ def row_sparse_pull(self, key, out=None, priority=0, row_ids=None):
check_call(_LIB.MXKVStorePullRowSparse(
self.handle, mx_uint(len(ckeys)), ckeys, cvals, crow_ids, ctypes.c_int(priority)))

def set_gradient_compression(self, compression_params):
""" Specifies type of low-bit quantization for gradient compression \
and additional arguments depending on the type of compression being used.
2bit Gradient Compression takes a positive float `threshold`.
The technique works by thresholding values such that positive values in the
gradient above threshold will be set to threshold. Negative values whose absolute
values are higher than threshold, will be set to the negative of threshold.
Values whose absolute values are less than threshold will be set to 0.
By doing so, each value in the gradient is in one of three states. 2 bits are
used to represent these states, and every 16 float values in the original
gradient can be represented using one float. This compressed representation
can reduce communication costs. The difference between these thresholded values and
original values is stored at the sender's end as residual and added to the
gradient in the next iteration.
When kvstore is 'local', gradient compression is used to reduce communication
between multiple devices (gpus). Gradient is quantized on each GPU which
computed the gradients, then sent to the GPU which merges the gradients. This
receiving GPU dequantizes the gradients and merges them. Note that this
increases memory usage on each GPU because of the residual array stored.
When kvstore is 'dist', gradient compression is used to reduce communication
from worker to server. Gradient is quantized on each worker which
computed the gradients, then sent to the server which dequantizes
this data and merges the gradients from each worker. Note that this
increases CPU memory usage on each worker because of the residual array stored.
Only worker to server communication is compressed in this setting.
If each machine has multiple GPUs, the GPU to GPU or GPU to CPU communication
within a machine is currently not compressed. Server to worker communication (in the case of pull)
is also not compressed.
To use 2bit compression, we need to specify `type` as `2bit`.
Only specifying `type` would use default value for the threshold.
To completely specify the arguments for 2bit compression, we would need to pass
a dictionary which includes `threshold` like:
{'type': '2bit', 'threshold': 0.5}
Parameters
----------
compression_params : dict
A dictionary specifying the type and parameters for gradient compression.
The key `type` in this dictionary is a
required string argument and specifies the type of gradient compression.
Currently `type` can only be `2bit`.
Other keys in this dictionary are optional and specific to the type
of gradient compression.
"""
ckeys, cvals = _ctype_dict(compression_params)
check_call(_LIB.MXKVStoreSetGradientCompression(self.handle,
mx_uint(len(compression_params)),
ckeys, cvals))

def set_optimizer(self, optimizer):
""" Registers an optimizer with the kvstore.
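
The set_gradient_compression docstring above describes the quantization in prose. The NumPy sketch below illustrates the thresholding and residual bookkeeping only; it is not the actual CPU/GPU kernels added by this commit. It is followed by the call that turns compression on for a kvstore.

# Illustration of 2bit thresholding with a residual, threshold = 0.5.
import numpy as np
import mxnet as mx

def quantize_step(grad, residual, threshold=0.5):
    corrected = grad + residual                      # fold in last step's error
    quantized = np.zeros_like(corrected)
    quantized[corrected >= threshold] = threshold    # 'positive' state
    quantized[corrected <= -threshold] = -threshold  # 'negative' state
    residual[:] = corrected - quantized              # error kept for next iteration
    return quantized

grad = np.array([0.26, -0.7, 0.02, 1.1])
residual = np.zeros_like(grad)
print(quantize_step(grad, residual))   # [ 0.  -0.5  0.   0.5]
print(residual)                        # [ 0.26 -0.2  0.02  0.6]

# Enabling it on a kvstore:
kv = mx.kvstore.create('device')
kv.set_gradient_compression({'type': '2bit', 'threshold': 0.5})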
17 changes: 14 additions & 3 deletions python/mxnet/module/bucketing_module.py
@@ -54,10 +54,16 @@ class BucketingModule(BaseModule):
Instead they are initialized to 0 and can be set by set_states()
group2ctxs : list of dict of str to context
Default is `None`. Mapping the `ctx_group` attribute to the context assignment.
compression_params : dict
Specifies type of gradient compression and additional arguments depending
on the type of compression being used. For example, 2bit compression requires a threshold.
Arguments would then be {'type':'2bit', 'threshold':0.5}
See mxnet.KVStore.set_gradient_compression method for more details on gradient compression.
"""
def __init__(self, sym_gen, default_bucket_key=None, logger=logging,
context=ctx.cpu(), work_load_list=None,
fixed_param_names=None, state_names=None, group2ctxs=None):
fixed_param_names=None, state_names=None, group2ctxs=None,
compression_params=None):
super(BucketingModule, self).__init__(logger=logger)

assert default_bucket_key is not None
@@ -75,6 +81,7 @@ def __init__(self, sym_gen, default_bucket_key=None, logger=logging,
_check_input_names(symbol, state_names, "state", True)
_check_input_names(symbol, fixed_param_names, "fixed_param", True)

self._compression_params = compression_params
self._fixed_param_names = fixed_param_names
self._state_names = state_names
self._context = context
@@ -322,7 +329,9 @@ def bind(self, data_shapes, label_shapes=None, for_training=True,
module = Module(symbol, data_names, label_names, logger=self.logger,
context=self._context, work_load_list=self._work_load_list,
fixed_param_names=self._fixed_param_names,
state_names=self._state_names, group2ctxs=self._group2ctxs)
state_names=self._state_names,
group2ctxs=self._group2ctxs,
compression_params=self._compression_params)
module.bind(data_shapes, label_shapes, for_training, inputs_need_grad,
force_rebind=False, shared_module=None, grad_req=grad_req)
self._curr_module = module
@@ -352,7 +361,9 @@ def switch_bucket(self, bucket_key, data_shapes, label_shapes=None):
logger=self.logger, context=self._context,
work_load_list=self._work_load_list,
fixed_param_names=self._fixed_param_names,
state_names=self._state_names, group2ctxs=self._group2ctxs)
state_names=self._state_names,
group2ctxs=self._group2ctxs,
compression_params=self._compression_params)
module.bind(data_shapes, label_shapes, self._curr_module.for_training,
self._curr_module.inputs_need_grad,
force_rebind=False, shared_module=self._buckets[self._default_bucket_key])
11 changes: 10 additions & 1 deletion python/mxnet/module/module.py
@@ -61,10 +61,16 @@ class Module(BaseModule):
Instead they are initialized to 0 and can be set by `set_states()`.
group2ctxs : list of dict of str to context
Default is `None`. Mapping the `ctx_group` attribute to the context assignment.
compression_params : dict
Specifies type of gradient compression and additional arguments depending
on the type of compression being used. For example, 2bit compression requires a threshold.
Arguments would then be {'type':'2bit', 'threshold':0.5}
See mxnet.KVStore.set_gradient_compression method for more details on gradient compression.
"""
def __init__(self, symbol, data_names=('data',), label_names=('softmax_label',),
logger=logging, context=ctx.cpu(), work_load_list=None,
fixed_param_names=None, state_names=None, group2ctxs=None):
fixed_param_names=None, state_names=None, group2ctxs=None,
compression_params=None):
super(Module, self).__init__(logger=logger)

if isinstance(context, ctx.Context):
@@ -103,6 +109,7 @@ def __init__(self, symbol, data_names=('data',), label_names=('softmax_label',),
self._aux_params = None
self._params_dirty = False

self._compression_params = compression_params
self._optimizer = None
self._kvstore = None
self._update_on_kvstore = None
@@ -525,6 +532,8 @@ def init_optimizer(self, kvstore='local', optimizer='sgd',
self._updater = None

if kvstore:
if self._compression_params:
kvstore.set_gradient_compression(self._compression_params)
# copy initialized local parameters to kvstore
_initialize_kvstore(kvstore=kvstore,
param_arrays=self._exec_group.param_arrays,
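
The symbol-API Module accepts the same dictionary and hands it to the kvstore in init_optimizer, as shown above. A usage sketch follows; the small throwaway network is a placeholder.

# Sketch: construct a Module with gradient compression; the kvstore used
# by init_optimizer / fit picks up these settings automatically.
import mxnet as mx

data = mx.sym.Variable('data')
net = mx.sym.FullyConnected(data, num_hidden=10)
net = mx.sym.SoftmaxOutput(net, name='softmax')

mod = mx.mod.Module(net, data_names=('data',), label_names=('softmax_label',),
                    compression_params={'type': '2bit', 'threshold': 0.5})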
14 changes: 14 additions & 0 deletions src/c_api/c_api.cc
@@ -747,6 +747,20 @@ int MXKVStoreCreate(const char *type,
API_END();
}

int MXKVStoreSetGradientCompression(KVStoreHandle handle, mx_uint num_params,
const char** keys, const char** vals) {
API_BEGIN();
std::vector<std::pair<std::string, std::string> > params;
for (mx_uint i = 0; i < num_params; ++i) {
std::pair<std::string, std::string> p;
p.first = keys[i];
p.second = vals[i];
params.push_back(p);
}
static_cast<KVStore*>(handle)->SetGradientCompression(params);
API_END();
}

int MXKVStoreFree(KVStoreHandle handle) {
API_BEGIN();
delete static_cast<KVStore*>(handle);
(Diffs for the remaining changed files are not shown here.)
