Skip to content

Commit

Permalink
Add code example and FAQ on chunked (de)compression
Browse files Browse the repository at this point in the history
  • Loading branch information
lindstro committed Oct 29, 2024
1 parent c0a3f92 commit a46fa8b
Show file tree
Hide file tree
Showing 7 changed files with 318 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ Change Log

## Unreleased

### Added

- A new code example, `chunk`, shows how to perform (de)compression in chunks.

### Fixed

- #241: Signed left shifts, integer overflow invoke undefined behavior.
Expand Down
32 changes: 32 additions & 0 deletions docs/source/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,38 @@ storage would not be enough to distinguish more than 16 different values.
For more advanced compressed-array features, see the
:ref:`tutorial <tut-arrays>`.

.. _ex-chunk:

Chunked (De)compression
-----------------------

The :program:`chunk` program is an example of how to perform chunked
(de)compression, where the compressed stream for a 3D array is produced or
consumed in multiple chunks. Chunking slices the array along the *z*
direction (the slowest varying dimension) into slabs that are (de)compressed
independently. Assuming the chosen array dimensions, rate, and number of
chunks admit (de)compression by satisfying certain constraints (see FAQ
:ref:`#32 <q-chunked>`), (de)compression in chunks should result in the same
output as if the entire array were (de)compressed all at once.

The array dimensions are specified as :code:`-3 nx ny nz` (default is
125 |times| 100 |times| 240); the rate as :code:`-r rate` (default is
16 bits/value); and the number of chunks as :code:`-n chunks` (default is one
chunk). Without :code:`-d`, a synthetic array is generated and compressed to
standard output. Using :code:`-d`, standard input is decompressed and written
to standard output. For example::

chunk -n 1 > single.zfp
chunk -n 4 > quadruple.zfp
diff single.zfp quadruple.zfp

chunk -n 1 -d < single.zfp > single.f64
chunk -n 4 -d < single.zfp > quadruple.f64
diff single.f64 quadruple.f64

Here :program:`diff` should report no differences. See FAQ
:ref:`#32 <q-chunked>` for further discussion of chunked (de)compression.

.. _ex-diffusion:

Diffusion Solver
Expand Down
83 changes: 81 additions & 2 deletions docs/source/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Questions answered in this FAQ:
#. :ref:`How can I print array values? <q-printf>`
#. :ref:`What is known about zfp compression errors? <q-err-dist>`
#. :ref:`Why are zfp blocks 4 * 4 * 4 values? <q-block-size>`
#. :ref:`Can zfp (de)compress a single array in chunks? <q-chunked>`

-------------------------------------------------------------------------------

Expand Down Expand Up @@ -530,8 +531,9 @@ when calling the high-level API function :c:func:`zfp_decompress`.

With regards to the :c:type:`zfp_field` struct passed to
:c:func:`zfp_compress` and :c:func:`zfp_decompress`, field dimensions must
match between compression and decompression, however strides need not match
(see :ref:`Q16 <q-strides>`). Additionally, the scalar type,
generally match between compression and decompression, though see
:ref:`Q32 <q-chunked>` on chunked (de)compression. Strides, however, need
not match; see :ref:`Q16 <q-strides>`. Additionally, the scalar type,
:c:type:`zfp_type`, must match. For example, float arrays currently have a
compressed representation different from compressed double arrays due to
differences in exponent width. It is not possible to compress a double array
Expand Down Expand Up @@ -1418,3 +1420,80 @@ above factors. Additionally, *n* = 4 has these benefits:
a compressed 3D block occupies 128 bytes, or 1-2 hardware cache lines on
contemporary computers. Hence, a fair number of *compressed* blocks can
also fit in hardware cache.

-------------------------------------------------------------------------------

.. _q-chunked:

Q32: *Can zfp (de)compress a single array in chunks?*

Yes, but there are restrictions.

First, one can trivially partition any array into subarrays and (de)compress
those independently using separate matching :c:func:`zfp_compress` and
:c:func:`zfp_decompress` calls for each chunk. Via subarray dimensions,
strides, and pointers into the larger array, one can thus (de)compress the
full array in pieces; see also :ref:`Q16 <q-strides>`. This approach to
chunked (de)compression incurs no constraints on compression mode, compression
parameters, or array dimensions, though producer and consumer must agree on
chunk size. This type of chunking is employed by the |zfp| HDF5 filter
`H5Z-ZFP <https://github.com/LLNL/H5Z-ZFP>`__ for I/O.

A more restricted form of chunked (de)compression is to produce (compress) or
consume (decompress) a single compressed stream for the whole array in chunks
in a manner compatible with producing/consuming the entire stream all at once.
Such chunked (de)compression divides the array into slabs along the slowest
varying dimension (e.g., along *z* for 3D arrays), (de)compresses one slab at
a time, and produces or consumes consecutive pieces of the sequential
compressed stream. This approach, too, is possible, though only when these
requirements are met:

* The size of each chunk (except the last) must be a whole multiple of four
along the slowest varying dimension; other dimensions are not subject to this
constraint. For example, a 3D array with *nz* = 120 can be (de)compressed
in two or three equal-size chunks, but not four, since 120/2 = 60, and
120/3 = 40 are both divisible by four, but 120/4 = 30 is not. Other viable
chunk sizes are 120/5 = 24, 120/6 = 20, 120/10 = 12, 120/15 = 8, and
120/30 = 4. Note that other chunk sizes may be possible by relaxing the
constraint that they all be equal, as exploited by the
:ref:`chunk <ex-chunk>` code example, e.g., *nz* = 120 can be partitioned
into three chunks of size 32 and one of size 24.

The reason for this requirement is that |zfp| always pads each compressed
(sub)array to fill out whole blocks of size 4 in each dimension, and such
interior padding would not occur if the whole array were compressed as a
single chunk.

* The length of the compressed substream for each chunk must be a multiple of
the :ref:`word size <word-size>`. The reason for this is that each
:c:func:`zfp_compress` and :c:func:`zfp_decompress` call aligns the stream
on a word boundary upon completion. One may avoid this requirement by using
the low-level API, which does not automatically perform such alignment.

.. note::

When using the :ref:`high-level API <hl-api>`, the requirement on stream
alignment essentially limits chunked (de)compression to
:ref:`fixed-rate mode <mode-fixed-rate>`, as it is the only one that can
guarantee that the size of each compressed chunk is a multiple of the word
size. To support other compression modes, use the
:ref:`low-level API <ll-api>`.

Chunked (de)compression requires the user to set the :c:type:`zfp_field`
dimensions to match the current chunk size and to set the
:ref:`field pointer <zfp_field_set>` to the beginning of each uncompressed
chunk before (de)compressing it. The user may also have to position the
compressed stream so that it points to the beginning of each compressed
chunk. See the :ref:`code example <ex-chunk>` for how one may implement
chunked (de)compression.

Note that the chunk size used for compression need not match the size used for
decompression; e.g., the array may be compressed in a single sweep but
decompressed in chunks, or vice versa. Any combination of chunk sizes that
respect the above constraints is valid.

Chunked (de)compression makes it possible to perform, for example, windowed
streaming computations on smaller subsets of the decompressed array at a time,
i.e., without having to allocate enough space to hold the entire uncompressed
array. It also can be useful for overlapping or interleaving computation with
(de)compression in a producer/consumer model.
2 changes: 2 additions & 0 deletions docs/source/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ in the same manner that :ref:`build targets <targets>` are specified, e.g.,
Default: undefined/off.


.. _word-size:

.. c:macro:: BIT_STREAM_WORD_TYPE
Unsigned integer type used for buffering bits. Wider types tend to give
Expand Down
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ add_executable(array array.cpp)
target_compile_definitions(array PRIVATE ${zfp_compressed_array_defs})
target_link_libraries(array zfp)

add_executable(chunk chunk.c)
target_link_libraries(chunk zfp)

add_executable(diffusion diffusion.cpp)
target_compile_definitions(diffusion PRIVATE ${zfp_compressed_array_defs})
if(ZFP_WITH_OPENMP)
Expand Down
4 changes: 4 additions & 0 deletions examples/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ include ../Config

BINDIR = ../bin
TARGETS = $(BINDIR)/array\
$(BINDIR)/chunk\
$(BINDIR)/diffusion\
$(BINDIR)/inplace\
$(BINDIR)/iterator\
Expand All @@ -25,6 +26,9 @@ all: $(TARGETS)
$(BINDIR)/array: array.cpp ../lib/$(LIBZFP)
$(CXX) $(CXXFLAGS) $(INCS) array.cpp $(CXXLIBS) -o $@

$(BINDIR)/chunk: chunk.c ../lib/$(LIBZFP)
$(CC) $(CFLAGS) $(INCS) chunk.c $(CLIBS) -o $@

$(BINDIR)/diffusion: diffusion.cpp ../lib/$(LIBZFP)
$(CXX) $(CXXFLAGS) $(INCS) diffusion.cpp $(CXXLIBS) -o $@

Expand Down
192 changes: 192 additions & 0 deletions examples/chunk.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/* code example showing how to (de)compress a 3D array in chunks */

#include <limits.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "zfp.h"

/* open compressed stream for (de)compressing field at given rate */
static zfp_stream*
stream(const zfp_field* field, double rate)
{
const size_t bx = (field->nx + 3) / 4; /* # blocks along x */
const size_t by = (field->ny + 3) / 4; /* # blocks along y */
const size_t bz = (field->nz + 3) / 4; /* # blocks along z */

zfp_stream* zfp; /* compressed stream */
size_t words; /* word size of compressed buffer */
size_t bytes; /* byte size of compressed buffer */
void* buffer; /* storage for compressed stream */
bitstream* stream; /* bit stream to write to or read from */

/* allocate meta data for a compressed stream */
zfp = zfp_stream_open(NULL);

/* set fixed-rate mode with no alignment */
zfp_stream_set_rate(zfp, rate, zfp_type_double, zfp_field_dimensionality(field), zfp_false);

/* determine exact compressed size in words */
words = (bx * by * bz * zfp->maxbits + stream_word_bits - 1) / stream_word_bits;

/* allocate buffer for single chunk of compressed data */
bytes = words * stream_word_bits / CHAR_BIT;
buffer = malloc(bytes);

/* associate bit stream with allocated buffer */
stream = stream_open(buffer, bytes);
zfp_stream_set_bit_stream(zfp, stream);

return zfp;
}

/* compress chunk */
static zfp_bool
compress(zfp_stream* zfp, const zfp_field* field)
{
void* buffer = stream_data(zfp_stream_bit_stream(zfp));

/* compress chunk and output compressed data */
size_t size = zfp_compress(zfp, field);
if (!size)
return zfp_false;
fwrite(buffer, 1, size, stdout);

return zfp_true;
}

/* decompress chunk */
static zfp_bool
decompress(zfp_stream* zfp, zfp_field* field)
{
void* buffer = stream_data(zfp_stream_bit_stream(zfp));

/* decompress chunk and output uncompressed data */
size_t size = fread(buffer, 1, stream_capacity(zfp_stream_bit_stream(zfp)), stdin);
if (zfp_decompress(zfp, field) != size)
return zfp_false;
fwrite(zfp_field_pointer(field), sizeof(double), zfp_field_size(field, NULL), stdout);

return zfp_true;
}

/* print command usage */
static int
usage(void)
{
fprintf(stderr, "chunk [options] <input >output\n");
fprintf(stderr, "Options:\n");
fprintf(stderr, "-3 <nx> <ny> <nz> : array dimensions\n");
fprintf(stderr, "-d : decompress (from stdin to stdout); else compress\n");
fprintf(stderr, "-n <count> : number of chunks along z dimension\n");
fprintf(stderr, "-r <rate> : rate in bits/value\n");

return EXIT_FAILURE;
}

int main(int argc, char* argv[])
{
/* command-line arguments */
zfp_bool decode = zfp_false;
double rate = 16;
int nx = 125;
int ny = 100;
int nz = 240;
int chunks = 1;

/* local variables */
double* array;
double* ptr;
zfp_field* field;
zfp_stream* zfp;
int i, x, y, z, mz;

/* process command line */
for (i = 1; i < argc; i++)
if (!strcmp(argv[i], "-3")) {
if (++i == argc || sscanf(argv[i], "%d", &nx) != 1 ||
++i == argc || sscanf(argv[i], "%d", &ny) != 1 ||
++i == argc || sscanf(argv[i], "%d", &nz) != 1)
return usage();
}
else if (!strcmp(argv[i], "-d"))
decode = zfp_true;
else if (!strcmp(argv[i], "-r")) {
if (++i == argc || sscanf(argv[i], "%lf", &rate) != 1)
return usage();
}
else if (!strcmp(argv[i], "-n")) {
if (++i == argc || sscanf(argv[i], "%d", &chunks) != 1)
usage();
}
else
return usage();

/* compute chunk size (must be a multiple of four) */
mz = 4 * ((nz + 4 * chunks - 1) / (4 * chunks));
if ((chunks - 1) * mz >= nz) {
fprintf(stderr, "cannot partition nz=%d into %d chunks\n", nz, chunks);
return EXIT_FAILURE;
}

/* allocate whole nx * ny * nz array of doubles */
array = malloc(nx * ny * nz * sizeof(double));

if (!decode) {
/* initialize array to be compressed */
for (z = 0; z < nz; z++)
for (y = 0; y < ny; y++)
for (x = 0; x < nx; x++)
array[x + nx * (y + ny * z)] = 1. / (1 + x + nx * (y + ny * z));
}

/* initialize field, stream, and compressed buffer */
field = zfp_field_3d(array, zfp_type_double, nx, ny, mz);
zfp = stream(field, rate);

/* warn if compressed size is not a multiple of word size */
if (chunks > 1 && (zfp_field_blocks(field) * zfp->maxbits) % stream_word_bits)
fprintf(stderr, "warning: compressed size (%ld) is not a multiple of word size (%ld)\n", (long)(zfp_field_blocks(field) * zfp->maxbits), (long)stream_word_bits);

/* (de)compress array in chunks */
ptr = array;
for (z = 0; z < nz; z += mz) {
/* compute current chunk size as min(mz, nz - z) */
int cz = mz < nz - z ? mz : nz - z;

/* set chunk size and pointer into uncompressed array */
zfp_field_set_pointer(field, ptr);
zfp_field_set_size_3d(field, nx, ny, cz);

/* reuse compressed buffer by rewinding compressed stream */
zfp_stream_rewind(zfp);

if (decode) {
/* decompress current chunk from stdin to stdout */
if (!decompress(zfp, field)) {
fprintf(stderr, "decompression failed\n");
return EXIT_FAILURE;
}
}
else {
/* compress current chunk to stdout */
if (!compress(zfp, field)) {
fprintf(stderr, "compression failed\n");
return EXIT_FAILURE;
}
}

/* advance pointer to next chunk of uncompressed data */
ptr += nx * ny * cz;
}

/* clean up */
free(stream_data(zfp_stream_bit_stream(zfp)));
stream_close(zfp_stream_bit_stream(zfp));
zfp_stream_close(zfp);
zfp_field_free(field);
free(array);

return EXIT_SUCCESS;
}

0 comments on commit a46fa8b

Please sign in to comment.