- Код: Выделить всё
fCompletion:=CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,Length(fThreads));
Задачи в этот порт пихаются:
- Код: Выделить всё
PostQueuedCompletionStatus(fCompletion,0,0,POverlapped(Data));
А рабочий поток получает их так:
- Код: Выделить всё
while(GetQueuedCompletionStatus(fCompletion,T,K,Ovr,INFINITE))do begin
.. // выполнение реальной работы
end;
Тем не менее, несмотря на то, что у меня такой вариант вполне себе работает (в многопоточной загрузке файлов из интернета), я не могу сказать, что на 100% точно уверен, как это работает. Потому, у меня используется другой вариант, который мне более понятен:
- Код: Выделить всё
// Реализует пул потоков на базе массива
// Пример функции потока (одной из пула):
// function Foo(P:pointer):integer;
// var
// Data:PTaskInfo absolute P;
// Obj:TObject;
// begin
// repeat
// Obj:=Data.Queue.Pop;
// if Obj=nil then break;
// ... выполняем нашу работу
// until false;
// Result:=0;
// end;
unit ThreadQueue;
interface
Uses
Windows;
Type
TThreadQueue=class;
// это запись, которая передаётся каждому потоку в пуле
PTaskInfo=^TTaskInfo;
TTaskInfo=record
ThreadInfo:pointer;
ThreadNo:integer;
Queue:TThreadQueue;
end;
// собственно, пул
TThreadQueue=class
private
fThreads:array of cardinal;
fTasks:array of TObject;
fMaxTask:integer;
fSemaphore,fKillEvent,fPauseEvent:cardinal;
fRealObjs:boolean;
fCritical:TRTLCriticalSection;
public
// создаёт пул. Параметры:
// ThreadCount - количество потоков в пуле. Должно быть не меньше 2 (иначе нет смысла в пуле)
// TaskCount - максимальное количество задач в буфере... можно ставить довольно большим, например, 1000
// ThreadFunc - функция потока (на базе которой будут созданы все потоки пула)
// aThreadInfo - дополнительная информация, передаваемая пулу
// TasksAreRealObj - если установлена в true, деструктор вызовет .Free для всех задач, которые остались в буфере
// НЕ устанавливайте его в true, если собиратесь кормить потоки фиктивными объектами, например, TObject(1)
Constructor Create(ThreadCount,TaskCount:integer;ThreadFunc:TThreadFunc;aThreadInfo:pointer=nil;TasksAreRealObj:boolean=true);
Destructor Destroy;override;
// функция только для потока. Вызывайте её, чтобы получить новую задачу.
// если задач нет - функция автоматически уходит в ожидание
function Pop(Wait:cardinal=INFINITE):TObject;
// функция только для вызывающего приложения. Вызывайте её, чтобы отдать пулу новую задачу.
// НЕ пытайтесь положить в пул задачу nil.
function Push(S:TObject):boolean;
// останавливает выдачу задач в пул
procedure SetPaused(Pause:boolean);
end;
implementation
{ TThreadQueue }
Constructor TThreadQueue.Create(ThreadCount,TaskCount:integer;ThreadFunc:TThreadFunc;aThreadInfo:pointer=nil;TasksAreRealObj:boolean=true);
var
I:integer;
P:PTaskInfo;
ThId:cardinal;
begin
InitializeCriticalSection(fCritical);
fRealObjs:=TasksAreRealObj;
// сообщение выставляется при завершении работы
fKillEvent:=CreateEvent(nil,true,false,nil);
// максимальная длина очереди запросов. Сигнал, если есть сообщения.
if ThreadCount>10 then ThreadCount:=10;
if ThreadCount<2 then ThreadCount:=2;
if TaskCount>1000 then fMaxTask:=1000
else if TaskCount<ThreadCount then
fMaxTask:=ThreadCount
else
fMaxTask:=TaskCount;
// изначально, семафор обнулён, никто задания не получит
fSemaphore:=CreateSemaphore(nil,0,fMaxTask,nil);
// изначально пул НЕ стоит на паузе
fPauseEvent:=CreateEvent(nil,true,true,nil);
// генерируем потоки. Пока они могут инициализировать свои внутренние структуры,
// а потом встанут колом на Pop, потому что задач пока нет.
SetLength(fThreads,ThreadCount);
For I:=Low(fThreads) to High(fThreads) do begin
New(P);
P^.Queue:=self;
P^.ThreadInfo:=aThreadInfo;
P^.ThreadNo:=I;
fThreads[i]:=BeginThread(nil,64*1024,ThreadFunc,P,0,ThId);
end;
end;
Destructor TThreadQueue.Destroy;
var
LockTime,CurTime:cardinal;
S:string;
I:integer;
begin
// сообщаем потокам, что надо бы добровольно откинуть копыта
SetEvent(fKillEvent);
// снимаемся с паузы, на случай, если это было так
SetEvent(fPauseEvent);
// максимальное время ожидания - 2 секунды... пока хардкод
// в течении этого времени ВСЕ потоки должны откинуться
LockTime:=GetTickCount+2000;
for I:=Low(fThreads) to High(fThreads) do begin
CurTime:=GetTickCount;
if LockTime>CurTime then
CurTime:=LockTime-CurTime
else
CurTime:=0;
// ожидаем завершения конкретного потока
if WaitForSingleObject(fThreads[I],CurTime)<>WAIT_OBJECT_0 then begin
// выводим предупреждение, если поток откидываться добровольно не стал
Str(I,S);
OutputDebugString(PChar('Killed thread number '+S));
TerminateThread(fThreads[I],1);
end;
CloseHandle(fThreads[I]);
end;
// типовое освобождение ресурсов
if fRealObjs then
For I:=Low(fTasks) to High(fTasks) do
fTasks[i].Free;
CloseHandle(fKillEvent);
CloseHandle(fSemaphore);
CloseHandle(fPauseEvent);
DeleteCriticalSection(fCritical);
end;
procedure TThreadQueue.SetPaused(Pause:boolean);
begin
if Pause then
ResetEvent(fPauseEvent)
else
SetEvent(fPauseEvent);
end;
function TThreadQueue.Pop(Wait:cardinal=INFINITE):TObject;
var
Events:array[0..1]of cardinal;
I:integer;
begin
// ждём снятия с паузы
if WaitForSingleObject(fPauseEvent,Wait)=WAIT_TIMEOUT then begin
Result:=nil;
exit;
end;
events[0]:=fKillEvent;
events[1]:=fSemaphore;
// ждём снятия с семафора, что означает - есть задачи
if WaitForMultipleObjects(2,@Events,false,Wait)<>WAIT_OBJECT_0+1 then begin
Result:=nil;
exit;
end;
// входим в критическую секцию, нам нужно отредактировать список задач
EnterCriticalSection(fCritical);
if Length(fTasks)<=0 then begin
// такого быть не должно, но что-то подсказывает...
OutputDebugString('No tasks, but sema in signal state');
LeaveCriticalSection(fCritical);
Result:=nil;
exit;
end;
// выталкиваем первую задачу в очереди и смещаем остальные в конец
Result:=fTasks[0];
For I:=Low(fTasks) to High(fTasks)-1 do
fTasks[i]:=fTasks[i+1];
SetLength(fTasks,pred(Length(fTasks)));
// всё
LeaveCriticalSection(fCritical);
end;
function TThreadQueue.Push(S:TObject):boolean;
begin
// nil добавлять нельзя, это признак смерти потока
if S=nil then begin
Result:=false;
exit;
end;
// входим в критическую секцию, нам надо отредактировать перечень задач
EnterCriticalSection(fCritical);
if Length(fTasks)>=fMaxTask then begin
LeaveCriticalSection(fCritical);
Result:=false;
exit;
end;
// добавляем задачу в конец
SetLength(fTasks,succ(Length(fTasks)));
fTasks[High(fTasks)]:=S;
LeaveCriticalSection(fCritical);
// поднимаем семафор. Если есть готовые к исполнению потоки - они тут же получат задачу
ReleaseSemaphore(fSemaphore,1,nil);
Result:=true;
end;
end.
Собственно, этот вариант использования в моих проектах используется чаще. Но многопоточную отработку я применяю в основном когда ограничен по вводу-выводу. Например, чтение/запись файлов, загрузка из интернета. При попытке распараллелить работу с данными я чаще встречался с проблемами и потому предпочитал работать в один поток (даже если это и не GUI поток, а отдельный, но один).
В задаче про поиск студентов по группам я вижу только один вариант параллеленья: пусть у нас 4 потока, делим весь массив на 4 части, каждую отдаём своему потоку, ждём завершения всех, результат суммируем. Какого-то пула потоков тут я не вижу. Пул требуется когда заранее число исполняемых задач неизвестно, а порождение и уничтожение потока в Windows (да и Linux, наверное) - штука довольно дорогая, потому имеет смысл держать несколько "горячих" потоков, которые будут исполнять задачи без непрерывного порождения/смерти.