
Commit 55e52e2

Add docs-assembler deploy [plan|apply] commands (#1334)

* Add `docs-assembler deploy [plan|apply]` commands. This gives us the ability to analyze the plan and identify which files are being uploaded, deleted, or skipped.
* Fix path for Windows
* Format
* Reduce logs
* Cleanup
* Make sure to ignore symlinked files
* Fix .DestinationPath on Windows
* Fix DestinationPath 2
* Enhance test
* Reduce logs
* Revert "Reduce logs" (reverts commit 38f26bc)
* Cleanup
* Use FakeItEasy instead of Moq

1 parent 1fa284a · commit 55e52e2
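In practice the two commands chain: `deploy plan` writes a JSON plan file, which `deploy apply` then consumes. A hypothetical invocation would be `docs-assembler deploy plan --environment <env> --s3-bucket-name <bucket> --out plan.json` followed by `docs-assembler deploy apply --environment <env> --s3-bucket-name <bucket> --plan-file plan.json`; the flag spellings here are inferred from the method parameter names below, and the exact binding is determined by ConsoleAppFramework's conventions.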

8 files changed: +603 −0 lines changed

Directory.Packages.props

Lines changed: 1 addition & 0 deletions

```diff
@@ -16,6 +16,7 @@
     <PackageVersion Include="AWSSDK.Core" Version="4.0.0.2" />
     <PackageVersion Include="AWSSDK.SQS" Version="4.0.0.1" />
     <PackageVersion Include="AWSSDK.S3" Version="4.0.0.1" />
+    <PackageVersion Include="FakeItEasy" Version="8.3.0" />
     <PackageVersion Include="Elastic.Ingest.Elasticsearch" Version="0.11.3" />
   </ItemGroup>
   <!-- Build -->
```
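The `FakeItEasy` package is pulled in for the enhanced tests mentioned in the commit message ("Use FakeItEasy instead of Moq"). Those tests are not rendered on this page; a minimal sketch of how the planner could be exercised against a faked `IAmazonS3` (setup and values are illustrative, not the commit's actual test code) might look like:

```csharp
using System.Collections.Generic;
using System.Threading;
using Amazon.S3;
using Amazon.S3.Model;
using FakeItEasy;

// Illustrative only: fake the S3 listing so a plan can be computed
// without touching AWS.
var s3Client = A.Fake<IAmazonS3>();
A.CallTo(() => s3Client.ListObjectsV2Async(A<ListObjectsV2Request>._, A<CancellationToken>._))
    .Returns(new ListObjectsV2Response
    {
        S3Objects = new List<S3Object>
        {
            new() { Key = "index.html", ETag = "\"d41d8cd98f00b204e9800998ecf8427e\"" }
        },
        IsTruncated = false
    });
// A planner built over this fake will classify every local file as an
// add, update, or skip against the single remote object above.
```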
Lines changed: 98 additions & 0 deletions (new file: class `DeployCommands`, namespace `Documentation.Assembler.Cli`)

```csharp
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Diagnostics.CodeAnalysis;
using System.IO.Abstractions;
using System.Text.Json;
using Actions.Core.Services;
using Amazon.S3;
using Amazon.S3.Transfer;
using ConsoleAppFramework;
using Documentation.Assembler.Deploying;
using Elastic.Documentation.Tooling.Diagnostics.Console;
using Microsoft.Extensions.Logging;

namespace Documentation.Assembler.Cli;

internal sealed class DeployCommands(ILoggerFactory logger, ICoreService githubActionsService)
{
    [SuppressMessage("Usage", "CA2254:Template should be a static expression")]
    private void AssignOutputLogger()
    {
        var log = logger.CreateLogger<Program>();
        ConsoleApp.Log = msg => log.LogInformation(msg);
        ConsoleApp.LogError = msg => log.LogError(msg);
    }

    /// <summary> Creates a sync plan </summary>
    /// <param name="environment"> The environment to build</param>
    /// <param name="s3BucketName">The S3 bucket name to deploy to</param>
    /// <param name="out"> The file to write the plan to</param>
    /// <param name="ctx"></param>
    public async Task<int> Plan(
        string environment, string s3BucketName, string @out = "", Cancel ctx = default)
    {
        AssignOutputLogger();
        await using var collector = new ConsoleDiagnosticsCollector(logger, githubActionsService)
        {
            NoHints = true
        }.StartAsync(ctx);
        var assembleContext = new AssembleContext(environment, collector, new FileSystem(), new FileSystem(), null, null);
        var s3Client = new AmazonS3Client();
        IDocsSyncPlanStrategy planner = new AwsS3SyncPlanStrategy(s3Client, s3BucketName, assembleContext, logger);
        var plan = await planner.Plan(ctx);
        ConsoleApp.Log("Total files to sync: " + plan.Count);
        ConsoleApp.Log("Total files to delete: " + plan.DeleteRequests.Count);
        ConsoleApp.Log("Total files to add: " + plan.AddRequests.Count);
        ConsoleApp.Log("Total files to update: " + plan.UpdateRequests.Count);
        ConsoleApp.Log("Total files to skip: " + plan.SkipRequests.Count);
        if (!string.IsNullOrEmpty(@out))
        {
            var output = SyncPlan.Serialize(plan);
            await using var fileStream = new FileStream(@out, FileMode.Create, FileAccess.Write);
            await using var writer = new StreamWriter(fileStream);
            await writer.WriteAsync(output);
            ConsoleApp.Log("Plan written to " + @out);
        }
        await collector.StopAsync(ctx);
        return collector.Errors;
    }

    /// <summary> Applies a sync plan </summary>
    /// <param name="environment"> The environment to build</param>
    /// <param name="s3BucketName">The S3 bucket name to deploy to</param>
    /// <param name="planFile">The path to the plan file to apply</param>
    /// <param name="ctx"></param>
    public async Task<int> Apply(
        string environment,
        string s3BucketName,
        string planFile,
        Cancel ctx = default)
    {
        AssignOutputLogger();
        await using var collector = new ConsoleDiagnosticsCollector(logger, githubActionsService)
        {
            NoHints = true
        }.StartAsync(ctx);
        var assembleContext = new AssembleContext(environment, collector, new FileSystem(), new FileSystem(), null, null);
        var s3Client = new AmazonS3Client();
        var transferUtility = new TransferUtility(s3Client, new TransferUtilityConfig
        {
            ConcurrentServiceRequests = Environment.ProcessorCount * 2,
            MinSizeBeforePartUpload = AwsS3SyncPlanStrategy.PartSize
        });
        IDocsSyncApplyStrategy applier = new AwsS3SyncApplyStrategy(s3Client, transferUtility, s3BucketName, assembleContext, logger, collector);
        if (!File.Exists(planFile))
        {
            collector.EmitError(planFile, "Plan file does not exist.");
            await collector.StopAsync(ctx);
            return collector.Errors;
        }
        var planJson = await File.ReadAllTextAsync(planFile, ctx);
        var plan = SyncPlan.Deserialize(planJson);
        await applier.Apply(plan, ctx);
        await collector.StopAsync(ctx);
        return collector.Errors;
    }
}
```
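`SyncPlan` and the request types are not among the diffs rendered on this page. From their usage in `DeployCommands` and the two strategy classes (a `Count` plus four request lists, static `Serialize`/`Deserialize` helpers, and the `Cast<UploadRequest>()` over add and update requests), a shape consistent with these call sites might be the following sketch; all property and type details are assumptions:

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical reconstruction from call sites in this commit; the actual
// definitions live in changed files not rendered on this page.
public abstract record UploadRequest
{
    public required string LocalPath { get; init; }
    public required string DestinationPath { get; init; }
}

public sealed record AddRequest : UploadRequest;
public sealed record UpdateRequest : UploadRequest;
public sealed record SkipRequest : UploadRequest;

public sealed record DeleteRequest
{
    public required string DestinationPath { get; init; }
}

public sealed record SyncPlan
{
    public required int Count { get; init; }
    public required IReadOnlyList<AddRequest> AddRequests { get; init; }
    public required IReadOnlyList<UpdateRequest> UpdateRequests { get; init; }
    public required IReadOnlyList<SkipRequest> SkipRequests { get; init; }
    public required IReadOnlyList<DeleteRequest> DeleteRequests { get; init; }

    // DeployCommands round-trips plans through these helpers; JSON via
    // System.Text.Json is a guess based on the using directives above.
    public static string Serialize(SyncPlan plan) => JsonSerializer.Serialize(plan);
    public static SyncPlan Deserialize(string json) => JsonSerializer.Deserialize<SyncPlan>(json)!;
}
```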
Lines changed: 109 additions & 0 deletions (new file: class `AwsS3SyncApplyStrategy`, namespace `Documentation.Assembler.Deploying`)

```csharp
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using Amazon.S3;
using Amazon.S3.Model;
using Amazon.S3.Transfer;
using Elastic.Documentation.Diagnostics;
using Microsoft.Extensions.Logging;

namespace Documentation.Assembler.Deploying;

public class AwsS3SyncApplyStrategy(
    IAmazonS3 s3Client,
    ITransferUtility transferUtility,
    string bucketName,
    AssembleContext context,
    ILoggerFactory loggerFactory,
    DiagnosticsCollector collector) : IDocsSyncApplyStrategy
{
    private readonly ILogger<AwsS3SyncApplyStrategy> _logger = loggerFactory.CreateLogger<AwsS3SyncApplyStrategy>();

    private void DisplayProgress(object? sender, UploadDirectoryProgressArgs args) => LogProgress(_logger, args, null);

    private static readonly Action<ILogger, UploadDirectoryProgressArgs, Exception?> LogProgress = LoggerMessage.Define<UploadDirectoryProgressArgs>(
        LogLevel.Information,
        new EventId(2, nameof(LogProgress)),
        "{Args}");

    public async Task Apply(SyncPlan plan, Cancel ctx = default)
    {
        await Upload(plan, ctx);
        await Delete(plan, ctx);
    }

    private async Task Upload(SyncPlan plan, Cancel ctx)
    {
        var uploadRequests = plan.AddRequests.Cast<UploadRequest>().Concat(plan.UpdateRequests).ToList();
        if (uploadRequests.Count > 0)
        {
            _logger.LogInformation("Starting to process {Count} uploads using directory upload", uploadRequests.Count);
            var tempDir = Path.Combine(context.WriteFileSystem.Path.GetTempPath(), context.WriteFileSystem.Path.GetRandomFileName());
            _ = context.WriteFileSystem.Directory.CreateDirectory(tempDir);
            try
            {
                _logger.LogInformation("Copying {Count} files to temp directory", uploadRequests.Count);
                foreach (var upload in uploadRequests)
                {
                    var destPath = context.WriteFileSystem.Path.Combine(tempDir, upload.DestinationPath);
                    var destDirPath = context.WriteFileSystem.Path.GetDirectoryName(destPath)!;
                    _ = context.WriteFileSystem.Directory.CreateDirectory(destDirPath);
                    context.WriteFileSystem.File.Copy(upload.LocalPath, destPath);
                }
                var directoryRequest = new TransferUtilityUploadDirectoryRequest
                {
                    BucketName = bucketName,
                    Directory = tempDir,
                    SearchPattern = "*",
                    SearchOption = SearchOption.AllDirectories,
                    UploadFilesConcurrently = true
                };
                directoryRequest.UploadDirectoryProgressEvent += DisplayProgress;
                _logger.LogInformation("Uploading {Count} files to S3", uploadRequests.Count);
                _logger.LogDebug("Starting directory upload from {TempDir}", tempDir);
                await transferUtility.UploadDirectoryAsync(directoryRequest, ctx);
                _logger.LogDebug("Directory upload completed");
            }
            finally
            {
                // Clean up temp directory
                if (context.WriteFileSystem.Directory.Exists(tempDir))
                    context.WriteFileSystem.Directory.Delete(tempDir, true);
            }
        }
    }

    private async Task Delete(SyncPlan plan, Cancel ctx)
    {
        var deleteCount = 0;
        var deleteRequests = plan.DeleteRequests.ToList();
        if (deleteRequests.Count > 0)
        {
            // Process deletes in batches of 1000 (AWS S3 limit)
            foreach (var batch in deleteRequests.Chunk(1000))
            {
                var deleteObjectsRequest = new DeleteObjectsRequest
                {
                    BucketName = bucketName,
                    Objects = batch.Select(d => new KeyVersion
                    {
                        Key = d.DestinationPath
                    }).ToList()
                };
                var response = await s3Client.DeleteObjectsAsync(deleteObjectsRequest, ctx);
                if (response.HttpStatusCode != System.Net.HttpStatusCode.OK)
                {
                    foreach (var error in response.DeleteErrors)
                        collector.EmitError(error.Key, $"Failed to delete: {error.Message}");
                }
                else
                {
                    var newCount = Interlocked.Add(ref deleteCount, batch.Length);
                    _logger.LogInformation("Deleted {Count} objects ({DeleteCount}/{TotalDeleteCount})",
                        batch.Length, newCount, deleteRequests.Count);
                }
            }
        }
    }
}
```
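The `IDocsSyncPlanStrategy` and `IDocsSyncApplyStrategy` interfaces are likewise not visible on this page. Judging only from the call sites in `DeployCommands`, they would reduce to something like the sketch below; `Cancel` appears to be this codebase's alias for `CancellationToken`:

```csharp
// Hypothetical shapes inferred from usage; the actual definitions are in
// changed files not rendered on this page.
public interface IDocsSyncPlanStrategy
{
    Task<SyncPlan> Plan(Cancel ctx = default);
}

public interface IDocsSyncApplyStrategy
{
    Task Apply(SyncPlan plan, Cancel ctx = default);
}
```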
Lines changed: 165 additions & 0 deletions (new file: class `AwsS3SyncPlanStrategy`, namespace `Documentation.Assembler.Deploying`)

```csharp
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using System.Security.Cryptography;
using Amazon.S3;
using Amazon.S3.Model;
using Microsoft.Extensions.Logging;

namespace Documentation.Assembler.Deploying;

public class AwsS3SyncPlanStrategy(IAmazonS3 s3Client, string bucketName, AssembleContext context, ILoggerFactory loggerFactory) : IDocsSyncPlanStrategy
{
    internal const long PartSize = 5 * 1024 * 1024; // 5MB
    private readonly ILogger<AwsS3SyncPlanStrategy> _logger = loggerFactory.CreateLogger<AwsS3SyncPlanStrategy>();
    private static readonly ConcurrentDictionary<string, string> EtagCache = new();

    private bool IsSymlink(string path)
    {
        var fileInfo = context.ReadFileSystem.FileInfo.New(path);
        return fileInfo.LinkTarget != null;
    }

    public async Task<SyncPlan> Plan(Cancel ctx = default)
    {
        var remoteObjects = await ListObjects(ctx);
        var localObjects = context.OutputDirectory.GetFiles("*", SearchOption.AllDirectories)
            .Where(f => !IsSymlink(f.FullName))
            .ToArray();
        var deleteRequests = new ConcurrentBag<DeleteRequest>();
        var addRequests = new ConcurrentBag<AddRequest>();
        var updateRequests = new ConcurrentBag<UpdateRequest>();
        var skipRequests = new ConcurrentBag<SkipRequest>();

        await Parallel.ForEachAsync(localObjects, ctx, async (localFile, token) =>
        {
            var relativePath = Path.GetRelativePath(context.OutputDirectory.FullName, localFile.FullName);
            var destinationPath = relativePath.Replace('\\', '/');

            if (remoteObjects.TryGetValue(destinationPath, out var remoteObject))
            {
                // Check if the ETag differs for updates
                var localETag = await CalculateS3ETag(localFile.FullName, token);
                var remoteETag = remoteObject.ETag.Trim('"'); // Remove quotes from remote ETag
                if (localETag == remoteETag)
                {
                    var skipRequest = new SkipRequest
                    {
                        LocalPath = localFile.FullName,
                        DestinationPath = remoteObject.Key
                    };
                    skipRequests.Add(skipRequest);
                }
                else
                {
                    var updateRequest = new UpdateRequest()
                    {
                        LocalPath = localFile.FullName,
                        DestinationPath = remoteObject.Key
                    };
                    updateRequests.Add(updateRequest);
                }
            }
            else
            {
                var addRequest = new AddRequest
                {
                    LocalPath = localFile.FullName,
                    DestinationPath = destinationPath
                };
                addRequests.Add(addRequest);
            }
        });

        // Find deletions (files in S3 but not locally)
        foreach (var remoteObject in remoteObjects)
        {
            var localPath = Path.Combine(context.OutputDirectory.FullName, remoteObject.Key.Replace('/', Path.DirectorySeparatorChar));
            if (context.ReadFileSystem.File.Exists(localPath))
                continue;
            var deleteRequest = new DeleteRequest
            {
                DestinationPath = remoteObject.Key
            };
            deleteRequests.Add(deleteRequest);
        }

        return new SyncPlan
        {
            DeleteRequests = deleteRequests.ToList(),
            AddRequests = addRequests.ToList(),
            UpdateRequests = updateRequests.ToList(),
            SkipRequests = skipRequests.ToList(),
            Count = deleteRequests.Count + addRequests.Count + updateRequests.Count + skipRequests.Count
        };
    }

    private async Task<Dictionary<string, S3Object>> ListObjects(Cancel ctx = default)
    {
        var listBucketRequest = new ListObjectsV2Request
        {
            BucketName = bucketName,
            MaxKeys = 1000,
        };
        var objects = new List<S3Object>();
        ListObjectsV2Response response;
        do
        {
            response = await s3Client.ListObjectsV2Async(listBucketRequest, ctx);
            objects.AddRange(response.S3Objects);
            listBucketRequest.ContinuationToken = response?.NextContinuationToken;
        } while (response?.IsTruncated == true);

        return objects.ToDictionary(o => o.Key);
    }

    [SuppressMessage("Security", "CA5351:Do Not Use Broken Cryptographic Algorithms")]
    private async Task<string> CalculateS3ETag(string filePath, Cancel ctx = default)
    {
        if (EtagCache.TryGetValue(filePath, out var cachedEtag))
        {
            _logger.LogDebug("Using cached ETag for {Path}", filePath);
            return cachedEtag;
        }

        var fileInfo = context.ReadFileSystem.FileInfo.New(filePath);
        var fileSize = fileInfo.Length;

        // For files under 5MB, use simple MD5 (matching TransferUtility behavior)
        if (fileSize <= PartSize)
        {
            await using var stream = context.ReadFileSystem.FileStream.New(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);
            var smallBuffer = new byte[fileSize];
            var bytesRead = await stream.ReadAsync(smallBuffer.AsMemory(0, (int)fileSize), ctx);
            var hash = MD5.HashData(smallBuffer.AsSpan(0, bytesRead));
            var etag = Convert.ToHexStringLower(hash);
            EtagCache[filePath] = etag;
            return etag;
        }

        // For files over 5MB, use multipart format with 5MB parts (matching TransferUtility)
        var parts = (int)Math.Ceiling((double)fileSize / PartSize);

        await using var fileStream = context.ReadFileSystem.FileStream.New(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);
        var partBuffer = new byte[PartSize];
        var partHashes = new List<byte[]>();

        for (var i = 0; i < parts; i++)
        {
            var bytesRead = await fileStream.ReadAsync(partBuffer.AsMemory(0, partBuffer.Length), ctx);
            var partHash = MD5.HashData(partBuffer.AsSpan(0, bytesRead));
            partHashes.Add(partHash);
        }

        // Concatenate all part hashes
        var concatenatedHashes = partHashes.SelectMany(h => h).ToArray();
        var finalHash = MD5.HashData(concatenatedHashes);

        var multipartEtag = $"{Convert.ToHexStringLower(finalHash)}-{parts}";
        EtagCache[filePath] = multipartEtag;
        return multipartEtag;
    }
}
```
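The ETag scheme above mirrors how S3 reports ETags for objects uploaded by `TransferUtility`: a plain MD5 hex digest for objects at or under the part size, and for larger objects the MD5 of the concatenated per-part MD5 digests, suffixed with the part count. A standalone sketch of the same computation over an in-memory buffer (hypothetical helper, plain LINQ instead of the file-system abstractions used above):

```csharp
using System;
using System.Linq;
using System.Security.Cryptography;

// Standalone illustration of the ETag logic in CalculateS3ETag.
// A 12 MB buffer splits into parts of 5 MB, 5 MB, and 2 MB, so the
// result looks like "<hex(md5(md5(p1) + md5(p2) + md5(p3)))>-3".
static string S3ETag(byte[] data, int partSize = 5 * 1024 * 1024)
{
    if (data.Length <= partSize)
        return Convert.ToHexString(MD5.HashData(data)).ToLowerInvariant();

    var concatenatedPartHashes = data
        .Chunk(partSize)                    // split into partSize-byte parts
        .Select(part => MD5.HashData(part)) // MD5 of each part
        .SelectMany(hash => hash)
        .ToArray();                         // concatenated part digests
    var parts = (int)Math.Ceiling((double)data.Length / partSize);
    return $"{Convert.ToHexString(MD5.HashData(concatenatedPartHashes)).ToLowerInvariant()}-{parts}";
}

Console.WriteLine(S3ETag(new byte[12 * 1024 * 1024])); // ends in "-3"
```

Computing the same multipart digest locally is what lets the planner classify an unchanged file as a skip without downloading anything: the local digest is compared against the quoted ETag returned by `ListObjectsV2`.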
