Skip to content

Commit

Permalink
Make TryRun thread safe (#605)
Browse files Browse the repository at this point in the history
### Changes
- Make `IImpelementationTask.TryRun` thread-safe
- Add `IImpelementationTask.IsIdle`

### Testing
- Update tests to exercise new behavior.
  • Loading branch information
keyboardDrummer authored Jul 21, 2022
1 parent 81869ea commit 39f0dfb
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 36 deletions.
3 changes: 1 addition & 2 deletions Source/Core/AbsyCmd.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1214,8 +1214,7 @@ public List<Cmd> /*!*/ Cmds

public IEnumerable<Block> Exits()
{
GotoCmd g = TransferCmd as GotoCmd;
if (g != null)
if (TransferCmd is GotoCmd g)
{
return cce.NonNull(g.labelTargets);
}
Expand Down
2 changes: 1 addition & 1 deletion Source/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<!-- Target framework and package configuration -->
<PropertyGroup>
<Version>2.15.6</Version>
<Version>2.15.7</Version>
<TargetFramework>net6.0</TargetFramework>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
<Authors>Boogie</Authors>
Expand Down
93 changes: 77 additions & 16 deletions Source/ExecutionEngine/ImplementationTask.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#nullable enable
using System;
using System.IO;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -35,12 +37,20 @@ public interface IImplementationTask {
ProcessedProgram ProcessedProgram { get; }
Implementation Implementation { get; }

/// <summary>
/// If not running, start running.
/// If already running and not cancelled, return null.
/// If already running but being cancelled, queue a new run and return its observable.
/// If already running but being cancelled, and a new run is queued, return null.
/// </summary>
IObservable<IVerificationStatus>? TryRun();
bool IsIdle { get; }
void Cancel();
}

public class ImplementationTask : IImplementationTask {
private readonly ExecutionEngine engine;
private readonly object mayAccessCancellationSource = new();

public IVerificationStatus CacheStatus { get; private set; }

Expand All @@ -62,40 +72,92 @@ public ImplementationTask(ExecutionEngine engine, ProcessedProgram processedProg
}

private CancellationTokenSource? cancellationSource;
private ReplaySubject<IVerificationStatus>? status;

public void Cancel() {
cancellationSource?.Cancel();
cancellationSource = null;
}

public bool IsIdle => cancellationSource == null;

public IObservable<IVerificationStatus>? TryRun()
{
if (CacheStatus is Completed) {
lock (mayAccessCancellationSource) {
if (cancellationSource == null) {
return StartRunIfNeeded();
}

if (cancellationSource.IsCancellationRequested) {
// Another thread is running but was cancelled,
// so we may immediately start a new run after the cancellation completes.
return QueueRun();
}

// Another thread is running and is not cancelled, so this run fails.
return null;
}
}

var alreadyRunning = cancellationSource != null;
if (alreadyRunning) {
private IObservable<IVerificationStatus>? StartRunIfNeeded()
{
// No other thread is running or can start, so we can safely access CacheStatus
if (CacheStatus is Completed) {
return null;
}

// We claim the right to run.
cancellationSource = new();

var cancellationToken = cancellationSource.Token;

var observableStatus = new ReplaySubject<IVerificationStatus>();
cancellationToken.Register(() => {
observableStatus.OnNext(new Stale());
observableStatus.OnCompleted();
});
var task = RunInternal(cancellationToken, observableStatus.OnNext);
status = new ReplaySubject<IVerificationStatus>();
var task = RunInternal(cancellationToken, status.OnNext);
task.ContinueWith(r =>
{
if (r.Exception != null) {
observableStatus.OnError(r.Exception);
} else {
observableStatus.OnCompleted();
// Lock so we may do operations after clearing cancellationSource,
// which releases our control over the field status.
lock (mayAccessCancellationSource) {
// Clear cancellationSource before calling status.OnCompleted, so ImplementationTask.IsIdle returns true
cancellationSource = null;
if (cancellationToken.IsCancellationRequested && CacheStatus is not Completed) {
status.OnNext(new Stale());
}
if (r.Exception != null) {
status.OnError(r.Exception);
} else {
status.OnCompleted();
}
}
}, TaskScheduler.Current);
return observableStatus;
return status;
}

private IObservable<IVerificationStatus>? QueueRun()
{
// We claim the right to run.
cancellationSource = new();
var myCancellationSource = cancellationSource;

// After the current run cancellation completes, call TryRun, assume it succeeds,
// and forward the observations to result.
var result = new ReplaySubject<IVerificationStatus>();
status!.Subscribe(next => { }, () =>
{
if (myCancellationSource.IsCancellationRequested) {
// Queued run was cancelled before it started.
result.OnNext(CacheStatus);
result.OnCompleted();
} else {
// The running thread has just cleared cancellationSource, so TryRun will return a non-null value.
var recursiveStatus = TryRun();
recursiveStatus!.Subscribe(result);
// Forward cancellation requests that happened between our
// myCancellationSource.IsCancellationRequested check and TryRun call
myCancellationSource.Token.Register(() => cancellationSource.Cancel());
}
});
return result;
}

private async Task<VerificationResult> RunInternal(CancellationToken cancellationToken, Action<IVerificationStatus> notifyStatusChange) {
Expand All @@ -113,7 +175,6 @@ private async Task<VerificationResult> RunInternal(CancellationToken cancellatio

var result = await verifyTask;
CacheStatus = new Completed(result);
cancellationSource = null;
notifyStatusChange(CacheStatus);
return result;
}
Expand Down
2 changes: 1 addition & 1 deletion Source/Provers/SMTLib/NoopSolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public override async Task<IReadOnlyList<SExpr>> SendRequestsAndCloseInput(IRead
return result;
}

private Task<SExpr> GetProverResponse()
protected virtual Task<SExpr> GetProverResponse()
{
return Task.FromResult(responses.Count > 0 ? responses.Dequeue() : null);
}
Expand Down
24 changes: 22 additions & 2 deletions Source/Provers/SMTLib/UnsatSolver.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Boogie.SMTLib;

public class UnsatSolver : NoopSolver
{
public class UnsatSolver : NoopSolver {
private readonly SemaphoreSlim semaphore;

public UnsatSolver() : this(new SemaphoreSlim(int.MaxValue)) {
}

public UnsatSolver(SemaphoreSlim semaphore) {
this.semaphore = semaphore;
}

public override void Send(string request)
{
if (request == "(check-sat)")
Expand All @@ -11,4 +23,12 @@ public override void Send(string request)
}
base.Send(request);
}


protected override async Task<SExpr> GetProverResponse() {
if (responses.Peek().Name == "unsat") {
await semaphore.WaitAsync();
}
return await base.GetProverResponse();
}
}
78 changes: 64 additions & 14 deletions Source/UnitTests/ExecutionEngineTests/ExecutionEngineTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Boogie;
using Microsoft.Boogie.SMTLib;
using NUnit.Framework;
using VC;

Expand Down Expand Up @@ -47,7 +49,11 @@ procedure Second(y: int)
Assert.AreEqual(ConditionGeneration.Outcome.Errors, verificationResult1.Outcome);
Assert.AreEqual(true, verificationResult1.Errors[0].Model.ModelHasStatesAlready);

var result2 = await tasks[1].TryRun()!.ToTask();
Assert.IsTrue(tasks[1].IsIdle);
var runningStates = tasks[1].TryRun()!;
Assert.IsFalse(tasks[1].IsIdle);
var result2 = await runningStates.ToTask();
Assert.IsTrue(tasks[1].IsIdle);
var verificationResult2 = ((Completed)result2).Result;
Assert.AreEqual(ConditionGeneration.Outcome.Correct, verificationResult2.Outcome);
}
Expand Down Expand Up @@ -182,34 +188,78 @@ public async Task LoopInvariantDescriptions()
}

[Test]
public async Task RunCancelRun() {
public async Task RunCancelRunCancel() {
var options = CommandLineOptions.FromArguments();
options.VcsCores = 1;
options.CreateSolver = (_, _) => new UnsatSolver(new SemaphoreSlim(0));
var engine = ExecutionEngine.CreateWithoutSharedCache(options);

var source = @"
function Fib(x: int): int;
axiom (forall x: int :: (x <= 1 || Fib(x) == Fib(x - 1) + Fib(x - 2)));
axiom (Fib(1) == 1);
axiom (Fib(0) == 1);
procedure FibTest() {
assert Fib(31) == 1346269;
procedure Foo(x: int) {
assert true;
}".TrimStart();
var result = Parser.Parse(source, "fakeFilename1", out var program);
Assert.AreEqual(0, result);
var tasks = engine.GetImplementationTasks(program)[0];
var statusList = new List<IVerificationStatus>();
var statusList1 = new List<IVerificationStatus>();
var firstStatuses = tasks.TryRun()!;
firstStatuses.Subscribe(statusList.Add);
await firstStatuses.Where(s => s is Running).FirstAsync().ToTask();
firstStatuses.Subscribe(statusList1.Add);
tasks.Cancel();

var secondStatuses = tasks.TryRun()!;
secondStatuses.Subscribe(statusList.Add);
tasks.Cancel();
var statusList2 = new List<IVerificationStatus>();
secondStatuses.Subscribe(statusList2.Add);
await secondStatuses.DefaultIfEmpty().ToTask();
var expected1 = new List<IVerificationStatus>() {
new Running(), new Stale()
};
Assert.AreEqual(expected1, statusList1);
var expected2 = new List<IVerificationStatus>() {
new Stale()
};
Assert.AreEqual(expected2, statusList2.TakeLast(1));
}

[Test]
public async Task RunRunCancelRunRun() {
var options = CommandLineOptions.FromArguments();
var returnCheckSat = new SemaphoreSlim(0);
options.VcsCores = 1;
options.CreateSolver = (_, _) => new UnsatSolver(returnCheckSat);
var engine = ExecutionEngine.CreateWithoutSharedCache(options);

var source = @"
procedure Foo(x: int) {
assert true;
}".TrimStart();
var result = Parser.Parse(source, "fakeFilename1", out var program);
Assert.AreEqual(0, result);
var tasks = engine.GetImplementationTasks(program)[0];
var statusList1 = new List<IVerificationStatus>();
var firstStatuses = tasks.TryRun()!;
var runAfterRun1 = tasks.TryRun();
Assert.AreEqual(null, runAfterRun1);
firstStatuses.Subscribe(statusList1.Add);
tasks.Cancel();

var secondStatuses = tasks.TryRun()!;
var runAfterRun2 = tasks.TryRun();
Assert.AreEqual(null, runAfterRun2);
var statusList2 = new List<IVerificationStatus>();
secondStatuses.Subscribe(statusList2.Add);
returnCheckSat.Release();
var finalResult = await secondStatuses.ToTask();
Assert.IsTrue(finalResult is Completed);
var expected = new List<IVerificationStatus>() {
new Running(), new Stale(), new Queued(), new Running(), finalResult
var expected1 = new List<IVerificationStatus>() {
new Running(), new Stale()
};
Assert.AreEqual(expected1, statusList1);
var expected2 = new List<IVerificationStatus>() {
new Running(), finalResult
};
Assert.AreEqual(expected, statusList);
Assert.AreEqual(expected2, statusList2.Where(s => s is not Queued));
}

[Test]
Expand Down

0 comments on commit 39f0dfb

Please sign in to comment.