; chicken-core (chicken-5) /scheduler.scm
;;;; scheduler.scm - Basic scheduler for multithreading
;
; Copyright (c) 2008-2022, The CHICKEN Team
; Copyright (c) 2000-2007, Felix L. Winkelmann
; All rights reserved.
;
; Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
; conditions are met:
;
;   Redistributions of source code must retain the above copyright notice, this list of conditions and the following
;     disclaimer.
;   Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
;     disclaimer in the documentation and/or other materials provided with the distribution.
;   Neither the name of the author nor the names of its contributors may be used to endorse or promote
;     products derived from this software without specific prior written permission.
;
; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS
; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
; AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
; CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
; SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
; THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
; OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
; POSSIBILITY OF SUCH DAMAGE.
26
27
;; Compilation-unit declarations plus the C helpers the scheduler calls:
;;
;;   C_signal_interrupted_p  - true when errno is EINTR (needs HAVE_ERRNO_H)
;;   C_msleep(ms)            - sleep for MS milliseconds; false on failure
;;   C_prepare_fdset(n)      - reset the descriptor set for N fd-list entries
;;   C_fdset_add(fd, i, o)   - register FD for input and/or output readiness
;;   C_ready_fds_timeout     - wait via select(2) or poll(2)
;;   C_fd_input_ready / C_fd_output_ready - query the result of the last wait
(declare
  (unit scheduler)
  (uses extras) ; for sprintf
  (disable-interrupts)
  (hide ready-queue-head ready-queue-tail timeout-list fd-list
        ##sys#update-thread-state-buffer ##sys#restore-thread-state-buffer
        ##sys#unblock-threads-for-i/o
        ;; This isn't hidden ATM to allow set!ing it as a hook/workaround
        ; ##sys#force-primordial
        remove-from-ready-queue fdset-test create-fdset stderr delq
        ##sys#clear-i/o-state-for-thread! ##sys#abandon-mutexes)
  (not inline chicken.base#sleep-hook ##sys#interrupt-hook ##sys#force-primordial)
  (unsafe)
  (foreign-declare #<<EOF
#ifdef HAVE_ERRNO_H
# include <errno.h>
# define C_signal_interrupted_p     C_mk_bool(errno == EINTR)
#else
# define C_signal_interrupted_p     C_SCHEME_FALSE
#endif

#ifdef _WIN32
/* TODO: Winsock select() only works for sockets */
# include <winsock2.h>
/* Beware: winsock2.h must come BEFORE windows.h */
# define C_msleep(n)     (Sleep((DWORD)C_num_to_uint64(n)), C_SCHEME_TRUE)
#else
# include <sys/time.h>
static C_word C_msleep(C_word ms);
/* Sleep for MS milliseconds (a Scheme integer).  Returns C_SCHEME_FALSE
   when the underlying sleep call reports failure, C_SCHEME_TRUE otherwise. */
C_word C_msleep(C_word ms) {
#ifdef __CYGWIN__
  if(usleep((useconds_t)C_num_to_uint64(ms) * 1000) == -1) return C_SCHEME_FALSE;
#else
  struct timespec ts;
  C_word ab[C_SIZEOF_FIX_BIGNUM], *a = ab,
         sec = C_s_a_u_i_integer_quotient(&a, 2, ms, C_fix(1000)),
         msec = C_s_a_u_i_integer_remainder(&a, 2, ms, C_fix(1000));
  ts.tv_sec = (time_t)C_num_to_uint64(sec);
  ts.tv_nsec = (long)C_unfix(msec) * 1000000;

  if(nanosleep(&ts, NULL) == -1) return C_SCHEME_FALSE;
#endif
  return C_SCHEME_TRUE;
}
#endif

#ifdef NO_POSIX_POLL

/* Shouldn't we include <sys/select.h> here? */
static fd_set C_fdset_input, C_fdset_output;

/* `pos' is unused here; it only matters for the poll() variant below. */
#define C_fd_input_ready(fd,pos)  C_mk_bool(FD_ISSET(C_unfix(fd), &C_fdset_input))
#define C_fd_output_ready(fd,pos)  C_mk_bool(FD_ISSET(C_unfix(fd), &C_fdset_output))

/* Wait for the registered descriptors.  When `to' is zero the call blocks
   without a timeout, otherwise it waits at most `tm' milliseconds. */
inline static int C_ready_fds_timeout(int to, unsigned int tm) {
  struct timeval timeout;
  timeout.tv_sec = tm / 1000;
  /* plain integer arithmetic: remaining milliseconds -> microseconds */
  timeout.tv_usec = (tm % 1000) * 1000;
  /* we use FD_SETSIZE, but really should use max fd */
  return select(FD_SETSIZE, &C_fdset_input, &C_fdset_output, NULL, to ? &timeout : NULL);
}

inline static void C_prepare_fdset(int length) {
  FD_ZERO(&C_fdset_input);
  FD_ZERO(&C_fdset_output);
}

inline static void C_fdset_add(int fd, int input, int output) {
  if (input) FD_SET(fd, &C_fdset_input);
  if (output) FD_SET(fd, &C_fdset_output);
}

#else
# include <poll.h>
# include <assert.h>
# include <limits.h>

static int C_fdset_nfds;
static struct pollfd *C_fdset_set = NULL;

inline static int C_fd_ready(int fd, int pos, int what) {
  assert(fd == C_fdset_set[pos].fd); /* Must match position in fd-list! */
  return(C_fdset_set[pos].revents & what);
}

#define C_fd_input_ready(fd,pos)  C_mk_bool(C_fd_ready(C_unfix(fd), C_unfix(pos),POLLIN|POLLERR|POLLHUP|POLLNVAL))
#define C_fd_output_ready(fd,pos)  C_mk_bool(C_fd_ready(C_unfix(fd), C_unfix(pos),POLLOUT|POLLERR|POLLHUP|POLLNVAL))

/* Wait for the registered descriptors.  When `to' is zero the call blocks
   without a timeout, otherwise it waits at most `tm' milliseconds. */
inline static int C_ready_fds_timeout(int to, unsigned int tm) {
  /* poll(2) takes a signed int where any negative value means "wait
     forever".  Keep the sentinel a real int and clamp large waits so they
     cannot wrap around into a negative (= infinite) timeout. */
  int timeout = -1;
  if (to) timeout = tm > (unsigned int)INT_MAX ? INT_MAX : (int)tm;
  return poll(C_fdset_set, C_fdset_nfds, timeout);
}

inline static void C_prepare_fdset(int length) {
  /* TODO: Only realloc when needed? */
  C_fdset_set = realloc(C_fdset_set, sizeof(struct pollfd) * length);
  if (C_fdset_set == NULL)
    C_halt(C_SCHEME_FALSE); /* Ugly: no message */
  C_fdset_nfds = 0;
}

/* This *must* be called in order, so position will match fd-list */
inline static void C_fdset_add(int fd, int input, int output) {
  C_fdset_set[C_fdset_nfds].events = ((input ? POLLIN : 0) | (output ? POLLOUT : 0));
  C_fdset_set[C_fdset_nfds++].fd = fd;
}
#endif
EOF
) )
135
136(import scheme chicken.base chicken.fixnum chicken.format chicken.condition)
137
138(include "common-declarations.scm")
139
;; Debug tracing.  The real implementation below is disabled with a datum
;; comment; to enable it, remove the `#;' and drop the no-op macro that
;; follows.
#;(begin
  (define stderr ##sys#standard-error) ; use default stderr port
  (define (dbg . args)
    (parameterize ((##sys#print-length-limit #f))
      (for-each (lambda (x) (display x stderr)) args)
      (newline stderr))))

;; With tracing disabled every `dbg' form expands to #f; its arguments are
;; dropped at expansion time and never evaluated.
(define-syntax dbg
  (syntax-rules ()
    ((_ . ignored) #f)))
153
;; (panic MESSAGE) hands MESSAGE to the runtime's C_halt.
(define-syntax panic
  (syntax-rules ()
    ((_ message)
     (##core#inline "C_halt" message))))
157
;; Return LST without the first element that is `eq?' to X.  The part of the
;; list before the match is copied, the part after it is shared with LST.
;; When X does not occur the result is a fresh copy of LST.
(define (delq x lst)
  (let walk ((rest lst))
    (if (null? rest)
        rest
        (let ((head (##sys#slot rest 0))
              (tail (##sys#slot rest 1)))
          (if (eq? x head)
              tail
              (cons head (walk tail)))))))
163
164
;; Scheduler entry point.  Saves the dynamic state of the current thread,
;; re-queues it if it is still runnable, wakes threads whose timeout has
;; expired or whose file descriptors became ready, and finally switches to
;; the next thread in state `ready'.  Halts with "deadlock" when the ready
;; queue is empty and neither timeout-list nor fd-list has entries.
(define (##sys#schedule)
  ;; Make THREAD the current thread and invoke the thunk stored in its slot 1.
  (define (switch thread)
    (dbg "switching to " thread)
    (set! ##sys#current-thread thread)
    (##sys#setslot thread 3 'running)
    (##sys#restore-thread-state-buffer thread)
    ;;XXX WRONG! this sets the t/i-period ("quantum") for the _next_ thread
    (##core#inline "C_set_initial_timer_interrupt_period" (##sys#slot thread 9))
    ;; Call upon ye ancient gods to forget about the current
    ;; continuation; it still refers to the old thread (#1367).
    (##sys#call-with-cthulhu (##sys#slot thread 1)))
  (let* ((current ##sys#current-thread)
         (interrupted #f)               ; #t once a sleep failed with EINTR
         (state (##sys#slot current 3)))
    (dbg "==================== scheduling, current: " current ", ready: " ready-queue-head)
    (##sys#update-thread-state-buffer current)
    ;; Put current thread on ready-queue:
    (when (memq state '(running ready)) ; should it really be 'ready? - normally not.
      (##sys#setislot current 13 #f)    ; clear timeout-unblock flag
      (##sys#add-to-ready-queue current))
    (let dispatch ()
      ;; Unblock threads waiting for timeout:
      (unless (null? timeout-list)
        (let ((now (##core#inline_allocate ("C_a_i_current_process_milliseconds" 7) #f)))
          (let scan ((entries timeout-list))
            (if (null? entries)
                (set! timeout-list '())
                (let* ((due (caar entries))            ; timeout recorded in the list
                       (waiter (cdar entries))         ; thread on list
                       (stored (##sys#slot waiter 4))) ; timeout value stored in thread
                  (dbg "timeout: " waiter " -> " stored " (now: " now ")")
                  (cond ((not (equal? due stored))     ;XXX why do we check this?
                         ;; entry does not match the thread's timeout: skip it
                         (scan (cdr entries)))
                        ((>= now due)                  ; timeout reached?
                         (##sys#setislot waiter 13 #t) ; mark as being unblocked by timeout
                         (##sys#clear-i/o-state-for-thread! waiter)
                         (##sys#thread-basic-unblock! waiter)
                         (scan (cdr entries)))
                        (else
                         (set! timeout-list entries)
                         ;; If there are no threads blocking on a select call (fd-list)
                         ;; but there are threads in the timeout list then sleep for
                         ;; the number of milliseconds of next thread to wake up.
                         (when (and (null? ready-queue-head)
                                    (null? fd-list)
                                    (pair? timeout-list))
                           (let ((wake-at (inexact->exact (round (caar timeout-list)))))
                             (set! interrupted
                               (and (not (##core#inline
                                          "C_msleep"
                                          (max 0 (- wake-at now))))
                                    (foreign-value
                                     "C_signal_interrupted_p" bool)))))))))))
      ;; Unblock threads blocked by I/O:
      (cond (interrupted
             (##sys#update-thread-state-buffer current)
             (##sys#force-primordial))  ; force it to handle user-interrupt
            ((not (null? fd-list))
             (##sys#unblock-threads-for-i/o)))
      ;; Fetch and activate next ready thread:
      (let pick ()
        (let ((next (remove-from-ready-queue)))
          (cond ((not next)
                 (if (and (null? timeout-list) (null? fd-list))
                     (panic "deadlock")
                     (dispatch)))
                ((eq? (##sys#slot next 3) 'ready) (switch next))
                (else (pick))))))))
236
;; Clear the primordial thread's timeout-unblock flag (slot 13) and unblock
;; it.  Called by the scheduler after an interrupted sleep or a failed
;; select/poll.
(define (##sys#force-primordial)
  (dbg "primordial thread forced due to interrupt")
  (let ((primordial ##sys#primordial-thread))
    (##sys#setislot primordial 13 #f)
    (##sys#thread-unblock! primordial)))
241
;; The ready queue is a singly linked list of threads: `ready-queue-head' is
;; the list itself, `ready-queue-tail' its last pair (both '() when empty).
(define ready-queue-head '())
(define ready-queue-tail '())

;; Return the list of queued threads.
(define (##sys#ready-queue) ready-queue-head)

;; Set THREAD's state (slot 3) to `ready' and append it to the queue.
(define (##sys#add-to-ready-queue thread)
  (##sys#setslot thread 3 'ready)
  (let ((cell (list thread)))
    (if (null? ready-queue-head)
        (set! ready-queue-head cell)
        (set-cdr! ready-queue-tail cell))
    (set! ready-queue-tail cell)))

;; Remove and return the first queued thread, or #f if the queue is empty.
(define (remove-from-ready-queue)
  (let ((front ready-queue-head))
    (if (null? front)
        #f
        (let ((remaining (cdr front)))
          (set! ready-queue-head remaining)
          (when (null? remaining)
            (set! ready-queue-tail '()))
          (car front)))))
262
;; Copy the current dynamic state (dynamic winds, standard ports, exception
;; handler and parameter vector) into THREAD's state buffer (slot 5).
(define (##sys#update-thread-state-buffer thread)
  (let ((state (##sys#slot thread 5)))
    (define (save! index value)
      (##sys#setslot state index value))
    (save! 0 ##sys#dynamic-winds)
    (save! 1 ##sys#standard-input)
    (save! 2 ##sys#standard-output)
    (save! 3 ##sys#standard-error)
    (save! 4 ##sys#current-exception-handler)
    (save! 5 ##sys#current-parameter-vector)))
271
;; Reinstall the dynamic state saved in THREAD's state buffer (slot 5):
;; dynamic winds, standard ports, exception handler and parameter vector.
(define (##sys#restore-thread-state-buffer thread)
  (let ((state (##sys#slot thread 5)))
    (define (saved index)
      (##sys#slot state index))
    (set! ##sys#dynamic-winds (saved 0))
    (set! ##sys#standard-input (saved 1))
    (set! ##sys#standard-output (saved 2))
    (set! ##sys#standard-error (saved 3))
    (set! ##sys#current-exception-handler (saved 4))
    (set! ##sys#current-parameter-vector (saved 5))))
280
;; Wrap the existing interrupt hook.  On a timer interrupt (reason 255) the
;; current thread's thunk (slot 1) is set to re-enter the previous hook and
;; the scheduler takes over; every other reason goes straight to the
;; previous hook.
(set! ##sys#interrupt-hook
  (let ((previous-hook ##sys#interrupt-hook))
    (lambda (reason state)
      (when (fx= reason 255)            ; C_TIMER_INTERRUPT_NUMBER
        (##sys#setslot ##sys#current-thread 1
                       (lambda () (previous-hook reason state)))
        (##sys#schedule))               ; expected not to return!
      (previous-hook reason state))))
289
;; List of (TIMEOUT . THREAD) pairs for threads blocked on a timeout.
(define timeout-list '())

;; Return the current timeout list.
(define (##sys#timeout-queue) timeout-list)

;; Unlink the first entry whose thread is `eq?' to T from timeout-list.
;; Does nothing when T has no entry.
(define (##sys#remove-from-timeout-list t)
  (let search ((cell timeout-list) (before #f))
    (if (null? cell)
        cell
        (let ((entry (##sys#slot cell 0))
              (after (##sys#slot cell 1)))
          (cond ((not (eq? (##sys#slot entry 1) t))
                 (search after cell))
                (before                 ; match in the middle or at the end
                 (set-cdr! before after))
                (else                   ; match is the very first entry
                 (set! timeout-list after)))))))
305
;; Block thread T until time TM.  For positive TM a (TM . T) entry is
;; inserted into timeout-list, kept in ascending order of timeout, and T is
;; marked `blocked' with TM stored in slot 4.  Non-positive TM is a no-op.
(define (##sys#thread-block-for-timeout! t tm)
  (dbg t " blocks for timeout " tm)
  (when (> tm 0)
    ;; This should really use a balanced tree:
    (let ((entry (cons tm t)))
      (let insert ((rest timeout-list) (before #f))
        (cond ((and (not (null? rest))
                    (not (< tm (caar rest))))
               ;; entry at REST is not later than TM: keep walking
               (insert (cdr rest) rest))
              (before
               (set-cdr! before (cons entry rest)))
              (else
               (set! timeout-list (cons entry rest))))))
    (##sys#setslot t 3 'blocked)
    (##sys#setislot t 13 #f)            ; clear timeout-unblock flag
    (##sys#setslot t 4 tm)))
319
;; Block thread T until thread T2 has finished.  Does nothing if T2 is
;; already `dead' or `terminated'; otherwise T is added to T2's recipient
;; list (slot 12), marked `blocked', and T2 is recorded as T's block
;; object (slot 11).
(define (##sys#thread-block-for-termination! t t2)
  (dbg t " blocks for " t2)
  (let ((target-state (##sys#slot t2 3)))
    (unless (memq target-state '(dead terminated))
      (##sys#setslot t2 12 (cons t (##sys#slot t2 12)))
      (##sys#setslot t 3 'blocked)
      (##sys#setislot t 13 #f)
      (##sys#setslot t 11 t2))))
328
;; Release every mutex listed in THREAD's slot 8: slots 2, 4 and 5 of each
;; mutex are reset to #f, #t and #f, every thread in slot 3 is unblocked,
;; and slot 3 is emptied.
;; NOTE(review): slot 4 = #t presumably flags the mutex as abandoned -
;; confirm against the mutex record layout.
(define (##sys#abandon-mutexes thread)
  (define (abandon! mutex)
    (##sys#setislot mutex 2 #f)
    (##sys#setislot mutex 4 #t)
    (##sys#setislot mutex 5 #f)
    (let ((waiting (##sys#slot mutex 3)))
      (unless (null? waiting)
        (for-each
         (lambda (waiter)
           (dbg " unblocking: " waiter)
           (##sys#thread-unblock! waiter))
         waiting)))
    (##sys#setislot mutex 3 '()))
  (let ((owned (##sys#slot thread 8)))
    (unless (null? owned)
      (##sys#for-each abandon! owned))))
346
;; Put thread T into final state S.  Abandons T's mutexes, detaches T from
;; whatever it was blocked on (condition variable or thread), removes it
;; from timeout-list and fd-list, and unblocks every recipient (slot 12)
;; that is still waiting for T.
(define (##sys#thread-kill! t s)
  (dbg "killing: " t " -> " s ", recipients: " (##sys#slot t 12))
  (##sys#abandon-mutexes t)
  (let ((blocker (##sys#slot t 11)))
    (cond ((##sys#structure? blocker 'condition-variable)
           (##sys#setslot blocker 2 (delq t (##sys#slot blocker 2))))
          ((##sys#structure? blocker 'thread)
           (##sys#setslot blocker 12 (delq t (##sys#slot blocker 12))))))
  (##sys#remove-from-timeout-list t)
  (##sys#clear-i/o-state-for-thread! t)
  (##sys#setslot t 3 s)                 ; new state
  (##sys#setislot t 4 #f)               ; no pending timeout
  (##sys#setislot t 11 #f)              ; no block object
  (##sys#setislot t 8 '())              ; no owned mutexes
  (let ((recipients (##sys#slot t 12)))
    (unless (null? recipients)
      (for-each
       (lambda (waiter)
         (dbg "  checking: " waiter " (" (##sys#slot waiter 3) ") -> " (##sys#slot waiter 11))
         (when (eq? (##sys#slot waiter 11) t)
           (##sys#thread-basic-unblock! waiter)))
       recipients)))
  (##sys#setislot t 12 '()))
371
;; Clear T's timeout (slot 4) and block object (slot 11) and append it to
;; the ready queue.  Does not touch timeout-list or fd-list.
(define (##sys#thread-basic-unblock! t)
  (dbg "unblocking: " t)
  (##sys#setislot t 4 #f)
  (##sys#setislot t 11 #f)              ; (FD . RWFLAGS) | #<MUTEX> | #<THREAD>
  (##sys#add-to-ready-queue t))
377
;; Handler for exceptions nobody caught in the current thread.  If
;; C_abort_on_thread_exceptions is set, the primordial thread's thunk is
;; wrapped so that it signals ARG before continuing, and that thread is
;; unblocked; otherwise a warning is shown.  In both cases ARG is stored in
;; slot 7 of the current thread, the thread is terminated and the scheduler
;; is entered.
(define (##sys#default-exception-handler arg)
  (let ((current ##sys#current-thread))
    (dbg "exception: " current " -> "
         (if (##sys#structure? arg 'condition) (##sys#slot arg 2) arg))
    (if (foreign-value "C_abort_on_thread_exceptions" bool)
        (let* ((primordial ##sys#primordial-thread)
               (resume (##sys#slot primordial 1)))
          (##sys#setslot
           primordial 1
           (lambda ()
             (signal arg)
             (resume)))
          (##sys#thread-unblock! primordial))
        (##sys#show-exception-warning arg "in thread" current))
    (##sys#setslot current 7 arg)
    (##sys#thread-kill! current 'terminated)
    (##sys#schedule)))
396
397
398;;; `select()/poll()'-based blocking:
399
;; Threads blocked on I/O, grouped by descriptor:
;;   ((FD1 THREAD1 ...) ...)
(define fd-list '())

;; Return the current fd-list.
(define (##sys#fd-queue)
  fd-list)
403
;; Rebuild the C-level descriptor set from fd-list.  Exactly one entry is
;; added per fd-list element, in list order, requesting input and/or output
;; readiness according to the directions the waiting threads registered.
(define (create-fdset)
  ((foreign-lambda void "C_prepare_fdset" int) (##sys#length fd-list))
  (for-each
   (lambda (entry)
     (let ((fd (car entry))
           (want-input #f)
           (want-output #f))
       (for-each
        (lambda (thread)
          (let ((blocker (##sys#slot thread 11)))
            ;; XXX: This should never be false, because otherwise the
            ;; thread is not supposed to be on fd-list!
            (when (pair? blocker)       ; (FD . RWFLAGS)? (can also be mutex or thread)
              (let ((direction (cdr blocker)))
                (case direction
                  ((#t #:input)
                   (set! want-input #t))
                  ((#f #:output)
                   (set! want-output #t))
                  ((#:all)
                   (set! want-input #t)
                   (set! want-output #t))
                  (else
                   (panic
                    (sprintf "create-fdset: invalid i/o direction: ~S (fd = ~S)" direction fd))))))))
        (cdr entry))
       ;; Our position in fd-list must match fdset array position, so
       ;; always add an fdset entry, even if input & output are #f.
       ((foreign-lambda void "C_fdset_add" int bool bool) fd want-input want-output)))
   fd-list))
434
;; Pick the readiness flag matching direction I/O: INF for #t / #:input,
;; OUTF for #f / #:output, and (or INF OUTF) for #:all.  Any other
;; direction is a fatal error.
(define (fdset-test inf outf i/o)
  (cond ((or (eq? i/o #t) (eq? i/o #:input)) inf)
        ((or (eq? i/o #f) (eq? i/o #:output)) outf)
        ((eq? i/o #:all) (or inf outf))
        (else
         (panic (sprintf "fdset-test: invalid i/o direction: ~S (i = ~S, o = ~S)"
                         i/o inf outf)))))
443
;; Block thread T until descriptor FD is ready in direction I/O.  T is added
;; to FD's existing fd-list entry, or a new entry is pushed onto the front
;; of fd-list; T is then marked `blocked' with (FD . I/O) as its block
;; object (slot 11).
(define (##sys#thread-block-for-i/o! t fd i/o)
  (dbg t " blocks for I/O " fd " in mode " i/o)
  #;(unless (memq i/o '(#:all #:input #:output))
    (panic (sprintf "##sys#thread-block-for-i/o!: invalid i/o mode: ~S" i/o)))
  (let register ((entries fd-list))
    (cond ((null? entries)
           (set! fd-list (cons (list fd t) fd-list)))
          ((fx= fd (caar entries))
           (let ((entry (car entries)))
             (##sys#setslot entry 1 (cons t (cdr entry)))))
          (else
           (register (cdr entries)))))
  (##sys#setslot t 3 'blocked)
  (##sys#setislot t 13 #f)              ; clear timeout-unblock flag
  (##sys#setslot t 11 (cons fd i/o)))
458
;; Wait on the descriptors in fd-list and wake the threads whose I/O became
;; possible.  If a timeout is pending and no thread is ready, the wait is
;; bounded by the earliest timeout; if threads are ready the timeout is 0
;; (poll only); with neither, the wait is unbounded.  A result of -1 forces
;; the primordial thread; otherwise fd-list is rebuilt without the threads
;; that were woken.
(define (##sys#unblock-threads-for-i/o)
  ;; Put THREAD back on the ready queue, dropping a pending timeout first.
  (define (wake! thread)
    (when (##sys#slot thread 4)         ; also blocked for timeout?
      (##sys#remove-from-timeout-list thread))
    (##sys#thread-basic-unblock! thread))
  (dbg "fd-list: " fd-list)
  (create-fdset)
  (let* ((timeouts? (pair? timeout-list))
         (ready? (pair? ready-queue-head))
         (wait (cond ((and timeouts? (not ready?)) ; no thread was unblocked by timeout, so wait
                      (let* ((due (inexact->exact (round (caar timeout-list))))
                             (now (##core#inline_allocate ("C_a_i_current_process_milliseconds" 7) #f)))
                        (max 0 (- due now))))
                     (else 0))))        ; otherwise immediate timeout.
    (dbg "waiting for I/O with timeout " wait)
    (let ((n ((foreign-lambda int "C_ready_fds_timeout" bool unsigned-integer)
              (or ready? timeouts?) wait)))
      (dbg n " fds ready")
      (cond ((eq? -1 n)
             (dbg "select(2)/poll(2) returned with result -1" )
             (##sys#force-primordial))
            ((fx> n 0)
             (set! fd-list
               ;; LEFT counts the ready descriptors not yet visited.
               (let scan ((left n) (pos 0) (entries fd-list))
                 (if (or (zero? left) (null? entries))
                     entries
                     (let* ((entry (car entries))
                            (fd (car entry))
                            ;; pos *must* match position of fd in fd-list
                            ;; This is checked in C_fd_ready with assert()
                            (inf (##core#inline "C_fd_input_ready" fd pos))
                            (outf (##core#inline "C_fd_output_ready" fd pos)))
                       (dbg "fd " fd " state: input=" inf ", output=" outf)
                       (if (not (or inf outf))
                           ;; nothing happened on this descriptor: keep entry as is
                           (cons entry (scan left (add1 pos) (cdr entries)))
                           (let sift ((threads (cdr entry)) (keep '()))
                             (if (not (null? threads))
                                 (let* ((t (car threads))
                                        (p (##sys#slot t 11)))
                                   (dbg "checking " t " " p)
                                   (cond ((##sys#slot t 13) ; unblocked by timeout?
                                          (dbg t " unblocked by timeout")
                                          (sift (cdr threads) keep))
                                         ((not (pair? p)) ; not blocked for I/O?
                                          ;; thread on fd-list is not blocked for I/O - this
                                          ;; is incorrect but will be ignored, just let it run
                                          (wake! t)
                                          (sift (cdr threads) keep))
                                         ((not (eq? fd (car p)))
                                          (panic (sprintf "thread is registered for I/O on unknown file-descriptor: ~S (expected ~S)" (car p) fd)))
                                         ((fdset-test inf outf (cdr p))
                                          (wake! t)
                                          (sift (cdr threads) keep))
                                         (else (sift (cdr threads) (cons t keep)))))
                                 ;; all waiters handled: keep the entry only if
                                 ;; some of them are still blocked
                                 (let ((rest (scan (sub1 left) (add1 pos) (cdr entries))))
                                   (if (null? keep)
                                       rest
                                       (cons (cons fd keep) rest))))))))))))))
518
519
520;;; Clear I/O state for unblocked thread
521
;; If T is blocked on I/O (its block object in slot 11 is a pair), remove T
;; from the fd-list entry for that descriptor.  An entry left without any
;; waiting thread is dropped from fd-list.
(define (##sys#clear-i/o-state-for-thread! t)
  (let ((blocker (##sys#slot t 11)))
    (when (pair? blocker)
      (let ((fd (car blocker)))
        (set! fd-list
          (let prune ((entries fd-list))
            (cond ((null? entries) '())
                  ((eq? fd (caar entries))
                   (let* ((entry (car entries))
                          (remaining (delq t (cdr entry)))) ; remove from fd-list entry
                     (if (null? remaining)
                         (cdr entries)
                         (begin
                           (##sys#setslot entry 1 remaining) ; fd-list entry is list with t removed
                           entries))))
                  (else
                   (cons (car entries) (prune (cdr entries)))))))))))
538
539
540;;; Get list of all threads that are ready or waiting for timeout or waiting for I/O:
541;
542; (contributed by Joerg Wittenberger)
543
;; Fold CNS over every thread on the ready queue, then fd-list, then
;; timeout-list, starting from INIT.  CNS is called as
;; (CNS QUEUE ARG THREAD ACC), where QUEUE/ARG is `ready'/#f, `i/o'/FD or
;; `timeout'/TIMEOUT.  With the defaults the result is a list of threads.
(define (##sys#all-threads #!optional
                           (cns (lambda (queue arg val init)
                                  (cons val init)))
                           (init '()))
  (let* ((acc (let ready ((l ready-queue-head) (i init))
                (if (pair? l)
                    (ready (cdr l) (cns 'ready #f (car l) i))
                    i)))
         (acc (let fds ((l fd-list) (i acc))
                (if (pair? l)
                    (fds (cdr l)
                         (let ((fd (caar l)))
                           ;; waiters of one descriptor are folded right-to-left
                           (let waiters ((ts (cdar l)))
                             (if (null? ts)
                                 i
                                 (cns 'i/o fd (car ts) (waiters (cdr ts)))))))
                    i))))
    (let timeouts ((l timeout-list) (i acc))
      (if (pair? l)
          (timeouts (cdr l) (cns 'timeout (caar l) (cdar l) i))
          i))))
562
563
564;;; Remove all waiting threads from the relevant queues with the exception of the current thread:
565
;; Empty the ready queue, fd-list and timeout-list and return their previous
;; contents as a vector #(ready-head ready-tail fd-list timeout-list), in
;; the layout `##sys#restore-threads' expects.
(define (##sys#fetch-and-clear-threads)
  (let ((snapshot (vector ready-queue-head
                          ready-queue-tail
                          fd-list
                          timeout-list)))
    (set! timeout-list '())
    (set! fd-list '())
    (set! ready-queue-tail '())
    (set! ready-queue-head '())
    snapshot))
573
574
575;;; Restore list of waiting threads:
576
;; Reinstall the queues from VEC, a vector laid out as
;; #(ready-head ready-tail fd-list timeout-list).
(define (##sys#restore-threads vec)
  (let ((head (##sys#slot vec 0))
        (tail (##sys#slot vec 1))
        (fds (##sys#slot vec 2))
        (timeouts (##sys#slot vec 3)))
    (set! ready-queue-head head)
    (set! ready-queue-tail tail)
    (set! fd-list fds)
    (set! timeout-list timeouts)))
582
583
584;;; Unblock thread cleanly:
585
;; If T is `blocked' or `sleeping', remove it from timeout-list and fd-list
;; and put it on the ready queue.  Threads in any other state are left
;; alone.
(define (##sys#thread-unblock! t)
  (let ((state (##sys#slot t 3)))
    (when (memq state '(blocked sleeping))
      (##sys#remove-from-timeout-list t)
      (##sys#clear-i/o-state-for-thread! t)
      (##sys#thread-basic-unblock! t))))
592
593
594;;; Put a thread to sleep:
595
;; Suspend the current thread until time TM.  The thread's thunk (slot 1) is
;; set to resume this call with an undefined value, the thread is blocked
;; on the timeout, and the scheduler is entered.
(define (##sys#thread-sleep! tm)
  (##sys#call-with-current-continuation
   (lambda (resume)
     (let ((self ##sys#current-thread))
       (##sys#setslot self 1
                      (lambda () (resume (##core#undefined))))
       (##sys#thread-block-for-timeout! self tm)
       (##sys#schedule)))))
603
604
605;;; Override `sleep` in library.scm to operate on the current thread:
606
;; Sleep N seconds by blocking only the current thread: the wake-up time is
;; the current process-milliseconds value plus N * 1000.
(set! chicken.base#sleep-hook
  (lambda (n)
    (let ((now (##core#inline_allocate ("C_a_i_current_process_milliseconds" 7) #f)))
      (##sys#thread-sleep! (+ now (* 1000.0 n))))))
612
613
;;; Suspend all threads in the timeout- and fd-lists, reset the ready queue so
;   that it holds only the current thread, and make that thread the new
;   primordial one. Overrides "##sys#kill-other-threads" in library.scm.
616
;; Replacement for `##sys#kill-other-threads': make the calling thread the
;; primordial thread and the only entry on the ready queue, mark every
;; thread found on timeout-list or fd-list as `suspended', empty both
;; lists, run THUNK and then exit.
(set! ##sys#kill-other-threads
  (let ((exit exit))                    ; capture `exit' as bound right now
    (lambda (thunk)
      (let ((survivor ##sys#current-thread))
        ;; Clear T's block object and recipients; every thread except the
        ;; survivor is additionally put into state `suspended'.
        (define (suspend t)
          (unless (eq? t survivor)
            (##sys#setslot t 3 'suspended))
          (##sys#setslot t 11 #f)       ; block-object (thread/mutex/fd & flags)
          (##sys#setslot t 12 '()))     ; recipients (waiting for join)
        (set! ##sys#primordial-thread survivor)
        (let ((queue (list survivor)))
          (set! ready-queue-head queue)
          (set! ready-queue-tail queue))
        (suspend survivor)              ; clear block-obj. and recipients
        (for-each (lambda (entry) (suspend (cdr entry))) timeout-list)
        (for-each (lambda (entry) (for-each suspend (cdr entry))) fd-list)
        (set! timeout-list '())
        (set! fd-list '())
        (thunk)
        (exit)))))