/*
 * Decompiled with CFR 0.152.
 */
package org.restcomm.protocols.ss7.sccp.impl;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.restcomm.protocols.ss7.sccp.impl.SccpConnectionBaseImpl;
import org.restcomm.protocols.ss7.sccp.impl.SccpRoutingControl;
import org.restcomm.protocols.ss7.sccp.impl.SccpStackImpl;
import org.restcomm.protocols.ss7.sccp.impl.message.MessageUtil;
import org.restcomm.protocols.ss7.sccp.impl.message.SccpConnSegmentableMessageImpl;
import org.restcomm.protocols.ss7.sccp.message.SccpConnMessage;
import org.restcomm.protocols.ss7.sccp.parameter.LocalReference;
import org.restcomm.protocols.ss7.sccp.parameter.ProtocolClass;
import org.restcomm.protocols.ss7.sccp.parameter.RefusalCause;
import org.restcomm.protocols.ss7.sccp.parameter.ReleaseCause;
import org.restcomm.protocols.ss7.sccp.parameter.ResetCause;
import org.restcomm.protocols.ss7.scheduler.Scheduler;
import org.restcomm.protocols.ss7.scheduler.Task;

abstract class SccpConnectionWithTransmitQueueImpl
extends SccpConnectionBaseImpl {
    private static final int SLEEP_DELAY = 15;
    private static final int OUTGOING_SIZE_LIMIT = 10000;
    private MessageSender messageSender;
    private final ConcurrentLinkedQueue<SccpConnMessage> outgoing = new ConcurrentLinkedQueue();

    public SccpConnectionWithTransmitQueueImpl(int sls, int localSsn, LocalReference localReference, ProtocolClass protocol, SccpStackImpl stack, SccpRoutingControl sccpRoutingControl) {
        super(sls, localSsn, localReference, protocol, stack, sccpRoutingControl);
        this.messageSender = new MessageSender(this.stack.scheduler);
    }

    @Override
    protected void sendMessage(SccpConnMessage message) throws Exception {
        if (this.stack.state != SccpStackImpl.State.RUNNING) {
            this.logger.error("Trying to send SCCP message from SCCP user but SCCP stack is not RUNNING");
            return;
        }
        if (!(message instanceof SccpConnSegmentableMessageImpl)) {
            super.sendMessage(message);
        } else {
            if (MessageUtil.getDln(message) == null) {
                this.logger.error(String.format("Message doesn't have DLN set: ", message));
                throw new IllegalStateException();
            }
            if (this.outgoing.size() > 10000) {
                this.logger.error(String.format("Outgoing messages queue overloaded, already reached the limit %d", 10000));
                throw new IllegalStateException(String.format("Outgoing messages queue overloaded, already reached the limit %d", 10000));
            }
            this.outgoing.add(message);
            this.messageSender.submit();
        }
    }

    @Override
    public void reset(ResetCause reason) throws Exception {
        super.reset(reason);
        this.clearTransmitQueue();
    }

    @Override
    public void disconnect(ReleaseCause reason, byte[] data) throws Exception {
        super.disconnect(reason, data);
        this.clearTransmitQueue();
    }

    @Override
    public void refuse(RefusalCause reason, byte[] data) throws Exception {
        super.refuse(reason, data);
        this.clearTransmitQueue();
    }

    protected int getTransmitQueueSize() {
        return this.outgoing.size();
    }

    private void clearTransmitQueue() {
        this.outgoing.clear();
    }

    private class MessageSender
    extends Task {
        public MessageSender(Scheduler scheduler) {
            super(scheduler);
        }

        public int getQueueNumber() {
            return Scheduler.L4WRITE_QUEUE;
        }

        public void submit() {
            this.scheduler.submit((Task)this, Integer.valueOf(this.getQueueNumber()));
        }

        public long perform() {
            while (!SccpConnectionWithTransmitQueueImpl.this.outgoing.isEmpty() && SccpConnectionWithTransmitQueueImpl.this.isCanSendData()) {
                SccpConnMessage message = (SccpConnMessage)SccpConnectionWithTransmitQueueImpl.this.outgoing.poll();
                if (SccpConnectionWithTransmitQueueImpl.this.logger.isDebugEnabled()) {
                    SccpConnectionWithTransmitQueueImpl.this.logger.debug("Polling another message from queue: " + message.toString());
                }
                try {
                    SccpConnectionWithTransmitQueueImpl.super.sendMessage(message);
                }
                catch (Exception e) {
                    SccpConnectionWithTransmitQueueImpl.this.logger.error("IOException when sending the message: " + e.getMessage(), e);
                    throw new RuntimeException(e);
                }
            }
            if (!SccpConnectionWithTransmitQueueImpl.this.outgoing.isEmpty()) {
                SccpConnectionWithTransmitQueueImpl.this.logger.debug("Queue not empty, retrying");
                try {
                    Thread.sleep(15L);
                }
                catch (InterruptedException e) {
                    SccpConnectionWithTransmitQueueImpl.this.logger.error(e);
                    throw new RuntimeException(e);
                }
                this.submit();
            }
            return 0L;
        }
    }
}

