-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathzmq.go
586 lines (471 loc) · 14.6 KB
/
zmq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
package zmq
// import "fmt"
import . "bytes"
import "io"
import "os"
import "unsafe"
import "strconv"
import . "unsafe/coffer"
import . "gonewrong"
// #include <u.h>
// #include <zmq.h>
// #include <stdlib.h>
// #include "coffer_cb.h"
import "C"
// ******** Global ZMQ Constants *********************************************
// Go names for the libzmq constants pulled in from <zmq.h>.
//
// ZmqIdentitiy is a historical misspelling that is kept so existing callers
// continue to compile; new code should use ZmqIdentity.
const (
	// Socket types (argument to Context.NewSocket).
	ZmqPair       = C.ZMQ_PAIR
	ZmqPub        = C.ZMQ_PUB
	ZmqSub        = C.ZMQ_SUB
	ZmqReq        = C.ZMQ_REQ
	ZmqRep        = C.ZMQ_REP
	ZmqXReq       = C.ZMQ_XREQ
	ZmqXRep       = C.ZMQ_XREP
	ZmqUpstream   = C.ZMQ_PULL
	ZmqDownstream = C.ZMQ_PUSH

	// Socket options (argument to the Socket.Set*SockOpt methods).
	ZmqHWM         = C.ZMQ_HWM
	ZmqSwap        = C.ZMQ_SWAP
	ZmqAffinity    = C.ZMQ_AFFINITY
	ZmqIdentitiy   = C.ZMQ_IDENTITY // misspelled; retained for compatibility
	ZmqIdentity    = C.ZMQ_IDENTITY
	ZmqSubscribe   = C.ZMQ_SUBSCRIBE
	ZmqUnsubscribe = C.ZMQ_UNSUBSCRIBE
	ZmqRate        = C.ZMQ_RATE
	ZmqRecoveryIVL = C.ZMQ_RECOVERY_IVL
	ZmqMCastLoop   = C.ZMQ_MCAST_LOOP
	ZmqSndBuf      = C.ZMQ_SNDBUF
	ZmqRcvBuf      = C.ZMQ_RCVBUF
	ZmqRcvMore     = C.ZMQ_RCVMORE

	// Send/receive flags.
	ZmqNoBlock = C.ZMQ_NOBLOCK
	ZmqSndMore = C.ZMQ_SNDMORE

	// Message-related constants.
	ZmqDelimiter  = C.ZMQ_DELIMITER
	ZmqVSM        = C.ZMQ_VSM
	ZmqMsgMore    = C.ZMQ_MSG_MORE
	ZmqMsgShared  = C.ZMQ_MSG_SHARED
	ZmqMaxVSMSize = C.ZMQ_MAX_VSM_SIZE

	// Error numbers.
	ZmqENOTSUP         = C.ENOTSUP
	ZmqEPROTONOSUPPORT = C.EPROTONOSUPPORT
	ZmqENOBUFS         = C.ENOBUFS
	ZmqENETDOWN        = C.ENETDOWN
	ZmqEADDRINUSE      = C.EADDRINUSE
	ZmqEADDRNOTAVAIL   = C.EADDRNOTAVAIL
	ZmqECONNREFUSED    = C.ECONNREFUSED
	ZmqEINPROGRESS     = C.EINPROGRESS

	// Device types.
	ZmqStreamer  = C.ZMQ_STREAMER
	ZmqForwarder = C.ZMQ_FORWARDER
	ZmqQueue     = C.ZMQ_QUEUE

	// Poll event bits.
	ZmqPollIn  = C.ZMQ_POLLIN
	ZmqPollOut = C.ZMQ_POLLOUT
	ZmqPollErr = C.ZMQ_POLLERR
)
// ZmqEMTHREAD returns the C constant EMTHREAD converted to a Go int.
func ZmqEMTHREAD() int {
	code := C.EMTHREAD
	return int(code)
}
// ZmqEFSM returns the C constant EFSM converted to a Go int.
func ZmqEFSM() int {
	code := C.EFSM
	return int(code)
}
// ZmqENOCOMPATPROTO returns the C constant ENOCOMPATPROTO converted to a Go int.
func ZmqENOCOMPATPROTO() int {
	code := C.ENOCOMPATPROTO
	return int(code)
}
// ZmqETERM returns the C constant ETERM converted to a Go int.
func ZmqETERM() int {
	code := C.ETERM
	return int(code)
}
// ******** ZMQ Interfaces ***************************************************
// Provider is the entry point to a ZMQ implementation: it creates contexts
// and messages and reports the library version.
type Provider interface {
// ErrKnow is embedded from the dot-imported gonewrong package; the
// GetError and OkIf calls made on Provider values in this file resolve
// through it.
ErrKnow
// NewContext creates a context configured by initArgs.
NewContext(initArgs InitArgs) (Context, os.Error)
// NewMessage returns a new message belonging to this provider.
NewMessage() Message
// Version reports the library version as major, minor and patch level.
Version() (major int, minor int, pl int)
}
// Provided is implemented by objects that can name the Provider they belong
// to; Context, Message and Socket all embed it.
type Provided interface {
// Provider returns the provider this object belongs to.
Provider() Provider
}
// InitArgs holds the arguments passed to Provider.NewContext.
type InitArgs struct {
// IoThreads is the value the libzmq provider hands to zmq_init.
IoThreads int
}
// DefaultInitArgs returns the default context arguments: a single I/O
// thread, as suggested by the zmq documentation at the time of writing.
func DefaultInitArgs() InitArgs {
	var args InitArgs
	args.IoThreads = 1
	return args
}
// EnvGOMAXPROCS reads the GOMAXPROCS environment variable.
//
// It returns the variable's integer value when it parses and is greater
// than 1; in every other case (unset, unparsable, or <= 1) it returns 1.
func EnvGOMAXPROCS() int {
	procs, err := strconv.Atoi(os.Getenv("GOMAXPROCS"))
	if err != nil || procs <= 1 {
		return 1
	}
	return procs
}
// Context interface
//
// Contexts are always global thread-safe objects (original author's note;
// this file adds no locking of its own).
type Context interface {
// Close releases the context; the libzmq implementation forwards to Terminate.
io.Closer
Provided
// NewSocket creates a socket of the given type (one of the Zmq* socket
// type constants) inside this context.
NewSocket(socketType int) (Socket, os.Error)
// ProcPollItem dispatches a poll item to fdFun or socketFun, depending on
// whether the item refers to a file descriptor or a socket.
ProcPollItem(pi *PollItem, fdFun ProcFdFun, socketFun ProcSocketFun) os.Error
// Poll polls the given items and returns the count reported by the
// underlying poll call.
// NOTE(review): timeout is passed through unchanged; its unit is defined by
// the underlying library - confirm against the zmq_poll docs for the linked version.
Poll(items []PollItem, timeout int) (int, os.Error)
// Terminate destroys the context.
Terminate() os.Error
}
// PollItem is the Go view of libzmq's zmq_pollitem_t; fill it with
// SetFdPollItem or Socket.SetPollItem.
type PollItem C.zmq_pollitem_t
// ProcFdFun is the callback ProcPollItem invokes for a poll item that refers
// to a file descriptor (socket pointer is NULL).
type ProcFdFun func(fd int, events int8, revents int8)
// ProcSocketFun is the callback ProcPollItem invokes for a poll item that
// refers to a zmq socket.
type ProcSocketFun func(socket Socket, events int8, revents int8)
// SetFdPollItem configures pi to watch the file descriptor fd for the given
// events. The item's socket pointer is cleared so that ProcPollItem treats
// it as a file-descriptor item. pi must not be nil.
func SetFdPollItem(pi *PollItem, fd int, events int8) {
	// A NULL socket marks the item as referring to a plain descriptor.
	pi.socket = unsafe.Pointer(uintptr(0))
	pi.events = C.short(events)
	pi.fd = C.int(fd)
}
// SetPollItem configures pi to watch this socket.
//
// The mask passed in revents (name kept for interface compatibility) is the
// set of events the caller wants to be notified about. It is stored in the
// item's events field, which is the field zmq_poll reads; the revents field
// is output-only and is reset to zero here. Previously the mask was written
// to revents and events was never set, so socket items were never polled
// for anything.
//
// Returns os.EINVAL when pi is nil.
func (p lzmqSocket) SetPollItem(pi *PollItem, revents int8) os.Error {
	if pi == nil {
		return os.EINVAL
	}
	pi.socket = unsafe.Pointer(uintptr(p))
	pi.fd = 0
	pi.events = C.short(revents)
	pi.revents = 0
	return nil
}
// Poll hands items to zmq_poll with the given timeout and returns the
// non-negative count zmq_poll reports. When zmq_poll returns a negative
// value, Poll returns 0 and the provider's current error.
//
// An empty (or nil) items slice is passed to zmq_poll as a NULL array with
// a length of zero instead of indexing items[0], which would panic.
func (p lzmqContext) Poll(items []PollItem, timeout int) (int, os.Error) {
	var first *C.zmq_pollitem_t
	if len(items) > 0 {
		first = (*C.zmq_pollitem_t)(unsafe.Pointer(&items[0]))
	}
	ret := int(C.zmq_poll(first, C.int(len(items)), C.long(timeout)))
	if ret < 0 {
		return 0, p.Provider().GetError()
	}
	return ret, nil
}
// ProcPollItem dispatches a poll item to the matching callback.
//
// Items whose socket pointer is NULL are treated as file-descriptor items
// and go to fdFun; all others go to socketFun, wrapped as an lzmqSocket.
// Returns os.EINVAL when pi is nil or when the callback needed for the
// item's kind is nil; otherwise returns nil after invoking the callback.
func (p lzmqContext) ProcPollItem(pi *PollItem, fdFun ProcFdFun, socketFun ProcSocketFun) os.Error {
	if pi == nil {
		return os.EINVAL
	}
	sockAddr := uintptr(pi.socket)
	evts, revts := int8(pi.events), int8(pi.revents)

	switch {
	case IsCNullPtr(sockAddr):
		if fdFun == nil {
			return os.EINVAL
		}
		fdFun(int(pi.fd), evts, revts)
	default:
		if socketFun == nil {
			return os.EINVAL
		}
		socketFun(lzmqSocket(sockAddr), evts, revts)
	}
	return nil
}
// Message interface
//
// Messages may only be used reliably with sockets from the same provider
type Message interface {
// Close releases the message's resources.
io.Closer
Provided
// WriteTo appends the message payload to buf and returns the byte count.
WriteTo(buf *Buffer) (n int, err os.Error)
// ReadFrom fills the message with the contents of buf and returns the byte count.
ReadFrom(buf *Buffer) (n int, err os.Error)
// GetData exposes the message payload through coffer.
GetData(coffer *PtrCoffer) os.Error
// SetData makes the memory held by coffer the message payload.
SetData(coffer *MemCoffer) os.Error
// MoveTo moves this message's content into msg.
MoveTo(msg Message) os.Error
// CopyTo copies this message's content into msg.
CopyTo(msg Message) os.Error
// Size returns the payload size in bytes.
Size() int
}
// Socket interface
//
// Sockets are typically thread-bound
type Socket interface {
// Close closes the socket.
io.Closer
Provided
// Bind binds the socket to the given endpoint address.
Bind(address string) os.Error
// Connect connects the socket to the given endpoint address.
Connect(address string) os.Error
// The Set*SockOpt methods set the socket option identified by option (one
// of the Zmq* option constants) to a value of the respective Go type.
SetInt64SockOpt(option int, value int64) os.Error
SetUInt64SockOpt(option int, value uint64) os.Error
SetBinaryDataSockOpt(option int, value []byte) os.Error
SetStringSockOpt(option int, value string) os.Error
// Receive receives into msg; flags are passed through to the implementation.
Receive(msg Message, flags int) os.Error
// Send sends msg; flags are passed through to the implementation.
Send(msg Message, flags int) os.Error
// SetPollItem prepares pi for polling this socket.
SetPollItem(pi *PollItem, revents int8) os.Error
// Flush() os.Error
}
// ******** lzmq: Provider ***************************************************
// libZmqProvider is the Provider implementation backed by libzmq. It carries
// no state.
type libZmqProvider struct{}
// theLibZmqProvider is the single shared provider instance returned by
// LibZmqProvider and by the Provider() methods in this file.
var theLibZmqProvider = new(libZmqProvider)
// Provider returns the shared libzmq provider instance (not the receiver
// copy), so that a provider is itself Provided.
func (p libZmqProvider) Provider() Provider {
	var shared Provider = theLibZmqProvider
	return shared
}
// LibZmqProvider returns the package-wide libzmq-backed Provider.
func LibZmqProvider() Provider {
	var shared Provider = theLibZmqProvider
	return shared
}
// ******** lzmq: Context ********
// lzmqContext wraps a libzmq context: it stores the pointer returned by
// zmq_init as an integer and converts it back to unsafe.Pointer for C calls.
type lzmqContext uintptr
// NewContext creates a zmq context via zmq_init, using args.IoThreads as
// the I/O thread count, and returns it.
//
// On failure (zmq_init returned NULL) it returns a nil Context and the
// provider's current error.
//
// Don't forget to set GOMAXPROCS appropriately when working with libzmq
// (see EnvGOMAXPROCS).
//
// This function registers no finalizer; call Terminate (or Close) on the
// returned context to destroy it. Applications need to arrange that no
// socket is used or even closed after the owning context has been
// destructed.
func (p libZmqProvider) NewContext(args InitArgs) (Context, os.Error) {
	rawCtx := C.zmq_init(C.int(args.IoThreads))
	if !IsCNullPtr(uintptr(rawCtx)) {
		return lzmqContext(rawCtx), nil
	}
	return nil, p.GetError()
}
// NewMessage returns a freshly allocated, zero-valued libzmq message. No
// zmq_msg_init* call is made here.
func (p *libZmqProvider) NewMessage() Message {
	return new(lzmqMessage)
}
// LibZmqErrno is the type of error codes used by libzmq.
//
// Necessary since libzmq provides its own zmq_errno and zmq_strerror
// functions; values of this type are returned as os.Error by GetError.
type LibZmqErrno int
// String returns libzmq's description of the error code, as produced by
// zmq_strerror.
func (p LibZmqErrno) String() string {
	description := C.zmq_strerror(C.int(p))
	return C.GoString(description)
}
// GetError returns the current value of zmq_errno wrapped as a LibZmqErrno.
// The result is always a non-nil os.Error, whatever the code's value.
func (p *libZmqProvider) GetError() os.Error {
	code := C.zmq_errno()
	return LibZmqErrno(code)
}
// OkIf returns nil when cond holds and the provider's current error
// otherwise; it is the negation of ErrorIf.
func (p *libZmqProvider) OkIf(cond bool) os.Error {
	failed := !cond
	return p.ErrorIf(failed)
}
// ErrorIf returns the provider's current error (see GetError) when cond is
// true, and nil when cond is false.
func (p *libZmqProvider) ErrorIf(cond bool) os.Error {
	if !cond {
		return nil
	}
	// A nil result from GetError propagates as nil, exactly as before.
	return p.GetError()
}
// Version reports the libzmq version as (major, minor, patch level), as
// returned by zmq_version.
//
// The values are read into C.int temporaries and converted afterwards. The
// previous code cast pointers to Go ints into *C.int, which is only correct
// when Go's int and C's int have the same size.
func (p *libZmqProvider) Version() (major int, minor int, pl int) {
	var cMajor, cMinor, cPatch C.int
	C.zmq_version(&cMajor, &cMinor, &cPatch)
	return int(cMajor), int(cMinor), int(cPatch)
}
// Provider returns the shared libzmq provider that created this context.
func (p lzmqContext) Provider() Provider {
	var shared Provider = theLibZmqProvider
	return shared
}
// Close implements io.Closer by delegating to Terminate and returning its
// result.
func (p lzmqContext) Close() os.Error {
	err := p.Terminate()
	return err
}
// Terminate calls zmq_term on the underlying context pointer and returns
// nil on success or the provider's current error on failure. A context
// whose pointer is nil is a no-op that returns nil.
//
// Only call once.
func (p lzmqContext) Terminate() os.Error {
	ctxPtr := unsafe.Pointer(p)
	if ctxPtr == nil {
		return nil
	}

	// zmq_term is run through Thunk(...).RunInOSThread(); per the original
	// author's note it needs a separate goroutine locked to an OS thread.
	// The unbuffered channel carries its result back and tells us when it
	// has finished.
	done := make(chan os.Error)
	terminate := func() {
		done <- p.Provider().OkIf(C.zmq_term(ctxPtr) == 0)
	}
	Thunk(terminate).RunInOSThread()

	// Wait for completion.
	return <-done
}
// ******** lzmq: Messages ********
// lzmqMessageHolder is the interface Message arguments are type-asserted to
// in order to reach the underlying libzmq message. Messages that do not
// satisfy it are rejected with os.EINVAL by the methods in this file.
type lzmqMessageHolder interface {
Message
// getLzmqMessage returns the wrapped libzmq message.
getLzmqMessage() *lzmqMessage
}
// lzmqMessage is the libzmq Message implementation; it has the same memory
// layout as zmq_msg_t, so a *lzmqMessage is passed to C directly (see ptr).
type lzmqMessage C.zmq_msg_t
// empty initialises the message as an empty message with zmq_msg_init and
// returns nil on success or the provider's current error on failure.
func (p *lzmqMessage) empty() os.Error {
	rc := C.zmq_msg_init(p.ptr())
	return p.Provider().OkIf(rc == 0)
}
// allocate initialises the message with a payload of length bytes via
// zmq_msg_init_size and returns nil on success or the provider's current
// error on failure.
func (p *lzmqMessage) allocate(length int) os.Error {
	rc := C.zmq_msg_init_size(p.ptr(), C.size_t(length))
	return p.Provider().OkIf(rc == 0)
}
// Provider returns the libzmq provider this message belongs to.
func (p *lzmqMessage) Provider() Provider {
	owner := LibZmqProvider()
	return owner
}
// Size returns the payload size reported by zmq_msg_size, converted to int.
func (p *lzmqMessage) Size() int {
	// size_t always fits int, we do not allocate larger messages
	size := C.zmq_msg_size(p.ptr())
	return int(size)
}
// MoveTo moves the content of this message into msg using zmq_msg_move
// (msg is the destination, the receiver is the source).
//
// Returns os.EINVAL when msg is nil or is not a libzmq-backed message;
// otherwise nil on success or the provider's current error on failure.
func (p *lzmqMessage) MoveTo(msg Message) os.Error {
	// A nil interface value fails the assertion as well, so this single
	// check covers both rejection cases.
	holder, ok := msg.(lzmqMessageHolder)
	if !ok {
		return os.EINVAL
	}
	dest := holder.getLzmqMessage()
	rc := C.zmq_msg_move(dest.ptr(), p.ptr())
	return p.Provider().OkIf(rc == 0)
}
// CopyTo copies the content of this message into msg using zmq_msg_copy
// (msg is the destination, the receiver is the source).
//
// Returns os.EINVAL when msg is nil or is not a libzmq-backed message;
// otherwise nil on success or the provider's current error on failure.
func (p *lzmqMessage) CopyTo(msg Message) os.Error {
	// A nil interface value fails the assertion as well, so this single
	// check covers both rejection cases.
	holder, ok := msg.(lzmqMessageHolder)
	if !ok {
		return os.EINVAL
	}
	dest := holder.getLzmqMessage()
	rc := C.zmq_msg_copy(dest.ptr(), p.ptr())
	return p.Provider().OkIf(rc == 0)
}
// WriteTo appends the message payload to buf.
//
// The payload is wrapped in a pointer coffer and drained into buf with
// buf.ReadFrom. It returns the number of bytes buf reported reading and any
// error, except that os.EOF is suppressed when the whole payload was
// transferred. An empty message writes nothing and returns (0, nil).
func (p *lzmqMessage) WriteTo(buf *Buffer) (n int, err os.Error) {
	n = p.Size()
	if n <= 0 {
		return 0, nil
	}

	var payload Coffer
	if payload, err = NewPtrCoffer(p.data(), n); err != nil {
		return 0, err
	}

	var copied int64
	copied, err = buf.ReadFrom(payload)
	if err == os.EOF && copied == int64(n) {
		// Reaching the end after a complete transfer is not a failure.
		err = nil
	}
	return int(copied), err
}
// ReadFrom fills the message with the unread contents of buf.
//
// The message is initialised with a payload of buf.Len() bytes, the payload
// is wrapped in a pointer coffer, and buf is drained into it with
// buf.WriteTo. It returns the number of bytes buf reported writing and any
// error, except that os.EOF is suppressed when the whole buffer was
// transferred. An empty buffer leaves the message untouched and returns
// (0, nil).
func (p *lzmqMessage) ReadFrom(buf *Buffer) (n int, err os.Error) {
	n = buf.Len()
	if n <= 0 {
		return 0, nil
	}

	if err = p.allocate(n); err != nil {
		return 0, err
	}

	var payload Coffer
	if payload, err = NewPtrCoffer(p.data(), n); err != nil {
		return 0, err
	}

	var copied int64
	copied, err = buf.WriteTo(payload)
	if err == os.EOF && copied == int64(n) {
		// Reaching the end after a complete transfer is not a failure.
		err = nil
	}
	return int(copied), err
}
// GetData points coffer at this message's payload (data pointer and size)
// and returns the result of coffer.InitPtrCoffer.
//
// Returns os.EINVAL when coffer is nil.
func (p *lzmqMessage) GetData(coffer *PtrCoffer) os.Error {
if coffer == nil {
return os.EINVAL
}
// Runs after the return value has been computed: re-initialises the
// message with zmq_msg_init without calling zmq_msg_close first. The
// result of empty() is discarded.
defer p.empty()
// This is flaky wrt freeing
// NOTE(review): nothing visible here releases the payload the coffer now
// points to - confirm who owns it after this call.
return coffer.InitPtrCoffer(p.data(), p.Size())
}
// SetData initialises the message with zmq_msg_init_data so that its
// payload is the memory held by coffer: the base pointer and coffer.Cap()
// bytes. The callback returned by C.CloseMemCofferCb() is registered as the
// free function, with the coffer itself as its hint.
//
// Returns os.EINVAL when coffer is nil (consistent with GetData); otherwise
// nil on success or the provider's current error on failure.
func (p *lzmqMessage) SetData(coffer *MemCoffer) os.Error {
	if coffer == nil {
		return os.EINVAL
	}
	data := unsafe.Pointer(coffer.GetBasePtr())
	rc := C.zmq_msg_init_data(p.ptr(), data, C.size_t(coffer.Cap()), C.CloseMemCofferCb(), unsafe.Pointer(coffer))
	return p.Provider().OkIf(rc == 0)
}
// ptr returns the receiver as a *C.zmq_msg_t for passing to libzmq.
func (p *lzmqMessage) ptr() *C.zmq_msg_t {
	// lzmqMessage is declared as C.zmq_msg_t, so the pointer converts directly.
	return (*C.zmq_msg_t)(p)
}
// data returns the address of the message payload, as reported by
// zmq_msg_data, as a uintptr.
func (p *lzmqMessage) data() uintptr {
	payload := C.zmq_msg_data(p.ptr())
	return uintptr(payload)
}
// getLzmqMessage satisfies lzmqMessageHolder; an lzmqMessage is its own
// underlying message.
func (p *lzmqMessage) getLzmqMessage() *lzmqMessage {
	self := p
	return self
}
// Close releases the message with zmq_msg_close and returns nil on success
// or the provider's current error on failure.
func (p *lzmqMessage) Close() os.Error {
	rc := C.zmq_msg_close(p.ptr())
	return p.Provider().OkIf(rc == 0)
}
// ******** lzmq: Sockets ********
// lzmqSocketHolder is a Socket that can hand out its underlying libzmq
// socket. It exists for casting; lzmqSocket implements getLzmqSocket.
type lzmqSocketHolder interface {
Socket
// getLzmqSocket returns the wrapped libzmq socket.
getLzmqSocket() lzmqSocket
}
// lzmqSocket wraps a libzmq socket: it stores the pointer returned by
// zmq_socket as an integer and converts it back to unsafe.Pointer for C calls.
type lzmqSocket uintptr
// NewSocket creates a socket of the given socketType in this context via
// zmq_socket. When zmq_socket returns NULL it returns a nil Socket and the
// provider's current error.
//
// Sockets must only be used from a fixed OS thread. Per the original
// author's note this may be achieved with Thunk.NewOSThread() or by calling
// runtime.LockOSThread().
func (p lzmqContext) NewSocket(socketType int) (Socket, os.Error) {
	sock := unsafe.Pointer(C.zmq_socket(unsafe.Pointer(p), C.int(socketType)))
	if !IsCNullPtr(uintptr(sock)) {
		return lzmqSocket(sock), nil
	}
	return nil, p.Provider().GetError()
}
// getLzmqSocket satisfies lzmqSocketHolder; an lzmqSocket is its own
// underlying socket.
func (p lzmqSocket) getLzmqSocket() lzmqSocket {
	self := p
	return self
}
// Provider returns the libzmq provider this socket belongs to.
func (p lzmqSocket) Provider() Provider {
	owner := LibZmqProvider()
	return owner
}
// Bind binds the (server) socket to the endpoint given by address via
// zmq_bind. It returns nil on success or the provider's current error on
// failure.
//
// The address is copied to a C string for the duration of the call.
// zmq_bind takes a const char* and does not take ownership of it, so the
// copy is freed here; previously it was leaked on every call.
func (p lzmqSocket) Bind(address string) os.Error {
	cAddr := C.CString(address)
	// Deferred so the free happens after the error has been captured.
	defer C.free(unsafe.Pointer(cAddr))
	return p.Provider().OkIf(C.zmq_bind(unsafe.Pointer(p), cAddr) == 0)
}
// Connect connects the (client) socket to the endpoint given by address via
// zmq_connect. It returns nil on success or the provider's current error on
// failure.
//
// The address is copied to a C string for the duration of the call.
// zmq_connect takes a const char* and does not take ownership of it, so the
// copy is freed here; previously it was leaked on every call.
func (p lzmqSocket) Connect(address string) os.Error {
	cAddr := C.CString(address)
	// Deferred so the free happens after the error has been captured.
	defer C.free(unsafe.Pointer(cAddr))
	return p.Provider().OkIf(C.zmq_connect(unsafe.Pointer(p), cAddr) == 0)
}
// SetInt64SockOpt sets the socket option identified by option to a signed
// 64-bit value via zmq_setsockopt, passing the address and size of value.
// It returns nil on success or the provider's current error on failure.
func (p lzmqSocket) SetInt64SockOpt(option int, value int64) os.Error {
	// TODO CheckMem
	optPtr := unsafe.Pointer(&value)
	optLen := C.size_t(unsafe.Sizeof(value))
	rc := C.zmq_setsockopt(unsafe.Pointer(p), C.int(option), optPtr, optLen)
	return p.Provider().OkIf(rc == 0)
}
// SetUInt64SockOpt sets the socket option identified by option to an
// unsigned 64-bit value via zmq_setsockopt, passing the address and size of
// value. It returns nil on success or the provider's current error on
// failure.
func (p lzmqSocket) SetUInt64SockOpt(option int, value uint64) os.Error {
	// TODO CheckMem
	optPtr := unsafe.Pointer(&value)
	optLen := C.size_t(unsafe.Sizeof(value))
	rc := C.zmq_setsockopt(unsafe.Pointer(p), C.int(option), optPtr, optLen)
	return p.Provider().OkIf(rc == 0)
}
// SetBinaryDataSockOpt sets the socket option identified by option to the
// bytes in value via zmq_setsockopt. It returns nil on success or the
// provider's current error on failure.
//
// An empty or nil value is passed as a NULL pointer with length zero rather
// than indexing value[0], which would panic. Zero-length option values are
// legitimate input (for example an empty subscription prefix).
func (p lzmqSocket) SetBinaryDataSockOpt(option int, value []byte) os.Error {
	// TODO CheckMem
	var data unsafe.Pointer
	if len(value) > 0 {
		data = unsafe.Pointer(&value[0])
	}
	rc := C.zmq_setsockopt(unsafe.Pointer(p), C.int(option), data, C.size_t(len(value)))
	return p.Provider().OkIf(rc == 0)
}
// SetStringSockOpt sets the socket option identified by option to the bytes
// of value via zmq_setsockopt. The length passed is len(value), so the
// terminating NUL of the temporary C copy is not part of the option value.
// It returns nil on success or the provider's current error on failure.
func (p lzmqSocket) SetStringSockOpt(option int, value string) os.Error {
	cValue := unsafe.Pointer(C.CString(value))
	// The C copy is only needed for the duration of the call.
	defer C.free(cValue)
	// TODO CheckMem
	rc := C.zmq_setsockopt(unsafe.Pointer(p), C.int(option), cValue, C.size_t(len(value)))
	return p.Provider().OkIf(rc == 0)
}
// Receive receives a message from the socket into msg via zmq_recv, passing
// flags through unchanged.
//
// msg is first re-initialised as an empty message (zmq_msg_init); if that
// fails its error is returned and nothing is received. Returns os.EINVAL
// when msg is nil or is not a libzmq-backed message; otherwise nil on
// success or the provider's current error on failure.
func (p lzmqSocket) Receive(msg Message, flags int) os.Error {
	// A nil interface value fails the assertion as well, so this single
	// check covers both rejection cases.
	holder, ok := msg.(lzmqMessageHolder)
	if !ok {
		return os.EINVAL
	}
	target := holder.getLzmqMessage()
	if err := target.empty(); err != nil {
		return err
	}
	rc := C.zmq_recv(unsafe.Pointer(p), target.ptr(), C.int(flags))
	return p.Provider().OkIf(rc == 0)
}
// Send sends msg on the socket via zmq_send, passing flags through
// unchanged.
//
// Returns os.EINVAL when msg is nil or is not a libzmq-backed message;
// otherwise nil on success or the provider's current error on failure.
func (p lzmqSocket) Send(msg Message, flags int) os.Error {
	// A nil interface value fails the assertion as well, so this single
	// check covers both rejection cases.
	holder, ok := msg.(lzmqMessageHolder)
	if !ok {
		return os.EINVAL
	}
	outgoing := holder.getLzmqMessage()
	rc := C.zmq_send(unsafe.Pointer(p), outgoing.ptr(), C.int(flags))
	return p.Provider().OkIf(rc == 0)
}
// keep this
/*func (p lzmqSocket) Flush() os.Error {
return p.Provider().OkIf(C.zmq_flush(unsafe.Pointer(p)) == 0)
}
*/
// Close closes this socket via zmq_close and returns nil on success or the
// provider's current error on failure.
//
// Expects the executing go routine to still be locked onto an OSThread.
// May be called only once.
func (p lzmqSocket) Close() os.Error {
	rc := C.zmq_close(unsafe.Pointer(p))
	return p.Provider().OkIf(rc == 0)
}
// ******** Watches ********
/*type lzmqWatch uintptr
func (p lzmqWatch) Stop() uint64 {
return uint64(C.zmq_stopwatch_stop(unsafe.Pointer(p)))
}
*/
// freeMemCoffer closes the MemCoffer whose address is passed in hint; base
// is ignored and the result of Close is discarded. SetData passes the
// coffer pointer as the hint argument of zmq_msg_init_data, so this is
// presumably reached from the C callback returned by CloseMemCofferCb -
// confirm in coffer_cb.h.
// NOTE(review): the exported name below (free_mem_coffer) differs from the
// Go function name (freeMemCoffer); current cgo requires the two to match -
// confirm this still builds with the toolchain in use.
//export free_mem_coffer
func freeMemCoffer(base uintptr, hint uintptr) {
((*MemCoffer)(unsafe.Pointer(hint))).Close()
}
// {}