C based timer: Difference between revisions

From SoftwareGuy
Jump to navigation Jump to search
Mark (talk | contribs)
No edit summary
Mark (talk | contribs)
Line 42: Line 42:


= Python timer =
= Python timer =
<code>
<code>import threading, time, signal
import threading, time, signal


from datetime import timedelta
from datetime import timedelta
Line 51: Line 50:
class ProgramKilled(Exception):
class ProgramKilled(Exception):
     pass
     pass
def foo():
def foo():
     print time.ctime()
     print time.ctime()
   
def signal_handler(signum, frame):
def signal_handler(signum, frame):
     raise ProgramKilled
     raise ProgramKilled
   
class Job(threading.Thread):
class Job(threading.Thread):
     def __init__(self, interval, execute, *args, **kwargs):
     def __init__(self, interval, execute, *args, **kwargs):
Line 74: Line 70:
             while not self.stopped.wait(self.interval.total_seconds()):
             while not self.stopped.wait(self.interval.total_seconds()):
                 self.execute(*self.args, **self.kwargs)
                 self.execute(*self.args, **self.kwargs)
           
if __name__ == "__main__":
if __name__ == "__main__":
     signal.signal(signal.SIGTERM, signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)
Line 88: Line 83:
               job.stop()
               job.stop()
               break
               break
</code>
</code>
#output
#output

Revision as of 14:34, 28 March 2024

C Timer

#include <iostream>

  1. include <unistd.h>

<using namespace std;

// Define the callback function

void callback() {

cout << "Timer callback called!" << endl;

}

// Create the timer

timer_t timer;

struct sigevent sigev;

// Set up the timer

sigev.sigev_notify = SIGEV_THREAD;

sigev.sigev_notify_function = callback;

sigev.sigev_value.sival_ptr = &timer;

timer_create(CLOCK_REALTIME, &sigev, &timer);

// Start the timer

timer_settime(timer, 0, new itimerspec{1, 0}, NULL);

// Wait for the timer to expire

pause();

// Delete the timer

timer_delete(timer);

Python timer

import threading, time, signal

from datetime import timedelta

WAIT_TIME_SECONDS = 1

class ProgramKilled(Exception):

   pass

def foo():

   print time.ctime()

def signal_handler(signum, frame):

   raise ProgramKilled

class Job(threading.Thread):

   def __init__(self, interval, execute, *args, **kwargs):
       threading.Thread.__init__(self)
       self.daemon = False
       self.stopped = threading.Event()
       self.interval = interval
       self.execute = execute
       self.args = args
       self.kwargs = kwargs
       
   def stop(self):
               self.stopped.set()
               self.join()
   def run(self):
           while not self.stopped.wait(self.interval.total_seconds()):
               self.execute(*self.args, **self.kwargs)

if __name__ == "__main__":

   signal.signal(signal.SIGTERM, signal_handler)
   signal.signal(signal.SIGINT, signal_handler)
   job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=foo)
   job.start()
   
   while True:
         try:
             time.sleep(1)
         except ProgramKilled:
             print "Program killed: running cleanup code"
             job.stop()
             break

  1. output
  2. Tue Oct 16 17:47:51 2018
  3. Tue Oct 16 17:47:52 2018
  4. Tue Oct 16 17:47:53 2018
  5. ^CProgram killed: running cleanup code