Line data Source code
1 : #include "TRACE/tracemf.h"
2 : #define TRACE_NAME "TransferOutput"
3 : #include "artdaq/ArtModules/ArtdaqOutput.hh"
4 :
5 : #include <csignal>
6 : #include "art/Framework/Core/ModuleMacros.h"
7 : #include "artdaq/DAQdata/NetMonHeader.hh"
8 : #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
9 : #include "artdaq/TransferPlugins/TransferInterface.hh"
10 :
11 : namespace art {
12 : class TransferOutput;
13 : }
14 :
15 : /**
16 : * \brief An art::OutputModule which sends events using DataSenderManager.
17 : * This module is designed for transporting Fragment-wrapped art::Events after
18 : * they have been read into art, for example between the EventBuilder and the Aggregator.
19 : */
20 : class art::TransferOutput : public ArtdaqOutput
21 : {
22 : public:
23 : /**
24 : * \brief TransferOutput Constructor
25 : * \param ps ParameterSet used to configure TransferOutput
26 : *
27 : * TransferOutput accepts no Parameters beyond those which art::OutputModule takes.
28 : * See the art::OutputModule documentation for more details on those Parameters.
29 : */
30 : explicit TransferOutput(fhicl::ParameterSet const& ps);
31 :
32 : /**
33 : * \brief TransferOutput Destructor
34 : */
35 : ~TransferOutput() override;
36 :
37 : protected:
38 : /// <summary>
39 : /// Send a message using the Transfer Plugin
40 : /// </summary>
41 : /// <param name="fragment">Fragment to send</param>
42 : void SendMessage(artdaq::FragmentPtr& fragment) override;
43 :
44 : private:
45 : TransferOutput(TransferOutput const&) = delete;
46 : TransferOutput(TransferOutput&&) = delete;
47 : TransferOutput& operator=(TransferOutput const&) = delete;
48 : TransferOutput& operator=(TransferOutput&&) = delete;
49 :
50 : size_t send_timeout_us_;
51 : size_t send_retry_count_;
52 : std::unique_ptr<artdaq::TransferInterface> transfer_;
53 : };
54 :
55 0 : art::TransferOutput::TransferOutput(fhicl::ParameterSet const& ps)
56 0 : : ArtdaqOutput(ps), send_timeout_us_(ps.get<size_t>("send_timeout_us", 5000000)), send_retry_count_(ps.get<size_t>("send_retry_count", 5))
57 : {
58 0 : TLOG(TLVL_DEBUG + 32) << "Begin: TransferOutput::TransferOutput(ParameterSet const& ps)";
59 0 : transfer_ = artdaq::MakeTransferPlugin(ps, "transfer_plugin", artdaq::TransferInterface::Role::kSend);
60 0 : TLOG(TLVL_DEBUG + 32) << "END: TransferOutput::TransferOutput";
61 0 : }
62 :
63 0 : art::TransferOutput::~TransferOutput()
64 : {
65 0 : TLOG(TLVL_DEBUG + 32) << "Begin: TransferOutput::~TransferOutput()";
66 :
67 0 : auto sts = transfer_->transfer_fragment_min_blocking_mode(*artdaq::Fragment::eodFrag(0), 10000);
68 0 : if (sts != artdaq::TransferInterface::CopyStatus::kSuccess)
69 : {
70 0 : TLOG(TLVL_ERROR) << "Error sending EOD Fragment!";
71 : }
72 0 : transfer_.reset(nullptr);
73 0 : TLOG(TLVL_DEBUG + 32) << "End: TransferOutput::~TransferOutput()";
74 0 : }
75 :
76 0 : void art::TransferOutput::SendMessage(artdaq::FragmentPtr& fragment)
77 : {
78 0 : TLOG(TLVL_DEBUG + 32) << "Sending message with sequenceID=" << fragment->sequenceID() << ", type=" << static_cast<int>(fragment->type())
79 0 : << ", length=" << fragment->dataSizeBytes();
80 0 : auto sts = artdaq::TransferInterface::CopyStatus::kErrorNotRequiringException;
81 0 : size_t retries = 0;
82 0 : while (sts != artdaq::TransferInterface::CopyStatus::kSuccess && retries <= send_retry_count_)
83 : {
84 0 : sts = transfer_->transfer_fragment_min_blocking_mode(*fragment, send_timeout_us_);
85 0 : retries++;
86 : }
87 0 : if (retries > send_retry_count_)
88 : {
89 0 : TLOG(TLVL_ERROR) << "Error communicating with remote after " << retries << " tries. Closing art process";
90 0 : kill(getpid(), SIGUSR2);
91 : }
92 :
93 : #if 0
94 : if (messageType == artdaq::Fragment::InitFragmentType)
95 : {
96 : std::fstream ostream("sendInitMessage_TransferOutput.bin", std::ios::out | std::ios::binary);
97 : ostream.write(msg.Buffer(), msg.Length());
98 : ostream.close();
99 : }
100 : #endif
101 0 : }
102 :
103 0 : DEFINE_ART_MODULE(art::TransferOutput) // NOLINT(performance-unnecessary-value-param)
|