diff --git a/rsocket/statemachine/RSocketStateMachine.cpp b/rsocket/statemachine/RSocketStateMachine.cpp index a2105754f..d0a1d5bb7 100644 --- a/rsocket/statemachine/RSocketStateMachine.cpp +++ b/rsocket/statemachine/RSocketStateMachine.cpp @@ -1190,23 +1190,15 @@ void RSocketStateMachine::setProtocolVersionOrThrow( } StreamId RSocketStateMachine::getNextStreamId() { - constexpr auto limit = - static_cast(std::numeric_limits::max() - 2); - - auto const streamId = nextStreamId_; - if (streamId >= limit) { - throw std::runtime_error{"Ran out of stream IDs"}; - } - - CHECK_EQ(0, streams_.count(streamId)) - << "Next stream ID already exists in the streams map"; - - nextStreamId_ += 2; + StreamId streamId; + do { + streamId = nextStreamId_.fetch_add(2); + } while( streamId == 0 || streams_.count(streamId) > 0); return streamId; } void RSocketStateMachine::setNextStreamId(StreamId streamId) { - nextStreamId_ = streamId + 2; + nextStreamId_.store(streamId + 2); } bool RSocketStateMachine::registerNewPeerStreamId(StreamId streamId) { diff --git a/rsocket/statemachine/RSocketStateMachine.h b/rsocket/statemachine/RSocketStateMachine.h index 71deeb322..2422b6244 100644 --- a/rsocket/statemachine/RSocketStateMachine.h +++ b/rsocket/statemachine/RSocketStateMachine.h @@ -324,7 +324,7 @@ class RSocketStateMachine final /// Map of all individual stream state machines. std::unordered_map> streams_; - StreamId nextStreamId_; + std::atomic nextStreamId_; StreamId lastPeerStreamId_{0}; // Manages all state needed for warm/cold resumption.