Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bindings/java): support append #2350

Merged
merged 2 commits into from
May 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions bindings/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
<jniClassifier>${os.detected.classifier}</jniClassifier>

<assertj-version>3.23.1</assertj-version>
<lombok.version>1.18.26</lombok.version>
<questdb.version>1.0.0</questdb.version>

<maven-surefire-plugin.version>3.0.0</maven-surefire-plugin.version>
Expand Down Expand Up @@ -84,10 +85,21 @@
<artifactId>assertj-core</artifactId>
<version>${assertj-version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.questdb</groupId>
<artifactId>jar-jni</artifactId>
Expand Down
10 changes: 5 additions & 5 deletions bindings/java/src/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

use std::str::FromStr;

use jni::objects::JClass;
use jni::objects::JObject;
use jni::objects::JString;
use jni::objects::{JByteArray, JClass};
use jni::sys::jlong;
use jni::sys::jstring;
use jni::JNIEnv;
Expand Down Expand Up @@ -93,7 +93,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_write(
_: JClass,
op: *mut BlockingOperator,
path: JString,
content: JString,
content: JByteArray,
) {
intern_write(&mut env, &mut *op, path, content).unwrap_or_else(|e| {
e.throw(&mut env);
Expand All @@ -104,11 +104,11 @@ fn intern_write(
env: &mut JNIEnv,
op: &mut BlockingOperator,
path: JString,
content: JString,
content: JByteArray,
) -> Result<()> {
let path = env.get_string(&path)?;
let content = env.get_string(&content)?;
Ok(op.write(path.to_str()?, content.to_str()?.to_string())?)
let content = env.convert_byte_array(content)?;
Ok(op.write(path.to_str()?, content)?)
}

/// # Safety
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.opendal;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
Expand All @@ -41,6 +42,10 @@ public BlockingOperator(String schema, Map<String, String> map) {
}

public void write(String path, String content) {
write(path, content.getBytes(StandardCharsets.UTF_8));
}

public void write(String path, byte[] content) {
write(nativeHandle, path, content);
}

Expand All @@ -61,7 +66,7 @@ public Metadata stat(String path) {

private static native long constructor(String schema, Map<String, String> map);

private static native void write(long nativeHandle, String path, String content);
private static native void write(long nativeHandle, String path, byte[] content);

private static native String read(long nativeHandle, String path);

Expand Down
18 changes: 17 additions & 1 deletion bindings/java/src/main/java/org/apache/opendal/Operator.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.opendal;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -115,10 +116,23 @@ public Operator(String schema, Map<String, String> map) {
}

public CompletableFuture<Void> write(String path, String content) {
return write(path, content.getBytes(StandardCharsets.UTF_8));
}

public CompletableFuture<Void> write(String path, byte[] content) {
final long requestId = write(nativeHandle, path, content);
return registry().take(requestId);
}

public CompletableFuture<Void> append(String path, String content) {
return append(path, content.getBytes(StandardCharsets.UTF_8));
}

public CompletableFuture<Void> append(String path, byte[] content) {
final long requestId = append(nativeHandle, path, content);
return registry().take(requestId);
}

public CompletableFuture<Metadata> stat(String path) {
final long requestId = stat(nativeHandle, path);
final CompletableFuture<Long> f = registry().take(requestId);
Expand All @@ -142,7 +156,9 @@ public CompletableFuture<Void> delete(String path) {

private static native long read(long nativeHandle, String path);

private static native long write(long nativeHandle, String path, String content);
private static native long write(long nativeHandle, String path, byte[] content);

private static native long append(long nativeHandle, String path, byte[] content);

private static native long delete(long nativeHandle, String path);

Expand Down
52 changes: 47 additions & 5 deletions bindings/java/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

use std::str::FromStr;

use jni::objects::JClass;
use jni::objects::JObject;
use jni::objects::JString;
use jni::objects::JValue;
use jni::objects::JValueOwned;
use jni::objects::{JByteArray, JClass};
use jni::sys::jlong;
use jni::JNIEnv;
use opendal::Operator;
Expand Down Expand Up @@ -73,7 +73,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_write(
_: JClass,
op: *mut Operator,
path: JString,
content: JString,
content: JByteArray,
) -> jlong {
intern_write(&mut env, op, path, content).unwrap_or_else(|e| {
e.throw(&mut env);
Expand All @@ -85,13 +85,13 @@ fn intern_write(
env: &mut JNIEnv,
op: *mut Operator,
path: JString,
content: JString,
content: JByteArray,
) -> Result<jlong> {
let op = unsafe { &mut *op };
let id = request_id(env)?;

let path = env.get_string(&path)?.to_str()?.to_string();
let content = env.get_string(&content)?.to_str()?.to_string();
let content = env.convert_byte_array(content)?;

let runtime = unsafe { RUNTIME.get_unchecked() };
runtime.spawn(async move {
Expand All @@ -102,10 +102,52 @@ fn intern_write(
Ok(id)
}

async fn do_write(op: &mut Operator, path: String, content: String) -> Result<()> {
async fn do_write(op: &mut Operator, path: String, content: Vec<u8>) -> Result<()> {
Ok(op.write(&path, content).await?)
}

/// # Safety
///
/// This function should not be called before the Operator are ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_Operator_append(
mut env: JNIEnv,
_: JClass,
op: *mut Operator,
path: JString,
content: JByteArray,
) -> jlong {
intern_append(&mut env, op, path, content).unwrap_or_else(|e| {
e.throw(&mut env);
0
})
}

fn intern_append(
env: &mut JNIEnv,
op: *mut Operator,
path: JString,
content: JByteArray,
) -> Result<jlong> {
let op = unsafe { &mut *op };
let id = request_id(env)?;

let path = env.get_string(&path)?.to_str()?.to_string();
let content = env.convert_byte_array(content)?;

let runtime = unsafe { RUNTIME.get_unchecked() };
runtime.spawn(async move {
let result = do_append(op, path, content).await;
complete_future(id, result.map(|_| JValueOwned::Void))
});

Ok(id)
}

async fn do_append(op: &mut Operator, path: String, content: Vec<u8>) -> Result<()> {
Ok(op.append(&path, content).await?)
}

/// # Safety
///
/// This function should not be called before the Operator are ready.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.cucumber.java.en.When;
import java.util.HashMap;
import java.util.Map;
import lombok.Cleanup;

public class AsyncStepsTest {
Operator op;
Expand All @@ -45,19 +46,19 @@ public void async_write_path_test_with_content_hello_world(String path, String c

@Then("The async file {string} should exist")
public void the_async_file_test_should_exist(String path) {
Metadata metadata = op.stat(path).join();
@Cleanup Metadata metadata = op.stat(path).join();
assertNotNull(metadata);
}

@Then("The async file {string} entry mode must be file")
public void the_async_file_test_entry_mode_must_be_file(String path) {
Metadata metadata = op.stat(path).join();
@Cleanup Metadata metadata = op.stat(path).join();
assertTrue(metadata.isFile());
}

@Then("The async file {string} content length must be {int}")
public void the_async_file_test_content_length_must_be_13(String path, int length) {
Metadata metadata = op.stat(path).join();
@Cleanup Metadata metadata = op.stat(path).join();
assertEquals(metadata.getContentLength(), length);
}

Expand Down
49 changes: 35 additions & 14 deletions bindings/java/src/test/java/org/apache/opendal/OperatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,26 @@
package org.apache.opendal;

import static org.assertj.core.api.Assertions.assertThat;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class OperatorTest {
private Operator op;
@TempDir
private static Path tempDir;

@BeforeEach
public void init() {
@Test
public void testCreateAndDelete() {
Map<String, String> params = new HashMap<>();
params.put("root", "/tmp");
this.op = new Operator("Memory", params);
}

@AfterEach
public void clean() {
this.op.close();
}
@Cleanup Operator op = new Operator("Memory", params);

@Test
public void testCreateAndDelete() {
op.write("testCreateAndDelete", "Odin").join();
assertThat(op.read("testCreateAndDelete").join()).isEqualTo("Odin");
op.delete("testCreateAndDelete").join();
Expand All @@ -57,4 +53,29 @@ public void testCreateAndDelete() {
})
.join();
}

@Test
public void testAppendManyTimes() {
Map<String, String> params = new HashMap<>();
params.put("root", tempDir.toString());
@Cleanup Operator op = new Operator("fs", params);

String[] trunks = new String[] {"first trunk", "second trunk", "third trunk"};

for (int i = 0; i < trunks.length; i++) {
op.append("testAppendManyTimes", trunks[i]).join();
String expected = Arrays.stream(trunks).limit(i + 1).collect(Collectors.joining());
assertThat(op.read("testAppendManyTimes").join()).isEqualTo(expected);
}

// write overwrite existing content
op.write("testAppendManyTimes", "new attempt").join();
assertThat(op.read("testAppendManyTimes").join()).isEqualTo("new attempt");

for (int i = 0; i < trunks.length; i++) {
op.append("testAppendManyTimes", trunks[i]).join();
String expected = Arrays.stream(trunks).limit(i + 1).collect(Collectors.joining());
assertThat(op.read("testAppendManyTimes").join()).isEqualTo("new attempt" + expected);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.cucumber.java.en.When;
import java.util.HashMap;
import java.util.Map;
import lombok.Cleanup;

public class StepsTest {
BlockingOperator op;
Expand All @@ -45,19 +46,19 @@ public void blocking_write_path_test_with_content_hello_world(String path, Strin

@Then("The blocking file {string} should exist")
public void the_blocking_file_test_should_exist(String path) {
Metadata metadata = op.stat(path);
@Cleanup Metadata metadata = op.stat(path);
assertNotNull(metadata);
}

@Then("The blocking file {string} entry mode must be file")
public void the_blocking_file_test_entry_mode_must_be_file(String path) {
Metadata metadata = op.stat(path);
@Cleanup Metadata metadata = op.stat(path);
assertTrue(metadata.isFile());
}

@Then("The blocking file {string} content length must be {int}")
public void the_blocking_file_test_content_length_must_be_13(String path, int length) {
Metadata metadata = op.stat(path);
@Cleanup Metadata metadata = op.stat(path);
assertEquals(metadata.getContentLength(), length);
}

Expand Down