Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Opencl lz4 #3900

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 263 additions & 3 deletions src/silx/opencl/codec/bitshuffle_lz4.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,24 @@
# OTHER DEALINGS IN THE SOFTWARE.

"""
This module provides a class for CBF byte offset compression/decompression.
This module provides a class for bitshuffle-LZ4 compression/decompression.
"""

__authors__ = ["Jérôme Kieffer"]
__contact__ = "[email protected]"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
__date__ = "09/11/2022"
__date__ = "22/08/2023"
__status__ = "production"


import time
import os
import struct
import numpy
import json
from ..common import ocl, pyopencl, kernel_workgroup_size
from ..processing import BufferDescription, EventDescription, OpenclProcessing
import pyopencl.array as cla

import logging

Expand Down Expand Up @@ -212,3 +214,261 @@ def decompress(self, raw, out=None, wg=None, nbytes=None):
return out

__call__ = decompress


def test_lz4_analysis(
    data,
    block_size=1024,
    workgroup_size=32,
    segments_size=None,
    profile=True,
    compaction=True,
):
    """Function that tests LZ4 analysis (i.e. the correctness of segments) on a dataset.

    The data are viewed as a flat array of bytes, the ``LZ4_cmp_stage1`` kernel
    is run with one workgroup per block of ``block_size`` bytes, and a series of
    sanity reports about the resulting segments is printed.

    :param data: some data to play with, either ``bytes`` or a numpy array
    :param block_size: how many items are treated by a workgroup
    :param workgroup_size: size of the workgroup
    :param segments_size: number of segment slots reserved for each workgroup,
                          by default ``block_size // 4``
    :param profile: turn on profiling for the OpenCL queue
    :param compaction: set to False to retrieve the raw segments before compaction
    :return: an array of segments (4 columns of int32), each containing:
               - position in the input stream
               - length of the literal section
               - length of the matching section
               - position in the output stream
             With ``compaction=False`` the raw (un-repacked) segments are returned.

    Prints out performance (measured from Python) in ms, then the validation reports.
    """
    t0 = time.perf_counter_ns()
    performances = {}
    if isinstance(data, bytes):
        data = numpy.frombuffer(data, "uint8")
    else:
        data = data.view("uint8").ravel()
    data_size = data.size
    # One workgroup per (possibly incomplete) block of `block_size` bytes.
    num_workgroup = (data_size + block_size - 1) // block_size
    if segments_size is None:
        segments_size = block_size // 4

    # Each workgroup gets its own [start, stop) range of slots in the segment table.
    segment_pos = numpy.zeros((num_workgroup, 2), "int32")
    tmp_sp = numpy.arange(0, segments_size * (num_workgroup + 1), segments_size)
    segment_pos[:, 0] = tmp_sp[:-1]
    segment_pos[:, 1] = tmp_sp[1:]

    # Opencl setup
    t1 = time.perf_counter_ns()
    ctx = pyopencl.create_some_context()
    # `__file__/..` is normalised by abspath to the directory holding this module.
    src_file = os.path.abspath(
        os.path.join(
            os.path.abspath(__file__),
            "../../../resources/opencl/codec/lz4_compression.cl",
        )
    )
    # Context manager: do not leak the file handle of the kernel source.
    with open(src_file) as src_fd:
        src = src_fd.read()
    prg = pyopencl.Program(ctx, src).build(
        options=f"-DBUFFER_SIZE={block_size} -DSEGMENT_SIZE={segments_size} -DWORKGROUP_SIZE={workgroup_size}"
    )
    t1a = time.perf_counter_ns()
    if profile:
        queue = pyopencl.CommandQueue(
            ctx, properties=pyopencl.command_queue_properties.PROFILING_ENABLE
        )
    else:
        queue = pyopencl.CommandQueue(ctx)

    data_d = cla.to_device(queue, data)
    segment_posd = cla.to_device(queue, segment_pos)
    segments_d = cla.zeros(queue, (segments_size * num_workgroup, 4), "int32")
    wgcnt_d = cla.to_device(queue, numpy.array([num_workgroup], "int32"))
    output_size_d = cla.zeros(queue, num_workgroup, "int32")

    t2 = time.perf_counter_ns()
    evt = prg.LZ4_cmp_stage1(
        queue,
        (workgroup_size * num_workgroup,),
        (workgroup_size,),
        data_d.data,
        numpy.int32(data_size),
        segment_posd.data,
        segments_d.data,
        numpy.int32(compaction),
        output_size_d.data,
        wgcnt_d.data,
    )
    evt.wait()
    t3 = time.perf_counter_ns()
    segments = segments_d.get()
    if compaction:
        # After compaction, the first row of the position table is read back as
        # the [start, stop) range of the valid segments.
        final_positons = segment_posd.get()
        segments = segments[final_positons[0, 0] : final_positons[0, 1]]
    t4 = time.perf_counter_ns()

    # Timings are reported in milliseconds (perf_counter_ns gives nanoseconds).
    performances["python_setup"] = (t1 - t0) * 1e-6
    performances["opencl_compilation"] = (t1a - t1) * 1e-6
    performances["opencl_setup"] = (t2 - t1a) * 1e-6
    performances["opencl_run_python"] = (t3 - t2) * 1e-6
    if profile:
        performances["opencl_run_profile"] = 1e-6 * (
            evt.profile.end - evt.profile.start
        )
    performances["opencl_retrieve"] = (t4 - t3) * 1e-6

    print(json.dumps(performances, indent=2))

    if compaction:
        compacted = segments
    else:
        compacted = _repack_segments(segments)

    # NOTE(review): every report below is printed unconditionally, whatever the
    # outcome of the check; read the printed values rather than the wording.

    # Check validity: input indexes
    inp_idx = compacted[:, 0]
    res = numpy.where((inp_idx[1:] - inp_idx[:-1]) <= 0)
    print(f"Input position are all ascending except {res[0]}")

    # Check validity: input size
    size = segments[:, 1:3].sum()
    print(f"Input size matches, got {size}, expected {data.size}")

    # Check validity: input size (bis)
    size = compacted[-1, :-1].sum()
    print(
        f"Input size does match the end of segments, got {size}, expected {data.size}"
    )

    # Check validity: output indexes
    out_idx = compacted[:, -1]
    res = numpy.where((out_idx[1:] - out_idx[:-1]) <= 0)
    print(f"Output position are all ascending, except {res[0]}")

    # Check for invalid segments: those without match, allowed only on the last segment
    match_size = compacted[:-1, 2]
    res = numpy.where(match_size == 0)
    print(f"Found empty match at {res[0]}")

    # Validate that match are all constant:
    print(f"Non constant match section found at {_validate_content(data, compacted)}")

    return segments


def _validate_content(data, segments):
data = data.view("uint8")
bad = {}
for i, s in enumerate(segments):
if s[2] == 0:
continue
start = s[0] + s[1]
stop = start + s[2]
res = numpy.where(data[start:stop] - data[start])[0]
if res.size:
bad[i] = res
return bad


def _repack_segments(segments):
"repack a set of segments to be contiguous"
valid = numpy.where(segments.sum(axis=-1) != 0)[0]
repacked1 = segments[valid]
blocks = numpy.where(repacked1[:, -1] == 0)[0]
sub_tot = 0
repacked2 = repacked1.copy()
for start, stop in zip(blocks, numpy.concatenate((blocks[1:], [len(repacked1)]))):
repacked2[start:stop, -1] += sub_tot
sub_tot += repacked1[stop - 1, -1]
repacked3 = repacked2[numpy.where(repacked2[:, 1:3].sum(axis=-1) != 0)[0]]
return repacked3


def test_lz4_writing(
    data, segments, workgroup_size=32, prepend_header=False, profile=True
):
    """Function that tests LZ4 writing of a segmented dataset

    Runs the ``LZ4_cmp_stage2`` kernel with one workgroup per segment and
    retrieves the bytes it wrote.

    :param data: some data to play with, either ``bytes`` or a numpy array
    :param segments: array of int[:, 4]
    :param workgroup_size: size of the workgroup
    :param prepend_header: flag forwarded to the kernel as an int32
                           (presumably asks for a header in front of the
                           compressed stream; confirm against the kernel source)
    :param profile: turn on profiling for the OpenCL queue
    :return: a compressed datablock, as a numpy array of uint8

    Prints out performance (measured from Python) in ms
    """
    t0 = time.perf_counter_ns()
    performances = {}
    if isinstance(data, bytes):
        data = numpy.frombuffer(data, "uint8")
    else:
        data = data.view("uint8")

    # Force a C-contiguous int32 table: `astype` alone keeps a Fortran-ordered
    # layout, which would be uploaded to the device transposed.
    segments = numpy.ascontiguousarray(segments, dtype="int32")

    data_size = data.size
    num_workgroup = segments.shape[0]
    # [start, stop) range of the valid rows in the segment table.
    segment_pos = numpy.zeros(2, "int32")
    segment_pos[1] = num_workgroup

    # Opencl setup
    t1 = time.perf_counter_ns()
    ctx = pyopencl.create_some_context()
    # `__file__/..` is normalised by abspath to the directory holding this module.
    src_file = os.path.abspath(
        os.path.join(
            os.path.abspath(__file__),
            "../../../resources/opencl/codec/lz4_compression.cl",
        )
    )
    # Context manager: do not leak the file handle of the kernel source.
    with open(src_file) as src_fd:
        src = src_fd.read()
    prg = pyopencl.Program(ctx, src).build()
    t1a = time.perf_counter_ns()
    if profile:
        queue = pyopencl.CommandQueue(
            ctx, properties=pyopencl.command_queue_properties.PROFILING_ENABLE
        )
    else:
        queue = pyopencl.CommandQueue(ctx)

    data_d = cla.to_device(queue, data)
    segment_posd = cla.to_device(queue, segment_pos)
    segments_d = cla.to_device(queue, segments)
    # NOTE(review): the LZ4 worst case is n + n/255 + 16 bytes, which exceeds
    # 1.1 * n for inputs shorter than ~170 bytes; presumably the kernel honours
    # the capacity passed in output_size_d[0]. TODO confirm.
    output_d = cla.zeros(queue, int(1.1 * data.nbytes), "uint8")
    # [capacity of the output buffer, slot read back as the compressed size]
    output_size_d = cla.to_device(queue, numpy.array([output_d.nbytes, 0], "int32"))

    t2 = time.perf_counter_ns()
    evt = prg.LZ4_cmp_stage2(
        queue,
        (workgroup_size * num_workgroup,),
        (workgroup_size,),
        data_d.data,
        numpy.int32(data_size),
        segment_posd.data,
        segments_d.data,
        output_d.data,
        output_size_d.data,
        numpy.int32(prepend_header),
    )
    evt.wait()
    t3 = time.perf_counter_ns()
    buffer_size = output_size_d.get()[1]
    compressed = output_d.get()[:buffer_size]
    t4 = time.perf_counter_ns()

    # Timings are reported in milliseconds (perf_counter_ns gives nanoseconds).
    performances["python_setup"] = (t1 - t0) * 1e-6
    performances["opencl_compilation"] = (t1a - t1) * 1e-6
    performances["opencl_setup"] = (t2 - t1a) * 1e-6
    performances["opencl_run"] = (t3 - t2) * 1e-6
    if profile:
        performances["opencl_run_profile"] = 1e-6 * (
            evt.profile.end - evt.profile.start
        )
    performances["opencl_retrieve"] = (t4 - t3) * 1e-6
    print(json.dumps(performances, indent=2))
    return compressed
Loading