Skip to content

Commit

Permalink
[ISSUE #45]Optimize transform api (#46)
Browse files Browse the repository at this point in the history
* SinkTask and SourceTask implement the validate method apache/rocketmq-connect#85

* Adjust the init and start methods of the component interface

* Set pause and resume to deprecated methods. It feels like they can be removed

* Add struct object and optimize schema and schema builder API #41

* add offset storage writer #41

* add getter and setter method #41

* add SchemaAndValue #41

* add logical type #41

* Schemabuilder add required method

* schema add  hashCode and equals method

* fixed doc method

* Field add equals and hashcode method

* optimize api #85

* Optimize transform api #45

* Optimize transform api and add RecordConverter
  • Loading branch information
sunxiaojian authored Jun 21, 2022
1 parent c1b5606 commit 949e27b
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import io.openmessaging.KeyValue;

public interface Component<R extends ComponentContext> {
public interface Component {

/**
* Should invoke before start the connector.
Expand All @@ -25,13 +25,6 @@ public interface Component<R extends ComponentContext> {
*/
void validate(KeyValue config);

/**
* Init the component
*
* @param context component context
*/
void init(R context);

/**
* Start the component
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package io.openmessaging.connector.api.component;

import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.data.ConnectRecord;

/**
Expand All @@ -24,5 +25,19 @@
*/
public interface Transform<R extends ConnectRecord> extends Component {

/**
* Should invoke before start the connector.
*
* @param config component config
*/
@Override
default void validate(KeyValue config){

}
/**
* transform record
* @param record
* @return
*/
R doTransform(R record);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@

import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.Component;
import io.openmessaging.connector.api.component.ComponentContext;
import io.openmessaging.connector.api.component.task.Task;
import java.util.List;

public abstract class Connector implements Component<ConnectorContext> {
public abstract class Connector<R extends ComponentContext> implements Component {

protected ConnectorContext connectorContext;

@Override
public void init(ConnectorContext connectorContext) {
this.connectorContext = connectorContext;
}

protected ConnectorContext getConnectorContext() {
return connectorContext;
}

/**
* Returns a set of configurations for Tasks based on the current configuration,
* producing at most count configurations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.openmessaging.connector.api.component.Component;
import io.openmessaging.connector.api.component.ComponentContext;

public interface Task<R extends ComponentContext> extends Component<R> {
public interface Task<R extends ComponentContext> extends Component{

/**
* Should invoke before start the connector.
Expand All @@ -27,4 +27,11 @@ public interface Task<R extends ComponentContext> extends Component<R> {
*/
@Override
default void validate(KeyValue config) {}

/**
* Init the component
*
* @param context component context
*/
void init(R context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import java.util.Map;

/**
* sink task
* The sink task API definition is used to define the logic for data writing
*/
public abstract class SinkTask implements Task<SinkTaskContext> {

protected SinkTaskContext sinkTaskContext;

@Override public void init(SinkTaskContext sinkTaskContext) {
@Override
public void init(SinkTaskContext sinkTaskContext) {
this.sinkTaskContext = sinkTaskContext;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
import io.openmessaging.connector.api.data.ConnectRecord;
import java.util.List;

/**
* The source task API definition is used to define the logic for data pulling
*/
public abstract class SourceTask implements Task<SourceTaskContext> {

protected SourceTaskContext sourceTaskContext;

@Override
public void init(SourceTaskContext sourceTaskContext) {
this.sourceTaskContext = sourceTaskContext;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.connector.api.data;

import io.openmessaging.KeyValue;

import java.util.Map;

/**
* The topic parameter is mainly a compatible schema registry
* abstract converter
*/
public interface RecordConverter {


/**
* Config is used for parameter passing in the conversion process
* @param configs
*/
default void configure(Map<String, ?> configs) {

}

/**
* Convert ConnectRecord to byte[]
* @param topic the topic associated with the data
* @param schema record schema
* @param value record value
* @return
*/
byte[] fromConnectData(String topic, Schema schema, Object value);


/**
* The provided subject and extension may be used in the record as needed.
* @param topic the topic associated with the data
* @param extensions
* @param schema rocketmq connect record schema
* @param value rocketmq connect record value
* @return
*/
default byte[] fromConnectData(String topic, KeyValue extensions, Schema schema, Object value) {
return fromConnectData(topic, schema, value);
}

/**
* Convert a byte[] object to a Rocketmq Connect data object.
*
* @param topic the topic associated with the data
* @param value the value to convert
* @return an object containing the {@link Schema} and the converted value
*/
SchemaAndValue toConnectData(String topic, byte[] value);


/**
* The provided subject and extension may be used in the record as needed.
* @param topic the topic associated with the data
* @param extensions transform property
* @param value
* @return
*/
default SchemaAndValue toConnectData(String topic, KeyValue extensions, byte[] value) {
return toConnectData(topic, value);
}

}

0 comments on commit 949e27b

Please sign in to comment.