package org.apache.qpid.jms;

import jakarta.jms.ConnectionConsumer;
import jakarta.jms.IllegalStateException;
import jakarta.jms.JMSException;
import jakarta.jms.ServerSession;
import jakarta.jms.ServerSessionPool;
import jakarta.jms.Session;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.JmsMessageSupport;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.meta.JmsResource;
import org.apache.qpid.jms.provider.ProviderConstants;
import org.apache.qpid.jms.provider.ProviderException;
import org.apache.qpid.jms.provider.ProviderSynchronization;
import org.apache.qpid.jms.util.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/qpid-jms-client-2.6.1.jar:org/apache/qpid/jms/JmsConnectionConsumer.class */
/**
 * {@link ConnectionConsumer} implementation that buffers inbound messages in a
 * {@link MessageQueue} and hands each one to a {@link JmsSession} obtained from the
 * configured {@link ServerSessionPool}.
 *
 * <p>Dispatch work runs on a single daemon thread owned by an internal
 * {@link ScheduledThreadPoolExecutor}; the actual listener invocation happens inside the
 * session through a {@link DeliveryTask} queued via {@code enqueueInSession}.
 */
public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(JmsConnectionConsumer.class);

    /** Delay, in milliseconds, before dispatch is retried when the pool returned no ServerSession. */
    private static final long DEFAULT_DISPATCH_RETRY_DELAY = 1000;

    private final JmsConnection connection;
    private final JmsConsumerInfo consumerInfo;
    private final ServerSessionPool sessionPool;
    private final MessageQueue messageQueue;

    /** Held while enqueueing into, starting, stopping or closing the message queue. */
    private final Lock stateLock = new ReentrantLock();

    /** Held while a message is handed to a ServerSession, and while stopping or shutting down. */
    private final Lock dispatchLock = new ReentrantLock();

    /**
     * Read side is held by each {@link DeliveryTask} while it runs; write side is held by
     * {@link #doClose()}, so a close waits for deliveries that are already in progress.
     */
    private final ReadWriteLock deliveringLock = new ReentrantReadWriteLock(true);

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
    private final ScheduledThreadPoolExecutor dispatcher;

    /**
     * Dispatcher task that attempts at most a fixed number of deliveries, stopping early
     * when the message queue is no longer running or {@code deliverNextPending()} reports
     * that nothing more can be delivered right now.
     */
    public final class BoundedMessageDeliverTask implements Runnable {
        private final int deliveryCount;

        /**
         * @param i the maximum number of delivery attempts this task will make
         */
        public BoundedMessageDeliverTask(int i) {
            this.deliveryCount = i;
        }

        @Override
        public void run() {
            for (int attempts = 0; messageQueue.isRunning() && attempts < deliveryCount; attempts++) {
                if (!deliverNextPending()) {
                    return;
                }
            }
        }
    }

    /**
     * Unit of work executed by a {@link JmsSession} for a single inbound message: filters
     * expired and over-redelivered messages, otherwise invokes the session's message
     * listener and acknowledges according to the outcome.
     */
    public final class DeliveryTask implements Consumer<JmsSession> {
        private final JmsInboundMessageDispatch envelope;

        /**
         * @param jmsInboundMessageDispatch the inbound message this task will deliver
         */
        public DeliveryTask(JmsInboundMessageDispatch jmsInboundMessageDispatch) {
            this.envelope = jmsInboundMessageDispatch;
        }

        /**
         * Processes the envelope within the given session. Any exception raised while doing
         * so is reported through the connection's {@code onAsyncException}; the delivering
         * read lock is always released on exit.
         *
         * @param jmsSession the session in which the message is delivered and acknowledged
         */
        @Override
        public void accept(JmsSession jmsSession) {
            deliveringLock.readLock().lock();
            try {
                if (closed.get()) {
                    // Consumer was closed before this task ran; nothing is delivered or acknowledged here.
                    return;
                }

                if (envelope.getMessage().isExpired()) {
                    LOG.trace("{} filtered expired message: {}", envelope.getConsumerId(), envelope);
                    jmsSession.acknowledge(envelope, ProviderConstants.ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
                } else if (jmsSession.redeliveryExceeded(envelope)) {
                    LOG.trace("{} filtered message with excessive redelivery count: {}", envelope.getConsumerId(), envelope);
                    // The acknowledgement type comes from the redelivery policy configured for the destination.
                    jmsSession.acknowledge(envelope, JmsMessageSupport.lookupAckTypeForDisposition(
                            envelope.getConsumerInfo().getRedeliveryPolicy().getOutcome(envelope.getConsumerInfo().getDestination())));
                } else {
                    boolean listenerFailed = false;
                    JmsMessage copy = jmsSession.acknowledge(envelope, ProviderConstants.ACK_TYPE.DELIVERED).getMessage().copy();
                    jmsSession.clearSessionRecovered();
                    try {
                        jmsSession.getMessageListener().onMessage(copy);
                    } catch (RuntimeException e) {
                        // A listener failure must not escape; remember it so the message is released below.
                        LOG.debug("{} message listener threw while handling: {}", envelope.getConsumerId(), envelope, e);
                        listenerFailed = true;
                    }

                    // If the session was recovered during onMessage, no final acknowledgement is sent from here.
                    if (!jmsSession.isSessionRecovered()) {
                        if (listenerFailed) {
                            jmsSession.acknowledge(envelope, ProviderConstants.ACK_TYPE.RELEASED);
                        } else {
                            jmsSession.acknowledge(envelope, ProviderConstants.ACK_TYPE.ACCEPTED);
                        }
                    }
                }
            } catch (Exception e) {
                getConnection().onAsyncException(e);
            } finally {
                deliveringLock.readLock().unlock();
            }
        }
    }

    /**
     * Creates the consumer, its single-threaded daemon dispatcher, and requests creation of
     * the consumer resource on the connection. The consumer is registered with the
     * connection from the {@code onPendingSuccess} callback.
     *
     * @param jmsConnection the owning connection
     * @param jmsConsumerInfo the resource description of this consumer
     * @param messageQueue the queue used to buffer inbound messages until dispatched
     * @param serverSessionPool the pool that supplies sessions for message delivery
     * @throws JMSException if the connection fails to create the consumer resource
     */
    public JmsConnectionConsumer(final JmsConnection jmsConnection, final JmsConsumerInfo jmsConsumerInfo, MessageQueue messageQueue, ServerSessionPool serverSessionPool) throws JMSException {
        this.connection = jmsConnection;
        this.consumerInfo = jmsConsumerInfo;
        this.sessionPool = serverSessionPool;
        this.messageQueue = messageQueue;

        this.dispatcher = new ScheduledThreadPoolExecutor(1, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            // Name the thread after this class explicitly: getSimpleName() of an anonymous
            // factory class is the empty string, which left the thread name without a prefix.
            thread.setName(JmsConnectionConsumer.class.getSimpleName() + ":(" + jmsConsumerInfo.getId() + ")");
            return thread;
        });
        // Once shut down, neither delayed retries nor periodic tasks should run any more.
        this.dispatcher.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.dispatcher.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);

        jmsConnection.createResource(jmsConsumerInfo, new ProviderSynchronization() {
            @Override
            public void onPendingSuccess() {
                jmsConnection.addConnectionConsumer(jmsConsumerInfo, JmsConnectionConsumer.this);
            }

            @Override
            public void onPendingFailure(ProviderException providerException) {
                // Nothing to undo here: the consumer is only registered on success.
            }
        });
    }

    /**
     * Asks the connection to start the consumer resource.
     *
     * @return this consumer, to allow call chaining
     * @throws JMSException if the connection fails to start the resource
     */
    public JmsConnectionConsumer init() throws JMSException {
        getConnection().startResource(this.consumerInfo);
        return this;
    }

    /**
     * Queues an inbound message (at the head when the envelope requests it) and, if the
     * queue is running, submits a dispatch attempt to the dispatcher thread. A rejected
     * submission is logged and otherwise ignored.
     *
     * @param jmsInboundMessageDispatch the inbound message envelope
     */
    @Override
    public void onInboundMessage(JmsInboundMessageDispatch jmsInboundMessageDispatch) {
        jmsInboundMessageDispatch.setConsumerInfo(this.consumerInfo);

        stateLock.lock();
        try {
            if (jmsInboundMessageDispatch.isEnqueueFirst()) {
                messageQueue.enqueueFirst(jmsInboundMessageDispatch);
            } else {
                messageQueue.enqueue(jmsInboundMessageDispatch);
            }

            if (messageQueue.isRunning()) {
                try {
                    dispatcher.execute(this::deliverNextPending);
                } catch (RejectedExecutionException e) {
                    LOG.debug("Rejected on attempt to queue message dispatch", e);
                }
            }
        } finally {
            stateLock.unlock();
        }
    }

    /**
     * Closes this consumer; does nothing if it is already closed.
     *
     * @throws JMSException if shutting down or destroying the resource fails
     */
    @Override
    public void close() throws JMSException {
        if (!closed.get()) {
            doClose();
        }
    }

    /**
     * Shuts the consumer down and destroys its resource on the connection while holding
     * the delivering write lock, so in-progress {@link DeliveryTask}s finish first.
     *
     * @throws JMSException if shutting down or destroying the resource fails
     */
    protected void doClose() throws JMSException {
        deliveringLock.writeLock().lock();
        try {
            shutdown();
            connection.destroyResource(consumerInfo);
        } finally {
            deliveringLock.writeLock().unlock();
        }
    }

    /**
     * Shuts down without recording a failure cause.
     *
     * @throws JMSException declared for callers; see {@link #shutdown(Throwable)}
     */
    public void shutdown() throws JMSException {
        shutdown(null);
    }

    /**
     * Performs the one-time shutdown: records the cause, marks the consumer info as
     * closed, removes this consumer from the connection, closes the message queue and
     * stops the dispatcher, waiting up to the connection's close timeout for it to end.
     * Subsequent calls are no-ops.
     *
     * @param th the failure that triggered the shutdown, or {@code null} for a normal close
     * @throws JMSException declared for callers that handle shutdown failures
     */
    public void shutdown(Throwable th) throws JMSException {
        if (!closed.compareAndSet(false, true)) {
            return;
        }

        dispatchLock.lock();
        try {
            failureCause.set(th);
            consumerInfo.setState(JmsResource.ResourceState.CLOSED);
            connection.removeConnectionConsumer(consumerInfo);
            stop(true);
            dispatcher.shutdown();
            try {
                dispatcher.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                LOG.trace("ConnectionConsumer shutdown of dispatcher was interupted");
                // Preserve the interrupt so the caller can observe that it was requested.
                Thread.currentThread().interrupt();
            }
        } finally {
            dispatchLock.unlock();
        }
    }

    /**
     * Starts the message queue if it is not already running and schedules a bounded
     * delivery pass sized to the number of messages queued at that moment.
     */
    public void start() {
        stateLock.lock();
        try {
            if (!messageQueue.isRunning()) {
                messageQueue.start();
                dispatcher.execute(new BoundedMessageDeliverTask(messageQueue.size()));
            }
        } finally {
            stateLock.unlock();
        }
    }

    /**
     * Stops the message queue without closing it.
     */
    public void stop() {
        stop(false);
    }

    /**
     * Stops or closes the message queue while holding both the dispatch and state locks.
     *
     * @param z {@code true} to close the queue, {@code false} to only stop it
     */
    private void stop(boolean z) {
        // Lock order is dispatchLock then stateLock; released in reverse.
        dispatchLock.lock();
        stateLock.lock();
        try {
            if (z) {
                messageQueue.close();
            } else {
                messageQueue.stop();
            }
        } finally {
            stateLock.unlock();
            dispatchLock.unlock();
        }
    }

    /**
     * @return the pool supplied at construction
     * @throws JMSException if this consumer has been closed
     */
    @Override
    public ServerSessionPool getServerSessionPool() throws JMSException {
        checkClosed();
        return this.sessionPool;
    }

    JmsConnection getConnection() {
        return this.connection;
    }

    public JmsConsumerInfo getConsumerInfo() {
        return this.consumerInfo;
    }

    public void setFailureCause(Throwable th) {
        this.failureCause.set(th);
    }

    Throwable getFailureCause() {
        return this.failureCause.get();
    }

    @Override
    public String toString() {
        return "JmsConnectionConsumer { id=" + this.consumerInfo.getId() + " }";
    }

    /**
     * Verifies that this consumer is still open.
     *
     * @throws IllegalStateException if the consumer is closed; when a failure cause was
     *         recorded it is attached as the exception's cause
     */
    protected void checkClosed() throws IllegalStateException {
        if (!closed.get()) {
            return;
        }

        // Read the cause once so the null check and initCause see the same value.
        Throwable cause = getFailureCause();
        if (cause == null) {
            throw new IllegalStateException("The ConnectionConsumer is closed");
        }

        IllegalStateException failure = new IllegalStateException("The ConnectionConsumer was closed due to an unrecoverable error.");
        failure.initCause(cause);
        throw failure;
    }

    /**
     * Attempts to hand the next queued message to a session from the pool.
     *
     * <p>When the pool has no session to offer, a bounded retry is scheduled after
     * {@link #DEFAULT_DISPATCH_RETRY_DELAY} milliseconds and {@code false} is returned.
     * A {@link JMSException} during dispatch is reported to the connection and closes the
     * message queue.
     *
     * @return {@code true} if the queue still holds messages after this attempt;
     *         {@code false} if it is empty or a retry was scheduled
     */
    private boolean deliverNextPending() {
        if (messageQueue.isRunning() && !messageQueue.isEmpty()) {
            dispatchLock.lock();
            try {
                ServerSession serverSession = getServerSessionPool().getServerSession();
                if (serverSession == null) {
                    // No session available right now; try again later for whatever is queued at this point.
                    dispatcher.schedule(new BoundedMessageDeliverTask(messageQueue.size()), DEFAULT_DISPATCH_RETRY_DELAY, TimeUnit.MILLISECONDS);
                    return false;
                }

                Session session = serverSession.getSession();
                JmsInboundMessageDispatch pending = messageQueue.dequeueNoWait();

                if (session instanceof JmsSession) {
                    ((JmsSession) session).enqueueInSession(new DeliveryTask(pending));
                } else {
                    // NOTE(review): the message has already been dequeued at this point and is not re-queued.
                    LOG.warn("ServerSession provided an unknown JMS Session type to this ConnectionConsumer: {}", session);
                }

                serverSession.start();
            } catch (JMSException e) {
                connection.onAsyncException(e);
                stop(true);
            } finally {
                dispatchLock.unlock();
            }
        }

        return !messageQueue.isEmpty();
    }
}
