Skip to content

Commit

Permalink
fix up (#227)
Browse files Browse the repository at this point in the history
* fix up

Signed-off-by: Sam Yuan <[email protected]>

* fix up

Signed-off-by: Sam Yuan <[email protected]>

* fix up

Signed-off-by: Sam Yuan <[email protected]>

* fix up

Signed-off-by: Sam Yuan <[email protected]>
  • Loading branch information
SamYuan1990 authored Nov 15, 2021
1 parent 0d5ae35 commit 66fd1ab
Show file tree
Hide file tree
Showing 20 changed files with 247 additions and 128 deletions.
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/Shopify/sarama v1.29.1 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/gogo/protobuf v1.2.1
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.4.2
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2
github.com/hashicorp/go-version v1.3.0 // indirect
github.com/hyperledger/fabric v2.1.1+incompatible
github.com/hyperledger/fabric-amcl v0.0.0-20210603140002-2670f91851c8 // indirect
Expand All @@ -27,8 +27,7 @@ require (
golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
golang.org/x/tools v0.1.5 // indirect
google.golang.org/genproto v0.0.0-20191028173616-919d9bdd9fe6 // indirect
google.golang.org/grpc v1.24.0
google.golang.org/grpc v1.29.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.3.0
)
48 changes: 39 additions & 9 deletions go.sum

Large diffs are not rendered by default.

15 changes: 10 additions & 5 deletions internal/fabric/core/comm/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
"crypto/x509"
"time"

"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"

"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"google.golang.org/grpc"
Expand Down Expand Up @@ -143,10 +144,14 @@ func (client *GRPCClient) NewConnection(address string, tlsOptions ...TLSOption)
))

tracer := opentracing.GlobalTracer()
dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
opts := []grpc_opentracing.Option{
grpc_opentracing.WithTracer(tracer),
}

dialOpts = append(dialOpts,
grpc.WithUnaryInterceptor(grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(grpc_opentracing.StreamClientInterceptor(opts...)),
)

ctx, cancel := context.WithTimeout(context.Background(), client.timeout)
defer cancel()
Expand Down
5 changes: 0 additions & 5 deletions pkg/infra/basic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ type TracingEnvelope struct {
Span opentracing.Span
}

type TracingSpans struct {
Spans map[string]opentracing.Span
Lock sync.Mutex
}

type Config struct {
Endorsers []Node `yaml:"endorsers"`
Committers []Node `yaml:"committers"`
Expand Down
91 changes: 91 additions & 0 deletions pkg/infra/basic/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package basic
import (
"fmt"
"io"
"sync"
"time"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -38,3 +39,93 @@ func Init(service string) (opentracing.Tracer, io.Closer) {
}
return tracer, closer
}

const (
TRANSCATION = "TRANSCATION"
TRANSCATIONSTART = "TRANSCATIONSTART"
SIGN_PROPOSAL = "SIGN_PROPOSAL"
ENDORSEMENT = "ENDORSEMENT"
ENDORSEMENT_AT_PEER = "ENDORSEMENT_AT_PEER"
COLLECT_ENDORSEMENT = "COLLECT_ENDORSEMENT"
SIGN_ENVELOP = "SIGN_ENVELOP"
BROADCAST = "BROADCAST"
CONSESUS = "CONSESUS"
COMMIT_AT_NETWORK = "COMMIT_AT_NETWORK"
COMMIT_AT_PEER = "COMMIT_AT_PEER"
COMMIT_AT_ALL_PEERS = "COMMIT_AT_ALL_PEERS"
)

var TapeSpan *TracingSpans
var ProcessMod int

type TracingSpans struct {
Spans map[string]opentracing.Span
Lock sync.Mutex
}

func (TS *TracingSpans) MakeSpan(txid, address, event string, parent opentracing.Span) opentracing.Span {
str := fmt.Sprintf(event + address)
if parent == nil {
return opentracing.GlobalTracer().StartSpan(str, opentracing.Tag{Key: "txid", Value: txid})
} else {
return opentracing.GlobalTracer().StartSpan(str, opentracing.ChildOf(parent.Context()), opentracing.Tag{Key: "txid", Value: txid})
}
}

func (TS *TracingSpans) GetSpan(txid, address, event string) opentracing.Span {
TS.Lock.Lock()
defer TS.Lock.Unlock()

str := fmt.Sprintf(event + txid + address)
span, ok := TS.Spans[str]
if ok {
return span
}
return nil
}

func (TS *TracingSpans) SpanIntoMap(txid, address, event string, parent opentracing.Span) opentracing.Span {
TS.Lock.Lock()
defer TS.Lock.Unlock()

str := fmt.Sprintf(event + txid + address)
span, ok := TS.Spans[str]
if !ok {
span = TS.MakeSpan(txid, address, event, parent)
TS.Spans[str] = span
}
return span
}

func (TS *TracingSpans) FinishWithMap(txid, address, event string) {
TS.Lock.Lock()
defer TS.Lock.Unlock()

str := fmt.Sprintf(event + txid + address)
span, ok := TS.Spans[str]
if ok {
span.Finish()
}
}

func GetGlobalSpan() *TracingSpans {
return TapeSpan
}

func InitSpan() *TracingSpans {
Spans := make(map[string]opentracing.Span)

TapeSpan = &TracingSpans{
Spans: Spans,
}

return GetGlobalSpan()
}

func SetMod(mod int) {
ProcessMod = mod
}

func GetMod() int {
return ProcessMod
}
2 changes: 2 additions & 0 deletions pkg/infra/cmdImpl/fullProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os/signal"
"syscall"
"tape/pkg/infra"
"tape/pkg/infra/basic"
"time"

log "github.com/sirupsen/logrus"
Expand All @@ -25,6 +26,7 @@ func Process(configPath string, num int, burst, signerNumber, parallel int, rate
defer cmdConfig.Closer.Close()
var Observer_workers []infra.Worker
var Observers infra.ObserverWorker
basic.SetMod(processmod)
/*** workers ***/
if processmod != infra.TRAFFIC {
Observer_workers, Observers, err = cmdConfig.Observerfactory.CreateObserverWorkers(processmod)
Expand Down
10 changes: 3 additions & 7 deletions pkg/infra/cmdImpl/processTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,13 @@ func CreateCmd(configPath string, num int, burst, signerNumber, parallel int, ra
errorCh := make(chan error, burst)
ctx, cancel := context.WithCancel(context.Background())

tr, closer := basic.Init("tape")
opentracing.SetGlobalTracer(tr)
for i := 0; i < len(config.Endorsers); i++ {
signed[i] = make(chan *basic.Elements, burst)
}

Spans := make(map[string]opentracing.Span)
Tspans := &basic.TracingSpans{
Spans: Spans,
}
tr, closer := basic.Init("tape")
opentracing.SetGlobalTracer(tr)
basic.InitSpan()

mytrafficGenerator := trafficGenerator.NewTrafficGenerator(ctx,
crypto,
Expand All @@ -76,7 +73,6 @@ func CreateCmd(configPath string, num int, burst, signerNumber, parallel int, ra
num,
parallel,
envs,
Tspans,
errorCh)
cmd := &CmdConfig{
finishCh,
Expand Down
7 changes: 2 additions & 5 deletions pkg/infra/observer/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,10 @@ func BenchmarkPeerEndorsement8(b *testing.B) { benchmarkNPeer(8, b) }
func benchmarkAsyncCollector(concurrent int, b *testing.B) {
block := make(chan *observer.AddressedBlock, 100)
done := make(chan struct{})
Spans := make(map[string]opentracing.Span)
Tspans := &basic.TracingSpans{
Spans: Spans,
}
logger := log.New()
basic.InitSpan()

instance, _ := observer.NewBlockCollector(concurrent, concurrent, context.Background(), block, done, b.N, false, Tspans, logger)
instance, _ := observer.NewBlockCollector(concurrent, concurrent, context.Background(), block, done, b.N, false, logger)
go instance.Start()

b.ReportAllocs()
Expand Down
23 changes: 7 additions & 16 deletions pkg/infra/observer/block_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"tape/pkg/infra"
"tape/pkg/infra/basic"
"tape/pkg/infra/bitmap"
"time"
Expand All @@ -23,7 +24,6 @@ type BlockCollector struct {
ctx context.Context
blockCh chan *AddressedBlock
finishCh chan struct{}
Spans *basic.TracingSpans
logger *log.Logger
printResult bool // controls whether to print block commit message. Tests set this to false to avoid polluting stdout.
}
Expand All @@ -42,7 +42,6 @@ func NewBlockCollector(threshold int, totalP int,
finishCh chan struct{},
totalTx int,
printResult bool,
Spans *basic.TracingSpans,
logger *log.Logger) (*BlockCollector, error) {
registry := make(map[uint64]*bitmap.BitMap)
if threshold <= 0 || totalP <= 0 {
Expand All @@ -60,7 +59,6 @@ func NewBlockCollector(threshold int, totalP int,
blockCh: blockCh,
finishCh: finishCh,
printResult: printResult,
Spans: Spans,
logger: logger,
}, nil
}
Expand Down Expand Up @@ -111,13 +109,8 @@ func (bc *BlockCollector) commit(block *AddressedBlock) {
fmt.Printf("Time %8.2fs\tBlock %6d\tTx %6d\t \n", block.Now.Seconds(), block.Number, len(block.FilteredTransactions))
for _, b := range block.FilteredBlock.FilteredTransactions {
basic.LogEvent(bc.logger, b.Txid, "CommitAtPeersOverThreshold")
bc.Spans.Lock.Lock()
span, ok := bc.Spans.Spans[b.Txid+"_threshold"]
if ok {
span.Finish()
delete(bc.Spans.Spans, b.Txid+"_threshold")
}
bc.Spans.Lock.Unlock()
tapeSpan := basic.GetGlobalSpan()
tapeSpan.FinishWithMap(b.Txid, "", basic.COMMIT_AT_NETWORK)
}
}
if breakbynumber {
Expand All @@ -137,13 +130,11 @@ func (bc *BlockCollector) commit(block *AddressedBlock) {
delete(bc.registry, block.Number)
for _, b := range block.FilteredBlock.FilteredTransactions {
basic.LogEvent(bc.logger, b.Txid, "CommitAtPeers")
bc.Spans.Lock.Lock()
span, ok := bc.Spans.Spans[b.Txid]
if ok {
span.Finish()
delete(bc.Spans.Spans, b.Txid)
tapeSpan := basic.GetGlobalSpan()
tapeSpan.FinishWithMap(b.Txid, "", basic.COMMIT_AT_ALL_PEERS)
if basic.GetMod() == infra.FULLPROCESS {
tapeSpan.FinishWithMap(b.Txid, "", basic.TRANSCATION)
}
bc.Spans.Lock.Unlock()
}
}
}
Loading

0 comments on commit 66fd1ab

Please sign in to comment.