package org.mr.kernel.services.queues;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mr.IMessageListener;
import org.mr.MantaAgent;
import org.mr.MantaAgentConstants;
import org.mr.MantaException;
import org.mr.core.protocol.MantaBusMessage;
import org.mr.core.protocol.MantaBusMessageConsts;
import org.mr.kernel.control.ControlSignal;
import org.mr.kernel.services.ServiceConsumer;

/* loaded from: input_file:org/mr/kernel/services/queues/QueueSubscriberManager.class */
/**
 * Tracks the local consumers subscribed to a single queue and keeps the
 * queue's master (coordinator) informed about them.
 *
 * <p>For every subscribed {@link ServiceConsumer} the manager remembers the
 * listener that should receive its messages and the "number of receives"
 * value supplied at subscription time. Whenever a queue master is known, a
 * control message describing the subscription (or its cancellation) is sent
 * to it through the {@link MantaAgent}.
 *
 * <p>All methods that read or modify the bookkeeping collections are
 * synchronized on this instance.
 */
public class QueueSubscriberManager {
    /** Shared logger; created once instead of being re-assigned by every constructor call. */
    private static final Log log = LogFactory.getLog("QueueSubscriberManager");

    /** The queue whose subscribers are managed by this instance. */
    QueueService queue;
    /** Subscribed consumers, in subscription order (holds ServiceConsumer). */
    ArrayList queueSubscribers = new ArrayList();
    /** ServiceConsumer -> IMessageListener registered for that consumer. */
    HashMap subscribersToListeners = new HashMap();
    /** ServiceConsumer -> number of receives, stored as a decimal String. */
    HashMap subscribersToNumberOfReceives = new HashMap();
    MantaAgent layer = MantaAgent.getInstance();

    /**
     * Creates a manager for the given queue.
     *
     * @param queueService the queue this manager serves
     */
    public QueueSubscriberManager(QueueService queueService) {
        this.queue = queueService;
    }

    /**
     * Records a consumer as subscribed and, if the queue master is already
     * known, announces the subscription to it.
     *
     * <p>Subscribing the same consumer again replaces its listener and
     * receive count but does not create a second entry in the subscriber
     * list, so one later unregister removes the consumer completely.
     *
     * @param serviceConsumer  the subscribing consumer
     * @param iMessageListener listener to register for the consumer
     * @param j                number of receives; forwarded to the queue
     *                         master as a decimal string
     * @throws MantaException if announcing the subscription fails
     */
    public synchronized void subscribeToQueue(ServiceConsumer serviceConsumer, IMessageListener iMessageListener, long j) throws MantaException {
        // Keep the list free of duplicates: the maps hold one entry per
        // consumer, so a duplicate list entry would outlive an unregister and
        // later be re-announced with a null listener.
        if (!this.queueSubscribers.contains(serviceConsumer)) {
            this.queueSubscribers.add(serviceConsumer);
        }
        this.subscribersToListeners.put(serviceConsumer, iMessageListener);
        this.subscribersToNumberOfReceives.put(serviceConsumer, String.valueOf(j));
        sendSubscriptionToCoordinator(this.queue.getQueueMaster(), serviceConsumer, iMessageListener);
    }

    /**
     * Forgets a consumer, unregisters its listener from the agent and, if a
     * queue master is known, sends it the matching control signal.
     *
     * @param serviceConsumer  the consumer to remove
     * @param iMessageListener the listener that was registered for it
     * @throws MantaException if unregistering or notifying the master fails
     */
    public synchronized void unregisterFromQueue(ServiceConsumer serviceConsumer, IMessageListener iMessageListener) throws MantaException {
        forget(serviceConsumer);
        this.layer.unsubscribeMessageListener(iMessageListener, listenerKey(serviceConsumer));
        QueueMaster queueMaster = this.queue.getQueueMaster();
        if (queueMaster == null) {
            // Nobody to notify; the local bookkeeping is already cleaned up.
            return;
        }
        // Operation code 5 is the signal this class sends when a consumer leaves.
        sendControlSignal(queueMaster, serviceConsumer, newControlSignal((byte) 5));
    }

    /**
     * Re-announces every currently known subscriber to the given queue
     * master. A failure for one subscriber is logged and does not stop the
     * remaining subscribers from being announced.
     *
     * @param queueMaster the queue master to announce the subscribers to
     */
    public synchronized void queueCoordinatorFound(QueueMaster queueMaster) {
        Iterator it = this.queueSubscribers.iterator();
        while (it.hasNext()) {
            ServiceConsumer serviceConsumer = (ServiceConsumer) it.next();
            try {
                sendSubscriptionToCoordinator(queueMaster, serviceConsumer, (IMessageListener) this.subscribersToListeners.get(serviceConsumer));
            } catch (MantaException e) {
                if (log.isErrorEnabled()) {
                    log.error("error sending subscribe to queue coordinator", e);
                }
            }
        }
    }

    /**
     * Sends a subscription control signal for one consumer to the queue
     * master and (re-)registers the consumer's listener with the agent.
     * Does nothing when {@code queueMaster} is {@code null}.
     */
    private void sendSubscriptionToCoordinator(QueueMaster queueMaster, ServiceConsumer serviceConsumer, IMessageListener iMessageListener) throws MantaException {
        if (queueMaster == null) {
            return;
        }
        // Operation code 4 is the signal this class sends when a consumer subscribes.
        ControlSignal controlSignal = newControlSignal((byte) 4);
        controlSignal.getParams().put(ControlSignal.NUMBER_OF_RECEIVE_ON_QUEUE_KEY, (String) this.subscribersToNumberOfReceives.get(serviceConsumer));

        // Unsubscribe first so that announcing the same consumer repeatedly
        // never leaves the listener registered more than once under this key.
        String key = listenerKey(serviceConsumer);
        this.layer.unsubscribeMessageListener(iMessageListener, key);
        this.layer.subscribeMessageListener(iMessageListener, key);

        sendControlSignal(queueMaster, serviceConsumer, controlSignal);
    }

    /**
     * Removes a consumer from the local bookkeeping only; no listener is
     * unregistered and no message is sent to the queue master.
     *
     * <p>Synchronized like every other method touching the collections, so it
     * cannot interleave with the iteration in
     * {@link #queueCoordinatorFound(QueueMaster)}.
     *
     * @param serviceConsumer the consumer to forget
     */
    public synchronized void removeSubscribeToQueue(ServiceConsumer serviceConsumer) {
        forget(serviceConsumer);
    }

    /** Drops every local record of the consumer. Caller must hold this instance's monitor. */
    private void forget(ServiceConsumer serviceConsumer) {
        this.queueSubscribers.remove(serviceConsumer);
        this.subscribersToListeners.remove(serviceConsumer);
        this.subscribersToNumberOfReceives.remove(serviceConsumer);
    }

    /** Builds the key a consumer's listener is registered under: queue service name followed by consumer id. */
    private String listenerKey(ServiceConsumer serviceConsumer) {
        return new StringBuffer().append(this.queue.getServiceName()).append(serviceConsumer.getId()).toString();
    }

    /** Creates a control signal with the given operation code, carrying this queue's service security key. */
    private ControlSignal newControlSignal(byte operation) throws MantaException {
        return new ControlSignal(operation, this.layer.getSingletonRepository().getServiceSecurityManager().getServiceSecurityKey(this.queue.getServiceName(), (byte) 1));
    }

    /** Wraps the control signal in a bus message addressed to the queue master and sends it on behalf of the consumer. */
    private void sendControlSignal(QueueMaster queueMaster, ServiceConsumer serviceConsumer, ControlSignal controlSignal) throws MantaException {
        MantaBusMessage mantaBusMessage = MantaBusMessage.getInstance();
        mantaBusMessage.setMessageType((byte) 1);
        mantaBusMessage.setPayload(controlSignal);
        mantaBusMessage.setRecipient(queueMaster);
        mantaBusMessage.addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION, this.queue.getServiceName());
        this.layer.send(mantaBusMessage, serviceConsumer, (byte) 1, (byte) 0, MantaAgentConstants.CONTROL_MESSAGES_TTL);
    }
}
