Skip to content

Commit

Permalink
Read only endpoint for XCom apache#8134
Browse files Browse the repository at this point in the history
  • Loading branch information
randr97 committed Jun 23, 2020
1 parent bdef53b commit e6ac7bb
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 127 deletions.
55 changes: 45 additions & 10 deletions airflow/api_connexion/endpoints/xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from flask import request
from sqlalchemy import and_, func
from sqlalchemy.orm.session import Session

from sqlalchemy import and_

from airflow.api_connexion.schemas.xcom_schema import xcom_collection_item_schema
from airflow.api_connexion import parameters
from airflow.api_connexion.exceptions import NotFound
from airflow.api_connexion.schemas.xcom_schema import (
XComCollection, XComCollectionItemSchema, XComCollectionSchema, xcom_collection_item_schema,
xcom_collection_schema,
)
from airflow.models import DagRun as DR, XCom
from airflow.utils.session import provide_session

Expand All @@ -29,15 +35,44 @@ def delete_xcom_entry():
raise NotImplementedError("Not implemented yet.")


def get_xcom_entries():
@provide_session
def get_xcom_entries(
dag_id: str,
dag_run_id: str,
task_id: str,
session: Session
) -> XComCollectionSchema:
"""
Get all XCom values
"""
raise NotImplementedError("Not implemented yet.")
offset = request.args.get(parameters.page_offset, 0)
limit = min(int(request.args.get(parameters.page_limit, 100)), 100)
query = session.query(XCom)
if dag_id != '~':
query = query.filter(XCom.dag_id == dag_id)
query.join(DR, and_(XCom.dag_id == DR.dag_id, XCom.execution_date == DR.execution_date))
else:
query.join(DR, XCom.execution_date == DR.execution_date)
if task_id != '~':
query = query.filter(XCom.task_id == task_id)
if dag_run_id != '~':
query = query.filter(DR.run_id == dag_run_id)
query = query.order_by(
XCom.execution_date, XCom.task_id, XCom.dag_id, XCom.key
)
total_entries = session.query(func.count(XCom.key)).scalar()
query = query.offset(offset).limit(limit)
return xcom_collection_schema.dump(XComCollection(xcom_entries=query.all(), total_entries=total_entries))


@provide_session
def get_xcom_entry(dag_id, task_id, dag_run_id, xcom_key, session):
def get_xcom_entry(
dag_id: str,
task_id: str,
dag_run_id: str,
xcom_key: str,
session: Session
) -> XComCollectionItemSchema:
"""
Get an XCom entry
"""
Expand All @@ -48,10 +83,10 @@ def get_xcom_entry(dag_id, task_id, dag_run_id, xcom_key, session):
query = query.join(DR, and_(XCom.dag_id == DR.dag_id, XCom.execution_date == DR.execution_date))
query = query.filter(DR.run_id == dag_run_id)

q_object = query.one_or_none()
if not q_object:
raise Exception("Object Not found")
return xcom_collection_item_schema.dump(q_object)
query_object = query.one_or_none()
if not query_object:
raise NotFound("XCom entry not found")
return xcom_collection_item_schema.dump(query_object)


def patch_xcom_entry():
Expand Down
11 changes: 0 additions & 11 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2193,17 +2193,6 @@ components:
required: true
description: The Variable Key.

ExecutionDate:
in: path
name: execution_date
schema:
type: string
format: date-time
required: true
description: The date-time notation as defined by
[RFC 3339, section 5.6](https://tools.ietf.org/html/rfc3339#section-5.6),
E.G. `2017-07-21T17:32:28Z`

# Logs
FullContent:
in: query
Expand Down
50 changes: 23 additions & 27 deletions airflow/api_connexion/schemas/xcom_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,54 +14,50 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from marshmallow import post_dump
from typing import List, NamedTuple

from marshmallow import Schema, fields
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field

from airflow.models import XCom


class XComCollectionItemSchema(SQLAlchemySchema):
"""
Schema for a xcom item
"""

class Meta:
""" Meta """
model = XCom

COLLECTION_NAME = 'xcom_entries'
FIELDS_FROM_NONE_TO_EMPTY_STRING = ['key', 'task_id', 'dag_id']

key = auto_field()
timestamp = auto_field()
execution_date = auto_field()
task_id = auto_field()
dag_id = auto_field()

@post_dump(pass_many=True)
def wrap_with_envelope(self, data, many, **kwargs):
"""
:param data: Deserialized data
:param many: Collection or an item
"""
if many:
data = self._process_list_data(data)
return {self.COLLECTION_NAME: data, 'total_entries': len(data)}
data = self._process_data(data)
return data

def _process_list_data(self, data):
return [self._process_data(x) for x in data]
class XComSchema(XComCollectionItemSchema):
"""
XCom schema
"""

def _process_data(self, data):
for key in self.FIELDS_FROM_NONE_TO_EMPTY_STRING:
if not data[key]:
data.update({key: ''})
return data
value = auto_field()


class XComSchema(XComCollectionItemSchema):
class XComCollection(NamedTuple):
""" List of XComs with meta"""
xcom_entries: List[XCom]
total_entries: int

value = auto_field()

class XComCollectionSchema(Schema):
""" XCom Collection Schema"""
xcom_entries = fields.List(fields.Nested(XComCollectionItemSchema))
total_entries = fields.Int()


xcom_schema = XComSchema()
xcom_collection_item_schema = XComCollectionItemSchema()
xcom_collection_schema = XComCollectionItemSchema(many=True)
xcom_schema = XComSchema(strict=True)
xcom_collection_item_schema = XComCollectionItemSchema(strict=True)
xcom_collection_schema = XComCollectionSchema(strict=True)
9 changes: 1 addition & 8 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
Boolean, Column, DateTime, Index, Integer, PickleType, String, UniqueConstraint, and_, func, or_,
)
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import backref, relationship, synonym
from sqlalchemy.orm import synonym
from sqlalchemy.orm.session import Session

from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -66,13 +66,6 @@ class DagRun(Base, LoggingMixin):
UniqueConstraint('dag_id', 'run_id'),
)

task_instances = relationship(
TI,
primaryjoin=and_(TI.dag_id == dag_id, TI.execution_date == execution_date),
foreign_keys=(dag_id, execution_date),
backref=backref('dag_run', uselist=False),
)

def __init__(self, dag_id=None, run_id=None, execution_date=None, start_date=None, external_trigger=None,
conf=None, state=None, run_type=None):
self.dag_id = dag_id
Expand Down
Loading

0 comments on commit e6ac7bb

Please sign in to comment.