37
38:- module(thread,
39 [ concurrent/3, 40 concurrent_maplist/2, 41 concurrent_maplist/3, 42 concurrent_maplist/4, 43 concurrent_forall/2, 44 concurrent_forall/3, 45 concurrent_and/2, 46 concurrent_and/3, 47 first_solution/3, 48
49 call_in_thread/2, 50 call_in_thread/3 51 ]). 52:- autoload(library(apply), [maplist/2, maplist/3, maplist/4, maplist/5]). 53:- autoload(library(error), [must_be/2, instantiation_error/1]). 54:- autoload(library(lists), [subtract/3, same_length/2, nth0/3]). 55:- autoload(library(option), [option/2, option/3, meta_options/3]). 56:- autoload(library(ordsets), [ord_intersection/3, ord_union/3]). 57:- use_module(library(debug), [debug/3, assertion/1]). 58
60
61:- meta_predicate
62 concurrent(+, :, +),
63 concurrent_maplist(1, +),
64 concurrent_maplist(2, ?, ?),
65 concurrent_maplist(3, ?, ?, ?),
66 concurrent_forall(0, 0),
67 concurrent_forall(0, 0, +),
68 concurrent_and(0, 0),
69 concurrent_and(0, 0, +),
70 first_solution(-, :, +),
71 call_in_thread(+, 0),
72 call_in_thread(+, 0, :). 73
74
75:- predicate_options(concurrent/3, 3,
76 [ pass_to(system:thread_create/3, 3)
77 ]). 78:- predicate_options(concurrent_forall/3, 3,
79 [ threads(nonneg)
80 ]). 81:- predicate_options(concurrent_and/3, 3,
82 [ threads(nonneg)
83 ]). 84:- predicate_options(first_solution/3, 3,
85 [ on_fail(oneof([stop,continue])),
86 on_error(oneof([stop,continue])),
87 pass_to(system:thread_create/3, 3)
88 ]). 89
122
166
167concurrent(1, M:List, _) :-
168 !,
169 maplist(once_in_module(M), List).
170concurrent(N, M:List, Options) :-
171 must_be(positive_integer, N),
172 must_be(list(callable), List),
173 length(List, JobCount),
174 message_queue_create(Done),
175 message_queue_create(Queue),
176 WorkerCount is min(N, JobCount),
177 create_workers(WorkerCount, Queue, Done, Workers, Options),
178 submit_goals(List, 1, M, Queue, VarList),
179 forall(between(1, WorkerCount, _),
180 thread_send_message(Queue, done)),
181 VT =.. [vars|VarList],
182 concur_wait(JobCount, Done, VT, cleanup(Workers, Queue),
183 Result, [], Exitted),
184 subtract(Workers, Exitted, RemainingWorkers),
185 concur_cleanup(Result, RemainingWorkers, [Queue, Done]),
186 ( Result == true
187 -> true
188 ; Result = false
189 -> fail
190 ; Result = exception(Error)
191 -> throw(Error)
192 ).
193
194once_in_module(M, Goal) :-
195 call(M:Goal), !.
196
202
203submit_goals([], _, _, _, []).
204submit_goals([H|T], I, M, Queue, [Vars|VT]) :-
205 term_variables(H, Vars),
206 thread_send_message(Queue, goal(I, M:H, Vars)),
207 I2 is I + 1,
208 submit_goals(T, I2, M, Queue, VT).
209
210
218
219concur_wait(0, _, _, _, true, Exited, Exited) :- !.
220concur_wait(N, Done, VT, Cleanup, Status, Exitted0, Exitted) :-
221 debug(concurrent, 'Concurrent: waiting for workers ...', []),
222 catch(thread_get_message(Done, Exit), Error,
223 concur_abort(Error, Cleanup, Done, Exitted0)),
224 debug(concurrent, 'Waiting: received ~p', [Exit]),
225 ( Exit = done(Id, Vars)
226 -> debug(concurrent, 'Concurrent: Job ~p completed with ~p', [Id, Vars]),
227 arg(Id, VT, Vars),
228 N2 is N - 1,
229 concur_wait(N2, Done, VT, Cleanup, Status, Exitted0, Exitted)
230 ; Exit = finished(Thread)
231 -> thread_join(Thread, JoinStatus),
232 debug(concurrent, 'Concurrent: waiter ~p joined: ~p',
233 [Thread, JoinStatus]),
234 ( JoinStatus == true
235 -> concur_wait(N, Done, VT, Cleanup, Status, [Thread|Exitted0], Exitted)
236 ; Status = JoinStatus,
237 Exitted = [Thread|Exitted0]
238 )
239 ).
240
241concur_abort(Error, cleanup(Workers, Queue), Done, Exitted) :-
242 debug(concurrent, 'Concurrent: got ~p', [Error]),
243 subtract(Workers, Exitted, RemainingWorkers),
244 concur_cleanup(Error, RemainingWorkers, [Queue, Done]),
245 throw(Error).
246
247create_workers(N, Queue, Done, [Id|Ids], Options) :-
248 N > 0,
249 !,
250 thread_create(worker(Queue, Done), Id,
251 [ at_exit(thread_send_message(Done, finished(Id)))
252 | Options
253 ]),
254 N2 is N - 1,
255 create_workers(N2, Queue, Done, Ids, Options).
256create_workers(_, _, _, [], _).
257
258
262
263worker(Queue, Done) :-
264 thread_get_message(Queue, Message),
265 debug(concurrent, 'Worker: received ~p', [Message]),
266 ( Message = goal(Id, Goal, Vars)
267 -> ( Goal
268 -> thread_send_message(Done, done(Id, Vars)),
269 worker(Queue, Done)
270 )
271 ; true
272 ).
273
274
281
282concur_cleanup(Result, Workers, Queues) :-
283 !,
284 ( Result == true
285 -> true
286 ; kill_workers(Workers)
287 ),
288 join_all(Workers),
289 maplist(message_queue_destroy, Queues).
290
291kill_workers([]).
292kill_workers([Id|T]) :-
293 debug(concurrent, 'Signalling ~w', [Id]),
294 catch(thread_signal(Id, abort), _, true),
295 kill_workers(T).
296
297join_all([]).
298join_all([Id|T]) :-
299 thread_join(Id, _),
300 join_all(T).
301
302
303 306
325
326:- dynamic
327 fa_aborted/1. 328
329concurrent_forall(Generate, Test) :-
330 concurrent_forall(Generate, Test, []).
331
332concurrent_forall(Generate, Test, Options) :-
333 jobs(Jobs, Options),
334 Jobs > 1,
335 !,
336 term_variables(Generate, GVars),
337 term_variables(Test, TVars),
338 sort(GVars, GVarsS),
339 sort(TVars, TVarsS),
340 ord_intersection(GVarsS, TVarsS, Shared),
341 Templ =.. [v|Shared],
342 MaxSize is Jobs*4,
343 message_queue_create(Q, [max_size(MaxSize)]),
344 length(Workers, Jobs),
345 thread_self(Me),
346 maplist(thread_create(fa_worker(Q, Me, Templ, Test)), Workers),
347 catch(( forall(Generate,
348 thread_send_message(Q, job(Templ))),
349 forall(between(1, Jobs, _),
350 thread_send_message(Q, done)),
351 maplist(thread_join, Workers),
352 message_queue_destroy(Q)
353 ),
354 Error,
355 fa_cleanup(Error, Workers, Q)).
356concurrent_forall(Generate, Test, _) :-
357 forall(Generate, Test).
358
359fa_cleanup(Error, Workers, Q) :-
360 maplist(safe_abort, Workers),
361 debug(concurrent(fail), 'Joining workers', []),
362 maplist(safe_join, Workers),
363 debug(concurrent(fail), 'Destroying queue', []),
364 retractall(fa_aborted(Q)),
365 message_queue_destroy(Q),
366 ( Error = fa_worker_failed(_0Test, Why)
367 -> debug(concurrent(fail), 'Test ~p failed: ~p', [_0Test, Why]),
368 ( Why == false
369 -> fail
370 ; Why = error(E)
371 -> throw(E)
372 ; assertion(fail)
373 )
374 ; throw(Error)
375 ).
376
377fa_worker(Queue, Main, Templ, Test) :-
378 repeat,
379 thread_get_message(Queue, Msg),
380 ( Msg == done
381 -> !
382 ; Msg = job(Templ),
383 debug(concurrent, 'Running test ~p', [Test]),
384 ( catch_with_backtrace(Test, E, true)
385 -> ( var(E)
386 -> fail
387 ; fa_stop(Queue, Main, fa_worker_failed(Test, error(E)))
388 )
389 ; !,
390 fa_stop(Queue, Main, fa_worker_failed(Test, false))
391 )
392 ).
393
394fa_stop(Queue, Main, Why) :-
395 with_mutex('$concurrent_forall',
396 fa_stop_sync(Queue, Main, Why)).
397
398fa_stop_sync(Queue, _Main, _Why) :-
399 fa_aborted(Queue),
400 !.
401fa_stop_sync(Queue, Main, Why) :-
402 asserta(fa_aborted(Queue)),
403 debug(concurrent(fail), 'Stop due to ~p. Signalling ~q', [Why, Main]),
404 thread_signal(Main, throw(Why)).
405
406jobs(Jobs, Options) :-
407 ( option(threads(Jobs), Options)
408 -> true
409 ; current_prolog_flag(cpu_count, Jobs)
410 -> true
411 ; Jobs = 1
412 ).
413
414safe_abort(Thread) :-
415 catch(thread_signal(Thread, abort), error(_,_), true).
416safe_join(Thread) :-
417 E = error(_,_),
418 catch(thread_join(Thread, _Status), E, true).
419
420
421 424
451
452concurrent_and(Gen, Test) :-
453 concurrent_and(Gen, Test, []).
454
455concurrent_and(Gen, Test, Options) :-
456 jobs(Jobs, Options),
457 MaxSize is Jobs*4,
458 message_queue_create(JobQueue, [max_size(MaxSize)]),
459 message_queue_create(AnswerQueue, [max_size(MaxSize)]),
460 ca_template(Gen, Test, Templ),
461 term_variables(Gen+Test, AllVars),
462 ReplyTempl =.. [v|AllVars],
463 length(Workers, Jobs),
464 Alive is 1<<Jobs-1,
465 maplist(thread_create(ca_worker(JobQueue, AnswerQueue,
466 Templ, Test, ReplyTempl)),
467 Workers),
468 thread_create(ca_generator(Gen, Templ, JobQueue, AnswerQueue),
469 GenThread),
470 State = state(Alive),
471 call_cleanup(
472 ca_gather(State, AnswerQueue, ReplyTempl, Workers),
473 ca_cleanup(GenThread, Workers, JobQueue, AnswerQueue)).
474
475ca_gather(State, AnswerQueue, ReplyTempl, Workers) :-
476 repeat,
477 thread_get_message(AnswerQueue, Msg),
478 ( Msg = true(ReplyTempl)
479 -> true
480 ; Msg = done(Worker)
481 -> nth0(Done, Workers, Worker),
482 arg(1, State, Alive0),
483 Alive1 is Alive0 /\ \(1<<Done),
484 debug(concurrent(and), 'Alive = ~2r', [Alive1]),
485 ( Alive1 =:= 0
486 -> !,
487 fail
488 ; nb_setarg(1, State, Alive1),
489 fail
490 )
491 ; Msg = error(E)
492 -> throw(E)
493 ).
494
495ca_template(Gen, Test, Templ) :-
496 term_variables(Gen, GVars),
497 term_variables(Test, TVars),
498 sort(GVars, GVarsS),
499 sort(TVars, TVarsS),
500 ord_intersection(GVarsS, TVarsS, Shared),
501 ord_union(GVarsS, Shared, TemplVars),
502 Templ =.. [v|TemplVars].
503
504ca_worker(JobQueue, AnswerQueue, Templ, Test, ReplyTempl) :-
505 thread_self(Me),
506 EG = error(existence_error(message_queue, _), _),
507 repeat,
508 catch(thread_get_message(JobQueue, Req), EG, Req=all_done),
509 ( Req = job(Templ)
510 -> ( catch(Test, E, true),
511 ( var(E)
512 -> thread_send_message(AnswerQueue, true(ReplyTempl))
513 ; thread_send_message(AnswerQueue, error(E))
514 ),
515 fail
516 )
517 ; Req == done
518 -> !,
519 message_queue_destroy(JobQueue),
520 thread_send_message(AnswerQueue, done(Me))
521 ; assertion(Req == all_done)
522 -> !,
523 thread_send_message(AnswerQueue, done(Me))
524 ).
525
526ca_generator(Gen, Templ, JobQueue, AnswerQueue) :-
527 ( catch(Gen, E, true),
528 ( var(E)
529 -> thread_send_message(JobQueue, job(Templ))
530 ; thread_send_message(AnswerQueue, error(E))
531 ),
532 fail
533 ; thread_send_message(JobQueue, done)
534 ).
535
536ca_cleanup(GenThread, Workers, JobQueue, AnswerQueue) :-
537 safe_abort(GenThread),
538 safe_join(GenThread),
539 maplist(safe_abort, Workers),
540 maplist(safe_join, Workers),
541 message_queue_destroy(AnswerQueue),
542 catch(message_queue_destroy(JobQueue), error(_,_), true).
543
544
545 548
565
566concurrent_maplist(Goal, List) :-
567 workers(List, WorkerCount),
568 !,
569 maplist(ml_goal(Goal), List, Goals),
570 concurrent(WorkerCount, Goals, []).
571concurrent_maplist(M:Goal, List) :-
572 maplist(once_in_module(M, Goal), List).
573
574once_in_module(M, Goal, Arg) :-
575 call(M:Goal, Arg), !.
576
577ml_goal(Goal, Elem, call(Goal, Elem)).
578
579concurrent_maplist(Goal, List1, List2) :-
580 same_length(List1, List2),
581 workers(List1, WorkerCount),
582 !,
583 maplist(ml_goal(Goal), List1, List2, Goals),
584 concurrent(WorkerCount, Goals, []).
585concurrent_maplist(M:Goal, List1, List2) :-
586 maplist(once_in_module(M, Goal), List1, List2).
587
588once_in_module(M, Goal, Arg1, Arg2) :-
589 call(M:Goal, Arg1, Arg2), !.
590
591ml_goal(Goal, Elem1, Elem2, call(Goal, Elem1, Elem2)).
592
593concurrent_maplist(Goal, List1, List2, List3) :-
594 same_length(List1, List2, List3),
595 workers(List1, WorkerCount),
596 !,
597 maplist(ml_goal(Goal), List1, List2, List3, Goals),
598 concurrent(WorkerCount, Goals, []).
599concurrent_maplist(M:Goal, List1, List2, List3) :-
600 maplist(once_in_module(M, Goal), List1, List2, List3).
601
602once_in_module(M, Goal, Arg1, Arg2, Arg3) :-
603 call(M:Goal, Arg1, Arg2, Arg3), !.
604
605ml_goal(Goal, Elem1, Elem2, Elem3, call(Goal, Elem1, Elem2, Elem3)).
606
607workers(List, Count) :-
608 current_prolog_flag(cpu_count, Cores),
609 Cores > 1,
610 length(List, Len),
611 Count is min(Cores,Len),
612 Count > 1,
613 !.
614
615same_length([], [], []).
616same_length([_|T1], [_|T2], [_|T3]) :-
617 same_length(T1, T2, T3).
618
619
620 623
660
661
662first_solution(X, M:List, Options) :-
663 message_queue_create(Done),
664 thread_options(Options, ThreadOptions, RestOptions),
665 length(List, JobCount),
666 create_solvers(List, M, X, Done, Solvers, ThreadOptions),
667 wait_for_one(JobCount, Done, Result, RestOptions),
668 concur_cleanup(kill, Solvers, [Done]),
669 ( Result = done(_, Var)
670 -> X = Var
671 ; Result = error(_, Error)
672 -> throw(Error)
673 ).
674
675create_solvers([], _, _, _, [], _).
676create_solvers([H|T], M, X, Done, [Id|IDs], Options) :-
677 thread_create(solve(M:H, X, Done), Id, Options),
678 create_solvers(T, M, X, Done, IDs, Options).
679
680solve(Goal, Var, Queue) :-
681 thread_self(Me),
682 ( catch(Goal, E, true)
683 -> ( var(E)
684 -> thread_send_message(Queue, done(Me, Var))
685 ; thread_send_message(Queue, error(Me, E))
686 )
687 ; thread_send_message(Queue, failed(Me))
688 ).
689
690wait_for_one(0, _, failed, _) :- !.
691wait_for_one(JobCount, Queue, Result, Options) :-
692 thread_get_message(Queue, Msg),
693 LeftCount is JobCount - 1,
694 ( Msg = done(_, _)
695 -> Result = Msg
696 ; Msg = failed(_)
697 -> ( option(on_fail(stop), Options, stop)
698 -> Result = Msg
699 ; wait_for_one(LeftCount, Queue, Result, Options)
700 )
701 ; Msg = error(_, _)
702 -> ( option(on_error(stop), Options, stop)
703 -> Result = Msg
704 ; wait_for_one(LeftCount, Queue, Result, Options)
705 )
706 ).
707
708
713
714thread_options([], [], []).
715thread_options([H|T], [H|Th], O) :-
716 thread_option(H),
717 !,
718 thread_options(T, Th, O).
719thread_options([H|T], Th, [H|O]) :-
720 thread_options(T, Th, O).
721
722thread_option(local(_)).
723thread_option(global(_)).
724thread_option(trail(_)).
725thread_option(argument(_)).
726thread_option(stack(_)).
727
728
749
750call_in_thread(Thread, Goal) :-
751 call_in_thread(Thread, Goal, []).
752
753call_in_thread(Thread, Goal, _) :-
754 must_be(callable, Goal),
755 var(Thread),
756 !,
757 instantiation_error(Thread).
758call_in_thread(Thread, Goal, _) :-
759 thread_self(Thread),
760 !,
761 once(Goal).
762call_in_thread(Thread, Goal, Options) :-
763 meta_options(is_meta, Options, Options1),
764 term_variables(Goal, Vars),
765 thread_self(Me),
766 A is random(1 000 000 000),
767 thread_signal(Thread, run_in_thread(Goal,Vars,A,Me)),
768 ( catch(thread_get_message(Me, in_thread(A,Result), Options1),
769 Error,
770 forward_exception(Thread, A, Error))
771 -> ( Result = true(Vars)
772 -> true
773 ; Result = error(Error)
774 -> throw(Error)
775 ; fail
776 )
777 ; thread_signal(Thread, kill_task(A, stop(time_limit_exceeded))),
778 option(on_timeout(Action), Options1, throw(time_limit_exceeded)),
779 call(Action)
780 ).
781
782is_meta(on_timeout).
783
784run_in_thread(Goal, Vars, Id, Sender) :-
785 ( catch_with_backtrace(call(Goal), Error, true)
786 -> ( var(Error)
787 -> thread_send_message(Sender, in_thread(Id, true(Vars)))
788 ; Error = stop(_)
789 -> true
790 ; thread_send_message(Sender, in_thread(Id, error(Error)))
791 )
792 ; thread_send_message(Sender, in_thread(Id, false))
793 ).
794
795forward_exception(Thread, Id, Error) :-
796 kill_with(Error, Kill),
797 thread_signal(Thread, kill_task(Id, Kill)),
798 throw(Error).
799
800kill_with(time_limit_exceeded, stop(time_limit_exceeded)) :-
801 !.
802kill_with(_, stop(interrupt)).
803
804kill_task(Id, Exception) :-
805 prolog_current_frame(Frame),
806 prolog_frame_attribute(Frame, parent_goal,
807 run_in_thread(_Goal, _Vars, Id, _Sender)),
808 !,
809 throw(Exception).
810kill_task(_, _)