package org.mr.kernel.services.queues;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mr.MantaAgent;
import org.mr.MantaAgentConstants;
import org.mr.core.protocol.DeadEndRecipient;
import org.mr.core.protocol.MantaBusMessage;
import org.mr.core.protocol.MantaBusMessageConsts;
import org.mr.core.util.byteable.ByteableList;
import org.mr.kernel.control.ControlSignal;
import org.mr.kernel.services.ServiceConsumer;
import org.mr.kernel.services.ServiceProducer;

/* loaded from: input_file:org/mr/kernel/services/queues/VirtualQueuesManager.class */
/**
 * Registry of the {@link QueueService} instances known to this agent, keyed by
 * service name, plus the entry points that route receiver registration,
 * queue-copy and enqueue requests to the matching service.
 *
 * <p>The registry is a {@code Collections.synchronizedMap}, so individual map
 * operations are synchronized; compound check-then-act sequences performed by
 * callers are not.
 */
public class VirtualQueuesManager {
    /** Header name constant; not referenced inside this class. */
    public static final String ENQUEUE_ACK_HEADER = "enqueue_ack";
    /** Response code passed to the enqueue response when {@code enqueue} returned {@code true}. */
    public static final byte ENQUEUE_OK = 1;
    /** Response code used when no queue service is registered here or it is not the queue master. */
    public static final byte NOT_MASTER = 2;
    /** Response code used when {@code enqueue} returned {@code false} or the attempt threw. */
    public static final byte ENQUEUE_FAIL = 3;
    /** Response code constant; only mentioned (as the literal 4) in the retransmit log message here. */
    public static final byte RETRANSMIT = 4;
    /** Header added to an enqueued message carrying the id of its original producer. */
    public static final String ORIGINAL_MESSAGE_PRUDUCER = "msg_org";

    /** Queue services by service name (String -> QueueService). */
    private final Map virtualQueueServicesMap = Collections.synchronizedMap(new HashMap(101));

    /** Logger for this manager; kept public and non-final because it is part of the existing interface. */
    public Log log;

    /** Creates an empty manager and obtains the {@code "VirtualQueuesManager"} logger. */
    public VirtualQueuesManager() {
        this.log = LogFactory.getLog("VirtualQueuesManager");
    }

    /**
     * Looks up a queue service by name.
     *
     * @param str the service (queue) name
     * @return the registered service, or {@code null} if none is registered under that name
     */
    public QueueService getQueueService(String str) {
        return (QueueService) this.virtualQueueServicesMap.get(str);
    }

    /**
     * @return a new list holding a snapshot of all registered queue services;
     *         changes to the returned list do not affect the registry
     */
    public List getQueueServices() {
        return new ArrayList(this.virtualQueueServicesMap.values());
    }

    /**
     * Forwards a receiver registration to the queue service named by the consumer.
     * If no such service is registered, an error is logged and nothing else happens.
     *
     * @param serviceConsumer the consumer to register; its service name selects the queue
     * @param j               passed through unchanged to the queue service
     */
    public void registerReceiverToQueue(ServiceConsumer serviceConsumer, long j) {
        QueueService queueService = getQueueService(serviceConsumer.getServiceName());
        if (queueService == null) {
            if (this.log.isErrorEnabled()) {
                this.log.error("Got receive request on queue that this agent is not a producer of - queue = "
                        + serviceConsumer.getServiceName());
            }
            return;
        }
        queueService.active();
        queueService.registerReceiverToQueue(serviceConsumer, j);
    }

    /**
     * Forwards a receiver un-registration to the queue service named by the consumer.
     * If no such service is registered, an error is logged and nothing else happens.
     *
     * @param serviceConsumer the consumer to unregister; its service name selects the queue
     */
    public void unregisterReceiverToQueue(ServiceConsumer serviceConsumer) {
        QueueService queueService = getQueueService(serviceConsumer.getServiceName());
        if (queueService == null) {
            if (this.log.isErrorEnabled()) {
                this.log.error("Got un-register request on queue that this agent is not a producer of - queue = "
                        + serviceConsumer.getServiceName());
            }
            return;
        }
        queueService.active();
        queueService.unregisterReceiverToQueue(serviceConsumer);
    }

    /**
     * Asks the queue service named by the consumer to send it a copy of the queue.
     *
     * <p>If no such service is registered, an error is logged and a message whose
     * payload is an empty {@link ByteableList} is handed to a new
     * {@link QueueReceiver} built for the consumer, so the requester still gets an answer.
     *
     * @param serviceConsumer the consumer requesting the copy
     */
    public void sendQueueCopy(ServiceConsumer serviceConsumer) {
        QueueService queueService = getQueueService(serviceConsumer.getServiceName());
        if (queueService != null) {
            queueService.active();
            queueService.sendQueueCopy(serviceConsumer);
            return;
        }
        if (this.log.isErrorEnabled()) {
            this.log.error("Got sendQueueCopy request on queue that this agent is not a producer of  - queue = "
                    + serviceConsumer.getServiceName());
        }
        // Unknown queue: answer with an empty payload instead of leaving the request unanswered.
        ByteableList emptyCopy = new ByteableList();
        QueueReceiver receiver = new QueueReceiver(serviceConsumer, 0L);
        MantaBusMessage reply = MantaBusMessage.getInstance();
        reply.setPayload(emptyCopy);
        reply.setPriority((byte) 0);
        reply.setDeliveryMode((byte) 1);
        reply.setMessageType((byte) 2);
        receiver.receive(reply);
    }

    /** @return the number of registered queue services */
    public int getNumberOfQueueServices() {
        return this.virtualQueueServicesMap.size();
    }

    /**
     * @param str the service (queue) name
     * @return {@code true} if a queue service is registered under that name
     */
    public boolean hasQueueService(String str) {
        return this.virtualQueueServicesMap.containsKey(str);
    }

    /**
     * Registers a queue service under its own service name, replacing any
     * service previously registered under the same name.
     *
     * @param queueService the service to register
     */
    public void addNewQueueServiceToMap(QueueService queueService) {
        this.virtualQueueServicesMap.put(queueService.getServiceName(), queueService);
    }

    /**
     * Tries to enqueue a message on the queue named by the producer and sends
     * exactly one response back.
     *
     * <p>The response code is {@link #NOT_MASTER} when no queue service is
     * registered or it reports it is not the queue master, {@link #ENQUEUE_OK}
     * when {@code enqueue} returns {@code true}, and {@link #ENQUEUE_FAIL}
     * otherwise, including when any step of the attempt throws (the throwable
     * is logged, not propagated).
     *
     * @param str             appended to the service name to form the logical destination of the response
     * @param serviceProducer the producer that issued the enqueue; selects the queue and the response recipient
     * @param queueMaster     passed to the agent's {@code send} call for the response
     * @param mantaBusMessage the message to enqueue; its source is replaced by the queue master on success path
     * @param str2            value for the response's ack-response-reference header
     */
    public void enqueueMessageToQueue(String str, ServiceProducer serviceProducer, QueueMaster queueMaster, MantaBusMessage mantaBusMessage, String str2) {
        // Start pessimistic so that any throwable below results in a failure response.
        byte responseCode = ENQUEUE_FAIL;
        try {
            QueueService queueService = getQueueService(serviceProducer.getServiceName());
            if (queueService == null || !queueService.amIQueueMaster()) {
                responseCode = NOT_MASTER;
            } else {
                // Remember who really produced the message before the source is overwritten.
                mantaBusMessage.addHeader(ORIGINAL_MESSAGE_PRUDUCER, ((ServiceProducer) mantaBusMessage.getSource()).getId());
                mantaBusMessage.setSource(queueService.getQueueMaster());
                // The flag is set when either the message's delivery mode or the queue's
                // persistent mode equals 2 (presumably "persistent" - confirm against the constants).
                boolean persist = mantaBusMessage.getDeliveryMode() == 2;
                if (queueService.getPersistentMode() == 2) {
                    persist = true;
                }
                queueService.active();
                if (queueService.enqueue(mantaBusMessage, persist)) {
                    responseCode = ENQUEUE_OK;
                }
            }
        } catch (Throwable th) {
            // Deliberately broad: the producer must still receive a response.
            if (this.log.isErrorEnabled()) {
                this.log.error("Cound not enqueue message. ", th);
            }
        }
        sendEnqueueResp(str, serviceProducer, queueMaster, mantaBusMessage, responseCode, str2);
    }

    /**
     * Sends a response for a retransmitted enqueue message without enqueuing it again.
     *
     * <p>The response references the incoming message's id and is sent to the
     * incoming message's recipient.
     *
     * @param mantaBusMessage the retransmitted message; its source must be a {@link ServiceProducer}
     * @param controlSignal   its control id is appended to the service name to form the logical destination
     */
    public void sendRespWithoutEnqueue(MantaBusMessage mantaBusMessage, ControlSignal controlSignal) {
        if (this.log.isInfoEnabled()) {
            this.log.info("Got retransmit of enqueue messages responded with code 4 message was " + mantaBusMessage + ".");
        }
        ServiceProducer producer = (ServiceProducer) mantaBusMessage.getSource();
        MantaBusMessage response = newResponseFor(producer);
        response.addHeader(MantaBusMessageConsts.HEADER_NAME_ACK_RESPONSE_REFERENCE, mantaBusMessage.getMessageId());
        response.addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION, producer.getServiceName() + controlSignal.getControlId());
        MantaAgent.getInstance().send(response, mantaBusMessage.getRecipient(), (byte) 1, (byte) 0, MantaAgentConstants.CONTROL_MESSAGES_TTL);
    }

    /**
     * Builds and sends the response to an enqueue request.
     *
     * <p>NOTE(review): the response code {@code b} is only written to the debug
     * log here; it is not added to the outgoing message in this method, and
     * {@link #ENQUEUE_ACK_HEADER} is unused in this class. Confirm whether the
     * code is meant to travel in that header.
     *
     * @param str             appended to the service name to form the logical destination
     * @param serviceProducer the producer being answered
     * @param queueMaster     passed to the agent's {@code send} call
     * @param mantaBusMessage the message the response refers to (logged only)
     * @param b               the response code (logged only)
     * @param str2            value for the ack-response-reference header
     */
    private void sendEnqueueResp(String str, ServiceProducer serviceProducer, QueueMaster queueMaster, MantaBusMessage mantaBusMessage, byte b, String str2) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Got message enqueue and responded with code " + b + " message was " + mantaBusMessage + ".");
        }
        MantaBusMessage response = newResponseFor(serviceProducer);
        response.addHeader(MantaBusMessageConsts.HEADER_NAME_ACK_RESPONSE_REFERENCE, str2);
        response.addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION, serviceProducer.getServiceName() + str);
        MantaAgent.getInstance().send(response, queueMaster, (byte) 1, (byte) 0, MantaAgentConstants.CONTROL_MESSAGES_TTL);
    }

    /**
     * Creates a response message (message type 2, delivery mode 1) addressed to a
     * dead-end recipient built from the producer's agent and domain names.
     * Callers add the headers and send it.
     */
    private MantaBusMessage newResponseFor(ServiceProducer producer) {
        MantaBusMessage response = MantaBusMessage.getInstance();
        response.setMessageType((byte) 2);
        response.setDeliveryMode((byte) 1);
        response.setRecipient(DeadEndRecipient.createDeadEndRecipient(producer.getAgentName(), producer.getDomainName()));
        return response;
    }
}
