diff --git a/go.mod b/go.mod index fe3676271..d56dc7703 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/mattermost/mattermost-plugin-calls/server/public v0.0.3 github.com/mattermost/mattermost/server/public v0.1.9 github.com/mattermost/morph v1.1.0 - github.com/mattermost/rtcd v0.18.1-0.20250107081358-290c5ce0a692 + github.com/mattermost/rtcd v1.0.0 github.com/mattermost/squirrel v0.2.0 github.com/pkg/errors v0.9.1 github.com/rudderlabs/analytics-go v3.3.3+incompatible diff --git a/go.sum b/go.sum index 6acbb8d59..fcc14778a 100644 --- a/go.sum +++ b/go.sum @@ -375,8 +375,8 @@ github.com/mattermost/mattermost/server/public v0.1.9 h1:l/OKPRVuFeqL0yqRVC/Jpve github.com/mattermost/mattermost/server/public v0.1.9/go.mod h1:SkTKbMul91Rq0v2dIxe8mqzUOY+3KwlwwLmAlxDfGCk= github.com/mattermost/morph v1.1.0 h1:Q9vrJbeM3s2jfweGheq12EFIzdNp9a/6IovcbvOQ6Cw= github.com/mattermost/morph v1.1.0/go.mod h1:gD+EaqX2UMyyuzmF4PFh4r33XneQ8Nzi+0E8nXjMa3A= -github.com/mattermost/rtcd v0.18.1-0.20250107081358-290c5ce0a692 h1:51Rxv6A5Esd11VW0PNHe0Vw3Zbye7J6iv3tXODTdknU= -github.com/mattermost/rtcd v0.18.1-0.20250107081358-290c5ce0a692/go.mod h1:hsRBk1e6dQZrZSK2FoqZAhO+oXg+1Je5zhF1ClLbbdA= +github.com/mattermost/rtcd v1.0.0 h1:7FjRcr7sinOOoSr+in/N/eeg/+BlowkY6m0FFBlM/6M= +github.com/mattermost/rtcd v1.0.0/go.mod h1:hsRBk1e6dQZrZSK2FoqZAhO+oXg+1Je5zhF1ClLbbdA= github.com/mattermost/squirrel v0.2.0 h1:8ZWeyf+MWQ2cL7hu9REZgLtz2IJi51qqZEovI3T3TT8= github.com/mattermost/squirrel v0.2.0/go.mod h1:NPPtk+CdpWre4GxMGoOpzEVFVc0ZoEFyJBZGCtn9nSU= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= diff --git a/lt/go.mod b/lt/go.mod index ca974f2f4..898c18b1f 100644 --- a/lt/go.mod +++ b/lt/go.mod @@ -6,7 +6,7 @@ require ( github.com/aws/aws-sdk-go v1.50.3 github.com/hajimehoshi/go-mp3 v0.3.4 github.com/mattermost/mattermost/server/public v0.0.12 - github.com/mattermost/rtcd v0.18.1-0.20250107081358-290c5ce0a692 + github.com/mattermost/rtcd v1.0.0 github.com/pion/rtp v1.8.10 github.com/pion/webrtc/v4 v4.0.7 gopkg.in/hraban/opus.v2 v2.0.0-20230925203106-0188a62cb302 diff --git a/lt/go.sum b/lt/go.sum index fe7eb288e..8d028a973 100644 --- a/lt/go.sum +++ b/lt/go.sum @@ -85,8 +85,8 @@ github.com/mattermost/logr/v2 v2.0.21 h1:CMHsP+nrbRlEC4g7BwOk1GAnMtHkniFhlSQPXy5 github.com/mattermost/logr/v2 v2.0.21/go.mod h1:kZkB/zqKL9e+RY5gB3vGpsyenC+TpuiOenjMkvJJbzc= github.com/mattermost/mattermost/server/public v0.0.12 h1:iunc9q4/XkArOrndEUn73uFw6v9TOEXEtp6Nm6Iv218= github.com/mattermost/mattermost/server/public v0.0.12/go.mod h1:Bk+atJcELCIk9Yeq5FoqTr+gra9704+X4amrlwlTgSc= -github.com/mattermost/rtcd v0.18.1-0.20250107081358-290c5ce0a692 h1:51Rxv6A5Esd11VW0PNHe0Vw3Zbye7J6iv3tXODTdknU= -github.com/mattermost/rtcd v0.18.1-0.20250107081358-290c5ce0a692/go.mod h1:hsRBk1e6dQZrZSK2FoqZAhO+oXg+1Je5zhF1ClLbbdA= +github.com/mattermost/rtcd v1.0.0 h1:7FjRcr7sinOOoSr+in/N/eeg/+BlowkY6m0FFBlM/6M= +github.com/mattermost/rtcd v1.0.0/go.mod h1:hsRBk1e6dQZrZSK2FoqZAhO+oXg+1Je5zhF1ClLbbdA= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/server/activate.go b/server/activate.go index 63fceffb1..fabe6d342 100644 --- a/server/activate.go +++ b/server/activate.go @@ -162,6 +162,10 @@ func (p *Plugin) OnActivate() (retErr error) { p.LogDebug("rtcd client manager initialized successfully") p.rtcdManager = rtcdManager + + if err := p.cleanUpState(); err != nil { + p.LogError("failed to cleanup state", "err", err.Error()) + } } else { rtcServerConfig := rtc.ServerConfig{ ICEAddressUDP: cfg.UDPServerAddress, diff --git a/server/interfaces/interfaces.go b/server/interfaces/interfaces.go index a73fa7363..168994c03 100644 --- a/server/interfaces/interfaces.go +++ b/server/interfaces/interfaces.go @@ -57,4 +57,6 @@ type RTCDClient interface { Close() error GetVersionInfo() (rtcd.VersionInfo, error) GetSystemInfo() (rtcd.SystemInfo, error) + GetSession(callID, sessionID string) (rtc.SessionConfig, int, error) + GetSessions(callID string) ([]rtc.SessionConfig, int, error) } diff --git a/server/mocks/github.com/mattermost/mattermost-plugin-calls/server/interfaces/mock_RTCDClient.go b/server/mocks/github.com/mattermost/mattermost-plugin-calls/server/interfaces/mock_RTCDClient.go index 86a71b13d..2ae61436d 100644 --- a/server/mocks/github.com/mattermost/mattermost-plugin-calls/server/interfaces/mock_RTCDClient.go +++ b/server/mocks/github.com/mattermost/mattermost-plugin-calls/server/interfaces/mock_RTCDClient.go @@ -3,8 +3,10 @@ package interfaces import ( - service "github.com/mattermost/rtcd/service" + rtc "github.com/mattermost/rtcd/service/rtc" mock "github.com/stretchr/testify/mock" + + service "github.com/mattermost/rtcd/service" ) // MockRTCDClient is an autogenerated mock type for the RTCDClient type @@ -110,6 +112,135 @@ func (_c *MockRTCDClient_Connected_Call) RunAndReturn(run func() bool) *MockRTCD return _c } +// GetSession provides a mock function with given fields: callID, sessionID +func (_m *MockRTCDClient) GetSession(callID string, sessionID string) (rtc.SessionConfig, int, error) { + ret := _m.Called(callID, sessionID) + + if len(ret) == 0 { + panic("no return value specified for GetSession") + } + + var r0 rtc.SessionConfig + var r1 int + var r2 error + if rf, ok := ret.Get(0).(func(string, string) (rtc.SessionConfig, int, error)); ok { + return rf(callID, sessionID) + } + if rf, ok := ret.Get(0).(func(string, string) rtc.SessionConfig); ok { + r0 = rf(callID, sessionID) + } else { + r0 = ret.Get(0).(rtc.SessionConfig) + } + + if rf, ok := ret.Get(1).(func(string, string) int); ok { + r1 = rf(callID, sessionID) + } else { + r1 = ret.Get(1).(int) + } + + if rf, ok := ret.Get(2).(func(string, string) error); ok { + r2 = rf(callID, sessionID) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MockRTCDClient_GetSession_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSession' +type MockRTCDClient_GetSession_Call struct { + *mock.Call +} + +// GetSession is a helper method to define mock.On call +// - callID string +// - sessionID string +func (_e *MockRTCDClient_Expecter) GetSession(callID interface{}, sessionID interface{}) *MockRTCDClient_GetSession_Call { + return &MockRTCDClient_GetSession_Call{Call: _e.mock.On("GetSession", callID, sessionID)} +} + +func (_c *MockRTCDClient_GetSession_Call) Run(run func(callID string, sessionID string)) *MockRTCDClient_GetSession_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(string)) + }) + return _c +} + +func (_c *MockRTCDClient_GetSession_Call) Return(_a0 rtc.SessionConfig, _a1 int, _a2 error) *MockRTCDClient_GetSession_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *MockRTCDClient_GetSession_Call) RunAndReturn(run func(string, string) (rtc.SessionConfig, int, error)) *MockRTCDClient_GetSession_Call { + _c.Call.Return(run) + return _c +} + +// GetSessions provides a mock function with given fields: callID +func (_m *MockRTCDClient) GetSessions(callID string) ([]rtc.SessionConfig, int, error) { + ret := _m.Called(callID) + + if len(ret) == 0 { + panic("no return value specified for GetSessions") + } + + var r0 []rtc.SessionConfig + var r1 int + var r2 error + if rf, ok := ret.Get(0).(func(string) ([]rtc.SessionConfig, int, error)); ok { + return rf(callID) + } + if rf, ok := ret.Get(0).(func(string) []rtc.SessionConfig); ok { + r0 = rf(callID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]rtc.SessionConfig) + } + } + + if rf, ok := ret.Get(1).(func(string) int); ok { + r1 = rf(callID) + } else { + r1 = ret.Get(1).(int) + } + + if rf, ok := ret.Get(2).(func(string) error); ok { + r2 = rf(callID) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MockRTCDClient_GetSessions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSessions' +type MockRTCDClient_GetSessions_Call struct { + *mock.Call +} + +// GetSessions is a helper method to define mock.On call +// - callID string +func (_e *MockRTCDClient_Expecter) GetSessions(callID interface{}) *MockRTCDClient_GetSessions_Call { + return &MockRTCDClient_GetSessions_Call{Call: _e.mock.On("GetSessions", callID)} +} + +func (_c *MockRTCDClient_GetSessions_Call) Run(run func(callID string)) *MockRTCDClient_GetSessions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockRTCDClient_GetSessions_Call) Return(_a0 []rtc.SessionConfig, _a1 int, _a2 error) *MockRTCDClient_GetSessions_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *MockRTCDClient_GetSessions_Call) RunAndReturn(run func(string) ([]rtc.SessionConfig, int, error)) *MockRTCDClient_GetSessions_Call { + _c.Call.Return(run) + return _c +} + // GetSystemInfo provides a mock function with given fields: func (_m *MockRTCDClient) GetSystemInfo() (service.SystemInfo, error) { ret := _m.Called() diff --git a/server/rtcd.go b/server/rtcd.go index b00369aa4..5cf582f58 100644 --- a/server/rtcd.go +++ b/server/rtcd.go @@ -10,6 +10,7 @@ import ( "fmt" "math/rand" "net" + "net/http" "net/url" "os" "strings" @@ -21,6 +22,7 @@ import ( "github.com/mattermost/mattermost-plugin-calls/server/db" "github.com/mattermost/mattermost-plugin-calls/server/interfaces" "github.com/mattermost/mattermost-plugin-calls/server/license" + "github.com/mattermost/mattermost-plugin-calls/server/public" rtcd "github.com/mattermost/rtcd/service" "github.com/mattermost/rtcd/service/random" @@ -706,3 +708,54 @@ func (h *rtcdHost) isFlagged() bool { defer h.mut.RUnlock() return h.flagged } + +// hasCallEnded checks if the call has ended by querying the RTCD host assigned to the call. +// Since this method is used to clean up the call state, it's important to be as conservative as possible +// and only return true if we are absolutely sure the call has ended. +func (m *rtcdClientManager) hasCallEnded(call *public.Call) bool { + m.ctx.LogDebug("RTCD host is set in call, checking...", "callID", call.ID, "rtcdHost", call.Props.RTCDHost) + host := m.getHost(call.Props.RTCDHost) + if host != nil { + m.ctx.LogDebug("RTCD host found", "callID", call.ID, "rtcdHost", call.Props.RTCDHost) + + // Version compatibility check. We need to be talking to RTCD v1.0.0 or higher to be able to call GetSessions. + info, err := host.client.GetVersionInfo() + if err != nil { + m.ctx.LogDebug("failed to get version info", "err", err.Error(), "callID", call.ID, "rtcdHost", call.Props.RTCDHost) + return false + } + + // Always support dev builds. + if info.BuildVersion == "" || info.BuildVersion == "master" || strings.HasPrefix(info.BuildVersion, "dev") { + m.ctx.LogDebug("skipping version compatibility check", "buildVersion", info.BuildVersion, "callID", call.ID, "rtcdHost", call.Props.RTCDHost) + } else if err := checkMinVersion("v1.0.0", info.BuildVersion); err != nil { + m.ctx.LogDebug("RTCD host version is not compatible", "err", err.Error(), "callID", call.ID, "rtcdHost", call.Props.RTCDHost) + return false + } + + cfgs, code, err := host.client.GetSessions(call.ID) + if err != nil { + m.ctx.LogDebug("failed to get sessions for call", "err", err.Error(), "callID", call.ID, "rtcdHost", call.Props.RTCDHost) + } + + if code != http.StatusOK && code != http.StatusNotFound { + m.ctx.LogDebug("unexpected status code from RTCD", "code", code, "callID", call.ID, "rtcdHost", call.Props.RTCDHost) + // The request above could fail for various reasons, in which case we can't assume the call has ended. + return false + } + + if len(cfgs) > 0 { + // The call is ongoing so we skip state cleanup. + m.ctx.LogDebug("call is still ongoing", "callID", call.ID, "rtcdHost", call.Props.RTCDHost) + return false + } + + // no call ongoing + m.ctx.LogDebug("call was not found", "callID", call.ID, "rtcdHost", call.Props.RTCDHost) + } else { + // If the host is not found we can assume the RTCD node is gone for good so there's no way a call would be ongoing. + m.ctx.LogDebug("RTCD host not found", "callID", call.ID, "rtcdHost", call.Props.RTCDHost) + } + + return true +} diff --git a/server/state.go b/server/state.go index 29c8eda0c..378336e85 100644 --- a/server/state.go +++ b/server/state.go @@ -368,10 +368,19 @@ func (p *Plugin) cleanUpState() error { p.LogError("failed to lock call", "err", err.Error()) continue } + + // If a call has a RTCD host assigned, we want to check with the RTCD side whether the call is still ongoing or not before + // cleaning up the state. + if p.rtcdManager != nil && call.Props.RTCDHost != "" && !p.rtcdManager.hasCallEnded(call) { + p.unlockCall(call.ChannelID) + continue + } + if err := p.cleanCallState(call); err != nil { p.unlockCall(call.ChannelID) return fmt.Errorf("failed to clean up state: %w", err) } + p.unlockCall(call.ChannelID) } diff --git a/server/state_test.go b/server/state_test.go index 4acd07f53..f75c30c0d 100644 --- a/server/state_test.go +++ b/server/state_test.go @@ -9,10 +9,20 @@ import ( "testing" "time" + "github.com/mattermost/mattermost-plugin-calls/server/cluster" + "github.com/mattermost/mattermost-plugin-calls/server/db" "github.com/mattermost/mattermost-plugin-calls/server/public" + rtcd "github.com/mattermost/rtcd/service" + "github.com/mattermost/rtcd/service/rtc" + + rtcdMocks "github.com/mattermost/mattermost-plugin-calls/server/mocks/github.com/mattermost/mattermost-plugin-calls/server/interfaces" + serverMocks "github.com/mattermost/mattermost-plugin-calls/server/mocks/github.com/mattermost/mattermost-plugin-calls/server/interfaces" + pluginMocks "github.com/mattermost/mattermost-plugin-calls/server/mocks/github.com/mattermost/mattermost/server/public/plugin" "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/plugin" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -545,3 +555,555 @@ func BenchmarkCallStateClone(b *testing.B) { }) } } + +func TestCleanUpState(t *testing.T) { + mockAPI := &pluginMocks.MockAPI{} + mockMetrics := &serverMocks.MockMetrics{} + + p := Plugin{ + MattermostPlugin: plugin.MattermostPlugin{ + API: mockAPI, + }, + metrics: mockMetrics, + callsClusterLocks: map[string]*cluster.Mutex{}, + } + + store, tearDown := NewTestStore(t) + t.Cleanup(tearDown) + p.store = store + + t.Run("plugin mode", func(t *testing.T) { + defer mockAPI.AssertExpectations(t) + defer mockMetrics.AssertExpectations(t) + + t.Run("no calls", func(t *testing.T) { + mockAPI.On("LogDebug", "cleaning up calls state", + "origin", mock.AnythingOfType("string")).Once() + + err := p.cleanUpState() + require.NoError(t, err) + }) + + t.Run("ongoing calls", func(t *testing.T) { + defer ResetTestStore(t, p.store) + + channelID := model.NewId() + postID := model.NewId() + userID := model.NewId() + callID := model.NewId() + + call := &public.Call{ + ID: callID, + CreateAt: time.Now().UnixMilli(), + ChannelID: channelID, + StartAt: time.Now().UnixMilli(), + PostID: postID, + ThreadID: model.NewId(), + OwnerID: userID, + } + err := p.store.CreateCall(call) + require.NoError(t, err) + + createPost(t, store, postID, userID, channelID) + + err = p.store.CreateCallSession(&public.CallSession{ + ID: "connA", + CallID: callID, + UserID: "userA", + JoinAt: time.Now().UnixMilli(), + }) + require.NoError(t, err) + + mockAPI.On("LogDebug", "cleaning up calls state", + "origin", mock.AnythingOfType("string")).Once() + + mockAPI.On("LogDebug", "creating cluster mutex for call", + "origin", mock.AnythingOfType("string"), "channelID", channelID).Once() + + mockAPI.On("KVSetWithOptions", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) + + mockMetrics.On("ObserveClusterMutexGrabTime", "mutex_call", mock.AnythingOfType("float64")) + mockMetrics.On("ObserveClusterMutexLockedTime", "mutex_call", mock.AnythingOfType("float64")) + + mockAPI.On("UpdatePost", mock.AnythingOfType("*model.Post")).Return(&model.Post{Id: postID}, nil).Once() + mockAPI.On("GetConfig").Return(&model.Config{}, nil).Once() + mockAPI.On("KVDelete", "mutex_call_"+channelID).Return(nil) + + err = p.cleanUpState() + require.NoError(t, err) + + // Verify the call has ended and sessions have been deleted + calls, err := p.store.GetAllActiveCalls(db.GetCallOpts{}) + require.NoError(t, err) + require.Empty(t, calls) + sessions, err := p.store.GetCallSessions(callID, db.GetCallSessionOpts{}) + require.NoError(t, err) + require.Empty(t, sessions) + }) + }) + + t.Run("rtcd", func(t *testing.T) { + defer mockAPI.AssertExpectations(t) + defer mockMetrics.AssertExpectations(t) + + t.Run("no calls", func(t *testing.T) { + p.rtcdManager = &rtcdClientManager{ + ctx: &Plugin{ + MattermostPlugin: plugin.MattermostPlugin{ + API: mockAPI, + }, + }, + hosts: map[string]*rtcdHost{}, + } + + mockAPI.On("LogDebug", "cleaning up calls state", + "origin", mock.AnythingOfType("string")).Once() + + err := p.cleanUpState() + require.NoError(t, err) + }) + + t.Run("no rtcd host", func(t *testing.T) { + defer ResetTestStore(t, p.store) + + channelID := model.NewId() + postID := model.NewId() + userID := model.NewId() + callID := model.NewId() + + call := &public.Call{ + ID: callID, + CreateAt: time.Now().UnixMilli(), + ChannelID: channelID, + StartAt: time.Now().UnixMilli(), + PostID: postID, + ThreadID: model.NewId(), + OwnerID: userID, + Props: public.CallProps{ + RTCDHost: "127.0.0.1", + }, + } + err := p.store.CreateCall(call) + require.NoError(t, err) + + createPost(t, store, postID, userID, channelID) + + err = p.store.CreateCallSession(&public.CallSession{ + ID: "connA", + CallID: callID, + UserID: "userA", + JoinAt: time.Now().UnixMilli(), + }) + require.NoError(t, err) + + p.rtcdManager = &rtcdClientManager{ + ctx: &Plugin{ + MattermostPlugin: plugin.MattermostPlugin{ + API: mockAPI, + }, + }, + hosts: map[string]*rtcdHost{}, + } + + mockAPI.On("LogDebug", "cleaning up calls state", + "origin", mock.AnythingOfType("string")).Once() + + mockAPI.On("LogDebug", "creating cluster mutex for call", + "origin", mock.AnythingOfType("string"), "channelID", channelID).Once() + + mockAPI.On("KVSetWithOptions", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) + + mockMetrics.On("ObserveClusterMutexGrabTime", "mutex_call", mock.AnythingOfType("float64")) + mockMetrics.On("ObserveClusterMutexLockedTime", "mutex_call", mock.AnythingOfType("float64")) + + mockAPI.On("UpdatePost", mock.AnythingOfType("*model.Post")).Return(&model.Post{Id: postID}, nil).Once() + mockAPI.On("GetConfig").Return(&model.Config{}, nil).Once() + mockAPI.On("KVDelete", "mutex_call_"+channelID).Return(nil) + + mockAPI.On("LogDebug", "RTCD host is set in call, checking...", + "origin", mock.AnythingOfType("string"), "callID", callID, "rtcdHost", "127.0.0.1").Once() + + mockAPI.On("LogDebug", "RTCD host not found", + "origin", mock.AnythingOfType("string"), "callID", callID, "rtcdHost", "127.0.0.1").Once() + + err = p.cleanUpState() + require.NoError(t, err) + + // Verify the call has ended and sessions have been deleted + calls, err := p.store.GetAllActiveCalls(db.GetCallOpts{}) + require.NoError(t, err) + require.Empty(t, calls) + sessions, err := p.store.GetCallSessions(callID, db.GetCallSessionOpts{}) + require.NoError(t, err) + require.Empty(t, sessions) + }) + + t.Run("rtcd host but no call", func(t *testing.T) { + defer ResetTestStore(t, p.store) + + channelID := model.NewId() + postID := model.NewId() + userID := model.NewId() + callID := model.NewId() + + call := &public.Call{ + ID: callID, + CreateAt: time.Now().UnixMilli(), + ChannelID: channelID, + StartAt: time.Now().UnixMilli(), + PostID: postID, + ThreadID: model.NewId(), + OwnerID: userID, + Props: public.CallProps{ + RTCDHost: "127.0.0.1", + }, + } + err := p.store.CreateCall(call) + require.NoError(t, err) + + createPost(t, store, postID, userID, channelID) + + err = p.store.CreateCallSession(&public.CallSession{ + ID: "connA", + CallID: callID, + UserID: "userA", + JoinAt: time.Now().UnixMilli(), + }) + require.NoError(t, err) + + mockRTCDClient := &rtcdMocks.MockRTCDClient{} + defer mockRTCDClient.AssertExpectations(t) + + p.rtcdManager = &rtcdClientManager{ + ctx: &Plugin{ + MattermostPlugin: plugin.MattermostPlugin{ + API: mockAPI, + }, + }, + hosts: map[string]*rtcdHost{ + "127.0.0.1": { + client: mockRTCDClient, + }, + }, + } + + mockAPI.On("LogDebug", "cleaning up calls state", + "origin", mock.AnythingOfType("string")).Once() + + mockAPI.On("LogDebug", "creating cluster mutex for call", + "origin", mock.AnythingOfType("string"), "channelID", channelID).Once() + + mockAPI.On("KVSetWithOptions", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) + + mockMetrics.On("ObserveClusterMutexGrabTime", "mutex_call", mock.AnythingOfType("float64")) + mockMetrics.On("ObserveClusterMutexLockedTime", "mutex_call", mock.AnythingOfType("float64")) + + mockAPI.On("UpdatePost", mock.AnythingOfType("*model.Post")).Return(&model.Post{Id: postID}, nil).Once() + mockAPI.On("GetConfig").Return(&model.Config{}, nil).Once() + mockAPI.On("KVDelete", "mutex_call_"+channelID).Return(nil) + + mockAPI.On("LogDebug", "RTCD host is set in call, checking...", + "origin", mock.AnythingOfType("string"), "callID", callID, "rtcdHost", "127.0.0.1").Once() + + mockAPI.On("LogDebug", "RTCD host found", + "origin", mock.AnythingOfType("string"), "callID", callID, "rtcdHost", "127.0.0.1").Once() + + mockRTCDClient.On("GetVersionInfo").Return(rtcd.VersionInfo{}, nil) + + mockAPI.On("LogDebug", "skipping version compatibility check", + "origin", mock.AnythingOfType("string"), "buildVersion", "", "callID", callID, "rtcdHost", "127.0.0.1").Once() + + mockRTCDClient.On("GetSessions", callID).Return(nil, 404, fmt.Errorf("call not found")) + + mockAPI.On("LogDebug", "failed to get sessions for call", + "origin", mock.AnythingOfType("string"), "err", "call not found", "callID", callID, "rtcdHost", "127.0.0.1").Once() + + mockAPI.On("LogDebug", "call was not found", + "origin", mock.AnythingOfType("string"), "callID", callID, "rtcdHost", "127.0.0.1").Once() + + err = p.cleanUpState() + require.NoError(t, err) + + // Verify the call has ended and sessions have been deleted + calls, err := p.store.GetAllActiveCalls(db.GetCallOpts{}) + require.NoError(t, err) + require.Empty(t, calls) + sessions, err := p.store.GetCallSessions(callID, db.GetCallSessionOpts{}) + require.NoError(t, err) + require.Empty(t, sessions) + }) + + t.Run("rtcd host and active call", func(t *testing.T) { + defer ResetTestStore(t, p.store) + + channelID := model.NewId() + postID := model.NewId() + userID := model.NewId() + callID := model.NewId() + + call := &public.Call{ + ID: callID, + CreateAt: time.Now().UnixMilli(), + ChannelID: channelID, + StartAt: time.Now().UnixMilli(), + PostID: postID, + ThreadID: model.NewId(), + OwnerID: userID, + Props: public.CallProps{ + RTCDHost: "127.0.0.1", + }, + } + err := p.store.CreateCall(call) + require.NoError(t, err) + + createPost(t, store, postID, userID, channelID) + + err = p.store.CreateCallSession(&public.CallSession{ + ID: "connA", + CallID: callID, + UserID: "userA", + JoinAt: time.Now().UnixMilli(), + }) + require.NoError(t, err) + + mockRTCDClient := &rtcdMocks.MockRTCDClient{} + defer mockRTCDClient.AssertExpectations(t) + + p.rtcdManager = &rtcdClientManager{ + ctx: &Plugin{ + MattermostPlugin: plugin.MattermostPlugin{ + API: mockAPI, + }, + }, + hosts: map[string]*rtcdHost{ + "127.0.0.1": { + client: mockRTCDClient, + }, + }, + } + + mockAPI.On("LogDebug", "cleaning up calls state", + "origin", mock.AnythingOfType("string")).Once() + + mockAPI.On("LogDebug", "creating cluster mutex for call", + "origin", mock.AnythingOfType("string"), "channelID", channelID).Once() + + mockAPI.On("KVSetWithOptions", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) + + mockMetrics.On("ObserveClusterMutexGrabTime", "mutex_call", mock.AnythingOfType("float64")) + mockMetrics.On("ObserveClusterMutexLockedTime", "mutex_call", mock.AnythingOfType("float64")) + + mockAPI.On("KVDelete", "mutex_call_"+channelID).Return(nil) + + mockAPI.On("LogDebug", "RTCD host is set in call, checking...", + "origin", mock.AnythingOfType("string"), "callID", callID, "rtcdHost", "127.0.0.1").Once() + + mockAPI.On("LogDebug", "RTCD host found", + "origin", mock.AnythingOfType("string"), "callID", callID, "rtcdHost", "127.0.0.1").Once() + + mockRTCDClient.On("GetVersionInfo").Return(rtcd.VersionInfo{}, nil) + + mockAPI.On("LogDebug", "skipping version compatibility check", + "origin", mock.AnythingOfType("string"), "buildVersion", "", "callID", callID, "rtcdHost", "127.0.0.1").Once() + + mockRTCDClient.On("GetSessions", callID).Return([]rtc.SessionConfig{ + { + SessionID: "connA", + CallID: callID, + }, + }, 200, nil) + + mockAPI.On("LogDebug", "call is still ongoing", + "origin", mock.AnythingOfType("string"), "callID", callID, "rtcdHost", "127.0.0.1").Once() + + err = p.cleanUpState() + require.NoError(t, err) + + // Verify call and sessions are retained. + calls, err := p.store.GetAllActiveCalls(db.GetCallOpts{}) + require.NoError(t, err) + require.NotEmpty(t, calls) + sessions, err := p.store.GetCallSessions(callID, db.GetCallSessionOpts{}) + require.NoError(t, err) + require.NotEmpty(t, sessions) + }) + + t.Run("API request failure", func(t *testing.T) { + defer ResetTestStore(t, p.store) + + channelID := model.NewId() + postID := model.NewId() + userID := model.NewId() + callID := model.NewId() + + call := &public.Call{ + ID: callID, + CreateAt: time.Now().UnixMilli(), + ChannelID: channelID, + StartAt: time.Now().UnixMilli(), + PostID: postID, + ThreadID: model.NewId(), + OwnerID: userID, + Props: public.CallProps{ + RTCDHost: "127.0.0.1", + }, + } + err := p.store.CreateCall(call) + require.NoError(t, err) + + createPost(t, store, postID, userID, channelID) + + err = p.store.CreateCallSession(&public.CallSession{ + ID: "connA", + CallID: callID, + UserID: "userA", + JoinAt: time.Now().UnixMilli(), + }) + require.NoError(t, err) + + mockRTCDClient := &rtcdMocks.MockRTCDClient{} + defer mockRTCDClient.AssertExpectations(t) + + p.rtcdManager = &rtcdClientManager{ + ctx: &Plugin{ + MattermostPlugin: plugin.MattermostPlugin{ + API: mockAPI, + }, + }, + hosts: map[string]*rtcdHost{ + "127.0.0.1": { + client: mockRTCDClient, + }, + }, + } + + mockAPI.On("LogDebug", "cleaning up calls state", + "origin", mock.AnythingOfType("string")).Once() + + mockAPI.On("LogDebug", "creating cluster mutex for call", + "origin", mock.AnythingOfType("string"), "channelID", channelID).Once() + + mockAPI.On("KVSetWithOptions", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) + + mockMetrics.On("ObserveClusterMutexGrabTime", "mutex_call", mock.AnythingOfType("float64")) + mockMetrics.On("ObserveClusterMutexLockedTime", "mutex_call", mock.AnythingOfType("float64")) + + mockAPI.On("KVDelete", "mutex_call_"+channelID).Return(nil) + + mockAPI.On("LogDebug", "RTCD host is set in call, checking...", + "origin", mock.AnythingOfType("string"), "callID", callID, "rtcdHost", "127.0.0.1").Once() + + mockAPI.On("LogDebug", "RTCD host found", + "origin", mock.AnythingOfType("string"), "callID", callID, "rtcdHost", "127.0.0.1").Once() + + mockRTCDClient.On("GetVersionInfo").Return(rtcd.VersionInfo{}, nil) + + mockAPI.On("LogDebug", "skipping version compatibility check", + "origin", mock.AnythingOfType("string"), "buildVersion", "", "callID", callID, "rtcdHost", "127.0.0.1").Once() + + mockRTCDClient.On("GetSessions", callID).Return(nil, 500, fmt.Errorf("internal server error")) + + mockAPI.On("LogDebug", "failed to get sessions for call", + "origin", mock.AnythingOfType("string"), "err", "internal server error", "callID", callID, "rtcdHost", "127.0.0.1").Once() + + mockAPI.On("LogDebug", "unexpected status code from RTCD", + "origin", mock.AnythingOfType("string"), "code", 500, "callID", callID, "rtcdHost", "127.0.0.1").Once() + + err = p.cleanUpState() + require.NoError(t, err) + + // Verify call and sessions are retained. + calls, err := p.store.GetAllActiveCalls(db.GetCallOpts{}) + require.NoError(t, err) + require.NotEmpty(t, calls) + sessions, err := p.store.GetCallSessions(callID, db.GetCallSessionOpts{}) + require.NoError(t, err) + require.NotEmpty(t, sessions) + }) + + t.Run("version compatibility failure", func(t *testing.T) { + defer ResetTestStore(t, p.store) + + channelID := model.NewId() + postID := model.NewId() + userID := model.NewId() + callID := model.NewId() + + call := &public.Call{ + ID: callID, + CreateAt: time.Now().UnixMilli(), + ChannelID: channelID, + StartAt: time.Now().UnixMilli(), + PostID: postID, + ThreadID: model.NewId(), + OwnerID: userID, + Props: public.CallProps{ + RTCDHost: "127.0.0.1", + }, + } + err := p.store.CreateCall(call) + require.NoError(t, err) + + createPost(t, store, postID, userID, channelID) + + err = p.store.CreateCallSession(&public.CallSession{ + ID: "connA", + CallID: callID, + UserID: "userA", + JoinAt: time.Now().UnixMilli(), + }) + require.NoError(t, err) + + mockRTCDClient := &rtcdMocks.MockRTCDClient{} + defer mockRTCDClient.AssertExpectations(t) + + p.rtcdManager = &rtcdClientManager{ + ctx: &Plugin{ + MattermostPlugin: plugin.MattermostPlugin{ + API: mockAPI, + }, + }, + hosts: map[string]*rtcdHost{ + "127.0.0.1": { + client: mockRTCDClient, + }, + }, + } + + mockAPI.On("LogDebug", "cleaning up calls state", + "origin", mock.AnythingOfType("string")).Once() + + mockAPI.On("LogDebug", "creating cluster mutex for call", + "origin", mock.AnythingOfType("string"), "channelID", channelID).Once() + + mockAPI.On("KVSetWithOptions", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) + + mockMetrics.On("ObserveClusterMutexGrabTime", "mutex_call", mock.AnythingOfType("float64")) + mockMetrics.On("ObserveClusterMutexLockedTime", "mutex_call", mock.AnythingOfType("float64")) + + mockAPI.On("KVDelete", "mutex_call_"+channelID).Return(nil).Once() + + mockAPI.On("LogDebug", "RTCD host is set in call, checking...", + "origin", mock.AnythingOfType("string"), "callID", callID, "rtcdHost", "127.0.0.1").Once() + + mockAPI.On("LogDebug", "RTCD host found", + "origin", mock.AnythingOfType("string"), "callID", callID, "rtcdHost", "127.0.0.1").Once() + + mockRTCDClient.On("GetVersionInfo").Return(rtcd.VersionInfo{BuildVersion: "v0.17.0"}, nil) + + mockAPI.On("LogDebug", "RTCD host version is not compatible", + "origin", mock.AnythingOfType("string"), "err", "current version (v0.17.0) is lower than minimum supported version (v1.0.0)", "callID", callID, "rtcdHost", "127.0.0.1").Once() + + err = p.cleanUpState() + require.NoError(t, err) + + // Verify call and sessions are retained. + calls, err := p.store.GetAllActiveCalls(db.GetCallOpts{}) + require.NoError(t, err) + require.NotEmpty(t, calls) + sessions, err := p.store.GetCallSessions(callID, db.GetCallSessionOpts{}) + require.NoError(t, err) + require.NotEmpty(t, sessions) + }) + }) +}