package org.sensorhub.impl.client.sost;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import net.opengis.gml.v32.AbstractFeature;
import net.opengis.swe.v20.DataBlock;
import org.sensorhub.api.client.ClientException;
import org.sensorhub.api.client.IClientModule;
import org.sensorhub.api.common.Event;
import org.sensorhub.api.common.IEventListener;
import org.sensorhub.api.common.SensorHubException;
import org.sensorhub.api.data.DataEvent;
import org.sensorhub.api.module.ModuleEvent;
import org.sensorhub.api.sensor.ISensorDataInterface;
import org.sensorhub.api.sensor.ISensorModule;
import org.sensorhub.api.sensor.SensorDataEvent;
import org.sensorhub.api.sensor.SensorEvent;
import org.sensorhub.impl.SensorHub;
import org.sensorhub.impl.comm.RobustIPConnection;
import org.sensorhub.impl.module.AbstractModule;
import org.sensorhub.impl.module.RobustConnection;
import org.sensorhub.impl.security.ClientAuth;
import org.sensorhub.utils.MsgUtils;
import org.vast.cdm.common.DataStreamWriter;
import org.vast.ogc.om.ObservationImpl;
import org.vast.ows.GetCapabilitiesRequest;
import org.vast.ows.OWSException;
import org.vast.ows.sos.InsertResultRequest;
import org.vast.ows.sos.InsertResultTemplateRequest;
import org.vast.ows.sos.InsertResultTemplateResponse;
import org.vast.ows.sos.InsertSensorRequest;
import org.vast.ows.sos.SOSInsertionCapabilities;
import org.vast.ows.sos.SOSServiceCapabilities;
import org.vast.ows.sos.SOSUtils;
import org.vast.ows.swe.UpdateSensorRequest;
import org.vast.swe.SWEData;

/* loaded from: input_file:org/sensorhub/impl/client/sost/SOSTClient.class */
public class SOSTClient extends AbstractModule<SOSTClientConfig> implements IClientModule<SOSTClientConfig>, IEventListener {
    /** Connection helper for the remote SOS server; assigned in {@link #init()}. */
    RobustConnection connection;

    /** Local sensor module whose description and outputs are pushed to the SOS; assigned in {@link #init()}. */
    ISensorModule<?> sensor;

    /** Full endpoint URL of the remote SOS, computed in {@code setConfiguration}. */
    String sosEndpointUrl;

    /** Offering assigned by the SOS in response to the InsertSensor request. */
    String offering;

    /** Helper used to send all SOS requests. */
    SOSUtils sosUtils = new SOSUtils();

    /** Per-output streaming state, keyed by sensor output, in registration order. */
    Map<ISensorDataInterface, StreamInfo> dataStreams = new LinkedHashMap<ISensorDataInterface, StreamInfo>();

    /* loaded from: input_file:org/sensorhub/impl/client/sost/SOSTClient$StreamInfo.class */
    /**
     * Per-output state used while pushing records of one sensor output to the SOS.
     * <p>
     * Instances are shared between the thread delivering sensor events and the
     * worker thread of {@link #threadPool}, so fields written by one side and
     * read by the other are declared {@code volatile}.
     */
    public class StreamInfo {
        /** ID of the result template accepted by the SOS for this output. */
        String templateID;

        /** Executor running the send tasks for this output. */
        private ThreadPoolExecutor threadPool;

        /**
         * Writer bound to the open streaming request, or {@code null} when no
         * persistent request is currently open. Written by the worker thread and
         * read when the stream is stopped, hence volatile.
         */
        private volatile DataStreamWriter persistentWriter;

        /** Time stamp of the last data event accepted for sending. */
        public long lastEventTime = Long.MIN_VALUE;

        /** Measurement period of the output, in milliseconds. */
        public int measPeriodMs = 1000;

        /**
         * Number of send failures recorded so far. Incremented by the send tasks
         * and compared against the configured maximum by the event handler, hence
         * volatile. The executor is created with a single thread, so increments
         * are not concurrent with each other.
         */
        public volatile int errorCount = 0;

        /** Minimum number of buffered records before a non-persistent request is sent. */
        private int minRecordsPerRequest = 10;

        /** Buffer (and structure/encoding holder) for records waiting to be sent. */
        private SWEData resultData = new SWEData();

        /** True while the worker thread is opening the persistent request. */
        private volatile boolean connecting = false;

        /** True once the stream has been asked to stop. */
        private volatile boolean stopping = false;

        public StreamInfo() {
        }
    }

    /**
     * Pushes the configured SOS credentials into the shared {@link ClientAuth}
     * instance. The password is only set when one is configured.
     */
    private void setAuth() {
        ClientAuth.getInstance().setUser(config.sos.user);
        if (config.sos.password == null) {
            return;
        }
        char[] passwordChars = config.sos.password.toCharArray();
        ClientAuth.getInstance().setPassword(passwordChars);
    }

    /**
     * Returns the SOS endpoint URL. As a side effect, the client credentials
     * are (re)applied on every call so they are in place before the URL is used.
     *
     * @return the endpoint URL computed from the current configuration
     */
    protected String getSosEndpointUrl() {
        // refresh credentials first: callers use the URL right away to send a request
        setAuth();
        final String endpoint = sosEndpointUrl;
        return endpoint;
    }

    /**
     * Stores the configuration and derives the full SOS endpoint URL from it:
     * {@code scheme://host:port[/resourcePath]}, where the scheme is
     * {@code https} when TLS is enabled and {@code http} otherwise.
     * <p>
     * A {@code null} or empty resource path is ignored; a path without leading
     * slash gets one inserted.
     *
     * @param sOSTClientConfig the new module configuration
     */
    public void setConfiguration(SOSTClientConfig sOSTClientConfig) {
        super.setConfiguration(sOSTClientConfig);

        StringBuilder url = new StringBuilder();
        url.append(sOSTClientConfig.sos.enableTLS ? "https" : "http");
        url.append("://").append(sOSTClientConfig.sos.remoteHost);
        url.append(':').append(sOSTClientConfig.sos.remotePort);

        String path = sOSTClientConfig.sos.resourcePath;
        // guard against empty paths: charAt(0) would throw on ""
        if (path != null && !path.isEmpty()) {
            if (path.charAt(0) != '/') {
                url.append('/');
            }
            url.append(path);
        }

        this.sosEndpointUrl = url.toString();
    }

    /**
     * Looks up the sensor module designated by the configured sensor ID and
     * creates the connection helper used to reach the SOS server.
     * <p>
     * The connection helper's {@code tryConnect} first checks TCP reachability,
     * then fetches the SOS capabilities and verifies that the transactional
     * operations, the SensorML procedure format and the complex observation type
     * required by this client are advertised.
     *
     * @throws SensorHubException (a {@link ClientException}) if the lookup or the
     *         creation of the connection helper fails; the original failure is
     *         attached as cause
     */
    public void init() throws SensorHubException {
        try {
            this.sensor = SensorHub.getInstance().getSensorManager().getModuleById(this.config.sensorID);
            this.connection = new RobustIPConnection(this, this.config.connection, "SOS server") {
                public boolean tryConnect() throws Exception {
                    // no point in sending an HTTP request if the host is unreachable
                    if (!tryConnectTCP(SOSTClient.this.config.sos.remoteHost, SOSTClient.this.config.sos.remotePort)) {
                        return false;
                    }
                    try {
                        GetCapabilitiesRequest getCapabilitiesRequest = new GetCapabilitiesRequest();
                        getCapabilitiesRequest.setConnectTimeOut(this.connectConfig.connectTimeout);
                        getCapabilitiesRequest.setService("SOS");
                        getCapabilitiesRequest.setGetServer(SOSTClient.this.getSosEndpointUrl());
                        SOSServiceCapabilities sendRequest = SOSTClient.this.sosUtils.sendRequest(getCapabilitiesRequest, false);

                        // when POST endpoints are advertised, all insert operations must be among them
                        if (!sendRequest.getPostServers().isEmpty()) {
                            for (String str : new String[]{"InsertSensor", "InsertResultTemplate", "InsertResult"}) {
                                if (!sendRequest.getPostServers().containsKey(str)) {
                                    throw new ClientException(str + " operation not supported by this SOS endpoint");
                                }
                            }
                        }

                        // without insertion capabilities there is nothing more to verify
                        SOSInsertionCapabilities insertionCapabilities = sendRequest.getInsertionCapabilities();
                        if (insertionCapabilities == null) {
                            return true;
                        }
                        if (!insertionCapabilities.getProcedureFormats().contains(InsertSensorRequest.DEFAULT_PROCEDURE_FORMAT)) {
                            throw new ClientException("SensorML v2.0 format not supported by this SOS endpoint");
                        }
                        if (insertionCapabilities.getObservationTypes().contains("http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_ComplexObservation")) {
                            return true;
                        }
                        throw new ClientException("DataRecord observation type not supported by this SOS endpoint");
                    } catch (OWSException e) {
                        // capabilities could not be fetched: report and signal a failed attempt
                        SOSTClient.this.reportError("Cannot fetch SOS capabilities", e, true);
                        return false;
                    }
                }
            };
        } catch (Exception e) {
            // keep the original failure as cause so the actual reason is not lost
            throw new ClientException("Cannot find sensor with local ID " + this.config.sensorID, e);
        }
    }

    /**
     * Requests the start of this module. When starting is allowed, the client
     * registers itself as listener of the sensor and reports that it is waiting
     * for the data source; the actual start is triggered from
     * {@code handleEvent} when a STARTED module event is received.
     *
     * @throws SensorHubException if stopping the module after a failure fails
     */
    public void requestStart() throws SensorHubException {
        if (!canStart()) {
            return;
        }

        try {
            sensor.registerListener(this);
            String sensorLabel = MsgUtils.moduleString(sensor);
            reportStatus("Waiting for data source " + sensorLabel);
        } catch (Exception e) {
            // report, roll back, then let the caller see the original failure
            reportError("Error while starting module", e);
            requestStop();
            throw e;
        }
    }

    /**
     * Starts pushing data: applies the connection configuration, waits for the
     * connection via the connection helper, registers the sensor with the SOS
     * and then registers one result template per sensor output. The module
     * state is set to STARTED once everything is registered.
     *
     * @throws SensorHubException (a {@link ClientException}) if the sensor or one
     *         of its data streams cannot be registered; a data stream failure is
     *         reported with its own message naming the stream
     */
    public void start() throws SensorHubException {
        this.connection.updateConfig(this.config.connection);
        this.connection.waitForConnection();
        reportStatus("Connected to " + getSosEndpointUrl());
        try {
            registerSensor(this.sensor);
            getLogger().info("Sensor " + MsgUtils.moduleString(this.sensor) + " registered with SOS");
            for (ISensorDataInterface iSensorDataInterface : this.sensor.getAllOutputs().values()) {
                try {
                    registerDataStream(iSensorDataInterface);
                } catch (Exception streamError) {
                    throw new ClientException("Error while registering " + iSensorDataInterface.getName() + " data stream with remote SOS", streamError);
                }
            }
            getLogger().info("Result templates registered with SOS");
            setState(ModuleEvent.ModuleState.STARTED);
        } catch (ClientException alreadyDescribed) {
            // the data stream failure already carries a precise message: do not
            // re-wrap it as a (misleading) sensor registration error
            throw alreadyDescribed;
        } catch (Exception sensorError) {
            throw new ClientException("Error while registering sensor with remote SOS", sensorError);
        }
    }

    /**
     * Stops the client: cancels the connection helper (if created), stops
     * listening to the sensor (if resolved) and stops every registered data
     * stream.
     *
     * @throws SensorHubException declared by the module contract
     */
    public void stop() throws SensorHubException {
        final RobustConnection activeConnection = connection;
        if (activeConnection != null) {
            activeConnection.cancel();
        }

        final ISensorModule<?> source = sensor;
        if (source != null) {
            source.unregisterListener(this);
        }

        for (Map.Entry<ISensorDataInterface, StreamInfo> stream : dataStreams.entrySet()) {
            ISensorDataInterface output = stream.getKey();
            StreamInfo info = stream.getValue();
            stopStream(output, info);
        }
    }

    /**
     * Stops streaming for one sensor output: unregisters this client from the
     * output, flags the stream as stopping, closes the persistent writer if one
     * is open and shuts down the stream's executor, waiting up to 3 seconds for
     * it to terminate.
     *
     * @param iSensorDataInterface the sensor output to stop listening to
     * @param streamInfo the streaming state associated with that output
     */
    protected void stopStream(ISensorDataInterface iSensorDataInterface, StreamInfo streamInfo) {
        iSensorDataInterface.unregisterListener(this);

        // flag first so that send tasks failing because of the close stay silent
        streamInfo.stopping = true;
        try {
            if (streamInfo.persistentWriter != null) {
                streamInfo.persistentWriter.close();
            }
        } catch (IOException e) {
            // best effort: the stream is going away anyway, but keep a trace of it
            getLogger().debug("Error while closing persistent writer", e);
        }

        try {
            if (streamInfo.threadPool != null && !streamInfo.threadPool.isShutdown()) {
                streamInfo.threadPool.shutdownNow();
                streamInfo.threadPool.awaitTermination(3L, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            // do not wait any longer, but preserve the interrupt for the caller
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sends an InsertSensor request describing the given sensor to the SOS and
     * stores the offering assigned by the server in {@code offering}.
     *
     * @param iSensorModule the sensor whose current description is registered
     * @throws OWSException if the request fails
     */
    protected void registerSensor(ISensorModule<?> iSensorModule) throws OWSException {
        final InsertSensorRequest request = new InsertSensorRequest();

        // transport settings
        request.setConnectTimeOut(config.connection.connectTimeout);
        request.setPostServer(getSosEndpointUrl());
        request.setVersion("2.0");

        // procedure description and supported types
        request.setProcedureDescription(iSensorModule.getCurrentDescription());
        request.setProcedureDescriptionFormat(InsertSensorRequest.DEFAULT_PROCEDURE_FORMAT);
        request.getObservationTypes().add("http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_ComplexObservation");
        request.getFoiTypes().add("gml:Feature");

        // the server answers with the offering to use for result templates
        offering = sosUtils.sendRequest(request, false).getAssignedOffering();
    }

    /**
     * Sends an UpdateSensor request carrying the sensor's current description
     * to the SOS.
     *
     * @param iSensorModule the sensor whose description changed
     * @throws OWSException if the request fails
     */
    protected void updateSensor(ISensorModule<?> iSensorModule) throws OWSException {
        final UpdateSensorRequest request = new UpdateSensorRequest("SOS");

        // transport settings
        request.setConnectTimeOut(config.connection.connectTimeout);
        request.setPostServer(getSosEndpointUrl());
        request.setVersion("2.0");

        // identify the procedure and attach its new description
        request.setProcedureId(iSensorModule.getUniqueIdentifier());
        request.setProcedureDescription(iSensorModule.getCurrentDescription());
        request.setProcedureDescriptionFormat(InsertSensorRequest.DEFAULT_PROCEDURE_FORMAT);

        sosUtils.sendRequest(request, false);
    }

    /**
     * Registers a result template for the given sensor output with the SOS,
     * creates the streaming state for it and starts listening to the output.
     * <p>
     * The streaming state is fully initialized (including its executor) before
     * it is stored in {@code dataStreams}. If a state was already registered for
     * the same output, e.g. because the client was started again, the previous
     * one is stopped so that its executor thread does not linger.
     *
     * @param iSensorDataInterface the sensor output to register
     * @throws OWSException if the InsertResultTemplate request fails
     */
    protected void registerDataStream(ISensorDataInterface iSensorDataInterface) throws OWSException {
        InsertResultTemplateRequest insertResultTemplateRequest = new InsertResultTemplateRequest();
        insertResultTemplateRequest.setConnectTimeOut(this.config.connection.connectTimeout);
        insertResultTemplateRequest.setPostServer(getSosEndpointUrl());
        insertResultTemplateRequest.setVersion("2.0");
        insertResultTemplateRequest.setOffering(this.offering);
        insertResultTemplateRequest.setResultStructure(iSensorDataInterface.getRecordDescription());
        insertResultTemplateRequest.setResultEncoding(iSensorDataInterface.getRecommendedEncoding());

        // observation template, with the feature of interest when the sensor has one
        ObservationImpl observationImpl = new ObservationImpl();
        AbstractFeature currentFeatureOfInterest = iSensorDataInterface.getParentModule().getCurrentFeatureOfInterest();
        if (currentFeatureOfInterest != null) {
            observationImpl.setFeatureOfInterest(currentFeatureOfInterest);
        }
        insertResultTemplateRequest.setObservationTemplate(observationImpl);
        InsertResultTemplateResponse sendRequest = this.sosUtils.sendRequest(insertResultTemplateRequest, false);

        // build the complete streaming state before making it visible to the event handler
        StreamInfo streamInfo = new StreamInfo();
        streamInfo.templateID = sendRequest.getAcceptedTemplateId();
        streamInfo.resultData.setElementType(iSensorDataInterface.getRecordDescription());
        streamInfo.resultData.setEncoding(iSensorDataInterface.getRecommendedEncoding());
        streamInfo.measPeriodMs = (int) (iSensorDataInterface.getAverageSamplingPeriod() * 1000.0d);
        streamInfo.minRecordsPerRequest = 1;
        streamInfo.threadPool = new ThreadPoolExecutor(1, 1, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(this.config.connection.maxQueueSize));

        StreamInfo previous = this.dataStreams.put(iSensorDataInterface, streamInfo);
        if (previous != null) {
            // release the writer and executor of the state being replaced
            stopStream(iSensorDataInterface, previous);
        }
        iSensorDataInterface.registerListener(this);
    }

    /**
     * Dispatches events received from the sensor and its outputs.
     * <ul>
     * <li>Module event with new state STARTED: starts this client; on failure the
     * error is reported and the state set to STOPPED.</li>
     * <li>Sensor event of type SENSOR_CHANGED: sends the updated sensor
     * description to the SOS.</li>
     * <li>Data event from a registered output: sends the records, unless the
     * stream exceeded the configured error count (the stream is then stopped) or
     * its send queue is full (the queue is then purged and the event skipped).</li>
     * </ul>
     * Data events for streams that have already been stopped are ignored, since
     * their executor has been shut down.
     *
     * @param event the event to process
     */
    public void handleEvent(Event<?> event) {
        if (event instanceof ModuleEvent) {
            if (((ModuleEvent) event).getNewState() == ModuleEvent.ModuleState.STARTED) {
                try {
                    start();
                } catch (SensorHubException e) {
                    reportError("Could not start SOS-T Client", e);
                    setState(ModuleEvent.ModuleState.STOPPED);
                }
            }
            return;
        }

        if (event instanceof SensorEvent) {
            if (((SensorEvent) event).getType() == SensorEvent.Type.SENSOR_CHANGED) {
                try {
                    updateSensor(this.sensor);
                } catch (OWSException e) {
                    getLogger().error("Error when sending updates sensor description to SOS-T", e);
                }
            }
            return;
        }

        if (!(event instanceof DataEvent)) {
            return;
        }
        StreamInfo streamInfo = this.dataStreams.get(event.getSource());
        if (streamInfo == null) {
            return;
        }
        // late events of a stopped stream must neither re-trigger the error path
        // nor be submitted to the (shut down) executor
        if (streamInfo.stopping) {
            return;
        }

        if (streamInfo.errorCount >= this.config.connection.maxConnectErrors) {
            reportError("Too many errors sending '" + ((SensorDataEvent) event).getSource().getName() + "' data to SOS-T. Stopping Stream.", null);
            stopStream((ISensorDataInterface) event.getSource(), streamInfo);
            checkDisconnected();
        } else if (streamInfo.threadPool.getQueue().remainingCapacity() == 0) {
            getLogger().warn("Too many '" + ((SensorDataEvent) event).getSource().getName() + "' records to send to SOS-T. Bandwidth cannot keep up.");
            getLogger().info("Skipping records by purging record queue");
            streamInfo.threadPool.getQueue().clear();
        } else {
            streamInfo.lastEventTime = event.getTimeStamp();
            if (this.config.connection.usePersistentConnection) {
                sendInPersistentRequest((SensorDataEvent) event, streamInfo);
            } else {
                sendAsNewRequest((SensorDataEvent) event, streamInfo);
            }
        }
    }

    /**
     * Asks the connection helper to reconnect when every registered stream is
     * flagged as stopping; does nothing as long as one stream is still active.
     */
    private void checkDisconnected() {
        for (StreamInfo info : dataStreams.values()) {
            if (!info.stopping) {
                // at least one stream is still running
                return;
            }
        }

        reportStatus("All streams stopped on error. Trying to reconnect...");
        connection.reconnect();
    }

    /**
     * Buffers the records of the event and, once the configured minimum number
     * of records is reached, submits a task sending them to the SOS in a new
     * InsertResult request. A send failure is reported and counted in the
     * stream's error count.
     *
     * @param sensorDataEvent the data event carrying the records
     * @param streamInfo the streaming state of the event's output
     */
    private void sendAsNewRequest(final SensorDataEvent sensorDataEvent, final StreamInfo streamInfo) {
        final DataBlock[] records = sensorDataEvent.getRecords();
        for (int i = 0; i < records.length; i++) {
            streamInfo.resultData.pushNextDataBlock(records[i]);
        }

        // keep buffering until enough records are available
        if (streamInfo.resultData.getNumElements() < streamInfo.minRecordsPerRequest) {
            return;
        }

        final InsertResultRequest request = new InsertResultRequest();
        request.setPostServer(getSosEndpointUrl());
        request.setVersion("2.0");
        request.setTemplateId(streamInfo.templateID);
        request.setResultData(streamInfo.resultData);

        // the request now owns the filled buffer: continue with a copy
        streamInfo.resultData = streamInfo.resultData.copy();

        final Runnable sendTask = new Runnable() {
            @Override
            public void run() {
                try {
                    if (getLogger().isTraceEnabled()) {
                        getLogger().trace("Sending " + request.getResultData().getComponentCount() + " '" + sensorDataEvent.getSource().getName() + "' record(s) to SOS-T");
                        getLogger().trace("Queue size is " + streamInfo.threadPool.getQueue().size());
                    }
                    SOSTClient.this.sosUtils.sendRequest(request, false);
                } catch (Exception e) {
                    reportError("Error when sending '" + sensorDataEvent.getSource().getName() + "' data to SOS-T", e, true);
                    streamInfo.errorCount++;
                }
            }
        };
        streamInfo.threadPool.execute(sendTask);
    }

    /**
     * Submits a task writing the records of the event to the persistent
     * (chunked) InsertResult request of the stream, opening that request first
     * when no writer is available. Events received while the request is being
     * opened are skipped.
     * <p>
     * The writer is stored in the stream state only once its output stream is
     * attached, so that no other code path ever sees a half-initialized writer.
     * If opening the request fails, the pending HTTP connection is released.
     * Failures are reported and counted unless the stream is stopping.
     *
     * @param sensorDataEvent the data event carrying the records
     * @param streamInfo the streaming state of the event's output
     */
    private void sendInPersistentRequest(final SensorDataEvent sensorDataEvent, final StreamInfo streamInfo) {
        if (streamInfo.connecting) {
            return;
        }
        streamInfo.threadPool.execute(new Runnable() {
            @Override
            public void run() {
                // connection being set up by this task; null once owned by the writer
                HttpURLConnection pendingConnection = null;
                try {
                    if (streamInfo.persistentWriter == null) {
                        streamInfo.connecting = true;
                        if (SOSTClient.this.getLogger().isDebugEnabled()) {
                            SOSTClient.this.getLogger().debug("Initiating streaming request");
                        }
                        InsertResultRequest insertResultRequest = new InsertResultRequest();
                        insertResultRequest.setPostServer(SOSTClient.this.getSosEndpointUrl());
                        insertResultRequest.setVersion("2.0");
                        insertResultRequest.setTemplateId(streamInfo.templateID);
                        pendingConnection = SOSTClient.this.sosUtils.sendPostRequestWithQuery(insertResultRequest);
                        pendingConnection.setRequestProperty("Content-type", "text/plain");
                        pendingConnection.setChunkedStreamingMode(32);
                        pendingConnection.connect();

                        // publish the writer only when it is ready to be written to
                        DataStreamWriter writer = streamInfo.resultData.getDataWriter();
                        writer.setOutput(new BufferedOutputStream(pendingConnection.getOutputStream()));
                        streamInfo.persistentWriter = writer;
                        pendingConnection = null;
                        streamInfo.connecting = false;
                    }
                    if (SOSTClient.this.getLogger().isTraceEnabled()) {
                        SOSTClient.this.getLogger().trace("Sending " + sensorDataEvent.getRecords().length + " '" + sensorDataEvent.getSource().getName() + "' record(s) to SOS-T");
                        SOSTClient.this.getLogger().trace("Queue size is " + streamInfo.threadPool.getQueue().size());
                    }
                    for (DataBlock dataBlock : sensorDataEvent.getRecords()) {
                        streamInfo.persistentWriter.write(dataBlock);
                    }
                    streamInfo.persistentWriter.flush();
                } catch (Exception e) {
                    // a connection that never got its writer would otherwise stay open
                    if (pendingConnection != null) {
                        pendingConnection.disconnect();
                    }
                    // errors caused by the stream being stopped are expected
                    if (streamInfo.stopping) {
                        return;
                    }
                    SOSTClient.this.reportError("Error when sending '" + sensorDataEvent.getSource().getName() + "' data to SOS-T", e, true);
                    streamInfo.errorCount++;
                    try {
                        if (streamInfo.persistentWriter != null) {
                            streamInfo.persistentWriter.close();
                        }
                    } catch (IOException ignored) {
                        // the writer is discarded below; nothing more can be done with it
                    }
                    // force a new streaming request on the next event
                    streamInfo.persistentWriter = null;
                    streamInfo.connecting = false;
                }
            }
        });
    }

    /**
     * Tells whether the connection helper reports an established connection.
     *
     * @return {@code true} if the connection helper exists and reports being
     *         connected; {@code false} otherwise, including before the helper
     *         has been created
     */
    public boolean isConnected() {
        // the helper is only assigned in init(), so it may still be null here
        return this.connection != null && this.connection.isConnected();
    }

    /**
     * Gives access to the streaming state of every registered sensor output.
     *
     * @return the live map held by this client (not a copy), keyed by sensor output
     */
    public Map<ISensorDataInterface, StreamInfo> getDataStreams() {
        final Map<ISensorDataInterface, StreamInfo> streams = dataStreams;
        return streams;
    }

    /**
     * Cleanup hook of the module; this client has nothing to clean up.
     *
     * @throws SensorHubException never thrown by this implementation
     */
    public void cleanup() throws SensorHubException {
        // intentionally empty
    }
}
