std::string metadata_patch = sched.serialize_patch_metadata().dump(4);
using namespace shamrock::patch;
std::vector<u64> pids;
std::vector<u64> bytecounts;
std::vector<sham::DeviceBuffer<u8>> datas;
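// Serialize each locally owned patch into a contiguous byte buffer, recording
// its patch id and serialized byte count for the offset table built below.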
// ... (loop head elided in the excerpt: iterates over the locally owned
//      patches, providing the patch id `pid` and its data `pdat`)
auto ser_sz = pdat.serialize_buf_byte_size();
// ... (elided: a serializer `ser` is allocated with ser_sz bytes)
pdat.serialize_buf(ser);

auto tmp = ser.finalize();
// ... (elided: `pid` is appended to `pids` and `bytecount` is set to the
//      size of the serialized blob)
bytecounts.push_back(bytecount);
datas.push_back(std::move(tmp));
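// Gather every rank's patch ids and byte counts so that all ranks share the
// same global table; ordering is consistent across the two allgathers.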
std::vector<u64> all_pids;
std::vector<u64> all_bytecounts;
shamalgs::collective::vector_allgatherv(
    pids, get_mpi_type<u64>(), all_pids, get_mpi_type<u64>(), MPI_COMM_WORLD);
shamalgs::collective::vector_allgatherv(
    bytecounts, get_mpi_type<u64>(), all_bytecounts, get_mpi_type<u64>(), MPI_COMM_WORLD);
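// Convert the byte counts into file offsets with an exclusive prefix sum:
// patch k starts at the sum of the byte counts of patches 0..k-1.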
std::vector<u64> all_offsets = all_bytecounts;

std::exclusive_scan(all_offsets.begin(), all_offsets.end(), all_offsets.begin(), u64{0});
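// Store the global offset table as JSON; the load path parses these three
// arrays back to locate every patch's blob in the file.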
using namespace nlohmann;

// (`json j;` and the "pids" entry are reconstructed: the excerpt elides
//  them, but the load path below reads all three keys)
json j;
j["pids"]       = all_pids;
j["bytecounts"] = all_bytecounts;
j["offsets"]    = all_offsets;

std::string sout = j.dump(4);
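// ... (elided in the excerpt: the MPI file `mfile` is opened, the user
//      metadata string `metadata_user` is prepared, and the write head
//      `head_ptr` is initialized)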
bool preallocate = false;
if (preallocate) { // guard reconstructed: the conditional line is elided in the excerpt
    MPI_Offset tot_byte = all_offsets.back() + all_bytecounts.back() + metadata_user.size()
                          + metadata_patch.size() + sout.size() + sizeof(std::size_t) * 3;
    MPICHECK(MPI_File_preallocate(mfile, tot_byte));
}
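// Write the three metadata blocks back to back at the head of the file; each
// header stores its payload size (the three size_t terms counted in tot_byte
// above), and write_header advances head_ptr past the block.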
shamalgs::collective::write_header(mfile, metadata_user, head_ptr);
shamalgs::collective::write_header(mfile, metadata_patch, head_ptr);
shamalgs::collective::write_header(mfile, sout, head_ptr);
97 "table sizes {} {} {}", metadata_patch.size(), metadata_user.size(), sout.size()));
auto check_same_mpi = [](std::string s) {
    u64 out = shamalgs::collective::allreduce_sum(s.size());
    // ... (elided: if `out` differs from world_size * s.size(), throw with
    //      "string size mismatch between all processes,\n size : {} ..."
    //      and "size mismatch in shamrock dump header")
};

check_same_mpi(metadata_user);
check_same_mpi(metadata_patch);
check_same_mpi(sout);
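// Switch the file view to raw bytes before writing the binary patch blobs
// that follow the headers.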
MPICHECK(MPI_File_set_view(mfile, 0, MPI_BYTE, MPI_CHAR, "native", MPI_INFO_NULL));
std::unordered_map<u64, size_t> map{};
for (u32 i = 0; i < all_pids.size(); i++) {
    map[all_pids[i]] = i;
}
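// Each rank writes only the patches it owns, at the globally agreed offsets
// located after the header region (hence the head_ptr shift).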
for (u32 i = 0; i < datas.size(); i++) {
    u64 pid = pids[i]; // reconstructed: the excerpt elides this line, but
                       // `pid` is required for the offset lookup below
    u64 bytecount = bytecounts[i];

    size_t off = all_offsets[map[pid]];
    auto &data = datas[i];
    // ... (elided: the device buffer `buf` is obtained from `data`)
    shamalgs::collective::write_at<u8>(mfile, buf.get_ptr(), bytecount, head_ptr + off);
}
MPI_File_close(&mfile);
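// Report the effective bandwidth using the highest byte written to the file.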
size_t plist_len = all_offsets.size();
size_t max_head  = all_offsets[plist_len - 1] + all_bytecounts[plist_len - 1] + head_ptr;

// ... (elided: a timing log of the form
//      "dump to {}\n - took {}, bandwidth = {}/s")
std::string metadata_patch{};
std::string patchdata_infos{};

metadata_user   = shamalgs::collective::read_header(mfile, head_ptr);
metadata_patch  = shamalgs::collective::read_header(mfile, head_ptr);
patchdata_infos = shamalgs::collective::read_header(mfile, head_ptr);
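// Same raw-byte file view as in the dump path, so the offsets computed there
// address the same byte positions here.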
MPICHECK(MPI_File_set_view(mfile, 0, MPI_BYTE, MPI_CHAR, "native", MPI_INFO_NULL));
using namespace nlohmann;

json jmeta_patch = json::parse(metadata_patch);
json jpdat_info  = json::parse(patchdata_infos);
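// Rebuild the patch data layout and the scheduler state from the patch
// metadata before any patch payload is read.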
ctx.pdata_layout_new();
// ... (elided: the scheduler is re-created using the stored criteria
//      jmeta_patch.at("crit_patch_split").get<u64>(),
//      jmeta_patch.at("crit_patch_merge").get<u64>())

sched.patch_data.sim_box.from_json(jmeta_patch.at("sim_box"));
for (auto &p : sched.patch_list.global) {
    // ... (loop body elided in the excerpt)
}

auto loc_ids = sched.patch_list.build_local();
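// Parse the offset table that the dump path stored as JSON.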
std::vector<u64> all_offsets;
std::vector<u64> all_pids;
std::vector<u64> all_bytecounts;

all_bytecounts = jpdat_info.at("bytecounts").get<std::vector<u64>>();
all_offsets    = jpdat_info.at("offsets").get<std::vector<u64>>();
all_pids       = jpdat_info.at("pids").get<std::vector<u64>>();
struct PatchFileOffset {
    u64 offset, bytecount;
};

std::unordered_map<u64, PatchFileOffset> off_table;

for (u32 i = 0; i < all_bytecounts.size(); i++) {
    off_table[all_pids[i]] = {all_offsets[i], all_bytecounts[i]};
}
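// Each rank reads back only the patches the rebuilt scheduler assigned to it.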
for (const auto &p : sched.patch_list.local) {
    u64 pid = p.id_patch;
    auto loc_file_info = off_table[pid];

    sham::DeviceBuffer<u8> buf( // declaration reconstructed from the elided line
        loc_file_info.bytecount, shamsys::instance::get_compute_scheduler_ptr());

    shamalgs::collective::read_at<u8>(
        mfile, buf.get_ptr(), loc_file_info.bytecount, head_ptr + loc_file_info.offset);

    // ... (elided: the buffer is deserialized into a PatchData `pdat`, the
    //      deserializer being built from
    //      shamsys::instance::get_compute_scheduler_ptr(), std::move(out))

    sched.patch_data.owned_data.add_obj(pid, std::move(pdat));
}
MPI_File_close(&mfile);
size_t plist_len = all_offsets.size();
size_t max_head  = all_offsets[plist_len - 1] + all_bytecounts[plist_len - 1] + head_ptr;

// ... (elided: a timing log of the form
//      "load dump from {}\n - took {}, bandwidth = {}/s")