;;; Utility: small helpers shared by the structures below.
(define-structure Utility
  (export vector-map)
  (open scheme)
  (begin

    ;; Return a fresh vector of the same length as V whose i-th element
    ;; is (F (vector-ref V i)).  Elements are processed in index order.
    (define (vector-map f v)
      (let* ((vl (vector-length v))
             (nv (make-vector vl)))
        (let loop ((i 0))
          (if (>= i vl)
              nv
              (begin
                (vector-set! nv i (f (vector-ref v i)))
                (loop (+ 1 i)))))))
    ))

;;; Servers: connection management and result buffering for the
;;; back-end servers the daemon talks to.
(define-structure Servers
  (export open-server open-servers close-servers
          get-processor get-result-stream read-result)
  (open scsh scheme receiving string-lib list-lib extended-ports Utility)
  (begin

    ;; Connect to the server described by SPEC, a string of the form
    ;; "host:port".  Returns the value of socket-connect, or #f when SPEC
    ;; does not split into exactly two colon-separated parts or when the
    ;; port part is not a number.
    (define (open-server spec)
      ; (format #t "Opening spec: ~a~%" spec)
      (let ((parts (string-tokenize spec (char-set-complement (char-set #\:)))))
        (and (= (length parts) 2)
             (let ((host (car parts))
                   (port (string->number (cadr parts))))
               ;; string->number yields #f for a non-numeric port; treat
               ;; that like any other malformed spec instead of handing
               ;; #f to socket-connect.
               (and port
                    (socket-connect protocol-family/internet
                                    socket-type/stream
                                    host
                                    port))))))

    ;; Open every server named in SERVERS (a list) and return the
    ;; connections as a vector.  An element that names an existing file
    ;; is read with `read' and the datum obtained is passed recursively
    ;; to open-servers (so it is expected to be a list of specs);
    ;; any other element is passed to open-server.  Specs for which
    ;; open-server returns #f are dropped.  Because results are consed
    ;; onto the accumulator, the vector is in reverse order of the input.
    (define (open-servers servers)
      (list->vector
       (fold (lambda (spec acc)
               (if (file-exists? spec)
                   (let ((server-names (with-input-from-file spec read)))
                     (append (vector->list (open-servers server-names)) acc))
                   (let ((server (open-server spec)))
                     (if server (cons server acc) acc))))
             '()
             servers)))

    ;; Close every socket in the vector SERVERS.
    (define (close-servers servers)
      (for-each (lambda (s) (close-socket s))
                (vector->list servers)))

    ;; Queue of entries reported writable by select and not yet handed out.
    (define *ready-processors* '())

    ;; Return the next entry from *ready-processors*.  When the queue is
    ;; empty it is refilled from a select call (no timeout argument) over
    ;; the servers' output ports, then the lookup is retried.
    (define (get-processor servers)
      ; blocks until a processor becomes ready
      ;(format #t "getting server from: ~s~%" *ready-processors*)
      (if (null? *ready-processors*)
          (receive (r writers x)
              (select (vector) (vector-map socket:outport servers) (vector))
            (set! *ready-processors*
                  (append *ready-processors* (vector->list writers)))
            (get-processor servers))
          (let ((server (car *ready-processors*)))
            (set! *ready-processors* (cdr *ready-processors*))
            server)))

    ;; Queue of entries reported readable by select (or re-queued by
    ;; read-result) and not yet handed out.
    (define *ready-results* '())

    ;; Return the next entry from *ready-results*, or #f when none is
    ;; available.  When the queue is empty it is refilled from a select
    ;; call with a timeout of 0 over the servers' input ports.
    (define (get-result-stream servers)
      ; doesn't block. returns #f if no results available
      ;(format #t "getting result stream from: ~s~%" *ready-results*)
      (if (null? *ready-results*)
          (receive (readers w x)
              (select (vector-map socket:inport servers) (vector) (vector) 0)
            (set! *ready-results*
                  (append *ready-results* (vector->list readers)))
            (and (not (null? *ready-results*))
                 (get-result-stream servers)))
          (let ((server (car *ready-results*)))
            (set! *ready-results* (cdr *ready-results*))
            server)))

    ;; Read everything currently available from SERVER and return it as
    ;; one string.  SERVER is switched to open/non-blocking for the
    ;; duration and its previous fdes status is restored afterwards.
    ;; Reading stops on an empty chunk or on #f (end of file), so a peer
    ;; that has closed its connection no longer causes string-length to
    ;; be applied to #f.
    (define (drain server)
      (let ((status (fdes-status server)))
        (set-fdes-status server open/non-blocking)
        (let loop ((buffer (read-string/partial 512 server))
                   (accum ""))
          (if (and buffer (> (string-length buffer) 0))
              (loop (read-string/partial 512 server)
                    (string-append accum buffer))
              (begin
                (set-fdes-status server status)
                accum)))))

    ;; Association list mapping a server to the text that was received
    ;; from it but not yet consumed by read-string.
    (define *async-streams* '())

    ;; Trim whitespace from S using string-trim.
    (define (deblank s)
      (string-trim s char-set:whitespace))

    ;; Read one datum from the string S.  Returns two values: the datum
    ;; and the deblanked remainder of S.
    ;; NOTE(review): the remainder is computed by dropping
    ;; (current-column string-port) characters from S; this presumably
    ;; only equals the number of characters consumed when the datum sits
    ;; on a single line -- confirm against the extended-ports docs.
    ;; NOTE(review): this local definition has the same name as scsh's
    ;; read-string -- confirm the shadowing is intended.
    (define (read-string s)
      (let* ((string-port (make-string-input-port s))
             (datum (read string-port))
             (remaining (string-drop s (current-column string-port))))
        (values datum (deblank remaining))))

    ;; Read one datum from SERVER, combining any text buffered for it in
    ;; *async-streams* with whatever drain returns now.  The unconsumed
    ;; remainder is stored back in *async-streams*.  Returns two values:
    ;; the datum and the remainder string.
    (define (read-stream server)
      (let ((stream (assq server *async-streams*)))
        (if stream
            (let* ((buffer (cdr stream))
                   (new (deblank (string-append buffer (drain server)))))
              (receive (datum remaining) (read-string new)
                (set-cdr! stream remaining)
                (values datum remaining)))
            (receive (datum remaining) (read-string (deblank (drain server)))
              (set! *async-streams*
                    (cons (cons server remaining) *async-streams*))
              (values datum remaining)))))

    ;; Read and return one datum from SERVER.  If unconsumed text is left
    ;; over, SERVER is appended to *ready-results* so that a later
    ;; get-result-stream hands it out again.
    (define (read-result server)
      (receive (datum remaining) (read-stream server)
        (if (> (string-length remaining) 0)
            (set! *ready-results*
                  (append *ready-results* (list server))))
        datum))
    ))

;;; Scatter/Gather-Daemon: reads queries from the current input port,
;;; sends each to a ready server, and writes results to the current
;;; output port.
(define-structure Scatter/Gather-Daemon
  (export start)
  (open scsh scheme receiving list-lib string-lib Utility Servers)
  (begin

    ;; Write DATUM and a newline to the current output port and flush it.
    (define (push datum)
      (write datum)
      (newline)
      (force-output (current-output-port)))

    ;; Send QUERY to the next value returned by get-processor, by making
    ;; that value the current output port around a call to push.
    (define (push-query servers query)
      ;(format #t "Pushing ~s~%" query)
      (let ((server (get-processor servers)))
        (with-current-output-port server (push query))))

    ;; If get-result-stream yields a server, read one result from it and
    ;; push it to the current output port.  Returns the server, or #f
    ;; when get-result-stream returned #f.
    (define (push-result servers)
      ;(format #t "Pushing results~%")
      (let ((server (get-result-stream servers)))
        ;(format #t "Pushing results from ~s~%" server)
        (and server
             (let ((result (read-result server)))
               ;(format #t "got ~s, pushing it~%" result)
               (push result)
               server))))

    ;; Print the usage message.
    (define (usage)
      (for-each display `("usage: sgd ..." #\newline)))

    ;; Entry point.  ARGS is a list; its cdr is passed to open-servers.
    ;; Prints ARGS, then, if there are at least two elements, opens the
    ;; servers, alternates scattering queries read from the current input
    ;; port with gathering available results until EOF, and closes the
    ;; servers.  With fewer than two elements it prints the usage message.
    (define (start args)
      (for-each display `("args: " ,args #\newline))
      (if (> 2 (length args))
          (usage)
          (let* ((servers (open-servers (cdr args))))
            (letrec ((scatter
                      (lambda (args)
                        ;(format #t "scattering: ~s~%" args)
                        ;; NOTE(review): on EOF only one gather pass is
                        ;; made, and gathering stops as soon as
                        ;; push-result returns #f -- confirm that results
                        ;; still in flight may be dropped intentionally.
                        (if (eof-object? args)
                            (gather)
                            (begin
                              (push-query servers args)
                              (gather)
                              (scatter (read))))))
                     (gather
                      (lambda ()
                        ;(format #t "gathering...~%")
                        ;; Keep pushing results until push-result
                        ;; reports that none is available.
                        (let loop ((result (push-result servers)))
                          ;(format #t "gathered: ~s~%" result)
                          (and result
                               (loop (push-result servers)))))))
              (scatter (read)))
            (close-servers servers))))
    ))