Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 79 additions & 31 deletions src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
""" Utilities for WMS
"""
import os
from pathlib import Path
from glob import glob
import subprocess
import sys
import json

Expand Down Expand Up @@ -63,8 +66,6 @@ def createJobWrapper(
if os.path.exists(jobWrapperFile):
log.verbose("Removing existing Job Wrapper for", jobID)
os.remove(jobWrapperFile)
with open(os.path.join(diracRoot, defaultWrapperLocation)) as fd:
wrapperTemplate = fd.read()

if "LogLevel" in jobParams:
logLevel = jobParams["LogLevel"]
Expand All @@ -76,34 +77,43 @@ def createJobWrapper(
pythonPath = os.path.realpath(sys.executable)
log.debug("Real python path after resolving links is: ", pythonPath)

# Making real substitutions
sitePython = os.getcwd()
if rootLocation:
sitePython = rootLocation
wrapperTemplate = wrapperTemplate.replace("@SITEPYTHON@", sitePython)

jobWrapperJsonFile = jobWrapperFile + ".json"
with open(jobWrapperJsonFile, "w", encoding="utf8") as jsonFile:
json.dump(arguments, jsonFile, ensure_ascii=False)

with open(jobWrapperFile, "w") as wrapper:
wrapper.write(wrapperTemplate)

if not rootLocation:
rootLocation = wrapperPath

# The "real" location of the jobwrapper after it is started
jobWrapperDirect = os.path.join(rootLocation, f"Wrapper_{jobID}")
jobExeFile = os.path.join(wrapperPath, f"Job{jobID}")
jobFileContents = """#!/bin/sh
{} {} {} -o LogLevel={} -o /DIRAC/Security/UseServerCertificate=no {}
""".format(
pythonPath,
jobWrapperDirect,
extraOptions if extraOptions else "",
logLevel,
cfgPath if cfgPath else "",
)
if "Executable" in jobParams and jobParams["Executable"] == "dirac-cwl-exec":
ret = __createCWLJobWrapper(jobID, wrapperPath, log, rootLocation)
if not ret["OK"]:
return ret
jobWrapperFile, jobWrapperJsonFile, jobExeFile, jobFileContents = ret["Value"]
else:
with open(os.path.join(diracRoot, defaultWrapperLocation)) as fd:
wrapperTemplate = fd.read()

# Making real substitutions
sitePython = os.getcwd()
if rootLocation:
sitePython = rootLocation
wrapperTemplate = wrapperTemplate.replace("@SITEPYTHON@", sitePython)

jobWrapperJsonFile = jobWrapperFile + ".json"
with open(jobWrapperJsonFile, "w", encoding="utf8") as jsonFile:
json.dump(arguments, jsonFile, ensure_ascii=False)

with open(jobWrapperFile, "w") as wrapper:
wrapper.write(wrapperTemplate)

if not rootLocation:
rootLocation = wrapperPath

# The "real" location of the jobwrapper after it is started
jobWrapperDirect = os.path.join(rootLocation, f"Wrapper_{jobID}")
jobExeFile = os.path.join(wrapperPath, f"Job{jobID}")
jobFileContents = """#!/bin/sh
{} {} {} -o LogLevel={} -o /DIRAC/Security/UseServerCertificate=no {}
""".format(
pythonPath,
jobWrapperDirect,
extraOptions if extraOptions else "",
logLevel,
cfgPath if cfgPath else "",
)

with open(jobExeFile, "w") as jobFile:
jobFile.write(jobFileContents)
Expand All @@ -113,11 +123,49 @@ def createJobWrapper(
"JobWrapperConfigPath": jobWrapperJsonFile,
"JobWrapperPath": jobWrapperFile,
}
if rootLocation != wrapperPath:
if rootLocation and rootLocation != wrapperPath:
generatedFiles["JobExecutableRelocatedPath"] = os.path.join(rootLocation, os.path.basename(jobExeFile))
return S_OK(generatedFiles)


def __createCWLJobWrapper(jobID, wrapperPath, log, rootLocation):
# Get the new JobWrapper
if not rootLocation:
rootLocation = wrapperPath
protoPath = Path(wrapperPath) / f"proto{jobID}"
protoPath.unlink(missing_ok=True)
log.info("Cloning JobWrapper from repository https://github.com/DIRACGrid/dirac-cwl.git into", protoPath)
try:
subprocess.run(["git", "clone", "https://github.com/DIRACGrid/dirac-cwl.git", str(protoPath)], check=True)
except subprocess.CalledProcessError:
return S_ERROR("Failed to clone the JobWrapper repository")
wrapperFound = glob(os.path.join(str(protoPath), "**", "job_wrapper_template.py"), recursive=True)
if len(wrapperFound) < 1 or not Path(wrapperFound[0]).is_file():
return S_ERROR("Could not find the JobWrapper in the cloned repository")
jobWrapperFile = wrapperFound[0]
directJobWrapperFile = str(Path(rootLocation) / Path(wrapperFound[0]).relative_to(wrapperPath))

jobWrapperJsonFile = Path(wrapperPath) / f"InputSandbox{jobID}" / "job.json"
directJobWrapperJsonFile = Path(rootLocation) / f"InputSandbox{jobID}" / "job.json"
# Create the executable file
jobExeFile = os.path.join(wrapperPath, f"Job{jobID}")
protoPath = str(Path(rootLocation) / Path(protoPath).relative_to(wrapperPath))
pixiPath = str(Path(rootLocation) / ".pixi")
jobFileContents = f"""#!/bin/bash
# Install pixi
export PIXI_NO_PATH_UPDATE=1
export PIXI_HOME={pixiPath}
curl -fsSL https://pixi.sh/install.sh | bash
export PATH="{pixiPath}/bin:$PATH"
pixi install --manifest-path {protoPath}
# Get json
dirac-wms-job-get-input {jobID} -D {rootLocation}
# Run JobWrapper
pixi run --manifest-path {protoPath} python {directJobWrapperFile} {directJobWrapperJsonFile}
"""
return S_OK((jobWrapperFile, jobWrapperJsonFile, jobExeFile, jobFileContents))


def rescheduleJobs(
jobIDs: list[int],
source: str = "",
Expand Down
16 changes: 16 additions & 0 deletions src/DIRAC/tests/Utilities/testJobDefinitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,22 @@ def parametricJobInputData():
return endOfAllJobs(J)


def cwlTest():
"""Testing the CWL executable"""

J = baseToAllJobs("CWL_Test")
J.executable = "dirac-cwl-exec"
try:
J.setInputSandbox([find_all("job.json", rootPath, "DIRAC/tests/Workflow")[0]])
except IndexError:
try:
J.setInputSandbox([find_all("job.json", ".", "DIRAC/tests/Workflow")[0]])
except IndexError: # we are in Jenkins
J.setInputSandbox([find_all("job.json", "/home/dirac", "DIRAC/tests/Workflow")[0]])

return endOfAllJobs(J)


def jobWithOutput():
"""Creates a job that uploads an output.
The output SE is not set here, so it would use the default /Resources/StorageElementGroups/SE-USER
Expand Down
36 changes: 36 additions & 0 deletions src/DIRAC/tests/Workflow/Integration/job.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"task": {
"id": ".",
"class": "CommandLineTool",
"inputs": [],
"outputs": [
{
"id": "output1",
"type": "File",
"outputBinding": {
"glob": "test-output"
}
}
],
"requirements": [],
"hints": [
{
"class": "dirac:ExecutionHooks",
"hook_plugin": "QueryBasedPlugin"
}
],
"cwlVersion": "v1.2",
"baseCommand": [
"echo",
"test file content"
],
"stdout": "test-output",
"$namespaces": {
"dirac": "schemas/dirac-metadata.json#/$defs/"
},
"$schemas": [
"schemas/dirac-metadata.json"
]
},
"input": null
}
5 changes: 3 additions & 2 deletions tests/Integration/WorkloadManagementSystem/exe-script.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#!/usr/bin/env python
"""Script to run Executable application"""
import sys
import subprocess

import shlex
import subprocess
import sys

# Main
if __name__ == "__main__":
Expand Down
36 changes: 36 additions & 0 deletions tests/Integration/WorkloadManagementSystem/job.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"task": {
"id": ".",
"class": "CommandLineTool",
"inputs": [],
"outputs": [
{
"id": "output1",
"type": "File",
"outputBinding": {
"glob": "test-output"
}
}
],
"requirements": [],
"hints": [
{
"class": "dirac:ExecutionHooks",
"hook_plugin": "QueryBasedPlugin"
}
],
"cwlVersion": "v1.2",
"baseCommand": [
"echo",
"test file content"
],
"stdout": "test-output",
"$namespaces": {
"dirac": "schemas/dirac-metadata.json#/$defs/"
},
"$schemas": [
"schemas/dirac-metadata.json"
]
},
"input": null
}
4 changes: 4 additions & 0 deletions tests/System/unitTestUserJobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ def test_submit(self):
self.assertTrue(res["OK"])
jobsSubmittedList.append(res["Value"])

res = cwlTest()
self.assertTrue(res["OK"])
jobsSubmittedList.append(res["Value"])

print(f"submitted {len(jobsSubmittedList)} jobs: {','.join(str(js) for js in jobsSubmittedList)}")


Expand Down
36 changes: 36 additions & 0 deletions tests/Workflow/Integration/job.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"task": {
"id": ".",
"class": "CommandLineTool",
"inputs": [],
"outputs": [
{
"id": "output1",
"type": "File",
"outputBinding": {
"glob": "test-output"
}
}
],
"requirements": [],
"hints": [
{
"class": "dirac:ExecutionHooks",
"hook_plugin": "QueryBasedPlugin"
}
],
"cwlVersion": "v1.2",
"baseCommand": [
"echo",
"test file content"
],
"stdout": "test-output",
"$namespaces": {
"dirac": "schemas/dirac-metadata.json#/$defs/"
},
"$schemas": [
"schemas/dirac-metadata.json"
]
},
"input": null
}
Loading