Skip to content

Commit

Permalink
Merge pull request #163 from cul-it/task/ARXIVNG-281
Browse files (browse the repository at this point in the history)
ARXIVNG-281 moved Kinesis BaseConsumer to arxiv-base
  • Loading branch information
erickpeirson authored Sep 11, 2018
2 parents (e429150 + e455260); commit db111f3
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 557 deletions.
1 change: 0 additions & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ werkzeug = "==0.13"
wtforms = "==2.1"
bleach = "*"


[dev-packages]

coveralls = "*"
37 changes: 3 additions & 34 deletions search/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,11 @@
and becomes available for discovery via :mod:`search.routes.ui`.
"""
from typing import Optional
from datetime import datetime
import warnings

from flask import current_app as app

from arxiv.base import logging
from arxiv.base import agent
from .consumer import MetadataRecordProcessor, DocumentFailed, IndexingFailed
from .base import CheckpointManager

logger = logging.getLogger(__name__)
logger.propagate = False


def process_stream(duration: Optional[int] = None) -> None:
Expand All @@ -35,31 +29,6 @@ def process_stream(duration: Optional[int] = None) -> None:
"""
# We use the Flask application instance for configuration, and to manage
# integrations with metadata service, search index.
with warnings.catch_warnings(): # boto3 is notoriously annoying.
warnings.simplefilter("ignore")
start_at = app.config.get('KINESIS_START_AT')
start_type = app.config.get('KINESIS_START_TYPE')
if not start_type:
start_type = 'AT_TIMESTAMP'
if start_type == 'AT_TIMESTAMP' and not start_at:
start_at = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
agent.process_stream(MetadataRecordProcessor, app.config,
duration=duration)

processor = MetadataRecordProcessor(
app.config['KINESIS_STREAM'],
app.config['KINESIS_SHARD_ID'],
app.config['AWS_ACCESS_KEY_ID'],
app.config['AWS_SECRET_ACCESS_KEY'],
app.config['AWS_REGION'],
CheckpointManager(
app.config['KINESIS_CHECKPOINT_VOLUME'],
app.config['KINESIS_STREAM'],
app.config['KINESIS_SHARD_ID'],
),
endpoint=app.config.get('KINESIS_ENDPOINT', None),
verify=app.config.get('KINESIS_VERIFY', 'true') == 'true',
duration=duration,
start_type=start_type,
start_at=start_at,
sleep=float(app.config['KINESIS_SLEEP'])
)
processor.go()
Loading

0 comments on commit db111f3

Please sign in to comment.