Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CODENVY-587: Handle closed consumers in composite line comsumer #2549

Merged
merged 16 commits into from
Nov 15, 2016
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,76 @@
*******************************************************************************/
package org.eclipse.che.api.core.util;

import org.eclipse.che.api.core.util.lineconsumer.ConsumerAlreadyClosedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* This line consumer consists of several consumers and copies each consumed line into all subconsumers.
* Is used when lines should be written into two or more consumers.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a note that it is not thread-safe

* <br/>
* This class is not thread safe.
* Also see multithreaded implementation {@link org.eclipse.che.api.core.util.lineconsumer.ConcurrentCompositeLineConsumer}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Links to other classes can be formatted with @see annotation

*
* @author andrew00x

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add javadocs

* @author Mykola Morhun
*/
public class CompositeLineConsumer implements LineConsumer {
private static final Logger LOG = LoggerFactory.getLogger(CompositeLineConsumer.class);

private final LineConsumer[] lineConsumers;
private final List<LineConsumer> lineConsumers;
private boolean isOpen;

public CompositeLineConsumer(LineConsumer... lineConsumers) {
this.lineConsumers = lineConsumers;
this.lineConsumers = new CopyOnWriteArrayList<>(lineConsumers);
this.isOpen = true;
}

/**
* Closes all unclosed subconsumers.
*/
@Override
public void close() throws IOException {
for (LineConsumer lineConsumer : lineConsumers) {
try {
lineConsumer.close();
} catch (IOException e) {
LOG.error(String.format("An error occurred while closing the line consumer %s", lineConsumer), e);
public void close() {
if (isOpen) {
isOpen = false;
for (LineConsumer lineConsumer : lineConsumers) {
try {
lineConsumer.close();
} catch (IOException e) {
LOG.error(String.format("An error occurred while closing the line consumer %s", lineConsumer), e);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that toString in line consumers returns pointer to memory area. Do we need that in error message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because it also returns name of implementation in which problem occurs

}
}
}
}

/**
* Writes given line to each subconsumer.
* Do nothing if this consumer is closed or all subconsumers are closed.
*
* @param line
* line to write
*/
@Override
public void writeLine(String line) throws IOException {
for (LineConsumer lineConsumer : lineConsumers) {
try {
lineConsumer.writeLine(line);
} catch (IOException e) {
LOG.error(String.format("An error occurred while writing line to the line consumer %s", lineConsumer), e);
public void writeLine(String line) {
if (isOpen) {
for (LineConsumer lineConsumer : lineConsumers) {
try {
lineConsumer.writeLine(line);
} catch (ConsumerAlreadyClosedException | ClosedByInterruptException e) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If ClosedByInterruptException is thrown this class should not continue work

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? Only one subconsumer is closed. What is the reason to close others?

lineConsumers.remove(lineConsumer); // consumer is already closed, so we cannot write into it any more

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it an appropriate way to remove from source while you are iterating through it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say a test is required with isClosed and writeLine throwing exceptions

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mmorhun Please address my comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed by using CopyOnWriteArrayList

if (lineConsumers.size() == 0) { // if all consumers are closed then we can close this one
isOpen = false;
}
} catch (IOException e) {
LOG.error(String.format("An error occurred while writing line to the line consumer %s", lineConsumer), e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is LineConsumer always have nice toString() ? Could we say also the line that we tried to log ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benoitf, we have default toString() here, but I think that it is enough. We'll have consumer implementation class name and exception stacktrace.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benoitf, do you think we need to log that? For example, an output of user's command?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe not I was just trying to see which error message could be helpful to debug/understand what was the issue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benoitf thank you, sometimes logged information isn't enough, but the problem here is the reason why it happens.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree with Florent - toString here has no sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is still usable, because we will know consumer implementation

}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,33 @@
*******************************************************************************/
package org.eclipse.che.api.core.util;

import org.eclipse.che.api.core.util.lineconsumer.ConsumerAlreadyClosedException;

import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.Charset;
import java.nio.file.Files;

/**
* Consumes logs and writes them into file.
* <br/>
* This class is not thread safe.
* Also see multithreaded implementation {@link org.eclipse.che.api.core.util.lineconsumer.ConcurrentFileLineConsumer}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Links to other classes can be formatted with @see annotation

*
* @author andrew00x

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadocs? And add a note that it is not thread-safe

* @author Mykola Morhun
*/
public class FileLineConsumer implements LineConsumer {
private final File file;
private final Writer writer;

private boolean isOpen;

public FileLineConsumer(File file) throws IOException {
this.file = file;
writer = Files.newBufferedWriter(file.toPath(), Charset.defaultCharset());
isOpen = true;
}

public File getFile() {
Expand All @@ -34,15 +45,27 @@ public File getFile() {

@Override
public void writeLine(String line) throws IOException {
if (line != null) {
writer.write(line);
if (isOpen) {
try {
if (line != null) {
writer.write(line);
}
writer.write('\n');
writer.flush();
} catch (IOException e) {
if ("Stream closed".equals(e.getMessage())) {
throw new ConsumerAlreadyClosedException(e.getMessage());
}
throw e;
}
}
writer.write('\n');
writer.flush();
}

@Override
public void close() throws IOException {
writer.close();
if (isOpen) {
isOpen = false;
writer.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*******************************************************************************
* Copyright (c) 2012-2016 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.api.core.util.lineconsumer;

import org.eclipse.che.api.core.util.LineConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* This line consumer consists of several consumers and copies each consumed line into all subconsumers.
* Is used when lines should be written into two or more consumers.
* This implementation is thread safe.
*
* @author andrew00x
* @author Mykola Morhun
*/
public class ConcurrentCompositeLineConsumer implements LineConsumer {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentCompositeLineConsumer.class);

private final List<LineConsumer> lineConsumers;
private final ReentrantReadWriteLock lock;

private volatile boolean isOpen;

public ConcurrentCompositeLineConsumer(LineConsumer... lineConsumers) {
this.lineConsumers = new CopyOnWriteArrayList<>(lineConsumers);
this.lock = new ReentrantReadWriteLock();
this.isOpen = true;
}

public boolean isOpen() {
return isOpen;
}

/**
* Closes all unclosed subconsumers.
*/
@Override
public void close() {
if (isOpen) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this thread-safe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx, I forgot volatile

lock.writeLock().lock();
try {
isOpen = false;
for (LineConsumer lineConsumer : lineConsumers) {
try {
lineConsumer.close();
} catch (IOException e) {
LOG.error(String.format("An error occurred while closing the line consumer %s", lineConsumer), e);
}
}
} finally {
lock.writeLock().unlock();
}
}
}

/**
* Writes given line to each subconsumer.
* Do nothing if this consumer is closed or all subconsumers are closed.
*
* @param line
* line to write
*/
@Override
public void writeLine(String line) {
if (isOpen && lock.readLock().tryLock()) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this thread-safe?

try {
for (LineConsumer lineConsumer : lineConsumers) {
try {
lineConsumer.writeLine(line);
} catch (ConsumerAlreadyClosedException | ClosedByInterruptException e) {
lineConsumers.remove(lineConsumer); // consumer is already closed, so we cannot write into it any more
if (lineConsumers.size() == 0) { // if all consumers are closed then we can close this one
isOpen = false;
}
} catch (IOException e) {
LOG.error(String.format("An error occurred while writing line to the line consumer %s", lineConsumer), e);
}
}
} finally {
lock.readLock().unlock();
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*******************************************************************************
* Copyright (c) 2012-2016 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.api.core.util.lineconsumer;

import org.eclipse.che.api.core.util.LineConsumer;

import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* Consumes logs and writes them into file.
* This implementation is thread safe.
*
* @author andrew00x

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadocs?

* @author Mykola Morhun
*/
public class ConcurrentFileLineConsumer implements LineConsumer {
private final File file;
private final Writer writer;
private final ReentrantReadWriteLock lock;

private volatile boolean isOpen;

public ConcurrentFileLineConsumer(File file) throws IOException {
this.file = file;
writer = Files.newBufferedWriter(file.toPath(), Charset.defaultCharset());
isOpen = true;
lock = new ReentrantReadWriteLock();
}

public File getFile() {
return file;
}

public boolean isOpen() {
return isOpen;
}

@Override
public void writeLine(String line) throws IOException {
if (isOpen && lock.readLock().tryLock()) {
try {
if (line != null) {
writer.write(line);
}
writer.write('\n');
writer.flush();
} catch (IOException e) {
if ("Stream closed".equals(e.getMessage())) {
throw new ConsumerAlreadyClosedException(e.getMessage());
}
throw e;
} finally {
lock.readLock().unlock();
}
}
}

@Override
public void close() throws IOException {
if (isOpen) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose that convenient way to do that is AtomicBoolean.compareAndSet()

lock.writeLock().lock();
try {
isOpen = false;
writer.close();
} finally {
lock.writeLock().unlock();
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*******************************************************************************
* Copyright (c) 2012-2016 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.api.core.util.lineconsumer;

import java.io.IOException;

/**
* Is thrown as result of attempt to write a line into closed line consumer.
*
* @author Mykola Morhun
*/
public class ConsumerAlreadyClosedException extends IOException {
public ConsumerAlreadyClosedException(String message) {
super(message);
}
}
Loading