Stellen Sie sich Ingest-Pipelines als Fließband in einer Fabrik vor: Bevor die Rohmaterialien (Ihre Daten) ihren endgültigen Platz (den Index) erreichen, durchlaufen sie verschiedene Bearbeitungsstationen (Prozessoren), die jeweils spezifische Transformationen vornehmen.
Ingest-Pipelines lösen häufige Herausforderungen bei der Datenverarbeitung:
Eine Pipeline besteht aus zwei Hauptkomponenten:
PUT /_ingest/pipeline/meine-pipeline
{
"description": "Eine aussagekräftige Beschreibung der Pipeline",
"processors": [
{
"prozessor-name": {
"field": "feldname",
"weitere": "parameter"
}
}
]
}{
"set": {
"field": "kategorie",
"value": "standard"
}
}{
"convert": {
"field": "preis",
"type": "float"
}
}{
"date": {
"field": "erstellungsdatum",
"formats": ["dd.MM.yyyy", "yyyy-MM-dd"],
"target_field": "erstellungsdatum_iso"
}
}Prozessoren können basierend auf Bedingungen ausgeführt werden:
{
"set": {
"if": "ctx.preis > 100",
"field": "preiskategorie",
"value": "premium"
}
}Dynamische Werte durch Templates:
{
"set": {
"field": "vollerName",
"value": "{{vorname}} {{nachname}}"
}
}PUT /_ingest/pipeline/log-pipeline
{
"description": "Bereitet Log-Einträge auf",
"processors": [
{
"grok": {
"field": "message",
"patterns": ["%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"]
}
},
{
"date": {
"field": "timestamp",
"formats": ["ISO8601"]
}
},
{
"set": {
"field": "processed_timestamp",
"value": "{{_ingest.timestamp}}"
}
}
]
}PUT /_ingest/pipeline/produkt-pipeline
{
"description": "Normalisiert Produktdaten",
"processors": [
{
"uppercase": {
"field": "produktcode"
}
},
{
"convert": {
"field": "preis",
"type": "double"
}
},
{
"script": {
"source": "ctx.preis_netto = ctx.preis / 1.19"
}
}
]
}Der Simulate-Endpoint ermöglicht das Testen von Pipelines:
POST /_ingest/pipeline/meine-pipeline/_simulate
{
"docs": [
{
"_source": {
"feldname": "testwert"
}
}
]
}Pipeline-Performance optimieren:
{
"processors": [
{
"script": {
"description": "Leichte Operationen zuerst",
"source": "ctx.einfache_berechnung = ctx.wert * 2"
}
},
{
"script": {
"description": "Komplexe Operationen später",
"source": "ctx.komplexe_berechnung = komplexe_funktion(ctx.wert)"
}
}
]
}Fehlerbehandlung für einzelne Prozessoren:
{
"date": {
"field": "timestamp",
"formats": ["dd/MM/yyyy"],
"on_failure": [
{
"set": {
"field": "error_message",
"value": "Ungültiges Datumsformat in {{_ingest._value}}"
}
}
]
}
}PUT /_ingest/pipeline/robuste-pipeline
{
"description": "Pipeline mit umfassender Fehlerbehandlung",
"processors": [...],
"on_failure": [
{
"set": {
"field": "processing_error",
"value": "Pipeline-Fehler: {{_ingest.on_failure_message}}"
}
}
]
}// Basis-Pipeline
PUT /_ingest/pipeline/basis
{
"processors": [
{
"set": {
"field": "processed_date",
"value": "{{_ingest.timestamp}}"
}
}
]
}
// Spezifische Pipeline
PUT /_ingest/pipeline/spezifisch
{
"processors": [
{
"pipeline": {
"name": "basis"
}
},
{
"set": {
"field": "type",
"value": "spezial"
}
}
]
}{
"script": {
"lang": "painless",
"id": "mein-cached-script", // Vorcompiliertes Script
"params": {
"faktor": 1.19
}
}
}Pipeline-Statistiken überwachen:
GET /_ingest/pipeline/_statsGute Pipeline-Dokumentation durch aussagekräftige Beschreibungen:
PUT /_ingest/pipeline/gut-dokumentiert
{
"description": "Diese Pipeline verarbeitet Kundendaten: \n- Normalisiert Namen\n- Validiert E-Mails\n- Berechnet Kundenalter",
"processors": [
{
"uppercase": {
"description": "Konvertiert Namen in Großbuchstaben für einheitliche Suche",
"field": "name"
}
}
]
}Detaillierte Simulation für Debugging:
POST /_ingest/pipeline/meine-pipeline/_simulate?verbose=true
{
"docs": [
{
"_source": {
"test": "wert"
}
}
]
}Einzelne Prozessoren analysieren:
GET /_ingest/processorPipeline-Design:
Performance:
Wartbarkeit:
Ingest-Pipelines sind ein mächtiges Werkzeug zur Datentransformation. Mit sorgfältiger Planung und Implementierung können sie die Datenqualität erheblich verbessern und den Indexierungsprozess optimieren.