Non-nullable columns implies invariants but invariants not enabled in v3,7 #2882
Comments
For some reason the protocol requires invariants to be enabled even when the writer version is 7. I honestly don't see why this is required, since the idea behind table features is that you can opt in.
@ion-elgreco I agree. Meanwhile I also created the same table using pyspark and see no difference in the protocol it creates, so maybe it is something else.
Can you share the first JSON log of both tables? You can just paste them as code or share them as gists.
python-0.19.1
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}
{"metaData":{"id":"6950fa52-78c5-4e69-8c62-14d6da440d58","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"short\",\"nullable\":false,\"metadata\":{}},{\"name\":\"values\",\"type\":\"double\",\"nullable\":false,\"metadata\":{}},{\"name\":\"date\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1726471364355,"configuration":{}}}
{"add":{"path":"part-00001-57ecdc29-2b6c-4c64-805d-537216102e83-c000.snappy.parquet","partitionValues":{},"size":1179,"modificationTime":1726471364356,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"values\":1.0,\"id\":1},\"maxValues\":{\"values\":2.0,\"id\":2},\"nullCount\":{\"id\":0,\"date\":2,\"values\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1726471364356,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists"},"clientVersion":"delta-rs.0.19.1"}}
python-0.15.3
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"ee5190ed-3786-4db9-b518-6b97d343a975","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"short\",\"nullable\":false,\"metadata\":{}},{\"name\":\"values\",\"type\":\"double\",\"nullable\":false,\"metadata\":{}},{\"name\":\"date\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1726471484655,"configuration":{}}}
{"add":{"path":"0-f28acb3f-a592-43f7-b163-1e6cc6740429-0.parquet","partitionValues":{},"size":1172,"modificationTime":1726471484654,"dataChange":true,"stats":"{\"numRecords\": 2, \"minValues\": {\"id\": 1, \"values\": 1.0}, \"maxValues\": {\"id\": 2, \"values\": 2.0}, \"nullCount\": {\"id\": 0, \"values\": 0, \"date\": 2}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1726471484655,"operation":"CREATE TABLE","operationParameters":{"location":"file:///.../zz-delta-table","protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":2}","mode":"ErrorIfExists","metadata":"{\"configuration\":{},\"createdTime\":1726471484655,\"description\":null,\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"ee5190ed-3786-4db9-b518-6b97d343a975\",\"name\":null,\"partitionColumns\":[],\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"id\\\",\\\"type\\\":\\\"short\\\",\\\"nullable\\\":false,\\\"metadata\\\":{}},{\\\"name\\\":\\\"values\\\",\\\"type\\\":\\\"double\\\",\\\"nullable\\\":false,\\\"metadata\\\":{}},{\\\"name\\\":\\\"date\\\",\\\"type\\\":\\\"timestamp\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\"}"},"clientVersion":"delta-rs.0.17.0"}}
pyspark-3.5.0
{"commitInfo":{"timestamp":1726471612932,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"2","numOutputBytes":"2247"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"c2be61a6-4831-4b3b-8599-937b00ce781c"}}
{"metaData":{"id":"49fd7cb1-1ec1-4cd0-9415-5686380e8f66","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"values\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1726471610014}}
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}
{"add":{"path":"part-00004-2e0632a4-9306-4fb7-9eb8-80caa199d2dc-c000.snappy.parquet","partitionValues":{},"size":887,"modificationTime":1726471612873,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1,\"values\":2.0},\"maxValues\":{\"id\":1,\"values\":2.0},\"nullCount\":{\"id\":0,\"values\":0,\"date\":1}}"}}
{"add":{"path":"part-00009-9b8d1a42-dfa9-43b2-9c77-7550d83e0754-c000.snappy.parquet","partitionValues":{},"size":887,"modificationTime":1726471612873,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2,\"values\":1.0},\"maxValues\":{\"id\":2,\"values\":1.0},\"nullCount\":{\"id\":0,\"values\":0,\"date\":1}}"}} |
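For anyone comparing logs like these, the two pieces that matter here are the protocol action and the nullable flags inside schemaString. The following is a minimal sketch using only the Python standard library; `summarize_commit` is a hypothetical helper name, and the sample input is trimmed from the python-0.19.1 log above.

```python
import json


def summarize_commit(lines):
    """Return (protocol, non_nullable_columns) from the JSON lines of one Delta commit."""
    protocol, non_nullable = None, []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        action = json.loads(line)
        if "protocol" in action:
            protocol = action["protocol"]
        elif "metaData" in action:
            # schemaString is itself a JSON document stored as a string.
            schema = json.loads(action["metaData"]["schemaString"])
            non_nullable = [f["name"] for f in schema["fields"] if not f["nullable"]]
    return protocol, non_nullable


# Trimmed sample based on the python-0.19.1 log (only the fields used here).
sample = [
    '{"protocol":{"minReaderVersion":3,"minWriterVersion":7,'
    '"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}',
    json.dumps({"metaData": {"schemaString": json.dumps({
        "type": "struct",
        "fields": [
            {"name": "id", "type": "short", "nullable": False, "metadata": {}},
            {"name": "values", "type": "double", "nullable": False, "metadata": {}},
            {"name": "date", "type": "timestamp_ntz", "nullable": True, "metadata": {}},
        ],
    })}}),
]

protocol, non_nullable = summarize_commit(sample)
print(protocol["writerFeatures"], non_nullable)
```

Run against a real table, the input would be the lines of the first file in `_delta_log`; a table with non-nullable columns but no "invariants" entry in writerFeatures matches the situation reported in this issue.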
@wahani there is actually no difference between spark-delta and delta-rs here (you have to ignore python-0.15.3, though, since that was never valid). Are you sure your table does not have delta.invariants somewhere?
@ion-elgreco well, it is just the code I provided that reproduces the error. Strictly speaking I would say the nullable property of a column is an invariant, but I don't really know how those are handled, or how they are represented other than in the metadata. As I wrote, I think the problem must be something other than the protocol; it was just the error message from pyspark pointing in that direction.
@wahani if you create a table with only nullable columns, does it work then?
@ion-elgreco Seems to be the right direction. I can confirm that with
schema = pa.schema(
    [
        pa.field("id", pa.int16(), True),
        pa.field("values", pa.float64(), True),
        pa.field("date", pa.timestamp("us"), True),
    ]
)
it does work. Tested now with python-0.20.0.
Ok, so non-nullable columns imply invariants, I guess. I will take a look at this in a couple of weeks.
To further complicate things, adding a timezone to the timestamp data while keeping the non-nullable columns also works. I.e. the schema
schema = pa.schema(
    [
        pa.field("id", pa.int16(), False),
        pa.field("values", pa.float64(), False),
        pa.field("date", pa.timestamp("us", "UTC"), True),
    ]
)
will not result in an error with pyspark. This is also how I resolved the issue in our codebase.
@wahani that creates a table without table features, which is why it works: for protocol versions lower than 3,7, invariants are always enabled.
Thanks for clarifying. |
@wahani can you please recreate that spark table with nullable set to False? The previous one you shared had nullable set to True on all columns, so it's not entirely clear to me what the logs will look like then.
Sure. With the schema:
schema = pa.schema(
    [
        pa.field("id", pa.int16(), False),
        pa.field("values", pa.float64(), False),
        pa.field("date", pa.timestamp("us", "UTC"), False),
    ]
)
we get the following metadata:
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"20dc260d-15ba-4271-bb18-152d971bf71e","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"short\",\"nullable\":false,\"metadata\":{}},{\"name\":\"values\",\"type\":\"double\",\"nullable\":false,\"metadata\":{}},{\"name\":\"date\",\"type\":\"timestamp\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1727786433560,"configuration":{}}}
{"add":{"path":"part-00001-2d622120-57a6-4e1f-b433-ad0947430aff-c000.snappy.parquet","partitionValues":{},"size":1310,"modificationTime":1727786433600,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"date\":\"2024-10-01T14:40:33.545349Z\",\"values\":1.0,\"id\":1},\"maxValues\":{\"id\":2,\"values\":2.0,\"date\":\"2024-10-01T14:40:33.545355Z\"},\"nullCount\":{\"id\":0,\"date\":0,\"values\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1727786433601,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists"},"operationMetrics":{"execution_time_ms":46,"num_added_files":1,"num_added_rows":2,"num_partitions":0,"num_removed_files":0},"clientVersion":"delta-rs.0.20.0"}} |
You need to re-create it with spark ;) |
Yes, Delta-Spark automatically creates (incorrect) "not-null invariants" for non-nullable columns, which enables the "invariant" feature: https://github.com/delta-io/delta/blob/8e0b133f46f641941ad15ed8cbe7c2d1cc777a5b/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Invariants.scala#L73. I've been trying to get the arguably incorrect handling of nested not-null columns fixed for a while: delta-io/delta#860
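The rule described in this comment can be modelled roughly as: any non-nullable field, including one nested inside a struct, counts as a not-null invariant, so the table needs "invariants" in its writerFeatures. The sketch below is a simplified illustration of that rule, not Delta-Spark's actual implementation; the function names and the nested `meta.source` field are hypothetical, and only struct nesting is handled (arrays and maps are ignored).

```python
import json


def non_nullable_paths(struct, prefix=""):
    """Yield dotted paths of non-nullable fields, descending into nested structs."""
    for field in struct["fields"]:
        path = prefix + field["name"]
        if not field["nullable"]:
            yield path
        ftype = field["type"]
        # Primitive types are plain strings ("short"); nested structs are dicts.
        if isinstance(ftype, dict) and ftype.get("type") == "struct":
            yield from non_nullable_paths(ftype, path + ".")


def missing_invariants(schema_string, writer_features):
    """True if the schema has non-nullable fields but 'invariants' is not listed."""
    paths = list(non_nullable_paths(json.loads(schema_string)))
    return bool(paths) and "invariants" not in writer_features


# Hypothetical schema: one top-level and one nested non-nullable field.
schema_string = json.dumps({
    "type": "struct",
    "fields": [
        {"name": "id", "type": "short", "nullable": False, "metadata": {}},
        {"name": "meta", "nullable": True, "metadata": {}, "type": {
            "type": "struct",
            "fields": [
                {"name": "source", "type": "string", "nullable": False, "metadata": {}},
            ],
        }},
    ],
})

paths = list(non_nullable_paths(json.loads(schema_string)))
print(paths)
print(missing_invariants(schema_string, ["timestampNtz"]))
```

Whether a nested non-nullable field inside a nullable parent should count at all is exactly the point disputed in delta-io/delta#860, so the nested case here reflects the behaviour being criticised rather than a recommendation.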
Environment
Delta-rs version: python-0.16.0+ (example works until 0.15.3, also tested with 0.19.2)
Binding: Python
Environment:
Bug
What happened:
Beginning with python-0.16.0, I receive a DeltaTableFeatureException when reading a delta table with a local Spark session. This is related to the newly introduced handling of timezones. In 0.15.3, the write operation creates the following protocol:
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
With 0.16.0 and onward, we see:
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}
However, when reading the delta table using a local Spark session, I get the DeltaTableFeatureException mentioned above. Adding 'invariants' to the writerFeatures manually resolves the issue. So far I haven't been able to find an option to add the feature explicitly using the Python bindings, e.g. by providing table configuration like "delta.minReaderVersion" or "delta.writerFeatures"; these seem to be ignored, or they are not the right options.
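The manual workaround mentioned above amounts to editing the protocol action in the first commit file of `_delta_log`. A minimal sketch of that edit follows, assuming the commit file is newline-delimited JSON as in the logs in this thread; `add_writer_feature` is a hypothetical helper, not part of the deltalake Python bindings, and hand-editing the transaction log is best tried on a copy of the table first.

```python
import json
import tempfile
from pathlib import Path


def add_writer_feature(commit_file, feature="invariants"):
    """Rewrite one Delta commit file so its protocol action lists `feature`."""
    path = Path(commit_file)
    patched = []
    for line in path.read_text().splitlines():
        if not line.strip():
            continue
        action = json.loads(line)
        protocol = action.get("protocol")
        # writerFeatures only exists for writer version 7 and above.
        if protocol is not None and protocol.get("minWriterVersion", 0) >= 7:
            features = protocol.setdefault("writerFeatures", [])
            if feature not in features:
                features.append(feature)
        patched.append(json.dumps(action, separators=(",", ":")))
    path.write_text("\n".join(patched) + "\n")


# Demonstration on a throwaway file instead of a real table.
with tempfile.TemporaryDirectory() as tmp:
    commit = Path(tmp) / "00000000000000000000.json"
    commit.write_text(
        '{"protocol":{"minReaderVersion":3,"minWriterVersion":7,'
        '"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}\n'
        '{"commitInfo":{"operation":"WRITE"}}\n'
    )
    add_writer_feature(commit)
    result = json.loads(commit.read_text().splitlines()[0])

print(result["protocol"]["writerFeatures"])
```

The helper is idempotent, so running it twice does not add the feature a second time, and it leaves commits with a writer version below 7 untouched.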
What you expected to happen:
I can write timestamp data with deltalake and read it back in using spark without any additional configuration (0.15.3 behavior).
How to reproduce it:
Thanks for any help or guidance with this.