C based timer: Difference between revisions
Jump to navigation
Jump to search
No edit summary |
|||
Line 42: | Line 42: | ||
= Python timer = | = Python timer = | ||
<code> | <code>import threading, time, signal | ||
import threading, time, signal | |||
from datetime import timedelta | from datetime import timedelta | ||
Line 51: | Line 50: | ||
class ProgramKilled(Exception): | class ProgramKilled(Exception): | ||
pass | pass | ||
def foo(): | def foo(): | ||
print time.ctime() | print time.ctime() | ||
def signal_handler(signum, frame): | def signal_handler(signum, frame): | ||
raise ProgramKilled | raise ProgramKilled | ||
class Job(threading.Thread): | class Job(threading.Thread): | ||
def __init__(self, interval, execute, *args, **kwargs): | def __init__(self, interval, execute, *args, **kwargs): | ||
Line 74: | Line 70: | ||
while not self.stopped.wait(self.interval.total_seconds()): | while not self.stopped.wait(self.interval.total_seconds()): | ||
self.execute(*self.args, **self.kwargs) | self.execute(*self.args, **self.kwargs) | ||
if __name__ == "__main__": | if __name__ == "__main__": | ||
signal.signal(signal.SIGTERM, signal_handler) | signal.signal(signal.SIGTERM, signal_handler) | ||
Line 88: | Line 83: | ||
job.stop() | job.stop() | ||
break | break | ||
</code> | </code> | ||
#output | #output |
Revision as of 14:34, 28 March 2024
C Timer
#include <iostream>
- include <unistd.h>
<using namespace std;
// Define the callback function
void callback() {
cout << "Timer callback called!" << endl;
}
// Create the timer
timer_t timer;
struct sigevent sigev;
// Set up the timer
sigev.sigev_notify = SIGEV_THREAD;
sigev.sigev_notify_function = callback;
sigev.sigev_value.sival_ptr = &timer;
timer_create(CLOCK_REALTIME, &sigev, &timer);
// Start the timer
timer_settime(timer, 0, new itimerspec{1, 0}, NULL);
// Wait for the timer to expire
pause();
// Delete the timer
timer_delete(timer);
Python timer
import threading, time, signal
from datetime import timedelta
WAIT_TIME_SECONDS = 1
class ProgramKilled(Exception):
pass
def foo():
print time.ctime()
def signal_handler(signum, frame):
raise ProgramKilled
class Job(threading.Thread):
def __init__(self, interval, execute, *args, **kwargs):
threading.Thread.__init__(self)
self.daemon = False
self.stopped = threading.Event()
self.interval = interval
self.execute = execute
self.args = args
self.kwargs = kwargs
def stop(self):
self.stopped.set()
self.join()
def run(self):
while not self.stopped.wait(self.interval.total_seconds()):
self.execute(*self.args, **self.kwargs)
if __name__ == "__main__":
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=foo)
job.start()
while True:
try:
time.sleep(1)
except ProgramKilled:
print "Program killed: running cleanup code"
job.stop()
break
- output
- Tue Oct 16 17:47:51 2018
- Tue Oct 16 17:47:52 2018
- Tue Oct 16 17:47:53 2018
- ^CProgram killed: running cleanup code