Skip to content

Commit

Permalink
Add tests for SnowflakeGCSClient.java (#1278)
Browse files Browse the repository at this point in the history
* testing download stream for GCS, ignore presigned url test for now

---------

Co-authored-by: sfc-gh-mknister <[email protected]>
  • Loading branch information
sfc-gh-ext-simba-lb and sfc-gh-mknister authored Mar 16, 2023
1 parent 520dd90 commit e798047
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.*;
import java.net.SocketTimeoutException;
import java.net.URISyntaxException;
import java.nio.channels.Channels;
import java.security.InvalidKeyException;
import java.util.*;
import java.util.Map.Entry;
Expand Down Expand Up @@ -448,9 +449,7 @@ public InputStream downloadToStream(
404, // because blob not found
"Blob" + blobId.getName() + " not found in bucket " + blobId.getBucket()));
}

inputStream = new ByteArrayInputStream(blob.getContent());

inputStream = Channels.newInputStream(blob.reader());
if (isEncrypting()) {
// Get the user-defined BLOB metadata
Map<String, String> userDefinedMetadata = blob.getMetadata();
Expand Down Expand Up @@ -776,11 +775,17 @@ private void uploadWithPresignedUrl(
SqlState.INTERNAL_ERROR,
"Unexpected: upload presigned URL invalid");
} catch (Exception e) {
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
String stackTrace = sw.toString();
throw new SnowflakeSQLLoggedException(
session,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
SqlState.INTERNAL_ERROR,
"Unexpected: upload with presigned url failed");
"Unexpected: upload with presigned url failed. Ex: "
+ e.getMessage()
+ ", stacktrace: "
+ stackTrace);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1443,4 +1443,53 @@ public void testNoSpaceLeftOnDeviceException() throws SQLException {
}
}
}

@Test
@ConditionalIgnoreRule.ConditionalIgnore(condition = RunningOnGithubAction.class)
public void testUploadWithGCSPresignedUrlWithoutConnection() throws Throwable {
Connection connection = null;
File destFolder = tmpFolder.newFolder();
String destFolderCanonicalPath = destFolder.getCanonicalPath();
try {
connection = getConnection("gcpaccount");
Statement statement = connection.createStatement();

// create a stage to put the file in
statement.execute("CREATE OR REPLACE STAGE " + testStageName);

SFSession sfSession = connection.unwrap(SnowflakeConnectionV1.class).getSfSession();

// Test put file with internal compression
String putCommand = "put file:///dummy/path/file1.gz @" + testStageName;
SnowflakeFileTransferAgent sfAgent =
new SnowflakeFileTransferAgent(putCommand, sfSession, new SFStatement(sfSession));
List<SnowflakeFileTransferMetadata> metadata = sfAgent.getFileTransferMetadatas();

String srcPath = getFullPathFileInResource(TEST_DATA_FILE);
for (SnowflakeFileTransferMetadata oneMetadata : metadata) {
InputStream inputStream = new FileInputStream(srcPath);

assert (oneMetadata.isForOneFile());
SnowflakeFileTransferAgent.uploadWithoutConnection(
SnowflakeFileTransferConfig.Builder.newInstance()
.setSnowflakeFileTransferMetadata(oneMetadata)
.setUploadStream(inputStream)
.setRequireCompress(true)
.setNetworkTimeoutInMilli(0)
.setOcspMode(OCSPMode.FAIL_OPEN)
.build());
}

assertTrue(
"Failed to get files",
statement.execute(
"GET @" + testStageName + " 'file://" + destFolderCanonicalPath + "/' parallel=8"));
assert (isFileContentEqual(srcPath, false, destFolderCanonicalPath + "/file1.gz", true));
} finally {
if (connection != null) {
connection.createStatement().execute("DROP STAGE if exists " + testStageName);
connection.close();
}
}
}
}
81 changes: 81 additions & 0 deletions src/test/java/net/snowflake/client/jdbc/StreamLatestIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

import static org.junit.Assert.*;

import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
Expand All @@ -14,6 +17,8 @@
import net.snowflake.client.ConditionalIgnoreRule;
import net.snowflake.client.RunningOnGithubAction;
import net.snowflake.client.category.TestCategoryOthers;
import org.apache.commons.io.IOUtils;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

Expand Down Expand Up @@ -115,4 +120,80 @@ public void testDownloadToStreamBlobNotFoundGCS() throws SQLException {
closeSQLObjects(statement, connection);
}
}

@Test
@Ignore
public void testDownloadToStreamGCSPresignedUrl() throws SQLException, IOException {
final String DEST_PREFIX = "testUploadStream";
Connection connection = null;
Statement statement = null;
connection = getConnection("gcpaccount");
statement = connection.createStatement();
statement.execute("create or replace stage testgcpstage");
ResultSet rset =
statement.executeQuery(
"PUT file://"
+ getFullPathFileInResource(TEST_DATA_FILE)
+ " @testgcpstage/"
+ DEST_PREFIX);
assertTrue(rset.next());
assertEquals("Error message:" + rset.getString(8), "UPLOADED", rset.getString(7));

InputStream out =
connection
.unwrap(SnowflakeConnection.class)
.downloadStream("@testgcpstage", DEST_PREFIX + "/" + TEST_DATA_FILE + ".gz", true);
StringWriter writer = new StringWriter();
IOUtils.copy(out, writer, "UTF-8");
String output = writer.toString();
// the first 2 characters
assertEquals("1|", output.substring(0, 2));

// the number of lines
String[] lines = output.split("\n");
assertEquals(28, lines.length);

statement.execute("rm @~/" + DEST_PREFIX);
statement.close();
closeSQLObjects(statement, connection);
}

@Test
@ConditionalIgnoreRule.ConditionalIgnore(condition = RunningOnGithubAction.class)
public void testDownloadToStreamGCS() throws SQLException, IOException {
final String DEST_PREFIX = TEST_UUID + "/testUploadStream";
Connection connection = null;
Statement statement = null;
Properties paramProperties = new Properties();
paramProperties.put("GCS_USE_DOWNSCOPED_CREDENTIAL", true);
try {
connection = getConnection("gcpaccount", paramProperties);
statement = connection.createStatement();
ResultSet rset =
statement.executeQuery(
"PUT file://" + getFullPathFileInResource(TEST_DATA_FILE) + " @~/" + DEST_PREFIX);
assertTrue(rset.next());
assertEquals("UPLOADED", rset.getString(7));

InputStream out =
connection
.unwrap(SnowflakeConnection.class)
.downloadStream("~", DEST_PREFIX + "/" + TEST_DATA_FILE + ".gz", true);
StringWriter writer = new StringWriter();
IOUtils.copy(out, writer, "UTF-8");
String output = writer.toString();
// the first 2 characters
assertEquals("1|", output.substring(0, 2));

// the number of lines
String[] lines = output.split("\n");
assertEquals(28, lines.length);
} finally {
if (statement != null) {
statement.execute("rm @~/" + DEST_PREFIX);
statement.close();
}
closeSQLObjects(statement, connection);
}
}
}

0 comments on commit e798047

Please sign in to comment.