diff --git a/schemaregistry/internal/rest_service.go b/schemaregistry/internal/rest_service.go index 87f98393c..a1ba70537 100644 --- a/schemaregistry/internal/rest_service.go +++ b/schemaregistry/internal/rest_service.go @@ -69,8 +69,8 @@ const ( Keks = "/dek-registry/v1/keks" KekByName = Keks + "/%s?deleted=%t" Deks = Keks + "/%s/deks" - DeksBySubject = Deks + "/%s?deleted=%t" - DeksByVersion = Deks + "/%s/versions/%v?deleted=%t" + DeksBySubject = Deks + "/%s?algorithm=%s&deleted=%t" + DeksByVersion = Deks + "/%s/versions/%v?algorithm=%s&deleted=%t" TargetSRClusterKey = "Target-Sr-Cluster" TargetIdentityPoolIDKey = "Confluent-Identity-Pool-Id" diff --git a/schemaregistry/rules/encryption/deks/dekregistry_client.go b/schemaregistry/rules/encryption/deks/dekregistry_client.go index 8be80969c..0cd0d13e6 100644 --- a/schemaregistry/rules/encryption/deks/dekregistry_client.go +++ b/schemaregistry/rules/encryption/deks/dekregistry_client.go @@ -349,7 +349,7 @@ func (c *client) GetDek(kekName string, subject string, algorithm string, delete // another goroutine could have already put it in cache cacheValue, ok = c.dekCache.Get(cacheKey) if !ok { - err = c.restService.HandleRequest(internal.NewRequest("GET", internal.DeksBySubject, nil, url.QueryEscape(kekName), url.QueryEscape(subject), deleted), &dek) + err = c.restService.HandleRequest(internal.NewRequest("GET", internal.DeksBySubject, nil, url.QueryEscape(kekName), url.QueryEscape(subject), algorithm, deleted), &dek) if err == nil { c.dekCache.Put(cacheKey, &dek) } @@ -423,7 +423,7 @@ func (c *client) GetDekVersion(kekName string, subject string, version int, algo // another goroutine could have already put it in cache cacheValue, ok = c.dekCache.Get(cacheKey) if !ok { - err = c.restService.HandleRequest(internal.NewRequest("GET", internal.DeksByVersion, nil, url.QueryEscape(kekName), url.QueryEscape(subject), version, deleted), &dek) + err = c.restService.HandleRequest(internal.NewRequest("GET", internal.DeksByVersion, nil, url.QueryEscape(kekName), url.QueryEscape(subject), version, algorithm, deleted), &dek) if err == nil { c.dekCache.Put(cacheKey, &dek) } diff --git a/schemaregistry/serde/avrov2/avro_test.go b/schemaregistry/serde/avrov2/avro_test.go index ab5b1363f..e4cf92602 100644 --- a/schemaregistry/serde/avrov2/avro_test.go +++ b/schemaregistry/serde/avrov2/avro_test.go @@ -1351,6 +1351,81 @@ func TestAvroSerdeEncryption(t *testing.T) { serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) } +func TestAvroSerdeEncryptionDeterministic(t *testing.T) { + serde.MaybeFail = serde.InitFailFunc(t) + var err error + + conf := schemaregistry.NewConfig("mock://") + + client, err := schemaregistry.NewClient(conf) + serde.MaybeFail("Schema Registry configuration", err) + + serConfig := NewSerializerConfig() + serConfig.AutoRegisterSchemas = false + serConfig.UseLatestVersion = true + serConfig.RuleConfig = map[string]string{ + "secret": "mysecret", + } + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) + serde.MaybeFail("Serializer configuration", err) + + encRule := schemaregistry.Rule{ + Name: "test-encrypt", + Kind: "TRANSFORM", + Mode: "WRITEREAD", + Type: "ENCRYPT", + Tags: []string{"PII"}, + Params: map[string]string{ + "encrypt.kek.name": "kek1", + "encrypt.kms.type": "local-kms", + "encrypt.kms.key.id": "mykey", + "encrypt.dek.algorithm": "AES256_SIV", + }, + OnFailure: "ERROR,NONE", + } + ruleSet := schemaregistry.RuleSet{ + DomainRules: []schemaregistry.Rule{encRule}, + } + + info := schemaregistry.SchemaInfo{ + Schema: demoSchema, + SchemaType: "AVRO", + RuleSet: &ruleSet, + } + + id, err := client.Register("topic1-value", info, false) + serde.MaybeFail("Schema registration", err) + if id <= 0 { + t.Errorf("Expected valid schema id, found %d", id) + } + + obj := DemoSchema{} + obj.IntField = 123 + obj.DoubleField = 45.67 + obj.StringField = "hi" + obj.BoolField = true + obj.BytesField = []byte{1, 2} + + bytes, err := ser.Serialize("topic1", &obj) + serde.MaybeFail("serialization", err) + + // Reset encrypted field + obj.StringField = "hi" + obj.BytesField = []byte{1, 2} + + deserConfig := NewDeserializerConfig() + deserConfig.RuleConfig = map[string]string{ + "secret": "mysecret", + } + deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + serde.MaybeFail("Deserializer configuration", err) + deser.Client = ser.Client + deser.MessageFactory = testMessageFactory + + newobj, err := deser.Deserialize("topic1", bytes) + serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) +} + func TestAvroSerdeEncryptionWithSimpleMap(t *testing.T) { serde.MaybeFail = serde.InitFailFunc(t) var err error