From bf465815b8afc3f10c90fc5ceb0e7dd9c5277a3a Mon Sep 17 00:00:00 2001 From: Jacky Chan Date: Thu, 1 Aug 2019 08:58:10 -0700 Subject: [PATCH 1/2] Fix stream id overflow problem reuse stream id and avoid id conflict --- rsocket/statemachine/RSocketStateMachine.cpp | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/rsocket/statemachine/RSocketStateMachine.cpp b/rsocket/statemachine/RSocketStateMachine.cpp index a2105754f..c6e9197a2 100644 --- a/rsocket/statemachine/RSocketStateMachine.cpp +++ b/rsocket/statemachine/RSocketStateMachine.cpp @@ -1190,18 +1190,12 @@ void RSocketStateMachine::setProtocolVersionOrThrow( } StreamId RSocketStateMachine::getNextStreamId() { - constexpr auto limit = - static_cast(std::numeric_limits::max() - 2); - - auto const streamId = nextStreamId_; - if (streamId >= limit) { - throw std::runtime_error{"Ran out of stream IDs"}; - } - - CHECK_EQ(0, streams_.count(streamId)) - << "Next stream ID already exists in the streams map"; - - nextStreamId_ += 2; + constexpr unsigned int MASK = 0x7FFFFFFF; + StreamId streamId; + do { + streamId = nextStreamId_ & MASK; + nextStreamId_ = (nextStreamId_ + 2) & MASK; + } while( streamId == 0 || streams_.count(streamId) > 0); return streamId; } From f52cd80f1cb1aa6e8035089eb0901c0d24b9faa9 Mon Sep 17 00:00:00 2001 From: linux_china Date: Tue, 6 Aug 2019 14:42:19 -0700 Subject: [PATCH 2/2] Change nextStreamId_ to std::atomic --- rsocket/statemachine/RSocketStateMachine.cpp | 6 ++---- rsocket/statemachine/RSocketStateMachine.h | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/rsocket/statemachine/RSocketStateMachine.cpp b/rsocket/statemachine/RSocketStateMachine.cpp index c6e9197a2..d0a1d5bb7 100644 --- a/rsocket/statemachine/RSocketStateMachine.cpp +++ b/rsocket/statemachine/RSocketStateMachine.cpp @@ -1190,17 +1190,15 @@ void RSocketStateMachine::setProtocolVersionOrThrow( } StreamId RSocketStateMachine::getNextStreamId() { - constexpr unsigned int MASK = 0x7FFFFFFF; StreamId streamId; do { - streamId = nextStreamId_ & MASK; - nextStreamId_ = (nextStreamId_ + 2) & MASK; + streamId = nextStreamId_.fetch_add(2); } while( streamId == 0 || streams_.count(streamId) > 0); return streamId; } void RSocketStateMachine::setNextStreamId(StreamId streamId) { - nextStreamId_ = streamId + 2; + nextStreamId_.store(streamId + 2); } bool RSocketStateMachine::registerNewPeerStreamId(StreamId streamId) { diff --git a/rsocket/statemachine/RSocketStateMachine.h b/rsocket/statemachine/RSocketStateMachine.h index 71deeb322..2422b6244 100644 --- a/rsocket/statemachine/RSocketStateMachine.h +++ b/rsocket/statemachine/RSocketStateMachine.h @@ -324,7 +324,7 @@ class RSocketStateMachine final /// Map of all individual stream state machines. std::unordered_map> streams_; - StreamId nextStreamId_; + std::atomic nextStreamId_; StreamId lastPeerStreamId_{0}; // Manages all state needed for warm/cold resumption.