Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For the various Yielder objects, don't create new Yielders and instead mutate state. #12475

Merged
merged 1 commit into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,23 @@ public <OutType> Yielder<OutType> toYielder(OutType initValue, final YieldingAcc
final Yielder<T> baseYielder = baseSequence.toYielder(null, combiningAccumulator);

try {
return makeYielder(baseYielder, combiningAccumulator, false);
// If the yielder is already done at this point, that means that it ran through all of the inputs
// without hitting a yield(), i.e. it's effectively just a single accumulate() call. As such we just
// return a done yielder with the correct accumulated value.
if (baseYielder.isDone()) {
if (combiningAccumulator.accumulatedSomething()) {
combiningAccumulator.accumulateLastValue();
}
// If we yielded, then the expectation is that we get a Yielder with the yielded value, followed by a done
// yielder. This will happen if we fall through to the normal makeYielder. If the accumulator did not yield
// then the code expects a single Yielder that returns whatever was left over from the accumulation on the
// get() call.
if (!combiningAccumulator.yielded()) {
return Yielders.done(combiningAccumulator.getRetVal(), baseYielder);
}
}

return makeYielder(baseYielder, combiningAccumulator);
}
catch (Throwable t1) {
try {
Expand All @@ -94,52 +110,37 @@ public <OutType> Yielder<OutType> toYielder(OutType initValue, final YieldingAcc

private <OutType> Yielder<OutType> makeYielder(
final Yielder<T> yielder,
final CombiningYieldingAccumulator<OutType, T> combiningAccumulator,
boolean finalValue
final CombiningYieldingAccumulator<OutType, T> combiningAccumulator
)
{
final Yielder<T> finalYielder;
final OutType retVal;
final boolean finalFinalValue;

if (!yielder.isDone()) {
retVal = combiningAccumulator.getRetVal();
finalYielder = null;
finalFinalValue = false;
} else {
if (!finalValue && combiningAccumulator.accumulatedSomething()) {
combiningAccumulator.accumulateLastValue();
retVal = combiningAccumulator.getRetVal();
finalFinalValue = true;

if (!combiningAccumulator.yielded()) {
return Yielders.done(retVal, yielder);
} else {
finalYielder = Yielders.done(null, yielder);
}
} else {
return Yielders.done(combiningAccumulator.getRetVal(), yielder);
}
}


return new Yielder<OutType>()
{
private Yielder<T> myYielder = yielder;
private CombiningYieldingAccumulator<OutType, T> accum = combiningAccumulator;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this can be final


@Override
public OutType get()
{
return retVal;
return accum.getRetVal();
}

@Override
public Yielder<OutType> next(OutType initValue)
{
combiningAccumulator.reset();
return makeYielder(
finalYielder == null ? yielder.next(yielder.get()) : finalYielder,
combiningAccumulator,
finalFinalValue
);
accum.reset();
if (myYielder.isDone()) {
return Yielders.done(null, myYielder);
}

myYielder = myYielder.next(myYielder.get());
if (myYielder.isDone() && accum.accumulatedSomething()) {
accum.accumulateLastValue();
if (!accum.yielded()) {
return Yielders.done(accum.getRetVal(), myYielder);
}
}

return this;
}

@Override
Expand All @@ -151,7 +152,7 @@ public boolean isDone()
@Override
public void close() throws IOException
{
yielder.close();
myYielder.close();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,19 @@ public <OutType> Yielder<OutType> toYielder(
final IterType iterator = maker.make();

try {
return makeYielder(initValue, accumulator, iterator);
OutType retVal = initValue;
while (!accumulator.yielded() && iterator.hasNext()) {
retVal = accumulator.accumulate(retVal, iterator.next());
}

if (!accumulator.yielded()) {
return Yielders.done(
retVal,
(Closeable) () -> maker.cleanup(iterator)
);
}

return makeYielder(retVal, accumulator, iterator);
}
catch (Throwable t) {
try {
Expand All @@ -80,47 +92,34 @@ public <OutType> Yielder<OutType> toYielder(
}

private <OutType> Yielder<OutType> makeYielder(
final OutType initValue,
final OutType retValue,
final YieldingAccumulator<OutType, T> accumulator,
final IterType iter
)
{
OutType retVal = initValue;
while (!accumulator.yielded() && iter.hasNext()) {
retVal = accumulator.accumulate(retVal, iter.next());
}

if (!accumulator.yielded()) {
return Yielders.done(
retVal,
(Closeable) () -> maker.cleanup(iter)
);
}

final OutType finalRetVal = retVal;
return new Yielder<OutType>()
{
OutType retVal = retValue;

@Override
public OutType get()
{
return finalRetVal;
return retVal;
}

@Override
public Yielder<OutType> next(OutType initValue)
{
accumulator.reset();
try {
return makeYielder(initValue, accumulator, iter);
retVal = initValue;
while (!accumulator.yielded() && iter.hasNext()) {
retVal = accumulator.accumulate(retVal, iter.next());
}
catch (Throwable t) {
try {
maker.cleanup(iter);
}
catch (Exception e) {
t.addSuppressed(e);
}
throw t;

if (accumulator.yielded()) {
return this;
} else {
return Yielders.done(retVal, this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

final class WrappingYielder<OutType> implements Yielder<OutType>
{
private final Yielder<OutType> baseYielder;
private Yielder<OutType> baseYielder;
private final SequenceWrapper wrapper;

WrappingYielder(Yielder<OutType> baseYielder, SequenceWrapper wrapper)
Expand All @@ -50,7 +50,8 @@ public Yielder<OutType> next(final OutType initValue)
@Override
public Yielder<OutType> get()
{
return new WrappingYielder<>(baseYielder.next(initValue), wrapper);
baseYielder = baseYielder.next(initValue);
return WrappingYielder.this;
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
* necessarily good at this job, but it works. I think.
*
* Essentially, you can think of a Yielder as a linked list of items where the Yielder gives you access to the current
* head via get() and it will give you another Yielder representing the next item in the chain via next(). A Yielder
* that isDone() may return anything from both get() and next(), there is no contract and depending on those return
* values will likely lead to bugs.
* head via get() and it will give you another Yielder representing the next item in the chain via next(). When using
* a yielder object, a call to yield() on the yielding accumulator will result in a new Yielder being returned whose
* get() method will return the return value of the accumulator from the call that called yield().
*
* When a call to next() exhausts the underlying data stream without having a yield() call, various implementations
* of Sequences and Yielders assume that they will receive a Yielder where isDone() is true and get() will return the
* accumulated value up until that point.
*
* Once next is called, there is no guarantee and no requirement that references to old Yielder objects will continue
* to obey the contract.
Expand Down Expand Up @@ -60,9 +64,8 @@ public interface Yielder<T> extends Closeable
Yielder<T> next(T initValue);

/**
* Returns true if this is the last Yielder in the chain. A Yielder that isDone() may return anything
* from both get() and next(), there is no contract and depending on those return values will likely lead to bugs.
* It will probably break your code to call next() on a Yielder that is done and expect something good from it.
* Returns true if this is the last Yielder in the chain. Review the class level javadoc for an understanding
* of the contract for other methods when isDone() is true.
*
* Once next() is called on this Yielder object, all further operations on this object are undefined.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.ExplodingSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
Expand Down Expand Up @@ -254,6 +255,8 @@ private void testCombining(
int limit
) throws Exception
{
final String prefix = StringUtils.format("yieldEvery[%d], limit[%d]", yieldEvery, limit);

// Test that closing works too
final CountDownLatch closed = new CountDownLatch(1);
final Closeable closeable = closed::countDown;
Expand All @@ -276,7 +279,7 @@ private void testCombining(

List<Pair<Integer, Integer>> merged = seq.toList();

Assert.assertEquals(expected, merged);
Assert.assertEquals(prefix, expected, merged);

Yielder<Pair<Integer, Integer>> yielder = seq.toYielder(
null,
Expand Down Expand Up @@ -318,16 +321,17 @@ public boolean apply(
}
);

int i = 0;
if (expectedVals.hasNext()) {
while (!yielder.isDone()) {
final Pair<Integer, Integer> expectedVal = expectedVals.next();
final Pair<Integer, Integer> actual = yielder.get();
Assert.assertEquals(expectedVal, actual);
Assert.assertEquals(StringUtils.format("%s, i[%s]", prefix, i++), expectedVal, actual);
yielder = yielder.next(actual);
}
}
Assert.assertTrue(yielder.isDone());
Assert.assertFalse(expectedVals.hasNext());
Assert.assertTrue(prefix, yielder.isDone());
Assert.assertFalse(prefix, expectedVals.hasNext());
yielder.close();

Assert.assertTrue("resource closed", closed.await(10000, TimeUnit.MILLISECONDS));
Expand Down