Tipps & Tricks

5 Powerful Python Decorators for High-Performance Data Pipelines

6 min Lesezeit
5 Powerful Python Decorators for High-Performance Data Pipelines

Datenpipelines spielen in der Datenwissenschaft und bei Projekten im Bereich des maschinellen Lernens eine entscheidende Rolle, da sie eine praktische und vielseitige Möglichkeit bieten, Datenverarbeitungsabläufe zu automatisieren. Oft kann jedoch der Code zusätzliche Komplexität zur Kernlogik hinzufügen. Python-Dekoratoren bieten eine Lösung für dieses häufige Problem. In diesem Artikel werden fünf nützliche und effektive Python-Dekoratoren vorgestellt, die dazu beitragen, leistungsstarke Datenpipelines zu erstellen und zu optimieren.

Der folgende Beispielcode dient als Grundlage für die fünf Dekoratoren und lädt eine Version des Kalifornischen Wohnungsdatensatzes, die in einem öffentlichen GitHub-Repository zur Verfügung steht:

import pandas as pd
import numpy as np
# Laden des Datensatzes
DATA_URL = "https://raw.githubusercontent.com/gakudo-ai/open-datasets/main/housing.csv"
print("Lade Datenpipeline-Quelle...")
df_pipeline = pd.read_csv(DATA_URL)
print(f"{df_pipeline.shape[0]} Zeilen und {df_pipeline.shape[1]} Spalten geladen.")

1. JIT-Kompilierung

Python-Schleifen haben den zweifelhaften Ruf, bemerkenswert langsam zu sein und Engpässe zu verursachen, insbesondere bei komplexen Operationen wie mathematischen Transformationen über einen Datensatz. Eine schnelle Lösung hierfür ist der Dekorator @njit aus der Numba-Bibliothek, der Python-Funktionen zur Laufzeit in optimierten Maschinencode übersetzt. Dies kann bei großen Datensätzen und komplexen Datenpipelines zu drastischen Geschwindigkeitssteigerungen führen.

from numba import njit
import time
# Extrahieren einer numerischen Spalte als NumPy-Array für schnelle Verarbeitung
incomes = df_pipeline['median_income'].fillna(0).values
@njit
def compute_complex_metric(income_array):
 result = np.zeros_like(income_array)
 # In reinem Python würde eine Schleife wie diese normalerweise langsam sein
 for i in range(len(income_array)):
 result[i] = np.log1p(income_array[i] * 2.5) ** 1.5
 return result
start = time.time()
df_pipeline['income_metric'] = compute_complex_metric(incomes)
print(f"Array in {time.time() - start:.5f} Sekunden verarbeitet!")

2. Zwischenablage

Wenn Datenpipelines rechenintensive Aggregationen oder Datenverknüpfungen enthalten, die Minuten bis Stunden in Anspruch nehmen können, kann memory.cache verwendet werden, um die Ausgaben von Funktionen zu serialisieren. Im Falle eines Neustarts des Skripts oder einer Wiederherstellung nach einem Absturz kann dieser Dekorator die serialisierten Array-Daten von der Festplatte neu laden, wodurch aufwendige Berechnungen übersprungen werden und sowohl Ressourcen als auch Zeit gespart werden.

from joblib import Memory
import time
# Erstellen eines lokalen Cache-Verzeichnisses für Pipeline-Artefakte
memory = Memory(".pipeline_cache", verbose=0)
@memory.cache
def expensive_aggregation(df):
 print("Führe schwere Gruppierungsoperation aus...")
 time.sleep(1.5) # Simulation eines langwierigen Pipeline-Schrittes
 # Gruppieren von Datenpunkten nach ocean_proximity und Berechnung der Mittelwerte
 return df.groupby('ocean_proximity', as_index=False).mean(numeric_only=True)
# Der erste Lauf führt den Code aus; der zweite greift auf die Festplatte für sofortiges Laden zurück
agg_df = expensive_aggregation(df_pipeline)
agg_df_cached = expensive_aggregation(df_pipeline)

3. Schema-Validierung

Pandera ist eine Bibliothek zur statistischen Typisierung (Schema-Überprüfung), die entwickelt wurde, um die schleichende, subtile Korruption von Analysemodellen wie maschinellen Lernvorhersagen oder Dashboards aufgrund von Daten von schlechter Qualität zu verhindern. Im folgenden Beispiel wird gezeigt, wie sie in Kombination mit der parallelen Verarbeitung der Dask-Bibliothek verwendet werden kann, um sicherzustellen, dass die anfängliche Pipeline dem festgelegten Schema entspricht. Andernfalls wird ein Fehler ausgelöst, der hilft, potenzielle Probleme frühzeitig zu erkennen.

import pandera as pa
import pandas as pd
import numpy as np
from dask import delayed, compute
# Definieren eines Schemas zur Durchsetzung von Datentypen und gültigen Bereichen
housing_schema = pa.DataFrameSchema({
 "median_income": pa.Column(float, pa.Check.greater_than(0)),
 "total_rooms": pa.Column(float, pa.Check.gt(0)),
 "ocean_proximity": pa.Column(str, pa.Check.isin(['NEAR BAY', 'NEAR OCEAN']))
})
@delayed
def validate_and_process(df):
 """
 Validiert den DataFrame-Chunk gegen das definierte Schema.
 Wenn die Daten beschädigt sind, löst Pandera einen SchemaError aus.
 """
 return housing_schema.validate(df)
# Aufteilen der Pipeline-Daten in 4 Teile zur parallelen Validierung
chunks = np.array_split(df_pipeline, 4)
lazy_validations = [validate_and_process(chunk) for chunk in chunks]
print("Starte parallele Schema-Validierung...")
try:
 # Auslösen des Dask-Diagramms zur parallelen Validierung der Teile
 validated_chunks = compute(*lazy_validations)
 df_parallel = pd.concat(validated_chunks)
 print(f"Validierung erfolgreich. {len(df_parallel)} Zeilen verarbeitet.")
except pa.errors.SchemaError as e:
 print(f"Datenintegritätsfehler: {e}")

4. Lazy Parallelisierung

Das Ausführen von Pipeline-Schritten, die unabhängig sind, in einer sequenziellen Weise nutzt möglicherweise nicht optimal die Verarbeitungseinheiten wie CPUs. Der Dekorator @delayed über solchen Transformationsfunktionen erstellt ein Abhängigkeitsdiagramm, um die Aufgaben später parallel in optimierter Weise auszuführen, was zur Reduzierung der Gesamtlaufzeit beiträgt.

from dask import delayed, compute
@delayed
def process_chunk(df_chunk):
 # Simulation einer isolierten Transformationsaufgabe
 df_chunk_copy = df_chunk.copy()
 df_chunk_copy['value_per_room'] = df_chunk_copy['median_house_value'] / df_chunk_copy['total_rooms']
 return df_chunk_copy
# Aufteilen des Datensatzes in 4 Teile, die parallel verarbeitet werden
chunks = np.array_split(df_pipeline, 4)
# Lazy-Berechnungsgraf (so funktioniert Dask!)
lazy_results = [process_chunk(chunk) for chunk in chunks]
# Auslösen der Ausführung über mehrere CPUs gleichzeitig
processed_chunks = compute(*lazy_results)
df_parallel = pd.concat(processed_chunks)
print(f"Parallelisierte Ausgabeform: {df_parallel.shape}")

5. Speicherprofilierung

Der Dekorator @profile wurde entwickelt, um stille Speicherlecks zu erkennen, die manchmal dazu führen können, dass Server abstürzen, wenn die zu verarbeitenden Dateien massiv sind. Das Muster besteht darin, die umschlossene Funktion Schritt für Schritt zu überwachen und den RAM-Verbrauch oder den freigegebenen Speicher bei jedem einzelnen Schritt zu beobachten. Letztendlich ist dies eine hervorragende Möglichkeit, Ineffizienzen im Code zu identifizieren und die Speichernutzung mit klaren Zielen zu optimieren.

from memory_profiler import profile
# Eine dekorierte Funktion, die eine zeilenweise Speicheraufteilung in der Konsole ausgibt
@profile(precision=2)
def memory_intensive_step(df):
 print("Führe Speicherdiagnosen durch...")
 # Erstellung einer massiven temporären Kopie, um einen absichtlichen Speicheranstieg zu verursachen
 df_temp = df.copy() 
 df_temp['new_col'] = df_temp['total_bedrooms'] * 100
 # Löschen des temporären DataFrames gibt den RAM frei
 del df_temp 
 return df.dropna(subset=['total_bedrooms'])
# Ausführen des Pipeline-Schrittes: Sie können den Speicherbericht in Ihrem Terminal beobachten
final_df = memory_intensive_step(df_pipeline)

Fazit

In diesem Artikel wurden fünf nützliche und leistungsstarke Python-Dekoratoren zur Optimierung rechenintensiver Datenpipelines vorgestellt. Unterstützt durch paralleles Rechnen und effiziente Verarbeitungsbibliotheken wie Dask und Numba können diese Dekoratoren nicht nur schwere Datenverarbeitungsprozesse beschleunigen, sondern sie auch widerstandsfähiger gegen Fehler und Ausfälle machen.

Iván Palomares Carrascosa ist ein Experte, Autor, Redner und Berater im Bereich KI, maschinelles Lernen, Deep Learning und LLMs. Er schult und berät andere darin, KI in der Praxis zu nutzen.

Bildquelle: ai-generated-gemini

KI Snack

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert