C based timer: Difference between revisions

From SoftwareGuy
Jump to navigation Jump to search
Mark (talk | contribs)
Mark (talk | contribs)
Line 129: Line 129:
----
----
<code>
<code>
import threading
import threading
def do_every (interval, worker_func, iterations = 0):
def do_every (interval, worker_func, iterations = 0):
   if iterations != 1:
   if iterations != 1:
     threading.Timer (
     threading.Timer (
Line 137: Line 137:
     ).start ()
     ).start ()
   worker_func ()
   worker_func ()
def print_hw ():
def print_hw ():
   print "hello world"
   print "hello world"
def print_so ():
def print_so ():
   print "stackoverflow"
   print "stackoverflow"
# call print_so every second, 5 times total
# call print_so every second, 5 times total
do_every (1, print_so, 5)
do_every (1, print_so, 5)
# call print_hw two times per second, forever
# call print_hw two times per second, forever
do_every (0.5, print_hw)
do_every (0.5, print_hw)
</code>
</code>
----
----

Revision as of 15:23, 1 April 2024

C Timer

#include <iostream>

  1. include <unistd.h>

<using namespace std;

// Define the callback function

void callback() {

cout << "Timer callback called!" << endl;

}

// Create the timer

timer_t timer;

struct sigevent sigev;

// Set up the timer

sigev.sigev_notify = SIGEV_THREAD;

sigev.sigev_notify_function = callback;

sigev.sigev_value.sival_ptr = &timer;

timer_create(CLOCK_REALTIME, &sigev, &timer);

// Start the timer

timer_settime(timer, 0, new itimerspec{1, 0}, NULL);

// Wait for the timer to expire

pause();

// Delete the timer

timer_delete(timer);

Python timer

import threading, time, signal

from datetime import timedelta

WAIT_TIME_SECONDS = 1

class ProgramKilled(Exception):

   pass

def foo():

   print time.ctime()

def signal_handler(signum, frame):

   raise ProgramKilled

class Job(threading.Thread):

   def __init__(self, interval, execute, *args, **kwargs):
       threading.Thread.__init__(self)
       self.daemon = False
       self.stopped = threading.Event()
       self.interval = interval
       self.execute = execute
       self.args = args
       self.kwargs = kwargs
       
   def stop(self):
               self.stopped.set()
               self.join()
   def run(self):
           while not self.stopped.wait(self.interval.total_seconds()):
               self.execute(*self.args, **self.kwargs)

if __name__ == "__main__":

   signal.signal(signal.SIGTERM, signal_handler)
   signal.signal(signal.SIGINT, signal_handler)
   job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=foo)
   job.start()
   
   while True:
         try:
             time.sleep(1)
         except ProgramKilled:
             print "Program killed: running cleanup code"
             job.stop()
             break

  1. output
  2. Tue Oct 16 17:47:51 2018
  3. Tue Oct 16 17:47:52 2018
  4. Tue Oct 16 17:47:53 2018

More examples

import datetime, threading, time

next_call = time.time()

def foo():

   global next_call
   print datetime.datetime.now()
   next_call = next_call+1
   threading.Timer( next_call - time.time(), foo ).start()

foo()


import datetime, threading, time

def foo():

   next_call = time.time()
   while True:
       print datetime.datetime.now()        
       next_call = next_call+1;

time.sleep(next_call - time.time())

timerThread = threading.Thread(target=foo) timerThread.start()

Followed by:

timerThread = threading.Thread(target=foo) timerThread.daemon = True timerThread.start()


import threading def work ():

   threading.Timer(0.25, work).start ()
   print "stackoverflow"

work ()


import threading
def do_every (interval, worker_func, iterations = 0):
 if iterations != 1:
   threading.Timer (
     interval,
     do_every, [interval, worker_func, 0 if iterations == 0 else iterations-1]
   ).start ()
 worker_func ()
def print_hw ():
 print "hello world"
def print_so ():
 print "stackoverflow"
# call print_so every second, 5 times total
do_every (1, print_so, 5)
# call print_hw two times per second, forever
do_every (0.5, print_hw)