Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature limit duplicates #214

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ This is how you do it
kwargs={'foo': 'bar'}, # Keyword arguments passed into function when executed
interval=60, # Time before the function is called again, in seconds
repeat=10, # Repeat this number of times (None means repeat forever)
max_in_queue=3, # Maximum times the job appears in the queue at a given moment
meta={'foo': 'bar'} # Arbitrary pickleable data on the job itself
)

Expand Down
17 changes: 16 additions & 1 deletion rq_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def enqueue_in(self, time_delta, func, *args, **kwargs):
def schedule(self, scheduled_time, func, args=None, kwargs=None,
interval=None, repeat=None, result_ttl=None, ttl=None,
timeout=None, id=None, description=None, queue_name=None,
meta=None):
meta=None, max_in_queue=None):
"""
Schedule a job to be periodically executed, at a certain interval.
"""
Expand All @@ -214,6 +214,8 @@ def schedule(self, scheduled_time, func, args=None, kwargs=None,
job.meta['repeat'] = int(repeat)
if repeat and interval is None:
raise ValueError("Can't repeat a job without interval argument")
if max_in_queue is not None:
job.meta['max_in_queue'] = int(max_in_queue)
job.save()
self.connection.zadd(self.scheduled_jobs_key,
{job.id: to_unix(scheduled_time)})
Expand Down Expand Up @@ -321,6 +323,8 @@ def epoch_to_datetime(epoch):
job_id = job_id.decode('utf-8')
try:
job = self.job_class.fetch(job_id, connection=self.connection)
if self.is_job_reach_max_in_queue(job):
continue
except NoSuchJobError:
# Delete jobs that aren't there from scheduler
self.cancel(job_id)
Expand Down Expand Up @@ -348,6 +352,17 @@ def get_queue_for_job(self, job):
return self.queue_class.from_queue_key(
key, connection=self.connection, job_class=self.job_class)

def is_job_reach_max_in_queue(self, job):
"""
Returns a boolean indicating whether the given job reach it max times
on it queue, by it max_in_queue field.
"""
max_jobs_in_queue = job.meta.get("max_in_queue", None)
if max_jobs_in_queue is None:
return False
relevant_queue = self.get_queue_for_job(job)
return relevant_queue.job_ids.count(job.id) >= max_jobs_in_queue

def enqueue_job(self, job):
"""
Move a scheduled job to a queue. In addition, it also does puts the job
Expand Down
16 changes: 16 additions & 0 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,22 @@ def test_job_with_interval_can_set_meta(self):
self.scheduler.enqueue_job(job)
self.assertEqual(job.meta, meta)

def test_job_not_queued_more_than_max_in_queue(self):
"""
Ensure that jobs queued not more max_in_queue times.
"""
time_now = datetime.utcnow()
interval = 1
max_in_queue = 1
scheduler_with_small_interval = Scheduler(connection=self.testconn, interval=1)
job = scheduler_with_small_interval.schedule(time_now, say_hello, interval=interval,
max_in_queue=max_in_queue, queue_name="test_queue")
time.sleep(3)

relevant_queue = scheduler_with_small_interval.get_queue_for_job(job)
num_of_appearances_in_queue = relevant_queue.job_ids.count(job.id)
self.assertLessEqual(num_of_appearances_in_queue, max_in_queue)

def test_job_with_crontab_get_rescheduled(self):
# Create a job with a cronjob_string
job = self.scheduler.cron("1 * * * *", say_hello)
Expand Down