Skip to content

Commit

Permalink
mapM and map MxN
Browse files Browse the repository at this point in the history
  • Loading branch information
ikod committed Sep 24, 2019
1 parent 2edc6f6 commit b65b609
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 53 deletions.
233 changes: 181 additions & 52 deletions source/hio/scheduler.d
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import std.exception;
import core.sync.condition;
import std.algorithm;
import std.typecons;
import std.range;

//import core.stdc.string;
//import core.stdc.errno;
Expand Down Expand Up @@ -52,7 +53,10 @@ void hlSleep(Duration d) {
}

struct Box(T) {
static if (!is(T == void)) {

enum Void = is(T == void);

static if (!Void) {
T _data;
}
SocketPair _pair;
Expand All @@ -62,17 +66,16 @@ struct Box(T) {

ReturnType!F App(F, A...) (F f, A args) {
alias R = ReturnType!F;
enum Void = is(ReturnType!F == void);
Box!R box;
static if (!Void)
static if (!box.Void)
{
R r;
}
void _wrapper()
{
try
{
static if (!Void)
static if (!box.Void)
{
r = f(args);
box._data = r;
Expand Down Expand Up @@ -120,7 +123,7 @@ ReturnType!F App(F, A...) (F f, A args) {
{
throw box._exception;
}
static if (!Void)
static if (!box.Void)
{
debug tracef("joined, value = %s", box._data);
return box._data;
Expand Down Expand Up @@ -407,7 +410,6 @@ class Threaded(F, A...) : Computation if (isCallable!F) {
alias start = run;
private {
alias R = ReturnType!F;
enum Void = is(ReturnType!F == void);

F _f;
A _args;
Expand All @@ -416,6 +418,7 @@ class Threaded(F, A...) : Computation if (isCallable!F) {
Fiber _parent;
Box!R _box;
Timer _t;
enum Void = _box.Void;
}
final this(F f, A args) {
_f = f;
Expand All @@ -426,6 +429,13 @@ class Threaded(F, A...) : Computation if (isCallable!F) {
override bool ready() {
return _ready;
}
static if (!Void) {
R value() {
if (_ready)
return _box._data;
throw new NotReadyException("You can't call value for non-ready task");
}
}
override bool wait(Duration timeout = Duration.max) {
if (_ready) {
if ( _box._exception ) {
Expand Down Expand Up @@ -477,12 +487,11 @@ class Threaded(F, A...) : Computation if (isCallable!F) {
getDefaultLoop.deinit();
uninitializeLoops();
try {
debug trace("calling");
static if (!Void) {
debug trace("calling");
_box._data = App(_f, _args);
}
else {
debug trace("calling");
App(_f, _args);
}
}
Expand Down Expand Up @@ -524,6 +533,9 @@ class Task(F, A...) : Fiber, Computation if (isCallable!F) {
_args = args;
_waitor = null;
_exception = null;
static if (!Void) {
_result = R.init;
}
super(&run);
}

Expand Down Expand Up @@ -591,6 +603,7 @@ class Task(F, A...) : Fiber, Computation if (isCallable!F) {
enforce!NotReadyException(_ready, "You can't get result from not ready task");
return _result;
}
alias value = result;
}
private final void run() {
static if ( Void )
Expand Down Expand Up @@ -677,6 +690,7 @@ unittest
assert(!t200.ready);
t100.wait(300.msecs);
assert(t100.ready);
assert(t100.value == 100.msecs);
return 1;
});
assert(v == 1);
Expand Down Expand Up @@ -1264,56 +1278,171 @@ unittest {
// assert(test_value == 2);
// }

/// split execution to M threads and N fibers.
/// map
void mapMxN(F, R)(F f, R r, ulong m, ulong n) {
long chunkLen(long x, long y) {
return x / y + (x % y ? 1 : 0);
//
// split array on N balanced chunks
// (not on chunks with N members)
//
private auto splitn(T)(T a, size_t slices) {
T[] r;
if (a.length == 0) {
return r;
}
if (a.length % slices == 0) {
return chunks(a, a.length / slices).array;
}
int n;
while (n < a.length) {
auto rest = a.length - n;
auto done = slices - r.length;
auto size = rest % done ? (rest / done + 1) : rest / done;
r ~= a[n .. n + size];
n += size;
}
return r;
}
unittest {
for(int n=1; n<100; n++) {
for (int slices = 1; slices < n; slices++) {
auto r = splitn(iota(n).array, slices);
assert(r.length == slices);
assert(equal(iota(n), r.join));
}
}
}

import std.range;
// Map array on M threads and N fibers
// Non lazy. Return void if f is void.
// :
// /|\
// / | \
// ->/ | \<- M threads
// / | \
// N N N
// /|\ /|\ /|\
// ||| ||| |||
// ||| |||->|||<- N fibers
// fff fff fff
// ... ... ...
// ... ... ... <- r splitted over MxN fibers
// ... ... ..
//
auto mapMxN(F, R)(R r, F f, ulong m, ulong n) {
enum Void = is(ReturnType!F == void);

assert(m > 0 && n > 0);
assert(r.length > 0);
assert(m > 0 && n > 0 && r.length > 0);

// adjust M and N to length of r
m = min(m, r.length);

Threaded!(void delegate(R), R)[] threads;

foreach(c; chunks(r, chunkLen(r.length, m))) {
// get l/m chunks of input and start thread with this chunk as arg
auto t = threaded((R thread_chunk){
Task!(void delegate())[] fibers;
// split chunk on n parts and start fibers.
// each fiber apply f to each element
// finally it looks like
// M
// /|\
// / | \
// / | \
// / | \
// N N N
// /|\ /|\ /|\
// ||| ||| |||
// ||| ||| |||
//
foreach (fiber_chunk; chunks(thread_chunk, chunkLen(thread_chunk.length,n))) {
auto f = task({
foreach(e; fiber_chunk) {
f(e);
}
});
fibers ~= f;
f.start;
}
foreach (f; fibers) {
f.wait();
}
}, c).start;
threads ~= t;
auto fiberWorker(R fiber_chunk) {
static if (!Void) {
return fiber_chunk.map!(f).array;
} else {
fiber_chunk.each!f;
}
}
foreach (t; threads) {
t.wait();

auto threadWorker(R thread_chunk) {
auto fibers = thread_chunk.splitn(n). // split on N chunks
map!(fiber_chunk => task(&fiberWorker, fiber_chunk)). // start fiber over each chunk
array;
fibers.each!"a.start";
fibers.each!"a.wait";
static if (!Void) {
return fibers.map!"a.value".array.join;
}
}
auto threads = r.splitn(m). // split on M chunks
map!(thread_chunk => threaded(&threadWorker, thread_chunk)). // start thread over each chunk
array;
threads.each!"a.start";
threads.each!"a.wait";
static if (!Void) {
return threads.map!"a.value".array.join;
}
}

// map array on M threads
// Non lazy. Return void if f is void.
// :
// /|\
// / | \
// ->/ | \<- M threads
// / | \
// f f f
// . . .
// . . . <- r splitted over M threads
// . .
//
auto mapM(R, F)(R r, F f, ulong m) if (isArray!R) {
enum Void = is(ReturnType!F == void);

assert(m > 0 && r.length > 0);

m = min(m, r.length);

static if (Void) {
void threadWorker(R chunk) {
chunk.each!f;
}
} else {
auto threadWorker(R chunk) {
return chunk.map!f.array;
}
}

auto threads = r.splitn(m).map!(thread_chunk => threaded(&threadWorker, thread_chunk).start).array;

threads.each!"a.wait";

static if (!Void) {
return threads.map!"a.value".array.join;
}
}

unittest {
import std.range;
import std.stdio;
import core.atomic;

shared int cnt;

void f0(int arg) {
atomicOp!"+="(cnt,arg);
}

int f1(int i) {
return i * i;
}

int[] f2(int i) {
return [i,i+1];
}

App({
// woid function, updates shared counter
iota(20).array.mapMxN(&f0, 2, 3);
assert(cnt == 190);
});

cnt = 0;
App({
// woid function, updates shared counter
iota(20).array.mapM(&f0, 5);
assert(cnt == 190);
});

App({
auto r = iota(20).array.mapMxN(&f1, 2, 3);
assert(equal(r, iota(20).map!"a*a"));
});

App({
auto r = iota(20).array.mapM(&f1, 5);
assert(equal(r, iota(20).map!"a*a"));
});

App({
auto r = iota(20).array.mapM(&f2, 5);
assert(equal(r, iota(20).map!"[a, a+1]"));
});
}
2 changes: 1 addition & 1 deletion source/hio/socket/package.d
Original file line number Diff line number Diff line change
Expand Up @@ -1267,7 +1267,7 @@ auto byLineSplitter(R)(R r) {
}

unittest {
import std.stdio, std.range;
import std.range;
for(int s=1;s<7;s++) {
auto result = "a\nbb\n\n\nc\n\n".chunks(s)
.array.map!"to!string(a).representation".byLineSplitter;
Expand Down

0 comments on commit b65b609

Please sign in to comment.