Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UnsafeBufferOperations.forEachSegment implementation #383

Merged
merged 5 commits into from
Sep 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/api/kotlinx-io-core.api
Original file line number Diff line number Diff line change
@@ -305,6 +305,7 @@ public abstract interface class kotlinx/io/unsafe/SegmentWriteContext {

public final class kotlinx/io/unsafe/UnsafeBufferOperations {
public static final field INSTANCE Lkotlinx/io/unsafe/UnsafeBufferOperations;
public final fun forEachSegment (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function2;)V
public final fun getMaxSafeWriteCapacity ()I
public final fun iterate (Lkotlinx/io/Buffer;JLkotlin/jvm/functions/Function3;)V
public final fun iterate (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function2;)V
1 change: 1 addition & 0 deletions core/api/kotlinx-io-core.klib.api
Original file line number Diff line number Diff line change
@@ -211,6 +211,7 @@ final object kotlinx.io.unsafe/UnsafeBufferOperations { // kotlinx.io.unsafe/Uns
final fun <get-maxSafeWriteCapacity>(): kotlin/Int // kotlinx.io.unsafe/UnsafeBufferOperations.maxSafeWriteCapacity.<get-maxSafeWriteCapacity>|<get-maxSafeWriteCapacity>(){}[0]

final fun moveToTail(kotlinx.io/Buffer, kotlin/ByteArray, kotlin/Int = ..., kotlin/Int = ...) // kotlinx.io.unsafe/UnsafeBufferOperations.moveToTail|moveToTail(kotlinx.io.Buffer;kotlin.ByteArray;kotlin.Int;kotlin.Int){}[0]
final inline fun forEachSegment(kotlinx.io/Buffer, kotlin/Function2<kotlinx.io.unsafe/SegmentReadContext, kotlinx.io/Segment, kotlin/Unit>) // kotlinx.io.unsafe/UnsafeBufferOperations.forEachSegment|forEachSegment(kotlinx.io.Buffer;kotlin.Function2<kotlinx.io.unsafe.SegmentReadContext,kotlinx.io.Segment,kotlin.Unit>){}[0]
final inline fun iterate(kotlinx.io/Buffer, kotlin/Function2<kotlinx.io.unsafe/BufferIterationContext, kotlinx.io/Segment?, kotlin/Unit>) // kotlinx.io.unsafe/UnsafeBufferOperations.iterate|iterate(kotlinx.io.Buffer;kotlin.Function2<kotlinx.io.unsafe.BufferIterationContext,kotlinx.io.Segment?,kotlin.Unit>){}[0]
final inline fun iterate(kotlinx.io/Buffer, kotlin/Long, kotlin/Function3<kotlinx.io.unsafe/BufferIterationContext, kotlinx.io/Segment?, kotlin/Long, kotlin/Unit>) // kotlinx.io.unsafe/UnsafeBufferOperations.iterate|iterate(kotlinx.io.Buffer;kotlin.Long;kotlin.Function3<kotlinx.io.unsafe.BufferIterationContext,kotlinx.io.Segment?,kotlin.Long,kotlin.Unit>){}[0]
final inline fun readFromHead(kotlinx.io/Buffer, kotlin/Function2<kotlinx.io.unsafe/SegmentReadContext, kotlinx.io/Segment, kotlin/Int>): kotlin/Int // kotlinx.io.unsafe/UnsafeBufferOperations.readFromHead|readFromHead(kotlinx.io.Buffer;kotlin.Function2<kotlinx.io.unsafe.SegmentReadContext,kotlinx.io.Segment,kotlin.Int>){}[0]
19 changes: 7 additions & 12 deletions core/apple/src/BuffersApple.kt
Original file line number Diff line number Diff line change
@@ -58,19 +58,14 @@ internal fun Buffer.snapshotAsNSData(): NSData {
val bytes = malloc(size.convert())?.reinterpret<uint8_tVar>()
?: throw Error("malloc failed: ${strerror(errno)?.toKString()}")

UnsafeBufferOperations.iterate(this) { ctx, head ->
var curr: Segment? = head
var index = 0
while (curr != null) {
val segment: Segment = curr
ctx.withData(segment) { data, pos, limit ->
val length = limit - pos
data.usePinned {
memcpy(bytes + index, it.addressOf(pos), length.convert())
}
index += length
var index = 0
UnsafeBufferOperations.forEachSegment(this) { ctx, segment ->
ctx.withData(segment) { data, pos, limit ->
val length = limit - pos
data.usePinned {
memcpy(bytes + index, it.addressOf(pos), length.convert())
}
curr = ctx.next(segment)
index += length
}
}
return NSData.create(bytesNoCopy = bytes, length = size.convert())
25 changes: 10 additions & 15 deletions core/common/src/Buffer.kt
Original file line number Diff line number Diff line change
@@ -553,21 +553,16 @@ public class Buffer : Source, Sink {
val len = minOf(maxPrintableBytes, size).toInt()

val builder = StringBuilder(len * 2 + if (size > maxPrintableBytes) 1 else 0)

UnsafeBufferOperations.iterate(this) { ctx, head ->
var bytesWritten = 0
var seg: Segment? = head
do {
seg!!
var idx = 0
while (bytesWritten < len && idx < seg.size) {
val b = ctx.getUnchecked(seg, idx++)
bytesWritten++
builder.append(HEX_DIGIT_CHARS[(b shr 4) and 0xf])
.append(HEX_DIGIT_CHARS[b and 0xf])
}
seg = ctx.next(seg)
} while (seg != null)
var bytesWritten = 0
UnsafeBufferOperations.forEachSegment(this) { ctx, segment ->
var idx = 0
while (bytesWritten < len && idx < segment.size) {
val b = ctx.getUnchecked(segment, idx++)
bytesWritten++
builder
.append(HEX_DIGIT_CHARS[(b shr 4) and 0xf])
.append(HEX_DIGIT_CHARS[b and 0xf])
}
}

if (size > maxPrintableBytes) {
8 changes: 2 additions & 6 deletions core/common/src/Buffers.kt
Original file line number Diff line number Diff line change
@@ -22,12 +22,8 @@ public fun Buffer.snapshot(): ByteString {
check(size <= Int.MAX_VALUE) { "Buffer is too long ($size) to be converted into a byte string." }

return buildByteString(size.toInt()) {
UnsafeBufferOperations.iterate(this@snapshot) { ctx, head ->
var curr = head
while (curr != null) {
ctx.withData(curr, this::append)
curr = ctx.next(curr)
}
UnsafeBufferOperations.forEachSegment(this@snapshot) { ctx, segment ->
ctx.withData(segment, this::append)
}
}
}
10 changes: 5 additions & 5 deletions core/common/src/Utf8.kt
Original file line number Diff line number Diff line change
@@ -607,17 +607,17 @@ private fun Buffer.commonReadUtf8(byteCount: Long): String {
// Invariant: byteCount was request()'ed into this buffer beforehand
if (byteCount == 0L) return ""

UnsafeBufferOperations.iterate(this) { ctx, head ->
head!!
if (head.size >= byteCount) {
UnsafeBufferOperations.forEachSegment(this) { ctx, segment ->
if (segment.size >= byteCount) {
var result = ""
ctx.withData(head) { data, pos, limit ->
ctx.withData(segment) { data, pos, limit ->
result = data.commonToUtf8String(pos, min(limit, pos + byteCount.toInt()))
skip(byteCount)
return result
}
}
}
// If the string spans multiple segments, delegate to readBytes()
return readByteArray(byteCount.toInt()).commonToUtf8String()
}
error("Unreacheable")
}
38 changes: 33 additions & 5 deletions core/common/src/unsafe/UnsafeBufferOperations.kt
Original file line number Diff line number Diff line change
@@ -295,13 +295,14 @@ public object UnsafeBufferOperations {
* the [iterationAction].
*
* Both [iterationAction] arguments are valid only within [iterationAction] scope,
* it's an error to store and reuse it later.
* it is an error to store and reuse it later.
*
* For a full iteration over buffer's segments, see [forEachSegment].
*
* @param buffer a buffer to iterate over
* @param iterationAction a callback to invoke with the head reference and an iteration context instance
*
* @sample kotlinx.io.samples.unsafe.UnsafeReadWriteSamplesJvm.messageDigest
* @sample kotlinx.io.samples.unsafe.UnsafeBufferOperationsSamples.crc32Unsafe
* @sample kotlinx.io.samples.unsafe.UnsafeBufferOperationsSamples.crc32GetUnchecked
*/
public inline fun iterate(
buffer: Buffer,
@@ -352,6 +353,34 @@ public object UnsafeBufferOperations {
iterationAction(BufferIterationContextImpl, s, o)
}
}

/**
* Iterates over [buffer] segments starting from the head.
*
* [action] is invoked with an instance of [SegmentReadContext]
* allowing to read and write in an unchecked manner from [buffer]'s segments
*
* It is considered an error to use a [SegmentReadContext] or a [Segment] instances outside the scope of
* the [action].
*
* Both [action] arguments are valid only within [action] scope, it is an error to store and reuse it later.
* The action might never be invoked if the given [buffer] is empty.
*
* @param buffer a buffer to iterate over
* @param action a callback to invoke with the head reference and an iteration context instance
* @sample kotlinx.io.samples.unsafe.UnsafeReadWriteSamplesJvm.messageDigest
* @sample kotlinx.io.samples.unsafe.UnsafeBufferOperationsSamples.crc32Unsafe
*/
public inline fun forEachSegment(
buffer: Buffer,
action: (context: SegmentReadContext, segment: Segment) -> Unit
) {
var curr: Segment? = buffer.head
while (curr != null) {
action(SegmentReadContextImpl, curr)
curr = curr.next
}
}
}

/**
@@ -494,8 +523,7 @@ public interface BufferIterationContext : SegmentReadContext {
*
* @param segment a segment for which a successor needs to be found
*
* @sample kotlinx.io.samples.unsafe.UnsafeReadWriteSamplesJvm.messageDigest
* @sample kotlinx.io.samples.unsafe.UnsafeBufferOperationsSamples.crc32Unsafe
* @sample kotlinx.io.samples.unsafe.UnsafeBufferOperationsSamples.crc32GetUnchecked
*/
public fun next(segment: Segment): Segment?
}
21 changes: 8 additions & 13 deletions core/common/test/samples/unsafe/unsafeSamples.kt
Original file line number Diff line number Diff line change
@@ -267,20 +267,15 @@ class UnsafeBufferOperationsSamples {
fun Buffer.crc32(): UInt {
var crc32 = 0xffffffffU
// Iterate over segments starting from buffer's head
UnsafeBufferOperations.iterate(this) { ctx, head ->
var currentSegment = head
// If a current segment is null, it means we ran out of segments.
while (currentSegment != null) {
// Get data from a segment
ctx.withData(currentSegment) { data, startIndex, endIndex ->
for (idx in startIndex..<endIndex) {
// Update crc32
val index = data[idx].xor(crc32.toByte()).toUByte()
crc32 = crc32Table[index.toInt()].xor(crc32.shr(8))
}
UnsafeBufferOperations.forEachSegment(this) { ctx, segment ->
var currentSegment = segment
// Get data from a segment
ctx.withData(currentSegment) { data, startIndex, endIndex ->
for (idx in startIndex..<endIndex) {
// Update crc32
val index = data[idx].toUInt().xor(crc32).toUByte()
crc32 = crc32Table[index.toInt()].xor(crc32.shr(8))
}
// Advance to the next segment
currentSegment = ctx.next(currentSegment)
}
}
return crc32.xor(0xffffffffU)
73 changes: 73 additions & 0 deletions core/common/test/unsafe/UnsafeBufferOperationsForEachTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors.
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file.
*/
@file:OptIn(UnsafeIoApi::class)

package kotlinx.io.unsafe

import kotlinx.io.Buffer
import kotlinx.io.UnsafeIoApi
import kotlinx.io.assertArrayEquals
import kotlinx.io.writeString
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
import kotlin.test.fail

class UnsafeBufferOperationsForEachTest {

@Test
fun emptyBuffer() {
UnsafeBufferOperations.forEachSegment(Buffer()) { _, head ->
fail()
}
}

@Test
fun singleSegment() {
var counter = 0
UnsafeBufferOperations.forEachSegment(Buffer().also { it.writeByte(1) }) { ctx, segment ->
++counter
assertEquals(1, segment.size)
}
assertEquals(1, counter)
}

@Test
fun multipleSegments() {
val buffer = Buffer()

val expectedSegments = 10
for (i in 0 ..< expectedSegments) {
UnsafeBufferOperations.moveToTail(buffer, byteArrayOf(i.toByte()))
}

val storedBytes = ByteArray(expectedSegments)
var idx = 0
UnsafeBufferOperations.forEachSegment(buffer) { ctx, segment ->
assertTrue(idx < expectedSegments)
storedBytes[idx++] = ctx.getUnchecked(segment, 0)
}

assertArrayEquals(ByteArray(expectedSegments) { it.toByte() }, storedBytes)
}

@Test
fun acquireDataDuringIteration() {
val buffer = Buffer().also { it.writeString("hello buffer") }

val expectedSize = buffer.size

UnsafeBufferOperations.forEachSegment(buffer) { ctx, segment ->
ctx.withData(segment) { data, startIndex, endIndex ->
assertEquals("hello buffer", data.decodeToString(startIndex, endIndex))
}
}

assertEquals(expectedSize, buffer.size)
}
}
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@
* Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors.
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file.
*/
@file:OptIn(UnsafeIoApi::class)

package kotlinx.io.unsafe

@@ -11,8 +12,6 @@ import kotlinx.io.assertArrayEquals
import kotlinx.io.writeString
import kotlin.test.*

@OptIn(UnsafeIoApi::class)

class UnsafeBufferOperationsIterationTest {
@Test
fun callsInPlaceContract() {
14 changes: 5 additions & 9 deletions core/jvm/test/samples/unsafeAccessSamplesJvm.kt
Original file line number Diff line number Diff line change
@@ -108,17 +108,13 @@ class UnsafeReadWriteSamplesJvm {
fun Buffer.digest(algorithm: String): ByteString {
val md = MessageDigest.getInstance(algorithm)
// iterate over all segment and update data
UnsafeBufferOperations.iterate(this) { ctx, head ->
var segment = head
UnsafeBufferOperations.forEachSegment(this) { ctx, segment ->
// when segment is null, we reached the end of a buffer
while (segment != null) {
// access segment data without copying it
ctx.withData(segment) { data, startIndex, endIndex ->
md.update(data, startIndex, endIndex - startIndex)
}
// advance to the next segment
segment = ctx.next(segment)
// access segment data without copying it
ctx.withData(segment) { data, startIndex, endIndex ->
md.update(data, startIndex, endIndex - startIndex)
}
// advance to the next segment
}
return UnsafeByteStringOperations.wrapUnsafe(md.digest())
}