@@ -28,8 +28,13 @@ use tokio::process::{Child, Command};
2828
2929use crate :: util;
3030
31+ const DATANODE_ADDR : & str = "127.0.0.1:4100" ;
32+ const METASRV_ADDR : & str = "127.0.0.1:3002" ;
3133const SERVER_ADDR : & str = "127.0.0.1:4001" ;
3234const SERVER_LOG_FILE : & str = "/tmp/greptime-sqlness.log" ;
35+ const METASRV_LOG_FILE : & str = "/tmp/greptime-sqlness-metasrv.log" ;
36+ const FRONTEND_LOG_FILE : & str = "/tmp/greptime-sqlness-frontend.log" ;
37+ const DATANODE_LOG_FILE : & str = "/tmp/greptime-sqlness-datanode.log" ;
3338
3439pub struct Env { }
3540
@@ -48,8 +53,14 @@ impl EnvController for Env {
4853 /// Stop one [`Database`].
4954 #[ allow( clippy:: print_stdout) ]
5055 async fn stop ( & self , _mode : & str , mut database : Self :: DB ) {
51- database. server_process . kill ( ) . await . unwrap ( ) ;
52- let _ = database. server_process . wait ( ) . await ;
56+ let mut server = database. server_process ;
57+ Env :: stop_server ( & mut server) . await ;
58+ if let Some ( mut metasrv) = database. metasrv_process . take ( ) {
59+ Env :: stop_server ( & mut metasrv) . await ;
60+ }
61+ if let Some ( mut datanode) = database. datanode_process . take ( ) {
62+ Env :: stop_server ( & mut datanode) . await ;
63+ }
5364 println ! ( "Stopped DB." ) ;
5465 }
5566}
@@ -98,18 +109,99 @@ impl Env {
98109
99110 GreptimeDB {
100111 server_process,
112+ metasrv_process : None ,
113+ datanode_process : None ,
101114 client,
102115 db,
103116 }
104117 }
105118
106119 pub async fn start_distributed ( ) -> GreptimeDB {
107- todo ! ( )
120+ let cargo_build_result = Command :: new ( "cargo" )
121+ . current_dir ( util:: get_workspace_root ( ) )
122+ . args ( [ "build" , "--bin" , "greptime" ] )
123+ . stdout ( Stdio :: null ( ) )
124+ . output ( )
125+ . await
126+ . expect ( "Failed to start GreptimeDB" )
127+ . status ;
128+ if !cargo_build_result. success ( ) {
129+ panic ! ( "Failed to build GreptimeDB (`cargo build` fails)" ) ;
130+ }
131+
132+ // start a distributed GreptimeDB
133+ let mut meta_server = Env :: start_server ( "metasrv" ) ;
134+ // wait for election
135+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) . await ;
136+ let mut frontend = Env :: start_server ( "frontend" ) ;
137+ let mut datanode = Env :: start_server ( "datanode" ) ;
138+
139+ for addr in [ DATANODE_ADDR , METASRV_ADDR , SERVER_ADDR ] . iter ( ) {
140+ let is_up = util:: check_port ( addr. parse ( ) . unwrap ( ) , Duration :: from_secs ( 10 ) ) . await ;
141+ if !is_up {
142+ Env :: stop_server ( & mut meta_server) . await ;
143+ Env :: stop_server ( & mut frontend) . await ;
144+ Env :: stop_server ( & mut datanode) . await ;
145+ panic ! ( "Server {addr} doesn't up in 10 seconds, quit." )
146+ }
147+ }
148+
149+ let client = Client :: with_urls ( vec ! [ SERVER_ADDR ] ) ;
150+ let db = DB :: new ( "greptime" , client. clone ( ) ) ;
151+
152+ GreptimeDB {
153+ server_process : frontend,
154+ metasrv_process : Some ( meta_server) ,
155+ datanode_process : Some ( datanode) ,
156+ client,
157+ db,
158+ }
159+ }
160+
161+ async fn stop_server ( process : & mut Child ) {
162+ process. kill ( ) . await . unwrap ( ) ;
163+ let _ = process. wait ( ) . await ;
164+ }
165+
166+ fn start_server ( subcommand : & str ) -> Child {
167+ let log_file_name = match subcommand {
168+ "datanode" => DATANODE_LOG_FILE ,
169+ "frontend" => FRONTEND_LOG_FILE ,
170+ "metasrv" => METASRV_LOG_FILE ,
171+ _ => panic ! ( "Unexpected subcommand: {subcommand}" ) ,
172+ } ;
173+ let log_file = OpenOptions :: new ( )
174+ . create ( true )
175+ . write ( true )
176+ . truncate ( true )
177+ . open ( log_file_name)
178+ . unwrap_or_else ( |_| panic ! ( "Cannot open log file at {log_file_name}" ) ) ;
179+
180+ let mut args = vec ! [ subcommand, "start" ] ;
181+ if subcommand == "frontend" {
182+ args. push ( "--metasrv-addr=0.0.0.0:3002" ) ;
183+ } else if subcommand == "datanode" {
184+ args. push ( "--rpc-addr=0.0.0.0:4100" ) ;
185+ args. push ( "--metasrv-addr=0.0.0.0:3002" ) ;
186+ args. push ( "--node-id=1" ) ;
187+ args. push ( "--data-dir=/tmp/greptimedb_node_1/data" ) ;
188+ args. push ( "--wal-dir=/tmp/greptimedb_node_1/wal" ) ;
189+ }
190+
191+ let process = Command :: new ( "./greptime" )
192+ . current_dir ( util:: get_binary_dir ( "debug" ) )
193+ . args ( args)
194+ . stdout ( log_file)
195+ . spawn ( )
196+ . expect ( "Failed to start the DB" ) ;
197+ process
108198 }
109199}
110200
111201pub struct GreptimeDB {
112202 server_process : Child ,
203+ metasrv_process : Option < Child > ,
204+ datanode_process : Option < Child > ,
113205 #[ allow( dead_code) ]
114206 client : Client ,
115207 db : DB ,
0 commit comments