diff --git a/archiver.go b/archiver.go index b9c08ef..191a71f 100644 --- a/archiver.go +++ b/archiver.go @@ -18,6 +18,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/jmoiron/sqlx" "github.com/lib/pq" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -1280,6 +1281,121 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 return nil } +// DeleteBroadcasts deletes all broadcasts older than 90 days for the passed in org which have no active messages on them +func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, org Org) error { + start := time.Now() + threshhold := now.AddDate(0, 0, -org.ActiveDays) + + rows, err := db.QueryxContext(ctx, selectOldOrgBroadcasts, org.ID, threshhold) + if err != nil { + return err + } + defer rows.Close() + + count := 0 + for rows.Next() { + if count == 0 { + logrus.WithField("org_id", org.ID).Info("deleting broadcasts") + } + + // been deleting this org more than an hour? thats enough for today, exit out + if time.Since(start) > time.Hour { + break + } + + var broadcastID int64 + err := rows.Scan(&broadcastID) + if err != nil { + return errors.Wrap(err, "unable to get broadcast id") + } + + // make sure we have no active messages + var msgCount int64 + err = db.Get(&msgCount, `SELECT count(*) FROM msgs_msg WHERE broadcast_id = $1`, broadcastID) + if err != nil { + return errors.Wrapf(err, "unable to select number of msgs for broadcast: %d", broadcastID) + } + + if msgCount != 0 { + logrus.WithField("broadcast_id", broadcastID).WithField("org_id", org.ID).WithField("msg_count", msgCount).Warn("unable to delete broadcast, has messages still") + continue + } + + // we delete broadcasts in a transaction per broadcast + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return errors.Wrapf(err, "error starting transaction while deleting broadcast: %d", broadcastID) + } + + // delete contacts M2M + _, err = tx.Exec(`DELETE from msgs_broadcast_contacts WHERE broadcast_id = $1`, broadcastID) + if err != nil { + tx.Rollback() + return errors.Wrapf(err, "error deleting related contacts for broadcast: %d", broadcastID) + } + + // delete groups M2M + _, err = tx.Exec(`DELETE from msgs_broadcast_groups WHERE broadcast_id = $1`, broadcastID) + if err != nil { + tx.Rollback() + return errors.Wrapf(err, "error deleting related groups for broadcast: %d", broadcastID) + } + + // delete URNs M2M + _, err = tx.Exec(`DELETE from msgs_broadcast_urns WHERE broadcast_id = $1`, broadcastID) + if err != nil { + tx.Rollback() + return errors.Wrapf(err, "error deleting related urns for broadcast: %d", broadcastID) + } + + // delete counts associated with this broadcast + _, err = tx.Exec(`DELETE from msgs_broadcastmsgcount WHERE broadcast_id = $1`, broadcastID) + if err != nil { + tx.Rollback() + return errors.Wrapf(err, "error deleting counts for broadcast: %d", broadcastID) + } + + // finally, delete our broadcast + _, err = tx.Exec(`DELETE from msgs_broadcast WHERE id = $1`, broadcastID) + if err != nil { + tx.Rollback() + return errors.Wrapf(err, "error deleting broadcast: %d", broadcastID) + } + + err = tx.Commit() + if err != nil { + return errors.Wrapf(err, "error deleting broadcast: %d", broadcastID) + } + + count++ + } + + if count > 0 { + logrus.WithFields(logrus.Fields{ + "elapsed": time.Since(start), + "count": count, + "org_id": org.ID, + }).Info("completed deleting broadcasts") + } + + return nil +} + +const selectOldOrgBroadcasts = ` +SELECT + id +FROM + msgs_broadcast +WHERE + org_id = $1 AND + created_on < $2 AND + schedule_id IS NULL +ORDER BY + created_on ASC, + id ASC +LIMIT 1000000; +` + const selectOrgRunsInRange = ` SELECT fr.id, fr.is_active, cc.is_test FROM flows_flowrun fr @@ -1527,6 +1643,10 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config switch a.ArchiveType { case MessageType: err = DeleteArchivedMessages(ctx, config, db, s3Client, a) + if err == nil { + err = DeleteBroadcasts(ctx, now, config, db, org) + } + case RunType: err = DeleteArchivedRuns(ctx, config, db, s3Client, a) default: diff --git a/archiver_test.go b/archiver_test.go index 4e7a637..ed32e20 100644 --- a/archiver_test.go +++ b/archiver_test.go @@ -298,6 +298,8 @@ func TestArchiveOrgMessages(t *testing.T) { s3Client, err := NewS3Client(config) assert.NoError(t, err) + assertCount(t, db, 4, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2) + created, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], MessageType) assert.NoError(t, err) @@ -397,6 +399,9 @@ func TestArchiveOrgMessages(t *testing.T) { ) assert.NoError(t, err) assert.Equal(t, 1, count) + + // one broadcast still exists because it has a schedule, the other because it still has msgs, the last because it is new + assertCount(t, db, 3, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2) } } @@ -406,6 +411,13 @@ FROM flows_flowrun WHERE org_id = $1 and modified_on >= $2 and modified_on < $3 ` +func assertCount(t *testing.T, db *sqlx.DB, expected int, query string, args ...interface{}) { + var count int + err := db.Get(&count, query, args...) + assert.NoError(t, err, "error executing query: %s", query) + assert.Equal(t, expected, count, "counts mismatch for query %s", query) +} + func TestArchiveOrgRuns(t *testing.T) { db := setup(t) ctx := context.Background() diff --git a/go.mod b/go.mod index c918611..efc26e8 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/naoina/go-stringutil v0.1.0 github.com/naoina/toml v0.1.1 github.com/nyaruka/ezconf v0.2.1 - github.com/pkg/errors v0.8.0 + github.com/pkg/errors v0.8.1 github.com/pmezard/go-difflib v1.0.0 github.com/sirupsen/logrus v1.0.5 github.com/stretchr/testify v1.2.1 diff --git a/go.sum b/go.sum index 8e1caaf..781bf5b 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,7 @@ github.com/getsentry/raven-go v0.0.0-20180430182053-263040ce1a36 h1:i93kN7TI/4T5 github.com/getsentry/raven-go v0.0.0-20180430182053-263040ce1a36/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= github.com/go-ini/ini v1.36.0 h1:63En8accP8FKkFZ77ztSfvQf9kGRJN3qBIdItP46RRk= github.com/go-ini/ini v1.36.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= +github.com/go-sql-driver/mysql v1.4.0 h1:7LxgVwFb2hIQtMm87NdgAVfXjnt4OePseqT1tKx+opk= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= @@ -23,6 +24,7 @@ github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2 h1:hRGSmZu7j271trc9sneMrpOW github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hzifhks= github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= @@ -32,6 +34,8 @@ github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0= github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.0.5 h1:8c8b5uO0zS4X6RPl/sd1ENwSkIc0/H2PaHxE3udaE8I= diff --git a/testdb.sql b/testdb.sql index 0bcfcc4..857a773 100644 --- a/testdb.sql +++ b/testdb.sql @@ -109,9 +109,38 @@ CREATE TABLE msgs_broadcast ( "text" hstore NOT NULL, purged BOOLEAN NOT NULL, created_on timestamp with time zone NOT NULL, + schedule_id int NULL, org_id integer NOT NULL references orgs_org(id) on delete cascade ); +DROP TABLE IF EXISTS msgs_broadcast_contacts; +CREATE TABLE msgs_broadcast_contacts ( + id serial primary key, + broadcast_id integer NOT NULL, + contact_id integer NOT NULL +); + +DROP TABLE IF EXISTS msgs_broadcast_groups; +CREATE TABLE msgs_broadcast_groups ( + id serial primary key, + broadcast_id integer NOT NULL, + group_id integer NOT NULL +); + +DROP TABLE IF EXISTS msgs_broadcast_urns; +CREATE TABLE msgs_broadcast_urns ( + id serial primary key, + broadcast_id integer NOT NULL, + contacturn_id integer NOT NULL +); + +DROP TABLE IF EXISTS msgs_broadcastmsgcount; +CREATE TABLE msgs_broadcastmsgcount ( + id serial primary key, + "count" integer NOT NULL, + broadcast_id integer NOT NULL +); + DROP TABLE IF EXISTS msgs_label CASCADE; CREATE TABLE msgs_label ( id serial primary key, @@ -266,13 +295,19 @@ INSERT INTO contacts_contactgroup_contacts(id, contact_id, contactgroup_id) VALU (3, 1, 4), (4, 3, 4); +INSERT INTO msgs_broadcast(id, text, created_on, purged, org_id, schedule_id) VALUES +(1, 'eng=>"hello",fre=>"bonjour"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2, 1), +(2, 'base=>"hola"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2, NULL), +(3, 'base=>"not purged"'::hstore, '2017-08-12 19:11:59.890662+02:00', FALSE, 2, NULL), +(4, 'base=>"new"'::hstore, '2019-08-12 19:11:59.890662+02:00', FALSE, 2, NULL); + INSERT INTO msgs_msg(id, broadcast_id, uuid, text, created_on, sent_on, modified_on, direction, status, visibility, msg_type, attachments, channel_id, contact_id, contact_urn_id, org_id, msg_count, error_count, next_attempt, response_to_id) VALUES (1, NULL, '2f969340-704a-4aa2-a1bd-2f832a21d257', 'message 1', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL), (2, NULL, 'abe87ac1-015c-4803-be29-1e89509fe682', 'message 2', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'D', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL), (3, NULL, 'a7e83a22-a6ff-4e18-82d0-19545640ccba', 'message 3', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'H', 'V', 'I', '{"image/png:https://foo.bar/image1.png", "image/png:https://foo.bar/image2.png"}', NULL, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL), (4, NULL, '1cad36af-5581-4c8a-81cd-83708398f61e', 'message 4', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-13 21:11:59.890662+00', NULL), (5, NULL, 'f557972e-2eb5-42fa-9b87-902116d18787', 'message 5', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', 'I', 'H', 'V', 'I', NULL, 3, 7, 8, 3, 1, 0, '2017-08-11 21:11:59.890662+02:00', NULL), -(6, NULL, '579d148c-0ab1-4afb-832f-afb1fe0e19b7', 'message 6', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-10-08 21:11:59.890662+00', NULL), +(6, 2, '579d148c-0ab1-4afb-832f-afb1fe0e19b7', 'message 6', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-10-08 21:11:59.890662+00', NULL), (7, NULL, '7aeca469-2593-444e-afe4-4702317534c9', 'message 7', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2018-01-02 21:11:59.890662+00', 2), (8, NULL, '48fab92b-6d75-45c8-9121-81176d97bdbf', 'message 8', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 11, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL), (9, NULL, 'e14ab466-0d3b-436d-a0f7-5851fd7d9b7d', 'message 9', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'S', 'V', 'F', NULL, NULL, 6, NULL, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL); @@ -296,11 +331,6 @@ INSERT INTO channels_channellog(id, msg_id) VALUES (5, 5), (6, 6); -INSERT INTO msgs_broadcast(id, text, created_on, purged, org_id) VALUES -(1, 'eng=>"hello",fre=>"bonjour"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2), -(2, 'base=>"hola"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2), -(3, 'base=>"not purged"'::hstore, '2017-08-12 19:11:59.890662+02:00', FALSE, 2); - INSERT INTO flows_flow(id, uuid, name) VALUES (1, '6639286a-9120-45d4-aa39-03ae3942a4a6', 'Flow 1'), (2, '629db399-a5fb-4fa0-88e6-f479957b63d2', 'Flow 2'),