Skip to content

Commit

Permalink
[core] Fix a threading issue in FlushAsync
Browse files Browse the repository at this point in the history
We should ensure we're executing on the IO thread
when we're interacting with the filestreams.

The 'Tick' method is invoked from the main engine's
tick so it'll be on the wrong thread unless we swap
here.

Also, ensure we take an async lock on the filestream
we intend to flush. This ensures there's multi-threaded
IO, but each stream is accessed in a single-threaded way.

Fixes #345
  • Loading branch information
alanmcgovern committed Dec 7, 2020
1 parent 6bbd8ad commit ec3c8eb
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 13 deletions.
11 changes: 9 additions & 2 deletions src/MonoTorrent.Tests/Client/DiskWriterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public void Teardown ()
public async Task CloseFileAsync_Opened ()
{
using var writer = new DiskWriter ();
using var locker = await TorrentFile.Locker.EnterAsync ();

await writer.WriteAsync (TorrentFile, 0, new byte[10], 0, 10);
Assert.IsTrue (File.Exists (TorrentFile.FullPath));

Expand All @@ -74,10 +76,11 @@ public async Task CloseFileAsync_Opened ()
}

[Test]
public void CloseFileAsync_Unopened()
public async Task CloseFileAsync_Unopened()
{
using var writer = new DiskWriter ();
Assert.DoesNotThrowAsync (async () => await writer.CloseAsync (TorrentFile));
using (await TorrentFile.Locker.EnterAsync ())
Assert.DoesNotThrowAsync (async () => await writer.CloseAsync (TorrentFile));
}

[Test]
Expand All @@ -94,11 +97,15 @@ public async Task ExceedMaxOpenFiles ()
};
using var writer = new DiskWriter (creator, 1);

using var locker = await TorrentFile.Locker.EnterAsync ();

var writeTask = writer.WriteAsync (TorrentFile, 0, new byte[100], 0, 100);
await streamCreated.Task.WithTimeout ();

// There's a limit of 1 concurrent read/write.
var secondStreamWaiter = streamCreated.Task.AsTask ();

using var secondLocker = await Others.First ().Locker.EnterAsync ();
var secondStream = writer.WriteAsync (Others.First (), 0, new byte[100], 0, 100);
Assert.ThrowsAsync<TimeoutException> (() => secondStreamWaiter.WithTimeout (100));

Expand Down
6 changes: 6 additions & 0 deletions src/MonoTorrent/MonoTorrent.Client/FileStreamBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,17 @@ internal async ReusableTask FlushAsync (ITorrentFileInfo file)

internal RentedStream GetStream (ITorrentFileInfo file)
{
file.ThrowIfNotLocked ();

if (Streams.TryGetValue (file, out ITorrentFileStream stream))
return new RentedStream (stream);
return new RentedStream (null);
}

internal async ReusableTask<RentedStream> GetStreamAsync (ITorrentFileInfo file, FileAccess access)
{
file.ThrowIfNotLocked ();

if (!Streams.TryGetValue (file, out ITorrentFileStream s))
s = null;

Expand Down Expand Up @@ -160,6 +164,8 @@ void Add (ITorrentFileInfo file, ITorrentFileStream stream)

void CloseAndRemove (ITorrentFileInfo file, ITorrentFileStream s)
{
file.ThrowIfNotLocked ();

logger.InfoFormatted ("Closing and removing: {0}", file.Path);
Streams.Remove (file);
UsageOrder.Remove (file);
Expand Down
10 changes: 9 additions & 1 deletion src/MonoTorrent/MonoTorrent.Client/MainLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;

Expand Down Expand Up @@ -203,6 +204,13 @@ public static ThreadSwitcher SwitchThread ()
return new ThreadSwitcher ();
}

#endregion
[Conditional("DEBUG")]
internal void CheckThread ()
{
if (Thread.CurrentThread != thread)
throw new InvalidOperationException ($"Missing context switch to the {thread.Name} MainLoop.");
}

#endregion
}
}
20 changes: 10 additions & 10 deletions src/MonoTorrent/MonoTorrent.Client/Managers/DiskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ public async Task FlushAsync (ITorrentData manager, int pieceIndex)
await WaitForPendingWrites ();
foreach (var file in manager.Files) {
if (pieceIndex == -1 || (pieceIndex >= file.StartPieceIndex && pieceIndex <= file.EndPieceIndex))
await Writer.FlushAsync (file);
using (await file.Locker.EnterAsync ())
await Writer.FlushAsync (file);
}
}

Expand Down Expand Up @@ -432,6 +433,8 @@ internal async ReusableTask WriteAsync (ITorrentData manager, long offset, byte[

async ReusableTask ProcessBufferedIOAsync (bool force = false)
{
await IOLoop;

BufferedIO io;

while (WriteQueue.Count > 0) {
Expand Down Expand Up @@ -498,6 +501,8 @@ public static int FindFileIndex (IList<ITorrentFileInfo> files, long offset, int

async ReusableTask<bool> DoReadAsync (ITorrentData manager, long offset, byte[] buffer, int count)
{
IOLoop.CheckThread ();

ReadMonitor.AddDelta (count);

if (count < 1)
Expand Down Expand Up @@ -531,12 +536,8 @@ async ReusableTask<bool> DoReadAsync (ITorrentData manager, long offset, byte[]

async ReusableTask<int> ReadFromFileAsync (ITorrentFileInfo torrentFile, long offset, byte[] buffer, int bufferOffset, int count)
{
await torrentFile.Locker.WaitAsync ();
try {
using (await torrentFile.Locker.EnterAsync ())
return await Writer.ReadAsync (torrentFile, offset, buffer, bufferOffset, count);
} finally {
torrentFile.Locker.Release ();
}
}

/// <summary>
Expand All @@ -560,10 +561,7 @@ internal void Tick ()
/// <param name="delta">The amount of time, in milliseconds, which has passed</param>
/// <returns></returns>
internal async ReusableTask Tick (int delta)
{
await IOLoop;
await Tick (delta, true);
}
=> await Tick (delta, true);

ReusableTask Tick (int delta, bool waitForBufferedIO)
{
Expand All @@ -581,6 +579,8 @@ ReusableTask Tick (int delta, bool waitForBufferedIO)

async ReusableTask DoWriteAsync (ITorrentData manager, long offset, byte[] buffer, int count)
{
IOLoop.CheckThread ();

WriteMonitor.AddDelta (count);

if (offset < 0 || offset + count > manager.Size)
Expand Down
11 changes: 11 additions & 0 deletions src/MonoTorrent/MonoTorrent.Client/Managers/ITorrentFileInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,20 @@
//


using System.Diagnostics;
using System.Threading;

namespace MonoTorrent.Client
{

public interface ITorrentFileInfo : ITorrentFile
{
// FIXME: make BitField readonly.
BitField BitField { get; }
string FullPath { get; }
Priority Priority { get; set; }

// FIXME: Make this internal.
SemaphoreSlim Locker { get; }

(int startPiece, int endPiece) GetSelector ();
Expand All @@ -46,5 +50,12 @@ public static class ITorrentFileInfoExtensions
{
public static long BytesDownloaded (this ITorrentFileInfo info)
=> (long) (info.BitField.PercentComplete * info.Length / 100.0);

[Conditional ("DEBUG")]
internal static void ThrowIfNotLocked(this ITorrentFileInfo info)
{
if (info.Locker.CurrentCount > 0)
throw new System.InvalidOperationException ("File should have been locked before it was accessed");
}
}
}

0 comments on commit ec3c8eb

Please sign in to comment.