Skip to content
This repository was archived by the owner on Jan 15, 2024. It is now read-only.

Commit 0eaee2f

Browse files
leezu and eric-haibin-lin
authored and committed
[CI] AWS Batch serverless CI Pipeline for parallel notebook execution during website build step (#791)
* batch script * Additions to ci/batch/submit-job.py * Fix docs/md2ipynb.py - Automatically set correct working directory - Manually set encoding; System environment may not be set up correctly for inferring encoding * Preserve exit code in ci/batch/docker/gluon_nlp_job.sh * Update docker image * Parallelize notebook runs via AWS Batch Jobs
1 parent 84738d9 commit 0eaee2f

File tree

9 files changed

+447
-37
lines changed

9 files changed

+447
-37
lines changed

Makefile

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,16 @@ clean:
6060

6161
compile_notebooks:
6262
for f in $(shell find docs/examples -type f -name '*.md' -print) ; do \
63-
DIR=`dirname $$f` ; \
64-
BASENAME=`basename $$f` ; \
65-
echo $$DIR $$BASENAME ; \
63+
DIR=$$(dirname $$f) ; \
64+
BASENAME=$$(basename $$f) ; \
65+
TARGETNAME=$${BASENAME%.md}.ipynb ; \
66+
echo $$DIR $$BASENAME $$TARGETNAME; \
6667
cd $$DIR ; \
67-
python $(MD2IPYNB) $$BASENAME ; \
68+
if [ -f $$TARGETNAME ]; then \
69+
echo $$TARGETNAME exists. Skipping compilation of $$BASENAME in Makefile. ; \
70+
else \
71+
python $(MD2IPYNB) $$BASENAME ; \
72+
fi ; \
6873
cd - ; \
6974
done;
7075

ci/batch/docker/Dockerfile

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
FROM nvidia/cuda:10.1-cudnn7-devel-ubuntu18.04

# System packages required to build GluonNLP, its docs, and image/font deps
# for matplotlib-based notebook output.
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    locales \
    cmake \
    git \
    curl \
    vim \
    unzip \
    sudo \
    ca-certificates \
    libjpeg-dev \
    libpng-dev \
    libfreetype6-dev \
    libxft-dev &&\
    rm -rf /var/lib/apt/lists/*

# Install Miniconda into /opt/conda; remove the installer and clean caches to
# keep the image layer small.
RUN curl -o ~/miniconda.sh -O https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh && \
    chmod +x ~/miniconda.sh && \
    ~/miniconda.sh -b -p /opt/conda && \
    rm ~/miniconda.sh && \
    /opt/conda/bin/conda clean -ya
ENV PATH /opt/conda/bin:$PATH
# Pre-clone the repo and pre-build the gpu/py3 conda env (incl. spaCy/NLTK data)
# so that each Batch job only needs an incremental `conda env update` at runtime.
RUN git clone https://github.com/dmlc/gluon-nlp
WORKDIR gluon-nlp
RUN /bin/bash -c 'CONDA_ENVS_PATH=$PWD/conda CONDA_PKGS_DIRS=$PWD/conda/pkgs conda init bash && source /root/.bashrc && conda env update --prune -p conda/gpu/py3 -f env/gpu/py3.yml && source activate ./conda/gpu/py3 && pip install -v -e . && pip install awscli && python -m spacy download en && python -m spacy download de && python -m nltk.downloader all'
# Entry script executed by AWS Batch jobs (see ci/batch/gluon_nlp_job.sh).
ADD gluon_nlp_job.sh .

ci/batch/docker/gluon_nlp_job.sh

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#!/bin/bash
# AWS Batch job entry point: check out the requested git ref, refresh the conda
# environment, run the given command in WORK_DIR, and upload any requested
# output to S3. The command's exit code is preserved so Batch reports the real
# job status.
date
echo "Args: $@"
env
echo "jobId: $AWS_BATCH_JOB_ID"
echo "jobQueue: $AWS_BATCH_JQ_NAME"
echo "computeEnvironment: $AWS_BATCH_CE_NAME"

SOURCE_REF=$1    # git ref to build, e.g. master or refs/pull/500/head
CONDA_ENV=$2     # conda env preset, e.g. gpu/py3
WORK_DIR=$3      # directory inside the repo to run COMMAND from
COMMAND=$4       # shell command to execute
SAVED_OUTPUT=$5  # file or directory (relative to WORK_DIR) to upload
SAVE_PATH=$6     # destination key under s3://gluon-nlp-staging/
REMOTE=$7        # optional git remote URL overriding origin

# Quote the expansion: unquoted `[ ! -z $REMOTE ]` is a syntax error when
# REMOTE is empty-with-spaces and mis-parses URLs containing shell metachars.
if [ -n "$REMOTE" ]; then
    git remote set-url origin "$REMOTE"
fi;

git fetch origin "$SOURCE_REF:working"
git checkout working
conda env update --prune -p "conda/$CONDA_ENV" -f "env/$CONDA_ENV.yml"
source activate "./conda/$CONDA_ENV"
pip install -v -e .
python -m spacy download en
python -m spacy download de
python -m nltk.downloader all
pip install awscli

cd "$WORK_DIR"
/bin/bash -c "$COMMAND"
# Capture immediately: any later command would clobber $?.
COMMAND_EXIT_CODE=$?
if [[ -f $SAVED_OUTPUT ]]; then
    aws s3 cp "$SAVED_OUTPUT" "s3://gluon-nlp-staging/$SAVE_PATH";
elif [[ -d $SAVED_OUTPUT ]]; then
    aws s3 cp --recursive "$SAVED_OUTPUT" "s3://gluon-nlp-staging/$SAVE_PATH";
fi;
exit $COMMAND_EXIT_CODE

ci/batch/submit-job.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
"""Submit a GluonNLP CI job to AWS Batch and optionally wait for completion."""
import argparse
from datetime import datetime
import random
import sys
import time

import boto3
from botocore.compat import total_seconds

parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)

parser.add_argument('--profile', help='profile name of aws account.', type=str,
                    default=None)
parser.add_argument('--region', help='Default region when creating new connections', type=str,
                    default=None)
parser.add_argument('--name', help='name of the job', type=str, default='dummy')
parser.add_argument('--job-queue', help='name of the job queue to submit this job', type=str,
                    default='gluon-nlp-jobs')
# Typo fix: help text previously read "name of the job job definition".
parser.add_argument('--job-definition', help='name of the job definition', type=str,
                    default='gluon-nlp-jobs:6')
parser.add_argument('--source-ref',
                    help='ref in GluonNLP main github. e.g. master, refs/pull/500/head',
                    type=str, default='master')
parser.add_argument('--work-dir',
                    help='working directory inside the repo. e.g. scripts/sentiment_analysis',
                    type=str, default='scripts/bert')
parser.add_argument('--saved-output',
                    help='output to be saved, relative to working directory. '
                         'it can be either a single file or a directory',
                    type=str, default='.')
parser.add_argument('--save-path',
                    help='s3 path where files are saved.',
                    type=str, default='batch/temp/{}'.format(datetime.now().isoformat()))
parser.add_argument('--conda-env',
                    help='conda environment preset to use.',
                    type=str, default='gpu/py3')
parser.add_argument('--command', help='command to run', type=str,
                    default='git rev-parse HEAD | tee stdout.log')
parser.add_argument('--remote',
                    help='git repo address. https://github.com/dmlc/gluon-nlp',
                    type=str, default="https://github.com/dmlc/gluon-nlp")
parser.add_argument('--wait', help='block wait until the job completes. '
                    'Non-zero exit code if job fails.', action='store_true')
parser.add_argument('--timeout', help='job timeout in seconds', default=None, type=int)

args = parser.parse_args()

session = boto3.Session(profile_name=args.profile, region_name=args.region)
# One client each for Batch (job control) and CloudWatch Logs (job output).
batch, cloudwatch = [session.client(service_name=sn) for sn in ['batch', 'logs']]
50+
51+
def printLogs(logGroupName, logStreamName, startTime):
    """Print all log events in the stream at/after startTime (ms epoch).

    Returns the timestamp of the last event printed, or 0 if there were none;
    the caller resumes streaming from that value + 1 on the next poll.
    """
    kwargs = {'logGroupName': logGroupName,
              'logStreamName': logStreamName,
              'startTime': startTime,
              'startFromHead': True}

    lastTimestamp = 0
    while True:
        logEvents = cloudwatch.get_log_events(**kwargs)

        for event in logEvents['events']:
            lastTimestamp = event['timestamp']
            timestamp = datetime.utcfromtimestamp(lastTimestamp / 1000.0).isoformat()
            # Pad/truncate to millisecond precision and mark as UTC ('Z').
            print('[{}] {}'.format((timestamp + '.000')[:23] + 'Z', event['message']))

        # CloudWatch signals end-of-stream by returning the SAME forward token
        # again; stop when the token no longer advances.
        nextToken = logEvents['nextForwardToken']
        if nextToken and kwargs.get('nextToken') != nextToken:
            kwargs['nextToken'] = nextToken
        else:
            break
    return lastTimestamp
72+
73+
74+
def getLogStream(logGroupName, jobName, jobId):
    """Return the first CloudWatch log stream name for the job, or '' if none exists yet."""
    streams = cloudwatch.describe_log_streams(
        logGroupName=logGroupName,
        logStreamNamePrefix=jobName + '/' + jobId
    )['logStreams']
    return streams[0]['logStreamName'] if streams else ''
84+
85+
def nowInMillis():
    """Return the current UTC time as integer milliseconds since the Unix epoch.

    Bug fix: the original called the Python-2-only builtin ``long()``, which
    raises NameError on Python 3. ``timedelta.total_seconds()`` (stdlib) also
    removes the ``botocore.compat.total_seconds`` dependency.
    """
    return int((datetime.utcnow() - datetime(1970, 1, 1)).total_seconds()) * 1000
88+
89+
90+
def main():
    """Submit the Batch job described by the CLI args; with --wait, poll it and
    stream its CloudWatch logs, exiting non-zero if the job fails."""
    spin = ['-', '/', '|', '\\', '-', '/', '|', '\\']
    logGroupName = '/aws/batch/job'  # default log group used by AWS Batch

    jobName = args.name
    jobQueue = args.job_queue
    jobDefinition = args.job_definition
    wait = args.wait
    # Removed unused local `command = args.command.split()`; the raw string
    # args.command is what is passed through to the job parameters below.

    parameters = {
        'SOURCE_REF': args.source_ref,
        'WORK_DIR': args.work_dir,
        'SAVED_OUTPUT': args.saved_output,
        'SAVE_PATH': args.save_path,
        'CONDA_ENV': args.conda_env,
        'COMMAND': args.command,
        'REMOTE': args.remote
    }
    kwargs = dict(
        jobName=jobName,
        jobQueue=jobQueue,
        jobDefinition=jobDefinition,
        parameters=parameters,
    )
    if args.timeout is not None:
        kwargs['timeout'] = {'attemptDurationSeconds': args.timeout}
    submitJobResponse = batch.submit_job(**kwargs)

    jobId = submitJobResponse['jobId']
    print('Submitted job [{} - {}] to the job queue [{}]'.format(jobName, jobId, jobQueue))

    spinner = 0
    running = False
    status_set = set()
    startTime = 0  # ms timestamp to resume log streaming from

    while wait:
        # Randomized interval spreads DescribeJobs calls across parallel submitters.
        time.sleep(random.randint(5, 10))
        describeJobsResponse = batch.describe_jobs(jobs=[jobId])
        status = describeJobsResponse['jobs'][0]['status']
        if status == 'SUCCEEDED' or status == 'FAILED':
            print('=' * 80)
            print('Job [{} - {}] {}'.format(jobName, jobId, status))
            # bool is an int subclass: exit code 1 on FAILED, 0 on SUCCEEDED.
            sys.exit(status == 'FAILED')

        elif status == 'RUNNING':
            logStreamName = getLogStream(logGroupName, jobName, jobId)
            if not running:
                running = True
                print('\rJob [{} - {}] is RUNNING.'.format(jobName, jobId))
                if logStreamName:
                    print('Output [{}]:\n {}'.format(logStreamName, '=' * 80))
            if logStreamName:
                # +1 so the next poll starts just after the last printed event.
                startTime = printLogs(logGroupName, logStreamName, startTime) + 1
        elif status not in status_set:
            # Announce each intermediate status (SUBMITTED/PENDING/...) once.
            status_set.add(status)
            print('\rJob [%s - %s] is %-9s... %s' % (jobName, jobId, status, spin[spinner % len(spin)]),)
            sys.stdout.flush()
            spinner += 1
151+
152+
if __name__ == '__main__':
153+
main()

ci/batch/wait-job.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
"""Wait on an already-submitted AWS Batch job and stream its logs."""
import argparse
from datetime import datetime
import sys
import time

import boto3
from botocore.compat import total_seconds

parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)

parser.add_argument('--profile', help='profile name of aws account.', type=str,
                    default=None)
# --region added for consistency with submit-job.py; the default of None
# preserves the previous behavior (region resolved from the environment).
parser.add_argument('--region', help='Default region when creating new connections', type=str,
                    default=None)
parser.add_argument('--job-id', help='job id to check status and wait.', type=str,
                    default=None)

args = parser.parse_args()

session = boto3.Session(profile_name=args.profile, region_name=args.region)
# One client each for Batch (job status) and CloudWatch Logs (job output).
batch, cloudwatch = [session.client(service_name=sn) for sn in ['batch', 'logs']]
20+
21+
def printLogs(logGroupName, logStreamName, startTime):
    """Print all log events in the stream at/after startTime (ms epoch).

    Returns the timestamp of the last event printed, or 0 if there were none;
    the caller resumes streaming from that value + 1 on the next poll.
    """
    kwargs = {'logGroupName': logGroupName,
              'logStreamName': logStreamName,
              'startTime': startTime,
              'startFromHead': True}

    lastTimestamp = 0
    while True:
        logEvents = cloudwatch.get_log_events(**kwargs)

        for event in logEvents['events']:
            lastTimestamp = event['timestamp']
            timestamp = datetime.utcfromtimestamp(lastTimestamp / 1000.0).isoformat()
            # Pad/truncate to millisecond precision and mark as UTC ('Z').
            print('[{}] {}'.format((timestamp + '.000')[:23] + 'Z', event['message']))

        # CloudWatch signals end-of-stream by returning the SAME forward token
        # again; stop when the token no longer advances.
        nextToken = logEvents['nextForwardToken']
        if nextToken and kwargs.get('nextToken') != nextToken:
            kwargs['nextToken'] = nextToken
        else:
            break
    return lastTimestamp
42+
43+
44+
def getLogStream(logGroupName, jobName, jobId):
    """Return the first CloudWatch log stream name for the job, or '' if none exists yet."""
    streams = cloudwatch.describe_log_streams(
        logGroupName=logGroupName,
        logStreamNamePrefix=jobName + '/' + jobId
    )['logStreams']
    return streams[0]['logStreamName'] if streams else ''
54+
55+
def nowInMillis():
    """Return the current UTC time as integer milliseconds since the Unix epoch.

    Bug fix: the original called the Python-2-only builtin ``long()``, which
    raises NameError on Python 3. ``timedelta.total_seconds()`` (stdlib) also
    removes the ``botocore.compat.total_seconds`` dependency.
    """
    return int((datetime.utcnow() - datetime(1970, 1, 1)).total_seconds()) * 1000
58+
59+
60+
def main():
    """Poll the job given by --job-id until SUCCEEDED/FAILED, streaming its logs."""
    spin = ['-', '/', '|', '\\', '-', '/', '|', '\\']
    logGroupName = '/aws/batch/job'  # default log group used by AWS Batch

    jobId = args.job_id

    spinner = 0
    running = False  # set once the first RUNNING status with a log stream is seen
    startTime = 0    # ms timestamp to resume log streaming from

    while True:
        time.sleep(1)
        describeJobsResponse = batch.describe_jobs(jobs=[jobId])
        job = describeJobsResponse['jobs'][0]
        status, jobName = job['status'], job['jobName']
        if status == 'SUCCEEDED' or status == 'FAILED':
            # Terminal state: report and stop polling. NOTE(review): unlike
            # submit-job.py, this does not exit non-zero on FAILED.
            print('=' * 80)
            print('Job [{} - {}] {}'.format(jobName, jobId, status))
            break
        elif status == 'RUNNING':
            logStreamName = getLogStream(logGroupName, jobName, jobId)
            if not running and logStreamName:
                running = True
                print('\rJob [{} - {}] is RUNNING.'.format(jobName, jobId))
                print('Output [{}]:\n {}'.format(logStreamName, '=' * 80))
            if logStreamName:
                # +1 so the next poll starts just after the last printed event.
                startTime = printLogs(logGroupName, logStreamName, startTime) + 1
        else:
            # Intermediate states (SUBMITTED/PENDING/...): show a spinner.
            print('\rJob [%s - %s] is %-9s... %s' % (jobName, jobId, status, spin[spinner % len(spin)]),)
            sys.stdout.flush()
            spinner += 1
91+
92+
if __name__ == '__main__':
93+
main()

0 commit comments

Comments
 (0)