-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnatsdedup_test.go
60 lines (49 loc) · 1.47 KB
/
natsdedup_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package natsdedup_test
import (
	"errors"
	"testing"
	"time"

	"github.com/claudiunicolaa/natsdedup"
	"github.com/nats-io/nats-server/v2/test"
	"github.com/nats-io/nats.go"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)
// TestDeduplicator publishes the same payload several times on the input
// subject within the deduplication TTL and checks that exactly one copy is
// forwarded to the output subject.
//
// The test starts a default in-process NATS test server, so it needs no
// external broker.
func TestDeduplicator(t *testing.T) {
	const (
		inputSubject     = "test.source"
		outputSubject    = "test.destination"
		deduplicationTTL = 100 * time.Millisecond
		// receiveTimeout is how long to wait for each forwarded message
		// before concluding that nothing more will arrive.
		receiveTimeout = 200 * time.Millisecond
		numMessages    = 5
	)

	// Start a NATS server for testing.
	natsServer := test.RunDefaultServer()
	defer natsServer.Shutdown()

	// Connect to the test NATS server.
	nc, err := nats.Connect(natsServer.ClientURL())
	require.NoError(t, err)
	defer nc.Close()

	// Wire the deduplicator between the input and output subjects.
	deduplicator := natsdedup.NewDeduplicator(inputSubject, outputSubject, deduplicationTTL)
	require.NoError(t, deduplicator.Run(nc))

	// Subscribe to the output before publishing so no forwarded message is missed.
	outputSub, err := nc.SubscribeSync(outputSubject)
	require.NoError(t, err)

	// Send the same payload numMessages times in quick succession.
	duplicateMessage := []byte("duplicate message")
	for i := 0; i < numMessages; i++ {
		require.NoError(t, nc.Publish(inputSubject, duplicateMessage))
	}
	// Push the buffered publishes to the server before the timed reads start,
	// so the receive timeout is not spent waiting on client-side buffering.
	require.NoError(t, nc.Flush())

	// Drain the output subject until a read times out.
	numForwarded := 0
	for {
		msg, err := outputSub.NextMsg(receiveTimeout)
		if errors.Is(err, nats.ErrTimeout) {
			// No further messages arrived within the timeout.
			break
		}
		// Any other error is unexpected and aborts the test before msg is used.
		require.NoError(t, err)

		assert.Equal(t, duplicateMessage, msg.Data)
		numForwarded++
	}

	// Expect only one message to be forwarded due to deduplication.
	assert.Equal(t, 1, numForwarded, "expected exactly one forwarded message out of %d duplicates", numMessages)
}