SQLite Code 5 Database Is Locked #451

Closed
apexys opened this issue Jun 25, 2020 · 9 comments

apexys commented Jun 25, 2020

After writing #326 (comment) I tried to build a minimal shareable example that reproduces the problem.

The problem itself is that when I try to read from and write to the database simultaneously, I don't just get varying replies; I actually get an error saying the database is locked.

I would expect the database to simply wait a bit longer if it is currently locked (perhaps something I could bound with tokio's timeout function), but instead I just get an SQLite error.

I'm currently using sqlx straight from the master branch:
https://github.com/apexys/sqlx_locked_mvp/blob/master/Cargo.toml

    sqlx = { git = "https://github.com/launchbadge/sqlx", branch = "master", default-features = false, features = ["runtime-tokio", "sqlite", "chrono", "json"] }

In the database code, I create a pool, a table, and an index:
https://github.com/apexys/sqlx_locked_mvp/blob/master/src/main.rs

    if std::path::PathBuf::from("cache.db").exists() {
        println!("Removing old database");
        std::fs::remove_file("cache.db").unwrap();
    }
    println!("Creating database");
    let pool = SqlitePool::builder().max_size(num_cpus::get() as u32).build("sqlite:cache.db").await.unwrap();
    // Table for the cache
    query("CREATE TABLE IF NOT EXISTS data (source TEXT, data BLOB)").execute(&pool).await.unwrap();
    // Index so lookups by source don't scan the whole table
    query("CREATE INDEX IF NOT EXISTS data_index ON data(source)").execute(&pool).await.unwrap();

and then spawn two tokio tasks each for inserting and for requesting data:

    // start tasks for inserts
    for _ in 0..2 {
        let insert_pool = pool.clone();
        tokio::spawn(async move { timeout(Duration::from_secs(10), insert(insert_pool)).await });
    }

    // start tasks for requests
    for _ in 0..2 {
        let request_pool = pool.clone();
        tokio::spawn(async move { timeout(Duration::from_secs(10), request(request_pool)).await });
    }

The inserts are just random data under a string key:

    async fn insert(pool: SqlitePool) {
        let mut ctr: i32 = 0;
        loop {
            let mut data = vec![0u8; 4096];
            thread_rng().try_fill(&mut *data).unwrap();
            let title = ctr.to_string();
            let result = sqlx::query("INSERT INTO data VALUES (?, ?)")
                .bind(&title)
                .bind(&data)
                .execute(&pool)
                .await;
            match result {
                Ok(_) => eprintln!("Inserting packet {}: OK", &title),
                Err(e) => eprintln!("Inserting packet {}: OErr({:?})", &title, e),
            }
            delay_for(Duration::from_millis(10)).await;
            ctr += 1;
        }
    }

and the requests are basically the same:

    async fn request(pool: SqlitePool) {
        let mut ctr: i32 = 0;
        loop {
            let title = ctr.to_string();
            let mut cursor = sqlx::query("SELECT data FROM data WHERE source = ?")
                .bind(&title)
                .fetch(&pool);

            match cursor.next().await {
                Some(Ok(row)) => {
                    let data: &[u8] = row.get("data");
                    eprintln!("Fetching {} OK, got {} bytes", &title, data.len())
                }
                Some(Err(err)) => eprintln!("Fetching {} failed, got error {:?}", &title, err),
                None => eprintln!("Fetching {} returned None", &title),
            }
            delay_for(Duration::from_millis(10)).await;
            ctr += 1;
        }
    }

Naively, I would expect this code not to run into any errors. The database is brand new, and I would be happy even if no request ever returned anything but None.

Instead, my log looks like this:

Running `target\debug\sqlx_mvp.exe`
Fetching 0 OK, got 4096 bytes
Fetching 0 OK, got 4096 bytes
Inserting packet 0: OK
Fetching 1 OK, got 4096 bytes
Fetching 1 OK, got 4096 bytes
Fetching 2 OK, got 4096 bytes
Inserting packet 0: OK
Fetching 3 OK, got 4096 bytes
Fetching 2 OK, got 4096 bytes
Inserting packet 1: OK
Fetching 4 OK, got 4096 bytes
Inserting packet 2: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 3: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 4: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 5: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 6: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 7: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 8: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 9: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 10: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 11: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 12: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 13: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 14: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 15: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 16: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 17: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 18: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 19: OErr(Database(SqliteError { code: 5, message: "database is locked" }))
...

Am I doing something wrong here?

Thanks for your help!

Quick edit: I just noticed that I get results for packets 1-3 before I even inserted them, instead of None. This is probably just the logging lagging behind, right?


Cocalus commented Jun 25, 2020

It's been a while, but I believe you should configure the SQLite database with

    -- Allows simultaneous reads with one writer
    PRAGMA journal_mode = WAL;

    -- and/or (I forget if you need both):

    -- How long to wait for the database to stop being busy
    PRAGMA busy_timeout = milliseconds;


apexys commented Jun 25, 2020

From trying this just now: it definitely works with both.

busy_timeout alone gives slower errors (i.e., it waits out the timeout before erroring, even if I set it to something like 60 seconds).

journal_mode=WAL alone means all the fetches that should return None do return None, but every second write fails.

But both work!

Thank you!

apexys closed this as completed Jun 25, 2020

apexys commented Jun 25, 2020

Okay, maybe I didn't test that solution enough. I managed to get some "database is locked" messages by just inserting and fetching the same three packets over and over.

My pragmas for the test were

    query("PRAGMA journal_mode=WAL").execute(&pool).await.unwrap();
    query("PRAGMA busy_timeout=60000").execute(&pool).await.unwrap();

and the modified insert code is

    async fn insert(pool: SqlitePool) {
        let mut ctr: i32 = 0;
        loop {
            ctr = ctr % 3;
            let mut data = vec![0u8; 1024 * 1024];
            thread_rng().try_fill(&mut *data).unwrap();
            let title = ctr.to_string();
            let result = sqlx::query("INSERT INTO data VALUES (?, ?)")
                .bind(&title)
                .bind(&data)
                .execute(&pool)
                .await;
            match result {
                Ok(_) => println!("Inserting packet {}: OK", &title),
                Err(e) => println!("Inserting packet {}: Err({:?})", &title, e),
            }
            delay_for(Duration::from_millis(10)).await;
            ctr += 1;
        }
    }

with the request function modified similarly.

I updated the git repo with the test code I used.

apexys reopened this Jun 25, 2020

apexys commented Jun 25, 2020

This also reproduces with the old packet size of 4096, and throws more errors in that case:

Inserting packet 0: OK
Fetching 1 OK, got 4096 bytes
Fetching 1 OK, got 4096 bytes
Inserting packet 0: Err(Database(SqliteError { code: 517, message: "database is locked" }))
Inserting packet 1: Err(Database(SqliteError { code: 517, message: "database is locked" }))
Fetching 2 returned None
Fetching 2 OK, got 4096 bytes
Inserting packet 2: Err(Database(SqliteError { code: 517, message: "database is locked" }))
Inserting packet 1: OK
Fetching 0 OK, got 4096 bytes
Fetching 0 OK, got 4096 bytes
Inserting packet 0: Err(Database(SqliteError { code: 5, message: "database is locked" }))
Inserting packet 2: OK
Fetching 1 OK, got 4096 bytes
Fetching 1 OK, got 4096 bytes


Cocalus commented Jun 25, 2020

Your data rate may simply exceed what can be written to disk; in that case lock timeouts will always happen with multiple threads. You could add backpressure when the timeouts start happening, or just reduce the load to well below what the disk can take. If your DB is on a hard drive instead of an SSD, switching to an SSD will drastically raise the amount of data needed to overload it. If you're on macOS, the full fsyncs SQLite needs to enforce durability are (or at least were) extra expensive.

Since there can only be a single writer at a time in SQLite, I would just send all write requests to a single writer thread/task via a channel with a size limit; the inserts will then block automatically when the channel fills up. With WAL, only writers can block each other, so there should be no more busy errors. A sketch of that pattern is below.
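
Roughly like this (only a sketch: the `WritePacket` struct, the channel capacity of 64, and the error handling are placeholders I made up; the sqlx calls mirror the example above):

    use tokio::sync::mpsc;

    // Hypothetical envelope for one queued insert.
    struct WritePacket {
        title: String,
        data: Vec<u8>,
    }

    // The only task that ever writes; readers keep using the pool directly.
    async fn writer_task(pool: SqlitePool, mut rx: mpsc::Receiver<WritePacket>) {
        while let Some(packet) = rx.recv().await {
            sqlx::query("INSERT INTO data VALUES (?, ?)")
                .bind(&packet.title)
                .bind(&packet.data)
                .execute(&pool)
                .await
                .unwrap();
        }
    }

    // At setup: the bounded channel is the backpressure. When 64 writes are
    // queued, `tx.send(packet).await` blocks until the writer catches up.
    let (tx, rx) = mpsc::channel::<WritePacket>(64);
    tokio::spawn(writer_task(pool.clone(), rx));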

There are also tricks to speed up the writes: keep any active write transaction as short as possible, and batch into fewer, bigger statements/transactions (this mixes well with a single writer task; see the sketch below). But it'll still get busy if it's overloaded. If you don't need persistence, use an in-memory DB.
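
For example, the writer task from the previous sketch could drain whatever is already queued and commit it as one transaction (again only a sketch: the cap of 128 is arbitrary, `try_recv` assumes the channel offers a non-blocking receive, and `begin`/`commit` follow sqlx's transaction API):

    // Batching variant: drain queued packets and insert them in one transaction.
    async fn writer_task_batched(pool: SqlitePool, mut rx: mpsc::Receiver<WritePacket>) {
        while let Some(first) = rx.recv().await {
            let mut batch = vec![first];
            // Grab whatever else is already waiting, up to an arbitrary cap.
            while batch.len() < 128 {
                match rx.try_recv() {
                    Ok(packet) => batch.push(packet),
                    Err(_) => break,
                }
            }
            // One transaction per batch instead of one per insert.
            let mut tx = pool.begin().await.unwrap();
            for packet in &batch {
                sqlx::query("INSERT INTO data VALUES (?, ?)")
                    .bind(&packet.title)
                    .bind(&packet.data)
                    .execute(&mut tx)
                    .await
                    .unwrap();
            }
            tx.commit().await.unwrap();
        }
    }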

There's a possibility some bug in the Rust libraries is causing a write transaction to get stuck on an await point. That would be harder to prove.


mehcode commented Jun 25, 2020

From my experience, ideally you should use two pools: one for writers with a max size of 1, and one for readers. SQLx should probably provide this kind of abstraction as well.
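
With the builder API from the example above, that would look something like this (a sketch; the reader pool size is arbitrary):

    // One pool that owns the single writer connection...
    let write_pool = SqlitePool::builder()
        .max_size(1)
        .build("sqlite:cache.db")
        .await
        .unwrap();
    // ...and one pool for any number of concurrent readers.
    let read_pool = SqlitePool::builder()
        .max_size(num_cpus::get() as u32)
        .build("sqlite:cache.db")
        .await
        .unwrap();

    // Route all INSERT/UPDATE/DELETE through write_pool and all SELECTs
    // through read_pool.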


apexys commented Jun 26, 2020

Wait, that would work? I mean, opening two pools on the same file?


apexys commented Jun 26, 2020

Apparently it does. It would be great if SQLx provided this abstraction. Maybe even something as simple as splitting one connection for writing off from the pool would work, i.e. marking one of the connections as able to write and the others as read-only?


mehcode commented Jun 28, 2020

This would remove the potential to lock and allow much higher performance than just using one pool with a max size of 1. Opened #459.
