Add loadtest hammer operation to the client (google#946)
* Hammer with QPS graph

* wip: client operation

* Update default parameters

* fixups

* Silence logging before parse errors

* Update server URL

* reviewer edits
gdbelvin authored Mar 8, 2018
1 parent 6b2641d commit 931e93c
Showing 3 changed files with 233 additions and 14 deletions.
220 changes: 220 additions & 0 deletions cmd/keytransparency-client/cmd/hammer.go
@@ -0,0 +1,220 @@
// Copyright 2018 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
	"bytes"
	"context"
	"flag"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/google/keytransparency/core/authentication"

	"github.com/aybabtme/uniplot/histogram"
	"github.com/paulbellamy/ratecounter"
	"github.com/spf13/cobra"
	"github.com/spf13/pflag"
	"github.com/spf13/viper"

	tm "github.com/buger/goterm"
	tpb "github.com/google/keytransparency/core/api/type/type_proto"
)

var (
	maxWorkers uint
	workers    uint
	memLog     = new(bytes.Buffer)
	ramp       time.Duration
)

func init() {
	// Silence "logging before flag.Parse"
	pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
	flag.CommandLine.Parse([]string{})

	log.SetOutput(memLog)

	RootCmd.AddCommand(hammerCmd)

	hammerCmd.Flags().UintVar(&maxWorkers, "workers", 1, "Number of parallel workers")
	hammerCmd.Flags().DurationVar(&ramp, "ramp", 1*time.Second, "Time to spend ramping up")
}

// hammerCmd represents the hammer command.
var hammerCmd = &cobra.Command{
	Use:   "hammer",
	Short: "Loadtest the server",
	Long:  `Sends update requests for user_1 through user_n using a select number of workers in parallel.`,

	PreRun: func(cmd *cobra.Command, args []string) {
		if err := readKeyStoreFile(); err != nil {
			log.Fatal(err)
		}
	},
	RunE: func(cmd *cobra.Command, args []string) error {
		ctx := context.Background()
		runHammer(ctx, maxWorkers, ramp)
		return nil
	},
}

// runHammer adds workers up to maxWorkers over the ramp time.
func runHammer(ctx context.Context, maxWorkers uint, ramp time.Duration) {
	rampDelta := ramp / time.Duration(maxWorkers)
	times := make(chan time.Duration)
	jobs := make(chan job)
	var wg sync.WaitGroup
	go recordLatencies(ctx, times, rampDelta)
	go generateJobs(ctx, jobs)

	// Slowly add workers up to maxWorkers.
	rampTicker := time.NewTicker(rampDelta)
	for ; workers < maxWorkers && ctx.Err() == nil; <-rampTicker.C {
		workers++
		wg.Add(1)
		go worker(ctx, workers, jobs, times, &wg)
	}
	rampTicker.Stop()
	wg.Wait()
}

type job func() error

func generateJobs(ctx context.Context, jobs chan<- job) {
	i := 0
	for {
		select {
		case <-ctx.Done():
			return
		case jobs <- bindJob(ctx, i):
			i++
		}
	}
}

func bindJob(ctx context.Context, i int) func() error {
	userID := fmt.Sprintf("user_%v", i)
	return func() error {
		return writeOp(ctx, "app1", userID)
	}
}

func worker(ctx context.Context, id uint, jobs <-chan job, times chan<- time.Duration, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case f := <-jobs:
			start := time.Now()
			if err := f(); err != nil {
				log.Printf("f(): %v", err)
				continue
			}
			times <- time.Since(start)
		}
	}
}

func recordLatencies(ctx context.Context, times <-chan time.Duration, rampDelta time.Duration) {
	latencies := make([]float64, 0, 1000)
	refresh := time.NewTicker(250 * time.Millisecond)
	newWorker := time.NewTicker(rampDelta)
	qps := ratecounter.NewRateCounter(rampDelta)

	qpsData := new(tm.DataTable)
	qpsData.AddColumn("Workers")
	qpsData.AddColumn("QPS")
	qpsData.AddRow(0, 0)

	for {
		select {
		case <-ctx.Done():
			return
		case t := <-times:
			latencies = append(latencies, t.Seconds())
			qps.Incr(1)
		case <-refresh.C:
			draw(latencies, qpsData, qps.Rate())
		case <-newWorker.C:
			qpsData.AddRow(float64(workers), float64(qps.Rate()))
		}
	}
}

func draw(latencies []float64, data *tm.DataTable, qps int64) {
	tm.Clear()
	tm.MoveCursor(0, 0)

	// Global stats.
	tm.Printf("Workers: %v\n", workers)
	tm.Printf("Total Requests: %v\n", len(latencies))
	tm.Printf("Recent QPS: %v\n", qps)

	// Workers vs. QPS graph.
	tm.Printf("QPS Chart:\n")
	chart := tm.NewLineChart(100, 20)
	tm.Println(chart.Draw(data))

	// Console output captured in memLog.
	tm.Printf("Debug Output:\n")
	box := tm.NewBox(180, 10, 0)
	box.Write(memLog.Bytes())
	tm.Println(box)

	// Latency histogram.
	width := 50
	height := 10

	tm.Printf("Latency Histogram:\n")
	hist := histogram.Hist(height, latencies)
	histogram.Fprint(tm.Output, hist, histogram.Linear(width))

	tm.Flush()
}

// writeOp performs one write command and returns any error encountered.
func writeOp(ctx context.Context, appID, userID string) error {
	timeout := viper.GetDuration("timeout")
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	userCreds := authentication.GetFakeCredential(userID)
	c, err := GetClient(ctx, userCreds)
	if err != nil {
		return fmt.Errorf("error connecting: %v", err)
	}

	signers := store.Signers()
	authorizedKeys, err := store.PublicKeys()
	if err != nil {
		return fmt.Errorf("store.PublicKeys() failed: %v", err)
	}
	u := &tpb.User{
		DomainId:       viper.GetString("domain"),
		AppId:          appID,
		UserId:         userID,
		PublicKeyData:  []byte("publickey"),
		AuthorizedKeys: authorizedKeys,
	}
	_, err = c.Update(ctx, u, signers)
	return err
}
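
The heart of hammer.go is the ramp loop in runHammer: one new worker per rampDelta tick, all draining a shared unbuffered jobs channel, so throughput is governed purely by worker count. Below is a minimal, self-contained sketch of the same pattern (standard library only; the constants and the do-nothing job body are illustrative, not part of this commit):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	const maxWorkers = 4
	rampDelta := time.Second / maxWorkers // one new worker per tick, as in runHammer

	// Unbuffered jobs channel: the generator blocks until a worker is free.
	jobs := make(chan int)
	go func() {
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			case jobs <- i:
			}
		}
	}()

	var wg sync.WaitGroup
	ticker := time.NewTicker(rampDelta)
	for w := uint(0); w < maxWorkers && ctx.Err() == nil; <-ticker.C {
		w++
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case i := <-jobs:
					_ = i // a real worker would call writeOp here
				}
			}
		}()
		fmt.Println("workers:", w)
	}
	ticker.Stop()
	wg.Wait()
}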
26 changes: 12 additions & 14 deletions cmd/keytransparency-client/cmd/root.go
@@ -76,11 +76,11 @@ func init() {
 	cobra.OnInitialize(initConfig)
 	RootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.keytransparency.yaml)")

-	RootCmd.PersistentFlags().String("domain", "google.com", "Domain within the KT server")
-	RootCmd.PersistentFlags().String("kt-url", "35.184.134.53:8080", "URL of Key Transparency server")
+	RootCmd.PersistentFlags().String("domain", "default", "Domain within the KT server")
+	RootCmd.PersistentFlags().String("kt-url", "35.224.99.110:8080", "URL of Key Transparency server")
 	RootCmd.PersistentFlags().String("kt-cert", "genfiles/server.crt", "Path to public key for Key Transparency")
 	RootCmd.PersistentFlags().Bool("autoconfig", true, "Fetch config info from the server's /v1/domain/info")
-	RootCmd.PersistentFlags().Bool("insecure", false, "Skip TLS checks")
+	RootCmd.PersistentFlags().Bool("insecure", true, "Skip TLS checks")

 	RootCmd.PersistentFlags().String("vrf", "genfiles/vrf-pubkey.pem", "path to vrf public key")

@@ -92,8 +92,8 @@ func init() {
 	RootCmd.PersistentFlags().String("fake-auth-userid", "", "userid to present to the server as identity for authentication. Only succeeds if fake auth is enabled on the server side.")

 	// Global flags for use by subcommands.
-	RootCmd.PersistentFlags().DurationP("timeout", "t", 3*time.Minute, "Time to wait before operations timeout")
-	RootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "Print in/out and verification steps")
+	RootCmd.PersistentFlags().DurationP("timeout", "t", 15*time.Second, "Time to wait before operations timeout")
+	RootCmd.PersistentFlags().BoolVar(&verbose, "verbose", false, "Print in/out and verification steps")
 	if err := viper.BindPFlags(RootCmd.PersistentFlags()); err != nil {
 		log.Fatalf("%v", err)
 	}
@@ -237,27 +237,25 @@ func GetClient(ctx context.Context, userCreds credentials.PerRPCCredentials) (*c

 	cc, err := dial(ctx, ktURL, userCreds)
 	if err != nil {
-		return nil, fmt.Errorf("Error Dialing: %v", err)
+		return nil, fmt.Errorf("dial %v: %v", ktURL, err)
 	}
+	ktCli := pb.NewKeyTransparencyClient(cc)

-	config, err := config(ctx, cc)
+	config, err := config(ctx, ktCli)
 	if err != nil {
-		return nil, fmt.Errorf("Error reading config: %v", err)
+		return nil, fmt.Errorf("config: %v", err)
 	}

-	return client.NewFromConfig(pb.NewKeyTransparencyClient(cc), config)
+	return client.NewFromConfig(ktCli, config)
 }

 // config selects a source for and returns the client configuration.
-func config(ctx context.Context, cc *grpc.ClientConn) (*pb.Domain, error) {
+func config(ctx context.Context, client pb.KeyTransparencyClient) (*pb.Domain, error) {
 	autoConfig := viper.GetBool("autoconfig")
 	domain := viper.GetString("domain")
 	switch {
 	case autoConfig:
-		ktClient := pb.NewKeyTransparencyClient(cc)
-		return ktClient.GetDomain(ctx, &pb.GetDomainRequest{
-			DomainId: domain,
-		})
+		return client.GetDomain(ctx, &pb.GetDomainRequest{DomainId: domain})
 	default:
 		return readConfigFromDisk()
 	}
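
The root.go refactor constructs pb.NewKeyTransparencyClient(cc) once and threads the resulting client through config, rather than rebuilding it from the *grpc.ClientConn in two places. Passing the client interface also makes config straightforward to fake in tests; here is a hedged sketch under hypothetical names (domainGetter, fakeKT, and loadConfig are illustrative, not the real KT API):

package main

import (
	"context"
	"fmt"
)

// domainGetter is a hypothetical stand-in for the client interface.
type domainGetter interface {
	GetDomain(ctx context.Context, domainID string) (string, error)
}

// fakeKT satisfies domainGetter without any network connection.
type fakeKT struct{}

func (fakeKT) GetDomain(_ context.Context, domainID string) (string, error) {
	return "config-for-" + domainID, nil
}

// loadConfig accepts the interface, so tests can pass fakeKT where
// production code passes the real gRPC-backed client.
func loadConfig(ctx context.Context, c domainGetter, domainID string) (string, error) {
	return c.GetDomain(ctx, domainID)
}

func main() {
	cfg, err := loadConfig(context.Background(), fakeKT{}, "default")
	fmt.Println(cfg, err)
}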
1 change: 1 addition & 0 deletions deploy/kubernetes/server-deployment.yaml
@@ -27,6 +27,7 @@ spec:
         - --map-url=map-server:8090
         - --tls-key=/kt/server.key
         - --tls-cert=/kt/server.crt
+        - --auth-type=insecure-fake
         - --alsologtostderr
         - --v=5
         image: us.gcr.io/key-transparency/keytransparency-server:latest
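
This one-line deployment change switches the server to fake authentication so the credentials the hammer sends via authentication.GetFakeCredential(userID) are accepted. For background, a gRPC per-RPC credential is any type implementing credentials.PerRPCCredentials; a minimal hypothetical sketch follows (fakeCreds and its metadata key are assumptions, not the actual keytransparency implementation):

package fakeauth

import (
	"context"

	"google.golang.org/grpc/credentials"
)

// fakeCreds is a guess at the shape of a fake per-RPC credential; the real
// type may differ. It merely asserts an identity in request metadata, which
// the server honors only when started with --auth-type=insecure-fake.
type fakeCreds struct{ userID string }

// GetRequestMetadata attaches the claimed identity to every outgoing RPC.
func (f fakeCreds) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return map[string]string{"authorization": "fake " + f.userID}, nil
}

// RequireTransportSecurity is false so the client's --insecure mode still works.
func (f fakeCreds) RequireTransportSecurity() bool { return false }

// Compile-time check that fakeCreds satisfies the gRPC interface.
var _ credentials.PerRPCCredentials = fakeCreds{}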
