Skip to content

Commit

Permalink
CODENVY-587: Handle closed consumers in composite line comsumer
Browse files Browse the repository at this point in the history
Signed-off-by: Mykola Morhun <[email protected]>
  • Loading branch information
Mykola Morhun committed Sep 27, 2016
1 parent 33944c6 commit 54a5b81
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,58 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* @author andrew00x
* @author Mykola Morhun
*/
public class CompositeLineConsumer implements LineConsumer {
private static final Logger LOG = LoggerFactory.getLogger(CompositeLineConsumer.class);

private final LineConsumer[] lineConsumers;
private final List<LineConsumer> lineConsumers;
private boolean isClosed;

public CompositeLineConsumer(LineConsumer... lineConsumers) {
this.lineConsumers = lineConsumers;
this.lineConsumers = new ArrayList<>(lineConsumers.length);
Arrays.stream(lineConsumers).forEach(this.lineConsumers::add);

this.isClosed = false;
}

@Override
public void close() throws IOException {
for (LineConsumer lineConsumer : lineConsumers) {
try {
lineConsumer.close();
} catch (IOException e) {
LOG.error(String.format("An error occurred while closing the line consumer %s", lineConsumer), e);
if (!isClosed) {
isClosed = true;
for (LineConsumer lineConsumer : lineConsumers) {
try {
lineConsumer.close();
} catch (IOException e) {
LOG.error(String.format("An error occurred while closing the line consumer %s", lineConsumer), e);
}
}
}
}

@Override
public void writeLine(String line) throws IOException {
for (LineConsumer lineConsumer : lineConsumers) {
try {
lineConsumer.writeLine(line);
} catch (IOException e) {
LOG.error(String.format("An error occurred while writing line to the line consumer %s", lineConsumer), e);
if (!isClosed) {
for (LineConsumer lineConsumer : lineConsumers) {
try {
lineConsumer.writeLine(line);
} catch (ConsumerAlreadyClosedException | ClosedByInterruptException e) {
lineConsumers.remove(lineConsumer); // consumer is already closed, so we cannot write into it any more
if (lineConsumers.size() == 0) { // if all consumers are closed then we can close this one
isClosed = true;
}
} catch (IOException e) {
LOG.error(String.format("An error occurred while writing line to the line consumer %s", lineConsumer), e);
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*******************************************************************************
* Copyright (c) 2012-2016 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.api.core.util;

import java.io.IOException;

/**
* @author Mykola Morhun
*/
public class ConsumerAlreadyClosedException extends IOException {
public ConsumerAlreadyClosedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,18 @@ public File getFile() {

@Override
public void writeLine(String line) throws IOException {
if (line != null) {
writer.write(line);
try {
if (line != null) {
writer.write(line);
}
writer.write('\n');
writer.flush();
} catch (IOException e) {
if("Stream closed".equals(e.getMessage())) {
throw new ConsumerAlreadyClosedException(e.getMessage());
}
throw e;
}
writer.write('\n');
writer.flush();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*******************************************************************************/
package org.eclipse.che.plugin.docker.client;

import org.eclipse.che.api.core.util.ConsumerAlreadyClosedException;
import org.eclipse.che.plugin.docker.client.json.ProgressStatus;

/**
Expand All @@ -19,7 +20,7 @@
* @author Alexander Garagatyi
*/
public interface ProgressMonitor {
void updateProgress(ProgressStatus currentProgressStatus);
void updateProgress(ProgressStatus currentProgressStatus) throws ConsumerAlreadyClosedException;

ProgressMonitor DEV_NULL = new ProgressMonitor() {
@Override
Expand Down

0 comments on commit 54a5b81

Please sign in to comment.