Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🧹 Delete old flow starts after deleting runs #80

Merged
merged 1 commit into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions archives/archives.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,10 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config

case RunType:
err = DeleteArchivedRuns(ctx, config, db, s3Client, a)
if err == nil {
err = DeleteFlowStarts(ctx, now, config, db, org)
}

default:
err = fmt.Errorf("unknown archive type: %s", a.ArchiveType)
}
Expand Down
89 changes: 89 additions & 0 deletions archives/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,92 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie

return nil
}

const selectOldOrgFlowStarts = `
SELECT id
FROM flows_flowstart s
WHERE s.org_id = $1 AND s.created_on < $2 AND NOT EXISTS (SELECT 1 FROM flows_flowrun WHERE start_id = s.id)
LIMIT 1000000;`

// DeleteFlowStarts deletes all starts older than 90 days for the passed in org which have no associated runs
func DeleteFlowStarts(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, org Org) error {
start := dates.Now()
threshhold := now.AddDate(0, 0, -org.RetentionPeriod)

rows, err := db.QueryxContext(ctx, selectOldOrgFlowStarts, org.ID, threshhold)
if err != nil {
return err
}
defer rows.Close()

count := 0
for rows.Next() {
if count == 0 {
logrus.WithField("org_id", org.ID).Info("deleting starts")
}

// been deleting this org more than an hour? thats enough for today, exit out
if dates.Since(start) > time.Hour {
break
}

var startID int64
if err := rows.Scan(&startID); err != nil {
return errors.Wrap(err, "unable to get start id")
}

// we delete starts in a transaction per start
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return errors.Wrapf(err, "error starting transaction while deleting start: %d", startID)
}

// delete contacts M2M
_, err = tx.Exec(`DELETE from flows_flowstart_contacts WHERE flowstart_id = $1`, startID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error deleting related contacts for start: %d", startID)
}

// delete groups M2M
_, err = tx.Exec(`DELETE from flows_flowstart_groups WHERE flowstart_id = $1`, startID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error deleting related groups for start: %d", startID)
}

// delete calls M2M
_, err = tx.Exec(`DELETE from flows_flowstart_calls WHERE flowstart_id = $1`, startID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error deleting related calls for start: %d", startID)
}

// delete counts
_, err = tx.Exec(`DELETE from flows_flowstartcount WHERE start_id = $1`, startID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error deleting counts for start: %d", startID)
}

// finally, delete our start
_, err = tx.Exec(`DELETE from flows_flowstart WHERE id = $1`, startID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error deleting start: %d", startID)
}

err = tx.Commit()
if err != nil {
return errors.Wrapf(err, "error deleting start: %d", startID)
}

count++
}

if count > 0 {
logrus.WithFields(logrus.Fields{"elapsed": dates.Since(start), "count": count, "org_id": org.ID}).Info("completed deleting starts")
}

return nil
}
133 changes: 93 additions & 40 deletions testdb.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ CREATE EXTENSION IF NOT EXISTS HSTORE;
DROP TABLE IF EXISTS archives_archive CASCADE;
DROP TABLE IF EXISTS channels_channellog CASCADE;
DROP TABLE IF EXISTS channels_channel CASCADE;
DROP TABLE IF EXISTS flows_flowstart_contacts CASCADE;
DROP TABLE IF EXISTS flows_flowstart_groups CASCADE;
DROP TABLE IF EXISTS flows_flowstart_calls CASCADE;
DROP TABLE IF EXISTS flows_flowstart CASCADE;
DROP TABLE IF EXISTS flows_flowrun CASCADE;
DROP TABLE IF EXISTS flows_flow CASCADE;
DROP TABLE IF EXISTS msgs_broadcast_contacts CASCADE;
Expand All @@ -13,6 +17,7 @@ DROP TABLE IF EXISTS msgs_broadcast CASCADE;
DROP TABLE IF EXISTS msgs_label CASCADE;
DROP TABLE IF EXISTS msgs_msg_labels CASCADE;
DROP TABLE IF EXISTS msgs_msg CASCADE;
DROP TABLE IF EXISTS ivr_call CASCADE;
DROP TABLE IF EXISTS contacts_contacturn CASCADE;
DROP TABLE IF EXISTS contacts_contactgroup_contacts CASCADE;
DROP TABLE IF EXISTS contacts_contactgroup CASCADE;
Expand All @@ -32,7 +37,7 @@ CREATE TABLE channels_channel (
id serial primary key,
name character varying(255) NOT NULL,
uuid character varying(36) NOT NULL,
org_id integer references orgs_org(id) on delete cascade
org_id integer NOT NULL REFERENCES orgs_org(id)
);

CREATE TABLE contacts_contact (
Expand All @@ -42,7 +47,7 @@ CREATE TABLE contacts_contact (
created_on timestamp with time zone NOT NULL,
modified_by_id integer NOT NULL,
modified_on timestamp with time zone NOT NULL,
org_id integer NOT NULL references orgs_org(id) on delete cascade,
org_id integer NOT NULL REFERENCES orgs_org(id),
name character varying(128),
language character varying(3),
uuid character varying(36) NOT NULL,
Expand All @@ -53,7 +58,7 @@ CREATE TABLE contacts_contacturn (
id serial primary key,
contact_id integer,
scheme character varying(128) NOT NULL,
org_id integer NOT NULL,
org_id integer NOT NULL REFERENCES orgs_org(id),
priority integer NOT NULL,
path character varying(255) NOT NULL,
channel_id integer,
Expand All @@ -64,6 +69,7 @@ CREATE TABLE contacts_contacturn (

CREATE TABLE contacts_contactgroup (
id serial primary key,
org_id integer NOT NULL REFERENCES orgs_org(id),
uuid uuid NOT NULL,
name character varying(128) NOT NULL
);
Expand All @@ -76,6 +82,7 @@ CREATE TABLE contacts_contactgroup_contacts (

CREATE TABLE flows_flow (
id serial primary key,
org_id integer NOT NULL REFERENCES orgs_org(id),
uuid character varying(36) NOT NULL,
name character varying(128) NOT NULL
);
Expand All @@ -99,11 +106,11 @@ CREATE TABLE msgs_msg (
next_attempt timestamp with time zone NOT NULL,
external_id character varying(255),
attachments character varying(255)[],
channel_id integer references channels_channel(id) on delete cascade,
contact_id integer NOT NULL references contacts_contact(id) on delete cascade,
contact_urn_id integer NULL references contacts_contacturn(id) on delete cascade,
org_id integer NOT NULL references orgs_org(id) on delete cascade,
flow_id integer NULL references flows_flow(id) on delete cascade,
channel_id integer REFERENCES channels_channel(id),
contact_id integer NOT NULL REFERENCES contacts_contact(id),
contact_urn_id integer NULL REFERENCES contacts_contacturn(id),
org_id integer NOT NULL REFERENCES orgs_org(id),
flow_id integer NULL REFERENCES flows_flow(id),
metadata text
);

Expand All @@ -112,31 +119,31 @@ CREATE TABLE msgs_broadcast (
text hstore NOT NULL,
created_on timestamp with time zone NOT NULL,
schedule_id int NULL,
org_id integer NOT NULL references orgs_org(id) on delete cascade
org_id integer NOT NULL REFERENCES orgs_org(id)
);

CREATE TABLE msgs_broadcast_contacts (
id serial primary key,
broadcast_id integer NOT NULL,
contact_id integer NOT NULL
broadcast_id integer NOT NULL REFERENCES msgs_broadcast(id),
contact_id integer NOT NULL REFERENCES contacts_contact(id)
);

CREATE TABLE msgs_broadcast_groups (
id serial primary key,
broadcast_id integer NOT NULL,
group_id integer NOT NULL
broadcast_id integer NOT NULL REFERENCES msgs_broadcast(id),
contactgroup_id integer NOT NULL REFERENCES contacts_contactgroup(id)
);

CREATE TABLE msgs_broadcast_urns (
id serial primary key,
broadcast_id integer NOT NULL,
contacturn_id integer NOT NULL
broadcast_id integer NOT NULL REFERENCES msgs_broadcast(id),
contacturn_id integer NOT NULL REFERENCES contacts_contacturn(id)
);

CREATE TABLE msgs_broadcastmsgcount (
id serial primary key,
count integer NOT NULL,
broadcast_id integer NOT NULL
broadcast_id integer NOT NULL REFERENCES msgs_broadcast(id)
);

CREATE TABLE msgs_label (
Expand All @@ -147,28 +154,59 @@ CREATE TABLE msgs_label (

CREATE TABLE msgs_msg_labels (
id serial primary key,
msg_id integer NOT NULL references msgs_msg(id),
label_id integer NOT NULL
msg_id integer NOT NULL REFERENCES msgs_msg(id),
label_id integer NOT NULL REFERENCES msgs_label(id)
);

CREATE TABLE auth_user (
id serial primary key,
username character varying(128) NOT NULL
);

CREATE TABLE ivr_call (
id serial primary key,
org_id integer NOT NULL REFERENCES orgs_org(id),
created_on timestamp with time zone NOT NULL
);

CREATE TABLE flows_flowstart (
id serial primary key,
org_id integer NOT NULL REFERENCES orgs_org(id),
created_on timestamp with time zone NOT NULL
);

CREATE TABLE flows_flowstart_contacts (
id serial primary key,
flowstart_id integer NOT NULL REFERENCES flows_flowstart(id),
contact_id integer NOT NULL REFERENCES contacts_contact(id)
);

CREATE TABLE flows_flowstart_groups (
id serial primary key,
flowstart_id integer NOT NULL REFERENCES flows_flowstart(id),
contactgroup_id integer NOT NULL REFERENCES contacts_contactgroup(id)
);

CREATE TABLE flows_flowstart_calls (
id serial primary key,
flowstart_id integer NOT NULL REFERENCES flows_flowstart(id),
call_id integer NOT NULL REFERENCES ivr_call(id)
);

CREATE TABLE flows_flowrun (
id serial primary key,
uuid uuid NOT NULL UNIQUE,
responded boolean NOT NULL,
contact_id integer NOT NULL references contacts_contact(id),
flow_id integer NOT NULL references flows_flow(id),
org_id integer NOT NULL references orgs_org(id),
contact_id integer NOT NULL REFERENCES contacts_contact(id),
flow_id integer NOT NULL REFERENCES flows_flow(id),
start_id integer NULL REFERENCES flows_flowstart(id),
org_id integer NOT NULL REFERENCES orgs_org(id),
results text NOT NULL,
path text NOT NULL,
created_on timestamp with time zone NOT NULL,
modified_on timestamp with time zone NOT NULL,
exited_on timestamp with time zone NULL,
submitted_by_id integer NULL references auth_user(id),
submitted_by_id integer NULL REFERENCES auth_user(id),
status varchar(1) NOT NULL,
delete_from_results boolean
);
Expand Down Expand Up @@ -235,22 +273,22 @@ INSERT INTO contacts_contacturn(id, contact_id, scheme, org_id, priority, path,
(10, 9, 'facebook', 2, 90, 1000001, 'funguy', 'facebook:1000001'),
(11, 10, 'twitterid', 2, 90, 1000001, 'fungal', 'twitterid:1000001');

INSERT INTO contacts_contactgroup(id, uuid, name) VALUES
(1, '4ea0f313-2f62-4e57-bdf0-232b5191dd57', 'Group 1'),
(2, '4c016340-468d-4675-a974-15cb7a45a5ab', 'Group 2'),
(3, 'e61b5bf7-8ddf-4e05-b0a8-4c46a6b68cff', 'Group 3'),
(4, '529bac39-550a-4d6f-817c-1833f3449007', 'Group 4');
INSERT INTO contacts_contactgroup(id, uuid, org_id, name) VALUES
(1, '4ea0f313-2f62-4e57-bdf0-232b5191dd57', 2, 'Group 1'),
(2, '4c016340-468d-4675-a974-15cb7a45a5ab', 2, 'Group 2'),
(3, 'e61b5bf7-8ddf-4e05-b0a8-4c46a6b68cff', 2, 'Group 3'),
(4, '529bac39-550a-4d6f-817c-1833f3449007', 2, 'Group 4');

INSERT INTO contacts_contactgroup_contacts(id, contact_id, contactgroup_id) VALUES
(1, 1, 1),
(3, 1, 4),
(4, 3, 4);

INSERT INTO flows_flow(id, uuid, name) VALUES
(1, '6639286a-9120-45d4-aa39-03ae3942a4a6', 'Flow 1'),
(2, '629db399-a5fb-4fa0-88e6-f479957b63d2', 'Flow 2'),
(3, '3914b88e-625b-4603-bd9f-9319dc331c6b', 'Flow 3'),
(4, 'cfa2371d-2f06-481d-84b2-d974f3803bb0', 'Flow 4');
INSERT INTO flows_flow(id, uuid, org_id, name) VALUES
(1, '6639286a-9120-45d4-aa39-03ae3942a4a6', 2, 'Flow 1'),
(2, '629db399-a5fb-4fa0-88e6-f479957b63d2', 2, 'Flow 2'),
(3, '3914b88e-625b-4603-bd9f-9319dc331c6b', 2, 'Flow 3'),
(4, 'cfa2371d-2f06-481d-84b2-d974f3803bb0', 2, 'Flow 4');

INSERT INTO msgs_broadcast(id, text, created_on, org_id, schedule_id) VALUES
(1, 'eng=>"hello",fre=>"bonjour"'::hstore, '2017-08-12 22:11:59.890662+02:00', 2, 1),
Expand Down Expand Up @@ -290,23 +328,38 @@ INSERT INTO channels_channellog(id, msg_id) VALUES
INSERT INTO auth_user(id, username) VALUES
(1, '[email protected]');

INSERT INTO flows_flowrun(id, uuid, responded, contact_id, flow_id, org_id, results, path, created_on, modified_on, exited_on, status, submitted_by_id) VALUES
(1, '4ced1260-9cfe-4b7f-81dd-b637108f15b9', TRUE, 6, 1, 2, '{}', '[]', '2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00', 'C', NULL),
INSERT INTO ivr_call(id, org_id, created_on) VALUES
(1, 2, NOW());

INSERT INTO flows_flowstart(id, org_id, created_on) VALUES
(1, 2, NOW());

INSERT INTO flows_flowstart_contacts(flowstart_id, contact_id) VALUES
(1, 6);

INSERT INTO flows_flowstart_groups(flowstart_id, contactgroup_id) VALUES
(1, 1);

INSERT INTO flows_flowstart_calls(flowstart_id, call_id) VALUES
(1, 1);

INSERT INTO flows_flowrun(id, uuid, responded, contact_id, flow_id, org_id, results, path, created_on, modified_on, exited_on, status, submitted_by_id, start_id) VALUES
(1, '4ced1260-9cfe-4b7f-81dd-b637108f15b9', TRUE, 6, 1, 2, '{}', '[]', '2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00', 'C', NULL, 1),
(2, '7d68469c-0494-498a-bdf3-bac68321fd6d', TRUE, 6, 1, 2,
'{"agree": {"category": "Strongly agree", "node_uuid": "a0434c54-3e26-4eb0-bafc-46cdeaf435ac", "name": "Do you agree?", "value": "A", "created_on": "2017-05-03T12:25:21.714339+00:00", "input": "A"}}',
'[{"uuid": "c3d0b417-db75-417c-8050-33776ec8f620", "node_uuid": "10896d63-8df7-4022-88dd-a9d93edf355b", "arrived_on": "2017-08-12T15:07:24.049815+02:00", "exit_uuid": "2f890507-2ad2-4bd1-92fc-0ca031155fca"}]',
'2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00', 'C', NULL),
'2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00','2017-08-12 21:11:59.890662+02:00', 'C', NULL, NULL),
(3, 'de782b35-a398-46ed-8550-34c66053841b', TRUE, 7, 2, 3,
'{"agree": {"category": "Strongly agree", "node_uuid": "084c8cf1-715d-4d0a-b38d-a616ed74e638", "name": "Agree", "value": "A", "created_on": "2017-05-03T12:25:21.714339+00:00", "input": "A"}, "confirm_agree": {"category": "Confirmed Strongly agree", "node_uuid": "a0434c54-3e26-4eb0-bafc-46cdeaf435ab", "name": "Do you agree?", "value": "A", "created_on": "2017-05-03T12:25:21.714339+00:00", "input": "A"}}',
'[{"uuid": "600ac5b4-4895-4161-ad97-6e2f1bb48bcb", "node_uuid": "accbc6e2-b0df-46cd-9a76-bff0fdf4d753", "arrived_on": "2017-08-12T15:07:24.049815+02:00", "exit_uuid": "8249e2dc-c893-4200-b6d2-398d07a459bc"}]',
'2017-08-10 21:11:59.890662+02:00','2017-08-10 21:11:59.890662+02:00','2017-08-10 21:11:59.890662+02:00', 'C', 1),
'2017-08-10 21:11:59.890662+02:00','2017-08-10 21:11:59.890662+02:00','2017-08-10 21:11:59.890662+02:00', 'C', 1, NULL),
(4, '329a5d24-64fc-479c-8d24-9674c9b46530', TRUE, 7, 2, 3,
'{"agree": {"category": "Disagree", "node_uuid": "084c8cf1-715d-4d0a-b38d-a616ed74e638", "name": "Agree", "value": "B", "created_on": "2017-10-10T12:25:21.714339+00:00", "input": "B"}}',
'[{"uuid": "babf4fc8-e12c-4bb9-a9dd-61178a118b5a", "node_uuid": "accbc6e2-b0df-46cd-9a76-bff0fdf4d753", "arrived_on": "2017-10-12T15:07:24.049815+02:00", "exit_uuid": "8249e2dc-c893-4200-b6d2-398d07a459bc"}]',
'2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00', 'C', NULL),
(5, 'abed67d2-06b8-4749-8bb9-ecda037b673b', TRUE, 7, 2, 3, '{}', '[]', '2017-10-10 21:11:59.890663+02:00','2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00', 'C', NULL),
(6, '6262eefe-a6e9-4201-9b76-a7f25e3b7f29', TRUE, 7, 2, 3, '{}', '[]', '2017-12-12 21:11:59.890662+02:00','2017-12-12 21:11:59.890662+02:00','2017-12-12 21:11:59.890662+02:00', 'C', NULL),
(7, '6c0d7db9-076b-4edc-ab4b-38576ae394fc', TRUE, 7, 2, 2, '{}', '[]', '2017-08-13 13:11:59.890662+02:00','2017-08-14 16:11:59.890662+02:00', NULL, 'W', NULL);
'2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00', 'C', NULL, NULL),
(5, 'abed67d2-06b8-4749-8bb9-ecda037b673b', TRUE, 7, 2, 3, '{}', '[]', '2017-10-10 21:11:59.890663+02:00','2017-10-10 21:11:59.890662+02:00','2017-10-10 21:11:59.890662+02:00', 'C', NULL, NULL),
(6, '6262eefe-a6e9-4201-9b76-a7f25e3b7f29', TRUE, 7, 2, 3, '{}', '[]', '2017-12-12 21:11:59.890662+02:00','2017-12-12 21:11:59.890662+02:00','2017-12-12 21:11:59.890662+02:00', 'C', NULL, NULL),
(7, '6c0d7db9-076b-4edc-ab4b-38576ae394fc', TRUE, 7, 2, 2, '{}', '[]', '2017-08-13 13:11:59.890662+02:00','2017-08-14 16:11:59.890662+02:00', NULL, 'W', NULL, NULL);

-- update run #5 to have a path longer than 500 steps
UPDATE flows_flowrun SET path = s.path FROM (
Expand Down