Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 15 additions & 21 deletions .github/workflows/pipeline_swfs_test.yaml
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
name: Test pipelines with seaweedfs
on:
workflow_dispatch:
# pull_request:
# paths:
# - tests/gh-actions/install_KinD_create_KinD_cluster_install_kustomize.sh
# - .github/workflows/pipeline_swfs_test.yaml
# - apps/pipeline/upstream/**
# - tests/gh-actions/install_istio.sh
# - tests/gh-actions/install_cert_manager.sh
# - tests/gh-actions/install_oauth2-proxy.sh
# - common/cert-manager/**
# - common/oauth2-proxy/**
# - common/istio*/**
# - experimental/seaweedfs/**
pull_request:
paths:
# - tests/gh-actions/install_KinD_create_KinD_cluster_install_kustomize.sh
- .github/workflows/pipeline_swfs_test.yaml
- apps/pipeline/upstream/**
# - tests/gh-actions/install_istio.sh
# - tests/gh-actions/install_cert_manager.sh
# - tests/gh-actions/install_oauth2-proxy.sh
# - common/cert-manager/**
# - common/oauth2-proxy/**
# - common/istio*/**
- experimental/seaweedfs/**

jobs:
build:
timeout-minutes: 15
runs-on: ubuntu-latest
runs-on:
labels: ubuntu-latest-16-cores
steps:
- name: Checkout
uses: actions/checkout@v4
Expand All @@ -41,7 +42,7 @@ jobs:
run: kustomize build common/kubeflow-namespace/base | kubectl apply -f -

- name: Install KF Pipelines
run: ./tests/gh-actions/install_pipelines.sh
run: ./tests/gh-actions/install_pipelines_swfs.sh

- name: Install KF Multi Tenancy
run: ./tests/gh-actions/install_multi_tenancy.sh
Expand All @@ -61,13 +62,6 @@ jobs:
fi
kubectl get secret mlpipeline-minio-artifact -n "$KF_PROFILE" -o json | jq -r '.data | keys[] as $k | "\($k): \(. | .[$k] | @base64d)"' | tr '\n' ' '

- name: Install seaweedfs
run: |
kustomize build experimental/seaweedfs/istio | kubectl apply -f -
kubectl -n kubeflow wait --for=condition=available --timeout=120s deploy/seaweedfs
kubectl wait --for=condition=Ready pods --all --all-namespaces --timeout=60s --field-selector=status.phase!=Succeeded
kubectl -n kubeflow exec deploy/seaweedfs -c seaweedfs -- sh -c "echo \"s3.configure -user minio -access_key minio -secret_key minio123 -actions Read,Write,List -apply\" | /usr/bin/weed shell"

- name: Port forward
run: ./tests/gh-actions/port_forward_gateway.sh

Expand Down
28 changes: 24 additions & 4 deletions experimental/seaweedfs/base/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,27 @@ kind: Kustomization
namespace: kubeflow

resources:
- seaweedfs-deployment.yaml
- seaweedfs-pvc.yaml
- seaweedfs-service.yaml
- seadweedfs-networkpolicy.yaml
- seaweedfs/
- ../../../apps/pipeline/upstream/env/cert-manager/platform-agnostic-multi-user
configMapGenerator:
- name: kubeflow-pipelines-profile-controller-code
behavior: replace
files:
- pipeline-profile-controller/sync.py
patches:
- path: minio-service-patch.yaml
- path: pipeline-profile-controller/deployment.yaml
- patch: |-
apiVersion: apps/v1
kind: Deployment
metadata:
name: ml-pipeline-ui
spec:
template:
spec:
containers:
- name: ml-pipeline-ui
env:
- name: ARTIFACTS_SERVICE_PROXY_ENABLED
value: 'false'
$patch: merge
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: kubeflow-pipelines-profile-controller
spec:
template:
spec:
containers:
- name: profile-controller
securityContext:
allowPrivilegeEscalation: false
seccompProfile:
type: RuntimeDefault
runAsNonRoot: true
runAsUser: 1000
runAsGroup: 0
capabilities:
drop:
- ALL
# We just need an image with the python botocore library installed
image: docker.io/alpine/k8s:1.32.3
command: ["python", "/hooks/sync.py"]
env:
- name: KFP_VERSION
valueFrom:
configMapKeyRef:
name: pipeline-install-config
key: appVersion
- name: AWS_ENDPOINT_URL
value: http://seaweedfs:8111
- name: AWS_REGION
value: us-east-1
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: mlpipeline-minio-artifact
key: accesskey
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: mlpipeline-minio-artifact
key: secretkey
243 changes: 243 additions & 0 deletions experimental/seaweedfs/base/pipeline-profile-controller/sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
# Copyright 2020-2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from http.server import BaseHTTPRequestHandler, HTTPServer
import json
import os
import base64

# From awscli installed in alpine/k8s image
import botocore.session

S3_BUCKET_NAME = 'mlpipeline'

session = botocore.session.get_session()
# To interact with seaweedfs user management. Region does not matter.
iam = session.create_client('iam', region_name='foobar')


def main():
settings = get_settings_from_env()
server = server_factory(**settings)
server.serve_forever()


def get_settings_from_env(controller_port=None,
visualization_server_image=None, frontend_image=None,
visualization_server_tag=None, frontend_tag=None, disable_istio_sidecar=None):
"""
Returns a dict of settings from environment variables relevant to the controller

Environment settings can be overridden by passing them here as arguments.

Settings are pulled from the all-caps version of the setting name. The
following defaults are used if those environment variables are not set
to enable backwards compatibility with previous versions of this script:
visualization_server_image: ghcr.io/kubeflow/kfp-visualization-server
visualization_server_tag: value of KFP_VERSION environment variable
frontend_image: ghcr.io/kubeflow/kfp-frontend
frontend_tag: value of KFP_VERSION environment variable
disable_istio_sidecar: Required (no default)
minio_access_key: Required (no default)
minio_secret_key: Required (no default)
"""
settings = dict()
settings["controller_port"] = \
controller_port or \
os.environ.get("CONTROLLER_PORT", "8080")

settings["visualization_server_image"] = \
visualization_server_image or \
os.environ.get("VISUALIZATION_SERVER_IMAGE", "ghcr.io/kubeflow/kfp-visualization-server")

settings["frontend_image"] = \
frontend_image or \
os.environ.get("FRONTEND_IMAGE", "ghcr.io/kubeflow/kfp-frontend")

# Look for specific tags for each image first, falling back to
# previously used KFP_VERSION environment variable for backwards
# compatibility
settings["visualization_server_tag"] = \
visualization_server_tag or \
os.environ.get("VISUALIZATION_SERVER_TAG") or \
os.environ["KFP_VERSION"]

settings["frontend_tag"] = \
frontend_tag or \
os.environ.get("FRONTEND_TAG") or \
os.environ["KFP_VERSION"]

settings["disable_istio_sidecar"] = \
disable_istio_sidecar if disable_istio_sidecar is not None \
else os.environ.get("DISABLE_ISTIO_SIDECAR") == "true"

return settings


def server_factory(visualization_server_image,
visualization_server_tag, frontend_image, frontend_tag,
disable_istio_sidecar, url="", controller_port=8080):
"""
Returns an HTTPServer populated with Handler with customized settings
"""
class Controller(BaseHTTPRequestHandler):
def sync(self, parent, attachments):
# parent is a namespace
namespace = parent.get("metadata", {}).get("name")

pipeline_enabled = parent.get("metadata", {}).get(
"labels", {}).get("pipelines.kubeflow.org/enabled")

if pipeline_enabled != "true":
return {"status": {}, "attachments": []}

# Compute status based on observed state.
desired_status = {
"kubeflow-pipelines-ready":
len(attachments["Secret.v1"]) == 1 and
len(attachments["ConfigMap.v1"]) == 3 and
len(attachments["Deployment.apps/v1"]) == 2 and
len(attachments["Service.v1"]) == 2 and
len(attachments["DestinationRule.networking.istio.io/v1alpha3"]) == 1 and
len(attachments["AuthorizationPolicy.security.istio.io/v1beta1"]) == 1 and
"True" or "False"
}

# Generate the desired attachment object(s).
desired_resources = [
{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": "kfp-launcher",
"namespace": namespace,
},
"data": {
"defaultPipelineRoot": f"minio://{S3_BUCKET_NAME}/private-artifacts/{namespace}/v2/artifacts",
},
},
{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": "metadata-grpc-configmap",
"namespace": namespace,
},
"data": {
"METADATA_GRPC_SERVICE_HOST":
"metadata-grpc-service.kubeflow",
"METADATA_GRPC_SERVICE_PORT": "8080",
},
},
{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": "artifact-repositories",
"namespace": namespace,
"annotations": {
"workflows.argoproj.io/default-artifact-repository": "default-namespaced"
}
},
"data": {
"default-namespaced": json.dumps({
"archiveLogs": True,
"s3": {
"endpoint": "minio-service.kubeflow:9000",
"bucket": S3_BUCKET_NAME,
"keyFormat": f"private-artifacts/{namespace}/{{{{workflow.name}}}}/{{{{workflow.creationTimestamp.Y}}}}/{{{{workflow.creationTimestamp.m}}}}/{{{{workflow.creationTimestamp.d}}}}/{{{{pod.name}}}}",
"insecure": True,
"accessKeySecret": {
"name": "mlpipeline-minio-artifact",
"key": "accesskey",
},
"secretKeySecret": {
"name": "mlpipeline-minio-artifact",
"key": "secretkey",
}
}
})
}
},
]
print('Received request:\n', json.dumps(parent, sort_keys=True))
print('Desired resources except secrets:\n', json.dumps(desired_resources, sort_keys=True))

# Moved after the print argument because this is sensitive data.

# Check if secret is already there when the controller made the request. If yes, then
# use it. Else create a new credentials on seaweedfs for the namespace.
if s3_secret := attachments["Secret.v1"].get(f"{namespace}/mlpipeline-minio-artifact"):
desired_resources.append(s3_secret)
print('Using existing secret')
else:
print('Creating new access key.')
s3_access_key = iam.create_access_key(UserName=namespace)
# Use the AWS IAM API of seaweedfs to manage access policies to bucket.
# This policy ensures that a user can only access artifacts from his own profile.
iam.put_user_policy(
UserName=namespace,
PolicyName=f"KubeflowProject{namespace}",
PolicyDocument=json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": [
"s3:Put*",
"s3:Get*",
"s3:List*"
],
"Resource": [
f"arn:aws:s3:::{S3_BUCKET_NAME}/artifacts/*",
f"arn:aws:s3:::{S3_BUCKET_NAME}/private-artifacts/{namespace}/*",
f"arn:aws:s3:::{S3_BUCKET_NAME}/private/{namespace}/*",
f"arn:aws:s3:::{S3_BUCKET_NAME}/shared/*",
]
}]
})
)
desired_resources.insert(
0,
{
"apiVersion": "v1",
"kind": "Secret",
"metadata": {
"name": "mlpipeline-minio-artifact",
"namespace": namespace,
},
"data": {
"accesskey": base64.b64encode(s3_access_key["AccessKey"]["AccessKeyId"].encode('utf-8')).decode("utf-8"),
"secretkey": base64.b64encode(s3_access_key["AccessKey"]["SecretAccessKey"].encode('utf-8')).decode("utf-8"),
},
})

return {"status": desired_status, "attachments": desired_resources}

def do_POST(self):
# Serve the sync() function as a JSON webhook.
observed = json.loads(
self.rfile.read(int(self.headers.get("content-length"))))
desired = self.sync(observed["object"], observed["attachments"])

self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(bytes(json.dumps(desired), 'utf-8'))

return HTTPServer((url, int(controller_port)), Controller)


if __name__ == "__main__":
main()
11 changes: 11 additions & 0 deletions experimental/seaweedfs/base/seaweedfs/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: kubeflow

resources:
- seaweedfs-deployment.yaml
- seaweedfs-pvc.yaml
- seaweedfs-networkpolicy.yaml
- seaweedfs-create-admin-user-job.yaml
- seaweedfs-service.yaml
- seaweedfs-service-account.yaml
Loading
Loading