-
Notifications
You must be signed in to change notification settings - Fork 4.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Iterator-based JDBC Source (and Redshift bugfix) (#1887)
- Loading branch information
Showing
39 changed files
with
1,468 additions
and
260 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
35 changes: 35 additions & 0 deletions
35
airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2020 Airbyte | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in all | ||
* copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
* SOFTWARE. | ||
*/ | ||
|
||
package io.airbyte.commons.util; | ||
|
||
import java.util.Iterator; | ||
|
||
/** | ||
* If you operate on this iterator, you better close it. {@link AutoCloseableIterator#close} must be | ||
* idempotent. The contract on this interface is that it may be called MANY times. | ||
* | ||
* @param <T> type | ||
*/ | ||
public interface AutoCloseableIterator<T> extends Iterator<T>, AutoCloseable {} |
138 changes: 138 additions & 0 deletions
138
airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterators.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2020 Airbyte | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in all | ||
* copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
* SOFTWARE. | ||
*/ | ||
|
||
package io.airbyte.commons.util; | ||
|
||
import com.google.common.collect.Iterators; | ||
import io.airbyte.commons.concurrency.VoidCallable; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.function.Function; | ||
import java.util.function.Supplier; | ||
import java.util.stream.Stream; | ||
|
||
public class AutoCloseableIterators { | ||
|
||
/** | ||
* Coerces a vanilla {@link Iterator} into a {@link AutoCloseableIterator} by adding a no op close | ||
* function. | ||
* | ||
* @param iterator iterator to convert | ||
* @param <T> type | ||
* @return closeable iterator | ||
*/ | ||
public static <T> AutoCloseableIterator<T> fromIterator(Iterator<T> iterator) { | ||
return new DefaultAutoCloseableIterator<>(iterator, VoidCallable.NOOP); | ||
} | ||
|
||
/** | ||
* Coerces a vanilla {@link Iterator} into a {@link AutoCloseableIterator}. The provided | ||
* {@param onClose} function will be called at most one time. | ||
* | ||
* @param iterator autocloseable iterator to add another close to | ||
* @param onClose the function that will be called on close | ||
* @param <T> type | ||
* @return new autocloseable iterator with the close function appended | ||
*/ | ||
public static <T> AutoCloseableIterator<T> fromIterator(Iterator<T> iterator, VoidCallable onClose) { | ||
return new DefaultAutoCloseableIterator<>(iterator, onClose::call); | ||
} | ||
|
||
/** | ||
* Wraps a {@link Stream} in a {@link AutoCloseableIterator}. The first time | ||
* {@link AutoCloseableIterator#close()} is called, {@link Stream#close()} will be called. It will | ||
* not be called again subsequently. | ||
* | ||
* @param stream stream to wrap | ||
* @param <T> type | ||
* @return autocloseable iterator | ||
*/ | ||
public static <T> AutoCloseableIterator<T> fromStream(Stream<T> stream) { | ||
return new DefaultAutoCloseableIterator<>(stream.iterator(), stream::close); | ||
} | ||
|
||
/** | ||
* Returns a {@link AutoCloseableIterator} that will call the provided supplier ONE time when | ||
* {@link AutoCloseableIterator#hasNext()} is called the first time. The supplier returns a stream | ||
* that will be exposed as an iterator. | ||
* | ||
* @param iteratorSupplier supplier that provides a autocloseable iterator that will be invoked | ||
* lazily | ||
* @param <T> type | ||
* @return autocloseable iterator | ||
*/ | ||
public static <T> AutoCloseableIterator<T> lazyIterator(Supplier<AutoCloseableIterator<T>> iteratorSupplier) { | ||
return new LazyAutoCloseableIterator<>(iteratorSupplier); | ||
} | ||
|
||
/** | ||
* Append a function to be called on {@link AutoCloseableIterator#close}. | ||
* | ||
* @param autoCloseableIterator autocloseable iterator to add another close to | ||
* @param voidCallable the function that will be called on close | ||
* @param <T> type | ||
* @return new autocloseable iterator with the close function appended | ||
*/ | ||
public static <T> AutoCloseableIterator<T> appendOnClose(AutoCloseableIterator<T> autoCloseableIterator, VoidCallable voidCallable) { | ||
return new DefaultAutoCloseableIterator<>(autoCloseableIterator, () -> { | ||
autoCloseableIterator.close(); | ||
voidCallable.call(); | ||
}); | ||
} | ||
|
||
/** | ||
* Lift and shift of Guava's {@link Iterators#transform} using the {@link AutoCloseableIterator} | ||
* interface. | ||
* | ||
* @param fromIterator input autocloseable iterator | ||
* @param function map function | ||
* @param <F> input type | ||
* @param <T> output type | ||
* @return mapped autocloseable iterator | ||
*/ | ||
public static <F, T> AutoCloseableIterator<T> transform(AutoCloseableIterator<F> fromIterator, Function<? super F, ? extends T> function) { | ||
return new DefaultAutoCloseableIterator<>(Iterators.transform(fromIterator, function::apply), fromIterator::close); | ||
} | ||
|
||
/** | ||
* Map over a {@link AutoCloseableIterator} using a vanilla {@link Iterator} while retaining all of | ||
* the Resource behavior of the input {@link AutoCloseableIterator}. | ||
* | ||
* @param iteratorCreator function that takes in a autocloseable iterator and uses it to create a | ||
* vanilla iterator | ||
* @param autoCloseableIterator input autocloseable iterator | ||
* @param <T> type | ||
* @return autocloseable iterator that still has the close functionality of the original input | ||
* iterator but is transformed by the iterator output by the iteratorCreator | ||
*/ | ||
public static <T> AutoCloseableIterator<T> transform(Function<AutoCloseableIterator<T>, Iterator<T>> iteratorCreator, | ||
AutoCloseableIterator<T> autoCloseableIterator) { | ||
return new DefaultAutoCloseableIterator<>(iteratorCreator.apply(autoCloseableIterator), autoCloseableIterator::close); | ||
} | ||
|
||
public static <T> CompositeIterator<T> concatWithEagerClose(List<AutoCloseableIterator<T>> iterators) { | ||
return new CompositeIterator<>(iterators); | ||
} | ||
|
||
} |
127 changes: 127 additions & 0 deletions
127
airbyte-commons/src/main/java/io/airbyte/commons/util/CompositeIterator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2020 Airbyte | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in all | ||
* copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
* SOFTWARE. | ||
*/ | ||
|
||
package io.airbyte.commons.util; | ||
|
||
import com.google.common.base.Preconditions; | ||
import com.google.common.collect.AbstractIterator; | ||
import java.util.ArrayList; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* Composes multiple {@link AutoCloseableIterator}s. For each internal iterator, after the first | ||
* time its {@link Iterator#hasNext} function returns false, the composite iterator will call | ||
* {@link AutoCloseableIterator#close()} on that internal iterator. | ||
* | ||
* <p> | ||
* {@link CompositeIterator}s should be closed. Calling {@link CompositeIterator#close()} will | ||
* attempt to close each internal iterator as well. Thus the close method on each internal iterator | ||
* should be idempotent as it is will likely be called multiple times. | ||
* </p> | ||
* <p> | ||
* {@link CompositeIterator#close()} gives the guarantee that it will call close on each internal | ||
* iterator once (even if any of the iterators throw an exception). After it has attempted to close | ||
* each one once, {@link CompositeIterator} will rethrow the _first_ exception that it encountered | ||
* while closing internal iterators. If multiple internal iterators throw exceptions, only the first | ||
* exception will be rethrown, though the others will be logged. | ||
* </p> | ||
* | ||
* @param <T> type | ||
*/ | ||
public final class CompositeIterator<T> extends AbstractIterator<T> implements AutoCloseableIterator<T> { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(CompositeIterator.class); | ||
|
||
private final List<AutoCloseableIterator<T>> iterators; | ||
|
||
private int i; | ||
private boolean hasClosed; | ||
|
||
CompositeIterator(List<AutoCloseableIterator<T>> iterators) { | ||
Preconditions.checkNotNull(iterators); | ||
|
||
this.iterators = iterators; | ||
this.i = 0; | ||
this.hasClosed = false; | ||
} | ||
|
||
@Override | ||
protected T computeNext() { | ||
assertHasNotClosed(); | ||
|
||
if (iterators.isEmpty()) { | ||
return endOfData(); | ||
} | ||
|
||
// 1. search for an iterator that hasNext. | ||
// 2. close each iterator we encounter those that do not. | ||
// 3. if there are none, we are done. | ||
while (!currentIterator().hasNext()) { | ||
try { | ||
currentIterator().close(); | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
|
||
if (i + 1 < iterators.size()) { | ||
i++; | ||
} else { | ||
return endOfData(); | ||
} | ||
} | ||
|
||
return currentIterator().next(); | ||
} | ||
|
||
private AutoCloseableIterator<T> currentIterator() { | ||
return iterators.get(i); | ||
} | ||
|
||
@Override | ||
public void close() throws Exception { | ||
hasClosed = true; | ||
|
||
final List<Exception> exceptions = new ArrayList<>(); | ||
for (AutoCloseableIterator<T> iterator : iterators) { | ||
try { | ||
iterator.close(); | ||
} catch (Exception e) { | ||
LOGGER.error("exception while closing", e); | ||
exceptions.add(e); | ||
} | ||
} | ||
|
||
if (!exceptions.isEmpty()) { | ||
throw exceptions.get(0); | ||
} | ||
} | ||
|
||
private void assertHasNotClosed() { | ||
Preconditions.checkState(!hasClosed); | ||
} | ||
|
||
} |
77 changes: 77 additions & 0 deletions
77
airbyte-commons/src/main/java/io/airbyte/commons/util/DefaultAutoCloseableIterator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2020 Airbyte | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in all | ||
* copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
* SOFTWARE. | ||
*/ | ||
|
||
package io.airbyte.commons.util; | ||
|
||
import com.google.common.base.Preconditions; | ||
import com.google.common.collect.AbstractIterator; | ||
import io.airbyte.commons.concurrency.VoidCallable; | ||
import java.util.Iterator; | ||
|
||
/** | ||
* The canonical {@link AutoCloseableIterator}. The default behavior guarantees that the provided | ||
* close functional will be called no more than one time. | ||
* | ||
* @param <T> type | ||
*/ | ||
class DefaultAutoCloseableIterator<T> extends AbstractIterator<T> implements AutoCloseableIterator<T> { | ||
|
||
private final Iterator<T> iterator; | ||
private final VoidCallable onClose; | ||
|
||
private boolean hasClosed; | ||
|
||
public DefaultAutoCloseableIterator(Iterator<T> iterator, VoidCallable onClose) { | ||
Preconditions.checkNotNull(iterator); | ||
Preconditions.checkNotNull(onClose); | ||
|
||
this.iterator = iterator; | ||
this.onClose = onClose; | ||
this.hasClosed = false; | ||
} | ||
|
||
@Override | ||
protected T computeNext() { | ||
assertHasNotClosed(); | ||
|
||
if (iterator.hasNext()) { | ||
return iterator.next(); | ||
} else { | ||
return endOfData(); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() throws Exception { | ||
if (!hasClosed) { | ||
hasClosed = true; | ||
onClose.call(); | ||
} | ||
} | ||
|
||
private void assertHasNotClosed() { | ||
Preconditions.checkState(!hasClosed); | ||
} | ||
|
||
} |
Oops, something went wrong.