diff --git a/BUILD b/BUILD
index 1e6acc3..e023200 100644
--- a/BUILD
+++ b/BUILD
@@ -15,7 +15,6 @@ py_library(
"//tensorflow_compression/python/entropy_models:continuous_batched",
"//tensorflow_compression/python/entropy_models:continuous_indexed",
"//tensorflow_compression/python/entropy_models:universal",
- "//tensorflow_compression/python/layers:entropy_models",
"//tensorflow_compression/python/layers:gdn",
"//tensorflow_compression/python/layers:initializers",
"//tensorflow_compression/python/layers:parameterizers",
diff --git a/tensorflow_compression/__init__.py b/tensorflow_compression/__init__.py
index 3c82606..69592a9 100644
--- a/tensorflow_compression/__init__.py
+++ b/tensorflow_compression/__init__.py
@@ -32,7 +32,6 @@
from tensorflow_compression.python.entropy_models.continuous_indexed import *
from tensorflow_compression.python.entropy_models.universal import *
-from tensorflow_compression.python.layers.entropy_models import *
from tensorflow_compression.python.layers.gdn import *
from tensorflow_compression.python.layers.initializers import *
from tensorflow_compression.python.layers.parameterizers import *
diff --git a/tensorflow_compression/all_tests.py b/tensorflow_compression/all_tests.py
index e7caee1..5fe3067 100644
--- a/tensorflow_compression/all_tests.py
+++ b/tensorflow_compression/all_tests.py
@@ -29,7 +29,6 @@
from tensorflow_compression.python.entropy_models.continuous_indexed_test import *
from tensorflow_compression.python.entropy_models.universal_test import *
-from tensorflow_compression.python.layers.entropy_models_test import *
from tensorflow_compression.python.layers.gdn_test import *
from tensorflow_compression.python.layers.initializers_test import *
from tensorflow_compression.python.layers.parameterizers_test import *
diff --git a/tensorflow_compression/python/layers/BUILD b/tensorflow_compression/python/layers/BUILD
index 09e9802..63d4004 100644
--- a/tensorflow_compression/python/layers/BUILD
+++ b/tensorflow_compression/python/layers/BUILD
@@ -4,16 +4,6 @@ package(
licenses(["notice"])
-py_library(
- name = "entropy_models",
- srcs = ["entropy_models.py"],
- srcs_version = "PY3",
- deps = [
- "//tensorflow_compression/python/ops:math_ops",
- "//tensorflow_compression/python/ops:range_coding_ops",
- ],
-)
-
py_library(
name = "gdn",
srcs = ["gdn.py"],
@@ -61,13 +51,6 @@ py_library(
deps = ["//tensorflow_compression/python/ops:soft_round_ops"],
)
-py_test(
- name = "entropy_models_test",
- srcs = ["entropy_models_test.py"],
- python_version = "PY3",
- deps = [":entropy_models"],
-)
-
py_test(
name = "initializers_test",
srcs = ["initializers_test.py"],
diff --git a/tensorflow_compression/python/layers/entropy_models.py b/tensorflow_compression/python/layers/entropy_models.py
deleted file mode 100644
index 2b77726..0000000
--- a/tensorflow_compression/python/layers/entropy_models.py
+++ /dev/null
@@ -1,1005 +0,0 @@
-# Copyright 2018 Google LLC. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-"""Entropy model layers."""
-
-import numpy as np
-import scipy.stats
-import tensorflow.compat.v1 as tf
-
-from tensorflow.python.keras.engine import input_spec
-from tensorflow_compression.python.ops import math_ops
-from tensorflow_compression.python.ops import range_coding_ops
-
-
-__all__ = [
- "EntropyModel",
- "EntropyBottleneck",
- "SymmetricConditional",
- "GaussianConditional",
- "LogisticConditional",
- "LaplacianConditional",
-]
-
-
-class EntropyModel(tf.keras.layers.Layer):
- """Entropy model (base class)."""
-
- _setattr_tracking = False
-
- def __init__(self, tail_mass=2 ** -8, likelihood_bound=1e-9,
- range_coder_precision=16, **kwargs):
- """Initializer.
-
- Args:
- tail_mass: Float, between 0 and 1. The bottleneck layer automatically
- determines the range of input values based on their frequency of
- occurrence. Values occurring in the tails of the distributions will not
- be encoded with range coding, but using a Golomb-like code. `tail_mass`
- determines the amount of probability mass in the tails which will be
- Golomb-coded. For example, the default value of `2 ** -8` means that on
- average, one 256th of all values will use the Golomb code.
- likelihood_bound: Float. If positive, the returned likelihood values are
- ensured to be greater than or equal to this value. This prevents very
- large gradients with a typical entropy loss (defaults to 1e-9).
- range_coder_precision: Integer, between 1 and 16. The precision of the
- range coder used for compression and decompression. This trades off
- computation speed with compression efficiency, where 16 is the slowest
- but most efficient setting. Choosing lower values may increase the
- average codelength slightly compared to the estimated entropies.
- **kwargs: Other keyword arguments passed to superclass (`Layer`).
- """
- super(EntropyModel, self).__init__(**kwargs)
- self._tail_mass = float(tail_mass)
- if not 0 < self.tail_mass < 1:
- raise ValueError(
- "`tail_mass` must be between 0 and 1, got {}.".format(self.tail_mass))
- self._likelihood_bound = float(likelihood_bound)
- self._range_coder_precision = int(range_coder_precision)
- if tf.executing_eagerly():
- raise NotImplementedError(
- "Keras layer implementations of entropy models are not supported in "
- "eager mode.")
-
- @property
- def tail_mass(self):
- return self._tail_mass
-
- @property
- def likelihood_bound(self):
- return self._likelihood_bound
-
- @property
- def range_coder_precision(self):
- return self._range_coder_precision
-
- def _quantize(self, inputs, mode):
- """Perturb or quantize a `Tensor` and optionally dequantize.
-
- Args:
- inputs: `Tensor`. The input values.
- mode: String. Can take on one of three values: `'noise'` (adds uniform
- noise), `'dequantize'` (quantizes and dequantizes), and `'symbols'`
- (quantizes and produces integer symbols for range coder).
-
- Returns:
- The quantized/perturbed `inputs`. The returned `Tensor` should have type
- `self.dtype` if mode is `'noise'`, `'dequantize'`; `tf.int32` if mode is
- `'symbols'`.
- """
- raise NotImplementedError("Must inherit from EntropyModel.")
-
- def _dequantize(self, inputs, mode):
- """Dequantize a `Tensor`.
-
- The opposite to `_quantize(inputs, mode='symbols')`.
-
- Args:
- inputs: `Tensor`. The range coder symbols.
- mode: String. Must be `'dequantize'`.
-
- Returns:
- The dequantized `inputs`. The returned `Tensor` should have type
- `self.dtype`.
- """
- raise NotImplementedError("Must inherit from EntropyModel.")
-
- def _likelihood(self, inputs):
- """Compute the likelihood of the inputs under the model.
-
- Args:
- inputs: `Tensor`. The input values.
-
- Returns:
- `Tensor` of same shape and type as `inputs`, giving the likelihoods
- evaluated at `inputs`.
- """
- raise NotImplementedError("Must inherit from EntropyModel.")
-
- def _pmf_to_cdf(self, pmf, tail_mass, pmf_length, max_length):
- """Helper function for computing the CDF from the PMF."""
-
- # Prevent tensors from bouncing back and forth between host and GPU.
- with tf.device("/cpu:0"):
- def loop_body(args):
- prob, length, tail = args
- prob = tf.concat([prob[:length], tail], axis=0)
- cdf = range_coding_ops.pmf_to_quantized_cdf(
- prob, precision=self.range_coder_precision)
- return tf.pad(
- cdf, [[0, max_length - length]], mode="CONSTANT", constant_values=0)
-
- return tf.map_fn(
- loop_body, (pmf, pmf_length, tail_mass),
- dtype=tf.int32, back_prop=False, name="pmf_to_cdf")
-
- def call(self, inputs, training):
- """Pass a tensor through the bottleneck.
-
- Args:
- inputs: The tensor to be passed through the bottleneck.
- training: Boolean. If `True`, returns a differentiable approximation of
- the inputs, and their likelihoods under the modeled probability
- densities. If `False`, returns the quantized inputs and their
- likelihoods under the corresponding probability mass function. These
- quantities can't be used for training, as they are not differentiable,
- but represent actual compression more closely.
-
- Returns:
- values: `Tensor` with the same shape as `inputs` containing the perturbed
- or quantized input values.
- likelihood: `Tensor` with the same shape as `inputs` containing the
- likelihood of `values` under the modeled probability distributions.
-
- Raises:
- ValueError: if `inputs` has an integral or inconsistent `DType`, or
- inconsistent number of channels.
- """
- inputs = tf.convert_to_tensor(inputs, dtype=self.dtype)
- if inputs.dtype.is_integer:
- raise ValueError(
- "{} can't take integer inputs.".format(type(self).__name__))
-
- outputs = self._quantize(inputs, "noise" if training else "dequantize")
- assert outputs.dtype == self.dtype
- likelihood = self._likelihood(outputs)
- if self.likelihood_bound > 0:
- likelihood_bound = tf.constant(self.likelihood_bound, dtype=self.dtype)
- likelihood = math_ops.lower_bound(likelihood, likelihood_bound)
-
- if not tf.executing_eagerly():
- outputs_shape, likelihood_shape = self.compute_output_shape(inputs.shape)
- outputs.set_shape(outputs_shape)
- likelihood.set_shape(likelihood_shape)
-
- return outputs, likelihood
-
- def compress(self, inputs):
- """Compress inputs and store their binary representations into strings.
-
- Args:
- inputs: `Tensor` with values to be compressed.
-
- Returns:
- compressed: String `Tensor` vector containing the compressed
- representation of each batch element of `inputs`.
-
- Raises:
- ValueError: if `inputs` has an integral or inconsistent `DType`, or
- inconsistent number of channels.
- """
- with tf.name_scope(self._name_scope()):
- inputs = tf.convert_to_tensor(inputs, dtype=self.dtype)
- if not self.built:
- # Check input assumptions set before layer building, e.g. input rank.
- input_spec.assert_input_compatibility(
- self.input_spec, inputs, self.name)
- if self.dtype is None:
- self._dtype = inputs.dtype.base_dtype.name
- self.build(inputs.shape)
-
- # Check input assumptions set after layer building, e.g. input shape.
- if not tf.executing_eagerly():
- input_spec.assert_input_compatibility(
- self.input_spec, inputs, self.name)
- if inputs.dtype.is_integer:
- raise ValueError(
- "{} can't take integer inputs.".format(type(self).__name__))
-
- symbols = self._quantize(inputs, "symbols")
- assert symbols.dtype == tf.int32
-
- ndim = self.input_spec.ndim
- indexes = self._prepare_indexes(shape=tf.shape(symbols)[1:])
- broadcast_indexes = (indexes.shape.ndims != ndim)
- if broadcast_indexes:
- # We can't currently broadcast over anything else but the batch axis.
- assert indexes.shape.ndims == ndim - 1
- args = (symbols,)
- else:
- args = (symbols, indexes)
-
- def loop_body(args):
- string = range_coding_ops.unbounded_index_range_encode(
- args[0], indexes if broadcast_indexes else args[1],
- self._quantized_cdf, self._cdf_length, self._offset,
- precision=self.range_coder_precision, overflow_width=4,
- debug_level=0)
- return string
-
- strings = tf.map_fn(
- loop_body, args, dtype=tf.string,
- back_prop=False, name="compress")
-
- if not tf.executing_eagerly():
- strings.set_shape(inputs.shape[:1])
-
- return strings
-
- def decompress(self, strings, **kwargs):
- """Decompress values from their compressed string representations.
-
- Args:
- strings: A string `Tensor` vector containing the compressed data.
- **kwargs: Model-specific keyword arguments.
-
- Returns:
- The decompressed `Tensor`.
- """
- with tf.name_scope(self._name_scope()):
- strings = tf.convert_to_tensor(strings, dtype=tf.string)
-
- indexes = self._prepare_indexes(**kwargs)
- ndim = self.input_spec.ndim
- broadcast_indexes = (indexes.shape.ndims != ndim)
- if broadcast_indexes:
- # We can't currently broadcast over anything else but the batch axis.
- assert indexes.shape.ndims == ndim - 1
- args = (strings,)
- else:
- args = (strings, indexes)
-
- def loop_body(args):
- symbols = range_coding_ops.unbounded_index_range_decode(
- args[0], indexes if broadcast_indexes else args[1],
- self._quantized_cdf, self._cdf_length, self._offset,
- precision=self.range_coder_precision, overflow_width=4,
- debug_level=0)
- return symbols
-
- symbols = tf.map_fn(
- loop_body, args, dtype=tf.int32, back_prop=False, name="decompress")
-
- outputs = self._dequantize(symbols, "dequantize")
- assert outputs.dtype == self.dtype
-
- if not tf.executing_eagerly():
- outputs.set_shape(self.input_spec.shape)
-
- return outputs
-
- def compute_output_shape(self, input_shape):
- input_shape = tf.TensorShape(input_shape)
- return input_shape, input_shape
-
-
-class EntropyBottleneck(EntropyModel):
- """Entropy bottleneck layer.
-
- This layer models the entropy of the tensor passing through it. During
- training, this can be used to impose a (soft) entropy constraint on its
- activations, limiting the amount of information flowing through the layer.
- After training, the layer can be used to compress any input tensor to a
- string, which may be written to a file, and to decompress a file which it
- previously generated back to a reconstructed tensor. The entropies estimated
- during training or evaluation are approximately equal to the average length of
- the strings in bits.
-
- The layer implements a flexible probability density model to estimate entropy
- of its input tensor, which is described in the appendix of the paper (please
- cite the paper if you use this code for scientific work):
-
- > "Variational image compression with a scale hyperprior"
- > J. Ballé, D. Minnen, S. Singh, S. J. Hwang, N. Johnston
- > https://arxiv.org/abs/1802.01436
-
- The layer assumes that the input tensor is at least 2D, with a batch dimension
- at the beginning and a channel dimension as specified by `data_format`. The
- layer trains an independent probability density model for each channel, but
- assumes that across all other dimensions, the inputs are i.i.d. (independent
- and identically distributed).
-
- Because data compression always involves discretization, the outputs of the
- layer are generally only approximations of its inputs. During training,
- discretization is modeled using additive uniform noise to ensure
- differentiability. The entropies computed during training are differential
- entropies. During evaluation, the data is actually quantized, and the
- entropies are discrete (Shannon entropies). To make sure the approximated
- tensor values are good enough for practical purposes, the training phase must
- be used to balance the quality of the approximation with the entropy, by
- adding an entropy term to the training loss. See the example in the package
- documentation to get started.
-
- Note: the layer always produces exactly one auxiliary loss and one update op,
- which are only significant for compression and decompression. To use the
- compression feature, the auxiliary loss must be minimized during or after
- training. After that, the update op must be executed at least once.
- """
-
- def __init__(self, init_scale=10, filters=(3, 3, 3),
- data_format="channels_last", **kwargs):
- """Initializer.
-
- Args:
- init_scale: Float. A scaling factor determining the initial width of the
- probability densities. This should be chosen big enough so that the
- range of values of the layer inputs roughly falls within the interval
- [`-init_scale`, `init_scale`] at the beginning of training.
- filters: An iterable of ints, giving the number of filters at each layer
- of the density model. Generally, the more filters and layers, the more
- expressive is the density model in terms of modeling more complicated
- distributions of the layer inputs. For details, refer to the paper
- referenced above. The default is `[3, 3, 3]`, which should be sufficient
- for most practical purposes.
- data_format: Either `'channels_first'` or `'channels_last'` (default).
- **kwargs: Other keyword arguments passed to superclass (`EntropyModel`).
- """
- super(EntropyBottleneck, self).__init__(**kwargs)
- self._init_scale = float(init_scale)
- self._filters = tuple(int(f) for f in filters)
- self._data_format = str(data_format)
- self.input_spec = tf.keras.layers.InputSpec(min_ndim=2)
-
- if self.data_format not in ("channels_first", "channels_last"):
- raise ValueError("Unknown data format: '{}'.".format(self.data_format))
-
- @property
- def init_scale(self):
- return self._init_scale
-
- @property
- def filters(self):
- return self._filters
-
- @property
- def data_format(self):
- return self._data_format
-
- def _channel_axis(self, ndim):
- return {"channels_first": 1, "channels_last": ndim - 1}[self.data_format]
-
- def _get_input_dims(self):
- """Returns a few useful numbers related to input dimensionality.
-
- Returns:
- ndim: Integer. Number of input dimensions including batch.
- channel_axis: Integer. Index of dimension that enumerates channels.
- channels: Integer. Number of channels in inputs.
- input_slices: Tuple of slices. Can be used as an index to expand a vector
- to input dimensions, where the vector now runs across channels.
- """
- ndim = self.input_spec.ndim
- channel_axis = self._channel_axis(ndim)
- channels = self.input_spec.axes[channel_axis]
- # Tuple of slices for expanding tensors to input shape.
- input_slices = ndim * [None]
- input_slices[channel_axis] = slice(None)
- input_slices = tuple(input_slices)
- return ndim, channel_axis, channels, input_slices
-
- def _logits_cumulative(self, inputs, stop_gradient):
- """Evaluate logits of the cumulative densities.
-
- Args:
- inputs: The values at which to evaluate the cumulative densities, expected
- to be a `Tensor` of shape `(channels, 1, batch)`.
- stop_gradient: Boolean. Whether to add `tf.stop_gradient` calls so
- that the gradient of the output with respect to the density model
- parameters is disconnected (the gradient with respect to `inputs` is
- left untouched).
-
- Returns:
- A `Tensor` of the same shape as `inputs`, containing the logits of the
- cumulative densities evaluated at the given inputs.
- """
- logits = inputs
-
- for i in range(len(self.filters) + 1):
- matrix = self._matrices[i]
- if stop_gradient:
- matrix = tf.stop_gradient(matrix)
- logits = tf.linalg.matmul(matrix, logits)
-
- bias = self._biases[i]
- if stop_gradient:
- bias = tf.stop_gradient(bias)
- logits += bias
-
- if i < len(self._factors):
- factor = self._factors[i]
- if stop_gradient:
- factor = tf.stop_gradient(factor)
- logits += factor * tf.math.tanh(logits)
-
- return logits
-
- def build(self, input_shape):
- """Builds the entropy model.
-
- Creates the variables for the network modeling the densities, creates the
- auxiliary loss estimating the median and tail quantiles of the densities,
- and then uses that to create the probability mass functions and the discrete
- cumulative density functions used by the range coder.
-
- Args:
- input_shape: Shape of the input tensor, used to get the number of
- channels.
-
- Raises:
- ValueError: if `input_shape` doesn't specify the length of the channel
- dimension.
- """
- input_shape = tf.TensorShape(input_shape)
- channel_axis = self._channel_axis(input_shape.ndims)
- channels = input_shape.as_list()[channel_axis]
- if channels is None:
- raise ValueError("The channel dimension of the inputs must be defined.")
- self.input_spec = tf.keras.layers.InputSpec(
- ndim=input_shape.ndims, axes={channel_axis: channels})
- filters = (1,) + self.filters + (1,)
- scale = self.init_scale ** (1 / (len(self.filters) + 1))
-
- # Create variables.
- self._matrices = []
- self._biases = []
- self._factors = []
- for i in range(len(self.filters) + 1):
- init = np.log(np.expm1(1 / scale / filters[i + 1]))
- matrix = self.add_weight(
- "matrix_{}".format(i), dtype=self.dtype,
- shape=(channels, filters[i + 1], filters[i]),
- initializer=tf.initializers.constant(init))
- matrix = tf.nn.softplus(matrix)
- self._matrices.append(matrix)
-
- bias = self.add_weight(
- "bias_{}".format(i), dtype=self.dtype,
- shape=(channels, filters[i + 1], 1),
- initializer=tf.initializers.random_uniform(-.5, .5))
- self._biases.append(bias)
-
- if i < len(self.filters):
- factor = self.add_weight(
- "factor_{}".format(i), dtype=self.dtype,
- shape=(channels, filters[i + 1], 1),
- initializer=tf.initializers.zeros())
- factor = tf.math.tanh(factor)
- self._factors.append(factor)
-
- # To figure out what range of the densities to sample, we need to compute
- # the quantiles given by `tail_mass / 2` and `1 - tail_mass / 2`. Since we
- # can't take inverses of the cumulative directly, we make it an optimization
- # problem:
- # `quantiles = argmin(|logit(cumulative) - target|)`
- # where `target` is `logit(tail_mass / 2)` or `logit(1 - tail_mass / 2)`.
- # Taking the logit (inverse of sigmoid) of the cumulative makes the
- # representation of the right target more numerically stable.
-
- # Numerically stable way of computing logits of `tail_mass / 2`
- # and `1 - tail_mass / 2`.
- target = np.log(2 / self.tail_mass - 1)
- # Compute lower and upper tail quantile as well as median.
- target = tf.constant([-target, 0, target], dtype=self.dtype)
-
- def quantiles_initializer(shape, dtype=None, partition_info=None):
- del partition_info # unused
- assert tuple(shape[1:]) == (1, 3)
- init = tf.constant(
- [[[-self.init_scale, 0, self.init_scale]]], dtype=dtype)
- return tf.tile(init, (shape[0], 1, 1))
-
- quantiles = self.add_weight(
- "quantiles", shape=(channels, 1, 3), dtype=self.dtype,
- initializer=quantiles_initializer,
- aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA)
- logits = self._logits_cumulative(quantiles, stop_gradient=True)
- loss = tf.math.reduce_sum(abs(logits - target))
- self.add_loss(loss, inputs=None)
-
- # Quantize such that the median coincides with the center of a bin.
- medians = quantiles[:, 0, 1]
- self._medians = tf.stop_gradient(medians)
-
- # Largest distance observed between lower tail quantile and median, and
- # between median and upper tail quantile.
- minima = medians - quantiles[:, 0, 0]
- minima = tf.cast(tf.math.ceil(minima), tf.int32)
- minima = tf.math.maximum(minima, 0)
- maxima = quantiles[:, 0, 2] - medians
- maxima = tf.cast(tf.math.ceil(maxima), tf.int32)
- maxima = tf.math.maximum(maxima, 0)
-
- # PMF starting positions and lengths.
- self._offset = -minima
- pmf_start = medians - tf.cast(minima, self.dtype)
- pmf_length = maxima + minima + 1
-
- # Sample the densities in the computed ranges, possibly computing more
- # samples than necessary at the upper end.
- max_length = tf.math.reduce_max(pmf_length)
- samples = tf.range(tf.cast(max_length, self.dtype), dtype=self.dtype)
- samples += pmf_start[:, None, None]
-
- half = tf.constant(.5, dtype=self.dtype)
- # We strip the sigmoid from the end here, so we can use the special rule
- # below to only compute differences in the left tail of the sigmoid.
- # This increases numerical stability (see explanation in `call`).
- lower = self._logits_cumulative(samples - half, stop_gradient=True)
- upper = self._logits_cumulative(samples + half, stop_gradient=True)
- # Flip signs if we can move more towards the left tail of the sigmoid.
- sign = -tf.math.sign(tf.math.add_n([lower, upper]))
- pmf = abs(tf.math.sigmoid(sign * upper) - tf.math.sigmoid(sign * lower))
- pmf = pmf[:, 0, :]
-
- # Compute out-of-range (tail) masses.
- tail_mass = tf.math.add_n([
- tf.math.sigmoid(lower[:, 0, :1]),
- tf.math.sigmoid(-upper[:, 0, -1:]),
- ])
-
- # Construct a valid CDF initializer, so that we can run the model without
- # error even on the zeroth training step.
- def cdf_initializer(shape, dtype=None, partition_info=None):
- del shape, partition_info # unused
- assert dtype == tf.int32
- fill = tf.constant(.5, dtype=self.dtype)
- prob = tf.fill((channels, 2), fill)
- cdf = range_coding_ops.pmf_to_quantized_cdf(
- prob, precision=self.range_coder_precision)
- return tf.placeholder_with_default(cdf, shape=(channels, None))
-
- # We need to supply an initializer without fully defined static shape
- # here, or the variable will return the wrong dynamic shape later. A
- # placeholder with default gets the trick done (see initializer above).
- quantized_cdf = self.add_weight(
- "quantized_cdf",
- shape=(channels, None),
- dtype=tf.int32,
- trainable=False,
- initializer=cdf_initializer,
- aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA)
- cdf_length = self.add_weight(
- "cdf_length", shape=(channels,), dtype=tf.int32, trainable=False,
- initializer=tf.initializers.constant(3),
- aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA)
- # Works around a weird TF issue with reading variables inside a loop.
- self._quantized_cdf = tf.identity(quantized_cdf)
- self._cdf_length = tf.identity(cdf_length)
-
- update_cdf = tf.assign(
- quantized_cdf,
- self._pmf_to_cdf(pmf, tail_mass, pmf_length, max_length),
- validate_shape=False)
- update_length = tf.assign(
- cdf_length,
- pmf_length + 2)
- update_op = tf.group(update_cdf, update_length)
- self.add_update(update_op)
-
- super(EntropyBottleneck, self).build(input_shape)
-
- def _quantize(self, inputs, mode):
- # Add noise or quantize (and optionally dequantize in one step).
- half = tf.constant(.5, dtype=self.dtype)
- _, _, _, input_slices = self._get_input_dims()
-
- if mode == "noise":
- noise = tf.random.uniform(tf.shape(inputs), -half, half)
- return tf.math.add_n([inputs, noise])
-
- medians = self._medians[input_slices]
- outputs = tf.math.floor(inputs + (half - medians))
-
- if mode == "dequantize":
- outputs = tf.cast(outputs, self.dtype)
- return outputs + medians
- else:
- assert mode == "symbols", mode
- outputs = tf.cast(outputs, tf.int32)
- return outputs
-
- def _dequantize(self, inputs, mode):
- _, _, _, input_slices = self._get_input_dims()
- medians = self._medians[input_slices]
- outputs = tf.cast(inputs, self.dtype)
- return outputs + medians
-
- def _likelihood(self, inputs):
- ndim, channel_axis, _, _ = self._get_input_dims()
- half = tf.constant(.5, dtype=self.dtype)
-
- # Convert to (channels, 1, batch) format by commuting channels to front
- # and then collapsing.
- order = list(range(ndim))
- order.pop(channel_axis)
- order.insert(0, channel_axis)
- inputs = tf.transpose(inputs, order)
- shape = tf.shape(inputs)
- inputs = tf.reshape(inputs, (shape[0], 1, -1))
-
- # Evaluate densities.
- # We can use the special rule below to only compute differences in the left
- # tail of the sigmoid. This increases numerical stability: sigmoid(x) is 1
- # for large x, 0 for small x. Subtracting two numbers close to 0 can be done
- # with much higher precision than subtracting two numbers close to 1.
- lower = self._logits_cumulative(inputs - half, stop_gradient=False)
- upper = self._logits_cumulative(inputs + half, stop_gradient=False)
- # Flip signs if we can move more towards the left tail of the sigmoid.
- sign = -tf.math.sign(tf.math.add_n([lower, upper]))
- sign = tf.stop_gradient(sign)
- likelihood = abs(
- tf.math.sigmoid(sign * upper) - tf.math.sigmoid(sign * lower))
-
- # Convert back to input tensor shape.
- order = list(range(1, ndim))
- order.insert(channel_axis, 0)
- likelihood = tf.reshape(likelihood, shape)
- likelihood = tf.transpose(likelihood, order)
-
- return likelihood
-
- def _prepare_indexes(self, shape, channels=None):
- shape = tf.convert_to_tensor(shape)
-
- if not self.built:
- if not (shape.shape.is_fully_defined() and shape.shape.ndims == 1):
- raise ValueError("`shape` must be a vector with known length.")
- ndim = shape.shape.as_list()[0] + 1
- channel_axis = self._channel_axis(ndim)
- input_shape = ndim * [None]
- input_shape[channel_axis] = channels
- self.build(input_shape)
-
- _, channel_axis, channels, input_slices = self._get_input_dims()
-
- # TODO(jonycgn, ssjhv): Investigate broadcasting.
- indexes = tf.range(channels, dtype=tf.int32)
- indexes = tf.cast(indexes, tf.int32)
- tiles = tf.concat(
- [shape[:channel_axis - 1], [1], shape[channel_axis:]], axis=0)
- indexes = tf.tile(indexes[input_slices[1:]], tiles)
-
- return indexes
-
- # Just giving a more useful docstring.
- def decompress(self, strings, shape, channels=None):
- """Decompress values from their compressed string representations.
-
- Args:
- strings: A string `Tensor` vector containing the compressed data.
- shape: A `Tensor` vector of int32 type. Contains the shape of the tensor
- to be decompressed, excluding the batch dimension.
- channels: Integer. Specifies the number of channels statically. Needs only
- be set if the layer hasn't been built yet (i.e., this is the first input
- it receives).
-
- Returns:
- The decompressed `Tensor`. Its shape will be equal to `shape` prepended
- with the batch dimension from `strings`.
-
- Raises:
- ValueError: If the length of `shape` isn't available at graph construction
- time.
- """
- return super(EntropyBottleneck, self).decompress(
- strings, shape=shape, channels=channels)
-
-
-class SymmetricConditional(EntropyModel):
- """Symmetric conditional entropy model (base class)."""
-
- def __init__(self, scale, scale_table,
- scale_bound=None, mean=None, indexes=None, **kwargs):
- """Initializer.
-
- Args:
- scale: `Tensor`, the scale parameters for the conditional distributions.
- scale_table: Iterable of positive floats. For range coding, the scale
- parameters in `scale` can't be used, because the probability tables need
- to be constructed statically. Only the values given in this table will
- actually be used for range coding. For each predicted scale, the next
- greater entry in the table is selected. It's optimal to choose the
- scales provided here in a logarithmic way.
- scale_bound: Float. Lower bound for scales. Any values in `scale` smaller
- than this value are set to this value to prevent non-positive scales. By
- default (or when set to `None`), uses the smallest value in
- `scale_table`. To disable, set to 0.
- mean: `Tensor`, the mean parameters for the conditional distributions. If
- `None`, the mean is assumed to be zero.
- indexes: `Tensor` of type `int32` or `None`. Can be used to override the
- selection of scale table indexes based on the predicted values in
- `scale`. Only affects compression and decompression.
- **kwargs: Other keyword arguments passed to superclass (`EntropyModel`).
- """
- super(SymmetricConditional, self).__init__(**kwargs)
- self._scale = tf.convert_to_tensor(scale)
- input_shape = self.scale.shape
- self._scale_table = tuple(sorted(float(s) for s in scale_table))
- if any(s <= 0 for s in self.scale_table):
- raise ValueError("`scale_table` must be an iterable of positive numbers.")
- self._scale_bound = None if scale_bound is None else float(scale_bound)
- self._mean = None if mean is None else tf.convert_to_tensor(mean)
- if indexes is not None:
- self._indexes = tf.convert_to_tensor(indexes)
- if self.indexes.dtype != tf.int32:
- raise ValueError("`indexes` must have `int32` dtype.")
- input_shape = input_shape.merge_with(self.indexes.shape)
- if input_shape.ndims is None:
- raise ValueError(
- "Number of dimensions of `scale` or `indexes` must be known.")
- self.input_spec = tf.keras.layers.InputSpec(shape=input_shape)
-
- @property
- def scale(self):
- return self._scale
-
- @property
- def scale_table(self):
- return self._scale_table
-
- @property
- def scale_bound(self):
- return self._scale_bound
-
- @property
- def mean(self):
- return self._mean
-
- @property
- def indexes(self):
- return self._indexes
-
- def _standardized_cumulative(self, inputs):
- """Evaluate the standardized cumulative density.
-
- Note: This function should be optimized to give the best possible numerical
- accuracy for negative input values.
-
- Args:
- inputs: `Tensor`. The values at which to evaluate the cumulative density.
-
- Returns:
- A `Tensor` of the same shape as `inputs`, containing the cumulative
- density evaluated at the given inputs.
- """
- raise NotImplementedError("Must inherit from SymmetricConditional.")
-
- def _standardized_quantile(self, quantile):
- """Evaluate the standardized quantile function.
-
- This returns the inverse of the standardized cumulative function for a
- scalar.
-
- Args:
- quantile: Float. The values at which to evaluate the quantile function.
-
- Returns:
- A float giving the inverse CDF value.
- """
- raise NotImplementedError("Must inherit from SymmetricConditional.")
-
- def build(self, input_shape):
- """Builds the entropy model.
-
- This function precomputes the quantized CDF table based on the scale table.
- This can be done at graph construction time. Then, it creates the graph for
- computing the indexes into that table based on the scale tensor, and then
- uses this index tensor to determine the starting positions of the PMFs for
- each scale.
-
- Args:
- input_shape: Shape of the input tensor.
-
- Raises:
- ValueError: If `input_shape` doesn't specify number of input dimensions.
- """
- input_shape = tf.TensorShape(input_shape)
- input_shape.assert_is_compatible_with(self.input_spec.shape)
-
- scale_table = tf.constant(self.scale_table, dtype=self.dtype)
-
- # Lower bound scales. We need to do this here, and not in __init__, because
- # the dtype may not yet be known there.
- if self.scale_bound is None:
- self._scale = math_ops.lower_bound(self._scale, scale_table[0])
- elif self.scale_bound > 0:
- self._scale = math_ops.lower_bound(self._scale, self.scale_bound)
-
- multiplier = -self._standardized_quantile(self.tail_mass / 2)
- pmf_center = np.ceil(np.array(self.scale_table) * multiplier).astype(int)
- pmf_length = 2 * pmf_center + 1
- max_length = np.max(pmf_length)
-
- # This assumes that the standardized cumulative has the property
- # 1 - c(x) = c(-x), which means we can compute differences equivalently in
- # the left or right tail of the cumulative. The point is to only compute
- # differences in the left tail. This increases numerical stability: c(x) is
- # 1 for large x, 0 for small x. Subtracting two numbers close to 0 can be
- # done with much higher precision than subtracting two numbers close to 1.
- samples = abs(np.arange(max_length, dtype=int) - pmf_center[:, None])
- samples = tf.constant(samples, dtype=self.dtype)
- samples_scale = tf.expand_dims(scale_table, 1)
- upper = self._standardized_cumulative((.5 - samples) / samples_scale)
- lower = self._standardized_cumulative((-.5 - samples) / samples_scale)
- pmf = upper - lower
-
- # Compute out-of-range (tail) masses.
- tail_mass = 2 * lower[:, :1]
-
- def cdf_initializer(shape, dtype=None, partition_info=None):
- del partition_info # unused
- assert tuple(shape) == (len(pmf_length), max_length + 2)
- assert dtype == tf.int32
- return self._pmf_to_cdf(
- pmf, tail_mass,
- tf.constant(pmf_length, dtype=tf.int32), max_length)
-
- quantized_cdf = self.add_weight(
- "quantized_cdf", shape=(len(pmf_length), max_length + 2),
- initializer=cdf_initializer, dtype=tf.int32, trainable=False,
- aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA)
- cdf_length = self.add_weight(
- "cdf_length", shape=(len(pmf_length),),
- initializer=tf.initializers.constant(pmf_length + 2),
- dtype=tf.int32, trainable=False,
- aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA)
- # Works around a weird TF issue with reading variables inside a loop.
- self._quantized_cdf = tf.identity(quantized_cdf)
- self._cdf_length = tf.identity(cdf_length)
-
- # Now, if they haven't been overridden, compute the indexes into the table
- # for each of the passed-in scales.
- if not hasattr(self, "_indexes"):
- # Prevent tensors from bouncing back and forth between host and GPU.
- with tf.device("/cpu:0"):
- fill = tf.constant(
- len(self.scale_table) - 1, dtype=tf.int32)
- initializer = tf.fill(tf.shape(self.scale), fill)
-
- def loop_body(indexes, scale):
- return indexes - tf.cast(self.scale <= scale, tf.int32)
-
- self._indexes = tf.foldr(
- loop_body, scale_table[:-1],
- initializer=initializer, back_prop=False, name="compute_indexes")
-
- self._offset = tf.constant(-pmf_center, dtype=tf.int32)
-
- super(SymmetricConditional, self).build(input_shape)
-
- def _quantize(self, inputs, mode):
- # Add noise or quantize (and optionally dequantize in one step).
- half = tf.constant(.5, dtype=self.dtype)
-
- if mode == "noise":
- noise = tf.random.uniform(tf.shape(inputs), -half, half)
- return tf.math.add_n([inputs, noise])
-
- outputs = inputs
- if self.mean is not None:
- outputs -= self.mean
- outputs = tf.math.floor(outputs + half)
-
- if mode == "dequantize":
- if self.mean is not None:
- outputs += self.mean
- return outputs
- else:
- assert mode == "symbols", mode
- outputs = tf.cast(outputs, tf.int32)
- return outputs
-
- def _dequantize(self, inputs, mode):
- assert mode == "dequantize"
- outputs = tf.cast(inputs, self.dtype)
- if self.mean is not None:
- outputs += self.mean
- return outputs
-
- def _likelihood(self, inputs):
- values = inputs
- if self.mean is not None:
- values -= self.mean
-
- # This assumes that the standardized cumulative has the property
- # 1 - c(x) = c(-x), which means we can compute differences equivalently in
- # the left or right tail of the cumulative. The point is to only compute
- # differences in the left tail. This increases numerical stability: c(x) is
- # 1 for large x, 0 for small x. Subtracting two numbers close to 0 can be
- # done with much higher precision than subtracting two numbers close to 1.
- values = abs(values)
- upper = self._standardized_cumulative((.5 - values) / self.scale)
- lower = self._standardized_cumulative((-.5 - values) / self.scale)
- likelihood = upper - lower
-
- return likelihood
-
- def _prepare_indexes(self, shape=None):
- del shape # unused
- if not self.built:
- self.build(self.input_spec.shape)
- return self.indexes
-
- # Just giving a more useful docstring.
- def decompress(self, strings): # pylint:disable=useless-super-delegation
- """Decompress values from their compressed string representations.
-
- Args:
- strings: A string `Tensor` vector containing the compressed data.
-
- Returns:
- The decompressed `Tensor`.
- """
- return super(SymmetricConditional, self).decompress(strings)
-
-
-class GaussianConditional(SymmetricConditional):
- """Conditional Gaussian entropy model.
-
- The layer implements a conditionally Gaussian probability density model to
- estimate entropy of its input tensor, which is described in the paper (please
- cite the paper if you use this code for scientific work):
-
- > "Variational image compression with a scale hyperprior"
- > J. Ballé, D. Minnen, S. Singh, S. J. Hwang, N. Johnston
- > https://arxiv.org/abs/1802.01436
- """
-
- def _standardized_cumulative(self, inputs):
- half = tf.constant(.5, dtype=self.dtype)
- const = tf.constant(-(2 ** -0.5), dtype=self.dtype)
- # Using the complementary error function maximizes numerical precision.
- return half * tf.math.erfc(const * inputs)
-
- def _standardized_quantile(self, quantile):
- return scipy.stats.norm.ppf(quantile)
-
-
-class LogisticConditional(SymmetricConditional):
- """Conditional logistic entropy model.
-
- This is a conditionally Logistic entropy model, analogous to
- `GaussianConditional`.
- """
-
- def _standardized_cumulative(self, inputs):
- return tf.math.sigmoid(inputs)
-
- def _standardized_quantile(self, quantile):
- return scipy.stats.logistic.ppf(quantile)
-
-
-class LaplacianConditional(SymmetricConditional):
- """Conditional Laplacian entropy model.
-
- This is a conditionally Laplacian entropy model, analogous to
- `GaussianConditional`.
- """
-
- def _standardized_cumulative(self, inputs):
- exp = tf.math.exp(-abs(inputs))
- return tf.where(inputs > 0, 2 - exp, exp) / 2
-
- def _standardized_quantile(self, quantile):
- return scipy.stats.laplace.ppf(quantile)
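
Downstream migration note (a minimal sketch, not part of this patch): the Keras-layer entropy models removed above have successors in the `tensorflow_compression.python.entropy_models` packages that remain referenced in the BUILD and import lists (`continuous_batched`, `continuous_indexed`, `universal`). The snippet below assumes the TFC 2.x names `NoisyDeepFactorized` and `ContinuousBatchedEntropyModel` and their documented signatures; verify against the current package before relying on it.

    # Sketch: replacing the removed EntropyBottleneck Keras layer with the
    # batched entropy model retained in tensorflow_compression.
    # Assumption: tfc.NoisyDeepFactorized and tfc.ContinuousBatchedEntropyModel
    # exist with the signatures used here (TFC 2.x API).
    import tensorflow as tf
    import tensorflow_compression as tfc

    # Factorized prior over the channel dimension, analogous to the per-channel
    # density model the removed layer trained.
    prior = tfc.NoisyDeepFactorized(batch_shape=(192,))
    em = tfc.ContinuousBatchedEntropyModel(
        prior, coding_rank=3, compression=True)

    y = tf.random.normal((8, 16, 16, 192))   # latents: (batch, height, width, channels)
    y_hat, bits = em(y, training=True)       # noisy values and rate estimate in bits
    strings = em.compress(y)                 # one compressed string per batch element
    y_decoded = em.decompress(strings, (16, 16))  # broadcast (spatial) shape between batch and channels

For the removed `GaussianConditional` layer, `ContinuousIndexedEntropyModel` (or `LocationScaleIndexedEntropyModel`) with a `NoisyNormal` distribution appears to play the analogous role; again, treat this as a hint rather than a definitive mapping.
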
diff --git a/tensorflow_compression/python/layers/entropy_models_test.py b/tensorflow_compression/python/layers/entropy_models_test.py
deleted file mode 100644
index f4c301c..0000000
--- a/tensorflow_compression/python/layers/entropy_models_test.py
+++ /dev/null
@@ -1,618 +0,0 @@
-# Copyright 2018 Google LLC. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-"""Tests of entropy models."""
-
-import numpy as np
-import scipy.stats
-import tensorflow.compat.v1 as tf
-
-from tensorflow.python.framework import test_util
-from tensorflow_compression.python.layers import entropy_models
-
-
-@test_util.deprecated_graph_mode_only
-class EntropyBottleneckTest(tf.test.TestCase):
-
- def test_noise(self):
- # Tests that the noise added is uniform noise between -0.5 and 0.5.
- inputs = tf.placeholder(tf.float32, (None, 1))
- layer = entropy_models.EntropyBottleneck()
- noisy, _ = layer(inputs, training=True)
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- values = np.linspace(-50, 50, 100)[:, None]
- noisy, = sess.run([noisy], {inputs: values})
- self.assertFalse(np.allclose(values, noisy, rtol=0, atol=.45))
- self.assertAllClose(values, noisy, rtol=0, atol=.5)
-
- def test_quantization_init(self):
- # Tests that inputs are quantized to full integer values right after
- # initialization.
- inputs = tf.placeholder(tf.float32, (None, 1))
- layer = entropy_models.EntropyBottleneck()
- quantized, _ = layer(inputs, training=False)
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- values = np.linspace(-50, 50, 100)[:, None]
- quantized, = sess.run([quantized], {inputs: values})
- self.assertAllClose(np.around(values), quantized, rtol=0, atol=1e-6)
-
- def test_quantization(self):
- # Tests that inputs are not quantized to full integer values after quantiles
- # have been updated. However, the difference between input and output should
- # be between -0.5 and 0.5, and the offset must be consistent.
- inputs = tf.placeholder(tf.float32, (None, 1))
- layer = entropy_models.EntropyBottleneck()
- quantized, _ = layer(inputs, training=False)
- opt = tf.train.GradientDescentOptimizer(learning_rate=1)
- self.assertEqual(1, len(layer.losses))
- step = opt.minimize(layer.losses[0])
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- sess.run(step)
- values = np.linspace(-50, 50, 100)[:, None]
- quantized, = sess.run([quantized], {inputs: values})
- self.assertAllClose(values, quantized, rtol=0, atol=.5)
- diff = np.ravel(np.around(values) - quantized) % 1
- self.assertAllClose(diff, np.full_like(diff, diff[0]), rtol=0, atol=5e-6)
- self.assertNotEqual(diff[0], 0)
-
- def test_codec_init(self):
- # Tests that inputs are compressed and decompressed correctly, and quantized
- # to full integer values right after initialization.
- inputs = tf.placeholder(tf.float32, (1, None, 1))
- layer = entropy_models.EntropyBottleneck(
- data_format="channels_last", init_scale=30)
- bitstrings = layer.compress(inputs)
- decoded = layer.decompress(bitstrings, tf.shape(inputs)[1:])
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- values = np.linspace(-50, 50, 100)[None, :, None]
- decoded, = sess.run([decoded], {inputs: values})
- self.assertAllClose(np.around(values), decoded, rtol=0, atol=1e-6)
-
- def test_codec(self):
- # Tests that inputs are compressed and decompressed correctly, and not
- # quantized to full integer values after quantiles have been updated.
- # However, the difference between input and output should be between -0.5
- # and 0.5, and the offset must be consistent.
- inputs = tf.placeholder(tf.float32, (1, None, 1))
- layer = entropy_models.EntropyBottleneck(
- data_format="channels_last", init_scale=40)
- bitstrings = layer.compress(inputs)
- decoded = layer.decompress(bitstrings, tf.shape(inputs)[1:])
- opt = tf.train.GradientDescentOptimizer(learning_rate=1)
- self.assertEqual(1, len(layer.losses))
- step = opt.minimize(layer.losses[0])
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- sess.run(step)
- self.assertEqual(1, len(layer.updates))
- sess.run(layer.updates[0])
- values = np.linspace(-50, 50, 100)[None, :, None]
- decoded, = sess.run([decoded], {inputs: values})
- self.assertAllClose(values, decoded, rtol=0, atol=.5)
- diff = np.ravel(np.around(values) - decoded) % 1
- self.assertAllClose(diff, np.full_like(diff, diff[0]), rtol=0, atol=5e-6)
- self.assertNotEqual(diff[0], 0)
-
- def test_channels_last(self):
- # Test the layer with more than one channel and multiple input dimensions,
- # with the channels in the last dimension.
- inputs = tf.placeholder(tf.float32, (None, None, None, 2))
- layer = entropy_models.EntropyBottleneck(
- data_format="channels_last", init_scale=20)
- noisy, _ = layer(inputs, training=True)
- quantized, _ = layer(inputs, training=False)
- bitstrings = layer.compress(inputs)
- decoded = layer.decompress(bitstrings, tf.shape(inputs)[1:])
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- self.assertEqual(1, len(layer.updates))
- sess.run(layer.updates[0])
- values = 5 * np.random.normal(size=(7, 5, 3, 2))
- noisy, quantized, decoded = sess.run(
- [noisy, quantized, decoded], {inputs: values})
- self.assertAllClose(values, noisy, rtol=0, atol=.5)
- self.assertAllClose(values, quantized, rtol=0, atol=.5)
- self.assertAllClose(values, decoded, rtol=0, atol=.5)
-
- def test_channels_first(self):
- # Test the layer with more than one channel and multiple input dimensions,
- # with the channel dimension right after the batch dimension.
- inputs = tf.placeholder(tf.float32, (None, 3, None, None))
- layer = entropy_models.EntropyBottleneck(
- data_format="channels_first", init_scale=10)
- noisy, _ = layer(inputs, training=True)
- quantized, _ = layer(inputs, training=False)
- bitstrings = layer.compress(inputs)
- decoded = layer.decompress(bitstrings, tf.shape(inputs)[1:])
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- self.assertEqual(1, len(layer.updates))
- sess.run(layer.updates[0])
- values = 2.5 * np.random.normal(size=(2, 3, 5, 7))
- noisy, quantized, decoded = sess.run(
- [noisy, quantized, decoded], {inputs: values})
- self.assertAllClose(values, noisy, rtol=0, atol=.5)
- self.assertAllClose(values, quantized, rtol=0, atol=.5)
- self.assertAllClose(values, decoded, rtol=0, atol=.5)
-
- def test_compress(self):
- # Test compression and decompression, and produce test data for
- # `test_decompress`. If you set the constant at the end to `True`, this test
- # will fail and the log will contain the new test data.
- inputs = tf.placeholder(tf.float32, (2, 3, 9))
- layer = entropy_models.EntropyBottleneck(
- data_format="channels_first", filters=(), init_scale=2)
- bitstrings = layer.compress(inputs)
- decoded = layer.decompress(bitstrings, tf.shape(inputs)[1:])
- with self.cached_session() as sess:
- values = 8 * np.random.uniform(size=(2, 3, 9)) - 4
- sess.run(tf.global_variables_initializer())
- self.assertEqual(1, len(layer.updates))
- sess.run(layer.updates[0])
- bitstrings, quantized_cdf, cdf_length, decoded = sess.run(
- [bitstrings, layer._quantized_cdf, layer._cdf_length, decoded],
- {inputs: values})
- self.assertAllClose(values, decoded, rtol=0, atol=.5)
- # Set this constant to `True` to log new test data for `test_decompress`.
- if False: # pylint:disable=using-constant-test
- assert False, (bitstrings, quantized_cdf, cdf_length, decoded)
-
- # Data generated by `test_compress`.
- # pylint:disable=bad-whitespace,bad-continuation
- bitstrings = np.array([
- b"\x91\xf4\xdan2\xd3q\x97\xd0\x91N1~\xc4\xb0;\xd38\xa8\x90",
- b"?\xc7\xf9\x17\xa8\xcfu\x99\x1e4\xfe\xe0\xd3U`z\x15v",
- ], dtype=object)
-
- quantized_cdf = np.array([
- [ 0, 5170, 11858, 19679, 27812, 35302, 65536],
- [ 0, 6100, 13546, 21671, 29523, 36269, 65536],
- [ 0, 6444, 14120, 22270, 29929, 36346, 65536],
- ], dtype=np.int32)
-
- cdf_length = np.array([7, 7, 7], dtype=np.int32)
-
- expected = np.array([
- [[-3., 2., 1., -3., -1., -3., -4., -2., 2.],
- [-2., 2., 4., 1., 0., -3., -3., 2., 4.],
- [ 1., 2., 4., -1., -3., 4., 0., -2., -3.]],
- [[ 0., 4., 0., 2., 4., 1., -2., 1., 4.],
- [ 2., 2., 3., -3., 4., -1., -1., 0., -1.],
- [ 3., 0., 3., -3., 3., 3., -3., -4., -1.]],
- ], dtype=np.float32)
- # pylint:enable=bad-whitespace,bad-continuation
-
- def test_decompress(self):
- # Test that decompression of values compressed with a previous version
- # works, i.e. that the file format doesn't change across revisions.
- bitstrings = tf.placeholder(tf.string)
- input_shape = tf.placeholder(tf.int32)
- quantized_cdf = tf.placeholder(tf.int32)
- cdf_length = tf.placeholder(tf.int32)
- layer = entropy_models.EntropyBottleneck(
- data_format="channels_first", filters=(), init_scale=2,
- dtype=tf.float32)
- layer.build(self.expected.shape)
- layer._quantized_cdf = quantized_cdf
- layer._cdf_length = cdf_length
- decoded = layer.decompress(bitstrings, input_shape[1:])
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- decoded, = sess.run([decoded], {
- bitstrings: self.bitstrings, input_shape: self.expected.shape,
- quantized_cdf: self.quantized_cdf, cdf_length: self.cdf_length})
- self.assertAllClose(self.expected, decoded, rtol=0, atol=1e-6)
-
- def test_build_decompress(self):
- # Test that layer can be built when `decompress` is the first call to it.
- bitstrings = tf.placeholder(tf.string)
- input_shape = tf.placeholder(tf.int32, shape=[3])
- layer = entropy_models.EntropyBottleneck(dtype=tf.float32)
- layer.decompress(bitstrings, input_shape[1:], channels=5)
- self.assertTrue(layer.built)
-
- def test_normalization(self):
- # Test that densities are normalized correctly.
- inputs = tf.placeholder(tf.float32, (None, 1))
- layer = entropy_models.EntropyBottleneck(filters=(2,))
- _, likelihood = layer(inputs, training=True)
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- x = np.repeat(np.arange(-200, 201), 2000)[:, None]
- likelihood, = sess.run([likelihood], {inputs: x})
- self.assertEqual(x.shape, likelihood.shape)
- integral = np.sum(likelihood) * .0005
- self.assertAllClose(1, integral, rtol=0, atol=2e-4)
-
- def test_entropy_estimates(self):
- # Test that entropy estimates match actual range coding.
- inputs = tf.placeholder(tf.float32, (1, None, 1))
- layer = entropy_models.EntropyBottleneck(
- filters=(2, 3), data_format="channels_last")
- _, likelihood = layer(inputs, training=True)
- diff_entropy = tf.reduce_sum(tf.log(likelihood)) / -np.log(2)
- _, likelihood = layer(inputs, training=False)
- disc_entropy = tf.reduce_sum(tf.log(likelihood)) / -np.log(2)
- bitstrings = layer.compress(inputs)
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- self.assertEqual(1, len(layer.updates))
- sess.run(layer.updates[0])
- diff_entropy, disc_entropy, bitstrings = sess.run(
- [diff_entropy, disc_entropy, bitstrings],
- {inputs: np.random.normal(size=(1, 10000, 1))})
- codelength = 8 * sum(len(s) for s in bitstrings)
- self.assertAllClose(diff_entropy, disc_entropy, rtol=5e-3, atol=0)
- self.assertAllClose(disc_entropy, codelength, rtol=5e-3, atol=0)
-
-
-@test_util.deprecated_graph_mode_only
-class SymmetricConditionalTest(object):
-
- def test_noise(self):
- # Tests that the noise added is uniform noise between -0.5 and 0.5.
- inputs = tf.placeholder(tf.float32, [None])
- scale = tf.placeholder(tf.float32, [None])
- layer = self.subclass(scale, [1])
- noisy, _ = layer(inputs, training=True)
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- values = np.linspace(-50, 50, 100)
- noisy, = sess.run([noisy], {
- inputs: values,
- scale: np.random.uniform(1, 10, size=values.shape),
- })
- self.assertFalse(np.allclose(values, noisy, rtol=0, atol=.45))
- self.assertAllClose(values, noisy, rtol=0, atol=.5)
-
- def test_quantization(self):
- # Tests that inputs are quantized to full integer values.
- inputs = tf.placeholder(tf.float32, [None])
- scale = tf.placeholder(tf.float32, [None])
- layer = self.subclass(scale, [1], mean=None)
- quantized, _ = layer(inputs, training=False)
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- values = np.linspace(-50, 50, 100)
- quantized, = sess.run([quantized], {
- inputs: values,
- scale: np.random.uniform(1, 10, size=values.shape),
- })
- self.assertAllClose(np.around(values), quantized, rtol=0, atol=1e-6)
-
- def test_quantization_mean(self):
- # Tests that inputs are quantized to integer values with a consistent offset
- # to the mean.
- inputs = tf.placeholder(tf.float32, [None])
- scale = tf.placeholder(tf.float32, [None])
- mean = tf.placeholder(tf.float32, [None])
- layer = self.subclass(scale, [1], mean=mean)
- quantized, _ = layer(inputs, training=False)
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- values = np.linspace(-50, 50, 100)
- mean_values = np.random.normal(size=values.shape)
- quantized, = sess.run([quantized], {
- inputs: values,
- scale: np.random.uniform(1, 10, size=values.shape),
- mean: mean_values,
- })
- self.assertAllClose(
- np.around(values - mean_values) + mean_values, quantized,
- rtol=0, atol=1e-5)
-
- def test_codec(self):
- # Tests that inputs are compressed and decompressed correctly, and quantized
- # to full integer values.
- inputs = tf.placeholder(tf.float32, [None, None])
- scale = tf.placeholder(tf.float32, [None, None])
- layer = self.subclass(
- scale, [2 ** x for x in range(-10, 10)], mean=None)
- bitstrings = layer.compress(inputs)
- decoded = layer.decompress(bitstrings)
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- values = np.linspace(-50, 50, 100)[None]
- decoded, = sess.run([decoded], {
- inputs: values,
- scale: np.random.uniform(25, 75, size=values.shape),
- })
- self.assertAllClose(np.around(values), decoded, rtol=0, atol=1e-6)
-
- def test_codec_mean(self):
- # Tests that inputs are compressed and decompressed correctly, and quantized
- # to integer values with an offset consistent with the mean.
- inputs = tf.placeholder(tf.float32, [None, None])
- scale = tf.placeholder(tf.float32, [None, None])
- mean = tf.placeholder(tf.float32, [None, None])
- layer = self.subclass(
- scale, [2 ** x for x in range(-10, 10)], mean=mean)
- bitstrings = layer.compress(inputs)
- decoded = layer.decompress(bitstrings)
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- values = np.linspace(-50, 50, 100)[None]
- mean_values = np.random.normal(size=values.shape)
- decoded, = sess.run([decoded], {
- inputs: values,
- scale: np.random.uniform(25, 75, size=values.shape),
- mean: mean_values,
- })
- self.assertAllClose(
- np.around(values - mean_values) + mean_values, decoded,
- rtol=0, atol=1e-5)
-
- def test_multiple_dimensions(self):
- # Test the layer with more than one channel and multiple input dimensions.
- inputs = tf.placeholder(tf.float32, [None, None, None, None])
- scale = tf.placeholder(tf.float32, [None, None, None, None])
- layer = self.subclass(
- scale, [2 ** x for x in range(-10, 10)])
- noisy, _ = layer(inputs, training=True)
- quantized, _ = layer(inputs, training=False)
- bitstrings = layer.compress(inputs)
- decoded = layer.decompress(bitstrings)
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- values = 10 * np.random.normal(size=(2, 5, 3, 7))
- noisy, quantized, decoded = sess.run(
- [noisy, quantized, decoded],
- {inputs: values, scale: np.random.uniform(5, 15, size=values.shape)})
- self.assertAllClose(values, noisy, rtol=0, atol=.5)
- self.assertAllClose(values, quantized, rtol=0, atol=.5)
- self.assertAllClose(values, decoded, rtol=0, atol=.5)
-
- def test_compress(self):
- # Test compression and decompression, and produce test data for
- # `test_decompress`. If you set the constant at the end to `True`, this test
- # will fail and the log will contain the new test data.
- shape = (2, 7)
- scale_table = [2 ** x for x in range(-5, 1)]
- inputs = tf.placeholder(tf.float32, shape)
- scale = tf.placeholder(tf.float32, shape)
- indexes = tf.placeholder(tf.int32, shape)
- layer = self.subclass(scale, scale_table, indexes=indexes)
- bitstrings = layer.compress(inputs)
- decoded = layer.decompress(bitstrings)
- with self.cached_session() as sess:
- values = 8 * np.random.uniform(size=shape) - 4
- indexes = np.random.randint(
- 0, len(scale_table), size=shape, dtype=np.int32)
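- # Feed random per-element indexes into the scale table rather than deriving
- # them from `scale`.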
- sess.run(tf.global_variables_initializer())
- bitstrings, quantized_cdf, cdf_length, decoded = sess.run(
- [bitstrings, layer._quantized_cdf, layer._cdf_length, decoded],
- {inputs: values, layer.indexes: indexes})
- self.assertAllClose(values, decoded, rtol=0, atol=.5)
- # Set this constant to `True` to log new test data for `test_decompress`.
- if False: # pylint:disable=using-constant-test
- assert False, (bitstrings, indexes, quantized_cdf, cdf_length, decoded)
-
- def test_decompress(self):
- # Test that decompression of values compressed with a previous version
- # works, i.e. that the file format doesn't change across revisions.
- shape = (2, 7)
- scale_table = [2 ** x for x in range(-5, 1)]
- bitstrings = tf.placeholder(tf.string)
- scale = tf.placeholder(tf.float32, shape)
- indexes = tf.placeholder(tf.int32, shape)
- layer = self.subclass(
- scale, scale_table, indexes=indexes, dtype=tf.float32)
- decoded = layer.decompress(bitstrings)
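- # Decode using previously recorded bitstrings, indexes, and CDF tables, so
- # the check does not depend on tables built in this run.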
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- decoded, = sess.run([decoded], {
- bitstrings: self.bitstrings,
- layer.indexes: self.indexes,
- layer._quantized_cdf: self.quantized_cdf,
- layer._cdf_length: self.cdf_length})
- self.assertAllClose(self.expected, decoded, rtol=0, atol=1e-6)
-
- def test_build_decompress(self):
- # Test that the layer can be built when `decompress` is called first.
- bitstrings = tf.placeholder(tf.string)
- scale = tf.placeholder(tf.float32, [None, None, None])
- layer = self.subclass(
- scale, [2 ** x for x in range(-10, 10)], dtype=tf.float32)
- layer.decompress(bitstrings)
- self.assertTrue(layer.built)
-
- def test_quantile_function(self):
- # Test that the quantile function is the inverse of the cumulative
- # distribution function.
- scale = tf.placeholder(tf.float64, [None])
- layer = self.subclass(scale, [1], dtype=tf.float64)
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- quantiles = np.array([1e-5, 1e-2, .1, .5, .6, .8])
- locations = layer._standardized_quantile(quantiles)
- locations = tf.constant(locations, tf.float64)
- values, = sess.run([layer._standardized_cumulative(locations)])
- self.assertAllClose(quantiles, values, rtol=1e-12, atol=0)
-
- def test_distribution(self):
- # Tests that the model represents the underlying distribution convolved
- # with a uniform distribution.
- inputs = tf.placeholder(tf.float32, [None, None])
- scale = tf.placeholder(tf.float32, [None, None])
- layer = self.subclass(scale, [1], scale_bound=0, mean=None)
- _, likelihood = layer(inputs, training=False)
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- values = np.arange(-5, 1)[:, None] # must be integers due to quantization
- scales = 2 ** np.linspace(-3, 3, 10)[None, :]
- likelihoods, = sess.run([likelihood], {inputs: values, scale: scales})
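- # The reference is the probability mass of a unit-width interval centered
- # on each integer input value under the corresponding scipy distribution.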
- expected = (
- self.scipy_class.cdf(values + .5, scale=scales) -
- self.scipy_class.cdf(values - .5, scale=scales))
- self.assertAllClose(expected, likelihoods, rtol=1e-5, atol=1e-7)
-
- def test_entropy_estimates(self):
- # Test that the analytical entropy, the entropy estimates, and the range
- # coding bit rate all match.
- inputs = tf.placeholder(tf.float32, [None, None])
- scale = tf.placeholder(tf.float32, [None, None])
- layer = self.subclass(
- scale, [2 ** -10, 1, 10], scale_bound=0, likelihood_bound=0)
- _, likelihood = layer(inputs, training=True)
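- # Differential entropy estimate in bits per symbol: negative mean log
- # density under the additive-noise (training) model, converted to base 2.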
- diff_entropy = tf.reduce_mean(tf.log(likelihood), axis=1)
- diff_entropy /= -np.log(2)
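- # Discrete entropy estimate in bits per symbol: same computation under the
- # quantized (evaluation) model.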
- _, likelihood = layer(inputs, training=False)
- disc_entropy = tf.reduce_mean(tf.log(likelihood), axis=1)
- disc_entropy /= -np.log(2)
- bitstrings = layer.compress(inputs)
- with self.cached_session() as sess:
- sess.run(tf.global_variables_initializer())
- scales = np.repeat([layer.scale_table], 10000, axis=0).T
- values = self.scipy_class.rvs(scale=scales, size=scales.shape)
- diff_entropy, disc_entropy, bitstrings = sess.run(
- [diff_entropy, disc_entropy, bitstrings],
- {inputs: values, scale: scales})
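- # Actual code length in bits per symbol: 8 bits per byte of each row's
- # encoded string, divided by the number of symbols per row.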
- codelength = [8 * len(s) for s in bitstrings]
- codelength = np.array(codelength) / values.shape[1]
- # The analytical entropy only matches the empirical entropy for larger
- # scales, because of the additive uniform noise. As the scale goes to zero,
- # the empirical entropy converges to zero (the entropy of a standard uniform
- # distribution) instead of to -infinity. For large scales, the additive
- # noise is negligible.
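- # Analytical differential entropy (in bits) of the source at the largest
- # scale in the table.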
- theo_entropy = self.scipy_class.entropy(scale=10) / np.log(2)
- self.assertAllClose(0, diff_entropy[0], rtol=1e-2, atol=1e-2)
- self.assertAllClose(theo_entropy, diff_entropy[-1], rtol=1e-2, atol=1e-2)
- self.assertAllClose(diff_entropy, disc_entropy, rtol=1e-2, atol=1e-2)
- self.assertAllClose(disc_entropy, codelength, rtol=1e-2, atol=1e-2)
- # The range coder should have some overhead.
- self.assertTrue(all(codelength > disc_entropy))
-
-
-class GaussianConditionalTest(tf.test.TestCase, SymmetricConditionalTest):
-
- subclass = entropy_models.GaussianConditional
- scipy_class = scipy.stats.norm
-
- # Data generated by `test_compress`.
- # pylint:disable=bad-whitespace,bad-continuation
- bitstrings = np.array([
- b"\xff\xff\x13\xff\xff\x0f\xff\xef\xa9\x000\xb9\xffT\x87\xffUB",
- b"\x10\xf1m-\xf0r\xac\x97\xb6\xd5",
- ], dtype=object)
-
- indexes = np.array([
- [1, 2, 3, 4, 2, 2, 1],
- [5, 5, 1, 5, 3, 2, 3],
- ], dtype=np.int32)
-
- quantized_cdf = np.array([
- [ 0, 1, 65534, 65535, 65536, 0, 0, 0, 0],
- [ 0, 1, 65534, 65535, 65536, 0, 0, 0, 0],
- [ 0, 2, 65533, 65535, 65536, 0, 0, 0, 0],
- [ 0, 1491, 64044, 65535, 65536, 0, 0, 0, 0],
- [ 0, 88, 10397, 55138, 65447, 65535, 65536, 0, 0],
- [ 0, 392, 4363, 20205, 45301, 61143, 65114, 65506, 65536],
- ], dtype=np.int32)
-
- cdf_length = np.array([5, 5, 5, 5, 7, 9], dtype=np.int32)
-
- expected = np.array([
- [-3., 2., 1., -3., -1., -3., -4.],
- [-2., 2., -2., 2., 4., 1., 0.],
- ], dtype=np.float32)
- # pylint:enable=bad-whitespace,bad-continuation
-
-
-class LogisticConditionalTest(tf.test.TestCase, SymmetricConditionalTest):
-
- subclass = entropy_models.LogisticConditional
- scipy_class = scipy.stats.logistic
-
- # Data generated by `test_compress`.
- # pylint:disable=bad-whitespace,bad-continuation
- bitstrings = np.array([
- b"\xff\xff\x13\xff\xff\x0e\x17\xfd\xb5B\x03\xff\xf4\x11",
- b",yh\x13)\x12F\xfb",
- ], dtype=object)
-
- indexes = np.array([
- [1, 2, 3, 4, 2, 2, 1],
- [5, 5, 1, 5, 3, 2, 3]
- ], dtype=np.int32)
-
- quantized_cdf = np.array([
- [ 0, 1, 65534, 65535, 65536, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0],
- [ 0, 22, 65513, 65535, 65536, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0],
- [ 0, 1178, 64357, 65535, 65536, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0],
- [ 0, 159, 7809, 57721, 65371, 65530, 65536, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0],
- [ 0, 52, 431, 3100, 17617, 47903, 62420, 65089, 65468,
- 65520, 65536, 0, 0, 0, 0, 0, 0],
- [ 0, 62, 230, 683, 1884, 4935, 11919, 24706, 40758,
- 53545, 60529, 63580, 64781, 65234, 65402, 65464, 65536],
- ], dtype=np.int32)
-
- cdf_length = np.array([ 5, 5, 5, 7, 11, 17], dtype=np.int32)
-
- expected = np.array([
- [-3., 2., 1., -3., -1., -3., -4.],
- [-2., 2., -2., 2., 4., 1., 0.],
- ], dtype=np.float32)
- # pylint:enable=bad-whitespace,bad-continuation
-
-
-class LaplacianConditionalTest(tf.test.TestCase, SymmetricConditionalTest):
-
- subclass = entropy_models.LaplacianConditional
- scipy_class = scipy.stats.laplace
-
- # Data generated by `test_compress`.
- # pylint:disable=bad-whitespace,bad-continuation
- bitstrings = np.array([
- b"\xff\xff\x13\xff\xff\x0e\xea\xc1\xd9n'\xff\xfe*",
- b"\x1b\x9c\xd3\x06\xde_\xc0$",
- ], dtype=object)
-
- indexes = np.array([
- [1, 2, 3, 4, 2, 2, 1],
- [5, 5, 1, 5, 3, 2, 3],
- ], dtype=np.int32)
-
- quantized_cdf = np.array([
- [ 0, 1, 65534, 65535, 65536, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0],
- [ 0, 11, 65524, 65535, 65536, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0],
- [ 0, 600, 64935, 65535, 65536, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0],
- [ 0, 80, 4433, 61100, 65453, 65533, 65536, 0, 0,
- 0, 0, 0, 0, 0, 0],
- [ 0, 191, 1602, 12025, 53451, 63874, 65285, 65476, 65536,
- 0, 0, 0, 0, 0, 0],
- [ 0, 85, 315, 940, 2640, 7262, 19825, 45612, 58175,
- 62797, 64497, 65122, 65352, 65437, 65536],
- ], dtype=np.int32)
-
- cdf_length = np.array([ 5, 5, 5, 7, 9, 15], dtype=np.int32)
-
- expected = np.array([
- [-3., 2., 1., -3., -1., -3., -4.],
- [-2., 2., -2., 2., 4., 1., 0.],
- ], dtype=np.float32)
- # pylint:enable=bad-whitespace,bad-continuation
-
-
-if __name__ == "__main__":
- tf.test.main()