This project provides a Swift wrapper of the YARN Resource Manager REST API:
YARNResourceManager(): access to cluster information of YARN, including the cluster and its metrics, scheduler, application submission, etc.
To connect to your Hadoop YARN Resource Manager with Perfect, initialize a YARNResourceManager() object with sufficient parameters:
// this connection is sufficient for some basic operations
let yarn = YARNResourceManager(host: "yarn.somehadoopdomain.com", port: 8088)
or connect to the Hadoop YARN Resource Manager with a valid user name:
// add a user name if needed
let yarn = YARNResourceManager(host: "yarn.somehadoopdomain.com", port: 8088, user: "your user name")
If using Kerberos to authenticate, use the code below:
// set auth to Kerberos
let yarn = YARNResourceManager(host: "yarn.somehadoopdomain.com", port: 8088, user: "username", auth: .krb5)

The connection parameters are listed below:

Item | Data Type | Description |
---|---|---|
service | String | the service protocol of web request - http / https |
host | String | the hostname or ip address of the Hadoop YARN Resource Manager |
port | Int | the port of yarn host, default is 8088 |
auth | Authorization | .off or .krb5. Default value is .off |
proxyUser | String | proxy user, if applicable |
apibase | String | use this parameter ONLY if the target server exposes the API under a base path other than /ws/v1/cluster |
timeout | Int | timeout in seconds; zero means the transfer never times out |
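
To make the roles of service, host, port and apibase concrete, here is a minimal sketch of how they could combine into a base URL. It assumes the pieces are joined as service://host:port followed by apibase; the helper resourceManagerBaseURL is hypothetical and is not part of this library.

```swift
import Foundation

// Hypothetical helper for illustration only; it is not part of the library.
// Assumption: the parameters combine as service://host:port + apibase.
func resourceManagerBaseURL(service: String = "http",
                            host: String,
                            port: Int = 8088,
                            apibase: String = "/ws/v1/cluster") -> URL? {
    var components = URLComponents()
    components.scheme = service
    components.host = host
    components.port = port
    components.path = apibase
    return components.url
}

// Print the base URL built from the default service, port and apibase.
if let url = resourceManagerBaseURL(host: "yarn.somehadoopdomain.com") {
    print(url.absoluteString)
}
```

Keeping apibase as a separate parameter means a server that exposes the API under another path only needs that one value changed.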
Call checkClusterInfo() to get the general information of a YARN Resource Manager in the form of a ClusterInfo structure:
guard let i = try yarn.checkClusterInfo() else {
print("unable to check cluster info")
return
}//end guard
print(i.startedOn)
print(i.state)
print(i.hadoopVersion)
print(i.resourceManagerVersion)
Once called, checkClusterInfo() returns a ClusterInfo object, as described below:
Item | Data Type | Description |
---|---|---|
id | Int | The cluster id |
startedOn | Int | The time the cluster started (in ms since epoch) |
state | String | The ResourceManager state - valid values are: NOTINITED, INITED, STARTED, STOPPED |
haState | String | The ResourceManager HA state - valid values are: INITIALIZING, ACTIVE, STANDBY, STOPPED |
resourceManagerVersion | String | Version of the ResourceManager |
resourceManagerBuildVersion | String | ResourceManager build string with build version, user, and checksum |
resourceManagerVersionBuiltOn | String | Timestamp when ResourceManager was built (in ms since epoch) |
hadoopVersion | String | Version of hadoop common |
hadoopBuildVersion | String | Hadoop common build string with build version, user, and checksum |
hadoopVersionBuiltOn | String | Timestamp when hadoop common was built (in ms since epoch) |
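
Fields such as startedOn are expressed in milliseconds since the epoch, while Foundation's Date works in seconds. The sketch below shows one way to convert; the helper name date(fromMilliseconds:) and the sample value are made up for illustration.

```swift
import Foundation

// Hypothetical helper: converts a millisecond timestamp (as in startedOn)
// into a Foundation Date, which counts seconds since the epoch.
func date(fromMilliseconds milliseconds: Int) -> Date {
    return Date(timeIntervalSince1970: TimeInterval(milliseconds) / 1000.0)
}

// The sample value is illustrative, not taken from a real cluster.
let started = date(fromMilliseconds: 1_484_231_633_049)
print(started)
```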
Method checkClusterMetrics() returns detailed metrics of the cluster.
guard let m = try yarn.checkClusterMetrics() else {
// something wrong, must return
return
}
print(m.availableMB)
print(m.availableVirtualCores)
print(m.allocatedVirtualCores)
print(m.totalMB)
Once called, checkClusterMetrics() returns a ClusterMetrics object, as listed below:
Item | Data Type | Description |
---|---|---|
appsSubmitted | Int | The number of applications submitted |
appsCompleted | Int | The number of applications completed |
appsPending | Int | The number of applications pending |
appsRunning | Int | The number of applications running |
appsFailed | Int | The number of applications failed |
appsKilled | Int | The number of applications killed |
reservedMB | Int | The amount of memory reserved in MB |
availableMB | Int | The amount of memory available in MB |
allocatedMB | Int | The amount of memory allocated in MB |
totalMB | Int | The amount of total memory in MB |
reservedVirtualCores | Int | The number of reserved virtual cores |
availableVirtualCores | Int | The number of available virtual cores |
allocatedVirtualCores | Int | The number of allocated virtual cores |
totalVirtualCores | Int | The total number of virtual cores |
containersAllocated | Int | The number of containers allocated |
containersReserved | Int | The number of containers reserved |
containersPending | Int | The number of containers pending |
totalNodes | Int | The total number of nodes |
activeNodes | Int | The number of active nodes |
lostNodes | Int | The number of lost nodes |
unhealthyNodes | Int | The number of unhealthy nodes |
decommissionedNodes | Int | The number of nodes decommissioned |
rebootedNodes | Int | The number of nodes rebooted |
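
The metrics above are raw counters, so a common next step is to derive a utilization percentage from them. The following is a minimal sketch; MetricsSnapshot is a hypothetical stand-in that mirrors a few ClusterMetrics field names, and the numbers are invented.

```swift
// Hypothetical stand-in mirroring a few ClusterMetrics fields.
struct MetricsSnapshot {
    let allocatedMB: Int
    let totalMB: Int
    let allocatedVirtualCores: Int
    let totalVirtualCores: Int
}

// Returns the used share as a percentage, or nil when the total is zero.
func utilization(used: Int, total: Int) -> Double? {
    guard total > 0 else { return nil }
    return Double(used) / Double(total) * 100.0
}

// Invented sample values.
let snapshot = MetricsSnapshot(allocatedMB: 2048, totalMB: 8192,
                               allocatedVirtualCores: 2, totalVirtualCores: 8)
if let memory = utilization(used: snapshot.allocatedMB, total: snapshot.totalMB) {
    print("memory in use: \(memory)%")
}
```

Returning nil for a zero total avoids a division by zero on a cluster that reports no capacity.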
Method checkSchedulerInfo() returns detailed information about the scheduler.
guard let sch = try yarn.checkSchedulerInfo() else {
// something wrong, must return
return
}
print(sch.capacity)
print(sch.maxCapacity)
print(sch.queueName)
print(sch.queues.count)
Once done, checkSchedulerInfo() returns a SchedulerInfo structure, as listed below:
Item | Data Type | Description |
---|---|---|
availNodeCapacity | Int | The available node capacity |
capacity | Double | Configured queue capacity in percentage relative to its parent queue |
maxCapacity | Double | Max capacity of the queue |
maxQueueMemoryCapacity | Int | Maximum queue memory capacity |
minQueueMemoryCapacity | Int | Minimum queue memory capacity |
numContainers | Int | The number of containers |
numNodes | Int | The total number of nodes |
qstate | QState | State of the queue - valid values are: STOPPED, RUNNING |
queueName | String | Name of the queue |
queues | [Queue] | A collection of queue resources |
rootQueue | FairQueue | A collection of root queue resources |
totalNodeCapacity | Int | The total node capacity |
type | String | Scheduler type - capacityScheduler |
usedCapacity | Double | Used queue capacity in percentage |
usedNodeCapacity | Int | The used node capacity |

A FairQueue object contains the following fields:

Item | Data Type | Description |
---|---|---|
maxApps | Int | The maximum number of applications the queue can have |
minResources | ResourcesUsed | The configured minimum resources that are guaranteed to the queue |
maxResources | ResourcesUsed | The configured maximum resources that are allowed to the queue |
usedResources | ResourcesUsed | The sum of resources allocated to containers within the queue |
fairResources | ResourcesUsed | The queue’s fair share of resources |
clusterResources | ResourcesUsed | The capacity of the cluster |
queueName | String | The name of the queue |
schedulingPolicy | String | The name of the scheduling policy used by the queue |
childQueues | FairQueue | A collection of sub-queue information. Omitted if the queue has no childQueues. |
type | String | type of the queue - fairSchedulerLeafQueueInfo |
numActiveApps | Int | The number of active applications in this queue |
numPendingApps | Int | The number of pending applications in this queue |

A Queue object contains the following fields:

Item | Data Type | Description |
---|---|---|
absoluteCapacity | Double | Absolute capacity percentage this queue can use of entire cluster |
absoluteMaxCapacity | Double | Absolute maximum capacity percentage this queue can use of the entire cluster |
absoluteUsedCapacity | Double | Absolute used capacity percentage this queue is using of the entire cluster |
capacity | Double | Configured queue capacity in percentage relative to its parent queue |
maxActiveApplications | Int | The maximum number of active applications this queue can have |
maxActiveApplicationsPerUser | Int | The maximum number of active applications per user this queue can have |
maxApplications | Int | The maximum number of applications this queue can have |
maxApplicationsPerUser | Int | The maximum number of applications per user this queue can have |
maxCapacity | Double | Configured maximum queue capacity in percentage relative to its parent queue |
numActiveApplications | Int | The number of active applications in this queue |
numApplications | Int | The number of applications currently in the queue |
numContainers | Int | The number of containers being used |
numPendingApplications | Int | The number of pending applications in this queue |
queueName | String | The name of the queue |
queues | [Queue] | A collection of sub-queue information. Omitted if the queue has no sub-queues. |
resourcesUsed | ResourcesUsed | The total amount of resources used by this queue |
state | String | The state of the queue |
type | String | type of the queue - capacitySchedulerLeafQueueInfo |
usedCapacity | Double | Used queue capacity in percentage |
usedResources | String | A string describing the current resources used by the queue |
userLimit | Int | The minimum user limit percent set in the configuration |
userLimitFactor | Double | The user limit factor set in the configuration |
users | [User] | A collection of user objects containing resources used, see below: |

A User object contains the following fields:

Item | Data Type | Description |
---|---|---|
username | String | The username of the user using the resources |
resourcesUsed | ResourcesUsed | The amount of resources used by the user in this queue, see definition below |
numActiveApplications | Int | The number of active applications for this user in this queue |
numPendingApplications | Int | The number of pending applications for this user in this queue |

A ResourcesUsed object contains the following fields:

Item | Data Type | Description |
---|---|---|
memory | Int | The amount of memory used (in MB) |
vCores | Int | The number of virtual cores used |
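
Because a queue can contain sub-queues, which in turn can contain their own, walking the scheduler tree is naturally recursive. Here is a sketch of such a traversal; QueueNode is a hypothetical stand-in that only mirrors the queueName and queues fields, and the sample tree is invented.

```swift
// Hypothetical stand-in mirroring the queueName and queues fields.
struct QueueNode {
    let queueName: String
    let queues: [QueueNode]

    init(queueName: String, queues: [QueueNode] = []) {
        self.queueName = queueName
        self.queues = queues
    }
}

// Depth-first walk that returns every queue name, indented by nesting level.
func outline(of queue: QueueNode, depth: Int = 0) -> [String] {
    var lines = [String(repeating: "  ", count: depth) + queue.queueName]
    for child in queue.queues {
        lines += outline(of: child, depth: depth + 1)
    }
    return lines
}

// Invented sample tree.
let root = QueueNode(queueName: "root", queues: [
    QueueNode(queueName: "a", queues: [QueueNode(queueName: "a1")]),
    QueueNode(queueName: "b")
])
outline(of: root).forEach { print($0) }
```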
Method checkClusterNodes() returns an array of node information for the cluster.
let nodes = try yarn.checkClusterNodes()
nodes.forEach { node in
print(node.rack)
print(node.availableVirtualCores)
print(node.availMemoryMB)
print(node.healthReport)
print(node.healthStatus)
print(node.id)
print(node.lastHealthUpdate)
print(node.nodeHostName)
print(node.nodeHTTPAddress)
}
Once done, checkClusterNodes() returns an array of Node objects, as described below:
Item | Data Type | Description |
---|---|---|
rack | String | The rack location of this node |
state | String | State of the node - valid values are: NEW, RUNNING, UNHEALTHY, DECOMMISSIONED, LOST, REBOOTED |
id | String | The node id |
nodeHostName | String | The host name of the node |
nodeHTTPAddress | String | The node's HTTP address |
healthStatus | String | The health status of the node - Healthy or Unhealthy |
healthReport | String | A detailed health report |
lastHealthUpdate | Int | The last time the node reported its health (in ms since epoch) |
usedMemoryMB | Int | The total amount of memory currently used on the node (in MB) |
availMemoryMB | Int | The total amount of memory currently available on the node (in MB) |
usedVirtualCores | Int | The total number of vCores currently used on the node |
availableVirtualCores | Int | The total number of vCores available on the node |
numContainers | Int | The total number of containers currently running on the node |
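
With an array of nodes in hand, the state and availMemoryMB fields can be combined to answer questions such as how much memory is free on running nodes. The sketch below assumes a hypothetical NodeSummary stand-in with those field names; the sample nodes are invented.

```swift
// Hypothetical stand-in mirroring three Node fields.
struct NodeSummary {
    let id: String
    let state: String
    let availMemoryMB: Int
}

// Sums the available memory over nodes whose state is RUNNING.
func availableMemoryMB(onRunning nodes: [NodeSummary]) -> Int {
    return nodes
        .filter { $0.state == "RUNNING" }
        .reduce(0) { $0 + $1.availMemoryMB }
}

// Invented sample data.
let sampleNodes = [
    NodeSummary(id: "h1.domain.com:8041", state: "RUNNING", availMemoryMB: 4096),
    NodeSummary(id: "h2.domain.com:8041", state: "LOST", availMemoryMB: 8192),
    NodeSummary(id: "h3.domain.com:8041", state: "RUNNING", availMemoryMB: 1024)
]
print(availableMemoryMB(onRunning: sampleNodes))
```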
Method checkClusterNode() returns detailed information about a single node.
guard let n = try yarn.checkClusterNode(id: "host.domain.com:8041") else {
// something wrong, must return
return
}
Method checkApps() returns an array of APP structures.
let apps = try yarn.checkApps()
// or alternatively, filter the applications by setting query parameters:
// let apps = try yarn.checkApps(states: [APP.State.FINISHED, APP.State.RUNNING], finalStatus: APP.FinalStatus.SUCCEEDED)
apps.forEach{ a in
print(a.allocatedMB)
print(a.allocatedVCores)
print(a.amContainerLogs)
print(a.amHostHttpAddress)
print(a.amNodeLabelExpression)
print(a.amRPCAddress)
print(a.applicationPriority)
print(a.applicationTags)
}
Once done, checkApps() returns an array of APP objects, as described below:
Item | Data Type | Description |
---|---|---|
id | String | The application id |
user | String | The user who started the application |
name | String | The application name |
applicationType | String | The application type |
queue | String | The queue the application was submitted to |
state | String | The application state according to the ResourceManager - valid values are members of the YarnApplicationState enum: NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED |
finalStatus | String | The final status of the application if finished - reported by the application itself - valid values are: UNDEFINED, SUCCEEDED, FAILED, KILLED |
progress | Double | The progress of the application as a percent |
trackingUI | String | Where the tracking url is currently pointing - History (for history server) or ApplicationMaster |
trackingUrl | String | The web URL that can be used to track the application |
diagnostics | String | Detailed diagnostics information |
clusterId | Int | The cluster id |
startedTime | Int | The time in which application started (in ms since epoch) |
finishedTime | Int | The time in which the application finished (in ms since epoch) |
elapsedTime | Int | The elapsed time since the application started (in ms) |
amContainerLogs | String | The URL of the application master container logs |
amHostHttpAddress | String | The node's HTTP address of the application master |
amRPCAddress | String | The RPC address of the application master |
allocatedMB | Int | The sum of memory in MB allocated to the application’s running containers |
allocatedVCores | Int | The sum of virtual cores allocated to the application’s running containers |
runningContainers | Int | The number of containers currently running for the application |
memorySeconds | Int | The amount of memory the application has allocated (megabyte-seconds) |
vcoreSeconds | Int | The amount of CPU resources the application has allocated (virtual core-seconds) |
unmanagedApplication | Bool | Is the application unmanaged. |
applicationPriority | Int | priority of the submitted application |
appNodeLabelExpression | String | Node Label expression which is used to identify the nodes on which application’s containers are expected to run by default. |
amNodeLabelExpression | String | Node Label expression which is used to identify the node on which application’s AM container is expected to run. |
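
The per-application fields lend themselves to simple aggregation, for example totalling allocatedMB per user. The following sketch uses a hypothetical AppSummary stand-in that mirrors the user and allocatedMB fields; the sample applications are invented.

```swift
// Hypothetical stand-in mirroring two APP fields.
struct AppSummary {
    let user: String
    let allocatedMB: Int
}

// Totals the memory allocated to running containers, keyed by user.
func allocatedMBByUser(_ apps: [AppSummary]) -> [String: Int] {
    return apps.reduce(into: [:]) { totals, app in
        totals[app.user, default: 0] += app.allocatedMB
    }
}

// Invented sample data.
let sampleApps = [
    AppSummary(user: "alice", allocatedMB: 1024),
    AppSummary(user: "bob", allocatedMB: 2048),
    AppSummary(user: "alice", allocatedMB: 512)
]
print(allocatedMBByUser(sampleApps))
```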
Method checkApp() returns a specific APP object.
let a = try yarn.checkApp(id: "application_1484231633049_0025")
print(a.allocatedMB)
print(a.allocatedVCores)
print(a.amContainerLogs)
print(a.amHostHttpAddress)
print(a.amNodeLabelExpression)
print(a.amRPCAddress)
print(a.applicationPriority)
print(a.applicationTags)
The return value is an APP structure; see the APP object above for details.
Method newApplication() returns a new application handler.
guard let a = try yarn.newApplication() else {
// cannot create a new application, must return
return
}
Once completed, newApplication() returns a NewApplication object, as described below:
Item | Data Type | Description |
---|---|---|
id | String | The newly created application id |
maximumResourceCapability | ResourcesUsed | The maximum resource capabilities available on this cluster |
The NewApplication object contains a special object called ResourcesUsed, as described below:
Item | Data Type | Description |
---|---|---|
memory | Int | The maximum memory available for a container |
vCores | Int | The maximum number of cores available for a container |
Method submit() can submit modifications to a specific application.
// create an empty application to fill in the blanks
let sum = SubmitApplication()
// *MUST* set the application id
sum.id = "application_1484231633049_0025"
// set the application name
sum.name = "test"
// describe a local resource
let local = LocalResource(resource: "hdfs://localhost:9000/user/rockywei/DistributedShell/demo-app/AppMaster.jar", type: .FILE, visibility: .APPLICATION, size: 43004, timestamp: 1405452071209)
// assign the resource into an array
let localResources = Entries([Entry(key:"AppMaster.jar", value: local)])
// *MUST* fill in the MapReduce command
let commands = Commands("/hdp/bin/hadoop jar /hdp/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.0-alpha1.jar grep input output 'dfs[a-z.]+'")
// set up the environment as needed
let environments = Entries([Entry(key:"DISTRIBUTEDSHELLSCRIPTTIMESTAMP", value: "1405459400754"), Entry(key:"CLASSPATH", value:"{{CLASSPATH}}<CPS>./*<CPS>{{HADOOP_CONF_DIR}}<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/*<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/lib/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/lib/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/lib/*<CPS>./log4j.properties"), Entry(key:"DISTRIBUTEDSHELLSCRIPTLEN", value:6), Entry(key:"DISTRIBUTEDSHELLSCRIPTLOCATION", value: "hdfs://localhost:9000/user/rockywei/demo-app/shellCommands")])
// specify the container info
sum.amContainerSpec = AmContainerSpec(localResources: localResources, environment: environments, commands: commands)
// set other information as needed
sum.unmanagedAM = false
sum.maxAppAttempts = 2
sum.resource = ResourceRequest(memory: 1024, vCores: 1)
sum.type = "MapReduce"
sum.keepContainersAcrossApplicationAttempts = false
// set the log context
sum.logAggregationContext = LogAggregationContext(logIncludePattern: "file1", logExcludePattern: "file2", rolledLogIncludePattern: "file3", rolledLogExcludePattern: "file4", logAggregationPolicyClassName: "org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AllContainerLogAggregationPolicy", logAggregationPolicyParameters: "")
// set the failures validity interval
sum.attemptFailuresValidityInterval = 3600000
// set the reservationId, if applicable
sum.reservationId = "reservation_1454114874_1"
sum.amBlackListingRequests = AmBlackListingRequests(amBlackListingEnabled: true, disableFailureThreshold: 0.01)
try yarn.submit(application: sum)
The SubmitApplication class contains quite a few sub types, as described below:
Item | Data Type | Description |
---|---|---|
id | String | The application id |
name | String | The application name |
queue | String | The name of the queue to which the application should be submitted |
priority | Int | The priority of the application |
amContainerSpec | AmContainerSpec | The application master container launch context, described below |
unmanagedAM | Bool | Is the application using an unmanaged application master |
maxAppAttempts | Int | The max number of attempts for this application |
resource | ResourceRequest | The resources the application master requires, described below |
type | String | The application type (MapReduce, Pig, Hive, etc.) |
keepContainersAcrossApplicationAttempts | Bool | Should YARN keep the containers used by this application instead of destroying them |
tags | [String] | List of application tags, please see the request examples on how to specify the tags |
logAggregationContext | LogAggregationContext | Represents all of the information needed by the NodeManager to handle the logs for this application |
attemptFailuresValidityInterval | Int | Attempt failures that happen outside the validity interval are not counted toward the failure count |
reservationId | String | Represents the unique id of the corresponding reserved resource allocation in the scheduler |
amBlackListingRequests | AmBlackListingRequests | Contains blacklisting information such as “enable/disable AM blacklisting” and “disable failure threshold” |

An AmContainerSpec object contains the following fields:

Item | Data Type | Description |
---|---|---|
localResources | Entries | Object describing the resources that need to be localized, described below |
environment | Entries | Environment variables for your containers, specified as key value pairs |
commands | Commands | The commands for launching your container, in the order in which they should be executed |
serviceData | Entries | Application specific service data; key is the name of the auxiliary service, value is base-64 encoding of the data you wish to pass |
credentials | Credentials | The credentials required for your application to run, described below |
applicationAcls | Entries | ACLs for your application; the key can be “VIEW_APP” or “MODIFY_APP”, the value is the list of users with the permissions |

A LocalResource object contains the following fields:

Item | Data Type | Description |
---|---|---|
resource | String | Location of the resource to be localized |
type | ResourceType | Type of the resource; options are “ARCHIVE”, “FILE”, and “PATTERN” |
visibility | Visibility | Visibility of the resource to be localized; options are “PUBLIC”, “PRIVATE”, and “APPLICATION” |
size | Int | Size of the resource to be localized |
timestamp | Int | Timestamp of the resource to be localized |

Currently, the Commands object has only one string field, named command.

A Credentials object contains the following fields:

Item | Data Type | Description |
---|---|---|
tokens | [String:String] | Tokens that you wish to pass to your application, specified as key-value pairs. The key is an identifier for the token and the value is the token (which should be obtained using the respective web services) |
secrets | [String:String] | Secrets that you wish to use in your application, specified as key-value pairs. The key is an identifier and the value is the base-64 encoding of the secret |

A ResourceRequest object contains the following fields:

Item | Data Type | Description |
---|---|---|
memory | Int | Memory required for each container |
vCores | Int | Virtual cores required for each container |

The Entries object has only one field: entry, which is an array of Entry or EncryptedEntry objects. Each Entry has two elements: a String key and a value of type Any.
The difference between Entry and EncryptedEntry is that the value of an EncryptedEntry is encoded in base64 before submission, provided that the value is either a String or a [UInt8].
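
To illustrate the base64 step described for EncryptedEntry, here is a minimal sketch of encoding a value only when it is a String or a [UInt8]. The function base64Encoded is hypothetical and written with Foundation alone; it is not the library's implementation, and it assumes a String is encoded from its UTF-8 bytes.

```swift
import Foundation

// Hypothetical helper: encodes String (as UTF-8) or [UInt8] values in base64
// and returns nil for any other type. Not the library's own code.
func base64Encoded(_ value: Any) -> String? {
    switch value {
    case let text as String:
        return Data(text.utf8).base64EncodedString()
    case let bytes as [UInt8]:
        return Data(bytes).base64EncodedString()
    default:
        return nil
    }
}

// Encode a sample String value.
if let encoded = base64Encoded("hello") {
    print(encoded)
}
```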

An AmBlackListingRequests object contains the following fields:

Item | Data Type | Description |
---|---|---|
amBlackListingEnabled | Bool | Whether AM Blacklisting is enabled |
disableFailureThreshold | Float | AM Blacklisting disable failure threshold |

A LogAggregationContext object contains the following fields:

Item | Data Type | Description |
---|---|---|
logIncludePattern | String | The log files which match the defined include pattern will be uploaded when the application finishes |
logExcludePattern | String | The log files which match the defined exclude pattern will not be uploaded when the application finishes |
rolledLogIncludePattern | String | The log files which match the defined include pattern will be aggregated in a rolling fashion |
rolledLogExcludePattern | String | The log files which match the defined exclude pattern will not be aggregated in a rolling fashion |
logAggregationPolicyClassName | String | The policy which will be used by NodeManager to aggregate the logs |
logAggregationPolicyParameters | String | The parameters passed to the policy class |
Method getApplicationStatus() returns the current state of an application.
guard let state = try yarn.getApplicationStatus(id: "application_1484231633049_0025") else {
// something wrong, must return
return
}
print(state)
Method setApplicationStatus() can set the current state of an application to a designated one.
try yarn.setApplicationStatus(id: "application_1484231633049_0025", state: .KILLED)
Valid states include:
NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
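
When polling an application, it helps to know which of these states are final. The sketch below models the list as a local enum and marks FINISHED, FAILED and KILLED as terminal; ApplicationState is a hypothetical stand-in written for this example, not the library's APP.State type.

```swift
// Hypothetical stand-in for the state list above; raw values equal case names.
enum ApplicationState: String, CaseIterable {
    case NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED

    // An application in one of these states will not change state again.
    var isTerminal: Bool {
        switch self {
        case .FINISHED, .FAILED, .KILLED:
            return true
        default:
            return false
        }
    }
}

// Parse a state string and check whether polling can stop.
if let state = ApplicationState(rawValue: "RUNNING") {
    print("\(state.rawValue) terminal: \(state.isTerminal)")
}
```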
Method getApplicationQueue() returns the current queue name of an application.
guard let queue = try yarn.getApplicationQueue(id: "application_1484231633049_0025") else {
// something wrong, must return
return
}
print(queue)
Method setApplicationQueue() can move an application to a designated queue.
try yarn.setApplicationQueue(id: "application_1484231633049_0025", queue:"a1a")
Method getApplicationPriority() returns the current priority of an application.
guard let priority = try yarn.getApplicationPriority(id: "application_1484231633049_0025") else {
// something wrong, must return
return
}
print(priority)
Currently the returned priority is an integer, such as 0.
Method setApplicationPriority() can set the current priority of an application to a designated one.
try yarn.setApplicationPriority(id: "application_1484231633049_0025", priority: 1)
Method checkAppAttempts() returns an array of AppAttempt objects for an application.
guard let attempts = try yarn.checkAppAttempts(id: "application_1484231633049_0025") else {
// something wrong, must return
return
}
attempts.forEach { attempt in
print(attempt.containerId)
print(attempt.id)
print(attempt.nodeHttpAddress)
print(attempt.nodeId)
print(attempt.startTime)
}//next
Once done, checkAppAttempts() returns an array of AppAttempt objects, as described below:
Item | Data Type | Description |
---|---|---|
id | String | The app attempt id |
nodeId | String | The node id of the node the attempt ran on |
nodeHttpAddress | String | The node http address of the node the attempt ran on |
logsLink | String | The http link to the app attempt logs |
containerId | String | The id of the container for the app attempt |
startTime | Int | The start time of the attempt (in ms since epoch) |
Method checkAppStatistics() returns an array of statistics about applications.
let sta = try yarn.checkAppStatistics(states: [APP.State.FINISHED, APP.State.RUNNING])
sta.forEach{ s in
print(s.count)
print(s.state)
print(s.type)
}//next s
checkAppStatistics() accepts two optional query parameters:
Parameter | Data Type | Description |
---|---|---|
states | [APP.State] | states of the applications. If states is not provided, the API will enumerate all application states and return the counts of them. |
applicationTypes | [String] | types of the applications. If applicationTypes is not provided, the API will count the applications of any application type. In this case, the response shows * to indicate any application type. Note that at most one applicationType is currently supported, such as applicationTypes: ["MapReduce"] |
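
For orientation, the two parameters correspond to the states and applicationTypes query items of the Resource Manager's appstatistics endpoint. The sketch below builds such a request path with Foundation; the function appStatisticsRequest is hypothetical, and how this wrapper assembles its requests internally is an assumption.

```swift
import Foundation

// Hypothetical helper: builds the path and query for an app statistics request.
// Assumption: multiple states are sent as one comma-separated query value.
func appStatisticsRequest(states: [String] = [],
                          applicationTypes: [String] = [],
                          apibase: String = "/ws/v1/cluster") -> URLComponents {
    var components = URLComponents()
    components.path = apibase + "/appstatistics"
    var items: [URLQueryItem] = []
    if !states.isEmpty {
        items.append(URLQueryItem(name: "states", value: states.joined(separator: ",")))
    }
    if !applicationTypes.isEmpty {
        items.append(URLQueryItem(name: "applicationTypes",
                                  value: applicationTypes.joined(separator: ",")))
    }
    // Leave queryItems nil when no filter is given, so no "?" is emitted.
    components.queryItems = items.isEmpty ? nil : items
    return components
}

let request = appStatisticsRequest(states: ["FINISHED", "RUNNING"],
                                   applicationTypes: ["MapReduce"])
print(request.string ?? "")
```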