Simone: Anzahl Max-Fibers bei vorgegebener Hardware

Beitrag lesen

Ja, klasse Antwort

ich habe das nochmal gecheckt, das funktioniert so nicht ohne einen Scheduler der die Fibers verwaltet. Hm, ich schaue mal, wo der Weg hinführt.

Fiber-Code kein Fiber::suspend()

na dann wirds eben synchron innerhalb des Fibers ausgeführt ;O) und der Array mit den *.jsons ggf. aufgesplittet

Jeder Fiber wird sofort gestartet ($fiber->start();) und führt seine Aufgabe komplett durch. Da wir Fiber::suspend() nicht verwenden, gibt es keine Notwendigkeit, Fiber::resume() aufzurufen, außer in einem Wiederholungszyklus, falls ein Fiber noch läuft.

Die Funktion überprüft die Anzahl der aktiven Fibers gegen eine maximale Grenze ($maxFibers), um die Systemressourcen zu schonen. Wenn waitForAll true ist, wartet die Funktion in einer Schleife, bis alle Fibers ihre Arbeit abgeschlossen haben. Dies wird durch Überprüfen des Status jedes Fibers erreicht.

Fibers, die ihre Arbeit abgeschlossen haben, werden aus dem Array der aktiven Fibers entfernt, wodurch verhindert wird, dass beendete Fibers unnötig weiterverarbeitet werden.

function manageFibersMysql(&$activeFibers, $conn_ln, $totalFiles, $startTime, $maxFibers, &$idsToProcess, &$nichtGefundenCount, $waitForAll = false) {
    foreach ($idsToProcess as $index => $filePath) {
        // Starte einen neuen Fiber nur, wenn die maximale Anzahl von Fibers nicht überschritten ist
        if (count($activeFibers) < $maxFibers) {
            $fiber = new Fiber(function () use ($filePath, $conn_ln, $totalFiles, $startTime, $maxFibers, &$nichtGefundenCount) {
                $content = readAndParseJsonMysql($filePath);
                if ($content) {
                    $result = processStelleJsonContent($content, $conn_ln, $totalFiles, $startTime, $maxFibers);
                    if ($result['nichtGefunden']) {
                        $nichtGefundenCount++;
                    }
                }
            });
            $fiber->start();
            $activeFibers[] = $fiber;
            unset($idsToProcess[$index]); // Entferne die verarbeitete ID aus der Liste
        }
    }

    if ($waitForAll) {
        // Warte, bis alle Fibers ihre Arbeit beendet haben
        while (!empty($activeFibers)) {
            foreach ($activeFibers as $index => $fiber) {
                if ($fiber->isRunning()) {
                    // Wenn Fiber noch läuft, lass ihn weiterlaufen
                    continue;
                } elseif ($fiber->isTerminated()) {
                    // Wenn Fiber beendet ist, entferne ihn aus der Liste
                    unset($activeFibers[$index]);
                }
            }
        }
    }
    // Wenn nicht auf alle gewartet werden soll, wird der Zustand der Fibers nicht geprüft
    // und die Steuerung kehrt sofort zurück.
}

Ich laufe die Funktion jetzt mal damit durch

$globalAsyncQueries = [];
$queryCounter = 0;

function addAsyncQuery($db, $sql) {
    global $globalAsyncQueries, $queryCounter;
    $queryID = 'query_' . ++$queryCounter . '_' . microtime(true);
    $globalAsyncQueries[$queryID] = ['db' => $db, 'sql' => $sql, 'id' => $queryID];
}


function executeAllAsyncQueries($ende) {
    global $globalAsyncQueries;

    if (empty($globalAsyncQueries)) {
        //echo "Keine Anfragen vorhanden in globalAsyncQueries.\n";
        return false;
    }
    
    if (count($globalAsyncQueries) < 100 && !$ende) {
        //echo "Es müssen mindestens 100 Anfragen vorhanden sein, oder 'ende' muss true sein. Aktuelle Anzahl: " . count($globalAsyncQueries) . "\n";
        return false;
    }

    // Neue Logik: Entferne die Notwendigkeit einer separaten DB-Instanz pro Anfrage.
    // Stattdessen wird $db innerhalb der Schleife aus dem globalen Array bezogen.
    foreach ($globalAsyncQueries as $queryID => $query) {
        $sql = $query['sql'];

        // Erstelle hier eine neue Datenbankverbindung, falls erforderlich
        $localDb = $query['db']; // Angenommen, dies ist bereits eine gültige Verbindung

        if (empty($sql)) {
            echo "SQL-Anfrage ist leer (ID: $queryID).\n";
            continue;
        }
        
        if (!$localDb->query($sql, MYSQLI_ASYNC)) {
            echo "Fehler beim Senden der asynchronen Anfrage (ID: $queryID): " . mysqli_error($localDb) . "\n";
            continue;
        }

        // Warte, bis die Anfrage fertig ist, und verarbeite das Ergebnis.
        // Dies ist eine vereinfachte Darstellung, die angepasst werden muss.
        do {
            $links = $errors = $reject = [$localDb];
            mysqli_poll($links, $errors, $reject, 0, 50000);
        } while (empty($links));

        foreach ($links as $link) {
            if ($result = $link->reap_async_query()) {
                if (is_object($result)) {
                    mysqli_free_result($result);
                }
                // Entferne die abgearbeitete Anfrage.

				// Debug-Funktion aufrufen, um die Operation zu überprüfen
				//  debugDatabaseOperations($link, $queryID);

                unset($globalAsyncQueries[$queryID]);
            }
        }
    }
    
    //echo "Alle Anfragen wurden erfolgreich abgeschlossen.\n";
    return true;
}




function debugDatabaseOperations($localDb, $queryID) {
    // Überprüfe die Anzahl der betroffenen Zeilen
    $affectedRows = $localDb->affected_rows;

    // Überprüfe auf Fehler
    if ($localDb->errno) {
        echo "Fehler bei der Ausführung der Anfrage (ID: $queryID): " . $localDb->error . "\n";
    } elseif ($affectedRows === 0) {
        echo "Keine Zeilen betroffen oder eingefügt für Anfrage (ID: $queryID). Überprüfe die Logik deiner Anfrage.\n";
    } else {
        echo "Anfrage (ID: $queryID) erfolgreich ausgeführt. Betroffene/eingefügte Zeilen: $affectedRows\n";
    }

    // Zusätzliche Informationen
    if ($affectedRows > 0) {
        // Hier könnten weitere Debug-Informationen eingefügt werden, 
        // wie z.B. die Ausgabe der tatsächlich eingefügten oder aktualisierten Daten.
    }
}