Использование WinApi для создания пула потоков

Форум для изучающих FPC и их учителей.

Модератор: Модераторы

Re: Использование WinApi для создания пула потоков

Сообщение vitaly_l » 01.04.2018 14:33:13

stesl писал(а):С таким примером у меня глаза на переносицу лезут. Пусть будет ф_ция с каким то параметром, и ты будешь натравливать на нее свои задачи. Из условия, если задача (или как правильно назвать то, что создает эта ф-ция...) ничем не занята, значит вперед. Ну и с имитацией бурной деятельности в вызываемой ф-ции (sleep подольше)

Код: Выделить всё
{$mode objfpc} {$h+} {$codepage UTF8}
uses
   Windows, SysUtils;

   function QueueUserWorkItem(func: LPTHREAD_START_ROUTINE; Context: pointer; Flags: ULONG): BOOL; stdcall; external kernel32;

var
   runningTasks: integer; // = 0
   allTasksCompleted: PRTLEvent;
   i,iCount,iArtists:integer;
   boZadacha:boolean;

   procedure PrepareTaskQueuing;
   begin
      if InterlockedIncrement(runningTasks) = 1 then RTLEventResetEvent(allTasksCompleted);
   end;

   procedure NoteTaskCompleted;
   begin
      if InterlockedDecrement(runningTasks) = 0 then RTLEventSetEvent(allTasksCompleted);
   end;


   procedure Tasks(param: pointer); stdcall;
   begin
      writeln('Попытка '+intToStr(runningTasks)+' выяснить что такое симафор №' + IntToStr(GetCurrentThreadID));

      while not boZadacha do sleep(1);
      boZadacha := false;
      inc(iCount);

      writeln('поток в работе №' + IntToStr(GetCurrentThreadID));
      Sleep(random(33));

      writeln('выполнено (поток №' + IntToStr(GetCurrentThreadID) + ')');
      NoteTaskCompleted;
      boZadacha := true;
   end;

begin
   iCount := 0;
   iArtists:=666;

   allTasksCompleted := RTLEventCreate;
   boZadacha := true;
   for i:=1 to iArtists do begin
       PrepareTaskQueuing;
       writeln('Добавление задачи № '+intToStr(i));
       QueueUserWorkItem(@Tasks, nil, 0);
   end;

   writeln(LineEnding + 'До переезда осталось 777 метров. Счётчик насчитал '+intToStr(iCount)+' художников. ');
   RTLEventWaitFor(allTasksCompleted);
   RTLEventDestroy(allTasksCompleted);

   writeln(LineEnding + 'До переезда осталось 666 метров. Счётчик насчитал '+intToStr(iCount)+' художников. ');
   writeln('Было '+intToStr(iArtists)+' художников. Исчезло '+intToStr(iArtists - iCount)+' художников. Куда исчезли художники?');
   writeln('Что такое семафор - сам знаешь http://www.freepascal.ru/forum/viewtopic.php?t=27500 ');
   readln;
end.
Последний раз редактировалось vitaly_l 01.04.2018 23:55:20, всего редактировалось 3 раз(а).
Аватара пользователя
vitaly_l
долгожитель
 
Сообщения: 3333
Зарегистрирован: 31.01.2012 16:41:41

Re: Использование WinApi для создания пула потоков

Сообщение olegy123 » 01.04.2018 15:35:57

stesl писал(а):Есть времечко? Можешь написать что нибудь с
Код: Выделить всё
QueueUserWorkItem

попроще :D
зачем вы мучаетесь. Мир ушел вперед..
Давно уже есть обертки - они работают надежно просто и эффективно.
olegy123
долгожитель
 
Сообщения: 1643
Зарегистрирован: 25.02.2016 12:10:20

Re: Использование WinApi для создания пула потоков

Сообщение stesl » 01.04.2018 16:23:45

А весело у вас тут :D
Как я понял, в этом симафоре и есть ответ на вопрос топика. Не могу по англицки только прочитать четко что за ф-ция QueueUserWorkItem
Уже перед уходом с работы мельком увидел, что точно такой же модуль, класс, ну не знаю я как правильно есть и в .NET И вроде как он и управляет системным пулом.
Получается системный пул есть всегда? Не, лучше не играть в догадки, сам себя запутаю.
Понял, что вместо параметра Nil, как раз можно параметры ф-ции своей передать.

Как эта вся банда без симафоров работает...

Добавлено спустя 9 минут 7 секунд:
olegy123 писал(а):ачем вы мучаетесь. Мир ушел вперед..
Давно уже есть обертки - они работают надежно просто и эффективно.

Что такое обертки?
stesl
новенький
 
Сообщения: 31
Зарегистрирован: 30.03.2018 05:40:02

Re: Использование WinApi для создания пула потоков

Сообщение vitaly_l » 01.04.2018 16:33:54

stesl писал(а):Понял, что вместо параметра Nil, как раз можно параметры ф-ции своей передать

:D
Аватара пользователя
vitaly_l
долгожитель
 
Сообщения: 3333
Зарегистрирован: 31.01.2012 16:41:41

Re: Использование WinApi для создания пула потоков

Сообщение stesl » 01.04.2018 16:43:55

vitaly_l писал(а):
stesl писал(а):Понял, что вместо параметра Nil, как раз можно параметры ф-ции своей передать

:D

Это я ещё новую банду не видел...
Конечно завтра все это пощупаю. А статейки научной не будет? Что же все таки делает эта ф-ция...
Или только после ответа - Где же остальные художники :)
stesl
новенький
 
Сообщения: 31
Зарегистрирован: 30.03.2018 05:40:02


Re: Использование WinApi для создания пула потоков

Сообщение runewalsh » 01.04.2018 17:03:33

Уф. Ну, похоже, ты не знаешь некоторых вещей и по-хорошему тебе нужно объяснять их с нуля, поэтому тема и превратилась в то, что есть. Что пока хотел сказать:

— Первая версия кода с WaitForMultipleObjects не работала потому, что переменная типа «динамический массив» — это указатель (на N динамически выделенных элементов). Беря указатель на неё, ты берёшь указатель на указатель, а WaitForMultipleObjects ожидает указатель на первый из N последовательных в памяти хэндлов, т. е. @Threads[0] (не скомпилируется с $TYPEDADDRESS) или просто pointer(Threads). Лучше, но более громоздко — скопировать хэндлы во временный TWOHandleArray и передать его. Выделять его динамически (PWOHandleArray := GetMem...) не обязательно.

Код: Выделить всё
uses
   Windows;

var
   evA, evB: HANDLE;
   wait: TWOHandleArray;

begin
   evA := CreateEvent(nil, FALSE, TRUE, nil);
   evB := CreateEvent(nil, FALSE, TRUE, nil);
   wait[0] := evA;
   wait[1] := evB;
   WaitForMultipleObjects(2, @wait, TRUE, INFINITE);
end.


— Функции WinAPI называются CreateThread и CloseHandle. Функции из модуля System называются BeginThread и CloseThread. Ты их мешаешь, это работает только потому, что BeginThread возвращает системный хэндл без изменений.

— Беру назад свои слова про QueueUserAPC, всё-таки, если уж доходит до реализации пула вручную, для очереди заданий её проще использовать (как в примере у Дож'а), чем писать очередь самому.

— QueueUserWorkItem добавляет задание в очередь на исполнение пулом потоков, предоставляемым системой. В моём примере на 4 задания автоматически создались 2 потока. MinThreads/MaxThreads не настраиваемы. Можно настроить при использовании «нового» API (CreateThreadpool), но оно сложнее.

— Если потоки не пишут в перекрывающиеся области памяти и второй не читает то, что записал первый (например, у тебя 100 объектов, первый поток обрабатывает 0–49, второй 50–99), синхронизация (мьютексы, aka одноместные семафоры) им не нужна. Но даже простое инкрементирование глобальной переменной не удовлетворяет этим условиям, да.
Аватара пользователя
runewalsh
энтузиаст
 
Сообщения: 579
Зарегистрирован: 27.04.2010 00:15:25

Re: Использование WinApi для создания пула потоков

Сообщение stesl » 01.04.2018 17:39:15

runewalsh писал(а):— QueueUserWorkItem добавляет задание в очередь на исполнение пулом потоков, предоставляемым системой. В моём примере на 4 задания автоматически создались 2 потока. MinThreads/MaxThreads не настраиваемы. Можно настроить при использовании «нового» API (CreateThreadpool), но оно сложнее.

Зато вроде проще реализуемо. Семафор в примере моей лабы нужен. Вы верно подметили, что потоки работают со структурой по частям, каждый со своей. Но пишут рез-тат в одну и туже структуру. Делать это одновременно не стоит. Тем более что и позиции массива рез-тата (самый старший в группе студент, значит и позиция - группа) могут совпадать.
Про BeginThread мне впарил преподаватель. Я как обезьянка скопировал себе.
У него после упражнений с памятью (динамический массив) WaitForMultipleObjects работала.
Это нормально, что поток он запускал ф-цией из System (модуля как я понимаю, причем этого модуля в uses не было) а ожидал ф-цией API?
Код: Выделить всё
  ev1 := CreateEvent(nil, FALSE, TRUE, nil);
   ev2 := CreateEvent(nil, FALSE, TRUE, nil);
   wait[0] := ev1;
   wait[1] := ev2;

нужно проделать для всего количества потоков?
stesl
новенький
 
Сообщения: 31
Зарегистрирован: 30.03.2018 05:40:02

Re: Использование WinApi для создания пула потоков

Сообщение runewalsh » 01.04.2018 17:59:08

Модуль System подключается автоматически. CreateThread и BeginThread делают одно и то же, но первая — из WinAPI, а вторая — кроссплатформенная обёртка из стандартной библиотеки FPC, т. е. скомпилируется под любую систему (как следствие, у её коллбэка другая сигнатура — без stdcall), под Windows она вызывает внутри себя CreateThread.

>нужно проделать для всего количества потоков?
Нет, это просто пример правильной работы с WaitForMultipleObjects (которая тебе не нужна хд).

Да, и не используй TerminateThread, жди естественного завершения через WaitForThreadTerminate (System) / WaitForSingleObject (Windows) для собственноручно начатых потоков или, для заданий в пуле, по счётчику заданий + событию, как у меня с integer + PRTLEvent (System). TerminateThread убивает поток посреди чего угодно. Например, если поток был внутри GetMem, которая захватила блокировку кучи, следующий, кто вызовет GetMem, которой захочется захватить ту же блокировку, дедлокнется навсегда.
Аватара пользователя
runewalsh
энтузиаст
 
Сообщения: 579
Зарегистрирован: 27.04.2010 00:15:25

Re: Использование WinApi для создания пула потоков

Сообщение stesl » 02.04.2018 11:52:38

runewalsh спасибо! После
runewalsh писал(а):Уф. Ну, похоже, ты не знаешь некоторых вещей и по-хорошему тебе нужно объяснять их с нуля

началось именно то, что мне и нужно :roll:
Полдня сегодня мучался.
ты берёшь указатель на указатель

Понял! WaitForMultipleObjects ожидает объекты из массива TWOHandleArray на ура. Понял почему это
Лучше, но более громоздко

64 хэндла. А если больше надо будет, то можно и еще такой же сделать и заполнять один за другим. А можно и ограничение при вводе кол-ва потоков поставить, объяснив в отчете - почему
Не сразу понял
Нет, это просто пример правильной работы с WaitForMultipleObjects (которая тебе не нужна хд).

Но похоже начинаю понимать. Мы в душе не... знаем сколько потоков у нас работают на данный момент.
Начал всяко хиромантить... Вспоминая примеры. И пришел к выводу, что я один черт не до конца понял КАК РАБОТАЕТ QueueUserAPC
1. Создали пул (массив потоков)
2. Усыпили эти потоки
3. С помощью QueueUserAPC запустили один из спящих потоков на ф-цию
4. Что то в этой ф-ции поток сделал, и потом ЧТО? будет соответствовать заданию? Усыпить его снова SleepEx? Я там код, кстати, не до конца понял, спрошу еще. Если нет, то ЧТО по природе ф-ции QueueUserAPC он делает? Все примеры даны линейно, задание, напомню
При обработке массива данных пулом потоков, завершив обработку одного элемента массива данных, освободившийся в пуле поток переходит к обработке следующего необработанного элемента.

то есть мы должны их ротировать пока не выполним задание. Если я вообще этот чертов документ понимаю. :?: По природе QueueUserAPC не запустит же поток, который уже работает? Ротация (итерация) пойдет дальше...
И ни фига что то дебит с кредитом не бъётся. Оставил свои закомментированные изыскания. Видно, что иду уже по кругу :cry:
Код: Выделить всё
{$mode objfpc} {$h+}
uses windows;
TYPE
DBThreads=   record
ID         :Cardinal;
end;
var
i,n,count   :word;
Threads      :TWOHandleArray;
ThreadIDs   :array of DBThreads;
Sem         :HANDLE;
Status,Zadacha,Zadanie      :LongWord;
runningTasks:integer; // = 0
allTasksCompleted: PRTLEvent;

procedure PrepareTaskQueuing;
   begin
      if InterlockedIncrement(runningTasks) = 1 then RTLEventResetEvent(allTasksCompleted);
   end;

   procedure NoteTaskCompleted;
   begin
      if InterlockedDecrement(runningTasks) = 0 then RTLEventSetEvent(allTasksCompleted);
   end;

function QueueUserAPC(pfnAPC:Pointer;hThread:HANDLE;dwData:ULONG_PTR):DWORD;
stdcall; external 'Kernel32.dll';

function ThreadFunction(lpParameter: Pointer): Cardinal; stdcall;
begin
  while True do
    SleepEx(INFINITE, TRUE);
  Exit(0);
end;

FUNCTION PrintInteger(Param: integer):ptrint; stdcall;
var
ID:Cardinal;
First:bool;
Potok,a:integer;
q:pointer;
begin
   WaitForSingleObject(Sem, INFINITE);
   inc (Zadacha);
   Writeln('Задача: ',Zadacha,' выполнена потоком c ID ',Param);      
   ReleaseSemaphore(Sem, 1, nil);
   ThreadFunction(q);
   writeln(GetCurrentThreadId,' проснулся');
   NoteTaskCompleted;
   PrintInteger:=0;
end;

begin
   write('Сколько потоков будет в пуле? ');
   readln(n);
   write('Сколько задач нужно выполнить? ');
   readln(Zadanie);
   SetLength(ThreadIDs,n);   
   for i:=0 to n-1 do
   Threads[i] := CreateThread(nil, 0, @ThreadFunction, nil, 0, ThreadIDs[i].ID);
   Sem := CreateSemaphore(nil, 1, 1, nil);
   allTasksCompleted := RTLEventCreate;
      if Zadacha<>Zadanie then
{    begin
      if count=n then count:=0;
      QueueUserAPC(@PrintInteger,Threads[count],ThreadIDs[count].ID);
      inc (count);
   end; }
   repeat
      writeln('Выполнено ',Zadacha,' задач');
      if count=n then count:=0;
      PrepareTaskQueuing;
      QueueUserAPC(@PrintInteger,Threads[count],ThreadIDs[count].ID);
      inc (count);
   until Zadacha=Zadanie;
{    repeat
      count:=0;
         for i:=0 to n-1 do
      begin
         GetExitCodeThread(Threads[i], Status);
         if (Status<>0) and (Status<>259) then inc(count);
      end;
   until count=0; }
//   WaitForMultipleObjects(count, @Threads, TRUE, INFINITE );
   RTLEventWaitFor(allTasksCompleted);
   writeln ('Все потоки в домике');
   for i:=0 to n-1 do CloseHandle(Threads[i]);   
   RTLEventDestroy(allTasksCompleted);
   writeln ('Все потоки закрыты');
   readln
end.

ВОПРОСЫ ПО КОДУ
1. Что дают директивы компилятора {$mode objfpc} {$h+}?
2. Почему в ThreadFunction сразу начинается проверка какой то переменной, или я даже хз что это такое - True? Что это за проверка циклом while, чего?
3. Если соблюдается это условие, то поток усыпляем. Как то там сигнально, но короче с возможностью его использования QueueUserAPC в дальнейшем. Почему у Exit есть параметр (0), что он вообще делает? И в целом, чтоб, может быть до конца понять - условное ветвление тут не сработало бы что ли?
stesl
новенький
 
Сообщения: 31
Зарегистрирован: 30.03.2018 05:40:02

Re: Использование WinApi для создания пула потоков

Сообщение runewalsh » 02.04.2018 16:50:31

>64 хэндла. А если больше надо будет
Больше не нужно. Тебе не нужно вообще. Завершившиеся потоки не могут внезапно воскреснуть, так что WaitForMultipleObjects(bAll=TRUE) лишняя и полностью эквивалентна for i := ... do WaitForSingleObject(Threads[i], ...).

{$mode objfpc} — это режим с «современными» штуками (class, sizeof(integer) = 4 и т. д.), {$h+} — string = ansistring, эти директивы лазарус добавляет по умолчанию.
while true — это for(;;). (блин, ты серьёзно?)
exit(0) — это result := 0; exit;
SleepEx сама вызывает один из коллбэков, которые поток попросили выполнить через QueueUserAPC, или, если их нет, ждёт следующий N мс (здесь — бесконечно).
Отсюда следует, что ThreadFunction никогда не завершается, вечно выполняя APC без возможности её «вежливо» попросить выйти. Поток с ней завершается только потому, что по выходу из тела программы FPC вызывает ExitProcess, которая неявно обрывает все потоки в духе TerminateThread.
Аватара пользователя
runewalsh
энтузиаст
 
Сообщения: 579
Зарегистрирован: 27.04.2010 00:15:25

Re: Использование WinApi для создания пула потоков

Сообщение stesl » 02.04.2018 17:03:54

:cry:
Почти ничего не понял. Скоро ветка умрет. Еще и с преподавателем немного поцапался. Кранты.
Ф-ция ThreadFunction в моем примере составлена неверно? Да, я вижу, что потоки там резвятся. Примерчик бы...

Добавлено спустя 7 минут 14 секунд:
Вещи, которые я пока не понял
runewalsh писал(а):это for(;;).

это наверно нагуглю завтра
runewalsh писал(а):коллбэков

термин... Не улавливаю
runewalsh писал(а):SleepEx сама вызывает

то есть QueueUserAPC готовит, а оперирует слипех?
stesl
новенький
 
Сообщения: 31
Зарегистрирован: 30.03.2018 05:40:02

Re: Использование WinApi для создания пула потоков

Сообщение runewalsh » 02.04.2018 17:43:36

while true — это while 1 = 1. Ты правда не знаешь, что такое true и false? :\
Калбек — это узбекское имя функция, которая передаётся в другую функцию, чтобы та её вызвала (например, функция-тело потока, которая передаётся в CreateThread).

Без QueueUserAPC (и вообще Windows) будет так:
Код: Выделить всё
{$mode objfpc} {$h+}
uses SysUtils;

type
   TTaskParam = string;
   TTaskProc = procedure(const param: TTaskParam);
   TTask = record
      proc: TTaskProc;
      param: TTaskParam;
   end;

var
   tasks: array of TTask;          // очередь заданий
   tasksLock: TRTLCriticalSection; // блокировка для манипуляций с очередью
   thereAreTasks: PRTLEvent;       // событие «есть задания»
   workers: array of TThreadID;

   function Worker(param: pointer): PtrInt; // поток, выполняющий задания из очереди
   var
      task: TTask;
      i: integer;
      ok: boolean;
   begin
      while true do
      begin
         RTLEventWaitFor(thereAreTasks);
         EnterCriticalSection(tasksLock);
         ok := length(tasks) > 0; // до EnterCriticalSection задачу мог перехватить другой воркер, тогда повторить всё сначала
         if ok then
         begin
            // извлечь первое задание, сдвинуть остальные
            task := tasks[0];
            for i := 0 to High(tasks) - 1 do
               tasks[i] := tasks[i + 1];
            SetLength(tasks, length(tasks) - 1);

            // если остались ещё задания, перевыставить событие thereAreTasks, т. к. оно автоматически сбрасывается после WaitFor
            if length(tasks) > 0 then RTLEventSetEvent(thereAreTasks);
         end;
         LeaveCriticalSection(tasksLock);

         if not ok then continue;
         if task.proc = nil then break; // магическое значение «выйти»

         task.proc(task.param); // наконец выполнить само задание
      end;
   end;

   procedure SpawnWorker;
   begin
      SetLength(workers, length(workers) + 1);
      workers[High(workers)] := BeginThread(@Worker);
   end;

   procedure AddTask(proc: TTaskProc; param: TTaskParam);
   begin
      if length(workers) = 0 then raise Exception.Create('Нет рабочих потоков!');
      EnterCriticalSection(tasksLock);
         SetLength(tasks, length(tasks) + 1);
         tasks[High(tasks)].proc := proc;
         tasks[High(tasks)].param := param;
         RTLEventSetEvent(thereAreTasks);
      LeaveCriticalSection(tasksLock);
   end;

   procedure EndAllWorkers;
   var
      i: integer;
   begin
      for i := 0 to High(workers) do
         AddTask(nil, ''); // магические значения «выйти» — по одному извлечёт каждый воркер

      for i := 0 to High(workers) do
      begin
         WaitForThreadTerminate(workers[i], 0); // это функция из System, 0 = бесконечно
         CloseThread(workers[i]);
      end;
      tasks := nil;
      workers := nil;
   end;

   procedure Task(const param: TTaskParam);
   begin
      writeln('Задача ', param, ' начата в потоке ', ThreadID, '.');
      Sleep(1500);
      writeln('Задача ', param, ' выполнена.');
   end;

begin
   InitCriticalSection(tasksLock);
   thereAreTasks := RTLEventCreate;

   SpawnWorker;
   SpawnWorker;
   AddTask(@Task, 'A');
   AddTask(@Task, 'B');
   AddTask(@Task, 'C');
   AddTask(@Task, 'D');
   EndAllWorkers;

   writeln('Всё!');
   readln;

   RTLEventDestroy(thereAreTasks);
   DoneCriticalSection(tasksLock);
end.
Аватара пользователя
runewalsh
энтузиаст
 
Сообщения: 579
Зарегистрирован: 27.04.2010 00:15:25

Re: Использование WinApi для создания пула потоков

Сообщение vitaly_l » 02.04.2018 18:34:51

runewalsh
Пардон что, опять встреваю, но мне снова стало интересно, вот этот кусочек:
Код: Выделить всё
         ok := length(tasks) > 0; // до EnterCriticalSection задачу мог перехватить другой воркер, тогда повторить всё сначала
         if ok then
         begin
         ....
         end;
         if task.proc = nil then break;

В вышеприведённом коде - ТЕОРЕТИЧЕСКИ возможна бесконечная рекурсия, т.к. например два воркера вошли в EnterCriticalSection и оба естественно, вышли ни с чем на if task.proc = nil then break;. Однако, оба воркера, тут же, снова оба вошли в EnterCriticalSection и так до while true. Таким образом возможна бесконечная рекурсия. ИМХО (безусловно). :roll:

Верно?

.
Аватара пользователя
vitaly_l
долгожитель
 
Сообщения: 3333
Зарегистрирован: 31.01.2012 16:41:41

Re: Использование WinApi для создания пула потоков

Сообщение runewalsh » 02.04.2018 18:47:11

Раз была просигналена thereAreTasks, задания были. Если их больше нет, значит, последнее задание увёл из-под носа другой воркер, он его и выполнит, ну и ладно, спим дальше. С ивентами в большинстве сценариев не очень удобно работать из-за необходимости таких вот повторных проверок.

Если не перепроверить, со временем вылетит range check (без $rangechecks — AV) на tasks[0]:
Код: Выделить всё
{$mode objfpc} {$h+} {$rangechecks on}
uses SysUtils;

type
   TTaskParam = string;
   TTaskProc = procedure(const param: TTaskParam);
   TTask = record
      proc: TTaskProc;
      param: TTaskParam;
   end;

var
   tasks: array of TTask;          // очередь заданий
   tasksLock: TRTLCriticalSection; // блокировка для манипуляций с очередью
   thereAreTasks: PRTLEvent;       // событие «есть задания»
   workers: array of TThreadID;

   function Worker(param: pointer): PtrInt; // поток, выполняющий задания из очереди
   var
      task: TTask;
      i: integer;
      ok: boolean;
   begin
      while true do
      begin
         RTLEventWaitFor(thereAreTasks);
         EnterCriticalSection(tasksLock);
         ok := length(tasks) > 0; // до EnterCriticalSection задачу мог перехватить другой воркер, тогда повторить всё сначала
         ok := true; // !!!
         if ok then
         begin
            // извлечь первое задание, сдвинуть остальные
            task := tasks[0];
            for i := 0 to High(tasks) - 1 do
               tasks[i] := tasks[i + 1];
            SetLength(tasks, length(tasks) - 1);

            // если остались ещё задания, перевыставить событие thereAreTasks, т. к. оно автоматически сбрасывается после WaitFor
            if length(tasks) > 0 then RTLEventSetEvent(thereAreTasks);
         end;
         LeaveCriticalSection(tasksLock);

         if not ok then continue;
         if task.proc = nil then break; // магическое значение «выйти»

         task.proc(task.param); // наконец выполнить само задание
      end;
   end;

   procedure SpawnWorker;
   begin
      SetLength(workers, length(workers) + 1);
      workers[High(workers)] := BeginThread(@Worker);
   end;

   procedure AddTask(proc: TTaskProc; param: TTaskParam);
   begin
      if length(workers) = 0 then raise Exception.Create('Нет рабочих потоков!');
      EnterCriticalSection(tasksLock);
         SetLength(tasks, length(tasks) + 1);
         tasks[High(tasks)].proc := proc;
         tasks[High(tasks)].param := param;
         RTLEventSetEvent(thereAreTasks);
      LeaveCriticalSection(tasksLock);
   end;

   procedure EndAllWorkers;
   var
      i: integer;
   begin
      for i := 0 to High(workers) do
         AddTask(nil, ''); // магические значения «выйти» — по одному извлечёт каждый воркер

      for i := 0 to High(workers) do
      begin
         WaitForThreadTerminate(workers[i], 0); // это функция из System, 0 = бесконечно
         CloseThread(workers[i]);
      end;
      tasks := nil;
      workers := nil;
   end;

   procedure Task(const param: TTaskParam);
   begin
   end;

var
   i: integer;

begin
   InitCriticalSection(tasksLock);
   thereAreTasks := RTLEventCreate;

   for i := 1 to 2 * GetCPUCount do SpawnWorker;
   for i := 1 to 100000 * GetCPUCount do AddTask(@Task, 'dummy');
   EndAllWorkers;

   writeln('Всё!');
   readln;

   RTLEventDestroy(thereAreTasks);
   DoneCriticalSection(tasksLock);
end.
Аватара пользователя
runewalsh
энтузиаст
 
Сообщения: 579
Зарегистрирован: 27.04.2010 00:15:25

Пред.След.

Вернуться в Обучение Free Pascal

Кто сейчас на конференции

Сейчас этот форум просматривают: нет зарегистрированных пользователей и гости: 4

Рейтинг@Mail.ru