Die Fähigkeit, Daten effizient in OpenSearch zu indexieren, ist fundamental für jedes erfolgreiche Projekt. In diesem Kapitel lernen Sie die verschiedenen Methoden der Datenindexierung kennen - von einfachen Einzeldokumenten bis hin zu komplexen Massendatenimporten. Wir werden jeden Aspekt mit praktischen Beispielen untermauern und typische Herausforderungen sowie deren Lösungen betrachten.
Bevor wir in die technischen Details eintauchen, ist es wichtig, einige grundlegende Konzepte zu verstehen. In OpenSearch wird jedes Dokument in einem Index gespeichert. Sie können sich einen Index wie eine Tabelle in einer relationalen Datenbank vorstellen, allerdings mit einer flexibleren Struktur.
Ein Dokument in OpenSearch ist ein JSON-Objekt, das Ihre Daten enthält. Nehmen wir als Beispiel einen Online-Shop mit Produktdaten:
{
"sku": "B123456789",
"name": "Premium Kaffeemaschine",
"description": "Vollautomatische Espressomaschine mit integriertem Mahlwerk",
"price": 599.99,
"category": "Haushaltsgeräte",
"manufacturer": "CoffeeMax",
"stock": {
"available": 15,
"reserved": 3,
"warehouse": "Berlin-1"
},
"specifications": {
"power": "1450W",
"capacity": "2.1L",
"pressure": "15 bar"
},
"created_at": "2024-01-13T10:00:00Z",
"updated_at": "2024-01-13T10:00:00Z"
}
Dieses Beispiel zeigt wichtige Eigenschaften von OpenSearch-Dokumenten:
OpenSearch bietet verschiedene Wege, um Daten zu indexieren. Die Wahl der richtigen Methode hängt von Ihren spezifischen Anforderungen ab.
Die einfachste Form der Indexierung ist das Hinzufügen einzelner Dokumente. Dies ist ideal für:
Beispiel einer Einzeldokument-Indexierung:
# Dokument mit spezifischer ID indexieren
PUT /products/_doc/B123456789
{
"sku": "B123456789",
"name": "Premium Kaffeemaschine",
"price": 599.99
}
# Dokument mit automatisch generierter ID indexieren
POST /products/_doc
{
"sku": "B987654321",
"name": "Wasserkocher Deluxe",
"price": 79.99
}
Für größere Datenmengen ist die Bulk-API der effizientere Weg. Sie ermöglicht das Ausführen mehrerer Operationen in einer einzigen Anfrage:
POST _bulk
{"index": {"_index": "products", "_id": "1"}}
{"sku": "A111", "name": "Produkt 1", "price": 19.99}
{"index": {"_index": "products", "_id": "2"}}
{"sku": "A112", "name": "Produkt 2", "price": 29.99}
{"update": {"_index": "products", "_id": "1"}}
{"doc": {"price": 24.99}}
{"delete": {"_index": "products", "_id": "2"}}
Wichtige Aspekte der Bulk-Indexierung:
import json
import requests
from typing import List, Dict
def bulk_index_documents(documents: List[Dict], batch_size: int = 1000) -> Dict:
"""
Indexiert Dokumente im Bulk-Modus mit Error Handling.
Args:
documents: Liste der zu indexierenden Dokumente
batch_size: Anzahl der Dokumente pro Batch
Returns:
Dict mit Statistiken über erfolgreiche/fehlgeschlagene Operationen
"""
stats = {"successful": 0, "failed": 0, "errors": []}
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
bulk_body = []
# Bulk Request Body erstellen
for doc in batch:
bulk_body.append(json.dumps({"index": {"_index": "products", "_id": doc["sku"]}}))
bulk_body.append(json.dumps(doc))
# Bulk Request ausführen
response = requests.post(
"http://localhost:9200/_bulk",
headers={"Content-Type": "application/x-ndjson"},
data="\n".join(bulk_body) + "\n"
)
# Ergebnisse auswerten
result = response.json()
if result.get("errors", False):
for item in result["items"]:
if "error" in item["index"]:
stats["failed"] += 1
stats["errors"].append({
"id": item["index"]["_id"],
"error": item["index"]["error"]["reason"]
})
else:
stats["successful"] += 1
else:
stats["successful"] += len(batch)
return statsIngest Pipelines ermöglichen die Transformation von Dokumenten während der Indexierung:
PUT _ingest/pipeline/product_pipeline
{
"description": "Produktdaten-Pipeline mit Validierung und Anreicherung",
"processors": [
{
"set": {
"field": "indexed_at",
"value": "{{_ingest.timestamp}}"
}
},
{
"convert": {
"field": "price",
"type": "float",
"ignore_missing": true
}
},
{
"script": {
"lang": "painless",
"source": """
if (ctx.price < 0) {
throw new RuntimeException('Preis darf nicht negativ sein');
}
ctx.price_category = (ctx.price < 50) ? 'budget' :
(ctx.price < 200) ? 'standard' : 'premium';
"""
}
}
]
}
Verwendung der Pipeline:
POST products/_doc?pipeline=product_pipeline
{
"sku": "C123",
"name": "Testprodukt",
"price": 149.99
}
PUT /products/_settings
{
"index": {
"refresh_interval": "30s",
"number_of_replicas": 0
}
}
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict
import threading
class BulkIndexer:
def __init__(self, batch_size: int = 1000, max_workers: int = 4):
self.batch_size = batch_size
self.max_workers = max_workers
self.stats_lock = threading.Lock()
self.stats = {"successful": 0, "failed": 0}
def index_batch(self, batch: List[Dict]) -> None:
try:
# Bulk-Indexierung für einen Batch
result = self._perform_bulk_request(batch)
# Statistiken thread-sicher aktualisieren
with self.stats_lock:
self.stats["successful"] += result["successful"]
self.stats["failed"] += result["failed"]
except Exception as e:
print(f"Fehler beim Indexieren eines Batches: {str(e)}")
def index_documents(self, documents: List[Dict]) -> Dict:
# Dokumente in Batches aufteilen
batches = [documents[i:i + self.batch_size]
for i in range(0, len(documents), self.batch_size)]
# Parallel indexieren mit ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
executor.map(self.index_batch, batches)
return self.stats# Indexierungsstatistiken abrufen
GET _stats/indexing
# Spezifische Index-Metriken
GET /products/_stats
Mapping-Konflikte
{
"error": {
"type": "mapper_parsing_exception",
"reason": "failed to parse field [price] of type [keyword]"
}
}
Lösung: Explizites Mapping vor der Indexierung definieren:
PUT /products
{
"mappings": {
"properties": {
"price": {
"type": "float"
}
}
}
}Speicherprobleme
Lösung: Batch-Größe reduzieren und Indexierungsprozess optimieren:
PUT /_cluster/settings
{
"persistent": {
"indices.breaker.total.limit": "70%"
}
}