/*
 * Decompiled with CFR 0.152.
 */
package org.sensorhub.test.persistence;

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import net.opengis.DateTimeDouble;
import net.opengis.IDateTime;
import net.opengis.gml.v32.TimeInstant;
import net.opengis.gml.v32.TimePeriod;
import net.opengis.gml.v32.TimePosition;
import net.opengis.gml.v32.impl.GMLFactory;
import net.opengis.sensorml.v20.AbstractProcess;
import net.opengis.sensorml.v20.IdentifierList;
import net.opengis.sensorml.v20.PhysicalSystem;
import net.opengis.sensorml.v20.Term;
import net.opengis.swe.v20.DataArray;
import net.opengis.swe.v20.DataBlock;
import net.opengis.swe.v20.DataComponent;
import net.opengis.swe.v20.DataEncoding;
import org.junit.Assert;
import org.junit.Test;
import org.sensorhub.api.persistence.DataFilter;
import org.sensorhub.api.persistence.DataKey;
import org.sensorhub.api.persistence.IDataFilter;
import org.sensorhub.api.persistence.IDataRecord;
import org.sensorhub.api.persistence.IRecordStorageModule;
import org.sensorhub.api.persistence.IRecordStoreInfo;
import org.sensorhub.test.TestUtils;
import org.vast.data.BinaryEncodingImpl;
import org.vast.data.CountImpl;
import org.vast.data.DataArrayImpl;
import org.vast.data.DataRecordImpl;
import org.vast.data.QuantityImpl;
import org.vast.data.TextEncodingImpl;
import org.vast.data.TextImpl;
import org.vast.sensorML.PhysicalSystemImpl;
import org.vast.sensorML.SMLFactory;
import org.vast.sensorML.SMLUtils;
import org.vast.swe.SWEHelper;
import org.vast.util.DateTimeFormat;

public abstract class AbstractTestBasicStorage<StorageType extends IRecordStorageModule<?>> {
    static String SENSOR_UID_PREFIX = "urn:domain:sensors:";
    protected StorageType storage;
    protected String producerID = SENSOR_UID_PREFIX + 1;
    protected Collection<String> producerFilterList = null;
    long refTime;
    int numWrittenMetadataObj;
    int numWrittenRecords;
    volatile int numWriteThreadsRunning;

    protected abstract void forceReadBackFromStorage() throws Exception;

    protected DataComponent createDs1() throws Exception {
        QuantityImpl recordDesc = new QuantityImpl();
        recordDesc.setName("ds1");
        this.storage.addRecordStore(recordDesc.getName(), (DataComponent)recordDesc, (DataEncoding)new TextEncodingImpl());
        return recordDesc;
    }

    protected DataComponent createDs2() throws Exception {
        DataRecordImpl recordDesc = new DataRecordImpl();
        recordDesc.setName("ds2");
        recordDesc.setDefinition("urn:auth:blabla:record-stuff");
        QuantityImpl q = new QuantityImpl();
        q.setLabel("My Quantity");
        q.getUom().setCode("m.s-2.kg-1");
        recordDesc.addComponent("c1", (DataComponent)q);
        recordDesc.addComponent("c2", (DataComponent)new CountImpl());
        recordDesc.addComponent("c3", (DataComponent)new TextImpl());
        this.storage.addRecordStore(recordDesc.getName(), (DataComponent)recordDesc, (DataEncoding)new TextEncodingImpl());
        return recordDesc;
    }

    protected DataComponent createDs3(DataComponent nestedRec) throws Exception {
        DataArrayImpl recordDesc = new DataArrayImpl(10);
        recordDesc.setName("ds3");
        recordDesc.setDefinition("urn:auth:blabla:array-stuff");
        ((DataArray)recordDesc).setElementType("elt", nestedRec);
        this.storage.addRecordStore(recordDesc.getName(), (DataComponent)recordDesc, (DataEncoding)new BinaryEncodingImpl());
        return recordDesc;
    }

    @Test
    public void testCreateDataStores() throws Exception {
        DataComponent recordDs1 = this.createDs1();
        Map recordTypes = this.storage.getRecordStores();
        Assert.assertEquals((long)1L, (long)recordTypes.size());
        DataComponent recordDs2 = this.createDs2();
        recordTypes = this.storage.getRecordStores();
        Assert.assertEquals((long)2L, (long)recordTypes.size());
        this.forceReadBackFromStorage();
        recordTypes = this.storage.getRecordStores();
        TestUtils.assertEquals(recordDs1, ((IRecordStoreInfo)recordTypes.get(recordDs1.getName())).getRecordDescription());
        Assert.assertEquals(TextEncodingImpl.class, ((IRecordStoreInfo)recordTypes.get(recordDs1.getName())).getRecommendedEncoding().getClass());
        TestUtils.assertEquals(recordDs2, ((IRecordStoreInfo)recordTypes.get(recordDs2.getName())).getRecordDescription());
        Assert.assertEquals(TextEncodingImpl.class, ((IRecordStoreInfo)recordTypes.get(recordDs2.getName())).getRecommendedEncoding().getClass());
        DataComponent recordDs3 = this.createDs3(recordDs2);
        this.forceReadBackFromStorage();
        recordTypes = this.storage.getRecordStores();
        Assert.assertEquals((long)3L, (long)recordTypes.size());
        TestUtils.assertEquals(recordDs3, ((IRecordStoreInfo)recordTypes.get(recordDs3.getName())).getRecordDescription());
        Assert.assertEquals(BinaryEncodingImpl.class, ((IRecordStoreInfo)recordTypes.get(recordDs3.getName())).getRecommendedEncoding().getClass());
    }

    @Test
    public void testStoreAndGetLatestSensorML() throws Exception {
        SMLUtils smlUtils = new SMLUtils("2.0");
        BufferedInputStream is = new BufferedInputStream(this.getClass().getResourceAsStream("/gamma2070_more.xml"));
        AbstractProcess smlIn = smlUtils.readProcess((InputStream)is);
        this.storage.storeDataSourceDescription(smlIn);
        this.forceReadBackFromStorage();
        AbstractProcess smlOut = this.storage.getLatestDataSourceDescription();
        TestUtils.assertEquals(smlIn, smlOut);
    }

    @Test
    public void testStoreAndGetSensorMLByTime() throws Exception {
        SMLUtils smlUtils = new SMLUtils("2.0");
        BufferedInputStream is = new BufferedInputStream(this.getClass().getResourceAsStream("/gamma2070_more.xml"));
        AbstractProcess smlIn1 = smlUtils.readProcess((InputStream)is);
        DateTimeDouble begin1 = new DateTimeDouble(new DateTimeFormat().parseIso("2010-05-15Z"));
        ((TimePeriod)smlIn1.getValidTimeList().get(0)).getBeginPosition().setDateTimeValue((IDateTime)begin1);
        DateTimeDouble end1 = new DateTimeDouble(new DateTimeFormat().parseIso("2010-09-23Z"));
        ((TimePeriod)smlIn1.getValidTimeList().get(0)).getEndPosition().setDateTimeValue((IDateTime)end1);
        this.storage.storeDataSourceDescription(smlIn1);
        this.forceReadBackFromStorage();
        AbstractProcess smlOut = this.storage.getLatestDataSourceDescription();
        TestUtils.assertEquals(smlIn1, smlOut);
        smlOut = this.storage.getDataSourceDescriptionAtTime(begin1.getAsDouble());
        TestUtils.assertEquals(smlIn1, smlOut);
        smlOut = this.storage.getDataSourceDescriptionAtTime(end1.getAsDouble());
        TestUtils.assertEquals(smlIn1, smlOut);
        smlOut = this.storage.getDataSourceDescriptionAtTime(begin1.getAsDouble() + 864000.0);
        TestUtils.assertEquals(smlIn1, smlOut);
        is = new BufferedInputStream(this.getClass().getResourceAsStream("/gamma2070_more.xml"));
        AbstractProcess smlIn2 = smlUtils.readProcess((InputStream)is);
        DateTimeDouble begin2 = new DateTimeDouble(new DateTimeFormat().parseIso("2010-09-24Z"));
        ((TimePeriod)smlIn2.getValidTimeList().get(0)).getBeginPosition().setDateTimeValue((IDateTime)begin2);
        DateTimeDouble end2 = new DateTimeDouble(new DateTimeFormat().parseIso("2010-12-08Z"));
        ((TimePeriod)smlIn2.getValidTimeList().get(0)).getEndPosition().setDateTimeValue((IDateTime)end2);
        this.storage.storeDataSourceDescription(smlIn2);
        this.forceReadBackFromStorage();
        smlOut = this.storage.getDataSourceDescriptionAtTime(begin1.getAsDouble());
        TestUtils.assertEquals(smlIn1, smlOut);
        smlOut = this.storage.getDataSourceDescriptionAtTime(end1.getAsDouble());
        TestUtils.assertEquals(smlIn1, smlOut);
        smlOut = this.storage.getDataSourceDescriptionAtTime(begin1.getAsDouble() + 864000.0);
        TestUtils.assertEquals(smlIn1, smlOut);
        smlOut = this.storage.getDataSourceDescriptionAtTime(begin2.getAsDouble());
        TestUtils.assertEquals(smlIn2, smlOut);
        smlOut = this.storage.getDataSourceDescriptionAtTime(end2.getAsDouble());
        TestUtils.assertEquals(smlIn2, smlOut);
        smlOut = this.storage.getDataSourceDescriptionAtTime(begin2.getAsDouble() + 864000.0);
        TestUtils.assertEquals(smlIn2, smlOut);
    }

    @Test
    public void testStoreAndGetRecordsByKey() throws Exception {
        DataComponent recordDs1 = this.createDs1();
        DataBlock data = recordDs1.createDataBlock();
        data.setDoubleValue(0.95);
        DataKey key = new DataKey(recordDs1.getName(), this.producerID, 12.0);
        this.storage.storeRecord(key, data);
        this.forceReadBackFromStorage();
        DataBlock readData = this.storage.getDataBlock(key);
        TestUtils.assertEquals(data, readData);
        DataComponent recordDs2 = this.createDs2();
        data = recordDs2.createDataBlock();
        data.setDoubleValue(0, 1.0);
        data.setIntValue(1, 2);
        data.setStringValue(2, "test");
        key = new DataKey(recordDs2.getName(), this.producerID, 123.0);
        this.storage.storeRecord(key, data);
        this.forceReadBackFromStorage();
        readData = this.storage.getDataBlock(key);
        TestUtils.assertEquals(data, readData);
        DataArray recordDs3 = (DataArray)this.createDs3(recordDs2);
        data = recordDs3.createDataBlock();
        int arraySize = recordDs3.getElementCount().getValue();
        int offset = 0;
        for (int i = 0; i < arraySize; ++i) {
            data.setDoubleValue(offset++, (double)i + 0.5);
            data.setIntValue(offset++, 2 * i);
            data.setStringValue(offset++, "test" + i);
        }
        key = new DataKey(recordDs3.getName(), this.producerID, 10.0);
        this.storage.storeRecord(key, data);
        this.forceReadBackFromStorage();
        readData = this.storage.getDataBlock(key);
        TestUtils.assertEquals(data, readData);
    }

    protected List<DataBlock> writeRecords(DataComponent recordDef, double firstTime, double timeStep, int numRecords) throws Exception {
        return this.writeRecords(recordDef, firstTime, timeStep, numRecords, Integer.MAX_VALUE);
    }

    protected List<DataBlock> writeRecords(DataComponent recordDef, double firstTime, double timeStep, int numRecords, int maxDuration) throws Exception {
        long t0 = System.currentTimeMillis();
        double timeStamp = firstTime;
        ArrayList<DataBlock> dataList = new ArrayList<DataBlock>(1000);
        for (int i = 0; i < numRecords; ++i) {
            DataBlock data = recordDef.createDataBlock();
            data.setDoubleValue(0, (double)i + 0.3);
            data.setIntValue(1, 2 * i);
            data.setStringValue(2, "test" + i);
            timeStamp = firstTime + (double)i * timeStep;
            DataKey key = new DataKey(recordDef.getName(), this.producerID, timeStamp);
            this.storage.storeRecord(key, data);
            dataList.add(data);
            if (Thread.interrupted() || System.currentTimeMillis() - t0 > (long)maxDuration) break;
            if (i % 10 == 0) {
                this.storage.commit();
            }
            Thread.sleep(1L);
        }
        this.storage.commit();
        return dataList;
    }

    @Test
    public void testStoreAndGetMultipleRecordsByKey() throws Exception {
        DataComponent recordDef = this.createDs2();
        int numRecords = 100;
        double timeStep = 0.1;
        List<DataBlock> dataList = this.writeRecords(recordDef, 0.0, timeStep, numRecords);
        this.forceReadBackFromStorage();
        for (int i = 0; i < numRecords; ++i) {
            DataKey key = new DataKey(recordDef.getName(), this.producerID, (double)i * timeStep);
            DataBlock data = this.storage.getDataBlock(key);
            TestUtils.assertEquals(dataList.get(i), data);
        }
    }

    @Test
    public void testStoreAndGetMultipleRecordsByFilter() throws Exception {
        DataComponent recordDef = this.createDs2();
        int numRecords = 100;
        double timeStep = 0.1;
        List<DataBlock> dataList = this.writeRecords(recordDef, 0.0, 0.1, 100);
        this.forceReadBackFromStorage();
        DataFilter filter = new DataFilter(recordDef.getName()){

            public double[] getTimeStampRange() {
                return new double[]{0.0, 10.0};
            }
        };
        int i = 0;
        Iterator it = this.storage.getRecordIterator((IDataFilter)filter);
        while (it.hasNext()) {
            Assert.assertTrue((String)"Wrong number of records returned", (i < 100 ? 1 : 0) != 0);
            TestUtils.assertEquals(dataList.get(i), ((IDataRecord)it.next()).getData());
            ++i;
        }
        Assert.assertEquals((String)"Wrong number of records returned", (long)100L, (long)i);
    }

    @Test
    public void testStoreAndGetTimeRange() throws Exception {
        DataComponent recordDef = this.createDs2();
        int numRecords = 100;
        double timeStep = 0.1;
        this.writeRecords(recordDef, 0.0, timeStep, numRecords);
        this.forceReadBackFromStorage();
        int recordCount = this.storage.getNumRecords(recordDef.getName());
        Assert.assertEquals((String)"Wrong number of records returned", (long)numRecords, (long)recordCount);
        double[] timeRange = this.storage.getRecordsTimeRange(recordDef.getName());
        Assert.assertEquals((String)"Invalid begin time", (double)0.0, (double)timeRange[0], (double)1.0E-6);
        Assert.assertEquals((String)"Invalid end time", (double)((double)(numRecords - 1) * timeStep), (double)timeRange[1], (double)1.0E-6);
    }

    @Test
    public void testStoreIncompatibleRecord() throws Exception {
    }

    protected void startWriteRecordsThreads(ExecutorService exec, int numWriteThreads, final DataComponent recordDef, final double timeStep, final int testDurationMs, final Collection<Throwable> errors) {
        this.numWriteThreadsRunning = numWriteThreads;
        int i = 0;
        while (i < numWriteThreads) {
            final int count = i++;
            exec.submit(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    long startTimeOffset = System.currentTimeMillis() - AbstractTestBasicStorage.this.refTime;
                    System.out.format("Begin Write Records Thread %d @ %dms\n", Thread.currentThread().getId(), startTimeOffset);
                    try {
                        List<DataBlock> dataList = AbstractTestBasicStorage.this.writeRecords(recordDef, (double)count * 10000.0, timeStep, Integer.MAX_VALUE, testDurationMs);
                        AbstractTestBasicStorage abstractTestBasicStorage = AbstractTestBasicStorage.this;
                        synchronized (abstractTestBasicStorage) {
                            AbstractTestBasicStorage.this.numWrittenRecords += dataList.size();
                        }
                    }
                    catch (Throwable e) {
                        errors.add(e);
                    }
                    AbstractTestBasicStorage e = AbstractTestBasicStorage.this;
                    synchronized (e) {
                        --AbstractTestBasicStorage.this.numWriteThreadsRunning;
                    }
                    long stopTimeOffset = System.currentTimeMillis() - AbstractTestBasicStorage.this.refTime;
                    System.out.format("End Write Records Thread %d @ %dms\n", Thread.currentThread().getId(), stopTimeOffset);
                }
            });
        }
    }

    protected void startReadRecordsThreads(ExecutorService exec, int numReadThreads, final DataComponent recordDef, final double timeStep, final Collection<Throwable> errors) {
        for (int i = 0; i < numReadThreads; ++i) {
            exec.submit(new Runnable(){

                @Override
                public void run() {
                    long tid = Thread.currentThread().getId();
                    long startTimeOffset = System.currentTimeMillis() - AbstractTestBasicStorage.this.refTime;
                    System.out.format("Begin Read Records Thread %d @ %dms\n", tid, startTimeOffset);
                    int readCount = 0;
                    try {
                        while (AbstractTestBasicStorage.this.numWriteThreadsRunning > 0 && !Thread.interrupted()) {
                            double[] timeRange = AbstractTestBasicStorage.this.storage.getRecordsTimeRange(recordDef.getName());
                            if (Double.isNaN(timeRange[0])) continue;
                            final double begin = timeRange[0] + Math.random() * (timeRange[1] - timeRange[0]);
                            final double end = begin + Math.max(timeStep * 100.0, Math.random() * (timeRange[1] - begin));
                            DataFilter filter = new DataFilter(recordDef.getName()){

                                public double[] getTimeStampRange() {
                                    return new double[]{begin, end};
                                }
                            };
                            Iterator it = AbstractTestBasicStorage.this.storage.getRecordIterator((IDataFilter)filter);
                            ++readCount;
                            double lastTimeStamp = Double.NEGATIVE_INFINITY;
                            while (it.hasNext()) {
                                IDataRecord rec = (IDataRecord)it.next();
                                double timeStamp = rec.getKey().timeStamp;
                                Assert.assertTrue((String)(tid + ": Time steps are not increasing: " + timeStamp + "<" + lastTimeStamp), (timeStamp > lastTimeStamp ? 1 : 0) != 0);
                                Assert.assertTrue((String)(tid + ": Time stamp lower than begin: " + timeStamp + "<" + begin), (timeStamp >= begin ? 1 : 0) != 0);
                                Assert.assertTrue((String)(tid + ": Time stamp higher than end: " + timeStamp + ">" + end), (timeStamp <= end ? 1 : 0) != 0);
                                lastTimeStamp = timeStamp;
                            }
                            Thread.sleep(1L);
                        }
                    }
                    catch (Throwable e) {
                        errors.add(e);
                    }
                    long stopTimeOffset = System.currentTimeMillis() - AbstractTestBasicStorage.this.refTime;
                    System.out.format("End Read Records Thread %d @%dms - %d read ops\n", Thread.currentThread().getId(), stopTimeOffset, readCount);
                }
            });
        }
    }

    protected void startWriteMetadataThreads(ExecutorService exec, int numWriteThreads, final Collection<Throwable> errors) {
        for (int i = 0; i < numWriteThreads; ++i) {
            final int startCount = i * 1000000;
            exec.submit(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    long startTimeOffset = System.currentTimeMillis() - AbstractTestBasicStorage.this.refTime;
                    System.out.format("Begin Write Desc Thread %d @%dms\n", Thread.currentThread().getId(), startTimeOffset);
                    try {
                        int count = startCount;
                        while (AbstractTestBasicStorage.this.numWriteThreadsRunning > 0 && !Thread.interrupted()) {
                            SMLFactory smlFac = new SMLFactory();
                            GMLFactory gmlFac = new GMLFactory();
                            PhysicalSystemImpl system = new PhysicalSystemImpl();
                            system.setUniqueIdentifier("TEST" + count++);
                            system.setName("blablabla");
                            system.setDescription("this is the description of my sensor that can be pretty long");
                            IdentifierList identifierList = smlFac.newIdentifierList();
                            system.addIdentification(identifierList);
                            Term term = smlFac.newTerm();
                            term.setDefinition(SWEHelper.getPropertyUri((String)"Manufacturer"));
                            term.setLabel("Manufacturer Name");
                            term.setValue("My manufacturer");
                            identifierList.addIdentifier2(term);
                            term = smlFac.newTerm();
                            term.setDefinition(SWEHelper.getPropertyUri((String)"ModelNumber"));
                            term.setLabel("Model Number");
                            term.setValue("SENSOR_2365");
                            identifierList.addIdentifier2(term);
                            term = smlFac.newTerm();
                            term.setDefinition(SWEHelper.getPropertyUri((String)"SerialNumber"));
                            term.setLabel("Serial Number");
                            term.setValue("FZEFZE154618989");
                            identifierList.addIdentifier2(term);
                            TimePosition timePos = gmlFac.newTimePosition((double)startCount + (double)System.currentTimeMillis() / 1000.0);
                            TimeInstant validTime = gmlFac.newTimeInstant(timePos);
                            system.addValidTimeAsTimeInstant(validTime);
                            AbstractTestBasicStorage.this.storage.storeDataSourceDescription((AbstractProcess)system);
                            AbstractTestBasicStorage abstractTestBasicStorage = AbstractTestBasicStorage.this;
                            synchronized (abstractTestBasicStorage) {
                                ++AbstractTestBasicStorage.this.numWrittenMetadataObj;
                            }
                            Thread.sleep(5L);
                        }
                    }
                    catch (Throwable e) {
                        errors.add(e);
                    }
                    long stopTimeOffset = System.currentTimeMillis() - AbstractTestBasicStorage.this.refTime;
                    System.out.format("End Write Desc Thread %d @%dms\n", Thread.currentThread().getId(), stopTimeOffset);
                }
            });
        }
    }

    protected void checkForAsyncErrors(Collection<Throwable> errors) throws Throwable {
        System.out.println(errors.size() + " error(s)");
        for (Throwable e : errors) {
            e.printStackTrace();
        }
        if (!errors.isEmpty()) {
            throw errors.iterator().next();
        }
    }

    protected void checkRecordsInStorage(DataComponent recordDef) throws Throwable {
        System.out.println(this.numWrittenRecords + " records written");
        int recordCount = this.storage.getNumRecords(recordDef.getName());
        Assert.assertEquals((String)"Wrong number of records in storage", (long)this.numWrittenRecords, (long)recordCount);
        recordCount = 0;
        Iterator it = this.storage.getRecordIterator((IDataFilter)new DataFilter(recordDef.getName()));
        while (it.hasNext()) {
            it.next();
            ++recordCount;
        }
        Assert.assertEquals((String)"Wrong number of records returned by iterator", (long)this.numWrittenRecords, (long)recordCount);
    }

    protected void checkMetadataInStorage() throws Throwable {
        System.out.println(this.numWrittenMetadataObj + " metadata objects written");
        int descCount = 0;
        List descList = this.storage.getDataSourceDescriptionHistory(Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY);
        for (AbstractProcess desc : descList) {
            Assert.assertTrue((boolean)(desc instanceof PhysicalSystem));
            Assert.assertEquals((Object)"blablabla", (Object)desc.getName());
            Assert.assertTrue((boolean)desc.getUniqueIdentifier().startsWith("TEST"));
            ++descCount;
        }
        Assert.assertEquals((String)"Wrong number of metadata objects in storage", (long)this.numWrittenMetadataObj, (long)descCount);
        AbstractProcess desc = this.storage.getLatestDataSourceDescription();
        Assert.assertTrue((boolean)(desc instanceof PhysicalSystem));
    }

    @Test
    public void testConcurrentWriteRecords() throws Throwable {
        DataComponent recordDef = this.createDs2();
        ExecutorService exec = Executors.newCachedThreadPool();
        Collection<Throwable> errors = Collections.synchronizedCollection(new ArrayList());
        int numWriteThreads = 10;
        int testDurationMs = 2000;
        this.refTime = System.currentTimeMillis();
        this.startWriteRecordsThreads(exec, numWriteThreads, recordDef, 0.1, testDurationMs, errors);
        exec.shutdown();
        exec.awaitTermination(testDurationMs * 2, TimeUnit.MILLISECONDS);
        this.forceReadBackFromStorage();
        this.checkRecordsInStorage(recordDef);
        this.checkForAsyncErrors(errors);
    }

    @Test
    public void testConcurrentWriteMetadata() throws Throwable {
        ExecutorService exec = Executors.newCachedThreadPool();
        Collection<Throwable> errors = Collections.synchronizedCollection(new ArrayList());
        int numWriteThreads = 10;
        int testDurationMs = 2000;
        this.refTime = System.currentTimeMillis();
        this.numWriteThreadsRunning = 1;
        this.startWriteMetadataThreads(exec, numWriteThreads, errors);
        Thread.sleep(testDurationMs);
        this.numWriteThreadsRunning = 0;
        exec.shutdown();
        exec.awaitTermination(testDurationMs * 2, TimeUnit.MILLISECONDS);
        this.forceReadBackFromStorage();
        this.checkMetadataInStorage();
        this.checkForAsyncErrors(errors);
    }

    @Test
    public void testConcurrentWriteThenReadRecords() throws Throwable {
        DataComponent recordDef = this.createDs2();
        ExecutorService exec = Executors.newCachedThreadPool();
        Collection<Throwable> errors = Collections.synchronizedCollection(new ArrayList());
        int numWriteThreads = 10;
        int numReadThreads = 10;
        int testDurationMs = 1000;
        double timeStep = 0.1;
        this.refTime = System.currentTimeMillis();
        this.startWriteRecordsThreads(exec, numWriteThreads, recordDef, timeStep, testDurationMs, errors);
        exec.shutdown();
        exec.awaitTermination(testDurationMs * 2, TimeUnit.MILLISECONDS);
        exec = Executors.newCachedThreadPool();
        this.numWriteThreadsRunning = 1;
        this.checkForAsyncErrors(errors);
        this.forceReadBackFromStorage();
        errors.clear();
        this.startReadRecordsThreads(exec, numReadThreads, recordDef, timeStep, errors);
        Thread.sleep(testDurationMs);
        this.numWriteThreadsRunning = 0;
        exec.shutdown();
        exec.awaitTermination(1000L, TimeUnit.MILLISECONDS);
        this.checkForAsyncErrors(errors);
        this.checkRecordsInStorage(recordDef);
    }

    @Test
    public void testConcurrentReadWriteRecords() throws Throwable {
        DataComponent recordDef = this.createDs2();
        ExecutorService exec = Executors.newCachedThreadPool();
        Collection<Throwable> errors = Collections.synchronizedCollection(new ArrayList());
        int numWriteThreads = 10;
        int numReadThreads = 10;
        int testDurationMs = 2000;
        double timeStep = 0.1;
        this.refTime = System.currentTimeMillis();
        this.startWriteRecordsThreads(exec, numWriteThreads, recordDef, timeStep, testDurationMs, errors);
        this.startReadRecordsThreads(exec, numReadThreads, recordDef, timeStep, errors);
        exec.shutdown();
        exec.awaitTermination(testDurationMs * 200, TimeUnit.MILLISECONDS);
        this.forceReadBackFromStorage();
        this.checkForAsyncErrors(errors);
        this.checkRecordsInStorage(recordDef);
    }

    @Test
    public void testConcurrentReadWriteMetadataAndRecords() throws Throwable {
        DataComponent recordDef = this.createDs2();
        ExecutorService exec = Executors.newCachedThreadPool();
        Collection<Throwable> errors = Collections.synchronizedCollection(new ArrayList());
        int numWriteThreads = 10;
        int numReadThreads = 10;
        int testDurationMs = 3000;
        double timeStep = 0.1;
        this.refTime = System.currentTimeMillis();
        this.startWriteRecordsThreads(exec, numWriteThreads, recordDef, timeStep, testDurationMs, errors);
        this.startWriteMetadataThreads(exec, numWriteThreads, errors);
        this.startReadRecordsThreads(exec, numReadThreads, recordDef, timeStep, errors);
        exec.shutdown();
        exec.awaitTermination(testDurationMs * 2, TimeUnit.MILLISECONDS);
        this.forceReadBackFromStorage();
        this.checkForAsyncErrors(errors);
        this.checkRecordsInStorage(recordDef);
        this.checkMetadataInStorage();
    }
}

