From 8fc3f92f745a6fd64596ada017a3ec236683d1c5 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Tue, 19 Nov 2024 16:47:45 -0800 Subject: [PATCH] improve serialization for resource usage info Signed-off-by: Chenyang Ji --- .../resourcetracker/TaskResourceInfo.java | 45 +++++++++++++++++-- .../tasks/TaskResourceTrackingService.java | 21 ++++----- 2 files changed, 50 insertions(+), 16 deletions(-) diff --git a/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceInfo.java b/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceInfo.java index 373cdbfa7e9a1..14e74fa941899 100644 --- a/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceInfo.java +++ b/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceInfo.java @@ -10,16 +10,16 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.ParseField; -import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.ConstructingObjectParser; -import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Base64; import java.util.Objects; import static org.opensearch.core.xcontent.ConstructingObjectParser.constructorArg; @@ -202,7 +202,46 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { - return Strings.toString(MediaTypeRegistry.JSON, this); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (StreamOutput streamOutput = new StreamOutput() { + @Override + public void writeByte(byte b) { + byteArrayOutputStream.write(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) { + byteArrayOutputStream.write(b, offset, length); + } + + /** + * Forces any buffered output to be written. + */ + @Override + public void flush() throws IOException { + byteArrayOutputStream.flush(); + } + + /** + * Closes this stream to further operations. + */ + @Override + public void close() throws IOException { + byteArrayOutputStream.close(); + } + + @Override + public void reset() { + byteArrayOutputStream.reset(); + } + }) { + // Serialize the object to the custom StreamOutput + this.writeTo(streamOutput); + // Convert the byte array to Base64 string + return Base64.getEncoder().encodeToString(byteArrayOutputStream.toByteArray()); + } catch (IOException e) { + return ""; + } } @Override diff --git a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java index a184673a8fa2f..84b553187e8b6 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java @@ -23,8 +23,7 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.concurrent.ConcurrentMapLong; import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.common.xcontent.XContentHelper; -import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.tasks.resourcetracker.ResourceStats; import org.opensearch.core.tasks.resourcetracker.ResourceStatsType; import org.opensearch.core.tasks.resourcetracker.ResourceUsageInfo; @@ -32,16 +31,13 @@ import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; import org.opensearch.core.tasks.resourcetracker.ThreadResourceInfo; -import org.opensearch.core.xcontent.DeprecationHandler; -import org.opensearch.core.xcontent.MediaTypeRegistry; -import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.core.xcontent.XContentParser; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.ArrayList; +import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.Map; @@ -346,13 +342,12 @@ public TaskResourceInfo getTaskResourceUsageFromThreadContext() { String usage = taskResourceUsages.get(0); try { if (usage != null && !usage.isEmpty()) { - XContentParser parser = XContentHelper.createParser( - NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - new BytesArray(usage), - MediaTypeRegistry.JSON - ); - return TaskResourceInfo.PARSER.apply(parser, null); + // Get the serialized data as a byte array + byte[] serializedData = Base64.getDecoder().decode(usage); + // Deserialize from byte array + try (StreamInput streamInput = StreamInput.wrap(serializedData)) { + return TaskResourceInfo.readFromStream(streamInput); + } } } catch (IOException e) { logger.debug("fail to parse phase resource usages: ", e);