Ingest-Pipelines sind ein zentraler Bestandteil der Datenverarbeitung in OpenSearch. Um ihr volles Potenzial zu verstehen, ist es wichtig, ihr Zusammenspiel mit anderen Komponenten und Prozessen in einem realistischen Kontext zu betrachten. Dieses Kapitel gibt eine Übersicht über das Big Picture der Ingest-Pipelines und beschreibt ihren Platz im Datenfluss von der Datenerfassung bis zur Analyse.
Der typische Datenfluss in OpenSearch lässt sich in folgende Schritte unterteilen:
Die Ingest-Pipeline ist der zentrale Verarbeitungsschritt zwischen der Datenerfassung und der Indexierung. Sie stellt sicher, dass:
Ein Online-Shop möchte Log-Daten aus seinen Webservern analysieren, um Einblicke in Nutzerverhalten und Performance zu gewinnen.
Datenerfassung: Logs werden in Echtzeit von den Webservern generiert und durch Filebeat gesammelt.
Datenanlieferung: Filebeat sendet die Log-Daten über einen HTTP-Request an OpenSearch und gibt dabei die zu verwendende Ingest-Pipeline an.
Verarbeitung in der Ingest-Pipeline:
processing_timestamp und Geo-Informationen basierend auf
IP-Adressen.status_code oder response_time vorhanden und
gültig sind.Beispiel einer Pipeline für die Log-Verarbeitung:
PUT _ingest/pipeline/log_processing_pipeline
{
"description": "Pipeline zur Verarbeitung von Webserver-Logs",
"processors": [
{
"gsub": {
"field": "message",
"pattern": "\\s+",
"replacement": " "
}
},
{
"geoip": {
"field": "client_ip",
"target_field": "geo_location"
}
},
{
"date": {
"field": "timestamp",
"formats": ["dd/MMM/yyyy:HH:mm:ss Z"],
"target_field": "@timestamp"
}
},
{
"set": {
"field": "processing_timestamp",
"value": "{{_ingest.timestamp}}"
}
}
]
}Indexierung: Die verarbeiteten Log-Daten werden
im Index webserver-logs gespeichert.
Analyse und Abfrage:
Hier ist ein vollständiges Beispiel, das eine Ingest-Pipeline definiert, Daten durch diese Pipeline verarbeitet und die Daten in einen Index schreibt.
products geschrieben.PUT _ingest/pipeline/product_pipeline
{
"description": "Pipeline zur Verarbeitung von Produktdaten",
"processors": [
{
"trim": {
"field": "product_name"
}
},
{
"lowercase": {
"field": "product_name"
}
},
{
"set": {
"field": "processing_date",
"value": "{{_ingest.timestamp}}"
}
}
]
}trim: Entfernt Leerzeichen aus dem Feld
product_name.lowercase: Wandelt den Inhalt von
product_name in Kleinbuchstaben um.set: Fügt ein neues Feld processing_date
mit dem aktuellen Zeitstempel hinzu.PUT /products/_doc/1?pipeline=product_pipeline
{
"product_name": " OpenSearch T-Shirt ",
"price": 19.99,
"stock": 100
}?pipeline=product_pipeline:
Der Pipeline wird die Verarbeitung des Dokuments vor dem Schreiben in
den Index übertragen.Rufe das verarbeitete Dokument aus dem Index ab:
GET /products/_doc/1{
"_index": "products",
"_id": "1",
"_source": {
"product_name": "opensearch t-shirt",
"price": 19.99,
"stock": 100,
"processing_date": "2025-01-14T12:00:00.000Z"
}
}processing_date hinzu.products
gespeichert.Mit diesem Ansatz können Sie Schritt für Schritt nachvollziehen, wie Ingest-Pipelines in OpenSearch Daten modifizieren und sie in einem Index ablegen. Soll ich ein weiteres Beispiel mit anderen Prozessoren erstellen?