Commit

some commit

itsankit-google committed May 28, 2024

1 parent 02fa2d3 commit 9b40ba8
Showing 5 changed files with 214 additions and 17 deletions.
@@ -0,0 +1,86 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.batch;

import io.cdap.cdap.api.exception.WrappedException;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

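/**
 * An {@link InputFormat} that delegates all calls to another {@link InputFormat} and, when a
 * stage name is provided, rethrows any failure as a {@link WrappedException} carrying that
 * stage name, so the failure can be attributed to the corresponding pipeline stage.
 *
 * @param <K> the type of key returned by the delegate
 * @param <V> the type of value returned by the delegate
 */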
public class WrappedInputFormat<K, V> extends InputFormat<K, V> {
  private final InputFormat<K, V> inputFormat;
  private final String stageName;

  @Override
  public List<InputSplit> getSplits(JobContext jobContext)
      throws IOException, InterruptedException {
    try {
      return inputFormat.getSplits(jobContext);
    } catch (Exception e) {
      if (stageName != null) {
        throw new WrappedException(e, stageName);
      }
      throw e;
    }
  }

  @Override
  public RecordReader<K, V> createRecordReader(InputSplit inputSplit,
      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
    try {
      return new WrappedRecordReader<>(inputFormat.createRecordReader(inputSplit,
          taskAttemptContext), stageName);
    } catch (Exception e) {
      if (stageName != null) {
        throw new WrappedException(e, stageName);
      }
      throw e;
    }
  }

  /**
   * Creates a {@code WrappedInputFormat} that delegates to the {@link InputFormat} named in the
   * current configuration.
   *
   * @param classLoader the {@link ClassLoader} for loading the input format class
   * @param inputFormatClassName the name of the {@link InputFormat} class
   * @param stageName the name of the stage to attribute failures to, or null if not known
   * @throws IOException if the input format class could not be instantiated
   */
  public WrappedInputFormat(ClassLoader classLoader, String inputFormatClassName,
      String stageName) throws IOException {
    this.stageName = stageName;
    if (inputFormatClassName == null) {
      throw new IllegalArgumentException("Missing configuration for the InputFormat to use");
    }
    if (inputFormatClassName.equals(getClass().getName())) {
      throw new IllegalArgumentException("Cannot delegate InputFormat to the same class");
    }
    try {
      //noinspection unchecked
      @SuppressWarnings("unchecked")
      Class<InputFormat<K, V>> inputFormatClass = (Class<InputFormat<K, V>>) classLoader.loadClass(
          inputFormatClassName);
      this.inputFormat = inputFormatClass.newInstance();
    } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
      throw new IOException(
          String.format("Unable to instantiate delegate input format %s", inputFormatClassName), e);
    }
  }
}
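For orientation, here is a minimal usage sketch that is not part of this commit: the delegate class (Hadoop's TextInputFormat), the stage name "fileSource", and the standalone main method are illustrative assumptions. Because no input paths are configured, the delegate's getSplits() fails, and the wrapper is expected to rethrow that failure as a WrappedException tagged with the stage name.

import io.cdap.cdap.api.exception.WrappedException;
import io.cdap.cdap.etl.batch.WrappedInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class WrappedInputFormatSketch {

  public static void main(String[] args) throws Exception {
    // Delegate to a standard Hadoop InputFormat; "fileSource" is a hypothetical stage name.
    WrappedInputFormat<LongWritable, Text> wrapped = new WrappedInputFormat<>(
        Thread.currentThread().getContextClassLoader(),
        TextInputFormat.class.getName(), "fileSource");
    try {
      // The delegate's getSplits() throws because no input paths are set; the wrapper
      // rethrows it as a WrappedException carrying the stage name.
      wrapped.getSplits(Job.getInstance());
    } catch (WrappedException e) {
      System.err.println("Failure attributed to a stage: " + e.getMessage());
    }
  }
}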
@@ -0,0 +1,106 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.batch;

import io.cdap.cdap.api.exception.WrappedException;
import java.io.IOException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

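/**
 * A {@link RecordReader} that delegates all calls to another {@link RecordReader} and, when a
 * stage name is provided, rethrows any failure as a {@link WrappedException} carrying that
 * stage name.
 *
 * @param <K> the type of key read by the delegate
 * @param <V> the type of value read by the delegate
 */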
public class WrappedRecordReader<K, V> extends RecordReader<K, V> {
  private final RecordReader<K, V> recordReader;
  private final String stageName;

  public WrappedRecordReader(RecordReader<K, V> recordReader, String stageName) {
    this.recordReader = recordReader;
    this.stageName = stageName;
  }

  @Override
  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
    try {
      recordReader.initialize(inputSplit, taskAttemptContext);
    } catch (Exception e) {
      if (stageName != null) {
        throw new WrappedException(e, stageName);
      }
      throw e;
    }
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    try {
      return recordReader.nextKeyValue();
    } catch (Exception e) {
      if (stageName != null) {
        throw new WrappedException(e, stageName);
      }
      throw e;
    }
  }

  @Override
  public K getCurrentKey() throws IOException, InterruptedException {
    try {
      return recordReader.getCurrentKey();
    } catch (Exception e) {
      if (stageName != null) {
        throw new WrappedException(e, stageName);
      }
      throw e;
    }
  }

  @Override
  public V getCurrentValue() throws IOException, InterruptedException {
    try {
      return recordReader.getCurrentValue();
    } catch (Exception e) {
      if (stageName != null) {
        throw new WrappedException(e, stageName);
      }
      throw e;
    }
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    try {
      return recordReader.getProgress();
    } catch (Exception e) {
      if (stageName != null) {
        throw new WrappedException(e, stageName);
      }
      throw e;
    }
  }

  @Override
  public void close() throws IOException {
    try {
      recordReader.close();
    } catch (Exception e) {
      if (stageName != null) {
        throw new WrappedException(e, stageName);
      }
      throw e;
    }
  }
}
@@ -347,7 +347,7 @@ private <T> BatchCollectionFactory<T> pullInternal(SQLDataset dataset) throws SQ
     ClassLoader classLoader = Objects.firstNonNull(Thread.currentThread().getContextClassLoader(),
         getClass().getClassLoader());
     JavaPairRDD pairRDD = RDDUtils.readUsingInputFormat(jsc, sqlPullDataset, classLoader, Object.class,
-        Object.class);
+        Object.class, null);
     JavaRDD<T> rdd = pairRDD.flatMap(new TransformFromPairFunction(sqlPullDataset.fromKeyValue()))
         .map(f -> {
           return f;
@@ -16,12 +16,12 @@

 package io.cdap.cdap.etl.spark.batch;
 
-import com.google.common.base.Throwables;
 import io.cdap.cdap.api.data.batch.InputFormatProvider;
 import io.cdap.cdap.api.data.batch.OutputFormatProvider;
+import io.cdap.cdap.api.exception.WrappedException;
+import io.cdap.cdap.etl.batch.WrappedInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -31,6 +31,7 @@

 import java.io.IOException;
 import java.util.Map;
+import javax.annotation.Nullable;

/**
* Common RDD operations
@@ -60,21 +61,22 @@ public static <K, V> void saveHadoopDataset(JavaPairRDD<K, V> rdd, Configuration

   @SuppressWarnings("unchecked")
   public static <K, V> JavaPairRDD<K, V> readUsingInputFormat(JavaSparkContext jsc,
-      InputFormatProvider inputFormatProvider,
-      ClassLoader classLoader,
-      Class<K> keyClass, Class<V> valueClass) {
+      InputFormatProvider inputFormatProvider, ClassLoader classLoader,
+      Class<K> keyClass, Class<V> valueClass, @Nullable String stageName) {
     Configuration hConf = new Configuration();
     hConf.clear();
     for (Map.Entry<String, String> entry : inputFormatProvider.getInputFormatConfiguration().entrySet()) {
       hConf.set(entry.getKey(), entry.getValue());
     }
     try {
+      // Instantiate the wrapper class, passing the necessary parameters
       @SuppressWarnings("unchecked")
-      Class<InputFormat> inputFormatClass = (Class<InputFormat>) classLoader.loadClass(
-        inputFormatProvider.getInputFormatClassName());
-      return jsc.newAPIHadoopRDD(hConf, inputFormatClass, keyClass, valueClass);
-    } catch (ClassNotFoundException e) {
-      throw Throwables.propagate(e);
+      WrappedInputFormat wrappedInputFormat = new WrappedInputFormat<>(classLoader,
+          inputFormatProvider.getInputFormatClassName(), stageName);
+
+      return jsc.newAPIHadoopRDD(hConf, wrappedInputFormat.getClass(), keyClass, valueClass);
+    } catch (IOException e) {
+      throw new WrappedException(e, stageName);
     }
   }
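A rough caller-side sketch of the new readUsingInputFormat signature, not part of this commit: the helper class, the provider argument, and the stage name "mySource" are assumptions, and the class is placed in the same package only so the sketch does not depend on RDDUtils' visibility. The SQL engine pull path above passes null because it has no single stage to attribute failures to.

package io.cdap.cdap.etl.spark.batch;

import io.cdap.cdap.api.data.batch.InputFormatProvider;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ReadWithStageNameSketch {

  // 'jsc' and 'provider' are assumed to be supplied by the pipeline runtime.
  static JavaPairRDD<Object, Object> read(JavaSparkContext jsc, InputFormatProvider provider) {
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    // Passing a stage name means a failure while creating or reading the input surfaces as a
    // WrappedException naming that stage; callers without a stage pass null instead.
    return RDDUtils.readUsingInputFormat(jsc, provider, classLoader,
        Object.class, Object.class, "mySource");
  }
}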

@@ -144,24 +144,27 @@ public <K, V> JavaPairRDD<K, V> createRDD(JavaSparkExecutionContext sec, JavaSpa
     }
 
     Iterator<String> inputsIter = inputNames.iterator();
-    JavaPairRDD<K, V> inputRDD = createInputRDD(sec, jsc, inputsIter.next(), keyClass, valueClass);
+    JavaPairRDD<K, V> inputRDD = createInputRDD(sec, jsc, inputsIter.next(), keyClass, valueClass,
+        sourceName);
     while (inputsIter.hasNext()) {
-      inputRDD = inputRDD.union(createInputRDD(sec, jsc, inputsIter.next(), keyClass, valueClass));
+      inputRDD = inputRDD.union(createInputRDD(sec, jsc, inputsIter.next(), keyClass, valueClass,
+          sourceName));
     }
     return inputRDD;
   }
 
   @SuppressWarnings("unchecked")
-  private <K, V> JavaPairRDD<K, V> createInputRDD(JavaSparkExecutionContext sec, JavaSparkContext jsc, String inputName,
-                                                  Class<K> keyClass, Class<V> valueClass) {
+  private <K, V> JavaPairRDD<K, V> createInputRDD(JavaSparkExecutionContext sec,
+      JavaSparkContext jsc, String inputName, Class<K> keyClass,
+      Class<V> valueClass, String sourceName) {
     if (inputFormatProviders.containsKey(inputName)) {
       InputFormatProvider inputFormatProvider = inputFormatProviders.get(inputName);
 
       ClassLoader classLoader = Objects.firstNonNull(currentThread().getContextClassLoader(),
-        getClass().getClassLoader());
+          getClass().getClassLoader());
 
       return RDDUtils.readUsingInputFormat(jsc, inputFormatProvider, classLoader, keyClass,
-        valueClass);
+          valueClass, sourceName);
     }
 
     if (datasetInfos.containsKey(inputName)) {