
Conversation


@yy462 yy462 commented Apr 3, 2025

Add communication monitoring during initialization in NC, GC, and LP.

@yy462 yy462 requested review from Ryan-YuanLi and yh-yao April 3, 2025 02:07
Comment on lines 1088 to 1091
if args.use_cluster:
    monitor = monitor or Monitor()
    monitor.init_time_end()

Contributor

The monitor should only be initialized in one place; if monitor is None, throw an error and print the error info.

Contributor Author

Yeah, that's correct. The reason I did this is that I wanted to ensure monitor is always defined when use_cluster is true, in case it isn't passed correctly.
I'll throw an error and print the error info instead.
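A minimal sketch of that guard, following the snippet above (the exact placement inside run_NC/run_GC/run_LP may differ):

```python
if args.use_cluster:
    # Fail fast instead of silently creating a second Monitor here.
    if monitor is None:
        raise ValueError(
            "use_cluster is True but no Monitor instance was passed in; "
            "the monitor must be initialized in exactly one place."
        )
    monitor.init_time_end()
```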

# Append paths relative to the current script's directory
# Use monitor passed from run_fedgraph
if args.use_cluster:
    monitor = args.monitor
Contributor

monitor is an instance, not an args attribute.

Contributor Author

@yy462 yy462 Apr 4, 2025

The reason I tried this is that when we run the project locally, passing the monitor causes some errors; I'll change it back to the original version if we will only run the project on a cluster in the future. I also noticed that we repeatedly initialize Monitor in parts of the original code: helper functions such as run_GC_selftrain, run_GC_Fed_algorithm, and run_GCFL_algorithm still create a new Monitor instance each time. Should we change that as well?
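One possible shape for that cleanup (a sketch only; the real helper signatures differ and the monitor keyword is hypothetical): create the Monitor once per task entry point and reuse the same instance in every helper instead of calling Monitor() again inside them.

```python
# Sketch: a single Monitor per task entry point, threaded through the helpers.
monitor = Monitor()
monitor.init_time_start()

# Hypothetical keyword argument; today these helpers construct their own Monitor.
run_GC_selftrain(..., monitor=monitor)
run_GC_Fed_algorithm(..., monitor=monitor)
run_GCFL_algorithm(..., monitor=monitor)
```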

run_LP(args)

# End total communication timing
monitor.total_comm_time_end()
Contributor

no idea why we need total_comm_time

Contributor Author

OK, I've deleted all the code related to total_comm_time; I think I misunderstood what you required. For now, I just measure the initialization time for each of run_NC, run_GC, and run_LP.
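For reference, that per-task timing only needs a pair of timestamps. A self-contained toy sketch of what such methods can look like (not the actual fedgraph Monitor implementation):

```python
import time
from typing import Optional


class InitTimer:
    """Toy stand-in for the init-timing part of Monitor (illustration only)."""

    def __init__(self) -> None:
        self._init_start: Optional[float] = None
        self.init_seconds: Optional[float] = None

    def init_time_start(self) -> None:
        # Record the wall-clock start of initialization.
        self._init_start = time.perf_counter()

    def init_time_end(self) -> None:
        # Store the elapsed initialization time in seconds.
        assert self._init_start is not None, "init_time_start() was never called"
        self.init_seconds = time.perf_counter() - self._init_start


timer = InitTimer()
timer.init_time_start()
# ... build server, trainers, load data ...
timer.init_time_end()
print(f"Initialization took {timer.init_seconds:.2f}s")
```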

monitor.init_time_start()
monitor.total_comm_time_start()

args.monitor = monitor
Contributor

Don't pass the monitor through args this way.

Contributor Author

Just deleted it and changed it back.


ray.init(address="auto")


Contributor

delete this file

Contributor Author

Already deleted.

  if server.use_cluster:
-     monitor = Monitor()
+     # Use monitor from server
+     monitor = server.monitor if hasattr(server, "monitor") else Monitor()
Contributor

Only initialize it in one place.

Contributor Author

Got it!

Comment on lines 62 to 68
  if args.fedgraph_task == "NC":
-     run_NC(args, data)
+     run_NC(args, data, monitor)
  elif args.fedgraph_task == "GC":
-     run_GC(args, data)
+     run_GC(args, data, monitor)
  elif args.fedgraph_task == "LP":
-     run_LP(args)
+     run_LP(args, monitor)

Contributor

Remove this; always init inside the function.

Contributor Author

@yy462 yy462 Apr 17, 2025

Yeah, that makes sense, thanks for the advice. Already fixed it.
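The resulting pattern, roughly (simplified signature; the real run_NC takes more parameters): the dispatcher just calls run_NC(args, data), and the Monitor is created unconditionally inside the task function, so later code never has to check for None.

```python
# Sketch of the agreed pattern, not the exact fedgraph code.
def run_NC(args, data):
    monitor = Monitor()          # single point of initialization
    monitor.init_time_start()
    # ... set up server and trainers ...
    monitor.init_time_end()
```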

Comment on lines 305 to 309
  [trainer.relabel_adj.remote() for trainer in server.trainers]
- if args.use_cluster:
-     monitor.pretrain_time_end(30)
-     monitor.train_time_start()
+ monitor.pretrain_time_end(30)
+ monitor.train_time_start()
  #######################################################################
Contributor

We should only sleep 30s when use_cluster is true.

Comment on lines 750 to 753

if server.use_cluster:
    if monitor is not None:
        monitor.train_time_end(30)
fs = frame.style.apply(highlight_max).data
Contributor

Init inside the function so it won't be None; there are too many if checks.

Comment on lines 1032 to 1037
  current_dir = os.path.dirname(os.path.abspath(__file__))
  ray.init()
- if args.use_cluster:
-     # Initialize monitor and start tracking initialization time
-     monitor = Monitor()
+ if args.use_cluster and monitor is not None:
      monitor.init_time_start()

  # Append paths relative to the current script's directory
Contributor

Same issue: this still uses args.use_cluster.

Comment on lines 302 to 304

- monitor.pretrain_time_end(30)
+ monitor.pretrain_time_end(30 if args.use_cluster else 0)
  monitor.train_time_start()
Contributor

Too hacky; could you move this if/else inside the monitor?
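One way to push that branch into the monitor itself (a sketch, assuming Monitor keeps a use_cluster flag; the real class has more state and methods): let pretrain_time_end decide whether to sleep, so call sites stay unconditional.

```python
import time


class Monitor:
    # Sketch only: the real fedgraph Monitor looks different.
    def __init__(self, use_cluster: bool = False) -> None:
        self.use_cluster = use_cluster
        self._pretrain_start = time.perf_counter()

    def pretrain_time_end(self, sleep_seconds: int = 30) -> float:
        # Only wait for cluster metrics to flush when actually running on a cluster.
        if self.use_cluster:
            time.sleep(sleep_seconds)
        return time.perf_counter() - self._pretrain_start
```

Call sites then keep a plain monitor.pretrain_time_end(30) with no ternary.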

Comment on lines 147 to 150
args = kwds.get("args", {})
self.use_encryption = (
    getattr(args, "use_encryption", False)
    if hasattr(args, "use_encryption")
Contributor

can lead to confusion, as the original args are lost after reassignment

Contributor Author

Oh, thanks for the reminder. I've renamed the local variable to args_obj to avoid shadowing *args, which keeps this part clear and safe.
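The rename looks roughly like this (a simplified sketch, not the exact lines; getattr with a default already covers the missing-attribute case):

```python
def __init__(self, *args, **kwds):
    # Distinct local name so the *args tuple above is not shadowed.
    args_obj = kwds.get("args", None)
    self.use_encryption = getattr(args_obj, "use_encryption", False)
```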

Comment on lines 2 to 22
kind: ClusterConfig

metadata:
  name: mlarge-1739510276
  region: us-east-1

nodeGroups:
  - name: head-nodes
    instanceType: m5.24xlarge
    desiredCapacity: 1
    minSize: 0
    maxSize: 1
    volumeSize: 256
    labels:
      ray-node-type: head

  - name: worker-nodes
    instanceType: m5.16xlarge
    desiredCapacity: 10
    minSize: 10
    maxSize: 10
Contributor

Why is there a .bak file?

Contributor Author

The .bak file is a backup automatically created by Ray when updating the EKS cluster config to prevent accidental loss. It’s safe to ignore or delete if version control is used.

setup_cluster.sh (outdated)
Comment on lines 144 to 145
"pip": ["fsspec==2023.6.0", "huggingface_hub", "tenseal"]
}' \
Contributor

"fsspec==2023.6.0" may be too strict

Contributor Author

Yeah, I think we can relax the version constraint to just "fsspec" to avoid potential compatibility issues.
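A Python-side equivalent of that runtime environment with the pin relaxed (a sketch; setup_cluster.sh passes the same list as JSON):

```python
import ray

# Unpinned fsspec; Ray installs whatever compatible version pip resolves.
ray.init(
    address="auto",
    runtime_env={"pip": ["fsspec", "huggingface_hub", "tenseal"]},
)
```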

Contributor

@Ryan-YuanLi Ryan-YuanLi left a comment

approve

Comment on lines +1063 to 1067
print(
    f"[Debug] Trainer running on node IP: {ray.util.get_node_ip_address()}"
)

clients = [
Contributor

Just wondering: are there cases where trainers could end up on the same IP because Ray schedules them onto the same pod?

Contributor Author

Yes, it's possible: if the trainers' resource demands are small and Ray schedules multiple trainers onto the same node or pod, they will share the same IP.
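If one-trainer-per-node ever needs to be enforced, a placement group with the STRICT_SPREAD strategy is one way to do it (a sketch using Ray's public API; Trainer here is a stand-in for the actual trainer actor, and ray.init(address="auto") is assumed to have run already):

```python
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy


@ray.remote
class Trainer:  # stand-in for the real trainer actor
    def node_ip(self) -> str:
        return ray.util.get_node_ip_address()


num_trainers = 4
# STRICT_SPREAD places each bundle on a different node, so trainers cannot share an IP.
pg = placement_group([{"CPU": 1}] * num_trainers, strategy="STRICT_SPREAD")
ray.get(pg.ready())  # blocks until every bundle is reserved

trainers = [
    Trainer.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg, placement_group_bundle_index=i
        )
    ).remote()
    for i in range(num_trainers)
]
print(ray.get([t.node_ip.remote() for t in trainers]))
```

Without such a constraint, Ray's default scheduling is free to pack multiple small trainers onto one node.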

@yy462 yy462 merged commit 96ed93d into main May 22, 2025
2 checks passed
@yh-yao yh-yao deleted the monitor-comm-cost branch September 18, 2025 20:17