How does one compose Streams with EventLoop Timers? #540

Open
mthiffau opened this issue Aug 11, 2022 · 4 comments
@mthiffau

I'd like to create a Stream which produces "ticks" at even time intervals. My current solution does this by blocking the thread that produces the ticks with sleep calls, but obviously it would be better if I could make use of a Timer.

The following doesn't work (I think because the bare 'Then' doesn't handle the Stream stuff?). Is there something similar I could do? Or would an entirely different approach be better?

class ClockActor : public StaticThreadPool::Schedulable {
  using clock = std::chrono::steady_clock;
  using milliseconds = std::chrono::milliseconds;
  using time_point = std::chrono::time_point<clock>;

public:
  ClockActor(Pinned pinned, milliseconds tick_period) :
    StaticThreadPool::Schedulable(pinned), tick_period_(tick_period) {}

  auto Ticks() {
    return Repeat()
      >> Until([this] () {
        return Schedule(Then([this] () {
          return count_ > 5; // This is just here while testing.
        }));
      })
      >> Schedule(Map([this] () mutable {
        time_point now = clock::now();
        if (last_tick_ == time_point()) {
          last_tick_ = now;
        }
        milliseconds time_since_last_tick =
          std::chrono::duration_cast<milliseconds>(now - last_tick_);
        milliseconds time_to_wait =
          std::chrono::duration_cast<milliseconds>(
            tick_period_ - (time_since_last_tick - tick_period_));
        printf("since last tick: %ld\n", time_since_last_tick.count());
        printf("time to wait: %ld\n", time_to_wait.count());
        return time_to_wait;
      }))
      >> Then([] (auto& wait_time) {
        return Timer(milliseconds(wait_time));
      })
      >> Schedule(Map([this] () mutable {
        last_tick_ = clock::now();
        return count_++;
      }));
  }

private:
  int count_ = 0;
  const milliseconds tick_period_;
  time_point last_tick_;
};
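
For comparison, here is a minimal sketch of the wait-time arithmetic on its own, using only the standard library. `TimeToWait` is a hypothetical helper, not part of eventuals, and it assumes the goal is "period minus time already spent since the last tick, never negative". Note that the expression in `Ticks()` above works out to twice the period minus the elapsed time, which may or may not be what is intended.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>

using milliseconds = std::chrono::milliseconds;

// Hypothetical helper (not an eventuals API): how long to wait before the
// next tick, given the tick period and the time already spent since the
// previous tick fired. Clamped at zero so an overrun never produces a
// negative wait.
inline milliseconds TimeToWait(
    milliseconds tick_period,
    milliseconds since_last_tick) {
  return std::max(milliseconds::zero(), tick_period - since_last_tick);
}
```

With a 100 ms period and 30 ms already spent, this gives a 70 ms wait; if the previous tick overran the period, the wait is zero and the next tick fires immediately.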
@benh
Member

benh commented Aug 12, 2022

> The following doesn't work (I think because the bare 'Then' doesn't handle the Stream stuff?). Is there something similar I could do? Or would an entirely different approach be better?

You're exactly right! Then cannot be composed with "streaming" combinators. You should be able to just swap Then for Map and you'll be almost good to go.

A few more things you'll need to do:

  • Add Loop at the end of this pipeline so that you'll loop back to the Repeat.
  • You'll need to make sure that you've created and started an EventLoop if you haven't already. If your code hits Timer without one, you should get a pretty loud warning. From your main() you can start one pretty easily by calling EventLoop::ConstructDefaultAndRunForeverDetached(); before doing anything else.

Also, we just landed a PR that should detect when you shouldn't be able to compose things. If you don't mind giving HEAD a try I'd love to see if the static_assert ends up being helpful for you!

@benh
Member

benh commented Aug 13, 2022

Following up here, I had forgotten that we had added support for running the event loop from the main thread when you're using the * operator on an eventual.

So if you write a main() like so:

int main() {
  EventLoop::ConstructDefault();
  return *SomeEventualFunctionThatReturnsInt();
}

Then you should be all set. However, if you do something like this:

int main() {
  EventLoop::ConstructDefault();
  auto task = Task::Of<int>([]() {
      return SomeEventualFunctionThatReturnsInt();
  });
  std::future<int> result = task.Start("my task name");
  result.wait(); // Your main thread will BLOCK here!
  return result.get();
}

Then your main thread will be waiting on the future and there won't be a thread to run the event loop. There is a variant here however that can help:

int main() {
  EventLoop::ConstructDefault();
  auto task = Task::Of<int>([]() {
      return SomeEventualFunctionThatReturnsInt();
  });
  std::future<int> result = task.Start("my task name");
  EventLoop::Default().RunUntil(result); // Thanks for donating your main thread to run the event loop!
  return result.get();
}

It all depends on what you need!
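
To make the difference concrete without depending on eventuals, here is a toy sketch in plain C++. `ToyLoop` and `RunWithDonatedMainThread` are hypothetical stand-ins, not the real `EventLoop`, and a plain flag takes the place of the `std::future`. It only illustrates why some thread has to drive the loop when the result is produced by work queued on that loop.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>

// Toy single-threaded loop, for illustration only (not the eventuals
// EventLoop).
class ToyLoop {
 public:
  // Queue a callback to run on whichever thread drives the loop.
  void Post(std::function<void()> callback) {
    callbacks_.push(std::move(callback));
  }

  // Run queued callbacks on the calling thread until `done()` returns true
  // or there is nothing left to run.
  void RunUntil(const std::function<bool()>& done) {
    while (!done() && !callbacks_.empty()) {
      std::function<void()> callback = std::move(callbacks_.front());
      callbacks_.pop();
      callback();
    }
  }

 private:
  std::queue<std::function<void()>> callbacks_;
};

inline int RunWithDonatedMainThread() {
  ToyLoop loop;
  bool ready = false;
  int value = 0;

  // Stands in for a timer firing: the result is only produced by work that
  // runs on the loop.
  loop.Post([&]() {
    value = 42;
    ready = true;
  });

  // Merely waiting on `ready` here would never finish, because no other
  // thread drives the loop. Donating the calling thread lets the callback
  // run.
  loop.RunUntil([&]() { return ready; });
  return value;
}
```

Calling `RunWithDonatedMainThread()` from `main()` returns 42 because the calling thread itself runs the queued callback, which is the same idea as handing the main thread to `RunUntil` above.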

@mthiffau
Author

Hi Ben,

Thanks for the replies, just getting back to this now. I was trying out what you said in your first message but couldn't get it to compile till just now. I missed that the argument to the lambda in a Map() needs to be an rvalue reference (&&), though that makes sense I think in retrospect.

I also don't know why it didn't occur to me that Map could return an eventual instead of a synchronously created value. Now that totally makes sense.

Similar to the PingPong example you pointed me to, the Loop() in my code is created by a second actor, and my initial tiny pipeline looks like:

clock.Ticks() >> physics.RunPhysicsUpdates()

I was thinking a cool toy example would be to simulate a few balls (circles) bouncing around in 2D space. Generating clock ticks can be a more platform-dependent thing, so I split it out. I might look into hooking up a third actor to do simple rendering or something. There are other possibilities, like figuring out how to take some kind of keyboard/mouse input to allow interaction with the system (e.g. fling a ball). Basically I just want to try doing something reasonably complicated.

If it doesn't exist already, something I might try as a challenge is to create some kind of mechanism for sending ticks from the same clock to two different downstream actors, e.g.:

actorA.Stream() >> ForkStream(actorB.Listen(), actorC.Listen());

I did figure out the event loop setup and RunUntil in my main, and I found WaitForSignal, so I'm using that to catch SIGINT and stop my event loop, which is super nice.

@benh
Member

benh commented Aug 15, 2022

@mthiffau ooh, I love a fun example like that. I once had one that drew a ball moving across a screen with ncurses.

We don't currently have a ForkStream, but you could totally build it! What would be the result? Do both of the eventuals also return streams that get zipped together? Or do the eventuals return a single value? For example:

actorA.Stream()
    >> Tee(actorB.Listen(), actorC.Listen())
    >> Then(Unpack([](auto&& actor_b_listen_result, auto&& actor_c_listen_result) {
           // ...
        }));

Or:

actorA.Stream()
    >> Tee(actorB.Listen(), actorC.Listen())
    >> Map([](auto&& something_from_either_actor_b_or_actor_c) {
           // ...
        });
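
As a rough, library-free sketch of the first option (both listeners see every tick and their results are paired up), something like the following could work. `FanOutZipped` is a hypothetical name, a `std::vector` stands in for the stream, and none of this is an eventuals API.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical fan-out over an in-memory "stream" (a std::vector here).
// Every tick is handed to both listeners, and their results are paired up
// per tick.
template <typename T, typename F, typename G>
auto FanOutZipped(
    const std::vector<T>& ticks,
    F listener_b,
    G listener_c) {
  using B = decltype(listener_b(std::declval<const T&>()));
  using C = decltype(listener_c(std::declval<const T&>()));

  std::vector<std::pair<B, C>> results;
  results.reserve(ticks.size());

  for (const T& tick : ticks) {
    // Call the listeners one after the other so the order is well defined.
    B from_b = listener_b(tick);
    C from_c = listener_c(tick);
    results.emplace_back(std::move(from_b), std::move(from_c));
  }

  return results;
}
```

For ticks `{0, 1, 2}` with one listener doubling the tick and the other adding 10, the second pair is `(2, 11)`. The second option would instead merge both listeners' outputs into a single stream of values, with no pairing.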
