From 919936bed1cc5eeb07759edfdb0d0d137b9cb13c Mon Sep 17 00:00:00 2001
From: Waterkin <1055905911@qq.com>
Date: Wed, 17 Jan 2024 10:37:54 +0800
Subject: [PATCH 1/2] Fix a bug when building the website.
Signed-off-by: Waterkin <1055905911@qq.com>
---
.../docs/operators/feature/univariatefeatureselector.md | 3 +++
1 file changed, 3 insertions(+)
diff --git a/docs/content/docs/operators/feature/univariatefeatureselector.md b/docs/content/docs/operators/feature/univariatefeatureselector.md
index 873919e21..c12288c2e 100644
--- a/docs/content/docs/operators/feature/univariatefeatureselector.md
+++ b/docs/content/docs/operators/feature/univariatefeatureselector.md
@@ -220,3 +220,6 @@ for result in t_env.to_data_stream(output).execute_and_collect():
'\tOutput Value: ' + str(result[output_index]))
```
+{{< /tab >}}
+
+{{< /tabs >}}
\ No newline at end of file
From 10007c1a8d995765073c8d7d2c820d46fa9bd20a Mon Sep 17 00:00:00 2001
From: Waterkin <1055905911@qq.com>
Date: Wed, 17 Jan 2024 10:39:26 +0800
Subject: [PATCH 2/2] Add a Chinese version of the documentation.
Signed-off-by: Waterkin <1055905911@qq.com>
---
docs/config.toml | 11 +
docs/content.zh/_index.md | 51 +++
docs/content.zh/docs/development/_index.md | 26 ++
.../docs/development/build-and-install.md | 111 ++++++
docs/content.zh/docs/development/iteration.md | 225 +++++++++++
docs/content.zh/docs/development/overview.md | 246 ++++++++++++
docs/content.zh/docs/development/types.md | 74 ++++
docs/content.zh/docs/operators/_index.md | 25 ++
.../docs/operators/classification/_index.md | 25 ++
.../docs/operators/classification/knn.md | 216 +++++++++++
.../operators/classification/linearsvc.md | 197 ++++++++++
.../classification/logisticregression.md | 364 +++++++++++++++++
.../operators/classification/naivebayes.md | 192 +++++++++
.../docs/operators/clustering/_index.md | 25 ++
.../clustering/agglomerativeclustering.md | 181 +++++++++
.../docs/operators/clustering/kmeans.md | 329 ++++++++++++++++
.../docs/operators/evaluation/_index.md | 25 ++
.../binaryclassificationevaluator.md | 192 +++++++++
.../docs/operators/feature/_index.md | 25 ++
.../docs/operators/feature/binarizer.md | 183 +++++++++
.../docs/operators/feature/bucketizer.md | 179 +++++++++
.../docs/operators/feature/countvectorizer.md | 182 +++++++++
docs/content.zh/docs/operators/feature/dct.md | 151 ++++++++
.../operators/feature/elementwiseproduct.md | 157 ++++++++
.../docs/operators/feature/featurehasher.md | 177 +++++++++
.../docs/operators/feature/hashingtf.md | 165 ++++++++
docs/content.zh/docs/operators/feature/idf.md | 172 ++++++++
.../docs/operators/feature/imputer.md | 196 ++++++++++
.../docs/operators/feature/indextostring.md | 184 +++++++++
.../docs/operators/feature/interaction.md | 169 ++++++++
.../operators/feature/kbinsdiscretizer.md | 185 +++++++++
.../docs/operators/feature/maxabsscaler.md | 180 +++++++++
.../docs/operators/feature/minhashlsh.md | 288 ++++++++++++++
.../docs/operators/feature/minmaxscaler.md | 180 +++++++++
.../docs/operators/feature/ngram.md | 155 ++++++++
.../docs/operators/feature/normalizer.md | 154 ++++++++
.../docs/operators/feature/onehotencoder.md | 161 ++++++++
.../operators/feature/onlinestandardscaler.md | 259 +++++++++++++
.../operators/feature/polynomialexpansion.md | 160 ++++++++
.../docs/operators/feature/randomsplitter.md | 148 +++++++
.../docs/operators/feature/regextokenizer.md | 156 ++++++++
.../docs/operators/feature/robustscaler.md | 211 ++++++++++
.../docs/operators/feature/sqltransformer.md | 142 +++++++
.../docs/operators/feature/standardscaler.md | 158 ++++++++
.../operators/feature/stopwordsremover.md | 165 ++++++++
.../docs/operators/feature/stringindexer.md | 219 +++++++++++
.../docs/operators/feature/tokenizer.md | 148 +++++++
.../feature/univariatefeatureselector.md | 225 +++++++++++
.../feature/variancethresholdselector.md | 189 +++++++++
.../docs/operators/feature/vectorassembler.md | 193 +++++++++
.../docs/operators/feature/vectorindexer.md | 210 ++++++++++
.../docs/operators/feature/vectorslicer.md | 158 ++++++++
docs/content.zh/docs/operators/functions.md | 236 +++++++++++
.../docs/operators/recommendation/_index.md | 25 ++
.../docs/operators/recommendation/swing.md | 194 ++++++++++
.../docs/operators/regression/_index.md | 25 ++
.../operators/regression/linearregression.md | 188 +++++++++
.../content.zh/docs/operators/stats/_index.md | 25 ++
.../docs/operators/stats/chisqtest.md | 187 +++++++++
docs/content.zh/docs/try-flink-ml/_index.md | 25 ++
.../docs/try-flink-ml/java/_index.md | 23 ++
.../java/build-your-own-project.md | 366 ++++++++++++++++++
.../docs/try-flink-ml/java/quick-start.md | 139 +++++++
.../docs/try-flink-ml/python/_index.md | 23 ++
.../docs/try-flink-ml/python/quick-start.md | 338 ++++++++++++++++
docs/content.zh/versions.md | 29 ++
66 files changed, 10322 insertions(+)
create mode 100644 docs/content.zh/_index.md
create mode 100644 docs/content.zh/docs/development/_index.md
create mode 100644 docs/content.zh/docs/development/build-and-install.md
create mode 100644 docs/content.zh/docs/development/iteration.md
create mode 100644 docs/content.zh/docs/development/overview.md
create mode 100644 docs/content.zh/docs/development/types.md
create mode 100644 docs/content.zh/docs/operators/_index.md
create mode 100644 docs/content.zh/docs/operators/classification/_index.md
create mode 100644 docs/content.zh/docs/operators/classification/knn.md
create mode 100644 docs/content.zh/docs/operators/classification/linearsvc.md
create mode 100644 docs/content.zh/docs/operators/classification/logisticregression.md
create mode 100644 docs/content.zh/docs/operators/classification/naivebayes.md
create mode 100644 docs/content.zh/docs/operators/clustering/_index.md
create mode 100644 docs/content.zh/docs/operators/clustering/agglomerativeclustering.md
create mode 100644 docs/content.zh/docs/operators/clustering/kmeans.md
create mode 100644 docs/content.zh/docs/operators/evaluation/_index.md
create mode 100644 docs/content.zh/docs/operators/evaluation/binaryclassificationevaluator.md
create mode 100644 docs/content.zh/docs/operators/feature/_index.md
create mode 100644 docs/content.zh/docs/operators/feature/binarizer.md
create mode 100644 docs/content.zh/docs/operators/feature/bucketizer.md
create mode 100644 docs/content.zh/docs/operators/feature/countvectorizer.md
create mode 100644 docs/content.zh/docs/operators/feature/dct.md
create mode 100644 docs/content.zh/docs/operators/feature/elementwiseproduct.md
create mode 100644 docs/content.zh/docs/operators/feature/featurehasher.md
create mode 100644 docs/content.zh/docs/operators/feature/hashingtf.md
create mode 100644 docs/content.zh/docs/operators/feature/idf.md
create mode 100644 docs/content.zh/docs/operators/feature/imputer.md
create mode 100644 docs/content.zh/docs/operators/feature/indextostring.md
create mode 100644 docs/content.zh/docs/operators/feature/interaction.md
create mode 100644 docs/content.zh/docs/operators/feature/kbinsdiscretizer.md
create mode 100644 docs/content.zh/docs/operators/feature/maxabsscaler.md
create mode 100644 docs/content.zh/docs/operators/feature/minhashlsh.md
create mode 100644 docs/content.zh/docs/operators/feature/minmaxscaler.md
create mode 100644 docs/content.zh/docs/operators/feature/ngram.md
create mode 100644 docs/content.zh/docs/operators/feature/normalizer.md
create mode 100644 docs/content.zh/docs/operators/feature/onehotencoder.md
create mode 100644 docs/content.zh/docs/operators/feature/onlinestandardscaler.md
create mode 100644 docs/content.zh/docs/operators/feature/polynomialexpansion.md
create mode 100644 docs/content.zh/docs/operators/feature/randomsplitter.md
create mode 100644 docs/content.zh/docs/operators/feature/regextokenizer.md
create mode 100644 docs/content.zh/docs/operators/feature/robustscaler.md
create mode 100644 docs/content.zh/docs/operators/feature/sqltransformer.md
create mode 100644 docs/content.zh/docs/operators/feature/standardscaler.md
create mode 100644 docs/content.zh/docs/operators/feature/stopwordsremover.md
create mode 100644 docs/content.zh/docs/operators/feature/stringindexer.md
create mode 100644 docs/content.zh/docs/operators/feature/tokenizer.md
create mode 100644 docs/content.zh/docs/operators/feature/univariatefeatureselector.md
create mode 100644 docs/content.zh/docs/operators/feature/variancethresholdselector.md
create mode 100644 docs/content.zh/docs/operators/feature/vectorassembler.md
create mode 100644 docs/content.zh/docs/operators/feature/vectorindexer.md
create mode 100644 docs/content.zh/docs/operators/feature/vectorslicer.md
create mode 100644 docs/content.zh/docs/operators/functions.md
create mode 100644 docs/content.zh/docs/operators/recommendation/_index.md
create mode 100644 docs/content.zh/docs/operators/recommendation/swing.md
create mode 100644 docs/content.zh/docs/operators/regression/_index.md
create mode 100644 docs/content.zh/docs/operators/regression/linearregression.md
create mode 100644 docs/content.zh/docs/operators/stats/_index.md
create mode 100644 docs/content.zh/docs/operators/stats/chisqtest.md
create mode 100644 docs/content.zh/docs/try-flink-ml/_index.md
create mode 100644 docs/content.zh/docs/try-flink-ml/java/_index.md
create mode 100644 docs/content.zh/docs/try-flink-ml/java/build-your-own-project.md
create mode 100644 docs/content.zh/docs/try-flink-ml/java/quick-start.md
create mode 100644 docs/content.zh/docs/try-flink-ml/python/_index.md
create mode 100644 docs/content.zh/docs/try-flink-ml/python/quick-start.md
create mode 100644 docs/content.zh/versions.md
diff --git a/docs/config.toml b/docs/config.toml
index 4fc5cff5f..667639f0a 100644
--- a/docs/config.toml
+++ b/docs/config.toml
@@ -71,3 +71,14 @@ pygmentsUseClasses = true
[markup]
[markup.goldmark.renderer]
unsafe = true
+
+[languages]
+[languages.en]
+ languageName = 'English'
+ contentDir = 'content'
+ weight = 1
+
+[languages.zh]
+ languageName = '中文版'
+ contentDir = 'content.zh'
+ weight = 2
\ No newline at end of file
diff --git a/docs/content.zh/_index.md b/docs/content.zh/_index.md
new file mode 100644
index 000000000..b64b14b4d
--- /dev/null
+++ b/docs/content.zh/_index.md
@@ -0,0 +1,51 @@
+---
+title: Apache Flink Machine Learning Library
+type: docs
+bookToc: false
+---
+
+
+# Flink ML: Apache Flink Machine Learning Library
+
+Flink ML is a library which provides machine learning (ML) APIs and
+infrastructures that simplify the building of ML pipelines. Users can implement
+ML algorithms with the standard ML APIs and further use these infrastructures to
+build ML pipelines for both training and inference jobs.
+
+{{< columns >}}
+## Try Flink ML
+
+If you’re interested in playing around with Flink ML, check out our [quick
+start]({{< ref "docs/try-flink-ml/java/quick-start" >}}). It provides a simple
+example to submit and execute a Flink ML job on a Flink cluster.
+
+<--->
+
+## Get Help with Flink ML
+
+If you get stuck, check out our [community support
+resources](https://flink.apache.org/community.html). In particular, Apache
+Flink’s user mailing list is consistently ranked as one of the most active of
+any Apache project, and is a great way to get help quickly.
+
+{{< /columns >}}
+
+Flink ML is developed under the umbrella of [Apache
+Flink](https://flink.apache.org/).
diff --git a/docs/content.zh/docs/development/_index.md b/docs/content.zh/docs/development/_index.md
new file mode 100644
index 000000000..322929622
--- /dev/null
+++ b/docs/content.zh/docs/development/_index.md
@@ -0,0 +1,26 @@
+---
+title: Development
+icon:
+bold: true
+sectionBreak: true
+bookCollapseSection: true
+weight: 2
+---
+
diff --git a/docs/content.zh/docs/development/build-and-install.md b/docs/content.zh/docs/development/build-and-install.md
new file mode 100644
index 000000000..6d4ae0646
--- /dev/null
+++ b/docs/content.zh/docs/development/build-and-install.md
@@ -0,0 +1,111 @@
+---
+title: "Building And Installing Flink ML From Source"
+weight: 999
+type: docs
+aliases:
+- /development/build-and-install.html
+
+---
+
+
+
+# Building And Installing Flink ML From Source
+
+This page covers how to build and install Flink ML from sources.
+
+## Build and Install Java SDK
+
+In order to build Flink ML you need the source code. Either [download the source
+of a release](https://flink.apache.org/downloads.html) or [clone the git
+repository](https://github.com/apache/flink-ml.git).
+
+In addition, you need **Maven 3** and a **JDK** (Java Development Kit). Flink ML
+requires **at least Java 8** to build.
+
+To clone from git, enter:
+
+```bash
+git clone https://github.com/apache/flink-ml.git
+```
+
+The simplest way of building Flink ML is by running:
+
+```bash
+mvn clean install -DskipTests
+```
+
+This instructs [Maven](http://maven.apache.org/) (`mvn`) to first remove all
+existing builds (`clean`) and then create a new Flink ML binary (`install`).
+
+Optionally, you can also specify the Flink version used by Flink ML with a
+Maven profile. Currently, Flink ML supports running on Flink 1.15, 1.16, and
+1.17. For example, you can build Flink ML with Flink 1.16 by running:
+```bash
+mvn clean install -DskipTests -Pflink-1.16
+```
+
+After the build finishes, you can acquire the build result in the following path
+from the root directory of Flink ML:
+
+```
+./flink-ml-dist/target/flink-ml-*-bin/flink-ml*/
+```
+
+The `mvn clean install` command installs the binary into your local Maven
+repository, so other projects can refer to it and grab it from there. No
+additional installation step is required.
+
+## Build and Install Python SDK
+
+### Prerequisites
+
+1. Building Flink ML Java SDK
+
+ If you want to build Flink ML's Python SDK that can be used for pip
+ installation, you must first build the Java SDK, as described in the section
+ above.
+
+2. Python 3.7 or 3.8 is required
+ ```shell
+ $ python --version
+ # the version printed here must be 3.7 or 3.8
+ ```
+
+3. Install the dependencies with the following command:
+ ```shell
+ $ python -m pip install -r flink-ml-python/dev/dev-requirements.txt
+ ```
+
+### Installation
+
+Then go to the root directory of the Flink ML source code and run this command
+to build the sdist package of `apache-flink-ml`:
+
+```shell
+cd flink-ml-python; python setup.py sdist; cd ..;
+```
+
+The sdist package of `apache-flink-ml` can be found under
+`./flink-ml-python/dist/`. It can be installed as follows:
+
+```shell
+python -m pip install flink-ml-python/dist/*.tar.gz
+```
+
diff --git a/docs/content.zh/docs/development/iteration.md b/docs/content.zh/docs/development/iteration.md
new file mode 100644
index 000000000..6e656beec
--- /dev/null
+++ b/docs/content.zh/docs/development/iteration.md
@@ -0,0 +1,225 @@
+---
+title: "Iteration"
+weight: 2
+type: docs
+aliases:
+- /development/iteration.html
+---
+
+
+# Iteration
+
+Iteration is a basic building block for an ML library. In machine learning
+algorithms, iteration might be used in either offline or online training
+processes. In general, two types of iterations are required, and Flink ML
+supports both of them in order to provide the infrastructure for a variety of
+algorithms.
+
+1. **Bounded Iteration**: Usually used in the offline case. The algorithm
+   trains on a bounded dataset and updates the parameters for multiple rounds
+   until convergence.
+2. **Unbounded Iteration**: Usually used in the online case, where the
+   algorithm trains on an unbounded dataset. It accumulates a mini-batch of
+   data and then performs one update to the parameters.
+
+## Iteration Paradigm
+
+An iterative algorithm has the following behavior pattern:
+
+- The iterative algorithm has an ***iteration body*** that is repeatedly invoked
+  until some termination criterion is reached (e.g. after a user-specified number
+  of epochs has been reached). An iteration body is a subgraph of operators that
+  implements the computation logic of, e.g., an iterative machine learning
+  algorithm, whose outputs might be fed back as the inputs of this subgraph.
+- In each invocation, the iteration body updates the model parameters based on
+ the user-provided data as well as the most recent model parameters.
+- The iterative algorithm takes as inputs the user-provided data and the initial
+ model parameters.
+- The iterative algorithm could output arbitrary user-defined information, such
+ as the loss after each epoch, or the final model parameters.
+
+Therefore, the behavior of an iterative algorithm could be characterized with
+the following iteration paradigm (w.r.t. Flink concepts):
+
+- An iteration-body is a Flink subgraph with the following inputs and outputs:
+ - Inputs: **model-variables** (as a list of data streams) and
+ **user-provided-data** (as another list of data streams)
+ - Outputs: **feedback-model-variables** (as a list of data streams) and
+ **user-observed-outputs** (as a list of data streams)
+- A **termination-condition** that specifies when the iterative execution of the
+ iteration body should terminate.
+- In order to execute an iteration body, a user needs to execute the iteration
+ body with the following inputs, and gets the following outputs.
+ - Inputs: **initial-model-variables** (as a list of bounded data streams) and
+ **user-provided-data** (as a list of data streams)
+ - Outputs: the **user-observed-output** emitted by the iteration body.
+
+It is important to note that the **model-variables** expected by the iteration
+body are not the same as the **initial-model-variables** provided by the user.
+Instead, **model-variables** are computed as the union of the
+**feedback-model-variables** (emitted by the iteration body) and the
+**initial-model-variables** (provided by the caller of the iteration body).
+Flink ML provides a utility class (see `Iterations`) to run an iteration body
+with the user-provided inputs.
+
+The figure below summarizes the iteration paradigm described above.
+
+{{< mermaid >}}
+flowchart LR
+
+subgraph Iteration Body
+union1
+union2
+node11
+node12
+node21
+node22
+nodeX
+end
+
+input0 --> node11
+
+union1 -. feedback .- node12
+input1 --> union1
+union1 --> node11
+node11 --> nodeX
+nodeX --> node12
+node12 --> output1
+
+input2 --> union2
+union2 --> node21
+node21 --> nodeX
+nodeX --> node22
+node22 --> output2
+union2 -. feedback .- node22
+
+input0[non-iterate input]
+input1[iterate input]
+input2[iterate input]
+union1[union]
+union2[union]
+node11( )
+node12( )
+nodeX( )
+node21( )
+node22( )
+output1[output]
+output2[output]
+
+{{< /mermaid >}}
+
+## API
+
+The main entry of Flink ML's iteration lies in the `Iterations` class. It
+mainly provides two public methods, and users may choose either of them based
+on whether the input data is bounded or unbounded.
+
+```java
+public class Iterations {
+ public static DataStreamList iterateUnboundedStreams(
+ DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}
+ ...
+ public static DataStreamList iterateBoundedStreamsUntilTermination(
+ DataStreamList initVariableStreams,
+ ReplayableDataStreamList dataStreams,
+ IterationConfig config,
+ IterationBody body){...}
+}
+```
+
+To construct an iteration, users are required to provide:
+
+- `initVariableStreams`: the initial values of the variable data streams, which
+  are updated in each round.
+- `dataStreams`: the other data streams used inside the iteration, which are
+  not updated.
+- `iterationBody`: specifies the subgraph to update the variable streams and
+  the outputs.
+
+The `IterationBody` will be invoked with two parameters: the first parameter is
+a list of input variable streams, which are created as the union of the initial
+variable streams and the corresponding feedback variable streams (returned by
+the iteration body); the second parameter is the data streams given to this
+method.
+
+```java
+public interface IterationBody extends Serializable {
+ ...
+ IterationBodyResult process(DataStreamList variableStreams, DataStreamList dataStreams);
+ ...
+}
+```
+
+During the execution of the iteration body, each of the records involved in the
+iteration has an epoch attached, which marks the progress of the iteration. The
+epoch is computed as follows:
+
+- All records in the initial variable streams and initial data streams have
+  epoch = 0.
+- For any record emitted by an operator into a non-feedback stream, the epoch
+  of the emitted record = the epoch of the input record that triggers this
+  emission. If this record is emitted by `onEpochWatermarkIncremented()`, then
+  the epoch of this record = epochWatermark.
+- For any record emitted by an operator into a feedback variable stream, the
+  epoch of the emitted record = the epoch of the input record that triggers
+  this emission + 1.
+
+The framework delivers a notification at the end of each epoch to operators and
+UDFs that implement `IterationListener`.
+
+```java
+public interface IterationListener<T> {
+    void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<T> collector)
+            throws Exception;
+    ...
+    void onIterationTerminated(Context context, Collector<T> collector) throws Exception;
+}
+```
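+
+As a minimal, hypothetical sketch (the class name and counting logic are made
+up for illustration, and the import path assumes the flink-ml iteration
+module), a UDF can implement `IterationListener` in addition to its function
+interface in order to receive these per-epoch notifications:
+
+```java
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.iteration.IterationListener;
+import org.apache.flink.util.Collector;
+
+// Forwards every record, and additionally emits the number of records seen so
+// far once at the end of each epoch and once more when the iteration ends.
+public class EpochAwareCounter
+        implements FlatMapFunction<Long, Long>, IterationListener<Long> {
+
+    private long count = 0;
+
+    @Override
+    public void flatMap(Long value, Collector<Long> out) {
+        count++;
+        out.collect(value);
+    }
+
+    @Override
+    public void onEpochWatermarkIncremented(
+            int epochWatermark, Context context, Collector<Long> collector) {
+        // Invoked when all records with epoch <= epochWatermark have been
+        // processed by this operator.
+        collector.collect(count);
+    }
+
+    @Override
+    public void onIterationTerminated(Context context, Collector<Long> collector) {
+        // Invoked once when the whole iteration terminates.
+        collector.collect(count);
+    }
+}
+```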
+
+## Example Usage
+
+Example code utilizing iterations is shown below.
+
+```java
+DataStream<double[]> initParameters = ...
+DataStream<Tuple2<double[], Double>> dataset = ...
+
+DataStreamList resultStreams = Iterations.iterateBoundedStreamsUntilTermination(
+    DataStreamList.of(initParameters),
+    ReplayableDataStreamList.notReplay(dataset),
+    IterationConfig.newBuilder().setOperatorRoundMode(ALL_ROUND).build(),
+    (variableStreams, dataStreams) -> {
+        DataStream<double[]> modelUpdate = variableStreams.get(0);
+        DataStream<Tuple2<double[], Double>> dataset = dataStreams.get(0);
+        DataStream<double[]> newModelUpdate = ...
+        DataStream<double[]> modelOutput = ...
+        return new IterationBodyResult(
+            DataStreamList.of(newModelUpdate),
+            DataStreamList.of(modelOutput));
+    });
+
+DataStream<double[]> finalModel = resultStreams.get("final_model");
+```
+
+- `initParameters`: input data that needs to be transmitted through the
+  feedback edge.
+- `dataset`: input data that does not need to be transmitted through the
+  feedback edge.
+- `newModelUpdate`: data to be transmitted through the feedback edge.
+- `modelOutput`: the final output of the iteration body.
diff --git a/docs/content.zh/docs/development/overview.md b/docs/content.zh/docs/development/overview.md
new file mode 100644
index 000000000..5be08ebe4
--- /dev/null
+++ b/docs/content.zh/docs/development/overview.md
@@ -0,0 +1,246 @@
+---
+title: "Overview"
+weight: 1
+type: docs
+aliases:
+- /development/overview.html
+---
+
+
+# Overview
+
+This document provides a brief introduction to the basic concepts in Flink ML.
+
+## Table API
+
+Flink ML's API is based on Flink's Table API. The Table API is a
+language-integrated query API for Java, Scala, and Python that allows the
+composition of queries from relational operators such as selection, filter, and
+join in a very intuitive way.
+
+The Table API allows the usage of a wide range of data types. The [Flink Data
+Types](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/types/)
+page provides a list of supported types. In addition to these types, Flink ML
+also provides support for the `Vector` type.
+
+The Table API integrates seamlessly with Flink’s DataStream API. You can easily
+switch between all APIs and the libraries which build upon them. Please refer
+to Flink’s documentation for how to convert between `Table` and `DataStream`,
+as well as for other usage of the Flink Table API.
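+
+As a minimal sketch of such a conversion (this uses only the standard
+Table/DataStream bridge from Flink, not Flink ML itself):
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+// Convert a DataStream into a Table...
+DataStream<Long> stream = env.fromSequence(1, 10);
+Table table = tEnv.fromDataStream(stream);
+
+// ...and convert a Table back into a DataStream of Rows.
+DataStream<Row> rows = tEnv.toDataStream(table);
+```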
+
+## Stage
+
+A `Stage` is a node in a `Pipeline` or `Graph`. It is the fundamental component
+in Flink ML. This interface is only a concept and does not have any actual
+functionality. Its subclasses include the following.
+
+- `Estimator`: An `Estimator` is a `Stage` that is responsible for the training
+  process in machine learning algorithms. It implements a `fit()` method that
+  takes a list of tables and produces a `Model`.
+
+- `AlgoOperator`: An `AlgoOperator` is a `Stage` that is used to encode generic
+ multi-input multi-output computation logic. It implements a `transform()`
+ method, which applies certain computation logic on the given input tables and
+ returns a list of result tables.
+
+- `Transformer`: A `Transformer` is an `AlgoOperator` with the semantic
+  difference that it encodes the transformation logic, such that a record in the
+  output typically corresponds to one record in the input. In contrast, an
+  `AlgoOperator` is a better fit to express aggregation logic where a record in
+  the output could be computed from an arbitrary number of records in the input.
+
+- `Model`: A `Model` is a `Transformer` with extra APIs to set and get model
+  data. It is typically generated by fitting an `Estimator` on a list of tables.
+  It provides `getModelData()` and `setModelData()`, which allow users to
+  explicitly read or write model data tables to the transformer, as shown in
+  the sketch after this list. Each table could be an unbounded stream of model
+  data changes.
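+
+As a minimal, hypothetical sketch of the model-data API (assuming `model` is a
+trained `SumModel` like the one in the example below):
+
+```java
+// Read the model data out of a trained model as a list of tables...
+Table[] modelData = model.getModelData();
+
+// ...and create a new model instance that serves the same model data.
+SumModel restoredModel = new SumModel().setModelData(modelData);
+```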
+
+A typical usage of `Stage` is to first create an `Estimator` instance, trigger
+its training process by invoking its `fit()` method, and then perform
+predictions with the resulting `Model` instance. This example usage is shown in
+the code below.
+
+```java
+// Suppose SumModel is a concrete subclass of Model, SumEstimator is a concrete subclass of Estimator.
+
+Table trainData = ...;
+Table predictData = ...;
+
+SumEstimator estimator = new SumEstimator();
+SumModel model = estimator.fit(trainData);
+Table predictResult = model.transform(predictData)[0];
+```
+
+## Builders
+
+In order to organize Flink ML stages into more complex structures and achieve
+advanced functionality, such as chaining data processing and machine learning
+algorithms together, Flink ML provides APIs that help manage the relationship
+and structure of stages in Flink jobs. The entry points of these APIs are
+`Pipeline` and `Graph`.
+
+### Pipeline
+
+A `Pipeline` acts as an `Estimator`. It consists of an ordered list of stages,
+each of which could be an `Estimator`, `Model`, `Transformer` or `AlgoOperator`.
+Its `fit()` method goes through all stages of this pipeline in order and does
+the following on each stage until the last `Estimator` (inclusive).
+
+- If a stage is an `Estimator`, it invokes the stage's `fit()` method with the
+  input tables to generate a `Model`. If there is an `Estimator` after this
+  stage, it transforms the input tables using the generated `Model` to get
+  result tables, then passes the result tables to the next stage as inputs.
+- If a stage is an `AlgoOperator` and there is an `Estimator` after this stage,
+  it transforms the input tables using this stage to get result tables, then
+  passes the result tables to the next stage as inputs.
+
+After all the `Estimators` are trained to fit their input tables, a new
+`PipelineModel` will be created with the same stages in this pipeline, except
+that all the `Estimator`s in the `PipelineModel` are replaced with the models
+generated in the above process.
+
+A `PipelineModel` acts as a `Model`. It consists of an ordered list of stages,
+each of which could be a `Model`, `Transformer` or `AlgoOperator`. Its
+`transform()` method applies all stages in this `PipelineModel` on the input
+tables in order. The output of one stage is used as the input of the next stage
+(if any). The output of the last stage is returned as the result of this method.
+
+A `Pipeline` can be created by passing a list of `Stage`s to the `Pipeline`
+constructor. For example:
+
+```java
+// Suppose SumModel is a concrete subclass of Model, SumEstimator is a concrete subclass of Estimator.
+
+Model<?> modelA = new SumModel().setModelData(tEnv.fromValues(10));
+Estimator<?, ?> estimatorA = new SumEstimator();
+Model<?> modelB = new SumModel().setModelData(tEnv.fromValues(30));
+
+List<Stage<?>> stages = Arrays.asList(modelA, estimatorA, modelB);
+Estimator<?, ?> estimator = new Pipeline(stages);
+```
+
+The code above creates a `Pipeline` as follows.
+
+{{< mermaid >}}
+
+graph LR
+
+empty0[ ] --> modelA --> estimatorA --> modelB --> empty1[ ]
+
+style empty0 fill:#FFFFFF, stroke:#FFFFFF;
+style empty1 fill:#FFFFFF, stroke:#FFFFFF;
+
+{{< /mermaid >}}
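+
+Continuing this example, a sketch of how the constructed pipeline could be
+trained and used (assuming `tEnv` and a suitable input table are available):
+
+```java
+// Fitting the pipeline trains estimatorA and produces a PipelineModel in which
+// the trained model is substituted for estimatorA.
+Table inputTable = tEnv.fromValues(100);
+Model<?> pipelineModel = estimator.fit(inputTable);
+
+// The PipelineModel applies modelA, the trained model, and modelB in order.
+Table outputTable = pipelineModel.transform(inputTable)[0];
+```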
+
+### Graph
+
+A `Graph` acts as an `Estimator`. A `Graph` consists of a DAG of stages, each of
+which could be an `Estimator`, `Model`, `Transformer` or `AlgoOperator`. When
+`Graph::fit` is called, the stages are executed in a topologically-sorted order.
+If a stage is an `Estimator`, its `Estimator::fit` method will be called on the
+input tables (from the input edges) to fit a `Model`. Then the `Model` will be
+used to transform the input tables and produce output tables to the output
+edges. If a stage is an `AlgoOperator`, its `AlgoOperator::transform` method
+will be called on the input tables and produce output tables to the output
+edges. The `GraphModel` fitted from a `Graph` consists of the fitted `Models`
+and `AlgoOperators`, corresponding to the `Graph`'s stages.
+
+A `GraphModel` acts as a `Model`. A `GraphModel` consists of a DAG of stages,
+each of which could be an `Estimator`, `Model`, `Transformer` or `AlgoOperator`.
+When `GraphModel::transform` is called, the stages are executed in a
+topologically-sorted order. When a stage is executed, its
+`AlgoOperator::transform` method will be called on the input tables (from the
+input edges) and produce output tables to the output edges.
+
+A `Graph` can be constructed via the `GraphBuilder` class, which provides
+methods like `addAlgoOperator` or `addEstimator` to help add stages to a
+graph. Flink ML also introduces the `TableId` class to represent the
+input/output of a stage and to help express the relationship between stages in
+a graph, thus allowing users to construct the DAG before the concrete tables
+are available.
+
+The example code below shows how to build a `Graph`.
+
+```java
+// Suppose SumModel is a concrete subclass of Model.
+
+GraphBuilder builder = new GraphBuilder();
+// Creates nodes.
+SumModel stage1 = new SumModel().setModelData(tEnv.fromValues(1));
+SumModel stage2 = new SumModel();
+SumModel stage3 = new SumModel().setModelData(tEnv.fromValues(3));
+// Creates inputs and modelDataInputs.
+TableId input = builder.createTableId();
+TableId modelDataInput = builder.createTableId();
+// Feeds inputs and gets outputs.
+TableId output1 = builder.addAlgoOperator(stage1, input)[0];
+TableId output2 = builder.addAlgoOperator(stage2, output1)[0];
+builder.setModelDataOnModel(stage2, modelDataInput);
+TableId output3 = builder.addAlgoOperator(stage3, output2)[0];
+TableId modelDataOutput = builder.getModelDataFromModel(stage3)[0];
+
+// Builds a Model from the graph.
+TableId[] inputs = new TableId[] {input};
+TableId[] outputs = new TableId[] {output3};
+TableId[] modelDataInputs = new TableId[] {modelDataInput};
+TableId[] modelDataOutputs = new TableId[] {modelDataOutput};
+Model<?> model = builder.buildModel(inputs, outputs, modelDataInputs, modelDataOutputs);
+```
+
+The code above constructs a `Graph` as follows.
+
+{{< mermaid >}}
+
+graph LR
+
+empty0[ ] --> |input| stage1
+stage1 --> |output1| stage2
+empty1[ ] --> |modelDataInput| stage2
+stage2 --> |output2| stage3
+stage3 --> |output3| empty3[ ]
+stage3 --> |modelDataOutput| empty2[ ]
+
+style empty0 fill:#FFFFFF, stroke:#FFFFFF;
+style empty1 fill:#FFFFFF, stroke:#FFFFFF;
+style empty2 fill:#FFFFFF, stroke:#FFFFFF;
+style empty3 fill:#FFFFFF, stroke:#FFFFFF;
+
+{{< /mermaid >}}
+
+## Parameter
+
+Flink ML's `Stage` interface extends `WithParams`, which provides a uniform API
+to get and set parameters.
+
+A `Param` is the definition of a parameter, including its name, class,
+description, default value and validator.
+
+In order to set the parameters of an algorithm, users can use either of the
+following approaches, as shown in the sketch after this list.
+
+- Invoke the parameter's specific set method. For example, in order to set `K`,
+  the number of clusters, of a K-means algorithm, users can directly invoke the
+  `setK()` method on that `KMeans` instance.
+- Pass a parameter map containing new values to the stage through the
+  `ParamUtils.updateExistingParams()` method.
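+
+As a minimal sketch of the two approaches (assuming the `KMeans` stage and that
+`ParamUtils.updateExistingParams()` takes a stage and a map from `Param` to
+value):
+
+```java
+// Approach 1: invoke the typed setter defined for the parameter.
+KMeans kmeans = new KMeans().setK(3);
+
+// Approach 2: update parameters in bulk through a parameter map.
+// getParam(String) looks a parameter up by its name.
+Map<Param<?>, Object> paramMap = new HashMap<>();
+paramMap.put(kmeans.getParam("k"), 5);
+ParamUtils.updateExistingParams(kmeans, paramMap);
+```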
+
+If a `Model` is generated through an `Estimator`'s `fit()` method, the `Model`
+inherits the `Estimator` object's parameters. Thus, there is no need to set the
+parameters a second time if the parameters are not changed.
diff --git a/docs/content.zh/docs/development/types.md b/docs/content.zh/docs/development/types.md
new file mode 100644
index 000000000..aba0388aa
--- /dev/null
+++ b/docs/content.zh/docs/development/types.md
@@ -0,0 +1,74 @@
+---
+title: "Data Types"
+weight: 3
+type: docs
+aliases:
+- /development/types.html
+---
+
+
+# Data Types
+
+Flink ML supports all data types supported by the Flink Table API, as well as
+the data types listed in the sections below.
+
+## Vector
+
+Flink ML provides support for vectors of double values. A `Vector` in Flink ML
+can be either dense (`DenseVector`) or sparse (`SparseVector`), depending on
+how users create them according to the vector's sparsity. Each vector is
+initialized with a fixed size, and users may get or set the double value at any
+0-based index location in the vector.
+
+Flink ML also has a class named `Vectors` providing utility methods for
+instantiating vectors.
+
+{{< tabs vector >}}
+
+{{< tab "Java">}}
+```java
+int n = 4;
+int[] indices = new int[] {0, 2, 3};
+double[] values = new double[] {0.1, 0.3, 0.4};
+
+SparseVector vector = Vectors.sparse(n, indices, values);
+```
+{{< /tab>}}
+
+{{< tab "Python">}}
+```python
+# Create a dense vector of 64-bit floats from a Python list or numbers.
+>>> Vectors.dense([1, 2, 3])
+DenseVector([1.0, 2.0, 3.0])
+>>> Vectors.dense(1.0, 2.0)
+DenseVector([1.0, 2.0])
+
+# Create a sparse vector, using either a dict, a list of (index, value) pairs, or two separate
+# arrays of indices and values.
+
+>>> Vectors.sparse(4, {1: 1.0, 3: 5.5})
+SparseVector(4, {1: 1.0, 3: 5.5})
+>>> Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
+SparseVector(4, {1: 1.0, 3: 5.5})
+>>> Vectors.sparse(4, [1, 3], [1.0, 5.5])
+SparseVector(4, {1: 1.0, 3: 5.5})
+```
+{{< /tab>}}
+{{< /tabs>}}
\ No newline at end of file
diff --git a/docs/content.zh/docs/operators/_index.md b/docs/content.zh/docs/operators/_index.md
new file mode 100644
index 000000000..5038fcfa7
--- /dev/null
+++ b/docs/content.zh/docs/operators/_index.md
@@ -0,0 +1,25 @@
+---
+title: Operators
+icon:
+bold: true
+bookCollapseSection: true
+weight: 3
+---
+
diff --git a/docs/content.zh/docs/operators/classification/_index.md b/docs/content.zh/docs/operators/classification/_index.md
new file mode 100644
index 000000000..007d663f3
--- /dev/null
+++ b/docs/content.zh/docs/operators/classification/_index.md
@@ -0,0 +1,25 @@
+---
+title: Classification
+bookCollapseSection: true
+weight: 1
+aliases:
+  - /operators/classification/
+---
+
diff --git a/docs/content.zh/docs/operators/classification/knn.md b/docs/content.zh/docs/operators/classification/knn.md
new file mode 100644
index 000000000..0724f2daf
--- /dev/null
+++ b/docs/content.zh/docs/operators/classification/knn.md
@@ -0,0 +1,216 @@
+---
+title: "KNN"
+type: docs
+aliases:
+- /operators/classification/knn.html
+---
+
+
+## KNN
+
+K Nearest Neighbor (KNN) is a classification algorithm. The basic assumption of
+KNN is that if most of the K nearest neighbors of a given sample belong to the
+same label, then it is highly probable that the sample also belongs to that
+label.
+
+### Input Columns
+
+| Param name | Type | Default | Description |
+| :---------- | :------ | :----------- |:------------------|
+| featuresCol | Vector | `"features"` | Feature vector. |
+| labelCol | Integer | `"label"` | Label to predict. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+| :------------ | :------ | :------------- |:-----------------|
+| predictionCol | Integer | `"prediction"` | Predicted label. |
+
+### Parameters
+
+Below are the parameters required by `KnnModel`.
+
+| Key | Default | Type | Required | Description |
+|---------------| -------------- | ------- | -------- | -------------------------------- |
+| k | `5` | Integer | no | The number of nearest neighbors. |
+| featuresCol | `"features"` | String | no | Features column name. |
+| predictionCol | `"prediction"` | String | no | Prediction column name. |
+
+`Knn` needs parameters above and also below.
+
+| Key | Default | Type | Required | Description |
+| -------- | --------- | ------ | -------- | ------------------ |
+| labelCol | `"label"` | String | no | Label column name. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+```java
+import org.apache.flink.ml.classification.knn.Knn;
+import org.apache.flink.ml.classification.knn.KnnModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a Knn model and uses it for classification. */
+public class KnnExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+ env.fromElements(
+ Row.of(Vectors.dense(2.0, 3.0), 1.0),
+ Row.of(Vectors.dense(2.1, 3.1), 1.0),
+ Row.of(Vectors.dense(200.1, 300.1), 2.0),
+ Row.of(Vectors.dense(200.2, 300.2), 2.0),
+ Row.of(Vectors.dense(200.3, 300.3), 2.0),
+ Row.of(Vectors.dense(200.4, 300.4), 2.0),
+ Row.of(Vectors.dense(200.4, 300.4), 2.0),
+ Row.of(Vectors.dense(200.6, 300.6), 2.0),
+ Row.of(Vectors.dense(2.1, 3.1), 1.0),
+ Row.of(Vectors.dense(2.1, 3.1), 1.0),
+ Row.of(Vectors.dense(2.1, 3.1), 1.0),
+ Row.of(Vectors.dense(2.1, 3.1), 1.0),
+ Row.of(Vectors.dense(2.3, 3.2), 1.0),
+ Row.of(Vectors.dense(2.3, 3.2), 1.0),
+ Row.of(Vectors.dense(2.8, 3.2), 3.0),
+ Row.of(Vectors.dense(300., 3.2), 4.0),
+ Row.of(Vectors.dense(2.2, 3.2), 1.0),
+ Row.of(Vectors.dense(2.4, 3.2), 5.0),
+ Row.of(Vectors.dense(2.5, 3.2), 5.0),
+ Row.of(Vectors.dense(2.5, 3.2), 5.0),
+ Row.of(Vectors.dense(2.1, 3.1), 1.0));
+ Table trainTable = tEnv.fromDataStream(trainStream).as("features", "label");
+
+        DataStream<Row> predictStream =
+ env.fromElements(
+ Row.of(Vectors.dense(4.0, 4.1), 5.0), Row.of(Vectors.dense(300, 42), 2.0));
+ Table predictTable = tEnv.fromDataStream(predictStream).as("features", "label");
+
+ // Creates a Knn object and initializes its parameters.
+ Knn knn = new Knn().setK(4);
+
+ // Trains the Knn Model.
+ KnnModel knnModel = knn.fit(trainTable);
+
+ // Uses the Knn Model for predictions.
+ Table outputTable = knnModel.transform(predictTable)[0];
+
+ // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+ DenseVector features = (DenseVector) row.getField(knn.getFeaturesCol());
+ double expectedResult = (Double) row.getField(knn.getLabelCol());
+ double predictionResult = (Double) row.getField(knn.getPredictionCol());
+ System.out.printf(
+ "Features: %-15s \tExpected Result: %s \tPrediction Result: %s\n",
+ features, expectedResult, predictionResult);
+ }
+ }
+}
+
+```
+{{< /tab>}}
+
+{{< tab "Python">}}
+```python
+# Simple program that trains a Knn model and uses it for classification.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.classification.knn import KNN
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input training and prediction data
+train_data = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense([2.0, 3.0]), 1.0),
+ (Vectors.dense([2.1, 3.1]), 1.0),
+ (Vectors.dense([200.1, 300.1]), 2.0),
+ (Vectors.dense([200.2, 300.2]), 2.0),
+ (Vectors.dense([200.3, 300.3]), 2.0),
+ (Vectors.dense([200.4, 300.4]), 2.0),
+ (Vectors.dense([200.4, 300.4]), 2.0),
+ (Vectors.dense([200.6, 300.6]), 2.0),
+ (Vectors.dense([2.1, 3.1]), 1.0),
+ (Vectors.dense([2.1, 3.1]), 1.0),
+ (Vectors.dense([2.1, 3.1]), 1.0),
+ (Vectors.dense([2.1, 3.1]), 1.0),
+ (Vectors.dense([2.3, 3.2]), 1.0),
+ (Vectors.dense([2.3, 3.2]), 1.0),
+ (Vectors.dense([2.8, 3.2]), 3.0),
+ (Vectors.dense([300., 3.2]), 4.0),
+ (Vectors.dense([2.2, 3.2]), 1.0),
+ (Vectors.dense([2.4, 3.2]), 5.0),
+ (Vectors.dense([2.5, 3.2]), 5.0),
+ (Vectors.dense([2.5, 3.2]), 5.0),
+ (Vectors.dense([2.1, 3.1]), 1.0)
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features', 'label'],
+ [DenseVectorTypeInfo(), Types.DOUBLE()])))
+
+predict_data = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense([4.0, 4.1]), 5.0),
+ (Vectors.dense([300, 42]), 2.0),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features', 'label'],
+ [DenseVectorTypeInfo(), Types.DOUBLE()])))
+
+# create a knn object and initialize its parameters
+knn = KNN().set_k(4)
+
+# train the knn model
+model = knn.fit(train_data)
+
+# use the knn model for predictions
+output = model.transform(predict_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ features = result[field_names.index(knn.get_features_col())]
+ expected_result = result[field_names.index(knn.get_label_col())]
+ actual_result = result[field_names.index(knn.get_prediction_col())]
+ print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+ + ' \tActual Result: ' + str(actual_result))
+```
+{{< /tab>}}
+
+{{< /tabs>}}
+
diff --git a/docs/content.zh/docs/operators/classification/linearsvc.md b/docs/content.zh/docs/operators/classification/linearsvc.md
new file mode 100644
index 000000000..5b134a995
--- /dev/null
+++ b/docs/content.zh/docs/operators/classification/linearsvc.md
@@ -0,0 +1,197 @@
+---
+title: "Linear SVC"
+type: docs
+aliases:
+- /operators/classification/linearsvc.html
+---
+
+
+
+## Linear Support Vector Machine
+
+Linear Support Vector Machine (Linear SVC) is an algorithm that attempts to
+find a hyperplane that maximizes the distance between classified samples.
+
+### Input Columns
+
+| Param name | Type | Default | Description |
+| :---------- | :------ | :----------- |:------------------|
+| featuresCol | Vector | `"features"` | Feature vector. |
+| labelCol | Integer | `"label"` | Label to predict. |
+| weightCol | Double | `"weight"` | Weight of sample. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+| :--------------- | :------ | :---------------- |:-----------------------------------------|
+| predictionCol | Integer | `"prediction"` | Label of the max probability. |
+| rawPredictionCol | Vector | `"rawPrediction"` | Vector of the probability of each label. |
+
+### Parameters
+
+Below are the parameters required by `LinearSVCModel`.
+
+| Key | Default | Type | Required | Description |
+|------------------|-------------------|--------|----------|-------------------------------------------------------------------------|
+| featuresCol | `"features"` | String | no | Features column name. |
+| predictionCol | `"prediction"` | String | no | Prediction column name. |
+| rawPredictionCol | `"rawPrediction"` | String | no | Raw prediction column name. |
+| threshold | `0.0` | Double | no | Threshold in binary classification prediction applied to rawPrediction. |
+
+`LinearSVC` needs parameters above and also below.
+
+| Key | Default | Type | Required | Description |
+| --------------- | --------- | ------- | -------- | ----------------------------------------------- |
+| labelCol | `"label"` | String | no | Label column name. |
+| weightCol | `null` | String | no | Weight column name. |
+| maxIter | `20` | Integer | no | Maximum number of iterations. |
+| reg | `0.` | Double | no | Regularization parameter. |
+| elasticNet | `0.` | Double | no | ElasticNet parameter. |
+| learningRate | `0.1` | Double | no | Learning rate of optimization method. |
+| globalBatchSize | `32` | Integer | no | Global batch size of training algorithms. |
+| tol | `1e-6` | Double | no | Convergence tolerance for iterative algorithms. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.classification.linearsvc.LinearSVC;
+import org.apache.flink.ml.classification.linearsvc.LinearSVCModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a LinearSVC model and uses it for classification. */
+public class LinearSVCExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data.
+        DataStream<Row> inputStream =
+ env.fromElements(
+ Row.of(Vectors.dense(1, 2, 3, 4), 0., 1.),
+ Row.of(Vectors.dense(2, 2, 3, 4), 0., 2.),
+ Row.of(Vectors.dense(3, 2, 3, 4), 0., 3.),
+ Row.of(Vectors.dense(4, 2, 3, 4), 0., 4.),
+ Row.of(Vectors.dense(5, 2, 3, 4), 0., 5.),
+ Row.of(Vectors.dense(11, 2, 3, 4), 1., 1.),
+ Row.of(Vectors.dense(12, 2, 3, 4), 1., 2.),
+ Row.of(Vectors.dense(13, 2, 3, 4), 1., 3.),
+ Row.of(Vectors.dense(14, 2, 3, 4), 1., 4.),
+ Row.of(Vectors.dense(15, 2, 3, 4), 1., 5.));
+ Table inputTable = tEnv.fromDataStream(inputStream).as("features", "label", "weight");
+
+ // Creates a LinearSVC object and initializes its parameters.
+ LinearSVC linearSVC = new LinearSVC().setWeightCol("weight");
+
+ // Trains the LinearSVC Model.
+ LinearSVCModel linearSVCModel = linearSVC.fit(inputTable);
+
+ // Uses the LinearSVC Model for predictions.
+ Table outputTable = linearSVCModel.transform(inputTable)[0];
+
+ // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+ DenseVector features = (DenseVector) row.getField(linearSVC.getFeaturesCol());
+ double expectedResult = (Double) row.getField(linearSVC.getLabelCol());
+ double predictionResult = (Double) row.getField(linearSVC.getPredictionCol());
+ DenseVector rawPredictionResult =
+ (DenseVector) row.getField(linearSVC.getRawPredictionCol());
+ System.out.printf(
+ "Features: %-25s \tExpected Result: %s \tPrediction Result: %s \tRaw Prediction Result: %s\n",
+ features, expectedResult, predictionResult, rawPredictionResult);
+ }
+ }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that trains a LinearSVC model and uses it for classification.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.classification.linearsvc import LinearSVC
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_table = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense([1, 2, 3, 4]), 0., 1.),
+ (Vectors.dense([2, 2, 3, 4]), 0., 2.),
+ (Vectors.dense([3, 2, 3, 4]), 0., 3.),
+ (Vectors.dense([4, 2, 3, 4]), 0., 4.),
+ (Vectors.dense([5, 2, 3, 4]), 0., 5.),
+ (Vectors.dense([11, 2, 3, 4]), 1., 1.),
+ (Vectors.dense([12, 2, 3, 4]), 1., 2.),
+ (Vectors.dense([13, 2, 3, 4]), 1., 3.),
+ (Vectors.dense([14, 2, 3, 4]), 1., 4.),
+ (Vectors.dense([15, 2, 3, 4]), 1., 5.),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features', 'label', 'weight'],
+ [DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()])
+ ))
+
+# create a linear svc object and initialize its parameters
+linear_svc = LinearSVC().set_weight_col('weight')
+
+# train the linear svc model
+model = linear_svc.fit(input_table)
+
+# use the linear svc model for predictions
+output = model.transform(input_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ features = result[field_names.index(linear_svc.get_features_col())]
+ expected_result = result[field_names.index(linear_svc.get_label_col())]
+ prediction_result = result[field_names.index(linear_svc.get_prediction_col())]
+ raw_prediction_result = result[field_names.index(linear_svc.get_raw_prediction_col())]
+ print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+ + ' \tPrediction Result: ' + str(prediction_result)
+ + ' \tRaw Prediction Result: ' + str(raw_prediction_result))
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/docs/content.zh/docs/operators/classification/logisticregression.md b/docs/content.zh/docs/operators/classification/logisticregression.md
new file mode 100644
index 000000000..b45b8480a
--- /dev/null
+++ b/docs/content.zh/docs/operators/classification/logisticregression.md
@@ -0,0 +1,364 @@
+---
+title: "Logistic Regression"
+type: docs
+aliases:
+- /operators/classification/logisticregression.html
+---
+
+
+## Logistic Regression
+
+Logistic regression is a special case of the Generalized Linear Model. It is
+widely used to predict a binary response.
+
+### Input Columns
+
+| Param name | Type | Default | Description |
+| :---------- | :------ | :----------- |:------------------|
+| featuresCol | Vector | `"features"` | Feature vector. |
+| labelCol | Integer | `"label"` | Label to predict. |
+| weightCol | Double | `"weight"` | Weight of sample. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+| :--------------- | :------ | :---------------- |:-----------------------------------------|
+| predictionCol | Integer | `"prediction"` | Label of the max probability. |
+| rawPredictionCol | Vector | `"rawPrediction"` | Vector of the probability of each label. |
+
+### Parameters
+
+Below are the parameters required by `LogisticRegressionModel`.
+
+| Key | Default | Type | Required | Description |
+| ---------------- | ----------------- | ------ | -------- | --------------------------- |
+| featuresCol | `"features"` | String | no | Features column name. |
+| predictionCol | `"prediction"` | String | no | Prediction column name. |
+| rawPredictionCol | `"rawPrediction"` | String | no | Raw prediction column name. |
+
+`LogisticRegression` needs parameters above and also below.
+
+| Key | Default | Type | Required | Description |
+|-----------------|-----------|---------|----------|---------------------------------------------------------------------------|
+| labelCol | `"label"` | String | no | Label column name. |
+| weightCol | `null` | String | no | Weight column name. |
+| maxIter | `20` | Integer | no | Maximum number of iterations. |
+| reg | `0.` | Double | no | Regularization parameter. |
+| elasticNet | `0.` | Double | no | ElasticNet parameter. |
+| learningRate | `0.1` | Double | no | Learning rate of optimization method. |
+| globalBatchSize | `32` | Integer | no | Global batch size of training algorithms. |
+| tol | `1e-6` | Double | no | Convergence tolerance for iterative algorithms. |
+| multiClass | `"auto"` | String | no | Classification type. Supported values: "auto", "binomial", "multinomial". |
+
+### Examples
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+```java
+import org.apache.flink.ml.classification.logisticregression.LogisticRegression;
+import org.apache.flink.ml.classification.logisticregression.LogisticRegressionModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a LogisticRegression model and uses it for classification. */
+public class LogisticRegressionExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data.
+        DataStream<Row> inputStream =
+ env.fromElements(
+ Row.of(Vectors.dense(1, 2, 3, 4), 0., 1.),
+ Row.of(Vectors.dense(2, 2, 3, 4), 0., 2.),
+ Row.of(Vectors.dense(3, 2, 3, 4), 0., 3.),
+ Row.of(Vectors.dense(4, 2, 3, 4), 0., 4.),
+ Row.of(Vectors.dense(5, 2, 3, 4), 0., 5.),
+ Row.of(Vectors.dense(11, 2, 3, 4), 1., 1.),
+ Row.of(Vectors.dense(12, 2, 3, 4), 1., 2.),
+ Row.of(Vectors.dense(13, 2, 3, 4), 1., 3.),
+ Row.of(Vectors.dense(14, 2, 3, 4), 1., 4.),
+ Row.of(Vectors.dense(15, 2, 3, 4), 1., 5.));
+ Table inputTable = tEnv.fromDataStream(inputStream).as("features", "label", "weight");
+
+ // Creates a LogisticRegression object and initializes its parameters.
+ LogisticRegression lr = new LogisticRegression().setWeightCol("weight");
+
+ // Trains the LogisticRegression Model.
+ LogisticRegressionModel lrModel = lr.fit(inputTable);
+
+ // Uses the LogisticRegression Model for predictions.
+ Table outputTable = lrModel.transform(inputTable)[0];
+
+ // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+ DenseVector features = (DenseVector) row.getField(lr.getFeaturesCol());
+ double expectedResult = (Double) row.getField(lr.getLabelCol());
+ double predictionResult = (Double) row.getField(lr.getPredictionCol());
+ DenseVector rawPredictionResult = (DenseVector) row.getField(lr.getRawPredictionCol());
+ System.out.printf(
+ "Features: %-25s \tExpected Result: %s \tPrediction Result: %s \tRaw Prediction Result: %s\n",
+ features, expectedResult, predictionResult, rawPredictionResult);
+ }
+ }
+}
+
+```
+{{< /tab>}}
+
+{{< tab "Python">}}
+```python
+# Simple program that trains a LogisticRegression model and uses it for
+# classification.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.classification.logisticregression import LogisticRegression
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense([1, 2, 3, 4]), 0., 1.),
+ (Vectors.dense([2, 2, 3, 4]), 0., 2.),
+ (Vectors.dense([3, 2, 3, 4]), 0., 3.),
+ (Vectors.dense([4, 2, 3, 4]), 0., 4.),
+ (Vectors.dense([5, 2, 3, 4]), 0., 5.),
+ (Vectors.dense([11, 2, 3, 4]), 1., 1.),
+ (Vectors.dense([12, 2, 3, 4]), 1., 2.),
+ (Vectors.dense([13, 2, 3, 4]), 1., 3.),
+ (Vectors.dense([14, 2, 3, 4]), 1., 4.),
+ (Vectors.dense([15, 2, 3, 4]), 1., 5.),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features', 'label', 'weight'],
+ [DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()])
+ ))
+
+# create a logistic regression object and initialize its parameters
+logistic_regression = LogisticRegression().set_weight_col('weight')
+
+# train the logistic regression model
+model = logistic_regression.fit(input_data)
+
+# use the logistic regression model for predictions
+output = model.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ features = result[field_names.index(logistic_regression.get_features_col())]
+ expected_result = result[field_names.index(logistic_regression.get_label_col())]
+ prediction_result = result[field_names.index(logistic_regression.get_prediction_col())]
+ raw_prediction_result = result[field_names.index(logistic_regression.get_raw_prediction_col())]
+ print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+ + ' \tPrediction Result: ' + str(prediction_result)
+ + ' \tRaw Prediction Result: ' + str(raw_prediction_result))
+
+```
+{{< /tab>}}
+
+{{< /tabs>}}
+
+## OnlineLogisticRegression
+
+Online Logistic Regression supports training an online logistic regression
+model on an unbounded stream of training data.
+
+The online optimizer of this algorithm is the FTRL-Proximal algorithm proposed
+by H. Brendan McMahan et al. See [H. Brendan McMahan et al., Ad click
+prediction: a view from the trenches.](https://doi.org/10.1145/2487575.2488200)
+
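+At its core, FTRL-Proximal performs a per-coordinate update with L1/L2
+regularization and adaptive per-coordinate learning rates. The following numpy
+sketch illustrates that update rule for logistic loss, following the paper; the
+hyperparameters (`alpha`, `beta`, `l1`, `l2`) and the sample are made-up
+values, and this is not Flink ML's internal implementation.
+
+```python
+import numpy as np
+
+alpha, beta, l1, l2 = 0.1, 1.0, 0.1, 0.1
+d = 3
+z, n = np.zeros(d), np.zeros(d)  # per-coordinate state kept by FTRL
+
+def weights():
+    # Coordinates whose |z| does not exceed l1 are clipped to exactly zero.
+    w = np.zeros(d)
+    active = np.abs(z) > l1
+    w[active] = -(z[active] - np.sign(z[active]) * l1) / (
+        (beta + np.sqrt(n[active])) / alpha + l2)
+    return w
+
+def update(x, y):
+    # One FTRL-Proximal step on a single (features, label) sample.
+    global z, n
+    w = weights()
+    p = 1.0 / (1.0 + np.exp(-x @ w))  # predicted probability of label 1
+    g = (p - y) * x                   # gradient of the logistic loss
+    sigma = (np.sqrt(n + g * g) - np.sqrt(n)) / alpha
+    z += g - sigma * w
+    n += g * g
+
+update(np.array([1.0, 0.5, -0.2]), 1.0)
+print(weights())
+```
+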
+### Input Columns
+
+| Param name | Type | Default | Description |
+| :---------- | :------ | :----------- | :--------------- |
+| featuresCol | Vector | `"features"` | Feature vector |
+| labelCol | Integer | `"label"` | Label to predict |
+| weightCol | Double | `"weight"` | Weight of sample |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+| :--------------- | :------ | :---------------- | :----------------------------------------------------- |
+| predictionCol | Integer | `"prediction"` | Label of the max probability |
+| rawPredictionCol | Vector | `"rawPrediction"` | Vector of the probability of each label |
+| modelVersionCol | Long | `"modelVersion"` | The version of the model data used for this prediction |
+
+### Parameters
+
+Below are the parameters required by `OnlineLogisticRegressionModel`.
+
+| Key | Default | Type | Required | Description |
+| ---------------- | ----------------- | ------ | -------- | --------------------------- |
+| featuresCol | `"features"` | String | no | Features column name. |
+| predictionCol | `"prediction"` | String | no | Prediction column name. |
+| rawPredictionCol | `"rawPrediction"` | String | no | Raw prediction column name. |
+| modelVersionCol | `"modelVersion"` | String | no | Model version column name. |
+
+In addition to the parameters above, `OnlineLogisticRegression` also needs the
+parameters below.
+
+| Key | Default | Type | Required | Description |
+| --------------- | ---------------- | ------- | -------- | ----------------------------------------------------- |
+| labelCol | `"label"` | String | no | Label column name. |
+| weightCol | `null` | String | no | Weight column name. |
+| batchStrategy | `COUNT_STRATEGY` | String | no | Strategy to create mini-batches from online training data. |
+| globalBatchSize | `32` | Integer | no | Global batch size of training algorithms. |
+| reg | `0.` | Double | no | Regularization parameter. |
+| elasticNet | `0.` | Double | no | ElasticNet parameter. |
+
+### Examples
+
+{{< tabs online_examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.classification.logisticregression.OnlineLogisticRegression;
+import org.apache.flink.ml.classification.logisticregression.OnlineLogisticRegressionModel;
+import org.apache.flink.ml.examples.util.PeriodicSourceFunction;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Simple program that trains an OnlineLogisticRegression model and uses it for classification. */
+public class OnlineLogisticRegressionExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input training and prediction data. Both are infinite streams that periodically
+ // send out the provided data to trigger model updates and predictions.
+ List<Row> trainData1 =
+ Arrays.asList(
+ Row.of(Vectors.dense(0.1, 2.), 0.),
+ Row.of(Vectors.dense(0.2, 2.), 0.),
+ Row.of(Vectors.dense(0.3, 2.), 0.),
+ Row.of(Vectors.dense(0.4, 2.), 0.),
+ Row.of(Vectors.dense(0.5, 2.), 0.),
+ Row.of(Vectors.dense(11., 12.), 1.),
+ Row.of(Vectors.dense(12., 11.), 1.),
+ Row.of(Vectors.dense(13., 12.), 1.),
+ Row.of(Vectors.dense(14., 12.), 1.),
+ Row.of(Vectors.dense(15., 12.), 1.));
+
+ List<Row> trainData2 =
+ Arrays.asList(
+ Row.of(Vectors.dense(0.2, 3.), 0.),
+ Row.of(Vectors.dense(0.8, 1.), 0.),
+ Row.of(Vectors.dense(0.7, 1.), 0.),
+ Row.of(Vectors.dense(0.6, 2.), 0.),
+ Row.of(Vectors.dense(0.2, 2.), 0.),
+ Row.of(Vectors.dense(14., 17.), 1.),
+ Row.of(Vectors.dense(15., 10.), 1.),
+ Row.of(Vectors.dense(16., 16.), 1.),
+ Row.of(Vectors.dense(17., 10.), 1.),
+ Row.of(Vectors.dense(18., 13.), 1.));
+
+ List<Row> predictData =
+ Arrays.asList(
+ Row.of(Vectors.dense(0.8, 2.7), 0.0),
+ Row.of(Vectors.dense(15.5, 11.2), 1.0));
+
+ RowTypeInfo typeInfo =
+ new RowTypeInfo(
+ new TypeInformation[] {DenseVectorTypeInfo.INSTANCE, Types.DOUBLE},
+ new String[] {"features", "label"});
+
+ SourceFunction<Row> trainSource =
+ new PeriodicSourceFunction(1000, Arrays.asList(trainData1, trainData2));
+ DataStream<Row> trainStream = env.addSource(trainSource, typeInfo);
+ Table trainTable = tEnv.fromDataStream(trainStream).as("features", "label");
+
+ SourceFunction<Row> predictSource =
+ new PeriodicSourceFunction(1000, Collections.singletonList(predictData));
+ DataStream<Row> predictStream = env.addSource(predictSource, typeInfo);
+ Table predictTable = tEnv.fromDataStream(predictStream).as("features", "label");
+
+ // Creates an online LogisticRegression object and initializes its parameters and initial
+ // model data.
+ Row initModelData = Row.of(Vectors.dense(0.41233679404769874, -0.18088118293232122), 0L);
+ Table initModelDataTable = tEnv.fromDataStream(env.fromElements(initModelData));
+ OnlineLogisticRegression olr =
+ new OnlineLogisticRegression()
+ .setFeaturesCol("features")
+ .setLabelCol("label")
+ .setPredictionCol("prediction")
+ .setReg(0.2)
+ .setElasticNet(0.5)
+ .setGlobalBatchSize(10)
+ .setInitialModelData(initModelDataTable);
+
+ // Trains the online LogisticRegression Model.
+ OnlineLogisticRegressionModel onlineModel = olr.fit(trainTable);
+
+ // Uses the online LogisticRegression Model for predictions.
+ Table outputTable = onlineModel.transform(predictTable)[0];
+
+ // Extracts and displays the results. As the training data stream continuously triggers
+ // updates of the internal model data, raw prediction results for the same prediction
+ // dataset will change over time.
+ for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+ DenseVector features = (DenseVector) row.getField(olr.getFeaturesCol());
+ Double expectedResult = (Double) row.getField(olr.getLabelCol());
+ Double predictionResult = (Double) row.getField(olr.getPredictionCol());
+ DenseVector rawPredictionResult = (DenseVector) row.getField(olr.getRawPredictionCol());
+ System.out.printf(
+ "Features: %-25s \tExpected Result: %s \tPrediction Result: %s \tRaw Prediction Result: %s\n",
+ features, expectedResult, predictionResult, rawPredictionResult);
+ }
+ }
+}
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/docs/content.zh/docs/operators/classification/naivebayes.md b/docs/content.zh/docs/operators/classification/naivebayes.md
new file mode 100644
index 000000000..3fe9beb8e
--- /dev/null
+++ b/docs/content.zh/docs/operators/classification/naivebayes.md
@@ -0,0 +1,192 @@
+---
+title: "Naive Bayes"
+type: docs
+aliases:
+- /operators/classification/naivebayes.html
+---
+
+
+## Naive Bayes
+
+Naive Bayes is a multiclass classifier based on Bayes’ theorem, which assumes
+strong (naive) independence between every pair of features.
+
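+As a quick illustration of the decision rule, the numpy sketch below scores a
+sample under a multinomial model with additive smoothing; the counts, priors,
+and sample are made up, and this is not Flink ML's internal implementation.
+
+```python
+import numpy as np
+
+# Per-class feature counts accumulated from hypothetical training data:
+# rows are classes, columns are features.
+counts = np.array([[3.0, 1.0],
+                   [1.0, 4.0]])
+priors = np.array([0.5, 0.5])  # class prior probabilities
+smoothing = 1.0                # the `smoothing` parameter
+
+smoothed = counts + smoothing
+log_theta = np.log(smoothed / smoothed.sum(axis=1, keepdims=True))
+
+x = np.array([2.0, 0.0])       # feature vector of token counts
+log_posterior = np.log(priors) + log_theta @ x
+print(int(np.argmax(log_posterior)))  # index of the predicted class
+```
+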
+### Input Columns
+
+| Param name | Type | Default | Description |
+| :---------- | :------ | :----------- |:------------------|
+| featuresCol | Vector | `"features"` | Feature vector. |
+| labelCol | Integer | `"label"` | Label to predict. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+| :------------ | :------ | :------------- |:-----------------|
+| predictionCol | Integer | `"prediction"` | Predicted label. |
+
+### Parameters
+
+Below are parameters required by `NaiveBayesModel`.
+
+| Key | Default | Type | Required | Description |
+| ------------- | --------------- | ------ | -------- |--------------------------------------------------|
+| modelType | `"multinomial"` | String | no | The model type. Supported values: "multinomial". |
+| featuresCol | `"features"` | String | no | Features column name. |
+| predictionCol | `"prediction"` | String | no | Prediction column name. |
+
+In addition to the parameters above, `NaiveBayes` also needs the parameters
+below.
+
+| Key | Default | Type | Required | Description |
+| --------- | --------- | ------ | -------- | ------------------------ |
+| labelCol | `"label"` | String | no | Label column name. |
+| smoothing | `1.0` | Double | no | The smoothing parameter. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+```java
+import org.apache.flink.ml.classification.naivebayes.NaiveBayes;
+import org.apache.flink.ml.classification.naivebayes.NaiveBayesModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a NaiveBayes model and uses it for classification. */
+public class NaiveBayesExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input training and prediction data.
+ DataStream<Row> trainStream =
+ env.fromElements(
+ Row.of(Vectors.dense(0, 0.), 11),
+ Row.of(Vectors.dense(1, 0), 10),
+ Row.of(Vectors.dense(1, 1.), 10));
+ Table trainTable = tEnv.fromDataStream(trainStream).as("features", "label");
+
+ DataStream<Row> predictStream =
+ env.fromElements(
+ Row.of(Vectors.dense(0, 1.)),
+ Row.of(Vectors.dense(0, 0.)),
+ Row.of(Vectors.dense(1, 0)),
+ Row.of(Vectors.dense(1, 1.)));
+ Table predictTable = tEnv.fromDataStream(predictStream).as("features");
+
+ // Creates a NaiveBayes object and initializes its parameters.
+ NaiveBayes naiveBayes =
+ new NaiveBayes()
+ .setSmoothing(1.0)
+ .setFeaturesCol("features")
+ .setLabelCol("label")
+ .setPredictionCol("prediction")
+ .setModelType("multinomial");
+
+ // Trains the NaiveBayes Model.
+ NaiveBayesModel naiveBayesModel = naiveBayes.fit(trainTable);
+
+ // Uses the NaiveBayes Model for predictions.
+ Table outputTable = naiveBayesModel.transform(predictTable)[0];
+
+ // Extracts and displays the results.
+ for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+ DenseVector features = (DenseVector) row.getField(naiveBayes.getFeaturesCol());
+ double predictionResult = (Double) row.getField(naiveBayes.getPredictionCol());
+ System.out.printf("Features: %s \tPrediction Result: %s\n", features, predictionResult);
+ }
+ }
+}
+
+```
+{{< /tab>}}
+
+
+{{< tab "Python">}}
+```python
+
+# Simple program that trains a NaiveBayes model and uses it for classification.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.classification.naivebayes import NaiveBayes
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input training and prediction data
+train_table = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense([0, 0.]), 11.),
+ (Vectors.dense([1, 0]), 10.),
+ (Vectors.dense([1, 1.]), 10.),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features', 'label'],
+ [DenseVectorTypeInfo(), Types.DOUBLE()])))
+
+predict_table = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense([0, 1.]),),
+ (Vectors.dense([0, 0.]),),
+ (Vectors.dense([1, 0]),),
+ (Vectors.dense([1, 1.]),),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features'],
+ [DenseVectorTypeInfo()])))
+
+# create a naive bayes object and initialize its parameters
+naive_bayes = NaiveBayes() \
+ .set_smoothing(1.0) \
+ .set_features_col('features') \
+ .set_label_col('label') \
+ .set_prediction_col('prediction') \
+ .set_model_type('multinomial')
+
+# train the naive bayes model
+model = naive_bayes.fit(train_table)
+
+# use the naive bayes model for predictions
+output = model.transform(predict_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ features = result[field_names.index(naive_bayes.get_features_col())]
+ prediction_result = result[field_names.index(naive_bayes.get_prediction_col())]
+ print('Features: ' + str(features) + ' \tPrediction Result: ' + str(prediction_result))
+
+```
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/docs/content.zh/docs/operators/clustering/_index.md b/docs/content.zh/docs/operators/clustering/_index.md
new file mode 100644
index 000000000..86f8a7e87
--- /dev/null
+++ b/docs/content.zh/docs/operators/clustering/_index.md
@@ -0,0 +1,25 @@
+---
+title: Clustering
+bookCollapseSection: true
+weight: 1
+aliases:
+ - /operators/clustering/
+---
+
diff --git a/docs/content.zh/docs/operators/clustering/agglomerativeclustering.md b/docs/content.zh/docs/operators/clustering/agglomerativeclustering.md
new file mode 100644
index 000000000..9ded65cca
--- /dev/null
+++ b/docs/content.zh/docs/operators/clustering/agglomerativeclustering.md
@@ -0,0 +1,181 @@
+---
+title: "AgglomerativeClustering"
+type: docs
+aliases:
+- /operators/clustering/agglomerativeclustering.html
+---
+
+
+## AgglomerativeClustering
+
+AgglomerativeClustering performs a hierarchical clustering
+using a bottom-up approach. Each observation starts in its
+own cluster and the clusters are merged together one by one.
+
+The output contains two tables. The first one assigns a
+cluster ID to each data point. The second one records the
+pair of clusters merged at each step, in the format
+(clusterId1, clusterId2, distance, sizeOfMergedCluster).
+
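+To make one merge step concrete, here is a toy sketch that produces exactly one
+row of the merge information described above; the points and the single-linkage
+criterion are made up (the operator's default linkage is ward).
+
+```python
+# Clusters of made-up 1-D points, keyed by cluster id.
+clusters = {0: [1.0], 1: [1.2], 2: [5.0]}
+
+def linkage_distance(a, b):
+    # Single linkage: distance between the closest pair of points.
+    return min(abs(x - y) for x in a for y in b)
+
+# Find and merge the closest pair of clusters.
+i, j = min(
+    ((p, q) for p in clusters for q in clusters if p < q),
+    key=lambda pair: linkage_distance(clusters[pair[0]], clusters[pair[1]]))
+d = linkage_distance(clusters[i], clusters[j])
+merged = clusters.pop(i) + clusters.pop(j)
+clusters[3] = merged  # the merged cluster gets the next free id
+
+# One row of merge info: (clusterId1, clusterId2, distance, sizeOfMergedCluster)
+print((i, j, d, len(merged)))
+```
+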
+### Input Columns
+
+| Param name | Type | Default | Description |
+|:------------|:-------|:-------------|:----------------|
+| featuresCol | Vector | `"features"` | Feature vector. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+|:--------------|:--------|:---------------|:--------------------------|
+| predictionCol | Integer | `"prediction"` | Predicted cluster ID. |
+
+### Parameters
+
+| Key | Default | Type | Required | Description |
+|:------------------|:------------------------------|:--------|:---------|:-------------------------------------------------------------------------------|
+| numClusters | `2` | Integer | no | The max number of clusters to create. |
+| distanceThreshold | `null` | Double | no | Threshold to decide whether two clusters should be merged. |
+| linkage | `"ward"` | String | no | Criterion for computing distance between two clusters. |
+| computeFullTree | `false` | Boolean | no | Whether to compute the full tree after convergence. |
+| distanceMeasure | `"euclidean"` | String | no | Distance measure. |
+| featuresCol | `"features"` | String | no | Features column name. |
+| predictionCol | `"prediction"` | String | no | Prediction column name. |
+| windows | `GlobalWindows.getInstance()` | Windows | no | Windowing strategy that determines how to create mini-batches from input data. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+```java
+import org.apache.flink.ml.clustering.agglomerativeclustering.AgglomerativeClustering;
+import org.apache.flink.ml.clustering.agglomerativeclustering.AgglomerativeClusteringParams;
+import org.apache.flink.ml.common.distance.EuclideanDistanceMeasure;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that creates an AgglomerativeClustering instance and uses it for clustering. */
+public class AgglomerativeClusteringExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data.
+ DataStream<DenseVector> inputStream =
+ env.fromElements(
+ Vectors.dense(1, 1),
+ Vectors.dense(1, 4),
+ Vectors.dense(1, 0),
+ Vectors.dense(4, 1.5),
+ Vectors.dense(4, 4),
+ Vectors.dense(4, 0));
+ Table inputTable = tEnv.fromDataStream(inputStream).as("features");
+
+ // Creates an AgglomerativeClustering object and initializes its parameters.
+ AgglomerativeClustering agglomerativeClustering =
+ new AgglomerativeClustering()
+ .setLinkage(AgglomerativeClusteringParams.LINKAGE_WARD)
+ .setDistanceMeasure(EuclideanDistanceMeasure.NAME)
+ .setPredictionCol("prediction");
+
+ // Uses the AgglomerativeClustering object for clustering.
+ Table[] outputs = agglomerativeClustering.transform(inputTable);
+
+ // Extracts and displays the results.
+ for (CloseableIterator<Row> it = outputs[0].execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+ DenseVector features =
+ (DenseVector) row.getField(agglomerativeClustering.getFeaturesCol());
+ int clusterId = (Integer) row.getField(agglomerativeClustering.getPredictionCol());
+ System.out.printf("Features: %s \tCluster ID: %s\n", features, clusterId);
+ }
+ }
+}
+
+```
+{{< /tab>}}
+
+{{< tab "Python">}}
+```python
+# Simple program that creates an agglomerativeclustering instance and uses it for clustering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.clustering.agglomerativeclustering import AgglomerativeClustering
+from pyflink.table import StreamTableEnvironment
+from matplotlib import pyplot as plt
+from scipy.cluster.hierarchy import dendrogram
+
+# Creates a new StreamExecutionEnvironment.
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# Creates a StreamTableEnvironment.
+t_env = StreamTableEnvironment.create(env)
+
+# Generates input data.
+input_data = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense([1, 1]),),
+ (Vectors.dense([1, 4]),),
+ (Vectors.dense([1, 0]),),
+ (Vectors.dense([4, 1.5]),),
+ (Vectors.dense([4, 4]),),
+ (Vectors.dense([4, 0]),),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features'],
+ [DenseVectorTypeInfo()])))
+
+# Creates an AgglomerativeClustering object and initializes its parameters.
+agglomerative_clustering = AgglomerativeClustering() \
+ .set_linkage('ward') \
+ .set_distance_measure('euclidean') \
+ .set_prediction_col('prediction')
+
+# Uses the AgglomerativeClustering for clustering.
+outputs = agglomerative_clustering.transform(input_data)
+
+# Extracts and displays the clustering results.
+field_names = outputs[0].get_schema().get_field_names()
+for result in t_env.to_data_stream(outputs[0]).execute_and_collect():
+ features = result[field_names.index(agglomerative_clustering.features_col)]
+ cluster_id = result[field_names.index(agglomerative_clustering.prediction_col)]
+ print('Features: ' + str(features) + '\tCluster ID: ' + str(cluster_id))
+
+# Visualizes the merge info.
+merge_info = [result for result in
+ t_env.to_data_stream(outputs[1]).execute_and_collect()]
+plt.title("Agglomerative Clustering Dendrogram")
+dendrogram(merge_info)
+plt.xlabel("Index of data point.")
+plt.ylabel("Distances between merged clusters.")
+plt.show()
+```
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/docs/content.zh/docs/operators/clustering/kmeans.md b/docs/content.zh/docs/operators/clustering/kmeans.md
new file mode 100644
index 000000000..eeeff603a
--- /dev/null
+++ b/docs/content.zh/docs/operators/clustering/kmeans.md
@@ -0,0 +1,329 @@
+---
+title: "Kmeans"
+type: docs
+aliases:
+- /operators/clustering/kmeans.html
+---
+
+
+## K-means
+
+K-means is a commonly-used clustering algorithm. It groups given data points
+into a predefined number of clusters.
+
+### Input Columns
+
+| Param name | Type | Default | Description |
+|:------------|:-------|:-------------|:----------------|
+| featuresCol | Vector | `"features"` | Feature vector. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+|:--------------|:--------|:---------------|:--------------------------|
+| predictionCol | Integer | `"prediction"` | Predicted cluster ID. |
+
+### Parameters
+
+Below are the parameters required by `KMeansModel`.
+
+| Key | Default | Type | Required | Description |
+|-----------------|----------------|---------|----------|---------------------------------------------------------------------------|
+| distanceMeasure | `euclidean` | String | no | Distance measure. Supported values: `'euclidean', 'manhattan', 'cosine'`. |
+| featuresCol | `"features"` | String | no | Features column name. |
+| predictionCol | `"prediction"` | String | no | Prediction column name. |
+| k | `2` | Integer | no | The max number of clusters to create. |
+
+In addition to the parameters above, `KMeans` also needs the parameters below.
+
+| Key | Default | Type | Required | Description |
+|----------|------------|---------|----------|------------------------------------------------------------|
+| initMode | `"random"` | String | no | The initialization algorithm. Supported options: 'random'. |
+| seed | `null` | Long | no | The random seed. |
+| maxIter | `20` | Integer | no | Maximum number of iterations. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+```java
+import org.apache.flink.ml.clustering.kmeans.KMeans;
+import org.apache.flink.ml.clustering.kmeans.KMeansModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a KMeans model and uses it for clustering. */
+public class KMeansExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data.
+ DataStream<DenseVector> inputStream =
+ env.fromElements(
+ Vectors.dense(0.0, 0.0),
+ Vectors.dense(0.0, 0.3),
+ Vectors.dense(0.3, 0.0),
+ Vectors.dense(9.0, 0.0),
+ Vectors.dense(9.0, 0.6),
+ Vectors.dense(9.6, 0.0));
+ Table inputTable = tEnv.fromDataStream(inputStream).as("features");
+
+ // Creates a K-means object and initializes its parameters.
+ KMeans kmeans = new KMeans().setK(2).setSeed(1L);
+
+ // Trains the K-means Model.
+ KMeansModel kmeansModel = kmeans.fit(inputTable);
+
+ // Uses the K-means Model for predictions.
+ Table outputTable = kmeansModel.transform(inputTable)[0];
+
+ // Extracts and displays the results.
+ for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+ DenseVector features = (DenseVector) row.getField(kmeans.getFeaturesCol());
+ int clusterId = (Integer) row.getField(kmeans.getPredictionCol());
+ System.out.printf("Features: %s \tCluster ID: %s\n", features, clusterId);
+ }
+ }
+}
+
+```
+{{< /tab>}}
+
+{{< tab "Python">}}
+```python
+# Simple program that trains a KMeans model and uses it for clustering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.clustering.kmeans import KMeans
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense([0.0, 0.0]),),
+ (Vectors.dense([0.0, 0.3]),),
+ (Vectors.dense([0.3, 3.0]),),
+ (Vectors.dense([9.0, 0.0]),),
+ (Vectors.dense([9.0, 0.6]),),
+ (Vectors.dense([9.6, 0.0]),),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features'],
+ [DenseVectorTypeInfo()])))
+
+# create a kmeans object and initialize its parameters
+kmeans = KMeans().set_k(2).set_seed(1)
+
+# train the kmeans model
+model = kmeans.fit(input_data)
+
+# use the kmeans model for predictions
+output = model.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ features = result[field_names.index(kmeans.get_features_col())]
+ cluster_id = result[field_names.index(kmeans.get_prediction_col())]
+ print('Features: ' + str(features) + ' \tCluster Id: ' + str(cluster_id))
+
+```
+{{< /tab>}}
+
+{{< /tabs>}}
+
+## Online K-means
+
+Online K-Means extends K-Means by supporting continuous training of a K-Means
+model on an unbounded stream of training data.
+
+Online K-Means makes updates with the "mini-batch" K-Means rule, generalized to
+incorporate forgetfulness (i.e. decay). After the centroids estimated on the
+current batch are acquired, Online K-Means computes the new centroids as the
+weighted average of the original and the estimated centroids. The weight of the
+estimated centroids is the number of points assigned to them. The weight of the
+original centroids is also the number of points, additionally multiplied by the
+decay factor.
+
+The decay factor scales the contribution of the clusters as estimated thus far.
+If the decay factor is 1, all batches are weighted equally. If the decay factor
+is 0, new centroids are determined entirely by recent data. Lower values
+correspond to more forgetting.
+
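+In formula form, each centroid is updated roughly as
+`c_new = (decay * n_old * c_old + sum_batch) / (decay * n_old + n_batch)`. The
+numpy sketch below applies one such update to made-up centroids and batch
+statistics; it illustrates the rule and is not Flink ML's internal
+implementation.
+
+```python
+import numpy as np
+
+decay_factor = 0.5
+
+# State carried between mini-batches: current centroids and their point counts.
+centroids = np.array([[0.0, 0.0], [9.0, 0.0]])
+counts = np.array([30.0, 30.0])
+
+# Statistics of the current mini-batch: per-centroid point sums and counts.
+batch_sums = np.array([[1.0, 1.5], [27.0, 1.2]])
+batch_counts = np.array([3.0, 3.0])
+
+discounted = counts * decay_factor
+new_counts = discounted + batch_counts
+centroids = (centroids * discounted[:, None] + batch_sums) / new_counts[:, None]
+counts = new_counts
+print(centroids)
+```
+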
+### Input Columns
+
+| Param name | Type | Default | Description |
+|:------------|:-------|:-------------|:---------------|
+| featuresCol | Vector | `"features"` | Feature vector |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+|:--------------|:--------|:---------------|:-------------------------|
+| predictionCol | Integer | `"prediction"` | Predicted cluster ID |
+
+### Parameters
+
+Below are the parameters required by `OnlineKMeansModel`.
+
+| Key | Default | Type | Required | Description |
+|-----------------|----------------|---------|----------|---------------------------------------------------------------------------|
+| distanceMeasure | `euclidean` | String | no | Distance measure. Supported values: `'euclidean', 'manhattan', 'cosine'`. |
+| featuresCol | `"features"` | String | no | Features column name. |
+| predictionCol | `"prediction"` | String | no | Prediction column name. |
+| k | `2` | Integer | no | The max number of clusters to create. |
+
+In addition to the parameters above, `OnlineKMeans` also needs the parameters
+below.
+
+| Key | Default | Type | Required | Description |
+|-----------------|------------------|---------|----------|-------------------------------------------------------|
+| batchStrategy | `COUNT_STRATEGY` | String | no | Strategy to create mini-batches from online training data. |
+| globalBatchSize | `32` | Integer | no | Global batch size of training algorithms. |
+| decayFactor | `0.` | Double | no | The forgetfulness of the previous centroids. |
+| seed | `null` | Long | no | The random seed. |
+
+### Examples
+
+{{< tabs online_examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.clustering.kmeans.KMeansModelData;
+import org.apache.flink.ml.clustering.kmeans.OnlineKMeans;
+import org.apache.flink.ml.clustering.kmeans.OnlineKMeansModel;
+import org.apache.flink.ml.examples.util.PeriodicSourceFunction;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/** Simple program that trains an OnlineKMeans model and uses it for clustering. */
+public class OnlineKMeansExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input training and prediction data. Both are infinite streams that periodically
+ // send out the provided data to trigger model updates and predictions.
+ List<Row> trainData1 =
+ Arrays.asList(
+ Row.of(Vectors.dense(0.0, 0.0)),
+ Row.of(Vectors.dense(0.0, 0.3)),
+ Row.of(Vectors.dense(0.3, 0.0)),
+ Row.of(Vectors.dense(9.0, 0.0)),
+ Row.of(Vectors.dense(9.0, 0.6)),
+ Row.of(Vectors.dense(9.6, 0.0)));
+
+ List<Row> trainData2 =
+ Arrays.asList(
+ Row.of(Vectors.dense(10.0, 100.0)),
+ Row.of(Vectors.dense(10.0, 100.3)),
+ Row.of(Vectors.dense(10.3, 100.0)),
+ Row.of(Vectors.dense(-10.0, -100.0)),
+ Row.of(Vectors.dense(-10.0, -100.6)),
+ Row.of(Vectors.dense(-10.6, -100.0)));
+
+ List<Row> predictData =
+ Arrays.asList(
+ Row.of(Vectors.dense(10.0, 10.0)), Row.of(Vectors.dense(-10.0, 10.0)));
+
+ SourceFunction<Row> trainSource =
+ new PeriodicSourceFunction(1000, Arrays.asList(trainData1, trainData2));
+ DataStream<Row> trainStream =
+ env.addSource(trainSource, new RowTypeInfo(DenseVectorTypeInfo.INSTANCE));
+ Table trainTable = tEnv.fromDataStream(trainStream).as("features");
+
+ SourceFunction<Row> predictSource =
+ new PeriodicSourceFunction(1000, Collections.singletonList(predictData));
+ DataStream<Row> predictStream =
+ env.addSource(predictSource, new RowTypeInfo(DenseVectorTypeInfo.INSTANCE));
+ Table predictTable = tEnv.fromDataStream(predictStream).as("features");
+
+ // Creates an online K-means object and initializes its parameters and initial model data.
+ OnlineKMeans onlineKMeans =
+ new OnlineKMeans()
+ .setFeaturesCol("features")
+ .setPredictionCol("prediction")
+ .setGlobalBatchSize(6)
+ .setInitialModelData(
+ KMeansModelData.generateRandomModelData(tEnv, 2, 2, 0.0, 0));
+
+ // Trains the online K-means Model.
+ OnlineKMeansModel onlineModel = onlineKMeans.fit(trainTable);
+
+ // Uses the online K-means Model for predictions.
+ Table outputTable = onlineModel.transform(predictTable)[0];
+
+ // Extracts and displays the results. As the training data stream continuously triggers
+ // updates of the internal k-means model data, clustering results for the same prediction
+ // dataset will change over time.
+ for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row1 = it.next();
+ DenseVector features1 = (DenseVector) row1.getField(onlineKMeans.getFeaturesCol());
+ Integer clusterId1 = (Integer) row1.getField(onlineKMeans.getPredictionCol());
+ Row row2 = it.next();
+ DenseVector features2 = (DenseVector) row2.getField(onlineKMeans.getFeaturesCol());
+ Integer clusterId2 = (Integer) row2.getField(onlineKMeans.getPredictionCol());
+ if (Objects.equals(clusterId1, clusterId2)) {
+ System.out.printf("%s and %s are now in the same cluster.\n", features1, features2);
+ } else {
+ System.out.printf(
+ "%s and %s are now in different clusters.\n", features1, features2);
+ }
+ }
+ }
+}
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/docs/content.zh/docs/operators/evaluation/_index.md b/docs/content.zh/docs/operators/evaluation/_index.md
new file mode 100644
index 000000000..355e5577e
--- /dev/null
+++ b/docs/content.zh/docs/operators/evaluation/_index.md
@@ -0,0 +1,25 @@
+---
+title: Evaluation
+bookCollapseSection: true
+weight: 1
+aliases:
+ - /operators/evaluation/
+---
+
diff --git a/docs/content.zh/docs/operators/evaluation/binaryclassificationevaluator.md b/docs/content.zh/docs/operators/evaluation/binaryclassificationevaluator.md
new file mode 100644
index 000000000..6e2d1a9ee
--- /dev/null
+++ b/docs/content.zh/docs/operators/evaluation/binaryclassificationevaluator.md
@@ -0,0 +1,192 @@
+---
+title: "Binary Classification Evaluator"
+weight: 1
+type: docs
+aliases:
+- /operators/evaluation/binaryclassificationevaluator.html
+---
+
+
+
+## Binary Classification Evaluator
+
+Binary Classification Evaluator calculates the evaluation metrics for binary
+classification. The input data has `rawPrediction`, `label`, and an optional
+weight column. The `rawPrediction` can be of type double (binary 0/1 prediction,
+or probability of label 1) or of type vector (length-2 vector of raw
+predictions, scores, or label probabilities). The output contains the metrics
+selected by the parameter `metricsNames`.
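+
+For intuition, the numpy sketch below computes `areaUnderROC` and `ks` directly
+from a handful of made-up (label, score) pairs; it is a simplified illustration
+(no ties or weights), not Flink ML's implementation.
+
+```python
+import numpy as np
+
+labels = np.array([1, 1, 0, 1, 0, 0])
+scores = np.array([0.9, 0.8, 0.75, 0.65, 0.4, 0.3])  # probability of label 1
+
+order = np.argsort(-scores)                 # sort by descending score
+tp = np.cumsum(labels[order] == 1)          # true positives at each cut-off
+fp = np.cumsum(labels[order] == 0)          # false positives at each cut-off
+tpr = np.concatenate([[0.0], tp / tp[-1]])
+fpr = np.concatenate([[0.0], fp / fp[-1]])
+
+print('areaUnderROC:', np.trapz(tpr, fpr))  # area under the ROC curve
+print('ks:', np.max(np.abs(tpr - fpr)))     # Kolmogorov-Smirnov statistic
+```
+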
+### Input Columns
+
+| Param name | Type | Default | Description |
+| :--------------- | :------------ | :-------------- |:---------------------------|
+| labelCol | Number | `"label"` | The label of this entry. |
+| rawPredictionCol | Vector/Number | `"rawPrediction"` | The raw prediction result. |
+| weightCol | Number | `null` | The weight of this entry. |
+
+### Output Columns
+
+| Column name | Type | Description |
+| ----------------- | ------ |--------------------------------------------------------------------------------------------------|
+| "areaUnderROC" | Double | The area under the receiver operating characteristic (ROC) curve. |
+| "areaUnderPR" | Double | The area under the precision-recall curve. |
+| "areaUnderLorenz" | Double | Kolmogorov-Smirnov, measures the ability of the model to separate positive and negative samples. |
+| "ks" | Double | The area under the lorenz curve. |
+
+### Parameters
+
+| Key | Default | Type | Required | Description |
+|------------------|-----------------------------------|----------|----------|--------------------------------------------------------------------------------------------------------|
+| labelCol | `"label"` | String | no | Label column name. |
+| weightCol | `null` | String | no | Weight column name. |
+| rawPredictionCol | `"rawPrediction"` | String | no | Raw prediction column name. |
+| metricsNames | `["areaUnderROC", "areaUnderPR"]` | String[] | no | Names of the output metrics. Supported values: 'areaUnderROC', 'areaUnderPR', 'areaUnderLorenz', 'ks'. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.evaluation.binaryclassification.BinaryClassificationEvaluator;
+import org.apache.flink.ml.evaluation.binaryclassification.BinaryClassificationEvaluatorParams;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+/**
+ * Simple program that creates a BinaryClassificationEvaluator instance and uses it for evaluation.
+ */
+public class BinaryClassificationEvaluatorExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data.
+ DataStream<Row> inputStream =
+ env.fromElements(
+ Row.of(1.0, Vectors.dense(0.1, 0.9)),
+ Row.of(1.0, Vectors.dense(0.2, 0.8)),
+ Row.of(1.0, Vectors.dense(0.3, 0.7)),
+ Row.of(0.0, Vectors.dense(0.25, 0.75)),
+ Row.of(0.0, Vectors.dense(0.4, 0.6)),
+ Row.of(1.0, Vectors.dense(0.35, 0.65)),
+ Row.of(1.0, Vectors.dense(0.45, 0.55)),
+ Row.of(0.0, Vectors.dense(0.6, 0.4)),
+ Row.of(0.0, Vectors.dense(0.7, 0.3)),
+ Row.of(1.0, Vectors.dense(0.65, 0.35)),
+ Row.of(0.0, Vectors.dense(0.8, 0.2)),
+ Row.of(1.0, Vectors.dense(0.9, 0.1)));
+ Table inputTable = tEnv.fromDataStream(inputStream).as("label", "rawPrediction");
+
+ // Creates a BinaryClassificationEvaluator object and initializes its parameters.
+ BinaryClassificationEvaluator evaluator =
+ new BinaryClassificationEvaluator()
+ .setMetricsNames(
+ BinaryClassificationEvaluatorParams.AREA_UNDER_PR,
+ BinaryClassificationEvaluatorParams.KS,
+ BinaryClassificationEvaluatorParams.AREA_UNDER_ROC);
+
+ // Uses the BinaryClassificationEvaluator object for evaluations.
+ Table outputTable = evaluator.transform(inputTable)[0];
+
+ // Extracts and displays the results.
+ Row evaluationResult = outputTable.execute().collect().next();
+ System.out.printf(
+ "Area under the precision-recall curve: %s\n",
+ evaluationResult.getField(BinaryClassificationEvaluatorParams.AREA_UNDER_PR));
+ System.out.printf(
+ "Area under the receiver operating characteristic curve: %s\n",
+ evaluationResult.getField(BinaryClassificationEvaluatorParams.AREA_UNDER_ROC));
+ System.out.printf(
+ "Kolmogorov-Smirnov value: %s\n",
+ evaluationResult.getField(BinaryClassificationEvaluatorParams.KS));
+ }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates a BinaryClassificationEvaluator instance and uses
+# it for evaluation.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.evaluation.binaryclassification import BinaryClassificationEvaluator
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_table = t_env.from_data_stream(
+ env.from_collection([
+ (1.0, Vectors.dense(0.1, 0.9)),
+ (1.0, Vectors.dense(0.2, 0.8)),
+ (1.0, Vectors.dense(0.3, 0.7)),
+ (0.0, Vectors.dense(0.25, 0.75)),
+ (0.0, Vectors.dense(0.4, 0.6)),
+ (1.0, Vectors.dense(0.35, 0.65)),
+ (1.0, Vectors.dense(0.45, 0.55)),
+ (0.0, Vectors.dense(0.6, 0.4)),
+ (0.0, Vectors.dense(0.7, 0.3)),
+ (1.0, Vectors.dense(0.65, 0.35)),
+ (0.0, Vectors.dense(0.8, 0.2)),
+ (1.0, Vectors.dense(0.9, 0.1))
+ ],
+ type_info=Types.ROW_NAMED(
+ ['label', 'rawPrediction'],
+ [Types.DOUBLE(), DenseVectorTypeInfo()]))
+)
+
+# create a binary classification evaluator object and initialize its parameters
+evaluator = BinaryClassificationEvaluator() \
+ .set_metrics_names('areaUnderPR', 'ks', 'areaUnderROC')
+
+# use the binary classification evaluator model for evaluations
+output = evaluator.transform(input_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+result = t_env.to_data_stream(output).execute_and_collect().next()
+print('Area under the precision-recall curve: '
+ + str(result[field_names.index('areaUnderPR')]))
+print('Area under the receiver operating characteristic curve: '
+ + str(result[field_names.index('areaUnderROC')]))
+print('Kolmogorov-Smirnov value: '
+ + str(result[field_names.index('ks')]))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/docs/content.zh/docs/operators/feature/_index.md b/docs/content.zh/docs/operators/feature/_index.md
new file mode 100644
index 000000000..a87ec2326
--- /dev/null
+++ b/docs/content.zh/docs/operators/feature/_index.md
@@ -0,0 +1,25 @@
+---
+title: Feature Engineering
+bookCollapseSection: true
+weight: 1
+aliases:
+ - /operators/feature/
+---
+
diff --git a/docs/content.zh/docs/operators/feature/binarizer.md b/docs/content.zh/docs/operators/feature/binarizer.md
new file mode 100644
index 000000000..68e700abd
--- /dev/null
+++ b/docs/content.zh/docs/operators/feature/binarizer.md
@@ -0,0 +1,183 @@
+---
+title: "Binarizer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/binarizer.html
+---
+
+
+
+## Binarizer
+
+Binarizer binarizes the columns of continuous features by the given thresholds.
+The continuous features may be a DenseVector, a SparseVector, or a numerical
+value.
+
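+The rule itself is simple: a value strictly greater than its column's threshold
+maps to 1.0, everything else to 0.0 (the strictly-greater convention is an
+assumption here). A minimal sketch:
+
+```python
+def binarize(value, threshold):
+    # Assumed rule: strictly greater than the threshold maps to 1.0.
+    return 1.0 if value > threshold else 0.0
+
+print(binarize(2.0, 1.5))                       # 1.0
+print([binarize(v, 0.0) for v in [-0.5, 0.3]])  # [0.0, 1.0]
+```
+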
+### Input Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:--------------|:--------|:--------------------------------|
+| inputCols | Number/Vector | `null` | Number/Vectors to be binarized. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:--------------|:--------|:--------------------------|
+| outputCols | Number/Vector | `null` | Binarized Number/Vectors. |
+
+### Parameters
+
+| Key | Default | Type | Required | Description |
+|-------------|-----------|----------|----------|------------------------------------------------------|
+| inputCols | `null` | String[] | yes | Input column names. |
+| outputCols | `null` | String[] | yes | Output column names. |
+| thresholds | `null` | Double[] | yes | The thresholds used to binarize continuous features. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.binarizer.Binarizer;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that creates a Binarizer instance and uses it for feature engineering. */
+public class BinarizerExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data.
+ DataStream<Row> inputStream =
+ env.fromElements(
+ Row.of(
+ 1,
+ Vectors.dense(1, 2),
+ Vectors.sparse(
+ 17, new int[] {0, 3, 9}, new double[] {1.0, 2.0, 7.0})),
+ Row.of(
+ 2,
+ Vectors.dense(2, 1),
+ Vectors.sparse(
+ 17, new int[] {0, 2, 14}, new double[] {5.0, 4.0, 1.0})),
+ Row.of(
+ 3,
+ Vectors.dense(5, 18),
+ Vectors.sparse(
+ 17, new int[] {0, 11, 12}, new double[] {2.0, 4.0, 4.0})));
+
+ Table inputTable = tEnv.fromDataStream(inputStream).as("f0", "f1", "f2");
+
+ // Creates a Binarizer object and initializes its parameters.
+ Binarizer binarizer =
+ new Binarizer()
+ .setInputCols("f0", "f1", "f2")
+ .setOutputCols("of0", "of1", "of2")
+ .setThresholds(0.0, 0.0, 0.0);
+
+ // Transforms input data.
+ Table outputTable = binarizer.transform(inputTable)[0];
+
+ // Extracts and displays the results.
+ for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+
+ Object[] inputValues = new Object[binarizer.getInputCols().length];
+ Object[] outputValues = new Object[binarizer.getInputCols().length];
+ for (int i = 0; i < inputValues.length; i++) {
+ inputValues[i] = row.getField(binarizer.getInputCols()[i]);
+ outputValues[i] = row.getField(binarizer.getOutputCols()[i]);
+ }
+
+ System.out.printf(
+ "Input Values: %s\tOutput Values: %s\n",
+ Arrays.toString(inputValues), Arrays.toString(outputValues));
+ }
+ }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates a Binarizer instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.feature.binarizer import Binarizer
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data_table = t_env.from_data_stream(
+ env.from_collection([
+ (1,
+ Vectors.dense(3, 4)),
+ (2,
+ Vectors.dense(6, 2))
+ ],
+ type_info=Types.ROW_NAMED(
+ ['f0', 'f1'],
+ [Types.INT(), DenseVectorTypeInfo()])))
+
+# create a binarizer object and initialize its parameters
+binarizer = Binarizer() \
+ .set_input_cols('f0', 'f1') \
+ .set_output_cols('of0', 'of1') \
+ .set_thresholds(1.5, 3.5)
+
+# use the binarizer for feature engineering
+output = binarizer.transform(input_data_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+input_values = [None for _ in binarizer.get_input_cols()]
+output_values = [None for _ in binarizer.get_output_cols()]
+for result in t_env.to_data_stream(output).execute_and_collect():
+ for i in range(len(binarizer.get_input_cols())):
+ input_values[i] = result[field_names.index(binarizer.get_input_cols()[i])]
+ output_values[i] = result[field_names.index(binarizer.get_output_cols()[i])]
+ print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/docs/content.zh/docs/operators/feature/bucketizer.md b/docs/content.zh/docs/operators/feature/bucketizer.md
new file mode 100644
index 000000000..c19abfa80
--- /dev/null
+++ b/docs/content.zh/docs/operators/feature/bucketizer.md
@@ -0,0 +1,179 @@
+---
+title: "Bucketizer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/bucketizer.html
+---
+
+
+
+## Bucketizer
+
+Bucketizer is an algorithm that maps multiple columns of continuous features to
+multiple columns of discrete features, i.e., bucket indices. The indices are in
+[0, numSplitsInThisColumn - 1].
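+
+Conceptually, each column's split points define a sequence of intervals, and a
+value is mapped to the index of the interval containing it. A minimal sketch of
+that lookup (the half-open-interval convention is an assumption here):
+
+```python
+from bisect import bisect_right
+
+def bucket_index(value, splits):
+    # Bucket i covers [splits[i], splits[i+1]); boundary values go upward.
+    return bisect_right(splits, value) - 1
+
+splits = [float('-inf'), 0.0, 10.0, float('inf')]
+print(bucket_index(-0.5, splits))  # 0
+print(bucket_index(3.0, splits))   # 1
+```
+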
+### Input Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:-------|:--------|:--------------------------------------|
+| inputCols | Number | `null` | Continuous features to be bucketized. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:-------|:--------|:----------------------|
+| outputCols | Double | `null` | Discretized features. |
+
+### Parameters
+
+| Key | Default | Type | Required | Description |
+|---------------|-----------|-------------|----------|--------------------------------------------------------------------------------|
+| inputCols | `null` | String[] | yes | Input column names. |
+| outputCols | `null` | String[] | yes | Output column names. |
+| handleInvalid | `"error"` | String | no | Strategy to handle invalid entries. Supported values: 'error', 'skip', 'keep'. |
+| splitsArray | `null` | Double\[][] | yes | Array of split points for mapping continuous features into buckets. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.common.param.HasHandleInvalid;
+import org.apache.flink.ml.feature.bucketizer.Bucketizer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that creates a Bucketizer instance and uses it for feature engineering. */
+public class BucketizerExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data.
+ DataStream<Row> inputStream = env.fromElements(Row.of(-0.5, 0.0, 1.0, 0.0));
+ Table inputTable = tEnv.fromDataStream(inputStream).as("f1", "f2", "f3", "f4");
+
+ // Creates a Bucketizer object and initializes its parameters.
+ Double[][] splitsArray =
+ new Double[][] {
+ new Double[] {-0.5, 0.0, 0.5},
+ new Double[] {-1.0, 0.0, 2.0},
+ new Double[] {Double.NEGATIVE_INFINITY, 10.0, Double.POSITIVE_INFINITY},
+ new Double[] {Double.NEGATIVE_INFINITY, 1.5, Double.POSITIVE_INFINITY}
+ };
+ Bucketizer bucketizer =
+ new Bucketizer()
+ .setInputCols("f1", "f2", "f3", "f4")
+ .setOutputCols("o1", "o2", "o3", "o4")
+ .setSplitsArray(splitsArray)
+ .setHandleInvalid(HasHandleInvalid.SKIP_INVALID);
+
+ // Uses the Bucketizer object for feature transformations.
+ Table outputTable = bucketizer.transform(inputTable)[0];
+
+ // Extracts and displays the results.
+ for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+
+ double[] inputValues = new double[bucketizer.getInputCols().length];
+ double[] outputValues = new double[bucketizer.getInputCols().length];
+ for (int i = 0; i < inputValues.length; i++) {
+ inputValues[i] = (double) row.getField(bucketizer.getInputCols()[i]);
+ outputValues[i] = (double) row.getField(bucketizer.getOutputCols()[i]);
+ }
+
+ System.out.printf(
+ "Input Values: %s\tOutput Values: %s\n",
+ Arrays.toString(inputValues), Arrays.toString(outputValues));
+ }
+ }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates a Bucketizer instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.feature.bucketizer import Bucketizer
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data = t_env.from_data_stream(
+ env.from_collection([
+ (-0.5, 0.0, 1.0, 0.0),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['f1', 'f2', 'f3', 'f4'],
+ [Types.DOUBLE(), Types.DOUBLE(), Types.DOUBLE(), Types.DOUBLE()])
+ ))
+
+# create a bucketizer object and initialize its parameters
+splits_array = [
+ [-0.5, 0.0, 0.5],
+ [-1.0, 0.0, 2.0],
+ [float('-inf'), 10.0, float('inf')],
+ [float('-inf'), 1.5, float('inf')],
+]
+
+bucketizer = Bucketizer() \
+ .set_input_cols('f1', 'f2', 'f3', 'f4') \
+ .set_output_cols('o1', 'o2', 'o3', 'o4') \
+ .set_splits_array(splits_array)
+
+# use the bucketizer model for feature engineering
+output = bucketizer.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+input_values = [None for _ in bucketizer.get_input_cols()]
+output_values = [None for _ in bucketizer.get_input_cols()]
+for result in t_env.to_data_stream(output).execute_and_collect():
+ for i in range(len(bucketizer.get_input_cols())):
+ input_values[i] = result[field_names.index(bucketizer.get_input_cols()[i])]
+ output_values[i] = result[field_names.index(bucketizer.get_output_cols()[i])]
+ print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/docs/content.zh/docs/operators/feature/countvectorizer.md b/docs/content.zh/docs/operators/feature/countvectorizer.md
new file mode 100644
index 000000000..b6658c06c
--- /dev/null
+++ b/docs/content.zh/docs/operators/feature/countvectorizer.md
@@ -0,0 +1,182 @@
+---
+title: "CountVectorizer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/countvectorizer.html
+---
+
+
+
+## CountVectorizer
+
+CountVectorizer is an algorithm that converts a collection of text documents to
+vectors of token counts. When an a-priori dictionary is not available,
+CountVectorizer can be used as an estimator to extract the vocabulary and
+generate a CountVectorizerModel. The model produces sparse representations of
+the documents over the vocabulary, which can then be passed to other algorithms
+like LDA.
+
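+The fitted model is essentially a vocabulary ordered by corpus-wide term
+frequency plus a per-document counting step. The sketch below mimics that on
+made-up documents, ignoring the `minDF`/`maxDF`/`minTF` filters; it is not
+Flink ML's implementation.
+
+```python
+from collections import Counter
+
+docs = [['a', 'c', 'b', 'c'], ['c', 'd', 'e'], ['a', 'c', 'a']]
+
+# Build the vocabulary, ordered by term frequency across the corpus.
+corpus_counts = Counter(t for doc in docs for t in doc)
+vocabulary = [t for t, _ in corpus_counts.most_common()]
+
+# Count tokens per document over the vocabulary (a dense view of the sparse vector).
+for doc in docs:
+    counts = Counter(doc)
+    print([float(counts[t]) for t in vocabulary])
+```
+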
+### Input Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:---------|:----------|:--------------------|
+| inputCol | String[] | `"input"` | Input string array. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:-------------|:-----------|:------------------------|
+| outputCol | SparseVector | `"output"` | Vector of token counts. |
+
+### Parameters
+
+Below are the parameters required by `CountVectorizerModel`.
+
+| Key | Default | Type | Required | Description |
+|------------|------------|---------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| inputCol | `"input"` | String | no | Input column name. |
+| outputCol | `"output"` | String | no | Output column name. |
+| minTF | `1.0` | Double | no | Filter to ignore rare words in a document. For each document, terms with frequency/count less than the given threshold are ignored. If this is an integer >= 1, then this specifies a count (of times the term must appear in the document); if this is a double in [0,1), then this specifies a fraction (out of the document's token count). |
+| binary | `false` | Boolean | no | Binary toggle to control the output vector values. If True, all nonzero counts (after minTF filter applied) are set to 1.0. |
+
+In addition to the parameters above, `CountVectorizer` also needs the
+parameters below.
+
+| Key | Default | Type | Required | Description |
+|:---------------|:-----------|:---------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| vocabularySize | `2^18` | Integer | no | Max size of the vocabulary. CountVectorizer will build a vocabulary that only considers the top vocabulary size terms ordered by term frequency across the corpus. |
+| minDF | `1.0` | Double | no | Specifies the minimum number of different documents a term must appear in to be included in the vocabulary. If this is an integer >= 1, this specifies the number of documents the term must appear in; if this is a double in [0,1), then this specifies the fraction of documents. |
+| maxDF | `2^63 - 1` | Double | no | Specifies the maximum number of different documents a term could appear in to be included in the vocabulary. A term that appears more than the threshold will be ignored. If this is an integer >= 1, this specifies the maximum number of documents the term could appear in; if this is a double in [0,1), then this specifies the maximum fraction of documents the term could appear in. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.countvectorizer.CountVectorizer;
+import org.apache.flink.ml.feature.countvectorizer.CountVectorizerModel;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/**
+ * Simple program that trains a {@link CountVectorizer} model and uses it for feature engineering.
+ */
+public class CountVectorizerExample {
+
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input training and prediction data.
+ DataStream<Row> dataStream =
+ env.fromElements(
+ Row.of((Object) new String[] {"a", "c", "b", "c"}),
+ Row.of((Object) new String[] {"c", "d", "e"}),
+ Row.of((Object) new String[] {"a", "b", "c"}),
+ Row.of((Object) new String[] {"e", "f"}),
+ Row.of((Object) new String[] {"a", "c", "a"}));
+ Table inputTable = tEnv.fromDataStream(dataStream).as("input");
+
+        // Creates a CountVectorizer object and initializes its parameters.
+ CountVectorizer countVectorizer = new CountVectorizer();
+
+ // Trains the CountVectorizer model
+ CountVectorizerModel model = countVectorizer.fit(inputTable);
+
+ // Uses the CountVectorizer model for predictions.
+ Table outputTable = model.transform(inputTable)[0];
+
+ // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+ String[] inputValue = (String[]) row.getField(countVectorizer.getInputCol());
+ SparseVector outputValue = (SparseVector) row.getField(countVectorizer.getOutputCol());
+ System.out.printf(
+ "Input Value: %-15s \tOutput Value: %s\n",
+ Arrays.toString(inputValue), outputValue.toString());
+ }
+ }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+
+# Simple program that creates a CountVectorizer instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.feature.countvectorizer import CountVectorizer
+from pyflink.table import StreamTableEnvironment
+
+# Creates a new StreamExecutionEnvironment.
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# Creates a StreamTableEnvironment.
+t_env = StreamTableEnvironment.create(env)
+
+# Generates input training and prediction data.
+input_table = t_env.from_data_stream(
+ env.from_collection([
+ (1, ['a', 'c', 'b', 'c'],),
+ (2, ['c', 'd', 'e'],),
+ (3, ['a', 'b', 'c'],),
+ (4, ['e', 'f'],),
+ (5, ['a', 'c', 'a'],),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['id', 'input', ],
+ [Types.INT(), Types.OBJECT_ARRAY(Types.STRING())])
+ ))
+
+# Creates a CountVectorizer object and initializes its parameters.
+count_vectorizer = CountVectorizer()
+
+# Trains the CountVectorizer Model.
+model = count_vectorizer.fit(input_table)
+
+# Uses the CountVectorizer Model for predictions.
+output = model.transform(input_table)[0]
+
+# Extracts and displays the results.
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ input_index = field_names.index(count_vectorizer.get_input_col())
+ output_index = field_names.index(count_vectorizer.get_output_col())
+ print('Input Value: %-20s Output Value: %10s' %
+ (str(result[input_index]), str(result[output_index])))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/docs/content.zh/docs/operators/feature/dct.md b/docs/content.zh/docs/operators/feature/dct.md
new file mode 100644
index 000000000..356260be5
--- /dev/null
+++ b/docs/content.zh/docs/operators/feature/dct.md
@@ -0,0 +1,151 @@
+---
+title: "DCT"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/dct.html
+---
+
+
+
+## DCT
+
+DCT is a Transformer that takes the 1D discrete cosine transform of a real
+vector. No zero padding is performed on the input vector. It returns a real
+vector of the same length representing the DCT. The returned vector is scaled
+such that the transform matrix is unitary (aka scaled DCT-II).
+
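+As a quick cross-check of the definition above (not part of the Flink ML
+example, and assuming SciPy is available), the scaled DCT-II is the transform
+SciPy exposes as an orthonormal type-2 DCT:
+
+```python
+# Not Flink ML code: the unitary (scaled) DCT-II matches SciPy's
+# dct(..., type=2, norm='ortho').
+from scipy.fft import dct
+
+print(dct([1.0, 1.0, 1.0, 1.0], type=2, norm='ortho'))  # -> [2. 0. 0. 0.]
+print(dct([1.0, 0.0, -1.0, 0.0], type=2, norm='ortho'))
+```
+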
+### Input Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:-------|:----------|:---------------------------------------|
+| inputCol | Vector | `"input"` | Input vector to be cosine transformed. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:-------|:-----------|:----------------------------------|
+| outputCol | Vector | `"output"` | Cosine transformed output vector. |
+
+### Parameters
+
+| Key | Default | Type | Required | Description |
+|-----------|------------|---------|----------|-------------------------------------------------------------------|
+| inputCol | `"input"` | String | no | Input column name. |
+| outputCol | `"output"` | String | no | Output column name. |
+| inverse | `false` | Boolean | no | Whether to perform the inverse DCT (true) or forward DCT (false). |
+
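+A hedged sketch of what `inverse` toggles, again in SciPy terms (an assumption,
+not Flink ML code): with the unitary scaling, the inverse transform undoes the
+forward one.
+
+```python
+# Not Flink ML code: the inverse DCT (inverse=true) undoes the forward DCT
+# (inverse=false) when both use the unitary scaling.
+from scipy.fft import dct, idct
+
+x = [1.0, 0.0, -1.0, 0.0]
+roundtrip = idct(dct(x, type=2, norm='ortho'), type=2, norm='ortho')
+print(all(abs(a - b) < 1e-9 for a, b in zip(x, roundtrip)))  # True
+```
+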
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.dct.DCT;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Simple program that creates a DCT instance and uses it for feature engineering. */
+public class DCTExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data.
+        List<Vector> inputData =
+ Arrays.asList(
+ Vectors.dense(1.0, 1.0, 1.0, 1.0), Vectors.dense(1.0, 0.0, -1.0, 0.0));
+ Table inputTable = tEnv.fromDataStream(env.fromCollection(inputData)).as("input");
+
+ // Creates a DCT object and initializes its parameters.
+ DCT dct = new DCT();
+
+ // Uses the DCT object for feature transformations.
+ Table outputTable = dct.transform(inputTable)[0];
+
+ // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+
+ Vector inputValue = row.getFieldAs(dct.getInputCol());
+ Vector outputValue = row.getFieldAs(dct.getOutputCol());
+
+ System.out.printf("Input Value: %s\tOutput Value: %s\n", inputValue, outputValue);
+ }
+ }
+}
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates a DCT instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.feature.dct import DCT
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense(1.0, 1.0, 1.0, 1.0),),
+ (Vectors.dense(1.0, 0.0, -1.0, 0.0),),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input'],
+ [DenseVectorTypeInfo()])))
+
+# create a DCT object and initialize its parameters
+dct = DCT()
+
+# use the dct for feature engineering
+output = dct.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ input_value = result[field_names.index(dct.get_input_col())]
+ output_value = result[field_names.index(dct.get_output_col())]
+ print('Input Value: ' + str(input_value) + '\tOutput Value: ' + str(output_value))
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/docs/content.zh/docs/operators/feature/elementwiseproduct.md b/docs/content.zh/docs/operators/feature/elementwiseproduct.md
new file mode 100644
index 000000000..0021c15b4
--- /dev/null
+++ b/docs/content.zh/docs/operators/feature/elementwiseproduct.md
@@ -0,0 +1,157 @@
+---
+title: "ElementwiseProduct"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/elementwiseproduct.html
+---
+
+
+
+## ElementwiseProduct
+
+ElementwiseProduct multiplies each input vector by a given scaling vector using
+the Hadamard (element-wise) product. If the size of the input vector does not
+equal the size of the scaling vector, the transformer throws an
+IllegalArgumentException.
+
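+The Hadamard product is simply element-wise multiplication; here is a toy
+illustration in plain Python (illustrative only, not Flink ML code) of what the
+transformer computes per row:
+
+```python
+# Illustrative only: each element of the input vector is multiplied by the
+# matching element of the scaling vector.
+vec = [2.1, 3.1]
+scaling_vec = [1.1, 1.1]
+print([v * s for v, s in zip(vec, scaling_vec)])  # ~[2.31, 3.41]
+```
+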
+### Input Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:-------|:----------|:-----------------------|
+| inputCol | Vector | `"input"` | Features to be scaled. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:-------|:-----------|:-----------------|
+| outputCol | Vector | `"output"` | Scaled features. |
+
+### Parameters
+
+| Key | Default | Type | Required | Description |
+|------------|------------|--------|----------|---------------------|
+| inputCol | `"input"` | String | no | Input column name. |
+| outputCol | `"output"` | String | no | Output column name. |
+| scalingVec | `null`     | Vector | yes      | The scaling vector. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/**
+ * Simple program that creates an ElementwiseProduct instance and uses it for feature engineering.
+ */
+public class ElementwiseProductExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data.
+        DataStream<Row> inputStream =
+ env.fromElements(
+ Row.of(0, Vectors.dense(1.1, 3.2)), Row.of(1, Vectors.dense(2.1, 3.1)));
+
+ Table inputTable = tEnv.fromDataStream(inputStream).as("id", "vec");
+
+ // Creates an ElementwiseProduct object and initializes its parameters.
+ ElementwiseProduct elementwiseProduct =
+ new ElementwiseProduct()
+ .setInputCol("vec")
+ .setOutputCol("outputVec")
+ .setScalingVec(Vectors.dense(1.1, 1.1));
+
+ // Transforms input data.
+ Table outputTable = elementwiseProduct.transform(inputTable)[0];
+
+ // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+ Vector inputValue = (Vector) row.getField(elementwiseProduct.getInputCol());
+ Vector outputValue = (Vector) row.getField(elementwiseProduct.getOutputCol());
+ System.out.printf("Input Value: %s \tOutput Value: %s\n", inputValue, outputValue);
+ }
+ }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates an ElementwiseProduct instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.feature.elementwiseproduct import ElementwiseProduct
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data_table = t_env.from_data_stream(
+ env.from_collection([
+ (1, Vectors.dense(2.1, 3.1)),
+ (2, Vectors.dense(1.1, 3.3))
+ ],
+ type_info=Types.ROW_NAMED(
+ ['id', 'vec'],
+ [Types.INT(), DenseVectorTypeInfo()])))
+
+# create an elementwise product object and initialize its parameters
+elementwise_product = ElementwiseProduct() \
+ .set_input_col('vec') \
+ .set_output_col('output_vec') \
+ .set_scaling_vec(Vectors.dense(1.1, 1.1))
+
+# use the elementwise product object for feature engineering
+output = elementwise_product.transform(input_data_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ input_value = result[field_names.index(elementwise_product.get_input_col())]
+ output_value = result[field_names.index(elementwise_product.get_output_col())]
+ print('Input Value: ' + str(input_value) + '\tOutput Value: ' + str(output_value))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/docs/content.zh/docs/operators/feature/featurehasher.md b/docs/content.zh/docs/operators/feature/featurehasher.md
new file mode 100644
index 000000000..e804d9ae6
--- /dev/null
+++ b/docs/content.zh/docs/operators/feature/featurehasher.md
@@ -0,0 +1,177 @@
+---
+title: "FeatureHasher"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/featurehasher.html
+---
+
+
+
+## FeatureHasher
+
+FeatureHasher transforms a set of categorical or numerical features into a sparse vector of
+a specified dimension. The rules of hashing categorical columns and numerical columns are as
+follows:
+
+- For numerical columns, the index of this feature in the output vector is the
+  hash value of the column name, and its corresponding value is the same as the
+  input value.
+- For categorical columns, the index of this feature in the output vector is
+  the hash value of the string "column_name=value", and the corresponding value
+  is 1.0.
+
+If multiple features are projected into the same column, the output values are
+accumulated. For the hashing trick, see
+https://en.wikipedia.org/wiki/Feature_hashing for details; a toy sketch of these
+rules is shown below.
+
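+The sketch is plain Python and illustrative only: `feature_hash` is a
+hypothetical helper, and Python's built-in `hash` stands in for Flink ML's
+actual hash function.
+
+```python
+# Illustrative only: numerical columns hash the column name and keep their
+# value; categorical columns hash "column_name=value" with value 1.0;
+# colliding indices accumulate.
+def feature_hash(row, categorical_cols, num_features=1000):
+    vec = {}
+    for name, value in row.items():
+        if name in categorical_cols:
+            key, contribution = '%s=%s' % (name, value), 1.0
+        else:
+            key, contribution = name, float(value)
+        index = hash(key) % num_features
+        vec[index] = vec.get(index, 0.0) + contribution
+    return vec
+
+print(feature_hash({'f0': 'a', 'f1': 1.0, 'f2': True}, {'f0', 'f2'}))
+```
+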
+### Input Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:----------------------|:--------|:----------------------|
+| inputCols | Number/String/Boolean | `null` | Columns to be hashed. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:-------|:-----------|:---------------|
+| outputCol | Vector | `"output"` | Output vector. |
+
+### Parameters
+
+| Key | Default | Type | Required | Description |
+|-----------------|------------|-----------|----------|---------------------------|
+| inputCols | `null` | String[] | yes | Input column names. |
+| outputCol | `"output"` | String | no | Output column name. |
+| categoricalCols | `[]` | String[] | no | Categorical column names. |
+| numFeatures | `262144` | Integer | no | The number of features. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.featurehasher.FeatureHasher;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that creates a FeatureHasher instance and uses it for feature engineering. */
+public class FeatureHasherExample {
+ public static void main(String[] args) {
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data.
+        DataStream<Row> dataStream =
+ env.fromCollection(
+ Arrays.asList(Row.of(0, "a", 1.0, true), Row.of(1, "c", 1.0, false)));
+ Table inputDataTable = tEnv.fromDataStream(dataStream).as("id", "f0", "f1", "f2");
+
+ // Creates a FeatureHasher object and initializes its parameters.
+ FeatureHasher featureHash =
+ new FeatureHasher()
+ .setInputCols("f0", "f1", "f2")
+ .setCategoricalCols("f0", "f2")
+ .setOutputCol("vec")
+ .setNumFeatures(1000);
+
+ // Uses the FeatureHasher object for feature transformations.
+ Table outputTable = featureHash.transform(inputDataTable)[0];
+
+ // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+
+ Object[] inputValues = new Object[featureHash.getInputCols().length];
+ for (int i = 0; i < inputValues.length; i++) {
+ inputValues[i] = row.getField(featureHash.getInputCols()[i]);
+ }
+ Vector outputValue = (Vector) row.getField(featureHash.getOutputCol());
+
+ System.out.printf(
+ "Input Values: %s \tOutput Value: %s\n",
+ Arrays.toString(inputValues), outputValue);
+ }
+ }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates a FeatureHasher instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.feature.featurehasher import FeatureHasher
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data_table = t_env.from_data_stream(
+ env.from_collection([
+ (0, 'a', 1.0, True),
+ (1, 'c', 1.0, False),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['id', 'f0', 'f1', 'f2'],
+ [Types.INT(), Types.STRING(), Types.DOUBLE(), Types.BOOLEAN()])))
+
+# create a feature hasher object and initialize its parameters
+feature_hasher = FeatureHasher() \
+ .set_input_cols('f0', 'f1', 'f2') \
+ .set_categorical_cols('f0', 'f2') \
+ .set_output_col('vec') \
+ .set_num_features(1000)
+
+# use the feature hasher for feature engineering
+output = feature_hasher.transform(input_data_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+input_values = [None for _ in feature_hasher.get_input_cols()]
+for result in t_env.to_data_stream(output).execute_and_collect():
+ for i in range(len(feature_hasher.get_input_cols())):
+ input_values[i] = result[field_names.index(feature_hasher.get_input_cols()[i])]
+ output_value = result[field_names.index(feature_hasher.get_output_col())]
+ print('Input Values: ' + str(input_values) + '\tOutput Value: ' + str(output_value))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/docs/content.zh/docs/operators/feature/hashingtf.md b/docs/content.zh/docs/operators/feature/hashingtf.md
new file mode 100644
index 000000000..d340d9096
--- /dev/null
+++ b/docs/content.zh/docs/operators/feature/hashingtf.md
@@ -0,0 +1,165 @@
+---
+title: "HashingTF"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/hashingtf.html
+---
+
+
+
+## HashingTF
+
+HashingTF maps a sequence of terms (strings, numbers, booleans) to a sparse
+vector with a specified dimension using the hashing trick. If multiple features
+are projected into the same column, the output values are accumulated by
+default.
+
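+A toy sketch of this mapping in plain Python (illustrative only: `hashing_tf`
+is a hypothetical helper, and Python's built-in `hash` stands in for Flink ML's
+actual hash function):
+
+```python
+# Illustrative only: terms are hashed into num_features buckets; colliding
+# terms accumulate unless binary output is requested.
+from collections import Counter
+
+def hashing_tf(terms, num_features=128, binary=False):
+    counts = Counter(hash(term) % num_features for term in terms)
+    return {index: 1.0 if binary else float(c) for index, c in counts.items()}
+
+print(hashing_tf(['Hashing', 'Hashing', 'Test', 'Test', 'Test']))
+```
+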
+### Input Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:----------------------------------------------|:----------|:-------------------------|
+| inputCol | List/Array of primitive data types or strings | `"input"` | Input sequence of terms. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:-------------|:-----------|:----------------------|
+| outputCol | SparseVector | `"output"` | Output sparse vector. |
+
+### Parameters
+
+| Key | Default | Type | Required | Description |
+|:------------|:-----------|:--------|:---------|:--------------------------------------------------------------------|
+| binary | `false` | Boolean | no | Whether each dimension of the output vector is binary or not. |
+| inputCol | `"input"` | String | no | Input column name. |
+| outputCol | `"output"` | String | no | Output column name. |
+| numFeatures | `262144` | Integer | no | The number of features. It will be the length of the output vector. |
+
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+
+import org.apache.flink.ml.feature.hashingtf.HashingTF;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Simple program that creates a HashingTF instance and uses it for feature engineering. */
+public class HashingTFExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data.
+        DataStream<Row> inputStream =
+ env.fromElements(
+ Row.of(
+ Arrays.asList(
+ "HashingTFTest", "Hashing", "Term", "Frequency", "Test")),
+ Row.of(
+ Arrays.asList(
+ "HashingTFTest", "Hashing", "Hashing", "Test", "Test")));
+
+ Table inputTable = tEnv.fromDataStream(inputStream).as("input");
+
+ // Creates a HashingTF object and initializes its parameters.
+ HashingTF hashingTF =
+ new HashingTF().setInputCol("input").setOutputCol("output").setNumFeatures(128);
+
+ // Uses the HashingTF object for feature transformations.
+ Table outputTable = hashingTF.transform(inputTable)[0];
+
+ // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+
+ List