基于阻塞队列的生产者消费者C#并发设计
标签:运行 生产者 view proc dynamic block exception rtp oid
这是从上文的>整理总结出来的,具体就不说了,直接给出代码,注释我已经加了,原来的code请看>,我改成适合我的版本了,直接给code:
调用code:
static void Main(string[] args)
{
ProcessQueueint> processQueue = new ProcessQueueint>();
processQueue.ProcessExceptionEvent += ProcessQueue_ProcessExceptionEvent;
processQueue.ProcessItemEvent += ProcessQueue_ProcessItemEvent;
for (int i = 0; i 50; i++)
{
processQueue.Enqueue(i);
}
Console.WriteLine("阻塞队列的数量: {0}", processQueue.GetInternalItemCount());
processQueue.Flush();
Console.Read();
}
///
/// 该方法对入队的每个元素进行处理
///
///
private static void ProcessQueue_ProcessItemEvent(int value)
{
Console.WriteLine("输出: {0}", value);
}
///
/// 处理异常
///
/// 队列实例
/// 异常对象
/// 出错的数据
private static void ProcessQueue_ProcessExceptionEvent(dynamic obj, Exception ex, int value)
{
Console.WriteLine(ex.ToString());
}
封装的队列:
public class ProcessQueue
{
private BlockingCollection _queue;
private CancellationTokenSource _cancellationTokenSource;
private CancellationToken _cancellToken;
//内部线程池
private List _threadCollection;
//队列是否正在处理数据
private int _isProcessing;
//有线程正在处理数据
private const int Processing = 1;
//没有线程处理数据
private const int UnProcessing = 0;
//队列是否可用
private volatile bool _enabled = true;
//内部处理线程数量
private int _internalThreadCount;
// 消费者处理事件
public event Action ProcessItemEvent;
//处理异常,需要三个参数,当前队列实例,异常,当时处理的数据
public event Actiondynamic, Exception, T> ProcessExceptionEvent;
public ProcessQueue()
{
_queue = new BlockingCollection();
_cancellationTokenSource = new CancellationTokenSource();
_internalThreadCount = 3;
_cancellToken = _cancellationTokenSource.Token;
_threadCollection = new List();
}
public ProcessQueue(int internalThreadCount) : this()
{
this._internalThreadCount = internalThreadCount;
}
///
/// 队列内部元素的数量
///
public int GetInternalItemCount()
{
//return _queue.Count;
return _threadCollection.Count;
}
//生产者生产
public void Enqueue(T items)
{
if (items == null)
{
throw new ArgumentException("items");
}
_queue.Add(items);
DataAdded();
}
public void Flush()
{
StopProcess();
while (_queue.Count != 0)
{
T item = default(T);
if (_queue.TryTake(out item))
{
try
{
ProcessItemEvent(item);
}
catch (Exception ex)
{
OnProcessException(ex, item);
}
}
}
}
// 通知消费者消费队列元素
private void DataAdded()
{
if (_enabled)
{
if (!IsProcessingItem())
{
Console.WriteLine("DataAdded");
ProcessRangeItem();
StartProcess();
}
}
}
//判断是否队列有线程正在处理
private bool IsProcessingItem()
{
// 替换第一个参数, 如果相等
//int x = Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing);
return !(Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing) == UnProcessing);
}
// 多消费者消费
private void ProcessRangeItem()
{
for (int i = 0; i this._internalThreadCount; i++)
{
ProcessItem();
}
}
// 开启消费处理
private void ProcessItem()
{
Thread currentThread = new Thread((state) =>
{
T item = default(T);
while (_enabled)
{
try
{
try
{
if (!_queue.TryTake(out item))
{
//Console.WriteLine("阻塞队列为0时的item: {0}", item);
//Console.WriteLine("ok!!!");
break;
}
// 处理事件
ProcessItemEvent(item);
}
catch (OperationCanceledException ex)
{
DebugHelper.DebugView(ex.ToString());
}
}
catch (Exception ex)
{
OnProcessException(ex, item);
}
}
});
_threadCollection.Add(currentThread);
}
// 开启消费者
private void StartProcess()
{
//Console.WriteLine("线程的数量: {0}", _threadCollection.Count);
foreach (var thread in _threadCollection)
{
thread.Start();
thread.IsBackground = true;
}
}
// 终止运行
private void StopProcess()
{
this._enabled = false;
foreach (var thread in _threadCollection)
{
if (thread.IsAlive)
{
thread.Join();
}
}
_threadCollection.Clear();
}
private void OnProcessException(Exception ex, T item)
{
var tempException = ProcessExceptionEvent;
Interlocked.CompareExchange(ref ProcessExceptionEvent, null, null);
if (tempException != null)
{
ProcessExceptionEvent(this, ex, item);
}
}
}
基于阻塞队列的生产者消费者C#并发设计
标签:运行 生产者 view proc dynamic block exception rtp oid
原文地址:https://www.cnblogs.com/zhiyong-ITNote/p/8312965.html
评论