Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(java): support user context for serialize global data(#1595) #1596

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/guide/java_serialization_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class Example {
| `codeGenEnabled` | Disabling may result in faster initial serialization but slower subsequent serializations. | `true` |
| `asyncCompilationEnabled` | If enabled, serialization uses interpreter mode first and switches to JIT serialization after async serializer JIT for a class is finished. | `false` |
| `scalaOptimizationEnabled` | Enables or disables Scala-specific serialization optimization. | `false` |
| `shareUserContext` | Enables or disables user context mode.

## Advanced Usage

Expand Down
24 changes: 12 additions & 12 deletions java/fury-core/src/main/java/org/apache/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,8 @@ public void resetBuffer() {

private void write(MemoryBuffer buffer, Object obj) {
int startOffset = buffer.writerIndex();
boolean shareMetaContext = config.shareMetaContext();
if (shareMetaContext) {
boolean shareContext = config.shareContext();
if (shareContext) {
buffer.writeInt32(-1); // preserve 4-byte for nativeObjects start offsets.
}
// reduce caller stack
Expand All @@ -315,9 +315,9 @@ private void write(MemoryBuffer buffer, Object obj) {
classResolver.writeClass(buffer, classInfo);
writeData(buffer, classInfo, obj);
}
if (shareMetaContext) {
if (shareContext) {
buffer.putInt32(startOffset, buffer.writerIndex());
classResolver.writeClassDefs(buffer);
classResolver.writeContext(buffer);
}
}

Expand Down Expand Up @@ -754,8 +754,8 @@ public Object deserialize(MemoryBuffer buffer, Iterable<MemoryBuffer> outOfBandB
if (isTargetXLang) {
obj = xdeserializeInternal(buffer);
} else {
if (config.shareMetaContext()) {
classResolver.readClassDefs(buffer);
if (config.shareContext()) {
classResolver.readContext(buffer);
}
obj = readRef(buffer);
}
Expand Down Expand Up @@ -1025,15 +1025,15 @@ public void serializeJavaObject(MemoryBuffer buffer, Object obj) {
if (depth != 0) {
throwDepthSerializationException();
}
if (config.shareMetaContext()) {
if (config.shareContext()) {
int startOffset = buffer.writerIndex();
buffer.writeInt32(-1); // preserve 4-byte for nativeObjects start offsets.
if (!refResolver.writeRefOrNull(buffer, obj)) {
ClassInfo classInfo = classResolver.getOrUpdateClassInfo(obj.getClass());
writeData(buffer, classInfo, obj);
}
buffer.putInt32(startOffset, buffer.writerIndex());
classResolver.writeClassDefs(buffer);
classResolver.writeContext(buffer);
} else {
if (!refResolver.writeRefOrNull(buffer, obj)) {
ClassInfo classInfo = classResolver.getOrUpdateClassInfo(obj.getClass());
Expand Down Expand Up @@ -1070,8 +1070,8 @@ public <T> T deserializeJavaObject(MemoryBuffer buffer, Class<T> cls) {
if (depth != 0) {
throwDepthDeserializationException();
}
if (config.shareMetaContext()) {
classResolver.readClassDefs(buffer);
if (config.shareContext()) {
classResolver.readContext(buffer);
}
T obj;
int nextReadRefId = refResolver.tryPreserveRefId(buffer);
Expand Down Expand Up @@ -1184,8 +1184,8 @@ public Object deserializeJavaObjectAndClass(MemoryBuffer buffer) {
if (depth != 0) {
throwDepthDeserializationException();
}
if (config.shareMetaContext()) {
classResolver.readClassDefs(buffer);
if (config.shareContext()) {
classResolver.readContext(buffer);
}
return readRef(buffer);
} catch (Throwable t) {
Expand Down
16 changes: 16 additions & 0 deletions java/fury-core/src/main/java/org/apache/fury/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public class Config implements Serializable {
private final boolean requireClassRegistration;
private final boolean suppressClassRegistrationWarnings;
private final boolean registerGuavaTypes;
private final boolean shareContext;
private final boolean shareMetaContext;
private final boolean shareUserContext;
private final boolean asyncCompilationEnabled;
private final boolean deserializeUnexistedClass;
private final boolean scalaOptimizationEnabled;
Expand All @@ -75,6 +77,8 @@ public Config(FuryBuilder builder) {
checkJdkClassSerializable = builder.checkJdkClassSerializable;
defaultJDKStreamSerializerType = builder.defaultJDKStreamSerializerType;
shareMetaContext = builder.shareMetaContext;
shareUserContext = builder.shareUserContext;
shareContext = shareMetaContext || shareUserContext;
deserializeUnexistedClass = builder.deserializeUnexistedClass;
if (deserializeUnexistedClass) {
// Only in meta share mode or compatibleMode, fury knows how to deserialize
Expand Down Expand Up @@ -182,6 +186,14 @@ public boolean shareMetaContext() {
return shareMetaContext;
}

public boolean shareUserContext() {
return shareUserContext;
}

public boolean shareContext() {
return shareContext;
}

/**
* Whether deserialize/skip data of un-existed class. If not enabled, an exception will be thrown
* if class not exist.
Expand Down Expand Up @@ -236,6 +248,8 @@ public boolean equals(Object o) {
&& suppressClassRegistrationWarnings == config.suppressClassRegistrationWarnings
&& registerGuavaTypes == config.registerGuavaTypes
&& shareMetaContext == config.shareMetaContext
&& shareUserContext == config.shareUserContext
&& shareContext == config.shareContext
&& asyncCompilationEnabled == config.asyncCompilationEnabled
&& deserializeUnexistedClass == config.deserializeUnexistedClass
&& scalaOptimizationEnabled == config.scalaOptimizationEnabled
Expand Down Expand Up @@ -266,6 +280,8 @@ public int hashCode() {
suppressClassRegistrationWarnings,
registerGuavaTypes,
shareMetaContext,
shareUserContext,
shareContext,
asyncCompilationEnabled,
deserializeUnexistedClass,
scalaOptimizationEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public final class FuryBuilder {
Class<? extends Serializer> defaultJDKStreamSerializerType = ObjectStreamSerializer.class;
boolean requireClassRegistration = true;
boolean shareMetaContext = false;
boolean shareUserContext = false;
boolean codeGenEnabled = true;
Boolean deserializeUnexistedClass;
boolean asyncCompilationEnabled = false;
Expand Down Expand Up @@ -238,6 +239,12 @@ public FuryBuilder withMetaContextShare(boolean shareMetaContext) {
return this;
}

/** Whether to enable user context share feature. */
public FuryBuilder withUserContextShare(boolean shareUserContext) {
this.shareUserContext = shareUserContext;
return this;
}

/**
* Whether deserialize/skip data of un-existed class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1408,16 +1408,31 @@ public void writeClassDefs(MemoryBuffer buffer) {
metaContext.writingClassDefs.clear();
}

/** write user context by register order. */
public void writeUserContext(MemoryBuffer buffer) {
final List<UserContext> userContexts = fury.getSerializationContext().getUserContexts();
for (UserContext userContext : userContexts) {
userContext.write(buffer);
}
}

/** after all data, write share meta context and user context. */
public void writeContext(MemoryBuffer buffer) {
if (fury.getConfig().shareMetaContext()) {
writeClassDefs(buffer);
}
if (fury.getConfig().shareUserContext()) {
writeUserContext(buffer);
}
}

/**
* Ensure all class definition are read and populated, even there are deserialization exception
* such as ClassNotFound. So next time a class def written previously identified by an id can be
* got from the meta context.
*/
public void readClassDefs(MemoryBuffer buffer) {
MetaContext metaContext = fury.getSerializationContext().getMetaContext();
int classDefOffset = buffer.readInt32();
int readerIndex = buffer.readerIndex();
buffer.readerIndex(classDefOffset);
int numClassDefs = buffer.readVarUint32Small14();
for (int i = 0; i < numClassDefs; i++) {
long id = buffer.readInt64();
Expand All @@ -1437,6 +1452,27 @@ public void readClassDefs(MemoryBuffer buffer) {
// can be created still.
metaContext.readClassInfos.add(null);
}
}

/** read user context by register order. */
public void readUserContext(MemoryBuffer buffer) {
final List<UserContext> userContexts = fury.getSerializationContext().getUserContexts();
for (UserContext userContext : userContexts) {
userContext.read(buffer);
}
}

/** read share meta context and user context before read data. */
public void readContext(MemoryBuffer buffer) {
int globalContextOffSet = buffer.readInt32();
int readerIndex = buffer.readerIndex();
buffer.readerIndex(globalContextOffSet);
if (fury.getConfig().shareMetaContext()) {
readClassDefs(buffer);
}
if (fury.getConfig().shareUserContext()) {
readUserContext(buffer);
}
buffer.readerIndex(readerIndex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

package org.apache.fury.resolver;

import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import org.apache.fury.config.FuryBuilder;

/**
Expand All @@ -30,6 +32,7 @@
public final class SerializationContext {
private final IdentityHashMap<Object, Object> objects = new IdentityHashMap<>();
private MetaContext metaContext;
private final List<UserContext> userContexts = new ArrayList<>();

/** Return the previous value associated with <tt>key</tt>, or <tt>null</tt>. */
public Object add(Object key, Object value) {
Expand Down Expand Up @@ -58,10 +61,19 @@ public void setMetaContext(MetaContext metaContext) {
this.metaContext = metaContext;
}

public List<UserContext> getUserContexts() {
return userContexts;
}

public void addUserContext(UserContext userContext) {
userContexts.add(userContext);
}

public void reset() {
if (objects.size() > 0) {
objects.clear();
}
metaContext = null;
userContexts.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.fury.resolver;

import org.apache.fury.Fury;
import org.apache.fury.memory.MemoryBuffer;

/** write/read user custom global data after metaContext data. */
public abstract class UserContext {

protected final Fury fury;

public UserContext(Fury fury) {
this.fury = fury;
}

public abstract void write(MemoryBuffer buffer);

public abstract void read(MemoryBuffer buffer);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.fury.resolver;

import org.apache.fury.Fury;
import org.apache.fury.FuryTestBase;
import org.apache.fury.config.CompatibleMode;
import org.apache.fury.config.Language;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.test.bean.Foo;
import org.testng.Assert;
import org.testng.annotations.Test;

public class UserContextTest extends FuryTestBase {

public class DataUserContext extends UserContext {
Object data = null;

public DataUserContext(Fury fury) {
super(fury);
}

@Override
public void write(MemoryBuffer buffer) {
fury.writeRef(buffer, data);
}

@Override
public void read(MemoryBuffer buffer) {
data = fury.readRef(buffer);
}
}

@Test
public void checkShareUserContext() {
Fury fury =
Fury.builder()
.withLanguage(Language.JAVA)
.requireClassRegistration(false)
.withCompatibleMode(CompatibleMode.COMPATIBLE)
.withMetaContextShare(true)
.withUserContextShare(true)
.build();
final Foo o = Foo.create();
final SerializationContext serializationContext = fury.getSerializationContext();
serializationContext.setMetaContext(new MetaContext());
final DataUserContext userContext1 = new DataUserContext(fury);
userContext1.data = "test1";
serializationContext.addUserContext(userContext1);
final byte[] bytes = fury.serialize(o);

serializationContext.setMetaContext(new MetaContext());
final DataUserContext userContext2 = new DataUserContext(fury);
serializationContext.addUserContext(userContext2);

Assert.assertEquals(fury.deserialize(bytes), o);
Assert.assertEquals(userContext1.data, userContext2.data);
}
}
Loading