-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathSqlDurabilityProvider.cs
232 lines (199 loc) · 9.13 KB
/
SqlDurabilityProvider.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.SqlServer.AzureFunctions
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Host.Scale;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
class SqlDurabilityProvider : DurabilityProvider
{
public const string Name = "mssql";
readonly SqlDurabilityOptions durabilityOptions;
readonly SqlOrchestrationService service;
SqlScaleMonitor? scaleMonitor;
#if FUNCTIONS_V4
SqlTargetScaler? targetScaler;
#endif
public SqlDurabilityProvider(
SqlOrchestrationService service,
SqlDurabilityOptions durabilityOptions)
: base(Name, service, service, durabilityOptions.ConnectionStringName)
{
this.service = service ?? throw new ArgumentNullException(nameof(service));
this.durabilityOptions = durabilityOptions;
}
public override bool GuaranteesOrderedDelivery => true;
public override bool SupportsImplicitEntityDeletion => true;
public override JObject ConfigurationJson => JObject.FromObject(this.durabilityOptions);
public override TimeSpan MaximumDelayTime { get; set; } = TimeSpan.MaxValue;
public override string EventSourceName => "DurableTask-SqlServer";
public override async Task<IList<OrchestrationState>> GetOrchestrationStateWithInputsAsync(string instanceId, bool showInput = true)
{
OrchestrationState? state = await this.service.GetOrchestrationStateAsync(instanceId, executionId: null);
if (state == null)
{
return Array.Empty<OrchestrationState>();
}
if (!showInput)
{
// CONSIDER: It would be more efficient to not load the input at all from the data source.
state.Input = null;
}
return new[] { state };
}
public override async Task<string?> RetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializierSettings)
{
string instanceId = entityId.ToString();
OrchestrationState? orchestrationState = await this.service.GetOrchestrationStateAsync(
instanceId,
executionId: null);
// Entity state is expected to be persisted as orchestration input.
string? entityMetadata = orchestrationState?.Input;
if (string.IsNullOrEmpty(entityMetadata))
{
return null;
}
// The entity state envelope is expected to be a JSON object.
JObject entityJson;
try
{
entityJson = JObject.Parse(entityMetadata!);
}
catch (JsonException e)
{
throw new InvalidDataException($"Unable to read the entity data for {instanceId} because it's in an unrecognizeable format.", e);
}
// Entities that are deleted are expected to have { "exists": false }.
if (entityJson.TryGetValue("exists", out JToken? existsValue) &&
existsValue.Type == JTokenType.Boolean &&
existsValue.Value<bool>() == false)
{
return null;
}
// The actual state comes from the { "state": "..." } string field.
if (!entityJson.TryGetValue("state", out JToken? value))
{
return null;
}
return value.ToString();
}
public override Task<IList<OrchestrationState>> GetAllOrchestrationStates(CancellationToken cancellationToken)
{
throw new NotSupportedException($"Use the method overload that takes a {nameof(OrchestrationStatusQueryCondition)} parameter.");
}
public override async Task<OrchestrationStatusQueryResult> GetOrchestrationStateWithPagination(OrchestrationStatusQueryCondition condition, CancellationToken cancellationToken)
{
if (condition == null)
{
throw new ArgumentNullException(nameof(condition));
}
var query = new SqlOrchestrationQuery
{
PageSize = condition.PageSize,
FetchInput = condition.ShowInput,
CreatedTimeFrom = condition.CreatedTimeFrom,
CreatedTimeTo = condition.CreatedTimeTo,
InstanceIdPrefix = condition.InstanceIdPrefix,
};
if (condition.RuntimeStatus?.Any() == true)
{
query.StatusFilter = new HashSet<OrchestrationStatus>(condition.RuntimeStatus.Select(status => (OrchestrationStatus)status));
}
// The continuation token is just a page number.
if (string.IsNullOrWhiteSpace(condition.ContinuationToken))
{
query.PageNumber = 0;
}
else if (int.TryParse(condition.ContinuationToken, out int pageNumber))
{
query.PageNumber = pageNumber;
}
else
{
throw new ArgumentException($"The continuation token '{condition.ContinuationToken}' is invalid.", nameof(condition));
}
IReadOnlyCollection<OrchestrationState> results = await this.service.GetManyOrchestrationsAsync(query, cancellationToken);
return new OrchestrationStatusQueryResult
{
DurableOrchestrationState = results.Select(s => new DurableOrchestrationStatus
{
Name = s.Name,
InstanceId = s.OrchestrationInstance.InstanceId,
RuntimeStatus = (OrchestrationRuntimeStatus)s.OrchestrationStatus,
CustomStatus = s.Status,
CreatedTime = s.CreatedTime,
LastUpdatedTime = s.LastUpdatedTime,
Input = s.Input,
Output = s.Output,
}),
ContinuationToken = results.Count == query.PageSize ? (query.PageNumber + 1).ToString() : null,
};
}
public override async Task<PurgeHistoryResult> PurgeInstanceHistoryByInstanceId(string instanceId)
{
int deletedInstances = await this.service.PurgeOrchestrationHistoryAsync(new[] { instanceId });
return new PurgeHistoryResult(deletedInstances);
}
/// <summary>
/// Purges the history of orchestrations and entities based on a set of filters.
/// </summary>
/// <remarks>
/// This method will purge at most 1000 instances. The caller can purge more than this by calling the method
/// multiple times, checking for a non-zero return value after each call.
/// </remarks>
/// <param name="createdTimeFrom">The minimum creation time filter. Only instances created after this date are purged.</param>
/// <param name="createdTimeTo">The maximum creation time filter. Only instances created before this date are purged.</param>
/// <param name="runtimeStatus">The set of orchestration status values to filter orchestrations by.</param>
/// <returns>Returns the number of purged instances.</returns>
public override async Task<int> PurgeHistoryByFilters(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationStatus> runtimeStatus)
{
var purgeFilter = new PurgeInstanceFilter(createdTimeFrom, createdTimeTo, runtimeStatus);
var purgeResult = await this.service.PurgeInstanceStateAsync(purgeFilter);
return purgeResult.DeletedInstanceCount;
}
public override async Task RewindAsync(string instanceId, string reason)
{
await this.service.RewindTaskOrchestrationAsync(instanceId, reason);
}
public override bool TryGetScaleMonitor(
string functionId,
string functionName,
string hubName,
string storageConnectionString,
out IScaleMonitor scaleMonitor)
{
if (this.scaleMonitor == null)
{
var sqlMetricsProvider = new SqlMetricsProvider(this.service);
this.scaleMonitor = new SqlScaleMonitor(hubName, sqlMetricsProvider);
}
scaleMonitor = this.scaleMonitor;
return true;
}
#if FUNCTIONS_V4
public override bool TryGetTargetScaler(
string functionId,
string functionName,
string hubName,
string connectionName,
out ITargetScaler targetScaler)
{
if (this.targetScaler == null)
{
var sqlMetricsProvider = new SqlMetricsProvider(this.service);
this.targetScaler = new SqlTargetScaler(hubName, sqlMetricsProvider);
}
targetScaler = this.targetScaler;
return true;
}
#endif
}
}