Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions doc/design/dist/README.md → doc/design/cluster_train/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ The master process will:
- Keep track of training progress on the dataset with [task queue](#task-queue). A training job will iterate on the dataset for a full pass until it goes into next pass.


#### Task
#### Task

A task is a data shard to be trained. The total number of tasks will be much bigger than the total number of trainers. The number of data instances inside a task will be much bigger than the mini-batch size.

Expand Down Expand Up @@ -78,7 +78,7 @@ The communication pattern between the trainers and the parameter servers depends
- Synchronous Stochastic Gradient Descent (sync-SGD)

Parameter server will wait for all trainer finish n-th mini-batch calculation and send their gradients before broadcasting new parameters to every trainer. Every trainer will wait for the new parameters before starting n+1-th mini-batch.

- Asynchronous Stochastic Gradient Descent (async-SGD)

There will no synchronization between different trainers, and parameter server updates its parameter as soon as it receives new gradient:
Expand Down Expand Up @@ -118,8 +118,6 @@ When the master is started by the Kubernetes, it executes the following steps at
1. Watches the trainer prefix keys `/trainer/` on etcd to find the live trainers.
1. Starts dispatching the tasks to the trainers, and updates task queue using an etcd transaction to ensure lock is held during the update.

The master process will kill itself if its etcd lease expires.

When the master process is dead for any reason, Kubernetes will restart it. It will be online again with all states recovered from etcd in few minutes.

### Trainer Process
Expand All @@ -132,6 +130,8 @@ When the trainer is started by the Kubernetes, it executes the following steps a

If trainer's etcd lease expires, it will try set key `/trainer/<unique ID>` again so that the master process can discover the trainer again.

Whenever a trainer fails, the master process is responsible to schedule the failed task back to "todo queue". then kubernetes will try to start the trainer somewhere else, then the recovered trainer will try to fetch new task to continue the training.
Copy link
Contributor

@helinwang helinwang Apr 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the master process is responsible to schedule the failed task back to "todo queue"

这个之前貌似已经讲过了:

If a task timeout. the master process will move it back to the todo queue

为了避免混淆是不是可以去掉它,变成:

Whenever a trainer fails, then kubernetes will try to start the trainer somewhere else, then the recovered trainer will try to fetch new task to continue the training.

Copy link
Collaborator

@wangkuiyi wangkuiyi Apr 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

一点儿英语建议:

When a trainer fails, Kuberentes would try to restart it. The recovered trainer would fetch tasks from the TODO queue and go on training.

  • Whenever 是个很强的语气。这里不需要。
  • “万一。。。挂了”是个虚拟语气,应用用would而不是will。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Done.


### Parameter Server Process

When the parameter server is started by Kubernetes, it executes the following steps at startup:
Expand All @@ -140,11 +140,11 @@ When the parameter server is started by Kubernetes, it executes the following st
1. Search through etcd keys `/ps/<index>` (`/ps/0`, `/ps/1`, ...) to find the first non-existant key whose index is smaller than the total number of parameter servers. Set the key using a transaction to avoid concurrent writes. The parameter server's index is inferred from the key name.

The desired number of parameter servers is 3:

<img src="src/paddle-ps-0.png"/>

The third parameter server joined:

<img src="src/paddle-ps-1.png"/>

1. The parameter server can load parameters if there are already saved parameters in the save path (inferred from its index).
Expand All @@ -153,6 +153,13 @@ When the parameter server is started by Kubernetes, it executes the following st
If the parameter server's etcd lease expires, the parameter server will kill itself.


## Parameter Server Checkpointing
See [here](./checkpointing.md)

## Store and dispatching trainning data
See [here](./data_dispatch.md)


## Dynamic Scaling

### Trainer Scaling
Expand Down
76 changes: 76 additions & 0 deletions doc/design/cluster_train/checkpointing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Paddle大规模分布式训练设计

## 概览
参考[这里](./README.md)

## 分布式训练架构
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这一节感觉并不属于checkpointing.md,但是有很多有用的内容,能否想一下怎么融合到readme.md里面?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. 求review


常见的深度学习分布式训练的架构如图:

<img src="src/trainer.png" width="500"/>

为了完成一个深度学习的训练任务,集群中会运行多个trainer和parameter server,每个trainer启动时,会先尝试从parameter server集群下载最新的参数,然后以mini-batch为单位读取训练数据集中的一部分数据(Data shard)。trainer会在训练过程中持续与parameter server通讯,上传计算出来的梯度以及下载最新的模型。

每个parameter server保存所有parameter的一个分片(Global model shard),并负责接受所有trainer发送的梯度,完成SGD和优化算法,然后发送更新后的parameter到每个trainer。

这样,通过trainer和parameter server的分布式协作,可以完成神经网络的SGD方法的训练。Paddle可以同时支持同步SGD(synchronize SGD)和异步SGD(asynchronize SGD)。

在使用同步SGD训练神经网络时,Paddle使用同步屏障(barrier),使梯度的提交和参数的更新按照顺序方式执行。在异步SGD中,则并不会等待所有trainer提交梯度才更新参数,这样极大的提高了计算的并行性:parameter server之间不相互依赖,并行的接收梯度和更新参数,parameter server也不会等待trainer全部都提交梯度之后才开始下一步,trainer之间也不会相互依赖,并行的执行模型的训练。可以看出,虽然异步SGD方式会提高参数更新并行度, 但是并不能保证参数同步更新,在任意时间某一台parameter server上保存的参数可能比另一台要更新,与同步SGD相比,梯度会有噪声。

在上面的分布式计算模型中,使用异步SGD比同步SGD可以一定程度的提供训练任务的容灾性。假设在某一时刻,一个trainer进程停止工作,其他的trainer仍然可以完成对部分数据的训练。

参考上面所描述的Paddle实现细节,可以进一步的优化以下方面:
1. 目前模型的参数是保存在parameter server进程的内存中的。在同步SGD或异步SGD训练过程中任意一台parameter server不能异常退出,否则参数丢失,训练不能继续执行。需要考虑每个模型分片(model shard)保存多个副本(replica)防止parameter server单点故障。
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

多个replica是指会有一个replica parameter server一直replicate主parameter server,随时待命么?
这个确实可以保证high availability,感觉实现起来需要额外代码,运行起来也增加集群负担。咱们已经有parameter server的fault recovery,并且每次recover在几分钟就可以恢复,加上训练过程不需要high available,等几分钟没事,所以要不要考虑不用replica了?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个已经是之前的考虑了,已经删除。

1. 不能在一个训练任务中动态的增加或减少Trainer个数或parameter个数(异步SGD是否可以增加Trainer?)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

异步SGD可以增加Trainer,同步SGD做一些协调也是能够增加减少trainer的。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上,这里已经删除。

1. 在同步SGD训练过程中,需要保证参数更新满足事务性操作。即可能在更新参数过程中,存放这个参数的shard所在的服务器故障,就需要rollback并重新更新这个参数shard的其他存活副本。
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

考虑到深度学习随机性很强,这一条感觉有点得不偿失,增加系统复杂度。是不是可以删了:)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上,这里已经删除。

1. 为了支持大量的训练任务和使用模型的应用在一个集群上,需要支持训练任务节点的伸缩。
1. 支持训练任务的前置任务和后置任务,支持训练任务的定时调度和对在线流式数据的处理

## 模型参数检查点(Checkpointing)
模型数据检查点的实现,可以有效的避免parameter server的单点或多点同时故障。模型参数检查点通过定期向磁盘上保存一份存储在parameter server内存中的模型数据的完整镜像,来保证训练过程可以从中间状态重新启动。在一个不可中断并缺少备份的训练任务中,可以通过阶段性的保存每个parameter server的数据快照(snapshot)到 ***分布式存储服务/分布式存储挂载点*** 达到容灾的目的,比如每隔10分钟或1小时保存最新的快照,并删除更早的快照。在出现单点故障时,只需要恢复这台节点,或者将这台节点迁移到另一个节点并启动即可恢复训练任务。
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

比如每隔10分钟或1小时保存最新的快照

这里说的是ps(parameter server)进程做checkpointing吧,这块实际实现时觉得可能需要仔细思考下。 同步SGD,是完全同步的状态,如果是每个ps进程保存一个分片,一个完整的模型通常保存的是同一个batch_id(感觉说时间不是特别准确)的参数。 异步SGD,ps的参数还分片吗?checkpointing机制会和同步SGD完全一样吗?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@qingqing01 感谢评论~

目前sync SGD和async SGD保存checkpoint的方式是一样的。

在之前的版本中是考虑了sync SGD情况下的多个ps之间的checkpoint同步的机制的,即在每n个min_batch时,触发训练暂停,然后等待所有ps完成checkpoint保存,再开始训练。这个方式相对现在的实现比较复杂。由于checkpoint恢复如果不是恢复到一个全局同步的状态,而是有的ps的参数更新,有的ps的参数相对较老,会引入更多随机性,实际对训练的影响也不会很大(参考async SGD的思路)。所以先选择简单的实现方式。我会把checkpoint同步机制放在TODO中。

对于触发方式,还是觉得每n个mini_batch触发checkpoint会合适点? @helinwang

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

请问“分布式存储服务/分布式存储挂载点”的区别是啥?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

避免歧义,只保留了“分布式存储服务”,下面的内容会说明挂载方式。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

“10分钟或1小时”中的1小时太久了,感觉就写10分钟就好。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

时间间隔会作为提交训练的参数,用户可以设置。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我建议(不一定对),暴露给用户的可设置东西尽可能少:fault tolerant镜像备份时间间隔这个设置感觉太细了,用户也不懂。
举个例子:go语言的gc这么复杂的东西,用户只能设定一个变量。java gc有很多变量可以调,但是可能大家并不知道怎么用。


<img src="src/checkpointing.png" width="500"/>

### 快照保存的设计如下:

说明:

* parameter server在集群中启动后,自动挂载分布式存储目录,并把快照保存到这个目录下。
* 所有parameter server和trainer在etcd上注册自己的id节点为TTL节点`/ps/[id]``/trainer/[id]`,并保持心跳。
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉这一条,特别是trainer部分,跟checkpoing无关吧。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已删。Done.

* ***注:trainer在故障恢复后,master会将失败的task重新分配给恢复的trainer执行。这样会引入更大的随机性。***
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我理解的checkpoint是指parameter server的checkpoing,这个是trainer相关的,貌似跟checkpoint无关。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* ***注:parameter server在保存检查点时,利用了Linux内核的“写时复制”技术,在fork的进程中保存检查点,原进程可以继续接收trainer的梯度更新请求,而不影响检查点数据的保存。***
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉得直接加一个read write lock就好了,写磁盘的时候获取read lock,不允许内存写入。

如果使用“写时复制”,写磁盘的时候基本肯定会有并发的内存写入,会引入复制,增加内存开销,感觉并没有引入什么好处。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好处是:

  1. 写入checkpoint的时候不需要lock内存。写磁盘的时候获取read lock,参数更新需要获取write lock,此时是不能同时参数更新的,pserver智能等待checkpoint写完。
  2. 程序开发简单。

但也想到:如果pserver用golang编写,fork进程会导致go routine无法复制的问题。也会比较麻烦。修改成等待的方式。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

貌似这一行没有改,是不是忘记了?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* ***注:每个parameter server的检查点各自独立保存,暂时不考虑多个parameter server同步的保存一个特定时间点的全局检查点,同样会引入随机性。***
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好像“同样会引入随机性”改成“因为这样做也没法保证消除随机性”)更容易理解。



检查点保存程序流程:

1. 如果满足条件""每个pass或每n个mini-batch"时,parameter server会`fork`自己,子进程中执行保存检查点任务,父进程继续工作。如果已经有子进程在进行保存检查点工作,则忽略。
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • 如果使用read write lock就不用fork了。

  • 这里其实需要取得etcd的对应parameter server index的保存snapshot的lock(比如:/snapshot_lock/ps0。(防止同时多个同一个index的parameter server保存同一个文件。)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果使用read write lock就不用fork了。
同上,使用golang不方便fork。但是也得锁住parameter的内存,并停止接收更新参数。参数更新在获取write lock的时候就会等待了。

这里其实需要取得etcd的对应parameter server index的保存snapshot的lock

我理解不需要,parameter server只需要写入到glusterfs的不同目录比如:/pservers/0下。

Copy link
Contributor

@helinwang helinwang Apr 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有可能发生有两个parameter server同时以为自己的index是0,也有可能有两个master同时存在(即使只指定了1个master):因为如果replicaset说要4个pserver pod,在某一个时间可能会有6个pod存在。(这也是为什么master往etcd写入他的状态的时候需要用lock + transaction,防止同时多个master存在。bigtable paper里面的master也虽然设计成只有一个,但是还是需要用lock确保自己是唯一的一个)。

具体原因请看上次在gopher k8s非官方的slack channel(600人的群)得到的这个回答:
screen shot 2017-04-20 at 4 30 33 pm

2. parameter server生成一个UUID,向指定的目录中一个新的文件(文件名为此UUID)写入快照数据。在快照写入完成后,计算这个文件的MD5 sum。然后在etcd的`/checkpoints/[pserver_id]`中写入json内容:`{"uuid": [UUID], "md5", "MD5 sum", "timestamp": xxxx}`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我的感觉(可能是错误的)是能不要用etcd的地方就不要用了。这里可以考虑设计成:

  1. 向目录中的文件checkpoint.tmp写入快照数据。
  2. mv checkpoint.tmp checkpoint (这个操作是原子操作)。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mv在mv单个文件并且是本地文件系统时时原子操作(这里也不确定不同文件系统是否表现相同?),在挂载的分布式文件系统中不一定是原子的操作,参考: https://bugzilla.redhat.com/show_bug.cgi?id=762766 。(不确定最新的版本是否可以支持)

如果将来考虑使用其他的分布式存储系统,也得考虑这些系统的各种操作是否原子。比较通用的情况还是写etcd了。

Copy link
Contributor

@helinwang helinwang Apr 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

对不起,我用原子这个词不是很恰当,我想要表达的其实是mv只要执行了,最终就一定会成功,不会出现mv到一半的状况(不会一半文件是正确的,另一半文件是垃圾,没有这个中间状态)。

关于atomic,我仔细想了一下,mv如果不是atomic的,遇到race的话(读文件和mv在非常接近的时间发生)会出现这种情况:读取的人会读到旧的文件。
首先,mv和读在非常接近的时间发生可能性应该很低:而mv是pserver存checkpoint的时候发生的,而读checkpoint是这个pserver被重启之后发生的,重启需要一定时间的。
其次,貌似发生了这个情况也不会影响数据的正确性(不会一半文件是正确的,另一半文件是垃圾),只是读到了旧的模型。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

经讨论大家同意 @typhoonzero 的方法好。

3. 删除磁盘目录中不是当前uuid的快照文件。
4. 关闭fork出来的进程。

这里需要用户额外注意,在您的实际环境中,训练任务的运行可能会占满trainer和parameter server之间的网络带宽,如果parameter server此时还需要通过网络访问分布式存储以保存快照,可能会造成网络拥塞,而出现阶段性的运行停滞。

### 从快照恢复

在parameter server第一次启动或任意时间parameter server故障后被Kubernetes重新启动,则需要回滚到上一个检查点:

1. 从etcd中读取节点:`/checkpoints/[pserver_id]`获取最新的检查点的文件uuid
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果使用前面提到的checkpoint.tmp方法,这里可以改成:

  1. checkpoint文件恢复模型。
  2. 开始提供服务。

Copy link
Contributor

@helinwang helinwang Apr 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

决定用PR里提出的方法,即使parameter server存到一半挂掉也没事。

1. 从磁盘文件中加载uuid文件名的检查点快照文件,并加载其中的参数
1. 如果上面两步出现错误,则使用启动参数定义的初始化方法初始化参数
1. 开始提供服务

## TODO List
### 推测执行/加速执行(TODO)
在异构集群中,如果存在某些trainer执行速度过慢会影响整体集群的速度(如图中Trainer 1),此时master将负责启动一个新的Trainer(Accelerate Trainer 2),使用同样的训练数据block。哪个trainer先完成block的训练,则把另一个慢速的kill掉。

### 动态扩容/缩容
目前只考虑动态扩容trainer数量,可以减小系统复杂性。

## 术语
* model: 指深度学习训练之后得到的所有参数,使用这个神经网络可以完成对新数据的预测
* parameters: 神经网络中的参数,包括权重w和偏置b。一个神经网络的模型由大量的参数组成
* shard: 分片,通常指将一个整体拆分成多份的其中的一份。
* model shard: 将一个神经网络参数拆分成多份,每个shard分别存储在其中一台parameter server之上
* parameter block: 多个parameter block构成一个model shard
* 单点故障: 任意时刻只可能同时有一台服务器故障。由于集群中同时存在两台机器故障的概率极低((平均故障率*平均故障修复时间)^2)只对特殊在线系统考虑两台以上同时故障的容灾。
67 changes: 67 additions & 0 deletions doc/design/cluster_train/data_dispatch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
## 训练数据的存储和分发

### 流程介绍
生产环境中的训练数据集通常体积很大,并被存储在诸如Hadoop HDFS, Ceph, AWS S3之类的分布式存储之上。这些分布式存储服务通常会把数据切割成多个分片分布式的存储在多个节点之上。这样就可以在云端执行多种数据类计算任务,包括:
Copy link
Contributor

@helinwang helinwang Apr 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

能否吧"Hadoop HDFS, Ceph, AWS S3"改成"Hadoop HDFS,Ceph,AWS S3":)
对不起这个我有点挑刺了:),只是想让这个文档在每个细节都完美。
要是可以的话,括号也麻烦都用中文的全码括号:)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

赞细致~Done.


* 数据预处理任务
* Paddle训练任务
* 在线模型预测服务

<img src="src/data_dispatch.png" width="500"/>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个图中的"mount API"和"Storage API"是指的同一个东西吗?如果是的话能否用同一个名字?

Copy link
Collaborator

@wangkuiyi wangkuiyi Apr 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

除了 @helinwang 的问题之外:

  1. 这个图没有对应的说明。自成一段。
  2. Paddle cluster 是什么?我理解不存在这个一个集群?
  3. online cluster 是什么?我理解也不存在这样一个集群?

我把我画过的一些图上传到一个repo,供大家复用。

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@typhoonzero 这里这个图是不是想表示一个Kubernetes机群上可以跑多种jobs?如果是,可以复用 PaddlePaddle/talks#1 这里的图

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangkuiyi 是的,可以直接用这个图👍。

两个问题:

  1. 为什么是reinforcement learning?
  2. 这里假定了训练数据都是来自于log,是否能加上用户自己上传的数据?比如使用paddle upload <filename>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


### 训练数据的存储

选择GlusterFS作为训练数据的存储服务(后续的实现考虑HDFS)。

在Kubernetes上运行的不同的计算框架,可以通过Volume或PersistentVolume挂载存储空间到每个容器中。
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

请问Volume是指单机挂载node上的文件系统吗?上一行说了用GlusterFS,是不是这里只能用PersistenVolume(我对PersistenVolume的理解是:挂载分布式文件系统只能用它。可能理解有误。)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kubernetes支持两种方式挂载存储:Volume和PV。Volume是直接调用存储的API把存储挂载到Pod上,PV的方式是在kubernetes集群中先建立一个存储的池子,然后使用PVC申请。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

明白了,谢谢!


在存储中的共享位置,需要保存PaddlePaddle book中的所有dataset数据,并且可以被提交的job直接使用。
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • “PaddlePaddle book中的所有dataset数据”是不是用“公开数据集”更合适?比如咱们会把imagenet数据集放进去,但是imagenet并不在paddle book里面。

  • "在存储中的共享位置"貌似不是很通顺?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


### 上传训练文件

使用下面命令,可以把本地的训练数据上传到存储集群中

```
paddle upload train_data.list
Copy link
Contributor

@helinwang helinwang Apr 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

需要能够指定dataset name这个参数,因为之后reader是通过dataset name引用这个数据集的。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

```

其中`.list`文件描述了训练数据的文件和对应的label,对于图像类数据,`.list文件`样例如下,每一行包含了图片文件的路径和其label:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

需要加上用什么分隔符,可以考虑tab,比如:

其中.list文件样例如下,每一行包含了图片文件的路径和其label(用tab分隔开)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


```
/data/image1.jpg 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我们需要支持绝对路径吗?如果要支持的话,咱们上传的list文件需要把绝对路径替换成相对路径。绝对路径传到集群上不认识。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

改成相对路径了~

/data/image1.jpg 5
/data/image1.jpg 2
/data/image1.jpg 5
/data/image1.jpg 1
/data/image1.jpg 8
...
```

对于文本类训练数据样例如下(机器翻译),一行中包含源语言,目标语言的文本(label):

```
L&apos; inflation , en Europe , a dérapé sur l&apos; alimentation Food : Where European inflation slipped up
L&apos; inflation accélérée , mesurée dans la zone euro , est due principalement à l&apos; augmentation rapide des prix de l&apos; alimentation . The skyward zoom in food prices is the dominant force behind the speed up in eurozone inflation .
...
```

### 使用reader

使用v2 API编写训练任务是,可以编写如下简单的reader,返回文件中的各列,然后在调用`trainer.train()`时传入,完成训练数据的读取:

```python
def train():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

面向用户的reader接口与我想象的不一样。我想象中的是:

reader = paddle.dist.reader("dataset-name")
batch_reader = paddle.batch(paddle.dataset.mnist.train(), 128)
paddle.train(batch_reader, ...)

paddle.train内部会获取reader的内容:

def paddle.train(batch_reader):
  r = batch_reader() # create a interator for one pass of data
  for batch in r:
    # train

这里面batch是含有128个data instance的mini-batch。每一个data instance会是一个tuple,tuple元素的顺序与.list文件文件中每一列的顺序是一致的:
比如:

./image1.jpg    1
./image2.jpg    2

每一个data instance会是(raw_image_file_binary_data, label)。其中raw_image_file_binary_data是对应图像文件的没有解码的原始二进制数据,用户需要自己解码。label是文本类型(比如:“1“,”2“),这里用户需要的其实是整形,用户需要自己转换成整形。


我觉得这个方法的reader的好处是其如何实现的其实是对用户透明的。对比:

fp = open("/glusterfs/mount/dir/yourfile_%d.list" % TRAINER_ID, "r")
def reader():
  for l in fp:
  yield l[:-1].split("\t")

  return reader

以上代码可以是一种实现方式(但也不完全正确,对应的每一行应该由master指定,不应该从TRAINER_ID算出来。设计文档中的TRAINER_ID是一个字符串型的UUID,并不是整形。我们不需要trainer有自己的顺序。),我认为这一种实现方式不应该让用户知道,也不该让用户去实现。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

fp = open("/glusterfs/mount/dir/yourfile_%d.list" % TRAINER_ID, "r")

def reader():
for l in fp:
yield l[:-1].split("\t")

return reader
```

## TODO

### 支持用户自定义的数据预处理job
### 支持SSTable格式的key-value数据
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉“支持SSTable格式的key-value数据”改成“支持将数据合并成内部的文件格式,方便sharding与顺序读取”更加贴切。

可能顺也需要调整一下,变成:

### 支持将数据合并成内部的文件格式,方便sharding与顺序读取
### 支持用户自定义的数据预处理job

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. 补充了举例文件格式。

### 支持将数据合并成内部的文件格式(key-value, protobuf等),方便sharding与顺序读取

Binary file added doc/design/cluster_train/src/checkpointing.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/design/cluster_train/src/data_dispatch.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/design/cluster_train/src/trainer.graffle
Binary file not shown.
Binary file added doc/design/cluster_train/src/trainer.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/design/images/replica.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/design/images/two_phase_commit.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.