Skip to content

Commit

Permalink
OpenRTB Prebid Cache support (#276)
Browse files Browse the repository at this point in the history
* Added the cache API contract classes, and renamed some existing ones for consistency.

* Fixed some more bugs in the unmarshalling.

* Made a Prebid Cache client which saved the entire bid as a JSON value.

* Added a new test, and improved the cache implementation.

* Ran gofmt.

* Moved the GetBaseURL function onto the config.Cache struct, so that the new client can call it.

* Separated the prebid cache client dependencies from OpenRTB. Moved OpenRTB serialization into the exchange.

* Simplified cache implementation, because some of the graceful fallback behavior was actually untestable.

* Added a cache client to the exchange.

* basic default cache behavior.

* Cached the best bid from each bidder in each imp.

* Fixed tests and some bugs.

* Added unit tests for the auction state.

* Removed cache from the gitignore.

* Fixed some bugs, and added some more unit tests.

* Added error log for critical errors.

* Some code logic simplifications.

* Fixed another bug and added some more tests.

* Made the reserved cache time configurable.

* Ran gofmt.

* Fixed issue #199 in OpenRTB.

* Removed a few unused methods.

* Updated comment to be more accurate.

* Renamed hb_uuid to hb_cache_id, for consistency with today so that publishers dont need to update their creatives in DFP.

* Removed an unnecessary nil check.
  • Loading branch information
dbemiller authored Jan 17, 2018
1 parent f6b1da7 commit 21da662
Show file tree
Hide file tree
Showing 21 changed files with 823 additions and 96 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ build

# config files
pbs.*
cache.*
inventory_url.yaml

# autogenerated version file
Expand Down
27 changes: 19 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ type Cache struct {
Scheme string `mapstructure:"scheme"`
Host string `mapstructure:"host"`
Query string `mapstructure:"query"`

// A static timeout here is not ideal. This is a hack because we have some aggressive timelines for OpenRTB support.
// This value specifies how much time the prebid server host expects a call to prebid cache to take.
//
// OpenRTB allows the caller to specify the auction timeout. Prebid Server will subtract _this_ amount of time
// from the timeout it gives demand sources to respond.
//
// In reality, the cache response time will probably fluctuate with the traffic over time. Someday,
// this should be replaced by code which tracks the response time of recent cache calls and
// adjusts the time dynamically.
ExpectedTimeMillis int `mapstructure:"expected_millis"`
}

type Cookie struct {
Expand All @@ -144,17 +155,17 @@ func New() (*Configuration, error) {
}

//Allows for protocol relative URL if scheme is empty
func (cfg *Configuration) GetCacheBaseURL() string {
cfg.CacheURL.Scheme = strings.ToLower(cfg.CacheURL.Scheme)
if strings.Contains(cfg.CacheURL.Scheme, "https") {
return fmt.Sprintf("https://%s", cfg.CacheURL.Host)
func (cfg *Cache) GetBaseURL() string {
cfg.Scheme = strings.ToLower(cfg.Scheme)
if strings.Contains(cfg.Scheme, "https") {
return fmt.Sprintf("https://%s", cfg.Host)
}
if strings.Contains(cfg.CacheURL.Scheme, "http") {
return fmt.Sprintf("http://%s", cfg.CacheURL.Host)
if strings.Contains(cfg.Scheme, "http") {
return fmt.Sprintf("http://%s", cfg.Host)
}
return fmt.Sprintf("//%s", cfg.CacheURL.Host)
return fmt.Sprintf("//%s", cfg.Host)
}

func (cfg *Configuration) GetCachedAssetURL(uuid string) string {
return fmt.Sprintf("%s/cache?%s", cfg.GetCacheBaseURL(), strings.Replace(cfg.CacheURL.Query, "%PBS_CACHE_UUID%", uuid, 1))
return fmt.Sprintf("%s/cache?%s", cfg.CacheURL.GetBaseURL(), strings.Replace(cfg.CacheURL.Query, "%PBS_CACHE_UUID%", uuid, 1))
}
2 changes: 1 addition & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestFullConfig(t *testing.T) {
cmpStrings(t, "datacache.password", cfg.DataCache.Password, "db2342")
cmpInts(t, "datacache.cache_size", cfg.DataCache.CacheSize, 10000000)
cmpInts(t, "datacache.ttl_seconds", cfg.DataCache.TTLSeconds, 3600)
cmpStrings(t, "", cfg.GetCacheBaseURL(), "http://prebidcache.net")
cmpStrings(t, "", cfg.CacheURL.GetBaseURL(), "http://prebidcache.net")
cmpStrings(t, "", cfg.GetCachedAssetURL("a0eebc99-9c0b-4ef8-bb00-6bb9bd380a11"), "http://prebidcache.net/cache?uuid=a0eebc99-9c0b-4ef8-bb00-6bb9bd380a11")
cmpStrings(t, "adapters.indexExchange.endpoint", cfg.Adapters["indexexchange"].Endpoint, "http://ixtest.com/api")
cmpStrings(t, "adapters.rubicon.endpoint", cfg.Adapters["rubicon"].Endpoint, "http://rubitest.com/api")
Expand Down
16 changes: 16 additions & 0 deletions endpoints/openrtb2/auction.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (deps *endpointDeps) Auction(w http.ResponseWriter, r *http.Request, _ http
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Critical error while running the auction: %v", err)
glog.Errorf("/openrtb2/auction Critical error: %v", err)
return
}

Expand Down Expand Up @@ -156,6 +157,10 @@ func (deps *endpointDeps) validateRequest(req *openrtb.BidRequest) error {
return err
}

if err := deps.validateBidRequestExt(req.Ext); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -288,6 +293,17 @@ func (deps *endpointDeps) validateImpExt(ext openrtb.RawJSON, impIndex int) erro
return nil
}

func (deps *endpointDeps) validateBidRequestExt(ext openrtb.RawJSON) error {
if len(ext) < 1 {
return nil
}
var tmpExt openrtb_ext.ExtRequest
if err := json.Unmarshal(ext, &tmpExt); err != nil {
return fmt.Errorf("request.ext is invalid: %v", err)
}
return nil
}

func (deps *endpointDeps) validateSite(site *openrtb.Site) error {
if site != nil && site.ID == "" && site.Page == "" {
return errors.New("request.site should include at least one of request.site.id or request.site.page.")
Expand Down
2 changes: 1 addition & 1 deletion endpoints/openrtb2/auction_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func newDummyRequest() *http.Request {
func BenchmarkOpenrtbEndpoint(b *testing.B) {
server := httptest.NewServer(http.HandlerFunc(dummyServer))
defer server.Close()
endpoint, _ := NewEndpoint(exchange.NewExchange(server.Client(), &config.Configuration{}), &bidderParamValidator{}, empty_fetcher.EmptyFetcher(), &config.Configuration{MaxRequestSize: maxSize})
endpoint, _ := NewEndpoint(exchange.NewExchange(server.Client(), nil, &config.Configuration{}), &bidderParamValidator{}, empty_fetcher.EmptyFetcher(), &config.Configuration{MaxRequestSize: maxSize})

b.ResetTimer()
for n := 0; n < b.N; n++ {
Expand Down
31 changes: 30 additions & 1 deletion endpoints/openrtb2/auction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,18 @@ var validRequests = []string{
"appnexus": "good"
}
}
]
],
"ext": {
"prebid": {
"targeting": {
"pricegranularity": "low",
"lengthmax": 20
},
"cache": {
"bids": {}
}
}
}
}`,
`{
"id": "some-request-id",
Expand Down Expand Up @@ -610,6 +621,24 @@ var invalidRequests = []string{
}
}]
}`,
`{
"id": "some-request-id",
"site": {"page": "test.somepage.com"},
"imp": [{
"id": "my-imp-id",
"video": {
"mimes":["video/mp4"]
},
"ext": {
"appnexus": "good"
}
}],
"ext": {
"prebid": {
"cache": {}
}
}
}`,
}

// StoredRequest testing
Expand Down
66 changes: 66 additions & 0 deletions exchange/auction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package exchange

import (
"github.com/mxmCherry/openrtb"
"github.com/prebid/prebid-server/openrtb_ext"
)

// auction stores the Bids for a single call to Exchange.HoldAuction().
// Construct these with the newAuction() function.
type auction struct {
// winningBids is a map from imp.id to the highest overall CPM bid in that imp.
winningBids map[string]*openrtb.Bid
// winningBidders is a map from imp.id to the BidderName which made the winning Bid.
winningBidders map[string]openrtb_ext.BidderName
// winningBidsFromBidder stores the highest bid on each imp by each bidder.
winningBidsByBidder map[string]map[openrtb_ext.BidderName]*openrtb.Bid
// cachedBids stores the cache ID for each bid, if it exists.
// This is set by cacheBids() in cache.go, and is nil beforehand.
cachedBids map[*openrtb.Bid]string
}

func newAuction(numImps int) *auction {
return &auction{
winningBids: make(map[string]*openrtb.Bid, numImps),
winningBidders: make(map[string]openrtb_ext.BidderName, numImps),
winningBidsByBidder: make(map[string]map[openrtb_ext.BidderName]*openrtb.Bid, numImps),
}
}

// addBid should be called for each bid which is "officially" valid for the auction.
func (auction *auction) addBid(name openrtb_ext.BidderName, bid *openrtb.Bid) {
if auction == nil {
return
}

cpm := bid.Price
wbid, ok := auction.winningBids[bid.ImpID]
if !ok || cpm > wbid.Price {
auction.winningBidders[bid.ImpID] = name
auction.winningBids[bid.ImpID] = bid
}
if bidMap, ok := auction.winningBidsByBidder[bid.ImpID]; ok {
bestSoFar, ok := bidMap[name]
if !ok || cpm > bestSoFar.Price {
bidMap[name] = bid
}
} else {
auction.winningBidsByBidder[bid.ImpID] = make(map[openrtb_ext.BidderName]*openrtb.Bid)
auction.winningBidsByBidder[bid.ImpID][name] = bid
}
}

func (auction *auction) cacheId(bid *openrtb.Bid) (id string, exists bool) {
id, exists = auction.cachedBids[bid]
return
}

// forEachBestBid runs the callback function on every bid which is the highest one for each Bidder on each Imp.
func (auction *auction) forEachBestBid(callback func(impID string, bidder openrtb_ext.BidderName, bid *openrtb.Bid, winner bool)) {
for impId, bidderMap := range auction.winningBidsByBidder {
overallWinner, _ := auction.winningBids[impId]
for bidderName, bid := range bidderMap {
callback(impId, bidderName, bid, bid == overallWinner)
}
}
}
90 changes: 90 additions & 0 deletions exchange/auction_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package exchange

import (
"github.com/mxmCherry/openrtb"
"github.com/prebid/prebid-server/openrtb_ext"
"testing"
)

func TestImpCount(t *testing.T) {
a := newAuction(2)
a.addBid(openrtb_ext.BidderAppnexus, &openrtb.Bid{
ImpID: "imp-1",
})
a.addBid(openrtb_ext.BidderRubicon, &openrtb.Bid{
ImpID: "imp-1",
})
a.addBid(openrtb_ext.BidderIndex, &openrtb.Bid{
ImpID: "imp-2",
})
if len(a.winningBids) != 2 {
t.Errorf("Expected 2 imps. Got %d", len(a.winningBids))
}
}

func TestAuctionIntegrity(t *testing.T) {
a := newAuction(2)
oneImpId := "imp-1"
otherImpId := "imp-2"

apnWinner := &openrtb.Bid{
ImpID: oneImpId,
Price: 3,
}
apnLoser := &openrtb.Bid{
ImpID: oneImpId,
Price: 2,
}
apnCompetitor := &openrtb.Bid{
ImpID: otherImpId,
Price: 1,
}
rubiWinner := &openrtb.Bid{
ImpID: otherImpId,
Price: 2,
}
a.addBid(openrtb_ext.BidderAppnexus, apnWinner)
a.addBid(openrtb_ext.BidderAppnexus, apnLoser)
a.addBid(openrtb_ext.BidderRubicon, rubiWinner)
a.addBid(openrtb_ext.BidderAppnexus, apnCompetitor)

seenWinnerImp1 := false
seenWinnerImp2 := false
seenLoserImp1 := false
seenLoserImp2 := false

numBestBids := 0
a.forEachBestBid(func(impId string, bidderName openrtb_ext.BidderName, bid *openrtb.Bid, winner bool) {
numBestBids++

if bid == apnWinner {
seenWinnerImp1 = true
}
if bid == apnLoser {
seenLoserImp1 = true
}
if bid == rubiWinner {
seenWinnerImp2 = true
}
if bid == apnCompetitor {
seenLoserImp2 = true
}
})

if !seenWinnerImp1 {
t.Errorf("foreachBestBid did not execute on apn winning bid.")
}
if seenLoserImp1 {
t.Errorf("foreachBestBid should not execute on apn backup bid.")
}
if !seenWinnerImp2 {
t.Errorf("foreachBestBid did not execute on rubicon winning bid.")
}
if !seenLoserImp2 {
t.Errorf("foreachBestBid did not execute on apn best-effort losing bid.")
}

if numBestBids != 3 {
t.Errorf("expected 3 best-effort bids. Got %d", numBestBids)
}
}
5 changes: 1 addition & 4 deletions exchange/bidder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,7 @@ func TestTargetingKeys(t *testing.T) {
Ext: bidReqExtRaw,
}

seatBid, errs := bidder.requestBid(context.Background(), bidRequest, &targetData{
winningBids: make(map[string]*openrtb.Bid),
winningBidders: make(map[string]openrtb_ext.BidderName),
}, "dummy")
seatBid, errs := bidder.requestBid(context.Background(), bidRequest, &targetData{}, "dummy")
if len(errs) > 0 {
t.Errorf("Errors processing requestBid")
for _, e := range errs {
Expand Down
50 changes: 50 additions & 0 deletions exchange/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package exchange

import (
"context"
"encoding/json"
"github.com/golang/glog"
"github.com/mxmCherry/openrtb"
"github.com/prebid/prebid-server/openrtb_ext"
"github.com/prebid/prebid-server/pbs/buckets"
"github.com/prebid/prebid-server/prebid_cache_client"
"strings"
)

// cacheBids mutates the auction so that the highest Bid from each Bidder in each Imp has a Cache ID associated with it.
//
// If any cache calls fail, then there's not much anyone can do about it. This function will just log
// the error and save IDs to any bids which are cached successfully.
func cacheBids(ctx context.Context, cache prebid_cache_client.Client, auction *auction, granularity openrtb_ext.PriceGranularity) {
bids := make([]*openrtb.Bid, 0, 30) // Arbitrary initial capacity
nextBidIndex := 0
auction.forEachBestBid(func(impID string, bidder openrtb_ext.BidderName, bid *openrtb.Bid, winner bool) {
// Fixes #199
granularityStr, err := buckets.GetPriceBucketString(bid.Price, granularity)
if err == nil && strings.ContainsAny(granularityStr, "123456789") {
bids = append(bids, bid)
nextBidIndex++
}
})

// Marshal the bids into JSON payloads. If any errors occur during marshalling, eject that bid from the array.
// After this block, we expect "bids" and "jsonValues" to have the same number of elements in the same order.
jsonValues := make([]json.RawMessage, 0, len(bids))
for i := 0; i < len(bids); i++ {
if jsonBytes, err := json.Marshal(bids[i]); err != nil {
glog.Errorf("Error marshalling OpenRTB Bid for Prebid Cache: %v", err)
bids = append(bids[:i], bids[i+1:]...)
i--
} else {
jsonValues = append(jsonValues, jsonBytes)
}
}

ids := cache.PutJson(ctx, jsonValues)
auction.cachedBids = make(map[*openrtb.Bid]string, len(bids))
for i := 0; i < len(bids); i++ {
if ids[i] != "" {
auction.cachedBids[bids[i]] = ids[i]
}
}
}
Loading

0 comments on commit 21da662

Please sign in to comment.