Introduction
I was back in .NET land at work the other day, where I had to run some code periodically on a schedule.
The business also needed this schedule to be adjustable, so that they could tighten it when things were busy and wind it down when they were not.
The schedule would be adjusted via a setting in the App.Config, which is monitored for changes. When a change is detected, we read the new schedule value from the App.Config and use it to run the job. Ideally, the app should not have to go down for the schedule to change.
There are some good job/scheduling libraries out there, but for this I just wanted something lightweight, so I went with Quartz.net.
It's easy to set up and use, has a fairly nice API, and supports IOC and CRON schedules. In short, it fits the bill.
In a nutshell, this post talks about how you can adjust the schedule of an ALREADY scheduled job. There are also some special caveats that I personally had to deal with in my requirements, which may or may not be an issue for you.
Some Weird Issues That I Needed To Cope With
So let me just talk about some of the issues that I had to deal with.
The guts of the job that I run on my schedule writes to Azure Blob Storage and then to Azure SQL DW tables, so it performs several writes to several components, one after another.
So the current job run MUST be allowed to complete in FULL (or fail via exception handling, that's OK too). It would not be acceptable to just stop the Quartz job while there is work in flight.
I guess some folks may be thinking of some sort of transaction here, one that must either commit or roll back. Unfortunately, that doesn't work with Azure Blob Storage uploads.
So I had to think of another plan.
So here is what I came up with. I would use a threading primitive, namely an AutoResetEvent, that would control when the Quartz.net job could be changed to use a new schedule.
If a change in the App.Config is seen, then we know that we “should” be using a new schedule time; however, the scheduled job MAY have work in flight. So we need to wait for that work to complete (or fail) before we can think about swapping the Quartz.net schedule time.
So that is what I went for. There are a few other things to be aware of, such as needing threading primitives that work with async/await code. Luckily, Stephen Toub from the TPL team has done that for us: AsyncAutoResetEvent.
There is also the well-known fact that the FileSystemWatcher class often fires its events twice for a single change: http://lmgtfy.com/?q=filesystemwatcher+firing+twice.
So as we go through the code, you will see how I dealt with those.
The Code
Ok, so now that we have talked about the problem, let's go through the code.
There are several NuGet packages I am using to make my life easier, including Autofac, Topshelf, Quartz.net, Rx (System.Reactive) and SimpleConfig.
So let's start with the entry point, which for me is the simple Program class shown below:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Security.Principal;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using SachaBarber.QuartzJobUpdate.Services;
using Topshelf;
namespace SachaBarber.QuartzJobUpdate
{
static class Program
{
private static ILogger _log = null;
[STAThread]
public static void Main()
{
try
{
var container = ContainerOperations.Container;
_log = container.Resolve<ILogger>();
_log.Log("Starting");
AppDomain.CurrentDomain.UnhandledException += AppDomainUnhandledException;
TaskScheduler.UnobservedTaskException += TaskSchedulerUnobservedTaskException;
Thread.CurrentPrincipal = new WindowsPrincipal(WindowsIdentity.GetCurrent());
HostFactory.Run(c =>
{
c.Service<SomeWindowsService>(s =>
{
s.ConstructUsing(() => container.Resolve<SomeWindowsService>());
s.WhenStarted(tc => tc.Start());
s.WhenStopped(tc => tc.Stop());
});
c.RunAsLocalSystem();
c.SetDescription("Uploads Calc Payouts/Summary data " +
"into Azure blob storage for RiskStore DW ingestion");
c.SetDisplayName("SachaBarber.QuartzJobUpdate");
c.SetServiceName("SachaBarber.QuartzJobUpdate");
});
}
catch (Exception ex)
{
// _log is still null if resolving the container itself failed
_log?.Log(ex.Message);
}
finally
{
_log?.Log("Closing");
}
}
private static void AppDomainUnhandledException
(object sender, UnhandledExceptionEventArgs e)
{
ProcessUnhandledException((Exception)e.ExceptionObject);
}
private static void TaskSchedulerUnobservedTaskException
(object sender, UnobservedTaskExceptionEventArgs e)
{
ProcessUnhandledException(e.Exception);
e.SetObserved();
}
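private static void ProcessUnhandledException(Exception ex)
{
// Unwrap reflection wrappers to get at the real failure
if (ex is TargetInvocationException && ex.InnerException != null)
{
ProcessUnhandledException(ex.InnerException);
return;
}
// Log the exception details rather than a bare "Error"
_log?.Log($"Error: {ex}");
}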
}
}
All this does is host the actual Windows service class for me using Topshelf. The actual service class looks like this:
using System;
using System.Configuration;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Xml.Linq;
using Autofac;
using SachaBarber.QuartzJobUpdate.Async;
using SachaBarber.QuartzJobUpdate.Configuration;
using SachaBarber.QuartzJobUpdate.Jobs;
using SachaBarber.QuartzJobUpdate.Services;
using Quartz;
namespace SachaBarber.QuartzJobUpdate
{
public class SomeWindowsService
{
private readonly ILogger _log;
private readonly ISchedulingAssistanceService _schedulingAssistanceService;
private readonly IRxSchedulerService _rxSchedulerService;
private readonly IObservableFileSystemWatcher _observableFileSystemWatcher;
private IScheduler _quartzScheduler;
private readonly AsyncLock _lock = new AsyncLock();
private readonly SerialDisposable
_configWatcherDisposable = new SerialDisposable();
private static readonly JobKey _someScheduledJobKey =
new JobKey("SomeScheduledJobKey");
private static readonly TriggerKey _someScheduledJobTriggerKey =
new TriggerKey("SomeScheduledJobTriggerKey");
public SomeWindowsService(
ILogger log,
ISchedulingAssistanceService schedulingAssistanceService,
IRxSchedulerService rxSchedulerService,
IObservableFileSystemWatcher observableFileSystemWatcher)
{
_log = log;
_schedulingAssistanceService = schedulingAssistanceService;
_rxSchedulerService = rxSchedulerService;
_observableFileSystemWatcher = observableFileSystemWatcher;
}
public void Start()
{
try
{
var ass = typeof (SomeWindowsService).Assembly;
var configFile = $"{ass.Location}.config";
CreateConfigWatcher(new FileInfo(configFile));
_log.Log("Starting SomeWindowsService");
_quartzScheduler = ContainerOperations.Container.Resolve<IScheduler>();
_quartzScheduler.JobFactory =
new AutofacJobFactory(ContainerOperations.Container);
_quartzScheduler.Start();
CreateScheduledJob();
}
catch (JobExecutionException jeex)
{
_log.Log(jeex.Message);
}
catch (SchedulerConfigException scex)
{
_log.Log(scex.Message);
}
catch (SchedulerException sex)
{
_log.Log(sex.Message);
}
}
public void Stop()
{
_log.Log("Stopping SomeWindowsService");
_quartzScheduler?.Shutdown();
_configWatcherDisposable.Dispose();
_observableFileSystemWatcher.Dispose();
}
private void CreateConfigWatcher(FileInfo configFileInfo)
{
FileSystemWatcher watcher = new FileSystemWatcher();
watcher.Path = configFileInfo.DirectoryName;
watcher.NotifyFilter =
NotifyFilters.LastAccess |
NotifyFilters.LastWrite |
NotifyFilters.FileName |
NotifyFilters.DirectoryName;
watcher.Filter = configFileInfo.Name;
_observableFileSystemWatcher.SetFile(watcher);
_configWatcherDisposable.Disposable =
_observableFileSystemWatcher.Changed.SubscribeOn(
_rxSchedulerService.TaskPool).Throttle
(TimeSpan.FromMilliseconds(500)).Subscribe(
async x =>
{
using (var releaser = await _lock.LockAsync())
{
_log.Log($"Config file {configFileInfo.Name} " +
"has changed, attempting to read new config data");
_schedulingAssistanceService.RequiresNewSchedulerSetup = true;
// Wait for the job to signal that no work is in flight
_schedulingAssistanceService.SchedulerRestartGate.WaitAsync().
GetAwaiter().GetResult();
// Re-read the config and recreate the scheduler using new settings
ConfigurationManager.RefreshSection
("schedulingConfiguration");
var newSchedulingConfiguration =
SimpleConfig.Configuration.Load<SchedulingConfiguration>();
_log.Log("SchedulingConfiguration section is now : " +
$"{newSchedulingConfiguration}");
ContainerOperations.ReInitialiseSchedulingConfiguration
(newSchedulingConfiguration);
ReScheduleJob();
}
},
ex =>
{
_log.Log("Error encountered attempting to read new config data " +
$"from config file {configFileInfo.Name}");
});
}
private void CreateScheduledJob(IJobDetail existingJobDetail = null)
{
var schedulingConfiguration =
ContainerOperations.Container.Resolve<SchedulingConfiguration>();
IJobDetail job = JobBuilder.Create<SomeQuartzJob>()
.WithIdentity(_someScheduledJobKey)
.Build();
// ScheduleTimerInMins is expressed in minutes, so schedule in minutes
ITrigger trigger = TriggerBuilder.Create()
.WithIdentity(_someScheduledJobTriggerKey)
.WithSimpleSchedule(x => x
.RepeatForever()
.WithIntervalInMinutes(schedulingConfiguration.ScheduleTimerInMins)
)
.StartAt(DateTimeOffset.Now.AddMinutes(schedulingConfiguration.ScheduleTimerInMins))
.Build();
_quartzScheduler.ScheduleJob(job, trigger);
}
private void ReScheduleJob()
{
if (_quartzScheduler != null)
{
_quartzScheduler.DeleteJob(_someScheduledJobKey);
CreateScheduledJob();
}
}
}
}
There is a fair bit going on here. So let's list some of the work this code does:
- It creates the initial Quartz.Net job and schedules it using the values from a custom config section, which are read into an object.
- It watches the config file for changes (we will go through that in a moment) and waits for the AsyncAutoResetEvent to be signalled, at which point it recreates the Quartz.net job.
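As an aside, the service above deletes the job and creates it again when the schedule changes. Quartz.net can also replace the trigger of an existing job in place via IScheduler.RescheduleJob. The following is only a minimal sketch of that alternative, assuming the synchronous Quartz.net 2.x API that the rest of this post uses; the TriggerSwapper class and its SwapInterval method are hypothetical names made up for illustration and are not part of the project.

```csharp
using System;
using Quartz;

// Hypothetical helper, not part of the original project.
public static class TriggerSwapper
{
    // Replaces the trigger identified by triggerKey with a new simple trigger
    // that repeats forever at the given interval (in minutes).
    // Returns the first fire time of the new trigger, or null when no
    // trigger with that key is currently scheduled.
    public static DateTimeOffset? SwapInterval(
        IScheduler scheduler, TriggerKey triggerKey, int intervalInMins)
    {
        ITrigger newTrigger = TriggerBuilder.Create()
            .WithIdentity(triggerKey)
            .WithSimpleSchedule(x => x
                .RepeatForever()
                .WithIntervalInMinutes(intervalInMins))
            .StartAt(DateTimeOffset.Now.AddMinutes(intervalInMins))
            .Build();

        // The job association is taken over from the trigger being replaced.
        return scheduler.RescheduleJob(triggerKey, newTrigger);
    }
}
```

Either approach still needs the wait described in this post: swapping the trigger changes when the job fires next, but it does not by itself tell you whether a run is in flight.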
So let's have a look at some of the small helper parts.
This is a simple Rx-based file system watcher. Rx is a good fit here because you can Throttle the events, which takes care of the duplicate notifications (see the post "FileSystemWatcher raises 2 events").
using System;
using System.IO;
using System.Reactive.Linq;
namespace SachaBarber.QuartzJobUpdate.Services
{
public class ObservableFileSystemWatcher : IObservableFileSystemWatcher
{
private FileSystemWatcher _watcher;
public void SetFile(FileSystemWatcher watcher)
{
_watcher = watcher;
Changed = Observable
.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
(h => _watcher.Changed += h, h => _watcher.Changed -= h)
.Select(x => x.EventArgs);
Renamed = Observable
.FromEventPattern<RenamedEventHandler, RenamedEventArgs>
(h => _watcher.Renamed += h, h => _watcher.Renamed -= h)
.Select(x => x.EventArgs);
Deleted = Observable
.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
(h => _watcher.Deleted += h, h => _watcher.Deleted -= h)
.Select(x => x.EventArgs);
Errors = Observable
.FromEventPattern<ErrorEventHandler, ErrorEventArgs>
(h => _watcher.Error += h, h => _watcher.Error -= h)
.Select(x => x.EventArgs);
Created = Observable
.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
(h => _watcher.Created += h, h => _watcher.Created -= h)
.Select(x => x.EventArgs);
All = Changed.Merge(Renamed).Merge(Deleted).Merge(Created);
_watcher.EnableRaisingEvents = true;
}
public void Dispose()
{
_watcher.EnableRaisingEvents = false;
_watcher.Dispose();
}
public IObservable<FileSystemEventArgs> Changed { get; private set; }
public IObservable<RenamedEventArgs> Renamed { get; private set; }
public IObservable<FileSystemEventArgs> Deleted { get; private set; }
public IObservable<ErrorEventArgs> Errors { get; private set; }
public IObservable<FileSystemEventArgs> Created { get; private set; }
public IObservable<FileSystemEventArgs> All { get; private set; }
}
}
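To see why Throttle helps with the duplicate events, here is a small standalone sketch. It does not touch the file system: a Subject stands in for the watcher's Changed stream, and the ThrottleDemo class is a hypothetical name used only for this illustration. Rx's Throttle only lets a value through once the stream has been quiet for the given period, so a burst of notifications collapses into one.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical demo class, not part of the original project.
public static class ThrottleDemo
{
    // Pushes two "Changed" notifications back to back and returns how many
    // made it through a 500 ms Throttle.
    public static int CountAfterBurst()
    {
        int notifications = 0;

        using (var events = new Subject<string>())
        using (events
            .Throttle(TimeSpan.FromMilliseconds(500))
            .Subscribe(_ => Interlocked.Increment(ref notifications)))
        {
            // Simulate FileSystemWatcher raising Changed twice for one save
            events.OnNext("Changed");
            events.OnNext("Changed");

            // Give the throttle window plenty of time to elapse
            Thread.Sleep(1500);
            return Volatile.Read(ref notifications);
        }
    }
}
```

Calling ThrottleDemo.CountAfterBurst() should report a single notification for the two raw events, which is the behaviour the config watcher relies on.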
And this is a small utility class that holds the values of the custom config section, which is read using SimpleConfig.
namespace SachaBarber.QuartzJobUpdate.Configuration
{
public class SchedulingConfiguration
{
public int ScheduleTimerInMins { get; set; }
public override string ToString()
{
return $"ScheduleTimerInMins: {ScheduleTimerInMins}";
}
}
}
Which you read from the App.Config like this:
var newSchedulingConfiguration =
SimpleConfig.Configuration.Load<SchedulingConfiguration>();
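For reference, the matching App.Config section might look something like the fragment below. Treat it as an assumption rather than a copy of the project's file: the section name schedulingConfiguration comes from the RefreshSection call used in this post, but the section handler type and the attribute name scheduleTimerInMins are based on my understanding of SimpleConfig's conventions and should be checked against the SimpleConfig documentation, and the value 5 is just an example.

```xml
<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <configSections>
    <!-- Assumed SimpleConfig section handler; verify against the SimpleConfig docs -->
    <section name="schedulingConfiguration" type="SimpleConfig.Section, SimpleConfig" />
  </configSections>
  <!-- Assumed mapping onto SchedulingConfiguration.ScheduleTimerInMins; 5 is an example value -->
  <schedulingConfiguration scheduleTimerInMins="5" />
</configuration>
```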
And this is the async/await compatible AutoResetEvent that I took from Stephen Toub's blog:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace SachaBarber.QuartzJobUpdate.Async
{
public class AsyncAutoResetEvent
{
private static readonly Task Completed = Task.FromResult(true);
private readonly Queue<TaskCompletionSource<bool>>
_waits = new Queue<TaskCompletionSource<bool>>();
private bool _signaled;
public Task WaitAsync()
{
lock (_waits)
{
if (_signaled)
{
_signaled = false;
return Completed;
}
else
{
var tcs = new TaskCompletionSource<bool>();
_waits.Enqueue(tcs);
return tcs.Task;
}
}
}
public void Set()
{
TaskCompletionSource<bool> toRelease = null;
lock (_waits)
{
if (_waits.Count > 0)
toRelease = _waits.Dequeue();
else if (!_signaled)
_signaled = true;
}
toRelease?.SetResult(true);
}
}
}
So the last part of the puzzle is: how does the AsyncAutoResetEvent get signalled?
Well, as we said above, we need to wait for any in-progress work to complete first. So the way I tackled that was that within the job code that gets run on every Quartz.Net scheduler tick, we just check whether we have been asked to swap out the current schedule time, and if so we signal the waiting code via the (shared) AsyncAutoResetEvent; otherwise we just carry on and do the regular job work.
The AsyncAutoResetEvent that is used by both the waiting code and the job code (to signal it) is shared via a singleton registration in an IOC container. I am using Autofac, which I set up like this, but you could use your own singleton or IOC container of choice.
The trick is to make sure that both classes that need to access the AsyncAutoResetEvent use a single instance.
using System;
using System.Reflection;
using Autofac;
using SachaBarber.QuartzJobUpdate.Configuration;
using SachaBarber.QuartzJobUpdate.Services;
using Quartz;
using Quartz.Impl;
namespace SachaBarber.QuartzJobUpdate
{
public class ContainerOperations
{
private static Lazy<IContainer> _containerSingleton =
new Lazy<IContainer>(CreateContainer);
public static IContainer Container => _containerSingleton.Value;
public static void ReInitialiseSchedulingConfiguration(
SchedulingConfiguration newSchedulingConfiguration)
{
var currentSchedulingConfiguration =
Container.Resolve<SchedulingConfiguration>();
currentSchedulingConfiguration.ScheduleTimerInMins =
newSchedulingConfiguration.ScheduleTimerInMins;
}
private static IContainer CreateContainer()
{
var builder = new ContainerBuilder();
builder.RegisterType<ObservableFileSystemWatcher>()
.As<IObservableFileSystemWatcher>().ExternallyOwned();
builder.RegisterType<RxSchedulerService>()
.As<IRxSchedulerService>().ExternallyOwned();
builder.RegisterType<Logger>().As<ILogger>().ExternallyOwned();
builder.RegisterType<SomeWindowsService>();
builder.RegisterInstance(new SchedulingAssistanceService())
.As<ISchedulingAssistanceService>();
builder.RegisterInstance(
SimpleConfig.Configuration.Load<SchedulingConfiguration>());
builder.Register(c => new StdSchedulerFactory().GetScheduler())
.As<Quartz.IScheduler>();
builder.RegisterAssemblyTypes(Assembly.GetExecutingAssembly())
.Where(x => typeof(IJob).IsAssignableFrom(x));
return builder.Build();
}
}
}
Where the shared instance in my case is this class:
using SachaBarber.QuartzJobUpdate.Async;
namespace SachaBarber.QuartzJobUpdate.Services
{
public class SchedulingAssistanceService : ISchedulingAssistanceService
{
public SchedulingAssistanceService()
{
SchedulerRestartGate = new AsyncAutoResetEvent();
RequiresNewSchedulerSetup = false;
}
public AsyncAutoResetEvent SchedulerRestartGate { get; }
public bool RequiresNewSchedulerSetup { get; set; }
}
}
Here is the actual job code, which checks whether a change in the App.Config has been detected. If so, it signals the waiting code that it may continue.
using System;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using Quartz;
namespace SachaBarber.QuartzJobUpdate.Services
{
public class SomeQuartzJob : IJob
{
private readonly ILogger _log;
private readonly ISchedulingAssistanceService _schedulingAssistanceService;
public SomeQuartzJob(
ILogger log,
ISchedulingAssistanceService schedulingAssistanceService)
{
_log = log;
_schedulingAssistanceService = schedulingAssistanceService;
}
public void Execute(IJobExecutionContext context)
{
try
{
ExecuteAsync(context).GetAwaiter().GetResult();
}
catch (JobExecutionException jeex)
{
_log.Log(jeex.Message);
throw;
}
catch (SchedulerConfigException scex)
{
_log.Log(scex.Message);
throw;
}
catch (SchedulerException sex)
{
_log.Log(sex.Message);
throw;
}
catch (ArgumentNullException anex)
{
_log.Log(anex.Message);
throw;
}
catch (OperationCanceledException ocex)
{
_log.Log(ocex.Message);
throw;
}
catch (IOException ioex)
{
_log.Log(ioex.Message);
throw;
}
}
public async Task ExecuteAsync(IJobExecutionContext context)
{
await Task.Run(async () =>
{
if (_schedulingAssistanceService.RequiresNewSchedulerSetup)
{
_schedulingAssistanceService.RequiresNewSchedulerSetup = false;
_log.Log("Job has been asked to stop, " +
"to allow job reschedule due to change in config");
_schedulingAssistanceService.SchedulerRestartGate.Set();
}
else
{
await Task.Delay(1000);
_log.Log("Doing the uninterruptible work now");
}
});
}
}
}
So when the AsyncAutoResetEvent is signalled, the waiting code (inside the Subscribe code of the Rx file system watcher in SomeWindowsService.cs) will proceed to swap out the Quartz.Net schedule time.
It can do this safely as we know there is NO work in flight: the job has told the waiting code to proceed, which it can only do if there is no work in flight.
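To make the handshake concrete, here is a minimal, self-contained sketch of the same idea. It is not the code from the project: it uses the framework's SemaphoreSlim as a stand-in for AsyncAutoResetEvent, and the RescheduleHandshake class with its OnConfigChangedAsync and OnTick methods is hypothetical, named only for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of the watcher/job handshake, not the project's code.
public sealed class RescheduleHandshake
{
    // Stand-in for AsyncAutoResetEvent: starts closed, holds at most one signal
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(0, 1);
    private volatile bool _requiresNewSchedule;

    public int WorkRuns { get; private set; }
    public int Reschedules { get; private set; }

    // Plays the part of the config watcher: ask for a reschedule, then wait
    // until the job says that nothing is in flight.
    public async Task OnConfigChangedAsync()
    {
        _requiresNewSchedule = true;
        await _gate.WaitAsync();
        Reschedules++; // this is where the schedule would be swapped
    }

    // Plays the part of the job on each scheduler tick.
    public void OnTick()
    {
        if (_requiresNewSchedule)
        {
            // No work started on this tick, so it is safe to let the waiter go
            _requiresNewSchedule = false;
            _gate.Release();
        }
        else
        {
            WorkRuns++; // the uninterruptible work would happen here
        }
    }

    public static void Demo()
    {
        var handshake = new RescheduleHandshake();
        handshake.OnTick();                              // normal run
        Task pending = handshake.OnConfigChangedAsync(); // waits for the job
        Console.WriteLine(pending.IsCompleted);          // False
        handshake.OnTick();                              // job signals instead of working
        pending.Wait();
        Console.WriteLine($"{handshake.WorkRuns} {handshake.Reschedules}"); // 1 1
    }
}
```

The important property is that the tick which releases the gate does no real work, so the waiter only ever proceeds between units of work.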
This swapping over of the schedule time to use the newly read App.Config values is also protected by an AsyncLock class (again taken from Stephen Toub).
using System;
using System.Threading;
using System.Threading.Tasks;
namespace SachaBarber.QuartzJobUpdate.Async
{
public class AsyncLock
{
private readonly AsyncSemaphore m_semaphore;
private readonly Task<Releaser> m_releaser;
public AsyncLock()
{
m_semaphore = new AsyncSemaphore(1);
m_releaser = Task.FromResult(new Releaser(this));
}
public Task<Releaser> LockAsync()
{
var wait = m_semaphore.WaitAsync();
return wait.IsCompleted ?
m_releaser :
wait.ContinueWith((_, state) => new Releaser((AsyncLock)state),
this, CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
public struct Releaser : IDisposable
{
private readonly AsyncLock m_toRelease;
internal Releaser(AsyncLock toRelease) { m_toRelease = toRelease; }
public void Dispose()
{
if (m_toRelease != null)
m_toRelease.m_semaphore.Release();
}
}
}
}
Where this relies on AsyncSemaphore:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace SachaBarber.QuartzJobUpdate.Async
{
public class AsyncSemaphore
{
private static readonly Task s_completed = Task.FromResult(true);
private readonly Queue<TaskCompletionSource<bool>>
_mWaiters = new Queue<TaskCompletionSource<bool>>();
private int _mCurrentCount;
public AsyncSemaphore(int initialCount)
{
if (initialCount < 0)
throw new ArgumentOutOfRangeException("initialCount");
_mCurrentCount = initialCount;
}
public Task WaitAsync()
{
lock (_mWaiters)
{
if (_mCurrentCount > 0)
{
--_mCurrentCount;
return s_completed;
}
else
{
var waiter = new TaskCompletionSource<bool>();
_mWaiters.Enqueue(waiter);
return waiter.Task;
}
}
}
public void Release()
{
TaskCompletionSource<bool> toRelease = null;
lock (_mWaiters)
{
if (_mWaiters.Count > 0)
toRelease = _mWaiters.Dequeue();
else
++_mCurrentCount;
}
if (toRelease != null)
toRelease.SetResult(true);
}
}
}
Just for completeness, this is how you get an App.Config section to refresh at runtime:
ConfigurationManager.RefreshSection("schedulingConfiguration");
Anyway, this works fine for me. I now have a reactive app that responds to changes in the App.Config without the need for a restart, and it does so while allowing in-flight work to complete.
Hope it helps someone out there.
Where is the Code?
The code can be found at https://github.com/sachabarber/SachaBarber.QuartzJobUpdate.