Skip to content

Commit

Permalink
Merge branch 'master' into cbini/398-add-support-latest-version-jsons…
Browse files Browse the repository at this point in the history
…chema-library
  • Loading branch information
cbini authored Nov 18, 2024
2 parents 46ae965 + 5958697 commit 6117524
Show file tree
Hide file tree
Showing 31 changed files with 439 additions and 321 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@ airbyte:
file-transfer:
enabled: ${USE_FILE_TRANSFER:false}
staging-path: ${AIRBYTE_STAGING_DIRECTORY:/staging/files}
resources:
disk:
bytes: ${CONNECTOR_STORAGE_LIMIT_BYTES:5368709120} # 5GB
1 change: 1 addition & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
testFixturesApi testFixtures(project(':airbyte-cdk:bulk:core:bulk-cdk-core-base'))

testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
testImplementation("io.mockk:mockk:1.13.12")
implementation "org.jetbrains.kotlin:kotlin-reflect:2.0.20"
testFixturesImplementation "uk.org.webcompere:system-stubs-jupiter:2.1.7"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,30 @@
package io.airbyte.cdk.load.config

import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.state.MemoryManager
import io.airbyte.cdk.load.state.ReservationManager
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Value
import jakarta.inject.Named
import jakarta.inject.Singleton

/**
 * Factory for instantiating beans necessary for the sync process.
 *
 * Exposes two named [ReservationManager] singletons: `memoryManager` and `diskManager`.
 */
@Factory
class SyncBeanFactory {
/**
 * Builds the `memoryManager` bean. Its capacity is the JVM's maximum heap
 * (`Runtime.getRuntime().maxMemory()`) scaled by `config.maxMessageQueueMemoryUsageRatio` and
 * truncated to a whole number of bytes.
 *
 * NOTE(review): this listing shows both the pre-change (`MemoryManager`) and post-change
 * (`ReservationManager`) variants of the return type and of the `return` statement back to
 * back; presumably only the `ReservationManager` lines exist in the resulting file — confirm.
 */
@Singleton
@Named("memoryManager")
fun memoryManager(
config: DestinationConfiguration,
): MemoryManager {
): ReservationManager {
val memory = config.maxMessageQueueMemoryUsageRatio * Runtime.getRuntime().maxMemory()

return MemoryManager(memory.toLong())
return ReservationManager(memory.toLong())
}

/**
 * Builds the `diskManager` bean, whose capacity in bytes is injected from the
 * `airbyte.resources.disk.bytes` configuration property.
 */
@Singleton
@Named("diskManager")
fun diskManager(
@Value("\${airbyte.resources.disk.bytes}") availableBytes: Long,
): ReservationManager {
return ReservationManager(availableBytes)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package io.airbyte.cdk.load.message

import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.state.MemoryManager
import io.airbyte.cdk.load.state.ReservationManager
import io.airbyte.cdk.load.state.Reserved
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
Expand Down Expand Up @@ -55,9 +55,9 @@ data class StreamFileCompleteWrapped(
class DestinationRecordQueue : ChannelMessageQueue<Reserved<DestinationRecordWrapped>>()

/**
* A supplier of message queues to which ([MemoryManager.reserve]'d) @ [DestinationRecordWrapped]
* messages can be published on a @ [DestinationStream] key. The queues themselves do not manage
* memory.
* A supplier of message queues to which ([ReservationManager.reserve]'d) @
* [DestinationRecordWrapped] messages can be published on a @ [DestinationStream] key. The queues
* themselves do not manage memory.
*/
@Singleton
@Secondary
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.state

import io.airbyte.cdk.load.util.CloseableCoroutine
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
 * A releasable reservation of [bytesReserved] bytes held against a [ReservationManager], paired
 * with the [value] the reservation was made for.
 *
 * [release] hands the bytes back to the parent manager at most once per instance; any later call
 * is a no-op. [close] simply delegates to [release].
 */
class Reserved<T>(
    private val parentManager: ReservationManager,
    val bytesReserved: Long,
    val value: T,
) : CloseableCoroutine {
    // Never reassigned, so a `val`. Flipped to true by the first call to release().
    private val released = AtomicBoolean(false)

    /** Returns [bytesReserved] to the parent manager the first time it is called. */
    suspend fun release() {
        // compareAndSet guarantees only one caller wins, even under concurrent release attempts.
        if (released.compareAndSet(false, true)) {
            parentManager.release(bytesReserved)
        }
    }

    /**
     * Creates a new reservation handle for the same manager and byte count, carrying [value]
     * instead of the current one.
     *
     * The returned handle has its own "released" flag: releasing both this instance and the
     * returned one gives the same bytes back to the manager twice, so only one of them should be
     * released.
     */
    fun <U> replace(value: U): Reserved<U> = Reserved(parentManager, bytesReserved, value)

    override suspend fun close() {
        release()
    }
}

/**
 * Tracks reservations against a fixed budget of [totalCapacityBytes] (used in this codebase for
 * both memory and disk budgets).
 *
 * Callers [reserve] bytes before using a resource and [release] them afterwards. A reservation
 * that does not currently fit suspends until enough bytes have been released.
 *
 * TODO: Better initialization of available runtime memory?
 *
 * TODO: Some degree of logging/monitoring around how accurate we're actually being?
 */
class ReservationManager(val totalCapacityBytes: Long) {

    // Single source of truth for the number of bytes currently reserved. Keeping it in a
    // StateFlow lets reserve() suspend until the value drops far enough, rather than polling.
    private val usedBytes = MutableStateFlow(0L)

    // Serializes reservations so that at most one caller at a time waits for, and then claims,
    // capacity.
    private val reserveLock = Mutex()

    /** Bytes of [totalCapacityBytes] that are not currently reserved. */
    val remainingCapacityBytes: Long
        get() = totalCapacityBytes - usedBytes.value

    /**
     * Reserves [bytes], suspending until they are available, and returns a [Reserved] handle that
     * pairs the reservation with [reservedFor].
     *
     * @throws IllegalArgumentException if [bytes] exceeds [totalCapacityBytes].
     */
    suspend fun <T> reserve(bytes: Long, reservedFor: T): Reserved<T> {
        reserve(bytes)

        return Reserved(this, bytes, reservedFor)
    }

    /**
     * Reserves [bytes], suspending until they are available.
     *
     * @throws IllegalArgumentException if [bytes] exceeds [totalCapacityBytes] (such a request
     * could never be satisfied).
     */
    suspend fun reserve(bytes: Long) {
        require(bytes <= totalCapacityBytes) {
            "Requested ${bytes}b exceeds ${totalCapacityBytes}b total"
        }

        reserveLock.withLock {
            // Suspend (rather than spin) until the request fits. While the lock is held no other
            // reservation can grow usedBytes, so the value can only shrink between this check
            // and the update below.
            usedBytes.first { used -> used + bytes <= totalCapacityBytes }
            usedBytes.update { used -> used + bytes }
        }
    }

    /** Returns [bytes] to the pool and wakes a reservation waiting for capacity, if any. */
    suspend fun release(bytes: Long) {
        // Atomic read-modify-write; concurrent releases cannot lose updates.
        usedBytes.update { used -> used - bytes }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.DestinationRecordStreamComplete
import io.airbyte.cdk.load.message.DestinationRecordStreamIncomplete
import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage
import io.airbyte.cdk.load.state.ReservationManager
import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.load.task.ImplementorScope
Expand All @@ -21,6 +22,7 @@ import io.airbyte.cdk.load.util.lineSequence
import io.airbyte.cdk.load.write.StreamLoader
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Named
import jakarta.inject.Singleton
import kotlin.io.path.inputStream

Expand All @@ -40,6 +42,7 @@ class DefaultProcessRecordsTask(
private val file: SpilledRawMessagesLocalFile,
private val deserializer: Deserializer<DestinationMessage>,
private val syncManager: SyncManager,
private val diskManager: ReservationManager,
) : ProcessRecordsTask {
override suspend fun execute() {
val log = KotlinLogging.logger {}
Expand Down Expand Up @@ -74,6 +77,7 @@ class DefaultProcessRecordsTask(
} finally {
log.info { "Processing completed, deleting $file" }
file.localFile.toFile().delete()
diskManager.release(file.totalSizeBytes)
}

val wrapped = BatchEnvelope(batch, file.indexRange)
Expand All @@ -94,12 +98,20 @@ interface ProcessRecordsTaskFactory {
/**
 * [ProcessRecordsTaskFactory] that builds [DefaultProcessRecordsTask] instances, passing along the
 * injected deserializer, sync manager and the `diskManager`-qualified [ReservationManager].
 *
 * NOTE(review): this listing shows both the pre-change single-line `return` (without
 * `diskManager`) and the post-change multi-line `return` back to back; presumably only the
 * latter exists in the resulting file — confirm.
 */
class DefaultProcessRecordsTaskFactory(
private val deserializer: Deserializer<DestinationMessage>,
private val syncManager: SyncManager,
@Named("diskManager") private val diskManager: ReservationManager,
) : ProcessRecordsTaskFactory {
/** Creates the task that processes [file] for [stream]. */
override fun make(
taskLauncher: DestinationTaskLauncher,
stream: DestinationStream,
file: SpilledRawMessagesLocalFile,
): ProcessRecordsTask {
return DefaultProcessRecordsTask(stream, taskLauncher, file, deserializer, syncManager)
return DefaultProcessRecordsTask(
stream,
taskLauncher,
file,
deserializer,
syncManager,
diskManager,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ package io.airbyte.cdk.load.task.internal

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.CheckpointMessage
import io.airbyte.cdk.load.message.CheckpointMessageWrapped
import io.airbyte.cdk.load.message.Deserializer
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.DestinationFileStreamComplete
import io.airbyte.cdk.load.message.DestinationFileStreamIncomplete
Expand All @@ -32,7 +30,6 @@ import io.airbyte.cdk.load.message.StreamFileWrapped
import io.airbyte.cdk.load.message.StreamRecordCompleteWrapped
import io.airbyte.cdk.load.message.StreamRecordWrapped
import io.airbyte.cdk.load.message.Undefined
import io.airbyte.cdk.load.state.MemoryManager
import io.airbyte.cdk.load.state.Reserved
import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.load.task.KillableScope
Expand All @@ -41,9 +38,6 @@ import io.airbyte.cdk.load.util.use
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.io.InputStream
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector

interface InputConsumerTask : SyncLevel, KillableScope

Expand Down Expand Up @@ -192,44 +186,3 @@ class DefaultInputConsumerTask(
}
}
}

/** A [Flow] whose elements are `(Long, T)` pairs, i.e. each value of type [T] is paired with a size. */
interface SizedInputFlow<T> : Flow<Pair<Long, T>>

/**
 * A [SizedInputFlow] that reads [inputStream] line by line, reserves memory for each non-empty
 * line with [memoryManager], deserializes the line with [deserializer], and emits the line length
 * paired with the reserved, deserialized message.
 */
abstract class ReservingDeserializingInputFlow<T : Any> : SizedInputFlow<Reserved<T>> {
    val log = KotlinLogging.logger {}

    abstract val config: DestinationConfiguration
    abstract val deserializer: Deserializer<T>
    abstract val memoryManager: MemoryManager
    abstract val inputStream: InputStream

    override suspend fun collect(collector: FlowCollector<Pair<Long, Reserved<T>>>) {
        // Bytes -> MiB needs two divisions by 1024; a single division yields KiB, which did not
        // match the "mb" label in the message.
        log.info {
            "Reserved ${memoryManager.totalMemoryBytes / 1024 / 1024}mb memory for input processing"
        }

        inputStream.bufferedReader().lineSequence().forEachIndexed { index, line ->
            // Blank lines carry no message; they are skipped but still advance the index.
            if (line.isEmpty()) {
                return@forEachIndexed
            }

            val lineSize = line.length.toLong()
            // The reservation is scaled by the configured overhead ratio; the emitted size is
            // the raw line length.
            val estimatedSize = lineSize * config.estimatedRecordMemoryOverheadRatio
            val reserved = memoryManager.reserve(estimatedSize.toLong(), line)
            val message = deserializer.deserialize(line)
            // Swap the raw line for the deserialized message, keeping the same reserved size.
            collector.emit(Pair(lineSize, reserved.replace(message)))

            if (index % 10_000 == 0) {
                log.info { "Processed $index lines" }
            }
        }

        log.info { "Finished processing input" }
    }
}

/**
 * Singleton [ReservingDeserializingInputFlow] specialised to [DestinationMessage]; all of its
 * collaborators are supplied through the constructor.
 */
@Singleton
class DefaultInputFlow(
override val config: DestinationConfiguration,
override val deserializer: Deserializer<DestinationMessage>,
override val memoryManager: MemoryManager,
override val inputStream: InputStream
) : ReservingDeserializingInputFlow<DestinationMessage>()
Loading

0 comments on commit 6117524

Please sign in to comment.