Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MM-59070] Add telemetry events for host controls #831

Merged
merged 10 commits into from
Aug 13, 2024
3 changes: 3 additions & 0 deletions server/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ type clientConfig struct {
HostControlsAllowed bool
// When set to true it enables using the AV1 codec to encode screen sharing tracks.
EnableAV1 *bool
// Let the server determine whether or not group calls are allowed (through license checks or otherwise)
GroupCallsAllowed bool
}

type adminClientConfig struct {
Expand Down Expand Up @@ -514,6 +516,7 @@ func (p *Plugin) getClientConfig(c *configuration) clientConfig {
SkuShortName: skuShortName,
HostControlsAllowed: p.licenseChecker.HostControlsAllowed(),
EnableAV1: c.EnableAV1,
GroupCallsAllowed: p.licenseChecker.GroupCallsAllowed(),
}
}

Expand Down
6 changes: 6 additions & 0 deletions server/enterprise/license.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package enterprise

import (
"os"

"github.com/mattermost/mattermost-plugin-calls/server/license"

"github.com/mattermost/mattermost/server/public/model"
Expand Down Expand Up @@ -54,3 +56,7 @@ func (e *LicenseChecker) TranscriptionsAllowed() bool {
func (e *LicenseChecker) HostControlsAllowed() bool {
return e.isAtLeastE10Licensed()
}

func (e *LicenseChecker) GroupCallsAllowed() bool {
return e.isAtLeastE10Licensed() || os.Getenv("MM_CALLS_GROUP_CALLS_ALLOWED") == "true"
}
14 changes: 14 additions & 0 deletions server/host_controls.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func (p *Plugin) changeHost(requesterID, channelID, newHostID string) error {
UserIDs: getUserIDsFromSessions(state.sessions),
})

p.track(evHostChangeHost, nil)

return nil
}

Expand Down Expand Up @@ -102,6 +104,8 @@ func (p *Plugin) muteSession(requesterID, channelID, sessionID string) error {
"session_id": sessionID,
}, &WebSocketBroadcast{UserID: ust.UserID, ReliableClusterSend: true})

p.track(evHostMuteParticipant, nil)

return nil
}

Expand Down Expand Up @@ -132,6 +136,8 @@ func (p *Plugin) muteOthers(requesterID, channelID string) error {
}
}

p.track(evHostMuteOthers, nil)

return nil
}

Expand Down Expand Up @@ -165,6 +171,8 @@ func (p *Plugin) screenOff(requesterID, channelID, sessionID string) error {
"session_id": sessionID,
}, &WebSocketBroadcast{UserID: ust.UserID, ReliableClusterSend: true})

p.track(evHostStopScreenshare, nil)

return nil
}

Expand Down Expand Up @@ -200,6 +208,8 @@ func (p *Plugin) lowerHand(requesterID, channelID, sessionID string) error {
"host_id": requesterID,
}, &WebSocketBroadcast{UserID: ust.UserID, ReliableClusterSend: true})

p.track(evHostLowerHand, nil)

return nil
}

Expand Down Expand Up @@ -237,6 +247,8 @@ func (p *Plugin) hostRemoveSession(requesterID, channelID, sessionID string) err
UserIDs: getUserIDsFromSessions(state.sessions),
})

p.track(evHostRemoveParticipant, nil)

go func() {
// Wait a few seconds for the client to end their session cleanly. If they don't (like for an
// older mobile client) then forcibly end it.
Expand Down Expand Up @@ -313,5 +325,7 @@ func (p *Plugin) hostEnd(requesterID, channelID string) error {
}
}()

p.track(evHostEndCall, nil)

return nil
}
4 changes: 4 additions & 0 deletions server/i18n/en.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
[
{
"id": "app.add_user_session.group_calls_not_allowed_error",
"translation": "Calls on unlicensed servers are only available in DMs."
},
{
"id": "app.admin.concurrent_sessions_warning.enterprise",
"translation": "We highly recommend [deploying the RTCD service](https://docs.mattermost.com/configure/calls-deployment.html#the-rtcd-service) to offload calls processing to a separate instance in order to maintain the performance, scalability, and reliability of your main Mattermost server."
Expand Down
49 changes: 39 additions & 10 deletions server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package main

import (
"errors"
"fmt"
"sync/atomic"
"time"
Expand All @@ -21,6 +22,10 @@ const (
msgChSize = 50
)

var (
errGroupCallsNotAllowed = fmt.Errorf("unlicensed servers only allow calls in DMs")
)

type session struct {
userID string
channelID string
Expand Down Expand Up @@ -77,7 +82,7 @@ func newUserSession(userID, channelID, connID, callID string, rtc bool) *session
}
}

func (p *Plugin) addUserSession(state *callState, callsEnabled *bool, userID, connID, channelID, jobID string) (retState *callState, retErr error) {
func (p *Plugin) addUserSession(state *callState, callsEnabled *bool, userID, connID, channelID, jobID string, ct model.ChannelType) (retState *callState, retErr error) {
defer func(start time.Time) {
p.metrics.ObserveAppHandlersTime("addUserSession", time.Since(start).Seconds())
}(time.Now())
Expand All @@ -95,8 +100,22 @@ func (p *Plugin) addUserSession(state *callState, callsEnabled *bool, userID, co
}()

// If there is an ongoing call, we can let anyone join.
if state == nil && !p.userCanStartOrJoin(userID, callsEnabled) {
return nil, fmt.Errorf("calls are not enabled")
if state == nil {
if err := p.userCanStartOrJoin(userID, callsEnabled, ct); err != nil {
if errors.Is(err, errGroupCallsNotAllowed) {
T := p.getTranslationFunc("")
// Sending a message for unsupported clients (e.g. older mobile apps).
p.API.SendEphemeralPost(
userID,
&model.Post{
UserId: p.getBotID(),
ChannelId: channelID,
Message: T("app.add_user_session.group_calls_not_allowed_error"),
},
)
}
return nil, err
}
}

if state == nil {
Expand Down Expand Up @@ -132,7 +151,7 @@ func (p *Plugin) addUserSession(state *callState, callsEnabled *bool, userID, co
return nil, fmt.Errorf("session is already connected")
}

// Check for cloud limits -- needs to be done here to prevent a race condition
// Check for license limits -- needs to be done here to prevent a race condition
if allowed, err := p.joinAllowed(state); !allowed {
if err != nil {
p.LogError("joinAllowed failed", "error", err.Error())
Expand Down Expand Up @@ -226,30 +245,40 @@ func (p *Plugin) addUserSession(state *callState, callsEnabled *bool, userID, co
return state, nil
}

func (p *Plugin) userCanStartOrJoin(userID string, enabled *bool) bool {
func (p *Plugin) userCanStartOrJoin(userID string, enabled *bool, channelType model.ChannelType) error {
// (since v1) Calls can only be started/joined in DMs in unlicensed servers.
// If calls are disabled, no-one can start or join.
// If explicitly enabled, everyone can start or join.
// If not explicitly enabled and default enabled, everyone can join or start
// otherwise (not explicitly enabled and not default enabled), only sysadmins can start
// TODO: look to see what logic we should lift to the joinCall fn

if channelType != model.ChannelTypeDirect && !p.licenseChecker.GroupCallsAllowed() {
return errGroupCallsNotAllowed
}

cfg := p.getConfiguration()

explicitlyEnabled := enabled != nil && *enabled
explicitlyDisabled := enabled != nil && !*enabled
defaultEnabled := cfg.DefaultEnabled != nil && *cfg.DefaultEnabled

if explicitlyDisabled {
return false
return fmt.Errorf("calls are disabled in the channel")
}
if explicitlyEnabled {
return true
return nil
}
if defaultEnabled {
return true
return nil
}

// must be !explicitlyEnabled and !defaultEnabled
return p.API.HasPermissionTo(userID, model.PermissionManageSystem)
if p.API.HasPermissionTo(userID, model.PermissionManageSystem) {
return nil
}

return fmt.Errorf("insufficient permissions")
}

func (p *Plugin) removeUserSession(state *callState, userID, originalConnID, connID, channelID string) error {
Expand Down Expand Up @@ -456,7 +485,7 @@ func (p *Plugin) removeUserSession(state *callState, userID, originalConnID, con
}

// JoinAllowed returns true if the user is allowed to join the call, taking into
// account cloud and configuration limits
// account license and configuration limits
func (p *Plugin) joinAllowed(state *callState) (bool, error) {
// Rules are:
// Cloud Starter: channels, dm/gm: limited to cfg.cloudStarterMaxParticipantsDefault
Expand Down
91 changes: 79 additions & 12 deletions server/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/mattermost/mattermost-plugin-calls/server/cluster"
"github.com/mattermost/mattermost-plugin-calls/server/enterprise"
"github.com/mattermost/mattermost-plugin-calls/server/public"

serverMocks "github.com/mattermost/mattermost-plugin-calls/server/mocks/github.com/mattermost/mattermost-plugin-calls/server/interfaces"
Expand Down Expand Up @@ -38,35 +39,46 @@ func TestAddUserSession(t *testing.T) {
sessions: map[string]*session{},
}

p.licenseChecker = enterprise.NewLicenseChecker(p.API)

store, tearDown := NewTestStore(t)
t.Cleanup(tearDown)
p.store = store

mockAPI.On("LogDebug", mock.Anything, mock.Anything, mock.Anything,
mock.Anything, mock.Anything, mock.Anything, mock.Anything,
mock.Anything, mock.Anything, mock.Anything, mock.Anything)
mockAPI.On("KVSetWithOptions", mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
mockMetrics.On("ObserveClusterMutexGrabTime", "mutex_call", mock.AnythingOfType("float64"))
mockMetrics.On("ObserveClusterMutexLockedTime", "mutex_call", mock.AnythingOfType("float64"))
mockMetrics.On("ObserveAppHandlersTime", mock.AnythingOfType("string"), mock.AnythingOfType("float64"))
mockMetrics.On("IncStoreOp", "KVGet")
mockMetrics.On("IncStoreOp", "KVSet")

t.Run("not enabled", func(t *testing.T) {
defer mockAPI.AssertExpectations(t)
defer mockMetrics.AssertExpectations(t)

mockAPI.On("GetConfig").Return(&model.Config{}, nil).Once()
mockAPI.On("GetLicense").Return(&model.License{
SkuShortName: "professional",
}, nil).Once()

var cs *callState
state, err := p.addUserSession(cs, model.NewBool(false), "userID", "connID", "channelID", "")
state, err := p.addUserSession(cs, model.NewBool(false), "userID", "connID", "channelID", "", model.ChannelTypeOpen)
require.Nil(t, state)
require.EqualError(t, err, "calls are not enabled")
require.EqualError(t, err, "calls are disabled in the channel")
})

t.Run("consistent state after error", func(t *testing.T) {
defer mockAPI.AssertExpectations(t)
defer mockMetrics.AssertExpectations(t)
defer ResetTestStore(t, p.store)

mockAPI.On("GetConfig").Return(&model.Config{}, nil).Once()
mockAPI.On("GetLicense").Return(&model.License{
SkuShortName: "professional",
}, nil).Once()

// We'd be starting a new call
mockMetrics.On("IncWebSocketEvent", "out", wsEventCallHostChanged).Once()
mockAPI.On("PublishWebSocketEvent", wsEventCallHostChanged, mock.Anything,
&model.WebsocketBroadcast{UserId: "userA", ChannelId: "channelID", ReliableClusterSend: true}).Once()

// Start call
retState, err := p.addUserSession(nil, model.NewBool(true), "userA", "connA", "channelID", "")
retState, err := p.addUserSession(nil, model.NewBool(true), "userA", "connA", "channelID", "", model.ChannelTypeOpen)
require.NoError(t, err)
require.NotNil(t, retState)
require.Equal(t, map[string]struct{}{"userA": {}}, retState.Props.Participants)
Expand All @@ -82,7 +94,7 @@ func TestAddUserSession(t *testing.T) {
})
require.NoError(t, err)

retState2, err := p.addUserSession(retState, model.NewBool(true), "userB", "connB", "channelID", "")
retState2, err := p.addUserSession(retState, model.NewBool(true), "userB", "connB", "channelID", "", model.ChannelTypeOpen)
require.NotNil(t, retState2)
require.EqualError(t, err, "failed to create call session: failed to run query: pq: duplicate key value violates unique constraint \"calls_sessions_pkey\"")

Expand All @@ -93,4 +105,59 @@ func TestAddUserSession(t *testing.T) {

require.Equal(t, retState, retState2)
})

t.Run("allow calls in DMs only when unlicensed", func(t *testing.T) {
defer mockAPI.AssertExpectations(t)
defer mockMetrics.AssertExpectations(t)
defer ResetTestStore(t, p.store)

mockAPI.On("GetConfig").Return(&model.Config{}, nil).Times(6)
mockAPI.On("GetLicense").Return(&model.License{}, nil).Times(3)

t.Run("public channel", func(t *testing.T) {
mockAPI.On("SendEphemeralPost", "userA", &model.Post{
ChannelId: "channelID",
Message: "app.add_user_session.group_calls_not_allowed_error",
}).Return(nil).Once()

retState, err := p.addUserSession(nil, model.NewBool(true), "userA", "connA", "channelID", "", model.ChannelTypeOpen)
require.Equal(t, errGroupCallsNotAllowed, err)
require.Nil(t, retState)
})

t.Run("private channel", func(t *testing.T) {
mockAPI.On("SendEphemeralPost", "userA", &model.Post{
ChannelId: "channelID",
Message: "app.add_user_session.group_calls_not_allowed_error",
}).Return(nil).Once()

retState, err := p.addUserSession(nil, model.NewBool(true), "userA", "connA", "channelID", "", model.ChannelTypePrivate)
require.Equal(t, errGroupCallsNotAllowed, err)
require.Nil(t, retState)
})

t.Run("group channel", func(t *testing.T) {
mockAPI.On("SendEphemeralPost", "userA", &model.Post{
ChannelId: "channelID",
Message: "app.add_user_session.group_calls_not_allowed_error",
}).Return(nil).Once()

retState, err := p.addUserSession(nil, model.NewBool(true), "userA", "connA", "channelID", "", model.ChannelTypeGroup)
require.Equal(t, errGroupCallsNotAllowed, err)
require.Nil(t, retState)
})

t.Run("direct channel", func(t *testing.T) {
mockMetrics.On("IncWebSocketEvent", "out", wsEventCallHostChanged).Once()
mockAPI.On("PublishWebSocketEvent", wsEventCallHostChanged, mock.Anything,
&model.WebsocketBroadcast{UserId: "userA", ChannelId: "channelID", ReliableClusterSend: true}).Once()

retState, err := p.addUserSession(nil, model.NewBool(true), "userA", "connA", "channelID", "", model.ChannelTypeDirect)
require.NoError(t, err)
require.NotNil(t, retState)
require.Equal(t, map[string]struct{}{"userA": {}}, retState.Props.Participants)
require.Len(t, retState.sessions, 1)
require.NotNil(t, retState.sessions["connA"])
})
})
}
Loading
Loading