C based timer: Difference between revisions

From SoftwareGuy
Jump to navigation Jump to search
Mark (talk | contribs)
Mark (talk | contribs)
Line 104: Line 104:
</code>
</code>
----
----
<code>import datetime, threading, time</code>
<code>import datetime, threading, time


def foo():
    next_call = time.time()
    while True:
        print datetime.datetime.now()       
        next_call = next_call+1;
time.sleep(next_call - time.time())


<code>def foo():</code>
timerThread = threading.Thread(target=foo)
 
timerThread.start()</code>
<code>next_call = time.time()</code>
 
<code>while True:</code>
 
<code>print datetime.datetime.now()</code>       
 
 
<code>next_call = next_call+1;</code>
 
<code>time.sleep(next_call - time.time())</code>
 
 
<code>timerThread = threading.Thread(target=foo)</code>
 
<code>timerThread.start()</code>


Followed by:
Followed by:


<code>timerThread = threading.Thread(target=foo)</code>
<code>timerThread = threading.Thread(target=foo)
 
timerThread.daemon = True
<code>timerThread.daemon = True</code>
timerThread.start()</code>
 
----
<code>timerThread.start()</code>
<code>import threading
----<code>import threading</code>
def work ():
 
    threading.Timer(0.25, work).start ()
 
    print "stackoverflow"
<code>def work ():</code>
work ()</code>
 
<code>threading.Timer(0.25, work).start ()</code>
 
<code>print "stackoverflow"</code>
 
 
<code>work ()</code>
----<code>import threading</code>
----<code>import threading</code>



Revision as of 15:21, 1 April 2024

C Timer

#include <iostream>

  1. include <unistd.h>

<using namespace std;

// Define the callback function

void callback() {

cout << "Timer callback called!" << endl;

}

// Create the timer

timer_t timer;

struct sigevent sigev;

// Set up the timer

sigev.sigev_notify = SIGEV_THREAD;

sigev.sigev_notify_function = callback;

sigev.sigev_value.sival_ptr = &timer;

timer_create(CLOCK_REALTIME, &sigev, &timer);

// Start the timer

timer_settime(timer, 0, new itimerspec{1, 0}, NULL);

// Wait for the timer to expire

pause();

// Delete the timer

timer_delete(timer);

Python timer

import threading, time, signal

from datetime import timedelta

WAIT_TIME_SECONDS = 1

class ProgramKilled(Exception):

   pass

def foo():

   print time.ctime()

def signal_handler(signum, frame):

   raise ProgramKilled

class Job(threading.Thread):

   def __init__(self, interval, execute, *args, **kwargs):
       threading.Thread.__init__(self)
       self.daemon = False
       self.stopped = threading.Event()
       self.interval = interval
       self.execute = execute
       self.args = args
       self.kwargs = kwargs
       
   def stop(self):
               self.stopped.set()
               self.join()
   def run(self):
           while not self.stopped.wait(self.interval.total_seconds()):
               self.execute(*self.args, **self.kwargs)

if __name__ == "__main__":

   signal.signal(signal.SIGTERM, signal_handler)
   signal.signal(signal.SIGINT, signal_handler)
   job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=foo)
   job.start()
   
   while True:
         try:
             time.sleep(1)
         except ProgramKilled:
             print "Program killed: running cleanup code"
             job.stop()
             break

  1. output
  2. Tue Oct 16 17:47:51 2018
  3. Tue Oct 16 17:47:52 2018
  4. Tue Oct 16 17:47:53 2018

More examples

import datetime, threading, time

next_call = time.time()

def foo():

   global next_call
   print datetime.datetime.now()
   next_call = next_call+1
   threading.Timer( next_call - time.time(), foo ).start()

foo()


import datetime, threading, time

def foo():

   next_call = time.time()
   while True:
       print datetime.datetime.now()        
       next_call = next_call+1;

time.sleep(next_call - time.time())

timerThread = threading.Thread(target=foo) timerThread.start()

Followed by:

timerThread = threading.Thread(target=foo) timerThread.daemon = True timerThread.start()


import threading def work ():

   threading.Timer(0.25, work).start ()
   print "stackoverflow"

work ()


import threading

def do_every (interval, worker_func, iterations = 0):

if iterations != 1:

threading.Timer (

interval,

do_every, [interval, worker_func, 0 if iterations == 0 else iterations-1]

).start ()


worker_func ()

def print_hw ():

print "hello world"


def print_so ():

print "stackoverflow"


# call print_so every second, 5 times total

do_every (1, print_so, 5)


# call print_hw two times per second, forever

do_every (0.5, print_hw)