Skip to content

Commit

Permalink
Alpha version (#12)
Browse files Browse the repository at this point in the history
* Ported ZeroMQ from xs to zmq library
  • Loading branch information
dvladi77 authored Oct 19, 2023
1 parent a133f9f commit 05293d5
Show file tree
Hide file tree
Showing 6 changed files with 583 additions and 50 deletions.
50 changes: 0 additions & 50 deletions cgo-nmsg/xs.go

This file was deleted.

42 changes: 42 additions & 0 deletions cgo-nmsg/zmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package nmsg

/*
#cgo pkg-config: libnmsg libzmq
#cgo LDFLAGS: -lnmsg -lzmq
#include <stdlib.h>
#include <nmsg.h>
#include <zmq.h>
*/
import "C"
import (
"errors"
"unsafe"
)

var zmqContext unsafe.Pointer

func init() {
zmqContext = C.zmq_init(1)
}

// NewZMQInput opens an Input reading from the given ZMQ endpoint.
func NewZMQInput(zmqep string) (Input, error) {
czmqep := C.CString(zmqep)
defer C.free(unsafe.Pointer(czmqep))
inp := C.nmsg_input_open_zmq_endpoint(zmqContext, czmqep)
if inp == nil {
return nil, errors.New("failed to create NMSG input")
}
return &nmsgInput{input: inp}, nil
}

// NewZMQInput creates an output writing to the given ZMQ endpoint.
func NewZMQOutput(zmqep string, bufsiz int) (Output, error) {
czmqep := C.CString(zmqep)
defer C.free(unsafe.Pointer(czmqep))
outp := C.nmsg_output_open_zmq_endpoint(zmqContext, czmqep, C.size_t(bufsiz))
if outp == nil {
return nil, errors.New("failed to create NMSG output")
}
return &nmsgOutput{output: outp}, nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.18

require (
github.com/dnstap/golang-dnstap v0.4.0
github.com/pebbe/zmq4 v1.2.10
google.golang.org/protobuf v1.25.0
gopkg.in/yaml.v2 v2.4.0
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/miekg/dns v1.1.31 h1:sJFOl9BgwbYAWOGEwr61FU28pqsBNdpRBnhGXtO06Oo=
github.com/miekg/dns v1.1.31/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
github.com/pebbe/zmq4 v1.2.10 h1:wQkqRZ3CZeABIeidr3e8uQZMMH5YAykA/WN0L5zkd1c=
github.com/pebbe/zmq4 v1.2.10/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8=
Expand Down
187 changes: 187 additions & 0 deletions zmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package nmsg

import (
"errors"
zmq "github.com/pebbe/zmq4"
"io"
"strings"
)

type socketKind int
type socketDirection int
type socketType int

const (
socketInput socketKind = 0
socketOutput = 1
)
const (
sockdirInvalid socketDirection = 0
sockdirAccept = 1
sockdirConnect = 2
)

const (
SocktypeInvalid socketType = 0
SocktypePubsub = 1
SocktypePushpull = 2
)

func munge_endpoint(ep string) (string, socketDirection, socketType, error) {
endpoint := ""
sockdir := sockdirInvalid
socktype := SocktypeInvalid

tokens := strings.Split(ep, ",")
for i, tok := range tokens {
if i == 0 {
endpoint = tok
} else {
switch tok {
case "accept":
if sockdir != sockdirInvalid {
return "", sockdirInvalid, SocktypeInvalid, errors.New("socket direction is already set")
}
sockdir = sockdirAccept
case "connect":
if sockdir != sockdirInvalid {
return "", sockdirInvalid, SocktypeInvalid, errors.New("socket direction is already set")
}
sockdir = sockdirConnect
case "pubsub":
if socktype != SocktypeInvalid {
return "", sockdirInvalid, SocktypeInvalid, errors.New("socket type is already set")
}
socktype = SocktypePubsub
case "pushpull":
if socktype != SocktypeInvalid {
return "", sockdirInvalid, SocktypeInvalid, errors.New("socket type is already set")
}
socktype = SocktypePushpull
}
}

}
return endpoint, sockdir, socktype, nil
}

func setSocketOptions(socket *zmq.Socket, p zmq.Type) error {
if p == zmq.SUB {
return socket.SetSubscribe("NMSG")
}
if p == zmq.PUB || p == zmq.PUSH {
err := socket.SetSndhwm(1000)
if err == nil {
err = socket.SetLinger(1000)
}
return err
}
return nil
}

func zmq_socket_type(kind socketKind, socketType socketType) zmq.Type {
if kind == socketInput {
if socketType == SocktypePubsub {
return zmq.SUB
} else if socketType == SocktypePushpull {
return zmq.PULL
}
} else if kind == socketOutput {
if socketType == SocktypePubsub {
return zmq.PUB
} else if socketType == SocktypePushpull {
return zmq.PUSH
}
}

return zmq.Type(-1)
}

func zmq_socket(ep string, kind socketKind) (*zmq_io, error) {
endpoint, socketDir, socketType, err := munge_endpoint(ep)

binded := false

if err != nil {
return nil, err
}

if endpoint == "" {
return nil, errors.New("end point is not set")
}

if socketDir == sockdirInvalid {
return nil, errors.New("socket direction is not set")
}

if socketType == SocktypeInvalid {
return nil, errors.New("socket type is not set")
}

zmq_type := zmq_socket_type(kind, socketType)

socket, err := zmq.NewSocket(zmq_type)

if err != nil {
return nil, err
}

err = setSocketOptions(socket, zmq_type)
if err != nil {
return nil, err
}

if socketDir == sockdirAccept {
err = socket.Bind(endpoint)
if err != nil {
return nil, err
}
binded = true

} else if socketDir == sockdirConnect {
err = socket.Connect(endpoint)
if err != nil {
return nil, err
}
}

return &zmq_io{socket, binded, endpoint}, nil
}

type zmq_io struct {
sock *zmq.Socket
binded bool
ep string
}

func (i *zmq_io) Read(p []byte) (n int, err error) {
buf, err := i.sock.RecvBytes(0)
if err != nil {
return 0, err
}
copy(p, buf)
return len(buf), nil
}

func (o *zmq_io) Write(p []byte) (int, error) {
return o.sock.SendBytes(p, 0)
}

func (o *zmq_io) Close() error {
if o.binded == true {
err := o.sock.Unbind(o.ep)
if err != nil {
return err
}
}

return o.sock.Close()
}

func NewZMQWriter(ep string) (io.WriteCloser, error) {
return zmq_socket(ep, socketOutput)
}

func NewZMQReader(ep string) (io.ReadCloser, error) {
return zmq_socket(ep, socketInput)
}
Loading

0 comments on commit 05293d5

Please sign in to comment.