Add utilities for encoding/decoding of Deletion Vectors bitmap
This PR is part of the feature: Support reading Delta tables with deletion vectors (more at #1485)

Deletion vectors are stored either in a separate file or inline as part of the `AddFile` struct in the DeltaLog. More details of the format can be found [here](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vector-descriptor-schema).

This PR adds utilities to encode/decode the DV bitmap with the Base85 variant [Z85](https://rfc.zeromq.org/spec/32/), so that the bitmap can be stored in the `AddFile` struct in the DeltaLog.
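For illustration, a minimal sketch of how the new codec is exercised (the method names come from `Codec.scala` in this change; the sample bytes are placeholders, not a real DV bitmap):

import java.util.UUID
import org.apache.spark.sql.delta.util.Codec.Base85Codec

// Placeholder bytes standing in for a serialized deletion vector bitmap.
val bitmapBytes: Array[Byte] = Array[Byte](1, 2, 3, 4, 5)
// Encode for storage in `AddFile`; unaligned input is padded to a multiple of 4 bytes.
val encoded: String = Base85Codec.encodeBytes(bitmapBytes)
// Decode, passing the original length so the padding bytes are dropped again.
val decoded: Array[Byte] = Base85Codec.decodeBytes(encoded, bitmapBytes.length)
// UUIDs always encode to exactly 20 characters.
val encodedId: String = Base85Codec.encodeUUID(UUID.randomUUID())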

Close #1487

GitOrigin-RevId: e12b67abd7498b174cd3942b7c0ae82ffd362cc6
vkorukanti authored and allisonport-db committed Nov 28, 2022
1 parent 6726e57 commit cfa85f0
Showing 2 changed files with 396 additions and 0 deletions.
208 changes: 208 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/util/Codec.scala
@@ -0,0 +1,208 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.util

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.US_ASCII
import java.util.UUID

import com.google.common.primitives.UnsignedInteger

/** Additional codecs not supported by Apache Commons Codec. */
object Codec {

def uuidToBytes(id: UUID): Array[Byte] = uuidToByteBuffer(id).array()

def uuidFromBytes(bytes: Array[Byte]): UUID = {
require(bytes.length == 16)
uuidFromByteBuffer(ByteBuffer.wrap(bytes))
}

def uuidToByteBuffer(id: UUID): ByteBuffer = {
val buffer = ByteBuffer.allocate(16)
buffer.putLong(id.getMostSignificantBits)
buffer.putLong(id.getLeastSignificantBits)
buffer.rewind()
buffer
}

def uuidFromByteBuffer(buffer: ByteBuffer): UUID = {
require(buffer.remaining() >= 16)
val highBits = buffer.getLong
val lowBits = buffer.getLong
new UUID(highBits, lowBits)
}

/**
* This implements Base85 using the 4 byte block aligned encoding and character set from Z85.
*
* @see https://rfc.zeromq.org/spec/32/
*/
object Base85Codec {

final val ENCODE_MAP: Array[Byte] = {
val chars = ('0' to '9') ++ ('a' to 'z') ++ ('A' to 'Z') ++ ".-:+=^!/*?&<>()[]{}@%$#"
chars.map(_.toByte).toArray
}

lazy val DECODE_MAP: Array[Byte] = {
require(ENCODE_MAP.length - 1 <= Byte.MaxValue)
// The bitmask is the same as the largest possible value, so the length of the array must
// be one greater.
val map: Array[Byte] = Array.fill(ASCII_BITMASK + 1)(-1)
for ((b, i) <- ENCODE_MAP.zipWithIndex) {
map(b) = i.toByte
}
map
}
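// For example, DECODE_MAP('0') == 0, DECODE_MAP('a') == 10, and DECODE_MAP('H') == 43;
// every ASCII byte that is not a Z85 character maps to -1.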

final val BASE: Long = 85L
final val BASE_2ND_POWER: Long = 7225L // 85^2
final val BASE_3RD_POWER: Long = 614125L // 85^3
final val BASE_4TH_POWER: Long = 52200625L // 85^4
final val ASCII_BITMASK: Int = 0x7F

// UUIDs always encode into 20 characters.
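// (16 bytes = four 4-byte blocks, each of which encodes into 5 characters.)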
final val ENCODED_UUID_LENGTH: Int = 20

/** Encode a 16 byte UUID. */
def encodeUUID(id: UUID): String = {
val buffer = uuidToByteBuffer(id)
encodeBlocks(buffer)
}

/** Decode a 16 byte UUID. */
def decodeUUID(encoded: String): UUID = {
val buffer = decodeBlocks(encoded)
uuidFromByteBuffer(buffer)
}

/**
* Encode an arbitrary byte array.
*
* Unaligned input will be padded to a multiple of 4 bytes.
*/
def encodeBytes(input: Array[Byte]): String = {
if (input.length % 4 == 0) {
encodeBlocks(ByteBuffer.wrap(input))
} else {
val alignedLength = ((input.length + 4) / 4) * 4
val buffer = ByteBuffer.allocate(alignedLength)
buffer.put(input)
while (buffer.hasRemaining) {
buffer.put(0.asInstanceOf[Byte])
}
buffer.rewind()
encodeBlocks(buffer)
}
}

/**
* Encode an arbitrary byte array using 4 byte blocks.
*
* Expects the input to be 4 byte aligned.
*/
private def encodeBlocks(buffer: ByteBuffer): String = {
require(buffer.remaining() % 4 == 0)
val numBlocks = buffer.remaining() / 4
// Every 4 byte block gets encoded into 5 bytes/chars
val outputLength = numBlocks * 5
val output: Array[Byte] = Array.ofDim(outputLength)
var outputIndex = 0

while (buffer.hasRemaining) {
var sum: Long = buffer.getInt & 0x00000000ffffffffL
output(outputIndex) = ENCODE_MAP((sum / BASE_4TH_POWER).toInt)
sum %= BASE_4TH_POWER
output(outputIndex + 1) = ENCODE_MAP((sum / BASE_3RD_POWER).toInt)
sum %= BASE_3RD_POWER
output(outputIndex + 2) = ENCODE_MAP((sum / BASE_2ND_POWER).toInt)
sum %= BASE_2ND_POWER
output(outputIndex + 3) = ENCODE_MAP((sum / BASE).toInt)
output(outputIndex + 4) = ENCODE_MAP((sum % BASE).toInt)
outputIndex += 5
}

new String(output, US_ASCII)
}
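// Worked example (first block of the Z85 spec test vector): the bytes 0x86 0x4F 0xD2 0x6F
// read as an unsigned 32-bit value give 2253378159. Repeated division by powers of 85 yields
// the digits 43, 14, 21, 21, 24, which ENCODE_MAP turns into the characters "Hello".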

/**
* Decode an arbitrary byte array.
*
* Only `outputLength` bytes will be returned.
* Any extra bytes, such as padding added because the input was unaligned, will be dropped.
*/
def decodeBytes(encoded: String, outputLength: Int): Array[Byte] = {
val result = decodeBlocks(encoded)
if (result.remaining() > outputLength) {
// Only read the expected number of bytes.
val output: Array[Byte] = Array.ofDim(outputLength)
result.get(output)
output
} else {
result.array()
}
}
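// Example of the pairing with encodeBytes: a 5 byte input is padded to 8 bytes and encoded
// into 10 characters; decodeBytes(encoded, 5) then drops the 3 trailing padding bytes so the
// caller gets exactly the original 5 bytes back.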

/**
* Decode an arbitrary byte array.
*
* Output may contain padding bytes, if the input was not 4 byte aligned.
* Use [[decodeBytes]] in that case and specify the expected number of output bytes
* without padding.
*/
def decodeAlignedBytes(encoded: String): Array[Byte] = decodeBlocks(encoded).array()

/**
* Decode an arbitrary byte array.
*
* Output may contain padding bytes, if the input was not 4 byte aligned.
*/
private def decodeBlocks(encoded: String): ByteBuffer = {
val input = encoded.toCharArray
require(input.length % 5 == 0, "Input should be 5 character aligned.")
val buffer = ByteBuffer.allocate(input.length / 5 * 4)

// A mechanism to detect invalid characters in the input while decoding that needs only a
// single conditional at the very end, instead of branching for every character.
var canary: Int = 0
def decodeInputChar(i: Int): Long = {
val c = input(i)
canary |= c // non-ascii char has bits outside of ASCII_BITMASK
val b = DECODE_MAP(c & ASCII_BITMASK)
canary |= b // invalid char maps to -1, which has bits outside ASCII_BITMASK
b.toLong
}

var inputIndex = 0
while (buffer.hasRemaining) {
var sum = 0L
sum += decodeInputChar(inputIndex) * BASE_4TH_POWER
sum += decodeInputChar(inputIndex + 1) * BASE_3RD_POWER
sum += decodeInputChar(inputIndex + 2) * BASE_2ND_POWER
sum += decodeInputChar(inputIndex + 3) * BASE
sum += decodeInputChar(inputIndex + 4)
buffer.putInt(sum.toInt)
inputIndex += 5
}
require((canary & ~ASCII_BITMASK) == 0, s"Input is not valid Z85: $encoded")
buffer.rewind()
buffer
}
}
}
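As a quick cross-check, a small sketch of the UUID round trip; the expected string is taken from the `testUuids` reference values in `CodecSuite` below:

import java.util.UUID
import org.apache.spark.sql.delta.util.Codec.Base85Codec

// new UUID(0L, 1L) encodes to nineteen '0' characters followed by '1'.
assert(Base85Codec.encodeUUID(new UUID(0L, 1L)) == "00000000000000000001")
// Decoding the 20 character string recovers the original UUID.
assert(Base85Codec.decodeUUID("00000000000000000001") == new UUID(0L, 1L))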
188 changes: 188 additions & 0 deletions core/src/test/scala/org/apache/spark/sql/delta/util/CodecSuite.scala
@@ -0,0 +1,188 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.util

import java.nio.charset.StandardCharsets.US_ASCII
import java.util.UUID

import scala.util.Random

import org.apache.spark.SparkFunSuite

class CodecSuite extends SparkFunSuite {

import CodecSuite._

// Z85 reference strings are generated by https://cryptii.com/pipes/z85-encoder
val testUuids = Seq[(UUID, String)](
new UUID(0L, 0L) -> "00000000000000000000",
new UUID(Long.MinValue, Long.MinValue) -> "Fb/MH00000Fb/MH00000",
new UUID(-1L, -1L) -> "%nSc0%nSc0%nSc0%nSc0",
new UUID(0L, Long.MinValue) -> "0000000000Fb/MH00000",
new UUID(0L, -1L) -> "0000000000%nSc0%nSc0",
new UUID(0L, Long.MaxValue) -> "0000000000Fb/MG%nSc0",
new UUID(Long.MinValue, 0L) -> "Fb/MH000000000000000",
new UUID(-1L, 0L) -> "%nSc0%nSc00000000000",
new UUID(Long.MaxValue, 0L) -> "Fb/MG%nSc00000000000",
new UUID(0L, 1L) -> "00000000000000000001",
// Just a few random ones, using literals for test determinism
new UUID(-4124158004264678669L, -6032951921472435211L) -> "-(5oirYA.yTvx6v@H:L>",
new UUID(6453181356142382984L, 8208554093199893996L) -> "s=Mlx-0Pp@AQ6uw@k6=D",
new UUID(6453181356142382984L, -8208554093199893996L) -> "s=Mlx-0Pp@JUL=R13LuL",
new UUID(-4124158004264678669L, 8208554093199893996L) -> "-(5oirYA.yAQ6uw@k6=D")

// From https://rfc.zeromq.org/spec/32/ - Test Case
test("Z85 spec reference value") {
val inputBytes: Array[Byte] =
Array(0x86, 0x4F, 0xD2, 0x6F, 0xB5, 0x59, 0xF7, 0x5B).map(_.toByte)
val expectedEncodedString = "HelloWorld"
val actualEncodedString = Codec.Base85Codec.encodeBytes(inputBytes)
assert(actualEncodedString === expectedEncodedString)
val outputBytes = Codec.Base85Codec.decodeAlignedBytes(actualEncodedString)
assert(outputBytes sameElements inputBytes)
}

test("Z85 reference implementation values") {
for ((id, expectedEncodedString) <- testUuids) {
val actualEncodedString = Codec.Base85Codec.encodeUUID(id)
assert(actualEncodedString === expectedEncodedString)
}
}

test("Z85 spec character map") {
assert(Codec.Base85Codec.ENCODE_MAP.length === 85)
val referenceBytes = Seq(
0x00, 0x09, 0x98, 0x62, 0x0f, 0xc7, 0x99, 0x43, 0x1f, 0x85,
0x9a, 0x24, 0x2f, 0x43, 0x9b, 0x05, 0x3f, 0x01, 0x9b, 0xe6,
0x4e, 0xbf, 0x9c, 0xc7, 0x5e, 0x7d, 0x9d, 0xa8, 0x6e, 0x3b,
0x9e, 0x89, 0x7d, 0xf9, 0x9f, 0x6a, 0x8d, 0xb7, 0xa0, 0x4b,
0x9d, 0x75, 0xa1, 0x2c, 0xad, 0x33, 0xa2, 0x0d, 0xbc, 0xf1,
0xa2, 0xee, 0xcc, 0xaf, 0xa3, 0xcf, 0xdc, 0x6d, 0xa4, 0xb0,
0xec, 0x2b, 0xa5, 0x91, 0xfb, 0xe9, 0xa6, 0x72)
.map(_.toByte).toArray
val referenceString = new String(Codec.Base85Codec.ENCODE_MAP, US_ASCII)
val encodedString = Codec.Base85Codec.encodeBytes(referenceBytes)
assert(encodedString === referenceString)
val decodedBytes = Codec.Base85Codec.decodeAlignedBytes(encodedString)
assert(decodedBytes sameElements referenceBytes)
}

test("Reject illegal Z85 input - unaligned string") {
// Minimum string should be 5 characters.
val illegalEncodedString = "abc"
assertThrows[IllegalArgumentException] {
Codec.Base85Codec.decodeBytes(
illegalEncodedString,
// This value is irrelevant, any value should cause the failure.
outputLength = 3)
}
}

// scalastyle:off nonascii
test(s"Reject illegal Z85 input - illegal character") {
for (char <- Seq[Char]('î', 'π', '"', 0x7F)) {
val illegalEncodedString = String.valueOf(Array[Char]('a', 'b', char, 'd', 'e'))
val ex = intercept[IllegalArgumentException] {
Codec.Base85Codec.decodeAlignedBytes(illegalEncodedString)
}
assert(ex.getMessage.contains("Input is not valid Z85"))
}
}
// scalastyle:on nonascii

test("base85 codec uuid roundtrips") {
for ((id, _) <- testUuids) {
val encodedString = Codec.Base85Codec.encodeUUID(id)
// 16 bytes always get encoded into 20 characters with Base85.
assert(encodedString.length === Codec.Base85Codec.ENCODED_UUID_LENGTH)
val decodedId = Codec.Base85Codec.decodeUUID(encodedString)
assert(id === decodedId, s"encodedString = $encodedString")
}
}

test("base85 codec empty byte array") {
val empty = Array.empty[Byte]
val encodedString = Codec.Base85Codec.encodeBytes(empty)
assert(encodedString === "")
val decodedArray = Codec.Base85Codec.decodeAlignedBytes(encodedString)
assert(decodedArray.isEmpty)
val decodedArray2 = Codec.Base85Codec.decodeBytes(encodedString, 0)
assert(decodedArray2.isEmpty)
}

test("base85 codec byte array random roundtrips") {
val rand = new Random(1L) // Fixed seed for determinism
val arrayLengths = (1 to 20) ++ Seq(32, 56, 64, 128, 1022, 11 * 1024 * 1024)

for (len <- arrayLengths) {
val inputArray: Array[Byte] = Array.ofDim(len)
rand.nextBytes(inputArray)
val encodedString = Codec.Base85Codec.encodeBytes(inputArray)
val decodedArray = Codec.Base85Codec.decodeBytes(encodedString, len)
assert(decodedArray === inputArray, s"encodedString = $encodedString")
}
}

/**
* Execute `thunk` on strings containing every possible base85 character at the
* beginning, middle, and end positions.
*/
private def forAllEncodedStrings(thunk: String => Unit): Unit = {
// Basically test that every possible character can occur at any
// position within a 20 character string.
val characterString = new String(Codec.Base85Codec.ENCODE_MAP, US_ASCII)
// Use this to fill in the remaining 17 characters.
val fillerChar = "x"

var count = 0
for {
firstChar <- characterString
middleChar <- characterString
finalChar <- characterString
} {
val sb = new StringBuilder
sb += firstChar
sb ++= fillerChar * 9
sb += middleChar
sb ++= fillerChar * 8
sb += finalChar
val encodedString = sb.toString()
assert(encodedString.length === 20)
thunk(encodedString)
count += 1
}
assert(count === 85 * 85 * 85)
}

test("base85 character set is JSON-safe") {
forAllEncodedStrings { inputString =>
val inputObject = JsonRoundTripContainer(inputString)
val jsonString = JsonUtils.toJson(inputObject)
assert(jsonString.contains(inputString),
"Some character from the input had to be escaped to be JSON-safe:" +
s"input = '$inputString' vs JSON = '$jsonString'")
val outputObject = JsonUtils.fromJson[JsonRoundTripContainer](jsonString)
val outputString = outputObject.data
assert(inputString === outputString)
}
}

}

object CodecSuite {
final case class JsonRoundTripContainer(data: String)
}
