diff --git a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java index 5544f2de966f..078040b6aa42 100644 --- a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java +++ b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java @@ -172,11 +172,19 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro attributes.put(getAttributePrefix() + ".path", path.getParent().toString()); flowFile = session.putAllAttributes(flowFile, attributes); - fileSystem.delete(path, isRecursive(context, session)); - getLogger().debug("For flowfile {} Deleted file at path {} with name {}", originalFlowFile, path.getParent(), path.getName()); - final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory()); - flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString()); - session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString()); + boolean success = fileSystem.delete(path, isRecursive(context, session)); + + if (success) { + getLogger().debug("For flowfile {} Deleted file at path {} with name {}", originalFlowFile, path.getParent(), path.getName()); + final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory()); + flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString()); + session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString()); + } else { + getLogger().warn("Failed to delete file at path {} with name {} due to unknown issue, please check related component logs.", path.getParent(), path.getName()); + attributes.put(getAttributePrefix() + ".error.message", "Delete action failed due to unknown issue, please check related component logs."); + session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship()); + failedPath++; + } } catch (IOException ioe) { if (handleAuthErrors(ioe, session, context, new GSSExceptionRollbackYieldSessionHandler())) { return null; diff --git a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java index 21391b228775..1ae82fd1f08e 100644 --- a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java +++ b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java @@ -38,6 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -56,6 +57,7 @@ public void testSuccessfulDelete() throws Exception { Path filePath = new Path("/some/path/to/file.txt"); when(mockFileSystem.exists(any(Path.class))).thenReturn(true); when(mockFileSystem.getUri()).thenReturn(new URI("hdfs://0.example.com:8020")); + when(mockFileSystem.delete(any(Path.class), anyBoolean())).thenReturn(true); DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem); TestRunner runner = TestRunners.newTestRunner(deleteHDFS); runner.setIncomingConnection(false); @@ -82,6 +84,7 @@ public void testDeleteFromIncomingFlowFile() throws Exception { Path filePath = new Path("/some/path/to/file.txt"); when(mockFileSystem.exists(any(Path.class))).thenReturn(true); when(mockFileSystem.getUri()).thenReturn(new URI("hdfs://0.example.com:8020")); + when(mockFileSystem.delete(any(Path.class), anyBoolean())).thenReturn(true); DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem); TestRunner runner = TestRunners.newTestRunner(deleteHDFS); runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${hdfs.file}"); @@ -156,7 +159,7 @@ public void testNoFlowFilesWithIncomingConnection() { } @Test - public void testUnsuccessfulDelete() throws Exception { + public void testDeleteNotExistingFile() throws Exception { Path filePath = new Path("/some/path/to/file.txt"); when(mockFileSystem.exists(any(Path.class))).thenReturn(false); DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem); @@ -169,6 +172,22 @@ public void testUnsuccessfulDelete() throws Exception { runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0); } + @Test + public void testFailedDelete() throws Exception { + final Path filePath = new Path("/some/path/to/file.txt"); + when(mockFileSystem.exists(any(Path.class))).thenReturn(true); + when(mockFileSystem.delete(any(Path.class), anyBoolean())).thenReturn(false); + final DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem); + final TestRunner runner = TestRunners.newTestRunner(deleteHDFS); + runner.setIncomingConnection(false); + runner.assertNotValid(); + runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString()); + runner.assertValid(); + runner.run(); + runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0); + runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1); + } + @Test public void testGlobDelete() throws Exception { Path glob = new Path("/data/for/2017/08/05/*"); @@ -183,6 +202,7 @@ public void testGlobDelete() throws Exception { when(mockFileSystem.exists(any(Path.class))).thenReturn(true); when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses); when(mockFileSystem.getUri()).thenReturn(new URI("hdfs://0.example.com:8020")); + when(mockFileSystem.delete(any(Path.class), anyBoolean())).thenReturn(true); DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem); TestRunner runner = TestRunners.newTestRunner(deleteHDFS); runner.setIncomingConnection(false); @@ -193,6 +213,61 @@ public void testGlobDelete() throws Exception { runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1); } + @Test + public void testFailedGlobDelete() throws Exception { + final Path glob = new Path("/data/for/2017/08/05/*"); + final int fileCount = 10; + final FileStatus[] fileStatuses = new FileStatus[fileCount]; + for (int i = 0; i < fileCount; i++) { + final Path file = new Path("/data/for/2017/08/05/file" + i); + final FileStatus fileStatus = mock(FileStatus.class); + when(fileStatus.getPath()).thenReturn(file); + fileStatuses[i] = fileStatus; + } + when(mockFileSystem.exists(any(Path.class))).thenReturn(true); + when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses); + when(mockFileSystem.getUri()).thenReturn(new URI("hdfs://0.example.com:8020")); + when(mockFileSystem.delete(any(Path.class), anyBoolean())).thenReturn(false); + final DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem); + final TestRunner runner = TestRunners.newTestRunner(deleteHDFS); + runner.setIncomingConnection(false); + runner.assertNotValid(); + runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString()); + runner.assertValid(); + runner.run(); + runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0); + runner.assertTransferCount(DeleteHDFS.REL_FAILURE, fileCount); + } + + @Test + public void testMixedGlobDelete() throws Exception { + final Path glob = new Path("/data/for/2017/08/05/*"); + final int fileCount = 3; + final FileStatus[] fileStatuses = new FileStatus[fileCount]; + for (int i = 0; i < fileCount; i++) { + final Path file = new Path("/data/for/2017/08/05/file" + i); + final FileStatus fileStatus = mock(FileStatus.class); + when(fileStatus.getPath()).thenReturn(file); + fileStatuses[i] = fileStatus; + } + when(mockFileSystem.exists(any(Path.class))).thenReturn(true); + when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses); + when(mockFileSystem.getUri()).thenReturn(new URI("hdfs://0.example.com:8020")); + when(mockFileSystem.delete(any(Path.class), anyBoolean())) + .thenReturn(false) + .thenReturn(true) + .thenReturn(false); + final DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem); + final TestRunner runner = TestRunners.newTestRunner(deleteHDFS); + runner.setIncomingConnection(false); + runner.assertNotValid(); + runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString()); + runner.assertValid(); + runner.run(); + runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0); + runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 2); + } + @Test public void testGlobDeleteFromIncomingFlowFile() throws Exception { Path glob = new Path("/data/for/2017/08/05/*"); @@ -207,6 +282,7 @@ public void testGlobDeleteFromIncomingFlowFile() throws Exception { when(mockFileSystem.exists(any(Path.class))).thenReturn(true); when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses); when(mockFileSystem.getUri()).thenReturn(new URI("hdfs://0.example.com:8020")); + when(mockFileSystem.delete(any(Path.class), anyBoolean())).thenReturn(true); DeleteHDFS deleteHDFS = new TestableDeleteHDFS(mockFileSystem); TestRunner runner = TestRunners.newTestRunner(deleteHDFS); runner.setIncomingConnection(true);