Skip to content

Commit

Permalink
Impl173 (#208)
Browse files Browse the repository at this point in the history
* adjust code

Signed-off-by: Sam Yuan <[email protected]>

* code refactor

Signed-off-by: Sam Yuan <[email protected]>

* code refactor

Signed-off-by: Sam Yuan <[email protected]>

* use const instead

Signed-off-by: Sam Yuan <[email protected]>

* adding parallel flag

Signed-off-by: Sam Yuan <[email protected]>
  • Loading branch information
SamYuan1990 authored and DavidLiu committed Mar 23, 2022
1 parent ee285f1 commit 389cd1f
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 222 deletions.
60 changes: 34 additions & 26 deletions cmd/tape/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"

"tape/pkg/infra"
"tape/pkg/infra/cmdImpl"

"github.com/pkg/errors"
Expand All @@ -18,28 +19,31 @@ const (
var (
app = kingpin.New("tape", "A performance test tool for Hyperledger Fabric")

run = app.Command("run", "Start the tape program").Default()
con = run.Flag("config", "Path to config file").Required().Short('c').String()
num = run.Flag("number", "Number of tx for shot").Required().Short('n').Int()
rate = run.Flag("rate", "[Optional] Creates tx rate, default 0 as unlimited").Default("0").Float64()
burst = run.Flag("burst", "[Optional] Burst size for Tape, should bigger than rate").Default("1000").Int()
signerNumber = run.Flag("signers", "[Optional] signer parallel Number for Tape, default as 5").Default("5").Int()
run = app.Command("run", "Start the tape program").Default()
con = run.Flag("config", "Path to config file").Required().Short('c').String()
num = run.Flag("number", "Number of tx for shot").Required().Short('n').Int()
rate = run.Flag("rate", "[Optional] Creates tx rate, default 0 as unlimited").Default("0").Float64()
burst = run.Flag("burst", "[Optional] Burst size for Tape, should bigger than rate").Default("1000").Int()
signerNumber = run.Flag("signers", "[Optional] signer parallel Number for Tape, default as 5").Default("5").Int()
ParallelNumber = run.Flag("parallel", "[Optional] parallel Number for Tape, default as 1").Default("1").Int()

version = app.Command("version", "Show version information")

commitOnly = app.Command("commitOnly", "Start tape with commitOnly mode, starts dummy envelop for test orderer only")
commitcon = commitOnly.Flag("config", "Path to config file").Required().Short('c').String()
commitnum = commitOnly.Flag("number", "Number of tx for shot").Required().Short('n').Int()
commitrate = commitOnly.Flag("rate", "[Optional] Creates tx rate, default 0 as unlimited").Default("0").Float64()
commitburst = commitOnly.Flag("burst", "[Optional] Burst size for Tape, should bigger than rate").Default("1000").Int()
commitsignerNumber = commitOnly.Flag("signers", "[Optional] signer parallel Number for Tape, default as 5").Default("5").Int()

endorsementOnly = app.Command("endorsementOnly", "Start tape with endorsementOnly mode, starts endorsement and end")
endorsementcon = endorsementOnly.Flag("config", "Path to config file").Required().Short('c').String()
endorsementnum = endorsementOnly.Flag("number", "Number of tx for shot").Required().Short('n').Int()
endorsementrate = endorsementOnly.Flag("rate", "[Optional] Creates tx rate, default 0 as unlimited").Default("0").Float64()
endorsementburst = endorsementOnly.Flag("burst", "[Optional] Burst size for Tape, should bigger than rate").Default("1000").Int()
endorsementsignerNumber = endorsementOnly.Flag("signers", "[Optional] signer parallel Number for Tape, default as 5").Default("5").Int()
commitOnly = app.Command("commitOnly", "Start tape with commitOnly mode, starts dummy envelop for test orderer only")
commitcon = commitOnly.Flag("config", "Path to config file").Required().Short('c').String()
commitnum = commitOnly.Flag("number", "Number of tx for shot").Required().Short('n').Int()
commitrate = commitOnly.Flag("rate", "[Optional] Creates tx rate, default 0 as unlimited").Default("0").Float64()
commitburst = commitOnly.Flag("burst", "[Optional] Burst size for Tape, should bigger than rate").Default("1000").Int()
commitsignerNumber = commitOnly.Flag("signers", "[Optional] signer parallel Number for Tape, default as 5").Default("5").Int()
commitParallelNumber = commitOnly.Flag("parallel", "[Optional] parallel Number for Tape, default as 1").Default("1").Int()

endorsementOnly = app.Command("endorsementOnly", "Start tape with endorsementOnly mode, starts endorsement and end")
endorsementcon = endorsementOnly.Flag("config", "Path to config file").Required().Short('c').String()
endorsementnum = endorsementOnly.Flag("number", "Number of tx for shot").Required().Short('n').Int()
endorsementrate = endorsementOnly.Flag("rate", "[Optional] Creates tx rate, default 0 as unlimited").Default("0").Float64()
endorsementburst = endorsementOnly.Flag("burst", "[Optional] Burst size for Tape, should bigger than rate").Default("1000").Int()
endorsementsignerNumber = endorsementOnly.Flag("signers", "[Optional] signer parallel Number for Tape, default as 5").Default("5").Int()
endorsementParallelNumber = endorsementOnly.Flag("parallel", "[Optional] parallel Number for Tape, default as 1").Default("1").Int()
)

func main() {
Expand All @@ -58,14 +62,14 @@ func main() {
case version.FullCommand():
fmt.Printf(cmdImpl.GetVersionInfo())
case commitOnly.FullCommand():
checkArgs(commitrate, commitburst, commitsignerNumber, logger)
err = cmdImpl.ProcessCommitOnly(*commitcon, *commitnum, *commitburst, *commitsignerNumber, *commitrate, logger)
checkArgs(commitrate, commitburst, commitsignerNumber, commitParallelNumber, logger)
err = cmdImpl.Process(*commitcon, *commitnum, *commitburst, *commitsignerNumber, *commitParallelNumber, *commitrate, logger, infra.COMMIT)
case endorsementOnly.FullCommand():
checkArgs(endorsementrate, endorsementburst, endorsementsignerNumber, logger)
err = cmdImpl.ProcessEndorsementOnly(*endorsementcon, *endorsementnum, *endorsementburst, *endorsementsignerNumber, *endorsementrate, logger)
checkArgs(endorsementrate, endorsementburst, endorsementsignerNumber, endorsementParallelNumber, logger)
err = cmdImpl.Process(*endorsementcon, *endorsementnum, *endorsementburst, *endorsementsignerNumber, *endorsementParallelNumber, *endorsementrate, logger, infra.ENDORSEMENT)
case run.FullCommand():
checkArgs(rate, burst, signerNumber, logger)
err = cmdImpl.Process(*con, *num, *burst, *signerNumber, *rate, logger)
checkArgs(rate, burst, signerNumber, ParallelNumber, logger)
err = cmdImpl.Process(*con, *num, *burst, *signerNumber, *ParallelNumber, *rate, logger, infra.FULLPROCESS)
default:
err = errors.Errorf("invalid command: %s", fullCmd)
}
Expand All @@ -78,7 +82,7 @@ func main() {
os.Exit(0)
}

func checkArgs(rate *float64, burst, signerNumber *int, logger *log.Logger) {
func checkArgs(rate *float64, burst, signerNumber, parallel *int, logger *log.Logger) {
if *rate < 0 {
os.Stderr.WriteString("tape: error: rate must be zero (unlimited) or positive number\n")
os.Exit(1)
Expand All @@ -91,6 +95,10 @@ func checkArgs(rate *float64, burst, signerNumber *int, logger *log.Logger) {
os.Stderr.WriteString("tape: error: signerNumber at least 1\n")
os.Exit(1)
}
if *parallel < 1 {
os.Stderr.WriteString("tape: error: parallel at least 1\n")
os.Exit(1)
}

if int64(*rate) > int64(*burst) {
fmt.Printf("As rate %d is bigger than burst %d, real rate is burst\n", int64(*rate), int64(*burst))
Expand Down
4 changes: 2 additions & 2 deletions e2e/multi_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var _ = Describe("Mock test for good path", func() {
}
GenerateConfigFile(config.Name(), configValue)

cmd := exec.Command(tapeBin, "-c", config.Name(), "-n", "500")
cmd := exec.Command(tapeBin, "-c", config.Name(), "-n", "500", "--parallel", "5")
tapeSession, err = gexec.Start(cmd, nil, nil)
Expect(err).NotTo(HaveOccurred())
Eventually(tapeSession.Out).Should(Say("Time.*Block.*Tx.*10.*"))
Expand All @@ -59,7 +59,7 @@ var _ = Describe("Mock test for good path", func() {
}
GenerateConfigFile(config.Name(), configValue)

cmd := exec.Command(tapeBin, "-c", config.Name(), "-n", "500")
cmd := exec.Command(tapeBin, "-c", config.Name(), "-n", "500", "--signers", "2")
tapeSession, err = gexec.Start(cmd, nil, nil)
Expect(err).NotTo(HaveOccurred())
Eventually(tapeSession.Out).Should(Say("Time.*Block.*Tx.*10.*"))
Expand Down
51 changes: 0 additions & 51 deletions pkg/infra/cmdImpl/commitOnly.go

This file was deleted.

47 changes: 0 additions & 47 deletions pkg/infra/cmdImpl/endorsementOnly.go

This file was deleted.

13 changes: 6 additions & 7 deletions pkg/infra/cmdImpl/fullProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,24 @@ package cmdImpl

import (
"fmt"
"tape/pkg/infra/observer"
"tape/pkg/infra/trafficGenerator"
"time"

log "github.com/sirupsen/logrus"
)

func Process(configPath string, num int, burst, signerNumber int, rate float64, logger *log.Logger) error {
func Process(configPath string, num int, burst, signerNumber, parallel int, rate float64, logger *log.Logger, processmod int) error {
/*** variables ***/
cmdConfig, err := CreateCmd(configPath, num, burst, signerNumber, rate)
cmdConfig, err := CreateCmd(configPath, num, burst, signerNumber, parallel, rate, logger)
if err != nil {
return err
}
defer cmdConfig.cancel()
/*** workers ***/
Observer_workers, Observers, err := observer.CreateObserverWorkers(cmdConfig.Config, cmdConfig.Crypto, cmdConfig.BlockCh, logger, cmdConfig.Ctx, cmdConfig.FinishCh, num, cmdConfig.ErrorCh)
Observer_workers, Observers, err := cmdConfig.Observerfactory.CreateObserverWorkers(processmod)
if err != nil {
return err
}
generator_workers, err := trafficGenerator.CreateGeneratorWorkers(cmdConfig.Ctx, cmdConfig.Crypto, cmdConfig.Raw, cmdConfig.Signed, cmdConfig.Envs, cmdConfig.Processed, cmdConfig.Config, num, burst, signerNumber, rate, logger, cmdConfig.ErrorCh)
generator_workers, err := cmdConfig.Generator.CreateGeneratorWorkers(processmod)
if err != nil {
return err
}
Expand All @@ -33,14 +31,15 @@ func Process(configPath string, num int, burst, signerNumber int, rate float64,
go worker.Start()
}
/*** waiting for complete ***/
total := num * parallel
for {
select {
case err = <-cmdConfig.ErrorCh:
return err
case <-cmdConfig.FinishCh:
duration := time.Since(Observers.GetTime())
logger.Infof("Completed processing transactions.")
fmt.Printf("tx: %d, duration: %+v, tps: %f\n", num, duration, float64(num)/duration.Seconds())
fmt.Printf("tx: %d, duration: %+v, tps: %f\n", total, duration, float64(total)/duration.Seconds())
return nil
}
}
Expand Down
67 changes: 49 additions & 18 deletions pkg/infra/cmdImpl/processTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,32 @@ package cmdImpl

import (
"context"
"tape/pkg/infra"
"tape/pkg/infra/basic"
"tape/pkg/infra/observer"
"tape/pkg/infra/trafficGenerator"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/peer"
log "github.com/sirupsen/logrus"
)

type CmdConfig struct {
Config basic.Config
Crypto infra.Crypto
Raw chan *peer.Proposal
Signed []chan *basic.Elements
Processed chan *basic.Elements
Envs chan *common.Envelope
BlockCh chan *observer.AddressedBlock
FinishCh chan struct{}
ErrorCh chan error
Ctx context.Context
cancel context.CancelFunc
// Config basic.Config
// Crypto infra.Crypto
// Raw chan *peer.Proposal
// Signed []chan *basic.Elements
// Processed chan *basic.Elements
// Envs chan *common.Envelope
// BlockCh chan *observer.AddressedBlock
FinishCh chan struct{}
ErrorCh chan error
//Ctx context.Context
cancel context.CancelFunc
Generator *trafficGenerator.TrafficGenerator
Observerfactory *observer.ObserverFactory
}

func CreateCmd(configPath string, num int, burst, signerNumber int, rate float64) (*CmdConfig, error) {
func CreateCmd(configPath string, num int, burst, signerNumber, parallel int, rate float64, logger *log.Logger) (*CmdConfig, error) {
config, err := basic.LoadConfig(configPath)
if err != nil {
return nil, err
Expand All @@ -47,18 +50,46 @@ func CreateCmd(configPath string, num int, burst, signerNumber int, rate float64
for i := 0; i < len(config.Endorsers); i++ {
signed[i] = make(chan *basic.Elements, burst)
}
cmd := &CmdConfig{
config,
mytrafficGenerator := trafficGenerator.NewTrafficGenerator(ctx,
crypto,
envs,
raw,
signed,
processed,
envs,
signed,
config,
num,
burst,
signerNumber,
parallel,
rate,
logger,
errorCh)

Observerfactory := observer.NewObserverFactory(
config,
crypto,
blockCh,
logger,
ctx,
finishCh,
num,
parallel,
envs,
errorCh)
cmd := &CmdConfig{
//config,
//crypto,
//raw,
//signed,
//processed,
//envs,
//blockCh,
finishCh,
errorCh,
ctx,
//ctx,
cancel,
mytrafficGenerator,
Observerfactory,
}
return cmd, nil
}
15 changes: 15 additions & 0 deletions pkg/infra/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,20 @@ package infra

import (
"tape/internal/fabric/protoutil"
"time"

"github.com/hyperledger/fabric-protos-go/common"
)

const (
FULLPROCESS = 6
ENDORSEMENT = 4
COMMIT = 3
PROPOSALFILTER = 4
COMMITFILTER = 3
QUERYFILTER = 2
)

/*
to do for #127 SM crypto
just need to do an impl for this interface and replace
Expand All @@ -27,3 +37,8 @@ as for #56 and #174,in cli imp adjust sequence of P&C impl to control workflow.
type Worker interface {
Start()
}

type ObserverWorker interface {
Worker
GetTime() time.Time
}
Loading

0 comments on commit 389cd1f

Please sign in to comment.