[SPARK-34520][CORE] Remove unused SecurityManager references
### What changes were proposed in this pull request?

This is a followup of #24033 and #30945.
Many of the references to `SecurityManager` were introduced in SPARK-1189, and the related usages were removed later in #24033 and #30945. This PR proposes to remove the now-unused references.
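
For illustration, every call site changes the same way: the unused `SecurityManager` argument simply drops out of the constructor or factory call. A minimal before/after sketch, taken from the `SparkEnv` diff below:

```scala
// Before: a SecurityManager was threaded through but never used.
MetricsSystem.createMetricsSystem(MetricsSystemInstances.DRIVER, conf, securityManager)

// After: the unused parameter is gone.
MetricsSystem.createMetricsSystem(MetricsSystemInstances.DRIVER, conf)
```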

### Why are the changes needed?

For better code readability.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Manually compiled. The GitHub Actions and Jenkins builds should test it as well.

Closes #31636 from HyukjinKwon/SPARK-34520.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
HyukjinKwon authored and dongjoon-hyun committed Feb 25, 2021
1 parent 22383e3 commit 8a1e172
Showing 27 changed files with 64 additions and 102 deletions.
core/src/main/scala/org/apache/spark/SparkEnv.scala (3 additions & 4 deletions)
@@ -314,7 +314,7 @@ object SparkEnv extends Logging {
}
}

-val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
+val broadcastManager = new BroadcastManager(isDriver, conf)

val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
@@ -397,14 +397,13 @@ object SparkEnv extends Logging {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
-MetricsSystem.createMetricsSystem(MetricsSystemInstances.DRIVER, conf, securityManager)
+MetricsSystem.createMetricsSystem(MetricsSystemInstances.DRIVER, conf)
} else {
// We need to set the executor ID before the MetricsSystem is created because sources and
// sinks specified in the metrics configuration file will want to incorporate this executor's
// ID into the metrics they report.
conf.set(EXECUTOR_ID, executorId)
-val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.EXECUTOR, conf,
-securityManager)
+val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.EXECUTOR, conf)
ms.start(conf.get(METRICS_STATIC_SOURCES_ENABLED))
ms
}
core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -19,7 +19,6 @@ package org.apache.spark.broadcast

import scala.reflect.ClassTag

-import org.apache.spark.SecurityManager
import org.apache.spark.SparkConf

/**
@@ -29,7 +28,7 @@ import org.apache.spark.SparkConf
*/
private[spark] trait BroadcastFactory {

-def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
+def initialize(isDriver: Boolean, conf: SparkConf): Unit

/**
* Creates a new broadcast variable.
core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -24,15 +24,12 @@ import scala.reflect.ClassTag

import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}

-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.internal.Logging

private[spark] class BroadcastManager(
-val isDriver: Boolean,
-conf: SparkConf,
-securityManager: SecurityManager)
-extends Logging {
+val isDriver: Boolean, conf: SparkConf) extends Logging {

private var initialized = false
private var broadcastFactory: BroadcastFactory = null
@@ -44,7 +41,7 @@ private[spark] class BroadcastManager(
synchronized {
if (!initialized) {
broadcastFactory = new TorrentBroadcastFactory
-broadcastFactory.initialize(isDriver, conf, securityManager)
+broadcastFactory.initialize(isDriver, conf)
initialized = true
}
}
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
@@ -19,7 +19,7 @@ package org.apache.spark.broadcast

import scala.reflect.ClassTag

-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.SparkConf

/**
* A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
@@ -28,8 +28,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
*/
private[spark] class TorrentBroadcastFactory extends BroadcastFactory {

-override def initialize(isDriver: Boolean, conf: SparkConf,
-securityMgr: SecurityManager): Unit = { }
+override def initialize(isDriver: Boolean, conf: SparkConf): Unit = { }

override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
new TorrentBroadcast[T](value_, id)
core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -44,8 +44,7 @@ private[deploy]
class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
extends Logging {
protected val masterMetricsSystem =
-MetricsSystem.createMetricsSystem(MetricsSystemInstances.SHUFFLE_SERVICE,
-sparkConf, securityManager)
+MetricsSystem.createMetricsSystem(MetricsSystemInstances.SHUFFLE_SERVICE, sparkConf)

private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -87,9 +87,9 @@ private[deploy] class Master(
Utils.checkHost(address.host)

private val masterMetricsSystem =
-MetricsSystem.createMetricsSystem(MetricsSystemInstances.MASTER, conf, securityMgr)
+MetricsSystem.createMetricsSystem(MetricsSystemInstances.MASTER, conf)
private val applicationMetricsSystem =
-MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATIONS, conf, securityMgr)
+MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATIONS, conf)
private val masterSource = new MasterSource(this)

// After onStart, webUi will be set
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -190,7 +190,7 @@ private[deploy] class Worker(
private var connectionAttemptCount = 0

private val metricsSystem =
-MetricsSystem.createMetricsSystem(MetricsSystemInstances.WORKER, conf, securityMgr)
+MetricsSystem.createMetricsSystem(MetricsSystemInstances.WORKER, conf)
private val workerSource = new WorkerSource(this)

val reverseProxy = conf.get(UI_REVERSE_PROXY)
core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala (10 additions & 17 deletions)
@@ -25,7 +25,7 @@ import scala.collection.mutable
import com.codahale.metrics.{Metric, MetricRegistry}
import org.eclipse.jetty.servlet.ServletContextHandler

-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.metrics.sink.{MetricsServlet, PrometheusServlet, Sink}
@@ -68,10 +68,7 @@ import org.apache.spark.util.Utils
* [options] represent the specific property of this source or sink.
*/
private[spark] class MetricsSystem private (
-val instance: String,
-conf: SparkConf,
-securityMgr: SecurityManager)
-extends Logging {
+val instance: String, conf: SparkConf) extends Logging {

private[this] val metricsConfig = new MetricsConfig(conf)

@@ -200,21 +197,18 @@ private[spark] class MetricsSystem private (
try {
if (kv._1 == "servlet") {
val servlet = Utils.classForName[MetricsServlet](classPath)
-.getConstructor(
-classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
-.newInstance(kv._2, registry, securityMgr)
+.getConstructor(classOf[Properties], classOf[MetricRegistry])
+.newInstance(kv._2, registry)
metricsServlet = Some(servlet)
} else if (kv._1 == "prometheusServlet") {
val servlet = Utils.classForName[PrometheusServlet](classPath)
-.getConstructor(
-classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
-.newInstance(kv._2, registry, securityMgr)
+.getConstructor(classOf[Properties], classOf[MetricRegistry])
+.newInstance(kv._2, registry)
prometheusServlet = Some(servlet)
} else {
val sink = Utils.classForName[Sink](classPath)
-.getConstructor(
-classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
-.newInstance(kv._2, registry, securityMgr)
+.getConstructor(classOf[Properties], classOf[MetricRegistry])
+.newInstance(kv._2, registry)
sinks += sink
}
} catch {
@@ -242,9 +236,8 @@ private[spark] object MetricsSystem {
}
}

-def createMetricsSystem(
-instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
-new MetricsSystem(instance, conf, securityMgr)
+def createMetricsSystem(instance: String, conf: SparkConf): MetricsSystem = {
+new MetricsSystem(instance, conf)
}
}

core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
@@ -22,11 +22,10 @@ import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

-import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

-private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
-securityMgr: SecurityManager) extends Sink {
+private[spark] class ConsoleSink(
+val property: Properties, val registry: MetricRegistry) extends Sink {
val CONSOLE_DEFAULT_PERIOD = 10
val CONSOLE_DEFAULT_UNIT = "SECONDS"

core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
@@ -23,11 +23,10 @@ import java.util.concurrent.TimeUnit

import com.codahale.metrics.{CsvReporter, MetricRegistry}

-import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

-private[spark] class CsvSink(val property: Properties, val registry: MetricRegistry,
-securityMgr: SecurityManager) extends Sink {
+private[spark] class CsvSink(
+val property: Properties, val registry: MetricRegistry) extends Sink {
val CSV_KEY_PERIOD = "period"
val CSV_KEY_UNIT = "unit"
val CSV_KEY_DIR = "directory"
core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -23,11 +23,10 @@ import java.util.concurrent.TimeUnit
import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
import com.codahale.metrics.graphite.{Graphite, GraphiteReporter, GraphiteUDP}

-import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

-private[spark] class GraphiteSink(val property: Properties, val registry: MetricRegistry,
-securityMgr: SecurityManager) extends Sink {
+private[spark] class GraphiteSink(
+val property: Properties, val registry: MetricRegistry) extends Sink {
val GRAPHITE_DEFAULT_PERIOD = 10
val GRAPHITE_DEFAULT_UNIT = "SECONDS"
val GRAPHITE_DEFAULT_PREFIX = ""
core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
@@ -22,10 +22,9 @@ import java.util.Properties
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.jmx.JmxReporter

-import org.apache.spark.SecurityManager

-private[spark] class JmxSink(val property: Properties, val registry: MetricRegistry,
-securityMgr: SecurityManager) extends Sink {
+private[spark] class JmxSink(
+val property: Properties, val registry: MetricRegistry) extends Sink {

val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()

core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
@@ -26,14 +26,11 @@ import com.codahale.metrics.json.MetricsModule
import com.fasterxml.jackson.databind.ObjectMapper
import org.eclipse.jetty.servlet.ServletContextHandler

-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.ui.JettyUtils._

private[spark] class MetricsServlet(
-val property: Properties,
-val registry: MetricRegistry,
-securityMgr: SecurityManager)
-extends Sink {
+val property: Properties, val registry: MetricRegistry) extends Sink {

val SERVLET_KEY_PATH = "path"
val SERVLET_KEY_SAMPLE = "sample"
core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala
@@ -23,7 +23,7 @@ import javax.servlet.http.HttpServletRequest
import com.codahale.metrics.MetricRegistry
import org.eclipse.jetty.servlet.ServletContextHandler

-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.ui.JettyUtils._

/**
@@ -34,10 +34,7 @@ import org.apache.spark.ui.JettyUtils._
* in terms of key string format.
*/
private[spark] class PrometheusServlet(
-val property: Properties,
-val registry: MetricRegistry,
-securityMgr: SecurityManager)
-extends Sink {
+val property: Properties, val registry: MetricRegistry) extends Sink {

val SERVLET_KEY_PATH = "path"

core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala
@@ -22,14 +22,10 @@ import java.util.concurrent.TimeUnit

import com.codahale.metrics.{MetricRegistry, Slf4jReporter}

-import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

private[spark] class Slf4jSink(
-val property: Properties,
-val registry: MetricRegistry,
-securityMgr: SecurityManager)
-extends Sink {
+val property: Properties, val registry: MetricRegistry) extends Sink {
val SLF4J_DEFAULT_PERIOD = 10
val SLF4J_DEFAULT_UNIT = "SECONDS"

core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit

import com.codahale.metrics.MetricRegistry

-import org.apache.spark.SecurityManager
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.MetricsSystem

@@ -41,10 +40,7 @@ private[spark] object StatsdSink {
}

private[spark] class StatsdSink(
-val property: Properties,
-val registry: MetricRegistry,
-securityMgr: SecurityManager)
-extends Sink with Logging {
+val property: Properties, val registry: MetricRegistry) extends Sink with Logging {
import StatsdSink._

val host = property.getProperty(STATSD_KEY_HOST, STATSD_DEFAULT_HOST)
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -35,8 +35,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
private val conf = new SparkConf

private def newTrackerMaster(sparkConf: SparkConf = conf) = {
-val broadcastManager = new BroadcastManager(true, sparkConf,
-new SecurityManager(sparkConf))
+val broadcastManager = new BroadcastManager(true, sparkConf)
new MapOutputTrackerMaster(sparkConf, broadcastManager, true)
}

