C based timer: Difference between revisions

From SoftwareGuy
Jump to navigation Jump to search
Mark (talk | contribs)
Mark (talk | contribs)
Line 42: Line 42:


= Python timer =
= Python timer =
<code>import threading, time, signal
<code>
 
import threading, time, signal
from datetime import timedelta
 
from datetime import timedelta
WAIT_TIME_SECONDS = 1
 
WAIT_TIME_SECONDS = 1
class ProgramKilled(Exception):
    pass
class ProgramKilled(Exception):
def foo():
    pass
    print time.ctime()
def foo():
def signal_handler(signum, frame):
    print time.ctime()
    raise ProgramKilled
def signal_handler(signum, frame):
class Job(threading.Thread):
    raise ProgramKilled
    def __init__(self, interval, execute, *args, **kwargs):
class Job(threading.Thread):
        threading.Thread.__init__(self)
    def __init__(self, interval, execute, *args, **kwargs):
        self.daemon = False
        threading.Thread.__init__(self)
        self.stopped = threading.Event()
        self.daemon = False
        self.interval = interval
        self.stopped = threading.Event()
        self.execute = execute
        self.interval = interval
        self.args = args
        self.execute = execute
        self.kwargs = kwargs
        self.args = args
       
        self.kwargs = kwargs
    def stop(self):
       
                self.stopped.set()
    def stop(self):
                self.join()
        self.stopped.set()
        self.join()
     def run(self):
     def run(self):
            while not self.stopped.wait(self.interval.total_seconds()):
        while not self.stopped.wait(self.interval.total_seconds()):
                self.execute(*self.args, **self.kwargs)
            self.execute(*self.args, **self.kwargs)
if __name__ == "__main__":
if __name__ == "__main__":
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=foo)
    job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=foo)
    job.start()
    job.start()
      
      
    while True:
    while True:
          try:
          try:
              time.sleep(1)
              time.sleep(1)
          except ProgramKilled:
          except ProgramKilled:
              print "Program killed: running cleanup code"
              print "Program killed: running cleanup code"
              job.stop()
              job.stop()
              break
              break
</code>
</code>
#output
#output
Line 91: Line 92:
==== More examples ====
==== More examples ====
<code>
<code>
import datetime, threading, time
import datetime, threading, time
 
next_call = time.time()
next_call = time.time()
 
def foo():
def foo():
    global next_call
    global next_call
    print datetime.datetime.now()
    print datetime.datetime.now()
    next_call = next_call+1
    next_call = next_call+1
    threading.Timer( next_call - time.time(), foo ).start()
    threading.Timer( next_call - time.time(), foo ).start()
 
foo()
foo()
</code>
</code>
----
----
<code>import datetime, threading, time
<code>
 
import datetime, threading, time
def foo():
    next_call = time.time()
def foo():
    while True:
    next_call = time.time()
        print datetime.datetime.now()         
    while True:
        next_call = next_call+1;
        print datetime.datetime.now()         
time.sleep(next_call - time.time())
        next_call = next_call+1;
 
time.sleep(next_call - time.time())
timerThread = threading.Thread(target=foo)
timerThread.start()</code>
timerThread = threading.Thread(target=foo)
timerThread.start()</code>


Followed by:
Followed by:


<code>timerThread = threading.Thread(target=foo)
<code>
timerThread.daemon = True
timerThread = threading.Thread(target=foo)
timerThread.start()</code>
timerThread.daemon = True
timerThread.start()</code>
----
----
<code>import threading
<code>
def work ():
import threading
    threading.Timer(0.25, work).start ()
def work ():
    print "stackoverflow"
    threading.Timer(0.25, work).start ()
work ()</code>
    print "stackoverflow"
work ()
</code>
----
----
<code>
<code>

Revision as of 15:29, 1 April 2024

C Timer

#include <iostream>

  1. include <unistd.h>

<using namespace std;

// Define the callback function

void callback() {

cout << "Timer callback called!" << endl;

}

// Create the timer

timer_t timer;

struct sigevent sigev;

// Set up the timer

sigev.sigev_notify = SIGEV_THREAD;

sigev.sigev_notify_function = callback;

sigev.sigev_value.sival_ptr = &timer;

timer_create(CLOCK_REALTIME, &sigev, &timer);

// Start the timer

timer_settime(timer, 0, new itimerspec{1, 0}, NULL);

// Wait for the timer to expire

pause();

// Delete the timer

timer_delete(timer);

Python timer

import threading, time, signal

from datetime import timedelta

WAIT_TIME_SECONDS = 1

class ProgramKilled(Exception):
    pass
def foo():
    print time.ctime()
def signal_handler(signum, frame):
    raise ProgramKilled
class Job(threading.Thread):
    def __init__(self, interval, execute, *args, **kwargs):
        threading.Thread.__init__(self)
        self.daemon = False
        self.stopped = threading.Event()
        self.interval = interval
        self.execute = execute
        self.args = args
        self.kwargs = kwargs
        
    def stop(self):
        self.stopped.set()
        self.join()
   def run(self):
        while not self.stopped.wait(self.interval.total_seconds()):
            self.execute(*self.args, **self.kwargs)
if __name__ == "__main__":
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=foo)
    job.start()
   
    while True:
          try:
              time.sleep(1)
          except ProgramKilled:
              print "Program killed: running cleanup code"
              job.stop()
              break

  1. output
  2. Tue Oct 16 17:47:51 2018
  3. Tue Oct 16 17:47:52 2018
  4. Tue Oct 16 17:47:53 2018

More examples

import datetime, threading, time

next_call = time.time()

def foo():
    global next_call
    print datetime.datetime.now()
    next_call = next_call+1
    threading.Timer( next_call - time.time(), foo ).start()

foo()


import datetime, threading, time

def foo():
    next_call = time.time()
    while True:
        print datetime.datetime.now()        
        next_call = next_call+1;
time.sleep(next_call - time.time())

timerThread = threading.Thread(target=foo)
timerThread.start()

Followed by:

timerThread = threading.Thread(target=foo)
timerThread.daemon = True
timerThread.start()

import threading
def work ():
    threading.Timer(0.25, work).start ()
    print "stackoverflow"
work ()


import threading
def do_every (interval, worker_func, iterations = 0):
 if iterations != 1:
   threading.Timer (
     interval,
     do_every, [interval, worker_func, 0 if iterations == 0 else iterations-1]
   ).start ()
 worker_func ()
def print_hw ():
 print "hello world"
def print_so ():
 print "stackoverflow"
# call print_so every second, 5 times total
do_every (1, print_so, 5)
# call print_hw two times per second, forever
do_every (0.5, print_hw)