TaskScheduler 类

定义

表示一个对象,该对象处理线程上排队任务的低级别工作。

public ref class TaskScheduler abstract
public abstract class TaskScheduler
type TaskScheduler = class
Public MustInherit Class TaskScheduler
继承
TaskScheduler

示例

以下示例创建一个自定义任务计划程序,用于限制应用使用的线程数。 然后,它会启动两组任务,并显示有关任务及其执行任务的线程的信息。

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class Example
{
   static void Main()
   {
       // Create a scheduler that uses two threads.
       LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(2);
       List<Task> tasks = new List<Task>();

       // Create a TaskFactory and pass it our custom scheduler.
       TaskFactory factory = new TaskFactory(lcts);
       CancellationTokenSource cts = new CancellationTokenSource();

       // Use our factory to run a set of tasks.
       Object lockObj = new Object();
       int outputItem = 0;

       for (int tCtr = 0; tCtr <= 4; tCtr++) {
          int iteration = tCtr;
          Task t = factory.StartNew(() => {
                                       for (int i = 0; i < 1000; i++) {
                                          lock (lockObj) {
                                             Console.Write("{0} in task t-{1} on thread {2}   ",
                                                           i, iteration, Thread.CurrentThread.ManagedThreadId);
                                             outputItem++;
                                             if (outputItem % 3 == 0)
                                                Console.WriteLine();
                                          }
                                       }
                                    }, cts.Token);
          tasks.Add(t);
      }
      // Use it to run a second set of tasks.
      for (int tCtr = 0; tCtr <= 4; tCtr++) {
         int iteration = tCtr;
         Task t1 = factory.StartNew(() => {
                                       for (int outer = 0; outer <= 10; outer++) {
                                          for (int i = 0x21; i <= 0x7E; i++) {
                                             lock (lockObj) {
                                                Console.Write("'{0}' in task t1-{1} on thread {2}   ",
                                                              Convert.ToChar(i), iteration, Thread.CurrentThread.ManagedThreadId);
                                                outputItem++;
                                                if (outputItem % 3 == 0)
                                                   Console.WriteLine();
                                             }
                                          }
                                       }
                                    }, cts.Token);
         tasks.Add(t1);
      }

      // Wait for the tasks to complete before displaying a completion message.
      Task.WaitAll(tasks.ToArray());
      cts.Dispose();
      Console.WriteLine("\n\nSuccessful completion.");
   }
}

// Provides a task scheduler that ensures a maximum concurrency level while
// running on top of the thread pool.
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
   // Indicates whether the current thread is processing work items.
   [ThreadStatic]
   private static bool _currentThreadIsProcessingItems;

  // The list of tasks to be executed
   private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)

   // The maximum concurrency level allowed by this scheduler.
   private readonly int _maxDegreeOfParallelism;

   // Indicates whether the scheduler is currently processing work items.
   private int _delegatesQueuedOrRunning = 0;

   // Creates a new instance with the specified degree of parallelism.
   public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
   {
       if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
       _maxDegreeOfParallelism = maxDegreeOfParallelism;
   }

   // Queues a task to the scheduler.
   protected sealed override void QueueTask(Task task)
   {
      // Add the task to the list of tasks to be processed.  If there aren't enough
      // delegates currently queued or running to process tasks, schedule another.
       lock (_tasks)
       {
           _tasks.AddLast(task);
           if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
           {
               ++_delegatesQueuedOrRunning;
               NotifyThreadPoolOfPendingWork();
           }
       }
   }

   // Inform the ThreadPool that there's work to be executed for this scheduler.
   private void NotifyThreadPoolOfPendingWork()
   {
       ThreadPool.UnsafeQueueUserWorkItem(_ =>
       {
           // Note that the current thread is now processing work items.
           // This is necessary to enable inlining of tasks into this thread.
           _currentThreadIsProcessingItems = true;
           try
           {
               // Process all available items in the queue.
               while (true)
               {
                   Task item;
                   lock (_tasks)
                   {
                       // When there are no more items to be processed,
                       // note that we're done processing, and get out.
                       if (_tasks.Count == 0)
                       {
                           --_delegatesQueuedOrRunning;
                           break;
                       }

                       // Get the next item from the queue
                       item = _tasks.First.Value;
                       _tasks.RemoveFirst();
                   }

                   // Execute the task we pulled out of the queue
                   base.TryExecuteTask(item);
               }
           }
           // We're done processing items on the current thread
           finally { _currentThreadIsProcessingItems = false; }
       }, null);
   }

   // Attempts to execute the specified task on the current thread.
   protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
   {
       // If this thread isn't already processing a task, we don't support inlining
       if (!_currentThreadIsProcessingItems) return false;

       // If the task was previously queued, remove it from the queue
       if (taskWasPreviouslyQueued)
          // Try to run the task.
          if (TryDequeue(task))
            return base.TryExecuteTask(task);
          else
             return false;
       else
          return base.TryExecuteTask(task);
   }

   // Attempt to remove a previously scheduled task from the scheduler.
   protected sealed override bool TryDequeue(Task task)
   {
       lock (_tasks) return _tasks.Remove(task);
   }

   // Gets the maximum concurrency level supported by this scheduler.
   public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }

   // Gets an enumerable of the tasks currently scheduled on this scheduler.
   protected sealed override IEnumerable<Task> GetScheduledTasks()
   {
       bool lockTaken = false;
       try
       {
           Monitor.TryEnter(_tasks, ref lockTaken);
           if (lockTaken) return _tasks;
           else throw new NotSupportedException();
       }
       finally
       {
           if (lockTaken) Monitor.Exit(_tasks);
       }
   }
}
// The following is a portion of the output from a single run of the example:
//    'T' in task t1-4 on thread 3   'U' in task t1-4 on thread 3   'V' in task t1-4 on thread 3
//    'W' in task t1-4 on thread 3   'X' in task t1-4 on thread 3   'Y' in task t1-4 on thread 3
//    'Z' in task t1-4 on thread 3   '[' in task t1-4 on thread 3   '\' in task t1-4 on thread 3
//    ']' in task t1-4 on thread 3   '^' in task t1-4 on thread 3   '_' in task t1-4 on thread 3
//    '`' in task t1-4 on thread 3   'a' in task t1-4 on thread 3   'b' in task t1-4 on thread 3
//    'c' in task t1-4 on thread 3   'd' in task t1-4 on thread 3   'e' in task t1-4 on thread 3
//    'f' in task t1-4 on thread 3   'g' in task t1-4 on thread 3   'h' in task t1-4 on thread 3
//    'i' in task t1-4 on thread 3   'j' in task t1-4 on thread 3   'k' in task t1-4 on thread 3
//    'l' in task t1-4 on thread 3   'm' in task t1-4 on thread 3   'n' in task t1-4 on thread 3
//    'o' in task t1-4 on thread 3   'p' in task t1-4 on thread 3   ']' in task t1-2 on thread 4
//    '^' in task t1-2 on thread 4   '_' in task t1-2 on thread 4   '`' in task t1-2 on thread 4
//    'a' in task t1-2 on thread 4   'b' in task t1-2 on thread 4   'c' in task t1-2 on thread 4
//    'd' in task t1-2 on thread 4   'e' in task t1-2 on thread 4   'f' in task t1-2 on thread 4
//    'g' in task t1-2 on thread 4   'h' in task t1-2 on thread 4   'i' in task t1-2 on thread 4
//    'j' in task t1-2 on thread 4   'k' in task t1-2 on thread 4   'l' in task t1-2 on thread 4
//    'm' in task t1-2 on thread 4   'n' in task t1-2 on thread 4   'o' in task t1-2 on thread 4
//    'p' in task t1-2 on thread 4   'q' in task t1-2 on thread 4   'r' in task t1-2 on thread 4
//    's' in task t1-2 on thread 4   't' in task t1-2 on thread 4   'u' in task t1-2 on thread 4
//    'v' in task t1-2 on thread 4   'w' in task t1-2 on thread 4   'x' in task t1-2 on thread 4
//    'y' in task t1-2 on thread 4   'z' in task t1-2 on thread 4   '{' in task t1-2 on thread 4
//    '|' in task t1-2 on thread 4   '}' in task t1-2 on thread 4   '~' in task t1-2 on thread 4
//    'q' in task t1-4 on thread 3   'r' in task t1-4 on thread 3   's' in task t1-4 on thread 3
//    't' in task t1-4 on thread 3   'u' in task t1-4 on thread 3   'v' in task t1-4 on thread 3
//    'w' in task t1-4 on thread 3   'x' in task t1-4 on thread 3   'y' in task t1-4 on thread 3
//    'z' in task t1-4 on thread 3   '{' in task t1-4 on thread 3   '|' in task t1-4 on thread 3

注解

TaskScheduler 类表示任务计划程序。 任务计划程序确保任务最终被执行。

默认任务计划程序提供负载均衡的工作窃取、线程注入/停用,以实现最大吞吐量和总体良好性能。 它应该足以满足大多数方案。

TaskScheduler 类还充当所有可自定义计划逻辑的扩展点。 这包括如何计划任务以供执行的机制,以及如何向调试器公开计划任务。 如果需要特殊功能,可以创建自定义计划程序,并为特定任务或查询启用它。

默认任务计划程序和线程池

任务并行库和 PLINQ 的默认计划程序使用由类表示 ThreadPool 的 .NET 线程池来排队和执行工作。 线程池使用Task类型提供的信息,以有效支持并行任务和查询中通常体现的细粒度并行度(生存期较短的工作单位)。

全局队列与本地队列

线程池维护每个应用程序域中线程的全局 FIFO(先入先出)工作队列。 每当程序调用 ThreadPool.QueueUserWorkItem (或 ThreadPool.UnsafeQueueUserWorkItem) 方法时,工作就会放在此共享队列上,并最终从队列中移除到下一个空闲线程。 从 .NET Framework 4 开始,此队列使用类似于类的 ConcurrentQueue<T> 无锁算法。 使用此无锁实现,线程池在排队和取消队列工作项时花费的时间更少。 此性能优势可用于使用线程池的所有程序。

顶级任务(不是在另一个任务上下文中创建的任务)与任何其他工作项一样置于全局队列中。 但是,嵌套任务或子任务是在另一个任务的上下文中创建的,处理方式大相径庭。 子任务或嵌套任务被放置在与父任务执行线程特定相关的本地队列中。 父任务可能是最高级任务,也可能是其他任务的子任务。 当此线程准备好进行更多工作时,它会首先在本地队列中查找。 如果工作项在此处等待,即可快速访问它们。 本地队列按照后进先出 (LIFO) 顺序访问,以保留缓存局部性并减少争用。 有关子任务和嵌套任务的详细信息,请参阅 附加和分离的子任务

使用本地队列不仅会降低全局队列的压力,还能够充分利用数据的局部性。 本地队列中的工作项经常引用内存中物理上彼此接近的数据结构。 在这些情况下,第一个任务运行后,数据已在缓存中,可以快速访问。 并行 LINQ (PLINQ)Parallel类都广泛使用嵌套任务和子任务,并使用本地工作队列实现显著加速。

工作窃取

从 .NET Framework 4 开始,线程池还具有工作窃取算法,以帮助确保没有线程处于空闲状态,而其他人仍在其队列中工作。 当线程池线程准备好接受更多工作时,它首先会查看自己的本地队列的头部,然后查看全局队列,最后查看其他线程的本地队列。 如果它在另一个线程的本地队列中找到工作项,则首先应用启发式方法,以确保它可以有效地运行工作。 如果可以,它会从尾部取消排队工作项(按 FIFO 顺序)。 这样可以减少每个本地队列上的争用并保留数据位置。 此体系结构可帮助线程池负载均衡比以前版本更高效地工作。

长期任务

你可能希望显式阻止将任务置于本地队列中。 例如,你可能知道特定工作项将运行相对长的时间并可能阻塞本地队列中的所有其他工作项。 在这种情况下,可以指定 System.Threading.Tasks.TaskCreationOptions 选项,它向计划程序提供提示,指示任务可能需要其他线程,以便它不会阻止本地队列上其他线程或工作项的向前进度。 使用此选项可以完全避免线程池,包括全局队列和本地队列。

任务内联

在某些情况下,等待 Task 时,可能会在执行等待操作的线程上同步执行。 通过使用现有线程而不需要额外线程来提高性能,否则现有线程就会被阻塞。 为了防止因重新进入而导致的错误,仅当在相关线程的本地队列中找到等待目标时,才会发生任务内联。

指定同步上下文

可以使用该方法 TaskScheduler.FromCurrentSynchronizationContext 指定任务应计划在特定线程上运行。 这在 Windows 窗体和 Windows Presentation Foundation 等框架中非常有用,其中对用户界面对象的访问通常仅限于在创建 UI 对象的同一线程上运行的代码。

以下示例使用 TaskScheduler.FromCurrentSynchronizationContext Windows Presentation Foundation (WPF) 应用中的方法在创建用户界面(UI)控件的同一线程上计划任务。 该示例创建从指定目录中随机选择的图像马赛克。 WPF 对象用于加载和调整图像的大小。 然后将原始像素传递给使用 For 循环将像素数据写入大型单字节数组的任务。 不需要同步,因为没有两个磁贴占用相同的数组元素。 磁贴可以按照任何顺序排列,因为它们的位置是独立计算的,不受其他磁贴影响。 然后,将大型数组传递到 UI 线程上运行的任务,其中像素数据加载到图像控件中。

该示例将数据移出 UI 线程,使用并行循环和 Task 对象对其进行修改,然后将其传回 UI 线程上运行的任务。 当您必须使用任务并行库来执行 WPF API 所不支持的操作或速度不够的操作时,这种方法非常有用。 在 WPF 中创建图像马赛克的另一种方法是使用 System.Windows.Controls.WrapPanel 控件并向其添加图像。 WrapPanel 处理定位磁贴的工作。 但是,只能在 UI 线程上执行此工作。

using System;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Media;
using System.Windows.Media.Imaging;

namespace WPF_CS1
{
    /// <summary>
    /// Interaction logic for MainWindow.xaml
    /// </summary>
    public partial class MainWindow : Window
    {
        private int fileCount;
        int colCount;
        int rowCount;
        private int tilePixelHeight;
        private int tilePixelWidth;
        private int largeImagePixelHeight;
        private int largeImagePixelWidth;
        private int largeImageStride;
        PixelFormat format;
        BitmapPalette palette = null;

        public MainWindow()
        {
            InitializeComponent();

            // For this example, values are hard-coded to a mosaic of 8x8 tiles.
            // Each tile is 50 pixels high and 66 pixels wide and 32 bits per pixel.
            colCount = 12;
            rowCount = 8;
            tilePixelHeight = 50;
            tilePixelWidth = 66;
            largeImagePixelHeight = tilePixelHeight * rowCount;
            largeImagePixelWidth = tilePixelWidth * colCount;
            largeImageStride = largeImagePixelWidth * (32 / 8);
            this.Width = largeImagePixelWidth + 40;
            image.Width = largeImagePixelWidth;
            image.Height = largeImagePixelHeight;
        }

        private void button_Click(object sender, RoutedEventArgs e)
        {

            // For best results use 1024 x 768 jpg files at 32bpp.
            string[] files = System.IO.Directory.GetFiles(@"C:\Users\Public\Pictures\Sample Pictures\", "*.jpg");

            fileCount = files.Length;
            Task<byte[]>[] images = new Task<byte[]>[fileCount];
            for (int i = 0; i < fileCount; i++)
            {
                int x = i;
                images[x] = Task.Factory.StartNew(() => LoadImage(files[x]));
            }

            // When they've all been loaded, tile them into a single byte array.
            var tiledImage = Task.Factory.ContinueWhenAll(
                images, (i) => TileImages(i));

            // We are currently on the UI thread. Save the sync context and pass it to
            // the next task so that it can access the UI control "image".
            var UISyncContext = TaskScheduler.FromCurrentSynchronizationContext();

            // On the UI thread, put the bytes into a bitmap and
            // display it in the Image control.
            var t3 = tiledImage.ContinueWith((antecedent) =>
            {
                // Get System DPI.
                Matrix m = PresentationSource.FromVisual(Application.Current.MainWindow)
                                            .CompositionTarget.TransformToDevice;
                double dpiX = m.M11;
                double dpiY = m.M22;

                BitmapSource bms = BitmapSource.Create(largeImagePixelWidth,
                    largeImagePixelHeight,
                    dpiX,
                    dpiY,
                    format,
                    palette, //use default palette
                    antecedent.Result,
                    largeImageStride);
                image.Source = bms;
            }, UISyncContext);
        }

        byte[] LoadImage(string filename)
        {
            // Use the WPF BitmapImage class to load and
            // resize the bitmap. NOTE: Only 32bpp formats are supported correctly.
            // Support for additional color formats is left as an exercise
            // for the reader. For more information, see documentation for ColorConvertedBitmap.

            BitmapImage bitmapImage = new BitmapImage();
            bitmapImage.BeginInit();
            bitmapImage.UriSource = new Uri(filename);
            bitmapImage.DecodePixelHeight = tilePixelHeight;
            bitmapImage.DecodePixelWidth = tilePixelWidth;
            bitmapImage.EndInit();

            format = bitmapImage.Format;
            int size = (int)(bitmapImage.Height * bitmapImage.Width);
            int stride = (int)bitmapImage.Width * 4;
            byte[] dest = new byte[stride * tilePixelHeight];

            bitmapImage.CopyPixels(dest, stride, 0);

            return dest;
        }

        int Stride(int pixelWidth, int bitsPerPixel)
        {
            return (((pixelWidth * bitsPerPixel + 31) / 32) * 4);
        }

        // Map the individual image tiles to the large image
        // in parallel. Any kind of raw image manipulation can be
        // done here because we are not attempting to access any
        // WPF controls from multiple threads.
        byte[] TileImages(Task<byte[]>[] sourceImages)
        {
            byte[] largeImage = new byte[largeImagePixelHeight * largeImageStride];
            int tileImageStride = tilePixelWidth * 4; // hard coded to 32bpp

            Random rand = new Random();
            Parallel.For(0, rowCount * colCount, (i) =>
            {
                // Pick one of the images at random for this tile.
                int cur = rand.Next(0, sourceImages.Length);
                byte[] pixels = sourceImages[cur].Result;

                // Get the starting index for this tile.
                int row = i / colCount;
                int col = (int)(i % colCount);
                int idx = ((row * (largeImageStride * tilePixelHeight)) + (col * tileImageStride));

                // Write the pixels for the current tile. The pixels are not contiguous
                // in the array, therefore we have to advance the index by the image stride
                // (minus the stride of the tile) for each scanline of the tile.
                int tileImageIndex = 0;
                for (int j = 0; j < tilePixelHeight; j++)
                {
                    // Write the next scanline for this tile.
                    for (int k = 0; k < tileImageStride; k++)
                    {
                        largeImage[idx++] = pixels[tileImageIndex++];
                    }
                    // Advance to the beginning of the next scanline.
                    idx += largeImageStride - tileImageStride;
                }
            });
            return largeImage;
        }
    }
}

若要创建示例,请在 Visual Studio 中创建 WPF 应用程序项目并将其命名为WPF_CS1(对于 C# WPF 项目)或WPF_VB1(对于 Visual Basic WPF 项目)。 然后执行以下操作:

  1. 在设计视图中,将Image工具箱中的控件拖到设计图面的左上角。 在“属性”窗口的“名称”文本框中,将控件命名为“image”。

  2. Button 控件从 工具箱 拖到应用程序窗口左下半部分。 在 XAML 视图中,将 Content 按钮的属性指定为“生成马赛克”,并将其属性指定 Width 为“100”。 通过添加Clickbutton_Click元素,将Click="button_Click"事件与<Button>事件处理程序连接起来,如示例代码中所定义的。 在“属性”窗口的“名称”文本框中,将控件命名为“按钮”。

  3. 将MainWindow.xaml.cs或MainWindow.xaml.vb文件的整个内容替换为此示例中的代码。 对于 C# WPF 项目,请确保工作区的名称与项目名称匹配。

  4. 该示例从名为 C:\Users\Public\Pictures\Sample Pictures 的目录中读取 JPEG 图像。 创建目录并将某些映像放入其中,或更改路径以引用包含图像的其他目录。

此示例存在一些限制。 例如,仅支持每个像素的 32 位图像;其他格式的图像在调整大小作期间被 BitmapImage 对象损坏。 此外,源图像必须都大于磁贴大小。 作为进一步练习,可以添加用于处理多个像素格式和文件大小的功能。

构造函数

名称 说明
TaskScheduler()

初始化 TaskScheduler

属性

名称 说明
Current

获取与 TaskScheduler 当前正在执行的任务关联的任务。

Default

获取由.NET提供的默认 TaskScheduler 实例。

Id

获取此 TaskSchedulerID 的唯一 ID。

MaximumConcurrencyLevel

指示能够支持的最大并发级别 TaskScheduler

方法

名称 说明
Equals(Object)

确定指定的对象是否等于当前对象。

(继承自 Object)
Finalize()

释放与此计划程序关联的所有资源。

FromCurrentSynchronizationContext()

创建与当前TaskScheduler关联的项SynchronizationContext

GetHashCode()

用作默认哈希函数。

(继承自 Object)
GetScheduledTasks()

仅对于调试器支持,生成当前排队到等待执行的计划程序的实例的 Task 可枚举。

GetType()

获取当前实例的 Type

(继承自 Object)
MemberwiseClone()

创建当前 Object的浅表副本。

(继承自 Object)
QueueTask(Task)

将一个 Task 队列排到计划程序。

ToString()

返回一个表示当前对象的字符串。

(继承自 Object)
TryDequeue(Task)

尝试取消 Task 排队之前已排入此计划程序队列的队列。

TryExecuteTask(Task)

尝试在此计划程序上执行提供的 Task

TryExecuteTaskInline(Task, Boolean)

确定是否可以在此调用中同步执行所提供的 Task 内容,如果可以,则执行它。

活动

名称 说明
UnobservedTaskException

当出错的任务未观察到的异常即将触发异常升级策略时发生,默认情况下,该策略将终止进程。

适用于

线程安全性

抽象 TaskScheduler 类型的所有成员都是线程安全的,可以同时从多个线程使用。

另请参阅