forked from airbytehq/airbyte
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Track stream status in source (airbytehq#24971)
* WIP Track stream status in source * Revert formatting * Revert formatting changes * Remove unnecessary file * Automated Change * Automated Change * Use new stream status trace message * Rename class * Remove unnecessary import * Formatting * Add tests * Fix compile issues * Automated Commit - Formatting Changes * Remove TODO * Fix compilation error * Split STOPPED into INCOMPLETE and COMPLETE * Remove unused import * Changelog updates for source-postgres * Remove unused import * auto-bump connector version --------- Co-authored-by: jdpgrailsdev <[email protected]> Co-authored-by: Octavia Squidington III <[email protected]>
- Loading branch information
1 parent
ca36a0b
commit 1fc44bc
Showing
38 changed files
with
612 additions
and
227 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
38 changes: 38 additions & 0 deletions
38
airbyte-commons/src/main/java/io/airbyte/commons/stream/AirbyteStreamStatusHolder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.commons.stream; | ||
|
||
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; | ||
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage; | ||
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus; | ||
import io.airbyte.protocol.models.v0.AirbyteTraceMessage; | ||
import io.airbyte.protocol.models.v0.StreamDescriptor; | ||
|
||
/** | ||
* Represents the current status of a stream provided by a source. | ||
*/ | ||
public class AirbyteStreamStatusHolder { | ||
|
||
private final AirbyteStreamNameNamespacePair airbyteStream; | ||
|
||
private final AirbyteStreamStatus airbyteStreamStatus; | ||
|
||
public AirbyteStreamStatusHolder(final AirbyteStreamNameNamespacePair airbyteStream, | ||
final AirbyteStreamStatus airbyteStreamStatus) { | ||
this.airbyteStream = airbyteStream; | ||
this.airbyteStreamStatus = airbyteStreamStatus; | ||
} | ||
|
||
public AirbyteTraceMessage toTraceMessage() { | ||
final AirbyteTraceMessage traceMessage = new AirbyteTraceMessage(); | ||
final AirbyteStreamStatusTraceMessage streamStatusTraceMessage = new AirbyteStreamStatusTraceMessage() | ||
.withStreamDescriptor(new StreamDescriptor().withName(airbyteStream.getName()).withNamespace(airbyteStream.getNamespace())) | ||
.withStatus(airbyteStreamStatus); | ||
return traceMessage.withEmittedAt(Long.valueOf(System.currentTimeMillis()).doubleValue()) | ||
.withStreamStatus(streamStatusTraceMessage) | ||
.withType(AirbyteTraceMessage.Type.STREAM_STATUS); | ||
} | ||
|
||
} |
37 changes: 37 additions & 0 deletions
37
airbyte-commons/src/main/java/io/airbyte/commons/stream/AirbyteStreamUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.commons.stream; | ||
|
||
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; | ||
import io.airbyte.protocol.models.v0.AirbyteStream; | ||
|
||
/** | ||
* Collection of utility methods used to convert objects to {@link AirbyteStreamNameNamespacePair} | ||
* objects. | ||
*/ | ||
public class AirbyteStreamUtils { | ||
|
||
/** | ||
* Converts an {@link AirbyteStream} to a {@link AirbyteStreamNameNamespacePair}. | ||
* | ||
* @param airbyteStream The {@link AirbyteStream} to convert. | ||
* @return The {@link AirbyteStreamNameNamespacePair}. | ||
*/ | ||
public static AirbyteStreamNameNamespacePair convertFromAirbyteStream(final AirbyteStream airbyteStream) { | ||
return new AirbyteStreamNameNamespacePair(airbyteStream.getName(), airbyteStream.getNamespace()); | ||
} | ||
|
||
/** | ||
* Converts a stream name and namespace into a {@link AirbyteStreamNameNamespacePair}. | ||
* | ||
* @param name The name of the stream. | ||
* @param namespace The namespace of the stream. | ||
* @return The {@link AirbyteStreamNameNamespacePair}. | ||
*/ | ||
public static AirbyteStreamNameNamespacePair convertFromNameAndNamespace(final String name, final String namespace) { | ||
return new AirbyteStreamNameNamespacePair(name, namespace); | ||
} | ||
|
||
} |
25 changes: 25 additions & 0 deletions
25
airbyte-commons/src/main/java/io/airbyte/commons/util/AirbyteStreamAware.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
/* | ||
* Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.commons.util; | ||
|
||
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; | ||
import java.util.Optional; | ||
|
||
/** | ||
* Interface that indicates that an object exposes information used to identify an Airbyte stream. | ||
*/ | ||
public interface AirbyteStreamAware { | ||
|
||
/** | ||
* Returns the {@link AirbyteStreamNameNamespacePair} identifying the Airbyte stream associated with | ||
* the object. | ||
* | ||
* @return The {@link AirbyteStreamNameNamespacePair} identifying the Airbyte stream (may be empty). | ||
*/ | ||
default Optional<AirbyteStreamNameNamespacePair> getAirbyteStream() { | ||
return Optional.empty(); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.