55 changes: 14 additions & 41 deletions .github/workflows/pipeline_swfs_test.yaml
@@ -1,18 +1,18 @@
name: Test pipelines with seaweedfs
on:
workflow_dispatch:
# pull_request:
# paths:
# - tests/gh-actions/install_KinD_create_KinD_cluster_install_kustomize.sh
# - .github/workflows/pipeline_swfs_test.yaml
# - apps/pipeline/upstream/**
# - tests/gh-actions/install_istio.sh
# - tests/gh-actions/install_cert_manager.sh
# - tests/gh-actions/install_oauth2-proxy.sh
# - common/cert-manager/**
# - common/oauth2-proxy/**
# - common/istio*/**
# - experimental/seaweedfs/**
pull_request:
paths:
#- tests/gh-actions/install_KinD_create_KinD_cluster_install_kustomize.sh

[GitHub Actions / format_YAML_files warning: 6:6 [comments] missing starting space in comment]
- .github/workflows/pipeline_swfs_test.yaml
- apps/pipeline/upstream/**
#- tests/gh-actions/install_istio.sh

[GitHub Actions / format_YAML_files warning: 9:6 [comments] missing starting space in comment]
#- tests/gh-actions/install_cert_manager.sh

[GitHub Actions / format_YAML_files warning: 10:6 [comments] missing starting space in comment]
#- tests/gh-actions/install_oauth2-proxy.sh

[GitHub Actions / format_YAML_files warning: 11:6 [comments] missing starting space in comment]
#- common/cert-manager/**

[GitHub Actions / format_YAML_files warning: 12:6 [comments] missing starting space in comment]
#- common/oauth2-proxy/**

[GitHub Actions / format_YAML_files warning: 13:6 [comments] missing starting space in comment]
#- common/istio*/**

[GitHub Actions / format_YAML_files warning: 14:6 [comments] missing starting space in comment]
- experimental/seaweedfs/**

jobs:
build:
@@ -41,7 +41,7 @@
run: kustomize build common/kubeflow-namespace/base | kubectl apply -f -

- name: Install KF Pipelines
run: ./tests/gh-actions/install_pipelines.sh
run: ./tests/gh-actions/install_pipelines_swfs.sh

- name: Install KF Multi Tenancy
run: ./tests/gh-actions/install_multi_tenancy.sh
@@ -61,13 +61,6 @@
fi
kubectl get secret mlpipeline-minio-artifact -n "$KF_PROFILE" -o json | jq -r '.data | keys[] as $k | "\($k): \(. | .[$k] | @base64d)"' | tr '\n' ' '

- name: Install seaweedfs
run: |
kustomize build experimental/seaweedfs/istio | kubectl apply -f -
kubectl -n kubeflow wait --for=condition=available --timeout=120s deploy/seaweedfs
kubectl wait --for=condition=Ready pods --all --all-namespaces --timeout=60s --field-selector=status.phase!=Succeeded
kubectl -n kubeflow exec deploy/seaweedfs -c seaweedfs -- sh -c "echo \"s3.configure -user minio -access_key minio -secret_key minio123 -actions Read,Write,List -apply\" | /usr/bin/weed shell"

- name: Port forward
run: ./tests/gh-actions/port_forward_gateway.sh

@@ -100,24 +93,4 @@
done

- name: Applying Pod Security Standards restricted levels for static namespaces
run: ./tests/gh-actions/enable_restricted_PSS.sh

- name: Collect Logs on Failure
if: failure()
run: |
mkdir -p logs
kubectl get all --all-namespaces > logs/resources.txt
kubectl get events --all-namespaces --sort-by=.metadata.creationTimestamp > logs/events.txt
for namespace in kubeflow istio-system cert-manager auth kubeflow-user-example-com; do
kubectl describe pods -n $namespace > logs/$namespace-pods.txt
for pod in $(kubectl get pods -n $namespace -o jsonpath='{.items[*].metadata.name}'); do
kubectl logs -n $namespace $pod --tail=100 > logs/$namespace-$pod.txt 2>&1 || true
done
done

- name: Upload Diagnostic Logs
if: always()
uses: actions/upload-artifact@v4
with:
name: kubeflow-test-logs
path: logs/
run: ./tests/gh-actions/enable_restricted_PSS.sh

[GitHub Actions / format_YAML_files failure: 96:55 [new-line-at-end-of-file] no new line character at the end of file]
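
The workflow now calls `install_pipelines_swfs.sh` instead of `install_pipelines.sh`, and the inline "Install seaweedfs" step that seeded a static `minio`/`minio123` identity through `weed shell` (`s3.configure -user minio -access_key minio -secret_key minio123 -actions Read,Write,List`) is dropped from the workflow itself. A minimal sketch for checking such an identity against the SeaweedFS S3 gateway, assuming the gateway is port-forwarded to `localhost:8333` (the SeaweedFS default; the actual service name and port in this overlay may differ):

```python
# Hedged sketch: list buckets with the statically configured minio/minio123
# identity. Endpoint URL and port are assumptions; point them at the
# port-forwarded SeaweedFS S3 gateway in the test cluster.
import botocore.session

session = botocore.session.get_session()
s3 = session.create_client(
    "s3",
    region_name="us-east-1",               # SeaweedFS ignores the region
    endpoint_url="http://localhost:8333",  # assumed port-forward target
    aws_access_key_id="minio",
    aws_secret_access_key="minio123",
)
print([b["Name"] for b in s3.list_buckets().get("Buckets", [])])
```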
28 changes: 24 additions & 4 deletions experimental/seaweedfs/base/kustomization.yaml
@@ -3,7 +3,27 @@ kind: Kustomization
namespace: kubeflow

resources:
- seaweedfs-deployment.yaml
- seaweedfs-pvc.yaml
- seaweedfs-service.yaml
- seadweedfs-networkpolicy.yaml
- seaweedfs/
- ../../../apps/pipeline/upstream/env/cert-manager/platform-agnostic-multi-user
configMapGenerator:
- name: kubeflow-pipelines-profile-controller-code
behavior: replace
files:
- pipeline-profile-controller/sync.py
patches:
- path: minio-service-patch.yaml
- path: pipeline-profile-controller/deployment.yaml
- patch: |-
apiVersion: apps/v1
kind: Deployment
metadata:
name: ml-pipeline-ui
spec:
template:
spec:
containers:
- name: ml-pipeline-ui
env:
- name: ARTIFACTS_SERVICE_PROXY_ENABLED
value: 'false'
$patch: merge
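
The overlay replaces the profile controller's sync script through a `configMapGenerator` with `behavior: replace`, patches the controller Deployment, and merges `ARTIFACTS_SERVICE_PROXY_ENABLED: 'false'` into `ml-pipeline-ui`. A small sketch for checking the rendered output, assuming `kustomize` is on the PATH, PyYAML is installed, and the repository root is the working directory (none of this is part of the PR):

```python
# Hedged sketch: render the overlay and confirm the ml-pipeline-ui env patch
# shows up in the output. Assumes the UI container is listed first.
import subprocess
import yaml

rendered = subprocess.run(
    ["kustomize", "build", "experimental/seaweedfs/base"],
    check=True, capture_output=True, text=True,
).stdout

for doc in yaml.safe_load_all(rendered):
    if doc and doc.get("kind") == "Deployment" \
            and doc["metadata"]["name"] == "ml-pipeline-ui":
        env = doc["spec"]["template"]["spec"]["containers"][0]["env"]
        print([e for e in env if e["name"] == "ARTIFACTS_SERVICE_PROXY_ENABLED"])
```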
42 changes: 42 additions & 0 deletions experimental/seaweedfs/base/pipeline-profile-controller/deployment.yaml
@@ -0,0 +1,42 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: kubeflow-pipelines-profile-controller
spec:
template:
spec:
containers:
- name: profile-controller
securityContext:
allowPrivilegeEscalation: false
seccompProfile:
type: RuntimeDefault
runAsNonRoot: true
runAsUser: 1000
runAsGroup: 0
capabilities:
drop:
- ALL
# We just need an image with the python botocore library installed
image: docker.io/alpine/k8s:1.32.3
command: ["python", "/hooks/sync.py"]
env:
- name: KFP_VERSION
valueFrom:
configMapKeyRef:
name: pipeline-install-config
key: appVersion
- name: AWS_ENDPOINT_URL
value: http://seaweedfs:8111
- name: AWS_REGION
value: us-east-1
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: mlpipeline-minio-artifact
key: accesskey
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: mlpipeline-minio-artifact
key: secretkey
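
The patched controller only needs an image with Python and botocore (hence `alpine/k8s`), and the S3/IAM wiring is done entirely through the `AWS_*` environment variables above; `sync.py` creates its IAM client without an explicit endpoint and relies on recent botocore releases honoring `AWS_ENDPOINT_URL`. A sketch of the explicit equivalent, with the fallback values mirroring this Deployment (they are illustrative, not part of the PR):

```python
# Hedged sketch: the explicit form of the client that sync.py builds from the
# environment. Defaults mirror the Deployment's env block above.
import os
import botocore.session

session = botocore.session.get_session()
iam = session.create_client(
    "iam",
    region_name=os.environ.get("AWS_REGION", "us-east-1"),
    endpoint_url=os.environ.get("AWS_ENDPOINT_URL", "http://seaweedfs:8111"),
    aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
)
print(iam.meta.endpoint_url)  # confirms which endpoint the client will call
```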
243 changes: 243 additions & 0 deletions experimental/seaweedfs/base/pipeline-profile-controller/sync.py
@@ -0,0 +1,243 @@
# Copyright 2020-2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from http.server import BaseHTTPRequestHandler, HTTPServer
import json
import os
import base64

# From awscli installed in alpine/k8s image
import botocore.session

S3_BUCKET_NAME = 'mlpipeline'

session = botocore.session.get_session()
# To interact with seaweedfs user management. Region does not matter.
iam = session.create_client('iam', region_name='foobar')


def main():
settings = get_settings_from_env()
server = server_factory(**settings)
server.serve_forever()


def get_settings_from_env(controller_port=None,
visualization_server_image=None, frontend_image=None,
visualization_server_tag=None, frontend_tag=None, disable_istio_sidecar=None):
"""
Returns a dict of settings from environment variables relevant to the controller

Environment settings can be overridden by passing them here as arguments.

Settings are pulled from the all-caps version of the setting name. The
following defaults are used if those environment variables are not set
to enable backwards compatibility with previous versions of this script:
visualization_server_image: ghcr.io/kubeflow/kfp-visualization-server
visualization_server_tag: value of KFP_VERSION environment variable
frontend_image: ghcr.io/kubeflow/kfp-frontend
frontend_tag: value of KFP_VERSION environment variable
disable_istio_sidecar: Required (no default)
minio_access_key: Required (no default)
minio_secret_key: Required (no default)
"""
settings = dict()
settings["controller_port"] = \
controller_port or \
os.environ.get("CONTROLLER_PORT", "8080")

settings["visualization_server_image"] = \
visualization_server_image or \
os.environ.get("VISUALIZATION_SERVER_IMAGE", "ghcr.io/kubeflow/kfp-visualization-server")

settings["frontend_image"] = \
frontend_image or \
os.environ.get("FRONTEND_IMAGE", "ghcr.io/kubeflow/kfp-frontend")

# Look for specific tags for each image first, falling back to
# previously used KFP_VERSION environment variable for backwards
# compatibility
settings["visualization_server_tag"] = \
visualization_server_tag or \
os.environ.get("VISUALIZATION_SERVER_TAG") or \
os.environ["KFP_VERSION"]

settings["frontend_tag"] = \
frontend_tag or \
os.environ.get("FRONTEND_TAG") or \
os.environ["KFP_VERSION"]

settings["disable_istio_sidecar"] = \
disable_istio_sidecar if disable_istio_sidecar is not None \
else os.environ.get("DISABLE_ISTIO_SIDECAR") == "true"

return settings


def server_factory(visualization_server_image,
visualization_server_tag, frontend_image, frontend_tag,
disable_istio_sidecar, url="", controller_port=8080):
"""
Returns an HTTPServer populated with Handler with customized settings
"""
class Controller(BaseHTTPRequestHandler):
def sync(self, parent, attachments):
# parent is a namespace
namespace = parent.get("metadata", {}).get("name")

pipeline_enabled = parent.get("metadata", {}).get(
"labels", {}).get("pipelines.kubeflow.org/enabled")

if pipeline_enabled != "true":
return {"status": {}, "attachments": []}

# Compute status based on observed state.
desired_status = {
"kubeflow-pipelines-ready":
len(attachments["Secret.v1"]) == 1 and
len(attachments["ConfigMap.v1"]) == 3 and
len(attachments["Deployment.apps/v1"]) == 2 and
len(attachments["Service.v1"]) == 2 and
len(attachments["DestinationRule.networking.istio.io/v1alpha3"]) == 1 and
len(attachments["AuthorizationPolicy.security.istio.io/v1beta1"]) == 1 and
"True" or "False"
}

# Generate the desired attachment object(s).
desired_resources = [
{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": "kfp-launcher",
"namespace": namespace,
},
"data": {
"defaultPipelineRoot": f"minio://{S3_BUCKET_NAME}/private-artifacts/{namespace}/v2/artifacts",
},
},
{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": "metadata-grpc-configmap",
"namespace": namespace,
},
"data": {
"METADATA_GRPC_SERVICE_HOST":
"metadata-grpc-service.kubeflow",
"METADATA_GRPC_SERVICE_PORT": "8080",
},
},
{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": "artifact-repositories",
"namespace": namespace,
"annotations": {
"workflows.argoproj.io/default-artifact-repository": "default-namespaced"
}
},
"data": {
"default-namespaced": json.dumps({
"archiveLogs": True,
"s3": {
"endpoint": "minio-service.kubeflow:9000",
"bucket": S3_BUCKET_NAME,
"keyFormat": f"private-artifacts/{namespace}/{{{{workflow.name}}}}/{{{{workflow.creationTimestamp.Y}}}}/{{{{workflow.creationTimestamp.m}}}}/{{{{workflow.creationTimestamp.d}}}}/{{{{pod.name}}}}",
"insecure": True,
"accessKeySecret": {
"name": "mlpipeline-minio-artifact",
"key": "accesskey",
},
"secretKeySecret": {
"name": "mlpipeline-minio-artifact",
"key": "secretkey",
}
}
})
}
},
]
print('Received request:\n', json.dumps(parent, sort_keys=True))
print('Desired resources except secrets:\n', json.dumps(desired_resources, sort_keys=True))

# Moved after the print argument because this is sensitive data.

# Check if secret is already there when the controller made the request. If yes, then
# use it. Else create a new credentials on seaweedfs for the namespace.
if s3_secret := attachments["Secret.v1"].get(f"{namespace}/mlpipeline-minio-artifact"):
desired_resources.append(s3_secret)
print('Using existing secret')
else:
print('Creating new access key.')
s3_access_key = iam.create_access_key(UserName=namespace)
# Use the AWS IAM API of seaweedfs to manage access policies to bucket.
# This policy ensures that a user can only access artifacts from his own profile.
iam.put_user_policy(
UserName=namespace,
PolicyName=f"KubeflowProject{namespace}",
PolicyDocument=json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": [
"s3:Put*",
"s3:Get*",
"s3:List*"
],
"Resource": [
f"arn:aws:s3:::{S3_BUCKET_NAME}/artifacts/*",
f"arn:aws:s3:::{S3_BUCKET_NAME}/private-artifacts/{namespace}/*",
f"arn:aws:s3:::{S3_BUCKET_NAME}/private/{namespace}/*",
f"arn:aws:s3:::{S3_BUCKET_NAME}/shared/*",
]
}]
})
)
desired_resources.insert(
0,
{
"apiVersion": "v1",
"kind": "Secret",
"metadata": {
"name": "mlpipeline-minio-artifact",
"namespace": namespace,
},
"data": {
"accesskey": base64.b64encode(s3_access_key["AccessKey"]["AccessKeyId"].encode('utf-8')).decode("utf-8"),
"secretkey": base64.b64encode(s3_access_key["AccessKey"]["SecretAccessKey"].encode('utf-8')).decode("utf-8"),
},
})

return {"status": desired_status, "attachments": desired_resources}

def do_POST(self):
# Serve the sync() function as a JSON webhook.
observed = json.loads(
self.rfile.read(int(self.headers.get("content-length"))))
desired = self.sync(observed["object"], observed["attachments"])

self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(bytes(json.dumps(desired), 'utf-8'))

return HTTPServer((url, int(controller_port)), Controller)


if __name__ == "__main__":
main()
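
The controller is a Metacontroller-style sync webhook: each POST carries the parent namespace under `object` plus the currently observed children under `attachments`, and the response returns the desired `status` and `attachments`. A minimal local smoke test, assuming `sync.py` is running on `localhost:8080` with `KFP_VERSION` set; the payload below is illustrative, and because it already contains an `mlpipeline-minio-artifact` secret, the SeaweedFS IAM branch is not exercised:

```python
# Hedged sketch: POST a fake sync payload to a locally running sync.py and
# print the returned status. Namespace name, secret values, and port are
# illustrative only.
import json
import urllib.request

payload = {
    "object": {
        "metadata": {
            "name": "kubeflow-user-example-com",
            "labels": {"pipelines.kubeflow.org/enabled": "true"},
        },
    },
    "attachments": {
        "Secret.v1": {
            "kubeflow-user-example-com/mlpipeline-minio-artifact": {
                "apiVersion": "v1",
                "kind": "Secret",
                "metadata": {"name": "mlpipeline-minio-artifact"},
                "data": {"accesskey": "bWluaW8=", "secretkey": "bWluaW8xMjM="},
            },
        },
        "ConfigMap.v1": {},
        "Deployment.apps/v1": {},
        "Service.v1": {},
        "DestinationRule.networking.istio.io/v1alpha3": {},
        "AuthorizationPolicy.security.istio.io/v1beta1": {},
    },
}

req = urllib.request.Request(
    "http://localhost:8080/",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    print(json.loads(resp.read())["status"])
```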