Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34520][CORE] Remove unused SecurityManager references #31636

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ object SparkEnv extends Logging {
}
}

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
val broadcastManager = new BroadcastManager(isDriver, conf)

val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
Expand Down Expand Up @@ -397,14 +397,13 @@ object SparkEnv extends Logging {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
MetricsSystem.createMetricsSystem(MetricsSystemInstances.DRIVER, conf, securityManager)
MetricsSystem.createMetricsSystem(MetricsSystemInstances.DRIVER, conf)
} else {
// We need to set the executor ID before the MetricsSystem is created because sources and
// sinks specified in the metrics configuration file will want to incorporate this executor's
// ID into the metrics they report.
conf.set(EXECUTOR_ID, executorId)
val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.EXECUTOR, conf,
securityManager)
val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.EXECUTOR, conf)
ms.start(conf.get(METRICS_STATIC_SOURCES_ENABLED))
ms
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.broadcast

import scala.reflect.ClassTag

import org.apache.spark.SecurityManager
import org.apache.spark.SparkConf

/**
Expand All @@ -29,7 +28,7 @@ import org.apache.spark.SparkConf
*/
private[spark] trait BroadcastFactory {

def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
def initialize(isDriver: Boolean, conf: SparkConf): Unit

/**
* Creates a new broadcast variable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,12 @@ import scala.reflect.ClassTag

import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.SparkConf
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.internal.Logging

private[spark] class BroadcastManager(
val isDriver: Boolean,
conf: SparkConf,
securityManager: SecurityManager)
extends Logging {
val isDriver: Boolean, conf: SparkConf) extends Logging {

private var initialized = false
private var broadcastFactory: BroadcastFactory = null
Expand All @@ -44,7 +41,7 @@ private[spark] class BroadcastManager(
synchronized {
if (!initialized) {
broadcastFactory = new TorrentBroadcastFactory
broadcastFactory.initialize(isDriver, conf, securityManager)
broadcastFactory.initialize(isDriver, conf)
initialized = true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.broadcast

import scala.reflect.ClassTag

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.SparkConf

/**
* A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
Expand All @@ -28,8 +28,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
*/
private[spark] class TorrentBroadcastFactory extends BroadcastFactory {

override def initialize(isDriver: Boolean, conf: SparkConf,
securityMgr: SecurityManager): Unit = { }
override def initialize(isDriver: Boolean, conf: SparkConf): Unit = { }

override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
new TorrentBroadcast[T](value_, id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ private[deploy]
class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
extends Logging {
protected val masterMetricsSystem =
MetricsSystem.createMetricsSystem(MetricsSystemInstances.SHUFFLE_SERVICE,
sparkConf, securityManager)
MetricsSystem.createMetricsSystem(MetricsSystemInstances.SHUFFLE_SERVICE, sparkConf)

private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ private[deploy] class Master(
Utils.checkHost(address.host)

private val masterMetricsSystem =
MetricsSystem.createMetricsSystem(MetricsSystemInstances.MASTER, conf, securityMgr)
MetricsSystem.createMetricsSystem(MetricsSystemInstances.MASTER, conf)
private val applicationMetricsSystem =
MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATIONS, conf, securityMgr)
MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATIONS, conf)
private val masterSource = new MasterSource(this)

// After onStart, webUi will be set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ private[deploy] class Worker(
private var connectionAttemptCount = 0

private val metricsSystem =
MetricsSystem.createMetricsSystem(MetricsSystemInstances.WORKER, conf, securityMgr)
MetricsSystem.createMetricsSystem(MetricsSystemInstances.WORKER, conf)
private val workerSource = new WorkerSource(this)

val reverseProxy = conf.get(UI_REVERSE_PROXY)
Expand Down
27 changes: 10 additions & 17 deletions core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.collection.mutable
import com.codahale.metrics.{Metric, MetricRegistry}
import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.metrics.sink.{MetricsServlet, PrometheusServlet, Sink}
Expand Down Expand Up @@ -68,10 +68,7 @@ import org.apache.spark.util.Utils
* [options] represent the specific property of this source or sink.
*/
private[spark] class MetricsSystem private (
val instance: String,
conf: SparkConf,
securityMgr: SecurityManager)
extends Logging {
val instance: String, conf: SparkConf) extends Logging {

private[this] val metricsConfig = new MetricsConfig(conf)

Expand Down Expand Up @@ -200,21 +197,18 @@ private[spark] class MetricsSystem private (
try {
if (kv._1 == "servlet") {
val servlet = Utils.classForName[MetricsServlet](classPath)
.getConstructor(
classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
.getConstructor(classOf[Properties], classOf[MetricRegistry])
.newInstance(kv._2, registry)
metricsServlet = Some(servlet)
} else if (kv._1 == "prometheusServlet") {
val servlet = Utils.classForName[PrometheusServlet](classPath)
.getConstructor(
classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
.getConstructor(classOf[Properties], classOf[MetricRegistry])
.newInstance(kv._2, registry)
prometheusServlet = Some(servlet)
} else {
val sink = Utils.classForName[Sink](classPath)
.getConstructor(
classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
.getConstructor(classOf[Properties], classOf[MetricRegistry])
.newInstance(kv._2, registry)
sinks += sink
}
} catch {
Expand Down Expand Up @@ -242,9 +236,8 @@ private[spark] object MetricsSystem {
}
}

def createMetricsSystem(
instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
new MetricsSystem(instance, conf, securityMgr)
def createMetricsSystem(instance: String, conf: SparkConf): MetricsSystem = {
new MetricsSystem(instance, conf)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
private[spark] class ConsoleSink(
val property: Properties, val registry: MetricRegistry) extends Sink {
val CONSOLE_DEFAULT_PERIOD = 10
val CONSOLE_DEFAULT_UNIT = "SECONDS"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ import java.util.concurrent.TimeUnit

import com.codahale.metrics.{CsvReporter, MetricRegistry}

import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

private[spark] class CsvSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
private[spark] class CsvSink(
val property: Properties, val registry: MetricRegistry) extends Sink {
val CSV_KEY_PERIOD = "period"
val CSV_KEY_UNIT = "unit"
val CSV_KEY_DIR = "directory"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ import java.util.concurrent.TimeUnit
import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
import com.codahale.metrics.graphite.{Graphite, GraphiteReporter, GraphiteUDP}

import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

private[spark] class GraphiteSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
private[spark] class GraphiteSink(
val property: Properties, val registry: MetricRegistry) extends Sink {
val GRAPHITE_DEFAULT_PERIOD = 10
val GRAPHITE_DEFAULT_UNIT = "SECONDS"
val GRAPHITE_DEFAULT_PREFIX = ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ import java.util.Properties
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.jmx.JmxReporter

import org.apache.spark.SecurityManager

private[spark] class JmxSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
private[spark] class JmxSink(
val property: Properties, val registry: MetricRegistry) extends Sink {

val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,11 @@ import com.codahale.metrics.json.MetricsModule
import com.fasterxml.jackson.databind.ObjectMapper
import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.SparkConf
import org.apache.spark.ui.JettyUtils._

private[spark] class MetricsServlet(
val property: Properties,
val registry: MetricRegistry,
securityMgr: SecurityManager)
extends Sink {
val property: Properties, val registry: MetricRegistry) extends Sink {

val SERVLET_KEY_PATH = "path"
val SERVLET_KEY_SAMPLE = "sample"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import javax.servlet.http.HttpServletRequest
import com.codahale.metrics.MetricRegistry
import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.SparkConf
import org.apache.spark.ui.JettyUtils._

/**
Expand All @@ -34,10 +34,7 @@ import org.apache.spark.ui.JettyUtils._
* in terms of key string format.
*/
private[spark] class PrometheusServlet(
val property: Properties,
val registry: MetricRegistry,
securityMgr: SecurityManager)
extends Sink {
val property: Properties, val registry: MetricRegistry) extends Sink {

val SERVLET_KEY_PATH = "path"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,10 @@ import java.util.concurrent.TimeUnit

import com.codahale.metrics.{MetricRegistry, Slf4jReporter}

import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

private[spark] class Slf4jSink(
val property: Properties,
val registry: MetricRegistry,
securityMgr: SecurityManager)
extends Sink {
val property: Properties, val registry: MetricRegistry) extends Sink {
val SLF4J_DEFAULT_PERIOD = 10
val SLF4J_DEFAULT_UNIT = "SECONDS"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit

import com.codahale.metrics.MetricRegistry

import org.apache.spark.SecurityManager
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.MetricsSystem

Expand All @@ -41,10 +40,7 @@ private[spark] object StatsdSink {
}

private[spark] class StatsdSink(
val property: Properties,
val registry: MetricRegistry,
securityMgr: SecurityManager)
extends Sink with Logging {
val property: Properties, val registry: MetricRegistry) extends Sink with Logging {
import StatsdSink._

val host = property.getProperty(STATSD_KEY_HOST, STATSD_DEFAULT_HOST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
private val conf = new SparkConf

private def newTrackerMaster(sparkConf: SparkConf = conf) = {
val broadcastManager = new BroadcastManager(true, sparkConf,
new SecurityManager(sparkConf))
val broadcastManager = new BroadcastManager(true, sparkConf)
new MapOutputTrackerMaster(sparkConf, broadcastManager, true)
}

Expand Down
Loading