C based timer: Difference between revisions

From SoftwareGuy
Jump to navigation Jump to search
Mark (talk | contribs)
Mark (talk | contribs)
Line 127: Line 127:
     print "stackoverflow"
     print "stackoverflow"
work ()</code>
work ()</code>
----<code>import threading</code>
----
 
<code>
<code>def do_every (interval, worker_func, iterations = 0):</code>
import threading
 
def do_every (interval, worker_func, iterations = 0):
<code>if iterations != 1:</code>
  if iterations != 1:
 
    threading.Timer (
<code>threading.Timer (</code>
      interval,
 
      do_every, [interval, worker_func, 0 if iterations == 0 else iterations-1]
<code>interval,</code>
    ).start ()
 
  worker_func ()
<code>do_every, [interval, worker_func, 0 if iterations == 0 else iterations-1]</code>
def print_hw ():
 
  print "hello world"
<code>).start ()</code> 
def print_so ():
 
  print "stackoverflow"
 
# call print_so every second, 5 times total
<code>worker_func ()</code>
do_every (1, print_so, 5)
 
# call print_hw two times per second, forever
<code>def print_hw ():</code>
do_every (0.5, print_hw)
 
</code>
<code>print "hello world"</code>
 
 
<code>def print_so ():</code>
 
<code>print "stackoverflow"</code>
 
 
 
<code># call print_so every second, 5 times total</code>
 
<code>do_every (1, print_so, 5)</code>
 
 
<code># call print_hw two times per second, forever</code>
 
<code>do_every (0.5, print_hw)</code>
----
----

Revision as of 15:22, 1 April 2024

C Timer

#include <iostream>

  1. include <unistd.h>

<using namespace std;

// Define the callback function

void callback() {

cout << "Timer callback called!" << endl;

}

// Create the timer

timer_t timer;

struct sigevent sigev;

// Set up the timer

sigev.sigev_notify = SIGEV_THREAD;

sigev.sigev_notify_function = callback;

sigev.sigev_value.sival_ptr = &timer;

timer_create(CLOCK_REALTIME, &sigev, &timer);

// Start the timer

timer_settime(timer, 0, new itimerspec{1, 0}, NULL);

// Wait for the timer to expire

pause();

// Delete the timer

timer_delete(timer);

Python timer

import threading, time, signal

from datetime import timedelta

WAIT_TIME_SECONDS = 1

class ProgramKilled(Exception):

   pass

def foo():

   print time.ctime()

def signal_handler(signum, frame):

   raise ProgramKilled

class Job(threading.Thread):

   def __init__(self, interval, execute, *args, **kwargs):
       threading.Thread.__init__(self)
       self.daemon = False
       self.stopped = threading.Event()
       self.interval = interval
       self.execute = execute
       self.args = args
       self.kwargs = kwargs
       
   def stop(self):
               self.stopped.set()
               self.join()
   def run(self):
           while not self.stopped.wait(self.interval.total_seconds()):
               self.execute(*self.args, **self.kwargs)

if __name__ == "__main__":

   signal.signal(signal.SIGTERM, signal_handler)
   signal.signal(signal.SIGINT, signal_handler)
   job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=foo)
   job.start()
   
   while True:
         try:
             time.sleep(1)
         except ProgramKilled:
             print "Program killed: running cleanup code"
             job.stop()
             break

  1. output
  2. Tue Oct 16 17:47:51 2018
  3. Tue Oct 16 17:47:52 2018
  4. Tue Oct 16 17:47:53 2018

More examples

import datetime, threading, time

next_call = time.time()

def foo():

   global next_call
   print datetime.datetime.now()
   next_call = next_call+1
   threading.Timer( next_call - time.time(), foo ).start()

foo()


import datetime, threading, time

def foo():

   next_call = time.time()
   while True:
       print datetime.datetime.now()        
       next_call = next_call+1;

time.sleep(next_call - time.time())

timerThread = threading.Thread(target=foo) timerThread.start()

Followed by:

timerThread = threading.Thread(target=foo) timerThread.daemon = True timerThread.start()


import threading def work ():

   threading.Timer(0.25, work).start ()
   print "stackoverflow"

work ()


import threading def do_every (interval, worker_func, iterations = 0):

 if iterations != 1:
   threading.Timer (
     interval,
     do_every, [interval, worker_func, 0 if iterations == 0 else iterations-1]
   ).start ()
 worker_func ()

def print_hw ():

 print "hello world"

def print_so ():

 print "stackoverflow"
  1. call print_so every second, 5 times total

do_every (1, print_so, 5)

  1. call print_hw two times per second, forever

do_every (0.5, print_hw)