30 Dokumentenmanagement

Das effektive Management von Dokumenten ist eine der wichtigsten Fähigkeiten im Umgang mit OpenSearch. In diesem Kapitel lernen Sie, wie Sie Dokumente erstellen, aktualisieren, abrufen und löschen können. Wir werden jeden Aspekt des Dokumentenmanagements im Detail betrachten und durch praktische Beispiele veranschaulichen.

30.1 Das Dokument als Grundbaustein

Stellen Sie sich ein Dokument in OpenSearch wie einen Container vor, der Ihre Daten in Form von JSON speichert. Anders als in relationalen Datenbanken, wo Daten in starre Tabellenstrukturen gepresst werden müssen, bieten Dokumente eine flexible Möglichkeit, auch komplexe Datenstrukturen natürlich abzubilden.

Betrachten wir ein typisches Dokument:

{
    "id": "user_12345",
    "name": {
        "first": "Maria",
        "last": "Schmidt"
    },
    "email": "maria.schmidt@example.com",
    "registration_date": "2024-01-13T10:15:30Z",
    "interests": ["fotografie", "reisen", "technologie"],
    "address": {
        "street": "Hauptstraße 123",
        "city": "Berlin",
        "postal_code": "10115",
        "country": "Deutschland"
    },
    "account_status": "active",
    "last_login": "2024-01-13T08:30:00Z"
}

Dieses Beispiel zeigt wichtige Eigenschaften eines OpenSearch-Dokuments:

30.2 CRUD-Operationen im Detail

30.2.1 Dokumente erstellen (Create)

Es gibt zwei Hauptwege, um neue Dokumente zu erstellen:

  1. Mit einer spezifischen ID:
PUT /users/_doc/user_12345
{
    "name": {
        "first": "Maria",
        "last": "Schmidt"
    },
    "email": "maria.schmidt@example.com"
}
  1. Mit automatisch generierter ID:
POST /users/_doc
{
    "name": {
        "first": "Thomas",
        "last": "Müller"
    },
    "email": "thomas.mueller@example.com"
}

Beim Erstellen von Dokumenten gibt es einige wichtige Aspekte zu beachten:

  1. Versionierung: Jedes Dokument erhält automatisch eine Version
  2. Optimistic Concurrency Control: Verhindert unbeabsichtigte Überschreibungen
  3. Automatisches Mapping: Feldtypen werden automatisch erkannt, wenn kein explizites Mapping existiert

30.2.2 Dokumente lesen (Read)

Das Abrufen von Dokumenten kann auf verschiedene Arten erfolgen:

  1. Einzelnes Dokument über ID:
GET /users/_doc/user_12345
  1. Mehrere Dokumente über Multi-Get:
GET /_mget
{
    "docs": [
        {
            "_index": "users",
            "_id": "user_12345"
        },
        {
            "_index": "users",
            "_id": "user_12346"
        }
    ]
}
  1. Selektives Abrufen von Feldern:
GET /users/_doc/user_12345?_source=name,email

30.2.3 Dokumente aktualisieren (Update)

OpenSearch bietet verschiedene Möglichkeiten zur Aktualisierung von Dokumenten:

  1. Vollständige Aktualisierung (Replace):
PUT /users/_doc/user_12345
{
    "name": {
        "first": "Maria",
        "last": "Schmidt-Meyer"
    },
    "email": "maria.schmidt-meyer@example.com"
}
  1. Partielle Aktualisierung:
POST /users/_update/user_12345
{
    "doc": {
        "account_status": "premium",
        "last_login": "2024-01-13T15:45:00Z"
    }
}
  1. Skriptbasierte Aktualisierung:
POST /users/_update/user_12345
{
    "script": {
        "source": "if (ctx._source.login_count == null) { ctx._source.login_count = 1 } else { ctx._source.login_count++ }",
        "lang": "painless"
    }
}

30.2.4 Dokumente löschen (Delete)

Das Löschen von Dokumenten kann einzeln oder in Masse erfolgen:

  1. Einzelnes Dokument löschen:
DELETE /users/_doc/user_12345
  1. Mehrere Dokumente über Query löschen:
POST /users/_delete_by_query
{
    "query": {
        "range": {
            "last_login": {
                "lt": "now-6M"
            }
        }
    }
}

30.3 Optimistic Concurrency Control

OpenSearch verwendet Optimistic Concurrency Control (OCC), um Konflikte bei gleichzeitigen Änderungen zu verhindern. Dies funktioniert über die Versionierung von Dokumenten:

PUT /users/_doc/user_12345?if_seq_no=123&if_primary_term=2
{
    "name": {
        "first": "Maria",
        "last": "Schmidt"
    },
    "email": "maria.schmidt@example.com"
}

Wenn die Sequenznummer oder der Primary Term nicht übereinstimmen, wird die Operation mit einem Versionierungskonflikt abgelehnt:

{
    "error": {
        "type": "version_conflict_engine_exception",
        "reason": "[user_12345]: version conflict, required seqNo [123], primary term [2]"
    },
    "status": 409
}

30.4 Bulk-Operationen

Für die effiziente Verarbeitung mehrerer Dokumente bietet OpenSearch die Bulk-API:

POST _bulk
{"index":{"_index":"users","_id":"1"}}
{"name":{"first":"Hans","last":"Weber"},"email":"hans.weber@example.com"}
{"update":{"_index":"users","_id":"2"}}
{"doc":{"last_login":"2024-01-13T16:00:00Z"}}
{"delete":{"_index":"users","_id":"3"}}

Wichtige Aspekte bei Bulk-Operationen:

  1. Format: Jede Operation besteht aus zwei Zeilen (außer Delete)
  2. Performance: Optimale Batch-Größe liegt zwischen 5-15 MB
  3. Error Handling: Fehler bei einzelnen Operationen beeinflussen nicht den gesamten Batch

Ein praktisches Beispiel für Bulk-Operationen mit Error Handling:

def process_bulk_operations(operations, batch_size=1000):
    """
    Verarbeitet Bulk-Operationen mit Error Handling und Reporting.
    
    Args:
        operations: Liste von Operationen
        batch_size: Anzahl Operationen pro Batch
    
    Returns:
        Dict mit Statistiken über erfolgreiche/fehlgeschlagene Operationen
    """
    stats = {"successful": 0, "failed": 0, "errors": []}
    
    # Batches erstellen
    for i in range(0, len(operations), batch_size):
        batch = operations[i:i + batch_size]
        bulk_body = []
        
        # Bulk Request Body erstellen
        for op in batch:
            if op["action"] == "index":
                bulk_body.append(json.dumps({"index": {"_index": op["index"], "_id": op["id"]}}))
                bulk_body.append(json.dumps(op["document"]))
            elif op["action"] == "update":
                bulk_body.append(json.dumps({"update": {"_index": op["index"], "_id": op["id"]}}))
                bulk_body.append(json.dumps({"doc": op["document"]}))
            elif op["action"] == "delete":
                bulk_body.append(json.dumps({"delete": {"_index": op["index"], "_id": op["id"]}}))
        
        # Bulk Request ausführen
        try:
            response = requests.post(
                "http://localhost:9200/_bulk",
                headers={"Content-Type": "application/x-ndjson"},
                data="\n".join(bulk_body) + "\n"
            )
            
            result = response.json()
            if result.get("errors", False):
                for item in result["items"]:
                    action = list(item.keys())[0]
                    if "error" in item[action]:
                        stats["failed"] += 1
                        stats["errors"].append({
                            "id": item[action]["_id"],
                            "error": item[action]["error"]["reason"]
                        })
                    else:
                        stats["successful"] += 1
            else:
                stats["successful"] += len(batch)
                
        except Exception as e:
            stats["failed"] += len(batch)
            stats["errors"].append({
                "batch_start": i,
                "error": str(e)
            })
    
    return stats

30.5 Versionierung und Historisierung

OpenSearch unterstützt keine integrierte Dokumentenhistorie, aber wir können eine eigene Implementierung erstellen:

def update_with_history(index, doc_id, update_data):
    """
    Aktualisiert ein Dokument und bewahrt die Historie.
    
    Args:
        index: Name des Index
        doc_id: Dokument-ID
        update_data: Neue Dokumentendaten
    """
    # Aktuelles Dokument abrufen
    current_doc = requests.get(f"http://localhost:9200/{index}/_doc/{doc_id}").json()
    
    if "_source" in current_doc:
        # Historie erstellen
        history_doc = {
            "original_id": doc_id,
            "version": current_doc["_version"],
            "timestamp": datetime.utcnow().isoformat(),
            "data": current_doc["_source"]
        }
        
        # Historie speichern
        requests.post(
            f"http://localhost:9200/{index}-history/_doc",
            json=history_doc
        )
        
        # Dokument aktualisieren
        requests.put(
            f"http://localhost:9200/{index}/_doc/{doc_id}",
            json=update_data
        )

30.6 Best Practices

  1. Dokumentendesign:
  2. Performance:
  3. Konsistenz: