Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix for SPARK-5242: "ec2/spark_ec2.py lauch" does not work with VPC if no public DNS or IP is available #4038

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 41 additions & 12 deletions ec2/spark_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -704,10 +704,36 @@ def get_instances(group_names):
return (master_instances, slave_instances)


def get_connection_address(instance):
"""
Return some address that can be used to connect to instance.
It is common that VPC instance does not have either public DNS or public IP or private DNS.
Private DNS name might be problematic to use if you're VPN does not resolve private DNS.
But private IP is always there.

TODO: maybe add static cache of instance-id to resolved address

:type instance: :class:`ec2.instance.Instance`
:return: string
"""
import socket
addresses = [instance.public_dns_name, instance.ip_address, instance.private_dns_name]
for address in addresses:
if address:
try:
# try to resolve address
socket.gethostbyaddr(address)
return address
except (socket.gaierror, socket.herror):
pass

return instance.private_ip_address


# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
master = master_nodes[0].public_dns_name
master = get_connection_address(master_nodes[0])
if deploy_ssh_key:
print "Generating cluster's SSH key on master..."
key_setup = """
Expand All @@ -719,8 +745,9 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
print "Transferring cluster's SSH key to slaves..."
for slave in slave_nodes:
print slave.public_dns_name
ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
slave_address = get_connection_address(slave)
print slave_address
ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar)

modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
'mapreduce', 'spark-standalone', 'tachyon']
Expand Down Expand Up @@ -809,7 +836,7 @@ def is_cluster_ssh_available(cluster_instances, opts):
Check if SSH is available on all the instances in a cluster.
"""
for i in cluster_instances:
if not is_ssh_available(host=i.ip_address, opts=opts):
if not is_ssh_available(host=get_connection_address(i), opts=opts):
return False
else:
return True
Expand Down Expand Up @@ -923,7 +950,7 @@ def get_num_disks(instance_type):
#
# root_dir should be an absolute path to the directory with the files we want to deploy.
def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
active_master = master_nodes[0].public_dns_name
active_master = get_connection_address(master_nodes[0])

num_disks = get_num_disks(opts.instance_type)
hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
Expand All @@ -949,9 +976,9 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
modules = filter(lambda x: x != "tachyon", modules)

template_vars = {
"master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
"master_list": '\n'.join([get_connection_address(i) for i in master_nodes]),
"active_master": active_master,
"slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
"slave_list": '\n'.join([get_connection_address(i) for i in slave_nodes]),
"cluster_url": cluster_url,
"hdfs_data_dirs": hdfs_data_dirs,
"mapred_local_dirs": mapred_local_dirs,
Expand Down Expand Up @@ -1226,12 +1253,14 @@ def real_main():
elif action == "destroy":
(master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
for inst in master_nodes + slave_nodes:
print "> %s" % inst.public_dns_name

if any(master_nodes + slave_nodes):
print "The following instances will be terminated:"
for inst in master_nodes + slave_nodes:
print "> %s" % inst.public_dns_name
print "ALL DATA ON ALL NODES WILL BE LOST!!"
for inst in master_nodes + slave_nodes:
print "> %s" % get_connection_address(inst)
print "ALL DATA ON ALL NODES WILL BE LOST!!"

msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name)
response = raw_input(msg)
Expand Down Expand Up @@ -1294,7 +1323,7 @@ def real_main():

elif action == "login":
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
master = master_nodes[0].public_dns_name
master = get_connection_address(master_nodes[0])
print "Logging into master " + master + "..."
proxy_opt = []
if opts.proxy_port is not None:
Expand All @@ -1318,7 +1347,7 @@ def real_main():

elif action == "get-master":
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
print master_nodes[0].public_dns_name
print get_connection_address(master_nodes[0])

elif action == "stop":
response = raw_input(
Expand Down