Skip to content

Commit

Permalink
Various improvements to hosting scheduled jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
ejsmith committed Jul 23, 2024
1 parent 0f4e865 commit 3465aea
Show file tree
Hide file tree
Showing 15 changed files with 389 additions and 184 deletions.
27 changes: 0 additions & 27 deletions samples/Foundatio.HostingSample/Jobs/EvenMinutesJob.cs

This file was deleted.

11 changes: 10 additions & 1 deletion samples/Foundatio.HostingSample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Foundatio.Extensions.Hosting.Jobs;
using Foundatio.Extensions.Hosting.Startup;
using Foundatio.Jobs;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -90,7 +92,14 @@ public static IHostBuilder CreateHostBuilder(string[] args)
s.AddCronJob<EveryMinuteJob>("* * * * *");

if (evenMinutes)
s.AddCronJob<EvenMinutesJob>("*/2 * * * *");
s.AddCronJob("*/2 * * * *", async sp =>
{
var logger = sp.GetRequiredService<ILogger<Program>>();
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EvenMinuteJob Run Thread={ManagedThreadId}", Thread.CurrentThread.ManagedThreadId);

await Task.Delay(TimeSpan.FromSeconds(5));
});

if (sample1)
s.AddJob(sp => new Sample1Job(sp.GetRequiredService<ILoggerFactory>()), o => o.ApplyDefaults<Sample1Job>().WaitForStartupActions(true).InitialDelay(TimeSpan.FromSeconds(4)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"profiles": {
"Foundatio.HostingSample": {
"commandName": "Project",
"commandLineArgs": "sample1 sample2"
"commandLineArgs": "all"
}
}
}
12 changes: 11 additions & 1 deletion samples/Foundatio.HostingSample/Startup/MyStartupAction.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
using System.Threading;
using System.Threading.Tasks;
using Foundatio.Extensions.Hosting.Jobs;
using Foundatio.Extensions.Hosting.Startup;
using Microsoft.Extensions.Logging;

namespace Foundatio.HostingSample;

public class MyStartupAction : IStartupAction
{
private readonly ScheduledJobManager _scheduledJobManager;
private readonly ILogger _logger;

public MyStartupAction(ILogger<MyStartupAction> logger)
public MyStartupAction(ScheduledJobManager scheduledJobManager, ILogger<MyStartupAction> logger)
{
_scheduledJobManager = scheduledJobManager;
_logger = logger;
}

Expand All @@ -21,5 +24,12 @@ public async Task RunAsync(CancellationToken cancellationToken = default)
_logger.LogTrace("MyStartupAction Run Thread={ManagedThreadId}", Thread.CurrentThread.ManagedThreadId);
await Task.Delay(500);
}

_scheduledJobManager.AddOrUpdate("MyJob", "* * * * *", async () =>
{
_logger.LogInformation("Running MyJob");
await Task.Delay(1000);
_logger.LogInformation("MyJob Complete");
});
}
}
26 changes: 26 additions & 0 deletions src/Foundatio.Extensions.Hosting/Jobs/DynamicJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Foundatio.Jobs;
using Foundatio.Utility;

namespace Foundatio.Extensions.Hosting.Jobs;

internal class DynamicJob : IJob
{
private readonly IServiceProvider _serviceProvider;
private readonly Func<IServiceProvider, CancellationToken, Task> _action;

public DynamicJob(IServiceProvider serviceProvider, Func<IServiceProvider, CancellationToken, Task> action)
{
_serviceProvider = serviceProvider;
_action = action;
}

public async Task<JobResult> RunAsync(CancellationToken cancellationToken = default)
{
await _action(_serviceProvider, cancellationToken).AnyContext();

return JobResult.Success;
}
}
2 changes: 1 addition & 1 deletion src/Foundatio.Extensions.Hosting/Jobs/HostedJobService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private async Task ExecuteAsync(CancellationToken stoppingToken)
}
}

var runner = new JobRunner(_jobOptions, _loggerFactory);
var runner = new JobRunner(_jobOptions, _serviceProvider, _loggerFactory);

try
{
Expand Down
88 changes: 68 additions & 20 deletions src/Foundatio.Extensions.Hosting/Jobs/JobHostExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Foundatio.Jobs;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand All @@ -18,13 +20,14 @@ public static IServiceCollection AddJob(this IServiceCollection services, Hosted
{
return services.AddTransient<IHostedService>(s => new HostedJobService(s, jobOptions, s.GetService<ILoggerFactory>()));
}
else
{
if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService)))
services.AddTransient<IHostedService, ScheduledJobService>();

return services.AddTransient(s => new ScheduledJobRegistration(jobOptions.JobFactory, jobOptions.CronSchedule));
}
if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService)))
services.AddTransient<IHostedService, ScheduledJobService>();

if (!services.Any(s => s.ServiceType == typeof(ScheduledJobManager)))
services.AddSingleton<ScheduledJobManager>();

return services.AddTransient(s => new ScheduledJobRegistration(jobOptions.CronSchedule, jobOptions.Name ?? Guid.NewGuid().ToString(), jobOptions.JobFactory));
}

public static IServiceCollection AddJob(this IServiceCollection services, Func<IServiceProvider, IJob> jobFactory, HostedJobOptions jobOptions)
Expand All @@ -33,18 +36,19 @@ public static IServiceCollection AddJob(this IServiceCollection services, Func<I
{
return services.AddTransient<IHostedService>(s =>
{
jobOptions.JobFactory = () => jobFactory(s);
jobOptions.JobFactory = jobFactory;

return new HostedJobService(s, jobOptions, s.GetService<ILoggerFactory>());
});
}
else
{
if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService)))
services.AddTransient<IHostedService, ScheduledJobService>();

return services.AddTransient(s => new ScheduledJobRegistration(() => jobFactory(s), jobOptions.CronSchedule));
}
if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService)))
services.AddTransient<IHostedService, ScheduledJobService>();

if (!services.Any(s => s.ServiceType == typeof(ScheduledJobManager)))
services.AddSingleton<ScheduledJobManager>();

return services.AddTransient(s => new ScheduledJobRegistration(jobOptions.CronSchedule, jobOptions.Name ?? Guid.NewGuid().ToString(), _ => jobFactory(s)));
}

public static IServiceCollection AddJob<T>(this IServiceCollection services, HostedJobOptions jobOptions) where T : class, IJob
Expand All @@ -55,18 +59,19 @@ public static IServiceCollection AddJob<T>(this IServiceCollection services, Hos
return services.AddTransient<IHostedService>(s =>
{
if (jobOptions.JobFactory == null)
jobOptions.JobFactory = s.GetRequiredService<T>;
jobOptions.JobFactory = _ => s.GetRequiredService<T>();

return new HostedJobService(s, jobOptions, s.GetService<ILoggerFactory>());
});
}
else
{
if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService)))
services.AddTransient<IHostedService, ScheduledJobService>();

return services.AddTransient(s => new ScheduledJobRegistration(jobOptions.JobFactory ?? (s.GetRequiredService<T>), jobOptions.CronSchedule));
}
if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService)))
services.AddTransient<IHostedService, ScheduledJobService>();

if (!services.Any(s => s.ServiceType == typeof(ScheduledJobManager)))
services.AddSingleton<ScheduledJobManager>();

return services.AddTransient(s => new ScheduledJobRegistration(jobOptions.CronSchedule, typeof(T).FullName, jobOptions.JobFactory ?? (_ => s.GetRequiredService<T>())));
}

public static IServiceCollection AddJob<T>(this IServiceCollection services, bool waitForStartupActions = false) where T : class, IJob
Expand All @@ -79,9 +84,52 @@ public static IServiceCollection AddCronJob<T>(this IServiceCollection services,
return services.AddJob<T>(o => o.CronSchedule(cronSchedule));
}

public static IServiceCollection AddCronJob(this IServiceCollection services, string cronSchedule, Func<IServiceProvider, CancellationToken, Task> action)
{
return services.AddJob(o => o.CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, action)));
}

public static IServiceCollection AddCronJob(this IServiceCollection services, string cronSchedule, Func<IServiceProvider, Task> action)
{
return services.AddJob(o => o.CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (xp, _) => action(xp))));
}

public static IServiceCollection AddCronJob(this IServiceCollection services, string cronSchedule, Func<Task> action)
{
return services.AddJob(o => o.CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (_, _) => action())));
}

public static IServiceCollection AddCronJob(this IServiceCollection services, string cronSchedule, Action<IServiceProvider, CancellationToken> action)
{
return services.AddJob(o => o.CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (xp, ct) =>
{
action(xp, ct);
return Task.CompletedTask;
})));
}

public static IServiceCollection AddCronJob(this IServiceCollection services, string cronSchedule, Action<CancellationToken> action)
{
return services.AddJob(o => o.CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (_, ct) =>
{
action(ct);
return Task.CompletedTask;
})));
}

public static IServiceCollection AddCronJob(this IServiceCollection services, string cronSchedule, Action action)
{
return services.AddJob(o => o.CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (_, _) =>
{
action();
return Task.CompletedTask;
})));
}

public static IServiceCollection AddJob<T>(this IServiceCollection services, Action<HostedJobOptionsBuilder> configureJobOptions) where T : class, IJob
{
var jobOptionsBuilder = new HostedJobOptionsBuilder();
jobOptionsBuilder.Name(typeof(T).FullName);
configureJobOptions?.Invoke(jobOptionsBuilder);
return services.AddJob<T>(jobOptionsBuilder.Target);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Foundatio.Extensions.Hosting/Jobs/JobOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public HostedJobOptionsBuilder Description(string value)
return this;
}

public HostedJobOptionsBuilder JobFactory(Func<IJob> value)
public HostedJobOptionsBuilder JobFactory(Func<IServiceProvider, IJob> value)
{
Target.JobFactory = value;
return this;
Expand Down
111 changes: 111 additions & 0 deletions src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Foundatio.Caching;
using Foundatio.Jobs;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Foundatio.Extensions.Hosting.Jobs;

public class ScheduledJobManager
{
private readonly IServiceProvider _serviceProvider;
private readonly ILoggerFactory _loggerFactory;
private readonly ICacheClient _cacheClient;

public ScheduledJobManager(IServiceProvider serviceProvider, ILoggerFactory loggerFactory)
{
_serviceProvider = serviceProvider;
_loggerFactory = loggerFactory;
_cacheClient = serviceProvider.GetService<ICacheClient>() ?? new InMemoryCacheClient(o => o.LoggerFactory(loggerFactory));
Jobs.AddRange(serviceProvider.GetServices<ScheduledJobRegistration>().Select(j => new ScheduledJobRunner(j.Schedule, j.Name, j.JobFactory, serviceProvider, _cacheClient, loggerFactory)));
}

public void AddOrUpdate<TJob>(string cronSchedule) where TJob : class, IJob
{
string jobName = typeof(TJob).Name;
var job = Jobs.FirstOrDefault(j => j.JobName == jobName);
if (job == null)
{
Jobs.Add(new ScheduledJobRunner(cronSchedule, jobName, sp => sp.GetRequiredService<TJob>(), _serviceProvider, _cacheClient, _loggerFactory));
}
else
{
job.Schedule = cronSchedule;
}
}

public void AddOrUpdate(string jobName, string cronSchedule, Func<IServiceProvider, CancellationToken, Task> action)
{
var job = Jobs.FirstOrDefault(j => j.JobName == jobName);
if (job == null)
{
Jobs.Add(new ScheduledJobRunner(cronSchedule, jobName, sp => new DynamicJob(sp, action), _serviceProvider, _cacheClient, _loggerFactory));
}
else
{
job.Schedule = cronSchedule;
}
}

public void AddOrUpdate(string jobName, string cronSchedule, Func<CancellationToken, Task> action)
{
AddOrUpdate(jobName, cronSchedule, (_, ct) => action(ct));
}

public void AddOrUpdate(string jobName, string cronSchedule, Func<Task> action)
{
AddOrUpdate(jobName, cronSchedule, (_, _) => action());
}

public void AddOrUpdate(string jobName, string cronSchedule, Action<IServiceProvider, CancellationToken> action)
{
AddOrUpdate(jobName, cronSchedule, (sp, ct) =>
{
action(sp, ct);
return Task.CompletedTask;
});
}

public void AddOrUpdate(string jobName, string cronSchedule, Action<CancellationToken> action)
{
AddOrUpdate(jobName, cronSchedule, (_, ct) =>
{
action(ct);
return Task.CompletedTask;
});
}

public void AddOrUpdate(string jobName, string cronSchedule, Action action)
{
AddOrUpdate(jobName, cronSchedule, (_, _) =>
{
action();
return Task.CompletedTask;
});
}

public void Remove<TJob>() where TJob : class, IJob
{
string jobName = typeof(TJob).Name;
var job = Jobs.FirstOrDefault(j => j.JobName == jobName);
if (job != null)
{
Jobs.Remove(job);
}
}

public void Remove(string jobName)
{
var job = Jobs.FirstOrDefault(j => j.JobName == jobName);
if (job != null)
{
Jobs.Remove(job);
}
}

internal List<ScheduledJobRunner> Jobs { get; } = new();
}
Loading

0 comments on commit 3465aea

Please sign in to comment.