IGNITE-11234: [ML] Add fillCache method to DataStreamGenerator
authorAlexey Platonov <aplatonovv@gmail.com>
Tue, 12 Feb 2019 15:45:02 +0000 (18:45 +0300)
committerYury Babak <ybabak@gridgain.com>
Tue, 12 Feb 2019 15:45:02 +0000 (18:45 +0300)
This closes #6079

examples/src/main/java/org/apache/ignite/examples/ml/util/generators/DatasetCreationExamples.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/util/generators/DataStreamGenerator.java
modules/ml/src/test/java/org/apache/ignite/ml/util/generators/DataStreamGeneratorFillCacheTest.java [new file with mode: 0644]
modules/ml/src/test/java/org/apache/ignite/ml/util/generators/DataStreamGeneratorTest.java
modules/ml/src/test/java/org/apache/ignite/ml/util/generators/DataStreamGeneratorTestSuite.java
modules/yardstick/README.txt

diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/util/generators/DatasetCreationExamples.java b/examples/src/main/java/org/apache/ignite/examples/ml/util/generators/DatasetCreationExamples.java
new file mode 100644 (file)
index 0000000..42f0500
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.util.generators;
+
+import java.util.UUID;
+import java.util.stream.DoubleStream;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDataset;
+import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilder;
+import org.apache.ignite.ml.dataset.primitive.builder.context.EmptyContextBuilder;
+import org.apache.ignite.ml.dataset.primitive.builder.data.SimpleDatasetDataBuilder;
+import org.apache.ignite.ml.dataset.primitive.context.EmptyContext;
+import org.apache.ignite.ml.dataset.primitive.data.SimpleDatasetData;
+import org.apache.ignite.ml.environment.LearningEnvironmentBuilder;
+import org.apache.ignite.ml.math.primitives.vector.Vector;
+import org.apache.ignite.ml.structures.LabeledVector;
+import org.apache.ignite.ml.util.generators.DataStreamGenerator;
+import org.apache.ignite.ml.util.generators.primitives.scalar.UniformRandomProducer;
+
+/**
+ * Examples of using {@link DataStreamGenerator} methods for filling cache or creating local datasets.
+ */
+public class DatasetCreationExamples {
+    /**
+     * Size of dataset.
+     */
+    private static final int DATASET_SIZE = 1000;
+
+    /**
+     * Runs example.
+     *
+     * @param args Command line arguments.
+     */
+    public static void main(String[] args) throws Exception {
+        // Creates a simple generator based on 1-dimension vector uniformly distributed between [-1, 1] having mean = 0.0 .
+        DataStreamGenerator generator = new UniformRandomProducer(-1, 1., 0)
+            .vectorize(1)
+            .asDataStream();
+
+        // DataStreamGenerator can be represented as stream of (un-)labeled vectors.
+        double expMean = computeMean(generator.unlabeled());
+        // DataStreamGenerator can fill map of vectors as keys and labels as values.
+        double meanFromMap = computeMean(generator.asMap(DATASET_SIZE).keySet().stream());
+
+        // DataStreamGenerator can prepare local DatasetBuilder with data from gemerator.
+        double meanFromLocDataset;
+        try (Dataset<EmptyContext, SimpleDatasetData> dataset = generator.asDatasetBuilder(DATASET_SIZE, 10)
+            .build(LearningEnvironmentBuilder.defaultBuilder(), new EmptyContextBuilder<>(),
+                new SimpleDatasetDataBuilder<>((k, v) -> k))) {
+
+            meanFromLocDataset = dataset.compute(
+                data -> DoubleStream.of(data.getFeatures()).sum(),
+                (l, r) -> asPrimitive(l) + asPrimitive(r)
+            );
+
+            meanFromLocDataset /= DATASET_SIZE;
+        }
+
+        double meanFromCache;
+        IgniteConfiguration configuration = new IgniteConfiguration().setPeerClassLoadingEnabled(true);
+        try (Ignite ignite = Ignition.start(configuration)) {
+            String cacheName = "TEST_CACHE";
+            IgniteCache<UUID, LabeledVector<Double>> withCustomKeyCache = ignite.getOrCreateCache(
+                new CacheConfiguration<UUID, LabeledVector<Double>>(cacheName)
+                    .setAffinity(new RendezvousAffinityFunction(false, 10))
+            );
+
+            // DataStreamGenerator can fill cache with vectors as values and HashCodes/random UUID/custom keys.
+            generator.fillCacheWithVecUUIDAsKey(DATASET_SIZE, withCustomKeyCache);
+            meanFromCache = computeMean(ignite, withCustomKeyCache);
+            ignite.destroyCache(cacheName);
+        }
+
+        // Results should be near to expected value.
+        System.out.println(String.format("Expected mean from stream: %.2f", expMean));
+        System.out.println(String.format("Mean from map built from stream: %.2f", meanFromMap));
+        System.out.println(String.format("Mean from local dataset filled by data stream: %.2f", meanFromLocDataset));
+        System.out.println(String.format("Mean from cache filled by data stream: %.2f", meanFromCache));
+    }
+
+    /**
+     * Converts null values from Double to 0.
+     *
+     * @param val Value.
+     * @return Double converted to primitive type.
+     */
+    private static double asPrimitive(Double val) {
+        return val == null ? 0.0 : val;
+    }
+
+    /**
+     * Compute mean from stream with size = DATASET_SIZE.
+     *
+     * @param vectors Stream of 1-dimension vectors.
+     * @return mean value.
+     */
+    private static double computeMean(Stream<Vector> vectors) {
+        return vectors.mapToDouble(x -> x.get(0))
+            .limit(DATASET_SIZE)
+            .sum() / DATASET_SIZE;
+    }
+
+    /**
+     * Computes mean value for dataset in cache.
+     *
+     * @param ignite Ignite.
+     * @param cache Cache.
+     * @return mean value computed on cache.
+     */
+    private static double computeMean(Ignite ignite, IgniteCache<UUID, LabeledVector<Double>> cache) {
+        double result;
+        CacheBasedDatasetBuilder<UUID, LabeledVector<Double>> builder = new CacheBasedDatasetBuilder<>(ignite, cache);
+        try (CacheBasedDataset<UUID, LabeledVector<Double>, EmptyContext, SimpleDatasetData> dataset =
+                 builder.build(LearningEnvironmentBuilder.defaultBuilder(),
+                     new EmptyContextBuilder<>(),
+                     new SimpleDatasetDataBuilder<>((k, v) -> v.features()))) {
+
+            result = dataset.compute(
+                data -> DoubleStream.of(data.getFeatures()).sum(),
+                (l, r) -> asPrimitive(l) + asPrimitive(r)
+            );
+
+            result /= DATASET_SIZE;
+        }
+
+        return result;
+    }
+}
index a980250..5468913 100644 (file)
 
 package org.apache.ignite.ml.util.generators;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.ml.dataset.DatasetBuilder;
 import org.apache.ignite.ml.dataset.UpstreamTransformerBuilder;
@@ -30,11 +34,16 @@ import org.apache.ignite.ml.structures.LabeledVector;
 import org.apache.ignite.ml.util.generators.primitives.scalar.RandomProducer;
 
 /**
- * Provides general interface for generation of pseudorandom vectors according to shape defined
- * by logic of specific data stream generator.
+ * Provides general interface for generation of pseudorandom vectors according to shape defined by logic of specific
+ * data stream generator.
  */
 public interface DataStreamGenerator {
     /**
+     * Size of batch for {@link IgniteCache#putAll(Map)}.
+     */
+    public static final int FILL_CACHE_BATCH_SIZE = 1000;
+
+    /**
      * @return Stream of {@link LabeledVector} in according to dataset shape.
      */
     public Stream<LabeledVector<Double>> labeled();
@@ -70,8 +79,8 @@ public interface DataStreamGenerator {
     }
 
     /**
-     * Apply pseudorandom noize to vectors without labels mapping. Such method can be useful in cases
-     * when vectors with different labels should be mixed between them on class bounds.
+     * Apply pseudorandom noize to vectors without labels mapping. Such method can be useful in cases when vectors with
+     * different labels should be mixed between them on class bounds.
      *
      * @param rnd Generator of pseudorandom scalars modifying vector components with label saving.
      * @return Stream of blurred vectors with same labels.
@@ -110,7 +119,8 @@ public interface DataStreamGenerator {
      * @param partitions Partitions count.
      * @return Dataset builder.
      */
-    public default DatasetBuilder<Vector, Double> asDatasetBuilder(int datasetSize, IgniteBiPredicate<Vector, Double> filter,
+    public default DatasetBuilder<Vector, Double> asDatasetBuilder(int datasetSize,
+        IgniteBiPredicate<Vector, Double> filter,
         int partitions) {
 
         return new DatasetBuilderAdapter(this, datasetSize, filter, partitions);
@@ -125,10 +135,54 @@ public interface DataStreamGenerator {
      * @param upstreamTransformerBuilder Upstream transformer builder.
      * @return Dataset builder.
      */
-    public default DatasetBuilder<Vector, Double> asDatasetBuilder(int datasetSize, IgniteBiPredicate<Vector, Double> filter,
+    public default DatasetBuilder<Vector, Double> asDatasetBuilder(int datasetSize,
+        IgniteBiPredicate<Vector, Double> filter,
         int partitions, UpstreamTransformerBuilder upstreamTransformerBuilder) {
 
         return new DatasetBuilderAdapter(this, datasetSize, filter, partitions, upstreamTransformerBuilder);
     }
 
+    /**
+     * Fills given cache with labeled vectors from this generator and user defined mapper from vectors to keys.
+     *
+     * @param datasetSize Rows count to put.
+     * @param cache Cache.
+     * @param keyMapper Mapping from vectors to keys.
+     * @param <K> Key type.
+     */
+    public default <K> void fillCacheWithCustomKey(int datasetSize, IgniteCache<K, LabeledVector<Double>> cache,
+        Function<LabeledVector<Double>, K> keyMapper) {
+
+        Map<K, LabeledVector<Double>> batch = new HashMap<>();
+        labeled().limit(datasetSize).forEach(vec -> {
+            batch.put(keyMapper.apply(vec), vec);
+            if (batch.size() == FILL_CACHE_BATCH_SIZE) {
+                cache.putAll(batch);
+                batch.clear();
+            }
+        });
+
+        if (!batch.isEmpty())
+            cache.putAll(batch);
+    }
+
+    /**
+     * Fills given cache with labeled vectors from this generator as values and their hashcodes as keys.
+     *
+     * @param datasetSize Rows count to put.
+     * @param cache Cache.
+     */
+    public default void fillCacheWithVecHashAsKey(int datasetSize, IgniteCache<Integer, LabeledVector<Double>> cache) {
+        fillCacheWithCustomKey(datasetSize, cache, LabeledVector::hashCode);
+    }
+
+    /**
+     * Fills given cache with labeled vectors from this generator as values and random UUIDs as keys
+     *
+     * @param datasetSize Rows count to put.
+     * @param cache Cache.
+     */
+    public default void fillCacheWithVecUUIDAsKey(int datasetSize, IgniteCache<UUID, LabeledVector<Double>> cache) {
+        fillCacheWithCustomKey(datasetSize, cache, v -> UUID.randomUUID());
+    }
 }
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/util/generators/DataStreamGeneratorFillCacheTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/util/generators/DataStreamGeneratorFillCacheTest.java
new file mode 100644 (file)
index 0000000..97b0c85
--- /dev/null
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.util.generators;
+
+import java.util.UUID;
+import java.util.stream.DoubleStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDataset;
+import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilder;
+import org.apache.ignite.ml.dataset.primitive.builder.context.EmptyContextBuilder;
+import org.apache.ignite.ml.dataset.primitive.builder.data.SimpleDatasetDataBuilder;
+import org.apache.ignite.ml.dataset.primitive.context.EmptyContext;
+import org.apache.ignite.ml.dataset.primitive.data.SimpleDatasetData;
+import org.apache.ignite.ml.environment.LearningEnvironmentBuilder;
+import org.apache.ignite.ml.structures.LabeledVector;
+import org.apache.ignite.ml.util.generators.primitives.scalar.GaussRandomProducer;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for {@link DataStreamGenerator} cache filling.
+ */
+public class DataStreamGeneratorFillCacheTest extends GridCommonAbstractTest {
+    /** */
+    private Ignite ignite;
+
+    /** */
+    @Before
+    public void before() throws Exception {
+        ignite = startGrid();
+    }
+
+    /** */
+    @After
+    public void after() throws Exception {
+        ignite.close();
+    }
+
+    /** */
+    @Test
+    public void testCacheFilling() {
+        IgniteConfiguration configuration = new IgniteConfiguration().setPeerClassLoadingEnabled(true);
+        String cacheName = "TEST_CACHE";
+        CacheConfiguration<UUID, LabeledVector<Double>> cacheConfiguration =
+            new CacheConfiguration<UUID, LabeledVector<Double>>(cacheName)
+                .setAffinity(new RendezvousAffinityFunction(false, 10));
+        int datasetSize = 5000;
+
+        try (Ignite ignite = Ignition.start(configuration)) {
+            IgniteCache<UUID, LabeledVector<Double>> cache = ignite.getOrCreateCache(cacheConfiguration);
+            DataStreamGenerator generator = new GaussRandomProducer(0).vectorize(1).asDataStream();
+            generator.fillCacheWithVecUUIDAsKey(datasetSize, cache);
+
+            CacheBasedDatasetBuilder<UUID, LabeledVector<Double>> datasetBuilder = new CacheBasedDatasetBuilder<>(ignite, cache);
+            try (CacheBasedDataset<UUID, LabeledVector<Double>, EmptyContext, SimpleDatasetData> dataset =
+                     datasetBuilder.build(LearningEnvironmentBuilder.defaultBuilder(),
+                         new EmptyContextBuilder<>(), new SimpleDatasetDataBuilder<>((k, v) -> v.features()))) {
+
+                StatPair result = dataset.compute(
+                    data -> new StatPair(DoubleStream.of(data.getFeatures()).sum(), data.getRows()),
+                    StatPair::sum
+                );
+
+                assertEquals(datasetSize, result.countOfRows);
+                assertEquals(0.0, result.elementsSum / result.countOfRows, 1e-2);
+            }
+
+            ignite.destroyCache(cacheName);
+        }
+    }
+
+    /** */
+    static class StatPair {
+        /** */
+        private double elementsSum;
+
+        /** */
+        private int countOfRows;
+
+        /** */
+        public StatPair(double elementsSum, int countOfRows) {
+            this.elementsSum = elementsSum;
+            this.countOfRows = countOfRows;
+        }
+
+        /** */
+        static StatPair sum(StatPair left, StatPair right) {
+            if (left == null && right == null)
+                return new StatPair(0, 0);
+            else if (left == null)
+                return right;
+            else if (right == null)
+                return left;
+            else
+                return new StatPair(
+                    right.elementsSum + left.elementsSum,
+                    right.countOfRows + left.countOfRows
+                );
+        }
+    }
+}
index 07a7032..0a57e26 100644 (file)
@@ -158,6 +158,8 @@ public class DataStreamGeneratorTest {
         checkDataset(N / 2, b3, v -> (Double)v.label() < 0);
     }
 
+
+
     /** */
     private void checkDataset(int sampleSize, DatasetBuilder<Vector, Double> datasetBuilder,
         Predicate<LabeledVector> labelCheck) throws Exception {
index 640edb9..b9d47fe 100644 (file)
@@ -41,7 +41,8 @@ import org.junit.runners.Suite;
     VectorGeneratorPrimitivesTest.class,
     VectorGeneratorsFamilyTest.class,
     VectorGeneratorTest.class,
-    DataStreamGeneratorTest.class
+    DataStreamGeneratorTest.class,
+    DataStreamGeneratorFillCacheTest.class
 })
 public class DataStreamGeneratorTestSuite {
 }
index 768f783..71ed387 100644 (file)
@@ -16,9 +16,7 @@ Running Ignite Benchmarks Locally
 The simplest way to start with benchmarking is to use one of the executable scripts available under `benchmarks\bin`
 directory:
 
-modules/yardstick/target
 ./bin/benchmark-run-all.sh config/benchmark-sample.properties
-modules/yardstick/target/assembly/bin/benchmark-run-all.sh modules/yardstick/target/assembly/config/benchmark-ml.properties
 
 The command above will benchmark the cache put operation for a distributed atomic cache. The results of the
 benchmark will be added to an auto-generated `output/results-{DATE-TIME}` directory.