Line data Source code
1 : #include "TRACE/tracemf.h"
2 :
3 : #include "artdaq/ArtModules/ArtdaqOutput.hh"
4 : #include "artdaq/ArtModules/ArtdaqSharedMemoryServiceInterface.h"
5 : #include "artdaq/DAQdata/Globals.hh"
6 : #include "artdaq/DAQrate/DataSenderManager.hh"
7 :
8 : #include "art/Framework/Core/ModuleMacros.h"
9 : #include "art/Framework/Services/Registry/ServiceHandle.h"
10 : #include "fhiclcpp/ParameterSet.h"
11 :
12 : #include <memory>
13 :
// Name used by the TRACE logging macros (TLOG) in this file.
// if TRACE_NAME has variable, it is safest to define after includes
// NOTE(review): app_name is not declared in this file; presumably it is provided
// by artdaq/DAQdata/Globals.hh -- confirm.
#define TRACE_NAME (app_name + "_RootNetOutput").c_str()

// Set to a non-zero value to compile in the debugging block in
// RootNetOutput::SendMessage that writes each outgoing message to a binary file.
#define DUMP_SEND_MESSAGE 0

// Forward declaration; the class itself is defined below using its qualified name.
namespace art {
class RootNetOutput;
}
22 :
23 : /**
24 : * \brief An art::OutputModule which sends events using DataSenderManager.
25 : * This module is designed for transporting Fragment-wrapped art::Events after
26 : * they have been read into art, for example between the EventBuilder and the Aggregator.
27 : */
28 : class art::RootNetOutput : public ArtdaqOutput
29 : {
30 : public:
31 : /**
32 : * \brief RootNetOutput Constructor
33 : * \param ps ParameterSet used to configure RootNetOutput
34 : *
35 : * RootNetOutput accepts no Parameters beyond those which art::OutputModule takes.
36 : * See the art::OutputModule documentation for more details on those Parameters.
37 : */
38 : explicit RootNetOutput(fhicl::ParameterSet const& ps);
39 :
40 : /**
41 : * \brief RootNetOutput Destructor
42 : */
43 : ~RootNetOutput() override;
44 :
45 : /**
46 : * \brief Get the number of data receivers
47 : * \return The number of data receivers
48 : */
49 : size_t dataReceiverCount() const { return sender_ptr_->destinationCount(); }
50 :
51 : protected:
52 : /// <summary>
53 : /// Send a message using DataSenderManager
54 : /// </summary>
55 : /// <param name="fragment">Fragment to send</param>
56 : void SendMessage(artdaq::FragmentPtr& fragment) override;
57 :
58 : private:
59 : RootNetOutput(RootNetOutput const&) = delete;
60 : RootNetOutput(RootNetOutput&&) = delete;
61 : RootNetOutput& operator=(RootNetOutput const&) = delete;
62 : RootNetOutput& operator=(RootNetOutput&&) = delete;
63 :
64 : void connect();
65 : void disconnect();
66 :
67 : std::unique_ptr<artdaq::DataSenderManager> sender_ptr_;
68 : fhicl::ParameterSet data_pset_;
69 : double init_timeout_s_;
70 : };
71 :
72 0 : art::RootNetOutput::RootNetOutput(fhicl::ParameterSet const& ps)
73 : : ArtdaqOutput(ps)
74 0 : , sender_ptr_(nullptr)
75 0 : , data_pset_(ps)
76 : {
77 0 : TLOG(TLVL_DEBUG + 32) << "Begin: RootNetOutput::RootNetOutput(ParameterSet const& ps)";
78 : // Make sure the ArtdaqSharedMemoryService is available
79 0 : art::ServiceHandle<ArtdaqSharedMemoryServiceInterface> shm;
80 0 : init_timeout_s_ = ps.get<double>("init_fragment_timeout_seconds", 1.0);
81 0 : connect();
82 0 : TLOG(TLVL_DEBUG + 32) << "End: RootNetOutput::RootNetOutput(ParameterSet const& ps)";
83 0 : }
84 :
85 0 : art::RootNetOutput::~RootNetOutput()
86 : {
87 0 : TLOG(TLVL_DEBUG + 32) << "Begin: RootNetOutput::~RootNetOutput()";
88 0 : disconnect();
89 0 : TLOG(TLVL_DEBUG + 32) << "End: RootNetOutput::~RootNetOutput()";
90 0 : }
91 :
92 0 : void art::RootNetOutput::SendMessage(artdaq::FragmentPtr& fragment)
93 : {
94 : //
95 : // Send message.
96 : //
97 : {
98 0 : TLOG(TLVL_WRITE) << "RootNetOutput::SendMessage Sending a message with type code "
99 0 : << artdaq::detail::RawFragmentHeader::SystemTypeToString(fragment->type());
100 0 : if (sender_ptr_ == nullptr)
101 : {
102 0 : TLOG(TLVL_DEBUG + 32) << "Reconnecting DataSenderManager";
103 0 : connect();
104 : }
105 :
106 : #if DUMP_SEND_MESSAGE
107 : std::string fileName = "sendMessage_" + std::to_string(my_rank) + "_" + std::to_string(getpid()) + "_" +
108 : std::to_string(sequenceId) + ".bin";
109 : std::fstream ostream(fileName, std::ios::out | std::ios::binary);
110 : ostream.write(msg.Buffer(), msg.Length());
111 : ostream.close();
112 : #endif
113 :
114 0 : auto sequenceId = fragment->sequenceID();
115 0 : TLOG(TLVL_DEBUG + 32) << "Sending message with sequenceID=" << sequenceId << ", type=" << static_cast<int>(fragment->type())
116 0 : << ", length=" << fragment->dataSizeBytes();
117 :
118 0 : sender_ptr_->sendFragment(std::move(*fragment));
119 : // Events are unique in art, so this will be the only send with this sequence ID!
120 0 : sender_ptr_->RemoveRoutingTableEntry(sequenceId);
121 0 : TLOG(TLVL_WRITE) << "RootNetOutput::SendMessage: Message sent.";
122 : }
123 0 : }
124 :
125 0 : void art::RootNetOutput::connect()
126 : {
127 0 : auto start_time = std::chrono::steady_clock::now();
128 :
129 0 : char const* artapp_env = getenv("ARTDAQ_RANK");
130 0 : if (artapp_env != nullptr && my_rank < 0)
131 : {
132 0 : my_rank = strtol(artapp_env, nullptr, 10);
133 : }
134 :
135 0 : while (my_rank == -1 && artdaq::TimeUtils::GetElapsedTime(start_time) < init_timeout_s_)
136 : {
137 0 : usleep(1000);
138 : }
139 0 : sender_ptr_ = std::make_unique<artdaq::DataSenderManager>(data_pset_);
140 0 : }
141 :
142 0 : void art::RootNetOutput::disconnect()
143 : {
144 0 : if (sender_ptr_)
145 : {
146 0 : sender_ptr_.reset(nullptr);
147 : }
148 0 : }
149 :
// Expands art's module-definition macro for RootNetOutput.
// NOTE(review): presumably this is what exposes the class to art as a loadable
// module plugin (macro from art/Framework/Core/ModuleMacros.h) -- confirm.
DEFINE_ART_MODULE(art::RootNetOutput)  // NOLINT(performance-unnecessary-value-param)
|