forked from IBM/sarama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient_test.go
121 lines (97 loc) · 2.53 KB
/
client_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package sarama
import (
"testing"
)
// TestDefaultClientConfigValidates checks that the configuration returned by
// NewClientConfig is accepted by its own Validate method without modification.
func TestDefaultClientConfigValidates(t *testing.T) {
	defaults := NewClientConfig()

	validationErr := defaults.Validate()
	if validationErr != nil {
		t.Error(validationErr)
	}
}
// TestSimpleClient builds a client from a single mock broker that has been
// handed a zero-value MetadataResponse to return, and fails the test if
// NewClient reports an error.
func TestSimpleClient(t *testing.T) {
	seed := NewMockBroker(t, 1)
	seed.Returns(new(MetadataResponse))

	addrs := []string{seed.Addr()}
	client, err := NewClient("client_id", addrs, nil)
	if err != nil {
		t.Fatal(err)
	}

	// Deferred calls run last-in first-out: the mock broker is closed first,
	// then the client.
	defer client.Close()
	defer seed.Close()
}
// TestClientExtraBrokers builds a client from a seed mock broker whose
// metadata response lists a second mock broker, and fails the test if
// NewClient reports an error.
func TestClientExtraBrokers(t *testing.T) {
	seed := NewMockBroker(t, 1)
	extra := NewMockBroker(t, 2)

	// The seed broker's metadata advertises the extra broker's address and ID.
	metadata := new(MetadataResponse)
	metadata.AddBroker(extra.Addr(), extra.BrokerID())
	seed.Returns(metadata)

	addrs := []string{seed.Addr()}
	client, err := NewClient("client_id", addrs, nil)
	if err != nil {
		t.Fatal(err)
	}

	// Deferred calls run last-in first-out: extra broker, seed broker, client.
	defer client.Close()
	defer seed.Close()
	defer extra.Close()
}
// TestClientMetadata checks the topic, partition and leader information a
// client reports after being built from a seed mock broker. The seed's
// metadata response lists a second mock broker (ID 5) and assigns it
// partition 0 of "my_topic".
func TestClientMetadata(t *testing.T) {
	seed := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 5)

	metadata := new(MetadataResponse)
	metadata.AddBroker(leader.Addr(), leader.BrokerID())
	metadata.AddTopicPartition("my_topic", 0, leader.BrokerID())
	seed.Returns(metadata)

	addrs := []string{seed.Addr()}
	client, err := NewClient("client_id", addrs, nil)
	if err != nil {
		t.Fatal(err)
	}

	// Deferred calls run last-in first-out: leader broker, seed broker, client.
	defer client.Close()
	defer seed.Close()
	defer leader.Close()

	// Exactly one topic, "my_topic", is expected.
	topics, err := client.Topics()
	switch {
	case err != nil:
		t.Error(err)
	case len(topics) != 1 || topics[0] != "my_topic":
		t.Error("Client returned incorrect topics:", topics)
	}

	// Exactly one partition, number 0, is expected for that topic.
	partitions, err := client.Partitions("my_topic")
	switch {
	case err != nil:
		t.Error(err)
	case len(partitions) != 1 || partitions[0] != 0:
		t.Error("Client returned incorrect partitions for my_topic:", partitions)
	}

	// The leader of partition 0 must be the broker with ID 5. The ID is only
	// inspected when Leader did not report an error.
	got, err := client.Leader("my_topic", 0)
	switch {
	case err != nil:
		t.Error(err)
	case got.ID() != 5:
		t.Error("Leader for my_topic had incorrect ID.")
	}
}
// TestClientRefreshBehaviour checks that a client can resolve a topic that the
// seed broker's metadata does not mention.
//
// The seed mock broker (ID 1) returns metadata that only lists a second mock
// broker (ID 5). That second broker is set up to return metadata assigning
// partition 0xb of "my_topic" to itself. The test then expects Partitions and
// Leader for "my_topic" to reflect the second response — presumably because
// the client refreshes its metadata from broker 5; confirm against the client
// implementation. Finally the leader is handed to client.disconnectBroker.
func TestClientRefreshBehaviour(t *testing.T) {
	seed := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 5)

	// First response: served by the seed broker, lists broker 5 only.
	seedMetadata := new(MetadataResponse)
	seedMetadata.AddBroker(leader.Addr(), leader.BrokerID())
	seed.Returns(seedMetadata)

	// Second response: served by broker 5, carries the topic/partition data.
	topicMetadata := new(MetadataResponse)
	topicMetadata.AddTopicPartition("my_topic", 0xb, leader.BrokerID())
	leader.Returns(topicMetadata)

	client, err := NewClient("clientID", []string{seed.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// Deferred calls run last-in first-out: leader broker, seed broker, client.
	// They also run when the test is aborted with t.Fatal below.
	defer client.Close()
	defer seed.Close()
	defer leader.Close()

	parts, err := client.Partitions("my_topic")
	if err != nil {
		t.Error(err)
	} else if len(parts) != 1 || parts[0] != 0xb {
		t.Error("Client returned incorrect partitions for my_topic:", parts)
	}

	tst, err := client.Leader("my_topic", 0xb)
	if err != nil {
		// The returned broker is not meaningful when Leader reports an error,
		// so stop here instead of passing it on to disconnectBroker.
		t.Fatal(err)
	}
	if tst.ID() != 5 {
		t.Error("Leader for my_topic had incorrect ID.")
	}

	client.disconnectBroker(tst)
}