
Tasks can be run several times under certain conditions #2644

Closed
GoodDok opened this issue Feb 1, 2019 · 21 comments

@GoodDok (Contributor) commented Feb 1, 2019

Hi,
I've run into a problem with tasks being run repeatedly in my pipeline on Luigi 2.8.0.
Basically, the pipeline consists of several tasks:

  • A(input_dt) — the lowest-level Luigi task; its result is stored in a DB, so the existence check on its target takes some time;
  • B(input_dt) — has A(input_dt) as its dependency;
  • C(input_dt) — triggered by cron every 10 minutes; depends on B(input_dt) and B(input_dt - 10 min). A sketch of this structure is below.
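
A minimal sketch of that structure (just an illustration: SlowTarget, DateMinuteParameter and the file-based outputs are stand-ins I picked here; the real A and B store their results in a DB):

import datetime
from time import sleep

import luigi


class SlowTarget(luigi.LocalTarget):
    # stand-in for the DB-backed target: the existence check takes a while
    def exists(self):
        sleep(2)
        return super().exists()


class A(luigi.Task):
    input_dt = luigi.DateMinuteParameter()

    def output(self):
        return SlowTarget("A_{:%Y%m%d%H%M}".format(self.input_dt))

    def run(self):
        with self.output().open("w") as f:
            f.write("done")


class B(luigi.Task):
    input_dt = luigi.DateMinuteParameter()

    def requires(self):
        return A(input_dt=self.input_dt)

    def output(self):
        return SlowTarget("B_{:%Y%m%d%H%M}".format(self.input_dt))

    def run(self):
        with self.output().open("w") as f:
            f.write("done")


class C(luigi.Task):
    # triggered by cron every 10 minutes, so two consecutive C runs
    # both depend on (and check) the same B instance
    input_dt = luigi.DateMinuteParameter()

    def requires(self):
        return [B(input_dt=self.input_dt),
                B(input_dt=self.input_dt - datetime.timedelta(minutes=10))]

    def run(self):
        pass  # the real C does its own work here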

As you can see, each B task gets checked by two different C runs, and sometimes this leads to duplicate runs.
It took me a long time to reproduce the bug, but I finally wrote a test that reproduces it locally:

import logging
import shutil
import unittest
from multiprocessing import Process
from time import sleep

import luigi
import luigi.server
from luigi.task import logger

existence_check_delay = 2
parent_sleep_time = 5
child_sleep_time = 4

logging.basicConfig(format='[%(asctime)s: %(name)-5s %(levelname)-5s %(filename)s:%(lineno)s] '
                           '[%(processName)s:%(process)d %(threadName)s] %(message)s',
                    filename="test_result.log")


class SleepyLocalTarget(luigi.LocalTarget):

    def exists(self):
        sleep(existence_check_delay)
        return super().exists()

    def touch(self):
        self.open("w").close()


class Parent(luigi.Task):
    output_dir = luigi.Parameter(default="")

    def requires(self):
        return Child(output_dir=self.output_dir)

    def output(self):
        return SleepyLocalTarget("{}/Parent".format(self.output_dir))

    def run(self):
        logger.info("============= PARENT IS RUNNING =============")
        sleep(parent_sleep_time)
        self.output().touch()


class Child(luigi.Task):
    output_dir = luigi.Parameter(default="")

    def output(self):
        return SleepyLocalTarget("{}/Child".format(self.output_dir))

    def run(self):
        logger.info("============= CHILD IS RUNNING =============")
        sleep(child_sleep_time)
        self.output().touch()


class ConcurrencyTest(unittest.TestCase):

    @staticmethod
    def test_async_run():
        temp_dir = "_temp_concurrent"
        server_process = Process(target=luigi.server.run)
        process1 = Process(target=luigi.build,
                           kwargs={"tasks": [Parent(output_dir=temp_dir)]})
        process2 = Process(target=luigi.build,
                           kwargs={"tasks": [Parent(output_dir=temp_dir)]})

        server_process.start()
        process1.start()
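        # start the second build right as the first one is finishing Parent, so the
        # second worker's completeness check of Parent can begin before the output
        # exists, while its PENDING report reaches the scheduler only after the DONE one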
        sleep(2 * existence_check_delay + child_sleep_time + parent_sleep_time)
        process2.start()
        process1.join()
        process2.join()
        server_process.terminate()

        shutil.rmtree(temp_dir)

Here's the test result log: test_result.log; logger.info("PARENT IS RUNNING") was executed twice.
AFAIK, the main difference between luigi.build in the test and the real execution is the no_lock flag, but that safeguard does not help in the real pipeline, since the C tasks started from cron have different parameters.

I suspect the main problem is that while process2 checks Child's state, Parent's state can be changed from PENDING to DONE by process1, so a message reaching the scheduler with PENDING status after the DONE status triggers the Parent task again.

Can you please take a look? Thanks in advance!

@GoodDok (Author) commented Feb 5, 2019

Here is a picture of what I think happens in this case:
[image: luigi diagram]

@Tarrasch (Contributor) commented Feb 6, 2019

That's strange; I don't think Worker B waits to send the status of the Parent task the way the picture shows. It should run complete() and then immediately report the Parent task as "PENDING", which the scheduler should simply ignore, since it knows the Parent task is RUNNING.

I haven't used luigi for almost 2 years now so I might be a bit rusty (read plain wrong). :)

@GoodDok (Author) commented Feb 7, 2019

@Tarrasch, I figured out how to resolve my issue without any code changes: using the parallel_scheduling and parallel_scheduling_processes parameters. They let complete() run asynchronously, so the worker sends the task status almost immediately.
I believe this issue can be closed, as well as the PR, although I'm a bit surprised that parallel_scheduling is not enabled by default.
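
Concretely, something like this (a minimal sketch; SlowCheckTask is a stand-in, and a running central scheduler, luigid, is assumed):

import luigi


class SlowCheckTask(luigi.Task):
    # stand-in for a task whose complete() talks to a remote DB
    def complete(self):
        return False

    def run(self):
        pass


if __name__ == "__main__":
    # with parallel scheduling the worker runs completeness checks in a
    # process pool, so task statuses reach the scheduler much sooner
    luigi.build(
        [SlowCheckTask()],
        parallel_scheduling=True,
        parallel_scheduling_processes=2,
    )
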
And thank you for the support! 👍

@dlstadther (Collaborator) commented

@GoodDok is there any documentation which should be updated to make this more clear?

@GoodDok (Author) commented Feb 7, 2019

@Tarrasch @dlstadther please don't close the issue for a while; it seems the problem reproduced even with the parameters discovered above, but I want to make sure.

@Tarrasch (Contributor) commented Feb 7, 2019

Take your time and let us know of any discoveries.

@GoodDok (Author) commented Feb 8, 2019

@Tarrasch, I've created yet another unit test (I tried to make it simpler) that involves only one task (AnyTask).
Its output is a file in a temporary directory; the task fails on a second run() because the file is opened in 'x' mode.
The complete() method emulates my situation: connect to some remote source (a DB), get the result, then close the connection.
The test shows the situation where Worker A sends DONE and after that Worker B sends PENDING.
UPDATE: removed the local_scheduler parameter; the scheduler is now started and terminated as part of the test.

from __future__ import print_function

import os
import shutil
import tempfile
import time
import unittest
from multiprocessing import Process
from os import path

import luigi.notifications
import luigi.task_register
import luigi.worker
import luigi.server


class AnyTask(luigi.Task):
    time_to_check = 1
    time_to_run = 2
    output_dir = luigi.Parameter(default="")

    def complete(self):
        time.sleep(self.time_to_check)  # e.g., start connection
        exists = os.path.exists(path.join(self.output_dir, "AnyTask"))
        time.sleep(self.time_to_check)  # e.g., finish connection
        return exists

    def run(self):
        time.sleep(self.time_to_run)
        # 'x' mode does not allow opening file if it already exists
        open(path.join(self.output_dir, "AnyTask"), 'x').close()


class ConcurrentTest(unittest.TestCase):

    def setUp(self):
        self.p = tempfile.mkdtemp()
        self.server_process = Process(target=luigi.server.run)
        self.server_process.start()

    def tearDown(self):
        shutil.rmtree(self.p)
        self.server_process.terminate()

    def test_sending_same_task_twice_does_not_lead_to_double_run(self):
        process = Process(target=luigi.build,
                          kwargs={"tasks": [AnyTask(output_dir=self.p)],
                                  "parallel_scheduling": True,
                                  "parallel_scheduling_processes": 2})
        process.start()
        # the exact sleep time depends on the number of cores, etc.
        # when the bug triggers, the second build checks whether AnyTask is complete
        # right before the first build finishes it, so PENDING is sent after DONE
        time.sleep(AnyTask.time_to_run + AnyTask.time_to_check)
        # AnyTask cannot be run twice, so we will have an error if luigi tries to run it again
        self.assertTrue(luigi.build([AnyTask(output_dir=self.p)],
                                    parallel_scheduling=True,
                                    parallel_scheduling_processes=2))
        process.join()


if __name__ == '__main__':
    unittest.main()

Note: I added the parallel_scheduling parameters only to show that they change nothing here: they only help with long-running completeness checks of child tasks.
I also wanted to add that my PR changes don't help in this case (the PR should be declined).

I've updated the diagram below according to the situation:
[image: luigi diagram 2]

@kwilcox (Contributor) commented Feb 10, 2019

I've never seen this in many years of using Luigi! Before I give this some testing: does your final test still fail if you use the same scheduler for each luigi.build?

@GoodDok (Author) commented Feb 10, 2019

Hi @kwilcox, yes, most of the time I was testing it with luigid started in a separate process without local_scheduler=True.

@Tarrasch (Contributor) commented Feb 10, 2019

@GoodDok, again huge thanks for taking the time to deep dive into this.

As for your test case, whatever you see when using "local_scheduler": True should not be considered real behaviour. It's not using the same local scheduler; I think each build gets its own, completely in-memory scheduler, so there won't be any inter-worker communication. Does that make sense? -- But there are tests that spin up a real scheduler (you can still use an in-memory one, I think); could you try to reproduce it with that?

@Tarrasch (Contributor) commented

Also, how often does this rerun for you in prod? In most cases, if it doesn't happen too often, I wouldn't be worried (your tasks should be able to run twice without bad side effects), but it of course depends on how much money a task run costs you (and how much money you spend debugging this).

@GoodDok (Author) commented Feb 10, 2019

@Tarrasch, most of the time I was testing it with luigid started in a separate process; I've updated the test so local_scheduler won't cause confusion anymore. Some code may look strange since I'm not a Python programmer :)
As for rerunning, the problem is that in the current implementation some of the tasks I'm running are not idempotent, so running them twice may e.g. duplicate data.

@Tarrasch (Contributor) commented

Alright. I see what happens now, and yes, your diagram looks correct to me. It's basically this series of state transitions: PENDING -> RUNNING -> DONE -> PENDING -> RUNNING -> DONE, as your diagram nicely shows.

I have to admit that this is a fully valid bug you've found and that the current model of the luigi scheduler isn't able to handle it gracefully. Normally, when a task is RUNNING the scheduler will ignore other workers' reports for it, and once it's DONE it will not be reported as PENDING by others. However, yes, there's this timing issue, as you've discovered.

Given that our general recommendation to luigi users is to "rerun until it works", it would make sense to have a period of "PENDING-distrust" directly after task completion (or actually after any DONE-marking; that would work too). Either the distrust time could be configurable per task or, probably better, dynamic, based on how long the completion check took for the task that reported it as DONE (doubling that to get the distrust time).
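
Roughly the rule I have in mind, as a toy model (this is only an illustration of the bookkeeping, not Luigi's actual scheduler code):

import time

PENDING, RUNNING, DONE = "PENDING", "RUNNING", "DONE"


class ToySchedulerState(object):
    """Toy illustration of the 'PENDING-distrust' idea."""

    def __init__(self, distrust_secs=10):
        self.distrust_secs = distrust_secs
        self.status = {}   # task_id -> last accepted status
        self.done_at = {}  # task_id -> time the task was marked DONE

    def report(self, task_id, new_status):
        # a worker reports a status for task_id
        if (new_status == PENDING
                and self.status.get(task_id) == DONE
                and time.time() - self.done_at.get(task_id, 0) < self.distrust_secs):
            # distrust a PENDING report arriving shortly after a DONE:
            # it most likely comes from a worker whose completeness check
            # started before the task actually finished
            return
        self.status[task_id] = new_status
        if new_status == DONE:
            self.done_at[task_id] = time.time()

The distrust window could be a fixed value or derived from the reported completion-check duration, as described above.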


What do you think?

@GoodDok (Author) commented Feb 12, 2019

@Tarrasch, I believe it's a problem that should be considered thoroughly.
From my point of view, the "PENDING-distrust" approach seems fine, and it should be discussed what to do during this period: should the late tasks "fail and retry afterwards", or something else? As for dynamic configuration of the delay, it may not be enough in cases where the "check time" is not stable (e.g. establishing a connection to the DB may take varying amounts of time).
One more option that comes to mind is to run the complete() check right before running the task, like it's done for external tasks (maybe as a configurable option):

luigi/luigi/worker.py

Lines 187 to 192 in ff2be7e

if _is_external(self.task):
    # External task
    # TODO(erikbern): We should check for task completeness after non-external tasks too!
    # This will resolve #814 and make things a lot more consistent
    if self.task.complete():
        status = DONE

But I'm not so familiar with the codebase and don't know what consequences such a change might have.
Besides that, maybe there are other options to consider; at the end of the day, the collaborators are the ones who make the final decision. A user-level sketch of the same idea is below.
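
To make it concrete, here is a user-level sketch of that check (RecheckBeforeRun is a hypothetical base class I made up for illustration, not a patch to the worker):

import luigi


class RecheckBeforeRun(luigi.Task):
    """Hypothetical base class: skip the work if the task became complete
    while it was sitting in the scheduler's queue."""

    def run(self):
        if self.complete():
            # another worker has already produced the output
            return
        self.do_work()

    def do_work(self):
        # subclasses put their real run() body here
        raise NotImplementedError

Of course this pays for one more complete() call per run, which is exactly the concern with slow checks.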

@Tarrasch (Contributor) commented

It won't be "fail and retry afterwards"; it's just that the scheduler will not give the task to the worker to run. However, if there are common parent tasks between the worker that already ran the task and the ignored worker, they will continue to "cooperate".


I'm not sure we should do a complete() check before a task is run. It has one downside that I see: if complete() is slow, you don't want to run it twice, and if it's fast, you won't have this problem to begin with anyway.

But yes, I agree we should discuss this; I'm just not sure how many luigi users care about it, given how long it took us to find this scheduling bug.

@GoodDok (Author) commented Feb 18, 2019

Well, sounds like a good way to solve the problem.

I agree, the double complete() check has this downside; I suggested it only because it looks like a simple and fast solution, while touching the scheduler's logic sounds a bit scary to me (although maybe it's not scary at all).

I don't know, maybe it's not that important, but we might be looking at survivorship bias: the bug is pretty hard to reproduce, so someone may have moved away from Luigi without being able to reproduce or fix the problem.
As I mentioned before, I suspect these guys faced the same problem: https://groups.google.com/forum/#!topic/luigi-user/YCEQIEz0jLo

@Tarrasch (Contributor) commented

You're certainly right about the survivorship bias. It's also likely those guys had the same issue.

I'll be here to help reviewing if you decide to take a shot at it.


I thought about this again. What about just having a configurable distrust time only on the server? No need to force the client to send information about how long complete() takes to run. I'm pretty sure nobody will be hurt by having it default to, say, 10s. What do you think?

@GoodDok (Author) commented Mar 6, 2019

@Tarrasch, I like your idea of a configurable distrust time on the server, and a 10s period seems reasonable to me. Maybe I'll take a shot at applying this approach to resolve the issue.


stale bot commented Jul 4, 2019

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If closed, you may revisit when your time allows and reopen! Thank you for your contributions.

stale bot added the wontfix label Jul 4, 2019
stale bot closed this as completed Jul 18, 2019
@GoodDok (Author) commented Jul 18, 2019

Waiting for the PR review: #2645

P.S.: @Tarrasch @dlstadther @honnix, could you please help with reopening this one? I see no option to do it myself.

@zizkebab commented

stable_done_cooldown_secs does not appear in the configuration docs, and it changes previous behavior...
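
For reference, it appears to be a scheduler option, so something like this in luigi.cfg (the section name and the 10s default are my assumption based on the discussion above; check the scheduler config of your Luigi version):

[scheduler]
stable_done_cooldown_secs = 10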
