Skip to content

Commit

Permalink
Merge pull request #92 from communityconnectlabs/ccl-v7.0
Browse files Browse the repository at this point in the history
Release 7.11
  • Loading branch information
teehamaral authored Jan 3, 2025
2 parents 5e126cc + 6150bab commit 6c532eb
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 352 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ For Trackable Links the follow parameters should be set up as env variable:
For Spell Checker the follow parameter should be set up as env variable:
- `MAILROOM_SPELL_CHECKER_KEY`: The Bing Spell Checker API key

Others
- `MAILROOM_OPT_BACK_IN_KEYWORDS`: By default it's REJOIN START but you can add more separate by space. Once the contact is stopped, they will join back only if they type one of the opt back in keywords.

# Development

Once you've checked out the code, you can build Mailroom with:
Expand Down
134 changes: 2 additions & 132 deletions core/models/starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import (
"context"
"database/sql/driver"
"encoding/json"
"github.com/nyaruka/goflow/flows"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"

"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/null"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -372,132 +371,3 @@ func ReadSessionHistory(data []byte) (*flows.SessionHistory, error) {
h := &flows.SessionHistory{}
return h, jsonx.Unmarshal(data, h)
}

// StudioFlowStart represents the top level studio flow start in our system
type StudioFlowStart struct {
s struct {
ID StartID `json:"start_id" db:"id"`
UUID uuids.UUID ` db:"uuid"`
OrgID OrgID `json:"org_id" db:"org_id"`
FlowSID string `json:"flow_sid" db:"flow_sid"`
Channel string `json:"channel" db:"channel"`

GroupIDs []GroupID `json:"group_ids,omitempty"`
ContactIDs []ContactID `json:"contact_ids,omitempty"`

Metadata map[string]interface{} `json:"metadata" db:"metadata"`
}
}

func (s *StudioFlowStart) ID() StartID { return s.s.ID }
func (s *StudioFlowStart) OrgID() OrgID { return s.s.OrgID }
func (s *StudioFlowStart) FlowSID() string { return s.s.FlowSID }
func (s *StudioFlowStart) GroupIDs() []GroupID { return s.s.GroupIDs }
func (s *StudioFlowStart) ContactIDs() []ContactID { return s.s.ContactIDs }
func (s *StudioFlowStart) Channel() string { return s.s.Channel }

func (s *StudioFlowStart) MarshalJSON() ([]byte, error) { return json.Marshal(s.s) }
func (s *StudioFlowStart) UnmarshalJSON(data []byte) error { return json.Unmarshal(data, &s.s) }

const loadContactPhonesSQL = `
SELECT
DISTINCT urns.path
FROM
contacts_contacturn urns
WHERE
scheme='tel' AND
urns.contact_id = ANY($1);
`

func (s *StudioFlowStart) LoadContactPhones(ctx context.Context, db *sqlx.DB, ids []int64) ([]string, error) {
rows, err := db.QueryxContext(ctx, loadContactPhonesSQL, pq.Array(ids))
if err != nil {
return nil, errors.Wrapf(err, "error querying urns for studio flow start: %d", s.ID())
}
defer rows.Close()

var path string
contactURNs := make([]string, 0)
for rows.Next() {
err := rows.Scan(&path)
if err != nil {
return nil, errors.Wrapf(err, "error getting urn")
}
contactURNs = append(contactURNs, path)
}
return contactURNs, nil
}

const selectTwilioConfigSQL = `
SELECT
s.config_json->>'ACCOUNT_SID' as account_sid,
s.config_json->>'ACCOUNT_TOKEN' as account_token
FROM (
SELECT
config::json as config_json
FROM orgs_org
WHERE id = $1
) s;
`

func (s *StudioFlowStart) LoadTwilioConfig(ctx context.Context, db *sqlx.DB) (string, string, error) {
rows, err := db.QueryxContext(ctx, selectTwilioConfigSQL, s.s.OrgID)
if err != nil {
return "", "", errors.Wrapf(err, "error querying urns for studio flow start: %d", s.ID())
}
defer rows.Close()

var accountSID string
var accountToken string
for rows.Next() {
err := rows.Scan(&accountSID, &accountToken)
if err != nil {
return "", "", errors.Wrapf(err, "error selecting twilio config")
}
}
return accountSID, accountToken, nil
}

func (s *StudioFlowStart) WithMetadata(metadata map[string]interface{}) *StudioFlowStart {
s.s.Metadata = metadata
return s
}

const updateStudioFlowMetadata = `
UPDATE flows_studioflowstart SET metadata = $2, modified_on = NOW() WHERE id = $1
`

func (s *StudioFlowStart) UpdateMetadata(ctx context.Context, db *sqlx.DB) error {
if encodedMetadata, err := json.Marshal(s.s.Metadata); err == nil {
if _, err = db.ExecContext(ctx, updateStudioFlowMetadata, s.ID(), encodedMetadata); err != nil {
return errors.Wrapf(err, "error updating metadata")
}
} else {
return errors.Wrapf(err, "error marshaling metadata")
}
return nil
}

func (s *StudioFlowStart) MarkStartStarted(ctx context.Context, db *sqlx.DB) error {
_, err := db.ExecContext(ctx, "UPDATE flows_studioflowstart SET status = 'S', modified_on = NOW() WHERE id = $1", s.ID())
if err != nil {
return errors.Wrapf(err, "error setting start as started")
}
return nil
}

func (s *StudioFlowStart) MarkStartComplete(ctx context.Context, db *sqlx.DB) error {
_, err := db.ExecContext(ctx, "UPDATE flows_studioflowstart SET status = 'C', modified_on = NOW() WHERE id = $1", s.ID())
if err != nil {
return errors.Wrapf(err, "error setting start as complete")
}
return nil
}

func (s *StudioFlowStart) MarkStartFailed(ctx context.Context, db *sqlx.DB) error {
_, err := db.ExecContext(ctx, "UPDATE flows_studioflowstart SET status = 'F', modified_on = NOW() WHERE id = $1", s.ID())
if err != nil {
return errors.Wrapf(err, "error setting start as failed")
}
return nil
}
3 changes: 0 additions & 3 deletions core/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ const (
// StartFlow is our task type to start a flow
StartFlow = "start_flow"

// StartStudioFlow is our task type to start Twilio Studio Flow
StartStudioFlow = "start_studio_flow"

// StartFlowBatch is our task for starting a flow batch
StartFlowBatch = "start_flow_batch"

Expand Down
51 changes: 32 additions & 19 deletions core/tasks/handler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nfnt/resize"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/storage"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/excellent/types"
Expand Down Expand Up @@ -596,27 +596,40 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent, s
// stopped contact? they are unstopped if they send us an incoming message
newContact := event.NewContact
if modelContact.Status() == models.ContactStatusStopped {
err := modelContact.Unstop(ctx, rt.DB)
if err != nil {
return errors.Wrapf(err, "error unstopping contact")
}
optBackInKeywords := strings.Split(rt.Config.OptBackInKeywords, " ")
if utils.StringSliceContains(optBackInKeywords, event.Text, false) {
err := modelContact.Unstop(ctx, rt.DB)
if err != nil {
return errors.Wrapf(err, "error unstopping contact")
}

err = models.AddContactToOptOutedGroups(ctx, rt, event.OrgID, modelContact.ID())
if err != nil {
return errors.Wrapf(err, "error adding contact to groups")
}
err = models.AddContactToOptOutedGroups(ctx, rt, event.OrgID, modelContact.ID())
if err != nil {
return errors.Wrapf(err, "error adding contact to groups")
}

newContact = true
newContact = true

// reload contact with updated status
contacts, err = models.LoadContacts(ctx, rt.ReadonlyDB, oa, []models.ContactID{event.ContactID})
if err != nil {
return errors.Wrapf(err, "error loading contact")
}
modelContact = contacts[0]
contact, err = modelContact.FlowContact(oa)
if err != nil {
return errors.Wrapf(err, "error creating flow contact")
// reload contact with updated status
contacts, err = models.LoadContacts(ctx, rt.ReadonlyDB, oa, []models.ContactID{event.ContactID})
if err != nil {
return errors.Wrapf(err, "error loading contact")
}
modelContact = contacts[0]
contact, err = modelContact.FlowContact(oa)
if err != nil {
return errors.Wrapf(err, "error creating flow contact")
}
} else {
msgIn := flows.NewMsgIn(event.MsgUUID, event.URN, channel.ChannelReference(), event.Text, event.Attachments)
msgIn.SetExternalID(string(event.MsgExternalID))
msgIn.SetID(event.MsgID)

err = handleAsInbox(ctx, rt, oa, contact, msgIn, topupID, nil)
if err != nil {
return errors.Wrapf(err, "error handling inbox message")
}
return nil
}
}

Expand Down
135 changes: 0 additions & 135 deletions core/tasks/starts/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@ package starts
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"time"

"github.com/nyaruka/goflow/contactql"
Expand All @@ -29,7 +25,6 @@ const (
func init() {
mailroom.AddTaskFunction(queue.StartFlow, handleFlowStart)
mailroom.AddTaskFunction(queue.StartFlowBatch, handleFlowStartBatch)
mailroom.AddTaskFunction(queue.StartStudioFlow, handleStudioFlowStart)
}

// handleFlowStart creates all the batches of contacts to start in a flow
Expand Down Expand Up @@ -233,133 +228,3 @@ func handleFlowStartBatch(ctx context.Context, rt *runtime.Runtime, task *queue.

return err
}

type RequestSender interface {
Do(*http.Request) (*http.Response, error)
}

var requestSender RequestSender = http.DefaultClient

func handleStudioFlowStart(ctx context.Context, rt *runtime.Runtime, task *queue.Task) error {
db := rt.DB
ctx, cancel := context.WithTimeout(ctx, time.Minute*60)
defer cancel()

startTask := &models.StudioFlowStart{}
err := json.Unmarshal(task.Task, startTask)
if err != nil {
return errors.Wrapf(err, "error unmarshalling studio flow start task: %s", string(task.Task))
}

accountSID, accountToken, err := startTask.LoadTwilioConfig(ctx, db)
if err != nil {
return errors.Wrapf(err, "error loading studio flow start channel")
}

if accountSID == "" {
return errors.Wrapf(err, "missing account sid for %d org", task.OrgID)
}

if accountToken == "" {
return errors.Wrapf(err, "missing account auth token for %d org", task.OrgID)
}

contactIDsSet := make(map[models.ContactID]bool)
// we are building a set of contact ids, start with the explicit ones
for _, id := range startTask.ContactIDs() {
contactIDsSet[id] = true
}

// now add all the ids for our groups
if len(startTask.GroupIDs()) > 0 {
rows, err := db.QueryxContext(ctx, `SELECT contact_id FROM contacts_contactgroup_contacts WHERE contactgroup_id = ANY($1)`, pq.Array(startTask.GroupIDs()))
if err != nil {
return errors.Wrapf(err, "error selecting contacts for groups")
}
defer rows.Close()

var contactID models.ContactID
for rows.Next() {
err := rows.Scan(&contactID)
if err != nil {
return errors.Wrapf(err, "error scanning contact id")
}
contactIDsSet[contactID] = true
}
}

// skip if there is no contacts selected
if len(contactIDsSet) == 0 {
return nil
}

contactIDs := make([]int64, 0, len(contactIDsSet))
for contactID := range contactIDsSet {
contactIDs = append(contactIDs, int64(contactID))
}

// 80 mps limiting for the twilio
chunkSize := 80
chunkNumber := 0
successCount := 0
failureCount := 0
totalContactIDs := len(contactIDs)
contactIDChunkSelector := func(chunkIndex int) []int64 {
start := chunkIndex * chunkSize
end := start + chunkSize
if start > totalContactIDs {
return []int64{}
}
if end > totalContactIDs {
end = totalContactIDs
}
return contactIDs[start:end]
}
sendURL := fmt.Sprintf("https://studio.twilio.com/v2/Flows/%s/Executions", startTask.FlowSID())
for range time.Tick(1 * time.Second) {
contactIDsChunk := contactIDChunkSelector(chunkNumber)
if len(contactIDsChunk) == 0 {
break
}

contactPhones, err := startTask.LoadContactPhones(ctx, db, contactIDsChunk)
if err != nil {
startTask.MarkStartFailed(ctx, db)
return errors.Wrapf(err, "error getting contact urns")
}

// send requests to twilio
for _, phone := range contactPhones {
form := url.Values{
"To": []string{phone},
"From": []string{startTask.Channel()},
}

req, err := http.NewRequest(http.MethodPost, sendURL, strings.NewReader(form.Encode()))
if err != nil {
startTask.MarkStartFailed(ctx, db)
return err
}
req.SetBasicAuth(accountSID, accountToken)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("Accept", "application/json")

resp, err := requestSender.Do(req)
if err != nil || resp.StatusCode != 201 {
failureCount++
} else {
successCount++
}
}
chunkNumber++

startTask.WithMetadata(map[string]interface{}{
"total_contacts": totalContactIDs,
"success_count": successCount,
"failure_count": failureCount,
"processed_batches": chunkNumber,
"batch_size": chunkSize,
}).UpdateMetadata(ctx, db)
}
return startTask.MarkStartComplete(ctx, db)
}
Loading

0 comments on commit 6c532eb

Please sign in to comment.