Модератор: Модераторы
runewalsh писал(а):Она гуглится и там много странностей.
Дож писал(а):Именно это и спрошу, потому что смысла мудрить с WinApi функциями для задачи параллельного вызова функции не вижу (тем более раз уж всё равно планируется использовать какой-то механизм для критических секций).
runewalsh писал(а):Нужно добавить, что функции, добавленные через QueueUserAPC, вызовутся только в ходе «alertable wait» (SleepEx или WaitForSingleObjectEx с bAlertable = TRUE), т. е. когда поток явно обозначит готовность их выполнить. Если поток не использует Ex-варианты ожиданий, APC не вызовутся никогда. Изначально этот механизм сделан для обработчиков завершения ReadFileEx/WriteFileEx. Это не пул потоков, а своего рода аналог очереди сообщений (хотя теоретически поток пула может использовать механизм APC как очередь заданий и ждать самих заданий с bAlertable = TRUE... BindIoCompletionCallback примерно так и делает).
чо?
uses
windows;
function QueueUserAPC(pfnAPC: Pointer; hThread: HANDLE; dwData: ULONG_PTR): DWORD; stdcall; external 'Kernel32.dll';
function ThreadFunction(lpParameter: Pointer): Cardinal; stdcall;
begin
while True do
SleepEx(INFINITE, True);
Exit(0);
end;
procedure PrintString(dwParam: ULONG_PTR); stdcall;
begin
Writeln('String: ', PAnsiChar(dwParam));
end;
procedure PrintInteger(dwParam: ULONG_PTR); stdcall;
begin
Writeln('Integer: ', dwParam);
Sleep(500); // Изображаем бурную деятельность
end;
var
ThreadId: Cardinal;
hThread: HANDLE;
begin
hThread := CreateThread(nil, 0, @ThreadFunction, nil, 0, ThreadId);
QueueUserAPC(@PrintString, hThread, PtrUInt(PAnsiChar('Hello!')));
QueueUserAPC(@PrintString, hThread, PtrUInt(PAnsiChar('ABC')));
QueueUserAPC(@PrintString, hThread, PtrUInt(PAnsiChar('123')));
QueueUserAPC(@PrintInteger, hThread, 666);
QueueUserAPC(@PrintInteger, hThread, 555);
QueueUserAPC(@PrintInteger, hThread, 444);
Sleep(2000);
TerminateThread(hThread, 0);
end.
Говорю, забудь про QueueUserAPC, это не то ^_~
runewalsh писал(а):Говорю, забудь про QueueUserAPC, это не то ^_~
Эта штука нужна для <начал долгую операцию типа сетевого I/O> <делаешь что-то ещё> <когда стало нечего делать, вызываешь SleepEx и, если та операция закончилась, автоматически вызывается указанный в ней коллбэк завершения>. QueueUserAPC просто добавляет произвольные коллбэки, которые вызовет SleepEx.
ЛАБОРАТОРНАЯ РАБОТА № 2
Цель выполнения лабораторной работы № 2 — освоить реализацию многопоточной обработки данных, а также пула потоков и механизма асинхронного ввода/вывода.
2.1 Задание
В работе необходимо реализовать многопоточную обработку массива структур данных (из N элементов) четырьмя способами:
1. При помощи массива из M потоков (M ≤ N), используя для синхронизации объект ядра — семафор.
2. При помощи массива из M потоков (M ≤ N), используя для синхронизации сеть Петри, моделирующую семафор.
3. При помощи пула из M потоков (M ≤ N), используя системный пул потоков или асинхронные потоки ввода/вывода.
4. При помощи пула из M потоков (M ≤ N), моделируя его при помощи сети Петри.
При обработке массива данных массивом потоков каждый поток либо заранее выбирает диапазон элементов массива данных, которые он будет обрабатывать, либо просто берет первый необработанный элемент. Завершив обработку одного элемента, поток переходит к обработке следующего.
При обработке массива данных пулом потоков, завершив обработку одного элемента массива данных, освободившийся в пуле поток переходит к обработке следующего необработанного элемента.
Чтобы не требовалось создавать слишком большие массивы (для которых эффект от параллельной обработки будет наиболее очевидным), можно имитировать ситуацию, когда обработка одного элемента массива требует больше процессорного времени, чем в действительности. Для этого после обработки очередного
элемента массива поток может делать паузу на указанное количество миллисекунд.
Мой вариант:
Структура содержит анкетные данные студентов (ФИО, группа, номер зачетной книжки, дата рождения). Требуется определить самых старших студентов в каждой группе. Список таких студентов вывести в выходной файл.
В API Windows пула потоков, как такового, нет. Для создания пула рабочих потоков можно вручную создавать массив потоков вызовом функции CreateThread, а затем ожидать окончания их выполнения вызовом функции WaitForMultipleObjects(Ex) или MsgWaitForMultipleObjects(Ex). Для создания очереди асинхронных операций используется функция QueueUserAPC. В качестве сигнальных объектов могут выступать как сами процессы и потоки, так и семафоры, мьютексы, таймеры и события (табл. 3.5, п. 3.2.3). Также для организации пула потоков можно использовать порты завершения ввода/вывода (п. 3.2.3.5).
USES Windows, DOS; //Windows для потоков, DOS для времени
TYPE
Stud= record //Исследуемая структура
name :string;
gr,ID :word;
date :0..31;
month :0..12;
year :1960..2000;
end;
FuncFindDB=record //Здесь данные для создания потока
first,last :integer;
ID :LongWord;
end;
link =^FuncFindDB;
VAR
f :text;
Struct,List :array of Stud;
Threads :array of Handle;
ThreadIDs :array of FuncFindDB;
Man,Man2 :Stud;
Sem :Handle;
FUNCTION Find (p:pointer):ptrint; //Ф-ция, используемая потоками
var a :integer;
Man,Man2 :Stud;
UD :FuncFindDB;
begin
UD:=link(p)^;
Man:=Struct[UD.first];
a:=UD.first;
repeat //цикл от first до last
Man2:=Struct[a];
if Man2.gr>Man.gr then //если перешли на другую группу
begin //берем для сравнения первого ее члена
Man:=Struct[a]; continue;
end;
Compare (Man,Man2);
if List[Man.gr].gr<>0 then //возможно какой то поток уже сделал запись
begin //по исследуемой группе
Man2:=List[Man.gr]; Compare (Man,Man2); //значит сравниваем то что есть, с тем
end; //что нашел другой поток
WaitForSingleObject(Sem, INFINITE);
List[Man.gr]:=Man; //делаем запись, ограждая "критическую секцию"
ReleaseSemaphore(Sem, 1, nil);
inc (a);
sleep (PT); //задержка, предложенная в задании
until (a>UD.last);
Find:=0;
end;
BEGIN
repeat write ('Введите количество студентов ');
readln (N); until N>0;
SetLength(Struct,N+1);
repeat write ('Введите количество групп ');
readln (Ngr); until (Ngr>0);
SetLength(List,Ngr+1);
repeat write ('Введите количество потоков ');
readln (M); until (M>=0);
if M=0 then
begin
writeln ('Кол-во потоков будет равняться кол-ву ядер (',GetCore,')');
M:=GetCore;
end;
SetLength (Threads,M);
SetLength (ThreadIDs,M);
{Далее рандомно заполняем структуру Stud и сортируем ее пузырьком по группе}
Sem := CreateSemaphore(nil, 1, 1, nil);
UD.first:=1; UD.last:=N; //подготавливаем и
Find (@UD); //запускаем однопоточное вычисление
MaxD:=1;
start:=time;
for x:=0 to M-1 do //Запускаем массив потоков
begin
ThreadIDs[x].first:=MaxD;
if x=M-1 then ThreadIDs[x].last:=N else ThreadIDs[x].last:=(N div M)*(x+1);
MaxD:=ThreadIDs[x].last+1;
Threads[x]:=BeginThread(nil, 0, @Find, @ThreadIDs[x], 0, ThreadIDs[x].ID);
end;
for y:=0 to M-1 do WaitForSingleObject(Threads[y], INFINITE);//Ждем потоки
//WaitForMultipleObjects(M, @Threads, TRUE, INFINITE ); //не работает, компилируется, но не ждет потоки
for y:=0 to M-1 do CloseHandle(Threads[y]); //Закрываем потоки
CloseHandle(Sem);
END.
uses Windows;
type
PThreadInfo = ^TThreadInfo;
TThreadInfo = record
Id: LongWord;
N: integer;
{ ... }
end;
PThreadArray = ^TThreadArray;
TThreadArray = array [0..MAXIMUM_WAIT_OBJECTS-1] of TThreadInfo;
var
Count: integer;
Sem: Handle;
function Work(p: Pointer): PtrInt;
var
i: Integer;
t: PThreadInfo;
begin
t := PThreadInfo(p);
for i:=1 to 10 do begin
WriteLn('Thread #', t^.N, ': ', i, '/', 10);
WaitForSingleObject(Sem, INFINITE);
Count := Count + 1;
ReleaseSemaphore(Sem, 1, nil);
Sleep(500);
end;
Work := 0;
end;
var
ThreadH: PWOHandleArray;
Threads: PThreadArray;
i: Integer;
m: Integer;
begin
m := 3;
WriteLn('Start threads...');
Sem := CreateSemaphore(nil, 1, 1, nil);
GetMem(ThreadH, m*SizeOf(Handle));
GetMem(Threads, m*SizeOf(TThreadInfo));
for i:=0 to m-1 do begin
Threads^[i].N := i+1;
ThreadH^[i] := BeginThread(nil, 0, @Work, @Threads^[i], 0, Threads^[i].ID);
end;
WaitForMultipleObjects(m, ThreadH, TRUE, INFINITE);
for i:=0 to m-1 do CloseHandle(ThreadH^[i]);
CloseHandle(Sem);
FreeMem(ThreadH, m*SizeOf(Handle));
FreeMem(Threads, m*SizeOf(TThreadInfo));
WriteLn('Done! count = ', Count);
end.
Единственное, что мне не нравится - это то, что в этом примере пришлось отключить проверку на выход за границы массива ($R-).
Но данная проверка, если не считать работы с динамическим массивом, может быть полезна для обнаружения ошибок в коде. Конечно, если заранее знать, какого максимального размера может быть массив, эту проблему можно решить. Но ведь мы это знаем? Если посмотреть, как объявлен тип PWOHandleArray, то увидим следующее:
type
TWOHandleArray = array[0..MAXIMUM_WAIT_OBJECTS - 1] of THandle;
PWOHandleArray = ^TWOHandleArray;
Т.е. потоков не может быть больше, чем указано в константе MAXIMUM_WAIT_OBJECTS.
for y:=0 to M-1 do WaitForSingleObject(Threads[y], INFINITE);//Ждем потоки
uses windows,crt;
TYPE
DBThreads= record
ID :Cardinal;
{Запись используем для передачи каких то параметров
в нашу ф-цию}
end;
function QueueUserAPC(pfnAPC: Pointer; hThread: HANDLE; dwData: ULONG_PTR): DWORD; stdcall; external 'Kernel32.dll';
function ThreadFunction(lpParameter: Pointer): Cardinal; stdcall;
begin
while True do
SleepEx(INFINITE, True);
Exit(0);
end;
procedure PrintInteger(dwParam: ULONG_PTR); stdcall;
begin
Writeln('Integer: ', dwParam);
Sleep(500); // Изображаем бурную деятельность
end;
var
i,n :word;
Threads :array of HANDLE;
ThreadIDs :array of DBThreads;
begin
write('Skoka potokov budet v pule? ');
readln(n);
SetLength(Threads,n);
SetLength(ThreadIDs,n);
for i:=0 to n-1 do
Threads[i] := CreateThread(nil, 0, @ThreadFunction, nil, 0, ThreadIDs[i].ID);
repeat until keypressed;
for i:=0 to n-1 do
QueueUserAPC(@PrintInteger, Threads[i], i*111);
Sleep(2000);
for i:=0 to n-1 do WaitForSingleObject(Threads[i], INFINITE);
// WaitForMultipleObjects(n, @Threads, TRUE, INFINITE );
for i:=0 to n-1 do TerminateThread (Threads[i],0);
readln
end.
uses windows,crt;
TYPE
DBThreads= record
ID :Cardinal;
{Запись используем для передачи каких то параметров
в нашу ф-цию}
end;
var
i,n :word;
Threads :array of HANDLE;
ThreadIDs :array of DBThreads;
Sem :HANDLE;
function QueueUserAPC(pfnAPC: Pointer; hThread: HANDLE; dwData: ULONG_PTR): DWORD; stdcall; external 'Kernel32.dll';
function ThreadFunction(lpParameter: Pointer): Cardinal; stdcall;
begin
while True do
SleepEx(INFINITE, True);
Exit(0);
end;
procedure PrintInteger(dwParam: ULONG_PTR); stdcall;
begin
WaitForSingleObject(Sem, INFINITE);
Writeln('Integer: ', dwParam);
ReleaseSemaphore(Sem, 1, nil);
Sleep(500); // Изображаем бурную деятельность
end;
begin
write('Skoka potokov budet v pule? ');
readln(n);
SetLength(Threads,n);
SetLength(ThreadIDs,n);
for i:=0 to n-1 do
Threads[i] := CreateThread(nil, 0, @ThreadFunction, nil, 0, ThreadIDs[i].ID);
// repeat until keypressed;
Sem := CreateSemaphore(nil, 1, 1, nil);
for i:=0 to n-1 do
QueueUserAPC(@PrintInteger, Threads[i], i*111);
Sleep(2000);
for i:=0 to n-1 do WaitForSingleObject(Threads[i], INFINITE);
// WaitForMultipleObjects(n, @Threads, TRUE, INFINITE );
for i:=0 to n-1 do TerminateThread (Threads[i],0);
// for i:=0 to n-1 do CloseHandle(Threads[i]);
readln
end.
stesl писал(а):Skoka potokov budet v pule?
{$mode objfpc} {$h+} {$codepage UTF8}
uses
Windows, SysUtils;
function QueueUserWorkItem(func: LPTHREAD_START_ROUTINE; Context: pointer; Flags: ULONG): BOOL; stdcall; external kernel32;
var
runningTasks: integer; // = 0
allTasksCompleted: PRTLEvent;
i,iCount:integer;
procedure PrepareTaskQueuing;
begin
if InterlockedIncrement(runningTasks) = 1 then RTLEventResetEvent(allTasksCompleted);
end;
procedure NoteTaskCompleted;
begin
if InterlockedDecrement(runningTasks) = 0 then RTLEventSetEvent(allTasksCompleted);
end;
procedure TaskPen(param: pointer); stdcall;
begin
inc(iCount);
writeln('Художник в потоке №' + IntToStr(GetCurrentThreadID) + ' - не знает что такео симафор. Может лучше у программистов спросим? Художники готовят кисточку.');
Sleep(random(3333));
writeln('Кисточка готова. Но мы в потоке №' + IntToStr(GetCurrentThreadID) + ' - не знаем что такое симафор. Без симафора невозможно нарисовать картину. Что такое симафор?');
NoteTaskCompleted;
end;
procedure TaskColors(param: pointer); stdcall;
begin
inc(iCount);
writeln('Художник в потоке №' + IntToStr(GetCurrentThreadID) + ' - не знает что такео симафор. Художники готовят краски. Для чего мы это делаем?');
Sleep(random(3333));
PrepareTaskQueuing;
writeln('Требование красок (из потока №' + IntToStr(GetCurrentThreadID) + '). Ты знаешь что такое симафор?');
QueueUserWorkItem(@TaskPen, nil, 0);
writeln('Краски готовы. Что такое симафор? (поток №' + IntToStr(GetCurrentThreadID) + ')');
NoteTaskCompleted;
end;
procedure TaskArt(param: pointer); stdcall;
begin
inc(iCount);
writeln('Художник в потоке №' + IntToStr(GetCurrentThreadID) + ' - не знает что такео симафор. Художники готовят холст. Зачем мы это делаем?');
PrepareTaskQueuing;
writeln('Требование красок (из потока №' + IntToStr(GetCurrentThreadID) + '). Ты знаешь что такое симафор?');
QueueUserWorkItem(@TaskColors, nil, 0);
Sleep(random(3333));
writeln('Холст готов. Что такое симафор? (поток №' + IntToStr(GetCurrentThreadID) + ')');
NoteTaskCompleted;
end;
procedure Tasks(param: pointer); stdcall;
begin
inc(iCount);
writeln('Попытка выяснить что такое симафор №' + IntToStr(GetCurrentThreadID));
PrepareTaskQueuing;
writeln('Вопрос из потока №' + IntToStr(GetCurrentThreadID) + ': Ты знаешь что такое симафор?');
QueueUserWorkItem(@TaskArt, nil, 0);
Sleep(random(3333));
writeln('выполнено (поток №' + IntToStr(GetCurrentThreadID) + ')');
NoteTaskCompleted;
end;
begin
allTasksCompleted := RTLEventCreate;
for i:=1 to 111 do begin
PrepareTaskQueuing;
writeln('Добавление задачи № '+intToStr(i));
QueueUserWorkItem(@Tasks, nil, 0);
end;
writeln(LineEnding + 'Вопрос задан '+intToStr(iCount)+' художникам. До переезда осталось 777 метров.');
RTLEventWaitFor(allTasksCompleted);
RTLEventDestroy(allTasksCompleted);
writeln(LineEnding + 'Вопрос задан '+intToStr(iCount)+' художникам. До переезда осталось 666 метров.');
readln;
end.
stesl писал(а):Сижу тут строчу... Все москвичи похоже?
vitaly_l писал(а):Что такое симафор?
stesl писал(а):Или я где то опечатался?
Семафо́р (англ. semaphore) — объект, ограничивающий количество потоков, которые могут войти в заданный участок кода. Определение введено Эдсгером Дейкстрой в 1962 или 1963 году.
vitaly_l писал(а):Вот это ==> ReleaseSemaphore(Sem, 1, nil); <== что такое? Что оно делает?
WaitForSingleObject(Sem, INFINITE);
stesl писал(а):Без этого ограничения, моя МЕГАпрога выводит на печать обрывки.
stesl писал(а):как отобразить кириллицу одновременно и в FPC и в блокнотике одновременно адекватно пока не навострячился
vitaly_l писал(а):Вот такую директиву укажите вверху страницы: {$codepage UTF8} и будет печатать на русском.
stesl писал(а):Сваливается с ошибкой, кстати, код.
stesl писал(а):Может есть решение другой МЕГАзадачи, как сделать окошко FPC побольше?
vitaly_l писал(а):Да. есть. Установите Лазарус, и куча проблем исчезнет.
vitaly_l писал(а):И поэтому "мой" код у Вас не работает. Соответственно и код runewalsh тоже
QueueUserWorkItem
Вернуться в Обучение Free Pascal
Сейчас этот форум просматривают: нет зарегистрированных пользователей и гости: 5