Skip to content

Commit

Permalink
fix bug for resubscribing with the same asset (#3119)
Browse files Browse the repository at this point in the history
Co-authored-by: cl-ea <[email protected]>
  • Loading branch information
karen-stepanyan and cl-ea authored Jan 17, 2024
1 parent 084ca88 commit 988a72e
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 10 deletions.
5 changes: 5 additions & 0 deletions .changeset/bright-insects-share.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@chainlink/dar-adapter': patch
---

Fixed bug that was causing EA to unsubscribe/resubscribe to the same assets
1 change: 1 addition & 0 deletions packages/sources/dar/src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export const config = new AdapterConfig(
{
envDefaultOverrides: {
CACHE_MAX_AGE: 20 * 60 * 1000, //20 minutes
WS_CONNECTION_OPEN_TIMEOUT: 30_000,
},
},
)
18 changes: 12 additions & 6 deletions packages/sources/dar/src/transport/price.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export const config: WebSocketTransportConfig<WsTransportTypes> = {
return {
headers: {
Authorization: token,
assets: desiredSubs.map((pair) => pair['base'].toLowerCase()).join(','),
assets: [...new Set(desiredSubs.map((pair) => pair['base'].toLowerCase()))].join(','),
},
}
},
Expand Down Expand Up @@ -95,11 +95,17 @@ export class DarWebsocketTransport extends WebSocketTransport<WsTransportTypes>
context: EndpointContext<WsTransportTypes>,
subscriptions: SubscriptionDeltas<RequestParams>,
): Promise<void> {
if (
this.wsConnection &&
!this.connectionClosed() &&
(subscriptions.new.length || subscriptions.stale.length)
) {
const desiredBases = subscriptions.desired.map((s) => s.base)
const shouldUnsubscribe = () => {
// we unsubscribe and resubscribe if a new subscription (base) is present in desired subscriptions only once or
// when stale subscription (base) is not part of desired subscriptions
return (
subscriptions.new.some((s) => desiredBases.filter((a) => a === s.base).length === 1) ||
subscriptions.stale.some((s) => desiredBases.indexOf(s.base) === -1)
)
}

if (this.wsConnection && !this.connectionClosed() && shouldUnsubscribe()) {
logger.debug(
`closing WS connection for new subscriptions: ${JSON.stringify(subscriptions.desired)}`,
)
Expand Down
38 changes: 34 additions & 4 deletions packages/sources/dar/test/unit/price.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,36 @@ describe('DarWebsocketTransport', () => {
expect(subscriptions['new']).toEqual([{ base: 'BTC', quote: 'USD' }])
transport.wsConnection.close()
})

it('new subscription with existing base does not close the connection', async () => {
subscriptions = {
desired: [{ base: 'BTC', quote: 'USD' }],
new: [],
stale: [],
}

await transport.streamHandler(context, subscriptions)
await sleep(100)
expect(connClosed).toEqual(false)
expect(transport.connectionClosed()).toEqual(false)

subscriptions = {
desired: [
{ base: 'BTC', quote: 'USD' },
{ base: 'BTC', quote: 'ETH' },
],
new: [{ base: 'BTC', quote: 'ETH' }],
stale: [],
}

await transport.streamHandler(context, subscriptions)
expect(connClosed).toEqual(false)
expect(transport.connectionClosed()).toEqual(false)
})
})

describe('test closing of connection', () => {
it('new subscription, closes existing connection', async () => {
it('new subscription, closes existing connection and resubscribes', async () => {
subscriptions = {
desired: [{ base: 'BTC', quote: 'USD' }],
new: [{ base: 'BTC', quote: 'USD' }],
Expand All @@ -181,14 +207,18 @@ describe('DarWebsocketTransport', () => {
expect(transport.connectionClosed()).toEqual(false)

subscriptions = {
desired: [],
desired: [
{ base: 'BTC', quote: 'USD' },
{ base: 'ETH', quote: 'USD' },
],
new: [{ base: 'ETH', quote: 'USD' }],
stale: [],
}

await transport.streamHandler(context, subscriptions)
// we close the connection however since we have a new unique base ('ETH') we will resubscribe again
expect(connClosed).toEqual(true)
expect(transport.connectionClosed()).toEqual(true)
expect(transport.connectionClosed()).toEqual(false)
})

it('stale connection, closes existing connection', async () => {
Expand All @@ -206,7 +236,7 @@ describe('DarWebsocketTransport', () => {
subscriptions = {
desired: [],
new: [],
stale: [{ base: 'ETH', quote: 'USD' }],
stale: [{ base: 'BTC', quote: 'USD' }],
}

await transport.streamHandler(context, subscriptions)
Expand Down

0 comments on commit 988a72e

Please sign in to comment.