51 Integration Patterns

Integrating OpenSearch into modern system architectures requires well-thought-out patterns and strategies. In this chapter you will learn about various integration patterns and how to apply them effectively in your own architecture.

51.1 Microservices Integration

Integrating OpenSearch into a microservices architecture calls for particular attention to data consistency, scalability, and fault tolerance.

51.1.1 Implementing a Central Search Service

A common pattern is to implement search as a dedicated microservice:

@Service
public class SearchService {
    private final OpenSearchClient client;

    @Value("${opensearch.index.products}")
    private String productIndex;

    public SearchService(OpenSearchClient client) {
        this.client = client;
    }

    // ProductSearchRequest is the application's own DTO; a distinct name
    // avoids a collision with the OpenSearch client's SearchRequest class
    public SearchResponse<Product> searchProducts(ProductSearchRequest request) {
        try {
            SearchRequest.Builder searchBuilder = new SearchRequest.Builder()
                .index(productIndex)
                .query(q -> q
                    .bool(b -> b
                        .must(m -> m
                            .multiMatch(mm -> mm
                                .fields("name^3", "description")
                                .query(request.getSearchTerm())
                            )
                        )
                        .filter(f -> f
                            .range(r -> r
                                .field("price")
                                .gte(JsonData.of(request.getMinPrice()))
                                .lte(JsonData.of(request.getMaxPrice()))
                            )
                        )
                    )
                )
                .size(request.getPageSize())
                .from(request.getPageSize() * request.getPage());

            return client.search(searchBuilder.build(), Product.class);
        } catch (IOException e) {
            throw new SearchException("Error while searching products", e);
        }
    }
}

51.1.2 Event-Based Index Updates

To keep OpenSearch synchronized with the microservices' data:

@Service
public class ProductIndexingService {
    private final OpenSearchClient client;
    private final ObjectMapper objectMapper;

    @KafkaListener(topics = "product-events")
    public void handleProductEvent(ConsumerRecord<String, String> record) {
        try {
            ProductEvent event = objectMapper.readValue(record.value(), ProductEvent.class);
            
            switch (event.getType()) {
                case CREATED:
                case UPDATED:
                    indexProduct(event.getProduct());
                    break;
                case DELETED:
                    deleteProduct(event.getProduct().getId());
                    break;
            }
        } catch (Exception e) {
            // Error handling with a retry mechanism
            handleError(record, e);
        }
    }

    // declares IOException so that indexing failures reach the
    // catch block in handleProductEvent
    private void indexProduct(Product product) throws IOException {
        IndexRequest<Product> request = IndexRequest.of(i -> i
            .index("products")
            .id(product.getId())
            .document(product)
        );

        client.index(request);
    }
}
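Neither ProductEvent nor its EventType is defined in this chapter. A minimal, framework-free sketch of such a payload class could look like the following; the field names and the nested Product stand-in are assumptions for illustration, and in a real codebase Jackson annotations such as @JsonCreator would be added for deserialization:

```java
// Minimal sketch of the event payload exchanged via Kafka.
// Field names and the nested Product record are illustrative assumptions.
public class ProductEvent {
    public enum EventType { CREATED, UPDATED, DELETED }

    // Stand-in for the domain class used throughout this chapter
    public record Product(String id, String name, double price) {
        public String getId() { return id; }
    }

    private final Product product;
    private final EventType type;

    public ProductEvent(Product product, EventType type) {
        this.product = product;
        this.type = type;
    }

    public Product getProduct() { return product; }
    public EventType getType() { return type; }
}
```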

51.2 Event-Driven Architecture

Integrating OpenSearch into an event-driven architecture enables loose coupling and high scalability.

51.2.1 Implementing an Event Producer

@Service
public class ProductEventProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    @Transactional
    public void publishProductChange(Product product, EventType type) {
        ProductEvent event = new ProductEvent(product, type);
        
        try {
            String message = objectMapper.writeValueAsString(event);
            kafkaTemplate.send("product-events", product.getId(), message);
        } catch (JsonProcessingException e) {
            throw new EventPublishingException("Error while publishing the event", e);
        }
    }
}

51.2.2 Event Consumer with Bulk Indexing

@Service
public class BulkIndexingConsumer {
    private final OpenSearchClient client;
    private final ObjectMapper objectMapper;

    private final List<BulkOperation> operations = new ArrayList<>();
    private static final int BATCH_SIZE = 1000;

    @KafkaListener(topics = "product-events", groupId = "bulk-indexing-group")
    public void handleEvents(List<ConsumerRecord<String, String>> records,
                             Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            try {
                ProductEvent event = objectMapper.readValue(record.value(), ProductEvent.class);
                operations.add(createBulkOperation(event));
            } catch (JsonProcessingException e) {
                // an unparsable message must not block the whole batch
                handleError(record, e);
                continue;
            }

            if (operations.size() >= BATCH_SIZE) {
                executeBulkOperations();
            }
        }

        // flush the remaining partial batch so no events linger in the buffer
        if (!operations.isEmpty()) {
            executeBulkOperations();
        }

        // commit offsets only after the batch has been indexed (MANUAL ack mode)
        ack.acknowledge();
    }

    private void executeBulkOperations() {
        BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
        bulkBuilder.operations(operations);

        try {
            BulkResponse response = client.bulk(bulkBuilder.build());
            handleBulkResponse(response);
        } catch (IOException e) {
            // application-specific exception, analogous to SearchException
            throw new IndexingException("Bulk indexing failed", e);
        } finally {
            operations.clear();
        }
    }
}
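The accumulate-then-flush pattern used by the consumer, including the flush of a final partial batch, can be sketched framework-free. BulkBatcher here is an illustrative helper, not part of the OpenSearch client; the flush callback would issue the actual bulk request:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Framework-free sketch of the accumulate-then-flush pattern:
// items are buffered until a batch size is reached, then handed
// to a flush action (e.g. an OpenSearch bulk request).
public class BulkBatcher<T> {
    private final List<T> buffer = new ArrayList<>();
    private final int batchSize;
    private final Consumer<List<T>> flushAction;

    public BulkBatcher(int batchSize, Consumer<List<T>> flushAction) {
        this.batchSize = batchSize;
        this.flushAction = flushAction;
    }

    public void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // must also be called after each consumed batch, so a partial
    // buffer does not wait indefinitely for the next poll
    public void flush() {
        if (!buffer.isEmpty()) {
            flushAction.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```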

51.3 API Gateway Integration

Routing OpenSearch access through an API gateway provides central control and monitoring.

51.3.1 Gateway Configuration

# API gateway configuration (Spring Cloud Gateway)
spring:
  cloud:
    gateway:
      routes:
        - id: search-service
          uri: lb://search-service
          predicates:
            - Path=/api/search/**
          filters:
            - name: CircuitBreaker
              args:
                name: searchCircuitBreaker
                fallbackUri: forward:/fallback
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 10
                redis-rate-limiter.burstCapacity: 20
            - name: RequestSize
              args:
                maxSize: 5MB

51.3.2 Search Endpoint with Caching

@RestController
@RequestMapping("/api/search")
public class SearchController {
    private final SearchService searchService;
    private final CacheManager cacheManager;

    // the cache key requires SearchRequest to provide a stable,
    // value-based toString() implementation
    @GetMapping("/products")
    @Cacheable(value = "productSearch", key = "#request.toString()")
    public ResponseEntity<SearchResponse> searchProducts(@Valid SearchRequest request) {
        SearchResponse response = searchService.search(request);
        return ResponseEntity.ok(response);
    }

    @PostMapping("/products/_bulk")
    public ResponseEntity<BulkResponse> bulkUpdate(@RequestBody BulkRequest request) {
        BulkResponse response = searchService.bulkUpdate(request);
        Cache cache = cacheManager.getCache("productSearch");
        if (cache != null) {
            cache.clear();   // invalidate cached results after bulk changes
        }
        return ResponseEntity.ok(response);
    }
}
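Since @Cacheable derives the key from #request.toString(), the request type needs a stable, value-based toString(). A Java record provides this automatically. The sketch below uses hypothetical fields mirroring the getters seen earlier in this chapter, and is named ProductSearchRequest to avoid colliding with the OpenSearch client's own SearchRequest class:

```java
// Sketch of a search-request DTO whose generated toString() is stable
// enough to serve as a cache key. Records produce value-based
// equals/hashCode/toString automatically.
public record ProductSearchRequest(String searchTerm, double minPrice,
                                   double maxPrice, int page, int pageSize) {
    // getter-style accessors matching the usage in SearchService
    public String getSearchTerm() { return searchTerm; }
    public double getMinPrice()   { return minPrice; }
    public double getMaxPrice()   { return maxPrice; }
    public int getPage()          { return page; }
    public int getPageSize()      { return pageSize; }
}
```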

51.4 Caching Strategies

Effective caching is crucial for search performance.

51.4.1 Multi-Level Caching

@Configuration
public class CacheConfig {
    @Bean
    public CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {
        RedisCacheManager redisCacheManager = RedisCacheManager.builder(redisConnectionFactory)
            .withCacheConfiguration("productSearch", 
                RedisCacheConfiguration.defaultCacheConfig()
                    .entryTtl(Duration.ofMinutes(10))
                    .serializeKeysWith(RedisSerializationContext
                        .SerializationPair
                        .fromSerializer(new StringRedisSerializer()))
                    .serializeValuesWith(RedisSerializationContext
                        .SerializationPair
                        .fromSerializer(new GenericJackson2JsonRedisSerializer())))
            .build();

        return new CompositeCacheManager(
            new ConcurrentMapCacheManager("localCache"),
            redisCacheManager
        );
    }
}
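CompositeCacheManager consults its managers in order, so the fast in-process cache is checked before Redis. The read-through lookup pattern behind this can be illustrated framework-free; TwoLevelCache is a hypothetical helper, with a plain map standing in for Redis:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Framework-free illustration of a read-through, two-level cache:
// check the fast local map first, then the slower shared store,
// and compute the value only on a full miss.
public class TwoLevelCache<K, V> {
    private final Map<K, V> local = new HashMap<>();   // level 1 (in-process)
    private final Map<K, V> shared = new HashMap<>();  // level 2 (stands in for Redis)

    public V get(K key, Function<K, V> loader) {
        V value = local.get(key);
        if (value != null) {
            return value;
        }
        value = shared.get(key);
        if (value == null) {
            value = loader.apply(key);    // e.g. the actual OpenSearch query
            shared.put(key, value);
        }
        local.put(key, value);            // promote to level 1
        return value;
    }
}
```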

51.4.2 Cache Invalidation

@Service
public class CacheInvalidationService {
    private final CacheManager cacheManager;
    private final Set<String> updatedProducts = ConcurrentHashMap.newKeySet();

    @Scheduled(fixedRate = 60000) // every minute
    public void invalidateStaleCache() {
        if (!updatedProducts.isEmpty()) {
            Cache cache = cacheManager.getCache("productSearch");
            updatedProducts.forEach(cache::evict);
            updatedProducts.clear();
        }
    }

    public void markForInvalidation(String productId) {
        updatedProducts.add(productId);
    }
}

51.5 Messaging Systems

Integration with messaging systems enables asynchronous processing and loose coupling.

51.5.1 Kafka Integration

@Configuration
public class KafkaConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "search-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        
        return factory;
    }
}

51.5.2 RabbitMQ Integration

@Configuration
public class RabbitConfig {
    @Bean
    public Queue indexingQueue() {
        return QueueBuilder.durable("indexing-queue")
            .withArgument("x-dead-letter-exchange", "dlx")
            .withArgument("x-dead-letter-routing-key", "dlq")
            .build();
    }

    @Bean
    public Declarables deadLetterConfig() {
        Queue dlq = QueueBuilder.durable("indexing-dlq").build();
        DirectExchange dlx = new DirectExchange("dlx");
        return new Declarables(
            dlq,
            dlx,
            BindingBuilder.bind(dlq).to(dlx).with("dlq")
        );
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

51.6 Error Handling and Resilience

Robust error handling is essential for reliable integrations.

51.6.1 Implementing a Circuit Breaker

@Configuration
public class ResilienceConfig {
    @Bean
    public CircuitBreakerRegistry circuitBreakerRegistry() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)
            .waitDurationInOpenState(Duration.ofSeconds(60))
            .permittedNumberOfCallsInHalfOpenState(10)
            .slidingWindowSize(100)
            .build();
            
        return CircuitBreakerRegistry.of(config);
    }

    @Bean
    public TimeLimiterRegistry timeLimiterRegistry() {
        TimeLimiterConfig config = TimeLimiterConfig.custom()
            .timeoutDuration(Duration.ofSeconds(2))
            .build();
            
        return TimeLimiterRegistry.of(config);
    }
}
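What Resilience4j does internally can be illustrated with a heavily simplified state machine. SimpleCircuitBreaker below is a didactic sketch, not the library's implementation: it counts consecutive failures instead of tracking a failure rate over a sliding window, and omits the timed transition to half-open:

```java
import java.util.function.Supplier;

// Heavily simplified circuit-breaker sketch for illustration only.
// Real implementations (Resilience4j) use sliding windows, failure
// rates, and timed transitions to a HALF_OPEN state.
public class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN }

    private final int failureThreshold;
    private int consecutiveFailures = 0;
    private State state = State.CLOSED;

    public SimpleCircuitBreaker(int failureThreshold) {
        this.failureThreshold = failureThreshold;
    }

    public <T> T execute(Supplier<T> call) {
        if (state == State.OPEN) {
            // fail fast instead of hammering a failing backend
            throw new IllegalStateException("circuit open - failing fast");
        }
        try {
            T result = call.get();
            consecutiveFailures = 0;   // a success resets the counter
            return result;
        } catch (RuntimeException e) {
            if (++consecutiveFailures >= failureThreshold) {
                state = State.OPEN;    // stop forwarding calls
            }
            throw e;
        }
    }

    public boolean isOpen() { return state == State.OPEN; }
}
```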

51.6.2 Retry Mechanism

@Service
public class ResilientSearchService {
    private final SearchService searchService;
    private final CircuitBreaker circuitBreaker;
    private final RetryRegistry retryRegistry;

    public SearchResponse<Product> searchWithResilience(SearchRequest request) {
        return Retry.decorateFunction(
            retryRegistry.retry("searchRetry"),
            (SearchRequest r) -> circuitBreaker.executeSupplier(
                () -> searchService.search(r)
            )
        ).apply(request);
    }
}
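retryRegistry.retry("searchRetry") picks up whatever RetryConfig was registered under that name. Conceptually, a retry with exponential backoff boils down to the following plain-Java loop; this is a sketch of the idea, not Resilience4j's code:

```java
import java.util.function.Supplier;

// Plain-Java sketch of retry with exponential backoff, illustrating
// what a retry decorator does conceptually: re-run the action,
// doubling the wait after each failed attempt.
public class RetryWithBackoff {
    public static <T> T call(Supplier<T> action, int maxAttempts,
                             long initialDelayMillis) {
        long delay = initialDelayMillis;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delay);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(ie);
                    }
                    delay *= 2;   // exponential backoff
                }
            }
        }
        throw last;   // all attempts exhausted
    }
}
```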

51.7 Integration Best Practices

  1. Data integrity: keep the search index consistent with the system of record, for example through idempotent, event-based indexing.
  2. Performance: use bulk requests for writes and cache frequently repeated queries.
  3. Monitoring: observe indexing lag, query latency, error rates, and circuit-breaker state.
  4. Security: route all access through the API gateway and restrict direct access to the cluster.