C# Parallel.Invoke 实现

发表于:2017-12-19 09:55

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:dz45693    来源:博客园

  Parallel.Invoke应该是Parallel几个方法中最简单的一个了,我们来看看它的实现,为了方法大家理解,我尽量保留源码中的注释:
public static class Parallel
{
internal static int s_forkJoinContextID;
internal const int DEFAULT_LOOP_STRIDE = 16;
internal static ParallelOptions s_defaultParallelOptions = new ParallelOptions();
public static void Invoke(params Action[] actions)
{
Invoke(s_defaultParallelOptions, actions);
}
//Executes each of the provided actions, possibly in parallel.
public static void Invoke(ParallelOptions parallelOptions, params Action[] actions)
{
if (actions == null)
{
throw new ArgumentNullException("actions");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
// Throw an ODE if we're passed a disposed CancellationToken.
if (parallelOptions.CancellationToken.CanBeCanceled && AppContextSwitches.ThrowExceptionIfDisposedCancellationTokenSource)
{
parallelOptions.CancellationToken.ThrowIfSourceDisposed();
}
// Quit early if we're already canceled -- avoid a bunch of work.
if (parallelOptions.CancellationToken.IsCancellationRequested)
throw new OperationCanceledException(parallelOptions.CancellationToken);
// We must validate that the actions array contains no null elements, and also
// make a defensive copy of the actions array.
Action[] actionsCopy = new Action[actions.Length];
for (int i = 0; i < actionsCopy.Length; i++)
{
actionsCopy[i] = actions[i];
if (actionsCopy[i] == null)
{
throw new ArgumentException(Environment.GetResourceString("Parallel_Invoke_ActionNull"));
}
}
// ETW event for Parallel Invoke Begin
int forkJoinContextID = 0;
Task callerTask = null;
if (TplEtwProvider.Log.IsEnabled())
{
forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID);
callerTask = Task.InternalCurrent;
TplEtwProvider.Log.ParallelInvokeBegin((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0),
forkJoinContextID, TplEtwProvider.ForkJoinOperationType.ParallelInvoke,
actionsCopy.Length);
}
// If we have no work to do, we are done.
if (actionsCopy.Length < 1) return;
// In the algorithm below, if the number of actions is greater than this, we automatically
// use Parallel.For() to handle the actions, rather than the Task-per-Action strategy.
const int SMALL_ACTIONCOUNT_LIMIT = 10;
try
{
// If we've gotten this far, it's time to process the actions.
if ((actionsCopy.Length > SMALL_ACTIONCOUNT_LIMIT) ||
(parallelOptions.MaxDegreeOfParallelism != -1 && parallelOptions.MaxDegreeOfParallelism < actionsCopy.Length))
{
// Used to hold any exceptions encountered during action processing
ConcurrentQueue<Exception> exceptionQ = null; // will be lazily initialized if necessary
// This is more efficient for a large number of actions, or for enforcing MaxDegreeOfParallelism.
try
{
// Launch a self-replicating task to handle the execution of all actions.
// The use of a self-replicating task allows us to use as many cores
// as are available, and no more.  The exception to this rule is
// that, in the case of a blocked action, the ThreadPool may inject
// extra threads, which means extra tasks can run.
int actionIndex = 0;
ParallelForReplicatingTask rootTask = new ParallelForReplicatingTask(parallelOptions, delegate
{
// Each for-task will pull an action at a time from the list
int myIndex = Interlocked.Increment(ref actionIndex); // = index to use + 1
while (myIndex <= actionsCopy.Length)
{
// Catch and store any exceptions.  If we don't catch them, the self-replicating
// task will exit, and that may cause other SR-tasks to exit.
// And (absent cancellation) we want all actions to execute.
try
{
actionsCopy[myIndex - 1]();
}
catch (Exception e)
{
LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => { return new ConcurrentQueue<Exception>(); });
exceptionQ.Enqueue(e);
}
// Check for cancellation.  If it is encountered, then exit the delegate.
if (parallelOptions.CancellationToken.IsCancellationRequested)
throw new OperationCanceledException(parallelOptions.CancellationToken);
// You're still in the game.  Grab your next action index.
myIndex = Interlocked.Increment(ref actionIndex);
}
}, TaskCreationOptions.None, InternalTaskOptions.SelfReplicating);
rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler);
rootTask.Wait();
}
catch (Exception e)
{
LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => { return new ConcurrentQueue<Exception>(); });
// Since we're consuming all action exceptions, there are very few reasons that
// we would see an exception here.  Two that come to mind:
//   (1) An OCE thrown by one or more actions (AggregateException thrown)
//   (2) An exception thrown from the ParallelForReplicatingTask constructor
//       (regular exception thrown).
// We'll need to cover them both.
AggregateException ae = e as AggregateException;
if (ae != null)
{
// Strip off outer container of an AggregateException, because downstream
// logic needs OCEs to be at the top level.
foreach (Exception exc in ae.InnerExceptions) exceptionQ.Enqueue(exc);
}
else
{
exceptionQ.Enqueue(e);
}
}
// If we have encountered any exceptions, then throw.
if ((exceptionQ != null) && (exceptionQ.Count > 0))
{
ThrowIfReducableToSingleOCE(exceptionQ, parallelOptions.CancellationToken);
throw new AggregateException(exceptionQ);
}
}
else
{
// This is more efficient for a small number of actions and no DOP support
// Initialize our array of tasks, one per action.
Task[] tasks = new Task[actionsCopy.Length];
// One more check before we begin...
if (parallelOptions.CancellationToken.IsCancellationRequested)
throw new OperationCanceledException(parallelOptions.CancellationToken);
// Launch all actions as tasks
for (int i = 1; i < tasks.Length; i++)
{
tasks[i] = Task.Factory.StartNew(actionsCopy[i], parallelOptions.CancellationToken, TaskCreationOptions.None,
InternalTaskOptions.None, parallelOptions.EffectiveTaskScheduler);
}
// Optimization: Use current thread to run something before we block waiting for all tasks.
tasks[0] = new Task(actionsCopy[0]);
tasks[0].RunSynchronously(parallelOptions.EffectiveTaskScheduler);
// Now wait for the tasks to complete.  This will not unblock until all of
// them complete, and it will throw an exception if one or more of them also
// threw an exception.  We let such exceptions go completely unhandled.
try
{
if (tasks.Length <= 4)
{
// for 4 or less tasks, the sequential waitall version is faster
Task.FastWaitAll(tasks);
}
else
{
// otherwise we revert to the regular WaitAll which delegates the multiple wait to the cooperative event.
Task.WaitAll(tasks);
}
}
catch (AggregateException aggExp)
{
// see if we can combine it into a single OCE. If not propagate the original exception
ThrowIfReducableToSingleOCE(aggExp.InnerExceptions, parallelOptions.CancellationToken);
throw;
}
finally
{
for (int i = 0; i < tasks.Length; i++)
{
if (tasks[i].IsCompleted) tasks[i].Dispose();
}
}
}
}
finally
{
// ETW event for Parallel Invoke End
if (TplEtwProvider.Log.IsEnabled())
{
TplEtwProvider.Log.ParallelInvokeEnd((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0),
forkJoinContextID);
}
}
}
}
  Parallel.Invoke 的实现非常简单,如果我们Action的个数超过10或者我们制定的并行度MaxDegreeOfParallelism小于Action的个数,我们采用ParallelForReplicatingTask来完成,否则我们直接把每个Action包装成Task【Task.Factory.StartNew】。这里我们主要看看ParallelForReplicatingTask的实现。
internal class ParallelForReplicatingTask : Task
{
private int m_replicationDownCount; // downcounter to control replication
internal ParallelForReplicatingTask(
ParallelOptions parallelOptions, Action action, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions)
: base(action, null, Task.InternalCurrent, default(CancellationToken), creationOptions, internalOptions | InternalTaskOptions.SelfReplicating, null)
{
m_replicationDownCount = parallelOptions.EffectiveMaxConcurrencyLevel;
StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
PossiblyCaptureContext(ref stackMark);
}
internal override bool ShouldReplicate()
{
if (m_replicationDownCount == -1) return true; // "run wild"
if (m_replicationDownCount > 0) // Decrement and return true if not called with 0 downcount
{
m_replicationDownCount--;
return true;
}
return false; // We're done replicating
}
internal override Task CreateReplicaTask(Action<object> taskReplicaDelegate, Object stateObject, Task parentTask, TaskScheduler taskScheduler,
TaskCreationOptions creationOptionsForReplica, InternalTaskOptions internalOptionsForReplica)
{
return new ParallelForReplicaTask(taskReplicaDelegate, stateObject, parentTask, taskScheduler, creationOptionsForReplica, internalOptionsForReplica);
}
}
internal class ParallelForReplicaTask : Task
{
internal object m_stateForNextReplica;
internal object m_stateFromPreviousReplica;
internal Task m_handedOverChildReplica;
internal ParallelForReplicaTask(Action<object> taskReplicaDelegate, Object stateObject, Task parentTask, TaskScheduler taskScheduler,
TaskCreationOptions creationOptionsForReplica, InternalTaskOptions internalOptionsForReplica) :
base(taskReplicaDelegate, stateObject, parentTask, default(CancellationToken), creationOptionsForReplica, internalOptionsForReplica, taskScheduler)
{
}
internal override Object SavedStateForNextReplica
{
get { return m_stateForNextReplica; }
set { m_stateForNextReplica = value; }
}
internal override Object SavedStateFromPreviousReplica
{
get { return m_stateFromPreviousReplica; }
set { m_stateFromPreviousReplica = value; }
}
internal override Task HandedOverChildReplica
{
get { return m_handedOverChildReplica; }
set { m_handedOverChildReplica = value; }
}
}
21/212>
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号