Line data Source code
1 : #include "TRACE/tracemf.h"
2 : #define TRACE_NAME "ShmemWrapper"
3 :
4 : #include "artdaq/ArtModules/detail/ShmemWrapper.hh"
5 :
6 : #include "art/Framework/Services/Registry/ServiceHandle.h"
7 : #include "artdaq/ArtModules/ArtdaqSharedMemoryServiceInterface.h"
8 : #include "artdaq/DAQdata/NetMonHeader.hh"
9 :
10 0 : art::ShmemWrapper::ShmemWrapper(fhicl::ParameterSet const& ps)
11 : {
12 0 : init_timeout_s_ = ps.get<double>("init_fragment_timeout_seconds", 600.0);
13 : // Make sure the ArtdaqSharedMemoryService is available
14 0 : art::ServiceHandle<ArtdaqSharedMemoryServiceInterface> shm;
15 0 : }
16 :
17 0 : std::shared_ptr<ArtdaqEvent> art::ShmemWrapper::receiveMessages()
18 : {
19 0 : TLOG(TLVL_DEBUG + 34) << "Receiving Fragment from NetMonTransportService";
20 0 : TLOG(TLVL_DEBUG + 33) << "receiveMessage BEGIN";
21 0 : art::ServiceHandle<ArtdaqSharedMemoryServiceInterface> shm;
22 0 : std::shared_ptr<ArtdaqEvent> output;
23 :
24 : // Do not process data until Init Fragment received!
25 0 : auto start = std::chrono::steady_clock::now();
26 0 : while (!init_received_ && artdaq::TimeUtils::GetElapsedTime(start) < init_timeout_s_)
27 : {
28 0 : usleep(static_cast<unsigned>(init_timeout_s_ * 1000000 / 100)); // Check 100 times
29 : }
30 0 : if (!init_received_)
31 : {
32 0 : TLOG(TLVL_ERROR) << "Did not receive Init Fragment after " << init_timeout_s_ << " seconds.";
33 : }
34 :
35 0 : output = shm->ReceiveEvent(false);
36 :
37 0 : if (output == nullptr)
38 : {
39 0 : TLOG(TLVL_DEBUG + 32) << "Did not receive event after timeout, returning from receiveMessage ";
40 0 : return output;
41 : }
42 :
43 0 : TLOG(TLVL_DEBUG + 33) << "receiveMessage END";
44 :
45 0 : TLOG(TLVL_DEBUG + 34) << "Done Receiving Fragments from Shared Memory";
46 0 : return output;
47 0 : }
48 :
49 0 : artdaq::FragmentPtrs art::ShmemWrapper::receiveInitMessage()
50 : {
51 0 : TLOG(TLVL_DEBUG + 34) << "Receiving Init Fragment from NetMonTransportService";
52 :
53 0 : TLOG(TLVL_DEBUG + 33) << "receiveInitMessage BEGIN";
54 0 : art::ServiceHandle<ArtdaqSharedMemoryServiceInterface> shm;
55 0 : auto start = std::chrono::steady_clock::now();
56 0 : std::shared_ptr<ArtdaqEvent> eventMap;
57 0 : while (eventMap == nullptr)
58 : {
59 0 : eventMap = shm->ReceiveEvent(true);
60 :
61 0 : if (eventMap != nullptr)
62 : {
63 0 : auto type = eventMap->FirstFragmentType();
64 0 : if (type == artdaq::Fragment::EndOfDataFragmentType)
65 : {
66 0 : TLOG(TLVL_DEBUG + 32) << "Received shutdown message, returning";
67 0 : artdaq::FragmentPtrs output;
68 0 : for (auto& frag : *eventMap->fragments[artdaq::Fragment::EndOfDataFragmentType])
69 : {
70 0 : output.emplace_back(new artdaq::Fragment(std::move(frag)));
71 : }
72 0 : return output;
73 0 : }
74 0 : if (type != artdaq::Fragment::InitFragmentType)
75 : {
76 0 : TLOG(TLVL_WARNING) << "Did NOT receive Init Fragment as first broadcast! Type="
77 0 : << artdaq::detail::RawFragmentHeader::SystemTypeToString(type);
78 0 : eventMap = nullptr;
79 : }
80 : }
81 0 : else if (artdaq::TimeUtils::GetElapsedTime(start) > init_timeout_s_)
82 : {
83 0 : TLOG(TLVL_WARNING) << "Did not receive Init fragment after init_fragment_timeout_seconds (" << artdaq::TimeUtils::GetElapsedTime(start) << ")!";
84 0 : return artdaq::FragmentPtrs();
85 : }
86 : }
87 :
88 : // We return false, indicating we're done reading, if:
89 : // 1) we did not obtain an event, because we timed out and were
90 : // configured NOT to keep trying after a timeout, or
91 : // 2) the event we read was the end-of-data marker: a null
92 : // pointer
93 :
94 0 : TLOG(TLVL_DEBUG + 33) << "receiveInitMessage: Returning top Fragment";
95 0 : artdaq::FragmentPtrs output;
96 0 : for (auto& frag : *eventMap->fragments[artdaq::Fragment::InitFragmentType])
97 : {
98 0 : output.emplace_back(new artdaq::Fragment(std::move(frag)));
99 : }
100 :
101 : #if DUMP_RECEIVE_MESSAGE
102 : std::string fileName = "receiveInitMessage_" + std::to_string(getpid()) + ".bin";
103 : std::fstream ostream(fileName.c_str(), std::ios::out | std::ios::binary);
104 : ostream.write(buffer, header->data_length);
105 : ostream.close();
106 : #endif
107 :
108 0 : TLOG(TLVL_DEBUG + 33) << "receiveInitMessage END";
109 0 : init_received_ = true;
110 :
111 0 : TLOG(TLVL_DEBUG + 34) << "Done Receiving Init Fragment from NetMonTransportService";
112 0 : return output;
113 0 : }
|