-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CODENVY-587: Handle closed consumers in composite line comsumer #2549
Changes from 13 commits
54a5b81
5466e88
65635ba
32e5728
384ae6f
8d0e6d1
d561cdc
c52f6cb
cce75a4
b1c1f0b
615be4e
2d2817b
c37ce05
73e86a0
37589f6
655c7d5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,42 +10,76 @@ | |
*******************************************************************************/ | ||
package org.eclipse.che.api.core.util; | ||
|
||
import org.eclipse.che.api.core.util.lineconsumer.ConsumerAlreadyClosedException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.IOException; | ||
import java.nio.channels.ClosedByInterruptException; | ||
import java.util.List; | ||
import java.util.concurrent.CopyOnWriteArrayList; | ||
|
||
/** | ||
* This line consumer consists of several consumers and copies each consumed line into all subconsumers. | ||
* Is used when lines should be written into two or more consumers. | ||
* <br/> | ||
* This class is not thread safe. | ||
* Also see multithreaded implementation {@link org.eclipse.che.api.core.util.lineconsumer.ConcurrentCompositeLineConsumer} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Links to other classes can be formatted with @see annotation |
||
* | ||
* @author andrew00x | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add javadocs |
||
* @author Mykola Morhun | ||
*/ | ||
public class CompositeLineConsumer implements LineConsumer { | ||
private static final Logger LOG = LoggerFactory.getLogger(CompositeLineConsumer.class); | ||
|
||
private final LineConsumer[] lineConsumers; | ||
private final List<LineConsumer> lineConsumers; | ||
private boolean isOpen; | ||
|
||
public CompositeLineConsumer(LineConsumer... lineConsumers) { | ||
this.lineConsumers = lineConsumers; | ||
this.lineConsumers = new CopyOnWriteArrayList<>(lineConsumers); | ||
this.isOpen = true; | ||
} | ||
|
||
/** | ||
* Closes all unclosed subconsumers. | ||
*/ | ||
@Override | ||
public void close() throws IOException { | ||
for (LineConsumer lineConsumer : lineConsumers) { | ||
try { | ||
lineConsumer.close(); | ||
} catch (IOException e) { | ||
LOG.error(String.format("An error occurred while closing the line consumer %s", lineConsumer), e); | ||
public void close() { | ||
if (isOpen) { | ||
isOpen = false; | ||
for (LineConsumer lineConsumer : lineConsumers) { | ||
try { | ||
lineConsumer.close(); | ||
} catch (IOException e) { | ||
LOG.error(String.format("An error occurred while closing the line consumer %s", lineConsumer), e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that toString in line consumers returns pointer to memory area. Do we need that in error message? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, because it also returns name of implementation in which problem occurs |
||
} | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Writes given line to each subconsumer. | ||
* Do nothing if this consumer is closed or all subconsumers are closed. | ||
* | ||
* @param line | ||
* line to write | ||
*/ | ||
@Override | ||
public void writeLine(String line) throws IOException { | ||
for (LineConsumer lineConsumer : lineConsumers) { | ||
try { | ||
lineConsumer.writeLine(line); | ||
} catch (IOException e) { | ||
LOG.error(String.format("An error occurred while writing line to the line consumer %s", lineConsumer), e); | ||
public void writeLine(String line) { | ||
if (isOpen) { | ||
for (LineConsumer lineConsumer : lineConsumers) { | ||
try { | ||
lineConsumer.writeLine(line); | ||
} catch (ConsumerAlreadyClosedException | ClosedByInterruptException e) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If ClosedByInterruptException is thrown this class should not continue work There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why? Only one subconsumer is closed. What is the reason to close others? |
||
lineConsumers.remove(lineConsumer); // consumer is already closed, so we cannot write into it any more | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it an appropriate way to remove from source while you are iterating through it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would say a test is required with isClosed and writeLine throwing exceptions There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mmorhun Please address my comment There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed by using |
||
if (lineConsumers.size() == 0) { // if all consumers are closed then we can close this one | ||
isOpen = false; | ||
} | ||
} catch (IOException e) { | ||
LOG.error(String.format("An error occurred while writing line to the line consumer %s", lineConsumer), e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is LineConsumer always have nice toString() ? Could we say also the line that we tried to log ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benoitf, we have default There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benoitf, do you think we need to log that? For example, an output of user's command? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe not I was just trying to see which error message could be helpful to debug/understand what was the issue There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benoitf thank you, sometimes logged information isn't enough, but the problem here is the reason why it happens. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agree with Florent - toString here has no sense There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is still usable, because we will know consumer implementation |
||
} | ||
} | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,22 +10,33 @@ | |
*******************************************************************************/ | ||
package org.eclipse.che.api.core.util; | ||
|
||
import org.eclipse.che.api.core.util.lineconsumer.ConsumerAlreadyClosedException; | ||
|
||
import java.io.File; | ||
import java.io.IOException; | ||
import java.io.Writer; | ||
import java.nio.charset.Charset; | ||
import java.nio.file.Files; | ||
|
||
/** | ||
* Consumes logs and writes them into file. | ||
* <br/> | ||
* This class is not thread safe. | ||
* Also see multithreaded implementation {@link org.eclipse.che.api.core.util.lineconsumer.ConcurrentFileLineConsumer} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Links to other classes can be formatted with @see annotation |
||
* | ||
* @author andrew00x | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Javadocs? And add a note that it is not thread-safe |
||
* @author Mykola Morhun | ||
*/ | ||
public class FileLineConsumer implements LineConsumer { | ||
private final File file; | ||
private final Writer writer; | ||
|
||
private boolean isOpen; | ||
|
||
public FileLineConsumer(File file) throws IOException { | ||
this.file = file; | ||
writer = Files.newBufferedWriter(file.toPath(), Charset.defaultCharset()); | ||
isOpen = true; | ||
} | ||
|
||
public File getFile() { | ||
|
@@ -34,15 +45,27 @@ public File getFile() { | |
|
||
@Override | ||
public void writeLine(String line) throws IOException { | ||
if (line != null) { | ||
writer.write(line); | ||
if (isOpen) { | ||
try { | ||
if (line != null) { | ||
writer.write(line); | ||
} | ||
writer.write('\n'); | ||
writer.flush(); | ||
} catch (IOException e) { | ||
if ("Stream closed".equals(e.getMessage())) { | ||
throw new ConsumerAlreadyClosedException(e.getMessage()); | ||
} | ||
throw e; | ||
} | ||
} | ||
writer.write('\n'); | ||
writer.flush(); | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
writer.close(); | ||
if (isOpen) { | ||
isOpen = false; | ||
writer.close(); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
/******************************************************************************* | ||
* Copyright (c) 2012-2016 Codenvy, S.A. | ||
* All rights reserved. This program and the accompanying materials | ||
* are made available under the terms of the Eclipse Public License v1.0 | ||
* which accompanies this distribution, and is available at | ||
* http://www.eclipse.org/legal/epl-v10.html | ||
* | ||
* Contributors: | ||
* Codenvy, S.A. - initial API and implementation | ||
*******************************************************************************/ | ||
package org.eclipse.che.api.core.util.lineconsumer; | ||
|
||
import org.eclipse.che.api.core.util.LineConsumer; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.IOException; | ||
import java.nio.channels.ClosedByInterruptException; | ||
import java.util.List; | ||
import java.util.concurrent.CopyOnWriteArrayList; | ||
import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
|
||
/** | ||
* This line consumer consists of several consumers and copies each consumed line into all subconsumers. | ||
* Is used when lines should be written into two or more consumers. | ||
* This implementation is thread safe. | ||
* | ||
* @author andrew00x | ||
* @author Mykola Morhun | ||
*/ | ||
public class ConcurrentCompositeLineConsumer implements LineConsumer { | ||
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentCompositeLineConsumer.class); | ||
|
||
private final List<LineConsumer> lineConsumers; | ||
private final ReentrantReadWriteLock lock; | ||
|
||
private volatile boolean isOpen; | ||
|
||
public ConcurrentCompositeLineConsumer(LineConsumer... lineConsumers) { | ||
this.lineConsumers = new CopyOnWriteArrayList<>(lineConsumers); | ||
this.lock = new ReentrantReadWriteLock(); | ||
this.isOpen = true; | ||
} | ||
|
||
public boolean isOpen() { | ||
return isOpen; | ||
} | ||
|
||
/** | ||
* Closes all unclosed subconsumers. | ||
*/ | ||
@Override | ||
public void close() { | ||
if (isOpen) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this thread-safe? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thx, I forgot volatile |
||
lock.writeLock().lock(); | ||
try { | ||
isOpen = false; | ||
for (LineConsumer lineConsumer : lineConsumers) { | ||
try { | ||
lineConsumer.close(); | ||
} catch (IOException e) { | ||
LOG.error(String.format("An error occurred while closing the line consumer %s", lineConsumer), e); | ||
} | ||
} | ||
} finally { | ||
lock.writeLock().unlock(); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Writes given line to each subconsumer. | ||
* Do nothing if this consumer is closed or all subconsumers are closed. | ||
* | ||
* @param line | ||
* line to write | ||
*/ | ||
@Override | ||
public void writeLine(String line) { | ||
if (isOpen && lock.readLock().tryLock()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this thread-safe? |
||
try { | ||
for (LineConsumer lineConsumer : lineConsumers) { | ||
try { | ||
lineConsumer.writeLine(line); | ||
} catch (ConsumerAlreadyClosedException | ClosedByInterruptException e) { | ||
lineConsumers.remove(lineConsumer); // consumer is already closed, so we cannot write into it any more | ||
if (lineConsumers.size() == 0) { // if all consumers are closed then we can close this one | ||
isOpen = false; | ||
} | ||
} catch (IOException e) { | ||
LOG.error(String.format("An error occurred while writing line to the line consumer %s", lineConsumer), e); | ||
} | ||
} | ||
} finally { | ||
lock.readLock().unlock(); | ||
} | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
/******************************************************************************* | ||
* Copyright (c) 2012-2016 Codenvy, S.A. | ||
* All rights reserved. This program and the accompanying materials | ||
* are made available under the terms of the Eclipse Public License v1.0 | ||
* which accompanies this distribution, and is available at | ||
* http://www.eclipse.org/legal/epl-v10.html | ||
* | ||
* Contributors: | ||
* Codenvy, S.A. - initial API and implementation | ||
*******************************************************************************/ | ||
package org.eclipse.che.api.core.util.lineconsumer; | ||
|
||
import org.eclipse.che.api.core.util.LineConsumer; | ||
|
||
import java.io.File; | ||
import java.io.IOException; | ||
import java.io.Writer; | ||
import java.nio.charset.Charset; | ||
import java.nio.file.Files; | ||
import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
|
||
/** | ||
* Consumes logs and writes them into file. | ||
* This implementation is thread safe. | ||
* | ||
* @author andrew00x | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Javadocs? |
||
* @author Mykola Morhun | ||
*/ | ||
public class ConcurrentFileLineConsumer implements LineConsumer { | ||
private final File file; | ||
private final Writer writer; | ||
private final ReentrantReadWriteLock lock; | ||
|
||
private volatile boolean isOpen; | ||
|
||
public ConcurrentFileLineConsumer(File file) throws IOException { | ||
this.file = file; | ||
writer = Files.newBufferedWriter(file.toPath(), Charset.defaultCharset()); | ||
isOpen = true; | ||
lock = new ReentrantReadWriteLock(); | ||
} | ||
|
||
public File getFile() { | ||
return file; | ||
} | ||
|
||
public boolean isOpen() { | ||
return isOpen; | ||
} | ||
|
||
@Override | ||
public void writeLine(String line) throws IOException { | ||
if (isOpen && lock.readLock().tryLock()) { | ||
try { | ||
if (line != null) { | ||
writer.write(line); | ||
} | ||
writer.write('\n'); | ||
writer.flush(); | ||
} catch (IOException e) { | ||
if ("Stream closed".equals(e.getMessage())) { | ||
throw new ConsumerAlreadyClosedException(e.getMessage()); | ||
} | ||
throw e; | ||
} finally { | ||
lock.readLock().unlock(); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
if (isOpen) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose that convenient way to do that is AtomicBoolean.compareAndSet() |
||
lock.writeLock().lock(); | ||
try { | ||
isOpen = false; | ||
writer.close(); | ||
} finally { | ||
lock.writeLock().unlock(); | ||
} | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/******************************************************************************* | ||
* Copyright (c) 2012-2016 Codenvy, S.A. | ||
* All rights reserved. This program and the accompanying materials | ||
* are made available under the terms of the Eclipse Public License v1.0 | ||
* which accompanies this distribution, and is available at | ||
* http://www.eclipse.org/legal/epl-v10.html | ||
* | ||
* Contributors: | ||
* Codenvy, S.A. - initial API and implementation | ||
*******************************************************************************/ | ||
package org.eclipse.che.api.core.util.lineconsumer; | ||
|
||
import java.io.IOException; | ||
|
||
/** | ||
* Is thrown as result of attempt to write a line into closed line consumer. | ||
* | ||
* @author Mykola Morhun | ||
*/ | ||
public class ConsumerAlreadyClosedException extends IOException { | ||
public ConsumerAlreadyClosedException(String message) { | ||
super(message); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a note that it is not thread-safe