Skip to content

Commit

Permalink
Implement multiple partions usage in Kafka trigger (#2360)
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Ribeiro De Araujo <[email protected]>
Co-authored-by: Thomas Ribeiro De Araujo <[email protected]>
  • Loading branch information
2 people authored and whynowy committed Jan 17, 2023
1 parent a0ef3c2 commit eeae3da
Show file tree
Hide file tree
Showing 11 changed files with 342 additions and 336 deletions.
5 changes: 2 additions & 3 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -3582,12 +3582,12 @@
"type": "array"
},
"partition": {
"description": "Partition to write data to.",
"description": "DEPRECATED",
"format": "int32",
"type": "integer"
},
"partitioningKey": {
"description": "The partitioning key for the messages put on the Kafka topic. Defaults to broker url.",
"description": "The partitioning key for the messages put on the Kafka topic.",
"type": "string"
},
"payload": {
Expand Down Expand Up @@ -3626,7 +3626,6 @@
"required": [
"url",
"topic",
"partition",
"payload"
],
"type": "object"
Expand Down
5 changes: 2 additions & 3 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions api/sensor.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions api/sensor.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

615 changes: 310 additions & 305 deletions pkg/apis/sensor/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/apis/sensor/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/apis/sensor/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/apis/sensor/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,8 @@ type KafkaTrigger struct {
// Name of the topic.
// More info at https://kafka.apache.org/documentation/#intro_topics
Topic string `json:"topic" protobuf:"bytes,2,opt,name=topic"`
// Partition to write data to.
// +optional
// DEPRECATED
Partition int32 `json:"partition" protobuf:"varint,3,opt,name=partition"`
// Parameters is the list of parameters that is applied to resolved Kafka trigger object.
Parameters []TriggerParameter `json:"parameters,omitempty" protobuf:"bytes,4,rep,name=parameters"`
Expand All @@ -591,9 +592,8 @@ type KafkaTrigger struct {
// Payload is the list of key-value extracted from an event payload to construct the request payload.
Payload []TriggerParameter `json:"payload" protobuf:"bytes,9,rep,name=payload"`
// The partitioning key for the messages put on the Kafka topic.
// Defaults to broker url.
// +optional.
PartitioningKey string `json:"partitioningKey,omitempty" protobuf:"bytes,10,opt,name=partitioningKey"`
PartitioningKey *string `json:"partitioningKey,omitempty" protobuf:"bytes,10,opt,name=partitioningKey"`
// Specify what kafka version is being connected to enables certain features in sarama, defaults to 1.0.0
// +optional
Version string `json:"version,omitempty" protobuf:"bytes,11,opt,name=version"`
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 8 additions & 9 deletions sensors/triggers/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,20 +189,19 @@ func (t *KafkaTrigger) Execute(ctx context.Context, events map[string]*v1alpha1.
return nil, err
}

pk := trigger.PartitioningKey
if pk == "" {
pk = trigger.URL
}

t.Producer.Input() <- &sarama.ProducerMessage{
msg := &sarama.ProducerMessage{
Topic: trigger.Topic,
Key: sarama.StringEncoder(pk),
Value: sarama.ByteEncoder(payload),
Partition: trigger.Partition,
Timestamp: time.Now().UTC(),
}

t.Logger.Infow("successfully produced a message", zap.Any("topic", trigger.Topic), zap.Any("partition", trigger.Partition))
if trigger.PartitioningKey != nil {
msg.Key = sarama.StringEncoder(*trigger.PartitioningKey)
}

t.Producer.Input() <- msg

t.Logger.Infow("successfully produced a message", zap.Any("topic", trigger.Topic))

return nil, nil
}
Expand Down
3 changes: 1 addition & 2 deletions sensors/triggers/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ var sensorObj = &v1alpha1.Sensor{
Kafka: &v1alpha1.KafkaTrigger{
URL: "fake-kafka-url",
Topic: "fake-topic",
Partition: 0,
Parameters: nil,
RequiredAcks: 1,
Compress: false,
Expand All @@ -53,7 +52,7 @@ var sensorObj = &v1alpha1.Sensor{
},
TLS: nil,
Payload: nil,
PartitioningKey: "",
PartitioningKey: nil,
},
},
},
Expand Down

0 comments on commit eeae3da

Please sign in to comment.