~ chicken-core (chicken-5) f6536b089aedccbd328f48e1bd88705d6269a830
commit f6536b089aedccbd328f48e1bd88705d6269a830 Author: felix <felix@call-with-current-continuation.org> AuthorDate: Thu Jul 6 21:22:18 2023 +0200 Commit: Peter Bex <peter@more-magic.net> CommitDate: Mon Jul 17 15:56:05 2023 +0200 add internal event-queue mechanism and hooks for threading API, expose accessors to internal task lists. Signed-off-by: Peter Bex <peter@more-magic.net> diff --git a/library.scm b/library.scm index ef7cefea..9b5a8ff7 100644 --- a/library.scm +++ b/library.scm @@ -44,6 +44,7 @@ ##sys#default-read-info-hook ##sys#infix-list-hook ##sys#sharp-number-hook ##sys#user-print-hook ##sys#user-interrupt-hook ##sys#windows-platform + ##sys#resume-thread-on-event ##sys#suspend-thread-on-event ##sys#schedule ##sys#features) (foreign-declare #<<EOF #include <errno.h> @@ -152,7 +153,13 @@ signal_debug_event(C_word mode, C_word msg, C_word args) C_debugger(&cell, 3, av); return C_SCHEME_UNDEFINED; } - + +static C_word C_i_sleep_until_interrupt(C_word secs) +{ + while(C_i_process_sleep(secs) == C_fix(-1) && errno == EINTR); + return C_SCHEME_UNDEFINED; +} + #ifdef NO_DLOAD2 # define HAVE_DLOAD 0 #else @@ -5739,6 +5746,68 @@ EOF (define (##sys#kill-other-threads thunk) (thunk)) ; does nothing, will be modified by scheduler.scm +;; these two procedures should redefined in thread APIs (e.g. srfi-18): +(define (##sys#resume-thread-on-event t) #f) + +(define (##sys#suspend-thread-on-event t) + ;; wait until signal handler fires. If we are only waiting for a finalizer, + ;; then this will wait forever: + (##sys#sleep-until-interrupt)) + +(define (##sys#sleep-until-interrupt) + (##core#inline "C_i_sleep_until_interrupt" 100) + (##sys#dispatch-interrupt (lambda _ #f))) + + +;;; event queues (for signals and finalizers) + +(define (##sys#make-event-queue) + (##sys#make-structure 'event-queue + '() ; head + '() ; tail + #f)) ; suspended thread + +(define (##sys#add-event-to-queue! q e) + (let ((h (##sys#slot q 1)) + (t (##sys#slot q 2)) + (item (cons e '()))) + (if (null? h) + (##sys#setslot q 1 item) + (##sys#setslot t 1 item)) + (##sys#setslot q 2 item) + (let ((st (##sys#slot q 3))) ; thread suspended? + (when st + (##sys#setslot q 3 #f) + (##sys#resume-thread-on-event st))))) + +(define (##sys#get-next-event q) + (let ((st (##sys#slot q 3))) + (and (not st) + (let ((h (##sys#slot q 1))) + (and (not (null? h)) + (let ((x (##sys#slot h 0)) + (n (##sys#slot h 1))) + (##sys#setslot q 1 n) + (when (null? n) (##sys#setslot q 2 '())) + x)))))) + +(define (##sys#wait-for-next-event q) + (let ((st (##sys#slot q 3))) + (when st + (##sys#signal-hook #:runtime-error #f "event queue blocked" q)) + (let again () + (let ((h (##sys#slot q 1))) + (cond ((null? h) + (##sys#setslot q 3 ##sys#current-thread) + (##sys#suspend-thread-on-event ##sys#current-thread) + (again)) + (else + (let ((x (##sys#slot h 0)) + (n (##sys#slot h 1))) + (##sys#setslot q 1 n) + (when (null? n) (##sys#setslot q 2 '())) + x))))))) + ;;; Sleeping: diff --git a/scheduler.scm b/scheduler.scm index cbada6fb..759db957 100644 --- a/scheduler.scm +++ b/scheduler.scm @@ -29,7 +29,7 @@ (unit scheduler) (uses extras) ; for sprintf (disable-interrupts) - (hide ready-queue-head ready-queue-tail ##sys#timeout-list + (hide ready-queue-head ready-queue-tail timeout-list fd-list ##sys#update-thread-state-buffer ##sys#restore-thread-state-buffer ##sys#unblock-threads-for-i/o ;; This isn't hidden ATM to allow set!ing it as a hook/workaround @@ -105,7 +105,7 @@ static int C_fdset_nfds; static struct pollfd *C_fdset_set = NULL; inline static int C_fd_ready(int fd, int pos, int what) { - assert(fd == C_fdset_set[pos].fd); /* Must match position in ##sys#fd-list! */ + assert(fd == C_fdset_set[pos].fd); /* Must match position in fd-list! */ return(C_fdset_set[pos].revents & what); } @@ -124,7 +124,7 @@ inline static void C_prepare_fdset(int length) { C_fdset_nfds = 0; } -/* This *must* be called in order, so position will match ##sys#fd-list */ +/* This *must* be called in order, so position will match fd-list */ inline static void C_fdset_add(int fd, int input, int output) { C_fdset_set[C_fdset_nfds].events = ((input ? POLLIN : 0) | (output ? POLLOUT : 0)); C_fdset_set[C_fdset_nfds++].fd = fd; @@ -184,11 +184,11 @@ EOF (##sys#add-to-ready-queue ct) ) (let loop1 () ;; Unblock threads waiting for timeout: - (unless (null? ##sys#timeout-list) + (unless (null? timeout-list) (let ((now (##core#inline_allocate ("C_a_i_current_process_milliseconds" 7) #f))) - (let loop ((lst ##sys#timeout-list)) + (let loop ((lst timeout-list)) (if (null? lst) - (set! ##sys#timeout-list '()) + (set! timeout-list '()) (let* ([tmo1 (caar lst)] ; timeout of thread on list [tto (cdar lst)] ; thread on list [tmo2 (##sys#slot tto 4)] ) ; timeout value stored in thread @@ -201,14 +201,14 @@ EOF (##sys#thread-basic-unblock! tto) (loop (cdr lst)) ) (begin - (set! ##sys#timeout-list lst) + (set! timeout-list lst) ;; If there are no threads blocking on a select call (fd-list) ;; but there are threads in the timeout list then sleep for ;; the number of milliseconds of next thread to wake up. (when (and (null? ready-queue-head) - (null? ##sys#fd-list) - (pair? ##sys#timeout-list)) - (let* ((tmo1 (caar ##sys#timeout-list)) + (null? fd-list) + (pair? timeout-list)) + (let* ((tmo1 (caar timeout-list)) (tmo1 (inexact->exact (round tmo1)))) (set! eintr (and (not (##core#inline @@ -222,13 +222,13 @@ EOF (begin (##sys#update-thread-state-buffer ct) (##sys#force-primordial)) ; force it to handle user-interrupt - (unless (null? ##sys#fd-list) + (unless (null? fd-list) (##sys#unblock-threads-for-i/o) ) ) ;; Fetch and activate next ready thread: (let loop2 () (let ([nt (remove-from-ready-queue)]) (cond [(not nt) - (if (and (null? ##sys#timeout-list) (null? ##sys#fd-list)) + (if (and (null? timeout-list) (null? fd-list)) (panic "deadlock") (loop1) ) ] [(eq? (##sys#slot nt 3) 'ready) (switch nt)] @@ -287,10 +287,12 @@ EOF (##sys#schedule) ) ) ; expected not to return! (oldhook reason state) ) ) ) -(define ##sys#timeout-list '()) +(define timeout-list '()) + +(define (##sys#timeout-queue) timeout-list) (define (##sys#remove-from-timeout-list t) - (let loop ((l ##sys#timeout-list) (prev #f)) + (let loop ((l timeout-list) (prev #f)) (if (null? l) l (let ((h (##sys#slot l 0)) @@ -298,18 +300,18 @@ EOF (if (eq? (##sys#slot h 1) t) (if prev (set-cdr! prev r) - (set! ##sys#timeout-list r)) + (set! timeout-list r)) (loop r l)))))) (define (##sys#thread-block-for-timeout! t tm) (dbg t " blocks for timeout " tm) (when (> tm 0) ;; This should really use a balanced tree: - (let loop ([tl ##sys#timeout-list] [prev #f]) + (let loop ([tl timeout-list] [prev #f]) (if (or (null? tl) (< tm (caar tl))) (if prev (set-cdr! prev (cons (cons tm t) tl)) - (set! ##sys#timeout-list (cons (cons tm t) tl)) ) + (set! timeout-list (cons (cons tm t) tl)) ) (loop (cdr tl) tl) ) ) (##sys#setslot t 3 'blocked) (##sys#setislot t 13 #f) @@ -395,11 +397,13 @@ EOF ;;; `select()/poll()'-based blocking: -(define ##sys#fd-list '()) ; ((FD1 THREAD1 ...) ...) +(define fd-list '()) ; ((FD1 THREAD1 ...) ...) + +(define (##sys#fd-queue) fd-list) (define (create-fdset) - ((foreign-lambda void "C_prepare_fdset" int) (##sys#length ##sys#fd-list)) - (let loop ((lst ##sys#fd-list)) + ((foreign-lambda void "C_prepare_fdset" int) (##sys#length fd-list)) + (let loop ((lst fd-list)) (unless (null? lst) (let ((fd (caar lst)) (input #f) @@ -408,7 +412,7 @@ EOF (lambda (t) (let ((p (##sys#slot t 11))) ;; XXX: This should never be false, because otherwise the - ;; thread is not supposed to be on ##sys#fd-list! + ;; thread is not supposed to be on fd-list! (when (pair? p) ; (FD . RWFLAGS)? (can also be mutex or thread) (let ((i/o (cdr p))) (case i/o @@ -441,9 +445,9 @@ EOF (dbg t " blocks for I/O " fd " in mode " i/o) #;(unless (memq i/o '(#:all #:input #:output)) (panic (sprintf "##sys#thread-block-for-i/o!: invalid i/o mode: ~S" i/o))) - (let loop ([lst ##sys#fd-list]) + (let loop ([lst fd-list]) (if (null? lst) - (set! ##sys#fd-list (cons (list fd t) ##sys#fd-list)) + (set! fd-list (cons (list fd t) fd-list)) (let ([a (car lst)]) (if (fx= fd (car a)) (##sys#setslot a 1 (cons t (cdr a))) @@ -453,12 +457,12 @@ EOF (##sys#setslot t 11 (cons fd i/o)) ) (define (##sys#unblock-threads-for-i/o) - (dbg "fd-list: " ##sys#fd-list) + (dbg "fd-list: " fd-list) (create-fdset) - (let* ((to? (pair? ##sys#timeout-list)) + (let* ((to? (pair? timeout-list)) (rq? (pair? ready-queue-head)) (tmo (if (and to? (not rq?)) ; no thread was unblocked by timeout, so wait - (let* ((tmo1 (caar ##sys#timeout-list)) + (let* ((tmo1 (caar timeout-list)) (tmo1 (inexact->exact (round tmo1))) (now (##core#inline_allocate ("C_a_i_current_process_milliseconds" 7) #f))) (max 0 (- tmo1 now)) ) @@ -471,13 +475,13 @@ EOF (dbg "select(2)/poll(2) returned with result -1" ) (##sys#force-primordial)] [(fx> n 0) - (set! ##sys#fd-list - (let loop ((n n) (pos 0) (lst ##sys#fd-list)) + (set! fd-list + (let loop ((n n) (pos 0) (lst fd-list)) (if (or (zero? n) (null? lst)) lst (let* ((a (car lst)) (fd (car a)) - ;; pos *must* match position of fd in ##sys#fd-list + ;; pos *must* match position of fd in fd-list ;; This is checked in C_fd_ready with assert() (inf (##core#inline "C_fd_input_ready" fd pos)) (outf (##core#inline "C_fd_output_ready" fd pos))) @@ -518,8 +522,8 @@ EOF (define (##sys#clear-i/o-state-for-thread! t) (when (pair? (##sys#slot t 11)) (let ((fd (car (##sys#slot t 11)))) - (set! ##sys#fd-list - (let loop ((lst ##sys#fd-list)) + (set! fd-list + (let loop ((lst fd-list)) (if (null? lst) '() (let* ((a (car lst)) @@ -544,14 +548,14 @@ EOF (let loop ((l ready-queue-head) (i init)) (if (pair? l) (loop (cdr l) (cns 'ready #f (car l) i)) - (let loop ((l ##sys#fd-list) (i i)) + (let loop ((l fd-list) (i i)) (if (pair? l) (loop (cdr l) (let ((fd (caar l))) (let loop ((l (cdar l))) (if (null? l) i (cns 'i/o fd (car l) (loop (cdr l))))))) - (let loop ((l ##sys#timeout-list) (i i)) + (let loop ((l timeout-list) (i i)) (if (pair? l) (loop (cdr l) (cns 'timeout (caar l) (cdar l) i)) i))))))) @@ -560,11 +564,11 @@ EOF ;;; Remove all waiting threads from the relevant queues with the exception of the current thread: (define (##sys#fetch-and-clear-threads) - (let ([all (vector ready-queue-head ready-queue-tail ##sys#fd-list ##sys#timeout-list)]) + (let ([all (vector ready-queue-head ready-queue-tail fd-list timeout-list)]) (set! ready-queue-head '()) (set! ready-queue-tail '()) - (set! ##sys#fd-list '()) - (set! ##sys#timeout-list '()) + (set! fd-list '()) + (set! timeout-list '()) all) ) @@ -573,8 +577,8 @@ EOF (define (##sys#restore-threads vec) (set! ready-queue-head (##sys#slot vec 0)) (set! ready-queue-tail (##sys#slot vec 1)) - (set! ##sys#fd-list (##sys#slot vec 2)) - (set! ##sys#timeout-list (##sys#slot vec 3)) ) + (set! fd-list (##sys#slot vec 2)) + (set! timeout-list (##sys#slot vec 3)) ) ;;; Unblock thread cleanly: @@ -623,9 +627,9 @@ EOF (set! ready-queue-head (list primordial)) (set! ready-queue-tail ready-queue-head) (suspend primordial) ; clear block-obj. and recipients - (for-each (lambda (a) (suspend (cdr a))) ##sys#timeout-list) - (for-each (lambda (a) (for-each suspend (cdr a))) ##sys#fd-list) - (set! ##sys#timeout-list '()) - (set! ##sys#fd-list '()) + (for-each (lambda (a) (suspend (cdr a))) timeout-list) + (for-each (lambda (a) (for-each suspend (cdr a))) fd-list) + (set! timeout-list '()) + (set! fd-list '()) (thunk) (exit)))))Trap