Skip to content

Commit

Permalink
feat: add channel.ackDuration()
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Aug 14, 2023
1 parent f12e9f8 commit b7fa899
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 19 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning].

## [Unreleased]

- Add `channel.ackDuration()` to get the number of milliseconds to wait for a subscription confirmation/rejection. ([@palkan][])

- Add `client.subscribeAsync` to issue a `subscribe` command without waiting for the confirmation. ([@palkan][])

- Fix `k6` / Logrus compatibility issue. ([@palkan][])
Expand Down
29 changes: 28 additions & 1 deletion channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"reflect"
"strings"
"sync"
"time"

"github.com/dop251/goja"
Expand All @@ -18,11 +19,26 @@ type Channel struct {
logger *logrus.Entry

confCh chan bool
ackMu sync.Mutex
readCh chan *cableMsg

asyncHandlers []goja.Callable

ignoreReads bool

createdAt time.Time
ackedAt time.Time
}

func NewChannel(c *Client, identifier string) *Channel {
return &Channel{
client: c,
identifier: identifier,
logger: c.logger,
readCh: make(chan *cableMsg, 2048),
confCh: make(chan bool, 1),
createdAt: time.Now(),
}
}

// Perform sends passed action with additional data to the channel
Expand Down Expand Up @@ -123,7 +139,18 @@ func (ch *Channel) OnMessage(fn goja.Value) {
ch.asyncHandlers = append(ch.asyncHandlers, f)
}

func (ch *Channel) handleAck(val bool) {
func (ch *Channel) AckDuration() int64 {
ch.ackMu.Lock()
defer ch.ackMu.Unlock()

return ch.ackedAt.Sub(ch.createdAt).Milliseconds()
}

func (ch *Channel) handleAck(val bool, when time.Time) {
ch.ackMu.Lock()
defer ch.ackMu.Unlock()

ch.ackedAt = when
ch.confCh <- val
}

Expand Down
17 changes: 8 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type cableMsg struct {
Identifier string `json:"identifier,omitempty"`
Data string `json:"data,omitempty"`
Message interface{} `json:"message,omitempty"`

receivedAt time.Time
}

type Client struct {
Expand Down Expand Up @@ -110,17 +112,12 @@ func (c *Client) SubscribeAsync(channelName string, paramsIn goja.Value) (*Subsc
return &SubscribePromise{client: c, channel: c.channels[identifier]}, nil
}

channel := NewChannel(c, identifier)

if err := c.send(&cableMsg{Command: "subscribe", Identifier: identifier}); err != nil {
return nil, err
}

channel := &Channel{
client: c,
identifier: identifier,
logger: c.logger,
readCh: make(chan *cableMsg, 2048),
confCh: make(chan bool, 1),
}
c.channels[identifier] = channel

return &SubscribePromise{client: c, channel: channel}, nil
Expand Down Expand Up @@ -213,9 +210,9 @@ func (c *Client) handleLoop() {
if c.channels[msg.Identifier] != nil {
switch msg.Type {
case "confirm_subscription":
c.channels[msg.Identifier].handleAck(true)
c.channels[msg.Identifier].handleAck(true, msg.receivedAt)
case "reject_subscription":
c.channels[msg.Identifier].handleAck(false)
c.channels[msg.Identifier].handleAck(false, msg.receivedAt)
default:
c.channels[msg.Identifier].handleIncoming(msg)
}
Expand Down Expand Up @@ -303,6 +300,8 @@ func (c *Client) receiveIgnoringPing() (*cableMsg, error) {
continue
}

msg.receivedAt = time.Now()

timestamp := int64(time.Now().UnixNano()) / 1_000_000

if data, ok := msg.Message.(map[string]interface{}); ok {
Expand Down
21 changes: 12 additions & 9 deletions examples/chat.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { randomIntBetween } from "https://jslib.k6.io/k6-utils/1.1.0/index.js";
import { Trend } from "k6/metrics";

let rttTrend = new Trend("rtt", true);
let subTrend = new Trend("suback", true);

let userId = `100${__VU}`;
let userName = `Kay${userId}`;
Expand Down Expand Up @@ -43,33 +44,35 @@ export default function () {
});

if (
!check(client, {
"successful connection": (obj) => obj,
})
!check(client, {
"successful connection": (obj) => obj,
})
) {
fail("connection failed");
}

let channel = client.subscribe("ChatChannel", { id: WORKSPACE });

if (
!check(channel, {
"successful subscription": (obj) => obj,
})
!check(channel, {
"successful subscription": (obj) => obj,
})
) {
fail("failed to subscribe");
}

subTrend.add(channel.ackDuration());

for (let i = 0; i < MESSAGES_NUM; i++) {
let startMessage = Date.now();
channel.perform("speak", { message: `hello from ${userName}` });

let message = channel.receive({ author_id: userId });

if (
!check(message, {
"received its own message": (obj) => obj,
})
!check(message, {
"received its own message": (obj) => obj,
})
) {
fail("expected message hasn't been received");
}
Expand Down

0 comments on commit b7fa899

Please sign in to comment.