package org.mr.api.blocks;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import org.apache.commons.logging.LogFactory;
import org.mr.api.jms.MantaTopicConnectionFactory;
import org.mr.core.util.Stage;
import org.mr.core.util.StageHandler;
import org.mr.core.util.StageParams;

/* loaded from: input_file:org/mr/api/blocks/ScalableDispatcher.class */
public class ScalableDispatcher implements StageHandler, MessageListener {
    private boolean distributed;
    private Stage stage;
    private TopicSession sendSession;
    private TopicSession receiveSession;
    private MessageConsumer consumer = null;
    private MessageProducer producer = null;
    private List handlers = new ArrayList();
    private Object handlerSync = new Object();
    private String dispatcherName;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ScalableDispatcher(String str, boolean z) {
        this.stage = null;
        this.sendSession = null;
        this.receiveSession = null;
        this.distributed = z;
        this.dispatcherName = str;
        if (!z) {
            StageParams stageParams = new StageParams();
            stageParams.setBlocking(false);
            stageParams.setHandler(this);
            stageParams.setMaxNumberOfThreads(1);
            stageParams.setNumberOfStartThreads(1);
            stageParams.setStageName(str);
            stageParams.setPersistent(false);
            this.stage = new Stage(stageParams);
            return;
        }
        try {
            TopicConnection createTopicConnection = new MantaTopicConnectionFactory().createTopicConnection();
            createTopicConnection.start();
            this.sendSession = createTopicConnection.createTopicSession(false, 1);
            this.receiveSession = createTopicConnection.createTopicSession(false, 1);
        } catch (JMSException e) {
            if (LogFactory.getLog("ScalableDispatcher").isErrorEnabled()) {
                LogFactory.getLog("ScalableDispatcher").error("Problem while init distributed Dispatcher.", e);
            }
        }
    }

    public synchronized void dispatch(Serializable serializable) {
        if (!this.distributed) {
            this.stage.enqueue(serializable);
            return;
        }
        try {
            if (this.producer == null) {
                this.producer = this.sendSession.createProducer(this.sendSession.createTopic(this.dispatcherName));
            }
            ObjectMessage createObjectMessage = this.sendSession.createObjectMessage();
            createObjectMessage.setObject(serializable);
            this.producer.send(createObjectMessage, 1, 4, 100000L);
        } catch (JMSException e) {
            if (LogFactory.getLog("ScalableDispatcher").isErrorEnabled()) {
                LogFactory.getLog("ScalableDispatcher").error("Problem while dispatching distributed event.", e);
            }
        }
    }

    @Override // org.mr.core.util.StageHandler
    public boolean handle(Object obj) {
        synchronized (this.handlerSync) {
            if (this.handlers.size() == 0) {
                try {
                    this.handlerSync.wait();
                } catch (InterruptedException e) {
                    if (LogFactory.getLog("ScalableDispatcher").isErrorEnabled()) {
                        LogFactory.getLog("ScalableDispatcher").error("Problem while waiting for handlers.", e);
                    }
                }
            }
            Iterator it = this.handlers.iterator();
            while (it.hasNext()) {
                ((ScalableHandler) it.next()).handle(obj);
            }
        }
        return true;
    }

    public List getHandlers() {
        return this.handlers;
    }

    public void addHandler(ScalableHandler scalableHandler) {
        if (scalableHandler == null) {
            return;
        }
        synchronized (this.handlerSync) {
            if (this.distributed && this.handlers.size() == 0) {
                try {
                    if (this.consumer == null) {
                        this.consumer = this.receiveSession.createConsumer(this.receiveSession.createTopic(this.dispatcherName));
                    }
                    this.consumer.setMessageListener(this);
                } catch (JMSException e) {
                    if (LogFactory.getLog("ScalableDispatcher").isErrorEnabled()) {
                        LogFactory.getLog("ScalableDispatcher").error("Problem creating the JMS objects.", e);
                    }
                }
            }
            this.handlers.add(scalableHandler);
            this.handlerSync.notifyAll();
        }
    }

    public void removeHandler(StageHandler stageHandler) {
        synchronized (this.handlerSync) {
            this.handlers.remove(stageHandler);
            if (this.handlers.size() == 0) {
                try {
                    this.consumer.setMessageListener((MessageListener) null);
                } catch (JMSException e) {
                    if (LogFactory.getLog("ScalableDispatcher").isErrorEnabled()) {
                        LogFactory.getLog("ScalableDispatcher").error("Problem removing JMS listener.", e);
                    }
                }
            }
        }
    }

    public void onMessage(Message message) {
        try {
            handle(((ObjectMessage) message).getObject());
        } catch (JMSException e) {
            if (LogFactory.getLog("ScalableDispatcher").isErrorEnabled()) {
                LogFactory.getLog("ScalableDispatcher").error("Problem handeling new message.", e);
            }
        }
    }
}
