Merge pull request #11 from AutoMQ/develop_merge
merge from AutoMQ for Apache Kafka
superhx authored Aug 22, 2023
2 parents e32cce3 + 8ca1527 commit af0be94
Showing 15 changed files with 297 additions and 69 deletions.
42 changes: 42 additions & 0 deletions .github/workflows/build.yml
@@ -0,0 +1,42 @@
name: Build
on:
  pull_request:
    types: [opened, reopened, synchronize]
  push:
    branches: ["master", "develop"]

jobs:
  paths-filter:
    runs-on: ubuntu-latest
    outputs:
      es-unit-test: ${{ steps.filter.outputs.es-unit-test }}
    steps:
      - uses: actions/checkout@v3
      - uses: dorny/paths-filter@v2
        id: filter
        with:
          filters: |
            es-unit-test:
              - '.github/workflows/**'
              - 'core/**'
              - 'metadata/**'
  es-unit-test:
    needs: [paths-filter]
    if: ${{ needs.paths-filter.outputs.es-unit-test == 'true' || github.event_name == 'push' }}
    uses: ./.github/workflows/es_unit_tests.yml
  build-result:
    runs-on: ubuntu-latest
    needs: [es-unit-test]
    if: ${{ always() }}
    steps:
      - uses: actions/checkout@v3
      - name: Collect build result
        run: |
          if echo es-unit-test-${{ needs.es-unit-test.result }} | grep -E 'cancelled|failure' -o > /dev/null
          then
            echo "There are failed/cancelled builds"
            exit 1
          else
            echo "All builds are successful/skipped"
            exit 0
          fi
35 changes: 35 additions & 0 deletions .github/workflows/es_unit_tests.yml
@@ -0,0 +1,35 @@
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third party and are governed by
# separate terms of service, privacy policy, and support
# documentation.
# This workflow will build a Java project with Gradle and cache/restore any dependencies to improve the workflow execution time.
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-java-with-gradle

name: ES Unit Tests

on:
  workflow_call:

permissions:
  contents: read

jobs:
  build:
    name: "${{ matrix.os }}, jdk-${{ matrix.jdk }}"
    runs-on: ${{ matrix.os }}
    strategy:
      matrix:
        os: [ubuntu-22.04]
        jdk: [11, 17]
    steps:
      - name: Checkout
        uses: actions/checkout@v3
      - name: Set up JDK ${{ matrix.jdk }}
        uses: actions/setup-java@v3
        with:
          java-version: ${{ matrix.jdk }}
          distribution: "zulu"
      - name: Setup Gradle
        uses: gradle/gradle-build-action@v2
      - name: Execute Gradle build
        run: ./gradlew metadata:esUnitTest core:esUnitTest
17 changes: 17 additions & 0 deletions .github/workflows/validate_pr_title.yml
@@ -0,0 +1,17 @@
name: "Lint PR"

on:
pull_request_target:
types:
- opened
- edited
- synchronize

jobs:
main:
name: Validate PR title
runs-on: ubuntu-latest
steps:
- uses: amannn/action-semantic-pull-request@v5
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
34 changes: 34 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java
@@ -0,0 +1,34 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.errors.es;

import org.apache.kafka.common.errors.RetriableException;
/**
 * Indicates that the fetch request was too slow to be served. The request should instead be served from a separate thread pool.
 */
public class SlowFetchHintException extends RetriableException {
    private static final long serialVersionUID = 1L;

    public SlowFetchHintException() { super(); }

    public SlowFetchHintException(String message) { super(message); }

    public SlowFetchHintException(Throwable cause) { super(cause); }

    public SlowFetchHintException(String message, Throwable cause) { super(message, cause); }
}
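For context, a caller-side sketch of how such a hint can be handled (the FetchTask interface, SLOW_FETCH_POOL, and fetchOrFallBack below are illustrative names, not APIs introduced by this commit): on the hint, the same read is re-submitted to a dedicated pool so that slow reads do not stall the common path, which is what the Javadoc's "separate thread pool" refers to.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.common.errors.es.SlowFetchHintException;

final class SlowFetchFallback {
    // Hypothetical pool reserved for fetches that exceeded the fast-path deadline.
    private static final ExecutorService SLOW_FETCH_POOL = Executors.newFixedThreadPool(4);

    interface FetchTask<T> {
        T run() throws SlowFetchHintException;
    }

    // Run the fetch on the calling thread; on a slow-fetch hint, hand it off.
    static <T> T fetchOrFallBack(FetchTask<T> task) throws Exception {
        try {
            return task.run(); // fast path: fails quickly with a hint instead of blocking
        } catch (SlowFetchHintException hint) {
            // By the time the hint is thrown, the range is marked as slow fetching,
            // so the retried fetch skips the timeout check and will not hint again.
            return SLOW_FETCH_POOL.submit(task::run).get();
        }
    }
}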
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/es/AbstractStreamIndex.scala
@@ -49,7 +49,7 @@ abstract class AbstractStreamIndex(_file: File, val streamSliceSupplier: StreamS
   protected var _maxEntries: Int = adjustedMaxIndexSize / entrySize

   @volatile
-  protected var _entries: Int = (stream.nextOffset() / entrySize).toInt
+  protected var _entries: Int = stream.nextOffset().toInt

   @volatile
   protected var cache: MappedByteBuffer = {
37 changes: 34 additions & 3 deletions core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
@@ -26,6 +26,10 @@
 import com.automq.elasticstream.client.api.RecordBatch;
 import com.automq.elasticstream.client.api.Stream;
 import com.automq.elasticstream.client.api.StreamClient;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import org.apache.kafka.common.errors.es.SlowFetchHintException;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -123,6 +127,8 @@ private void openStream0(long streamId, OpenStreamOptions options, CompletableFu
     static class StreamImpl implements Stream {
         private final Stream stream;
         private volatile boolean closed = false;
+        private final Map<String, Boolean> slowFetchingOffsetMap = new ConcurrentHashMap<>();
+        private final long SLOW_FETCH_TIMEOUT_MILLIS = 10;

         public StreamImpl(Stream stream) {
             this.stream = stream;
@@ -160,22 +166,47 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {

         @Override
         public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytesHint) {
+            String slowFetchKey = startOffset + "-" + endOffset;
             CompletableFuture<FetchResult> cf = new CompletableFuture<>();
-            fetch0(startOffset, endOffset, maxBytesHint, cf);
+            // If this range is already recorded as slow fetching, skip the timeout check.
+            if (slowFetchingOffsetMap.containsKey(slowFetchKey)) {
+                fetch0(startOffset, endOffset, maxBytesHint, cf, slowFetchKey);
+            } else {
+                // Try a quick fetch first. If it times out, complete with SlowFetchHintException.
+                stream.fetch(startOffset, endOffset, maxBytesHint)
+                    .orTimeout(SLOW_FETCH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
+                    .whenComplete((rst, ex) -> FutureUtil.suppress(() -> {
+                        if (ex != null) {
+                            if (closed) {
+                                cf.completeExceptionally(new IllegalStateException("stream already closed"));
+                            } else if (ex instanceof TimeoutException) {
+                                LOGGER.info("Fetch stream[{}] [{},{}) timed out after {} ms, retrying with slow fetching", streamId(), startOffset, endOffset, SLOW_FETCH_TIMEOUT_MILLIS);
+                                cf.completeExceptionally(new SlowFetchHintException("fetch was too slow, retry with slow fetching"));
+                                slowFetchingOffsetMap.put(slowFetchKey, true);
+                            } else {
+                                cf.completeExceptionally(ex);
+                            }
+                        } else {
+                            slowFetchingOffsetMap.remove(slowFetchKey);
+                            cf.complete(rst);
+                        }
+                    }, LOGGER));
+            }
             return cf;
         }

-        private void fetch0(long startOffset, long endOffset, int maxBytesHint, CompletableFuture<FetchResult> cf) {
+        private void fetch0(long startOffset, long endOffset, int maxBytesHint, CompletableFuture<FetchResult> cf, String slowFetchKey) {
             stream.fetch(startOffset, endOffset, maxBytesHint).whenCompleteAsync((rst, ex) -> {
                 FutureUtil.suppress(() -> {
                     if (ex != null) {
                         LOGGER.error("Fetch stream[{}] [{},{}) failed, retrying later", streamId(), startOffset, endOffset);
                         if (!closed) {
-                            FETCH_RETRY_SCHEDULER.schedule(() -> fetch0(startOffset, endOffset, maxBytesHint, cf), 3, TimeUnit.SECONDS);
+                            FETCH_RETRY_SCHEDULER.schedule(() -> fetch0(startOffset, endOffset, maxBytesHint, cf, slowFetchKey), 3, TimeUnit.SECONDS);
                         } else {
                             cf.completeExceptionally(new IllegalStateException("stream already closed"));
                         }
                     } else {
+                        slowFetchingOffsetMap.remove(slowFetchKey);
                         cf.complete(rst);
                     }
                 }, LOGGER);
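Read as a whole, the change above implements a timeout-gated fast path: the first fetch for a range races a short deadline, a timeout records the range in slowFetchingOffsetMap and surfaces SlowFetchHintException, and later fetches for that range skip the race. A minimal, self-contained sketch of the same idea (class, field, and method names here are illustrative, not from this commit):

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class TimeoutGatedFetcher {
    private static final long FAST_TIMEOUT_MILLIS = 10;
    // Ranges that recently missed the fast-path deadline.
    private final Map<String, Boolean> slowKeys = new ConcurrentHashMap<>();

    CompletableFuture<byte[]> fetch(String key, Supplier<CompletableFuture<byte[]>> source) {
        if (slowKeys.containsKey(key)) {
            // Known-slow range: go straight to the patient path with no timeout race.
            // (A fuller version would clear the marker on success, as fetch0 does above.)
            return source.get();
        }
        CompletableFuture<byte[]> cf = new CompletableFuture<>();
        source.get()
            .orTimeout(FAST_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
            .whenComplete((rst, ex) -> {
                if (ex instanceof TimeoutException) {
                    slowKeys.put(key, true); // the next fetch for this key skips the race
                    cf.completeExceptionally(ex);
                } else if (ex != null) {
                    cf.completeExceptionally(ex);
                } else {
                    slowKeys.remove(key); // fast again: drop the slow marker
                    cf.complete(rst);
                }
            });
        return cf;
    }
}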
17 changes: 13 additions & 4 deletions core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java
@@ -27,7 +27,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import org.apache.kafka.common.errors.es.SlowFetchHintException;
+import org.apache.kafka.common.utils.Utils;

public class DefaultElasticStreamSlice implements ElasticStreamSlice {
/**
@@ -71,11 +74,17 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
     }

     @Override
-    public FetchResult fetch(long startOffset, long endOffset, int maxBytesHint) {
+    public FetchResult fetch(long startOffset, long endOffset, int maxBytesHint) throws SlowFetchHintException {
+        long fixedStartOffset = Utils.max(startOffset, 0);
         try {
-            return stream.fetch(startOffsetInStream + startOffset, startOffsetInStream + endOffset, maxBytesHint).thenApply(FetchResultWrapper::new).get();
-        } catch (Throwable e) {
-            // TODO: specific exception
+            return stream.fetch(startOffsetInStream + fixedStartOffset, startOffsetInStream + endOffset, maxBytesHint).thenApply(FetchResultWrapper::new).get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof SlowFetchHintException) {
+                throw (SlowFetchHintException) (e.getCause());
+            } else {
+                throw new RuntimeException(e.getCause());
+            }
+        } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }
     }
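The catch block above is the usual pattern for a synchronous facade over CompletableFuture: get() wraps every failure in ExecutionException, so a typed cause has to be unwrapped and rethrown while everything else is wrapped. As a generic sketch of that pattern (the Sync helper is hypothetical, not part of this commit):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class Sync {
    // Block on a future and rethrow a typed cause, wrapping everything else,
    // mirroring the catch block above.
    static <T, E extends Exception> T join(CompletableFuture<T> cf, Class<E> typed) throws E {
        try {
            return cf.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (typed.isInstance(cause)) {
                throw typed.cast(cause);
            }
            throw new RuntimeException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException(e);
        }
    }
}

With such a helper, the method body above would reduce to a single call along the lines of Sync.join(stream.fetch(...).thenApply(FetchResultWrapper::new), SlowFetchHintException.class).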
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/log/es/ElasticLogFileRecords.java
@@ -21,6 +21,7 @@
 import com.automq.elasticstream.client.api.RecordBatchWithContext;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
+import org.apache.kafka.common.errors.es.SlowFetchHintException;
 import org.apache.kafka.common.network.TransferableChannel;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.ConvertedRecords;
@@ -94,15 +95,15 @@ public long appendedOffset() {
         return nextOffset.get() - baseOffset;
     }

-    public Records read(long startOffset, long maxOffset, int maxSize) {
+    public Records read(long startOffset, long maxOffset, int maxSize) throws SlowFetchHintException {
         if (ReadManualReleaseHint.isMarked()) {
             return readAll0(startOffset, maxOffset, maxSize);
         } else {
             return new BatchIteratorRecordsAdaptor(this, startOffset, maxOffset, maxSize);
         }
     }

-    private Records readAll0(long startOffset, long maxOffset, int maxSize) {
+    private Records readAll0(long startOffset, long maxOffset, int maxSize) throws SlowFetchHintException {
         int readSize = 0;
         long nextFetchOffset = startOffset - baseOffset;
         long endOffset = Utils.min(this.committedOffset.get(), maxOffset) - baseOffset;
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/log/es/ElasticLogManager.scala
@@ -104,7 +104,8 @@ object ElasticLogManager {
       .build())
     INSTANCE = Some(new ElasticLogManager(streamClient))
   } else if (endpoint.startsWith(MEMORY_ENDPOINT_PREFIX)) {
-    INSTANCE = Some(new ElasticLogManager(new MemoryClient()))
+    val streamClient = new AlwaysSuccessClient(new MemoryClient())
+    INSTANCE = Some(new ElasticLogManager(streamClient))
   } else if (endpoint.startsWith(REDIS_ENDPOINT_PREFIX)) {
     INSTANCE = Some(new ElasticLogManager(new ElasticRedisClient(endpoint.substring(REDIS_ENDPOINT_PREFIX.length))))
   } else {
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/log/es/ElasticStreamSlice.java
@@ -23,9 +23,10 @@
 import com.automq.elasticstream.client.api.Stream;

 import java.util.concurrent.CompletableFuture;
+import org.apache.kafka.common.errors.es.SlowFetchHintException;

 /**
- * Elastic stream slice is a slice from elastic stream, the position of slice is start from 0.
+ * Elastic stream slice is a slice of an elastic stream; the offset of a slice starts from 0.
  * At any time, there is only one writable slice in a stream, and the writable slice is always the last slice.
  */
 public interface ElasticStreamSlice {
@@ -45,9 +46,9 @@ public interface ElasticStreamSlice {
      * @param maxBytesHint max fetch data size hint; the actual returned data size may be larger than maxBytesHint.
      * @return {@link FetchResult}
      */
-    FetchResult fetch(long startOffset, long endOffset, int maxBytesHint);
+    FetchResult fetch(long startOffset, long endOffset, int maxBytesHint) throws SlowFetchHintException;

-    default FetchResult fetch(long startOffset, long endOffset) {
+    default FetchResult fetch(long startOffset, long endOffset) throws SlowFetchHintException {
         return fetch(startOffset, endOffset, (int) (endOffset - startOffset));
     }

1 change: 1 addition & 0 deletions core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala
@@ -137,6 +137,7 @@ class ElasticTimeIndex(_file: File, streamSegmentSupplier: StreamSliceSupplier,
     stream = streamSliceSupplier.reset()
     _entries = 0
     _lastEntry = lastEntryFromIndexFile
+    resize(maxIndexSize)
   }

   def truncate(): Unit = {