Skip to content

Commit

Permalink
Add schema_url to Resource (#1871)
Browse files Browse the repository at this point in the history
  • Loading branch information
dgetu authored Jun 1, 2021
1 parent c8ebda9 commit 56495ed
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 10 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1855](https://github.com/open-telemetry/opentelemetry-python/pull/1855))
- Fixed exporter OTLP header parsing to match baggage header formatting.
([#1869](https://github.com/open-telemetry/opentelemetry-python/pull/1869))
- Added optional `schema_url` field to `Resource` class
([#1871](https://github.com/open-telemetry/opentelemetry-python/pull/1871))

## [1.2.0, 0.21b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.2.0-0.21b0) - 2021-05-11

Expand Down
52 changes: 45 additions & 7 deletions opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,25 @@
class Resource:
"""A Resource is an immutable representation of the entity producing telemetry as Attributes."""

def __init__(self, attributes: Attributes):
def __init__(
self, attributes: Attributes, schema_url: typing.Optional[str] = None
):
_filter_attributes(attributes)
self._attributes = attributes.copy()
if schema_url is None:
schema_url = ""
self._schema_url = schema_url

@staticmethod
def create(attributes: typing.Optional[Attributes] = None) -> "Resource":
def create(
attributes: typing.Optional[Attributes] = None,
schema_url: typing.Optional[str] = None,
) -> "Resource":
"""Creates a new `Resource` from attributes.
Args:
attributes: Optional zero or more key-value pairs.
schema_url: Optional URL pointing to the schema
Returns:
The newly-created Resource.
Expand All @@ -159,7 +168,7 @@ def create(attributes: typing.Optional[Attributes] = None) -> "Resource":
attributes = {}
resource = _DEFAULT_RESOURCE.merge(
OTELResourceDetector().detect()
).merge(Resource(attributes))
).merge(Resource(attributes, schema_url))
if not resource.attributes.get(SERVICE_NAME, None):
default_service_name = "unknown_service"
process_executable_name = resource.attributes.get(
Expand All @@ -168,7 +177,7 @@ def create(attributes: typing.Optional[Attributes] = None) -> "Resource":
if process_executable_name:
default_service_name += ":" + process_executable_name
resource = resource.merge(
Resource({SERVICE_NAME: default_service_name})
Resource({SERVICE_NAME: default_service_name}, schema_url)
)
return resource

Expand All @@ -180,12 +189,21 @@ def get_empty() -> "Resource":
def attributes(self) -> Attributes:
return self._attributes.copy()

@property
def schema_url(self) -> str:
return self._schema_url

def merge(self, other: "Resource") -> "Resource":
"""Merges this resource and an updating resource into a new `Resource`.
If a key exists on both the old and updating resource, the value of the
updating resource will override the old resource value.
The updating resource's `schema_url` will be used only if the old
`schema_url` is empty. Attempting to merge two resources with
different, non-empty values for `schema_url` will result in an error
and return the old resource.
Args:
other: The other resource to be merged.
Expand All @@ -194,15 +212,35 @@ def merge(self, other: "Resource") -> "Resource":
"""
merged_attributes = self.attributes
merged_attributes.update(other.attributes)
return Resource(merged_attributes)

if self.schema_url == "":
schema_url = other.schema_url
elif other.schema_url == "":
schema_url = self.schema_url
elif self.schema_url == other.schema_url:
schema_url = other.schema_url
else:
logger.error(
"Failed to merge resources: The two schemas %s and %s are incompatible",
self.schema_url,
other.schema_url,
)
return self

return Resource(merged_attributes, schema_url)

def __eq__(self, other: object) -> bool:
if not isinstance(other, Resource):
return False
return self._attributes == other._attributes
return (
self._attributes == other._attributes
and self._schema_url == other._schema_url
)

def __hash__(self):
return hash(dumps(self._attributes, sort_keys=True))
return hash(
f"{dumps(self._attributes, sort_keys=True)}|{self._schema_url}"
)


_EMPTY_RESOURCE = Resource({})
Expand Down
143 changes: 141 additions & 2 deletions opentelemetry-sdk/tests/resources/test_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import os
import unittest
import uuid
from logging import ERROR
from unittest import mock

from opentelemetry.sdk import resources
Expand Down Expand Up @@ -51,6 +52,14 @@ def test_create(self):
resource = resources.Resource.create(attributes)
self.assertIsInstance(resource, resources.Resource)
self.assertEqual(resource.attributes, expected_attributes)
self.assertEqual(resource.schema_url, "")

schema_url = "https://opentelemetry.io/schemas/1.3.0"

resource = resources.Resource.create(attributes, schema_url)
self.assertIsInstance(resource, resources.Resource)
self.assertEqual(resource.attributes, expected_attributes)
self.assertEqual(resource.schema_url, schema_url)

os.environ[resources.OTEL_RESOURCE_ATTRIBUTES] = "key=value"
resource = resources.Resource.create(attributes)
Expand All @@ -67,17 +76,45 @@ def test_create(self):
self.assertEqual(
resource,
resources._DEFAULT_RESOURCE.merge(
resources.Resource({resources.SERVICE_NAME: "unknown_service"})
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
),
)
self.assertEqual(resource.schema_url, "")

resource = resources.Resource.create(None, None)
self.assertEqual(
resource,
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
),
)
self.assertEqual(resource.schema_url, "")

resource = resources.Resource.create({})
self.assertEqual(
resource,
resources._DEFAULT_RESOURCE.merge(
resources.Resource({resources.SERVICE_NAME: "unknown_service"})
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
),
)
self.assertEqual(resource.schema_url, "")

resource = resources.Resource.create({}, None)
self.assertEqual(
resource,
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
),
)
self.assertEqual(resource.schema_url, "")

def test_resource_merge(self):
left = resources.Resource({"service": "ui"})
Expand All @@ -86,6 +123,33 @@ def test_resource_merge(self):
left.merge(right),
resources.Resource({"service": "ui", "host": "service-host"}),
)
schema_urls = (
"https://opentelemetry.io/schemas/1.2.0",
"https://opentelemetry.io/schemas/1.3.0",
)

left = resources.Resource.create({}, None)
right = resources.Resource.create({}, None)
self.assertEqual(left.merge(right).schema_url, "")

left = resources.Resource.create({}, None)
right = resources.Resource.create({}, schema_urls[0])
self.assertEqual(left.merge(right).schema_url, schema_urls[0])

left = resources.Resource.create({}, schema_urls[0])
right = resources.Resource.create({}, None)
self.assertEqual(left.merge(right).schema_url, schema_urls[0])

left = resources.Resource.create({}, schema_urls[0])
right = resources.Resource.create({}, schema_urls[0])
self.assertEqual(left.merge(right).schema_url, schema_urls[0])

left = resources.Resource.create({}, schema_urls[0])
right = resources.Resource.create({}, schema_urls[1])
with self.assertLogs(level=ERROR) as log_entry:
self.assertEqual(left.merge(right), left)
self.assertIn(schema_urls[0], log_entry.output[0])
self.assertIn(schema_urls[1], log_entry.output[0])

def test_resource_merge_empty_string(self):
"""Verify Resource.merge behavior with the empty string.
Expand Down Expand Up @@ -130,6 +194,11 @@ def test_immutability(self):
attributes["cost"] = 999.91
self.assertEqual(resource.attributes, attributes_copy)

with self.assertRaises(AttributeError):
resource.schema_url = "bug"

self.assertEqual(resource.schema_url, "")

def test_service_name_using_process_name(self):
resource = resources.Resource.create(
{resources.PROCESS_EXECUTABLE_NAME: "test"}
Expand Down Expand Up @@ -220,6 +289,76 @@ def test_aggregated_resources_multiple_detectors(self):
),
)

def test_aggregated_resources_different_schema_urls(self):
resource_detector1 = mock.Mock(spec=resources.ResourceDetector)
resource_detector1.detect.return_value = resources.Resource(
{"key1": "value1"}, ""
)
resource_detector2 = mock.Mock(spec=resources.ResourceDetector)
resource_detector2.detect.return_value = resources.Resource(
{"key2": "value2", "key3": "value3"}, "url1"
)
resource_detector3 = mock.Mock(spec=resources.ResourceDetector)
resource_detector3.detect.return_value = resources.Resource(
{
"key2": "try_to_overwrite_existing_value",
"key3": "try_to_overwrite_existing_value",
"key4": "value4",
},
"url2",
)
resource_detector4 = mock.Mock(spec=resources.ResourceDetector)
resource_detector4.detect.return_value = resources.Resource(
{
"key2": "try_to_overwrite_existing_value",
"key3": "try_to_overwrite_existing_value",
"key4": "value4",
},
"url1",
)
self.assertEqual(
resources.get_aggregated_resources(
[resource_detector1, resource_detector2]
),
resources.Resource(
{"key1": "value1", "key2": "value2", "key3": "value3"},
"url1",
),
)
with self.assertLogs(level=ERROR) as log_entry:
self.assertEqual(
resources.get_aggregated_resources(
[resource_detector2, resource_detector3]
),
resources.Resource(
{"key2": "value2", "key3": "value3"}, "url1"
),
)
self.assertIn("url1", log_entry.output[0])
self.assertIn("url2", log_entry.output[0])
with self.assertLogs(level=ERROR):
self.assertEqual(
resources.get_aggregated_resources(
[
resource_detector2,
resource_detector3,
resource_detector4,
resource_detector1,
]
),
resources.Resource(
{
"key1": "value1",
"key2": "try_to_overwrite_existing_value",
"key3": "try_to_overwrite_existing_value",
"key4": "value4",
},
"url1",
),
)
self.assertIn("url1", log_entry.output[0])
self.assertIn("url2", log_entry.output[0])

def test_resource_detector_ignore_error(self):
resource_detector = mock.Mock(spec=resources.ResourceDetector)
resource_detector.detect.side_effect = Exception()
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/tests/trace/test_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ def test_span_override_start_and_end_time(self):
self.assertEqual(end_time, span.end_time)

def test_ended_span(self):
""""Events, attributes are not allowed after span is ended"""
"""Events, attributes are not allowed after span is ended"""

root = self.tracer.start_span("root")

Expand Down

0 comments on commit 56495ed

Please sign in to comment.