Skip to content

Commit

Permalink
Make channel raise Connection::ClosedException
Browse files Browse the repository at this point in the history
  • Loading branch information
spuun committed Aug 22, 2024
1 parent d380e65 commit fccfff1
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 25 deletions.
44 changes: 21 additions & 23 deletions src/amqp-client/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ class AMQP::Client
rescue ex
Log.error(exception: ex) { "Couldn't write CloseOk frame" }
end
@closed = true
@closing_frame = frame
@closed = true
if on_close = @on_close
begin
on_close.call(frame.reply_code, frame.reply_text)
Expand Down Expand Up @@ -268,7 +268,7 @@ class AMQP::Client
def basic_publish(body : IO | Bytes, bytesize : Int, exchange : String, routing_key = "",
mandatory = false, immediate = false, props properties = Properties.new,
blk : Proc(Bool, Nil)? = nil) : UInt64
raise ClosedException.new(@closing_frame) if @closing_frame
raise_if_closed

@connection.with_lock(flush: !@tx) do |c|
if blk && !@confirm_mode
Expand Down Expand Up @@ -309,7 +309,7 @@ class AMQP::Client
waiting_fiber.enqueue
end
sleep
raise ClosedException.new(@closing_frame) if @closing_frame
raise_if_closed
confirmed
end

Expand All @@ -321,7 +321,7 @@ class AMQP::Client
waiting_fiber.enqueue
end
sleep
raise ClosedException.new(@closing_frame) if @closing_frame
raise_if_closed
confirmed
end

Expand All @@ -333,11 +333,8 @@ class AMQP::Client
return true if @unconfirmed_publishes.empty?
ok = @unconfirmed_empty.receive
unless ok
if frame = @closing_frame
raise ClosedException.new(frame)
else
raise Error.new("BUG: got nack without closing frame")
end
raise_if_closed
raise Error.new("BUG: got nack without closing frame")
end
ok
end
Expand Down Expand Up @@ -383,13 +380,8 @@ class AMQP::Client
write Frame::Basic::Get.new(@id, 0_u16, queue, no_ack)
@basic_get.receive
rescue ex : ::Channel::ClosedError
if cf = @connection.closing_frame
raise Connection::ClosedException.new(cf)
elsif cf = @closing_frame
raise ClosedException.new(cf, cause: ex)
else
raise ex
end
raise_if_closed(cause: ex)
raise ex
end

# :nodoc:
Expand Down Expand Up @@ -430,7 +422,7 @@ class AMQP::Client
raise ex
end
end
raise ClosedException.new(@closing_frame) if @closing_frame
raise_if_closed
else
done.close
end
Expand Down Expand Up @@ -681,25 +673,31 @@ class AMQP::Client
end

private def write(frame)
raise ClosedException.new(@closing_frame) if @closing_frame
raise_if_closed
@connection.write frame
end

private def next_frame : Frame
@reply_frames.receive
rescue ex : ::Channel::ClosedError
if conn_close = @connection.closing_frame
raise Connection::ClosedException.new(conn_close)
else
raise ClosedException.new(@closing_frame, cause: ex)
end
raise_if_closed(ex)
raise Error.new("BUG: Channel::ClosedError but not closing frame")
end

private macro expect(clz)
frame = next_frame
frame.as?({{ clz }}) || raise Error::UnexpectedFrame.new(frame)
end

private def raise_if_closed(cause ex : Exception? = nil)
if frame = @closing_frame
raise ClosedException.new frame, cause: ex
end
if frame = @connection.closing_frame
raise Connection::ClosedException.new frame, cause: ex
end
end

def inspect(io : IO) : Nil
io << "#<" << self.class.name << " @id=" << @id << '>'
end
Expand Down
4 changes: 2 additions & 2 deletions src/amqp-client/errors.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ class AMQP::Client
super(message, cause)
end

def initialize(frame : Frame::Connection::Close)
super("#{frame.reply_text} (#{frame.reply_code})")
def initialize(frame : Frame::Connection::Close, cause : Exception? = nil)
super("#{frame.reply_text} (#{frame.reply_code})", cause)
end

def initialize(message, host, user, vhost)
Expand Down

0 comments on commit fccfff1

Please sign in to comment.