Skip to content

Commit

Permalink
impl for txid tracing at traffic generate (#218)
Browse files Browse the repository at this point in the history
* impl for txid tracing at traffic generate

Signed-off-by: Sam Yuan <[email protected]>

* fix up

Signed-off-by: Sam Yuan <[email protected]>

* fix up

Signed-off-by: Sam Yuan <[email protected]>

* fix up

Signed-off-by: Sam Yuan <[email protected]>
  • Loading branch information
SamYuan1990 authored Nov 6, 2021
1 parent fbea441 commit 93679ae
Show file tree
Hide file tree
Showing 14 changed files with 73 additions and 33 deletions.
10 changes: 9 additions & 1 deletion cmd/tape/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
)

const (
loglevel = "TAPE_LOGLEVEL"
loglevel = "TAPE_LOGLEVEL"
logfilename = "Tape.log"
)

var (
Expand Down Expand Up @@ -44,6 +45,12 @@ func main() {

logger := log.New()
logger.SetLevel(log.WarnLevel)
file, err := os.OpenFile(logfilename, os.O_CREATE|os.O_WRONLY, 0755)
if err != nil {
panic(err)
}
defer file.Close()
logger.SetOutput(file)
if customerLevel, customerSet := os.LookupEnv(loglevel); customerSet {
if lvl, err := log.ParseLevel(customerLevel); err == nil {
logger.SetLevel(lvl)
Expand Down Expand Up @@ -75,6 +82,7 @@ func main() {

if err != nil {
logger.Error(err)
fmt.Fprint(os.Stderr, err)
os.Exit(1)
}
os.Exit(0)
Expand Down
6 changes: 6 additions & 0 deletions pkg/infra/basic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@ import (
"gopkg.in/yaml.v2"
)

type TracingProposal struct {
*peer.Proposal
TxId string
}

type Elements struct {
TxId string
SignedProp *peer.SignedProposal
Responses []*peer.ProposalResponse
Lock sync.Mutex
Expand Down
15 changes: 15 additions & 0 deletions pkg/infra/basic/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package basic

import (
"fmt"
"time"

log "github.com/sirupsen/logrus"
)

func LogEvent(logger *log.Logger, txid, event string) {
now := time.Now()
year, month, day := now.Date()
time_str := fmt.Sprintf("%d-%d-%d 00:00:00", year, month, day)
logger.Debugf("For txid %s, event %s at %s", txid, event, time_str)
}
3 changes: 1 addition & 2 deletions pkg/infra/cmdImpl/processTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"tape/pkg/infra/trafficGenerator"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/peer"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -36,7 +35,7 @@ func CreateCmd(configPath string, num int, burst, signerNumber, parallel int, ra
if err != nil {
return nil, err
}
raw := make(chan *peer.Proposal, burst)
raw := make(chan *basic.TracingProposal, burst)
signed := make([]chan *basic.Elements, len(config.Endorsers))
processed := make(chan *basic.Elements, burst)
envs := make(chan *common.Envelope, burst)
Expand Down
3 changes: 2 additions & 1 deletion pkg/infra/observer/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func benchmarkNPeer(concurrency int, b *testing.B) {
processed := make(chan *basic.Elements, 10)
signeds := make([]chan *basic.Elements, concurrency)
ctx, cancel := context.WithCancel(context.Background())
logger := log.New()
defer cancel()
for i := 0; i < concurrency; i++ {
signeds[i] = make(chan *basic.Elements, 10)
Expand All @@ -34,7 +35,7 @@ func benchmarkNPeer(concurrency int, b *testing.B) {
}
mockpeer.Start()
defer mockpeer.Stop()
StartProposer(ctx, signeds[i], processed, nil, concurrency, mockpeer.PeersAddresses()[0])
StartProposer(ctx, signeds[i], processed, logger, concurrency, mockpeer.PeersAddresses()[0])
}
b.ReportAllocs()
b.ResetTimer()
Expand Down
12 changes: 7 additions & 5 deletions pkg/infra/trafficGenerator/assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,26 @@ import (

"tape/pkg/infra"

"github.com/hyperledger/fabric-protos-go/peer"
log "github.com/sirupsen/logrus"
)

type Assembler struct {
Signer infra.Crypto
Ctx context.Context
Raw chan *peer.Proposal
Raw chan *basic.TracingProposal
Signed []chan *basic.Elements
ErrorCh chan error
Logger *log.Logger
}

func (a *Assembler) sign(p *peer.Proposal) (*basic.Elements, error) {
sprop, err := SignProposal(p, a.Signer)
func (a *Assembler) sign(p *basic.TracingProposal) (*basic.Elements, error) {
sprop, err := SignProposal(p.Proposal, a.Signer)
if err != nil {
return nil, err
}
basic.LogEvent(a.Logger, p.TxId, "SignProposal")

return &basic.Elements{SignedProp: sprop}, nil
return &basic.Elements{TxId: p.TxId, SignedProp: sprop}, nil
}

func (a *Assembler) Start() {
Expand Down
11 changes: 5 additions & 6 deletions pkg/infra/trafficGenerator/generatorFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ import (
"tape/pkg/infra/basic"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/peer"
log "github.com/sirupsen/logrus"
)

type TrafficGenerator struct {
ctx context.Context
crypto infra.Crypto
raw chan *peer.Proposal
raw chan *basic.TracingProposal
signed []chan *basic.Elements
envs chan *common.Envelope
processed chan *basic.Elements
Expand All @@ -27,7 +26,7 @@ type TrafficGenerator struct {
errorCh chan error
}

func NewTrafficGenerator(ctx context.Context, crypto infra.Crypto, envs chan *common.Envelope, raw chan *peer.Proposal, processed chan *basic.Elements, signed []chan *basic.Elements, config basic.Config, num int, burst, signerNumber, parallel int, rate float64, logger *log.Logger, errorCh chan error) *TrafficGenerator {
func NewTrafficGenerator(ctx context.Context, crypto infra.Crypto, envs chan *common.Envelope, raw chan *basic.TracingProposal, processed chan *basic.Elements, signed []chan *basic.Elements, config basic.Config, num int, burst, signerNumber, parallel int, rate float64, logger *log.Logger, errorCh chan error) *TrafficGenerator {
return &TrafficGenerator{
ctx: ctx,
crypto: crypto,
Expand Down Expand Up @@ -59,8 +58,8 @@ func (t *TrafficGenerator) CreateGeneratorWorkers(mode int) ([]infra.Worker, err
return generator_workers, err
}
generator_workers = append(generator_workers, proposers)
assembler := &Assembler{Signer: t.crypto, Ctx: t.ctx, Raw: t.raw, Signed: t.signed, ErrorCh: t.errorCh}
Integrator := &Integrator{Signer: t.crypto, Ctx: t.ctx, Processed: t.processed, Envs: t.envs, ErrorCh: t.errorCh}
assembler := &Assembler{Signer: t.crypto, Ctx: t.ctx, Raw: t.raw, Signed: t.signed, ErrorCh: t.errorCh, Logger: t.logger}
Integrator := &Integrator{Signer: t.crypto, Ctx: t.ctx, Processed: t.processed, Envs: t.envs, ErrorCh: t.errorCh, Logger: t.logger}
for i := 0; i < t.signerNumber; i++ {
generator_workers = append(generator_workers, assembler)
generator_workers = append(generator_workers, Integrator)
Expand All @@ -77,7 +76,7 @@ func (t *TrafficGenerator) CreateGeneratorWorkers(mode int) ([]infra.Worker, err
// if not fake int mod 2 = 0
for i := 0; i < t.parallel; i++ {
if mode%infra.QUERYFILTER == 0 {
Initiator := &Initiator{Num: t.num, Burst: t.burst, R: t.rate, Config: t.config, Crypto: t.crypto, Raw: t.raw, ErrorCh: t.errorCh}
Initiator := &Initiator{Num: t.num, Burst: t.burst, R: t.rate, Config: t.config, Crypto: t.crypto, Logger: t.logger, Raw: t.raw, ErrorCh: t.errorCh}
generator_workers = append(generator_workers, Initiator)
} else {
// if fake int mod 2 = 1
Expand Down
6 changes: 4 additions & 2 deletions pkg/infra/trafficGenerator/initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"tape/pkg/infra"
"tape/pkg/infra/basic"

"github.com/hyperledger/fabric-protos-go/peer"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)

Expand All @@ -16,7 +16,8 @@ type Initiator struct {
R float64
Config basic.Config
Crypto infra.Crypto
Raw chan *peer.Proposal
Logger *log.Logger
Raw chan *basic.TracingProposal
ErrorCh chan error
}

Expand All @@ -37,6 +38,7 @@ func (initiator *Initiator) Start() {
}
prop, err := CreateProposal(
initiator.Crypto,
initiator.Logger,
initiator.Config.Channel,
initiator.Config.Chaincode,
initiator.Config.Version,
Expand Down
19 changes: 10 additions & 9 deletions pkg/infra/trafficGenerator/initiator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import (
"tape/pkg/infra/basic"
"tape/pkg/infra/trafficGenerator"

"github.com/hyperledger/fabric-protos-go/peer"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
)

var _ = Describe("Initiator", func() {

var (
configFile *os.File
tmpDir string
logger = log.New()
)

BeforeEach(func() {
Expand Down Expand Up @@ -56,15 +57,15 @@ var _ = Describe("Initiator", func() {
})

PIt("should crete proposal to raw without limit when number is 0", func() {
raw := make(chan *peer.Proposal, 1002)
raw := make(chan *basic.TracingProposal, 1002)
//defer close(raw)
errorCh := make(chan error, 1002)
defer close(errorCh)
config, err := basic.LoadConfig(configFile.Name())
Expect(err).NotTo(HaveOccurred())
crypto, err := config.LoadCrypto()
Expect(err).NotTo(HaveOccurred())
Initiator := &trafficGenerator.Initiator{0, 10, 0, config, crypto, raw, errorCh}
Initiator := &trafficGenerator.Initiator{0, 10, 0, config, crypto, logger, raw, errorCh}
go Initiator.Start()
for i := 0; i < 1002; i++ {
_, flag := <-raw
Expand All @@ -74,7 +75,7 @@ var _ = Describe("Initiator", func() {
})

It("should crete proposal to raw without limit when limit is 0", func() {
raw := make(chan *peer.Proposal, 1002)
raw := make(chan *basic.TracingProposal, 1002)
defer close(raw)
errorCh := make(chan error, 1002)
defer close(errorCh)
Expand All @@ -83,15 +84,15 @@ var _ = Describe("Initiator", func() {
crypto, err := config.LoadCrypto()
Expect(err).NotTo(HaveOccurred())
t := time.Now()
Initiator := &trafficGenerator.Initiator{1002, 10, 0, config, crypto, raw, errorCh}
Initiator := &trafficGenerator.Initiator{1002, 10, 0, config, crypto, logger, raw, errorCh}
Initiator.Start()
t1 := time.Now()
Expect(raw).To(HaveLen(1002))
Expect(t1.Sub(t)).To(BeNumerically("<", 2*time.Second))
})

It("should crete proposal to raw with given limit bigger than 0 less than size", func() {
raw := make(chan *peer.Proposal, 1002)
raw := make(chan *basic.TracingProposal, 1002)
defer close(raw)
errorCh := make(chan error, 1002)
defer close(errorCh)
Expand All @@ -100,15 +101,15 @@ var _ = Describe("Initiator", func() {
crypto, err := config.LoadCrypto()
Expect(err).NotTo(HaveOccurred())
t := time.Now()
Initiator := &trafficGenerator.Initiator{12, 10, 1, config, crypto, raw, errorCh}
Initiator := &trafficGenerator.Initiator{12, 10, 1, config, crypto, logger, raw, errorCh}
Initiator.Start()
t1 := time.Now()
Expect(raw).To(HaveLen(12))
Expect(t1.Sub(t)).To(BeNumerically(">", 2*time.Second))
})

It("should crete proposal to raw with given limit bigger than Size", func() {
raw := make(chan *peer.Proposal, 1002)
raw := make(chan *basic.TracingProposal, 1002)
defer close(raw)
errorCh := make(chan error, 1002)
defer close(errorCh)
Expand All @@ -117,7 +118,7 @@ var _ = Describe("Initiator", func() {
crypto, err := config.LoadCrypto()
Expect(err).NotTo(HaveOccurred())
t := time.Now()
Initiator := &trafficGenerator.Initiator{12, 10, 0, config, crypto, raw, errorCh}
Initiator := &trafficGenerator.Initiator{12, 10, 0, config, crypto, logger, raw, errorCh}
Initiator.Start()
t1 := time.Now()
Expect(raw).To(HaveLen(12))
Expand Down
3 changes: 3 additions & 0 deletions pkg/infra/trafficGenerator/intgerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"tape/pkg/infra/basic"

"github.com/hyperledger/fabric-protos-go/common"
log "github.com/sirupsen/logrus"
)

type Integrator struct {
Expand All @@ -14,10 +15,12 @@ type Integrator struct {
Processed chan *basic.Elements
Envs chan *common.Envelope
ErrorCh chan error
Logger *log.Logger
}

func (integrator *Integrator) assemble(e *basic.Elements) (*common.Envelope, error) {
env, err := CreateSignedTx(e.SignedProp, integrator.Signer, e.Responses)
basic.LogEvent(integrator.Logger, e.TxId, "CreateSignedEnvelope")
if err != nil {
return nil, err
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/infra/trafficGenerator/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import (

"tape/internal/fabric/protoutil"
"tape/pkg/infra"
"tape/pkg/infra/basic"

"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

const charset = "abcdefghijklmnopqrstuvwxyz" +
Expand All @@ -27,7 +29,7 @@ const charset = "abcdefghijklmnopqrstuvwxyz" +
var seededRand *rand.Rand = rand.New(
rand.NewSource(time.Now().UnixNano()))

func CreateProposal(signer infra.Crypto, channel, ccname, version string, args ...string) (*peer.Proposal, error) {
func CreateProposal(signer infra.Crypto, logger *log.Logger, channel, ccname, version string, args ...string) (*basic.TracingProposal, error) {
var argsInByte [][]byte
for _, arg := range args {
current_arg, err := ConvertString(arg)
Expand All @@ -50,12 +52,13 @@ func CreateProposal(signer infra.Crypto, channel, ccname, version string, args .
return nil, err
}

prop, _, err := protoutil.CreateChaincodeProposal(common.HeaderType_ENDORSER_TRANSACTION, channel, invocation, creator)
prop, txid, err := protoutil.CreateChaincodeProposal(common.HeaderType_ENDORSER_TRANSACTION, channel, invocation, creator)
if err != nil {
return nil, err
}
basic.LogEvent(logger, txid, "CreateChaincodeProposal")

return prop, nil
return &basic.TracingProposal{Proposal: prop, TxId: txid}, nil
}

func SignProposal(prop *peer.Proposal, signer infra.Crypto) (*peer.SignedProposal, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/infra/trafficGenerator/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (p *Proposer) Start(ctx context.Context, signed, processed chan *basic.Elem
s.Responses = append(s.Responses, r)
if len(s.Responses) >= threshold {
processed <- s
basic.LogEvent(p.logger, s.TxId, "CompletedCollectEndorsement")
}
s.Lock.Unlock()
case <-ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion pkg/infra/trafficGenerator/proposer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ var _ = Describe("Proposer", func() {
mockpeer, err := mock.NewServer(1, nil)
Expect(err).NotTo(HaveOccurred())
mockpeer.Start()
StartProposer(ctx, signeds[i], processed, nil, peerNum, mockpeer.PeersAddresses()[0])
StartProposer(ctx, signeds[i], processed, logger, peerNum, mockpeer.PeersAddresses()[0])
defer mockpeer.Stop()
}
runtime := b.Time("runtime", func() {
Expand Down
6 changes: 3 additions & 3 deletions test/integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,15 @@ case $2 in
ENDORSEMNTONLY)
ARGS=(-ccep "OR('Org1.member','Org2.member')")
CMD=endorsementOnly
timeout 60 docker run -e TAPE_LOGLEVEL=debug --network $network -v $PWD:/config tape tape $CMD -c $CONFIG_FILE -n 500 --signers=10 --parallel=2
timeout 60 docker run --name tape -e TAPE_LOGLEVEL=debug --network $network -v $PWD:/config tape tape $CMD -c $CONFIG_FILE -n 500 --signers=10 --parallel=2
;;
COMMITONLY)
ARGS=(-cci initLedger)
CMD=commitOnly
timeout 60 docker run -e TAPE_LOGLEVEL=debug --network $network -v $PWD:/config tape tape $CMD -c $CONFIG_FILE -n 500 --signers=10 --parallel=2
timeout 60 docker run --name tape -e TAPE_LOGLEVEL=debug --network $network -v $PWD:/config tape tape $CMD -c $CONFIG_FILE -n 500 --signers=10 --parallel=2
;;
*)
ARGS=(-cci initLedger)
timeout 60 docker run -e TAPE_LOGLEVEL=debug --network $network -v $PWD:/config tape tape $CMD -c $CONFIG_FILE -n 500
timeout 60 docker run --name tape -e TAPE_LOGLEVEL=debug --network $network -v $PWD:/config tape tape $CMD -c $CONFIG_FILE -n 500
;;
esac

0 comments on commit 93679ae

Please sign in to comment.