/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group.streams.topics;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.streams.topics.TopicConfigurationException;
import org.slf4j.Logger;

public class CopartitionedTopicsEnforcer {
    private final Logger log;
    private final Function<String, OptionalInt> topicPartitionCountProvider;

    public CopartitionedTopicsEnforcer(LogContext logContext, Function<String, OptionalInt> topicPartitionCountProvider) {
        this.log = logContext.logger(this.getClass());
        this.topicPartitionCountProvider = topicPartitionCountProvider;
    }

    public Map<String, Integer> enforce(Set<String> copartitionedTopics, Set<String> fixedRepartitionTopics, Set<String> flexibleRepartitionTopics) throws StreamsInvalidTopologyException {
        if (copartitionedTopics.isEmpty()) {
            this.log.debug("Ignoring unexpected empty copartitioned topics set.");
            return Map.of();
        }
        HashMap<String, Integer> returnedPartitionCounts = new HashMap<String, Integer>();
        Map<String, Integer> repartitionTopicPartitionCounts = copartitionedTopics.stream().filter(x -> fixedRepartitionTopics.contains(x) || flexibleRepartitionTopics.contains(x)).collect(Collectors.toMap(topic -> topic, this::getPartitionCount));
        Map<String, Integer> nonRepartitionTopicPartitions = copartitionedTopics.stream().filter(topic -> !repartitionTopicPartitionCounts.containsKey(topic)).collect(Collectors.toMap(topic -> topic, this::getPartitionCount));
        int numPartitionsToUseForRepartitionTopics = copartitionedTopics.equals(repartitionTopicPartitionCounts.keySet()) ? (!fixedRepartitionTopics.isEmpty() ? this.validateAndGetNumOfPartitions(repartitionTopicPartitionCounts, fixedRepartitionTopics) : this.getMaxPartitions(repartitionTopicPartitionCounts)) : this.getSamePartitions(nonRepartitionTopicPartitions);
        for (Map.Entry<String, Integer> repartitionTopic : repartitionTopicPartitionCounts.entrySet()) {
            returnedPartitionCounts.put(repartitionTopic.getKey(), numPartitionsToUseForRepartitionTopics);
            if (!fixedRepartitionTopics.contains(repartitionTopic.getKey()) || repartitionTopic.getValue() == numPartitionsToUseForRepartitionTopics) continue;
            String msg = String.format("Number of partitions [%d] of repartition topic [%s] doesn't match number of partitions [%d] of the source topic.", repartitionTopic.getValue(), repartitionTopic.getKey(), numPartitionsToUseForRepartitionTopics);
            throw TopicConfigurationException.incorrectlyPartitionedTopics(msg);
        }
        return returnedPartitionCounts;
    }

    private int getPartitionCount(String topicName) {
        OptionalInt partitions = this.topicPartitionCountProvider.apply(topicName);
        if (partitions.isPresent()) {
            return partitions.getAsInt();
        }
        throw new IllegalStateException("Number of partitions is not set for topic: " + topicName);
    }

    private int validateAndGetNumOfPartitions(Map<String, Integer> repartitionTopics, Collection<String> fixedRepartitionTopics) {
        String firstTopicName = fixedRepartitionTopics.iterator().next();
        int firstNumberOfPartitionsOfInternalTopic = this.getPartitionCount(firstTopicName);
        for (String topicName : fixedRepartitionTopics) {
            int numberOfPartitions = this.getPartitionCount(topicName);
            if (numberOfPartitions == firstNumberOfPartitionsOfInternalTopic) continue;
            String msg = String.format("Following topics do not have the same number of partitions: [%s]", new TreeMap<String, Integer>(repartitionTopics));
            throw TopicConfigurationException.incorrectlyPartitionedTopics(msg);
        }
        return firstNumberOfPartitionsOfInternalTopic;
    }

    private int getSamePartitions(Map<String, Integer> nonRepartitionTopicsInCopartitionGroup) {
        int partitions = nonRepartitionTopicsInCopartitionGroup.values().iterator().next();
        for (Map.Entry<String, Integer> entry : nonRepartitionTopicsInCopartitionGroup.entrySet()) {
            if (entry.getValue() == partitions) continue;
            TreeMap<String, Integer> sorted = new TreeMap<String, Integer>(nonRepartitionTopicsInCopartitionGroup);
            throw TopicConfigurationException.incorrectlyPartitionedTopics(String.format("Following topics do not have the same number of partitions: [%s]", sorted));
        }
        return partitions;
    }

    private int getMaxPartitions(Map<String, Integer> repartitionTopicsInCopartitionGroup) {
        int maxPartitions = 0;
        for (Integer numPartitions : repartitionTopicsInCopartitionGroup.values()) {
            maxPartitions = Integer.max(maxPartitions, numPartitions);
        }
        if (maxPartitions == 0) {
            throw new StreamsInvalidTopologyException("All topics in the copartition group had undefined partition number: " + String.valueOf(repartitionTopicsInCopartitionGroup.keySet()));
        }
        return maxPartitions;
    }
}

