CANN/ge LLM数据分发C++功能

发布时间:2026/7/4 7:32:01
CANN/ge LLM数据分发C++功能 功能介绍【免费下载链接】geGEGraph Engine是面向昇腾的图编译器和执行器提供了计算图优化、多流并行、内存复用和模型下沉等技术手段加速模型执行效率减少模型内存占用。 GE 提供对 PyTorch、TensorFlow 前端的友好接入能力并同时支持 onnx、pb 等主流模型格式的解析与编译。项目地址: https://gitcode.com/cann/ge链路管理建链功能介绍调用LinkLlmClusters接口在PD节点之间建立通信链路链路主要用于KV Cache的传输。使用场景建链操作是节点之间进行数据传输的前提在D2D模式中建链接口采用类TCP建链流程P侧初始化时提供侦听信息由D侧发起建链。根据业务繁忙情况在需要调整集群PD节点配比时通过建链扩容节点。功能示例初始化LLM-DataDist。其中P侧需要设置侦听的Device IP和port。// P侧 LlmDataDist llm_data_dist(PROMPT_CLUSTER_ID, LlmRole::kPrompt); std::mapAscendString, AscendString options; options[OPTION_DEVICE_ID] 0; //替换为真实IP端口 options[OPTION_LISTEN_IP_INFO] ip:port; options[OPTION_BUF_POOL_CFG] R({ buf_cfg:[{total_size:2097152,blk_size:256,max_buf_size:8192}], buf_pool_size: 2147483648 }); auto ret llm_data_dist.Initialize(options); if (ret ! LLM_SUCCESS) { printf([ERROR] Initialize failed, ret %u\n, ret); return -1; } // D侧 LlmDataDist llm_data_dist(DECODER_CLUSTER_ID, LlmRole::kDecoder); std::mapAscendString, AscendString options; options[OPTION_DEVICE_ID] 0; options[OPTION_BUF_POOL_CFG] R({ buf_pool_size: 2147483648 }); auto ret llm_data_dist.Initialize(options); if (ret ! LLM_SUCCESS) { printf([ERROR] Initialize failed, ret %u\n, ret); return -1; }在D侧调用LinkLlmClusters接口发起建链操作。std::vectorStatus rets; std::vectorClusterInfo clusters; ClusterInfo cluster_info; IpInfo local_ip_info; // 替换为本地真实IP local_ip_info.ip ip; IpInfo remote_ip_info; // 替换为对端真实IP remote_ip_info.ip ip; // 替换为P侧初始化时的侦听端口 remote_ip_info.port port; cluster_info.remote_cluster_id PROMPT_CLUSTER_ID; cluster_info.local_ip_infos.emplace_back(std::move(local_ip_info)); cluster_info.remote_ip_infos.emplace_back(std::move(remote_ip_info)); clusters.emplace_back(std::move(cluster_info)); auto ret llm_data_dist.LinkLlmClusters(clusters, rets); if (ret ! LLM_SUCCESS) { printf([ERROR] LinkLlmClusters failed, ret %u\n, ret); return -1; } for (const auto inner_ret : rets) { if (inner_ret ! LLM_SUCCESS) { printf([ERROR] LinkLlmClusters failed, ret %u\n, inner_ret); return -1; } }异常处理当调用LinkLlmClusters失败时需排查两台机器网络是否连通Device IP是否正确port是否被占用。更多异常处理请参考错误码]。断链功能介绍调用UnlinkLlmClusters接口断开并清理PD之间的通信链路。使用场景当P或者D集群节点出现异常时通过断链清理异常链路或者需要调整集群PD节点配比时通过断链关闭已建立的链路。功能示例调用断链方式有如下两种。一种是通过在D侧发起断链常用在链路非故障场景。// clusters同建链的clusters auto ret llm_data_dist.UnlinkLlmClusters(clusters, rets); if (ret ! LLM_SUCCESS) { printf([ERROR] UnlinkLlmClusters failed, ret %u\n, ret); return -1; } for (const auto inner_ret : rets) { if (inner_ret ! LLM_SUCCESS) { printf([ERROR] UnlinkLlmClusters failed, ret %u\n, inner_ret); return -1; } }一种是通过在PD两侧都发起强制断链常用在链路故障场景。// PD两侧 std::vectorStatus rets; std::vectorClusterInfo clusters; ClusterInfo cluster_info; IpInfo local_ip_info; // 替换为本地真实IP local_ip_info.ip ip; IpInfo remote_ip_info; // 替换为对端真实IP remote_ip_info.ip ip; // 替换为P侧初始化时的侦听端口 remote_ip_info.port port; cluster_info.remote_cluster_id PROMPT_CLUSTER_ID; cluster_info.local_ip_infos.emplace_back(std::move(local_ip_info)); // local_ip_infos的IP是本地的Device IP地址 cluster_info.remote_ip_infos.emplace_back(std::move(remote_ip_info)); // remote_ip_infos的IP是对端的Device IP地址 clusters.emplace_back(std::move(cluster_info)); auto ret llmDataDist.UnlinkLlmClusters(clusters, rets, 1000, true); if (ret ! LLM_SUCCESS) { printf([ERROR] UnlinkLlmClusters failed, ret %u\n, ret); return -1; } for (const auto inner_ret : rets) { if (inner_ret ! LLM_SUCCESS) { printf([ERROR] UnlinkLlmClusters failed, ret %u\n, inner_ret); return -1; } }异常处理当调用UnlinkLlmClusters失败时如果是网络故障场景可改用强制断链方式。更多异常处理请参考错误码。KV Cache管理功能介绍在LLM-DataDist初始化时会预申请一块指定大小的内存池由OPTION_BUF_POOL_CFG配置项决定其大小后续的KV Cache的内存申请及释放都在预申请的内存上进行相比每次执行时申请一块cache内存可以节省耗时。KV Cache管理涉及的主要接口及功能如下|接口名称|功能| |--|--| |AllocateCache|分配Cache。| |DeallocateCache|释放Cache。| |PullKvCache|从远端节点拉取Cache到本地Cache仅当角色为Decoder时可调用。| |PullKvBlocks|在传输Blocks Cache场景下通过配置block列表的方式从远端节点拉取Cache到本地Cache仅当角色为Decoder时可调用。| |CopyKvCache|拷贝KV Cache。支持D2DD2H的拷贝。当期望PullKvCache和其他使用Cache的操作流水时可以额外申请一块中转Cache。当其他流程在使用Cache时可以先将下一次的Cache pull到中转Cache待其他流程使用完Cache后拷贝到指定的位置从而通过pipeline流水将PullKvCache的耗时隐藏减少总耗时。公共前缀场景在新请求推理前可以将公共前缀拷贝到新的内存中与当前请求的KV合并推理。| |CopyKvBlocks|PA场景下通过block列表的方式拷贝KV Cache。支持D2DD2HH2D的拷贝。D2D场景主要是针对当多个回答需要共用相同blockblock没填满时新增的token需要拷贝到新的block上继续迭代。H2D和D2H的拷贝主要用于对应block_index上Cache内存的换入换出。| |PushKvCache|从本地节点推送Cache到远端节点仅当角色为Prompt时可调用。| |PushKvBlocks|在传输Blocks Cache场景下通过配置block列表的方式从本地节点推送Cache到远端节点仅当角色为Prompt时可调用。|使用场景主要用于分布式集群间的KV Cache传输和搬移。功能示例一般Cache传输场景本示例介绍一般Cache传输场景下接口的使用主要涉及KV Cache的申请、释放、传输。如下将根据业务角色给出伪代码示例。P侧和D侧根据建链章节的示例完成LLM-DataDist的初始化和建链操作。在P/D侧给每个请求申请对应大小的KV Cache内存若失败则需要释放对应的资源。void OnError(LlmDataDist llm_data_dist, Cache cache) { if (cache.cache_id 0) { (void) llmDataDist.DeallocateCache(cache.cache_id); } llm_data_dist.Finalize(); } CacheDesc kv_cache_desc{}; kv_cache_desc.num_tensors NUM_TENSORS; kv_cache_desc.data_type DT_INT32; kv_cache_desc.shape {8, 16}; Cache cache{}; auto ret llm_data_dist.AllocateCache(kv_cache_desc, cache); if (ret ! LLM_SUCCESS) { printf([ERROR] AllocateCache failed, ret %u\n, ret); OnError(llm_data_dist, cache); return -1; }将Cache从P侧传输到D侧有如下两种方式在D侧调用PullKvCache接口拉取KV Cache。// P侧进行全量推理写入cache通知D侧可以pull // D侧拉取cache CacheIndex cache_index{PROMPT_CLUSTER_ID, 1, 0}; ret llm_data_dist.PullKvCache(cache_index, cache, 0); if (ret ! LLM_SUCCESS) { printf([ERROR] PullKvCache failed, ret %u\n, ret); return -1; } // 进行增量推理在P侧调用PushKvCache接口推送KV Cache。// P侧一层计算完可在传输线程立即推送以实现将大部分传输时间和计算重叠。 CacheIndex dst_cache_index{DECODE_CLUSTER_ID, 1, 0}; KvCacheExtParam ext_param{}; ext_param.src_layer_range std::pairint32_t, int32_t(0, 0); ext_param.dst_layer_range std::pairint32_t, int32_t(0, 0); // 每层tensor数量可根据实际模型修改 ext_param.tensor_num_per_layer 2; ret llm_data_dist.PushKvCache(cache, dst_cache_index, 0, -1, ext_param); if (ret ! LLM_SUCCESS) { printf([ERROR] PushKvCache failed, ret %u\n, ret); return -1; }P/D侧根据业务中Cache的使用时机自行释放对应请求的KV Cache内存。ret llm_data_dist.DeallocateCache(cache.cache_id); if (ret ! LLM_SUCCESS) { printf([ERROR] DeallocateCache failed, ret %u\n, ret); } else { printf([INFO] DeallocateCache success\n); }业务退出时P侧和D侧根据断链章节的示例进行断链和调用finalize接口释放资源。功能示例Blocks Cache传输场景本示例介绍Blocks Cache将Cache使用块状形式管理传输场景下接口的使用主要涉及KV Cache的申请、释放、传输。如下将根据业务角色给出伪代码示例。P侧和D侧根据集群建链的示例完成LLM-DataDist的初始化和建链操作。在P侧和D侧模型的每层按照计算好的num_block数量调用AllocateCache接口申请KV Cache。上层框架根据不同请求对创建的num_block大小的KV Cache进行复用业务结束后释放申请的内存。void OnError(LlmDataDist llm_data_dist, Cache cache) { if (cache.cache_id 0) { (void) llm_data_dist.DeallocateCache(cache.cache_id); } llm_data_dist.Finalize(); } CacheDesc kv_cache_desc{}; kv_cache_desc.num_tensors NUM_TENSORS; kv_cache_desc.data_type DT_INT32; kv_cache_desc.shape {8, 16}; Cache cache{}; auto ret llm_data_dist.AllocateCache(kv_cache_desc, cache); if (ret ! LLM_SUCCESS) { printf([ERROR] AllocateCache failed, ret %u\n, ret); OnError(llm_data_dist, cache); return -1; }P侧接收到新请求后为每个请求分配好对应的block_index这部分是推理框架提供的功能。模型推理完之后该请求对应的KV Cache就在对应的block_index所在的内存上将模型输出和请求对应的block_table传输给D侧推理模型作为输入。D侧接收到新请求后为每个请求分配好对应的block_index__然后调用pull_blocks接口根据P侧的block_index和D侧的block_index的对应关系将KV Cache传输到指定位置。在D侧调用PullKvBlocks接口拉取KV Cache。// P侧进行全量推理写入cache通知D侧可以拉取cache // D侧拉取cache CacheIndex cache_index{PROMPT_CLUSTER_ID, 1, 0}; std::vectoruint64_t prompt_blocks {0, 1, 2, 3}; std::vectoruint64_t decoder_blocks {3, 2, 1, 0}; auto ret llm_data_dist.PullKvBlocks(cache_index, cache, prompt_blocks, decoder_blocks); if (ret ! LLM_SUCCESS) { printf([ERROR] PullKvBlocks failed, ret %u\n, ret); return -1; } // 进行增量推理在P侧调用PushKvBlocks接口推送KV Cache。// P侧一层计算完可在传输线程立即推送以实现将大部分传输时间和计算重叠。 CacheIndex dst_cache_index{DECODE_CLUSTER_ID, 1}; KvCacheExtParam ext_param{}; ext_param.src_layer_range std::pairint32_t, int32_t(0, 0); ext_param.dst_layer_range std::pairint32_t, int32_t(0, 0); // 每层tensor数量可根据实际模型修改 ext_param.tensor_num_per_layer 2; std::vectoruint64_t prompt_blocks {0, 1, 2, 3}; std::vectoruint64_t decoder_blocks {3, 2, 1, 0}; ret llm_data_dist.PushKvBlocks(cache, dst_cache_index, prompt_blocks, decoder_blocks, ext_param); if (ret ! LLM_SUCCESS) { printf([ERROR] PushKvBlocks failed, ret %u\n, ret); return -1; }P/D侧根据业务中Cache的使用时机自行释放对应请求的KV Cache内存。ret llm_data_dist.DeallocateCache(cache.cache_id); if (ret ! LLM_SUCCESS) { printf([ERROR] DeallocateCache failed, ret %u\n, ret); } else { printf([INFO] DeallocateCache success\n); }业务退出时P侧和D侧根据断链章节的示例进行断链和资源释放。异常处理错误码LLM_DEVICE_OUT_OF_MEMORY表示Device申请KV Cache内存失败。需要检查初始化时设置的OPTION_BUF_POOL_CFG大小以及申请KV Cache的大小查看是否有请求KV Cache拉取之后没有释放内存。错误码LLM_KV_CACHE_NOT_EXIST表示对端KV Cache不存在需要检查对端进程是否异常或者对应KV Cache的请求是否推理完成。该错误不影响其他请求流程确认流程后可以重试。错误码LLM_WAIT_PROCESS_TIMEOUT或LLM_TIMEOUT表示pull KV超时说明链路出现问题需要尝试重新断链或建链。错误码LLM_NOT_YET_LINK说明与远端cluster没有建链。【免费下载链接】geGEGraph Engine是面向昇腾的图编译器和执行器提供了计算图优化、多流并行、内存复用和模型下沉等技术手段加速模型执行效率减少模型内存占用。 GE 提供对 PyTorch、TensorFlow 前端的友好接入能力并同时支持 onnx、pb 等主流模型格式的解析与编译。项目地址: https://gitcode.com/cann/ge创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考