diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index b9d8a5fbbd..23cdb29ee9 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -771,16 +771,26 @@ func stopAll(t *testing.T, stoppers ...stopper) func() { } } -// generateMessages sends a sequence of messages to the specified multiplex connection `mc`. +const ( + kibibyte = 1024 + mebibyte = 1024 * 1024 +) + +// generateAndSendMessages sends a sequence of messages to the specified multiplex connection `mc`. // Each message has the given size and is sent at the specified rate // `messagingRate`. This process continues for the duration `totalDuration` or // until `totalNum` messages are sent. If `totalNum` is negative, // messaging persists for the entire `totalDuration`. -func generateMessages(mc *MConnection, +func generateAndSendMessages(mc *MConnection, messagingRate time.Duration, - totalDuration time.Duration, totalNum int, msgSize int, chID byte) { - // all messages have an identical content - msg := bytes.Repeat([]byte{'x'}, msgSize) + totalDuration time.Duration, totalNum int, msgSize int, + msgContnet []byte, chID byte) { + var msg []byte + if msgContnet == nil { + msg = bytes.Repeat([]byte{'x'}, msgSize) + } else { + msg = msgContnet + } // message generation interval ticker ticker := time.NewTicker(messagingRate) @@ -847,13 +857,13 @@ func BenchmarkMConnection(b *testing.B) { name: "queue capacity = 1, " + "total load = 50 KB, " + "msg rate = send rate", - msgSize: 1 * 1024, + msgSize: 1 * kibibyte, totalMsg: 1 * 50, sendQueueCapacity: 1, messagingRate: 20 * time.Millisecond, totalDuration: 1 * time.Minute, - sendRate: 50 * 1024, - recRate: 50 * 1024, + sendRate: 50 * kibibyte, + recRate: 50 * kibibyte, }, { // testcase 2 @@ -864,13 +874,13 @@ func BenchmarkMConnection(b *testing.B) { name: "queue capacity = 50, " + "total load = 50 KB, " + "traffic rate = send rate", - msgSize: 1 * 1024, + msgSize: 1 * kibibyte, totalMsg: 1 * 50, sendQueueCapacity: 50, messagingRate: 20 * time.Millisecond, totalDuration: 1 * time.Minute, - sendRate: 50 * 1024, - recRate: 50 * 1024, + sendRate: 50 * kibibyte, + recRate: 50 * kibibyte, }, { // testcase 3 @@ -881,13 +891,13 @@ func BenchmarkMConnection(b *testing.B) { name: "queue capacity = 100, " + "total load = 50 KB, " + "traffic rate = send rate", - msgSize: 1 * 1024, + msgSize: 1 * kibibyte, totalMsg: 1 * 50, sendQueueCapacity: 100, messagingRate: 20 * time.Millisecond, totalDuration: 1 * time.Minute, - sendRate: 50 * 1024, - recRate: 50 * 1024, + sendRate: 50 * kibibyte, + recRate: 50 * kibibyte, }, { // testcase 4 @@ -896,13 +906,13 @@ func BenchmarkMConnection(b *testing.B) { name: "queue capacity = 100, " + "total load = 2 * 50 KB, " + "traffic rate = send rate", - msgSize: 1 * 1024, + msgSize: 1 * kibibyte, totalMsg: 2 * 50, sendQueueCapacity: 100, messagingRate: 20 * time.Millisecond, totalDuration: 1 * time.Minute, - sendRate: 50 * 1024, - recRate: 50 * 1024, + sendRate: 50 * kibibyte, + recRate: 50 * kibibyte, }, { // testcase 5 @@ -912,13 +922,13 @@ func BenchmarkMConnection(b *testing.B) { name: "queue capacity = 100, " + "total load = 8 * 50 KB, " + "traffic rate = send rate", - msgSize: 1 * 1024, + msgSize: 1 * kibibyte, totalMsg: 8 * 50, sendQueueCapacity: 100, messagingRate: 20 * time.Millisecond, totalDuration: 1 * time.Minute, - sendRate: 50 * 1024, - recRate: 50 * 1024, + sendRate: 50 * kibibyte, + recRate: 50 * kibibyte, }, { // testcase 6 @@ -928,13 +938,13 @@ func BenchmarkMConnection(b *testing.B) { name: "queue capacity = 100, " + "total load = 8 * 50 KB, " + "traffic rate = 2 * send rate", - msgSize: 1 * 1024, + msgSize: 1 * kibibyte, totalMsg: 8 * 50, sendQueueCapacity: 100, messagingRate: 10 * time.Millisecond, totalDuration: 1 * time.Minute, - sendRate: 50 * 1024, - recRate: 50 * 1024, + sendRate: 50 * kibibyte, + recRate: 50 * kibibyte, }, { // testcase 7 @@ -944,13 +954,13 @@ func BenchmarkMConnection(b *testing.B) { name: "queue capacity = 100, " + "total load = 8 * 50 KB, " + "traffic rate = 10 * send rate", - msgSize: 1 * 1024, + msgSize: 1 * kibibyte, totalMsg: 8 * 50, sendQueueCapacity: 100, messagingRate: 2 * time.Millisecond, totalDuration: 1 * time.Minute, - sendRate: 50 * 1024, - recRate: 50 * 1024, + sendRate: 50 * kibibyte, + recRate: 50 * kibibyte, }, } @@ -974,8 +984,8 @@ func BenchmarkMConnection(b *testing.B) { } cnfg := DefaultMConnConfig() - cnfg.SendRate = 50 * 1024 // 500 KB/s - cnfg.RecvRate = 50 * 1024 // 500 KB/s + cnfg.SendRate = tt.sendRate + cnfg.RecvRate = tt.recRate chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, SendQueueCapacity: tt.sendQueueCapacity}} clientMconn := NewMConnectionWithConfig(client, chDescs, @@ -1006,11 +1016,12 @@ func BenchmarkMConnection(b *testing.B) { // taken to set up the connections b.StartTimer() // start generating messages, it is a blocking call - generateMessages(clientMconn, + generateAndSendMessages(clientMconn, tt.messagingRate, tt.totalDuration, tt.totalMsg, - tt.msgSize, chID) + tt.msgSize, + nil, chID) // wait for all messages to be received <-allReceived @@ -1040,3 +1051,177 @@ func tcpNetPipe() (net.Conn, net.Conn) { return conn2, conn1 } + +// generateExponentialSizedMessages creates and returns a series of messages +// with sizes (in the specified unit) increasing exponentially. +// The size of each message doubles, starting from 1 up to maxSizeBytes. +// unit is expected to be a power of 2. +func generateExponentialSizedMessages(maxSizeBytes int, unit int) [][]byte { + maxSizeToUnit := maxSizeBytes / unit + msgs := make([][]byte, 0) + + for size := 1; size <= maxSizeToUnit; size *= 2 { + msgs = append(msgs, bytes.Repeat([]byte{'x'}, size*unit)) // create a message of the calculated size + } + return msgs +} + +type testCase struct { + name string + msgSize int // size of each message in bytes + msg []byte // message to be sent + totalMsg int // total number of messages to be sent + messagingRate time.Duration // rate at which messages are sent + totalDuration time.Duration // total duration for which messages are sent + sendQueueCapacity int // send queue capacity i.e., the number of messages that can be buffered + sendRate int64 // send rate in bytes per second + recRate int64 // receive rate in bytes per second + chID byte // channel ID +} + +func runBenchmarkTest(b *testing.B, tt testCase) { + b.Run(tt.name, func(b *testing.B) { + for n := 0; n < b.N; n++ { + // set up two networked connections + // server, client := NetPipe() // can alternatively use this and comment out the line below + server, client := tcpNetPipe() + defer server.Close() + defer client.Close() + + // prepare callback to receive messages + allReceived := make(chan bool) + receivedLoad := 0 // number of messages received + onReceive := func(chID byte, msgBytes []byte) { + receivedLoad++ + if receivedLoad >= tt.totalMsg && tt.totalMsg > 0 { + allReceived <- true + } + } + + cnfg := DefaultMConnConfig() + cnfg.SendRate = tt.sendRate + cnfg.RecvRate = tt.recRate + chDescs := []*ChannelDescriptor{{ID: tt.chID, Priority: 1, + SendQueueCapacity: tt.sendQueueCapacity}} + clientMconn := NewMConnectionWithConfig(client, chDescs, + func(chID byte, msgBytes []byte) {}, + func(r interface{}) {}, + cnfg) + serverChDescs := []*ChannelDescriptor{{ID: tt.chID, Priority: 1, + SendQueueCapacity: tt.sendQueueCapacity}} + serverMconn := NewMConnectionWithConfig(server, serverChDescs, + onReceive, + func(r interface{}) {}, + cnfg) + clientMconn.SetLogger(log.TestingLogger()) + serverMconn.SetLogger(log.TestingLogger()) + + err := clientMconn.Start() + require.Nil(b, err) + defer func() { + _ = clientMconn.Stop() + }() + err = serverMconn.Start() + require.Nil(b, err) + defer func() { + _ = serverMconn.Stop() + }() + + // start measuring the time from here to exclude the time + // taken to set up the connections + b.StartTimer() + // start generating messages, it is a blocking call + generateAndSendMessages(clientMconn, + tt.messagingRate, + tt.totalDuration, + tt.totalMsg, + tt.msgSize, + tt.msg, + tt.chID) + + // wait for all messages to be received + <-allReceived + b.StopTimer() + } + }) +} + +func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate(b *testing.B) { + // One aspect that could impact the performance of MConnection and the + // transmission rate is the size of the messages sent over the network, + // especially when they exceed the MConnection.MaxPacketMsgPayloadSize ( + // messages are sent in packets of maximum size MConnection. + // MaxPacketMsgPayloadSize). + // The test cases in this benchmark involve sending messages with sizes + // ranging exponentially from 1KB to 8192KB ( + // the max value of 8192KB is inspired by the largest possible PFB in a + // Celestia block with 128*128 number of 512-byte shares) + // The bandwidth is set significantly higher than the message load to ensure + // it does not become a limiting factor. + // All test cases are expected to complete in less than one second, + // indicating a healthy performance. + + squareSize := 128 // number of shares in a row/column + shareSize := 512 // bytes + maxSize := squareSize * squareSize * shareSize // bytes + msgs := generateExponentialSizedMessages(maxSize, kibibyte) + chID := byte(0x01) + + // create test cases for each message size + var testCases = make([]testCase, len(msgs)) + for i, msg := range msgs { + testCases[i] = testCase{ + name: fmt.Sprintf("msgSize = %d KB", len(msg)/kibibyte), + msgSize: len(msg), + msg: msg, + totalMsg: 10, + messagingRate: time.Millisecond, + totalDuration: 1 * time.Minute, + sendQueueCapacity: 100, + sendRate: 512 * mebibyte, + recRate: 512 * mebibyte, + chID: chID, + } + } + + for _, tt := range testCases { + runBenchmarkTest(b, tt) + } +} + +func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) { + // This benchmark test builds upon the previous one i.e., + // BenchmarkMConnection_ScalingPayloadSizes_HighSendRate + // by setting the send/and receive rates lower than the message load. + // Test cases involve sending the same load of messages but with different message sizes. + // Since the message load and bandwidth are consistent across all test cases, + // they are expected to complete in the same amount of time. i.e., + //totalLoad/sendRate. + + maxSize := 32 * kibibyte // 32KB + msgs := generateExponentialSizedMessages(maxSize, kibibyte) + totalLoad := float64(maxSize) + chID := byte(0x01) + // create test cases for each message size + var testCases = make([]testCase, len(msgs)) + for i, msg := range msgs { + msgSize := len(msg) + totalMsg := int(math.Ceil(totalLoad / float64(msgSize))) + testCases[i] = testCase{ + name: fmt.Sprintf("msgSize = %d KB", msgSize/kibibyte), + msgSize: msgSize, + msg: msg, + totalMsg: totalMsg, + messagingRate: time.Millisecond, + totalDuration: 1 * time.Minute, + sendQueueCapacity: 100, + sendRate: 4 * kibibyte, + recRate: 4 * kibibyte, + chID: chID, + } + } + + for _, tt := range testCases { + runBenchmarkTest(b, tt) + } +}