Skip to content

Commit

Permalink
enable test with latest server (#20)
Browse files Browse the repository at this point in the history
* add source config

* enable test with latest server

* enable test with latest server
  • Loading branch information
Nicole00 authored Sep 3, 2021
1 parent fc01ce6 commit 11c7285
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 52 deletions.
48 changes: 48 additions & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# This workflow will build a Java project with Maven
# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven

name: Java CI with Maven

# Run on pushes to master, on PRs targeting master or release branches,
# and on a daily scheduled build at 06:00 UTC.
on:
  push:
    branches: [ master ]
  pull_request:
    branches:
      - master
      - 'v[0-9]+.*'
  schedule:
    - cron: '0 6 * * *'

jobs:
  build:

    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v2
      - name: Set up JDK 1.8
        uses: actions/setup-java@v1
        with:
          java-version: 1.8

      # Cache ~/.m2 keyed on the pom.xml hash so unchanged dependency
      # sets are restored instead of re-downloaded.
      - name: Cache the Maven packages to speed up build
        uses: actions/cache@v2
        with:
          path: ~/.m2/repository
          key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
          restore-keys: ${{ runner.os }}-maven-

      # Bring up a local nebula-graph cluster with docker-compose so the
      # integration tests have a live server; sleep gives the services
      # time to finish starting before the build runs.
      - name: Install nebula-graph
        run: |
          mkdir tmp
          pushd tmp
          git clone https://github.com/vesoft-inc/nebula-docker-compose.git
          pushd nebula-docker-compose/
          cp ../../client/src/test/resources/docker-compose.yaml .
          docker-compose up -d
          sleep 10
          popd
          popd
      - name: Build with Maven
        run: mvn -B package
4 changes: 3 additions & 1 deletion connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
<goal>jar</goal>
</goals>
<configuration>
<sourcepath>src/main/java</sourcepath>
<encoding>UTF-8</encoding>
<charset>UTF-8</charset>
<doclint>none</doclint>
Expand Down Expand Up @@ -204,7 +205,8 @@
<exclude>javax.inject:javax.inject</exclude>
<exclude>org.spark-project.hive:hive-exec</exclude>
<exclude>stax:stax-api</exclude>
<exclude>org.glassfish.hk2.external:aopalliance-repackaged</exclude>
<exclude>org.glassfish.hk2.external:aopalliance-repackaged
</exclude>
</excludes>
</artifactSet>
<filters>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public String createValue(Row row, PolicyEnum policy) {
String propName = pos2Field.get(i);
if (propName == null || !schema.containsKey(propName)) {
throw new IllegalArgumentException("position " + i + " or field " + propName
+ "does not exist.");
+ " does not exist.");
}
int type = schema.get(propName);
vertexProps.add(NebulaUtils.extraValue(row.getField(i), type));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,18 @@ public void testIsNumeric() {
}

public void testExtraValue() {
assert (null == NebulaUtils.extraValue(null, PropertyType.STRING));
assert ("\"\"".equals(NebulaUtils.extraValue("", PropertyType.STRING)));
assert ("\"\"".equals(NebulaUtils.extraValue("", PropertyType.FIXED_STRING)));
assert ("1".equals(NebulaUtils.extraValue(1, PropertyType.INT8)));
assert (null == NebulaUtils.extraValue(null, PropertyType.STRING.getValue()));
assert ("\"\"".equals(NebulaUtils.extraValue("", PropertyType.STRING.getValue())));
assert ("\"\"".equals(NebulaUtils.extraValue("", PropertyType.FIXED_STRING.getValue())));
assert ("1".equals(NebulaUtils.extraValue(1, PropertyType.INT8.getValue())));
assert ("timestamp(\"2021-01-01T12:12:12\")".equals(
NebulaUtils.extraValue("2021-01-01T12:12:12", PropertyType.TIMESTAMP)));
NebulaUtils.extraValue("2021-01-01T12:12:12", PropertyType.TIMESTAMP.getValue())));
assert ("datetime(\"2021-01-01T12:12:12\")".equals(
NebulaUtils.extraValue("2021-01-01T12:12:12", PropertyType.DATETIME)));
NebulaUtils.extraValue("2021-01-01T12:12:12", PropertyType.DATETIME.getValue())));
assert ("date(\"2021-01-01\")".equals(NebulaUtils.extraValue("2021-01-01",
PropertyType.DATE)));
assert ("time(\"12:12:12\")".equals(NebulaUtils.extraValue("12:12:12", PropertyType.TIME)));
PropertyType.DATE.getValue())));
assert ("time(\"12:12:12\")".equals(NebulaUtils.extraValue("12:12:12",
PropertyType.TIME.getValue())));
}

public void testMkString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@

package org.apache.flink.sink;

import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
Expand All @@ -18,10 +23,16 @@
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;
import org.apache.flink.types.Row;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AbstractNebulaOutPutFormatTest extends TestCase {
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractNebulaOutPutFormatTest.class);

@Test
public void testWrite() throws IOException {
mockNebulaData();
List<String> cols = Arrays.asList("name", "age");
ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSink")
Expand All @@ -34,8 +45,8 @@ public void testWrite() throws IOException {

NebulaClientOptions clientOptions = new NebulaClientOptions
.NebulaClientOptionsBuilder()
.setMetaAddress("127.0.0.1:45500")
.setGraphAddress("127.0.0.1:3699")
.setMetaAddress("127.0.0.1:9559")
.setGraphAddress("127.0.0.1:9669")
.build();
NebulaGraphConnectionProvider graphProvider =
new NebulaGraphConnectionProvider(clientOptions);
Expand All @@ -47,10 +58,41 @@ public void testWrite() throws IOException {

NebulaBatchOutputFormat outPutFormat =
new NebulaBatchOutputFormat(graphProvider, metaProvider)
.setExecutionOptions(executionOptions);
.setExecutionOptions(executionOptions);

outPutFormat.open(1, 2);
outPutFormat.writeRecord(row);
}

}
private void mockNebulaData() {
NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
nebulaPoolConfig.setMaxConnSize(100);
List<HostAddress> addresses = Arrays.asList(new HostAddress("127.0.0.1", 9669));
NebulaPool pool = new NebulaPool();
Session session = null;
try {
pool.init(addresses, nebulaPoolConfig);
session = pool.getSession("root", "nebula", true);

String createSpace = "CREATE SPACE IF NOT EXISTS flinkSink(partition_num=10,"
+ "vid_type=fixed_string(8));"
+ "USE flinkSink;"
+ "CREATE TAG IF NOT EXISTS player(name string, age int);";
ResultSet resp = session.execute(createSpace);

try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (!resp.isSucceeded()) {
LOGGER.error("create flinkSink space failed, " + resp.getErrorMessage());
assert (false);
}

} catch (Exception e) {
e.printStackTrace();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ public class NebulaOutputFormatConverterTest {

@Before
public void before() {
schema.put("src", PropertyType.STRING);
schema.put("dst", PropertyType.STRING);
schema.put("degree", PropertyType.DOUBLE);
schema.put("date", PropertyType.DATE);
schema.put("datetime", PropertyType.DATETIME);
schema.put("time", PropertyType.TIME);
schema.put("name", PropertyType.STRING);
schema.put("age", PropertyType.INT16);
schema.put("aaa", PropertyType.DOUBLE);
schema.put("bbb", PropertyType.INT16);
schema.put("src", PropertyType.STRING.getValue());
schema.put("dst", PropertyType.STRING.getValue());
schema.put("degree", PropertyType.DOUBLE.getValue());
schema.put("date", PropertyType.DATE.getValue());
schema.put("datetime", PropertyType.DATETIME.getValue());
schema.put("time", PropertyType.TIME.getValue());
schema.put("name", PropertyType.STRING.getValue());
schema.put("age", PropertyType.INT16.getValue());
schema.put("aaa", PropertyType.DOUBLE.getValue());
schema.put("bbb", PropertyType.INT16.getValue());

row.setField(0, 2);
row.setField(1, "Tom");
Expand Down
Loading

0 comments on commit 11c7285

Please sign in to comment.