27 Datenindexierung in OpenSearch

Die Fähigkeit, Daten effizient in OpenSearch zu indexieren, ist fundamental für jedes erfolgreiche Projekt. In diesem Kapitel lernen Sie die verschiedenen Methoden der Datenindexierung kennen - von einfachen Einzeldokumenten bis hin zu komplexen Massendatenimporten. Wir werden jeden Aspekt mit praktischen Beispielen untermauern und typische Herausforderungen sowie deren Lösungen betrachten.

27.1 Grundkonzepte der Indexierung

Bevor wir in die technischen Details eintauchen, ist es wichtig, einige grundlegende Konzepte zu verstehen. In OpenSearch wird jedes Dokument in einem Index gespeichert. Sie können sich einen Index wie eine Tabelle in einer relationalen Datenbank vorstellen, allerdings mit einer flexibleren Struktur.

27.1.1 Dokumente und ihre Struktur

Ein Dokument in OpenSearch ist ein JSON-Objekt, das Ihre Daten enthält. Nehmen wir als Beispiel einen Online-Shop mit Produktdaten:

{
    "sku": "B123456789",
    "name": "Premium Kaffeemaschine",
    "description": "Vollautomatische Espressomaschine mit integriertem Mahlwerk",
    "price": 599.99,
    "category": "Haushaltsgeräte",
    "manufacturer": "CoffeeMax",
    "stock": {
        "available": 15,
        "reserved": 3,
        "warehouse": "Berlin-1"
    },
    "specifications": {
        "power": "1450W",
        "capacity": "2.1L",
        "pressure": "15 bar"
    },
    "created_at": "2024-01-13T10:00:00Z",
    "updated_at": "2024-01-13T10:00:00Z"
}

Dieses Beispiel zeigt wichtige Eigenschaften von OpenSearch-Dokumenten:

27.2 Methoden der Indexierung

OpenSearch bietet verschiedene Wege, um Daten zu indexieren. Die Wahl der richtigen Methode hängt von Ihren spezifischen Anforderungen ab.

27.2.1 Einzeldokument-Indexierung

Die einfachste Form der Indexierung ist das Hinzufügen einzelner Dokumente. Dies ist ideal für:

Beispiel einer Einzeldokument-Indexierung:

# Dokument mit spezifischer ID indexieren
PUT /products/_doc/B123456789
{
    "sku": "B123456789",
    "name": "Premium Kaffeemaschine",
    "price": 599.99
}

# Dokument mit automatisch generierter ID indexieren
POST /products/_doc
{
    "sku": "B987654321",
    "name": "Wasserkocher Deluxe",
    "price": 79.99
}

27.2.2 Bulk-Indexierung

Für größere Datenmengen ist die Bulk-API der effizientere Weg. Sie ermöglicht das Ausführen mehrerer Operationen in einer einzigen Anfrage:

POST _bulk
{"index": {"_index": "products", "_id": "1"}}
{"sku": "A111", "name": "Produkt 1", "price": 19.99}
{"index": {"_index": "products", "_id": "2"}}
{"sku": "A112", "name": "Produkt 2", "price": 29.99}
{"update": {"_index": "products", "_id": "1"}}
{"doc": {"price": 24.99}}
{"delete": {"_index": "products", "_id": "2"}}

Wichtige Aspekte der Bulk-Indexierung:

  1. Format: Jede Operation besteht aus zwei Zeilen:
  2. Performance-Optimierung:

27.2.3 Beispiel: Bulk-Indexierung mit Error Handling

import json
import requests
from typing import List, Dict

def bulk_index_documents(documents: List[Dict], batch_size: int = 1000) -> Dict:
    """
    Indexiert Dokumente im Bulk-Modus mit Error Handling.
    
    Args:
        documents: Liste der zu indexierenden Dokumente
        batch_size: Anzahl der Dokumente pro Batch
        
    Returns:
        Dict mit Statistiken über erfolgreiche/fehlgeschlagene Operationen
    """
    stats = {"successful": 0, "failed": 0, "errors": []}
    
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        bulk_body = []
        
        # Bulk Request Body erstellen
        for doc in batch:
            bulk_body.append(json.dumps({"index": {"_index": "products", "_id": doc["sku"]}}))
            bulk_body.append(json.dumps(doc))
            
        # Bulk Request ausführen
        response = requests.post(
            "http://localhost:9200/_bulk",
            headers={"Content-Type": "application/x-ndjson"},
            data="\n".join(bulk_body) + "\n"
        )
        
        # Ergebnisse auswerten
        result = response.json()
        if result.get("errors", False):
            for item in result["items"]:
                if "error" in item["index"]:
                    stats["failed"] += 1
                    stats["errors"].append({
                        "id": item["index"]["_id"],
                        "error": item["index"]["error"]["reason"]
                    })
                else:
                    stats["successful"] += 1
        else:
            stats["successful"] += len(batch)
            
    return stats

27.3 Fortgeschrittene Indexierungstechniken

27.3.1 Ingest Pipelines

Ingest Pipelines ermöglichen die Transformation von Dokumenten während der Indexierung:

PUT _ingest/pipeline/product_pipeline
{
  "description": "Produktdaten-Pipeline mit Validierung und Anreicherung",
  "processors": [
    {
      "set": {
        "field": "indexed_at",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "convert": {
        "field": "price",
        "type": "float",
        "ignore_missing": true
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
          if (ctx.price < 0) {
            throw new RuntimeException('Preis darf nicht negativ sein');
          }
          ctx.price_category = (ctx.price < 50) ? 'budget' : 
                             (ctx.price < 200) ? 'standard' : 'premium';
        """
      }
    }
  ]
}

Verwendung der Pipeline:

POST products/_doc?pipeline=product_pipeline
{
    "sku": "C123",
    "name": "Testprodukt",
    "price": 149.99
}

27.3.2 Optimierung der Indexierungsperformance

27.3.2.1 Index-Einstellungen anpassen

PUT /products/_settings
{
    "index": {
        "refresh_interval": "30s",
        "number_of_replicas": 0
    }
}

27.3.2.2 Bulk-Threading implementieren

from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict
import threading

class BulkIndexer:
    def __init__(self, batch_size: int = 1000, max_workers: int = 4):
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.stats_lock = threading.Lock()
        self.stats = {"successful": 0, "failed": 0}
        
    def index_batch(self, batch: List[Dict]) -> None:
        try:
            # Bulk-Indexierung für einen Batch
            result = self._perform_bulk_request(batch)
            
            # Statistiken thread-sicher aktualisieren
            with self.stats_lock:
                self.stats["successful"] += result["successful"]
                self.stats["failed"] += result["failed"]
        except Exception as e:
            print(f"Fehler beim Indexieren eines Batches: {str(e)}")
            
    def index_documents(self, documents: List[Dict]) -> Dict:
        # Dokumente in Batches aufteilen
        batches = [documents[i:i + self.batch_size] 
                  for i in range(0, len(documents), self.batch_size)]
        
        # Parallel indexieren mit ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            executor.map(self.index_batch, batches)
            
        return self.stats

27.4 Monitoring und Fehlerbehebung

27.4.1 Indexierungsmetriken überwachen

# Indexierungsstatistiken abrufen
GET _stats/indexing

# Spezifische Index-Metriken
GET /products/_stats

27.4.2 Typische Probleme und Lösungen

  1. Mapping-Konflikte

    {
      "error": {
        "type": "mapper_parsing_exception",
        "reason": "failed to parse field [price] of type [keyword]"
      }
    }

    Lösung: Explizites Mapping vor der Indexierung definieren:

    PUT /products
    {
      "mappings": {
        "properties": {
          "price": {
            "type": "float"
          }
        }
      }
    }
  2. Speicherprobleme

    Lösung: Batch-Größe reduzieren und Indexierungsprozess optimieren:

    PUT /_cluster/settings
    {
      "persistent": {
        "indices.breaker.total.limit": "70%"
      }
    }

27.5 Best Practices

  1. Vor der Indexierung:
  2. Während der Indexierung:
  3. Nach der Indexierung: