Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added functions for get_selected_streams #100

Merged
merged 2 commits into from
Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion singer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@
resolve_schema_references
)

from singer.catalog import Catalog
from singer.catalog import (
Catalog,
CatalogEntry
)
from singer.schema import Schema

from singer.bookmarks import (
Expand Down
37 changes: 34 additions & 3 deletions singer/catalog.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
'''Provides an object model for a Singer Catalog.'''

import json
import sys

from singer.schema import Schema
from . import metadata as metadata_module
from .bookmarks import get_currently_syncing
from .logger import get_logger
from .schema import Schema

LOGGER = get_logger()


# pylint: disable=too-many-instance-attributes
class CatalogEntry():
Expand Down Expand Up @@ -33,7 +38,9 @@ def __eq__(self, other):
return self.__dict__ == other.__dict__

def is_selected(self):
    """Report whether this catalog entry is selected.

    The entry counts as selected when either the ``selected`` attribute
    of its schema is truthy, or the metadata lookup for the empty
    breadcrumb ``()`` yields a truthy ``selected`` value.

    Returns:
        ``self.schema.selected`` when it is truthy, otherwise the result
        of the metadata lookup. Callers should rely on truthiness only,
        since the metadata lookup is not guaranteed to return a bool.
    """
    # A schema-only ``return`` used to sit ahead of this logic, which made
    # the metadata check unreachable; both sources are consulted here.
    # NOTE(review): ``to_map`` presumably indexes the metadata list by
    # breadcrumb -- confirm against singer.metadata.
    mdata = metadata_module.to_map(self.metadata)
    # pylint: disable=no-member
    return self.schema.selected or metadata_module.get(mdata, (), 'selected')

def to_dict(self):
result = {}
Expand Down Expand Up @@ -116,3 +123,27 @@ def get_stream(self, tap_stream_id):
if stream.tap_stream_id == tap_stream_id:
return stream
return None

def _shuffle_streams(self, state):
    """Return the streams rotated to start at the currently syncing one.

    Args:
        state: State object handed to ``get_currently_syncing``.

    Returns:
        ``self.streams`` itself when no stream is currently syncing.
        Otherwise a new list that begins at the first entry whose
        ``tap_stream_id`` equals the currently syncing id, followed by
        the entries that preceded it. If no entry matches, the new list
        keeps the original order.
    """
    resume_id = get_currently_syncing(state)
    if resume_id is None:
        return self.streams

    # Position of the first matching entry; fall back to 0 (no rotation)
    # when the id does not appear in the catalog.
    pivot = next(
        (position
         for position, entry in enumerate(self.streams)
         if entry.tap_stream_id == resume_id),
        0,
    )
    return self.streams[pivot:] + self.streams[:pivot]


def get_selected_streams(self, state):
    """Lazily yield the selected streams of this catalog.

    Streams are visited in the order produced by ``_shuffle_streams``.
    Every entry whose ``is_selected()`` result is falsy is logged at
    info level and skipped.

    Args:
        state: State object forwarded to ``_shuffle_streams``.

    Yields:
        Each catalog entry for which ``is_selected()`` is truthy.
    """
    for entry in self._shuffle_streams(state):
        if entry.is_selected():
            yield entry
        else:
            LOGGER.info('Skipping stream: %s', entry.tap_stream_id)
36 changes: 35 additions & 1 deletion tests/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,40 @@
from singer.schema import Schema
from singer.catalog import Catalog, CatalogEntry

class TestGetSelectedStreams(unittest.TestCase):
    """Tests for ``Catalog.get_selected_streams``."""

    @staticmethod
    def _selected_entry(tap_stream_id):
        """Build a catalog entry marked selected via top-level metadata."""
        return CatalogEntry(tap_stream_id=tap_stream_id,
                            schema=Schema(),
                            metadata=[{'metadata': {'selected': True},
                                       'breadcrumb': []}])

    @staticmethod
    def _unselected_entry(tap_stream_id):
        """Build a catalog entry that carries no selection metadata."""
        return CatalogEntry(tap_stream_id=tap_stream_id,
                            schema=Schema(),
                            metadata=[])

    def test_one_selected_stream(self):
        """Only the entry flagged as selected is yielded."""
        selected_entry = self._selected_entry('a')
        catalog = Catalog([selected_entry,
                           self._unselected_entry('b'),
                           self._unselected_entry('c')])
        state = {}

        selected_streams = catalog.get_selected_streams(state)

        # assertEqual, not the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(list(selected_streams), [selected_entry])

    def test_resumes_currently_syncing_stream(self):
        """Iteration starts at the stream named by ``currently_syncing``."""
        selected_entry_a = self._selected_entry('a')
        selected_entry_c = self._selected_entry('c')
        catalog = Catalog([selected_entry_a,
                           self._unselected_entry('b'),
                           selected_entry_c])
        state = {'currently_syncing': 'c'}

        selected_streams = list(catalog.get_selected_streams(state))

        # 'c' comes first, then the rotation wraps around to 'a'; the
        # unselected 'b' is skipped.
        self.assertEqual(selected_streams[0], selected_entry_c)
        self.assertEqual(selected_streams,
                         [selected_entry_c, selected_entry_a])

class TestToDictAndFromDict(unittest.TestCase):

dict_form = {
Expand Down Expand Up @@ -89,7 +123,7 @@ def test_from_dict(self):

def test_to_dict(self):
    """``to_dict()`` on the object form must equal the dictionary form."""
    serialized = self.obj_form.to_dict()
    self.assertEqual(self.dict_form, serialized)


class TestGetStream(unittest.TestCase):
def test(self):
Expand Down