Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[imporve] add func blockIfQueueFull() to encapsulate DisableBlockIfQue… #1122

Merged
merged 2 commits into from
Oct 26, 2023

Conversation

gunli
Copy link
Contributor

@gunli gunli commented Oct 26, 2023

Fixes #1088

Master Issue: #1088

Motivation

Since DisableBlockIfQueueFull is difficult to understand and renaming it is not acceptable, we can add a func to encapsulate it and make it easy to understand and use.

Modifications

  1. add func blockIfQueueFull()
   func (p *partitionProducer) blockIfQueueFull() bool {
      //DisableBlockIfQueueFull == false means enable block
      return !p.options.DisableBlockIfQueueFull
  }
  1. call blockIfQueueFull() when neccessary

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

Copy link
Member

@tisonkun tisonkun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this change to avoid double negation readability issue.

What about we use -

if p.blockIfQueueFull() {
  // ..
} else {
  // ..
}

and switch if-else blocks, so avoid just move the double negation into if block

@gunli
Copy link
Contributor Author

gunli commented Oct 26, 2023

I like this change to avoid double negation readability issue.

What about we use -

if p.blockIfQueueFull() {
  // ..
} else {
  // ..
}

and switch if-else blocks, so avoid just move the double negation into if block

@tisonkun OK, I will update it right now

@tisonkun
Copy link
Member

DATA RACE - while it should be unrelated to this PR

2023-10-26T08:00:55.3519124Z ==================
2023-10-26T08:00:55.3519265Z WARNING: DATA RACE
2023-10-26T08:00:55.3519475Z Write at 0x00c0004a5c08 by goroutine 382:
2023-10-26T08:00:55.3519639Z   runtime.slicecopy()
2023-10-26T08:00:55.3520153Z       /usr/local/go/src/runtime/slice.go:247 +0x0
2023-10-26T08:00:55.3520711Z   github.com/apache/pulsar-client-go/pulsar/internal.MarshalToSizedBuffer()
2023-10-26T08:00:55.3521522Z       /pulsar/pulsar-client-go/pulsar/internal/utils.go:85 +0x88
2023-10-26T08:00:55.3522012Z   github.com/apache/pulsar-client-go/pulsar/internal.serializeMessage()
2023-10-26T08:00:55.3522530Z       /pulsar/pulsar-client-go/pulsar/internal/commands.go:295 +0x5f5
2023-10-26T08:00:55.3523054Z   github.com/apache/pulsar-client-go/pulsar/internal.(*batchContainer).Flush()
2023-10-26T08:00:55.3523618Z       /pulsar/pulsar-client-go/pulsar/internal/batch_builder.go:274 +0x5fe
2023-10-26T08:00:55.3524270Z   github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).internalFlushCurrentBatch()
2023-10-26T08:00:55.3524886Z       /pulsar/pulsar-client-go/pulsar/producer_partition.go:732 +0xe8
2023-10-26T08:00:55.3525650Z   github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).runEventsLoop()
2023-10-26T08:00:55.3526190Z       /pulsar/pulsar-client-go/pulsar/producer_partition.go:460 +0x4e4
2023-10-26T08:00:55.3526205Z 
2023-10-26T08:00:55.3526446Z Previous read at 0x00c0004a5c08 by goroutine 495:
2023-10-26T08:00:55.3526558Z   ??()
2023-10-26T08:00:55.3526792Z       -:0 +0xffffffffffffffff
2023-10-26T08:00:55.3527093Z   strconv.appendQuotedWith()
2023-10-26T08:00:55.3527398Z       /usr/local/go/src/strconv/quote.go:48 +0x424
2023-10-26T08:00:55.3527558Z   strconv.AppendQuote()
2023-10-26T08:00:55.3527883Z       /usr/local/go/src/strconv/quote.go:131 +0x16e
2023-10-26T08:00:55.3528020Z   fmt.(*fmt).fmtQ()
2023-10-26T08:00:55.3528305Z       /usr/local/go/src/fmt/format.go:457 +0x16f
2023-10-26T08:00:55.3528467Z   fmt.(*pp).fmtString()
2023-10-26T08:00:55.3528742Z       /usr/local/go/src/fmt/print.go:452 +0x96
2023-10-26T08:00:55.3528891Z   fmt.(*pp).printArg()
2023-10-26T08:00:55.3529372Z       /usr/local/go/src/fmt/print.go:694 +0xeb9
2023-10-26T08:00:55.3529520Z   fmt.(*pp).doPrintf()
2023-10-26T08:00:55.3529829Z       /usr/local/go/src/fmt/print.go:1026 +0x330
2023-10-26T08:00:55.3529967Z   fmt.Sprintf()
2023-10-26T08:00:55.3530252Z       /usr/local/go/src/fmt/print.go:219 +0x73
2023-10-26T08:00:55.3530595Z   github.com/sirupsen/logrus.(*TextFormatter).appendValue()
2023-10-26T08:00:55.3531197Z       /pkg/mod/github.com/sirupsen/[email protected]/text_formatter.go:332 +0x11a
2023-10-26T08:00:55.3531553Z   github.com/sirupsen/logrus.(*TextFormatter).appendKeyValue()
2023-10-26T08:00:55.3532271Z       /pkg/mod/github.com/sirupsen/[email protected]/text_formatter.go:320 +0xc8
2023-10-26T08:00:55.3532947Z   github.com/sirupsen/logrus.(*TextFormatter).Format()
2023-10-26T08:00:55.3533734Z       /pkg/mod/github.com/sirupsen/[email protected]/text_formatter.go:221 +0xe5d
2023-10-26T08:00:55.3534006Z   github.com/sirupsen/logrus.(*Entry).write()
2023-10-26T08:00:55.3534508Z       /pkg/mod/github.com/sirupsen/[email protected]/entry.go:275 +0x141
2023-10-26T08:00:55.3534758Z   github.com/sirupsen/logrus.Entry.log()
2023-10-26T08:00:55.3535253Z       /pkg/mod/github.com/sirupsen/[email protected]/entry.go:251 +0x404
2023-10-26T08:00:55.3535493Z   github.com/sirupsen/logrus.(*Entry).Log()
2023-10-26T08:00:55.3536005Z       /pkg/mod/github.com/sirupsen/[email protected]/entry.go:287 +0x167
2023-10-26T08:00:55.3536249Z   github.com/sirupsen/logrus.(*Entry).Logf()
2023-10-26T08:00:55.3536742Z       /pkg/mod/github.com/sirupsen/[email protected]/entry.go:333 +0x118
2023-10-26T08:00:55.3537160Z   github.com/sirupsen/logrus.(*Entry).Infof()
2023-10-26T08:00:55.3537733Z       /pkg/mod/github.com/sirupsen/[email protected]/entry.go:346 +0x72
2023-10-26T08:00:55.3538349Z   github.com/apache/pulsar-client-go/pulsar/log.(*logrusWrapper).Infof()
2023-10-26T08:00:55.3539411Z       /pulsar/pulsar-client-go/pulsar/log/wrapper_logrus.go:83 +0x86
2023-10-26T08:00:55.3540055Z   github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleCloseProducer()
2023-10-26T08:00:55.3540637Z       /pulsar/pulsar-client-go/pulsar/internal/connection.go:913 +0x141
2023-10-26T08:00:55.3541510Z   github.com/apache/pulsar-client-go/pulsar/internal.(*connection).internalReceivedCommand()
2023-10-26T08:00:55.3542092Z       /pulsar/pulsar-client-go/pulsar/internal/connection.go:576 +0x469
2023-10-26T08:00:55.3542627Z   github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run()
2023-10-26T08:00:55.3543214Z       /pulsar/pulsar-client-go/pulsar/internal/connection.go:431 +0x644
2023-10-26T08:00:55.3544372Z   github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
2023-10-26T08:00:55.3545283Z       /pulsar/pulsar-client-go/pulsar/internal/connection.go:236 +0xc4
2023-10-26T08:00:55.3545296Z 
2023-10-26T08:00:55.3545464Z Goroutine 382 (running) created at:
2023-10-26T08:00:55.3545951Z   github.com/apache/pulsar-client-go/pulsar.newPartitionProducer()
2023-10-26T08:00:55.3546605Z       /pulsar/pulsar-client-go/pulsar/producer_partition.go:198 +0x137a
2023-10-26T08:00:55.3547379Z   github.com/apache/pulsar-client-go/pulsar.(*producer).internalCreatePartitionsProducers.func1()
2023-10-26T08:00:55.3547908Z       /pulsar/pulsar-client-go/pulsar/producer_impl.go:245 +0x105
2023-10-26T08:00:55.3547917Z 
2023-10-26T08:00:55.3548085Z Goroutine 495 (finished) created at:
2023-10-26T08:00:55.3548601Z   github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
2023-10-26T08:00:55.3549328Z       /pulsar/pulsar-client-go/pulsar/internal/connection.go:232 +0x7a
2023-10-26T08:00:55.3550325Z   github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
2023-10-26T08:00:55.3550955Z       /pulsar/pulsar-client-go/pulsar/internal/connection_pool.go:115 +0x3a9
2023-10-26T08:00:55.3551476Z   github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
2023-10-26T08:00:55.3552022Z       /pulsar/pulsar-client-go/pulsar/internal/rpc_client.go:120 +0xed
2023-10-26T08:00:55.3552660Z   github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
2023-10-26T08:00:55.3553214Z       /pulsar/pulsar-client-go/pulsar/internal/rpc_client.go:103 +0x211
2023-10-26T08:00:55.3554277Z   github.com/apache/pulsar-client-go/pulsar/internal.(*lookupService).GetPartitionedTopicMetadata()
2023-10-26T08:00:55.3554856Z       /pulsar/pulsar-client-go/pulsar/internal/lookup_service.go:215 +0x2c8
2023-10-26T08:00:55.3555499Z   github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
2023-10-26T08:00:55.3555984Z       /pulsar/pulsar-client-go/pulsar/client_impl.go:251 +0xfd
2023-10-26T08:00:55.3556648Z   github.com/apache/pulsar-client-go/pulsar.(*consumer).internalTopicSubscribeToPartitions()
2023-10-26T08:00:55.3557131Z       /pulsar/pulsar-client-go/pulsar/consumer_impl.go:304 +0xcf
2023-10-26T08:00:55.3557598Z   github.com/apache/pulsar-client-go/pulsar.newInternalConsumer()
2023-10-26T08:00:55.3558089Z       /pulsar/pulsar-client-go/pulsar/consumer_impl.go:256 +0x555
2023-10-26T08:00:55.3558620Z   github.com/apache/pulsar-client-go/pulsar.newConsumer()
2023-10-26T08:00:55.3559160Z       /pulsar/pulsar-client-go/pulsar/consumer_impl.go:196 +0x69b
2023-10-26T08:00:55.3559582Z   github.com/apache/pulsar-client-go/pulsar.(*client).Subscribe()
2023-10-26T08:00:55.3560040Z       /pulsar/pulsar-client-go/pulsar/client_impl.go:219 +0xc4
2023-10-26T08:00:55.3560428Z   github.com/apache/pulsar-client-go/pulsar.TestSendTimeout()
2023-10-26T08:00:55.3560918Z       /pulsar/pulsar-client-go/pulsar/producer_test.go:1173 +0x314
2023-10-26T08:00:55.3561190Z   testing.tRunner()
2023-10-26T08:00:55.3561587Z       /usr/local/go/src/testing/testing.go:1203 +0x202
2023-10-26T08:00:55.3561721Z ==================

@tisonkun tisonkun merged commit 2a8d151 into apache:master Oct 26, 2023
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Improve] Rename DisableBlockIfQueueFull to BlockIfQueueFull as JAVA client and have the same default value
2 participants