Skip to content

Commit

Permalink
feat: add a system message to pass to actor after initialization (#249)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Feb 19, 2024
1 parent 0530449 commit b658076
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 26 deletions.
15 changes: 15 additions & 0 deletions actors/actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/tochemey/goakt/discovery"
"github.com/tochemey/goakt/discovery/nats"
"github.com/tochemey/goakt/log"
messagespb "github.com/tochemey/goakt/pb/messages/v1"
testspb "github.com/tochemey/goakt/test/data/pb/v1"
"github.com/travisjeffery/go-dynaport"
"go.uber.org/atomic"
Expand Down Expand Up @@ -83,6 +84,7 @@ func (p *Tester) PostStop(context.Context) error {
// Receive processes any message dropped into the actor mailbox without a reply
func (p *Tester) Receive(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *messagespb.Initialized:
case *testspb.TestSend:
p.counter.Inc()
case *testspb.TestPanic:
Expand Down Expand Up @@ -122,6 +124,7 @@ func (p *Monitor) PreStart(context.Context) error {

func (p *Monitor) Receive(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *messagespb.Initialized:
case *testspb.TestSend:
default:
panic(ErrUnhandled)
Expand Down Expand Up @@ -149,6 +152,7 @@ func (c *Monitored) PreStart(context.Context) error {

func (c *Monitored) Receive(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *messagespb.Initialized:
case *testspb.TestSend:
case *testspb.TestPanic:
panic("panicked")
Expand Down Expand Up @@ -177,6 +181,7 @@ func (x *UserActor) PostStop(_ context.Context) error {

func (x *UserActor) Receive(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *messagespb.Initialized:
case *testspb.TestLogin:
ctx.Response(new(testspb.TestLoginSuccess))
ctx.Become(x.Authenticated)
Expand All @@ -189,6 +194,7 @@ func (x *UserActor) Receive(ctx ReceiveContext) {
// Authenticated behavior is executed when the actor receive the TestAuth message
func (x *UserActor) Authenticated(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *messagespb.Initialized:
case *testspb.TestReadiness:
ctx.Response(new(testspb.TestReady))
ctx.UnBecome()
Expand All @@ -197,6 +203,7 @@ func (x *UserActor) Authenticated(ctx ReceiveContext) {

func (x *UserActor) CreditAccount(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *messagespb.Initialized:
case *testspb.CreditAccount:
ctx.Response(new(testspb.AccountCredited))
ctx.BecomeStacked(x.DebitAccount)
Expand All @@ -207,6 +214,7 @@ func (x *UserActor) CreditAccount(ctx ReceiveContext) {

func (x *UserActor) DebitAccount(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *messagespb.Initialized:
case *testspb.DebitAccount:
ctx.Response(new(testspb.AccountDebited))
ctx.UnBecomeStacked()
Expand All @@ -222,6 +230,7 @@ func (e *Exchanger) PreStart(context.Context) error {
func (e *Exchanger) Receive(ctx ReceiveContext) {
message := ctx.Message()
switch message.(type) {
case *messagespb.Initialized:
case *testspb.TestSend:
_ = ctx.Self().Tell(ctx.Context(), ctx.Sender(), new(testspb.TestSend))
case *testspb.TestReply:
Expand All @@ -247,6 +256,7 @@ func (x *Stasher) PreStart(context.Context) error {

func (x *Stasher) Receive(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *messagespb.Initialized:
case *testspb.TestStash:
ctx.Become(x.Ready)
ctx.Stash()
Expand All @@ -258,6 +268,7 @@ func (x *Stasher) Receive(ctx ReceiveContext) {

func (x *Stasher) Ready(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *messagespb.Initialized:
case *testspb.TestStash:
case *testspb.TestLogin:
ctx.Stash()
Expand Down Expand Up @@ -299,6 +310,7 @@ func (x *PostStopBreaker) PreStart(context.Context) error {

func (x *PostStopBreaker) Receive(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *messagespb.Initialized:
case *testspb.TestSend:
case *testspb.TestPanic:
panic("panicked")
Expand Down Expand Up @@ -348,6 +360,7 @@ func (x *Forwarder) PreStart(context.Context) error {

func (x *Forwarder) Receive(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *messagespb.Initialized:
case *testspb.TestBye:
ctx.Forward(x.actorRef)
}
Expand All @@ -369,6 +382,8 @@ func (d *Discarder) PreStart(context.Context) error {

func (d *Discarder) Receive(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *messagespb.Initialized:
// pass
default:
ctx.Unhandled()
}
Expand Down
7 changes: 7 additions & 0 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,13 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption
}
}

// push the started message into the actor mailbox
_ = Tell(ctx, pid, &messagespb.Initialized{
Id: pid.ActorPath().String(),
Timestamp: timestamppb.Now(),
Address: pid.ActorPath().RemoteAddress(),
})

// return the actor reference
return pid, nil
}
Expand Down
143 changes: 121 additions & 22 deletions pb/messages/v1/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions protos/goakt/messages/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ syntax = "proto3";

package messages.v1;

import "address/v1/address.proto";
import "google/protobuf/timestamp.proto";

option go_package = "github.com/tochemey/goakt/messages/v1;messagespb";

// Terminated is used to notify watching actors
Expand All @@ -12,3 +15,13 @@ message Terminated {}
// It is enqueued as ordinary messages.
// It will be handled after messages that were already queued in the mailbox.
message PoisonPill {}

// Initialized is used when an actor has successfully initialized
message Initialized {
// Specifies the actor id
string id = 1;
// Specifies the timestamp
google.protobuf.Timestamp timestamp = 2;
// Specifies the actor address
address.v1.Address address = 3;
}
17 changes: 13 additions & 4 deletions testkit/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/tochemey/goakt/actors"
messagespb "github.com/tochemey/goakt/pb/messages/v1"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
Expand Down Expand Up @@ -64,10 +65,18 @@ func (x *probeActor) PreStart(_ context.Context) error {

// Receive handle message received
func (x *probeActor) Receive(ctx actors.ReceiveContext) {
// any message received is pushed to the queue
x.messageQueue <- message{
sender: ctx.Sender(),
payload: ctx.Message(),
switch ctx.Message().(type) {
// skip system message
case *messagespb.PoisonPill,
*messagespb.Terminated,
*messagespb.Initialized:
// pass
default:
// any message received is pushed to the queue
x.messageQueue <- message{
sender: ctx.Sender(),
payload: ctx.Message(),
}
}
}

Expand Down

0 comments on commit b658076

Please sign in to comment.