diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 5507a9c5a4733..729980bbb7947 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -704,10 +704,36 @@ def get_instances(group_names): return (master_instances, slave_instances) +def get_connection_address(instance): + """ + Return an address that can be used to connect to the instance. + A VPC instance commonly lacks a public DNS name, a public IP, or a private DNS name. + A private DNS name might be problematic to use if your VPN does not resolve private DNS. + But the private IP is always there. + + TODO: maybe add static cache of instance-id to resolved address + + :type instance: :class:`ec2.instance.Instance` + :return: string + """ + import socket + addresses = [instance.public_dns_name, instance.ip_address, instance.private_dns_name] + for address in addresses: + if address: + try: + # try to resolve address + socket.gethostbyaddr(address) + return address + except (socket.gaierror, socket.herror): + pass + + return instance.private_ip_address + + # Deploy configuration files and run setup scripts on a newly launched # or started EC2 cluster. def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): - master = master_nodes[0].public_dns_name + master = get_connection_address(master_nodes[0]) if deploy_ssh_key: print "Generating cluster's SSH key on master..." key_setup = """ @@ -719,8 +745,9 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh']) print "Transferring cluster's SSH key to slaves..." 
for slave in slave_nodes: - print slave.public_dns_name - ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar) + slave_address = get_connection_address(slave) + print slave_address + ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar) modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs', 'mapreduce', 'spark-standalone', 'tachyon'] @@ -809,7 +836,7 @@ def is_cluster_ssh_available(cluster_instances, opts): Check if SSH is available on all the instances in a cluster. """ for i in cluster_instances: - if not is_ssh_available(host=i.ip_address, opts=opts): + if not is_ssh_available(host=get_connection_address(i), opts=opts): return False else: return True @@ -923,7 +950,7 @@ def get_num_disks(instance_type): # # root_dir should be an absolute path to the directory with the files we want to deploy. def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): - active_master = master_nodes[0].public_dns_name + active_master = get_connection_address(master_nodes[0]) num_disks = get_num_disks(opts.instance_type) hdfs_data_dirs = "/mnt/ephemeral-hdfs/data" @@ -949,9 +976,9 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): modules = filter(lambda x: x != "tachyon", modules) template_vars = { - "master_list": '\n'.join([i.public_dns_name for i in master_nodes]), + "master_list": '\n'.join([get_connection_address(i) for i in master_nodes]), "active_master": active_master, - "slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]), + "slave_list": '\n'.join([get_connection_address(i) for i in slave_nodes]), "cluster_url": cluster_url, "hdfs_data_dirs": hdfs_data_dirs, "mapred_local_dirs": mapred_local_dirs, @@ -1226,12 +1253,14 @@ def real_main(): elif action == "destroy": (master_nodes, slave_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) + for inst in master_nodes + slave_nodes: + print "> %s" % inst.public_dns_name if any(master_nodes + slave_nodes): print "The following 
instances will be terminated:" - for inst in master_nodes + slave_nodes: - print "> %s" % inst.public_dns_name - print "ALL DATA ON ALL NODES WILL BE LOST!!" + for inst in master_nodes + slave_nodes: + print "> %s" % get_connection_address(inst) + print "ALL DATA ON ALL NODES WILL BE LOST!!" msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name) response = raw_input(msg) @@ -1294,7 +1323,7 @@ def real_main(): elif action == "login": (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) - master = master_nodes[0].public_dns_name + master = get_connection_address(master_nodes[0]) print "Logging into master " + master + "..." proxy_opt = [] if opts.proxy_port is not None: @@ -1318,7 +1347,7 @@ def real_main(): elif action == "get-master": (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) - print master_nodes[0].public_dns_name + print get_connection_address(master_nodes[0]) elif action == "stop": response = raw_input(