C# Parallel.Invoke Implementation

Published: 2017-12-19 09:55

 Author: dz45693    Source: cnblogs (博客园)

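  The ShouldReplicate method of ParallelForReplicatingTask reports whether the current Task may keep replicating; each replication decrements the parallelism counter by 1. CreateReplicaTask is then called to create a new ParallelForReplicaTask instance, and finally the Task's RunSynchronously method is called. The core of the implementation is ExecuteSelfReplicating, which RunSynchronously eventually reaches.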
public class Task : IThreadPoolWorkItem, IAsyncResult, IDisposable
{
    /*
    Runs the Task synchronously on the current TaskScheduler. A task may only be started and run once.
    Any attempt to schedule a task a second time will result in an exception. If the target scheduler
    does not support running this Task on the current thread, the Task will be scheduled for execution
    on the scheduler, and the current thread will block until the Task has completed execution.
    */
    public void RunSynchronously()
    {
        InternalRunSynchronously(TaskScheduler.Current, waitForCompletion: true);
    }

    public void RunSynchronously(TaskScheduler scheduler)
    {
        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }
        Contract.EndContractBlock();
        InternalRunSynchronously(scheduler, waitForCompletion: true);
    }

    internal void InnerInvokeWithArg(Task childTask)
    {
        InnerInvoke();
    }

    private static void ExecuteSelfReplicating(Task root)
    {
        TaskCreationOptions creationOptionsForReplicas = root.CreationOptions | TaskCreationOptions.AttachedToParent;
        InternalTaskOptions internalOptionsForReplicas =
            InternalTaskOptions.ChildReplica |     // child replica flag disables self replication for the replicas themselves.
            InternalTaskOptions.SelfReplicating |  // we still want to identify this as part of a self replicating group
            InternalTaskOptions.QueuedByRuntime;   // we queue and cancel these tasks internally, so don't allow CT registration to take place

        // Important Note: The child replicas we launch from here will be attached to the root replica (by virtue of the root.CreateReplicaTask call)
        // because we need the root task to receive all their exceptions, and to block until all of them return

        // This variable is captured in a closure and shared among all replicas.
        bool replicasAreQuitting = false;

        // Set up a delegate that will form the body of the root and all recursively created replicas.
        Action<object> taskReplicaDelegate = null;
        taskReplicaDelegate = delegate
        {
            Task currentTask = Task.InternalCurrent;

            // Check if a child task has been handed over by a prematurely quitting replica that we might be a replacement for.
            Task childTask = currentTask.HandedOverChildReplica;
            if (childTask == null)
            {
                // Apparently we are not a replacement task. This means we need to queue up a child task for replication to progress

                // Down-counts a counter in the root task.
                if (!root.ShouldReplicate()) return;

                // If any of the replicas have quit, we will do so ourselves.
                if (Volatile.Read(ref replicasAreQuitting))
                {
                    return;
                }

                // Propagate a copy of the context from the root task. It may be null if flow was suppressed.
                ExecutionContext creatorContext = root.CapturedContext;
                childTask = root.CreateReplicaTask(taskReplicaDelegate, root.m_stateObject, root, root.ExecutingTaskScheduler,
                                                   creationOptionsForReplicas, internalOptionsForReplicas);
                childTask.CapturedContext = CopyExecutionContext(creatorContext);
                childTask.ScheduleAndStart(false);
            }

            // Finally invoke the meat of the task.
            // Note that we are directly calling root.InnerInvoke() even though we may currently be in the action delegate of a child replica
            // This is because the actual work was passed down in that delegate, and the action delegate of the child replica simply contains this
            // replication control logic.
            try
            {
                // passing in currentTask only so that the parallel debugger can find it
                root.InnerInvokeWithArg(currentTask);
            }
            catch (Exception exn)
            {
                // Record this exception in the root task's exception list
                root.HandleException(exn);
                if (exn is ThreadAbortException)
                {
                    // If this is a ThreadAbortException it will escape this catch clause, causing us to skip the regular Finish codepath
                    // In order not to leave the task unfinished, we now call FinishThreadAbortedTask here
                    currentTask.FinishThreadAbortedTask(false, true);
                }
            }

            Object savedState = currentTask.SavedStateForNextReplica;

            // check for premature exit
            if (savedState != null)
            {
                // the replica decided to exit early
                // we need to queue up a replacement, attach the saved state, and yield the thread right away
                Task replacementReplica = root.CreateReplicaTask(taskReplicaDelegate, root.m_stateObject, root, root.ExecutingTaskScheduler,
                                                                 creationOptionsForReplicas, internalOptionsForReplicas);

                // Propagate a copy of the context from the root task to the replacement task
                ExecutionContext creatorContext = root.CapturedContext;
                replacementReplica.CapturedContext = CopyExecutionContext(creatorContext);
                replacementReplica.HandedOverChildReplica = childTask;
                replacementReplica.SavedStateFromPreviousReplica = savedState;
                replacementReplica.ScheduleAndStart(false);
            }
            else
            {
                // The replica finished normally, which means it can't find more work to grab.
                // Time to mark replicas quitting
                replicasAreQuitting = true;

                // InternalCancel() could conceivably throw in the underlying scheduler's TryDequeue() method.
                // If it does, then make sure that we record it.
                try
                {
                    childTask.InternalCancel(true);
                }
                catch (Exception e)
                {
                    // Apparently TryDequeue threw an exception.  Before propagating that exception, InternalCancel should have
                    // attempted an atomic state transition and a call to CancellationCleanupLogic() on this task. So we know
                    // the task was properly cleaned up if it was possible.
                    //
                    // Now all we need to do is to Record the exception in the root task.
                    root.HandleException(e);
                }

                // No specific action needed if the child could not be canceled
                // because we attached it to the root task, which should therefore be receiving any exceptions from the child,
                // and root.wait will not return before this child finishes anyway.
            }
        };

        //
        // Now we execute as the root task
        //
        taskReplicaDelegate(null);
    }
}
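  The replication pattern in the listing above can be approximated with public APIs only. The following is a minimal sketch, not the BCL implementation: ReplicationSketch, RunAll, replicasLeft and nextIndex are hypothetical names, the down-counter merely stands in for whatever ShouldReplicate does internally, and the explicit wait loop replaces the AttachedToParent mechanism that the real code relies on.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration of "replicate first, then work"; not the BCL code.
public static class ReplicationSketch
{
    // Runs every action exactly once, using at most maxReplicas extra tasks.
    // Returns how many times each action was executed.
    public static int[] RunAll(Action[] actions, int maxReplicas)
    {
        int nextIndex = -1;                 // shared cursor over the work items
        int replicasLeft = maxReplicas;     // assumed stand-in for the ShouldReplicate counter
        bool replicasAreQuitting = false;   // shared flag, as in the listing above
        var children = new ConcurrentBag<Task>();
        var runsPerAction = new int[actions.Length];

        Action replicaBody = null;
        replicaBody = () =>
        {
            // Step 1: queue one more replica if the counter and the flag allow it.
            if (Interlocked.Decrement(ref replicasLeft) >= 0 &&
                !Volatile.Read(ref replicasAreQuitting))
            {
                children.Add(Task.Run(replicaBody));
            }

            // Step 2: do the actual work by claiming items until none are left.
            int i;
            while ((i = Interlocked.Increment(ref nextIndex)) < actions.Length)
            {
                actions[i]();
                Interlocked.Increment(ref runsPerAction[i]);
            }

            // Step 3: no work left, so tell the other replicas to stop replicating.
            Volatile.Write(ref replicasAreQuitting, true);
        };

        // The root executes the same body on the calling thread.
        replicaBody();

        // A replica registers its child before it completes, so draining the bag
        // until it is empty waits for every replica that was ever started.
        Task child;
        while (children.TryTake(out child))
        {
            child.Wait();
        }
        return runsPerAction;
    }
}
```

  Calling ReplicationSketch.RunAll(actions, 2) should run each action once no matter how many replicas actually start, because work items are claimed through the shared cursor rather than assigned per replica.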
  Task.RunSynchronously can reach Task.Execute along one of two paths:
  Task.RunSynchronously->Task.InternalRunSynchronously->TaskScheduler.TryRunInline->ThreadPoolTaskScheduler.TryExecuteTaskInline->Task.ExecuteWithThreadLocal->Task.ExecuteEntry->Task.Execute
  Task.RunSynchronously->Task.InternalRunSynchronously->TaskScheduler.TryRunInline (returns false)->TaskScheduler.InternalQueueTask->ThreadPoolTaskScheduler.QueueTask->Task.IThreadPoolWorkItem.ExecuteWorkItem()->Task.ExecuteWithThreadLocal->Task.ExecuteEntry->Task.Execute
  Put simply, both paths end in Task.Execute. Execute checks whether IsSelfReplicatingRoot is true (InternalTaskOptions.SelfReplicating was specified when the ParallelForReplicatingTask was instantiated) and, if so, calls ExecuteSelfReplicating.
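  The first (inline) path can be observed from user code. The sketch below uses only the public Task API; RunSynchronouslyDemo and its two methods are hypothetical names for illustration. It assumes the default thread-pool scheduler is current, in which case the task body is expected to run on the calling thread, and it also shows that a second RunSynchronously call on the same task is rejected.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; only public Task APIs are used.
public static class RunSynchronouslyDemo
{
    // Returns true when the task body ran on the thread that called RunSynchronously.
    public static bool RanInline()
    {
        int callerThread = Environment.CurrentManagedThreadId;
        int bodyThread = -1;

        var task = new Task(() => bodyThread = Environment.CurrentManagedThreadId);
        task.RunSynchronously();   // uses TaskScheduler.Current; blocks until the task completes

        return task.IsCompleted && bodyThread == callerThread;
    }

    // Returns true when starting an already completed task throws.
    public static bool SecondStartThrows()
    {
        var task = new Task(() => { });
        task.RunSynchronously();
        try
        {
            task.RunSynchronously();   // a task may only be started once
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

  If a custom TaskScheduler refuses to inline, the second path applies instead: the task is queued and the caller blocks until it finishes, so RanInline would then return false while the task still completes.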
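  ExecuteSelfReplicating first checks whether the current Task's HandedOverChildReplica property is null (the property is itself a Task; a non-null value means that a replica which exited early handed its child over to this replacement). When it is null, we check whether the root Task should still replicate by calling ParallelForReplicatingTask's ShouldReplicate (root.ShouldReplicate()), and then read replicasAreQuitting to decide whether to return early (if (Volatile.Read(ref replicasAreQuitting)), a read that is safe across threads). Otherwise ParallelForReplicatingTask's CreateReplicaTask is called to create and start a child task. Finally root.InnerInvokeWithArg(currentTask) runs; this is where the delegate built inside Parallel.Invoke is invoked, and each call executes only one Action. The currentTask.SavedStateForNextReplica check has little meaning for Parallel.Invoke, but in Parallel.For it holds the state handed to the next replica to run.
  A ParallelForReplicatingTask executes an Action and can create child ParallelForReplicaTask instances, each of which also executes an Action. In effect, Task.Execute is called once for each Action we have. The number of replica task instances depends largely on the parallelism parameter EffectiveMaxConcurrencyLevel, which also determines how many times ExecuteSelfReplicating is called. The call flow across threads is as follows: flow 1 is only ever invoked once (rootTask.RunSynchronously), flow 2 is the number of ExecuteSelfReplicating calls, and flow 3 is the number of ordinary InnerInvoke calls.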
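  From the caller's side, the observable contract is simpler than the internals: every Action passed to Parallel.Invoke runs once, and ParallelOptions.MaxDegreeOfParallelism caps the concurrency. The sketch below checks only that contract through public APIs; ParallelInvokeDemo and Run are hypothetical names, and nothing here depends on how a given runtime version implements replication internally.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; only public Parallel APIs are used.
public static class ParallelInvokeDemo
{
    // Invokes actionCount counting actions with the given degree of parallelism
    // and returns how many times each one ran.
    public static int[] Run(int actionCount, int maxDegree)
    {
        var runs = new int[actionCount];
        var actions = new Action[actionCount];
        for (int i = 0; i < actionCount; i++)
        {
            int index = i;   // capture a per-iteration copy for the closure
            actions[i] = () => Interlocked.Increment(ref runs[index]);
        }

        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegree };
        Parallel.Invoke(options, actions);   // returns after all actions have completed
        return runs;
    }
}
```

  With ParallelInvokeDemo.Run(20, 4), every entry of the returned array should be 1; changing maxDegree affects how many actions run at the same time, not how often each one runs.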