From cb86dad3ec58a9036fa462ca80e1e00a672ef406 Mon Sep 17 00:00:00 2001 From: andreasvogt89 <30302212+andreasvogt89@users.noreply.github.com> Date: Sun, 4 Jun 2023 21:55:26 +0200 Subject: [PATCH] Initial Commit --- .gitignore | 26 +++++++ README.md | 24 ++++++ cmd/queueic.go | 45 ++++++++++++ go.mod | 8 ++ go.sum | 4 + pkg/proto/proto.go | 125 +++++++++++++++++++++++++++++++ pkg/proto/proto_test.go | 79 ++++++++++++++++++++ pkg/queue/queue.go | 150 ++++++++++++++++++++++++++++++++++++++ pkg/queue/queue_test.go | 77 +++++++++++++++++++ pkg/server/packet.go | 113 ++++++++++++++++++++++++++++ pkg/server/server.go | 124 +++++++++++++++++++++++++++++++ pkg/server/server_test.go | 138 +++++++++++++++++++++++++++++++++++ 12 files changed, 913 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 cmd/queueic.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 pkg/proto/proto.go create mode 100644 pkg/proto/proto_test.go create mode 100644 pkg/queue/queue.go create mode 100644 pkg/queue/queue_test.go create mode 100644 pkg/server/packet.go create mode 100644 pkg/server/server.go create mode 100644 pkg/server/server_test.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..17b8bc5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +# If you prefer the allow list template instead of the deny list, see community template: +# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore +# +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work + +# queuic +*.qic +*.queuic +*.bin \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..aeabc73 --- /dev/null +++ b/README.md @@ -0,0 +1,24 @@ +# Queuic Message Broker + +## Introduction +This is a very small qeueue message broker that I wrote for learning. It is not meant to be used in production so far. + +## Features +- [x] Simple massage queueing +- [x] Message persistence +- [x] Own protocol +- [ ] Server implementation +- [x] Encryption without certs + +## Protocol + + 0 1 2 3 4 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Command | Queue Name | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Item UUID | | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | + | Item.... | + +-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ No newline at end of file diff --git a/cmd/queueic.go b/cmd/queueic.go new file mode 100644 index 0000000..0f77949 --- /dev/null +++ b/cmd/queueic.go @@ -0,0 +1,45 @@ +package main + +import ( + "fmt" + "os" + "strings" + + "github.com/dinifarb/mlog" + "github.com/dinifarb/queuic/pkg/server" +) + +func main() { + fmt.Println(` + ___ _ + / _ \ _ _ ___ _ _(_) ___ + | | | | | | |/ _ \ | | | |/ __| + | |_| | |_| | __/ |_| | | (__ + \__\_\\__,_|\___|\__,_|_|\___| + `) + mlog.SetAppName("QUEUEIC") + logLevel := os.Getenv("LOG_LEVEL") + switch strings.ToLower(logLevel) { + case "debug": + mlog.SetLevel(mlog.Ldebug) + case "info": + mlog.SetLevel(mlog.Linfo) + case "warn": + mlog.SetLevel(mlog.Lwarn) + case "error": + mlog.SetLevel(mlog.Lerror) + default: + mlog.Warn("LOG_LEVEL env variable is not set use default level") + mlog.SetLevel(mlog.Linfo) + } + keyString := os.Getenv("QUEUEIC_KEY_STRING") + if keyString == "" { + mlog.Warn("QUEUEIC_KEY_STRING env variable is not set use default key") + keyString = "QUEUEIC" + } + svr := server.NewQueuicServer(keyString) + if err := svr.Serve(); err != nil { + mlog.Error("server error: %v", err) + os.Exit(1) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..059f5c5 --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module github.com/dinifarb/queuic + +go 1.19 + +require ( + github.com/dinifarb/mlog v1.0.0 + github.com/google/uuid v1.3.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..750c6be --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/dinifarb/mlog v1.0.0 h1:9juET199uDF1NbVtlLanyc1kGW0eThXTcbx3sGctpD0= +github.com/dinifarb/mlog v1.0.0/go.mod h1:QEOWY+no8+AH92MS0iZGH2ERdpvIKIkJODByMaQyYM8= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= diff --git a/pkg/proto/proto.go b/pkg/proto/proto.go new file mode 100644 index 0000000..e0fedf7 --- /dev/null +++ b/pkg/proto/proto.go @@ -0,0 +1,125 @@ +package proto + +import ( + "bytes" + "crypto/aes" + "crypto/cipher" + "fmt" + + "github.com/google/uuid" +) + +type Command uint8 +type QueueName [16]byte + +const ( + ENQUEUE Command = iota + ENQUEUE_ACK + PEEK + PEEK_ACK + ACCEPT + ACCEPT_ACK + RELEASE + RELEASE_ACK + SIZE + SIZE_ACK +) + +const ( + // MAX_PACKET_LENGTH = 4096 + MIN_PACKET_LENGTH = 17 +) + +type Queuic struct { + Command Command + QueueName QueueName + QueuicItem +} + +type QueuicItem struct { + Id uuid.UUID + Item []byte +} + +func (q *QueueName) String() string { + b := q[:] + b = bytes.Trim(b, "\x00") + return string(b[:]) +} + +func Encode(q *Queuic) ([]byte, error) { + length := MIN_PACKET_LENGTH + if q.QueuicItem.Item != nil { + length += len(q.QueuicItem.Id) + length += len(q.QueuicItem.Item) + } + b := make([]byte, length) + b[0] = byte(q.Command) + copy(b[1:17], q.QueueName[:]) + if q.QueuicItem.Item == nil { + return b, nil + } else { + copy(b[17:33], q.QueuicItem.Id[:]) + copy(b[33:], q.QueuicItem.Item[:]) + return b, nil + } +} + +func Decode(data []byte) (*Queuic, error) { + if len(data) < MIN_PACKET_LENGTH { + return nil, fmt.Errorf("packet is too short") + } + /* if len(data) > MAX_PACKET_LENGTH { + return nil, fmt.Errorf("packet is too long") + } */ + var q Queuic + q.Command = Command(data[0]) + copy(q.QueueName[:], data[1:17]) + if len(data) > MIN_PACKET_LENGTH { + itemId, err := uuid.FromBytes(data[17:33]) + if err != nil { + return nil, fmt.Errorf("failed to decode uuid: %v", err) + } + q.QueuicItem.Id = itemId + q.QueuicItem.Item = data[33:] + } + return &q, nil +} + +func Encrypt(key []byte, message []byte) ([]byte, error) { + if len(key) != 32 { + return nil, fmt.Errorf("key must be 32 bytes long") + } + block, err := aes.NewCipher(key) + if err != nil { + return nil, fmt.Errorf("failed to create new cipher: %v", err) + } + gcm, err := cipher.NewGCM(block) + if err != nil { + return nil, fmt.Errorf("failed to create new gcm: %v", err) + } + nonce := make([]byte, gcm.NonceSize()) + ciphertext := gcm.Seal(nonce, nonce, message, nil) + return ciphertext, nil +} + +func Decrypt(key []byte, encryptedMessage []byte) ([]byte, error) { + if len(key) != 32 { + return nil, fmt.Errorf("key must be 32 bytes long") + } + block, err := aes.NewCipher(key) + if err != nil { + return nil, fmt.Errorf("failed to create new cipher: %v", err) + } + gcm, err := cipher.NewGCM(block) + if err != nil { + return nil, fmt.Errorf("failed to create new gcm: %v", err) + } + nonceSize := gcm.NonceSize() + nonce, ciphertext := encryptedMessage[:nonceSize], encryptedMessage[nonceSize:] + plaintext, err := gcm.Open(nil, nonce, ciphertext, nil) + if err != nil { + return nil, fmt.Errorf("failed to decrypt message: %v", err) + } + return plaintext, nil +} diff --git a/pkg/proto/proto_test.go b/pkg/proto/proto_test.go new file mode 100644 index 0000000..138e613 --- /dev/null +++ b/pkg/proto/proto_test.go @@ -0,0 +1,79 @@ +package proto_test + +import ( + "crypto/sha256" + "testing" + + "github.com/dinifarb/queuic/pkg/proto" + "github.com/google/uuid" +) + +func TestEnDecode(t *testing.T) { + queueName := proto.QueueName{} + copy(queueName[:], []byte("test")) + q := proto.Queuic{ + Command: proto.ENQUEUE, + QueueName: queueName, + QueuicItem: proto.QueuicItem{ + Id: uuid.New(), + Item: []byte("test message"), + }, + } + b, err := proto.Encode(&q) + if err != nil { + t.Errorf("failed to encode request: %v", err) + } + q2, err := proto.Decode(b) + if err != nil { + t.Errorf("failed to decode response: %v", err) + } + if q2.Command != proto.ENQUEUE { + t.Errorf("unexpected response command: %v", q2.Command) + } + if q2.QueueName != queueName { + t.Errorf("unexpected queue name: %v", q2.QueueName) + } + if string(q2.QueuicItem.Item) != "test message" { + t.Errorf("unexpected value: %v", q2.QueuicItem.Item) + } +} + +func TestEnDecodeWithCrypto(t *testing.T) { + queueName := proto.QueueName{} + copy(queueName[:], []byte("test")) + q := proto.Queuic{ + Command: proto.ENQUEUE, + QueueName: queueName, + QueuicItem: proto.QueuicItem{ + Id: uuid.New(), + Item: []byte("test message"), + }, + } + b, err := proto.Encode(&q) + if err != nil { + t.Errorf("failed to encode request: %v", err) + } + enk := sha256.Sum256([]byte("test")) + encrypted, err := proto.Encrypt(enk[:], b) + if err != nil { + t.Errorf("failed to encrypt request: %v", err) + } + dek := sha256.Sum256([]byte("test")) + decrypted, err := proto.Decrypt(dek[:], encrypted) + if err != nil { + t.Errorf("failed to decrypt request: %v", err) + } + q2, err := proto.Decode(decrypted) + if err != nil { + t.Errorf("failed to decode response: %v", err) + } + if q2.Command != proto.ENQUEUE { + t.Errorf("unexpected response command: %v", q2.Command) + } + if q2.QueueName != queueName { + t.Errorf("unexpected queue name: %v", q2.QueueName) + } + if string(q2.QueuicItem.Item) != "test message" { + t.Errorf("unexpected value: %v", q2.QueuicItem.Item) + } +} diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go new file mode 100644 index 0000000..4a763a5 --- /dev/null +++ b/pkg/queue/queue.go @@ -0,0 +1,150 @@ +package queue + +import ( + "bytes" + "encoding/gob" + "fmt" + "os" + "sync" + + "github.com/dinifarb/mlog" + "github.com/dinifarb/queuic/pkg/proto" + "github.com/google/uuid" +) + +const ( + path = "./data/%s.queuic" +) + +type Queue struct { + items []proto.QueuicItem + peeked map[uuid.UUID]proto.QueuicItem + mu sync.Mutex + store store + added int64 + removed int64 + Name proto.QueueName +} + +type store struct { + file os.File + mu sync.Mutex +} + +func NewQueue(name proto.QueueName) (*Queue, error) { + q := &Queue{ + Name: name, + } + q.items = make([]proto.QueuicItem, 0) + q.peeked = make(map[uuid.UUID]proto.QueuicItem) + fileName := fmt.Sprintf(path, name.String()) + if _, err := os.Stat(fileName); os.IsNotExist(err) { + f, err := os.Create(fileName) + if err != nil { + return nil, fmt.Errorf("failed to create bin file: %w", err) + } + q.store.file = *f + } else { + f, err := os.OpenFile(fileName, os.O_RDWR, 0644) + if err != nil { + return nil, fmt.Errorf("failed to open bin file: %w", err) + } + q.store.file = *f + if err := q.loadFromDisk(); err != nil { + return nil, fmt.Errorf("failed to load from disk: %w", err) + } + } + return q, nil +} + +func (q *Queue) Enqueue(item proto.QueuicItem) error { + q.mu.Lock() + defer q.mu.Unlock() + q.items = append(q.items, item) + q.added++ + return q.saveToDisk() +} + +func (q *Queue) Size() int { + q.mu.Lock() + defer q.mu.Unlock() + return len(q.items) + len(q.peeked) +} + +func (q *Queue) Peek() (proto.QueuicItem, error) { + q.mu.Lock() + defer q.mu.Unlock() + if len(q.items) == 0 { + return proto.QueuicItem{}, fmt.Errorf("queue is empty") + } + item := q.items[0] + q.items = q.items[1:] + q.peeked[item.Id] = item + err := q.saveToDisk() + if err != nil { + return proto.QueuicItem{}, err + } + return item, nil +} + +func (q *Queue) Release(id uuid.UUID) error { + q.mu.Lock() + defer q.mu.Unlock() + //copy peeked to to 0 on items + q.items = append([]proto.QueuicItem{q.peeked[id]}, q.items...) + delete(q.peeked, id) + return q.saveToDisk() +} + +func (q *Queue) Accept(id uuid.UUID) error { + q.mu.Lock() + defer q.mu.Unlock() + delete(q.peeked, id) + q.removed++ + return q.saveToDisk() +} + +func (q *Queue) saveToDisk() error { + q.store.mu.Lock() + defer q.store.mu.Unlock() + if len(q.items) == 0 && len(q.peeked) == 0 { + return q.replaceFile() + } + var buff bytes.Buffer + enc := gob.NewEncoder(&buff) + if err := enc.Encode(q.items); err != nil { + return fmt.Errorf("gob error: %w", err) + } + if _, err := q.store.file.Write(buff.Bytes()); err != nil { + return fmt.Errorf("failed to write bytes to disk: %w", err) + } + mlog.Debug("saved to disk - items %d, peeked %d", len(q.items), len(q.peeked)) + return nil +} + +func (q *Queue) loadFromDisk() error { + q.store.mu.Lock() + defer q.store.mu.Unlock() + dec := gob.NewDecoder(&q.store.file) + if err := dec.Decode(&q.items); err != nil { + if err.Error() == "EOF" { + //EMPTY FILE + return nil + } + return fmt.Errorf("failed to read bytes from disk: %w", err) + } + return nil +} + +func (q *Queue) replaceFile() error { + q.store.file.Close() + fileName := fmt.Sprintf(path, q.Name.String()) + if err := os.Remove(fileName); err != nil { + return fmt.Errorf("failed to remove file: %w", err) + } + _, err := os.Create(fileName) + if err != nil { + return fmt.Errorf("failed to create file: %w", err) + } + return nil +} diff --git a/pkg/queue/queue_test.go b/pkg/queue/queue_test.go new file mode 100644 index 0000000..748aea9 --- /dev/null +++ b/pkg/queue/queue_test.go @@ -0,0 +1,77 @@ +package queue_test + +import ( + "os" + "sync" + "testing" + + "github.com/dinifarb/mlog" + "github.com/dinifarb/queuic/pkg/proto" + "github.com/dinifarb/queuic/pkg/queue" + "github.com/google/uuid" +) + +func TestQueueEnqueuePeekAccept(t *testing.T) { + fileName := "./data/epa.queuic" + mlog.SetLevel(mlog.Linfo) + if _, err := os.Stat(fileName); !os.IsNotExist(err) { + os.Remove(fileName) + } + name := proto.QueueName{} + copy(name[:], []byte("epa")) + q, err := queue.NewQueue(name) + if err != nil { + mlog.Error("%v", err) + os.Exit(1) + } + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + for i := 0; i < 100; i++ { + q.Enqueue(proto.QueuicItem{Id: uuid.New(), Item: []byte("test1")}) + } + wg.Done() + }() + go func() { + for i := 0; i < 100; i++ { + q.Enqueue(proto.QueuicItem{Id: uuid.New(), Item: []byte("test2")}) + } + wg.Done() + }() + wg.Wait() + if q.Size() != 200 { + t.Errorf("Expected size 200, got %d", q.Size()) + } + wg.Add(2) + go func() { + for i := 0; i < 100; i++ { + i, err := q.Peek() + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + err = q.Accept(i.Id) + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + } + wg.Done() + }() + go func() { + for i := 0; i < 100; i++ { + i, err := q.Peek() + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + err = q.Accept(i.Id) + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + } + wg.Done() + }() + wg.Wait() + if q.Size() != 0 { + t.Errorf("Expected size 0, got %d", q.Size()) + } + mlog.SetLevel(mlog.Ltrace) +} diff --git a/pkg/server/packet.go b/pkg/server/packet.go new file mode 100644 index 0000000..ce3af9c --- /dev/null +++ b/pkg/server/packet.go @@ -0,0 +1,113 @@ +package server + +import ( + "encoding/binary" + "fmt" + + "github.com/dinifarb/mlog" + "github.com/dinifarb/queuic/pkg/proto" + "github.com/dinifarb/queuic/pkg/queue" + "github.com/google/uuid" +) + +func (s *QueuicServer) HandleQueuicRequest(b []byte) ([]byte, error) { + req, err := proto.Decode(b) + if err != nil { + return nil, fmt.Errorf("failed to decode request: %v", err) + } + queue, ok := s.queueStore.queues[req.QueueName] + if !ok { + //TODO: handle create queue on the fly + return nil, fmt.Errorf("queue %s does not exists", req.QueueName) + } + switch req.Command { + case proto.ENQUEUE: + return handleEnqueue(queue, req) + case proto.PEEK: + return handlePeek(queue, req) + case proto.ACCEPT: + return handleAccept(queue, req) + case proto.RELEASE: + return handleRelease(queue, req) + case proto.SIZE: + return handleSize(queue, req) + default: + return nil, fmt.Errorf("unknown command: %v", req.Command) + } +} + +func handleEnqueue(current_queue *queue.Queue, q *proto.Queuic) ([]byte, error) { + err := current_queue.Enqueue(q.QueuicItem) + if err != nil { + return nil, fmt.Errorf("failed to enqueue: %v", err) + } + mlog.Debug("enqueued item: %v", q.QueuicItem.Id) + ack := proto.Queuic{ + Command: proto.ENQUEUE_ACK, + QueueName: q.QueueName, + } + return encodeResponse(&ack) +} + +func handlePeek(current_queue *queue.Queue, q *proto.Queuic) ([]byte, error) { + queueItem, err := current_queue.Peek() + if err != nil { + return nil, fmt.Errorf("failed to peek: %v", err) + } + mlog.Debug("peeked item: %v", queueItem.Id) + ack := proto.Queuic{ + Command: proto.PEEK_ACK, + QueueName: q.QueueName, + QueuicItem: queueItem, + } + return encodeResponse(&ack) +} + +func handleAccept(current_queue *queue.Queue, q *proto.Queuic) ([]byte, error) { + err := current_queue.Accept(q.QueuicItem.Id) + if err != nil { + return nil, fmt.Errorf("failed to accept: %v", err) + } + mlog.Debug("accepted item: %v", q.QueuicItem.Id) + ack := proto.Queuic{ + Command: proto.ACCEPT_ACK, + QueueName: q.QueueName, + } + return encodeResponse(&ack) +} + +func handleRelease(current_queue *queue.Queue, q *proto.Queuic) ([]byte, error) { + err := current_queue.Release(q.QueuicItem.Id) + if err != nil { + return nil, fmt.Errorf("failed to release: %v", err) + } + ack := proto.Queuic{ + Command: proto.ACCEPT_ACK, + QueueName: q.QueueName, + } + return encodeResponse(&ack) +} + +// TODO: handle size of queue +func handleSize(current_queue *queue.Queue, q *proto.Queuic) ([]byte, error) { + size := current_queue.Size() + sizeBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(sizeBytes, uint64(size)) + ack := proto.Queuic{ + Command: proto.SIZE_ACK, + QueueName: q.QueueName, + QueuicItem: proto.QueuicItem{ + Id: uuid.New(), + Item: sizeBytes, + }, + } + return encodeResponse(&ack) +} + +func encodeResponse(q *proto.Queuic) ([]byte, error) { + b, err := proto.Encode(q) + if err != nil { + return nil, fmt.Errorf("failed to encode response: %v", err) + } + return b, nil +} diff --git a/pkg/server/server.go b/pkg/server/server.go new file mode 100644 index 0000000..cb3c9c0 --- /dev/null +++ b/pkg/server/server.go @@ -0,0 +1,124 @@ +package server + +import ( + "crypto/sha256" + "fmt" + "net" + "os" + "sync" + "time" + + "github.com/dinifarb/mlog" + "github.com/dinifarb/queuic/pkg/proto" + "github.com/dinifarb/queuic/pkg/queue" +) + +const ( + NETWORK_TYPE = "udp" + MAX_PACKET_LENGTH = 4096 + DEFAULT_PORT = 9523 +) + +type QueuicServer struct { + Port int + Key [32]byte + shutdown chan bool + queueStore QueueStore +} + +type QueueStore struct { + sync.RWMutex + queues map[proto.QueueName]*queue.Queue +} + +func NewQueuicServer(key string) *QueuicServer { + q := make(map[proto.QueueName]*queue.Queue) + k := sha256.Sum256([]byte(key)) + mlog.Debug("server key: %x", k) + return &QueuicServer{ + Key: k, + queueStore: QueueStore{queues: q}, + } +} + +func (s *QueuicServer) CreateQueue(name proto.QueueName) error { + s.queueStore.Lock() + defer s.queueStore.Unlock() + for _, q := range s.queueStore.queues { + if q.Name == name { + return fmt.Errorf("queue %s already exists", name) + } + } + q, err := queue.NewQueue(name) + if err != nil { + return fmt.Errorf("failed to create queue: %v", err) + } + s.queueStore.queues[name] = q + mlog.Info("created queue: %s", name) + return nil +} + +//TODO DeleteQueue, LoadQueuesFromDisk + +func (s *QueuicServer) Serve() error { + s.shutdown = make(chan bool) + if s.Key == [32]byte{} { + return fmt.Errorf("server has no key source") + } + if s.Port == 0 { + s.Port = DEFAULT_PORT + } + mlog.Info("receive on port: %d", s.Port) + conn, err := net.ListenUDP(NETWORK_TYPE, &net.UDPAddr{Port: s.Port}) + if err != nil { + return fmt.Errorf("listen to UDP failed with: %v", err) + } + defer conn.Close() + for { + var buff = make([]byte, MAX_PACKET_LENGTH) + n, remoteAddr, err := conn.ReadFromUDP(buff[:]) + if err != nil { + mlog.Error("error reading from connection: %v", err) + continue + } + // breake if we got shutdown signal + select { + case <-s.shutdown: + mlog.Info("shutdown signal received, shutting down") + return nil + default: + go func(buff []byte, remoteAddr *net.UDPAddr) { + mlog.Debug("received message from %s", remoteAddr) + decryptedMessage, err := proto.Decrypt(s.Key[:], buff) + if err != nil { + mlog.Error("error decrypting message: %v", err) + return + } + resp, err := s.HandleQueuicRequest(decryptedMessage) + if err != nil { + mlog.Error("error handling request: %v", err) + return + } + encryptedMessage, err := proto.Encrypt(s.Key[:], resp) + if err != nil { + mlog.Error("error encrypting message: %v", err) + return + } + mlog.Debug("write message back to %s", remoteAddr) + _, err = conn.WriteToUDP(encryptedMessage, remoteAddr) + if err != nil { + mlog.Error("error writing to connection: %v", err) + } + + }(append([]byte(nil), buff[:n]...), remoteAddr) + } + } +} + +func (s *QueuicServer) Shutdown() { + // TODO Implement graceful shutdown + // wait for all onging requests to finish + s.shutdown <- true + time.Sleep(5 * time.Second) + os.Exit(0) +} diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go new file mode 100644 index 0000000..e125a72 --- /dev/null +++ b/pkg/server/server_test.go @@ -0,0 +1,138 @@ +package server_test + +import ( + "crypto/sha256" + "fmt" + "net" + "os" + "testing" + + "github.com/dinifarb/queuic/pkg/proto" + "github.com/dinifarb/queuic/pkg/server" + "github.com/google/uuid" +) + +func TestCreateServerAndEnqueue(t *testing.T) { + fileName := "./data/test.queuic" + if _, err := os.Stat(fileName); !os.IsNotExist(err) { + os.Remove(fileName) + } + svr := server.NewQueuicServer("test") + go func() { + if err := svr.Serve(); err != nil { + t.Errorf("server error: %v", err) + } + }() + name := proto.QueueName{} + copy(name[:], []byte("test")) + if err := svr.CreateQueue(name); err != nil { + t.Errorf("%v", err) + } + queueName := proto.QueueName{} + copy(queueName[:], []byte("test")) + req := proto.Queuic{ + Command: proto.ENQUEUE, + QueueName: queueName, + QueuicItem: proto.QueuicItem{ + Id: uuid.New(), + Item: []byte("test message"), + }, + } + reqBytes, err := proto.Encode(&req) + if err != nil { + t.Errorf("failed to encode request: %v", err) + } + resp, err := sendUdpMessage(reqBytes) + if err != nil { + t.Errorf("%v", err) + return + } + respQueuic, err := proto.Decode(resp) + if err != nil { + t.Errorf("failed to decode response: %v", err) + } + if respQueuic.Command != proto.ENQUEUE_ACK { + t.Errorf("unexpected response command: %v", respQueuic.Command) + } + peek := proto.Queuic{ + Command: proto.PEEK, + QueueName: queueName, + } + peekBytes, err := proto.Encode(&peek) + if err != nil { + t.Errorf("failed to encode request: %v", err) + } + resp, err = sendUdpMessage(peekBytes) + if err != nil { + t.Errorf("%v", err) + return + } + respQueuic, err = proto.Decode(resp) + if err != nil { + t.Errorf("failed to decode response: %v", err) + } + if respQueuic.Command != proto.PEEK_ACK { + t.Errorf("unexpected response command: %v", respQueuic.Command) + } + accept := proto.Queuic{ + Command: proto.ACCEPT, + QueueName: queueName, + QueuicItem: proto.QueuicItem{ + Id: respQueuic.QueuicItem.Id, + }, + } + acceptBytes, err := proto.Encode(&accept) + if err != nil { + t.Errorf("failed to encode request: %v", err) + } + resp, err = sendUdpMessage(acceptBytes) + if err != nil { + t.Errorf("%v", err) + return + } + respQueuic, err = proto.Decode(resp) + if err != nil { + t.Errorf("failed to decode response: %v", err) + } + if respQueuic.Command != proto.ACCEPT_ACK { + t.Errorf("unexpected response command: %v", respQueuic.Command) + } +} + +func sendUdpMessage(send []byte) ([]byte, error) { + key := sha256.Sum256([]byte("test")) + fmt.Printf("client key: %x\n", key) + encrypted, err := proto.Encrypt(key[:], send) + if err != nil { + return nil, err + } + s, err := net.ResolveUDPAddr("udp4", "localhost:9523") + if err != nil { + return nil, err + } + c, err := net.DialUDP("udp4", nil, s) + if err != nil { + return nil, err + } + defer c.Close() + _, err = c.Write(encrypted) + if err != nil { + return nil, err + } + fmt.Println("sent UDP Message, len: ", len(encrypted)) + buffer := make([]byte, 1024) + n, _, err := c.ReadFromUDP(buffer) + if err != nil { + return nil, err + } + if n > 0 { + fmt.Println("received UDP Message, len: ", len(buffer)) + decrypted, err := proto.Decrypt(key[:], buffer[:n]) + if err != nil { + return nil, err + } + return decrypted, nil + } else { + return nil, fmt.Errorf("udp failed") + } +}