Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-33278 Fix //version race condition in parquetType.ecl #19440

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 55 additions & 48 deletions testing/regress/ecl/parquetTypes.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
//version compressionType='LZ4'
//version compressionType='ZSTD'

import STD.File AS FileServices;
import ^ as root;
compressionType := #IFDEFINED(root.compressionType, 'UNCOMPRESSED');

IMPORT Std;
IMPORT Parquet;

dropzoneDirectory := Std.File.GetDefaultDropZone();
dropzoneDirectory := Std.File.GetDefaultDropZone() + '/regress/parquet/' + WORKUNIT + '-';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am just pondering about is this name (dropzoneDirectory) still correct or not, because now it contains not only the default DZ path but a file path and prefix as well. I feel it is a bit misleading, but hopefully not a big deal.


// ======================== BOOLEAN ========================

Expand All @@ -39,9 +40,7 @@ booleanDatasetOut := DATASET([
{001, 'aab', FALSE}
], booleanRecord);

ParquetIO.Write(booleanDatasetOut, dropzoneDirectory + '/BooleanTest.parquet', TRUE, compressionType);

booleanDatasetIn := ParquetIO.Read(booleanRecord, dropzoneDirectory + '/BooleanTest.parquet');
booleanDatasetIn := ParquetIO.Read(booleanRecord, dropzoneDirectory + 'BooleanTest.parquet');

{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} booleanJoin (booleanDatasetOut a, booleanDatasetIn b) := TRANSFORM
SELF.testid := a.testid;
Expand Down Expand Up @@ -71,9 +70,7 @@ integerDatasetOut := DATASET([
{20, 'ack', 13, 114, 'ack', 13}
], integerRecord);

ParquetIO.Write(integerDatasetOut, dropzoneDirectory + '/IntegerTest.parquet', TRUE, compressionType);

integerDatasetIn := ParquetIO.Read(integerRecord, dropzoneDirectory + '/IntegerTest.parquet');
integerDatasetIn := ParquetIO.Read(integerRecord, dropzoneDirectory + 'IntegerTest.parquet');

{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} integerJoin (integerDatasetOut a, integerDatasetIn b) := TRANSFORM
SELF.testid := a.testid;
Expand All @@ -98,9 +95,7 @@ unsignedDatasetOut := DATASET([
{25, 'aff', (UNSIGNED8)18446744073709551615}
], unsignedRecord);

ParquetIO.Write(unsignedDatasetOut, dropzoneDirectory + '/UnsignedTest.parquet', TRUE, compressionType);

unsignedDatasetIn := ParquetIO.Read(unsignedRecord, dropzoneDirectory + '/UnsignedTest.parquet');
unsignedDatasetIn := ParquetIO.Read(unsignedRecord, dropzoneDirectory + 'UnsignedTest.parquet');

{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} unsignedJoin (unsignedDatasetOut a, unsignedDatasetIn b) := TRANSFORM
SELF.testid := a.testid;
Expand Down Expand Up @@ -128,9 +123,7 @@ realDatasetOut := DATASET([
{39, 'afi', (REAL4)3.14159}
], realRecord);

ParquetIO.Write(realDatasetOut, dropzoneDirectory + '/RealTest.parquet', TRUE, compressionType);

realDatasetIn := ParquetIO.Read(realRecord, dropzoneDirectory + '/RealTest.parquet');
realDatasetIn := ParquetIO.Read(realRecord, dropzoneDirectory + 'RealTest.parquet');

{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} realJoin (realDatasetOut a, realDatasetIn b) := TRANSFORM
SELF.testid := a.testid;
Expand All @@ -152,9 +145,7 @@ decimalDatasetOut := DATASET([
{042, 'abb', 0.00D}
], decimalRecord);

ParquetIO.Write(decimalDatasetOut, dropzoneDirectory + '/DecimalTest.parquet', TRUE, compressionType);

decimalDatasetIn := ParquetIO.Read(decimalRecord, dropzoneDirectory + '/DecimalTest.parquet');
decimalDatasetIn := ParquetIO.Read(decimalRecord, dropzoneDirectory + 'DecimalTest.parquet');

{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} decimalJoin (decimalDatasetOut a, decimalDatasetIn b) := TRANSFORM
SELF.testid := a.testid;
Expand Down Expand Up @@ -182,9 +173,7 @@ stringDatasetOut := DATASET([
{'adt', '[\'A\',\'B\',\'C\',\'D\',\'E\']'}
], stringRecord);

ParquetIO.Write(stringDatasetOut, dropzoneDirectory + '/StringTest.parquet', TRUE, compressionType);

stringDatasetIn := ParquetIO.Read(stringRecord, dropzoneDirectory + '/StringTest.parquet');
stringDatasetIn := ParquetIO.Read(stringRecord, dropzoneDirectory + 'StringTest.parquet');

{STRING5 name, BOOLEAN value} stringJoin (stringDatasetOut a, stringDatasetIn b) := TRANSFORM
SELF.name := a.name;
Expand Down Expand Up @@ -218,9 +207,7 @@ dataDatasetOut := DATASET([
{'neg', X'0000000000000000', REALToBinary(-2.71828), REALToLargeBinary(-2.71828)}
], dataRecord);

ParquetIO.Write(dataDatasetOut, dropzoneDirectory + '/DataTest.parquet', TRUE, compressionType);

dataDatasetIn := ParquetIO.Read(dataRecord, dropzoneDirectory + '/DataTest.parquet');
dataDatasetIn := ParquetIO.Read(dataRecord, dropzoneDirectory + 'DataTest.parquet');

{STRING5 name, BOOLEAN value1, BOOLEAN value2, BOOLEAN value3, BOOLEAN overallValue} dataJoin(dataDatasetOut a, dataDatasetIn b) := TRANSFORM
SELF.name := a.name;
Expand All @@ -243,9 +230,7 @@ varStringDatasetOut := DATASET([
{072, 'abo', U'UTF8_测试'}
], varstringRecord);

ParquetIO.Write(varStringDatasetOut, dropzoneDirectory + '/VarStringTest.parquet', TRUE, compressionType);

varStringDatasetIn := ParquetIO.Read(varstringRecord, dropzoneDirectory + '/VarStringTest.parquet');
varStringDatasetIn := ParquetIO.Read(varstringRecord, dropzoneDirectory + 'VarStringTest.parquet');

{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} varstringJoin (varStringDatasetOut a, varStringDatasetIn b) := TRANSFORM
SELF.testid := a.testid;
Expand All @@ -267,9 +252,7 @@ qStringDatasetOut := DATASET([
{082, 'abt', U'Special_字符'}
], qstringRecord);

ParquetIO.Write(qStringDatasetOut, dropzoneDirectory + '/QStringTest.parquet', TRUE, compressionType);

qStringDatasetIn := ParquetIO.Read(qstringRecord, dropzoneDirectory + '/QStringTest.parquet');
qStringDatasetIn := ParquetIO.Read(qstringRecord, dropzoneDirectory + 'QStringTest.parquet');

{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} qstringJoin (qStringDatasetOut a, qStringDatasetIn b) := TRANSFORM
SELF.testid := a.testid;
Expand All @@ -291,9 +274,7 @@ utf8DatasetOut := DATASET([
{092, 'abw', U''}
], utf8Record);

ParquetIO.Write(utf8DatasetOut, dropzoneDirectory + '/UTF8Test.parquet', TRUE, compressionType);

utf8DatasetIn := ParquetIO.Read(utf8Record, dropzoneDirectory + '/UTF8Test.parquet');
utf8DatasetIn := ParquetIO.Read(utf8Record, dropzoneDirectory + 'UTF8Test.parquet');

{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} utf8Join (utf8DatasetOut a, utf8DatasetIn b) := TRANSFORM
SELF.testid := a.testid;
Expand All @@ -320,11 +301,8 @@ setOfUnicodeDatasetOut := DATASET([
{105, 'ady', [U'☀', U'☁', U'☂', U'☃', U'☄']}
], setOfUnicodeRecord);

ParquetIO.Write(unicodeDatasetOut, dropzoneDirectory + '/UnicodeTest.parquet', TRUE, compressionType);
ParquetIO.Write(setOfUnicodeDatasetOut, dropzoneDirectory + '/SetOfUnicodeTest.parquet', TRUE, compressionType);

unicodeDatasetIn := ParquetIO.Read(unicodeRecord, dropzoneDirectory + '/UnicodeTest.parquet');
setOfUnicodeDatasetIn := ParquetIO.Read(setOfUnicodeRecord, dropzoneDirectory + '/SetOfUnicodeTest.parquet');
unicodeDatasetIn := ParquetIO.Read(unicodeRecord, dropzoneDirectory + 'UnicodeTest.parquet');
setOfUnicodeDatasetIn := ParquetIO.Read(setOfUnicodeRecord, dropzoneDirectory + 'SetOfUnicodeTest.parquet');

{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} unicodeJoin(unicodeDatasetOut a, unicodeDatasetIn b) := TRANSFORM
SELF.testid := a.testid;
Expand All @@ -335,16 +313,45 @@ END;
unicodeCompareResult := JOIN(unicodeDatasetOut, unicodeDatasetIn, LEFT.testid = RIGHT.testid, unicodeJoin(LEFT, RIGHT), ALL);
unicodeResult := IF(COUNT(unicodeCompareResult(isEqual = FALSE)) = 0, 'Pass', 'Fail');

PARALLEL(
OUTPUT(booleanResult, NAMED('BooleanTest'), OVERWRITE),
OUTPUT(integerResult, NAMED('IntegerTest'), OVERWRITE),
OUTPUT(unsignedResult, NAMED('UnsignedTest'), OVERWRITE),
OUTPUT(realResult, NAMED('RealTest'), OVERWRITE),
OUTPUT(decimalResult, NAMED('DecimalTest'), OVERWRITE),
OUTPUT(stringResult, NAMED('StringTest'), OVERWRITE),
OUTPUT(dataResult, NAMED('DataAsStringTest'), OVERWRITE),
OUTPUT(varStringResult, NAMED('VarStringTest'), OVERWRITE),
OUTPUT(qStringResult, NAMED('QStringTest'), OVERWRITE),
OUTPUT(utf8Result, NAMED('UTF8Test'), OVERWRITE),
OUTPUT(unicodeResult, NAMED('UnicodeTest'), OVERWRITE)
SEQUENTIAL(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you reorganise the write file then read it back parallel for each type from previous version to this parallel write and sequential read? Is the ParquetIO.Read() not thread safe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was because I was seeing some weird behavior with the SEQUENTIAL call. Before I explicitly ordered the Parquet.Write actions to come before the OUTPUTs, the DeleteExternalFile actions would occur first despite coming after the OUTPUTs in the list of SEQUENTIAL arguments. This meant the cleanup of the files happened before they were written and read from leaving behind every file that was used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Read is thread safe, but the reason for the sequential read is because the result order needs to be the same to match the key file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I see. Fair enough.

This Morning I found 2 more errors related to parquetType test:

557. parquetTypes(compressionType='GZip') 	Error: 0: parquet: IOError: Couldn't deserialize thrift: TProtocolException: Invalid data
558. parquetTypes(compressionType='Brotli')	Error: 0: parquet: IOError: Couldn't deserialize thrift: TProtocolException: Invalid data

What do you think about are those errors related to versions race condition as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe so. I think that would be the result of reading from a file while something is in the middle of writing a record batch to it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, thanks

// Set up test files
PARALLEL(
ParquetIO.Write(booleanDatasetOut, dropzoneDirectory + 'BooleanTest.parquet', TRUE, compressionType),
ParquetIO.Write(integerDatasetOut, dropzoneDirectory + 'IntegerTest.parquet', TRUE, compressionType),
ParquetIO.Write(unsignedDatasetOut, dropzoneDirectory + 'UnsignedTest.parquet', TRUE, compressionType),
ParquetIO.Write(realDatasetOut, dropzoneDirectory + 'RealTest.parquet', TRUE, compressionType),
ParquetIO.Write(decimalDatasetOut, dropzoneDirectory + 'DecimalTest.parquet', TRUE, compressionType),
ParquetIO.Write(stringDatasetOut, dropzoneDirectory + 'StringTest.parquet', TRUE, compressionType),
ParquetIO.Write(dataDatasetOut, dropzoneDirectory + 'DataTest.parquet', TRUE, compressionType),
ParquetIO.Write(varStringDatasetOut, dropzoneDirectory + 'VarStringTest.parquet', TRUE, compressionType),
ParquetIO.Write(qStringDatasetOut, dropzoneDirectory + 'QStringTest.parquet', TRUE, compressionType),
ParquetIO.Write(utf8DatasetOut, dropzoneDirectory + 'UTF8Test.parquet', TRUE, compressionType),
ParquetIO.Write(unicodeDatasetOut, dropzoneDirectory + 'UnicodeTest.parquet', TRUE, compressionType),
),
// Read and compare results
OUTPUT(booleanResult, NAMED('BooleanTest')),
OUTPUT(integerResult, NAMED('IntegerTest')),
OUTPUT(unsignedResult, NAMED('UnsignedTest')),
OUTPUT(realResult, NAMED('RealTest')),
OUTPUT(decimalResult, NAMED('DecimalTest')),
OUTPUT(stringResult, NAMED('StringTest')),
OUTPUT(dataResult, NAMED('DataAsStringTest')),
OUTPUT(varStringResult, NAMED('VarStringTest')),
OUTPUT(qStringResult, NAMED('QStringTest')),
OUTPUT(utf8Result, NAMED('UTF8Test')),
OUTPUT(unicodeResult, NAMED('UnicodeTest')),
// Clean up temporary files
PARALLEL(
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'BooleanTest.parquet'),
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'IntegerTest.parquet'),
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'UnsignedTest.parquet'),
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'RealTest.parquet'),
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'DecimalTest.parquet'),
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'StringTest.parquet'),
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'DataTest.parquet'),
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'VarStringTest.parquet'),
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'QStringTest.parquet'),
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'UTF8Test.parquet'),
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'UnicodeTest.parquet'),
)
);
Loading