Skip to content

Commit

Permalink
Fix manager.id vs managerServer.id typo that broke security tests.
Browse files Browse the repository at this point in the history
Intercept IOException that's now thrown when a sending connection is closed
while there are unacknowledged messages.
  • Loading branch information
JoshRosen committed Aug 6, 2014
1 parent 659521f commit b8bb4d4
Showing 1 changed file with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.network

import java.io.IOException
import java.nio._

import org.apache.spark.{SecurityManager, SparkConf}
Expand Down Expand Up @@ -47,7 +48,7 @@ class ConnectionManagerSuite extends FunSuite {
buffer.flip

val bufferMessage = Message.createBufferMessage(buffer.duplicate)
Await.ready(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds)
Await.result(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds)

assert(receivedMessage == true)

Expand Down Expand Up @@ -80,7 +81,7 @@ class ConnectionManagerSuite extends FunSuite {

(0 until count).map(i => {
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
Await.ready(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds)
Await.result(manager.sendMessageReliably(managerServer.id, bufferMessage), 10 seconds)
})

assert(numReceivedServerMessages == 10)
Expand Down Expand Up @@ -119,7 +120,10 @@ class ConnectionManagerSuite extends FunSuite {
val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
buffer.flip
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
Await.result(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds)
// Expect managerServer to close connection, which we'll report as an error:
intercept[IOException] {
Await.result(manager.sendMessageReliably(managerServer.id, bufferMessage), 10 seconds)
}

assert(numReceivedServerMessages == 0)
assert(numReceivedMessages == 0)
Expand Down Expand Up @@ -164,6 +168,8 @@ class ConnectionManagerSuite extends FunSuite {
val g = Await.result(f, 1 second)
assert(false)
} catch {
case i: IOException =>
assert(true)
case e: TimeoutException => {
// we should timeout here since the client can't do the negotiation
assert(true)
Expand Down

0 comments on commit b8bb4d4

Please sign in to comment.