Skip to content

Commit

Permalink
Making perf enhancements to JSON serde
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Nov 20, 2023
1 parent 979132b commit aaf5a31
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
*/
public class TimelineServerBasedWriteMarkers extends WriteMarkers {
private static final Logger LOG = LoggerFactory.getLogger(TimelineServerBasedWriteMarkers.class);
private static final TypeReference<Boolean> BOOLEAN_TYPE_REFERENCE = new TypeReference<Boolean>() {};
private static final TypeReference<Set<String>> SET_TYPE_REFERENCE = new TypeReference<Set<String>>() {};

private final HttpRequestClient httpRequestClient;

Expand All @@ -84,7 +86,7 @@ public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism) {
Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
return httpRequestClient.executeRequest(
DELETE_MARKER_DIR_URL, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.POST);
DELETE_MARKER_DIR_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to delete marker directory " + markerDirPath.toString(), e);
}
Expand All @@ -95,7 +97,7 @@ public boolean doesMarkerDirExist() {
Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
return httpRequestClient.executeRequest(
MARKERS_DIR_EXISTS_URL, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.GET);
MARKERS_DIR_EXISTS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.GET);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to check marker directory " + markerDirPath.toString(), e);
}
Expand All @@ -106,7 +108,7 @@ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int pa
Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
Set<String> markerPaths = httpRequestClient.executeRequest(
CREATE_AND_MERGE_MARKERS_URL, paramsMap, new TypeReference<Set<String>>() {}, RequestMethod.GET);
CREATE_AND_MERGE_MARKERS_URL, paramsMap, SET_TYPE_REFERENCE, RequestMethod.GET);
return markerPaths.stream().map(WriteMarkers::stripMarkerSuffix).collect(Collectors.toSet());
} catch (IOException e) {
throw new HoodieRemoteException("Failed to get CREATE and MERGE data file paths in "
Expand All @@ -119,7 +121,7 @@ public Set<String> allMarkerFilePaths() {
Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
return httpRequestClient.executeRequest(
ALL_MARKERS_URL, paramsMap, new TypeReference<Set<String>>() {}, RequestMethod.GET);
ALL_MARKERS_URL, paramsMap, SET_TYPE_REFERENCE, RequestMethod.GET);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to get all markers in " + markerDirPath.toString(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import org.apache.http.client.fluent.Request;
import org.apache.http.client.fluent.Response;
import org.apache.http.client.utils.URIBuilder;
Expand All @@ -37,7 +38,7 @@
*/
public class HttpRequestClient {
private static final Logger LOG = LoggerFactory.getLogger(HttpRequestClient.class);
private final ObjectMapper mapper;
private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());
private final String serverHost;
private final int serverPort;
private final int timeoutSecs;
Expand All @@ -51,7 +52,6 @@ public HttpRequestClient(HoodieWriteConfig writeConfig) {
}

public HttpRequestClient(String serverHost, int serverPort, int timeoutSecs, int maxRetry) {
this.mapper = new ObjectMapper();
this.serverHost = serverHost;
this.serverPort = serverPort;
this.timeoutSecs = timeoutSecs;
Expand Down Expand Up @@ -92,7 +92,7 @@ public <T> T executeRequest(String requestPath, Map<String, String> queryParamet
break;
}
String content = response.returnContent().asString();
return (T) mapper.readValue(content, reference);
return (T) MAPPER.readValue(content, reference);
}

public enum RequestMethod {
Expand Down
6 changes: 6 additions & 0 deletions hudi-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>


<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-afterburner</artifactId>
</dependency>

<!-- Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public static List<FileGroupDTO> fileGroupDTOsfromFileGroups(List<HoodieFileGrou
} else if (fileGroups.size() == 1) {
return Collections.singletonList(FileGroupDTO.fromFileGroup(fileGroups.get(0), true));
} else {
List<FileGroupDTO> fileGroupDTOS = new ArrayList<>();
List<FileGroupDTO> fileGroupDTOS = new ArrayList<>(fileGroups.size());
fileGroupDTOS.add(FileGroupDTO.fromFileGroup(fileGroups.get(0), true));
fileGroupDTOS.addAll(fileGroups.subList(1, fileGroups.size()).stream()
fileGroupDTOS.addAll(fileGroups.stream().skip(1)
.map(fg -> FileGroupDTO.fromFileGroup(fg, false)).collect(Collectors.toList()));
return fileGroupDTOS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import org.apache.http.Consts;
import org.apache.http.client.fluent.Request;
import org.apache.http.client.fluent.Response;
Expand All @@ -63,6 +64,8 @@
*/
public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, Serializable {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());

private static final String BASE_URL = "/v1/hoodie/view";
public static final String LATEST_PARTITION_SLICES_URL = String.format("%s/%s", BASE_URL, "slices/partition/latest/");
public static final String LATEST_PARTITION_SLICE_URL = String.format("%s/%s", BASE_URL, "slices/file/latest/");
Expand Down Expand Up @@ -112,7 +115,6 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,

public static final String PENDING_CLUSTERING_FILEGROUPS = String.format("%s/%s", BASE_URL, "clustering/pending/");


public static final String LAST_INSTANT = String.format("%s/%s", BASE_URL, "timeline/instant/last");
public static final String LAST_INSTANTS = String.format("%s/%s", BASE_URL, "timeline/instants/last");

Expand All @@ -136,13 +138,20 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,


private static final Logger LOG = LoggerFactory.getLogger(RemoteHoodieTableFileSystemView.class);
private static final TypeReference<List<FileSliceDTO>> FILE_SLICE_DTOS_REFERENCE = new TypeReference<List<FileSliceDTO>>() {};
private static final TypeReference<List<FileGroupDTO>> FILE_GROUP_DTOS_REFERENCE = new TypeReference<List<FileGroupDTO>>() {};
private static final TypeReference<Boolean> BOOLEAN_TYPE_REFERENCE = new TypeReference<Boolean>() {};
private static final TypeReference<List<CompactionOpDTO>> COMPACTION_OP_DTOS_REFERENCE = new TypeReference<List<CompactionOpDTO>>() {};
private static final TypeReference<List<ClusteringOpDTO>> CLUSTERING_OP_DTOS_REFERENCE = new TypeReference<List<ClusteringOpDTO>>() {};
private static final TypeReference<List<InstantDTO>> INSTANT_DTOS_REFERENCE = new TypeReference<List<InstantDTO>>() {};
private static final TypeReference<TimelineDTO> TIMELINE_DTO_REFERENCE = new TypeReference<TimelineDTO>() {};
private static final TypeReference<List<BaseFileDTO>> BASE_FILE_DTOS_REFERENCE = new TypeReference<List<BaseFileDTO>>() {};

private final String serverHost;
private final int serverPort;
private final String basePath;
private final HoodieTableMetaClient metaClient;
private HoodieTimeline timeline;
private final ObjectMapper mapper;
private final int timeoutMs;

private boolean closed = false;
Expand All @@ -159,7 +168,6 @@ public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaC

public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSystemViewStorageConfig viewConf) {
this.basePath = metaClient.getBasePath();
this.mapper = new ObjectMapper();
this.metaClient = metaClient;
this.timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
this.serverHost = viewConf.getRemoteViewServerHost();
Expand Down Expand Up @@ -192,7 +200,7 @@ private <T> T executeRequest(String requestPath, Map<String, String> queryParame
LOG.info("Sending request : (" + url + ")");
Response response = retryHelper != null ? retryHelper.start(() -> get(timeoutMs, url, method)) : get(timeoutMs, url, method);
String content = response.returnContent().asString(Consts.UTF_8);
return (T) mapper.readValue(content, reference);
return (T) OBJECT_MAPPER.readValue(content, reference);
}

private Map<String, String> getParamsWithPartitionPath(String partitionPath) {
Expand Down Expand Up @@ -250,7 +258,7 @@ public Stream<HoodieBaseFile> getLatestBaseFiles() {
private Stream<HoodieBaseFile> getLatestBaseFilesFromParams(Map<String, String> paramsMap, String requestPath) {
try {
List<BaseFileDTO> dataFiles = executeRequest(requestPath, paramsMap,
new TypeReference<List<BaseFileDTO>>() {}, RequestMethod.GET);
BASE_FILE_DTOS_REFERENCE, RequestMethod.GET);
return dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand Down Expand Up @@ -317,7 +325,7 @@ public Stream<FileSlice> getLatestFileSlices(String partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
try {
List<FileSliceDTO> dataFiles = executeRequest(LATEST_PARTITION_SLICES_URL, paramsMap,
new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
return dataFiles.stream().map(FileSliceDTO::toFileSlice);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -329,7 +337,7 @@ public Option<FileSlice> getLatestFileSlice(String partitionPath, String fileId)
Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
try {
List<FileSliceDTO> dataFiles = executeRequest(LATEST_PARTITION_SLICE_URL, paramsMap,
new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
return Option.fromJavaOptional(dataFiles.stream().map(FileSliceDTO::toFileSlice).findFirst());
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -341,7 +349,7 @@ public Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
try {
List<FileSliceDTO> dataFiles = executeRequest(LATEST_PARTITION_UNCOMPACTED_SLICES_URL, paramsMap,
new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
return dataFiles.stream().map(FileSliceDTO::toFileSlice);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -356,7 +364,7 @@ public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, Str
new String[] {maxCommitTime, String.valueOf(includeFileSlicesInPendingCompaction)});
try {
List<FileSliceDTO> dataFiles = executeRequest(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap,
new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
return dataFiles.stream().map(FileSliceDTO::toFileSlice);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand Down Expand Up @@ -386,7 +394,7 @@ public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPat
Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxInstantTime);
try {
List<FileSliceDTO> dataFiles = executeRequest(LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL, paramsMap,
new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
return dataFiles.stream().map(FileSliceDTO::toFileSlice);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -399,7 +407,7 @@ public Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn)
getParams(INSTANTS_PARAM, StringUtils.join(commitsToReturn.toArray(new String[0]), ","));
try {
List<FileSliceDTO> dataFiles = executeRequest(LATEST_SLICES_RANGE_INSTANT_URL, paramsMap,
new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
return dataFiles.stream().map(FileSliceDTO::toFileSlice);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -411,7 +419,7 @@ public Stream<FileSlice> getAllFileSlices(String partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
try {
List<FileSliceDTO> dataFiles =
executeRequest(ALL_SLICES_URL, paramsMap, new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
executeRequest(ALL_SLICES_URL, paramsMap, FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
return dataFiles.stream().map(FileSliceDTO::toFileSlice);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -423,7 +431,7 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
try {
List<FileGroupDTO> fileGroups = executeRequest(ALL_FILEGROUPS_FOR_PARTITION_URL, paramsMap,
new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -435,7 +443,7 @@ public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitT
Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
try {
List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON, paramsMap,
new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -447,7 +455,7 @@ public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime,
Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
try {
List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE, paramsMap,
new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -459,7 +467,7 @@ public Stream<HoodieFileGroup> getReplacedFileGroupsAfterOrOn(String minCommitTi
Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MIN_INSTANT_PARAM, minCommitTime);
try {
List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_AFTER_OR_ON, paramsMap,
new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -471,7 +479,7 @@ public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
try {
List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_PARTITION, paramsMap,
new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -483,7 +491,7 @@ public boolean refresh() {
try {
// refresh the local timeline first.
this.timeline = metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
return executeRequest(REFRESH_TABLE, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.POST);
return executeRequest(REFRESH_TABLE, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
} catch (IOException e) {
throw new HoodieRemoteException(e);
}
Expand All @@ -493,7 +501,7 @@ public boolean refresh() {
public Void loadAllPartitions() {
Map<String, String> paramsMap = getParams();
try {
executeRequest(LOAD_ALL_PARTITIONS_URL, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.POST);
executeRequest(LOAD_ALL_PARTITIONS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
return null;
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -505,7 +513,7 @@ public Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations(
Map<String, String> paramsMap = getParams();
try {
List<CompactionOpDTO> dtos = executeRequest(PENDING_COMPACTION_OPS, paramsMap,
new TypeReference<List<CompactionOpDTO>>() {}, RequestMethod.GET);
COMPACTION_OP_DTOS_REFERENCE, RequestMethod.GET);
return dtos.stream().map(CompactionOpDTO::toCompactionOperation);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -517,7 +525,7 @@ public Stream<Pair<String, CompactionOperation>> getPendingLogCompactionOperatio
Map<String, String> paramsMap = getParams();
try {
List<CompactionOpDTO> dtos = executeRequest(PENDING_LOG_COMPACTION_OPS, paramsMap,
new TypeReference<List<CompactionOpDTO>>() {}, RequestMethod.GET);
COMPACTION_OP_DTOS_REFERENCE, RequestMethod.GET);
return dtos.stream().map(CompactionOpDTO::toCompactionOperation);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -529,7 +537,7 @@ public Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClus
Map<String, String> paramsMap = getParams();
try {
List<ClusteringOpDTO> dtos = executeRequest(PENDING_CLUSTERING_FILEGROUPS, paramsMap,
new TypeReference<List<ClusteringOpDTO>>() {}, RequestMethod.GET);
CLUSTERING_OP_DTOS_REFERENCE, RequestMethod.GET);
return dtos.stream().map(ClusteringOpDTO::toClusteringOperation);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -551,7 +559,7 @@ public Option<HoodieInstant> getLastInstant() {
Map<String, String> paramsMap = getParams();
try {
List<InstantDTO> instants =
executeRequest(LAST_INSTANT, paramsMap, new TypeReference<List<InstantDTO>>() {}, RequestMethod.GET);
executeRequest(LAST_INSTANT, paramsMap, INSTANT_DTOS_REFERENCE, RequestMethod.GET);
return Option.fromJavaOptional(instants.stream().map(InstantDTO::toInstant).findFirst());
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand All @@ -563,7 +571,7 @@ public HoodieTimeline getTimeline() {
Map<String, String> paramsMap = getParams();
try {
TimelineDTO timeline =
executeRequest(TIMELINE, paramsMap, new TypeReference<TimelineDTO>() {}, RequestMethod.GET);
executeRequest(TIMELINE, paramsMap, TIMELINE_DTO_REFERENCE, RequestMethod.GET);
return TimelineDTO.toTimeline(timeline, metaClient);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand Down
Loading

0 comments on commit aaf5a31

Please sign in to comment.