Ingest-Pipelines sind ein mächtiges Werkzeug in OpenSearch, das es uns ermöglicht, Daten während der Indexierung automatisch zu verarbeiten und zu transformieren. Stellen Sie sich eine Ingest-Pipeline wie ein Fließband in einer Fabrik vor: Die Rohdaten kommen auf der einen Seite hinein und durchlaufen verschiedene Verarbeitungsschritte, bevor sie am Ende in veredelter Form im Index landen.
Eine Ingest-Pipeline besteht aus einer Reihe von Prozessoren, die nacheinander auf die Dokumente angewendet werden. Jeder Prozessor führt dabei eine spezifische Aufgabe aus.
PUT _ingest/pipeline/basic_pipeline
{
"description": "Eine einfache Pipeline zur Datenbereinigung",
"processors": [
{
"trim": {
"field": "title"
}
},
{
"lowercase": {
"field": "title"
}
}
]
}
Diese Pipeline führt zwei Schritte aus:
title.title in Kleinbuchstaben
um.Die Pipeline wird beim Indexieren eines Dokuments wie folgt eingebunden:
PUT /books/_doc/1?pipeline=basic_pipeline
{
"title": " OpenSearch Grundlagen ",
"author": "Maria Schmidt"
}
OpenSearch bietet eine Vielzahl von Prozessoren für unterschiedliche Aufgaben. Hier sind einige wichtige Beispiele:
Der Set-Prozessor fügt neue Felder hinzu oder ändert bestehende:
PUT _ingest/pipeline/enrich_document
{
"description": "Fügt zusätzliche Metadaten hinzu",
"processors": [
{
"set": {
"field": "processing_date",
"value": "{{_ingest.timestamp}}"
}
},
{
"set": {
"field": "category",
"value": "book",
"override": false
}
}
]
}
Der Date-Prozessor konvertiert Datumsangaben in ein standardisiertes Format:
PUT _ingest/pipeline/date_standardization
{
"description": "Standardisiert Datumsformate",
"processors": [
{
"date": {
"field": "publication_date",
"formats": ["dd.MM.yyyy", "yyyy-MM-dd"],
"target_field": "publication_date_iso",
"timezone": "Europe/Berlin"
}
}
]
}
Prozessoren für die Verarbeitung von Textfeldern:
PUT _ingest/pipeline/text_processing
{
"description": "Verarbeitet und bereinigt Textfelder",
"processors": [
{
"split": {
"field": "tags",
"separator": ","
}
},
{
"join": {
"field": "keywords",
"separator": " "
}
},
{
"gsub": {
"field": "description",
"pattern": "\\s+",
"replacement": " "
}
}
]
}
Mit dem Conditional-Prozessor können Bedingungen für die Verarbeitung festgelegt werden:
PUT _ingest/pipeline/conditional_processing
{
"description": "Führt bedingte Verarbeitung durch",
"processors": [
{
"set": {
"if": "ctx.price != null && ctx.price > 100",
"field": "price_category",
"value": "premium"
}
},
{
"set": {
"if": "ctx.price != null && ctx.price <= 100",
"field": "price_category",
"value": "standard"
}
}
]
}
In der Praxis kombinieren wir oft mehrere Prozessoren für komplexere Transformationen.
PUT _ingest/pipeline/product_enrichment
{
"description": "Umfassende Produktdatenverarbeitung",
"processors": [
{
"script": {
"description": "Berechnet den Nettopreis",
"lang": "painless",
"source": "ctx.net_price = ctx.gross_price / 1.19"
}
},
{
"set": {
"field": "sku",
"value": "{{category}}-{{_ingest.timestamp}}"
}
},
{
"convert": {
"field": "stock_level",
"type": "integer"
}
},
{
"uppercase": {
"field": "category"
}
}
]
}
Eine robuste Pipeline sollte Fehler angemessen behandeln. Dafür können wir den On_Failure-Prozessor verwenden:
PUT _ingest/pipeline/robust_pipeline
{
"description": "Pipeline mit Fehlerbehandlung",
"processors": [
{
"convert": {
"field": "price",
"type": "float",
"on_failure": [
{
"set": {
"field": "price_conversion_error",
"value": "true"
}
},
{
"set": {
"field": "price",
"value": 0.0
}
}
]
}
}
]
}
Vor der Produktionseinführung können wir Pipelines mit der Simulate-API testen:
POST _ingest/pipeline/product_enrichment/_simulate
{
"docs": [
{
"_source": {
"category": "electronics",
"gross_price": 119.0,
"stock_level": "42"
}
}
]
}
Mit diesen Grundlagen und Empfehlungen können Sie Ingest-Pipelines effektiv für die Automatisierung Ihrer Datenverarbeitung in OpenSearch nutzen.