diff --git a/luigi/scheduler.py b/luigi/scheduler.py
index 9f67b84c54..38222b18c6 100644
--- a/luigi/scheduler.py
+++ b/luigi/scheduler.py
@@ -154,6 +154,12 @@ class scheduler(Config):
 
     metrics_collector = parameter.EnumParameter(enum=MetricsCollectors, default=MetricsCollectors.default)
 
+    stable_done_cooldown_secs = parameter.IntParameter(default=10,
+                                                       description="Sets cooldown period to avoid running the same task twice")
+    """
+    Sets a cooldown period in seconds after a task was completed, during this period the same task will not accepted by the scheduler.
+    """
+
     def _get_retry_policy(self):
         return RetryPolicy(self.retry_count, self.disable_hard_timeout, self.disable_window)
 
@@ -836,6 +842,10 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
         if task is None or (task.status != RUNNING and not worker.enabled):
             return
 
+        # Ignore claims that the task is PENDING if it very recently was marked as DONE.
+        if status == PENDING and task.status == DONE and (time.time() - task.updated) < self._config.stable_done_cooldown_secs:
+            return
+
         # for setting priority, we'll sometimes create tasks with unset family and params
         if not task.family:
             task.family = family
diff --git a/test/scheduler_api_test.py b/test/scheduler_api_test.py
index 73d5d5b547..500dc0ff95 100644
--- a/test/scheduler_api_test.py
+++ b/test/scheduler_api_test.py
@@ -46,6 +46,7 @@ def get_scheduler_config(self):
             'disable_window': 10,
             'retry_count': 3,
             'disable_hard_timeout': 60 * 60,
+            'stable_done_cooldown_secs': 0
         }
 
     def tearDown(self):
diff --git a/test/scheduler_test.py b/test/scheduler_test.py
index 95cce5db71..918f1e46f0 100644
--- a/test/scheduler_test.py
+++ b/test/scheduler_test.py
@@ -19,10 +19,15 @@
 import pickle
 import tempfile
 import time
+import os
+import shutil
+from multiprocessing import Process
 from helpers import unittest
 
 import luigi.scheduler
+import luigi.server
 from helpers import with_config
+from luigi.target import FileAlreadyExists
 
 
 class SchedulerIoTest(unittest.TestCase):
@@ -281,3 +286,65 @@ def test_get_pending_tasks_with_many_done_tasks(self):
 
         non_trivial_worker = scheduler_state.get_worker('NON_TRIVIAL')
         self.assertEqual({'A'}, self.get_pending_ids(non_trivial_worker, scheduler_state))
+
+
+class FailingOnDoubleRunTask(luigi.Task):
+    time_to_check_secs = 1
+    time_to_run_secs = 2
+    output_dir = luigi.Parameter(default="")
+
+    def __init__(self, *args, **kwargs):
+        super(FailingOnDoubleRunTask, self).__init__(*args, **kwargs)
+        self.file_name = os.path.join(self.output_dir, "AnyTask")
+
+    def complete(self):
+        time.sleep(self.time_to_check_secs)  # e.g., establish connection
+        exists = os.path.exists(self.file_name)
+        time.sleep(self.time_to_check_secs)  # e.g., close connection
+        return exists
+
+    def run(self):
+        time.sleep(self.time_to_run_secs)
+        if os.path.exists(self.file_name):
+            raise FileAlreadyExists(self.file_name)
+        open(self.file_name, 'w').close()
+
+
+class StableDoneCooldownSecsTest(unittest.TestCase):
+
+    def setUp(self):
+        self.p = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self.p)
+
+    def run_task(self):
+        return luigi.build([FailingOnDoubleRunTask(output_dir=self.p)],
+                           parallel_scheduling=True,
+                           parallel_scheduling_processes=2)
+
+    @with_config({'worker': {'keep_alive': 'false'}})
+    def run_same_task_twice(self, failure_expected=False):
+        server_process = Process(target=luigi.server.run)
+        process = Process(target=self.run_task)
+        try:
+            # scheduler is started
+            server_process.start()
+            # first run is started
+            process.start()
+            time.sleep(FailingOnDoubleRunTask.time_to_run_secs + FailingOnDoubleRunTask.time_to_check_secs)
+            # second run of the same task is started
+            luigi_run_result = self.run_task()
+            self.assertEqual(luigi_run_result, not failure_expected)
+        finally:
+            process.join(1)
+            server_process.terminate()
+            server_process.join(1)
+
+    @with_config({'scheduler': {'stable_done_cooldown_secs': '5'}})
+    def test_sending_same_task_twice_with_cooldown_does_not_lead_to_double_run(self):
+        self.run_same_task_twice(failure_expected=False)
+
+    @with_config({'scheduler': {'stable_done_cooldown_secs': '0'}})
+    def test_sending_same_task_twice_without_cooldown_leads_to_double_run(self):
+        self.run_same_task_twice(failure_expected=True)
diff --git a/test/task_running_resources_test.py b/test/task_running_resources_test.py
index 53273d3f50..ece733fa1c 100644
--- a/test/task_running_resources_test.py
+++ b/test/task_running_resources_test.py
@@ -21,7 +21,7 @@
 import multiprocessing
 from contextlib import contextmanager
 
-from helpers import unittest, RunOnceTask
+from helpers import unittest, RunOnceTask, with_config
 
 import luigi
 import luigi.server
@@ -73,6 +73,7 @@ def test_resource_reduction(self):
 
 class ConcurrentRunningResourcesTest(unittest.TestCase):
 
+    @with_config({'scheduler': {'stable_done_cooldown_secs': '0'}})
     def setUp(self):
         super(ConcurrentRunningResourcesTest, self).setUp()
 
diff --git a/test/worker_test.py b/test/worker_test.py
index 67d45b6a1a..6dcc2269e9 100644
--- a/test/worker_test.py
+++ b/test/worker_test.py
@@ -123,7 +123,7 @@ def run(self):
 class WorkerTest(LuigiTestCase):
 
     def run(self, result=None):
-        self.sch = Scheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10)
+        self.sch = Scheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10, stable_done_cooldown_secs=0)
         self.time = time.time
         with Worker(scheduler=self.sch, worker_id='X') as w, Worker(scheduler=self.sch, worker_id='Y') as w2:
             self.w = w