diff --git a/apps/PGLBox/src/cluster_train_and_infer.py b/apps/PGLBox/src/cluster_train_and_infer.py index d2caf2f9..458393f6 100644 --- a/apps/PGLBox/src/cluster_train_and_infer.py +++ b/apps/PGLBox/src/cluster_train_and_infer.py @@ -21,17 +21,14 @@ import sys import time import glob -import yaml import shutil import argparse import traceback import pickle as pkl import numpy as np import helper -from datetime import datetime import paddle -import paddle.fluid as fluid import paddle.static as static from place import get_cuda_places from pgl.utils.logger import log @@ -61,7 +58,7 @@ def train(args, exe, model_dict, dataset): epoch_loss = 0 train_pass_num = 0 - for pass_dataset in dataset.pass_generator(): + for pass_dataset in dataset.pass_generator(epoch): exe.train_from_dataset( model_dict.train_program, pass_dataset, debug=False) @@ -161,7 +158,7 @@ def train_with_multi_metapath(args, exe, model_dict, dataset): fleet.barrier_worker() savemodel_end = time.time() log.info("STAGE [SAVE MODEL] for epoch [%d] finished, time cost: %f sec" \ - % (epoch + 1, savemodel_end - savemodel_begin)) + % (epoch, savemodel_end - savemodel_begin)) train_end_time = time.time() log.info("STAGE [TRAIN MODEL] finished, time cost: % sec" % @@ -216,7 +213,7 @@ def run_worker(args, exe, model_dict, infer_model_dict): dist_graph.load_edge() ret = dist_graph.load_node() - if ret is not 0: + if ret != 0: return -1 if args.warm_start_from: @@ -260,6 +257,13 @@ def run_worker(args, exe, model_dict, infer_model_dict): log.info("STAGE: need_inference is %s, skip inference process" % args.need_inference) + if args.need_dump_walk is True: + upload_dump_begin = time.time() + util.upload_dump_walk(args, args.local_dump_path) + upload_dump_end = time.time() + log.info("STAGE [UPLOAD DUMP WALK] finished, time cost: %f sec" % + (upload_dump_end - upload_dump_begin)) + return 0 @@ -314,7 +318,7 @@ def main(args): elif fleet.is_worker(): ret = run_worker(args, exe, model_dict, infer_model_dict) - if 
ret is not 0: + if ret != 0: fleet.stop_worker() return -1 @@ -333,6 +337,7 @@ def main(args): config.local_result_path = "./embedding" config.model_save_path = os.path.join(config.working_root, "model") config.infer_result_path = os.path.join(config.working_root, 'embedding') + config.dump_save_path = os.path.join(config.working_root, 'dump_walk') config.max_steps = config.max_steps if config.max_steps else 0 config.metapath_split_opt = config.metapath_split_opt if config.metapath_split_opt else False print("#===================PRETTY CONFIG============================#") diff --git a/apps/PGLBox/src/dataset.py b/apps/PGLBox/src/dataset.py index 1ab17b73..70da8df8 100644 --- a/apps/PGLBox/src/dataset.py +++ b/apps/PGLBox/src/dataset.py @@ -14,14 +14,17 @@ import os import time -import paddle import threading + +import paddle from paddle.distributed import fleet -import util import paddle.fluid as fluid -from place import get_cuda_places from pgl.utils.logger import log +import util +from place import get_cuda_places +import models.model_util as model_util + class BaseDataset(object): def __init__(self, @@ -221,14 +224,13 @@ def __init__(self, dist_graph=dist_graph, is_predict=False) - def pass_generator(self): + def pass_generator(self, epoch=None): # open a thread for processing the data dataset_list = [] t = threading.Thread(target=self.preload_thread, args=(dataset_list, )) t.setDaemon(True) t.start() - epoch_loss = 0 pass_id = 0 while 1: self.ins_ready_sem.acquire() diff --git a/apps/PGLBox/src/graph.py b/apps/PGLBox/src/graph.py index af5a54d8..133ec528 100644 --- a/apps/PGLBox/src/graph.py +++ b/apps/PGLBox/src/graph.py @@ -116,7 +116,8 @@ def load_edge(self): if not self.metapath_split_opt: load_begin_time = time.time() for i in range(len(self.etype_list)): - log.info("begin to upload edge_type: [%s] to GPU" % self.etype_list[i]) + log.info("begin to upload edge_type: [%s] to GPU" % + self.etype_list[i]) self.graph.upload_batch(0, i, len(get_cuda_places()), 
self.etype_list[i]) @@ -138,7 +139,7 @@ def load_node(self): ret = self.graph.load_node_file(self.node_types, self.root_dir, self.num_parts) - if ret is not 0: + if ret != 0: log.info("Fail to load node, ntype2files[%s] path[%s] num_part[%d]" \ % (self.node_types, self.root_dir, self.num_parts)) self.graph.release_graph_node() diff --git a/apps/PGLBox/src/hadoop.py b/apps/PGLBox/src/hadoop.py index 7f785d84..a1d44006 100644 --- a/apps/PGLBox/src/hadoop.py +++ b/apps/PGLBox/src/hadoop.py @@ -193,3 +193,22 @@ def put(src, dest, hadoop_bin=None, fs_name=None, fs_ugi=None): cmd += " 2>%s" % ERR_LOG ret = os.system(cmd) return ret + + +def replace(src, dest, hadoop_bin=None, fs_name=None, fs_ugi=None): + """hadoop replace""" + hadoop_bin, fs_name, fs_ugi = parse_account(hadoop_bin, fs_name, fs_ugi) + src = check_hadoop_path(src, fs_name, fs_ugi) + dest = check_hadoop_path(dest, fs_name, fs_ugi) + + tmp = dest + "_" + str(int(time.time())) + cmd = make_base_cmd(hadoop_bin, fs_name, fs_ugi) + cmd += " -mv " + dest + " " + tmp + " && " + + cmd += make_base_cmd(hadoop_bin, fs_name, fs_ugi) + cmd += " -put " + src + " " + dest + " && " + + cmd += make_base_cmd(hadoop_bin, fs_name, fs_ugi) + cmd += " -rmr " + tmp + ret = os.system(cmd) + return ret diff --git a/apps/PGLBox/src/models/model_util.py b/apps/PGLBox/src/models/model_util.py index f62daab6..e67edc08 100644 --- a/apps/PGLBox/src/models/model_util.py +++ b/apps/PGLBox/src/models/model_util.py @@ -283,7 +283,7 @@ def get_sparse_embedding(config, use_cvm=use_cvm) for bow in slot_bows: slot_embedding = bow[:, 1:] - slot_embedding = paddle.nn.softsign(slot_embedding) + slot_embedding = paddle.nn.functional.softsign(slot_embedding) slot_embedding_list.append(slot_embedding) return id_embedding, slot_embedding_list diff --git a/apps/PGLBox/src/util.py b/apps/PGLBox/src/util.py index 1cb94f0f..56bcbdd9 100644 --- a/apps/PGLBox/src/util.py +++ b/apps/PGLBox/src/util.py @@ -47,6 +47,13 @@ def get_global_value(value_sum, 
value_cnt):
     return value_sum / np.maximum(value_cnt, 1)
 
 
+def get_batch_num(value_cnt):
+    """ get batch num """
+    value_cnt = np.array(fluid.global_scope().find_var(value_cnt.name)
+                         .get_tensor())
+    return value_cnt
+
+
 def parse_path(path):
     """
     Args:
@@ -264,6 +271,33 @@ def upload_embedding(args, local_embed_path):
         log.info("embedding has been saved in local path: %s" % working_root)
 
 
+def upload_dump_walk(args, local_dump_path):
+    """ upload dumped walk paths to hdfs/afs/local storage """
+    mode, dump_save_path = parse_path(args.dump_save_path)
+    _, working_root = parse_path(args.working_root)
+    if mode == "hdfs":
+        HFS.rm(dump_save_path)
+        HFS.mkdir(dump_save_path)
+
+        log.info("begin to upload walk_path to: %s " % dump_save_path)
+        for file in glob.glob(os.path.join(local_dump_path, "*")):
+            basename = os.path.basename(file)
+            HFS.put(file, dump_save_path)
+        log.info("[hadoop put] walk_path has been uploaded to: %s " %
+                 dump_save_path)
+    elif mode == "afs":
+        log.info("begin to upload walk_path to: %s " % dump_save_path)
+        # HFS.rm(dump_save_path)
+        user, passwd = args.fs_ugi.split(',')
+        gzshell_upload(args.fs_name, user, passwd, local_dump_path,
+                       "afs:%s" % working_root)
+        log.info("[gzshell] walk_path has been uploaded to: %s" % dump_save_path)
+    else:
+        make_dir(working_root)
+        run_cmd("mv %s %s" % (local_dump_path, working_root))
+        log.info("walk_path has been saved in local path: %s" % working_root)
+
+
 def hadoop_touch_done(path):
     """ touch hadoop done """
     if fleet.worker_index() == 0:
diff --git a/apps/PGLBox/user_configs/lightgcn.yaml b/apps/PGLBox/user_configs/lightgcn.yaml
index c08ed9ae..cc305b6f 100644
--- a/apps/PGLBox/user_configs/lightgcn.yaml
+++ b/apps/PGLBox/user_configs/lightgcn.yaml
@@ -91,6 +91,8 @@ need_inference: True
 # 预估embedding的时候,需要的参数,保持默认即可.
dump_node_name: "src_node___id" dump_node_emb_name: "src_node___emb" +# 是否需要dump游走路径 +need_dump_walk: False # ---------------------------train param config---------------------------------------------# epochs: 1 diff --git a/apps/PGLBox/user_configs/metapath.yaml b/apps/PGLBox/user_configs/metapath.yaml index 446d3118..69c3dc0a 100644 --- a/apps/PGLBox/user_configs/metapath.yaml +++ b/apps/PGLBox/user_configs/metapath.yaml @@ -82,6 +82,8 @@ need_inference: True # 预估embedding的时候,需要的参数,保持默认即可. dump_node_name: "src_node___id" dump_node_emb_name: "src_node___emb" +# 是否需要dump游走路径 +need_dump_walk: False # ---------------------------train param config---------------------------------------------# epochs: 1