From e10c2f56127a4110aa04f33e6a1216944040da48 Mon Sep 17 00:00:00 2001 From: Ian Robinson Date: Thu, 21 Feb 2019 15:40:24 +0000 Subject: [PATCH 1/5] Update README to point to sample --- ...rt-from-mysql-to-neptune-incremental.ipynb | 143 ------------------ .../export-from-mysql-to-neptune.ipynb | 106 ------------- .../examples/export-from-mysql-to-s3.ipynb | 102 ------------- glue-neptune/readme.md | 8 +- 4 files changed, 2 insertions(+), 357 deletions(-) delete mode 100644 glue-neptune/examples/export-from-mysql-to-neptune-incremental.ipynb delete mode 100644 glue-neptune/examples/export-from-mysql-to-neptune.ipynb delete mode 100644 glue-neptune/examples/export-from-mysql-to-s3.ipynb diff --git a/glue-neptune/examples/export-from-mysql-to-neptune-incremental.ipynb b/glue-neptune/examples/export-from-mysql-to-neptune-incremental.ipynb deleted file mode 100644 index 1f08f41a82..0000000000 --- a/glue-neptune/examples/export-from-mysql-to-neptune-incremental.ipynb +++ /dev/null @@ -1,143 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import sys, boto3, os, datetime\n", - "\n", - "from awsglue.utils import getResolvedOptions\n", - "from pyspark.context import SparkContext\n", - "from awsglue.context import GlueContext\n", - "from awsglue.job import Job\n", - "from awsglue.transforms import ApplyMapping\n", - "from awsglue.transforms import RenameField\n", - "from awsglue.transforms import SelectFields\n", - "from awsglue.dynamicframe import DynamicFrame\n", - "from pyspark.sql.functions import lit\n", - "from pyspark.sql.functions import format_string\n", - "from pyspark.sql.functions import col\n", - "from glue_neptune.NeptuneConnectionInfo import NeptuneConnectionInfo\n", - "from glue_neptune.NeptuneGremlinClient import NeptuneGremlinClient\n", - "from glue_neptune.GremlinCsvTransforms import GremlinCsvTransforms\n", - "from gremlin_python import statics\n", - "from gremlin_python.structure.graph import Graph\n", - "from gremlin_python.process.graph_traversal import __\n", - "from gremlin_python.process.strategies import *\n", - "from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection\n", - "from gremlin_python.process.traversal import *\n", - "\n", - "glueContext = GlueContext(sc)\n", - " \n", - "job = Job(glueContext)\n", - "job.init('mysql-to-neptune', {})\n", - "\n", - "database = \"sales-order\"\n", - "order_table = \"salesdb_sales_order\"\n", - "order_detail_table = \"salesdb_sales_order_detail\"\n", - "\n", - "gremlin_endpoint = NeptuneConnectionInfo(glueContext).neptune_endpoint('neptune')\n", - "neptune = NeptuneGremlinClient(gremlin_endpoint)\n", - "\n", - "def get_last_checkpoint (client, tablename):\n", - " conn = client.remote_connection()\n", - " g = client.traversal_source(conn)\n", - " checkpoint= (g.V().hasLabel('Checkpoint').has('table', tablename).fold().coalesce(\n", - " __.unfold(),\n", - " __.addV('Checkpoint').\n", - " property('table', tablename).\n", - " property('value', datetime.datetime(2015, 1, 1, 0, 0))).\n", - " values('value').\n", - " next())\n", - " conn.close()\n", - " return checkpoint\n", - " \n", - "def update_checkpoint (client, tablename, checkpoint):\n", - " conn = client.remote_connection()\n", - " g = client.traversal_source(conn)\n", - " g.V().hasLabel('Checkpoint').has('table', tablename).property(Cardinality.single, 'value', checkpoint).next()\n", - " conn.close()\n", - " return True\n", - " \n", - "checkpoint = 
get_last_checkpoint(neptune, order_table)\n", - "newcheckpoint = checkpoint + datetime.timedelta(days=1)\n", - "\n", - "print(\"Last checkpoint: \"+ str(checkpoint))\n", - "print(\"New checkpoint : \"+ str(newcheckpoint))\n", - "\n", - "print \"Creating Order vertices...\"\n", - "\n", - "datasource0 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = order_table, transformation_ctx = \"datasource0\")\n", - "df0 = datasource0.toDF().filter(col(\"ORDER_DATE\") == checkpoint)\n", - "datasource1 = DynamicFrame.fromDF(df0, glueContext,'datasource1')\n", - "\n", - "print \"Total orders : \"+str(datasource0.count())\n", - "print \"Orders for checkpoint: \"+str(datasource1.count())\n", - "\n", - "applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = [(\"ORDER_DATE\", \"timestamp\", \"orderDate\", \"string\"), (\"SHIP_MODE\", \"string\", \"shipMode\", \"string\"), (\"SITE_ID\", \"double\", \"siteId\", \"int\"), (\"ORDER_ID\", \"int\", \"orderId\", \"int\")], transformation_ctx = \"applymapping1\")\n", - "applymapping1 = GremlinCsvTransforms.create_prefixed_columns(applymapping1, [('~id', 'orderId', 'o')])\n", - "selectfields1 = SelectFields.apply(frame = applymapping1, paths = [\"~id\", \"orderDate\", \"shipMode\"], transformation_ctx = \"selectfields1\")\n", - "\n", - "selectfields1.toDF().foreachPartition(neptune.add_vertices('Order'))\n", - "\n", - "print \"Creating OrderDetail vertices...\"\n", - "\n", - "datasource2 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = order_detail_table, transformation_ctx = \"datasource1\")\n", - "datasource3 = datasource2.join( [\"ORDER_ID\"],[\"ORDER_ID\"], datasource1, transformation_ctx = \"join\")\n", - "\n", - "print \"Total order details : \"+str(datasource2.count())\n", - "print \"Order details for checkpoint: \"+str(datasource3.count())\n", - "\n", - "applymapping2 = ApplyMapping.apply(frame = datasource3, mappings = [(\"DISCOUNT\", \"decimal(10,2)\", \"discount\", \"string\"), (\"UNIT_PRICE\", \"decimal(10,2)\", \"unitPrice\", \"string\"), (\"TAX\", \"decimal(10,2)\", \"tax\", \"string\"), (\"SUPPLY_COST\", \"decimal(10,2)\", \"supplyCost\", \"string\"), (\"PRODUCT_ID\", \"int\", \"productId\", \"int\"), (\"QUANTITY\", \"int\", \"quantity\", \"int\"), (\"LINE_ID\", \"int\", \"lineId\", \"int\"), (\"LINE_NUMBER\", \"int\", \"lineNumber\", \"int\"), (\"ORDER_ID\", \"int\", \"orderId\", \"int\")], transformation_ctx = \"applymapping2\")\n", - "applymapping2 = GremlinCsvTransforms.create_prefixed_columns(applymapping2, [('~id', 'lineId', 'od')])\n", - "selectfields2 = SelectFields.apply(frame = applymapping2, paths = [\"~id\", \"lineNumber\", \"quantity\", \"unitPrice\", \"discount\", \"supplyCost\", \"tax\"], transformation_ctx = \"selectfields2\")\n", - "\n", - "selectfields2.toDF().foreachPartition(neptune.add_vertices('OrderDetail'))\n", - "\n", - "print \"Creating ORDER_DETAIL edges...\"\n", - "\n", - "applymapping3 = RenameField.apply(applymapping2, \"~id\", \"~to\")\n", - "applymapping3 = GremlinCsvTransforms.create_prefixed_columns(applymapping3, [('~from', 'orderId', 'o')])\n", - "applymapping3 = GremlinCsvTransforms.create_edge_id_column(applymapping3, '~from', '~to')\n", - "selectfields3 = SelectFields.apply(frame = applymapping3, paths = [\"~id\", \"~from\", \"~to\", \"lineNumber\"], transformation_ctx = \"selectfields3\")\n", - "\n", - "selectfields3.toDF().foreachPartition(neptune.add_edges('ORDER_DETAIL'))\n", - "\n", - "print \"Creating PRODUCT edges...\"\n", - "\n", 
- "applymapping4 = RenameField.apply(applymapping2, \"~id\", \"~from\")\n", - "applymapping4 = GremlinCsvTransforms.create_prefixed_columns(applymapping4, [('~to', 'productId', 'p')])\n", - "applymapping4 = GremlinCsvTransforms.create_edge_id_column(applymapping4, '~from', '~to')\n", - "selectfields4 = SelectFields.apply(frame = applymapping4, paths = [\"~id\", \"~from\", \"~to\"], transformation_ctx = \"selectfields4\")\n", - "\n", - "selectfields4.toDF().foreachPartition(neptune.add_edges('PRODUCT'))\n", - "\n", - "update_checkpoint(neptune, order_table, newcheckpoint)\n", - "\n", - "job.commit()\n", - "\n", - "print('Done')" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Sparkmagic (PySpark)", - "language": "", - "name": "pysparkkernel" - }, - "language_info": { - "codemirror_mode": { - "name": "python", - "version": 2 - }, - "mimetype": "text/x-python", - "name": "pyspark", - "pygments_lexer": "python2" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/glue-neptune/examples/export-from-mysql-to-neptune.ipynb b/glue-neptune/examples/export-from-mysql-to-neptune.ipynb deleted file mode 100644 index 6b21f8cc65..0000000000 --- a/glue-neptune/examples/export-from-mysql-to-neptune.ipynb +++ /dev/null @@ -1,106 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import sys, boto3, os\n", - "\n", - "from awsglue.utils import getResolvedOptions\n", - "from pyspark.context import SparkContext\n", - "from awsglue.context import GlueContext\n", - "from awsglue.job import Job\n", - "from awsglue.transforms import ApplyMapping\n", - "from awsglue.transforms import RenameField\n", - "from awsglue.transforms import SelectFields\n", - "from awsglue.dynamicframe import DynamicFrame\n", - "from pyspark.sql.functions import lit\n", - "from pyspark.sql.functions import format_string\n", - "from gremlin_python import statics\n", - "from gremlin_python.structure.graph import Graph\n", - "from gremlin_python.process.graph_traversal import __\n", - "from gremlin_python.process.strategies import *\n", - "from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection\n", - "from gremlin_python.process.traversal import *\n", - "from glue_neptune.NeptuneConnectionInfo import NeptuneConnectionInfo\n", - "from glue_neptune.NeptuneGremlinClient import NeptuneGremlinClient\n", - "from glue_neptune.GremlinCsvTransforms import GremlinCsvTransforms\n", - "\n", - "glueContext = GlueContext(sc)\n", - " \n", - "job = Job(glueContext)\n", - "job.init('mysql-to-neptune', {})\n", - "\n", - "database = \"sales-order\"\n", - "product_table = \"salesdb_product\"\n", - "product_category_table = \"salesdb_product_category\"\n", - "supplier_table = \"salesdb_supplier\"\n", - "\n", - "gremlin_endpoint = NeptuneConnectionInfo(glueContext).neptune_endpoint('neptune')\n", - "neptune = NeptuneGremlinClient(gremlin_endpoint)\n", - "\n", - "# Product vertices\n", - "\n", - "print \"Creating Product vertices...\"\n", - "\n", - "datasource0 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = product_table, transformation_ctx = \"datasource0\")\n", - "datasource1 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = product_category_table, transformation_ctx = \"datasource1\")\n", - "datasource2 = datasource0.join( [\"CATEGORY_ID\"],[\"CATEGORY_ID\"], datasource1, transformation_ctx = \"join\")\n", - "\n", - "applymapping1 = ApplyMapping.apply(frame = 
datasource2, mappings = [(\"NAME\", \"string\", \"name:String\", \"string\"), (\"UNIT_PRICE\", \"decimal(10,2)\", \"unitPrice\", \"string\"), (\"PRODUCT_ID\", \"int\", \"productId\", \"int\"), (\"QUANTITY_PER_UNIT\", \"int\", \"quantityPerUnit:Int\", \"int\"), (\"CATEGORY_ID\", \"int\", \"category_id\", \"int\"), (\"SUPPLIER_ID\", \"int\", \"supplierId\", \"int\"), (\"CATEGORY_NAME\", \"string\", \"category:String\", \"string\"), (\"DESCRIPTION\", \"string\", \"description:String\", \"string\"), (\"IMAGE_URL\", \"string\", \"imageUrl:String\", \"string\")], transformation_ctx = \"applymapping1\")\n", - "applymapping1 = GremlinCsvTransforms.create_prefixed_columns(applymapping1, [('~id', 'productId', 'p'),('~to', 'supplierId', 's')])\n", - "selectfields1 = SelectFields.apply(frame = applymapping1, paths = [\"~id\", \"name:String\", \"category:String\", \"description:String\", \"unitPrice\", \"quantityPerUnit:Int\", \"imageUrl:String\"], transformation_ctx = \"selectfields1\")\n", - "\n", - "selectfields1.toDF().foreachPartition(neptune.upsert_vertices('Product'))\n", - "\n", - "# Supplier vertices\n", - "\n", - "print \"Creating Supplier vertices...\"\n", - "\n", - "datasource3 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = supplier_table, transformation_ctx = \"datasource3\")\n", - "\n", - "applymapping2 = ApplyMapping.apply(frame = datasource3, mappings = [(\"COUNTRY\", \"string\", \"country:String\", \"string\"), (\"ADDRESS\", \"string\", \"address:String\", \"string\"), (\"NAME\", \"string\", \"name:String\", \"string\"), (\"STATE\", \"string\", \"state:String\", \"string\"), (\"SUPPLIER_ID\", \"int\", \"supplierId\", \"int\"), (\"CITY\", \"string\", \"city:String\", \"string\"), (\"PHONE\", \"string\", \"phone:String\", \"string\")], transformation_ctx = \"applymapping1\")\n", - "applymapping2 = GremlinCsvTransforms.create_prefixed_columns(applymapping2, [('~id', 'supplierId', 's')])\n", - "selectfields3 = SelectFields.apply(frame = applymapping2, paths = [\"~id\", \"country:String\", \"address:String\", \"city:String\", \"phone:String\", \"name:String\", \"state:String\"], transformation_ctx = \"selectfields3\")\n", - "\n", - "selectfields3.toDF().foreachPartition(neptune.upsert_vertices('Supplier'))\n", - "\n", - "# SUPPLIER edges\n", - "\n", - "print \"Creating SUPPLIER edges...\"\n", - "\n", - "applymapping1 = RenameField.apply(applymapping1, \"~id\", \"~from\")\n", - "applymapping1 = GremlinCsvTransforms.create_edge_id_column(applymapping1, '~from', '~to')\n", - "selectfields2 = SelectFields.apply(frame = applymapping1, paths = [\"~id\", \"~from\", \"~to\"], transformation_ctx = \"selectfields2\")\n", - " \n", - "selectfields2.toDF().foreachPartition(neptune.upsert_edges('SUPPLIER'))\n", - "\n", - "# End\n", - "\n", - "job.commit()\n", - "\n", - "print('Done')" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Sparkmagic (PySpark)", - "language": "", - "name": "pysparkkernel" - }, - "language_info": { - "codemirror_mode": { - "name": "python", - "version": 2 - }, - "mimetype": "text/x-python", - "name": "pyspark", - "pygments_lexer": "python2" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/glue-neptune/examples/export-from-mysql-to-s3.ipynb b/glue-neptune/examples/export-from-mysql-to-s3.ipynb deleted file mode 100644 index a005fcf0ef..0000000000 --- a/glue-neptune/examples/export-from-mysql-to-s3.ipynb +++ /dev/null @@ -1,102 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - 
"metadata": {}, - "outputs": [], - "source": [ - "import sys\n", - "from awsglue.utils import getResolvedOptions\n", - "from pyspark.context import SparkContext\n", - "from awsglue.context import GlueContext\n", - "from awsglue.job import Job\n", - "from awsglue.transforms import ApplyMapping\n", - "from awsglue.transforms import RenameField\n", - "from awsglue.transforms import SelectFields\n", - "from awsglue.dynamicframe import DynamicFrame\n", - "from pyspark.sql.functions import lit\n", - "from pyspark.sql.functions import format_string\n", - "from glue_neptune.GremlinCsvTransforms import GremlinCsvTransforms\n", - "\n", - "glueContext = GlueContext(sc)\n", - "\n", - "job = Job(glueContext)\n", - "job.init('rds-2-neptune', {})\n", - "\n", - "nodes_path = 's3://'\n", - "edges_path = 's3://'\n", - "\n", - "database = \"sales-order\"\n", - "product_table = \"salesdb_product\"\n", - "product_category_table = \"salesdb_product_category\"\n", - "supplier_table = \"salesdb_supplier\"\n", - " \n", - "def writeCsvFile(datasource, path):\n", - " dataframe = DynamicFrame.toDF(datasource).repartition(1)\n", - " datasource = DynamicFrame.fromDF(dataframe, glueContext, 'write-csv')\n", - " glueContext.write_dynamic_frame.from_options(frame = datasource, connection_type = \"s3\", connection_options = {\"path\": path}, format = \"csv\", transformation_ctx = \"write-csv\") \n", - "\n", - "# Product vertices\n", - "\n", - "print \"Creating Product vertices...\"\n", - "\n", - "datasource0 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = product_table, transformation_ctx = \"datasource0\")\n", - "datasource1 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = product_category_table, transformation_ctx = \"datasource1\")\n", - "datasource2 = datasource0.join( [\"CATEGORY_ID\"],[\"CATEGORY_ID\"], datasource1, transformation_ctx = \"join\")\n", - "\n", - "applymapping1 = ApplyMapping.apply(frame = datasource2, mappings = [(\"NAME\", \"string\", \"name:String\", \"string\"), (\"UNIT_PRICE\", \"decimal(10,2)\", \"unitPrice\", \"decimal(10,2)\"), (\"PRODUCT_ID\", \"int\", \"productId\", \"int\"), (\"QUANTITY_PER_UNIT\", \"int\", \"quantityPerUnit:Int\", \"int\"), (\"CATEGORY_ID\", \"int\", \"category_id\", \"int\"), (\"SUPPLIER_ID\", \"int\", \"supplierId\", \"int\"), (\"CATEGORY_NAME\", \"string\", \"category:String\", \"string\"), (\"DESCRIPTION\", \"string\", \"description:String\", \"string\"), (\"IMAGE_URL\", \"string\", \"imageUrl:String\", \"string\")], transformation_ctx = \"applymapping1\")\n", - "applymapping1 = GremlinCsvTransforms.create_prefixed_columns(applymapping1, [('~id', 'productId', 'p'),('~to', 'supplierId', 's')])\n", - "selectfields1 = SelectFields.apply(frame = applymapping1, paths = [\"~id\", \"name:String\", \"category:String\", \"description:String\", \"unitPrice\", \"quantityPerUnit:Int\", \"imageUrl:String\"], transformation_ctx = \"selectfields1\")\n", - "\n", - "writeCsvFile(GremlinCsvTransforms.addLabel(selectfields1, 'Product'), nodes_path)\n", - "\n", - "# SUPPLIER edges\n", - "\n", - "print \"Creating SUPPLIER edges...\"\n", - "\n", - "applymapping1 = RenameField.apply(applymapping1, \"~id\", \"~from\")\n", - "applymapping1 = GremlinCsvTransforms.create_edge_id_column(applymapping1, '~from', '~to')\n", - "selectfields2 = SelectFields.apply(frame = applymapping1, paths = [\"~id\", \"~from\", \"~to\"], transformation_ctx = \"selectfields2\")\n", - "\n", - "writeCsvFile(GremlinCsvTransforms.addLabel(selectfields2, 
'SUPPLIER'), edges_path)\n", - "\n", - "# Supplier vertices\n", - "\n", - "print \"Creating Supplier vertices...\"\n", - "\n", - "datasource3 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = supplier_table, transformation_ctx = \"datasource3\")\n", - "\n", - "applymapping2 = ApplyMapping.apply(frame = datasource3, mappings = [(\"COUNTRY\", \"string\", \"country:String\", \"string\"), (\"ADDRESS\", \"string\", \"address:String\", \"string\"), (\"NAME\", \"string\", \"name:String\", \"string\"), (\"STATE\", \"string\", \"state:String\", \"string\"), (\"SUPPLIER_ID\", \"int\", \"supplierId\", \"int\"), (\"CITY\", \"string\", \"city:String\", \"string\"), (\"PHONE\", \"string\", \"phone:String\", \"string\")], transformation_ctx = \"applymapping1\")\n", - "applymapping2 = GremlinCsvTransforms.create_prefixed_columns(applymapping2, [('~id', 'supplierId', 's')])\n", - "selectfields3 = SelectFields.apply(frame = applymapping2, paths = [\"~id\", \"country:String\", \"address:String\", \"city:String\", \"phone:String\", \"name:String\", \"state:String\"], transformation_ctx = \"selectfields3\")\n", - "\n", - "writeCsvFile(GremlinCsvTransforms.addLabel(selectfields3, 'Supplier'), nodes_path)\n", - "\n", - "# End\n", - "\n", - "job.commit()\n", - "\n", - "print \"Done\"" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Sparkmagic (PySpark)", - "language": "", - "name": "pysparkkernel" - }, - "language_info": { - "codemirror_mode": { - "name": "python", - "version": 2 - }, - "mimetype": "text/x-python", - "name": "pyspark", - "pygments_lexer": "python2" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/glue-neptune/readme.md b/glue-neptune/readme.md index 3ecb31b469..3a59959550 100644 --- a/glue-neptune/readme.md +++ b/glue-neptune/readme.md @@ -18,13 +18,9 @@ You can then refer to this library from your Glue Development Endpoint or Glue j ## Examples -The _examples_ directory contains 3 example Glue jobs: - - - _export-from-mysql-to-s3_ – Shows how to export from several MySQL tables to CSV formatted in accordance with the Neptune CSV bulk load format. These files are saved to S3, ready to be bulk loaded into Neptune. - - _export-from-mysql-to-neptune_ – Shows how to export direct from several MySQL tables into Neptune. Nodes and edges are written conditionally to the database using user-supplied IDs. - - _export-from-mysql-to-neptune-incremental_ – Shows how to perform an incremental load from MySQL to Neptune using checkpoint information that is written to a Neptune vertex. +See [Migrating from MySQL to Amazon Neptune using AWS Glue](https://github.com/iansrobinson/amazon-neptune-samples/tree/master/gremlin/glue-neptune). - ## Cross Account/Region Datasources +## Cross Account/Region Datasources If you have a datasource in a different region and/or different account from Glue and your Neptune database, you can follow the instructions in this [blog](https://aws.amazon.com/blogs/big-data/create-cross-account-and-cross-region-aws-glue-connections/) to allow access. 
From 9e134c2c0f21e60f2cd8b1e0a6ba3af5341e727b Mon Sep 17 00:00:00 2001 From: Ian Robinson Date: Fri, 22 Feb 2019 10:26:55 +0000 Subject: [PATCH 2/5] Reduce dependencies and size of jar --- neptune-export/pom.xml | 3 ++- .../services/neptune/NeptuneExportBaseCommand.java | 11 ++++++----- .../amazonaws/services/neptune/NeptuneExportCli.java | 3 ++- .../services/neptune/rdf/NeptuneSparqlClient.java | 6 +++--- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/neptune-export/pom.xml b/neptune-export/pom.xml index 8830d98a2a..116753844e 100644 --- a/neptune-export/pom.xml +++ b/neptune-export/pom.xml @@ -75,11 +75,12 @@ junit junit 4.12 + test org.eclipse.rdf4j - rdf4j-runtime + rdf4j-repository-sparql [2.4.0,) diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportBaseCommand.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportBaseCommand.java index c62a80801d..87b5078785 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportBaseCommand.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportBaseCommand.java @@ -5,13 +5,14 @@ import com.github.rvesse.airline.annotations.restrictions.*; import java.io.File; +import java.util.ArrayList; import java.util.List; -public class NeptuneExportBaseCommand { +public abstract class NeptuneExportBaseCommand { - @Option(name = {"-e", "--endpoint"}, description = "Neptune endpoint(s) – supply multiple instance endpoints if you want to load balance requests across a cluster", title="endpoint") + @Option(name = {"-e", "--endpoint"}, description = "Neptune endpoint(s) – supply multiple instance endpoints if you want to load balance requests across a cluster", title = "endpoint") @Required - protected List endpoints; + protected List endpoints = new ArrayList<>(); @Option(name = {"-p", "--port"}, description = "Neptune port (optional, default 8182)") @Port(acceptablePorts = {PortType.SYSTEM, PortType.USER}) @@ -56,11 +57,11 @@ public class NeptuneExportBaseCommand { @Once protected int loadBalancerPort = 80; - public ConnectionConfig connectionConfig(){ + public ConnectionConfig connectionConfig() { return new ConnectionConfig(endpoints, port, networkLoadBalancerEndpoint, applicationLoadBalancerEndpoint, loadBalancerPort, useIamAuth, useSsl); } - public void setLoggingLevel(){ + public void applyLogLevel() { System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", logLevel); } } diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportCli.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportCli.java index 17538a8b5d..59a96dd3e3 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportCli.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportCli.java @@ -12,6 +12,7 @@ package com.amazonaws.services.neptune; +import com.github.rvesse.airline.Accessor; import com.github.rvesse.airline.annotations.Alias; import com.github.rvesse.airline.annotations.Cli; import com.github.rvesse.airline.annotations.Parser; @@ -46,7 +47,7 @@ public static void main(String[] args) { Runnable cmd = cli.parse(args); if (NeptuneExportBaseCommand.class.isAssignableFrom(cmd.getClass())) { - ((NeptuneExportBaseCommand) cmd).setLoggingLevel(); + ((NeptuneExportBaseCommand) cmd).applyLogLevel(); } cmd.run(); diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/rdf/NeptuneSparqlClient.java 
b/neptune-export/src/main/java/com/amazonaws/services/neptune/rdf/NeptuneSparqlClient.java index db95fc59a5..570e1d705a 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/rdf/NeptuneSparqlClient.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/rdf/NeptuneSparqlClient.java @@ -63,14 +63,14 @@ public static NeptuneSparqlClient create(Collection endpoints, int port, throw new RuntimeException(e1); } }). - peek(AbstractRepository::initialize). + peek(AbstractRepository::init). collect(Collectors.toList())); } else { return new NeptuneSparqlClient( endpoints.stream().map(e -> updateParser(new SPARQLRepository(sparqlEndpount(e, port)))). - peek(AbstractRepository::initialize). + peek(AbstractRepository::init). collect(Collectors.toList())); } } @@ -172,7 +172,7 @@ private SPARQLRepository chooseRepository() { } @Override - public void close() throws Exception { + public void close() { repositories.forEach(AbstractRepository::shutDown); } } From 896d446ba1765c5367246286656df73610443c6c Mon Sep 17 00:00:00 2001 From: Ian Robinson Date: Tue, 26 Feb 2019 19:03:02 +0100 Subject: [PATCH 3/5] Add --exclude-type-definitions flag. Experimental Lambda function. --- neptune-export/pom.xml | 15 +- .../services/neptune/ExportPropertyGraph.java | 13 +- .../ExportPropertyGraphFromConfig.java | 14 +- .../services/neptune/NeptuneExportLambda.java | 143 ++++++++++++++++++ .../io/ExportPropertyGraphJob.java | 8 +- .../io/ExportPropertyGraphTask.java | 10 +- .../metadata/MetadataSpecification.java | 7 +- .../util/EnvironmentVariableUtils.java | 12 ++ .../services/neptune/util/S3ObjectInfo.java | 51 +++++++ .../propertygraph/io/JsonPrinterTest.java | 1 - .../neptune/util/S3ObjectInfoTest.java | 83 ++++++++++ 11 files changed, 344 insertions(+), 13 deletions(-) create mode 100644 neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportLambda.java create mode 100644 neptune-export/src/main/java/com/amazonaws/services/neptune/util/S3ObjectInfo.java create mode 100644 neptune-export/src/test/java/com/amazonaws/services/neptune/util/S3ObjectInfoTest.java diff --git a/neptune-export/pom.xml b/neptune-export/pom.xml index 116753844e..3801b4cc60 100644 --- a/neptune-export/pom.xml +++ b/neptune-export/pom.xml @@ -41,6 +41,18 @@ 1.11.307 + + com.amazonaws + aws-java-sdk-s3 + 1.11.307 + + + + com.amazonaws + aws-lambda-java-core + 1.2.0 + + com.amazonaws amazon-neptune-sigv4-signer @@ -142,7 +154,8 @@ ${uberjar.name} - + com.amazonaws.services.neptune.NeptuneExportCli diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraph.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraph.java index 3c0921f1e7..5c04e86614 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraph.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraph.java @@ -88,6 +88,10 @@ public class ExportPropertyGraph extends NeptuneExportBaseCommand implements Run @AllowedValues(allowedValues = {"csv", "json"}) private Format format = Format.csv; + @Option(name = {"--exclude-type-definitions"}, description = "Exclude type definitions from column headers (optional, default false)") + @Once + private boolean excludeTypeDefinitions = false; + @Override public void run() { ConcurrencyConfig concurrencyConfig = new ConcurrencyConfig(concurrency, range); @@ -107,7 +111,14 @@ public void run() { new SaveMetadataConfig(metadataCollection, configFilePath).execute(); - 
ExportPropertyGraphJob exportJob = new ExportPropertyGraphJob(metadataSpecifications, metadataCollection, g, concurrencyConfig, directories, format); + ExportPropertyGraphJob exportJob = new ExportPropertyGraphJob( + metadataSpecifications, + metadataCollection, + g, + concurrencyConfig, + directories, + format, + !excludeTypeDefinitions); exportJob.execute(); System.err.println(format.description() + " files : " + directories.directory()); diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraphFromConfig.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraphFromConfig.java index d54dd8d6c4..59cc4d456e 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraphFromConfig.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraphFromConfig.java @@ -12,7 +12,6 @@ package com.amazonaws.services.neptune; -import com.amazonaws.services.neptune.auth.HandshakeRequestConfig; import com.amazonaws.services.neptune.io.DirectoryStructure; import com.amazonaws.services.neptune.propertygraph.ConcurrencyConfig; import com.amazonaws.services.neptune.propertygraph.NeptuneGremlinClient; @@ -77,6 +76,10 @@ public class ExportPropertyGraphFromConfig extends NeptuneExportBaseCommand impl @AllowedValues(allowedValues = {"csv", "json"}) private Format format = Format.csv; + @Option(name = {"--exclude-type-definitions"}, description = "Exclude type definitions from column headers (optional, default false)") + @Once + private boolean excludeTypeDefinitions = false; + @Override public void run() { ConcurrencyConfig concurrencyConfig = new ConcurrencyConfig(concurrency, range); @@ -90,7 +93,14 @@ public void run() { Collection> metadataSpecifications = scope.metadataSpecifications(nodeLabels, edgeLabels); - ExportPropertyGraphJob exportJob = new ExportPropertyGraphJob(metadataSpecifications, metadataCollection, g, concurrencyConfig, directories, format); + ExportPropertyGraphJob exportJob = new ExportPropertyGraphJob( + metadataSpecifications, + metadataCollection, + g, + concurrencyConfig, + directories, + format, + !excludeTypeDefinitions); exportJob.execute(); System.err.println(format.description() + " files : " + directories.directory()); diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportLambda.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportLambda.java new file mode 100644 index 0000000000..fb00986685 --- /dev/null +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportLambda.java @@ -0,0 +1,143 @@ +/* +Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + http://www.apache.org/licenses/LICENSE-2.0 +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. 
+*/ + +package com.amazonaws.services.neptune; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.LambdaLogger; +import com.amazonaws.services.lambda.runtime.RequestStreamHandler; +import com.amazonaws.services.neptune.util.EnvironmentVariableUtils; +import com.amazonaws.services.neptune.util.S3ObjectInfo; +import com.amazonaws.services.s3.transfer.*; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.*; + +public class NeptuneExportLambda implements RequestStreamHandler { + + @Override + public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException { + + LambdaLogger logger = context.getLogger(); + + JsonNode json = new ObjectMapper().readTree(inputStream); + + String cmd = json.has("command") ? + json.path("command").textValue() : + EnvironmentVariableUtils.getMandatoryEnv("COMMAND"); + + String outputS3Path = json.has("outputS3Path") ? + json.path("outputS3Path").textValue() : + EnvironmentVariableUtils.getMandatoryEnv("OUTPUT_S3_PATH"); + + String configFileS3Path = json.has("configFileS3Path") ? + json.path("configFileS3Path").textValue() : + EnvironmentVariableUtils.getOptionalEnv("CONFIG_FILE_S3_PATH", ""); + + String completionFileS3Path = json.has("completionFileS3Path") ? + json.path("completionFileS3Path").textValue() : + EnvironmentVariableUtils.getOptionalEnv("COMPLETION_FILE_S3_PATH", ""); + + logger.log("cmd : " + cmd); + logger.log("outputS3Path : " + outputS3Path); + logger.log("configFileS3Path : " + configFileS3Path); + logger.log("completionFileS3Path : " + completionFileS3Path); + + S3ObjectInfo outputBaseS3ObjectInfo = new S3ObjectInfo(outputS3Path); + + TransferManager transferManager = TransferManagerBuilder.standard().build(); + + downloadConfigFile(context, logger, configFileS3Path, transferManager); + + File directory = executeCommand(cmd); + logger.log("DIRECTORY: " + directory.getAbsolutePath()); + + S3ObjectInfo outputS3ObjectInfo = outputBaseS3ObjectInfo.withNewKeySuffix(directory.getName()); + + uploadExportFilesToS3(logger, transferManager, directory, outputS3ObjectInfo); + uploadCompletionFileToS3(completionFileS3Path, transferManager, directory, outputS3ObjectInfo); + + } + + private void uploadCompletionFileToS3(String completionFileS3Path, TransferManager transferManager, File directory, S3ObjectInfo outputS3ObjectInfo) throws IOException { + if (!completionFileS3Path.isEmpty()) { + + File completionFile = new File("/tmp", directory.getName() + ".txt"); + try (BufferedWriter writer = new BufferedWriter(new FileWriter(completionFile))) { + writer.write(outputS3ObjectInfo.toString()); + } + + S3ObjectInfo completionFileS3ObjectInfo = new S3ObjectInfo(completionFileS3Path).withNewKeySuffix(completionFile.getName()); + + Upload upload = transferManager.upload(completionFileS3ObjectInfo.bucket(), completionFileS3ObjectInfo.key(), completionFile); + try { + upload.waitForUploadResult(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + private void uploadExportFilesToS3(LambdaLogger logger, TransferManager transferManager, File directory, S3ObjectInfo outputS3ObjectInfo) { + try { + + MultipleFileUpload upload = transferManager.uploadDirectory( + outputS3ObjectInfo.bucket(), + outputS3ObjectInfo.key(), + directory, + true); + + upload.waitForCompletion(); + } catch (InterruptedException e) { + logger.log(e.getMessage()); + } + } + + private void 
downloadConfigFile(Context context, LambdaLogger logger, String configFileS3Path, TransferManager transferManager) { + if (!configFileS3Path.isEmpty()) { + S3ObjectInfo configFileS3ObjectInfo = new S3ObjectInfo(configFileS3Path); + + logger.log("Bucket: " + configFileS3ObjectInfo.bucket()); + logger.log("Key : " + configFileS3ObjectInfo.key()); + logger.log("File : " + configFileS3ObjectInfo.createDownloadFile("/tmp").getAbsolutePath()); + + Download download = transferManager.download( + configFileS3ObjectInfo.bucket(), + configFileS3ObjectInfo.key(), + configFileS3ObjectInfo.createDownloadFile("/tmp")); + try { + download.waitForCompletion(); + } catch (InterruptedException e) { + context.getLogger().log(e.getMessage()); + } + } + } + + private File executeCommand(String cmd) throws IOException { + String[] args = cmd.split(" "); + + try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { + + PrintStream out = new PrintStream(output); + PrintStream old = System.out; + System.setOut(out); + + NeptuneExportCli.main(args); + + System.out.flush(); + System.setOut(old); + + return new File(output.toString().replace("\n", "")); + } + } +} diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphJob.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphJob.java index e19c525909..4b3920ec5f 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphJob.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphJob.java @@ -33,19 +33,22 @@ public class ExportPropertyGraphJob { private final ConcurrencyConfig concurrencyConfig; private final Directories directories; private final Format format; + private boolean includeTypeDefinitions; public ExportPropertyGraphJob(Collection> metadataSpecifications, PropertiesMetadataCollection propertiesMetadataCollection, GraphTraversalSource g, ConcurrencyConfig concurrencyConfig, Directories directories, - Format format) { + Format format, + boolean includeTypeDefinitions) { this.metadataSpecifications = metadataSpecifications; this.propertiesMetadataCollection = propertiesMetadataCollection; this.g = g; this.concurrencyConfig = concurrencyConfig; this.directories = directories; this.format = format; + this.includeTypeDefinitions = includeTypeDefinitions; } public void execute() throws Exception { @@ -68,7 +71,8 @@ public void execute() throws Exception { format, rangeFactory, status, - index); + index, + includeTypeDefinitions); taskExecutor.execute(exportTask); } diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphTask.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphTask.java index 29cf316022..3d8d5cce49 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphTask.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphTask.java @@ -35,14 +35,17 @@ public class ExportPropertyGraphTask implements Runnable, GraphElementHandler private final Status status; private final int index; private final Map> labelWriters = new HashMap<>(); + private boolean includeTypeDefinitions; public ExportPropertyGraphTask(PropertiesMetadata propertiesMetadata, LabelsFilter labelsFilter, GraphClient graphClient, WriterFactory writerFactory, - Format format, 
RangeFactory rangeFactory, + Format format, + RangeFactory rangeFactory, Status status, - int index) { + int index, + boolean includeTypeDefinitions) { this.propertiesMetadata = propertiesMetadata; this.labelsFilter = labelsFilter; this.graphClient = graphClient; @@ -51,6 +54,7 @@ public ExportPropertyGraphTask(PropertiesMetadata propertiesMetadata, this.rangeFactory = rangeFactory; this.status = status; this.index = index; + this.includeTypeDefinitions = includeTypeDefinitions; } @Override @@ -101,7 +105,7 @@ private void createWriterFor(String label) { } Printer printer = writerFactory.createPrinter(label, index, propertyMetadata, format); - printer.printHeaderRemainingColumns(propertyMetadata.values(), true); + printer.printHeaderRemainingColumns(propertyMetadata.values(), includeTypeDefinitions); labelWriters.put(label, writerFactory.createLabelWriter(printer)); diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/metadata/MetadataSpecification.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/metadata/MetadataSpecification.java index c0272d9546..7c908d47fb 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/metadata/MetadataSpecification.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/metadata/MetadataSpecification.java @@ -69,7 +69,8 @@ public ExportPropertyGraphTask createExportTask(PropertiesMetadataCollection Format format, RangeFactory rangeFactory, Status status, - int index) { + int index, + boolean includeTypeDefinitions) { return new ExportPropertyGraphTask<>( metadataCollection.propertyMetadataFor(metadataType), labelsFilter, @@ -78,8 +79,8 @@ public ExportPropertyGraphTask createExportTask(PropertiesMetadataCollection format, rangeFactory, status, - index - ); + index, + includeTypeDefinitions); } private static class Handler implements GraphElementHandler> { diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/util/EnvironmentVariableUtils.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/util/EnvironmentVariableUtils.java index fb55bfd359..4a9f34529f 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/util/EnvironmentVariableUtils.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/util/EnvironmentVariableUtils.java @@ -1,3 +1,15 @@ +/* +Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + http://www.apache.org/licenses/LICENSE-2.0 +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. +*/ + package com.amazonaws.services.neptune.util; public class EnvironmentVariableUtils { diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/util/S3ObjectInfo.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/util/S3ObjectInfo.java new file mode 100644 index 0000000000..faf21fa6b5 --- /dev/null +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/util/S3ObjectInfo.java @@ -0,0 +1,51 @@ +/* +Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"). 
+You may not use this file except in compliance with the License. +A copy of the License is located at + http://www.apache.org/licenses/LICENSE-2.0 +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. +*/ + +package com.amazonaws.services.neptune.util; + +import java.io.File; +import java.net.URI; + +public class S3ObjectInfo { + private final String bucket; + private final String key; + private final String fileName; + + public S3ObjectInfo(String s3Uri) { + URI uri = URI.create(s3Uri); + + bucket = uri.getAuthority(); + key = uri.getPath().substring(1); + fileName = new File(uri.getPath()).getName(); + } + + public String bucket() { + return bucket; + } + + public String key() { + return key; + } + + public File createDownloadFile(String parent) { + return new File(parent, fileName); + } + + public S3ObjectInfo withNewKeySuffix(String suffix) { + return new S3ObjectInfo( String.format("s3://%s/%s", bucket, new File(key, suffix).getPath())); + } + + @Override + public String toString() { + return String.format("s3://%s/%s", bucket, key); + } +} diff --git a/neptune-export/src/test/java/com/amazonaws/services/neptune/propertygraph/io/JsonPrinterTest.java b/neptune-export/src/test/java/com/amazonaws/services/neptune/propertygraph/io/JsonPrinterTest.java index 384449b429..ec6233c93d 100644 --- a/neptune-export/src/test/java/com/amazonaws/services/neptune/propertygraph/io/JsonPrinterTest.java +++ b/neptune-export/src/test/java/com/amazonaws/services/neptune/propertygraph/io/JsonPrinterTest.java @@ -37,5 +37,4 @@ public void shouldPrintEdge() throws Exception { "{\"~id\":\"edge-id\",\"~label\":\"edge-label\",\"~from\":\"from-id\",\"~to\":\"to-id\"}", stringWriter.toString()); } - } \ No newline at end of file diff --git a/neptune-export/src/test/java/com/amazonaws/services/neptune/util/S3ObjectInfoTest.java b/neptune-export/src/test/java/com/amazonaws/services/neptune/util/S3ObjectInfoTest.java new file mode 100644 index 0000000000..8b67273335 --- /dev/null +++ b/neptune-export/src/test/java/com/amazonaws/services/neptune/util/S3ObjectInfoTest.java @@ -0,0 +1,83 @@ +/* +Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + http://www.apache.org/licenses/LICENSE-2.0 +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. 
+*/ + +package com.amazonaws.services.neptune.util; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class S3ObjectInfoTest { + + @Test + public void canParseBucketFromURI(){ + String s3Uri = "s3://my-bucket/a/b/c"; + + S3ObjectInfo s3ObjectInfo = new S3ObjectInfo(s3Uri); + + assertEquals("my-bucket", s3ObjectInfo.bucket()); + } + + @Test + public void canParseKeyWithoutTrailingSlashFromURI(){ + String s3Uri = "s3://my-bucket/a/b/c"; + + S3ObjectInfo s3ObjectInfo = new S3ObjectInfo(s3Uri); + + assertEquals("a/b/c", s3ObjectInfo.key()); + } + + @Test + public void canParseKeyWithTrainlingSlashFromURI(){ + String s3Uri = "s3://my-bucket/a/b/c/"; + + S3ObjectInfo s3ObjectInfo = new S3ObjectInfo(s3Uri); + + assertEquals("a/b/c/", s3ObjectInfo.key()); + } + + @Test + public void canCreateDownloadFileForKeyWithoutTrailingSlash(){ + String s3Uri = "s3://my-bucket/a/b/c.txt"; + + S3ObjectInfo s3ObjectInfo = new S3ObjectInfo(s3Uri); + + assertEquals("/temp/c.txt", s3ObjectInfo.createDownloadFile("/temp").getAbsolutePath()); + } + + @Test + public void canCreateDownloadFileForKeyWithTrailingSlash(){ + String s3Uri = "s3://my-bucket/a/b/c/"; + + S3ObjectInfo s3ObjectInfo = new S3ObjectInfo(s3Uri); + + assertEquals("/temp/c", s3ObjectInfo.createDownloadFile("/temp").getAbsolutePath()); + } + + @Test + public void canCreateNewInfoForKeyWithoutTrailingSlash() { + String s3Uri = "s3://my-bucket/a/b/c"; + + S3ObjectInfo s3ObjectInfo = new S3ObjectInfo(s3Uri); + + assertEquals("a/b/c/dir", s3ObjectInfo.withNewKeySuffix("dir").key()); + } + + @Test + public void canCreateNewKeyForKeyWithTrailingSlash() { + String s3Uri = "s3://my-bucket/a/b/c/"; + + S3ObjectInfo s3ObjectInfo = new S3ObjectInfo(s3Uri); + + assertEquals("a/b/c/dir", s3ObjectInfo.withNewKeySuffix("dir").key()); + } +} \ No newline at end of file From fae4f939756e58171b7834918b9ef0d071fe748e Mon Sep 17 00:00:00 2001 From: Ian Robinson Date: Tue, 26 Feb 2019 19:04:25 +0100 Subject: [PATCH 4/5] Update docs --- neptune-export/docs/export-pg-from-config.md | 10 +++++++++- neptune-export/docs/export-pg.md | 10 +++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/neptune-export/docs/export-pg-from-config.md b/neptune-export/docs/export-pg-from-config.md index c4a4e5a314..2119060840 100644 --- a/neptune-export/docs/export-pg-from-config.md +++ b/neptune-export/docs/export-pg-from-config.md @@ -8,7 +8,8 @@ {-c | --config-file} [ {-cn | --concurrency} ] {-d | --dir} {-e | --endpoint} ... - [ {-el | --edge-label} ... ] [ --format ] + [ {-el | --edge-label} ... ] + [ --exclude-type-definitions ] [ --format ] [ --lb-port ] [ --log-level ] [ {-nl | --node-label} ... ] [ --nlb-endpoint ] @@ -63,6 +64,13 @@ -el , --edge-label Labels of edges to be exported (optional, default all labels) + --exclude-type-definitions + Exclude type definitions from column headers (optional, default + false) + + This option may occur a maximum of 1 times + + --format Output format (optional, default 'csv') diff --git a/neptune-export/docs/export-pg.md b/neptune-export/docs/export-pg.md index f856b91369..b37d1e6901 100644 --- a/neptune-export/docs/export-pg.md +++ b/neptune-export/docs/export-pg.md @@ -7,7 +7,8 @@ [ --alb-endpoint ] [ {-cn | --concurrency} ] {-d | --dir} {-e | --endpoint} ... - [ {-el | --edge-label} ... ] [ --format ] + [ {-el | --edge-label} ... ] + [ --exclude-type-definitions ] [ --format ] [ --lb-port ] [ --log-level ] [ {-nl | --node-label} ... 
] [ --nlb-endpoint ] @@ -52,6 +53,13 @@ -el , --edge-label Labels of edges to be exported (optional, default all labels) + --exclude-type-definitions + Exclude type definitions from column headers (optional, default + false) + + This option may occur a maximum of 1 times + + --format Output format (optional, default 'csv') From 48c5a4485299a3529c33e98e740d4dc1b834b9da Mon Sep 17 00:00:00 2001 From: Ian Robinson Date: Tue, 26 Feb 2019 19:38:33 +0100 Subject: [PATCH 5/5] Update README --- neptune-export/readme.md | 71 ++++++++++++++++++++++++++-------------- 1 file changed, 46 insertions(+), 25 deletions(-) diff --git a/neptune-export/readme.md b/neptune-export/readme.md index c53aadfa58..4fb6ecf3b2 100644 --- a/neptune-export/readme.md +++ b/neptune-export/readme.md @@ -10,34 +10,14 @@ Exports Amazon Neptune property graph data to CSV or JSON, or RDF graph data to - [`export-pg-from-queries`](docs/export-pg-from-queries.md) - [`export-rdf`](docs/export-rdf.md) -### Property Graph +### Topics - [Exporting to the Bulk Loader CSV Format](#exporting-to-the-bulk-loader-csv-format) - [Exporting the Results of User-Supplied Queries](#exporting-the-results-of-user-supplied-queries) - -### RDF Graph - - [Exporting an RDF Graph](#exporting-an-rdf-graph) - -### Encryption in transit - -You can connect to Neptune from _neptune-export_ using SSL by specifying the `--use-ssl` option. - -If you are using a load balancer or a proxy server (such as HAProxy), you must [use SSL termination and have your own SSL certificate on the proxy server](https://docs.aws.amazon.com/neptune/latest/userguide/security-ssl.html). - -### IAM DB authentication - -_neptune-export_ supports exporting from databases that have [IAM database authentication](https://docs.aws.amazon.com/neptune/latest/userguide/iam-auth.html) enabled. Supply the `--use-iam-auth` option with each command. Remember to set the **SERVICE_REGION** environment variable – e.g. `export SERVICE_REGION=us-east-1`. - -_neptune-export_ also supports connecting through a load balancer to a Neptune database with IAM DB authetication enabled. However, this feature is only currently supported for property graphs, with support for RDF graphs coming soon. - -If you are connecting through a load balancer, and have IAM DB authentication enabled, you must also supply either an `--nlb-endpoint` option (if using a network load balancer) or an `--alb-endpoint` option (if using an application load balancer), and an `--lb-port`. - -For details on using a load balancer with a database with IAM DB authentication enabled, see [Connecting to Amazon Neptune from Clients Outside the Neptune VPC](https://github.com/aws-samples/aws-dbs-refarch-graph/tree/master/src/connecting-using-a-load-balancer). - -## Building neptune-export - -`mvn clean install` + - [Building neptune-export](#building-neptune-export) + - [Security](#security) + - [Deploying neptune-export as an AWS Lambda Function](#deploying-neptune-export-as-an-aws-lambda-function) ## Exporting to the Bulk Loader CSV Format @@ -101,4 +81,45 @@ Queries whose results contain very large rows can sometimes trigger a `Corrupted ## Exporting an RDF Graph -At present _neptune-export_ supports exporting an RDF dataset to Turtle with a single-threaded long-running query. \ No newline at end of file +At present _neptune-export_ supports exporting an RDF dataset to Turtle with a single-threaded long-running query. 
+ +## Security + +### Encryption in transit + +You can connect to Neptune from _neptune-export_ using SSL by specifying the `--use-ssl` option. + +If you are using a load balancer or a proxy server (such as HAProxy), you must [use SSL termination and have your own SSL certificate on the proxy server](https://docs.aws.amazon.com/neptune/latest/userguide/security-ssl.html). + +### IAM DB authentication + +_neptune-export_ supports exporting from databases that have [IAM database authentication](https://docs.aws.amazon.com/neptune/latest/userguide/iam-auth.html) enabled. Supply the `--use-iam-auth` option with each command. Remember to set the **SERVICE_REGION** environment variable – e.g. `export SERVICE_REGION=us-east-1`. + +_neptune-export_ also supports connecting through a load balancer to a Neptune database with IAM DB authetication enabled. However, this feature is only currently supported for property graphs, with support for RDF graphs coming soon. + +If you are connecting through a load balancer, and have IAM DB authentication enabled, you must also supply either an `--nlb-endpoint` option (if using a network load balancer) or an `--alb-endpoint` option (if using an application load balancer), and an `--lb-port`. + +For details on using a load balancer with a database with IAM DB authentication enabled, see [Connecting to Amazon Neptune from Clients Outside the Neptune VPC](https://github.com/aws-samples/aws-dbs-refarch-graph/tree/master/src/connecting-using-a-load-balancer). + +## Building neptune-export + +To build the jar, run: + +`mvn clean install` + +## Deploying neptune-export as an AWS Lambda Function + +The _neptune-export_ jar can be deployed as an AWS Lambda function. To access Neptune, you will either have to [configure the function to access resources inside your VPC](https://docs.aws.amazon.com/lambda/latest/dg/vpc.html), or [expose the Neptune endpoints via a load balancer](https://github.com/aws-samples/aws-dbs-refarch-graph/tree/master/src/connecting-using-a-load-balancer). + +Be mindful of the [AWS Lambda limits](https://docs.aws.amazon.com/lambda/latest/dg/limits.html), particularly with regard to function timeouts (max 15 minutes) and _/tmp_ directory storage (512 MB). Large exports can easily exceed these limits. + +When deployed as a Lambda function, _neptune-export_ will automatically copy the export files to an S3 bucket of your choosing. Optionally, it can also write a completion file to a separate S3 location (useful for triggering additional Lambda functions). You must configure your function with an IAM role that has write access to these S3 locations. + +The Lambda function expects a number of parameters, which you can supply either as [environment variables](https://docs.aws.amazon.com/lambda/latest/dg/env_variables.html) or via a JSON input parameter. Fields in the JSON input parameter override any environment variables you have set up. + +| Environment Variable | JSON Field | Description || +| ---- | ---- | ---- | ---- | +| `COMMAND` | `command` | Command and command-line options: e.g. 
`export-pg -e ` | Mandatory |
+| `OUTPUT_S3_PATH` | `outputS3Path` | S3 location to which exported files will be written | Mandatory |
+| `CONFIG_FILE_S3_PATH` | `configFileS3Path` | S3 location of a JSON config file to be used when exporting a property graph from a config file | Optional |
+| `COMPLETION_FILE_S3_PATH` | `completionFileS3Path` | S3 location to which a completion file should be written once all export files have been copied to S3 | Optional |
\ No newline at end of file
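
For reference, a minimal JSON input payload for the Lambda function described above might look like the following sketch; the Neptune endpoint, the `-d` export directory (which should sit under the function's writable _/tmp_ storage) and the S3 bucket names and prefixes are placeholders to replace with your own values:

```json
{
  "command": "export-pg -e <neptune-endpoint> -d /tmp/neptune-export",
  "outputS3Path": "s3://<your-bucket>/neptune-export",
  "completionFileS3Path": "s3://<your-bucket>/completion-files"
}
```

Fields omitted from the payload fall back to the corresponding environment variables, so a function configured entirely through environment variables can be invoked with an empty JSON object.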