ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

多线程---Parallel(并行编程)

2022-07-24 12:02:21  阅读:148  来源: 互联网

标签:Console Age --- static WriteLine new 多线程 Parallel


1.基本介绍

Parallel类是对线程的一个很好的抽象。该类位于System.Threading.Tasks命名空间中,提供了数据和任务并行性。

2.Parallel.Invoke 主要用于任务的并行

这个函数的功能和Task有些相似,就是并发执行一系列任务,然后等待所有完成。和Task比起来,省略了Task.WaitAll这一步,自然也缺少了Task的相关管理功能。它有两种形式:
  Parallel.Invoke( params Action[] actions);
  Parallel.Invoke(Action[] actions,TaskManager manager,TaskCreationOptions options);

      static void Main()
        {
            try
            {
                Parallel.Invoke(
                    BasicAction,    
                    () =>            
                    {
                        Console.WriteLine("Method=beta, Thread={0}", Thread.CurrentThread.ManagedThreadId);
                    },
                    delegate ()        
                    {
                        Console.WriteLine("Method=gamma, Thread={0}", Thread.CurrentThread.ManagedThreadId);
                    }
                );
            }
            // No exception is expected in this example, but if one is still thrown from a task,
            // it will be wrapped in AggregateException and propagated to the main thread.
            catch (AggregateException e)
            {
                Console.WriteLine("An action has thrown an exception. THIS WAS UNEXPECTED.\n{0}", e.InnerException.ToString());
            }
        }

        static void BasicAction()
        {
            Console.WriteLine("Method=alpha, Thread={0}", Thread.CurrentThread.ManagedThreadId);
        }

3.Parallel.For方法,主要用于处理针对数组元素的并行操作(数据的并行) 

       static void Main(string[] args)
        {
            int[] nums = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            Parallel.For(0, nums.Length, (i) =>
            {
                Console.WriteLine("针对数组索引{0}对应的那个元素{1}的一些工作代码……ThreadId={2}", i, nums[i], Thread.CurrentThread.ManagedThreadId);
            });
            Console.ReadKey();
        }

4.Parallel.ForEach方法,主要用于处理泛型集合元素的并行操作(数据的并行)

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncDemo1
{
    class Program
    {
        static void Main(string[] args)
        {
            List<Student> students = new List<Student>();
            students.Add(new Student() { 
                StudentID=1,
                Name="张三",
                Age=18
            });
            students.Add(new Student()
            {
                StudentID = 2,
                Name = "李四",
                Age = 17
            });
            students.Add(new Student()
            {
                StudentID = 3,
                Name = "王五",
                Age = 19
            });
            Parallel.ForEach(students, (item)=>
            {
                Console.WriteLine($"学生编号:{item.StudentID},姓名:{item.Name},年龄:{item.Age}");
            });
            Console.ReadLine();
        }
    }

    public class Student
    {
        public int StudentID { get; set; }
        public string Name { get; set; }
        public int Age { get; set; }
    }
}

5.ForEach的独到之处就是可以将数据进行分区,每一个小区内实现串行计算,分区采用Partitioner.Create实现。

        static void Main(string[] args)
        {
            ConcurrentBag<int> bag = new ConcurrentBag<int>();
            var watch = Stopwatch.StartNew();
            watch.Start();
            Parallel.ForEach(Partitioner.Create(0, 3000000), tuple =>
            {
                for (int m = tuple.Item1; m < tuple.Item2; m++)
                {
                    bag.Add(m);
                }
            });
            Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);
            GC.Collect();
            Console.ReadLine();
        }

6.ParallelOptions类

有时候我们的线程可能会跑遍所有的内核,为了提高其他应用程序的稳定性,就要限制参与的内核,正好ParallelOptions提供了MaxDegreeOfParallelism属性。

  static void Main(string[] args)
        {
            List<Student> students = new List<Student>();
            students.Add(new Student()
            {
                StudentID = 1,
                Name = "张三",
                Age = 18
            });
            students.Add(new Student()
            {
                StudentID = 2,
                Name = "李四",
                Age = 17
            });
            students.Add(new Student()
            {
                StudentID = 3,
                Name = "王五",
                Age = 19
            });
            Parallel.ForEach(students, new ParallelOptions() { MaxDegreeOfParallelism = 2 }, (item) =>
                {
                    Console.WriteLine($"学生编号:{item.StudentID},姓名:{item.Name},年龄:{item.Age}");
                });
            Console.ReadLine();
        }
 public class Student
    {
        public int ID { get; set; }
        public string Name { get; set; }
        public int Age { get; set; }
        public DateTime CreateTime { get; set; }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var dic = LoadData();
            Stopwatch watch = new Stopwatch();
            watch.Start();
            var query2 = (from n in dic.Values.AsParallel()
                          where n.Age > 20 && n.Age < 25
                          select n).ToList();
            watch.Stop();
            Console.WriteLine("并行计算耗费时间:{0}", watch.ElapsedMilliseconds);

            Console.Read();
        }

        public static ConcurrentDictionary<int, Student> LoadData()
        {
            ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();
            ParallelOptions options = new ParallelOptions();
            //指定使用的硬件线程数为4
            options.MaxDegreeOfParallelism = 4;
            //预加载1500w条记录
            Parallel.For(0, 15000000, options, (i) =>
            {
                var single = new Student()
                {
                    ID = i,
                    Name = "hxc" + i,
                    Age = i % 151,
                    CreateTime = DateTime.Now.AddSeconds(i)
                };
                dic.TryAdd(i, single);
            });

            return dic;
        }
    }

7.中途退出并行循环

       在串行代码中我们break一下就搞定了,但是并行就不是这么简单了,不过没关系,在并行循环的委托参数中提供了一个ParallelLoopState,该实例提供了Break和Stop方法来帮我们实现。
  Break: 当然这个是通知并行计算尽快的退出循环,比如并行计算正在迭代100,那么break后程序还会迭代所有小于100的。
  Stop:这个就不一样了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。

 static void Main(string[] args)
        {
            ConcurrentBag<int> bag = new ConcurrentBag<int>();
            Parallel.For(0, 20000000, (i, state) =>
            {
                if (bag.Count == 1000)
                {
                    //state.Break();
                    state.Stop();
                    return;
                }
                bag.Add(i);
            });
            Console.WriteLine("当前集合有{0}个元素。", bag.Count);
        }

8.并行中的异常处理

首先任务是并行计算的,处理过程中可能会产生n多的异常,那么如何来获取到这些异常呢?普通的Exception并不能获取到异常,然而为并行诞生的AggregateExcepation就可以获取到一组异常。

       static void Main(string[] args)
        {
            try
            {
                Parallel.Invoke(Run1, Run2);
            }
            catch (AggregateException ex)
            {
                foreach (var single in ex.InnerExceptions)
                {
                    Console.WriteLine(single.Message);
                }
            }
            Console.WriteLine("结束了!");
            //Console.Read();
        }

        static void Run1()
        {
            Thread.Sleep(3000);
            throw new Exception("我是任务1抛出的异常");
        }

        static void Run2()
        {
            Thread.Sleep(5000);
            throw new Exception("我是任务2抛出的异常");
        }

还有另外一种就是直接在Parallel处理异常

 private static readonly object locker = new object();
        static void Main(string[] args)
        {
            List<string> errList = new List<string>();
            Parallel.For(0, 10, (i) =>
            {
                try
                {
                    TestClass a = new TestClass();
                    a.Test(i);
                }
                catch (Exception ex)
                {
                    lock (locker)
                    {
                        errList.Add(ex.Message);
                    }
                    //Console.WriteLine(ex.Message);
                    //注:这里不再将错误抛出.....
                    //throw ex;
                }
            });

            int Index = 1;
            foreach (string err in errList)
            {
                Console.WriteLine("{0}、的错误:{1}", Index++, err);
            }
        }

参考链接:https://www.cnblogs.com/scmail81/p/9521096.html

               https://docs.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.parallel?view=net-6.0

标签:Console,Age,---,static,WriteLine,new,多线程,Parallel
来源: https://www.cnblogs.com/hobelee/p/16514228.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有