Skip to content
Merged
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ third_party/

# clion workspace.
cmake-build-*
CMakeFiles
cmake_install.cmake
35 changes: 18 additions & 17 deletions go/cmd/pserver/pserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/namsral/flag"
"github.com/topicai/candy"

"github.com/PaddlePaddle/Paddle/go/pserver"
log "github.com/sirupsen/logrus"
Expand All @@ -25,42 +26,42 @@ func main() {
flag.Parse()

level, err := log.ParseLevel(*logLevel)
if err != nil {
panic(err)
}
candy.Must(err)

log.SetLevel(level)

var idx int
var cp *pserver.Checkpoint
if *index >= 0 {
idx = *index
} else {
timeout := time.Second * time.Duration((*etcdTimeout))
e := pserver.NewEtcdClient(*etcdEndpoint, *numPservers, timeout)
idx, err = e.Register()
candy.Must(err)
cp, err = pserver.NewCheckpoint(idx, checkpointPath, e)
if err != nil {
panic(err)
log.Infof("Fetch checkpoint failed, %s\n", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe log.Errorf?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
}

s, err := pserver.NewService(idx)
if err != nil {
panic(err)
var s *pserver.Service
var err error
if cp != nil {
s, err = pserver.NewServiceFromCheckpoint(idx, cp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unify these two interface may be better, we can pass an empty checkpoint in default. then we will have only one create entry and leave out the duplicate code.

s := &Service{
	idx: idx,
}
s.optMap = make(map[string]*optimizer)
s.initialized = make(chan struct{})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thx!

} else {
s, err = pserver.NewService(idx)
}
candy.Must(err)

err = rpc.Register(s)
if err != nil {
panic(err)
}
candy.Must(err)

rpc.HandleHTTP()
l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
if err != nil {
panic(err)
}
candy.Must(err)

log.Infof("start pserver at port %d", *port)
err = http.Serve(l, nil)

if err != nil {
panic(err)
}
candy.Must(err)
}
21 changes: 20 additions & 1 deletion go/pserver/etcd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (
const (
// PsDesired is etcd path for store desired pserver count
PsDesired = "/ps_desired"
// PsAddr is the base dir for pserver to store their addr
// PsPath is the base dir for pserver to store their addr
PsPath = "/ps/"
// PsCheckpoint is the etcd path for store checkpoints information
PsCheckpoint = "/checkpoints/"
)

// EtcdClient is the etcd client that the pserver uses for fault
Expand Down Expand Up @@ -186,3 +188,20 @@ func (e *EtcdClient) registerPserverEtcd(ctx context.Context) (int, error) {

return idx, nil
}

// GetCheckpoints gets the checkpoint information by the specified pserver id
func (e *EtcdClient) GetCheckpointInfo(string idx) (string, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename this function to GetKey, caller can compute and pass in the key.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

key := PsCheckpoint + idx
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is timeout defined, does this code compiles? :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry :), fixed it.

resp, err := c.Get(ctx, key)
cancel()
if err != nil {
return "", err
}
kvs := resp.Kvs
if len(kvs) == 0 {
return "", nil
}
v := kvs[0].Value
return string(v), nil
}
51 changes: 50 additions & 1 deletion go/pserver/service.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package pserver

import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
)

Expand Down Expand Up @@ -49,13 +53,58 @@ type Service struct {
optMap map[string]*optimizer
}

// Checkpoint saves the checkpoint for pserver
type Checkpoint struct {
UUID string `json:"uuid"`
MD5 string `json:"md5"`
Timestamp string `json:"timestamp"`
State []byte
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious why State and ParameterWithConfig does not have json tag?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checkpoint will be initialized from the JSON data in etcd, which only the the key uuid, md5 and timestamp, but according with your other comment, separate Checkpoint and CheckpointInfo is better way, thanks for @helinwang .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

ParameterWithConfig
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with your comment on the other PR, Checkpoint need to contain checkpoints for all parameters on the pserver.
I was confused before. I think Checkpoint should be renamed to CheckpointInfo, it's actually a reference to the checkpoint file on distributed filesystem. The type of Timestamp should be changed to int64.
Perhaps something like:

type CheckpointInfo struct {
  UUID string
  Path string
  MD5 string
  Timestamp int64
}

Maybe we can have another type like:

type ParameterCheckpoint struct {
  ParameterWithConfig
  State []byte
}

type Checkpoint []ParameterCheckpoint

How do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea! I think separate Checkpoint and CheckpointInfo is a good idea.I will do that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename ParameterCheckpoint to CheckpointData should be more clear. Then these two struct should be:

type CheckpointMeta struct {}
type CheckpointData struct {}
type Checkpoint struct {
  meta CheckpointMeta
  data CheckpointData
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, CheckpointMeta looks like a appropriate name.
Maybe we don't need the struct Checkpoint including meta and data? We can

  • Json.Umarshal from etcd to initialize CheckpointMeta.
  • gob.Decode(...) from the checkpoint file to initialize CheckpointData

I looks like more simple?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@typhoonzero How do you think about this way?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed with @Yancey1989 . Do we need to separate meta and data in checkpoint ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree!

Do we need to separate meta and data in checkpoint ?

Maybe not, It's simple enouph to load meta and data into the struct separately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there in only type here without variable name.

Copy link
Contributor Author

@Yancey0623 Yancey0623 Jul 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According with: https://golang.org/ref/spec#Struct_types, a field without a explicit field name is an anonymous field, so I can initialize the Param filed of ParameterWithConfig as ParameterCheckpoint.Param = xxx

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get it. a brilliant feature of Go.

}

// NewCheckpoint creates a new checkpoint.
func NewCheckpoint(idx int, cpPath string, e *EtcdClient) (*Checkpoint, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to NewCheckpointFromFile

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the offline discuss, I will add Load and Save method for the struct Checkponit:)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

v, err := e.GetCheckpointInfo(idx)
if err != nil {
return nil, err
}
var cp Checkpoint
if err = json.Unmarshal(v, &cp); err != nil {
return nil, err
}
fn := filepath.Join(cpPath, cp.UUID)
if _, err = os.Stat(fn); os.IsNotExist(err) {
return nil, err
}

f, err := os.Open(fn)
if err != nil {
return nil, err
}
defer f.Close()

buf, err := ioutil.ReadAll(f)
if err != nil {
return nil, err
}
// TODO: create checkpoint from file
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finish this TODO and then review.


return nil, nil
}

// NewServiceFromCheckpoint creates a new service with the specified checkpoint
func NewServiceFromCheckpoint(idx int, cp *Checkpoint) (*Service, error) {
// TODO: create service from checkpoint
return nil, nil
}

// NewService creates a new service, will bypass etcd registration if no
// endpoints specified.
func NewService(idx int) (*Service, error) {
s := &Service{
idx: idx,
}
s.optMap = make(map[string]*optimizer)
s.optMap = make(map[string]*optimizer)
s.initialized = make(chan struct{})
return s, nil
}
Expand Down