Iterator-based JDBC Source (and Redshift bugfix) #1887
8bcfa2b to 7193ea5
airbyte-commons/src/main/java/io/airbyte/commons/concurrency/SafeVoidCallable.java
airbyte-commons/src/main/java/io/airbyte/commons/util/MoreIterators.java
 * @param <T> type
 * @return auto closing iterator
 */
public static <T> Iterator<T> autoCloseIterator(Iterator<T> iterator, SafeVoidCallable onClose) {
This is a VERY dangerous function. It should not return an Iterator; it should return an AutoCloseIterator. Otherwise there is absolutely no way to release resources but to consume the iterator to the end.
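To make the concern concrete, here is a minimal sketch of why the return type matters. It assumes a hypothetical `CloseableIter` interface and `wrap` helper, which are illustrative names and not the PR's actual `AutoCloseIterator`. When the iterator type itself is `AutoCloseable`, a caller that stops early can still release the resource through try-with-resources; a plain `Iterator` offers no such hook.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class CloseableIteratorSketch {

  // Hypothetical interface for illustration only; the PR's real type may differ.
  interface CloseableIter<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close(); // narrowed so callers do not have to handle a checked exception
  }

  // Wraps a plain iterator together with a callback that releases the underlying resource.
  static <T> CloseableIter<T> wrap(Iterator<T> delegate, Runnable onClose) {
    return new CloseableIter<T>() {
      @Override
      public boolean hasNext() {
        return delegate.hasNext();
      }

      @Override
      public T next() {
        return delegate.next();
      }

      @Override
      public void close() {
        onClose.run();
      }
    };
  }

  // Reads only the first element and then abandons the iterator.
  // try-with-resources still releases the resource, even though the iterator was not drained.
  static <T> T firstOnly(CloseableIter<T> iterator) {
    try (CloseableIter<T> it = iterator) {
      return it.next();
    }
  }

  public static void main(String[] args) {
    AtomicBoolean released = new AtomicBoolean(false);
    CloseableIter<String> rows =
        wrap(List.of("a", "b", "c").iterator(), () -> released.set(true));

    System.out.println(firstOnly(rows)); // prints "a"
    System.out.println(released.get());  // prints "true": released without consuming "b" and "c"
  }
}
```

If `wrap` returned a plain `Iterator<T>`, `firstOnly` would have no way to trigger `onClose` short of draining every remaining element.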
removed.
 * @param <T> type
 * @return vanilla iterator
 */
public static <T> Iterator<T> toCloseableIterator(CloseableIterator<T> iterator) {
You need to kill this function; it cannot exist. It is way too dangerous, for the reason I mentioned earlier.
removed.
airbyte-commons/src/main/java/io/airbyte/commons/concurrency/VoidCallable.java
airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Source.java
...ectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java
...s/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/StateDecoratingIterator.java
7371392 to f181958
@michel-tricot I think I hit your feedback. I'm glad we are removing the headache of streams. Unfortunately, the iterator approach is taking a ton of boilerplate to recreate some of the features of streams that we need. Let me know if you feel that there's an opportunity to reduce this boilerplate. Still need to do unit tests.
airbyte-commons/src/main/java/io/airbyte/commons/util/ResourceIterator.java
airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseIterator.java
@Override
public void close() throws Exception {
  internalIterator.close();
Can you do the boolean management here and just not do anything if close has already been called? You probably want to use an AtomicBoolean with getAndSet if you do so.
I'm trying to push for composition here, the way one would think about a stream. Each of these iterators is designed to do one thing. They can then be composed to get whatever behavior you want.
So if you want close to be called once, then you compose this iterator with the default iterator. I think of this as good default behavior, which is why it's called the DefaultResourceIterator, but if we think that needs to be named more explicitly we can do that too.
Anyway, this attempt at composition is why I am not doing an AutoClosingLazySingleCloseIterator and not combining the behavior you're describing here. Are you not a fan of this approach?
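The two positions in this thread can be sketched side by side. The following is only an illustration under assumed names (`ResourceIter`, `basic`, `closeOnce`, and `autoClose` are hypothetical and are not the PR's classes): one decorator guards `close()` with `AtomicBoolean.getAndSet`, another closes on exhaustion, and the caller composes them.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class ComposedIteratorsSketch {

  // Hypothetical stand-in for the PR's resource iterator type.
  interface ResourceIter<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close();
  }

  // Base behavior: iterate the delegate and run onClose on every close() call.
  static <T> ResourceIter<T> basic(Iterator<T> delegate, Runnable onClose) {
    return new ResourceIter<T>() {
      @Override public boolean hasNext() { return delegate.hasNext(); }
      @Override public T next() { return delegate.next(); }
      @Override public void close() { onClose.run(); }
    };
  }

  // Single concern: forward only the first close() call.
  static <T> ResourceIter<T> closeOnce(ResourceIter<T> inner) {
    AtomicBoolean closed = new AtomicBoolean(false);
    return new ResourceIter<T>() {
      @Override public boolean hasNext() { return inner.hasNext(); }
      @Override public T next() { return inner.next(); }
      @Override public void close() {
        // getAndSet returns the previous value, so only the first caller sees false.
        if (!closed.getAndSet(true)) {
          inner.close();
        }
      }
    };
  }

  // Single concern: close as soon as the iterator reports that it is exhausted.
  static <T> ResourceIter<T> autoClose(ResourceIter<T> inner) {
    return new ResourceIter<T>() {
      @Override public boolean hasNext() {
        boolean more = inner.hasNext();
        if (!more) {
          inner.close();
        }
        return more;
      }
      @Override public T next() { return inner.next(); }
      @Override public void close() { inner.close(); }
    };
  }

  public static void main(String[] args) {
    AtomicInteger closeCalls = new AtomicInteger();
    ResourceIter<Integer> it =
        autoClose(closeOnce(basic(List.of(1, 2).iterator(), closeCalls::incrementAndGet)));

    while (it.hasNext()) {
      it.next();
    }
    it.close(); // explicit close after the automatic one

    System.out.println(closeCalls.get()); // prints 1
  }
}
```

Dropping `closeOnce` from the chain would let the underlying close run twice in this example, which is the trade-off being debated: each decorator stays small, but the caller has to remember to compose the guard in.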
airbyte-commons/src/main/java/io/airbyte/commons/util/DefaultResourceIterator.java
airbyte-commons/src/main/java/io/airbyte/commons/util/ResourceIterator.java
public class ResourceIterators {

  public static <T> ResourceIterator<T> emptyIterator() {
NIT: empty()
removed
 * @param <T> type
 * @return closeable iterator
 */
public static <T> ResourceIterator<T> resourceIterator(Iterator<T> iterator) {
NIT: fromIterator
 * @param <T> type
 * @return new resource iterator with the close function appended
 */
public static <T> ResourceIterator<T> resourceIterator(Iterator<T> iterator, VoidCallableNoException onClose) {
NIT: fromIterator
 * @param <T> type
 * @return resource iterator
 */
public static <T> ResourceIterator<T> resourceIterator(Stream<T> stream) {
NIT: fromStream
airbyte-commons/src/main/java/io/airbyte/commons/util/ResourceIterators.java
I am not completely satisfied with my suggestion for autoclosable.
5bab2a4 to 7b20b59
@michel-tricot I am still working through testing, but I think I have now hit all of the feedback you left (and we discussed offline). I am a little worried that we are being too slow to push out a fix for an issue that makes our Redshift source unusable and is potentially causing issues in JDBC sources that we just haven't had reported yet. Unless there are "one-way door" sorts of issues, I think it's time to push this thing across the finish line and do additional nice-to-haves later. Definitely agreed that the first set of iterators was too dangerous, but this second round feels like stuff we could do as follow-on tasks. @michel-tricot wdyt? (I am hammering through unit tests now, and will not merge without them.)
Ship it!
aec251a to de5a5e2
Fixed. Actually a problem with the stress test itself, not the source code.
/publish connector=connectors/source-postgres
/publish connector=connectors/source-mysql
/publish connector=connectors/source-mssql
/publish connector=connectors/source-redshift
What
There are 2 things happening in this PR.
Removing streams from AbstractJdbcSource in favor of iterators.
Adding JdbcStandardTest for the Redshift source.
Why?
... . characters in the table names, which was a known issue that we actually had a PR up for; we just hadn't merged it because we didn't want to risk adding unknown bugs right before the HN launch. I pushed a version of this PR to Docker Hub and gave him access. This fixed the initial issue, but then we found the Redshift source was hanging...
... flatMap a few weeks ago (PR)). This seemed relatively safe since we fetch data from the DBs in chunks, so the first chunks would be pulled eagerly but the rest would wait until we started consuming the output. This worked with the other JDBC databases. It did not work for Redshift. I believe this is due to how connection pooling happens in Redshift. Empirically, the database hangs once multiple (>9) queries are queued up. I still need to dig a little deeper into the exact limitation here.
How
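As a rough illustration of the lazy consumption that iterators are meant to provide here, the sketch below concatenates per-table results so that each "query" is issued only when the consumer actually reaches it. `lazyConcat` and the supplier-based setup are hypothetical stand-ins, not code from this PR, and the suppliers only simulate JDBC queries.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

final class LazyConcatSketch {

  // Concatenates chunks, asking for the next chunk only once the current one is exhausted.
  static <T> Iterator<T> lazyConcat(List<Supplier<Iterator<T>>> chunks) {
    Iterator<Supplier<Iterator<T>>> remaining = chunks.iterator();
    return new Iterator<T>() {
      private Iterator<T> current = Collections.emptyIterator();

      @Override
      public boolean hasNext() {
        while (!current.hasNext() && remaining.hasNext()) {
          current = remaining.next().get(); // the simulated query runs here, on demand
        }
        return current.hasNext();
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }
    };
  }

  public static void main(String[] args) {
    List<String> queriesRun = new ArrayList<>();
    List<Supplier<Iterator<String>>> tables = new ArrayList<>();
    for (String table : List.of("users", "orders", "events")) {
      tables.add(() -> {
        queriesRun.add(table); // stands in for issuing a JDBC query
        return List.of(table + "-row1", table + "-row2").iterator();
      });
    }

    Iterator<String> records = lazyConcat(tables);
    records.next();
    System.out.println(queriesRun); // prints [users]: later tables have not been queried yet
  }
}
```

With this shape, at most one simulated query is outstanding at a time, instead of several being queued up front.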
Add MoreIterators to provide us "stream-like" syntactic sugar when dealing with iterators.
Update AbstractJdbcSource to use iterators.
Add JdbcStandardTest for Redshift.
Update JdbcStandardTest to handle its own cleanup and not assume that there will be a new database available for every test (a constraint of working with Redshift without test containers).
Update JdbcStandardTest to handle schema namespaces more clearly / robustly.
Pre-merge Checklist
MoreIterators
Recommended reading order
Source.java
IntegrationRunner.java
MoreIterators.java
RedshiftJdbcStandardTest.java
Other