Skip to content

Commit

Permalink
Track stream status in source (#24971)
Browse files Browse the repository at this point in the history
* WIP Track stream status in source

* Revert formatting

* Revert formatting changes

* Remove unnecessary file

* Automated Change

* Automated Change

* Use new stream status trace message

* Rename class

* Remove unnecessary import

* Formatting

* Add tests

* Fix compile issues

* Automated Commit - Formatting Changes

* Remove TODO

* Fix compilation error

* Split STOPPED into INCOMPLETE and COMPLETE

* Remove unused import

* Changelog updates for source-postgres

* Remove unused import

* auto-bump connector version

---------

Co-authored-by: jdpgrailsdev <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
3 people authored Apr 26, 2023
1 parent 6b32870 commit a38af08
Show file tree
Hide file tree
Showing 38 changed files with 612 additions and 227 deletions.
1 change: 1 addition & 0 deletions airbyte-commons/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ plugins {

dependencies {
// Dependencies for this module should be specified in the top-level build.gradle. See readme for more explanation.
implementation libs.airbyte.protocol

// this dependency is an exception to the above rule because it is only used INTERNALLY to the commons library.
implementation 'com.jayway.jsonpath:json-path:2.7.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.stream;

import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus;
import io.airbyte.protocol.models.v0.AirbyteTraceMessage;
import io.airbyte.protocol.models.v0.StreamDescriptor;

/**
* Represents the current status of a stream provided by a source.
*/
public class AirbyteStreamStatusHolder {

private final AirbyteStreamNameNamespacePair airbyteStream;

private final AirbyteStreamStatus airbyteStreamStatus;

public AirbyteStreamStatusHolder(final AirbyteStreamNameNamespacePair airbyteStream,
final AirbyteStreamStatus airbyteStreamStatus) {
this.airbyteStream = airbyteStream;
this.airbyteStreamStatus = airbyteStreamStatus;
}

public AirbyteTraceMessage toTraceMessage() {
final AirbyteTraceMessage traceMessage = new AirbyteTraceMessage();
final AirbyteStreamStatusTraceMessage streamStatusTraceMessage = new AirbyteStreamStatusTraceMessage()
.withStreamDescriptor(new StreamDescriptor().withName(airbyteStream.getName()).withNamespace(airbyteStream.getNamespace()))
.withStatus(airbyteStreamStatus);
return traceMessage.withEmittedAt(Long.valueOf(System.currentTimeMillis()).doubleValue())
.withStreamStatus(streamStatusTraceMessage)
.withType(AirbyteTraceMessage.Type.STREAM_STATUS);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.stream;

import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteStream;

/**
* Collection of utility methods used to convert objects to {@link AirbyteStreamNameNamespacePair}
* objects.
*/
public class AirbyteStreamUtils {

/**
* Converts an {@link AirbyteStream} to a {@link AirbyteStreamNameNamespacePair}.
*
* @param airbyteStream The {@link AirbyteStream} to convert.
* @return The {@link AirbyteStreamNameNamespacePair}.
*/
public static AirbyteStreamNameNamespacePair convertFromAirbyteStream(final AirbyteStream airbyteStream) {
return new AirbyteStreamNameNamespacePair(airbyteStream.getName(), airbyteStream.getNamespace());
}

/**
* Converts a stream name and namespace into a {@link AirbyteStreamNameNamespacePair}.
*
* @param name The name of the stream.
* @param namespace The namespace of the stream.
* @return The {@link AirbyteStreamNameNamespacePair}.
*/
public static AirbyteStreamNameNamespacePair convertFromNameAndNamespace(final String name, final String namespace) {
return new AirbyteStreamNameNamespacePair(name, namespace);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.util;

import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import java.util.Optional;

/**
* Interface that indicates that an object exposes information used to identify an Airbyte stream.
*/
public interface AirbyteStreamAware {

/**
* Returns the {@link AirbyteStreamNameNamespacePair} identifying the Airbyte stream associated with
* the object.
*
* @return The {@link AirbyteStreamNameNamespacePair} identifying the Airbyte stream (may be empty).
*/
default Optional<AirbyteStreamNameNamespacePair> getAirbyteStream() {
return Optional.empty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@

import com.google.common.collect.Iterators;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.stream.AirbyteStreamStatusHolder;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
Expand All @@ -23,7 +26,19 @@ public class AutoCloseableIterators {
* @return closeable iterator
*/
public static <T> AutoCloseableIterator<T> fromIterator(final Iterator<T> iterator) {
return new DefaultAutoCloseableIterator<>(iterator, VoidCallable.NOOP);
return new DefaultAutoCloseableIterator<>(iterator, VoidCallable.NOOP, null);
}

/**
* Coerces a vanilla {@link Iterator} into a {@link AutoCloseableIterator} by adding a no op close
* function.
*
* @param iterator iterator to convert
* @param <T> type
* @return closeable iterator
*/
public static <T> AutoCloseableIterator<T> fromIterator(final Iterator<T> iterator, final AirbyteStreamNameNamespacePair airbyteStream) {
return new DefaultAutoCloseableIterator<>(iterator, VoidCallable.NOOP, airbyteStream);
}

/**
Expand All @@ -35,8 +50,10 @@ public static <T> AutoCloseableIterator<T> fromIterator(final Iterator<T> iterat
* @param <T> type
* @return new autocloseable iterator with the close function appended
*/
public static <T> AutoCloseableIterator<T> fromIterator(final Iterator<T> iterator, final VoidCallable onClose) {
return new DefaultAutoCloseableIterator<>(iterator, onClose::call);
public static <T> AutoCloseableIterator<T> fromIterator(final Iterator<T> iterator,
final VoidCallable onClose,
final AirbyteStreamNameNamespacePair airbyteStream) {
return new DefaultAutoCloseableIterator<>(iterator, onClose::call, airbyteStream);
}

/**
Expand All @@ -48,8 +65,8 @@ public static <T> AutoCloseableIterator<T> fromIterator(final Iterator<T> iterat
* @param <T> type
* @return autocloseable iterator
*/
public static <T> AutoCloseableIterator<T> fromStream(final Stream<T> stream) {
return new DefaultAutoCloseableIterator<>(stream.iterator(), stream::close);
public static <T> AutoCloseableIterator<T> fromStream(final Stream<T> stream, final AirbyteStreamNameNamespacePair airbyteStream) {
return new DefaultAutoCloseableIterator<>(stream.iterator(), stream::close, airbyteStream);
}

/**
Expand All @@ -71,8 +88,9 @@ public static <T> List<T> toListAndClose(final AutoCloseableIterator<T> iterator
* @param <T> type
* @return autocloseable iterator
*/
public static <T> AutoCloseableIterator<T> lazyIterator(final Supplier<AutoCloseableIterator<T>> iteratorSupplier) {
return new LazyAutoCloseableIterator<>(iteratorSupplier);
public static <T> AutoCloseableIterator<T> lazyIterator(final Supplier<AutoCloseableIterator<T>> iteratorSupplier,
final AirbyteStreamNameNamespacePair airbyteStream) {
return new LazyAutoCloseableIterator<>(iteratorSupplier, airbyteStream);
}

/**
Expand All @@ -87,7 +105,25 @@ public static <T> AutoCloseableIterator<T> appendOnClose(final AutoCloseableIter
return new DefaultAutoCloseableIterator<>(autoCloseableIterator, () -> {
autoCloseableIterator.close();
voidCallable.call();
});
}, null);
}

/**
* Append a function to be called on {@link AutoCloseableIterator#close}.
*
* @param autoCloseableIterator autocloseable iterator to add another close to
* @param voidCallable the function that will be called on close
* @param <T> type
* @return new autocloseable iterator with the close function appended
*/
public static <T> AutoCloseableIterator<T> appendOnClose(final AutoCloseableIterator<T> autoCloseableIterator,
final VoidCallable voidCallable,
final AirbyteStreamNameNamespacePair airbyteStream) {
return new DefaultAutoCloseableIterator<>(autoCloseableIterator, () -> {
autoCloseableIterator.close();
voidCallable.call();
},
airbyteStream);
}

/**
Expand All @@ -102,7 +138,23 @@ public static <T> AutoCloseableIterator<T> appendOnClose(final AutoCloseableIter
*/
public static <F, T> AutoCloseableIterator<T> transform(final AutoCloseableIterator<F> fromIterator,
final Function<? super F, ? extends T> function) {
return new DefaultAutoCloseableIterator<>(Iterators.transform(fromIterator, function::apply), fromIterator::close);
return new DefaultAutoCloseableIterator<>(Iterators.transform(fromIterator, function::apply), fromIterator::close, null);
}

/**
* Lift and shift of Guava's {@link Iterators#transform} using the {@link AutoCloseableIterator}
* interface.
*
* @param fromIterator input autocloseable iterator
* @param function map function
* @param <F> input type
* @param <T> output type
* @return mapped autocloseable iterator
*/
public static <F, T> AutoCloseableIterator<T> transform(final AutoCloseableIterator<F> fromIterator,
final AirbyteStreamNameNamespacePair airbyteStream,
final Function<? super F, ? extends T> function) {
return new DefaultAutoCloseableIterator<>(Iterators.transform(fromIterator, function::apply), fromIterator::close, airbyteStream);
}

/**
Expand All @@ -117,17 +169,29 @@ public static <F, T> AutoCloseableIterator<T> transform(final AutoCloseableItera
* iterator but is transformed by the iterator output by the iteratorCreator
*/
public static <T> AutoCloseableIterator<T> transform(final Function<AutoCloseableIterator<T>, Iterator<T>> iteratorCreator,
final AutoCloseableIterator<T> autoCloseableIterator) {
return new DefaultAutoCloseableIterator<>(iteratorCreator.apply(autoCloseableIterator), autoCloseableIterator::close);
final AutoCloseableIterator<T> autoCloseableIterator,
final AirbyteStreamNameNamespacePair airbyteStream) {
return new DefaultAutoCloseableIterator<>(iteratorCreator.apply(autoCloseableIterator), autoCloseableIterator::close, airbyteStream);
}

@SafeVarargs
public static <T> CompositeIterator<T> concatWithEagerClose(final Consumer<AirbyteStreamStatusHolder> airbyteStreamStatusConsumer,
final AutoCloseableIterator<T>... iterators) {
return concatWithEagerClose(List.of(iterators), airbyteStreamStatusConsumer);
}

@SafeVarargs
public static <T> CompositeIterator<T> concatWithEagerClose(final AutoCloseableIterator<T>... iterators) {
return concatWithEagerClose(List.of(iterators));
return concatWithEagerClose(List.of(iterators), null);
}

public static <T> CompositeIterator<T> concatWithEagerClose(final List<AutoCloseableIterator<T>> iterators,
final Consumer<AirbyteStreamStatusHolder> airbyteStreamStatusConsumer) {
return new CompositeIterator<>(iterators, airbyteStreamStatusConsumer);
}

public static <T> CompositeIterator<T> concatWithEagerClose(final List<AutoCloseableIterator<T>> iterators) {
return new CompositeIterator<>(iterators);
return concatWithEagerClose(iterators, null);
}

}
Loading

0 comments on commit a38af08

Please sign in to comment.