/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.gelfclient.transport;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.graylog2.gelfclient.GelfMessage;
import org.graylog2.gelfclient.util.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background sender that takes {@link GelfMessage}s from a shared queue and
 * writes them to a Netty {@link Channel}.
 *
 * <p>The worker is a daemon thread created in the constructor and started by
 * {@link #start(Channel)}. It only polls the queue while the channel is active,
 * and it throttles itself so that no more than {@code maxInflightSends} writes
 * are outstanding at any time.
 */
public class GelfSenderThread {
    private static final Logger LOG = LoggerFactory.getLogger(GelfSenderThread.class);
    private final ReentrantLock lock;
    private final Condition connectedCond;
    private final AtomicBoolean keepRunning = new AtomicBoolean(true);
    private final Thread senderThread;
    // Written in start() and read by the sender loop, both while holding "lock".
    private Channel channel;
    private final int maxInflightSends;
    private final BlockingQueue<GelfMessage> queue;
    // Incremented before each write, decremented by the write's completion listener.
    private final AtomicInteger inflightSends;

    /**
     * Creates the sender and its (not yet started) daemon thread.
     *
     * @param queue            queue the sender thread polls messages from
     * @param maxInflightSends upper bound on concurrently outstanding writes; must be positive
     * @throws IllegalArgumentException if {@code maxInflightSends} is zero or negative
     */
    public GelfSenderThread(final BlockingQueue<GelfMessage> queue, int maxInflightSends) {
        // Reject bad arguments before any state is set up.
        if (maxInflightSends <= 0) {
            throw new IllegalArgumentException("maxInflightSends must be larger than 0");
        }
        this.maxInflightSends = maxInflightSends;
        this.lock = new ReentrantLock();
        this.connectedCond = this.lock.newCondition();
        this.inflightSends = new AtomicInteger(0);
        this.queue = queue;
        this.senderThread = new Thread(new Runnable() {
            @Override
            public void run() {
                runSendLoop();
            }
        });
        this.senderThread.setDaemon(true);
        this.senderThread.setName("GelfSenderThread-" + this.senderThread.getId());
    }

    /**
     * Stores the channel to write to, wakes up anything waiting for a
     * connection and starts the sender thread.
     *
     * <p>This calls {@link Thread#start()} on the single thread created in the
     * constructor, so it can only be invoked once per instance.
     *
     * @param channel the channel messages are written to
     */
    public void start(Channel channel) {
        this.lock.lock();
        try {
            this.channel = channel;
            this.connectedCond.signalAll();
        } finally {
            this.lock.unlock();
        }
        this.senderThread.start();
    }

    /**
     * Asks the sender thread to terminate: clears the run flag first, then
     * interrupts the thread so a blocking wait or poll returns promptly.
     */
    public void stop() {
        this.keepRunning.set(false);
        this.senderThread.interrupt();
    }

    /**
     * Blocks the caller until the queue is empty and no writes are in flight,
     * or until the retry budget is used up.
     *
     * <p>The state is checked {@code retries + 1} times, sleeping
     * {@code waitDuration} between checks. If the calling thread is interrupted
     * while sleeping, its interrupt flag is restored and the method returns
     * immediately.
     *
     * @param waitDuration how long to sleep between checks
     * @param timeUnit     unit of {@code waitDuration}
     * @param retries      number of additional checks after the first one
     */
    void flushSynchronously(int waitDuration, TimeUnit timeUnit, int retries) {
        LOG.debug("Attempting to flush messages in [{}/{}] with [{}] retries", new Object[]{waitDuration, timeUnit, retries});
        for (int attempt = 0; attempt <= retries; ++attempt) {
            if (!this.flushingInProgress()) {
                LOG.debug("Successfully flushed messages. Shutting down now.");
                return;
            }
            LOG.debug("Flushing in progress. [{}] messages are still enqueued, and [{}] messages are still in-flight.", this.queue.size(), this.inflightSends.get());
            try {
                timeUnit.sleep(waitDuration);
            } catch (InterruptedException e) {
                LOG.error("Interrupted message flushing during shutdown after [{}] attempts.", attempt);
                // Preserve the interrupt for the caller.
                Thread.currentThread().interrupt();
                return;
            }
        }
        // The loop above performed retries + 1 checks in total.
        LOG.error("Failed to flush messages in [{}] attempts. Shutting down anyway.", retries + 1);
    }

    /**
     * @return {@code true} while writes are still outstanding or messages are still queued
     */
    private boolean flushingInProgress() {
        return this.inflightSends.get() != 0 || !this.queue.isEmpty();
    }

    /**
     * Body of the sender thread: repeatedly waits for an active channel, takes
     * one message and writes it, until {@link #stop()} clears the run flag.
     */
    private void runSendLoop() {
        // A message that was polled but not yet written. It is kept across
        // iterations so it is retried instead of polling a new one when the
        // channel was not active at send time.
        GelfMessage pending = null;
        final ChannelFutureListener inflightListener = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                GelfSenderThread.this.inflightSends.decrementAndGet();
            }
        };
        while (this.keepRunning.get()) {
            this.lock.lock();
            try {
                if (!this.awaitActiveChannel()) {
                    // Stopping while disconnected: do not take a message off
                    // the queue that could never be written.
                    break;
                }
                try {
                    if (pending == null) {
                        pending = this.queue.poll(100L, TimeUnit.MILLISECONDS);
                    }
                    if (pending != null && this.channel != null && this.channel.isActive()) {
                        // Hold back until a slot is free so that at most
                        // maxInflightSends writes are outstanding.
                        while (this.inflightSends.get() >= this.maxInflightSends) {
                            Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.MICROSECONDS);
                        }
                        this.inflightSends.incrementAndGet();
                        this.channel.writeAndFlush(pending).addListener(inflightListener);
                        pending = null;
                    }
                } catch (InterruptedException ignored) {
                    // stop() clears keepRunning before interrupting, so the
                    // loop condition takes care of terminating the thread.
                }
            } finally {
                this.lock.unlock();
            }
        }
        LOG.debug("GelfSenderThread exiting!");
    }

    /**
     * Waits on the connection condition until the channel is set and active.
     * Must be called while holding {@code lock}.
     *
     * @return {@code true} once the channel is active; {@code false} if the
     *         wait was interrupted after {@link #stop()} cleared the run flag
     */
    private boolean awaitActiveChannel() {
        while (this.channel == null || !this.channel.isActive()) {
            try {
                this.connectedCond.await();
            } catch (InterruptedException e) {
                if (!this.keepRunning.get()) {
                    return false;
                }
                // Interrupted for another reason while still running: keep waiting.
            }
        }
        return true;
    }
}

