From 0e21edac539c07a4a22835d82d65df5b45f5b57f Mon Sep 17 00:00:00 2001 From: momo-jun Date: Fri, 28 Oct 2022 17:05:28 +0800 Subject: [PATCH 01/10] re-org the schema chapter --- site2/docs/admin-api-schemas.md | 605 ++++++++++++++- site2/docs/schema-evolution-compatibility.md | 122 +-- site2/docs/schema-get-started.md | 322 ++++++-- site2/docs/schema-manage.md | 766 +------------------ site2/docs/schema-overview.md | 153 ++++ site2/docs/schema-understand.md | 495 +++--------- 6 files changed, 1171 insertions(+), 1292 deletions(-) create mode 100644 site2/docs/schema-overview.md diff --git a/site2/docs/admin-api-schemas.md b/site2/docs/admin-api-schemas.md index 8399a03b8723f..73e8ad9613131 100644 --- a/site2/docs/admin-api-schemas.md +++ b/site2/docs/admin-api-schemas.md @@ -1,6 +1,609 @@ --- id: admin-api-schemas -title: Managing Schemas +title: Manage Schemas sidebar_label: "Schemas" --- + +````mdx-code-block +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; +```` + +:::tip + +This page only shows **some frequently used operations**. + +- For the latest and complete information about `Pulsar admin`, including commands, flags, descriptions, and more, see [Pulsar admin doc](/tools/pulsar-admin/) + +- For the latest and complete information about `REST API`, including parameters, responses, samples, and more, see {@inject: rest:REST:/} API doc. + +- For the latest and complete information about `Java admin API`, including classes, methods, descriptions, and more, see [Java admin API doc](/api/admin/). + +::: + +## Manage AutoUpdate strategy + +### Enable AutoUpdate + +To enable `AutoUpdate` on a namespace, you can use the `pulsar-admin` command. + +```bash +bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace +``` + +### Disable AutoUpdate + +To disable `AutoUpdate` on a namespace, you can use the `pulsar-admin` command. 
+ +```bash +bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace +``` + +Once the `AutoUpdate` is disabled, you can only register a new schema using the `pulsar-admin` command. + +### Adjust compatibility + +To adjust the schema compatibility level on a namespace, you can use the `pulsar-admin` command. + +```bash +bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility tenant/namespace +``` + +## Schema validation + +### Enable schema validation + +To enable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command. + +```bash +bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace +``` + +### Disable schema validation + +To disable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command. + +```bash +bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace +``` + +## Schema manual management + +### Upload a schema + +To upload (register) a new schema for a topic, you can use one of the following methods. + +````mdx-code-block + + + + +Use the `upload` subcommand. + +```bash +pulsar-admin schemas upload --filename +``` + +The `schema-definition-file` is in JSON format. + +```json +{ + "type": "", + "schema": "", + "properties": {} // the properties associated with the schema +} +``` + +The `schema-definition-file` includes the following fields: + +| Field | Description | +| --- | --- | +| `type` | The schema type. | +| `schema` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this field should be blank.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition.
  • | +| `properties` | The additional properties associated with the schema. | + +Here are examples of the `schema-definition-file` for a JSON schema. + +**Example 1** + +```json +{ + "type": "JSON", + "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file3\",\"type\":[\"string\",\"null\"],\"default\":\"dfdf\"}]}", + "properties": {} +} +``` + +**Example 2** + +```json +{ + "type": "STRING", + "schema": "", + "properties": { + "key1": "value1" + } +} +``` + +
    + + +Send a `POST` request to this endpoint: {@inject: endpoint|POST|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/uploadSchema?version=@pulsar:version_number@} + +The post payload is in JSON format. + +```json +{ + "type": "", + "schema": "", + "properties": {} // the properties associated with the schema +} +``` + +The post payload includes the following fields: + +| Field | Description | +| --- | --- | +| `type` | The schema type. | +| `schema` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this field should be blank.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition.
  • | +| `properties` | The additional properties associated with the schema. | + +
    + + +```java +void createSchema(String topic, PostSchemaPayload schemaPayload) +``` + +The `PostSchemaPayload` includes the following fields: + +| Field | Description | +| --- | --- | +| `type` | The schema type. | +| `schema` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this field should be blank.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition.
  • | +| `properties` | The additional properties associated with the schema. | + +Here is an example of `PostSchemaPayload`: + +```java +PulsarAdmin admin = …; + +PostSchemaPayload payload = new PostSchemaPayload(); +payload.setType("INT8"); +payload.setSchema(""); + +admin.createSchema("my-tenant/my-ns/my-topic", payload); +``` + +
    + +
    +```` + +### Get a schema (latest) + +To get the latest schema for a topic, you can use one of the following methods. + +````mdx-code-block + + + + +Use the `get` subcommand. + +```bash +pulsar-admin schemas get + +{ + "version": 0, + "type": "String", + "timestamp": 0, + "data": "string", + "properties": { + "property1": "string", + "property2": "string" + } +} +``` + + + + +Send a `GET` request to this endpoint: {@inject: endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/getSchema?version=@pulsar:version_number@} + +Here is an example of a response, which is returned in JSON format. + +```json +{ + "version": "", + "type": "", + "timestamp": "", + "data": "", + "properties": {} // the properties associated with the schema +} +``` + +The response includes the following fields: + +| Field | Description | +| --- | --- | +| `version` | The schema version, which is a long number. | +| `type` | The schema type. | +| `timestamp` | The timestamp of creating this version of schema. | +| `data` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this field should be blank.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition.
  • | +| `properties` | The additional properties associated with the schema. | + +
	 + + +```java +SchemaInfo getSchema(String topic) +``` + +The `SchemaInfo` includes the following fields: + +| Field | Description | +| --- | --- | +| `name` | The schema name. | +| `type` | The schema type. | +| `schema` | A byte array of the schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this byte array should be empty.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition converted to a byte array.
  • | +| `properties` | The additional properties associated with the schema. | + +Here is an example of `SchemaInfo`: + +```java +PulsarAdmin admin = …; + +SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic"); +``` + +
    + +
    +```` + +### Get a schema (specific) + +To get a specific version of a schema, you can use one of the following methods. + +````mdx-code-block + + + + +Use the `get` subcommand. + +```bash +pulsar-admin schemas get --version= +``` + + + + +Send a `GET` request to a schema endpoint: {@inject: endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema/:version|operation/getSchema?version=@pulsar:version_number@} + +Here is an example of a response, which is returned in JSON format. + +```json +{ + "version": "", + "type": "", + "timestamp": "", + "data": "", + "properties": {} // the properties associated with the schema +} +``` + +The response includes the following fields: + +| Field | Description | +| --- | --- | +| `version` | The schema version, which is a long number. | +| `type` | The schema type. | +| `timestamp` | The timestamp of creating this version of schema. | +| `data` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this field should be blank.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition.
  • | +| `properties` | The additional properties associated with the schema. | + +
	 + + +```java +SchemaInfo getSchema(String topic, long version) +``` + +The `SchemaInfo` includes the following fields: + +| Field | Description | +| --- | --- | +| `name` | The schema name. | +| `type` | The schema type. | +| `schema` | A byte array of the schema definition data, which is encoded in UTF 8.
  • If the schema is a
  • **primitive**
  • schema, this byte array should be empty.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition converted to a byte array.
  • | +| `properties` | The additional properties associated with the schema. | + +Here is an example of `SchemaInfo`: + +```java +PulsarAdmin admin = …; + +SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic", 1L); +``` + +
    + +
    +```` + +### Extract a schema + +To provide a schema via a topic, you can use the following method. + +````mdx-code-block + + + + +Use the `extract` subcommand. + +```bash +pulsar-admin schemas extract --classname --jar --type +``` + + + + +```` + +### Delete a schema + +To delete a schema for a topic, you can use one of the following methods. + +:::note + +In any case, the **delete** action deletes **all versions** of a schema registered for a topic. + +::: + +````mdx-code-block + + + + +Use the `delete` subcommand. + +```bash +pulsar-admin schemas delete +``` + + + + +Send a `DELETE` request to a schema endpoint: {@inject: endpoint|DELETE|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/deleteSchema?version=@pulsar:version_number@} + +Here is an example of a response, which is returned in JSON format. + +```json +{ + "version": "", +} +``` + +The response includes the following field: + +Field | Description | +---|---| +`version` | The schema version, which is a long number. | + + + + +```java +void deleteSchema(String topic) +``` + +Here is an example of deleting a schema. + +```java +PulsarAdmin admin = …; + +admin.deleteSchema("my-tenant/my-ns/my-topic"); +``` + + + + +```` + +## Set schema compatibility check strategy + +You can set [schema compatibility check strategy](schema-evolution-compatibility.md#schema-compatibility-check-strategy) at the topic, namespace or broker level. + +The schema compatibility check strategy set at different levels has priority: topic level > namespace level > broker level. + +- If you set the strategy at both topic and namespace level, it uses the topic-level strategy. + +- If you set the strategy at both namespace and broker level, it uses the namespace-level strategy. + +- If you do not set the strategy at any level, it uses the `FULL` strategy. For all available values, see [here](schema-evolution-compatibility.md#schema-compatibility-check-strategy). 
+ + +### Topic level + +To set a schema compatibility check strategy at the topic level, use one of the following methods. + +````mdx-code-block + + + + +Use the [`pulsar-admin topicPolicies set-schema-compatibility-strategy`](/tools/pulsar-admin/) command. + +```shell +pulsar-admin topicPolicies set-schema-compatibility-strategy +``` + + + + +Send a `PUT` request to this endpoint: {@inject: endpoint|PUT|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@} + + + + +```java +void setSchemaCompatibilityStrategy(String topic, SchemaCompatibilityStrategy strategy) +``` + +Here is an example of setting a schema compatibility check strategy at the topic level. + +```java +PulsarAdmin admin = …; + +admin.topicPolicies().setSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE); +``` + + + + +```` + +To get the topic-level schema compatibility check strategy, use one of the following methods. + +````mdx-code-block + + + + +Use the [`pulsar-admin topicPolicies get-schema-compatibility-strategy`](/tools/pulsar-admin/) command. + +```shell +pulsar-admin topicPolicies get-schema-compatibility-strategy +``` + + + + +Send a `GET` request to this endpoint: {@inject: endpoint|GET|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@} + + + + +```java +SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String topic, boolean applied) +``` + +Here is an example of getting the topic-level schema compatibility check strategy. 
+ +```java +PulsarAdmin admin = …; + +// get the current applied schema compatibility strategy +admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", true); + +// only get the schema compatibility strategy from topic policies +admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", false); +``` + + + + +```` + +To remove the topic-level schema compatibility check strategy, use one of the following methods. + +````mdx-code-block + + + + +Use the [`pulsar-admin topicPolicies remove-schema-compatibility-strategy`](/tools/pulsar-admin/) command. + +```shell +pulsar-admin topicPolicies remove-schema-compatibility-strategy +``` + + + + +Send a `DELETE` request to this endpoint: {@inject: endpoint|DELETE|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@} + + + + +```java +void removeSchemaCompatibilityStrategy(String topic) +``` + +Here is an example of removing the topic-level schema compatibility check strategy. + +```java +PulsarAdmin admin = …; + +admin.removeSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic"); +``` + + + + +```` + +### Namespace level + +You can set schema compatibility check strategy at namespace level using one of the following methods. + +````mdx-code-block + + + + +Use the [`pulsar-admin namespaces set-schema-compatibility-strategy`](/tools/pulsar-admin/) command. + +```shell +pulsar-admin namespaces set-schema-compatibility-strategy options +``` + + + + +Send a `PUT` request to this endpoint: {@inject: endpoint|PUT|/admin/v2/namespaces/:tenant/:namespace|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@} + + + + +Use the [`setSchemaCompatibilityStrategy`](/api/admin/) method. 
+ +```java +admin.namespaces().setSchemaCompatibilityStrategy("test", SchemaCompatibilityStrategy.FULL); +``` + + + + +```` + +### Broker level + +You can set schema compatibility check strategy at broker level by setting `schemaCompatibilityStrategy` in `conf/broker.conf` or `conf/standalone.conf` file. + +```conf +schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE +``` \ No newline at end of file diff --git a/site2/docs/schema-evolution-compatibility.md b/site2/docs/schema-evolution-compatibility.md index 7e9d48f381d21..8bb9565a64f0c 100644 --- a/site2/docs/schema-evolution-compatibility.md +++ b/site2/docs/schema-evolution-compatibility.md @@ -6,29 +6,21 @@ sidebar_label: "Schema evolution and compatibility" Normally, schemas do not stay the same over a long period of time. Instead, they undergo evolutions to satisfy new needs. -This chapter examines how Pulsar schema evolves and what Pulsar schema compatibility check strategies are. +This chapter introduces how Pulsar schema evolves and what compatibility check strategies it adopts. ## Schema evolution -Pulsar schema is defined in a data structure called `SchemaInfo`. - -Each `SchemaInfo` stored with a topic has a version. The version is used to manage the schema changes happening within a topic. - The message produced with `SchemaInfo` is tagged with a schema version. When a message is consumed by a Pulsar client, the Pulsar client can use the schema version to retrieve the corresponding `SchemaInfo` and use the correct schema information to deserialize data. -### What is schema evolution? - Schemas store the details of attributes and types. To satisfy new business requirements, you need to update schemas inevitably over time, which is called **schema evolution**. Any schema changes affect downstream consumers. Schema evolution ensures that the downstream consumers can seamlessly handle data encoded with both old schemas and new schemas. -### How Pulsar schema should evolve? 
- -The answer is Pulsar schema compatibility check strategy. It determines how schema compares old schemas with new schemas in topics. +### How schema evolves? -For more information, see [Schema compatibility check strategy](#schema-compatibility-check-strategy). +The answer is [schema compatibility check strategy](#schema-compatibility-check-strategy). It determines how schema compares old schemas with new schemas in topics. -### How does Pulsar support schema evolution? +### How Pulsar supports schema evolution? The process of how Pulsar supports schema evolution is described as follows. @@ -53,31 +45,31 @@ For more details, see [`schemaRegistryCompatibilityCheckers`](https://github.com ## Schema compatibility check strategy -Pulsar has 8 schema compatibility check strategies, which are summarized in the following table. +The following table outlines 8 schema compatibility check strategies and how it works. -Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is the oldest and V3 is the latest: +Suppose that you have a topic containing three schemas (V1, V2, and V3). V1 is the oldest and V3 is the latest. | Compatibility check strategy | Definition | Changes allowed | Check against which schema | Upgrade first | | --- | --- | --- | --- | --- | | `ALWAYS_COMPATIBLE` | Disable schema compatibility check. | All changes are allowed | All previous versions | Any order | -| `ALWAYS_INCOMPATIBLE` | Disable schema evolution. | All changes are disabled | None | None | -| `BACKWARD` | Consumers using schema V3 can process data written by producers using schema V3 or V2. |
  • Add optional fields
  • Delete fields
  • | Latest version | Consumers | -| `BACKWARD_TRANSITIVE` | Consumers using schema V3 can process data written by producers using schema V3, V2 or V1. |
  • Add optional fields
  • Delete fields
  • | All previous versions | Consumers | -| `FORWARD` | Consumers using schema V3 or V2 can process data written by producers using schema V3. |
  • Add fields
  • Delete optional fields
  • | Latest version | Producers | -| `FORWARD_TRANSITIVE` | Consumers using schema V3, V2, or V1 can process data written by producers using schema V3. |
  • Add fields
  • Delete optional fields
  • | All previous versions | Producers | -| `FULL` | Backward and forward compatible between the schema V3 and V2. |
  • Modify optional fields
  • | Latest version | Any order | -| `FULL_TRANSITIVE` | Backward and forward compatible among schema V3, V2, and V1. |
  • Modify optional fields
  • | All previous versions | Any order | - -### ALWAYS_COMPATIBLE and ALWAYS_INCOMPATIBLE - -| Compatibility check strategy | Definition | Note | -| --- | --- | --- | -| `ALWAYS_COMPATIBLE` | Disable schema compatibility check. | None | -| `ALWAYS_INCOMPATIBLE` | Disable schema evolution, that is, any schema change is rejected. |
  • For all schema types except Avro and JSON, the default schema compatibility check strategy is `ALWAYS_INCOMPATIBLE`.
  • For Avro and JSON, the default schema compatibility check strategy is `FULL`.
  • | - -#### Example - -* Example 1 +| `ALWAYS_INCOMPATIBLE` | Disable schema evolution, that is, any schema change is rejected. | All changes are disabled | None | None | +| `BACKWARD` | Consumers using schema V3 can process data written by producers using the **last schema version** V2. |
  • Add optional fields
  • Delete fields
  • | Latest version | Consumers | +| `BACKWARD_TRANSITIVE` | Consumers using schema V3 can process data written by producers using **all previous schema versions** V2 and V1. |
  • Add optional fields
  • Delete fields
  • | All previous versions | Consumers | +| `FORWARD` | Consumers using the **last schema version** V2 can process data written by producers using a new schema V3, even though they may not be able to use the full capabilities of the new schema. |
  • Add fields
  • Delete optional fields
  • | Latest version | Producers | +| `FORWARD_TRANSITIVE` | Consumers using **all previous schema versions** V2 or V1 can process data written by producers using a new schema V3. |
  • Add fields
  • Delete optional fields
  • | All previous versions | Producers | +| `FULL` | Schemas are both backward and forward compatible.
  • Consumers using the last schema V2 can process data written by producers using the new schema V3.
  • Consumers using the new schema V3 can process data written by producers using the last schema V2.
  • |
  • Modify optional fields
  • | Latest version | Any order | +| `FULL_TRANSITIVE` | Backward and forward compatible among schema V3, V2, and V1.
  • Consumers using the schema V3 can process data written by producers using schema V2 and V1.
  • Consumers using the schema V2 or V1 can process data written by producers using the schema V3.
  • |
  • Modify optional fields
  • | All previous versions | Any order | + +:::tip + +* The default schema compatibility check strategy varies depending on schema types. + * For Avro and JSON, the default one is `FULL`. + * For others, the default one is `ALWAYS_INCOMPATIBLE`. +* You can set schema compatibility check strategy at the topic, namespace or broker level. For how to set the strategy, see [here](admin-api-schemas.md#set-schema-compatibility-check-strategy). + +::: + +#### ALWAYS_COMPATIBLE example In some situations, an application needs to store events of several different types in the same Pulsar topic. @@ -87,11 +79,9 @@ Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is t Consequently, those events need to go in the same Pulsar partition to maintain order. This application can use `ALWAYS_COMPATIBLE` to allow different kinds of events to co-exist in the same topic. -* Example 2 - - Sometimes we also make incompatible changes. +#### ALWAYS_INCOMPATIBLE example - For example, you are modifying a field type from `string` to `int`. + Sometimes we also make incompatible changes. For example, you are modifying a field type from `string` to `int`. In this case, you need to: @@ -99,16 +89,7 @@ Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is t * Optionally, create a new topic and start migrating applications to use the new topic and the new schema, avoiding the need to handle two incompatible versions in the same topic. 
-### BACKWARD and BACKWARD_TRANSITIVE - -Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is the oldest and V3 is the latest: - -| Compatibility check strategy | Definition | Description | -|------------------------------|------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------| -| `BACKWARD` | Consumers using the new schema can process data written by producers using the **last schema**. | The consumers using the schema V3 can process data written by producers using the schema V3 or V2. | -| `BACKWARD_TRANSITIVE` | Consumers using the new schema can process data written by producers using **all previous schemas**. | The consumers using the schema V3 can process data written by producers using the schema V3, V2, or V1. | - -#### Example +#### BACKWARD and BACKWARD_TRANSITIVE example * Example 1 @@ -122,17 +103,8 @@ Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is t Same SQL queries must continue to work even if the data is changed. To support it, you can evolve the schemas using the `BACKWARD` strategy. -### FORWARD and FORWARD_TRANSITIVE +#### FORWARD and FORWARD_TRANSITIVE example -Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is the oldest and V3 is the latest: - -| Compatibility check strategy | Definition | Description | -|------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------| -| `FORWARD` | Consumers using the **last schema** can process data written by producers using a new schema, even though they may not be able to use the full capabilities of the new schema. 
| The consumers using the schema V3 or V2 can process data written by producers using the schema V3. | -| `FORWARD_TRANSITIVE` | Consumers using **all previous schemas** can process data written by producers using a new schema. | The consumers using the schema V3, V2, or V1 can process data written by producers using the schema V3. | - -#### Example - * Example 1 Add a field. @@ -147,40 +119,28 @@ Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is t Consequently, you can evolve the schemas using the `FORWARD` strategy to ensure that the old schema can process data encoded with the new schema. -### FULL and FULL_TRANSITIVE - -Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is the oldest and V3 is the latest: - -| Compatibility check strategy | Definition | Description | Note | -| --- | --- | --- | --- | -| `FULL` | Schemas are both backward and forward compatible, which means: Consumers using the last schema can process data written by producers using the new schema. AND Consumers using the new schema can process data written by producers using the last schema. | Consumers using the schema V3 can process data written by producers using the schema V3 or V2. AND Consumers using the schema V3 or V2 can process data written by producers using the schema V3. |
  • For Avro and JSON, the default schema compatibility check strategy is `FULL`.
  • For all schema types except Avro and JSON, the default schema compatibility check strategy is `ALWAYS_INCOMPATIBLE`.
  • | -| `FULL_TRANSITIVE` | The new schema is backward and forward compatible with all previously registered schemas. | Consumers using the schema V3 can process data written by producers using the schema V3, V2 or V1. AND Consumers using the schema V3, V2 or V1 can process data written by producers using the schema V3. | None | - -#### Example +#### FULL and FULL_TRANSITIVE example In some data formats, for example, Avro, you can define fields with default values. Consequently, adding or removing a field with a default value is a fully compatible change. -:::tip -You can set schema compatibility check strategy at the topic, namespace or broker level. For how to set the strategy, see [here](schema-manage.md#set-schema-compatibility-check-strategy). - -::: - -## Schema verification +## Schema validation When a producer or a consumer tries to connect to a topic, a broker performs some checks to verify a schema. -### Producer +### Validation on producers -When a producer tries to connect to a topic (suppose ignore the schema auto-creation), a broker does the following checks: +By default, `schemaValidationEnforced` is **disabled** for producers, which means: +* A producer without a schema can produce any kind of messages to a topic with schemas, which may result in producing trash data to the topic. +* It allows non-java language clients that don’t support schema can produce messages to a topic with schemas. -* Check if the schema carried by the producer exists in the schema registry or not. +However, if you want a stronger guarantee on the topics with schemas, you can enable `schemaValidationEnforced` across the whole cluster or on a per-namespace basis. +With `schemaValidationEnforced` enabled, When a producer tries to connect to a topic (suppose ignore the schema auto-creation), the broker checks if the schema carried by the producer exists in the schema registry or not. 
* If the schema is already registered, then the producer is connected to a broker and produces messages with that schema. - * If the schema is not registered, then Pulsar verifies if the schema is allowed to be registered based on the configured compatibility check strategy. -### Consumer +### Validation on consumers When a consumer tries to connect to a topic, a broker checks if a carried schema is compatible with a registered schema based on the configured schema compatibility check strategy. @@ -197,7 +157,7 @@ When a consumer tries to connect to a topic, a broker checks if a carried schema ## Order of upgrading clients -The order of upgrading client applications is determined by the compatibility check strategy. +The order of upgrading client applications is determined by the [schema compatibility check strategy](#schema-compatibility-check-strategy). For example, the producers use schemas to write data to Pulsar and the consumers use schemas to read data from Pulsar. @@ -207,8 +167,4 @@ For example, the producers use schemas to write data to Pulsar and the consumers | `ALWAYS_INCOMPATIBLE` | None | The schema evolution is disabled. | |
  • `BACKWARD`
  • `BACKWARD_TRANSITIVE`
  • | Consumers | There is no guarantee that consumers using the old schema can read data produced using the new schema. Consequently, **upgrade all consumers first**, and then start producing new data. | |
  • `FORWARD`
  • `FORWARD_TRANSITIVE`
  • | Producers | There is no guarantee that consumers using the new schema can read data produced using the old schema. Consequently, **upgrade all producers first**
  • to use the new schema and ensure that the data already produced using the old schemas are not available to consumers, and then upgrade the consumers.
  • | -|
  • `FULL`
  • `FULL_TRANSITIVE`
  • | Any order | It is guaranteed that consumers using the old schema can read data produced using the new schema and consumers using the new schema can read data produced using the old schema. Consequently, you can upgrade the producers and consumers in **any order**. | - - - - +|
  • `FULL`
  • `FULL_TRANSITIVE`
  • | Any order | It is guaranteed that consumers using the old schema can read data produced using the new schema and consumers using the new schema can read data produced using the old schema. Consequently, you can upgrade the producers and consumers in **any order**. | \ No newline at end of file diff --git a/site2/docs/schema-get-started.md b/site2/docs/schema-get-started.md index be7a288dc7da9..eaf39d86dc7db 100644 --- a/site2/docs/schema-get-started.md +++ b/site2/docs/schema-get-started.md @@ -4,92 +4,318 @@ title: Get started sidebar_label: "Get started" --- -This chapter introduces Pulsar schemas and explains why they are important. -## Schema Registry +````mdx-code-block +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; +```` -Type safety is extremely important in any application built around a message bus like Pulsar. -Producers and consumers need some kind of mechanism for coordinating types at the topic level to avoid various potential problems arising. For example, serialization and deserialization issues. +This hands-on tutorial provides instructions and examples on how to construct and customize schemas. -Applications typically adopt one of the following approaches to guarantee type safety in messaging. Both approaches are available in Pulsar, and you're free to adopt one or the other or to mix and match on a per-topic basis. +## Construct a string schema -#### Note -> -> Currently, the Pulsar schema registry is only available for the [Java client](client-libraries-java.md), [Go client](client-libraries-go.md), [Python client](client-libraries-python.md), and [C++ client](client-libraries-cpp.md). +This example demonstrates how to construct a [string schema](schema-understand.md#primitive-type) and use it to produce and consume messages in Java. -### Client-side approach +1. Create a producer with a string schema and send messages. 
-Producers and consumers are responsible for not only serializing and deserializing messages (which consist of raw bytes) but also "knowing" which types are being transmitted via which topics. + ```java + Producer producer = client.newProducer(Schema.STRING).create(); + producer.newMessage().value("Hello Pulsar!").send(); + ``` -If a producer is sending temperature sensor data on the topic `topic-1`, consumers of that topic will run into trouble if they attempt to parse that data as moisture sensor readings. +2. Create a consumer with a string schema and receive messages. -Producers and consumers can send and receive messages consisting of raw byte arrays and leave all type safety enforcement to the application on an "out-of-band" basis. + ```java + Consumer consumer = client.newConsumer(Schema.STRING).subscribe(); + consumer.receive(); + ``` -### Server-side approach +## Construct a key/value schema -Producers and consumers inform the system which data types can be transmitted via the topic. +This example shows how to construct a [key/value schema](schema-understand.md#keyvalue-schema) and use it to produce and consume messages in Java. -With this approach, the messaging system enforces type safety and ensures that producers and consumers remain synced. +1. Construct a key/value schema with `INLINE` encoding type. -Pulsar has a built-in **schema registry** that enables clients to upload data schemas on a per-topic basis. Those schemas dictate which data types are recognized as valid for that topic. + ```java + Schema> kvSchema = Schema.KeyValue( + Schema.INT32, + Schema.STRING, + KeyValueEncodingType.INLINE + ); + ``` -## Why use schema +2. Optionally, construct a key/value schema with `SEPARATED` encoding type. -When a schema is enabled, Pulsar does parse data, it takes bytes as inputs and sends bytes as outputs. 
While data has meaning beyond bytes, you need to parse data and might encounter parse exceptions which mainly occur in the following situations: + ```java + Schema> kvSchema = Schema.KeyValue( + Schema.INT32, + Schema.STRING, + KeyValueEncodingType.SEPARATED + ); + ``` -* The field does not exist +3. Produce messages using a key/value schema. -* The field type has changed (for example, `string` is changed to `int`) + ```java + Schema> kvSchema = Schema.KeyValue( + Schema.INT32, + Schema.STRING, + KeyValueEncodingType.SEPARATED + ); -There are a few methods to prevent and overcome these exceptions, for example, you can catch exceptions when parsing errors, which makes code hard to maintain; or you can adopt a schema management system to perform schema evolution, not to break downstream applications, and enforces type safety to max extend in the language you are using, the solution is Pulsar Schema. + Producer> producer = client.newProducer(kvSchema) + .topic(TOPIC) + .create(); -Pulsar schema enables you to use language-specific types of data when constructing and handling messages from simple types like `string` to more complex application-specific types. + final int key = 100; + final String value = "value-100"; + + // send the key/value message + producer.newMessage() + .value(new KeyValue(key, value)) + .send(); + ``` + +4. Consume messages using a key/value schema. + + ```java + Schema> kvSchema = Schema.KeyValue( + Schema.INT32, + Schema.STRING, + KeyValueEncodingType.SEPARATED + ); + + Consumer> consumer = client.newConsumer(kvSchema) + ... + .topic(TOPIC) + .subscriptionName(SubscriptionName).subscribe(); + + // receive key/value pair + Message> msg = consumer.receive(); + KeyValue kv = msg.getValue(); + ``` + +## Construct a struct schema + +This example shows how to construct a [struct schema](schema-understand.md#struct-schema) and use it to produce and consume messages using different methods. 
+ +````mdx-code-block + + + + +You can predefine the `struct` schema, which can be a POJO in Java, a `struct` in Go, or classes generated by Avro or Protobuf tools. + +**Example** + +Pulsar gets the schema definition from the predefined `struct` using an Avro library. The schema definition is the schema data stored as a part of the `SchemaInfo`. + +1. Create the _User_ class to define the messages sent to Pulsar topics. + + ```java + @Builder + @AllArgsConstructor + @NoArgsConstructor + public static class User { + String name; + int age; + } + ``` + +2. Create a producer with a `struct` schema and send messages. + + ```java + Producer producer = client.newProducer(Schema.AVRO(User.class)).create(); + producer.newMessage().value(User.builder().name("pulsar-user").age(1).build()).send(); + ``` + +3. Create a consumer with a `struct` schema and receive messages + + ```java + Consumer consumer = client.newConsumer(Schema.AVRO(User.class)).subscribe(); + User user = consumer.receive().getValue(); + ``` + + + + +Sometimes applications do not have pre-defined structs, and you can use this method to define schema and access data. + +You can define the `struct` schema using the `GenericSchemaBuilder`, generate a generic struct using `GenericRecordBuilder` and consume messages into `GenericRecord`. + +**Example** + +1. Use `RecordSchemaBuilder` to build a schema. + + ```java + RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName"); + recordSchemaBuilder.field("intField").type(SchemaType.INT32); + SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO); + + Producer producer = client.newProducer(Schema.generic(schemaInfo)).create(); + ``` + +2. Use `RecordBuilder` to build the struct records. + + ```java + producer.newMessage().value(schema.newRecordBuilder() + .set("intField", 32) + .build()).send(); + ``` + + + + +You can define the `schemaDefinition` to generate a `struct` schema. 
**Example**

-You can use the _User_ class to define the messages sent to Pulsar topics.
+1. Create the _User_ class to define the messages sent to Pulsar topics.
+
+   ```java
+   @Builder
+   @AllArgsConstructor
+   @NoArgsConstructor
+   public static class User {
+       String name;
+       int age;
+   }
+   ```
+
+2. Create a producer with a `SchemaDefinition` and send messages.
+
+   ```java
+   SchemaDefinition<User> schemaDefinition = SchemaDefinition.<User>builder().withPojo(User.class).build();
+   Producer<User> producer = client.newProducer(Schema.AVRO(schemaDefinition)).create();
+   producer.newMessage().value(User.builder().name("pulsar-user").age(1).build()).send();
+   ```
+
+3. Create a consumer with a `SchemaDefinition` schema and receive messages.
+
+   ```java
+   SchemaDefinition<User> schemaDefinition = SchemaDefinition.<User>builder().withPojo(User.class).build();
+   Consumer<User> consumer = client.newConsumer(Schema.AVRO(schemaDefinition)).subscribe();
+   User user = consumer.receive().getValue();
+   ```
+
+</TabItem>
+
+</Tabs>
+````
+
+## Construct an AUTO_PRODUCE schema
+
+Suppose you have a Pulsar topic _P_, a producer processing messages from a Kafka topic _K_, and an application reading the messages from _K_ and writing the messages to _P_.
+
+This example shows how to construct an [AUTO_PRODUCE](schema-understand.md#auto-schema) schema to verify whether the bytes produced by _K_ can be sent to _P_.

```java
-public class User {
-    String name;
-    int age;
-}
+Producer<byte[]> pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE())
+    …
+    .create();
+
+byte[] kafkaMessageBytes = … ;
+
+pulsarProducer.produce(kafkaMessageBytes);
+```
+
+## Construct an AUTO_CONSUME schema
+
+Suppose you have a Pulsar topic _P_, a consumer (for example, _MySQL_) receiving messages from the topic _P_, and an application reading the messages from _P_ and writing the messages to _MySQL_.
+
+This example shows how to construct an [AUTO_CONSUME schema](schema-understand.md#auto-schema) to verify whether the bytes produced by _P_ can be sent to _MySQL_.

+```java
+Consumer<GenericRecord> pulsarConsumer = client.newConsumer(Schema.AUTO_CONSUME())
+    …
+    .subscribe();
+
+Message<GenericRecord> msg = pulsarConsumer.receive();
+GenericRecord record = msg.getValue();
+```
+
+## Construct a native Avro schema
+
+This example shows how to construct a [native Avro schema](schema-understand.md#native-avro-schema).
+
+```java
+org.apache.avro.Schema nativeAvroSchema = … ;
+
+Producer<byte[]> producer = pulsarClient.newProducer().topic("ingress").create();
+
+byte[] content = … ;
+
+producer.newMessage(Schema.NATIVE_AVRO(nativeAvroSchema)).value(content).send();
+```

-When constructing a producer with the _User_ class, you can specify a schema or not as below.
+## Customize schema storage

-### Without schema
+By default, Pulsar stores various data types of schemas in [Apache BookKeeper](https://bookkeeper.apache.org) deployed alongside Pulsar. Alternatively, you can use another storage system if needed.

-If you construct a producer without specifying a schema, then the producer can only produce messages of type `byte[]`. If you have a POJO class, you need to serialize the POJO into bytes before sending messages.
+To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces:
+* [SchemaStorage interface](#schemastorage-interface)
+* [SchemaStorageFactory interface](#schemastoragefactory-interface)

-**Example**
+### Implement SchemaStorage interface
+
+The `SchemaStorage` interface has the following methods:

```java
-Producer producer = client.newProducer()
-        .topic(topic)
-        .create();
-User user = new User("Tom", 28);
-byte[] message = … // serialize the `user` by yourself;
-producer.send(message);
+public interface SchemaStorage {
+    // How schemas are updated
+    CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
+
+    // How schemas are fetched from storage
+    CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
+
+    // How schemas are deleted
+    CompletableFuture<SchemaVersion> delete(String key);
+
+    // Utility method for converting a schema version byte array to a SchemaVersion object
+    SchemaVersion versionFromBytes(byte[] version);
+
+    // Startup behavior for the schema storage client
+    void start() throws Exception;
+
+    // Shutdown behavior for the schema storage client
+    void close() throws Exception;
+}
```

-### With schema
+:::tip

-If you construct a producer with specifying a schema, then you can send a class to a topic directly without worrying about how to serialize POJOs into bytes.
+For a complete example of **schema storage** implementation, see [BookKeeperSchemaStorage](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java) class.

-**Example**
+:::

-This example constructs a producer with the _JSONSchema_, and you can send the _User_ class to topics directly without worrying about how to serialize it into bytes.
+### Implement SchemaStorageFactory interface + +The `SchemaStorageFactory` interface has the following method: ```java -Producer producer = client.newProducer(JSONSchema.of(User.class)) - .topic(topic) - .create(); -User user = new User("Tom", 28); -producer.send(user); +public interface SchemaStorageFactory { + @NotNull + SchemaStorage create(PulsarService pulsar) throws Exception; +} ``` -### Summary +:::tip + +For a complete example of **schema storage factory** implementation, see [BookKeeperSchemaStorageFactory](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java) class. + +::: + +### Deploy custom schema storage + +To use your custom schema storage implementation, perform the following steps. -When constructing a producer with a schema, you do not need to serialize messages into bytes, instead Pulsar schema does this job in the background. +1. Package the implementation in a [JAR](https://docs.oracle.com/javase/tutorial/deployment/jar/basicsindex.html) file. + +2. Add the JAR file to the `lib` folder in your Pulsar binary or source distribution. + +3. Change the `schemaRegistryStorageClassName` configuration in the `conf/broker.conf` file to your custom factory class. + +4. Start Pulsar. 
diff --git a/site2/docs/schema-manage.md b/site2/docs/schema-manage.md index 8c65ac4d7b488..69311d1077040 100644 --- a/site2/docs/schema-manage.md +++ b/site2/docs/schema-manage.md @@ -5,767 +5,7 @@ sidebar_label: "Manage schema" --- ````mdx-code-block -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; -```` - - -This guide demonstrates the ways to manage schemas: - -* Automatically - - * [Schema AutoUpdate](#schema-autoupdate) - -* Manually - - * [Schema manual management](#schema-manual-management) - - * [Custom schema storage](#custom-schema-storage) - -## Schema AutoUpdate - -If a schema passes the schema compatibility check, Pulsar producer automatically updates this schema to the topic it produces by default. - -### AutoUpdate for producer - -For a producer, the `AutoUpdate` happens in the following cases: - -* If a **topic doesn’t have a schema**, Pulsar registers a schema automatically. - -* If a **topic has a schema**: - - * If a **producer doesn’t carry a schema**: - - * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is **disabled** in the namespace to which the topic belongs, the producer is allowed to connect to the topic and produce data. - - * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is **enabled** in the namespace to which the topic belongs, the producer is rejected and disconnected. - - * If a **producer carries a schema**: - - A broker performs the compatibility check based on the configured compatibility check strategy of the namespace to which the topic belongs. - - * If the schema is registered, a producer is connected to a broker. - - * If the schema is not registered: - - * If `isAllowAutoUpdateSchema` sets to **false**, the producer is rejected to connect to a broker. - - * If `isAllowAutoUpdateSchema` sets to **true**: - - * If the schema passes the compatibility check, then the broker registers a new schema automatically for the topic and the producer is connected. 
- - * If the schema does not pass the compatibility check, then the broker does not register a schema and the producer is rejected to connect to a broker. - -![AutoUpdate Producer](/assets/schema-producer.png) - -### AutoUpdate for consumer - -For a consumer, the `AutoUpdate` happens in the following cases: - -* If a **consumer connects to a topic without a schema** (which means the consumer receiving raw bytes), the consumer can connect to the topic successfully without doing any compatibility check. - -* If a **consumer connects to a topic with a schema**. - - * If a topic does not have all of them (a schema/data/a local consumer and a local producer): - - * If `isAllowAutoUpdateSchema` sets to **true**, then the consumer registers a schema and it is connected to a broker. - - * If `isAllowAutoUpdateSchema` sets to **false**, then the consumer is rejected to connect to a broker. - - * If a topic has one of them (a schema/data/a local consumer and a local producer), then the schema compatibility check is performed. - - * If the schema passes the compatibility check, then the consumer is connected to the broker. - - * If the schema does not pass the compatibility check, then the consumer is rejected to connect to the broker. - -![AutoUpdate Consumer](/assets/schema-consumer.png) - - -### Manage AutoUpdate strategy - -You can use the `pulsar-admin` command to manage the `AutoUpdate` strategy as below: - -* [Enable AutoUpdate](#enable-autoupdate) - -* [Disable AutoUpdate](#disable-autoupdate) - -* [Adjust compatibility](#adjust-compatibility) - -#### Enable AutoUpdate - -To enable `AutoUpdate` on a namespace, you can use the `pulsar-admin` command. - -```bash -bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace -``` - -#### Disable AutoUpdate - -To disable `AutoUpdate` on a namespace, you can use the `pulsar-admin` command. 
- -```bash -bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace -``` - -Once the `AutoUpdate` is disabled, you can only register a new schema using the `pulsar-admin` command. - -#### Adjust compatibility - -To adjust the schema compatibility level on a namespace, you can use the `pulsar-admin` command. - -```bash -bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility tenant/namespace -``` - -### Schema validation - -By default, `schemaValidationEnforced` is **disabled** for producers: - -* This means a producer without a schema can produce any kind of messages to a topic with schemas, which may result in producing trash data to the topic. - -* This allows non-java language clients that don’t support schema can produce messages to a topic with schemas. - -However, if you want a stronger guarantee on the topics with schemas, you can enable `schemaValidationEnforced` across the whole cluster or on a per-namespace basis. - -#### Enable schema validation - -To enable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command. - -```bash -bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace -``` - -#### Disable schema validation - -To disable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command. - -```bash -bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace -``` - -## Schema manual management - -To manage schemas, you can use one of the following methods. - -| Method | Description | -| --- | --- | -| **Admin CLI**
  • | You can use the `pulsar-admin` tool to manage Pulsar schemas, brokers, clusters, sources, sinks, topics, tenants and so on. For more information about how to use the `pulsar-admin` tool, see [here](/tools/pulsar-admin/). | -| **REST API**
  • | Pulsar exposes schema related management API in Pulsar’s admin RESTful API. You can access the admin RESTful endpoint directly to manage schemas. For more information about how to use the Pulsar REST API, see [here](/admin-rest-api/). | -| **Java Admin API**
  • | Pulsar provides Java admin library. | - -### Upload a schema - -To upload (register) a new schema for a topic, you can use one of the following methods. - -````mdx-code-block - - - - -Use the `upload` subcommand. - -```bash -pulsar-admin schemas upload --filename -``` - -The `schema-definition-file` is in JSON format. - -```json -{ - "type": "", - "schema": "", - "properties": {} // the properties associated with the schema -} -``` - -The `schema-definition-file` includes the following fields: - -| Field | Description | -| --- | --- | -| `type` | The schema type. | -| `schema` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this field should be blank.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition.
  • | -| `properties` | The additional properties associated with the schema. | - -Here are examples of the `schema-definition-file` for a JSON schema. - -**Example 1** - -```json -{ - "type": "JSON", - "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file3\",\"type\":[\"string\",\"null\"],\"default\":\"dfdf\"}]}", - "properties": {} -} -``` - -**Example 2** - -```json -{ - "type": "STRING", - "schema": "", - "properties": { - "key1": "value1" - } -} -``` - -
    - - -Send a `POST` request to this endpoint: {@inject: endpoint|POST|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/uploadSchema?version=@pulsar:version_number@} - -The post payload is in JSON format. - -```json -{ - "type": "", - "schema": "", - "properties": {} // the properties associated with the schema -} -``` - -The post payload includes the following fields: - -| Field | Description | -| --- | --- | -| `type` | The schema type. | -| `schema` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this field should be blank.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition.
  • | -| `properties` | The additional properties associated with the schema. | - -
    - - -```java -void createSchema(String topic, PostSchemaPayload schemaPayload) -``` - -The `PostSchemaPayload` includes the following fields: - -| Field | Description | -| --- | --- | -| `type` | The schema type. | -| `schema` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this field should be blank.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition.
  • | -| `properties` | The additional properties associated with the schema. | - -Here is an example of `PostSchemaPayload`: - -```java -PulsarAdmin admin = …; - -PostSchemaPayload payload = new PostSchemaPayload(); -payload.setType("INT8"); -payload.setSchema(""); - -admin.createSchema("my-tenant/my-ns/my-topic", payload); -``` - -
    - -
    -```` - -### Get a schema (latest) - -To get the latest schema for a topic, you can use one of the following methods. - -````mdx-code-block - - - - -Use the `get` subcommand. - -```bash -pulsar-admin schemas get - -{ - "version": 0, - "type": "String", - "timestamp": 0, - "data": "string", - "properties": { - "property1": "string", - "property2": "string" - } -} -``` - - - - -Send a `GET` request to this endpoint: {@inject: endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/getSchema?version=@pulsar:version_number@} - -Here is an example of a response, which is returned in JSON format. - -```json -{ - "version": "", - "type": "", - "timestamp": "", - "data": "", - "properties": {} // the properties associated with the schema -} -``` - -The response includes the following fields: - -| Field | Description | -| --- | --- | -| `version` | The schema version, which is a long number. | -| `type` | The schema type. | -| `timestamp` | The timestamp of creating this version of schema. | -| `data` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this field should be blank.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition.
  • | -| `properties` | The additional properties associated with the schema. | - -
    - - -```java -SchemaInfo createSchema(String topic) -``` - -The `SchemaInfo` includes the following fields: - -| Field | Description | -| --- | --- | -| `name` | The schema name. | -| `type` | The schema type. | -| `schema` | A byte array of the schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this byte array should be empty.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition converted to a byte array.
  • | -| `properties` | The additional properties associated with the schema. | - -Here is an example of `SchemaInfo`: - -```java -PulsarAdmin admin = …; - -SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic"); -``` - -
    - -
    -```` - -### Get a schema (specific) - -To get a specific version of a schema, you can use one of the following methods. - -````mdx-code-block - - - - -Use the `get` subcommand. - -```bash -pulsar-admin schemas get --version= -``` - - - - -Send a `GET` request to a schema endpoint: {@inject: endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema/:version|operation/getSchema?version=@pulsar:version_number@} - -Here is an example of a response, which is returned in JSON format. - -```json -{ - "version": "", - "type": "", - "timestamp": "", - "data": "", - "properties": {} // the properties associated with the schema -} -``` - -The response includes the following fields: - -| Field | Description | -| --- | --- | -| `version` | The schema version, which is a long number. | -| `type` | The schema type. | -| `timestamp` | The timestamp of creating this version of schema. | -| `data` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this field should be blank.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition.
  • | -| `properties` | The additional properties associated with the schema. | - -
    - - -```java -SchemaInfo createSchema(String topic, long version) -``` - -The `SchemaInfo` includes the following fields: - -| Field | Description | -| --- | --- | -| `name` | The schema name. | -| `type` | The schema type. | -| `schema` | A byte array of the schema definition data, which is encoded in UTF 8.
  • If the schema is a
  • **primitive**
  • schema, this byte array should be empty.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition converted to a byte array.
  • | -| `properties` | The additional properties associated with the schema. | - -Here is an example of `SchemaInfo`: - -```java -PulsarAdmin admin = …; - -SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic", 1L); -``` - -
    - -
    -```` - -### Extract a schema - -To provide a schema via a topic, you can use the following method. - -````mdx-code-block - - - - -Use the `extract` subcommand. - -```bash -pulsar-admin schemas extract --classname --jar --type -``` - - - - -```` - -### Delete a schema - -To delete a schema for a topic, you can use one of the following methods. - -:::note - -In any case, the **delete** action deletes **all versions** of a schema registered for a topic. - -::: - -````mdx-code-block - - - - -Use the `delete` subcommand. - -```bash -pulsar-admin schemas delete -``` - - - - -Send a `DELETE` request to a schema endpoint: {@inject: endpoint|DELETE|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/deleteSchema?version=@pulsar:version_number@} - -Here is an example of a response, which is returned in JSON format. - -```json -{ - "version": "", -} -``` - -The response includes the following field: - -Field | Description | ----|---| -`version` | The schema version, which is a long number. | - - - - -```java -void deleteSchema(String topic) -``` - -Here is an example of deleting a schema. - -```java -PulsarAdmin admin = …; - -admin.deleteSchema("my-tenant/my-ns/my-topic"); -``` - - - - -```` - -## Custom schema storage - -By default, Pulsar stores various data types of schemas in [Apache BookKeeper](https://bookkeeper.apache.org) deployed alongside Pulsar. - -However, you can use another storage system if needed. 
- -### Implement - -To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces: - -* [SchemaStorage interface](#schemastorage-interface) - -* [SchemaStorageFactory interface](#schemastoragefactory-interface) - -#### SchemaStorage interface - -The `SchemaStorage` interface has the following methods: - -```java -public interface SchemaStorage { - // How schemas are updated - CompletableFuture put(String key, byte[] value, byte[] hash); - - // How schemas are fetched from storage - CompletableFuture get(String key, SchemaVersion version); - - // How schemas are deleted - CompletableFuture delete(String key); - - // Utility method for converting a schema version byte array to a SchemaVersion object - SchemaVersion versionFromBytes(byte[] version); - - // Startup behavior for the schema storage client - void start() throws Exception; - - // Shutdown behavior for the schema storage client - void close() throws Exception; -} -``` - -:::tip - -For a complete example of **schema storage** implementation, see [BookKeeperSchemaStorage](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java) class. - -::: - -#### SchemaStorageFactory interface - -The `SchemaStorageFactory` interface has the following method: - -```java -public interface SchemaStorageFactory { - @NotNull - SchemaStorage create(PulsarService pulsar) throws Exception; -} -``` - -:::tip - -For a complete example of **schema storage factory** implementation, see [BookKeeperSchemaStorageFactory](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java) class. - -::: - -### Deploy - -To use your custom schema storage implementation, perform the following steps. - -1. 
Package the implementation in a [JAR](https://docs.oracle.com/javase/tutorial/deployment/jar/basicsindex.html) file. - -2. Add the JAR file to the `lib` folder in your Pulsar binary or source distribution. - -3. Change the `schemaRegistryStorageClassName` configuration in `broker.conf` to your custom factory class. - -4. Start Pulsar. - -## Set schema compatibility check strategy - -You can set [schema compatibility check strategy](schema-evolution-compatibility.md#schema-compatibility-check-strategy) at the topic, namespace or broker level. - -The schema compatibility check strategy set at different levels has priority: topic level > namespace level > broker level. - -- If you set the strategy at both topic and namespace level, it uses the topic-level strategy. - -- If you set the strategy at both namespace and broker level, it uses the namespace-level strategy. - -- If you do not set the strategy at any level, it uses the `FULL` strategy. For all available values, see [here](schema-evolution-compatibility.md#schema-compatibility-check-strategy). - - -### Topic level - -To set a schema compatibility check strategy at the topic level, use one of the following methods. - -````mdx-code-block - - - - -Use the [`pulsar-admin topicPolicies set-schema-compatibility-strategy`](/tools/pulsar-admin/) command. - -```shell -pulsar-admin topicPolicies set-schema-compatibility-strategy -``` - - - - -Send a `PUT` request to this endpoint: {@inject: endpoint|PUT|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@} - - - - -```java -void setSchemaCompatibilityStrategy(String topic, SchemaCompatibilityStrategy strategy) -``` - -Here is an example of setting a schema compatibility check strategy at the topic level. - -```java -PulsarAdmin admin = …; - -admin.topicPolicies().setSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE); -``` - - - - -```` -
    -To get the topic-level schema compatibility check strategy, use one of the following methods. - -````mdx-code-block - - - - -Use the [`pulsar-admin topicPolicies get-schema-compatibility-strategy`](/tools/pulsar-admin/) command. - -```shell -pulsar-admin topicPolicies get-schema-compatibility-strategy -``` - - - - -Send a `GET` request to this endpoint: {@inject: endpoint|GET|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@} - - - - -```java -SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String topic, boolean applied) -``` - -Here is an example of getting the topic-level schema compatibility check strategy. - -```java -PulsarAdmin admin = …; - -// get the current applied schema compatibility strategy -admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", true); - -// only get the schema compatibility strategy from topic policies -admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", false); -``` - - - - -```` -
    -To remove the topic-level schema compatibility check strategy, use one of the following methods. - -````mdx-code-block - - - - -Use the [`pulsar-admin topicPolicies remove-schema-compatibility-strategy`](/tools/pulsar-admin/) command. - -```shell -pulsar-admin topicPolicies remove-schema-compatibility-strategy -``` - - - - -Send a `DELETE` request to this endpoint: {@inject: endpoint|DELETE|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@} - - - - -```java -void removeSchemaCompatibilityStrategy(String topic) -``` - -Here is an example of removing the topic-level schema compatibility check strategy. - -```java -PulsarAdmin admin = …; - -admin.removeSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic"); -``` - - - - -```` - - -### Namespace level - -You can set schema compatibility check strategy at namespace level using one of the following methods. - -````mdx-code-block - - - - -Use the [`pulsar-admin namespaces set-schema-compatibility-strategy`](/tools/pulsar-admin/) command. - -```shell -pulsar-admin namespaces set-schema-compatibility-strategy options -``` - - - - -Send a `PUT` request to this endpoint: {@inject: endpoint|PUT|/admin/v2/namespaces/:tenant/:namespace|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@} - - - - -Use the [`setSchemaCompatibilityStrategy`](/api/admin/)method. - -```java -admin.namespaces().setSchemaCompatibilityStrategy("test", SchemaCompatibilityStrategy.FULL); -``` - - - - -```` - -### Broker level - -You can set schema compatibility check strategy at broker level by setting `schemaCompatibilityStrategy` in [`broker.conf`](https://github.com/apache/pulsar/blob/f24b4890c278f72a67fe30e7bf22dc36d71aac6a/conf/broker.conf#L1240) or [`standalone.conf`](https://github.com/apache/pulsar/blob/master/conf/standalone.conf) file. 
- -**Example** - -```conf -schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE -``` +import {Redirect} from '@docusaurus/router'; + +```` \ No newline at end of file diff --git a/site2/docs/schema-overview.md b/site2/docs/schema-overview.md new file mode 100644 index 0000000000000..e4308b0779810 --- /dev/null +++ b/site2/docs/schema-overview.md @@ -0,0 +1,153 @@ +--- +id: schema-overview +title: Pulsar Schema overview +sidebar_label: "Overview" +--- + +This section introduces the following content: +* [What is Pulsar Schema](#what-is-pulsar-schema) +* [Why use it](#why-use-it) +* [How it works](#how-it-works) +* [What's next?](#whats-next) + + +## What is Pulsar Schema + +Pulsar schema is a data structure to define how to serialize/deserialize data and how to evolve your data format with backward compatibility. + + +## Why use it + +When a schema is enabled, Pulsar does parse data, it takes bytes as inputs and sends bytes as outputs. While data has meaning beyond bytes, you need to parse data and might encounter parse exceptions which mainly occur in the following situations: +* The field does not exist. +* The field type has changed (for example, `string` is changed to `int`). + +There are a few methods to prevent and overcome these exceptions, for example, you can catch exceptions when parsing errors, which makes code hard to maintain; or you can adopt a schema management system to perform schema evolution, not to break downstream applications, and enforces type safety to max extend in the language you are using, the solution is Pulsar Schema. + +Pulsar schema enables you to use language-specific types of data when constructing and handling messages from simple types like `string` to more complex application-specific types. + +**Example** + +You can use the _User_ class to define the messages sent to Pulsar topics. 
+
+```java
+public class User {
+ String name;
+ int age;
+}
+```
+
+When constructing a producer with the _User_ class, you can specify a schema or not as below.
+
+### Without schema
+
+If you construct a producer without specifying a schema, then the producer can only produce messages of type `byte[]`. If you have a POJO class, you need to serialize the POJO into bytes before sending messages.
+
+**Example**
+
+```java
+Producer producer = client.newProducer()
+ .topic(topic)
+ .create();
+User user = new User("Tom", 28);
+byte[] message = … // serialize the `user` by yourself;
+producer.send(message);
+```
+
+### With schema
+
+If you construct a producer by specifying a schema, then you can send a class to a topic directly without worrying about how to serialize POJOs into bytes.
+
+**Example**
+
+This example constructs a producer with the _JSONSchema_, and you can send the _User_ class to topics directly without worrying about how to serialize it into bytes.
+
+```java
+Producer producer = client.newProducer(JSONSchema.of(User.class))
+ .topic(topic)
+ .create();
+User user = new User("Tom", 28);
+producer.send(user);
+```
+
+## How it works
+
+Pulsar schemas are applied and enforced at the **topic** level (schemas cannot be applied at the namespace or tenant level).
+
+Producers and consumers upload schemas to brokers, so Pulsar schemas work on the producer side and the consumer side.
+
+### Producer side
+
+This diagram illustrates how schema works on the producer side.
+
+![Schema works at the producer side](/assets/schema-producer.png)
+
+1. The application uses a schema instance to construct a producer instance.
+
+ The schema instance defines the schema for the data being produced using the producer instance.
+
+ Take AVRO as an example, Pulsar extracts schema definition from the POJO class and constructs the `SchemaInfo` that the producer needs to pass to a broker when it connects.
+
+2. 
The producer connects to the broker with the `SchemaInfo` extracted from the passed-in schema instance.
+
+3. The broker looks up the schema in the schema storage to check if it is already a registered schema.
+
+4. If yes, the broker skips the schema validation since it is a known schema, and returns the schema version to the producer.
+
+5. If no, the broker verifies whether a schema can be automatically created in this namespace:
+
+ * If `isAllowAutoUpdateSchema` sets to **true**, then a schema can be created, and the broker validates the schema based on the schema compatibility check strategy defined for the topic.
+
+ * If `isAllowAutoUpdateSchema` sets to **false**, then a schema can not be created, and the producer is rejected to connect to the broker.
+
+:::tip
+
+`isAllowAutoUpdateSchema` can be set via **Pulsar admin API** or **REST API.**
+
+For how to set `isAllowAutoUpdateSchema` via Pulsar admin API, see [Manage AutoUpdate Strategy](admin-api-schemas.md#manage-autoupdate-strategy).
+
+:::
+
+6. If the schema is allowed to be updated, then the compatible strategy check is performed.
+
+ * If the schema is compatible, the broker stores it and returns the schema version to the producer. All the messages produced by this producer are tagged with the schema version.
+
+ * If the schema is incompatible, the broker rejects it.
+
+### Consumer side
+
+This diagram illustrates how schema works on the consumer side.
+
+![Schema works at the consumer side](/assets/schema-consumer.png)
+
+1. The application uses a schema instance to construct a consumer instance.
+
+ The schema instance defines the schema that the consumer uses for decoding messages received from a broker.
+
+2. The consumer connects to the broker with the `SchemaInfo` extracted from the passed-in schema instance.
+
+3. The broker determines whether the topic has one of them (a schema/data/a local consumer and a local producer).
+
+4. 
If a topic does not have all of them (a schema/data/a local consumer and a local producer): + + * If `isAllowAutoUpdateSchema` sets to **true**, then the consumer registers a schema and it is connected to a broker. + + * If `isAllowAutoUpdateSchema` sets to **false**, then the consumer is rejected to connect to a broker. + +5. If a topic has one of them (a schema/data/a local consumer and a local producer), then the schema compatibility check is performed. + + * If the schema passes the compatibility check, then the consumer is connected to the broker. + + * If the schema does not pass the compatibility check, then the consumer is rejected to connect to the broker. + +6. The consumer receives messages from the broker. + + If the schema used by the consumer supports schema versioning (for example, AVRO schema), the consumer fetches the `SchemaInfo` of the version tagged in messages and uses the passed-in schema and the schema tagged in messages to decode the messages. + + +## What's next? + +* [Understand basic concepts](schema-understand.md) +* [Schema evolution and compatibility](schema-evolution-compatibility.md) +* [Get started](schema-get-started.md) +* [Manage schema](admin-api-schemas.md) diff --git a/site2/docs/schema-understand.md b/site2/docs/schema-understand.md index ed65c619ecd1e..97a2f58b4b0b5 100644 --- a/site2/docs/schema-understand.md +++ b/site2/docs/schema-understand.md @@ -10,20 +10,18 @@ import TabItem from '@theme/TabItem'; ```` -This chapter explains the basic concepts of Pulsar schema, focuses on the topics of particular importance, and provides additional background. +This section explains the basic concepts of Pulsar schema and provides additional reference. -## SchemaInfo +## Definition -Pulsar schema is defined in a data structure called `SchemaInfo`. - -The `SchemaInfo` is stored and enforced on a per-topic basis and cannot be stored at the namespace or tenant level. +Pulsar schema is defined in a data structure called `SchemaInfo`. 
It is stored and enforced on a per-topic basis and cannot be stored at the namespace or tenant level. A `SchemaInfo` consists of the following fields: | Field | Description | | --- | --- | | `name` | Schema name (a string). | -| `type` | Schema type, which determines how to interpret the schema data.
  • Predefined schema: see [here](schema-understand.md#schema-type).
  • Customized schema: it is left as an empty string.
  • | +| `type` | Schema type, which determines how to interpret the schema data.
  • Predefined schema: see [here](#schema-type).
  • Customized schema: it is left as an empty string.
  • | | `schema`(`payload`) | Schema data, which is a sequence of 8-bit unsigned bytes and schema-type specific. | | `properties` | It is a user defined properties as a string/string map. Applications can use this bag for carrying any application specific logics. Possible properties might be the Git hash associated with the schema, an environment string like `dev` or `prod`. | @@ -43,74 +41,36 @@ This is the `SchemaInfo` of a string. ## Schema type Pulsar supports various schema types, which are mainly divided into two categories: - -* Primitive type - -* Complex type +* [Primitive type](#primitive-type) +* [Complex type](#complex-type) ### Primitive type -Currently, Pulsar supports the following primitive types: - -| Primitive Type | Description | -|---|---| -| `BOOLEAN` | A binary value | -| `INT8` | A 8-bit signed integer | -| `INT16` | A 16-bit signed integer | -| `INT32` | A 32-bit signed integer | -| `INT64` | A 64-bit signed integer | -| `FLOAT` | A single precision (32-bit) IEEE 754 floating-point number | -| `DOUBLE` | A double-precision (64-bit) IEEE 754 floating-point number | -| `BYTES` | A sequence of 8-bit unsigned bytes | -| `STRING` | A Unicode character sequence | -| `TIMESTAMP` (`DATE`, `TIME`) | A logic type represents a specific instant in time with millisecond precision.
    It stores the number of milliseconds since `January 1, 1970, 00:00:00 GMT` as an `INT64` value | -| INSTANT | A single instantaneous point on the time-line with nanoseconds precision| -| LOCAL_DATE | An immutable date-time object that represents a date, often viewed as year-month-day| -| LOCAL_TIME | An immutable date-time object that represents a time, often viewed as hour-minute-second. Time is represented to nanosecond precision.| -| LOCAL_DATE_TIME | An immutable date-time object that represents a date-time, often viewed as year-month-day-hour-minute-second | - -For primitive types, Pulsar does not store any schema data in `SchemaInfo`. The `type` in `SchemaInfo` is used to determine how to serialize and deserialize the data. +The following table outlines the primitive types that Pulsar schema supports, and the conversions between **schema types** and **language-specific primitive types**. + +| Primitive Type | Description | Java Type| Python Type | Go Type | +|---|---|---|---|---| +| `BOOLEAN` | A binary value | boolean | bool | bool | +| `INT8` | A 8-bit signed integer | byte | | int8 | +| `INT16` | A 16-bit signed integer | short | | int16 | +| `INT32` | A 32-bit signed integer | int | | int32 | +| `INT64` | A 64-bit signed integer | long | | int64 | +| `FLOAT` | A single precision (32-bit) IEEE 754 floating-point number | float | float | float32 | +| `DOUBLE` | A double-precision (64-bit) IEEE 754 floating-point number | double | float | float64| +| `BYTES` | A sequence of 8-bit unsigned bytes | byte[], ByteBuffer, ByteBuf | bytes | []byte | +| `STRING` | A Unicode character sequence | string | str | string| +| `TIMESTAMP` (`DATE`, `TIME`) | A logic type represents a specific instant in time with millisecond precision.
    It stores the number of milliseconds since `January 1, 1970, 00:00:00 GMT` as an `INT64` value | java.sql.Timestamp (java.sql.Time, java.util.Date) | | |
+| INSTANT | A single instantaneous point on the time-line with nanoseconds precision| java.time.Instant | | |
+| LOCAL_DATE | An immutable date-time object that represents a date, often viewed as year-month-day| java.time.LocalDate | | |
+| LOCAL_TIME | An immutable date-time object that represents a time, often viewed as hour-minute-second. Time is represented to nanosecond precision.| java.time.LocalTime | | |
+| LOCAL_DATE_TIME | An immutable date-time object that represents a date-time, often viewed as year-month-day-hour-minute-second | java.time.LocalDateTime | | |
+
+For primitive types, Pulsar does not store any schema data in `SchemaInfo`. The `type` in `SchemaInfo` determines how to serialize and deserialize the data.
 
 Some of the primitive schema implementations can use `properties` to store implementation-specific tunable settings. For example, a `string` schema can use `properties` to store the encoding charset to serialize and deserialize strings.
 
-The conversions between **Pulsar schema types** and **language-specific primitive types** are as below.
-
-| Schema Type | Java Type| Python Type | Go Type |
-|---|---|---|---|
-| BOOLEAN | boolean | bool | bool |
-| INT8 | byte | | int8 |
-| INT16 | short | | int16 |
-| INT32 | int | | int32 |
-| INT64 | long | | int64 |
-| FLOAT | float | float | float32 |
-| DOUBLE | double | float | float64|
-| BYTES | byte[], ByteBuffer, ByteBuf | bytes | []byte |
-| STRING | string | str | string|
-| TIMESTAMP | java.sql.Timestamp | | |
-| TIME | java.sql.Time | | |
-| DATE | java.util.Date | | |
-| INSTANT | java.time.Instant | | |
-| LOCAL_DATE | java.time.LocalDate | | |
-| LOCAL_TIME | java.time.LocalDateTime | |
-| LOCAL_DATE_TIME | java.time.LocalTime | |
-
-**Example**
-
-This example demonstrates how to use a string schema. 
+For more instructions, see [Construct a string schema](schema-get-started.md#construct-a-string-schema). -1. Create a producer with a string schema and send messages. - - ```java - Producer producer = client.newProducer(Schema.STRING).create(); - producer.newMessage().value("Hello Pulsar!").send(); - ``` - -2. Create a consumer with a string schema and receive messages. - - ```java - Consumer consumer = client.newConsumer(Schema.STRING).subscribe(); - consumer.receive(); - ``` ### Complex type @@ -121,109 +81,15 @@ Currently, Pulsar supports the following complex types: | `keyvalue` | Represents a complex type of a key/value pair. | | `struct` | Handles structured data. It supports `AvroBaseStructSchema` and `ProtobufNativeSchema`. | -#### keyvalue - -`Keyvalue` schema helps applications define schemas for both key and value. - -For `SchemaInfo` of `keyvalue` schema, Pulsar stores the `SchemaInfo` of key schema and the `SchemaInfo` of value schema together. - -Pulsar provides the following methods to encode a key/value pair in messages: - -* `INLINE` +#### keyvalue schema -* `SEPARATED` +`Keyvalue` schema helps applications define schemas for both key and value. Pulsar stores the `SchemaInfo` of key schema and the `SchemaInfo` of value schema together. -You can choose the encoding type when constructing the key/value schema. +You can choose the encoding type when constructing the key/value schema.: +* `INLINE` - Key/value pairs are encoded together in the message payload. +* `SEPARATED` - see [Construct a key/value schema](schema-get-started.md#construct-a-keyvalue-schema). -````mdx-code-block - - - - -Key/value pairs are encoded together in the message payload. - - - - -Key is encoded in the message key and the value is encoded in the message payload. - -**Example** - -This example shows how to construct a key/value schema and then use it to produce and consume messages. - -1. Construct a key/value schema with `INLINE` encoding type. 
- - ```java - Schema> kvSchema = Schema.KeyValue( - Schema.INT32, - Schema.STRING, - KeyValueEncodingType.INLINE - ); - ``` - -2. Optionally, construct a key/value schema with `SEPARATED` encoding type. - - ```java - Schema> kvSchema = Schema.KeyValue( - Schema.INT32, - Schema.STRING, - KeyValueEncodingType.SEPARATED - ); - ``` - -3. Produce messages using a key/value schema. - - ```java - Schema> kvSchema = Schema.KeyValue( - Schema.INT32, - Schema.STRING, - KeyValueEncodingType.SEPARATED - ); - - Producer> producer = client.newProducer(kvSchema) - .topic(TOPIC) - .create(); - - final int key = 100; - final String value = "value-100"; - - // send the key/value message - producer.newMessage() - .value(new KeyValue(key, value)) - .send(); - ``` - -4. Consume messages using a key/value schema. - - ```java - Schema> kvSchema = Schema.KeyValue( - Schema.INT32, - Schema.STRING, - KeyValueEncodingType.SEPARATED - ); - - Consumer> consumer = client.newConsumer(kvSchema) - ... - .topic(TOPIC) - .subscriptionName(SubscriptionName).subscribe(); - - // receive key/value pair - Message> msg = consumer.receive(); - KeyValue kv = msg.getValue(); - ``` - - - - -```` - -#### struct - -This section describes the details of type and usage of the `struct` schema. - -##### Type +#### struct schema `struct` schema supports `AvroBaseStructSchema` and `ProtobufNativeSchema`. @@ -232,201 +98,24 @@ This section describes the details of type and usage of the `struct` schema. `AvroBaseStructSchema`|Pulsar uses [Avro Specification](http://avro.apache.org/docs/current/spec.html) to declare the schema definition for `AvroBaseStructSchema`, which supports `AvroSchema`, `JsonSchema`, and `ProtobufSchema`.

    This allows Pulsar:
    - to use the same tools to manage schema definitions
    - to use different serialization or deserialization methods to handle data| `ProtobufNativeSchema`|`ProtobufNativeSchema` is based on protobuf native Descriptor.

    This allows Pulsar:
    - to use native protobuf-v3 to serialize or deserialize data
    - to use `AutoConsume` to deserialize data. -##### Usage - -Pulsar provides the following methods to use the `struct` schema: - +Pulsar provides the following methods to [use the `struct` schema](schema-get-started.md#construct-a-struct-schema): * `static` - * `generic` - * `SchemaDefinition` -````mdx-code-block - - - - -You can predefine the `struct` schema, which can be a POJO in Java, a `struct` in Go, or classes generated by Avro or Protobuf tools. - -**Example** - -Pulsar gets the schema definition from the predefined `struct` using an Avro library. The schema definition is the schema data stored as a part of the `SchemaInfo`. - -1. Create the _User_ class to define the messages sent to Pulsar topics. - - ```java - @Builder - @AllArgsConstructor - @NoArgsConstructor - public static class User { - String name; - int age; - } - ``` - -2. Create a producer with a `struct` schema and send messages. - - ```java - Producer producer = client.newProducer(Schema.AVRO(User.class)).create(); - producer.newMessage().value(User.builder().name("pulsar-user").age(1).build()).send(); - ``` - -3. Create a consumer with a `struct` schema and receive messages - - ```java - Consumer consumer = client.newConsumer(Schema.AVRO(User.class)).subscribe(); - User user = consumer.receive().getValue(); - ``` - - - - -Sometimes applications do not have pre-defined structs, and you can use this method to define schema and access data. - -You can define the `struct` schema using the `GenericSchemaBuilder`, generate a generic struct using `GenericRecordBuilder` and consume messages into `GenericRecord`. - -**Example** - -1. Use `RecordSchemaBuilder` to build a schema. - - ```java - RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName"); - recordSchemaBuilder.field("intField").type(SchemaType.INT32); - SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO); - - Producer producer = client.newProducer(Schema.generic(schemaInfo)).create(); - ``` - -2. 
Use `RecordBuilder` to build the struct records. - - ```java - producer.newMessage().value(schema.newRecordBuilder() - .set("intField", 32) - .build()).send(); - ``` - - - - -You can define the `schemaDefinition` to generate a `struct` schema. - -**Example** - -1. Create the _User_ class to define the messages sent to Pulsar topics. - - ```java - @Builder - @AllArgsConstructor - @NoArgsConstructor - public static class User { - String name; - int age; - } - ``` - -2. Create a producer with a `SchemaDefinition` and send messages. - - ```java - SchemaDefinition schemaDefinition = SchemaDefinition.builder().withPojo(User.class).build(); - Producer producer = client.newProducer(Schema.AVRO(schemaDefinition)).create(); - producer.newMessage().value(User.builder().name("pulsar-user").age(1).build()).send(); - ``` - -3. Create a consumer with a `SchemaDefinition` schema and receive messages - - ```java - SchemaDefinition schemaDefinition = SchemaDefinition.builder().withPojo(User.class).build(); - Consumer consumer = client.newConsumer(Schema.AVRO(schemaDefinition)).subscribe(); - User user = consumer.receive().getValue(); - ``` - - - - -```` - ### Auto Schema If you don't know the schema type of a Pulsar topic in advance, you can use AUTO schema to produce or consume generic records to or from brokers. -| Auto Schema Type | Description | -|---|---| -| `AUTO_PRODUCE` | This is useful for transferring data **from a producer to a Pulsar topic that has a schema**. | -| `AUTO_CONSUME` | This is useful for transferring data **from a Pulsar topic that has a schema to a consumer**. | - -#### AUTO_PRODUCE - -`AUTO_PRODUCE` schema helps a producer validate whether the bytes sent by the producer is compatible with the schema of a topic. - -**Example** - -Suppose that: - -* You have a producer processing messages from a Kafka topic _K_. - -* You have a Pulsar topic _P_, and you do not know its schema type. 
- -* Your application reads the messages from _K_ and writes the messages to _P_. - -In this case, you can use `AUTO_PRODUCE` to verify whether the bytes produced by _K_ can be sent to _P_ or not. - -```java -Produce pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE()) - … - .create(); - -byte[] kafkaMessageBytes = … ; - -pulsarProducer.produce(kafkaMessageBytes); -``` - -#### AUTO_CONSUME - -`AUTO_CONSUME` schema helps a Pulsar topic validate whether the bytes sent by a Pulsar topic is compatible with a consumer, that is, the Pulsar topic deserializes messages into language-specific objects using the `SchemaInfo` retrieved from broker-side. - -Currently, `AUTO_CONSUME` supports AVRO, JSON and ProtobufNativeSchema schemas. It deserializes messages into `GenericRecord`. - -**Example** - -Suppose that: - -* You have a Pulsar topic _P_. - -* You have a consumer (for example, MySQL) receiving messages from the topic _P_. - -* Your application reads the messages from _P_ and writes the messages to MySQL. - -In this case, you can use `AUTO_CONSUME` to verify whether the bytes produced by _P_ can be sent to MySQL or not. - -```java -Consumer pulsarConsumer = client.newConsumer(Schema.AUTO_CONSUME()) - … - .subscribe(); - -Message msg = consumer.receive() ; -GenericRecord record = msg.getValue(); -``` +Auto schema contains two categories: +* `AUTO_PRODUCE` transfers data from a producer to a Pulsar topic that has a schema and helps the producer validate whether the out-bound bytes is compatible with the schema of the topic. For more instructions, see [Construct an AUTO_PRODUCE schema](schema-get-started.md#construct-an-auto_produce-schema). +* `AUTO_CONSUME` transfers data from a Pulsar topic that has a schema to a consumer and helps the topic validate whether the out-bound bytes is compatible with the consumer. In other words, the topic deserializes messages into language-specific objects `GenericRecord` using the `SchemaInfo` retrieved from brokers. 
Currently, `AUTO_CONSUME` supports AVRO, JSON and ProtobufNativeSchema schemas. For more instructions, see [Construct an AUTO_CONSUME schema](schema-get-started.md#construct-an-auto_consume-schema). ### Native Avro Schema When migrating or ingesting event or message data from external systems (such as Kafka and Cassandra), the events are often already serialized in Avro format. The applications producing the data typically have validated the data against their schemas (including compatibility checks) and stored them in a database or a dedicated service (such as a schema registry). The schema of each serialized data record is usually retrievable by some metadata attached to that record. In such cases, a Pulsar producer doesn't need to repeat the schema validation step when sending the ingested events to a topic. All it needs to do is passing each message or event with its schema to Pulsar. -Hence, we provide `Schema.NATIVE_AVRO` to wrap a native Avro schema of type `org.apache.avro.Schema`. The result is a schema instance of Pulsar that accepts a serialized Avro payload without validating it against the wrapped Avro schema. - -**Example** - -```java -org.apache.avro.Schema nativeAvroSchema = … ; - -Producer producer = pulsarClient.newProducer().topic("ingress").create(); - -byte[] content = … ; - -producer.newMessage(Schema.NATIVE_AVRO(nativeAvroSchema)).value(content).send(); -``` +Hence, we provide `Schema.NATIVE_AVRO` to wrap a native Avro schema of type `org.apache.avro.Schema`. The result is a schema instance of Pulsar that accepts a serialized Avro payload without validating it against the wrapped Avro schema. See for more details. ## Schema version @@ -462,78 +151,90 @@ The table below lists the possible scenarios when this connection attempt occurs |
  • No schema exists for the topic.
  • | (1) The producer is created using the given schema. (2) Since no existing schema is compatible with the `SensorReading` schema, the schema is transmitted to the broker and stored. (3) Any consumer created using the same schema or topic can consume messages from the `sensor-data` topic. | |
  • A schema already exists.
  • The producer connects using the same schema that is already stored.
  • | (1) The schema is transmitted to the broker. (2) The broker determines that the schema is compatible. (3) The broker attempts to store the schema in [BookKeeper](concepts-architecture-overview.md#persistent-storage) but then determines that it's already stored, so it is used to tag produced messages. |
  • A schema already exists.
  • The producer connects using a new schema that is compatible.
  • | (1) The schema is transmitted to the broker. (2) The broker determines that the schema is compatible and stores the new schema as the current version (with a new version number). | -## How does schema work +## Schema Registry -Pulsar schemas are applied and enforced at the **topic** level (schemas cannot be applied at the namespace or tenant level). +Type safety is extremely important in any application built around a message bus like Pulsar. -Producers and consumers upload schemas to brokers, so Pulsar schemas work on the producer side and the consumer side. +Producers and consumers need some kind of mechanism for coordinating types at the topic level to avoid various potential problems arising. For example, serialization and deserialization issues. -### Producer side +Applications typically adopt one of the following approaches to guarantee type safety in messaging. Both approaches are available in Pulsar, and you're free to adopt one or the other or to mix and match on a per-topic basis. -This diagram illustrates how does schema work on the Producer side. +:::note -![Schema works at the producer side](/assets/schema-producer.png) +Currently, the Pulsar schema registry is only available for the [Java client](client-libraries-java.md), [Go client](client-libraries-go.md), [Python client](client-libraries-python.md), and [C++ client](client-libraries-cpp.md). -1. The application uses a schema instance to construct a producer instance. +::: - The schema instance defines the schema for the data being produced using the producer instance. +### Client-side approach - Take AVRO as an example, Pulsar extracts schema definition from the POJO class and constructs the `SchemaInfo` that the producer needs to pass to a broker when it connects. +Producers and consumers are responsible for not only serializing and deserializing messages (which consist of raw bytes) but also "knowing" which types are being transmitted via which topics. -2. 
The producer connects to the broker with the `SchemaInfo` extracted from the passed-in schema instance. - -3. The broker looks up the schema in the schema storage to check if it is already a registered schema. - -4. If yes, the broker skips the schema validation since it is a known schema, and returns the schema version to the producer. +If a producer is sending temperature sensor data on the topic `topic-1`, consumers of that topic will run into trouble if they attempt to parse that data as moisture sensor readings. -5. If no, the broker verifies whether a schema can be automatically created in this namespace: +Producers and consumers can send and receive messages consisting of raw byte arrays and leave all type safety enforcement to the application on an "out-of-band" basis. - * If `isAllowAutoUpdateSchema` sets to **true**, then a schema can be created, and the broker validates the schema based on the schema compatibility check strategy defined for the topic. - - * If `isAllowAutoUpdateSchema` sets to **false**, then a schema can not be created, and the producer is rejected to connect to the broker. - -:::tip +### Server-side approach -`isAllowAutoUpdateSchema` can be set via **Pulsar admin API** or **REST API.** +Producers and consumers inform the system which data types can be transmitted via the topic. -For how to set `isAllowAutoUpdateSchema` via Pulsar admin API, see [Manage AutoUpdate Strategy](schema-manage.md/#manage-autoupdate-strategy). +With this approach, the messaging system enforces type safety and ensures that producers and consumers remain synced. -::: +Pulsar has a built-in **schema registry** that enables clients to upload data schemas on a per-topic basis. Those schemas dictate which data types are recognized as valid for that topic. -6. If the schema is allowed to be updated, then the compatible strategy check is performed. - - * If the schema is compatible, the broker stores it and returns the schema version to the producer. 
+## Schema AutoUpdate + +If a schema passes the schema compatibility check, Pulsar producer automatically updates this schema to the topic it produces by default. + +### AutoUpdate for producer - All the messages produced by this producer are tagged with the schema version. +For a producer, the `AutoUpdate` happens in the following cases: - * If the schema is incompatible, the broker rejects it. +* If a **topic doesn’t have a schema**, Pulsar registers a schema automatically. -### Consumer side +* If a **topic has a schema**: -This diagram illustrates how does Schema work on the consumer side. + * If a **producer doesn’t carry a schema**: -![Schema works at the consumer side](/assets/schema-consumer.png) + * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is **disabled** in the namespace to which the topic belongs, the producer is allowed to connect to the topic and produce data. + + * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is **enabled** in the namespace to which the topic belongs, the producer is rejected and disconnected. -1. The application uses a schema instance to construct a consumer instance. + * If a **producer carries a schema**: + + A broker performs the compatibility check based on the configured compatibility check strategy of the namespace to which the topic belongs. + + * If the schema is registered, a producer is connected to a broker. + + * If the schema is not registered: + + * If `isAllowAutoUpdateSchema` sets to **false**, the producer is rejected to connect to a broker. + + * If `isAllowAutoUpdateSchema` sets to **true**: - The schema instance defines the schema that the consumer uses for decoding messages received from a broker. + * If the schema passes the compatibility check, then the broker registers a new schema automatically for the topic and the producer is connected. 
+ + * If the schema does not pass the compatibility check, then the broker does not register a schema and the producer is rejected to connect to a broker. -2. The consumer connects to the broker with the `SchemaInfo` extracted from the passed-in schema instance. +![AutoUpdate Producer](/assets/schema-producer.png) -3. The broker determines whether the topic has one of them (a schema/data/a local consumer and a local producer). +### AutoUpdate for consumer -4. If a topic does not have all of them (a schema/data/a local consumer and a local producer): - - * If `isAllowAutoUpdateSchema` sets to **true**, then the consumer registers a schema and it is connected to a broker. - - * If `isAllowAutoUpdateSchema` sets to **false**, then the consumer is rejected to connect to a broker. - -5. If a topic has one of them (a schema/data/a local consumer and a local producer), then the schema compatibility check is performed. - - * If the schema passes the compatibility check, then the consumer is connected to the broker. - - * If the schema does not pass the compatibility check, then the consumer is rejected to connect to the broker. +For a consumer, the `AutoUpdate` happens in the following cases: -6. The consumer receives messages from the broker. +* If a **consumer connects to a topic without a schema** (which means the consumer receiving raw bytes), the consumer can connect to the topic successfully without doing any compatibility check. - If the schema used by the consumer supports schema versioning (for example, AVRO schema), the consumer fetches the `SchemaInfo` of the version tagged in messages and uses the passed-in schema and the schema tagged in messages to decode the messages. +* If a **consumer connects to a topic with a schema**. + + * If a topic does not have all of them (a schema/data/a local consumer and a local producer): + + * If `isAllowAutoUpdateSchema` sets to **true**, then the consumer registers a schema and it is connected to a broker. 
+ + * If `isAllowAutoUpdateSchema` sets to **false**, then the consumer is rejected to connect to a broker. + + * If a topic has one of them (a schema/data/a local consumer and a local producer), then the schema compatibility check is performed. + + * If the schema passes the compatibility check, then the consumer is connected to the broker. + + * If the schema does not pass the compatibility check, then the consumer is rejected to connect to the broker. + +![AutoUpdate Consumer](/assets/schema-consumer.png) \ No newline at end of file From e114bdb60c40b9381a8ef82298a320cc31788508 Mon Sep 17 00:00:00 2001 From: momo-jun Date: Fri, 28 Oct 2022 21:59:07 +0800 Subject: [PATCH 02/10] update sidebar --- site2/website/sidebars.json | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/site2/website/sidebars.json b/site2/website/sidebars.json index fa21f2fbceac2..58f890615ecdd 100644 --- a/site2/website/sidebars.json +++ b/site2/website/sidebars.json @@ -38,10 +38,10 @@ "type": "category", "label": "Pulsar Schema", "items": [ - "schema-get-started", + "schema-overview", "schema-understand", - "schema-evolution-compatibility", - "schema-manage" + "schema-get-started", + "schema-evolution-compatibility" ] }, { @@ -324,6 +324,7 @@ "admin-api-topics", "admin-api-functions", "admin-api-packages", + "admin-api-schemas", "admin-api-transactions" ] }, From 399980af5f22cf49f78be35b3347fd74343636ac Mon Sep 17 00:00:00 2001 From: momo-jun Date: Mon, 31 Oct 2022 14:59:40 +0800 Subject: [PATCH 03/10] add more info to What and Why --- site2/docs/schema-overview.md | 39 ++++++++++++++++++++++++--------- site2/docs/schema-understand.md | 34 ++-------------------------- 2 files changed, 31 insertions(+), 42 deletions(-) diff --git a/site2/docs/schema-overview.md b/site2/docs/schema-overview.md index e4308b0779810..dfdd7b67ceab6 100644 --- a/site2/docs/schema-overview.md +++ b/site2/docs/schema-overview.md @@ -11,18 +11,41 @@ This section introduces the following 
content: * [What's next?](#whats-next) -## What is Pulsar Schema +## What is Pulsar schema -Pulsar schema is a data structure to define how to serialize/deserialize data and how to evolve your data format with backward compatibility. +Pulsar messages are stored as unstructured byte arrays and the structure is applied to this data only when it's read. +Within Pulsar, each message consists of two distinct parts: +* the message payload is stored as raw bytes to provide maximum flexibility; +* a collection of user-defined properties are stored as key/value pairs. + +Pulsar has a built-in **schema registry** that enables producers/consumers to coordinate on the structure of a topic’s data through brokers and enables clients to upload schemas on a per-topic basis, without needing an additional serving layer for your metadata. + +Each schema is a data structure used to serialize the bytes before they are published to a topic, and to deserialize them before they are delivered to the consumers. These schemas dictate which data types are recognized as valid for that topic. + +:::note + +Currently, Pulsar schema is only available for the [Java client](client-libraries-java.md), [Go client](client-libraries-go.md), [Python client](client-libraries-python.md), and [C++ client](client-libraries-cpp.md). + +::: ## Why use it -When a schema is enabled, Pulsar does parse data, it takes bytes as inputs and sends bytes as outputs. While data has meaning beyond bytes, you need to parse data and might encounter parse exceptions which mainly occur in the following situations: +Type safety is extremely important in any application built around a message bus like Pulsar. Producers and consumers need some kind of mechanism for coordinating types at the topic level to avoid various potential problems arising. For example, serialization and deserialization issues. 
+
+Producers and consumers are responsible for not only serializing and deserializing messages (which consist of raw bytes) but also "knowing" which types are being transmitted via which topics. Producers and consumers can send and receive messages consisting of raw byte arrays and leave all type safety enforcement to the application on an "out-of-band" basis. If a producer is sending temperature sensor data on a topic, consumers of that topic will run into trouble if they attempt to parse that data as moisture sensor readings.
+
+With a schema registry, producers and consumers inform the messaging system which data types can be transmitted via a topic. The messaging system enforces type safety and ensures that producers and consumers remain synced. The schema registry provides a central location for storing information about the schemas used within your organization, which in turn greatly simplifies the sharing of this information across application teams. It serves as a single source of truth for all the message schemas used across all your services and development teams, which makes it easier for them to collaborate.
+
+Having a central schema registry along with the consistent use of schemas across the organization also makes data consumption and discovery much easier. If you define a standard schema for a common business entity such as a customer, product, or order that almost all applications will use, then all message-producing applications will be able to generate messages in the latest format. Similarly, consuming applications won’t need to perform any transformations on the data in order to make it conform to a different format.
+
+## Use case
+
+When a schema is enabled, Pulsar does parse data. It takes bytes as inputs and sends bytes as outputs. While data has meaning beyond bytes, you need to parse data and might encounter parse exceptions which mainly occur in the following situations:

 * The field does not exist.
* The field type has changed (for example, `string` is changed to `int`). -There are a few methods to prevent and overcome these exceptions, for example, you can catch exceptions when parsing errors, which makes code hard to maintain; or you can adopt a schema management system to perform schema evolution, not to break downstream applications, and enforces type safety to max extend in the language you are using, the solution is Pulsar Schema. +You can adopt the Pulsar schema registry to perform schema evolution, to enforce data type safety in the language you are using and not break downstream applications. Pulsar schema enables you to use language-specific types of data when constructing and handling messages from simple types like `string` to more complex application-specific types. @@ -39,12 +62,10 @@ public class User { When constructing a producer with the _User_ class, you can specify a schema or not as below. -### Without schema +**Without schema** If you construct a producer without specifying a schema, then the producer can only produce messages of type `byte[]`. If you have a POJO class, you need to serialize the POJO into bytes before sending messages. -**Example** - ```java Producer producer = client.newProducer() .topic(topic) @@ -54,12 +75,10 @@ byte[] message = … // serialize the `user` by yourself; producer.send(message); ``` -### With schema +**With schema** If you construct a producer with specifying a schema, then you can send a class to a topic directly without worrying about how to serialize POJOs into bytes. -**Example** - This example constructs a producer with the _JSONSchema_, and you can send the _User_ class to topics directly without worrying about how to serialize it into bytes. 
```java diff --git a/site2/docs/schema-understand.md b/site2/docs/schema-understand.md index 97a2f58b4b0b5..9f73713c0ccc2 100644 --- a/site2/docs/schema-understand.md +++ b/site2/docs/schema-understand.md @@ -12,7 +12,7 @@ import TabItem from '@theme/TabItem'; This section explains the basic concepts of Pulsar schema and provides additional reference. -## Definition +## Schema definition Pulsar schema is defined in a data structure called `SchemaInfo`. It is stored and enforced on a per-topic basis and cannot be stored at the namespace or tenant level. @@ -117,7 +117,7 @@ When migrating or ingesting event or message data from external systems (such as Hence, we provide `Schema.NATIVE_AVRO` to wrap a native Avro schema of type `org.apache.avro.Schema`. The result is a schema instance of Pulsar that accepts a serialized Avro payload without validating it against the wrapped Avro schema. See for more details. -## Schema version +## Schema versioning Each `SchemaInfo` stored with a topic has a version. Schema version manages schema changes happening within a topic. @@ -151,36 +151,6 @@ The table below lists the possible scenarios when this connection attempt occurs |
  • No schema exists for the topic.
  • | (1) The producer is created using the given schema. (2) Since no existing schema is compatible with the `SensorReading` schema, the schema is transmitted to the broker and stored. (3) Any consumer created using the same schema or topic can consume messages from the `sensor-data` topic. | |
  • A schema already exists.
  • The producer connects using the same schema that is already stored.
  • | (1) The schema is transmitted to the broker. (2) The broker determines that the schema is compatible. (3) The broker attempts to store the schema in [BookKeeper](concepts-architecture-overview.md#persistent-storage) but then determines that it's already stored, so it is used to tag produced messages. |
  • A schema already exists.
  • The producer connects using a new schema that is compatible.
  • | (1) The schema is transmitted to the broker. (2) The broker determines that the schema is compatible and stores the new schema as the current version (with a new version number). | -## Schema Registry - -Type safety is extremely important in any application built around a message bus like Pulsar. - -Producers and consumers need some kind of mechanism for coordinating types at the topic level to avoid various potential problems arising. For example, serialization and deserialization issues. - -Applications typically adopt one of the following approaches to guarantee type safety in messaging. Both approaches are available in Pulsar, and you're free to adopt one or the other or to mix and match on a per-topic basis. - -:::note - -Currently, the Pulsar schema registry is only available for the [Java client](client-libraries-java.md), [Go client](client-libraries-go.md), [Python client](client-libraries-python.md), and [C++ client](client-libraries-cpp.md). - -::: - -### Client-side approach - -Producers and consumers are responsible for not only serializing and deserializing messages (which consist of raw bytes) but also "knowing" which types are being transmitted via which topics. - -If a producer is sending temperature sensor data on the topic `topic-1`, consumers of that topic will run into trouble if they attempt to parse that data as moisture sensor readings. - -Producers and consumers can send and receive messages consisting of raw byte arrays and leave all type safety enforcement to the application on an "out-of-band" basis. - -### Server-side approach - -Producers and consumers inform the system which data types can be transmitted via the topic. - -With this approach, the messaging system enforces type safety and ensures that producers and consumers remain synced. - -Pulsar has a built-in **schema registry** that enables clients to upload data schemas on a per-topic basis. Those schemas dictate which data types are recognized as valid for that topic. 
- ## Schema AutoUpdate If a schema passes the schema compatibility check, Pulsar producer automatically updates this schema to the topic it produces by default. From d3bdf2efb4e4ab516bf16a7558026d273aaae582 Mon Sep 17 00:00:00 2001 From: momo-jun Date: Mon, 31 Oct 2022 16:12:06 +0800 Subject: [PATCH 04/10] relocate code snippets from java/cpp clients to Schema chapter --- site2/docs/client-libraries-cpp.md | 78 +------------- site2/docs/client-libraries-java.md | 91 +--------------- site2/docs/schema-get-started.md | 162 ++++++++++++++++++++++++++++ 3 files changed, 165 insertions(+), 166 deletions(-) diff --git a/site2/docs/client-libraries-cpp.md b/site2/docs/client-libraries-cpp.md index 58d1cef4b3c20..3318d3087a5bb 100644 --- a/site2/docs/client-libraries-cpp.md +++ b/site2/docs/client-libraries-cpp.md @@ -412,80 +412,4 @@ For complete examples, refer to [C++ client examples](https://github.com/apache/ ## Schema -This section describes some examples about schema. For more information about schema, see [Pulsar schema](schema-get-started.md). - -### Avro schema - -- The following example shows how to create a producer with an Avro schema. - - ```cpp - static const std::string exampleSchema = - "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," - "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}"; - Producer producer; - ProducerConfiguration producerConf; - producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema)); - client.createProducer("topic-avro", producerConf, producer); - ``` - -- The following example shows how to create a consumer with an Avro schema. 
- - ```cpp - static const std::string exampleSchema = - "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," - "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}"; - ConsumerConfiguration consumerConf; - Consumer consumer; - consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema)); - client.subscribe("topic-avro", "sub-2", consumerConf, consumer) - ``` - -### ProtobufNative schema - -The following example shows how to create a producer and a consumer with a ProtobufNative schema. - -1. Generate the `User` class using Protobuf3 or later versions. - - ```protobuf - syntax = "proto3"; - - message User { - string name = 1; - int32 age = 2; - } - ``` - -2. Include the `ProtobufNativeSchema.h` in your source code. Ensure the Protobuf dependency has been added to your project. - - ```cpp - #include - ``` - -3. Create a producer to send a `User` instance. - - ```cpp - ProducerConfiguration producerConf; - producerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor())); - Producer producer; - client.createProducer("topic-protobuf", producerConf, producer); - User user; - user.set_name("my-name"); - user.set_age(10); - std::string content; - user.SerializeToString(&content); - producer.send(MessageBuilder().setContent(content).build()); - ``` - -4. Create a consumer to receive a `User` instance. - - ```cpp - ConsumerConfiguration consumerConf; - consumerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor())); - consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest); - Consumer consumer; - client.subscribe("topic-protobuf", "my-sub", consumerConf, consumer); - Message msg; - consumer.receive(msg); - User user2; - user2.ParseFromArray(msg.getData(), msg.getLength()); - ``` +To work with [Pulsar schema](schema-overview.md) using C++ clients, see [Schema - Get started](schema-get-started.md). 
For specific schema types that C++ clients support, see [code](https://github.com/apache/pulsar-client-cpp/blob/main/include/pulsar/Schema.h#L51-L132).
\ No newline at end of file
diff --git a/site2/docs/client-libraries-java.md b/site2/docs/client-libraries-java.md
index 2135c165a45b3..2797ede8993ab 100644
--- a/site2/docs/client-libraries-java.md
+++ b/site2/docs/client-libraries-java.md
@@ -1158,7 +1158,7 @@ tv.forEach((key, value) -> /*operations on all existing messages*/)
 
 ## Schema
 
-In Pulsar, all message data consists of byte arrays "under the hood." [Message schemas](schema-get-started.md) enable you to use other types of data when constructing and handling messages (from simple types like strings to more complex, application-specific types). If you construct, say, a [producer](#producer) without specifying a schema, then the producer can only produce messages of type `byte[]`. The following is an example.
+In Pulsar, all message data consists of byte arrays "under the hood." [Message schemas](schema-overview.md) enable you to use other types of data when constructing and handling messages (from simple types like strings to more complex, application-specific types). If you construct, say, a [producer](#producer) without specifying a schema, then the producer can only produce messages of type `byte[]`. The following is an example.
 
 ```java
 Producer<byte[]> producer = client.newProducer()
        .topic(topic)
        .create();
 ```
 
-The producer above is equivalent to a `Producer<byte[]>` (in fact, you should *always* explicitly specify the type). If you'd like to use a producer for a different type of data, you'll need to specify a **schema** that informs Pulsar which data type will be transmitted over the [topic](reference-terminology.md#topic).
+The producer above is equivalent to a `Producer<byte[]>` (in fact, you should *always* explicitly specify the type).
If you'd like to use a producer for a different type of data, you need to specify a **schema** that informs Pulsar which data type will be transmitted over the topic. For more examples, see [Schema - Get started](schema-get-started.md). -### AvroBaseStructSchema example - -Let's say that you have a `SensorReading` class that you'd like to transmit over a Pulsar topic: - -```java -public class SensorReading { - public float temperature; - - public SensorReading(float temperature) { - this.temperature = temperature; - } - - // A no-arg constructor is required - public SensorReading() { - } - - public float getTemperature() { - return temperature; - } - - public void setTemperature(float temperature) { - this.temperature = temperature; - } -} -``` - -You could then create a `Producer` (or `Consumer`) like this: - -```java -Producer producer = client.newProducer(JSONSchema.of(SensorReading.class)) - .topic("sensor-readings") - .create(); -``` - -The following schema formats are currently available for Java: - -* No schema or the byte array schema (which can be applied using `Schema.BYTES`): - - ```java - Producer bytesProducer = client.newProducer(Schema.BYTES) - .topic("some-raw-bytes-topic") - .create(); - ``` - - Or, equivalently: - - ```java - Producer bytesProducer = client.newProducer() - .topic("some-raw-bytes-topic") - .create(); - ``` - -* `String` for normal UTF-8-encoded string data. Apply the schema using `Schema.STRING`: - - ```java - Producer stringProducer = client.newProducer(Schema.STRING) - .topic("some-string-topic") - .create(); - ``` - -* Create JSON schemas for POJOs using `Schema.JSON`. The following is an example. - - ```java - Producer pojoProducer = client.newProducer(Schema.JSON(MyPojo.class)) - .topic("some-pojo-topic") - .create(); - ``` - -* Generate Protobuf schemas using `Schema.PROTOBUF`. 
The following example shows how to create the Protobuf schema and use it to instantiate a new producer: - - ```java - Producer protobufProducer = client.newProducer(Schema.PROTOBUF(MyProtobuf.class)) - .topic("some-protobuf-topic") - .create(); - ``` - -* Define Avro schemas with `Schema.AVRO`. The following code snippet demonstrates how to create and use Avro schema. - - ```java - Producer avroProducer = client.newProducer(Schema.AVRO(MyAvro.class)) - .topic("some-avro-topic") - .create(); - ``` - -### ProtobufNativeSchema example - -For examples of ProtobufNativeSchema, see [`SchemaDefinition` in `Complex type`](schema-understand.md#complex-type). ## Authentication diff --git a/site2/docs/schema-get-started.md b/site2/docs/schema-get-started.md index eaf39d86dc7db..308f57c0624ce 100644 --- a/site2/docs/schema-get-started.md +++ b/site2/docs/schema-get-started.md @@ -206,6 +206,168 @@ You can define the `schemaDefinition` to generate a `struct` schema. ```` +### Avro schema using Java + +Suppose you have a `SensorReading` class as follows, and you'd like to transmit it over a Pulsar topic. 
+ +```java +public class SensorReading { + public float temperature; + + public SensorReading(float temperature) { + this.temperature = temperature; + } + + // A no-arg constructor is required + public SensorReading() { + } + + public float getTemperature() { + return temperature; + } + + public void setTemperature(float temperature) { + this.temperature = temperature; + } +} +``` + +Create a `Producer` (or `Consumer`) like this: + +```java +Producer producer = client.newProducer(JSONSchema.of(SensorReading.class)) + .topic("sensor-readings") + .create(); +``` + +The following schema formats are currently available for Java: + +* No schema or the byte array schema (which can be applied using `Schema.BYTES`): + + ```java + Producer bytesProducer = client.newProducer(Schema.BYTES) + .topic("some-raw-bytes-topic") + .create(); + ``` + + Or, equivalently: + + ```java + Producer bytesProducer = client.newProducer() + .topic("some-raw-bytes-topic") + .create(); + ``` + +* `String` for normal UTF-8-encoded string data. Apply the schema using `Schema.STRING`: + + ```java + Producer stringProducer = client.newProducer(Schema.STRING) + .topic("some-string-topic") + .create(); + ``` + +* Create JSON schemas for POJOs using `Schema.JSON`. The following is an example. + + ```java + Producer pojoProducer = client.newProducer(Schema.JSON(MyPojo.class)) + .topic("some-pojo-topic") + .create(); + ``` + +* Generate Protobuf schemas using `Schema.PROTOBUF`. The following example shows how to create the Protobuf schema and use it to instantiate a new producer: + + ```java + Producer protobufProducer = client.newProducer(Schema.PROTOBUF(MyProtobuf.class)) + .topic("some-protobuf-topic") + .create(); + ``` + +* Define Avro schemas with `Schema.AVRO`. The following code snippet demonstrates how to create and use Avro schema. 
+ + ```java + Producer avroProducer = client.newProducer(Schema.AVRO(MyAvro.class)) + .topic("some-avro-topic") + .create(); + ``` + + +### Avro schema using C++ + +- The following example shows how to create a producer with an Avro schema. + + ```cpp + static const std::string exampleSchema = + "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," + "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}"; + Producer producer; + ProducerConfiguration producerConf; + producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema)); + client.createProducer("topic-avro", producerConf, producer); + ``` + +- The following example shows how to create a consumer with an Avro schema. + + ```cpp + static const std::string exampleSchema = + "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," + "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}"; + ConsumerConfiguration consumerConf; + Consumer consumer; + consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema)); + client.subscribe("topic-avro", "sub-2", consumerConf, consumer) + ``` + +### ProtobufNative schema using C++ + +The following example shows how to create a producer and a consumer with a ProtobufNative schema. + +1. Generate the `User` class using Protobuf3 or later versions. + + ```protobuf + syntax = "proto3"; + + message User { + string name = 1; + int32 age = 2; + } + ``` + +2. Include the `ProtobufNativeSchema.h` in your source code. Ensure the Protobuf dependency has been added to your project. + + ```cpp + #include + ``` + +3. Create a producer to send a `User` instance. 
+ + ```cpp + ProducerConfiguration producerConf; + producerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor())); + Producer producer; + client.createProducer("topic-protobuf", producerConf, producer); + User user; + user.set_name("my-name"); + user.set_age(10); + std::string content; + user.SerializeToString(&content); + producer.send(MessageBuilder().setContent(content).build()); + ``` + +4. Create a consumer to receive a `User` instance. + + ```cpp + ConsumerConfiguration consumerConf; + consumerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor())); + consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest); + Consumer consumer; + client.subscribe("topic-protobuf", "my-sub", consumerConf, consumer); + Message msg; + consumer.receive(msg); + User user2; + user2.ParseFromArray(msg.getData(), msg.getLength()); + ``` + + ## Construct an AUTO_PRODUCE schema Suppose you have a Pulsar topic _P_, a producer processing messages from a Kafka topic _K_, an application reading the messages from _K_ and writing the messages to _P_. From 4ab981f5fafbf6f6fef225fe3acf8665264549e7 Mon Sep 17 00:00:00 2001 From: momo-jun Date: Mon, 31 Oct 2022 16:48:24 +0800 Subject: [PATCH 05/10] preview style fix --- site2/docs/admin-api-schemas.md | 14 +++++++------- site2/docs/schema-overview.md | 16 ++++++++-------- site2/docs/schema-understand.md | 4 +++- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/site2/docs/admin-api-schemas.md b/site2/docs/admin-api-schemas.md index 73e8ad9613131..1779bbfc23fa4 100644 --- a/site2/docs/admin-api-schemas.md +++ b/site2/docs/admin-api-schemas.md @@ -102,7 +102,7 @@ The `schema-definition-file` includes the following fields: | Field | Description | | --- | --- | | `type` | The schema type. | -| `schema` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this field should be blank.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition.
  • | +| `schema` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a **primitive** schema, this field should be blank.
  • If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.
  • | | `properties` | The additional properties associated with the schema. | Here are examples of the `schema-definition-file` for a JSON schema. @@ -149,7 +149,7 @@ The post payload includes the following fields: | Field | Description | | --- | --- | | `type` | The schema type. | -| `schema` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this field should be blank.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition.
  • | +| `schema` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a **primitive** schema, this field should be blank.
  • If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.
  • | | `properties` | The additional properties associated with the schema. | @@ -164,7 +164,7 @@ The `PostSchemaPayload` includes the following fields: | Field | Description | | --- | --- | | `type` | The schema type. | -| `schema` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this field should be blank.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition.
  • | +| `schema` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a **primitive** schema, this field should be blank.
  • If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.
  • | | `properties` | The additional properties associated with the schema. | Here is an example of `PostSchemaPayload`: @@ -236,7 +236,7 @@ The response includes the following fields: | `version` | The schema version, which is a long number. | | `type` | The schema type. | | `timestamp` | The timestamp of creating this version of schema. | -| `data` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this field should be blank.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition.
  • | +| `data` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a **primitive** schema, this field should be blank.
  • If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.
  • | | `properties` | The additional properties associated with the schema. | @@ -252,7 +252,7 @@ The `SchemaInfo` includes the following fields: | --- | --- | | `name` | The schema name. | | `type` | The schema type. | -| `schema` | A byte array of the schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this byte array should be empty.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition converted to a byte array.
  • | +| `schema` | A byte array of the schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a **primitive** schema, this byte array should be empty.
  • If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition converted to a byte array.
  • | | `properties` | The additional properties associated with the schema. | Here is an example of `SchemaInfo`: @@ -309,7 +309,7 @@ The response includes the following fields: | `version` | The schema version, which is a long number. | | `type` | The schema type. | | `timestamp` | The timestamp of creating this version of schema. | -| `data` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a
  • **primitive**
  • schema, this field should be blank.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition.
  • | +| `data` | The schema definition data, which is encoded in UTF 8 charset.
  • If the schema is a **primitive** schema, this field should be blank.
  • If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.
  • | | `properties` | The additional properties associated with the schema. | @@ -325,7 +325,7 @@ The `SchemaInfo` includes the following fields: | --- | --- | | `name` | The schema name. | | `type` | The schema type. | -| `schema` | A byte array of the schema definition data, which is encoded in UTF 8.
  • If the schema is a
  • **primitive**
  • schema, this byte array should be empty.
  • If the schema is a
  • **struct**
  • schema, this field should be a JSON string of the Avro schema definition converted to a byte array.
  • | +| `schema` | A byte array of the schema definition data, which is encoded in UTF 8.
  • If the schema is a **primitive** schema, this byte array should be empty.
  • If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition converted to a byte array.
  • | | `properties` | The additional properties associated with the schema. | Here is an example of `SchemaInfo`: diff --git a/site2/docs/schema-overview.md b/site2/docs/schema-overview.md index dfdd7b67ceab6..5adc1d03e52c0 100644 --- a/site2/docs/schema-overview.md +++ b/site2/docs/schema-overview.md @@ -115,23 +115,23 @@ This diagram illustrates how does schema work on the Producer side. 5. If no, the broker verifies whether a schema can be automatically created in this namespace: - * If `isAllowAutoUpdateSchema` sets to **true**, then a schema can be created, and the broker validates the schema based on the schema compatibility check strategy defined for the topic. + * If `isAllowAutoUpdateSchema` sets to **true**, then a schema can be created, and the broker validates the schema based on the schema compatibility check strategy defined for the topic. - * If `isAllowAutoUpdateSchema` sets to **false**, then a schema can not be created, and the producer is rejected to connect to the broker. + * If `isAllowAutoUpdateSchema` sets to **false**, then a schema can not be created, and the producer is rejected to connect to the broker. -:::tip + :::tip -`isAllowAutoUpdateSchema` can be set via **Pulsar admin API** or **REST API.** + `isAllowAutoUpdateSchema` can be set via **Pulsar admin API** or **REST API.** -For how to set `isAllowAutoUpdateSchema` via Pulsar admin API, see [Manage AutoUpdate Strategy](admin-api-schemas.md/#manage-autoupdate-strategy). + For how to set `isAllowAutoUpdateSchema` via Pulsar admin API, see [Manage AutoUpdate Strategy](admin-api-schemas.md#manage-autoupdate-strategy). -::: + ::: 6. If the schema is allowed to be updated, then the compatible strategy check is performed. - * If the schema is compatible, the broker stores it and returns the schema version to the producer. All the messages produced by this producer are tagged with the schema version. 
+ * If the schema is compatible, the broker stores it and returns the schema version to the producer. All the messages produced by this producer are tagged with the schema version. - * If the schema is incompatible, the broker rejects it. + * If the schema is incompatible, the broker rejects it. ### Consumer side diff --git a/site2/docs/schema-understand.md b/site2/docs/schema-understand.md index 9f73713c0ccc2..28b971a647130 100644 --- a/site2/docs/schema-understand.md +++ b/site2/docs/schema-understand.md @@ -98,11 +98,13 @@ You can choose the encoding type when constructing the key/value schema.: `AvroBaseStructSchema`|Pulsar uses [Avro Specification](http://avro.apache.org/docs/current/spec.html) to declare the schema definition for `AvroBaseStructSchema`, which supports `AvroSchema`, `JsonSchema`, and `ProtobufSchema`.

    This allows Pulsar:
    - to use the same tools to manage schema definitions
    - to use different serialization or deserialization methods to handle data| `ProtobufNativeSchema`|`ProtobufNativeSchema` is based on protobuf native Descriptor.

    This allows Pulsar:
    - to use native protobuf-v3 to serialize or deserialize data
    - to use `AutoConsume` to deserialize data. -Pulsar provides the following methods to [use the `struct` schema](schema-get-started.md#construct-a-struct-schema): +Pulsar provides the following methods to use the `struct` schema. * `static` * `generic` * `SchemaDefinition` +For more examples, see [Construct a struct schema](schema-get-started.md#construct-a-struct-schema). + ### Auto Schema If you don't know the schema type of a Pulsar topic in advance, you can use AUTO schema to produce or consume generic records to or from brokers. From 6ec26b3ab7fe04a7b510e16ae92d557e7ec82b58 Mon Sep 17 00:00:00 2001 From: momo-jun Date: Tue, 1 Nov 2022 13:12:28 +0800 Subject: [PATCH 06/10] preview fixes --- site2/docs/admin-api-schemas.md | 4 ++-- site2/docs/schema-evolution-compatibility.md | 10 +++++----- site2/docs/schema-overview.md | 16 +++++++--------- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/site2/docs/admin-api-schemas.md b/site2/docs/admin-api-schemas.md index 1779bbfc23fa4..15f11f4a91b01 100644 --- a/site2/docs/admin-api-schemas.md +++ b/site2/docs/admin-api-schemas.md @@ -431,9 +431,9 @@ You can set [schema compatibility check strategy](schema-evolution-compatibility The schema compatibility check strategy set at different levels has priority: topic level > namespace level > broker level. -- If you set the strategy at both topic and namespace level, it uses the topic-level strategy. +- If you set the strategy at both topic and namespace levels, it uses the topic-level strategy. -- If you set the strategy at both namespace and broker level, it uses the namespace-level strategy. +- If you set the strategy at both namespace and broker levels, it uses the namespace-level strategy. - If you do not set the strategy at any level, it uses the `FULL` strategy. For all available values, see [here](schema-evolution-compatibility.md#schema-compatibility-check-strategy). 
diff --git a/site2/docs/schema-evolution-compatibility.md b/site2/docs/schema-evolution-compatibility.md index d50a24b400aba..419c17c604a57 100644 --- a/site2/docs/schema-evolution-compatibility.md +++ b/site2/docs/schema-evolution-compatibility.md @@ -69,7 +69,7 @@ Suppose that you have a topic containing three schemas (V1, V2, and V3). V1 is t ::: -#### ALWAYS_COMPATIBLE example +### ALWAYS_COMPATIBLE example In some situations, an application needs to store events of several different types in the same Pulsar topic. @@ -79,7 +79,7 @@ Suppose that you have a topic containing three schemas (V1, V2, and V3). V1 is t Consequently, those events need to go in the same Pulsar partition to maintain order. This application can use `ALWAYS_COMPATIBLE` to allow different kinds of events to co-exist in the same topic. -#### ALWAYS_INCOMPATIBLE example +### ALWAYS_INCOMPATIBLE example Sometimes we also make incompatible changes. For example, you are modifying a field type from `string` to `int`. @@ -89,7 +89,7 @@ Suppose that you have a topic containing three schemas (V1, V2, and V3). V1 is t * Optionally, create a new topic and start migrating applications to use the new topic and the new schema, avoiding the need to handle two incompatible versions in the same topic. -#### BACKWARD and BACKWARD_TRANSITIVE example +### BACKWARD and BACKWARD_TRANSITIVE example * Example 1 @@ -103,7 +103,7 @@ Suppose that you have a topic containing three schemas (V1, V2, and V3). V1 is t Same SQL queries must continue to work even if the data is changed. To support it, you can evolve the schemas using the `BACKWARD` strategy. -#### FORWARD and FORWARD_TRANSITIVE example +### FORWARD and FORWARD_TRANSITIVE example * Example 1 @@ -119,7 +119,7 @@ Suppose that you have a topic containing three schemas (V1, V2, and V3). V1 is t Consequently, you can evolve the schemas using the `FORWARD` strategy to ensure that the old schema can process data encoded with the new schema. 
-#### FULL and FULL_TRANSITIVE example +### FULL and FULL_TRANSITIVE example In some data formats, for example, Avro, you can define fields with default values. Consequently, adding or removing a field with a default value is a fully compatible change. diff --git a/site2/docs/schema-overview.md b/site2/docs/schema-overview.md index 5adc1d03e52c0..91724243d2b0e 100644 --- a/site2/docs/schema-overview.md +++ b/site2/docs/schema-overview.md @@ -10,13 +10,12 @@ This section introduces the following content: * [How it works](#how-it-works) * [What's next?](#whats-next) - ## What is Pulsar schema Pulsar messages are stored as unstructured byte arrays and the structure is applied to this data only when it's read. Within Pulsar, each message consists of two distinct parts: -* the message payload is stored as raw bytes to provide maximum flexibility; +* message payload is stored as raw bytes to provide maximum flexibility; * a collection of user-defined properties are stored as key/value pairs. Pulsar has a built-in **schema registry** that enables producers/consumers to coordinate on the structure of a topic’s data through brokers and enables clients to upload schemas on a per-topic basis, without needing an additional serving layer for your metadata. @@ -33,11 +32,11 @@ Currently, Pulsar schema is only available for the [Java client](client-librarie Type safety is extremely important in any application built around a message bus like Pulsar. Producers and consumers need some kind of mechanism for coordinating types at the topic level to avoid various potential problems arising. For example, serialization and deserialization issues. -Producers and consumers are responsible for not only serializing and deserializing messages (which consist of raw bytes) but also "knowing" which types are being transmitted via which topics. 
Producers and consumers can send and receive messages consisting of raw byte arrays and leave all type safety enforcement to the application on an "out-of-band" basis. If a producer is sending temperature sensor data on a topic, consumers of that topic will run into trouble if they attempt to parse that data as moisture sensor readings. +Producers and consumers are responsible for not only serializing and deserializing messages (which consist of raw bytes) but also "knowing" which types are being transmitted via which topics. Producers and consumers can send and receive messages consisting of raw byte arrays and leave all types of safety enforcement to the application on an "out-of-band" basis. If a producer is sending temperature sensor data on a topic, consumers of that topic will run into trouble if they attempt to parse that data as moisture sensor readings. With schema registry, producers and consumers inform the messaging system which data types can be transmitted via a topic. The messaging system enforces type safety and ensures that producers and consumers remain synced. The schema registry provides a central location for storing information about the schemas used within your organization, in turn greatly simplifies the sharing of this information across application teams. It serves as a single source of truth for all the message schemas used across all your services and development teams, which makes it easier for them to collaborate. -Having a central schema registry along with the consistent use of schemas across the organization also makes data consumption and discovery much easier. If you define a standard schema for a common business entity such as a customer, product, or order that almost all applications will use, then all message producing applications will be able to generate messages in the latest format. Similarly, consuming applications won’t need to perform any transformations on the data in order to make it conform to a different format. 
+Having a central schema registry along with the consistent use of schemas across the organization also makes data consumption and discovery much easier. If you define a standard schema for a common business entity such as a customer, product, or order that almost all applications will use, then all message-producing applications will be able to generate messages in the latest format. Similarly, consuming applications won’t need to perform any transformations on the data in order to make it conform to a different format. ## Use case @@ -45,7 +44,7 @@ When a schema is enabled, Pulsar does parse data. It takes bytes as inputs and s * The field does not exist. * The field type has changed (for example, `string` is changed to `int`). -You can adopt the Pulsar schema registry to perform schema evolution, to enforce data type safety in the language you are using and not break downstream applications. +You can adopt the Pulsar schema registry to perform schema evolution, enforcing data type safety in the language you are using and not breaking downstream applications. Pulsar schema enables you to use language-specific types of data when constructing and handling messages from simple types like `string` to more complex application-specific types. @@ -77,7 +76,7 @@ producer.send(message); **With schema** -If you construct a producer with specifying a schema, then you can send a class to a topic directly without worrying about how to serialize POJOs into bytes. +If you construct a producer by specifying a schema, then you can send a class to a topic directly without worrying about how to serialize POJOs into bytes. This example constructs a producer with the _JSONSchema_, and you can send the _User_ class to topics directly without worrying about how to serialize it into bytes. @@ -97,7 +96,7 @@ Producers and consumers upload schemas to brokers, so Pulsar schemas work on the ### Producer side -This diagram illustrates how does schema work on the Producer side. 
+This diagram illustrates how schema works on the Producer side. ![Schema works at the producer side](/assets/schema-producer.png) @@ -135,7 +134,7 @@ This diagram illustrates how does schema work on the Producer side. ### Consumer side -This diagram illustrates how does Schema work on the consumer side. +This diagram illustrates how schema works on the consumer side. ![Schema works at the consumer side](/assets/schema-consumer.png) @@ -163,7 +162,6 @@ This diagram illustrates how does Schema work on the consumer side. If the schema used by the consumer supports schema versioning (for example, AVRO schema), the consumer fetches the `SchemaInfo` of the version tagged in messages and uses the passed-in schema and the schema tagged in messages to decode the messages. - ## What's next? * [Understand basic concepts](schema-understand.md) From d5147dfe52623675d81400e2862abc405d9d8b53 Mon Sep 17 00:00:00 2001 From: momo-jun Date: Tue, 1 Nov 2022 16:54:26 +0800 Subject: [PATCH 07/10] overview updates --- site2/docs/schema-evolution-compatibility.md | 2 +- site2/docs/schema-get-started.md | 6 +- site2/docs/schema-overview.md | 121 +++++++++---------- site2/docs/schema-understand.md | 12 +- 4 files changed, 65 insertions(+), 76 deletions(-) diff --git a/site2/docs/schema-evolution-compatibility.md b/site2/docs/schema-evolution-compatibility.md index 419c17c604a57..438eea638bfb0 100644 --- a/site2/docs/schema-evolution-compatibility.md +++ b/site2/docs/schema-evolution-compatibility.md @@ -77,7 +77,7 @@ Suppose that you have a topic containing three schemas (V1, V2, and V3). V1 is t For example, for a user entity, there are `userCreated`, `userAddressChanged` and `userEnquiryReceived` events. The application requires that those events are always read in the same order. - Consequently, those events need to go in the same Pulsar partition to maintain order. This application can use `ALWAYS_COMPATIBLE` to allow different kinds of events to co-exist in the same topic. 
+ Consequently, those events need to go in the same Pulsar partition to maintain order. This application can use `ALWAYS_COMPATIBLE` to allow different kinds of events to co-exist on the same topic. ### ALWAYS_INCOMPATIBLE example diff --git a/site2/docs/schema-get-started.md b/site2/docs/schema-get-started.md index 308f57c0624ce..fb1b272348d58 100644 --- a/site2/docs/schema-get-started.md +++ b/site2/docs/schema-get-started.md @@ -372,7 +372,7 @@ The following example shows how to create a producer and a consumer with a Proto Suppose you have a Pulsar topic _P_, a producer processing messages from a Kafka topic _K_, an application reading the messages from _K_ and writing the messages to _P_. -This example shows how construct an [AUTO_PRODUCE](schema-understand.md#auto-schema) schema to verify whether the bytes produced by _K_ can be sent to _P_. +This example shows how to construct an [AUTO_PRODUCE](schema-understand.md#auto-schema) schema to verify whether the bytes produced by _K_ can be sent to _P_. ```java Produce pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE()) @@ -388,7 +388,7 @@ pulsarProducer.produce(kafkaMessageBytes); Suppose you have a Pulsar topic _P_, a consumer (for example, _MySQL_) receiving messages from the topic _P_, an application reading the messages from _P_ and writing the messages to _MySQL_. -This example shows how construct an [AUTO_CONSUME schema](schema-understand.md#auto-schema) to verify whether the bytes produced by _P_ can be sent to _MySQL_. +This example shows how to construct an [AUTO_CONSUME schema](schema-understand.md#auto-schema) to verify whether the bytes produced by _P_ can be sent to _MySQL_. ```java Consumer pulsarConsumer = client.newConsumer(Schema.AUTO_CONSUME()) @@ -401,7 +401,7 @@ GenericRecord record = msg.getValue(); ## Construct a native Avro schema -This example shows how construct a [native Avro schema](schema-understand.md#native-avro-schema). 
+Pulsar schema registry is a central repository to store the schema information, which enables producers/consumers to coordinate on the schema of a topic’s data through brokers. :::note @@ -30,63 +24,14 @@ Currently, Pulsar schema is only available for the [Java client](client-librarie ## Why use it -Type safety is extremely important in any application built around a message bus like Pulsar. Producers and consumers need some kind of mechanism for coordinating types at the topic level to avoid various potential problems arising. For example, serialization and deserialization issues. - -Producers and consumers are responsible for not only serializing and deserializing messages (which consist of raw bytes) but also "knowing" which types are being transmitted via which topics. Producers and consumers can send and receive messages consisting of raw byte arrays and leave all types of safety enforcement to the application on an "out-of-band" basis. If a producer is sending temperature sensor data on a topic, consumers of that topic will run into trouble if they attempt to parse that data as moisture sensor readings. - -With schema registry, producers and consumers inform the messaging system which data types can be transmitted via a topic. The messaging system enforces type safety and ensures that producers and consumers remain synced. The schema registry provides a central location for storing information about the schemas used within your organization, in turn greatly simplifies the sharing of this information across application teams. It serves as a single source of truth for all the message schemas used across all your services and development teams, which makes it easier for them to collaborate. - -Having a central schema registry along with the consistent use of schemas across the organization also makes data consumption and discovery much easier. 
If you define a standard schema for a common business entity such as a customer, product, or order that almost all applications will use, then all message-producing applications will be able to generate messages in the latest format. Similarly, consuming applications won’t need to perform any transformations on the data in order to make it conform to a different format. -## Use case -When a schema is enabled, Pulsar does parse data. It takes bytes as inputs and sends bytes as outputs. While data has meaning beyond bytes, you need to parse data and might encounter parse exceptions which mainly occur in the following situations: -* The field does not exist. -* The field type has changed (for example, `string` is changed to `int`). -You can adopt the Pulsar schema registry to perform schema evolution, enforcing data type safety in the language you are using and not breaking downstream applications. -Pulsar schema enables you to use language-specific types of data when constructing and handling messages from simple types like `string` to more complex application-specific types. -**Example** -You can use the _User_ class to define the messages sent to Pulsar topics. -```java -public class User { - String name; - int age; -} -``` -When constructing a producer with the _User_ class, you can specify a schema or not as below. +Type safety is extremely important in any application built around a messaging and streaming system. Raw bytes are flexible for data transfer, but the flexibility and neutrality come with a cost: you have to overlay data type checking and serialization/deserialization to ensure that the bytes fed into the system can be read and successfully consumed. In other words, you need to make sure the data is intelligible and usable to applications. -**Without schema** -If you construct a producer without specifying a schema, then the producer can only produce messages of type `byte[]`. 
If you have a POJO class, you need to serialize the POJO into bytes before sending messages. - -```java -Producer producer = client.newProducer() - .topic(topic) - .create(); -User user = new User("Tom", 28); -byte[] message = … // serialize the `user` by yourself; -producer.send(message); -``` - -**With schema** - -If you construct a producer by specifying a schema, then you can send a class to a topic directly without worrying about how to serialize POJOs into bytes. - -This example constructs a producer with the _JSONSchema_, and you can send the _User_ class to topics directly without worrying about how to serialize it into bytes. - -```java -Producer producer = client.newProducer(JSONSchema.of(User.class)) - .topic(topic) - .create(); -User user = new User("Tom", 28); -producer.send(user); -``` +Pulsar schema resolves the pain points with the following capabilities: +* enforces the data type safety when a topic has a schema defined. As a result, producers/consumers are only allowed to connect if they are using a “compatible” schema. +* provides a central location for storing information about the schemas used within your organization, in turn greatly simplifies the sharing of this information across application teams. +* serves as a single source of truth for all the message schemas used across all your services and development teams, which makes it easier for them to collaborate. +* keeps data compatibility on-track between schema versions. When new schemas are uploaded, the new versions can be read by old consumers. +* stored in the existing storage layer BookKeeper, no additional system required. ## How it works @@ -162,6 +107,50 @@ This diagram illustrates how schema works on the consumer side. If the schema used by the consumer supports schema versioning (for example, AVRO schema), the consumer fetches the `SchemaInfo` of the version tagged in messages and uses the passed-in schema and the schema tagged in messages to decode the messages. 
+## Use case + +By default, Pulsar takes bytes as input and sends bytes as output. While data has meaning beyond bytes, you need to parse data and might encounter parse exceptions which mainly occur in the following situations: +* The field does not exist. +* The field type has changed (for example, `string` is changed to `int`). + +You can use language-specific types of data when constructing and handling messages from simple data types like `string` to more complex application-specific types. + +For example, use the _User_ class to define the messages sent to Pulsar topics. + +```java +public class User { + String name; + int age; +} +``` + +**Without a schema** + +If you construct a producer without specifying a schema, then the producer can only produce messages of type `byte[]`. If you have a POJO class, you need to serialize the POJO into bytes before sending messages. + +```java +Producer producer = client.newProducer() + .topic(topic) + .create(); +User user = new User("Tom", 28); +byte[] message = … // serialize the `user` by yourself; +producer.send(message); +``` + +**With a schema** + +If you construct a producer by specifying a schema, then you can send a class to a topic directly without worrying about how to serialize POJOs into bytes. + +This example constructs a producer with the _JSONSchema_, and you can send the _User_ class to topics directly without worrying about how to serialize it into bytes. + +```java +Producer producer = client.newProducer(JSONSchema.of(User.class)) + .topic(topic) + .create(); +User user = new User("Tom", 28); +producer.send(user); +``` + ## What's next? * [Understand basic concepts](schema-understand.md) diff --git a/site2/docs/schema-understand.md b/site2/docs/schema-understand.md index 28b971a647130..36b30a3877731 100644 --- a/site2/docs/schema-understand.md +++ b/site2/docs/schema-understand.md @@ -23,7 +23,7 @@ A `SchemaInfo` consists of the following fields: | `name` | Schema name (a string). 
| | `type` | Schema type, which determines how to interpret the schema data.
  • Predefined schema: see [here](#schema-type).
  • Customized schema: it is left as an empty string.
  • | | `schema`(`payload`) | Schema data, which is a sequence of 8-bit unsigned bytes and schema-type specific. | -| `properties` | It is a user defined properties as a string/string map. Applications can use this bag for carrying any application specific logics. Possible properties might be the Git hash associated with the schema, an environment string like `dev` or `prod`. | +| `properties` | It is a user-defined property as a string/string map. Applications can use this bag for carrying any application-specific logics. Possible properties might be the Git hash associated with the schema, and an environment string like `dev` or `prod`. | **Example** @@ -81,7 +81,7 @@ Currently, Pulsar supports the following complex types: | `keyvalue` | Represents a complex type of a key/value pair. | | `struct` | Handles structured data. It supports `AvroBaseStructSchema` and `ProtobufNativeSchema`. | -#### keyvalue schema +#### `keyvalue` schema `Keyvalue` schema helps applications define schemas for both key and value. Pulsar stores the `SchemaInfo` of key schema and the `SchemaInfo` of value schema together. @@ -89,7 +89,7 @@ You can choose the encoding type when constructing the key/value schema.: * `INLINE` - Key/value pairs are encoded together in the message payload. * `SEPARATED` - see [Construct a key/value schema](schema-get-started.md#construct-a-keyvalue-schema). -#### struct schema +#### `struct` schema `struct` schema supports `AvroBaseStructSchema` and `ProtobufNativeSchema`. @@ -110,8 +110,8 @@ For more examples, see [Construct a struct schema](schema-get-started.md#constru If you don't know the schema type of a Pulsar topic in advance, you can use AUTO schema to produce or consume generic records to or from brokers. Auto schema contains two categories: -* `AUTO_PRODUCE` transfers data from a producer to a Pulsar topic that has a schema and helps the producer validate whether the out-bound bytes is compatible with the schema of the topic. 
For more instructions, see [Construct an AUTO_PRODUCE schema](schema-get-started.md#construct-an-auto_produce-schema). -* `AUTO_CONSUME` transfers data from a Pulsar topic that has a schema to a consumer and helps the topic validate whether the out-bound bytes is compatible with the consumer. In other words, the topic deserializes messages into language-specific objects `GenericRecord` using the `SchemaInfo` retrieved from brokers. Currently, `AUTO_CONSUME` supports AVRO, JSON and ProtobufNativeSchema schemas. For more instructions, see [Construct an AUTO_CONSUME schema](schema-get-started.md#construct-an-auto_consume-schema). +* `AUTO_PRODUCE` transfers data from a producer to a Pulsar topic that has a schema and helps the producer validate whether the out-bound bytes are compatible with the schema of the topic. For more instructions, see [Construct an AUTO_PRODUCE schema](schema-get-started.md#construct-an-auto_produce-schema). +* `AUTO_CONSUME` transfers data from a Pulsar topic that has a schema to a consumer and helps the topic validate whether the out-bound bytes are compatible with the consumer. In other words, the topic deserializes messages into language-specific objects `GenericRecord` using the `SchemaInfo` retrieved from brokers. Currently, `AUTO_CONSUME` supports AVRO, JSON and ProtobufNativeSchema schemas. For more instructions, see [Construct an AUTO_CONSUME schema](schema-get-started.md#construct-an-auto_consume-schema). ### Native Avro Schema @@ -121,7 +121,7 @@ Hence, we provide `Schema.NATIVE_AVRO` to wrap a native Avro schema of type `org ## Schema versioning -Each `SchemaInfo` stored with a topic has a version. Schema version manages schema changes happening within a topic. +Each `SchemaInfo` stored with a topic has a version. The schema version manages schema changes happening within a topic. 
Messages produced with a given `SchemaInfo` is tagged with a schema version, so when a message is consumed by a Pulsar client, the Pulsar client can use the schema version to retrieve the corresponding `SchemaInfo` and then use the `SchemaInfo` to deserialize data. From 6b07633b8b7aa7802d1568d23d26888474ba6ab0 Mon Sep 17 00:00:00 2001 From: momo-jun Date: Thu, 3 Nov 2022 16:42:01 +0800 Subject: [PATCH 08/10] fix review comments --- site2/docs/client-libraries-cpp.md | 2 +- site2/docs/schema-get-started.md | 14 ++------------ site2/docs/schema-overview.md | 11 +++-------- site2/docs/schema-understand.md | 6 +++--- 4 files changed, 9 insertions(+), 24 deletions(-) diff --git a/site2/docs/client-libraries-cpp.md b/site2/docs/client-libraries-cpp.md index 87dce8cfa3120..7547487d10d00 100644 --- a/site2/docs/client-libraries-cpp.md +++ b/site2/docs/client-libraries-cpp.md @@ -412,4 +412,4 @@ For complete examples, refer to [C++ client examples](https://github.com/apache/ ## Schema -To work with [Pulsar schema](schema-overview.md) using C++ clients, see [Schema - Get started](schema-get-started.md). For specific schema types that C++ clients support, see [code](https://github.com/apache/pulsar-client-cpp/blob/main/include/pulsar/Schema.h#L51-L132). \ No newline at end of file +To work with [Pulsar schema](schema-overview.md) using C++ clients, see [Schema - Get started](schema-get-started.md). For specific schema types that C++ clients support, see [code](https://github.com/apache/pulsar-client-cpp/blob/main/include/pulsar/Schema.h). \ No newline at end of file diff --git a/site2/docs/schema-get-started.md b/site2/docs/schema-get-started.md index fb1b272348d58..a794db725f1d9 100644 --- a/site2/docs/schema-get-started.md +++ b/site2/docs/schema-get-started.md @@ -58,12 +58,6 @@ This example shows how to construct a [key/value schema](schema-understand.md#ke 3. Produce messages using a key/value schema. 
```java - Schema> kvSchema = Schema.KeyValue( - Schema.INT32, - Schema.STRING, - KeyValueEncodingType.SEPARATED - ); - Producer> producer = client.newProducer(kvSchema) .topic(TOPIC) .create(); @@ -80,12 +74,6 @@ This example shows how to construct a [key/value schema](schema-understand.md#ke 4. Consume messages using a key/value schema. ```java - Schema> kvSchema = Schema.KeyValue( - Schema.INT32, - Schema.STRING, - KeyValueEncodingType.SEPARATED - ); - Consumer> consumer = client.newConsumer(kvSchema) ... .topic(TOPIC) @@ -240,6 +228,8 @@ Producer producer = client.newProducer(JSONSchema.of(SensorReadin .create(); ``` +### Avro-based schema using Java + The following schema formats are currently available for Java: * No schema or the byte array schema (which can be applied using `Schema.BYTES`): diff --git a/site2/docs/schema-overview.md b/site2/docs/schema-overview.md index 9eed6b39db2e1..9560a95443f63 100644 --- a/site2/docs/schema-overview.md +++ b/site2/docs/schema-overview.md @@ -8,6 +8,7 @@ This section introduces the following content: * [What is Pulsar Schema](#what-is-pulsar-schema) * [Why use it](#why-use-it) * [How it works](#how-it-works) +* [Use case](#use-case) * [What's next?](#whats-next) ## What is Pulsar Schema @@ -109,13 +110,9 @@ This diagram illustrates how schema works on the consumer side. ## Use case -By default, Pulsar takes bytes as input and sends bytes as output. While data has meaning beyond bytes, you need to parse data and might encounter parse exceptions which mainly occur in the following situations: -* The field does not exist. -* The field type has changed (for example, `string` is changed to `int`). - You can use language-specific types of data when constructing and handling messages from simple data types like `string` to more complex application-specific types. -For example, use the _User_ class to define the messages sent to Pulsar topics. 
+For example, you are using the _User_ class to define the messages sent to Pulsar topics.
 
 ```java
 public class User {
@@ -139,9 +136,7 @@ producer.send(message);
 
 **With a schema**
 
-If you construct a producer by specifying a schema, then you can send a class to a topic directly without worrying about how to serialize POJOs into bytes.
-
-This example constructs a producer with the _JSONSchema_, and you can send the _User_ class to topics directly without worrying about how to serialize it into bytes.
+This example constructs a producer with the _JSONSchema_, and you can send the _User_ class to topics directly without worrying about how to serialize POJOs into bytes.
 
 ```java
 Producer producer = client.newProducer(JSONSchema.of(User.class))
diff --git a/site2/docs/schema-understand.md b/site2/docs/schema-understand.md
index 36b30a3877731..c488ff57c761b 100644
--- a/site2/docs/schema-understand.md
+++ b/site2/docs/schema-understand.md
@@ -51,10 +51,10 @@ The following table outlines the primitive types that Pulsar schema supports, an
 
 | Primitive Type | Description | Java Type| Python Type | Go Type |
 |---|---|---|---|---|
 | `BOOLEAN` | A binary value | boolean | bool | bool |
-| `INT8` | A 8-bit signed integer | byte | | int8 |
-| `INT16` | A 16-bit signed integer | short | | int16 |
+| `INT8` | An 8-bit signed integer | byte | | int8 |
+| `INT16` | A 16-bit signed integer | short | | int16 |
 | `INT32` | A 32-bit signed integer | int | | int32 |
-| `INT64` | A 64-bit signed integer | long | | int64 |
+| `INT64` | A 64-bit signed integer | long | | int64 |
 | `FLOAT` | A single precision (32-bit) IEEE 754 floating-point number | float | float | float32 |
 | `DOUBLE` | A double-precision (64-bit) IEEE 754 floating-point number | double | float | float64|
 | `BYTES` | A sequence of 8-bit unsigned bytes | byte[], ByteBuffer, ByteBuf | bytes | []byte |
From 18db6c73a6e808cdaaff8872eda527f5887ecbab Mon Sep 17 00:00:00 2001
From: momo-jun
Date: Thu, 3 Nov 2022 16:45:47 +0800
Subject: [PATCH 09/10] Update schema-get-started.md --- site2/docs/schema-get-started.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/site2/docs/schema-get-started.md b/site2/docs/schema-get-started.md index a794db725f1d9..5b6cea677e3b4 100644 --- a/site2/docs/schema-get-started.md +++ b/site2/docs/schema-get-started.md @@ -223,7 +223,7 @@ public class SensorReading { Create a `Producer` (or `Consumer`) like this: ```java -Producer producer = client.newProducer(JSONSchema.of(SensorReading.class)) +Producer producer = client.newProducer(AvroSchema.of(SensorReading.class)) .topic("sensor-readings") .create(); ``` From e862ab769582f23609bc3a967f99139612257e00 Mon Sep 17 00:00:00 2001 From: momo-jun Date: Fri, 4 Nov 2022 14:13:20 +0800 Subject: [PATCH 10/10] Update schema-understand.md --- site2/docs/schema-understand.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/site2/docs/schema-understand.md b/site2/docs/schema-understand.md index c488ff57c761b..64cb6c9b6d8ea 100644 --- a/site2/docs/schema-understand.md +++ b/site2/docs/schema-understand.md @@ -78,20 +78,20 @@ Currently, Pulsar supports the following complex types: | Complex Type | Description | |---|---| -| `keyvalue` | Represents a complex type of a key/value pair. | -| `struct` | Handles structured data. It supports `AvroBaseStructSchema` and `ProtobufNativeSchema`. | +| `KeyValue` | Represents a complex type of a key/value pair. | +| `Struct` | Handles structured data. It supports `AvroBaseStructSchema` and `ProtobufNativeSchema`. | -#### `keyvalue` schema +#### `KeyValue` schema -`Keyvalue` schema helps applications define schemas for both key and value. Pulsar stores the `SchemaInfo` of key schema and the `SchemaInfo` of value schema together. +`KeyValue` schema helps applications define schemas for both key and value. Pulsar stores the `SchemaInfo` of key schema and the value schema together. 
-You can choose the encoding type when constructing the key/value schema.:
+You can choose the encoding type when constructing the key/value schema:
 
 * `INLINE` - Key/value pairs are encoded together in the message payload. 
 * `SEPARATED` - see [Construct a key/value schema](schema-get-started.md#construct-a-keyvalue-schema).
 
-#### `struct` schema
+#### `Struct` schema
 
-`struct` schema supports `AvroBaseStructSchema` and `ProtobufNativeSchema`.
+`Struct` schema supports `AvroBaseStructSchema` and `ProtobufNativeSchema`.
 
 |Type|Description|
 ---|---|