Übung 4: Parallel Lua, Map-Reduce Tutorial (Stefan Bosse) [8.6.2025]

Übung 4: Parallel Lua, Map-Reduce Tutorial

In dieser Übung findet eine erste Einführung in die Programmierung von parallelen Systemen nach dem Map & Reduce Modell statt.

Vorbereitung und Verwendung

Es wird das aes.lua Modul benötigt. Ab lvm 1.1.16 integriert, ansonsten muss die Datei sich im gleichen Verzeichnis befinden wie weblvm.lua.

Falls es einen segfault Fehler bei der Ausührung mittels lvm weblvm.lua gibteinfach den Code in eine Datei exportieren (erster Button rechts im Codeeditor) und direkt mit lvm ausführen.

Partitionierung

Das lvm Modul parallel kann verwendet werden um selbstsynchronisierende parallele Datenverarbeitung von partitionierbaren Datensätzen nach dem Map & Reduce (MaR) Prinzip zu ermöglichen, also einer Datenverarbeitungskette (Pipeline). Im Gegensatz zum CSP Modell gibt es keine explizite Interprozesskommunikation zwischen einzelnen parallel ausgeführten Berechnungsprozessen. Daher kann die MaR Architektur eher als Datenpfadparallelität verstanden werden.

Folgende Bedingungen müssen erfüllt sein:

  1. Ein Datensatz D kann in unabhängige Teilsätze {d}i zerlegen
  2. Es gibt eine Berechnungsfunktion f(x):xy die auf Fraktionen oder einzelne Elemente des Datensatzes angewendet werden kann
  3. Es gibt i.A. eine Kette von Berechnungsfunktionen f(g(h(...(x)))) : xy
  4. Die Teilberechnungen sind vertikal unabhängig, horizonttal strikt sequenziell.
  5. Es gibt (implizit) keinen geteilten Speicher, d.h. alle Ein- und Ausgabedaten werden kopiert! Das ermöglicht aber auch die verteilte Anwendung (ohne kommunikationsintensiven verteilten geteilten Speicher).
  6. Jedoch können aus dem Csp Modul Shared Memory Matrizen Matrix an die Arbeitsprozesse weitergegeben werden (als Referenz, nicht Kopie). Veränderung dieser Matrizen in einem Prozess ist in den anderen Prozessen daher sichtbar.

Das lvm MaR Modul definiert eine parallele Berechnung als Sequenz von synchronisierten Operationen (horizontal Verabeitungskette). Dabei wird eine folgende Operation erst dann ausgeführt wenn die vorherige terminiert ist.

Es gibt folgende Operationen:

data = {d1,d2,d3,..,dn}

.. mapchunks=false // default ..

local function worker(data1,index,id) 
  ... // data1=d1 d2 d3 ..

end

.. mapchunks=true // not default ..

local function worker(dataChunk,id) 
  // dataChunk = {d1,d2,..} {d5,d6,.. }

  for i=1,#dataChunk do
    local data1=dataChunk[i]
    ...
  end
end

Aufgabe 1. Was sollte bei der Partitionierung beachtet werden (automatisch oder manuell durch den Nutzer)?


Prozessfluss

                     ┌────┐                            
                     │ P0 │ (map)                      
                     └──┬─┘                            
                       A│                              
        ┌───────┬───────┼───────┬───────┐              
  A= {a1│     a2│     a3│     a4│     a5│    ...}  ◀──┐
     ┌──┴─┐  ┌──┴─┐  ┌──┴─┐  ┌──┴─┐  ┌──┴─┐           │
     │ P1 │  │ P2 │  │ P3 │  │ P4 │  │ P5 │  ...      │
     └──┬─┘  └──┬─┘  └──┬─┘  └──┬─┘  └──┬─┘           │
  B= {b1│     b2│     b3│     b4│     b5│    ...} ────┘
        └───────┴───────┼───────┴───────┘              
                       B│                              
                     ┌──┴─┐                            
                     │ P0 │ (reduce)                   
                     └──┬─┘                            
                       c│                              
                        ▼                              
                      Result                           

Paralleles Map, sequenzielles Reduce

                     ┌────┐                              
                     │ P0 │ (map)                        
                     └──┬─┘                              
                       A│                                
        ┌───────┬───────┼───────┬───────┐                
  A= {a1│     a2│     a3│     a4│     a5│    ...}  ◀──┐  
     ┌──┴─┐  ┌──┴─┐  ┌──┴─┐  ┌──┴─┐  ┌──┴─┐           │  
     │ P1 │  │ P2 │  │ P3 │  │ P4 │  │ P5 │  ...      │  
     └──┬─┘  └──┬─┘  └──┬─┘  └──┬─┘  └──┬─┘           │  
  B= {b1│     b2│     b3│     b4│     b5│    ...} ────┘  
        └───────┴───────┼───────┴───────┘                
                       B│                                
        ┌───────────────┴────────────────────┐           
        │                                    │           
        ├────┐     ┌────┐     ┌────┐    ┌────┤           
        │ P1 │     │ P2 │     │ P3 │    │ P4 │           
        └──┬─┘     └─┬──┘     └──┬─┘    └─┬──┘           
           │         │           │        │              
           └────┬────┘           └───┬────┘              
                │                    │                   
             ┌──┴─┐                ┌─┴──┐                
             │ P1 │                │ P2 │   (reduce)     
             └──┬─┘                └─┬──┘                
                │                    │                   
                └──────────┬─────────┘                   
                           │                             
                        ┌──┴─┐                           
                        │ P1 │                           
                        ├────┘                           
                       c│                                
                        ▼                                
                      Result                             

Paralleles Map, semi-paralleles Reduce

Bei ungradzahliger Verteilung können die EIngabedatenpartitonen unterschiedlich groß sein.

Erzeugung einer Parallelen Pipeline:

Parallel=require 'parallel';
local options = {workers = 4, scheduler = "circular", verbose=0, remap=true, mapchunks=false}
local p = Parallel:new(data,options)

Anwendungen

A. Funktionale Rekursion versa Schleifeniteration

  1. Mittels Rekursion;
  2. Mittels Schleifeniteration.

Parallele Berechnung (1)

 ▸ 
 ◼ 
 ✗ 
 ↻ 
 ≡ 

Aufgabe 2. Implementiere die Berechnung der Fibonacci Zahlen für ein gegebenes n jeweils als Rekursion (fib1) und als Schleifeniteration (fib2).

PGI+bG9jYWw8L2I+IDxiPmZ1bmN0aW9uPC9iPiBmaWIxKG4pIAogIDxiPmlmPC9iPiBuPjEgdGhlbiA8Yj5yZXR1cm48L2I+IGZpYjEobi0xKStmaWIxKG4tMikgPGI+ZWxzZTwvYj4gPGI+cmV0dXJuPC9iPiAxIDxiPmVuZDwvYj4KPGI+ZW5kPC9iPgo8Yj5sb2NhbDwvYj4gPGI+ZnVuY3Rpb248L2I+IGZpYjIobikgCiAgPGI+bG9jYWw8L2I+IGEsYixjPTEsMQogIGM9MQogIDxiPmZvcjwvYj4gaSA9IDIsbiA8Yj5kbzwvYj4KICAgIGM9YStiCiAgICBhPWIKICAgIGI9YwogIDxiPmVuZDwvYj4KICA8Yj5yZXR1cm48L2I+IGMKPGI+ZW5kPC9iPgo=

Frage 3. Welcher Speed-up wird für die Konfigurationen workers=1,2,3,4 erzielt? Vergleiche die Rekursions- und Schleifenmethode. Bei der Schleifenmethode muss nrun auf einen Wert > 1 gesetzt werden der pro Prozess wenigstens eine Laufzeit von 100ms erzielt. Probiere verschiedene Scheduler der Datenpartitionierung (circular, sequential, random), wo gibt es Unterschiede?


Frage 4. Wieso ist das Ergebnis enttäuschend? Wo lieht hier das Problem bei der Parallelisierung (bzgl. Beschleunigung)? Hinweis: Betrachte die Rechenkomplexität (in Abhängigkeit des Startwertes x)


Frage 5. Wie könnte eine gleichmäßige(re) Auslastung der Prozesse ohne a-priori Kenntnis der Berechnung erfolgen?


B. Verschlüsselung


MaR in der Verschlüsselung

 ▸ 
 ◼ 
 ✗ 
 ↻ 
 ≡ 

Aufgabe 6. Implementiere die beiden Worker Funktionen. Worker 1 erzeugt einen String der Länge STRLENGTH mit randomisiert ausgewählten Zeichen aus der charset Tabelle. Dazu kann der charset[math.random(1, #charset)] Ausdruck verwendet werden. Zeichenketten werden mittels des .. Infixoperators verknüpft. Worker 2 soll nun die Verschlüsselung der Strings der jeweiligen Partition mittels AES durchführen. Dazu muss AES.ECB_256(AES.encrypt, key, str) ausgeführt werden. Schließlich sollen mittels der concatenate Funktion alle verschlüsselten Strings zu einem zusammengefügt werden (Funktion muss auch noch implementiert werden).

PGI+bG9jYWw8L2I+IDxiPmZ1bmN0aW9uPC9iPiBzdHJybmQobGVuZ3RoKQogIDxiPmxvY2FsPC9iPiBzPScnCiAgPGI+Zm9yPC9iPiBpPTEsbGVuZ3RoIDxiPmRvPC9iPgogICAgcyA9IHMgLi4gY2hhcnNldFttYXRoLnJhbmRvbSgxLCAjY2hhcnNldCldCiAgPGI+ZW5kPC9iPgogIDxiPnJldHVybjwvYj4gcwo8Yj5lbmQ8L2I+CgotLSBDcmVhdGUgcmFuZG9tIHN0cmluZ3MKPGI+bG9jYWw8L2I+IDxiPmZ1bmN0aW9uPC9iPiB3b3JrZXIxKGRhdGEsaW5kZXgsaWQpCiAgLS0gQ3JlYXRlIHJhbmRvbSBzdHJpbmcKICA8Yj5yZXR1cm48L2I+IHN0cnJuZChTVFJMRU5HVEgpOwo8Yj5lbmQ8L2I+CgotLSBFbmNyeXB0IGFsbCBzdHJpbmdzCjxiPmxvY2FsPC9iPiA8Yj5mdW5jdGlvbjwvYj4gd29ya2VyMihkYXRhLGluZGV4LGlkKQogIC0tIENyZWF0ZSBlbmNyeXB0ZWQgc3RyaW5ncwogIDxiPnJldHVybjwvYj4gQUVTLkVDQl8yNTYoQUVTLmVuY3J5cHQsIGtleSwgZGF0YSkKPGI+ZW5kPC9iPgo8Yj5sb2NhbDwvYj4gPGI+ZnVuY3Rpb248L2I+IGNvbmNhdGVuYXRlKHMxLHMyKQogIDxiPnJldHVybjwvYj4gczEuLnMyCjxiPmVuZDwvYj4K

Frage 7. Welcher Speed-up wird für workers=1,2,3,4 erzielt?

Frage 8. Wieso ist das Ergebnis hier besser (im Vergleich zu Beispiel 1)?


C. Bildverarbeitung und Matrixalgebra

Hier werden nicht die eigentlichen Eingabedaten, die Matrix/die Matrizen, partitioniert, sondern eine Liste von Koordinaten kleiner Segmente der Matrix. Wichtig: Da die Matrizen von den Worker Prozessen geteilt werden, diese OO sind, aber bei der Serialisierung ihre "Methodentabelle" mit z.B. read und write verlieren, müssen diese im Worker Prozess noch einmal angehängt werden (siehe folgendes Beispiel). Die Segmentgrößen und Segmentanzahl sind unabhängig von den Anzahl der Prozesse!


MaR in der Matrixalgebra, hier am einfachen Beispiel der segmentierten Mittelwertberechnung einer Matrix m, mit w Spalten, h Zeilen und einer Segmentbreite/höhe von sw,sh.

 ▸ 
 ◼ 
 ✗ 
 ↻ 
 ≡ 

Aufgabe 9. Im obigen Beispiel ist die Segmenttabelle data für w=h=10 erstellt worden. Erstelle die Koordinatenboxen der Segmente mittels einer Schleifeniteration in Abhängigkeit von w,h, sw und sh, so dass die gesamte Matrix segmentiert wurde (keine Überlappung der Segmente!).


Frage 10. Welcher Speed-up wird für workers=1,2,3,4 erzielt? Gibt es Veränderungen in Abhängigkeit der Segmentgröße und dem Scheduleralgortihmus?

Aufgabe 11. jetzt geht es los. Es soll eine Matrixmultiplikation (Punktprodukt) zweier Matrizen a und b implementiert werden. Die Ergebnismatrix ist c. Auch hier vereinfachen wir das Problem indem geteilte Matrizen verwendet werden. Die zu partitionierenden Daten für die Pipeline sind hier wieder die Segmentkoordinaten.

Eine Matrixmultiplikation (Punktprodukt) verknüpft immer eine zeile von a mit einer Spalte von b; Multiplikation der Elemente und Summation aller Produkte (Produktsuemme).

\[ {c}_{{{i},{j}}}={\sum_{{{k}={1}}}^{{{n}}}}{a}_{{{i},{k}}}{b}_{{{k},{j}}} \]

Das heißt, der Eintrag ci,j des Produkts wird erhalten, indem die Einträge der i-ten Zeile von A und der j-ten Spalte von B Term für Term multipliziert und diese n Produkte summiert werden. Mit anderen Worten, ci,j ist das Skalarprodukt der i-ten Zeile von A und der j-ten Spalte von B.


MaR in der Matrixalgebra, hier am komplexren Beispiel der matrixmultiplikation, mit w Spalten, h Zeilen und einer Segmentbreite/höhe von sw,sh.

 ▸ 
 ◼ 
 ✗ 
 ↻ 
 ≡ 

Frage 12. Welcher Speed-up wird für workers=1,2,3,4 erzielt? Gibt es Veränderungen in Abhängigkeit der Segmentgröße und dem Scheduleralgortihmus?



Hilfe



Einreichung (Assignment #2025-89346 )



Prüfen



Bewerten (Lehrer)




Created by the NoteBook Compiler Ver. 1.36.4 (c) Dr. Stefan Bosse (Sun Jun 08 2025 11:15:38 GMT+0200 (Central European Summer Time))