35 Datenverarbeitung mit Ingest-Pipelines

Ingest-Pipelines sind ein mächtiges Werkzeug in OpenSearch, das es uns ermöglicht, Daten während der Indexierung automatisch zu verarbeiten und zu transformieren. Stellen Sie sich eine Ingest-Pipeline wie ein Fließband in einer Fabrik vor: Die Rohdaten kommen auf der einen Seite hinein und durchlaufen verschiedene Verarbeitungsschritte, bevor sie am Ende in veredelter Form im Index landen.

35.1 Grundlagen der Datenverarbeitung

Eine Ingest-Pipeline besteht aus einer Reihe von Prozessoren, die nacheinander auf die Dokumente angewendet werden. Jeder Prozessor führt dabei eine spezifische Aufgabe aus.

35.1.1 Beispiel: Einfache Pipeline

PUT _ingest/pipeline/basic_pipeline
{
  "description": "Eine einfache Pipeline zur Datenbereinigung",
  "processors": [
    {
      "trim": {
        "field": "title"
      }
    },
    {
      "lowercase": {
        "field": "title"
      }
    }
  ]
}

Diese Pipeline führt zwei Schritte aus:

  1. Entfernt führende und nachfolgende Leerzeichen aus dem Feld title.
  2. Wandelt den Text im Feld title in Kleinbuchstaben um.

Die Pipeline wird beim Indexieren eines Dokuments wie folgt eingebunden:

PUT /books/_doc/1?pipeline=basic_pipeline
{
  "title": "  OpenSearch Grundlagen  ",
  "author": "Maria Schmidt"
}

35.2 Prozessoren und ihre Anwendungsfälle

OpenSearch bietet eine Vielzahl von Prozessoren für unterschiedliche Aufgaben. Hier sind einige wichtige Beispiele:

35.2.1 Feldmanipulation

Der Set-Prozessor fügt neue Felder hinzu oder ändert bestehende:

PUT _ingest/pipeline/enrich_document
{
  "description": "Fügt zusätzliche Metadaten hinzu",
  "processors": [
    {
      "set": {
        "field": "processing_date",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "set": {
        "field": "category",
        "value": "book",
        "override": false
      }
    }
  ]
}

35.2.2 Datumstransformation

Der Date-Prozessor konvertiert Datumsangaben in ein standardisiertes Format:

PUT _ingest/pipeline/date_standardization
{
  "description": "Standardisiert Datumsformate",
  "processors": [
    {
      "date": {
        "field": "publication_date",
        "formats": ["dd.MM.yyyy", "yyyy-MM-dd"],
        "target_field": "publication_date_iso",
        "timezone": "Europe/Berlin"
      }
    }
  ]
}

35.2.3 Textverarbeitung

Prozessoren für die Verarbeitung von Textfeldern:

PUT _ingest/pipeline/text_processing
{
  "description": "Verarbeitet und bereinigt Textfelder",
  "processors": [
    {
      "split": {
        "field": "tags",
        "separator": ","
      }
    },
    {
      "join": {
        "field": "keywords",
        "separator": " "
      }
    },
    {
      "gsub": {
        "field": "description",
        "pattern": "\\s+",
        "replacement": " "
      }
    }
  ]
}

35.2.4 Konditionale Verarbeitung

Mit dem Conditional-Prozessor können Bedingungen für die Verarbeitung festgelegt werden:

PUT _ingest/pipeline/conditional_processing
{
  "description": "Führt bedingte Verarbeitung durch",
  "processors": [
    {
      "set": {
        "if": "ctx.price != null && ctx.price > 100",
        "field": "price_category",
        "value": "premium"
      }
    },
    {
      "set": {
        "if": "ctx.price != null && ctx.price <= 100",
        "field": "price_category",
        "value": "standard"
      }
    }
  ]
}

35.3 Komplexe Transformationen

In der Praxis kombinieren wir oft mehrere Prozessoren für komplexere Transformationen.

35.3.1 Beispiel: Produktdatenverarbeitung

PUT _ingest/pipeline/product_enrichment
{
  "description": "Umfassende Produktdatenverarbeitung",
  "processors": [
    {
      "script": {
        "description": "Berechnet den Nettopreis",
        "lang": "painless",
        "source": "ctx.net_price = ctx.gross_price / 1.19"
      }
    },
    {
      "set": {
        "field": "sku",
        "value": "{{category}}-{{_ingest.timestamp}}"
      }
    },
    {
      "convert": {
        "field": "stock_level",
        "type": "integer"
      }
    },
    {
      "uppercase": {
        "field": "category"
      }
    }
  ]
}

35.4 Fehlerbehandlung und Validierung

Eine robuste Pipeline sollte Fehler angemessen behandeln. Dafür können wir den On_Failure-Prozessor verwenden:

PUT _ingest/pipeline/robust_pipeline
{
  "description": "Pipeline mit Fehlerbehandlung",
  "processors": [
    {
      "convert": {
        "field": "price",
        "type": "float",
        "on_failure": [
          {
            "set": {
              "field": "price_conversion_error",
              "value": "true"
            }
          },
          {
            "set": {
              "field": "price",
              "value": 0.0
            }
          }
        ]
      }
    }
  ]
}

35.5 Pipeline-Simulation

Vor der Produktionseinführung können wir Pipelines mit der Simulate-API testen:

POST _ingest/pipeline/product_enrichment/_simulate
{
  "docs": [
    {
      "_source": {
        "category": "electronics",
        "gross_price": 119.0,
        "stock_level": "42"
      }
    }
  ]
}

35.6 Best Practices und Hinweise

Mit diesen Grundlagen und Empfehlungen können Sie Ingest-Pipelines effektiv für die Automatisierung Ihrer Datenverarbeitung in OpenSearch nutzen.