Skip to content

Commit

Permalink
Add content-type in the AsyncRequestBody and corresponding tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Quanzzzz committed Jan 20, 2021
1 parent be81185 commit 88e0c71
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 5 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSSDKforJavav2-2d297fd.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "AWS SDK for Java v2",
"contributor": "",
"type": "bugfix",
"description": "Add support for content-type in AsyncRequestBody."
}
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,11 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
public Optional<Long> contentLength() {
return transformedRequestBody.contentLength();
}

@Override
public String contentType() {
return transformedRequestBody.contentType();
}
}

static String toDebugString(Message m, boolean truncatePayload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.utils.BinaryUtils;

/**
Expand Down Expand Up @@ -57,6 +58,13 @@ public interface AsyncRequestBody extends SdkPublisher<ByteBuffer> {
*/
Optional<Long> contentLength();

/**
* @return The content type of the data being produced.
*/
default String contentType() {
return Mimetype.MIMETYPE_OCTET_STREAM;
}

/**
* Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher.
* The data is delivered when the publisher publishes the data.
Expand Down Expand Up @@ -115,7 +123,7 @@ static AsyncRequestBody fromFile(File file) {
* @see ByteArrayAsyncRequestBody
*/
static AsyncRequestBody fromString(String string, Charset cs) {
return new ByteArrayAsyncRequestBody(string.getBytes(cs));
return new ByteArrayAsyncRequestBody(string.getBytes(cs), Mimetype.MIMETYPE_TEXT_PLAIN);
}

/**
Expand All @@ -137,7 +145,7 @@ static AsyncRequestBody fromString(String string) {
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromBytes(byte[] bytes) {
return new ByteArrayAsyncRequestBody(bytes);
return new ByteArrayAsyncRequestBody(bytes, Mimetype.MIMETYPE_OCTET_STREAM);
}

/**
Expand All @@ -148,7 +156,7 @@ static AsyncRequestBody fromBytes(byte[] bytes) {
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
return new ByteArrayAsyncRequestBody(BinaryUtils.copyAllBytesFrom(byteBuffer));
return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,23 @@ public final class ByteArrayAsyncRequestBody implements AsyncRequestBody {

private final byte[] bytes;

public ByteArrayAsyncRequestBody(byte[] bytes) {
private final String mimetype;

public ByteArrayAsyncRequestBody(byte[] bytes, String mimetype) {
this.bytes = bytes.clone();
this.mimetype = mimetype;
}

@Override
public Optional<Long> contentLength() {
return Optional.of((long) bytes.length);
}

@Override
public String contentType() {
return mimetype;
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
// As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.core.internal.util.NoopSubscription;
import software.amazon.awssdk.utils.builder.SdkBuilder;

Expand Down Expand Up @@ -70,6 +71,11 @@ public Optional<Long> contentLength() {
}
}

@Override
public String contentType() {
return Mimetype.getInstance().getMimetype(path);
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@

package software.amazon.awssdk.core.runtime.transform;

import static software.amazon.awssdk.http.Header.CONTENT_TYPE;

import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.internal.transform.AbstractStreamingRequestMarshaller;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.utils.StringUtils;

/**
* Augments a {@link Marshaller} to add contents for an async streamed request.
Expand All @@ -42,6 +45,10 @@ public static Builder builder() {
@Override
public SdkHttpFullRequest marshall(T in) {
SdkHttpFullRequest.Builder marshalled = delegateMarshaller.marshall(in).toBuilder();
String contentType = marshalled.firstMatchingHeader(CONTENT_TYPE).orElse(null);
if (StringUtils.isEmpty(contentType)) {
marshalled.putHeader(CONTENT_TYPE, asyncRequestBody.contentType());
}

addHeaders(marshalled, asyncRequestBody.contentLength(), requiresLength, transferEncoding, useHttp2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,39 @@

package software.amazon.awssdk.core.async;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
import io.reactivex.Flowable;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.assertj.core.util.Lists;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.http.async.SimpleSubscriber;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.StringInputStream;

@RunWith(Parameterized.class)
public class AsyncRequestBodyTest {
Expand Down Expand Up @@ -96,6 +111,48 @@ public void onComplete() {
assertThat(sb.toString()).isEqualTo(testString);
}

@Test
public void stringConstructorHasCorrectContentType() {
AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world");
assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_TEXT_PLAIN);
}

@Test
public void fileConstructorHasCorrectContentType() {
AsyncRequestBody requestBody = AsyncRequestBody.fromFile(path);
assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
}

@Test
public void bytesArrayConstructorHasCorrectContentType() {
AsyncRequestBody requestBody = AsyncRequestBody.fromBytes("hello world".getBytes());
assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
}

@Test
public void bytesBufferConstructorHasCorrectContentType() {
ByteBuffer byteBuffer = ByteBuffer.wrap("hello world".getBytes());
AsyncRequestBody requestBody = AsyncRequestBody.fromByteBuffer(byteBuffer);
assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
}

@Test
public void emptyBytesConstructorHasCorrectContentType() {
AsyncRequestBody requestBody = AsyncRequestBody.empty();
assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
}

@Test
public void publisherConstructorHasCorrectContentType() {
List<String> requestBodyStrings = Lists.newArrayList("A", "B", "C");
List<ByteBuffer> bodyBytes = requestBodyStrings.stream()
.map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
.collect(Collectors.toList());
Publisher<ByteBuffer> bodyPublisher = Flowable.fromIterable(bodyBytes);
AsyncRequestBody requestBody = AsyncRequestBody.fromPublisher(bodyPublisher);
assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
}

@Test
public void fromBytes_byteArrayNotNull_createsCopy() {
byte[] original = {0x1, 0x2, 0x3, 0x4};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.internal.util.Mimetype;

public class ByteArrayAsyncRequestBodyTest {
private class testSubscriber implements Subscriber<ByteBuffer> {
Expand Down Expand Up @@ -54,7 +55,8 @@ public void onComplete() {

@Test
public void concurrentRequests_shouldCompleteNormally() {
ByteArrayAsyncRequestBody byteArrayReq = new ByteArrayAsyncRequestBody("Hello World!".getBytes());
ByteArrayAsyncRequestBody byteArrayReq = new ByteArrayAsyncRequestBody("Hello World!".getBytes(),
Mimetype.MIMETYPE_OCTET_STREAM);
byteArrayReq.subscribe(subscriber);
assertTrue(subscriber.onCompleteCalled.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public Optional<Long> contentLength() {
return wrapped.contentLength();
}

@Override
public String contentType() {
return wrapped.contentType();
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
sdkChecksum.reset();
Expand Down

0 comments on commit 88e0c71

Please sign in to comment.