Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(decider): Use streams instead of options [LNG-277] #106

Merged
merged 1 commit into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/aqua/chain/blocks.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ChainConnector from "services.aqua"

-- Get latest block number from RPC
func get_latest(spell_id: SpellId, chain: ChainInfo) -> ?string:
latest: ?string
latest: *string
latest_block <- ChainConnector.latest_block_number(chain.api_endpoint)
if latest_block.success:
latest <<- latest_block.number_hex
Expand All @@ -24,7 +24,7 @@ func get_latest(spell_id: SpellId, chain: ChainInfo) -> ?string:
-- ordering is -1 for <, 0 for =, 1 for >
-- returns nil if either `hex_a` or `hex_b` is an invalid hex
func hex_cmp(spell_id: SpellId, hex_a: string, hex_b: string, pred: i8 -> bool) -> ?bool:
result: ?bool
result: *bool

cmp <- ChainConnector.hex_cmp(hex_a, hex_b)

Expand Down
2 changes: 1 addition & 1 deletion src/aqua/chain/changed_deals.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func make_change_reqs(spell_id: SpellId, deals: []JoinedDeal) -> []DealChangesRe

-- Get DealChanged logs from RPC
func get_deal_changes(spell_id: SpellId, api_endpoint: string, joined: []JoinedDeal) -> ?[]DealChangedResult:
changes: ?[]DealChangedResult
changes: *[]DealChangedResult

deals <- make_change_reqs(spell_id, joined)
if deals.length != 0:
Expand Down
2 changes: 1 addition & 1 deletion src/aqua/chain/from_block.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func get_from_block(spell_id: SpellId) -> string:
-- "latest" => latest block on chain
-- "earliest" => 0x0
func evaluate_from_block(spell_id: SpellId, chain: ChainInfo) -> ?string:
left: ?string
left: *string
from_block <- get_from_block(spell_id)
if from_block == "latest":
latest <- get_latest(spell_id, chain)
Expand Down
6 changes: 3 additions & 3 deletions src/aqua/chain/new_deals.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use "../hex.aqua" as Hex
-- return a value to use as a left boundary in `eth_newLogs`
-- it usually equals to `last seen block + 1`, except on first iteration it is equal to evaluated `from_block`
func get_left_boundary(spell_id: SpellId, chain: ChainInfo) -> ?string:
left: ?string
left: *string

-- load "last seen" block number
-- `last seen block` is initialized to the first `left boundary` this function returns
Expand Down Expand Up @@ -80,7 +80,7 @@ func filter_new(spell_id: SpellId, matches: []DealMatched, joined_deals: []Joine
new: *DealMatched
-- TODO: replace with a more optimal implementation once hashmaps land in AquaVM
for match <- matches:
joined: ?bool
joined: *bool
match_deal_id = match.info.deal_id
-- first, check if worker for that deal is created
if is_worker_created(spell_id, match_deal_id):
Expand All @@ -100,7 +100,7 @@ data Poll:
right_boundary: string

func poll_logs(spell_id: SpellId, chain: ChainInfo, left: string) -> ?Poll:
poll: ?Poll
poll: *Poll

result <- ChainConnector.poll_deal_matches(chain, left)
if !result.success:
Expand Down
2 changes: 1 addition & 1 deletion src/aqua/decider/deal_storage.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func store_deal_state(decider_id: SpellId, deal_id: DealId, state: DealState):

func get_deal_state(decider_id: SpellId, deal_id: DealId) -> ?DealState:
Spell decider_id
state: ?DealState
state: *DealState

json <- Spell.get_string(deal_id)
if json.success && !json.absent:
Expand Down
4 changes: 2 additions & 2 deletions src/aqua/decider/join_deal.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func join_deal(spell_id: SpellId, block: string, deal_id: DealId, app_cid: CID)
deal_log(spell_id, deal_id, msg)

log(["joining a deal from_block", block])
worker_id: ?WorkerId
error: ?string
worker_id: *WorkerId
error: *string
try:
id <- Worker.create(spell_id, deal_id)
if id != nil:
Expand Down
2 changes: 1 addition & 1 deletion src/aqua/fluence/worker.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func is_worker_created(spell_id: SpellId, deal_id: DealId) -> bool:
<- created!

func create(spell_id: SpellId, deal_id: DealId) -> ?WorkerId:
worker_id: ?WorkerId
worker_id: *WorkerId
try:
worker_id <- Worker.create(?[deal_id])
deal_log(spell_id, deal_id, ["created worker for deal", worker_id!])
Expand Down