Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize downloading depot chunks and manifests #1404

Merged
merged 7 commits into from
Aug 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 136 additions & 30 deletions SteamKit2/SteamKit2/Steam/CDN/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
*/

using System;
using System.Buffers;
using System.IO;
using System.IO.Compression;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -58,7 +60,7 @@ public void Dispose()
/// <param name="server">The content server to connect to.</param>
/// <param name="depotKey">
/// The depot decryption key for the depot that will be downloaded.
/// This is used for decrypting filenames (if needed) in depot manifests, and processing depot chunks.
/// This is used for decrypting filenames (if needed) in depot manifests.
/// </param>
/// <param name="proxyServer">Optional content server marked as UseAsProxy which transforms the request.</param>
/// <returns>A <see cref="DepotManifest"/> instance that contains information about the files present within a depot.</returns>
Expand All @@ -81,11 +83,65 @@ public async Task<DepotManifest> DownloadManifestAsync( uint depotId, ulong mani
url = $"depot/{depotId}/manifest/{manifestId}/{MANIFEST_VERSION}";
}

var manifestData = await DoRawCommandAsync( server, url, proxyServer ).ConfigureAwait( false );
using var request = new HttpRequestMessage( HttpMethod.Get, BuildCommand( server, url, proxyServer ) );

manifestData = ZipUtil.Decompress( manifestData );
using var cts = new CancellationTokenSource();
cts.CancelAfter( RequestTimeout );

DepotManifest depotManifest;

try
{
using var response = await httpClient.SendAsync( request, HttpCompletionOption.ResponseHeadersRead, cts.Token ).ConfigureAwait( false );
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the cts and response are local, I can't just return the response.Content. The only other code path using DoRawCommandAsync is the chunk download, and that code should also stream.

The chunk download has other stupid things like the vzip decompress using LZMA which allocates a 1mb buffer whenever the decoder properties are set.

And the rented buffer would have to be copied if chunk is downloaded without giving a depot key.


if ( !response.IsSuccessStatusCode )
{
throw new SteamKitWebRequestException( $"Response status code does not indicate success: {response.StatusCode:D} ({response.ReasonPhrase}).", response );
}

if ( !response.Content.Headers.ContentLength.HasValue )
{
throw new SteamKitWebRequestException( "Response does not have Content-Length", response );
}

cts.CancelAfter( ResponseBodyTimeout );

var contentLength = ( int )response.Content.Headers.ContentLength;
var buffer = ArrayPool<byte>.Shared.Rent( contentLength );

var depotManifest = new DepotManifest( manifestData );
try
{
using var ms = new MemoryStream( buffer, 0, contentLength );

// Stream the http response into the rented buffer
await response.Content.CopyToAsync( ms, cts.Token );

if ( ms.Position != contentLength )
{
throw new InvalidDataException( $"Length mismatch after downloading depot manifest! (was {ms.Position}, but should be {contentLength})" );
}

ms.Position = 0;

// Decompress the zipped manifest data
using var zip = new ZipArchive( ms );
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ZipArchive needs a seekable stream fyi (otherwise it literally copies the stream into its own temp MemoryStream).

var entries = zip.Entries;

DebugLog.Assert( entries.Count == 1, nameof( CDN ), "Expected the zip to contain only one file" );

using var zipEntryStream = entries[ 0 ].Open();
depotManifest = DepotManifest.Deserialize( zipEntryStream );
}
finally
{
ArrayPool<byte>.Shared.Return( buffer );
}
}
catch ( Exception ex )
{
DebugLog.WriteLine( nameof( CDN ), $"Failed to download manifest {request.RequestUri}: {ex.Message}" );
throw;
}
yaakov-h marked this conversation as resolved.
Show resolved Hide resolved

if ( depotKey != null )
{
Expand All @@ -108,53 +164,51 @@ public async Task<DepotManifest> DownloadManifestAsync( uint depotId, ulong mani
/// A <see cref="DepotManifest.ChunkData"/> instance that represents the chunk to download.
/// This value should come from a manifest downloaded with <see cref="o:DownloadManifestAsync"/>.
/// </param>
/// <returns>A <see cref="DepotChunk"/> instance that contains the data for the given chunk.</returns>
/// <returns>The total number of bytes written to <paramref name="destination" />.</returns>
/// <param name="server">The content server to connect to.</param>
/// <param name="destination">
/// The buffer to receive the chunk data. If <paramref name="depotKey"/> is provided, this will be the decompressed buffer.
/// Allocate or rent a buffer that is equal or longer than <see cref="DepotManifest.ChunkData.UncompressedLength"/>
/// </param>
/// <param name="depotKey">
/// The depot decryption key for the depot that will be downloaded.
/// This is used for decrypting filenames (if needed) in depot manifests, and processing depot chunks.
/// This is used to process the chunk data.
/// </param>
/// <param name="proxyServer">Optional content server marked as UseAsProxy which transforms the request.</param>
/// <exception cref="System.ArgumentNullException">chunk's <see cref="DepotManifest.ChunkData.ChunkID"/> was null.</exception>
/// <exception cref="System.IO.InvalidDataException">Thrown if the downloaded data does not match the expected length.</exception>
/// <exception cref="HttpRequestException">An network error occurred when performing the request.</exception>
/// <exception cref="SteamKitWebRequestException">A network error occurred when performing the request.</exception>
public async Task<DepotChunk> DownloadDepotChunkAsync( uint depotId, DepotManifest.ChunkData chunk, Server server, byte[]? depotKey = null, Server? proxyServer = null )
public async Task<int> DownloadDepotChunkAsync( uint depotId, DepotManifest.ChunkData chunk, Server server, byte[] destination, byte[]? depotKey = null, Server? proxyServer = null )
{
ArgumentNullException.ThrowIfNull( server );

ArgumentNullException.ThrowIfNull( chunk );
ArgumentNullException.ThrowIfNull( destination );

if ( chunk.ChunkID == null )
{
throw new ArgumentException( "Chunk must have a ChunkID.", nameof( chunk ) );
throw new ArgumentException( $"Chunk must have a {nameof( DepotManifest.ChunkData.ChunkID )}.", nameof( chunk ) );
}

var chunkID = Utils.EncodeHexString( chunk.ChunkID );

var chunkData = await DoRawCommandAsync( server, string.Format( "depot/{0}/chunk/{1}", depotId, chunkID ), proxyServer ).ConfigureAwait( false );

// assert that lengths match only if the chunk has a length assigned.
if ( chunk.CompressedLength > 0 && chunkData.Length != chunk.CompressedLength )
if ( depotKey == null )
{
throw new InvalidDataException( $"Length mismatch after downloading depot chunk! (was {chunkData.Length}, but should be {chunk.CompressedLength})" );
if ( destination.Length < chunk.CompressedLength )
{
throw new ArgumentException( $"The destination buffer must be longer than the chunk {nameof( DepotManifest.ChunkData.CompressedLength )} (since no depot key was provided).", nameof( destination ) );
}
}

var depotChunk = new DepotChunk( chunk, chunkData );

if ( depotKey != null )
else
{
// if we have the depot key, we can process the chunk immediately
depotChunk.Process( depotKey );
if ( destination.Length < chunk.UncompressedLength )
{
throw new ArgumentException( $"The destination buffer must be longer than the chunk {nameof( DepotManifest.ChunkData.UncompressedLength )}.", nameof( destination ) );
}
}

return depotChunk;
}
var chunkID = Utils.EncodeHexString( chunk.ChunkID );
var url = $"depot/{depotId}/chunk/{chunkID}";

async Task<byte[]> DoRawCommandAsync( Server server, string command, Server? proxyServer )
{
var url = BuildCommand( server, command, proxyServer );
using var request = new HttpRequestMessage( HttpMethod.Get, url );
using var request = new HttpRequestMessage( HttpMethod.Get, BuildCommand( server, url, proxyServer ) );

using var cts = new CancellationTokenSource();
cts.CancelAfter( RequestTimeout );
Expand All @@ -168,13 +222,65 @@ async Task<byte[]> DoRawCommandAsync( Server server, string command, Server? pro
throw new SteamKitWebRequestException( $"Response status code does not indicate success: {response.StatusCode:D} ({response.ReasonPhrase}).", response );
}

if ( !response.Content.Headers.ContentLength.HasValue )
{
throw new SteamKitWebRequestException( "Response does not have Content-Length", response );
}

var contentLength = ( int )response.Content.Headers.ContentLength;

// assert that lengths match only if the chunk has a length assigned.
if ( chunk.CompressedLength > 0 && contentLength != chunk.CompressedLength )
{
throw new InvalidDataException( $"Content-Length mismatch for depot chunk! (was {contentLength}, but should be {chunk.CompressedLength})" );
}

cts.CancelAfter( ResponseBodyTimeout );

return await response.Content.ReadAsByteArrayAsync( cts.Token ).ConfigureAwait( false );
// If no depot key is provided, stream into the destination buffer without renting
if ( depotKey == null )
{
using var ms = new MemoryStream( destination, 0, contentLength );

// Stream the http response into the provided destination
await response.Content.CopyToAsync( ms, cts.Token );

if ( ms.Position != contentLength )
{
throw new InvalidDataException( $"Length mismatch after downloading depot chunk! (was {ms.Position}, but should be {contentLength})" );
}

return contentLength;
}

// We have to stream into a temporary buffer because a decryption will need to be performed
var buffer = ArrayPool<byte>.Shared.Rent( contentLength );

try
{
using var ms = new MemoryStream( buffer, 0, contentLength );

// Stream the http response into the rented buffer
await response.Content.CopyToAsync( ms, cts.Token );

if ( ms.Position != contentLength )
{
throw new InvalidDataException( $"Length mismatch after downloading depot chunk! (was {ms.Position}, but should be {contentLength})" );
}

// process the chunk immediately
var writtenLength = DepotChunk.Process( chunk, buffer.AsSpan()[ ..contentLength ], destination, depotKey );

return writtenLength;
}
finally
{
ArrayPool<byte>.Shared.Return( buffer );
}
}
catch ( Exception ex )
{
DebugLog.WriteLine( nameof( CDN ), "Failed to complete web request to {0}: {1}", url, ex.Message );
DebugLog.WriteLine( nameof( CDN ), $"Failed to download a depot chunk {request.RequestUri}: {ex.Message}" );
throw;
}
}
Expand Down
98 changes: 49 additions & 49 deletions SteamKit2/SteamKit2/Steam/CDN/DepotChunk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,84 +4,84 @@
*/

using System;
using System.Buffers;
using System.IO;
using System.Linq;
using System.Security.Cryptography;

namespace SteamKit2.CDN
{
/// <summary>
/// Represents a single downloaded chunk from a file in a depot.
/// Provides a helper function to decrypt and decompress a single depot chunk.
/// </summary>
public sealed class DepotChunk
public static class DepotChunk
{
/// <summary>
/// Gets the depot manifest chunk information associated with this chunk.
/// </summary>
public DepotManifest.ChunkData ChunkInfo { get; }

/// <summary>
/// Gets a value indicating whether this chunk has been processed. A chunk is processed when the data has been decrypted and decompressed.
/// </summary>
/// <value>
/// <c>true</c> if this chunk has been processed; otherwise, <c>false</c>.
/// </value>
public bool IsProcessed { get; internal set; }

/// <summary>
/// Gets the underlying data for this chunk.
/// </summary>
public byte[] Data { get; private set; }

/// <summary>
/// Initializes a new instance of the <see cref="DepotChunk"/> class.
/// </summary>
/// <param name="info">The manifest chunk information associated with this chunk.</param>
/// <param name="data">The underlying data for this chunk.</param>
public DepotChunk( DepotManifest.ChunkData info, byte[] data )
{
ArgumentNullException.ThrowIfNull( info );

ArgumentNullException.ThrowIfNull( data );

ChunkInfo = info;
Data = data;
}

/// <summary>
/// Processes the specified depot key by decrypting the data with the given depot encryption key, and then by decompressing the data.
/// If the chunk has already been processed, this function does nothing.
/// </summary>
/// <param name="info">The depot chunk data representing.</param>
/// <param name="data">The encrypted chunk data.</param>
/// <param name="destination">The buffer to receive the decrypted chunk data.</param>
/// <param name="depotKey">The depot decryption key.</param>
/// <exception cref="System.IO.InvalidDataException">Thrown if the processed data does not match the expected checksum given in it's chunk information.</exception>
public void Process( byte[] depotKey )
/// <exception cref="InvalidDataException">Thrown if the processed data does not match the expected checksum given in it's chunk information.</exception>
public static int Process( DepotManifest.ChunkData info, ReadOnlySpan<byte> data, byte[] destination, byte[] depotKey )
{
ArgumentNullException.ThrowIfNull( info );
ArgumentNullException.ThrowIfNull( depotKey );

if ( IsProcessed )
if ( destination.Length < info.UncompressedLength )
{
return;
throw new ArgumentException( $"The destination buffer must be longer than the chunk {nameof( DepotManifest.ChunkData.UncompressedLength )}.", nameof( destination ) );
}

byte[] processedData = CryptoHelper.SymmetricDecrypt( Data, depotKey );
DebugLog.Assert( depotKey.Length == 32, nameof( DepotChunk ), $"Tried to decrypt depot chunk with non 32 byte key!" );

using var aes = Aes.Create();
aes.BlockSize = 128;
aes.KeySize = 256;
aes.Key = depotKey;

// first 16 bytes of input is the ECB encrypted IV
Span<byte> iv = stackalloc byte[ 16 ];
aes.DecryptEcb( data[ ..iv.Length ], iv, PaddingMode.None );

if ( processedData.Length > 1 && processedData[ 0 ] == 'V' && processedData[ 1 ] == 'Z' )
// With CBC and padding, the decrypted size will always be smaller
var buffer = ArrayPool<byte>.Shared.Rent( data.Length - iv.Length );

var writtenDecompressed = 0;

try
{
var written = aes.DecryptCbc( data[ iv.Length.. ], iv, buffer, PaddingMode.PKCS7 );
var decryptedStream = new MemoryStream( buffer, 0, written );

if ( buffer.Length > 1 && buffer[ 0 ] == 'V' && buffer[ 1 ] == 'Z' )
{
writtenDecompressed = VZipUtil.Decompress( decryptedStream, destination, verifyChecksum: false );
}
else
{
writtenDecompressed = ZipUtil.Decompress( decryptedStream, destination, verifyChecksum: false );
}
}
finally
{
processedData = VZipUtil.Decompress( processedData );
ArrayPool<byte>.Shared.Return( buffer );
}
else

if ( info.UncompressedLength != writtenDecompressed )
{
processedData = ZipUtil.Decompress( processedData );
throw new InvalidDataException( $"Processed data checksum failed to decompressed to the expected chunk uncompressed length. (was {writtenDecompressed}, should be {info.UncompressedLength})" );
}

var dataCrc = Utils.AdlerHash( processedData );
var dataCrc = Utils.AdlerHash( destination.AsSpan()[ ..writtenDecompressed ] );

if ( dataCrc != ChunkInfo.Checksum )
if ( dataCrc != info.Checksum )
{
throw new InvalidDataException( "Processed data checksum is incorrect! Downloaded depot chunk is corrupt or invalid/wrong depot key?" );
}

Data = processedData;
IsProcessed = true;
return writtenDecompressed;
}
}
}
Loading
Loading