Skip to content

Commit

Permalink
Merge pull request #830 from nyaruka/contact_fires
Browse files Browse the repository at this point in the history
Insert contact fire for session timeout
  • Loading branch information
rowanseymour authored Jan 28, 2025
2 parents 34ecc58 + 8e07f3d commit d0e4b96
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 46 deletions.
2 changes: 1 addition & 1 deletion backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func (b *backend) OnSendComplete(ctx context.Context, msg courier.MsgOut, status
// if message was successfully sent, and we have a session timeout, update it
wasSuccess := status.Status() == courier.MsgStatusWired || status.Status() == courier.MsgStatusSent || status.Status() == courier.MsgStatusDelivered || status.Status() == courier.MsgStatusRead
if wasSuccess && dbMsg.SessionTimeout_ != 0 {
if err := updateSessionTimeout(ctx, b, dbMsg.SessionID_, *dbMsg.SessionModifiedOn_, dbMsg.SessionTimeout_); err != nil {
if err := b.insertTimeoutFire(ctx, dbMsg); err != nil {
slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_)
}
}
Expand Down
42 changes: 37 additions & 5 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/nyaruka/courier/queue"
"github.com/nyaruka/courier/test"
"github.com/nyaruka/courier/utils/clogs"
"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/dbutil/assertdb"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/i18n"
Expand Down Expand Up @@ -1337,15 +1338,46 @@ func (ts *BackendTestSuite) TestChannelEvent() {
func (ts *BackendTestSuite) TestSessionTimeout() {
ctx := context.Background()

// parse from an iso date
t, err := time.Parse("2006-01-02 15:04:05.000000-07", "2018-12-04 11:52:20.900234-08")
dates.SetNowFunc(dates.NewSequentialNow(time.Date(2025, 1, 28, 20, 43, 34, 157379218, time.UTC), time.Second))
defer dates.SetNowFunc(time.Now)

msgJSON := `{
"uuid": "54c893b9-b026-44fc-a490-50aed0361c3f",
"id": 204,
"org_id": 1,
"text": "Test message 21",
"contact_id": 100,
"contact_urn_id": 14,
"channel_uuid": "f3ad3eb6-d00d-4dc3-92e9-9f34f32940ba",
"urn": "telegram:3527065",
"created_on": "2017-07-21T19:22:23.242757Z",
"high_priority": true,
"session_id": 12345,
"session_timeout": 3600,
"session_modified_on": "2025-01-28T20:43:34.157379218Z"
}`

msg := &Msg{}
jsonx.MustUnmarshal([]byte(msgJSON), msg)

err := ts.b.insertTimeoutFire(ctx, msg)
ts.NoError(err)

err = updateSessionTimeout(ctx, ts.b, SessionID(1), t, 300)
assertdb.Query(ts.T(), ts.b.db, `SELECT org_id, contact_id, fire_type, scope, extra->>'session_id' AS session_id, extra->>'session_modified_on' AS session_modified_on FROM contacts_contactfire`).
Columns(map[string]any{
"org_id": int64(1),
"contact_id": int64(100),
"fire_type": "T",
"scope": "",
"session_id": "12345",
"session_modified_on": "2025-01-28T20:43:34.157379218Z",
})

// if there's a conflict (e.g. in this case trying to add same timeout again), it should be ignored
err = ts.b.insertTimeoutFire(ctx, msg)
ts.NoError(err)

// make sure that took
assertdb.Query(ts.T(), ts.b.db, `SELECT count(*) from flows_flowsession WHERE timeout_on > NOW()`).Returns(1)
assertdb.Query(ts.T(), ts.b.db, `SELECT count(*) FROM contacts_contactfire`).Returns(1)
}

func (ts *BackendTestSuite) TestMailroomEvents() {
Expand Down
21 changes: 12 additions & 9 deletions backends/rapidpro/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ CREATE TABLE contacts_contacturn (
UNIQUE (org_id, identity)
);

DROP TABLE IF EXISTS contacts_contactfire CASCADE;
CREATE TABLE IF NOT EXISTS contacts_contactfire (
id serial primary key,
org_id integer NOT NULL,
contact_id integer references contacts_contact(id) on delete cascade,
fire_type character varying(1) NOT NULL,
scope character varying(128) NOT NULL,
extra jsonb,
fire_on timestamp with time zone NOT NULL,
UNIQUE (contact_id, fire_type, scope)
);

DROP TABLE IF EXISTS msgs_optin CASCADE;
CREATE TABLE msgs_optin (
id serial primary key,
Expand Down Expand Up @@ -128,15 +140,6 @@ CREATE TABLE channels_channelevent (
log_uuids uuid[]
);

DROP TABLE IF EXISTS flows_flowsession CASCADE;
CREATE TABLE flows_flowsession (
id serial primary key,
status character varying(1) NOT NULL,
modified_on timestamp with time zone NOT NULL,
timeout_on timestamp with time zone NULL,
wait_started_on timestamp with time zone
);

DROP TABLE IF EXISTS msgs_media CASCADE;
CREATE TABLE IF NOT EXISTS msgs_media (
id serial primary key,
Expand Down
26 changes: 0 additions & 26 deletions backends/rapidpro/sessions.go

This file was deleted.

5 changes: 0 additions & 5 deletions backends/rapidpro/testdata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,3 @@ INSERT INTO msgs_media("id", "uuid", "org_id", "content_type", "url", "path", "s
INSERT INTO msgs_media("id", "uuid", "org_id", "content_type", "url", "path", "size", "duration", "width", "height", "original_id")
VALUES(102, '514c552c-e585-40e2-938a-fe9450172da8', 1, 'audio/mp4', 'http://nyaruka.s3.com/orgs/1/media/514c/514c552c-e585-40e2-938a-fe9450172da8/test.m4a', '/orgs/1/media/514c/514c552c-e585-40e2-938a-fe9450172da8/test.m4a', 114, 500, 0, 0, 101);

/** Simple session */
DELETE from flows_flowsession;
INSERT INTO flows_flowsession("id", "status", "modified_on", "wait_started_on")
VALUES(1, 'W', '2018-12-04 11:52:20.900234-08', '2018-12-04 11:52:20.900123-08'),
(2, 'C', '2018-12-04 11:52:20.900456-08', '2018-12-04 11:52:20.900345-08');
30 changes: 30 additions & 0 deletions backends/rapidpro/timeouts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package rapidpro

import (
"context"
"fmt"
"time"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/jsonx"
)

// SessionID is our type for RapidPro session ids
type SessionID int64

const sqlInsertTimeoutFire = `
INSERT INTO contacts_contactfire(org_id, contact_id, fire_type, scope, extra, fire_on)
VALUES($1, $2, 'T', '', $3, $4)
ON CONFLICT DO NOTHING`

// insertTimeoutFire inserts a timeout fire for the session associated with the given msg
func (b *backend) insertTimeoutFire(ctx context.Context, m *Msg) error {
extra := map[string]any{"session_id": m.SessionID_, "session_modified_on": m.SessionModifiedOn_}
timeoutOn := dates.Now().Add(time.Duration(m.SessionTimeout_) * time.Second)

_, err := b.db.ExecContext(ctx, sqlInsertTimeoutFire, m.OrgID_, m.ContactID_, jsonx.MustMarshal(extra), timeoutOn)
if err != nil {
return fmt.Errorf("error inserting session timeout contact fire for session #%d: %w", m.SessionID_, err)
}
return nil
}

0 comments on commit d0e4b96

Please sign in to comment.