Skip to content

Added thread pool for unpacking #174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: dev/v3.17.x.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,32 @@ namespace PatchKit.Unity.Patcher.AppData.Local
{
public class MapHashExtractedFiles
{
private Dictionary<string, string> MapHash;
private volatile Dictionary<string, string> _mapHash;

public MapHashExtractedFiles()
{
MapHash = new Dictionary<string, string>();
_mapHash = new Dictionary<string, string>();
}

public string Add(string path)
{
string nameHash = HashCalculator.ComputeMD5Hash(path);
MapHash.Add(path, nameHash);
lock (_mapHash)
{
_mapHash.Add(path, nameHash);
}

return nameHash;
}

public bool TryGetHash(string path,out string nameHash)
{
if (MapHash.TryGetValue(path, out nameHash))
lock (_mapHash)
{
return true;
if (_mapHash.TryGetValue(path, out nameHash))
{
return true;
}
}

return false;
Expand Down
44 changes: 28 additions & 16 deletions Assets/PatchKit Patcher/Scripts/AppData/Local/Pack1Unarchiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
using PatchKit.Unity.Patcher.Data;
using PatchKit.Unity.Patcher.Debug;
using PatchKit.Unity.Utilities;
using SharpCompress.Compressors.LZMA;
using SharpCompress.Compressors.Xz;
using SharpRaven;
using SharpRaven.Data;
using SharpRaven.Utilities;
using FileMode = System.IO.FileMode;

namespace PatchKit.Unity.Patcher.AppData.Local
{
Expand Down Expand Up @@ -41,7 +40,7 @@ public class Pack1Unarchiver : IUnarchiver
/// </summary>
private readonly BytesRange _range;

private MapHashExtractedFiles _mapHashExtractedFiles;
private volatile MapHashExtractedFiles _mapHashExtractedFiles;

public event UnarchiveProgressChangedHandler UnarchiveProgressChanged;

Expand Down Expand Up @@ -113,7 +112,7 @@ public void Unarchive(CancellationToken cancellationToken)
Unpack(file, progress =>
{
OnUnarchiveProgressChanged(currentFile.Name, currentFile.Type == Pack1Meta.RegularFileType, currentEntry, _metaData.Files.Length, progress);
}, cancellationToken);
}, cancellationToken, false);
}
else
{
Expand All @@ -124,6 +123,8 @@ public void Unarchive(CancellationToken cancellationToken)

entry++;
}

ThreadingPool.WaitOne(cancellationToken);
DebugLogger.Log("Unpacking finished succesfully!");
}

Expand All @@ -136,7 +137,7 @@ public void UnarchiveSingleFile(Pack1Meta.FileEntry file, CancellationToken canc
throw new ArgumentOutOfRangeException("file", file, null);
}

Unpack(file, progress => OnUnarchiveProgressChanged(file.Name, file.Type == Pack1Meta.RegularFileType, 1, 1, progress), cancellationToken, destinationDirPath);
Unpack(file, progress => OnUnarchiveProgressChanged(file.Name, file.Type == Pack1Meta.RegularFileType, 1, 1, progress), cancellationToken, true, destinationDirPath);

OnUnarchiveProgressChanged(file.Name, file.Type == Pack1Meta.RegularFileType, 0, 1, 1.0);
}
Expand All @@ -156,14 +157,24 @@ private bool CanUnpack(Pack1Meta.FileEntry file)
return file.Offset >= _range.Start && file.Offset + file.Size <= _range.End;
}

private void Unpack(Pack1Meta.FileEntry file, Action<double> progress, CancellationToken cancellationToken, string destinationDirPath = null)
private void Unpack(Pack1Meta.FileEntry file, Action<double> progress, CancellationToken cancellationToken, bool canUseThreadPool, string destinationDirPath = null)
{
switch (file.Type)
{
case Pack1Meta.RegularFileType:
try
{
UnpackRegularFile(file, progress, cancellationToken, destinationDirPath);
progress(0.0);
if (file.Size.Value < 524288 && !canUseThreadPool)
{
ThreadingPool.ThreadingPoolExecute(cancellationToken,
() => UnpackRegularFile(file, cancellationToken, destinationDirPath));
}
else
{
UnpackRegularFile(file, cancellationToken, destinationDirPath);
}
progress(1.0);
}
catch (Ionic.Zlib.ZlibException e)
{
Expand Down Expand Up @@ -232,9 +243,15 @@ private DecompressorCreator ResolveDecompressor(Pack1Meta meta)
}
}

private void UnpackRegularFile(Pack1Meta.FileEntry file, Action<double> onProgress, CancellationToken cancellationToken, string destinationDirPath = null)
private void UnpackRegularFile(Pack1Meta.FileEntry file, CancellationToken cancellationToken, string destinationDirPath = null)
{
string destPath = Path.Combine(destinationDirPath == null ? _destinationDirPath : destinationDirPath, _mapHashExtractedFiles.Add(file.Name) + _suffix);
string fileRealName;
lock (_mapHashExtractedFiles)
{
fileRealName = _mapHashExtractedFiles.Add(file.Name) + _suffix;
}

string destPath = Path.Combine(destinationDirPath == null ? _destinationDirPath : destinationDirPath, fileRealName);

DebugLogger.LogFormat("Unpacking regular file {0} to {1}", file, destPath);

Expand Down Expand Up @@ -269,7 +286,7 @@ private void UnpackRegularFile(Pack1Meta.FileEntry file, Action<double> onProgre

DecompressorCreator decompressorCreator = ResolveDecompressor(_metaData);

using (var fs = new FileStream(_packagePath, FileMode.Open))
using (var fs = new FileStream(_packagePath, FileMode.Open, FileAccess.Read, FileShare.Read))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change related to the thread pool support?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

{
fs.Seek(file.Offset.Value - _range.Start, SeekOrigin.Begin);

Expand All @@ -279,7 +296,7 @@ private void UnpackRegularFile(Pack1Meta.FileEntry file, Action<double> onProgre
{
using (var target = new FileStream(destPath, FileMode.Create))
{
ExtractFileFromStream(limitedStream, target, file.Size.Value, decryptor, decompressorCreator, onProgress, cancellationToken);
ExtractFileFromStream(limitedStream, target, decryptor, decompressorCreator, cancellationToken);
DebugTestCorruption(target);
}
}
Expand Down Expand Up @@ -338,10 +355,8 @@ private Stream CreateGzipDecompressor(Stream source)
private void ExtractFileFromStream(
Stream sourceStream,
Stream targetStream,
long fileSize,
ICryptoTransform decryptor,
DecompressorCreator createDecompressor,
Action<double> onProgress,
CancellationToken cancellationToken)
{
using (var cryptoStream = new CryptoStream(sourceStream, decryptor, CryptoStreamMode.Read))
Expand All @@ -364,9 +379,6 @@ private void ExtractFileFromStream(
{
cancellationToken.ThrowIfCancellationRequested();
targetStream.Write(buffer, 0, count);

long bytesProcessed = sourceStream.Position;
onProgress(bytesProcessed / (double) fileSize);
}
}
catch (OperationCanceledException)
Expand Down
53 changes: 53 additions & 0 deletions Assets/PatchKit Patcher/Scripts/Utilities/ThreadingPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using System;
using System.Threading;
using PatchKit.Unity.Patcher.Cancellation;
using PatchKit.Unity.Patcher.Debug;

namespace PatchKit.Unity.Utilities
{
public static class ThreadingPool
{
private static readonly DebugLogger DebugLogger = new DebugLogger(typeof(ThreadingPool));

private static Semaphore _pool = new Semaphore(16, 17);
private static ManualResetEvent _awaiter = new ManualResetEvent(false);

public static void ThreadingPoolExecute(CancellationToken cancellationToken, Action action)
{
_pool.WaitOne();
ThreadPool.QueueUserWorkItem(state => ThreadingPoolProc(cancellationToken, action));
}

private static void ThreadingPoolProc(CancellationToken cancellationToken, Action action)
{
try
{
action();
}
catch (Exception e)
{
DebugLogger.LogError(e.ToString());
throw;
}
finally
{
int release = _pool.Release();
if (release == 16 || cancellationToken.IsCancelled)
{
_awaiter.Set();
}
}
}

public static void WaitOne(CancellationToken cancellationToken)
{
var release = _pool.Release();
if (release == 16 || cancellationToken.IsCancelled)
{
_awaiter.Set();
}

_awaiter.WaitOne();
}
}
}
11 changes: 11 additions & 0 deletions Assets/PatchKit Patcher/Scripts/Utilities/ThreadingPool.cs.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.