package org.mr.kernel.services.topics;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.mr.MantaAgent;
import org.mr.core.persistent.PersistentMap;
import org.mr.core.protocol.MantaBusMessage;
import org.mr.core.protocol.MantaBusMessageConsts;
import org.mr.kernel.delivery.PostOffice;
import org.mr.kernel.services.MantaService;
import org.mr.kernel.services.PayLoadSelector;
import org.mr.kernel.services.ServiceActorControlCenter;
import org.mr.kernel.services.ServiceConsumer;
import org.mr.kernel.services.ServiceProducer;

/* loaded from: input_file:org/mr/kernel/services/topics/TopicService.class */
/**
 * A {@link MantaService} whose {@link #getServiceType() service type} is {@code 2}.
 *
 * <p>Consumers are tracked twice: in the collections inherited from
 * {@link MantaService} and in a {@link PersistentMap} named
 * {@code <serviceName>_subscribers}. {@link #publish} sends a message to every
 * registered consumer whose selector accepts it.
 */
public class TopicService extends MantaService {
    /** Consumers keyed by consumer id, stored under {@code <serviceName>_subscribers}. */
    PersistentMap subscribers;

    /**
     * Creates the service and re-registers every consumer already present in
     * the {@code <str>_subscribers} map.
     *
     * @param str the service name; also the prefix of the subscribers map name
     */
    public TopicService(String str) {
        super(str);
        // NOTE(review): meaning of the (false, true) flags is defined by PersistentMap - confirm there.
        this.subscribers = new PersistentMap(str + "_subscribers", false, true);

        // Copy the stored values while holding the map's monitor, then register
        // them outside the lock.
        final List restored = new ArrayList();
        synchronized (this.subscribers) {
            restored.addAll(this.subscribers.values());
        }

        for (Object entry : restored) {
            final ServiceConsumer stored = (ServiceConsumer) entry;
            this.consumers.add(stored);
            this.serviceActorMap.put(stored.getId(), stored);
        }
    }

    /**
     * Records the consumer in {@link #subscribers} under its id, then performs
     * the inherited registration.
     *
     * @param serviceConsumer the consumer to add
     */
    @Override
    public void addConsumer(ServiceConsumer serviceConsumer) {
        // The consumer's durable flag is handed to the map as the third argument;
        // presumably it decides whether the entry is persisted - TODO confirm.
        this.subscribers.put(
                serviceConsumer.getId(),
                serviceConsumer,
                serviceConsumer.isDurable());
        super.addConsumer(serviceConsumer);
    }

    /**
     * Removes a consumer. A non-durable consumer is dropped from
     * {@link #subscribers} and from the inherited bookkeeping; a durable one is
     * only passed to {@link ServiceActorControlCenter#removeUpConsumer} and
     * stays registered here (see {@link #removeDurableConsumer}).
     *
     * @param serviceConsumer the consumer to remove
     */
    @Override
    public void removeConsumer(ServiceConsumer serviceConsumer) {
        if (!serviceConsumer.isDurable()) {
            this.subscribers.remove(serviceConsumer.getId());
            super.removeConsumer(serviceConsumer);
            return;
        }
        ServiceActorControlCenter.removeUpConsumer(serviceConsumer);
    }

    /**
     * @return the constant {@code 2}
     */
    @Override
    public byte getServiceType() {
        // NOTE(review): presumably the "topic" type code - confirm against MantaService.
        return 2;
    }

    /**
     * Sends the message to each registered consumer accepted by the payload
     * selector. When no selector is found for the message's payload type, every
     * consumer is accepted.
     *
     * @param mantaBusMessage the message to publish
     * @param serviceProducer the producer passed through to {@code MantaAgent.send}
     * @param b               passed through to {@code MantaAgent.send}; replaced by
     *                        {@code 2} when {@code getPersistentMode()} is {@code 2}
     * @param b2              passed through to {@code MantaAgent.send}
     * @param j               passed through to {@code MantaAgent.send}
     * @throws IOException declared by this method; SOURCE does not show which call raises it
     */
    public void publish(MantaBusMessage mantaBusMessage, ServiceProducer serviceProducer, byte b, byte b2, long j) throws IOException {
        final List targets = getConsumers();
        final int targetCount = targets.size();
        if (targetCount == 0) {
            return;
        }

        final PayLoadSelector selector = MantaAgent.getInstance()
                .getSingletonRepository()
                .getSelectorsManager()
                .getSelector(mantaBusMessage.getHeader(MantaBusMessageConsts.HEADER_NAME_PAYLOAD_TYPE));

        // The bound is the size captured above, not the live size of the list.
        for (int index = 0; index < targetCount; index++) {
            final ServiceConsumer target = (ServiceConsumer) targets.get(index);
            if (selector != null && !selector.accept(target.getSelectorStatment(), mantaBusMessage)) {
                continue;
            }

            // With exactly one registered consumer the original message is sent
            // as-is; otherwise each recipient receives its own shallow copy.
            final MantaBusMessage outgoing;
            if (targetCount == 1) {
                outgoing = mantaBusMessage;
            } else {
                outgoing = PostOffice.prepareMessageShallowCopy(mantaBusMessage);
            }

            outgoing.setRecipient(target);
            outgoing.addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION, getServiceName());
            if (getPersistentMode() == 2) {
                // Once overridden, the value stays 2 for all remaining consumers.
                b = 2;
            }
            MantaAgent.getInstance().send(outgoing, serviceProducer, b, b2, j);
        }
    }

    /**
     * Removes the producer and, when no producers remain for the local agent's
     * name, removes this topic from the virtual topic manager.
     *
     * @param serviceProducer the producer to remove
     */
    @Override
    public void removeProducer(ServiceProducer serviceProducer) {
        super.removeProducer(serviceProducer);

        final MantaAgent agent = MantaAgent.getInstance();
        if (getProducersByAgentId(agent.getAgentName()).size() != 0) {
            return;
        }
        agent.getSingletonRepository().getVirtualTopicManager().removeTopic(this);
    }

    /**
     * Fully removes a consumer without consulting its durable flag: drops it
     * from {@link #subscribers} and the inherited bookkeeping, passes it to
     * {@link ServiceActorControlCenter#removeUpConsumer}, and closes its post
     * office box.
     *
     * @param serviceConsumer the consumer to remove
     */
    public void removeDurableConsumer(ServiceConsumer serviceConsumer) {
        this.subscribers.remove(serviceConsumer.getId());
        // Call the parent implementation directly: this class's removeConsumer
        // override would skip the removal for a durable consumer.
        super.removeConsumer(serviceConsumer);
        ServiceActorControlCenter.removeUpConsumer(serviceConsumer);
        MantaAgent.getInstance()
                .getSingletonRepository()
                .getPostOffice()
                .closeBox(serviceConsumer);
    }
}
