-
Notifications
You must be signed in to change notification settings - Fork 512
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1019 from endophage/changefeed
Changefeed implementation
- Loading branch information
Showing
22 changed files
with
1,116 additions
and
90 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
CREATE TABLE `change_category` ( | ||
`category` VARCHAR(20) NOT NULL, | ||
PRIMARY KEY (`category`) | ||
) ENGINE=InnoDB DEFAULT CHARSET=utf8; | ||
|
||
INSERT INTO `change_category` VALUES ("update"), ("deletion"); | ||
|
||
CREATE TABLE `changefeed` ( | ||
`id` int(11) NOT NULL AUTO_INCREMENT, | ||
`created_at` timestamp DEFAULT CURRENT_TIMESTAMP, | ||
`gun` varchar(255) NOT NULL, | ||
`version` int(11) NOT NULL, | ||
`sha256` CHAR(64) DEFAULT NULL, | ||
`category` VARCHAR(20) NOT NULL DEFAULT "update", | ||
PRIMARY KEY (`id`), | ||
FOREIGN KEY (`category`) REFERENCES `change_category` (`category`), | ||
INDEX `idx_changefeed_gun` (`gun`) | ||
) ENGINE=InnoDB DEFAULT CHARSET=utf8; | ||
|
||
INSERT INTO `changefeed` ( | ||
`created_at`, | ||
`gun`, | ||
`version`, | ||
`sha256` | ||
) (SELECT | ||
`created_at`, | ||
`gun`, | ||
`version`, | ||
`sha256` | ||
FROM | ||
`tuf_files` | ||
WHERE | ||
`role` = "timestamp" | ||
ORDER BY | ||
`created_at` ASC | ||
); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
CREATE TABLE "change_category" ( | ||
"category" VARCHAR(20) PRIMARY KEY | ||
); | ||
|
||
INSERT INTO "change_category" VALUES ('update'), ('deletion'); | ||
|
||
CREATE TABLE "changefeed" ( | ||
"id" serial PRIMARY KEY, | ||
"created_at" timestamp DEFAULT CURRENT_TIMESTAMP, | ||
"gun" varchar(255) NOT NULL, | ||
"version" integer NOT NULL, | ||
"sha256" CHAR(64) DEFAULT NULL, | ||
"category" VARCHAR(20) NOT NULL DEFAULT 'update' REFERENCES "change_category" | ||
); | ||
|
||
CREATE INDEX "idx_changefeed_gun" ON "changefeed" ("gun"); | ||
|
||
INSERT INTO "changefeed" ( | ||
"created_at", | ||
"gun", | ||
"version", | ||
"sha256" | ||
) (SELECT | ||
"created_at", | ||
"gun", | ||
"version", | ||
"sha256" | ||
FROM | ||
"tuf_files" | ||
WHERE | ||
"role" = 'timestamp' | ||
ORDER BY | ||
"created_at" ASC | ||
); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
package handlers | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"net/http" | ||
"strconv" | ||
|
||
ctxu "github.com/docker/distribution/context" | ||
"github.com/gorilla/mux" | ||
"golang.org/x/net/context" | ||
|
||
"github.com/docker/notary" | ||
"github.com/docker/notary/server/errors" | ||
"github.com/docker/notary/server/storage" | ||
) | ||
|
||
type changefeedResponse struct { | ||
NumberOfRecords int `json:"count"` | ||
Records []storage.Change `json:"records"` | ||
} | ||
|
||
// Changefeed returns a list of changes according to the provided filters | ||
func Changefeed(ctx context.Context, w http.ResponseWriter, r *http.Request) error { | ||
var ( | ||
vars = mux.Vars(r) | ||
logger = ctxu.GetLogger(ctx) | ||
qs = r.URL.Query() | ||
imageName = vars["imageName"] | ||
changeID = qs.Get("change_id") | ||
store, records, err = checkChangefeedInputs(logger, ctx.Value(notary.CtxKeyMetaStore), qs.Get("records")) | ||
) | ||
if err != nil { | ||
// err already logged and in correct format. | ||
return err | ||
} | ||
out, err := changefeed(logger, store, imageName, changeID, records) | ||
if err == nil { | ||
w.Write(out) | ||
} | ||
return err | ||
} | ||
|
||
func changefeed(logger ctxu.Logger, store storage.MetaStore, imageName, changeID string, records int64) ([]byte, error) { | ||
changes, err := store.GetChanges(changeID, int(records), imageName) | ||
if err != nil { | ||
logger.Errorf("500 GET could not retrieve records: %s", err.Error()) | ||
return nil, errors.ErrUnknown.WithDetail(err) | ||
} | ||
out, err := json.Marshal(&changefeedResponse{ | ||
NumberOfRecords: len(changes), | ||
Records: changes, | ||
}) | ||
if err != nil { | ||
logger.Error("500 GET could not json.Marshal changefeedResponse") | ||
return nil, errors.ErrUnknown.WithDetail(err) | ||
} | ||
return out, nil | ||
} | ||
|
||
func checkChangefeedInputs(logger ctxu.Logger, s interface{}, r string) ( | ||
store storage.MetaStore, pageSize int64, err error) { | ||
|
||
store, ok := s.(storage.MetaStore) | ||
if !ok { | ||
logger.Error("500 GET unable to retrieve storage") | ||
err = errors.ErrNoStorage.WithDetail(nil) | ||
return | ||
} | ||
pageSize, err = strconv.ParseInt(r, 10, 32) | ||
if err != nil { | ||
logger.Errorf("400 GET invalid pageSize: %s", r) | ||
err = errors.ErrInvalidParams.WithDetail( | ||
fmt.Sprintf("invalid records parameter: %s", err.Error()), | ||
) | ||
return | ||
} | ||
if pageSize == 0 { | ||
pageSize = notary.DefaultPageSize | ||
} | ||
return | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
package handlers | ||
|
||
import ( | ||
"reflect" | ||
"testing" | ||
|
||
"github.com/Sirupsen/logrus" | ||
ctxu "github.com/docker/distribution/context" | ||
"github.com/docker/notary" | ||
"github.com/docker/notary/server/storage" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
type changefeedArgs struct { | ||
logger ctxu.Logger | ||
store storage.MetaStore | ||
imageName string | ||
changeID string | ||
pageSize int64 | ||
} | ||
|
||
type changefeedTest struct { | ||
name string | ||
args changefeedArgs | ||
want []byte | ||
wantErr bool | ||
} | ||
|
||
func Test_changefeed(t *testing.T) { | ||
s := storage.NewMemStorage() | ||
|
||
tests := []changefeedTest{ | ||
{ | ||
name: "Empty Store", | ||
args: changefeedArgs{ | ||
logger: logrus.New(), | ||
store: s, | ||
imageName: "", | ||
changeID: "0", | ||
pageSize: notary.DefaultPageSize, | ||
}, | ||
want: []byte("{\"count\":0,\"records\":null}"), | ||
wantErr: false, | ||
}, | ||
{ | ||
name: "Bad ChangeID", | ||
args: changefeedArgs{ | ||
logger: logrus.New(), | ||
store: s, | ||
imageName: "", | ||
changeID: "not_a_number", | ||
pageSize: notary.DefaultPageSize, | ||
}, | ||
want: nil, | ||
wantErr: true, | ||
}, | ||
} | ||
runChangefeedTests(t, tests) | ||
|
||
} | ||
|
||
func runChangefeedTests(t *testing.T, tests []changefeedTest) { | ||
for _, tt := range tests { | ||
got, err := changefeed(tt.args.logger, tt.args.store, tt.args.imageName, tt.args.changeID, tt.args.pageSize) | ||
if tt.wantErr { | ||
require.Error(t, err, | ||
"%q. changefeed() error = %v, wantErr %v", tt.name, err, tt.wantErr) | ||
} | ||
require.True(t, reflect.DeepEqual(got, tt.want), | ||
"%q. changefeed() = %v, want %v", tt.name, string(got), string(tt.want)) | ||
} | ||
} | ||
|
||
func Test_checkChangefeedInputs(t *testing.T) { | ||
type args struct { | ||
logger ctxu.Logger | ||
s interface{} | ||
ps string | ||
} | ||
s := storage.NewMemStorage() | ||
tests := []struct { | ||
name string | ||
args args | ||
wantStore storage.MetaStore | ||
wantPageSize int64 | ||
wantErr bool | ||
}{ | ||
// Error cases | ||
{ | ||
name: "No MetaStore", | ||
args: args{ | ||
logger: logrus.New(), | ||
s: nil, | ||
}, | ||
wantErr: true, | ||
}, | ||
{ | ||
name: "Bad page size", | ||
args: args{ | ||
logger: logrus.New(), | ||
s: s, | ||
ps: "not_a_number", | ||
}, | ||
wantErr: true, | ||
wantStore: s, | ||
}, | ||
{ | ||
name: "Zero page size", | ||
args: args{ | ||
logger: logrus.New(), | ||
s: s, | ||
ps: "0", | ||
}, | ||
wantStore: s, | ||
wantPageSize: notary.DefaultPageSize, | ||
}, | ||
{ | ||
name: "Non-zero Page Size", | ||
args: args{ | ||
logger: logrus.New(), | ||
s: s, | ||
ps: "10", | ||
}, | ||
wantStore: s, | ||
wantPageSize: 10, | ||
}, | ||
{ | ||
name: "Reversed \"false\"", | ||
args: args{ | ||
logger: logrus.New(), | ||
s: s, | ||
ps: "-10", | ||
}, | ||
wantStore: s, | ||
wantPageSize: -10, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
gotStore, gotPageSize, err := checkChangefeedInputs(tt.args.logger, tt.args.s, tt.args.ps) | ||
if tt.wantErr { | ||
require.Error(t, err, | ||
"%q. checkChangefeedInputs() error = %v, wantErr %v", tt.name, err, tt.wantErr) | ||
} | ||
require.True(t, reflect.DeepEqual(gotStore, tt.wantStore), | ||
"%q. checkChangefeedInputs() gotStore = %v, want %v", tt.name, gotStore, tt.wantStore) | ||
|
||
require.Equal(t, tt.wantPageSize, gotPageSize, | ||
"%q. checkChangefeedInputs() gotPageSize = %v, want %v", tt.name, gotPageSize, tt.wantPageSize) | ||
|
||
} | ||
} |
Oops, something went wrong.