From 0e5eed86ee9b50c036cf9416bd22de41b5f58da0 Mon Sep 17 00:00:00 2001 From: Robby Date: Sun, 9 Mar 2025 07:56:00 -0500 Subject: [PATCH] Added parMapUnordered and mParMapUnordered. --- .../src/main/scala/org/sireum/ops/SOps.scala | 26 ++++++++++++++- .../main/scala/org/sireum/ops/SOps_Ext.scala | 32 +++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/library/shared/src/main/scala/org/sireum/ops/SOps.scala b/library/shared/src/main/scala/org/sireum/ops/SOps.scala index cd279347..051f5ca2 100644 --- a/library/shared/src/main/scala/org/sireum/ops/SOps.scala +++ b/library/shared/src/main/scala/org/sireum/ops/SOps.scala @@ -168,8 +168,12 @@ import org.sireum._ @pure def parMap[V, U](s: IS[Z, V], f: V => U @pure): IS[Z, U] = $ + @pure def parMapUnordered[V, U](s: IS[Z, V], f: V => U @pure): IS[Z, U] = $ + def mParMap[V, U](s: IS[Z, V], f: V => U): IS[Z, U] = $ + def mParMapUnordered[V, U](s: IS[Z, V], f: V => U): IS[Z, U] = $ + @pure def parMapCores[V, U](s: IS[Z, V], f: V => U @pure, numOfCores: Z): IS[Z, U] = $ def mParMapCores[V, U](s: IS[Z, V], f: V => U, numOfCores: Z): IS[Z, U] = $ @@ -401,10 +405,18 @@ import org.sireum._ return ISZOpsUtil.parMap(s, f) } - @pure def mParMap[U](f: T => U): IS[Z, U] = { + @pure def parMapUnordered[U](f: T => U @pure): IS[Z, U] = { + return ISZOpsUtil.parMapUnordered(s, f) + } + + def mParMap[U](f: T => U): IS[Z, U] = { return ISZOpsUtil.mParMap(s, f) } + def mParMapUnordered[U](f: T => U): IS[Z, U] = { + return ISZOpsUtil.mParMapUnordered(s, f) + } + @pure def parMapFoldLeft[U, R](f: T => U @pure, g: (R, U) => R @pure, init: R): R = { return ops.ISZOps(parMap(f)).foldLeft(g, init) } @@ -535,6 +547,8 @@ import org.sireum._ def mParMap[@mut V, @mut U](s: MS[Z, V], f: V => U): MS[Z, U] = $ + def mParMapUnordered[@mut V, @mut U](s: MS[Z, V], f: V => U): MS[Z, U] = $ + @pure def parMapCores[@mut V, @mut U](s: MS[Z, V], f: V => U @pure, numOfCores: Z): MS[Z, U] = $ def mParMapCores[@mut V, @mut U](s: MS[Z, V], f: V => U, numOfCores: Z): MS[Z, U] = $ @@ -765,11 +779,21 @@ import org.sireum._ return r } + @pure def parMapUnordered[@mut U](f: T => U @pure): MS[Z, U] = { + val r = MSZOpsUtil.mParMapUnordered(s, f) + return r + } + def mParMap[@mut U](f: T => U): MS[Z, U] = { val r = MSZOpsUtil.mParMap(s, f) return r } + def mParMapUnordered[@mut U](f: T => U): MS[Z, U] = { + val r = MSZOpsUtil.mParMapUnordered(s, f) + return r + } + @pure def parMapFoldLeft[@mut U, @mut R](f: T => U @pure, g: (R, U) => R @pure, init: R): R = { return ops.MSZOps(parMap(f)).foldLeft(g, init) } diff --git a/library/shared/src/main/scala/org/sireum/ops/SOps_Ext.scala b/library/shared/src/main/scala/org/sireum/ops/SOps_Ext.scala index d45a8b23..d1fd789a 100644 --- a/library/shared/src/main/scala/org/sireum/ops/SOps_Ext.scala +++ b/library/shared/src/main/scala/org/sireum/ops/SOps_Ext.scala @@ -84,6 +84,18 @@ object ISOps_Ext { IS[I, U](irs.map(_._2.asInstanceOf[U]).toSeq: _*)(s.companion) } + def mParMapUnordered[@index I, V, U](s: IS[I, V], f: V => U, numOfCores: Z = Runtime.getRuntime.availableProcessors): IS[I, U] = { + val ies = s.elements + val t = Thread.currentThread + val cores = if (numOfCores >= 1) numOfCores.toInt else Runtime.getRuntime.availableProcessors + val irs = + if (ies.size >= MinimumParallelThreshold) $internal.Macro.parMap(poolRef, cores, ies, { p: V => + if (t.isInterrupted) null.asInstanceOf[U] else f(p) + }).toSeq else ies.map(f) + if (Thread.interrupted()) throw new InterruptedException + IS[I, U](irs: _*)(s.companion) + } + @pure def sortWith[@index I, V](s: IS[I, V], lt: (V, V) => B): IS[I, V] = { val es = s.elements.sortWith((e1, e2) => lt(e1, e2).value) val a = s.boxer.create(s.length) @@ -103,8 +115,12 @@ object ISZOpsUtil_Ext { def mParMap[V, U](s: IS[Z, V], f: V => U): IS[Z, U] = ISOps_Ext.mParMap(s, f) + def mParMapUnordered[V, U](s: IS[Z, V], f: V => U): IS[Z, U] = ISOps_Ext.mParMapUnordered(s, f) + def parMap[V, U](s: IS[Z, V], f: V => U): IS[Z, U] = ISOps_Ext.mParMap(s, f) + def parMapUnordered[V, U](s: IS[Z, V], f: V => U): IS[Z, U] = ISOps_Ext.mParMapUnordered(s, f) + def mParMapCores[V, U](s: IS[Z, V], f: V => U, numOfCores: Z): IS[Z, U] = ISOps_Ext.mParMap(s, f, numOfCores) def parMapCores[V, U](s: IS[Z, V], f: V => U, numOfCores: Z): IS[Z, U] = ISOps_Ext.mParMap(s, f, numOfCores) @@ -127,6 +143,18 @@ object MSOps_Ext { MS[I, U](irs.map(_._2.asInstanceOf[U]).toSeq: _*)(s.companion) } + def mParMapUnordered[@index I, V, U](s: MS[I, V], f: V => U, numOfCores: Z = Runtime.getRuntime.availableProcessors): MS[I, U] = { + val ies = s.elements + val t = Thread.currentThread + val cores = if (numOfCores >= 1) numOfCores.toInt else Runtime.getRuntime.availableProcessors + val irs = + if (ies.size >= ISOps_Ext.MinimumParallelThreshold) $internal.Macro.parMap(ISOps_Ext.poolRef, cores, ies, { p: V => + if (t.isInterrupted) null.asInstanceOf[U] else f(p) + }).toSeq else ies.map(f) + if (Thread.interrupted()) throw new InterruptedException + MS[I, U](irs: _*)(s.companion) + } + @pure def sortWith[@index I, V](s: MS[I, V], lt: (V, V) => B): MS[I, V] = { val es = s.elements.sortWith((e1, e2) => lt(e1, e2).value) val a = s.boxer.create(s.length) @@ -142,8 +170,12 @@ object MSOps_Ext { object MSZOpsUtil_Ext { def mParMap[V, U](s: MS[Z, V], f: V => U): MS[Z, U] = MSOps_Ext.mParMap(s, f) + def mParMapUnordered[V, U](s: MS[Z, V], f: V => U): MS[Z, U] = MSOps_Ext.mParMapUnordered(s, f) + def parMap[V, U](s: MS[Z, V], f: V => U): MS[Z, U] = MSOps_Ext.mParMap(s, f) + def parMapUnordered[V, U](s: MS[Z, V], f: V => U): MS[Z, U] = MSOps_Ext.mParMapUnordered(s, f) + def mParMapCores[V, U](s: MS[Z, V], f: V => U, numOfCores: Z): MS[Z, U] = MSOps_Ext.mParMap(s, f, numOfCores) def parMapCores[V, U](s: MS[Z, V], f: V => U, numOfCores: Z): MS[Z, U] = MSOps_Ext.mParMap(s, f, numOfCores)