Skip to content

Commit

Permalink
Jbl/#89 czireader concurrency (#90)
Browse files Browse the repository at this point in the history
* clang-tidy fixes

* cosmetic

* initial steps

* update

* update

* fix

* cosmetic

* cosmetic

* bump version

* add to change-log, cosmetic

* review
  • Loading branch information
ptahmose authored Dec 22, 2023
1 parent 2539ce5 commit e6baa05
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 21 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.15)
cmake_policy(SET CMP0091 NEW) # enable new "MSVC runtime library selection" (https://cmake.org/cmake/help/latest/variable/CMAKE_MSVC_RUNTIME_LIBRARY.html)

project(libCZI
VERSION 0.57.1
VERSION 0.57.2
HOMEPAGE_URL "https://github.com/ZEISS/libczi"
DESCRIPTION "libCZI is an Open Source Cross-Platform C++ library to read and write CZI")

Expand Down
66 changes: 53 additions & 13 deletions Src/libCZI/CZIReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ CCZIReader::CCZIReader() : isOperational(false)
{
}

CCZIReader::~CCZIReader()
{
}

/*virtual */void CCZIReader::Open(const std::shared_ptr<libCZI::IStream>& stream, const ICZIReader::OpenOptions* options)
{
if (this->isOperational == true)
Expand All @@ -48,7 +44,7 @@ CCZIReader::~CCZIReader()

if (options == nullptr)
{
const auto default_options = OpenOptions{};
constexpr auto default_options = OpenOptions{};
return CCZIReader::Open(stream, &default_options);
}

Expand Down Expand Up @@ -171,7 +167,7 @@ CCZIReader::~CCZIReader()
CCziSubBlockDirectory::SubBlkEntry entry;
if (this->subBlkDir.TryGetSubBlock(index, entry) == false)
{
return std::shared_ptr<ISubBlock>();
return {};
}

return this->ReadSubBlock(entry);
Expand Down Expand Up @@ -249,6 +245,12 @@ CCZIReader::~CCZIReader()
{
this->ThrowIfNotOperational();
this->SetOperationalState(false);

// We need to have a critical-section around modifying the stream-shared_ptr - there may be concurrent calls to ReadSubBlock, ReadAttachment, etc.
// in which the stream-shared_ptr is accessed. While the stream-shared_ptr is thread-safe, it is not thread-safe to reset it while another thread
// is dealing with the same shared_ptr. C.f. https://stackoverflow.com/questions/14482830/stdshared-ptr-thread-safety. With C++20 we could use
// atomic<shared_ptr> instead of the manual critical-section (c.f. https://en.cppreference.com/w/cpp/memory/shared_ptr/atomic2).
std::unique_lock<std::mutex> lock(this->stream_mutex_);
this->stream.reset();
}

Expand Down Expand Up @@ -298,17 +300,31 @@ CCZIReader::~CCZIReader()
CCziAttachmentsDirectory::AttachmentEntry entry;
if (this->attachmentDir.TryGetAttachment(index, entry) == false)
{
return std::shared_ptr<IAttachment>();
return {};
}

return this->ReadAttachment(entry);
}

std::shared_ptr<ISubBlock> CCZIReader::ReadSubBlock(const CCziSubBlockDirectory::SubBlkEntry& entry)
{
CCZIParse::SubBlockStorageAllocate allocateInfo{ malloc,free };
const CCZIParse::SubBlockStorageAllocate allocateInfo{ malloc,free };

auto subBlkData = CCZIParse::ReadSubBlock(this->stream.get(), entry.FilePosition, allocateInfo);
// For thread-safety, we need to ensure that we hold a reference to the stream for the whole duration of the call,
// in order to prepare for concurrent calls to Close() (which will reset the stream-shared_ptr).
shared_ptr<libCZI::IStream> stream_reference;

{
unique_lock<mutex> lock(this->stream_mutex_);
stream_reference = this->stream;
}

if (!stream_reference)
{
throw logic_error("CZIReader::ReadSubBlock: stream is null (Close was already called for this instance)");
}

auto subBlkData = CCZIParse::ReadSubBlock(stream_reference.get(), entry.FilePosition, allocateInfo);

libCZI::SubBlockInfo info;
info.pixelType = CziUtils::PixelTypeFromInt(subBlkData.pixelType);
Expand All @@ -324,9 +340,21 @@ std::shared_ptr<ISubBlock> CCZIReader::ReadSubBlock(const CCziSubBlockDirectory:

std::shared_ptr<libCZI::IAttachment> CCZIReader::ReadAttachment(const CCziAttachmentsDirectory::AttachmentEntry& entry)
{
CCZIParse::SubBlockStorageAllocate allocateInfo{ malloc,free };
const CCZIParse::SubBlockStorageAllocate allocateInfo{ malloc,free };

shared_ptr<libCZI::IStream> stream_reference;

auto attchmnt = CCZIParse::ReadAttachment(this->stream.get(), entry.FilePosition, allocateInfo);
{
unique_lock<mutex> lock(this->stream_mutex_);
stream_reference = this->stream;
}

if (!stream_reference)
{
throw logic_error("CCZIReader::ReadAttachment: stream is null (Close was already called for this instance)");
}

auto attchmnt = CCZIParse::ReadAttachment(stream_reference.get(), entry.FilePosition, allocateInfo);
libCZI::AttachmentInfo attchmentInfo;
attchmentInfo.contentGuid = entry.ContentGuid;
static_assert(sizeof(attchmentInfo.contentFileType) > sizeof(entry.ContentFileType), "sizeof(attchmentInfo.contentFileType) must be greater than sizeof(entry.ContentFileType)");
Expand All @@ -339,9 +367,21 @@ std::shared_ptr<libCZI::IAttachment> CCZIReader::ReadAttachment(const CCziAttach

std::shared_ptr<libCZI::IMetadataSegment> CCZIReader::ReadMetadataSegment(std::uint64_t position)
{
CCZIParse::SubBlockStorageAllocate allocateInfo{ malloc,free };
const CCZIParse::SubBlockStorageAllocate allocateInfo{ malloc,free };

shared_ptr<libCZI::IStream> stream_reference;

{
unique_lock<mutex> lock(this->stream_mutex_);
stream_reference = this->stream;
}

if (!stream_reference)
{
throw logic_error("CCZIReader::ReadAttachment: stream is null (Close was already called for this instance)");
}

auto metaDataSegmentData = CCZIParse::ReadMetadataSegment(this->stream.get(), position, allocateInfo);
auto metaDataSegmentData = CCZIParse::ReadMetadataSegment(stream_reference.get(), position, allocateInfo);
return std::make_shared<CCziMetadataSegment>(metaDataSegmentData, free);
}

Expand Down
4 changes: 3 additions & 1 deletion Src/libCZI/CZIReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <functional>
#include <memory>
#include <mutex>
#include "libCZI.h"
#include "CziSubBlockDirectory.h"
#include "CziAttachmentsDirectory.h"
Expand All @@ -15,13 +16,14 @@ class CCZIReader : public libCZI::ICZIReader, public std::enable_shared_from_thi
{
private:
std::shared_ptr<libCZI::IStream> stream;
std::mutex stream_mutex_; ///< Mutex to protect access to the stream-object.
CFileHeaderSegmentData hdrSegmentData;
CCziSubBlockDirectory subBlkDir;
CCziAttachmentsDirectory attachmentDir;
bool isOperational; ///< If true, then stream, hdrSegmentData and subBlkDir can be considered valid and operational
public:
CCZIReader();
~CCZIReader() override;
~CCZIReader() override = default;

// interface ISubBlockRepository
void EnumerateSubBlocks(const std::function<bool(int index, const libCZI::SubBlockInfo& info)>& funcEnum) override;
Expand Down
3 changes: 2 additions & 1 deletion Src/libCZI/Doc/version-history.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ version history {#version_history}
0.55.1 | [80](https://github.com/ZEISS/libczi/pull/80) | bugfix for above optimization
0.56.0 | [82](https://github.com/ZEISS/libczi/pull/82) | add option "kCurlHttp_CaInfo" & "kCurlHttp_CaInfoBlob", allow to retrieve properties from a stream-class
0.57.0 | [84](https://github.com/ZEISS/libczi/pull/84) | add caching for accessors, update CLI11 to version 2.3.2
0.57.1 | [86](https://github.com/ZEISS/libczi/pull/86) | small improvement for CMake-build: allow to use an apt-provided CURL-package
0.57.1 | [86](https://github.com/ZEISS/libczi/pull/86) | small improvement for CMake-build: allow to use an apt-provided CURL-package
0.57.2 | [90](https://github.com/ZEISS/libczi/pull/90) | improve thread-safety of CziReader
7 changes: 5 additions & 2 deletions Src/libCZI/libCZI.h
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ namespace libCZI
};

/// This interface is used to represent the CZI-file.
/// A note on thread-safety - all methods of this interface may be called from multiple threads concurrently.
class LIBCZI_API ICZIReader : public ISubBlockRepository, public ISubBlockRepositoryEx, public IAttachmentRepository
{
public:
Expand Down Expand Up @@ -707,8 +708,10 @@ namespace libCZI

/// Closes CZI-reader. The underlying stream-object will be released, and further calls to
/// other methods will fail. The stream is also closed when the object is destroyed, so it
/// is usually not necessary to explicitly call `Close`. Also, take care that the ownership of
/// the class must be defined when calling `Close`.
/// is usually not necessary to explicitly call `Close`. Note that the stream is not closed
/// immediately (or - there is no guarantee that on return from this call all references to the
/// stream object are released). Concurrently executing operations continue to use the stream
/// and keep it referenced until they are finished.
virtual void Close() = 0;
public:
/// Creates a single channel tile accessor.
Expand Down
104 changes: 101 additions & 3 deletions Src/libCZI_UnitTests/test_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@

#include "include_gtest.h"
#include "inc_libCZI.h"
#include "MemInputOutputStream.h"
#include "MemOutputStream.h"
#include <array>
#include <thread>

using namespace libCZI;
using namespace std;

TEST(DimCoordinate, ReaderException)
TEST(CziReader, ReaderException)
{
class MyException : public std::exception
{
Expand Down Expand Up @@ -74,7 +78,7 @@ TEST(DimCoordinate, ReaderException)
EXPECT_TRUE(exceptionCorrect) << "Incorrect result";
}

TEST(DimCoordinate, ReaderException2)
TEST(CziReader, ReaderException2)
{
class CTestStreamImp :public libCZI::IStream
{
Expand Down Expand Up @@ -114,7 +118,7 @@ TEST(DimCoordinate, ReaderException2)
EXPECT_TRUE(exceptionCorrect) << "Incorrect result";
}

TEST(DimCoordinate, ReaderStateException)
TEST(CziReader, ReaderStateException)
{
bool expectedExceptionCaught = false;
auto spReader = libCZI::CreateCZIReader();
Expand All @@ -129,3 +133,97 @@ TEST(DimCoordinate, ReaderStateException)

EXPECT_TRUE(expectedExceptionCaught) << "Incorrect behavior";
}

static tuple<shared_ptr<void>, size_t> CreateTestCzi()
{
const auto writer = CreateCZIWriter();
const auto outStream = make_shared<CMemOutputStream>(0);

const auto spWriterInfo = make_shared<CCziWriterInfo>(
GUID{ 0,0,0,{ 0,0,0,0,0,0,0,0 } },
CDimBounds{ { DimensionIndex::T, 0, 1 }, { DimensionIndex::C, 0, 1 } });

writer->Create(outStream, spWriterInfo);

int count = 0;
for (count = 0; count < 10; ++count)
{
++count;
const size_t size_of_bitmap = 100 * 100;
unique_ptr<uint8_t[]> bitmap(new uint8_t[size_of_bitmap]);
memset(bitmap.get(), count, size_of_bitmap);
AddSubBlockInfoStridedBitmap addSbBlkInfo;
addSbBlkInfo.Clear();
addSbBlkInfo.coordinate.Set(DimensionIndex::C, 0);
addSbBlkInfo.coordinate.Set(DimensionIndex::T, 0);
addSbBlkInfo.mIndexValid = true;
addSbBlkInfo.mIndex = count;
addSbBlkInfo.x = 0;
addSbBlkInfo.y = 0;
addSbBlkInfo.logicalWidth = 100;
addSbBlkInfo.logicalHeight = 100;
addSbBlkInfo.physicalWidth = 100;
addSbBlkInfo.physicalHeight = 100;
addSbBlkInfo.PixelType = PixelType::Gray8;
addSbBlkInfo.ptrBitmap = bitmap.get();
addSbBlkInfo.strideBitmap = 100;
writer->SyncAddSubBlock(addSbBlkInfo);
}

const auto metaDataBuilder = writer->GetPreparedMetadata(PrepareMetadataInfo{});

WriteMetadataInfo write_metadata_info;
const auto& strMetadata = metaDataBuilder->GetXml();
write_metadata_info.szMetadata = strMetadata.c_str();
write_metadata_info.szMetadataSize = strMetadata.size() + 1;
write_metadata_info.ptrAttachment = nullptr;
write_metadata_info.attachmentSize = 0;
writer->SyncWriteMetadata(write_metadata_info);

writer->Close();

return make_tuple(outStream->GetCopy(nullptr), outStream->GetDataSize());
}

TEST(CziReader, Concurrency)
{
// arrange
auto czi_document_as_blob = CreateTestCzi();
const auto memory_stream = make_shared<CMemInputOutputStream>(get<0>(czi_document_as_blob).get(), get<1>(czi_document_as_blob));
const auto reader = CreateCZIReader();
reader->Open(memory_stream);

constexpr int numThreads = 5; // Number of threads to create
std::array<thread, numThreads> threads; // Static array to store threads
bool readsubblock_problem_occurred = false;
for (int i = 0; i < 5; ++i)
{
threads[i] = thread([reader, i, &readsubblock_problem_occurred]()
{
try
{
reader->ReadSubBlock(i);
}
catch (logic_error&)
{
// Depending on the timing, we expect that either the operations succeeds (if the ReadSubBlock() call
// happens before the Close() call) or that it fails (if the ReadSubBlock() call happens after the Close() call).
// In the latter case, we expect a logic_error exception. Everything else is considered a problem.
return;
}
catch (...)
{
readsubblock_problem_occurred = true;
}
});
}

reader->Close();

for (thread& thread : threads)
{
thread.join();
}

EXPECT_FALSE(readsubblock_problem_occurred) << "Incorrect behavior";
}

0 comments on commit e6baa05

Please sign in to comment.