-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver_async.cc
116 lines (103 loc) · 3.34 KB
/
server_async.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#include <cassert>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>

#include <grpcpp/grpcpp.h>

#include "hello.grpc.pb.h"
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
using simple_service::HelloReply;
using simple_service::HelloRequest;
using simple_service::SimpleService;
// Minimal asynchronous gRPC server exposing SimpleService::SayHello.
//
// run() builds the server, starts it on 0.0.0.0:50051 with insecure
// credentials and then blocks in the completion-queue loop on the calling
// thread. The destructor shuts down the server and then the completion queue.
class ServerAsync {
 public:
  ServerAsync() = default;

  // The instance owns the server and its completion queue; it was never
  // copyable (unique_ptr members), this just makes that explicit.
  ServerAsync(const ServerAsync&) = delete;
  ServerAsync& operator=(const ServerAsync&) = delete;

  ~ServerAsync() {
    // Both pointers are still null if run() was never called, and `server`
    // is null if BuildAndStart() failed, so guard each dereference.
    if (server) {
      server->Shutdown();
    }
    if (cq) {
      // Always shut down the completion queue after the server.
      cq->Shutdown();
      // gRPC requires a shut-down queue to be drained before destruction.
      // Every tag ever enqueued is a heap-allocated CallData with exactly one
      // outstanding operation, so each drained tag is released exactly once.
      void* tag = nullptr;
      bool ok = false;
      while (cq->Next(&tag, &ok)) {
        delete static_cast<CallData*>(tag);
      }
    }
  }

  // Builds and starts the server, then serves requests until the completion
  // queue is shut down. Returns early (after logging to stderr) if the server
  // could not be started.
  void run() {
    const std::string address("0.0.0.0:50051");

    ServerBuilder builder;
    builder.AddListeningPort(address, grpc::InsecureServerCredentials());
    builder.RegisterService(&service);
    cq = builder.AddCompletionQueue();
    server = builder.BuildAndStart();
    if (!server) {
      // BuildAndStart() yields nullptr on failure (e.g. the port is taken).
      std::cerr << "Failed to start server on " << address << std::endl;
      return;
    }
    std::cout << "Server is listening on connections..." << std::endl;

    // Server's main loop
    loop();
  }

 private:
  // State and logic needed to serve a single SayHello call.
  //
  // Instances are self-owning: they are created with `new`, register
  // themselves (by address) as the completion-queue tag, and delete
  // themselves once the FINISH event has been delivered.
  class CallData {
   public:
    // Stores the service and queue to use and immediately registers this
    // instance to receive the next incoming SayHello request.
    CallData(SimpleService::AsyncService* service_, ServerCompletionQueue* cq_)
        : service(service_), cq(cq_), responder(&context) {
      Proceed();
    }

    // Advances the per-call state machine by one step:
    //   CREATE  -> ask gRPC for the next SayHello request
    //   PROCESS -> build the reply and start sending it
    //   FINISH  -> the reply has been handed off; destroy this instance
    void Proceed() {
      if (status == CallStatus::CREATE) {
        // Switch state first: the completion event for this request may be
        // picked up as soon as the request is registered.
        status = CallStatus::PROCESS;
        std::cout << "server RequestSayHello..." << std::endl;
        service->RequestSayHello(&context, &request, &responder, cq, cq, this);
      } else if (status == CallStatus::PROCESS) {
        // Spawn a new CallData instance to serve new clients while we process
        // the one for this CallData. The instance will deallocate itself as
        // part of its FINISH state.
        new CallData(service, cq);

        // The actual processing.
        std::string prefix("Hello ");
        reply.set_message(prefix + request.name());

        // And we are done! Let the gRPC runtime know we've finished, using the
        // memory address of this instance as the uniquely identifying tag for
        // the event.
        status = CallStatus::FINISH;
        std::cout << "server Finish()..." << std::endl;
        responder.Finish(reply, Status::OK, this);
      } else {
        assert(status == CallStatus::FINISH);
        // Nothing may touch members after this line.
        delete this;
      }
    }

   private:
    enum class CallStatus { CREATE, PROCESS, FINISH };

    SimpleService::AsyncService* service;
    ServerCompletionQueue* cq;
    // `context` must be declared before `responder`, which is constructed
    // from its address.
    ServerContext context;
    HelloRequest request;
    HelloReply reply;
    ServerAsyncResponseWriter<HelloReply> responder;
    CallStatus status = CallStatus::CREATE;
  };

  // Server's main loop: pulls events off the completion queue and drives the
  // CallData they belong to. Returns once the queue has been shut down and
  // fully drained.
  void loop() {
    // Spawn a new CallData instance to serve new clients.
    new CallData(&service, cq.get());

    void* tag = nullptr;  // uniquely identifies a request.
    bool ok = false;
    while (true) {
      // Block waiting to read the next event from the completion queue. The
      // event is uniquely identified by its tag, which in this case is the
      // memory address of a CallData instance.
      std::cout << "next event..." << std::endl;
      if (!cq->Next(&tag, &ok)) {
        // Queue is shut down and empty; `tag` is not valid here.
        break;
      }
      std::cout << "after next call" << std::endl;

      CallData* call = static_cast<CallData*>(tag);
      if (!ok) {
        // The operation did not complete successfully (for example the
        // server is shutting down), so there is nothing to process; just
        // release the call instead of aborting or reading an unset request.
        delete call;
        continue;
      }
      std::cout << "proceding..." << std::endl;
      call->Proceed();
    }
  }

  std::unique_ptr<Server> server;
  std::unique_ptr<ServerCompletionQueue> cq;
  SimpleService::AsyncService service;
};
int main(int argc, char** argv) {
ServerAsync server;
server.run();
return std::cout.good() ? EXIT_SUCCESS : EXIT_FAILURE;
}