package cgl.narada.service.reliable.impl;

import cgl.narada.event.EventHeaders;
import cgl.narada.event.EventID;
import cgl.narada.event.NBEvent;
import cgl.narada.event.TemplateInfo;
import cgl.narada.event.impl.NBEventGenerator;
import cgl.narada.matching.EntityProfileMatchingTrees;
import cgl.narada.matching.Profile;
import cgl.narada.matching.string.StringProfile;
import cgl.narada.service.ServiceException;
import cgl.narada.service.client.ClientService;
import cgl.narada.service.client.EventConsumer;
import cgl.narada.service.client.EventProducer;
import cgl.narada.service.client.NBEventListener;
import cgl.narada.service.client.SessionService;
import cgl.narada.service.reliable.events.RdAckInvoiceEntityEvent;
import cgl.narada.service.reliable.events.RdAckResponseServiceInvoiceEvent;
import cgl.narada.service.reliable.events.RdArchivalServiceNotification;
import cgl.narada.service.reliable.events.RdCompanionEntityEvent;
import cgl.narada.service.reliable.events.RdNakInvoiceEntityEvent;
import cgl.narada.service.reliable.events.RdProfileUpdateRequest;
import cgl.narada.service.reliable.events.RdProfileUpdateResponse;
import cgl.narada.service.reliable.events.RdPublisherRecoveryInfo;
import cgl.narada.service.reliable.events.RdRecoveryEntityRequest;
import cgl.narada.service.reliable.events.RdRecoveryResponse;
import cgl.narada.service.reliable.events.RdRepublishedEntityEvent;
import cgl.narada.service.reliable.events.RdRetransmissionServiceEvent;
import cgl.narada.service.reliable.events.RdSubscriberRecoveryInfo;
import cgl.narada.service.replay.ReplayService;
import cgl.narada.service.storage.InventoryEvent;
import cgl.narada.service.storage.SequenceDestinations;
import cgl.narada.service.storage.StorageService;
import cgl.narada.service.storage.StorageServiceFactory;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Properties;
import java.util.Vector;

/* loaded from: input_file:cgl/narada/service/reliable/impl/ReliableDeliveryServiceImpl.class */
public class ReliableDeliveryServiceImpl extends Thread implements NBEventListener {
    // Registered entity ids; registerEntity stores the boxed id as both key and value.
    private Hashtable entities;
    // Template id (Integer) -> Hashtable of entity ids registered to that template.
    private Hashtable templatesAndEntities;
    // Template id (Integer) -> TemplateInfo for every managed template.
    private Hashtable templates;
    // Profile id (String) -> Profile for every profile added through addProfile.
    private Hashtable profiles;
    // NOTE(review): not referenced by any method in the reviewed portion of this class.
    private Hashtable engines;
    // Matching structure used to add/remove profiles and to compute event destinations.
    private EntityProfileMatchingTrees entityMatchingTrees;
    // Persistence backend for entities, templates, profiles, events and sync points.
    private StorageService storageService;
    // Entity id this service was constructed with.
    private int rdsEntityId;
    // Receives events of type 1 from onEvent.
    private RdCommunicationsMultiplexerImpl rdsMultiplexer;
    // Receives published/companion/republished events; run() pulls events to store from it.
    private RdsEventPercolator rdsPercolator;
    // Broker-facing client service; source of the consumer, producer and profiles.
    private ClientService clientService;
    private EventConsumer eventConsumer;
    private EventProducer eventProducer;
    private ReplayService replayService;
    // Template id (Integer) -> Long sequence number most recently assigned for that template.
    private Hashtable sequencesLastAssigned;
    // Monitor used by run() to wait and by eventReadyForStorage() to notify.
    private Object syncObject;
    // Debug-only timing map: event id -> time (ms) the published event was seen.
    private Hashtable junk;
    // Prefix prepended to every console/exception message produced by this class.
    private String moduleName = "ReliableDeliveryServiceImpl: ";
    // Prefix of the per-template topics this service subscribes to.
    private String rdsString = "ReliableDeliveryService/";
    // Sequence number of the last stored event; incremented in storeEventToStorage.
    private long sequenceNumber = 0;
    // While true, registration methods update in-memory state only and skip storage writes.
    private boolean recovering = true;
    // Enables verbose console tracing and the timing bookkeeping in junk.
    private boolean debug = false;

    /**
     * Creates the service, connects it to the broker and rebuilds its in-memory state
     * from the supplied storage service.
     *
     * @param i entity id of this service; also used to obtain the client service
     * @param storageService storage backend used for persistence and recovery
     * @param properties forwarded to {@code ClientService.initializeBrokerCommunications}
     * @param str forwarded to {@code ClientService.initializeBrokerCommunications}
     *            (presumably the transport type -- confirm against that API)
     * @throws ServiceException if broker initialization or recovery fails
     */
    public ReliableDeliveryServiceImpl(int i, StorageService storageService, Properties properties, String str) throws ServiceException {
        this.storageService = null;
        this.rdsEntityId = i;
        this.storageService = storageService;
        // Creates clientService, eventConsumer and eventProducer.
        initializeProducerConsumer(i, properties, str);
        // 10 == Thread.MAX_PRIORITY.
        setPriority(10);
        this.rdsMultiplexer = new RdCommunicationsMultiplexerImpl();
        this.rdsMultiplexer.registerReliableDeliveryService(this);
        this.entities = new Hashtable();
        this.templates = new Hashtable();
        this.templatesAndEntities = new Hashtable();
        this.profiles = new Hashtable();
        this.sequencesLastAssigned = new Hashtable();
        this.entityMatchingTrees = new EntityProfileMatchingTrees();
        this.rdsPercolator = new RdsEventPercolator(this, storageService);
        this.syncObject = new Object();
        // Recovery needs the tables above; it runs while `recovering` is still true.
        recoverFromFailure();
        // NOTE(review): replayService and junk are assigned only after recovery; code
        // reached during recovery must not dereference them -- confirm.
        this.replayService = ReplayService.getInstance();
        this.replayService.initialize(i, storageService, this.templatesAndEntities);
        this.junk = new Hashtable();
    }

    /**
     * Builds a {@link TemplateInfo} by delegating to the client service.
     *
     * @param i first argument, forwarded unchanged to the client service
     * @param i2 second argument, forwarded unchanged to the client service
     * @param obj third argument, forwarded unchanged to the client service
     * @return the template info produced by the client service
     * @throws ServiceException if the client service rejects the request
     */
    public TemplateInfo createTemplateInfo(int i, int i2, Object obj) throws ServiceException {
        TemplateInfo created = clientService.createTemplateInfo(i, i2, obj);
        return created;
    }

    /**
     * @return the entity id this service was constructed with
     */
    public int getEntityId() {
        return rdsEntityId;
    }

    /**
     * Obtains the client service for the given entity, initializes its broker
     * communications, and creates the consumer (with this object as listener) and producer.
     *
     * @param i entity id used to look up the client service
     * @param properties forwarded to {@code initializeBrokerCommunications}
     * @param str forwarded to {@code initializeBrokerCommunications}
     * @throws ServiceException if any of the client-service calls fails
     */
    private void initializeProducerConsumer(int i, Properties properties, String str) throws ServiceException {
        clientService = SessionService.getClientService(i);
        clientService.initializeBrokerCommunications(properties, str);
        eventConsumer = clientService.createEventConsumer(this);
        eventProducer = clientService.createEventProducer();
    }

    /**
     * Registers an entity with this service and, unless the service is still
     * recovering, persists the registration.
     *
     * @param i id of the entity to register
     * @throws ServiceException if the entity is already registered or storage fails
     */
    public void registerEntity(int i) throws ServiceException {
        Integer entityKey = Integer.valueOf(i);
        if (entities.containsKey(entityKey)) {
            throw new ServiceException(moduleName + "Entity [" + i + "] already registered");
        }
        entities.put(entityKey, entityKey);
        // During recovery the registration came from storage, so it is not written back.
        if (!recovering) {
            storageService.storeRegisteredEntity(i);
        }
    }

    /**
     * Removes an entity from this service: drops it from every template's member
     * table, deletes the persisted registration, and forgets the entity itself.
     *
     * @param i id of the entity to deregister
     * @throws ServiceException if the entity is not registered or storage fails
     */
    public void deRegisterEntity(int i) throws ServiceException {
        Integer entityKey = Integer.valueOf(i);
        if (!this.entities.containsKey(entityKey)) {
            throw new ServiceException(this.moduleName + "Entity [" + i + "] not registered with Service");
        }
        Enumeration memberTables = this.templatesAndEntities.elements();
        while (memberTables.hasMoreElements()) {
            // Hashtable.remove is a no-op when the key is absent.
            ((Hashtable) memberTables.nextElement()).remove(entityKey);
        }
        this.storageService.removeRegisteredEntity(i);
        // Previously the id stayed in `entities`, so the entity still looked registered
        // and a later registerEntity(i) failed with "already registered".
        this.entities.remove(entityKey);
    }

    /**
     * Associates a registered entity with a managed template and, unless the service
     * is recovering, persists the association.
     *
     * @param i entity id
     * @param i2 template id
     * @throws ServiceException if the entity or template is unknown, the entity is
     *         already registered with the template, or storage fails
     */
    public void registerEntityForTemplate(int i, int i2) throws ServiceException {
        Integer entityKey = Integer.valueOf(i);
        Integer templateKey = Integer.valueOf(i2);
        if (!entities.containsKey(entityKey)) {
            throw new ServiceException(moduleName + "Entity [" + i + "] not registered with Service");
        }
        if (!templates.containsKey(templateKey)) {
            throw new ServiceException(moduleName + "Unknown template " + templateKey);
        }
        Hashtable members = (Hashtable) templatesAndEntities.get(templateKey);
        if (members.containsKey(entityKey)) {
            throw new ServiceException(moduleName + "Entity [" + i + "] already registered with template [" + i2 + "]");
        }
        members.put(entityKey, entityKey);
        if (!recovering) {
            storageService.storeEntityForTemplate(i, i2);
        }
    }

    /**
     * Removes the association between an entity and a template, both in memory and
     * in storage.
     *
     * @param i entity id
     * @param i2 template id
     * @throws ServiceException if the entity or template is unknown, the entity is
     *         not registered with the template, or storage fails
     */
    public void deregisterEntityFromTemplate(int i, int i2) throws ServiceException {
        Integer entityKey = Integer.valueOf(i);
        Integer templateKey = Integer.valueOf(i2);
        if (!entities.containsKey(entityKey)) {
            throw new ServiceException(moduleName + "Entity [" + i + "] not registered with Service");
        }
        if (!templates.containsKey(templateKey)) {
            throw new ServiceException(moduleName + "Unknown template " + templateKey);
        }
        Hashtable members = (Hashtable) templatesAndEntities.get(templateKey);
        boolean isMember = members.containsKey(entityKey);
        if (!isMember) {
            throw new ServiceException(moduleName + "Entity [" + i + "] not registered with template [" + i2 + "]");
        }
        members.remove(entityKey);
        storageService.removeEntityFromTemplate(i, i2);
    }

    /**
     * Starts managing a template: records it, creates its (empty) member table,
     * resets its last-assigned sequence to 0 and subscribes to its topics. Outside of
     * recovery the template is also persisted and announced to the replay service.
     *
     * @param templateInfo descriptor of the template to manage
     * @throws ServiceException if the template is already managed, or a storage or
     *         subscription call fails
     */
    public void addTemplateManagement(TemplateInfo templateInfo) throws ServiceException {
        int templateId = templateInfo.getTemplateId();
        Integer templateKey = Integer.valueOf(templateId);
        if (templates.containsKey(templateKey)) {
            throw new ServiceException(moduleName + "Template [" + templateInfo + "] Previously registered!");
        }
        templates.put(templateKey, templateInfo);
        templatesAndEntities.put(templateKey, new Hashtable());
        if (!recovering) {
            storageService.storeTemplate(templateInfo);
        }
        sequencesLastAssigned.put(templateKey, Long.valueOf(0L));
        registerSubscriptionsForTemplate(templateId);
        // The replay service is only informed once recovery has finished.
        if (!recovering) {
            replayService.registerSubscriptionsForTemplate(templateId);
        }
    }

    /**
     * Stops managing a template: removes it and its member table from memory, deletes
     * it from storage and deregisters it from the replay service.
     *
     * @param i template id
     * @throws ServiceException if the template is not managed or a downstream call fails
     */
    public void removeTemplateManagement(int i) throws ServiceException {
        Integer templateKey = Integer.valueOf(i);
        boolean managed = templates.containsKey(templateKey);
        if (!managed) {
            throw new ServiceException(moduleName + "Template [" + i + "] NOT Previously registered!");
        }
        templates.remove(templateKey);
        templatesAndEntities.remove(templateKey);
        storageService.removeTemplateManagement(i);
        replayService.deregisterSubscriptionsForTemplate(i);
    }

    /**
     * @return a snapshot array of every managed template descriptor (empty when none)
     */
    public TemplateInfo[] getListOfManagedTemplates() {
        TemplateInfo[] typeHint = new TemplateInfo[0];
        return (TemplateInfo[]) templates.values().toArray(typeHint);
    }

    /**
     * Reports whether an entity is registered with a template.
     *
     * @param i entity id
     * @param i2 template id
     * @return true if the template's member table contains the entity
     * @throws ServiceException if the template is not managed or the entity is not
     *         registered with the service at all
     */
    private boolean isRegistered(int i, int i2) throws ServiceException {
        Integer entityKey = Integer.valueOf(i);
        Integer templateKey = Integer.valueOf(i2);
        if (!templates.containsKey(templateKey)) {
            throw new ServiceException(moduleName + "Template [" + i2 + "] NOT Previously registered!");
        }
        if (!entities.containsKey(entityKey)) {
            throw new ServiceException(moduleName + "Entity [" + i + "] not registered with Service");
        }
        Hashtable members = (Hashtable) templatesAndEntities.get(templateKey);
        return members.containsKey(entityKey);
    }

    /**
     * Applies a profile add/remove request and publishes a response to the
     * requesting entity's {@code <entityId>/ProfileManagement} topic. A failure of
     * the add/remove itself is reported in the response rather than thrown.
     *
     * @param rdProfileUpdateRequest the request to process
     * @throws ServiceException if generating or publishing the response fails
     */
    public void processProfileUpdateRequest(RdProfileUpdateRequest rdProfileUpdateRequest) throws ServiceException {
        rdProfileUpdateRequest.getRequestId();
        Profile requestedProfile = rdProfileUpdateRequest.getProfile();
        boolean adding = rdProfileUpdateRequest.isAddProfile();
        int templateId = rdProfileUpdateRequest.getTemplateId();
        int entityId = rdProfileUpdateRequest.getEntityId();

        String outcome;
        boolean succeeded = true;
        try {
            if (adding) {
                addProfile(templateId, requestedProfile);
                outcome = "Addition of profile successful";
            } else {
                removeProfile(templateId, requestedProfile);
                outcome = "Removal of profile successful";
            }
        } catch (ServiceException e) {
            succeeded = false;
            outcome = "" + e.toString();
        }

        String responseTopic = entityId + "/ProfileManagement";
        RdProfileUpdateResponse response = new RdProfileUpdateResponse(rdProfileUpdateRequest, succeeded, outcome);
        NBEvent responseEvent = eventProducer.generateEvent(1, responseTopic, response.getBytes());
        NBEventGenerator.setEventType(responseEvent, 1);
        eventProducer.publishEvent(responseEvent);
    }

    /**
     * Adds a subscription profile for a template. The profile's destination entity
     * must be registered with the template and the profile type must equal the
     * template type. Outside of recovery the profile is also persisted.
     *
     * @param i template id
     * @param profile profile to add
     * @throws ServiceException on any validation failure, if the profile id is
     *         already known, or if the matching tree or storage call fails
     */
    public synchronized void addProfile(int i, Profile profile) throws ServiceException {
        Integer templateKey = Integer.valueOf(i);
        int destination = profile.getDestination();
        boolean registered = isRegistered(destination, i);
        if (!registered) {
            throw new ServiceException(moduleName + "Entity [ " + destination + "] not registered to template " + i);
        }
        TemplateInfo template = (TemplateInfo) templates.get(templateKey);
        if (template.getTemplateType() != profile.getProfileType()) {
            throw new ServiceException(moduleName + "Template Type[" + template.getTemplateType() + " NOT " + "the same as" + profile.getProfileType());
        }
        String profileId = profile.getProfileId();
        if (profiles.containsKey(profileId)) {
            throw new ServiceException(moduleName + "Profile [" + profile + "] was previously registered!");
        }
        manageProfile(profile, true);
        profiles.put(profileId, profile);
        if (!recovering) {
            storageService.storeProfile(i, profile);
        }
    }

    /**
     * Forwards a profile addition or removal to the entity matching trees.
     *
     * @param profile profile to add or remove
     * @param z flag forwarded to the matching trees; callers pass true to add and
     *          false to remove
     * @throws ServiceException if the matching trees reject the change
     */
    private void manageProfile(Profile profile, boolean z) throws ServiceException {
        entityMatchingTrees.manageSubscriptionProfile(profile, z);
    }

    /**
     * Removes a previously added subscription profile from the matching trees, the
     * in-memory profile table and storage. Performs the same validation as
     * {@code addProfile}.
     *
     * NOTE(review): unlike addProfile this method is not synchronized -- confirm
     * whether callers already serialize profile updates.
     *
     * @param i template id
     * @param profile profile to remove
     * @throws ServiceException on any validation failure, if the profile id is
     *         unknown, or if the matching tree or storage call fails
     */
    public void removeProfile(int i, Profile profile) throws ServiceException {
        Integer templateKey = Integer.valueOf(i);
        int destination = profile.getDestination();
        boolean registered = isRegistered(destination, i);
        if (!registered) {
            throw new ServiceException(moduleName + "Entity [ " + destination + "] not registered to template " + i);
        }
        TemplateInfo template = (TemplateInfo) templates.get(templateKey);
        if (template.getTemplateType() != profile.getProfileType()) {
            throw new ServiceException(moduleName + "Template Type[" + template.getTemplateType() + " NOT " + "the same as" + profile.getProfileType());
        }
        String profileId = profile.getProfileId();
        boolean known = profiles.containsKey(profileId);
        if (!known) {
            throw new ServiceException(moduleName + "Profile [" + profile + "] was not previously registered!");
        }
        manageProfile(profile, false);
        profiles.remove(profileId);
        storageService.removeProfile(i, profile);
    }

    /**
     * Handles an acknowledgement invoice from an entity: records the acknowledged
     * sequence number(s) in storage, works out which sequences the entity still
     * misses, advances the entity's sync point where possible, and sends the new
     * sync point plus any missed sequences back to the entity.
     *
     * @param rdAckInvoiceEntityEvent the ACK invoice received from the entity
     * @throws ServiceException if a storage call or publishing the response fails
     */
    public void processAckInvoiceEvent(RdAckInvoiceEntityEvent rdAckInvoiceEntityEvent) throws ServiceException {
        if (this.debug) {
            System.out.println(new StringBuffer().append(this.moduleName).append("Received ACK Invoice event ").append(rdAckInvoiceEntityEvent).toString());
        }
        boolean containsSingleAck = rdAckInvoiceEntityEvent.containsSingleAck();
        int entityId = rdAckInvoiceEntityEvent.getEntityId();
        int templateId = rdAckInvoiceEntityEvent.getTemplateId();
        // j ends up as the highest acknowledged sequence number in this invoice.
        long j = 0;
        long[] jArr = null;
        long syncpoint = this.storageService.getSyncpoint(templateId, entityId);
        if (containsSingleAck) {
            // A single ack is recorded without comparing it to the sync point.
            j = rdAckInvoiceEntityEvent.getAckSequenceNumber();
            this.storageService.processAcknowledgement(j, entityId);
        } else {
            jArr = rdAckInvoiceEntityEvent.getAckSequenceNumbers();
            for (int i = 0; i < jArr.length; i++) {
                if (j < jArr[i]) {
                    j = jArr[i];
                }
                // Only acks above the current sync point are recorded.
                if (syncpoint < jArr[i]) {
                    this.storageService.processAcknowledgement(jArr[i], entityId);
                }
            }
        }
        if (this.debug) {
            System.out.println(new StringBuffer().append(this.moduleName).append("Retrieving missed sequences for ").append("template [").append(templateId).append("] between sequence num [").append(syncpoint).append("] and [").append(j).append("]").toString());
        }
        long[] retrieveMissedSequences = this.storageService.retrieveMissedSequences(syncpoint, j, templateId, entityId);
        // j2 is the sync point to report; it defaults to the current one.
        long j2 = syncpoint;
        if (retrieveMissedSequences == null) {
            // Nothing missed up to the highest ack: the sync point can jump to it.
            j2 = j;
        } else if (!containsSingleAck) {
            // After sorting, j2 becomes the largest ack that is below the first missed
            // sequence; it stays at syncpoint when no ack is below it. Note that
            // jArr (the array returned by the event) is sorted in place.
            // NOTE(review): index 0 assumes storage returns null rather than an empty
            // array when nothing is missed -- confirm.
            Arrays.sort(retrieveMissedSequences);
            Arrays.sort(jArr);
            long j3 = retrieveMissedSequences[0];
            for (int i2 = 0; i2 < jArr.length; i2++) {
                if (jArr[i2] < j3) {
                    j2 = jArr[i2];
                }
            }
        }
        if (syncpoint != j2) {
            this.storageService.advanceSyncpoint(j2, templateId, entityId);
        }
        RdAckResponseServiceInvoiceEvent rdAckResponseServiceInvoiceEvent = new RdAckResponseServiceInvoiceEvent(templateId, entityId, j2);
        if (this.debug) {
            System.out.println(new StringBuffer().append(this.moduleName).append("Advance SYNC for entityId=").append(entityId).append(" to => ").append(j2).toString());
        }
        if (retrieveMissedSequences != null) {
            // The response has separate setters for one versus several missed sequences.
            if (retrieveMissedSequences.length == 1) {
                rdAckResponseServiceInvoiceEvent.setMissedSequence(retrieveMissedSequences[0]);
            } else {
                rdAckResponseServiceInvoiceEvent.setMissedSequences(retrieveMissedSequences);
            }
        }
        issueAckResponse(rdAckResponseServiceInvoiceEvent);
    }

    /**
     * Handles a negative acknowledgement from an entity by retransmitting each
     * sequence number named in the invoice.
     *
     * @param rdNakInvoiceEntityEvent the NAK invoice received from the entity
     * @throws ServiceException if retrieving or publishing a retransmission fails
     */
    public void processNakInvoiceEvent(RdNakInvoiceEntityEvent rdNakInvoiceEntityEvent) throws ServiceException {
        boolean singleNak = rdNakInvoiceEntityEvent.containsSingleNak();
        int entityId = rdNakInvoiceEntityEvent.getEntityId();
        rdNakInvoiceEntityEvent.getTemplateId();
        long retransmissionId = rdNakInvoiceEntityEvent.getRetransmissionId();
        if (singleNak) {
            manageRetransmissionFor(entityId, rdNakInvoiceEntityEvent.getNakSequenceNumber(), retransmissionId);
        } else {
            for (long missingSequence : rdNakInvoiceEntityEvent.getNakSequenceNumbers()) {
                manageRetransmissionFor(entityId, missingSequence, retransmissionId);
            }
        }
    }

    /**
     * Retransmits one stored event to an entity on its {@code <entityId>/Invoice}
     * topic, wrapped together with the archival notification describing it.
     *
     * @param i id of the entity that asked for the retransmission
     * @param j sequence number of the stored event to resend
     * @param j2 retransmission id taken from the NAK invoice
     * @throws ServiceException if the stored event cannot be read or published
     */
    private void manageRetransmissionFor(int i, long j, long j2) throws ServiceException {
        controlMemoryUtilization();
        InventoryEvent stored = storageService.getStoredEvent(j);
        int templateId = stored.getTemplateId();
        long storedSequence = stored.getSequenceNumber();
        NBEvent storedPayload = stored.getEvent();
        RdArchivalServiceNotification notification = new RdArchivalServiceNotification(
                templateId,
                stored.getEvent().getEventHeaders().getEventId(),
                storedSequence,
                stored.getPreviousSequenceNumber());
        RdRetransmissionServiceEvent retransmission = new RdRetransmissionServiceEvent(j2, storedPayload, notification);
        System.out.println(moduleName + "Issuing nakResponse to entity=" + i + ", for templateId=" + templateId + ", and sequenceNumber = " + storedSequence);
        String invoiceTopic = i + "/Invoice";
        NBEvent outgoing = eventProducer.generateEvent(1, invoiceTopic, retransmission.getBytes());
        NBEventGenerator.setEventType(outgoing, 1);
        eventProducer.publishEvent(outgoing);
    }

    /**
     * Throttles the calling thread while heap usage is above a fixed limit.
     *
     * <p>Requests a garbage collection once usage exceeds one third of the limit,
     * then sleeps in 30 ms steps while usage stays above the limit. Usage is measured
     * as {@code totalMemory() - freeMemory()}. The previous code compared
     * {@code totalMemory()} (the current heap size) against the limit, so once the
     * JVM heap had grown past the limit the loop could never terminate.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt status is
     * restored and throttling stops.
     */
    private void controlMemoryUtilization() {
        final long limitBytes = 50000000L;
        Runtime runtime = Runtime.getRuntime();
        long usedBytes = runtime.totalMemory() - runtime.freeMemory();
        if (usedBytes > limitBytes / 3) {
            System.gc();
            // Re-measure so the loop below does not act on a pre-GC reading.
            usedBytes = runtime.totalMemory() - runtime.freeMemory();
        }
        while (usedBytes > limitBytes) {
            try {
                System.out.println(this.moduleName + "Will yield()->" + Thread.currentThread() + " to control Memory Utilization of [" + usedBytes + "] bytes");
                Thread.yield();
                Thread.sleep(30L);
                usedBytes = runtime.totalMemory() - runtime.freeMemory();
            } catch (InterruptedException e) {
                System.out.println(this.moduleName + "Error while sleeping to control " + "memory utilization.");
                // Preserve the interrupt for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Prints the JVM's total memory ({@code Runtime.totalMemory()}) scaled to
     * megabytes (bytes * 1e-6).
     */
    private void printMemoryUtilization() {
        double totalMegabytes = Runtime.getRuntime().totalMemory() * 1.0E-6d;
        System.out.println(moduleName + "Memory Utilization = " + totalMegabytes + " MB");
    }

    /**
     * Hands a published event to the percolator. In debug mode the arrival time is
     * first recorded under the event's id for later latency reporting.
     *
     * @param nBEvent the published event
     */
    public void processPublishedEvent(NBEvent nBEvent) {
        if (debug) {
            junk.put(nBEvent.getEventHeaders().getEventId(), Long.valueOf(System.currentTimeMillis()));
        }
        rdsPercolator.processPublishedEvent(nBEvent);
    }

    /**
     * Hands a companion event to the percolator. In debug mode, prints how long
     * after the matching published event (if one was recorded) the companion arrived.
     *
     * @param rdCompanionEntityEvent the companion event
     */
    public void processCompanionEvent(RdCompanionEntityEvent rdCompanionEntityEvent) {
        if (debug) {
            Long publishedAt = (Long) junk.get(rdCompanionEntityEvent.getEventId());
            if (publishedAt != null) {
                System.out.println(moduleName + "Companion arrived after = " + (System.currentTimeMillis() - publishedAt.longValue()));
            }
        }
        rdsPercolator.processCompanionEvent(rdCompanionEntityEvent);
    }

    /**
     * Hands a republished event to the percolator.
     *
     * @param rdRepublishedEntityEvent the republished event
     */
    public void processRepublishedEvent(RdRepublishedEntityEvent rdRepublishedEntityEvent) {
        rdsPercolator.processRepublishedEvent(rdRepublishedEntityEvent);
    }

    /**
     * Answers an entity's recovery request with either publisher recovery info
     * (built from the stored catenation) or subscriber recovery info.
     *
     * @param rdRecoveryEntityRequest the recovery request
     * @throws ServiceException if storage access or publishing the response fails
     */
    public void processEntityRecoveryRequest(RdRecoveryEntityRequest rdRecoveryEntityRequest) throws ServiceException {
        System.out.println(moduleName + "Received " + rdRecoveryEntityRequest);
        int templateId = rdRecoveryEntityRequest.getTemplateId();
        int entityId = rdRecoveryEntityRequest.getEntityId();
        long requestId = rdRecoveryEntityRequest.getRecoveryRequestId();

        RdRecoveryResponse response;
        if (rdRecoveryEntityRequest.isPublisherRecovery()) {
            RdPublisherRecoveryInfo publisherInfo = new RdPublisherRecoveryInfo(templateId, entityId, storageService.getCatenation(templateId, entityId));
            response = new RdRecoveryResponse(templateId, entityId, requestId, publisherInfo);
        } else {
            RdSubscriberRecoveryInfo subscriberInfo = constructSubscriberRecoveryInfo(templateId, entityId);
            response = new RdRecoveryResponse(templateId, entityId, requestId, subscriberInfo);
        }
        issueRecoveryResponse(response);
    }

    /**
     * Builds the recovery information for a subscribing entity: its sync point
     * (advanced to the current sequence number when nothing was missed), the
     * sequences it missed, and every known profile whose destination is the entity.
     *
     * @param i template id
     * @param i2 entity id
     * @return the assembled subscriber recovery info
     * @throws ServiceException if a storage call fails
     */
    private RdSubscriberRecoveryInfo constructSubscriberRecoveryInfo(int i, int i2) throws ServiceException {
        long syncStart = System.currentTimeMillis();
        long currentSync = storageService.getSyncpoint(i, i2);
        System.out.println(moduleName + "Retrieved entitySync point in (" + (System.currentTimeMillis() - syncStart) + ") mSecs");

        long latestSequence = sequenceNumber;
        long missedStart = System.currentTimeMillis();
        long[] missed = storageService.retrieveMissedSequences(currentSync, latestSequence, i, i2);
        System.out.println(moduleName + "Retrieved missed sequences in (" + (System.currentTimeMillis() - missedStart) + ") mSecs");

        // With nothing missed the entity is fully caught up to the latest sequence.
        long reportedSync = (missed == null) ? latestSequence : currentSync;
        if (reportedSync != currentSync) {
            storageService.advanceSyncpoint(reportedSync, i, i2);
        }

        RdAckResponseServiceInvoiceEvent ackResponse = new RdAckResponseServiceInvoiceEvent(i, i2, reportedSync);
        if (missed != null) {
            if (missed.length == 1) {
                ackResponse.setMissedSequence(missed[0]);
            } else {
                ackResponse.setMissedSequences(missed);
            }
        }

        Vector entityProfiles = new Vector();
        for (Enumeration known = profiles.elements(); known.hasMoreElements();) {
            Profile candidate = (Profile) known.nextElement();
            if (candidate.getDestination() == i2) {
                entityProfiles.addElement(candidate);
            }
        }
        // A null array (not an empty one) signals "no profiles" to the recovery info.
        Profile[] profileArray = null;
        if (entityProfiles.size() != 0) {
            profileArray = (Profile[]) entityProfiles.toArray(new Profile[0]);
        }

        RdSubscriberRecoveryInfo recoveryInfo = new RdSubscriberRecoveryInfo(i, i2, ackResponse, profileArray);
        System.out.println(moduleName + "Created " + recoveryInfo);
        return recoveryInfo;
    }

    /**
     * Hook for discovery requests. The body is empty: calling it currently does nothing.
     */
    public void processDiscoveryRequest() {
    }

    /**
     * Entry point for events delivered by the event consumer. Events of type 0 are
     * treated as normal published events; type 1 events go to the reliable-delivery
     * multiplexer; any other type is only logged.
     *
     * @param nBEvent the delivered event
     */
    @Override // cgl.narada.service.client.NBEventListener
    public void onEvent(NBEvent nBEvent) {
        int eventType = nBEvent.getEventType();
        switch (eventType) {
            case 0:
                if (debug) {
                    System.out.println(moduleName + "Received NORMAL event \n\n");
                }
                processPublishedEvent(nBEvent);
                break;
            case 1:
                rdsMultiplexer.manageReliableDeliveryExchange(nBEvent);
                break;
            default:
                System.out.println(moduleName + "Unknown event type(" + eventType + ") received.");
                break;
        }
    }

    /**
     * Subscribes this service to everything it needs for a managed template: the
     * service topic {@code ReliableDeliveryService/<id>}, the template's own content
     * (for template types 1 to 5), and the service's {@code /Invoice},
     * {@code /Recovery} and {@code /ProfileManagement} sub-topics.
     *
     * @param i template id
     * @throws ServiceException if the template is unknown or a subscription fails
     */
    public void registerSubscriptionsForTemplate(int i) throws ServiceException {
        System.out.println(moduleName + "Registering subscriptions for template [" + i + "]");
        Integer templateKey = Integer.valueOf(i);
        TemplateInfo info = (TemplateInfo) templates.get(templateKey);
        if (info == null) {
            throw new ServiceException(moduleName + "Unknown templateId [" + templateKey + "]");
        }
        Object template = info.getTemplate();
        int templateType = info.getTemplateType();
        String serviceTopic = rdsString + i;
        eventConsumer.subscribeTo(clientService.createProfile(1, serviceTopic));

        // Content subscription: type 2 takes the template parsed as an Integer, the
        // other supported types take it as a String.
        Profile contentProfile = null;
        switch (templateType) {
            case 1:
            case 3:
            case 4:
            case 5:
                contentProfile = clientService.createProfile(templateType, (String) template);
                break;
            case 2:
                contentProfile = clientService.createProfile(templateType, Integer.valueOf(Integer.parseInt((String) template)));
                break;
            default:
                break;
        }
        if (contentProfile != null) {
            eventConsumer.subscribeTo(contentProfile);
        }

        Profile invoiceProfile = clientService.createProfile(1, serviceTopic + "/Invoice");
        Profile recoveryProfile = clientService.createProfile(1, serviceTopic + "/Recovery");
        Profile managementProfile = clientService.createProfile(1, serviceTopic + "/ProfileManagement");
        eventConsumer.subscribeTo(invoiceProfile);
        eventConsumer.subscribeTo(recoveryProfile);
        eventConsumer.subscribeTo(managementProfile);
    }

    /**
     * Counterpart of {@code registerSubscriptionsForTemplate}. The body is empty, so
     * no subscriptions are actually removed by this call.
     *
     * @param i template id (unused)
     * @throws ServiceException declared but never thrown by the current body
     */
    public void deregisterSubscriptionsForTemplate(int i) throws ServiceException {
    }

    /**
     * Wakes the storage thread (see {@code run()}) if it is waiting on the
     * service's monitor.
     */
    public void eventReadyForStorage() {
        final Object monitor = syncObject;
        synchronized (monitor) {
            monitor.notify();
        }
    }

    /**
     * Storage loop: repeatedly stores the next event offered by the percolator, and
     * waits on {@code syncObject} when none is available. The loop (and therefore
     * the thread) ends on the first ServiceException or InterruptedException.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (true) {
            try {
                if (this.rdsPercolator.hasEventsAvailableForStorage()) {
                    storeEventToStorage(this.rdsPercolator.getEventToStore());
                } else {
                    // NOTE(review): the availability check above happens outside this
                    // lock, so a notify() issued between the check and wait() is not
                    // seen until the next notification -- confirm this is acceptable.
                    synchronized (this.syncObject) {
                        if (this.debug) {
                            System.out.println(new StringBuffer().append(this.moduleName).append("Waiting for events to be stored").toString());
                        }
                        this.syncObject.wait();
                        if (this.debug) {
                            System.out.println(new StringBuffer().append(this.moduleName).append("Waking up to store event(s)!!").toString());
                        }
                    }
                }
            } catch (ServiceException e) {
                // A single storage failure terminates the storage thread.
                System.out.println(new StringBuffer().append(this.moduleName).append(e).toString());
                return;
            } catch (InterruptedException e2) {
                System.out.println(new StringBuffer().append(this.moduleName).append(e2).toString());
                return;
            }
        }
    }

    /**
     * Assigns the next sequence number to an event, stores it together with the
     * entity ids it must be delivered to, releases it from the percolator's buffers
     * and publishes an archival notification.
     *
     * <p>The destination array now holds every id returned by the matching trees.
     * Previously each id was written to index 0, so only one destination survived
     * and the remaining slots stayed 0. When the matching trees return null, the
     * array is {@code {0}} as before.
     *
     * @param nBEvent the event to archive
     * @throws ServiceException if the event's template is not managed by this
     *         service, or if storing or publishing fails
     */
    public void storeEventToStorage(NBEvent nBEvent) throws ServiceException {
        Hashtable matchedDestinations = this.entityMatchingTrees.computeEventDestinations(nBEvent);
        EventHeaders headers = nBEvent.getEventHeaders();
        int templateId = headers.getTemplateId();
        EventID eventId = headers.getEventId();
        Integer templateKey = Integer.valueOf(templateId);

        Long lastAssigned = (Long) this.sequencesLastAssigned.get(templateKey);
        if (lastAssigned == null) {
            // Fail with a descriptive error instead of a NullPointerException, and
            // before a sequence number is consumed.
            throw new ServiceException(this.moduleName + "Unknown templateId [" + templateId + "]");
        }
        long previousSequence = lastAssigned.longValue();

        this.sequenceNumber++;
        InventoryEvent inventoryEvent = new InventoryEvent(this.sequenceNumber, previousSequence, templateId, eventId, nBEvent);

        int[] destinations;
        if (matchedDestinations == null) {
            destinations = new int[]{0};
        } else {
            destinations = new int[matchedDestinations.size()];
            int slot = 0;
            Enumeration keys = matchedDestinations.keys();
            while (keys.hasMoreElements()) {
                destinations[slot++] = ((Integer) keys.nextElement()).intValue();
            }
        }

        SequenceDestinations sequenceDestinations = new SequenceDestinations(this.sequenceNumber, destinations);
        if (this.debug) {
            System.out.println("\n\n" + this.moduleName + "Preparing to Store inventory/sequence destinations" + inventoryEvent);
        }
        this.storageService.store(inventoryEvent, sequenceDestinations);
        // put() replaces the previous value; no separate remove() is needed.
        this.sequencesLastAssigned.put(templateKey, Long.valueOf(this.sequenceNumber));
        this.rdsPercolator.yankEventOffReleaseBuffers(nBEvent);
        issueArchivalNotification(nBEvent, destinations, new RdArchivalServiceNotification(templateId, eventId, this.sequenceNumber, previousSequence));
    }

    /**
     * Publishes an archival notification for a stored event, using the original
     * event's content synopsis as the topic and suppressing redistribution to the
     * source. If an arrival time was recorded for the event, the elapsed time is printed.
     *
     * @param nBEvent the event that was archived
     * @param iArr destination entity ids (not used by this method)
     * @param rdArchivalServiceNotification the notification to publish
     * @throws ServiceException if generating or publishing the notification fails
     */
    private void issueArchivalNotification(NBEvent nBEvent, int[] iArr, RdArchivalServiceNotification rdArchivalServiceNotification) throws ServiceException {
        if (debug) {
            System.out.println(moduleName + "Issuing archival notification " + "with sequence number = " + rdArchivalServiceNotification.getSequenceNumber());
        }
        Long seenAt = (Long) junk.get(nBEvent.getEventHeaders().getEventId());
        if (seenAt != null) {
            long archivalDelay = System.currentTimeMillis() - seenAt.longValue();
            System.out.println(moduleName + "Total arch delay = " + archivalDelay);
        }
        eventProducer.setSuppressRedistributionToSource(true);
        NBEvent notificationEvent = eventProducer.generateEvent(
                nBEvent.getContentSynopsisType(),
                nBEvent.getContentSynopsis(),
                rdArchivalServiceNotification.getBytes());
        NBEventGenerator.setEventType(notificationEvent, 1);
        eventProducer.publishEvent(notificationEvent);
    }

    /**
     * Publishes an ACK response to the target entity's {@code <entityId>/Invoice} topic.
     *
     * @param rdAckResponseServiceInvoiceEvent the response to send
     * @throws ServiceException if generating or publishing the event fails
     */
    private void issueAckResponse(RdAckResponseServiceInvoiceEvent rdAckResponseServiceInvoiceEvent) throws ServiceException {
        int entityId = rdAckResponseServiceInvoiceEvent.getEntityId();
        rdAckResponseServiceInvoiceEvent.getTemplateId();
        if (debug) {
            System.out.println(moduleName + "Issuing ->" + rdAckResponseServiceInvoiceEvent);
        }
        String invoiceTopic = entityId + "/Invoice";
        NBEvent responseEvent = eventProducer.generateEvent(1, invoiceTopic, rdAckResponseServiceInvoiceEvent.getBytes());
        NBEventGenerator.setEventType(responseEvent, 1);
        eventProducer.publishEvent(responseEvent);
    }

    /**
     * Publishes a recovery response to the entity's {@code <entityId>/Recovery/Publisher}
     * or {@code <entityId>/Recovery/Subscriber} topic, depending on which kind of
     * recovery info the response carries.
     *
     * @param rdRecoveryResponse the response to send
     * @throws ServiceException if generating or publishing the event fails
     */
    private void issueRecoveryResponse(RdRecoveryResponse rdRecoveryResponse) throws ServiceException {
        int entityId = rdRecoveryResponse.getEntityId();
        int templateId = rdRecoveryResponse.getTemplateId();
        String recoveryTopic;
        if (rdRecoveryResponse.hasPublisherRecoveryInfo()) {
            recoveryTopic = entityId + "/Recovery/Publisher";
        } else {
            recoveryTopic = entityId + "/Recovery/Subscriber";
        }
        System.out.println(moduleName + "Issuing Recovery Response to entity=" + entityId + ", for templateId=" + templateId);
        NBEvent responseEvent = eventProducer.generateEvent(1, recoveryTopic, rdRecoveryResponse.getBytes());
        NBEventGenerator.setEventType(responseEvent, 1);
        eventProducer.publishEvent(responseEvent);
    }

    /**
     * Rebuilds this service's in-memory state from the storage service.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Re-register every entity the storage service lists.</li>
     *   <li>Restore the last assigned sequence number.</li>
     *   <li>For each managed template: re-add template management, record the
     *       template's last assigned sequence number in {@code sequencesLastAssigned},
     *       then re-register the entities and profiles stored for that template.</li>
     * </ol>
     *
     * <p>The {@code recovering} flag is cleared on every normal exit path. It is not
     * cleared if a {@link ServiceException} propagates out of this method.
     *
     * @throws ServiceException declared by this method; presumably raised by the
     *         storage service or by the registration calls
     */
    public void recoverFromFailure() throws ServiceException {
        int[] registeredEntities = this.storageService.getListOfRegisteredEntities();
        if (registeredEntities == null) {
            // NOTE(review): this early return also skips template recovery and the
            // restoration of sequenceNumber below — confirm that a store with managed
            // templates but no registered entities cannot occur.
            System.out.println(this.moduleName + "No entities were previously registered" + " initialization using storage service complete ");
            this.recovering = false;
            return;
        }
        for (int entityId : registeredEntities) {
            registerEntity(entityId);
        }

        TemplateInfo[] managedTemplates = this.storageService.getListOfManagedTemplates();
        // A null list and an empty list are handled identically: nothing to recover.
        if (managedTemplates == null || managedTemplates.length == 0) {
            System.out.println(this.moduleName + "No templates were previously registered" + " initialization using storage service complete ");
            this.recovering = false;
            return;
        }

        this.sequenceNumber = this.storageService.getSequenceNumberLastAssigned();
        System.out.println(this.moduleName + "SequenceNumberLastAssigned=" + this.sequenceNumber);

        for (TemplateInfo template : managedTemplates) {
            int templateId = template.getTemplateId();
            System.out.println(this.moduleName + "Recovering for TemplateID [" + templateId + "]");

            addTemplateManagement(template);
            // valueOf replaces the deprecated boxing constructors; the boxed values are
            // equal to the ones the constructors produced.
            this.sequencesLastAssigned.put(
                    Integer.valueOf(templateId),
                    Long.valueOf(this.storageService.getSequenceNumberLastAssigned(templateId)));
            processEntitiesRegisteredToTemplate(templateId);
            processProfilesRegisteredToTemplate(templateId);
        }

        System.out.println("\n\n\n" + this.moduleName + "Recovery from Failure " + "(If, indeed, there was one) complete \n\n");
        this.recovering = false;
    }

    /**
     * Re-registers, for the given template, every entity the storage service lists
     * as registered to it.
     *
     * <p>If the storage service returns {@code null}, a message is printed and
     * nothing is registered.
     *
     * @param i the template id
     * @throws ServiceException declared by this method; presumably raised by the
     *         storage service or by {@code registerEntityForTemplate}
     */
    private void processEntitiesRegisteredToTemplate(int i) throws ServiceException {
        int[] entityIds = this.storageService.getListOfRegisteredEntities(i);
        if (entityIds == null) {
            System.out.println(this.moduleName + "No entities registered for template [" + i + "]");
            return;
        }
        for (int index = 0; index < entityIds.length; index++) {
            registerEntityForTemplate(entityIds[index], i);
        }
    }

    /**
     * Re-adds, for the given template, every profile the storage service lists as
     * registered to it.
     *
     * <p>If the storage service returns {@code null}, a message is printed and
     * nothing is added.
     *
     * @param i the template id
     * @throws ServiceException declared by this method; presumably raised by the
     *         storage service or by {@code addProfile}
     */
    private void processProfilesRegisteredToTemplate(int i) throws ServiceException {
        Profile[] storedProfiles = this.storageService.getListOfRegisteredProfiles(i);
        if (storedProfiles == null) {
            System.out.println(this.moduleName + "No profiles registered for template [" + i + "]");
            return;
        }
        for (int index = 0; index < storedProfiles.length; index++) {
            addProfile(i, storedProfiles[index]);
        }
    }

    /**
     * Command-line driver that starts a reliable delivery service against a
     * {@code "db"} storage service over the {@code "niotcp"} transport.
     *
     * <p>Arguments: {@code <hostname> <portnum> [extra]}. When exactly three
     * arguments are supplied, a sample template (id 12345, synopsis
     * {@code "Movie/Casablanca"}) is placed under management and entities 7777 and
     * 7007 are registered to it.
     *
     * <p>If fewer than two arguments are given, a usage line is written to standard
     * error and the method returns, rather than failing with an
     * {@link ArrayIndexOutOfBoundsException}.
     *
     * @param strArr the command-line arguments
     */
    public static void main(String[] strArr) {
        // Both the hostname and the port are required; fail with a readable message.
        if (strArr == null || strArr.length < 2) {
            System.err.println("Usage: ReliableDeliveryServiceImpl <hostname> <portnum> [register]");
            return;
        }

        Properties properties = new Properties();
        properties.put("hostname", strArr[0]);
        properties.put("portnum", strArr[1]);

        final int sampleTemplateId = 12345;
        final int firstEntityId = 7777;
        final int secondEntityId = 7007;
        try {
            ReliableDeliveryServiceImpl reliableDeliveryServiceImpl = new ReliableDeliveryServiceImpl(12345678, StorageServiceFactory.getStorageService("db", new Properties()), properties, "niotcp");
            reliableDeliveryServiceImpl.start();
            System.out.println("ReliableDeliveryServiceImpl.main() ->" + "REGISTERING Subscription ");
            TemplateInfo createTemplateInfo = reliableDeliveryServiceImpl.createTemplateInfo(sampleTemplateId, 1, "Movie/Casablanca");
            // Registration only happens when exactly three arguments were supplied;
            // the value of the third argument itself is never read.
            if (strArr.length == 3) {
                reliableDeliveryServiceImpl.addTemplateManagement(createTemplateInfo);
                reliableDeliveryServiceImpl.registerEntity(firstEntityId);
                reliableDeliveryServiceImpl.registerEntity(secondEntityId);
                reliableDeliveryServiceImpl.registerEntityForTemplate(firstEntityId, sampleTemplateId);
                reliableDeliveryServiceImpl.registerEntityForTemplate(secondEntityId, sampleTemplateId);
                System.out.println("ReliableDeliveryServiceImpl.main() ->" + "Registering " + createTemplateInfo + "\n " + "Entities [7777] and [7007]");
                // NOTE(review): this profile is constructed and then discarded; it was
                // presumably meant to be registered (e.g. via addProfile) — confirm.
                new StringProfile("Movie/Casablanca", firstEntityId, "11235Casab");
            }
        } catch (ServiceException e) {
            System.out.println(e);
        }
    }
}
