Skip to content

Commit

Permalink
Merge pull request #1148 from gvlproject/vpc_address_filtering
Browse files Browse the repository at this point in the history
Improved support for VPC Address filtering
  • Loading branch information
JackDanger authored Sep 15, 2017
2 parents a300171 + 8776129 commit b3ce255
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 25 deletions.
44 changes: 42 additions & 2 deletions moto/ec2/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2959,6 +2959,27 @@ def get_cfn_attribute(self, attribute_name):
return self.allocation_id
raise UnformattedGetAttTemplateException()

def get_filter_value(self, filter_name):
if filter_name == 'allocation-id':
return self.allocation_id
elif filter_name == 'association-id':
return self.association_id
elif filter_name == 'domain':
return self.domain
elif filter_name == 'instance-id' and self.instance:
return self.instance.id
elif filter_name == 'network-interface-id' and self.eni:
return self.eni.id
elif filter_name == 'network-interface-owner-id':
msg = "The filter '{0}' for DescribeAddresses has not been" \
" implemented in Moto yet. Feel free to open an issue at" \
" https://github.com/spulec/moto/issues".format(filter_name)
raise NotImplementedError(msg)
elif filter_name == 'private-ip-address' and self.eni:
return self.eni.private_ip_address
elif filter_name == 'public-ip':
return self.public_ip


class ElasticAddressBackend(object):
def __init__(self):
Expand Down Expand Up @@ -3019,6 +3040,9 @@ def associate_address(self, instance=None, eni=None, address=None, allocation_id
if new_instance_association or new_eni_association or reassociate:
eip.instance = instance
eip.eni = eni
if not eip.eni and instance:
# default to primary network interface
eip.eni = instance.nics[0]
if eip.eni:
eip.eni.public_ip = eip.public_ip
if eip.domain == "vpc":
Expand All @@ -3030,8 +3054,24 @@ def associate_address(self, instance=None, eni=None, address=None, allocation_id

raise ResourceAlreadyAssociatedError(eip.public_ip)

def describe_addresses(self):
return self.addresses
def describe_addresses(self, allocation_ids=None, public_ips=None, filters=None):
matches = self.addresses
if allocation_ids:
matches = [addr for addr in matches
if addr.allocation_id in allocation_ids]
if len(allocation_ids) > len(matches):
unknown_ids = set(allocation_ids) - set(matches)
raise InvalidAllocationIdError(unknown_ids)
if public_ips:
matches = [addr for addr in matches
if addr.public_ip in public_ips]
if len(public_ips) > len(matches):
unknown_ips = set(allocation_ids) - set(matches)
raise InvalidAddressError(unknown_ips)
if filters:
matches = generic_filter(filters, matches)

return matches

def disassociate_address(self, address=None, association_id=None):
eips = []
Expand Down
29 changes: 6 additions & 23 deletions moto/ec2/responses/elastic_ip_addresses.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from moto.ec2.utils import sequence_from_querystring
from moto.ec2.utils import filters_from_querystring, sequence_from_querystring


class ElasticIPAddresses(BaseResponse):
Expand Down Expand Up @@ -51,29 +51,12 @@ def associate_address(self):
return template.render(address=eip)

def describe_addresses(self):
allocation_ids = sequence_from_querystring('AllocationId', self.querystring)
public_ips = sequence_from_querystring('PublicIp', self.querystring)
filters = filters_from_querystring(self.querystring)
addresses = self.ec2_backend.describe_addresses(
allocation_ids, public_ips, filters)
template = self.response_template(DESCRIBE_ADDRESS_RESPONSE)

if "Filter.1.Name" in self.querystring:
filter_by = sequence_from_querystring(
"Filter.1.Name", self.querystring)[0]
filter_value = sequence_from_querystring(
"Filter.1.Value", self.querystring)
if filter_by == 'instance-id':
addresses = filter(lambda x: x.instance.id == filter_value[
0], self.ec2_backend.describe_addresses())
else:
raise NotImplementedError(
"Filtering not supported in describe_address.")
elif "PublicIp.1" in self.querystring:
public_ips = sequence_from_querystring(
"PublicIp", self.querystring)
addresses = self.ec2_backend.address_by_ip(public_ips)
elif "AllocationId.1" in self.querystring:
allocation_ids = sequence_from_querystring(
"AllocationId", self.querystring)
addresses = self.ec2_backend.address_by_allocation(allocation_ids)
else:
addresses = self.ec2_backend.describe_addresses()
return template.render(addresses=addresses)

def disassociate_address(self):
Expand Down
81 changes: 81 additions & 0 deletions tests/test_ec2/test_elastic_ip_addresses.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,3 +402,84 @@ def test_eip_describe_none():
cm.exception.code.should.equal('InvalidAddress.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none


@mock_ec2
def test_eip_filters():
service = boto3.resource('ec2', region_name='us-west-1')
client = boto3.client('ec2', region_name='us-west-1')
vpc_res = client.create_vpc(CidrBlock='10.0.0.0/24')
subnet_res = client.create_subnet(
VpcId=vpc_res['Vpc']['VpcId'], CidrBlock='10.0.0.0/24')

def create_inst_with_eip():
instance = service.create_instances(**{
'InstanceType': 't2.micro',
'ImageId': 'ami-test',
'MinCount': 1,
'MaxCount': 1,
'SubnetId': subnet_res['Subnet']['SubnetId']
})[0]
allocation_id = client.allocate_address(Domain='vpc')['AllocationId']
_ = client.associate_address(
InstanceId=instance.id,
AllocationId=allocation_id,
AllowReassociation=False)
instance.load()
address = service.VpcAddress(allocation_id)
address.load()
return instance, address

inst1, eip1 = create_inst_with_eip()
inst2, eip2 = create_inst_with_eip()
inst3, eip3 = create_inst_with_eip()

# Param search by AllocationId
addresses = list(service.vpc_addresses.filter(AllocationIds=[eip2.allocation_id]))
len(addresses).should.be.equal(1)
addresses[0].public_ip.should.equal(eip2.public_ip)
inst2.public_ip_address.should.equal(addresses[0].public_ip)

# Param search by PublicIp
addresses = list(service.vpc_addresses.filter(PublicIps=[eip3.public_ip]))
len(addresses).should.be.equal(1)
addresses[0].public_ip.should.equal(eip3.public_ip)
inst3.public_ip_address.should.equal(addresses[0].public_ip)

# Param search by Filter
def check_vpc_filter_valid(filter_name, filter_values):
addresses = list(service.vpc_addresses.filter(
Filters=[{'Name': filter_name,
'Values': filter_values}]))
len(addresses).should.equal(2)
ips = [addr.public_ip for addr in addresses]
set(ips).should.equal(set([eip1.public_ip, eip2.public_ip]))
ips.should.contain(inst1.public_ip_address)

def check_vpc_filter_invalid(filter_name):
addresses = list(service.vpc_addresses.filter(
Filters=[{'Name': filter_name,
'Values': ['dummy1', 'dummy2']}]))
len(addresses).should.equal(0)

def check_vpc_filter(filter_name, filter_values):
check_vpc_filter_valid(filter_name, filter_values)
check_vpc_filter_invalid(filter_name)

check_vpc_filter('allocation-id', [eip1.allocation_id, eip2.allocation_id])
check_vpc_filter('association-id', [eip1.association_id, eip2.association_id])
check_vpc_filter('instance-id', [inst1.id, inst2.id])
check_vpc_filter(
'network-interface-id',
[inst1.network_interfaces_attribute[0].get('NetworkInterfaceId'),
inst2.network_interfaces_attribute[0].get('NetworkInterfaceId')])
check_vpc_filter(
'private-ip-address',
[inst1.network_interfaces_attribute[0].get('PrivateIpAddress'),
inst2.network_interfaces_attribute[0].get('PrivateIpAddress')])
check_vpc_filter('public-ip', [inst1.public_ip_address, inst2.public_ip_address])

# all the ips are in a VPC
addresses = list(service.vpc_addresses.filter(
Filters=[{'Name': 'domain', 'Values': ['vpc']}]))
len(addresses).should.equal(3)

0 comments on commit b3ce255

Please sign in to comment.