diff --git a/examples/basic_read_write.ipynb b/examples/basic_read_write.ipynb index 55bc385..0a67897 100644 --- a/examples/basic_read_write.ipynb +++ b/examples/basic_read_write.ipynb @@ -26,7 +26,11 @@ { "cell_type": "code", "execution_count": 1, - "metadata": {}, + "metadata": { + "pycharm": { + "is_executing": false + } + }, "outputs": [], "source": [ "!wget -q http://research.ics.aalto.fi/cog/data/udhr/txt/eng.txt" @@ -42,15 +46,19 @@ { "cell_type": "code", "execution_count": 2, - "metadata": {}, + "metadata": { + "pycharm": { + "is_executing": false + } + }, "outputs": [ { "name": "stdout", - "output_type": "stream", "text": [ "Universal Declaration of Human Rights\n", "Preamble Whereas recognition of the inherent dignity and of the equal and inalienable rights of all members of the human family is the foundation of freedom, justice and peace in the world, Whereas disregard and contempt for human rights have resulted in barbarous acts which have outraged the conscience of mankind, and the advent of a world in which human beings shall enjoy freedom of speech and belief and freedom from fear and want has been proclaimed as the highest aspiration of the common people, Whereas it is essential, if man is not to be compelled to have recourse, as a last resort, to rebellion against tyranny and oppression, that human rights should be protected by the rule of law, Whereas it is essential to promote the development of friendly relations between nations, Whereas the peoples of the United Nations have in the Charter reaffirmed their faith in fundamental human rights, in the dignity and worth of the human person and in the equal rights of men and women and have determined to promote social progress and better standards of life in larger freedom, Whereas Member States have pledged themselves to achieve, in cooperation with the United Nations, the promotion of universal respect for and observance of human rights and fundamental freedoms, Whereas a common understanding of these rights and freedoms is of the greatest importance for the full realization of this pledge, Now, therefore, The General Assembly, Proclaims this Universal Declaration of Human Rights as a common standard of achievement for all peoples and all nations, to the end that every individual and every organ of society, keeping this Declaration constantly in mind, shall strive by\n" - ] + ], + "output_type": "stream" } ], "source": [ @@ -76,7 +84,11 @@ { "cell_type": "code", "execution_count": 3, - "metadata": {}, + "metadata": { + "pycharm": { + "is_executing": false + } + }, "outputs": [], "source": [ "import re\n", @@ -111,7 +123,11 @@ { "cell_type": "code", "execution_count": 4, - "metadata": {}, + "metadata": { + "pycharm": { + "is_executing": false + } + }, "outputs": [], "source": [ "import numpy as np\n", @@ -124,11 +140,11 @@ " # save vocabulary along with the records\n", " writer.add_metadata({'vocab': vocab.to_json()})\n", "\n", - " for idx, line in enumerate(f):\n", + " for line in f:\n", " line = line.strip().lower()\n", " tokens = re.findall(r\"\\w+|[^\\w\\s]\", line, re.UNICODE)\n", " token_ids = vocab.encode(tokens, add_eos=False, use_unk=True)\n", - " writer.write(idx, np.array(token_ids))" + " writer.write(np.array(token_ids))" ] }, { @@ -147,15 +163,18 @@ { "cell_type": "code", "execution_count": 5, - "metadata": {}, + "metadata": { + "pycharm": { + "is_executing": false + } + }, "outputs": [ { "name": "stdout", - "output_type": "stream", "text": [ - "universal declaration of human rights\n", - "preamble whereas 
recognition of the inherent dignity and of the equal and inalienable rights of all members of the human family is the foundation of freedom , justice and peace in the world , whereas disregard and contempt for human rights have resulted in barbarous acts which have outraged the conscience of mankind , and the advent of a world in which human beings shall enjoy freedom of speech and belief and freedom from fear and want has been proclaimed as the highest aspiration of the common people , whereas it is essential , if man is not to be compelled to have recourse , as a last resort , to rebellion against tyranny and oppression , that human rights should be protected by the rule of law , whereas it is essential to promote the development of friendly relations between nations , whereas the peoples of the united nations have in the charter reaffirmed their faith in fundamental human rights , in the dignity and worth of the human person and in the equal rights of men and women and have determined to promote social progress and better standards of life in larger freedom , whereas member states have pledged themselves to achieve , in cooperation with the united nations , the promotion of universal respect for and observance of human rights and fundamental freedoms , whereas a common understanding of these rights and freedoms is of the greatest importance for the full realization of this pledge , now , therefore , the general assembly , proclaims this universal declaration of human rights as a common standard of achievement for all peoples and all nations , to the end that every individual and every organ of society , keeping this declaration constantly in mind , shall strive by\n" - ] + "universal declaration of human rights\n" + ], + "output_type": "stream" } ], "source": [ @@ -191,8 +210,17 @@ "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" + }, + "pycharm": { + "stem_cell": { + "cell_type": "raw", + "source": [], + "metadata": { + "collapsed": false + } + } } }, "nbformat": 4, "nbformat_minor": 2 -} +} \ No newline at end of file diff --git a/examples/bert.ipynb b/examples/bert.ipynb index e7fa702..5df9351 100644 --- a/examples/bert.ipynb +++ b/examples/bert.ipynb @@ -132,11 +132,11 @@ "with ShardedWriter(Hdf5RecordWriter,\n", " output_file_template,\n", " max_records_per_shard=10) as writer, open(input_file) as f:\n", - " for idx, line in enumerate(f):\n", + " for line in f:\n", " line = line.strip()\n", " tokens = bert.tokenize(line)\n", " ctx_representations = bert.encode(tokens)\n", - " writer.write(idx, ctx_representations)" + " writer.write(ctx_representations)" ] }, { @@ -193,8 +193,17 @@ "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" + }, + "pycharm": { + "stem_cell": { + "cell_type": "raw", + "source": [], + "metadata": { + "collapsed": false + } + } } }, "nbformat": 4, "nbformat_minor": 2 -} +} \ No newline at end of file diff --git a/examples/data_load.ipynb b/examples/data_load.ipynb index d9aa781..e9c346a 100644 --- a/examples/data_load.ipynb +++ b/examples/data_load.ipynb @@ -99,7 +99,7 @@ " line = line.strip().lower()\n", " tokens = re.findall(r\"\\w+|[^\\w\\s]\", line, re.UNICODE)\n", " token_ids = vocab.encode(tokens, add_eos=False, use_unk=True)\n", - " writer.write(idx, np.array(token_ids))" + " writer.write(np.array(token_ids))" ] }, { @@ -171,8 +171,17 @@ "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" + }, + "pycharm": { + "stem_cell": { + "cell_type": "raw", + "source": [], + 
"metadata": { + "collapsed": false + } + } } }, "nbformat": 4, "nbformat_minor": 2 -} +} \ No newline at end of file diff --git a/examples/fields.ipynb b/examples/fields.ipynb index 7372631..041d963 100644 --- a/examples/fields.ipynb +++ b/examples/fields.ipynb @@ -40,8 +40,12 @@ }, { "cell_type": "code", - "execution_count": 1, - "metadata": {}, + "execution_count": 4, + "metadata": { + "pycharm": { + "is_executing": false + } + }, "outputs": [], "source": [ "!wget -q http://research.ics.aalto.fi/cog/data/udhr/txt/eng.txt" @@ -58,8 +62,12 @@ }, { "cell_type": "code", - "execution_count": 2, - "metadata": {}, + "execution_count": 5, + "metadata": { + "pycharm": { + "is_executing": false + } + }, "outputs": [], "source": [ "import re\n", @@ -80,8 +88,12 @@ }, { "cell_type": "code", - "execution_count": 3, - "metadata": {}, + "execution_count": 6, + "metadata": { + "pycharm": { + "is_executing": false + } + }, "outputs": [], "source": [ "import spacy\n", @@ -101,8 +113,12 @@ }, { "cell_type": "code", - "execution_count": 4, - "metadata": {}, + "execution_count": 7, + "metadata": { + "pycharm": { + "is_executing": false + } + }, "outputs": [], "source": [ "tokens_in_text = [str(t) for sent in sents_in_text for t in sent]\n", @@ -125,8 +141,12 @@ }, { "cell_type": "code", - "execution_count": 5, - "metadata": {}, + "execution_count": 8, + "metadata": { + "pycharm": { + "is_executing": false + } + }, "outputs": [], "source": [ "import numpy as np\n", @@ -143,13 +163,13 @@ " # save vocabulary along with the records\n", " writer.add_metadata({'vocab': vocab.to_json()})\n", "\n", - " for idx, sent in enumerate(sents_in_text):\n", + " for sent in sents_in_text:\n", " tokens = [str(w) for w in sent]\n", " token_ids = vocab.encode(tokens, add_eos=False, use_unk=True)\n", " head_indexes = [w.head.i for w in sent]\n", " record = {SEQ_FIELD: np.array(token_ids),\n", " DEPS_FIELD: np.array(head_indexes)}\n", - " writer.write(idx, record)" + " writer.write(record)" ] }, { @@ -163,30 +183,24 @@ }, { "cell_type": "code", - "execution_count": 6, - "metadata": {}, + "execution_count": 9, + "metadata": { + "pycharm": { + "is_executing": false + } + }, "outputs": [ { "name": "stdout", - "output_type": "stream", "text": [ "Sentence: Universal Declaration of Human Rights\n", "Deps: [1 1 1 4 2]\n", "\n", "Sentence: Preamble\n", "Deps: [0]\n", - "\n", - "Sentence: Whereas recognition of the inherent dignity and of the equal and inalienable rights of all members of the human family is the foundation of freedom , justice and peace in the world , Whereas disregard and contempt for human rights have resulted in barbarous acts which have outraged the conscience of mankind , and the advent of a world in which human beings shall enjoy freedom of speech and belief and freedom from fear and want has been proclaimed as the highest aspiration of the common people , Whereas it is essential , if man is not to be compelled to have recourse , as a last resort , to rebellion against tyranny and oppression , that human rights should be protected by the rule of law , Whereas it is essential to promote the development of friendly relations between nations ,\n", - "Deps: [ 21 21 2 6 6 3 3 3 13 13 10 10 8 13 16 14 16 20\n", - " 20 17 42 23 21 23 24 25 25 27 27 23 32 30 42 35 42 35\n", - " 35 35 40 38 42 42 42 45 43 48 48 45 50 48 50 51 42 42\n", - " 56 79 56 59 57 65 60 63 65 65 59 65 66 67 68 68 68 68\n", - " 65 73 65 65 79 79 42 79 83 83 80 83 87 87 84 79 91 91\n", - " 122 91 91 96 96 122 100 100 100 96 102 100 102 100 96 108 108 
105\n", - " 96 96 110 111 112 113 113 96 122 119 122 122 122 79 122 125 123 125\n", - " 126 122 131 131 122 131 134 131 136 134 136 139 137 139 140 122]\n", "\n" - ] + ], + "output_type": "stream" } ], "source": [ @@ -227,8 +241,17 @@ "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" + }, + "pycharm": { + "stem_cell": { + "cell_type": "raw", + "source": [], + "metadata": { + "collapsed": false + } + } } }, "nbformat": 4, "nbformat_minor": 2 -} +} \ No newline at end of file diff --git a/examples/sharded_storage.ipynb b/examples/sharded_storage.ipynb index d458186..9e0e55e 100644 --- a/examples/sharded_storage.ipynb +++ b/examples/sharded_storage.ipynb @@ -108,7 +108,6 @@ } ], "source": [ - "from Bio import SeqIO\n", "import json\n", "import numpy as np\n", "from seqp.hdf5 import Hdf5RecordWriter\n", @@ -126,11 +125,11 @@ " output_file_template,\n", " max_records_per_shard=5000) as writer:\n", "\n", - " for idx, seq_record in enumerate(tqdm(SeqIO.parse(file_name, \"fasta\"))):\n", + " for seq_record in tqdm(SeqIO.parse(file_name, \"fasta\")):\n", " _, _, protein = seq_record.id.split('|')\n", - " protein2idx[protein] = idx\n", " sequence = [nucleotide2num(letter) for letter in seq_record.seq]\n", - " writer.write(idx, np.array(sequence, dtype=np.uint8))\n", + " idx = writer.write(np.array(sequence, dtype=np.uint8))\n", + " protein2idx[protein] = idx\n", "\n", " writer.add_metadata({'protein_idx': json.dumps(protein2idx)})\n" ] @@ -196,8 +195,17 @@ "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" + }, + "pycharm": { + "stem_cell": { + "cell_type": "raw", + "source": [], + "metadata": { + "collapsed": false + } + } } }, "nbformat": 4, "nbformat_minor": 2 -} +} \ No newline at end of file diff --git a/seqp/hdf5.py b/seqp/hdf5.py index 56f63c4..5a3af51 100644 --- a/seqp/hdf5.py +++ b/seqp/hdf5.py @@ -13,9 +13,10 @@ from .record import RecordReader, RecordWriter _LENGTHS_KEY = 'lengths' +_INITIAL_INDEX_KEY = 'initial_index' +_FINAL_INDEX_KEY = 'final_index' _FIELDS_KEY = 'fields' _SEQUENCE_FIELD_KEY = 'sequence_field' -_MAX_LENGTH = float('inf') def _compose_key(idx: int, field: Optional[str]): @@ -30,19 +31,21 @@ class Hdf5RecordWriter(RecordWriter): def __init__(self, output_file: str, - fields: Iterable[str]=None, - sequence_field: str=None, - append: bool=False): + fields: Iterable[str] = None, + sequence_field: str = None, + append: bool = False, + initial_index: int = 1): super().__init__(fields=fields, sequence_field=sequence_field, append=append) if fields is not None or sequence_field is not None: assert fields is not None and sequence_field is not None assert sequence_field in fields - + self.initial_index = initial_index + self.next_index = initial_index self.output_file = output_file self.hdf5_file = h5py.File(output_file, 'a' if append else 'w') - self.index_to_length = (dict() if not append - else json.loads(self.hdf5_file[_LENGTHS_KEY][0])) + self.lengths = ([] if not append + else json.loads(self.hdf5_file[_LENGTHS_KEY][0])) if append and fields is not None: assert set(fields) == set(json.loads(self.hdf5_file[_FIELDS_KEY][0])) assert sequence_field == self.hdf5_file[_SEQUENCE_FIELD_KEY][0] @@ -65,8 +68,12 @@ def add_metadata(k, v): for key, value in self.metadata.items(): add_metadata(key, value) - # add extra piece of metadata with sequence lengths - add_metadata(_LENGTHS_KEY, json.dumps(self.index_to_length)) + # add extra piece of metadata with sequence lengths... 
+ add_metadata(_LENGTHS_KEY, json.dumps(self.lengths)) + + # ...and initial/final indexes + add_metadata(_INITIAL_INDEX_KEY, str(self.initial_index)) + add_metadata(_FINAL_INDEX_KEY, str(self.next_index - 1)) # add extra piece of metadata with keys if they were provided if self.fields: @@ -75,49 +82,41 @@ def add_metadata(k, v): self.hdf5_file.close() - if len(self.index_to_length) == 0: + if len(self.lengths) == 0: os.remove(self.output_file) def write(self, - idx: int, - record: Optional[Union[np.ndarray, Dict[str, np.ndarray]]], - ) -> None: + record: Union[np.ndarray, Dict[str, np.ndarray]], + ) -> int: + idx = self.next_index + self.next_index += 1 if isinstance(record, dict): assert self.fields is not None, "Writers without fields need numpy arrays as record" field_records = record assert all(field in self.fields for field in field_records.keys()) - lengths = {field: self._write(idx, record, field) - for field, record in field_records.items()} - length = lengths[self.sequence_field] + record_lengths = {field: self._write(idx, record, field) + for field, record in field_records.items()} + length = record_lengths[self.sequence_field] else: assert self.fields is None, "Writers with fields need dictionaries as record" length = self._write(idx, record) - self.index_to_length[str(idx)] = length + self.lengths.append(length) + return idx - def _write(self, idx: int, - record: Optional[np.ndarray], + def _write(self, + idx: int, + record: np.ndarray, field: str=None, ) -> int: internal_key = _compose_key(idx, field) - assert internal_key not in self.index_to_length - if record is None: - # sentence did not match the needed criteria to be encoded (e.g. too long), so - # we add an empty dataset - # (see http://docs.h5py.org/en/stable/high/dataset.html#creating-and-reading-empty-or-null-datasets-and-attributes) - self.hdf5_file.create_dataset(internal_key, - data=h5py.Empty("f"), - track_times=False) - length = 0 - else: - self.hdf5_file.create_dataset(internal_key, - record.shape, - dtype=record.dtype, - data=record, - track_times=False) - - length = record.shape[0] + self.hdf5_file.create_dataset(internal_key, + record.shape, + dtype=record.dtype, + data=record, + track_times=False) + length = record.shape[0] return length @@ -131,40 +130,33 @@ class Hdf5RecordReader(RecordReader): """ def __init__(self, - file_names: Union[str, List[str]], - min_length: int = 0, - max_length: int = _MAX_LENGTH): + file_names: Union[str, List[str]]): """ Constructs an Hdf5RecordReader. :param file_names: HDF5 files to read. - :param min_length: Minimum sequence length threshold. - :param max_length: Maximum sequence length threshold. 
""" if isinstance(file_names, str): file_names = [file_names] self.file_names = file_names self.hdf5_stores = {file_name: h5py.File(file_name, 'r') for file_name in file_names} - self.index_to_filename = dict() self.total_count = 0 - self.index_to_length = {} + self.lengths = dict() + self.initial_index = dict() + self.final_index = dict() fields = None sequence_field = None for file_name, store in self.hdf5_stores.items(): - file_index_to_length = json.loads(store[_LENGTHS_KEY][0]) fields = json.loads(store[_FIELDS_KEY][0]) if _FIELDS_KEY in store else None sequence_field = (store[_SEQUENCE_FIELD_KEY][0] if _SEQUENCE_FIELD_KEY in store else None) - file_index_to_length = {int(index): length - for index, length in file_index_to_length.items() - if min_length <= length <= max_length} - - self.index_to_length.update(file_index_to_length) - self.total_count += len(file_index_to_length) - for index in file_index_to_length.keys(): - self.index_to_filename[index] = file_name + + self.lengths[file_name] = json.loads(store[_LENGTHS_KEY][0]) + self.total_count += len(self.lengths[file_name]) + self.initial_index[file_name] = int(store[_INITIAL_INDEX_KEY][0]) + self.final_index[file_name] = int(store[_FINAL_INDEX_KEY][0]) super().__init__(fields, sequence_field) def close(self): @@ -176,17 +168,36 @@ def close(self): hdf5_store.close() self.hdf5_stores.clear() + def _find_file(self, index: int) -> Optional[str]: + for file_name in self.file_names: + if self.initial_index[file_name] <= index <= self.final_index[file_name]: + return file_name + return None + def indexes(self) -> Iterable[int]: """ See super class docstring. """ - yield from self.index_to_filename.keys() + for file_name in self.file_names: + initial = self.initial_index[file_name] + final = self.final_index[file_name] + yield from range(initial, final + 1) def length(self, index: int) -> int: """ See super class docstring. """ - return self.index_to_length.get(index, None) + file_name = self._find_file(index) + if not file_name: + return 0 + initial = self.initial_index[file_name] + return self.lengths[file_name][index - initial] def indexes_and_lengths(self) -> Iterable[Tuple[int, int]]: """ See super class docstring. """ - yield from self.index_to_length.items() + for file_name in self.file_names: + initial = self.initial_index[file_name] + final = self.final_index[file_name] + lengths = self.lengths[file_name] + for index in range(initial, final + 1): + length = lengths[index - initial] + yield index, length def retrieve(self, idx: int) -> Union[np.ndarray, Dict[str, np.ndarray]]: """ See super class docstring. """ @@ -196,7 +207,9 @@ def retrieve(self, idx: int) -> Union[np.ndarray, Dict[str, np.ndarray]]: return self._retrieve(idx, None) def _retrieve(self, idx: int, field: Optional[str]) -> Optional[np.ndarray]: - file_name = self.index_to_filename.get(idx, None) + file_name = self._find_file(idx) + if not file_name: + return None internal_key = _compose_key(idx, field) return (_to_numpy(self.hdf5_stores[file_name][internal_key]) if file_name else None) diff --git a/seqp/record.py b/seqp/record.py index e3a2b71..e406f76 100644 --- a/seqp/record.py +++ b/seqp/record.py @@ -113,17 +113,20 @@ class RecordWriter: def __init__(self, fields: Optional[Iterable[str]]=None, sequence_field: str = None, - append: bool = False): + append: bool = False, + initial_index: int = 1): """ Constructor. 
         :param fields: Optional fields for the records to write.
         :param sequence_field: field containing the sequence itself (it will
                be used to compute the sequence length).
         :param append: whether to append or overwrite.
+        :param initial_index: index assigned to the first written record.
         """
         self.metadata = dict()
         self.fields = fields
         self.append = append
+        self.next_index = initial_index
         self.sequence_field = sequence_field
         if fields or sequence_field:
             assert fields and sequence_field
@@ -162,16 +165,15 @@ def add_metadata(self, metadata: Dict[str, str]) -> None:
         """
         self.metadata.update(metadata)

-    def write(self, idx: int,
-              record: Optional[Union[np.ndarray, Dict[str, np.ndarray]]],
-              ) -> None:
+    def write(self,
+              record: Union[np.ndarray, Dict[str, np.ndarray]],
+              ) -> int:
         """
         Writes a record to the underlying storage.
-        :param idx: Index of the record. Must be unique.
         :param record: Record to be stored, or dictionary with records
                        (dictionary keys must match fields provided in the
                        constructor).
-        :return: None
+        :return: Index assigned to the just-written record.
         """
         raise NotImplementedError

@@ -224,10 +227,11 @@ def close(self):

     def _file_name(self, file_idx) -> str:
         return self.output_file_template.format(file_idx)

-    def _writer_for(self, shard_name, **extra_kwags) -> RecordWriter:
+    def _writer_for(self, shard_name, initial_index=1, **extra_kwargs) -> RecordWriter:
         args = list(self.args)
         kwargs = dict(self.kwargs)
-        kwargs.update(extra_kwags)
+        kwargs.update(extra_kwargs)
+        kwargs['initial_index'] = initial_index

         if isinstance(self.output_file_param, int):
             args.insert(self.output_file_param, shard_name)
@@ -236,22 +240,23 @@
         return self.writer_class(*args, **kwargs)

-    def _next_writer(self) -> None:
+    def _next_writer(self, initial_index) -> None:
         self.former_files.append(self._file_name(self.current_output_file_idx))
         self.current_writer.close()
         self.current_records = 0
         self.current_output_file_idx += 1
         shard_name = self._file_name(self.current_output_file_idx)
-        self.current_writer = self._writer_for(shard_name)
+        self.current_writer = self._writer_for(shard_name, initial_index)
         self.current_writer.add_metadata(self.metadata)

-    def write(self, idx: int,
-              record: Optional[Union[np.ndarray, Dict[str, np.ndarray]]],
-              ) -> None:
+    def write(self,
+              record: Union[np.ndarray, Dict[str, np.ndarray]],
+              ) -> int:
         if self.current_records >= self.max_records_per_shard:
-            self._next_writer()
-        self.current_writer.write(idx, record)
+            self._next_writer(self.current_writer.next_index)
+        idx = self.current_writer.write(record)
         self.current_records += 1
+        return idx

     def add_metadata(self, metadata: Dict[str, str]) -> None:
         self.current_writer.add_metadata(metadata)
diff --git a/seqp/stress.py b/seqp/stress.py
new file mode 100644
index 0000000..59683b1
--- /dev/null
+++ b/seqp/stress.py
@@ -0,0 +1,65 @@
+import numpy as np
+from glob import glob
+from seqp.hdf5 import Hdf5RecordWriter, Hdf5RecordReader
+from seqp.record import ShardedWriter
+from tqdm import tqdm
+from timeit import default_timer as timer
+from datetime import timedelta
+import sys
+from types import ModuleType, FunctionType
+from gc import get_referents
+
+# Custom objects know their class.
+# Function objects seem to know way too much, including modules.
+# Exclude modules as well.
+BLACKLIST = type, ModuleType, FunctionType
+
+
+def get_size(obj):
+    """Sum the size of an object and everything it references."""
+    if isinstance(obj, BLACKLIST):
+        raise TypeError('get_size() does not take argument of type: ' + str(type(obj)))
+    seen_ids = set()
+    size = 0
+    objects = [obj]
+    while objects:
+        need_referents = []
+        for obj in objects:
+            if not isinstance(obj, BLACKLIST) and id(obj) not in seen_ids:
+                seen_ids.add(id(obj))
+                size += sys.getsizeof(obj)
+                need_referents.append(obj)
+        objects = get_referents(*need_referents)
+    return size
+
+
+def load():
+    files = sorted(glob('/tmp/paco_seqp/test_*.hdf5'))
+
+    start_time = timer()
+    with Hdf5RecordReader(files) as reader:
+        num_records = sum(1 for _ in reader.indexes())
+    end_time = timer()
+    size = get_size(reader)
+    print("Num. records: {}".format(num_records))
+    print("Load time: {}".format(timedelta(seconds=end_time - start_time)))
+    print("Reader size: {} MB".format(size // 1000000))
+
+
+def save():
+    seq_length = 5
+    num_fields = 5
+    num_segments = 20000000
+    output_file_template = "/tmp/paco_seqp/test_{}.hdf5"
+    fields = ['field_{}'.format(k) for k in range(num_fields)]
+    with ShardedWriter(Hdf5RecordWriter,
+                       output_file_template,
+                       max_records_per_shard=200000,
+                       fields=fields,
+                       sequence_field=fields[0]) as writer:
+        for _ in tqdm(range(num_segments)):
+            writer.write({f: np.random.randint(32000, size=seq_length) for f in fields})
+
+
+if __name__ == '__main__':
+    load()
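
For reference, a minimal round trip with the new auto-indexing API looks like the sketch below. It is illustrative only, not part of the patch: the file names and record contents are invented, and shard numbering starting at 1 matches what stress.py assumes.

    import numpy as np
    from glob import glob
    from seqp.hdf5 import Hdf5RecordWriter, Hdf5RecordReader
    from seqp.record import ShardedWriter

    # write() no longer takes an index: the writer assigns consecutive
    # indexes starting at initial_index (1 by default) and returns the
    # index it used for each record.
    with ShardedWriter(Hdf5RecordWriter,
                       '/tmp/demo_{}.hdf5',
                       max_records_per_shard=2) as writer:
        indexes = [writer.write(np.arange(n)) for n in (3, 1, 4, 1, 5)]
        # indexes == [1, 2, 3, 4, 5], spread over three shards

    # Each shard stores its initial and final index as metadata, so the
    # reader resolves a global index to the right file with a range check
    # instead of a per-record index-to-file dictionary.
    with Hdf5RecordReader(sorted(glob('/tmp/demo_*.hdf5'))) as reader:
        for idx, length in reader.indexes_and_lengths():
            assert len(reader.retrieve(idx)) == length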
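The multi-field variant is analogous: records become dictionaries keyed by field name, and retrieve() returns a dictionary of arrays. Again a sketch rather than part of the patch; the field names and paths are invented, while the fields/sequence_field keywords are the same ones stress.py forwards through ShardedWriter.

    import numpy as np
    from glob import glob
    from seqp.hdf5 import Hdf5RecordWriter, Hdf5RecordReader
    from seqp.record import ShardedWriter

    # sequence_field names the field whose length is stored per record.
    with ShardedWriter(Hdf5RecordWriter,
                       '/tmp/fields_demo_{}.hdf5',
                       max_records_per_shard=1000,
                       fields=['tokens', 'heads'],
                       sequence_field='tokens') as writer:
        idx = writer.write({'tokens': np.array([12, 7, 3]),
                            'heads': np.array([1, 1, 0])})

    with Hdf5RecordReader(sorted(glob('/tmp/fields_demo_*.hdf5'))) as reader:
        record = reader.retrieve(idx)  # {'tokens': ..., 'heads': ...}
        assert record['tokens'].shape == (3,)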