Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement gRPC publishBlockStream streaming of blocks #15541

Merged
merged 63 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
d4172ab
init
petreze Sep 17, 2024
cc7c8ad
trakc the blocknumber within the writer
petreze Sep 18, 2024
58fb6d4
partial
petreze Sep 19, 2024
2c8c020
grpc connection established
petreze Sep 20, 2024
96668f3
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Sep 23, 2024
b0177a7
fix unit tests
petreze Sep 24, 2024
715e12c
add check unit tests
petreze Sep 24, 2024
de48e1a
close the ManagedChannel explicitly on closing the block
petreze Sep 25, 2024
d5620c4
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Sep 25, 2024
f7a45dc
tweak blockstream config
petreze Sep 25, 2024
e7dd403
tweak blockstream config
petreze Sep 25, 2024
846a20b
change config
petreze Sep 27, 2024
3868cb4
change config
petreze Sep 27, 2024
04bf960
change config
petreze Sep 27, 2024
d1f8786
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Sep 30, 2024
094b179
remove unused config
petreze Sep 30, 2024
4f33676
Merge remote-tracking branch 'origin/15526-blockstreams-grpc' into 15…
petreze Oct 1, 2024
0500e32
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 1, 2024
d3c5735
helidon testing
petreze Oct 1, 2024
dd45f9e
potential fix
petreze Oct 1, 2024
217bb15
spotless
petreze Oct 1, 2024
4f725f4
spotless
petreze Oct 1, 2024
f2eb468
Merge branch '15526-blockstreams-grpc' into grpc-test
petreze Oct 1, 2024
9f2b5fc
Merge branch 'develop' into grpc-test
petreze Oct 2, 2024
d158dd0
partial
petreze Oct 3, 2024
9aab9dd
address PR comments
petreze Oct 3, 2024
9945d57
fix tests
petreze Oct 4, 2024
54f2c33
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 4, 2024
49c062d
Merge branch '15526-blockstreams-grpc' into grpc-test
petreze Oct 4, 2024
e7de0f0
partial
petreze Oct 4, 2024
b87dde5
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 4, 2024
c0eab09
revert configs
petreze Oct 4, 2024
9c59d72
fix unit test
petreze Oct 4, 2024
8356a8c
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 4, 2024
9ffbe68
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 7, 2024
131ddb7
Merge branch '15526-blockstreams-grpc' into grpc-test
petreze Oct 7, 2024
eed0ef4
use helidon for grpc streaming
petreze Oct 7, 2024
7e347b7
use helidon for grpc streaming
petreze Oct 7, 2024
79e3a15
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 9, 2024
905b05a
Merge branch '15526-blockstreams-grpc' into blockstreams-helidon-grpc
petreze Oct 9, 2024
5892c56
use BlockStreamServiceGrpc for the method name
petreze Oct 9, 2024
9d2a0c4
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 11, 2024
c337b55
change to more reasonable documentation
petreze Oct 11, 2024
7bbfadd
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 11, 2024
9bcc966
Merge branch 'blockstreams-helidon-grpc' into 15526-blockstreams-grpc
petreze Oct 11, 2024
c54a1fd
spotless
petreze Oct 13, 2024
a98ff4a
fix module imports
petreze Oct 14, 2024
9ba353e
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 14, 2024
68182e6
remove unit tests which should be E2E
petreze Oct 14, 2024
d9cb76f
add some error handling on bn response
petreze Oct 14, 2024
9cf3e70
tweak
petreze Oct 15, 2024
9c9b5aa
tweakremove java logging from jpms-modules
petreze Oct 16, 2024
2ed6c83
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 16, 2024
98ed530
minor
petreze Oct 17, 2024
659e1d8
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 17, 2024
e51b361
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 18, 2024
af37011
fix after develop update
petreze Oct 18, 2024
60c9c6c
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 18, 2024
c61841a
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 21, 2024
14d3e4f
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 22, 2024
95ac65e
address comment for more readability
petreze Oct 23, 2024
577dbb4
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 29, 2024
15d0d42
Merge branch 'develop' into 15526-blockstreams-grpc
petreze Oct 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,19 @@ extraJavaModuleInfo {
module("io.grpc:grpc-util", "io.grpc.util")
module("io.grpc:grpc-protobuf", "io.grpc.protobuf")
module("io.grpc:grpc-protobuf-lite", "io.grpc.protobuf.lite")
module("io.helidon.common:helidon-common", "io.helidon.common") {
exportAllPackages()
patchRealModule()
}
module("io.helidon.webclient:helidon-webclient", "io.helidon.webclient") {
requireAllDefinedDependencies()
patchRealModule()
}
module("io.helidon.webclient:helidon-webclient-grpc", "io.helidon.webclient.grpc") {
exportAllPackages()
requireAllDefinedDependencies()
patchRealModule()
}
petreze marked this conversation as resolved.
Show resolved Hide resolved
module("com.github.spotbugs:spotbugs-annotations", "com.github.spotbugs.annotations")
module("com.google.code.findbugs:jsr305", "java.annotation")
module("com.google.protobuf:protobuf-java", "com.google.protobuf") {
Expand Down
3 changes: 3 additions & 0 deletions hapi/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@
exports com.hedera.hapi.block.stream.schema;
exports com.hedera.hapi.node.state.tss;
exports com.hedera.hapi.services.auxiliary.tss;
exports com.hedera.hapi.block.protoc;
exports com.hedera.hapi.block.stream.protoc;
exports com.hedera.hapi.block;

requires transitive com.google.common;
requires transitive com.google.protobuf;
Expand Down
9 changes: 9 additions & 0 deletions hedera-dependency-versions/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ dependencies {
}

dependencies.constraints {
api("io.helidon.common:helidon-common:4.1.1") {
because("io.helidon.common")
}
api("io.helidon.webclient:helidon-webclient:4.1.1") {
because("io.helidon.webclient")
}
api("io.helidon.webclient:helidon-webclient-grpc:4.1.1") {
because("io.helidon.webclient.grpc")
}
jsync-swirlds marked this conversation as resolved.
Show resolved Hide resolved
api("org.awaitility:awaitility:4.2.0") {
because("awaitility")
}
Expand Down
3 changes: 3 additions & 0 deletions hedera-node/hedera-app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ mainModuleInfo {
// This is needed to pick up and include the native libraries for the netty epoll transport
runtimeOnly("io.netty.transport.epoll.linux.x86_64")
runtimeOnly("io.netty.transport.epoll.linux.aarch_64")
runtimeOnly("io.helidon.grpc.core")
runtimeOnly("io.helidon.webclient")
runtimeOnly("io.helidon.webclient.grpc")
}

testModuleInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public interface BlockItemWriter {
* Writes a serialized item to the destination stream.
*
* @param bytes the serialized item to write
* @return the block item writer
*/
default BlockItemWriter writePbjItem(@NonNull final Bytes bytes) {
requireNonNull(bytes);
Expand All @@ -47,13 +48,15 @@ default BlockItemWriter writePbjItem(@NonNull final Bytes bytes) {
* Writes a serialized item to the destination stream.
*
* @param bytes the serialized item to write
* @return the block item writer
*/
BlockItemWriter writeItem(@NonNull byte[] bytes);

/**
* Writes a pre-serialized sequence of items to the destination stream.
*
* @param data the serialized item to write
* @return the block item writer
*/
BlockItemWriter writeItems(@NonNull BufferedData data);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.hedera.node.app.blocks.impl.BlockStreamManagerImpl;
import com.hedera.node.app.blocks.impl.FileBlockItemWriter;
import com.hedera.node.app.blocks.impl.GrpcBlockItemWriter;
import com.hedera.node.config.ConfigProvider;
import com.hedera.node.config.data.BlockStreamConfig;
import com.swirlds.state.spi.info.NodeInfo;
Expand Down Expand Up @@ -45,7 +46,7 @@
final var blockStreamConfig = config.getConfigData(BlockStreamConfig.class);
return switch (blockStreamConfig.writerMode()) {
case FILE -> () -> new FileBlockItemWriter(configProvider, selfNodeInfo, fileSystem);
case GRPC -> throw new IllegalArgumentException("gRPC block writer not yet implemented");
case GRPC -> () -> new GrpcBlockItemWriter(blockStreamConfig);

Check warning on line 49 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/BlockStreamModule.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/BlockStreamModule.java#L49

Added line #L49 was not covered by tests
};
}
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.hedera.node.config.data.BlockRecordStreamConfig;
import com.hedera.node.config.data.BlockStreamConfig;
import com.hedera.node.config.data.VersionConfig;
import com.hedera.node.config.types.BlockStreamWriterMode;
import com.hedera.pbj.runtime.io.buffer.BufferedData;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.swirlds.config.api.Configuration;
Expand Down Expand Up @@ -95,6 +96,7 @@ public class BlockStreamManagerImpl implements BlockStreamManager {
private static final Logger log = LogManager.getLogger(BlockStreamManagerImpl.class);

private final int roundsPerBlock;
private final BlockStreamWriterMode streamWriterType;
private final int hashCombineBatchSize;
private final int serializationBatchSize;
private final TssBaseService tssBaseService;
Expand Down Expand Up @@ -174,6 +176,7 @@ public BlockStreamManagerImpl(
this.hapiVersion = hapiVersionFrom(config);
final var blockStreamConfig = config.getConfigData(BlockStreamConfig.class);
this.roundsPerBlock = blockStreamConfig.roundsPerBlock();
this.streamWriterType = blockStreamConfig.writerMode();
this.hashCombineBatchSize = blockStreamConfig.hashCombineBatchSize();
this.serializationBatchSize = blockStreamConfig.serializationBatchSize();
this.blockHashManager = new BlockHashManager(config);
Expand Down Expand Up @@ -394,7 +397,10 @@ public synchronized void accept(@NonNull final byte[] message, @NonNull final by
.blockSignature(blockSignature)
.siblingHashes(siblingHashes.stream().flatMap(List::stream).toList());
final var proofItem = BlockItem.newBuilder().blockProof(proof).build();
block.writer().writePbjItem(BlockItem.PROTOBUF.toBytes(proofItem)).closeBlock();
block.writer().writePbjItem(BlockItem.PROTOBUF.toBytes(proofItem));
if (streamWriterType == BlockStreamWriterMode.FILE) {
block.writer().closeBlock();
}
petreze marked this conversation as resolved.
Show resolved Hide resolved
if (block.number() != blockNumber) {
siblingHashes.removeFirst();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.node.app.blocks.impl;

import static com.hedera.hapi.block.protoc.PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN;
import static io.grpc.Status.fromThrowable;
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.hedera.hapi.block.protoc.BlockStreamServiceGrpc;
import com.hedera.hapi.block.protoc.PublishStreamRequest;
import com.hedera.hapi.block.protoc.PublishStreamResponse;
import com.hedera.hapi.block.protoc.PublishStreamResponse.Acknowledgement;
import com.hedera.hapi.block.protoc.PublishStreamResponse.EndOfStream;
import com.hedera.hapi.block.protoc.PublishStreamResponseCode;
import com.hedera.hapi.block.stream.protoc.BlockItem;
import com.hedera.node.app.blocks.BlockItemWriter;
import com.hedera.node.config.data.BlockStreamConfig;
import com.hedera.pbj.runtime.io.buffer.BufferedData;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.helidon.common.tls.Tls;
import io.helidon.webclient.grpc.GrpcClient;
import io.helidon.webclient.grpc.GrpcClientMethodDescriptor;
import io.helidon.webclient.grpc.GrpcClientProtocolConfig;
import io.helidon.webclient.grpc.GrpcServiceClient;
import io.helidon.webclient.grpc.GrpcServiceDescriptor;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* Implements the bidirectional streaming RPC for the publishBlockStream rpc in BlockStreamService
* See <a href="https://grpc.io/docs/languages/java/basics/">gRPC Basics</a>
*/
public class GrpcBlockItemWriter implements BlockItemWriter {

private static final Logger logger = LogManager.getLogger(GrpcBlockItemWriter.class);
private static final String INVALID_MESSAGE = "Invalid protocol buffer converting %s from PBJ to protoc for %s";
private static final String GRPC_END_POINT =
BlockStreamServiceGrpc.getPublishBlockStreamMethod().getBareMethodName();

private StreamObserver<PublishStreamRequest> requestObserver;
private final GrpcServiceClient grpcServiceClient;
private long blockNumber;

/** The state of this writer */
private State state = State.UNINITIALIZED;

/**
* The current state of the gRPC writer.
*/
public enum State {
/**
* The gRPC client is not initialized.
*/
UNINITIALIZED,
/**
* The gRPC client is currently open and blocks can be streamed.
*/
OPEN,
/**
* The gRPC client is already closed and cannot be used to stream blocks.
*/
CLOSED
}

/**
* @param blockStreamConfig the block stream configuration
*/
public GrpcBlockItemWriter(@NonNull final BlockStreamConfig blockStreamConfig) {
requireNonNull(blockStreamConfig, "The supplied argument 'blockStreamConfig' cannot be null!");
GrpcClient client;
try {
client = GrpcClient.builder()
.tls(Tls.builder().enabled(false).build())
.baseUri(new URI(
null, null, blockStreamConfig.address(), blockStreamConfig.port(), null, null, null))
.protocolConfig(GrpcClientProtocolConfig.builder()
.abortPollTimeExpired(false)
.build())
.build();
} catch (URISyntaxException e) {
throw new RuntimeException(e);

Check warning on line 101 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L100-L101

Added lines #L100 - L101 were not covered by tests
}
grpcServiceClient = client.serviceClient(GrpcServiceDescriptor.builder()
.serviceName(BlockStreamServiceGrpc.SERVICE_NAME)
.putMethod(
GRPC_END_POINT,
GrpcClientMethodDescriptor.bidirectional(BlockStreamServiceGrpc.SERVICE_NAME, GRPC_END_POINT)
.requestType(PublishStreamRequest.class)
.responseType(PublishStreamResponse.class)
.build())
.build());
}

@Override
public void openBlock(long blockNumber) {
if (state != State.UNINITIALIZED) throw new IllegalStateException("GrpcBlockItemWriter initialized twice");

if (blockNumber < 0) throw new IllegalArgumentException("Block number must be non-negative");
this.blockNumber = blockNumber;
requestObserver = grpcServiceClient.bidi(GRPC_END_POINT, new StreamObserver<PublishStreamResponse>() {

Check warning on line 120 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L119-L120

Added lines #L119 - L120 were not covered by tests
@Override
public void onNext(PublishStreamResponse streamResponse) {
if (streamResponse.hasAcknowledgement()) {
final Acknowledgement acknowledgement = streamResponse.getAcknowledgement();

Check warning on line 124 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L124

Added line #L124 was not covered by tests
if (acknowledgement.hasBlockAck()) {
logger.info("PublishStreamResponse: a full block received: {}", acknowledgement.getBlockAck());

Check warning on line 126 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L126

Added line #L126 was not covered by tests
} else if (acknowledgement.hasItemAck()) {
petreze marked this conversation as resolved.
Show resolved Hide resolved
logger.info(

Check warning on line 128 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L128

Added line #L128 was not covered by tests
"PublishStreamResponse: a single block item is received: {}",
acknowledgement.getItemAck());

Check warning on line 130 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L130

Added line #L130 was not covered by tests
}
} else if (streamResponse.hasStatus()) {
final EndOfStream endOfStream = streamResponse.getStatus();

Check warning on line 133 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L133

Added line #L133 was not covered by tests
if (endOfStream.getStatus().equals(STREAM_ITEMS_UNKNOWN)) {
logger.info(

Check warning on line 135 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L135

Added line #L135 was not covered by tests
"Error returned from block node at block number {}: {}",
endOfStream.getBlockNumber(),

Check warning on line 137 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L137

Added line #L137 was not covered by tests
endOfStream);
onNext(buildErrorResponse(STREAM_ITEMS_UNKNOWN));

Check warning on line 139 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L139

Added line #L139 was not covered by tests
}
}
}

Check warning on line 142 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L142

Added line #L142 was not covered by tests

@Override
public void onError(Throwable t) {
// Maybe this should be considered in this case:
// https://github.com/hashgraph/hedera-services/issues/15530
final Status status = fromThrowable(t);
logger.error("error occurred with an exception: ", status.toString());
requestObserver.onError(t);
}

Check warning on line 151 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L148-L151

Added lines #L148 - L151 were not covered by tests

@Override
public void onCompleted() {
logger.info("PublishStreamResponse completed");
requestObserver.onCompleted();
}

Check warning on line 157 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L155-L157

Added lines #L155 - L157 were not covered by tests
});
this.state = State.OPEN;
}

Check warning on line 160 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L159-L160

Added lines #L159 - L160 were not covered by tests

@Override
public BlockItemWriter writeItem(@NonNull final byte[] bytes) {
requireNonNull(bytes);
if (state != State.OPEN) {
throw new IllegalStateException(
"Cannot write to a GrpcBlockItemWriter that is not open for block: " + this.blockNumber);
}

PublishStreamRequest request = PublishStreamRequest.newBuilder().build();

Check warning on line 170 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L170

Added line #L170 was not covered by tests
try {
request = PublishStreamRequest.newBuilder()
.setBlockItem(BlockItem.parseFrom(bytes))
.build();
requestObserver.onNext(request);
} catch (IOException e) {
final String message = INVALID_MESSAGE.formatted("PublishStreamResponse", request);
throw new RuntimeException(message, e);
}
return this;

Check warning on line 180 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L172-L180

Added lines #L172 - L180 were not covered by tests
}

@Override
public BlockItemWriter writeItems(@NonNull BufferedData data) {
requireNonNull(data);

Check warning on line 185 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L185

Added line #L185 was not covered by tests
if (state != State.OPEN) {
throw new IllegalStateException(

Check warning on line 187 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L187

Added line #L187 was not covered by tests
"Cannot write to a GrpcBlockItemWriter that is not open for block: " + this.blockNumber);
}

PublishStreamRequest request = PublishStreamRequest.newBuilder().build();

Check warning on line 191 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L191

Added line #L191 was not covered by tests
try {
request = PublishStreamRequest.newBuilder()
.setBlockItem(BlockItem.parseFrom(data.asInputStream()))
.build();
requestObserver.onNext(request);
} catch (IOException e) {
final String message = INVALID_MESSAGE.formatted("PublishStreamResponse", request);
throw new RuntimeException(message, e);
}
return this;

Check warning on line 201 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L193-L201

Added lines #L193 - L201 were not covered by tests
}

@Override
public void closeBlock() {
if (state.ordinal() < State.OPEN.ordinal()) {
throw new IllegalStateException("Cannot close a GrpcBlockItemWriter that is not open");
} else if (state.ordinal() == State.CLOSED.ordinal()) {
throw new IllegalStateException("Cannot close a GrpcBlockItemWriter that is already closed");

Check warning on line 209 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L209

Added line #L209 was not covered by tests
}

requestObserver.onCompleted();
petreze marked this conversation as resolved.
Show resolved Hide resolved
this.state = State.CLOSED;
}

Check warning on line 214 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L212-L214

Added lines #L212 - L214 were not covered by tests

/**
* @return the current state of the gRPC writer
*/
@VisibleForTesting
public long getBlockNumber() {
return blockNumber;

Check warning on line 221 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L221

Added line #L221 was not covered by tests
}

/**
* @return the current state of the gRPC writer
*/
@VisibleForTesting
public State getState() {
return state;

Check warning on line 229 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L229

Added line #L229 was not covered by tests
}

/**
* @param errorCode the error code for the stream response
* @return the error stream response
*/
private PublishStreamResponse buildErrorResponse(PublishStreamResponseCode errorCode) {
final EndOfStream endOfStream =
EndOfStream.newBuilder().setStatus(errorCode).build();
return PublishStreamResponse.newBuilder().setStatus(endOfStream).build();

Check warning on line 239 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/GrpcBlockItemWriter.java#L238-L239

Added lines #L238 - L239 were not covered by tests
}
}
3 changes: 3 additions & 0 deletions hedera-node/hedera-app/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
requires com.google.protobuf;
requires io.grpc.netty;
requires io.grpc;
requires io.helidon.common.tls;
requires io.helidon.webclient.api;
requires io.helidon.webclient.grpc;
requires io.netty.handler;
requires io.netty.transport.classes.epoll;
requires io.netty.transport;
Expand Down
Loading
Loading