Skip to content

Commit

Permalink
Issue #58: make SocketStream and friends continuation friendly by wra…
Browse files Browse the repository at this point in the history
…pping GsSocket references in a TransientStackValue. Add ZnTransactionSafeManagingMultiThreadedServer a subclass of ZnManagingMultiThreadedServer where all references to GsSockets are wrapped by a TransientStackValue ... including places where GsSockets are passed as arguments ... this makes the server instance transaction safe, so continuations can be snapped off and transactions can be safely used in delegates ...
  • Loading branch information
dalehenrich committed Dec 9, 2014
1 parent e29ffe6 commit 03318b1
Show file tree
Hide file tree
Showing 49 changed files with 393 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
as yet unclassified
close
"Flush any data still not sent
"Flush any data still not sent
and take care of the socket."

self flush.
socket close.
self destroy.
self flush.
self socket close.
self destroy
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
as yet unclassified
destroy
"Destroy the receiver and its underlying socket. Does not attempt to flush the output buffers. For a graceful close use SocketStream>>close instead."
socket ifNotNil: [socket close]
"Destroy the receiver and its underlying socket. Does not attempt to flush the output buffers. For a graceful close use SocketStream>>close instead."

socket ifNotNil: [ self socket close ]
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
as yet unclassified
flush
"If the other end is connected and we have something
"If the other end is connected and we have something
to send, then we send it and reset the outBuffer."

((outNextToWrite > 1) and: [socket isActive])
ifTrue: [
[socket sendData: outBuffer count: outNextToWrite - 1]
on: ConnectionTimedOut
do: [:ex | shouldSignal ifFalse: ["swallow"]].
outNextToWrite := 1]
(outNextToWrite > 1 and: [ self socket isActive ])
ifTrue: [
[ self socket sendData: outBuffer count: outNextToWrite - 1 ]
on: ConnectionTimedOut
do: [ :ex |
shouldSignal
ifFalse: [
"swallow"
] ].
outNextToWrite := 1 ]
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
as yet unclassified
flushComet
"Flushes the receiver and answer if the socket is still in a valid state and both ends are properly connected. Free the socket, as a side-effect if there is a problem."

| result |
result := [ self flush. true ]
on: NetworkError
do: [ :err | false ].
result := result and: [ self isConnected and: [ self isOtherEndConnected ] ].
result ifFalse: [ socket destroy ].
^ result
"Flushes the receiver and answer if the socket is still in a valid state and both ends are properly connected. Free the socket, as a side-effect if there is a problem."

| result |
result := [
self flush.
true ]
on: NetworkError
do: [ :err | false ].
result := result and: [ self isConnected and: [ self isOtherEndConnected ] ].
result
ifFalse: [ self socket destroy ].
^ result
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
as yet unclassified
isConnected
"The stream is connected if the socket is."
"The stream is connected if the socket is."

^socket isConnected
^ self socket isConnected
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
as yet unclassified
isDataAvailable
"It the inbuffer is empty, we check the socket for data.
"It the inbuffer is empty, we check the socket for data.
If it claims to have data available to read, we try to read
some once and recursively call this method again.
If something really was available it is now in the inBuffer.
This is because there has been spurious
dataAvailable when there really is no data to get."

self isInBufferEmpty ifFalse: [^true].
^socket dataAvailable
ifFalse: [false]
ifTrue: [self receiveDataIfAvailable; isDataAvailable]

self isInBufferEmpty
ifFalse: [ ^ true ].
^ self socket dataAvailable
ifFalse: [ false ]
ifTrue: [
self
receiveDataIfAvailable;
isDataAvailable ]
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
as yet unclassified
isOtherEndConnected
^socket isConnected
^ self socket isConnected
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
as yet unclassified
nextPutAllFlush: aCollection
"Put a String or a ByteArray onto the stream.
"Put a String or a ByteArray onto the stream.
You can use this if you have very large data - it avoids
copying into the buffer (and avoids buffer growing)
and also flushes any other pending data first."

| toPut |
toPut := binary ifTrue: [aCollection asByteArray] ifFalse: [aCollection asString].
self flush. "first flush pending stuff, then directly send"
socket isConnected ifTrue: [
[socket sendData: toPut count: toPut size]
on: ConnectionTimedOut
do: [:ex | shouldSignal ifFalse: ["swallow"]]]
| toPut |
toPut := binary
ifTrue: [ aCollection asByteArray ]
ifFalse: [ aCollection asString ].
self flush. "first flush pending stuff, then directly send"
self socket isConnected
ifTrue: [
[ self socket sendData: toPut count: toPut size ]
on: ConnectionTimedOut
do: [ :ex |
shouldSignal
ifFalse: [
"swallow"
] ] ]
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
as yet unclassified
receiveAvailableData
"Receive available data (as much as fits in the inBuffer)
"Receive available data (as much as fits in the inBuffer)
but not waiting for more to arrive.
Return the position in the buffer where the
new data starts, regardless if anything
was read, see #adjustInBuffer."

recentlyRead := socket receiveAvailableDataInto: inBuffer startingAt: inNextToWrite.
^self adjustInBuffer: recentlyRead

recentlyRead := self socket
receiveAvailableDataInto: inBuffer
startingAt: inNextToWrite.
^ self adjustInBuffer: recentlyRead
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
as yet unclassified
receiveData
"Receive data with timeout if it has been set.
"Receive data with timeout if it has been set.
If shouldSignal is false we use the Socket methods
that swallow those Exceptions, if it is true the
caller will have to handle those Exceptions.
Return the position in the buffer where the
new data starts, regardless if anything
was read, see #adjustInBuffer."

recentlyRead := shouldSignal ifTrue: [
self shouldTimeout ifTrue: [
socket receiveDataSignallingTimeout: timeout
into: inBuffer startingAt: inNextToWrite]
ifFalse: [
socket receiveDataSignallingClosedInto: inBuffer
startingAt: inNextToWrite]]
ifFalse: [
self shouldTimeout ifTrue: [
"This case is tricky, if it times out and is swallowed

recentlyRead := shouldSignal
ifTrue: [
self shouldTimeout
ifTrue: [
self socket
receiveDataSignallingTimeout: timeout
into: inBuffer
startingAt: inNextToWrite ]
ifFalse: [ self socket receiveDataSignallingClosedInto: inBuffer startingAt: inNextToWrite ] ]
ifFalse: [
self shouldTimeout
ifTrue: [
"This case is tricky, if it times out and is swallowed
how does other methods calling this method repeatedly
get to know that? And what should they do?"
socket receiveDataTimeout: timeout
into: inBuffer startingAt: inNextToWrite]
ifFalse: [
socket receiveDataInto: inBuffer
startingAt: inNextToWrite]].
^self adjustInBuffer: recentlyRead
self socket
receiveDataTimeout: timeout
into: inBuffer
startingAt: inNextToWrite ]
ifFalse: [ self socket receiveDataInto: inBuffer startingAt: inNextToWrite ] ].
^ self adjustInBuffer: recentlyRead
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
as yet unclassified
receiveDataIfAvailable
"Only used to check that there really is data to read
"Only used to check that there really is data to read
from the socket after it signals dataAvailable.
It has been known to signal true and then still
not have anything to read. See also isDataAvailable.
Return the position in the buffer where the
new data starts, regardless if anything
was read, see #adjustInBuffer."

recentlyRead := socket receiveSomeDataInto: inBuffer startingAt: inNextToWrite.
^self adjustInBuffer: recentlyRead
recentlyRead := self socket
receiveSomeDataInto: inBuffer
startingAt: inNextToWrite.
^ self adjustInBuffer: recentlyRead
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
as yet unclassified
socket: aSocket
socket := aSocket
socket := TransientStackValue value: aSocket
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
as yet unclassified
socket
^socket
^ socket value
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@
"bufferSize" : "PaulDeBruicker 03/03/2011 12:09",
"bufferSize:" : "PaulDeBruicker 03/03/2011 12:09",
"checkFlush" : "PaulDeBruicker 03/03/2011 12:09",
"close" : "PaulDeBruicker 03/03/2011 12:15",
"close" : "dkh 12/08/2014 16:07",
"connect" : "dkh 08/06/2012 12:12",
"cr" : "PaulDeBruicker 03/03/2011 12:15",
"crlf" : "PaulDeBruicker 03/03/2011 12:15",
"debug" : "PaulDeBruicker 04/12/2011 15:27",
"destroy" : "PaulDeBruicker 03/03/2011 14:17",
"flush" : "PaulDeBruicker 03/03/2011 14:19",
"flushComet" : "PaulDeBruicker 03/03/2011 12:26",
"destroy" : "dkh 12/08/2014 16:07",
"flush" : "dkh 12/08/2014 16:07",
"flushComet" : "dkh 12/08/2014 16:07",
"greaseNext:putAll:startingAt:" : "PaulDeBruicker 03/03/2011 12:26",
"growInBuffer" : "PaulDeBruicker 03/03/2011 12:27",
"ifStale:" : "PaulDeBruicker 03/03/2011 12:27",
"inBufferSize" : "PaulDeBruicker 03/03/2011 12:27",
"initialize" : "PaulDeBruicker 03/03/2011 12:27",
"isBinary" : "PaulDeBruicker 03/03/2011 12:27",
"isConnected" : "PaulDeBruicker 03/03/2011 12:27",
"isDataAvailable" : "PaulDeBruicker 03/03/2011 12:27",
"isConnected" : "dkh 12/08/2014 16:07",
"isDataAvailable" : "dkh 12/08/2014 16:07",
"isEmpty" : "PaulDeBruicker 03/03/2011 12:28",
"isInBufferEmpty" : "PaulDeBruicker 03/03/2011 12:28",
"isOtherEndConnected" : "dkh 08/06/2012 12:09",
"isOtherEndConnected" : "dkh 12/08/2014 16:07",
"isStream" : "PaulDeBruicker 03/03/2011 12:28",
"moveInBufferDown" : "PaulDeBruicker 03/03/2011 12:28",
"next" : "dkh 06/05/2014 21:12",
Expand All @@ -52,7 +52,7 @@
"nextLineLf" : "PaulDeBruicker 03/03/2011 12:35",
"nextPut:" : "PaulDeBruicker 03/03/2011 12:35",
"nextPutAll:" : "PaulDeBruicker 03/03/2011 12:35",
"nextPutAllFlush:" : "dkh 08/06/2012 12:09",
"nextPutAllFlush:" : "dkh 12/08/2014 16:08",
"noTimeout" : "PaulDeBruicker 03/03/2011 12:35",
"outBufferSize" : "PaulDeBruicker 03/03/2011 12:35",
"peek" : "PaulDeBruicker 03/03/2011 12:35",
Expand All @@ -63,10 +63,10 @@
"printOn:" : "PaulDeBruicker 03/03/2011 12:36",
"putOn:" : "PaulDeBruicker 03/03/2011 11:55",
"readInto:startingAt:count:" : "PaulDeBruicker 03/03/2011 12:36",
"receiveAvailableData" : "PaulDeBruicker 03/03/2011 12:50",
"receiveData" : "PaulDeBruicker 03/03/2011 13:25",
"receiveAvailableData" : "dkh 12/08/2014 16:08",
"receiveData" : "dkh 12/08/2014 16:08",
"receiveData:" : "PaulDeBruicker 03/03/2011 12:28",
"receiveDataIfAvailable" : "PaulDeBruicker 03/03/2011 13:26",
"receiveDataIfAvailable" : "dkh 12/08/2014 16:08",
"recentlyRead" : "PaulDeBruicker 03/03/2011 13:26",
"resetBuffers" : "PaulDeBruicker 03/03/2011 13:26",
"resizeInBuffer:" : "PaulDeBruicker 03/03/2011 13:26",
Expand All @@ -76,8 +76,8 @@
"shouldTimeout" : "PaulDeBruicker 03/03/2011 13:26",
"size" : "PaulDeBruicker 03/03/2011 13:26",
"skip:" : "PaulDeBruicker 03/03/2011 13:27",
"socket" : "PaulDeBruicker 03/03/2011 13:27",
"socket:" : "PaulDeBruicker 03/03/2011 13:27",
"socket" : "dkh 12/08/2014 16:08",
"socket:" : "dkh 12/09/2014 10:01",
"space" : "PaulDeBruicker 03/03/2011 13:27",
"streamBuffer:" : "PaulDeBruicker 03/03/2011 13:27",
"timeout" : "PaulDeBruicker 03/12/2011 12:01",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
initialize-release
onNativeSocket: aSocket forDomain: aCommunicationDomain type: aSocketType protocol: aProtocolNumber
"^self
I initialize myself as a socket defined by the communications domain, type and protocol. This
follows the equivalent of the using the socket() or socketpair() function."

communicationDomain := aCommunicationDomain.
socketType := aSocketType.
protocolNumber := aProtocolNumber.
underlyingSocket := TransientStackValue value: aSocket.
^ self
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
private
onNativeclientSocket: aNativeSocket for: aServerSocket
"^self
I initialize myself with the same properties as aServerSocket and with
aNativeSocket as my underlying socket."

communicationDomain := aServerSocket communicationDomain.
socketType := aServerSocket socketType.
protocolNumber := aServerSocket protocolNumber.
underlyingSocket := TransientStackValue value: aNativeSocket.
^ self
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
private
underlyingSocket
^ underlyingSocket value
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"listenOn:backlogSize:" : "PaulDeBruicker 04/07/2011 17:37",
"listenOn:backlogSize:interface:" : "dkh 08/06/2012 14:53",
"localPort" : "PaulDeBruicker 04/10/2011 10:17",
"onNativeSocket:forDomain:type:protocol:" : "dkh 12/09/2014 10:01",
"onNativeclientSocket:for:" : "dkh 12/09/2014 10:01",
"port" : "PaulDeBruicker 03/12/2011 11:08",
"receiveAvailableDataInto:startingAt:" : "PaulDeBruicker 03/12/2011 11:09",
"receiveDataInto:startingAt:" : "PaulDeBruicker 04/12/2011 15:53",
Expand All @@ -22,6 +24,7 @@
"sendData:count:" : "PaulDeBruicker 03/12/2011 11:09",
"sendSomeData:startIndex:count:" : "PaulDeBruicker 03/12/2011 11:10",
"setOption:value:" : "PaulDeBruicker 04/10/2011 10:18",
"underlyingSocket" : "dkh 12/08/2014 16:12",
"waitForAcceptFor:" : "PaulDeBruicker 04/12/2011 15:56",
"waitForConnectionFor:" : "PaulDeBruicker 03/12/2011 11:10",
"waitForConnectionFor:ifTimedOut:" : "PaulDeBruicker 04/12/2011 16:15",
Expand Down
Loading

0 comments on commit 03318b1

Please sign in to comment.