Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Serializer APIs, consolidation of ContentCodec, and gRPC MethodDescriptor #1673

Merged
merged 20 commits into from
Aug 7, 2021
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -1657,6 +1657,21 @@ default boolean tryEnsureWritable(int minWritableBytes, boolean force) {
*/
Buffer writeUtf8(CharSequence seq, int ensureWritable);

/**
* Encode a {@link CharSequence} encoded in {@link Charset} and write it to this buffer starting at
* {@code writerIndex} and increases the {@code writerIndex} by the number of the transferred bytes.
*
* @param seq the source of the data.
* @param charset the charset used for encoding.
* @return self.
* @throws ReadOnlyBufferException if this buffer is read-only
*/
default Buffer writeCharSequence(CharSequence seq, Charset charset) {
byte[] bytes = seq.toString().getBytes(charset);
writeBytes(bytes);
return this;
}

/**
* Locates the first occurrence of the specified {@code value} in this
* buffer. The search takes place from the specified {@code fromIndex}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,11 @@ public Buffer writeUtf8(CharSequence seq, int ensureWritable) {
throw new IndexOutOfBoundsException();
}

@Override
public Buffer writeCharSequence(CharSequence seq, Charset charset) {
throw new IndexOutOfBoundsException();
}

@Override
public int indexOf(int fromIndex, int toIndex, byte value) {
checkIndex(fromIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,11 @@ public Buffer writeUtf8(CharSequence seq, int ensureWritable) {
throw new ReadOnlyBufferException();
}

@Override
public Buffer writeCharSequence(CharSequence seq, Charset charset) {
throw new ReadOnlyBufferException();
}

@Override
public Buffer readSlice(int length) {
checkReadableBytes0(length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,12 @@ public Buffer writeUtf8(CharSequence seq, int ensureWritable) {
return this;
}

@Override
public Buffer writeCharSequence(CharSequence seq, Charset charset) {
buffer.writeCharSequence(seq, charset);
return this;
}

@Override
public int indexOf(int fromIndex, int toIndex, byte value) {
return buffer.indexOf(fromIndex, toIndex, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.buffer.CompositeByteBuf;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;

final class NettyCompositeBuffer extends NettyBuffer<CompositeByteBuf> implements CompositeBuffer {

Expand Down Expand Up @@ -363,4 +364,10 @@ public CompositeBuffer writeUtf8(CharSequence seq, int ensureWritable) {
super.writeUtf8(seq, ensureWritable);
return this;
}

@Override
public CompositeBuffer writeCharSequence(CharSequence seq, Charset charset) {
super.writeCharSequence(seq, charset);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;
import java.nio.charset.Charset;

final class ReadOnlyBuffer extends WrappedBuffer {

Expand Down Expand Up @@ -302,6 +303,11 @@ public Buffer writeUtf8(CharSequence seq, int ensureWritable) {
throw new ReadOnlyBufferException();
}

@Override
public Buffer writeCharSequence(CharSequence seq, Charset charset) {
throw new ReadOnlyBufferException();
}

@Override
public Buffer readSlice(int length) {
return buffer.readSlice(length).asReadOnly();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,12 @@ public Buffer writeUtf8(CharSequence seq, int ensureWritable) {
return this;
}

@Override
public Buffer writeCharSequence(CharSequence seq, Charset charset) {
buffer.writeCharSequence(seq, charset);
return this;
}

@Override
public int indexOf(int fromIndex, int toIndex, byte value) {
return buffer.indexOf(fromIndex, toIndex, value);
Expand Down
2 changes: 2 additions & 0 deletions servicetalk-concurrent-reactivestreams/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ dependencies {
api "org.reactivestreams:reactive-streams:$reactiveStreamsVersion"

implementation project(":servicetalk-annotations")
implementation project(":servicetalk-serializer-utils")
implementation project(":servicetalk-buffer-netty")
implementation "com.google.code.findbugs:jsr305:$jsr305Version"
implementation "org.slf4j:slf4j-api:$slf4jVersion"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.reactivestreams.tck;

import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.serializer.utils.FramedDeserializerOperator;

import org.testng.annotations.Test;

import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR;
import static java.util.function.Function.identity;

@Test
public class FramedDeserializerOperatorTckTest extends AbstractPublisherTckTest<Integer> {
@Override
public Publisher<Integer> createServiceTalkPublisher(final long elements) {
return Publisher.range(0, TckUtils.requestNToInt(elements))
.map(i -> DEFAULT_ALLOCATOR.newBuffer().writeInt(i))
.liftSync(new FramedDeserializerOperator<>(
(serializedData, allocator) -> serializedData.readInt(),
() -> (buffer, bufferAllocator) ->
buffer.readableBytes() < Integer.BYTES ? null : buffer.readBytes(Integer.BYTES),
DEFAULT_ALLOCATOR))
.flatMapConcatIterable(identity());
}

@Override
public long maxElementsFromPublisher() {
return TckUtils.maxElementsFromPublisher();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import javax.annotation.Nullable;
import javax.ws.rs.ext.ContextResolver;

/**
* @deprecated Use {@link JacksonSerializerFactoryContextResolver}.
*/
@Deprecated
final class JacksonSerializationProviderContextResolver implements ContextResolver<JacksonSerializationProvider> {
private final JacksonSerializationProvider jacksonSerializationProvider;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.data.jackson.jersey;

import io.servicetalk.data.jackson.JacksonSerializerFactory;

import javax.annotation.Nullable;
import javax.ws.rs.ext.ContextResolver;

import static java.util.Objects.requireNonNull;

final class JacksonSerializerFactoryContextResolver implements ContextResolver<JacksonSerializerFactory> {
private final JacksonSerializerFactory factory;

JacksonSerializerFactoryContextResolver(final JacksonSerializerFactory factory) {
this.factory = requireNonNull(factory);
}

@Nullable
@Override
public JacksonSerializerFactory getContext(final Class<?> aClass) {
if (!JacksonSerializerFactory.class.isAssignableFrom(aClass)) {
return null;
}

return factory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.data.jackson.JacksonSerializationProvider;
import io.servicetalk.data.jackson.JacksonSerializerFactory;
import io.servicetalk.http.router.jersey.internal.SourceWrappers.PublisherSource;
import io.servicetalk.http.router.jersey.internal.SourceWrappers.SingleSource;
import io.servicetalk.serialization.api.DefaultSerializer;
import io.servicetalk.serialization.api.SerializationException;
import io.servicetalk.serialization.api.Serializer;
import io.servicetalk.serializer.api.Deserializer;
import io.servicetalk.serializer.api.SerializationException;
import io.servicetalk.serializer.api.Serializer;
import io.servicetalk.serializer.api.StreamingSerializer;
import io.servicetalk.transport.api.ConnectionContext;
import io.servicetalk.transport.api.ExecutionContext;

Expand Down Expand Up @@ -68,9 +69,6 @@
@Consumes(WILDCARD)
@Produces(WILDCARD)
final class JacksonSerializerMessageBodyReaderWriter implements MessageBodyReader<Object>, MessageBodyWriter<Object> {
private static final JacksonSerializationProvider DEFAULT_JACKSON_SERIALIZATION_PROVIDER =
new JacksonSerializationProvider();

// We can not use `@Context ConnectionContext` directly because we would not see the latest version
// in case it has been rebound as part of offloading.
@Context
Expand All @@ -95,86 +93,81 @@ public boolean isReadable(final Class<?> type, final Type genericType, final Ann
public Object readFrom(final Class<Object> type, final Type genericType, final Annotation[] annotations,
final MediaType mediaType, final MultivaluedMap<String, String> httpHeaders,
final InputStream entityStream) throws WebApplicationException {

final Serializer serializer = getSerializer(mediaType);
final JacksonSerializerFactory serializerFactory = getJacksonSerializerFactory(mediaType);
final ExecutionContext executionContext = ctxRefProvider.get().get().executionContext();
final BufferAllocator allocator = executionContext.bufferAllocator();
final int contentLength = requestCtxProvider.get().getLength();

if (Single.class.isAssignableFrom(type)) {
return handleEntityStream(entityStream, allocator,
(p, a) -> deserialize(p, serializer, getSourceClass(genericType), contentLength, a),
(is, a) -> new SingleSource<>(deserialize(toBufferPublisher(is, a), serializer,
getSourceClass(genericType), contentLength, a)));
(p, a) -> deserialize(p, serializerFactory.serializerDeserializer(getSourceClass(genericType)),
contentLength, a),
(is, a) -> new SingleSource<>(deserialize(toBufferPublisher(is, a),
serializerFactory.serializerDeserializer(getSourceClass(genericType)), contentLength, a)));
} else if (Publisher.class.isAssignableFrom(type)) {
return handleEntityStream(entityStream, allocator,
(p, a) -> serializer.deserialize(p, getSourceClass(genericType)),
(is, a) -> new PublisherSource<>(serializer.deserialize(toBufferPublisher(is, a),
getSourceClass(genericType))));
(p, a) -> serializerFactory.streamingSerializerDeserializer(
getSourceClass(genericType)).deserialize(p, a),
(is, a) -> new PublisherSource<>(serializerFactory.streamingSerializerDeserializer(
getSourceClass(genericType)).deserialize(toBufferPublisher(is, a), a)));
}

return handleEntityStream(entityStream, allocator,
(p, a) -> deserializeObject(p, serializer, type, contentLength, a),
(is, a) -> deserializeObject(toBufferPublisher(is, a), serializer, type, contentLength, a));
(p, a) -> deserializeObject(p, serializerFactory.serializerDeserializer(type), contentLength, a),
(is, a) -> deserializeObject(toBufferPublisher(is, a), serializerFactory.serializerDeserializer(type),
contentLength, a));
}

@Override
public boolean isWriteable(final Class<?> type, final Type genericType, final Annotation[] annotations,
final MediaType mediaType) {

return !isSse(requestCtxProvider.get()) && isSupportedMediaType(mediaType);
}

@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public void writeTo(final Object o, final Class<?> type, final Type genericType, final Annotation[] annotations,
final MediaType mediaType, final MultivaluedMap<String, Object> httpHeaders,
final OutputStream entityStream) throws WebApplicationException {

final BufferAllocator allocator = ctxRefProvider.get().get().executionContext().bufferAllocator();
final Publisher<Buffer> bufferPublisher;
if (o instanceof Single) {
bufferPublisher = getResponseBufferPublisher(((Single) o).toPublisher(), genericType, mediaType);
final Class<?> clazz = genericType instanceof Class ? (Class) genericType : getSourceClass(genericType);
Serializer serializer = getJacksonSerializerFactory(mediaType).serializerDeserializer(clazz);
bufferPublisher = ((Single) o).map(t -> serializer.serialize(t, allocator)).toPublisher();
} else if (o instanceof Publisher) {
bufferPublisher = getResponseBufferPublisher((Publisher) o, genericType, mediaType);
final Class<?> clazz = genericType instanceof Class ? (Class) genericType : getSourceClass(genericType);
StreamingSerializer serializer = getJacksonSerializerFactory(mediaType)
.streamingSerializerDeserializer(clazz);
bufferPublisher = serializer.serialize((Publisher) o, allocator);
} else {
bufferPublisher = getResponseBufferPublisher(Publisher.from(o), o.getClass(), mediaType);
Serializer serializer = getJacksonSerializerFactory(mediaType).serializerDeserializer(o.getClass());
bufferPublisher = Publisher.from(serializer.serialize(o, allocator));
}

setResponseBufferPublisher(bufferPublisher, requestCtxProvider.get());
}

@SuppressWarnings("unchecked")
private Publisher<Buffer> getResponseBufferPublisher(final Publisher publisher, final Type type,
final MediaType mediaType) {
final BufferAllocator allocator = ctxRefProvider.get().get().executionContext().bufferAllocator();
return getSerializer(mediaType).serialize(publisher, allocator,
type instanceof Class ? (Class) type : getSourceClass(type));
}

private Serializer getSerializer(final MediaType mediaType) {
return new DefaultSerializer(getJacksonSerializationProvider(mediaType));
}
private JacksonSerializerFactory getJacksonSerializerFactory(final MediaType mediaType) {
final ContextResolver<JacksonSerializerFactory> contextResolver =
providers.getContextResolver(JacksonSerializerFactory.class, mediaType);

private JacksonSerializationProvider getJacksonSerializationProvider(final MediaType mediaType) {
final ContextResolver<JacksonSerializationProvider> contextResolver =
providers.getContextResolver(JacksonSerializationProvider.class, mediaType);

return contextResolver != null ? contextResolver.getContext(JacksonSerializationProvider.class) :
DEFAULT_JACKSON_SERIALIZATION_PROVIDER;
return contextResolver != null ? contextResolver.getContext(JacksonSerializerFactory.class) :
JacksonSerializerFactory.JACKSON;
}

private static Publisher<Buffer> toBufferPublisher(final InputStream is, final BufferAllocator a) {
return fromInputStream(is).map(a::wrap);
}

private static <T> Single<T> deserialize(final Publisher<Buffer> bufferPublisher, final Serializer ser,
final Class<T> type, final int contentLength,
final BufferAllocator allocator) {

private static <T> Single<T> deserialize(
final Publisher<Buffer> bufferPublisher, final Deserializer<T> deserializer, final int contentLength,
final BufferAllocator allocator) {
return bufferPublisher
.collect(() -> newBufferForRequestContent(contentLength, allocator), Buffer::writeBytes)
.map(buf -> {
try {
return ser.deserializeAggregatedSingle(buf, type);
return deserializer.deserialize(buf, allocator);
} catch (final NoSuchElementException e) {
throw new BadRequestException("No deserializable JSON content", e);
} catch (final SerializationException e) {
Expand All @@ -192,10 +185,9 @@ static Buffer newBufferForRequestContent(final int contentLength,
}

// visible for testing
static <T> T deserializeObject(final Publisher<Buffer> bufferPublisher, final Serializer ser,
final Class<T> type, final int contentLength,
final BufferAllocator allocator) {
return awaitResult(deserialize(bufferPublisher, ser, type, contentLength, allocator).toFuture());
static <T> T deserializeObject(final Publisher<Buffer> bufferPublisher, final Deserializer<T> deserializer,
final int contentLength, final BufferAllocator allocator) {
return awaitResult(deserialize(bufferPublisher, deserializer, contentLength, allocator).toFuture());
}

private static boolean isSse(ContainerRequestContext requestCtx) {
Expand Down
Loading