线程池处理异步任务队列
2021-03-29 12:28
标签:kth get std signed work assign worker integer pos 线程池处理异步任务队列 线程池处理异步任务队列 标签:kth get std signed work assign worker integer pos 原文地址:https://www.cnblogs.com/hnxxcxg/p/13605686.html/// cxg 2020-9-3
/// 支持d7以上版本,更低版本没有测试,支持跨OS
unit tasks;
interface
uses
{$IFDEF mswindows}
Windows,
{$ENDIF}
{$IFDEF posix}
posix.Unistd, posix.Semaphore,
{$ENDIF}
Contnrs, SyncObjs, Classes, SysUtils;
type
TCallBack = procedure(task: Pointer) of object;
type
TThreadConf = class
private
fCallBack: TCallBack;
fThreadNum: Integer;
fWorkers: array of TThread;
procedure freeThreads;
procedure newThreads;
public
constructor Create(const threadNum: Integer = 0);
destructor Destroy; override;
procedure startThreads;
procedure stopThreads;
procedure allotTask(task: Pointer);
property onCallback: TCallBack read fCallBack write fCallBack;
end;
type
TWorkThread = class(TThread)
private
fConf: TThreadConf;
fQueue: TQueue;
public
constructor Create(conf: TThreadConf);
destructor Destroy; override;
procedure Execute; override;
procedure enqueue(task: Pointer);
end;
function cpuNum: Integer;
implementation
var
{$IFDEF mswindows}
hsem: THandle; //信号量
{$ELSE}
hsem: sem_t;
{$ENDIF}
gIndex: Integer;
function cpuNum: Integer;
{$IFDEF MSWINDOWS}
var
si: SYSTEM_INFO;
{$ENDIF}
begin
{$IFDEF MSWINDOWS}
GetSystemInfo(si);
Result := si.dwNumberOfProcessors;
{$ELSE}
Result := sysconf(_SC_NPROCESSORS_ONLN);
{$ENDIF}
end;
{ TThreadConf }
procedure TThreadConf.allotTask(task: Pointer);
var
i: integer;
thread: TWorkThread;
begin
i := AtomicIncrement(gIndex) mod fThreadNum;
thread := fWorkers[i] as TWorkThread;
thread.enqueue(task);
end;
constructor TThreadConf.Create(const threadNum: Integer = 0);
begin
fThreadNum := threadNum;
if fThreadNum = 0 then
fThreadNum := cpuNum;
SetLength(fWorkers, fThreadNum);
{$IFDEF mswindows}
hsem := CreateSemaphore(nil, 0, threadNum , nil);
{$ELSE}
sem_init(hsem, 0, 0);
{$ENDIF}
newThreads;
end;
destructor TThreadConf.Destroy;
begin
freeThreads;
{$IFDEF mswindows}
CloseHandle(hsem);
{$ELSE}
sem_destroy(hsem);
{$ENDIF}
inherited;
end;
procedure TThreadConf.freeThreads;
var
i: Integer;
begin
for i := 0 to fThreadNum - 1 do
begin
fWorkers[i].Terminate;
fWorkers[i].WaitFor;
FreeAndNil(fWorkers[i]);
end;
end;
procedure TThreadConf.newThreads;
var
i: Integer;
begin
for i := 0 to fThreadNum - 1 do
fWorkers[i] := TWorkThread.Create(Self);
end;
procedure TThreadConf.startThreads;
var
i: Integer;
begin
for i := 0 to fThreadNum - 1 do
{$IFDEF unicode}
fWorkers[i].Start;
{$ELSE}
fWorkers[i].Resume;
{$ENDIF}
end;
procedure TThreadConf.stopThreads;
var
i: Integer;
begin
for i := 0 to fThreadNum - 1 do
fWorkers[i].Suspend;
end;
{ TWorkThread }
constructor TWorkThread.Create(conf: TThreadConf);
begin
inherited Create(True);
FreeOnTerminate := true;
fConf := conf;
fQueue := TQueue.Create;
end;
destructor TWorkThread.Destroy;
begin
FreeAndNil(fQueue);
inherited;
end;
procedure TWorkThread.enqueue(task: Pointer);
begin
fQueue.Push(task);
{$IFDEF mswindows}
ReleaseSemaphore(hsem, 1, nil);
{$ELSE}
sem_post(hsem);
{$ENDIF}
end;
procedure TWorkThread.Execute;
var
task: Pointer;
procedure run;
begin
task := fQueue.Pop;
if task nil then
if Assigned(fConf.fCallBack) then
fConf.fCallBack(task);
end;
begin
{$IFDEF mswindows}
if WaitForSingleObject(hSem, INFINITE) = WAIT_OBJECT_0 then
begin
run;
ReleaseSemaphore(hsem, 1, nil);
end;
{$ELSE}
if sem_wait(hsem) > 0 then
begin
run;
sem_post(hsem);
end;
{$ENDIF}
end;
end.
上一篇:Java中4大基本加密算法解析
下一篇:数组实现一个简单的栈结构