腾讯高性能图计算框架Plato代码阅读(二) 图加载

编程入门 行业动态 更新时间:2024-10-14 16:23:36

<a href=https://www.elefans.com/category/jswz/34/1770070.html style=腾讯高性能图计算框架Plato代码阅读(二) 图加载"/>

腾讯高性能图计算框架Plato代码阅读(二) 图加载

腾讯高性能图计算框架Plato代码阅读(二) 图加载

在pagerank计算逻辑中,图加载和图切分是通过如下调用完成的:

  // init graphplato::graph_info_t graph_info(FLAGS_is_directed);auto pdcsc = plato::create_dcsc_seqs_from_path<plato::empty_t>(&graph_info, FLAGS_input, plato::edge_format_t::CSV,plato::dummy_decoder<plato::empty_t>, FLAGS_alpha, FLAGS_part_by_in);

图加载和图切分中涉及到了复杂的代码逻辑,我们今天主要介绍图加载。

入口函数create_dcsc_seqs_from_path解读

函数create_dcsc_seqs_from_path的定义如下,可以看到外层的create_dcsc_seqs_from_path是对内层create_dcsc_seq_from_path的简单封装,封装之处在于指定SEQ_PART(切分策略)为sequence_balanced_by_source_t:

// dcsc with sequence balanced partition by source
template <typename EDATA, typename VID_T = vid_t, template<typename, typename> class CACHE = edge_block_cache_t>
std::shared_ptr<dcsc_t<EDATA, sequence_balanced_by_source_t>> create_dcsc_seqs_from_path (graph_info_t*                   pgraph_info,const std::string&              path,edge_format_t                   format,decoder_t<EDATA>                decoder,int                             alpha = -1,bool                            use_in_degree = false,vencoder_t<EDATA, VID_T, CACHE> vid_encoder = nullptr) {return create_dcsc_seq_from_path<EDATA, sequence_balanced_by_source_t, VID_T, CACHE>(pgraph_info, path, format, decoder, alpha, use_in_degree, vid_encoder);
}/** create dcsc graph structure with sequence balanced by source partition from file system** \tparam EDATA          edge data type* \tparam SEQ_PART       sequence partition type* \tparam VID_T          vertex id type, can be uint32_t or uint64_t* \tparam CACHE          cache type, can be edge_block_cache_t or edge_file_cache_t or edge_cache_t** \param pgraph_info     user should fill 'is_directed_' field of  graph_info_t, this function*                        will fill other fields during load process.* \param path            input file path, 'path' can be a file or a directory.*                        'path' can be located on hdfs or posix, distinguish by its prefix.*                        eg: 'hdfs://' means hdfs, '/' means posix, 'wfs://' means wfs* \param format          file format* \param decoder         edge data decode, string => EDATA* \param is_directed     the graph is directed or not* \param alpha           vertex's weighted for partition, -1 means use default* \param use_in_degree   use in-degree instead of out degree for partition** \return*      graph structure in dcsc form**/
template <typename EDATA, typename SEQ_PART, typename VID_T = vid_t,template<typename, typename> class CACHE = edge_block_cache_t>
std::shared_ptr<dcsc_t<EDATA, SEQ_PART>> create_dcsc_seq_from_path (graph_info_t*                       pgraph_info,const std::string&                  path,edge_format_t                       format,decoder_t<EDATA>                    decoder,int                                 alpha = -1,bool                                use_in_degree = false,vencoder_t<EDATA, VID_T, CACHE>     vid_encoder = nullptr)

下面从模板参数、入参、返回值、主流程这几个方面对这个函数做解读。

模板参数

可以看到create_dcsc_seq_from_path是一个使用了template的函数,在pagerank场景下,模板参数的取值为:

EDATA=plato::empty_t,pagerank中在边上没有存储数据

VID_T=plato::vid_t,一般为int32或int64

CACHE=plato::edge_file_cache_t

edge_file_cache_t定义如下,其通过父类对edge_unit_t类型的数据进行管理,用于存储加载上来的图中的边:

// thread-safe edge cache implementation, fixed capacity
template <typename EDATA, typename VID_T = vid_t>
class edge_file_cache_t : public object_file_buffer_t<edge_unit_t<EDATA, VID_T>> {
public:using edata_t            = EDATA;using edge_unit_spec_t   = edge_unit_t<edata_t, VID_T>;explicit edge_file_cache_t(void) : object_file_buffer_t<edge_unit_t<EDATA, VID_T>>() { }explicit edge_file_cache_t(size_t n) : object_file_buffer_t<edge_unit_t<EDATA, VID_T>>(n) { }
};

其父类object_file_buffer_t的定义如下。这个类提供了线程安全的内存管理能力。

// fixed-size, object file buffer with thread-safe traversal
template <typename T, typename Enable = void>
class object_file_buffer_t {}

edge_file_cache_t的模板化参数为EDATA=plato::empty_tVID_T=plato::vid_t,object_file_buffer_t的模板化参数为T=edge_unit_t<EDATA, VID_T>

edge_unit_t的定义如下。其中存储了一个边的起始节点,终止节点,边数据。

template <typename EDATA_T, typename VID_T = vid_t>
struct edge_unit_t {VID_T   src_;VID_T   dst_;EDATA_T edata_;template<typename Ar>void serialize(Ar &ar) { // boost-style serialization when EDATA_T is non-trivialar & src_ & dst_ & edata_;}
};// __attribute__((packed));

入参

大部分入参的作用从名称中可以领会到,我们重点介绍几个入参的类型。

edge_format_t 是一个简单的枚举,一般只能取CSV

enum class edge_format_t {UNKNOWN = 0,CSV     = 1
};

decoder_t 是一个函数类型,作用为解析边上的数据,定义如下。

/** \brief decoder_t, decode edge data from string** \param pOutput   output* \param sInput    unresolve c-string, end with '\0', you can modify it** \return true -- continue parse stage, false -- abort parse***/
template <typename EdgeData>
using decoder_t = std::function<bool(EdgeData*, char*)>;

在pagerank的场景中,decoder_t decoder传入的值为plato::dummy_decoderplato::empty_t,原因是PageRank不需要使用边上的数据。

vencoder_t<EDATA, VID_T, CACHE> 是顶点ID编码器的类型,定义如下:

template <typename EDATA, typename VID_T = vid_t, template<typename, typename> class CACHE = edge_block_cache_t>
using vencoder_t = typename std::remove_reference<vid_encoder_t<EDATA,VID_T,CACHE>*>::type;template <typename EDATA, typename VID_T = vid_t, template<typename, typename> class CACHE = edge_block_cache_t>
class vid_encoder_t {void encode(CACHE<EDATA, VID_T>& cache, encoder_callback_t callback);
}

可以看到,其实质上是vid_encoder_t<EDATA,VID_T,CACHE>类型,通过提供encode方法来对顶点ID进行重编码,从而让顶点ID重新分布到[0, 顶点个数)的范围内,方便后续的计算。后续的分区操作假设了顶点ID的范围是[0, 顶点个数)。

传入参数取值总结

总结一下,create_dcsc_seqs_from_path传入的参数取值为:

参数取值备注
graph_info_t* pgraph_infograph_info
const std::string& pathFLAGS_input
edge_format_t formatplato::edge_format_t::CSV
decoder_t<EDATA> decoderplato::dummy_decoder<plato::empty_t>
int alpha=-1FLAGS_alpha
bool use_in_degree=falseFLAGS_part_by_in
vencoder_t<EDATA, VID_T, CACHE> vid_encoder = nullptr使用默认值nullptr

返回值

该函数的返回值类型为 std::shared_ptr<dcsc_t<EDATA, sequence_balanced_by_source_t>>

dcsc_t的定义如下。其中,dcsc是doubly compressed sparse column的简称,这种格式的详细定义请见论文On the representation and multiplication of hypersparse matrices,后面有机会单独写一篇文章介绍各种稀疏矩阵存储格式。

/** doubly compressed sparse column storage* vertexId must be compacted** references:*  BULUC, A., AND GILBERT, J.R. On the representation and multiplication of*  hypersparse matrices.** \tparam EDATA      data type associate with edge* \tparam PART_IMPL  partitioner's type**/
template <typename EDATA, typename PART_IMPL, typename ALLOC = std::allocator<adj_unit_t<EDATA>>>
class dcsc_t {}

这里面同样涉及到模板参数,其中,EDATA=plato::empty_t表示边上未存储数据,PART_IMPL=sequence_balanced_by_source_t表示使用sequence_balanced_by_source_t来进行图的切分,ALLOC = std::allocator<adj_unit_t<EDATA>>表示内存分配器的类型。下面打开介绍sequence_balanced_by_source_t 和 adj_unit_t。

图切分策略 sequence_balanced_by_source_t

plato中提供了多种图切分策略,sequence_balanced_by_source_t就是其中一种,核心定义如下

/** Partitioner try to keep each partitions' computation work balanced* vertexId must be compacted** references:*  Julian Shun, Guy E Blelloch. Ligra: A Lightweight Graph Processing*  Framework for Shared Memory**  Xiaowei Zhu, Wenguang Chen, etc. Gemini: A Computation-Centric Distributed*  Graph Processing System**/// edge belong to source node's partition
class sequence_balanced_by_source_t {}

sequence_balanced_by_source_t初始化时,会按照负载均衡的原则预先计算好每个partition负责的顶点编号范围。

plato为了实现负载均衡, 使不同分区上的工作量大致相等,减少struggler导致的任务执行时间长的问题,按照指标α · |Vi| + |Ei|来估计分区i的工作量。这个公式中,α是一个可调节参数,用来决定更重视分区中顶点的数量|Vi|,还是更重视分区中边的数量|Ei|。α的默认值为8*(p-1),其中p代表集群中所运行的进程的数量(一般同集群中的机器数量相同)。

vector poffset用来存储分区的结果。poffset中有p+1个元素,对于分区i,其存储的顶点范围为poffset[i - 1] 到 poffset[i]。为了性能,poffset中存储的顶点编号,除最后一个外,都会对齐到PAGESIZE。请注意poffset中存储的顶点编号是递增的。

初始化的关键逻辑如下:

  /** constructor** \param degrees   each vertex's degrees* \param vertices  vertex number of the graph* \param vertices  edge number of the graph* \param alpha     vertex's weight of computation, default: -1, means*                  alpha = 8 * (partitions - 1)**/template <typename DT>sequence_balanced_by_source_t(const DT* degrees, vid_t vertices, eid_t edges, int alpha = -1) {if (-1 == alpha) {auto& cluster_info = cluster_info_t::get_instance();// 计算alpha的默认值alpha = 8 * (cluster_info.partitions_ - 1);}__init_offset(&offset_, degrees, vertices, edges, alpha);}template <typename DT>
void __init_offset(std::vector<vid_t>* poffset, const DT* degrees, vid_t vertices, eid_t edges, int alpha) {// poffset中存储了每个partition中要管理的节点范围auto& cluster_info = cluster_info_t::get_instance();uint64_t remained_amount = edges + vertices * (uint64_t)alpha;uint64_t expected_amount = 0;poffset->clear();poffset->resize(cluster_info.partitions_ + 1, 0);for (int p_i = 0; p_i < cluster_info.partitions_; ++p_i) {// 总是尝试在剩下的分区中平分剩下的节点,从而得到expected_amountexpected_amount = remained_amount / (cluster_info.partitions_ - p_i);uint64_t amount = 0;for (vid_t v_i = poffset->at(p_i); v_i < vertices; ++v_i) {amount += (alpha + degrees[v_i]);if (amount >= expected_amount) {// 为了性能,将顶点编号对齐到PAGESIZEpoffset->at(p_i + 1) = v_i / PAGESIZE * PAGESIZE;break;}}if ((cluster_info.partitions_ - 1) == p_i) { poffset->at(cluster_info.partitions_) = vertices; }remained_amount -= amount;}
}

初始化计算完毕后,可以直接通过get_partition_id接口查询顶点所对应的分区编号。

  // get vertex's partitioninline int get_partition_id(vid_t v_i) {for (size_t p_i = 0; p_i < (offset_.size() - 1); ++p_i) {if (v_i >= offset_[p_i] && v_i < offset_[p_i + 1]) {return p_i;}}}

查询边的分区时,直接按照边的src的顶点所处的分区确定边的分区:

  // get edge's partitioninline int get_partition_id(vid_t src, vid_t /*dst*/) {return get_partition_id(src);}
邻接单元 adj_unit_t

adj_unit_t的定义比较简单。每个adj_unit_t存储当前顶点的一个邻居,以及从当前顶点到该邻居的边上的数据。

template <typename EDATA_T>
struct adj_unit_t {vid_t    neighbour_;EDATA_T  edata_;template<typename Ar>void serialize(Ar &ar) { // boost-style serialization when EDATA_T is non-trivialar & neighbour_ & edata_;}
};// __attribute__((packed));

create_dcsc_seq_from_path的主流程

在create_dcsc_seq_from_path函数中,主要做了如下几件事情,请结合代码中的注释理解,请注意我只贴出了关键代码,完整代码请点击上面的链接到github上查看。

  1. 从文件中加载边。
  2. 根据用户指定的FLAGS_part_by_in参数,决定是使用顶点入度作为partition的依据,还是使用顶点出度作为partition的依据。
  3. 初始化分区器,计算每个分区中保存的顶点ID范围。
  4. 重新读取文件,按照分区器的指示将顶点和边存储到正确的分区中。
template <typename EDATA, typename SEQ_PART, typename VID_T = vid_t,template<typename, typename> class CACHE = edge_block_cache_t>
std::shared_ptr<dcsc_t<EDATA, SEQ_PART>> create_dcsc_seq_from_path (graph_info_t*                       pgraph_info,const std::string&                  path,edge_format_t                       format,decoder_t<EDATA>                    decoder,int                                 alpha = -1,bool                                use_in_degree = false,vencoder_t<EDATA, VID_T, CACHE>     vid_encoder = nullptr) {// 1. 从文件中加载边。plato的输入数据格式为edge list,因此这里使用了加载边的说法。auto cache = load_edges_cache<EDATA, VID_T, CACHE>(pgraph_info, path, format, decoder, nullptr, vid_encoder);{if (use_in_degree) {// 2. 根据用户指定的FLAGS_part_by_in参数,决定是使用顶点入度作为partition的依据,还是使用顶点出度作为partition的依据degrees = generate_dense_in_degrees<vid_t>(*pgraph_info, *cache);} else {degrees = generate_dense_out_degrees<vid_t>(*pgraph_info, *cache);}// 3. 初始化分区器,计算每个分区中保存的顶点ID范围。注意pagerank场景下模板参数SEQ_PART的取值为 class sequence_balanced_by_source_t,这个类实例化时会计算好每个分区中保存的顶点ID范围。reset为std::shared_ptr提供的方法,可以先不关注。part_dcsc.reset(new SEQ_PART(degrees.data(), pgraph_info->vertices_,__edges, alpha));part_dcsc->check_consistency();}// 4. 重新读取文件,按照分区器的指示将顶点和边存储到正确的分区中。pdcsc->load_from_cache(*pgraph_info, *cache)return pdcsc;
}

这篇文章主要打开介绍从文件中加载边的操作,剩下的操作在后续文章中介绍。

1. 从文件中加载边 load_edges_cache

load_edges_cache函数的关键定义如下:

/** parallel load edges from file system to cache** \tparam EDATA        data bind on edge* \tparam VID_T        vertex id type, can be uint32_t or uint64_t* \tparam CACHE        cache type, can be edge_block_cache_t or edge_file_cache_t or edge_cache_t** \param pginfo        graph info* \param path          input file path, 'path' can be a file or a directory.*                      'path' can be located on hdfs or posix, distinguish by its prefix.*                      eg: 'hdfs://' means hdfs, '/' means posix, 'wfs://' means wfs* \param format        file format* \param decoder       edge data decode, string => EDATA* \param callback      function executed when parsing data* \param vid_encoder   encoder for data ** \return loaded cache or nullptr**/
template <typename EDATA, typename VID_T = vid_t, template<typename, typename> class CACHE = edge_block_cache_t>
std::shared_ptr<CACHE<EDATA, vid_t>> load_edges_cache(graph_info_t*                   pginfo,const std::string&              path,edge_format_t                   format,decoder_t<EDATA>                decoder,data_callback_t<EDATA, vid_t>   callback = nullptr,  // pagerank场景下取值 nullptrvencoder_t<EDATA, VID_T, CACHE> vid_encoder = nullptr) {// 1.1 从文件系统上读取edge list格式的数据read_from_files<EDATA, vid_t>(path, format, decoder, real_callback); // 1.2 通过MPI_Allreduce统计各个机器上加载到的边数量的和MPI_Allreduce(MPI_IN_PLACE, &edges, 1, get_mpi_data_type<eid_t>(), MPI_SUM, MPI_COMM_WORLD);// 1.3 由于v_bitmap大小太大,直接调用MPI_Allreduce会出错,所以通过plato自己实现的allreduce方法分段对v_bitmap进行allreduceallreduce(MPI_IN_PLACE, v_bitmap.data_, word_offset(v_bitmap.size_) + 1, get_mpi_data_type<uint64_t>(),MPI_BOR, MPI_COMM_WORLD);// 1.4 将计算得到的信息保存到入参pginfo中,实质上就是保存到了graph_info中if (pginfo) {pginfo->edges_    = edges;pginfo->vertices_ = v_bitmap.count();pginfo->max_v_i_  = v_bitmap.msb();}
}

1.1 从文件系统上读取edge list格式的数据 read_from_files

read_from_files函数的关键定义如下:

/** parallel parse edges from file system to cache** \tparam EDATA        data bind on edge* \tparam VID_T        vertex id type, can be uint32_t or uint64_t** \param path          input file path, 'path' can be a file or a directory.*                      'path' can be located on hdfs or posix, distinguish by its prefix.*                      eg: 'hdfs://' means hdfs, '/' means posix, 'wfs://' means wfs* \param format        file format* \param decoder       edge data decode, string => EDATA* \param callback      function executed when parsing data***/
template <typename EDATA, typename VID_T = vid_t>
void read_from_files(const std::string&            path,edge_format_t                 format,decoder_t<EDATA>              decoder,data_callback_t<EDATA, VID_T> callback) {parser = csv_parser<boost::iostreams::filtering_istream, EDATA, VID_T>;// 1.1.1 获取本机的文件列表std::vector<std::string> files = get_files(path);// 1.1.2 通过OpenMP,以cluster_info.threads_个线程并行读取files列表指定的文件,每次通过with_file方法读取一个文件#pragma omp parallel num_threads(cluster_info.threads_){while (true) {std::string filename;{std::lock_guard<std::mutex> lock(files_lock);if (files.empty()) break;filename = std::move(files.back());files.pop_back();}with_file(filename, [&] (boost::iostreams::filtering_istream& is) {parser(is, callback, decoder);});}}
}

1.1.1 获取本机负责的文件列表 get_file 和 get_files_from_hdfs

get_file函数是一个很简单的封装,根据url的开头来决定调用get_files_from_hdfs还是get_files_from_posix,关键定义如下。 本例中我们假设pagerank的计算从HDFS中读取数据。

inline std::vector<std::string> get_files(const std::string& path) {if (boost::starts_with(path, "hdfs://")) {return get_files_from_hdfs(path);} else {return get_files_from_posix(path);}
}

get_files_from_hdfs函数的关键定义如下:

inline std::vector<std::string> get_files_from_hdfs(const std::string& path) {// 由0号机器负责列出文件列表if (0 == cluster_info.partition_id_) {hdfsFileInfo* hdfs_file_list_ptr = hdfsListDirectory(hdfs_t::get_hdfs(path).filesystem_, path.c_str(), &num_files);// 按照在机器之间平均分配文件大小的原则,计算每个机器所负责文件列表assign_files_even_by_size(&fchunks, files, cluster_info.partitions_)}// 将每个机器负责的文件列表发送到各个机器的chunks变量上shuffle<std::vector<std::string>>(shuffle_send, shuffle_recv)// 返回本机负责的文件列表return chunks;
}

上面的shuffle函数使用了non-blocking的MPI通信以提高性能,后面有机会单独写一篇文章介绍。

1.1.2 通过with_file方法读取一个文件

with_file函数的关键定义如下

/** \param filename* \param func      auto func(boost::iostreams::filtering_istream& is)** \return 0 -- success, else failed**/
template <typename Func>
inline void with_file(const std::string& filename, Func func) {boost::iostreams::filtering_istream fin;// 若文件为压缩格式,则进行流式的解压if (boost::iends_with(filename, ".gz")) {fin.push(boost::iostreams::gzip_decompressor());}if (boost::istarts_with(filename, "hdfs://")) {// 对于hdfs上的文件,使用hdfs提供的加载方法hdfs_t::fstream hdfs_fin(hdfs_t::get_hdfs(filename), filename);fin.push(hdfs_fin);// 调用read_from_files中提供的func进行文件加载,定义见下func(fin);} else {fin.push(boost::iostreams::file_source(filename));func(fin);}
}

文件的解析是通过入参指定的func函数进行的,该函数以lambda的形式定义在read_from_files中,关键定义如下,可见对于输入文件流,会使用csv_parser进行解析

      parser = csv_parser<boost::iostreams::filtering_istream, EDATA, VID_T>;with_file(filename, [&] (boost::iostreams::filtering_istream& is) {parser(is, callback, decoder);});

csv_parser的关键定义如下,主要逻辑包括:

  1. 按照逗号进行分割,确定边的src顶点和dst顶点
  2. 累计HUGESIZE条边再一起调用callback进行处理,这种批处理的策略可以提高处理速度
  3. 返回获取的总边数。
template <typename STREAM_T, typename EdgeData, typename VID_T = vid_t>
ssize_t csv_parser(STREAM_T& fin, blockcallback_t<EdgeData, VID_T> callback, decoder_t<EdgeData> decoder) {while (fin.good() && (false == fin.eof())) {fin.getline(sInput.get(), HUGESIZE);pLog   = sInput.get();pToken = strtok_r(sInput.get(), ",", &pSave);auto src = strtoul(pToken, nullptr, 10);buffer[count].src_ = src;pToken = strtok_r(nullptr, ",", &pSave);auto dst = strtoul(pToken, nullptr, 10);buffer[count].dst_ = dst;++total_count;++count;if (count >= HUGESIZE) {callback(buffer.get(), count);count = 0;}}if (0 != count) {callback(buffer.get(), count);count = 0;}return total_count;}

csv_parser的callback是read_from_files的callback入参,定义在load_edges_cache中,关键定义如下:

  auto real_callback = [&](edge_unit_t<EDATA, vid_t>* input, size_t size) {__sync_fetch_and_add(&edges, size);for (size_t i = 0; i < size; ++i) {// v_bitmap记录了当前节点中存在哪些顶点v_bitmap.set_bit(input[i].src_);v_bitmap.set_bit(input[i].dst_);}cache->push_back(input, size);// pagerank场景下callback取值 nullptr,也就是在加载数据时没有更进一步的callback了。if (nullptr != callback) callback(input, size);  return true;};

小结

今天我们初步认识了plato中的图加载和图切分函数create_dcsc_seqs_from_path,并将其中图加载的阶段打开进行了介绍,下一篇文章将进入图切分的逻辑。

更多推荐

腾讯高性能图计算框架Plato代码阅读(二) 图加载

本文发布于:2024-02-06 09:36:43,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1748083.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:腾讯   高性能   框架   加载   代码

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!