/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.purgatory;

import com.yammer.metrics.core.Meter;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.purgatory.ListOffsetsPartitionStatus;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DelayedRemoteListOffsets
extends DelayedOperation {
    private static final Logger LOG = LoggerFactory.getLogger(DelayedRemoteListOffsets.class);
    private static final KafkaMetricsGroup METRICS_GROUP = new KafkaMetricsGroup("kafka.server", "DelayedRemoteListOffsetsMetrics");
    static final Meter AGGREGATE_EXPIRATION_METER = METRICS_GROUP.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS);
    static final Map<TopicPartition, Meter> PARTITION_EXPIRATION_METERS = new ConcurrentHashMap<TopicPartition, Meter>();
    private final int version;
    private final Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition;
    private final Consumer<TopicPartition> partitionOrException;
    private final Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback;

    public DelayedRemoteListOffsets(long delayMs, int version, Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition, Consumer<TopicPartition> partitionOrException, Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback) {
        super(delayMs);
        this.version = version;
        this.statusByPartition = statusByPartition;
        this.partitionOrException = partitionOrException;
        this.responseCallback = responseCallback;
        statusByPartition.forEach((topicPartition, status) -> {
            status.completed(status.futureHolderOpt().isEmpty());
            if (status.futureHolderOpt().isPresent()) {
                status.responseOpt(Optional.of(this.buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition())));
            }
            LOG.trace("Initial partition status for {} is {}", topicPartition, status);
        });
    }

    public void onExpiration() {
        this.statusByPartition.forEach((topicPartition, status) -> {
            if (!status.completed()) {
                LOG.debug("Expiring list offset request for partition {} with status {}", topicPartition, status);
                status.futureHolderOpt().ifPresent(futureHolder -> futureHolder.jobFuture().cancel(true));
                DelayedRemoteListOffsets.recordExpiration(topicPartition);
            }
        });
    }

    public void onComplete() {
        HashMap groupedByTopic = new HashMap();
        this.statusByPartition.forEach((tp, status) -> {
            ListOffsetsResponseData.ListOffsetsTopicResponse response = groupedByTopic.computeIfAbsent(tp.topic(), k -> new ListOffsetsResponseData.ListOffsetsTopicResponse().setName(tp.topic()));
            status.responseOpt().ifPresent(res -> response.partitions().add(res));
        });
        this.responseCallback.accept(groupedByTopic.values());
    }

    public boolean tryComplete() {
        AtomicBoolean completable = new AtomicBoolean(true);
        this.statusByPartition.forEach((partition, status) -> {
            if (!status.completed()) {
                try {
                    this.partitionOrException.accept((TopicPartition)partition);
                }
                catch (ApiException e) {
                    status.futureHolderOpt().ifPresent(futureHolder -> {
                        futureHolder.jobFuture().cancel(false);
                        futureHolder.taskFuture().complete(new OffsetResultHolder.FileRecordsOrError(Optional.of(e), Optional.empty()));
                    });
                }
                status.futureHolderOpt().ifPresent(futureHolder -> {
                    if (futureHolder.taskFuture().isDone()) {
                        ListOffsetsResponseData.ListOffsetsPartitionResponse response;
                        try {
                            OffsetResultHolder.FileRecordsOrError taskFuture = (OffsetResultHolder.FileRecordsOrError)futureHolder.taskFuture().get();
                            if (taskFuture.hasException()) {
                                response = this.buildErrorResponse(Errors.forException((Throwable)taskFuture.exception().get()), partition.partition());
                            } else if (!taskFuture.hasTimestampAndOffset()) {
                                Errors error = status.maybeOffsetsError().map(e -> this.version >= 5 ? Errors.forException((Throwable)e) : Errors.LEADER_NOT_AVAILABLE).orElse(Errors.NONE);
                                response = this.buildErrorResponse(error, partition.partition());
                            } else {
                                ListOffsetsResponseData.ListOffsetsPartitionResponse partitionResponse = this.buildErrorResponse(Errors.NONE, partition.partition());
                                FileRecords.TimestampAndOffset found = taskFuture.timestampAndOffset().get();
                                if (status.lastFetchableOffset().isPresent() && found.offset >= status.lastFetchableOffset().get()) {
                                    if (status.maybeOffsetsError().isPresent()) {
                                        Errors error = this.version >= 5 ? Errors.forException((Throwable)status.maybeOffsetsError().get()) : Errors.LEADER_NOT_AVAILABLE;
                                        partitionResponse.setErrorCode(error.code());
                                    }
                                } else {
                                    partitionResponse = new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(partition.partition()).setErrorCode(Errors.NONE.code()).setTimestamp(found.timestamp).setOffset(found.offset);
                                    if (found.leaderEpoch.isPresent() && this.version >= 4) {
                                        partitionResponse.setLeaderEpoch(((Integer)found.leaderEpoch.get()).intValue());
                                    }
                                }
                                response = partitionResponse;
                            }
                        }
                        catch (InterruptedException | ExecutionException e2) {
                            response = this.buildErrorResponse(Errors.forException((Throwable)e2), partition.partition());
                        }
                        status.responseOpt(Optional.of(response));
                        status.completed(true);
                    }
                    completable.set(completable.get() && futureHolder.taskFuture().isDone());
                });
            }
        });
        if (completable.get()) {
            return this.forceComplete();
        }
        return false;
    }

    private ListOffsetsResponseData.ListOffsetsPartitionResponse buildErrorResponse(Errors e, int partitionIndex) {
        return new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(partitionIndex).setErrorCode(e.code()).setTimestamp(-1L).setOffset(-1L);
    }

    private static void recordExpiration(TopicPartition partition) {
        AGGREGATE_EXPIRATION_METER.mark();
        PARTITION_EXPIRATION_METERS.computeIfAbsent(partition, tp -> METRICS_GROUP.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic", (Object)tp.topic()), Utils.mkEntry((Object)"partition", (Object)String.valueOf(tp.partition()))}))).mark();
    }
}

