Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: mos status flow rework #1356

Draft
wants to merge 3 commits into
base: release52
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions .github/workflows/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -490,16 +490,19 @@ jobs:
- node-version: 22.x
package-name: job-worker
send-coverage: true
# No tests for the gateways yet
# No tests for some gateways yet
# - node-version: 22.x
# package-name: playout-gateway
# - node-version: 22.x
# package-name: mos-gateway
# send-coverage: true
- node-version: 22.x
package-name: mos-gateway
send-coverage: true
- node-version: 22.x
package-name: live-status-gateway
send-coverage: true
- node-version: 22.x
package-name: webui
send-coverage: true
# manual meteor-lib as it only needs a couple of versions
- node-version: 22.x
package-name: meteor-lib
Expand Down
3 changes: 2 additions & 1 deletion meteor/server/collections/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ export interface AsyncOnlyReadOnlyMongoCollection<DBInterface extends { _id: Pro
observeChanges(
selector: MongoQuery<DBInterface> | DBInterface['_id'],
callbacks: PromisifyCallbacks<ObserveChangesCallbacks<DBInterface>>,
options?: FindOptions<DBInterface>
findOptions?: FindOptions<DBInterface>,
callbackOptions?: { nonMutatingCallbacks?: boolean | undefined }
): Promise<Meteor.LiveQueryHandle>

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr
async observeChanges(
selector: MongoQuery<DBInterface> | DBInterface['_id'],
callbacks: PromisifyCallbacks<ObserveChangesCallbacks<DBInterface>>,
options?: FindOptions<DBInterface>
findOptions?: FindOptions<DBInterface>,
callbackOptions?: { nonMutatingCallbacks?: boolean | undefined }
): Promise<Meteor.LiveQueryHandle> {
const span = profiler.startSpan(`MongoCollection.${this.name}.observeChanges`)
if (span) {
Expand All @@ -152,8 +153,8 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr
}
try {
const res = await this._collection
.find((selector ?? {}) as any, options as any)
.observeChangesAsync(callbacks)
.find((selector ?? {}) as any, findOptions as any)
.observeChangesAsync(callbacks, callbackOptions)
if (span) span.end()
return res
} catch (e) {
Expand Down
1 change: 1 addition & 0 deletions meteor/server/publications/_publications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import './buckets'
import './blueprintUpgradeStatus/publication'
import './ingestStatus/publication'

Check warning on line 6 in meteor/server/publications/_publications.ts

View check run for this annotation

Codecov / codecov/patch

meteor/server/publications/_publications.ts#L6

Added line #L6 was not covered by tests
import './packageManager/expectedPackages/publication'
import './packageManager/packageContainers'
import './packageManager/playoutContext'
Expand Down
134 changes: 134 additions & 0 deletions meteor/server/publications/ingestStatus/createIngestRundownStatus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import type { RundownId, PartId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { NrcsIngestCacheType } from '@sofie-automation/corelib/dist/dataModel/NrcsIngestDataCache'
import type { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance'
import {
IngestRundownStatus,
IngestPartPlaybackStatus,
IngestRundownActiveStatus,
IngestPartStatus,
} from '@sofie-automation/shared-lib/dist/ingest/rundownStatus'
import type { ReadonlyDeep } from 'type-fest'
import _ from 'underscore'
import type { ContentCache, PartFields, PartInstanceFields, PlaylistFields } from './reactiveContentCache'
import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist'
import { ReactiveCacheCollection } from '../lib/ReactiveCacheCollection'
import { PartInstance } from '@sofie-automation/meteor-lib/dist/collections/PartInstances'
import { IngestPart } from '@sofie-automation/blueprints-integration'
import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part'

export function createIngestRundownStatus(
cache: ReadonlyDeep<ContentCache>,
rundownId: RundownId
): IngestRundownStatus | null {
const rundown = cache.Rundowns.findOne(rundownId)
if (!rundown) return null

const newDoc: IngestRundownStatus = {
_id: rundownId,
externalId: rundown.externalId,

active: IngestRundownActiveStatus.INACTIVE,

segments: [],
}

const playlist = cache.Playlists.findOne({
_id: rundown.playlistId,
activationId: { $exists: true },
})

if (playlist) {
newDoc.active = playlist.rehearsal ? IngestRundownActiveStatus.REHEARSAL : IngestRundownActiveStatus.ACTIVE
}

// Find the most important part instance for each part
const partInstanceMap = findPartInstanceForEachPart(playlist, rundownId, cache.PartInstances)

const nrcsSegments = cache.NrcsIngestData.find({ rundownId, type: NrcsIngestCacheType.SEGMENT }).fetch()
for (const nrcsSegment of nrcsSegments) {
const nrcsParts = cache.NrcsIngestData.find({
rundownId,
segmentId: nrcsSegment.segmentId,
type: NrcsIngestCacheType.PART,
}).fetch()

newDoc.segments.push({
externalId: nrcsSegment.data.externalId,
parts: _.compact(
nrcsParts.map((nrcsPart) => {
if (!nrcsPart.partId || !nrcsPart.segmentId) return null

const part = cache.Parts.findOne({ _id: nrcsPart.partId, rundownId })
const partInstance = partInstanceMap.get(nrcsPart.partId)

return createIngestPartStatus(playlist, partInstance, part, nrcsPart.data as IngestPart)
})
),
})
}

return newDoc
}

function findPartInstanceForEachPart(
playlist: Pick<DBRundownPlaylist, PlaylistFields> | undefined,
rundownId: RundownId,
partInstancesCache: ReadonlyDeep<ReactiveCacheCollection<Pick<PartInstance, PartInstanceFields>>>
) {
const partInstanceMap = new Map<PartId, Pick<DBPartInstance, PartInstanceFields>>()
if (!playlist) return partInstanceMap

for (const partInstance of partInstancesCache.find({}).fetch()) {
if (partInstance.rundownId !== rundownId) continue
// Ignore the next partinstance
if (partInstance._id === playlist.nextPartInfo?.partInstanceId) continue

// The current part instance is the most important
if (partInstance._id === playlist.currentPartInfo?.partInstanceId) {
partInstanceMap.set(partInstance.part._id, partInstance)
continue
}

// Take the part with the highest takeCount
const existingEntry = partInstanceMap.get(partInstance.part._id)
if (!existingEntry || existingEntry.takeCount < partInstance.takeCount) {
partInstanceMap.set(partInstance.part._id, partInstance)
}
}

return partInstanceMap
}

function createIngestPartStatus(
playlist: Pick<DBRundownPlaylist, PlaylistFields> | undefined,
partInstance: Pick<PartInstance, PartInstanceFields> | undefined,
part: Pick<DBPart, PartFields> | undefined,
ingestPart: IngestPart
): IngestPartStatus {
// Determine the playback status from the PartInstance
let playbackStatus = IngestPartPlaybackStatus.UNKNOWN
if (playlist && partInstance && partInstance.part.shouldNotifyCurrentPlayingPart) {
const isCurrentPartInstance = playlist.currentPartInfo?.partInstanceId === partInstance._id

if (isCurrentPartInstance) {
// If the current, it is playing
playbackStatus = IngestPartPlaybackStatus.PLAY
} else {
// If not the current, but has been played, it is stopped
playbackStatus = IngestPartPlaybackStatus.STOP
}
}

// Determine the ready status from the PartInstance or Part
const isReady = partInstance ? partInstance.part.ingestNotifyPartReady : part?.ingestNotifyPartReady
const itemsReady = partInstance ? partInstance.part.ingestNotifyItemsReady : part?.ingestNotifyItemsReady

return {
externalId: ingestPart.externalId,

isReady: isReady ?? null,
itemsReady: itemsReady ?? [],

playbackStatus,
}
}

Check warning on line 134 in meteor/server/publications/ingestStatus/createIngestRundownStatus.ts

View check run for this annotation

Codecov / codecov/patch

meteor/server/publications/ingestStatus/createIngestRundownStatus.ts#L2-L134

Added lines #L2 - L134 were not covered by tests
195 changes: 195 additions & 0 deletions meteor/server/publications/ingestStatus/publication.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
import { PeripheralDeviceId, RundownId, RundownPlaylistId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { ReadonlyDeep } from 'type-fest'
import {
CustomPublishCollection,
meteorCustomPublish,
setUpCollectionOptimizedObserver,
SetupObserversResult,
TriggerUpdate,
} from '../../lib/customPublication'
import { logger } from '../../logging'
import { ContentCache, createReactiveContentCache } from './reactiveContentCache'
import { RundownsObserver } from '../lib/rundownsObserver'
import { RundownContentObserver } from './rundownContentObserver'
import {
PeripheralDevicePubSub,
PeripheralDevicePubSubCollectionsNames,
} from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice'
import { checkAccessAndGetPeripheralDevice } from '../../security/check'
import { check } from '../../lib/check'
import { IngestRundownStatus } from '@sofie-automation/shared-lib/dist/ingest/rundownStatus'
import { protectString } from '@sofie-automation/corelib/dist/protectedString'
import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown'
import { createIngestRundownStatus } from './createIngestRundownStatus'

interface IngestRundownStatusArgs {
readonly deviceId: PeripheralDeviceId
}

export interface IngestRundownStatusState {
contentCache: ReadonlyDeep<ContentCache>
}

interface IngestRundownStatusUpdateProps {
newCache: ContentCache

invalidateRundownIds: RundownId[]
invalidatePlaylistIds: RundownPlaylistId[]
}

async function setupIngestRundownStatusPublicationObservers(
args: ReadonlyDeep<IngestRundownStatusArgs>,
triggerUpdate: TriggerUpdate<IngestRundownStatusUpdateProps>
): Promise<SetupObserversResult> {
const rundownsObserver = await RundownsObserver.createForPeripheralDevice(args.deviceId, async (rundownIds) => {
logger.silly(`Creating new RundownContentObserver`, rundownIds)

// TODO - can this be done cheaper?
const cache = createReactiveContentCache(rundownIds)

// Push update
triggerUpdate({ newCache: cache })

const contentObserver = await RundownContentObserver.create(rundownIds, cache)

const innerQueries = [
cache.Playlists.find({}).observeChanges(
{
added: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
changed: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
removed: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
},
{ nonMutatingCallbacks: true }
),
cache.Rundowns.find({}).observeChanges(
{
added: (docId) => {
triggerUpdate({ invalidateRundownIds: [protectString(docId)] })
contentObserver.checkPlaylistIds()
},
changed: (docId) => {
triggerUpdate({ invalidateRundownIds: [protectString(docId)] })
contentObserver.checkPlaylistIds()
},
removed: (docId) => {
triggerUpdate({ invalidateRundownIds: [protectString(docId)] })
contentObserver.checkPlaylistIds()
},
},
{ nonMutatingCallbacks: true }
),
cache.Parts.find({}).observe({
added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }),
removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
}),
cache.PartInstances.find({}).observe({
added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }),
removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
}),
cache.NrcsIngestData.find({}).observe({
added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }),
removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
}),
]

return () => {
contentObserver.dispose()

for (const query of innerQueries) {
query.stop()
}
}
})

// Set up observers:
return [rundownsObserver]
}

async function manipulateIngestRundownStatusPublicationData(
_args: IngestRundownStatusArgs,
state: Partial<IngestRundownStatusState>,
collection: CustomPublishCollection<IngestRundownStatus>,
updateProps: Partial<ReadonlyDeep<IngestRundownStatusUpdateProps>> | undefined
): Promise<void> {
// Prepare data for publication:

if (updateProps?.newCache !== undefined) {
state.contentCache = updateProps.newCache ?? undefined
}

if (!state.contentCache) {
// Remove all the notes
collection.remove(null)

return
}

const updateAll = !updateProps || !!updateProps?.newCache
if (updateAll) {
// Remove all the notes
collection.remove(null)

const knownRundownIds = new Set(state.contentCache.RundownIds)

for (const rundownId of knownRundownIds) {
const newDoc = createIngestRundownStatus(state.contentCache, rundownId)
if (newDoc) collection.replace(newDoc)
}
} else {
const regenerateForRundownIds = new Set(updateProps.invalidateRundownIds)

// Include anything where the playlist has changed
if (updateProps.invalidatePlaylistIds && updateProps.invalidatePlaylistIds.length > 0) {
const rundownsToUpdate = state.contentCache.Rundowns.find(
{
playlistId: { $in: updateProps.invalidatePlaylistIds },
},
{
projection: {
_id: 1,
},
}
).fetch() as Pick<DBRundown, '_id'>[]

for (const rundown of rundownsToUpdate) {
regenerateForRundownIds.add(rundown._id)
}
}

for (const rundownId of regenerateForRundownIds) {
const newDoc = createIngestRundownStatus(state.contentCache, rundownId)
if (newDoc) {
collection.replace(newDoc)
} else {
collection.remove(rundownId)
}
}
}
}

meteorCustomPublish(
PeripheralDevicePubSub.ingestDeviceRundownStatus,
PeripheralDevicePubSubCollectionsNames.ingestRundownStatus,
async function (pub, deviceId: PeripheralDeviceId, token: string | undefined) {
check(deviceId, String)

await checkAccessAndGetPeripheralDevice(deviceId, token, this)

await setUpCollectionOptimizedObserver<
IngestRundownStatus,
IngestRundownStatusArgs,
IngestRundownStatusState,
IngestRundownStatusUpdateProps
>(
`pub_${PeripheralDevicePubSub.ingestDeviceRundownStatus}_${deviceId}`,
{ deviceId },
setupIngestRundownStatusPublicationObservers,
manipulateIngestRundownStatusPublicationData,
pub,
100
)
}
)

Check warning on line 195 in meteor/server/publications/ingestStatus/publication.ts

View check run for this annotation

Codecov / codecov/patch

meteor/server/publications/ingestStatus/publication.ts#L2-L195

Added lines #L2 - L195 were not covered by tests
Loading
Loading