Skip to content

Commit

Permalink
TEZ-4470: typo fixes (#264) (Michal Lorek reviewed by Laszlo Bodor)
Browse files Browse the repository at this point in the history
  • Loading branch information
mlorek authored Jun 12, 2023
1 parent 6722ef3 commit 987ec54
Show file tree
Hide file tree
Showing 65 changed files with 630 additions and 631 deletions.
8 changes: 4 additions & 4 deletions docs/src/site/markdown/tez_acls.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@

Access control in Tez can be categorized as follows:

- Modify permissions on the Tez AM ( or Session ). Users with this permision can:
- Modify permissions on the Tez AM ( or Session ). Users with this permission can:
- Submit a DAG to a Tez Session
- Kill any DAG within the given AM/Session
- Kill the Session
- View permissions on the Tez AM ( or Session ). Users with this permision can:
- View permissions on the Tez AM ( or Session ). Users with this permission can:
- Monitor/View the status of the Session
- Monitor/View the progress/status of any DAG within the given AM/Session
- Modify permissions on a particular Tez DAG. Users with this permision can:
- Modify permissions on a particular Tez DAG. Users with this permission can:
- Kill the DAG
- View permissions on a particular Tez DAG. Users with this permision can:
- View permissions on a particular Tez DAG. Users with this permission can:
- Monitor/View the progress/status of the DAG

From the above, you can see that All users/groups that have access to do operations on the AM also have access to similar operations on all DAGs within that AM/session. Also, by default, the owner of the Tez AM, i.e. the user who started the Tez AM, is considered a super-user and has access to all operations on the AM as well as all DAGs within the AM/Session.
Expand Down
68 changes: 34 additions & 34 deletions tez-api/src/main/java/org/apache/tez/client/TezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class TezClient {
private static final String appIdStrPrefix = "application";
private static final String APPLICATION_ID_PREFIX = appIdStrPrefix + '_';
private static final long PREWARM_WAIT_MS = 500;

@VisibleForTesting
static final String NO_CLUSTER_DIAGNOSTICS_MSG = "No cluster diagnostics found.";

Expand Down Expand Up @@ -303,7 +303,7 @@ public static TezClient create(String name, TezConfiguration tezConf, boolean is
}

/**
* Add local files for the DAG App Master. These may be files, archives,
* Add local files for the DAG App Master. These may be files, archives,
* jars etc.<br>
* <p>
* In non-session mode these will be added to the files of the App Master
Expand All @@ -321,7 +321,7 @@ public static TezClient create(String name, TezConfiguration tezConf, boolean is
* accumulate across DAG submissions and are never removed from the classpath.
* Only LocalResourceType.FILE is supported. All files will be treated as
* private.
*
*
* @param localFiles the files to be made available in the AM
*/
public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> localFiles) {
Expand All @@ -331,7 +331,7 @@ public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> local
}
amConfig.addAMLocalResources(localFiles);
}

/**
* If the next DAG App Master needs different local files, then use this
* method to clear the local files and then add the new local files
Expand All @@ -341,7 +341,7 @@ public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> local
public synchronized void clearAppMasterLocalFiles() {
amConfig.clearAMLocalResources();
}

/**
* Set security credentials to be used inside the app master, if needed. Tez App
* Master needs credentials to access the staging directory and for most HDFS
Expand All @@ -351,7 +351,7 @@ public synchronized void clearAppMasterLocalFiles() {
* credentials must be supplied by the user. These will be used by the App
* Master for the next DAG. <br>In session mode, credentials, if needed, must be
* set before calling start()
*
*
* @param credentials credentials
*/
public synchronized void setAppMasterCredentials(Credentials credentials) {
Expand Down Expand Up @@ -387,16 +387,16 @@ public synchronized void start() throws TezException, IOException {
LOG.info("Session mode. Starting session.");
TezClientUtils.processTezLocalCredentialsFile(sessionCredentials,
amConfig.getTezConfiguration());

clientTimeout = amConfig.getTezConfiguration().getInt(
TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS,
TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);

try {
if (sessionAppId == null) {
sessionAppId = createApplication();
}

ApplicationSubmissionContext appContext = setupApplicationContext();
frameworkClient.submitApplication(appContext);
ApplicationReport appReport = frameworkClient.getApplicationReport(sessionAppId);
Expand Down Expand Up @@ -597,15 +597,15 @@ public DAGClientAMProtocolBlockingPB sendAMHeartbeat(DAGClientAMProtocolBlocking
* cluster.<br>In session mode, it submits the DAG to the session App Master. It
* blocks until either the DAG is submitted to the session or configured
* timeout period expires. Cleans up session if the submission timed out.
*
*
* @param dag
* DAG to be submitted to Session
* @return DAGClient to monitor the DAG
* @throws TezException
* @throws IOException
* @throws DAGSubmissionTimedOut
* if submission timed out
*/
*/
public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException {
DAGClient result = isSession ? submitDAGSession(dag) : submitDAGApplication(dag);
if (result != null) {
Expand Down Expand Up @@ -642,12 +642,12 @@ private void closePrewarmDagClient() {
}
prewarmDagClient = null;
}

private DAGClient submitDAGSession(DAG dag) throws TezException, IOException {
Preconditions.checkState(isSession == true,
"submitDAG with additional resources applies to only session mode. " +
Preconditions.checkState(isSession,
"submitDAG with additional resources applies to only session mode. " +
"In non-session mode please specify all resources in the initial configuration");

verifySessionStateForSubmission();

String callerContextStr = "";
Expand All @@ -659,7 +659,7 @@ private DAGClient submitDAGSession(DAG dag) throws TezException, IOException {
+ ", applicationId=" + sessionAppId
+ ", dagName=" + dag.getName()
+ callerContextStr);

if (!additionalLocalResources.isEmpty()) {
for (LocalResource lr : additionalLocalResources.values()) {
Preconditions.checkArgument(lr.getType() == LocalResourceType.FILE, "LocalResourceType: "
Expand All @@ -677,7 +677,7 @@ private DAGClient submitDAGSession(DAG dag) throws TezException, IOException {
requestBuilder.setAdditionalAmResources(DagTypeConverters
.convertFromLocalResources(additionalLocalResources));
}

additionalLocalResources.clear();

// if request size exceeds maxSubmitDAGRequestSizeThroughIPC, we serialize them to HDFS
Expand Down Expand Up @@ -804,7 +804,7 @@ private boolean isJobInTerminalState(YarnApplicationState yarnApplicationState)
public String getClientName() {
return clientName;
}

@Private
@VisibleForTesting
public synchronized ApplicationId getAppMasterApplicationId() {
Expand All @@ -817,16 +817,16 @@ public synchronized ApplicationId getAppMasterApplicationId() {

/**
* Get the status of the App Master executing the DAG
* In non-session mode it returns the status of the last submitted DAG App Master
* In non-session mode it returns the status of the last submitted DAG App Master
* In session mode, it returns the status of the App Master hosting the session
*
*
* @return State of the session
* @throws TezException
* @throws IOException
*/
public synchronized TezAppMasterStatus getAppMasterStatus() throws TezException, IOException {
// Supporting per-DAG app master case since user may choose to run the same
// code in that mode and the code should continue to work. Its easy to provide
// Supporting per-DAG app master case since user may choose to run the same
// code in that mode and the code should continue to work. Its easy to provide
// the correct view for per-DAG app master too.
ApplicationId appId = null;
if (isSession) {
Expand Down Expand Up @@ -869,21 +869,21 @@ public synchronized TezAppMasterStatus getAppMasterStatus() throws TezException,
}
return TezAppMasterStatus.INITIALIZING;
}

/**
* API to help pre-allocate containers in session mode. In non-session mode
* this is ignored. The pre-allocated containers may be re-used by subsequent
* job DAGs to improve performance.
* this is ignored. The pre-allocated containers may be re-used by subsequent
* job DAGs to improve performance.
* The preWarm vertex should be configured and setup exactly
* like the other vertices in the job DAGs so that the pre-allocated containers
* like the other vertices in the job DAGs so that the pre-allocated containers
* may be re-used by the subsequent DAGs to improve performance.
* The processor for the preWarmVertex may be used to pre-warm the containers
* by pre-loading classes etc. It should be short-running so that pre-warming
* by pre-loading classes etc. It should be short-running so that pre-warming
* does not block real execution. Users can specify their custom processors or
* use the PreWarmProcessor from the runtime library.
* The parallelism of the preWarmVertex will determine the number of preWarmed
* containers.
* Pre-warming is best efforts and among other factors is limited by the free
* Pre-warming is best efforts and among other factors is limited by the free
* resources on the cluster.
* @param preWarmVertex
* @throws TezException
Expand Down Expand Up @@ -929,7 +929,7 @@ public synchronized void preWarm(PreWarmVertex preWarmVertex,
}

verifySessionStateForSubmission();

DAG dag = org.apache.tez.dag.api.DAG.create(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX + "_"
+ preWarmDAGCounter++);
dag.addVertex(preWarmVertex);
Expand All @@ -948,15 +948,15 @@ public synchronized void preWarm(PreWarmVertex preWarmVertex,
}
}


/**
* Wait till the DAG is ready to be submitted.
* In non-session mode this is a no-op since the application can be immediately
* submitted.
* In session mode, this waits for the session host to be ready to accept a DAG
* @throws IOException
* @throws TezException
* @throws InterruptedException
* @throws InterruptedException
*/
@Evolving
public synchronized void waitTillReady() throws IOException, TezException, InterruptedException {
Expand Down Expand Up @@ -1042,7 +1042,7 @@ private void verifySessionStateForSubmission() throws SessionNotRunning {
throw new SessionNotRunning("Session stopped by user");
}
}

private DAGClient submitDAGApplication(DAG dag)
throws TezException, IOException {
ApplicationId appId = createApplication();
Expand All @@ -1055,7 +1055,7 @@ DAGClient submitDAGApplication(ApplicationId appId, DAG dag)
LOG.info("Submitting DAG application with id: " + appId);
try {
// Use the AMCredentials object in client mode, since this won't be re-used.
// Ensures we don't fetch credentially unnecessarily if the user has already provided them.
// Ensures we don't fetch credentials unnecessarily if the user has already provided them.
Credentials credentials = amConfig.getCredentials();
if (credentials == null) {
credentials = new Credentials();
Expand Down Expand Up @@ -1259,7 +1259,7 @@ public TezClient build() {
}
}

//Copied this helper method from
//Copied this helper method from
//org.apache.hadoop.yarn.api.records.ApplicationId in Hadoop 2.8+
//to simplify implementation on 2.7.x
@Public
Expand Down
28 changes: 14 additions & 14 deletions tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private static FileStatus[] getLRFileStatus(String fileName, Configuration conf)
FileSystem fs = p.getFileSystem(conf);
p = fs.resolvePath(p.makeQualified(fs.getUri(),
fs.getWorkingDirectory()));
FileSystem targetFS = p.getFileSystem(conf);
FileSystem targetFS = p.getFileSystem(conf);
if (targetFS.isDirectory(p)) {
return targetFS.listStatus(p);
} else {
Expand All @@ -150,7 +150,7 @@ private static FileStatus[] getLRFileStatus(String fileName, Configuration conf)

/**
* Setup LocalResource map for Tez jars based on provided Configuration
*
*
* @param conf
* Configuration to use to access Tez jars' locations
* @param credentials
Expand All @@ -167,7 +167,7 @@ static boolean setupTezJarsLocalResources(TezConfiguration conf,
boolean usingTezArchive = false;

if (conf.getBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, false)){
LOG.info("Ignoring '" + TezConfiguration.TEZ_LIB_URIS + "' since '" +
LOG.info("Ignoring '" + TezConfiguration.TEZ_LIB_URIS + "' since '" +
TezConfiguration.TEZ_IGNORE_LIB_URIS + "' is set to true");
} else {
// Add tez jars to local resource
Expand Down Expand Up @@ -357,7 +357,7 @@ public static FileSystem ensureStagingDirExists(Configuration conf,
}
return fs;
}

/**
* Populate {@link Credentials} for the URI's to access them from their {@link FileSystem}s
* @param uris URIs that need to be accessed
Expand Down Expand Up @@ -385,7 +385,7 @@ public Path apply(URI input) {
* Obtains tokens for the DAG based on the list of URIs setup in the DAG. The
* fetched credentials are populated back into the DAG and can be retrieved
* via dag.getCredentials
*
*
* @param dag
* the dag for which credentials need to be setup
* @param sessionCredentials
Expand All @@ -403,9 +403,9 @@ static Credentials setupDAGCredentials(DAG dag, Credentials sessionCredentials,
Credentials dagCredentials = new Credentials();
// All session creds are required for the DAG.
dagCredentials.mergeAll(sessionCredentials);

// Add additional credentials based on any URIs that the user may have specified.

// Obtain Credentials for any paths that the user may have configured.
addFileSystemCredentialsFromURIs(dag.getURIsForCredentials(), dagCredentials, conf);

Expand All @@ -425,7 +425,7 @@ static Credentials setupDAGCredentials(DAG dag, Credentials sessionCredentials,
addFileSystemCredentialsFromURIs(dataSink.getURIsForCredentials(), dagCredentials, conf);
}
}

for (LocalResource lr: dag.getTaskLocalFiles().values()) {
lrPaths.add(ConverterUtils.getPathFromYarnURL(lr.getResource()));
}
Expand All @@ -436,7 +436,7 @@ static Credentials setupDAGCredentials(DAG dag, Credentials sessionCredentials,
} catch (URISyntaxException e) {
throw new IOException(e);
}

return dagCredentials;
}

Expand Down Expand Up @@ -597,7 +597,7 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
if (amLocalResources != null && !amLocalResources.isEmpty()) {
amResourceProto = DagTypeConverters.convertFromLocalResources(amLocalResources);
} else {
amResourceProto = DAGProtos.PlanLocalResourcesProto.getDefaultInstance();
amResourceProto = DAGProtos.PlanLocalResourcesProto.getDefaultInstance();
}
amResourceProto.writeDelimitedTo(sessionJarsPBOutStream);
} finally {
Expand All @@ -619,7 +619,7 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
Map<ApplicationAccessType, String> acls = aclManager.toYARNACls();

if(dag != null) {

DAGPlan dagPB = prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive,
sessionCreds, servicePluginsDescriptor, javaOptsChecker);

Expand Down Expand Up @@ -749,7 +749,7 @@ private static void populateTokenCache(TezConfiguration conf, Credentials creden
TokenCache.obtainTokensForFileSystems(credentials, ps, conf);
}
}

static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
Credentials credentials, ServicePluginsDescriptor servicePluginsDescriptor,
Expand All @@ -760,7 +760,7 @@ static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
return dag.createDag(amConfig.getTezConfiguration(), dagCredentials, tezJarResources,
amConfig.getBinaryConfLR(), tezLrsAsArchive, servicePluginsDescriptor, javaOptsChecker);
}

static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) {
Objects.requireNonNull(vargs);
TezClientUtils.addLog4jSystemProperties(logLevel, vargs);
Expand Down Expand Up @@ -1119,7 +1119,7 @@ public static void setApplicationPriority(ApplicationSubmissionContext context,
int priority = amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_APPLICATION_PRIORITY, 0);
context.setPriority(Priority.newInstance(priority));
if (LOG.isDebugEnabled()) {
LOG.debug("Settting TEZ application priority, applicationId= " + context.getApplicationId() +
LOG.debug("Setting TEZ application priority, applicationId= " + context.getApplicationId() +
", priority= " + context.getPriority().getPriority());
}
}
Expand Down
Loading

0 comments on commit 987ec54

Please sign in to comment.