Skip to content

Commit

Permalink
Multi bufr (#16)
Browse files Browse the repository at this point in the history
* bufr fix to handle multiple BUFR messages per file.

* Fix of trailing white space.

* Tests to follow.

* flake8 error fixed in init.py

Test added, single BUFR file with 2 messages, 48 geojsons expected.

* flake8 error fixed in tests.py

* flake8 error fixed in tests.py
  • Loading branch information
david-i-berry authored Nov 25, 2022
1 parent bbd2e1f commit 2ab3523
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 46 deletions.
109 changes: 63 additions & 46 deletions bufr2geojson/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,57 +793,74 @@ def transform(data: bytes, serialize: bool = False) -> Iterator[dict]:
# check data type, only in situ supported
# not yet implemented
# split subsets into individual messages and process

imsg = 0
messages_remaining = True
with open(tmp.name, 'rb') as fh:
# get first message
bufr_handle = codes_bufr_new_from_file(fh)
try:
codes_set(bufr_handle, "unpack", True)
except Exception as e:
LOGGER.error("Error unpacking message")
LOGGER.error(e)
if FAIL_ON_ERROR:
raise e
error = True

if not error:
nsubsets = codes_get(bufr_handle, "numberOfSubsets")
LOGGER.info(f"{nsubsets} subsets")
collections = dict()
for idx in range(nsubsets):
LOGGER.debug(f"Extracting subset {idx}")
codes_set(bufr_handle, "extractSubset", idx+1)
codes_set(bufr_handle, "doExtractSubsets", 1)
LOGGER.debug("Cloning subset to new message")
single_subset = codes_clone(bufr_handle)
LOGGER.debug("Unpacking")
codes_set(single_subset, "unpack", True)

parser = BUFRParser()
# only include tag if more than 1 subset in file
tag = ""
if nsubsets > 1:
tag = f"-{idx}"
try:
data = parser.as_geojson(single_subset, id=tag,
serialize=serialize)

except Exception as e:
LOGGER.error("Error parsing BUFR to GeoJSON, no data written") # noqa
LOGGER.error(e)
if FAIL_ON_ERROR:
raise e
data = {}
del parser
collections = deepcopy(data)
if bufr_handle is None:
LOGGER.warning("No messages in file")
messages_remaining = False
while messages_remaining:
messages_remaining = False # noqa set to false to prevent infinite loop by accident
imsg += 1
LOGGER.info(f"Processing message {imsg} from file")

try:
codes_set(bufr_handle, "unpack", True)
except Exception as e:
LOGGER.error("Error unpacking message")
LOGGER.error(e)
if FAIL_ON_ERROR:
raise e
error = True

if not error:
nsubsets = codes_get(bufr_handle, "numberOfSubsets")
LOGGER.info(f"{nsubsets} subsets")
collections = dict()
for idx in range(nsubsets):
LOGGER.debug(f"Extracting subset {idx}")
codes_set(bufr_handle, "extractSubset", idx+1)
codes_set(bufr_handle, "doExtractSubsets", 1)
LOGGER.debug("Cloning subset to new message")
single_subset = codes_clone(bufr_handle)
LOGGER.debug("Unpacking")
codes_set(single_subset, "unpack", True)

parser = BUFRParser()
# only include tag if more than 1 subset in file
tag = ""
if nsubsets > 1:
tag = f"-{idx}"
try:
data = parser.as_geojson(single_subset, id=tag,
serialize=serialize)

except Exception as e:
LOGGER.error("Error parsing BUFR to GeoJSON, no data written") # noqa
LOGGER.error(e)
if FAIL_ON_ERROR:
raise e
data = {}
del parser
collections = deepcopy(data)

yield collections
codes_release(single_subset)
else:
collections = {}
yield collections
codes_release(single_subset)
else:
collections = {}
yield collections

if not error:
codes_release(bufr_handle)
if not error:
codes_release(bufr_handle)

bufr_handle = codes_bufr_new_from_file(fh)

if bufr_handle is not None:
messages_remaining = True

LOGGER.info(f"{imsg} messages processed from file")


def strip2(value) -> str:
Expand Down
38 changes: 38 additions & 0 deletions tests/test_bufr2geojson.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
###############################################################################

from __future__ import annotations
import base64
import itertools

from jsonschema import validate, FormatChecker
Expand All @@ -44,6 +45,33 @@ def is_wsi(instance):
return True


@pytest.fixture
def multimsg_bufr():
bufr_b64 = \
"QlVGUgAA5wQAABYAABUAAAAAAAEADgAH" \
"5gMUDwAAAAAJAAABgMdQAAC8AHivpTS1" \
"MrYQILG0N7qwuhAQEBAQEBAvzGo8BgvH" \
"Qjc9SA/wCAJ//z8z2t//////+AZDi1t7" \
"bIAMgu4AZH////8sdQyTLlAQJkBkCMYQ" \
"QAP/yP+T/////////////////////H/V" \
"Kf//+/R/8AyP////////AMj/////////" \
"////A+jBP7B4C77+3///////////v0f/" \
"7///////////////////9+j/////////" \
"/////////////+A3Nzc3QlVGUgAA5wQA" \
"ABYAABUAAAAAAAEADgAH5gMUCQAAAAAJ" \
"AAABgMdQAAC8AHixqbW0tbIwkBAQEBAQ" \
"EBAQEBAQEBAvzGokBgzdYjpfoA+0B99/" \
"/z8kCF//////+AZDg9t5jRAMgfQAZH//" \
"//8sdgyTqFAQgkhkBYgQQAP/yP+T////" \
"/////////////////H/VKf//+/R/8AyP" \
"////////AMj/////////////A+jBP7G4" \
"Cn7+3///////////v0f/7///////////" \
"////////9+j/////////////////////" \
"/+A3Nzc3"
msg = base64.b64decode(bufr_b64.encode("ascii"))
return msg


@pytest.fixture
def geojson_schema():
with open(f"{RESOURCES}/schemas/wmo-om-profile-geojson.yaml") as fh:
Expand Down Expand Up @@ -99,6 +127,16 @@ def geojson_output():
}


def test_multi(multimsg_bufr):
results = transform(multimsg_bufr)
# count number of geojsons
icount = 0
for res in results:
for key, val in res.items():
icount += 1
assert icount == 48


def test_transform(geojson_schema, geojson_output):
test_bufr_file = 'A_ISIA21EIDB202100_C_EDZW_20220320210902_11839953.bin'
with open(test_bufr_file, 'rb') as fh:
Expand Down

0 comments on commit 2ab3523

Please sign in to comment.