-
Notifications
You must be signed in to change notification settings - Fork 493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize downloading depot chunks and manifests #1404
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
3cbccbb
Deserialize depot manifests directly from deflate stream
xPaw 80e3fc9
Fix nameof
xPaw 2f28a25
Stream manifest to rented buffer
xPaw 6e1fce4
Optimize downloading depot chunks
xPaw 901cff6
Decrypt depot chunks into pooled buffer
xPaw 0863415
Change chunk downloading to use a destination buffer given by the con…
xPaw a93a308
Fix tests
xPaw File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,9 @@ | |
*/ | ||
|
||
using System; | ||
using System.Buffers; | ||
using System.IO; | ||
using System.IO.Compression; | ||
using System.Net.Http; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
@@ -58,7 +60,7 @@ public void Dispose() | |
/// <param name="server">The content server to connect to.</param> | ||
/// <param name="depotKey"> | ||
/// The depot decryption key for the depot that will be downloaded. | ||
/// This is used for decrypting filenames (if needed) in depot manifests, and processing depot chunks. | ||
/// This is used for decrypting filenames (if needed) in depot manifests. | ||
/// </param> | ||
/// <param name="proxyServer">Optional content server marked as UseAsProxy which transforms the request.</param> | ||
/// <returns>A <see cref="DepotManifest"/> instance that contains information about the files present within a depot.</returns> | ||
|
@@ -81,11 +83,65 @@ public async Task<DepotManifest> DownloadManifestAsync( uint depotId, ulong mani | |
url = $"depot/{depotId}/manifest/{manifestId}/{MANIFEST_VERSION}"; | ||
} | ||
|
||
var manifestData = await DoRawCommandAsync( server, url, proxyServer ).ConfigureAwait( false ); | ||
using var request = new HttpRequestMessage( HttpMethod.Get, BuildCommand( server, url, proxyServer ) ); | ||
|
||
manifestData = ZipUtil.Decompress( manifestData ); | ||
using var cts = new CancellationTokenSource(); | ||
cts.CancelAfter( RequestTimeout ); | ||
|
||
DepotManifest depotManifest; | ||
|
||
try | ||
{ | ||
using var response = await httpClient.SendAsync( request, HttpCompletionOption.ResponseHeadersRead, cts.Token ).ConfigureAwait( false ); | ||
|
||
if ( !response.IsSuccessStatusCode ) | ||
{ | ||
throw new SteamKitWebRequestException( $"Response status code does not indicate success: {response.StatusCode:D} ({response.ReasonPhrase}).", response ); | ||
} | ||
|
||
if ( !response.Content.Headers.ContentLength.HasValue ) | ||
{ | ||
throw new SteamKitWebRequestException( "Response does not have Content-Length", response ); | ||
} | ||
|
||
cts.CancelAfter( ResponseBodyTimeout ); | ||
|
||
var contentLength = ( int )response.Content.Headers.ContentLength; | ||
var buffer = ArrayPool<byte>.Shared.Rent( contentLength ); | ||
|
||
var depotManifest = new DepotManifest( manifestData ); | ||
try | ||
{ | ||
using var ms = new MemoryStream( buffer, 0, contentLength ); | ||
|
||
// Stream the http response into the rented buffer | ||
await response.Content.CopyToAsync( ms, cts.Token ); | ||
|
||
if ( ms.Position != contentLength ) | ||
{ | ||
throw new InvalidDataException( $"Length mismatch after downloading depot manifest! (was {ms.Position}, but should be {contentLength})" ); | ||
} | ||
|
||
ms.Position = 0; | ||
|
||
// Decompress the zipped manifest data | ||
using var zip = new ZipArchive( ms ); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ZipArchive needs a seekable stream fyi (otherwise it literally copies the stream into its own temp MemoryStream). |
||
var entries = zip.Entries; | ||
|
||
DebugLog.Assert( entries.Count == 1, nameof( CDN ), "Expected the zip to contain only one file" ); | ||
|
||
using var zipEntryStream = entries[ 0 ].Open(); | ||
depotManifest = DepotManifest.Deserialize( zipEntryStream ); | ||
} | ||
finally | ||
{ | ||
ArrayPool<byte>.Shared.Return( buffer ); | ||
} | ||
} | ||
catch ( Exception ex ) | ||
{ | ||
DebugLog.WriteLine( nameof( CDN ), $"Failed to download manifest {request.RequestUri}: {ex.Message}" ); | ||
throw; | ||
} | ||
yaakov-h marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if ( depotKey != null ) | ||
{ | ||
|
@@ -108,53 +164,51 @@ public async Task<DepotManifest> DownloadManifestAsync( uint depotId, ulong mani | |
/// A <see cref="DepotManifest.ChunkData"/> instance that represents the chunk to download. | ||
/// This value should come from a manifest downloaded with <see cref="o:DownloadManifestAsync"/>. | ||
/// </param> | ||
/// <returns>A <see cref="DepotChunk"/> instance that contains the data for the given chunk.</returns> | ||
/// <returns>The total number of bytes written to <paramref name="destination" />.</returns> | ||
/// <param name="server">The content server to connect to.</param> | ||
/// <param name="destination"> | ||
/// The buffer to receive the chunk data. If <paramref name="depotKey"/> is provided, this will be the decompressed buffer. | ||
/// Allocate or rent a buffer that is equal or longer than <see cref="DepotManifest.ChunkData.UncompressedLength"/> | ||
/// </param> | ||
/// <param name="depotKey"> | ||
/// The depot decryption key for the depot that will be downloaded. | ||
/// This is used for decrypting filenames (if needed) in depot manifests, and processing depot chunks. | ||
/// This is used to process the chunk data. | ||
/// </param> | ||
/// <param name="proxyServer">Optional content server marked as UseAsProxy which transforms the request.</param> | ||
/// <exception cref="System.ArgumentNullException">chunk's <see cref="DepotManifest.ChunkData.ChunkID"/> was null.</exception> | ||
/// <exception cref="System.IO.InvalidDataException">Thrown if the downloaded data does not match the expected length.</exception> | ||
/// <exception cref="HttpRequestException">An network error occurred when performing the request.</exception> | ||
/// <exception cref="SteamKitWebRequestException">A network error occurred when performing the request.</exception> | ||
public async Task<DepotChunk> DownloadDepotChunkAsync( uint depotId, DepotManifest.ChunkData chunk, Server server, byte[]? depotKey = null, Server? proxyServer = null ) | ||
public async Task<int> DownloadDepotChunkAsync( uint depotId, DepotManifest.ChunkData chunk, Server server, byte[] destination, byte[]? depotKey = null, Server? proxyServer = null ) | ||
{ | ||
ArgumentNullException.ThrowIfNull( server ); | ||
|
||
ArgumentNullException.ThrowIfNull( chunk ); | ||
ArgumentNullException.ThrowIfNull( destination ); | ||
|
||
if ( chunk.ChunkID == null ) | ||
{ | ||
throw new ArgumentException( "Chunk must have a ChunkID.", nameof( chunk ) ); | ||
throw new ArgumentException( $"Chunk must have a {nameof( DepotManifest.ChunkData.ChunkID )}.", nameof( chunk ) ); | ||
} | ||
|
||
var chunkID = Utils.EncodeHexString( chunk.ChunkID ); | ||
|
||
var chunkData = await DoRawCommandAsync( server, string.Format( "depot/{0}/chunk/{1}", depotId, chunkID ), proxyServer ).ConfigureAwait( false ); | ||
|
||
// assert that lengths match only if the chunk has a length assigned. | ||
if ( chunk.CompressedLength > 0 && chunkData.Length != chunk.CompressedLength ) | ||
if ( depotKey == null ) | ||
{ | ||
throw new InvalidDataException( $"Length mismatch after downloading depot chunk! (was {chunkData.Length}, but should be {chunk.CompressedLength})" ); | ||
if ( destination.Length < chunk.CompressedLength ) | ||
{ | ||
throw new ArgumentException( $"The destination buffer must be longer than the chunk {nameof( DepotManifest.ChunkData.CompressedLength )} (since no depot key was provided).", nameof( destination ) ); | ||
} | ||
} | ||
|
||
var depotChunk = new DepotChunk( chunk, chunkData ); | ||
|
||
if ( depotKey != null ) | ||
else | ||
{ | ||
// if we have the depot key, we can process the chunk immediately | ||
depotChunk.Process( depotKey ); | ||
if ( destination.Length < chunk.UncompressedLength ) | ||
{ | ||
throw new ArgumentException( $"The destination buffer must be longer than the chunk {nameof( DepotManifest.ChunkData.UncompressedLength )}.", nameof( destination ) ); | ||
} | ||
} | ||
|
||
return depotChunk; | ||
} | ||
var chunkID = Utils.EncodeHexString( chunk.ChunkID ); | ||
var url = $"depot/{depotId}/chunk/{chunkID}"; | ||
|
||
async Task<byte[]> DoRawCommandAsync( Server server, string command, Server? proxyServer ) | ||
{ | ||
var url = BuildCommand( server, command, proxyServer ); | ||
using var request = new HttpRequestMessage( HttpMethod.Get, url ); | ||
using var request = new HttpRequestMessage( HttpMethod.Get, BuildCommand( server, url, proxyServer ) ); | ||
|
||
using var cts = new CancellationTokenSource(); | ||
cts.CancelAfter( RequestTimeout ); | ||
|
@@ -168,13 +222,65 @@ async Task<byte[]> DoRawCommandAsync( Server server, string command, Server? pro | |
throw new SteamKitWebRequestException( $"Response status code does not indicate success: {response.StatusCode:D} ({response.ReasonPhrase}).", response ); | ||
} | ||
|
||
if ( !response.Content.Headers.ContentLength.HasValue ) | ||
{ | ||
throw new SteamKitWebRequestException( "Response does not have Content-Length", response ); | ||
} | ||
|
||
var contentLength = ( int )response.Content.Headers.ContentLength; | ||
|
||
// assert that lengths match only if the chunk has a length assigned. | ||
if ( chunk.CompressedLength > 0 && contentLength != chunk.CompressedLength ) | ||
{ | ||
throw new InvalidDataException( $"Content-Length mismatch for depot chunk! (was {contentLength}, but should be {chunk.CompressedLength})" ); | ||
} | ||
|
||
cts.CancelAfter( ResponseBodyTimeout ); | ||
|
||
return await response.Content.ReadAsByteArrayAsync( cts.Token ).ConfigureAwait( false ); | ||
// If no depot key is provided, stream into the destination buffer without renting | ||
if ( depotKey == null ) | ||
{ | ||
using var ms = new MemoryStream( destination, 0, contentLength ); | ||
|
||
// Stream the http response into the provided destination | ||
await response.Content.CopyToAsync( ms, cts.Token ); | ||
|
||
if ( ms.Position != contentLength ) | ||
{ | ||
throw new InvalidDataException( $"Length mismatch after downloading depot chunk! (was {ms.Position}, but should be {contentLength})" ); | ||
} | ||
|
||
return contentLength; | ||
} | ||
|
||
// We have to stream into a temporary buffer because a decryption will need to be performed | ||
var buffer = ArrayPool<byte>.Shared.Rent( contentLength ); | ||
|
||
try | ||
{ | ||
using var ms = new MemoryStream( buffer, 0, contentLength ); | ||
|
||
// Stream the http response into the rented buffer | ||
await response.Content.CopyToAsync( ms, cts.Token ); | ||
|
||
if ( ms.Position != contentLength ) | ||
{ | ||
throw new InvalidDataException( $"Length mismatch after downloading depot chunk! (was {ms.Position}, but should be {contentLength})" ); | ||
} | ||
|
||
// process the chunk immediately | ||
var writtenLength = DepotChunk.Process( chunk, buffer.AsSpan()[ ..contentLength ], destination, depotKey ); | ||
|
||
return writtenLength; | ||
} | ||
finally | ||
{ | ||
ArrayPool<byte>.Shared.Return( buffer ); | ||
} | ||
} | ||
catch ( Exception ex ) | ||
{ | ||
DebugLog.WriteLine( nameof( CDN ), "Failed to complete web request to {0}: {1}", url, ex.Message ); | ||
DebugLog.WriteLine( nameof( CDN ), $"Failed to download a depot chunk {request.RequestUri}: {ex.Message}" ); | ||
throw; | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the cts and response are local, I can't just return the response.Content. The only other code path using DoRawCommandAsync is the chunk download, and that code should also stream.
The chunk download has other stupid things like the vzip decompress using LZMA which allocates a 1mb buffer whenever the decoder properties are set.
And the rented buffer would have to be copied if chunk is downloaded without giving a depot key.