Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/beast/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func initDirectories() error {
log.Infoln("Creating beast directories...")

directories := []string{
filepath.Join(core.BEAST_GLOBAL_DIR, core.BEAST_CACHE_DIR),
filepath.Join(core.BEAST_GLOBAL_DIR, core.BEAST_REMOTES_DIR),
filepath.Join(core.BEAST_GLOBAL_DIR, core.BEAST_UPLOADS_DIR),
filepath.Join(core.BEAST_GLOBAL_DIR, core.BEAST_SCRIPTS_DIR),
Expand Down
124 changes: 122 additions & 2 deletions cmd/beast/run.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,137 @@
package main

import (
"encoding/json"
"fmt"
"github.com/sdslabs/beastv4/core"
"github.com/sdslabs/beastv4/core/database"
"github.com/sdslabs/beastv4/core/manager"
"github.com/sdslabs/beastv4/pkg/remoteManager"
"os"
"os/signal"
"path/filepath"
"syscall"

"github.com/sdslabs/beastv4/api"
"github.com/sdslabs/beastv4/core/utils"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var (
BEAST_GRAPH_CACHE = filepath.Join(core.BEAST_GLOBAL_DIR, core.BEAST_CACHE_DIR, core.BEAST_GRAPH_CACHE)
BEAST_LEADERBOARD_CACHE = filepath.Join(core.BEAST_GLOBAL_DIR, core.BEAST_CACHE_DIR, core.BEAST_LEADERBOARD_CACHE)
)

func stopApiScheduler() {
log.Infoln("Stopping the API scheduler...")
api.BeastScheduler.Stop()
log.Infoln("API scheduler stoped")
}

func stopWorkerQueue() {
log.Infoln("Stopping the worker queue...")
manager.Q.Stop()
log.Infoln("Worker queue stopped")
}

func stopRemoteManagers() {
log.Infoln("Stopping the remote manager queue...")
remoteManager.Stop()
log.Infoln("Remote Manager queue stopped")
}

func cleanUpRunningContainers() {
log.Infoln("Cleaning up running challenges...")

challenges, err := database.QueryAllChallenges()
if err != nil {
return
}

for _, challenge := range challenges {
if challenge.Status == core.DEPLOY_STATUS["deployed"] {
err = manager.UndeployChallenge(challenge.Name)
if err != nil {
log.Errorln(fmt.Sprintf("Failed to undeploy challenge [Id: %v] %s", challenge.ID, challenge.Name))
log.Errorln(err.Error())
} else {
log.Infoln(fmt.Sprintf("Successfully undeploy challenge [Id: %v] %s", challenge.ID, challenge.Name))
}
}
}
}

func cleanUpDatabaseConnections() {
log.Infoln("Backing up database...")

err := database.BackupDatabase()
if err != nil {
log.Errorln("Error while backing up database:", err)
} else {
log.Infoln("Database backup completed successfully")
}

log.Infoln("Terminating database connection...")

err = database.TerminateDatabaseConnections()
if err != nil {
log.Errorln("Unable to terminate database connections:", err)
} else {
log.Infoln("Database connections terminated successfully")
}
}

func writeJson(data any, location string) error {
bytes, err := json.Marshal(data)
if err != nil {
return err
}

return os.WriteFile(location, bytes, 0644)
}

func saveLeaderboardCache() {
var topUsers []uint

leaderboardFresh, err := database.QueryTopUsersByFrozenScore(core.LEADERBOARD_SIZE)
if err == nil {
for i := 0; i < 10 && i < len(leaderboardFresh); i++ {
user := leaderboardFresh[i]
topUsers = append(topUsers, user.ID)
}
}

if err = writeJson(leaderboardFresh, BEAST_LEADERBOARD_CACHE); err != nil {
log.Errorln(fmt.Sprintf("Failed to write to leaderboard cache: %s", err.Error()))
}

graphFresh := database.QueryTimeSeriesForTopUsers(topUsers)
if err = writeJson(graphFresh, BEAST_GRAPH_CACHE); err != nil {
log.Errorln(fmt.Sprintf("Failed to write to leaderboard cache: %s", err.Error()))
}
}

func cleanup() {
log.Info("Starting graceful shutdown cleanup...")

stopApiScheduler()

stopWorkerQueue()
stopRemoteManagers()

saveLeaderboardCache()

cleanUpRunningContainers()
cleanUpDatabaseConnections()

// - Clean up temporary files: found no files to be cleared as of now
// - Close network connections: all ssh connections are already closed and no new network connections as of now

// - Stop background goroutines

log.Info("Graceful shutdown cleanup completed")
}

var runCmd = &cobra.Command{
Use: "run",
Short: "Run Beast API server",
Expand All @@ -36,7 +156,7 @@ var runCmd = &cobra.Command{
<-sigChan

log.Infoln("\nShutdown signal received.")
utils.Cleanup()
cleanup()
log.Infoln("Server stopped gracefully.")
},
}
3 changes: 3 additions & 0 deletions core/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const ( //names
LOCALHOST string = "localhost"
BEAST_REMOTE_GLOBAL_DIR string = "~/.beast" // This should always be used for remote only.
DOCKER_PID string = "/var/run/docker.pid"
BEAST_GRAPH_CACHE string = "graph_cache.json"
BEAST_LEADERBOARD_CACHE string = "leaderboard.json"
)

const ( //paths
Expand All @@ -53,6 +55,7 @@ const ( //paths
BEAST_EMAIL_TEMPLATE_DIR string = "mailTemplates"
BEAST_SECRETS_DIR string = "secrets"
BEAST_EXAMPLE_DIR string = "_examples"
BEAST_CACHE_DIR string = "cache"
)

const ( //chall types
Expand Down
36 changes: 0 additions & 36 deletions core/utils/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package utils

import (
"fmt"

container_types "github.com/docker/docker/api/types"
"github.com/sdslabs/beastv4/core"
"github.com/sdslabs/beastv4/core/config"
Expand All @@ -14,41 +13,6 @@ import (
log "github.com/sirupsen/logrus"
)

// This function ensures graceful shutdown of all running processes and connections from the server
func Cleanup() {
log.Info("Starting graceful shutdown cleanup...")

// TODO: Add your specific cleanup tasks here
// For example:
// - Stop the scheduler (api.BeastScheduler.Stop())
// - Stop worker queue (manager.Q - note: no Stop() method exists, may need to implement)
// - Stop remote manager queue
// - Close database connections
// - Stop running containers
// - Clean up temporary files
// - Close network connections
// - Stop background goroutines
// - Store leaderboard cache and graph cache if leaderboard is frozen and competition not ended. and make sure to fill it abck on restart.

// Backup the database to ensure no data loss
err := database.BackupDatabase()
if err != nil {
log.Errorf("Error while backing up database: %s", err)
} else {
log.Info("Database backup completed successfully")
}

// Terminate the Database Connection for graceful shutdown
err = database.TerminateDatabaseConnections()
if err != nil {
log.Errorf("Unable to terminate database connections: %s", err)
} else {
log.Info("Database connections terminated successfully")
}

log.Info("Graceful shutdown cleanup completed")
}

func CleanupContainerByFilter(filter, filterVal string) error {
if filter != "id" && filter != "name" {
return fmt.Errorf("Not a valid filter %s", filter)
Expand Down
9 changes: 9 additions & 0 deletions pkg/remoteManager/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,12 @@ func Init() {
}
}
}

func Stop() {
for {
_, err := ServerQueue.Pop()
if err == nil {
break
}
}
}
1 change: 1 addition & 0 deletions pkg/remoteManager/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func RunCommandOnServer(server config.AvailableServer, cmd string) (string, erro
if err != nil {
return "", fmt.Errorf("failed to create session: %s", err)
}
defer client.Close()
defer session.Close()

output, err := session.CombinedOutput(cmd)
Expand Down
16 changes: 14 additions & 2 deletions pkg/workerpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,23 @@ func (q *Queue) Pop(ID string) {
q.Mux.Unlock()
}

func (q *Queue) Stop() {
ids := make([]string, len(q.InQueue))
i := 0
for id, _ := range q.InQueue {
ids[i] = id
i++
}

for _, id := range ids {
q.Pop(id)
}
}

func (q *Queue) startConcurrentWorker(i int, worker Worker) {
var newTask *Task
for {
w := <-q.TaskQueue
newTask = worker.PerformTask(w)
newTask := worker.PerformTask(w)

q.Pop(w.ID)

Expand Down