澳门在线威尼斯官方 > 澳门在线威尼斯官方 > 澳门在线威尼斯官方:NET并行总括和并发4,线程

原标题:澳门在线威尼斯官方:NET并行总括和并发4,线程

浏览次数:124 时间:2019-09-13

1、IProducerConsumerCollection (线程安全接口)
  此接口的兼具完毕必需都启用此接口的具有成员,若要从多少个线程同期利用。

Thread Local Storage: Thread-Relative Static Fields and Data Slots



文章摘自msdn library官方文书档案

能够利用托管线程本地存款和储蓄区 (TLS) 存储某一线程和平运动用程序域所独有的多少。 .NET Framework 提供了二种选拔托管 TLS 的艺术:线程相关的静态字段和数据槽。

  • 纵然您能够在编写翻译时预料到您的适度要求,请使用线程相关的静态字段(在 Visual Basic 中为线程相关的 Shared 字段)。 线程相关的静态字段可提供最好品质。 它们还存有编译时类型检查的独到之处。

  • 比方不得不在运作时开掘你的其实须求,请使用数据槽。 数据槽比线程相关的静态字段慢一些且更为棘手使用,并且数据存款和储蓄为 Object.aspx) 类型,由此必得将其强制转换为准确的体系技术选拔。

在非托管 C++ 中,可以行使 TlsAlloc 来动态分配槽,使用 __declspec(thread) 来注明变量应在线程相关的存款和储蓄区中进行分配。 线程相关的静态字段和数据槽提供了此作为的托管版本。

在 .NET Framework 4中,能够选取 System.Threading.ThreadLocal<T>.aspx) 类创立线程当地对象,在率先次选择该对象时它将惰式初叶化。 有关详细新闻,请参阅延迟起首化.aspx)。

托管 TLS 中数量的独一性

 

甭管使用线程相关的静态字段照旧选取数据槽,托管 TLS 中的数据都是线程和选用程序域组合所独有的。

  • 在采纳程序域内部,二个线程不可能改改另一个线程中的数据,尽管那三个线程使用同三个字段或槽时也不可能。

  • 当线程从多少个应用程序域中拜访同一个字段或槽时,会在每一种应用程序域中保障多少个独门的值。

举例,假使有个别线程设置线程相关的静态字段的值,接着它走入另三个用到程序域,然后找寻该字段的值,则在其次个使用程序域中查找的值将分化于第3个应用程序域中的值。 在第三个利用程序域中为该字段设置三个新值不会影响率先个使用程序域中该字段的值。

一律,当有个别线程获取四个例外选用程序域中的同一命名数据槽时,第七个使用程序域中的数据将一向与第叁个应用程序域中的数据无关。

线程相关的静态字段

 

借使你领略有些数目连接有个别线程和接纳程序域组合所唯有的,请向该静态字段应用 ThreadStaticAttribute.aspx) 本性。 与运用别的其余静态字段同样选拔该字段。 该字段中的数据是每一个使用它的线程所唯有的。

线程相关的静态字段的属性优于数据槽,何况有着编写翻译时类型检查的帮助和益处。

请小心,任何类构造函数代码都就要拜望该字段的第一个上下文中的首个线程上运营。 在一样应用程序域内的具备别的线程或左右文中,假设字段是引用类型,它们将被开首化为 null(在 Visual Basic 中为 Nothing);即便字段是值类型,它们将被初阶化为它们的暗中同意值。 由此,您不应注重于类构造函数来初叶化线程相关的静态字段。 而应防止初阶化线程相关的静态字段并假定它们开始化为 null (Nothing) 或它们的暗中同意值。

数据槽

 

.NET Framework 提供了线程和选取程序域组合所只有的动态数据槽。 数据槽包罗两体系型:命名槽和未命名槽。 两个都是经过应用LocalDataStoreSlot.aspx) 结构来达成的。

  • 若要创建命名数据槽,请使用 Thread.AllocateNamedDataSlot.aspx) 或 Thread.GetNamedDataSlot.aspx) 方法。 若要赢得对有个别现成命名槽的引用,请将其名称传递给 GetNamedDataSlot.aspx) 方法。

  • 若要创造未命名数据槽,请使用 Thread.AllocateDataSlot.aspx) 方法。

对此命名槽和未命名槽,请使用 Thread.SetData.aspx) 和 Thread.GetData.aspx) 方法设置和检索槽中的音讯。 那一个都是静态方法,它们一贯功效于当下正值进行它们的线程的数量。

命名槽可能很方便,因为您能够在须求它时通过将其名称传递给 GetNamedDataSlot.aspx) 方法来搜索该槽,并不是维护对未命名槽的援用。 然而,假使另一个零件使用一样的名称来定名其线程相关的存储区,並且有贰个线程同期奉行来自您的组件和该器件的代码,则这多少个零部件大概会损坏相互的数码。(本方案一经那三个零件在平等应用程序域内运转,何况它们并不用于分享同样数量。)

澳门在线威尼斯官方 1澳门在线威尼斯官方 2

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace ConsoleApp1
{
    public class SafeStack<T> : IProducerConsumerCollection<T>
    {
        // Used for enforcing thread-safety
        private object m_lockObject = new object();

        // We'll use a regular old Stack for our core operations
        private Stack<T> m_sequentialStack = null;

        //
        // Constructors
        //
        public SafeStack()
        {
            m_sequentialStack = new Stack<T>();
        }

        public SafeStack(IEnumerable<T> collection)
        {
            m_sequentialStack = new Stack<T>(collection);
        }

        //
        // Safe Push/Pop support
        //
        public void Push(T item)
        {
            lock (m_lockObject) m_sequentialStack.Push(item);
        }

        public bool TryPop(out T item)
        {
            bool rval = true;
            lock (m_lockObject)
            {
                if (m_sequentialStack.Count == 0) { item = default(T); rval = false; }
                else item = m_sequentialStack.Pop();
            }
            return rval;
        }

        //
        // IProducerConsumerCollection(T) support
        //
        public bool TryTake(out T item)
        {
            return TryPop(out item);
        }

        public bool TryAdd(T item)
        {
            Push(item);
            return true; // Push doesn't fail
        }

        public T[] ToArray()
        {
            T[] rval = null;
            lock (m_lockObject) rval = m_sequentialStack.ToArray();
            return rval;
        }

        public void CopyTo(T[] array, int index)
        {
            lock (m_lockObject) m_sequentialStack.CopyTo(array, index);
        }



        //
        // Support for IEnumerable(T)
        //
        public IEnumerator<T> GetEnumerator()
        {
            // The performance here will be unfortunate for large stacks,
            // but thread-safety is effectively implemented.
            Stack<T> stackCopy = null;
            lock (m_lockObject) stackCopy = new Stack<T>(m_sequentialStack);
            return stackCopy.GetEnumerator();
        }


        //
        // Support for IEnumerable
        //
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<T>)this).GetEnumerator();
        }

        // 
        // Support for ICollection
        //
        public bool IsSynchronized
        {
            get { return true; }
        }

        public object SyncRoot
        {
            get { return m_lockObject; }
        }

        public int Count
        {
            get { return m_sequentialStack.Count; }
        }

        public void CopyTo(Array array, int index)
        {
            lock (m_lockObject) ((ICollection)m_sequentialStack).CopyTo(array, index);
        }
    }
}

SafeStack

澳门在线威尼斯官方 3澳门在线威尼斯官方 4

using System;
using System.Collections.Concurrent;

namespace ConsoleApp1
{
    class Program
    {
        static void Main()
        {
            TestSafeStack();

            // Keep the console window open in debug mode.
            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }

        // Test our implementation of IProducerConsumerCollection(T)
        // Demonstrates:
        //      IPCC(T).TryAdd()
        //      IPCC(T).TryTake()
        //      IPCC(T).CopyTo()
        static void TestSafeStack()
        {
            SafeStack<int> stack = new SafeStack<int>();
            IProducerConsumerCollection<int> ipcc = (IProducerConsumerCollection<int>)stack;

            // Test Push()/TryAdd()
            stack.Push(10); Console.WriteLine("Pushed 10");
            ipcc.TryAdd(20); Console.WriteLine("IPCC.TryAdded 20");
            stack.Push(15); Console.WriteLine("Pushed 15");

            int[] testArray = new int[3];

            // Try CopyTo() within boundaries
            try
            {
                ipcc.CopyTo(testArray, 0);
                Console.WriteLine("CopyTo() within boundaries worked, as expected");
            }
            catch (Exception e)
            {
                Console.WriteLine("CopyTo() within boundaries unexpectedly threw an exception: {0}", e.Message);
            }

            // Try CopyTo() that overflows
            try
            {
                ipcc.CopyTo(testArray, 1);
                Console.WriteLine("CopyTo() with index overflow worked, and it SHOULD NOT HAVE");
            }
            catch (Exception e)
            {
                Console.WriteLine("CopyTo() with index overflow threw an exception, as expected: {0}", e.Message);
            }

            // Test enumeration
            Console.Write("Enumeration (should be three items): ");
            foreach (int item in stack)
                Console.Write("{0} ", item);
            Console.WriteLine("");

            // Test TryPop()
            int popped = 0;
            if (stack.TryPop(out popped))
            {
                Console.WriteLine("Successfully popped {0}", popped);
            }
            else Console.WriteLine("FAILED to pop!!");

            // Test Count
            Console.WriteLine("stack count is {0}, should be 2", stack.Count);

            // Test TryTake()
            if (ipcc.TryTake(out popped))
            {
                Console.WriteLine("Successfully IPCC-TryTaked {0}", popped);
            }
            else Console.WriteLine("FAILED to IPCC.TryTake!!");
        }
    }
}

Program

2、ConcurrentStack类:安全酒馆

澳门在线威尼斯官方 5澳门在线威尼斯官方 6

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            Task t = RunProgram();
            t.Wait();
        }

        static async Task RunProgram()
        {
            var taskStack = new ConcurrentStack<CustomTask>();
            var cts = new CancellationTokenSource();

            var taskSource = Task.Run(() => TaskProducer(taskStack));

            Task[] processors = new Task[4];
            for (int i = 1; i <= 4; i++)
            {
                string processorId = i.ToString();
                processors[i - 1] = Task.Run(
                    () => TaskProcessor(taskStack, "Processor " + processorId, cts.Token));
            }

            await taskSource;
            cts.CancelAfter(TimeSpan.FromSeconds(2));

            await Task.WhenAll(processors);
        }

        static async Task TaskProducer(ConcurrentStack<CustomTask> stack)
        {
            for (int i = 1; i <= 20; i++)
            {
                await Task.Delay(50);
                var workItem = new CustomTask { Id = i };
                stack.Push(workItem);
                Console.WriteLine("Task {0} has been posted", workItem.Id);
            }
        }

        static async Task TaskProcessor(
            ConcurrentStack<CustomTask> stack, string name, CancellationToken token)
        {
            await GetRandomDelay();
            do
            {
                CustomTask workItem;
                bool popSuccesful = stack.TryPop(out workItem);
                if (popSuccesful)
                {
                    Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name);
                }

                await GetRandomDelay();
            }
            while (!token.IsCancellationRequested);
        }

        static Task GetRandomDelay()
        {
            int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
            return Task.Delay(delay);
        }

        class CustomTask
        {
            public int Id { get; set; }
        }
    }
}

Program

3、ConcurrentQueue类:安全队列

澳门在线威尼斯官方 7澳门在线威尼斯官方 8

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            Task t = RunProgram();
            t.Wait();
        }

        static async Task RunProgram()
        {
            var taskQueue = new ConcurrentQueue<CustomTask>();
            var cts = new CancellationTokenSource();

            var taskSource = Task.Run(() => TaskProducer(taskQueue));

            Task[] processors = new Task[4];
            for (int i = 1; i <= 4; i++)
            {
                string processorId = i.ToString();
                processors[i - 1] = Task.Run(
                    () => TaskProcessor(taskQueue, "Processor " + processorId, cts.Token));
            }

            await taskSource;
            cts.CancelAfter(TimeSpan.FromSeconds(2));

            await Task.WhenAll(processors);
        }

        static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
        {
            for (int i = 1; i <= 20; i++)
            {
                await Task.Delay(50);
                var workItem = new CustomTask { Id = i };
                queue.Enqueue(workItem);
                Console.WriteLine("插入Task {0} has been posted ThreadID={1}", workItem.Id, Thread.CurrentThread.ManagedThreadId);
            }
        }

        static async Task TaskProcessor(
            ConcurrentQueue<CustomTask> queue, string name, CancellationToken token)
        {
            CustomTask workItem;
            bool dequeueSuccesful = false;

            await GetRandomDelay();
            do
            {
                dequeueSuccesful = queue.TryDequeue(out workItem);
                if (dequeueSuccesful)
                {
                    Console.WriteLine("读取Task {0} has been processed by {1} ThreadID={2}",
                                        workItem.Id, name, Thread.CurrentThread.ManagedThreadId);
                }

                await GetRandomDelay();
            }
            while (!token.IsCancellationRequested);
        }

        static Task GetRandomDelay()
        {
            int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
            return Task.Delay(delay);
        }

        class CustomTask
        {
            public int Id { get; set; }
        }
    }
}

Program

4、ConcurrentDictionary类
  ConcurrentDictionary类写操作比使用锁的常常字典(Dictionary)要慢的多,而读操作则要快些。由此对字典要大方的线程安全的读操作,ConcurrentDictionary类是最佳的挑选
  ConcurrentDictionary类的兑现应用了细粒度锁(fine-grained locking)工夫,那在二十多线程写入方面比使用锁的平时的字典(也被喻为粗粒度锁)

澳门在线威尼斯官方 9澳门在线威尼斯官方 10

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            var concurrentDictionary = new ConcurrentDictionary<int, string>();
            var dictionary = new Dictionary<int, string>();

            var sw = new Stopwatch();

            sw.Start();
            for (int i = 0; i < 1000000; i++)
            {
                lock (dictionary)
                {
                    dictionary[i] = Item;
                }
            }
            sw.Stop();
            Console.WriteLine("Writing to dictionary with a lock: {0}", sw.Elapsed);

            sw.Restart();
            for (int i = 0; i < 1000000; i++)
            {
                concurrentDictionary[i] = Item;
            }
            sw.Stop();
            Console.WriteLine("Writing to a concurrent dictionary: {0}", sw.Elapsed);

            sw.Restart();
            for (int i = 0; i < 1000000; i++)
            {
                lock (dictionary)
                {
                    CurrentItem = dictionary[i];
                }
            }
            sw.Stop();
            Console.WriteLine("Reading from dictionary with a lock: {0}", sw.Elapsed);

            sw.Restart();
            for (int i = 0; i < 1000000; i++)
            {
                CurrentItem = concurrentDictionary[i];
            }
            sw.Stop();
            Console.WriteLine("Reading from a concurrent dictionary: {0}", sw.Elapsed);
        }

        const string Item = "Dictionary item";
        public static string CurrentItem;
    }
}

Program

5、ConcurrentBag类

澳门在线威尼斯官方 11澳门在线威尼斯官方 12

namespace ConsoleApp1
{
    class CrawlingTask
    {
        public string UrlToCrawl { get; set; }

        public string ProducerName { get; set; }
    }
}

CrawlingTask

澳门在线威尼斯官方 13澳门在线威尼斯官方 14

using System.Collections.Generic;

namespace ConsoleApp1
{
    static class Module
    {
        public static Dictionary<string, string[]> _contentEmulation = new Dictionary<string, string[]>();

        public static void CreateLinks()
        {
            _contentEmulation["http://microsoft.com/"] = new[] { "http://microsoft.com/a.html", "http://microsoft.com/b.html" };
            _contentEmulation["http://microsoft.com/a.html"] = new[] { "http://microsoft.com/c.html", "http://microsoft.com/d.html" };
            _contentEmulation["http://microsoft.com/b.html"] = new[] { "http://microsoft.com/e.html" };

            _contentEmulation["http://google.com/"] = new[] { "http://google.com/a.html", "http://google.com/b.html" };
            _contentEmulation["http://google.com/a.html"] = new[] { "http://google.com/c.html", "http://google.com/d.html" };
            _contentEmulation["http://google.com/b.html"] = new[] { "http://google.com/e.html", "http://google.com/f.html" };
            _contentEmulation["http://google.com/c.html"] = new[] { "http://google.com/h.html", "http://google.com/i.html" };

            _contentEmulation["http://facebook.com/"] = new[] { "http://facebook.com/a.html", "http://facebook.com/b.html" };
            _contentEmulation["http://facebook.com/a.html"] = new[] { "http://facebook.com/c.html", "http://facebook.com/d.html" };
            _contentEmulation["http://facebook.com/b.html"] = new[] { "http://facebook.com/e.html" };

            _contentEmulation["http://twitter.com/"] = new[] { "http://twitter.com/a.html", "http://twitter.com/b.html" };
            _contentEmulation["http://twitter.com/a.html"] = new[] { "http://twitter.com/c.html", "http://twitter.com/d.html" };
            _contentEmulation["http://twitter.com/b.html"] = new[] { "http://twitter.com/e.html" };
            _contentEmulation["http://twitter.com/c.html"] = new[] { "http://twitter.com/f.html", "http://twitter.com/g.html" };
            _contentEmulation["http://twitter.com/d.html"] = new[] { "http://twitter.com/h.html" };
            _contentEmulation["http://twitter.com/e.html"] = new[] { "http://twitter.com/i.html" };
        }
    }
}

Module

澳门在线威尼斯官方 15澳门在线威尼斯官方 16

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            Module.CreateLinks();
            Task t = RunProgram();
            t.Wait();
        }

        static async Task RunProgram()
        {
            var bag = new ConcurrentBag<CrawlingTask>();

            string[] urls = new[] { "http://microsoft.com/", "http://google.com/", "http://facebook.com/", "http://twitter.com/" };

            var crawlers = new Task[4];
            for (int i = 1; i <= 4; i++)
            {
                string crawlerName = "Crawler " + i.ToString();
                bag.Add(new CrawlingTask { UrlToCrawl = urls[i - 1], ProducerName = "root" });
                crawlers[i - 1] = Task.Run(() => Crawl(bag, crawlerName));
            }

            await Task.WhenAll(crawlers);
        }

        static async Task Crawl(ConcurrentBag<CrawlingTask> bag, string crawlerName)
        {
            CrawlingTask task;
            //尝试从bag中取出对象
            while (bag.TryTake(out task))
            {
                IEnumerable<string> urls = await GetLinksFromContent(task);
                if (urls != null)
                {
                    foreach (var url in urls)
                    {
                        var t = new CrawlingTask
                        {
                            UrlToCrawl = url,
                            ProducerName = crawlerName
                        };
                        //将子集插入到bag中 
                        bag.Add(t);
                    }
                }
                Console.WriteLine("Indexing url {0} posted by {1} is completed by {2}!",
                    task.UrlToCrawl, task.ProducerName, crawlerName);
            }
        }

        static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task)
        {
            await GetRandomDelay();

            if (Module._contentEmulation.ContainsKey(task.UrlToCrawl)) return Module._contentEmulation[task.UrlToCrawl];

            return null;
        }

        static Task GetRandomDelay()
        {
            int delay = new Random(DateTime.Now.Millisecond).Next(150, 200);
            return Task.Delay(delay);
        }


    }
}

本文由澳门在线威尼斯官方发布于澳门在线威尼斯官方,转载请注明出处:澳门在线威尼斯官方:NET并行总括和并发4,线程

关键词:

上一篇:await的一些说明,await与async的正确打开方式

下一篇:架构的血液