/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.index.reindex.remote;

import java.io.IOException;
import java.io.InputStream;
import java.lang.runtime.SwitchBootstraps;
import java.net.ConnectException;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.hc.core5.http.ContentTooLongException;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.client.ResponseListener;
import org.opensearch.client.RestClient;
import org.opensearch.common.Nullable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParseException;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.reindex.RejectAwareActionListener;
import org.opensearch.index.reindex.RetryListener;
import org.opensearch.index.reindex.ScrollableHitSource;
import org.opensearch.index.reindex.remote.RemoteRequestBuilders;
import org.opensearch.index.reindex.remote.RemoteResponseParsers;
import org.opensearch.index.reindex.remote.RemoteVersion;
import org.opensearch.threadpool.ThreadPool;

public class RemoteScrollableHitSource
extends ScrollableHitSource {
    private final RestClient client;
    private final BytesReference query;
    private final SearchRequest searchRequest;
    RemoteVersion remoteVersion;

    public RemoteScrollableHitSource(Logger logger, BackoffPolicy backoffPolicy, ThreadPool threadPool, Runnable countSearchRetry, Consumer<ScrollableHitSource.AsyncResponse> onResponse, Consumer<Exception> fail, RestClient client, BytesReference query, SearchRequest searchRequest) {
        super(logger, backoffPolicy, threadPool, countSearchRetry, onResponse, fail);
        this.query = query;
        this.searchRequest = searchRequest;
        this.client = client;
    }

    protected void doStart(RejectAwareActionListener<ScrollableHitSource.Response> searchListener) {
        this.logger.info("Starting remote reindex for {}", (Object)Arrays.toString(this.searchRequest.indices()));
        this.lookupRemoteVersion((RejectAwareActionListener<RemoteVersion>)RejectAwareActionListener.wrap(version -> {
            this.remoteVersion = version;
            this.logger.trace("Starting initial search");
            this.executeWithRetries(RemoteRequestBuilders.initialSearch(this.searchRequest, this.query, this.remoteVersion), (BiFunction<XContentParser, MediaType, ScrollableHitSource.Response>)RemoteResponseParsers.RESPONSE_PARSER, (RejectAwareActionListener<ScrollableHitSource.Response>)RejectAwareActionListener.withResponseHandler((RejectAwareActionListener)searchListener, r -> this.onStartResponse(searchListener, (ScrollableHitSource.Response)r)));
        }, arg_0 -> searchListener.onFailure(arg_0), arg_0 -> searchListener.onFailure(arg_0)));
    }

    void lookupRemoteVersion(RejectAwareActionListener<RemoteVersion> listener) {
        this.logger.trace("Checking version for remote domain");
        this.execute(new Request("GET", ""), (BiFunction)RemoteResponseParsers.MAIN_ACTION_PARSER, (RejectAwareActionListener)listener);
    }

    private void onStartResponse(RejectAwareActionListener<ScrollableHitSource.Response> searchListener, ScrollableHitSource.Response response) {
        this.logger.trace("On initial search response");
        if (Strings.hasLength((String)response.getScrollId()) && response.getHits().isEmpty()) {
            this.logger.debug("First response looks like a scan response. Jumping right to the second. scroll=[{}]", (Object)response.getScrollId());
            this.doStartNextScroll(response.getScrollId(), TimeValue.timeValueMillis((long)0L), searchListener);
        } else {
            searchListener.onResponse((Object)response);
        }
    }

    protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, RejectAwareActionListener<ScrollableHitSource.Response> searchListener) {
        this.logger.trace("Starting next scroll call");
        TimeValue keepAlive = TimeValue.timeValueNanos((long)(this.searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos()));
        this.executeWithRetries(RemoteRequestBuilders.scroll(scrollId, keepAlive, this.remoteVersion), (BiFunction<XContentParser, MediaType, ScrollableHitSource.Response>)RemoteResponseParsers.RESPONSE_PARSER, searchListener);
    }

    protected void clearScroll(final String scrollId, final Runnable onCompletion) {
        this.logger.debug("Clearing the scrollID {}", (Object)scrollId);
        this.client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, this.remoteVersion), new ResponseListener(){

            public void onSuccess(Response response) {
                RemoteScrollableHitSource.this.logger.debug("Successfully cleared [{}]", (Object)scrollId);
                onCompletion.run();
            }

            public void onFailure(Exception e) {
                this.logFailure(e);
                onCompletion.run();
            }

            private void logFailure(Exception e) {
                if (e instanceof ResponseException) {
                    ResponseException re = (ResponseException)e;
                    if (RemoteScrollableHitSource.this.remoteVersion.before(RemoteVersion.ELASTICSEARCH_2_0_0) && re.getResponse().getStatusLine().getStatusCode() == 404) {
                        RemoteScrollableHitSource.this.logger.debug(() -> new ParameterizedMessage("Failed to clear scroll [{}] from pre-2.0 OpenSearch. This is normal if the request terminated normally as the scroll has already been cleared automatically.", (Object)scrollId), (Throwable)e);
                        return;
                    }
                }
                RemoteScrollableHitSource.this.logger.warn(() -> new ParameterizedMessage("Failed to clear scroll [{}]", (Object)scrollId), (Throwable)e);
            }
        });
    }

    protected void cleanup(Runnable onCompletion) {
        this.threadPool.generic().submit(() -> {
            try {
                this.client.close();
                this.logger.debug("Shut down remote connection");
            }
            catch (IOException e) {
                this.logger.error("Failed to shutdown the remote connection", (Throwable)e);
            }
            finally {
                onCompletion.run();
            }
        });
    }

    private void executeWithRetries(Request request, BiFunction<XContentParser, MediaType, ScrollableHitSource.Response> parser, RejectAwareActionListener<ScrollableHitSource.Response> childListener) {
        this.execute(request, (BiFunction)parser, (RejectAwareActionListener)new RetryListener(this.logger, this.threadPool, this.backoffPolicy, r -> {
            this.logger.debug("Retrying execute request {}", (Object)request.getEndpoint());
            this.countSearchRetry.run();
            this.execute(request, (BiFunction)parser, (RejectAwareActionListener)r);
        }, childListener));
    }

    private <T> void execute(Request request, final BiFunction<XContentParser, MediaType, T> parser, final RejectAwareActionListener<? super T> listener) {
        this.logger.trace("Executing http request to remote cluster {}", (Object)request.getEndpoint());
        final Supplier contextSupplier = this.threadPool.getThreadContext().newRestorableContext(true);
        try {
            this.client.performRequestAsync(request, new ResponseListener(){

                public void onSuccess(Response response) {
                    RemoteScrollableHitSource.this.logger.trace("Successfully got response from the remote");
                    try (ThreadContext.StoredContext ctx = (ThreadContext.StoredContext)contextSupplier.get();){
                        Object parsedResponse;
                        assert (ctx != null);
                        try {
                            HttpEntity responseEntity = response.getEntity();
                            InputStream content = responseEntity.getContent();
                            MediaType mediaType = null;
                            if (responseEntity.getContentType() != null) {
                                String mimeType = ContentType.parse((CharSequence)responseEntity.getContentType()).getMimeType();
                                mediaType = MediaType.fromMediaType((String)mimeType);
                            }
                            if (mediaType == null) {
                                try {
                                    RemoteScrollableHitSource.this.logger.error("Response didn't include Content-Type: " + RemoteScrollableHitSource.bodyMessage(response.getEntity()));
                                    throw new OpenSearchException("Response didn't include supported Content-Type, remote is likely not an OpenSearch instance", new Object[0]);
                                }
                                catch (IOException e) {
                                    OpenSearchException ee = new OpenSearchException("Error extracting body from response", new Object[0]);
                                    ee.addSuppressed((Throwable)e);
                                    throw ee;
                                }
                            }
                            try (XContentParser xContentParser = mediaType.xContent().createParser(NamedXContentRegistry.EMPTY, (DeprecationHandler)LoggingDeprecationHandler.INSTANCE, content);){
                                parsedResponse = parser.apply(xContentParser, mediaType);
                            }
                            catch (XContentParseException e) {
                                throw new OpenSearchException("Error parsing the response, remote is likely not an OpenSearch instance", (Throwable)e, new Object[0]);
                            }
                        }
                        catch (IOException e) {
                            throw new OpenSearchException("Error deserializing response, remote is likely not an OpenSearch instance", (Throwable)e, new Object[0]);
                        }
                        listener.onResponse(parsedResponse);
                    }
                }

                /*
                 * Unable to fully structure code
                 */
                public void onFailure(Exception e) {
                    ctx = (ThreadContext.StoredContext)contextSupplier.get();
                    try {
                        if (!2.$assertionsDisabled && ctx == null) {
                            throw new AssertionError();
                        }
                        RemoteScrollableHitSource.access$500(RemoteScrollableHitSource.this).debug("Received response failure {}", (Object)e.getMessage());
                        v0 = e;
                        Objects.requireNonNull(v0);
                        var3_3 = v0;
                        var4_5 = 0;
                        switch (SwitchBootstraps.typeSwitch("typeSwitch", new Object[]{ResponseException.class, ConnectException.class, ContentTooLongException.class}, (Object)var3_3, var4_5)) {
                            case 0: {
                                re = (ResponseException)var3_3;
                                statusCode = re.getResponse().getStatusLine().getStatusCode();
                                e = RemoteScrollableHitSource.wrapExceptionToPreserveStatus(statusCode, re.getResponse().getEntity(), (Exception)re);
                                if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode || statusCode >= RestStatus.INTERNAL_SERVER_ERROR.getStatus()) {
                                    listener.onRejection(e);
                                    return;
                                }
                                break;
                            }
                            case 1: {
                                ignored = (ConnectException)var3_3;
                                listener.onRejection(e);
                                return;
                            }
                            case 2: {
                                ignored = (ContentTooLongException)var3_3;
                                e = new IllegalArgumentException("Remote responded with a chunk that was too large. Use a smaller batch size.", e);
                                ** break;
lbl28:
                                // 1 sources

                                break;
                            }
                            ** default:
lbl30:
                            // 1 sources

                            break;
                        }
                    }
                    finally {
                        if (ctx != null) {
                            ctx.close();
                        }
                    }
                    listener.onFailure(e);
                }
            });
        }
        catch (Exception e) {
            listener.onFailure(e);
        }
    }

    static OpenSearchStatusException wrapExceptionToPreserveStatus(int statusCode, @Nullable HttpEntity entity, Exception cause) {
        RestStatus status = RestStatus.fromCode((int)statusCode);
        Object messagePrefix = "";
        if (status == null) {
            messagePrefix = "Couldn't extract status [" + statusCode + "]. ";
            status = RestStatus.INTERNAL_SERVER_ERROR;
        }
        try {
            return new OpenSearchStatusException((String)messagePrefix + RemoteScrollableHitSource.bodyMessage(entity), status, (Throwable)cause, new Object[0]);
        }
        catch (IOException ioe) {
            OpenSearchStatusException e = new OpenSearchStatusException((String)messagePrefix + "Failed to extract body.", status, (Throwable)cause, new Object[0]);
            e.addSuppressed((Throwable)ioe);
            return e;
        }
    }

    private static String bodyMessage(@Nullable HttpEntity entity) throws IOException {
        if (entity == null) {
            return "No error body.";
        }
        try {
            return "body=" + EntityUtils.toString((HttpEntity)entity);
        }
        catch (ParseException ex) {
            throw new IOException(ex);
        }
    }

    static /* synthetic */ Logger access$500(RemoteScrollableHitSource x0) {
        return x0.logger;
    }
}

