Cppgram  1.0.0
Easy and modern C++14 Telegram Bot API wrapper
polling_impl.hpp
1 #include <thread>
2 
3 #include "polling.hpp"
4 
5 template <class T>
7  uint_fast8_t threads_number,
8  uint_fast32_t limit,
9  uint_fast32_t timeout )
10  : limit( limit )
11  , timeout( timeout )
12  , updates_queue( 150 )
13 {
14  bots = std::vector<T>( threads_number, bot );
15  console_stdout = spdlog::stdout_color_mt( "console" );
16  console_stderr = spdlog::stderr_color_mt( "error_console" );
17 }
18 
19 template <class T>
20 cppgram::Polling<T>::Polling( std::vector<T> bots, uint_fast32_t limit, uint_fast32_t timeout )
21  : bots( bots )
22  , limit( limit )
23  , timeout( timeout )
24  , updates_queue( 150 )
25 {
26 }
27 
28 template <class T>
29 void
31 {
32  uint_fast8_t size = bots.size();
33  if ( size == 0 )
34  {
35  return;
36  }
37 
38  init();
39 
40  if ( size == 1 )
41  {
42  runSinglethread();
43  }
44 
45  runMultithread();
46 }
47 
48 template <class T>
49 void
51 {
52  T poller = bots.back();
53  bots.pop_back();
54  std::vector<std::thread> threads;
55  for ( T &bot : bots )
56  {
57  threads.push_back( std::thread( &Polling::loopBot, this, std::move( bot ) ) );
58  }
59 
60  // Array objects has been moved, clear it
61  bots.clear();
62 
63  setThreadAffinity( threads );
64 
65  console_stdout->info( "Bots started." );
66 
67  moodycamel::ProducerToken producer_token( updates_queue );
68 
69  std::vector<cppgram::types::Update> updates;
70  uint_fast32_t updates_offset = firstUpdateID( poller );
71 
72  // Get the updates until the program has been stopped by the SIGINT
73  while ( 1 )
74  {
75  uint_fast32_t count;
76  if ( poller.getUpdates( updates, updates_offset, limit, timeout ) )
77  {
78  count = updates.size();
79  updates_queue.enqueue_bulk( producer_token, updates.begin(), count );
80  updates_offset += count;
81  updates.clear();
82  }
83  }
84 
85  for ( std::thread &t : threads )
86  {
87  t.join();
88  }
89 }
90 
91 template <class T>
92 void
94 {
95  console_stdout->info( "Bots started." );
96  auto bot = bots.back();
97  uint_fast32_t updates_offset = firstUpdateID( bot );
98  while ( 1 )
99  {
100  std::vector<cppgram::types::Update> updates;
101  if ( bot.getUpdates( updates, updates_offset ) )
102  {
103  for ( auto update : updates )
104  {
105  bot.processUpdate( std::move( update ) );
106  }
107  updates_offset += updates.size();
108  updates.clear();
109  }
110  }
111 }
112 
113 template <class T>
114 void
115 cppgram::Polling<T>::setThreadAffinity( std::vector<std::thread> &threads )
116 {
117  u_int8_t cores = std::thread::hardware_concurrency();
118  if ( threads.size() <= cores )
119  {
120  for ( uint_fast8_t i = 0; i < threads.size(); i++ )
121  {
122  cpu_set_t cpuset;
123  CPU_ZERO( &cpuset );
124  CPU_SET( i, &cpuset );
125  int rc = pthread_setaffinity_np(
126  threads[i].native_handle(), sizeof( cpu_set_t ), &cpuset );
127  if ( rc != 0 )
128  {
129  console_stderr->critical( "Error calling pthread_setaffinity_np: "
130  + std::to_string( rc ) );
131  }
132  }
133  console_stdout->info( "Thread affinity per cpu set." );
134  }
135 }
136 
137 template <class T>
138 void
140 {
141  cppgram::types::Update new_update;
142  while ( 1 )
143  {
144  updates_queue.wait_dequeue( new_update );
145  bot.processUpdate( std::move( new_update ) );
146  }
147 }
148 
149 template <class T>
150 uint_fast32_t
152 {
153  std::vector<cppgram::types::Update> first_update;
154  while ( !poller.getUpdates( first_update, 0, 1, timeout ) )
155  ;
156  return first_update[0].update_id - 1;
157 }
158 
159 template <class T>
160 void
162 {
163  auto sink = std::make_shared<spdlog::sinks::simple_file_sink_mt>( "bot.log" );
164 
165  for ( auto &bot : bots )
166  {
167  if ( bot.logger_ptr == nullptr )
168  {
169  bot.setLogger( sink );
170  bot.init();
171  }
172  }
173 
174  // Set async mode for loggers, and put interval based flush
175  spdlog::set_async_mode( 8192,
176  spdlog::async_overflow_policy::block_retry,
177  nullptr,
178  std::chrono::seconds( 300 ) );
179 }
std::shared_ptr< spdlog::logger > console_stdout
Logger that will print messages to the console.
Definition: polling.hpp:94
moodycamel::BlockingConcurrentQueue< types::Update > updates_queue
Queue of the updates to process.
Definition: polling.hpp:88
void init()
Init logger for each bot.
Definition: polling_impl.hpp:161
uint_fast32_t firstUpdateID(T &poller)
Get the offset of the first update to process.
Definition: polling_impl.hpp:151
std::vector< T > bots
Vector of bots.
Definition: polling.hpp:85
Polling(T bot, uint_fast8_t threads_number, uint_fast32_t limit=100, uint_fast32_t timeout=60)
Create a polling object by passing a single bot, and the number of threads.
Definition: polling_impl.hpp:6
std::shared_ptr< spdlog::logger > console_stderr
Logger that will print error messages to the console.
Definition: polling.hpp:91
void run()
Start the polling.
Definition: polling_impl.hpp:30
Handle long (or short) polling for a bot.
Definition: polling.hpp:12