diff --git a/samples/Foundatio.HostingSample/Jobs/EvenMinutesJob.cs b/samples/Foundatio.HostingSample/Jobs/EvenMinutesJob.cs deleted file mode 100644 index 3f3958e6..00000000 --- a/samples/Foundatio.HostingSample/Jobs/EvenMinutesJob.cs +++ /dev/null @@ -1,27 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Foundatio.Jobs; -using Microsoft.Extensions.Logging; - -namespace Foundatio.HostingSample; - -public class EvenMinutesJob : IJob -{ - private readonly ILogger _logger; - - public EvenMinutesJob(ILoggerFactory loggerFactory) - { - _logger = loggerFactory.CreateLogger(); - } - - public async Task RunAsync(CancellationToken cancellationToken = default) - { - if (_logger.IsEnabled(LogLevel.Information)) - _logger.LogInformation("EvenMinuteJob Run Thread={ManagedThreadId}", Thread.CurrentThread.ManagedThreadId); - - await Task.Delay(TimeSpan.FromSeconds(5)); - - return JobResult.Success; - } -} diff --git a/samples/Foundatio.HostingSample/Program.cs b/samples/Foundatio.HostingSample/Program.cs index a1e67598..97fb2da6 100644 --- a/samples/Foundatio.HostingSample/Program.cs +++ b/samples/Foundatio.HostingSample/Program.cs @@ -1,9 +1,11 @@ using System; using System.Diagnostics; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Foundatio.Extensions.Hosting.Jobs; using Foundatio.Extensions.Hosting.Startup; +using Foundatio.Jobs; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; @@ -90,7 +92,14 @@ public static IHostBuilder CreateHostBuilder(string[] args) s.AddCronJob("* * * * *"); if (evenMinutes) - s.AddCronJob("*/2 * * * *"); + s.AddCronJob("*/2 * * * *", async sp => + { + var logger = sp.GetRequiredService>(); + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation("EvenMinuteJob Run Thread={ManagedThreadId}", Thread.CurrentThread.ManagedThreadId); + + await Task.Delay(TimeSpan.FromSeconds(5)); + }); if (sample1) s.AddJob(sp => new Sample1Job(sp.GetRequiredService()), o => o.ApplyDefaults().WaitForStartupActions(true).InitialDelay(TimeSpan.FromSeconds(4))); diff --git a/samples/Foundatio.HostingSample/Properties/launchSettings.json b/samples/Foundatio.HostingSample/Properties/launchSettings.json index 8b955c7e..ae82a820 100644 --- a/samples/Foundatio.HostingSample/Properties/launchSettings.json +++ b/samples/Foundatio.HostingSample/Properties/launchSettings.json @@ -2,7 +2,7 @@ "profiles": { "Foundatio.HostingSample": { "commandName": "Project", - "commandLineArgs": "sample1 sample2" + "commandLineArgs": "all" } } } \ No newline at end of file diff --git a/samples/Foundatio.HostingSample/Startup/MyStartupAction.cs b/samples/Foundatio.HostingSample/Startup/MyStartupAction.cs index 12c89137..88d39589 100644 --- a/samples/Foundatio.HostingSample/Startup/MyStartupAction.cs +++ b/samples/Foundatio.HostingSample/Startup/MyStartupAction.cs @@ -1,5 +1,6 @@ using System.Threading; using System.Threading.Tasks; +using Foundatio.Extensions.Hosting.Jobs; using Foundatio.Extensions.Hosting.Startup; using Microsoft.Extensions.Logging; @@ -7,10 +8,12 @@ namespace Foundatio.HostingSample; public class MyStartupAction : IStartupAction { + private readonly ScheduledJobManager _scheduledJobManager; private readonly ILogger _logger; - public MyStartupAction(ILogger logger) + public MyStartupAction(ScheduledJobManager scheduledJobManager, ILogger logger) { + _scheduledJobManager = scheduledJobManager; _logger = logger; } @@ -21,5 +24,12 @@ public async Task RunAsync(CancellationToken cancellationToken = default) _logger.LogTrace("MyStartupAction Run Thread={ManagedThreadId}", Thread.CurrentThread.ManagedThreadId); await Task.Delay(500); } + + _scheduledJobManager.AddOrUpdate("MyJob", "* * * * *", async () => + { + _logger.LogInformation("Running MyJob"); + await Task.Delay(1000); + _logger.LogInformation("MyJob Complete"); + }); } } diff --git a/src/Foundatio.Extensions.Hosting/Jobs/DynamicJob.cs b/src/Foundatio.Extensions.Hosting/Jobs/DynamicJob.cs new file mode 100644 index 00000000..1eda9192 --- /dev/null +++ b/src/Foundatio.Extensions.Hosting/Jobs/DynamicJob.cs @@ -0,0 +1,26 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Foundatio.Jobs; +using Foundatio.Utility; + +namespace Foundatio.Extensions.Hosting.Jobs; + +internal class DynamicJob : IJob +{ + private readonly IServiceProvider _serviceProvider; + private readonly Func _action; + + public DynamicJob(IServiceProvider serviceProvider, Func action) + { + _serviceProvider = serviceProvider; + _action = action; + } + + public async Task RunAsync(CancellationToken cancellationToken = default) + { + await _action(_serviceProvider, cancellationToken).AnyContext(); + + return JobResult.Success; + } +} diff --git a/src/Foundatio.Extensions.Hosting/Jobs/HostedJobService.cs b/src/Foundatio.Extensions.Hosting/Jobs/HostedJobService.cs index e838e7e5..ffc54dbf 100644 --- a/src/Foundatio.Extensions.Hosting/Jobs/HostedJobService.cs +++ b/src/Foundatio.Extensions.Hosting/Jobs/HostedJobService.cs @@ -47,7 +47,7 @@ private async Task ExecuteAsync(CancellationToken stoppingToken) } } - var runner = new JobRunner(_jobOptions, _loggerFactory); + var runner = new JobRunner(_jobOptions, _serviceProvider, _loggerFactory); try { diff --git a/src/Foundatio.Extensions.Hosting/Jobs/JobHostExtensions.cs b/src/Foundatio.Extensions.Hosting/Jobs/JobHostExtensions.cs index 43cea720..a3c5a215 100644 --- a/src/Foundatio.Extensions.Hosting/Jobs/JobHostExtensions.cs +++ b/src/Foundatio.Extensions.Hosting/Jobs/JobHostExtensions.cs @@ -1,5 +1,7 @@ using System; using System.Linq; +using System.Threading; +using System.Threading.Tasks; using Foundatio.Jobs; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -18,13 +20,14 @@ public static IServiceCollection AddJob(this IServiceCollection services, Hosted { return services.AddTransient(s => new HostedJobService(s, jobOptions, s.GetService())); } - else - { - if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService))) - services.AddTransient(); - return services.AddTransient(s => new ScheduledJobRegistration(jobOptions.JobFactory, jobOptions.CronSchedule)); - } + if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService))) + services.AddTransient(); + + if (!services.Any(s => s.ServiceType == typeof(ScheduledJobManager))) + services.AddSingleton(); + + return services.AddTransient(s => new ScheduledJobRegistration(jobOptions.CronSchedule, jobOptions.Name ?? Guid.NewGuid().ToString(), jobOptions.JobFactory)); } public static IServiceCollection AddJob(this IServiceCollection services, Func jobFactory, HostedJobOptions jobOptions) @@ -33,18 +36,19 @@ public static IServiceCollection AddJob(this IServiceCollection services, Func(s => { - jobOptions.JobFactory = () => jobFactory(s); + jobOptions.JobFactory = jobFactory; return new HostedJobService(s, jobOptions, s.GetService()); }); } - else - { - if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService))) - services.AddTransient(); - return services.AddTransient(s => new ScheduledJobRegistration(() => jobFactory(s), jobOptions.CronSchedule)); - } + if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService))) + services.AddTransient(); + + if (!services.Any(s => s.ServiceType == typeof(ScheduledJobManager))) + services.AddSingleton(); + + return services.AddTransient(s => new ScheduledJobRegistration(jobOptions.CronSchedule, jobOptions.Name ?? Guid.NewGuid().ToString(), _ => jobFactory(s))); } public static IServiceCollection AddJob(this IServiceCollection services, HostedJobOptions jobOptions) where T : class, IJob @@ -55,18 +59,19 @@ public static IServiceCollection AddJob(this IServiceCollection services, Hos return services.AddTransient(s => { if (jobOptions.JobFactory == null) - jobOptions.JobFactory = s.GetRequiredService; + jobOptions.JobFactory = _ => s.GetRequiredService(); return new HostedJobService(s, jobOptions, s.GetService()); }); } - else - { - if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService))) - services.AddTransient(); - return services.AddTransient(s => new ScheduledJobRegistration(jobOptions.JobFactory ?? (s.GetRequiredService), jobOptions.CronSchedule)); - } + if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService))) + services.AddTransient(); + + if (!services.Any(s => s.ServiceType == typeof(ScheduledJobManager))) + services.AddSingleton(); + + return services.AddTransient(s => new ScheduledJobRegistration(jobOptions.CronSchedule, typeof(T).FullName, jobOptions.JobFactory ?? (_ => s.GetRequiredService()))); } public static IServiceCollection AddJob(this IServiceCollection services, bool waitForStartupActions = false) where T : class, IJob @@ -79,9 +84,52 @@ public static IServiceCollection AddCronJob(this IServiceCollection services, return services.AddJob(o => o.CronSchedule(cronSchedule)); } + public static IServiceCollection AddCronJob(this IServiceCollection services, string cronSchedule, Func action) + { + return services.AddJob(o => o.CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, action))); + } + + public static IServiceCollection AddCronJob(this IServiceCollection services, string cronSchedule, Func action) + { + return services.AddJob(o => o.CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (xp, _) => action(xp)))); + } + + public static IServiceCollection AddCronJob(this IServiceCollection services, string cronSchedule, Func action) + { + return services.AddJob(o => o.CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (_, _) => action()))); + } + + public static IServiceCollection AddCronJob(this IServiceCollection services, string cronSchedule, Action action) + { + return services.AddJob(o => o.CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (xp, ct) => + { + action(xp, ct); + return Task.CompletedTask; + }))); + } + + public static IServiceCollection AddCronJob(this IServiceCollection services, string cronSchedule, Action action) + { + return services.AddJob(o => o.CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (_, ct) => + { + action(ct); + return Task.CompletedTask; + }))); + } + + public static IServiceCollection AddCronJob(this IServiceCollection services, string cronSchedule, Action action) + { + return services.AddJob(o => o.CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (_, _) => + { + action(); + return Task.CompletedTask; + }))); + } + public static IServiceCollection AddJob(this IServiceCollection services, Action configureJobOptions) where T : class, IJob { var jobOptionsBuilder = new HostedJobOptionsBuilder(); + jobOptionsBuilder.Name(typeof(T).FullName); configureJobOptions?.Invoke(jobOptionsBuilder); return services.AddJob(jobOptionsBuilder.Target); } diff --git a/src/Foundatio.Extensions.Hosting/Jobs/JobOptionsBuilder.cs b/src/Foundatio.Extensions.Hosting/Jobs/JobOptionsBuilder.cs index 05194610..08f238a0 100644 --- a/src/Foundatio.Extensions.Hosting/Jobs/JobOptionsBuilder.cs +++ b/src/Foundatio.Extensions.Hosting/Jobs/JobOptionsBuilder.cs @@ -36,7 +36,7 @@ public HostedJobOptionsBuilder Description(string value) return this; } - public HostedJobOptionsBuilder JobFactory(Func value) + public HostedJobOptionsBuilder JobFactory(Func value) { Target.JobFactory = value; return this; diff --git a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobManager.cs b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobManager.cs new file mode 100644 index 00000000..18799c7d --- /dev/null +++ b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobManager.cs @@ -0,0 +1,111 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Foundatio.Caching; +using Foundatio.Jobs; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Foundatio.Extensions.Hosting.Jobs; + +public class ScheduledJobManager +{ + private readonly IServiceProvider _serviceProvider; + private readonly ILoggerFactory _loggerFactory; + private readonly ICacheClient _cacheClient; + + public ScheduledJobManager(IServiceProvider serviceProvider, ILoggerFactory loggerFactory) + { + _serviceProvider = serviceProvider; + _loggerFactory = loggerFactory; + _cacheClient = serviceProvider.GetService() ?? new InMemoryCacheClient(o => o.LoggerFactory(loggerFactory)); + Jobs.AddRange(serviceProvider.GetServices().Select(j => new ScheduledJobRunner(j.Schedule, j.Name, j.JobFactory, serviceProvider, _cacheClient, loggerFactory))); + } + + public void AddOrUpdate(string cronSchedule) where TJob : class, IJob + { + string jobName = typeof(TJob).Name; + var job = Jobs.FirstOrDefault(j => j.JobName == jobName); + if (job == null) + { + Jobs.Add(new ScheduledJobRunner(cronSchedule, jobName, sp => sp.GetRequiredService(), _serviceProvider, _cacheClient, _loggerFactory)); + } + else + { + job.Schedule = cronSchedule; + } + } + + public void AddOrUpdate(string jobName, string cronSchedule, Func action) + { + var job = Jobs.FirstOrDefault(j => j.JobName == jobName); + if (job == null) + { + Jobs.Add(new ScheduledJobRunner(cronSchedule, jobName, sp => new DynamicJob(sp, action), _serviceProvider, _cacheClient, _loggerFactory)); + } + else + { + job.Schedule = cronSchedule; + } + } + + public void AddOrUpdate(string jobName, string cronSchedule, Func action) + { + AddOrUpdate(jobName, cronSchedule, (_, ct) => action(ct)); + } + + public void AddOrUpdate(string jobName, string cronSchedule, Func action) + { + AddOrUpdate(jobName, cronSchedule, (_, _) => action()); + } + + public void AddOrUpdate(string jobName, string cronSchedule, Action action) + { + AddOrUpdate(jobName, cronSchedule, (sp, ct) => + { + action(sp, ct); + return Task.CompletedTask; + }); + } + + public void AddOrUpdate(string jobName, string cronSchedule, Action action) + { + AddOrUpdate(jobName, cronSchedule, (_, ct) => + { + action(ct); + return Task.CompletedTask; + }); + } + + public void AddOrUpdate(string jobName, string cronSchedule, Action action) + { + AddOrUpdate(jobName, cronSchedule, (_, _) => + { + action(); + return Task.CompletedTask; + }); + } + + public void Remove() where TJob : class, IJob + { + string jobName = typeof(TJob).Name; + var job = Jobs.FirstOrDefault(j => j.JobName == jobName); + if (job != null) + { + Jobs.Remove(job); + } + } + + public void Remove(string jobName) + { + var job = Jobs.FirstOrDefault(j => j.JobName == jobName); + if (job != null) + { + Jobs.Remove(job); + } + } + + internal List Jobs { get; } = new(); +} diff --git a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRegistration.cs b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRegistration.cs index 89fd57b6..39edf4a5 100644 --- a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRegistration.cs +++ b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRegistration.cs @@ -5,12 +5,14 @@ namespace Foundatio.Extensions.Hosting.Jobs; public class ScheduledJobRegistration { - public ScheduledJobRegistration(Func jobFactory, string schedule) + public ScheduledJobRegistration(string schedule, string jobName, Func jobFactory) { - JobFactory = jobFactory; Schedule = schedule; + Name = jobName; + JobFactory = jobFactory; } - public Func JobFactory { get; } public string Schedule { get; } + public string Name { get; } + public Func JobFactory { get; } } diff --git a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRunner.cs b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRunner.cs new file mode 100644 index 00000000..f4187348 --- /dev/null +++ b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRunner.cs @@ -0,0 +1,113 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Cronos; +using Foundatio.Caching; +using Foundatio.Jobs; +using Foundatio.Lock; +using Foundatio.Utility; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Foundatio.Extensions.Hosting.Jobs; + +internal class ScheduledJobRunner +{ + private readonly Func _jobFactory; + private readonly IServiceProvider _serviceProvider; + private CronExpression _cronSchedule; + private readonly ILockProvider _lockProvider; + private readonly ILogger _logger; + private readonly DateTime _baseDate = new(2010, 1, 1); + private string _cacheKeyPrefix; + + public ScheduledJobRunner(string schedule, string jobName, Func jobFactory, IServiceProvider serviceProvider, ICacheClient cacheClient, ILoggerFactory loggerFactory = null) + { + _jobFactory = jobFactory; + _serviceProvider = serviceProvider; + Schedule = schedule; + JobName = jobName; + _logger = loggerFactory?.CreateLogger() ?? NullLogger.Instance; + + _cronSchedule = CronExpression.Parse(schedule); + if (_cronSchedule == null) + throw new ArgumentException("Could not parse schedule.", nameof(schedule)); + + var interval = TimeSpan.FromDays(1); + + var nextOccurrence = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow); + if (nextOccurrence.HasValue) + { + var nextNextOccurrence = _cronSchedule.GetNextOccurrence(nextOccurrence.Value); + if (nextNextOccurrence.HasValue) + interval = nextNextOccurrence.Value.Subtract(nextOccurrence.Value); + } + + _lockProvider = new ThrottlingLockProvider(cacheClient, 1, interval.Add(interval)); + + NextRun = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow); + } + + private string _schedule; + public string Schedule + { + get { return _schedule;} + set + { + _cronSchedule = CronExpression.Parse(value); + NextRun = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow); + _schedule = value; + } + } + + public string JobName { get; private set; } + public DateTime? LastRun { get; private set; } + public DateTime? NextRun { get; private set; } + public Task RunTask { get; private set; } + + public bool ShouldRun() + { + if (!NextRun.HasValue) + return false; + + // not time yet + if (NextRun > SystemClock.UtcNow) + return false; + + // check if already run + if (LastRun != null && LastRun.Value == NextRun.Value) + return false; + + return true; + } + + public Task StartAsync(CancellationToken cancellationToken = default) + { + // using lock provider in a cluster with a distributed cache implementation keeps cron jobs from running duplicates + // TODO: provide ability to run cron jobs on a per host isolated schedule + return _lockProvider.TryUsingAsync(GetLockKey(NextRun.Value), t => + { + // start running the job in a thread + RunTask = Task.Factory.StartNew(async () => + { + var result = await _jobFactory(_serviceProvider).TryRunAsync(cancellationToken).AnyContext(); + // TODO: Should we only set last run on success? Seems like that could be bad. + _logger.LogJobResult(result, JobName); + }, cancellationToken).Unwrap(); + + LastRun = NextRun; + NextRun = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow); + + return Task.CompletedTask; + }, TimeSpan.Zero, TimeSpan.Zero); + } + + private string GetLockKey(DateTime date) + { + _cacheKeyPrefix ??= TypeHelper.GetTypeDisplayName(_jobFactory(_serviceProvider).GetType()); + + long minute = (long)date.Subtract(_baseDate).TotalMinutes; + + return _cacheKeyPrefix + minute; + } +} diff --git a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs index e724b7ab..a777156c 100644 --- a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs +++ b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs @@ -1,31 +1,23 @@ using System; -using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; -using Cronos; -using Foundatio.Caching; using Foundatio.Extensions.Hosting.Startup; -using Foundatio.Jobs; -using Foundatio.Lock; using Foundatio.Utility; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Abstractions; namespace Foundatio.Extensions.Hosting.Jobs; public class ScheduledJobService : BackgroundService, IJobStatus { - private readonly List _jobs; private readonly IServiceProvider _serviceProvider; + private readonly ScheduledJobManager _jobManager; - public ScheduledJobService(IServiceProvider serviceProvider, ILoggerFactory loggerFactory) + public ScheduledJobService(IServiceProvider serviceProvider, ScheduledJobManager jobManager) { _serviceProvider = serviceProvider; - var cacheClient = serviceProvider.GetService() ?? new InMemoryCacheClient(o => o.LoggerFactory(loggerFactory)); - _jobs = new List(serviceProvider.GetServices().Select(j => new ScheduledJobRunner(j.JobFactory, j.Schedule, cacheClient, loggerFactory))); + _jobManager = jobManager; var lifetime = serviceProvider.GetService(); lifetime?.RegisterHostedJobInstance(this); @@ -49,7 +41,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) while (!stoppingToken.IsCancellationRequested) { - var jobsToRun = _jobs.Where(j => j.ShouldRun()).ToArray(); + var jobsToRun = _jobManager.Jobs.Where(j => j.ShouldRun()).ToArray(); foreach (var jobToRun in jobsToRun) await jobToRun.StartAsync(stoppingToken).AnyContext(); @@ -61,94 +53,4 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) await Task.Delay(timeUntilNextMinute, stoppingToken).AnyContext(); } } - - private class ScheduledJobRunner - { - private readonly Func _jobFactory; - private readonly CronExpression _cronSchedule; - private readonly ILockProvider _lockProvider; - private readonly ILogger _logger; - private readonly DateTime _baseDate = new(2010, 1, 1); - private string _cacheKeyPrefix; - - public ScheduledJobRunner(Func jobFactory, string schedule, ICacheClient cacheClient, ILoggerFactory loggerFactory = null) - { - _jobFactory = jobFactory; - Schedule = schedule; - _logger = loggerFactory?.CreateLogger() ?? NullLogger.Instance; - - _cronSchedule = CronExpression.Parse(schedule); - if (_cronSchedule == null) - throw new ArgumentException("Could not parse schedule.", nameof(schedule)); - - var interval = TimeSpan.FromDays(1); - - var nextOccurrence = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow); - if (nextOccurrence.HasValue) - { - var nextNextOccurrence = _cronSchedule.GetNextOccurrence(nextOccurrence.Value); - if (nextNextOccurrence.HasValue) - interval = nextNextOccurrence.Value.Subtract(nextOccurrence.Value); - } - - _lockProvider = new ThrottlingLockProvider(cacheClient, 1, interval.Add(interval)); - - NextRun = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow); - } - - public string Schedule { get; private set; } - public DateTime? LastRun { get; private set; } - public DateTime? NextRun { get; private set; } - public Task RunTask { get; private set; } - - public bool ShouldRun() - { - if (!NextRun.HasValue) - return false; - - // not time yet - if (NextRun > SystemClock.UtcNow) - return false; - - // check if already run - if (LastRun != null && LastRun.Value == NextRun.Value) - return false; - - return true; - } - - public Task StartAsync(CancellationToken cancellationToken = default) - { - // using lock provider in a cluster with a distributed cache implementation keeps cron jobs from running duplicates - // TODO: provide ability to run cron jobs on a per host isolated schedule - return _lockProvider.TryUsingAsync(GetLockKey(NextRun.Value), t => - { - // start running the job in a thread - RunTask = Task.Factory.StartNew(async () => - { - var job = _jobFactory(); - // TODO: Don't calculate job name every time - string jobName = job.GetType().Name; - var result = await _jobFactory().TryRunAsync(cancellationToken).AnyContext(); - // TODO: Should we only set last run on success? Seems like that could be bad. - _logger.LogJobResult(result, jobName); - }, cancellationToken).Unwrap(); - - LastRun = NextRun; - NextRun = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow); - - return Task.CompletedTask; - }, TimeSpan.Zero, TimeSpan.Zero); - } - - private string GetLockKey(DateTime date) - { - if (_cacheKeyPrefix == null) - _cacheKeyPrefix = TypeHelper.GetTypeDisplayName(_jobFactory().GetType()); - - long minute = (long)date.Subtract(_baseDate).TotalMinutes; - - return _cacheKeyPrefix + minute; - } - } } diff --git a/src/Foundatio/Jobs/JobOptions.cs b/src/Foundatio/Jobs/JobOptions.cs index 1b69bbb1..36c745bb 100644 --- a/src/Foundatio/Jobs/JobOptions.cs +++ b/src/Foundatio/Jobs/JobOptions.cs @@ -8,7 +8,7 @@ public class JobOptions { public string Name { get; set; } public string Description { get; set; } - public Func JobFactory { get; set; } + public Func JobFactory { get; set; } public bool RunContinuous { get; set; } = true; public TimeSpan? Interval { get; set; } public TimeSpan? InitialDelay { get; set; } @@ -65,25 +65,25 @@ public static JobOptions GetDefaults() where T : IJob public static JobOptions GetDefaults(IJob instance) { var jobOptions = GetDefaults(instance.GetType()); - jobOptions.JobFactory = () => instance; + jobOptions.JobFactory = _ => instance; return jobOptions; } public static JobOptions GetDefaults(IJob instance) where T : IJob { var jobOptions = GetDefaults(); - jobOptions.JobFactory = () => instance; + jobOptions.JobFactory = _ => instance; return jobOptions; } - public static JobOptions GetDefaults(Type jobType, Func jobFactory) + public static JobOptions GetDefaults(Type jobType, Func jobFactory) { var jobOptions = GetDefaults(jobType); jobOptions.JobFactory = jobFactory; return jobOptions; } - public static JobOptions GetDefaults(Func jobFactory) where T : IJob + public static JobOptions GetDefaults(Func jobFactory) where T : IJob { var jobOptions = GetDefaults(); jobOptions.JobFactory = jobFactory; diff --git a/src/Foundatio/Jobs/JobRunner.cs b/src/Foundatio/Jobs/JobRunner.cs index d40e79b3..51510059 100644 --- a/src/Foundatio/Jobs/JobRunner.cs +++ b/src/Foundatio/Jobs/JobRunner.cs @@ -15,37 +15,42 @@ public class JobRunner private readonly ILogger _logger; private string _jobName; private readonly JobOptions _options; + private readonly IServiceProvider _serviceProvider; - public JobRunner(JobOptions options, ILoggerFactory loggerFactory = null) + public JobRunner(JobOptions options, IServiceProvider serviceProvider, ILoggerFactory loggerFactory = null) { _logger = loggerFactory?.CreateLogger() ?? NullLogger.Instance; _options = options; + _serviceProvider = serviceProvider; } - public JobRunner(IJob instance, ILoggerFactory loggerFactory = null, TimeSpan? initialDelay = null, int instanceCount = 1, bool runContinuous = true, int iterationLimit = -1, TimeSpan? interval = null) + public JobRunner(IJob instance, IServiceProvider serviceProvider, ILoggerFactory loggerFactory = null, TimeSpan? initialDelay = null, int instanceCount = 1, bool runContinuous = true, int iterationLimit = -1, TimeSpan? interval = null) : this(new JobOptions { - JobFactory = () => instance, + JobFactory = _ => instance, InitialDelay = initialDelay, InstanceCount = instanceCount, IterationLimit = iterationLimit, RunContinuous = runContinuous, Interval = interval - }, loggerFactory) + }, serviceProvider, loggerFactory) { } - public JobRunner(Func jobFactory, ILoggerFactory loggerFactory = null, TimeSpan? initialDelay = null, int instanceCount = 1, bool runContinuous = true, int iterationLimit = -1, TimeSpan? interval = null) + public JobRunner(Func jobFactory, IServiceProvider serviceProvider, + ILoggerFactory loggerFactory = null, TimeSpan? initialDelay = null, int instanceCount = 1, + bool runContinuous = true, int iterationLimit = -1, TimeSpan? interval = null) : this(new JobOptions - { - JobFactory = jobFactory, - InitialDelay = initialDelay, - InstanceCount = instanceCount, - IterationLimit = iterationLimit, - RunContinuous = runContinuous, - Interval = interval - }, loggerFactory) - { } + { + JobFactory = jobFactory, + InitialDelay = initialDelay, + InstanceCount = instanceCount, + IterationLimit = iterationLimit, + RunContinuous = runContinuous, + Interval = interval + }, serviceProvider, loggerFactory) + { + } public CancellationTokenSource CancellationTokenSource { get; private set; } @@ -127,7 +132,7 @@ public async Task RunAsync(CancellationToken cancellationToken = default) IJob job = null; try { - job = _options.JobFactory(); + job = _options.JobFactory(_serviceProvider); } catch (Exception ex) { @@ -171,7 +176,7 @@ public async Task RunAsync(CancellationToken cancellationToken = default) { try { - var jobInstance = _options.JobFactory(); + var jobInstance = _options.JobFactory(_serviceProvider); await jobInstance.RunContinuousAsync(_options.Interval, _options.IterationLimit, cancellationToken).AnyContext(); } catch (TaskCanceledException) diff --git a/tests/Foundatio.Tests/Jobs/JobTests.cs b/tests/Foundatio.Tests/Jobs/JobTests.cs index 48777b69..55e35379 100644 --- a/tests/Foundatio.Tests/Jobs/JobTests.cs +++ b/tests/Foundatio.Tests/Jobs/JobTests.cs @@ -9,6 +9,7 @@ using Foundatio.Metrics; using Foundatio.Utility; using Foundatio.Xunit; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Xunit; using Xunit.Abstractions; @@ -23,8 +24,9 @@ public JobTests(ITestOutputHelper output) : base(output) { } public async Task CanCancelJob() { var job = new HelloWorldJob(Log); + var sp = new ServiceCollection().BuildServiceProvider(); var timeoutCancellationTokenSource = new CancellationTokenSource(1000); - var resultTask = new JobRunner(job, Log).RunAsync(timeoutCancellationTokenSource.Token); + var resultTask = new JobRunner(job, sp, Log).RunAsync(timeoutCancellationTokenSource.Token); await SystemClock.SleepAsync(TimeSpan.FromSeconds(2)); Assert.True(await resultTask); } @@ -33,7 +35,8 @@ public async Task CanCancelJob() public async Task CanStopLongRunningJob() { var job = new LongRunningJob(Log); - var runner = new JobRunner(job, Log); + var sp = new ServiceCollection().BuildServiceProvider(); + var runner = new JobRunner(job, sp, Log); var cts = new CancellationTokenSource(1000); bool result = await runner.RunAsync(cts.Token); @@ -44,7 +47,8 @@ public async Task CanStopLongRunningJob() public async Task CanStopLongRunningCronJob() { var job = new LongRunningJob(Log); - var runner = new JobRunner(job, Log); + var sp = new ServiceCollection().BuildServiceProvider(); + var runner = new JobRunner(job, sp, Log); var cts = new CancellationTokenSource(1000); bool result = await runner.RunAsync(cts.Token); @@ -81,11 +85,12 @@ public async Task CanRunJobs() public async Task CanRunMultipleInstances() { var job = new HelloWorldJob(Log); + var sp = new ServiceCollection().BuildServiceProvider(); HelloWorldJob.GlobalRunCount = 0; using (var timeoutCancellationTokenSource = new CancellationTokenSource(1000)) { - await new JobRunner(job, Log, instanceCount: 5, iterationLimit: 1).RunAsync(timeoutCancellationTokenSource.Token); + await new JobRunner(job, sp, Log, instanceCount: 5, iterationLimit: 1).RunAsync(timeoutCancellationTokenSource.Token); } Assert.Equal(5, HelloWorldJob.GlobalRunCount); @@ -93,7 +98,7 @@ public async Task CanRunMultipleInstances() HelloWorldJob.GlobalRunCount = 0; using (var timeoutCancellationTokenSource = new CancellationTokenSource(50000)) { - await new JobRunner(job, Log, instanceCount: 5, iterationLimit: 100).RunAsync(timeoutCancellationTokenSource.Token); + await new JobRunner(job, sp, Log, instanceCount: 5, iterationLimit: 100).RunAsync(timeoutCancellationTokenSource.Token); } Assert.Equal(500, HelloWorldJob.GlobalRunCount); @@ -105,13 +110,14 @@ public async Task CanCancelContinuousJobs() using (TestSystemClock.Install()) { var job = new HelloWorldJob(Log); + var sp = new ServiceCollection().BuildServiceProvider(); var timeoutCancellationTokenSource = new CancellationTokenSource(100); await job.RunContinuousAsync(TimeSpan.FromSeconds(1), 5, timeoutCancellationTokenSource.Token); Assert.Equal(1, job.RunCount); timeoutCancellationTokenSource = new CancellationTokenSource(500); - var runnerTask = new JobRunner(job, Log, instanceCount: 5, iterationLimit: 10000, interval: TimeSpan.FromMilliseconds(1)).RunAsync(timeoutCancellationTokenSource.Token); + var runnerTask = new JobRunner(job, sp, Log, instanceCount: 5, iterationLimit: 10000, interval: TimeSpan.FromMilliseconds(1)).RunAsync(timeoutCancellationTokenSource.Token); await SystemClock.SleepAsync(TimeSpan.FromSeconds(1)); await runnerTask; }