Skip to content

Commit

Permalink
V0.4.1
Browse files Browse the repository at this point in the history
  • Loading branch information
Felix Clase committed Jan 27, 2024
1 parent 6eeeb0a commit 6503d06
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 127 deletions.
18 changes: 11 additions & 7 deletions src/main/Hangfire.Storage.SQLite/CountersAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
using Hangfire.Logging;
using Hangfire.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Hangfire.Storage.SQLite.Entities;

Expand All @@ -13,9 +11,9 @@ namespace Hangfire.Storage.SQLite
/// <summary>
/// Represents Counter collection aggregator for SQLite database
/// </summary>
#pragma warning disable CS0618
#pragma warning disable CS0618
public class CountersAggregator : IBackgroundProcess, IServerComponent
#pragma warning restore CS0618
#pragma warning restore CS0618
{
private static readonly ILog Logger = LogProvider.For<CountersAggregator>();
private const int NumberOfRecordsInSinglePass = 1000;
Expand Down Expand Up @@ -50,11 +48,17 @@ public void Execute([NotNull] BackgroundProcessContext context)
/// </summary>
/// <param name="cancellationToken">Cancellation token</param>
public void Execute(CancellationToken cancellationToken)
{
// DANIEL WAS HERE
Retry.Twice((_) => _Execute(cancellationToken));
}

private void _Execute(CancellationToken cancellationToken)
{
Logger.DebugFormat("Aggregating records in 'Counter' table...");

long removedCount = 0;

do
{
using (var storageConnection = (HangfireSQLiteConnection)_storage.GetConnection())
Expand Down Expand Up @@ -97,7 +101,7 @@ public void Execute(CancellationToken cancellationToken)
counter.Value += item.Value;
counter.ExpireAt = item.ExpireAt > aggregatedItem.ExpireAt
? (item.ExpireAt > DateTime.MinValue ? item.ExpireAt : DateTime.MinValue)
: (aggregatedItem.ExpireAt > DateTime.MinValue ?
: (aggregatedItem.ExpireAt > DateTime.MinValue ?
aggregatedItem.ExpireAt : DateTime.MinValue);
storageDb.Database.Update(counter);
}
Expand Down Expand Up @@ -134,4 +138,4 @@ public override string ToString()
return "SQLite Counter Collection Aggregator";
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<TargetFrameworks>netstandard2.0;net48</TargetFrameworks>
</PropertyGroup>
<PropertyGroup>
<Version>0.4.0</Version>
<Version>0.4.1</Version>
<Authors>RaisedApp</Authors>
<Company>RaisedApp</Company>
<Copyright>Copyright © 2019 - Present</Copyright>
Expand All @@ -20,7 +20,8 @@
<title>Hangfire Storage SQLite</title>
<Description>An Alternative SQLite Storage for Hangfire</Description>
<PackageReleaseNotes>
0.4.0
0.4.1
- Stability and retry enhancements introduced by: Daniel Lindblom
</PackageReleaseNotes>
</PropertyGroup>
<ItemGroup>
Expand Down
38 changes: 27 additions & 11 deletions src/main/Hangfire.Storage.SQLite/HangfireSQLiteConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,11 @@ public HangfireSQLiteConnection(
_queueProviders = queueProviders ?? throw new ArgumentNullException(nameof(queueProviders));
}

public override void Dispose()
{
DbContext.Dispose();
base.Dispose();
}

public override IDisposable AcquireDistributedLock(string resource, TimeSpan timeout)
{
return new SQLiteDistributedLock($"HangFire:{resource}", timeout, DbContext, _storageOptions);
return Retry.Twice((_) =>
new SQLiteDistributedLock($"HangFire:{resource}", timeout, DbContext, _storageOptions)
);
}

public override void AnnounceServer(string serverId, ServerContext context)
Expand Down Expand Up @@ -93,6 +89,14 @@ public override string CreateExpiredJob(Job job, IDictionary<string, string> par
if (parameters == null)
throw new ArgumentNullException(nameof(parameters));

// DANIEL WAS HERE
return Retry.Twice(
(attempt) => _CreateExpiredJob(job, parameters, createdAt, expireIn)
);
}

private string _CreateExpiredJob(Job job, IDictionary<string, string> parameters, DateTime createdAt, TimeSpan expireIn)
{
lock (_lock)
{
var invocationData = InvocationData.SerializeJob(job);
Expand Down Expand Up @@ -343,17 +347,29 @@ public override void Heartbeat(string serverId)
throw new ArgumentNullException(nameof(serverId));
}

var server = DbContext.HangfireServerRepository.FirstOrDefault(_ => _.Id == serverId);
// DANIEL WAS HERE:
// Something fishy is going on here, a BackgroundServerGoneException is unexpectedly thrown
// https://github.com/HangfireIO/Hangfire/blob/master/src/Hangfire.Core/Server/ServerHeartbeatProcess.cs
// Changing to

// DANIEL WAS HERE:
// var server = DbContext.HangfireServerRepository.FirstOrDefault(_ => _.Id == serverId);
var server = Retry.Twice((attempts) =>
// Forcing a query (read somewhere that sqlite-net handles FirstOrDefault differently )
DbContext.HangfireServerRepository.Where(_ => _.Id == serverId)
.ToArray()
.FirstOrDefault()
);
if (server == null)
throw new BackgroundServerGoneException();

server.LastHeartbeat = DateTime.UtcNow;
var affected = DbContext.Database.Update(server);

// DANIEL WAS HERE:
// var affected = DbContext.Database.Update(server);
var affected = Retry.Twice((_) => DbContext.Database.Update(server));
if (affected == 0)
{
throw new BackgroundServerGoneException();
}
}

public override void RemoveServer(string serverId)
Expand Down
131 changes: 131 additions & 0 deletions src/main/Hangfire.Storage.SQLite/Retry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
using System;
using System.Linq;
using System.Threading;

namespace Hangfire.Storage.SQLite
{

// Daniel Lindblom WAS HERE:
// Added this utility as a light alternative to use Polly since I assume you do not want to drag
// that dependency in to this library.

public static class Retrying
{
public const int Once = 1;
public const int Twice = 2;
}

public static class Retry
{
public static TimeSpan DefaultDelay { get; set; } = TimeSpan.FromMilliseconds(250);

public static void Execute(Action<int> action, CancellationToken token, params TimeSpan[] delays)
{
if (delays is null || delays.Length == 0)
delays = new TimeSpan[] { DefaultDelay };

var retries = delays.Length;
var tries = 0;
while (!token.IsCancellationRequested)
{
try
{
action(tries + 1);
return;
}
catch
{
var delay = delays[tries % delays.Length];
if (++tries > retries) throw;
token.WaitHandle.WaitOne(delay);
}
}
throw new OperationCanceledException();
}
public static void Execute(Action<int> action, params TimeSpan[] delays)
{
Execute(action, CancellationToken.None, delays);
}

public static void Execute(Action<int> action, int retries = Retrying.Once, TimeSpan? delay = null)
{
delay = delay ?? DefaultDelay;
var delays = Enumerable.Range(1, retries).Select(_ => delay.Value).ToArray();
Execute(action, delays);
}

public static void Execute(Action<int> action, CancellationToken token, int retries = Retrying.Once, TimeSpan? delay = null)
{
delay = delay ?? DefaultDelay;
var delays = Enumerable.Range(1, retries).Select(_ => delay.Value).ToArray();
Execute(action, token, delays);
}

public static TResult Execute<TResult>(Func<int, TResult> func, CancellationToken token, params TimeSpan[] delays)
{
if (delays is null || delays.Length == 0)
delays = new TimeSpan[] { DefaultDelay };

var retries = delays.Length;
var tries = 0;
while (!token.IsCancellationRequested)
{
try
{
return func(tries + 1);
}
catch
{
var delay = delays[tries % delays.Length];
if (++tries > retries) throw;
token.WaitHandle.WaitOne(delay);
}
}
throw new OperationCanceledException();
}

public static TResult Execute<TResult>(Func<int, TResult> func, params TimeSpan[] delays)
{
return Execute(func, CancellationToken.None, delays);
}

public static TResult Execute<TResult>(Func<int, TResult> func, int retries = Retrying.Once, TimeSpan? delay = null)
{
delay = delay ?? DefaultDelay;
var delays = Enumerable.Range(1, retries).Select(_ => delay.Value).ToArray();
return Execute(func, delays);
}

public static TResult Execute<TResult>(Func<int, TResult> func, CancellationToken token, int retries = Retrying.Once, TimeSpan? delay = null)
{
delay = delay ?? DefaultDelay;
var delays = Enumerable.Range(1, retries).Select(_ => delay.Value).ToArray();
return Execute(func, token, delays);
}

public static void Once(Action<int> action, CancellationToken? token = null, TimeSpan? delay = null)
{
token = token ?? CancellationToken.None;
delay = delay ?? DefaultDelay;
Execute(action, token.Value, Retrying.Once, delay.Value);
}
public static void Twice(Action<int> action, CancellationToken? token = null, TimeSpan? delay = null)
{
token = token ?? CancellationToken.None;
delay = delay ?? DefaultDelay;
Execute(action, token.Value, Retrying.Twice, delay.Value);
}
public static TResult Once<TResult>(Func<int, TResult> func, CancellationToken? token = null, TimeSpan? delay = null)
{
token = token ?? CancellationToken.None;
delay = delay ?? DefaultDelay;
return Execute(func, token.Value, Retrying.Once, delay.Value);
}
public static TResult Twice<TResult>(Func<int, TResult> func, CancellationToken? token = null, TimeSpan? delay = null)
{
token = token ?? CancellationToken.None;
delay = delay ?? DefaultDelay;
return Execute(func, token.Value, Retrying.Twice, delay.Value);
}
}
}
31 changes: 18 additions & 13 deletions src/main/Hangfire.Storage.SQLite/SQLiteDistributedLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,13 @@ private void Acquire(TimeSpan timeout)
throw new DistributedLockTimeoutException(_resource);
}
}
catch (DistributedLockTimeoutException)
catch (DistributedLockTimeoutException ex)
{
throw;
throw ex;
}
catch (Exception)
catch (Exception ex)
{
throw;
throw ex;
}
}

Expand All @@ -176,26 +176,31 @@ private void Acquire(TimeSpan timeout)
/// <exception cref="DistributedLockTimeoutException"></exception>
private void Release()
{
try
{
// DANIEL WAS HERE:
Retry.Twice((retry) => {

// Remove resource lock (if it's still ours)
_dbContext.DistributedLockRepository.Delete(_ => _.Resource == _resource && _.ResourceKey == _resourceKey);
lock (EventWaitHandleName)
Monitor.Pulse(EventWaitHandleName);
}
catch (Exception)
{
throw;
}

);
}


private void Cleanup()
{
try
{
// Delete expired locks (of any owner)
_dbContext.DistributedLockRepository.
Delete(x => x.Resource == _resource && x.ExpireAt < DateTime.UtcNow);
// DANIEL WAS HERE:
Retry.Twice((_) => {

// Delete expired locks (of any owner)
_dbContext.DistributedLockRepository.
Delete(x => x.Resource == _resource && x.ExpireAt < DateTime.UtcNow);
}
);
}
catch (Exception ex)
{
Expand Down
Loading

0 comments on commit 6503d06

Please sign in to comment.