From eb3d124f859925259396e026d0dedd5259958aaf Mon Sep 17 00:00:00 2001 From: Even Rouault Date: Thu, 1 Feb 2024 15:20:32 +0100 Subject: [PATCH] Parquet writer: add SORT_BY_BBOX=YES/NO layer creation option Defaults to NO Documentation: ``` - .. lco:: SORT_BY_BBOX :choices: YES, NO :default: NO :since: 3.9 Whether features should be sorted based on the bounding box of their geometries, before being written in the final file. Sorting them enables faster spatial filtering on reading, by grouping together spatially close features in the same group of rows. Note however that enabling this option involves creating a temporary GeoPackage file (in the same directory as the final Parquet file), and thus requires temporary storage (possibly up to several times the size of the final Parquet file, depending on Parquet compression) and additional processing time. The efficiency of spatial filtering depends on the ROW_GROUP_SIZE. If it is too large, too many features that are not spatially close will be grouped together. If it is too small, the file size will increase, and extra processing time will be necessary to browse through the row groups. Note also that when this option is enabled, the Arrow writing API (which is for example triggered when using ogr2ogr to convert from Parquet to Parquet), fallbacks to the generic implementation, which does not support advanced Arrow types (lists, maps, etc.). ``` Experiments with the canonical https://storage.googleapis.com/open-geodata/linz-examples/nz-building-outlines.parquet dataset: * Generation of datasets: // Organize in row groups of 65,536 features, no BBOX, no sorting ``` $ time ogr2ogr out_no_bbox.parquet nz-building-outlines.parquet -progress -lco WRITE_COVERING_BBOX=NO 0...10...20...30...40...50...60...70...80...90...100 - done. real 0m4,457s ``` // Organize in row groups of 65,536 features, add BBOX columns, no sorting ``` $ time ogr2ogr out_unsorted.parquet nz-building-outlines.parquet -progress 0...10...20...30...40...50...60...70...80...90...100 - done. real 0m5,408s ``` // Organize in row groups of max 65,536 features, add BBOX columns, sort using RTree ``` $ time ogr2ogr out_sorted.parquet nz-building-outlines.parquet -progress -lco SORT_BY_BBOX=YES 0...10...20...30...40...50...60...70...80...90...100 - done. real 0m40,311s ``` // Organize in row groups of max 16,384 features, add BBOX columns, sort using RTree ``` $ time ogr2ogr out_sorted_16384.parquet nz-building-outlines.parquet -progress -lco SORT_BY_BBOX=YES -lco ROW_GROUP_SIZE=16384 0...10...20...30...40...50...60...70...80...90...100 - done. real 0m44,149s ``` * File sizes: ``` out_no_bbox.parquet 436,475,127 out_unsorted.parquet 504,120,728 out_sorted.parquet 489,507,910 out_sorted_16384.parquet 492,760,561 ``` * Spatial filter selecting a single feature: ``` $ time ogrinfo out_no_bbox.parquet -spat 1818654 5546189 1818655 5546190 -al -so -json -noextent | jq .layers[0].featureCount 1 real 0m1,302s $ time ogrinfo out_unsorted.parquet -spat 1818654 5546189 1818655 5546190 -al -so -json -noextent | jq .layers[0].featureCount 1 real 0m0,947s $ time ogrinfo out_sorted.parquet -spat 1818654 5546189 1818655 5546190 -al -so -json -noextent | jq .layers[0].featureCount 1 real 0m0,278s $ time ogrinfo out_sorted_16384.parquet -spat 1818654 5546189 1818655 5546190 -al -so -json -noextent | jq .layers[0].featureCount 1 real 0m0,183s ``` * Spatial filter selecting ~ 470,000 features (over a total of 3.2 millions): ``` $ time ogrinfo out_no_bbox.parquet -spat 1750445 5812014 1912866 5906677 -al -so -json -noextent | jq .layers[0].featureCount 471147 real 0m1,957s $ time ogrinfo out_unsorted.parquet -spat 1750445 5812014 1912866 5906677 -al -so -json -noextent | jq .layers[0].featureCount 471147 real 0m1,718s $ time ogrinfo out_sorted.parquet -spat 1750445 5812014 1912866 5906677 -al -so -json -noextent | jq .layers[0].featureCount 471147 real 0m1,067s $ time ogrinfo out_sorted_16384.parquet -spat 1750445 5812014 1912866 5906677 -al -so -json -noextent | jq .layers[0].featureCount 471147 real 0m1,021s ``` --- autotest/ogr/ogr_parquet.py | 100 +++++ doc/source/drivers/vector/parquet.rst | 26 ++ ogr/ogrsf_frmts/arrow/ogr_feather.h | 2 +- .../arrow/ogrfeatherwriterlayer.cpp | 3 +- ogr/ogrsf_frmts/arrow_common/ogr_arrow.h | 6 +- .../arrow_common/ograrrowwriterlayer.hpp | 39 +- ogr/ogrsf_frmts/parquet/ogr_parquet.h | 28 +- ogr/ogrsf_frmts/parquet/ogrparquetdriver.cpp | 136 +++++++ ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp | 25 ++ .../parquet/ogrparquetwriterdataset.cpp | 23 ++ .../parquet/ogrparquetwriterlayer.cpp | 372 +++++++++++++++++- 11 files changed, 738 insertions(+), 22 deletions(-) diff --git a/autotest/ogr/ogr_parquet.py b/autotest/ogr/ogr_parquet.py index a277424f94db..f6b3d909c585 100755 --- a/autotest/ogr/ogr_parquet.py +++ b/autotest/ogr/ogr_parquet.py @@ -3383,3 +3383,103 @@ def test_ogr_parquet_get_extent_3d(tmp_vsimem): lyr = ds.GetLayer(0) assert lyr.TestCapability(ogr.OLCFastGetExtent3D) == 0 assert lyr.GetExtent3D() == (1.0, 4.0, 2.0, 5.0, 3.0, 6.0) + + +############################################################################### + + +@gdaltest.enable_exceptions() +@pytest.mark.require_driver("GPKG") +def test_ogr_parquet_sort_by_bbox(tmp_vsimem): + + outfilename = str(tmp_vsimem / "test_ogr_parquet_sort_by_bbox.parquet") + ds = ogr.GetDriverByName("Parquet").CreateDataSource(outfilename) + + gpkg_drv = gdal.GetDriverByName("GPKG") + gpkg_drv.Deregister() + ROW_GROUP_SIZE = 100 + try: + with pytest.raises( + Exception, + match="Driver GPKG required for SORT_BY_BBOX layer creation option", + ): + ds.CreateLayer( + "test", + geom_type=ogr.wkbPoint, + options=["SORT_BY_BBOX=YES", f"ROW_GROUP_SIZE={ROW_GROUP_SIZE}"], + ) + finally: + gpkg_drv.Register() + + lyr = ds.CreateLayer( + "test", + geom_type=ogr.wkbPoint, + options=["SORT_BY_BBOX=YES", f"ROW_GROUP_SIZE={ROW_GROUP_SIZE}", "FID=fid"], + ) + assert lyr.TestCapability(ogr.OLCFastWriteArrowBatch) == 0 + lyr.CreateField(ogr.FieldDefn("i", ogr.OFTInteger)) + COUNT_NON_SPATIAL = 501 + COUNT_SPATIAL = 601 + for i in range(COUNT_NON_SPATIAL): + f = ogr.Feature(lyr.GetLayerDefn()) + f["i"] = i + lyr.CreateFeature(f) + for i in range(COUNT_SPATIAL): + f = ogr.Feature(lyr.GetLayerDefn()) + f["i"] = i + COUNT_NON_SPATIAL + f.SetGeometryDirectly(ogr.CreateGeometryFromWkt(f"POINT({i} {i})")) + lyr.CreateFeature(f) + ds = None + + with gdaltest.config_option("OGR_PARQUET_SHOW_ROW_GROUP_EXTENT", "YES"): + ds = ogr.Open(outfilename) + lyr = ds.GetLayer(0) + theorical_number_of_groups = ( + COUNT_SPATIAL + ROW_GROUP_SIZE - 1 + ) // ROW_GROUP_SIZE + assert lyr.GetFeatureCount() - theorical_number_of_groups <= max( + 1, 0.3 * theorical_number_of_groups + ) + assert sum([f["feature_count"] for f in lyr]) == COUNT_SPATIAL + + def check_file(filename): + ds = ogr.Open(filename) + lyr = ds.GetLayer(0) + + # First features should be non spatial ones + for i in range(COUNT_NON_SPATIAL): + f = lyr.GetNextFeature() + assert f.GetFID() == i + assert f["i"] == i + assert f.GetGeometryRef() is None + + # Now spatial features + count = 0 + foundNonSequential = False + set_i = set() + while True: + f = lyr.GetNextFeature() + if not f: + break + assert f["i"] >= COUNT_NON_SPATIAL + if f["i"] != i + COUNT_NON_SPATIAL: + foundNonSequential = True + assert f["i"] not in set_i + set_i.add(f["i"]) + assert f.GetFID() == f["i"] + assert f.GetGeometryRef().GetX() == f["i"] - COUNT_NON_SPATIAL + count += 1 + + assert count == COUNT_SPATIAL + assert foundNonSequential + + check_file(outfilename) + + # Check that this works also when using the Arrow interface for creation + outfilename2 = str(tmp_vsimem / "test_ogr_parquet_sort_by_bbox2.parquet") + gdal.VectorTranslate( + outfilename2, + outfilename, + layerCreationOptions=["SORT_BY_BBOX=YES", "ROW_GROUP_SIZE=100"], + ) + check_file(outfilename2) diff --git a/doc/source/drivers/vector/parquet.rst b/doc/source/drivers/vector/parquet.rst index b524e7953a0f..687d83f1ddf4 100644 --- a/doc/source/drivers/vector/parquet.rst +++ b/doc/source/drivers/vector/parquet.rst @@ -123,6 +123,32 @@ Layer creation options Whether to write xmin/ymin/xmax/ymax columns with the bounding box of geometries. +- .. lco:: SORT_BY_BBOX + :choices: YES, NO + :default: NO + :since: 3.9 + + Whether features should be sorted based on the bounding box of their + geometries, before being written in the final file. Sorting them enables + faster spatial filtering on reading, by grouping together spatially close + features in the same group of rows. + + Note however that enabling this option involves creating a temporary + GeoPackage file (in the same directory as the final Parquet file), + and thus requires temporary storage (possibly up to several times the size + of the final Parquet file, depending on Parquet compression) and additional + processing time. + + The efficiency of spatial filtering depends on the ROW_GROUP_SIZE. If it + is too large, too many features that are not spatially close will be grouped + together. If it is too small, the file size will increase, and extra + processing time will be necessary to browse through the row groups. + + Note also that when this option is enabled, the Arrow writing API (which + is for example triggered when using ogr2ogr to convert from Parquet to Parquet), + fallbacks to the generic implementation, which does not support advanced + Arrow types (lists, maps, etc.). + SQL support ----------- diff --git a/ogr/ogrsf_frmts/arrow/ogr_feather.h b/ogr/ogrsf_frmts/arrow/ogr_feather.h index 59bc6b51df16..d02c73563d97 100644 --- a/ogr/ogrsf_frmts/arrow/ogr_feather.h +++ b/ogr/ogrsf_frmts/arrow/ogr_feather.h @@ -165,7 +165,7 @@ class OGRFeatherWriterLayer final : public OGRArrowWriterLayer return m_poFileWriter != nullptr; } virtual void CreateWriter() override; - virtual void CloseFileWriter() override; + virtual bool CloseFileWriter() override; virtual void CreateSchema() override; virtual void PerformStepsBeforeFinalFlushGroup() override; diff --git a/ogr/ogrsf_frmts/arrow/ogrfeatherwriterlayer.cpp b/ogr/ogrsf_frmts/arrow/ogrfeatherwriterlayer.cpp index 4bb9db0cf2b2..c9fa9c7f8057 100644 --- a/ogr/ogrsf_frmts/arrow/ogrfeatherwriterlayer.cpp +++ b/ogr/ogrsf_frmts/arrow/ogrfeatherwriterlayer.cpp @@ -201,7 +201,7 @@ bool OGRFeatherWriterLayer::SetOptions(const std::string &osFilename, /* CloseFileWriter() */ /************************************************************************/ -void OGRFeatherWriterLayer::CloseFileWriter() +bool OGRFeatherWriterLayer::CloseFileWriter() { auto status = m_poFileWriter->Close(); if (!status.ok()) @@ -210,6 +210,7 @@ void OGRFeatherWriterLayer::CloseFileWriter() "FileWriter::Close() failed with %s", status.message().c_str()); } + return status.ok(); } /************************************************************************/ diff --git a/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h b/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h index 016cc1de30d2..ea8375b9da71 100644 --- a/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h +++ b/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h @@ -381,7 +381,7 @@ class OGRArrowWriterLayer CPL_NON_FINAL : public OGRLayer virtual bool IsFileWriterCreated() const = 0; virtual void CreateWriter() = 0; - virtual void CloseFileWriter() = 0; + virtual bool CloseFileWriter() = 0; void CreateSchemaCommon(); void FinalizeSchema(); @@ -396,7 +396,7 @@ class OGRArrowWriterLayer CPL_NON_FINAL : public OGRLayer void ClearArrayBuilers(); virtual bool FlushGroup() = 0; - void FinalizeWriting(); + bool FinalizeWriting(); bool WriteArrays(std::function &, const std::shared_ptr &)> postProcessArray); @@ -468,6 +468,8 @@ class OGRArrowWriterLayer CPL_NON_FINAL : public OGRLayer protected: OGRErr ICreateFeature(OGRFeature *poFeature) override; + + bool FlushFeatures(); }; #endif // OGR_ARROW_H diff --git a/ogr/ogrsf_frmts/arrow_common/ograrrowwriterlayer.hpp b/ogr/ogrsf_frmts/arrow_common/ograrrowwriterlayer.hpp index a07e88395dc6..3f203b265901 100644 --- a/ogr/ogrsf_frmts/arrow_common/ograrrowwriterlayer.hpp +++ b/ogr/ogrsf_frmts/arrow_common/ograrrowwriterlayer.hpp @@ -94,8 +94,10 @@ inline OGRArrowWriterLayer::~OGRArrowWriterLayer() /* FinalizeWriting() */ /************************************************************************/ -inline void OGRArrowWriterLayer::FinalizeWriting() +inline bool OGRArrowWriterLayer::FinalizeWriting() { + bool ret = true; + if (!IsFileWriterCreated()) { CreateWriter(); @@ -105,10 +107,13 @@ inline void OGRArrowWriterLayer::FinalizeWriting() PerformStepsBeforeFinalFlushGroup(); if (!m_apoBuilders.empty() && m_apoFieldsFromArrowSchema.empty()) - FlushGroup(); + ret = FlushGroup(); - CloseFileWriter(); + if (!CloseFileWriter()) + ret = false; } + + return ret; } /************************************************************************/ @@ -1767,20 +1772,32 @@ inline OGRErr OGRArrowWriterLayer::ICreateFeature(OGRFeature *poFeature) // Flush the current row group if reaching the limit of rows per group. if (!m_apoBuilders.empty() && m_apoBuilders[0]->length() == m_nRowGroupSize) { - if (!IsFileWriterCreated()) - { - CreateWriter(); - if (!IsFileWriterCreated()) - return OGRERR_FAILURE; - } - - if (!FlushGroup()) + if (!FlushFeatures()) return OGRERR_FAILURE; } return OGRERR_NONE; } +/************************************************************************/ +/* FlushFeatures() */ +/************************************************************************/ + +inline bool OGRArrowWriterLayer::FlushFeatures() +{ + if (m_apoBuilders.empty() || m_apoBuilders[0]->length() == 0) + return true; + + if (!IsFileWriterCreated()) + { + CreateWriter(); + if (!IsFileWriterCreated()) + return false; + } + + return FlushGroup(); +} + /************************************************************************/ /* GetFeatureCount() */ /************************************************************************/ diff --git a/ogr/ogrsf_frmts/parquet/ogr_parquet.h b/ogr/ogrsf_frmts/parquet/ogr_parquet.h index 9488234a2c99..9c7e6440860d 100644 --- a/ogr/ogrsf_frmts/parquet/ogr_parquet.h +++ b/ogr/ogrsf_frmts/parquet/ogr_parquet.h @@ -197,6 +197,9 @@ class OGRParquetLayer final : public OGRParquetLayerBase OGRFieldType &eType, OGRFieldSubType &eSubType, std::string &osMinTmp, std::string &osMaxTmp) const; + + bool GeomColsBBOXParquet(int iGeom, int &iParquetXMin, int &iParquetYMin, + int &iParquetXMax, int &iParquetYMax) const; }; /************************************************************************/ @@ -282,12 +285,19 @@ class OGRParquetWriterLayer final : public OGRArrowWriterLayer bool m_bEdgesSpherical = false; parquet::WriterProperties::Builder m_oWriterPropertiesBuilder{}; + //! Temporary GeoPackage dataset. Only used in SORT_BY_BBOX mode + std::unique_ptr m_poTmpGPKG{}; + //! Temporary GeoPackage layer. Only used in SORT_BY_BBOX mode + OGRLayer *m_poTmpGPKGLayer = nullptr; + //! Number of features written by ICreateFeature(). Only used in SORT_BY_BBOX mode + GIntBig m_nTmpFeatureCount = 0; + virtual bool IsFileWriterCreated() const override { return m_poFileWriter != nullptr; } virtual void CreateWriter() override; - virtual void CloseFileWriter() override; + virtual bool CloseFileWriter() override; virtual void CreateSchema() override; virtual void PerformStepsBeforeFinalFlushGroup() override; @@ -312,14 +322,15 @@ class OGRParquetWriterLayer final : public OGRArrowWriterLayer std::string GetGeoMetadata() const; + //! Copy temporary GeoPackage layer to final Parquet file + bool CopyTmpGpkgLayerToFinalFile(); + public: OGRParquetWriterLayer( OGRParquetWriterDataset *poDS, arrow::MemoryPool *poMemoryPool, const std::shared_ptr &poOutputStream, const char *pszLayerName); - ~OGRParquetWriterLayer() override; - CPLErr SetMetadata(char **papszMetadata, const char *pszDomain) override; bool SetOptions(CSLConstList papszOptions, @@ -355,10 +366,19 @@ class OGRParquetWriterLayer final : public OGRArrowWriterLayer bool IsArrowSchemaSupported(const struct ArrowSchema *schema, CSLConstList papszOptions, std::string &osErrorMsg) const override; + bool + CreateFieldFromArrowSchema(const struct ArrowSchema *schema, + CSLConstList papszOptions = nullptr) override; bool WriteArrowBatch(const struct ArrowSchema *schema, struct ArrowArray *array, CSLConstList papszOptions = nullptr) override; #endif + + protected: + OGRErr ICreateFeature(OGRFeature *poFeature) override; + + friend class OGRParquetWriterDataset; + bool Close(); }; /************************************************************************/ @@ -380,6 +400,8 @@ class OGRParquetWriterDataset final : public GDALPamDataset return m_poMemoryPool.get(); } + CPLErr Close() override; + int GetLayerCount() override; OGRLayer *GetLayer(int idx) override; int TestCapability(const char *pszCap) override; diff --git a/ogr/ogrsf_frmts/parquet/ogrparquetdriver.cpp b/ogr/ogrsf_frmts/parquet/ogrparquetdriver.cpp index 9cd1a52559f5..746032728958 100644 --- a/ogr/ogrsf_frmts/parquet/ogrparquetdriver.cpp +++ b/ogr/ogrsf_frmts/parquet/ogrparquetdriver.cpp @@ -363,6 +363,124 @@ OpenParquetDatasetWithoutMetadata(const std::string &osBasePathIn, #endif +/************************************************************************/ +/* BuildMemDatasetWithRowGroupExtents() */ +/************************************************************************/ + +/** Builds a Memory dataset that contains, for each row-group of the input file, + * the feature count and spatial extent of the features of this row group, + * using Parquet statistics. This assumes that the Parquet file declares + * a "covering":{"bbox":{ ... }} metadata item. + * + * Only for debug purposes. + */ +static GDALDataset *BuildMemDatasetWithRowGroupExtents(OGRParquetLayer *poLayer) +{ + int iParquetXMin = -1; + int iParquetYMin = -1; + int iParquetXMax = -1; + int iParquetYMax = -1; + if (poLayer->GeomColsBBOXParquet(0, iParquetXMin, iParquetYMin, + iParquetXMax, iParquetYMax)) + { + auto poMemDrv = GetGDALDriverManager()->GetDriverByName("Memory"); + if (!poMemDrv) + return nullptr; + auto poMemDS = std::unique_ptr( + poMemDrv->Create("", 0, 0, 0, GDT_Unknown, nullptr)); + if (!poMemDS) + return nullptr; + OGRSpatialReference *poTmpSRS = nullptr; + const auto poSrcSRS = poLayer->GetSpatialRef(); + if (poSrcSRS) + poTmpSRS = poSrcSRS->Clone(); + auto poMemLayer = + poMemDS->CreateLayer("footprint", poTmpSRS, wkbPolygon, nullptr); + if (poTmpSRS) + poTmpSRS->Release(); + if (!poMemLayer) + return nullptr; + poMemLayer->CreateField( + std::make_unique("feature_count", OFTInteger64) + .get()); + + const auto metadata = + poLayer->GetReader()->parquet_reader()->metadata(); + const int numRowGroups = metadata->num_row_groups(); + for (int iRowGroup = 0; iRowGroup < numRowGroups; ++iRowGroup) + { + std::string osMinTmp, osMaxTmp; + OGRField unusedF; + bool unusedB; + OGRFieldSubType unusedSubType; + + OGRField sXMin; + OGR_RawField_SetNull(&sXMin); + bool bFoundXMin = false; + OGRFieldType eXMinType = OFTMaxType; + + OGRField sYMin; + OGR_RawField_SetNull(&sYMin); + bool bFoundYMin = false; + OGRFieldType eYMinType = OFTMaxType; + + OGRField sXMax; + OGR_RawField_SetNull(&sXMax); + bool bFoundXMax = false; + OGRFieldType eXMaxType = OFTMaxType; + + OGRField sYMax; + OGR_RawField_SetNull(&sYMax); + bool bFoundYMax = false; + OGRFieldType eYMaxType = OFTMaxType; + + if (poLayer->GetMinMaxForParquetCol( + iRowGroup, iParquetXMin, nullptr, + /* bComputeMin = */ true, sXMin, bFoundXMin, + /* bComputeMax = */ false, unusedF, unusedB, eXMinType, + unusedSubType, osMinTmp, osMaxTmp) && + bFoundXMin && eXMinType == OFTReal && + poLayer->GetMinMaxForParquetCol( + iRowGroup, iParquetYMin, nullptr, + /* bComputeMin = */ true, sYMin, bFoundYMin, + /* bComputeMax = */ false, unusedF, unusedB, eYMinType, + unusedSubType, osMinTmp, osMaxTmp) && + bFoundYMin && eYMinType == OFTReal && + poLayer->GetMinMaxForParquetCol( + iRowGroup, iParquetXMax, nullptr, + /* bComputeMin = */ false, unusedF, unusedB, + /* bComputeMax = */ true, sXMax, bFoundXMax, eXMaxType, + unusedSubType, osMaxTmp, osMaxTmp) && + bFoundXMax && eXMaxType == OFTReal && + poLayer->GetMinMaxForParquetCol( + iRowGroup, iParquetYMax, nullptr, + /* bComputeMin = */ false, unusedF, unusedB, + /* bComputeMax = */ true, sYMax, bFoundYMax, eYMaxType, + unusedSubType, osMaxTmp, osMaxTmp) && + bFoundYMax && eYMaxType == OFTReal) + { + OGRFeature oFeat(poMemLayer->GetLayerDefn()); + oFeat.SetField(0, + static_cast( + metadata->RowGroup(iRowGroup)->num_rows())); + auto poPoly = std::make_unique(); + auto poLR = std::make_unique(); + poLR->addPoint(sXMin.Real, sYMin.Real); + poLR->addPoint(sXMin.Real, sYMax.Real); + poLR->addPoint(sXMax.Real, sYMax.Real); + poLR->addPoint(sXMax.Real, sYMin.Real); + poLR->addPoint(sXMin.Real, sYMin.Real); + poPoly->addRingDirectly(poLR.release()); + oFeat.SetGeometryDirectly(poPoly.release()); + CPL_IGNORE_RET_VAL(poMemLayer->CreateFeature(&oFeat)); + } + } + + return poMemDS.release(); + } + return nullptr; +} + /************************************************************************/ /* Open() */ /************************************************************************/ @@ -533,6 +651,14 @@ static GDALDataset *OGRParquetDriverOpen(GDALOpenInfo *poOpenInfo) auto poLayer = std::make_unique( poDS.get(), CPLGetBasename(osFilename.c_str()), std::move(arrow_reader), poOpenInfo->papszOpenOptions); + + // For debug purposes: return a layer with the extent of each row group + if (CPLTestBool( + CPLGetConfigOption("OGR_PARQUET_SHOW_ROW_GROUP_EXTENT", "NO"))) + { + return BuildMemDatasetWithRowGroupExtents(poLayer.get()); + } + poDS->SetLayer(std::move(poLayer)); return poDS.release(); } @@ -749,6 +875,16 @@ void OGRParquetDriver::InitMetadata() "geometries"); } + { + auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option"); + CPLAddXMLAttributeAndValue(psOption, "name", "SORT_BY_BBOX"); + CPLAddXMLAttributeAndValue(psOption, "type", "boolean"); + CPLAddXMLAttributeAndValue(psOption, "default", "NO"); + CPLAddXMLAttributeAndValue(psOption, "description", + "Whether features should be sorted based on " + "the bounding box of their geometries"); + } + char *pszXML = CPLSerializeXMLTree(oTree.get()); GDALDriver::SetMetadataItem(GDAL_DS_LAYER_CREATIONOPTIONLIST, pszXML); CPLFree(pszXML); diff --git a/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp b/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp index e98d00474bb8..d8c092bb1ffe 100644 --- a/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp +++ b/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp @@ -2547,3 +2547,28 @@ bool OGRParquetLayer::GetMinMaxForParquetCol( return bFoundMin || bFoundMax; } + +/************************************************************************/ +/* GeomColsBBOXParquet() */ +/************************************************************************/ + +/** Return for a given geometry column (iGeom: in [0, GetGeomFieldCount()-1] range), + * the Parquet column number of the corresponding xmin,ymin,xmax,ymax bounding + * box columns, if existing. + */ +bool OGRParquetLayer::GeomColsBBOXParquet(int iGeom, int &iParquetXMin, + int &iParquetYMin, int &iParquetXMax, + int &iParquetYMax) const +{ + const auto oIter = m_oMapGeomFieldIndexToGeomColBBOXParquet.find(iGeom); + const bool bFound = + (oIter != m_oMapGeomFieldIndexToGeomColBBOXParquet.end()); + if (bFound) + { + iParquetXMin = oIter->second.iParquetXMin; + iParquetYMin = oIter->second.iParquetYMin; + iParquetXMax = oIter->second.iParquetXMax; + iParquetYMax = oIter->second.iParquetYMax; + } + return bFound; +} diff --git a/ogr/ogrsf_frmts/parquet/ogrparquetwriterdataset.cpp b/ogr/ogrsf_frmts/parquet/ogrparquetwriterdataset.cpp index 545b45a420b8..f970c001a243 100644 --- a/ogr/ogrsf_frmts/parquet/ogrparquetwriterdataset.cpp +++ b/ogr/ogrsf_frmts/parquet/ogrparquetwriterdataset.cpp @@ -41,6 +41,29 @@ OGRParquetWriterDataset::OGRParquetWriterDataset( { } +/************************************************************************/ +/* Close() */ +/************************************************************************/ + +CPLErr OGRParquetWriterDataset::Close() +{ + CPLErr eErr = CE_None; + if (nOpenFlags != OPEN_FLAGS_CLOSED) + { + if (m_poLayer && !m_poLayer->Close()) + { + eErr = CE_Failure; + } + + if (GDALPamDataset::Close() != CE_None) + { + eErr = CE_Failure; + } + } + + return eErr; +} + /************************************************************************/ /* GetLayerCount() */ /************************************************************************/ diff --git a/ogr/ogrsf_frmts/parquet/ogrparquetwriterlayer.cpp b/ogr/ogrsf_frmts/parquet/ogrparquetwriterlayer.cpp index 860810e8a4d4..43b496932c5d 100644 --- a/ogr/ogrsf_frmts/parquet/ogrparquetwriterlayer.cpp +++ b/ogr/ogrsf_frmts/parquet/ogrparquetwriterlayer.cpp @@ -35,6 +35,8 @@ #include "ogr_wkb.h" +#include + /************************************************************************/ /* OGRParquetWriterLayer() */ /************************************************************************/ @@ -51,13 +53,235 @@ OGRParquetWriterLayer::OGRParquetWriterLayer( } /************************************************************************/ -/* ~OGRParquetWriterLayer() */ +/* Close() */ /************************************************************************/ -OGRParquetWriterLayer::~OGRParquetWriterLayer() +bool OGRParquetWriterLayer::Close() { + if (m_poTmpGPKGLayer) + { + if (!CopyTmpGpkgLayerToFinalFile()) + return false; + } + if (m_bInitializationOK) - FinalizeWriting(); + { + if (!FinalizeWriting()) + return false; + } + + return true; +} + +/************************************************************************/ +/* CopyTmpGpkgLayerToFinalFile() */ +/************************************************************************/ + +bool OGRParquetWriterLayer::CopyTmpGpkgLayerToFinalFile() +{ + if (!m_poTmpGPKGLayer) + { + return true; + } + + CPLDebug("PARQUET", "CopyTmpGpkgLayerToFinalFile(): start..."); + + VSIUnlink(m_poTmpGPKG->GetDescription()); + + OGRFeature oFeat(m_poFeatureDefn); + + // Interval in terms of features between 2 debug progress report messages + constexpr int PROGRESS_FC_INTERVAL = 100 * 1000; + + // First, write features without geometries + { + auto poTmpLayer = std::unique_ptr(m_poTmpGPKG->ExecuteSQL( + "SELECT serialized_feature FROM tmp WHERE fid NOT IN (SELECT id " + "FROM rtree_tmp_geom)", + nullptr, nullptr)); + if (!poTmpLayer) + return false; + for (const auto &poSrcFeature : poTmpLayer.get()) + { + int nBytesFeature = 0; + const GByte *pabyFeatureData = + poSrcFeature->GetFieldAsBinary(0, &nBytesFeature); + if (!oFeat.DeserializeFromBinary(pabyFeatureData, nBytesFeature)) + { + CPLError(CE_Failure, CPLE_AppDefined, + "Cannot deserialize feature"); + return false; + } + if (OGRArrowWriterLayer::ICreateFeature(&oFeat) != OGRERR_NONE) + { + return false; + } + + if ((m_nFeatureCount % PROGRESS_FC_INTERVAL) == 0) + { + CPLDebugProgress( + "PARQUET", + "CopyTmpGpkgLayerToFinalFile(): %.02f%% progress", + 100.0 * double(m_nFeatureCount) / + double(m_nTmpFeatureCount)); + } + } + + if (!FlushFeatures()) + { + return false; + } + } + + // Now walk through the GPKG RTree for features with geometries + // Cf https://github.com/sqlite/sqlite/blob/master/ext/rtree/rtree.c + // for the description of the content of the rtree _node table + std::vector> aNodeNoDepthPair; + int nTreeDepth = 0; + // Queue the root node + aNodeNoDepthPair.emplace_back( + std::make_pair(/* nodeNo = */ 1, /* depth = */ 0)); + int nCountWrittenFeaturesSinceLastFlush = 0; + while (!aNodeNoDepthPair.empty()) + { + const auto &oLastPair = aNodeNoDepthPair.back(); + const int64_t nNodeNo = oLastPair.first; + const int nCurDepth = oLastPair.second; + //CPLDebug("PARQUET", "Reading nodeNode=%d, curDepth=%d", int(nNodeNo), nCurDepth); + aNodeNoDepthPair.pop_back(); + + auto poRTreeLayer = std::unique_ptr(m_poTmpGPKG->ExecuteSQL( + CPLSPrintf("SELECT data FROM rtree_tmp_geom_node WHERE nodeno " + "= " CPL_FRMT_GIB, + static_cast(nNodeNo)), + nullptr, nullptr)); + if (!poRTreeLayer) + { + CPLError(CE_Failure, CPLE_AppDefined, + "Cannot read node " CPL_FRMT_GIB, + static_cast(nNodeNo)); + return false; + } + const auto poRTreeFeature = + std::unique_ptr(poRTreeLayer->GetNextFeature()); + if (!poRTreeFeature) + { + CPLError(CE_Failure, CPLE_AppDefined, + "Cannot read node " CPL_FRMT_GIB, + static_cast(nNodeNo)); + return false; + } + + int nNodeBytes = 0; + const GByte *pabyNodeData = + poRTreeFeature->GetFieldAsBinary(0, &nNodeBytes); + constexpr int BLOB_HEADER_SIZE = 4; + if (nNodeBytes < BLOB_HEADER_SIZE) + { + CPLError(CE_Failure, CPLE_AppDefined, + "Not enough bytes when reading node " CPL_FRMT_GIB, + static_cast(nNodeNo)); + return false; + } + if (nNodeNo == 1) + { + // Get the RTree depth from the root node + nTreeDepth = (pabyNodeData[0] << 8) | pabyNodeData[1]; + //CPLDebug("PARQUET", "nTreeDepth = %d", nTreeDepth); + } + + const int nCellCount = (pabyNodeData[2] << 8) | pabyNodeData[3]; + constexpr int SIZEOF_CELL = 24; // int64_t + 4 float + if (nNodeBytes < BLOB_HEADER_SIZE + SIZEOF_CELL * nCellCount) + { + CPLError(CE_Failure, CPLE_AppDefined, + "Not enough bytes when reading node " CPL_FRMT_GIB, + static_cast(nNodeNo)); + return false; + } + + size_t nOffset = BLOB_HEADER_SIZE; + if (nCurDepth == nTreeDepth) + { + // Leaf node: it references feature IDs. + + // If we are about to go above m_nRowGroupSize, flush past + // features now, to improve the spatial compacity of the row group. + if (m_nRowGroupSize > nCellCount && + nCountWrittenFeaturesSinceLastFlush + nCellCount > + m_nRowGroupSize) + { + nCountWrittenFeaturesSinceLastFlush = 0; + if (!FlushFeatures()) + { + return false; + } + } + + for (int i = 0; i < nCellCount; ++i) + { + int64_t nFID; + memcpy(&nFID, pabyNodeData + nOffset, sizeof(int64_t)); + CPL_MSBPTR64(&nFID); + + const auto poSrcFeature = std::unique_ptr( + m_poTmpGPKGLayer->GetFeature(nFID)); + if (!poSrcFeature) + { + CPLError(CE_Failure, CPLE_AppDefined, + "Cannot get feature " CPL_FRMT_GIB, + static_cast(nFID)); + return false; + } + + int nBytesFeature = 0; + const GByte *pabyFeatureData = + poSrcFeature->GetFieldAsBinary(0, &nBytesFeature); + if (!oFeat.DeserializeFromBinary(pabyFeatureData, + nBytesFeature)) + { + CPLError(CE_Failure, CPLE_AppDefined, + "Cannot deserialize feature"); + return false; + } + if (OGRArrowWriterLayer::ICreateFeature(&oFeat) != OGRERR_NONE) + { + return false; + } + + nOffset += SIZEOF_CELL; + + ++nCountWrittenFeaturesSinceLastFlush; + + if ((m_nFeatureCount % PROGRESS_FC_INTERVAL) == 0 || + m_nFeatureCount == m_nTmpFeatureCount / 2) + { + CPLDebugProgress( + "PARQUET", + "CopyTmpGpkgLayerToFinalFile(): %.02f%% progress", + 100.0 * double(m_nFeatureCount) / + double(m_nTmpFeatureCount)); + } + } + } + else + { + // Non-leaf node: it references child nodes. + for (int i = 0; i < nCellCount; ++i) + { + int64_t nNode; + memcpy(&nNode, pabyNodeData + nOffset, sizeof(int64_t)); + CPL_MSBPTR64(&nNode); + aNodeNoDepthPair.emplace_back( + std::make_pair(nNode, nCurDepth + 1)); + nOffset += SIZEOF_CELL; + } + } + } + + CPLDebug("PARQUET", + "CopyTmpGpkgLayerToFinalFile(): 100%%, successfully finished"); + return true; } /************************************************************************/ @@ -99,6 +323,33 @@ bool OGRParquetWriterLayer::SetOptions(CSLConstList papszOptions, papszOptions, "WRITE_COVERING_BBOX", CPLGetConfigOption("OGR_PARQUET_WRITE_COVERING_BBOX", "YES"))); + if (CPLTestBool(CSLFetchNameValueDef(papszOptions, "SORT_BY_BBOX", "NO"))) + { + const std::string osTmpGPKG(std::string(m_poDataset->GetDescription()) + + ".tmp.gpkg"); + auto poGPKGDrv = GetGDALDriverManager()->GetDriverByName("GPKG"); + if (!poGPKGDrv) + { + CPLError( + CE_Failure, CPLE_AppDefined, + "Driver GPKG required for SORT_BY_BBOX layer creation option"); + return false; + } + m_poTmpGPKG.reset(poGPKGDrv->Create(osTmpGPKG.c_str(), 0, 0, 0, + GDT_Unknown, nullptr)); + if (!m_poTmpGPKG) + return false; + m_poTmpGPKG->MarkSuppressOnClose(); + m_poTmpGPKGLayer = m_poTmpGPKG->CreateLayer("tmp"); + if (!m_poTmpGPKGLayer) + return false; + // Serialized feature + m_poTmpGPKGLayer->CreateField( + std::make_unique("serialized_feature", OFTBinary) + .get()); + CPL_IGNORE_RET_VAL(m_poTmpGPKGLayer->StartTransaction()); + } + const char *pszGeomEncoding = CSLFetchNameValue(papszOptions, "GEOMETRY_ENCODING"); m_eGeomEncoding = OGRArrowGeomEncoding::WKB; @@ -234,7 +485,7 @@ bool OGRParquetWriterLayer::SetOptions(CSLConstList papszOptions, /* CloseFileWriter() */ /************************************************************************/ -void OGRParquetWriterLayer::CloseFileWriter() +bool OGRParquetWriterLayer::CloseFileWriter() { auto status = m_poFileWriter->Close(); if (!status.ok()) @@ -243,6 +494,7 @@ void OGRParquetWriterLayer::CloseFileWriter() "FileWriter::Close() failed with %s", status.message().c_str()); } + return status.ok(); } /************************************************************************/ @@ -740,6 +992,68 @@ void OGRParquetWriterLayer::CreateWriter() &m_poKeyValueMetadata)); } +/************************************************************************/ +/* ICreateFeature() */ +/************************************************************************/ + +OGRErr OGRParquetWriterLayer::ICreateFeature(OGRFeature *poFeature) +{ + // If not using SORT_BY_BBOX=YES layer creation option, we can directly + // write features to the final Parquet file + if (!m_poTmpGPKGLayer) + return OGRArrowWriterLayer::ICreateFeature(poFeature); + + // SORT_BY_BBOX=YES case: we write for now a serialized version of poFeature + // in a temporary GeoPackage file. + + GIntBig nFID = poFeature->GetFID(); + if (!m_osFIDColumn.empty() && nFID == OGRNullFID) + { + nFID = m_nTmpFeatureCount; + poFeature->SetFID(nFID); + } + ++m_nTmpFeatureCount; + + std::vector abyBuffer; + // Serialize the source feature as a single array of bytes to preserve it + // fully + if (!poFeature->SerializeToBinary(abyBuffer)) + { + return OGRERR_FAILURE; + } + + // SQLite3 limitation: a row must fit in slightly less than 1 GB. + constexpr int SOME_MARGIN = 128; + if (abyBuffer.size() > 1024 * 1024 * 1024 - SOME_MARGIN) + { + CPLError(CE_Failure, CPLE_NotSupported, + "Features larger than 1 GB are not supported"); + return OGRERR_FAILURE; + } + + OGRFeature oFeat(m_poTmpGPKGLayer->GetLayerDefn()); + oFeat.SetFID(nFID); + oFeat.SetField(0, static_cast(abyBuffer.size()), abyBuffer.data()); + const auto poSrcGeom = poFeature->GetGeometryRef(); + if (poSrcGeom && !poSrcGeom->IsEmpty()) + { + // For the purpose of building an RTree, just use the bounding box of + // the geometry as the geometry. + OGREnvelope sEnvelope; + poSrcGeom->getEnvelope(&sEnvelope); + auto poPoly = std::make_unique(); + auto poLR = std::make_unique(); + poLR->addPoint(sEnvelope.MinX, sEnvelope.MinY); + poLR->addPoint(sEnvelope.MinX, sEnvelope.MaxY); + poLR->addPoint(sEnvelope.MaxX, sEnvelope.MaxY); + poLR->addPoint(sEnvelope.MaxX, sEnvelope.MinY); + poLR->addPoint(sEnvelope.MinX, sEnvelope.MinY); + poPoly->addRingDirectly(poLR.release()); + oFeat.SetGeometryDirectly(poPoly.release()); + } + return m_poTmpGPKGLayer->CreateFeature(&oFeat); +} + /************************************************************************/ /* FlushGroup() */ /************************************************************************/ @@ -832,6 +1146,15 @@ OGRParquetWriterLayer::WriteArrowBatch(const struct ArrowSchema *schema, struct ArrowArray *array, CSLConstList papszOptions) { + if (m_poTmpGPKGLayer) + { + // When using SORT_BY_BBOX=YES option, we can't directly write the + // input array, because we need to sort features. Hence we fallback + // to the OGRLayer base implementation, which will ultimately call + // OGRParquetWriterLayer::ICreateFeature() + return OGRLayer::WriteArrowBatch(schema, array, papszOptions); + } + return WriteArrowBatchInternal( schema, array, papszOptions, [this](const std::shared_ptr &poBatch) @@ -869,9 +1192,40 @@ inline int OGRParquetWriterLayer::TestCapability(const char *pszCap) if (EQUAL(pszCap, OLCFastWriteArrowBatch)) return false; #endif + + if (m_poTmpGPKGLayer && EQUAL(pszCap, OLCFastWriteArrowBatch)) + { + // When using SORT_BY_BBOX=YES option, we can't directly write the + // input array, because we need to sort features. So this is not + // fast + return false; + } + return OGRArrowWriterLayer::TestCapability(pszCap); } +/************************************************************************/ +/* CreateFieldFromArrowSchema() */ +/************************************************************************/ + +#if PARQUET_VERSION_MAJOR > 10 +bool OGRParquetWriterLayer::CreateFieldFromArrowSchema( + const struct ArrowSchema *schema, CSLConstList papszOptions) +{ + if (m_poTmpGPKGLayer) + { + // When using SORT_BY_BBOX=YES option, we can't directly write the + // input array, because we need to sort features. But this process + // only supports the base Arrow types supported by + // OGRLayer::WriteArrowBatch() + return OGRLayer::CreateFieldFromArrowSchema(schema, papszOptions); + } + + return OGRArrowWriterLayer::CreateFieldFromArrowSchema(schema, + papszOptions); +} +#endif + /************************************************************************/ /* IsArrowSchemaSupported() */ /************************************************************************/ @@ -881,6 +1235,16 @@ bool OGRParquetWriterLayer::IsArrowSchemaSupported( const struct ArrowSchema *schema, CSLConstList papszOptions, std::string &osErrorMsg) const { + if (m_poTmpGPKGLayer) + { + // When using SORT_BY_BBOX=YES option, we can't directly write the + // input array, because we need to sort features. But this process + // only supports the base Arrow types supported by + // OGRLayer::WriteArrowBatch() + return OGRLayer::IsArrowSchemaSupported(schema, papszOptions, + osErrorMsg); + } + if (schema->format[0] == 'e' && schema->format[1] == 0) { osErrorMsg = "float16 not supported";