C based timer: Difference between revisions
Jump to navigation
Jump to search
(8 intermediate revisions by the same user not shown) | |||
Line 1: | Line 1: | ||
= C Timer = | = C Timer = | ||
<code>#include <iostream> | <code> | ||
#include <unistd.h> | #include <iostream> | ||
#include <unistd.h> | |||
using namespace std; | |||
// Define the callback function | |||
// Define the callback function | |||
void callback() { | |||
void callback() { | |||
cout << "Timer callback called!" << endl; | cout << "Timer callback called!" << endl; | ||
} | |||
// Create the timer | |||
timer_t timer; | |||
struct sigevent sigev; | |||
// Set up the timer | |||
sigev.sigev_notify = SIGEV_THREAD; | |||
sigev.sigev_notify_function = callback; | |||
sigev.sigev_value.sival_ptr = &timer; | |||
timer_create(CLOCK_REALTIME, &sigev, &timer); | |||
// Start the timer | |||
timer_settime(timer, 0, new itimerspec{1, 0}, NULL); | |||
// Wait for the timer to expire | |||
pause(); | |||
// Delete the timer | |||
timer_delete(timer); | |||
</code> | |||
= Python timer = | = Python timer = | ||
<code>import threading, time, signal | <code> | ||
import threading, time, signal | |||
from datetime import timedelta | |||
from datetime import timedelta | |||
WAIT_TIME_SECONDS = 1 | |||
WAIT_TIME_SECONDS = 1 | |||
class ProgramKilled(Exception): | |||
class ProgramKilled(Exception): | |||
def foo(): | pass | ||
def foo(): | |||
def signal_handler(signum, frame): | print time.ctime() | ||
def signal_handler(signum, frame): | |||
class Job(threading.Thread): | raise ProgramKilled | ||
class Job(threading.Thread): | |||
def __init__(self, interval, execute, *args, **kwargs): | |||
threading.Thread.__init__(self) | |||
self.daemon = False | |||
self.stopped = threading.Event() | |||
self.interval = interval | |||
self.execute = execute | |||
self.args = args | |||
self.kwargs = kwargs | |||
def stop(self): | |||
self.stopped.set() | |||
self.join() | |||
def run(self): | def run(self): | ||
while not self.stopped.wait(self.interval.total_seconds()): | |||
self.execute(*self.args, **self.kwargs) | |||
if __name__ == "__main__": | if __name__ == "__main__": | ||
signal.signal(signal.SIGTERM, signal_handler) | |||
signal.signal(signal.SIGINT, signal_handler) | |||
job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=foo) | |||
job.start() | |||
while True: | |||
try: | |||
time.sleep(1) | |||
except ProgramKilled: | |||
print "Program killed: running cleanup code" | |||
job.stop() | |||
break | |||
</code> | </code> | ||
#output | #output | ||
Line 90: | Line 80: | ||
==== More examples ==== | ==== More examples ==== | ||
<code>import datetime, threading, time | <code> | ||
import datetime, threading, time | |||
next_call = time.time() | |||
next_call = time.time() | |||
def foo(): | |||
def foo(): | |||
global next_call | global next_call | ||
print datetime.datetime.now() | |||
print datetime.datetime.now() | next_call = next_call+1 | ||
threading.Timer( next_call - time.time(), foo ).start() | |||
next_call = next_call+1 | |||
foo() | |||
threading.Timer( next_call - time.time(), foo ).start() | </code> | ||
foo() | |||
---- | ---- | ||
<code>import datetime, threading, time | <code> | ||
import datetime, threading, time | |||
def foo(): | |||
next_call = time.time() | |||
while True: | |||
print datetime.datetime.now() | |||
next_call = next_call+1; | |||
time.sleep(next_call - time.time()) | |||
timerThread = threading.Thread(target=foo) | |||
timerThread.start()</code> | |||
Followed by: | Followed by: | ||
<code>timerThread = threading.Thread(target=foo) | <code> | ||
timerThread = threading.Thread(target=foo) | |||
timerThread.daemon = True | |||
timerThread.start()</code> | |||
---- | |||
----<code>import threading | <code> | ||
import threading | |||
def work (): | |||
threading.Timer(0.25, work).start () | |||
print "stackoverflow" | |||
work () | |||
</code> | |||
---- | |||
<code> | |||
import threading | |||
def do_every (interval, worker_func, iterations = 0): | |||
----<code>import threading | if iterations != 1: | ||
threading.Timer ( | |||
interval, | |||
do_every, [interval, worker_func, 0 if iterations == 0 else iterations-1] | |||
).start () | |||
worker_func () | |||
def print_hw (): | |||
print "hello world" | |||
def print_so (): | |||
print "stackoverflow" | |||
# call print_so every second, 5 times total | |||
do_every (1, print_so, 5) | |||
# call print_hw two times per second, forever | |||
do_every (0.5, print_hw) | |||
</code> | |||
---- | ---- |
Latest revision as of 15:34, 1 April 2024
C Timer[edit | edit source]
#include <iostream>
#include <unistd.h>
using namespace std;
// Define the callback function
void callback() {
cout << "Timer callback called!" << endl;
}
// Create the timer
timer_t timer;
struct sigevent sigev;
// Set up the timer
sigev.sigev_notify = SIGEV_THREAD;
sigev.sigev_notify_function = callback;
sigev.sigev_value.sival_ptr = &timer;
timer_create(CLOCK_REALTIME, &sigev, &timer);
// Start the timer
timer_settime(timer, 0, new itimerspec{1, 0}, NULL);
// Wait for the timer to expire
pause();
// Delete the timer
timer_delete(timer);
Python timer[edit | edit source]
import threading, time, signal
from datetime import timedelta
WAIT_TIME_SECONDS = 1
class ProgramKilled(Exception):
pass
def foo():
print time.ctime()
def signal_handler(signum, frame):
raise ProgramKilled
class Job(threading.Thread):
def __init__(self, interval, execute, *args, **kwargs):
threading.Thread.__init__(self)
self.daemon = False
self.stopped = threading.Event()
self.interval = interval
self.execute = execute
self.args = args
self.kwargs = kwargs
def stop(self):
self.stopped.set()
self.join()
def run(self):
while not self.stopped.wait(self.interval.total_seconds()):
self.execute(*self.args, **self.kwargs)
if __name__ == "__main__":
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=foo)
job.start()
while True:
try:
time.sleep(1)
except ProgramKilled:
print "Program killed: running cleanup code"
job.stop()
break
- output
- Tue Oct 16 17:47:51 2018
- Tue Oct 16 17:47:52 2018
- Tue Oct 16 17:47:53 2018
More examples[edit | edit source]
import datetime, threading, time
next_call = time.time()
def foo():
global next_call
print datetime.datetime.now()
next_call = next_call+1
threading.Timer( next_call - time.time(), foo ).start()
foo()
import datetime, threading, time
def foo():
next_call = time.time()
while True:
print datetime.datetime.now()
next_call = next_call+1;
time.sleep(next_call - time.time())
timerThread = threading.Thread(target=foo)
timerThread.start()
Followed by:
timerThread = threading.Thread(target=foo)
timerThread.daemon = True
timerThread.start()
import threading
def work ():
threading.Timer(0.25, work).start ()
print "stackoverflow"
work ()
import threading
def do_every (interval, worker_func, iterations = 0):
if iterations != 1:
threading.Timer (
interval,
do_every, [interval, worker_func, 0 if iterations == 0 else iterations-1]
).start ()
worker_func ()
def print_hw ():
print "hello world"
def print_so ():
print "stackoverflow"
# call print_so every second, 5 times total
do_every (1, print_so, 5)
# call print_hw two times per second, forever
do_every (0.5, print_hw)