/*
 * Decompiled with CFR 0.152.
 */
package com.sproutsocial.nsq;

import com.sproutsocial.nsq.BackoffHandler;
import com.sproutsocial.nsq.BasePubSub;
import com.sproutsocial.nsq.Client;
import com.sproutsocial.nsq.FailedMessageHandler;
import com.sproutsocial.nsq.HostAndPort;
import com.sproutsocial.nsq.Message;
import com.sproutsocial.nsq.MessageDataHandler;
import com.sproutsocial.nsq.MessageHandler;
import com.sproutsocial.nsq.Subscription;
import com.sproutsocial.nsq.Util;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class Subscriber
extends BasePubSub {
    private final List<HostAndPort> lookups = new ArrayList<HostAndPort>();
    private final List<Subscription> subscriptions = new ArrayList<Subscription>();
    private final int lookupIntervalSecs;
    private int maxLookupFailuresBeforeError;
    private int defaultMaxInFlight = 200;
    private int maxFlushDelayMillis = 2000;
    private int maxAttempts = Integer.MAX_VALUE;
    private FailedMessageHandler failedMessageHandler = null;
    private final Map<String, Integer> failures = new HashMap<String, Integer>();
    private static final int DEFAULT_LOOKUP_INTERVAL_SECS = 60;
    private static final int DEFAULT_MAX_LOOKUP_FAILURES_BEFORE_ERROR = 5;
    private static final Logger logger = LoggerFactory.getLogger(Subscriber.class);

    public Subscriber(Client client, int lookupIntervalSecs, int maxLookupFailuresBeforeError, String ... lookupHosts) {
        super(client);
        Util.checkArgument(lookupIntervalSecs > 0);
        this.lookupIntervalSecs = lookupIntervalSecs;
        this.maxLookupFailuresBeforeError = maxLookupFailuresBeforeError;
        client.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                Subscriber.this.lookup();
            }
        }, lookupIntervalSecs * 1000, lookupIntervalSecs * 1000, true);
        for (String h : lookupHosts) {
            this.lookups.add(HostAndPort.fromString(h).withDefaultPort(4161));
        }
    }

    public Subscriber(int lookupIntervalSecs, String ... lookupHosts) {
        this(Client.getDefaultClient(), lookupIntervalSecs, 5, lookupHosts);
    }

    public Subscriber(String ... lookupHosts) {
        this(Client.getDefaultClient(), 60, 5, lookupHosts);
    }

    public synchronized void subscribe(String topic, String channel, MessageHandler handler) {
        this.subscribe(topic, channel, this.defaultMaxInFlight, handler);
    }

    public synchronized void subscribe(String topic, String channel, final MessageDataHandler handler) {
        this.subscribe(topic, channel, this.defaultMaxInFlight, new BackoffHandler(new MessageHandler(){

            @Override
            public void accept(Message msg) {
                handler.accept(msg.getData());
            }
        }));
    }

    public synchronized void subscribe(String topic, String channel, int maxInFlight, MessageHandler handler) {
        Util.checkNotNull(topic);
        Util.checkNotNull(channel);
        Util.checkNotNull(handler);
        this.client.addSubscriber(this);
        Subscription sub = new Subscription(this.client, topic, channel, handler, this, maxInFlight);
        if (handler instanceof BackoffHandler) {
            ((BackoffHandler)handler).setSubscription(sub);
        }
        this.subscriptions.add(sub);
        sub.checkConnections(this.lookupTopic(topic));
    }

    public synchronized void setMaxInFlight(String topic, String channel, int maxInFlight) {
        for (Subscription sub : this.subscriptions) {
            if (!sub.getTopic().equals(topic) || !sub.getChannel().equals(channel)) continue;
            sub.setMaxInFlight(maxInFlight);
        }
    }

    private synchronized void lookup() {
        if (this.isStopping) {
            return;
        }
        for (Subscription sub : this.subscriptions) {
            sub.checkConnections(this.lookupTopic(sub.getTopic()));
        }
    }

    /*
     * Exception decompiling
     */
    @GuardedBy(value="this")
    protected Set<HostAndPort> lookupTopic(String topic) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [3[CATCHBLOCK]], but top level block is 2[TRYBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    @Override
    public void stop() {
        super.stop();
        for (Subscription subscription : this.subscriptions) {
            subscription.stop();
        }
        logger.info("subscriber stopped");
    }

    public synchronized int getDefaultMaxInFlight() {
        return this.defaultMaxInFlight;
    }

    public synchronized void setDefaultMaxInFlight(int defaultMaxInFlight) {
        this.defaultMaxInFlight = defaultMaxInFlight;
    }

    public synchronized int getMaxFlushDelayMillis() {
        return this.maxFlushDelayMillis;
    }

    public synchronized void setMaxFlushDelayMillis(int maxFlushDelayMillis) {
        this.maxFlushDelayMillis = maxFlushDelayMillis;
    }

    public synchronized int getMaxAttempts() {
        return this.maxAttempts;
    }

    public synchronized void setMaxAttempts(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    public synchronized FailedMessageHandler getFailedMessageHandler() {
        return this.failedMessageHandler;
    }

    public synchronized void setFailedMessageHandler(FailedMessageHandler failedMessageHandler) {
        this.failedMessageHandler = failedMessageHandler;
    }

    public synchronized int getLookupIntervalSecs() {
        return this.lookupIntervalSecs;
    }

    public Integer getExecutorQueueSize() {
        ExecutorService executor = this.client.getExecutor();
        return executor instanceof ThreadPoolExecutor ? Integer.valueOf(((ThreadPoolExecutor)executor).getQueue().size()) : null;
    }

    public synchronized int getConnectionCount() {
        int count = 0;
        for (Subscription subscription : this.subscriptions) {
            count += subscription.getConnectionCount();
        }
        return count;
    }
}

