TensorFlow手把手入门之分布式TensorFlow — 3个关键点,把你的TensorFlow代码重构为分布式!

分布式架构就像哈姆雷特,一千个人眼中有一千种分布式方式 — David 9

对于机器学习模型,分布式大致分两类:模型分布式数据分布式:

模型分布式非常复杂和灵活, 它把整个机器学习模型分割,分散在多个节点上,在每个节点上计算模型的各个部分, 最后把结果拼接起来。如果你造了一个并行性很高的深度网络,比如这个,那就更棒了。你只要在每个节点上,计算不同的层,最后把各个层的异步结果通过较为精妙的方式汇总起来。

而我们今天要手把手教大家的是数据分布式。模型把数据拷贝到多个节点上, 每次算Epoch迭代的时候,每个节点对于一个batch的梯度都会有一个计算值,一个batch结束后,所有节点把梯度值汇总起来(ps参数服务器的任务就是汇总所有参数更新),从而进行更新。这就会导致每个batch的计算都比非分布式方法精准。相对非分布式,并行方法下,同样的迭代次数,收敛较快。

如何把自己的单机TensorFlow代码变为分布式的代码?

本文将手把手告诉大家3个关键点,重构自己的TensorFlow代码为分布式代码(开始前请大家前用1分钟了解文末的参考文献,了解基本知识):

关键点1: 定义FLAGS全局变量,获得ps参数服务器,worker工作服务器等分布式全局信息。

# Define parameters
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_float('learning_rate', 0.00003, 'Initial learning rate.')
tf.app.flags.DEFINE_integer('steps_to_validate', 1000,
                            'Steps to validate and print loss')

# For distributed
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("issync", 0, "issync mode")

以上代码是从命令行获得变量的简单方式。使用TensorFlow自带的FLAGS命令行工具。

ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
server = tf.train.Server(
    cluster,
    job_name=FLAGS.job_name,

上述代码教你如何获得命令行变量到python变量。ps_hosts代表所有参数服务器,work_hosts是所有工作服务器。cluster组装一个分布式集群定义。server代表本地为任务分配的服务器。

关键点2: 在流图Graph定义阶段, 加入“参数服务器”和“工作服务器”的判断,重构Graph定义代码。

if FLAGS.job_name == "ps":
    server.join()
elif FLAGS.job_name == "worker":
    with tf.device(tf.train.replica_device_setter(
                    worker_device="/job:worker/task:%d" % FLAGS.task_index,
                    cluster=cluster)):
        # 这里是各个worker工作服务器下的graph定义。
        # 可以先把你原先的graph定义黏贴到此处,再进行必要的修改

如果当前服务器是ps参数服务器,当前服务器就要执行join方法汇总更新的参数。

如果当前是工作服务器,构建deVice设备上下文,复制数据到各个设备,并且知道任务号,之后再定义原先的Graph。

关键点3: 最后,重构你原来的graph定义和TensorFlow Session训练的方式细节。

    grads_and_vars = optimizer.compute_gradients(cost)
    correct_prediction = tf.equal(y_pred_cls, y_true_cls)
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

    rep_op = tf.train.SyncReplicasOptimizer(optimizer,
                                            replicas_to_aggregate=len(
                                            worker_hosts),
                                            #replica_id=FLAGS.task_index,
                                            total_num_replicas=len(
                                            worker_hosts),
                                            use_locking=True)
    train_op = rep_op.apply_gradients(grads_and_vars,
                                global_step=global_step)
    init_token_op = rep_op.get_init_tokens_op()
    chief_queue_runner = rep_op.get_chief_queue_runner()


    init_op = tf.initialize_all_variables()
    saver = tf.train.Saver()
    train_batch_size = batch_size
    tf.summary.scalar('cost', cost)
    tf.summary.scalar('accuracy', accuracy)
    summary_op = tf.summary.merge_all()
    summary_writer = tf.summary.FileWriter('./summary_log/train')
    summary_writer_test = tf.summary.FileWriter('./summary_log/test')

sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                            logdir="./checkpoint/",
                            init_op=init_op,
                            summary_op=None,
                            saver=saver,
                            global_step=global_step,
                            save_model_secs=60)
session = sv.prepare_or_wait_for_session(server.target)
sv.start_queue_runners(session, [chief_queue_runner])
session.run(init_token_op)

训练中稍有不同的是上面这段代码,graph定义完毕后,我们要用optimizer.compute_gradients方法计算梯度得到grads_and_vars对象。通过SyncReplicasOptimizer这个特殊的优化器,进行梯度的计算,即rep_op.apply_gradients(grads_and_vars, global_step=global_step)方法。

计算完毕得到的train_op对象就能在未来想用session.run()的地方使用了:

session.run([train_op, cost, global_step], feed_dict=feed_dict_train)

注意以上三个关键点, 你离TensorFlow并行化已经八九不离十了。

实际重构的例子,请看我github上识别猫狗的基本程序:

分布式版:

https://github.com/yanchao727/tensorflow_kaggle_cat_dog/blob/master/cnn.distributed.py

单机版:

https://github.com/yanchao727/tensorflow_kaggle_cat_dog/blob/master/cnn.py

 

参考文献:

  1. https://github.com/thewintersun/distributeTensorflowExample
  2. https://www.tensorflow.org/deploy/distributed
  3. http://blog.csdn.net/luodongri/article/details/52596780
  4. https://www.slideshare.net/stanleywanguni/distributed-machine-learning
  5. https://www.quora.com/How-is-parallel-computing-used-in-machine-learning

本文章属于“David 9的博客”原创,如需转载,请联系微信: david9ml,或邮箱:yanchao727@gmail.com

或直接扫二维码:

打赏
The following two tabs change content below.

发布者

David 9

David 9

微博: http://weibo.com/herewearenow 邮箱:yanchao727@gmail.com 微信: david9ml

发表评论

电子邮件地址不会被公开。 必填项已用*标注