Das effektive Management von Dokumenten ist eine der wichtigsten Fähigkeiten im Umgang mit OpenSearch. In diesem Kapitel lernen Sie, wie Sie Dokumente erstellen, aktualisieren, abrufen und löschen können. Wir werden jeden Aspekt des Dokumentenmanagements im Detail betrachten und durch praktische Beispiele veranschaulichen.
Stellen Sie sich ein Dokument in OpenSearch wie einen Container vor, der Ihre Daten in Form von JSON speichert. Anders als in relationalen Datenbanken, wo Daten in starre Tabellenstrukturen gepresst werden müssen, bieten Dokumente eine flexible Möglichkeit, auch komplexe Datenstrukturen natürlich abzubilden.
Betrachten wir ein typisches Dokument:
{
"id": "user_12345",
"name": {
"first": "Maria",
"last": "Schmidt"
},
"email": "maria.schmidt@example.com",
"registration_date": "2024-01-13T10:15:30Z",
"interests": ["fotografie", "reisen", "technologie"],
"address": {
"street": "Hauptstraße 123",
"city": "Berlin",
"postal_code": "10115",
"country": "Deutschland"
},
"account_status": "active",
"last_login": "2024-01-13T08:30:00Z"
}
Dieses Beispiel zeigt wichtige Eigenschaften eines OpenSearch-Dokuments:
Es gibt zwei Hauptwege, um neue Dokumente zu erstellen:
PUT /users/_doc/user_12345
{
"name": {
"first": "Maria",
"last": "Schmidt"
},
"email": "maria.schmidt@example.com"
}
POST /users/_doc
{
"name": {
"first": "Thomas",
"last": "Müller"
},
"email": "thomas.mueller@example.com"
}
Beim Erstellen von Dokumenten gibt es einige wichtige Aspekte zu beachten:
Das Abrufen von Dokumenten kann auf verschiedene Arten erfolgen:
GET /users/_doc/user_12345
GET /_mget
{
"docs": [
{
"_index": "users",
"_id": "user_12345"
},
{
"_index": "users",
"_id": "user_12346"
}
]
}
GET /users/_doc/user_12345?_source=name,email
OpenSearch bietet verschiedene Möglichkeiten zur Aktualisierung von Dokumenten:
PUT /users/_doc/user_12345
{
"name": {
"first": "Maria",
"last": "Schmidt-Meyer"
},
"email": "maria.schmidt-meyer@example.com"
}
POST /users/_update/user_12345
{
"doc": {
"account_status": "premium",
"last_login": "2024-01-13T15:45:00Z"
}
}
POST /users/_update/user_12345
{
"script": {
"source": "if (ctx._source.login_count == null) { ctx._source.login_count = 1 } else { ctx._source.login_count++ }",
"lang": "painless"
}
}
Das Löschen von Dokumenten kann einzeln oder in Masse erfolgen:
DELETE /users/_doc/user_12345
POST /users/_delete_by_query
{
"query": {
"range": {
"last_login": {
"lt": "now-6M"
}
}
}
}
OpenSearch verwendet Optimistic Concurrency Control (OCC), um Konflikte bei gleichzeitigen Änderungen zu verhindern. Dies funktioniert über die Versionierung von Dokumenten:
PUT /users/_doc/user_12345?if_seq_no=123&if_primary_term=2
{
"name": {
"first": "Maria",
"last": "Schmidt"
},
"email": "maria.schmidt@example.com"
}
Wenn die Sequenznummer oder der Primary Term nicht übereinstimmen, wird die Operation mit einem Versionierungskonflikt abgelehnt:
{
"error": {
"type": "version_conflict_engine_exception",
"reason": "[user_12345]: version conflict, required seqNo [123], primary term [2]"
},
"status": 409
}
Für die effiziente Verarbeitung mehrerer Dokumente bietet OpenSearch die Bulk-API:
POST _bulk
{"index":{"_index":"users","_id":"1"}}
{"name":{"first":"Hans","last":"Weber"},"email":"hans.weber@example.com"}
{"update":{"_index":"users","_id":"2"}}
{"doc":{"last_login":"2024-01-13T16:00:00Z"}}
{"delete":{"_index":"users","_id":"3"}}
Wichtige Aspekte bei Bulk-Operationen:
Ein praktisches Beispiel für Bulk-Operationen mit Error Handling:
def process_bulk_operations(operations, batch_size=1000):
"""
Verarbeitet Bulk-Operationen mit Error Handling und Reporting.
Args:
operations: Liste von Operationen
batch_size: Anzahl Operationen pro Batch
Returns:
Dict mit Statistiken über erfolgreiche/fehlgeschlagene Operationen
"""
stats = {"successful": 0, "failed": 0, "errors": []}
# Batches erstellen
for i in range(0, len(operations), batch_size):
batch = operations[i:i + batch_size]
bulk_body = []
# Bulk Request Body erstellen
for op in batch:
if op["action"] == "index":
bulk_body.append(json.dumps({"index": {"_index": op["index"], "_id": op["id"]}}))
bulk_body.append(json.dumps(op["document"]))
elif op["action"] == "update":
bulk_body.append(json.dumps({"update": {"_index": op["index"], "_id": op["id"]}}))
bulk_body.append(json.dumps({"doc": op["document"]}))
elif op["action"] == "delete":
bulk_body.append(json.dumps({"delete": {"_index": op["index"], "_id": op["id"]}}))
# Bulk Request ausführen
try:
response = requests.post(
"http://localhost:9200/_bulk",
headers={"Content-Type": "application/x-ndjson"},
data="\n".join(bulk_body) + "\n"
)
result = response.json()
if result.get("errors", False):
for item in result["items"]:
action = list(item.keys())[0]
if "error" in item[action]:
stats["failed"] += 1
stats["errors"].append({
"id": item[action]["_id"],
"error": item[action]["error"]["reason"]
})
else:
stats["successful"] += 1
else:
stats["successful"] += len(batch)
except Exception as e:
stats["failed"] += len(batch)
stats["errors"].append({
"batch_start": i,
"error": str(e)
})
return statsOpenSearch unterstützt keine integrierte Dokumentenhistorie, aber wir können eine eigene Implementierung erstellen:
def update_with_history(index, doc_id, update_data):
"""
Aktualisiert ein Dokument und bewahrt die Historie.
Args:
index: Name des Index
doc_id: Dokument-ID
update_data: Neue Dokumentendaten
"""
# Aktuelles Dokument abrufen
current_doc = requests.get(f"http://localhost:9200/{index}/_doc/{doc_id}").json()
if "_source" in current_doc:
# Historie erstellen
history_doc = {
"original_id": doc_id,
"version": current_doc["_version"],
"timestamp": datetime.utcnow().isoformat(),
"data": current_doc["_source"]
}
# Historie speichern
requests.post(
f"http://localhost:9200/{index}-history/_doc",
json=history_doc
)
# Dokument aktualisieren
requests.put(
f"http://localhost:9200/{index}/_doc/{doc_id}",
json=update_data
)