-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
Copy patheph.go
68 lines (61 loc) · 2.4 KB
/
eph.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package azureeventhub
import (
"context"
"fmt"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-event-hubs-go/v3/eph"
"github.com/Azure/azure-event-hubs-go/v3/storage"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/azure"
)
// runWithEPH will consume ingested events using the Event Processor Host (EPH) https://github.com/Azure/azure-event-hubs-go#event-processor-host, https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-event-processor-host
func (a *azureInput) runWithEPH() error {
// create a new Azure Storage Leaser / Checkpointer
cred, err := azblob.NewSharedKeyCredential(a.config.SAName, a.config.SAKey)
if err != nil {
return err
}
leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, a.config.SAName, a.config.SAContainer, azure.PublicCloud)
if err != nil {
return err
}
// adding a nil EventProcessorHostOption will break the code, this is why a condition is added and a.processor is assigned
if a.config.ConsumerGroup != "" {
a.processor, err = eph.NewFromConnectionString(
a.workerCtx,
fmt.Sprintf("%s%s%s", a.config.ConnectionString, eventHubConnector, a.config.EventHubName),
leaserCheckpointer,
leaserCheckpointer,
eph.WithConsumerGroup(a.config.ConsumerGroup))
} else {
a.processor, err = eph.NewFromConnectionString(
a.workerCtx,
fmt.Sprintf("%s%s%s", a.config.ConnectionString, eventHubConnector, a.config.EventHubName),
leaserCheckpointer,
leaserCheckpointer)
}
if err != nil {
return err
}
// register a message handler -- many can be registered
handlerID, err := a.processor.RegisterHandler(a.workerCtx,
func(c context.Context, e *eventhub.Event) error {
// partitionID is not yet mapped in the azure-eventhub sdk
return a.processEvents(e, "")
})
if err != nil {
return err
}
a.log.Infof("handler id: %q is running\n", handlerID)
// unregister a handler to stop that handler from receiving events
// processor.UnregisterHandler(ctx, handleID)
// start handling messages from all of the partitions balancing across multiple consumers
err = a.processor.Start(a.workerCtx)
if err != nil {
return err
}
return nil
}