Skip to content

Commit

Permalink
changefeed code, tests to follow
Browse files Browse the repository at this point in the history
Signed-off-by: David Lawrence <[email protected]> (github: endophage)
  • Loading branch information
David Lawrence committed Oct 28, 2016
1 parent 3ca7b72 commit a437971
Show file tree
Hide file tree
Showing 13 changed files with 399 additions and 45 deletions.
3 changes: 3 additions & 0 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ const (
// and PrivNoExecPerms for file.
PrivExecPerms = 0700
PrivNoExecPerms = 0600

// DefaultPageSize is the default number of records to return from the changefeed
DefaultPageSize = 100
)

// enum to use for setting and retrieving values from contexts
Expand Down
8 changes: 8 additions & 0 deletions migrations/server/mysql/0005_changefeed.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE `changefeed` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
`gun` varchar(255) NOT NULL,
`version` int(11) NOT NULL,
`sha256` CHAR(64) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
7 changes: 7 additions & 0 deletions migrations/server/postgresql/0002_changefeed.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE "changefeed" (
"id" serial PRIMARY KEY,
"created_at" timestamp DEFAULT CURRENT_TIMESTAMP,
"gun" varchar(255) NOT NULL,
"version" integer NOT NULL,
"sha256" CHAR(64) DEFAULT NULL,
);
8 changes: 7 additions & 1 deletion server/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ var (
ErrInvalidGUN = errcode.Register(errGroup, errcode.ErrorDescriptor{
Value: "INVALID_GUN",
Message: "The server does not support actions on images of this name.",
Description: "The server does not support actions on images of this name",
Description: "The server does not support actions on images of this name.",
HTTPStatusCode: http.StatusBadRequest,
})
ErrInvalidParams = errcode.Register(errGroup, errcode.ErrorDescriptor{
Value: "INVALID_PARAMETERS",
Message: "The parameters provided are not valid.",
Description: "The parameters provided are not valid.",
HTTPStatusCode: http.StatusBadRequest,
})
ErrUnknown = errcode.ErrorCodeUnknown
Expand Down
75 changes: 75 additions & 0 deletions server/handlers/changefeed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package handlers

import (
"encoding/json"
"net/http"
"strconv"
"strings"

ctxu "github.com/docker/distribution/context"
"golang.org/x/net/context"

"github.com/docker/notary"
"github.com/docker/notary/server/errors"
"github.com/docker/notary/server/storage"
)

type changefeedResponse struct {
NumberOfRecords int `json:"count"`
Records []storage.Change `json:"records"`
}

// Changefeed returns a list of changes according to the provided filters
func Changefeed(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
logger := ctxu.GetLogger(ctx)
s := ctx.Value(notary.CtxKeyMetaStore)
store, ok := s.(storage.MetaStore)
if !ok {
logger.Error("500 GET unable to retrieve storage")
return errors.ErrNoStorage.WithDetail(nil)
}

qs := r.URL.Query()
imageName := qs.Get("filter")
if imageName == "" {
// no image name means global feed
imageName = "*"
}
pageSizeStr := qs.Get("pageSize")
pageSize, err := strconv.ParseInt(pageSizeStr, 10, 32)
if err != nil {
logger.Errorf("400 GET invalid pageSize: %s", pageSizeStr)
return errors.ErrInvalidParams.WithDetail("invalid pageSize parameter, must be an integer >= 0")
}
if pageSize == 0 {
pageSize = notary.DefaultPageSize
}

changeID := qs.Get("changeID")
reversedStr := qs.Get("reversed")
reversed := reversedStr == "1" || strings.ToLower(reversedStr) == "true"

changes, err := store.GetChanges(changeID, int(pageSize), reversed)
if err != nil {
logger.Errorf("500 GET could not retrieve records: %s", err.Error())
return errors.ErrUnknown.WithDetail(err)
}

// if reversed, we need to flip the list order so oldest is first
if reversed {
for i, j := 0, len(changes)-1; i < j; i, j = i+1, j-1 {
changes[i], changes[j] = changes[j], changes[i]
}
}

out, err := json.Marshal(&changefeedResponse{
NumberOfRecords: len(changes),
Records: changes,
})
if err != nil {
logger.Error("500 GET could not json.Marshal changefeedResponse")
return errors.ErrUnknown.WithDetail(err)
}
w.Write(out)
return nil
}
22 changes: 18 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ type _serverEndpoint struct {
IncludeCacheHeaders bool
CacheControlConfig utils.CacheControlConfig
PermissionsRequired []string
// ImageNameAt defaults to 0, which will cause the image name to be looked
// up in the URL segment. Therefore only need to set it when in some other
// place.
ImageNameAt utils.ImageNameLocation
SkipFilterPrefixes bool
}

// RootHandler returns the handler that routes all the paths from / for the
Expand All @@ -130,19 +135,21 @@ func RootHandler(ctx context.Context, ac auth.AccessController, trust signed.Cry

createHandler := func(opts _serverEndpoint) http.Handler {
var wrapped http.Handler
wrapped = authWrapper(opts.ServerHandler, opts.PermissionsRequired...)
wrapped = authWrapper(opts.ServerHandler, opts.ImageNameAt, opts.PermissionsRequired...)
if opts.IncludeCacheHeaders {
wrapped = utils.WrapWithCacheHandler(opts.CacheControlConfig, wrapped)
}
wrapped = filterImagePrefixes(repoPrefixes, opts.ErrorIfGUNInvalid, wrapped)
if !opts.SkipFilterPrefixes {
wrapped = filterImagePrefixes(repoPrefixes, opts.ErrorIfGUNInvalid, wrapped)
}
return prometheus.InstrumentHandlerWithOpts(prometheusOpts(opts.OperationName), wrapped)
}

invalidGUNErr := errors.ErrInvalidGUN.WithDetail(fmt.Sprintf("Require GUNs with prefix: %v", repoPrefixes))
notFoundError := errors.ErrMetadataNotFound.WithDetail(nil)

r := mux.NewRouter()
r.Methods("GET").Path("/v2/").Handler(authWrapper(handlers.MainHandler))
r.Methods("GET").Path("/v2/").Handler(authWrapper(handlers.MainHandler, utils.NoImageName))

r.Methods("POST").Path("/v2/{imageName:.*}/_trust/tuf/").Handler(createHandler(_serverEndpoint{
OperationName: "UpdateTUF",
Expand Down Expand Up @@ -186,11 +193,18 @@ func RootHandler(ctx context.Context, ac auth.AccessController, trust signed.Cry
ServerHandler: handlers.DeleteHandler,
PermissionsRequired: []string{"*"},
}))
r.Methods("GET").Path("/v2/changefeed/_trust").Handler(createHandler(_serverEndpoint{
OperationName: "Changefeed",
ServerHandler: handlers.Changefeed,
PermissionsRequired: []string{"pull"},
ImageNameAt: utils.ImageInQueryString,
SkipFilterPrefixes: true,
}))

r.Methods("GET").Path("/_notary_server/health").HandlerFunc(health.StatusHandler)
r.Methods("GET").Path("/metrics").Handler(prometheus.Handler())
r.Methods("GET", "POST", "PUT", "HEAD", "DELETE").Path("/{other:.*}").Handler(
authWrapper(handlers.NotFoundHandler))
authWrapper(handlers.NotFoundHandler, utils.NoImageName))

return r
}
10 changes: 10 additions & 0 deletions server/storage/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,13 @@ type ErrNoKey struct {
func (err ErrNoKey) Error() string {
return fmt.Sprintf("Error, no timestamp key found for %s", err.gun)
}

// ErrBadChangeID indicates the change ID provided by the user is not
// valid for some reason, i.e. it is out of bounds
type ErrBadChangeID struct {
id string
}

func (err ErrBadChangeID) Error() string {
return fmt.Sprintf("Error, the change ID \"%s\" is not valid", err.id)
}
24 changes: 24 additions & 0 deletions server/storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,28 @@ type MetaStore interface {
// Delete removes all metadata for a given GUN. It does not return an
// error if no metadata exists for the given GUN.
Delete(gun string) error

// GetChanges returns an ordered slice of changes. It starts from
// the change matching changeID, but excludes this change from the results
// on the assumption that if a user provides an ID, they've seen that change.
// If changeID is 0, it starts from the
// beginning, and if changeID is -1, it starts from the most recent
// change. The number of results returned is limited by pageSize.
// Reversed indicates we are fetching pages going backwards in time, the
// default being to fetch pageSize from changeID going forwards in time.
GetChanges(changeID string, pageSize int, reversed bool) ([]Change, error)
}

// Change implements the minimal interface to get the change data.
type Change interface {
// ChangeID returns the unique ID for this update
ChangeID() string
// GUN returns the GUN for this update
GUN() string
// Version returns the timestamp version for the published update
Version() int
// Checksum returns the timestamp.json checksum for the published update
Checksum() string
// RecordedAt returns the time at which the update was recorded
RecordedAt() time.Time
}
57 changes: 57 additions & 0 deletions server/storage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import (
"strings"
"sync"
"time"

"strconv"

"github.com/docker/notary/tuf/data"
)

type key struct {
Expand All @@ -31,13 +35,28 @@ func (k verList) Less(i, j int) bool {
return k[i].version < k[j].version
}

type change struct {
id int
gun string
ver int
checksum string
recordedAt time.Time
}

func (c change) ChangeID() string { return fmt.Sprintf("%d", c.id) }
func (c change) GUN() string { return c.gun }
func (c change) Version() int { return c.ver }
func (c change) Checksum() string { return c.checksum }
func (c change) RecordedAt() time.Time { return c.recordedAt }

// MemStorage is really just designed for dev and testing. It is very
// inefficient in many scenarios
type MemStorage struct {
lock sync.Mutex
tufMeta map[string]verList
keys map[string]map[string]*key
checksums map[string]map[string]ver
changes []Change
}

// NewMemStorage instantiates a memStorage instance
Expand Down Expand Up @@ -71,9 +90,25 @@ func (st *MemStorage) UpdateCurrent(gun string, update MetaUpdate) error {
st.checksums[gun] = make(map[string]ver)
}
st.checksums[gun][checksum] = version
if update.Role == data.CanonicalTimestampRole {
st.writeChange(gun, update.Version, checksum)
}
return nil
}

// writeChange must only be called by a function already holding a lock on
// the MemStorage. Behaviour is undefined otherwise
func (st *MemStorage) writeChange(gun string, version int, checksum string) {
c := change{
id: len(st.changes),
gun: gun,
ver: version,
checksum: checksum,
recordedAt: time.Now(),
}
st.changes = append(st.changes, c)
}

// UpdateMany updates multiple TUF records
func (st *MemStorage) UpdateMany(gun string, updates []MetaUpdate) error {
st.lock.Lock()
Expand Down Expand Up @@ -118,6 +153,9 @@ func (st *MemStorage) UpdateMany(gun string, updates []MetaUpdate) error {
st.checksums[gun] = make(map[string]ver)
}
st.checksums[gun][checksum] = version
if u.Role == data.CanonicalTimestampRole {
st.writeChange(gun, u.Version, checksum)
}
}
return nil
}
Expand Down Expand Up @@ -158,6 +196,25 @@ func (st *MemStorage) Delete(gun string) error {
return nil
}

// GetChanges returns a []Change starting from but excluding the record
// identified by changeID. In the context of the memory store, changeID
// is simply an index into st.changes. The ID of a change, and its index
// are equal, therefore, we want to return results starting at index
// changeID+1 to match the exclusivity of the interface definition.
func (st *MemStorage) GetChanges(changeID string, pageSize int, reversed bool) ([]Change, error) {
id, err := strconv.ParseInt(changeID, 10, 32)
size := len(st.changes)
if err != nil || size <= int(id) {
return nil, ErrBadChangeID{id: changeID}
}
start := int(id) + 1
end := start + pageSize
if end >= size {
return st.changes[start:], nil
}
return st.changes[start:end], nil
}

func entryKey(gun, role string) string {
return fmt.Sprintf("%s.%s", gun, role)
}
6 changes: 6 additions & 0 deletions server/storage/rethinkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"sort"
"time"
Expand Down Expand Up @@ -274,3 +275,8 @@ func (rdb RethinkDB) CheckHealth() error {
defer res.Close()
return nil
}

// GetChanges is not implemented for RethinkDB
func (rdb RethinkDB) GetChanges(changeID string, pageSize int, reversed bool) ([]Change, error) {
return nil, errors.New("Not Implemented")
}
42 changes: 37 additions & 5 deletions server/storage/sql_models.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package storage

import "github.com/jinzhu/gorm"
import (
"fmt"
"time"

"github.com/jinzhu/gorm"
)

// TUFFile represents a TUF file in the database
type TUFFile struct {
Expand All @@ -17,6 +22,30 @@ func (g TUFFile) TableName() string {
return "tuf_files"
}

// Changefeed defines the Change object for SQL databases
type Changefeed struct {
ID uint `gorm:"primary_key" sql:"not null"`
CreatedAt time.Time
Gun string `sql:"type:varchar(255);not null"`
Ver int `gorm:"column:version" sql:"not null"`
Sha256 string `sql:"type:varchar(64);"`
}

// ChangeID returns the unique ID for this update
func (c Changefeed) ChangeID() string { return fmt.Sprintf("%d", c.ID) }

// GUN returns the GUN for this update
func (c Changefeed) GUN() string { return c.Gun }

// Version returns the timestamp version for the published update
func (c Changefeed) Version() int { return c.Ver }

// Checksum returns the timestamp.json checksum for the published update
func (c Changefeed) Checksum() string { return c.Sha256 }

// RecordedAt returns the time at which the update was recorded
func (c Changefeed) RecordedAt() time.Time { return c.CreatedAt }

// CreateTUFTable creates the DB table for TUFFile
func CreateTUFTable(db gorm.DB) error {
// TODO: gorm
Expand All @@ -26,8 +55,11 @@ func CreateTUFTable(db gorm.DB) error {
}
query = db.Model(&TUFFile{}).AddUniqueIndex(
"idx_gun", "gun", "role", "version")
if query.Error != nil {
return query.Error
}
return nil
return query.Error
}

// CreateChangefeedTable creates the DB table for Changefeed
func CreateChangefeedTable(db gorm.DB) error {
query := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8").CreateTable(&Changefeed{})
return query.Error
}
Loading

0 comments on commit a437971

Please sign in to comment.