-
Notifications
You must be signed in to change notification settings - Fork 304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HPCC-33278 Fix //version race condition in parquetType.ecl #19440
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,13 +22,14 @@ | |
//version compressionType='LZ4' | ||
//version compressionType='ZSTD' | ||
|
||
import STD.File AS FileServices; | ||
import ^ as root; | ||
compressionType := #IFDEFINED(root.compressionType, 'UNCOMPRESSED'); | ||
|
||
IMPORT Std; | ||
IMPORT Parquet; | ||
|
||
dropzoneDirectory := Std.File.GetDefaultDropZone(); | ||
dropzoneDirectory := Std.File.GetDefaultDropZone() + '/regress/parquet/' + WORKUNIT + '-'; | ||
|
||
// ======================== BOOLEAN ======================== | ||
|
||
|
@@ -39,9 +40,7 @@ booleanDatasetOut := DATASET([ | |
{001, 'aab', FALSE} | ||
], booleanRecord); | ||
|
||
ParquetIO.Write(booleanDatasetOut, dropzoneDirectory + '/BooleanTest.parquet', TRUE, compressionType); | ||
|
||
booleanDatasetIn := ParquetIO.Read(booleanRecord, dropzoneDirectory + '/BooleanTest.parquet'); | ||
booleanDatasetIn := ParquetIO.Read(booleanRecord, dropzoneDirectory + 'BooleanTest.parquet'); | ||
|
||
{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} booleanJoin (booleanDatasetOut a, booleanDatasetIn b) := TRANSFORM | ||
SELF.testid := a.testid; | ||
|
@@ -71,9 +70,7 @@ integerDatasetOut := DATASET([ | |
{20, 'ack', 13, 114, 'ack', 13} | ||
], integerRecord); | ||
|
||
ParquetIO.Write(integerDatasetOut, dropzoneDirectory + '/IntegerTest.parquet', TRUE, compressionType); | ||
|
||
integerDatasetIn := ParquetIO.Read(integerRecord, dropzoneDirectory + '/IntegerTest.parquet'); | ||
integerDatasetIn := ParquetIO.Read(integerRecord, dropzoneDirectory + 'IntegerTest.parquet'); | ||
|
||
{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} integerJoin (integerDatasetOut a, integerDatasetIn b) := TRANSFORM | ||
SELF.testid := a.testid; | ||
|
@@ -98,9 +95,7 @@ unsignedDatasetOut := DATASET([ | |
{25, 'aff', (UNSIGNED8)18446744073709551615} | ||
], unsignedRecord); | ||
|
||
ParquetIO.Write(unsignedDatasetOut, dropzoneDirectory + '/UnsignedTest.parquet', TRUE, compressionType); | ||
|
||
unsignedDatasetIn := ParquetIO.Read(unsignedRecord, dropzoneDirectory + '/UnsignedTest.parquet'); | ||
unsignedDatasetIn := ParquetIO.Read(unsignedRecord, dropzoneDirectory + 'UnsignedTest.parquet'); | ||
|
||
{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} unsignedJoin (unsignedDatasetOut a, unsignedDatasetIn b) := TRANSFORM | ||
SELF.testid := a.testid; | ||
|
@@ -128,9 +123,7 @@ realDatasetOut := DATASET([ | |
{39, 'afi', (REAL4)3.14159} | ||
], realRecord); | ||
|
||
ParquetIO.Write(realDatasetOut, dropzoneDirectory + '/RealTest.parquet', TRUE, compressionType); | ||
|
||
realDatasetIn := ParquetIO.Read(realRecord, dropzoneDirectory + '/RealTest.parquet'); | ||
realDatasetIn := ParquetIO.Read(realRecord, dropzoneDirectory + 'RealTest.parquet'); | ||
|
||
{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} realJoin (realDatasetOut a, realDatasetIn b) := TRANSFORM | ||
SELF.testid := a.testid; | ||
|
@@ -152,9 +145,7 @@ decimalDatasetOut := DATASET([ | |
{042, 'abb', 0.00D} | ||
], decimalRecord); | ||
|
||
ParquetIO.Write(decimalDatasetOut, dropzoneDirectory + '/DecimalTest.parquet', TRUE, compressionType); | ||
|
||
decimalDatasetIn := ParquetIO.Read(decimalRecord, dropzoneDirectory + '/DecimalTest.parquet'); | ||
decimalDatasetIn := ParquetIO.Read(decimalRecord, dropzoneDirectory + 'DecimalTest.parquet'); | ||
|
||
{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} decimalJoin (decimalDatasetOut a, decimalDatasetIn b) := TRANSFORM | ||
SELF.testid := a.testid; | ||
|
@@ -182,9 +173,7 @@ stringDatasetOut := DATASET([ | |
{'adt', '[\'A\',\'B\',\'C\',\'D\',\'E\']'} | ||
], stringRecord); | ||
|
||
ParquetIO.Write(stringDatasetOut, dropzoneDirectory + '/StringTest.parquet', TRUE, compressionType); | ||
|
||
stringDatasetIn := ParquetIO.Read(stringRecord, dropzoneDirectory + '/StringTest.parquet'); | ||
stringDatasetIn := ParquetIO.Read(stringRecord, dropzoneDirectory + 'StringTest.parquet'); | ||
|
||
{STRING5 name, BOOLEAN value} stringJoin (stringDatasetOut a, stringDatasetIn b) := TRANSFORM | ||
SELF.name := a.name; | ||
|
@@ -218,9 +207,7 @@ dataDatasetOut := DATASET([ | |
{'neg', X'0000000000000000', REALToBinary(-2.71828), REALToLargeBinary(-2.71828)} | ||
], dataRecord); | ||
|
||
ParquetIO.Write(dataDatasetOut, dropzoneDirectory + '/DataTest.parquet', TRUE, compressionType); | ||
|
||
dataDatasetIn := ParquetIO.Read(dataRecord, dropzoneDirectory + '/DataTest.parquet'); | ||
dataDatasetIn := ParquetIO.Read(dataRecord, dropzoneDirectory + 'DataTest.parquet'); | ||
|
||
{STRING5 name, BOOLEAN value1, BOOLEAN value2, BOOLEAN value3, BOOLEAN overallValue} dataJoin(dataDatasetOut a, dataDatasetIn b) := TRANSFORM | ||
SELF.name := a.name; | ||
|
@@ -243,9 +230,7 @@ varStringDatasetOut := DATASET([ | |
{072, 'abo', U'UTF8_测试'} | ||
], varstringRecord); | ||
|
||
ParquetIO.Write(varStringDatasetOut, dropzoneDirectory + '/VarStringTest.parquet', TRUE, compressionType); | ||
|
||
varStringDatasetIn := ParquetIO.Read(varstringRecord, dropzoneDirectory + '/VarStringTest.parquet'); | ||
varStringDatasetIn := ParquetIO.Read(varstringRecord, dropzoneDirectory + 'VarStringTest.parquet'); | ||
|
||
{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} varstringJoin (varStringDatasetOut a, varStringDatasetIn b) := TRANSFORM | ||
SELF.testid := a.testid; | ||
|
@@ -267,9 +252,7 @@ qStringDatasetOut := DATASET([ | |
{082, 'abt', U'Special_字符'} | ||
], qstringRecord); | ||
|
||
ParquetIO.Write(qStringDatasetOut, dropzoneDirectory + '/QStringTest.parquet', TRUE, compressionType); | ||
|
||
qStringDatasetIn := ParquetIO.Read(qstringRecord, dropzoneDirectory + '/QStringTest.parquet'); | ||
qStringDatasetIn := ParquetIO.Read(qstringRecord, dropzoneDirectory + 'QStringTest.parquet'); | ||
|
||
{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} qstringJoin (qStringDatasetOut a, qStringDatasetIn b) := TRANSFORM | ||
SELF.testid := a.testid; | ||
|
@@ -291,9 +274,7 @@ utf8DatasetOut := DATASET([ | |
{092, 'abw', U''} | ||
], utf8Record); | ||
|
||
ParquetIO.Write(utf8DatasetOut, dropzoneDirectory + '/UTF8Test.parquet', TRUE, compressionType); | ||
|
||
utf8DatasetIn := ParquetIO.Read(utf8Record, dropzoneDirectory + '/UTF8Test.parquet'); | ||
utf8DatasetIn := ParquetIO.Read(utf8Record, dropzoneDirectory + 'UTF8Test.parquet'); | ||
|
||
{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} utf8Join (utf8DatasetOut a, utf8DatasetIn b) := TRANSFORM | ||
SELF.testid := a.testid; | ||
|
@@ -320,11 +301,8 @@ setOfUnicodeDatasetOut := DATASET([ | |
{105, 'ady', [U'☀', U'☁', U'☂', U'☃', U'☄']} | ||
], setOfUnicodeRecord); | ||
|
||
ParquetIO.Write(unicodeDatasetOut, dropzoneDirectory + '/UnicodeTest.parquet', TRUE, compressionType); | ||
ParquetIO.Write(setOfUnicodeDatasetOut, dropzoneDirectory + '/SetOfUnicodeTest.parquet', TRUE, compressionType); | ||
|
||
unicodeDatasetIn := ParquetIO.Read(unicodeRecord, dropzoneDirectory + '/UnicodeTest.parquet'); | ||
setOfUnicodeDatasetIn := ParquetIO.Read(setOfUnicodeRecord, dropzoneDirectory + '/SetOfUnicodeTest.parquet'); | ||
unicodeDatasetIn := ParquetIO.Read(unicodeRecord, dropzoneDirectory + 'UnicodeTest.parquet'); | ||
setOfUnicodeDatasetIn := ParquetIO.Read(setOfUnicodeRecord, dropzoneDirectory + 'SetOfUnicodeTest.parquet'); | ||
|
||
{UNSIGNED testid, STRING3 testname, BOOLEAN isEqual} unicodeJoin(unicodeDatasetOut a, unicodeDatasetIn b) := TRANSFORM | ||
SELF.testid := a.testid; | ||
|
@@ -335,16 +313,45 @@ END; | |
unicodeCompareResult := JOIN(unicodeDatasetOut, unicodeDatasetIn, LEFT.testid = RIGHT.testid, unicodeJoin(LEFT, RIGHT), ALL); | ||
unicodeResult := IF(COUNT(unicodeCompareResult(isEqual = FALSE)) = 0, 'Pass', 'Fail'); | ||
|
||
PARALLEL( | ||
OUTPUT(booleanResult, NAMED('BooleanTest'), OVERWRITE), | ||
OUTPUT(integerResult, NAMED('IntegerTest'), OVERWRITE), | ||
OUTPUT(unsignedResult, NAMED('UnsignedTest'), OVERWRITE), | ||
OUTPUT(realResult, NAMED('RealTest'), OVERWRITE), | ||
OUTPUT(decimalResult, NAMED('DecimalTest'), OVERWRITE), | ||
OUTPUT(stringResult, NAMED('StringTest'), OVERWRITE), | ||
OUTPUT(dataResult, NAMED('DataAsStringTest'), OVERWRITE), | ||
OUTPUT(varStringResult, NAMED('VarStringTest'), OVERWRITE), | ||
OUTPUT(qStringResult, NAMED('QStringTest'), OVERWRITE), | ||
OUTPUT(utf8Result, NAMED('UTF8Test'), OVERWRITE), | ||
OUTPUT(unicodeResult, NAMED('UnicodeTest'), OVERWRITE) | ||
SEQUENTIAL( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did you reorganise the write file then read it back parallel for each type from previous version to this parallel write and sequential read? Is the ParquetIO.Read() not thread safe? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was because I was seeing some weird behavior with the SEQUENTIAL call. Before I explicitly ordered the Parquet.Write actions to come before the OUTPUTs, the DeleteExternalFile actions would occur first despite coming after the OUTPUTs in the list of SEQUENTIAL arguments. This meant the cleanup of the files happened before they were written and read from leaving behind every file that was used. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Read is thread safe, but the reason for the sequential read is because the result order needs to be the same to match the key file. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, I see. Fair enough. This Morning I found 2 more errors related to parquetType test:
What do you think about are those errors related to versions race condition as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe so. I think that would be the result of reading from a file while something is in the middle of writing a record batch to it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, thanks |
||
// Set up test files | ||
PARALLEL( | ||
ParquetIO.Write(booleanDatasetOut, dropzoneDirectory + 'BooleanTest.parquet', TRUE, compressionType), | ||
ParquetIO.Write(integerDatasetOut, dropzoneDirectory + 'IntegerTest.parquet', TRUE, compressionType), | ||
ParquetIO.Write(unsignedDatasetOut, dropzoneDirectory + 'UnsignedTest.parquet', TRUE, compressionType), | ||
ParquetIO.Write(realDatasetOut, dropzoneDirectory + 'RealTest.parquet', TRUE, compressionType), | ||
ParquetIO.Write(decimalDatasetOut, dropzoneDirectory + 'DecimalTest.parquet', TRUE, compressionType), | ||
ParquetIO.Write(stringDatasetOut, dropzoneDirectory + 'StringTest.parquet', TRUE, compressionType), | ||
ParquetIO.Write(dataDatasetOut, dropzoneDirectory + 'DataTest.parquet', TRUE, compressionType), | ||
ParquetIO.Write(varStringDatasetOut, dropzoneDirectory + 'VarStringTest.parquet', TRUE, compressionType), | ||
ParquetIO.Write(qStringDatasetOut, dropzoneDirectory + 'QStringTest.parquet', TRUE, compressionType), | ||
ParquetIO.Write(utf8DatasetOut, dropzoneDirectory + 'UTF8Test.parquet', TRUE, compressionType), | ||
ParquetIO.Write(unicodeDatasetOut, dropzoneDirectory + 'UnicodeTest.parquet', TRUE, compressionType), | ||
), | ||
// Read and compare results | ||
OUTPUT(booleanResult, NAMED('BooleanTest')), | ||
OUTPUT(integerResult, NAMED('IntegerTest')), | ||
OUTPUT(unsignedResult, NAMED('UnsignedTest')), | ||
OUTPUT(realResult, NAMED('RealTest')), | ||
OUTPUT(decimalResult, NAMED('DecimalTest')), | ||
OUTPUT(stringResult, NAMED('StringTest')), | ||
OUTPUT(dataResult, NAMED('DataAsStringTest')), | ||
OUTPUT(varStringResult, NAMED('VarStringTest')), | ||
OUTPUT(qStringResult, NAMED('QStringTest')), | ||
OUTPUT(utf8Result, NAMED('UTF8Test')), | ||
OUTPUT(unicodeResult, NAMED('UnicodeTest')), | ||
// Clean up temporary files | ||
PARALLEL( | ||
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'BooleanTest.parquet'), | ||
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'IntegerTest.parquet'), | ||
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'UnsignedTest.parquet'), | ||
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'RealTest.parquet'), | ||
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'DecimalTest.parquet'), | ||
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'StringTest.parquet'), | ||
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'DataTest.parquet'), | ||
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'VarStringTest.parquet'), | ||
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'QStringTest.parquet'), | ||
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'UTF8Test.parquet'), | ||
FileServices.DeleteExternalFile('.', dropzoneDirectory + 'UnicodeTest.parquet'), | ||
) | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am just pondering about is this name (dropzoneDirectory) still correct or not, because now it contains not only the default DZ path but a file path and prefix as well. I feel it is a bit misleading, but hopefully not a big deal.