
Commit 59f7f16

Merge pull request #1163 from tcely/patch-2
Write `TaskHistory` rows
2 parents 83e43a9 + 1bfcc07 · commit 59f7f16

File tree

tubesync/common/huey.py
tubesync/sync/signals.py
tubesync/sync/tasks.py

3 files changed: +111 -4 lines
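
All three files write rows into a `TaskHistory` model imported from `common.models`. The model itself is not part of this diff; the sketch below is inferred only from the fields the handlers assign (task_id, name, queue, priority, task_params, attempts, start_at, end_at, failed_at, last_error, verbose_name, repeat, repeat_until). Field types, lengths, and defaults are assumptions, not the actual model definition.

# Hypothetical sketch of the TaskHistory model, inferred from the fields
# assigned in this diff. Types, lengths, and defaults are assumptions.
from django.db import models

class TaskHistory(models.Model):
    task_id = models.CharField(max_length=100, db_index=True)
    name = models.CharField(max_length=255)
    queue = models.CharField(max_length=100)
    verbose_name = models.TextField(blank=True, default='')
    task_params = models.JSONField(blank=True, null=True)
    priority = models.IntegerField(default=0)
    attempts = models.PositiveIntegerField(default=0)
    start_at = models.DateTimeField(blank=True, null=True)
    end_at = models.DateTimeField(blank=True, null=True)
    failed_at = models.DateTimeField(blank=True, null=True)
    last_error = models.TextField(blank=True, default='')
    repeat = models.BigIntegerField(default=0)
    repeat_until = models.DateTimeField(blank=True, null=True)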

tubesync/common/huey.py

Lines changed: 58 additions & 2 deletions
@@ -1,3 +1,4 @@
+import datetime
 import os
 from functools import wraps
 from huey import (
@@ -198,9 +199,13 @@ def on_interrupted(signal_name, task_obj, exception_obj=None, /, *, huey=None):
     assert hasattr(huey, 'enqueue') and callable(huey.enqueue)
     huey.enqueue(task_obj)
 
+storage_key_prefix = 'task_history:'
+
 def historical_task(signal_name, task_obj, exception_obj=None, /, *, huey=None):
     signal_time = utils.time_clock()
+    signal_dt = datetime.datetime.now(datetime.timezone.utc)
 
+    from common.models import TaskHistory
     add_to_elapsed_signals = frozenset((
         signals.SIGNAL_INTERRUPTED,
         signals.SIGNAL_ERROR,
@@ -214,7 +219,7 @@ def historical_task(signal_name, task_obj, exception_obj=None, /, *, huey=None):
         signals.SIGNAL_EXECUTING,
         signals.SIGNAL_RETRYING,
     )) | add_to_elapsed_signals
-    storage_key = f'task_history:{task_obj.id}'
+    storage_key = f'{storage_key_prefix}{task_obj.id}'
     task_obj_attr = '_signals_history'
 
     history = getattr(task_obj, task_obj_attr, None)
@@ -224,13 +229,15 @@ def historical_task(signal_name, task_obj, exception_obj=None, /, *, huey=None):
             key=storage_key,
             peek=True,
         ) or dict(
+            created=signal_dt,
            data=task_obj.data,
            elapsed=0,
            module=task_obj.__module__,
            name=task_obj.name,
        )
        setattr(task_obj, task_obj_attr, history)
    assert history is not None
+    history['modified'] = signal_dt
 
    if signal_name in recorded_signals:
        history[signal_name] = signal_time
@@ -240,12 +247,61 @@ def historical_task(signal_name, task_obj, exception_obj=None, /, *, huey=None):
         huey.get(key=storage_key)
     else:
         huey.put(key=storage_key, data=history)
+        th, created = TaskHistory.objects.get_or_create(
+            task_id=str(task_obj.id),
+            name=f"{task_obj.__module__}.{task_obj.name}",
+            queue=huey.name,
+        )
+        th.priority = task_obj.priority
+        th.task_params = list((
+            list(task_obj.args),
+            repr(task_obj.kwargs),
+        ))
+        if signal_name == signals.SIGNAL_EXECUTING:
+            th.attempts += 1
+            th.start_at = signal_dt
+        elif exception_obj is not None:
+            th.failed_at = signal_dt
+            th.last_error = str(exception_obj)
+        elif signal_name == signals.SIGNAL_ENQUEUED:
+            from sync.models import Media, Source
+            if not th.verbose_name and task_obj.args:
+                key = task_obj.args[0]
+                for model in (Media, Source,):
+                    try:
+                        model_instance = model.objects.get(pk=key)
+                    except model.DoesNotExist:
+                        pass
+                    else:
+                        if hasattr(model_instance, 'key'):
+                            th.verbose_name = f'{th.name} with: {model_instance.key}'
+                        if hasattr(model_instance, 'name'):
+                            th.verbose_name += f' / {model_instance.name}'
+        th.end_at = signal_dt
+        th.save()
 
 # Registration of shared signal handlers
 
 def register_huey_signals():
-    from django_huey import DJANGO_HUEY, signal
+    from django_huey import DJANGO_HUEY, get_queue, signal
     for qn in DJANGO_HUEY.get('queues', dict()):
         signal(signals.SIGNAL_INTERRUPTED, queue=qn)(on_interrupted)
         signal(queue=qn)(historical_task)
 
+        # clean up old history and results from storage
+        q = get_queue(qn)
+        now_time = utils.time_clock()
+        for key in q.all_results().keys():
+            if not key.startswith(storage_key_prefix):
+                continue
+            history = q.get(peek=True, key=key)
+            if not isinstance(history, dict):
+                continue
+            age = datetime.timedelta(
+                seconds=(now_time - history.get(signals.SIGNAL_EXECUTING, now_time)),
+            )
+            if age > datetime.timedelta(days=7):
+                result_key = key[len(storage_key_prefix) :]
+                q.get(peek=False, key=result_key)
+                q.get(peek=False, key=key)
+
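
With this handler registered for every signal on each queue, a Huey task accumulates one `TaskHistory` row keyed by its task id: `SIGNAL_EXECUTING` stamps `start_at` and bumps `attempts`, an exception stamps `failed_at` and `last_error`, and every handled signal refreshes `end_at`. A minimal way to inspect the result from a Django shell (a usage sketch; only the field names come from the diff above):

# Usage sketch: inspect TaskHistory rows written by the Huey signal handler.
# Run inside `python manage.py shell`.
from common.models import TaskHistory

for th in TaskHistory.objects.order_by('-end_at')[:10]:
    print(th.queue, th.name, th.attempts, th.start_at, th.end_at, th.last_error)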

tubesync/sync/signals.py

Lines changed: 51 additions & 2 deletions
@@ -6,10 +6,12 @@
 from django.db.models.signals import pre_save, post_save, pre_delete, post_delete
 from django.db.transaction import atomic, on_commit
 from django.dispatch import receiver
+from django.utils import timezone
 from django.utils.translation import gettext_lazy as _
-from background_task.signals import task_failed
+from background_task.signals import task_started, task_successful, task_failed
 from background_task.models import Task
 from common.logger import log
+from common.models import TaskHistory
 from common.utils import glob_quote, mkdir_p
 from .models import Source, Media, Metadata
 from .tasks import (
@@ -166,8 +168,55 @@ def source_post_delete(sender, instance, **kwargs):
     delete_task_by_source('sync.tasks.save_all_media_for_source', instance.pk)
 
 
-@receiver(task_failed, sender=Task)
+@receiver(task_started, dispatch_uid='sync.signals.task_task_started')
+@atomic(durable=False)
+def task_task_started(sender, **kwargs):
+    locked_tasks = Task.objects.locked(timezone.now())
+    for task_obj in locked_tasks:
+        th, created = TaskHistory.objects.get_or_create(
+            task_id=str(task_obj.pk),
+            name=task_obj.task_name,
+            queue=task_obj.queue,
+        )
+        th.attempts += 1
+        th.end_at = task_obj.locked_at
+        th.priority = (100 - task_obj.priority)
+        th.repeat = task_obj.repeat
+        th.repeat_until = task_obj.repeat_until
+        th.start_at = task_obj.locked_at
+        th.task_params = list(task_obj.params())
+        th.verbose_name = task_obj.verbose_name
+        th.save()
+        if created:
+            log.debug(f'Created a new task history record: {th.pk}: {th.verbose_name}')
+
+
+def merge_completed_task_into_history(task_id, task_obj):
+    th, created = TaskHistory.objects.get_or_create(
+        task_id=str(task_id),
+        name=task_obj.task_name,
+        queue=task_obj.queue,
+    )
+    th.end_at = task_obj.run_at
+    th.failed_at = task_obj.failed_at
+    th.last_error = task_obj.last_error
+    th.repeat = task_obj.repeat
+    th.repeat_until = task_obj.repeat_until
+    th.start_at = task_obj.locked_at
+    th.verbose_name = task_obj.verbose_name
+    th.save()
+
+
+@receiver(task_successful, dispatch_uid='sync.signals.task_task_successful')
+@atomic(durable=False)
+def task_task_successful(sender, task_id, completed_task, **kwargs):
+    merge_completed_task_into_history(task_id, completed_task)
+
+
+@receiver(task_failed, dispatch_uid='sync.signals.task_task_failed')
+@atomic(durable=False)
 def task_task_failed(sender, task_id, completed_task, **kwargs):
+    merge_completed_task_into_history(task_id, completed_task)
     # Triggered after a task fails by reaching its max retry attempts
     obj, url = map_task_to_instance(completed_task)
     if isinstance(obj, Source):
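
On the django-background-tasks side, `task_started` does not pass the task instance, so the handler records every currently locked `Task` row instead; `task_successful` and `task_failed` both funnel through `merge_completed_task_into_history`, so a completed task ends up with `end_at` set and a failed one additionally with `failed_at` and `last_error`. A query sketch for pulling out the failures recorded this way (standard Django ORM, using only the fields assigned above):

# Usage sketch: list background_task failures recorded into TaskHistory.
from common.models import TaskHistory

for th in TaskHistory.objects.filter(failed_at__isnull=False).order_by('-failed_at'):
    print(th.task_id, th.verbose_name or th.name, th.failed_at, th.last_error)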

tubesync/sync/tasks.py

Lines changed: 2 additions & 0 deletions
@@ -27,6 +27,7 @@
 from huey import crontab as huey_crontab, signals as huey_signals
 from common.huey import CancelExecution, dynamic_retry, register_huey_signals
 from common.logger import log
+from common.models import TaskHistory
 from common.errors import ( BgTaskWorkerError, DownloadFailedException,
                             NoFormatException, NoMediaException,
                             NoThumbnailException, )
@@ -199,6 +200,7 @@ def cleanup_completed_tasks():
     log.info(f'Deleting completed tasks older than {days_to_keep} days '
              f'(run_at before {delta})')
     CompletedTask.objects.filter(run_at__lt=delta).delete()
+    TaskHistory.objects.filter(end_at__lt=delta).delete()
 
 
 @atomic(durable=False)
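
`cleanup_completed_tasks()` now prunes `TaskHistory` rows alongside `CompletedTask` rows using the same cutoff. A minimal sketch of the surrounding flow, assuming `days_to_keep` and `delta` are computed as the log message implies (the lines outside the hunk are assumptions, not the actual function body):

# Sketch of the pruning flow; the days_to_keep value and the delta computation
# are assumptions based on the log message in the hunk above.
from datetime import timedelta
from django.utils import timezone

days_to_keep = 30  # assumed value; TubeSync reads this from its settings
delta = timezone.now() - timedelta(days=days_to_keep)
CompletedTask.objects.filter(run_at__lt=delta).delete()
TaskHistory.objects.filter(end_at__lt=delta).delete()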
