Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(eap-spans): Add a field to suggest consumers to ingest spans in EAP #4206

Merged
merged 10 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- Add a tag `target_project_id` to both root project metrics for dynamic sampling (`c:transactions/count_per_root_project@none` and `c:spans/count_per_root_project@none`) which shows the flow trace traffic from root to target projects. ([#4170](https://github.com/getsentry/relay/pull/4170))
- Remove `buffer` entries and scrub array contents from MongoDB queries. ([#4186](https://github.com/getsentry/relay/pull/4186))
- Use `DateTime<Utc>` instead of `Instant` for tracking the received time of the `Envelope`. ([#4184](https://github.com/getsentry/relay/pull/4184))
- Add a field to suggest consumers to ingest spans in EAP. ([#4206](https://github.com/getsentry/relay/pull/4206))

## 24.10.0

Expand Down
5 changes: 5 additions & 0 deletions relay-dynamic-config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ pub enum Feature {
/// Serialized as `organizations:performance-queries-mongodb-extraction`.
#[serde(rename = "organizations:performance-queries-mongodb-extraction")]
ScrubMongoDbDescriptions,
/// Indicate if the EAP consumers should ingest a span.
///
/// Serialized as `organizations:ingest-spans-in-eap`
#[serde(rename = "organizations:ingest-spans-in-eap")]
IngestSpansInEap,

/// This feature has graduated and is hard-coded for external Relays.
#[doc(hidden)]
Expand Down
17 changes: 17 additions & 0 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,12 @@ pub struct ItemHeaders {
#[serde(default = "default_true", skip_serializing_if = "is_true")]
sampled: bool,

/// Indicates if we should ingest the item in the EAP
///
/// NOTE: This is internal-only and not exposed into the Envelope.
#[serde(default, skip)]
ingest_span_in_eap: bool,

/// Other attributes for forward compatibility.
#[serde(flatten)]
other: BTreeMap<String, Value>,
Expand Down Expand Up @@ -643,6 +649,7 @@ impl Item {
spans_extracted: false,
sampled: true,
fully_normalized: false,
ingest_span_in_eap: false,
},
payload: Bytes::new(),
}
Expand Down Expand Up @@ -843,6 +850,16 @@ impl Item {
self.headers.fully_normalized = fully_normalized;
}

/// Returns whether or not to ingest the span in EAP.
pub fn ingest_span_in_eap(&self) -> bool {
self.headers.ingest_span_in_eap
}

/// Set whether or not to ingest the span in EAP.
pub fn set_ingest_span_in_eap(&mut self, ingest_span_in_eap: bool) {
self.headers.ingest_span_in_eap = ingest_span_in_eap;
}

/// Gets the `sampled` flag.
pub fn sampled(&self) -> bool {
self.headers.sampled
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
source: relay-server/src/metrics_extraction/event.rs
expression: "(&event.value().unwrap().spans, metrics)"
expression: "(&event.value().unwrap().spans, metrics.project_metrics)"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, where did this come from?

---
(
[
Expand Down
14 changes: 14 additions & 0 deletions relay-server/src/services/processor/span/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ pub fn process(
};
new_item.set_payload(ContentType::Json, payload);
new_item.set_metrics_extracted(item.metrics_extracted());
new_item.set_ingest_span_in_eap(
state
.project_info
.config
.features
.has(Feature::IngestSpansInEap),
);

*item = new_item;

Expand Down Expand Up @@ -279,6 +286,11 @@ pub fn extract_from_event(
.envelope()
.dsc()
.and_then(|ctx| ctx.sample_rate);
let ingest_in_eap = state
.project_info
.config
.features
.has(Feature::IngestSpansInEap);

let mut add_span = |mut span: Span| {
add_sample_rate(
Expand Down Expand Up @@ -330,6 +342,7 @@ pub fn extract_from_event(
item.set_payload(ContentType::Json, span);
// If metrics extraction happened for the event, it also happened for its spans:
item.set_metrics_extracted(state.event_metrics_extracted);
item.set_ingest_span_in_eap(ingest_in_eap);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could make the header an envelope header and set it in a more central place (e.g. process_envelope), so we don't have to set it in two different code paths. Given this is temporary it does not really matter though.


relay_log::trace!("Adding span to envelope");
state.managed_envelope.envelope_mut().add_item(item);
Expand Down Expand Up @@ -360,6 +373,7 @@ pub fn extract_from_event(
) else {
return;
};

// Add child spans as envelope items.
if let Some(child_spans) = event.spans.value() {
for span in child_spans {
Expand Down
4 changes: 4 additions & 0 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,7 @@ impl StoreService {
span.project_id = scoping.project_id.value();
span.retention_days = retention_days;
span.start_timestamp_ms = (span.start_timestamp_precise * 1e3) as u64;
span.ingest_in_eap = item.ingest_span_in_eap();

if let Some(measurements) = &mut span.measurements {
measurements.retain(|_, v| {
Expand Down Expand Up @@ -1360,6 +1361,9 @@ struct SpanKafkaMessage<'a> {

#[serde(borrow, default, skip_serializing)]
platform: Cow<'a, str>, // We only use this for logging for now

#[serde(default)]
ingest_in_eap: bool,
phacops marked this conversation as resolved.
Show resolved Hide resolved
}

fn none_or_empty_object(value: &Option<&RawValue>) -> bool {
Expand Down
54 changes: 54 additions & 0 deletions tests/integration/test_spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def test_span_extraction(
"start_timestamp_precise": start.timestamp(),
"end_timestamp_precise": start.timestamp() + duration.total_seconds(),
"trace_id": "ff62a8b040f340bda5d830223def1d81",
"ingest_in_eap": False,
}

start_timestamp = datetime.fromisoformat(event["start_timestamp"]).replace(
Expand Down Expand Up @@ -161,6 +162,7 @@ def test_span_extraction(
"start_timestamp_precise": start_timestamp.timestamp(),
"end_timestamp_precise": start_timestamp.timestamp() + duration,
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"ingest_in_eap": False,
}

spans_consumer.assert_empty()
Expand Down Expand Up @@ -541,6 +543,7 @@ def test_span_extraction_with_metrics_summary(
"start_timestamp_precise": start_timestamp.timestamp(),
"end_timestamp_precise": end_timestamp.timestamp(),
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"ingest_in_eap": False,
}

spans_consumer.assert_empty()
Expand Down Expand Up @@ -672,6 +675,7 @@ def test_span_extraction_with_ddm_missing_values(
"start_timestamp_precise": start_timestamp.timestamp(),
"end_timestamp_precise": end_timestamp.timestamp(),
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"ingest_in_eap": False,
}

spans_consumer.assert_empty()
Expand Down Expand Up @@ -907,6 +911,7 @@ def test_span_ingestion_with_performance_scores(
"ttfb": {"value": 500.0},
"score.cls": {"value": 0.0},
},
"ingest_in_eap": False,
},
{
"data": {
Expand Down Expand Up @@ -942,6 +947,7 @@ def test_span_ingestion_with_performance_scores(
"score.total": {"value": 0.9948129113413748},
"score.weight.inp": {"value": 1.0},
},
"ingest_in_eap": False,
},
]

Expand Down Expand Up @@ -1597,3 +1603,51 @@ def test_metrics_summary_with_standalone_spans(
expected_mris = ["c:spans/some_metric@none", "d:custom/my_metric@millisecond"]
for metric_summary in metrics_summaries:
assert metric_summary["mri"] in expected_mris


@pytest.mark.parametrize("ingest_in_eap", [True, False])
def test_ingest_in_eap(
mini_sentry,
relay_with_processing,
spans_consumer,
ingest_in_eap,
):
spans_consumer = spans_consumer()

relay = relay_with_processing(options=TEST_CONFIG)
project_id = 42
project_config = mini_sentry.add_full_project_config(project_id)
project_config["config"]["features"] = [
"organizations:indexed-spans-extraction",
]

if ingest_in_eap:
project_config["config"]["features"] += ["organizations:ingest-spans-in-eap"]

event = make_transaction({"event_id": "cbf6960622e14a45abc1f03b2055b186"})
end = datetime.now(timezone.utc) - timedelta(seconds=1)
duration = timedelta(milliseconds=500)
start = end - duration
event["spans"] = [
{
"description": "GET /api/0/organizations/?member=1",
"op": "http",
"origin": "manual",
"parent_span_id": "aaaaaaaaaaaaaaaa",
"span_id": "bbbbbbbbbbbbbbbb",
"start_timestamp": start.isoformat(),
"status": "success",
"timestamp": end.isoformat(),
"trace_id": "ff62a8b040f340bda5d830223def1d81",
},
]

relay.send_event(project_id, event)

child_span = spans_consumer.get_span()
assert child_span.get("ingest_in_eap", False) == ingest_in_eap

transaction_span = spans_consumer.get_span()
assert transaction_span.get("ingest_in_eap", False) == ingest_in_eap

spans_consumer.assert_empty()
Loading