Skip to content

Commit

Permalink
feat(system): support conditional write semantics (datahub-project#10868
Browse files Browse the repository at this point in the history
)
  • Loading branch information
david-leifker authored Jul 11, 2024
1 parent 623b6f9 commit 5327f80
Show file tree
Hide file tree
Showing 58 changed files with 2,750 additions and 366 deletions.
45 changes: 41 additions & 4 deletions docs/advanced/mcp-mcl.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,14 @@ record MetadataChangeProposal {
**/
systemMetadata: optional SystemMetadata
/**
* Headers - intended to mimic http headers
*/
headers: optional map[string, string]
}
```

Each proposal comprises of the following:
Each proposal is comprised of the following:

1. entityType

Expand All @@ -82,12 +86,13 @@ Each proposal comprises of the following:
Type of change you are proposing: one of

- UPSERT: Insert if not exists, update otherwise
- CREATE: Insert if not exists, fail otherwise
- CREATE: Insert aspect if not exists, fail otherwise
- CREATE_ENTITY: Insert if entity does not exist, fail otherwise
- UPDATE: Update if exists, fail otherwise
- DELETE: Delete
- PATCH: Patch the aspect instead of doing a full replace

Only UPSERT, CREATE, DELETE, PATCH are supported as of now.
Only UPSERT, CREATE, CREATE_ENTITY, DELETE, PATCH are supported as of now.

5. aspectName

Expand All @@ -110,6 +115,10 @@ Each proposal comprises of the following:

Extra metadata about the proposal like run_id or updated timestamp.

8. headers

Optional headers which are meant to mimic http headers. These are currently used for implementing conditional write logic.

GMS processes the proposal and produces the Metadata Change Log, which looks like this.

```protobuf
Expand Down Expand Up @@ -156,4 +165,32 @@ entities:
keyAspect: datasetKey
aspects:
- datasetProfile
```
```

## Features

### Conditional Writes

Conditional write semantics use extra information contained in the MCP `headers` field to possibly avoid writing new aspects
if the conditions are not met.

#### If-Version-Match

Each time an aspect is updated a `version` is incremented to represent the change to the aspect. This `version` is stored and returned
in `SystemMetadata`.

A writer can provide a header with the expected `version` when initiating the request. If the expected `version` does not
match the actual `version` stored in the database, the write will fail. This prevents overwriting an aspect that has
been modified by another process.

#### If-Modified-Since / If-Unmodified-Since

A writer may also specify time-based conditions using http header semantics. Similar to version based conditional writes
this method can be used to prevent the write if the target aspect was modified after a reading the aspect. Per the
http specification dates must comply with ISO-8601 standard.

`If-Unmodified-Since`:
A writer can specify that the aspect must NOT have been modified after a specific time, following [If-Unmodified-Since](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-Unmodified-Since) http headers.

`If-Modified-Since`
A writer can specify that the aspect must have been modified after a specific time, following [If-Modified-Since](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-Modified-Since) http headers.
9 changes: 8 additions & 1 deletion docs/api/openapi/openapi-usage-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ curl --location --request POST 'localhost:8080/openapi/entities/v1/' \
The second POST example will write the update ONLY if the entity doesn't exist. If the entity does exist the
command will return an error instead of overwriting the entity.

In this example we've added an additional URL parameter `createEntityIfNotExists=true`
In this example we've added a URL parameter `createEntityIfNotExists=true`

```shell
curl --location --request POST 'localhost:8080/openapi/entities/v1/?createEntityIfNotExists=true' \
Expand Down Expand Up @@ -582,3 +582,10 @@ public class Main {
}
}
```

## OpenAPI v3 Features

### Conditional Writes

All the create/POST endpoints for aspects support `headers` in the POST body to support batch APIs. See the docs in the
[MetadataChangeProposal](../../advanced/mcp-mcl.md) section for the use of these headers to support conditional writes semantics.
32 changes: 32 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,38 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

- Protobuf CLI will no longer create binary encoded protoc custom properties. Flag added `-protocProp` in case this
behavior is required.
- #10868 - OpenAPI V3 - Creation of aspects will need to be wrapped within a `value` key and the API is now symmetric with respect to input and outputs.

Example Global Tags Aspect:

Previous:
```json
{
"tags": [
{
"tag": "string",
"context": "string"
}
]
}
```

New (optional fields `systemMetadata` and `headers`):

```json
{
"value": {
"tags": [
{
"tag": "string",
"context": "string"
}
]
},
"systemMetadata": {},
"headers": {}
}
```

### Potential Downtime

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata.aspect;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.entity.Aspect;
Expand Down Expand Up @@ -31,6 +32,23 @@ default Aspect getLatestAspectObject(@Nonnull final Urn urn, @Nonnull final Stri
@Nonnull
Map<Urn, Map<String, Aspect>> getLatestAspectObjects(Set<Urn> urns, Set<String> aspectNames);

@Nullable
default SystemAspect getLatestSystemAspect(
@Nonnull final Urn urn, @Nonnull final String aspectName) {
return getLatestSystemAspects(ImmutableMap.of(urn, ImmutableSet.of(aspectName)))
.getOrDefault(urn, Collections.emptyMap())
.get(aspectName);
}

/**
* Returns for each URN, the map of aspectName to Aspect
*
* @param urnAspectNames urns and aspect names to fetch
* @return urn to aspect name and values
*/
@Nonnull
Map<Urn, Map<String, SystemAspect>> getLatestSystemAspects(Map<Urn, Set<String>> urnAspectNames);

@Nonnull
default Map<Urn, Boolean> entityExists(Set<Urn> urns) {
Set<String> keyAspectNames =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.linkedin.metadata.aspect;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.SystemMetadata;
import java.sql.Timestamp;
import java.time.Instant;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Getter;

/** Delegate to restli class */
public class EnvelopedSystemAspect implements SystemAspect {

public static SystemAspect of(
@Nonnull Urn urn, @Nonnull EnvelopedAspect envelopedAspect, @Nonnull EntitySpec entitySpec) {
return new EnvelopedSystemAspect(urn, envelopedAspect, entitySpec);
}

@Getter @Nonnull private final Urn urn;
@Nonnull private final EnvelopedAspect envelopedAspect;
@Getter @Nonnull private final EntitySpec entitySpec;
@Getter @Nonnull private final AspectSpec aspectSpec;

public EnvelopedSystemAspect(
@Nonnull Urn urn, @Nonnull EnvelopedAspect envelopedAspect, @Nonnull EntitySpec entitySpec) {
this.urn = urn;
this.envelopedAspect = envelopedAspect;
this.entitySpec = entitySpec;
this.aspectSpec = this.entitySpec.getAspectSpec(envelopedAspect.getName());
}

@Nullable
@Override
public RecordTemplate getRecordTemplate() {
return envelopedAspect.getValue();
}

@Nullable
@Override
public SystemMetadata getSystemMetadata() {
return envelopedAspect.getSystemMetadata();
}

@Override
public long getVersion() {
return envelopedAspect.getVersion();
}

@Override
public Timestamp getCreatedOn() {
return Timestamp.from(Instant.ofEpochMilli(envelopedAspect.getCreated().getTime()));
}

@Override
public String getCreatedBy() {
return envelopedAspect.getCreated().getActor().toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.mxe.SystemMetadata;
import java.sql.Timestamp;
import java.util.Optional;
import javax.annotation.Nonnull;

/**
Expand All @@ -22,4 +24,16 @@ default AuditStamp getAuditStamp() {
.setActor(UrnUtils.getUrn(getCreatedBy()))
.setTime(getCreatedOn().getTime());
}

/**
* If aspect version exists in system metadata, return it
*
* @return version of the aspect
*/
default Optional<Long> getSystemMetadataVersion() {
return Optional.ofNullable(getSystemMetadata())
.filter(SystemMetadata::hasVersion)
.map(SystemMetadata::getVersion)
.map(Long::parseLong);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,42 @@
import com.linkedin.metadata.aspect.patch.template.AspectTemplateEngine;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.mxe.MetadataChangeProposal;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/** Represents a proposal to write to the primary data store which may be represented by an MCP */
public interface MCPItem extends BatchItem {

Set<ChangeType> CHANGE_TYPES =
ImmutableSet.of(ChangeType.UPSERT, ChangeType.CREATE, ChangeType.CREATE_ENTITY);
ImmutableSet.of(
ChangeType.UPSERT, ChangeType.UPDATE, ChangeType.CREATE, ChangeType.CREATE_ENTITY);

@Nullable
MetadataChangeProposal getMetadataChangeProposal();

@Nonnull
default Map<String, String> getHeaders() {
if (getMetadataChangeProposal() != null && getMetadataChangeProposal().getHeaders() != null) {
return getMetadataChangeProposal().getHeaders();
}
return Collections.emptyMap();
}

default boolean hasHeader(@Nonnull String headerName) {
return getHeaders().keySet().stream().anyMatch(hdr -> hdr.equalsIgnoreCase(headerName));
}

default Optional<String> getHeader(@Nonnull String headerName) {
return getHeaders().entrySet().stream()
.filter(entry -> entry.getKey().equalsIgnoreCase(headerName))
.map(Map.Entry::getValue)
.findAny();
}

/**
* Validates that a change type is valid for the given aspect
*
Expand Down
Loading

0 comments on commit 5327f80

Please sign in to comment.