-
Notifications
You must be signed in to change notification settings - Fork 137
/
Copy pathqueues.py
107 lines (79 loc) · 2.75 KB
/
queues.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""
A full example of creating a queue with all options.
"""
import os
import grpc
from armada_client.client import ArmadaClient
from armada_client.permissions import Permissions, Subject
def create_queue_request(client, queue):
    """
    Builds a queue request that sets every option used in this example.

    The request is produced by ``client.create_queue_request`` and carries:
    a priority factor, one user owner, one group owner, cpu/memory resource
    limits, and a single permission entry that lets the group "group1"
    use the "cancel" and "reprioritize" verbs.

    :param client: The client whose ``create_queue_request`` is called.
    :param queue: The name given to the queue.
    :return: Whatever ``client.create_queue_request`` returns.
    """

    group_permissions = Permissions(
        subjects=[Subject(kind="Group", name="group1")],
        verbs=["cancel", "reprioritize"],
    )

    return client.create_queue_request(
        name=queue,
        priority_factor=3.0,
        user_owners=["user1"],
        group_owners=["group1"],
        resource_limits={"cpu": 1.0, "memory": 1.0},
        permissions=[group_permissions],
    )
def creating_full_queue_example(client, queue):
    """
    Creates a queue, falling back to an update when it already exists.

    :param client: The client used to create (or update) the queue.
    :param queue: The name of the queue.
    :raises grpc.RpcError: Re-raised for any RPC failure whose status code
        is not ALREADY_EXISTS.
    """

    request = create_queue_request(client, queue)

    try:
        client.create_queue(request)
    except grpc.RpcError as rpc_error:
        # Anything other than "already exists" is unexpected: pass it on.
        if rpc_error.code() != grpc.StatusCode.ALREADY_EXISTS:
            raise rpc_error

        # The queue is already there, so push the same settings as an update.
        print(f"Queue {queue} already exists")
        client.update_queue(request)
def creating_multiple_queues_example(client, queue):
    """
    Creates two queues in one batch call and reports any that failed.

    The queues are named ``queue`` and ``queue + "2"``. Queues listed in the
    response's ``failed_queues`` are only printed, together with their error;
    no update or retry is attempted here.

    :param client: The client used to submit the batch.
    :param queue: The base name for the two queues.
    """

    requests = [
        create_queue_request(client, queue),
        create_queue_request(client, queue + "2"),
    ]
    response = client.create_queues(requests)

    # An empty/falsy failed_queues means there is nothing to report.
    for failure in response.failed_queues or ():
        print(f"Failed to create {failure.queue.name}: {failure.error}")
def workflow():
    """
    Starts a workflow, which includes:
    - Creating a queue
    - Creating a queue with a batch of queues

    Reads the module-level names DISABLE_SSL, HOST and PORT. In this file
    they are only assigned inside the ``if __name__ == "__main__"`` block,
    so they must be defined before this function is called.
    """

    # The queue name that both examples operate on
    queue = "test-queues"

    target = f"{HOST}:{PORT}"

    # Ensures that the correct channel type is generated
    if DISABLE_SSL:
        channel = grpc.insecure_channel(target)
    else:
        channel_credentials = grpc.ssl_channel_credentials()
        channel = grpc.secure_channel(target, channel_credentials)

    # Release the channel once the examples finish, even if one of them raises
    try:
        client = ArmadaClient(channel)

        creating_full_queue_example(client, queue)
        creating_multiple_queues_example(client, queue)
    finally:
        channel.close()
if __name__ == "__main__":
    # Note that the form of ARMADA_SERVER should be something like
    # domain.com, localhost, or 0.0.0.0

    # Environment variables are always strings, so a plain truthiness check
    # would treat DISABLE_SSL=false or DISABLE_SSL=0 as "disable SSL".
    # Parse the flag explicitly: unset, empty, or an explicit negative keeps
    # SSL enabled; any other value disables it.
    DISABLE_SSL = os.environ.get("DISABLE_SSL", "").strip().lower() not in {
        "",
        "0",
        "false",
        "no",
        "off",
    }
    HOST = os.environ.get("ARMADA_SERVER", "localhost")
    PORT = os.environ.get("ARMADA_PORT", "50051")

    workflow()
    print("Completed Workflow")