-
Notifications
You must be signed in to change notification settings - Fork 512
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changefeed implementation #1019
Changes from all commits
6becde9
3638ba3
11c1966
237c29a
d4896db
efdf807
4552ede
7b72ad6
8a55490
5af2d0e
c48e50e
285172d
d1440f6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
CREATE TABLE `change_category` ( | ||
`category` VARCHAR(20) NOT NULL, | ||
PRIMARY KEY (`category`) | ||
) ENGINE=InnoDB DEFAULT CHARSET=utf8; | ||
|
||
INSERT INTO `change_category` VALUES ("update"), ("deletion"); | ||
|
||
CREATE TABLE `changefeed` ( | ||
`id` int(11) NOT NULL AUTO_INCREMENT, | ||
`created_at` timestamp DEFAULT CURRENT_TIMESTAMP, | ||
`gun` varchar(255) NOT NULL, | ||
`version` int(11) NOT NULL, | ||
`sha256` CHAR(64) DEFAULT NULL, | ||
`category` VARCHAR(20) NOT NULL DEFAULT "update", | ||
PRIMARY KEY (`id`), | ||
FOREIGN KEY (`category`) REFERENCES `change_category` (`category`), | ||
INDEX `idx_changefeed_gun` (`gun`) | ||
) ENGINE=InnoDB DEFAULT CHARSET=utf8; | ||
|
||
INSERT INTO `changefeed` ( | ||
`created_at`, | ||
`gun`, | ||
`version`, | ||
`sha256` | ||
) (SELECT | ||
`created_at`, | ||
`gun`, | ||
`version`, | ||
`sha256` | ||
FROM | ||
`tuf_files` | ||
WHERE | ||
`role` = "timestamp" | ||
ORDER BY | ||
`created_at` ASC | ||
); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
CREATE TABLE "change_category" ( | ||
"category" VARCHAR(20) PRIMARY KEY | ||
); | ||
|
||
INSERT INTO "change_category" VALUES ('update'), ('deletion'); | ||
|
||
CREATE TABLE "changefeed" ( | ||
"id" serial PRIMARY KEY, | ||
"created_at" timestamp DEFAULT CURRENT_TIMESTAMP, | ||
"gun" varchar(255) NOT NULL, | ||
"version" integer NOT NULL, | ||
"sha256" CHAR(64) DEFAULT NULL, | ||
"category" VARCHAR(20) NOT NULL DEFAULT 'update' REFERENCES "change_category" | ||
); | ||
|
||
CREATE INDEX "idx_changefeed_gun" ON "changefeed" ("gun"); | ||
|
||
INSERT INTO "changefeed" ( | ||
"created_at", | ||
"gun", | ||
"version", | ||
"sha256" | ||
) (SELECT | ||
"created_at", | ||
"gun", | ||
"version", | ||
"sha256" | ||
FROM | ||
"tuf_files" | ||
WHERE | ||
"role" = 'timestamp' | ||
ORDER BY | ||
"created_at" ASC | ||
); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
package handlers | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"net/http" | ||
"strconv" | ||
|
||
ctxu "github.com/docker/distribution/context" | ||
"github.com/gorilla/mux" | ||
"golang.org/x/net/context" | ||
|
||
"github.com/docker/notary" | ||
"github.com/docker/notary/server/errors" | ||
"github.com/docker/notary/server/storage" | ||
) | ||
|
||
type changefeedResponse struct { | ||
NumberOfRecords int `json:"count"` | ||
Records []storage.Change `json:"records"` | ||
} | ||
|
||
// Changefeed returns a list of changes according to the provided filters | ||
func Changefeed(ctx context.Context, w http.ResponseWriter, r *http.Request) error { | ||
var ( | ||
vars = mux.Vars(r) | ||
logger = ctxu.GetLogger(ctx) | ||
qs = r.URL.Query() | ||
imageName = vars["imageName"] | ||
changeID = qs.Get("change_id") | ||
store, records, err = checkChangefeedInputs(logger, ctx.Value(notary.CtxKeyMetaStore), qs.Get("records")) | ||
) | ||
if err != nil { | ||
// err already logged and in correct format. | ||
return err | ||
} | ||
out, err := changefeed(logger, store, imageName, changeID, records) | ||
if err == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
w.Write(out) | ||
} | ||
return err | ||
} | ||
|
||
func changefeed(logger ctxu.Logger, store storage.MetaStore, imageName, changeID string, records int64) ([]byte, error) { | ||
changes, err := store.GetChanges(changeID, int(records), imageName) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm a little worried about the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When I parse it from the query string I specifically parse as a 32 bit. It's not great but There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah right, I see the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I could, I don't have any particular reason why it ended up being done here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The build is has been so flaky the last 48 hours I'm mildly afraid of making any more changes. I've filed issues to look at CI. Unless you feel really strongly about this I'm not going to change it with the tight timeline. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nothing more at present , I think we can do it in the followed PRs if we would like to change anything. Just go and get some sleep lol |
||
if err != nil { | ||
logger.Errorf("500 GET could not retrieve records: %s", err.Error()) | ||
return nil, errors.ErrUnknown.WithDetail(err) | ||
} | ||
out, err := json.Marshal(&changefeedResponse{ | ||
NumberOfRecords: len(changes), | ||
Records: changes, | ||
}) | ||
if err != nil { | ||
logger.Error("500 GET could not json.Marshal changefeedResponse") | ||
return nil, errors.ErrUnknown.WithDetail(err) | ||
} | ||
return out, nil | ||
} | ||
|
||
func checkChangefeedInputs(logger ctxu.Logger, s interface{}, r string) ( | ||
store storage.MetaStore, pageSize int64, err error) { | ||
|
||
store, ok := s.(storage.MetaStore) | ||
if !ok { | ||
logger.Error("500 GET unable to retrieve storage") | ||
err = errors.ErrNoStorage.WithDetail(nil) | ||
return | ||
} | ||
pageSize, err = strconv.ParseInt(r, 10, 32) | ||
if err != nil { | ||
logger.Errorf("400 GET invalid pageSize: %s", r) | ||
err = errors.ErrInvalidParams.WithDetail( | ||
fmt.Sprintf("invalid records parameter: %s", err.Error()), | ||
) | ||
return | ||
} | ||
if pageSize == 0 { | ||
pageSize = notary.DefaultPageSize | ||
} | ||
return | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
package handlers | ||
|
||
import ( | ||
"reflect" | ||
"testing" | ||
|
||
"github.com/Sirupsen/logrus" | ||
ctxu "github.com/docker/distribution/context" | ||
"github.com/docker/notary" | ||
"github.com/docker/notary/server/storage" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
type changefeedArgs struct { | ||
logger ctxu.Logger | ||
store storage.MetaStore | ||
imageName string | ||
changeID string | ||
pageSize int64 | ||
} | ||
|
||
type changefeedTest struct { | ||
name string | ||
args changefeedArgs | ||
want []byte | ||
wantErr bool | ||
} | ||
|
||
func Test_changefeed(t *testing.T) { | ||
s := storage.NewMemStorage() | ||
|
||
tests := []changefeedTest{ | ||
{ | ||
name: "Empty Store", | ||
args: changefeedArgs{ | ||
logger: logrus.New(), | ||
store: s, | ||
imageName: "", | ||
changeID: "0", | ||
pageSize: notary.DefaultPageSize, | ||
}, | ||
want: []byte("{\"count\":0,\"records\":null}"), | ||
wantErr: false, | ||
}, | ||
{ | ||
name: "Bad ChangeID", | ||
args: changefeedArgs{ | ||
logger: logrus.New(), | ||
store: s, | ||
imageName: "", | ||
changeID: "not_a_number", | ||
pageSize: notary.DefaultPageSize, | ||
}, | ||
want: nil, | ||
wantErr: true, | ||
}, | ||
} | ||
runChangefeedTests(t, tests) | ||
|
||
} | ||
|
||
func runChangefeedTests(t *testing.T, tests []changefeedTest) { | ||
for _, tt := range tests { | ||
got, err := changefeed(tt.args.logger, tt.args.store, tt.args.imageName, tt.args.changeID, tt.args.pageSize) | ||
if tt.wantErr { | ||
require.Error(t, err, | ||
"%q. changefeed() error = %v, wantErr %v", tt.name, err, tt.wantErr) | ||
} | ||
require.True(t, reflect.DeepEqual(got, tt.want), | ||
"%q. changefeed() = %v, want %v", tt.name, string(got), string(tt.want)) | ||
} | ||
} | ||
|
||
func Test_checkChangefeedInputs(t *testing.T) { | ||
type args struct { | ||
logger ctxu.Logger | ||
s interface{} | ||
ps string | ||
} | ||
s := storage.NewMemStorage() | ||
tests := []struct { | ||
name string | ||
args args | ||
wantStore storage.MetaStore | ||
wantPageSize int64 | ||
wantErr bool | ||
}{ | ||
// Error cases | ||
{ | ||
name: "No MetaStore", | ||
args: args{ | ||
logger: logrus.New(), | ||
s: nil, | ||
}, | ||
wantErr: true, | ||
}, | ||
{ | ||
name: "Bad page size", | ||
args: args{ | ||
logger: logrus.New(), | ||
s: s, | ||
ps: "not_a_number", | ||
}, | ||
wantErr: true, | ||
wantStore: s, | ||
}, | ||
{ | ||
name: "Zero page size", | ||
args: args{ | ||
logger: logrus.New(), | ||
s: s, | ||
ps: "0", | ||
}, | ||
wantStore: s, | ||
wantPageSize: notary.DefaultPageSize, | ||
}, | ||
{ | ||
name: "Non-zero Page Size", | ||
args: args{ | ||
logger: logrus.New(), | ||
s: s, | ||
ps: "10", | ||
}, | ||
wantStore: s, | ||
wantPageSize: 10, | ||
}, | ||
{ | ||
name: "Reversed \"false\"", | ||
args: args{ | ||
logger: logrus.New(), | ||
s: s, | ||
ps: "-10", | ||
}, | ||
wantStore: s, | ||
wantPageSize: -10, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
gotStore, gotPageSize, err := checkChangefeedInputs(tt.args.logger, tt.args.s, tt.args.ps) | ||
if tt.wantErr { | ||
require.Error(t, err, | ||
"%q. checkChangefeedInputs() error = %v, wantErr %v", tt.name, err, tt.wantErr) | ||
} | ||
require.True(t, reflect.DeepEqual(gotStore, tt.wantStore), | ||
"%q. checkChangefeedInputs() gotStore = %v, want %v", tt.name, gotStore, tt.wantStore) | ||
|
||
require.Equal(t, tt.wantPageSize, gotPageSize, | ||
"%q. checkChangefeedInputs() gotPageSize = %v, want %v", tt.name, gotPageSize, tt.wantPageSize) | ||
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about putting this import line along with those
github
package.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, actually when I went and looked at it, this is in the right place. We've been nominally (and we should document it somewhere) stealing Python's PEP8 import formatting, which requires imports be broken into 3 blocks of:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, then we might need to make another PR to do the cleanup of the whole project, I've supposed it was like: