1

我正在尝试为以下内容设置一个非常简单的最小工作示例:使用带有 tf.distribute.ParameterServerStrategy 的 tf.Server 在 tf.train.MonitoredSession 中使用 tf.keras 构建的模型。

我的最终目标是在分布式环境中使用 tf.keras 模型,使用两个工作人员,每个工作人员都有一个 GPU 和一个参数服务器。

该模型是根据此处找到的示例和文档构建的:https ://www.tensorflow.org/versions/r1.12/api_docs/python/tf/keras/models/Sequential

根据此处找到的文档使用参数服务器策略:https ://www.tensorflow.org/versions/r1.12/api_docs/python/tf/contrib/distribute/ParameterServerStrategy

包括设备放置和 MonitoredSession 使用在内的整体设置取自:https ://github.com/tensorflow/examples/blob/master/community/en/docs/deploy/distributed.md

我已经在使用“allow_soft_placement”选项,并且在只有一个 CPU 的本地计算机上“模拟”分布式设置,因为在实际分布式设置中存在不同的问题,我试图通过使用自动处理变量初始化的 MonitoredSession。

此代码适用于“正常”(未监控)tf.Session 和变量初始化 - 全局、局部、模型变量和表等。

解冻图形的线对于能够在 tf.keras.Model 的 fit 函数中使用 tf.data.Dataset 是必要的,因为必须创建一个迭代器 - 这会导致冻结图形中的错误。

这是我试图运行的代码。我使用 tensorflow 1.12.0 和 python 3.6.7。我也尝试过 python 2.7,结果相同。

除了安装 tensorflow 之外,该代码不需要任何设置。

import sys
import tensorflow as tf

def main(argv):

  # Create local cluster config for run_local_server.sh script.
  cluster = tf.train.ClusterSpec({"worker": ["localhost:2222"], "ps": ["localhost:2223"]})
  task = 0
  job = str(argv[0])

  # Number of GPUs per worker
  GPU_PER_WORKER = 0

  config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
  server = tf.train.Server(cluster, job_name=job, task_index=task,config=config)

  strategy = tf.contrib.distribute.ParameterServerStrategy(num_gpus_per_worker=GPU_PER_WORKER)
  strategy.configure(session_config=config, cluster_spec=cluster,task_type=job,task_id=task)

  with strategy.scope():
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

    x_train = x_train.astype('float32') / 255
    x_test = x_test.astype('float32') / 255

    # Reshape input data from (28, 28) to (28, 28, 1)
    w, h = 28, 28
    x_train = x_train.reshape(x_train.shape[0], w, h, 1)
    x_test = x_test.reshape(x_test.shape[0], w, h, 1)

    # One-hot encode the labels
    y_train = tf.keras.utils.to_categorical(y_train, 10)
    y_test = tf.keras.utils.to_categorical(y_test, 10)

    train_ds = tf.data.Dataset.zip((tf.data.Dataset.from_tensor_slices(x_train),tf.data.Dataset.from_tensor_slices(y_train))).repeat().shuffle(60000).batch(10)
    val_ds = tf.data.Dataset.zip((tf.data.Dataset.from_tensor_slices(x_test),tf.data.Dataset.from_tensor_slices(y_test))).repeat().shuffle(10000).batch(10)

    with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task,cluster=cluster)):
      model = tf.keras.models.Sequential()

      conv0 = tf.keras.layers.Conv2D(filters=16, data_format='channels_last', padding="valid", kernel_size=4, strides=1, input_shape=(28,28,1), activation=tf.keras.activations.relu)
      model.add(conv0)

      flatten = tf.keras.layers.Flatten()
      model.add(flatten)

      dense1 = tf.keras.layers.Dense(10, activation=tf.keras.activations.softmax)
      model.add(dense1)

      model.compile(tf.contrib.optimizer_v2.AdamOptimizer(0.001), loss=tf.keras.metrics.mean_absolute_error,metrics=['accuracy'],distribute=strategy)   

    if job == "ps":
      server.join()
    elif job == "worker":
      with tf.train.MonitoredSession(session_creator=tf.train.ChiefSessionCreator(master=server.target,config=config)) as sess:        
        sess.graph._unsafe_unfinalize()
        history = model.fit(x=train_ds, validation_data=val_ds, validation_steps=1000, steps_per_epoch=100, epochs=60)

if __name__ == "__main__":
  main(sys.argv[1:])

该代码不需要大量设置,因为数据集是从 Web 加载并转换为 tf.data.Dataset 的,因为这是我想用真实数据组织管道的方式。MNIST 数据设置示例取自 https://www.kaggle.com/margaretmz/mnist-with-tf-keras

我希望代码不会因为错误的变量或操作放置而失败,因为我基本上将所有这些决定留给了 tensorflow 的实现,方法是使用strategy.scope()tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task,cluster=cluster))

4

0 回答 0