In my C# console application, I'm trying to run multiple tasks that do various data checks simultaneously. If one of the tasks returns true, I should stop the other tasks, since I have my actionable result. It's also very possible that none of the functions return true.
I have the code to run the tasks together (I think), I'm just having trouble getting to the finish line:
Task task1 = Task.Run(() => Task1(stoppingToken));
Task task2 = Task.Run(() => Task2(stoppingToken));
Task task3 = Task.Run(() => Task3(stoppingToken));
Task task4 = Task.Run(() => Task4(stoppingToken));
Task task5 = Task.Run(() => Task5(stoppingToken));
Task task6 = Task.Run(() => Task6(stoppingToken));
Task.WaitAll(task1, task2, task3, task4, task5, task6);
This is a little different from the answer in the linked question, where the desired result is known (a timeout value). I'm waiting for any of these tasks to possibly return true, and then I want to cancel the remaining tasks if they are still running.
Task.WhenAny with cancellation of the non completed tasks and timeout
Assuming your tasks return bool, you can do something like this:
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken stoppingToken = source.Token;
Task<bool> task1 = Task.Run(() => Task1(stoppingToken));
....
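var tasks = new List<Task<bool>>
{
    task1, task2, task3, ...
};
bool taskResult = false;
do
{
    // Wait for whichever of the remaining tasks completes next
    var finished = await Task.WhenAny(tasks);
    taskResult = finished.Result;
    tasks.Remove(finished);
} while (tasks.Any() && !taskResult);
// Either a task returned true or all of them finished: stop anything still running
source.Cancel();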
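For reference, here is a minimal, self-contained sketch of the same loop as a top-level program. CheckAsync is a hypothetical stand-in for Task1, Task2, etc. (it is not part of the question), and the delays are arbitrary. The final Task.WhenAll is an addition that only observes the cancelled tasks so that their exceptions don't go unnoticed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var source = new CancellationTokenSource();
CancellationToken stoppingToken = source.Token;

// Hypothetical check: waits delayMs (or until cancelled), then reports outcome.
async Task<bool> CheckAsync(int delayMs, bool outcome, CancellationToken token)
{
    await Task.Delay(delayMs, token);
    return outcome;
}

var tasks = new List<Task<bool>>
{
    CheckAsync(300, false, stoppingToken),
    CheckAsync(100, true, stoppingToken),
    CheckAsync(5000, false, stoppingToken),
};
Task<bool>[] all = tasks.ToArray();

bool taskResult = false;
while (tasks.Count > 0 && !taskResult)
{
    // Take the next task to complete and look at its result.
    Task<bool> finished = await Task.WhenAny(tasks);
    tasks.Remove(finished);
    taskResult = await finished; // rethrows if that task faulted
}

// Ask whatever is still running to stop.
source.Cancel();

// Observe the remaining tasks; cancelled ones throw OperationCanceledException.
try { await Task.WhenAll(all); }
catch (OperationCanceledException) { }

Console.WriteLine(taskResult);
```

In this sketch the 100 ms check returns true first, so the loop exits after one iteration and the other two checks end up cancelled.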
Here's a solution based on continuation tasks. The idea is to attach a continuation task to each of the original (provided) tasks and check the result there. If it's a match, a TaskCompletionSource is set with that result (if there's no match, the result is never set).
Then the code waits for whichever happens first: either all the continuation tasks complete, or the task completion source's result is set. Either way, we can then check the task associated with the task completion source (that's why we wait for the continuation tasks to complete, not the original tasks). If its result is set, that's pretty much an indication that we have a match (the additional check at the end is a little paranoid, but better safe than sorry I guess... :D)
public static async Task<bool> WhenAnyHasResult<T>(Predicate<T> isExpectedResult, params Task<T>[] tasks)
{
    const TaskContinuationOptions continuationTaskFlags = TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.AttachedToParent;
    // Prepare TaskCompletionSource to be set only when one of the provided tasks
    // completes with expected result
    var tcs = new TaskCompletionSource<T>();
    // For every provided task, attach a continuation task that fires
    // once the original task was completed
    var taskContinuations = tasks.Select(task =>
    {
        return task.ContinueWith(x =>
            {
                var taskResult = x.Result;
                if (isExpectedResult(taskResult))
                {
                    // TrySetResult, so that a second matching task does not throw
                    tcs.TrySetResult(taskResult);
                }
            },
            continuationTaskFlags);
    });
    // We either wait for all the continuation tasks to be completed
    // (it's most likely an indication that none of the provided tasks completed with the expected result)
    // or for the TCS task to complete (which means we found a match)
    await Task.WhenAny(Task.WhenAll(taskContinuations), tcs.Task);
    // If the task from TCS has run to completion, it means the result has been set from
    // the continuation task attached to one of the tasks provided in the arguments
    var completionTask = tcs.Task;
    if (completionTask.IsCompleted)
    {
        // We will check once more to make sure the result is set as expected
        // and return this as our outcome
        var tcsResult = completionTask.Result;
        return isExpectedResult(tcsResult);
    }
    // TCS result was never set, which means we did not find a task matching the expected result.
    tcs.SetCanceled();
    return false;
}
Now, the usage will be as follows:
static async Task ExampleWithBooleans()
{
    Console.WriteLine("Example with booleans");
    var task1 = SampleTask(3000, true);
    var task2 = SampleTask(5000, false);
    var finalResult = await TaskUtils.WhenAnyHasResult(result => result == true, task1, task2);
    // go ahead and cancel your cancellation token here
    Console.WriteLine("Final result: " + finalResult);
    Debug.Assert(finalResult == true);
    Console.WriteLine();
}
What's nice about putting it into a generic method is that it works with any result type of the original tasks, not only booleans.
You could use an asynchronous method that wraps a Task<bool> in another Task<bool>, and cancels a CancellationTokenSource if the result of the input task is true. In the example below this method is IfTrueCancel, and it is implemented as a local function. This way it captures the CancellationTokenSource, so you don't have to pass it as an argument on every call:
var cts = new CancellationTokenSource();
var stoppingToken = cts.Token;
var task1 = IfTrueCancel(Task.Run(() => Task1(stoppingToken)));
var task2 = IfTrueCancel(Task.Run(() => Task2(stoppingToken)));
var task3 = IfTrueCancel(Task.Run(() => Task3(stoppingToken)));
var task4 = IfTrueCancel(Task.Run(() => Task4(stoppingToken)));
var task5 = IfTrueCancel(Task.Run(() => Task5(stoppingToken)));
var task6 = IfTrueCancel(Task.Run(() => Task6(stoppingToken)));
Task.WaitAll(task1, task2, task3, task4, task5, task6);
async Task<bool> IfTrueCancel(Task<bool> task)
{
    bool result = await task.ConfigureAwait(false);
    if (result) cts.Cancel();
    return result;
}
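One thing to watch with the code above: if the checks react to cancellation by throwing OperationCanceledException, the wrapped tasks end up cancelled and Task.WaitAll throws an AggregateException. The sketch below is one possible way to handle that, not part of the original answer. It assumes a hypothetical CheckAsync stand-in for Task1, Task2, etc., treats a cancelled check as false, and awaits Task.WhenAll instead of blocking.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var cts = new CancellationTokenSource();
CancellationToken stoppingToken = cts.Token;

// Hypothetical check: waits delayMs (or until cancelled), then reports outcome.
async Task<bool> CheckAsync(int delayMs, bool outcome, CancellationToken token)
{
    await Task.Delay(delayMs, token);
    return outcome;
}

// Wraps a check: a true result cancels the shared token,
// and a cancelled check is reported as false (an assumption of this sketch).
async Task<bool> IfTrueCancel(Task<bool> task)
{
    try
    {
        bool result = await task.ConfigureAwait(false);
        if (result) cts.Cancel();
        return result;
    }
    catch (OperationCanceledException)
    {
        return false;
    }
}

bool[] results = await Task.WhenAll(
    IfTrueCancel(CheckAsync(100, true, stoppingToken)),
    IfTrueCancel(CheckAsync(5000, false, stoppingToken)),
    IfTrueCancel(CheckAsync(5000, false, stoppingToken)));

bool anyTrue = Array.Exists(results, r => r);
Console.WriteLine(anyTrue);
```

Here the first check returns true after about 100 ms, which cancels the token, so the two slow checks finish early and are reported as false.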
Another, quite different, solution to this problem could be to use PLINQ instead of explicitly created Tasks. PLINQ requires an IEnumerable of something in order to do parallel work on it, and in your case this something is the Task1, Task2, etc. functions that you want to invoke. You could put them in an array of Func<CancellationToken, bool>, and solve the problem this way:
var functions = new Func<CancellationToken, bool>[]
{
    Task1, Task2, Task3, Task4, Task5, Task6
};
bool success = functions
    .AsParallel()
    .WithDegreeOfParallelism(4)
    .Select(function =>
    {
        try
        {
            bool result = function(stoppingToken);
            if (result) cts.Cancel();
            return result;
        }
        catch (OperationCanceledException)
        {
            return false;
        }
    })
    .Any(result => result);
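To try the PLINQ version in isolation, a small self-contained sketch might look like the following. Check is a hypothetical synchronous stand-in for Task1, Task2, etc. that blocks on the token's wait handle to simulate cancellable work. The durations and the degree of parallelism are arbitrary choices for the demo.

```csharp
using System;
using System.Linq;
using System.Threading;

var cts = new CancellationTokenSource();
CancellationToken stoppingToken = cts.Token;

// Hypothetical synchronous check: "works" for durationMs, but stops early
// with OperationCanceledException if the token is cancelled in the meantime.
bool Check(int durationMs, bool outcome, CancellationToken token)
{
    // WaitOne returns true when the token was signalled before the timeout.
    if (token.WaitHandle.WaitOne(durationMs)) token.ThrowIfCancellationRequested();
    return outcome;
}

var functions = new Func<CancellationToken, bool>[]
{
    ct => Check(5000, false, ct),
    ct => Check(100, true, ct),
    ct => Check(5000, false, ct),
};

bool success = functions
    .AsParallel()
    .WithDegreeOfParallelism(functions.Length)
    .Select(function =>
    {
        try
        {
            bool result = function(stoppingToken);
            if (result) cts.Cancel(); // tell the other checks to stop
            return result;
        }
        catch (OperationCanceledException)
        {
            return false; // a cancelled check counts as "no result"
        }
    })
    .Any(result => result);

Console.WriteLine(success);
```

The result is true regardless of how PLINQ schedules the three functions. The scheduling only affects how long the query takes.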
The advantage of this approach is that you can configure the degree of parallelism, and you don't have to rely on the ThreadPool availability for limiting the concurrency of the whole operation. The disadvantage is that all functions must have the same signature. You could overcome this disadvantage by declaring the functions as lambda expressions like this:
var functions = new Func<CancellationToken, bool>[]
{
    ct => Task1(arg1, ct),
    ct => Task2(arg1, arg2, ct),
    ct => Task3(ct),
    ct => Task4(arg1, arg2, arg3, ct),
    ct => Task5(arg1, ct),
    ct => Task6(ct)
};