From 87b86535563676a7331d44e2fa5d485ce468d7d8 Mon Sep 17 00:00:00 2001 From: Even Rouault Date: Thu, 26 Oct 2023 21:55:10 +0200 Subject: [PATCH] Arrow/Parquet: support LargeString and LargeBinary for geometry columns (read support only) --- autotest/ogr/ogr_parquet.py | 30 ++- ogr/ogrsf_frmts/arrow_common/ogr_arrow.h | 4 +- .../arrow_common/ograrrowlayer.hpp | 194 +++++++++++++++--- ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp | 6 +- 4 files changed, 195 insertions(+), 39 deletions(-) diff --git a/autotest/ogr/ogr_parquet.py b/autotest/ogr/ogr_parquet.py index 455117161d9e..b1e5203de94d 100755 --- a/autotest/ogr/ogr_parquet.py +++ b/autotest/ogr/ogr_parquet.py @@ -35,6 +35,7 @@ import gdaltest import ogrtest import pytest +import test_cli_utilities from osgeo import gdal, ogr, osr @@ -493,7 +494,6 @@ def test_ogr_parquet_1(use_vsi): def test_ogr_parquet_test_ogrsf_test(): - import test_cli_utilities if test_cli_utilities.get_test_ogrsf_path() is None: pytest.skip() @@ -511,7 +511,6 @@ def test_ogr_parquet_test_ogrsf_test(): def test_ogr_parquet_test_ogrsf_example(): - import test_cli_utilities if test_cli_utilities.get_test_ogrsf_path() is None: pytest.skip() @@ -529,7 +528,6 @@ def test_ogr_parquet_test_ogrsf_example(): def test_ogr_parquet_test_ogrsf_all_geoms(): - import test_cli_utilities if test_cli_utilities.get_test_ogrsf_path() is None: pytest.skip() @@ -3085,3 +3083,29 @@ def test_ogr_parquet_write_arrow_rewind_polygon(tmp_vsimem): lyr = ds.GetLayer(0) f = lyr.GetNextFeature() assert f.GetGeometryRef().ExportToWkt() == "POLYGON ((0 0,1 1,0 1,0 0))" + + +############################################################################### + + +@gdaltest.enable_exceptions() +@pytest.mark.parametrize( + "filename", + [ + "data/parquet/poly_wkb_large_binary.parquet", + "data/parquet/poly_wkt_large_string.parquet", + ], +) +def test_ogr_parquet_read_large_binary_or_string_for_geometry(filename): + ds = ogr.Open(filename) + lyr = ds.GetLayer(0) + f = lyr.GetNextFeature() + assert f.GetGeometryRef() is not None + + if test_cli_utilities.get_test_ogrsf_path() is None: + ret = gdaltest.runexternal( + test_cli_utilities.get_test_ogrsf_path() + " -ro " + filename + ) + + assert "INFO" in ret + assert "ERROR" not in ret diff --git a/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h b/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h index a503512d2937..b0cecfb12fb1 100644 --- a/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h +++ b/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h @@ -93,8 +93,9 @@ class OGRArrowLayer CPL_NON_FINAL void ExploreExprNode(const swq_expr_node *poNode); bool UseRecordBatchBaseImplementation() const; + template static struct ArrowArray * - CreateWKTArrayFromWKBArray(const struct ArrowArray *sourceArray); + CreateWKBArrayFromWKTArray(const struct ArrowArray *sourceArray); int GetArrowSchemaInternal(struct ArrowSchema *out) const; @@ -116,6 +117,7 @@ class OGRArrowLayer CPL_NON_FINAL int m_iBBOXMaxYField = -1; const arrow::BinaryArray *m_poArrayWKB = nullptr; + const arrow::LargeBinaryArray *m_poArrayWKBLarge = nullptr; const arrow::Array *m_poArrayBBOX = nullptr; const arrow::DoubleArray *m_poArrayMinX = nullptr; const arrow::DoubleArray *m_poArrayMinY = nullptr; diff --git a/ogr/ogrsf_frmts/arrow_common/ograrrowlayer.hpp b/ogr/ogrsf_frmts/arrow_common/ograrrowlayer.hpp index 4a4cfad9b489..46aab91dd7ab 100644 --- a/ogr/ogrsf_frmts/arrow_common/ograrrowlayer.hpp +++ b/ogr/ogrsf_frmts/arrow_common/ograrrowlayer.hpp @@ -603,11 +603,19 @@ inline OGRwkbGeometryType OGRArrowLayer::ComputeGeometryColumnTypeProcessBatch( const auto array = poBatch->column(iBatchCol); const auto castBinaryArray = (m_aeGeomEncoding[iGeomCol] == OGRArrowGeomEncoding::WKB) - ? std::static_pointer_cast(array) + ? std::dynamic_pointer_cast(array) + : nullptr; + const auto castLargeBinaryArray = + (m_aeGeomEncoding[iGeomCol] == OGRArrowGeomEncoding::WKB) + ? std::dynamic_pointer_cast(array) : nullptr; const auto castStringArray = (m_aeGeomEncoding[iGeomCol] == OGRArrowGeomEncoding::WKT) - ? std::static_pointer_cast(array) + ? std::dynamic_pointer_cast(array) + : nullptr; + const auto castLargeStringArray = + (m_aeGeomEncoding[iGeomCol] == OGRArrowGeomEncoding::WKT) + ? std::dynamic_pointer_cast(array) : nullptr; for (int64_t i = 0; i < poBatch->num_rows(); i++) { @@ -624,6 +632,17 @@ inline OGRwkbGeometryType OGRArrowLayer::ComputeGeometryColumnTypeProcessBatch( OGRReadWKBGeometryType(data, wkbVariantIso, &eThisGeomType); } } + else if (m_aeGeomEncoding[iGeomCol] == OGRArrowGeomEncoding::WKB && + castLargeBinaryArray) + { + arrow::LargeBinaryArray::offset_type out_length = 0; + const uint8_t *data = + castLargeBinaryArray->GetValue(i, &out_length); + if (out_length >= 5) + { + OGRReadWKBGeometryType(data, wkbVariantIso, &eThisGeomType); + } + } else if (m_aeGeomEncoding[iGeomCol] == OGRArrowGeomEncoding::WKT && castStringArray) { @@ -633,6 +652,15 @@ inline OGRwkbGeometryType OGRArrowLayer::ComputeGeometryColumnTypeProcessBatch( OGRReadWKTGeometryType(osWKT.c_str(), &eThisGeomType); } } + else if (m_aeGeomEncoding[iGeomCol] == OGRArrowGeomEncoding::WKT && + castLargeStringArray) + { + const auto osWKT = castLargeStringArray->GetString(i); + if (!osWKT.empty()) + { + OGRReadWKTGeometryType(osWKT.c_str(), &eThisGeomType); + } + } if (eThisGeomType != wkbNone) { @@ -761,7 +789,8 @@ inline bool OGRArrowLayer::IsValidGeometryEncoding( "ogc.wkt" // As used in ARROW:extension:name field metadata ) { - if (fieldTypeId != arrow::Type::STRING) + if (fieldTypeId != arrow::Type::LARGE_STRING && + fieldTypeId != arrow::Type::STRING) { CPLError(CE_Warning, CPLE_AppDefined, "Geometry column %s has a non String type: %s. " @@ -778,7 +807,8 @@ inline bool OGRArrowLayer::IsValidGeometryEncoding( "ogc.wkb" // As used in ARROW:extension:name field metadata ) { - if (fieldTypeId != arrow::Type::BINARY) + if (fieldTypeId != arrow::Type::LARGE_BINARY && + fieldTypeId != arrow::Type::BINARY) { CPLError(CE_Warning, CPLE_AppDefined, "Geometry column %s has a non Binary type: %s. " @@ -2108,11 +2138,28 @@ inline OGRGeometry *OGRArrowLayer::ReadGeometry(int iGeomField, { case OGRArrowGeomEncoding::WKB: { - CPLAssert(array->type_id() == arrow::Type::BINARY); - const auto castArray = - static_cast(array); int out_length = 0; - const uint8_t *data = castArray->GetValue(nIdxInBatch, &out_length); + const uint8_t *data; + if (array->type_id() == arrow::Type::BINARY) + { + const auto castArray = + static_cast(array); + data = castArray->GetValue(nIdxInBatch, &out_length); + } + else + { + CPLAssert(array->type_id() == arrow::Type::LARGE_BINARY); + const auto castArray = + static_cast(array); + int64_t out_length64 = 0; + data = castArray->GetValue(nIdxInBatch, &out_length64); + if (out_length64 > INT_MAX) + { + CPLError(CE_Failure, CPLE_AppDefined, "Too large geometry"); + return nullptr; + } + out_length = static_cast(out_length64); + } if (OGRGeometryFactory::createFromWkb( data, poGeomFieldDefn->GetSpatialRef(), &poGeometry, out_length) == OGRERR_NONE) @@ -2132,12 +2179,25 @@ inline OGRGeometry *OGRArrowLayer::ReadGeometry(int iGeomField, case OGRArrowGeomEncoding::WKT: { - CPLAssert(array->type_id() == arrow::Type::STRING); - const auto castArray = - static_cast(array); - const auto osWKT = castArray->GetString(nIdxInBatch); - OGRGeometryFactory::createFromWkt( - osWKT.c_str(), poGeomFieldDefn->GetSpatialRef(), &poGeometry); + if (array->type_id() == arrow::Type::STRING) + { + const auto castArray = + static_cast(array); + const auto osWKT = castArray->GetString(nIdxInBatch); + OGRGeometryFactory::createFromWkt( + osWKT.c_str(), poGeomFieldDefn->GetSpatialRef(), + &poGeometry); + } + else + { + CPLAssert(array->type_id() == arrow::Type::LARGE_STRING); + const auto castArray = + static_cast(array); + const auto osWKT = castArray->GetString(nIdxInBatch); + OGRGeometryFactory::createFromWkt( + osWKT.c_str(), poGeomFieldDefn->GetSpatialRef(), + &poGeometry); + } break; } @@ -3143,6 +3203,7 @@ OGRArrowLayer::SetBatch(const std::shared_ptr &poBatch) m_poBatch = poBatch; m_poBatchColumns.clear(); m_poArrayWKB = nullptr; + m_poArrayWKBLarge = nullptr; m_poArrayBBOX = nullptr; m_poArrayMinX = nullptr; m_poArrayMinY = nullptr; @@ -3167,8 +3228,15 @@ OGRArrowLayer::SetBatch(const std::shared_ptr &poBatch) m_aeGeomEncoding[m_iGeomFieldFilter] == OGRArrowGeomEncoding::WKB) { const arrow::Array *poArrayWKB = m_poBatchColumns[iCol].get(); - CPLAssert(poArrayWKB->type_id() == arrow::Type::BINARY); - m_poArrayWKB = static_cast(poArrayWKB); + if (poArrayWKB->type_id() == arrow::Type::BINARY) + m_poArrayWKB = + static_cast(poArrayWKB); + else + { + CPLAssert(poArrayWKB->type_id() == arrow::Type::LARGE_BINARY); + m_poArrayWKBLarge = + static_cast(poArrayWKB); + } if (m_iBBOXMinXField >= 0 && m_iBBOXMinYField >= 0 && m_iBBOXMaxXField >= 0 && m_iBBOXMaxYField >= 0 && @@ -3284,13 +3352,15 @@ inline OGRFeature *OGRArrowLayer::GetNextRawFeature() if (iCol >= 0 && m_aeGeomEncoding[m_iGeomFieldFilter] == OGRArrowGeomEncoding::WKB) { - CPLAssert(m_poArrayWKB); + CPLAssert(m_poArrayWKB || m_poArrayWKBLarge); OGREnvelope sEnvelope; while (true) { bool bSkipToNextFeature = false; - if (m_poArrayWKB->IsNull(m_nIdxInBatch)) + if ((m_poArrayWKB && m_poArrayWKB->IsNull(m_nIdxInBatch)) || + (m_poArrayWKBLarge && + m_poArrayWKBLarge->IsNull(m_nIdxInBatch))) { bSkipToNextFeature = true; } @@ -3310,7 +3380,7 @@ inline OGRFeature *OGRArrowLayer::GetNextRawFeature() bSkipToNextFeature = true; } } - else + else if (m_poArrayWKB) { int out_length = 0; const uint8_t *data = @@ -3321,6 +3391,21 @@ inline OGRFeature *OGRArrowLayer::GetNextRawFeature() bSkipToNextFeature = true; } } + else + { + CPLAssert(m_poArrayWKBLarge); + int64_t out_length64 = 0; + const uint8_t *data = m_poArrayWKBLarge->GetValue( + m_nIdxInBatch, &out_length64); + if (out_length64 < INT_MAX && + OGRWKBGetBoundingBox(data, + static_cast(out_length64), + sEnvelope) && + !m_sFilterEnvelope.Intersects(sEnvelope)) + { + bSkipToNextFeature = true; + } + } } if (!bSkipToNextFeature) { @@ -3673,19 +3758,42 @@ inline OGRErr OGRArrowLayer::GetExtent(int iGeomField, OGREnvelope *psExtent, *psExtent = OGREnvelope(); auto array = m_poBatchColumns[iCol]; - CPLAssert(array->type_id() == arrow::Type::BINARY); - auto castArray = std::static_pointer_cast(array); + std::shared_ptr smallArray; + std::shared_ptr largeArray; + if (array->type_id() == arrow::Type::BINARY) + smallArray = std::static_pointer_cast(array); + else + { + CPLAssert(array->type_id() == arrow::Type::LARGE_BINARY); + largeArray = + std::static_pointer_cast(array); + } OGREnvelope sEnvelope; while (true) { if (!array->IsNull(m_nIdxInBatch)) { - int out_length = 0; - const uint8_t *data = - castArray->GetValue(m_nIdxInBatch, &out_length); - if (OGRWKBGetBoundingBox(data, out_length, sEnvelope)) + if (smallArray) { - psExtent->Merge(sEnvelope); + int out_length = 0; + const uint8_t *data = + smallArray->GetValue(m_nIdxInBatch, &out_length); + if (OGRWKBGetBoundingBox(data, out_length, sEnvelope)) + { + psExtent->Merge(sEnvelope); + } + } + else + { + int64_t out_length = 0; + const uint8_t *data = + largeArray->GetValue(m_nIdxInBatch, &out_length); + if (out_length < INT_MAX && + OGRWKBGetBoundingBox(data, static_cast(out_length), + sEnvelope)) + { + psExtent->Merge(sEnvelope); + } } } @@ -3705,8 +3813,16 @@ inline OGRErr OGRArrowLayer::GetExtent(int iGeomField, OGREnvelope *psExtent, return OGRERR_FAILURE; } array = m_poBatchColumns[iCol]; - CPLAssert(array->type_id() == arrow::Type::BINARY); - castArray = std::static_pointer_cast(array); + if (array->type_id() == arrow::Type::BINARY) + smallArray = + std::static_pointer_cast(array); + else + { + CPLAssert(array->type_id() == arrow::Type::LARGE_BINARY); + largeArray = + std::static_pointer_cast( + array); + } } } } @@ -4124,7 +4240,9 @@ inline int OGRArrowLayer::GetNextArrowArray(struct ArrowArrayStream *stream, } } - auto status = arrow::ExportRecordBatch(*m_poBatch, out_array, nullptr); + struct ArrowSchema schema; + memset(&schema, 0, sizeof(schema)); + auto status = arrow::ExportRecordBatch(*m_poBatch, out_array, &schema); m_nIdxInBatch = m_poBatch->num_rows(); if (!status.ok()) { @@ -4153,7 +4271,11 @@ inline int OGRArrowLayer::GetNextArrowArray(struct ArrowArrayStream *stream, : m_anMapGeomFieldIndexToArrowColumn[i]; auto sourceArray = out_array->children[nArrayIdx]; auto targetArray = - CreateWKTArrayFromWKBArray(sourceArray); + strcmp(schema.children[nArrayIdx]->format, "u") == 0 + ? CreateWKBArrayFromWKTArray( + sourceArray) + : CreateWKBArrayFromWKTArray( + sourceArray); if (targetArray) { sourceArray->release(sourceArray); @@ -4164,6 +4286,8 @@ inline int OGRArrowLayer::GetNextArrowArray(struct ArrowArrayStream *stream, { out_array->release(out_array); memset(out_array, 0, sizeof(*out_array)); + if (schema.release) + schema.release(&schema); return ENOMEM; } } @@ -4177,6 +4301,9 @@ inline int OGRArrowLayer::GetNextArrowArray(struct ArrowArrayStream *stream, } } + if (schema.release) + schema.release(&schema); + OverrideArrowRelease(m_poArrowDS, out_array); const auto nFeatureIdxCur = m_nFeatureIdx; @@ -4263,11 +4390,12 @@ class OGRArrowLayerAppendBuffer : public OGRAppendBuffer }; /************************************************************************/ -/* CreateWKTArrayFromWKBArray() */ +/* CreateWKBArrayFromWKTArray() */ /************************************************************************/ +template inline struct ArrowArray * -OGRArrowLayer::CreateWKTArrayFromWKBArray(const struct ArrowArray *sourceArray) +OGRArrowLayer::CreateWKBArrayFromWKTArray(const struct ArrowArray *sourceArray) { CPLAssert(sourceArray->n_buffers == 3); CPLAssert(sourceArray->buffers[1] != nullptr); @@ -4337,7 +4465,7 @@ OGRArrowLayer::CreateWKTArrayFromWKBArray(const struct ArrowArray *sourceArray) OGRWKTToWKBTranslator oTranslator(oOGRAppendBuffer); const auto sourceOffsets = - static_cast(sourceArray->buffers[1]) + nOffset; + static_cast(sourceArray->buffers[1]) + nOffset; auto sourceBytes = static_cast(const_cast(sourceArray->buffers[2])); auto targetOffsets = diff --git a/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp b/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp index 6e1bb174df1a..e85ddc04b45d 100644 --- a/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp +++ b/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp @@ -344,7 +344,8 @@ bool OGRParquetLayerBase::DealWithGeometryColumn( { std::shared_ptr fieldType = field->type(); auto fieldTypeId = fieldType->id(); - if (fieldTypeId == arrow::Type::BINARY) + if (fieldTypeId == arrow::Type::BINARY || + fieldTypeId == arrow::Type::LARGE_BINARY) { CPLDebug("PARQUET", "Field %s detected as likely WKB geometry field", @@ -352,7 +353,8 @@ bool OGRParquetLayerBase::DealWithGeometryColumn( bRegularField = false; m_aeGeomEncoding.push_back(OGRArrowGeomEncoding::WKB); } - else if (fieldTypeId == arrow::Type::STRING && + else if ((fieldTypeId == arrow::Type::STRING || + fieldTypeId == arrow::Type::LARGE_STRING) && (field->name().find("wkt") != std::string::npos || field->name().find("WKT") != std::string::npos)) {