sparseXchg.hpp
// -------------------------------------------------------//
//
// SHAMROCK code for hydrodynamics
// Copyright (c) 2021-2026 Timothée David--Cléris <tim.shamrock@proton.me>
// SPDX-License-Identifier: CeCILL Free Software License Agreement v2.1
// Shamrock is licensed under the CeCILL 2.1 License, see LICENSE for more information
//
// -------------------------------------------------------//

#pragma once

#include "shambase/assert.hpp"
#include "shambase/integer.hpp"
#include "shambase/string.hpp"
#include "shambase/time.hpp"
#include "shambackends/math.hpp"
#include "shamcomm/logs.hpp"
#include "shamcomm/wrapper.hpp"
#include <string_view>
#include <functional>
#include <memory> // std::unique_ptr, used by the payload structs below
#include <mpi.h>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

namespace shamalgs::collective {

    /// Message to be sent to a single receiver rank
    struct SendPayload {
        i32 receiver_rank; ///< destination rank of the payload
        std::unique_ptr<shamcomm::CommunicationBuffer> payload; ///< buffer to send
    };

    /// Message received from a single sender rank
    struct RecvPayload {
        i32 sender_ranks; ///< rank of the sender (note: the name should not be plural)
        std::unique_ptr<shamcomm::CommunicationBuffer> payload; ///< received buffer
    };

    /// Communication table describing a sparse exchange: each entry packs a
    /// (sender rank, receiver rank) pair into one u64
    struct SparseCommTable {
        /// packed (sender, receiver) pairs for the messages sent by this rank
        std::vector<u64> local_send_vec_comm_ranks;
        /// packed pairs of every rank, gathered over the whole communicator
        std::vector<u64> global_comm_ranks;

        /// Build the table from the local send list and share it with all ranks
        void build(const std::vector<SendPayload> &message_send) {
            StackEntry stack_loc{};

            local_send_vec_comm_ranks.resize(message_send.size());

            for (u64 i = 0; i < message_send.size(); i++) {
                local_send_vec_comm_ranks[i]
                    = sham::pack32(shamcomm::world_rank(), message_send[i].receiver_rank);
            }

            vector_allgatherv(local_send_vec_comm_ranks, global_comm_ranks, MPI_COMM_WORLD);
        }
    };
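
    // Illustrative note on the packing above: sham::pack32 stores the two 32-bit ranks
    // in one u64. Assuming the first argument ends up in the high word (an assumption,
    // the exact word order is defined in shambackends/math.hpp), an entry produced on
    // rank 2 for receiver rank 5 would be
    //
    //     u64 entry = sham::pack32(2, 5); // == (u64(2) << 32) | 5 under that assumption
    //
    // After vector_allgatherv, global_comm_ranks holds these packed pairs for every
    // message in the communicator, so each rank can scan the table to find the messages
    // addressed to it.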

    /// Print debug information about a sparse communication
    void sparse_comm_debug_infos(
        std::shared_ptr<sham::DeviceScheduler> dev_sched,
        const std::vector<SendPayload> &message_send,
        std::vector<RecvPayload> &message_recv,
        const SparseCommTable &comm_table);

    /// Sparse exchange variant based on isend / probe / count / irecv
    void sparse_comm_isend_probe_count_irecv(
        std::shared_ptr<sham::DeviceScheduler> dev_sched,
        const std::vector<SendPayload> &message_send,
        std::vector<RecvPayload> &message_recv,
        const SparseCommTable &comm_table);

    /// Sparse exchange variant using the allgathered comm table with isend / irecv
    void sparse_comm_allgather_isend_irecv(
        std::shared_ptr<sham::DeviceScheduler> dev_sched,
        const std::vector<SendPayload> &message_send,
        std::vector<RecvPayload> &message_recv,
        const SparseCommTable &comm_table);

    /// Sparse communication entry point, currently forwarded to
    /// sparse_comm_allgather_isend_irecv (the other variants are kept commented out)
    inline void sparse_comm_c(
        std::shared_ptr<sham::DeviceScheduler> dev_sched,
        const std::vector<SendPayload> &message_send,
        std::vector<RecvPayload> &message_recv,
        const SparseCommTable &comm_table) {
        // sparse_comm_debug_infos(dev_sched, message_send, message_recv, comm_table);
        // sparse_comm_isend_probe_count_irecv(dev_sched, message_send, message_recv,
        // comm_table);
        sparse_comm_allgather_isend_irecv(dev_sched, message_send, message_recv, comm_table);
    }

    /// Build a SparseCommTable from the send list, then perform the sparse exchange
    inline void base_sparse_comm(
        std::shared_ptr<sham::DeviceScheduler> dev_sched,
        const std::vector<SendPayload> &message_send,
        std::vector<RecvPayload> &message_recv) {
        StackEntry stack_loc{};

        SparseCommTable comm_table;

        comm_table.build(message_send);

        sparse_comm_c(dev_sched, message_send, message_recv, comm_table);
    }

    /// Same as base_sparse_comm, but splits the exchange into rounds so that at most
    /// max_simultaneous_send messages are sent per rank in each round
    inline void base_sparse_comm_max_comm(
        std::shared_ptr<sham::DeviceScheduler> dev_sched,
        std::vector<SendPayload> &message_send,
        std::vector<RecvPayload> &message_recv,
        u32 max_simultaneous_send) {

        // largest number of messages sent by any rank, so that every rank runs
        // the same number of collective rounds
        int send_loc = static_cast<int>(message_send.size());
        int send_max_count;
        shamcomm::mpi::Allreduce(&send_loc, &send_max_count, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD);

        // logger::raw_ln(send_loc, send_max_count);

        StackEntry stack_loc{};

        int i = 0;
        while (i < send_max_count) {

            if (i > 0) {
                ON_RANK_0(
                    logger::warn_ln("SparseComm", "Split sparse comm", i, "/", send_max_count));
            }

            std::vector<SendPayload> message_send_tmp;
            std::vector<RecvPayload> message_recv_tmp;

            // move the next batch of at most max_simultaneous_send local messages
            for (int j = i; (j < (i + max_simultaneous_send)) && (j < message_send.size()); j++) {
                // logger::raw_ln("emplace message", j);
                message_send_tmp.emplace_back(std::move(message_send[j]));
            }

            base_sparse_comm(dev_sched, message_send_tmp, message_recv_tmp);

            // append the messages received in this round to the global receive list
            for (int j = 0; j < message_recv_tmp.size(); j++) {
                message_recv.emplace_back(std::move(message_recv_tmp[j]));
            }

            i += max_simultaneous_send;
        }
    }
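
    // Illustrative note on the batching above: with max_simultaneous_send = 2, a rank
    // holding 3 messages while the global maximum is 5 runs 3 rounds of base_sparse_comm,
    // moving batches of 2, 1 and 0 messages. The empty round is still executed because
    // base_sparse_comm is collective over MPI_COMM_WORLD (its comm table is built with an
    // allgather), so every rank has to take part in every round.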

} // namespace shamalgs::collective
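
A minimal caller-side sketch of the API above, assuming the header is reachable as shamalgs/collective/sparseXchg.hpp and that the caller already owns a device scheduler and a way to fill communication buffers; exchange_with and make_buffer_for are placeholder names, not part of the header. One SendPayload is filled per destination rank, then base_sparse_comm builds the comm table and performs the exchange, returning one RecvPayload per incoming message.

#include "shamalgs/collective/sparseXchg.hpp" // include path assumed from the page title

// Hypothetical helper; `make_buffer_for` is a stand-in for whatever fills a
// shamcomm::CommunicationBuffer on the caller side.
inline std::vector<shamalgs::collective::RecvPayload> exchange_with(
    std::shared_ptr<sham::DeviceScheduler> dev_sched,
    const std::vector<i32> &destinations,
    const std::function<std::unique_ptr<shamcomm::CommunicationBuffer>(i32)> &make_buffer_for) {

    std::vector<shamalgs::collective::SendPayload> sends;
    for (i32 dest : destinations) {
        // one payload per destination rank, buffer ownership moves into the payload
        sends.push_back(shamalgs::collective::SendPayload{dest, make_buffer_for(dest)});
    }

    std::vector<shamalgs::collective::RecvPayload> recvs;
    shamalgs::collective::base_sparse_comm(dev_sched, sends, recvs);

    // each RecvPayload carries the source rank (sender_ranks) and the received buffer
    return recvs;
}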
Referenced symbols (hover documentation from the generated page):

- u32 — std::uint32_t, 32 bit unsigned integer
- u64 — std::uint64_t, 64 bit unsigned integer
- i32 — std::int32_t, 32 bit integer
- vector_allgatherv(send_vec, send_type, recv_vec, recv_type, comm) — allgatherv on vector with size query (size querying variant of vector_allgatherv_ks); definition: exchanges.hpp:98
- shamcomm::world_rank() — gives the rank of the current process in the MPI communicator; definition: worldInfo.cpp:40
- ON_RANK_0(x) — macro to execute code only on rank 0; definition: worldInfo.hpp:73
- shamcomm::mpi::Allreduce — MPI wrapper for MPI_Allreduce; definition: wrapper.cpp:119

Briefs of the included headers: Shamrock communication buffers; Shamrock assertion utility; utility functions related to exception handling; MPI string gather / allgather helpers; stacktrace related functionality; utility functions for MPI error checking; functions related to the MPI communicator.