Parallel.Invoke应该是Parallel几个方法中最简单的一个了,我们来看看它的实现,为了方法大家理解,我尽量保留源码中的注释:
public static class Parallel { internal static int s_forkJoinContextID; internal const int DEFAULT_LOOP_STRIDE = 16; internal static ParallelOptions s_defaultParallelOptions = new ParallelOptions(); public static void Invoke(params Action[] actions) { Invoke(s_defaultParallelOptions, actions); } //Executes each of the provided actions, possibly in parallel. public static void Invoke(ParallelOptions parallelOptions, params Action[] actions) { if (actions == null) { throw new ArgumentNullException("actions"); } if (parallelOptions == null) { throw new ArgumentNullException("parallelOptions"); } // Throw an ODE if we're passed a disposed CancellationToken. if (parallelOptions.CancellationToken.CanBeCanceled && AppContextSwitches.ThrowExceptionIfDisposedCancellationTokenSource) { parallelOptions.CancellationToken.ThrowIfSourceDisposed(); } // Quit early if we're already canceled -- avoid a bunch of work. if (parallelOptions.CancellationToken.IsCancellationRequested) throw new OperationCanceledException(parallelOptions.CancellationToken); // We must validate that the actions array contains no null elements, and also // make a defensive copy of the actions array. Action[] actionsCopy = new Action[actions.Length]; for (int i = 0; i < actionsCopy.Length; i++) { actionsCopy[i] = actions[i]; if (actionsCopy[i] == null) { throw new ArgumentException(Environment.GetResourceString("Parallel_Invoke_ActionNull")); } } // ETW event for Parallel Invoke Begin int forkJoinContextID = 0; Task callerTask = null; if (TplEtwProvider.Log.IsEnabled()) { forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID); callerTask = Task.InternalCurrent; TplEtwProvider.Log.ParallelInvokeBegin((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0), forkJoinContextID, TplEtwProvider.ForkJoinOperationType.ParallelInvoke, actionsCopy.Length); } // If we have no work to do, we are done. if (actionsCopy.Length < 1) return; // In the algorithm below, if the number of actions is greater than this, we automatically // use Parallel.For() to handle the actions, rather than the Task-per-Action strategy. const int SMALL_ACTIONCOUNT_LIMIT = 10; try { // If we've gotten this far, it's time to process the actions. if ((actionsCopy.Length > SMALL_ACTIONCOUNT_LIMIT) || (parallelOptions.MaxDegreeOfParallelism != -1 && parallelOptions.MaxDegreeOfParallelism < actionsCopy.Length)) { // Used to hold any exceptions encountered during action processing ConcurrentQueue<Exception> exceptionQ = null; // will be lazily initialized if necessary // This is more efficient for a large number of actions, or for enforcing MaxDegreeOfParallelism. try { // Launch a self-replicating task to handle the execution of all actions. // The use of a self-replicating task allows us to use as many cores // as are available, and no more. The exception to this rule is // that, in the case of a blocked action, the ThreadPool may inject // extra threads, which means extra tasks can run. int actionIndex = 0; ParallelForReplicatingTask rootTask = new ParallelForReplicatingTask(parallelOptions, delegate { // Each for-task will pull an action at a time from the list int myIndex = Interlocked.Increment(ref actionIndex); // = index to use + 1 while (myIndex <= actionsCopy.Length) { // Catch and store any exceptions. If we don't catch them, the self-replicating // task will exit, and that may cause other SR-tasks to exit. // And (absent cancellation) we want all actions to execute. try { actionsCopy[myIndex - 1](); } catch (Exception e) { LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => { return new ConcurrentQueue<Exception>(); }); exceptionQ.Enqueue(e); } // Check for cancellation. If it is encountered, then exit the delegate. if (parallelOptions.CancellationToken.IsCancellationRequested) throw new OperationCanceledException(parallelOptions.CancellationToken); // You're still in the game. Grab your next action index. myIndex = Interlocked.Increment(ref actionIndex); } }, TaskCreationOptions.None, InternalTaskOptions.SelfReplicating); rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler); rootTask.Wait(); } catch (Exception e) { LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => { return new ConcurrentQueue<Exception>(); }); // Since we're consuming all action exceptions, there are very few reasons that // we would see an exception here. Two that come to mind: // (1) An OCE thrown by one or more actions (AggregateException thrown) // (2) An exception thrown from the ParallelForReplicatingTask constructor // (regular exception thrown). // We'll need to cover them both. AggregateException ae = e as AggregateException; if (ae != null) { // Strip off outer container of an AggregateException, because downstream // logic needs OCEs to be at the top level. foreach (Exception exc in ae.InnerExceptions) exceptionQ.Enqueue(exc); } else { exceptionQ.Enqueue(e); } } // If we have encountered any exceptions, then throw. if ((exceptionQ != null) && (exceptionQ.Count > 0)) { ThrowIfReducableToSingleOCE(exceptionQ, parallelOptions.CancellationToken); throw new AggregateException(exceptionQ); } } else { // This is more efficient for a small number of actions and no DOP support // Initialize our array of tasks, one per action. Task[] tasks = new Task[actionsCopy.Length]; // One more check before we begin... if (parallelOptions.CancellationToken.IsCancellationRequested) throw new OperationCanceledException(parallelOptions.CancellationToken); // Launch all actions as tasks for (int i = 1; i < tasks.Length; i++) { tasks[i] = Task.Factory.StartNew(actionsCopy[i], parallelOptions.CancellationToken, TaskCreationOptions.None, InternalTaskOptions.None, parallelOptions.EffectiveTaskScheduler); } // Optimization: Use current thread to run something before we block waiting for all tasks. tasks[0] = new Task(actionsCopy[0]); tasks[0].RunSynchronously(parallelOptions.EffectiveTaskScheduler); // Now wait for the tasks to complete. This will not unblock until all of // them complete, and it will throw an exception if one or more of them also // threw an exception. We let such exceptions go completely unhandled. try { if (tasks.Length <= 4) { // for 4 or less tasks, the sequential waitall version is faster Task.FastWaitAll(tasks); } else { // otherwise we revert to the regular WaitAll which delegates the multiple wait to the cooperative event. Task.WaitAll(tasks); } } catch (AggregateException aggExp) { // see if we can combine it into a single OCE. If not propagate the original exception ThrowIfReducableToSingleOCE(aggExp.InnerExceptions, parallelOptions.CancellationToken); throw; } finally { for (int i = 0; i < tasks.Length; i++) { if (tasks[i].IsCompleted) tasks[i].Dispose(); } } } } finally { // ETW event for Parallel Invoke End if (TplEtwProvider.Log.IsEnabled()) { TplEtwProvider.Log.ParallelInvokeEnd((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0), forkJoinContextID); } } } } |
Parallel.Invoke 的实现非常简单,如果我们Action的个数超过10或者我们制定的并行度MaxDegreeOfParallelism小于Action的个数,我们采用ParallelForReplicatingTask来完成,否则我们直接把每个Action包装成Task【Task.Factory.StartNew】。这里我们主要看看ParallelForReplicatingTask的实现。
internal class ParallelForReplicatingTask : Task { private int m_replicationDownCount; // downcounter to control replication internal ParallelForReplicatingTask( ParallelOptions parallelOptions, Action action, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions) : base(action, null, Task.InternalCurrent, default(CancellationToken), creationOptions, internalOptions | InternalTaskOptions.SelfReplicating, null) { m_replicationDownCount = parallelOptions.EffectiveMaxConcurrencyLevel; StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller; PossiblyCaptureContext(ref stackMark); } internal override bool ShouldReplicate() { if (m_replicationDownCount == -1) return true; // "run wild" if (m_replicationDownCount > 0) // Decrement and return true if not called with 0 downcount { m_replicationDownCount--; return true; } return false; // We're done replicating } internal override Task CreateReplicaTask(Action<object> taskReplicaDelegate, Object stateObject, Task parentTask, TaskScheduler taskScheduler, TaskCreationOptions creationOptionsForReplica, InternalTaskOptions internalOptionsForReplica) { return new ParallelForReplicaTask(taskReplicaDelegate, stateObject, parentTask, taskScheduler, creationOptionsForReplica, internalOptionsForReplica); } } internal class ParallelForReplicaTask : Task { internal object m_stateForNextReplica; internal object m_stateFromPreviousReplica; internal Task m_handedOverChildReplica; internal ParallelForReplicaTask(Action<object> taskReplicaDelegate, Object stateObject, Task parentTask, TaskScheduler taskScheduler, TaskCreationOptions creationOptionsForReplica, InternalTaskOptions internalOptionsForReplica) : base(taskReplicaDelegate, stateObject, parentTask, default(CancellationToken), creationOptionsForReplica, internalOptionsForReplica, taskScheduler) { } internal override Object SavedStateForNextReplica { get { return m_stateForNextReplica; } set { m_stateForNextReplica = value; } } internal override Object SavedStateFromPreviousReplica { get { return m_stateFromPreviousReplica; } set { m_stateFromPreviousReplica = value; } } internal override Task HandedOverChildReplica { get { return m_handedOverChildReplica; } set { m_handedOverChildReplica = value; } } } |