Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] Remove useless transform code left over from setTypeInfo #5647

Merged
merged 6 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,7 @@ public interface SeaTunnelPluginLifeCycle {
* org.apache.seatunnel.api.table.factory.Factory}
*/
@Deprecated
void prepare(Config pluginConfig) throws PrepareFailException;
default void prepare(Config pluginConfig) throws PrepareFailException {
throw new UnsupportedOperationException("prepare method is not supported");
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ default void open() {}
* @param inputDataType The data type info of upstream input.
*/
@Deprecated
void setTypeInfo(SeaTunnelDataType<T> inputDataType);
default void setTypeInfo(SeaTunnelDataType<T> inputDataType) {
throw new UnsupportedOperationException("setTypeInfo method is not supported");
}

/**
* Get the data type of the records produced by this transform.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
Expand Down Expand Up @@ -128,33 +127,26 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
return upstreamDataStreams;
}

private Dataset<Row> sparkTransform(SeaTunnelTransform transform, DatasetTableInfo tableInfo)
throws IOException {
private Dataset<Row> sparkTransform(SeaTunnelTransform transform, DatasetTableInfo tableInfo) {
Dataset<Row> stream = tableInfo.getDataset();
SeaTunnelDataType<?> seaTunnelDataType = tableInfo.getCatalogTable().getSeaTunnelRowType();
transform.setTypeInfo(seaTunnelDataType);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call was missed in the refactor; removing it here.

StructType outputSchema =
(StructType) TypeConverterUtils.convert(transform.getProducedType());
SeaTunnelRowConverter inputRowConverter = new SeaTunnelRowConverter(seaTunnelDataType);
SeaTunnelRowConverter outputRowConverter =
new SeaTunnelRowConverter(transform.getProducedType());
SeaTunnelDataType<?> inputDataType = tableInfo.getCatalogTable().getSeaTunnelRowType();
SeaTunnelDataType<?> outputDataTYpe =
transform.getProducedCatalogTable().getSeaTunnelRowType();
StructType outputSchema = (StructType) TypeConverterUtils.convert(outputDataTYpe);
SeaTunnelRowConverter inputRowConverter = new SeaTunnelRowConverter(inputDataType);
SeaTunnelRowConverter outputRowConverter = new SeaTunnelRowConverter(outputDataTYpe);
ExpressionEncoder<Row> encoder = RowEncoder.apply(outputSchema);
Dataset<Row> result =
stream.mapPartitions(
(MapPartitionsFunction<Row, Row>)
(Iterator<Row> rowIterator) -> {
TransformIterator iterator =
new TransformIterator(
rowIterator,
transform,
outputSchema,
inputRowConverter,
outputRowConverter);
return iterator;
},
encoder)
.filter(Objects::nonNull);
return result;
return stream.mapPartitions(
(MapPartitionsFunction<Row, Row>)
(Iterator<Row> rowIterator) ->
new TransformIterator(
rowIterator,
transform,
outputSchema,
inputRowConverter,
outputRowConverter),
encoder)
.filter(Objects::nonNull);
}

private static class TransformIterator implements Iterator<Row>, Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ public abstract class AbstractCatalogSupportTransform extends AbstractSeaTunnelT

protected volatile CatalogTable outputCatalogTable;

public AbstractCatalogSupportTransform() {
super();
}

public AbstractCatalogSupportTransform(@NonNull CatalogTable inputCatalogTable) {
this.inputCatalogTable = inputCatalogTable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,59 +17,19 @@

package org.apache.seatunnel.transform.common;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;

import java.util.Objects;

public abstract class AbstractSeaTunnelTransform implements SeaTunnelTransform<SeaTunnelRow> {

private static final String RESULT_TABLE_NAME = CommonOptions.RESULT_TABLE_NAME.key();
private static final String SOURCE_TABLE_NAME = CommonOptions.SOURCE_TABLE_NAME.key();

protected String inputTableName;
protected SeaTunnelRowType inputRowType;

protected String outputTableName;
protected SeaTunnelRowType outputRowType;

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
if (!pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
throw new IllegalArgumentException(
"The configuration missing key: " + SOURCE_TABLE_NAME);
}
if (!pluginConfig.hasPath(RESULT_TABLE_NAME)) {
throw new IllegalArgumentException(
"The configuration missing key: " + RESULT_TABLE_NAME);
}

this.inputTableName = pluginConfig.getString(SOURCE_TABLE_NAME);
this.outputTableName = pluginConfig.getString(RESULT_TABLE_NAME);
if (Objects.equals(inputTableName, outputTableName)) {
throw new IllegalArgumentException(
"source and result cannot be equals: "
+ inputTableName
+ ", "
+ outputTableName);
}

setConfig(pluginConfig);
}

@Override
public void setTypeInfo(SeaTunnelDataType<SeaTunnelRow> inputDataType) {
this.inputRowType = (SeaTunnelRowType) inputDataType;
this.outputRowType = transformRowType(clone(inputRowType));
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return outputRowType;
Expand All @@ -80,34 +40,13 @@ public SeaTunnelRow map(SeaTunnelRow row) {
return transformRow(row);
}

protected abstract void setConfig(Config pluginConfig);

/**
* Outputs transformed row type.
*
* @param inputRowType upstream input row type
* @return
*/
protected abstract SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType);

/**
* Outputs transformed row data.
*
* @param inputRow upstream input row data
* @return
*/
protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow);

private static SeaTunnelRowType clone(SeaTunnelRowType rowType) {
String[] fieldNames = new String[rowType.getTotalFields()];
System.arraycopy(rowType.getFieldNames(), 0, fieldNames, 0, fieldNames.length);

SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[rowType.getTotalFields()];
System.arraycopy(rowType.getFieldTypes(), 0, fieldTypes, 0, fieldTypes.length);

return new SeaTunnelRowType(fieldNames, fieldTypes);
}

@Override
public CatalogTable getProducedCatalogTable() {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,15 @@
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import lombok.NoArgsConstructor;
import lombok.NonNull;

@NoArgsConstructor
public abstract class FilterRowTransform extends AbstractCatalogSupportTransform {

public FilterRowTransform(@NonNull CatalogTable inputCatalogTable) {
super(inputCatalogTable);
}

@Override
protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
return inputRowType;
}

@Override
protected TableSchema transformTableSchema() {
return inputCatalogTable.getTableSchema().copy();
Expand Down
Loading