Übung 4: Parallel Lua, Map-Reduce Tutorial (Stefan Bosse) [8.6.2025] |
In dieser Übung findet eine erste Einführung in die Programmierung von parallelen Systemen nach dem Map & Reduce Modell statt.
# lvm myprog.lua
# lvm weblvm.lua
Service thread 2 started.
[2@1592562880] HTTP server (2) listening to http://0.0.0.0:4610
Monitor thread 4 started.
[4@1592562880] HTTP server (4) listening to http://0.0.0.0:4611
Es wird das aes.lua
Modul benötigt. Ab lvm 1.1.16 integriert, ansonsten muss die Datei sich im gleichen Verzeichnis befinden wie weblvm.lua
.
Falls es einen segfault Fehler bei der Ausührung mittels lvm weblvm.lua
gibteinfach den Code in eine Datei exportieren (erster Button rechts im Codeeditor) und direkt mit lvm ausführen.
Das lvm Modul parallel kann verwendet werden um selbstsynchronisierende parallele Datenverarbeitung von partitionierbaren Datensätzen nach dem Map & Reduce (MaR) Prinzip zu ermöglichen, also einer Datenverarbeitungskette (Pipeline). Im Gegensatz zum CSP Modell gibt es keine explizite Interprozesskommunikation zwischen einzelnen parallel ausgeführten Berechnungsprozessen. Daher kann die MaR Architektur eher als Datenpfadparallelität verstanden werden.
Folgende Bedingungen müssen erfüllt sein:
Das lvm MaR Modul definiert eine parallele Berechnung als Sequenz von synchronisierten Operationen (horizontal Verabeitungskette). Dabei wird eine folgende Operation erst dann ausgeführt wenn die vorherige terminiert ist.
Es gibt folgende Operationen:
time() startet oder stoppt einen Zeitmesser;
map(worker,init) partitioniert und verteilt die Eingabedatenmenge auf options.workers parallele Prozesse die jeweils die worker Funktion ausführen, das Ergebnis besitzt die gleiche Dimensionalität wie die Eingabedaten, aber u.U. eine andere Typsignatur;
function worker(data,index,id) return f(data) end
function worker(set,id) return set:map(f) end
data = {d1,d2,d3,..,dn}
.. mapchunks=false // default ..
local function worker(data1,index,id)
... // data1=d1 d2 d3 ..
end
.. mapchunks=true // not default ..
local function worker(dataChunk,id)
// dataChunk = {d1,d2,..} {d5,d6,.. }
for i=1,#dataChunk do
local data1=dataChunk[i]
...
end
end
apply(success,fail) wendet (sequenziell, Master Prozess) eine Funktion auf den gesamten Eingabedatenvektor (Ausgabe der vorherigen ode rletzten Berechnung) an. Trat bisher kein Fehler auf wird die Funktion success aufgerufen, ansonsten fail. Kann benutzt werden um z.B. die gesamten Daten auszugeben oder zu speichern.
reduce(fun,doparallel) wendet eine Reduktionsfunktion fun(a,b) : (a,b) → c auf den Datenvektor an und liefert einen reduzierten Vektor oder meistens einen Skalarwert. Standard ist sequenzielle Ausführung. Mit doparallel auf true gesetzt findet eine semi-parallele Verarbeitung statt (in Ebenen, siehe Abbildung unten).
eval(fun) wendet eine Funktion auf den gesamten Datensatz an.
done() wird zum Finalisierung einer MAR Verarbeitungskette aufgerufen und gibt belegte Ressourcen frei.
Aufgabe 1. Was sollte bei der Partitionierung beachtet werden (automatisch oder manuell durch den Nutzer)?
┌────┐
│ P0 │ (map)
└──┬─┘
A│
┌───────┬───────┼───────┬───────┐
A= {a1│ a2│ a3│ a4│ a5│ ...} ◀──┐
┌──┴─┐ ┌──┴─┐ ┌──┴─┐ ┌──┴─┐ ┌──┴─┐ │
│ P1 │ │ P2 │ │ P3 │ │ P4 │ │ P5 │ ... │
└──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘ │
B= {b1│ b2│ b3│ b4│ b5│ ...} ────┘
└───────┴───────┼───────┴───────┘
B│
┌──┴─┐
│ P0 │ (reduce)
└──┬─┘
c│
▼
Result
Paralleles Map, sequenzielles Reduce
┌────┐
│ P0 │ (map)
└──┬─┘
A│
┌───────┬───────┼───────┬───────┐
A= {a1│ a2│ a3│ a4│ a5│ ...} ◀──┐
┌──┴─┐ ┌──┴─┐ ┌──┴─┐ ┌──┴─┐ ┌──┴─┐ │
│ P1 │ │ P2 │ │ P3 │ │ P4 │ │ P5 │ ... │
└──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘ │
B= {b1│ b2│ b3│ b4│ b5│ ...} ────┘
└───────┴───────┼───────┴───────┘
B│
┌───────────────┴────────────────────┐
│ │
├────┐ ┌────┐ ┌────┐ ┌────┤
│ P1 │ │ P2 │ │ P3 │ │ P4 │
└──┬─┘ └─┬──┘ └──┬─┘ └─┬──┘
│ │ │ │
└────┬────┘ └───┬────┘
│ │
┌──┴─┐ ┌─┴──┐
│ P1 │ │ P2 │ (reduce)
└──┬─┘ └─┬──┘
│ │
└──────────┬─────────┘
│
┌──┴─┐
│ P1 │
├────┘
c│
▼
Result
Paralleles Map, semi-paralleles Reduce
A und B sind Vektoren oder Matrizen, und c ein reduzierter niderigdimensionalerer Wert.
Die Anzahl der Partitionen wird den Worker Prozessen angepasst.
Bei ungradzahliger Verteilung können die EIngabedatenpartitonen unterschiedlich groß sein.
Erzeugung einer Parallelen Pipeline:
Parallel=require 'parallel';
local options = {workers = 4, scheduler = "circular", verbose=0, remap=true, mapchunks=false}
local p = Parallel:new(data,options)
sequential
, circular
, random
);
▸
◼
|
✗
↻
≡
|
Aufgabe 2. Implementiere die Berechnung der Fibonacci Zahlen für ein gegebenes n jeweils als Rekursion (fib1) und als Schleifeniteration (fib2).
PGI+bG9jYWw8L2I+IDxiPmZ1bmN0aW9uPC9iPiBmaWIxKG4pIAogIDxiPmlmPC9iPiBuPjEgdGhlbiA8Yj5yZXR1cm48L2I+IGZpYjEobi0xKStmaWIxKG4tMikgPGI+ZWxzZTwvYj4gPGI+cmV0dXJuPC9iPiAxIDxiPmVuZDwvYj4KPGI+ZW5kPC9iPgo8Yj5sb2NhbDwvYj4gPGI+ZnVuY3Rpb248L2I+IGZpYjIobikgCiAgPGI+bG9jYWw8L2I+IGEsYixjPTEsMQogIGM9MQogIDxiPmZvcjwvYj4gaSA9IDIsbiA8Yj5kbzwvYj4KICAgIGM9YStiCiAgICBhPWIKICAgIGI9YwogIDxiPmVuZDwvYj4KICA8Yj5yZXR1cm48L2I+IGMKPGI+ZW5kPC9iPgo=
Frage 3. Welcher Speed-up wird für die Konfigurationen workers=1,2,3,4 erzielt? Vergleiche die Rekursions- und Schleifenmethode. Bei der Schleifenmethode muss nrun auf einen Wert > 1 gesetzt werden der pro Prozess wenigstens eine Laufzeit von 100ms erzielt. Probiere verschiedene Scheduler der Datenpartitionierung (circular
, sequential
, random
), wo gibt es Unterschiede?
Frage 4. Wieso ist das Ergebnis enttäuschend? Wo lieht hier das Problem bei der Parallelisierung (bzgl. Beschleunigung)? Hinweis: Betrachte die Rechenkomplexität (in Abhängigkeit des Startwertes x)
Frage 5. Wie könnte eine gleichmäßige(re) Auslastung der Prozesse ohne a-priori Kenntnis der Berechnung erfolgen?
Im folgenden Beispiel soll eine Verschlüsselung parallel mit MaR durchgeführt werden die typisch gesicherten Anwednungen ist.
Es wird das Modul AES.lua benötigt (Datei muss sich im gleichen Verzeichnis wie weblvm befinden).
Eingabedaten: Eine Menge von Zeichenketten (Texten), randomisiert in worker1 erzeugt;
▸
◼
|
✗
↻
≡
|
Aufgabe 6. Implementiere die beiden Worker Funktionen. Worker 1 erzeugt einen String der Länge STRLENGTH mit randomisiert ausgewählten Zeichen aus der charset Tabelle. Dazu kann der charset[math.random(1, #charset)]
Ausdruck verwendet werden. Zeichenketten werden mittels des ..
Infixoperators verknüpft. Worker 2 soll nun die Verschlüsselung der Strings der jeweiligen Partition mittels AES durchführen. Dazu muss AES.ECB_256(AES.encrypt, key, str)
ausgeführt werden. Schließlich sollen mittels der concatenate Funktion alle verschlüsselten Strings zu einem zusammengefügt werden (Funktion muss auch noch implementiert werden).
PGI+bG9jYWw8L2I+IDxiPmZ1bmN0aW9uPC9iPiBzdHJybmQobGVuZ3RoKQogIDxiPmxvY2FsPC9iPiBzPScnCiAgPGI+Zm9yPC9iPiBpPTEsbGVuZ3RoIDxiPmRvPC9iPgogICAgcyA9IHMgLi4gY2hhcnNldFttYXRoLnJhbmRvbSgxLCAjY2hhcnNldCldCiAgPGI+ZW5kPC9iPgogIDxiPnJldHVybjwvYj4gcwo8Yj5lbmQ8L2I+CgotLSBDcmVhdGUgcmFuZG9tIHN0cmluZ3MKPGI+bG9jYWw8L2I+IDxiPmZ1bmN0aW9uPC9iPiB3b3JrZXIxKGRhdGEsaW5kZXgsaWQpCiAgLS0gQ3JlYXRlIHJhbmRvbSBzdHJpbmcKICA8Yj5yZXR1cm48L2I+IHN0cnJuZChTVFJMRU5HVEgpOwo8Yj5lbmQ8L2I+CgotLSBFbmNyeXB0IGFsbCBzdHJpbmdzCjxiPmxvY2FsPC9iPiA8Yj5mdW5jdGlvbjwvYj4gd29ya2VyMihkYXRhLGluZGV4LGlkKQogIC0tIENyZWF0ZSBlbmNyeXB0ZWQgc3RyaW5ncwogIDxiPnJldHVybjwvYj4gQUVTLkVDQl8yNTYoQUVTLmVuY3J5cHQsIGtleSwgZGF0YSkKPGI+ZW5kPC9iPgo8Yj5sb2NhbDwvYj4gPGI+ZnVuY3Rpb248L2I+IGNvbmNhdGVuYXRlKHMxLHMyKQogIDxiPnJldHVybjwvYj4gczEuLnMyCjxiPmVuZDwvYj4K
Frage 7. Welcher Speed-up wird für workers=1,2,3,4 erzielt?
Frage 8. Wieso ist das Ergebnis hier besser (im Vergleich zu Beispiel 1)?
Im folgenden Beispiel soll eine Matrixberechnung parallel mit MaR durchgeführt werden die typisch in der Bildverarbeitung ist
Eingabedaten: Matrizen (indirekt, shared environment), Matrizensegmente (direkt, Bounding Box)
Hier werden nicht die eigentlichen Eingabedaten, die Matrix/die Matrizen, partitioniert, sondern eine Liste von Koordinaten kleiner Segmente der Matrix. Wichtig: Da die Matrizen von den Worker Prozessen geteilt werden, diese OO sind, aber bei der Serialisierung ihre "Methodentabelle" mit z.B. read und write verlieren, müssen diese im Worker Prozess noch einmal angehängt werden (siehe folgendes Beispiel). Die Segmentgrößen und Segmentanzahl sind unabhängig von den Anzahl der Prozesse!
▸
◼
|
✗
↻
≡
|
Aufgabe 9. Im obigen Beispiel ist die Segmenttabelle data für w=h=10 erstellt worden. Erstelle die Koordinatenboxen der Segmente mittels einer Schleifeniteration in Abhängigkeit von w,h, sw und sh, so dass die gesamte Matrix segmentiert wurde (keine Überlappung der Segmente!).
Frage 10. Welcher Speed-up wird für workers=1,2,3,4 erzielt? Gibt es Veränderungen in Abhängigkeit der Segmentgröße und dem Scheduleralgortihmus?
Aufgabe 11. jetzt geht es los. Es soll eine Matrixmultiplikation (Punktprodukt) zweier Matrizen a und b implementiert werden. Die Ergebnismatrix ist c. Auch hier vereinfachen wir das Problem indem geteilte Matrizen verwendet werden. Die zu partitionierenden Daten für die Pipeline sind hier wieder die Segmentkoordinaten.
Eine Matrixmultiplikation (Punktprodukt) verknüpft immer eine zeile von a mit einer Spalte von b; Multiplikation der Elemente und Summation aller Produkte (Produktsuemme).
Das heißt, der Eintrag ci,j des Produkts wird erhalten, indem die Einträge der i-ten Zeile von A und der j-ten Spalte von B Term für Term multipliziert und diese n Produkte summiert werden. Mit anderen Worten, ci,j ist das Skalarprodukt der i-ten Zeile von A und der j-ten Spalte von B.
▸
◼
|
✗
↻
≡
|
Frage 12. Welcher Speed-up wird für workers=1,2,3,4 erzielt? Gibt es Veränderungen in Abhängigkeit der Segmentgröße und dem Scheduleralgortihmus?