-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_broker_fault_tolerance.py
131 lines (109 loc) · 4.84 KB
/
test_broker_fault_tolerance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import requests
import time
broker1_url = "http://localhost:8000"
broker2_url = "http://localhost:8001"
broker3_url = "http://localhost:8002"
def assertEqual(a, b):
assert a == b
if __name__ == '__main__':
# Create topic partition
response = requests.post(f"{broker1_url}/new", json={
"topic": "fault-tolerance",
"partition": 1,
"partners": ["raft-broker2:9000","raft-broker1:9000","raft-broker3:9000"]
})
assertEqual(response.status_code, 200)
print("Created topic 'fault-tolerance' with partition '1' on broker 1")
# Create topic partition
response = requests.post(f"{broker2_url}/new", json={
"topic": "fault-tolerance",
"partition": 1,
"partners": ["raft-broker2:9000","raft-broker1:9000","raft-broker3:9000"]
})
assertEqual(response.status_code, 200)
print("Created topic 'fault-tolerance' with partition '1' on broker 2")
# Create topic partition
response = requests.post(f"{broker3_url}/new", json={
"topic": "fault-tolerance",
"partition": 1,
"partners": ["raft-broker2:9000","raft-broker1:9000","raft-broker3:9000"]
})
assertEqual(response.status_code, 200)
print("Created topic 'fault-tolerance' with partition '1' on broker 3")
time.sleep(1)
# Create a message on broker 1
response = requests.post(f"{broker1_url}/messages", json={
"topic": "fault-tolerance",
"partition": 1,
"content": "fault-message",
})
assertEqual(response.status_code, 201)
print("Created message 'fault-message' on broker 1")
print("Pausing broker 1")
response = requests.get(f"{broker1_url}/pause")
assertEqual(response.status_code, 200)
# Test is broker 1 is paused - 503
response = requests.get(f"{broker1_url}/ping")
assertEqual(response.status_code, 503)
print("Broker 1 is paused")
# offset = response.json()["id"]
# Check if the message exists on broker 2
response = requests.get(f"{broker2_url}/messages",params={
"topic": "fault-tolerance",
"partition": 1,
"offset": 0,
})
assertEqual(response.status_code, 200)
assertEqual(response.json()["content"], "fault-message")
print("Got message {} from broker 2".format(response.json()["content"]))
# Check if the message exists on broker 3
response = requests.get(f"{broker3_url}/messages",params={
"topic": "fault-tolerance",
"partition": 1,
"offset": 0,
})
assertEqual(response.status_code, 200)
assertEqual(response.json()["content"], "fault-message")
print("Got message {} from broker 3".format(response.json()["content"]))
# Add a new message to broker 2
response = requests.post(f"{broker2_url}/messages", json={
"topic": "fault-tolerance",
"partition": 1,
"content": "fault-message-2",
})
assertEqual(response.status_code, 201)
print("Created message 'fault-message-2' on broker 2")
print("Getting broker 1 back up")
# Get broker1 back up
response = requests.get(f"{broker1_url}/pause")
assertEqual(response.status_code, 200)
# Check if broker 1 is up
response = requests.get(f"{broker1_url}/ping")
assertEqual(response.status_code, 200)
print("Broker 1 is back up")
# Check if the message exists on broker 1
response = requests.get(f"{broker1_url}/messages",params={
"topic": "fault-tolerance",
"partition": 1,
"offset": 0,
})
assertEqual(response.status_code, 200)
assertEqual(response.json()["content"], "fault-message")
print("Got message {} from broker 1".format(response.json()["content"]))
# Get length of queue from broker 1
response = requests.get(f"{broker1_url}/messages/count",params={
"topic": "fault-tolerance",
"partition": 1,
})
assertEqual(response.status_code, 200)
assertEqual(response.json(), 2)
print("Got message count {} from broker 1".format(response.json()))
# Check if the message exists on broker 1
response = requests.get(f"{broker1_url}/messages",params={
"topic": "fault-tolerance",
"partition": 1,
"offset": 1,
})
assertEqual(response.status_code, 200)
print("Got message {} from broker 1".format(response.json()["content"]))
assertEqual(response.json()["content"], "fault-message-2")