Skip to content

Commit

Permalink
Removed the Twilio Studio
Browse files Browse the repository at this point in the history
  • Loading branch information
ihor-palii committed Dec 16, 2024
1 parent cfd9833 commit 3c59ba3
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 329 deletions.
134 changes: 2 additions & 132 deletions core/models/starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import (
"context"
"database/sql/driver"
"encoding/json"
"github.com/nyaruka/goflow/flows"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"

"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/null"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -372,132 +371,3 @@ func ReadSessionHistory(data []byte) (*flows.SessionHistory, error) {
h := &flows.SessionHistory{}
return h, jsonx.Unmarshal(data, h)
}

// StudioFlowStart represents the top level studio flow start in our system
type StudioFlowStart struct {
s struct {
ID StartID `json:"start_id" db:"id"`
UUID uuids.UUID ` db:"uuid"`
OrgID OrgID `json:"org_id" db:"org_id"`
FlowSID string `json:"flow_sid" db:"flow_sid"`
Channel string `json:"channel" db:"channel"`

GroupIDs []GroupID `json:"group_ids,omitempty"`
ContactIDs []ContactID `json:"contact_ids,omitempty"`

Metadata map[string]interface{} `json:"metadata" db:"metadata"`
}
}

func (s *StudioFlowStart) ID() StartID { return s.s.ID }
func (s *StudioFlowStart) OrgID() OrgID { return s.s.OrgID }
func (s *StudioFlowStart) FlowSID() string { return s.s.FlowSID }
func (s *StudioFlowStart) GroupIDs() []GroupID { return s.s.GroupIDs }
func (s *StudioFlowStart) ContactIDs() []ContactID { return s.s.ContactIDs }
func (s *StudioFlowStart) Channel() string { return s.s.Channel }

func (s *StudioFlowStart) MarshalJSON() ([]byte, error) { return json.Marshal(s.s) }
func (s *StudioFlowStart) UnmarshalJSON(data []byte) error { return json.Unmarshal(data, &s.s) }

const loadContactPhonesSQL = `
SELECT
DISTINCT urns.path
FROM
contacts_contacturn urns
WHERE
scheme='tel' AND
urns.contact_id = ANY($1);
`

func (s *StudioFlowStart) LoadContactPhones(ctx context.Context, db *sqlx.DB, ids []int64) ([]string, error) {
rows, err := db.QueryxContext(ctx, loadContactPhonesSQL, pq.Array(ids))
if err != nil {
return nil, errors.Wrapf(err, "error querying urns for studio flow start: %d", s.ID())
}
defer rows.Close()

var path string
contactURNs := make([]string, 0)
for rows.Next() {
err := rows.Scan(&path)
if err != nil {
return nil, errors.Wrapf(err, "error getting urn")
}
contactURNs = append(contactURNs, path)
}
return contactURNs, nil
}

const selectTwilioConfigSQL = `
SELECT
s.config_json->>'ACCOUNT_SID' as account_sid,
s.config_json->>'ACCOUNT_TOKEN' as account_token
FROM (
SELECT
config::json as config_json
FROM orgs_org
WHERE id = $1
) s;
`

func (s *StudioFlowStart) LoadTwilioConfig(ctx context.Context, db *sqlx.DB) (string, string, error) {
rows, err := db.QueryxContext(ctx, selectTwilioConfigSQL, s.s.OrgID)
if err != nil {
return "", "", errors.Wrapf(err, "error querying urns for studio flow start: %d", s.ID())
}
defer rows.Close()

var accountSID string
var accountToken string
for rows.Next() {
err := rows.Scan(&accountSID, &accountToken)
if err != nil {
return "", "", errors.Wrapf(err, "error selecting twilio config")
}
}
return accountSID, accountToken, nil
}

func (s *StudioFlowStart) WithMetadata(metadata map[string]interface{}) *StudioFlowStart {
s.s.Metadata = metadata
return s
}

const updateStudioFlowMetadata = `
UPDATE flows_studioflowstart SET metadata = $2, modified_on = NOW() WHERE id = $1
`

func (s *StudioFlowStart) UpdateMetadata(ctx context.Context, db *sqlx.DB) error {
if encodedMetadata, err := json.Marshal(s.s.Metadata); err == nil {
if _, err = db.ExecContext(ctx, updateStudioFlowMetadata, s.ID(), encodedMetadata); err != nil {
return errors.Wrapf(err, "error updating metadata")
}
} else {
return errors.Wrapf(err, "error marshaling metadata")
}
return nil
}

func (s *StudioFlowStart) MarkStartStarted(ctx context.Context, db *sqlx.DB) error {
_, err := db.ExecContext(ctx, "UPDATE flows_studioflowstart SET status = 'S', modified_on = NOW() WHERE id = $1", s.ID())
if err != nil {
return errors.Wrapf(err, "error setting start as started")
}
return nil
}

func (s *StudioFlowStart) MarkStartComplete(ctx context.Context, db *sqlx.DB) error {
_, err := db.ExecContext(ctx, "UPDATE flows_studioflowstart SET status = 'C', modified_on = NOW() WHERE id = $1", s.ID())
if err != nil {
return errors.Wrapf(err, "error setting start as complete")
}
return nil
}

func (s *StudioFlowStart) MarkStartFailed(ctx context.Context, db *sqlx.DB) error {
_, err := db.ExecContext(ctx, "UPDATE flows_studioflowstart SET status = 'F', modified_on = NOW() WHERE id = $1", s.ID())
if err != nil {
return errors.Wrapf(err, "error setting start as failed")
}
return nil
}
3 changes: 0 additions & 3 deletions core/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ const (
// StartFlow is our task type to start a flow
StartFlow = "start_flow"

// StartStudioFlow is our task type to start Twilio Studio Flow
StartStudioFlow = "start_studio_flow"

// StartFlowBatch is our task for starting a flow batch
StartFlowBatch = "start_flow_batch"

Expand Down
135 changes: 0 additions & 135 deletions core/tasks/starts/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@ package starts
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"time"

"github.com/nyaruka/goflow/contactql"
Expand All @@ -29,7 +25,6 @@ const (
func init() {
mailroom.AddTaskFunction(queue.StartFlow, handleFlowStart)
mailroom.AddTaskFunction(queue.StartFlowBatch, handleFlowStartBatch)
mailroom.AddTaskFunction(queue.StartStudioFlow, handleStudioFlowStart)
}

// handleFlowStart creates all the batches of contacts to start in a flow
Expand Down Expand Up @@ -233,133 +228,3 @@ func handleFlowStartBatch(ctx context.Context, rt *runtime.Runtime, task *queue.

return err
}

type RequestSender interface {
Do(*http.Request) (*http.Response, error)
}

var requestSender RequestSender = http.DefaultClient

func handleStudioFlowStart(ctx context.Context, rt *runtime.Runtime, task *queue.Task) error {
db := rt.DB
ctx, cancel := context.WithTimeout(ctx, time.Minute*60)
defer cancel()

startTask := &models.StudioFlowStart{}
err := json.Unmarshal(task.Task, startTask)
if err != nil {
return errors.Wrapf(err, "error unmarshalling studio flow start task: %s", string(task.Task))
}

accountSID, accountToken, err := startTask.LoadTwilioConfig(ctx, db)
if err != nil {
return errors.Wrapf(err, "error loading studio flow start channel")
}

if accountSID == "" {
return errors.Wrapf(err, "missing account sid for %d org", task.OrgID)
}

if accountToken == "" {
return errors.Wrapf(err, "missing account auth token for %d org", task.OrgID)
}

contactIDsSet := make(map[models.ContactID]bool)
// we are building a set of contact ids, start with the explicit ones
for _, id := range startTask.ContactIDs() {
contactIDsSet[id] = true
}

// now add all the ids for our groups
if len(startTask.GroupIDs()) > 0 {
rows, err := db.QueryxContext(ctx, `SELECT contact_id FROM contacts_contactgroup_contacts WHERE contactgroup_id = ANY($1)`, pq.Array(startTask.GroupIDs()))
if err != nil {
return errors.Wrapf(err, "error selecting contacts for groups")
}
defer rows.Close()

var contactID models.ContactID
for rows.Next() {
err := rows.Scan(&contactID)
if err != nil {
return errors.Wrapf(err, "error scanning contact id")
}
contactIDsSet[contactID] = true
}
}

// skip if there is no contacts selected
if len(contactIDsSet) == 0 {
return nil
}

contactIDs := make([]int64, 0, len(contactIDsSet))
for contactID := range contactIDsSet {
contactIDs = append(contactIDs, int64(contactID))
}

// 80 mps limiting for the twilio
chunkSize := 80
chunkNumber := 0
successCount := 0
failureCount := 0
totalContactIDs := len(contactIDs)
contactIDChunkSelector := func(chunkIndex int) []int64 {
start := chunkIndex * chunkSize
end := start + chunkSize
if start > totalContactIDs {
return []int64{}
}
if end > totalContactIDs {
end = totalContactIDs
}
return contactIDs[start:end]
}
sendURL := fmt.Sprintf("https://studio.twilio.com/v2/Flows/%s/Executions", startTask.FlowSID())
for range time.Tick(1 * time.Second) {
contactIDsChunk := contactIDChunkSelector(chunkNumber)
if len(contactIDsChunk) == 0 {
break
}

contactPhones, err := startTask.LoadContactPhones(ctx, db, contactIDsChunk)
if err != nil {
startTask.MarkStartFailed(ctx, db)
return errors.Wrapf(err, "error getting contact urns")
}

// send requests to twilio
for _, phone := range contactPhones {
form := url.Values{
"To": []string{phone},
"From": []string{startTask.Channel()},
}

req, err := http.NewRequest(http.MethodPost, sendURL, strings.NewReader(form.Encode()))
if err != nil {
startTask.MarkStartFailed(ctx, db)
return err
}
req.SetBasicAuth(accountSID, accountToken)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("Accept", "application/json")

resp, err := requestSender.Do(req)
if err != nil || resp.StatusCode != 201 {
failureCount++
} else {
successCount++
}
}
chunkNumber++

startTask.WithMetadata(map[string]interface{}{
"total_contacts": totalContactIDs,
"success_count": successCount,
"failure_count": failureCount,
"processed_batches": chunkNumber,
"batch_size": chunkSize,
}).UpdateMetadata(ctx, db)
}
return startTask.MarkStartComplete(ctx, db)
}
60 changes: 1 addition & 59 deletions core/tasks/starts/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package starts
import (
"encoding/json"
"fmt"
"net/http"
"testing"

"github.com/nyaruka/gocommon/dbutil/assertdb"
Expand All @@ -15,9 +14,9 @@ import (
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"

"github.com/olivere/elastic/v7"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/olivere/elastic/v7"
)

func TestStarts(t *testing.T) {
Expand Down Expand Up @@ -324,60 +323,3 @@ func TestStarts(t *testing.T) {
}
}
}

type mockHttpClient struct {
LastRequest *http.Request
}

func (mhc *mockHttpClient) Do(req *http.Request) (*http.Response, error) {
mhc.LastRequest = req
return &http.Response{StatusCode: 200}, nil
}

const createStudioFlowStartTable = `
UPDATE orgs_org SET config = '{"ACCOUNT_SID": "account_sid", "ACCOUNT_TOKEN": "account_token"}' WHERE id = 1;
CREATE TABLE IF NOT EXISTS flows_studioflowstart
(
id serial not null constraint flows_studioflowstart_pkey primary key,
uuid uuid not null constraint flows_studioflowstart_uuid_key unique,
flow_sid varchar(64) not null,
status varchar(1) not null,
org_id integer not null constraint flows_studioflowstart_org_id_8b06fe26_fk_orgs_org_id references orgs_org deferrable initially deferred,
channel_id integer not null constraint flows_studioflowstar_channel_id_15399058_fk_channels_ references channels_channel deferrable initially deferred,
created_by_id integer constraint flows_studioflowstart_created_by_id_650bb2e5_fk_auth_user_id references auth_user deferrable initially deferred,
metadata jsonb not null,
created_on timestamp with time zone not null,
modified_on timestamp with time zone not null
);
`

func TestStudioFlowStarts(t *testing.T) {
ctx, rt, db, _ := testsuite.Get()

mes := testsuite.NewMockElasticServer()
defer mes.Close()

startTask := map[string]interface{}{
"org_id": 1,
"start_id": 1,
"flow_sid": "FW2932f221ca8741fb714ff97df7986172",
"channel_id": testdata.TwilioChannel.ID,
"contact_ids": []int64{int64(testdata.George.ID)},
}
startTaskEncoded, _ := json.Marshal(startTask)
task := &queue.Task{
OrgID: 1,
Type: queue.StartStudioFlow,
Task: startTaskEncoded,
}
db.MustExecContext(ctx, createStudioFlowStartTable)
db.MustExecContext(ctx, `INSERT INTO flows_studioflowstart(org_id, uuid, status, metadata, flow_sid, channel, created_by_id, created_on, modified_on)
VALUES(1, $1, 'P', '{}', 'FW2932f221ca8741fb714ff97df7986172', $2, $3, now(), now());`,
uuids.New(), testdata.TwilioChannel.ID, int64(2),
)

requestSender = &mockHttpClient{}
err := handleStudioFlowStart(ctx, rt, task)
assert.NoError(t, err)
requestSender = http.DefaultClient
}

0 comments on commit 3c59ba3

Please sign in to comment.