chore: gracefully shutdown when error encountered #152

Merged 17 commits on Nov 6, 2024

KeranYang (Member) commented Nov 5, 2024

Java Graceful Shutdown Testing Notes

  1. map

Test Pipeline

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: simple-pipeline-map-error
spec:
  vertices:
    - name: in
      source:
        generator:
          rpu: 5
          duration: 1s
      scale:
        min: 1
    - name: cat       
      scale:
        min: 1
      udf:
        container:
          image: quay.io/numaio/numaflow-java/map-forward-message:keran-test-uderr
          imagePullPolicy: Always
    - name: sink
      scale:
        min: 1
      sink:
        log: {}
  edges:
    - from: in
      to: cat
    - from: cat
      to: sink

gRPC Server Log

"[mapper-akka.actor.default-dispatcher-11] ERROR io.numaproj.numaflow.mapper.MapSupervisorActor - Encountered error in mapFn - This is a test exception"
...

gRPC Client Log

{"level":"info","ts":"2024-11-06T16:21:06.625645988Z","logger":"numaflow.MapUDF-processor","caller":"metrics/metrics_server.go:236","msg":"Generating self-signed certificate","pipeline":"simple-pipeline-map-error","vertex":"cat"}
{"level":"info","ts":"2024-11-06T16:21:06.626895947Z","logger":"numaflow.MapUDF-processor","caller":"metrics/metrics_server.go:276","msg":"Not enabling pprof debug endpoints","pipeline":"simple-pipeline-map-error","vertex":"cat"}
{"level":"info","ts":"2024-11-06T16:21:06.62704728Z","logger":"numaflow.MapUDF-processor","caller":"udf/map_udf.go:293","msg":"Start processing udf messages","pipeline":"simple-pipeline-map-error","vertex":"cat","protocol":"uds-grpc-map-udf","isbsvc":"jetstream","from":"default-simple-pipeline-map-error-cat-0","to":["default-simple-pipeline-map-error-sink-0"]}
{"level":"info","ts":"2024-11-06T16:21:06.627337947Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:121","msg":"Starting forwarder...","pipeline":"simple-pipeline-map-error","vertex":"cat","protocol":"uds-grpc-map-udf"}
{"level":"info","ts":"2024-11-06T16:21:06.627162447Z","logger":"numaflow.MapUDF-processor","caller":"metrics/metrics_server.go:289","msg":"Starting metrics HTTPS server","pipeline":"simple-pipeline-map-error","vertex":"cat"}
{"level":"error","ts":"2024-11-06T16:21:07.49978053Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:628","msg":"mapUDF.Apply error","pipeline":"simple-pipeline-map-error","vertex":"cat","protocol":"uds-grpc-map-udf","error":"gRPC client.MapFn failed, NonRetryable: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).applyUDF\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/forward/forward.go:628\ngithub.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).forwardAChunk\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/forward/forward.go:273\ngithub.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).Start.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/forward/forward.go:141"}
{"level":"error","ts":"2024-11-06T16:21:07.499879072Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:275","msg":"failed to applyUDF","pipeline":"simple-pipeline-map-error","vertex":"cat","protocol":"uds-grpc-map-udf","error":"gRPC client.MapFn failed, NonRetryable: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).forwardAChunk\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/forward/forward.go:275\ngithub.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).Start.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/forward/forward.go:141"}
2024/11/06 16:21:07 failed c.grpcClt.MapFn stream.Recv: NonRetryable: This is a test exception
{"level":"error","ts":"2024-11-06T16:21:07.502405447Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:142","msg":"Failed to forward a chunk","pipeline":"simple-pipeline-map-error","vertex":"cat","protocol":"uds-grpc-map-udf","error":"gRPC client.MapFn failed, NonRetryable: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).Start.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/forward/forward.go:142"}
{"level":"info","ts":"2024-11-06T16:21:07.502558364Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:155","msg":"Closed buffer reader","pipeline":"simple-pipeline-map-error","vertex":"cat","protocol":"uds-grpc-map-udf","bufferFrom":"default-simple-pipeline-map-error-cat-0"}
{"level":"info","ts":"2024-11-06T16:21:07.502586655Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:162","msg":"Closed partition writer","pipeline":"simple-pipeline-map-error","vertex":"cat","protocol":"uds-grpc-map-udf","bufferTo":"default-simple-pipeline-map-error-sink-0"}
{"level":"error","ts":"2024-11-06T16:21:07.502623447Z","logger":"numaflow.MapUDF-processor","caller":"udf/map_udf.go:305","msg":"Map forwarder stopped with error","pipeline":"simple-pipeline-map-error","vertex":"cat","protocol":"uds-grpc-map-udf","fromPartition":"default-simple-pipeline-map-error-cat-0","error":"gRPC client.MapFn failed, NonRetryable: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/udf.(*MapUDFProcessor).Start.func2\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/map_udf.go:305"}
{"level":"info","ts":"2024-11-06T16:21:07.502655239Z","logger":"numaflow.MapUDF-processor","caller":"publish/publisher.go:286","msg":"Closing watermark publisher","pipeline":"simple-pipeline-map-error","vertex":"cat","entityID":"simple-pipeline-map-error-cat-0","otStore":"default-simple-pipeline-map-error-cat-sink_OT","hbStore":"default-simple-pipeline-map-error-cat-sink_PROCESSORS"}
{"level":"info","ts":"2024-11-06T16:21:07.502742947Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:166","msg":"stopping WatchAll","pipeline":"simple-pipeline-map-error","vertex":"cat","kvName":"default-simple-pipeline-map-error-in-cat_OT","watcher":"default-simple-pipeline-map-error-in-cat_OT"}
{"level":"info","ts":"2024-11-06T16:21:07.502796822Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:166","msg":"stopping WatchAll","pipeline":"simple-pipeline-map-error","vertex":"cat","kvName":"default-simple-pipeline-map-error-in-cat_PROCESSORS","watcher":"default-simple-pipeline-map-error-in-cat_PROCESSORS"}
{"level":"info","ts":"2024-11-06T16:21:07.510396072Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:172","msg":"WatchAll successfully stopped","pipeline":"simple-pipeline-map-error","vertex":"cat","kvName":"default-simple-pipeline-map-error-in-cat_PROCESSORS","watcher":"default-simple-pipeline-map-error-in-cat_PROCESSORS"}
{"level":"info","ts":"2024-11-06T16:21:07.51126953Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:172","msg":"WatchAll successfully stopped","pipeline":"simple-pipeline-map-error","vertex":"cat","kvName":"default-simple-pipeline-map-error-in-cat_OT","watcher":"default-simple-pipeline-map-error-in-cat_OT"}
{"level":"info","ts":"2024-11-06T16:21:07.52305453Z","logger":"numaflow.MapUDF-processor","caller":"udf/map_udf.go:363","msg":"All udf data processors exited...","pipeline":"simple-pipeline-map-error","vertex":"cat","protocol":"uds-grpc-map-udf"}
{"level":"info","ts":"2024-11-06T16:21:07.523182947Z","logger":"numaflow.MapUDF-processor","caller":"metrics/metrics_server.go:293","msg":"Metrics server shutdown","pipeline":"simple-pipeline-map-error","vertex":"cat"}

Build

mvn clean install && cd examples && mvn clean install -DskipTests=true && docker tag numaflow-java-examples/map-flatmap:stable quay.io/numaio/numaflow-java/map-forward-message:keran-test-uderr && docker push quay.io/numaio/numaflow-java/map-forward-message:keran-test-uderr
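
One way to check the map client log above for a graceful shutdown is to confirm that the shutdown milestones appear in the expected order. The sketch below is only an illustration: `EXPECTED_ORDER`, `messages` and `appears_in_order` are hypothetical names, and the sample keeps only the `level` and `msg` fields of the original entries.

```python
import json

# Shutdown milestones in the order they appear in the map client log above.
EXPECTED_ORDER = [
    "Failed to forward a chunk",
    "Closed buffer reader",
    "Closed partition writer",
    "Map forwarder stopped with error",
    "Closing watermark publisher",
    "All udf data processors exited...",
    "Metrics server shutdown",
]


def messages(lines):
    """Yield the "msg" field of each JSON log line, skipping everything else."""
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # plain-text lines such as "2024/11/06 ... failed c.grpcClt.MapFn ..."
        try:
            yield json.loads(line).get("msg", "")
        except json.JSONDecodeError:
            continue  # truncated or malformed entry


def appears_in_order(lines, expected):
    """Return True if the expected messages occur in order (gaps are allowed)."""
    remaining = messages(lines)
    # Each any() resumes where the previous one stopped, so this is a subsequence check.
    return all(any(msg == want for msg in remaining) for want in expected)


# Abbreviated sample: only "level" and "msg" are kept from the entries above.
SAMPLE = [
    '{"level":"error","msg":"mapUDF.Apply error"}',
    '{"level":"error","msg":"failed to applyUDF"}',
    "2024/11/06 16:21:07 failed c.grpcClt.MapFn stream.Recv: NonRetryable: This is a test exception",
    '{"level":"error","msg":"Failed to forward a chunk"}',
    '{"level":"info","msg":"Closed buffer reader"}',
    '{"level":"info","msg":"Closed partition writer"}',
    '{"level":"error","msg":"Map forwarder stopped with error"}',
    '{"level":"info","msg":"Closing watermark publisher"}',
    '{"level":"info","msg":"stopping WatchAll"}',
    '{"level":"info","msg":"WatchAll successfully stopped"}',
    '{"level":"info","msg":"All udf data processors exited..."}',
    '{"level":"info","msg":"Metrics server shutdown"}',
]

if __name__ == "__main__":
    print(appears_in_order(SAMPLE, EXPECTED_ORDER))
```

The check tolerates extra entries between milestones, so the same function can be pointed at a full log file (one entry per line) rather than the abbreviated sample.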

  2. map stream

Test Pipeline

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: map-stream-error-pipeline
spec:
  vertices:
    - name: in
      source:
        generator:
          rpu: 5
          duration: 1s
      scale:
        min: 1
    - name: cat       
      scale:
        min: 1
      udf:
        container:
          image: quay.io/numaio/numaflow-java/flat-map-stream:keran-test-uderr
          imagePullPolicy: Always
    - name: sink
      scale:
        min: 1
      sink:
        log: {}
  edges:
    - from: in
      to: cat
    - from: cat
      to: sink

gRPC Server Log

[main] INFO io.numaproj.numaflow.shared.GrpcServerUtils - Writing server info io.numaproj.numaflow.info.ServerInfo@4fb61f4a to /var/run/numaflow/mapper-server-info
[main] INFO io.numaproj.numaflow.mapstreamer.Server - server started, listening on socket path: /var/run/numaflow/mapstream.sock
[main] INFO io.numaproj.numaflow.mapstreamer.Server - sink server is waiting for termination
[netty-boss] WARN io.netty.bootstrap.ServerBootstrap - Unknown channel option 'SO_KEEPALIVE' for channel '[id: 0x65332952, L:/var/run/numaflow/mapstream.sock - R:]'
[grpc-default-executor-0] ERROR io.numaproj.numaflow.mapstreamer.Service - Encountered error in mapFn onNext - This is a test exception
*** shutting down map streamer gRPC server because of an exception - This is a test exception
[ForkJoinPool.commonPool-worker-19] INFO io.numaproj.numaflow.mapstreamer.Server - stopping server
[main] INFO io.numaproj.numaflow.mapstreamer.Server - sink server has terminated
[ForkJoinPool.commonPool-worker-19] INFO io.numaproj.numaflow.mapstreamer.Server - gracefully shutting down event loop groups
*** shutting down map streamer gRPC server since JVM is shutting down

gRPC Client Log

{"level":"warn","ts":"2024-11-06T16:59:25.444778001Z","logger":"numaflow.MapUDF-processor","caller":"mapper/client.go:77","msg":"Mapper server is not ready: rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial unix /var/run/numaflow/mapstream.sock: connect: no such file or directory\"","pipeline":"map-stream-error-pipeline","vertex":"cat"}
{"level":"warn","ts":"2024-11-06T16:59:25.545206376Z","logger":"numaflow.MapUDF-processor","caller":"mapper/client.go:77","msg":"Mapper server is not ready: rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial unix /var/run/numaflow/mapstream.sock: connect: no such file or directory\"","pipeline":"map-stream-error-pipeline","vertex":"cat"}
{"level":"info","ts":"2024-11-06T16:59:26.044606418Z","logger":"numaflow.MapUDF-processor","caller":"metrics/metrics_server.go:236","msg":"Generating self-signed certificate","pipeline":"map-stream-error-pipeline","vertex":"cat"}
{"level":"info","ts":"2024-11-06T16:59:26.044896043Z","logger":"numaflow.MapUDF-processor","caller":"udf/map_udf.go:293","msg":"Start processing udf messages","pipeline":"map-stream-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","isbsvc":"jetstream","from":"default-map-stream-error-pipeline-cat-0","to":["default-map-stream-error-pipeline-sink-0"]}
{"level":"info","ts":"2024-11-06T16:59:26.045080001Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:121","msg":"Starting forwarder...","pipeline":"map-stream-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf"}
{"level":"info","ts":"2024-11-06T16:59:26.045702793Z","logger":"numaflow.MapUDF-processor","caller":"metrics/metrics_server.go:276","msg":"Not enabling pprof debug endpoints","pipeline":"map-stream-error-pipeline","vertex":"cat"}
{"level":"info","ts":"2024-11-06T16:59:26.04581921Z","logger":"numaflow.MapUDF-processor","caller":"metrics/metrics_server.go:289","msg":"Starting metrics HTTPS server","pipeline":"map-stream-error-pipeline","vertex":"cat"}
{"level":"error","ts":"2024-11-06T16:59:26.340153168Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:258","msg":"failed to streamMessage","pipeline":"map-stream-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","error":"failed to applyUDF, error: gRPC client.MapStreamFn failed, NonRetryable: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).forwardAChunk\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/forward/forward.go:258\ngithub.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).Start.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/forward/forward.go:141"}
2024/11/06 16:59:26 failed c.grpcClt.MapStreamFn: NonRetryable: This is a test exception
{"level":"error","ts":"2024-11-06T16:59:26.340317043Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:142","msg":"Failed to forward a chunk","pipeline":"map-stream-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","error":"failed to applyUDF, error: gRPC client.MapStreamFn failed, NonRetryable: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).Start.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/forward/forward.go:142"}
{"level":"info","ts":"2024-11-06T16:59:26.340433668Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:155","msg":"Closed buffer reader","pipeline":"map-stream-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","bufferFrom":"default-map-stream-error-pipeline-cat-0"}
{"level":"info","ts":"2024-11-06T16:59:26.340449376Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:162","msg":"Closed partition writer","pipeline":"map-stream-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","bufferTo":"default-map-stream-error-pipeline-sink-0"}
{"level":"error","ts":"2024-11-06T16:59:26.340574085Z","logger":"numaflow.MapUDF-processor","caller":"udf/map_udf.go:305","msg":"Map forwarder stopped with error","pipeline":"map-stream-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","fromPartition":"default-map-stream-error-pipeline-cat-0","error":"failed to applyUDF, error: gRPC client.MapStreamFn failed, NonRetryable: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/udf.(*MapUDFProcessor).Start.func2\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/map_udf.go:305"}
{"level":"info","ts":"2024-11-06T16:59:26.340626793Z","logger":"numaflow.MapUDF-processor","caller":"publish/publisher.go:286","msg":"Closing watermark publisher","pipeline":"map-stream-error-pipeline","vertex":"cat","entityID":"map-stream-error-pipeline-cat-0","otStore":"default-map-stream-error-pipeline-cat-sink_OT","hbStore":"default-map-stream-error-pipeline-cat-sink_PROCESSORS"}
{"level":"info","ts":"2024-11-06T16:59:26.34067496Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:166","msg":"stopping WatchAll","pipeline":"map-stream-error-pipeline","vertex":"cat","kvName":"default-map-stream-error-pipeline-in-cat_OT","watcher":"default-map-stream-error-pipeline-in-cat_OT"}
{"level":"info","ts":"2024-11-06T16:59:26.340795085Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:166","msg":"stopping WatchAll","pipeline":"map-stream-error-pipeline","vertex":"cat","kvName":"default-map-stream-error-pipeline-in-cat_PROCESSORS","watcher":"default-map-stream-error-pipeline-in-cat_PROCESSORS"}
{"level":"info","ts":"2024-11-06T16:59:26.343650418Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:172","msg":"WatchAll successfully stopped","pipeline":"map-stream-error-pipeline","vertex":"cat","kvName":"default-map-stream-error-pipeline-in-cat_OT","watcher":"default-map-stream-error-pipeline-in-cat_OT"}
{"level":"info","ts":"2024-11-06T16:59:26.343721626Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:172","msg":"WatchAll successfully stopped","pipeline":"map-stream-error-pipeline","vertex":"cat","kvName":"default-map-stream-error-pipeline-in-cat_PROCESSORS","watcher":"default-map-stream-error-pipeline-in-cat_PROCESSORS"}
{"level":"info","ts":"2024-11-06T16:59:26.344832168Z","logger":"numaflow.MapUDF-processor","caller":"udf/map_udf.go:363","msg":"All udf data processors exited...","pipeline":"map-stream-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf"}
{"level":"info","ts":"2024-11-06T16:59:26.344924668Z","logger":"numaflow.MapUDF-processor","caller":"metrics/metrics_server.go:293","msg":"Metrics server shutdown","pipeline":"map-stream-error-pipeline","vertex":"cat"}

Build

mvn clean install && cd examples && mvn clean install -DskipTests=true && docker tag numaflow-java-examples/flat-map-stream:stable quay.io/numaio/numaflow-java/flat-map-stream:keran-test-uderr && docker push quay.io/numaio/numaflow-java/flat-map-stream:keran-test-uderr
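
The timestamps in the map stream client log above also show how quickly the vertex winds down after the first error. The following is a rough sketch, not part of the PR: `ts_to_ns` and `shutdown_duration_ms` are hypothetical helpers, and the sample keeps only the `level`, `ts` and `msg` fields of a few original entries.

```python
import json

NS_PER_SECOND = 1_000_000_000


def ts_to_ns(ts):
    """Convert a UTC timestamp like 2024-11-06T16:59:26.34067496Z to
    nanoseconds since midnight.

    The logs above print a variable number of fractional digits, so the
    fraction is right-padded with zeros to nine digits before conversion.
    """
    clock = ts.rstrip("Z").split("T")[1]
    hms, _, frac = clock.partition(".")
    hours, minutes, seconds = (int(part) for part in hms.split(":"))
    frac_ns = int(frac.ljust(9, "0")[:9]) if frac else 0
    return ((hours * 60 + minutes) * 60 + seconds) * NS_PER_SECOND + frac_ns


def shutdown_duration_ms(lines, final_msg="Metrics server shutdown"):
    """Milliseconds from the first error-level entry to final_msg, or None.

    Assumes both entries fall on the same UTC day.
    """
    first_error_ns = None
    for line in lines:
        if not line.lstrip().startswith("{"):
            continue  # skip plain-text lines
        entry = json.loads(line)
        if first_error_ns is None and entry.get("level") == "error":
            first_error_ns = ts_to_ns(entry["ts"])
        elif first_error_ns is not None and entry.get("msg") == final_msg:
            return (ts_to_ns(entry["ts"]) - first_error_ns) / 1e6
    return None


# Abbreviated entries from the map stream client log above.
SAMPLE = [
    '{"level":"info","ts":"2024-11-06T16:59:26.04581921Z","msg":"Starting metrics HTTPS server"}',
    '{"level":"error","ts":"2024-11-06T16:59:26.340153168Z","msg":"failed to streamMessage"}',
    "2024/11/06 16:59:26 failed c.grpcClt.MapStreamFn: NonRetryable: This is a test exception",
    '{"level":"info","ts":"2024-11-06T16:59:26.34067496Z","msg":"stopping WatchAll"}',
    '{"level":"info","ts":"2024-11-06T16:59:26.344924668Z","msg":"Metrics server shutdown"}',
]

if __name__ == "__main__":
    print(shutdown_duration_ms(SAMPLE))
```

On this sample the gap between the first error and "Metrics server shutdown" comes out at roughly 4.77 ms.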

  3. batch map

Test Pipeline

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: map-batch-error-pipeline
spec:
  vertices:
    - name: in
      source:
        generator:
          rpu: 5
          duration: 1s
      scale:
        min: 1
    - name: cat
      limits:
        readBatchSize: 1       
      scale:
        min: 1
      udf:
        container:
          image: quay.io/numaio/numaflow-java/batch-map-flatmap:keran-test-uderr
          imagePullPolicy: Always
    - name: sink
      scale:
        min: 1
      sink:
        log: {}
  edges:
    - from: in
      to: cat
    - from: cat
      to: sink

gRPC Server Log

[main] INFO io.numaproj.numaflow.shared.GrpcServerUtils - Writing server info io.numaproj.numaflow.info.ServerInfo@4cc451f2 to /var/run/numaflow/mapper-server-info
[main] INFO io.numaproj.numaflow.batchmapper.Server - server started, listening on socket path: /var/run/numaflow/batchmap.sock
[main] INFO io.numaproj.numaflow.batchmapper.Server - batch map server is waiting for termination
[netty-boss] WARN io.netty.bootstrap.ServerBootstrap - Unknown channel option 'SO_KEEPALIVE' for channel '[id: 0x9a652f54, L:/var/run/numaflow/batchmap.sock - R:]'
[grpc-default-executor-0] ERROR io.numaproj.numaflow.batchmapper.Service - Encountered an error in batch map onNext - java.lang.RuntimeException: This is a test exception
*** shutting down batch map gRPC server because of an exception - java.lang.RuntimeException: This is a test exception
[ForkJoinPool.commonPool-worker-19] INFO io.numaproj.numaflow.batchmapper.Server - stopping server
[ForkJoinPool.commonPool-worker-19] INFO io.numaproj.numaflow.batchmapper.Service - BatchMap executor was terminated.
[main] INFO io.numaproj.numaflow.batchmapper.Server - batch map server has terminated
[ForkJoinPool.commonPool-worker-19] INFO io.numaproj.numaflow.batchmapper.Server - gracefully shutting down event loop groups
*** shutting down gRPC server since JVM is shutting down

gRPC Client Log

{"level":"warn","ts":"2024-11-06T17:39:28.103529776Z","logger":"numaflow.MapUDF-processor","caller":"mapper/client.go:77","msg":"Mapper server is not ready: rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial unix /var/run/numaflow/batchmap.sock: connect: no such file or directory\"","pipeline":"map-batch-error-pipeline","vertex":"cat"}
{"level":"info","ts":"2024-11-06T17:39:28.58380461Z","logger":"numaflow.MapUDF-processor","caller":"metrics/metrics_server.go:236","msg":"Generating self-signed certificate","pipeline":"map-batch-error-pipeline","vertex":"cat"}
{"level":"info","ts":"2024-11-06T17:39:28.584188026Z","logger":"numaflow.MapUDF-processor","caller":"udf/map_udf.go:293","msg":"Start processing udf messages","pipeline":"map-batch-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","isbsvc":"jetstream","from":"default-map-batch-error-pipeline-cat-0","to":["default-map-batch-error-pipeline-sink-0"]}
{"level":"info","ts":"2024-11-06T17:39:28.584661401Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:121","msg":"Starting forwarder...","pipeline":"map-batch-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf"}
{"level":"info","ts":"2024-11-06T17:39:28.586494026Z","logger":"numaflow.MapUDF-processor","caller":"metrics/metrics_server.go:276","msg":"Not enabling pprof debug endpoints","pipeline":"map-batch-error-pipeline","vertex":"cat"}
{"level":"info","ts":"2024-11-06T17:39:28.586650443Z","logger":"numaflow.MapUDF-processor","caller":"metrics/metrics_server.go:289","msg":"Starting metrics HTTPS server","pipeline":"map-batch-error-pipeline","vertex":"cat"}
2024/11/06 17:39:28 failed c.grpcClt.MapFn stream.Recv: NonRetryable: java.lang.RuntimeException: This is a test exception
{"level":"error","ts":"2024-11-06T17:39:28.891648818Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:628","msg":"mapUDF.Apply error","pipeline":"map-batch-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","error":"gRPC client.MapFn failed, NonRetryable: java.lang.RuntimeException: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).applyUDF\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/forward/forward.go:628\ngithub.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).forwardAChunk\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/forward/forward.go:273\ngithub.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).Start.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/forward/forward.go:141"}
{"level":"error","ts":"2024-11-06T17:39:28.891767735Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:275","msg":"failed to applyUDF","pipeline":"map-batch-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","error":"gRPC client.MapFn failed, NonRetryable: java.lang.RuntimeException: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).forwardAChunk\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/forward/forward.go:275\ngithub.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).Start.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/forward/forward.go:141"}
{"level":"error","ts":"2024-11-06T17:39:28.891860277Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:142","msg":"Failed to forward a chunk","pipeline":"map-batch-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","error":"gRPC client.MapFn failed, NonRetryable: java.lang.RuntimeException: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).Start.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/forward/forward.go:142"}
{"level":"info","ts":"2024-11-06T17:39:28.891892277Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:155","msg":"Closed buffer reader","pipeline":"map-batch-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","bufferFrom":"default-map-batch-error-pipeline-cat-0"}
{"level":"info","ts":"2024-11-06T17:39:28.891898568Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:162","msg":"Closed partition writer","pipeline":"map-batch-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","bufferTo":"default-map-batch-error-pipeline-sink-0"}
{"level":"error","ts":"2024-11-06T17:39:28.892013193Z","logger":"numaflow.MapUDF-processor","caller":"udf/map_udf.go:305","msg":"Map forwarder stopped with error","pipeline":"map-batch-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","fromPartition":"default-map-batch-error-pipeline-cat-0","error":"gRPC client.MapFn failed, NonRetryable: java.lang.RuntimeException: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/udf.(*MapUDFProcessor).Start.func2\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/udf/map_udf.go:305"}
{"level":"info","ts":"2024-11-06T17:39:28.892068402Z","logger":"numaflow.MapUDF-processor","caller":"publish/publisher.go:286","msg":"Closing watermark publisher","pipeline":"map-batch-error-pipeline","vertex":"cat","entityID":"map-batch-error-pipeline-cat-0","otStore":"default-map-batch-error-pipeline-cat-sink_OT","hbStore":"default-map-batch-error-pipeline-cat-sink_PROCESSORS"}
{"level":"info","ts":"2024-11-06T17:39:28.892101027Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:166","msg":"stopping WatchAll","pipeline":"map-batch-error-pipeline","vertex":"cat","kvName":"default-map-batch-error-pipeline-in-cat_PROCESSORS","watcher":"default-map-batch-error-pipeline-in-cat_PROCESSORS"}
{"level":"error","ts":"2024-11-06T17:39:28.892215193Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:170","msg":"Failed to stop","pipeline":"map-batch-error-pipeline","vertex":"cat","kvName":"default-map-batch-error-pipeline-in-cat_PROCESSORS","watcher":"default-map-batch-error-pipeline-in-cat_PROCESSORS","error":"nats: invalid subscription","stacktrace":"github.com/numaproj/numaflow/pkg/shared/kvs/jetstream.(*jetStreamStore).Watch.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/shared/kvs/jetstream/kv_store.go:170"}
{"level":"info","ts":"2024-11-06T17:39:28.892118027Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:166","msg":"stopping WatchAll","pipeline":"map-batch-error-pipeline","vertex":"cat","kvName":"default-map-batch-error-pipeline-in-cat_OT","watcher":"default-map-batch-error-pipeline-in-cat_OT"}
{"level":"error","ts":"2024-11-06T17:39:28.892237402Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:170","msg":"Failed to stop","pipeline":"map-batch-error-pipeline","vertex":"cat","kvName":"default-map-batch-error-pipeline-in-cat_OT","watcher":"default-map-batch-error-pipeline-in-cat_OT","error":"nats: invalid subscription","stacktrace":"github.com/numaproj/numaflow/pkg/shared/kvs/jetstream.(*jetStreamStore).Watch.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/shared/kvs/jetstream/kv_store.go:170"}
{"level":"info","ts":"2024-11-06T17:39:28.895441485Z","logger":"numaflow.MapUDF-processor","caller":"udf/map_udf.go:363","msg":"All udf data processors exited...","pipeline":"map-batch-error-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf"}
{"level":"info","ts":"2024-11-06T17:39:28.895548568Z","logger":"numaflow.MapUDF-processor","caller":"metrics/metrics_server.go:293","msg":"Metrics server shutdown","pipeline":"map-batch-error-pipeline","vertex":"cat"}
{"level":"error","ts":"2024-11-06T17:39:28.895759485Z","logger":"numaflow.MapUDF-processor","caller":"nats/nats_client.go:69","msg":"Nats default: disconnected","pipeline":"map-batch-error-pipeline","vertex":"cat","stacktrace":"github.com/numaproj/numaflow/pkg/shared/clients/nats.NewNATSClient.func3\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/shared/clients/nats/nats_client.go:69\ngithub.com/nats-io/nats%2ego.(*Conn).close.func1\n\t/Users/kyang5/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:5332\ngithub.com/nats-io/nats%2ego.(*asyncCallbacksHandler).asyncCBDispatcher\n\t/Users/kyang5/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:3011"}
{"level":"info","ts":"2024-11-06T17:39:28.89581236Z","logger":"numaflow.MapUDF-processor","caller":"nats/nats_client.go:63","msg":"Nats default: connection closed","pipeline":"map-batch-error-pipeline","vertex":"cat"}
{"level":"error","ts":"2024-11-06T17:39:28.895848068Z","logger":"numaflow.MapUDF-processor","caller":"nats/nats_client.go:69","msg":"Nats default: disconnected","pipeline":"map-batch-error-pipeline","vertex":"cat","stacktrace":"github.com/numaproj/numaflow/pkg/shared/clients/nats.NewNATSClient.func3\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/shared/clients/nats/nats_client.go:69\ngithub.com/nats-io/nats%2ego.(*Conn).close.func1\n\t/Users/kyang5/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:5332\ngithub.com/nats-io/nats%2ego.(*asyncCallbacksHandler).asyncCBDispatcher\n\t/Users/kyang5/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:3011"}
{"level":"info","ts":"2024-11-06T17:39:28.895856193Z","logger":"numaflow.MapUDF-processor","caller":"nats/nats_client.go:63","msg":"Nats default: connection closed","pipeline":"map-batch-error-pipeline","vertex":"cat"}
{"level":"error","ts":"2024-11-06T17:39:28.896038402Z","logger":"numaflow.MapUDF-processor","caller":"nats/nats_client.go:69","msg":"Nats default: disconnected","pipeline":"map-batch-error-pipeline","vertex":"cat","stacktrace":"github.com/numaproj/numaflow/pkg/shared/clients/nats.NewNATSClient.func3\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/shared/clients/nats/nats_client.go:69\ngithub.com/nats-io/nats%2ego.(*Conn).close.func1\n\t/Users/kyang5/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:5332\ngithub.com/nats-io/nats%2ego.(*asyncCallbacksHandler).asyncCBDispatcher\n\t/Users/kyang5/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:3011"}
{"level":"info","ts":"2024-11-06T17:39:28.896072985Z","logger":"numaflow.MapUDF-processor","caller":"nats/nats_client.go:63","msg":"Nats default: connection closed","pipeline":"map-batch-error-pipeline","vertex":"cat"}

Build

mvn clean install && cd examples && mvn clean install -DskipTests=true && docker tag numaflow-java-examples/batch-map-flatmap:stable quay.io/numaio/numaflow-java/batch-map-flatmap:keran-test-uderr && docker push quay.io/numaio/numaflow-java/batch-map-flatmap:keran-test-uderr
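
Compared with the map run, the batch map client log above contains additional error-level entries: "Failed to stop" (with `nats: invalid subscription`) and "Nats default: disconnected". The notes do not say whether these are expected. A small sketch like the one below can surface such differences between runs; `error_counts` and `MAP_RUN_ERRORS` are hypothetical names, and the sample keeps only the `level` and `msg` fields of the original entries.

```python
import json
from collections import Counter

# Error-level messages seen in the map client log earlier in these notes.
MAP_RUN_ERRORS = {
    "mapUDF.Apply error",
    "failed to applyUDF",
    "Failed to forward a chunk",
    "Map forwarder stopped with error",
}


def error_counts(lines):
    """Count error-level JSON log entries by their "msg" field."""
    counts = Counter()
    for line in lines:
        if not line.lstrip().startswith("{"):
            continue  # skip plain-text lines
        entry = json.loads(line)
        if entry.get("level") == "error":
            counts[entry.get("msg", "")] += 1
    return counts


# Abbreviated entries from the batch map client log above.
SAMPLE = [
    "2024/11/06 17:39:28 failed c.grpcClt.MapFn stream.Recv: NonRetryable: java.lang.RuntimeException: This is a test exception",
    '{"level":"error","msg":"mapUDF.Apply error"}',
    '{"level":"error","msg":"failed to applyUDF"}',
    '{"level":"error","msg":"Failed to forward a chunk"}',
    '{"level":"info","msg":"Closed buffer reader"}',
    '{"level":"error","msg":"Map forwarder stopped with error"}',
    '{"level":"error","msg":"Failed to stop"}',
    '{"level":"error","msg":"Failed to stop"}',
    '{"level":"info","msg":"Metrics server shutdown"}',
    '{"level":"error","msg":"Nats default: disconnected"}',
    '{"level":"error","msg":"Nats default: disconnected"}',
    '{"level":"error","msg":"Nats default: disconnected"}',
    '{"level":"info","msg":"Nats default: connection closed"}',
]

if __name__ == "__main__":
    counts = error_counts(SAMPLE)
    # Messages that appear here but not in the map run.
    for msg in sorted(set(counts) - MAP_RUN_ERRORS):
        print(msg, counts[msg])
```

Comparing against a baseline set rather than a fixed count keeps the check useful when the number of repeated entries varies between runs.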

  4. transformer

Test Pipeline

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: transformer-error-pipeline
spec:
  vertices:
    - name: in
      source:
        generator:
          rpu: 5
          duration: 1s
        transformer:
          container:
            image: quay.io/numaio/numaflow-java/mapt-event-time-filter-function:keran-test-uderr
            imagePullPolicy: Always
      scale:
        min: 1
    - name: cat       
      scale:
        min: 1
      udf:
        builtin:
          name: cat
    - name: sink
      scale:
        min: 1
      sink:
        log: {}
  edges:
    - from: in
      to: cat
    - from: cat
      to: sink

gRPC Server Log

[transformer-akka.actor.default-dispatcher-5] ERROR io.numaproj.numaflow.sourcetransformer.TransformSupervisorActor - Encountered error in sourceTransformFn - 
...

gRPC Client Log

server is not ready: rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial unix /var/run/numaflow/sourcetransform.sock: connect: no such file or directory\"","pipeline":"transformer-error-pipeline","vertex":"in"}
{"level":"warn","ts":"2024-11-06T18:01:32.590759866Z","logger":"numaflow.Source-processor","caller":"sourcetransformer/client.go:72","msg":"Transformer server is not ready: rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial unix /var/run/numaflow/sourcetransform.sock: connect: no such file or directory\"","pipeline":"transformer-error-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-06T18:01:33.054734366Z","logger":"numaflow.Source-processor","caller":"metrics/metrics_server.go:236","msg":"Generating self-signed certificate","pipeline":"transformer-error-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-06T18:01:33.055769575Z","logger":"numaflow.Source-processor","caller":"metrics/metrics_server.go:276","msg":"Not enabling pprof debug endpoints","pipeline":"transformer-error-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-06T18:01:33.055818491Z","logger":"numaflow.Source-processor","caller":"sources/source.go:305","msg":"Start processing source messages","pipeline":"transformer-error-pipeline","vertex":"in","isbs":"jetstream","to":["default-transformer-error-pipeline-cat-0"]}
{"level":"info","ts":"2024-11-06T18:01:33.055924616Z","logger":"numaflow.Source-processor","caller":"metrics/metrics_server.go:289","msg":"Starting metrics HTTPS server","pipeline":"transformer-error-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-06T18:01:33.055951116Z","logger":"numaflow","caller":"forward/data_forward.go:129","msg":"Starting forwarder..."}
{"level":"error","ts":"2024-11-06T18:01:35.732851743Z","logger":"numaflow","caller":"forward/data_forward.go:328","msg":"failed to apply source transformer","error":"gRPC client.SourceTransformFn failed, NonRetryable: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/sources/forward.(*DataForward).forwardAChunk\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/sources/forward/data_forward.go:328\ngithub.com/numaproj/numaflow/pkg/sources/forward.(*DataForward).Start.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/sources/forward/data_forward.go:149"}
{"level":"info","ts":"2024-11-06T18:01:35.732942743Z","logger":"numaflow","caller":"forward/data_forward.go:162","msg":"Closed source reader","sourceFrom":"in"}
{"level":"info","ts":"2024-11-06T18:01:35.732955701Z","logger":"numaflow","caller":"forward/data_forward.go:172","msg":"Closed partition writer","bufferTo":"default-transformer-error-pipeline-cat-0"}
2024/11/06 18:01:35 failed c.grpcClt.SourceTransformFn stream.Recv: NonRetryable: This is a test exception
{"level":"error","ts":"2024-11-06T18:01:35.732965159Z","logger":"numaflow.Source-processor","caller":"sources/source.go:317","msg":"Source forwarder stopped with error","pipeline":"transformer-error-pipeline","vertex":"in","error":"gRPC client.SourceTransformFn failed, NonRetryable: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/sources.(*SourceProcessor).Start\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/sources/source.go:317\ngithub.com/numaproj/numaflow/cmd/commands.NewProcessorCommand.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/cmd/commands/processor.go:86\ngithub.com/spf13/cobra.(*Command).execute\n\t/Users/kyang5/go/pkg/mod/github.com/spf13/[email protected]/command.go:985\ngithub.com/spf13/cobra.(*Command).ExecuteC\n\t/Users/kyang5/go/pkg/mod/github.com/spf13/[email protected]/command.go:1117\ngithub.com/spf13/cobra.(*Command).Execute\n\t/Users/kyang5/go/pkg/mod/github.com/spf13/[email protected]/command.go:1041\ngithub.com/numaproj/numaflow/cmd/commands.Execute\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/cmd/commands/root.go:32\nmain.main\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/cmd/main.go:24\nruntime.main\n\t/opt/homebrew/Cellar/go/1.23.2/libexec/src/runtime/proc.go:272"}
{"level":"info","ts":"2024-11-06T18:01:35.733099868Z","logger":"numaflow.Source-processor","caller":"sources/source.go:335","msg":"Exited...","pipeline":"transformer-error-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-06T18:01:35.733178659Z","logger":"numaflow.Source-processor","caller":"metrics/metrics_server.go:293","msg":"Metrics server shutdown","pipeline":"transformer-error-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-06T18:01:35.733221701Z","logger":"numaflow.Source-processor","caller":"generator/tickgen.go:295","msg":"Context.Done is called. exiting generator loop.","pipeline":"transformer-error-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-06T18:01:35.733231701Z","logger":"numaflow.Source-processor","caller":"jetstream/kv_store.go:166","msg":"stopping WatchAll","pipeline":"transformer-error-pipeline","vertex":"in","kvName":"default-transformer-error-pipeline-in_SOURCE_PROCESSORS","watcher":"default-transformer-error-pipeline-in_SOURCE_PROCESSORS"}
{"level":"info","ts":"2024-11-06T18:01:35.733335409Z","logger":"numaflow.Source-processor","caller":"jetstream/kv_store.go:166","msg":"stopping WatchAll","pipeline":"transformer-error-pipeline","vertex":"in","kvName":"default-transformer-error-pipeline-in_SOURCE_OT","watcher":"default-transformer-error-pipeline-in_SOURCE_OT"}
{"level":"error","ts":"2024-11-06T18:01:35.733543826Z","logger":"numaflow.Source-processor","caller":"nats/nats_client.go:69","msg":"Nats default: disconnected","pipeline":"transformer-error-pipeline","vertex":"in","stacktrace":"github.com/numaproj/numaflow/pkg/shared/clients/nats.NewNATSClient.func3\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/shared/clients/nats/nats_client.go:69\ngithub.com/nats-io/nats%2ego.(*Conn).close.func1\n\t/Users/kyang5/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:5332\ngithub.com/nats-io/nats%2ego.(*asyncCallbacksHandler).asyncCBDispatcher\n\t/Users/kyang5/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:3011"}
{"level":"info","ts":"2024-11-06T18:01:35.733608451Z","logger":"numaflow.Source-processor","caller":"nats/nats_client.go:63","msg":"Nats default: connection closed","pipeline":"transformer-error-pipeline","vertex":"in"}
{"level":"error","ts":"2024-11-06T18:01:35.733627493Z","logger":"numaflow.Source-processor","caller":"jetstream/kv_store.go:170","msg":"Failed to stop","pipeline":"transformer-error-pipeline","vertex":"in","kvName":"default-transformer-error-pipeline-in_SOURCE_PROCESSORS","watcher":"default-transformer-error-pipeline-in_SOURCE_PROCESSORS","error":"nats: connection closed","stacktrace":"github.com/numaproj/numaflow/pkg/shared/kvs/jetstream.(*jetStreamStore).Watch.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/shared/kvs/jetstream/kv_store.go:170"}
{"level":"error","ts":"2024-11-06T18:01:35.733657118Z","logger":"numaflow.Source-processor","caller":"jetstream/kv_store.go:170","msg":"Failed to stop","pipeline":"transformer-error-pipeline","vertex":"in","kvName":"default-transformer-error-pipeline-in_SOURCE_OT","watcher":"default-transformer-error-pipeline-in_SOURCE_OT","error":"nats: connection closed","stacktrace":"github.com/numaproj/numaflow/pkg/shared/kvs/jetstream.(*jetStreamStore).Watch.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/shared/kvs/jetstream/kv_store.go:170"}
{"level":"error","ts":"2024-11-06T18:01:35.733666784Z","logger":"numaflow.Source-processor","caller":"nats/nats_client.go:69","msg":"Nats default: disconnected","pipeline":"transformer-error-pipeline","vertex":"in","stacktrace":"github.com/numaproj/numaflow/pkg/shared/clients/nats.NewNATSClient.func3\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/shared/clients/nats/nats_client.go:69\ngithub.com/nats-io/nats%2ego.(*Conn).close.func1\n\t/Users/kyang5/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:5332\ngithub.com/nats-io/nats%2ego.(*asyncCallbacksHandler).asyncCBDispatcher\n\t/Users/kyang5/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:3011"}
{"level":"info","ts":"2024-11-06T18:01:35.733675576Z","logger":"numaflow.Source-processor","caller":"nats/nats_client.go:63","msg":"Nats default: connection closed","pipeline":"transformer-error-pipeline","vertex":"in"}

Build

mvn clean install && cd examples && mvn clean install -DskipTests=true && docker tag numaflow-java-examples/mapt-event-time-filter-function:stable quay.io/numaio/numaflow-java/mapt-event-time-filter-function:keran-test-uderr && docker push quay.io/numaio/numaflow-java/mapt-event-time-filter-function:keran-test-uderr
  1. Source

Test Pipeline

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: source-error-pipeline
spec:
  vertices:
    - name: in
      source:
        udsource:
          container:
            image: quay.io/numaio/numaflow-java/source-simple-source:keran-test-uderr
      scale:
        min: 1
    - name: cat       
      scale:
        min: 1
      udf:
        builtin:
          name: cat
    - name: sink
      scale:
        min: 1
      sink:
        log: {}
  edges:
    - from: in
      to: cat
    - from: cat
      to: sink

gRPC Server Log

[main] INFO io.numaproj.numaflow.shared.GrpcServerUtils - Writing server info io.numaproj.numaflow.info.ServerInfo@4fb0f2b9 to /var/run/numaflow/sourcer-server-info
[main] INFO io.numaproj.numaflow.sourcer.Server - server started, listening on /var/run/numaflow/source.sock
[main] INFO io.numaproj.numaflow.sourcer.Server - waiting for server to terminate
[netty-boss] WARN io.netty.bootstrap.ServerBootstrap - Unknown channel option 'SO_KEEPALIVE' for channel '[id: 0x51ddbc77, L:/var/run/numaflow/source.sock - R:]'
[grpc-default-executor-0] ERROR io.numaproj.numaflow.sourcer.Service - Encountered error in readFn onNext - This is a test exception
*** shutting down source gRPC server because of an exception - This is a test exception
[ForkJoinPool.commonPool-worker-19] INFO io.numaproj.numaflow.sourcer.Server - stopping server
[main] INFO io.numaproj.numaflow.sourcer.Server - server has terminated
[ForkJoinPool.commonPool-worker-19] INFO io.numaproj.numaflow.sourcer.Server - gracefully shutting down event loop groups
*** shutting down source gRPC server since JVM is shutting down
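
The server log above suggests a sequence: the exception from the user code is logged, the shutdown is announced, and the server is then stopped on a `ForkJoinPool.commonPool` worker rather than on the request thread. A minimal JDK-only sketch of that pattern follows. `FakeServer`, `shutdownSignal`, and the method names are hypothetical stand-ins, not the SDK's actual classes, and the JVM shutdown hook seen in the last log line is left out.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class ShutdownOnErrorSketch {
    // Hypothetical stand-in for the gRPC server; it only records what happens to it.
    static class FakeServer {
        final List<String> events = new CopyOnWriteArrayList<>();
        void stop() { events.add("stopping server"); }
    }

    // Completed exceptionally by the request handler when user code throws.
    private final CompletableFuture<Void> shutdownSignal = new CompletableFuture<>();
    private final FakeServer server = new FakeServer();
    private final CompletableFuture<Void> stopped;

    ShutdownOnErrorSketch() {
        // Runs asynchronously (by default on ForkJoinPool.commonPool) once the signal completes.
        stopped = shutdownSignal.handleAsync((ok, err) -> {
            if (err != null) {
                server.events.add("shutting down because of an exception - " + err.getMessage());
                server.stop();
            }
            return null;
        });
    }

    // Called per request; a failure in user code triggers the shutdown signal.
    void handle(Runnable userCode) {
        try {
            userCode.run();
        } catch (RuntimeException e) {
            server.events.add("encountered error - " + e.getMessage());
            shutdownSignal.completeExceptionally(e);
        }
    }

    // Drives one failing request and returns the recorded events in order.
    static List<String> demo() {
        ShutdownOnErrorSketch sketch = new ShutdownOnErrorSketch();
        sketch.handle(() -> { throw new RuntimeException("This is a test exception"); });
        sketch.stopped.join(); // wait until the asynchronous stop has finished
        return sketch.server.events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Stopping the server from a separate thread keeps the request thread free to report the error to the client before the connection goes away, which would be consistent with the client log receiving the exception message.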

gRPC Client Log

{"level":"warn","ts":"2024-11-06T18:24:55.84521441Z","logger":"numaflow.Source-processor","caller":"source/client.go:73","msg":"source client is not ready","pipeline":"source-error-pipeline","vertex":"in"}
{"level":"warn","ts":"2024-11-06T18:24:55.945896119Z","logger":"numaflow.Source-processor","caller":"source/client.go:73","msg":"source client is not ready","pipeline":"source-error-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-06T18:24:56.430399161Z","logger":"numaflow.Source-processor","caller":"metrics/metrics_server.go:236","msg":"Generating self-signed certificate","pipeline":"source-error-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-06T18:24:56.431008161Z","logger":"numaflow.Source-processor","caller":"metrics/metrics_server.go:276","msg":"Not enabling pprof debug endpoints","pipeline":"source-error-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-06T18:24:56.431042869Z","logger":"numaflow.Source-processor","caller":"sources/source.go:305","msg":"Start processing source messages","pipeline":"source-error-pipeline","vertex":"in","isbs":"jetstream","to":["default-source-error-pipeline-cat-0"]}
{"level":"info","ts":"2024-11-06T18:24:56.431162786Z","logger":"numaflow","caller":"forward/data_forward.go:129","msg":"Starting forwarder..."}
{"level":"info","ts":"2024-11-06T18:24:56.431162452Z","logger":"numaflow.Source-processor","caller":"metrics/metrics_server.go:289","msg":"Starting metrics HTTPS server","pipeline":"source-error-pipeline","vertex":"in"}
{"level":"warn","ts":"2024-11-06T18:24:56.455263702Z","logger":"numaflow","caller":"forward/data_forward.go:203","msg":"failed to read from source","error":"failed to read messages from udsource: failed to receive read response: rpc error: code = Internal desc = This is a test exception"}
{"level":"info","ts":"2024-11-06T18:24:56.455452244Z","logger":"numaflow.Source-processor","caller":"udsource/user_defined_source.go:120","msg":"Shutting down user-defined source...","pipeline":"source-error-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-06T18:24:56.455590744Z","logger":"numaflow","caller":"forward/data_forward.go:162","msg":"Closed source reader","sourceFrom":"in"}
{"level":"info","ts":"2024-11-06T18:24:56.455611327Z","logger":"numaflow","caller":"forward/data_forward.go:172","msg":"Closed partition writer","bufferTo":"default-source-error-pipeline-cat-0"}
{"level":"error","ts":"2024-11-06T18:24:56.455564077Z","logger":"numaflow.Source-processor","caller":"sources/source.go:317","msg":"Source forwarder stopped with error","pipeline":"source-error-pipeline","vertex":"in","error":"failed to read messages from udsource: failed to receive read response: rpc error: code = Internal desc = This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/sources.(*SourceProcessor).Start\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/sources/source.go:317\ngithub.com/numaproj/numaflow/cmd/commands.NewProcessorCommand.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/cmd/commands/processor.go:86\ngithub.com/spf13/cobra.(*Command).execute\n\t/Users/kyang5/go/pkg/mod/github.com/spf13/[email protected]/command.go:985\ngithub.com/spf13/cobra.(*Command).ExecuteC\n\t/Users/kyang5/go/pkg/mod/github.com/spf13/[email protected]/command.go:1117\ngithub.com/spf13/cobra.(*Command).Execute\n\t/Users/kyang5/go/pkg/mod/github.com/spf13/[email protected]/command.go:1041\ngithub.com/numaproj/numaflow/cmd/commands.Execute\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/cmd/commands/root.go:32\nmain.main\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/cmd/main.go:24\nruntime.main\n\t/opt/homebrew/Cellar/go/1.23.2/libexec/src/runtime/proc.go:272"}
{"level":"info","ts":"2024-11-06T18:24:56.455729202Z","logger":"numaflow.Source-processor","caller":"sources/source.go:335","msg":"Exited...","pipeline":"source-error-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-06T18:24:56.455772202Z","logger":"numaflow.Source-processor","caller":"jetstream/kv_store.go:166","msg":"stopping WatchAll","pipeline":"source-error-pipeline","vertex":"in","kvName":"default-source-error-pipeline-in_SOURCE_OT","watcher":"default-source-error-pipeline-in_SOURCE_OT"}
{"level":"info","ts":"2024-11-06T18:24:56.455808161Z","logger":"numaflow.Source-processor","caller":"metrics/metrics_server.go:293","msg":"Metrics server shutdown","pipeline":"source-error-pipeline","vertex":"in"}
{"level":"warn","ts":"2024-11-06T18:24:56.455830994Z","logger":"numaflow.Source-processor","caller":"sources/source.go:221","msg":"Failed to close gRPC client conn","pipeline":"source-error-pipeline","vertex":"in","error":"rpc error: code = Canceled desc = grpc: the client connection is closing"}
{"level":"info","ts":"2024-11-06T18:24:56.455850327Z","logger":"numaflow.Source-processor","caller":"jetstream/kv_store.go:166","msg":"stopping WatchAll","pipeline":"source-error-pipeline","vertex":"in","kvName":"default-source-error-pipeline-in_SOURCE_PROCESSORS","watcher":"default-source-error-pipeline-in_SOURCE_PROCESSORS"}
{"level":"error","ts":"2024-11-06T18:24:56.455884994Z","logger":"numaflow.Source-processor","caller":"nats/nats_client.go:69","msg":"Nats default: disconnected","pipeline":"source-error-pipeline","vertex":"in","stacktrace":"github.com/numaproj/numaflow/pkg/shared/clients/nats.NewNATSClient.func3\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/shared/clients/nats/nats_client.go:69\ngithub.com/nats-io/nats%2ego.(*Conn).close.func1\n\t/Users/kyang5/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:5332\ngithub.com/nats-io/nats%2ego.(*asyncCallbacksHandler).asyncCBDispatcher\n\t/Users/kyang5/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:3011"}
{"level":"info","ts":"2024-11-06T18:24:56.455917827Z","logger":"numaflow.Source-processor","caller":"nats/nats_client.go:63","msg":"Nats default: connection closed","pipeline":"source-error-pipeline","vertex":"in"}
{"level":"error","ts":"2024-11-06T18:24:56.455935619Z","logger":"numaflow.Source-processor","caller":"jetstream/kv_store.go:170","msg":"Failed to stop","pipeline":"source-error-pipeline","vertex":"in","kvName":"default-source-error-pipeline-in_SOURCE_PROCESSORS","watcher":"default-source-error-pipeline-in_SOURCE_PROCESSORS","error":"nats: connection closed","stacktrace":"github.com/numaproj/numaflow/pkg/shared/kvs/jetstream.(*jetStreamStore).Watch.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/shared/kvs/jetstream/kv_store.go:170"}
{"level":"error","ts":"2024-11-06T18:24:56.455931161Z","logger":"numaflow.Source-processor","caller":"jetstream/kv_store.go:170","msg":"Failed to stop","pipeline":"source-error-pipeline","vertex":"in","kvName":"default-source-error-pipeline-in_SOURCE_OT","watcher":"default-source-error-pipeline-in_SOURCE_OT","error":"nats: connection closed","stacktrace":"github.com/numaproj/numaflow/pkg/shared/kvs/jetstream.(*jetStreamStore).Watch.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/shared/kvs/jetstream/kv_store.go:170"}
{"level":"error","ts":"2024-11-06T18:24:56.455947327Z","logger":"numaflow.Source-processor","caller":"nats/nats_client.go:69","msg":"Nats default: disconnected","pipeline":"source-error-pipeline","vertex":"in","stacktrace":"github.com/numaproj/numaflow/pkg/shared/clients/nats.NewNATSClient.func3\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/shared/clients/nats/nats_client.go:69\ngithub.com/nats-io/nats%2ego.(*Conn).close.func1\n\t/Users/kyang5/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:5332\ngithub.com/nats-io/nats%2ego.(*asyncCallbacksHandler).asyncCBDispatcher\n\t/Users/kyang5/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:3011"}
{"level":"info","ts":"2024-11-06T18:24:56.455953286Z","logger":"numaflow.Source-processor","caller":"nats/nats_client.go:63","msg":"Nats default: connection closed","pipeline":"source-error-pipeline","vertex":"in"}

Build

mvn clean install && cd examples && mvn clean install -DskipTests=true && docker tag numaflow-java-examples/source-simple-source:stable quay.io/numaio/numaflow-java/source-simple-source:keran-test-uderr && docker push quay.io/numaio/numaflow-java/source-simple-source:keran-test-uderr
  1. sink

Test Pipeline

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: simple-pipeline
spec:
  vertices:
    - name: in
      source:
        generator:
          rpu: 5
          duration: 1s
      scale:
        min: 1
    - name: cat       
      scale:
        min: 1
      udf:
        builtin:
          name: cat
    - name: java-sink
      scale:
        min: 1
      sink:
        udsink:
          container:
            image: quay.io/numaio/numaflow-java/simple-sink:keran-test-uderr
            imagePullPolicy: Always
  edges:
    - from: in
      to: cat
    - from: cat
      to: java-sink

gRPC Server Log

[main] INFO io.numaproj.numaflow.shared.GrpcServerUtils - Writing server info io.numaproj.numaflow.info.ServerInfo@eafc191 to /var/run/numaflow/sinker-server-info
[main] INFO io.numaproj.numaflow.sinker.Server - server started, listening on /var/run/numaflow/sink.sock
[main] INFO io.numaproj.numaflow.sinker.Server - sink server is waiting for termination
[netty-boss] WARN io.netty.bootstrap.ServerBootstrap - Unknown channel option 'SO_KEEPALIVE' for channel '[id: 0x070b9fe9, L:/var/run/numaflow/sink.sock - R:]'
[grpc-default-executor-0] ERROR io.numaproj.numaflow.sinker.Service - Encountered error in sinkFn onNext - java.lang.RuntimeException: This is a test exception
*** shutting down sink gRPC server because of an exception - java.lang.RuntimeException: This is a test exception
[ForkJoinPool.commonPool-worker-19] INFO io.numaproj.numaflow.sinker.Server - stopping server
[ForkJoinPool.commonPool-worker-19] INFO io.numaproj.numaflow.sinker.Service - Sink executor was terminated.
[main] INFO io.numaproj.numaflow.sinker.Server - sink server is terminated
[ForkJoinPool.commonPool-worker-19] INFO io.numaproj.numaflow.sinker.Server - gracefully shutting down event loop groups
*** shutting down sink gRPC server since JVM is shutting down

gRPC Client Log

{"level":"warn","ts":"2024-11-06T15:19:35.925457645Z","logger":"numaflow.Sink-processor","caller":"sinker/client.go:73","msg":"waiting for the server to be ready","pipeline":"simple-pipeline","vertex":"java-sink","server":"/var/run/numaflow/sink.sock"}
{"level":"warn","ts":"2024-11-06T15:19:36.025915062Z","logger":"numaflow.Sink-processor","caller":"sinker/client.go:73","msg":"waiting for the server to be ready","pipeline":"simple-pipeline","vertex":"java-sink","server":"/var/run/numaflow/sink.sock"}
{"level":"warn","ts":"2024-11-06T15:19:36.126267728Z","logger":"numaflow.Sink-processor","caller":"sinker/client.go:73","msg":"waiting for the server to be ready","pipeline":"simple-pipeline","vertex":"java-sink","server":"/var/run/numaflow/sink.sock"}
{"level":"info","ts":"2024-11-06T15:19:36.611790687Z","logger":"numaflow.Sink-processor","caller":"metrics/metrics_server.go:236","msg":"Generating self-signed certificate","pipeline":"simple-pipeline","vertex":"java-sink"}
{"level":"info","ts":"2024-11-06T15:19:36.612498687Z","logger":"numaflow.Sink-processor","caller":"sinks/sink.go:257","msg":"Start processing sink messages ","pipeline":"simple-pipeline","vertex":"java-sink","isbsvc":"jetstream","fromPartition ":"default-simple-pipeline-java-sink-0"}
{"level":"info","ts":"2024-11-06T15:19:36.612653312Z","logger":"numaflow.Sink-processor","caller":"forward/forward.go:120","msg":"Starting sink forwarder...","pipeline":"simple-pipeline","vertex":"java-sink"}
{"level":"info","ts":"2024-11-06T15:19:36.613354854Z","logger":"numaflow.Sink-processor","caller":"metrics/metrics_server.go:276","msg":"Not enabling pprof debug endpoints","pipeline":"simple-pipeline","vertex":"java-sink"}
{"level":"info","ts":"2024-11-06T15:19:36.61346602Z","logger":"numaflow.Sink-processor","caller":"metrics/metrics_server.go:289","msg":"Starting metrics HTTPS server","pipeline":"simple-pipeline","vertex":"java-sink"}
{"level":"error","ts":"2024-11-06T15:19:37.192698521Z","logger":"numaflow.Sink-processor","caller":"forward/forward.go:271","msg":"failed to write to sink","pipeline":"simple-pipeline","vertex":"java-sink","error":"gRPC client.SinkFn failed, failed to receive sink response: rpc error: code = Internal desc = java.lang.RuntimeException: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/sinks/forward.(*DataForward).forwardAChunk\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/sinks/forward/forward.go:271\ngithub.com/numaproj/numaflow/pkg/sinks/forward.(*DataForward).Start.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/sinks/forward/forward.go:140"}
{"level":"error","ts":"2024-11-06T15:19:37.193624062Z","logger":"numaflow.Sink-processor","caller":"forward/forward.go:141","msg":"Failed to forward a chunk","pipeline":"simple-pipeline","vertex":"java-sink","error":"gRPC client.SinkFn failed, failed to receive sink response: rpc error: code = Internal desc = java.lang.RuntimeException: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/sinks/forward.(*DataForward).Start.func1\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/sinks/forward/forward.go:141"}
{"level":"info","ts":"2024-11-06T15:19:37.193778354Z","logger":"numaflow.Sink-processor","caller":"forward/forward.go:154","msg":"Closed buffer reader","pipeline":"simple-pipeline","vertex":"java-sink","bufferFrom":"default-simple-pipeline-java-sink-0"}
{"level":"info","ts":"2024-11-06T15:19:37.193787396Z","logger":"numaflow.Sink-processor","caller":"forward/forward.go:161","msg":"Closed sink writer","pipeline":"simple-pipeline","vertex":"java-sink","sink":"java-sink"}
{"level":"error","ts":"2024-11-06T15:19:37.193808479Z","logger":"numaflow.Sink-processor","caller":"sinks/sink.go:269","msg":"Sink forwarder stopped with error","pipeline":"simple-pipeline","vertex":"java-sink","fromPartition":"default-simple-pipeline-java-sink-0","error":"gRPC client.SinkFn failed, failed to receive sink response: rpc error: code = Internal desc = java.lang.RuntimeException: This is a test exception","stacktrace":"github.com/numaproj/numaflow/pkg/sinks.(*SinkProcessor).Start.func3\n\t/Users/kyang5/Desktop/development/numaflow-main/numaflow/pkg/sinks/sink.go:269"}
{"level":"info","ts":"2024-11-06T15:19:37.193919729Z","logger":"numaflow.Sink-processor","caller":"sinks/sink.go:306","msg":"Exited...","pipeline":"simple-pipeline","vertex":"java-sink"}
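
To check a captured client log against the expected outcome (an error-level entry followed by a clean "Exited..." message), the structured lines can be scanned for their `level` and `msg` fields. The sketch below uses a regular expression instead of a JSON library to stay dependency-free. That shortcut, the class name, and the abbreviated sample lines are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ClientLogCheck {
    // Captures the leading "level" field and the "msg" field of one structured log line.
    private static final Pattern LINE = Pattern.compile(
            "^\\{\"level\":\"(\\w+)\".*?\"msg\":\"((?:[^\"\\\\]|\\\\.)*)\"");

    // Returns "level: msg" for every structured line and skips anything else.
    static List<String> summarize(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (m.find()) {
                out.add(m.group(1) + ": " + m.group(2));
            }
        }
        return out;
    }

    // True when an error-level line appears before the last "Exited..." message.
    static boolean failedThenExited(List<String> lines) {
        List<String> summary = summarize(lines);
        int firstError = -1;
        for (int i = 0; i < summary.size(); i++) {
            if (summary.get(i).startsWith("error: ")) {
                firstError = i;
                break;
            }
        }
        int exited = summary.lastIndexOf("info: Exited...");
        return firstError >= 0 && exited > firstError;
    }

    // Abbreviated lines modeled on the sink client log, plus one unstructured line.
    static List<String> sample() {
        return List.of(
                "{\"level\":\"info\",\"ts\":\"2024-11-06T15:19:36.612653312Z\",\"msg\":\"Starting sink forwarder...\"}",
                "2024/11/06 15:19:37 an unstructured line that is ignored",
                "{\"level\":\"error\",\"ts\":\"2024-11-06T15:19:37.192698521Z\",\"msg\":\"failed to write to sink\"}",
                "{\"level\":\"info\",\"ts\":\"2024-11-06T15:19:37.193919729Z\",\"msg\":\"Exited...\"}");
    }

    public static void main(String[] args) {
        summarize(sample()).forEach(System.out::println);
        System.out.println(failedThenExited(sample()));
    }
}
```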

Build

mvn clean install && cd examples && mvn clean install -DskipTests=true && docker tag numaflow-java-examples/simple-sink:stable quay.io/numaio/numaflow-java/simple-sink:keran-test-uderr && docker push quay.io/numaio/numaflow-java/simple-sink:keran-test-uderr

Signed-off-by: Keran Yang <[email protected]>

codecov bot commented Nov 5, 2024

Codecov Report

Attention: Patch coverage is 35.20000% with 243 lines in your changes missing coverage. Please review.

Please upload report for BASE (main@134414a).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| .../io/numaproj/numaflow/shared/GrpcServerHelper.java | 4.65% | 39 Missing and 2 partials ⚠️ |
| ...ain/java/io/numaproj/numaflow/sourcer/Service.java | 54.71% | 24 Missing ⚠️ |
| .../main/java/io/numaproj/numaflow/mapper/Server.java | 18.51% | 21 Missing and 1 partial ⚠️ |
| ...io/numaproj/numaflow/sourcetransformer/Server.java | 18.51% | 21 Missing and 1 partial ⚠️ |
| ...main/java/io/numaproj/numaflow/sourcer/Server.java | 23.07% | 19 Missing and 1 partial ⚠️ |
| ...o/numaproj/numaflow/mapper/MapSupervisorActor.java | 43.33% | 15 Missing and 2 partials ⚠️ |
| ...ow/sourcetransformer/TransformSupervisorActor.java | 46.42% | 13 Missing and 2 partials ⚠️ |
| .../java/io/numaproj/numaflow/batchmapper/Server.java | 50.00% | 10 Missing and 3 partials ⚠️ |
| .../java/io/numaproj/numaflow/mapstreamer/Server.java | 51.85% | 10 Missing and 3 partials ⚠️ |
| .../main/java/io/numaproj/numaflow/sinker/Server.java | 50.00% | 10 Missing and 3 partials ⚠️ |
| ... and 7 more | | |
Additional details and impacted files
@@           Coverage Diff           @@
##             main     #152   +/-   ##
=======================================
  Coverage        ?   57.09%           
  Complexity      ?      390           
=======================================
  Files           ?      127           
  Lines           ?     2953           
  Branches        ?      213           
=======================================
  Hits            ?     1686           
  Misses          ?     1111           
  Partials        ?      156           

@KeranYang KeranYang changed the title from "(WIP) graceful shutdown - yhl's version" to "chore: gracefully shutdown for sink" Nov 6, 2024
@KeranYang KeranYang requested a review from yhl25 November 6, 2024 00:43
}
}

public Server createServer(
Member Author

This is mostly copied from GrpcServerUtils.java. We need to manage the state of the event loop groups so that they can be shut down together with the server, which is why I created this class.

@KeranYang KeranYang requested a review from kohlisid November 6, 2024 03:40
@KeranYang KeranYang marked this pull request as ready for review November 6, 2024 03:40
@KeranYang KeranYang marked this pull request as draft November 6, 2024 13:29
@KeranYang KeranYang changed the title from "chore: gracefully shutdown for sink" to "chore: gracefully shutdown when error encountered" Nov 6, 2024
Contributor

@yhl25 yhl25 left a comment

I was under the impression that shutting down the server would also shut down the event loops.
The changes look good to me.

One suggestion: We shouldn't expose the event loop-related details to all the components. The event loops are only used by the io.grpc.Server. Instead of using GrpcServerHelper, could we write a wrapper for io.grpc.Server that exposes the necessary methods to create, start, gracefully stop, and force stop the server? This way, it can be abstracted out, and inside the wrapper, you can also shut down the event loops.
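
A wrapper along the lines suggested here could look roughly like the sketch below. To stay self-contained it does not depend on grpc-java or Netty: `Stoppable` is a hypothetical stand-in for the subset of `io.grpc.Server` methods used, and plain `ExecutorService` instances stand in for the event loop groups. All class and method names are illustrative assumptions, not the code merged in this PR.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ServerWrapperSketch {
    // Hypothetical stand-in mirroring the io.grpc.Server methods used by the wrapper.
    interface Stoppable {
        void start();
        void shutdown();
        void shutdownNow();
        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    }

    private final Stoppable server;
    // Stand-ins for the event loop groups; callers of the wrapper never see them.
    private final List<ExecutorService> eventLoops;

    ServerWrapperSketch(Stoppable server, List<ExecutorService> eventLoops) {
        this.server = server;
        this.eventLoops = eventLoops;
    }

    void start() { server.start(); }

    // Graceful stop: wait for the server, fall back to a forced stop on timeout,
    // then release the event loops.
    void gracefullyShutdown(long timeout, TimeUnit unit) throws InterruptedException {
        server.shutdown();
        if (!server.awaitTermination(timeout, unit)) {
            server.shutdownNow();
        }
        for (ExecutorService loop : eventLoops) {
            loop.shutdown();
            if (!loop.awaitTermination(timeout, unit)) {
                loop.shutdownNow();
            }
        }
    }

    // Forced stop: cancel the server and the event loops immediately.
    void forceShutdown() {
        server.shutdownNow();
        eventLoops.forEach(ExecutorService::shutdownNow);
    }

    boolean eventLoopsTerminated() {
        return eventLoops.stream().allMatch(ExecutorService::isTerminated);
    }

    // In-memory server used only to exercise the wrapper.
    static class FakeServer implements Stoppable {
        boolean started;
        boolean stopped;
        public void start() { started = true; }
        public void shutdown() { stopped = true; }
        public void shutdownNow() { stopped = true; }
        public boolean awaitTermination(long timeout, TimeUnit unit) { return stopped; }
    }

    // Starts and gracefully stops a fake server; true when everything was released.
    static boolean demo() {
        FakeServer fake = new FakeServer();
        ServerWrapperSketch wrapper = new ServerWrapperSketch(
                fake,
                List.of(Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor()));
        wrapper.start();
        try {
            wrapper.gracefullyShutdown(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return fake.started && fake.stopped && wrapper.eventLoopsTerminated();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this shape, the individual components would only call `start`, `gracefullyShutdown`, or `forceShutdown`, and the event loop lifecycle stays an internal detail of the wrapper.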

@KeranYang KeranYang marked this pull request as ready for review November 6, 2024 21:42
@KeranYang KeranYang merged commit ea910b3 into main Nov 6, 2024
5 checks passed
@KeranYang KeranYang deleted the yhl-grcf-sd branch November 6, 2024 21:47
KeranYang added a commit to KeranYang/numaflow-java that referenced this pull request Jan 22, 2025