Die Integration von OpenSearch in moderne Systemarchitekturen erfordert durchdachte Muster und Strategien. In diesem Kapitel lernen Sie verschiedene Integrationsmuster kennen und erfahren, wie Sie diese effektiv in Ihrer Architektur einsetzen können.
Die Integration von OpenSearch in eine Microservices-Architektur erfordert besondere Überlegungen zur Datenkonsistenz, Skalierbarkeit und Fehlertoleranz.
Ein häufiges Muster ist die Implementierung einer zentralen Suchfunktion als eigenen Microservice:
@Service
public class SearchService {
    private final OpenSearchClient client;
    private final ObjectMapper objectMapper;

    @Value("${opensearch.index.products}")
    private String productIndex;

    /**
     * Runs a full-text product search with optional price-range filtering and paging.
     *
     * <p>NOTE(review): the method parameter type {@code SearchRequest} is the
     * application's own request DTO (it exposes {@code getSearchTerm()} etc.),
     * which shadows the OpenSearch client's {@code SearchRequest}. The OpenSearch
     * builder is therefore referenced by its fully qualified name — the original
     * code used the simple name for both types, which cannot compile.
     *
     * @param request search term, optional price bounds, and paging information
     * @return the raw OpenSearch response containing matching products
     * @throws SearchException if the OpenSearch call fails with an I/O error
     */
    public SearchResponse<Product> searchProducts(SearchRequest request) {
        try {
            org.opensearch.client.opensearch.core.SearchRequest.Builder searchBuilder =
                new org.opensearch.client.opensearch.core.SearchRequest.Builder()
                    .index(productIndex)
                    .query(q -> q.bool(b -> {
                        // Boost matches in the product name over the description.
                        b.must(m -> m.multiMatch(mm -> mm
                            .fields("name^3", "description")
                            .query(request.getSearchTerm())));
                        // Only add the price filter when at least one bound is set;
                        // the original passed JsonData.of(null) for unbounded searches.
                        // assumes getMinPrice()/getMaxPrice() return boxed values — TODO confirm
                        if (request.getMinPrice() != null || request.getMaxPrice() != null) {
                            b.filter(f -> f.range(r -> {
                                r.field("price");
                                if (request.getMinPrice() != null) {
                                    r.gte(JsonData.of(request.getMinPrice()));
                                }
                                if (request.getMaxPrice() != null) {
                                    r.lte(JsonData.of(request.getMaxPrice()));
                                }
                                return r;
                            }));
                        }
                        return b;
                    }))
                    .size(request.getPageSize())
                    // Offset-based paging: page index is zero-based.
                    .from(request.getPageSize() * request.getPage());
            return client.search(searchBuilder.build(), Product.class);
        } catch (IOException e) {
            throw new SearchException("Fehler bei der Produktsuche", e);
        }
    }
}

Für die Synchronisation zwischen Microservices und OpenSearch:
@Service
public class ProductIndexingService {
    private final OpenSearchClient client;
    private final ObjectMapper objectMapper;

    /**
     * Consumes product change events from Kafka and mirrors them into the
     * search index: CREATED/UPDATED upserts the document, DELETED removes it.
     *
     * @param record raw Kafka record whose value is a JSON-encoded ProductEvent
     */
    @KafkaListener(topics = "product-events")
    public void handleProductEvent(ConsumerRecord<String, String> record) {
        try {
            ProductEvent event = objectMapper.readValue(record.value(), ProductEvent.class);
            switch (event.getType()) {
                case CREATED:
                case UPDATED:
                    indexProduct(event.getProduct());
                    break;
                case DELETED:
                    deleteProduct(event.getProduct().getId());
                    break;
            }
        } catch (Exception e) {
            // Error handling with retry mechanism
            // NOTE(review): handleError is not defined in this class — presumably
            // provided elsewhere; verify before use.
            handleError(record, e);
        }
    }

    /** Upserts a product document into the "products" index. */
    private void indexProduct(Product product) {
        IndexRequest<Product> request = IndexRequest.of(i -> i
            .index("products")
            .id(product.getId())
            .document(product)
        );
        try {
            client.index(request);
        } catch (IOException e) {
            // Fixed: client.index declares a checked IOException that the original
            // let escape uncaught, which does not compile. Surface it unchecked so
            // the listener's catch block still triggers the retry handling.
            throw new SearchException("Fehler bei der Produktsuche", e);
        }
    }

    /**
     * Removes a product document from the index.
     * Added: referenced from handleProductEvent but missing in the original.
     */
    private void deleteProduct(String id) {
        try {
            client.delete(d -> d.index("products").id(id));
        } catch (IOException e) {
            throw new SearchException("Fehler bei der Produktsuche", e);
        }
    }
}

Die Integration von OpenSearch in eine ereignisgesteuerte Architektur ermöglicht lose Kopplung und hohe Skalierbarkeit.
@Service
public class ProductEventProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    /**
     * Serializes a product change as JSON and publishes it to the
     * "product-events" topic, keyed by the product id so that all events for a
     * given product land on the same partition (preserving per-product order).
     *
     * @param product the product that changed
     * @param type    the kind of change (created/updated/deleted)
     * @throws EventPublishingException if JSON serialization fails
     */
    @Transactional
    public void publishProductChange(Product product, EventType type) {
        ProductEvent event = new ProductEvent(product, type);
        final String payload;
        try {
            payload = objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new EventPublishingException("Fehler beim Veröffentlichen des Events", e);
        }
        kafkaTemplate.send("product-events", product.getId(), payload);
    }
}

@Service
public class BulkIndexingConsumer {
    private final OpenSearchClient client;
    private final ObjectMapper objectMapper;
    private final List<BulkOperation> operations = new ArrayList<>();
    // Flush threshold; static final per constant convention.
    private static final int BATCH_SIZE = 1000;

    /**
     * Batch listener that collects bulk operations and flushes them in chunks
     * of {@link #BATCH_SIZE}.
     *
     * <p>Fixes vs. the original: {@code readValue} throws a checked exception
     * the original never caught (it did not compile), and a partial batch is
     * now flushed at the end of each delivery instead of lingering in memory
     * until a later poll happens to fill it.
     *
     * @param records one polled batch of raw Kafka records
     */
    @KafkaListener(topics = "product-events", groupId = "bulk-indexing-group")
    public void handleEvents(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            final ProductEvent event;
            try {
                event = objectMapper.readValue(record.value(), ProductEvent.class);
            } catch (IOException e) {
                // Malformed payload: fail the delivery so the container's error
                // handling (retry / DLQ) can take over.
                throw new IllegalStateException(
                    "Ungültiges Produkt-Event: " + record.key(), e);
            }
            operations.add(createBulkOperation(event));
            if (operations.size() >= BATCH_SIZE) {
                executeBulkOperations();
            }
        }
        // Flush the remainder so no operation waits for the next delivery.
        if (!operations.isEmpty()) {
            executeBulkOperations();
        }
    }

    /** Sends all buffered operations as one bulk request and clears the buffer. */
    private void executeBulkOperations() {
        BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
        bulkBuilder.operations(new ArrayList<>(operations));
        try {
            BulkResponse response = client.bulk(bulkBuilder.build());
            handleBulkResponse(response);
        } catch (IOException e) {
            // Fixed: client.bulk declares IOException; the original's bare
            // try/finally did not compile.
            throw new IllegalStateException("Bulk-Indexierung fehlgeschlagen", e);
        } finally {
            // Clear even on failure so a poison batch is not retried forever.
            operations.clear();
        }
    }
}

Die Integration von OpenSearch über ein API-Gateway ermöglicht zentrale Kontrolle und Monitoring.
# API-Gateway Konfiguration (Spring Cloud Gateway)
spring:
  cloud:
    gateway:
      routes:
        - id: search-service
          uri: lb://search-service
          predicates:
            - Path=/api/search/**
          filters:
            - name: CircuitBreaker
              args:
                name: searchCircuitBreaker
                fallbackUri: forward:/fallback
            # Fixed: the built-in rate-limiting filter is named
            # "RequestRateLimiter", not "RateLimit".
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 10
                redis-rate-limiter.burstCapacity: 20
            - name: RequestSize
              args:
                maxSize: 5MB

@RestController
@RequestMapping("/api/search")
public class SearchController {
    private final SearchService searchService;
    private final CacheManager cacheManager;

    /**
     * Cached product search.
     * NOTE(review): the cache key is derived from the request's toString(), so
     * SearchRequest must implement a stable, value-based toString — verify,
     * otherwise cache hits never occur.
     */
    @GetMapping("/products")
    @Cacheable(value = "productSearch", key = "#request.toString()")
    public ResponseEntity<SearchResponse> searchProducts(@Valid SearchRequest request) {
        SearchResponse response = searchService.search(request);
        return ResponseEntity.ok(response);
    }

    /**
     * Applies a bulk update and invalidates the search cache afterwards so
     * subsequent reads see the new documents.
     */
    @PostMapping("/products/_bulk")
    public ResponseEntity<BulkResponse> bulkUpdate(@RequestBody BulkRequest request) {
        BulkResponse response = searchService.bulkUpdate(request);
        // Fixed: getCache() may return null when the cache is not configured;
        // the original dereferenced the result unconditionally.
        Cache cache = cacheManager.getCache("productSearch");
        if (cache != null) {
            cache.clear();
        }
        return ResponseEntity.ok(response);
    }
}

Effektives Caching ist entscheidend für die Performance.
@Configuration
public class CacheConfig {

    /**
     * Two-level cache manager: a fast in-memory cache is consulted first,
     * Redis is the shared fallback. Search results live for 10 minutes.
     *
     * <p>Fixed: the original called an undefined {@code redisConnectionFactory()}
     * method; the connection factory is now injected as a bean-method parameter.
     *
     * @param redisConnectionFactory the Redis connection factory bean
     * @return composite cache manager (local first, then Redis)
     */
    @Bean
    public CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {
        RedisCacheManager redisCacheManager = RedisCacheManager.builder(redisConnectionFactory)
            .withCacheConfiguration("productSearch",
                RedisCacheConfiguration.defaultCacheConfig()
                    .entryTtl(Duration.ofMinutes(10))
                    .serializeKeysWith(RedisSerializationContext
                        .SerializationPair
                        .fromSerializer(new StringRedisSerializer()))
                    .serializeValuesWith(RedisSerializationContext
                        .SerializationPair
                        .fromSerializer(new GenericJackson2JsonRedisSerializer())))
            .build();
        return new CompositeCacheManager(
            new ConcurrentMapCacheManager("localCache"),
            redisCacheManager
        );
    }
}

@Service
public class CacheInvalidationService {
    private final CacheManager cacheManager;
    // Fixed: "ConcurrentHashSet" does not exist in the JDK; use the canonical
    // concurrent set view backed by ConcurrentHashMap.
    private final Set<String> updatedProducts = ConcurrentHashMap.newKeySet();

    /**
     * Periodically evicts cache entries for products that changed since the
     * last run. Ids are removed one by one through the iterator so entries
     * registered while this method runs are not silently dropped (the
     * original's forEach-then-clear lost them).
     */
    @Scheduled(fixedRate = 60000) // every minute
    public void invalidateStaleCache() {
        Cache cache = cacheManager.getCache("productSearch");
        if (cache == null) {
            return; // cache not configured; nothing to evict
        }
        for (Iterator<String> it = updatedProducts.iterator(); it.hasNext(); ) {
            cache.evict(it.next());
            it.remove();
        }
    }

    /** Registers a product id whose cached search results are now stale. */
    public void markForInvalidation(String productId) {
        updatedProducts.add(productId);
    }
}

Die Integration mit Messaging-Systemen ermöglicht asynchrone Verarbeitung und lose Kopplung.
@Configuration
public class KafkaConfig {

    /**
     * Consumer factory for the search service.
     *
     * <p>Improved: the broker address is externalized to the standard
     * {@code spring.kafka.bootstrap-servers} property and only falls back to
     * {@code localhost:9092} for local development (the original hard-coded it).
     * Auto-commit is disabled because offsets are acknowledged manually.
     *
     * @param bootstrapServers comma-separated broker list from configuration
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory(
            @Value("${spring.kafka.bootstrap-servers:localhost:9092}") String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "search-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Batch listener container with manual acknowledgment, matching the
     * disabled auto-commit above. The consumer factory is injected as a
     * parameter instead of calling the sibling @Bean method directly.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

@Configuration
public class RabbitConfig {

    /**
     * Durable work queue for indexing jobs; rejected or expired messages are
     * routed to the dead-letter exchange under the "dlq" routing key.
     */
    @Bean
    public Queue indexingQueue() {
        return QueueBuilder.durable("indexing-queue")
            .withArgument("x-dead-letter-exchange", "dlx")
            .withArgument("x-dead-letter-routing-key", "dlq")
            .build();
    }

    /** Declares the dead-letter queue, its exchange, and the binding between them. */
    @Bean
    public Declarables deadLetterConfig() {
        Queue deadLetterQueue = QueueBuilder.durable("indexing-dlq").build();
        DirectExchange deadLetterExchange = new DirectExchange("dlx");
        return new Declarables(
            deadLetterQueue,
            deadLetterExchange,
            BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dlq"));
    }

    /** JSON (de)serialization for message payloads. */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

Robuste Fehlerbehandlung ist entscheidend für zuverlässige Integrationen.
@Configuration
public class ResilienceConfig {

    /**
     * Circuit breaker defaults: trips at a 50% failure rate over a 100-call
     * sliding window, stays open for 60 seconds, then admits 10 probe calls
     * in the half-open state.
     */
    @Bean
    public CircuitBreakerRegistry circuitBreakerRegistry() {
        return CircuitBreakerRegistry.of(
            CircuitBreakerConfig.custom()
                .failureRateThreshold(50)
                .waitDurationInOpenState(Duration.ofSeconds(60))
                .permittedNumberOfCallsInHalfOpenState(10)
                .slidingWindowSize(100)
                .build());
    }

    /** Calls are cut off after two seconds. */
    @Bean
    public TimeLimiterRegistry timeLimiterRegistry() {
        return TimeLimiterRegistry.of(
            TimeLimiterConfig.custom()
                .timeoutDuration(Duration.ofSeconds(2))
                .build());
    }
}

@Service
public class ResilientSearchService {
    private final CircuitBreaker circuitBreaker;
    private final RetryRegistry retryRegistry;
    // Fixed: the original referenced searchService without ever declaring it.
    private final SearchService searchService;

    /**
     * Executes a product search wrapped in a retry decorator around a circuit
     * breaker: each retry attempt is individually guarded by the breaker, so
     * an open circuit short-circuits remaining attempts.
     *
     * @param request the search request to execute
     * @return the search response from the underlying service
     */
    public SearchResponse<Product> searchWithResilience(SearchRequest request) {
        return Retry.decorateFunction(
            retryRegistry.retry("searchRetry"),
            (SearchRequest r) -> circuitBreaker.executeSupplier(
                () -> searchService.search(r)
            )
        ).apply(request);
    }
}