Add multiple table file sink to base (#6049)
EricJoy2048 authored Dec 22, 2023
1 parent c8dcefc commit 085e0e5
Showing 4 changed files with 112 additions and 128 deletions.
File: BaseFileSinkWriter.java (modified)
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink;

import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
@@ -40,7 +41,10 @@
import java.util.UUID;
import java.util.stream.Collectors;

-public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> {
+public class BaseFileSinkWriter
+        implements SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>,
+                SupportMultiTableSinkWriter<WriteStrategy> {
+
    protected final WriteStrategy writeStrategy;

    public BaseFileSinkWriter(
(remaining lines unchanged)
File: BaseMultipleTableFileSink.java (new file)
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.sink;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;

import java.util.List;
import java.util.Optional;

public abstract class BaseMultipleTableFileSink
        implements SeaTunnelSink<
                        SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>,
                SupportMultiTableSink {

    private final HadoopConf hadoopConf;
    private final HadoopFileSystemProxy hadoopFileSystemProxy;
    private final FileSinkConfig fileSinkConfig;
    private final WriteStrategy writeStrategy;
    private String jobId;

    public abstract String getPluginName();

    public BaseMultipleTableFileSink(
            HadoopConf hadoopConf, ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
        this.hadoopConf = hadoopConf;
        this.fileSinkConfig =
                new FileSinkConfig(readonlyConfig.toConfig(), catalogTable.getSeaTunnelRowType());
        this.writeStrategy =
                WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
        this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
        this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
    }

    @Override
    public void setJobContext(JobContext jobContext) {
        this.jobId = jobContext.getJobId();
    }

    @Override
    public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> restoreWriter(
            SinkWriter.Context context, List<FileSinkState> states) {
        return new BaseFileSinkWriter(writeStrategy, hadoopConf, context, jobId, states);
    }

    @Override
    public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>>
            createAggregatedCommitter() {
        return Optional.of(new FileSinkAggregatedCommitter(hadoopConf));
    }

    @Override
    public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> createWriter(
            SinkWriter.Context context) {
        return new BaseFileSinkWriter(writeStrategy, hadoopConf, context, jobId);
    }

    @Override
    public Optional<Serializer<FileCommitInfo>> getCommitInfoSerializer() {
        return Optional.of(new DefaultSerializer<>());
    }

    @Override
    public Optional<Serializer<FileAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
        return Optional.of(new DefaultSerializer<>());
    }

    @Override
    public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
        return Optional.of(new DefaultSerializer<>());
    }
}
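For reference, a minimal sketch of how a connector-specific sink can build on this new base class. The class ExampleFileSink and its ExampleFileHadoopConf are hypothetical placeholders, not part of this commit; the LocalFileSink change in the next diff follows the same pattern with its real LocalFileHadoopConf.

// Hypothetical subclass for illustration only; ExampleFileSink and
// ExampleFileHadoopConf are placeholders and do not exist in this commit.
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;

public class ExampleFileSink extends BaseMultipleTableFileSink {

    public ExampleFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
        // The subclass only supplies its filesystem-specific HadoopConf;
        // writer creation, the aggregated committer, and the serializers
        // are all inherited from BaseMultipleTableFileSink.
        super(new ExampleFileHadoopConf(), readonlyConfig, catalogTable);
    }

    @Override
    public String getPluginName() {
        // Name under which the connector would be registered; hypothetical value.
        return "ExampleFile";
    }
}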
File: LocalFileSink.java (modified)
@@ -17,89 +17,16 @@

package org.apache.seatunnel.connectors.seatunnel.file.local.sink;

-import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.serialization.DefaultSerializer;
-import org.apache.seatunnel.api.serialization.Serializer;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter.LocalFileSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;

-import java.util.List;
-import java.util.Optional;

-public class LocalFileSink
-        implements SeaTunnelSink<
-                        SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>,
-                SupportMultiTableSink {

-    private final HadoopConf hadoopConf;
-    private final HadoopFileSystemProxy hadoopFileSystemProxy;
-    private final FileSinkConfig fileSinkConfig;
-    private final WriteStrategy writeStrategy;
-    private String jobId;
+public class LocalFileSink extends BaseMultipleTableFileSink {

    public LocalFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
-        this.hadoopConf = new LocalFileHadoopConf();
-        this.fileSinkConfig =
-                new FileSinkConfig(readonlyConfig.toConfig(), catalogTable.getSeaTunnelRowType());
-        this.writeStrategy =
-                WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
-        this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
-        this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
-    }
-
-    @Override
-    public void setJobContext(JobContext jobContext) {
-        this.jobId = jobContext.getJobId();
-    }
-
-    @Override
-    public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> restoreWriter(
-            SinkWriter.Context context, List<FileSinkState> states) {
-        return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, jobId, states);
-    }
-
-    @Override
-    public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>>
-            createAggregatedCommitter() {
-        return Optional.of(new FileSinkAggregatedCommitter(hadoopConf));
-    }
-
-    @Override
-    public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> createWriter(
-            SinkWriter.Context context) {
-        return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, jobId);
-    }
-
-    @Override
-    public Optional<Serializer<FileCommitInfo>> getCommitInfoSerializer() {
-        return Optional.of(new DefaultSerializer<>());
-    }
-
-    @Override
-    public Optional<Serializer<FileAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
-        return Optional.of(new DefaultSerializer<>());
-    }
-
-    @Override
-    public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
-        return Optional.of(new DefaultSerializer<>());
+        super(new LocalFileHadoopConf(), readonlyConfig, catalogTable);
    }

    @Override
(remaining lines unchanged)

The fourth changed file was deleted (its contents are not shown).
