Skip to content

Commit

Permalink
Merge branch 'apache:master' into fix-test
Browse files Browse the repository at this point in the history
  • Loading branch information
labuladong authored Dec 6, 2022
2 parents 976a3c4 + b36e012 commit a57c20a
Show file tree
Hide file tree
Showing 9 changed files with 373 additions and 262 deletions.
2 changes: 1 addition & 1 deletion distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ The Apache Software License, Version 2.0
- com.fasterxml.jackson.jaxrs-jackson-jaxrs-json-provider-2.13.4.jar
- com.fasterxml.jackson.module-jackson-module-jaxb-annotations-2.13.4.jar
- com.fasterxml.jackson.module-jackson-module-jsonSchema-2.13.4.jar
* Caffeine -- com.github.ben-manes.caffeine-caffeine-3.1.2.jar
* Caffeine -- com.github.ben-manes.caffeine-caffeine-2.9.1.jar
* Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar
* Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.0.1.jar
* Bitbucket -- org.bitbucket.b_c-jose4j-0.7.6.jar
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ flexible messaging model and an intuitive client API.</description>
<javax.ws.rs-api.version>2.1</javax.ws.rs-api.version>
<hdrHistogram.version>2.1.9</hdrHistogram.version>
<javax.servlet-api>3.1.0</javax.servlet-api>
<caffeine.version>3.1.2</caffeine.version>
<caffeine.version>2.9.1</caffeine.version>
<java-semver.version>0.9.0</java-semver.version>
<jline.version>2.14.6</jline.version>
<jline3.version>3.21.0</jline3.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
Expand Down Expand Up @@ -114,6 +117,44 @@ public CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] has
return putSchema(key, value, hash).thenApply(LongSchemaVersion::new);
}

@Override
public CompletableFuture<SchemaVersion> put(String key,
Function<CompletableFuture<List<CompletableFuture<StoredSchema>>>,
CompletableFuture<Pair<byte[], byte[]>>> fn) {
CompletableFuture<SchemaVersion> promise = new CompletableFuture<>();
put(key, fn, promise);
return promise;
}

private void put(String key,
Function<CompletableFuture<List<CompletableFuture<StoredSchema>>>,
CompletableFuture<Pair<byte[], byte[]>>> fn,
CompletableFuture<SchemaVersion> promise) {
CompletableFuture<Pair<Optional<LocatorEntry>, List<CompletableFuture<StoredSchema>>>> schemasWithLocator =
getAllWithLocator(key);
schemasWithLocator.thenCompose(pair ->
fn.apply(completedFuture(pair.getRight())).thenCompose(p -> {
// The schema is existed
if (p == null) {
return CompletableFuture.completedFuture(null);
}
return putSchema(key, p.getLeft(), p.getRight(), pair.getLeft());
}).thenApply(version -> {
return version != null ? new LongSchemaVersion(version) : null;
})).whenComplete((v, ex) -> {
if (ex == null) {
promise.complete(v);
} else {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) {
put(key, fn, promise);
} else {
promise.completeExceptionally(ex);
}
}
});
}

@Override
public CompletableFuture<StoredSchema> get(String key, SchemaVersion version) {
if (version == SchemaVersion.Latest) {
Expand All @@ -126,30 +167,32 @@ public CompletableFuture<StoredSchema> get(String key, SchemaVersion version) {

@Override
public CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String key) {
CompletableFuture<List<CompletableFuture<StoredSchema>>> result = new CompletableFuture<>();
getLocator(key).thenAccept(locator -> {
return getAllWithLocator(key).thenApply(Pair::getRight);
}

private CompletableFuture<Pair<Optional<LocatorEntry>, List<CompletableFuture<StoredSchema>>>> getAllWithLocator(
String key) {
return getLocator(key).thenApply(locator -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Get all schemas - locator: {}", key, locator);
}

if (!locator.isPresent()) {
result.complete(Collections.emptyList());
return;
if (locator.isEmpty()) {
return Pair.of(locator, Collections.emptyList());
}

SchemaStorageFormat.SchemaLocator schemaLocator = locator.get().locator;
List<CompletableFuture<StoredSchema>> list = new ArrayList<>();
schemaLocator.getIndexList().forEach(indexEntry -> list.add(readSchemaEntry(indexEntry.getPosition())
.thenApply(entry -> new StoredSchema
(
entry.getSchemaData().toByteArray(),
new LongSchemaVersion(indexEntry.getVersion())
.thenApply(entry -> new StoredSchema
(
entry.getSchemaData().toByteArray(),
new LongSchemaVersion(indexEntry.getVersion())
)
)
)
));
result.complete(list);
return Pair.of(locator, list);
});
return result;
}

CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
Expand Down Expand Up @@ -270,72 +313,29 @@ private CompletableFuture<StoredSchema> getSchema(String schemaId, long version)

@NotNull
private CompletableFuture<Long> putSchema(String schemaId, byte[] data, byte[] hash) {
return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {

if (optLocatorEntry.isPresent()) {
return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry ->
putSchema(schemaId, data, hash, optLocatorEntry));
}

SchemaStorageFormat.SchemaLocator locator = optLocatorEntry.get().locator;
private CompletableFuture<Long> putSchema(String schemaId, byte[] data, byte[] hash,
Optional<LocatorEntry> optLocatorEntry) {
if (optLocatorEntry.isPresent()) {

if (log.isDebugEnabled()) {
log.debug("[{}] findSchemaEntryByHash - hash={}", schemaId, hash);
}
SchemaStorageFormat.SchemaLocator locator = optLocatorEntry.get().locator;

//don't check the schema whether already exist
return readSchemaEntry(locator.getIndexList().get(0).getPosition())
.thenCompose(schemaEntry -> addNewSchemaEntryToStore(schemaId,
locator.getIndexList(), data).thenCompose(
position -> {
CompletableFuture<Long> future = new CompletableFuture<>();
updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash)
.thenAccept(future::complete)
.exceptionally(ex -> {
if (ex.getCause() instanceof BadVersionException) {
// There was a race condition on the schema creation.
// Since it has now been created,
// retry the whole operation so that we have a chance to
// recover without bubbling error
putSchema(schemaId, data, hash)
.thenAccept(future::complete)
.exceptionally(ex2 -> {
future.completeExceptionally(ex2);
return null;
});
} else {
// For other errors, just fail the operation
future.completeExceptionally(ex);
}
return null;
});
return future;
})
);
} else {
// No schema was defined yet
CompletableFuture<Long> future = new CompletableFuture<>();
createNewSchema(schemaId, data, hash)
.thenAccept(future::complete)
.exceptionally(ex -> {
if (ex.getCause() instanceof AlreadyExistsException
|| ex.getCause() instanceof BadVersionException) {
// There was a race condition on the schema creation. Since it has now been created,
// retry the whole operation so that we have a chance to recover without bubbling error
// back to producer/consumer
putSchema(schemaId, data, hash)
.thenAccept(future::complete)
.exceptionally(ex2 -> {
future.completeExceptionally(ex2);
return null;
});
} else {
// For other errors, just fail the operation
future.completeExceptionally(ex);
}
return null;
});

return future;
if (log.isDebugEnabled()) {
log.debug("[{}] findSchemaEntryByHash - hash={}", schemaId, hash);
}
});

//don't check the schema whether already exist
return readSchemaEntry(locator.getIndexList().get(0).getPosition())
.thenCompose(schemaEntry -> addNewSchemaEntryToStore(schemaId,
locator.getIndexList(), data).thenCompose(
position -> updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash))
);
} else {
return createNewSchema(schemaId, data, hash);
}
}

private CompletableFuture<Long> createNewSchema(String schemaId, byte[] data, byte[] hash) {
Expand Down Expand Up @@ -471,7 +471,21 @@ private CompletableFuture<Long> updateSchemaLocator(
.addAllIndex(indexList)
.build()
, locatorEntry.version
).thenApply(ignore -> nextVersion);
).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
log.warn("[{}] Failed to update schema locator with position {}", schemaId, position, cause);
if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) {
bookKeeper.asyncDeleteLedger(position.getLedgerId(), new AsyncCallback.DeleteCallback() {
@Override
public void deleteComplete(int rc, Object ctx) {
if (rc != BKException.Code.OK) {
log.warn("[{}] Failed to delete ledger {} after updating schema locator failed, rc: {}",
schemaId, position.getLedgerId(), rc);
}
}
}, null);
}
});
}

@NotNull
Expand Down
Loading

0 comments on commit a57c20a

Please sign in to comment.