Skip to content

Commit

Permalink
feat(decider): Use streams instead of options [LNG-277] (#106)
Browse files Browse the repository at this point in the history
Use streams instead of options
  • Loading branch information
InversionSpaces authored Nov 7, 2023
1 parent 22c1065 commit 9e92d8f
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 11 deletions.
4 changes: 2 additions & 2 deletions src/aqua/chain/blocks.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ChainConnector from "services.aqua"

-- Get latest block number from RPC
func get_latest(spell_id: SpellId, chain: ChainInfo) -> ?string:
latest: ?string
latest: *string
latest_block <- ChainConnector.latest_block_number(chain.api_endpoint)
if latest_block.success:
latest <<- latest_block.number_hex
Expand All @@ -24,7 +24,7 @@ func get_latest(spell_id: SpellId, chain: ChainInfo) -> ?string:
-- ordering is -1 for <, 0 for =, 1 for >
-- returns nil if either `hex_a` or `hex_b` is an invalid hex
func hex_cmp(spell_id: SpellId, hex_a: string, hex_b: string, pred: i8 -> bool) -> ?bool:
result: ?bool
result: *bool

cmp <- ChainConnector.hex_cmp(hex_a, hex_b)

Expand Down
2 changes: 1 addition & 1 deletion src/aqua/chain/changed_deals.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func make_change_reqs(spell_id: SpellId, deals: []JoinedDeal) -> []DealChangesRe

-- Get DealChanged logs from RPC
func get_deal_changes(spell_id: SpellId, api_endpoint: string, joined: []JoinedDeal) -> ?[]DealChangedResult:
changes: ?[]DealChangedResult
changes: *[]DealChangedResult

deals <- make_change_reqs(spell_id, joined)
if deals.length != 0:
Expand Down
2 changes: 1 addition & 1 deletion src/aqua/chain/from_block.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func get_from_block(spell_id: SpellId) -> string:
-- "latest" => latest block on chain
-- "earliest" => 0x0
func evaluate_from_block(spell_id: SpellId, chain: ChainInfo) -> ?string:
left: ?string
left: *string
from_block <- get_from_block(spell_id)
if from_block == "latest":
latest <- get_latest(spell_id, chain)
Expand Down
6 changes: 3 additions & 3 deletions src/aqua/chain/new_deals.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use "../hex.aqua" as Hex
-- return a value to use as a left boundary in `eth_newLogs`
-- it usually equals to `last seen block + 1`, except on first iteration it is equal to evaluated `from_block`
func get_left_boundary(spell_id: SpellId, chain: ChainInfo) -> ?string:
left: ?string
left: *string

-- load "last seen" block number
-- `last seen block` is initialized to the first `left boundary` this function returns
Expand Down Expand Up @@ -80,7 +80,7 @@ func filter_new(spell_id: SpellId, matches: []DealMatched, joined_deals: []Joine
new: *DealMatched
-- TODO: replace with a more optimal implementation once hashmaps land in AquaVM
for match <- matches:
joined: ?bool
joined: *bool
match_deal_id = match.info.deal_id
-- first, check if worker for that deal is created
if is_worker_created(spell_id, match_deal_id):
Expand All @@ -100,7 +100,7 @@ data Poll:
right_boundary: string

func poll_logs(spell_id: SpellId, chain: ChainInfo, left: string) -> ?Poll:
poll: ?Poll
poll: *Poll

result <- ChainConnector.poll_deal_matches(chain, left)
if !result.success:
Expand Down
2 changes: 1 addition & 1 deletion src/aqua/decider/deal_storage.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func store_deal_state(decider_id: SpellId, deal_id: DealId, state: DealState):

func get_deal_state(decider_id: SpellId, deal_id: DealId) -> ?DealState:
Spell decider_id
state: ?DealState
state: *DealState

json <- Spell.get_string(deal_id)
if json.success && !json.absent:
Expand Down
4 changes: 2 additions & 2 deletions src/aqua/decider/join_deal.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func join_deal(spell_id: SpellId, block: string, deal_id: DealId, app_cid: CID)
deal_log(spell_id, deal_id, msg)

log(["joining a deal from_block", block])
worker_id: ?WorkerId
error: ?string
worker_id: *WorkerId
error: *string
try:
id <- Worker.create(spell_id, deal_id)
if id != nil:
Expand Down
2 changes: 1 addition & 1 deletion src/aqua/fluence/worker.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func is_worker_created(spell_id: SpellId, deal_id: DealId) -> bool:
<- created!

func create(spell_id: SpellId, deal_id: DealId) -> ?WorkerId:
worker_id: ?WorkerId
worker_id: *WorkerId
try:
worker_id <- Worker.create(?[deal_id])
deal_log(spell_id, deal_id, ["created worker for deal", worker_id!])
Expand Down

0 comments on commit 9e92d8f

Please sign in to comment.