diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py
index 4bdc0b11..403ff659 100644
--- a/airbyte/_util/api_util.py
+++ b/airbyte/_util/api_util.py
@@ -14,7 +14,7 @@
 from __future__ import annotations
 
 import json
-from typing import Any
+from typing import TYPE_CHECKING, Any
 
 import airbyte_api
 from airbyte_api import api, models
@@ -24,9 +24,14 @@
     AirbyteError,
     AirbyteMissingResourceError,
     AirbyteMultipleResourcesError,
+    PyAirbyteInputError,
 )
 
 
+if TYPE_CHECKING:
+    from collections.abc import Callable
+
+
 JOB_WAIT_INTERVAL_SECS = 2.0
 JOB_WAIT_TIMEOUT_SECS_DEFAULT = 60 * 60  # 1 hour
 CLOUD_API_ROOT = "https://api.airbyte.com/v1"
@@ -43,7 +48,7 @@ def get_airbyte_server_instance(
     *,
     api_key: str,
     api_root: str,
-) -> airbyte_api.Airbyte:
+) -> airbyte_api.AirbyteAPI:
     """Get an Airbyte instance."""
     return airbyte_api.AirbyteAPI(
         security=models.Security(
@@ -84,7 +89,7 @@ def get_workspace(
     )
 
 
-# List, get, and run connections
+# List resources
 
 
 def list_connections(
@@ -92,28 +97,118 @@ def list_connections(
     *,
     api_root: str,
     api_key: str,
-) -> list[api.ConnectionResponse]:
+    name: str | None = None,
+    name_filter: Callable[[str], bool] | None = None,
+) -> list[models.ConnectionResponse]:
     """Get a connection."""
+    if name and name_filter:
+        raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")
+
+    name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True)
+
     _ = workspace_id  # Not used (yet)
     airbyte_instance = get_airbyte_server_instance(
         api_key=api_key,
         api_root=api_root,
     )
     response = airbyte_instance.connections.list_connections(
-        api.ListConnectionsRequest()(
+        api.ListConnectionsRequest(
             workspace_ids=[workspace_id],
         ),
     )
 
-    if status_ok(response.status_code) and response.connections_response:
-        return response.connections_response.data
+    if not status_ok(response.status_code):
+        raise AirbyteError(
+            context={
+                "workspace_id": workspace_id,
+                "response": response,
+            }
+        )
+    assert response.connections_response is not None
+    return [
+        connection
+        for connection in response.connections_response.data
+        if name_filter(connection.name)
+    ]
+
 
-    raise AirbyteError(
-        context={
-            "workspace_id": workspace_id,
-            "response": response,
-        }
+def list_sources(
+    workspace_id: str,
+    *,
+    api_root: str,
+    api_key: str,
+    name: str | None = None,
+    name_filter: Callable[[str], bool] | None = None,
+) -> list[models.SourceResponse]:
+    """List sources in the workspace, optionally filtered by name."""
+    if name and name_filter:
+        raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")
+
+    name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True)
+
+    _ = workspace_id  # Not used (yet)
+    airbyte_instance = get_airbyte_server_instance(
+        api_key=api_key,
+        api_root=api_root,
+    )
+    response = airbyte_instance.sources.list_sources(
+        api.ListSourcesRequest(
+            workspace_ids=[workspace_id],
+        ),
+    )
+
+    if not status_ok(response.status_code):
+        raise AirbyteError(
+            context={
+                "workspace_id": workspace_id,
+                "response": response,
+            }
+        )
+    assert response.sources_response is not None
+    return [source for source in response.sources_response.data if name_filter(source.name)]
+
+
+def list_destinations(
+    workspace_id: str,
+    *,
+    api_root: str,
+    api_key: str,
+    name: str | None = None,
+    name_filter: Callable[[str], bool] | None = None,
+) -> list[models.DestinationResponse]:
+    """List destinations in the workspace, optionally filtered by name."""
+    if name and name_filter:
+        raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")
+
+    name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True)
+
+    _ = workspace_id  # Not used (yet)
+    airbyte_instance = get_airbyte_server_instance(
+        api_key=api_key,
+        api_root=api_root,
     )
+    response = airbyte_instance.destinations.list_destinations(
+        api.ListDestinationsRequest(
+            workspace_ids=[workspace_id],
+        ),
+    )
+
+    if not status_ok(response.status_code):
+        raise AirbyteError(
+            context={
+                "workspace_id": workspace_id,
+                "response": response,
+            }
+        )
+    assert response.destinations_response is not None
+    return [
+        destination
+        for destination in response.destinations_response.data
+        if name_filter(destination.name)
+    ]
+
+
+# Get and run connections
 
 
 def get_connection(
@@ -122,7 +217,7 @@ def get_connection(
     *,
     api_root: str,
     api_key: str,
-) -> api.ConnectionResponse:
+) -> models.ConnectionResponse:
     """Get a connection."""
     _ = workspace_id  # Not used (yet)
     airbyte_instance = get_airbyte_server_instance(
@@ -137,7 +232,11 @@ def get_connection(
     if status_ok(response.status_code) and response.connection_response:
         return response.connection_response
 
-    raise AirbyteMissingResourceError(connection_id, "connection", response.text)
+    raise AirbyteMissingResourceError(
+        resource_name_or_id=connection_id,
+        resource_type="connection",
+        log_text=response.raw_response.text,
+    )
 
 
 def run_connection(
@@ -146,7 +245,7 @@ def run_connection(
     *,
     api_root: str,
     api_key: str,
-) -> api.ConnectionResponse:
+) -> models.JobResponse:
     """Get a connection.
 
     If block is True, this will block until the connection is finished running.
@@ -186,7 +285,7 @@ def get_job_logs(
     *,
     api_root: str,
     api_key: str,
-) -> list[api.JobResponse]:
+) -> list[models.JobResponse]:
     """Get a job's logs."""
     airbyte_instance = get_airbyte_server_instance(
         api_key=api_key,
@@ -213,11 +312,11 @@ def get_job_logs(
 
 
 def get_job_info(
-    job_id: str,
+    job_id: int,
     *,
     api_root: str,
     api_key: str,
-) -> api.JobResponse:
+) -> models.JobResponse:
     """Get a job."""
     airbyte_instance = get_airbyte_server_instance(
         api_key=api_key,
@@ -231,7 +330,11 @@ def get_job_info(
     if status_ok(response.status_code) and response.job_response:
         return response.job_response
 
-    raise AirbyteMissingResourceError(job_id, "job", response.text)
+    raise AirbyteMissingResourceError(
+        resource_name_or_id=str(job_id),
+        resource_type="job",
+        log_text=response.raw_response.text,
+    )
 
 
 # Create, get, and delete sources
@@ -244,7 +347,7 @@ def create_source(
     config: dict[str, Any],
     api_root: str,
     api_key: str,
-) -> api.SourceResponse:
+) -> models.SourceResponse:
     """Get a connection."""
     airbyte_instance = get_airbyte_server_instance(
         api_key=api_key,
@@ -273,7 +376,7 @@ def get_source(
     *,
     api_root: str,
     api_key: str,
-) -> api.SourceResponse:
+) -> models.SourceResponse:
     """Get a connection."""
     airbyte_instance = get_airbyte_server_instance(
         api_key=api_key,
@@ -284,10 +387,14 @@ def get_source(
             source_id=source_id,
         ),
     )
-    if status_ok(response.status_code) and response.connection_response:
-        return response.connection_response
+    if status_ok(response.status_code) and response.source_response:
+        return response.source_response
 
-    raise AirbyteMissingResourceError(source_id, "source", response.text)
+    raise AirbyteMissingResourceError(
+        resource_name_or_id=source_id,
+        resource_type="source",
+        log_text=response.raw_response.text,
+    )
 
 
 def delete_source(
@@ -327,7 +434,7 @@ def create_destination(
     config: dict[str, Any],
     api_root: str,
     api_key: str,
-) -> api.DestinationResponse:
+) -> models.DestinationResponse:
     """Get a connection."""
     airbyte_instance = get_airbyte_server_instance(
         api_key=api_key,
@@ -354,7 +461,7 @@ def get_destination(
     *,
     api_root: str,
     api_key: str,
-) -> api.DestinationResponse:
+) -> models.DestinationResponse:
     """Get a connection."""
     airbyte_instance = get_airbyte_server_instance(
         api_key=api_key,
@@ -365,33 +472,38 @@ def get_destination(
             destination_id=destination_id,
         ),
     )
-    if status_ok(response.status_code):
+    if status_ok(response.status_code) and response.destination_response:
         # TODO: This is a temporary workaround to resolve an issue where
         # the destination API response is of the wrong type.
         # https://github.com/airbytehq/pyairbyte/issues/320
         raw_response: dict[str, Any] = json.loads(response.raw_response.text)
         raw_configuration: dict[str, Any] = raw_response["configuration"]
+
         destination_type = raw_response.get("destinationType")
         if destination_type == "snowflake":
-            response.destination_response.configuration = models.DestinationSnowflake.from_dict(
-                raw_configuration,
+            response.destination_response.configuration = models.DestinationSnowflake(
+                **raw_configuration,
             )
         if destination_type == "bigquery":
-            response.destination_response.configuration = models.DestinationBigquery.from_dict(
-                raw_configuration,
+            response.destination_response.configuration = models.DestinationBigquery(
+                **raw_configuration,
             )
         if destination_type == "postgres":
-            response.destination_response.configuration = models.DestinationPostgres.from_dict(
-                raw_configuration,
+            response.destination_response.configuration = models.DestinationPostgres(
+                **raw_configuration,
             )
         if destination_type == "duckdb":
-            response.destination_response.configuration = models.DestinationDuckdb.from_dict(
-                raw_configuration,
+            response.destination_response.configuration = models.DestinationDuckdb(
+                **raw_configuration,
             )
 
         return response.destination_response
 
-    raise AirbyteMissingResourceError(destination_id, "destination", response.text)
+    raise AirbyteMissingResourceError(
+        resource_name_or_id=destination_id,
+        resource_type="destination",
+        log_text=response.raw_response.text,
+    )
 
 
 def delete_destination(
@@ -448,17 +560,17 @@ def create_connection(
             )
             stream_configurations.append(stream_configuration)
 
-    stream_configurations = models.StreamConfigurations(stream_configurations)
+    stream_configurations_obj = models.StreamConfigurations(stream_configurations)
     response = airbyte_instance.connections.create_connection(
         models.ConnectionCreateRequest(
             name=name,
             source_id=source_id,
             destination_id=destination_id,
-            configurations=stream_configurations,
+            configurations=stream_configurations_obj,
             prefix=prefix,
         ),
     )
-    if not status_ok(response.status_code):
+    if not status_ok(response.status_code) or response.connection_response is None:
         raise AirbyteError(
             context={
                 "source_id": source_id,
diff --git a/airbyte/_util/text_util.py b/airbyte/_util/text_util.py
new file mode 100644
index 00000000..ed3578d3
--- /dev/null
+++ b/airbyte/_util/text_util.py
@@ -0,0 +1,22 @@
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+"""Utility functions for working with text."""
+
+from __future__ import annotations
+
+import ulid
+
+
+def generate_ulid() -> str:
+    """Generate a new ULID."""
+    return str(ulid.ULID())
+
+
+def generate_random_suffix() -> str:
+    """Generate a random suffix for use in temporary names.
+
+    By default, this function generates a ULID and returns a 9-character string
+    which will be monotonically sortable. It is not guaranteed to be unique but
+    is sufficient for small-scale and medium-scale use cases.
+    """
+    ulid_str = generate_ulid()
+    return ulid_str[:6] + ulid_str[-3:]
diff --git a/airbyte/caches/base.py b/airbyte/caches/base.py
index 775a64b3..dd1d740e 100644
--- a/airbyte/caches/base.py
+++ b/airbyte/caches/base.py
@@ -60,10 +60,6 @@ class CacheBase(SqlConfig, AirbyteWriterInterface):
 
     _name: str = PrivateAttr()
 
-    _deployed_api_root: str | None = PrivateAttr(default=None)
-    _deployed_workspace_id: str | None = PrivateAttr(default=None)
-    _deployed_destination_id: str | None = PrivateAttr(default=None)
-
     _sql_processor_class: type[SqlProcessorBase] = PrivateAttr()
     _read_processor: SqlProcessorBase = PrivateAttr()
 
diff --git a/airbyte/cloud/connections.py b/airbyte/cloud/connections.py
index b4543a1f..0a725bb8 100644
--- a/airbyte/cloud/connections.py
+++ b/airbyte/cloud/connections.py
@@ -6,6 +6,7 @@
 from typing import TYPE_CHECKING, cast
 
 from airbyte._util import api_util
+from airbyte.cloud.connectors import CloudDestination, CloudSource
 from airbyte.cloud.sync_results import SyncResult
 
 
@@ -68,6 +69,14 @@ def source_id(self) -> str:
 
         return cast(str, self._source_id)
 
+    @property
+    def source(self) -> CloudSource:
+        """Get the source object."""
+        return CloudSource(
+            workspace=self.workspace,
+            connector_id=self.source_id,
+        )
+
     @property
     def destination_id(self) -> str:
         """The ID of the destination."""
@@ -79,13 +88,21 @@ def destination_id(self) -> str:
 
         return cast(str, self._destination_id)
 
+    @property
+    def destination(self) -> CloudDestination:
+        """Get the destination object."""
+        return CloudDestination(
+            workspace=self.workspace,
+            connector_id=self.destination_id,
+        )
+
     @property
     def stream_names(self) -> list[str]:
         """The stream names."""
         if not self._connection_info:
             self._connection_info = self._fetch_connection_info()
 
-        return [stream.name for stream in self._connection_info.configurations.streams]
+        return [stream.name for stream in self._connection_info.configurations.streams or []]
 
     @property
     def table_prefix(self) -> str:
@@ -93,7 +110,7 @@ def table_prefix(self) -> str:
         if not self._connection_info:
             self._connection_info = self._fetch_connection_info()
 
-        return self._connection_info.prefix
+        return self._connection_info.prefix or ""
 
     @property
     def connection_url(self) -> str | None:
@@ -154,7 +171,7 @@ def get_previous_sync_logs(
             SyncResult(
                 workspace=self.workspace,
                 connection=self,
-                job_id=sync_log.job_id,
+                job_id=str(sync_log.job_id),
                 _latest_job_info=sync_log,
             )
             for sync_log in sync_logs
@@ -189,28 +206,22 @@ def get_sync_result(
 
     # Deletions
 
-    def _permanently_delete(
+    def permanently_delete(
         self,
         *,
-        delete_source: bool = False,
-        delete_destination: bool = False,
+        cascade_delete_source: bool = False,
+        cascade_delete_destination: bool = False,
     ) -> None:
         """Delete the connection.
 
         Args:
-            delete_source: Whether to also delete the source.
-            delete_destination: Whether to also delete the destination.
+            cascade_delete_source: Whether to also delete the source.
+            cascade_delete_destination: Whether to also delete the destination.
         """
-        self.workspace._permanently_delete_connection(  # noqa: SLF001  # Non-public API (for now)
-            connection=self
-        )
+        self.workspace.permanently_delete_connection(self)
 
-        if delete_source:
-            self.workspace._permanently_delete_source(  # noqa: SLF001  # Non-public API (for now)
-                source=self.source_id
-            )
+        if cascade_delete_source:
+            self.workspace.permanently_delete_source(self.source_id)
 
-        if delete_destination:
-            self.workspace._permanently_delete_destination(  # noqa: SLF001  # Non-public API
-                destination=self.destination_id,
-            )
+        if cascade_delete_destination:
+            self.workspace.permanently_delete_destination(self.destination_id)
diff --git a/airbyte/cloud/connectors.py b/airbyte/cloud/connectors.py
new file mode 100644
index 00000000..eb8a5e68
--- /dev/null
+++ b/airbyte/cloud/connectors.py
@@ -0,0 +1,81 @@
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+"""Cloud connectors module for working with Cloud sources and destinations."""
+
+from __future__ import annotations
+
+import abc
+from typing import TYPE_CHECKING, Literal
+
+
+if TYPE_CHECKING:
+    from airbyte.cloud.workspaces import CloudWorkspace
+
+
+class CloudConnector(abc.ABC):
+    """A cloud connector is a deployed source or destination on Airbyte Cloud.
+
+    You can use a connector object to manage the connector.
+    """
+
+    @property
+    @abc.abstractmethod
+    def connector_type(self) -> Literal["source", "destination"]:
+        """Get the type of the connector."""
+        ...
+
+    def __init__(
+        self,
+        workspace: CloudWorkspace,
+        connector_id: str,
+    ) -> None:
+        """Initialize a cloud connector object."""
+        self.workspace = workspace
+        """The workspace that the connector belongs to."""
+        self.connector_id = connector_id
+        """The ID of the connector."""
+
+    @property
+    def connector_url(self) -> str:
+        """Get the URL of the source connector."""
+        return f"{self.workspace.workspace_url}/{self.connector_type}s/{self.connector_id}"
+
+    def permanently_delete(self) -> None:
+        """Permanently delete the connector."""
+        if self.connector_type == "source":
+            self.workspace.permanently_delete_source(self.connector_id)
+        else:
+            self.workspace.permanently_delete_destination(self.connector_id)
+
+
+class CloudSource(CloudConnector):
+    """A cloud source is a source that is deployed on Airbyte Cloud."""
+
+    @property
+    def source_id(self) -> str:
+        """Get the ID of the source.
+
+        This is an alias for `connector_id`.
+        """
+        return self.connector_id
+
+    @property
+    def connector_type(self) -> Literal["source", "destination"]:
+        """Get the type of the connector."""
+        return "source"
+
+
+class CloudDestination(CloudConnector):
+    """A cloud destination is a destination that is deployed on Airbyte Cloud."""
+
+    @property
+    def destination_id(self) -> str:
+        """Get the ID of the destination.
+
+        This is an alias for `connector_id`.
+        """
+        return self.connector_id
+
+    @property
+    def connector_type(self) -> Literal["source", "destination"]:
+        """Get the type of the connector."""
+        return "destination"
diff --git a/airbyte/cloud/experimental.py b/airbyte/cloud/experimental.py
deleted file mode 100644
index c5ccf23a..00000000
--- a/airbyte/cloud/experimental.py
+++ /dev/null
@@ -1,60 +0,0 @@
-# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
-"""Experimental features for interacting with the Airbyte Cloud API.
-
-You can use this module to access experimental features in Airbyte Cloud, OSS, and Enterprise. These
-features are subject to change and may not be available in all environments. **Future versions of
-PyAirbyte may remove or change these features without notice.**
-
-To use this module, replace an import like this:
-
-```python
-from airbyte.cloud import CloudConnection, CloudWorkspace
-```
-
-with an import like this:
-
-```python
-from airbyte.cloud.experimental import CloudConnection, CloudWorkspace
-```
-
-You can toggle between the stable and experimental versions of these classes by changing the import
-path. This allows you to test new features without requiring substantial changes to your codebase.
-
-"""
-# ruff: noqa: SLF001  # This file accesses private members of other classes.
-
-from __future__ import annotations
-
-import warnings
-
-from airbyte import exceptions as exc
-from airbyte.cloud.connections import CloudConnection as Stable_CloudConnection
-from airbyte.cloud.workspaces import CloudWorkspace as Stable_CloudWorkspace
-
-
-# This module is not imported anywhere by default, so this warning should only print if the user
-# explicitly imports it.
-warnings.warn(
-    message="The `airbyte.cloud.experimental` module is experimental and may change in the future.",
-    category=exc.AirbyteExperimentalFeatureWarning,
-    stacklevel=2,
-)
-
-
-class CloudWorkspace(Stable_CloudWorkspace):  # noqa: D101  # Docstring inherited from parent.
-    __doc__ = (
-        f"Experimental implementation of `.CloudWorkspace`.\n\n{Stable_CloudConnection.__doc__}"
-    )
-    deploy_connection = Stable_CloudWorkspace._deploy_connection
-    deploy_source = Stable_CloudWorkspace._deploy_source
-    deploy_cache_as_destination = Stable_CloudWorkspace._deploy_cache_as_destination
-    permanently_delete_connection = Stable_CloudWorkspace._permanently_delete_connection
-    permanently_delete_source = Stable_CloudWorkspace._permanently_delete_source
-    permanently_delete_destination = Stable_CloudWorkspace._permanently_delete_destination
-
-
-class CloudConnection(Stable_CloudConnection):  # noqa: D101  # Docstring inherited from parent.
-    __doc__ = (
-        f"Experimental implementation of `.CloudConnection`.\n\n{Stable_CloudConnection.__doc__}"
-    )
-    permanently_delete = Stable_CloudConnection._permanently_delete
diff --git a/airbyte/cloud/workspaces.py b/airbyte/cloud/workspaces.py
index 6bd52a24..915935ec 100644
--- a/airbyte/cloud/workspaces.py
+++ b/airbyte/cloud/workspaces.py
@@ -11,25 +11,17 @@
 from typing import TYPE_CHECKING
 
 from airbyte import exceptions as exc
-from airbyte._util.api_util import (
-    CLOUD_API_ROOT,
-    create_connection,
-    create_destination,
-    create_source,
-    delete_connection,
-    delete_destination,
-    delete_source,
-    get_workspace,
-)
-from airbyte.cloud._destination_util import get_destination_config_from_cache
+from airbyte._util import api_util, text_util
 from airbyte.cloud.connections import CloudConnection
+from airbyte.cloud.connectors import CloudDestination, CloudSource
 from airbyte.cloud.sync_results import SyncResult
-from airbyte.sources.base import Source
 
 
 if TYPE_CHECKING:
-    from airbyte._util.api_imports import DestinationResponse
-    from airbyte.caches.base import CacheBase
+    from collections.abc import Callable
+
+    from airbyte.destinations.base import Destination
+    from airbyte.sources.base import Source
 
 
 @dataclass
@@ -42,7 +34,7 @@ class CloudWorkspace:
 
     workspace_id: str
     api_key: str
-    api_root: str = CLOUD_API_ROOT
+    api_root: str = api_util.CLOUD_API_ROOT
 
     @property
     def workspace_url(self) -> str | None:
@@ -58,194 +50,193 @@ def connect(self) -> None:
               serves primarily as a simple check to ensure that the workspace is reachable
               and credentials are correct.
         """
-        _ = get_workspace(
+        _ = api_util.get_workspace(
             api_root=self.api_root,
             api_key=self.api_key,
             workspace_id=self.workspace_id,
         )
         print(f"Successfully connected to workspace: {self.workspace_url}")
 
-    # Deploy and delete sources
+    # Deploy sources and destinations
 
-    # TODO: Make this a public API
-    # https://github.com/airbytehq/pyairbyte/issues/228
-    def _deploy_source(
+    def deploy_source(
         self,
+        name: str,
         source: Source,
-    ) -> str:
+        *,
+        unique: bool = True,
+        random_name_suffix: bool = False,
+    ) -> CloudSource:
         """Deploy a source to the workspace.
 
-        Returns the newly deployed source ID.
+        Returns the newly deployed source.
+
+        Args:
+            name: The name to use when deploying.
+            source: The source object to deploy.
+            unique: Whether to require a unique name. If `True`, duplicate names
+                are not allowed. Defaults to `True`.
+            random_name_suffix: Whether to append a random suffix to the name.
         """
         source_configuration = source.get_config().copy()
         source_configuration["sourceType"] = source.name.replace("source-", "")
 
-        deployed_source = create_source(
-            name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)",
+        if random_name_suffix:
+            name += f" (ID: {text_util.generate_random_suffix()})"
+
+        if unique:
+            existing = self.list_sources(name=name)
+            if existing:
+                raise exc.AirbyteDuplicateResourcesError(
+                    resource_type="source",
+                    resource_name=name,
+                )
+
+        deployed_source = api_util.create_source(
+            name=name,
             api_root=self.api_root,
             api_key=self.api_key,
             workspace_id=self.workspace_id,
             config=source_configuration,
         )
+        return CloudSource(
+            workspace=self,
+            connector_id=deployed_source.source_id,
+        )
 
-        # Set the deployment Ids on the source object
-        source._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
-        source._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
-        source._deployed_source_id = deployed_source.source_id  # noqa: SLF001  # Accessing nn-public API
-
-        return deployed_source.source_id
-
-    def _permanently_delete_source(
+    def deploy_destination(
         self,
-        source: str | Source,
-    ) -> None:
-        """Delete a source from the workspace.
-
-        You can pass either the source ID `str` or a deployed `Source` object.
-        """
-        if not isinstance(source, str | Source):
-            raise ValueError(f"Invalid source type: {type(source)}")  # noqa: TRY004, TRY003
-
-        if isinstance(source, Source):
-            if not source._deployed_source_id:  # noqa: SLF001
-                raise ValueError("Source has not been deployed.")  # noqa: TRY003
+        name: str,
+        destination: Destination,
+        *,
+        unique: bool = True,
+        random_name_suffix: bool = False,
+    ) -> CloudDestination:
+        """Deploy a destination to the workspace.
 
-            source_id = source._deployed_source_id  # noqa: SLF001
+        Returns the newly deployed destination object.
 
-        elif isinstance(source, str):
-            source_id = source
+        Args:
+            name: The name to use when deploying.
+            destination: The destination object to deploy.
+            unique: Whether to require a unique name. If `True`, duplicate names
+                are not allowed. Defaults to `True`.
+            random_name_suffix: Whether to append a random suffix to the name.
+        """
+        destination_configuration = destination.get_config().copy()
+        destination_configuration["destinationType"] = destination.name.replace("destination-", "")
+
+        if random_name_suffix:
+            name += f" (ID: {text_util.generate_random_suffix()})"
+
+        if unique:
+            existing = self.list_destinations(name=name)
+            if existing:
+                raise exc.AirbyteDuplicateResourcesError(
+                    resource_type="destination",
+                    resource_name=name,
+                )
 
-        delete_source(
-            source_id=source_id,
+        deployed_destination = api_util.create_destination(
+            name=name,
             api_root=self.api_root,
             api_key=self.api_key,
+            workspace_id=self.workspace_id,
+            config=destination_configuration,
+        )
+        return CloudDestination(
+            workspace=self,
+            connector_id=deployed_destination.destination_id,
         )
 
-    # Deploy and delete destinations
-
-    # TODO: Make this a public API
-    # https://github.com/airbytehq/pyairbyte/issues/228
-    def _deploy_cache_as_destination(
+    def permanently_delete_source(
         self,
-        cache: CacheBase,
-    ) -> str:
-        """Deploy a cache to the workspace as a new destination.
+        source: str | CloudSource,
+    ) -> None:
+        """Delete a source from the workspace.
 
-        Returns the newly deployed destination ID.
+        You can pass either the source ID `str` or a deployed `Source` object.
         """
-        cache_type_name = cache.__class__.__name__.replace("Cache", "")
+        if not isinstance(source, str | CloudSource):
+            raise exc.PyAirbyteInputError(
+                message="Invalid source type.",
+                input_value=type(source).__name__,
+            )
 
-        deployed_destination: DestinationResponse = create_destination(
-            name=f"Destination {cache_type_name} (Deployed by PyAirbyte)",
+        api_util.delete_source(
+            source_id=source.connector_id if isinstance(source, CloudSource) else source,
             api_root=self.api_root,
             api_key=self.api_key,
-            workspace_id=self.workspace_id,
-            config=get_destination_config_from_cache(cache),
         )
 
-        # Set the deployment Ids on the source object
-        cache._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
-        cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
-        cache._deployed_destination_id = deployed_destination.destination_id  # noqa: SLF001  # Accessing nn-public API
-
-        return deployed_destination.destination_id
+    # Deploy and delete destinations
 
-    def _permanently_delete_destination(
+    def permanently_delete_destination(
         self,
-        *,
-        destination: str | None = None,
-        cache: CacheBase | None = None,
+        destination: str | CloudDestination,
     ) -> None:
         """Delete a deployed destination from the workspace.
 
         You can pass either the `Cache` class or the deployed destination ID as a `str`.
         """
-        if destination is None and cache is None:
-            raise ValueError("You must provide either a destination ID or a cache object.")  # noqa: TRY003
-        if destination is not None and cache is not None:
-            raise ValueError(  # noqa: TRY003
-                "You must provide either a destination ID or a cache object, not both."
+        if not isinstance(destination, str | CloudDestination):
+            raise exc.PyAirbyteInputError(
+                message="Invalid destination type.",
+                input_value=type(destination).__name__,
             )
 
-        if cache:
-            if not cache._deployed_destination_id:  # noqa: SLF001
-                raise ValueError("Cache has not been deployed.")  # noqa: TRY003
-
-            destination = cache._deployed_destination_id  # noqa: SLF001
-
-        if destination is None:
-            raise ValueError("No destination ID provided.")  # noqa: TRY003
-
-        delete_destination(
-            destination_id=destination,
+        api_util.delete_destination(
+            destination_id=(
+                destination if isinstance(destination, str) else destination.destination_id
+            ),
             api_root=self.api_root,
             api_key=self.api_key,
         )
 
     # Deploy and delete connections
 
-    # TODO: Make this a public API
-    # https://github.com/airbytehq/pyairbyte/issues/228
-    def _deploy_connection(
+    def deploy_connection(
         self,
-        source: Source | str,
-        cache: CacheBase | None = None,
-        destination: str | None = None,
+        connection_name: str,
+        *,
+        source: CloudSource | str,
+        destination: CloudDestination | str,
         table_prefix: str | None = None,
         selected_streams: list[str] | None = None,
     ) -> CloudConnection:
-        """Deploy a source and cache to the workspace as a new connection.
+        """Create a new connection between an already deployed source and destination.
 
-        Returns the newly deployed connection ID as a `str`.
+        Returns the newly deployed connection object.
 
         Args:
-            source (Source | str): The source to deploy. You can pass either an already deployed
-                source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object,
-                it will be deployed automatically.
-            cache (CacheBase, optional): The cache to deploy as a new destination. You can provide
-                `cache` or `destination`, but not both.
-            destination (str, optional): The destination ID to use. You can provide
-                `cache` or `destination`, but not both.
-            table_prefix (str, optional): The table prefix to use for the cache. If not provided,
-                the cache's table prefix will be used.
-            selected_streams (list[str], optional): The selected stream names to use for the
-                connection. If not provided, the source's selected streams will be used.
+            connection_name: The name of the connection.
+            source: The deployed source. You can pass a source ID or a CloudSource object.
+            destination: The deployed destination. You can pass a destination ID or a
+                CloudDestination object.
+            table_prefix: Optional. The table prefix to use when syncing to the destination.
+            selected_streams: The selected stream names to sync within the connection.
         """
+        if not selected_streams:
+            raise exc.PyAirbyteInputError(
+                guidance="You must provide `selected_streams` when creating a connection."
+            )
+
         # Resolve source ID
         source_id: str
-        if isinstance(source, Source):
-            selected_streams = selected_streams or source.get_selected_streams()
-            source_id = (
-                source._deployed_source_id  # noqa: SLF001  # Access to non-public API
-                or self._deploy_source(source)
-            )
-        else:
-            source_id = source
-            if not selected_streams:
-                raise exc.PyAirbyteInputError(
-                    guidance="You must provide `selected_streams` when deploying a source ID."
-                )
+        source_id = source if isinstance(source, str) else source.connector_id
 
-        # Resolve destination ID
-        destination_id: str
-        if destination:
-            destination_id = destination
-        elif cache:
-            table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "")
-            if not cache._deployed_destination_id:  # noqa: SLF001
-                destination_id = self._deploy_cache_as_destination(cache)
-            else:
-                destination_id = cache._deployed_destination_id  # noqa: SLF001
-        else:
+        # Resolve destination ID (the destination must already be deployed)
+        destination_id = destination if isinstance(destination, str) else destination.connector_id
+        if not selected_streams:
             raise exc.PyAirbyteInputError(
-                guidance="You must provide either a destination ID or a cache object."
+                guidance=(
+                    "You must provide `selected_streams` when creating a connection "
+                    "from an existing destination."
+                )
             )
 
-        assert source_id is not None
-        assert destination_id is not None
-
-        deployed_connection = create_connection(
-            name="Connection (Deployed by PyAirbyte)",
+        deployed_connection = api_util.create_connection(
+            name=connection_name,
             source_id=source_id,
             destination_id=destination_id,
             api_root=self.api_root,
@@ -255,15 +246,6 @@ def _deploy_connection(
             prefix=table_prefix or "",
         )
 
-        if isinstance(source, Source):
-            source._deployed_api_root = self.api_root  # noqa: SLF001
-            source._deployed_workspace_id = self.workspace_id  # noqa: SLF001
-            source._deployed_source_id = source_id  # noqa: SLF001
-        if cache:
-            cache._deployed_api_root = self.api_root  # noqa: SLF001
-            cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001
-            cache._deployed_destination_id = deployed_connection.destination_id  # noqa: SLF001
-
         return CloudConnection(
             workspace=self,
             connection_id=deployed_connection.connection_id,
@@ -285,7 +267,7 @@ def get_connection(
             connection_id=connection_id,
         )
 
-    def _permanently_delete_connection(
+    def permanently_delete_connection(
         self,
         connection: str | CloudConnection,
         *,
@@ -302,17 +284,17 @@ def _permanently_delete_connection(
                 connection_id=connection,
             )
 
-        delete_connection(
+        api_util.delete_connection(
             connection_id=connection.connection_id,
             api_root=self.api_root,
             api_key=self.api_key,
             workspace_id=self.workspace_id,
         )
         if delete_source:
-            self._permanently_delete_source(source=connection.source_id)
+            self.permanently_delete_source(source=connection.source_id)
 
         if delete_destination:
-            self._permanently_delete_destination(destination=connection.destination_id)
+            self.permanently_delete_destination(destination=connection.destination_id)
 
     # Run syncs
 
@@ -380,3 +362,76 @@ def get_previous_sync_logs(
         return connection.get_previous_sync_logs(
             limit=limit,
         )
+
+    # List sources, destinations, and connections
+
+    def list_connections(
+        self,
+        name: str | None = None,
+        *,
+        name_filter: Callable | None = None,
+    ) -> list[CloudConnection]:
+        """List connections by name in the workspace."""
+        connections = api_util.list_connections(
+            api_root=self.api_root,
+            api_key=self.api_key,
+            workspace_id=self.workspace_id,
+            name=name,
+            name_filter=name_filter,
+        )
+        return [
+            CloudConnection(
+                workspace=self,
+                connection_id=connection.connection_id,
+                source=None,
+                destination=None,
+            )
+            for connection in connections
+            if name is None or connection.name == name
+        ]
+
+    def list_sources(
+        self,
+        name: str | None = None,
+        *,
+        name_filter: Callable | None = None,
+    ) -> list[CloudSource]:
+        """List all sources in the workspace."""
+        sources = api_util.list_sources(
+            api_root=self.api_root,
+            api_key=self.api_key,
+            workspace_id=self.workspace_id,
+            name=name,
+            name_filter=name_filter,
+        )
+        return [
+            CloudSource(
+                workspace=self,
+                connector_id=source.source_id,
+            )
+            for source in sources
+            if name is None or source.name == name
+        ]
+
+    def list_destinations(
+        self,
+        name: str | None = None,
+        *,
+        name_filter: Callable | None = None,
+    ) -> list[CloudDestination]:
+        """List all destinations in the workspace."""
+        destinations = api_util.list_destinations(
+            api_root=self.api_root,
+            api_key=self.api_key,
+            workspace_id=self.workspace_id,
+            name=name,
+            name_filter=name_filter,
+        )
+        return [
+            CloudDestination(
+                workspace=self,
+                connector_id=destination.destination_id,
+            )
+            for destination in destinations
+            if name is None or destination.name == name
+        ]
diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py
index 0713bb64..87159d59 100644
--- a/airbyte/exceptions.py
+++ b/airbyte/exceptions.py
@@ -486,6 +486,15 @@ class AirbyteMissingResourceError(AirbyteError):
     resource_name_or_id: str | None = None
 
 
+@dataclass
+class AirbyteDuplicateResourcesError(AirbyteError):
+    """Process failed because resource name was not unique."""
+
+    resource_type: str | None = None
+    resource_name: str | None = None
+
+
+# Custom Errors (continued)
 @dataclass
 class AirbyteMultipleResourcesError(AirbyteError):
     """Could not locate the resource because multiple matching resources were found."""
diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py
index aeffc8ed..564215f4 100644
--- a/airbyte/sources/base.py
+++ b/airbyte/sources/base.py
@@ -51,7 +51,7 @@
 class Source(ConnectorBase):
     """A class representing a source that can be called."""
 
-    connector_type: Literal["source"] = "source"
+    connector_type = "source"
 
     def __init__(
         self,
@@ -84,10 +84,6 @@ def __init__(
         if streams is not None:
             self.select_streams(streams)
 
-        self._deployed_api_root: str | None = None
-        self._deployed_workspace_id: str | None = None
-        self._deployed_source_id: str | None = None
-
     def set_streams(self, streams: list[str]) -> None:
         """Deprecated. See select_streams()."""
         warnings.warn(