Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transaction commit raises CockroachDB error #933

Open
imalsogreg opened this issue Dec 25, 2020 · 6 comments
Open

Transaction commit raises CockroachDB error #933

imalsogreg opened this issue Dec 25, 2020 · 6 comments
Labels
bug:db Involves a bug in the database server bug db:other DBs that are not explicitly supported but may work db:postgres Related to PostgreSQL

Comments

@imalsogreg
Copy link

Hello! I've been using sqlx to talk to CockroachDB databases and recently started an upgrade from sqlx-0.3.5 to sqlx-0.4.2. After upgrading, many tests began to fail with

PgDatabaseError {
  severity: Error, code: "0A000",
  message: "unimplemented: multiple active portals not supported",
  detail: None,
  hint: Some("You have attempted to use a feature that is not yet 
    implemented.\nSee: https://go.crdb.dev/issue-v/40195/v20.1"),
  position: None, 
  where: None, 
  schema: None, 
  table: None, 
  column: None, 
  data_type: None, 
  constraint: None, 
  file: Some("distsql_running.go"), 
  line: Some(775), 
  routine: Some("init") 
}

I came up with a minimal reproducing test case that does a SELECT 1. When this query is executed within a transaction, it fails. But if I run the query directly against the postgres connection pool, it succeeds.

    #[tokio::test]
    async fn test_sqlx_transaction() -> Result<(), sqlx::Error> {
        let pool = PgPoolOptions::new()
            .max_connections(5)
            .connect("postgres://root@localhost:26257/api_server")
            .await?;
        let mut txn = pool.begin().await.expect("should get txn");

        let row: (i64,) = sqlx::query_as("SELECT $1")
            .bind::<i64>(1)
            .fetch_one(&mut txn)
            .await
            .expect("query_as error");

        txn.commit().await.expect("txn should commit");

        println!("row.0: {}", row.0);
        Ok(())
    }

Both cases succeed for sqlx-0.3.5.

I was told in the CockroachDB community slack that my database driver is trying to have multiple active result sets (portals) concurrently on the same connection. I'm not sure where to go from there. Thanks for any ideas! I'm happy to work on a patch, given some clues, if you have ideas about what is happening.

@abonander abonander added bug db:other DBs that are not explicitly supported but may work labels Dec 26, 2020
@imalsogreg
Copy link
Author

We discovered that fetch_all within a transaction does not exhibit the bug. The bugged behavior only appears when using fetch_one or fetch_optional, and only within a transaction.

While trying to debug, I was confused by the way that stream writing and message-awaiting depend on the state of the StatementCache: https://github.com/launchbadge/sqlx/blob/master/sqlx-core/src/postgres/connection/executor.rs#L180-L188. insert returns the id of a newly evicted cache line (if there is one), and then the code writes Close for that evicted cache line, and waits for completion signal from the server only if a cache line is evicted. This doesn't seem like desirable behavior though - do I have the right idea?

@lacasaprivata2
Copy link

++++1

@sergeyshaykhullin
Copy link

Any existing workaround?

@imalsogreg
Copy link
Author

@sergeyshaykhullin I've been using the following code. It lets you write .fetch_one_shim(&mut txn) where you would have written .fetch_one(&mut txn), and .fetch_optional_shim(&mut txn) where you would have written .fetch_optional(&mut txn).

The implementation calls .fetch_all(&mut txn), so it's inefficient if your SQL query returns more than one row.

use sqlx::FromRow;

#[async_trait]
/// A trait for adding shim methods to sqlx::QueryAs instances.
/// This is a shim to be used while we work on an upstream fix for sqlx.
/// See https://github.com/launchbadge/sqlx/issues/933 and
/// https://github.com/cockroachdb/cockroach/issues/40195 for more details.
pub trait FetchShim<O> {
    /// Like `sqlx::fetch_one`, except that internally it calls `sqlx::fetch_all`.
    /// This will be less efficient than `sqlx::fetch_one`, if the query does not
    /// limit its result rows to 1
    async fn fetch_one_shim<'e, 'c, E>(self, query: E) -> Result<O, sqlx::Error>
    where
        O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
        E: 'e + Executor<'c, Database = Postgres>;

    /// Like `sqlx::fetch_one`, except that internally it calls `sqlx::fetch_all`.
    /// This will be less efficient than `sqlx::fetch_one`, if the query does not
    /// limit its result rows to 1
    async fn fetch_optional_shim<'e, 'c, E>(self, query: E) -> Result<Option<O>, sqlx::Error>
    where
        O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
        E: 'e + Executor<'c, Database = Postgres>;
}

#[async_trait]
impl<'q, O> FetchShim<O> for sqlx::query::QueryAs<'q, sqlx::Postgres, O, PgArguments> {
    async fn fetch_one_shim<'e, 'c, E>(self, query: E) -> Result<O, sqlx::Error>
    where
        O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
        E: 'e + Executor<'c, Database = Postgres>,
    {
        let maybe_rows = self.fetch_all(query).await;
        match maybe_rows {
            Ok(rows) => match rows.into_iter().next() {
                Some(x) => Ok(x),
                None => Err(sqlx::error::Error::RowNotFound),
            },
            Err(y) => Err(y),
        }
    }

    async fn fetch_optional_shim<'e, 'c, E>(self, query: E) -> Result<Option<O>, sqlx::Error>
    where
        O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
        E: 'e + Executor<'c, Database = Postgres>,
    {
        let maybe_rows = self.fetch_all(query).await;
        match maybe_rows {
            Ok(rows) => Ok(rows.into_iter().next()),
            Err(y) => Err(y),
        }
    }
}

@Lohann
Copy link

Lohann commented Jul 18, 2022

Thx @imalsogreg it worked! btw follow a more generic implementation that also includes Map and works with query_as! macro:

#[async_trait]
/// A trait for adding shim methods to sqlx::QueryAs instances.
/// This is a shim to be used while we work on an upstream fix for sqlx.
/// See https://github.com/launchbadge/sqlx/issues/933 and
/// https://github.com/cockroachdb/cockroach/issues/40195 for more details.
pub trait FetchShim<DB, O>
where
	DB: Database,
{
	/// Like `sqlx::fetch_one`, except that internally it calls `sqlx::fetch_all`.
	/// This will be less efficient than `sqlx::fetch_one`, if the query does not
	/// limit its result rows to 1
	async fn fetch_one_shim<'e, 'c, E>(self, query: E) -> Result<O, sqlx::Error>
	where
		O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
		E: 'e + Executor<'c, Database = DB>;

	/// Like `sqlx::fetch_one`, except that internally it calls `sqlx::fetch_all`.
	/// This will be less efficient than `sqlx::fetch_one`, if the query does not
	/// limit its result rows to 1
	async fn fetch_optional_shim<'e, 'c, E>(self, query: E) -> Result<Option<O>, sqlx::Error>
	where
		O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
		E: 'e + Executor<'c, Database = DB>;
}

#[async_trait]
impl<'q, DB, O, A> FetchShim<DB, O> for sqlx::query::QueryAs<'q, DB, O, A>
where
	DB: sqlx::Database,
	A: 'q + sqlx::IntoArguments<'q, DB>,
{
	async fn fetch_one_shim<'e, 'c, E>(self, query: E) -> Result<O, sqlx::Error>
	where
		DB: sqlx::Database,
		O: Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
		E: 'e + sqlx::Executor<'c, Database = DB>,
	{
		self.fetch_optional_shim(query).await?.ok_or(Error::RowNotFound)
	}

	async fn fetch_optional_shim<'e, 'c, E>(self, query: E) -> Result<Option<O>, sqlx::Error>
	where
		O: Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
		E: 'e + sqlx::Executor<'c, Database = DB>,
	{
		Ok(self.fetch_all(query).await?.into_iter().next())
	}
}

#[async_trait]
impl<'q, DB, F, O, A> FetchShim<DB, O> for sqlx::query::Map<'q, DB, F, A>
where
	DB: sqlx::Database,
	F: FnMut(DB::Row) -> Result<O, sqlx::Error> + Send,
	O: Send + Unpin,
	A: 'q + Send + sqlx::IntoArguments<'q, DB>,
{
	async fn fetch_one_shim<'e, 'c, E>(self, query: E) -> Result<O, sqlx::Error>
	where
		O: Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
		E: 'e + sqlx::Executor<'c, Database = DB>,
	{
		self.fetch_optional_shim(query).await?.ok_or(Error::RowNotFound)
	}

	async fn fetch_optional_shim<'e, 'c, E>(self, query: E) -> Result<Option<O>, sqlx::Error>
	where
		O: Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
		E: 'e + sqlx::Executor<'c, Database = DB>,
	{
		Ok(self.fetch_all(query).await?.into_iter().next())
	}
}

@abonander
Copy link
Collaborator

abonander commented Jul 19, 2022

I believe this is a bug in CockroachDB, not SQLx. We never use anything but the unnamed portal, which should be replaced every time we send a Bind message or closed at the end of a transaction.

Trying to find the error message in the source didn't yield any code results, but did give me this commit: cockroachdb/cockroach@0ce4443

So it appears that this will likely be fixed in the next release of CockroachDB.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug:db Involves a bug in the database server bug db:other DBs that are not explicitly supported but may work db:postgres Related to PostgreSQL
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants