C based timer: Difference between revisions

From SoftwareGuy
Jump to navigation Jump to search
Mark (talk | contribs)
Mark (talk | contribs)
Line 134: Line 134:


<code>timerThread.start()</code>
<code>timerThread.start()</code>
----
----<code>import threading</code>
 
 
<code>def work ():</code>
 
<code>threading.Timer(0.25, work).start ()</code>
 
<code>print "stackoverflow"</code>
 
 
<code>work ()</code>
----<code>import threading</code>
 
<code>def do_every (interval, worker_func, iterations = 0):</code>
 
<code>if iterations != 1:</code>
 
<code>threading.Timer (</code>
 
<code>interval,</code>
 
<code>do_every, [interval, worker_func, 0 if iterations == 0 else iterations-1]</code>
 
<code>).start ()</code> 
 
 
<code>worker_func ()</code>
 
<code>def print_hw ():</code>
 
<code>print "hello world"</code>
 
 
<code>def print_so ():</code>
 
<code>print "stackoverflow"</code>
 
 
 
<code># call print_so every second, 5 times total</code>
 
<code>do_every (1, print_so, 5)</code>
 
 
<code># call print_hw two times per second, forever</code>
 
<code>do_every (0.5, print_hw)</code>
----
----

Revision as of 15:10, 1 April 2024

C Timer

#include <iostream>

  1. include <unistd.h>

<using namespace std;

// Define the callback function

void callback() {

cout << "Timer callback called!" << endl;

}

// Create the timer

timer_t timer;

struct sigevent sigev;

// Set up the timer

sigev.sigev_notify = SIGEV_THREAD;

sigev.sigev_notify_function = callback;

sigev.sigev_value.sival_ptr = &timer;

timer_create(CLOCK_REALTIME, &sigev, &timer);

// Start the timer

timer_settime(timer, 0, new itimerspec{1, 0}, NULL);

// Wait for the timer to expire

pause();

// Delete the timer

timer_delete(timer);

Python timer

import threading, time, signal

from datetime import timedelta

WAIT_TIME_SECONDS = 1

class ProgramKilled(Exception):

   pass

def foo():

   print time.ctime()

def signal_handler(signum, frame):

   raise ProgramKilled

class Job(threading.Thread):

   def __init__(self, interval, execute, *args, **kwargs):
       threading.Thread.__init__(self)
       self.daemon = False
       self.stopped = threading.Event()
       self.interval = interval
       self.execute = execute
       self.args = args
       self.kwargs = kwargs
       
   def stop(self):
               self.stopped.set()
               self.join()
   def run(self):
           while not self.stopped.wait(self.interval.total_seconds()):
               self.execute(*self.args, **self.kwargs)

if __name__ == "__main__":

   signal.signal(signal.SIGTERM, signal_handler)
   signal.signal(signal.SIGINT, signal_handler)
   job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=foo)
   job.start()
   
   while True:
         try:
             time.sleep(1)
         except ProgramKilled:
             print "Program killed: running cleanup code"
             job.stop()
             break

  1. output
  2. Tue Oct 16 17:47:51 2018
  3. Tue Oct 16 17:47:52 2018
  4. Tue Oct 16 17:47:53 2018

More examples

import datetime, threading, time

next_call = time.time()

def foo():

global next_call

print datetime.datetime.now()

next_call = next_call+1

threading.Timer( next_call - time.time(), foo ).start()


foo()


import datetime, threading, time


def foo():

next_call = time.time()

while True:

print datetime.datetime.now()


next_call = next_call+1;

time.sleep(next_call - time.time())


timerThread = threading.Thread(target=foo)

timerThread.start()

Followed by:

timerThread = threading.Thread(target=foo)

timerThread.daemon = True

timerThread.start()


import threading


def work ():

threading.Timer(0.25, work).start ()

print "stackoverflow"


work ()


import threading

def do_every (interval, worker_func, iterations = 0):

if iterations != 1:

threading.Timer (

interval,

do_every, [interval, worker_func, 0 if iterations == 0 else iterations-1]

).start ()


worker_func ()

def print_hw ():

print "hello world"


def print_so ():

print "stackoverflow"


# call print_so every second, 5 times total

do_every (1, print_so, 5)


# call print_hw two times per second, forever

do_every (0.5, print_hw)