[HUDI-6825] Use UTF_8 to encode String to byte array in all places (#9634)

Unify the encoding of Java `String` to byte arrays in Hudi,
especially for bytes written to storage,
by using `UTF_8` encoding only.

---------

Co-authored-by: Sagar Sumit <[email protected]>
yihua and codope authored Sep 12, 2023
1 parent 0fda680 commit fe71659
Showing 109 changed files with 416 additions and 333 deletions.
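For reference while reading the hunks below: every rewritten call site routes through `org.apache.hudi.common.util.StringUtils.getUTF8Bytes`. Here is a minimal sketch of what such a helper presumably looks like; the real Hudi class may add null handling and other utilities, so treat this as an assumption rather than the actual implementation.

import java.nio.charset.StandardCharsets;

// Sketch only: approximates the helper that this commit standardizes on.
// The real org.apache.hudi.common.util.StringUtils may differ in details.
public final class StringUtils {

  public static final String EMPTY_STRING = "";

  private StringUtils() {
    // utility class, no instances
  }

  // Encode a String with an explicit UTF-8 charset instead of the JVM default charset.
  public static byte[] getUTF8Bytes(String str) {
    return str.getBytes(StandardCharsets.UTF_8);
  }
}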
@@ -52,6 +52,7 @@
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

/**
* CLI command to display hudi table options.
@@ -261,7 +262,7 @@ private static void writeToFile(String filePath, String data) throws IOException
OutputStream os = null;
try {
os = new FileOutputStream(outFile);
- os.write(data.getBytes(), 0, data.length());
+ os.write(getUTF8Bytes(data), 0, data.length());
} finally {
os.close();
}
@@ -18,9 +18,6 @@

package org.apache.hudi.cli.integ;

- import org.apache.avro.generic.GenericRecord;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.Path;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.TableCommand;
import org.apache.hudi.cli.testutils.HoodieCLIIntegrationTestBase;
@@ -33,6 +30,10 @@
import org.apache.hudi.utilities.HDFSParquetImporter;
import org.apache.hudi.utilities.functional.TestHDFSParquetImporter;
import org.apache.hudi.utilities.functional.TestHDFSParquetImporter.HoodieTripModel;

+ import org.apache.avro.generic.GenericRecord;
+ import org.apache.hadoop.fs.FSDataOutputStream;
+ import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
@@ -49,6 +50,7 @@
import java.util.List;
import java.util.stream.Collectors;

+ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -81,7 +83,7 @@ public void init() throws IOException, ParseException {

// create schema file
try (FSDataOutputStream schemaFileOS = fs.create(new Path(schemaFile))) {
- schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes());
+ schemaFileOS.write(getUTF8Bytes(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
}

importer = new TestHDFSParquetImporter();
@@ -32,7 +32,6 @@
import org.apache.hadoop.fs.Path;

import java.io.IOException;
- import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -42,6 +41,7 @@

import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
import static org.apache.hudi.common.util.CollectionUtils.createImmutableList;
+ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

/**
* Class to be used in tests to keep generating test inserts and updates against a corpus.
@@ -114,7 +114,7 @@ public static void createCommitFileWithMetadata(String basePath, String commitTi
static void createFileWithMetadata(String basePath, Configuration configuration, String name, String content) throws IOException {
Path commitFilePath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + name);
try (FSDataOutputStream os = FSUtils.getFs(basePath, configuration).create(commitFilePath, true)) {
- os.writeBytes(new String(content.getBytes(StandardCharsets.UTF_8)));
+ os.writeBytes(new String(getUTF8Bytes(content)));
}
}

@@ -72,7 +72,6 @@
import javax.annotation.Nullable;

import java.io.IOException;
- import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -85,6 +84,7 @@
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.metadata.HoodieTableMetadata.isMetadataTable;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit;

@@ -500,7 +500,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,

table.getActiveTimeline().transitionReplaceInflightToComplete(
clusteringInstant,
- Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ Option.of(getUTF8Bytes(metadata.toJsonString())));
} catch (Exception e) {
throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
} finally {
@@ -93,7 +93,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
- import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -106,6 +105,7 @@

import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
+ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;

/**
@@ -284,7 +284,7 @@ protected void commit(HoodieTable table, String commitActionType, String instant
// update Metadata table
writeTableMetadata(table, instantTime, metadata, writeStatuses);
activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
- Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ Option.of(getUTF8Bytes(metadata.toJsonString())));
}

// Save internal schema
@@ -1542,7 +1542,7 @@ private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient m
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
metadata.setOperationType(WriteOperationType.ALTER_SCHEMA);
try {
- timeLine.transitionRequestedToInflight(requested, Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ timeLine.transitionRequestedToInflight(requested, Option.of(getUTF8Bytes(metadata.toJsonString())));
} catch (IOException io) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
}
@@ -49,7 +49,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
- import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -59,6 +58,8 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

+ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

/**
* A timeline writer which organizes the files as an LSM tree.
*/
@@ -158,7 +159,7 @@ public void updateManifest(List<String> filesToRemove, String fileToAdd) throws
}

private void createManifestFile(HoodieLSMTimelineManifest manifest, int currentVersion) throws IOException {
- byte[] content = manifest.toJsonString().getBytes(StandardCharsets.UTF_8);
+ byte[] content = getUTF8Bytes(manifest.toJsonString());
// version starts from 1 and increases monotonically
int newVersion = currentVersion < 0 ? 1 : currentVersion + 1;
// create manifest file
@@ -169,7 +170,7 @@ private void createManifestFile(HoodieLSMTimelineManifest manifest, int currentV
}

private void updateVersionFile(int newVersion) throws IOException {
- byte[] content = (String.valueOf(newVersion)).getBytes(StandardCharsets.UTF_8);
+ byte[] content = getUTF8Bytes(String.valueOf(newVersion));
final Path versionFilePath = LSMTimeline.getVersionFilePath(metaClient);
metaClient.getFs().delete(versionFilePath, false);
metaClient.getFs().createImmutableFileInPath(versionFilePath, Option.of(content));
@@ -48,7 +48,6 @@

import java.io.IOException;
import java.io.Serializable;
- import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
@@ -57,6 +56,8 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

+ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

/**
* Tools used for migrating to new LSM tree style archived timeline.
*/
@@ -95,7 +96,7 @@ private Pair<HoodieInstant, Option<byte[]>> readInstant(GenericRecord record) {
if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
return HoodieAvroUtils.indexedRecordToBytes((IndexedRecord) actionData);
} else {
- return actionData.toString().getBytes(StandardCharsets.UTF_8);
+ return getUTF8Bytes(actionData.toString());
}
}
return null;
@@ -53,6 +53,7 @@
import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX;
import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX;
import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.getTimestampFromFile;
+ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

/**
* Utilities class for consistent bucket index metadata management.
@@ -208,7 +209,7 @@ private static void createCommitMarker(HoodieTable table, Path fileStatus, Path
if (fs.exists(fullPath)) {
return;
}
- FileIOUtils.createFileInPath(fs, fullPath, Option.of(StringUtils.EMPTY_STRING.getBytes()));
+ FileIOUtils.createFileInPath(fs, fullPath, Option.of(getUTF8Bytes(StringUtils.EMPTY_STRING)));
}

/***
@@ -60,7 +60,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
- import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
@@ -71,6 +70,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

+ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;

public abstract class BaseCommitActionExecutor<T, I, K, O, R>
@@ -154,7 +154,7 @@ void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String insta
String commitActionType = getCommitActionType();
HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
activeTimeline.transitionRequestedToInflight(requested,
- Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)),
+ Option.of(getUTF8Bytes(metadata.toJsonString())),
config.shouldAllowMultiWriteOnSameInstant());
} catch (IOException io) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
@@ -36,10 +36,11 @@
import org.apache.hudi.table.HoodieTable;

import java.io.IOException;
- import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;

+ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

/**
* Base class helps to perform compact.
*
@@ -83,7 +84,7 @@ public void completeInflightCompaction(HoodieTable table, String compactionCommi
try {
activeTimeline.transitionCompactionInflightToComplete(
HoodieTimeline.getCompactionInflightInstant(compactionCommitTime),
- Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ Option.of(getUTF8Bytes(commitMetadata.toJsonString())));
} catch (IOException e) {
throw new HoodieCompactionException(
"Failed to commit " + table.getMetaClient().getBasePath() + " at time " + compactionCommitTime, e);
@@ -95,7 +96,7 @@ public void completeInflightLogCompaction(HoodieTable table, String logCompactio
try {
activeTimeline.transitionLogCompactionInflightToComplete(
HoodieTimeline.getLogCompactionInflightInstant(logCompactionCommitTime),
- Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ Option.of(getUTF8Bytes(commitMetadata.toJsonString())));
} catch (IOException e) {
throw new HoodieCompactionException(
"Failed to commit " + table.getMetaClient().getBasePath() + " at time " + logCompactionCommitTime, e);
@@ -37,7 +37,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
- import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -46,6 +45,7 @@
import java.util.UUID;

import static org.apache.hudi.common.table.log.HoodieLogFormat.DEFAULT_WRITE_TOKEN;
+ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

public class HoodieTestCommitGenerator {
public static final String BASE_FILE_WRITE_TOKEN = "1-0-1";
@@ -163,7 +163,7 @@ public static void createCommitFileWithMetadata(
String filename, String content) throws IOException {
Path commitFilePath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + filename);
try (FSDataOutputStream os = FSUtils.getFs(basePath, configuration).create(commitFilePath, true)) {
- os.writeBytes(new String(content.getBytes(StandardCharsets.UTF_8)));
+ os.writeBytes(new String(getUTF8Bytes(content)));
}
}

@@ -46,6 +46,7 @@
import java.util.List;
import java.util.stream.Collectors;

+ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -98,7 +99,7 @@ private void prepareTimeline(String tablePath, HoodieTableMetaClient metaClient)
String completionTime = String.format("%08d", i + 1000);
HoodieCommitMetadata metadata = testTable.createCommitMetadata(instantTime, WriteOperationType.INSERT, Arrays.asList("par1", "par2"), 10, false);
testTable.addCommit(instantTime, Option.of(metadata));
- activeActions.add(new DummyActiveAction(new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", instantTime, completionTime), metadata.toJsonString().getBytes()));
+ activeActions.add(new DummyActiveAction(new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", instantTime, completionTime), getUTF8Bytes(metadata.toJsonString())));
}
testTable.addRequestedCommit(String.format("%08d", 11));
List<HoodieInstant> instants = new HoodieActiveTimeline(metaClient, false).getInstantsAsStream().sorted().collect(Collectors.toList());
@@ -18,16 +18,6 @@

package org.apache.hudi.io.storage;

- import org.apache.avro.Schema;
- import org.apache.avro.generic.GenericData;
- import org.apache.avro.generic.GenericRecord;
- import org.apache.avro.generic.IndexedRecord;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.CellComparatorImpl;
- import org.apache.hadoop.hbase.io.hfile.CacheConfig;
- import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -40,6 +30,16 @@
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;

+ import org.apache.avro.Schema;
+ import org.apache.avro.generic.GenericData;
+ import org.apache.avro.generic.GenericRecord;
+ import org.apache.avro.generic.IndexedRecord;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.CellComparatorImpl;
+ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -70,8 +70,9 @@
import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.apache.hudi.common.util.CollectionUtils.toStream;
- import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
+ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.io.storage.HoodieAvroHFileReader.SCHEMA_KEY;
+ import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -130,7 +131,7 @@ protected void verifySchema(Configuration conf, String schemaPath) throws IOExce
FileSystem fs = getFilePath().getFileSystem(conf);
HFile.Reader hfileReader = HoodieHFileUtils.createHFileReader(fs, getFilePath(), new CacheConfig(conf), conf);
assertEquals(getSchemaFromResource(TestHoodieHFileReaderWriter.class, schemaPath),
- new Schema.Parser().parse(new String(hfileReader.getHFileInfo().get(SCHEMA_KEY.getBytes()))));
+ new Schema.Parser().parse(new String(hfileReader.getHFileInfo().get(getUTF8Bytes(SCHEMA_KEY)))));
}

private static Stream<Arguments> populateMetaFieldsAndTestAvroWithMeta() {
@@ -51,11 +51,12 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
- import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.List;
import java.util.stream.Collectors;

+ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

public class HoodieFlinkTableServiceClient<T> extends BaseHoodieTableServiceClient<List<HoodieRecord<T>>, List<WriteStatus>, List<WriteStatus>> {

private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkTableServiceClient.class);
@@ -137,7 +138,7 @@ protected void completeClustering(
LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
table.getActiveTimeline().transitionReplaceInflightToComplete(
HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
- Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ Option.of(getUTF8Bytes(metadata.toJsonString())));
} catch (IOException e) {
throw new HoodieClusteringException(
"Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e);
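For context on why the charset is pinned: the no-argument `String#getBytes()` uses the JVM's default charset, so the same string can serialize to different bytes on differently configured machines, while an explicit `UTF_8` is deterministic. The snippet below is illustrative only and is not part of the commit.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class CharsetPinningDemo {
  public static void main(String[] args) {
    String s = "Hüdi";  // contains a non-ASCII character

    // Depends on the JVM default charset (file.encoding), so the result can vary by environment.
    byte[] platformDefault = s.getBytes();

    // Always produces the same bytes, regardless of where the code runs.
    byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);

    System.out.println("default charset = " + System.getProperty("file.encoding"));
    System.out.println("same bytes as UTF-8? " + Arrays.equals(platformDefault, utf8));
  }
}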