diff --git a/src/MonoTorrent.Tests/Client/DiskWriterTests.cs b/src/MonoTorrent.Tests/Client/DiskWriterTests.cs index 909046a99..5d71b1647 100644 --- a/src/MonoTorrent.Tests/Client/DiskWriterTests.cs +++ b/src/MonoTorrent.Tests/Client/DiskWriterTests.cs @@ -66,6 +66,8 @@ public void Teardown () public async Task CloseFileAsync_Opened () { using var writer = new DiskWriter (); + using var locker = await TorrentFile.Locker.EnterAsync (); + await writer.WriteAsync (TorrentFile, 0, new byte[10], 0, 10); Assert.IsTrue (File.Exists (TorrentFile.FullPath)); @@ -74,10 +76,11 @@ public async Task CloseFileAsync_Opened () } [Test] - public void CloseFileAsync_Unopened() + public async Task CloseFileAsync_Unopened() { using var writer = new DiskWriter (); - Assert.DoesNotThrowAsync (async () => await writer.CloseAsync (TorrentFile)); + using (await TorrentFile.Locker.EnterAsync ()) + Assert.DoesNotThrowAsync (async () => await writer.CloseAsync (TorrentFile)); } [Test] @@ -94,11 +97,15 @@ public async Task ExceedMaxOpenFiles () }; using var writer = new DiskWriter (creator, 1); + using var locker = await TorrentFile.Locker.EnterAsync (); + var writeTask = writer.WriteAsync (TorrentFile, 0, new byte[100], 0, 100); await streamCreated.Task.WithTimeout (); // There's a limit of 1 concurrent read/write. var secondStreamWaiter = streamCreated.Task.AsTask (); + + using var secondLocker = await Others.First ().Locker.EnterAsync (); var secondStream = writer.WriteAsync (Others.First (), 0, new byte[100], 0, 100); Assert.ThrowsAsync (() => secondStreamWaiter.WithTimeout (100)); diff --git a/src/MonoTorrent/MonoTorrent.Client/FileStreamBuffer.cs b/src/MonoTorrent/MonoTorrent.Client/FileStreamBuffer.cs index 5f8735b98..16ad5e5b6 100644 --- a/src/MonoTorrent/MonoTorrent.Client/FileStreamBuffer.cs +++ b/src/MonoTorrent/MonoTorrent.Client/FileStreamBuffer.cs @@ -96,6 +96,8 @@ internal async ReusableTask FlushAsync (ITorrentFileInfo file) internal RentedStream GetStream (ITorrentFileInfo file) { + file.ThrowIfNotLocked (); + if (Streams.TryGetValue (file, out ITorrentFileStream stream)) return new RentedStream (stream); return new RentedStream (null); @@ -103,6 +105,8 @@ internal RentedStream GetStream (ITorrentFileInfo file) internal async ReusableTask GetStreamAsync (ITorrentFileInfo file, FileAccess access) { + file.ThrowIfNotLocked (); + if (!Streams.TryGetValue (file, out ITorrentFileStream s)) s = null; @@ -160,6 +164,8 @@ void Add (ITorrentFileInfo file, ITorrentFileStream stream) void CloseAndRemove (ITorrentFileInfo file, ITorrentFileStream s) { + file.ThrowIfNotLocked (); + logger.InfoFormatted ("Closing and removing: {0}", file.Path); Streams.Remove (file); UsageOrder.Remove (file); diff --git a/src/MonoTorrent/MonoTorrent.Client/MainLoop.cs b/src/MonoTorrent/MonoTorrent.Client/MainLoop.cs index 3f7a8cb9e..5c7bef8ca 100644 --- a/src/MonoTorrent/MonoTorrent.Client/MainLoop.cs +++ b/src/MonoTorrent/MonoTorrent.Client/MainLoop.cs @@ -30,6 +30,7 @@ using System; using System.Collections.Generic; using System.ComponentModel; +using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; @@ -203,6 +204,13 @@ public static ThreadSwitcher SwitchThread () return new ThreadSwitcher (); } -#endregion + [Conditional("DEBUG")] + internal void CheckThread () + { + if (Thread.CurrentThread != thread) + throw new InvalidOperationException ($"Missing context switch to the {thread.Name} MainLoop."); + } + + #endregion } } \ No newline at end of file diff --git a/src/MonoTorrent/MonoTorrent.Client/Managers/DiskManager.cs b/src/MonoTorrent/MonoTorrent.Client/Managers/DiskManager.cs index 7ed99ed7c..7b8d03adc 100644 --- a/src/MonoTorrent/MonoTorrent.Client/Managers/DiskManager.cs +++ b/src/MonoTorrent/MonoTorrent.Client/Managers/DiskManager.cs @@ -324,7 +324,8 @@ public async Task FlushAsync (ITorrentData manager, int pieceIndex) await WaitForPendingWrites (); foreach (var file in manager.Files) { if (pieceIndex == -1 || (pieceIndex >= file.StartPieceIndex && pieceIndex <= file.EndPieceIndex)) - await Writer.FlushAsync (file); + using (await file.Locker.EnterAsync ()) + await Writer.FlushAsync (file); } } @@ -432,6 +433,8 @@ internal async ReusableTask WriteAsync (ITorrentData manager, long offset, byte[ async ReusableTask ProcessBufferedIOAsync (bool force = false) { + await IOLoop; + BufferedIO io; while (WriteQueue.Count > 0) { @@ -498,6 +501,8 @@ public static int FindFileIndex (IList files, long offset, int async ReusableTask DoReadAsync (ITorrentData manager, long offset, byte[] buffer, int count) { + IOLoop.CheckThread (); + ReadMonitor.AddDelta (count); if (count < 1) @@ -531,12 +536,8 @@ async ReusableTask DoReadAsync (ITorrentData manager, long offset, byte[] async ReusableTask ReadFromFileAsync (ITorrentFileInfo torrentFile, long offset, byte[] buffer, int bufferOffset, int count) { - await torrentFile.Locker.WaitAsync (); - try { + using (await torrentFile.Locker.EnterAsync ()) return await Writer.ReadAsync (torrentFile, offset, buffer, bufferOffset, count); - } finally { - torrentFile.Locker.Release (); - } } /// @@ -560,10 +561,7 @@ internal void Tick () /// The amount of time, in milliseconds, which has passed /// internal async ReusableTask Tick (int delta) - { - await IOLoop; - await Tick (delta, true); - } + => await Tick (delta, true); ReusableTask Tick (int delta, bool waitForBufferedIO) { @@ -581,6 +579,8 @@ ReusableTask Tick (int delta, bool waitForBufferedIO) async ReusableTask DoWriteAsync (ITorrentData manager, long offset, byte[] buffer, int count) { + IOLoop.CheckThread (); + WriteMonitor.AddDelta (count); if (offset < 0 || offset + count > manager.Size) diff --git a/src/MonoTorrent/MonoTorrent.Client/Managers/ITorrentFileInfo.cs b/src/MonoTorrent/MonoTorrent.Client/Managers/ITorrentFileInfo.cs index aa743136c..7beea5449 100644 --- a/src/MonoTorrent/MonoTorrent.Client/Managers/ITorrentFileInfo.cs +++ b/src/MonoTorrent/MonoTorrent.Client/Managers/ITorrentFileInfo.cs @@ -27,6 +27,7 @@ // +using System.Diagnostics; using System.Threading; namespace MonoTorrent.Client @@ -34,9 +35,12 @@ namespace MonoTorrent.Client public interface ITorrentFileInfo : ITorrentFile { + // FIXME: make BitField readonly. BitField BitField { get; } string FullPath { get; } Priority Priority { get; set; } + + // FIXME: Make this internal. SemaphoreSlim Locker { get; } (int startPiece, int endPiece) GetSelector (); @@ -46,5 +50,12 @@ public static class ITorrentFileInfoExtensions { public static long BytesDownloaded (this ITorrentFileInfo info) => (long) (info.BitField.PercentComplete * info.Length / 100.0); + + [Conditional ("DEBUG")] + internal static void ThrowIfNotLocked(this ITorrentFileInfo info) + { + if (info.Locker.CurrentCount > 0) + throw new System.InvalidOperationException ("File should have been locked before it was accessed"); + } } }