Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix "ENSURE: empty page must be defined as empty type" #2504

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 24 additions & 14 deletions LiteDB/Engine/Services/SnapShot.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,18 @@ public void Dispose()
#region Page Version functions

/// <summary>
/// Get a a valid page for this snapshot (must consider local-index and wal-index)
/// Get a valid page for this snapshot (must consider local-index and wal-index)
/// </summary>
public T GetPage<T>(uint pageID)
public T GetPage<T>(uint pageID, bool useLatestVersion = false)
where T : BasePage
{
return this.GetPage<T>(pageID, out var origin, out var position, out var walVersion);
return this.GetPage<T>(pageID, out var origin, out var position, out var walVersion, useLatestVersion);
}

/// <summary>
/// Get a a valid page for this snapshot (must consider local-index and wal-index)
/// Get a valid page for this snapshot (must consider local-index and wal-index)
/// </summary>
public T GetPage<T>(uint pageID, out FileOrigin origin, out long position, out int walVersion)
public T GetPage<T>(uint pageID, out FileOrigin origin, out long position, out int walVersion, bool useLatestVersion = false)
where T : BasePage
{
ENSURE(!_disposed, "the snapshot is disposed");
Expand Down Expand Up @@ -198,7 +198,7 @@ public T GetPage<T>(uint pageID, out FileOrigin origin, out long position, out i
}

// if page is not in local cache, get from disk (log/wal/data)
page = this.ReadPage<T>(pageID, out origin, out position, out walVersion);
page = this.ReadPage<T>(pageID, out origin, out position, out walVersion, useLatestVersion);

// add into local pages
_localPages[pageID] = page;
Expand All @@ -212,7 +212,7 @@ public T GetPage<T>(uint pageID, out FileOrigin origin, out long position, out i
/// <summary>
/// Read page from disk (dirty, wal or data)
/// </summary>
private T ReadPage<T>(uint pageID, out FileOrigin origin, out long position, out int walVersion)
private T ReadPage<T>(uint pageID, out FileOrigin origin, out long position, out int walVersion, bool useLatestVersion = false)
where T : BasePage
{
// if not inside local pages can be a dirty page saved in log file
Expand All @@ -232,7 +232,7 @@ private T ReadPage<T>(uint pageID, out FileOrigin origin, out long position, out
}

// now, look inside wal-index
var pos = _walIndex.GetPageIndex(pageID, _readVersion, out walVersion);
var pos = _walIndex.GetPageIndex(pageID, useLatestVersion ? int.MaxValue : _readVersion, out walVersion);

if (pos != long.MaxValue)
{
Expand Down Expand Up @@ -352,7 +352,7 @@ public T NewPage<T>()
// try get page from Empty free list
if (_header.FreeEmptyPageList != uint.MaxValue)
{
var free = this.GetPage<BasePage>(_header.FreeEmptyPageList);
var free = this.GetPage<BasePage>(_header.FreeEmptyPageList, useLatestVersion: true);

ENSURE(free.PageType == PageType.Empty, "empty page must be defined as empty type");

Expand All @@ -375,11 +375,21 @@ public T NewPage<T>()

if (newLength > _header.Pragmas.LimitSize) throw new LiteException(0, $"Maximum data file size has been reached: {FileHelper.FormatFileSize(_header.Pragmas.LimitSize)}");

// increase LastPageID from shared page
pageID = ++_header.LastPageID;

// request for a new buffer
buffer = _reader.NewPage();
var savepoint = _header.Savepoint();
try
{
// increase LastPageID from shared page
pageID = ++_header.LastPageID;

// request for a new buffer
buffer = _reader.NewPage();
}
catch
{
// must revert all header content if any error occurs during header change
_header.Restore(savepoint);
throw;
}
}

// retain a list of created pages to, in a rollback situation, back pages to empty list
Expand Down
45 changes: 22 additions & 23 deletions LiteDB/Engine/Services/TransactionService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -205,29 +205,25 @@ IEnumerable<PageBuffer> source()
// in commit with header page change, last page will be header
if (commit && _transPages.HeaderChanged)
{
lock(_header)
{
// update this confirm page with current transactionID
_header.TransactionID = _transactionID;
// update this confirm page with current transactionID
_header.TransactionID = _transactionID;

// this header page will be marked as confirmed page in log file
_header.IsConfirmed = true;
// this header page will be marked as confirmed page in log file
_header.IsConfirmed = true;

// invoke all header callbacks (new/drop collections)
_transPages.OnCommit(_header);
// invoke all header callbacks (new/drop collections)
_transPages.OnCommit(_header);

// clone header page
var buffer = _header.UpdateBuffer();
var clone = _disk.NewPage();
// clone header page
var buffer = _header.UpdateBuffer();
var clone = _disk.NewPage();

// mem copy from current header to new header clone
Buffer.BlockCopy(buffer.Array, buffer.Offset, clone.Array, clone.Offset, clone.Count);
// mem copy from current header to new header clone
Buffer.BlockCopy(buffer.Array, buffer.Offset, clone.Array, clone.Offset, clone.Count);

// persist header in log file
yield return clone;
}
// persist header in log file
yield return clone;
}

};

// write all dirty pages, in sequence on log-file and store references into log pages on transPages
Expand Down Expand Up @@ -256,13 +252,16 @@ public void Commit()

if (_mode == LockMode.Write || _transPages.HeaderChanged)
{
// persist all dirty page as commit mode (mark last page as IsConfirm)
var count = this.PersistDirtyPages(true);

// update wal-index (if any page was added into log disk)
if(count > 0)
lock (_header)
{
_walIndex.ConfirmTransaction(_transactionID, _transPages.DirtyPages.Values);
// persist all dirty page as commit mode (mark last page as IsConfirm)
var count = this.PersistDirtyPages(true);

// update wal-index (if any page was added into log disk)
if (count > 0)
{
_walIndex.ConfirmTransaction(_transactionID, _transPages.DirtyPages.Values);
}
}
}

Expand Down
17 changes: 16 additions & 1 deletion LiteDB/Engine/Services/WalIndexService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,22 @@ public WalIndexService(DiskService disk, LockService locker)
/// <summary>
/// Get current read version for all new transactions
/// </summary>
public int CurrentReadVersion => _currentReadVersion;
public int CurrentReadVersion
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the lock in this property? It's read-only so it shouldn't matter how many threads access it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The property itself is read-only, but its backing field is updatable. As I wrote in the comment above:

I made WalIndexService.CurrentReadVersion synchronized. Its backing field (_currentReadVersion) is accessed (read and modified) in the same class in Clear() and ConfirmTransaction() functions. These functions are atomic by using _indexLock. Especially ConfirmTransaction is interesting, because it first increases _currentReadVersion and then adds some data based on this increased value to the WAL index. So it seems unsafe to be able to access WalIndexService.CurrentReadVersion property while ConfirmTransaction() is still being executed.

{
get
{
_indexLock.TryEnterReadLock(-1);

try
{
return _currentReadVersion;
}
finally
{
_indexLock.ExitReadLock();
}
}
}

/// <summary>
/// Get current counter for transaction ID
Expand Down