Skip to content

Commit

Permalink
Implement Chocobozzz suggestions
Browse files Browse the repository at this point in the history
#4290 (comment)

The remarks in question:
    Try to use objectStorage prefix instead of s3 prefix for your function/variables/config names
    Prefer to use a tree for the config: s3.streaming_playlists_bucket -> object_storage.streaming_playlists.bucket
    Use uppercase for config: S3.STREAMING_PLAYLISTS_BUCKETINFO.bucket -> OBJECT_STORAGE.STREAMING_PLAYLISTS.BUCKET (maybe BUCKET_NAME instead of BUCKET)
    I suggest to rename moveJobsRunning to pendingMovingJobs (or better, create a dedicated videoJobInfo table with a pendingMove & videoId columns so we could also use this table to track pending transcoding jobs)
    https://github.com/Chocobozzz/PeerTube/pull/4290/files#diff-3e26d41ca4bda1de8e1747af70ca2af642abcc1e9e0bfb94239ff2165acfbde5R19 uses a string instead of an integer
    I think we should store the origin object storage URL in fileUrl, without base_url injection. Instead, inject the base_url at "runtime" so admins can easily change this configuration without running a script to update DB URLs
  • Loading branch information
pingiun committed Aug 3, 2021
1 parent eb3ccd2 commit 105cc1f
Show file tree
Hide file tree
Showing 15 changed files with 215 additions and 88 deletions.
19 changes: 11 additions & 8 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,21 @@ storage:
# If not, peertube will fallback to the default fil
client_overrides: 'storage/client-overrides/'

s3:
object_storage:
enabled: false
# Will always use https with default URL generation (see below)
endpoint: 's3.amazonaws.com'
streaming_playlists_bucket: ''
# Allows setting all buckets to the same value but with a different prefix
streaming_playlists_prefix: ''
streaming_playlists_base_url: ''
streaming_playlists:
bucket_name: ''
# Allows setting all buckets to the same value but with a different prefix
prefix: ''
# Base url for object URL generation, path in bucket is appended to this url
base_url: ''
# Same settings but for webtorrent videos
videos_bucket: ''
videos_prefix: ''
videos_base_url: ''
videos:
bucket_name: ''
prefix: ''
base_url: ''

log:
level: 'info' # 'debug' | 'info' | 'warn' | 'error'
Expand Down
3 changes: 1 addition & 2 deletions server/controllers/api/videos/upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,10 @@ async function addVideo (options: {
createTorrentFederate(video, videoFile)
.then(() => {
if (video.state !== VideoState.TO_TRANSCODE) {
return
} else {
// Video will be published before move is complete which may cause some video connections to drop
// But it's recommended to enable transcoding anyway, so this is the tradeoff
addMoveToObjectStorageJob(video, videoFile)
return
}

return addOptimizeOrMergeAudioJob(videoCreated, videoFile, user)
Expand Down
12 changes: 6 additions & 6 deletions server/initializers/checker-after-init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,16 @@ function checkConfig () {
}

// Object storage
if (CONFIG.S3.ENABLED === true) {
if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && !CONFIG.S3.VIDEOS_BUCKETINFO.bucket) {
if (CONFIG.OBJECT_STORAGE.ENABLED === true) {
if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && !CONFIG.OBJECT_STORAGE.VIDEOS.BUCKET_NAME) {
return 'videos_bucket should be set when object storage support is enabled.'
}
if (CONFIG.TRANSCODING.HLS.ENABLED && !CONFIG.S3.STREAMING_PLAYLISTS_BUCKETINFO.bucket) {
if (CONFIG.TRANSCODING.HLS.ENABLED && !CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS.BUCKET_NAME) {
return 'streaming_playlists_bucket should be set when object storage support is enabled.'
}
if (CONFIG.S3.VIDEOS_BUCKETINFO.bucket === CONFIG.S3.STREAMING_PLAYLISTS_BUCKETINFO.bucket &&
CONFIG.S3.VIDEOS_BUCKETINFO.prefix === CONFIG.S3.STREAMING_PLAYLISTS_BUCKETINFO.prefix) {
if (CONFIG.S3.VIDEOS_BUCKETINFO.prefix === '') {
if (CONFIG.OBJECT_STORAGE.VIDEOS.BUCKET_NAME === CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS.BUCKET_NAME &&
CONFIG.OBJECT_STORAGE.VIDEOS.PREFIX === CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS.PREFIX) {
if (CONFIG.OBJECT_STORAGE.VIDEOS.PREFIX === '') {
return 'Object storage bucket prefixes should be set when the same bucket is used for both types of video.'
} else {
return 'Object storage bucket prefixes should be set to different values when the same bucket is used for both types of video.'
Expand Down
22 changes: 11 additions & 11 deletions server/initializers/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,18 @@ const CONFIG = {
PLUGINS_DIR: buildPath(config.get<string>('storage.plugins')),
CLIENT_OVERRIDES_DIR: buildPath(config.get<string>('storage.client_overrides'))
},
S3: {
ENABLED: config.get<boolean>('s3.enabled'),
ENDPOINT: config.get<string>('s3.endpoint'),
VIDEOS_BUCKETINFO: {
bucket: config.get<string>('s3.videos_bucket'),
prefix: config.get<string>('s3.videos_prefix'),
base_url: config.get<string>('s3.videos_base_url')
OBJECT_STORAGE: {
ENABLED: config.get<boolean>('object_storage.enabled'),
ENDPOINT: config.get<string>('object_storage.endpoint'),
VIDEOS: {
BUCKET_NAME: config.get<string>('object_storage.videos.bucket_name'),
PREFIX: config.get<string>('object_storage.videos.prefix'),
BASE_URL: config.get<string>('object_storage.videos.base_url')
},
STREAMING_PLAYLISTS_BUCKETINFO: {
bucket: config.get<string>('s3.streaming_playlists_bucket'),
prefix: config.get<string>('s3.streaming_playlists_prefix'),
base_url: config.get<string>('s3.streaming_playlists_base_url')
STREAMING_PLAYLISTS: {
BUCKET_NAME: config.get<string>('object_storage.streaming_playlists.bucket_name'),
PREFIX: config.get<string>('object_storage.streaming_playlists.prefix'),
BASE_URL: config.get<string>('object_storage.streaming_playlists.base_url')
}
},
WEBSERVER: {
Expand Down
4 changes: 3 additions & 1 deletion server/initializers/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import { VideoTagModel } from '../models/video/video-tag'
import { VideoViewModel } from '../models/video/video-view'
import { CONFIG } from './config'
import { ActorCustomPageModel } from '@server/models/account/actor-custom-page'
import { VideoJobInfoModel } from '@server/models/video/video-job-info'

require('pg').defaults.parseInt8 = true // Avoid BIGINT to be converted to string

Expand Down Expand Up @@ -143,7 +144,8 @@ async function initDatabaseModels (silent: boolean) {
TrackerModel,
VideoTrackerModel,
PluginModel,
ActorCustomPageModel
ActorCustomPageModel,
VideoJobInfoModel
])

// Check extensions exist in the database
Expand Down
18 changes: 15 additions & 3 deletions server/initializers/migrations/0660-object-storage.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { VideoStorageType } from '@server/types/models'
import * as Sequelize from 'sequelize'

async function up (utils: {
Expand All @@ -7,7 +8,18 @@ async function up (utils: {
db: any
}): Promise<void> {
{
await utils.queryInterface.addColumn('video', 'moveJobsRunning', { type: Sequelize.INTEGER, allowNull: false, defaultValue: 0 })
const query = `
CREATE TABLE IF NOT EXISTS "videoJobInfo" (
"id" serial,
"pendingMove" INTEGER NOT NULL,
"videoUUID" uuid UNIQUE NOT NULL REFERENCES "video" ("uuid") ON DELETE CASCADE ON UPDATE CASCADE,
"createdAt" timestamp WITH time zone NOT NULL,
"updatedAt" timestamp WITH time zone NOT NULL,
PRIMARY KEY ("id")
);
`

await utils.sequelize.query(query)
}

{
Expand All @@ -16,7 +28,7 @@ async function up (utils: {

{
await utils.sequelize.query(
`UPDATE "videoFile" SET "storage" = 'local'`
`UPDATE "videoFile" SET "storage" = ${VideoStorageType.LOCAL}`
)
}

Expand All @@ -26,7 +38,7 @@ async function up (utils: {

{
await utils.sequelize.query(
`UPDATE "videoStreamingPlaylist" SET "storage" = 'local'`
`UPDATE "videoStreamingPlaylist" SET "storage" = ${VideoStorageType.LOCAL}`
)
}
}
Expand Down
26 changes: 14 additions & 12 deletions server/lib/job-queue/handlers/move-to-object-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ import * as Bull from 'bull'
import { logger } from '@server/helpers/logger'
import { MoveObjectStoragePayload } from '../../../../shared'
import { VideoModel } from '@server/models/video/video'
import { generateUrl, storeObject } from '@server/lib/object-storage'
import { generateObjectStoreUrl, storeObject } from '@server/lib/object-storage'
import { CONFIG } from '@server/initializers/config'
import { join } from 'path'
import { HLS_STREAMING_PLAYLIST_DIRECTORY } from '@server/initializers/constants'
import { getHlsResolutionPlaylistFilename } from '@server/lib/video-paths'
import { MVideoWithAllFiles, VideoStorageType } from '@server/types/models'
import { remove } from 'fs-extra'
import { publishAndFederateIfNeeded } from '@server/lib/video'
import { VideoJobInfoModel } from '@server/models/video/video-job-info'

export async function processMoveToObjectStorage (job: Bull.Job) {
const payload = job.data as MoveObjectStoragePayload
Expand All @@ -30,8 +31,9 @@ export async function processMoveToObjectStorage (job: Bull.Job) {
await moveHLSFiles(video, payload.videoFileId)
}

await video.decrement('moveJobsRunning')
if (video.moveJobsRunning === 0) {
const pendingMove = await VideoJobInfoModel.decreasePendingMove(video.uuid)
if (pendingMove === 0) {
logger.info("Running cleanup after moving files to object storage (video %s in job %d)", video.uuid, job.id)
await doAfterLastJob(video)
}

Expand All @@ -46,11 +48,11 @@ async function moveWebTorrentFiles (video: MVideoWithAllFiles, videoFileId?: num
const filename = file.filename
await storeObject(
{ filename, path: join(CONFIG.STORAGE.VIDEOS_DIR, file.filename) },
CONFIG.S3.VIDEOS_BUCKETINFO
CONFIG.OBJECT_STORAGE.VIDEOS
)

file.storage = VideoStorageType.OBJECT_STORAGE
file.fileUrl = generateUrl(filename, CONFIG.S3.VIDEOS_BUCKETINFO)
file.fileUrl = generateObjectStoreUrl(filename, CONFIG.OBJECT_STORAGE.VIDEOS)
await file.save()
}
}
Expand All @@ -70,7 +72,7 @@ async function moveHLSFiles (video: MVideoWithAllFiles, videoFileId: number) {
filename: join(playlist.getStringType(), video.uuid, playlistFileName),
path: join(baseHlsDirectory, playlistFileName)
},
CONFIG.S3.STREAMING_PLAYLISTS_BUCKETINFO
CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS
)

// Resolution fragmented file
Expand All @@ -80,12 +82,12 @@ async function moveHLSFiles (video: MVideoWithAllFiles, videoFileId: number) {
filename,
path: join(baseHlsDirectory, file.filename)
},
CONFIG.S3.STREAMING_PLAYLISTS_BUCKETINFO
CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS
)

// Signals that the video file + playlist file were uploaded
file.storage = VideoStorageType.OBJECT_STORAGE
file.fileUrl = generateUrl(filename, CONFIG.S3.STREAMING_PLAYLISTS_BUCKETINFO)
file.fileUrl = generateObjectStoreUrl(filename, CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
await file.save()
}
}
Expand All @@ -101,7 +103,7 @@ async function doAfterLastJob (video: MVideoWithAllFiles) {
filename: masterPlaylistFilename,
path: join(baseHlsDirectory, playlist.playlistFilename)
},
CONFIG.S3.STREAMING_PLAYLISTS_BUCKETINFO
CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS
)

// Sha256 segments file
Expand All @@ -111,11 +113,11 @@ async function doAfterLastJob (video: MVideoWithAllFiles) {
filename: segmentsFileName,
path: join(baseHlsDirectory, playlist.segmentsSha256Filename)
},
CONFIG.S3.STREAMING_PLAYLISTS_BUCKETINFO
CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS
)

playlist.playlistUrl = generateUrl(masterPlaylistFilename, CONFIG.S3.STREAMING_PLAYLISTS_BUCKETINFO)
playlist.segmentsSha256Url = generateUrl(segmentsFileName, CONFIG.S3.STREAMING_PLAYLISTS_BUCKETINFO)
playlist.playlistUrl = generateObjectStoreUrl(masterPlaylistFilename, CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
playlist.segmentsSha256Url = generateObjectStoreUrl(segmentsFileName, CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
playlist.storage = VideoStorageType.OBJECT_STORAGE
await playlist.save()
}
Expand Down
21 changes: 12 additions & 9 deletions server/lib/job-queue/handlers/video-transcoding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, pay
}

// Publishing will be done by mvoe-to-object-storage if enabled
if (!CONFIG.S3.ENABLED) {
if (!CONFIG.OBJECT_STORAGE.ENABLED) {
await publishAndFederateIfNeeded(video)
}
}
Expand Down Expand Up @@ -182,15 +182,18 @@ async function onVideoFileOptimizer (

const hasNewResolutions = await createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode, 'webtorrent')

if (!hasHls && !hasNewResolutions) {
// No transcoding to do, it's now published
videoPublished = await videoDatabase.publishIfNeededAndSave(undefined)
}
// Publishing will be done after the move-to-object-storage-job if enabled
if (!CONFIG.OBJECT_STORAGE.ENABLED) {
if (!hasHls && !hasNewResolutions) {
// No transcoding to do, it's now published
videoPublished = await videoDatabase.publishIfNeededAndSave(undefined)
}

await federateVideoIfNeeded(videoDatabase, payload.isNewVideo)
await federateVideoIfNeeded(videoDatabase, payload.isNewVideo)

if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase)
if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase)
if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
}
}

async function onNewWebTorrentFileResolution (
Expand All @@ -199,7 +202,7 @@ async function onNewWebTorrentFileResolution (
payload: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload
) {
// Publishing will be done by mvoe-to-object-storage if enabled
if (!CONFIG.S3.ENABLED) {
if (!CONFIG.OBJECT_STORAGE.ENABLED) {
await publishAndFederateIfNeeded(video)
}

Expand Down
8 changes: 5 additions & 3 deletions server/lib/job-queue/job-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import { processVideoLiveEnding } from './handlers/video-live-ending'
import { processVideoTranscoding } from './handlers/video-transcoding'
import { processVideosViews } from './handlers/video-views'
import { processMoveToObjectStorage } from './handlers/move-to-object-storage'
import { VideoModel } from '@server/models/video/video'
import { VideoJobInfoModel } from '@server/models/video/video-job-info'

type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
Expand Down Expand Up @@ -164,8 +164,10 @@ class JobQueue {
// This value is decreased when the move job is finished in ./handlers/move-to-object-storage.ts
// Because every transcode job starts a move job for the transcoded file, the value will only reach
// 0 again when all transcode jobs are finished and the last move job is running
VideoModel.increment('moveJobsRunning', { where: { uuid: obj.payload.videoUUID } })
.catch(err => logger.error('Cannot increase moveJobsRunning.', { err }))
// If object storage support is not enabled all the pendingMove values stay at the amount of transcode
// jobs that were started for that video.
VideoJobInfoModel.increaseOrCreatePendingMove(obj.payload.videoUUID)
.catch(err => logger.error('Cannot increase pendingMove.', { err }))
}

const jobArgs: Bull.JobOptions = {
Expand Down
Loading

0 comments on commit 105cc1f

Please sign in to comment.