Skip to content

Commit 885f319

Browse files
authored
feat: Add RQ engine (#315)
Signed-off-by: Michele Dolfi <[email protected]>
1 parent d584895 commit 885f319

File tree

10 files changed

+947
-1001
lines changed

10 files changed

+947
-1001
lines changed

.github/styles/config/vocabularies/Docling/accept.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ async
55
(?i)urls
66
uvicorn
77
[Ww]ebserver
8+
RQ
9+
(?i)url
810
keyfile
911
[Ww]ebsocket(s?)
1012
[Kk]ubernetes

docling_serve/__main__.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from rich.console import Console
1212

1313
from docling_serve.settings import docling_serve_settings, uvicorn_settings
14+
from docling_serve.storage import get_scratch
1415

1516
warnings.filterwarnings(action="ignore", category=UserWarning, module="pydantic|torch")
1617
warnings.filterwarnings(action="ignore", category=FutureWarning, module="easyocr")
@@ -361,6 +362,37 @@ def run(
361362
)
362363

363364

365+
@app.command()
def rq_worker() -> Any:
    """
    Run the [bold]Docling JobKit[/bold] RQ worker.
    """
    # NOTE: the docstring above doubles as the CLI help text for this command,
    # so it is kept verbatim.
    # The JobKit imports stay local to the command, as in the original.
    from docling_jobkit.convert.manager import DoclingConverterManagerConfig
    from docling_jobkit.orchestrators.rq.orchestrator import RQOrchestratorConfig
    from docling_jobkit.orchestrators.rq.worker import run_worker

    settings = docling_serve_settings

    # Converter manager options, copied field-for-field from the serve settings.
    converter_config = DoclingConverterManagerConfig(
        artifacts_path=settings.artifacts_path,
        options_cache_size=settings.options_cache_size,
        enable_remote_services=settings.enable_remote_services,
        allow_external_plugins=settings.allow_external_plugins,
        max_num_pages=settings.max_num_pages,
        max_file_size=settings.max_file_size,
    )

    # RQ connection/queue options; the scratch directory comes from get_scratch().
    orchestrator_config = RQOrchestratorConfig(
        redis_url=settings.eng_rq_redis_url,
        results_prefix=settings.eng_rq_results_prefix,
        sub_channel=settings.eng_rq_sub_channel,
        scratch_dir=get_scratch(),
    )

    # Hand both configurations to the JobKit worker entry point.
    run_worker(rq_config=orchestrator_config, cm_config=converter_config)
394+
395+
364396
def main() -> None:
365397
app()
366398

docling_serve/app.py

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -417,9 +417,17 @@ async def process_url(
417417
detail=f"Conversion is taking too long. The maximum wait time is configure as DOCLING_SERVE_MAX_SYNC_WAIT={docling_serve_settings.max_sync_wait}.",
418418
)
419419

420-
task = await orchestrator.get_raw_task(task_id=task.task_id)
420+
task_result = await orchestrator.task_result(task_id=task.task_id)
421+
if task_result is None:
422+
raise HTTPException(
423+
status_code=404,
424+
detail="Task result not found. Please wait for a completion status.",
425+
)
421426
response = await prepare_response(
422-
task=task, orchestrator=orchestrator, background_tasks=background_tasks
427+
task_id=task.task_id,
428+
task_result=task_result,
429+
orchestrator=orchestrator,
430+
background_tasks=background_tasks,
423431
)
424432
return response
425433

@@ -457,9 +465,17 @@ async def process_file(
457465
detail=f"Conversion is taking too long. The maximum wait time is configure as DOCLING_SERVE_MAX_SYNC_WAIT={docling_serve_settings.max_sync_wait}.",
458466
)
459467

460-
task = await orchestrator.get_raw_task(task_id=task.task_id)
468+
task_result = await orchestrator.task_result(task_id=task.task_id)
469+
if task_result is None:
470+
raise HTTPException(
471+
status_code=404,
472+
detail="Task result not found. Please wait for a completion status.",
473+
)
461474
response = await prepare_response(
462-
task=task, orchestrator=orchestrator, background_tasks=background_tasks
475+
task_id=task.task_id,
476+
task_result=task_result,
477+
orchestrator=orchestrator,
478+
background_tasks=background_tasks,
463479
)
464480
return response
465481

@@ -618,9 +634,17 @@ async def task_result(
618634
task_id: str,
619635
):
620636
try:
621-
task = await orchestrator.get_raw_task(task_id=task_id)
637+
task_result = await orchestrator.task_result(task_id=task_id)
638+
if task_result is None:
639+
raise HTTPException(
640+
status_code=404,
641+
detail="Task result not found. Please wait for a completion status.",
642+
)
622643
response = await prepare_response(
623-
task=task, orchestrator=orchestrator, background_tasks=background_tasks
644+
task_id=task_id,
645+
task_result=task_result,
646+
orchestrator=orchestrator,
647+
background_tasks=background_tasks,
624648
)
625649
return response
626650
except TaskNotFoundError:

docling_serve/datamodel/responses.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from docling.datamodel.document import ConversionStatus, ErrorItem
77
from docling.utils.profiling import ProfilingItem
8-
from docling_core.types.doc import DoclingDocument
8+
from docling_jobkit.datamodel.result import ExportDocumentResponse
99
from docling_jobkit.datamodel.task_meta import TaskProcessingMeta
1010

1111

@@ -18,26 +18,19 @@ class ClearResponse(BaseModel):
1818
status: str = "ok"
1919

2020

21-
class DocumentResponse(BaseModel):
22-
filename: str
23-
md_content: Optional[str] = None
24-
json_content: Optional[DoclingDocument] = None
25-
html_content: Optional[str] = None
26-
text_content: Optional[str] = None
27-
doctags_content: Optional[str] = None
28-
29-
3021
class ConvertDocumentResponse(BaseModel):
31-
document: DocumentResponse
22+
document: ExportDocumentResponse
3223
status: ConversionStatus
3324
errors: list[ErrorItem] = []
3425
processing_time: float
3526
timings: dict[str, ProfilingItem] = {}
3627

3728

3829
class PresignedUrlConvertDocumentResponse(BaseModel):
39-
status: ConversionStatus
4030
processing_time: float
31+
num_converted: int
32+
num_succeeded: int
33+
num_failed: int
4134

4235

4336
class ConvertDocumentErrorResponse(BaseModel):

docling_serve/orchestrator_factory.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from docling_jobkit.orchestrators.base_orchestrator import BaseOrchestrator
44

55
from docling_serve.settings import AsyncEngine, docling_serve_settings
6+
from docling_serve.storage import get_scratch
67

78

89
@lru_cache
@@ -20,6 +21,7 @@ def get_async_orchestrator() -> BaseOrchestrator:
2021
local_config = LocalOrchestratorConfig(
2122
num_workers=docling_serve_settings.eng_loc_num_workers,
2223
shared_models=docling_serve_settings.eng_loc_share_models,
24+
scratch_dir=get_scratch(),
2325
)
2426

2527
cm_config = DoclingConverterManagerConfig(
@@ -33,6 +35,20 @@ def get_async_orchestrator() -> BaseOrchestrator:
3335
cm = DoclingConverterManager(config=cm_config)
3436

3537
return LocalOrchestrator(config=local_config, converter_manager=cm)
38+
elif docling_serve_settings.eng_kind == AsyncEngine.RQ:
39+
from docling_jobkit.orchestrators.rq.orchestrator import (
40+
RQOrchestrator,
41+
RQOrchestratorConfig,
42+
)
43+
44+
rq_config = RQOrchestratorConfig(
45+
redis_url=docling_serve_settings.eng_rq_redis_url,
46+
results_prefix=docling_serve_settings.eng_rq_results_prefix,
47+
sub_channel=docling_serve_settings.eng_rq_sub_channel,
48+
scratch_dir=get_scratch(),
49+
)
50+
51+
return RQOrchestrator(config=rq_config)
3652
elif docling_serve_settings.eng_kind == AsyncEngine.KFP:
3753
from docling_jobkit.orchestrators.kfp.orchestrator import (
3854
KfpOrchestrator,

0 commit comments

Comments
 (0)