package org.sensorhub.impl.service.sos;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import net.opengis.gml.v32.AbstractFeature;
import net.opengis.swe.v20.DataBlock;
import net.opengis.swe.v20.DataComponent;
import net.opengis.swe.v20.DataEncoding;
import org.sensorhub.api.common.Event;
import org.sensorhub.api.common.IEventListener;
import org.sensorhub.api.common.SensorHubException;
import org.sensorhub.api.data.DataEvent;
import org.sensorhub.api.data.IDataProducerModule;
import org.sensorhub.api.data.IMultiSourceDataProducer;
import org.sensorhub.api.data.IStreamingDataInterface;
import org.sensorhub.api.service.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vast.data.DataIterator;
import org.vast.ogc.def.DefinitionRef;
import org.vast.ogc.gml.FeatureRef;
import org.vast.ogc.om.IObservation;
import org.vast.ogc.om.ObservationImpl;
import org.vast.ogc.om.ProcedureRef;
import org.vast.util.TimeExtent;

/* loaded from: input_file:org/sensorhub/impl/service/sos/StreamDataProvider.class */
public abstract class StreamDataProvider implements ISOSDataProvider, IEventListener {
    private static final Logger log = LoggerFactory.getLogger(StreamDataProvider.class);
    IDataProducerModule<?> dataSource;
    long timeOut;
    long stopTime;
    boolean latestRecordOnly;
    DataEvent lastDataEvent;
    long lastQueueErrorTime = Long.MIN_VALUE;
    int nextEventRecordIndex = 0;
    List<IStreamingDataInterface> sourceOutputs = new ArrayList();
    BlockingQueue<DataEvent> eventQueue = new LinkedBlockingQueue(200);

    public StreamDataProvider(IDataProducerModule<?> iDataProducerModule, StreamDataProviderConfig streamDataProviderConfig, SOSDataFilter sOSDataFilter) throws ServiceException {
        this.dataSource = iDataProducerModule;
        this.stopTime = ((long) sOSDataFilter.getTimeRange().getStopTime()) * 1000;
        iDataProducerModule.getConfiguration();
        for (IStreamingDataInterface iStreamingDataInterface : iDataProducerModule.getAllOutputs().values()) {
            if (streamDataProviderConfig.hiddenOutputs == null || !streamDataProviderConfig.hiddenOutputs.contains(iStreamingDataInterface.getName())) {
                DataIterator dataIterator = new DataIterator(iStreamingDataInterface.getRecordDescription());
                while (true) {
                    if (!dataIterator.hasNext()) {
                        break;
                    }
                    if (sOSDataFilter.getObservables().contains(dataIterator.next().getDefinition())) {
                        this.timeOut = (long) (streamDataProviderConfig.liveDataTimeout * 1000.0d);
                        this.sourceOutputs.add(iStreamingDataInterface);
                        break;
                    }
                }
            }
        }
        for (IStreamingDataInterface iStreamingDataInterface2 : this.sourceOutputs) {
            DataBlock latestRecord = iStreamingDataInterface2.getLatestRecord();
            if (latestRecord != null) {
                this.eventQueue.offer(new DataEvent(System.currentTimeMillis(), iStreamingDataInterface2, new DataBlock[]{latestRecord}));
            }
            if (isNowTimeInstant(sOSDataFilter.getTimeRange())) {
                this.stopTime = Long.MAX_VALUE;
                this.timeOut = 0L;
                this.latestRecordOnly = true;
            } else {
                iStreamingDataInterface2.registerListener(this);
            }
        }
    }

    protected boolean isNowTimeInstant(TimeExtent timeExtent) {
        return timeExtent.isTimeInstant() && timeExtent.isBaseAtNow();
    }

    @Override // org.sensorhub.impl.service.sos.ISOSDataProvider
    public IObservation getNextObservation() throws SensorHubException {
        DataComponent nextComponent = getNextComponent();
        if (nextComponent == null) {
            return null;
        }
        double currentTimeMillis = System.currentTimeMillis() / 1000.0d;
        for (int i = 0; i < nextComponent.getComponentCount(); i++) {
            DataComponent component = nextComponent.getComponent(i);
            if (component.isSetDefinition() && component.getDefinition().equals("http://www.opengis.net/def/property/OGC/0/SamplingTime")) {
                currentTimeMillis = component.getData().getDoubleValue();
            }
        }
        TimeExtent timeExtent = new TimeExtent();
        timeExtent.setBaseTime(currentTimeMillis);
        TimeExtent timeExtent2 = new TimeExtent();
        timeExtent2.setBaseTime(currentTimeMillis);
        String definition = nextComponent.getDefinition();
        if (definition == null) {
            definition = "http://www.opengis.net/def/nil/OGC/0/unknown";
        }
        AbstractFeature currentFeatureOfInterest = this.dataSource.getCurrentFeatureOfInterest();
        if (this.dataSource instanceof IMultiSourceDataProducer) {
            currentFeatureOfInterest = this.dataSource.getCurrentFeatureOfInterest(this.lastDataEvent.getRelatedEntityID());
        }
        String uniqueIdentifier = currentFeatureOfInterest != null ? currentFeatureOfInterest.getUniqueIdentifier() : "http://www.opengis.net/def/nil/OGC/0/unknown";
        ObservationImpl observationImpl = new ObservationImpl();
        observationImpl.setFeatureOfInterest(new FeatureRef(uniqueIdentifier));
        observationImpl.setObservedProperty(new DefinitionRef(definition));
        observationImpl.setProcedure(new ProcedureRef(this.dataSource.getCurrentDescription().getUniqueIdentifier()));
        observationImpl.setPhenomenonTime(timeExtent);
        observationImpl.setResultTime(timeExtent2);
        observationImpl.setResult(nextComponent);
        return observationImpl;
    }

    private DataComponent getNextComponent() {
        DataBlock nextResultRecord = getNextResultRecord();
        if (nextResultRecord == null) {
            return null;
        }
        DataComponent copy = getResultStructure().copy();
        copy.setData(nextResultRecord);
        return copy;
    }

    @Override // org.sensorhub.impl.service.sos.ISOSDataProvider
    public DataBlock getNextResultRecord() {
        if (!hasMoreData()) {
            return null;
        }
        try {
            if (this.lastDataEvent == null || this.nextEventRecordIndex >= this.lastDataEvent.getRecords().length) {
                this.lastDataEvent = this.eventQueue.poll(this.timeOut, TimeUnit.MILLISECONDS);
                if (this.lastDataEvent == null || this.lastDataEvent.getTimeStamp() > this.stopTime) {
                    return null;
                }
                this.nextEventRecordIndex = 0;
            }
            DataBlock[] records = this.lastDataEvent.getRecords();
            int i = this.nextEventRecordIndex;
            this.nextEventRecordIndex = i + 1;
            return records[i];
        } catch (InterruptedException e) {
            return null;
        }
    }

    private boolean hasMoreData() {
        if (!this.dataSource.isStarted()) {
            return false;
        }
        boolean z = false;
        Iterator<IStreamingDataInterface> it = this.sourceOutputs.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (it.next().isEnabled()) {
                z = true;
                break;
            }
        }
        return z;
    }

    @Override // org.sensorhub.impl.service.sos.ISOSDataProvider
    public DataComponent getResultStructure() {
        return this.sourceOutputs.get(0).getRecordDescription();
    }

    @Override // org.sensorhub.impl.service.sos.ISOSDataProvider
    public DataEncoding getDefaultResultEncoding() {
        return this.sourceOutputs.get(0).getRecommendedEncoding();
    }

    public void handleEvent(Event<?> event) {
        if ((event instanceof DataEvent) && ((DataEvent) event).getType() == DataEvent.Type.NEW_DATA_AVAILABLE && !this.eventQueue.offer((DataEvent) event)) {
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - this.lastQueueErrorTime > 10000) {
                log.warn("Maximum queue size reached while streaming data from " + this.dataSource + ". Some records will be discarded. This is often due to insufficient bandwidth");
                this.lastQueueErrorTime = currentTimeMillis;
            }
        }
    }

    @Override // org.sensorhub.impl.service.sos.ISOSDataProvider
    public void close() {
        if (!this.latestRecordOnly) {
            Iterator<IStreamingDataInterface> it = this.sourceOutputs.iterator();
            while (it.hasNext()) {
                it.next().unregisterListener(this);
            }
        }
        this.eventQueue.clear();
    }
}
