diff --git a/README.md b/README.md index 9666d20..f766b69 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ import ( // "/cats" and "/dogs" and exposes it on a localhost:8765. It then opens connections // to that port, selects the protocols and tests that the handlers are working. func main() { - mux := ms.NewMultistreamMuxer() + mux := ms.NewMultistreamMuxer[string]() mux.AddHandler("/cats", func(proto string, rwc io.ReadWriteCloser) error { fmt.Fprintln(rwc, proto, ": HELLO I LIKE CATS") return rwc.Close() diff --git a/client.go b/client.go index 811e3b3..506e453 100644 --- a/client.go +++ b/client.go @@ -31,7 +31,7 @@ const ( // to inform the muxer of the protocol that will be used to communicate // on this ReadWriteCloser. It returns an error if, for example, // the muxer does not know how to handle this protocol. -func SelectProtoOrFail(proto string, rwc io.ReadWriteCloser) (err error) { +func SelectProtoOrFail[T StringLike](proto T, rwc io.ReadWriteCloser) (err error) { defer func() { if rerr := recover(); rerr != nil { fmt.Fprintf(os.Stderr, "caught panic: %s\n%s\n", rerr, debug.Stack()) @@ -66,7 +66,7 @@ func SelectProtoOrFail(proto string, rwc io.ReadWriteCloser) (err error) { // SelectOneOf will perform handshakes with the protocols on the given slice // until it finds one which is supported by the muxer. -func SelectOneOf(protos []string, rwc io.ReadWriteCloser) (proto string, err error) { +func SelectOneOf[T StringLike](protos []T, rwc io.ReadWriteCloser) (proto T, err error) { defer func() { if rerr := recover(); rerr != nil { fmt.Fprintf(os.Stderr, "caught panic: %s\n%s\n", rerr, debug.Stack()) @@ -97,7 +97,7 @@ const simOpenProtocol = "/libp2p/simultaneous-connect" // SelectWithSimopenOrFail performs protocol negotiation with the simultaneous open extension. // The returned boolean indicator will be true if we should act as a server. -func SelectWithSimopenOrFail(protos []string, rwc io.ReadWriteCloser) (proto string, isServer bool, err error) { +func SelectWithSimopenOrFail[T StringLike](protos []T, rwc io.ReadWriteCloser) (proto T, isServer bool, err error) { defer func() { if rerr := recover(); rerr != nil { fmt.Fprintf(os.Stderr, "caught panic: %s\n%s\n", rerr, debug.Stack()) @@ -125,7 +125,7 @@ func SelectWithSimopenOrFail(protos []string, rwc io.ReadWriteCloser) (proto str return "", false, err } - tok, err := ReadNextToken(rwc) + tok, err := ReadNextToken[T](rwc) if err != nil { return "", false, err } @@ -146,13 +146,13 @@ func SelectWithSimopenOrFail(protos []string, rwc io.ReadWriteCloser) (proto str } return proto, false, nil default: - return "", false, errors.New("unexpected response: " + tok) + return "", false, fmt.Errorf("unexpected response: %s", tok) } } -func clientOpen(protos []string, rwc io.ReadWriteCloser) (string, error) { +func clientOpen[T StringLike](protos []T, rwc io.ReadWriteCloser) (T, error) { // check to see if we selected the pipelined protocol - tok, err := ReadNextToken(rwc) + tok, err := ReadNextToken[T](rwc) if err != nil { return "", err } @@ -163,11 +163,11 @@ func clientOpen(protos []string, rwc io.ReadWriteCloser) (string, error) { case "na": return selectProtosOrFail(protos[1:], rwc) default: - return "", errors.New("unexpected response: " + tok) + return "", fmt.Errorf("unexpected response: %s", tok) } } -func selectProtosOrFail(protos []string, rwc io.ReadWriteCloser) (string, error) { +func selectProtosOrFail[T StringLike](protos []T, rwc io.ReadWriteCloser) (T, error) { for _, p := range protos { err := trySelect(p, rwc) switch err { @@ -181,7 +181,7 @@ func selectProtosOrFail(protos []string, rwc io.ReadWriteCloser) (string, error) return "", ErrNotSupported } -func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { +func simOpen[T StringLike](protos []T, rwc io.ReadWriteCloser) (T, bool, error) { randBytes := make([]byte, 8) _, err := rand.Read(randBytes) if err != nil { @@ -198,17 +198,17 @@ func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { // skip exactly one protocol // see https://github.com/multiformats/go-multistream/pull/42#discussion_r558757135 - _, err = ReadNextToken(rwc) + _, err = ReadNextToken[T](rwc) if err != nil { return "", false, err } // read the tie breaker nonce - tok, err := ReadNextToken(rwc) + tok, err := ReadNextToken[T](rwc) if err != nil { return "", false, err } - if !strings.HasPrefix(tok, tieBreakerPrefix) { + if !strings.HasPrefix(string(tok), tieBreakerPrefix) { return "", false, errors.New("tie breaker nonce not sent with the correct prefix") } @@ -216,7 +216,7 @@ func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { return "", false, err } - peerNonce, err := strconv.ParseUint(tok[len(tieBreakerPrefix):], 10, 64) + peerNonce, err := strconv.ParseUint(string(tok[len(tieBreakerPrefix):]), 10, 64) if err != nil { return "", false, err } @@ -228,7 +228,7 @@ func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { } iamserver = peerNonce > myNonce - var proto string + var proto T if iamserver { proto, err = simOpenSelectServer(protos, rwc) } else { @@ -238,26 +238,26 @@ func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { return proto, iamserver, err } -func simOpenSelectServer(protos []string, rwc io.ReadWriteCloser) (string, error) { +func simOpenSelectServer[T StringLike](protos []T, rwc io.ReadWriteCloser) (T, error) { werrCh := make(chan error, 1) go func() { err := delimWriteBuffered(rwc, []byte(responder)) werrCh <- err }() - tok, err := ReadNextToken(rwc) + tok, err := ReadNextToken[T](rwc) if err != nil { return "", err } if tok != initiator { - return "", errors.New("unexpected response: " + tok) + return "", fmt.Errorf("unexpected response: %s", tok) } if err = <-werrCh; err != nil { return "", err } for { - tok, err = ReadNextToken(rwc) + tok, err = ReadNextToken[T](rwc) if err == io.EOF { return "", ErrNotSupported @@ -286,19 +286,19 @@ func simOpenSelectServer(protos []string, rwc io.ReadWriteCloser) (string, error } -func simOpenSelectClient(protos []string, rwc io.ReadWriteCloser) (string, error) { +func simOpenSelectClient[T StringLike](protos []T, rwc io.ReadWriteCloser) (T, error) { werrCh := make(chan error, 1) go func() { err := delimWriteBuffered(rwc, []byte(initiator)) werrCh <- err }() - tok, err := ReadNextToken(rwc) + tok, err := ReadNextToken[T](rwc) if err != nil { return "", err } if tok != responder { - return "", errors.New("unexpected response: " + tok) + return "", fmt.Errorf("unexpected response: %s", tok) } if err = <-werrCh; err != nil { return "", err @@ -308,7 +308,7 @@ func simOpenSelectClient(protos []string, rwc io.ReadWriteCloser) (string, error } func readMultistreamHeader(r io.Reader) error { - tok, err := ReadNextToken(r) + tok, err := ReadNextToken[string](r) if err != nil { return err } @@ -319,7 +319,7 @@ func readMultistreamHeader(r io.Reader) error { return nil } -func trySelect(proto string, rwc io.ReadWriteCloser) error { +func trySelect[T StringLike](proto T, rwc io.ReadWriteCloser) error { err := delimWriteBuffered(rwc, []byte(proto)) if err != nil { return err @@ -327,8 +327,8 @@ func trySelect(proto string, rwc io.ReadWriteCloser) error { return readProto(proto, rwc) } -func readProto(proto string, r io.Reader) error { - tok, err := ReadNextToken(r) +func readProto[T StringLike](proto T, r io.Reader) error { + tok, err := ReadNextToken[T](r) if err != nil { return err } @@ -339,6 +339,6 @@ func readProto(proto string, r io.Reader) error { case "na": return ErrNotSupported default: - return errors.New("unrecognized response: " + tok) + return fmt.Errorf("unrecognized response: %s", tok) } } diff --git a/lazyClient.go b/lazyClient.go index 76d79ff..13108cc 100644 --- a/lazyClient.go +++ b/lazyClient.go @@ -8,9 +8,9 @@ import ( // NewMSSelect returns a new Multistream which is able to perform // protocol selection with a MultistreamMuxer. -func NewMSSelect(c io.ReadWriteCloser, proto string) LazyConn { - return &lazyClientConn{ - protos: []string{ProtocolID, proto}, +func NewMSSelect[T StringLike](c io.ReadWriteCloser, proto T) LazyConn { + return &lazyClientConn[T]{ + protos: []T{ProtocolID, proto}, con: c, } } @@ -18,9 +18,9 @@ func NewMSSelect(c io.ReadWriteCloser, proto string) LazyConn { // NewMultistream returns a multistream for the given protocol. This will not // perform any protocol selection. If you are using a MultistreamMuxer, use // NewMSSelect. -func NewMultistream(c io.ReadWriteCloser, proto string) LazyConn { - return &lazyClientConn{ - protos: []string{proto}, +func NewMultistream[T StringLike](c io.ReadWriteCloser, proto T) LazyConn { + return &lazyClientConn[T]{ + protos: []T{proto}, con: c, } } @@ -31,7 +31,7 @@ func NewMultistream(c io.ReadWriteCloser, proto string) LazyConn { // It *does not* block writes waiting for the other end to respond. Instead, it // simply assumes the negotiation went successfully and starts writing data. // See: https://github.com/multiformats/go-multistream/issues/20 -type lazyClientConn struct { +type lazyClientConn[T StringLike] struct { // Used to ensure we only trigger the write half of the handshake once. rhandshakeOnce sync.Once rerr error @@ -41,7 +41,7 @@ type lazyClientConn struct { werr error // The sequence of protocols to negotiate. - protos []string + protos []T // The inner connection. con io.ReadWriteCloser @@ -53,7 +53,7 @@ type lazyClientConn struct { // half of the handshake and then waits for the read half to complete. // // It returns an error if the read half of the handshake fails. -func (l *lazyClientConn) Read(b []byte) (int, error) { +func (l *lazyClientConn[T]) Read(b []byte) (int, error) { l.rhandshakeOnce.Do(func() { go l.whandshakeOnce.Do(l.doWriteHandshake) l.doReadHandshake() @@ -68,10 +68,10 @@ func (l *lazyClientConn) Read(b []byte) (int, error) { return l.con.Read(b) } -func (l *lazyClientConn) doReadHandshake() { +func (l *lazyClientConn[T]) doReadHandshake() { for _, proto := range l.protos { // read protocol - tok, err := ReadNextToken(l.con) + tok, err := ReadNextToken[T](l.con) if err != nil { l.rerr = err return @@ -88,12 +88,12 @@ func (l *lazyClientConn) doReadHandshake() { } } -func (l *lazyClientConn) doWriteHandshake() { +func (l *lazyClientConn[T]) doWriteHandshake() { l.doWriteHandshakeWithData(nil) } // Perform the write handshake but *also* write some extra data. -func (l *lazyClientConn) doWriteHandshakeWithData(extra []byte) int { +func (l *lazyClientConn[T]) doWriteHandshakeWithData(extra []byte) int { buf := getWriter(l.con) defer putWriter(buf) @@ -122,7 +122,7 @@ func (l *lazyClientConn) doWriteHandshakeWithData(extra []byte) int { // // Write *also* ignores errors from the read half of the handshake (in case the // stream is actually write only). -func (l *lazyClientConn) Write(b []byte) (int, error) { +func (l *lazyClientConn[T]) Write(b []byte) (int, error) { n := 0 l.whandshakeOnce.Do(func() { go l.rhandshakeOnce.Do(l.doReadHandshake) @@ -137,7 +137,7 @@ func (l *lazyClientConn) Write(b []byte) (int, error) { // Close closes the underlying io.ReadWriteCloser // // This does not flush anything. -func (l *lazyClientConn) Close() error { +func (l *lazyClientConn[T]) Close() error { // As the client, we flush the handshake on close to cover an // interesting edge-case where the server only speaks a single protocol // and responds eagerly with that protocol before waiting for out @@ -151,7 +151,7 @@ func (l *lazyClientConn) Close() error { } // Flush sends the handshake. -func (l *lazyClientConn) Flush() error { +func (l *lazyClientConn[T]) Flush() error { l.whandshakeOnce.Do(func() { go l.rhandshakeOnce.Do(l.doReadHandshake) l.doWriteHandshake() diff --git a/multistream.go b/multistream.go index 9be9db4..daca87c 100644 --- a/multistream.go +++ b/multistream.go @@ -29,29 +29,35 @@ var writerPool = sync.Pool{ }, } +// StringLike is an interface that supports all types with underlying type +// string +type StringLike interface { + ~string +} + // HandlerFunc is a user-provided function used by the MultistreamMuxer to // handle a protocol/stream. -type HandlerFunc = func(protocol string, rwc io.ReadWriteCloser) error +type HandlerFunc[T StringLike] func(protocol T, rwc io.ReadWriteCloser) error // Handler is a wrapper to HandlerFunc which attaches a name (protocol) and a // match function which can optionally be used to select a handler by other // means than the name. -type Handler struct { - MatchFunc func(string) bool - Handle HandlerFunc - AddName string +type Handler[T StringLike] struct { + MatchFunc func(T) bool + Handle HandlerFunc[T] + AddName T } // MultistreamMuxer is a muxer for multistream. Depending on the stream // protocol tag it will select the right handler and hand the stream off to it. -type MultistreamMuxer struct { +type MultistreamMuxer[T StringLike] struct { handlerlock sync.RWMutex - handlers []Handler + handlers []Handler[T] } // NewMultistreamMuxer creates a muxer. -func NewMultistreamMuxer() *MultistreamMuxer { - return new(MultistreamMuxer) +func NewMultistreamMuxer[T StringLike]() *MultistreamMuxer[T] { + return new(MultistreamMuxer[T]) } // LazyConn is the connection type returned by the lazy negotiation functions. @@ -111,26 +117,26 @@ func delimWrite(w io.Writer, mes []byte) error { return nil } -func fulltextMatch(s string) func(string) bool { - return func(a string) bool { +func fulltextMatch[T StringLike](s T) func(T) bool { + return func(a T) bool { return a == s } } // AddHandler attaches a new protocol handler to the muxer. -func (msm *MultistreamMuxer) AddHandler(protocol string, handler HandlerFunc) { +func (msm *MultistreamMuxer[T]) AddHandler(protocol T, handler HandlerFunc[T]) { msm.AddHandlerWithFunc(protocol, fulltextMatch(protocol), handler) } // AddHandlerWithFunc attaches a new protocol handler to the muxer with a match. // If the match function returns true for a given protocol tag, the protocol // will be selected even if the handler name and protocol tags are different. -func (msm *MultistreamMuxer) AddHandlerWithFunc(protocol string, match func(string) bool, handler HandlerFunc) { +func (msm *MultistreamMuxer[T]) AddHandlerWithFunc(protocol T, match func(T) bool, handler HandlerFunc[T]) { msm.handlerlock.Lock() defer msm.handlerlock.Unlock() msm.removeHandler(protocol) - msm.handlers = append(msm.handlers, Handler{ + msm.handlers = append(msm.handlers, Handler[T]{ MatchFunc: match, Handle: handler, AddName: protocol, @@ -138,14 +144,14 @@ func (msm *MultistreamMuxer) AddHandlerWithFunc(protocol string, match func(stri } // RemoveHandler removes the handler with the given name from the muxer. -func (msm *MultistreamMuxer) RemoveHandler(protocol string) { +func (msm *MultistreamMuxer[T]) RemoveHandler(protocol T) { msm.handlerlock.Lock() defer msm.handlerlock.Unlock() msm.removeHandler(protocol) } -func (msm *MultistreamMuxer) removeHandler(protocol string) { +func (msm *MultistreamMuxer[T]) removeHandler(protocol T) { for i, h := range msm.handlers { if h.AddName == protocol { msm.handlers = append(msm.handlers[:i], msm.handlers[i+1:]...) @@ -155,11 +161,11 @@ func (msm *MultistreamMuxer) removeHandler(protocol string) { } // Protocols returns the list of handler-names added to this this muxer. -func (msm *MultistreamMuxer) Protocols() []string { +func (msm *MultistreamMuxer[T]) Protocols() []T { msm.handlerlock.RLock() defer msm.handlerlock.RUnlock() - var out []string + var out []T for _, h := range msm.handlers { out = append(out, h.AddName) } @@ -171,7 +177,7 @@ func (msm *MultistreamMuxer) Protocols() []string { // fails because of a ProtocolID mismatch. var ErrIncorrectVersion = errors.New("client connected with incorrect version") -func (msm *MultistreamMuxer) findHandler(proto string) *Handler { +func (msm *MultistreamMuxer[T]) findHandler(proto T) *Handler[T] { msm.handlerlock.RLock() defer msm.handlerlock.RUnlock() @@ -186,7 +192,7 @@ func (msm *MultistreamMuxer) findHandler(proto string) *Handler { // Negotiate performs protocol selection and returns the protocol name and // the matching handler function for it (or an error). -func (msm *MultistreamMuxer) Negotiate(rwc io.ReadWriteCloser) (proto string, handler HandlerFunc, err error) { +func (msm *MultistreamMuxer[T]) Negotiate(rwc io.ReadWriteCloser) (proto T, handler HandlerFunc[T], err error) { defer func() { if rerr := recover(); rerr != nil { fmt.Fprintf(os.Stderr, "caught panic: %s\n%s\n", rerr, debug.Stack()) @@ -199,8 +205,7 @@ func (msm *MultistreamMuxer) Negotiate(rwc io.ReadWriteCloser) (proto string, ha // other side has closed this rwc for writing. They may have sent us a // message and closed. Future writers will get an error anyways. _ = delimWriteBuffered(rwc, []byte(ProtocolID)) - - line, err := ReadNextToken(rwc) + line, err := ReadNextToken[T](rwc) if err != nil { return "", nil, err } @@ -213,7 +218,7 @@ func (msm *MultistreamMuxer) Negotiate(rwc io.ReadWriteCloser) (proto string, ha loop: for { // Now read and respond to commands until they send a valid protocol id - tok, err := ReadNextToken(rwc) + tok, err := ReadNextToken[T](rwc) if err != nil { return "", nil, err } @@ -240,7 +245,7 @@ loop: // Handle performs protocol negotiation on a ReadWriteCloser // (i.e. a connection). It will find a matching handler for the // incoming protocol and pass the ReadWriteCloser to it. -func (msm *MultistreamMuxer) Handle(rwc io.ReadWriteCloser) error { +func (msm *MultistreamMuxer[T]) Handle(rwc io.ReadWriteCloser) error { p, h, err := msm.Negotiate(rwc) if err != nil { return err @@ -250,13 +255,13 @@ func (msm *MultistreamMuxer) Handle(rwc io.ReadWriteCloser) error { // ReadNextToken extracts a token from a Reader. It is used during // protocol negotiation and returns a string. -func ReadNextToken(r io.Reader) (string, error) { +func ReadNextToken[T StringLike](r io.Reader) (T, error) { tok, err := ReadNextTokenBytes(r) if err != nil { return "", err } - return string(tok), nil + return T(tok), nil } // ReadNextTokenBytes extracts a token from a Reader. It is used diff --git a/multistream_test.go b/multistream_test.go index 4d67b56..454e59d 100644 --- a/multistream_test.go +++ b/multistream_test.go @@ -78,7 +78,7 @@ func newPipe(t *testing.T) (io.ReadWriteCloser, io.ReadWriteCloser) { func TestProtocolNegotiation(t *testing.T) { a, b := newPipe(t) - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("/a", nil) mux.AddHandler("/b", nil) mux.AddHandler("/c", nil) @@ -112,7 +112,7 @@ func TestProtocolNegotiation(t *testing.T) { func TestProtocolNegotiationLazy(t *testing.T) { a, b := newPipe(t) - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("/a", nil) mux.AddHandler("/b", nil) mux.AddHandler("/c", nil) @@ -149,7 +149,7 @@ func TestProtocolNegotiationLazy(t *testing.T) { func TestProtocolNegotiationUnsupported(t *testing.T) { a, b := newPipe(t) - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() done := make(chan struct{}) go func() { @@ -170,7 +170,7 @@ func TestProtocolNegotiationUnsupported(t *testing.T) { func TestNegLazyStressRead(t *testing.T) { const count = 75 - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("/a", nil) mux.AddHandler("/b", nil) mux.AddHandler("/c", nil) @@ -225,7 +225,7 @@ func TestNegLazyStressRead(t *testing.T) { func TestNegLazyStressWrite(t *testing.T) { const count = 100 - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("/a", nil) mux.AddHandler("/b", nil) mux.AddHandler("/c", nil) @@ -277,7 +277,7 @@ func TestNegLazyStressWrite(t *testing.T) { func TestInvalidProtocol(t *testing.T) { a, b := newPipe(t) - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() done := make(chan struct{}) go func() { defer close(done) @@ -303,7 +303,7 @@ func TestInvalidProtocol(t *testing.T) { func TestSelectOne(t *testing.T) { a, b := newPipe(t) - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("/a", nil) mux.AddHandler("/b", nil) mux.AddHandler("/c", nil) @@ -341,7 +341,7 @@ func TestSelectOne(t *testing.T) { func TestSelectFails(t *testing.T) { a, b := newPipe(t) - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("/a", nil) mux.AddHandler("/b", nil) mux.AddHandler("/c", nil) @@ -355,7 +355,7 @@ func TestSelectFails(t *testing.T) { } func TestRemoveProtocol(t *testing.T) { - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("/a", nil) mux.AddHandler("/b", nil) mux.AddHandler("/c", nil) @@ -378,7 +378,7 @@ func TestRemoveProtocol(t *testing.T) { func TestSelectOneAndWrite(t *testing.T) { a, b := newPipe(t) - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("/a", nil) mux.AddHandler("/b", nil) mux.AddHandler("/c", nil) @@ -416,7 +416,7 @@ func TestSelectOneAndWrite(t *testing.T) { func TestLazyConns(t *testing.T) { a, b := newPipe(t) - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("/a", nil) mux.AddHandler("/b", nil) mux.AddHandler("/c", nil) @@ -430,7 +430,7 @@ func TestLazyConns(t *testing.T) { func TestLazyAndMux(t *testing.T) { a, b := newPipe(t) - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("/a", nil) mux.AddHandler("/b", nil) mux.AddHandler("/c", nil) @@ -474,7 +474,7 @@ func TestLazyAndMux(t *testing.T) { func TestHandleFunc(t *testing.T) { a, b := newPipe(t) - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("/a", nil) mux.AddHandler("/b", nil) mux.AddHandler("/c", func(p string, rwc io.ReadWriteCloser) error { @@ -505,7 +505,7 @@ func TestHandleFunc(t *testing.T) { func TestAddHandlerOverride(t *testing.T) { a, b := newPipe(t) - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("/foo", func(p string, rwc io.ReadWriteCloser) error { t.Error("shouldnt execute this handler") return nil @@ -536,7 +536,7 @@ func TestAddHandlerOverride(t *testing.T) { func TestLazyAndMuxWrite(t *testing.T) { a, b := newPipe(t) - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("/a", nil) mux.AddHandler("/b", nil) mux.AddHandler("/c", nil) @@ -624,7 +624,7 @@ func TestTooLargeMessage(t *testing.T) { t.Fatal(err) } - _, err = ReadNextToken(buf) + _, err = ReadNextToken[string](buf) if err == nil { t.Fatal("should have failed to read message larger than 64k") } @@ -674,7 +674,7 @@ func TestNegotiatThenWriteFail(t *testing.T) { t.Fatal(err) } - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("foo", nil) rob := &readonlyBuffer{bytes.NewReader(buf.Bytes())} @@ -776,7 +776,7 @@ func TestNegotiatePeerSendsAndCloses(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("foo", nil) _, _, err = mux.Negotiate(tc.s) if err != nil { @@ -789,7 +789,7 @@ func TestNegotiatePeerSendsAndCloses(t *testing.T) { func TestSimopenClientServer(t *testing.T) { a, b := newPipe(t) - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("/a", nil) done := make(chan struct{}) @@ -829,7 +829,7 @@ func TestSimopenClientServer(t *testing.T) { func TestSimopenClientServerFail(t *testing.T) { a, b := newPipe(t) - mux := NewMultistreamMuxer() + mux := NewMultistreamMuxer[string]() mux.AddHandler("/a", nil) done := make(chan struct{})