Skip to content

Commit

Permalink
[CDAP-20910] Reuse the Spark ClassLoader in app-fabric
Browse files Browse the repository at this point in the history
- For non local spark program execution purposes, we don’t need to isolate the Spark classloader, since there is no creation of the SparkContext object.
  • Loading branch information
chtyim committed Dec 4, 2023
1 parent 1be1fa0 commit 9ddb843
Show file tree
Hide file tree
Showing 8 changed files with 460 additions and 356 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.cdap.cdap.internal.app.runtime.ProgramClassLoader;

/**
* A provider for for program classloading creation.
* A provider for program classloading creation.
*/
public interface ProgramClassLoaderProvider {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,22 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Provider for runtime system of programs.
*/
/** Provider for runtime system of programs. */
public interface ProgramRuntimeProvider {

/**
* Annotation for implementation to specify what are the supported {@link ProgramType}.
*/
/** Annotation for implementation to specify what are the supported {@link ProgramType}. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SupportedProgramType {

/**
* Returns the list of supported {@link ProgramType}.
*/
/** Returns the list of supported {@link ProgramType}. */
ProgramType[] value();
}

/**
* The execution mode of the program runtime system.
*/
/** The execution mode of the program runtime system. */
enum Mode {
LOCAL, DISTRIBUTED
LOCAL,
DISTRIBUTED
}

/**
Expand All @@ -70,12 +63,15 @@ enum Mode {
boolean isSupported(ProgramType programType, CConfiguration cConf);

/**
* Creates a ClassLoader for the given program type. This is useful if you need the program class
* loader but do not need to run a program.
* Returns a ClassLoader for the given program type. This is useful if you only need the runtime
* classloader for the given program type, but not for program execution.
*
* @param cConf The configuration to use
* @param programType The type of program
* @param cConf The configuration to use
* @return a {@link ClassLoader} for the given program runner
* @throws UnsupportedOperationException if the given program type is not supported by this
* provider. Caller can use the {@link #isSupported(ProgramType, CConfiguration)} method to
* check.
*/
ClassLoader createProgramClassLoader(CConfiguration cConf, ProgramType programType);
ClassLoader getRuntimeClassLoader(ProgramType programType, CConfiguration cConf);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.cdap.cdap.internal.app.runtime.ProgramRuntimeProviderLoader;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.security.impersonation.EntityImpersonator;
import java.io.Closeable;
import java.io.File;
import java.util.Iterator;
import javax.annotation.Nullable;
Expand All @@ -48,21 +47,21 @@ final class ArtifactClassLoaderFactory {
private static final Logger LOG = LoggerFactory.getLogger(ArtifactClassLoaderFactory.class);

private final CConfiguration cConf;
@Nullable
private final ProgramRuntimeProviderLoader programRuntimeProviderLoader;
@Nullable private final ProgramRuntimeProviderLoader programRuntimeProviderLoader;
private final File tmpDir;

@VisibleForTesting
ArtifactClassLoaderFactory(CConfiguration cConf) {
this(cConf, null);
}

ArtifactClassLoaderFactory(CConfiguration cConf,
@Nullable ProgramRuntimeProviderLoader programRuntimeProviderLoader) {
ArtifactClassLoaderFactory(
CConfiguration cConf, @Nullable ProgramRuntimeProviderLoader programRuntimeProviderLoader) {
this.cConf = cConf;
this.programRuntimeProviderLoader = programRuntimeProviderLoader;
this.tmpDir = new File(cConf.get(Constants.CFG_LOCAL_DATA_DIR),
cConf.get(Constants.AppFabric.TEMP_DIR)).getAbsoluteFile();
this.tmpDir =
new File(cConf.get(Constants.CFG_LOCAL_DATA_DIR), cConf.get(Constants.AppFabric.TEMP_DIR))
.getAbsoluteFile();
}

/**
Expand All @@ -77,35 +76,30 @@ final class ArtifactClassLoaderFactory {
* {@link ClassLoader}, all temporary resources created for the classloader will be removed
*/
CloseableClassLoader createClassLoader(File unpackDir) {
ClassLoader sparkClassLoader = null;
ClassLoader parentClassLoader = null;
if (programRuntimeProviderLoader != null) {
try {
// Try to create a ProgramClassLoader from the Spark runtime system if it is available.
// It is needed because we don't know what program types that an artifact might have.
// TODO: CDAP-5613. We shouldn't always expose the Spark classes.
sparkClassLoader = programRuntimeProviderLoader.get(ProgramType.SPARK)
.createProgramClassLoader(cConf, ProgramType.SPARK);
parentClassLoader =
programRuntimeProviderLoader
.get(ProgramType.SPARK)
.getRuntimeClassLoader(ProgramType.SPARK, cConf);
} catch (Exception e) {
// If Spark is not supported, exception is expected. We'll use the default filter.
LOG.warn("Spark is not supported. Not using ProgramClassLoader from Spark");
LOG.trace("Failed to create spark program runner with error:", e);
}
}

ProgramClassLoader programClassLoader = null;
if (sparkClassLoader != null) {
programClassLoader = new ProgramClassLoader(cConf, unpackDir, sparkClassLoader);
} else {
programClassLoader = new ProgramClassLoader(cConf, unpackDir,
FilterClassLoader.create(getClass().getClassLoader()));
if (parentClassLoader == null) {
parentClassLoader = FilterClassLoader.create(getClass().getClassLoader());
}

final ClassLoader finalProgramClassLoader = programClassLoader;
return new CloseableClassLoader(programClassLoader, () -> {
if (finalProgramClassLoader instanceof Closeable) {
Closeables.closeQuietly((Closeable) finalProgramClassLoader);
}
});
ProgramClassLoader programClassLoader =
new ProgramClassLoader(cConf, unpackDir, parentClassLoader);
return new CloseableClassLoader(
programClassLoader, () -> Closeables.closeQuietly(programClassLoader));
}

/**
Expand All @@ -117,18 +111,22 @@ CloseableClassLoader createClassLoader(File unpackDir) {
* {@link ClassLoader}, all temporary resources created for the classloader will be removed
* @see #createClassLoader(File)
*/
CloseableClassLoader createClassLoader(Location artifactLocation,
EntityImpersonator entityImpersonator) {
CloseableClassLoader createClassLoader(
Location artifactLocation, EntityImpersonator entityImpersonator) {
try {
ClassLoaderFolder classLoaderFolder = entityImpersonator.impersonate(
() -> BundleJarUtil.prepareClassLoaderFolder(artifactLocation,
() -> DirUtils.createTempDir(tmpDir)));
ClassLoaderFolder classLoaderFolder =
entityImpersonator.impersonate(
() ->
BundleJarUtil.prepareClassLoaderFolder(
artifactLocation, () -> DirUtils.createTempDir(tmpDir)));

CloseableClassLoader classLoader = createClassLoader(classLoaderFolder.getDir());
return new CloseableClassLoader(classLoader, () -> {
Closeables.closeQuietly(classLoader);
Closeables.closeQuietly(classLoaderFolder);
});
return new CloseableClassLoader(
classLoader,
() -> {
Closeables.closeQuietly(classLoader);
Closeables.closeQuietly(classLoaderFolder);
});
} catch (Exception e) {
throw Throwables.propagate(e);
}
Expand All @@ -143,8 +141,8 @@ CloseableClassLoader createClassLoader(Location artifactLocation,
* {@link ClassLoader}, all temporary resources created for the classloader will be removed
* @see #createClassLoader(File)
*/
CloseableClassLoader createClassLoader(Iterator<Location> artifactLocations,
EntityImpersonator entityImpersonator) {
CloseableClassLoader createClassLoader(
Iterator<Location> artifactLocations, EntityImpersonator entityImpersonator) {
if (!artifactLocations.hasNext()) {
throw new IllegalArgumentException("Cannot create a classloader without an artifact.");
}
Expand All @@ -155,17 +153,20 @@ CloseableClassLoader createClassLoader(Iterator<Location> artifactLocations,
}

try {
ClassLoaderFolder classLoaderFolder = entityImpersonator.impersonate(
() -> BundleJarUtil.prepareClassLoaderFolder(artifactLocation,
() -> DirUtils.createTempDir(tmpDir)));

CloseableClassLoader parentClassLoader = createClassLoader(artifactLocations,
entityImpersonator);
return new CloseableClassLoader(new DirectoryClassLoader(classLoaderFolder.getDir(),
parentClassLoader, "lib"), () -> {
Closeables.closeQuietly(parentClassLoader);
Closeables.closeQuietly(classLoaderFolder);
});
ClassLoaderFolder classLoaderFolder =
entityImpersonator.impersonate(
() ->
BundleJarUtil.prepareClassLoaderFolder(
artifactLocation, () -> DirUtils.createTempDir(tmpDir)));

CloseableClassLoader parentClassLoader =
createClassLoader(artifactLocations, entityImpersonator);
return new CloseableClassLoader(
new DirectoryClassLoader(classLoaderFolder.getDir(), parentClassLoader, "lib"),
() -> {
Closeables.closeQuietly(parentClassLoader);
Closeables.closeQuietly(classLoaderFolder);
});
} catch (Exception e) {
throw Throwables.propagate(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.internal.utils.Instances;

/**
* Default implementation of {@link MasterEnvironmentRunnableContext}.
*/
/** Default implementation of {@link MasterEnvironmentRunnableContext}. */
public class DefaultMasterEnvironmentRunnableContext implements MasterEnvironmentRunnableContext {

private final LocationFactory locationFactory;
Expand All @@ -46,13 +44,14 @@ public class DefaultMasterEnvironmentRunnableContext implements MasterEnvironmen

private ClassLoader extensionCombinedClassLoader;

public DefaultMasterEnvironmentRunnableContext(LocationFactory locationFactory,
public DefaultMasterEnvironmentRunnableContext(

Check warning on line 47 in cdap-app-fabric/src/main/java/io/cdap/cdap/master/environment/DefaultMasterEnvironmentRunnableContext.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocMethodCheck

Missing a Javadoc comment.
LocationFactory locationFactory,
RemoteClientFactory remoteClientFactory,
CConfiguration cConf) {
this.locationFactory = locationFactory;
this.remoteClient = remoteClientFactory.createRemoteClient(
Constants.Service.APP_FABRIC_HTTP,
new DefaultHttpRequestConfig(false), "");
this.remoteClient =
remoteClientFactory.createRemoteClient(
Constants.Service.APP_FABRIC_HTTP, new DefaultHttpRequestConfig(false), "");
this.cConf = cConf;
this.programRuntimeProviderLoader = new ProgramRuntimeProviderLoader(cConf);
this.extensionCombinedClassLoader = null;
Expand All @@ -63,9 +62,7 @@ public LocationFactory getLocationFactory() {
return locationFactory;
}

/**
* Opens a {@link HttpURLConnection} for the given resource path.
*/
/** Opens a {@link HttpURLConnection} for the given resource path. */
@Override
public HttpURLConnection openHttpURLConnection(String resource) throws IOException {
return remoteClient.openConnection(resource);
Expand All @@ -79,20 +76,22 @@ public TwillRunnable instantiateTwillRunnable(String className) {
} catch (ClassNotFoundException e) {
// Try loading the class from the runtime extensions.
if (extensionCombinedClassLoader == null) {
Map<ProgramType, ProgramRuntimeProvider> classLoaderProviderMap = programRuntimeProviderLoader.getAll();
extensionCombinedClassLoader = new CombineClassLoader(getClass().getClassLoader(),
classLoaderProviderMap.entrySet().stream()
.map(entry -> entry.getValue()
.createProgramClassLoader(cConf, entry.getKey()))
.collect(Collectors.toList())
.toArray(new ClassLoader[0]));
Map<ProgramType, ProgramRuntimeProvider> classLoaderProviderMap =
programRuntimeProviderLoader.getAll();
extensionCombinedClassLoader =
new CombineClassLoader(
getClass().getClassLoader(),
classLoaderProviderMap.entrySet().stream()
.map(entry -> entry.getValue().getRuntimeClassLoader(entry.getKey(), cConf))
.collect(Collectors.toList()));
}
try {
runnableClass = extensionCombinedClassLoader.loadClass(className);
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException(
String.format("Failed to load twill runnable class from runtime extension '%s'",
className), cnfe);
String.format(
"Failed to load twill runnable class from runtime extension '%s'", className),
cnfe);
}
}
if (!TwillRunnable.class.isAssignableFrom(runnableClass)) {
Expand Down
Loading

0 comments on commit 9ddb843

Please sign in to comment.