Shamrock 2025.10.0
Astrophysical Code
Loading...
Searching...
No Matches
ShamrockDump.cpp
Go to the documentation of this file.
1// -------------------------------------------------------//
2//
3// SHAMROCK code for hydrodynamics
4// Copyright (c) 2021-2026 Timothée David--Cléris <tim.shamrock@proton.me>
5// SPDX-License-Identifier: CeCILL Free Software License Agreement v2.1
6// Shamrock is licensed under the CeCILL 2.1 License, see LICENSE for more information
7//
8// -------------------------------------------------------//
9
18#include "shamcmdopt/env.hpp"
19#include "shamcomm/logs.hpp"
21
22namespace shamrock {
23
24 void write_shamrock_dump(std::string fname, std::string metadata_user, PatchScheduler &sched) {
25
26 StackEntry stack_loc{};
27
28 std::string metadata_patch = sched.serialize_patch_metadata().dump(4);
29
30 using namespace shamrock::patch;
31
32 std::vector<u64> pids;
33 std::vector<u64> bytecounts;
34 std::vector<sham::DeviceBuffer<u8>> datas;
35
36 // serialize patchdatas and push them into dat
37 sched.patch_data.for_each_patchdata([&](u64 pid, PatchDataLayer &pdat) {
38 auto ser_sz = pdat.serialize_buf_byte_size();
39 shamalgs::SerializeHelper ser(shamsys::instance::get_compute_scheduler_ptr());
40 ser.allocate(ser_sz);
41 pdat.serialize_buf(ser);
42
43 auto tmp = ser.finalize();
44 size_t bytecount = tmp.get_size();
45
46 pids.push_back(pid);
47 bytecounts.push_back(bytecount);
48 datas.push_back(std::move(tmp));
49 });
50
51 std::vector<u64> all_pids;
52 std::vector<u64> all_bytecounts;
53
54 shamalgs::collective::vector_allgatherv(
55 pids, get_mpi_type<u64>(), all_pids, get_mpi_type<u64>(), MPI_COMM_WORLD);
56 shamalgs::collective::vector_allgatherv(
57 bytecounts, get_mpi_type<u64>(), all_bytecounts, get_mpi_type<u64>(), MPI_COMM_WORLD);
58
59 std::vector<u64> all_offsets = all_bytecounts;
60
61 std::exclusive_scan(all_offsets.begin(), all_offsets.end(), all_offsets.begin(), u64{0});
62
63 using namespace nlohmann;
64
65 json j;
66 j["pids"] = all_pids;
67 j["bytecounts"] = all_bytecounts;
68 j["offsets"] = all_offsets;
69
70 std::string sout = j.dump(4);
71
72 // Write to the file
73
74 u64 head_ptr = 0;
75 MPI_File mfile{};
76
77 shamcomm::open_reset_file(mfile, fname);
78
79 shambase::Timer timer;
80 timer.start();
81
82 // do some perf investigation before enabling preallocation
83 bool preallocate = false;
84 if (preallocate) {
85 MPI_Offset tot_byte = all_offsets.back() + all_bytecounts.back() + metadata_user.size()
86 + metadata_patch.size() + sout.size() + sizeof(std::size_t) * 3;
87 MPICHECK(MPI_File_preallocate(mfile, tot_byte));
88 }
89
90 shamalgs::collective::write_header(mfile, metadata_user, head_ptr);
91 shamalgs::collective::write_header(mfile, metadata_patch, head_ptr);
92 shamalgs::collective::write_header(mfile, sout, head_ptr);
93
94 shamlog_debug_ln(
95 "ShamrockDump",
96 shambase::format(
97 "table sizes {} {} {}", metadata_patch.size(), metadata_user.size(), sout.size()));
98
99 if (/*do check*/ true) {
100 auto check_same_mpi = [](std::string s) {
101 u64 out = shamalgs::collective::allreduce_sum(s.size());
102 if (out != s.size() * shamcomm::world_size()) {
103 logger::err_ln(
104 "ShamrockDump",
105 shambase::format(
106 "string size mismatch between all processes,\n size : {}\nthe "
107 "string : {}\n",
108 s.size(),
109 s));
111 "size mismatch in shamrock dump header");
112 }
113 };
114
115 check_same_mpi(metadata_user);
116 check_same_mpi(metadata_patch);
117 check_same_mpi(sout);
118 }
119
120 if (!shamcmdopt::getenv_str("SHAMDUMP_OFFSET_MODE_OLD").has_value()) {
121 // reset MPI view
122 MPICHECK(MPI_File_set_view(mfile, 0, MPI_BYTE, MPI_CHAR, "native", MPI_INFO_NULL));
123 }
124
125 // map of patch id -> all_pids idx
126 std::unordered_map<u64, size_t> map{};
127 for (u32 i = 0; i < all_pids.size(); i++) {
128 map[all_pids[i]] = i;
129 }
130
131 for (u32 i = 0; i < datas.size(); i++) {
132
133 u64 pid = pids[i];
134 u64 bytecount = bytecounts[i];
135
136 size_t off = all_offsets[map[pid]];
137 auto &data = datas[i];
138
139 shamcomm::CommunicationBuffer buf(data, shamsys::instance::get_compute_scheduler_ptr());
140
141 shamalgs::collective::write_at<u8>(mfile, buf.get_ptr(), bytecount, head_ptr + off);
142 }
143
144 // write data to file
145
146 MPI_File_close(&mfile);
147 timer.end();
148
149 if (shamcomm::world_rank() == 0) {
150 size_t plist_len = all_offsets.size();
151 size_t max_head = all_offsets[plist_len - 1] + all_bytecounts[plist_len - 1] + head_ptr;
152 logger::info_ln(
153 "Shamrock Dump",
154 shambase::format(
155 "dump to {}\n - took {}, bandwidth = {}/s",
156 fname,
157 timer.get_time_str(),
158 shambase::readable_sizeof(max_head / timer.elasped_sec())));
159 }
160 }
161
162 void load_shamrock_dump(std::string fname, std::string &metadata_user, ShamrockCtx &ctx) {
163
164 StackEntry stack_loc{};
165
166 u64 head_ptr = 0;
167 MPI_File mfile{};
168
169 shamcomm::open_read_only_file(mfile, fname);
170
171 shambase::Timer timer;
172 timer.start();
173
174 std::string metadata_patch{};
175 std::string patchdata_infos{};
176
177 metadata_user = shamalgs::collective::read_header(mfile, head_ptr);
178 metadata_patch = shamalgs::collective::read_header(mfile, head_ptr);
179 patchdata_infos = shamalgs::collective::read_header(mfile, head_ptr);
180
181 if (!shamcmdopt::getenv_str("SHAMDUMP_OFFSET_MODE_OLD").has_value()) {
182 // reset MPI view
183 MPICHECK(MPI_File_set_view(mfile, 0, MPI_BYTE, MPI_CHAR, "native", MPI_INFO_NULL));
184 }
185 // logger::raw_ln(metadata_user, metadata_patch, patchdata_infos);
186
187 using namespace nlohmann;
188
189 json jmeta_patch = json::parse(metadata_patch);
190 json jpdat_info = json::parse(patchdata_infos);
191
192 ctx.pdata_layout_new();
193 *ctx.pdl = jmeta_patch.at("patchdata_layout").get<patch::PatchDataLayerLayout>();
194 ctx.init_sched(
195 jmeta_patch.at("crit_patch_split").get<u64>(),
196 jmeta_patch.at("crit_patch_merge").get<u64>());
197
198 auto &sched = shambase::get_check_ref(ctx.sched);
199
200 sched.patch_list = jmeta_patch.at("patchlist").get<SchedulerPatchList>();
201 sched.patch_tree = jmeta_patch.at("patchtree").get<scheduler::PatchTree>();
202 sched.patch_data.sim_box.from_json(jmeta_patch.at("sim_box"));
203
204 // edit patch owner to fit in new world size, or spread if more processes now
205 // a bit dirty but gets the job done for now
206 // ideally we should call a load balance once
207 for (auto &p : sched.patch_list.global) {
208 p.node_owner_id = p.node_owner_id % shamcomm::world_size();
209 }
210
211 // rebuild local patch list
212 auto loc_ids = sched.patch_list.build_local();
213
214 // Load patchdata according to new LB
215
216 std::vector<u64> all_offsets;
217 std::vector<u64> all_pids;
218 std::vector<u64> all_bytecounts;
219
220 all_bytecounts = jpdat_info.at("bytecounts").get<std::vector<u64>>();
221 all_offsets = jpdat_info.at("offsets").get<std::vector<u64>>();
222 all_pids = jpdat_info.at("pids").get<std::vector<u64>>();
223
224 struct PatchFileOffset {
225 u64 offset, bytecount;
226 };
227
228 std::unordered_map<u64, PatchFileOffset> off_table;
229
230 for (u32 i = 0; i < all_bytecounts.size(); i++) {
231 off_table[all_pids[i]] = {all_offsets[i], all_bytecounts[i]};
232 }
233
234 for (const auto &p : sched.patch_list.local) {
235 u64 pid = p.id_patch;
236 auto loc_file_info = off_table[pid];
237
239 loc_file_info.bytecount, shamsys::instance::get_compute_scheduler_ptr());
240
241 shamalgs::collective::read_at<u8>(
242 mfile, buf.get_ptr(), loc_file_info.bytecount, head_ptr + loc_file_info.offset);
243
245
247 shamsys::instance::get_compute_scheduler_ptr(), std::move(out));
248
249 patch::PatchDataLayer pdat = patch::PatchDataLayer::deserialize_buf(ser, ctx.pdl);
250
251 sched.patch_data.owned_data.add_obj(pid, std::move(pdat));
252 }
253
254 MPI_File_close(&mfile);
255 timer.end();
256
257 if (shamcomm::world_rank() == 0) {
258 size_t plist_len = all_offsets.size();
259 size_t max_head = all_offsets[plist_len - 1] + all_bytecounts[plist_len - 1] + head_ptr;
260 logger::info_ln(
261 "Shamrock Dump",
262 shambase::format(
263 "load dump from {}\n - took {}, bandwidth = {}/s",
264 fname,
265 timer.get_time_str(),
266 shambase::readable_sizeof(max_head / timer.elasped_sec())));
267 }
268 }
269
270} // namespace shamrock
std::uint32_t u32
32 bit unsigned integer
std::uint64_t u64
64 bit unsigned integer
The MPI scheduler.
SchedulerPatchData patch_data
handle the data of the patches of the scheduler
Handle the patch list of the mpi scheduler.
A buffer allocated in USM (Unified Shared Memory)
size_t get_size() const
Gets the number of elements in the buffer.
Class Timer measures the time elapsed since the timer was started.
Definition time.hpp:96
std::string get_time_str() const
Converts the stored nanosecond time to a string representation.
Definition time.hpp:117
void end()
Stops the timer and stores the elapsed time in nanoseconds.
Definition time.hpp:111
f64 elasped_sec() const
Converts the stored nanosecond time to a floating point representation in seconds.
Definition time.hpp:123
void start()
Starts the timer.
Definition time.hpp:106
Shamrock communication buffers.
static sham::DeviceBuffer< u8 > convert_usm(CommunicationBuffer &&buf)
destroy the buffer and recover the held object
PatchDataLayer container class, the layout is described in patchdata_layout.
Patch Tree : Tree structure organisation for an abstract list of patches Nb : this tree is compatible...
Definition PatchTree.hpp:29
#define MPICHECK(mpicall)
Shortcut macro to check MPI return codes.
std::string readable_sizeof(double size)
given a sizeof value return a readable string Example : readable_sizeof(1024*1024*1024) -> "1....
Definition string.hpp:139
void throw_with_loc(std::string message, SourceLocation loc=SourceLocation{})
Throw an exception and append the source location to it.
T & get_check_ref(const std::unique_ptr< T > &ptr, SourceLocation loc=SourceLocation())
Takes a std::unique_ptr and returns a reference to the object it holds. It throws a std::runtime_error...
Definition memory.hpp:110
std::optional< std::string > getenv_str(const char *env_var)
Get the content of the environment variable if it exist.
Definition env.cpp:24
void open_reset_file(MPI_File &fh, const std::string &fname)
Open a MPI file and remove its content.
Definition io.cpp:24
i32 world_rank()
Gives the rank of the current process in the MPI communicator.
Definition worldInfo.cpp:40
i32 world_size()
Gives the size of the MPI communicator.
Definition worldInfo.cpp:38
void open_read_only_file(MPI_File &fh, const std::string &fname)
Open a mpi file in read only mode.
Definition io.cpp:52
namespace for the main framework
Definition __init__.py:1
void load_shamrock_dump(std::string fname, std::string &metadata_user, ShamrockCtx &ctx)
Load a Shamrock dump file and restore the state of the patches and retrieve user metadata.
void write_shamrock_dump(std::string fname, std::string metadata_user, PatchScheduler &sched)
Write a Shamrock dump file containing the current state of the patches and user supplied metadata.
This file contains the definition for the stacktrace related functionality.