C#多线程与任务的简单创建

为了充分利用CPU资源.为了提高CPU的使用率,采用多线程的方式去同时完成几件事情而不互相干扰.也或者为了处理大量的IO操作时或处理的情况需要花费大量的时间等等,比如:读写文件,视频图像的采集,处理,显示,保存等。具体来说,“串行化”不能有效的提高硬件利用率、更在有些时候影响用户体验,我们用“并行的思路”去解决问题。本篇文章只是简单的写一下常用的线程、线程池、任务、并行等创建的方法,不涉及原理和分析,想深入了解的可以参考文章:雲霏霏-五天玩转并行多线程编程Action委托和Fun委托详细解析以及Monitor和lock的区别

一、C#多线程的一些简单创建方法:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.NetworkInformation;
using System.Text;
using System.Threading;

namespace PingTest
{

    public class Program
    {
        delegate void Delegate_Test(int a);

        static void Main(string[] args)
        {
            //[无参数]通过Thread类,其实也是通过ThreadStart委托  
            Thread t1 = new Thread(new ThreadStart(fun1));
            t1.Start();


            //[有1个参数]通过Thread类,通过ParameterizedThreadStart委托
            //注意:1)目标函数参数必须声明为Object类型(不能为Object的派生类)  
            //      2)有且只有一个参数
            //提示:int,long等为struct结构,不是object对象。
            Thread t2 = new Thread(new ParameterizedThreadStart(fun4));
            t2.Start(22);


            //[有1个参数]通过委托,然后使用BeginInvoke方法  
            //注意:1)有且只有一个参数,参数类型通过委托可以设置
            Delegate_Test dt = new Delegate_Test(fun2);
            dt.BeginInvoke(10, null, null);

            //[有1个参数]线程池,使用WaitCallback
            //注意:1)目标函数(本例中的fun3)参数必须声明为Object类型(不能为Object的派生类)
            //      2)有且只有一个参数
            ThreadPool.QueueUserWorkItem(new WaitCallback(fun3), 3);
            ThreadPool.QueueUserWorkItem((s) => { while (1 > 0) { Thread.Sleep(1000); Console.WriteLine(DateTime.Now.ToString()); } }, "");
            Console.Read();
        }
        public static void fun1()
        {
            while (1 > 0)
            {
                Console.WriteLine("来自普通Thread");
                Thread.Sleep(800);
            }
        }
        public static void fun2(int a)
        {
            while (a > 0)
            {
                Console.WriteLine("来自beginInvoke");
                Thread.Sleep(1500);
            }
        }

        static void fun3(Object o)
        {
            while (1 > 0)
            {
                Console.WriteLine("来自ThreadPool");
                Thread.Sleep(2050);
            }
        }

        public static void fun4(Object a)
        {
            int dd = Convert.ToInt32(a.ToString());
            while (dd > 0)
            {
                Console.WriteLine("来自ParameterizedThreadStart");
                Thread.Sleep(1500);
            }
        }
    }
}

二、任务的简单创建:

namespace Test
{
    class Program
    {
        static void Main(string[] args)
        {
            Task<Int32> t = new Task<Int32>(n => Sum((Int32)n), 1000);
            t.Start();
            //t.Wait();
            Task cwt = t.ContinueWith(task => Console.WriteLine("The result is {0}",t.Result));
            Console.ReadKey();
        }

        private static Int32 Sum(Int32 n)
        {
            Int32 sum = 0;
            for (; n > 0; --n)
                checked{ sum += n;} //结果溢出,抛出异常
            return sum;
        }
    }
}

三、执行数量控制

a、线程池如何控制同时执行的数量?

第一步,在使用线程池前,可以提前设置好同时执行的线程数量/IO数量

 // 设置线程池中工作者线程数量为10,I/O线程数量为1000
ThreadPool.SetMaxThreads(10, 1000);

第二步、按照上面的设置,就是开100个线程,同时启动的最大数量为10。

for (int i = 0; i < 100; i++){
ThreadPool.QueueUserWorkItem((s) => { while (1 > 0) { Console.WriteLine(DateTime.Now.ToString()); } }, "");
}

b、task如何控制同时执行的数量?

第一步、实现一个Task数量控制类(nuget也有更好的、功能更多的类库,这个只是简单的)

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace LhSyncService.util
{
    public class TaskSchedulerLimitCount : TaskScheduler
    {
        [ThreadStatic]
        private static bool _currentThreadIsProcessingItems;

        private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)

        private readonly int _maxDegreeOfParallelism;

        private int _delegatesQueuedOrRunning = 0;

        public TaskSchedulerLimitCount(int maxDegreeOfParallelism)
        {
            if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
            _maxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        protected sealed override void QueueTask(Task task)
        {
            lock (_tasks)
            {
                _tasks.AddLast(task);
                if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
                {
                    ++_delegatesQueuedOrRunning;
                    NotifyThreadPoolOfPendingWork();
                }
            }
        }

        private void NotifyThreadPoolOfPendingWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                _currentThreadIsProcessingItems = true;
                try
                {
                    while (true)
                    {
                        Task item;
                        lock (_tasks)
                        {
                            if (_tasks.Count == 0)
                            {
                                --_delegatesQueuedOrRunning;
                                break;
                            }
                            item = _tasks.First.Value;
                            _tasks.RemoveFirst();
                        }
                        base.TryExecuteTask(item);
                    }
                }
                finally { _currentThreadIsProcessingItems = false; }
            }, null);
        }

        protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            if (!_currentThreadIsProcessingItems) return false;

            if (taskWasPreviouslyQueued)
                if (TryDequeue(task))
                    return base.TryExecuteTask(task);
                else
                    return false;
            else
                return base.TryExecuteTask(task);
        }

        protected sealed override bool TryDequeue(Task task)
        {
            lock (_tasks) return _tasks.Remove(task);
        }

        public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }

        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            bool lockTaken = false;
            try
            {
                Monitor.TryEnter(_tasks, ref lockTaken);
                if (lockTaken) return _tasks;
                else throw new NotSupportedException();
            }
            finally
            {
                if (lockTaken) Monitor.Exit(_tasks);
            }
        }
    }
}

第二步、在创建Task之前,先设置好同时启动的task数量,如示例,同时启动2个

            var scheduler = new TaskSchedulerLimitCount(2);
            var Factory = new TaskFactory(scheduler);

第三步、创建任务Task,创建100个task,根据上面的设置,同时自启动2个

for (int i = 1; i <= 100; i++)
{
    Task<Int32> task = Factory.StartNew<Int32>(n => Sum((Int32)n), 1000);
}

如果想深入“并行、Thread、ThreadPool、Task”,强烈建议学习这个系列的博客,写的很好。:雲霏霏-五天玩转并行多线程编程

结束!

 

四、常用技术栈小知识附录:

1、Thread.Sleep() 和 Thread.SpinWait():

Thread.Sleep(int)方法:是强制放弃CPU的时间片,然后重新和其他线程一起参与CPU的竞争。
Thread.SpinWait(int)方法:只是让CPU去执行一段没有用的代码。当时间结束之后能马上继续执行,而不是重新参与CPU的竞争。

2、System.Diagnostics名称控件下的StopWatch()

var stopWatch = new StopWatch();   //创建一个Stopwatch实例
stopWatch.Start();   //开始计时
stopWatch.Stop();   //停止计时
stopWatch.Reset();  //重置StopWatch
stopWatch.Restart(); //重新启动被停止的StopWatch
stopWatch.ElapsedMilliseconds //获取stopWatch从开始到现在的时间差,单位是毫秒

3、Paralle.Invoke:并行执行多个方法,定义如下

  public static void Invoke(params Action[] actions);  
  public static void Invoke(ParallelOptions parallelOptions, params Action[] actions);
a、没有固定的顺序,每个Task可能是不同的线程去执行,也可能是相同的;
b、主线程必须等Invoke中的所有方法执行完成后返回才继续向下执行;这样对我们以后设计并行的时候,要考虑每个Task任务尽可能差不多,如果相差很大,比如一个时间非常长,其他都比较短,这样一个线程可能会影响整个任务的性能。这点非常重要
c、这个非常简单就实现了并行,不用我们考虑线程问题。主要Framework已经为我们控制好线程池的问题。
d、Action委托决定了并行的任务不具备返回值,但可以有传入参数

先看第一个public static void Invoke(params Action[] actions)的示例

        static void Main(string[] args)
        {
            Parallel.Invoke(()=>fun2(10), () => fun2(50));
            Parallel.Invoke(() => fun1(), () => fun1());
            Parallel.Invoke(() => fun2(10), () => fun1());
            Parallel.Invoke(fun1, () => fun2(10));
            Console.ReadKey();
        }

        private static void fun1()
        {
           //do something;
        }

        private static void fun2(int n)
        {
            //do something;
        }

接下来看一下public static void Invoke(ParallelOptions parallelOptions, params Action[] actions)

主要是参数ParallelOptions,可以设置最大的并发数量和一个“取消标识”。主要说取消标志,比如A线程发送“取消标志”,其他线程就可以拿到这个“取消标志”来判断是否需要继续执行(监听到“取消标志”的线程可以继续执行、也可以不执行,“取消标志”只是一个“信号量”)

   public class ParallelInvoke
    {
        // 定义CancellationTokenSource 控制取消
        readonly CancellationTokenSource _cts = new CancellationTokenSource();
        /// <summary>
        /// Invoke方式一 action
        /// </summary>
        public void Main()
        {
            Console.WriteLine("主线程:{0}线程ID : {1};开始{2}", "Client3", Thread.CurrentThread.ManagedThreadId, DateTime.Now);
            var po = new ParallelOptions
            {
                CancellationToken = _cts.Token, // 控制线程取消
                MaxDegreeOfParallelism = 3  // 设置最大的线程数3,仔细观察线程ID变化
            };
            Parallel.Invoke(po, () => C("task1"), () => B(po), A);
            Console.WriteLine("主线程{0}线程ID : {1};结束{2}", "main", Thread.CurrentThread.ManagedThreadId, DateTime.Now);
        }

        /// <summary>
        /// 示例线程:用来演示一个不用ParallelOptions参数的普通线程
        /// </summary>
        /// <param name="data"></param>
        private void C(string data)
        {
            while (1 > 0)
            {
                Console.WriteLine("任务名Takt1:{0}线程ID : {1}", data, Thread.CurrentThread.ManagedThreadId);
                Thread.Sleep(1000);
            }
        }

        /// <summary>
        /// 示例线程:使用了ParallelOptions参数,就可以监听“取消标识”的示例
        /// </summary>
        /// <param name="po"></param>
        private void B(ParallelOptions po)
        {
            Console.WriteLine("进入Task5线程ID : {0}", Thread.CurrentThread.ManagedThreadId);
            int i = 0;
            while (i < 100)
            {
                // 判断是否已经取消
                if (po.CancellationToken.IsCancellationRequested)
                {
                    Console.WriteLine("已经被取消。");
                    return;
                }

                Thread.Sleep(100);
                Console.WriteLine(i + " ");
                Interlocked.Increment(ref i);
            }
        }

        /// <summary>
        /// 线程示例:用来发送“取消标志”
        /// </summary>
        private void A()
        {
            Console.WriteLine("进入取消任务,Task6线程ID : {0}", Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(1000 * 5);
            Console.WriteLine("发送取消标识");
            _cts.Cancel();
        }
    }

其他的如Parallel.For,Parallel.Foreach,Parallel中途退出循环和异常处理,在Parallel详细使用讲解中都有讲到,很详细(还得再次感谢博主分享)

4、Func、Action委托

a、Func 用法 (封装方法,传入参数, 有返回值)
b、Action 用法 (封装一个方法, 传入参数, 无返回值)

别人详细的介绍文档:Action委托和Fun委托详细解析

5、集合的线程安全:线程安全集合:在System.Collections.Concurrent命名空间中,ConcurrentBag<T>泛型集合,其用法和List<T>类似

因为我们平时使用的集合如List<T>是线程不安全的。比如,我们用10个线程同时往一个List内插入数据,每个线程插入1000个,插入完成后,list中的元素数量并不一定是10000个。官方为了解决这个不安全的集合出现的问题,因此为我们提供了线程安全的集合ConcurrentBag<T>。

ConcurrentBag中的数据并不是按照顺序排列的,他是随机的。我们平时使用的Max、First、Last等linq方法都存在。关于线程安全的集合还有很多,和我们平时用的集合都差不多,比如类似Dictionary的ConcurrentDictionary,还有ConcurrentStack,ConcurrentQueue等。

6、Monitor和Lock,以及自旋锁SpinLock、进程级互斥锁Mutex

作用:在指定对象上获取排他锁,用于同步代码区

a、Lock关键字是Monitor的一种替换用法,lock在IL代码中会被翻译成Monitor.  如下两段伪代码,他们的作用一样:

lock(obj){
//代码段do something()
} 
Monitor.Enter(obj); 
//代码段do something()
Monitor.Exit(obj);

所以lock能做的,Monitor肯定能做,但是Monitor能做的,lock不一定能做(Monitor可将粒度控制的更细,比如获取锁后,可以在执行一部分代码后,释放锁并等待别的线程的通知,而其他“等待锁”的线程就可以获取到锁来执行)详细参考:Monitor和lock的区别

b、Monitor伪代码:

Enter:获取一个锁。如果其他线程已对该对象执行了 Enter,但尚未执行对应的 Exit,则当前线程将阻止,直到对方线程释放该对象

Exit:释放锁。调用线程必须拥有锁。 Exit 和 Enter 调用次数必须相同,相同则该锁将被释放。否则则该锁不会被释放。

 

            while (1> 0) //异步读
            {
                Monitor.Enter(_mylock);//获取锁
                //do something
                Monitor.Exit(_mylock);//释放锁
            }

 

TryEnter:在指定的毫秒数内尝试获取指定对象上的排他锁。如果在指定的毫秒数内获得排他锁,则返回True,否则返回False。

        static object _mylock = new object();
        private static void Sum(Int32 n)
        {
            if (Monitor.TryEnter(_mylock, 1000)) //注意这里,如果1s内获得锁,则进入if,超时后进入else
            {
                //do something
                Monitor.Exit(_mylock);
            }
            else
            {
                //do another something
            }
        }

Bool Monitor.Wait(object,int waitTime)与Monitor.Pulse(monster)

wait:如果指定的超时间隔已过,则线程进入就绪队列。Int32 是一个毫秒数;该方法释放排他锁,阻塞当前线程,如果在规定的毫秒数内获得是锁的控制权,就返回True, 该线程继续运行; 否则就返回False,该线程也继续运行。
Pulse:只有锁的当前所有者可以使用 Pulse 向等待对象发出信号,当前拥有指定对象上的锁的线程调用此方法以便向队列中的下一个线程���出锁的信号。接收到脉冲后,等待线程就被移动到就绪队列中。在调用 Pulse 的线程释放锁后,就绪队列中的下一个线程(不一定是接收到脉冲的线程)将获得该锁。

c、SpinLock自旋锁:轻量级的锁,因为lock和monitor是重量级锁,.net4.0以后出现了轻量级锁,性能更优。

        static SpinLock _spinlock = new SpinLock();
        private static void Sum(Int32 n)
        {
            bool lockTaken = false;
            try
            {
                _spinlock.Enter(ref lockTaken);
                //do something     
            }
            finally
            {
                if (lockTaken) _spinlock.Exit(false);
            }
        }

 d、Mutex,互斥锁,一个超重量级的锁,目的是用于系统进程级的控制,也可以用于线程上,但有点牛刀杀鸡的味道。如下示例,在软件的第一个窗口的构造函数内,添加如下代码,就可以实现“控制一个winform程序只能启动一次“。

        public Form1()
        {
            InitializeComponent();
            #region 只能运行一个程序
            bool flag = false;
            Mutex mutex = new Mutex(true, "MutexTestName", out flag);
            //第一个参数:true--给调用线程赋予互斥体的初始所属权
            //第一个参数:互斥体的名称
            //第三个参数:返回值,如果调用线程已被授予互斥体的初始所属权,则返回true
            if (!flag)
            {
                MessageBox.Show("程序已运行!", "确定", MessageBoxButtons.OK, MessageBoxIcon.Exclamation);
                Environment.Exit(1);//退出程序
            }
            #endregion
        }

几种线程锁的区别:monitor是最基本的,lock是封装的monitor,SpinLock是官方看到monitor太重量开发的一个轻量级线程锁。Mutex定位是一个进程锁(但也可以使用到线程的控制上,只是这个锁死在系统层面存放与管理)

 

添加评论

Loading