LCOV - code coverage report
Current view: top level - artdaq/DAQrate - SharedMemoryEventManager.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 64.0 % 1052 673
Test Date: 2025-09-04 00:45:34 Functions: 68.0 % 203 138

            Line data    Source code
       1              : 
       2              : #include "artdaq/DAQrate/SharedMemoryEventManager.hh"
       3              : #include <sys/wait.h>
       4              : 
       5              : #include <memory>
       6              : #include <numeric>
       7              : 
       8              : #include "artdaq-core/Core/StatisticsCollection.hh"
       9              : #include "artdaq-core/Data/MetadataFragment.hh"
      10              : #include "artdaq-core/Utilities/TraceLock.hh"
      11              : 
      // TRACE_NAME expands to the c_str() of a temporary std::string built from app_name
      // plus the "_SharedMemoryEventManager" suffix.
      12              : #define TRACE_NAME (app_name + "_SharedMemoryEventManager").c_str()
      13              : 
      // The TLVL_* macros below are the per-function levels passed to TLOG() in this file.
      // NOTE(review): TLVL_CONSTRUCTOR and TLVL_DESTRUCTOR are both 43, and TLVL_SHUTDOWN and
      // TLVL_STARTRUN are both 58, so those pairs cannot be enabled independently; every other
      // value in 32..62 is used once. Confirm whether the sharing is intentional.
      14              : // clang-format off
      15              : #define TLVL_ADDFRAGMENT            32
      16              : #define TLVL_ADDINITFRAGMENT        33
      17              : #define TLVL_BROADCASTFRAGMENT      34
      18              : #define TLVL_BROADCASTFRAGMENTS     35
      19              : #define TLVL_BROADCASTFRAGMENTS_2   36
      20              : #define TLVL_CHECKPENDINGBROADCASTS 37
      21              : #define TLVL_CHECKPENDINGBUFFERS    38
      22              : #define TLVL_CHECKPENDINGBUFFERS_2  39
      23              : #define TLVL_CHECKPENDINGBUFFERS_3  40
      24              : #define TLVL_CHECKPENDINGBUFFERS_4  41
      25              : #define TLVL_COMPLETEBUFFER         42
      26              : #define TLVL_CONSTRUCTOR            43
      27              : #define TLVL_DESTRUCTOR             43
      28              : #define TLVL_DONEWRITINGFRAGMENT    44
      29              : #define TLVL_ENDOFDATA              45
      30              : #define TLVL_ENDOFDATA_2            46
      31              : #define TLVL_ENDRUN                 47
      32              : #define TLVL_GETBUFFER              48
      33              : #define TLVL_GETFRAGMENTCOUNT       49
      34              : #define TLVL_GETSUBRUN              50
      35              : #define TLVL_GETSUBRUN_2            51
      36              : #define TLVL_PARSEARTCOMMANDLINE    52
      37              : #define TLVL_RECONFIGUREART         53
      38              : #define TLVL_RUNART                 54
      39              : #define TLVL_RUNART_2               55
      40              : #define TLVL_SENDINIT               56
      41              : #define TLVL_SENDMETRICS            57
      42              : #define TLVL_SHUTDOWN               58
      43              : #define TLVL_STARTRUN               58
      44              : #define TLVL_UPDATEARTCONFIG        59
      45              : #define TLVL_WRITEFRAGMENTHEADER    60
      46              : #define TLVL_BUFFER                 61
      47              : #define TLVL_BUFLCK                 62
      48              : // clang-format on
      49              : 
      // Out-of-class definitions of the class's static members: two mutexes and the two
      // statistics keys. The keys are registered with statsHelper_ in the constructor, and
      // FRAGMENTS_RECEIVED_STAT_KEY is the key used by the addSample() calls in this file.
      50              : std::mutex artdaq::SharedMemoryEventManager::sequence_id_mutex_;
      51              : std::mutex artdaq::SharedMemoryEventManager::subrun_event_map_mutex_;
      52              : const std::string artdaq::SharedMemoryEventManager::
      53              :     FRAGMENTS_RECEIVED_STAT_KEY("SharedMemoryEventManagerFragmentsReceived");
      54              : const std::string artdaq::SharedMemoryEventManager::
      55              :     EVENTS_RELEASED_STAT_KEY("SharedMemoryEventManagerEventsReleased");
      56              : 
      // Constructs the event manager from the FHiCL configuration `pset`.
      // The SharedMemoryManager base and the `broadcasts_` member are both configured from
      // keys read out of `pset`; `art_pset` is stored as current_art_pset_ and used to build
      // current_art_config_file_. The body registers this object as a writer on both
      // segments, zeroes buffer_writes_pending_ for every buffer, sets the writer rank to
      // my_rank and registers the two statistics keys.
      // "buffer_count" and "expected_fragments_per_event" are read without a default value.
      // Throws cet::exception if IsValid() is false after setup.
      57           15 : artdaq::SharedMemoryEventManager::SharedMemoryEventManager(const fhicl::ParameterSet& pset, fhicl::ParameterSet art_pset)
                            // Base-class arguments: key, buffer count, event size (max_event_size_bytes if
                            // present, else expected_fragments_per_event * max_fragment_size_bytes), stale
                            // timeout (event_queue_wait_time * 1000000 unless stale_buffer_timeout_usec is
                            // given), and the negation of broadcast_mode.
      58           30 :     : SharedMemoryManager(pset.get<uint32_t>("shared_memory_key", Globals::SharedMemoryKey(0xEE000000)),
      59           15 :                           pset.get<size_t>("buffer_count"),
      60           75 :                           pset.has_key("max_event_size_bytes") ? pset.get<size_t>("max_event_size_bytes") : pset.get<size_t>("expected_fragments_per_event") * pset.get<size_t>("max_fragment_size_bytes"),
      61           30 :                           pset.get<size_t>("stale_buffer_timeout_usec", pset.get<size_t>("event_queue_wait_time", 5) * 1000000),
      62           30 :                           !pset.get<bool>("broadcast_mode", false))
      63           30 :     , num_art_processes_(pset.get<size_t>("art_analyzer_count", 1))
      64           30 :     , num_fragments_per_event_(pset.get<size_t>("expected_fragments_per_event"))
      65           15 :     , queue_size_(pset.get<size_t>("buffer_count"))
      66           15 :     , run_id_(0)
      67           15 :     , subrun_id_(0)
      68           30 :     , max_subrun_event_map_length_(pset.get<size_t>("max_subrun_lookup_table_size", 100))
      69           30 :     , subrun_transition_hold_time_s_(pset.get<double>("subrun_transition_hold_time_s", 0.001))
      70           30 :     , max_event_list_length_(pset.get<size_t>("max_event_list_length", 100))
      71           30 :     , update_run_ids_(pset.get<bool>("update_run_ids_on_new_fragment", true))
      72           30 :     , use_sequence_id_for_event_number_(pset.get<bool>("use_sequence_id_for_event_number", true))
                            // Overwrite mode is on when use_art is false, or overwrite_mode or broadcast_mode is true.
      73           50 :     , overwrite_mode_(!pset.get<bool>("use_art", true) || pset.get<bool>("overwrite_mode", false) || pset.get<bool>("broadcast_mode", false))
                            // init_fragment_count defaults to 1 or 0 depending on send_init_fragments.
      74           60 :     , init_fragment_count_(pset.get<size_t>("init_fragment_count", pset.get<bool>("send_init_fragments", true) ? 1 : 0))
      75           15 :     , running_(false)
      76           15 :     , buffer_writes_pending_()
                            // Falls back to the "incomplete_event_report_interval_ms" key, then to -1.
      77           60 :     , open_event_report_interval_ms_(pset.get<int>("open_event_report_interval_ms", pset.get<int>("incomplete_event_report_interval_ms", -1)))
      78           15 :     , last_open_event_report_time_(std::chrono::steady_clock::now())
      79           15 :     , last_backpressure_report_time_(std::chrono::steady_clock::now())
      80           15 :     , last_fragment_header_write_time_(std::chrono::steady_clock::now())
      81           45 :     , event_timing_(pset.get<size_t>("buffer_count"))
      82           30 :     , broadcast_timeout_ms_(pset.get<int>("fragment_broadcast_timeout_ms", 3000))
      83           15 :     , run_event_count_(0)
      84           15 :     , run_incomplete_event_count_(0)
      85           15 :     , subrun_event_count_(0)
      86           15 :     , subrun_incomplete_event_count_(0)
      87           15 :     , oversize_fragment_count_(0)
      88           45 :     , maximum_oversize_fragment_count_(pset.get<int>("maximum_oversize_fragment_count", 1))
      89           15 :     , restart_art_(false)
      90           30 :     , always_restart_art_(pset.get<bool>("restart_crashed_art_processes", true))
      91           30 :     , manual_art_(pset.get<bool>("manual_art", false))
      92           15 :     , current_art_pset_(art_pset)
      93           45 :     , art_cmdline_(pset.get<std::string>("art_command_line", "art -c #CONFIG_FILE#"))
      94           30 :     , art_process_index_offset_(pset.get<size_t>("art_index_offset", 0))
      95           30 :     , minimum_art_lifetime_s_(pset.get<double>("minimum_art_lifetime_s", 2.0))
                            // NOTE(review): this key is read here as size_t with default 1000000, but again
                            // for broadcasts_ below as int with default 100000. Confirm which default is intended.
      96           30 :     , art_event_processing_time_us_(pset.get<size_t>("expected_art_event_processing_time_us", 1000000))
      97           15 :     , requests_(nullptr)
      98           15 :     , tokens_(nullptr)
      99           15 :     , data_pset_(pset)
     100           60 :     , broadcasts_(pset.get<uint32_t>("broadcast_shared_memory_key", Globals::SharedMemoryKey(0xBB000000)),
     101           45 :                   pset.get<size_t>("broadcast_buffer_count", 10),
     102           15 :                   pset.get<size_t>("broadcast_buffer_size", 0x100000),
     103          345 :                   pset.get<int>("expected_art_event_processing_time_us", 100000) * pset.get<size_t>("buffer_count"), false)
     104              : {
                                // Seed the subrun lookup table: key 0 maps to value 1.
     105           15 :         subrun_event_map_[0] = 1;
                                // Minimum write is one event header plus one fragment header, on both segments.
     106           15 :         SetMinWriteSize(sizeof(detail::RawEventHeader) + sizeof(detail::RawFragmentHeader));
     107           15 :         broadcasts_.SetMinWriteSize(sizeof(detail::RawEventHeader) + sizeof(detail::RawFragmentHeader));
     108              : 
     109           15 :         RegisterWriter();
     110           15 :         broadcasts_.RegisterWriter();
     111              : 
                                // With use_art:false no art processes are counted, whatever art_analyzer_count says.
     112           45 :         if (!pset.get<bool>("use_art", true))
     113              :         {
     114           30 :                 TLOG(TLVL_INFO) << "BEGIN SharedMemoryEventManager CONSTRUCTOR with use_art:false";
     115           10 :                 num_art_processes_ = 0;
     116              :         }
     117              :         else
     118              :         {
     119           15 :                 TLOG(TLVL_INFO) << "BEGIN SharedMemoryEventManager CONSTRUCTOR with use_art:true";
     120           10 :                 TLOG(TLVL_CONSTRUCTOR) << "art_pset is " << art_pset.to_string();
     121              :         }
     122              : 
                                // In manual_art mode the config file is also given both shared memory keys.
     123           15 :         if (manual_art_)
     124            0 :                 current_art_config_file_ = std::make_shared<art_config_file>(art_pset, GetKey(), GetBroadcastKey());
     125              :         else
     126           15 :                 current_art_config_file_ = std::make_shared<art_config_file>(art_pset);
     127              : 
                                // NOTE(review): the warning text below reads "if this in unintentional"; "is" was
                                // presumably meant. It is a runtime string, so it is left untouched here.
     128           15 :         if (overwrite_mode_ && num_art_processes_ > 0)
     129              :         {
     130            0 :                 TLOG(TLVL_WARNING) << "Art is configured to run, but overwrite mode is enabled! Check your configuration if this in unintentional!";
     131            0 :         }
     132           15 :         else if (overwrite_mode_)
     133              :         {
     134           30 :                 TLOG(TLVL_INFO) << "Overwrite Mode enabled, no configured art processes at startup";
     135              :         }
     136              : 
                                // Touch the per-buffer counter and mutex for every buffer index up front.
     137          103 :         for (size_t ii = 0; ii < size(); ++ii)
     138              :         {
     139           88 :                 buffer_writes_pending_[ii] = 0;
     140              :                 // Make sure the mutexes are created once
     141           88 :                 std::lock_guard<std::mutex> lk(buffer_mutexes_[ii]);
     142           88 :         }
     143              : 
     144           15 :         if (!IsValid())
     145              :         {
     146            0 :                 throw cet::exception(app_name + "_SharedMemoryEventManager") << "Unable to attach to Shared Memory!";  // NOLINT(cert-err60-cpp)
     147              :         }
     148              : 
     149           30 :         TLOG(TLVL_CONSTRUCTOR) << "Setting Writer rank to " << my_rank;
     150           15 :         SetRank(my_rank);
     151           30 :         TLOG(TLVL_CONSTRUCTOR) << "Writer Rank is " << GetRank();
     152              : 
     153           15 :         statsHelper_.addMonitoredQuantityName(FRAGMENTS_RECEIVED_STAT_KEY);
     154           15 :         statsHelper_.addMonitoredQuantityName(EVENTS_RELEASED_STAT_KEY);
     155              : 
     156              :         // fetch the monitoring parameters and create the MonitoredQuantity instances
     157           15 :         statsHelper_.createCollectors(pset, 100, 30.0, 60.0, EVENTS_RELEASED_STAT_KEY);
     158              : 
     159           30 :         TLOG(TLVL_CONSTRUCTOR) << "END CONSTRUCTOR";
     160           15 : }
     161              : 
     // Destructor: if the manager is still running, calls endOfData() and swallows any
     // exception it throws, then unregisters this writer from both shared memory segments.
     // NOTE(review): the destructor is noexcept, and the TLOG and UnregisterWriter() calls sit
     // outside the try block; if any of them can throw, the program terminates. Confirm they cannot.
     162           16 : artdaq::SharedMemoryEventManager::~SharedMemoryEventManager() noexcept
     163              : {
     164           30 :         TLOG(TLVL_DESTRUCTOR) << "DESTRUCTOR";
     165           15 :         if (running_)
     166              :         {
     167              :                 try
     168              :                 {
     169            8 :                         endOfData();
     170              :                 }
     171            0 :                 catch (...)
     172              :                 {
     173              :                         // IGNORED
     174            0 :                 }
     175              :         }
     176              : 
     177           15 :         UnregisterWriter();
     178           15 :         broadcasts_.UnregisterWriter();
     179           30 :         TLOG(TLVL_DESTRUCTOR) << "Destructor END";
     180           16 : }
     181              : 
     // Copies one complete fragment (header and payload, frag.word_count words starting at
     // dataPtr) into the buffer for frag.sequence_id.
     // Returns false only when the buffer lookup yields -1, so the caller can retry.
     // Returns true when the fragment was written, and also when it was not: if the manager
     // is not running, or if the lookup yields -2 (logged as an error and dropped).
     // NOTE(review): unlike WriteFragmentHeader/DoneWritingFragment, this path does not take
     // buffer_mutexes_ for the buffer. Confirm that callers never mix the two APIs concurrently.
     182         2758 : bool artdaq::SharedMemoryEventManager::AddFragment(detail::RawFragmentHeader frag, void* dataPtr)
     183              : {
     184         2758 :         if (!running_) return true;
     185              : 
     186         5516 :         TLOG(TLVL_ADDFRAGMENT) << "AddFragment(Header, ptr) BEGIN frag.word_count=" << frag.word_count
     187         2758 :                                << ", sequence_id=" << frag.sequence_id;
     188         2758 :         auto buffer = getBufferForSequenceID_(frag.sequence_id, true, frag.timestamp);
     189         5516 :         TLOG(TLVL_ADDFRAGMENT) << "Using buffer " << buffer << " for seqid=" << frag.sequence_id;
     190         2758 :         if (buffer == -1)
     191              :         {
     192          687 :                 return false;
     193              :         }
     194         2071 :         if (buffer == -2)
     195              :         {
     196            0 :                 TLOG(TLVL_ERROR) << "Dropping event because data taking has already passed this event number: " << frag.sequence_id;
     197            0 :                 return true;
     198              :         }
     199              : 
     200         2071 :         auto hdr = getEventHeader_(buffer);
                                // NOTE(review): the subrun_id store inside this if is always overwritten by the
                                // unconditional assignment right after it, so only run_id depends on update_run_ids_.
     201         2071 :         if (update_run_ids_)
     202              :         {
     203         2071 :                 hdr->run_id = run_id_;
     204         2071 :                 hdr->subrun_id = subrun_id_;
     205              :         }
     206         2071 :         hdr->subrun_id = GetSubrunForSequenceID(frag.sequence_id);
     207              : 
     208         4142 :         TLOG(TLVL_ADDFRAGMENT) << "AddFragment before Write calls";
     209         2071 :         Write(buffer, dataPtr, frag.word_count * sizeof(RawDataType));
     210              : 
     211         4142 :         TLOG(TLVL_ADDFRAGMENT) << "Checking for complete event";
                                // GetFragmentCount() looks the buffer up by sequence ID again.
                                // NOTE(review): completeness uses == here but >= in DoneWritingFragment, so an event
                                // with extra fragments is never marked complete on this path. Confirm this is intended.
     212         2071 :         auto fragmentCount = GetFragmentCount(frag.sequence_id);
     213         2071 :         hdr->is_complete = fragmentCount == num_fragments_per_event_ && buffer_writes_pending_[buffer] == 0;
     214         4142 :         TLOG(TLVL_ADDFRAGMENT) << "hdr->is_complete=" << std::boolalpha << hdr->is_complete
     215            0 :                                << ", fragmentCount=" << fragmentCount
     216            0 :                                << ", num_fragments_per_event=" << num_fragments_per_event_
     217         2071 :                                << ", buffer_writes_pending_[buffer]=" << buffer_writes_pending_[buffer];
     218              : 
     219         2071 :         complete_buffer_(buffer);
     220         2071 :         if (requests_)
     221              :         {
     222         2071 :                 requests_->SendRequest(true);
     223              :         }
     224              : 
     225         4142 :         TLOG(TLVL_ADDFRAGMENT) << "AddFragment END";
     226         2071 :         statsHelper_.addSample(FRAGMENTS_RECEIVED_STAT_KEY, frag.word_count * sizeof(RawDataType));
     227         2071 :         return true;
     228              : }
     229              : 
     // Retrying wrapper around AddFragment(header, ptr): keeps trying, sleeping 1000 us after
     // each failed attempt, until the fragment is accepted or timeout_usec has elapsed.
     // Returns the status of the last attempt. On failure, ownership of `frag` is handed
     // back through `outfrag`; on success `outfrag` is left untouched.
     // NOTE(review): with timeout_usec == 0 the loop condition looks false on entry (assuming
     // the elapsed time is non-negative), so no attempt is made and false is returned. Confirm.
     230         2071 : bool artdaq::SharedMemoryEventManager::AddFragment(FragmentPtr frag, size_t timeout_usec, FragmentPtr& outfrag)
     231              : {
     232         4142 :         TLOG(TLVL_ADDFRAGMENT) << "AddFragment(FragmentPtr) BEGIN";
                                // hdr is a by-value copy of the fragment's header; data points at the same header
                                // address and is what the other overload writes from.
     233         2071 :         auto hdr = *reinterpret_cast<detail::RawFragmentHeader*>(frag->headerAddress());  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     234         2071 :         auto data = frag->headerAddress();
     235         2071 :         auto start = std::chrono::steady_clock::now();
     236         2071 :         bool sts = false;
     237         6900 :         while (!sts && TimeUtils::GetElapsedTimeMicroseconds(start) < timeout_usec)
     238              :         {
     239         2758 :                 sts = AddFragment(hdr, data);
     240         2758 :                 if (!sts)
     241              :                 {
     242          687 :                         usleep(1000);
     243              :                 }
     244              :         }
     245         2071 :         if (!sts)
     246              :         {
     247            0 :                 outfrag = std::move(frag);
     248              :         }
     249         4142 :         TLOG(TLVL_ADDFRAGMENT) << "AddFragment(FragmentPtr) RETURN " << std::boolalpha << sts;
     250         2071 :         return sts;
     251              : }
     252              : 
     // Writes the header of `frag` into the buffer for frag.sequence_id and reserves room for
     // its payload, returning the address the caller should write the payload to.
     // Returns:
     //  - nullptr if the manager is not running, or if no buffer is available (-1) and
     //    dropIfNoBuffersAvailable is false (a back-pressure warning is logged at most once per second);
     //  - the data area of a scratch Fragment stored in dropped_data_ when the fragment is
     //    dropped: lookup returned -2, or -1 with dropIfNoBuffersAvailable set, or the payload
     //    did not fit in the buffer (in which case only an error-typed header stays in the buffer);
     //  - otherwise the buffer write position just after the header.
     // Throws cet::exception once oversize_fragment_count_ reaches a positive
     // maximum_oversize_fragment_count_.
     // buffer_writes_pending_[buffer] is incremented once a buffer is chosen, including on the
     // over-size path; DoneWritingFragment performs the matching decrement.
     253           26 : artdaq::RawDataType* artdaq::SharedMemoryEventManager::WriteFragmentHeader(detail::RawFragmentHeader frag, bool dropIfNoBuffersAvailable)
     254              : {
     255           26 :         if (!running_) return nullptr;
     256           50 :         TLOG(TLVL_WRITEFRAGMENTHEADER) << "WriteFragmentHeader BEGIN, seqID=" << frag.sequence_id;
     257           25 :         auto buffer = getBufferForSequenceID_(frag.sequence_id, true, frag.timestamp);
     258              : 
     259           25 :         if (buffer < 0)
     260              :         {
     261            2 :                 if (buffer == -1 && !dropIfNoBuffersAvailable)
     262              :                 {
                                                // NOTE(review): sequence_id_mutex_ guards the back-pressure timestamps here, but
                                                // the same two timestamps are written without it on the success path below.
     263            0 :                         std::unique_lock<std::mutex> bp_lk(sequence_id_mutex_);
     264            0 :                         if (TimeUtils::GetElapsedTime(last_backpressure_report_time_) > 1.0)
     265              :                         {
     266            0 :                                 TLOG(TLVL_WARNING) << app_name << ": Back-pressure condition: All Shared Memory buffers have been full for " << TimeUtils::GetElapsedTime(last_fragment_header_write_time_) << " s!";
     267            0 :                                 last_backpressure_report_time_ = std::chrono::steady_clock::now();
     268              :                         }
     269            0 :                         if (metricMan)
     270              :                         {
     271            0 :                                 metricMan->sendMetric("Back-pressure wait time", TimeUtils::GetElapsedTime(last_fragment_header_write_time_), "s", 1, MetricMode::LastPoint);
     272              :                         }
     273            0 :                         TLOG(TLVL_WRITEFRAGMENTHEADER) << "No shared memory buffers available, seqID=" << frag.sequence_id;
     274            0 :                         return nullptr;
     275            0 :                 }
     276            2 :                 if (buffer == -2)
     277              :                 {
     278            6 :                         TLOG(TLVL_ERROR) << "Dropping fragment with sequence id " << frag.sequence_id << " and fragment id " << frag.fragment_id << " because data taking has already passed this event.";
     279              :                 }
     280              :                 else
     281              :                 {
     282            0 :                         TLOG(TLVL_INFO) << "Dropping fragment with sequence id " << frag.sequence_id << " and fragment id " << frag.fragment_id << " because there is no room in the queue and reliable mode is off.";
     283              :                 }
                                        // Give the caller a scratch Fragment sized for the payload so it can still "write"
                                        // the data; DoneWritingFragment erases the entry again.
                                        // NOTE(review): dropped_data_ is modified here with no lock held. Confirm this
                                        // function and DoneWritingFragment are not called concurrently.
     284            2 :                 dropped_data_.emplace_back(frag, std::make_unique<Fragment>(frag.word_count - frag.num_words()));
     285            2 :                 auto it = dropped_data_.rbegin();
     286              : 
     287            4 :                 TLOG(TLVL_WRITEFRAGMENTHEADER) << "Dropping fragment with sequence id " << frag.sequence_id << " and fragment id " << frag.fragment_id << " into "
     288            2 :                                                << static_cast<void*>(it->second->dataBegin()) << " sz=" << it->second->dataSizeBytes();
     289              : 
     290            2 :                 return it->second->dataBegin();
     291              :         }
     292              : 
     293           23 :         last_backpressure_report_time_ = std::chrono::steady_clock::now();
     294           23 :         last_fragment_header_write_time_ = std::chrono::steady_clock::now();
     295              :         // Increment this as soon as we know we want to use the buffer
     296           23 :         buffer_writes_pending_[buffer]++;
     297              : 
     298           23 :         if (metricMan)
     299              :         {
     300          161 :                 metricMan->sendMetric("Input Fragment Rate", 1, "Fragments/s", 1, MetricMode::Rate);
     301              :         }
     302              : 
     303           46 :         TLOG(TLVL_BUFLCK) << "WriteFragmentHeader: obtaining buffer_mutexes lock for buffer " << buffer << ", seqID=" << frag.sequence_id;
                                // NOTE(review): the lone ';' on the next line is an empty statement and can be removed.
     304              :         ;
     305              : 
     306           23 :         std::unique_lock<std::mutex> lk(buffer_mutexes_.at(buffer));
     307              : 
     308           46 :         TLOG(TLVL_BUFLCK) << "WriteFragmentHeader: obtained buffer_mutexes lock for buffer " << buffer << ", seqID=" << frag.sequence_id;
     309              : 
                                // hdrpos is where the header lands; pos (read after the Write) is where the payload starts.
     310           23 :         auto hdrpos = reinterpret_cast<RawDataType*>(GetWritePos(buffer));  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     311           23 :         Write(buffer, &frag, frag.num_words() * sizeof(RawDataType));
     312              : 
     313           23 :         auto pos = reinterpret_cast<RawDataType*>(GetWritePos(buffer));  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
                                // NOTE(review): if word_count and num_words() are unsigned, the "> 0" test below is
                                // really "!= 0", and word_count < num_words() would wrap to a huge size. Confirm the types.
     314           23 :         if (frag.word_count - frag.num_words() > 0)
     315              :         {
     316           23 :                 auto sts = IncrementWritePos(buffer, (frag.word_count - frag.num_words()) * sizeof(RawDataType));
     317              : 
     318           23 :                 if (!sts)
     319              :                 {
                                                // Payload does not fit: shrink the stored header to header-only size, mark it as an
                                                // error fragment, and send the payload to a scratch Fragment instead.
     320            0 :                         reinterpret_cast<detail::RawFragmentHeader*>(hdrpos)->word_count = frag.num_words();       // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     321            0 :                         reinterpret_cast<detail::RawFragmentHeader*>(hdrpos)->type = Fragment::ErrorFragmentType;  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     322            0 :                         TLOG(TLVL_ERROR) << "Dropping over-size fragment with sequence id " << frag.sequence_id << " and fragment id " << frag.fragment_id << " because there is no room in the current buffer for this Fragment! (Keeping header)";
     323            0 :                         dropped_data_.emplace_back(frag, std::make_unique<Fragment>(frag.word_count - frag.num_words()));
     324            0 :                         auto it = dropped_data_.rbegin();
     325              : 
     326            0 :                         oversize_fragment_count_++;
     327              : 
     328            0 :                         if (maximum_oversize_fragment_count_ > 0 && oversize_fragment_count_ >= maximum_oversize_fragment_count_)
     329              :                         {
                                                        // NOTE(review): the explicit unlock is redundant, since lk releases the mutex
                                                        // during unwinding. Also, the constructor passes a category to cet::exception and
                                                        // streams the message with <<, whereas here the whole message is the constructor
                                                        // argument. Confirm against the cet::exception API.
     330            0 :                                 lk.unlock();
     331            0 :                                 throw cet::exception("Too many over-size Fragments received! Please adjust max_event_size_bytes or max_fragment_size_bytes!");
     332              :                         }
     333              : 
     334            0 :                         TLOG(TLVL_WRITEFRAGMENTHEADER) << "Dropping over-size fragment with sequence id " << frag.sequence_id << " and fragment id " << frag.fragment_id
     335            0 :                                                        << " into " << static_cast<void*>(it->second->dataBegin());
     336            0 :                         return it->second->dataBegin();
     337              :                 }
     338              :         }
     339           46 :         TLOG(TLVL_WRITEFRAGMENTHEADER) << "WriteFragmentHeader END, seqID=" << frag.sequence_id;
     340           23 :         return pos;
     341           23 : }
     342              : 
     // Completes a fragment write that was started with WriteFragmentHeader.
     // If no buffer is found for the sequence ID, the first dropped_data_ entry whose header
     // compares equal to `frag` is erased and the function returns. If there is no such entry
     // and the lookup result was -1, Detach(true, ...) is called.
     // Otherwise, under the buffer's mutex: the event header's run/subrun are updated and the
     // buffer is touched. If other writers are still pending, only the pending count is
     // decremented. If this is the last pending writer, is_complete is recomputed,
     // complete_buffer_() is called and the count is then decremented.
     343           25 : void artdaq::SharedMemoryEventManager::DoneWritingFragment(detail::RawFragmentHeader frag)
     344              : {
     345           50 :         TLOG(TLVL_DONEWRITINGFRAGMENT) << "DoneWritingFragment BEGIN";
     346              : 
     347           25 :         auto buffer = getBufferForSequenceID_(frag.sequence_id, false, frag.timestamp);
     348           25 :         if (buffer < 0)
     349              :         {
                                        // Erasing inside the loop is safe only because the function returns immediately after.
     350            2 :                 for (auto it = dropped_data_.begin(); it != dropped_data_.end(); ++it)
     351              :                 {
     352            2 :                         if (frag.operator==(it->first))  // TODO, ELF 5/26/2023: Workaround until artdaq_core can be fixed for C++20
     353              :                         {
     354            2 :                                 dropped_data_.erase(it);
     355            2 :                                 return;
     356              :                         }
     357              :                 }
     358            0 :                 if (buffer == -1)
     359              :                 {
     360            0 :                         Detach(true, app_name + "SharedMemoryEventManager",
     361              :                                "getBufferForSequenceID_ returned -1 in DoneWritingFragment. This indicates a possible mismatch between expected Fragment count and the actual number of Fragments received.");
     362              :                 }
     363            0 :                 return;
     364              :         }
     365              : 
                                // When the caller's header copy is flagged not valid, it is pushed into the buffer
                                // through UpdateFragmentHeader.
     366           23 :         if (!frag.valid)
     367              :         {
     368            0 :                 UpdateFragmentHeader(buffer, frag);
     369              :         }
     370              : 
     371           23 :         statsHelper_.addSample(FRAGMENTS_RECEIVED_STAT_KEY, frag.word_count * sizeof(RawDataType));
     372              :         {
     373           46 :                 TLOG(TLVL_BUFLCK) << "DoneWritingFragment: obtaining buffer_mutexes lock for buffer " << buffer;
     374              : 
     375           23 :                 std::unique_lock<std::mutex> lk(buffer_mutexes_.at(buffer));
     376              : 
     377           46 :                 TLOG(TLVL_BUFLCK) << "DoneWritingFragment: obtained buffer_mutexes lock for buffer " << buffer;
     378              : 
     379           46 :                 TLOG(TLVL_DONEWRITINGFRAGMENT) << "DoneWritingFragment: Received Fragment with sequence ID " << frag.sequence_id << " and fragment id " << frag.fragment_id << " (type " << static_cast<int>(frag.type) << ")";
     380           23 :                 auto hdr = getEventHeader_(buffer);
                                        // NOTE(review): as in AddFragment, the subrun_id store inside this if is always
                                        // overwritten by the unconditional assignment that follows.
     381           23 :                 if (update_run_ids_)
     382              :                 {
     383           23 :                         hdr->run_id = run_id_;
     384           23 :                         hdr->subrun_id = subrun_id_;
     385              :                 }
     386           23 :                 hdr->subrun_id = GetSubrunForSequenceID(frag.sequence_id);
     387              : 
     388           46 :                 TLOG(TLVL_DONEWRITINGFRAGMENT) << "DoneWritingFragment: Updating buffer touch time";
     389           23 :                 TouchBuffer(buffer);
     390              : 
                                        // Only the last outstanding writer for a buffer performs the completeness bookkeeping.
     391           23 :                 if (buffer_writes_pending_[buffer] > 1)
     392              :                 {
     393            6 :                         TLOG(TLVL_DONEWRITINGFRAGMENT) << "Done writing fragment, but there's another writer. Not doing bookkeeping steps.";
     394            3 :                         buffer_writes_pending_[buffer]--;
     395            3 :                         return;
     396              :                 }
     397           40 :                 TLOG(TLVL_DONEWRITINGFRAGMENT) << "Done writing fragment, and no other writer. Doing bookkeeping steps.";
     398           20 :                 auto frag_count = GetFragmentCount(frag.sequence_id);
     399           20 :                 hdr->is_complete = frag_count >= num_fragments_per_event_;
     400              : 
     401           20 :                 if (frag_count > num_fragments_per_event_)
     402              :                 {
     403            3 :                         TLOG(TLVL_WARNING) << "DoneWritingFragment: This Event has more Fragments ( " << frag_count << " ) than specified in configuration ( " << num_fragments_per_event_ << " )!"
     404            2 :                                            << " This is probably due to a misconfiguration and is *not* a reliable mode!";
     405              :                 }
     406              : 
     407           40 :                 TLOG(TLVL_DONEWRITINGFRAGMENT) << "DoneWritingFragment: Received Fragment with sequence ID " << frag.sequence_id << " and fragment id " << frag.fragment_id << ", count/expected = " << frag_count << "/" << num_fragments_per_event_;
     408              : #if ART_SUPPORTS_DUPLICATE_EVENTS
     409              :                 if (!hdr->is_complete && released_incomplete_events_.count(frag.sequence_id))
     410              :                 {
     411              :                         hdr->is_complete = frag_count >= released_incomplete_events_[frag.sequence_id] && buffer_writes_pending_[buffer] == 0;
     412              :                 }
     413              : #endif
     414              : 
     415           20 :                 complete_buffer_(buffer);
     416              : 
     417              :                 // Move this down here to avoid race condition
     418           20 :                 buffer_writes_pending_[buffer]--;
     419           23 :         }
     420           20 :         if (requests_)
     421              :         {
     422           20 :                 requests_->SendRequest(true);
     423              :         }
     424           40 :         TLOG(TLVL_DONEWRITINGFRAGMENT) << "DoneWritingFragment END";
     425              : }
     426              : 
     // Counts the fragments stored for sequence ID `seqID`: looks the buffer up with
     // getBufferForSequenceID_(seqID, false) and delegates to GetFragmentCountInBuffer,
     // which returns 0 when the lookup result is negative. `type` is forwarded unchanged.
     427         2101 : size_t artdaq::SharedMemoryEventManager::GetFragmentCount(Fragment::sequence_id_t seqID, Fragment::type_t type)
     428              : {
     429         2101 :         return GetFragmentCountInBuffer(getBufferForSequenceID_(seqID, false), type);
     430              : }
     431              : 
     // Counts the fragments currently stored in `buffer` by walking it from just past the
     // RawEventHeader, using each fragment header's word_count to step to the next fragment.
     // Returns 0 for a negative buffer index. When `type` is Fragment::InvalidFragmentType,
     // every fragment is counted; otherwise only fragments whose header type equals `type`.
     // The buffer's read position is reset and advanced as a side effect.
     432         2102 : size_t artdaq::SharedMemoryEventManager::GetFragmentCountInBuffer(int buffer, Fragment::type_t type)
     433              : {
     434         2102 :         if (buffer < 0)
     435              :         {
     436            0 :                 return 0;
     437              :         }
     438         2102 :         ResetReadPos(buffer);
     439         2102 :         IncrementReadPos(buffer, sizeof(detail::RawEventHeader));
     440              : 
     441         2102 :         size_t count = 0;
     442              : 
     443         8318 :         while (MoreDataInBuffer(buffer))
     444              :         {
                                        // NOTE(review): a header with word_count == 0 would not advance the read position,
                                        // so this loop presumably relies on every stored header having a non-zero size.
     445         6216 :                 auto fragHdr = reinterpret_cast<artdaq::detail::RawFragmentHeader*>(GetReadPos(buffer));  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     446         6216 :                 IncrementReadPos(buffer, fragHdr->word_count * sizeof(RawDataType));
     447         6216 :                 if (type != Fragment::InvalidFragmentType && fragHdr->type != type)
     448              :                 {
     449              :                         // Skip fragments with the wrong type, as they were over-size and truncated to the header
     450            0 :                         continue;
     451              :                 }
     452        12432 :                 TLOG(TLVL_GETFRAGMENTCOUNT) << "Adding Fragment with size=" << fragHdr->word_count << " to Fragment count";
     453         6216 :                 ++count;
     454              :         }
     455              : 
     456         2102 :         return count;
     457              : }
     458              : 
     459            0 : void artdaq::SharedMemoryEventManager::UpdateFragmentHeader(int buffer, artdaq::detail::RawFragmentHeader hdr)
     460              : {
     461            0 :         if (buffer < 0)
     462              :         {
     463            0 :                 return;
     464              :         }
     465            0 :         ResetReadPos(buffer);
     466            0 :         IncrementReadPos(buffer, sizeof(detail::RawEventHeader));
     467              : 
     468            0 :         while (MoreDataInBuffer(buffer))
     469              :         {
     470            0 :                 auto fragHdr = reinterpret_cast<artdaq::detail::RawFragmentHeader*>(GetReadPos(buffer));  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     471            0 :                 if (hdr.fragment_id == fragHdr->fragment_id)
     472              :                 {
     473            0 :                         *fragHdr = hdr;
     474            0 :                         break;
     475              :                 }
     476              :         }
     477              : 
     478            0 :         return;
     479              : }
     480              : 
     481            6 : void artdaq::SharedMemoryEventManager::RunArt(size_t process_index, const std::shared_ptr<std::atomic<pid_t>>& pid_out)
     482              : {
     483              :         do
     484              :         {
     485            6 :                 auto start_time = std::chrono::steady_clock::now();
     486            6 :                 send_init_frags_();
     487           18 :                 TLOG(TLVL_INFO) << "Starting art process with config file " << current_art_config_file_->getFileName();
     488              : 
     489            6 :                 pid_t pid = 0;
     490              : 
     491            6 :                 if (!manual_art_)
     492              :                 {
     493            6 :                         pid = fork();
     494            6 :                         if (pid == 0)
     495              :                         { /* child */
     496              :                                 // 23-May-2018, KAB: added the setting of the partition number env var
     497              :                                 // in the environment of the child art process so that Globals.hh
     498              :                                 // will pick it up there and provide it to the artdaq classes that
     499              :                                 // are used in data transfers, etc. within the art process.
     500            0 :                                 std::string envVarKey = "ARTDAQ_PARTITION_NUMBER";
     501            0 :                                 std::string envVarValue = std::to_string(Globals::GetPartitionNumber());
     502            0 :                                 if (setenv(envVarKey.c_str(), envVarValue.c_str(), 1) != 0)
     503              :                                 {
     504            0 :                                         TLOG(TLVL_ERROR) << "Error setting environment variable \"" << envVarKey
     505            0 :                                                          << "\" in the environment of a child art process. "
     506            0 :                                                          << "This may result in incorrect TCP port number "
     507            0 :                                                          << "assignments or other issues, and data may "
     508            0 :                                                          << "not flow through the system correctly.";
     509              :                                 }
     510            0 :                                 envVarKey = "ARTDAQ_APPLICATION_NAME";
     511            0 :                                 envVarValue = app_name;
     512            0 :                                 if (setenv(envVarKey.c_str(), envVarValue.c_str(), 1) != 0)
     513              :                                 {
     514            0 :                                         TLOG(TLVL_RUNART) << "Error setting environment variable \"" << envVarKey
     515            0 :                                                           << "\" in the environment of a child art process. ";
     516              :                                 }
     517            0 :                                 envVarKey = "ARTDAQ_RANK";
     518            0 :                                 envVarValue = std::to_string(my_rank);
     519            0 :                                 if (setenv(envVarKey.c_str(), envVarValue.c_str(), 1) != 0)
     520              :                                 {
     521            0 :                                         TLOG(TLVL_RUNART) << "Error setting environment variable \"" << envVarKey
     522            0 :                                                           << "\" in the environment of a child art process. ";
     523              :                                 }
     524              : 
     525            0 :                                 TLOG(TLVL_RUNART_2) << "Parsing art command line";
     526            0 :                                 auto args = parse_art_command_line_(current_art_config_file_, process_index);
     527              : 
     528            0 :                                 TLOG(TLVL_RUNART_2) << "Calling execvp with application name " << args[0];
     529            0 :                                 execvp(args[0], &args[0]);
     530              : 
     531            0 :                                 TLOG(TLVL_RUNART_2) << "Application exited, cleaning up";
     532            0 :                                 for (auto& arg : args)
     533              :                                 {
     534            0 :                                         delete[] arg;
     535              :                                 }
     536              : 
     537            0 :                                 exit(1);
     538            0 :                         }
     539              :                 }
     540              :                 else
     541              :                 {
     542              :                         // Using cin/cout here to ensure console is active (artdaqDriver)
     543            0 :                         std::cout << "Please run the following command in a separate terminal:" << std::endl
     544            0 :                                   << "art -c " << current_art_config_file_->getFileName() << std::endl
     545            0 :                                   << "Then, in a third terminal, execute: \"ps aux|grep [a]rt -c " << current_art_config_file_->getFileName() << "\" and note the PID of the art process." << std::endl
     546            0 :                                   << "Finally, return to this window and enter the pid: " << std::endl;
     547            0 :                         std::cin >> pid;
     548              :                 }
     549            6 :                 *pid_out = pid;
     550              : 
     551           18 :                 TLOG(TLVL_INFO) << "PID of new art process is " << pid;
     552              :                 {
     553            6 :                         std::unique_lock<std::mutex> lk(art_process_mutex_);
     554            6 :                         art_processes_.insert(pid);
     555            6 :                 }
     556              :                 siginfo_t status;
     557            6 :                 auto sts = 0;
     558            6 :                 if (!manual_art_)
     559              :                 {
     560            6 :                         sts = waitid(P_PID, pid, &status, WEXITED);
     561              :                 }
     562              :                 else
     563              :                 {
     564            0 :                         while (kill(pid, 0) >= 0) usleep(10000);
     565              : 
     566            0 :                         TLOG(TLVL_INFO) << "Faking good exit status, please see art process for actual exit status!";
     567            0 :                         status.si_code = CLD_EXITED;
     568            0 :                         status.si_status = 0;
     569              :                 }
     570           18 :                 TLOG(TLVL_INFO) << "Removing PID " << pid << " from process list";
     571              :                 {
     572            6 :                         std::unique_lock<std::mutex> lk(art_process_mutex_);
     573            6 :                         art_processes_.erase(pid);
     574            6 :                 }
     575            6 :                 if (sts < 0)
     576              :                 {
     577            0 :                         TLOG(TLVL_WARNING) << "Error occurred in waitid for art process " << pid << ": " << errno << " (" << strerror(errno) << ").";
     578              :                 }
     579            6 :                 else if (status.si_code == CLD_EXITED && status.si_status == 0)
     580              :                 {
     581           12 :                         TLOG(TLVL_INFO) << "art process " << pid << " exited normally, " << (restart_art_ ? "restarting" : "not restarting");
     582            4 :                 }
     583              :                 else
     584              :                 {
     585            2 :                         auto art_lifetime = TimeUtils::GetElapsedTime(start_time);
     586            2 :                         if (art_lifetime < minimum_art_lifetime_s_)
     587              :                         {
     588            2 :                                 restart_art_ = false;
     589              :                         }
     590              : 
     591            2 :                         auto exit_type = "exited with status code";
     592            2 :                         switch (status.si_code)
     593              :                         {
     594            0 :                                 case CLD_DUMPED:
     595              :                                 case CLD_KILLED:
     596            0 :                                         exit_type = "was killed with signal";
     597            0 :                                         break;
     598            2 :                                 case CLD_EXITED:
     599              :                                 default:
     600            2 :                                         break;
     601              :                         }
     602              : 
     603            6 :                         TLOG((restart_art_ ? TLVL_WARNING : TLVL_ERROR))
     604            2 :                             << "art process " << pid << " " << exit_type << " " << status.si_status
     605            2 :                             << (status.si_code == CLD_DUMPED ? " (core dumped)" : "")
     606            2 :                             << " after running for " << std::setprecision(2) << std::fixed << art_lifetime << " seconds, "
     607            4 :                             << (restart_art_ ? "restarting" : "not restarting");
     608              :                 }
     609            6 :         } while (restart_art_);
     610            6 : }
     611              : 
     612           17 : void artdaq::SharedMemoryEventManager::StartArt()
     613              : {
     614           17 :         size_t initialCount = GetAttachedCount();
     615           17 :         restart_art_ = always_restart_art_;
     616           17 :         if (num_art_processes_ == 0)
     617              :         {
     618           11 :                 return;
     619              :         }
     620           12 :         for (size_t ii = 0; ii < num_art_processes_; ++ii)
     621              :         {
     622            6 :                 StartArtProcess(current_art_pset_, ii);
     623              :         }
     624            6 :         auto startTime = std::chrono::steady_clock::now();
     625            6 :         while (GetAttachedCount() - initialCount != num_art_processes_)
     626              :         {
     627            0 :                 TLOG(TLVL_INFO) << "Waiting for all art processes to connect to shared memory, " << TimeUtils::GetElapsedTime(startTime) << " s elapsed.";
     628            0 :                 std::this_thread::sleep_for(std::chrono::seconds(1));
     629              :         }
     630              : }
     631              : 
     632            6 : pid_t artdaq::SharedMemoryEventManager::StartArtProcess(fhicl::ParameterSet pset, size_t process_index)
     633              : {
     634              :         static std::mutex start_art_mutex;
     635            6 :         std::unique_lock<std::mutex> lk(start_art_mutex);
     636              :         // TraceLock lk(start_art_mutex, 15, "StartArtLock");
     637            6 :         restart_art_ = always_restart_art_;
     638            6 :         auto initialCount = GetAttachedCount();
     639            6 :         auto startTime = std::chrono::steady_clock::now();
     640              : 
     641            6 :         if (pset != current_art_pset_ || !current_art_config_file_)
     642              :         {
     643            0 :                 current_art_pset_ = pset;
     644            0 :                 if (manual_art_)
     645            0 :                         current_art_config_file_ = std::make_shared<art_config_file>(pset, GetKey(), GetBroadcastKey());
     646              :                 else
     647            0 :                         current_art_config_file_ = std::make_shared<art_config_file>(pset);
     648              :         }
     649            6 :         std::shared_ptr<std::atomic<pid_t>> pid(new std::atomic<pid_t>(-1));
     650           12 :         boost::thread thread([this, process_index, pid] { RunArt(process_index, pid); });
     651            6 :         thread.detach();
     652              : 
     653            6 :         auto currentCount = GetAttachedCount() - initialCount;
     654           37 :         while ((currentCount < 1 || *pid <= 0) && (TimeUtils::GetElapsedTime(startTime) < 5 || manual_art_))
     655              :         {
     656           31 :                 usleep(10000);
     657           31 :                 currentCount = GetAttachedCount() - initialCount;
     658              :         }
     659            6 :         if ((currentCount < 1 || *pid <= 0) && manual_art_)
     660              :         {
     661            0 :                 TLOG(TLVL_WARNING) << "Manually-started art process has not connected to shared memory or has bad PID: connected:" << currentCount << ", PID:" << pid;
     662            0 :                 return 0;
     663              :         }
     664            6 :         if (currentCount < 1 || *pid <= 0)
     665              :         {
     666            0 :                 TLOG(TLVL_WARNING) << "art process has not started after 5s. Check art configuration!"
     667            0 :                                    << " (pid=" << *pid << ", attachedCount=" << currentCount << ")";
     668            0 :                 return 0;
     669              :         }
     670              : 
     671           18 :         TLOG(TLVL_INFO) << std::setw(4) << std::fixed << "art initialization took "
     672           12 :                         << TimeUtils::GetElapsedTime(startTime) << " seconds.";
     673              : 
     674            6 :         return *pid;
     675            6 : }
     676              : 
     677            4 : void artdaq::SharedMemoryEventManager::ShutdownArtProcesses(std::set<pid_t>& pids)
     678              : {
     679            4 :         restart_art_ = false;
     680              :         // current_art_config_file_ = nullptr;
     681              :         // current_art_pset_ = fhicl::ParameterSet();
     682              : 
     683          349 :         auto check_pids = [&](bool print) {
     684          349 :                 std::unique_lock<std::mutex> lk(art_process_mutex_);
     685          694 :                 for (auto pid = pids.begin(); pid != pids.end();)
     686              :                 {
     687              :                         // 08-May-2018, KAB: protect against killing invalid PIDS
     688              : 
     689          345 :                         if (*pid <= 0)
     690              :                         {
     691            0 :                                 TLOG(TLVL_WARNING) << "Removing an invalid PID (" << *pid
     692            0 :                                                    << ") from the shutdown list.";
     693            0 :                                 pid = pids.erase(pid);
     694              :                         }
     695          345 :                         else if (kill(*pid, 0) < 0)
     696              :                         {
     697            0 :                                 pid = pids.erase(pid);
     698              :                         }
     699              :                         else
     700              :                         {
     701          345 :                                 if (print)
     702              :                                 {
     703            0 :                                         std::cout << *pid << " ";
     704              :                                 }
     705          345 :                                 ++pid;
     706              :                         }
     707              :                 }
     708          349 :         };
     709          349 :         auto count_pids = [&]() {
     710          349 :                 std::unique_lock<std::mutex> lk(art_process_mutex_);
     711          698 :                 return pids.size();
     712          349 :         };
     713            4 :         check_pids(false);
     714            4 :         if (count_pids() == 0)
     715              :         {
     716            0 :                 TLOG(TLVL_SHUTDOWN) << "All art processes already exited, nothing to do.";
     717            0 :                 usleep(1000);
     718            4 :                 return;
     719              :         }
     720              : 
     721            4 :         if (!manual_art_)
     722              :         {
     723            4 :                 int graceful_wait_ms = art_event_processing_time_us_ * size() * 10 / 1000;
     724            4 :                 int gentle_wait_ms = art_event_processing_time_us_ * size() * 2 / 1000;
     725            4 :                 int int_wait_ms = art_event_processing_time_us_ * size() / 1000;
     726            4 :                 auto shutdown_start = std::chrono::steady_clock::now();
     727              : 
     728              :                 //              if (!overwrite_mode_)
     729              :                 {
     730            8 :                         TLOG(TLVL_SHUTDOWN) << "Waiting up to " << graceful_wait_ms << " ms for all art processes to exit gracefully";
     731          345 :                         for (int ii = 0; ii < graceful_wait_ms; ++ii)
     732              :                         {
     733          345 :                                 usleep(1000);
     734              : 
     735          345 :                                 check_pids(false);
     736          345 :                                 if (count_pids() == 0)
     737              :                                 {
     738           12 :                                         TLOG(TLVL_INFO) << "All art processes exited after " << TimeUtils::GetElapsedTimeMilliseconds(shutdown_start) << " ms.";
     739            4 :                                         return;
     740              :                                 }
     741              :                         }
     742              :                 }
     743              : 
     744              :                 {
     745            0 :                         TLOG(TLVL_SHUTDOWN) << "Gently informing art processes that it is time to shut down";
     746            0 :                         std::unique_lock<std::mutex> lk(art_process_mutex_);
     747            0 :                         for (auto pid : pids)
     748              :                         {
     749            0 :                                 TLOG(TLVL_SHUTDOWN) << "Sending SIGQUIT to pid " << pid;
     750            0 :                                 kill(pid, SIGQUIT);
     751              :                         }
     752            0 :                 }
     753              : 
     754            0 :                 TLOG(TLVL_SHUTDOWN) << "Waiting up to " << gentle_wait_ms << " ms for all art processes to exit from SIGQUIT";
     755            0 :                 for (int ii = 0; ii < gentle_wait_ms; ++ii)
     756              :                 {
     757            0 :                         usleep(1000);
     758              : 
     759            0 :                         check_pids(false);
     760            0 :                         if (count_pids() == 0)
     761              :                         {
     762            0 :                                 TLOG(TLVL_INFO) << "All art processes exited after " << TimeUtils::GetElapsedTimeMilliseconds(shutdown_start) << " ms (SIGQUIT).";
     763            0 :                                 return;
     764              :                         }
     765              :                 }
     766              : 
     767              :                 {
     768            0 :                         TLOG(TLVL_SHUTDOWN) << "Insisting that the art processes shut down";
     769            0 :                         std::unique_lock<std::mutex> lk(art_process_mutex_);
     770            0 :                         for (auto pid : pids)
     771              :                         {
     772            0 :                                 kill(pid, SIGINT);
     773              :                         }
     774            0 :                 }
     775              : 
     776            0 :                 TLOG(TLVL_SHUTDOWN) << "Waiting up to " << int_wait_ms << " ms for all art processes to exit from SIGINT";
     777            0 :                 for (int ii = 0; ii < int_wait_ms; ++ii)
     778              :                 {
     779            0 :                         usleep(1000);
     780              : 
     781            0 :                         check_pids(false);
     782              : 
     783            0 :                         if (count_pids() == 0)
     784              :                         {
     785            0 :                                 TLOG(TLVL_INFO) << "All art processes exited after " << TimeUtils::GetElapsedTimeMilliseconds(shutdown_start) << " ms (SIGINT).";
     786            0 :                                 return;
     787              :                         }
     788              :                 }
     789              : 
     790            0 :                 TLOG(TLVL_SHUTDOWN) << "Killing remaning art processes with extreme prejudice";
     791            0 :                 while (count_pids() > 0)
     792              :                 {
     793              :                         {
     794            0 :                                 std::unique_lock<std::mutex> lk(art_process_mutex_);
     795            0 :                                 kill(*pids.begin(), SIGKILL);
     796            0 :                                 usleep(1000);
     797            0 :                         }
     798            0 :                         check_pids(false);
     799              :                 }
     800            0 :                 TLOG(TLVL_INFO) << "All art processes exited after " << TimeUtils::GetElapsedTimeMilliseconds(shutdown_start) << " ms (SIGKILL).";
     801              :         }
     802              :         else
     803              :         {
     804            0 :                 std::cout << "Please shut down all art processes, then hit return/enter" << std::endl;
     805            0 :                 while (count_pids() > 0)
     806              :                 {
     807            0 :                         std::cout << "The following PIDs are running: ";
     808            0 :                         check_pids(true);
     809            0 :                         std::cout << std::endl;
     810            0 :                         usleep(500000);
     811              :                 }
     812              :         }
     813              : }
     814              : 
     815            1 : void artdaq::SharedMemoryEventManager::ReconfigureArt(fhicl::ParameterSet art_pset, run_id_t newRun, int n_art_processes)
     816              : {
     817            2 :         TLOG(TLVL_RECONFIGUREART) << "ReconfigureArt BEGIN";
     818            1 :         if (restart_art_ || !always_restart_art_)  // Art is running
     819              :         {
     820            0 :                 endOfData();
     821              :         }
     822           11 :         for (size_t ii = 0; ii < broadcasts_.size(); ++ii)
     823              :         {
     824           10 :                 broadcasts_.MarkBufferEmpty(ii, true);
     825              :         }
     826            1 :         if (newRun == 0)
     827              :         {
     828            1 :                 newRun = run_id_ + 1;
     829              :         }
     830              : 
     831            1 :         if (art_pset != current_art_pset_ || !current_art_config_file_)
     832              :         {
     833            1 :                 current_art_pset_ = art_pset;
     834            1 :                 if (manual_art_)
     835            0 :                         current_art_config_file_ = std::make_shared<art_config_file>(art_pset, GetKey(), GetBroadcastKey());
     836              :                 else
     837            1 :                         current_art_config_file_ = std::make_shared<art_config_file>(art_pset);
     838              :         }
     839              : 
     840            1 :         if (n_art_processes != -1)
     841              :         {
     842            0 :                 TLOG(TLVL_INFO) << "Setting number of art processes to " << n_art_processes;
     843            0 :                 num_art_processes_ = n_art_processes;
     844              :         }
     845            1 :         startRun(newRun);
     846            2 :         TLOG(TLVL_RECONFIGUREART) << "ReconfigureArt END";
     847            1 : }
     848              : 
     849           16 : bool artdaq::SharedMemoryEventManager::endOfData()
     850              : {
     851           16 :         running_ = false;
     852              :         {
     853           16 :                 std::lock_guard<std::mutex> lk(init_fragments_mutex_);
     854           16 :                 init_fragment_map_.clear();
     855           16 :         }
     856           16 :         init_frags_sent_ = false;
     857           32 :         TLOG(TLVL_ENDOFDATA) << "SharedMemoryEventManager::endOfData";
     858           16 :         restart_art_ = false;
     859              : 
     860           16 :         auto start = std::chrono::steady_clock::now();
     861          212 :         auto pendingWriteCount = std::accumulate(buffer_writes_pending_.begin(), buffer_writes_pending_.end(), 0, [](int a, auto& b) { return a + b.second.load(); });
     862           32 :         TLOG(TLVL_ENDOFDATA) << "endOfData: Waiting for " << pendingWriteCount << " pending writes to complete";
     863           26 :         while (pendingWriteCount > 0 && TimeUtils::GetElapsedTimeMicroseconds(start) < 1000000)
     864              :         {
     865           10 :                 usleep(10000);
     866           50 :                 pendingWriteCount = std::accumulate(buffer_writes_pending_.begin(), buffer_writes_pending_.end(), 0, [](int a, auto& b) { return a + b.second.load(); });
     867              :         }
     868              : 
     869           16 :         size_t initialStoreSize = GetOpenEventCount();
     870           32 :         TLOG(TLVL_ENDOFDATA) << "endOfData: Flushing " << initialStoreSize
     871           16 :                              << " stale events from the SharedMemoryEventManager.";
     872           16 :         int counter = initialStoreSize;
     873           20 :         while (!active_buffers_.empty() && counter > 0)
     874              :         {
     875            4 :                 complete_buffer_(*active_buffers_.begin());
     876            4 :                 counter--;
     877              :         }
     878           32 :         TLOG(TLVL_ENDOFDATA) << "endOfData: Done flushing, there are now " << GetOpenEventCount()
     879           16 :                              << " stale events in the SharedMemoryEventManager.";
     880              : 
     881           32 :         TLOG(TLVL_ENDOFDATA) << "Waiting for " << (ReadReadyCount() + (size() - WriteReadyCount(overwrite_mode_))) << " outstanding buffers...";
     882           16 :         start = std::chrono::steady_clock::now();
     883           16 :         auto lastReadCount = ReadReadyCount() + (size() - WriteReadyCount(overwrite_mode_));
     884           16 :         auto end_of_data_wait_us = art_event_processing_time_us_ * (lastReadCount > 0 ? lastReadCount : 1);  // size();
     885              : 
     886           16 :         auto outstanding_buffer_wait_time = art_event_processing_time_us_ > 100000 ? 100000 : art_event_processing_time_us_;
     887              : 
     888              :         // We will wait until no buffer has been read for the end of data wait seconds, or no art processes are left.
     889           30 :         while (lastReadCount > 0 && (end_of_data_wait_us == 0 || TimeUtils::GetElapsedTimeMicroseconds(start) < end_of_data_wait_us) && get_art_process_count_() > 0)
     890              :         {
     891           14 :                 auto temp = ReadReadyCount() + (size() - WriteReadyCount(overwrite_mode_));
     892           14 :                 if (temp != lastReadCount)
     893              :                 {
     894            8 :                         TLOG(TLVL_ENDOFDATA_2) << "Waiting for " << temp << " outstanding buffers...";
     895            4 :                         lastReadCount = temp;
     896            4 :                         start = std::chrono::steady_clock::now();
     897              :                 }
     898           14 :                 if (lastReadCount > 0)
     899              :                 {
     900           20 :                         TLOG(TLVL_ENDOFDATA_2) << "About to sleep " << outstanding_buffer_wait_time << " us - lastReadCount=" << lastReadCount << " size=" << size() << " end_of_data_wait_us=" << end_of_data_wait_us;
     901           10 :                         usleep(outstanding_buffer_wait_time);
     902              :                 }
     903              :         }
     904              : 
     905           32 :         TLOG(TLVL_ENDOFDATA) << "endOfData: After wait for outstanding buffers. Still outstanding: " << lastReadCount << ", time waited: "
     906           16 :                              << TimeUtils::GetElapsedTime(start) << " s / " << (end_of_data_wait_us / 1000000.0) << " s, art process count: " << get_art_process_count_();
     907              : 
     908           32 :         TLOG(TLVL_ENDOFDATA) << "endOfData: Broadcasting EndOfData Fragment";
     909           16 :         FragmentPtrs broadcast;
     910           16 :         broadcast.emplace_back(Fragment::eodFrag(GetBufferCount()));
     911           16 :         bool success = broadcastFragments_(broadcast);
     912           16 :         if (!success)
     913              :         {
     914            0 :                 TLOG(TLVL_ENDOFDATA) << "endOfData: Clearing buffers to make room for EndOfData Fragment";
     915            0 :                 for (size_t ii = 0; ii < broadcasts_.size(); ++ii)
     916              :                 {
     917            0 :                         broadcasts_.MarkBufferEmpty(ii, true);
     918              :                 }
     919            0 :                 broadcastFragments_(broadcast);
     920              :         }
     921           16 :         auto endOfDataProcessingStart = std::chrono::steady_clock::now();
     922           20 :         while (get_art_process_count_() > 0)
     923              :         {
     924            8 :                 TLOG(TLVL_ENDOFDATA) << "There are " << get_art_process_count_() << " art processes remaining. Proceeding to shutdown.";
     925              : 
     926            4 :                 ShutdownArtProcesses(art_processes_);
     927              :         }
     928           32 :         TLOG(TLVL_ENDOFDATA) << "It took " << TimeUtils::GetElapsedTime(endOfDataProcessingStart) << " s for all art processes to close after sending EndOfData Fragment";
     929              : 
     930           16 :         ResetAttachedCount();
     931              : 
     932           32 :         TLOG(TLVL_ENDOFDATA) << "endOfData: Clearing buffers";
     933          114 :         for (size_t ii = 0; ii < size(); ++ii)
     934              :         {
     935           98 :                 MarkBufferEmpty(ii, true);
     936              :         }
     937              : 
     938           16 :         released_events_.clear();
     939           16 :         released_incomplete_events_.clear();
     940              : 
     941           32 :         TLOG(TLVL_ENDOFDATA) << "endOfData END";
     942           48 :         TLOG(TLVL_INFO) << "EndOfData Complete. There were " << GetLastSeenBufferID() << " buffers processed.";
     943           16 :         return true;
     944           16 : }
     945              : 
     946           17 : void artdaq::SharedMemoryEventManager::startRun(run_id_t runID)
     947              : {
     948           17 :         running_ = true;
     949              :         {
     950           17 :                 std::lock_guard<std::mutex> lk(init_fragments_mutex_);
     951           17 :                 init_fragment_map_.clear();
     952           17 :         }
     953           17 :         init_frags_sent_ = false;
     954           17 :         statsHelper_.resetStatistics();
     955           34 :         TLOG(TLVL_STARTRUN) << "startRun: Clearing broadcast buffers";
     956          187 :         for (size_t ii = 0; ii < broadcasts_.size(); ++ii)
     957              :         {
     958          170 :                 broadcasts_.MarkBufferEmpty(ii, true);
     959              :         }
     960           17 :         released_events_.clear();
     961           17 :         released_incomplete_events_.clear();
     962           17 :         StartArt();
     963           17 :         run_id_ = runID;
     964           17 :         subrun_id_ = 1;
     965              :         {
     966           17 :                 std::unique_lock<std::mutex> lk(subrun_event_map_mutex_);
     967           17 :                 subrun_event_map_.clear();
     968           17 :                 subrun_event_map_[0] = 1;
     969           17 :         }
     970           17 :         run_event_count_ = 0;
     971           17 :         run_incomplete_event_count_ = 0;
     972           17 :         requests_ = std::make_unique<RequestSender>(data_pset_);
     973           17 :         if (requests_)
     974              :         {
     975           17 :                 requests_->SetRunNumber(static_cast<uint32_t>(run_id_));
     976              :         }
     977           51 :         if (data_pset_.has_key("routing_token_config"))
     978              :         {
     979            0 :                 auto rmPset = data_pset_.get<fhicl::ParameterSet>("routing_token_config");
     980            0 :                 if (rmPset.get<bool>("use_routing_manager", false))
     981              :                 {
     982            0 :                         tokens_ = std::make_unique<TokenSender>(rmPset);
     983            0 :                         tokens_->SetRunNumber(static_cast<uint32_t>(run_id_));
     984            0 :                         tokens_->SendRoutingToken(queue_size_, run_id_);
     985              :                 }
     986            0 :         }
     987           34 :         TLOG(TLVL_STARTRUN) << "Starting run " << run_id_
     988            0 :                             << ", max queue size = "
     989            0 :                             << queue_size_
     990            0 :                             << ", queue size = "
     991           17 :                             << GetLockedBufferCount();
     992           17 :         if (metricMan)
     993              :         {
     994          119 :                 metricMan->sendMetric("Run Number", static_cast<uint64_t>(run_id_), "Run", 1, MetricMode::LastPoint | MetricMode::Persist);
     995              :         }
     996           17 : }
     997              : 
     998            1 : bool artdaq::SharedMemoryEventManager::endRun()
     999              : {
    1000            3 :         TLOG(TLVL_INFO) << "Ending run " << run_id_;
    1001            2 :         TLOG(TLVL_ENDRUN) << "Shutting down RequestSender";
    1002            1 :         requests_.reset(nullptr);
    1003            2 :         TLOG(TLVL_ENDRUN) << "Shutting down TokenSender";
    1004            1 :         tokens_.reset(nullptr);
    1005              : 
    1006            3 :         TLOG(TLVL_INFO) << "Run " << run_id_ << " has ended. There were " << run_event_count_ << " events in this run.";
    1007            1 :         run_event_count_ = 0;
    1008            1 :         run_incomplete_event_count_ = 0;
    1009            1 :         oversize_fragment_count_ = 0;
    1010              :         {
    1011            1 :                 std::unique_lock<std::mutex> lk(subrun_event_map_mutex_);
    1012            1 :                 subrun_event_map_.clear();
    1013            1 :                 subrun_event_map_[0] = 1;
    1014            1 :         }
    1015            1 :         return true;
    1016              : }
    1017              : 
    1018            5 : void artdaq::SharedMemoryEventManager::rolloverSubrun(sequence_id_t boundary, subrun_id_t subrun, bool sendFragment)
    1019              : {
    1020              :         // Generated EndOfSubrun Fragments have Sequence ID 0 and should be ignored
    1021            5 :         if (boundary == 0 || boundary == Fragment::InvalidSequenceID)
    1022              :         {
    1023            2 :                 return;
    1024              :         }
    1025              : 
    1026            5 :         std::unique_lock<std::mutex> lk(subrun_event_map_mutex_);
    1027              : 
    1028              :         // Don't re-rollover to an already-defined subrun
    1029            5 :         if (!subrun_event_map_.empty() && subrun_event_map_.rbegin()->second >= subrun)
    1030              :         {
    1031            2 :                 return;
    1032              :         }
    1033            9 :         TLOG(TLVL_INFO) << "Will roll over to subrun " << subrun << " when I reach Sequence ID " << (boundary + 1);
    1034            3 :         subrun_event_map_[boundary + 1] = subrun;
    1035            3 :         while (subrun_event_map_.size() > max_subrun_event_map_length_)
    1036              :         {
    1037            0 :                 subrun_event_map_.erase(subrun_event_map_.begin());
    1038              :         }
    1039              : 
    1040            3 :         if (sendFragment)
    1041              :         {
    1042            0 :                 auto endOfSubrunFrag = artdaq::MetadataFragment::CreateEndOfSubrunFragment(my_rank, boundary, subrun, 0);
    1043            0 :                 BroadcastFragment(endOfSubrunFrag);
    1044            0 :         }
    1045            5 : }
    1046              : 
    1047            2 : void artdaq::SharedMemoryEventManager::rolloverSubrun(bool sendFragment)
    1048              : {
    1049            2 :         Fragment::sequence_id_t seqID = 0;
    1050            2 :         subrun_id_t subrun = 0;
    1051              :         {
    1052            2 :                 std::unique_lock<std::mutex> lk(subrun_event_map_mutex_);
    1053            5 :                 for (auto& it : subrun_event_map_)
    1054              :                 {
    1055            3 :                         if (it.first >= seqID)
    1056              :                         {
    1057            3 :                                 seqID = it.first + 1;
    1058              :                         }
    1059            3 :                         if (it.second >= subrun)
    1060              :                         {
    1061            3 :                                 subrun = it.second + 1;
    1062              :                         }
    1063              :                 }
    1064            2 :         }
    1065            2 :         rolloverSubrun(seqID, subrun, sendFragment);
    1066            2 : }
    1067              : 
    1068            0 : void artdaq::SharedMemoryEventManager::sendMetrics()
    1069              : {
    1070            0 :         if (metricMan)
    1071              :         {
    1072            0 :                 metricMan->sendMetric("Open Event Count", GetOpenEventCount(), "events", 1, MetricMode::LastPoint);
    1073            0 :                 metricMan->sendMetric("Pending Event Count", GetPendingEventCount(), "events", 1, MetricMode::LastPoint);
    1074              :         }
    1075              : 
    1076            0 :         if (open_event_report_interval_ms_ > 0 && GetLockedBufferCount() != 0u)
    1077              :         {
    1078            0 :                 if (TimeUtils::GetElapsedTimeMilliseconds(last_open_event_report_time_) < static_cast<size_t>(open_event_report_interval_ms_))
    1079              :                 {
    1080            0 :                         return;
    1081              :                 }
    1082              : 
    1083            0 :                 last_open_event_report_time_ = std::chrono::steady_clock::now();
    1084            0 :                 std::ostringstream oss;
    1085            0 :                 oss << "Open Events (expecting " << num_fragments_per_event_ << " Fragments): ";
    1086            0 :                 for (auto& ev : active_buffers_)
    1087              :                 {
    1088            0 :                         auto hdr = getEventHeader_(ev);
    1089            0 :                         oss << hdr->sequence_id << " (has " << GetFragmentCount(hdr->sequence_id) << " Fragments), ";
    1090              :                 }
    1091            0 :                 TLOG(TLVL_SENDMETRICS) << oss.str();
    1092            0 :         }
    1093              : }
    1094              : 
    1095         8862 : artdaq::detail::RawEventHeader* artdaq::SharedMemoryEventManager::getEventHeader_(int buffer)
    1096              : {
    1097         8862 :         return reinterpret_cast<detail::RawEventHeader*>(GetBufferStart(buffer));  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
    1098              : }
    1099              : 
    1100         2565 : artdaq::SharedMemoryEventManager::subrun_id_t artdaq::SharedMemoryEventManager::GetSubrunForSequenceID(Fragment::sequence_id_t seqID)
    1101              : {
    1102         2565 :         subrun_id_t subrun = 1;
    1103         2565 :         if (init_fragment_count_ > 0)
    1104              :         {
    1105            6 :                 TLOG(TLVL_GETSUBRUN_2) << "init_fragment_count > 0 (processing art events): Decoding subrun from sequenceID " << seqID;
    1106            3 :                 subrun = seqID >> 32;
    1107              :         }
    1108              :         else
    1109              :         {
    1110         2562 :                 std::unique_lock<std::mutex> lk(subrun_event_map_mutex_);
    1111              : 
    1112         5124 :                 TLOG(TLVL_GETSUBRUN_2) << "GetSubrunForSequenceID BEGIN map size = " << subrun_event_map_.size();
    1113         2562 :                 auto it = subrun_event_map_.begin();
    1114              : 
    1115         5127 :                 while (it->first <= seqID && it != subrun_event_map_.end())
    1116              :                 {
    1117         5130 :                         TLOG(TLVL_GETSUBRUN_2) << "Map has sequence ID " << it->first << ", subrun " << it->second << " (looking for <= " << seqID << ")";
    1118         2565 :                         subrun = it->second;
    1119         2565 :                         ++it;
    1120              :                 }
    1121         2562 :         }
    1122              : 
    1123         5130 :         TLOG(TLVL_GETSUBRUN) << "GetSubrunForSequenceID returning subrun " << subrun << " for sequence ID " << seqID;
    1124         2565 :         return subrun;
    1125              : }
    1126              : 
    1127         4909 : int artdaq::SharedMemoryEventManager::getBufferForSequenceID_(Fragment::sequence_id_t seqID, bool create_new, Fragment::timestamp_t timestamp)
    1128              : {
    1129         9818 :         TLOG(TLVL_GETBUFFER) << "getBufferForSequenceID " << seqID << " BEGIN";
    1130         4909 :         std::unique_lock<std::mutex> lk(sequence_id_mutex_);
    1131              : 
    1132         9818 :         TLOG(TLVL_GETBUFFER) << "getBufferForSequenceID obtained sequence_id_mutex for seqid=" << seqID;
    1133              : 
    1134         4909 :         auto buffers = GetBuffersOwnedByManager();
    1135         4924 :         for (auto& buf : buffers)
    1136              :         {
    1137         3790 :                 auto hdr = getEventHeader_(buf);
    1138         3790 :                 if (hdr->sequence_id == seqID)
    1139              :                 {
    1140         7550 :                         TLOG(TLVL_GETBUFFER) << "getBufferForSequenceID " << seqID << " returning " << buf;
    1141         3775 :                         return buf;
    1142              :                 }
    1143              :         }
    1144              : 
    1145              : #if !ART_SUPPORTS_DUPLICATE_EVENTS
    1146         1134 :         if (released_incomplete_events_.count(seqID) != 0u)
    1147              :         {
    1148            6 :                 TLOG(TLVL_ERROR) << "Event " << seqID << " has already been marked \"Incomplete\" and sent to art!";
    1149            2 :                 return -2;
    1150              :         }
    1151         1132 :         if (released_events_.count(seqID) != 0u)
    1152              :         {
    1153            6 :                 TLOG(TLVL_ERROR) << "Event " << seqID << " has already been completed and released to art! Check configuration for inconsistent Fragment count per event!";
    1154            2 :                 return -2;
    1155              :         }
    1156              : #endif
    1157              : 
    1158         1130 :         if (!create_new)
    1159              :         {
    1160            0 :                 return -1;
    1161              :         }
    1162              : 
    1163         1130 :         check_pending_buffers_(lk);
    1164         1130 :         int new_buffer = GetBufferForWriting(false);
    1165              : 
    1166         1130 :         if (new_buffer == -1)
    1167              :         {
    1168          687 :                 new_buffer = GetBufferForWriting(overwrite_mode_);
    1169              :         }
    1170              : 
    1171         1130 :         if (new_buffer == -1)
    1172              :         {
    1173          687 :                 return -1;
    1174              :         }
    1175          886 :         TLOG(TLVL_BUFLCK) << "getBufferForSequenceID_: obtaining buffer_mutexes lock for buffer " << new_buffer;
    1176          443 :         std::unique_lock<std::mutex> buffer_lk(buffer_mutexes_.at(new_buffer));
    1177          886 :         TLOG(TLVL_BUFLCK) << "getBufferForSequenceID_: obtained buffer_mutexes lock for buffer " << new_buffer;
    1178              : 
    1179          443 :         event_timing_[new_buffer] = std::chrono::steady_clock::now();
    1180              : 
    1181          443 :         auto hdr = getEventHeader_(new_buffer);
    1182          443 :         hdr->is_complete = false;
    1183          443 :         hdr->run_id = run_id_;
    1184          443 :         hdr->subrun_id = GetSubrunForSequenceID(seqID);
    1185          443 :         hdr->event_id = use_sequence_id_for_event_number_ ? static_cast<uint32_t>(seqID) : static_cast<uint32_t>(timestamp);
    1186          443 :         hdr->sequence_id = seqID;
    1187          443 :         hdr->timestamp = timestamp;
    1188          443 :         buffer_writes_pending_[new_buffer] = 0;
    1189          443 :         IncrementWritePos(new_buffer, sizeof(detail::RawEventHeader));
    1190          443 :         Globals::SetMFIteration("Sequence ID " + std::to_string(seqID));
    1191              : 
    1192          886 :         TLOG(TLVL_BUFFER) << "getBufferForSequenceID placing " << new_buffer << " to active.";
    1193          443 :         active_buffers_.insert(new_buffer);
    1194          886 :         TLOG(TLVL_BUFFER) << "Buffer occupancy now (total,full,reading,empty,pending,active)=("
    1195            0 :                           << size() << ","
    1196            0 :                           << ReadReadyCount() << ","
    1197            0 :                           << WriteReadyCount(true) - WriteReadyCount(false) - ReadReadyCount() << ","
    1198            0 :                           << WriteReadyCount(false) << ","
    1199            0 :                           << pending_buffers_.size() << ","
    1200          443 :                           << active_buffers_.size() << ")";
    1201              : 
    1202          443 :         if (requests_)
    1203              :         {
    1204          443 :                 requests_->AddRequest(seqID, timestamp);
    1205              :         }
    1206          886 :         TLOG(TLVL_GETBUFFER) << "getBufferForSequenceID " << seqID << " returning newly initialized buffer " << new_buffer;
    1207          443 :         return new_buffer;
    1208         4909 : }
    1209              : 
    1210            0 : bool artdaq::SharedMemoryEventManager::hasFragments_(int buffer)
    1211              : {
    1212            0 :         if (buffer == -1)
    1213              :         {
    1214            0 :                 return true;
    1215              :         }
    1216            0 :         if (!CheckBuffer(buffer, BufferSemaphoreFlags::Writing))
    1217              :         {
    1218            0 :                 return true;
    1219              :         }
    1220            0 :         ResetReadPos(buffer);
    1221            0 :         IncrementReadPos(buffer, sizeof(detail::RawEventHeader));
    1222            0 :         return MoreDataInBuffer(buffer);
    1223              : }
    1224              : 
    1225         2095 : void artdaq::SharedMemoryEventManager::complete_buffer_(int buffer)
    1226              : {
    1227         2095 :         auto hdr = getEventHeader_(buffer);
    1228         2095 :         if (hdr != nullptr && hdr->is_complete)
    1229              :         {
    1230          876 :                 TLOG(TLVL_COMPLETEBUFFER) << "complete_buffer_: This fragment completes event " << hdr->sequence_id << ".";
    1231              : 
    1232              :                 {
    1233          876 :                         TLOG(TLVL_BUFFER) << "complete_buffer_ moving " << buffer << " from active to pending.";
    1234              : 
    1235          876 :                         TLOG(TLVL_BUFLCK) << "complete_buffer_: obtaining sequence_id_mutex lock for seqid=" << hdr->sequence_id;
    1236          438 :                         std::unique_lock<std::mutex> lk(sequence_id_mutex_);
    1237          876 :                         TLOG(TLVL_BUFLCK) << "complete_buffer_: obtained sequence_id_mutex lock for seqid=" << hdr->sequence_id;
    1238          438 :                         active_buffers_.erase(buffer);
    1239          438 :                         pending_buffers_.insert(buffer);
    1240          438 :                         released_events_.insert(hdr->sequence_id);
    1241          538 :                         while (released_events_.size() > max_event_list_length_)
    1242              :                         {
    1243          100 :                                 released_events_.erase(released_events_.begin());
    1244              :                         }
    1245              : 
    1246          876 :                         TLOG(TLVL_BUFFER) << "Buffer occupancy now (total,full,reading,empty,pending,active)=("
    1247            0 :                                           << size() << ","
    1248            0 :                                           << ReadReadyCount() << ","
    1249            0 :                                           << WriteReadyCount(true) - WriteReadyCount(false) - ReadReadyCount() << ","
    1250            0 :                                           << WriteReadyCount(false) << ","
    1251            0 :                                           << pending_buffers_.size() << ","
    1252          438 :                                           << active_buffers_.size() << ")";
    1253          438 :                         check_pending_buffers_(lk);
    1254          438 :                 }
    1255          438 :                 if (requests_)
    1256              :                 {
    1257          438 :                         requests_->RemoveRequest(hdr->sequence_id);
    1258              :                 }
    1259              :         }
    1260         2095 :         check_pending_broadcasts_();
    1261         2095 : }
    1262              : 
    1263            0 : bool artdaq::SharedMemoryEventManager::bufferComparator(int bufA, int bufB)
    1264              : {
    1265            0 :         return getEventHeader_(bufA) < getEventHeader_(bufB);
    1266              : }
    1267              : 
    1268           10 : void artdaq::SharedMemoryEventManager::CheckPendingBuffers()
    1269              : {
    1270              :         {
    1271           20 :                 TLOG(TLVL_BUFLCK) << "Obtaining sequence_id_mutex_";
    1272           10 :                 std::unique_lock<std::mutex> lk(sequence_id_mutex_);
    1273           20 :                 TLOG(TLVL_BUFLCK) << "Obtained sequence_id_mutex_";
    1274              : 
    1275           10 :                 check_pending_buffers_(lk);
    1276           10 :         }
    1277           10 :         check_pending_broadcasts_();
    1278           10 : }
    1279              : 
    1280         1578 : void artdaq::SharedMemoryEventManager::check_pending_buffers_(std::unique_lock<std::mutex> const& lock)
    1281              : {
    1282         3156 :         TLOG(TLVL_CHECKPENDINGBUFFERS) << "check_pending_buffers_ BEGIN Locked=" << std::boolalpha << lock.owns_lock();
    1283              : 
    1284         1578 :         auto buffers = GetBuffersOwnedByManager();
    1285         2031 :         for (auto buf : buffers)
    1286              :         {
    1287          453 :                 if (ResetBuffer(buf) && (pending_buffers_.count(buf) == 0u))
    1288              :                 {
    1289            2 :                         TLOG(TLVL_CHECKPENDINGBUFFERS) << "check_pending_buffers_ Incomplete buffer detected, buf=" << buf << " active_bufers_.count(buf)=" << active_buffers_.count(buf) << " buffer_writes_pending_[buf]=" << buffer_writes_pending_[buf].load();
    1290            1 :                         auto hdr = getEventHeader_(buf);
    1291            2 :                         if ((active_buffers_.count(buf) != 0u) && buffer_writes_pending_[buf].load() == 0)
    1292              :                         {
    1293            1 :                                 if (requests_)
    1294              :                                 {
    1295            1 :                                         requests_->RemoveRequest(hdr->sequence_id);
    1296              :                                 }
    1297            2 :                                 TLOG(TLVL_BUFFER) << "check_pending_buffers_ moving buffer " << buf << " from active to pending";
    1298            1 :                                 active_buffers_.erase(buf);
    1299            1 :                                 pending_buffers_.insert(buf);
    1300            2 :                                 TLOG(TLVL_BUFFER) << "Buffer occupancy now (total,full,reading,empty,pending,active)=("
    1301            0 :                                                   << size() << ","
    1302            0 :                                                   << ReadReadyCount() << ","
    1303            0 :                                                   << WriteReadyCount(true) - WriteReadyCount(false) - ReadReadyCount() << ","
    1304            0 :                                                   << WriteReadyCount(false) << ","
    1305            0 :                                                   << pending_buffers_.size() << ","
    1306            1 :                                                   << active_buffers_.size() << ")";
    1307              : 
    1308            1 :                                 run_incomplete_event_count_++;
    1309            1 :                                 if (metricMan)
    1310              :                                 {
    1311            7 :                                         metricMan->sendMetric("Incomplete Event Rate", 1, "events/s", 3, MetricMode::Rate);
    1312              :                                 }
    1313            1 :                                 if (released_incomplete_events_.count(hdr->sequence_id) == 0u)
    1314              :                                 {
    1315            1 :                                         released_incomplete_events_[hdr->sequence_id] = num_fragments_per_event_ - GetFragmentCountInBuffer(buf);
    1316              :                                 }
    1317              :                                 else
    1318              :                                 {
    1319            0 :                                         released_incomplete_events_[hdr->sequence_id] -= GetFragmentCountInBuffer(buf);
    1320              :                                 }
    1321              : 
    1322            3 :                                 TLOG(TLVL_WARNING) << "Event " << hdr->sequence_id
    1323            2 :                                                    << " was opened " << TimeUtils::GetElapsedTime(event_timing_[buf]) << " s ago"
    1324            1 :                                                    << " and has timed out (missing " << released_incomplete_events_[hdr->sequence_id] << " Fragments)."
    1325            2 :                                                    << "Scheduling release to art.";
    1326              :                         }
    1327              :                 }
    1328              :         }
    1329              : 
    1330         1578 :         std::list<int> sorted_buffers(pending_buffers_.begin(), pending_buffers_.end());
    1331         1578 :         sorted_buffers.sort([this](int a, int b) { return bufferComparator(a, b); });
    1332              : 
    1333         1578 :         auto available_buffers = WriteReadyCount(overwrite_mode_);
    1334         1578 :         auto counter = 0;
    1335         1578 :         double eventSize = 0;
    1336         1578 :         double eventTime = 0;
    1337         2017 :         for (auto buf : sorted_buffers)
    1338              :         {
    1339          439 :                 auto hdr = getEventHeader_(buf);
    1340          439 :                 auto thisEventSize = BufferDataSize(buf);
    1341              : 
    1342          439 :                 if (update_run_ids_ && hdr->subrun_id < subrun_id_)
    1343              :                 {
    1344            0 :                         hdr->subrun_id = subrun_id_;
    1345              :                 }
    1346          439 :                 bool currentSubrun = hdr->subrun_id == subrun_id_;
    1347              : 
    1348          439 :                 if (hdr->subrun_id > subrun_id_ && (available_buffers > 0 || TimeUtils::GetElapsedTime(last_event_time_) < subrun_transition_hold_time_s_))
    1349              :                 {
    1350            0 :                         TLOG(TLVL_CHECKPENDINGBUFFERS_4) << "Holding event " << std::to_string(hdr->sequence_id) << " (sr=" << hdr->subrun_id << ") in buffer " << buf << ", "
    1351            0 :                                                          << "event_size=" << thisEventSize << ", buffer_size=" << BufferSize();
    1352            0 :                         continue;
    1353            0 :                 }
    1354              : 
    1355          878 :                 TLOG(TLVL_CHECKPENDINGBUFFERS_4) << "Releasing event " << std::to_string(hdr->sequence_id) << " (sr=" << hdr->subrun_id << ") in buffer " << buf << " to art, "
    1356          439 :                                                  << "event_size=" << thisEventSize << ", buffer_size=" << BufferSize();
    1357          439 :                 statsHelper_.addSample(EVENTS_RELEASED_STAT_KEY, thisEventSize);
    1358              : 
    1359          878 :                 TLOG(TLVL_BUFFER) << "check_pending_buffers_ removing buffer " << buf << " moving from pending to full";
    1360          439 :                 MarkBufferFull(buf);
    1361          439 :                 run_event_count_++;
    1362          439 :                 counter++;
    1363          439 :                 eventSize += thisEventSize;
    1364          439 :                 eventTime += TimeUtils::GetElapsedTime(event_timing_[buf]);
    1365          439 :                 pending_buffers_.erase(buf);
    1366          439 :                 if (currentSubrun)
    1367              :                 {
    1368          439 :                         last_event_time_ = std::chrono::steady_clock::now();
    1369              :                 }
    1370              :         }
    1371         3156 :         TLOG(TLVL_BUFFER) << "Buffer occupancy now (total,full,reading,empty,pending,active)=("
    1372            0 :                           << size() << ","
    1373            0 :                           << ReadReadyCount() << ","
    1374            0 :                           << WriteReadyCount(true) - WriteReadyCount(false) - ReadReadyCount() << ","
    1375            0 :                           << WriteReadyCount(false) << ","
    1376            0 :                           << pending_buffers_.size() << ","
    1377         1578 :                           << active_buffers_.size() << ")";
    1378              : 
    1379         1578 :         if (tokens_ && tokens_->RoutingTokenSendsEnabled())
    1380              :         {
    1381            0 :                 TLOG(TLVL_CHECKPENDINGBUFFERS_3) << "Sent tokens: " << tokens_->GetSentTokenCount() << ", Event count: " << run_event_count_;
    1382            0 :                 auto outstanding_tokens = tokens_->GetSentTokenCount() - run_event_count_;
    1383              : 
    1384            0 :                 TLOG(TLVL_CHECKPENDINGBUFFERS_3) << "check_pending_buffers_: outstanding_tokens: " << outstanding_tokens << ", available_buffers: " << available_buffers
    1385            0 :                                                  << ", tokens_to_send: " << available_buffers - outstanding_tokens;
    1386              : 
    1387            0 :                 if (available_buffers > outstanding_tokens)
    1388              :                 {
    1389            0 :                         auto tokens_to_send = available_buffers - outstanding_tokens;
    1390              : 
    1391            0 :                         while (tokens_to_send > 0)
    1392              :                         {
    1393            0 :                                 TLOG(TLVL_CHECKPENDINGBUFFERS_3) << "check_pending_buffers_: Sending a Routing Token";
    1394            0 :                                 tokens_->SendRoutingToken(1, run_id_);
    1395            0 :                                 tokens_to_send--;
    1396              :                         }
    1397              :                 }
    1398              :         }
    1399              : 
    1400         1578 :         if (statsHelper_.readyToReport())
    1401              :         {
    1402            0 :                 std::string statString = buildStatisticsString_();
    1403            0 :                 TLOG(TLVL_INFO) << statString;
    1404            0 :         }
    1405              : 
    1406         1578 :         if (metricMan)
    1407              :         {
    1408         3156 :                 TLOG(TLVL_CHECKPENDINGBUFFERS_2) << "check_pending_buffers_: Sending Metrics";
    1409         9468 :                 metricMan->sendMetric("Event Rate", counter, "Events", 1, MetricMode::Rate);
    1410         9468 :                 metricMan->sendMetric("Data Rate", eventSize, "Bytes", 1, MetricMode::Rate);
    1411         1578 :                 if (counter > 0)
    1412              :                 {
    1413         2634 :                         metricMan->sendMetric("Average Event Size", eventSize / counter, "Bytes", 1, MetricMode::Average);
    1414         3073 :                         metricMan->sendMetric("Average Event Building Time", eventTime / counter, "s", 1, MetricMode::Average);
    1415              :                 }
    1416              : 
    1417         9468 :                 metricMan->sendMetric("Events Released to art this run", run_event_count_, "Events", 1, MetricMode::LastPoint);
    1418         9468 :                 metricMan->sendMetric("Incomplete Events Released to art this run", run_incomplete_event_count_, "Events", 1, MetricMode::LastPoint);
    1419         1578 :                 if (tokens_ && tokens_->RoutingTokenSendsEnabled())
    1420              :                 {
    1421            0 :                         metricMan->sendMetric("Tokens sent", tokens_->GetSentTokenCount(), "Tokens", 2, MetricMode::LastPoint);
    1422              :                 }
    1423              : 
    1424         1578 :                 auto bufferReport = GetBufferReport();
    1425         1578 :                 int full = 0, empty = 0, writing = 0, reading = 0;
    1426        17236 :                 for (auto& buf : bufferReport)
    1427              :                 {
    1428        15658 :                         switch (buf.second)
    1429              :                         {
    1430        11853 :                                 case BufferSemaphoreFlags::Full:
    1431        11853 :                                         full++;
    1432        11853 :                                         break;
    1433         3068 :                                 case BufferSemaphoreFlags::Empty:
    1434         3068 :                                         empty++;
    1435         3068 :                                         break;
    1436           14 :                                 case BufferSemaphoreFlags::Writing:
    1437           14 :                                         writing++;
    1438           14 :                                         break;
    1439          723 :                                 case BufferSemaphoreFlags::Reading:
    1440          723 :                                         reading++;
    1441          723 :                                         break;
    1442              :                         }
    1443              :                 }
    1444         1578 :                 auto total = size();
    1445         3156 :                 TLOG(TLVL_CHECKPENDINGBUFFERS_2) << "Buffer usage: full=" << full << ", empty=" << empty << ", writing=" << writing << ", reading=" << reading << ", total=" << total;
    1446              : 
    1447         9468 :                 metricMan->sendMetric("Shared Memory Full Buffers", full, "buffers", 2, MetricMode::LastPoint);
    1448         9468 :                 metricMan->sendMetric("Shared Memory Available Buffers", empty, "buffers", 2, MetricMode::LastPoint);
    1449         9468 :                 metricMan->sendMetric("Shared Memory Pending Buffers", writing, "buffers", 2, MetricMode::LastPoint);
    1450         9468 :                 metricMan->sendMetric("Shared Memory Reading Buffers", reading, "buffers", 2, MetricMode::LastPoint);
    1451         1578 :                 if (total > 0)
    1452              :                 {
    1453         9468 :                         metricMan->sendMetric("Shared Memory Full %", full * 100 / static_cast<double>(total), "%", 2, MetricMode::LastPoint);
    1454        11046 :                         metricMan->sendMetric("Shared Memory Available %", empty * 100 / static_cast<double>(total), "%", 2, MetricMode::LastPoint);
    1455              :                 }
    1456         1578 :         }
    1457         3156 :         TLOG(TLVL_CHECKPENDINGBUFFERS) << "check_pending_buffers_ END";
    1458         1578 : }
    1459              : 
    1460            0 : void artdaq::SharedMemoryEventManager::BroadcastFragment(FragmentPtr& frag)
    1461              : {
    1462              :         {
    1463            0 :                 std::lock_guard<std::mutex> lk(broadcast_mutex_);
    1464              : 
    1465            0 :                 bool entry_found = false;
    1466            0 :                 for (auto& entry : pending_broadcasts_)
    1467              :                 {
    1468            0 :                         if (entry.type == frag->type() && entry.sequence_id == frag->sequenceID())
    1469              :                         {
    1470            0 :                                 TLOG(TLVL_BROADCASTFRAGMENT) << "Received BroadcastFragment of type " << static_cast<int>(frag->type()) << ", seqID " << frag->sequenceID() << " matching current pending_broadcasts_ entry. frags=" << entry.fragments.size() + 1 << "/" << init_fragment_count_;
    1471            0 :                                 entry.fragments.push_back(std::move(frag));
    1472            0 :                                 entry_found = true;
    1473            0 :                                 break;
    1474              :                         }
    1475              :                 }
    1476            0 :                 if (!entry_found)
    1477              :                 {
    1478            0 :                         TLOG(TLVL_BROADCASTFRAGMENT) << "Received BroadcastFragment of type " << static_cast<int>(frag->type()) << ", seqID " << frag->sequenceID() << ", creating new pending_broadcasts_ entry";
    1479            0 :                         pending_broadcasts_.emplace_back();
    1480            0 :                         pending_broadcasts_.back().deadline = std::chrono::steady_clock::now() + std::chrono::microseconds(GetBufferTimeout());
    1481            0 :                         pending_broadcasts_.back().type = frag->type();
    1482            0 :                         pending_broadcasts_.back().sequence_id = frag->sequenceID();
    1483            0 :                         pending_broadcasts_.back().subrun_id = GetSubrunForSequenceID(frag->sequenceID());
    1484            0 :                         pending_broadcasts_.back().fragments.push_back(std::move(frag));
    1485              :                 }
    1486            0 :         }
    1487            0 :         check_pending_broadcasts_();
    1488            0 : }
    1489              : 
    1490         2105 : void artdaq::SharedMemoryEventManager::check_pending_broadcasts_()
    1491              : {
    1492         2105 :         std::lock_guard<std::mutex> lk(broadcast_mutex_);
    1493              : 
    1494         2105 :         auto entry = pending_broadcasts_.begin();
    1495         2105 :         auto now = std::chrono::steady_clock::now();
    1496         2105 :         while (entry != pending_broadcasts_.end())
    1497              :         {
    1498            0 :                 if (running_)
    1499              :                 {
    1500            0 :                         if ((!init_frags_sent_ || entry->fragments.size() == 0 || (entry->fragments.size() < init_fragment_count_ && now < entry->deadline)))
    1501              :                         {
    1502            0 :                                 entry++;
    1503            0 :                                 continue;
    1504              :                         }
    1505            0 :                         if (entry->fragments.front()->type() == Fragment::EndOfSubrunFragmentType || entry->fragments.front()->type() == artdaq::Fragment::SubrunDataFragmentType)
    1506              :                         {
    1507            0 :                                 if (entry->subrun_id == subrun_id_ && TimeUtils::GetElapsedTime(last_event_time_) < subrun_transition_hold_time_s_)
    1508              :                                 {
    1509            0 :                                         TLOG(TLVL_CHECKPENDINGBROADCASTS) << "Holding entry size = " << entry->fragments.size() << " / " << init_fragment_count_ << ", lead SeqID = " << entry->fragments.front()->sequenceID() << ", subrun=" << entry->subrun_id << " because it is EndOfSubrun and hold time has not expired";
    1510            0 :                                         entry++;
    1511            0 :                                         continue;
    1512            0 :                                 }
    1513              :                         }
    1514              :                 }
    1515              : 
    1516            0 :                 TLOG(TLVL_CHECKPENDINGBROADCASTS) << "Broadcasting entry init_frags_sent_=" << init_frags_sent_ << ", size=" << entry->fragments.size() << "/" << init_fragment_count_ << ", subrun=" << entry->subrun_id << ", lead SeqID=" << entry->fragments.front()->sequenceID() << " deadline delta=" << std::chrono::duration_cast<std::chrono::microseconds>(now - entry->deadline).count() << " us";
    1517            0 :                 broadcastFragments_(entry->fragments);
    1518            0 :                 entry = pending_broadcasts_.erase(entry);
    1519              :         }
    1520         2105 : }
    1521              : 
    1522           22 : bool artdaq::SharedMemoryEventManager::broadcastFragments_(FragmentPtrs& frags)
    1523              : {
    1524           22 :         if (frags.empty())
    1525              :         {
    1526            0 :                 TLOG(TLVL_ERROR) << "Requested broadcast but no Fragments given!";
    1527            0 :                 return false;
    1528              :         }
    1529           22 :         if (!broadcasts_.IsValid())
    1530              :         {
    1531            0 :                 TLOG(TLVL_ERROR) << "Broadcast attempted but broadcast shared memory is unavailable!";
    1532            0 :                 return false;
    1533              :         }
    1534           44 :         TLOG(TLVL_BROADCASTFRAGMENTS) << "Broadcasting " << frags.size() << " Fragments with lead seqID=" << frags.front()->sequenceID()
    1535            0 :                                       << ", type " << detail::RawFragmentHeader::SystemTypeToString(frags.front()->type())
    1536           22 :                                       << ", size=" << frags.front()->sizeBytes() << "B.";
    1537           22 :         auto buffer = broadcasts_.GetBufferForWriting(false);
    1538           44 :         TLOG(TLVL_BROADCASTFRAGMENTS) << "broadcastFragments_: after getting buffer 1st buffer=" << buffer;
    1539           22 :         auto start_time = std::chrono::steady_clock::now();
    1540           22 :         while (buffer == -1 && TimeUtils::GetElapsedTimeMilliseconds(start_time) < static_cast<size_t>(broadcast_timeout_ms_))
    1541              :         {
    1542            0 :                 usleep(10000);
    1543            0 :                 buffer = broadcasts_.GetBufferForWriting(true);  // Go into overwrite mode
    1544              :         }
    1545           44 :         TLOG(TLVL_BROADCASTFRAGMENTS) << "broadcastFragments_: after getting buffer w/timeout, buffer=" << buffer << ", elapsed time=" << TimeUtils::GetElapsedTime(start_time) << " s.";
    1546           22 :         if (buffer == -1)
    1547              :         {
    1548            0 :                 TLOG(TLVL_ERROR) << "Broadcast of fragment type " << frags.front()->typeString() << " failed due to timeout waiting for buffer!";
    1549            0 :                 return false;
    1550              :         }
    1551              : 
    1552           44 :         TLOG(TLVL_BROADCASTFRAGMENTS) << "broadcastFragments_: Filling in RawEventHeader";
    1553           22 :         auto hdr = reinterpret_cast<detail::RawEventHeader*>(broadcasts_.GetBufferStart(buffer));  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
    1554           22 :         hdr->run_id = run_id_;
    1555           22 :         hdr->subrun_id = GetSubrunForSequenceID(frags.front()->sequenceID());
    1556           22 :         hdr->sequence_id = frags.front()->sequenceID();
    1557           22 :         hdr->is_complete = true;
    1558           22 :         broadcasts_.IncrementWritePos(buffer, sizeof(detail::RawEventHeader));
    1559              : 
    1560           22 :         if (frags.front()->type() == artdaq::Fragment::EndOfSubrunFragmentType || frags.front()->type() == artdaq::Fragment::SubrunDataFragmentType)
    1561              :         {
    1562            0 :                 subrun_id_ = hdr->subrun_id + 1;
    1563              :         }
    1564              : 
    1565           44 :         for (auto& frag : frags)
    1566              :         {
    1567           22 :                 if (frag->sequenceID() != hdr->sequence_id || frag->type() != frags.front()->type())
    1568              :                 {
    1569            0 :                         TLOG(TLVL_WARNING) << "Skipping fragment due to Type/SeqID mismatch! seqID=" << frag->sequenceID() << " (expected " << hdr->sequence_id << "), type=" << static_cast<int>(frag->type()) << " (" << static_cast<int>(frags.front()->type()) << ")";
    1570            0 :                         continue;
    1571            0 :                 }
    1572           44 :                 TLOG(TLVL_BROADCASTFRAGMENTS_2) << "broadcastFragments_ before Fragment Write call seqID=" << frag->sequenceID() << ", fragID=" << frag->fragmentID() << ", type=" << static_cast<int>(frag->type());
    1573           22 :                 broadcasts_.Write(buffer, frag->headerAddress(), frag->size() * sizeof(RawDataType));
    1574              :         }
    1575              : 
    1576           44 :         TLOG(TLVL_BROADCASTFRAGMENTS) << "broadcastFragments_ Marking buffer full";
    1577           22 :         broadcasts_.MarkBufferFull(buffer, -1);
    1578           44 :         TLOG(TLVL_BROADCASTFRAGMENTS) << "broadcastFragments_ Complete";
    1579           22 :         return true;
    1580              : }
    1581              : 
    1582            0 : std::vector<char*> artdaq::SharedMemoryEventManager::parse_art_command_line_(const std::shared_ptr<art_config_file>& config_file, size_t process_index)
    1583              : {
    1584            0 :         auto offset_index = process_index + art_process_index_offset_;
    1585            0 :         TLOG(TLVL_PARSEARTCOMMANDLINE) << "parse_art_command_line_: Parsing command line " << art_cmdline_ << ", config_file: " << config_file->getFileName() << ", index: " << process_index << " (w/offset: " << offset_index << ")";
    1586            0 :         std::string art_cmdline_tmp = art_cmdline_;
    1587            0 :         auto filenameit = art_cmdline_tmp.find("#CONFIG_FILE#");
    1588            0 :         if (filenameit != std::string::npos)
    1589              :         {
    1590            0 :                 art_cmdline_tmp.replace(filenameit, 13, config_file->getFileName());
    1591              :         }
    1592            0 :         auto indexit = art_cmdline_tmp.find("#PROCESS_INDEX#");
    1593            0 :         if (indexit != std::string::npos)
    1594              :         {
    1595            0 :                 art_cmdline_tmp.replace(indexit, 15, std::to_string(offset_index));
    1596              :         }
    1597            0 :         TLOG(TLVL_PARSEARTCOMMANDLINE) << "parse_art_command_line_: After replacing index and config parameters, command line is " << art_cmdline_tmp;
    1598              : 
    1599            0 :         std::istringstream iss(art_cmdline_tmp);
    1600            0 :         auto tokens = std::vector<std::string>{std::istream_iterator<std::string>{iss}, std::istream_iterator<std::string>{}};
    1601            0 :         std::vector<char*> output;
    1602              : 
    1603            0 :         for (auto& token : tokens)
    1604              :         {
    1605            0 :                 TLOG(TLVL_PARSEARTCOMMANDLINE) << "parse_art_command_line_: Adding cmdline token " << token << " to output list";
    1606            0 :                 output.emplace_back(new char[token.length() + 1]);
    1607            0 :                 memcpy(output.back(), token.c_str(), token.length());
    1608            0 :                 output.back()[token.length()] = '\0';  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
    1609              :         }
    1610            0 :         output.emplace_back(nullptr);
    1611              : 
    1612            0 :         return output;
    1613            0 : }
    1614              : 
    1615            6 : void artdaq::SharedMemoryEventManager::send_init_frags_()
    1616              : {
    1617            6 :         std::lock_guard<std::mutex> lk(init_fragments_mutex_);
    1618            6 :         if (init_fragment_map_size_() >= init_fragment_count_ && init_fragment_count_ > 0)
    1619              :         {
    1620            0 :                 TLOG(TLVL_INFO) << "Broadcasting " << init_fragment_map_size_() << " Init Fragment(s) to all art subprocesses...";
    1621              : 
    1622            0 :                 FragmentPtrs init_fragments;
    1623            0 :                 for (auto& fragment_id_pair : init_fragment_map_)
    1624              :                 {
    1625            0 :                         for (auto& ts_pair : fragment_id_pair.second)
    1626              :                         {
    1627            0 :                                 init_fragments.emplace_back(std::make_unique<Fragment>(*ts_pair.second));
    1628              :                         }
    1629              :                 }
    1630              : 
    1631            0 :                 broadcastFragments_(init_fragments);
    1632            0 :                 TLOG(TLVL_SENDINIT) << "Init Fragment sent";
    1633            0 :                 init_frags_sent_ = true;
    1634            0 :         }
    1635            6 :         else if (init_fragment_count_ > 0 && init_fragment_map_size_() == 0)
    1636              :         {
    1637            0 :                 TLOG(TLVL_WARNING) << "Cannot send Init Fragment(s) because I haven't yet received them! Set send_init_fragments to false or init_fragment_count to 0 if this process does not receive serialized art events to avoid potentially lengthy timeouts!";
    1638              :         }
    1639            6 :         else if (init_fragment_count_ > 0)
    1640              :         {
    1641            0 :                 TLOG(TLVL_INFO) << "Cannot send Init Fragment(s) because I haven't yet received them (have " << init_fragment_map_size_() << " of " << init_fragment_count_ << ")!";
    1642              :         }
    1643              :         else
    1644              :         {
    1645              :                 // Send an empty Init Fragment so that ArtdaqInput knows that this is a pure-Fragment input
    1646            6 :                 artdaq::FragmentPtrs begin_run_fragments_;
    1647            6 :                 begin_run_fragments_.emplace_back(new artdaq::Fragment());
    1648            6 :                 begin_run_fragments_.back()->setSystemType(artdaq::Fragment::InitFragmentType);
    1649            6 :                 broadcastFragments_(begin_run_fragments_);
    1650            6 :                 init_frags_sent_ = true;
    1651            6 :         }
    1652            6 : }
    1653              : 
    1654            0 : void artdaq::SharedMemoryEventManager::AddInitFragment(FragmentPtr& frag)
    1655              : {
    1656            0 :         std::unique_lock<std::mutex> lk(init_fragments_mutex_);
    1657              : 
    1658            0 :         auto fragId = frag->fragmentID();
    1659            0 :         auto ts = frag->timestamp();
    1660              : 
    1661            0 :         init_fragment_map_[fragId][ts] = std::move(frag);
    1662            0 :         TLOG(TLVL_ADDINITFRAGMENT) << "Received Init Fragment from rank " << fragId << ", art process id " << ts << ". Now have " << init_fragment_map_size_() << " of " << init_fragment_count_;
    1663              : 
    1664              :         // Don't send until all init fragments have been received
    1665            0 :         if (init_fragment_map_size_() >= init_fragment_count_)
    1666              :         {
    1667            0 :                 lk.unlock();
    1668            0 :                 send_init_frags_();
    1669              :         }
    1670            0 : }
    1671              : 
    1672            6 : size_t artdaq::SharedMemoryEventManager::init_fragment_map_size_() const
    1673              : {
    1674            6 :         size_t size = 0;
    1675              : 
    1676            6 :         for (auto& frag_id_pair : init_fragment_map_)
    1677              :         {
    1678            0 :                 size += frag_id_pair.second.size();
    1679              :         }
    1680              : 
    1681            6 :         return size;
    1682              : }
    1683              : 
    1684            0 : void artdaq::SharedMemoryEventManager::UpdateArtConfiguration(fhicl::ParameterSet art_pset)
    1685              : {
    1686            0 :         TLOG(TLVL_UPDATEARTCONFIG) << "UpdateArtConfiguration BEGIN";
    1687            0 :         if (art_pset != current_art_pset_ || !current_art_config_file_)
    1688              :         {
    1689            0 :                 current_art_pset_ = art_pset;
    1690            0 :                 if (manual_art_)
    1691            0 :                         current_art_config_file_ = std::make_shared<art_config_file>(art_pset, GetKey(), GetBroadcastKey());
    1692              :                 else
    1693            0 :                         current_art_config_file_ = std::make_shared<art_config_file>(art_pset);
    1694              :         }
    1695            0 :         TLOG(TLVL_UPDATEARTCONFIG) << "UpdateArtConfiguration END";
    1696            0 : }
    1697              : 
    1698            0 : std::string artdaq::SharedMemoryEventManager::buildStatisticsString_() const
    1699              : {
    1700            0 :         std::ostringstream oss;
    1701            0 :         oss << app_name << " statistics:" << std::endl;
    1702              : 
    1703              :         artdaq::MonitoredQuantityPtr mqPtr =
    1704            0 :             artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(EVENTS_RELEASED_STAT_KEY);
    1705            0 :         if (mqPtr.get() != nullptr)
    1706              :         {
    1707            0 :                 artdaq::MonitoredQuantityStats stats;
    1708            0 :                 mqPtr->getStats(stats);
    1709            0 :                 oss << "  Event statistics: " << stats.recentSampleCount << " events released at " << stats.recentSampleRate
    1710            0 :                     << " events/sec, effective data rate = "
    1711            0 :                     << (stats.recentValueRate / 1024.0 / 1024.0)
    1712            0 :                     << " MB/sec, monitor window = " << stats.recentDuration
    1713            0 :                     << " sec, min::max event size = " << (stats.recentValueMin / 1024.0 / 1024.0)
    1714            0 :                     << "::" << (stats.recentValueMax / 1024.0 / 1024.0) << " MB" << std::endl;
    1715            0 :                 if (stats.recentSampleRate > 0.0)
    1716              :                 {
    1717            0 :                         oss << "  Average time per event: ";
    1718            0 :                         oss << " elapsed time = " << (1.0 / stats.recentSampleRate) << " sec" << std::endl;
    1719              :                 }
    1720            0 :         }
    1721              : 
    1722            0 :         mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(FRAGMENTS_RECEIVED_STAT_KEY);
    1723            0 :         if (mqPtr.get() != nullptr)
    1724              :         {
    1725            0 :                 artdaq::MonitoredQuantityStats stats;
    1726            0 :                 mqPtr->getStats(stats);
    1727            0 :                 oss << "  Fragment statistics: " << stats.recentSampleCount << " fragments received at " << stats.recentSampleRate
    1728            0 :                     << " fragments/sec, effective data rate = "
    1729            0 :                     << (stats.recentValueRate / 1024.0 / 1024.0)
    1730            0 :                     << " MB/sec, monitor window = " << stats.recentDuration
    1731            0 :                     << " sec, min::max fragment size = " << (stats.recentValueMin / 1024.0 / 1024.0)
    1732            0 :                     << "::" << (stats.recentValueMax / 1024.0 / 1024.0) << " MB" << std::endl;
    1733            0 :         }
    1734              : 
    1735            0 :         oss << "  Event counts: Run -- " << run_event_count_ << " Total, " << run_incomplete_event_count_ << " Incomplete."
    1736            0 :             << "  Subrun -- " << subrun_event_count_ << " Total, " << subrun_incomplete_event_count_ << " Incomplete. "
    1737            0 :             << std::endl;
    1738              :         //-----------------------------------------------------------------------------
    1739              :         // P.Murat: add statistics on the SHM buffers
    1740              :         // there are 4 different flags: 0:empty, 1:writing; 2:full 3:reading
    1741              :         // want statistics on all of them
    1742              :         //-----------------------------------------------------------------------------
    1743              :         // auto = std::vector<std::pair<int, artdaq::SharedMemoryManager::BufferSemaphoreFlags>>
    1744            0 :         artdaq::SharedMemoryEventManager* nc_this = (artdaq::SharedMemoryEventManager*)this;
    1745              : 
    1746            0 :         int bsize = nc_this->BufferSize();
    1747              : 
    1748            0 :         auto v = nc_this->GetBufferReport();
    1749              : 
    1750            0 :         int nbb[4] = {0, 0, 0, 0};
    1751              : 
    1752            0 :         int nbuff = v.size();
    1753            0 :         for (int i = 0; i < nbuff; i++)
    1754              :         {
    1755            0 :                 auto x = v[i];
    1756            0 :                 int flag = (int)x.second;
    1757            0 :                 nbb[flag]++;
    1758              :         }
    1759              : 
    1760            0 :         oss << "  Shared Memory: "
    1761            0 :             << nbuff << " buffers of " << bsize << " B, "
    1762            0 :             << nbb[0] << " Empty, " << nbb[1] << " Writing, " << nbb[2] << " Full, " << nbb[3] << " reading"
    1763            0 :             << std::endl;
    1764              : 
    1765            0 :         return oss.str();
    1766            0 : }
        

Generated by: LCOV version 2.0-1