IGNITE-7437: Partition based dataset implementation
authordmitrievanthony <dmitrievanthony@gmail.com>
Sun, 4 Feb 2018 18:06:02 +0000 (21:06 +0300)
committerYury Babak <ybabak@gridgain.com>
Sun, 4 Feb 2018 18:06:02 +0000 (21:06 +0300)
this closes #3410

61 files changed:
examples/src/main/java/org/apache/ignite/examples/ml/dataset/AlgorithmSpecificDatasetExample.java [new file with mode: 0644]
examples/src/main/java/org/apache/ignite/examples/ml/dataset/CacheBasedDatasetExample.java [new file with mode: 0644]
examples/src/main/java/org/apache/ignite/examples/ml/dataset/LocalDatasetExample.java [new file with mode: 0644]
examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/Person.java [new file with mode: 0644]
examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/package-info.java [new file with mode: 0644]
examples/src/main/java/org/apache/ignite/examples/ml/dataset/package-info.java [new file with mode: 0644]
examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java [new file with mode: 0644]
examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/package-info.java [new file with mode: 0644]
modules/ml/pom.xml
modules/ml/src/main/java/org/apache/ignite/ml/dataset/Dataset.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetBuilder.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetFactory.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionContextBuilder.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionDataBuilder.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/UpstreamEntry.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/package-info.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/DatasetAffinityFunctionWrapper.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/PartitionDataStorage.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/UpstreamCursorAdapter.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/package-info.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDataset.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/package-info.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/package-info.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/package-info.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/DatasetWrapper.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/SimpleDataset.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/SimpleLabeledDataset.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/context/EmptyContextBuilder.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/context/package-info.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/SimpleDatasetDataBuilder.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/SimpleLabeledDatasetDataBuilder.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/package-info.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/package-info.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/context/EmptyContext.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/context/package-info.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/SimpleDatasetData.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/SimpleLabeledDatasetData.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/package-info.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/package-info.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/PreprocessingTrainer.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationPartitionData.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationPreprocessor.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationTrainer.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/package-info.java [new file with mode: 0644]
modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/package-info.java [new file with mode: 0644]
modules/ml/src/test/java/org/apache/ignite/ml/IgniteMLTestSuite.java
modules/ml/src/test/java/org/apache/ignite/ml/dataset/DatasetTestSuite.java [new file with mode: 0644]
modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilderTest.java [new file with mode: 0644]
modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetTest.java [new file with mode: 0644]
modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtilsTest.java [new file with mode: 0644]
modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/DatasetAffinityFunctionWrapperTest.java [new file with mode: 0644]
modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/PartitionDataStorageTest.java [new file with mode: 0644]
modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilderTest.java [new file with mode: 0644]
modules/ml/src/test/java/org/apache/ignite/ml/dataset/primitive/DatasetWrapperTest.java [new file with mode: 0644]
modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/PreprocessingTestSuite.java [new file with mode: 0644]
modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationPreprocessorTest.java [new file with mode: 0644]
modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationTrainerTest.java [new file with mode: 0644]

diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/AlgorithmSpecificDatasetExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/AlgorithmSpecificDatasetExample.java
new file mode 100644 (file)
index 0000000..98f85cd
--- /dev/null
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.dataset;
+
+import com.github.fommil.netlib.BLAS;
+import java.io.Serializable;
+import java.util.Arrays;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.examples.ml.dataset.model.Person;
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.dataset.DatasetFactory;
+import org.apache.ignite.ml.dataset.primitive.DatasetWrapper;
+import org.apache.ignite.ml.dataset.primitive.builder.data.SimpleLabeledDatasetDataBuilder;
+import org.apache.ignite.ml.dataset.primitive.data.SimpleLabeledDatasetData;
+
+/**
+ * Example that shows how to implement your own algorithm (gradient descent trainer for linear regression) which uses
+ * dataset as an underlying infrastructure.
+ *
+ * The common idea behind using algorithm specific datasets is to write a simple local version algorithm at first, then
+ * find operations which involves data manipulations, and finally define algorithm specific version of the dataset
+ * extended by introducing these new operations. As result your algorithm will work with extended dataset (based on
+ * {@link DatasetWrapper}) in a sequential manner.
+ *
+ * In this example we need to implement gradient descent. This is iterative method that involves calculation of gradient
+ * on every step. In according with the common idea we defines {@link AlgorithmSpecificDataset} - extended version
+ * of {@code Dataset} with {@code gradient} method. As result our gradient descent method looks like a simple loop where
+ * every iteration includes call of the {@code gradient} method. In the example we want to keep iteration number as well
+ * for logging. Iteration number cannot be recovered from the {@code upstream} data and we need to keep it in the custom
+ * partition {@code context} which is represented by {@link AlgorithmSpecificPartitionContext} class.
+ */
+public class AlgorithmSpecificDatasetExample {
+    /** Run example. */
+    public static void main(String[] args) throws Exception {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println(">>> Algorithm Specific Dataset example started.");
+
+            IgniteCache<Integer, Person> persons = createCache(ignite);
+
+            // Creates a algorithm specific dataset to perform linear regression. Here we defines the way features and
+            // labels are extracted, and partition data and context are created.
+            try (AlgorithmSpecificDataset dataset = DatasetFactory.create(
+                ignite,
+                persons,
+                (upstream, upstreamSize) -> new AlgorithmSpecificPartitionContext(),
+                new SimpleLabeledDatasetDataBuilder<Integer, Person, AlgorithmSpecificPartitionContext>(
+                    (k, v) -> new double[] {v.getAge()},
+                    (k, v) -> v.getSalary(),
+                    1
+                ).andThen((data, ctx) -> {
+                    double[] features = data.getFeatures();
+                    int rows = data.getRows();
+
+                    // Makes a copy of features to supplement it by columns with values equal to 1.0.
+                    double[] a = new double[features.length + rows];
+
+                    for (int i = 0; i < rows; i++)
+                        a[i] = 1.0;
+
+                    System.arraycopy(features, 0, a, rows, features.length);
+
+                    return new SimpleLabeledDatasetData(a, rows, data.getCols() + 1, data.getLabels());
+                })
+            ).wrap(AlgorithmSpecificDataset::new)) {
+                // Trains linear regression model using gradient descent.
+                double[] linearRegressionMdl = new double[2];
+
+                for (int i = 0; i < 1000; i++) {
+                    double[] gradient = dataset.gradient(linearRegressionMdl);
+
+                    if (BLAS.getInstance().dnrm2(gradient.length, gradient, 1) < 1e-4)
+                        break;
+
+                    for (int j = 0; j < gradient.length; j++)
+                        linearRegressionMdl[j] -= 0.1 / persons.size() * gradient[j];
+                }
+
+                System.out.println("Linear Regression Model: " + Arrays.toString(linearRegressionMdl));
+            }
+
+            System.out.println(">>> Algorithm Specific Dataset example completed.");
+        }
+    }
+
+    /**
+     * Algorithm specific dataset. Extended version of {@code Dataset} with {@code gradient} method.
+     */
+    private static class AlgorithmSpecificDataset
+        extends DatasetWrapper<AlgorithmSpecificPartitionContext, SimpleLabeledDatasetData> {
+        /** BLAS (Basic Linear Algebra Subprograms) instance. */
+        private static final BLAS blas = BLAS.getInstance();
+
+        /**
+         * Constructs a new instance of dataset wrapper that delegates {@code compute} actions to the actual delegate.
+         *
+         * @param delegate Delegate that performs {@code compute} actions.
+         */
+        AlgorithmSpecificDataset(
+            Dataset<AlgorithmSpecificPartitionContext, SimpleLabeledDatasetData> delegate) {
+            super(delegate);
+        }
+
+        /** Calculate gradient. */
+        double[] gradient(double[] x) {
+            return computeWithCtx((ctx, data, partIdx) -> {
+                double[] tmp = Arrays.copyOf(data.getLabels(), data.getRows());
+                blas.dgemv("N", data.getRows(), data.getCols(), 1.0, data.getFeatures(),
+                    Math.max(1, data.getRows()), x, 1, -1.0, tmp, 1);
+
+                double[] res = new double[data.getCols()];
+                blas.dgemv("T", data.getRows(), data.getCols(), 1.0, data.getFeatures(),
+                    Math.max(1, data.getRows()), tmp, 1, 0.0, res, 1);
+
+                int iteration = ctx.getIteration();
+
+                System.out.println("Iteration " + iteration + " on partition " + partIdx
+                    + " completed with local result " + Arrays.toString(res));
+
+                ctx.setIteration(iteration + 1);
+
+                return res;
+            }, this::sum);
+        }
+
+        /** Sum of two vectors. */
+        public double[] sum(double[] a, double[] b) {
+            if (a == null)
+                return b;
+
+            if (b == null)
+                return a;
+
+            blas.daxpy(a.length, 1.0, a, 1, b, 1);
+
+            return b;
+        }
+    }
+
+    /**
+     * Algorithm specific partition context which keeps iteration number.
+     */
+    private static class AlgorithmSpecificPartitionContext implements Serializable {
+        /** */
+        private static final long serialVersionUID = 1887368924266684044L;
+
+        /** Iteration number. */
+        private int iteration;
+
+        /** */
+        public int getIteration() {
+            return iteration;
+        }
+
+        /** */
+        public void setIteration(int iteration) {
+            this.iteration = iteration;
+        }
+    }
+
+    /** */
+    private static IgniteCache<Integer, Person> createCache(Ignite ignite) {
+        CacheConfiguration<Integer, Person> cacheConfiguration = new CacheConfiguration<>();
+
+        cacheConfiguration.setName("PERSONS");
+        cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 2));
+
+        IgniteCache<Integer, Person> persons = ignite.createCache(cacheConfiguration);
+
+        persons.put(1, new Person("Mike", 1, 1));
+        persons.put(2, new Person("John", 2, 2));
+        persons.put(3, new Person("George", 3, 3));
+        persons.put(4, new Person("Karl", 4, 4));
+
+        return persons;
+    }
+}
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/CacheBasedDatasetExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/CacheBasedDatasetExample.java
new file mode 100644 (file)
index 0000000..b1413ad
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.dataset;
+
+import java.util.Arrays;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.examples.ml.dataset.model.Person;
+import org.apache.ignite.ml.dataset.DatasetFactory;
+import org.apache.ignite.ml.dataset.primitive.SimpleDataset;
+
+/**
+ * Example that shows how to create dataset based on an existing Ignite Cache and then use it to calculate {@code mean}
+ * and {@code std} values as well as {@code covariance} and {@code correlation} matrices.
+ */
+public class CacheBasedDatasetExample {
+    /** Run example. */
+    public static void main(String[] args) throws Exception {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println(">>> Cache Based Dataset example started.");
+
+            IgniteCache<Integer, Person> persons = createCache(ignite);
+
+            // Creates a cache based simple dataset containing features and providing standard dataset API.
+            try (SimpleDataset<?> dataset = DatasetFactory.createSimpleDataset(
+                ignite,
+                persons,
+                (k, v) -> new double[]{ v.getAge(), v.getSalary() },
+                2
+            )) {
+                // Calculation of the mean value. This calculation will be performed in map-reduce manner.
+                double[] mean = dataset.mean();
+                System.out.println("Mean \n\t" + Arrays.toString(mean));
+
+                // Calculation of the standard deviation. This calculation will be performed in map-reduce manner.
+                double[] std = dataset.std();
+                System.out.println("Standard deviation \n\t" + Arrays.toString(std));
+
+                // Calculation of the covariance matrix.  This calculation will be performed in map-reduce manner.
+                double[][] cov = dataset.cov();
+                System.out.println("Covariance matrix ");
+                for (double[] row : cov)
+                    System.out.println("\t" + Arrays.toString(row));
+
+                // Calculation of the correlation matrix.  This calculation will be performed in map-reduce manner.
+                double[][] corr = dataset.corr();
+                System.out.println("Correlation matrix ");
+                for (double[] row : corr)
+                    System.out.println("\t" + Arrays.toString(row));
+            }
+
+            System.out.println(">>> Cache Based Dataset example completed.");
+        }
+    }
+
+    /** */
+    private static IgniteCache<Integer, Person> createCache(Ignite ignite) {
+        CacheConfiguration<Integer, Person> cacheConfiguration = new CacheConfiguration<>();
+
+        cacheConfiguration.setName("PERSONS");
+        cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 2));
+
+        IgniteCache<Integer, Person> persons = ignite.createCache(cacheConfiguration);
+
+        persons.put(1, new Person("Mike", 42, 10000));
+        persons.put(2, new Person("John", 32, 64000));
+        persons.put(3, new Person("George", 53, 120000));
+        persons.put(4, new Person("Karl", 24, 70000));
+
+        return persons;
+    }
+}
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/LocalDatasetExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/LocalDatasetExample.java
new file mode 100644 (file)
index 0000000..af14836
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.dataset;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.examples.ml.dataset.model.Person;
+import org.apache.ignite.ml.dataset.DatasetFactory;
+import org.apache.ignite.ml.dataset.primitive.SimpleDataset;
+
+/**
+ * Example that shows how to create dataset based on an existing local storage and then use it to calculate {@code mean}
+ * and {@code std} values as well as {@code covariance} and {@code correlation} matrices.
+ */
+public class LocalDatasetExample {
+    /** Run example. */
+    public static void main(String[] args) throws Exception {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println(">>> Local Dataset example started.");
+
+            Map<Integer, Person> persons = createCache(ignite);
+
+            // Creates a local simple dataset containing features and providing standard dataset API.
+            try (SimpleDataset<?> dataset = DatasetFactory.createSimpleDataset(
+                persons,
+                2,
+                (k, v) -> new double[]{ v.getAge(), v.getSalary() },
+                2
+            )) {
+                // Calculation of the mean value. This calculation will be performed in map-reduce manner.
+                double[] mean = dataset.mean();
+                System.out.println("Mean \n\t" + Arrays.toString(mean));
+
+                // Calculation of the standard deviation. This calculation will be performed in map-reduce manner.
+                double[] std = dataset.std();
+                System.out.println("Standard deviation \n\t" + Arrays.toString(std));
+
+                // Calculation of the covariance matrix.  This calculation will be performed in map-reduce manner.
+                double[][] cov = dataset.cov();
+                System.out.println("Covariance matrix ");
+                for (double[] row : cov)
+                    System.out.println("\t" + Arrays.toString(row));
+
+                // Calculation of the correlation matrix.  This calculation will be performed in map-reduce manner.
+                double[][] corr = dataset.corr();
+                System.out.println("Correlation matrix ");
+                for (double[] row : corr)
+                    System.out.println("\t" + Arrays.toString(row));
+            }
+
+            System.out.println(">>> Local Dataset example completed.");
+        }
+    }
+
+    /** */
+    private static Map<Integer, Person> createCache(Ignite ignite) {
+        Map<Integer, Person> persons = new HashMap<>();
+
+        persons.put(1, new Person("Mike", 42, 10000));
+        persons.put(2, new Person("John", 32, 64000));
+        persons.put(3, new Person("George", 53, 120000));
+        persons.put(4, new Person("Karl", 24, 70000));
+
+        return persons;
+    }
+}
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/Person.java b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/Person.java
new file mode 100644 (file)
index 0000000..3770de8
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.dataset.model;
+
+/** Person model. */
+public class Person {
+    /** Name. */
+    private final String name;
+
+    /** Age. */
+    private final double age;
+
+    /** Salary. */
+    private final double salary;
+
+    /**
+     * Constructs a new instance of person.
+     *
+     * @param name Name.
+     * @param age Age.
+     * @param salary Salary.
+     */
+    public Person(String name, double age, double salary) {
+        this.name = name;
+        this.age = age;
+        this.salary = salary;
+    }
+
+    /** */
+    public String getName() {
+        return name;
+    }
+
+    /** */
+    public double getAge() {
+        return age;
+    }
+
+    /** */
+    public double getSalary() {
+        return salary;
+    }
+}
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/package-info.java b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/package-info.java
new file mode 100644 (file)
index 0000000..86df332
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Models used in machine learning dataset examples.
+ */
+package org.apache.ignite.examples.ml.dataset.model;
\ No newline at end of file
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/package-info.java b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/package-info.java
new file mode 100644 (file)
index 0000000..2d0fc1d
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Machine learning dataset examples.
+ */
+package org.apache.ignite.examples.ml.dataset;
\ No newline at end of file
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java
new file mode 100644 (file)
index 0000000..008b4ca
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.preprocessing;
+
+import java.util.Arrays;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.examples.ml.dataset.model.Person;
+import org.apache.ignite.ml.dataset.DatasetBuilder;
+import org.apache.ignite.ml.dataset.DatasetFactory;
+import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilder;
+import org.apache.ignite.ml.dataset.primitive.SimpleDataset;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.preprocessing.normalization.NormalizationPreprocessor;
+import org.apache.ignite.ml.preprocessing.normalization.NormalizationTrainer;
+
+/**
+ * Example that shows how to use normalization preprocessor to normalize data.
+ *
+ * Machine learning preprocessors are built as a chain. Most often a first preprocessor is a feature extractor as shown
+ * in this example. The second preprocessor here is a normalization preprocessor which is built on top of the feature
+ * extractor and represents a chain of itself and the underlying feature extractor.
+ */
+public class NormalizationExample {
+    /** Run example. */
+    public static void main(String[] args) throws Exception {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println(">>> Normalization example started.");
+
+            IgniteCache<Integer, Person> persons = createCache(ignite);
+
+            DatasetBuilder<Integer, Person> builder = new CacheBasedDatasetBuilder<>(ignite, persons);
+
+            // Defines first preprocessor that extracts features from an upstream data.
+            IgniteBiFunction<Integer, Person, double[]> featureExtractor = (k, v) -> new double[] {
+                v.getAge(),
+                v.getSalary()
+            };
+
+            // Defines second preprocessor that normalizes features.
+            NormalizationPreprocessor<Integer, Person> preprocessor = new NormalizationTrainer<Integer, Person>()
+                .fit(builder, featureExtractor, 2);
+
+            // Creates a cache based simple dataset containing features and providing standard dataset API.
+            try (SimpleDataset<?> dataset = DatasetFactory.createSimpleDataset(
+                builder,
+                preprocessor,
+                2
+            )) {
+                // Calculation of the mean value. This calculation will be performed in map-reduce manner.
+                double[] mean = dataset.mean();
+                System.out.println("Mean \n\t" + Arrays.toString(mean));
+
+                // Calculation of the standard deviation. This calculation will be performed in map-reduce manner.
+                double[] std = dataset.std();
+                System.out.println("Standard deviation \n\t" + Arrays.toString(std));
+
+                // Calculation of the covariance matrix.  This calculation will be performed in map-reduce manner.
+                double[][] cov = dataset.cov();
+                System.out.println("Covariance matrix ");
+                for (double[] row : cov)
+                    System.out.println("\t" + Arrays.toString(row));
+
+                // Calculation of the correlation matrix.  This calculation will be performed in map-reduce manner.
+                double[][] corr = dataset.corr();
+                System.out.println("Correlation matrix ");
+                for (double[] row : corr)
+                    System.out.println("\t" + Arrays.toString(row));
+            }
+
+            System.out.println(">>> Normalization example completed.");
+        }
+    }
+
+    /** */
+    private static IgniteCache<Integer, Person> createCache(Ignite ignite) {
+        CacheConfiguration<Integer, Person> cacheConfiguration = new CacheConfiguration<>();
+
+        cacheConfiguration.setName("PERSONS");
+        cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 2));
+
+        IgniteCache<Integer, Person> persons = ignite.createCache(cacheConfiguration);
+
+        persons.put(1, new Person("Mike", 42, 10000));
+        persons.put(2, new Person("John", 32, 64000));
+        persons.put(3, new Person("George", 53, 120000));
+        persons.put(4, new Person("Karl", 24, 70000));
+
+        return persons;
+    }
+}
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/package-info.java b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/package-info.java
new file mode 100644 (file)
index 0000000..164f14d
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Machine learning preprocessing examples.
+ */
+package org.apache.ignite.examples.ml.preprocessing;
\ No newline at end of file
index 2e64582..f8d56d7 100644 (file)
             <artifactId>SparseBitSet</artifactId>
             <version>1.0</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/Dataset.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/Dataset.java
new file mode 100644 (file)
index 0000000..24a2063
--- /dev/null
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset;
+
+import java.io.Serializable;
+import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDataset;
+import org.apache.ignite.ml.dataset.impl.local.LocalDataset;
+import org.apache.ignite.ml.math.functions.IgniteBiConsumer;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.math.functions.IgniteBinaryOperator;
+import org.apache.ignite.ml.math.functions.IgniteConsumer;
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+import org.apache.ignite.ml.math.functions.IgniteTriConsumer;
+import org.apache.ignite.ml.math.functions.IgniteTriFunction;
+
+/**
+ * A dataset providing an API that allows to perform generic computations on a distributed data represented as a set of
+ * partitions distributed across a cluster or placed locally. Every partition contains a {@code context} (reliably
+ * stored segment) and {@code data} (unreliably stored segment, which can be recovered from an upstream data and a
+ * {@code context} if needed). Computations are performed in a {@code MapReduce} manner, what allows to reduce a
+ * network traffic for most of the machine learning algorithms.
+ *
+ * <p>Dataset functionality allows to implement iterative machine learning algorithms via introducing computation
+ * context. In case iterative algorithm requires to maintain a state available and updatable on every iteration this
+ * state can be stored in the {@code context} of the partition and after that it will be available in further
+ * computations even if the Ignite Cache partition will be moved to another node because of node failure or rebalancing.
+ *
+ * <p>Partition {@code context} should be {@link Serializable} to be saved in Ignite Cache. Partition {@code data}
+ * should be {@link AutoCloseable} to allow system to clean up correspondent resources when partition {@code data} is
+ * not needed anymore.
+ *
+ * @param <C> Type of a partition {@code context}.
+ * @param <D> Type of a partition {@code data}.
+ *
+ * @see CacheBasedDataset
+ * @see LocalDataset
+ * @see DatasetFactory
+ */
+public interface Dataset<C extends Serializable, D extends AutoCloseable> extends AutoCloseable {
+    /**
+     * Applies the specified {@code map} function to every partition {@code data}, {@code context} and partition
+     * index in the dataset and then reduces {@code map} results to final result by using the {@code reduce} function.
+     *
+     * @param map Function applied to every partition {@code data}, {@code context} and partition index.
+     * @param reduce Function applied to results of {@code map} to get final result.
+     * @param identity Identity.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    public <R> R computeWithCtx(IgniteTriFunction<C, D, Integer, R> map, IgniteBinaryOperator<R> reduce, R identity);
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code data} and partition index in the dataset
+     * and then reduces {@code map} results to final result by using the {@code reduce} function.
+     *
+     * @param map Function applied to every partition {@code data} and partition index.
+     * @param reduce Function applied to results of {@code map} to get final result.
+     * @param identity Identity.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    public <R> R compute(IgniteBiFunction<D, Integer, R> map, IgniteBinaryOperator<R> reduce, R identity);
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code data}, {@code context} and partition
+     * index in the dataset and then reduces {@code map} results to final result by using the {@code reduce} function.
+     *
+     * @param map Function applied to every partition {@code data}, {@code context} and partition index.
+     * @param reduce Function applied to results of {@code map} to get final result.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    default public <R> R computeWithCtx(IgniteTriFunction<C, D, Integer, R> map, IgniteBinaryOperator<R> reduce) {
+        return computeWithCtx(map, reduce, null);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code data} and partition index in the dataset
+     * and then reduces {@code map} results to final result by using the {@code reduce} function.
+     *
+     * @param map Function applied to every partition {@code data} and partition index.
+     * @param reduce Function applied to results of {@code map} to get final result.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    default public <R> R compute(IgniteBiFunction<D, Integer, R> map, IgniteBinaryOperator<R> reduce) {
+        return compute(map, reduce, null);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code data} and {@code context} in the dataset
+     * and then reduces {@code map} results to final result by using the {@code reduce} function.
+     *
+     * @param map Function applied to every partition {@code data} and {@code context}.
+     * @param reduce Function applied to results of {@code map} to get final result.
+     * @param identity Identity.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    default public <R> R computeWithCtx(IgniteBiFunction<C, D, R> map, IgniteBinaryOperator<R> reduce, R identity) {
+        return computeWithCtx((ctx, data, partIdx) -> map.apply(ctx, data), reduce, identity);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code data} in the dataset and then reduces
+     * {@code map} results to final result by using the {@code reduce} function.
+     *
+     * @param map Function applied to every partition {@code data}.
+     * @param reduce Function applied to results of {@code map} to get final result.
+     * @param identity Identity.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    default public <R> R compute(IgniteFunction<D, R> map, IgniteBinaryOperator<R> reduce, R identity) {
+        return compute((data, partIdx) -> map.apply(data), reduce, identity);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code data} and {@code context} in the dataset
+     * and then reduces {@code map} results to final result by using the {@code reduce} function.
+     *
+     * @param map Function applied to every partition {@code data} and {@code context}.
+     * @param reduce Function applied to results of {@code map} to get final result.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    default public <R> R computeWithCtx(IgniteBiFunction<C, D, R> map, IgniteBinaryOperator<R> reduce) {
+        return computeWithCtx((ctx, data, partIdx) -> map.apply(ctx, data), reduce);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code data} in the dataset and then reduces
+     * {@code map} results to final result by using the {@code reduce} function.
+     *
+     * @param map Function applied to every partition {@code data}.
+     * @param reduce Function applied to results of {@code map} to get final result.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    default public <R> R compute(IgniteFunction<D, R> map, IgniteBinaryOperator<R> reduce) {
+        return compute((data, partIdx) -> map.apply(data), reduce);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code data}, {@code context} and partition
+     * index in the dataset.
+     *
+     * @param map Function applied to every partition {@code data}, {@code context} and partition index.
+     */
+    default public void computeWithCtx(IgniteTriConsumer<C, D, Integer> map) {
+        computeWithCtx((ctx, data, partIdx) -> {
+            map.accept(ctx, data, partIdx);
+            return null;
+        }, (a, b) -> null);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code data} in the dataset and partition index.
+     *
+     * @param map Function applied to every partition {@code data} and partition index.
+     */
+    default public void compute(IgniteBiConsumer<D, Integer> map) {
+        compute((data, partIdx) -> {
+            map.accept(data, partIdx);
+            return null;
+        }, (a, b) -> null);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code data} and {@code context} in the dataset.
+     *
+     * @param map Function applied to every partition {@code data} and {@code context}.
+     */
+    default public void computeWithCtx(IgniteBiConsumer<C, D> map) {
+        computeWithCtx((ctx, data, partIdx) -> map.accept(ctx, data));
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code data} in the dataset.
+     *
+     * @param map Function applied to every partition {@code data}.
+     */
+    default public void compute(IgniteConsumer<D> map) {
+        compute((data, partIdx) -> map.accept(data));
+    }
+
+    /**
+     * Wraps this dataset into the specified wrapper to introduce new functionality based on {@code compute} and
+     * {@code computeWithCtx} methods.
+     *
+     * @param wrapper Dataset wrapper.
+     * @param <I> Type of a new wrapped dataset.
+     * @return New wrapped dataset.
+     */
+    default public <I extends Dataset<C ,D>> I wrap(IgniteFunction<Dataset<C, D>, I> wrapper) {
+        return wrapper.apply(this);
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetBuilder.java
new file mode 100644 (file)
index 0000000..a6757ff
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset;
+
+import java.io.Serializable;
+import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilder;
+import org.apache.ignite.ml.dataset.impl.local.LocalDatasetBuilder;
+
+/**
+ * A builder constructing instances of a {@link Dataset}. Implementations of this interface encapsulate logic of
+ * building specific datasets such as allocation required data structures and initialization of {@code context} part of
+ * partitions.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ *
+ * @see CacheBasedDatasetBuilder
+ * @see LocalDatasetBuilder
+ * @see Dataset
+ */
+public interface DatasetBuilder<K, V> {
+    /**
+     * Constructs a new instance of {@link Dataset} that includes allocation required data structures and
+     * initialization of {@code context} part of partitions.
+     *
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param partDataBuilder Partition {@code data} builder.
+     * @param <C> Type of a partition {@code context}.
+     * @param <D> Type of a partition {@code data}.
+     * @return Dataset.
+     */
+    public <C extends Serializable, D extends AutoCloseable> Dataset<C, D> build(
+        PartitionContextBuilder<K, V, C> partCtxBuilder, PartitionDataBuilder<K, V, C, D> partDataBuilder);
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetFactory.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetFactory.java
new file mode 100644 (file)
index 0000000..af44a8a
--- /dev/null
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilder;
+import org.apache.ignite.ml.dataset.impl.local.LocalDatasetBuilder;
+import org.apache.ignite.ml.dataset.primitive.SimpleDataset;
+import org.apache.ignite.ml.dataset.primitive.SimpleLabeledDataset;
+import org.apache.ignite.ml.dataset.primitive.builder.context.EmptyContextBuilder;
+import org.apache.ignite.ml.dataset.primitive.builder.data.SimpleDatasetDataBuilder;
+import org.apache.ignite.ml.dataset.primitive.builder.data.SimpleLabeledDatasetDataBuilder;
+import org.apache.ignite.ml.dataset.primitive.context.EmptyContext;
+import org.apache.ignite.ml.dataset.primitive.data.SimpleDatasetData;
+import org.apache.ignite.ml.dataset.primitive.data.SimpleLabeledDatasetData;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+
+/**
+ * Factory providing a client facing API that allows to construct basic and the most frequently used types of dataset.
+ *
+ *
+ * <p>Dataset construction is based on three major concepts: a partition {@code upstream}, {@code context} and
+ * {@code data}. A partition {@code upstream} is a data source, which assumed to be available all the time regardless
+ * node failures and rebalancing events. A partition {@code context} is a part of a partition maintained during the
+ * whole computation process and stored in a reliable storage so that a {@code context} is staying available and
+ * consistent regardless node failures and rebalancing events as well as an {@code upstream}. A partition {@code data}
+ * is a part of partition maintained during a computation process in unreliable local storage such as heap, off-heap or
+ * GPU memory on the node where current computation is performed, so that partition {@code data} can be lost as result
+ * of node failure or rebalancing, but it can be restored from an {@code upstream} and a partition {@code context}.
+ *
+ * <p>A partition {@code context} and {@code data} are built on top of an {@code upstream} by using specified
+ * builders: {@link PartitionContextBuilder} and {@link PartitionDataBuilder} correspondingly. To build a generic
+ * dataset the following approach is used:
+ *
+ * <code>
+ * {@code
+ * Dataset<C, D> dataset = DatasetFactory.create(
+ *     ignite,
+ *     cache,
+ *     partitionContextBuilder,
+ *     partitionDataBuilder
+ * );
+ * }
+ * </code>
+ *
+ * <p>As well as the generic building method {@code create} this factory provides methods that allow to create a
+ * specific dataset types such as method {@code createSimpleDataset} to create {@link SimpleDataset} and method
+ * {@code createSimpleLabeledDataset} to create {@link SimpleLabeledDataset}.
+ *
+ * @see Dataset
+ * @see PartitionContextBuilder
+ * @see PartitionDataBuilder
+ */
+public class DatasetFactory {
+    /**
+     * Creates a new instance of distributed dataset using the specified {@code partCtxBuilder} and
+     * {@code partDataBuilder}. This is the generic methods that allows to create any Ignite Cache based datasets with
+     * any desired partition {@code context} and {@code data}.
+     *
+     * @param datasetBuilder Dataset builder.
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param partDataBuilder Partition {@code data} builder.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> ype of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @param <D> Type of a partition {@code data}.
+     * @return Dataset.
+     */
+    public static <K, V, C extends Serializable, D extends AutoCloseable> Dataset<C, D> create(
+        DatasetBuilder<K, V> datasetBuilder, PartitionContextBuilder<K, V, C> partCtxBuilder,
+        PartitionDataBuilder<K, V, C, D> partDataBuilder) {
+        return datasetBuilder.build(partCtxBuilder, partDataBuilder);
+    }
+    /**
+     * Creates a new instance of distributed dataset using the specified {@code partCtxBuilder} and
+     * {@code partDataBuilder}. This is the generic methods that allows to create any Ignite Cache based datasets with
+     * any desired partition {@code context} and {@code data}.
+     *
+     * @param ignite Ignite instance.
+     * @param upstreamCache Ignite Cache with {@code upstream} data.
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param partDataBuilder Partition {@code data} builder.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @param <D> Type of a partition {@code data}.
+     * @return Dataset.
+     */
+    public static <K, V, C extends Serializable, D extends AutoCloseable> Dataset<C, D> create(
+        Ignite ignite, IgniteCache<K, V> upstreamCache, PartitionContextBuilder<K, V, C> partCtxBuilder,
+        PartitionDataBuilder<K, V, C, D> partDataBuilder) {
+        return create(new CacheBasedDatasetBuilder<>(ignite, upstreamCache), partCtxBuilder, partDataBuilder);
+    }
+
+    /**
+     * Creates a new instance of distributed {@link SimpleDataset} using the specified {@code partCtxBuilder} and
+     * {@code featureExtractor}. This methods determines partition {@code data} to be {@link SimpleDatasetData}, but
+     * allows to use any desired type of partition {@code context}.
+     *
+     * @param datasetBuilder Dataset builder.
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @return Dataset.
+     */
+    public static <K, V, C extends Serializable> SimpleDataset<C> createSimpleDataset(
+        DatasetBuilder<K, V> datasetBuilder, PartitionContextBuilder<K, V, C> partCtxBuilder,
+        IgniteBiFunction<K, V, double[]> featureExtractor, int cols) {
+        return create(
+            datasetBuilder,
+            partCtxBuilder,
+            new SimpleDatasetDataBuilder<>(featureExtractor, cols)
+        ).wrap(SimpleDataset::new);
+    }
+
+    /**
+     * Creates a new instance of distributed {@link SimpleDataset} using the specified {@code partCtxBuilder} and
+     * {@code featureExtractor}. This methods determines partition {@code data} to be {@link SimpleDatasetData}, but
+     * allows to use any desired type of partition {@code context}.
+     *
+     * @param ignite Ignite instance.
+     * @param upstreamCache Ignite Cache with {@code upstream} data.
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @return Dataset.
+     */
+    public static <K, V, C extends Serializable> SimpleDataset<C> createSimpleDataset(Ignite ignite,
+        IgniteCache<K, V> upstreamCache, PartitionContextBuilder<K, V, C> partCtxBuilder,
+        IgniteBiFunction<K, V, double[]> featureExtractor, int cols) {
+        return createSimpleDataset(new CacheBasedDatasetBuilder<>(ignite, upstreamCache), partCtxBuilder,
+            featureExtractor, cols);
+    }
+
+    /**
+     * Creates a new instance of distributed {@link SimpleLabeledDataset} using the specified {@code partCtxBuilder},
+     * {@code featureExtractor} and {@code lbExtractor}. This method determines partition {@code data} to be
+     * {@link SimpleLabeledDatasetData}, but allows to use any desired type of partition {@code context}.
+     *
+     * @param datasetBuilder Dataset builder.
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}.
+     * @param lbExtractor Label extractor used to extract labels and buikd {@link SimpleLabeledDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @return Dataset.
+     */
+    public static <K, V, C extends Serializable> SimpleLabeledDataset<C> createSimpleLabeledDataset(
+        DatasetBuilder<K, V> datasetBuilder, PartitionContextBuilder<K, V, C> partCtxBuilder,
+        IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor, int cols) {
+        return create(
+            datasetBuilder,
+            partCtxBuilder,
+            new SimpleLabeledDatasetDataBuilder<>(featureExtractor, lbExtractor, cols)
+        ).wrap(SimpleLabeledDataset::new);
+    }
+
+    /**
+     * Creates a new instance of distributed {@link SimpleLabeledDataset} using the specified {@code partCtxBuilder},
+     * {@code featureExtractor} and {@code lbExtractor}. This method determines partition {@code data} to be
+     * {@link SimpleLabeledDatasetData}, but allows to use any desired type of partition {@code context}.
+     *
+     * @param ignite Ignite instance.
+     * @param upstreamCache Ignite Cache with {@code upstream} data.
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}.
+     * @param lbExtractor Label extractor used to extract labels and buikd {@link SimpleLabeledDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @return Dataset.
+     */
+    public static <K, V, C extends Serializable> SimpleLabeledDataset<C> createSimpleLabeledDataset(Ignite ignite,
+        IgniteCache<K, V> upstreamCache, PartitionContextBuilder<K, V, C> partCtxBuilder,
+        IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor, int cols) {
+        return createSimpleLabeledDataset(new CacheBasedDatasetBuilder<>(ignite, upstreamCache), partCtxBuilder,
+            featureExtractor, lbExtractor, cols);
+    }
+
+    /**
+     * Creates a new instance of distributed {@link SimpleDataset} using the specified {@code featureExtractor}. This
+     * methods determines partition {@code context} to be {@link EmptyContext} and partition {@code data} to be
+     * {@link SimpleDatasetData}.
+     *
+     * @param datasetBuilder Dataset builder.
+     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @return Dataset.
+     */
+    public static <K, V> SimpleDataset<EmptyContext> createSimpleDataset(DatasetBuilder<K, V> datasetBuilder,
+        IgniteBiFunction<K, V, double[]> featureExtractor, int cols) {
+        return createSimpleDataset(datasetBuilder, new EmptyContextBuilder<>(), featureExtractor, cols);
+    }
+
+    /**
+     * Creates a new instance of distributed {@link SimpleDataset} using the specified {@code featureExtractor}. This
+     * methods determines partition {@code context} to be {@link EmptyContext} and partition {@code data} to be
+     * {@link SimpleDatasetData}.
+     *
+     * @param ignite Ignite instance.
+     * @param upstreamCache Ignite Cache with {@code upstream} data.
+     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @return Dataset.
+     */
+    public static <K, V> SimpleDataset<EmptyContext> createSimpleDataset(Ignite ignite, IgniteCache<K, V> upstreamCache,
+        IgniteBiFunction<K, V, double[]> featureExtractor, int cols) {
+        return createSimpleDataset(new CacheBasedDatasetBuilder<>(ignite, upstreamCache), featureExtractor, cols);
+    }
+
+    /**
+     * Creates a new instance of distributed {@link SimpleLabeledDataset} using the specified {@code featureExtractor}
+     * and {@code lbExtractor}. This methods determines partition {@code context} to be {@link EmptyContext} and
+     * partition {@code data} to be {@link SimpleLabeledDatasetData}.
+     *
+     * @param datasetBuilder Dataset builder.
+     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}.
+     * @param lbExtractor Label extractor used to extract labels and buikd {@link SimpleLabeledDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @return Dataset.
+     */
+    public static <K, V> SimpleLabeledDataset<EmptyContext> createSimpleLabeledDataset(
+        DatasetBuilder<K, V> datasetBuilder, IgniteBiFunction<K, V, double[]> featureExtractor,
+        IgniteBiFunction<K, V, Double> lbExtractor, int cols) {
+        return createSimpleLabeledDataset(datasetBuilder, new EmptyContextBuilder<>(), featureExtractor, lbExtractor,
+            cols);
+    }
+
+    /**
+     * Creates a new instance of distributed {@link SimpleLabeledDataset} using the specified {@code featureExtractor}
+     * and {@code lbExtractor}. This methods determines partition {@code context} to be {@link EmptyContext} and
+     * partition {@code data} to be {@link SimpleLabeledDatasetData}.
+     *
+     * @param ignite Ignite instance.
+     * @param upstreamCache Ignite Cache with {@code upstream} data.
+     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}.
+     * @param lbExtractor Label extractor used to extract labels and buikd {@link SimpleLabeledDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @return Dataset.
+     */
+    public static <K, V> SimpleLabeledDataset<EmptyContext> createSimpleLabeledDataset(Ignite ignite,
+        IgniteCache<K, V> upstreamCache, IgniteBiFunction<K, V, double[]> featureExtractor,
+        IgniteBiFunction<K, V, Double> lbExtractor, int cols) {
+        return createSimpleLabeledDataset(new CacheBasedDatasetBuilder<>(ignite, upstreamCache), featureExtractor,
+            lbExtractor, cols);
+    }
+
+    /**
+     * Creates a new instance of local dataset using the specified {@code partCtxBuilder} and {@code partDataBuilder}.
+     * This is the generic methods that allows to create any Ignite Cache based datasets with any desired partition
+     * {@code context} and {@code data}.
+     *
+     * @param upstreamMap {@code Map} with {@code upstream} data.
+     * @param partitions Number of partitions {@code upstream} {@code Map} will be divided on.
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param partDataBuilder Partition {@code data} builder.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @param <D> Type of a partition {@code data}.
+     * @return Dataset.
+     */
+    public static <K, V, C extends Serializable, D extends AutoCloseable> Dataset<C, D> create(
+        Map<K, V> upstreamMap, int partitions, PartitionContextBuilder<K, V, C> partCtxBuilder,
+        PartitionDataBuilder<K, V, C, D> partDataBuilder) {
+        return create(new LocalDatasetBuilder<>(upstreamMap, partitions), partCtxBuilder, partDataBuilder);
+    }
+
+    /**
+     * Creates a new instance of local {@link SimpleDataset} using the specified {@code partCtxBuilder} and
+     * {@code featureExtractor}. This methods determines partition {@code data} to be {@link SimpleDatasetData}, but
+     * allows to use any desired type of partition {@code context}.
+     *
+     * @param upstreamMap {@code Map} with {@code upstream} data.
+     * @param partitions Number of partitions {@code upstream} {@code Map} will be divided on.
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @return Dataset.
+     */
+    public static <K, V, C extends Serializable> SimpleDataset<C> createSimpleDataset(Map<K, V> upstreamMap,
+        int partitions, PartitionContextBuilder<K, V, C> partCtxBuilder,
+        IgniteBiFunction<K, V, double[]> featureExtractor, int cols) {
+        return createSimpleDataset(new LocalDatasetBuilder<>(upstreamMap, partitions), partCtxBuilder, featureExtractor,
+            cols);
+    }
+
+    /**
+     * Creates a new instance of local {@link SimpleLabeledDataset} using the specified {@code partCtxBuilder},
+     * {@code featureExtractor} and {@code lbExtractor}. This method determines partition {@code data} to be
+     * {@link SimpleLabeledDatasetData}, but allows to use any desired type of partition {@code context}.
+     *
+     * @param upstreamMap {@code Map} with {@code upstream} data.
+     * @param partitions Number of partitions {@code upstream} {@code Map} will be divided on.
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}.
+     * @param lbExtractor Label extractor used to extract labels and buikd {@link SimpleLabeledDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @return Dataset.
+     */
+    public static <K, V, C extends Serializable> SimpleLabeledDataset<C> createSimpleLabeledDataset(
+        Map<K, V> upstreamMap, int partitions, PartitionContextBuilder<K, V, C> partCtxBuilder,
+        IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor, int cols) {
+        return createSimpleLabeledDataset(new LocalDatasetBuilder<>(upstreamMap, partitions), partCtxBuilder,
+            featureExtractor, lbExtractor, cols);
+    }
+
+    /**
+     * Creates a new instance of local {@link SimpleDataset} using the specified {@code featureExtractor}. This
+     * methods determines partition {@code context} to be {@link EmptyContext} and partition {@code data} to be
+     * {@link SimpleDatasetData}.
+     *
+     * @param upstreamMap {@code Map} with {@code upstream} data.
+     * @param partitions Number of partitions {@code upstream} {@code Map} will be divided on.
+     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @return Dataset.
+     */
+    public static <K, V> SimpleDataset<EmptyContext> createSimpleDataset(Map<K, V> upstreamMap, int partitions,
+        IgniteBiFunction<K, V, double[]> featureExtractor, int cols) {
+        return createSimpleDataset(new LocalDatasetBuilder<>(upstreamMap, partitions), featureExtractor, cols);
+    }
+
+    /**
+     * Creates a new instance of local {@link SimpleLabeledDataset} using the specified {@code featureExtractor}
+     * and {@code lbExtractor}. This methods determines partition {@code context} to be {@link EmptyContext} and
+     * partition {@code data} to be {@link SimpleLabeledDatasetData}.
+     *
+     * @param upstreamMap {@code Map} with {@code upstream} data.
+     * @param partitions Number of partitions {@code upstream} {@code Map} will be divided on.
+     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}.
+     * @param lbExtractor Label extractor used to extract labels and build {@link SimpleLabeledDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @return Dataset.
+     */
+    public static <K, V> SimpleLabeledDataset<EmptyContext> createSimpleLabeledDataset(Map<K, V> upstreamMap,
+        int partitions, IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor,
+        int cols) {
+        return createSimpleLabeledDataset(new LocalDatasetBuilder<>(upstreamMap, partitions), featureExtractor,
+            lbExtractor, cols);
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionContextBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionContextBuilder.java
new file mode 100644 (file)
index 0000000..21c9907
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import org.apache.ignite.ml.dataset.primitive.builder.context.EmptyContextBuilder;
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+
+/**
+ * Builder that accepts a partition {@code upstream} data and makes partition {@code context}. This builder is used to
+ * build a partition {@code context} and assumed to be called only once for every partition during a dataset
+ * initialization.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ * @param <C> Type of a partition {@code context}.
+ *
+ * @see EmptyContextBuilder
+ */
+@FunctionalInterface
+public interface PartitionContextBuilder<K, V, C extends Serializable> extends Serializable {
+    /**
+     * Builds a new partition {@code context} from an {@code upstream} data.
+     *
+     * @param upstreamData Partition {@code upstream} data.
+     * @param upstreamDataSize Partition {@code upstream} data size.
+     * @return Partition {@code context}.
+     */
+    public C build(Iterator<UpstreamEntry<K, V>> upstreamData, long upstreamDataSize);
+
+    /**
+     * Makes a composed partition {@code context} builder that first builds a {@code context} and then applies the
+     * specified function on the result.
+     *
+     * @param fun Function that applied after first partition {@code context} is built.
+     * @param <C2> New type of a partition {@code context}.
+     * @return Composed partition {@code context} builder.
+     */
+    default public <C2 extends Serializable> PartitionContextBuilder<K, V, C2> andThen(IgniteFunction<C, C2> fun) {
+        return (upstreamData, upstreamDataSize) -> fun.apply(build(upstreamData, upstreamDataSize));
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionDataBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionDataBuilder.java
new file mode 100644 (file)
index 0000000..361719f
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import org.apache.ignite.ml.dataset.primitive.builder.data.SimpleDatasetDataBuilder;
+import org.apache.ignite.ml.dataset.primitive.builder.data.SimpleLabeledDatasetDataBuilder;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+
+/**
+ * Builder that accepts a partition {@code upstream} data and partition {@code context} and makes partition
+ * {@code data}. This builder is used to build a partition {@code data} and assumed to be called in all cases when
+ * partition {@code data} not found on the node that performs computation (it might be the result of a previous node
+ * failure or rebalancing).
+ *
+ * @param <K> Type of a key in <tt>upstream</tt> data.
+ * @param <V> Type of a value in <tt>upstream</tt> data.
+ * @param <C> Type of a partition <tt>context</tt>.
+ * @param <D> Type of a partition <tt>data</tt>.
+ * @see SimpleDatasetDataBuilder
+ * @see SimpleLabeledDatasetDataBuilder
+ */
+@FunctionalInterface
+public interface PartitionDataBuilder<K, V, C extends Serializable, D extends AutoCloseable> extends Serializable {
+    /**
+     * Builds a new partition {@code data} from a partition {@code upstream} data and partition {@code context}
+     *
+     * @param upstreamData Partition {@code upstream} data.
+     * @param upstreamDataSize Partition {@code upstream} data size.
+     * @param ctx Partition {@code context}.
+     * @return Partition {@code data}.
+     */
+    public D build(Iterator<UpstreamEntry<K, V>> upstreamData, long upstreamDataSize, C ctx);
+
+    /**
+     * Makes a composed partition {@code data} builder that first builds a {@code data} and then applies the specified
+     * function on the result.
+     *
+     * @param fun Function that applied after first partition {@code data} is built.
+     * @param <D2> New type of a partition {@code data}.
+     * @return Composed partition {@code data} builder.
+     */
+    default public <D2 extends AutoCloseable> PartitionDataBuilder<K, V, C, D2> andThen(
+        IgniteBiFunction<D, C, D2> fun) {
+        return (upstreamData, upstreamDataSize, ctx) -> fun.apply(build(upstreamData, upstreamDataSize, ctx), ctx);
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/UpstreamEntry.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/UpstreamEntry.java
new file mode 100644 (file)
index 0000000..58226d9
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset;
+
+/**
+ * Entry of the {@code upstream}.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ */
+public class UpstreamEntry<K, V> {
+    /** Key. */
+    private final K key;
+
+    /** Value. */
+    private final V val;
+
+    /**
+     * Constructs a new instance of upstream entry.
+     *
+     * @param key Key.
+     * @param val Value.
+     */
+    public UpstreamEntry(K key, V val) {
+        this.key = key;
+        this.val = val;
+    }
+
+    /** */
+    public K getKey() {
+        return key;
+    }
+
+    /** */
+    public V getValue() {
+        return val;
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java
new file mode 100644 (file)
index 0000000..463d496
--- /dev/null
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.cache;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.dataset.PartitionDataBuilder;
+import org.apache.ignite.ml.dataset.impl.cache.util.ComputeUtils;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.math.functions.IgniteBinaryOperator;
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+import org.apache.ignite.ml.math.functions.IgniteTriFunction;
+
+/**
+ * An implementation of dataset based on Ignite Cache, which is used as {@code upstream} and as reliable storage for
+ * partition {@code context} as well.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ * @param <C> Type of a partition {@code context}.
+ * @param <D> Type of a partition {@code data}.
+ */
+public class CacheBasedDataset<K, V, C extends Serializable, D extends AutoCloseable>
+    implements Dataset<C, D> {
+    /** Number of retries for the case when one of partitions not found on the node where computation is performed. */
+    private static final int RETRIES = 15 * 60;
+
+    /** Retry interval (ms) for the case when one of partitions not found on the node where computation is performed. */
+    private static final int RETRY_INTERVAL = 1000;
+
+    /** Ignite instance. */
+    private final Ignite ignite;
+
+    /** Ignite Cache with {@code upstream} data. */
+    private final IgniteCache<K, V> upstreamCache;
+
+    /** Ignite Cache with partition {@code context}. */
+    private final IgniteCache<Integer, C> datasetCache;
+
+    /** Partition {@code data} builder. */
+    private final PartitionDataBuilder<K, V, C, D> partDataBuilder;
+
+    /** Dataset ID that is used to identify dataset in local storage on the node where computation is performed. */
+    private final UUID datasetId;
+
+    /**
+     * Constructs a new instance of dataset based on Ignite Cache, which is used as {@code upstream} and as reliable storage for
+     * partition {@code context} as well.
+     *
+     * @param ignite Ignite instance.
+     * @param upstreamCache Ignite Cache with {@code upstream} data.
+     * @param datasetCache Ignite Cache with partition {@code context}.
+     * @param partDataBuilder Partition {@code data} builder.
+     * @param datasetId Dataset ID.
+     */
+    public CacheBasedDataset(Ignite ignite, IgniteCache<K, V> upstreamCache,
+        IgniteCache<Integer, C> datasetCache, PartitionDataBuilder<K, V, C, D> partDataBuilder,
+        UUID datasetId) {
+        this.ignite = ignite;
+        this.upstreamCache = upstreamCache;
+        this.datasetCache = datasetCache;
+        this.partDataBuilder = partDataBuilder;
+        this.datasetId = datasetId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R computeWithCtx(IgniteTriFunction<C, D, Integer, R> map, IgniteBinaryOperator<R> reduce, R identity) {
+        String upstreamCacheName = upstreamCache.getName();
+        String datasetCacheName = datasetCache.getName();
+
+        return computeForAllPartitions(part -> {
+            C ctx = ComputeUtils.getContext(Ignition.localIgnite(), datasetCacheName, part);
+
+            D data = ComputeUtils.getData(
+                Ignition.localIgnite(),
+                upstreamCacheName,
+                datasetCacheName,
+                datasetId,
+                part,
+                partDataBuilder
+            );
+
+            R res = map.apply(ctx, data, part);
+
+            // Saves partition context after update.
+            ComputeUtils.saveContext(Ignition.localIgnite(), datasetCacheName, part, ctx);
+
+            return res;
+        }, reduce, identity);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R compute(IgniteBiFunction<D, Integer, R> map, IgniteBinaryOperator<R> reduce, R identity) {
+        String upstreamCacheName = upstreamCache.getName();
+        String datasetCacheName = datasetCache.getName();
+
+        return computeForAllPartitions(part -> {
+            D data = ComputeUtils.getData(
+                Ignition.localIgnite(),
+                upstreamCacheName,
+                datasetCacheName,
+                datasetId,
+                part,
+                partDataBuilder
+            );
+
+            return map.apply(data, part);
+        }, reduce, identity);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        datasetCache.destroy();
+    }
+
+    /**
+     * Calls the {@code MapReduce} job specified as the {@code fun} function and the {@code reduce} reducer on all
+     * partitions with guarantee that partitions with the same index of upstream and partition {@code context} caches
+     * will be on the same node during the computation and will not be moved before computation is finished.
+     *
+     * @param fun Function that applies to all partitions.
+     * @param reduce Function that reduces results of {@code fun}.
+     * @param identity Identity.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    private <R> R computeForAllPartitions(IgniteFunction<Integer, R> fun, IgniteBinaryOperator<R> reduce, R identity) {
+        Collection<String> cacheNames = Arrays.asList(datasetCache.getName(), upstreamCache.getName());
+        Collection<R> results = ComputeUtils.affinityCallWithRetries(ignite, cacheNames, fun, RETRIES, RETRY_INTERVAL);
+
+        R res = identity;
+        for (R partRes : results)
+            res = reduce.apply(res, partRes);
+
+        return res;
+    }
+
+    /** */
+    public IgniteCache<K, V> getUpstreamCache() {
+        return upstreamCache;
+    }
+
+    /** */
+    public IgniteCache<Integer, C> getDatasetCache() {
+        return datasetCache;
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java
new file mode 100644 (file)
index 0000000..5c0d583
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.cache;
+
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.ml.dataset.DatasetBuilder;
+import org.apache.ignite.ml.dataset.PartitionContextBuilder;
+import org.apache.ignite.ml.dataset.PartitionDataBuilder;
+import org.apache.ignite.ml.dataset.impl.cache.util.ComputeUtils;
+import org.apache.ignite.ml.dataset.impl.cache.util.DatasetAffinityFunctionWrapper;
+
+/**
+ * A dataset builder that makes {@link CacheBasedDataset}. Encapsulate logic of building cache based dataset such as
+ * allocation required data structures and initialization of {@code context} part of partitions.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ */
+public class CacheBasedDatasetBuilder<K, V> implements DatasetBuilder<K, V> {
+    /** Number of retries for the case when one of partitions not found on the node where loading is performed. */
+    private static final int RETRIES = 15 * 60;
+
+    /** Retry interval (ms) for the case when one of partitions not found on the node where loading is performed. */
+    private static final int RETRY_INTERVAL = 1000;
+
+    /** Template of the name of Ignite Cache containing partition {@code context}. */
+    private static final String DATASET_CACHE_TEMPLATE = "%s_DATASET_%s";
+
+    /** Ignite instance. */
+    private final Ignite ignite;
+
+    /** Ignite Cache with {@code upstream} data. */
+    private final IgniteCache<K, V> upstreamCache;
+
+    /**
+     * Constructs a new instance of cache based dataset builder that makes {@link CacheBasedDataset}.
+     *
+     * @param ignite Ignite instance.
+     * @param upstreamCache Ignite Cache with {@code upstream} data.
+     */
+    public CacheBasedDatasetBuilder(Ignite ignite, IgniteCache<K, V> upstreamCache) {
+        this.ignite = ignite;
+        this.upstreamCache = upstreamCache;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <C extends Serializable, D extends AutoCloseable> CacheBasedDataset<K, V, C, D> build(
+        PartitionContextBuilder<K, V, C> partCtxBuilder, PartitionDataBuilder<K, V, C, D> partDataBuilder) {
+        UUID datasetId = UUID.randomUUID();
+
+        // Retrieves affinity function of the upstream Ignite Cache.
+        CacheConfiguration<K, V> upstreamCacheConfiguration = upstreamCache.getConfiguration(CacheConfiguration.class);
+        AffinityFunction upstreamCacheAffinity = upstreamCacheConfiguration.getAffinity();
+
+        // Creates dataset cache configuration with affinity function that mimics to affinity function of the upstream
+        // cache.
+        CacheConfiguration<Integer, C> datasetCacheConfiguration = new CacheConfiguration<>();
+        datasetCacheConfiguration.setName(String.format(DATASET_CACHE_TEMPLATE, upstreamCache.getName(), datasetId));
+        datasetCacheConfiguration.setAffinity(new DatasetAffinityFunctionWrapper(upstreamCacheAffinity));
+
+        IgniteCache<Integer, C> datasetCache = ignite.createCache(datasetCacheConfiguration);
+
+        ComputeUtils.initContext(
+            ignite,
+            upstreamCache.getName(),
+            datasetCache.getName(),
+            partCtxBuilder,
+            RETRIES,
+            RETRY_INTERVAL
+        );
+
+        return new CacheBasedDataset<>(ignite, upstreamCache, datasetCache, partDataBuilder, datasetId);
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/package-info.java
new file mode 100644 (file)
index 0000000..74629d7
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Base package for cache based implementation of machine learning dataset.
+ */
+package org.apache.ignite.ml.dataset.impl.cache;
\ No newline at end of file
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java
new file mode 100644 (file)
index 0000000..0785db2
--- /dev/null
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.cache.util;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.locks.LockSupport;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.ml.dataset.PartitionContextBuilder;
+import org.apache.ignite.ml.dataset.PartitionDataBuilder;
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+
+/**
+ * Util class that provides common methods to perform computations on top of the Ignite Compute Grid.
+ */
+public class ComputeUtils {
+    /** Template of the key used to store partition {@code data} in local storage. */
+    private static final String DATA_STORAGE_KEY_TEMPLATE = "part_data_storage_%s";
+
+    /**
+     * Calls the specified {@code fun} function on all partitions so that is't guaranteed that partitions with the same
+     * index of all specified caches will be placed on the same node and will not be moved before computation is
+     * finished. If partitions are placed on different nodes then call will be retried, but not more than {@code
+     * retries} times with {@code interval} interval specified in milliseconds.
+     *
+     * @param ignite Ignite instance.
+     * @param cacheNames Collection of cache names.
+     * @param fun Function to be applied on all partitions.
+     * @param retries Number of retries for the case when one of partitions not found on the node.
+     * @param interval Interval of retries for the case when one of partitions not found on the node.
+     * @param <R> Type of a result.
+     * @return Collection of results.
+     */
+    public static <R> Collection<R> affinityCallWithRetries(Ignite ignite, Collection<String> cacheNames,
+        IgniteFunction<Integer, R> fun, int retries, int interval) {
+        assert cacheNames.size() > 0;
+        assert interval >= 0;
+
+        String primaryCache = cacheNames.iterator().next();
+
+        Affinity<?> affinity = ignite.affinity(primaryCache);
+        int partitions = affinity.partitions();
+
+        BitSet completionFlags = new BitSet(partitions);
+        Collection<R> results = new ArrayList<>();
+
+        for (int t = 0; t <= retries; t++) {
+            ClusterGroup clusterGrp = ignite.cluster().forDataNodes(primaryCache);
+
+            // Sends jobs.
+            Map<Integer, IgniteFuture<R>> futures = new HashMap<>();
+            for (int part = 0; part < partitions; part++)
+                if (!completionFlags.get(part)) {
+                    final int currPart = part;
+
+                    futures.put(
+                        currPart,
+                        ignite.compute(clusterGrp).affinityCallAsync(cacheNames, currPart, () -> fun.apply(currPart))
+                    );
+                }
+
+            // Collects results.
+            for (int part : futures.keySet())
+                try {
+                    R res = futures.get(part).get();
+                    results.add(res);
+                    completionFlags.set(part);
+                }
+                catch (IgniteException ignore) {
+                }
+
+            if (completionFlags.cardinality() == partitions)
+                return results;
+
+            LockSupport.parkNanos(interval * 1_000_000);
+        }
+
+        throw new IllegalStateException();
+    }
+
+    /**
+     * Calls the specified {@code fun} function on all partitions so that is't guaranteed that partitions with the same
+     * index of all specified caches will be placed on the same node and will not be moved before computation is
+     * finished. If partitions are placed on different nodes then call will be retried, but not more than {@code
+     * retries} times.
+     *
+     * @param ignite Ignite instance.
+     * @param cacheNames Collection of cache names.
+     * @param fun Function to be applied on all partitions.
+     * @param retries Number of retries for the case when one of partitions not found on the node.
+     * @param <R> Type of a result.
+     * @return Collection of results.
+     */
+    public static <R> Collection<R> affinityCallWithRetries(Ignite ignite, Collection<String> cacheNames,
+        IgniteFunction<Integer, R> fun, int retries) {
+        return affinityCallWithRetries(ignite, cacheNames, fun, retries, 0);
+    }
+
+    /**
+     * Extracts partition {@code data} from the local storage, if it's not found in local storage recovers this {@code
+     * data} from a partition {@code upstream} and {@code context}. Be aware that this method should be called from
+     * the node where partition is placed.
+     *
+     * @param ignite Ignite instance.
+     * @param upstreamCacheName Name of an {@code upstream} cache.
+     * @param datasetCacheName Name of a partition {@code context} cache.
+     * @param datasetId Dataset ID.
+     * @param part Partition index.
+     * @param partDataBuilder Partition data builder.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @param <D> Type of a partition {@code data}.
+     * @return Partition {@code data}.
+     */
+    public static <K, V, C extends Serializable, D extends AutoCloseable> D getData(Ignite ignite,
+        String upstreamCacheName, String datasetCacheName, UUID datasetId, int part,
+        PartitionDataBuilder<K, V, C, D> partDataBuilder) {
+
+        PartitionDataStorage dataStorage = (PartitionDataStorage)ignite
+            .cluster()
+            .nodeLocalMap()
+            .computeIfAbsent(String.format(DATA_STORAGE_KEY_TEMPLATE, datasetId), key -> new PartitionDataStorage());
+
+        return dataStorage.computeDataIfAbsent(part, () -> {
+            IgniteCache<Integer, C> learningCtxCache = ignite.cache(datasetCacheName);
+            C ctx = learningCtxCache.get(part);
+
+            IgniteCache<K, V> upstreamCache = ignite.cache(upstreamCacheName);
+
+            ScanQuery<K, V> qry = new ScanQuery<>();
+            qry.setLocal(true);
+            qry.setPartition(part);
+
+            long cnt = upstreamCache.localSizeLong(part);
+            try (QueryCursor<Cache.Entry<K, V>> cursor = upstreamCache.query(qry)) {
+                return partDataBuilder.build(new UpstreamCursorAdapter<>(cursor.iterator(), cnt), cnt, ctx);
+            }
+        });
+    }
+
+    /**
+     * Initializes partition {@code context} by loading it from a partition {@code upstream}.
+     *
+     * @param ignite Ignite instance.
+     * @param upstreamCacheName Name of an {@code upstream} cache.
+     * @param datasetCacheName Name of a partition {@code context} cache.
+     * @param ctxBuilder Partition {@code context} builder.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     */
+    public static <K, V, C extends Serializable> void initContext(Ignite ignite, String upstreamCacheName,
+        String datasetCacheName, PartitionContextBuilder<K, V, C> ctxBuilder, int retries, int interval) {
+        affinityCallWithRetries(ignite, Arrays.asList(datasetCacheName, upstreamCacheName), part -> {
+            Ignite locIgnite = Ignition.localIgnite();
+
+            IgniteCache<K, V> locUpstreamCache = locIgnite.cache(upstreamCacheName);
+
+            ScanQuery<K, V> qry = new ScanQuery<>();
+            qry.setLocal(true);
+            qry.setPartition(part);
+
+            long cnt = locUpstreamCache.localSizeLong(part);
+            C ctx;
+            try (QueryCursor<Cache.Entry<K, V>> cursor = locUpstreamCache.query(qry)) {
+                ctx = ctxBuilder.build(new UpstreamCursorAdapter<>(cursor.iterator(), cnt), cnt);
+            }
+
+            IgniteCache<Integer, C> datasetCache = locIgnite.cache(datasetCacheName);
+
+            datasetCache.put(part, ctx);
+
+            return part;
+        }, retries, interval);
+    }
+
+    /**
+     * Initializes partition {@code context} by loading it from a partition {@code upstream}.
+     *
+     * @param ignite Ignite instance.
+     * @param upstreamCacheName Name of an {@code upstream} cache.
+     * @param datasetCacheName Name of a partition {@code context} cache.
+     * @param ctxBuilder Partition {@code context} builder.
+     * @param retries Number of retries for the case when one of partitions not found on the node.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     */
+    public static <K, V, C extends Serializable> void initContext(Ignite ignite, String upstreamCacheName,
+        String datasetCacheName, PartitionContextBuilder<K, V, C> ctxBuilder, int retries) {
+        initContext(ignite, upstreamCacheName, datasetCacheName, ctxBuilder, retries, 0);
+    }
+
+    /**
+     * Extracts partition {@code context} from the Ignite Cache.
+     *
+     * @param ignite Ignite instance.
+     * @param datasetCacheName Dataset cache names.
+     * @param part Partition index.
+     * @param <C> Type of a partition {@code context}.
+     * @return Partition {@code context}.
+     */
+    public static <C extends Serializable> C getContext(Ignite ignite, String datasetCacheName, int part) {
+        IgniteCache<Integer, C> datasetCache = ignite.cache(datasetCacheName);
+        return datasetCache.get(part);
+    }
+
+    /**
+     * Saves the specified partition {@code context} into the Ignite Cache.
+     *
+     * @param ignite Ignite instance.
+     * @param datasetCacheName Dataset cache name.
+     * @param part Partition index.
+     * @param <C> Type of a partition {@code context}.
+     */
+    public static <C extends Serializable> void saveContext(Ignite ignite, String datasetCacheName, int part, C ctx) {
+        IgniteCache<Integer, C> datasetCache = ignite.cache(datasetCacheName);
+        datasetCache.put(part, ctx);
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/DatasetAffinityFunctionWrapper.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/DatasetAffinityFunctionWrapper.java
new file mode 100644 (file)
index 0000000..a8f6826
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.cache.util;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ * Affinity function wrapper that uses key as a partition index and delegates all other functions to specified
+ * delegate.
+ */
+public class DatasetAffinityFunctionWrapper implements AffinityFunction {
+    /** */
+    private static final long serialVersionUID = -8233787063079973753L;
+
+    /** Delegate that actually performs all methods except {@code partition()}. */
+    private final AffinityFunction delegate;
+
+    /**
+     * Constructs a new instance of affinity function wrapper.
+     *
+     * @param delegate Affinity function which actually performs all methods except {@code partition()}.
+     */
+    public DatasetAffinityFunctionWrapper(AffinityFunction delegate) {
+        this.delegate = delegate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reset() {
+        delegate.reset();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partitions() {
+        return delegate.partitions();
+    }
+
+    /**
+     * Returns key as a partition index.
+     *
+     * @param key Partition index.
+     * @return Partition index.
+     */
+    @Override public int partition(Object key) {
+        return (Integer) key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+        return delegate.assignPartitions(affCtx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeNode(UUID nodeId) {
+        delegate.removeNode(nodeId);
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/PartitionDataStorage.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/PartitionDataStorage.java
new file mode 100644 (file)
index 0000000..d5c47ee
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.cache.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Local storage used to keep partition {@code data}.
+ */
+class PartitionDataStorage {
+    /** Storage of a partition {@code data}. */
+    private final ConcurrentMap<Integer, Object> storage = new ConcurrentHashMap<>();
+
+    /** Storage of locks correspondent to partition {@code data} objects. */
+    private final ConcurrentMap<Integer, Lock> locks = new ConcurrentHashMap<>();
+
+    /**
+     * Retrieves partition {@code data} correspondent to specified partition index if it exists in local storage or
+     * loads it using the specified {@code supplier}. Unlike {@link ConcurrentMap#computeIfAbsent(Object, Function)},
+     * this method guarantees that function will be called only once.
+     *
+     * @param <D> Type of data.
+     * @param part Partition index.
+     * @param supplier Partition {@code data} supplier.
+     * @return Partition {@code data}.
+     */
+    @SuppressWarnings("unchecked")
+    <D> D computeDataIfAbsent(int part, Supplier<D> supplier) {
+        Object data = storage.get(part);
+
+        if (data == null) {
+            Lock lock = locks.computeIfAbsent(part, p -> new ReentrantLock());
+
+            lock.lock();
+            try {
+                data = storage.computeIfAbsent(part, p -> supplier.get());
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        return (D)data;
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/UpstreamCursorAdapter.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/UpstreamCursorAdapter.java
new file mode 100644 (file)
index 0000000..4482af7
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.cache.util;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import javax.cache.Cache;
+import org.apache.ignite.ml.dataset.UpstreamEntry;
+
+/**
+ * Cursor adapter used to transform {@code Cache.Entry} received from Ignite Cache query cursor into DLC-specific
+ * {@link UpstreamEntry}.
+ *
+ * @param <K> Type of an upstream value key.
+ * @param <V> Type of an upstream value.
+ */
+public class UpstreamCursorAdapter<K, V> implements Iterator<UpstreamEntry<K, V>> {
+    /** Cache entry iterator. */
+    private final Iterator<Cache.Entry<K, V>> delegate;
+
+    /** Size. */
+    private long cnt;
+
+    /**
+     * Constructs a new instance of iterator.
+     *
+     * @param delegate Cache entry iterator.
+     */
+    UpstreamCursorAdapter(Iterator<Cache.Entry<K, V>> delegate, long cnt) {
+        this.delegate = delegate;
+        this.cnt = cnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() {
+        return delegate.hasNext() && cnt > 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UpstreamEntry<K, V> next() {
+        if (cnt == 0)
+            throw new NoSuchElementException();
+
+        cnt--;
+
+        Cache.Entry<K, V> next = delegate.next();
+
+        if (next == null)
+            return null;
+
+        return new UpstreamEntry<>(next.getKey(), next.getValue());
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/package-info.java
new file mode 100644 (file)
index 0000000..89e248f
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains util classes used in cache based implementation of dataset.
+ */
+package org.apache.ignite.ml.dataset.impl.cache.util;
\ No newline at end of file
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDataset.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDataset.java
new file mode 100644 (file)
index 0000000..c08b7de
--- /dev/null
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.local;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.math.functions.IgniteBinaryOperator;
+import org.apache.ignite.ml.math.functions.IgniteTriFunction;
+
+/**
+ * An implementation of dataset based on local data structures such as {@code Map} and {@code List} and doesn't requires
+ * Ignite environment. Introduces for testing purposes mostly, but can be used for simple local computations as well.
+ *
+ * @param <C> Type of a partition {@code context}.
+ * @param <D> Type of a partition {@code data}.
+ */
+public class LocalDataset<C extends Serializable, D extends AutoCloseable> implements Dataset<C, D> {
+    /** Partition {@code context} storage. */
+    private final List<C> ctx;
+
+    /** Partition {@code data} storage. */
+    private final List<D> data;
+
+    /**
+     * Constructs a new instance of dataset based on local data structures such as {@code Map} and {@code List} and
+     * doesn't requires Ignite environment.
+     *
+     * @param ctx Partition {@code context} storage.
+     * @param data Partition {@code data} storage.
+     */
+    LocalDataset(List<C> ctx, List<D> data) {
+        this.ctx = ctx;
+        this.data = data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R computeWithCtx(IgniteTriFunction<C, D, Integer, R> map, IgniteBinaryOperator<R> reduce,
+        R identity) {
+        R res = identity;
+
+        for (int part = 0; part < ctx.size(); part++)
+            res = reduce.apply(res, map.apply(ctx.get(part), data.get(part), part));
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R compute(IgniteBiFunction<D, Integer, R> map, IgniteBinaryOperator<R> reduce, R identity) {
+        R res = identity;
+
+        for (int part = 0; part < data.size(); part++)
+            res = reduce.apply(res, map.apply(data.get(part), part));
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // Do nothing, GC will clean up.
+    }
+
+    /** */
+    public List<C> getCtx() {
+        return ctx;
+    }
+
+    /** */
+    public List<D> getData() {
+        return data;
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java
new file mode 100644 (file)
index 0000000..0dc1ed6
--- /dev/null
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.local;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.ml.dataset.DatasetBuilder;
+import org.apache.ignite.ml.dataset.PartitionContextBuilder;
+import org.apache.ignite.ml.dataset.PartitionDataBuilder;
+import org.apache.ignite.ml.dataset.UpstreamEntry;
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+
+/**
+ * A dataset builder that makes {@link LocalDataset}. Encapsulate logic of building local dataset such as allocation
+ * required data structures and initialization of {@code context} part of partitions.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ */
+public class LocalDatasetBuilder<K, V> implements DatasetBuilder<K, V> {
+    /** {@code Map} with upstream data. */
+    private final Map<K, V> upstreamMap;
+
+    /** Number of partitions. */
+    private final int partitions;
+
+    /**
+     * Constructs a new instance of local dataset builder that makes {@link LocalDataset}.
+     *
+     * @param upstreamMap {@code Map} with upstream data.
+     * @param partitions Number of partitions.
+     */
+    public LocalDatasetBuilder(Map<K, V> upstreamMap, int partitions) {
+        this.upstreamMap = upstreamMap;
+        this.partitions = partitions;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <C extends Serializable, D extends AutoCloseable> LocalDataset<C, D> build(
+        PartitionContextBuilder<K, V, C> partCtxBuilder, PartitionDataBuilder<K, V, C, D> partDataBuilder) {
+        List<C> ctxList = new ArrayList<>();
+        List<D> dataList = new ArrayList<>();
+
+        int partSize = Math.max(1, upstreamMap.size() / partitions);
+
+        Iterator<K> firstKeysIter = upstreamMap.keySet().iterator();
+        Iterator<K> secondKeysIter = upstreamMap.keySet().iterator();
+
+        int ptr = 0;
+        for (int part = 0; part < partitions; part++) {
+            int cnt = part == partitions - 1 ? upstreamMap.size() - ptr : Math.min(partSize, upstreamMap.size() - ptr);
+
+            C ctx = partCtxBuilder.build(
+                new IteratorWindow<>(firstKeysIter, k -> new UpstreamEntry<>(k, upstreamMap.get(k)), cnt),
+                cnt
+            );
+
+            D data = partDataBuilder.build(
+                new IteratorWindow<>(secondKeysIter, k -> new UpstreamEntry<>(k, upstreamMap.get(k)), cnt),
+                cnt,
+                ctx
+            );
+
+            ctxList.add(ctx);
+            dataList.add(data);
+
+            ptr += cnt;
+        }
+
+        return new LocalDataset<>(ctxList, dataList);
+    }
+
+    /**
+     * Utils class that wraps iterator so that it produces only specified number of entries and allows to transform
+     * entries from one type to another.
+     *
+     * @param <K> Initial type of entries.
+     * @param <T> Target type of entries.
+     */
+    private static class IteratorWindow<K, T> implements Iterator<T> {
+        /** Delegate iterator. */
+        private final Iterator<K> delegate;
+
+        /** Transformer that transforms entries from one type to another. */
+        private final IgniteFunction<K, T> map;
+
+        /** Count of entries to produce. */
+        private final int cnt;
+
+        /** Number of already produced entries. */
+        private int ptr;
+
+        /**
+         * Constructs a new instance of iterator window wrapper.
+         *
+         * @param delegate Delegate iterator.
+         * @param map Transformer that transforms entries from one type to another.
+         * @param cnt Count of entries to produce.
+         */
+        IteratorWindow(Iterator<K> delegate, IgniteFunction<K, T> map, int cnt) {
+            this.delegate = delegate;
+            this.map = map;
+            this.cnt = cnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            return delegate.hasNext() && ptr < cnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override public T next() {
+            ++ptr;
+
+            return map.apply(delegate.next());
+        }
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/package-info.java
new file mode 100644 (file)
index 0000000..2b1b195
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Base package for local implementation of machine learning dataset.
+ */
+package org.apache.ignite.ml.dataset.impl.local;
\ No newline at end of file
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/package-info.java
new file mode 100644 (file)
index 0000000..031a56a
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Base package for implementations of machine learning dataset.
+ */
+package org.apache.ignite.ml.dataset.impl;
\ No newline at end of file
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/package-info.java
new file mode 100644 (file)
index 0000000..96a63a7
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Base package for machine learning dataset classes.
+ */
+package org.apache.ignite.ml.dataset;
\ No newline at end of file
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/DatasetWrapper.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/DatasetWrapper.java
new file mode 100644 (file)
index 0000000..578a149
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.primitive;
+
+import java.io.Serializable;
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.math.functions.IgniteBinaryOperator;
+import org.apache.ignite.ml.math.functions.IgniteTriFunction;
+
+/**
+ * A dataset wrapper that allows to introduce new functionality based on common {@code compute} methods.
+ *
+ * @param <C> Type of a partition {@code context}.
+ * @param <D> Type of a partition {@code data}.
+ *
+ * @see SimpleDataset
+ * @see SimpleLabeledDataset
+ */
+public class DatasetWrapper<C extends Serializable, D extends AutoCloseable> implements Dataset<C, D> {
+    /** Delegate that performs {@code compute} actions. */
+    protected final Dataset<C, D> delegate;
+
+    /**
+     * Constructs a new instance of dataset wrapper that delegates {@code compute} actions to the actual delegate.
+     *
+     * @param delegate Delegate that performs {@code compute} actions.
+     */
+    public DatasetWrapper(Dataset<C, D> delegate) {
+        this.delegate = delegate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R computeWithCtx(IgniteTriFunction<C, D, Integer, R> map, IgniteBinaryOperator<R> reduce,
+        R identity) {
+        return delegate.computeWithCtx(map, reduce, identity);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R compute(IgniteBiFunction<D, Integer, R> map, IgniteBinaryOperator<R> reduce, R identity) {
+        return delegate.compute(map, reduce, identity);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        delegate.close();
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/SimpleDataset.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/SimpleDataset.java
new file mode 100644 (file)
index 0000000..47c0c4b
--- /dev/null
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.primitive;
+
+import com.github.fommil.netlib.BLAS;
+import java.io.Serializable;
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.dataset.primitive.data.SimpleDatasetData;
+
+/**
+ * A simple dataset introduces additional methods based on a matrix of features.
+ *
+ * @param <C> Type of a partition {@code context}.
+ */
+public class SimpleDataset<C extends Serializable> extends DatasetWrapper<C, SimpleDatasetData> {
+    /** BLAS (Basic Linear Algebra Subprograms) instance. */
+    private static final BLAS blas = BLAS.getInstance();
+
+    /**
+     * Creates a new instance of simple dataset that introduces additional methods based on a matrix of features.
+     *
+     * @param delegate Delegate that performs {@code compute} actions.
+     */
+    public SimpleDataset(Dataset<C, SimpleDatasetData> delegate) {
+        super(delegate);
+    }
+
+    /**
+     * Calculates mean value by all columns.
+     *
+     * @return Mean values.
+     */
+    public double[] mean() {
+        ValueWithCount<double[]> res = delegate.compute((data, partIdx) -> {
+            double[] features = data.getFeatures();
+            int rows = data.getRows();
+            int cols = data.getCols();
+
+            double[] y = new double[cols];
+
+            for (int col = 0; col < cols; col++)
+                for (int j = col * rows; j < (col + 1) * rows; j++)
+                    y[col] += features[j];
+
+            return new ValueWithCount<>(y, rows);
+        }, (a, b) -> a == null ? b : b == null ? a : new ValueWithCount<>(sum(a.val, b.val), a.cnt + b.cnt));
+
+        if (res != null) {
+            blas.dscal(res.val.length, 1.0 / res.cnt, res.val, 1);
+            return res.val;
+        }
+
+        return null;
+    }
+
+    /**
+     * Calculates standard deviation by all columns.
+     *
+     * @return Standard deviations.
+     */
+    public double[] std() {
+        double[] mean = mean();
+        ValueWithCount<double[]> res = delegate.compute(data -> {
+            double[] features = data.getFeatures();
+            int rows = data.getRows();
+            int cols = data.getCols();
+
+            double[] y = new double[cols];
+
+            for (int col = 0; col < cols; col++)
+                for (int j = col * rows; j < (col + 1) * rows; j++)
+                    y[col] += Math.pow(features[j] - mean[col], 2);
+
+            return new ValueWithCount<>(y, rows);
+        }, (a, b) -> a == null ? b : b == null ? a : new ValueWithCount<>(sum(a.val, b.val), a.cnt + b.cnt));
+
+        if (res != null) {
+            blas.dscal(res.val.length, 1.0 / res.cnt, res.val, 1);
+            for (int i = 0; i < res.val.length; i++)
+                res.val[i] = Math.sqrt(res.val[i]);
+            return res.val;
+        }
+
+        return null;
+    }
+
+    /**
+     * Calculates covariance matrix by all columns.
+     *
+     * @return Covariance matrix.
+     */
+    public double[][] cov() {
+        double[] mean = mean();
+        ValueWithCount<double[][]> res = delegate.compute(data -> {
+            double[] features = data.getFeatures();
+            int rows = data.getRows();
+            int cols = data.getCols();
+
+            double[][] y = new double[cols][cols];
+
+            for (int firstCol = 0; firstCol < cols; firstCol++)
+                for (int secondCol = 0; secondCol < cols; secondCol++) {
+
+                    for (int k = 0; k < rows; k++) {
+                        double firstVal = features[rows * firstCol + k];
+                        double secondVal = features[rows * secondCol + k];
+                        y[firstCol][secondCol] += ((firstVal - mean[firstCol]) * (secondVal - mean[secondCol]));
+                    }
+                }
+
+            return new ValueWithCount<>(y, rows);
+        }, (a, b) -> a == null ? b : b == null ? a : new ValueWithCount<>(sum(a.val, b.val), a.cnt + b.cnt));
+
+        return res != null ? scale(res.val, 1.0 / res.cnt) : null;
+    }
+
+    /**
+     * Calculates correlation matrix by all columns.
+     *
+     * @return Correlation matrix.
+     */
+    public double[][] corr() {
+        double[][] cov = cov();
+        double[] std = std();
+
+        for (int i = 0; i < cov.length; i++)
+            for (int j = 0; j < cov[0].length; j++)
+                cov[i][j] /= (std[i]*std[j]);
+
+        return cov;
+    }
+
+    /**
+     * Returns the sum of the two specified vectors.  Be aware that it is in-place operation.
+     *
+     * @param a First vector.
+     * @param b Second vector.
+     * @return Sum of the two specified vectors.
+     */
+    private static double[] sum(double[] a, double[] b) {
+        for (int i = 0; i < a.length; i++)
+            a[i] += b[i];
+
+        return a;
+    }
+
+    /**
+     * Returns the sum of the two specified matrices. Be aware that it is in-place operation.
+     *
+     * @param a First matrix.
+     * @param b Second matrix.
+     * @return Sum of the two specified matrices.
+     */
+    private static double[][] sum(double[][] a, double[][] b) {
+        for (int i = 0; i < a.length; i++)
+            for (int j = 0; j < a[i].length; j++)
+                a[i][j] += b[i][j];
+
+        return a;
+    }
+
+    /**
+     * Multiplies all elements of the specified matrix on specified multiplier {@code alpha}. Be aware that it is
+     * in-place operation.
+     *
+     * @param a Matrix to be scaled.
+     * @param alpha Multiplier.
+     * @return Scaled matrix.
+     */
+    private static double[][] scale(double[][] a, double alpha) {
+        for (int i = 0; i < a.length; i++)
+            for (int j = 0; j < a[i].length; j++)
+                a[i][j] *= alpha;
+
+        return a;
+    }
+
+    /**
+     * Util class that keeps values and count of rows this value has been calculated on.
+     *
+     * @param <V> Type of a value.
+     */
+    private static class ValueWithCount<V> {
+        /** Value. */
+        private final V val;
+
+        /** Count of rows the value has been calculated on. */
+        private final int cnt;
+
+        /**
+         * Constructs a new instance of value with count.
+         *
+         * @param val Value.
+         * @param cnt Count of rows the value has been calculated on.
+         */
+        ValueWithCount(V val, int cnt) {
+            this.val = val;
+            this.cnt = cnt;
+        }
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/SimpleLabeledDataset.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/SimpleLabeledDataset.java
new file mode 100644 (file)
index 0000000..1e91eec
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.primitive;
+
+import java.io.Serializable;
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.dataset.primitive.data.SimpleLabeledDatasetData;
+
+/**
+ * A simple labeled dataset introduces additional methods based on a matrix of features and labels vector.
+ *
+ * @param <C> Type of a partition {@code context}.
+ */
+public class SimpleLabeledDataset<C extends Serializable> extends DatasetWrapper<C, SimpleLabeledDatasetData> {
+    /**
+     * Creates a new instance of simple labeled dataset that introduces additional methods based on a matrix of features
+     * and labels vector.
+     *
+     * @param delegate Delegate that performs {@code compute} actions.
+     */
+    public SimpleLabeledDataset(Dataset<C, SimpleLabeledDatasetData> delegate) {
+        super(delegate);
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/context/EmptyContextBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/context/EmptyContextBuilder.java
new file mode 100644 (file)
index 0000000..03b69b5
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.primitive.builder.context;
+
+import java.util.Iterator;
+import org.apache.ignite.ml.dataset.PartitionContextBuilder;
+import org.apache.ignite.ml.dataset.UpstreamEntry;
+import org.apache.ignite.ml.dataset.primitive.context.EmptyContext;
+
+/**
+ * A partition {@code context} builder that makes {@link EmptyContext}.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ */
+public class EmptyContextBuilder<K, V> implements PartitionContextBuilder<K, V, EmptyContext> {
+    /** */
+    private static final long serialVersionUID = 6620781747993467186L;
+
+    /** {@inheritDoc} */
+    @Override public EmptyContext build(Iterator<UpstreamEntry<K, V>> upstreamData, long upstreamDataSize) {
+        return new EmptyContext();
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/context/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/context/package-info.java
new file mode 100644 (file)
index 0000000..90d51df
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains partition {@code context} builders.
+ */
+package org.apache.ignite.ml.dataset.primitive.builder.context;
\ No newline at end of file
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/SimpleDatasetDataBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/SimpleDatasetDataBuilder.java
new file mode 100644 (file)
index 0000000..6f29e2f
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.primitive.builder.data;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import org.apache.ignite.ml.dataset.PartitionDataBuilder;
+import org.apache.ignite.ml.dataset.UpstreamEntry;
+import org.apache.ignite.ml.dataset.primitive.data.SimpleDatasetData;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+
+/**
+ * A partition {@code data} builder that makes {@link SimpleDatasetData}.
+ *
+ * @param <K> Type of a key in <tt>upstream</tt> data.
+ * @param <V> Type of a value in <tt>upstream</tt> data.
+ * @param <C> Type of a partition <tt>context</tt>.
+ */
+public class SimpleDatasetDataBuilder<K, V, C extends Serializable>
+    implements PartitionDataBuilder<K, V, C, SimpleDatasetData> {
+    /** */
+    private static final long serialVersionUID = 756800193212149975L;
+
+    /** Function that extracts features from an {@code upstream} data. */
+    private final IgniteBiFunction<K, V, double[]> featureExtractor;
+
+    /** Number of columns (features). */
+    private final int cols;
+
+    /**
+     * Construct a new instance of partition {@code data} builder that makes {@link SimpleDatasetData}.
+     *
+     * @param featureExtractor Function that extracts features from an {@code upstream} data.
+     * @param cols Number of columns (features).
+     */
+    public SimpleDatasetDataBuilder(IgniteBiFunction<K, V, double[]> featureExtractor, int cols) {
+        this.featureExtractor = featureExtractor;
+        this.cols = cols;
+    }
+
+    /** {@inheritDoc} */
+    @Override public SimpleDatasetData build(Iterator<UpstreamEntry<K, V>> upstreamData, long upstreamDataSize, C ctx) {
+        // Prepares the matrix of features in flat column-major format.
+        double[] features = new double[Math.toIntExact(upstreamDataSize * cols)];
+
+        int ptr = 0;
+        while (upstreamData.hasNext()) {
+            UpstreamEntry<K, V> entry = upstreamData.next();
+            double[] row = featureExtractor.apply(entry.getKey(), entry.getValue());
+
+            assert row.length == cols : "Feature extractor must return exactly " + cols + " features";
+
+            for (int i = 0; i < cols; i++)
+                features[Math.toIntExact(i * upstreamDataSize + ptr)] = row[i];
+
+            ptr++;
+        }
+
+        return new SimpleDatasetData(features, Math.toIntExact(upstreamDataSize), cols);
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/SimpleLabeledDatasetDataBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/SimpleLabeledDatasetDataBuilder.java
new file mode 100644 (file)
index 0000000..12fcc4c
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.primitive.builder.data;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import org.apache.ignite.ml.dataset.PartitionDataBuilder;
+import org.apache.ignite.ml.dataset.UpstreamEntry;
+import org.apache.ignite.ml.dataset.primitive.data.SimpleLabeledDatasetData;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+
+/**
+ * A partition {@code data} builder that makes {@link SimpleLabeledDatasetData}.
+ *
+ * @param <K> Type of a key in <tt>upstream</tt> data.
+ * @param <V> Type of a value in <tt>upstream</tt> data.
+ * @param <C> type of a partition <tt>context</tt>.
+ */
+public class SimpleLabeledDatasetDataBuilder<K, V, C extends Serializable>
+    implements PartitionDataBuilder<K, V, C, SimpleLabeledDatasetData> {
+    /** */
+    private static final long serialVersionUID = 3678784980215216039L;
+
+    /** Function that extracts features from an {@code upstream} data. */
+    private final IgniteBiFunction<K, V, double[]> featureExtractor;
+
+    /** Function that extracts labels from an {@code upstream} data. */
+    private final IgniteBiFunction<K, V, Double> lbExtractor;
+
+    /** Number of columns (features). */
+    private final int cols;
+
+    /**
+     * Constructs a new instance of partition {@code data} builder that makes {@link SimpleLabeledDatasetData}.
+     *
+     * @param featureExtractor Function that extracts features from an {@code upstream} data.
+     * @param lbExtractor Function that extracts labels from an {@code upstream} data.
+     * @param cols Number of columns (features).
+     */
+    public SimpleLabeledDatasetDataBuilder(IgniteBiFunction<K, V, double[]> featureExtractor,
+        IgniteBiFunction<K, V, Double> lbExtractor, int cols) {
+        this.featureExtractor = featureExtractor;
+        this.lbExtractor = lbExtractor;
+        this.cols = cols;
+    }
+
+    /** {@inheritDoc} */
+    @Override public SimpleLabeledDatasetData build(Iterator<UpstreamEntry<K, V>> upstreamData,
+        long upstreamDataSize, C ctx) {
+        // Prepares the matrix of features in flat column-major format.
+        double[] features = new double[Math.toIntExact(upstreamDataSize * cols)];
+        double[] labels = new double[Math.toIntExact(upstreamDataSize)];
+
+        int ptr = 0;
+        while (upstreamData.hasNext()) {
+            UpstreamEntry<K, V> entry = upstreamData.next();
+            double[] row = featureExtractor.apply(entry.getKey(), entry.getValue());
+
+            assert row.length == cols : "Feature extractor must return exactly " + cols + " features";
+
+            for (int i = 0; i < cols; i++)
+                features[Math.toIntExact(i * upstreamDataSize) + ptr] = row[i];
+
+            labels[ptr] = lbExtractor.apply(entry.getKey(), entry.getValue());
+
+            ptr++;
+        }
+
+        return new SimpleLabeledDatasetData(features, Math.toIntExact(upstreamDataSize), cols, labels);
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/package-info.java
new file mode 100644 (file)
index 0000000..3a38043
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains partition {@code data} builders.
+ */
+package org.apache.ignite.ml.dataset.primitive.builder.data;
\ No newline at end of file
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/package-info.java
new file mode 100644 (file)
index 0000000..7511639
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Base package for partition {@code data} and {@code context} builders.
+ */
+package org.apache.ignite.ml.dataset.primitive.builder;
\ No newline at end of file
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/context/EmptyContext.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/context/EmptyContext.java
new file mode 100644 (file)
index 0000000..12a61d4
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.primitive.context;
+
+import java.io.Serializable;
+
+/**
+ * An empty partition {@code context}.
+ */
+public class EmptyContext implements Serializable {
+    /** */
+    private static final long serialVersionUID = 4108938672110578991L;
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/context/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/context/package-info.java
new file mode 100644 (file)
index 0000000..6d48b7d
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains implementation of partition {@code context}.
+ */
+package org.apache.ignite.ml.dataset.primitive.context;
\ No newline at end of file
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/SimpleDatasetData.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/SimpleDatasetData.java
new file mode 100644 (file)
index 0000000..7f82720
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.primitive.data;
+
+import org.apache.ignite.ml.dataset.primitive.SimpleDataset;
+
+/**
+ * A partition {@code data} of the {@link SimpleDataset} containing matrix of features in flat column-major format
+ * stored in heap.
+ */
+public class SimpleDatasetData implements AutoCloseable {
+    /** Matrix of features in a dense flat column-major format. */
+    private final double[] features;
+
+    /** Number of rows. */
+    private final int rows;
+
+    /** Number of columns. */
+    private final int cols;
+
+    /**
+     * Constructs a new instance of partition {@code data} of the {@link SimpleDataset} containing matrix of features in
+     * flat column-major format stored in heap.
+     *
+     * @param features Matrix of features in a dense flat column-major format.
+     * @param rows Number of rows.
+     * @param cols Number of columns.
+     */
+    public SimpleDatasetData(double[] features, int rows, int cols) {
+        this.features = features;
+        this.rows = rows;
+        this.cols = cols;
+    }
+
+    /** */
+    public double[] getFeatures() {
+        return features;
+    }
+
+    /** */
+    public int getRows() {
+        return rows;
+    }
+
+    /** */
+    public int getCols() {
+        return cols;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // Do nothing, GC will clean up.
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/SimpleLabeledDatasetData.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/SimpleLabeledDatasetData.java
new file mode 100644 (file)
index 0000000..38041f8
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.primitive.data;
+
+import org.apache.ignite.ml.dataset.primitive.SimpleLabeledDataset;
+
+/**
+ * A partition {@code data} of the {@link SimpleLabeledDataset} containing matrix of features in flat column-major
+ * format stored in heap and vector of labels stored in heap as well.
+ */
+public class SimpleLabeledDatasetData implements AutoCloseable {
+    /** Matrix with features in a dense flat column-major format. */
+    private final double[] features;
+
+    /** Number of rows. */
+    private final int rows;
+
+    /** Number of columns. */
+    private final int cols;
+
+    /** Vector with labels. */
+    private final double[] labels;
+
+    /**
+     * Constructs a new instance of partition {@code data} of the {@link SimpleLabeledDataset} containing matrix of
+     * features in flat column-major format stored in heap and vector of labels stored in heap as well.
+     *
+     * @param features Matrix with features in a dense flat column-major format.
+     * @param rows Number of rows.
+     * @param cols Number of columns.
+     * @param labels Vector with labels.
+     */
+    public SimpleLabeledDatasetData(double[] features, int rows, int cols, double[] labels) {
+        this.features = features;
+        this.rows = rows;
+        this.cols = cols;
+        this.labels = labels;
+    }
+
+    /** */
+    public double[] getFeatures() {
+        return features;
+    }
+
+    /** */
+    public int getRows() {
+        return rows;
+    }
+
+    /** */
+    public int getCols() {
+        return cols;
+    }
+
+    /** */
+    public double[] getLabels() {
+        return labels;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // Do nothing, GC will clean up.
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/package-info.java
new file mode 100644 (file)
index 0000000..2676ad8
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains implementation of partition {@code data}.
+ */
+package org.apache.ignite.ml.dataset.primitive.data;
\ No newline at end of file
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/package-info.java
new file mode 100644 (file)
index 0000000..e4f1c5e
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Package that contains basic primitives build on top of {@link org.apache.ignite.ml.dataset.Dataset}. Primitives are
+ * simple components that can be used in other algorithms based on the dataset infrastructure or for debugging.
+ *
+ * Primitives include partition {@code context} implementations, partition {@code data} implementations and extensions
+ * of dataset. Partition {@code context} and {@code data} implementations can be used in other algorithm in case these
+ * algorithm doesn't need to keep specific data and can work with standard primitive {@code data} or {@code context}.
+ * Extensions of dataset provides basic most often used functions that can be used for debugging or data analysis.
+ */
+package org.apache.ignite.ml.dataset.primitive;
\ No newline at end of file
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/PreprocessingTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/PreprocessingTrainer.java
new file mode 100644 (file)
index 0000000..cb321e4
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.preprocessing;
+
+import org.apache.ignite.ml.dataset.DatasetBuilder;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+
+/**
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ * @param <T> Type of a value returned by base preprocessor.
+ * @param <R> Type of a value returned by preprocessor fitted by this trainer.
+ */
+public interface PreprocessingTrainer<K, V, T, R> {
+    /**
+     * Fits preprocessor.
+     *
+     * @param datasetBuilder Dataset builder.
+     * @param basePreprocessor Base preprocessor.
+     * @param cols Number of columns.
+     * @return Preprocessor.
+     */
+    public IgniteBiFunction<K, V, R> fit(DatasetBuilder<K, V> datasetBuilder,
+        IgniteBiFunction<K, V, T> basePreprocessor, int cols);
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationPartitionData.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationPartitionData.java
new file mode 100644 (file)
index 0000000..1e7a93f
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.preprocessing.normalization;
+
+/**
+ * Partition data used in normalization preprocessor.
+ *
+ * @see NormalizationTrainer
+ * @see NormalizationPreprocessor
+ */
+public class NormalizationPartitionData implements AutoCloseable {
+    /** Minimal values. */
+    private final double[] min;
+
+    /** Maximum values. */
+    private final double[] max;
+
+    /**
+     * Constructs a new instance of normalization partition data.
+     *
+     * @param min Minimal values.
+     * @param max Maximum values.
+     */
+    public NormalizationPartitionData(double[] min, double[] max) {
+        this.min = min;
+        this.max = max;
+    }
+
+    /** */
+    public double[] getMin() {
+        return min;
+    }
+
+    /** */
+    public double[] getMax() {
+        return max;
+    }
+
+    /** */
+    @Override public void close() {
+        // Do nothing, GC will clean up.
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationPreprocessor.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationPreprocessor.java
new file mode 100644 (file)
index 0000000..7c94b8f
--- /dev/null
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.preprocessing.normalization;
+
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+
+/**
+ * Preprocessing function that makes normalization. From mathematical point of view it's the following function which
+ * is applied to every element in dataset:
+ *
+ * {@code a_i = (a_i - min_i) / (max_i - min_i) for all i},
+ *
+ * where {@code i} is a number of column, {@code max_i} is the value of the maximum element in this columns,
+ * {@code min_i} is the value of the minimal element in this column.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ */
+public class NormalizationPreprocessor<K, V> implements IgniteBiFunction<K, V, double[]> {
+    /** */
+    private static final long serialVersionUID = 6997800576392623469L;
+
+    /** Minimal values. */
+    private final double[] min;
+
+    /** Maximum values. */
+    private final double[] max;
+
+    /** Base preprocessor. */
+    private final IgniteBiFunction<K, V, double[]> basePreprocessor;
+
+    /**
+     * Constructs a new instance of normalization preprocessor.
+     *
+     * @param min Minimal values.
+     * @param max Maximum values.
+     * @param basePreprocessor Base preprocessor.
+     */
+    public NormalizationPreprocessor(double[] min, double[] max, IgniteBiFunction<K, V, double[]> basePreprocessor) {
+        this.min = min;
+        this.max = max;
+        this.basePreprocessor = basePreprocessor;
+    }
+
+    /**
+     * Applies this preprocessor.
+     *
+     * @param k Key.
+     * @param v Value.
+     * @return Preprocessed row.
+     */
+    @Override public double[] apply(K k, V v) {
+        double[] res = basePreprocessor.apply(k, v);
+
+        assert res.length == min.length;
+        assert res.length == max.length;
+
+        for (int i = 0; i < res.length; i++)
+            res[i] = (res[i] - min[i]) / (max[i] - min[i]);
+
+        return res;
+    }
+
+    /** */
+    public double[] getMin() {
+        return min;
+    }
+
+    /** */
+    public double[] getMax() {
+        return max;
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationTrainer.java
new file mode 100644 (file)
index 0000000..16623ba
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.preprocessing.normalization;
+
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.dataset.DatasetBuilder;
+import org.apache.ignite.ml.dataset.UpstreamEntry;
+import org.apache.ignite.ml.dataset.primitive.context.EmptyContext;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.preprocessing.PreprocessingTrainer;
+
+/**
+ * Trainer of the normalization preprocessor.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ */
+public class NormalizationTrainer<K, V> implements PreprocessingTrainer<K, V, double[], double[]> {
+    /** {@inheritDoc} */
+    @Override public NormalizationPreprocessor<K, V> fit(DatasetBuilder<K, V> datasetBuilder,
+        IgniteBiFunction<K, V, double[]> basePreprocessor, int cols) {
+        try (Dataset<EmptyContext, NormalizationPartitionData> dataset = datasetBuilder.build(
+            (upstream, upstreamSize) -> new EmptyContext(),
+            (upstream, upstreamSize, ctx) -> {
+                double[] min = new double[cols];
+                double[] max = new double[cols];
+
+                for (int i = 0; i < cols; i++) {
+                    min[i] = Double.MAX_VALUE;
+                    max[i] = -Double.MAX_VALUE;
+                }
+
+                while (upstream.hasNext()) {
+                    UpstreamEntry<K, V> entity = upstream.next();
+                    double[] row = basePreprocessor.apply(entity.getKey(), entity.getValue());
+                    for (int i = 0; i < cols; i++) {
+                        if (row[i] < min[i])
+                            min[i] = row[i];
+                        if (row[i] > max[i])
+                            max[i] = row[i];
+                    }
+                }
+                return new NormalizationPartitionData(min, max);
+            }
+        )) {
+            double[][] minMax = dataset.compute(
+                data -> new double[][]{ data.getMin(), data.getMax() },
+                (a, b) -> {
+                    if (a == null)
+                        return b;
+
+                    if (b == null)
+                        return a;
+
+                    double[][] res = new double[2][];
+
+                    res[0] = new double[a[0].length];
+                    for (int i = 0; i < res[0].length; i++)
+                        res[0][i] = Math.min(a[0][i], b[0][i]);
+
+                    res[1] = new double[a[1].length];
+                    for (int i = 0; i < res[1].length; i++)
+                        res[1][i] = Math.max(a[1][i], b[1][i]);
+
+                    return res;
+                }
+            );
+
+            return new NormalizationPreprocessor<>(minMax[0], minMax[1], basePreprocessor);
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/package-info.java
new file mode 100644 (file)
index 0000000..5c3146f
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains standardization preprocessor.
+ */
+package org.apache.ignite.ml.preprocessing.normalization;
\ No newline at end of file
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/package-info.java
new file mode 100644 (file)
index 0000000..ca04410
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Base package for machine learning preprocessing classes.
+ */
+package org.apache.ignite.ml.preprocessing;
\ No newline at end of file
index c42efc5..7102d6a 100644 (file)
 package org.apache.ignite.ml;
 
 import org.apache.ignite.ml.clustering.ClusteringTestSuite;
+import org.apache.ignite.ml.dataset.DatasetTestSuite;
 import org.apache.ignite.ml.knn.KNNTestSuite;
 import org.apache.ignite.ml.math.MathImplMainTestSuite;
 import org.apache.ignite.ml.nn.MLPTestSuite;
 import org.apache.ignite.ml.optimization.OptimizationTestSuite;
+import org.apache.ignite.ml.preprocessing.PreprocessingTestSuite;
 import org.apache.ignite.ml.regressions.RegressionsTestSuite;
 import org.apache.ignite.ml.svm.SVMTestSuite;
 import org.apache.ignite.ml.trainers.group.TrainersGroupTestSuite;
@@ -43,7 +45,9 @@ import org.junit.runners.Suite;
     LocalModelsTest.class,
     MLPTestSuite.class,
     TrainersGroupTestSuite.class,
-    OptimizationTestSuite.class
+    OptimizationTestSuite.class,
+    DatasetTestSuite.class,
+    PreprocessingTestSuite.class
 })
 public class IgniteMLTestSuite {
     // No-op.
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/DatasetTestSuite.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/DatasetTestSuite.java
new file mode 100644 (file)
index 0000000..3be79a4
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset;
+
+import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilderTest;
+import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetTest;
+import org.apache.ignite.ml.dataset.impl.cache.util.ComputeUtilsTest;
+import org.apache.ignite.ml.dataset.impl.cache.util.DatasetAffinityFunctionWrapperTest;
+import org.apache.ignite.ml.dataset.impl.cache.util.PartitionDataStorageTest;
+import org.apache.ignite.ml.dataset.impl.local.LocalDatasetBuilderTest;
+import org.apache.ignite.ml.dataset.primitive.DatasetWrapperTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Test suite for all tests located in org.apache.ignite.ml.dataset.* package.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    DatasetWrapperTest.class,
+    ComputeUtilsTest.class,
+    DatasetAffinityFunctionWrapperTest.class,
+    PartitionDataStorageTest.class,
+    CacheBasedDatasetBuilderTest.class,
+    CacheBasedDatasetTest.class,
+    LocalDatasetBuilderTest.class
+})
+public class DatasetTestSuite {
+    // No-op.
+}
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilderTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilderTest.java
new file mode 100644 (file)
index 0000000..c35cdc3
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.cache;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests for {@link CacheBasedDatasetBuilder}.
+ */
+public class CacheBasedDatasetBuilderTest extends GridCommonAbstractTest {
+    /** Number of nodes in grid. */
+    private static final int NODE_COUNT = 10;
+
+    /** Ignite instance. */
+    private Ignite ignite;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 1; i <= NODE_COUNT; i++)
+            startGrid(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        /* Grid instance. */
+        ignite = grid(NODE_COUNT);
+        ignite.configuration().setPeerClassLoadingEnabled(true);
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+    }
+
+    /**
+     * Tests that partitions of the dataset cache are placed on the same nodes as upstream cache.
+     */
+    public void testBuild() {
+        IgniteCache<Integer, String> upstreamCache = createTestCache(100, 10);
+        CacheBasedDatasetBuilder<Integer, String> builder = new CacheBasedDatasetBuilder<>(ignite, upstreamCache);
+
+        CacheBasedDataset<Integer, String, Long, AutoCloseable> dataset = builder.build(
+            (upstream, upstreamSize) -> upstreamSize,
+            (upstream, upstreamSize, ctx) -> null
+        );
+
+        Affinity<Integer> upstreamAffinity = ignite.affinity(upstreamCache.getName());
+        Affinity<Integer> datasetAffinity = ignite.affinity(dataset.getDatasetCache().getName());
+
+        int upstreamPartitions = upstreamAffinity.partitions();
+        int datasetPartitions = datasetAffinity.partitions();
+
+        assertEquals(upstreamPartitions, datasetPartitions);
+
+        for (int part = 0; part < upstreamPartitions; part++) {
+            Collection<ClusterNode> upstreamPartNodes = upstreamAffinity.mapPartitionToPrimaryAndBackups(part);
+            Collection<ClusterNode> datasetPartNodes = datasetAffinity.mapPartitionToPrimaryAndBackups(part);
+
+            assertEqualsCollections(upstreamPartNodes, datasetPartNodes);
+        }
+    }
+
+    /**
+     * Generate an Ignite Cache with the specified size and number of partitions for testing purposes.
+     *
+     * @param size Size of an Ignite Cache.
+     * @param parts Number of partitions.
+     * @return Ignite Cache instance.
+     */
+    private IgniteCache<Integer, String> createTestCache(int size, int parts) {
+        CacheConfiguration<Integer, String> cacheConfiguration = new CacheConfiguration<>();
+        cacheConfiguration.setName(UUID.randomUUID().toString());
+        cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, parts));
+
+        IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration);
+
+        for (int i = 0; i < size; i++)
+            cache.put(i, "DATA_" + i);
+
+        return cache;
+    }
+}
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetTest.java
new file mode 100644 (file)
index 0000000..f9ecb0b
--- /dev/null
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.cache;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteAtomicLong;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLock;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests for {@link CacheBasedDataset}.
+ */
+public class CacheBasedDatasetTest extends GridCommonAbstractTest {
+    /** Number of nodes in grid. */
+    private static final int NODE_COUNT = 4;
+
+    /** Ignite instance. */
+    private Ignite ignite;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 1; i <= NODE_COUNT; i++)
+            startGrid(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        /* Grid instance. */
+        ignite = grid(NODE_COUNT);
+        ignite.configuration().setPeerClassLoadingEnabled(true);
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+    }
+
+    /**
+     * Tests that partitions of the upstream cache and the partition {@code context} cache are reserved during
+     * computations on dataset. Reservation means that partitions won't be unloaded from the node before computation is
+     * completed.
+     */
+    public void testPartitionExchangeDuringComputeCall() {
+        int partitions = 4;
+
+        IgniteCache<Integer, String> upstreamCache = generateTestData(4, 0);
+
+        CacheBasedDatasetBuilder<Integer, String> builder = new CacheBasedDatasetBuilder<>(ignite, upstreamCache);
+
+        CacheBasedDataset<Integer, String, Long, AutoCloseable> dataset = builder.build(
+            (upstream, upstreamSize) -> upstreamSize,
+            (upstream, upstreamSize, ctx) -> null
+        );
+
+        assertTrue("Before computation all partitions should not be reserved",
+            areAllPartitionsNotReserved(upstreamCache.getName(), dataset.getDatasetCache().getName()));
+
+        UUID numOfStartedComputationsId = UUID.randomUUID();
+        IgniteAtomicLong numOfStartedComputations = ignite.atomicLong(numOfStartedComputationsId.toString(), 0, true);
+
+        UUID computationsLockId = UUID.randomUUID();
+        IgniteLock computationsLock = ignite.reentrantLock(computationsLockId.toString(), false, true, true);
+
+        // lock computations lock to stop computations in the middle
+        computationsLock.lock();
+
+        try {
+            new Thread(() -> dataset.compute((data, partIndex) -> {
+                // track number of started computations
+                ignite.atomicLong(numOfStartedComputationsId.toString(), 0, false).incrementAndGet();
+                ignite.reentrantLock(computationsLockId.toString(), false, true, false).lock();
+                ignite.reentrantLock(computationsLockId.toString(), false, true, false).unlock();
+            })).start();
+            // wait all computations to start
+
+            while (numOfStartedComputations.get() < partitions) {
+            }
+
+            assertTrue("During computation all partitions should be reserved",
+                areAllPartitionsReserved(upstreamCache.getName(), dataset.getDatasetCache().getName()));
+        }
+        finally {
+            computationsLock.unlock();
+        }
+
+        assertTrue("All partitions should be released",
+            areAllPartitionsNotReserved(upstreamCache.getName(), dataset.getDatasetCache().getName()));
+    }
+
+    /**
+     * Tests that partitions of the upstream cache and the partition {@code context} cache are reserved during
+     * computations on dataset. Reservation means that partitions won't be unloaded from the node before computation is
+     * completed.
+     */
+    public void testPartitionExchangeDuringComputeWithCtxCall() {
+        int partitions = 4;
+
+        IgniteCache<Integer, String> upstreamCache = generateTestData(4, 0);
+
+        CacheBasedDatasetBuilder<Integer, String> builder = new CacheBasedDatasetBuilder<>(ignite, upstreamCache);
+
+        CacheBasedDataset<Integer, String, Long, AutoCloseable> dataset = builder.build(
+            (upstream, upstreamSize) -> upstreamSize,
+            (upstream, upstreamSize, ctx) -> null
+        );
+
+        assertTrue("Before computation all partitions should not be reserved",
+            areAllPartitionsNotReserved(upstreamCache.getName(), dataset.getDatasetCache().getName()));
+
+        UUID numOfStartedComputationsId = UUID.randomUUID();
+        IgniteAtomicLong numOfStartedComputations = ignite.atomicLong(numOfStartedComputationsId.toString(), 0, true);
+
+        UUID computationsLockId = UUID.randomUUID();
+        IgniteLock computationsLock = ignite.reentrantLock(computationsLockId.toString(), false, true, true);
+
+        // lock computations lock to stop computations in the middle
+        computationsLock.lock();
+
+        try {
+            new Thread(() -> dataset.computeWithCtx((ctx, data, partIndex) -> {
+                // track number of started computations
+                ignite.atomicLong(numOfStartedComputationsId.toString(), 0, false).incrementAndGet();
+                ignite.reentrantLock(computationsLockId.toString(), false, true, false).lock();
+                ignite.reentrantLock(computationsLockId.toString(), false, true, false).unlock();
+            })).start();
+            // wait all computations to start
+
+            while (numOfStartedComputations.get() < partitions) {
+            }
+
+            assertTrue("During computation all partitions should be reserved",
+                areAllPartitionsReserved(upstreamCache.getName(), dataset.getDatasetCache().getName()));
+        }
+        finally {
+            computationsLock.unlock();
+        }
+
+        assertTrue("All partitions should be released",
+            areAllPartitionsNotReserved(upstreamCache.getName(), dataset.getDatasetCache().getName()));
+    }
+
+    /**
+     * Checks that all partitions of all specified caches are not reserved.
+     *
+     * @param cacheNames Cache names to be checked.
+     * @return {@code true} if all partitions are not reserved, otherwise {@code false}.
+     */
+    private boolean areAllPartitionsNotReserved(String... cacheNames) {
+        return checkAllPartitions(partition -> partition.reservations() == 0, cacheNames);
+    }
+
+    /**
+     * Checks that all partitions of all specified caches not reserved.
+     *
+     * @param cacheNames Cache names to be checked.
+     * @return {@code true} if all partitions are reserved, otherwise {@code false}.
+     */
+    private boolean areAllPartitionsReserved(String... cacheNames) {
+        return checkAllPartitions(partition -> partition.reservations() != 0, cacheNames);
+    }
+
+    /**
+     * Checks that all partitions of all specified caches satisfies the given predicate.
+     *
+     * @param pred Predicate.
+     * @param cacheNames Cache names.
+     * @return {@code true} if all partitions satisfies the given predicate.
+     */
+    private boolean checkAllPartitions(IgnitePredicate<GridDhtLocalPartition> pred, String... cacheNames) {
+        boolean flag = false;
+        long checkingStartTs = System.currentTimeMillis();
+
+        while (!flag && (System.currentTimeMillis() - checkingStartTs) < 30_000) {
+            LockSupport.parkNanos(200 * 1000 * 1000);
+            flag = true;
+
+            for (String cacheName : cacheNames) {
+                IgniteClusterPartitionsState state = IgniteClusterPartitionsState.getCurrentState(cacheName);
+
+                for (IgniteInstancePartitionsState instanceState : state.instances.values())
+                    for (GridDhtLocalPartition partition : instanceState.parts)
+                        if (partition != null)
+                            flag &= pred.apply(partition);
+            }
+        }
+
+        return flag;
+    }
+
+    /**
+     * Aggregated data about cache partitions in Ignite cluster.
+     */
+    private static class IgniteClusterPartitionsState {
+        /** */
+        private final String cacheName;
+
+        /** */
+        private final Map<UUID, IgniteInstancePartitionsState> instances;
+
+        /** */
+        static IgniteClusterPartitionsState getCurrentState(String cacheName) {
+            Map<UUID, IgniteInstancePartitionsState> instances = new HashMap<>();
+
+            for (Ignite ignite : G.allGrids()) {
+                IgniteKernal igniteKernal = (IgniteKernal)ignite;
+                IgniteCacheProxy<?, ?> cache = igniteKernal.context().cache().jcache(cacheName);
+
+                GridDhtCacheAdapter<?, ?> dht = dht(cache);
+
+                GridDhtPartitionTopology top = dht.topology();
+
+                AffinityTopologyVersion topVer = dht.context().shared().exchange().readyAffinityVersion();
+                List<GridDhtLocalPartition> parts = new ArrayList<>();
+                for (int p = 0; p < cache.context().config().getAffinity().partitions(); p++) {
+                    GridDhtLocalPartition part = top.localPartition(p, AffinityTopologyVersion.NONE, false);
+                    parts.add(part);
+                }
+                instances.put(ignite.cluster().localNode().id(), new IgniteInstancePartitionsState(topVer, parts));
+            }
+
+            return new IgniteClusterPartitionsState(cacheName, instances);
+        }
+
+        /** */
+        IgniteClusterPartitionsState(String cacheName,
+            Map<UUID, IgniteInstancePartitionsState> instances) {
+            this.cacheName = cacheName;
+            this.instances = instances;
+        }
+
+        /** */
+        @Override public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("Cache ").append(cacheName).append(" is in following state:").append("\n");
+            for (Map.Entry<UUID, IgniteInstancePartitionsState> e : instances.entrySet()) {
+                UUID instanceId = e.getKey();
+                IgniteInstancePartitionsState instanceState = e.getValue();
+                builder.append("\n\t")
+                    .append("Node ")
+                    .append(instanceId)
+                    .append(" with topology version [")
+                    .append(instanceState.topVer.topologyVersion())
+                    .append(", ")
+                    .append(instanceState.topVer.minorTopologyVersion())
+                    .append("] contains following partitions:")
+                    .append("\n\n");
+                builder.append("\t\t---------------------------------------------------------------------------------");
+                builder.append("--------------------\n");
+                builder.append("\t\t|  ID  |   STATE  |  RELOAD  |  RESERVATIONS  |  SHOULD BE RENTING  |  PRIMARY  |");
+                builder.append("  DATA STORE SIZE  |\n");
+                builder.append("\t\t---------------------------------------------------------------------------------");
+                builder.append("--------------------\n");
+                for (GridDhtLocalPartition partition : instanceState.parts)
+                    if (partition != null) {
+                        builder.append("\t\t")
+                            .append(String.format("| %3d  |", partition.id()))
+                            .append(String.format(" %7s  |", partition.state()))
+                            .append(String.format(" %7s  |", partition.reload()))
+                            .append(String.format(" %13s  |", partition.reservations()))
+                            .append(String.format(" %18s  |", partition.shouldBeRenting()))
+                            .append(String.format(" %8s  |", partition.primary(instanceState.topVer)))
+                            .append(String.format(" %16d  |", partition.dataStore().fullSize()))
+                            .append("\n");
+                        builder.append("\t\t-------------------------------------------------------------------------");
+                        builder.append("----------------------------\n");
+                    }
+            }
+            return builder.toString();
+        }
+    }
+
+    /**
+     * Aggregated data about cache partitions in Ignite instance.
+     */
+    private static class IgniteInstancePartitionsState {
+        /** */
+        private final AffinityTopologyVersion topVer;
+
+        /** */
+        private final List<GridDhtLocalPartition> parts;
+
+        /** */
+        IgniteInstancePartitionsState(AffinityTopologyVersion topVer,
+            List<GridDhtLocalPartition> parts) {
+            this.topVer = topVer;
+            this.parts = parts;
+        }
+
+        /** */
+        public AffinityTopologyVersion getTopVer() {
+            return topVer;
+        }
+
+        /** */
+        public List<GridDhtLocalPartition> getParts() {
+            return parts;
+        }
+    }
+
+    /**
+     * Generates Ignite Cache with data for tests.
+     *
+     * @return Ignite Cache with data for tests.
+     */
+    private IgniteCache<Integer, String> generateTestData(int partitions, int backups) {
+        CacheConfiguration<Integer, String> cacheConfiguration = new CacheConfiguration<>();
+
+        cacheConfiguration.setName(UUID.randomUUID().toString());
+        cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, partitions));
+        cacheConfiguration.setBackups(backups);
+
+        IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration);
+
+        for (int i = 0; i < 1000; i++)
+            cache.put(i, "TEST" + i);
+
+        return cache;
+    }
+}
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtilsTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtilsTest.java
new file mode 100644 (file)
index 0000000..4926a90
--- /dev/null
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.cache.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteAtomicLong;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.ml.dataset.UpstreamEntry;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests for {@link ComputeUtils}.
+ */
+public class ComputeUtilsTest extends GridCommonAbstractTest {
+    /** Number of nodes in grid. */
+    private static final int NODE_COUNT = 10;
+
+    /** Ignite instance. */
+    private Ignite ignite;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 1; i <= NODE_COUNT; i++)
+            startGrid(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        /* Grid instance. */
+        ignite = grid(NODE_COUNT);
+        ignite.configuration().setPeerClassLoadingEnabled(true);
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+    }
+
+    /**
+     * Tests that in case two caches maintain their partitions on different nodes, affinity call won't be completed.
+     */
+    public void testAffinityCallWithRetriesNegative() {
+        ClusterNode node1 = grid(1).cluster().localNode();
+        ClusterNode node2 = grid(2).cluster().localNode();
+
+        String firstCacheName = "CACHE_1_" + UUID.randomUUID();
+        String secondCacheName = "CACHE_2_" + UUID.randomUUID();
+
+        CacheConfiguration<Integer, Integer> cacheConfiguration1 = new CacheConfiguration<>();
+        cacheConfiguration1.setName(firstCacheName);
+        cacheConfiguration1.setAffinity(new TestAffinityFunction(node1));
+        IgniteCache<Integer, Integer> cache1 = ignite.createCache(cacheConfiguration1);
+
+        CacheConfiguration<Integer, Integer> cacheConfiguration2 = new CacheConfiguration<>();
+        cacheConfiguration2.setName(secondCacheName);
+        cacheConfiguration2.setAffinity(new TestAffinityFunction(node2));
+        IgniteCache<Integer, Integer> cache2 = ignite.createCache(cacheConfiguration2);
+
+        try {
+            try {
+                ComputeUtils.affinityCallWithRetries(
+                    ignite,
+                    Arrays.asList(firstCacheName, secondCacheName),
+                    part -> part,
+                    0
+                );
+            }
+            catch (IllegalStateException expectedException) {
+                return;
+            }
+
+            fail("Missing IllegalStateException");
+        }
+        finally {
+            cache1.destroy();
+            cache2.destroy();
+        }
+    }
+
+    /**
+     * Test that in case two caches maintain their partitions on the same node, affinity call will be completed.
+     */
+    public void testAffinityCallWithRetriesPositive() {
+        ClusterNode node = grid(1).cluster().localNode();
+
+        String firstCacheName = "CACHE_1_" + UUID.randomUUID();
+        String secondCacheName = "CACHE_2_" + UUID.randomUUID();
+
+        CacheConfiguration<Integer, Integer> cacheConfiguration1 = new CacheConfiguration<>();
+        cacheConfiguration1.setName(firstCacheName);
+        cacheConfiguration1.setAffinity(new TestAffinityFunction(node));
+        IgniteCache<Integer, Integer> cache1 = ignite.createCache(cacheConfiguration1);
+
+        CacheConfiguration<Integer, Integer> cacheConfiguration2 = new CacheConfiguration<>();
+        cacheConfiguration2.setName(secondCacheName);
+        cacheConfiguration2.setAffinity(new TestAffinityFunction(node));
+        IgniteCache<Integer, Integer> cache2 = ignite.createCache(cacheConfiguration2);
+
+        try (IgniteAtomicLong cnt = ignite.atomicLong("COUNTER_" + UUID.randomUUID(), 0, true)) {
+
+            ComputeUtils.affinityCallWithRetries(ignite, Arrays.asList(firstCacheName, secondCacheName), part -> {
+                Ignite locIgnite = Ignition.localIgnite();
+
+                assertEquals(node, locIgnite.cluster().localNode());
+
+                cnt.incrementAndGet();
+
+                return part;
+            }, 0);
+
+            assertEquals(1, cnt.get());
+        }
+        finally {
+            cache1.destroy();
+            cache2.destroy();
+        }
+    }
+
+    /**
+     * Tests {@code getData()} method.
+     */
+    public void testGetData() {
+        ClusterNode node = grid(1).cluster().localNode();
+
+        String upstreamCacheName = "CACHE_1_" + UUID.randomUUID();
+        String datasetCacheName = "CACHE_2_" + UUID.randomUUID();
+
+        CacheConfiguration<Integer, Integer> upstreamCacheConfiguration = new CacheConfiguration<>();
+        upstreamCacheConfiguration.setName(upstreamCacheName);
+        upstreamCacheConfiguration.setAffinity(new TestAffinityFunction(node));
+        IgniteCache<Integer, Integer> upstreamCache = ignite.createCache(upstreamCacheConfiguration);
+
+        CacheConfiguration<Integer, Integer> datasetCacheConfiguration = new CacheConfiguration<>();
+        datasetCacheConfiguration.setName(datasetCacheName);
+        datasetCacheConfiguration.setAffinity(new TestAffinityFunction(node));
+        IgniteCache<Integer, Integer> datasetCache = ignite.createCache(datasetCacheConfiguration);
+
+        upstreamCache.put(42, 42);
+        datasetCache.put(0, 0);
+
+        UUID datasetId = UUID.randomUUID();
+
+        IgniteAtomicLong cnt = ignite.atomicLong("CNT_" + datasetId, 0, true);
+
+        for (int i = 0; i < 10; i++) {
+            Collection<TestPartitionData> data = ComputeUtils.affinityCallWithRetries(
+                ignite,
+                Arrays.asList(datasetCacheName, upstreamCacheName),
+                part -> ComputeUtils.<Integer, Integer, Serializable, TestPartitionData>getData(
+                    ignite,
+                    upstreamCacheName,
+                    datasetCacheName,
+                    datasetId,
+                    0,
+                    (upstream, upstreamSize, ctx) -> {
+                        cnt.incrementAndGet();
+
+                        assertEquals(1, upstreamSize);
+
+                        UpstreamEntry<Integer, Integer> e = upstream.next();
+                        return new TestPartitionData(e.getKey() + e.getValue());
+                    }
+                ),
+                0
+            );
+
+            assertEquals(1, data.size());
+
+            TestPartitionData dataElement = data.iterator().next();
+            assertEquals(84, dataElement.val.intValue());
+        }
+
+        assertEquals(1, cnt.get());
+    }
+
+    /**
+     * Tests {@code initContext()} method.
+     */
+    public void testInitContext() {
+        ClusterNode node = grid(1).cluster().localNode();
+
+        String upstreamCacheName = "CACHE_1_" + UUID.randomUUID();
+        String datasetCacheName = "CACHE_2_" + UUID.randomUUID();
+
+        CacheConfiguration<Integer, Integer> upstreamCacheConfiguration = new CacheConfiguration<>();
+        upstreamCacheConfiguration.setName(upstreamCacheName);
+        upstreamCacheConfiguration.setAffinity(new TestAffinityFunction(node));
+        IgniteCache<Integer, Integer> upstreamCache = ignite.createCache(upstreamCacheConfiguration);
+
+        CacheConfiguration<Integer, Integer> datasetCacheConfiguration = new CacheConfiguration<>();
+        datasetCacheConfiguration.setName(datasetCacheName);
+        datasetCacheConfiguration.setAffinity(new TestAffinityFunction(node));
+        IgniteCache<Integer, Integer> datasetCache = ignite.createCache(datasetCacheConfiguration);
+
+        upstreamCache.put(42, 42);
+
+        ComputeUtils.<Integer, Integer, Integer>initContext(
+            ignite,
+            upstreamCacheName,
+            datasetCacheName,
+            (upstream, upstreamSize) -> {
+
+                assertEquals(1, upstreamSize);
+
+                UpstreamEntry<Integer, Integer> e = upstream.next();
+                return e.getKey() + e.getValue();
+            },
+            0
+        );
+
+        assertEquals(1, datasetCache.size());
+        assertEquals(84, datasetCache.get(0).intValue());
+    }
+
+    /**
+     * Test partition data.
+     */
+    private static class TestPartitionData implements AutoCloseable {
+        /** Value. */
+        private final Integer val;
+
+        /**
+         * Constructs a new instance of test partition data.
+         *
+         * @param val Value.
+         */
+        TestPartitionData(Integer val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws Exception {
+            // Do nothing, GC will clean up.
+        }
+    }
+
+    /**
+     * Affinity function used in tests in this class. Defines one partition and assign it on the specified cluster node.
+     */
+    private static class TestAffinityFunction implements AffinityFunction {
+        /** */
+        private static final long serialVersionUID = -1353725303983563094L;
+
+        /** Cluster node partition will be assigned on. */
+        private final ClusterNode node;
+
+        /**
+         * Constructs a new instance of test affinity function.
+         *
+         * @param node Cluster node partition will be assigned on.
+         */
+        TestAffinityFunction(ClusterNode node) {
+            this.node = node;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            // Do nothing.
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partitions() {
+            return 1;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partition(Object key) {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+            return Collections.singletonList(Collections.singletonList(node));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeNode(UUID nodeId) {
+            // Do nothing.
+        }
+    }
+}
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/DatasetAffinityFunctionWrapperTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/DatasetAffinityFunctionWrapperTest.java
new file mode 100644 (file)
index 0000000..2628aa6
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.cache.util;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterNode;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link DatasetAffinityFunctionWrapper}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class DatasetAffinityFunctionWrapperTest {
+    /** Mocked affinity function. */
+    @Mock
+    private AffinityFunction affinityFunction;
+
+    /** Wrapper. */
+    private DatasetAffinityFunctionWrapper wrapper;
+
+    /** Initialization. */
+    @Before
+    public void beforeTest() {
+        wrapper = new DatasetAffinityFunctionWrapper(affinityFunction);
+    }
+
+    /** Tests {@code reset()} method. */
+    @Test
+    public void testReset() {
+        wrapper.reset();
+
+        verify(affinityFunction, times(1)).reset();
+    }
+
+    /** Tests {@code partitions()} method. */
+    @Test
+    public void testPartitions() {
+        doReturn(42).when(affinityFunction).partitions();
+
+        int partitions = wrapper.partitions();
+
+        assertEquals(42, partitions);
+        verify(affinityFunction, times(1)).partitions();
+    }
+
+    /** Tests {@code partition} method. */
+    @Test
+    public void testPartition() {
+        doReturn(0).when(affinityFunction).partition(eq(42));
+
+        int part = wrapper.partition(42);
+
+        assertEquals(42, part);
+        verify(affinityFunction, times(0)).partition(any());
+    }
+
+    /** Tests {@code assignPartitions()} method. */
+    @Test
+    public void testAssignPartitions() {
+        List<List<ClusterNode>> nodes = Collections.singletonList(Collections.singletonList(mock(ClusterNode.class)));
+
+        doReturn(nodes).when(affinityFunction).assignPartitions(any());
+
+        List<List<ClusterNode>> resNodes = wrapper.assignPartitions(mock(AffinityFunctionContext.class));
+
+        assertEquals(nodes, resNodes);
+        verify(affinityFunction, times(1)).assignPartitions(any());
+    }
+
+    /** Tests {@code removeNode()} method. */
+    @Test
+    public void testRemoveNode() {
+        UUID nodeId = UUID.randomUUID();
+
+        wrapper.removeNode(nodeId);
+
+        verify(affinityFunction, times(1)).removeNode(eq(nodeId));
+    }
+}
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/PartitionDataStorageTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/PartitionDataStorageTest.java
new file mode 100644 (file)
index 0000000..eab2be1
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.cache.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link PartitionDataStorage}.
+ */
+public class PartitionDataStorageTest {
+    /** Data storage. */
+    private PartitionDataStorage dataStorage = new PartitionDataStorage();
+
+    /** Tests {@code computeDataIfAbsent()} method. */
+    @Test
+    public void testComputeDataIfAbsent() {
+        AtomicLong cnt = new AtomicLong();
+
+        for (int i = 0; i < 10; i++) {
+            Integer res = (Integer) dataStorage.computeDataIfAbsent(0, () -> {
+                cnt.incrementAndGet();
+
+                return 42;
+            });
+
+            assertEquals(42, res.intValue());
+        }
+
+        assertEquals(1, cnt.intValue());
+    }
+}
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilderTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilderTest.java
new file mode 100644 (file)
index 0000000..0628580
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.local;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link LocalDatasetBuilder}.
+ */
+public class LocalDatasetBuilderTest {
+    /** Tests {@code build()} method. */
+    @Test
+    public void testBuild() {
+        Map<Integer, Integer> data = new HashMap<>();
+        for (int i = 0; i < 100; i++)
+            data.put(i, i);
+
+        LocalDatasetBuilder<Integer, Integer> builder = new LocalDatasetBuilder<>(data, 10);
+
+        LocalDataset<Serializable, TestPartitionData> dataset = builder.build(
+            (upstream, upstreamSize) -> null,
+            (upstream, upstreamSize, ctx) -> {
+                int[] arr = new int[Math.toIntExact(upstreamSize)];
+
+                int ptr = 0;
+                while (upstream.hasNext())
+                    arr[ptr++] = upstream.next().getValue();
+
+                return new TestPartitionData(arr);
+            }
+        );
+
+        AtomicLong cnt = new AtomicLong();
+
+        dataset.compute((partData, partIdx) -> {
+           cnt.incrementAndGet();
+
+           int[] arr = partData.data;
+
+           assertEquals(10, arr.length);
+
+           for (int i = 0; i < 10; i++)
+               assertEquals(partIdx * 10 + i, arr[i]);
+        });
+
+        assertEquals(10, cnt.intValue());
+    }
+
+    /**
+     * Test partition {@code data}.
+     */
+    private static class TestPartitionData implements AutoCloseable {
+        /** Data. */
+        private int[] data;
+
+        /**
+         * Constructs a new test partition data instance.
+         *
+         * @param data Data.
+         */
+        TestPartitionData(int[] data) {
+            this.data = data;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws Exception {
+            // Do nothing, GC will clean up.
+        }
+    }
+}
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/primitive/DatasetWrapperTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/primitive/DatasetWrapperTest.java
new file mode 100644 (file)
index 0000000..b42b604
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.primitive;
+
+import java.io.Serializable;
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.math.functions.IgniteBinaryOperator;
+import org.apache.ignite.ml.math.functions.IgniteTriFunction;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link DatasetWrapper}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class DatasetWrapperTest {
+    /** Mocked dataset. */
+    @Mock
+    private Dataset<Serializable, AutoCloseable> dataset;
+
+    /** Dataset wrapper. */
+    private DatasetWrapper<Serializable, AutoCloseable> wrapper;
+
+    /** Initialization. */
+    @Before
+    public void beforeTest() {
+        wrapper = new DatasetWrapper<>(dataset);
+    }
+
+    /** Tests {@code computeWithCtx()} method. */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testComputeWithCtx() {
+        doReturn(42).when(dataset).computeWithCtx(any(IgniteTriFunction.class), any(), any());
+
+        Integer res = wrapper.computeWithCtx(mock(IgniteTriFunction.class), mock(IgniteBinaryOperator.class), null);
+
+        assertEquals(42, res.intValue());
+        verify(dataset, times(1)).computeWithCtx(any(IgniteTriFunction.class), any(), any());
+    }
+
+    /** Tests {@code compute()} method. */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testCompute() {
+        doReturn(42).when(dataset).compute(any(IgniteBiFunction.class), any(), any());
+
+        Integer res = wrapper.compute(mock(IgniteBiFunction.class), mock(IgniteBinaryOperator.class), null);
+
+        assertEquals(42, res.intValue());
+        verify(dataset, times(1)).compute(any(IgniteBiFunction.class), any(), any());
+    }
+
+    /** Tests {@code close()} method. */
+    @Test
+    public void testClose() throws Exception {
+        wrapper.close();
+
+        verify(dataset, times(1)).close();
+    }
+}
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/PreprocessingTestSuite.java b/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/PreprocessingTestSuite.java
new file mode 100644 (file)
index 0000000..1b25908
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.preprocessing;
+
+import org.apache.ignite.ml.preprocessing.normalization.NormalizationPreprocessorTest;
+import org.apache.ignite.ml.preprocessing.normalization.NormalizationTrainerTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Test suite for all tests located in org.apache.ignite.ml.preprocessing.* package.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    NormalizationPreprocessorTest.class,
+    NormalizationTrainerTest.class
+})
+public class PreprocessingTestSuite {
+    // No-op.
+}
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationPreprocessorTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationPreprocessorTest.java
new file mode 100644 (file)
index 0000000..c9eb765
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.preprocessing.normalization;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests for {@link NormalizationPreprocessor}.
+ */
+public class NormalizationPreprocessorTest {
+    /** Tests {@code apply()} method. */
+    @Test
+    public void testApply() {
+        double[][] data = new double[][]{
+            {2., 4., 1.},
+            {1., 8., 22.},
+            {4., 10., 100.},
+            {0., 22., 300.}
+        };
+
+        NormalizationPreprocessor<Integer, double[]> preprocessor = new NormalizationPreprocessor<>(
+            new double[] {0, 4, 1},
+            new double[] {4, 22, 300},
+            (k, v) -> v
+        );
+
+        double[][] standardData = new double[][]{
+            {2. / 4, (4. - 4.) / 18.,  0.},
+            {1. / 4, (8. - 4.) / 18.,  (22. - 1.) / 299.},
+            {1.,     (10. - 4.) / 18., (100. - 1.) / 299.},
+            {0.,     (22. - 4.) / 18., (300. - 1.) / 299.}
+        };
+
+       for (int i = 0; i < data.length; i++)
+           assertArrayEquals(standardData[i], preprocessor.apply(i, data[i]), 1e-8);
+    }
+}
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationTrainerTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationTrainerTest.java
new file mode 100644 (file)
index 0000000..1548253
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.preprocessing.normalization;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.ml.dataset.DatasetBuilder;
+import org.apache.ignite.ml.dataset.impl.local.LocalDatasetBuilder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests for {@link NormalizationTrainer}.
+ */
+@RunWith(Parameterized.class)
+public class NormalizationTrainerTest {
+    /** Parameters. */
+    @Parameterized.Parameters(name = "Data divided on {0} partitions")
+    public static Iterable<Integer[]> data() {
+        return Arrays.asList(
+            new Integer[] {1},
+            new Integer[] {2},
+            new Integer[] {3},
+            new Integer[] {5},
+            new Integer[] {7},
+            new Integer[] {100},
+            new Integer[] {1000}
+        );
+    }
+
+    /** Number of partitions. */
+    @Parameterized.Parameter
+    public int parts;
+
+    /** Tests {@code fit()} method. */
+    @Test
+    public void testFit() {
+        Map<Integer, double[]> data = new HashMap<>();
+        data.put(1, new double[] {2, 4, 1});
+        data.put(2, new double[] {1, 8, 22});
+        data.put(3, new double[] {4, 10, 100});
+        data.put(4, new double[] {0, 22, 300});
+
+        DatasetBuilder<Integer, double[]> datasetBuilder = new LocalDatasetBuilder<>(data, parts);
+
+        NormalizationTrainer<Integer, double[]> standardizationTrainer = new NormalizationTrainer<>();
+
+        NormalizationPreprocessor<Integer, double[]> preprocessor = standardizationTrainer.fit(
+            datasetBuilder,
+            (k, v) -> v,
+            3
+        );
+
+        assertArrayEquals(new double[] {0, 4, 1}, preprocessor.getMin(), 1e-8);
+        assertArrayEquals(new double[] {4, 22, 300}, preprocessor.getMax(), 1e-8);
+    }
+}