From ec6c59e517e322f976240358ebe0fd1999cbe4aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Kieffer?= Date: Wed, 5 Jul 2023 13:42:17 +0200 Subject: [PATCH 01/21] =?UTF-8?q?Validate=20Odd=E2=80=93even=20parallel=20?= =?UTF-8?q?sort?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../resources/opencl/codec/lz4_compression.cl | 213 ++++++++++++++++++ 1 file changed, 213 insertions(+) create mode 100644 src/silx/resources/opencl/codec/lz4_compression.cl diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl new file mode 100644 index 0000000000..3983cc5d21 --- /dev/null +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -0,0 +1,213 @@ +/* + * Project: SILX: Bitshuffle LZ4 compressor + * + * Copyright (C) 2023 European Synchrotron Radiation Facility + * Grenoble, France + * + * Principal authors: J. Kieffer (kieffer@esrf.fr) + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +// define the min and the max of the absolute value +#define MAXA(a,b) (abs(a)>abs(b))?a:b; +#define MINA(a,b) (abs(a)>abs(b))?b:a; + +/* Function is called at the end by the last running wg to compact the output + * + * Maybe would be more efficient to call another kernel to do this in parallel ? + */ +inline void compact_output(global uchar *output_buffer, + int output_size, + global uchar *output_ptr, // Length of all output from different wg + int buffer_size // 1.2x the input buffer size! + ){ + int tid = get_local_id(0); // thread id + int wg = get_local_size(0);// workgroup size + int start_read = 0; + int start_write = output_ptr[0]; + int to_copy; + for (int i=1; i4){ + match_buffer[tid] = match; + } + else{ + match_buffer[tid] = 0; + } + + } + barrier(CLK_LOCAL_MEM_FENCE); + if (running[0] == 0){ + break; + } + } + // retrieve the match_buffer + int out_block = (int) ceil(buffer_size * 1.2); + int write_block = gid * out_block; + for (size_t i=tid; i Date: Wed, 5 Jul 2023 17:54:48 +0200 Subject: [PATCH 02/21] work in progress ... --- .../resources/opencl/codec/lz4_compression.cl | 261 ++++++++++++++---- 1 file changed, 210 insertions(+), 51 deletions(-) diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl index 3983cc5d21..a31096e876 100644 --- a/src/silx/resources/opencl/codec/lz4_compression.cl +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -28,9 +28,9 @@ * OTHER DEALINGS IN THE SOFTWARE. */ -// define the min and the max of the absolute value -#define MAXA(a,b) (abs(a)>abs(b))?a:b; -#define MINA(a,b) (abs(a)>abs(b))?b:a; +// define the min and the max based on the first element +#define MAXA(a,b) (a.s0>b.s0)?a:b; +#define MINA(a,b) (a.s0>b.s0)?b:a; /* Function is called at the end by the last running wg to compact the output * @@ -64,7 +64,7 @@ inline void compact_output(global uchar *output_buffer, */ inline void sort_odd_even(int start, int stop, - volatile local short *lbuffer){ + volatile local short4 *lbuffer){ int size = stop - start; if (size <2){ return; @@ -134,23 +134,181 @@ inline void sort_odd_even(int start, } } -// test kernel to ensure `sort_odd_even` works -kernel void test_sort(global short *buffer, - int start, - int stop, - volatile local short *lbuffer){ +/* compact segments + * After the scan, begining of litterals and of match are noted and stored in segments. + * In this function one takes 2 segments, starting with a litteral and concatenate the subsequent match + * as a consequence, the number of segments is divided by 2 ! + */ +inline int compact_segments(local volatile short4 segments, + int nb){ int tid = get_local_id(0); // thread id - int gid = get_group_id(0); // group id + short4 merge + if (2*tid0) && (pid there) && (there==0) && (here>4){ + segments[atomic_inc(cnt)] = (short4)(start, 0, here, 0); + } else + if ((here==0) && (there>0){ + segments[atomic_inc(cnt)] = (short4)(start, 0, 0, 0); + } } -} + if (cnt[0] == 1){ + // noting occured, just complete segment + segments[0] = (short4)(start, stop-start, 0, 0); + }else{ + // sort segments + if (tid == 0){ + cnt[0] += 1; + + } + sort_odd_even(0, cnt[0], segments); + // compact segments TODO + } + return end_of_scan; +} + + +// Build token, concatenation of a litteral and a match +inline uchar build_token(short4 segment){ + int lit = segment.s1; + int mat = segment.s2; + int token = ((lit & 15)<<4)|((mat-4)&15); + return token; +} + +// copy collaborative, return the position in output stream. +inline int copy(global uchar* dest, + const int dest_position, + local uchar* source, + const int src_position, + const int length){ + for (int i=get_local_id(0); i=15){ + int rem = segment.s1-15; + while (rem>=255){ + output_buffer[start_cmp] = 255; + start_cmp++; + rem -=255; + } + output_buffer[start_cmp] = rem; + start_cmp++; + } + + //copy litteral. This is collaborative. + start_cmp = copy(output_buffer, start_cmp, + buffer, segment.s0, segment.s1); + + //write offset, here always 1 in 16 bits little endian ! + output_buffer[start_cmp] = 1; + output_buffer[start_cmp+1] = 0; + start_cmp+=2; + + //write match overflow + if (segment.s2>=19){ + int rem = segment.s2-19; + while (rem>=255){ + output_buffer[start_cmp] = 255; + start_cmp++; + rem -=255; + } + output_buffer[start_cmp] = rem; + start_cmp++; + } + }//loop over segments +} /* Main kernel for lz4 compression */ @@ -162,52 +320,53 @@ kernel void lz4_cmp( global uchar *input_buffer, global int *running_grp, // counter with the number of wg still running local uchar *buffer, int buffer_size, - local short *match_buffer // size of the workgroup + local short *match_buffer, // size of the buffer + local short4 segments // contains: start of segment (uncompressed), number of litterals, number of match (offset is enforced to 1) and start of segment (compressed) ) { int tid = get_local_id(0); // thread id int gid = get_group_id(0); // group id int wg = get_local_size(0);// workgroup size + //copy input data to buffer int actual_buffer_size = min(buffer_size, input_size - ((gid+1) * buffer_size)); int start_block = gid * buffer_size; for (int i=tid; i4){ - match_buffer[tid] = match; - } - else{ - match_buffer[tid] = 0; - } - - } - barrier(CLK_LOCAL_MEM_FENCE); - if (running[0] == 0){ - break; - } + /// divide the work in parts, one wg has enough threads + int start = 0; + while (start Date: Thu, 6 Jul 2023 09:38:50 +0200 Subject: [PATCH 03/21] fix code compilation --- .../resources/opencl/codec/lz4_compression.cl | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl index a31096e876..f1bf761dd8 100644 --- a/src/silx/resources/opencl/codec/lz4_compression.cl +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -72,7 +72,7 @@ inline void sort_odd_even(int start, int cycle = (int)(size/2.0+0.5); int tid = get_local_id(0); // thread id int wg = get_local_size(0);// workgroup size - short here, there; + short4 here, there; int pid = start + tid; for (int i=0; i0) && (pid there) && (there==0) && (here>4){ + if ((here > there) && (there==0) && (here>4)){ segments[atomic_inc(cnt)] = (short4)(start, 0, here, 0); } else - if ((here==0) && (there>0){ + if ((here==0) && (there>0)){ segments[atomic_inc(cnt)] = (short4)(start, 0, 0, 0); } } @@ -231,7 +234,7 @@ inline int segmentation(int start, //index where scan4match did start sort_odd_even(0, cnt[0], segments); // compact segments TODO } - return end_of_scan; + return cnt[0]; } @@ -261,7 +264,7 @@ inline int copy(global uchar* dest, * return the end-position in the output stream */ inline int write_lz4(local uchar *buffer, - local volatile short4 segments, // size of the workgroup + local volatile short4 *segments, // size of the workgroup int nb_segments, int start_cmp, global uchar *output_buffer, @@ -308,6 +311,7 @@ inline int write_lz4(local uchar *buffer, start_cmp++; } }//loop over segments + return start_cmp; } /* Main kernel for lz4 compression @@ -321,9 +325,8 @@ kernel void lz4_cmp( global uchar *input_buffer, local uchar *buffer, int buffer_size, local short *match_buffer, // size of the buffer - local short4 segments // contains: start of segment (uncompressed), number of litterals, number of match (offset is enforced to 1) and start of segment (compressed) - ) -{ + local volatile short4 *segments // contains: start of segment (uncompressed), number of litterals, number of match (offset is enforced to 1) and start of segment (compressed) + ){ int tid = get_local_id(0); // thread id int gid = get_group_id(0); // group id int wg = get_local_size(0);// workgroup size @@ -338,14 +341,14 @@ kernel void lz4_cmp( global uchar *input_buffer, /// divide the work in parts, one wg has enough threads int start = 0; - while (start Date: Thu, 6 Jul 2023 16:18:23 +0200 Subject: [PATCH 04/21] WIP --- .../resources/opencl/codec/lz4_compression.cl | 193 ++++++++++-------- 1 file changed, 108 insertions(+), 85 deletions(-) diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl index f1bf761dd8..bbe5658477 100644 --- a/src/silx/resources/opencl/codec/lz4_compression.cl +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -28,10 +28,6 @@ * OTHER DEALINGS IN THE SOFTWARE. */ -// define the min and the max based on the first element -#define MAXA(a,b) (a.s0>b.s0)?a:b; -#define MINA(a,b) (a.s0>b.s0)?b:a; - /* Function is called at the end by the last running wg to compact the output * * Maybe would be more efficient to call another kernel to do this in parallel ? @@ -57,6 +53,10 @@ inline void compact_output(global uchar *output_buffer, } } +// short compare and swap function used in sort_odd_even +inline short8 _order_short4(short4 a, short4 b){ + return (a.s00) && (pid there) && (there==0) && (here>4)){ - segments[atomic_inc(cnt)] = (short4)(start, 0, here, 0); + if ((there==1) && (here>4)){ + segments[atomic_inc(cnt)] = (short4)(pid, 0, here, 0); } else - if ((here==0) && (there>0)){ - segments[atomic_inc(cnt)] = (short4)(start, 0, 0, 0); - } - } - if (cnt[0] == 1){ - // noting occured, just complete segment - segments[0] = (short4)(start, stop-start, 0, 0); - }else{ - // sort segments - if (tid == 0){ - cnt[0] += 1; - + if ((here==1) && (there>1)){ + segments[atomic_inc(cnt)] = (short4)(pid, 0, 0, 0); } - sort_odd_even(0, cnt[0], segments); - // compact segments TODO } +// if (cnt[0] == 1){ +// // noting occured, just complete segment +// segments[0] = (short4)(start, stop-start, 0, 0); +// }else{ +// // sort segments +// if (tid == 0){ +// cnt[0] += 1; +// +// } +// sort_odd_even(0, cnt[0], segments); +// // compact segments TODO +// } + barrier(CLK_LOCAL_MEM_FENCE); return cnt[0]; } @@ -373,3 +336,63 @@ kernel void test_sort(global short4 *buffer, buffer[i] = lbuffer[i]; } } +// test kernel to ensure `scan4match` works +kernel void test_scan4match( + global uchar *buffer, // buffer with input data in it, as large as possible, limited by shared memory space. + global short *match, // buffer with output data in it, matches the buffer array + int start, + int stop, + global int *end, + local uchar *lbuffer, + local short *lmatch){ + local volatile int cnt[2]; + int tid = get_local_id(0); // thread id + int gid = get_group_id(0); // group id + int wg = get_local_size(0);// workgroup size + for (int i=tid; i Date: Thu, 6 Jul 2023 17:09:57 +0200 Subject: [PATCH 05/21] fix compaction of segments --- .../resources/opencl/codec/lz4_compression.cl | 59 ++++++++++--------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl index bbe5658477..18dc5e8957 100644 --- a/src/silx/resources/opencl/codec/lz4_compression.cl +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -69,7 +69,7 @@ inline void sort_odd_even(int start, if (size <2){ return; } - int cycle = (int)ceil(size/2.0); + int cycle = (size+1)/2; int tid = get_local_id(0); // thread id int wg = get_local_size(0);// workgroup size short8 swapped; @@ -80,14 +80,14 @@ inline void sort_odd_even(int start, if (pid+10) && (pid4)){ - segments[atomic_inc(cnt)] = (short4)(pid, 0, here, 0); + if ((there==0) && (here>4)){ + segments[atomic_inc(cnt)] = (short4)(pid+1, 0, here, 0); } else - if ((here==1) && (there>1)){ - segments[atomic_inc(cnt)] = (short4)(pid, 0, 0, 0); + if ((here==0) && (there>0)){ + segments[atomic_inc(cnt)] = (short4)(pid+1, 0, 0, 0); } } -// if (cnt[0] == 1){ -// // noting occured, just complete segment -// segments[0] = (short4)(start, stop-start, 0, 0); -// }else{ -// // sort segments -// if (tid == 0){ -// cnt[0] += 1; -// -// } -// sort_odd_even(0, cnt[0], segments); -// // compact segments TODO -// } + barrier(CLK_LOCAL_MEM_FENCE); + if (cnt[0] == 1){ + // noting occured, just complete segment + segments[0] = (short4)(start, end-start, 0, 0); + }else{ + // sort segments + sort_odd_even(0, cnt[0], segments); + //add end position as a litteral + if (tid==0){ + segments[cnt[0]] = (short4)(end, 0, 0, 0); + cnt[0]++; + } + barrier(CLK_LOCAL_MEM_FENCE); + // compact segments + cnt[0] = compact_segments(segments,cnt[0]); + } barrier(CLK_LOCAL_MEM_FENCE); return cnt[0]; } @@ -390,7 +395,7 @@ kernel void test_segmentation(global uchar *buffer, lmatch, cnt); if ((tid==0) && (gid==0))printf("scanned up to %d\n", res); - int res2 = segmentation(start, stop, lmatch, lsegments, cnt); + int res2 = segmentation(start, stop, res, lmatch, lsegments, cnt); nbsegment[0] = res2; if (tid Date: Fri, 7 Jul 2023 17:48:53 +0200 Subject: [PATCH 06/21] multipass segment extraction works almost ... issue in spotting the end of loop. --- .../resources/opencl/codec/lz4_compression.cl | 187 +++++++++++++++--- 1 file changed, 158 insertions(+), 29 deletions(-) diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl index 18dc5e8957..2cd3588680 100644 --- a/src/silx/resources/opencl/codec/lz4_compression.cl +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -28,6 +28,16 @@ * OTHER DEALINGS IN THE SOFTWARE. */ +// This is used in tests to simplify the signature of those test kernels. +#ifndef TEST_WG +#define TEST_WG 64 +#endif +#ifndef TEST_BUFFER +#define TEST_BUFFER 1024 +#endif +// TODO generalize test methods to use this + + /* Function is called at the end by the last running wg to compact the output * * Maybe would be more efficient to call another kernel to do this in parallel ? @@ -99,24 +109,50 @@ inline void sort_odd_even(int start, * as a consequence, the number of segments is divided by 2 ! */ inline int compact_segments(local volatile short4 *segments, - int nb){ + local volatile int* cnt){ int tid = get_local_id(0); // thread id - short4 merge; - if (2*tid=w) && (tid0) && (pid4)){ - segments[atomic_inc(cnt)] = (short4)(pid+1, 0, here, 0); + segments[atomic_inc(cnt)] = (short4)(pid+1, 0, here, 1); } else - if ((here==0) && (there>0)){ + if ((here==0) && (there>0) && (tid>5) && match_buffer[tid-5]>4){ segments[atomic_inc(cnt)] = (short4)(pid+1, 0, 0, 0); } } barrier(CLK_LOCAL_MEM_FENCE); + if (cnt[0] == 1){ - // noting occured, just complete segment - segments[0] = (short4)(start, end-start, 0, 0); - }else{ + // nothing occured in considered + if (tid == 0){ + // noting occured, just complete former segment + short4 seg=segments[0]; + if (seg.s2 == 0){ //there was no match, just complete the former segment + seg.s1 += end-start; + segments[0] = seg; + } + else{ // noting occured, but former segment has already some match ! + if (tid==0){ + segments[atomic_inc(cnt)] = (short4)(start, end-start, 0, 0); + } + } + } + } + else{ // sort segments sort_odd_even(0, cnt[0], segments); //add end position as a litteral if (tid==0){ segments[cnt[0]] = (short4)(end, 0, 0, 0); - cnt[0]++; + atomic_inc(cnt); } barrier(CLK_LOCAL_MEM_FENCE); + if (tid==0){ + printf("after match scan, before compaction, cnt=%d start=%d end=%d stop=%d\n",cnt[0], start, end, stop); + } // compact segments - cnt[0] = compact_segments(segments,cnt[0]); + cnt[0] = compact_segments(segments, cnt); } barrier(CLK_LOCAL_MEM_FENCE); return cnt[0]; @@ -378,10 +434,12 @@ kernel void test_segmentation(global uchar *buffer, global int *nbsegment, global short4 *segments // size of the workgroup ){ - local volatile int cnt[2]; - local volatile short4 lsegments[64]; - local uchar lbuffer[1024]; - local short lmatch[64]; + local volatile int cnt[2]; + local volatile int seg[1]; + local volatile short4 lsegments[TEST_WG]; + local uchar lbuffer[TEST_BUFFER]; + local short lmatch[TEST_WG]; + seg[0] = 0; int tid = get_local_id(0); // thread id int gid = get_group_id(0); // group id @@ -395,9 +453,80 @@ kernel void test_segmentation(global uchar *buffer, lmatch, cnt); if ((tid==0) && (gid==0))printf("scanned up to %d\n", res); - int res2 = segmentation(start, stop, res, lmatch, lsegments, cnt); + int res2 = segmentation(start, stop, res, lmatch, lsegments, seg); nbsegment[0] = res2; if (tid %d to memory %d-%d\n",res2-1,seg[1], seg[1]+res2-1); + barrier(CLK_GLOBAL_MEM_FENCE); + if (tid == 0){ + seg[1] += res2-1; + lsegments[0] = lsegments[res2-1]; + seg[0] = 1; + short4 seg = lsegments[0]; + printf("copied seg[0] (was %d) (%d, %d, %d, %d)\n",res2-1,seg.s0,seg.s1,seg.s2,seg.s3); + } + barrier(CLK_LOCAL_MEM_FENCE); + //memset local segments above first one, + if (tid>1) lsegments[tid] = (short4)(0,0,0,0); + barrier(CLK_LOCAL_MEM_FENCE); + start = res; + if (tid==5)printf("end of loop, start=%d res=%d size=%d\n\n", start, res, actual_buffer_size); + } + barrier(CLK_GLOBAL_MEM_FENCE); + if (tid == 0){ + segments[seg[1]++] = lsegments[0]; + nbsegment[0] = seg[1]; + printf("last copy, total segments: %d\n", seg[1]); + } +} + \ No newline at end of file From 39675ce06fbd87f23b4a2bf56acfd4d46faccf69 Mon Sep 17 00:00:00 2001 From: Jerome Kieffer Date: Tue, 8 Aug 2023 11:58:53 +0200 Subject: [PATCH 07/21] address issue of run-away end of block --- .../resources/opencl/codec/lz4_compression.cl | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl index 2cd3588680..11339d10ad 100644 --- a/src/silx/resources/opencl/codec/lz4_compression.cl +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -158,41 +158,41 @@ inline int scan4match( local uchar *buffer, // buffer with input data in int start, int stop, local short *match_buffer, // size of the wg is enough - volatile local int* cnt // size 2 is enough + volatile local int* cnt // size 1 is enough, idx0: largest index value found ){ int wg = get_local_size(0);// workgroup size int tid = get_local_id(0); // thread id int size = stop-start; - cnt[0] = min(wg, size); - cnt[1] = 0; + cnt[0] = 0; // memset match_buffer match_buffer[tid] = -1; barrier(CLK_LOCAL_MEM_FENCE); - + int i; // position index int pid = tid + start; uchar here = (pid < stop)?buffer[pid]:255; int match = 0; uchar valid = 1; - for (int i=pid+1; i Date: Tue, 8 Aug 2023 15:28:09 +0200 Subject: [PATCH 08/21] Start compression at 4 occurience --- src/silx/resources/opencl/codec/lz4_compression.cl | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl index 11339d10ad..50a828ef7c 100644 --- a/src/silx/resources/opencl/codec/lz4_compression.cl +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -35,6 +35,10 @@ #ifndef TEST_BUFFER #define TEST_BUFFER 1024 #endif +#ifndef MIN_MATCH +#define MIN_MATCH 4 +#endif + // TODO generalize test methods to use this @@ -186,8 +190,8 @@ inline int scan4match( local uchar *buffer, // buffer with input data in } } } - if ((valid) &&(i==stop)){ // we reached the end of the block: stop anyway - atomic_max(cnt, stop); + if ((valid) && (i==stop)){ // we reached the end of the block: stop anyway + cnt[0] = stop; match_buffer[tid] = match; valid = 0; } @@ -217,10 +221,11 @@ inline int segmentation(int start, //index where scan4match did start if ((tid>0) && (pid4)){ + if ((there==0) && (here>=MIN_MATCH)){ segments[atomic_inc(cnt)] = (short4)(pid+1, 0, here, 1); } else - if ((here==0) && (there>0) && (tid>5) && match_buffer[tid-5]>4){ +// if ((here==0) && (there>0) && (tid>5) && match_buffer[tid-5]>4){ + if ((here==0) && (there>0) && (tid>=MIN_MATCH) && match_buffer[tid-MIN_MATCH]>=MIN_MATCH){ segments[atomic_inc(cnt)] = (short4)(pid+1, 0, 0, 0); } } From a06570de658692957d0c59f06df6158b0b473fff Mon Sep 17 00:00:00 2001 From: Jerome Kieffer Date: Wed, 9 Aug 2023 17:33:18 +0200 Subject: [PATCH 09/21] Write LZ4 works Multi-block WIP --- .../resources/opencl/codec/lz4_compression.cl | 293 +++++++++++++++--- 1 file changed, 258 insertions(+), 35 deletions(-) diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl index 50a828ef7c..09ba62aa67 100644 --- a/src/silx/resources/opencl/codec/lz4_compression.cl +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -256,9 +256,9 @@ inline int segmentation(int start, //index where scan4match did start atomic_inc(cnt); } barrier(CLK_LOCAL_MEM_FENCE); - if (tid==0){ - printf("after match scan, before compaction, cnt=%d start=%d end=%d stop=%d\n",cnt[0], start, end, stop); - } +// if (tid==0){ +// printf("after match scan, before compaction, cnt=%d start=%d end=%d stop=%d\n",cnt[0], start, end, stop); +// } // compact segments cnt[0] = compact_segments(segments, cnt); } @@ -297,52 +297,113 @@ inline int write_lz4(local uchar *buffer, int nb_segments, int start_cmp, global uchar *output_buffer, - int stop + int stop, //output buffer max size + int continuation // set to 0 to indicate this is the last segment ) { for (int i=0; i=15){ - int rem = segment.s1-15; + int token_idx = start_cmp++; + int rem; + int litter = segment.s1; + int match = segment.s2; + if (litter >= 15){ + segment.s1 = 15; + rem = litter - 15; while (rem>=255){ - output_buffer[start_cmp] = 255; - start_cmp++; - rem -=255; + output_buffer[start_cmp++] = 255; + rem -= 255; } - output_buffer[start_cmp] = rem; - start_cmp++; + output_buffer[start_cmp++] = rem; } - + if (match >= 19){ + segment.s2 = 19; + } + output_buffer[token_idx] = build_token(segment); + //copy litteral. This is collaborative. start_cmp = copy(output_buffer, start_cmp, - buffer, segment.s0, segment.s1); + buffer, segment.s0, litter); - //write offset, here always 1 in 16 bits little endian ! - output_buffer[start_cmp] = 1; - output_buffer[start_cmp+1] = 0; - start_cmp+=2; - - //write match overflow - if (segment.s2>=19){ - int rem = segment.s2-19; - while (rem>=255){ - output_buffer[start_cmp] = 255; - start_cmp++; - rem -=255; + if ((continuation)||(i+1=19){ + rem = segment.s2-19; + while (rem>=255){ + output_buffer[start_cmp++] = 255; + rem -= 255; + } + output_buffer[start_cmp++] = rem; } - output_buffer[start_cmp] = rem; - start_cmp++; } }//loop over segments return start_cmp; } +// calculate the length of a segment in compressed form +inline int len_segment(int4 segment){ + int lit = segment.s1; + int mat = segment.s2-4; + int size = 3+lit; + if (lit>=15){ + size++; + lit -= 15; + } + while (lit>255){ + size++; + lit-=255; + } + if (mat>=15){ + size++; + mat -= 15; + } + while (mat>255){ + size++; + mat-=255; + } + return size; +} + +// store several local segments into the global memory starting at position. return the position in the output stream +inline int store_segments(local short4 *local_segments, + int nb_segments, + global int4 *global_segments, + int global_idx, + int input_stream_idx, + int output_stream_idx, + int last, // set to true to concatenate the match and the litteral for last block + local volatile int* cnt //size=1 is eough + ){ + cnt[0] = output_stream_idx; + barrier(CLK_LOCAL_MEM_FENCE); + //this is serial for conviniance ! + if (get_local_id(0)==0){ + for (int i=0; i1) + out_ptr = write_lz4(lbuffer, lsegments, + res2-1, // -1? to keep the last for concatenation + out_ptr, output,max_out, 1); + + barrier(CLK_GLOBAL_MEM_FENCE); + if (tid == 0){ + seg[1] += res2-1; + lsegments[0] = lsegments[res2-1]; + seg[0] = 1; +// short4 seg = lsegments[0]; + } + barrier(CLK_LOCAL_MEM_FENCE); + //memset local segments above first one, + if (tid>1) lsegments[tid] = (short4)(0,0,0,0); + barrier(CLK_LOCAL_MEM_FENCE); + start = res; + } + barrier(CLK_GLOBAL_MEM_FENCE); + if (tid == 0){ + short4 segment = lsegments[0]; + segment.s1 += segment.s2; + segment.s2 = 0; + lsegments[0] = segment; + + segments[seg[1]++] = segment; + nbsegment[0] = seg[1]; + printf("last segment %d %d %d %d\n", segment.s0, segment.s1, segment.s2, segment.s3); + } + // write last segment + + out_ptr = write_lz4(lbuffer, lsegments, + 1, out_ptr, output, max_out, 0); + + output_size[0] = out_ptr; + + +} + +// kernel to test multiple blocks in parallel with last to finish who concatenates segments WG<64 buffer<1024. +// segment description: s0: position in input buffer s1: number of litterals, s2: number of match, s3: position in output buffer +kernel void test_multiblock(global uchar *buffer, + int input_size, + global int *nbsegment, // size = number of workgroup launched + global int4 *segments, // size of the workgroup * number of workgroup / 4 + global uchar *output, // output buffer + global int *output_size // output buffer size, max in input, actual value in output +){ + local volatile int seg[2]; // #0:number of segments in local mem, #1 in global mem + local volatile int cnt[1]; // end position of the scan + local volatile short4 lsegments[TEST_WG]; + local uchar lbuffer[TEST_BUFFER]; + local short lmatch[TEST_WG]; + + + int tid = get_local_id(0); // thread id + int gid = get_group_id(0); // group id + int wg = get_local_size(0);// workgroup size + int ng = get_num_groups(0);// number of groups + + int output_block_size = 0;//TEST_BUFFER*1.1; + int output_idx = output_block_size*gid; + int segment_idx = nbsegment[gid]; + if (tid==0)printf("gid %d starts writing segments at %d\n", gid, segment_idx); + int local_start = 0; + int global_start = TEST_BUFFER*gid; + int local_stop = input_size - global_start; + if (local_stop<=0){ + if (tid==0)printf("gid %d local_stop: %d \n",gid, local_stop); + return; + } + + int actual_buffer_size = min(TEST_BUFFER, local_stop) ; + + int watchdog = (local_stop + wg-1)/wg; //prevent code from running way ! + int res, res2, out_ptr=0, max_out=output_size[0]; + + //copy input to local buffer + for (int i=tid; i1) lsegments[tid] = (short4)(0,0,0,0); + barrier(CLK_LOCAL_MEM_FENCE); + local_start = res; + } + barrier(CLK_GLOBAL_MEM_FENCE); + if (tid==0)printf("gid %d store final segments\n",gid); + output_idx = store_segments(lsegments, 1, // last segment is treated here + segments, segment_idx, global_start, output_idx, gid+1==ng, cnt); + output_size[gid] = output_idx; + +} \ No newline at end of file From 41fa5d21ce28c8644417b950e2d7eea341717f70 Mon Sep 17 00:00:00 2001 From: Jerome Kieffer Date: Thu, 10 Aug 2023 09:52:46 +0200 Subject: [PATCH 10/21] manage when segment storage is too small --- .../resources/opencl/codec/lz4_compression.cl | 61 ++++++++++++------- 1 file changed, 38 insertions(+), 23 deletions(-) diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl index 09ba62aa67..ec74137ac4 100644 --- a/src/silx/resources/opencl/codec/lz4_compression.cl +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -373,34 +373,48 @@ inline int len_segment(int4 segment){ } // store several local segments into the global memory starting at position. return the position in the output stream -inline int store_segments(local short4 *local_segments, +inline int store_segments(local volatile short4 *local_segments, int nb_segments, global int4 *global_segments, + int max_idx, // last position achievable in global segment array int global_idx, int input_stream_idx, int output_stream_idx, + int block_size, // size of the block under analysis int last, // set to true to concatenate the match and the litteral for last block local volatile int* cnt //size=1 is eough ){ cnt[0] = output_stream_idx; barrier(CLK_LOCAL_MEM_FENCE); - //this is serial for conviniance ! - if (get_local_id(0)==0){ - for (int i=0; i %d\n", get_group_id(0), global_idx, max_idx, segment.s0, block_size); + segment.s1 = block_size - segment.s0; + segment.s2 = 0; + } + // manage last segment in block + if (last){ + segment.s1+=segment.s2; + segment.s2 = 0; + } + segment.s0 += input_stream_idx; + segment.s3 = output_stream_idx; + + output_stream_idx += len_segment(segment); + global_segments[global_idx++]=segment; + if (emergency) break; } - segment.s0 += input_stream_idx; - segment.s3 = output_stream_idx; - - output_stream_idx += len_segment(segment); - global_segments[global_idx++]=segment; + cnt[0] = output_stream_idx; } - cnt[0] = output_stream_idx; + barrier(CLK_LOCAL_MEM_FENCE); } - barrier(CLK_LOCAL_MEM_FENCE); return cnt[0]; } @@ -690,8 +704,8 @@ kernel void test_write(global uchar *buffer, // segment description: s0: position in input buffer s1: number of litterals, s2: number of match, s3: position in output buffer kernel void test_multiblock(global uchar *buffer, int input_size, - global int *nbsegment, // size = number of workgroup launched - global int4 *segments, // size of the workgroup * number of workgroup / 4 + global int *nbsegment, // size = number of workgroup launched +1, initially contains the index where to store segments + global int4 *segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup global uchar *output, // output buffer global int *output_size // output buffer size, max in input, actual value in output ){ @@ -710,16 +724,17 @@ kernel void test_multiblock(global uchar *buffer, int output_block_size = 0;//TEST_BUFFER*1.1; int output_idx = output_block_size*gid; int segment_idx = nbsegment[gid]; - if (tid==0)printf("gid %d starts writing segments at %d\n", gid, segment_idx); + int segment_max = nbsegment[gid+1]; + if (tid==0)printf("gid %d writes segments in range %d-%d\n", gid, segment_idx, segment_max); int local_start = 0; int global_start = TEST_BUFFER*gid; - int local_stop = input_size - global_start; + int local_stop = min(TEST_BUFFER, input_size - global_start); if (local_stop<=0){ if (tid==0)printf("gid %d local_stop: %d \n",gid, local_stop); return; } - int actual_buffer_size = min(TEST_BUFFER, local_stop) ; +// int actual_buffer_size = min(TEST_BUFFER, local_stop) ; int watchdog = (local_stop + wg-1)/wg; //prevent code from running way ! int res, res2, out_ptr=0, max_out=output_size[0]; @@ -733,7 +748,7 @@ kernel void test_multiblock(global uchar *buffer, } barrier(CLK_LOCAL_MEM_FENCE); - while ((watchdog--) && (local_start+1 Date: Thu, 10 Aug 2023 17:35:36 +0200 Subject: [PATCH 11/21] start sclae up to larger problems --- .../resources/opencl/codec/lz4_compression.cl | 227 ++++++++++++------ 1 file changed, 158 insertions(+), 69 deletions(-) diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl index ec74137ac4..a361b63d91 100644 --- a/src/silx/resources/opencl/codec/lz4_compression.cl +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -29,44 +29,23 @@ */ // This is used in tests to simplify the signature of those test kernels. -#ifndef TEST_WG -#define TEST_WG 64 +#ifndef WORKGROUP_SIZE +#define WORKGROUP_SIZE 64 #endif -#ifndef TEST_BUFFER -#define TEST_BUFFER 1024 +//segment size should be buffer_size/4 +#ifndef SEGMENT_SIZE +#define SEGMENT_SIZE 256 +#endif + +#ifndef BUFFER_SIZE +#define BUFFER_SIZE 1024 #endif #ifndef MIN_MATCH #define MIN_MATCH 4 #endif -// TODO generalize test methods to use this -/* Function is called at the end by the last running wg to compact the output - * - * Maybe would be more efficient to call another kernel to do this in parallel ? - */ -inline void compact_output(global uchar *output_buffer, - int output_size, - global uchar *output_ptr, // Length of all output from different wg - int buffer_size // 1.2x the input buffer size! - ){ - int tid = get_local_id(0); // thread id - int wg = get_local_size(0);// workgroup size - int start_read = 0; - int start_write = output_ptr[0]; - int to_copy; - for (int i=1; i %d\n", get_group_id(0), global_idx, max_idx, segment.s0, block_size); + printf("gid %lu emergency %d %d, segment starts at %d -> %d\n", get_group_id(0), global_idx, max_idx, segment.s0, block_size); segment.s1 = block_size - segment.s0; segment.s2 = 0; } @@ -418,6 +400,92 @@ inline int store_segments(local volatile short4 *local_segments, return cnt[0]; } +/* concatenate all segments (stored in global memory) in such a way that they are adjacent. + * This function is to be called by the latest workgroup running. + * + * There are tons of synchro since data are read and written from same buffer. + */ +inline int concatenate_segments( + global int2 *segment_ptr, // size = number of workgroup launched, contains start and stop position + global int4 *segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup + + global int *output_size, // output buffer size, max in input, actual value in output + local volatile int *lsegment_idx, // index of segment offset, shared + local volatile int4 *last_segment // shared memory with the last segment to share between threads + ){ + + int tid = get_local_id(0); // thread id + int gid = get_group_id(0); // group id + int wg = get_local_size(0);// workgroup size + int ng = get_num_groups(0);// number of groups + +// if (tid==0) printf("gid %d, running concat_segments \n", gid); + int4 segment; + barrier(CLK_GLOBAL_MEM_FENCE); + int output_idx = output_size[0]; + lsegment_idx[0] = segment_ptr[0].s1; + segment = segments[max(0, lsegment_idx[0]-1)]; + if ((tid==0) && (segment.s0>0) && (segment.s2==0) && (ng>1)){ + last_segment[0] = segment; + lsegment_idx[0] -= 1; + } + + last_segment[0] = (int4)(0,0,0,0); + barrier(CLK_LOCAL_MEM_FENCE); +// if (tid==0) printf("groups range from 1 to %d. segment_idx=%d, output_ptr=%d\n",ng, lsegment_idx[0], output_idx); + for (int grp=1; grp0) && (last_segment[0].s2==0)){ + segment = segments[seg_ptr.s0]; + segment.s0 = last_segment[0].s0; + segment.s1 = segment.s0+segment.s1-last_segment[0].s0; + output_idx += len_segment(segment)-len_segment(last_segment[0]); + last_segment[0] = (int4)(0,0,0,0); + segments[seg_ptr.s0] = segment; + } + barrier(CLK_LOCAL_MEM_FENCE); + for (int i=low; i0) && (segment.s2==0)&&(grp+1 %d to memory %d-%d\n",res2-1,seg[1], seg[1]+res2-1); - barrier(CLK_GLOBAL_MEM_FENCE); + barrier(CLK_LOCAL_MEM_FENCE); if (tid == 0){ seg[1] += res2-1; lsegments[0] = lsegments[res2-1]; @@ -607,7 +675,7 @@ kernel void test_multi(global uchar *buffer, start = res; if (tid==5)printf("end of loop, start=%d res=%d size=%d\n\n", start, res, actual_buffer_size); } - barrier(CLK_GLOBAL_MEM_FENCE); + barrier(CLK_LOCAL_MEM_FENCE); if (tid == 0){ segments[seg[1]++] = lsegments[0]; nbsegment[0] = seg[1]; @@ -626,15 +694,15 @@ kernel void test_write(global uchar *buffer, ){ local volatile int seg[2]; // #0:number of segments in local mem, #1 in global mem local volatile int cnt[1]; // end position of the scan - local volatile short4 lsegments[TEST_WG]; - local uchar lbuffer[TEST_BUFFER]; - local short lmatch[TEST_WG]; + local volatile short4 lsegments[SEGMENT_SIZE]; + local uchar lbuffer[BUFFER_SIZE]; + local short lmatch[WORKGROUP_SIZE]; int tid = get_local_id(0); // thread id int gid = get_group_id(0); // group id int wg = get_local_size(0);// workgroup size - int actual_buffer_size = min(TEST_BUFFER, stop); + int actual_buffer_size = min(BUFFER_SIZE, stop); int watchdog = (stop-start+wg-1)/wg; //prevent code from running way ! int res, res2, out_ptr=0, max_out=output_size[0]; @@ -666,7 +734,7 @@ kernel void test_write(global uchar *buffer, res2-1, // -1? to keep the last for concatenation out_ptr, output,max_out, 1); - barrier(CLK_GLOBAL_MEM_FENCE); + barrier(CLK_LOCAL_MEM_FENCE); if (tid == 0){ seg[1] += res2-1; lsegments[0] = lsegments[res2-1]; @@ -679,7 +747,7 @@ kernel void test_write(global uchar *buffer, barrier(CLK_LOCAL_MEM_FENCE); start = res; } - barrier(CLK_GLOBAL_MEM_FENCE); + barrier(CLK_LOCAL_MEM_FENCE); if (tid == 0){ short4 segment = lsegments[0]; segment.s1 += segment.s2; @@ -704,16 +772,18 @@ kernel void test_write(global uchar *buffer, // segment description: s0: position in input buffer s1: number of litterals, s2: number of match, s3: position in output buffer kernel void test_multiblock(global uchar *buffer, int input_size, - global int *nbsegment, // size = number of workgroup launched +1, initially contains the index where to store segments - global int4 *segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup - global uchar *output, // output buffer - global int *output_size // output buffer size, max in input, actual value in output + global int2 *segment_ptr, // size = number of workgroup launched, contains start and stop position + global int4 *segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup + global uchar *output, // output buffer + global int *output_size, // output buffer size, max in input, actual value in output + global int *wgcnt // counter with workgroups still running ){ local volatile int seg[2]; // #0:number of segments in local mem, #1 in global mem local volatile int cnt[1]; // end position of the scan - local volatile short4 lsegments[TEST_WG]; - local uchar lbuffer[TEST_BUFFER]; - local short lmatch[TEST_WG]; + local volatile short4 lsegments[SEGMENT_SIZE]; + local uchar lbuffer[BUFFER_SIZE]; + local short lmatch[WORKGROUP_SIZE]; + local volatile int4 last_segment[1]; int tid = get_local_id(0); // thread id @@ -721,20 +791,21 @@ kernel void test_multiblock(global uchar *buffer, int wg = get_local_size(0);// workgroup size int ng = get_num_groups(0);// number of groups - int output_block_size = 0;//TEST_BUFFER*1.1; + int output_block_size = 0; int output_idx = output_block_size*gid; - int segment_idx = nbsegment[gid]; - int segment_max = nbsegment[gid+1]; - if (tid==0)printf("gid %d writes segments in range %d-%d\n", gid, segment_idx, segment_max); + int2 seg_ptr = segment_ptr[gid]; + int segment_idx = seg_ptr.s0; + int segment_max = seg_ptr.s1; +// if (tid==0)printf("gid %d writes segments in range %d-%d\n", gid, segment_idx, segment_max); int local_start = 0; - int global_start = TEST_BUFFER*gid; - int local_stop = min(TEST_BUFFER, input_size - global_start); + int global_start = BUFFER_SIZE*gid; + int local_stop = min(BUFFER_SIZE, input_size - global_start); if (local_stop<=0){ if (tid==0)printf("gid %d local_stop: %d \n",gid, local_stop); return; } -// int actual_buffer_size = min(TEST_BUFFER, local_stop) ; +// int actual_buffer_size = min(BUFFER_SIZE, local_stop) ; int watchdog = (local_stop + wg-1)/wg; //prevent code from running way ! int res, res2, out_ptr=0, max_out=output_size[0]; @@ -754,12 +825,12 @@ kernel void test_multiblock(global uchar *buffer, res2 = segmentation(local_start, local_stop, res, lmatch, lsegments, seg); // copy segments to global memory: int segment_to_copy = res2 - 1; - if (tid==0)printf("gid %d store %d segments at %d\n",gid, segment_to_copy, segment_idx); +// if (tid==0)printf("gid %d store %d segments at %d\n",gid, segment_to_copy, segment_idx); output_idx = store_segments(lsegments, segment_to_copy, // last segment is kept for the future ... segments, segment_max, segment_idx, global_start, output_idx, local_stop, 0, cnt); segment_idx += segment_to_copy; - barrier(CLK_GLOBAL_MEM_FENCE); + barrier(CLK_LOCAL_MEM_FENCE); if (tid == 0){ seg[1] += segment_to_copy; lsegments[0] = lsegments[segment_to_copy]; @@ -771,10 +842,28 @@ kernel void test_multiblock(global uchar *buffer, barrier(CLK_LOCAL_MEM_FENCE); local_start = res; } - barrier(CLK_GLOBAL_MEM_FENCE); - if (tid==0)printf("gid %d store final segments\n",gid); + barrier(CLK_LOCAL_MEM_FENCE); +// if (tid==0)printf("gid %d store final segments\n",gid); output_idx = store_segments(lsegments, 1, // last segment is treated here segments, segment_max, segment_idx, global_start, output_idx, local_stop, gid+1==ng, cnt); - output_size[gid] = output_idx; - + output_size[gid] = output_idx; + seg_ptr.s1 = ++segment_idx; + segment_ptr[gid] = seg_ptr; + + barrier(CLK_LOCAL_MEM_FENCE); + barrier(CLK_GLOBAL_MEM_FENCE); + // last group running performs the cumsum and compaction of indices + if (tid==0){ + cnt[0] = (atomic_dec(wgcnt)==1); + } + barrier(CLK_LOCAL_MEM_FENCE); + if (cnt[0]){ + int end_ptr = concatenate_segments(segment_ptr, // size = number of workgroup launched, contains start and stop position + segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup + output_size, // output buffer size, max in input, actual value in output + cnt, // index of segment offset, shared + last_segment // shared memory with the last segment to share between threads + ); + segment_ptr[0] = (int2)(0, end_ptr); + } } \ No newline at end of file From 0cdac0ed66be2dca82c3af6b79223982547f2c3e Mon Sep 17 00:00:00 2001 From: Jerome Kieffer Date: Fri, 11 Aug 2023 16:03:55 +0200 Subject: [PATCH 12/21] Simple function which tests and validate the LZ4 analysis --- src/silx/opencl/codec/bitshuffle_lz4.py | 124 +++++++++++++++++++++++- 1 file changed, 121 insertions(+), 3 deletions(-) diff --git a/src/silx/opencl/codec/bitshuffle_lz4.py b/src/silx/opencl/codec/bitshuffle_lz4.py index 508e5bdb09..72b3e96f52 100644 --- a/src/silx/opencl/codec/bitshuffle_lz4.py +++ b/src/silx/opencl/codec/bitshuffle_lz4.py @@ -27,22 +27,24 @@ # OTHER DEALINGS IN THE SOFTWARE. """ -This module provides a class for CBF byte offset compression/decompression. +This module provides a class for bitshuffle-LZ4 compression/decompression. """ __authors__ = ["Jérôme Kieffer"] __contact__ = "jerome.kieffer@esrf.eu" __license__ = "MIT" __copyright__ = "European Synchrotron Radiation Facility, Grenoble, France" -__date__ = "09/11/2022" +__date__ = "11/08/2023" __status__ = "production" - +import time import os import struct import numpy +import json from ..common import ocl, pyopencl, kernel_workgroup_size from ..processing import BufferDescription, EventDescription, OpenclProcessing +import pyopencl.array as cla import logging logger = logging.getLogger(__name__) @@ -175,3 +177,119 @@ def decompress(self, raw, out=None, wg=None, nbytes=None): return out __call__ = decompress + + +def test_lz4_analysis(data, block_size=1024, workgroup_size=32, segments_size=None, profile=True): + """Function that tests LZ4 analysis (i.e. the correctness of segments) on a dataset. + + :param data: some data to play with + :paam block_size: how many items are treated by a workgroup + :param workgroup_size: size of the workgroup + :param segments_size: by default, data_size/4 + :param profile: tune on profiling for OpenCL + :return: a set of segment containing: + - position in the input stream + - length of the littral section + - length of the matching section + - position in the output stream + + Prints out performance (measured from Python) in ms + """ + t0 = time.perf_counter_ns() + performances = {} + if isinstance(data, bytes): + data = numpy.frombuffer(data, "uint8") + else: + data = data.view("uint8") + data_size = data.size + num_workgroup = (data_size+block_size-1)//block_size + if segments_size is None: + segments_size = block_size//4 + + segment_pos = numpy.zeros((num_workgroup,2), "int32") + tmp_sp = numpy.arange(0,segments_size*(num_workgroup+1), segments_size) + segment_pos[:,0] = tmp_sp[:-1] + segment_pos[:,1] = tmp_sp[1:] + + + # Opencl setup + t1 = time.perf_counter_ns() + ctx = pyopencl.create_some_context() + src_file = os.path.abspath(os.path.join(os.path.abspath(__file__),"../../../resources/opencl/codec/lz4_compression.cl")) + src = open(src_file).read() + prg = pyopencl.Program(ctx, src).build(options=f"-DBUFFER_SIZE={block_size} -DSEGMENT_SIZE={segments_size} -DWORKGROUP_SIZE={workgroup_size}") + t1a = time.perf_counter_ns() + if profile: + queue = pyopencl.CommandQueue(ctx, properties=pyopencl.command_queue_properties.PROFILING_ENABLE) + else: + queue = pyopencl.CommandQueue(ctx) + + data_d = cla.to_device(queue, data) + segment_posd = cla.to_device(queue, segment_pos) + segments_d = cla.zeros(queue, (segments_size*num_workgroup,4), "int32") + wgcnt_d = cla.to_device(queue, numpy.array([num_workgroup], "int32")) + output_d = cla.empty_like(data_d) + output_size_d = cla.zeros(queue, num_workgroup, "int32") + + t2 = time.perf_counter_ns() + prg.test_multiblock(queue, (workgroup_size*num_workgroup,), (workgroup_size,), + data_d.data, numpy.int32(data_size), + segment_posd.data, + segments_d.data, output_d.data, output_size_d.data, wgcnt_d.data + ).wait() + t3 = time.perf_counter_ns() + final_positons = segment_posd.get() + segments = segments_d.get() + segments = segments[final_positons[0,0]:final_positons[0,1]] + t4 = time.perf_counter_ns() + if 1: #profile: + performances["python_setup"] = (t1-t0)*1e-6 + performances["opencl_compilation"] = (t1a-t1)*1e-6 + performances["opencl_setup"] = (t2-t1a)*1e-6 + performances["opencl_run"] = (t3-t2)*1e-6 + performances["opencl_retrieve"] = (t4-t3)*1e-6 + print(json.dumps(performances, indent=2)) + # Check validity: input indexes + inp_idx = segments[:,0] + res = numpy.where((inp_idx[1:]-inp_idx[:-1])<=0) +# if res[0].size: + if 1: + print(f"Input position are all ascending except {res[0]}") + # Check validity: input size + size = segments[:,1:3].sum() + if 1: +# if data.size != size: + print(f"Input size matches, got {size}, expected {data.size}") + + # Check validity: input size (bis) + size = segments[-1,:-1].sum() +# if data.size != size: + if 1: + print(f"Input size does match the end of segments, got {size}, expected {data.size}") + + # Check validity: output indexes + out_idx = segments[:,-1] + res = numpy.where((out_idx[1:]-out_idx[:-1])<=0) +# if res[0].size: + if 1: + print(f"Output position are all ascending, except {res[0]}") + + #check for invalid segments, those have no matches, allowd only on last segment + match_size = segments[:-1,2] + res = numpy.where(match_size==0) + if 1: +# if res[0].size: + print(f"Found empty match at {res[0]}") + + # Validate that match are all constant: + bad = {} + for i,s in enumerate(segments): + if s[2] == 0: continue + start = s[0]+s[1] + stop = start + s[2] + res = numpy.where(data[start:stop]-data[start])[0] + if res.size: + bad[i] = res + print(f"Non constant match section found at {bad}") + + return segments From b7c6c3a2090b6bbf248aeccf8a2bb9432ce3e914 Mon Sep 17 00:00:00 2001 From: Jerome Kieffer Date: Fri, 11 Aug 2023 16:31:07 +0200 Subject: [PATCH 13/21] output is not used --- src/silx/resources/opencl/codec/lz4_compression.cl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl index a361b63d91..3f00cb1aa5 100644 --- a/src/silx/resources/opencl/codec/lz4_compression.cl +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -770,12 +770,12 @@ kernel void test_write(global uchar *buffer, // kernel to test multiple blocks in parallel with last to finish who concatenates segments WG<64 buffer<1024. // segment description: s0: position in input buffer s1: number of litterals, s2: number of match, s3: position in output buffer -kernel void test_multiblock(global uchar *buffer, +kernel void LZ4_cmp_stage1(global uchar *buffer, int input_size, global int2 *segment_ptr, // size = number of workgroup launched, contains start and stop position global int4 *segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup - global uchar *output, // output buffer - global int *output_size, // output buffer size, max in input, actual value in output + int final_compaction, // set to 0 to prevent the final compaction. allows the analysis of intermediate results + global int *output_size, // output buffer size, max in input, actual value in output, size should be at least the global int *wgcnt // counter with workgroups still running ){ local volatile int seg[2]; // #0:number of segments in local mem, #1 in global mem @@ -857,7 +857,7 @@ kernel void test_multiblock(global uchar *buffer, cnt[0] = (atomic_dec(wgcnt)==1); } barrier(CLK_LOCAL_MEM_FENCE); - if (cnt[0]){ + if (cnt[0] && final_compaction){ int end_ptr = concatenate_segments(segment_ptr, // size = number of workgroup launched, contains start and stop position segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup output_size, // output buffer size, max in input, actual value in output From 4194ef9adf6faede4335d7b4a7c5ea2629fe9404 Mon Sep 17 00:00:00 2001 From: Jerome Kieffer Date: Wed, 16 Aug 2023 17:48:38 +0200 Subject: [PATCH 14/21] work on repack of segments in python --- src/silx/opencl/codec/bitshuffle_lz4.py | 75 +++++++++++++++++-------- 1 file changed, 52 insertions(+), 23 deletions(-) diff --git a/src/silx/opencl/codec/bitshuffle_lz4.py b/src/silx/opencl/codec/bitshuffle_lz4.py index 72b3e96f52..8e823e88f9 100644 --- a/src/silx/opencl/codec/bitshuffle_lz4.py +++ b/src/silx/opencl/codec/bitshuffle_lz4.py @@ -34,7 +34,7 @@ __contact__ = "jerome.kieffer@esrf.eu" __license__ = "MIT" __copyright__ = "European Synchrotron Radiation Facility, Grenoble, France" -__date__ = "11/08/2023" +__date__ = "16/08/2023" __status__ = "production" import time @@ -179,7 +179,8 @@ def decompress(self, raw, out=None, wg=None, nbytes=None): __call__ = decompress -def test_lz4_analysis(data, block_size=1024, workgroup_size=32, segments_size=None, profile=True): +def test_lz4_analysis(data, block_size=1024, workgroup_size=32, segments_size=None, + profile=True, compaction=True): """Function that tests LZ4 analysis (i.e. the correctness of segments) on a dataset. :param data: some data to play with @@ -187,6 +188,7 @@ def test_lz4_analysis(data, block_size=1024, workgroup_size=32, segments_size=No :param workgroup_size: size of the workgroup :param segments_size: by default, data_size/4 :param profile: tune on profiling for OpenCL + :param compaction: set to false to retrieve the raw segment before compaction :return: a set of segment containing: - position in the input stream - length of the littral section @@ -228,19 +230,19 @@ def test_lz4_analysis(data, block_size=1024, workgroup_size=32, segments_size=No segment_posd = cla.to_device(queue, segment_pos) segments_d = cla.zeros(queue, (segments_size*num_workgroup,4), "int32") wgcnt_d = cla.to_device(queue, numpy.array([num_workgroup], "int32")) - output_d = cla.empty_like(data_d) output_size_d = cla.zeros(queue, num_workgroup, "int32") t2 = time.perf_counter_ns() - prg.test_multiblock(queue, (workgroup_size*num_workgroup,), (workgroup_size,), + prg.LZ4_cmp_stage1(queue, (workgroup_size*num_workgroup,), (workgroup_size,), data_d.data, numpy.int32(data_size), segment_posd.data, - segments_d.data, output_d.data, output_size_d.data, wgcnt_d.data + segments_d.data, numpy.int32(compaction), output_size_d.data, wgcnt_d.data ).wait() t3 = time.perf_counter_ns() - final_positons = segment_posd.get() segments = segments_d.get() - segments = segments[final_positons[0,0]:final_positons[0,1]] + if compaction: + final_positons = segment_posd.get() + segments = segments[final_positons[0,0]:final_positons[0,1]] t4 = time.perf_counter_ns() if 1: #profile: performances["python_setup"] = (t1-t0)*1e-6 @@ -249,47 +251,74 @@ def test_lz4_analysis(data, block_size=1024, workgroup_size=32, segments_size=No performances["opencl_run"] = (t3-t2)*1e-6 performances["opencl_retrieve"] = (t4-t3)*1e-6 print(json.dumps(performances, indent=2)) + + if compaction: + compacted = segments + else: + compacted = _repack_segments(segments) + # Check validity: input indexes - inp_idx = segments[:,0] + inp_idx = compacted[:,0] res = numpy.where((inp_idx[1:]-inp_idx[:-1])<=0) + # if res[0].size: - if 1: + if True: print(f"Input position are all ascending except {res[0]}") # Check validity: input size size = segments[:,1:3].sum() - if 1: + if True: # if data.size != size: print(f"Input size matches, got {size}, expected {data.size}") # Check validity: input size (bis) - size = segments[-1,:-1].sum() + size = compacted[-1,:-1].sum() # if data.size != size: - if 1: + if True: print(f"Input size does match the end of segments, got {size}, expected {data.size}") # Check validity: output indexes - out_idx = segments[:,-1] + out_idx = compacted[:,-1] res = numpy.where((out_idx[1:]-out_idx[:-1])<=0) # if res[0].size: - if 1: - print(f"Output position are all ascending, except {res[0]}") + if True: + print(f"Output position are all ascending, except {res[0]}") #check for invalid segments, those have no matches, allowd only on last segment - match_size = segments[:-1,2] + match_size = compacted[:-1,2] res = numpy.where(match_size==0) - if 1: + if True: # if res[0].size: - print(f"Found empty match at {res[0]}") + print(f"Found empty match at {res[0]}") # Validate that match are all constant: + print(f"Non constant match section found at {_validate_content(data, compacted)}") + + return segments + + +def _validate_content(data, segments): + data = data.view('uint8') bad = {} for i,s in enumerate(segments): if s[2] == 0: continue start = s[0]+s[1] stop = start + s[2] res = numpy.where(data[start:stop]-data[start])[0] - if res.size: - bad[i] = res - print(f"Non constant match section found at {bad}") - - return segments + if res.size: + bad[i] = res + return bad + + +def _repack_segments(segments): + "repack a set of segments to be contiguous" + valid = numpy.where(segments.sum(axis=-1)!=0)[0] + repacked1 = segments[valid] + blocks = numpy.where(repacked1[:,-1]==0)[0] + sub_tot = 0 + repacked2 = repacked1.copy() + for start, stop in zip(blocks,numpy.concatenate((blocks[1:], [len(repacked1)]))): + repacked2[start:stop, -1] += sub_tot + sub_tot+=repacked1[stop-1,-1] + repacked3 = repacked2[numpy.where(repacked2[:,1:3].sum(axis=-1)!=0)[0]] + return repacked3 + \ No newline at end of file From 6862efbb48ea4fc8ca05f62ba97ff5ebab62a55d Mon Sep 17 00:00:00 2001 From: Jerome Kieffer Date: Thu, 17 Aug 2023 16:31:33 +0200 Subject: [PATCH 15/21] Validate the writer Second stage of the compressor --- src/silx/opencl/codec/bitshuffle_lz4.py | 71 +++++++- .../resources/opencl/codec/lz4_compression.cl | 151 ++++++++++++++++-- 2 files changed, 211 insertions(+), 11 deletions(-) diff --git a/src/silx/opencl/codec/bitshuffle_lz4.py b/src/silx/opencl/codec/bitshuffle_lz4.py index 8e823e88f9..7b1d9f5b78 100644 --- a/src/silx/opencl/codec/bitshuffle_lz4.py +++ b/src/silx/opencl/codec/bitshuffle_lz4.py @@ -34,7 +34,7 @@ __contact__ = "jerome.kieffer@esrf.eu" __license__ = "MIT" __copyright__ = "European Synchrotron Radiation Facility, Grenoble, France" -__date__ = "16/08/2023" +__date__ = "17/08/2023" __status__ = "production" import time @@ -321,4 +321,71 @@ def _repack_segments(segments): sub_tot+=repacked1[stop-1,-1] repacked3 = repacked2[numpy.where(repacked2[:,1:3].sum(axis=-1)!=0)[0]] return repacked3 - \ No newline at end of file + + +def test_lz4_writing(data, segments, workgroup_size=32, prepend_header=False, + profile=True): + """Function that tests LZ4 writing of a segmented dataset + + :param data: some data to play with + :param segments: array on int[:,4] + :param workgroup_size: size of the workgroup + :param profile: tune on profiling for OpenCL + :return: a comperssed datablock + + Prints out performance (measured from Python) in ms + """ + t0 = time.perf_counter_ns() + performances = {} + if isinstance(data, bytes): + data = numpy.frombuffer(data, "uint8") + else: + data = data.view("uint8") + + segments = segments.astype("int32") + + data_size = data.size + num_workgroup = segments.shape[0] + segment_pos = numpy.zeros(2, "int32") + segment_pos[1] = num_workgroup + + + # Opencl setup + t1 = time.perf_counter_ns() + ctx = pyopencl.create_some_context() + src_file = os.path.abspath(os.path.join(os.path.abspath(__file__),"../../../resources/opencl/codec/lz4_compression.cl")) + src = open(src_file).read() + prg = pyopencl.Program(ctx, src).build() + t1a = time.perf_counter_ns() + if profile: + queue = pyopencl.CommandQueue(ctx, properties=pyopencl.command_queue_properties.PROFILING_ENABLE) + else: + queue = pyopencl.CommandQueue(ctx) + + data_d = cla.to_device(queue, data) + segment_posd = cla.to_device(queue, segment_pos) + segments_d = cla.to_device(queue, segments) + output_d = cla.zeros(queue, int(1.1*data.nbytes), "uint8") + output_size_d = cla.to_device(queue, numpy.array([output_d.nbytes, 0], "int32")) + + t2 = time.perf_counter_ns() + prg.LZ4_cmp_stage2(queue, (workgroup_size*num_workgroup,), (workgroup_size,), + data_d.data, numpy.int32(data_size), + segment_posd.data, + segments_d.data, + output_d.data, + output_size_d.data, + numpy.int32(prepend_header) + ).wait() + t3 = time.perf_counter_ns() + buffer_size = output_size_d.get()[1] + compressed = output_d.get()[:buffer_size] + t4 = time.perf_counter_ns() + if 1: #profile: + performances["python_setup"] = (t1-t0)*1e-6 + performances["opencl_compilation"] = (t1a-t1)*1e-6 + performances["opencl_setup"] = (t2-t1a)*1e-6 + performances["opencl_run"] = (t3-t2)*1e-6 + performances["opencl_retrieve"] = (t4-t3)*1e-6 + print(json.dumps(performances, indent=2)) + return compressed diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl index 3f00cb1aa5..e789282666 100644 --- a/src/silx/resources/opencl/codec/lz4_compression.cl +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -248,25 +248,111 @@ inline int segmentation(int start, //index where scan4match did start // Build token, concatenation of a litteral and a match -inline uchar build_token(short4 segment){ +inline uchar build_token(int4 segment){ int lit = segment.s1; int mat = segment.s2; int token = ((lit & 15)<<4)|((mat-4)&15); return token; } + +// copy collaborative, return the position in output stream. +inline int copy_local(global uchar* dest, + const int dest_position, + local uchar* source, + const int src_position, + const int length){ + for (int i=get_local_id(0); i=input_size) || (start_cmp>=output_size)){// this segment read/write outsize boundaries + return -1; + } + + if (last_segment){ + litter += match; + match = 0; + segment.s1 = litter; + segment.s2 = match; +// if(tid==0)printf("last segment %d %d %d %d\n", segment.s0, segment.s1, segment.s2, segment.s3); + } + + //write token + int token_idx = start_cmp++; + if (litter >= 15){ + segment.s1 = 15; + rem = litter - 15; + while (rem>=255){ + output_buffer[start_cmp++] = 255; + rem -= 255; + } + output_buffer[start_cmp++] = rem; + } + if (match >= 19){ + segment.s2 = 19; + } + output_buffer[token_idx] = build_token(segment); + + //copy litteral. This is collaborative. + start_cmp = copy_global(output_buffer, start_cmp, + input_buffer, start_dec, litter); + + if (!last_segment){ // last block has no offset, nor match + //write offset, here always 1 in 16 bits little endian ! + output_buffer[start_cmp++] = 1; + output_buffer[start_cmp++] = 0; + + //write match overflow + if (match>=19){ + rem = match-19; + while (rem>=255){ + output_buffer[start_cmp++] = 255; + rem -= 255; + } + output_buffer[start_cmp++] = rem; + } + } + return start_cmp; +} + /* * Perform the actual compression by copying * @@ -303,10 +389,10 @@ inline int write_lz4(local uchar *buffer, if (match >= 19){ segment.s2 = 19; } - output_buffer[token_idx] = build_token(segment); + output_buffer[token_idx] = build_token((int4)(segment.s0, segment.s1, segment.s2, segment.s3)); //copy litteral. This is collaborative. - start_cmp = copy(output_buffer, start_cmp, + start_cmp = copy_local(output_buffer, start_cmp, buffer, segment.s0, litter); if ((continuation)||(i+1=segment_range.s1)) // out of range segment, should not occure ! + return; + int4 segment = segments[gid]; + if (prefix_header!=0){ + segment.s3 += 4; + if ((gid == 0) && (tid==0)){//write + output_buffer[0] = input_size & 0xFF; + output_buffer[1] = (input_size>>8) & 0xFF; + output_buffer[2] = (input_size>>16) & 0xFF; + output_buffer[3] = (input_size>>24) & 0xFF; + } + } + + if (gid+1==segment_range.s1){//last segment + r_size = write_segment(input_buffer, // buffer with input uncompressed data + input_size, // size of the data to be compressed + segment, // segment to be compressed + output_buffer, // destination buffer for compressed data + r_size, // + 1); + if (tid==0) output_size[1] = r_size; + } + else{ + write_segment(input_buffer, // buffer with input uncompressed data + input_size, // size of the data to be compressed + segment, // segment to be compressed + output_buffer, // destination buffer for compressed data + r_size, // + 0); + } +} From c887ff05bbae1cb66974da1ba7367b14f4454ce8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Kieffer?= Date: Fri, 18 Aug 2023 16:14:33 +0200 Subject: [PATCH 16/21] Fix the segment_compaction part. works under nvidia GPU with 80k of data. --- .../resources/opencl/codec/lz4_compression.cl | 98 ++++++++++++------- 1 file changed, 64 insertions(+), 34 deletions(-) diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl index e789282666..4af8bf619e 100644 --- a/src/silx/resources/opencl/codec/lz4_compression.cl +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -489,14 +489,15 @@ inline int store_segments(local volatile short4 *local_segments, /* concatenate all segments (stored in global memory) in such a way that they are adjacent. * This function is to be called by the latest workgroup running. * + * Returns the number of segments and the number of bytes to be written. + * * There are tons of synchro since data are read and written from same buffer. */ -inline int concatenate_segments( +inline int2 concatenate_segments( global int2 *segment_ptr, // size = number of workgroup launched, contains start and stop position global int4 *segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup - global int *output_size, // output buffer size, max in input, actual value in output - local volatile int *lsegment_idx, // index of segment offset, shared + local volatile int *shared_idx, // shared indexes with segment offset(0), output_idx(1) local volatile int4 *last_segment // shared memory with the last segment to share between threads ){ @@ -508,29 +509,32 @@ inline int concatenate_segments( // if (tid==0) printf("gid %d, running concat_segments \n", gid); int4 segment; barrier(CLK_GLOBAL_MEM_FENCE); - int output_idx = output_size[0]; - lsegment_idx[0] = segment_ptr[0].s1; - segment = segments[max(0, lsegment_idx[0]-1)]; - if ((tid==0) && (segment.s0>0) && (segment.s2==0) && (ng>1)){ - last_segment[0] = segment; - lsegment_idx[0] -= 1; + if (tid==0){ + shared_idx[0] = segment_ptr[0].s1; + shared_idx[1] = output_size[0]; + segment = segments[max(0, shared_idx[0]-1)]; + if ((segment.s0>0) && (segment.s2==0) && (ng>1)){ + last_segment[0] = segment; + shared_idx[0] -= 1; + } + else{ + last_segment[0] = (int4)(0,0,0,0); + } } - - last_segment[0] = (int4)(0,0,0,0); barrier(CLK_LOCAL_MEM_FENCE); -// if (tid==0) printf("groups range from 1 to %d. segment_idx=%d, output_ptr=%d\n",ng, lsegment_idx[0], output_idx); +// if (tid==0) printf("groups range from 1 to %d. segment_idx=%d, output_ptr=%d\n",ng, shared_idx[0], shared_idx[1]); for (int grp=1; grp0) && (last_segment[0].s2==0)){ segment = segments[seg_ptr.s0]; segment.s0 = last_segment[0].s0; segment.s1 = segment.s0+segment.s1-last_segment[0].s0; - output_idx += len_segment(segment)-len_segment(last_segment[0]); + shared_idx[1] += len_segment(segment)-len_segment(last_segment[0]); last_segment[0] = (int4)(0,0,0,0); segments[seg_ptr.s0] = segment; } @@ -543,34 +547,40 @@ inline int concatenate_segments( else segment = (int4)(0,0,0,0); barrier(CLK_GLOBAL_MEM_FENCE); - segment.s3+=output_idx; + segment.s3+=shared_idx[1]; if (i0) && (segment.s2==0)&&(grp+10) && (segment.s2==0) && (grp+1 Date: Fri, 18 Aug 2023 17:05:36 +0200 Subject: [PATCH 17/21] all profiling times in ms --- src/silx/opencl/codec/bitshuffle_lz4.py | 154 ++++++++++++------------ 1 file changed, 78 insertions(+), 76 deletions(-) diff --git a/src/silx/opencl/codec/bitshuffle_lz4.py b/src/silx/opencl/codec/bitshuffle_lz4.py index 7b1d9f5b78..9eaa0d5b37 100644 --- a/src/silx/opencl/codec/bitshuffle_lz4.py +++ b/src/silx/opencl/codec/bitshuffle_lz4.py @@ -34,7 +34,7 @@ __contact__ = "jerome.kieffer@esrf.eu" __license__ = "MIT" __copyright__ = "European Synchrotron Radiation Facility, Grenoble, France" -__date__ = "17/08/2023" +__date__ = "18/08/2023" __status__ = "production" import time @@ -62,7 +62,7 @@ class BitshuffleLz4(OpenclProcessing): :param dtype: dtype of decompressed data """ LZ4_BLOCK_SIZE = 8192 - + def __init__(self, cmp_size, dec_size, dtype, ctx=None, devicetype="all", platformid=None, deviceid=None, @@ -87,7 +87,7 @@ def __init__(self, cmp_size, dec_size, dtype, self.cmp_size = numpy.uint64(cmp_size) self.dec_size = numpy.uint64(dec_size) self.dec_dtype = numpy.dtype(dtype) - self.num_blocks = numpy.uint32((self.dec_dtype.itemsize*self.dec_size+self.LZ4_BLOCK_SIZE-1)//self.LZ4_BLOCK_SIZE) + self.num_blocks = numpy.uint32((self.dec_dtype.itemsize * self.dec_size + self.LZ4_BLOCK_SIZE - 1) // self.LZ4_BLOCK_SIZE) buffers = [BufferDescription("nb_blocks", 1, numpy.uint32, None), BufferDescription("block_position", self.num_blocks, numpy.uint64, None), @@ -140,18 +140,18 @@ def decompress(self, raw, out=None, wg=None, nbytes=None): dest_size = struct.unpack(">Q", raw[:8]) self_dest_nbyte = self.dec_size * self.dec_dtype.itemsize - if dest_sizeself_dest_nbyte: - num_blocks = numpy.uint32((dest_size+self.LZ4_BLOCK_SIZE-1) // self.LZ4_BLOCK_SIZE) - self.cl_mem["dec"] = pyopencl.array.empty(self.queue,dest_size , self.dec_dtype) + if dest_size < self_dest_nbyte: + num_blocks = numpy.uint32((dest_size + self.LZ4_BLOCK_SIZE - 1) // self.LZ4_BLOCK_SIZE) + elif dest_size > self_dest_nbyte: + num_blocks = numpy.uint32((dest_size + self.LZ4_BLOCK_SIZE - 1) // self.LZ4_BLOCK_SIZE) + self.cl_mem["dec"] = pyopencl.array.empty(self.queue, dest_size , self.dec_dtype) self.dec_size = dest_size // self.dec_dtype.itemsize else: num_blocks = self.num_blocks wg = int(wg or self.block_size) - evt = self.program.lz4_unblock(self.queue, (1,), (1,), + evt = self.program.lz4_unblock(self.queue, (1,), (1,), cmp_buffer, len_raw, self.cl_mem["block_position"].data, @@ -165,7 +165,7 @@ def decompress(self, raw, out=None, wg=None, nbytes=None): assert out.dtype == self.dec_dtype assert out.size == self.dec_size - evt = self.program.bslz4_decompress_block(self.queue, (self.num_blocks*wg,), (wg,), + evt = self.program.bslz4_decompress_block(self.queue, (self.num_blocks * wg,), (wg,), cmp_buffer, out.data, self.cl_mem["block_position"].data, @@ -179,7 +179,7 @@ def decompress(self, raw, out=None, wg=None, nbytes=None): __call__ = decompress -def test_lz4_analysis(data, block_size=1024, workgroup_size=32, segments_size=None, +def test_lz4_analysis(data, block_size=1024, workgroup_size=32, segments_size=None, profile=True, compaction=True): """Function that tests LZ4 analysis (i.e. the correctness of segments) on a dataset. @@ -202,22 +202,21 @@ def test_lz4_analysis(data, block_size=1024, workgroup_size=32, segments_size=No if isinstance(data, bytes): data = numpy.frombuffer(data, "uint8") else: - data = data.view("uint8") + data = data.view("uint8").ravel() data_size = data.size - num_workgroup = (data_size+block_size-1)//block_size + num_workgroup = (data_size + block_size - 1) // block_size if segments_size is None: - segments_size = block_size//4 - - segment_pos = numpy.zeros((num_workgroup,2), "int32") - tmp_sp = numpy.arange(0,segments_size*(num_workgroup+1), segments_size) - segment_pos[:,0] = tmp_sp[:-1] - segment_pos[:,1] = tmp_sp[1:] - - + segments_size = block_size // 4 + + segment_pos = numpy.zeros((num_workgroup, 2), "int32") + tmp_sp = numpy.arange(0, segments_size * (num_workgroup + 1), segments_size) + segment_pos[:, 0] = tmp_sp[:-1] + segment_pos[:, 1] = tmp_sp[1:] + # Opencl setup t1 = time.perf_counter_ns() ctx = pyopencl.create_some_context() - src_file = os.path.abspath(os.path.join(os.path.abspath(__file__),"../../../resources/opencl/codec/lz4_compression.cl")) + src_file = os.path.abspath(os.path.join(os.path.abspath(__file__), "../../../resources/opencl/codec/lz4_compression.cl")) src = open(src_file).read() prg = pyopencl.Program(ctx, src).build(options=f"-DBUFFER_SIZE={block_size} -DSEGMENT_SIZE={segments_size} -DWORKGROUP_SIZE={workgroup_size}") t1a = time.perf_counter_ns() @@ -225,31 +224,35 @@ def test_lz4_analysis(data, block_size=1024, workgroup_size=32, segments_size=No queue = pyopencl.CommandQueue(ctx, properties=pyopencl.command_queue_properties.PROFILING_ENABLE) else: queue = pyopencl.CommandQueue(ctx) - + data_d = cla.to_device(queue, data) segment_posd = cla.to_device(queue, segment_pos) - segments_d = cla.zeros(queue, (segments_size*num_workgroup,4), "int32") + segments_d = cla.zeros(queue, (segments_size * num_workgroup, 4), "int32") wgcnt_d = cla.to_device(queue, numpy.array([num_workgroup], "int32")) output_size_d = cla.zeros(queue, num_workgroup, "int32") - + t2 = time.perf_counter_ns() - prg.LZ4_cmp_stage1(queue, (workgroup_size*num_workgroup,), (workgroup_size,), - data_d.data, numpy.int32(data_size), + evt = prg.LZ4_cmp_stage1(queue, (workgroup_size * num_workgroup,), (workgroup_size,), + data_d.data, numpy.int32(data_size), segment_posd.data, segments_d.data, numpy.int32(compaction), output_size_d.data, wgcnt_d.data - ).wait() + ) + evt.wait() t3 = time.perf_counter_ns() segments = segments_d.get() if compaction: final_positons = segment_posd.get() - segments = segments[final_positons[0,0]:final_positons[0,1]] + segments = segments[final_positons[0, 0]:final_positons[0, 1]] t4 = time.perf_counter_ns() - if 1: #profile: - performances["python_setup"] = (t1-t0)*1e-6 - performances["opencl_compilation"] = (t1a-t1)*1e-6 - performances["opencl_setup"] = (t2-t1a)*1e-6 - performances["opencl_run"] = (t3-t2)*1e-6 - performances["opencl_retrieve"] = (t4-t3)*1e-6 + if 1: # profile: + performances["python_setup"] = (t1 - t0) * 1e-6 + performances["opencl_compilation"] = (t1a - t1) * 1e-6 + performances["opencl_setup"] = (t2 - t1a) * 1e-6 + performances["opencl_run_python"] = (t3 - t2) * 1e-6 + if profile: + performances["opencl_run_profile"] = 1e-6 * (evt.profile.end - evt.profile.start) + performances["opencl_retrieve"] = (t4 - t3) * 1e-6 + print(json.dumps(performances, indent=2)) if compaction: @@ -258,70 +261,70 @@ def test_lz4_analysis(data, block_size=1024, workgroup_size=32, segments_size=No compacted = _repack_segments(segments) # Check validity: input indexes - inp_idx = compacted[:,0] - res = numpy.where((inp_idx[1:]-inp_idx[:-1])<=0) - + inp_idx = compacted[:, 0] + res = numpy.where((inp_idx[1:] - inp_idx[:-1]) <= 0) + # if res[0].size: if True: print(f"Input position are all ascending except {res[0]}") # Check validity: input size - size = segments[:,1:3].sum() + size = segments[:, 1:3].sum() if True: # if data.size != size: print(f"Input size matches, got {size}, expected {data.size}") - + # Check validity: input size (bis) size = compacted[-1,:-1].sum() # if data.size != size: if True: print(f"Input size does match the end of segments, got {size}, expected {data.size}") - + # Check validity: output indexes - out_idx = compacted[:,-1] - res = numpy.where((out_idx[1:]-out_idx[:-1])<=0) + out_idx = compacted[:, -1] + res = numpy.where((out_idx[1:] - out_idx[:-1]) <= 0) # if res[0].size: if True: print(f"Output position are all ascending, except {res[0]}") - - #check for invalid segments, those have no matches, allowd only on last segment - match_size = compacted[:-1,2] - res = numpy.where(match_size==0) + + # check for invalid segments, those have no matches, allowd only on last segment + match_size = compacted[:-1, 2] + res = numpy.where(match_size == 0) if True: # if res[0].size: print(f"Found empty match at {res[0]}") - + # Validate that match are all constant: print(f"Non constant match section found at {_validate_content(data, compacted)}") - + return segments def _validate_content(data, segments): data = data.view('uint8') bad = {} - for i,s in enumerate(segments): + for i, s in enumerate(segments): if s[2] == 0: continue - start = s[0]+s[1] + start = s[0] + s[1] stop = start + s[2] - res = numpy.where(data[start:stop]-data[start])[0] - if res.size: + res = numpy.where(data[start:stop] - data[start])[0] + if res.size: bad[i] = res return bad def _repack_segments(segments): "repack a set of segments to be contiguous" - valid = numpy.where(segments.sum(axis=-1)!=0)[0] + valid = numpy.where(segments.sum(axis=-1) != 0)[0] repacked1 = segments[valid] - blocks = numpy.where(repacked1[:,-1]==0)[0] + blocks = numpy.where(repacked1[:, -1] == 0)[0] sub_tot = 0 repacked2 = repacked1.copy() - for start, stop in zip(blocks,numpy.concatenate((blocks[1:], [len(repacked1)]))): + for start, stop in zip(blocks, numpy.concatenate((blocks[1:], [len(repacked1)]))): repacked2[start:stop, -1] += sub_tot - sub_tot+=repacked1[stop-1,-1] - repacked3 = repacked2[numpy.where(repacked2[:,1:3].sum(axis=-1)!=0)[0]] + sub_tot += repacked1[stop - 1, -1] + repacked3 = repacked2[numpy.where(repacked2[:, 1:3].sum(axis=-1) != 0)[0]] return repacked3 - + def test_lz4_writing(data, segments, workgroup_size=32, prepend_header=False, profile=True): @@ -341,19 +344,18 @@ def test_lz4_writing(data, segments, workgroup_size=32, prepend_header=False, data = numpy.frombuffer(data, "uint8") else: data = data.view("uint8") - + segments = segments.astype("int32") - + data_size = data.size num_workgroup = segments.shape[0] segment_pos = numpy.zeros(2, "int32") segment_pos[1] = num_workgroup - - + # Opencl setup t1 = time.perf_counter_ns() ctx = pyopencl.create_some_context() - src_file = os.path.abspath(os.path.join(os.path.abspath(__file__),"../../../resources/opencl/codec/lz4_compression.cl")) + src_file = os.path.abspath(os.path.join(os.path.abspath(__file__), "../../../resources/opencl/codec/lz4_compression.cl")) src = open(src_file).read() prg = pyopencl.Program(ctx, src).build() t1a = time.perf_counter_ns() @@ -361,18 +363,18 @@ def test_lz4_writing(data, segments, workgroup_size=32, prepend_header=False, queue = pyopencl.CommandQueue(ctx, properties=pyopencl.command_queue_properties.PROFILING_ENABLE) else: queue = pyopencl.CommandQueue(ctx) - + data_d = cla.to_device(queue, data) segment_posd = cla.to_device(queue, segment_pos) segments_d = cla.to_device(queue, segments) - output_d = cla.zeros(queue, int(1.1*data.nbytes), "uint8") + output_d = cla.zeros(queue, int(1.1 * data.nbytes), "uint8") output_size_d = cla.to_device(queue, numpy.array([output_d.nbytes, 0], "int32")) - + t2 = time.perf_counter_ns() - prg.LZ4_cmp_stage2(queue, (workgroup_size*num_workgroup,), (workgroup_size,), - data_d.data, numpy.int32(data_size), + prg.LZ4_cmp_stage2(queue, (workgroup_size * num_workgroup,), (workgroup_size,), + data_d.data, numpy.int32(data_size), segment_posd.data, - segments_d.data, + segments_d.data, output_d.data, output_size_d.data, numpy.int32(prepend_header) @@ -381,11 +383,11 @@ def test_lz4_writing(data, segments, workgroup_size=32, prepend_header=False, buffer_size = output_size_d.get()[1] compressed = output_d.get()[:buffer_size] t4 = time.perf_counter_ns() - if 1: #profile: - performances["python_setup"] = (t1-t0)*1e-6 - performances["opencl_compilation"] = (t1a-t1)*1e-6 - performances["opencl_setup"] = (t2-t1a)*1e-6 - performances["opencl_run"] = (t3-t2)*1e-6 - performances["opencl_retrieve"] = (t4-t3)*1e-6 + if 1: # profile: + performances["python_setup"] = (t1 - t0) * 1e-6 + performances["opencl_compilation"] = (t1a - t1) * 1e-6 + performances["opencl_setup"] = (t2 - t1a) * 1e-6 + performances["opencl_run"] = (t3 - t2) * 1e-6 + performances["opencl_retrieve"] = (t4 - t3) * 1e-6 print(json.dumps(performances, indent=2)) return compressed From d6b79831a107be56c3e0d0d8b189b84d2d310800 Mon Sep 17 00:00:00 2001 From: Jerome Kieffer Date: Wed, 23 Aug 2023 17:26:23 +0200 Subject: [PATCH 18/21] several alternative implementations --- .../resources/opencl/codec/lz4_compression.cl | 227 +---- .../opencl/codec/lz4_compression_nocache.cl | 776 +++++++++++++++++ .../codec/lz4_compression_nocache_int32.cl | 784 ++++++++++++++++++ 3 files changed, 1584 insertions(+), 203 deletions(-) create mode 100644 src/silx/resources/opencl/codec/lz4_compression_nocache.cl create mode 100644 src/silx/resources/opencl/codec/lz4_compression_nocache_int32.cl diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl index 4af8bf619e..1f12984461 100644 --- a/src/silx/resources/opencl/codec/lz4_compression.cl +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -30,15 +30,15 @@ // This is used in tests to simplify the signature of those test kernels. #ifndef WORKGROUP_SIZE -#define WORKGROUP_SIZE 64 +#define WORKGROUP_SIZE 1024 #endif //segment size should be buffer_size/4 #ifndef SEGMENT_SIZE -#define SEGMENT_SIZE 256 +#define SEGMENT_SIZE 512 #endif #ifndef BUFFER_SIZE -#define BUFFER_SIZE 1024 +#define BUFFER_SIZE 16384 #endif #ifndef MIN_MATCH #define MIN_MATCH 4 @@ -138,11 +138,11 @@ inline int compact_segments(local volatile short4 *segments, /* This function scans the input data searching for litterals and matches. return the end-of-scan position. */ -inline int scan4match( local uchar *buffer, // buffer with input data in it, as large as possible, limited by shared memory space. - int start, - int stop, - local short *match_buffer, // size of the wg is enough - volatile local int* cnt // size 1 is enough, idx0: largest index value found +inline int scan4match(local uchar *buffer, // buffer with input data in it, as large as possible, limited by shared memory space. + int start, + int stop, + local short *match_buffer, // size of the wg is enough + volatile local int* cnt // size 1 is enough, idx0: largest index value found ){ int wg = get_local_size(0);// workgroup size @@ -151,6 +151,7 @@ inline int scan4match( local uchar *buffer, // buffer with input data in cnt[0] = 0; // memset match_buffer +// if (tid==0)printf("workgroup size is %d\n",WORKGROUP_SIZE); match_buffer[tid] = -1; barrier(CLK_LOCAL_MEM_FENCE); int i; // position index @@ -353,66 +354,6 @@ inline int write_segment(global uchar *input_buffer, // buffer with input uncomp return start_cmp; } -/* - * Perform the actual compression by copying - * - * return the end-position in the output stream - */ -inline int write_lz4(local uchar *buffer, - local volatile short4 *segments, // size of the workgroup - int nb_segments, - int start_cmp, - global uchar *output_buffer, - int stop, //output buffer max size - int continuation // set to 0 to indicate this is the last segment - ) -{ - for (int i=0; i= 15){ - segment.s1 = 15; - rem = litter - 15; - while (rem>=255){ - output_buffer[start_cmp++] = 255; - rem -= 255; - } - output_buffer[start_cmp++] = rem; - } - if (match >= 19){ - segment.s2 = 19; - } - output_buffer[token_idx] = build_token((int4)(segment.s0, segment.s1, segment.s2, segment.s3)); - - //copy litteral. This is collaborative. - start_cmp = copy_local(output_buffer, start_cmp, - buffer, segment.s0, litter); - - if ((continuation)||(i+1=19){ - rem = segment.s2-19; - while (rem>=255){ - output_buffer[start_cmp++] = 255; - rem -= 255; - } - output_buffer[start_cmp++] = rem; - } - } - }//loop over segments - return start_cmp; -} // calculate the length of a segment in compressed form inline int len_segment(int4 segment){ @@ -582,48 +523,7 @@ inline int2 concatenate_segments( return (int2) (shared_idx[0], shared_idx[1]); } // end concatenate_segments -/* Main kernel for lz4 compression - */ -kernel void lz4_cmp( global uchar *input_buffer, - int input_size, - global uchar *output_buffer, - int output_size, - global uchar *output_ptr, // Length of all output from different wg - global int *running_grp, // counter with the number of wg still running - local uchar *buffer, - int buffer_size, - local short *match_buffer, // size of the buffer - local volatile short4 *segments // contains: start of segment (uncompressed), number of litterals, number of match (offset is enforced to 1) and start of segment (compressed) - ){ - int tid = get_local_id(0); // thread id - int gid = get_group_id(0); // group id - int wg = get_local_size(0);// workgroup size - //copy input data to buffer - int actual_buffer_size = min(buffer_size, input_size - ((gid+1) * buffer_size)); - int start_block = gid * buffer_size; - for (int i=tid; i1) - out_ptr = write_lz4(lbuffer, lsegments, - res2-1, // -1? to keep the last for concatenation - out_ptr, output,max_out, 1); - - barrier(CLK_LOCAL_MEM_FENCE); - if (tid == 0){ - seg[1] += res2-1; - lsegments[0] = lsegments[res2-1]; - seg[0] = 1; -// short4 seg = lsegments[0]; - } - barrier(CLK_LOCAL_MEM_FENCE); - //memset local segments above first one, - if (tid>1) lsegments[tid] = (short4)(0,0,0,0); - barrier(CLK_LOCAL_MEM_FENCE); - start = res; - } - barrier(CLK_LOCAL_MEM_FENCE); - if (tid == 0){ - short4 segment = lsegments[0]; - segment.s1 += segment.s2; - segment.s2 = 0; - lsegments[0] = segment; - - segments[seg[1]++] = segment; - nbsegment[0] = seg[1]; - printf("last segment %d %d %d %d\n", segment.s0, segment.s1, segment.s2, segment.s3); - } - // write last segment - - out_ptr = write_lz4(lbuffer, lsegments, - 1, out_ptr, output, max_out, 0); - - output_size[0] = out_ptr; - - -} // kernel to test the function `concatenate_segments`, run on only one workgroup kernel void test_concatenate_segments( @@ -889,6 +705,8 @@ kernel void test_concatenate_segments( // segment description: s0: position in input buffer s1: number of litterals, s2: number of match, s3: position in output buffer kernel void LZ4_cmp_stage1(global uchar *buffer, int input_size, + local uchar *lbuffer, // local buffer of size block_size for caching buffer. + int block_size, // size of the block global int2 *segment_ptr, // size = number of workgroup launched, contains start and stop position global int4 *segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup int final_compaction, // set to 0 to prevent the final compaction. allows the analysis of intermediate results @@ -898,7 +716,7 @@ kernel void LZ4_cmp_stage1(global uchar *buffer, local volatile int seg[2]; // #0:number of segments in local mem, #1 in global mem local volatile int cnt[2]; // end position of the scan local volatile short4 lsegments[SEGMENT_SIZE]; - local uchar lbuffer[BUFFER_SIZE]; +// local uchar lbuffer[BUFFER_SIZE]; local short lmatch[WORKGROUP_SIZE]; local volatile int4 last_segment[1]; @@ -908,6 +726,11 @@ kernel void LZ4_cmp_stage1(global uchar *buffer, int wg = get_local_size(0);// workgroup size int ng = get_num_groups(0);// number of groups +// if (BUFFER_SIZE BUFFER_SIZE (%d): Aborting!!!\n",block_size, BUFFER_SIZE); +// } + int output_block_size = 0; int output_idx = output_block_size*gid; int2 seg_ptr = segment_ptr[gid]; @@ -915,15 +738,13 @@ kernel void LZ4_cmp_stage1(global uchar *buffer, int segment_max = seg_ptr.s1; // if (tid==0)printf("gid %d writes segments in range %d-%d\n", gid, segment_idx, segment_max); int local_start = 0; - int global_start = BUFFER_SIZE*gid; - int local_stop = min(BUFFER_SIZE, input_size - global_start); + int global_start = block_size * gid; + int local_stop = min(block_size, input_size - global_start); if (local_stop<=0){ if (tid==0)printf("gid %d local_stop: %d \n",gid, local_stop); return; } - -// int actual_buffer_size = min(BUFFER_SIZE, local_stop) ; - + int watchdog = (local_stop + wg-1)/wg; //prevent code from running way ! int res, res2, out_ptr=0, max_out=output_size[0]; diff --git a/src/silx/resources/opencl/codec/lz4_compression_nocache.cl b/src/silx/resources/opencl/codec/lz4_compression_nocache.cl new file mode 100644 index 0000000000..7b95da3df2 --- /dev/null +++ b/src/silx/resources/opencl/codec/lz4_compression_nocache.cl @@ -0,0 +1,776 @@ +/* + * Project: SILX: Bitshuffle LZ4 compressor + * + * Copyright (C) 2023 European Synchrotron Radiation Facility + * Grenoble, France + * + * Principal authors: J. Kieffer (kieffer@esrf.fr) + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +// This is used in tests to simplify the signature of those test kernels. +#ifndef WORKGROUP_SIZE +#define WORKGROUP_SIZE 128 +#endif + +//segment size should be buffer_size/4 +#ifndef SEGMENT_SIZE +#define SEGMENT_SIZE 256 +#endif + +#ifndef BUFFER_SIZE +#define BUFFER_SIZE 16384 +#endif +#ifndef MIN_MATCH +#define MIN_MATCH 4 +#endif + + + +// short compare and swap function used in sort_odd_even +inline short8 _order_short4(short4 a, short4 b){ + return (a.s0=w) && (tid0) && (pid=MIN_MATCH)){ + segments[atomic_inc(cnt)] = (short4)(pid+1, 0, here, 1); + } else +// if ((here==0) && (there>0) && (tid>5) && match_buffer[tid-5]>4){ + if ((here==0) && (there>0) && (tid>=MIN_MATCH) && match_buffer[tid-MIN_MATCH]>=MIN_MATCH){ + segments[atomic_inc(cnt)] = (short4)(pid+1, 0, 0, 0); + } + } + barrier(CLK_LOCAL_MEM_FENCE); + + if (cnt[0] == 1){ + // nothing occured in considered + if (tid == 0){ + // nothing occured, just complete former segment + short4 seg=segments[0]; + if (seg.s2 == 0){ //there was no match, just complete the former segment + seg.s1 += end-start; + segments[0] = seg; + } + else{ // noting occured, but former segment has already some match ! + if (tid==0){ + segments[atomic_inc(cnt)] = (short4)(start, end-start, 0, 0); + } + } + } + } + else{ + // sort segments + sort_odd_even(0, cnt[0], segments); + //add end position as a litteral + if (tid==0){ + segments[cnt[0]] = (short4)(end, 0, 0, 0); + atomic_inc(cnt); + } + barrier(CLK_LOCAL_MEM_FENCE); + cnt[0] = compact_segments(segments, cnt); + } + barrier(CLK_LOCAL_MEM_FENCE); + return cnt[0]; +} + + +// Build token, concatenation of a litteral and a match +inline uchar build_token(int4 segment){ + int lit = segment.s1; + int mat = segment.s2; + int token = ((lit & 15)<<4)|((mat-4)&15); + return token; +} + + +// copy collaborative, return the position in output stream. +inline int copy_global(global uchar* dest, + const int dest_position, + global uchar* source, + const int src_position, + const int length){ + for (int i=get_local_id(0); i=input_size) || (start_cmp>=output_size)){// this segment read/write outsize boundaries + return -1; + } + + if (last_segment){ + litter += match; + match = 0; + segment.s1 = litter; + segment.s2 = match; +// if(tid==0)printf("last segment %d %d %d %d\n", segment.s0, segment.s1, segment.s2, segment.s3); + } + + //write token + int token_idx = start_cmp++; + if (litter >= 15){ + segment.s1 = 15; + rem = litter - 15; + while (rem>=255){ + output_buffer[start_cmp++] = 255; + rem -= 255; + } + output_buffer[start_cmp++] = rem; + } + if (match >= 19){ + segment.s2 = 19; + } + output_buffer[token_idx] = build_token(segment); + + //copy litteral. This is collaborative. + start_cmp = copy_global(output_buffer, start_cmp, + input_buffer, start_dec, litter); + + if (!last_segment){ // last block has no offset, nor match + //write offset, here always 1 in 16 bits little endian ! + output_buffer[start_cmp++] = 1; + output_buffer[start_cmp++] = 0; + + //write match overflow + if (match>=19){ + rem = match-19; + while (rem>=255){ + output_buffer[start_cmp++] = 255; + rem -= 255; + } + output_buffer[start_cmp++] = rem; + } + } + return start_cmp; +} + + +// calculate the length of a segment in compressed form +inline int len_segment(int4 segment){ + int lit = segment.s1; + int mat = segment.s2-4; + int size = 3+lit; + if (lit>=15){ + size++; + lit -= 15; + } + while (lit>255){ + size++; + lit-=255; + } + if (mat>=15){ + size++; + mat -= 15; + } + while (mat>255){ + size++; + mat-=255; + } + return size; +} + + +/* store several local segments into the global memory starting at position. + * return the position in the output stream + */ +inline int store_segments(local volatile short4 *local_segments, + int nb_segments, + global int4 *global_segments, + int max_idx, // last position achievable in global segment array + int global_idx, + int input_stream_idx, + int output_stream_idx, + int block_size, // size of the block under analysis + int last, // set to true to concatenate the match and the litteral for last block + local volatile int* cnt //size=1 is eough + ){ + cnt[0] = output_stream_idx; + barrier(CLK_LOCAL_MEM_FENCE); + if (global_idx!=max_idx){ + //this is serial for conviniance ! + if (get_local_id(0)==0){ + for (int i=0; i %d\n", get_group_id(0), global_idx, max_idx, segment.s0, block_size); + segment.s1 = block_size - segment.s0; + segment.s2 = 0; + } + // manage last segment in block + if (last){ + segment.s1+=segment.s2; + segment.s2 = 0; + } + segment.s0 += input_stream_idx; + segment.s3 = output_stream_idx; + + output_stream_idx += len_segment(segment); + global_segments[global_idx++]=segment; + if (emergency) break; + } + cnt[0] = output_stream_idx; + } + barrier(CLK_LOCAL_MEM_FENCE); + } + return cnt[0]; +} + + +/* concatenate all segments (stored in global memory) in such a way that they are adjacent. + * This function is to be called by the latest workgroup running. + * + * Returns the number of segments and the number of bytes to be written. + * + * There are tons of synchro since data are read and written from same buffer. + */ +inline int2 concatenate_segments( + global int2 *segment_ptr, // size = number of workgroup launched, contains start and stop position + global int4 *segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup + global int *output_size, // output buffer size, max in input, actual value in output + local volatile int *shared_idx, // shared indexes with segment offset(0), output_idx(1) + local volatile int4 *last_segment // shared memory with the last segment to share between threads + ){ + + int tid = get_local_id(0); // thread id + int gid = get_group_id(0); // group id + int wg = get_local_size(0);// workgroup size + int ng = get_num_groups(0);// number of groups + +// if (tid==0) printf("gid %d, running concat_segments \n", gid); + int4 segment; + barrier(CLK_GLOBAL_MEM_FENCE); + if (tid==0){ + shared_idx[0] = segment_ptr[0].s1; + shared_idx[1] = output_size[0]; + segment = segments[max(0, shared_idx[0]-1)]; + if ((segment.s0>0) && (segment.s2==0) && (ng>1)){ + last_segment[0] = segment; + shared_idx[0] -= 1; + } + else{ + last_segment[0] = (int4)(0,0,0,0); + } + } + barrier(CLK_LOCAL_MEM_FENCE); +// if (tid==0) printf("groups range from 1 to %d. segment_idx=%d, output_ptr=%d\n",ng, shared_idx[0], shared_idx[1]); + for (int grp=1; grp0) && (last_segment[0].s2==0)){ + segment = segments[seg_ptr.s0]; + segment.s0 = last_segment[0].s0; + segment.s1 = segment.s0+segment.s1-last_segment[0].s0; + shared_idx[1] += len_segment(segment)-len_segment(last_segment[0]); + last_segment[0] = (int4)(0,0,0,0); + segments[seg_ptr.s0] = segment; + } + barrier(CLK_LOCAL_MEM_FENCE); + for (int i=low; i0) && (segment.s2==0) && (grp+11) lsegments[tid] = (short4)(0,0,0,0); + barrier(CLK_LOCAL_MEM_FENCE); + local_start = res; + } + barrier(CLK_LOCAL_MEM_FENCE); +// if (tid==0)printf("gid %d store final segments\n",gid); + output_idx = store_segments(lsegments, 1, // last segment is treated here + segments, segment_max, segment_idx, global_start, output_idx, global_stop, gid+1==ng, cnt); + output_size[gid] = output_idx; + seg_ptr.s1 = ++segment_idx; + segment_ptr[gid] = seg_ptr; + + barrier(CLK_LOCAL_MEM_FENCE); + barrier(CLK_GLOBAL_MEM_FENCE); + // last group running performs the cumsum and compaction of indices + if (tid==0){ + cnt[0] = (atomic_dec(wgcnt)==1); + } + barrier(CLK_LOCAL_MEM_FENCE); + if (cnt[0] && final_compaction){ + int2 end_ptr = concatenate_segments(segment_ptr, // size = number of workgroup launched, contains start and stop position + segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup + output_size, // output buffer size, max in input, actual value in output + cnt, // index of segment offset, shared + last_segment // shared memory with the last segment to share between threads + ); + } +} + + +// kernel launched with one segment per workgroup. If the segment has large litterals, having many threads per group is interesting. + +kernel void LZ4_cmp_stage2(global uchar *input_buffer, // bufffer with data to be compressed + int input_size, // size of the daa to be compressed + global int2 *segment_ptr, // size = 1 (often more) contains start and stop position of segment + global int4 *segments, // size defined by segment_ptr + global uchar *output_buffer,// destination buffer for compressed data + global int *output_size, // output buffer size, pre-filled (not updated if prefix_header), size should be at least 2 + int prefix_header // if set, put in header the input buffer size (increases the output_size[0] by 4) +){ + int gid = get_group_id(0); + int tid = get_local_id(0); + int wg = get_local_size(0); + int r_size = output_size[0]; + int2 segment_range = segment_ptr[0]; + if ((gid=segment_range.s1)) // out of range segment, should not occure ! + return; + int4 segment = segments[gid]; + if (prefix_header!=0){ + segment.s3 += 4; + if ((gid == 0) && (tid==0)){//write + output_buffer[0] = input_size & 0xFF; + output_buffer[1] = (input_size>>8) & 0xFF; + output_buffer[2] = (input_size>>16) & 0xFF; + output_buffer[3] = (input_size>>24) & 0xFF; + } + } + + if (gid+1==segment_range.s1){//last segment + r_size = write_segment(input_buffer, // buffer with input uncompressed data + input_size, // size of the data to be compressed + segment, // segment to be compressed + output_buffer, // destination buffer for compressed data + r_size, // + 1); + if (tid==0) output_size[1] = r_size; + } + else{ + write_segment(input_buffer, // buffer with input uncompressed data + input_size, // size of the data to be compressed + segment, // segment to be compressed + output_buffer, // destination buffer for compressed data + r_size, // + 0); + } +} diff --git a/src/silx/resources/opencl/codec/lz4_compression_nocache_int32.cl b/src/silx/resources/opencl/codec/lz4_compression_nocache_int32.cl new file mode 100644 index 0000000000..75ba2b93f2 --- /dev/null +++ b/src/silx/resources/opencl/codec/lz4_compression_nocache_int32.cl @@ -0,0 +1,784 @@ +/* + * Project: SILX: Bitshuffle LZ4 compressor + * + * Copyright (C) 2023 European Synchrotron Radiation Facility + * Grenoble, France + * + * Principal authors: J. Kieffer (kieffer@esrf.fr) + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + + + +// This is used in tests to simplify the signature of those test kernels. +#ifndef WORKGROUP_SIZE +#define WORKGROUP_SIZE 128 +#endif + +#ifndef SEGMENT_SIZE +#define SEGMENT_SIZE 128 +#endif + +#ifndef BUFFER_SIZE +#define BUFFER_SIZE 16384 +#endif + +#ifndef MIN_MATCH +#define MIN_MATCH 4 +#endif + + + +// short compare and swap function used in sort_odd_even_int4 +inline int8 _order_int4(int4 a, int4 b){ + return (a.s0=w) && (tid0) && (pid=MIN_MATCH)){ + segments[atomic_inc(cnt)] = (int4)(pid+1, 0, here, 1); + } else +// if ((here==0) && (there>0) && (tid>5) && match_buffer[tid-5]>4){ + if ((here==0) && (there>0) && (tid>=MIN_MATCH) && match_buffer[tid-MIN_MATCH]>=MIN_MATCH){ + segments[atomic_inc(cnt)] = (int4)(pid+1, 0, 0, 0); + } + } + barrier(CLK_LOCAL_MEM_FENCE); + + if (cnt[0] == 1){ + // nothing occured in considered + if (tid == 0){ + // nothing occured, just complete former segment + int4 seg=segments[0]; + if (seg.s2 == 0){ //there was no match, just complete the former segment + seg.s1 += end-start; + segments[0] = seg; + } + else{ // noting occured, but former segment has already some match ! + if (tid==0){ + segments[atomic_inc(cnt)] = (int4)(start, end-start, 0, 0); + } + } + } + } + else{ + // sort segments + sort_odd_even_int4(0, cnt[0], segments); + //add end position as a litteral + if (tid==0){ + segments[cnt[0]] = (int4)(end, 0, 0, 0); + atomic_inc(cnt); + } + barrier(CLK_LOCAL_MEM_FENCE); + cnt[0] = compact_segments_int4(segments, cnt); + } + barrier(CLK_LOCAL_MEM_FENCE); + return cnt[0]; +} + + +// Build token, concatenation of a litteral and a match +inline uchar build_token(int4 segment){ + int lit = segment.s1; + int mat = segment.s2; + int token = ((lit & 15)<<4)|((mat-4)&15); + return token; +} + + +// copy collaborative, return the position in output stream. +inline int copy_global(global uchar* dest, + const int dest_position, + global uchar* source, + const int src_position, + const int length){ + for (int i=get_local_id(0); i=input_size) || (start_cmp>=output_size)){// this segment read/write outsize boundaries + return -1; + } + + if (last_segment){ + litter += match; + match = 0; + segment.s1 = litter; + segment.s2 = match; +// if(tid==0)printf("last segment %d %d %d %d\n", segment.s0, segment.s1, segment.s2, segment.s3); + } + + //write token + int token_idx = start_cmp++; + if (litter >= 15){ + segment.s1 = 15; + rem = litter - 15; + while (rem>=255){ + output_buffer[start_cmp++] = 255; + rem -= 255; + } + output_buffer[start_cmp++] = rem; + } + if (match >= 19){ + segment.s2 = 19; + } + output_buffer[token_idx] = build_token(segment); + + //copy litteral. This is collaborative. + start_cmp = copy_global(output_buffer, start_cmp, + input_buffer, start_dec, litter); + + if (!last_segment){ // last block has no offset, nor match + //write offset, here always 1 in 16 bits little endian ! + output_buffer[start_cmp++] = 1; + output_buffer[start_cmp++] = 0; + + //write match overflow + if (match>=19){ + rem = match-19; + while (rem>=255){ + output_buffer[start_cmp++] = 255; + rem -= 255; + } + output_buffer[start_cmp++] = rem; + } + } + return start_cmp; +} + + +// calculate the length of a segment in compressed form +inline int len_segment(int4 segment){ + int lit = segment.s1; + int mat = segment.s2-4; + int size = 3+lit; + if (lit>=15){ + size++; + lit -= 15; + } + while (lit>255){ + size++; + lit-=255; + } + if (mat>=15){ + size++; + mat -= 15; + } + while (mat>255){ + size++; + mat-=255; + } + return size; +} + + +/* store several local segments into the global memory starting at position. + * return the position in the output stream + */ +inline int store_segments_int4(local volatile int4 *local_segments, + int nb_segments, + global int4 *global_segments, + int max_idx, // last position achievable in global segment array + int global_idx, + int input_stream_idx, + int output_stream_idx, + int block_size, // size of the block under analysis + int last, // set to true to concatenate the match and the litteral for last block + local volatile int* cnt //size=1 is eough + ){ + cnt[0] = output_stream_idx; + barrier(CLK_LOCAL_MEM_FENCE); + if (global_idx!=max_idx){ + //this is serial for conviniance ! + if (get_local_id(0)==0){ + for (int i=0; i %d\n", (int)get_group_id(0), global_idx, max_idx, segment.s0, block_size); + segment.s1 = block_size - segment.s0; + segment.s2 = 0; + } + // manage last segment in block + if (last){ + segment.s1+=segment.s2; + segment.s2 = 0; + } + segment.s0 += input_stream_idx; + segment.s3 = output_stream_idx; + + output_stream_idx += len_segment(segment); + global_segments[global_idx++]=segment; + if (emergency) break; + } + cnt[0] = output_stream_idx; + } + barrier(CLK_LOCAL_MEM_FENCE); + } + return cnt[0]; +} + + +/* concatenate all segments (stored in global memory) in such a way that they are adjacent. + * This function is to be called by the latest workgroup running. + * + * Returns the number of segments and the number of bytes to be written. + * + * There are tons of synchro since data are read and written from same buffer. + */ +inline int2 concatenate_segments( + global int2 *segment_ptr, // size = number of workgroup launched, contains start and stop position + global int4 *segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup + global int *output_size, // output buffer size, max in input, actual value in output + local volatile int *shared_idx, // shared indexes with segment offset(0), output_idx(1) + local volatile int4 *last_segment // shared memory with the last segment to share between threads + ){ + + int tid = get_local_id(0); // thread id + int gid = get_group_id(0); // group id + int wg = get_local_size(0);// workgroup size + int ng = get_num_groups(0);// number of groups + +// if (tid==0) printf("gid %d, running concat_segments \n", gid); + int4 segment; + barrier(CLK_GLOBAL_MEM_FENCE); + if (tid==0){ + shared_idx[0] = segment_ptr[0].s1; + shared_idx[1] = output_size[0]; + segment = segments[max(0, shared_idx[0]-1)]; + if ((segment.s0>0) && (segment.s2==0) && (ng>1)){ + last_segment[0] = segment; + shared_idx[0] -= 1; + } + else{ + last_segment[0] = (int4)(0,0,0,0); + } + } + barrier(CLK_LOCAL_MEM_FENCE); +// if (tid==0) printf("groups range from 1 to %d. segment_idx=%d, output_ptr=%d\n",ng, shared_idx[0], shared_idx[1]); + for (int grp=1; grp0) && (last_segment[0].s2==0)){ + segment = segments[seg_ptr.s0]; + segment.s0 = last_segment[0].s0; + segment.s1 = segment.s0+segment.s1-last_segment[0].s0; + shared_idx[1] += len_segment(segment)-len_segment(last_segment[0]); + last_segment[0] = (int4)(0,0,0,0); + segments[seg_ptr.s0] = segment; + } + barrier(CLK_LOCAL_MEM_FENCE); + for (int i=low; i0) && (segment.s2==0) && (grp+11) lsegments[tid] = (int4)(0,0,0,0); + barrier(CLK_LOCAL_MEM_FENCE); + local_start = res; + } + barrier(CLK_LOCAL_MEM_FENCE); +// if (tid==0)printf("gid %d store final segments\n",gid); + output_idx = store_segments_int4(lsegments, 1, // last segment is treated here + segments, segment_max, segment_idx, global_start, output_idx, global_stop, gid+1==ng, cnt); + output_size[gid] = output_idx; + seg_ptr.s1 = ++segment_idx; + segment_ptr[gid] = seg_ptr; + + barrier(CLK_LOCAL_MEM_FENCE); + barrier(CLK_GLOBAL_MEM_FENCE); + // last group running performs the cumsum and compaction of indices + if (tid==0){ + cnt[0] = (atomic_dec(wgcnt)==1); + } + barrier(CLK_LOCAL_MEM_FENCE); + if (cnt[0] && final_compaction){ + int2 end_ptr = concatenate_segments(segment_ptr, // size = number of workgroup launched, contains start and stop position + segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup + output_size, // output buffer size, max in input, actual value in output + cnt, // index of segment offset, shared + last_segment // shared memory with the last segment to share between threads + ); + } +} + + +// kernel launched with one segment per workgroup. If the segment has large litterals, having many threads per group is interesting. + +kernel void LZ4_cmp_stage2(global uchar *input_buffer, // bufffer with data to be compressed + int input_size, // size of the daa to be compressed + global int2 *segment_ptr, // size = 1 (often more) contains start and stop position of segment + global int4 *segments, // size defined by segment_ptr + global uchar *output_buffer,// destination buffer for compressed data + global int *output_size, // output buffer size, pre-filled (not updated if prefix_header), size should be at least 2 + int prefix_header // if set, put in header the input buffer size (increases the output_size[0] by 4) +){ + int gid = get_group_id(0); + int tid = get_local_id(0); + int wg = get_local_size(0); + int r_size = output_size[0]; + int2 segment_range = segment_ptr[0]; + if ((gid=segment_range.s1)) // out of range segment, should not occure ! + return; + int4 segment = segments[gid]; + if (prefix_header!=0){ + segment.s3 += 4; + if ((gid == 0) && (tid==0)){//write + output_buffer[0] = input_size & 0xFF; + output_buffer[1] = (input_size>>8) & 0xFF; + output_buffer[2] = (input_size>>16) & 0xFF; + output_buffer[3] = (input_size>>24) & 0xFF; + } + } + + if (gid+1==segment_range.s1){//last segment + r_size = write_segment(input_buffer, // buffer with input uncompressed data + input_size, // size of the data to be compressed + segment, // segment to be compressed + output_buffer, // destination buffer for compressed data + r_size, // + 1); + if (tid==0) output_size[1] = r_size; + } + else{ + write_segment(input_buffer, // buffer with input uncompressed data + input_size, // size of the data to be compressed + segment, // segment to be compressed + output_buffer, // destination buffer for compressed data + r_size, // + 0); + } +} From 6ffe4c73645f52bd7858606f06c34868828276f9 Mon Sep 17 00:00:00 2001 From: Jerome Kieffer Date: Thu, 24 Aug 2023 17:23:40 +0200 Subject: [PATCH 19/21] WIP --- .../resources/opencl/codec/lz4_compression.cl | 48 +- .../opencl/codec/lz4_compression_int16.cl | 955 ++++++++++++++++++ 2 files changed, 998 insertions(+), 5 deletions(-) create mode 100644 src/silx/resources/opencl/codec/lz4_compression_int16.cl diff --git a/src/silx/resources/opencl/codec/lz4_compression.cl b/src/silx/resources/opencl/codec/lz4_compression.cl index 1f12984461..031b003f4c 100644 --- a/src/silx/resources/opencl/codec/lz4_compression.cl +++ b/src/silx/resources/opencl/codec/lz4_compression.cl @@ -44,7 +44,9 @@ #define MIN_MATCH 4 #endif - +/*************************** + * Odd-Even Sort algorithm * + ***************************/ // short compare and swap function used in sort_odd_even inline short8 _order_short4(short4 a, short4 b){ @@ -86,12 +88,38 @@ inline void sort_odd_even(int start, } } -/* Compact litterals and matches into segments containing a litteral and a match section (non null) + +/************************************** + * Cumsum based on Hillis Steele Scan * + **************************************/ +// calculate the cumulative sum of element in the array inplace. +inline void cumsum_short(local volatile short *array, + int size){ + int oid, tid = get_local_id(0); + short here, there; + for (int offset = 1; offset < size; offset *= 2){ + here = (tid < size) ? array[tid] : 0; + oid = tid-offset; + there = ((tid < size)&&(oid>=0)) ? array[oid] : 0; + barrier(CLK_LOCAL_MEM_FENCE); + if (tid= offset) + array[tid] = here+there; + else + array[tid] = here; + } + barrier(CLK_LOCAL_MEM_FENCE); + } +} + + +/* ***************************************************************************************************** + * Compact litterals and matches into segments containing a litteral and a match section (non null) * * After the scan, begining of litterals and of match are noted and stored in segments. * In this function one takes 2 segments, starting with a litteral and concatenate the subsequent match * as a consequence, the number of segments is divided by 2 ! - */ + *******************************************************************************************************/ inline int compact_segments(local volatile short4 *segments, local volatile int* cnt){ int tid = get_local_id(0); // thread id @@ -396,7 +424,7 @@ inline int store_segments(local volatile short4 *local_segments, cnt[0] = output_stream_idx; barrier(CLK_LOCAL_MEM_FENCE); if (global_idx!=max_idx){ - //this is serial for conviniance ! + //this is serial for conviniance ! TODO: can be partially parallelized. if (get_local_id(0)==0){ for (int i=0; i=0)) ? array[oid] : 0; + barrier(CLK_LOCAL_MEM_FENCE); + if (tid= offset) + array[tid] = here+there; + else + array[tid] = here; + } + barrier(CLK_LOCAL_MEM_FENCE); + } +} + + +/* ***************************************************************************************************** + * Compact litterals and matches into segments containing a litteral and a match section (non null) + * + * After the scan, begining of litterals and of match are noted and stored in segments. + * In this function one takes 2 segments, starting with a litteral and concatenate the subsequent match + * as a consequence, the number of segments is divided by 2 ! + *******************************************************************************************************/ +inline int compact_segments(local volatile short4 *segments, + local volatile int* cnt){ + int tid = get_local_id(0); // thread id + int nb = cnt[0]; + short4 merge, current, next; + int w = 0; //write positions + int r = 0; //read position + //single threaded for safety ... + if (tid == 0){ + short4 current, next, merged; + current = segments[r++]; + while (r=w) && (tid=15){ + size++; + lit -= 15; + } + while (lit>255){ + size++; + lit-=255; + } + if (mat>=15){ + size++; + mat -= 15; + } + while (mat>255){ + size++; + mat-=255; + } + return size; +} + + +// fill the s3 index with the compressed size of each segment +inline void calculate_output_size(local volatile short4 *segments,// size of the workgroup + int start, int stop){ + int tid = get_local_id(0); + if ((tid>=start) && (tid0) && (pid=MIN_MATCH)){ + segments[atomic_inc(cnt)] = (short4)(pid+1, 0, here, 1); + } else +// if ((here==0) && (there>0) && (tid>5) && match_buffer[tid-5]>4){ + if ((here==0) && (there>0) && (tid>=MIN_MATCH) && match_buffer[tid-MIN_MATCH]>=MIN_MATCH){ + segments[atomic_inc(cnt)] = (short4)(pid+1, 0, 0, 0); + } + } + barrier(CLK_LOCAL_MEM_FENCE); + + if (cnt[0] == 1){ + // nothing occured in considered + if (tid == 0){ + // nothing occured, just complete former segment + short4 seg=segments[0]; + if (seg.s2 == 0){ //there was no match, just complete the former segment + seg.s1 += end-start; + segments[0] = seg; + } + else{ // noting occured, but former segment has already some match ! + if (tid==0){ + segments[atomic_inc(cnt)] = (short4)(start, end-start, 0, 0); + } + } + } + } + else{ + // sort segments + sort_odd_even(0, cnt[0], segments); + //add end position as a litteral + if (tid==0){ + segments[cnt[0]] = (short4)(end, 0, 0, 0); + atomic_inc(cnt); + } + barrier(CLK_LOCAL_MEM_FENCE); +// if (tid==0){ +// printf("after match scan, before compaction, cnt=%d start=%d end=%d stop=%d\n",cnt[0], start, end, stop); +// } + // compact segments + cnt[0] = compact_segments(segments, cnt); + } + barrier(CLK_LOCAL_MEM_FENCE); + // update the segment with the compressed size: + calculate_output_size(segments, 0, cnt[0]); + return cnt[0]; +} + + +// Build token, concatenation of a litteral and a match +inline uchar build_token(int4 segment){ + int lit = segment.s1; + int mat = segment.s2; + int token = ((lit & 15)<<4)|((mat-4)&15); + return token; +} + + +// copy collaborative, return the position in output stream. +inline int copy_local(global uchar* dest, + const int dest_position, + local uchar* source, + const int src_position, + const int length){ + for (int i=get_local_id(0); i=input_size) || (start_cmp>=output_size)){// this segment read/write outsize boundaries + return -1; + } + + if (last_segment){ + litter += match; + match = 0; + segment.s1 = litter; + segment.s2 = match; +// if(tid==0)printf("last segment %d %d %d %d\n", segment.s0, segment.s1, segment.s2, segment.s3); + } + + //write token + int token_idx = start_cmp++; + if (litter >= 15){ + segment.s1 = 15; + rem = litter - 15; + while (rem>=255){ + output_buffer[start_cmp++] = 255; + rem -= 255; + } + output_buffer[start_cmp++] = rem; + } + if (match >= 19){ + segment.s2 = 19; + } + output_buffer[token_idx] = build_token(segment); + + //copy litteral. This is collaborative. + start_cmp = copy_global(output_buffer, start_cmp, + input_buffer, start_dec, litter); + + if (!last_segment){ // last block has no offset, nor match + //write offset, here always 1 in 16 bits little endian ! + output_buffer[start_cmp++] = 1; + output_buffer[start_cmp++] = 0; + + //write match overflow + if (match>=19){ + rem = match-19; + while (rem>=255){ + output_buffer[start_cmp++] = 255; + rem -= 255; + } + output_buffer[start_cmp++] = rem; + } + } + return start_cmp; +} + + +// calculate the length of a segment in compressed form +inline int len_segment(int4 segment){ + int lit = segment.s1; + int mat = segment.s2-4; + int size = 3+lit; + if (lit>=15){ + size++; + lit -= 15; + } + while (lit>255){ + size++; + lit-=255; + } + if (mat>=15){ + size++; + mat -= 15; + } + while (mat>255){ + size++; + mat-=255; + } + return size; +} + +/* store several local segments into the global memory starting at position. + * return the position in the output stream + */ +inline int store_segments(local volatile short4 *local_segments, + int nb_segments, + global short4 *global_segments, + int max_idx, // last position achievable in global segment array + int global_idx, + int input_stream_idx, + int output_stream_idx, + int block_size, // size of the block under analysis + int last, // set to true to concatenate the match and the litteral for last block + local volatile int* cnt, //size=2 is needed, index 0 for size, index 1 for emergency + local short* tmparray // size: workgroup size + ){ + int tid = get_local_id(0); + cnt[0] = output_stream_idx; + cnt[1] = 0; + short4 segment; + barrier(CLK_LOCAL_MEM_FENCE); + if (global_idx %d\n", get_group_id(0), global_idx, max_idx, segment.s0, block_size); + segment.s1 = block_size - segment.s0; + segment.s2 = 0; + segment.s3 = length_segment(segment); + } + // manage last segment in block, i.e. transform match in litteral. + if ((last) && (tid+1==nb_segments)){ + segment.s1+=segment.s2; + segment.s2 = 0; + segment.s3 = length_segment(segment); + } + tmparray[tid] = segment.s3; + } + else tmparray[tid] = 0; + cumsum_short(tmparray, nb_segments); + nb_segments = cnt[1]?cnt[1]:nb_segments; + if (tid==0){ + segment.s3 = output_stream_idx; + } + else if (tid0) && (segment.s2==0) && (ng>1)){ + last_segment[0] = segment; + shared_idx[0] -= 1; + } + else{ + last_segment[0] = (int4)(0,0,0,0); + } + } + barrier(CLK_LOCAL_MEM_FENCE); +// if (tid==0) printf("groups range from 1 to %d. segment_idx=%d, output_ptr=%d\n",ng, shared_idx[0], shared_idx[1]); + for (int grp=1; grp0) && (last_segment[0].s2==0)){ + segment = segments[seg_ptr.s0]; + segment.s0 = last_segment[0].s0; + segment.s1 = segment.s0+segment.s1-last_segment[0].s0; + shared_idx[1] += len_segment(segment)-len_segment(last_segment[0]); + last_segment[0] = (int4)(0,0,0,0); + segments[seg_ptr.s0] = segment; + } + barrier(CLK_LOCAL_MEM_FENCE); + for (int i=low; i0) && (segment.s2==0) && (grp+1 %d to memory %d-%d\n",res2-1,seg[1], seg[1]+res2-1); + barrier(CLK_LOCAL_MEM_FENCE); + if (tid == 0){ + seg[1] += res2-1; + lsegments[0] = lsegments[res2-1]; + seg[0] = 1; + short4 seg = lsegments[0]; + printf("copied seg[0] (was %d) (%d, %d, %d, %d)\n",res2-1,seg.s0,seg.s1,seg.s2,seg.s3); + } + barrier(CLK_LOCAL_MEM_FENCE); + //memset local segments above first one, + if (tid>1) lsegments[tid] = (short4)(0,0,0,0); + barrier(CLK_LOCAL_MEM_FENCE); + start = res; + if (tid==5)printf("end of loop, start=%d res=%d size=%d\n\n", start, res, actual_buffer_size); + } + barrier(CLK_LOCAL_MEM_FENCE); + if (tid == 0){ + segments[seg[1]++] = lsegments[0]; + nbsegment[0] = seg[1]; + printf("last copy, total segments: %d\n", seg[1]); + } +} + + +// kernel to test the function `concatenate_segments`, run on only one workgroup +kernel void test_concatenate_segments( + global int2 *segment_ptr, // size = number of workgroup launched, contains start and stop position + global int4 *segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup + global int *output_size // output buffer size, max in input, actual value in output, size should be at least the + ){ + local volatile int cnt[2]; //0:segment_ptr, 1:output_ptr + local volatile int4 last_segment[1]; + + int gid = get_group_id(0); // group id + int tid = get_local_id(0); // thread id + if (gid == 0){ + int2 end_ptr = concatenate_segments(segment_ptr, // size = number of workgroup launched, contains start and stop position + segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup + output_size, // output buffer size, max in input, actual value in output + cnt, // index of segment offset, shared + last_segment // shared memory with the last segment to share between threads + ); + } +} + +// kernel to test multiple blocks in parallel with last to finish which manages the junction between blocks +// segment description: s0: position in input buffer s1: number of litterals, s2: number of match, s3: size/position in output buffer +kernel void LZ4_cmp_stage1(global uchar *buffer, + int input_size, + local uchar *lbuffer, // local buffer of size block_size for caching buffer. + int block_size, // size of the block + global int4 *block_ptr, // size = number of workgroup launched, i.e. number of LZ4-blocks. contains, start+end segment, start+end write + global short4 *segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup + int final_compaction, // set to 0 to prevent the final compaction. allows the analysis of intermediate results + global int *output_size, // output buffer size, max in input, actual value in output, size should be at least the + global int *wgcnt // counter with workgroups still running +){ + local volatile int seg[2]; // #0:number of segments in local mem, #1 in global mem + local volatile int cnt[2]; // end position of the scan + local volatile short4 lsegments[WORKGROUP_SIZE]; + local short lmatch[WORKGROUP_SIZE]; + local volatile short4 last_segment[1]; + + + int tid = get_local_id(0); // thread id + int gid = get_group_id(0); // group id + int wg = get_local_size(0);// workgroup size + int ng = get_num_groups(0);// number of groups + + int output_block_size = 0; + int output_idx = output_block_size*gid; + int4 seg_ptr = block_ptr[gid]; + int segment_idx = seg_ptr.s0; + int segment_max = seg_ptr.s1; + int local_start = 0; + int global_start = block_size * gid; + int local_stop = min(block_size, input_size - global_start); + if (local_stop<=0){ + if (tid==0)printf("gid %d local_stop: %d \n",gid, local_stop); + return; + } + + int watchdog = (local_stop + wg-1)/wg; //prevent code from running way ! + int res, res2, out_ptr=0, max_out=output_size[0]; + + //copy input to local buffer + for (int i=tid; i1) lsegments[tid] = (short4)(0,0,0,0); + barrier(CLK_LOCAL_MEM_FENCE); + local_start = res; + + } + barrier(CLK_LOCAL_MEM_FENCE); +// if (tid==0)printf("gid %d store final segments\n",gid); + output_idx = store_segments(lsegments, 1, // last segment is treated here + segments, segment_max, segment_idx, global_start, output_idx, local_stop, gid+1==ng, cnt, lmatch); + output_size[gid] = output_idx; + seg_ptr.s1 = ++segment_idx; + seg_ptr.s3 = output_idx; + block_ptr[gid] = seg_ptr; + + barrier(CLK_LOCAL_MEM_FENCE); + barrier(CLK_GLOBAL_MEM_FENCE); + // last group running performs the cumsum and compaction of indices + if (tid==0){ + cnt[0] = (atomic_dec(wgcnt)==1); + } +// barrier(CLK_LOCAL_MEM_FENCE); +// if (cnt[0] && final_compaction){//TODO: redo +// int2 end_ptr = concatenate_segments(block_ptr, // size = number of workgroup launched, contains start and stop position +// segments, // size of the block-size (i.e. 1-8k !wg) / 4 * number of workgroup +// output_size, // output buffer size, max in input, actual value in output +// cnt, // index of segment offset, shared +// last_segment // shared memory with the last segment to share between threads +// ); +// } +} + + +// kernel launched with one block per workgroup. +//If the segment has large litterals, having many threads per group is interesting. + +kernel void LZ4_cmp_stage2(global uchar *input_buffer, // bufffer with data to be compressed + int input_size, // size of the data to be compressed + int block_size, // size of each block + global int4 *block_ptr, // size = numblocks, contains contains the start and end index in segment array and start and end position in the output array + global short4 *segments, // size defined by segment_ptr, constains segments relative to the begining on the block + global uchar *output_buffer, // destination buffer for compressed data + global int *output_size, // size of the destination buffer + int prefix_header // if set, put in header the input buffer size (increases the output_size[0] by 4) +){ + int gid = get_group_id(0); + int tid = get_local_id(0); + int wg = get_local_size(0); + int ng = get_num_groups(0); + int4 segment_range = block_ptr[gid]; + int input_offset = block_size*gid; + int output_offset = segment_range.s2 + (prefix_header) ? 4 : 0; + short4 short_segment; + int4 int_segment; + int r_size = output_size[0]; + + if (prefix_header){ + if ((gid == 0) && (tid==0)){//write + output_buffer[0] = input_size & 0xFF; + output_buffer[1] = (input_size>>8) & 0xFF; + output_buffer[2] = (input_size>>16) & 0xFF; + output_buffer[3] = (input_size>>24) & 0xFF; + } + } + + for (int i=segment_range.s0; i Date: Fri, 25 Aug 2023 15:20:21 +0200 Subject: [PATCH 20/21] almost working --- .../opencl/codec/lz4_compression_int16.cl | 98 +++++++++++++------ 1 file changed, 69 insertions(+), 29 deletions(-) diff --git a/src/silx/resources/opencl/codec/lz4_compression_int16.cl b/src/silx/resources/opencl/codec/lz4_compression_int16.cl index e372f3a258..02613a038f 100644 --- a/src/silx/resources/opencl/codec/lz4_compression_int16.cl +++ b/src/silx/resources/opencl/codec/lz4_compression_int16.cl @@ -113,6 +113,27 @@ inline void cumsum_short(local volatile short *array, } } +// calculate the cumulative sum of element in the array inplace. +inline void cumsum_int(local volatile int *array, + int size){ + int oid, tid = get_local_id(0); + int here, there; + barrier(CLK_LOCAL_MEM_FENCE); + for (int offset = 1; offset < size; offset *= 2){ + here = (tid < size) ? array[tid] : 0; + oid = tid-offset; + there = ((tid < size)&&(oid>=0)) ? array[oid] : 0; + barrier(CLK_LOCAL_MEM_FENCE); + if (tid= offset) + array[tid] = here+there; + else + array[tid] = here; + } + barrier(CLK_LOCAL_MEM_FENCE); + } +} + /* ***************************************************************************************************** * Compact litterals and matches into segments containing a litteral and a match section (non null) @@ -604,6 +625,30 @@ inline int2 concatenate_segments( return (int2) (shared_idx[0], shared_idx[1]); } // end concatenate_segments + +/* function to perform the cumsum at the end: + * - make the cumulative sum of start-stop indexes for outgoing buffer position in block_ptr + */ +inline void wrap_up(global int *output_size, // size = number of workgroup launched, i.e. number of LZ4-blocks. contains, start+end segment, start+end write + local int *temp + ){ + int tid = get_local_id(0); + int ng = get_num_groups(0); + int wg = get_local_size(0); + int max_iter = (ng+wg-1)/wg; + int prefix = 0; + int pos = tid; + for (int i=0; i Date: Fri, 25 Aug 2023 17:15:32 +0200 Subject: [PATCH 21/21] add event --- src/silx/opencl/codec/bitshuffle_lz4.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/silx/opencl/codec/bitshuffle_lz4.py b/src/silx/opencl/codec/bitshuffle_lz4.py index 9eaa0d5b37..008723ee2c 100644 --- a/src/silx/opencl/codec/bitshuffle_lz4.py +++ b/src/silx/opencl/codec/bitshuffle_lz4.py @@ -34,7 +34,7 @@ __contact__ = "jerome.kieffer@esrf.eu" __license__ = "MIT" __copyright__ = "European Synchrotron Radiation Facility, Grenoble, France" -__date__ = "18/08/2023" +__date__ = "22/08/2023" __status__ = "production" import time @@ -371,14 +371,15 @@ def test_lz4_writing(data, segments, workgroup_size=32, prepend_header=False, output_size_d = cla.to_device(queue, numpy.array([output_d.nbytes, 0], "int32")) t2 = time.perf_counter_ns() - prg.LZ4_cmp_stage2(queue, (workgroup_size * num_workgroup,), (workgroup_size,), + evt = prg.LZ4_cmp_stage2(queue, (workgroup_size * num_workgroup,), (workgroup_size,), data_d.data, numpy.int32(data_size), segment_posd.data, segments_d.data, output_d.data, output_size_d.data, numpy.int32(prepend_header) - ).wait() + ) + evt.wait() t3 = time.perf_counter_ns() buffer_size = output_size_d.get()[1] compressed = output_d.get()[:buffer_size] @@ -388,6 +389,8 @@ def test_lz4_writing(data, segments, workgroup_size=32, prepend_header=False, performances["opencl_compilation"] = (t1a - t1) * 1e-6 performances["opencl_setup"] = (t2 - t1a) * 1e-6 performances["opencl_run"] = (t3 - t2) * 1e-6 + if profile: + performances["opencl_run_profile"] = 1e-6 * (evt.profile.end - evt.profile.start) performances["opencl_retrieve"] = (t4 - t3) * 1e-6 print(json.dumps(performances, indent=2)) return compressed