package cgl.narada.service.reliable.impl;

import cgl.narada.event.EventID;
import cgl.narada.event.NBEvent;
import cgl.narada.event.impl.NBEventGenerator;
import cgl.narada.matching.Profile;
import cgl.narada.service.ServiceException;
import cgl.narada.service.client.ClientService;
import cgl.narada.service.client.EventConsumer;
import cgl.narada.service.client.EventProducer;
import cgl.narada.service.client.NBEventListener;
import cgl.narada.service.client.SessionService;
import cgl.narada.service.qos.QosEvent;
import cgl.narada.service.reliable.events.RdAckInvoiceEntityEvent;
import cgl.narada.service.reliable.events.RdAckResponseServiceInvoiceEvent;
import cgl.narada.service.reliable.events.RdArchivalServiceNotification;
import cgl.narada.service.reliable.events.RdNakInvoiceEntityEvent;
import cgl.narada.service.reliable.events.RdProfileUpdateRequest;
import cgl.narada.service.reliable.events.RdProfileUpdateResponse;
import cgl.narada.service.reliable.events.RdRecoveryEntityRequest;
import cgl.narada.service.reliable.events.RdRecoveryResponse;
import cgl.narada.service.reliable.events.RdRetransmissionServiceEvent;
import java.util.Hashtable;
import java.util.Vector;

/* loaded from: input_file:cgl/narada/service/reliable/impl/SubscriberPercolator.class */
/**
 * Subscriber-side bookkeeping for the reliable delivery service.
 *
 * <p>An instance subscribes to the entity-specific topics
 * {@code <entityId>/Invoice} and {@code <entityId>/ProfileManagement}
 * (plus {@code <entityId>/Recovery/Subscriber} once a recovery request is
 * issued), forwards reliable-delivery events it receives to an
 * {@link RdCommunicationsMultiplexerImpl}, and keeps, per template id, the
 * current sync point and the sequence numbers queued for acknowledgement.
 *
 * <p>NOTE(review): the check-then-act sequences on the tables below are not
 * atomic even though {@link Hashtable} itself is synchronized; confirm the
 * threading model of the callers before relying on concurrent use.
 */
public class SubscriberPercolator implements NBEventListener, RdDebugFlags {
    /** Event type reported in {@link #onEvent} as a "normal event". */
    private static final int NORMAL_EVENT_TYPE = 0;

    /**
     * Event type that {@link #onEvent} routes to the multiplexer and that is
     * stamped on every event this class publishes.
     */
    private static final int RELIABLE_DELIVERY_EVENT_TYPE = 1;

    /**
     * First argument handed to {@code createProfile} and {@code generateEvent}.
     * Its meaning is defined by those APIs (presumably the string-topic
     * template type -- TODO confirm).
     */
    private static final int PROFILE_TYPE = 1;

    private final int entityId;
    private final RdEntityActuatorImpl entityActuator;
    private ClientService clientService;
    private EventProducer producer;
    private EventConsumer consumer;
    private final InvoicerThread invoicer;
    private final String moduleName = "SubscriberPercolator: ";
    private final String rdsString = "ReliableDeliveryService/";

    /** Events received but not yet released, keyed by event id. */
    private final Hashtable<EventID, QosEvent> receivedEvents = new Hashtable<EventID, QosEvent>();

    /** Current sync point per template id. */
    private final Hashtable<Integer, Long> templatesAndSyncs = new Hashtable<Integer, Long>();

    /** Sequence numbers queued for acknowledgement per template id. */
    private final Hashtable<Integer, Vector<Long>> templatesAndAcks = new Hashtable<Integer, Vector<Long>>();

    /** Not referenced anywhere in this class; kept as declared. */
    private final Hashtable<Object, Object> suppressAcksDuringRecovery = new Hashtable<Object, Object>();

    private final RdCommunicationsMultiplexerImpl rdsMultiplexer = new RdCommunicationsMultiplexerImpl();

    /**
     * Registers this percolator with its multiplexer, sets up the entity
     * subscriptions and starts the invoicer thread.
     *
     * @param i the entity id this percolator works for
     * @param rdEntityActuatorImpl actuator notified of released events, sync
     *        advances, retransmissions, recovery and profile responses
     * @throws ServiceException if the client service for the entity has not
     *         been initialized or a subscription cannot be set up
     */
    public SubscriberPercolator(int i, RdEntityActuatorImpl rdEntityActuatorImpl) throws ServiceException {
        this.entityId = i;
        this.entityActuator = rdEntityActuatorImpl;
        // Registration happens before subscribing so that events arriving
        // right after subscription can already be routed back to this object.
        this.rdsMultiplexer.registerSubscriberPercolator(this);
        initializeSubscriptions();
        this.invoicer = new InvoicerThread(i, this.producer);
        this.invoicer.start();
    }

    /**
     * Obtains the client service for the entity, subscribes to the Invoice and
     * ProfileManagement topics and creates the event producer.
     *
     * @throws ServiceException if the client service is not initialized
     */
    private void initializeSubscriptions() throws ServiceException {
        this.clientService = SessionService.getClientService(this.entityId);
        if (!this.clientService.isInitialized()) {
            throw new ServiceException(this.moduleName + "Client service for entity=" + this.entityId
                    + " should have been previously initialized");
        }
        Profile invoiceProfile = this.clientService.createProfile(PROFILE_TYPE, this.entityId + "/Invoice");
        this.consumer = this.clientService.createEventConsumer(this);
        this.consumer.subscribeTo(invoiceProfile);
        this.consumer.subscribeTo(
                this.clientService.createProfile(PROFILE_TYPE, this.entityId + "/ProfileManagement"));
        this.producer = this.clientService.createEventProducer();
    }

    /**
     * Asks the invoicer thread to terminate.
     *
     * @throws ServiceException propagated from the invoicer
     */
    public void terminateService() throws ServiceException {
        this.invoicer.terminateService();
    }

    /**
     * Subscribes to the entity's recovery topic and hands the recovery request
     * to the invoicer, first for tracking and then for issuing.
     *
     * @param rdRecoveryEntityRequest the recovery request to issue
     * @throws ServiceException if subscribing or issuing fails
     */
    public void issueRecoveryRequest(RdRecoveryEntityRequest rdRecoveryEntityRequest) throws ServiceException {
        this.consumer.subscribeTo(
                this.clientService.createProfile(PROFILE_TYPE, this.entityId + "/Recovery/Subscriber"));
        System.out.println(this.moduleName + "Will issue recovery request with " + "recovery identifier = "
                + rdRecoveryEntityRequest.getRecoveryRequestId() + "\n\n\n");
        this.invoicer.manageRecoveryRequest(rdRecoveryEntityRequest);
        this.invoicer.issueRecoveryRequest(rdRecoveryEntityRequest);
    }

    /**
     * Processes an acknowledgement response: ignores it when it carries a sync
     * point older than the recorded one, advances the sync point when it is
     * newer, and forwards any missed-sequence information to the invoicer.
     *
     * <p>A template that has no recorded state yet is initialised with sync
     * point 0 and an empty ack set, matching
     * {@link #processArchivalNotification}, instead of failing with a
     * {@code NullPointerException}.
     *
     * @param rdAckResponseServiceInvoiceEvent the acknowledgement response
     * @throws ServiceException propagated from the invoicer
     */
    public void processAckResponseEvent(RdAckResponseServiceInvoiceEvent rdAckResponseServiceInvoiceEvent) throws ServiceException {
        int templateId = rdAckResponseServiceInvoiceEvent.getTemplateId();
        long syncpoint = rdAckResponseServiceInvoiceEvent.getSyncpoint();
        Integer templateKey = Integer.valueOf(templateId);
        ensureTemplateState(templateKey);

        long currentSync = this.templatesAndSyncs.get(templateKey).longValue();
        if (currentSync > syncpoint) {
            // Stale response: our sync point is already further along.
            return;
        }
        if (currentSync < syncpoint) {
            processSyncAdvance(templateId, syncpoint);
        }
        if (!rdAckResponseServiceInvoiceEvent.containsMissedSequenceInfo()) {
            return;
        }
        if (rdAckResponseServiceInvoiceEvent.containsSingleSequenceInfo()) {
            this.invoicer.processMissedSequence(templateId, rdAckResponseServiceInvoiceEvent.getMissedSequence());
        } else {
            this.invoicer.processMissedSequences(templateId, rdAckResponseServiceInvoiceEvent.getMissedSequences());
        }
    }

    /**
     * Makes sure both per-template tables have an entry for the given key,
     * creating a sync point of 0 and an empty ack set where missing.
     */
    private void ensureTemplateState(Integer templateKey) {
        if (!this.templatesAndSyncs.containsKey(templateKey)) {
            this.templatesAndSyncs.put(templateKey, Long.valueOf(0L));
        }
        if (!this.templatesAndAcks.containsKey(templateKey)) {
            this.templatesAndAcks.put(templateKey, new Vector<Long>());
        }
    }

    /**
     * Records a new sync point for a template, drops every queued sequence
     * number that is less than or equal to it, and notifies the invoicer and
     * the entity actuator.
     *
     * @param i the template id
     * @param j the new sync point
     */
    private void processSyncAdvance(int i, long j) {
        Integer templateKey = Integer.valueOf(i);
        ensureTemplateState(templateKey);
        this.templatesAndSyncs.put(templateKey, Long.valueOf(j));

        Vector<Long> ackSet = this.templatesAndAcks.get(templateKey);
        // Walk backwards so removals do not shift the elements still to visit.
        for (int idx = ackSet.size() - 1; idx >= 0; idx--) {
            if (ackSet.elementAt(idx).longValue() <= j) {
                ackSet.removeElementAt(idx);
            }
        }
        this.invoicer.processSyncAdvance(i, j);
        this.entityActuator.syncPointHasBeenAdvanced(this.entityId, i, j);
    }

    /**
     * Handles an archival notification for an event.
     *
     * <p>If the notification is to be discarded (see
     * {@link #notificationToBeDiscarded}), any stored copy of the event is
     * dropped. Otherwise, when the event itself has not been received, the
     * sequence is reported to the invoicer as missed; when it has, the
     * sequence number is added to the ack set, the event is released through
     * the entity actuator and an acknowledgement is attempted.
     *
     * @param rdArchivalServiceNotification the archival notification
     * @throws ServiceException propagated from the invoicer, actuator or
     *         producer
     */
    public void processArchivalNotification(RdArchivalServiceNotification rdArchivalServiceNotification) throws ServiceException {
        EventID eventId = rdArchivalServiceNotification.getEventId();
        int templateId = rdArchivalServiceNotification.getTemplateId();
        Integer templateKey = Integer.valueOf(templateId);
        long sequenceNumber = rdArchivalServiceNotification.getSequenceNumber();
        ensureTemplateState(templateKey);

        if (notificationToBeDiscarded(rdArchivalServiceNotification)) {
            this.receivedEvents.remove(eventId);
            return;
        }
        if (!this.receivedEvents.containsKey(eventId)) {
            this.invoicer.processMissedSequence(templateId, sequenceNumber);
            return;
        }

        Vector<Long> ackSet = this.templatesAndAcks.get(templateKey);
        Long sequence = Long.valueOf(sequenceNumber);
        if (ackSet.contains(sequence)) {
            System.out.println(this.moduleName + sequence + "Should NOT have been" + " present in the ackSet ");
        } else {
            ackSet.add(sequence);
        }
        this.entityActuator.qosEventAvailableForRelease(this.receivedEvents.remove(eventId),
                rdArchivalServiceNotification);
        issueAcknowledgement(templateId);
    }

    /**
     * Tells whether a notification should be discarded: either the template's
     * sync point is greater than the notification's sequence number, or that
     * sequence number is already in the template's ack set.
     *
     * <p>Expects the per-template state to exist; the caller ensures this.
     *
     * @param rdArchivalServiceNotification the notification to examine
     * @return {@code true} if the notification should be discarded
     * @throws ServiceException declared for interface stability
     */
    private boolean notificationToBeDiscarded(RdArchivalServiceNotification rdArchivalServiceNotification) throws ServiceException {
        Integer templateKey = Integer.valueOf(rdArchivalServiceNotification.getTemplateId());
        long sequenceNumber = rdArchivalServiceNotification.getSequenceNumber();
        if (this.templatesAndSyncs.get(templateKey).longValue() > sequenceNumber) {
            return true;
        }
        return this.templatesAndAcks.get(templateKey).contains(Long.valueOf(sequenceNumber));
    }

    /**
     * Passes a retransmitted event to the entity actuator and then to the
     * invoicer.
     *
     * @param rdRetransmissionServiceEvent the retransmitted event
     * @throws ServiceException propagated from either recipient
     */
    public void processRetransmittedEvent(RdRetransmissionServiceEvent rdRetransmissionServiceEvent) throws ServiceException {
        this.entityActuator.processRetransmittedEvent(rdRetransmissionServiceEvent);
        this.invoicer.processRetransmittedEvent(rdRetransmissionServiceEvent);
    }

    /**
     * Stores a received event under its event id unless one is already stored
     * for that id.
     *
     * @param qosEvent the received event
     */
    public void processReceivedEvent(QosEvent qosEvent) {
        EventID eventId = qosEvent.getEventId();
        if (!this.receivedEvents.containsKey(eventId)) {
            this.receivedEvents.put(eventId, qosEvent);
        }
    }

    /**
     * Handles a recovery response: informs the invoicer and the entity
     * actuator, resets the template's sync point to 0 and its ack set to
     * empty, then processes the ack response carried in the subscriber
     * recovery info.
     *
     * @param rdRecoveryResponse the recovery response
     * @throws ServiceException propagated from the invoicer or actuator
     */
    public void processRecoveryResponse(RdRecoveryResponse rdRecoveryResponse) throws ServiceException {
        System.out.println(this.moduleName + "Received recovery response with SubscriberInfo");
        this.invoicer.manageRecoveryResponse(rdRecoveryResponse);
        this.entityActuator.processRecoveryResponse(rdRecoveryResponse);
        Integer templateKey = Integer.valueOf(rdRecoveryResponse.getTemplateId());
        this.templatesAndSyncs.put(templateKey, Long.valueOf(0L));
        this.templatesAndAcks.put(templateKey, new Vector<Long>());
        processAckResponseEvent(rdRecoveryResponse.getSubscriberRecoveryInfo().getAckResponse());
    }

    /**
     * Stamps an event with the reliable-delivery event type and publishes it.
     */
    private void publishReliableDeliveryEvent(NBEvent event) throws ServiceException {
        NBEventGenerator.setEventType(event, RELIABLE_DELIVERY_EVENT_TYPE);
        this.producer.publishEvent(event);
    }

    /**
     * Publishes a NAK invoice on the service's Invoice topic for the event's
     * template. Not called from within this class.
     *
     * @param rdNakInvoiceEntityEvent the NAK invoice to publish
     * @throws ServiceException if generating or publishing the event fails
     */
    private void issueRetransmissionRequest(RdNakInvoiceEntityEvent rdNakInvoiceEntityEvent) throws ServiceException {
        String topic = this.rdsString + rdNakInvoiceEntityEvent.getTemplateId() + "/Invoice";
        publishReliableDeliveryEvent(
                this.producer.generateEvent(PROFILE_TYPE, topic, rdNakInvoiceEntityEvent.getBytes()));
    }

    /**
     * Publishes an ACK invoice for the template's current ack set, provided
     * the set is non-empty and the invoicer produces an invoice for it.
     *
     * @param i the template id
     * @throws ServiceException if generating or publishing the event fails
     */
    private void issueAcknowledgement(int i) throws ServiceException {
        Vector<Long> ackSet = this.templatesAndAcks.get(Integer.valueOf(i));
        if (ackSet.size() == 0) {
            System.out.println(this.moduleName + "ackSet.size() == 0, not sure if right");
            return;
        }
        RdAckInvoiceEntityEvent ackInvoice = this.invoicer.createAckInvoiceEvent(i, ackSet);
        if (ackInvoice == null) {
            System.out.println(this.moduleName + "Not time yet to issue an ACK invoice."
                    + " This was determined in the invoicer thread. Maybe" + " we have just initiated recovery ...");
            return;
        }
        String topic = this.rdsString + i + "/Invoice";
        publishReliableDeliveryEvent(this.producer.generateEvent(PROFILE_TYPE, topic, ackInvoice.getBytes()));
    }

    /**
     * Publishes a profile update request on the service's ProfileManagement
     * topic for the request's template.
     *
     * @param rdProfileUpdateRequest the request to publish
     * @throws ServiceException if generating or publishing the event fails
     */
    public void issueProfileUpdateRequest(RdProfileUpdateRequest rdProfileUpdateRequest) throws ServiceException {
        String topic = this.rdsString + rdProfileUpdateRequest.getTemplateId() + "/ProfileManagement";
        publishReliableDeliveryEvent(
                this.producer.generateEvent(PROFILE_TYPE, topic, rdProfileUpdateRequest.getBytes()));
    }

    /**
     * Forwards a profile update response to the entity actuator.
     *
     * @param rdProfileUpdateResponse the response to forward
     * @throws ServiceException propagated from the actuator
     */
    public void processProfileUpdateResponse(RdProfileUpdateResponse rdProfileUpdateResponse) throws ServiceException {
        this.entityActuator.processProfileUpdateResponse(rdProfileUpdateResponse);
    }

    /**
     * Dispatches an incoming event by type: reliable-delivery events go to the
     * multiplexer; normal events and unknown types are only reported on
     * standard output, each with its own single message.
     *
     * @param nBEvent the received event
     */
    @Override // cgl.narada.service.client.NBEventListener
    public void onEvent(NBEvent nBEvent) {
        int eventType = nBEvent.getEventType();
        if (eventType == RELIABLE_DELIVERY_EVENT_TYPE) {
            this.rdsMultiplexer.manageReliableDeliveryExchange(nBEvent);
        } else if (eventType == NORMAL_EVENT_TYPE) {
            // A normal event is a known type; do not also report it as unknown.
            System.out.println(this.moduleName + "Received normal event .." + nBEvent
                    + " The SubscriberPercolator should NOT receive this");
        } else {
            System.out.println(this.moduleName + "Unknown event type [" + eventType + "] received ");
        }
    }
}
