package org.sensorhub.impl.client.sost;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import net.opengis.gml.v32.AbstractFeature;
import net.opengis.swe.v20.DataBlock;
import org.sensorhub.api.common.Event;
import org.sensorhub.api.common.IEventListener;
import org.sensorhub.api.common.SensorHubException;
import org.sensorhub.api.data.DataEvent;
import org.sensorhub.api.sensor.ISensorDataInterface;
import org.sensorhub.api.sensor.ISensorModule;
import org.sensorhub.api.sensor.SensorDataEvent;
import org.sensorhub.api.sensor.SensorEvent;
import org.sensorhub.api.service.ServiceException;
import org.sensorhub.impl.SensorHub;
import org.sensorhub.impl.module.AbstractModule;
import org.sensorhub.utils.MsgUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vast.cdm.common.DataStreamWriter;
import org.vast.data.DataBlockList;
import org.vast.ogc.om.ObservationImpl;
import org.vast.ows.OWSException;
import org.vast.ows.sos.InsertResultRequest;
import org.vast.ows.sos.InsertResultTemplateRequest;
import org.vast.ows.sos.InsertResultTemplateResponse;
import org.vast.ows.sos.InsertSensorRequest;
import org.vast.ows.sos.SOSUtils;
import org.vast.ows.swe.UpdateSensorRequest;
import org.vast.swe.SWEData;

/* loaded from: input_file:org/sensorhub/impl/client/sost/SOSTClient.class */
public class SOSTClient extends AbstractModule<SOSTClientConfig> implements IEventListener {
    protected static final Logger log = LoggerFactory.getLogger(SOSTClient.class);
    ISensorModule<?> sensor;
    String offering;
    SOSUtils sosUtils = new SOSUtils();
    Map<ISensorDataInterface, StreamInfo> dataStreams = new LinkedHashMap();

    /* loaded from: input_file:org/sensorhub/impl/client/sost/SOSTClient$StreamInfo.class */
    public class StreamInfo {
        String templateID;
        public long lastEventTime = -1;
        public int errorCount = 0;
        private int minRecordsPerRequest = 10;
        private SWEData resultData = new SWEData();
        private ThreadPoolExecutor threadPool;
        private DataStreamWriter persistentWriter;

        public StreamInfo() {
        }
    }

    public void start() throws SensorHubException {
        this.sensor = SensorHub.getInstance().getSensorManager().getModuleById(this.config.sensorID);
        if (!this.sensor.isConnected()) {
            log.info("Sensor {} is not connected. Not connecting to SOS", MsgUtils.moduleString(this.sensor));
            return;
        }
        try {
            registerSensor(this.sensor);
            log.info("Sensor " + MsgUtils.moduleString(this.sensor) + " registered with SOS");
            Iterator it = this.sensor.getAllOutputs().values().iterator();
            while (it.hasNext()) {
                registerDataStream((ISensorDataInterface) it.next());
            }
            log.info("Result templates registered with SOS");
        } catch (Exception e) {
            throw new ServiceException("Error while registering sensor with remote SOS", e);
        }
    }

    public void stop() throws SensorHubException {
        if (this.sensor != null) {
            this.sensor.unregisterListener(this);
        }
        for (Map.Entry<ISensorDataInterface, StreamInfo> entry : this.dataStreams.entrySet()) {
            stopStream(entry.getKey(), entry.getValue());
        }
    }

    protected void stopStream(ISensorDataInterface iSensorDataInterface, StreamInfo streamInfo) {
        iSensorDataInterface.unregisterListener(this);
        streamInfo.threadPool.shutdown();
        try {
            if (streamInfo.persistentWriter != null) {
                streamInfo.persistentWriter.close();
            }
        } catch (IOException e) {
        }
    }

    protected void registerSensor(ISensorModule<?> iSensorModule) throws OWSException {
        InsertSensorRequest insertSensorRequest = new InsertSensorRequest();
        insertSensorRequest.setPostServer(this.config.sosEndpointUrl);
        insertSensorRequest.setVersion("2.0");
        insertSensorRequest.setProcedureDescription(iSensorModule.getCurrentDescription());
        insertSensorRequest.setProcedureDescriptionFormat(InsertSensorRequest.DEFAULT_PROCEDURE_FORMAT);
        insertSensorRequest.getObservationTypes().add("http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_ComplexObservation");
        insertSensorRequest.getFoiTypes().add("gml:Feature");
        this.offering = this.sosUtils.sendRequest(insertSensorRequest, false).getAssignedOffering();
        iSensorModule.registerListener(this);
    }

    protected void updateSensor(ISensorModule<?> iSensorModule) throws OWSException {
        UpdateSensorRequest updateSensorRequest = new UpdateSensorRequest("SOS");
        updateSensorRequest.setPostServer(this.config.sosEndpointUrl);
        updateSensorRequest.setVersion("2.0");
        updateSensorRequest.setProcedureId(iSensorModule.getCurrentDescription().getUniqueIdentifier());
        updateSensorRequest.setProcedureDescription(iSensorModule.getCurrentDescription());
        updateSensorRequest.setProcedureDescriptionFormat(InsertSensorRequest.DEFAULT_PROCEDURE_FORMAT);
        this.sosUtils.sendRequest(updateSensorRequest, false);
    }

    protected void registerDataStream(ISensorDataInterface iSensorDataInterface) throws OWSException {
        InsertResultTemplateRequest insertResultTemplateRequest = new InsertResultTemplateRequest();
        insertResultTemplateRequest.setPostServer(this.config.sosEndpointUrl);
        insertResultTemplateRequest.setVersion("2.0");
        insertResultTemplateRequest.setOffering(this.offering);
        insertResultTemplateRequest.setResultStructure(iSensorDataInterface.getRecordDescription());
        insertResultTemplateRequest.setResultEncoding(iSensorDataInterface.getRecommendedEncoding());
        ObservationImpl observationImpl = new ObservationImpl();
        AbstractFeature currentFeatureOfInterest = iSensorDataInterface.getParentModule().getCurrentFeatureOfInterest();
        if (currentFeatureOfInterest != null) {
            observationImpl.setFeatureOfInterest(currentFeatureOfInterest);
        }
        insertResultTemplateRequest.setObservationTemplate(observationImpl);
        InsertResultTemplateResponse sendRequest = this.sosUtils.sendRequest(insertResultTemplateRequest, false);
        StreamInfo streamInfo = new StreamInfo();
        streamInfo.templateID = sendRequest.getAcceptedTemplateId();
        streamInfo.resultData.setElementType(iSensorDataInterface.getRecordDescription());
        streamInfo.resultData.setEncoding(iSensorDataInterface.getRecommendedEncoding());
        streamInfo.minRecordsPerRequest = 1;
        this.dataStreams.put(iSensorDataInterface, streamInfo);
        streamInfo.threadPool = new ThreadPoolExecutor(1, 1, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue(2));
        iSensorDataInterface.registerListener(this);
    }

    public void handleEvent(Event<?> event) {
        StreamInfo streamInfo;
        if (event instanceof SensorEvent) {
            if (((SensorEvent) event).getType() == SensorEvent.Type.SENSOR_CHANGED) {
                try {
                    updateSensor(this.sensor);
                    return;
                } catch (OWSException e) {
                    log.error("Error when sending updates sensor description to SOS-T", e);
                    return;
                }
            }
            return;
        }
        if (!(event instanceof DataEvent) || (streamInfo = this.dataStreams.get(event.getSource())) == null) {
            return;
        }
        if (streamInfo.errorCount >= this.config.maxConnectErrors) {
            log.error("Too many errors sending '" + ((SensorDataEvent) event).getSource().getName() + "' data to SOS-T from " + MsgUtils.moduleString(this.sensor) + ". Stopping Stream.");
            stopStream((ISensorDataInterface) event.getSource(), streamInfo);
            return;
        }
        if (streamInfo.threadPool.getQueue().remainingCapacity() == 0) {
            String name = ((SensorDataEvent) event).getSource().getName();
            if (log.isDebugEnabled()) {
                log.debug("Too many requests to SOS-T for '" + name + "' of " + MsgUtils.moduleString(this.sensor) + ". Bandwidth cannot keep up.");
                return;
            }
            return;
        }
        streamInfo.lastEventTime = event.getTimeStamp();
        for (DataBlock dataBlock : ((SensorDataEvent) event).getRecords()) {
            streamInfo.resultData.pushNextDataBlock(dataBlock);
        }
        if (this.config.usePersistentConnection) {
            sendInPersistentRequest((SensorDataEvent) event, streamInfo);
        } else {
            sendAsNewRequest((SensorDataEvent) event, streamInfo);
        }
    }

    private void sendAsNewRequest(final SensorDataEvent sensorDataEvent, final StreamInfo streamInfo) {
        if (streamInfo.resultData.getNumElements() >= streamInfo.minRecordsPerRequest) {
            final InsertResultRequest insertResultRequest = new InsertResultRequest();
            insertResultRequest.setPostServer(this.config.sosEndpointUrl);
            insertResultRequest.setVersion("2.0");
            insertResultRequest.setTemplateId(streamInfo.templateID);
            insertResultRequest.setResultData(streamInfo.resultData);
            streamInfo.resultData = streamInfo.resultData.copy();
            streamInfo.threadPool.execute(new Runnable() { // from class: org.sensorhub.impl.client.sost.SOSTClient.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        if (SOSTClient.log.isDebugEnabled()) {
                            SOSTClient.log.debug("Sending '" + sensorDataEvent.getSource().getName() + "' record(s) to SOS-T");
                        }
                        SOSTClient.this.sosUtils.sendRequest(insertResultRequest, false);
                    } catch (Exception e) {
                        SOSTClient.log.error("Error when sending '" + sensorDataEvent.getSource().getName() + "' data to SOS-T from " + MsgUtils.moduleString(SOSTClient.this.sensor), e);
                        streamInfo.errorCount++;
                    }
                }
            });
        }
    }

    private void sendInPersistentRequest(final SensorDataEvent sensorDataEvent, final StreamInfo streamInfo) {
        final DataBlockList data = streamInfo.resultData.getData();
        streamInfo.resultData.clearData();
        streamInfo.threadPool.execute(new Runnable() { // from class: org.sensorhub.impl.client.sost.SOSTClient.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    if (streamInfo.persistentWriter == null) {
                        if (SOSTClient.log.isDebugEnabled()) {
                            SOSTClient.log.debug("Connecting to " + SOSTClient.this.config.sosEndpointUrl + "...");
                        }
                        InsertResultRequest insertResultRequest = new InsertResultRequest();
                        insertResultRequest.setPostServer(SOSTClient.this.config.sosEndpointUrl);
                        insertResultRequest.setVersion("2.0");
                        insertResultRequest.setTemplateId(streamInfo.templateID);
                        HttpURLConnection sendPostRequestWithQuery = SOSTClient.this.sosUtils.sendPostRequestWithQuery(insertResultRequest);
                        sendPostRequestWithQuery.setRequestProperty("Content-type", "text/plain");
                        sendPostRequestWithQuery.setChunkedStreamingMode(32);
                        sendPostRequestWithQuery.connect();
                        streamInfo.persistentWriter = streamInfo.resultData.getDataWriter();
                        streamInfo.persistentWriter.setOutput(new BufferedOutputStream(sendPostRequestWithQuery.getOutputStream()));
                    }
                    ListIterator blockIterator = data.blockIterator();
                    while (blockIterator.hasNext()) {
                        if (SOSTClient.log.isDebugEnabled()) {
                            SOSTClient.log.debug("Sending '" + sensorDataEvent.getSource().getName() + "' record(s) to SOS-T");
                        }
                        streamInfo.persistentWriter.write((DataBlock) blockIterator.next());
                    }
                    streamInfo.persistentWriter.flush();
                } catch (Exception e) {
                    SOSTClient.log.error("Error when sending '" + sensorDataEvent.getSource().getName() + "' data to SOS-T from " + MsgUtils.moduleString(SOSTClient.this.sensor), e);
                    streamInfo.errorCount++;
                    try {
                        if (streamInfo.persistentWriter != null) {
                            streamInfo.persistentWriter.close();
                        }
                    } catch (IOException e2) {
                    }
                    streamInfo.persistentWriter = null;
                    SOSTClient.log.debug("Waiting to reconnect...");
                    try {
                        Thread.sleep(3000L);
                    } catch (InterruptedException e3) {
                    }
                }
            }
        });
    }

    public boolean isConnected() {
        return this.offering != null;
    }

    public Map<ISensorDataInterface, StreamInfo> getDataStreams() {
        return this.dataStreams;
    }

    public void cleanup() throws SensorHubException {
    }
}
