From 342fc8471b336446315ef1cf258ee32a3f00c9b8 Mon Sep 17 00:00:00 2001 From: Even Rouault Date: Sun, 29 Oct 2023 17:25:17 +0100 Subject: [PATCH] GetNextArrowArray() implementations: automatically adjust batch size of list/string/binary arrays do not saturate their capacity (2 billion elements) --- autotest/ogr/ogr_gpkg.py | 71 +++ autotest/ogr/ogr_mem.py | 174 ++++++ .../flatgeobuf/ogrflatgeobuflayer.cpp | 30 +- .../generic/ograrrowarrayhelper.cpp | 25 +- ogr/ogrsf_frmts/generic/ograrrowarrayhelper.h | 2 + ogr/ogrsf_frmts/generic/ogrlayerarrow.cpp | 518 +++++++++++------- ogr/ogrsf_frmts/gpkg/ogr_geopackage.h | 5 +- ogr/ogrsf_frmts/gpkg/ogrgeopackagelayer.cpp | 49 +- .../gpkg/ogrgeopackagetablelayer.cpp | 151 ++++- ogr/ogrsf_frmts/ogrsf_frmts.h | 2 + 10 files changed, 814 insertions(+), 213 deletions(-) diff --git a/autotest/ogr/ogr_gpkg.py b/autotest/ogr/ogr_gpkg.py index 22ab0732f05b..d7110630fd74 100755 --- a/autotest/ogr/ogr_gpkg.py +++ b/autotest/ogr/ogr_gpkg.py @@ -9515,3 +9515,74 @@ def test_ogr_gpkg_sql_exact_spatial_filter_for_feature_count(tmp_vsimem): lyr = ds.GetLayer(0) lyr.SetSpatialFilterRect(0.1, 0.2, 0.15, 0.3) assert lyr.GetFeatureCount() == 1 + + +############################################################################### + + +@pytest.mark.parametrize("too_big_field", ["huge_string", "huge_binary", "geometry"]) +def test_ogr_gpkg_arrow_stream_huge_array(tmp_vsimem, too_big_field): + pytest.importorskip("osgeo.gdal_array") + pytest.importorskip("numpy") + + filename = tmp_vsimem / "test_ogr_gpkg_arrow_stream_huge_array.gpkg" + ds = gdal.GetDriverByName("GPKG").Create(filename, 0, 0, 0, gdal.GDT_Unknown) + lyr = ds.CreateLayer("foo") + lyr.CreateField(ogr.FieldDefn("huge_string", ogr.OFTString)) + lyr.CreateField(ogr.FieldDefn("huge_binary", ogr.OFTBinary)) + for i in range(50): + f = ogr.Feature(lyr.GetLayerDefn()) + if too_big_field == "huge_string": + if i > 10: + f["huge_string"] = "x" * 10000 + elif too_big_field == "huge_binary": + if i > 10: + f["huge_binary"] = b"x" * 10000 + else: + geom = ogr.Geometry(ogr.wkbLineString) + geom.SetPoint_2D(500, 0, 0) + f.SetGeometry(geom) + lyr.CreateFeature(f) + ds.ExecuteSQL( + "CREATE VIEW my_view AS SELECT fid AS my_fid, geom AS my_geom, huge_string, huge_binary FROM foo" + ) + ds.ExecuteSQL( + "INSERT INTO gpkg_contents (table_name, identifier, data_type, srs_id) VALUES ( 'my_view', 'my_view', 'features', 0 )" + ) + ds.ExecuteSQL( + "INSERT INTO gpkg_geometry_columns (table_name, column_name, geometry_type_name, srs_id, z, m) values ('my_view', 'my_geom', 'GEOMETRY', 0, 0, 0)" + ) + ds = None + + ds = ogr.Open(filename) + for lyr_name in ["foo", "my_view"]: + lyr = ds.GetLayer(lyr_name) + + with gdaltest.config_option("OGR_ARROW_MEM_LIMIT", "20000", thread_local=False): + stream = lyr.GetArrowStreamAsNumPy( + ["INCLUDE_FID=YES", "MAX_FEATURES_IN_BATCH=10", "USE_MASKED_ARRAYS=NO"] + ) + batch_count = 0 + got_fids = [] + for batch in stream: + batch_count += 1 + for fid in batch[lyr.GetFIDColumn()]: + got_fids.append(fid) + assert got_fids == [i + 1 for i in range(50)] + assert batch_count == (25 if too_big_field == "geometry" else 21), lyr_name + del stream + + with ds.ExecuteSQL("SELECT * FROM foo") as sql_lyr: + with gdaltest.config_option("OGR_ARROW_MEM_LIMIT", "20000", thread_local=False): + stream = sql_lyr.GetArrowStreamAsNumPy( + ["INCLUDE_FID=YES", "MAX_FEATURES_IN_BATCH=10", "USE_MASKED_ARRAYS=NO"] + ) + batch_count = 0 + got_fids = [] + for batch in stream: + batch_count += 1 + for fid in batch[sql_lyr.GetFIDColumn()]: + got_fids.append(fid) + assert got_fids == [i + 1 for i in range(50)] + assert batch_count == (25 if too_big_field == "geometry" else 21), lyr_name + del stream diff --git a/autotest/ogr/ogr_mem.py b/autotest/ogr/ogr_mem.py index 28acfc02d1c7..e78c5bc119ab 100755 --- a/autotest/ogr/ogr_mem.py +++ b/autotest/ogr/ogr_mem.py @@ -886,6 +886,180 @@ def test_ogr_mem_arrow_stream_numpy(): assert len(batches) == 0 +############################################################################### + + +@pytest.mark.parametrize( + "limited_field", + [ + "str", + "strlist", + "int32list", + "int64list", + "float64list", + "boollist", + "binary", + "binary_fixed_width", + "geometry", + ], +) +def test_ogr_mem_arrow_stream_numpy_memlimit(limited_field): + pytest.importorskip("osgeo.gdal_array") + pytest.importorskip("numpy") + + ds = ogr.GetDriverByName("Memory").CreateDataSource("") + lyr = ds.CreateLayer("foo") + + field = ogr.FieldDefn("str", ogr.OFTString) + lyr.CreateField(field) + + field = ogr.FieldDefn("bool", ogr.OFTInteger) + field.SetSubType(ogr.OFSTBoolean) + lyr.CreateField(field) + + field = ogr.FieldDefn("int16", ogr.OFTInteger) + field.SetSubType(ogr.OFSTInt16) + lyr.CreateField(field) + + field = ogr.FieldDefn("int32", ogr.OFTInteger) + lyr.CreateField(field) + + field = ogr.FieldDefn("int64", ogr.OFTInteger64) + lyr.CreateField(field) + + field = ogr.FieldDefn("float32", ogr.OFTReal) + field.SetSubType(ogr.OFSTFloat32) + lyr.CreateField(field) + + field = ogr.FieldDefn("float64", ogr.OFTReal) + lyr.CreateField(field) + + field = ogr.FieldDefn("date", ogr.OFTDate) + lyr.CreateField(field) + + field = ogr.FieldDefn("time", ogr.OFTTime) + lyr.CreateField(field) + + field = ogr.FieldDefn("datetime", ogr.OFTDateTime) + lyr.CreateField(field) + + field = ogr.FieldDefn("binary", ogr.OFTBinary) + lyr.CreateField(field) + + field = ogr.FieldDefn("binary_fixed_width", ogr.OFTBinary) + field.SetWidth(50) + lyr.CreateField(field) + + field = ogr.FieldDefn("strlist", ogr.OFTStringList) + lyr.CreateField(field) + + field = ogr.FieldDefn("boollist", ogr.OFTIntegerList) + field.SetSubType(ogr.OFSTBoolean) + lyr.CreateField(field) + + field = ogr.FieldDefn("int16list", ogr.OFTIntegerList) + field.SetSubType(ogr.OFSTInt16) + lyr.CreateField(field) + + field = ogr.FieldDefn("int32list", ogr.OFTIntegerList) + lyr.CreateField(field) + + field = ogr.FieldDefn("int64list", ogr.OFTInteger64List) + lyr.CreateField(field) + + field = ogr.FieldDefn("float32list", ogr.OFTRealList) + field.SetSubType(ogr.OFSTFloat32) + lyr.CreateField(field) + + field = ogr.FieldDefn("float64list", ogr.OFTRealList) + lyr.CreateField(field) + + f = ogr.Feature(lyr.GetLayerDefn()) + f.SetField("bool", 1) + f.SetField("int16", -12345) + f.SetField("int32", 12345678) + f.SetField("int64", 12345678901234) + f.SetField("float32", 1.25) + f.SetField("float64", 1.250123) + f.SetField("str", "abc") + f.SetField("date", "2022-05-31") + f.SetField("time", "12:34:56.789") + f.SetField("datetime", "2022-05-31T12:34:56.789Z") + f.SetField("boollist", "[False,True]") + f.SetField("int16list", "[-12345,12345]") + f.SetField("int32list", "[-12345678,12345678]") + f.SetField("int64list", "[-12345678901234,12345678901234]") + f.SetField("float32list", "[-1.25,1.25]") + f.SetField("float64list", "[-1.250123,1.250123]") + f.SetField("strlist", '["abc","defghi"]') + f.SetField("binary", b"\xDE\xAD") + f.SetField("binary_fixed_width", b"\xDE\xAD" * 25) + f.SetGeometryDirectly(ogr.CreateGeometryFromWkt("POINT(1 2)")) + lyr.CreateFeature(f) + + f = ogr.Feature(lyr.GetLayerDefn()) + lyr.CreateFeature(f) + + f = ogr.Feature(lyr.GetLayerDefn()) + if limited_field == "str": + f["str"] = "x" * 100 + elif limited_field == "strlist": + f["strlist"] = ["x" * 100] + elif limited_field == "int32list": + f["int32list"] = [0] * 100 + elif limited_field == "int64list": + f["int64list"] = [0] * 100 + elif limited_field == "float64list": + f["float64list"] = [0] * 100 + elif limited_field == "boollist": + f["boollist"] = [False] * 100 + elif limited_field == "binary": + f["binary"] = b"x" * 100 + elif limited_field == "binary_fixed_width": + f.SetField("binary_fixed_width", b"\xDE\xAD" * 25) + elif limited_field == "geometry": + g = ogr.Geometry(ogr.wkbLineString) + sizeof_first_point = 21 + sizeof_linestring_preamble = 9 + sizeof_coord_pair = 2 * 8 + g.SetPoint_2D( + (100 - sizeof_linestring_preamble - sizeof_first_point) + // sizeof_coord_pair, + 0, + 0, + ) + f.SetGeometry(g) + lyr.CreateFeature(f) + + with gdaltest.config_option("OGR_ARROW_MEM_LIMIT", "100", thread_local=False): + stream = lyr.GetArrowStreamAsNumPy(options=["USE_MASKED_ARRAYS=NO"]) + batches = [batch for batch in stream] + assert len(batches) == 2 + assert [x for x in batches[0]["OGC_FID"]] == [0, 1] + assert [x for x in batches[1]["OGC_FID"]] == [2] + + with gdaltest.config_option("OGR_ARROW_MEM_LIMIT", "1", thread_local=False): + + stream = lyr.GetArrowStreamAsNumPy(options=["USE_MASKED_ARRAYS=NO"]) + with gdal.quiet_errors(): + gdal.ErrorReset() + batches = [batch for batch in stream] + assert ( + gdal.GetLastErrorMsg() + == "Too large feature: not even a single feature can be returned" + ) + assert len(batches) == 0 + + stream = lyr.GetArrowStreamAsNumPy(options=["USE_MASKED_ARRAYS=NO"]) + with gdaltest.enable_exceptions(): + with pytest.raises( + Exception, + match="Too large feature: not even a single feature can be returned", + ): + batches = [batch for batch in stream] + assert len(batches) == 0 + + ############################################################################### # Test optimization to save memory on string fields with huge strings compared # to the average size diff --git a/ogr/ogrsf_frmts/flatgeobuf/ogrflatgeobuflayer.cpp b/ogr/ogrsf_frmts/flatgeobuf/ogrflatgeobuflayer.cpp index c48db6308196..af78bb97fb83 100644 --- a/ogr/ogrsf_frmts/flatgeobuf/ogrflatgeobuflayer.cpp +++ b/ogr/ogrsf_frmts/flatgeobuf/ogrflatgeobuflayer.cpp @@ -1420,6 +1420,7 @@ int OGRFlatGeobufLayer::GetNextArrowArray(struct ArrowArrayStream *stream, const GIntBig nFeatureIdxStart = m_featuresPos; + const uint32_t nMemLimit = OGRArrowArrayHelper::GetMemLimit(); while (iFeat < sHelper.nMaxBatchSize) { bEOFOrError = true; @@ -1538,6 +1539,20 @@ int OGRFlatGeobufLayer::GetNextArrowArray(struct ArrowArrayStream *stream, const int iArrowField = sHelper.mapOGRGeomFieldToArrowField[0]; const size_t nWKBSize = poOGRGeometry->WkbSize(); + + if (iFeat > 0) + { + auto psArray = out_array->children[iArrowField]; + auto panOffsets = static_cast( + const_cast(psArray->buffers[1])); + const uint32_t nCurLength = + static_cast(panOffsets[iFeat]); + if (nWKBSize <= nMemLimit && nWKBSize > nMemLimit - nCurLength) + { + goto after_loop; + } + } + GByte *outPtr = sHelper.GetPtrForStringOrBinary(iArrowField, iFeat, nWKBSize); if (outPtr == nullptr) @@ -1794,6 +1809,19 @@ int OGRFlatGeobufLayer::GetNextArrowArray(struct ArrowArrayStream *stream, } if (!isIgnored) { + if (iFeat > 0) + { + auto panOffsets = static_cast( + const_cast(psArray->buffers[1])); + const uint32_t nCurLength = + static_cast(panOffsets[iFeat]); + if (len <= nMemLimit && + len > nMemLimit - nCurLength) + { + goto after_loop; + } + } + GByte *outPtr = sHelper.GetPtrForStringOrBinary( iArrowField, iFeat, len); if (outPtr == nullptr) @@ -1880,7 +1908,7 @@ int OGRFlatGeobufLayer::GetNextArrowArray(struct ArrowArrayStream *stream, m_featuresPos++; bEOFOrError = false; } - +after_loop: if (bEOFOrError) m_bEOF = true; diff --git a/ogr/ogrsf_frmts/generic/ograrrowarrayhelper.cpp b/ogr/ogrsf_frmts/generic/ograrrowarrayhelper.cpp index 976b1b18ba86..b23895bef903 100644 --- a/ogr/ogrsf_frmts/generic/ograrrowarrayhelper.cpp +++ b/ogr/ogrsf_frmts/generic/ograrrowarrayhelper.cpp @@ -34,9 +34,32 @@ //! @cond Doxygen_Suppress /************************************************************************/ -/* GetMaxFeaturesInBatch() */ +/* GetMemLimit() */ /************************************************************************/ +/*static*/ uint32_t OGRArrowArrayHelper::GetMemLimit() +{ + uint32_t nMemLimit = + static_cast(std::numeric_limits::max()); + // Just for tests + const char *pszOGR_ARROW_MEM_LIMIT = + CPLGetConfigOption("OGR_ARROW_MEM_LIMIT", nullptr); + if (pszOGR_ARROW_MEM_LIMIT) + nMemLimit = atoi(pszOGR_ARROW_MEM_LIMIT); + else + { + const uint64_t nUsableRAM = CPLGetUsablePhysicalRAM(); + if (nUsableRAM > 0 && nUsableRAM / 4 < nMemLimit) + nMemLimit = static_cast(nUsableRAM / 4); + } + return nMemLimit; +} + +/************************************************************************/ +/* GetMaxFeaturesInBatch() */ +/************************************************************************/ + +/* static */ int OGRArrowArrayHelper::GetMaxFeaturesInBatch( const CPLStringList &aosArrowArrayStreamOptions) { diff --git a/ogr/ogrsf_frmts/generic/ograrrowarrayhelper.h b/ogr/ogrsf_frmts/generic/ograrrowarrayhelper.h index 242ca2eb951b..77f3e3cb4178 100644 --- a/ogr/ogrsf_frmts/generic/ograrrowarrayhelper.h +++ b/ogr/ogrsf_frmts/generic/ograrrowarrayhelper.h @@ -57,6 +57,8 @@ class CPL_DLL OGRArrowArrayHelper int64_t *panFIDValues = nullptr; struct ArrowArray *m_out_array = nullptr; + static uint32_t GetMemLimit(); + static int GetMaxFeaturesInBatch(const CPLStringList &aosArrowArrayStreamOptions); diff --git a/ogr/ogrsf_frmts/generic/ogrlayerarrow.cpp b/ogr/ogrsf_frmts/generic/ogrlayerarrow.cpp index bdacca902446..86586d5d0359 100644 --- a/ogr/ogrsf_frmts/generic/ogrlayerarrow.cpp +++ b/ogr/ogrsf_frmts/generic/ogrlayerarrow.cpp @@ -739,18 +739,19 @@ static uint8_t *AllocValidityBitmap(size_t nSize) template static bool FillArray(struct ArrowArray *psChild, - std::vector> &apoFeatures, - const bool bIsNullable, TMember member, const int i) + std::deque> &apoFeatures, + const size_t nFeatureCountLimit, const bool bIsNullable, + TMember member, const int i) { psChild->n_buffers = 2; psChild->buffers = static_cast(CPLCalloc(2, sizeof(void *))); uint8_t *pabyValidity = nullptr; T *panValues = static_cast( - VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(T) * apoFeatures.size())); + VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(T) * nFeatureCountLimit)); if (panValues == nullptr) return false; psChild->buffers[1] = panValues; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + for (size_t iFeat = 0; iFeat < nFeatureCountLimit; ++iFeat) { auto &poFeature = apoFeatures[iFeat]; const auto psRawField = poFeature->GetRawFieldRef(i); @@ -764,7 +765,7 @@ static bool FillArray(struct ArrowArray *psChild, ++psChild->null_count; if (pabyValidity == nullptr) { - pabyValidity = AllocValidityBitmap(apoFeatures.size()); + pabyValidity = AllocValidityBitmap(nFeatureCountLimit); psChild->buffers[0] = pabyValidity; if (pabyValidity == nullptr) return false; @@ -785,19 +786,20 @@ static bool FillArray(struct ArrowArray *psChild, template static bool FillBoolArray(struct ArrowArray *psChild, - std::vector> &apoFeatures, + std::deque> &apoFeatures, + const size_t nFeatureCountLimit, const bool bIsNullable, TMember member, const int i) { psChild->n_buffers = 2; psChild->buffers = static_cast(CPLCalloc(2, sizeof(void *))); uint8_t *pabyValidity = nullptr; uint8_t *panValues = static_cast( - VSI_MALLOC_ALIGNED_AUTO_VERBOSE((apoFeatures.size() + 7) / 8)); + VSI_MALLOC_ALIGNED_AUTO_VERBOSE((nFeatureCountLimit + 7) / 8)); if (panValues == nullptr) return false; - memset(panValues, 0, (apoFeatures.size() + 7) / 8); + memset(panValues, 0, (nFeatureCountLimit + 7) / 8); psChild->buffers[1] = panValues; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + for (size_t iFeat = 0; iFeat < nFeatureCountLimit; ++iFeat) { auto &poFeature = apoFeatures[iFeat]; const auto psRawField = poFeature->GetRawFieldRef(i); @@ -811,7 +813,7 @@ static bool FillBoolArray(struct ArrowArray *psChild, ++psChild->null_count; if (pabyValidity == nullptr) { - pabyValidity = AllocValidityBitmap(apoFeatures.size()); + pabyValidity = AllocValidityBitmap(nFeatureCountLimit); psChild->buffers[0] = pabyValidity; if (pabyValidity == nullptr) return false; @@ -866,22 +868,25 @@ struct GetFromRealList }; template -static bool FillListArray(struct ArrowArray *psChild, - std::vector> &apoFeatures, - const bool bIsNullable, const int i) +static size_t +FillListArray(struct ArrowArray *psChild, + std::deque> &apoFeatures, + const size_t nFeatureCountLimit, const bool bIsNullable, + const int i, const size_t nMemLimit) { psChild->n_buffers = 2; psChild->buffers = static_cast(CPLCalloc(2, sizeof(void *))); uint8_t *pabyValidity = nullptr; OffsetType *panOffsets = static_cast(VSI_MALLOC_ALIGNED_AUTO_VERBOSE( - sizeof(OffsetType) * (1 + apoFeatures.size()))); + sizeof(OffsetType) * (1 + nFeatureCountLimit))); if (panOffsets == nullptr) - return false; + return 0; psChild->buffers[1] = panOffsets; OffsetType nOffset = 0; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + size_t nFeatCount = 0; + for (size_t iFeat = 0; iFeat < nFeatureCountLimit; ++iFeat, ++nFeatCount) { panOffsets[iFeat] = nOffset; auto &poFeature = apoFeatures[iFeat]; @@ -889,9 +894,12 @@ static bool FillListArray(struct ArrowArray *psChild, if (IsValidField(psRawField)) { const unsigned nCount = GetFromList::getCount(psRawField); - if (nCount > static_cast( - std::numeric_limits::max() - nOffset)) - return false; + if (nCount > static_cast(nMemLimit - nOffset)) + { + if (nFeatCount == 0) + return 0; + break; + } nOffset += static_cast(nCount); } else if (bIsNullable) @@ -899,15 +907,15 @@ static bool FillListArray(struct ArrowArray *psChild, ++psChild->null_count; if (pabyValidity == nullptr) { - pabyValidity = AllocValidityBitmap(apoFeatures.size()); + pabyValidity = AllocValidityBitmap(nFeatureCountLimit); psChild->buffers[0] = pabyValidity; if (pabyValidity == nullptr) - return false; + return 0; } UnsetBit(pabyValidity, iFeat); } } - panOffsets[apoFeatures.size()] = nOffset; + panOffsets[nFeatCount] = nOffset; psChild->n_children = 1; psChild->children = static_cast( @@ -924,11 +932,11 @@ static bool FillListArray(struct ArrowArray *psChild, T *panValues = static_cast(VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(T) * nOffset)); if (panValues == nullptr) - return false; + return 0; psValueChild->buffers[1] = panValues; nOffset = 0; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + for (size_t iFeat = 0; iFeat < nFeatCount; ++iFeat) { auto &poFeature = apoFeatures[iFeat]; const auto psRawField = poFeature->GetRawFieldRef(i); @@ -949,27 +957,29 @@ static bool FillListArray(struct ArrowArray *psChild, } } - return true; + return nFeatCount; } template -static bool +static size_t FillListArrayBool(struct ArrowArray *psChild, - std::vector> &apoFeatures, - const bool bIsNullable, const int i) + std::deque> &apoFeatures, + const size_t nFeatureCountLimit, const bool bIsNullable, + const int i, const size_t nMemLimit) { psChild->n_buffers = 2; psChild->buffers = static_cast(CPLCalloc(2, sizeof(void *))); uint8_t *pabyValidity = nullptr; OffsetType *panOffsets = static_cast(VSI_MALLOC_ALIGNED_AUTO_VERBOSE( - sizeof(OffsetType) * (1 + apoFeatures.size()))); + sizeof(OffsetType) * (1 + nFeatureCountLimit))); if (panOffsets == nullptr) - return false; + return 0; psChild->buffers[1] = panOffsets; OffsetType nOffset = 0; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + size_t nFeatCount = 0; + for (size_t iFeat = 0; iFeat < nFeatureCountLimit; ++iFeat, ++nFeatCount) { panOffsets[iFeat] = nOffset; auto &poFeature = apoFeatures[iFeat]; @@ -977,9 +987,12 @@ FillListArrayBool(struct ArrowArray *psChild, if (IsValidField(psRawField)) { const unsigned nCount = GetFromList::getCount(psRawField); - if (nCount > static_cast( - std::numeric_limits::max() - nOffset)) - return false; + if (nCount > static_cast(nMemLimit - nOffset)) + { + if (nFeatCount == 0) + return 0; + break; + } nOffset += static_cast(nCount); } else if (bIsNullable) @@ -987,15 +1000,15 @@ FillListArrayBool(struct ArrowArray *psChild, ++psChild->null_count; if (pabyValidity == nullptr) { - pabyValidity = AllocValidityBitmap(apoFeatures.size()); + pabyValidity = AllocValidityBitmap(nFeatureCountLimit); psChild->buffers[0] = pabyValidity; if (pabyValidity == nullptr) - return false; + return 0; } UnsetBit(pabyValidity, iFeat); } } - panOffsets[apoFeatures.size()] = nOffset; + panOffsets[nFeatCount] = nOffset; psChild->n_children = 1; psChild->children = static_cast( @@ -1012,12 +1025,12 @@ FillListArrayBool(struct ArrowArray *psChild, uint8_t *panValues = static_cast( VSI_MALLOC_ALIGNED_AUTO_VERBOSE((nOffset + 7) / 8)); if (panValues == nullptr) - return false; + return 0; memset(panValues, 0, (nOffset + 7) / 8); psValueChild->buffers[1] = panValues; nOffset = 0; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + for (size_t iFeat = 0; iFeat < nFeatureCountLimit; ++iFeat) { auto &poFeature = apoFeatures[iFeat]; const auto psRawField = poFeature->GetRawFieldRef(i); @@ -1035,7 +1048,7 @@ FillListArrayBool(struct ArrowArray *psChild, } } - return true; + return nFeatCount; } /************************************************************************/ @@ -1043,31 +1056,36 @@ FillListArrayBool(struct ArrowArray *psChild, /************************************************************************/ template -static bool +static size_t FillStringArray(struct ArrowArray *psChild, - std::vector> &apoFeatures, - const bool bIsNullable, const int i) + std::deque> &apoFeatures, + const size_t nFeatureCountLimit, const bool bIsNullable, + const int i, const size_t nMemLimit) { psChild->n_buffers = 3; psChild->buffers = static_cast(CPLCalloc(3, sizeof(void *))); uint8_t *pabyValidity = nullptr; T *panOffsets = static_cast( - VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(T) * (1 + apoFeatures.size()))); + VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(T) * (1 + nFeatureCountLimit))); if (panOffsets == nullptr) - return false; + return 0; psChild->buffers[1] = panOffsets; size_t nOffset = 0; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + size_t nFeatCount = 0; + for (size_t iFeat = 0; iFeat < nFeatureCountLimit; ++iFeat, ++nFeatCount) { panOffsets[iFeat] = static_cast(nOffset); const auto psRawField = apoFeatures[iFeat]->GetRawFieldRef(i); if (IsValidField(psRawField)) { const size_t nLen = strlen(psRawField->String); - if (nLen > - static_cast(std::numeric_limits::max()) - nOffset) - return false; + if (nLen > nMemLimit - nOffset) + { + if (nFeatCount == 0) + return 0; + break; + } nOffset += static_cast(nLen); } else if (bIsNullable) @@ -1075,24 +1093,24 @@ FillStringArray(struct ArrowArray *psChild, ++psChild->null_count; if (pabyValidity == nullptr) { - pabyValidity = AllocValidityBitmap(apoFeatures.size()); + pabyValidity = AllocValidityBitmap(nFeatureCountLimit); psChild->buffers[0] = pabyValidity; if (pabyValidity == nullptr) - return false; + return 0; } UnsetBit(pabyValidity, iFeat); } } - panOffsets[apoFeatures.size()] = static_cast(nOffset); + panOffsets[nFeatCount] = static_cast(nOffset); char *pachValues = static_cast(VSI_MALLOC_ALIGNED_AUTO_VERBOSE(nOffset)); if (pachValues == nullptr) - return false; + return 0; psChild->buffers[2] = pachValues; nOffset = 0; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + for (size_t iFeat = 0; iFeat < nFeatCount; ++iFeat) { const size_t nLen = static_cast(panOffsets[iFeat + 1] - panOffsets[iFeat]); @@ -1104,7 +1122,7 @@ FillStringArray(struct ArrowArray *psChild, } } - return true; + return nFeatCount; } /************************************************************************/ @@ -1112,24 +1130,26 @@ FillStringArray(struct ArrowArray *psChild, /************************************************************************/ template -static bool +static size_t FillStringListArray(struct ArrowArray *psChild, - std::vector> &apoFeatures, - const bool bIsNullable, const int i) + std::deque> &apoFeatures, + const size_t nFeatureCountLimit, const bool bIsNullable, + const int i, const size_t nMemLimit) { psChild->n_buffers = 2; psChild->buffers = static_cast(CPLCalloc(2, sizeof(void *))); uint8_t *pabyValidity = nullptr; OffsetType *panOffsets = static_cast(VSI_MALLOC_ALIGNED_AUTO_VERBOSE( - sizeof(OffsetType) * (1 + apoFeatures.size()))); + sizeof(OffsetType) * (1 + nFeatureCountLimit))); if (panOffsets == nullptr) return false; psChild->buffers[1] = panOffsets; OffsetType nStrings = 0; OffsetType nCountChars = 0; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + size_t nFeatCount = 0; + for (size_t iFeat = 0; iFeat < nFeatureCountLimit; ++iFeat, ++nFeatCount) { panOffsets[iFeat] = nStrings; auto &poFeature = apoFeatures[iFeat]; @@ -1138,16 +1158,21 @@ FillStringListArray(struct ArrowArray *psChild, { const int nCount = psRawField->StringList.nCount; if (static_cast(nCount) > - static_cast(std::numeric_limits::max() - - nStrings)) - return false; + static_cast(nMemLimit - nStrings)) + { + if (nFeatCount == 0) + return 0; + goto after_loop; + } for (int j = 0; j < nCount; ++j) { const size_t nLen = strlen(psRawField->StringList.paList[j]); - if (nLen > - static_cast(std::numeric_limits::max() - - nCountChars)) - return false; + if (nLen > static_cast(nMemLimit - nCountChars)) + { + if (nFeatCount == 0) + return 0; + goto after_loop; + } nCountChars += static_cast(nLen); } nStrings += static_cast(nCount); @@ -1157,15 +1182,16 @@ FillStringListArray(struct ArrowArray *psChild, ++psChild->null_count; if (pabyValidity == nullptr) { - pabyValidity = AllocValidityBitmap(apoFeatures.size()); + pabyValidity = AllocValidityBitmap(nFeatureCountLimit); psChild->buffers[0] = pabyValidity; if (pabyValidity == nullptr) - return false; + return 0; } UnsetBit(pabyValidity, iFeat); } } - panOffsets[apoFeatures.size()] = nStrings; +after_loop: + panOffsets[nFeatCount] = nStrings; psChild->n_children = 1; psChild->children = static_cast( @@ -1183,18 +1209,18 @@ FillStringListArray(struct ArrowArray *psChild, OffsetType *panChildOffsets = static_cast( VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(OffsetType) * (1 + nStrings))); if (panChildOffsets == nullptr) - return false; + return 0; psValueChild->buffers[1] = panChildOffsets; char *pachValues = static_cast(VSI_MALLOC_ALIGNED_AUTO_VERBOSE(nCountChars)); if (pachValues == nullptr) - return false; + return 0; psValueChild->buffers[2] = pachValues; nStrings = 0; nCountChars = 0; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + for (size_t iFeat = 0; iFeat < nFeatCount; ++iFeat) { auto &poFeature = apoFeatures[iFeat]; const auto psRawField = poFeature->GetRawFieldRef(i); @@ -1214,7 +1240,7 @@ FillStringListArray(struct ArrowArray *psChild, } panChildOffsets[nStrings] = nCountChars; - return true; + return nFeatCount; } /************************************************************************/ @@ -1222,31 +1248,36 @@ FillStringListArray(struct ArrowArray *psChild, /************************************************************************/ template -static bool +static size_t FillBinaryArray(struct ArrowArray *psChild, - std::vector> &apoFeatures, - const bool bIsNullable, const int i) + std::deque> &apoFeatures, + const size_t nFeatureCountLimit, const bool bIsNullable, + const int i, const size_t nMemLimit) { psChild->n_buffers = 3; psChild->buffers = static_cast(CPLCalloc(3, sizeof(void *))); uint8_t *pabyValidity = nullptr; T *panOffsets = static_cast( - VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(T) * (1 + apoFeatures.size()))); + VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(T) * (1 + nFeatureCountLimit))); if (panOffsets == nullptr) - return false; + return 0; psChild->buffers[1] = panOffsets; T nOffset = 0; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + size_t nFeatCount = 0; + for (size_t iFeat = 0; iFeat < nFeatureCountLimit; ++iFeat, ++nFeatCount) { panOffsets[iFeat] = nOffset; const auto psRawField = apoFeatures[iFeat]->GetRawFieldRef(i); if (IsValidField(psRawField)) { const size_t nLen = psRawField->Binary.nCount; - if (nLen > - static_cast(std::numeric_limits::max() - nOffset)) - return false; + if (nLen > static_cast(nMemLimit - nOffset)) + { + if (iFeat == 0) + return 0; + break; + } nOffset += static_cast(nLen); } else if (bIsNullable) @@ -1254,24 +1285,24 @@ FillBinaryArray(struct ArrowArray *psChild, ++psChild->null_count; if (pabyValidity == nullptr) { - pabyValidity = AllocValidityBitmap(apoFeatures.size()); + pabyValidity = AllocValidityBitmap(nFeatureCountLimit); psChild->buffers[0] = pabyValidity; if (pabyValidity == nullptr) - return false; + return 0; } UnsetBit(pabyValidity, iFeat); } } - panOffsets[apoFeatures.size()] = nOffset; + panOffsets[nFeatCount] = nOffset; GByte *pabyValues = static_cast(VSI_MALLOC_ALIGNED_AUTO_VERBOSE(nOffset)); if (pabyValues == nullptr) - return false; + return 0; psChild->buffers[2] = pabyValues; nOffset = 0; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + for (size_t iFeat = 0; iFeat < nFeatCount; ++iFeat) { const size_t nLen = static_cast(panOffsets[iFeat + 1] - panOffsets[iFeat]); @@ -1283,7 +1314,7 @@ FillBinaryArray(struct ArrowArray *psChild, } } - return true; + return nFeatCount; } /************************************************************************/ @@ -1292,22 +1323,23 @@ FillBinaryArray(struct ArrowArray *psChild, static bool FillFixedWidthBinaryArray(struct ArrowArray *psChild, - std::vector> &apoFeatures, + std::deque> &apoFeatures, + const size_t nFeatureCountLimit, const bool bIsNullable, const int nWidth, const int i) { psChild->n_buffers = 2; psChild->buffers = static_cast(CPLCalloc(3, sizeof(void *))); uint8_t *pabyValidity = nullptr; - if (apoFeatures.size() > std::numeric_limits::max() / nWidth) + if (nFeatureCountLimit > std::numeric_limits::max() / nWidth) return false; GByte *pabyValues = static_cast( - VSI_MALLOC_ALIGNED_AUTO_VERBOSE(apoFeatures.size() * nWidth)); + VSI_MALLOC_ALIGNED_AUTO_VERBOSE(nFeatureCountLimit * nWidth)); if (pabyValues == nullptr) return false; psChild->buffers[1] = pabyValues; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + for (size_t iFeat = 0; iFeat < nFeatureCountLimit; ++iFeat) { const auto psRawField = apoFeatures[iFeat]->GetRawFieldRef(i); if (IsValidField(psRawField)) @@ -1333,7 +1365,7 @@ FillFixedWidthBinaryArray(struct ArrowArray *psChild, ++psChild->null_count; if (pabyValidity == nullptr) { - pabyValidity = AllocValidityBitmap(apoFeatures.size()); + pabyValidity = AllocValidityBitmap(nFeatureCountLimit); psChild->buffers[0] = pabyValidity; if (pabyValidity == nullptr) return false; @@ -1351,19 +1383,21 @@ FillFixedWidthBinaryArray(struct ArrowArray *psChild, /************************************************************************/ template -static bool +static size_t FillWKBGeometryArray(struct ArrowArray *psChild, - std::vector> &apoFeatures, - const OGRGeomFieldDefn *poFieldDefn, const int i) + std::deque> &apoFeatures, + const size_t nFeatureCountLimit, + const OGRGeomFieldDefn *poFieldDefn, const int i, + const size_t nMemLimit) { const bool bIsNullable = CPL_TO_BOOL(poFieldDefn->IsNullable()); psChild->n_buffers = 3; psChild->buffers = static_cast(CPLCalloc(3, sizeof(void *))); uint8_t *pabyValidity = nullptr; T *panOffsets = static_cast( - VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(T) * (1 + apoFeatures.size()))); + VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(T) * (1 + nFeatureCountLimit))); if (panOffsets == nullptr) - return false; + return 0; psChild->buffers[1] = panOffsets; const auto eGeomType = poFieldDefn->GetType(); auto poEmptyGeom = @@ -1373,16 +1407,20 @@ FillWKBGeometryArray(struct ArrowArray *psChild, : eGeomType)); size_t nOffset = 0; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + size_t nFeatCount = 0; + for (size_t iFeat = 0; iFeat < nFeatureCountLimit; ++iFeat, ++nFeatCount) { panOffsets[iFeat] = static_cast(nOffset); const auto poGeom = apoFeatures[iFeat]->GetGeomFieldRef(i); if (poGeom != nullptr) { const size_t nLen = poGeom->WkbSize(); - if (nLen > - static_cast(std::numeric_limits::max()) - nOffset) - return false; + if (nLen > nMemLimit - nOffset) + { + if (nFeatCount == 0) + return 0; + break; + } nOffset += static_cast(nLen); } else if (bIsNullable) @@ -1390,32 +1428,35 @@ FillWKBGeometryArray(struct ArrowArray *psChild, ++psChild->null_count; if (pabyValidity == nullptr) { - pabyValidity = AllocValidityBitmap(apoFeatures.size()); + pabyValidity = AllocValidityBitmap(nFeatureCountLimit); psChild->buffers[0] = pabyValidity; if (pabyValidity == nullptr) - return false; + return 0; } UnsetBit(pabyValidity, iFeat); } else if (poEmptyGeom) { const size_t nLen = poEmptyGeom->WkbSize(); - if (nLen > - static_cast(std::numeric_limits::max()) - nOffset) - return false; + if (nLen > nMemLimit - nOffset) + { + if (nFeatCount == 0) + return 0; + break; + } nOffset += static_cast(nLen); } } - panOffsets[apoFeatures.size()] = static_cast(nOffset); + panOffsets[nFeatCount] = static_cast(nOffset); GByte *pabyValues = static_cast(VSI_MALLOC_ALIGNED_AUTO_VERBOSE(nOffset)); if (pabyValues == nullptr) - return false; + return 0; psChild->buffers[2] = pabyValues; nOffset = 0; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + for (size_t iFeat = 0; iFeat < nFeatCount; ++iFeat) { const size_t nLen = static_cast(panOffsets[iFeat + 1] - panOffsets[iFeat]); @@ -1433,7 +1474,7 @@ FillWKBGeometryArray(struct ArrowArray *psChild, } } - return true; + return nFeatCount; } /************************************************************************/ @@ -1441,18 +1482,19 @@ FillWKBGeometryArray(struct ArrowArray *psChild, /************************************************************************/ static bool FillDateArray(struct ArrowArray *psChild, - std::vector> &apoFeatures, + std::deque> &apoFeatures, + const size_t nFeatureCountLimit, const bool bIsNullable, const int i) { psChild->n_buffers = 2; psChild->buffers = static_cast(CPLCalloc(2, sizeof(void *))); uint8_t *pabyValidity = nullptr; int32_t *panValues = static_cast( - VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(int32_t) * apoFeatures.size())); + VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(int32_t) * nFeatureCountLimit)); if (panValues == nullptr) return false; psChild->buffers[1] = panValues; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + for (size_t iFeat = 0; iFeat < nFeatureCountLimit; ++iFeat) { auto &poFeature = apoFeatures[iFeat]; const auto psRawField = poFeature->GetRawFieldRef(i); @@ -1472,7 +1514,7 @@ static bool FillDateArray(struct ArrowArray *psChild, ++psChild->null_count; if (pabyValidity == nullptr) { - pabyValidity = AllocValidityBitmap(apoFeatures.size()); + pabyValidity = AllocValidityBitmap(nFeatureCountLimit); psChild->buffers[0] = pabyValidity; if (pabyValidity == nullptr) return false; @@ -1492,18 +1534,19 @@ static bool FillDateArray(struct ArrowArray *psChild, /************************************************************************/ static bool FillTimeArray(struct ArrowArray *psChild, - std::vector> &apoFeatures, + std::deque> &apoFeatures, + const size_t nFeatureCountLimit, const bool bIsNullable, const int i) { psChild->n_buffers = 2; psChild->buffers = static_cast(CPLCalloc(2, sizeof(void *))); uint8_t *pabyValidity = nullptr; int32_t *panValues = static_cast( - VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(int32_t) * apoFeatures.size())); + VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(int32_t) * nFeatureCountLimit)); if (panValues == nullptr) return false; psChild->buffers[1] = panValues; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + for (size_t iFeat = 0; iFeat < nFeatureCountLimit; ++iFeat) { auto &poFeature = apoFeatures[iFeat]; const auto psRawField = poFeature->GetRawFieldRef(i); @@ -1520,7 +1563,7 @@ static bool FillTimeArray(struct ArrowArray *psChild, ++psChild->null_count; if (pabyValidity == nullptr) { - pabyValidity = AllocValidityBitmap(apoFeatures.size()); + pabyValidity = AllocValidityBitmap(nFeatureCountLimit); psChild->buffers[0] = pabyValidity; if (pabyValidity == nullptr) return false; @@ -1541,20 +1584,21 @@ static bool FillTimeArray(struct ArrowArray *psChild, static bool FillDateTimeArray(struct ArrowArray *psChild, - std::vector> &apoFeatures, - const bool bIsNullable, const int i, int nFieldTZFlag) + std::deque> &apoFeatures, + const size_t nFeatureCountLimit, const bool bIsNullable, + const int i, int nFieldTZFlag) { psChild->n_buffers = 2; psChild->buffers = static_cast(CPLCalloc(2, sizeof(void *))); uint8_t *pabyValidity = nullptr; int64_t *panValues = static_cast( - VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(int64_t) * apoFeatures.size())); + VSI_MALLOC_ALIGNED_AUTO_VERBOSE(sizeof(int64_t) * nFeatureCountLimit)); if (panValues == nullptr) return false; psChild->buffers[1] = panValues; struct tm brokenDown; memset(&brokenDown, 0, sizeof(brokenDown)); - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + for (size_t iFeat = 0; iFeat < nFeatureCountLimit; ++iFeat) { auto &poFeature = apoFeatures[iFeat]; const auto psRawField = poFeature->GetRawFieldRef(i); @@ -1595,7 +1639,7 @@ FillDateTimeArray(struct ArrowArray *psChild, ++psChild->null_count; if (pabyValidity == nullptr) { - pabyValidity = AllocValidityBitmap(apoFeatures.size()); + pabyValidity = AllocValidityBitmap(nFeatureCountLimit); psChild->buffers[0] = pabyValidity; if (pabyValidity == nullptr) return false; @@ -1637,22 +1681,10 @@ int OGRLayer::GetNextArrowArray(struct ArrowArrayStream *stream, if (nMaxBatchSize > INT_MAX - 1) nMaxBatchSize = INT_MAX - 1; - std::vector> apoFeatures; - try - { - apoFeatures.reserve(nMaxBatchSize); - } - catch (const std::exception &e) - { - CPLError(CE_Failure, CPLE_OutOfMemory, "%s", e.what()); - return ENOMEM; - } + auto &oFeatureQueue = + m_poSharedArrowArrayStreamPrivateData->m_oFeatureQueue; memset(out_array, 0, sizeof(*out_array)); - if (poPrivate->poShared->m_bEOF) - { - return 0; - } auto poLayerDefn = GetLayerDefn(); const int nFieldCount = poLayerDefn->GetFieldCount(); @@ -1661,16 +1693,18 @@ int OGRLayer::GetNextArrowArray(struct ArrowArrayStream *stream, (bIncludeFID ? 1 : 0) + nFieldCount + nGeomFieldCount; int iSchemaChild = 0; - out_array->release = OGRLayerDefaultReleaseArray; - if (!m_poSharedArrowArrayStreamPrivateData->m_anQueriedFIDs.empty()) { + if (poPrivate->poShared->m_bEOF) + { + return 0; + } if (m_poSharedArrowArrayStreamPrivateData->m_iQueriedFIDS == 0) { CPLDebug("OGR", "Using fast FID filtering"); } while ( - apoFeatures.size() < static_cast(nMaxBatchSize) && + oFeatureQueue.size() < static_cast(nMaxBatchSize) && m_poSharedArrowArrayStreamPrivateData->m_iQueriedFIDS < m_poSharedArrowArrayStreamPrivateData->m_anQueriedFIDs.size()) { @@ -1683,7 +1717,7 @@ int OGRLayer::GetNextArrowArray(struct ArrowArrayStream *stream, FilterGeometry(poFeature->GetGeomFieldRef( m_iGeomFieldFilter)))) { - apoFeatures.emplace_back(std::move(poFeature)); + oFeatureQueue.emplace_back(std::move(poFeature)); } } if (m_poSharedArrowArrayStreamPrivateData->m_iQueriedFIDS == @@ -1692,9 +1726,9 @@ int OGRLayer::GetNextArrowArray(struct ArrowArrayStream *stream, poPrivate->poShared->m_bEOF = true; } } - else + else if (!poPrivate->poShared->m_bEOF) { - for (int i = 0; i < nMaxBatchSize; i++) + while (oFeatureQueue.size() < static_cast(nMaxBatchSize)) { auto poFeature = std::unique_ptr(GetNextFeature()); if (!poFeature) @@ -1702,17 +1736,15 @@ int OGRLayer::GetNextArrowArray(struct ArrowArrayStream *stream, poPrivate->poShared->m_bEOF = true; break; } - apoFeatures.emplace_back(std::move(poFeature)); + oFeatureQueue.emplace_back(std::move(poFeature)); } } - if (apoFeatures.empty()) + if (oFeatureQueue.empty()) { - out_array->release(out_array); - memset(out_array, 0, sizeof(*out_array)); return 0; } - out_array->length = apoFeatures.size(); + out_array->release = OGRLayerDefaultReleaseArray; out_array->null_count = 0; out_array->n_children = nMaxChildren; @@ -1723,6 +1755,8 @@ int OGRLayer::GetNextArrowArray(struct ArrowArrayStream *stream, out_array->buffers = static_cast(CPLCalloc(1, sizeof(void *))); + size_t nFeatureCount = oFeatureQueue.size(); + const uint32_t nMemLimit = OGRArrowArrayHelper::GetMemLimit(); if (bIncludeFID) { out_array->children[iSchemaChild] = static_cast( @@ -1730,19 +1764,18 @@ int OGRLayer::GetNextArrowArray(struct ArrowArrayStream *stream, auto psChild = out_array->children[iSchemaChild]; ++iSchemaChild; psChild->release = OGRLayerDefaultReleaseArray; - psChild->length = apoFeatures.size(); psChild->n_buffers = 2; psChild->buffers = static_cast(CPLCalloc(2, sizeof(void *))); int64_t *panValues = static_cast(VSI_MALLOC_ALIGNED_AUTO_VERBOSE( - sizeof(int64_t) * apoFeatures.size())); + sizeof(int64_t) * oFeatureQueue.size())); if (panValues == nullptr) goto error; psChild->buffers[1] = panValues; - for (size_t iFeat = 0; iFeat < apoFeatures.size(); ++iFeat) + for (size_t iFeat = 0; iFeat < oFeatureQueue.size(); ++iFeat) { - panValues[iFeat] = apoFeatures[iFeat]->GetFID(); + panValues[iFeat] = oFeatureQueue[iFeat]->GetFID(); } } @@ -1759,7 +1792,6 @@ int OGRLayer::GetNextArrowArray(struct ArrowArrayStream *stream, auto psChild = out_array->children[iSchemaChild]; ++iSchemaChild; psChild->release = OGRLayerDefaultReleaseArray; - psChild->length = apoFeatures.size(); const bool bIsNullable = CPL_TO_BOOL(poFieldDefn->IsNullable()); const auto eSubType = poFieldDefn->GetSubType(); switch (poFieldDefn->GetType()) @@ -1768,19 +1800,21 @@ int OGRLayer::GetNextArrowArray(struct ArrowArrayStream *stream, { if (eSubType == OFSTBoolean) { - if (!FillBoolArray(psChild, apoFeatures, bIsNullable, - &OGRField::Integer, i)) + if (!FillBoolArray(psChild, oFeatureQueue, nFeatureCount, + bIsNullable, &OGRField::Integer, i)) goto error; } else if (eSubType == OFSTInt16) { - if (!FillArray(psChild, apoFeatures, bIsNullable, + if (!FillArray(psChild, oFeatureQueue, + nFeatureCount, bIsNullable, &OGRField::Integer, i)) goto error; } else { - if (!FillArray(psChild, apoFeatures, bIsNullable, + if (!FillArray(psChild, oFeatureQueue, + nFeatureCount, bIsNullable, &OGRField::Integer, i)) goto error; } @@ -1810,8 +1844,8 @@ int OGRLayer::GetNextArrowArray(struct ArrowArrayStream *stream, case OFTInteger64: { - if (!FillArray(psChild, apoFeatures, bIsNullable, - &OGRField::Integer64, i)) + if (!FillArray(psChild, oFeatureQueue, nFeatureCount, + bIsNullable, &OGRField::Integer64, i)) goto error; break; } @@ -1820,13 +1854,14 @@ int OGRLayer::GetNextArrowArray(struct ArrowArrayStream *stream, { if (eSubType == OFSTFloat32) { - if (!FillArray(psChild, apoFeatures, bIsNullable, - &OGRField::Real, i)) + if (!FillArray(psChild, oFeatureQueue, nFeatureCount, + bIsNullable, &OGRField::Real, i)) goto error; } else { - if (!FillArray(psChild, apoFeatures, bIsNullable, + if (!FillArray(psChild, oFeatureQueue, + nFeatureCount, bIsNullable, &OGRField::Real, i)) goto error; } @@ -1836,9 +1871,15 @@ int OGRLayer::GetNextArrowArray(struct ArrowArrayStream *stream, case OFTString: case OFTWideString: { - if (!FillStringArray(psChild, apoFeatures, bIsNullable, - i)) - goto error; + const size_t nThisFeatureCount = FillStringArray( + psChild, oFeatureQueue, nFeatureCount, bIsNullable, i, + nMemLimit); + if (nThisFeatureCount == 0) + { + goto error_max_mem; + } + if (nThisFeatureCount < nFeatureCount) + nFeatureCount = nThisFeatureCount; break; } @@ -1847,90 +1888,141 @@ int OGRLayer::GetNextArrowArray(struct ArrowArrayStream *stream, const int nWidth = poFieldDefn->GetWidth(); if (nWidth > 0) { - if (!FillFixedWidthBinaryArray(psChild, apoFeatures, - bIsNullable, nWidth, i)) + if (nFeatureCount > nMemLimit / nWidth) + { + nFeatureCount = nMemLimit / nWidth; + if (nFeatureCount == 0) + goto error_max_mem; + } + if (!FillFixedWidthBinaryArray(psChild, oFeatureQueue, + nFeatureCount, bIsNullable, + nWidth, i)) goto error; } - else if (!FillBinaryArray(psChild, apoFeatures, - bIsNullable, i)) - goto error; + else + { + const size_t nThisFeatureCount = FillBinaryArray( + psChild, oFeatureQueue, nFeatureCount, bIsNullable, i, + nMemLimit); + if (nThisFeatureCount == 0) + { + goto error_max_mem; + } + if (nThisFeatureCount < nFeatureCount) + nFeatureCount = nThisFeatureCount; + } break; } case OFTIntegerList: { + size_t nThisFeatureCount; if (eSubType == OFSTBoolean) { - if (!FillListArrayBool( - psChild, apoFeatures, bIsNullable, i)) - goto error; + nThisFeatureCount = + FillListArrayBool( + psChild, oFeatureQueue, nFeatureCount, bIsNullable, + i, nMemLimit); } else if (eSubType == OFSTInt16) { - if (!FillListArray( - psChild, apoFeatures, bIsNullable, i)) - goto error; + nThisFeatureCount = + FillListArray( + psChild, oFeatureQueue, nFeatureCount, bIsNullable, + i, nMemLimit); } else { - if (!FillListArray( - psChild, apoFeatures, bIsNullable, i)) - goto error; + nThisFeatureCount = + FillListArray( + psChild, oFeatureQueue, nFeatureCount, bIsNullable, + i, nMemLimit); + } + if (nThisFeatureCount == 0) + { + goto error_max_mem; } + if (nThisFeatureCount < nFeatureCount) + nFeatureCount = nThisFeatureCount; break; } case OFTInteger64List: { - if (!FillListArray( - psChild, apoFeatures, bIsNullable, i)) - goto error; + const size_t nThisFeatureCount = + FillListArray( + psChild, oFeatureQueue, nFeatureCount, bIsNullable, i, + nMemLimit); + if (nThisFeatureCount == 0) + { + goto error_max_mem; + } + if (nThisFeatureCount < nFeatureCount) + nFeatureCount = nThisFeatureCount; break; } case OFTRealList: { + size_t nThisFeatureCount; if (eSubType == OFSTFloat32) { - if (!FillListArray( - psChild, apoFeatures, bIsNullable, i)) - goto error; + nThisFeatureCount = + FillListArray( + psChild, oFeatureQueue, nFeatureCount, bIsNullable, + i, nMemLimit); } else { - if (!FillListArray( - psChild, apoFeatures, bIsNullable, i)) - goto error; + nThisFeatureCount = + FillListArray( + psChild, oFeatureQueue, nFeatureCount, bIsNullable, + i, nMemLimit); + } + if (nThisFeatureCount == 0) + { + goto error_max_mem; } + if (nThisFeatureCount < nFeatureCount) + nFeatureCount = nThisFeatureCount; break; } case OFTStringList: case OFTWideStringList: { - if (!FillStringListArray(psChild, apoFeatures, - bIsNullable, i)) - goto error; + const size_t nThisFeatureCount = FillStringListArray( + psChild, oFeatureQueue, nFeatureCount, bIsNullable, i, + nMemLimit); + if (nThisFeatureCount == 0) + { + goto error_max_mem; + } + if (nThisFeatureCount < nFeatureCount) + nFeatureCount = nThisFeatureCount; break; } case OFTDate: { - if (!FillDateArray(psChild, apoFeatures, bIsNullable, i)) + if (!FillDateArray(psChild, oFeatureQueue, nFeatureCount, + bIsNullable, i)) goto error; break; } case OFTTime: { - if (!FillTimeArray(psChild, apoFeatures, bIsNullable, i)) + if (!FillTimeArray(psChild, oFeatureQueue, nFeatureCount, + bIsNullable, i)) goto error; break; } case OFTDateTime: { - if (!FillDateTimeArray(psChild, apoFeatures, bIsNullable, i, + if (!FillDateTimeArray(psChild, oFeatureQueue, nFeatureCount, + bIsNullable, i, poFieldDefn->GetTZFlag())) goto error; break; @@ -1950,17 +2042,43 @@ int OGRLayer::GetNextArrowArray(struct ArrowArrayStream *stream, auto psChild = out_array->children[iSchemaChild]; ++iSchemaChild; psChild->release = OGRLayerDefaultReleaseArray; - psChild->length = apoFeatures.size(); - if (!FillWKBGeometryArray(psChild, apoFeatures, poFieldDefn, - i)) - goto error; + psChild->length = oFeatureQueue.size(); + const size_t nThisFeatureCount = FillWKBGeometryArray( + psChild, oFeatureQueue, nFeatureCount, poFieldDefn, i, nMemLimit); + if (nThisFeatureCount == 0) + { + goto error_max_mem; + } + if (nThisFeatureCount < nFeatureCount) + nFeatureCount = nThisFeatureCount; + } + + // Remove consumed features from the queue + if (nFeatureCount == oFeatureQueue.size()) + oFeatureQueue.clear(); + else + { + for (size_t i = 0; i < nFeatureCount; ++i) + { + oFeatureQueue.pop_front(); + } } out_array->n_children = iSchemaChild; + out_array->length = nFeatureCount; + for (int i = 0; i < out_array->n_children; ++i) + { + out_array->children[i]->length = nFeatureCount; + } return 0; +error_max_mem: + CPLError(CE_Failure, CPLE_AppDefined, + "Too large feature: not even a single feature can be returned"); error: + oFeatureQueue.clear(); + poPrivate->poShared->m_bEOF = true; out_array->release(out_array); memset(out_array, 0, sizeof(*out_array)); return ENOMEM; diff --git a/ogr/ogrsf_frmts/gpkg/ogr_geopackage.h b/ogr/ogrsf_frmts/gpkg/ogr_geopackage.h index f9cecfeb656d..18c0b8ba4701 100644 --- a/ogr/ogrsf_frmts/gpkg/ogr_geopackage.h +++ b/ogr/ogrsf_frmts/gpkg/ogr_geopackage.h @@ -101,6 +101,7 @@ struct OGRGPKGTableLayerFillArrowArray std::unique_ptr psHelper{}; int nCountRows = 0; bool bErrorOccurred = false; + bool bMemoryLimitReached = false; std::string osErrorMsg{}; OGRFeatureDefn *poFeatureDefn = nullptr; OGRGeoPackageLayer *poLayer = nullptr; @@ -745,6 +746,7 @@ class OGRGeoPackageTableLayer final : public OGRGeoPackageLayer bool m_bArrayReady = false; bool m_bFetchRows = false; bool m_bStop = false; + bool m_bMemoryLimitReached = false; std::string m_osErrorMsg{}; std::unique_ptr m_poDS{}; OGRGeoPackageTableLayer *m_poLayer{}; @@ -762,7 +764,8 @@ class OGRGeoPackageTableLayer final : public OGRGeoPackageLayer virtual int GetNextArrowArray(struct ArrowArrayStream *, struct ArrowArray *out_array) override; int GetNextArrowArrayInternal(struct ArrowArray *out_array, - std::string &osErrorMsg); + std::string &osErrorMsg, + bool &bMemoryLimitReached); int GetNextArrowArrayAsynchronous(struct ArrowArray *out_array); void GetNextArrowArrayAsynchronousWorker(); void CancelAsyncNextArrowArray(); diff --git a/ogr/ogrsf_frmts/gpkg/ogrgeopackagelayer.cpp b/ogr/ogrsf_frmts/gpkg/ogrgeopackagelayer.cpp index a07de24c129e..74c768932c47 100644 --- a/ogr/ogrsf_frmts/gpkg/ogrgeopackagelayer.cpp +++ b/ogr/ogrsf_frmts/gpkg/ogrgeopackagelayer.cpp @@ -558,6 +558,7 @@ int OGRGeoPackageLayer::GetNextArrowArray(struct ArrowArrayStream *stream, struct tm brokenDown; memset(&brokenDown, 0, sizeof(brokenDown)); + const uint32_t nMemLimit = OGRArrowArrayHelper::GetMemLimit(); int iFeat = 0; while (iFeat < sHelper.nMaxBatchSize) { @@ -675,6 +676,20 @@ int OGRGeoPackageLayer::GetNextArrowArray(struct ArrowArrayStream *stream, if (nWKBSize != 0) { + if (iFeat > 0) + { + auto panOffsets = static_cast( + const_cast(psArray->buffers[1])); + const uint32_t nCurLength = + static_cast(panOffsets[iFeat]); + if (nWKBSize <= nMemLimit && + nWKBSize > nMemLimit - nCurLength) + { + m_bDoStep = false; + break; + } + } + GByte *outPtr = sHelper.GetPtrForStringOrBinary( iArrowField, iFeat, nWKBSize); if (outPtr == nullptr) @@ -785,6 +800,22 @@ int OGRGeoPackageLayer::GetNextArrowArray(struct ArrowArrayStream *stream, sqlite3_column_blob(hStmt, iRawField); if (pabyData != nullptr || nBytes == 0) { + if (iFeat > 0) + { + auto panOffsets = static_cast( + const_cast(psArray->buffers[1])); + const uint32_t nCurLength = + static_cast(panOffsets[iFeat]); + if (nBytes <= nMemLimit && + nBytes > nMemLimit - nCurLength) + { + m_bDoStep = false; + m_iNextShapeId--; + m_nFeaturesRead--; + goto after_loop; + } + } + GByte *outPtr = sHelper.GetPtrForStringOrBinary( iArrowField, iFeat, nBytes); if (outPtr == nullptr) @@ -833,6 +864,22 @@ int OGRGeoPackageLayer::GetNextArrowArray(struct ArrowArrayStream *stream, if (pszTxt != nullptr) { const size_t nBytes = strlen(pszTxt); + if (iFeat > 0) + { + auto panOffsets = static_cast( + const_cast(psArray->buffers[1])); + const uint32_t nCurLength = + static_cast(panOffsets[iFeat]); + if (nBytes <= nMemLimit && + nBytes > nMemLimit - nCurLength) + { + m_bDoStep = false; + m_iNextShapeId--; + m_nFeaturesRead--; + goto after_loop; + } + } + GByte *outPtr = sHelper.GetPtrForStringOrBinary( iArrowField, iFeat, nBytes); if (outPtr == nullptr) @@ -859,7 +906,7 @@ int OGRGeoPackageLayer::GetNextArrowArray(struct ArrowArrayStream *stream, ++iFeat; } - +after_loop: sHelper.Shrink(iFeat); if (iFeat == 0) sHelper.ClearArray(); diff --git a/ogr/ogrsf_frmts/gpkg/ogrgeopackagetablelayer.cpp b/ogr/ogrsf_frmts/gpkg/ogrgeopackagetablelayer.cpp index df105e5a0728..0347fc26999a 100644 --- a/ogr/ogrsf_frmts/gpkg/ogrgeopackagetablelayer.cpp +++ b/ogr/ogrsf_frmts/gpkg/ogrgeopackagetablelayer.cpp @@ -7510,6 +7510,9 @@ void OGR_GPKG_FillArrowArray_Step(sqlite3_context *pContext, int /*argc*/, } if (psFillArrowArray->nCountRows < 0) return; + + const uint32_t nMemLimit = OGRArrowArrayHelper::GetMemLimit(); +begin: const int iFeat = psFillArrowArray->nCountRows; auto psHelper = psFillArrowArray->psHelper.get(); @@ -7609,6 +7612,42 @@ void OGR_GPKG_FillArrowArray_Step(sqlite3_context *pContext, int /*argc*/, } } + if (psFillArrowArray->nCountRows > 0) + { + auto panOffsets = static_cast( + const_cast(psArray->buffers[1])); + const uint32_t nCurLength = + static_cast(panOffsets[iFeat]); + if (nWKBSize <= nMemLimit && + nWKBSize > nMemLimit - nCurLength) + { + CPLDebug("GPKG", + "OGR_GPKG_FillArrowArray_Step(): premature " + "notification of %d features to consumer due " + "to too big array", + psFillArrowArray->nCountRows); + psFillArrowArray->bMemoryLimitReached = true; + if (psFillArrowArray->bAsynchronousMode) + { + std::unique_lock oLock( + psFillArrowArray->oMutex); + psFillArrowArray->psHelper->Shrink( + psFillArrowArray->nCountRows); + psFillArrowArray->oCV.notify_one(); + while (psFillArrowArray->nCountRows > 0) + { + psFillArrowArray->oCV.wait(oLock); + } + goto begin; + } + else + { + sqlite3_interrupt(psFillArrowArray->hDB); + return; + } + } + } + GByte *outPtr = psHelper->GetPtrForStringOrBinary( iArrowField, iFeat, nWKBSize); if (outPtr == nullptr) @@ -7709,6 +7748,42 @@ void OGR_GPKG_FillArrowArray_Step(sqlite3_context *pContext, int /*argc*/, const void *pabyData = sqlite3_value_blob(argv[iCol]); if (pabyData != nullptr || nBytes == 0) { + if (psFillArrowArray->nCountRows > 0) + { + auto panOffsets = static_cast( + const_cast(psArray->buffers[1])); + const uint32_t nCurLength = + static_cast(panOffsets[iFeat]); + if (nBytes <= nMemLimit && + nBytes > nMemLimit - nCurLength) + { + CPLDebug("GPKG", + "OGR_GPKG_FillArrowArray_Step(): " + "premature notification of %d features to " + "consumer due to too big array", + psFillArrowArray->nCountRows); + psFillArrowArray->bMemoryLimitReached = true; + if (psFillArrowArray->bAsynchronousMode) + { + std::unique_lock oLock( + psFillArrowArray->oMutex); + psFillArrowArray->psHelper->Shrink( + psFillArrowArray->nCountRows); + psFillArrowArray->oCV.notify_one(); + while (psFillArrowArray->nCountRows > 0) + { + psFillArrowArray->oCV.wait(oLock); + } + goto begin; + } + else + { + sqlite3_interrupt(psFillArrowArray->hDB); + return; + } + } + } + GByte *outPtr = psHelper->GetPtrForStringOrBinary( iArrowField, iFeat, nBytes); if (outPtr == nullptr) @@ -7763,6 +7838,42 @@ void OGR_GPKG_FillArrowArray_Step(sqlite3_context *pContext, int /*argc*/, if (pszTxt != nullptr) { const size_t nBytes = strlen(pszTxt); + if (psFillArrowArray->nCountRows > 0) + { + auto panOffsets = static_cast( + const_cast(psArray->buffers[1])); + const uint32_t nCurLength = + static_cast(panOffsets[iFeat]); + if (nBytes <= nMemLimit && + nBytes > nMemLimit - nCurLength) + { + CPLDebug("GPKG", + "OGR_GPKG_FillArrowArray_Step(): " + "premature notification of %d features to " + "consumer due to too big array", + psFillArrowArray->nCountRows); + psFillArrowArray->bMemoryLimitReached = true; + if (psFillArrowArray->bAsynchronousMode) + { + std::unique_lock oLock( + psFillArrowArray->oMutex); + psFillArrowArray->psHelper->Shrink( + psFillArrowArray->nCountRows); + psFillArrowArray->oCV.notify_one(); + while (psFillArrowArray->nCountRows > 0) + { + psFillArrowArray->oCV.wait(oLock); + } + goto begin; + } + else + { + sqlite3_interrupt(psFillArrowArray->hDB); + return; + } + } + } + GByte *outPtr = psHelper->GetPtrForStringOrBinary( iArrowField, iFeat, nBytes); if (outPtr == nullptr) @@ -7878,8 +7989,7 @@ int OGRGeoPackageTableLayer::GetNextArrowArrayAsynchronous( // error) { std::unique_lock oLock(m_poFillArrowArray->oMutex); - while (m_poFillArrowArray->nCountRows < - m_poFillArrowArray->nMaxBatchSize && + while (m_poFillArrowArray->nCountRows == 0 && !m_poFillArrowArray->bIsFinished) { m_poFillArrowArray->oCV.wait(oLock); @@ -8170,11 +8280,18 @@ int OGRGeoPackageTableLayer::GetNextArrowArray(struct ArrowArrayStream *stream, sizeof(struct ArrowArray)); memset(task->m_psArrowArray.get(), 0, sizeof(struct ArrowArray)); + if (task->m_bMemoryLimitReached) + { + m_nIsCompatOfOptimizedGetNextArrowArray = false; + stopThread(); + CancelAsyncNextArrowArray(); + return 0; + } // Are the records still available for reading beyond the current // queued tasks ? If so, recycle this task to read them - if (task->m_iStartShapeId + - static_cast(nTasks) * nMaxBatchSize <= - m_nTotalFeatureCount) + else if (task->m_iStartShapeId + + static_cast(nTasks) * nMaxBatchSize <= + m_nTotalFeatureCount) { task->m_iStartShapeId += static_cast(nTasks) * nMaxBatchSize; @@ -8287,9 +8404,12 @@ int OGRGeoPackageTableLayer::GetNextArrowArray(struct ArrowArrayStream *stream, { taskPtr->m_bFetchRows = false; taskPtr->m_poLayer->GetNextArrowArrayInternal( - taskPtr->m_psArrowArray.get(), taskPtr->m_osErrorMsg); + taskPtr->m_psArrowArray.get(), taskPtr->m_osErrorMsg, + taskPtr->m_bMemoryLimitReached); taskPtr->m_bArrayReady = true; taskPtr->m_oCV.notify_one(); + if (taskPtr->m_bMemoryLimitReached) + break; // cppcheck-suppress knownConditionTrueFalse while (!taskPtr->m_bStop && !taskPtr->m_bFetchRows) { @@ -8314,9 +8434,16 @@ int OGRGeoPackageTableLayer::GetNextArrowArray(struct ArrowArrayStream *stream, } std::string osErrorMsg; - int ret = GetNextArrowArrayInternal(out_array, osErrorMsg); + bool bMemoryLimitReached = false; + int ret = + GetNextArrowArrayInternal(out_array, osErrorMsg, bMemoryLimitReached); if (!osErrorMsg.empty()) CPLError(CE_Failure, CPLE_AppDefined, "%s", osErrorMsg.c_str()); + if (bMemoryLimitReached) + { + CancelAsyncNextArrowArray(); + m_nIsCompatOfOptimizedGetNextArrowArray = false; + } return ret; } @@ -8325,8 +8452,10 @@ int OGRGeoPackageTableLayer::GetNextArrowArray(struct ArrowArrayStream *stream, /************************************************************************/ int OGRGeoPackageTableLayer::GetNextArrowArrayInternal( - struct ArrowArray *out_array, std::string &osErrorMsg) + struct ArrowArray *out_array, std::string &osErrorMsg, + bool &bMemoryLimitReached) { + bMemoryLimitReached = false; memset(out_array, 0, sizeof(*out_array)); if (m_iNextShapeId >= m_nTotalFeatureCount) @@ -8344,6 +8473,7 @@ int OGRGeoPackageTableLayer::GetNextArrowArrayInternal( OGRGPKGTableLayerFillArrowArray sFillArrowArray; sFillArrowArray.psHelper = std::move(psHelper); sFillArrowArray.nCountRows = 0; + sFillArrowArray.bMemoryLimitReached = false; sFillArrowArray.bErrorOccurred = false; sFillArrowArray.poFeatureDefn = m_poFeatureDefn; sFillArrowArray.poLayer = this; @@ -8401,13 +8531,16 @@ int OGRGeoPackageTableLayer::GetNextArrowArrayInternal( if (sqlite3_exec(m_poDS->GetDB(), osSQL.c_str(), nullptr, nullptr, &pszErrMsg) != SQLITE_OK) { - if (!sFillArrowArray.bErrorOccurred) + if (!sFillArrowArray.bErrorOccurred && + !sFillArrowArray.bMemoryLimitReached) { osErrorMsg = pszErrMsg ? pszErrMsg : "unknown error"; } } sqlite3_free(pszErrMsg); + bMemoryLimitReached = sFillArrowArray.bMemoryLimitReached; + // Delete function sqlite3_create_function(m_poDS->GetDB(), "OGR_GPKG_FillArrowArray_INTERNAL", -1, SQLITE_UTF8 | SQLITE_DETERMINISTIC, nullptr, diff --git a/ogr/ogrsf_frmts/ogrsf_frmts.h b/ogr/ogrsf_frmts/ogrsf_frmts.h index 126a186a33bb..7a4f9fbcdfb0 100644 --- a/ogr/ogrsf_frmts/ogrsf_frmts.h +++ b/ogr/ogrsf_frmts/ogrsf_frmts.h @@ -37,6 +37,7 @@ #include "gdal_priv.h" #include +#include /** * \file ogrsf_frmts.h @@ -134,6 +135,7 @@ class CPL_DLL OGRLayer : public GDALMajorObject OGRLayer *m_poLayer = nullptr; std::vector m_anQueriedFIDs{}; size_t m_iQueriedFIDS = 0; + std::deque> m_oFeatureQueue{}; }; std::shared_ptr m_poSharedArrowArrayStreamPrivateData{};