diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go
index dedd9218a181..6a60bc308a96 100644
--- a/xds/internal/balancer/clusterresolver/clusterresolver.go
+++ b/xds/internal/balancer/clusterresolver/clusterresolver.go
@@ -280,7 +280,7 @@ func (b *clusterResolverBalancer) handleErrorFromUpdate(err error, fromParent bo
 	// EDS resource was removed. No action needs to be taken for this, and we
 	// should continue watching the same EDS resource.
 	if fromParent && xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
-		b.resourceWatcher.stop()
+		b.resourceWatcher.stop(false)
 	}
 
 	if b.child != nil {
@@ -326,7 +326,7 @@ func (b *clusterResolverBalancer) run() {
 		// Close results in stopping the endpoint resolvers and closing the
 		// underlying child policy and is the only way to exit this goroutine.
 		case <-b.closed.Done():
-			b.resourceWatcher.stop()
+			b.resourceWatcher.stop(true)
 
 			if b.child != nil {
 				b.child.Close()
diff --git a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go
index d544360bba63..f466dcca7bda 100644
--- a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go
+++ b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go
@@ -18,9 +18,10 @@ package e2e_test
 
 import (
 	"context"
-	"errors"
 	"fmt"
+	"net"
 	"sort"
+	"strconv"
 	"strings"
 	"testing"
 	"time"
@@ -28,6 +29,7 @@ import (
 	"github.com/google/go-cmp/cmp"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/stubserver"
@@ -317,34 +319,41 @@ func (s) TestAggregateCluster_WithTwoEDSClusters_PrioritiesChange(t *testing.T)
 	}
 }
 
+func hostAndPortFromAddress(t *testing.T, addr string) (string, uint32) {
+	t.Helper()
+
+	host, p, err := net.SplitHostPort(addr)
+	if err != nil {
+		t.Fatalf("Invalid serving address: %v", addr)
+	}
+	port, err := strconv.ParseUint(p, 10, 32)
+	if err != nil {
+		t.Fatalf("Invalid serving port %q: %v", p, err)
+	}
+	return host, uint32(port)
+}
+
 // TestAggregateCluster_WithOneDNSCluster tests the case where the top-level
 // cluster resource is an aggregate cluster that resolves to a single
 // LOGICAL_DNS cluster. The test verifies that RPCs can be made to backends that
 // make up the LOGICAL_DNS cluster.
 func (s) TestAggregateCluster_WithOneDNSCluster(t *testing.T) {
-	dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
-	defer cleanup1()
-
 	// Start an xDS management server.
 	managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
 	defer cleanup2()
 
-	// Start two test backends.
-	servers, cleanup3 := startTestServiceBackends(t, 2)
-	defer cleanup3()
-	addrs, _ := backendAddressesAndPorts(t, servers)
+	// Start a test service backend.
+	server := stubserver.StartTestService(t, nil)
+	defer server.Stop()
+	host, port := hostAndPortFromAddress(t, server.Address)
 
 	// Configure an aggregate cluster pointing to a single LOGICAL_DNS cluster.
-	const (
-		dnsClusterName = clusterName + "-dns"
-		dnsHostName    = "dns_host"
-		dnsPort        = uint32(8080)
-	)
+	const dnsClusterName = clusterName + "-dns"
 	resources := e2e.UpdateOptions{
 		NodeID: nodeID,
 		Clusters: []*v3clusterpb.Cluster{
 			makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
-			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
+			makeLogicalDNSClusterResource(dnsClusterName, host, uint32(port)),
 		},
 		SkipValidation: true,
 	}
@@ -359,19 +368,93 @@ func (s) TestAggregateCluster_WithOneDNSCluster(t *testing.T) {
 	cc, cleanup := setupAndDial(t, bootstrapContents)
 	defer cleanup()
 
-	// Ensure that the DNS resolver is started for the expected target.
-	select {
-	case <-ctx.Done():
-		t.Fatal("Timeout when waiting for DNS resolver to be started")
-	case target := <-dnsTargetCh:
-		got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
-		if got != want {
-			t.Fatalf("DNS resolution started for target %q, want %q", got, want)
+	// Make an RPC and ensure that it gets routed to the first backend since the
+	// child policy for a LOGICAL_DNS cluster is pick_first by default.
+	client := testgrpc.NewTestServiceClient(cc)
+	peer := &peer.Peer{}
+	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
+		t.Fatalf("EmptyCall() failed: %v", err)
+	}
+	if peer.Addr.String() != server.Address {
+		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
+	}
+}
+
+// Tests the case where the top-level cluster resource is an aggregate cluster
+// that resolves to a single LOGICAL_DNS cluster. The specified dns hostname is
+// expected to fail url parsing. The test verifies that the channel moves to
+// TRANSIENT_FAILURE.
+func (s) TestAggregateCluster_WithOneDNSCluster_ParseFailure(t *testing.T) {
+	// Start an xDS management server.
+	managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+	defer cleanup2()
+
+	// Configure an aggregate cluster pointing to a single LOGICAL_DNS cluster.
+	const dnsClusterName = clusterName + "-dns"
+	resources := e2e.UpdateOptions{
+		NodeID: nodeID,
+		Clusters: []*v3clusterpb.Cluster{
+			makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
+			makeLogicalDNSClusterResource(dnsClusterName, "%gh&%ij", uint32(8080)),
+		},
+		SkipValidation: true,
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	if err := managementServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	// Create xDS client, configure cds_experimental LB policy with a manual
+	// resolver, and dial the test backends.
+	cc, cleanup := setupAndDial(t, bootstrapContents)
+	defer cleanup()
+
+	// Ensure that the ClientConn moves to TransientFailure.
+	for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() {
+		if !cc.WaitForStateChange(ctx, state) {
+			t.Fatalf("Timed out waiting for state change. got %v; want %v", state, connectivity.TransientFailure)
 		}
 	}
+}
 
-	// Update DNS resolver with test backend addresses.
-	dnsR.UpdateState(resolver.State{Addresses: addrs})
+// Tests the case where the top-level cluster resource is an aggregate cluster
+// that resolves to a single LOGICAL_DNS cluster. The test verifies that RPCs
+// can be made to backends that make up the LOGICAL_DNS cluster. The hostname of
+// the LOGICAL_DNS cluster is updated, and the test verifies that RPCs can be
+// made to backends that the new hostname resolves to.
+func (s) TestAggregateCluster_WithOneDNSCluster_HostnameChange(t *testing.T) {
+	// Start an xDS management server.
+	managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+	defer cleanup1()
+
+	// Start two test backends and extract their host and port. The first
+	// backend is used initially for the LOGICAL_DNS cluster and an update
+	// switches the cluster to use the second backend.
+	servers, cleanup2 := startTestServiceBackends(t, 2)
+	defer cleanup2()
+
+	// Configure an aggregate cluster pointing to a single LOGICAL_DNS cluster.
+	const dnsClusterName = clusterName + "-dns"
+	dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[0].Address)
+	resources := e2e.UpdateOptions{
+		NodeID: nodeID,
+		Clusters: []*v3clusterpb.Cluster{
+			makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
+			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
+		},
+		SkipValidation: true,
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	if err := managementServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	// Create xDS client, configure cds_experimental LB policy with a manual
+	// resolver, and dial the test backends.
+	cc, cleanup := setupAndDial(t, bootstrapContents)
+	defer cleanup()
 
 	// Make an RPC and ensure that it gets routed to the first backend since the
 	// child policy for a LOGICAL_DNS cluster is pick_first by default.
@@ -380,8 +463,35 @@ func (s) TestAggregateCluster_WithOneDNSCluster(t *testing.T) {
 	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
 		t.Fatalf("EmptyCall() failed: %v", err)
 	}
-	if peer.Addr.String() != addrs[0].Addr {
-		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
+	if peer.Addr.String() != servers[0].Address {
+		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, servers[0].Address)
+	}
+
+	// Update the LOGICAL_DNS cluster's hostname to point to the second backend.
+	dnsHostName, dnsPort = hostAndPortFromAddress(t, servers[1].Address)
+	resources = e2e.UpdateOptions{
+		NodeID: nodeID,
+		Clusters: []*v3clusterpb.Cluster{
+			makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
+			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
+		},
+		SkipValidation: true,
+	}
+	if err := managementServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	// Ensure that traffic moves to the second backend eventually.
+	for ctx.Err() == nil {
+		if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
+			t.Fatalf("EmptyCall() failed: %v", err)
+		}
+		if peer.Addr.String() == servers[1].Address {
+			break
+		}
+	}
+	if ctx.Err() != nil {
+		t.Fatal("Timeout when waiting for RPCs to switch to the second backend")
 	}
 }
 
@@ -500,9 +610,6 @@ func (s) TestAggregateCluster_WithEDSAndDNS(t *testing.T) {
 // cluster. The test verifies that RPCs are successful, this time to backends in
 // the DNS cluster.
 func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) {
-	dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
-	defer cleanup1()
-
 	// Start an xDS management server.
 	managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
 	defer cleanup2()
@@ -513,15 +620,12 @@ func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) {
 	servers, cleanup3 := startTestServiceBackends(t, 2)
 	defer cleanup3()
 	addrs, ports := backendAddressesAndPorts(t, servers)
+	dnsHostName, dnsPort := hostAndPortFromAddress(t, addrs[1].Addr)
 
 	// Configure an aggregate cluster pointing to a single EDS cluster. Also,
 	// configure the underlying EDS cluster (and the corresponding endpoints
 	// resource) and DNS cluster (will be used later in the test).
-	const (
-		dnsClusterName = clusterName + "-dns"
-		dnsHostName    = "dns_host"
-		dnsPort        = uint32(8080)
-	)
+	const dnsClusterName = clusterName + "-dns"
 	resources := e2e.UpdateOptions{
 		NodeID: nodeID,
 		Clusters: []*v3clusterpb.Cluster{
@@ -563,20 +667,6 @@ func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	// Ensure that the DNS resolver is started for the expected target.
-	select {
-	case <-ctx.Done():
-		t.Fatal("Timeout when waiting for DNS resolver to be started")
-	case target := <-dnsTargetCh:
-		got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
-		if got != want {
-			t.Fatalf("DNS resolution started for target %q, want %q", got, want)
-		}
-	}
-
-	// Update DNS resolver with test backend addresses.
-	dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})
-
 	// Ensure that start getting routed to the backend corresponding to the
 	// LOGICAL_DNS cluster.
 	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
@@ -705,17 +795,14 @@ func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
 // the DNS resolver pushes an update, the test verifies that we switch to the
 // DNS cluster and can make a successful RPC.
 func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) {
-	dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
-	defer cleanup1()
-
 	// Start an xDS management server.
 	managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
 	defer cleanup2()
 
-	// Start two test backends.
-	servers, cleanup3 := startTestServiceBackends(t, 2)
-	defer cleanup3()
-	addrs, _ := backendAddressesAndPorts(t, servers)
+	// Start a test service backend.
+	server := stubserver.StartTestService(t, nil)
+	defer server.Stop()
+	dnsHostName, dnsPort := hostAndPortFromAddress(t, server.Address)
 
 	// Configure an aggregate cluster pointing to an EDS and LOGICAL_DNS
 	// cluster. Also configure an empty endpoints resource for the EDS cluster
@@ -723,8 +810,6 @@ func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) {
 	const (
 		edsClusterName = clusterName + "-eds"
 		dnsClusterName = clusterName + "-dns"
-		dnsHostName    = "dns_host"
-		dnsPort        = uint32(8080)
 	)
 	nackEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil)
 	nackEndpointResource.Endpoints = []*v3endpointpb.LocalityLbEndpoints{
@@ -755,23 +840,9 @@ func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) {
 	cc, cleanup := setupAndDial(t, bootstrapContents)
 	defer cleanup()
 
-	// Ensure that the DNS resolver is started for the expected target.
-	select {
-	case <-ctx.Done():
-		t.Fatal("Timeout when waiting for DNS resolver to be started")
-	case target := <-dnsTargetCh:
-		got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
-		if got != want {
-			t.Fatalf("DNS resolution started for target %q, want %q", got, want)
-		}
-	}
-
-	// Update DNS resolver with test backend addresses.
-	dnsR.UpdateState(resolver.State{Addresses: addrs})
-
 	// Ensure that RPCs start getting routed to the first backend since the
 	// child policy for a LOGICAL_DNS cluster is pick_first by default.
-	pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0])
+	pickfirst.CheckRPCsToBackend(ctx, cc, resolver.Address{Addr: server.Address})
 }
 
 // TestAggregateCluster_BadDNS_GoodEDS tests the case where the top-level
@@ -780,34 +851,29 @@ func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) {
 // good update, this test verifies the cluster_resolver balancer correctly falls
 // back from the LOGICAL_DNS cluster to the EDS cluster.
 func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) {
-	dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
-	defer cleanup1()
-
 	// Start an xDS management server.
 	managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
 	defer cleanup2()
 
-	// Start two test backends.
-	servers, cleanup3 := startTestServiceBackends(t, 2)
-	defer cleanup3()
-	addrs, ports := backendAddressesAndPorts(t, servers)
+	// Start a test service backend.
+	server := stubserver.StartTestService(t, nil)
+	defer server.Stop()
+	_, edsPort := hostAndPortFromAddress(t, server.Address)
 
 	// Configure an aggregate cluster pointing to an LOGICAL_DNS and EDS
 	// cluster. Also configure an endpoints resource for the EDS cluster.
 	const (
 		edsClusterName = clusterName + "-eds"
 		dnsClusterName = clusterName + "-dns"
-		dnsHostName    = "dns_host"
-		dnsPort        = uint32(8080)
 	)
 	resources := e2e.UpdateOptions{
 		NodeID: nodeID,
 		Clusters: []*v3clusterpb.Cluster{
 			makeAggregateClusterResource(clusterName, []string{dnsClusterName, edsClusterName}),
-			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
+			makeLogicalDNSClusterResource(dnsClusterName, "bad.ip.v4.address", 8080),
 			e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
 		},
-		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(ports[0])})},
+		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(edsPort)})},
 		SkipValidation: true,
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@@ -821,20 +887,6 @@ func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) {
 	cc, cleanup := setupAndDial(t, bootstrapContents)
 	defer cleanup()
 
-	// Ensure that the DNS resolver is started for the expected target.
-	select {
-	case <-ctx.Done():
-		t.Fatal("Timeout when waiting for DNS resolver to be started")
-	case target := <-dnsTargetCh:
-		got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
-		if got != want {
-			t.Fatalf("DNS resolution started for target %q, want %q", got, want)
-		}
-	}
-
-	// Push an error through the DNS resolver.
-	dnsR.ReportError(errors.New("some error"))
-
 	// RPCs should work, higher level DNS cluster errors so should fallback to
 	// EDS cluster.
 	client := testgrpc.NewTestServiceClient(cc)
@@ -842,8 +894,8 @@ func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) {
 	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
 		t.Fatalf("EmptyCall() failed: %v", err)
 	}
-	if peer.Addr.String() != addrs[0].Addr {
-		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
+	if peer.Addr.String() != server.Address {
+		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
 	}
 }
 
@@ -854,9 +906,6 @@ func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) {
 // error, the test verifies that RPCs fail with the error triggered by the DNS
 // Discovery Mechanism (from sending an empty address list down).
 func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
-	dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
-	defer cleanup1()
-
 	// Start an xDS management server.
 	managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
 	defer cleanup2()
@@ -867,8 +916,6 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
 	const (
 		edsClusterName = clusterName + "-eds"
 		dnsClusterName = clusterName + "-dns"
-		dnsHostName    = "dns_host"
-		dnsPort        = uint32(8080)
 	)
 	emptyEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil)
 	resources := e2e.UpdateOptions{
@@ -876,7 +923,7 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
 		Clusters: []*v3clusterpb.Cluster{
 			makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
 			e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
-			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
+			makeLogicalDNSClusterResource(dnsClusterName, "bad.ip.v4.address", 8080),
 		},
 		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{emptyEndpointResource},
 		SkipValidation: true,
@@ -892,39 +939,20 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
 	cc, cleanup := setupAndDial(t, bootstrapContents)
 	defer cleanup()
 
-	// Make an RPC with a short deadline. We expect this RPC to not succeed
-	// because the EDS resource came back with no endpoints, and we are yet to
-	// push an update through the DNS resolver.
-	client := testgrpc.NewTestServiceClient(cc)
-	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
-	defer sCancel()
-	if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
-		t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded)
-	}
-
-	// Ensure that the DNS resolver is started for the expected target.
-	select {
-	case <-ctx.Done():
-		t.Fatal("Timeout when waiting for DNS resolver to be started")
-	case target := <-dnsTargetCh:
-		got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
-		if got != want {
-			t.Fatalf("DNS resolution started for target %q, want %q", got, want)
-		}
-	}
-
-	// Push an error from the DNS resolver as well.
-	dnsErr := fmt.Errorf("DNS error")
-	dnsR.ReportError(dnsErr)
-
 	// Ensure that the error from the DNS Resolver leads to an empty address
 	// update for both priorities.
-	_, err := client.EmptyCall(ctx, &testpb.Empty{})
-	if code := status.Code(err); code != codes.Unavailable {
-		t.Fatalf("EmptyCall() failed with code %s, want %s", code, codes.Unavailable)
+	client := testgrpc.NewTestServiceClient(cc)
+	for ctx.Err() == nil {
+		_, err := client.EmptyCall(ctx, &testpb.Empty{})
+		if err == nil {
+			t.Fatal("EmptyCall() succeeded when expected to fail")
+		}
+		if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "produced zero addresses") {
+			break
+		}
 	}
-	if err == nil || !strings.Contains(err.Error(), "produced zero addresses") {
-		t.Fatalf("EmptyCall() failed with error: %v, want: produced zero addresses", err)
+	if ctx.Err() != nil {
+		t.Fatalf("Timeout when waiting for RPCs to fail with expected code and error")
 	}
 }
 
@@ -937,9 +965,6 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
 // previously received good update and that RPCs still get routed to the EDS
 // cluster.
 func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *testing.T) {
-	dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
-	defer cleanup1()
-
 	// Start an xDS management server.
 	mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
 	defer cleanup2()
@@ -950,14 +975,13 @@ func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *test
 	servers, cleanup3 := startTestServiceBackends(t, 2)
 	defer cleanup3()
 	addrs, ports := backendAddressesAndPorts(t, servers)
+	dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[1].Address)
 
 	// Configure an aggregate cluster pointing to an EDS and DNS cluster. Also
 	// configure an endpoints resource for the EDS cluster.
 	const (
 		edsClusterName = clusterName + "-eds"
 		dnsClusterName = clusterName + "-dns"
-		dnsHostName    = "dns_host"
-		dnsPort        = uint32(8080)
 	)
 	resources := e2e.UpdateOptions{
 		NodeID: nodeID,
@@ -980,20 +1004,6 @@ func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *test
 	cc, cleanup := setupAndDial(t, bootstrapContents)
 	defer cleanup()
 
-	// Ensure that the DNS resolver is started for the expected target.
-	select {
-	case <-ctx.Done():
-		t.Fatal("Timeout when waiting for DNS resolver to be started")
-	case target := <-dnsTargetCh:
-		got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
-		if got != want {
-			t.Fatalf("DNS resolution started for target %q, want %q", got, want)
-		}
-	}
-
-	// Update DNS resolver with test backend addresses.
-	dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})
-
 	// Make an RPC and ensure that it gets routed to the first backend since the
 	// EDS cluster is of higher priority than the LOGICAL_DNS cluster.
 	client := testgrpc.NewTestServiceClient(cc)
@@ -1032,9 +1042,6 @@ func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *test
 // the LOGICAL_DNS cluster, because it is supposed to treat the bad EDS response
 // as though it received an update with no endpoints.
 func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *testing.T) {
-	dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
-	defer cleanup1()
-
 	// Start an xDS management server.
 	mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
 	defer cleanup2()
@@ -1045,13 +1052,12 @@ func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *tes
 	servers, cleanup3 := startTestServiceBackends(t, 2)
 	defer cleanup3()
 	addrs, ports := backendAddressesAndPorts(t, servers)
+	dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[1].Address)
 
 	// Configure an aggregate cluster pointing to an EDS and DNS cluster.
 	const (
 		edsClusterName = clusterName + "-eds"
 		dnsClusterName = clusterName + "-dns"
-		dnsHostName    = "dns_host"
-		dnsPort        = uint32(8080)
 	)
 	resources := e2e.UpdateOptions{
 		NodeID: nodeID,
@@ -1080,20 +1086,6 @@ func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *tes
 	cc, cleanup := setupAndDial(t, bootstrapContents)
 	defer cleanup()
 
-	// Ensure that the DNS resolver is started for the expected target.
-	select {
-	case <-ctx.Done():
-		t.Fatal("Timeout when waiting for DNS resolver to be started")
-	case target := <-dnsTargetCh:
-		got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
-		if got != want {
-			t.Fatalf("DNS resolution started for target %q, want %q", got, want)
-		}
-	}
-
-	// Update DNS resolver with test backend addresses.
-	dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})
-
 	// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
 	peer := &peer.Peer{}
 	client := testgrpc.NewTestServiceClient(cc)
@@ -1111,9 +1103,6 @@ func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *tes
 // cluster. The test verifies that the cluster_resolver LB policy falls back to
 // the LOGICAL_DNS cluster in this case.
 func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {
-	dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
-	defer cleanup1()
-
 	// Start an xDS management server.
 	mgmtServer, nodeID, _, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
 	defer cleanup2()
@@ -1121,14 +1110,13 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {
 	// Start a test backend for the LOGICAL_DNS cluster.
 	server := stubserver.StartTestService(t, nil)
 	defer server.Stop()
+	dnsHostName, dnsPort := hostAndPortFromAddress(t, server.Address)
 
 	// Configure an aggregate cluster pointing to an EDS and DNS cluster. No
 	// endpoints are configured for the EDS cluster.
 	const (
 		edsClusterName = clusterName + "-eds"
 		dnsClusterName = clusterName + "-dns"
-		dnsHostName    = "dns_host"
-		dnsPort        = uint32(8080)
 	)
 	resources := e2e.UpdateOptions{
 		NodeID: nodeID,
@@ -1177,35 +1165,13 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {
 	}
 	defer cc.Close()
 
-	// Make an RPC with a short deadline. We expect this RPC to not succeed
-	// because the DNS resolver has not responded with endpoint addresses.
-	client := testgrpc.NewTestServiceClient(cc)
-	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
-	defer sCancel()
-	if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
-		t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded)
-	}
-
-	// Ensure that the DNS resolver is started for the expected target.
-	select {
-	case <-ctx.Done():
-		t.Fatal("Timeout when waiting for DNS resolver to be started")
-	case target := <-dnsTargetCh:
-		got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
-		if got != want {
-			t.Fatalf("DNS resolution started for target %q, want %q", got, want)
-		}
-	}
-
-	// Update DNS resolver with test backend addresses.
-	dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: server.Address}}})
-
 	// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
 	// Even though the EDS cluster is of higher priority, since the management
 	// server does not respond with an EDS resource, the cluster_resolver LB
 	// policy is expected to fallback to the LOGICAL_DNS cluster once the watch
 	// timeout expires.
 	peer := &peer.Peer{}
+	client := testgrpc.NewTestServiceClient(cc)
 	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
 		t.Fatalf("EmptyCall() failed: %v", err)
 	}
diff --git a/xds/internal/balancer/clusterresolver/resource_resolver.go b/xds/internal/balancer/clusterresolver/resource_resolver.go
index aaababa71c57..b9a81e9ba829 100644
--- a/xds/internal/balancer/clusterresolver/resource_resolver.go
+++ b/xds/internal/balancer/clusterresolver/resource_resolver.go
@@ -19,9 +19,11 @@
 package clusterresolver
 
 import (
+	"context"
 	"sync"
 
 	"google.golang.org/grpc/internal/grpclog"
+	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
 )
 
@@ -83,9 +85,11 @@ type discoveryMechanismAndResolver struct {
 }
 
 type resourceResolver struct {
-	parent        *clusterResolverBalancer
-	logger        *grpclog.PrefixLogger
-	updateChannel chan *resourceUpdate
+	parent           *clusterResolverBalancer
+	logger           *grpclog.PrefixLogger
+	updateChannel    chan *resourceUpdate
+	serializer       *grpcsync.CallbackSerializer
+	serializerCancel context.CancelFunc
 
 	// mu protects the slice and map, and content of the resolvers in the slice.
 	mu         sync.Mutex
@@ -106,12 +110,16 @@ type resourceResolver struct {
 }
 
 func newResourceResolver(parent *clusterResolverBalancer, logger *grpclog.PrefixLogger) *resourceResolver {
-	return &resourceResolver{
+	rr := &resourceResolver{
 		parent:        parent,
 		logger:        logger,
 		updateChannel: make(chan *resourceUpdate, 1),
 		childrenMap:   make(map[discoveryMechanismKey]discoveryMechanismAndResolver),
 	}
+	ctx, cancel := context.WithCancel(context.Background())
+	rr.serializer = grpcsync.NewCallbackSerializer(ctx)
+	rr.serializerCancel = cancel
+	return rr
 }
 
 func equalDiscoveryMechanisms(a, b []DiscoveryMechanism) bool {
@@ -210,8 +218,9 @@ func (rr *resourceResolver) resolveNow() {
 	}
 }
 
-func (rr *resourceResolver) stop() {
+func (rr *resourceResolver) stop(closing bool) {
 	rr.mu.Lock()
+
 	// Save the previous childrenMap to stop the children outside the mutex,
 	// and reinitialize the map.  We only need to reinitialize to allow for the
 	// policy to be reused if the resource comes back.  In practice, this does
@@ -222,12 +231,18 @@ func (rr *resourceResolver) stop() {
 	rr.childrenMap = make(map[discoveryMechanismKey]discoveryMechanismAndResolver)
 	rr.mechanisms = nil
 	rr.children = nil
+
 	rr.mu.Unlock()
 
 	for _, r := range cm {
 		r.r.stop()
 	}
 
+	if closing {
+		rr.serializerCancel()
+		<-rr.serializer.Done()
+	}
+
 	// stop() is called when the LB policy is closed or when the underlying
 	// cluster resource is removed by the management server. In the latter case,
 	// an empty config update needs to be pushed to the child policy to ensure
@@ -272,7 +287,9 @@ func (rr *resourceResolver) generateLocked() {
 }
 
 func (rr *resourceResolver) onUpdate() {
-	rr.mu.Lock()
-	rr.generateLocked()
-	rr.mu.Unlock()
+	rr.serializer.Schedule(func(context.Context) {
+		rr.mu.Lock()
+		rr.generateLocked()
+		rr.mu.Unlock()
+	})
 }