:- module(spawn, [ async/2
                 , async/3
                 , await/1
                 , lazy/1
                 , spawn/1
                 , spawn/2
                 ]).
:- use_module(library(predicate_options)).
:- use_module(library(record)).

% convenience for async/3 options
% (library(record) derives make_opts/2 and opts_policy/2 from this)
:- record opts( policy:oneof([ephemeral,lazy])=ephemeral
              ).

:- predicate_options(spawn/2,2,[pass_to(async/3,3)]).
:- predicate_options(async/3,3,
                     [ policy(+oneof([ephemeral,lazy]))
                     ]).

% Every predicate that receives a goal is declared as a meta-predicate
% so the goal is qualified with the *caller's* module before it is
% stored or handed on. This includes spawn/2 and lazy/1: without a
% declaration a goal passed to them from another module would be
% resolved inside module `spawn`.
:- meta_predicate
    spawn(0),
    spawn(0,+),
    lazy(0),
    async(0,-),
    async(0,-,+),
    async_policy(+,0,-,+).

% One fact per spawn/2 call whose result has not been awaited yet.
% Thread-local: each thread only sees the tokens it created itself.
:- thread_local
    spawn_token_needs_await/1.
%!  spawn(:Goal) is det.
%
%   Convenience form of spawn/2: same behaviour, called with an empty
%   option list.
spawn(Goal) :- spawn(Goal, []).
%!  spawn(:Goal, +Options) is det.
%
%   Start seeking solutions to Goal via async/3 (Options are passed on
%   to it) and freeze every free variable of Goal so that the result is
%   awaited the first time one of those variables is unified. If Goal
%   has no free variables nothing is frozen, so the result is never
%   awaited by this predicate.
%
%   For example, the following code runs in about 1 second because both
%   sleep/1 calls happen in parallel. When foo/0 unifies L, it blocks
%   until silly/1 has finished.
%
%       silly(L) :-
%           sleep(1),
%           L = [a,b].
%       foo :-
%           spawn(silly(L)),
%           sleep(1),
%           L=[A,B],  % blocks, if necessary
%           writeln(A-B).
%
%   If Goal produces multiple solutions, they're iterated when
%   backtracking over the unification (=|L=[A,B]|= above). If Goal fails
%   or throws an exception, the calling thread sees it at the unification
%   point.
spawn(Goal,Options) :-
    term_variables(Goal, Vars),
    async(Goal, Token, Options),
    % Id ties all frozen variables of this call to a single "still
    % needs await" fact, so only the first thaw performs the await.
    Id is random(1<<63),
    assertz(spawn_token_needs_await(Id)),
    make_opts(Options,Opts),
    maplist(spawn_freeze(Id,Token,Opts), Vars).

%   spawn_freeze(+Id, +Token, +Opts, ?Var) is det.
%
%   Delay spawn_thaw/3 until Var gets bound.
spawn_freeze(Id,Token,Opts,Var) :-
    freeze(Var,spawn_thaw(Id,Token,Opts)).

%   spawn_thaw(+Id, +Token, +Opts) is nondet.
%
%   Runs when a frozen variable is bound. The first thaw for Id removes
%   the pending fact and awaits Token. Later thaws await again only
%   under the lazy policy; otherwise they succeed without doing anything.
spawn_thaw(Id,Token,Opts) :-
    (   retract(spawn_token_needs_await(Id)) ->
        debug(spawn,"Await on ~d",[Id]),
        await(Token)
    ;   opts_policy(Opts,lazy) ->
        % NOTE(review): the retract above is not undone on backtracking,
        % which is presumably why a lazy goal is run again here - confirm.
        debug(spawn,"Awaiting again on ~d",[Id]),
        await(Token)
    ;   % already called await/1 ->
        debug(spawn,"Already did await on ~d",[Id]),
        true
    ).
%!  lazy(:Goal) is det.
%
%   Postpone running Goal until one of its variables is unified. This
%   is spawn/2 called with the =lazy= thread policy.
%
%   lazy/1 can be helpful when complicated or expensive goals are only
%   needed in some code paths but duplicating those goals is too
%   verbose. It can be an alternative to creating a new, named
%   predicate. For example,
%
%       foo(Xs) :-
%           lazy(i_am_slow(a,B,[c(C),d(d),e(etc)])),  % complicated
%           (   day_of_week(tuesday) ->
%               append(B,C,Xs)
%           ;   phase_of_moon(full) ->
%               append(C,B,Xs)
%           ;   true ->
%               % i_am_slow/3 not executed in this code path
%               Xs = [hi]
%           ).
lazy(Goal) :-
    LazyOptions = [policy(lazy)],
    spawn(Goal, LazyOptions).
%!  async(:Goal, -Token) is det.
%
%   Same as async/3 called with an empty option list.
async(Goal,Token) :-
    NoOptions = [],
    async(Goal, Token, NoOptions).
%!  async(:Goal, -Token, +Options) is det.
%
%   Start work on Goal according to the selected policy and unify Token
%   with the term await/1 needs to collect the results.
%
%   Options are as follows:
%
%       * policy(Policy)
%       If =ephemeral= (default), create a new thread in which to call
%       goal. If =lazy=, only execute Goal when await/1 is called; no
%       background threads are used.
async(Goal,Token,Options) :-
    make_opts(Options,Opts),
    opts_policy(Opts, Policy),
    async_policy(Policy, Goal, Token, Opts).


%   async_policy(+Policy, :Goal, -Token, +Opts) is det.
%
%   Policy-specific part of async/3.
async_policy(ephemeral, Goal, Token, _Opts) :-
    % what does the caller need to track this computation?
    term_variables(Goal, Vars),
    % max_size(1): the worker blocks in thread_send_message/2 until the
    % previous solution has been taken out of the queue.
    message_queue_create(SolutionsQ, [max_size(1)]),
    Token = ephemeral_token(Vars,SolutionsQ),

    % start the worker thread
    Work = work(Goal,Vars,SolutionsQ),
    thread_create(ephemeral_worker(Work), _, [detached(true)]).
async_policy(lazy,Goal,Token,_Opts) :-
    % nothing runs now; await/1 calls Goal itself
    Token = lazy_thunk(Goal).


%   ephemeral_worker(+Work) is det.
%
%   Body of the worker thread. Work is work(Goal,Vars,SolutionsQ).
%   Posts one message per outcome on SolutionsQ:
%
%       * solution(Vars)  a solution that left a choice point
%       * final(Vars)     a solution after which Goal is finished
%       * exception(E)    Goal raised E
%       * none            Goal has no (further) solutions
ephemeral_worker(work(Goal,Vars,SolutionsQ)) :-
    debug(spawn,"Seeking solutions to: ~q", [Goal]),
    (   % Done is bound by the cleanup handler once Goal is finished
        catch(call_cleanup(Goal,Done=true),E,true) *->
        (   nonvar(E) ->
            debug(spawn,"Caught exception: ~q", [E]),
            thread_send_message(SolutionsQ,exception(E))
        ;   var(Done) ->
            debug(spawn,"Sending solution: ~q", [Vars]),
            thread_send_message(SolutionsQ,solution(Vars)),
            fail  % look for another solution
        ;   Done=true ->
            debug(spawn,"Final solution: ~q", [Vars]),
            thread_send_message(SolutionsQ,final(Vars))
        )
    ;   % no solutions ->
        debug(spawn, "Found no solutions", []),
        thread_send_message(SolutionsQ,none)
    ),
    !.
ephemeral_worker(work(_Goal,_Vars,SolutionsQ)) :-
    % Reached when Goal left a choice point after its last solution and
    % then failed on retry: every solution was posted as solution(_) and
    % the soft-cut above failed as a whole. Without this message the
    % consumer would wait on the queue forever when it backtracks.
    debug(spawn, "No more solutions", []),
    thread_send_message(SolutionsQ,none).
%!  await(+Token) is nondet.
%
%   Collect the results of the computation identified by Token, as
%   produced by async/3.
%
%   await/1 strives to have the same determinism as the original Goal
%   passed to async/3. If that goal fails, await/1 fails. If that goal
%   throws an exception, so does await/1. If that goal produces many
%   solutions, so does await/1 on backtracking.
%
%   @throws unexpected_await_solution(Message) if the queue delivers a
%           message that is none of solution/1, final/1, none or
%           exception/1.
await(ephemeral_token(Vars,SolutionsQ)) :-
    repeat,
    thread_get_message(SolutionsQ,Solution),
    % The kind of message is matched first and the solution is unified
    % with Vars afterwards. If the caller already bound some of Vars, a
    % solution that does not match must make the unification fail (and,
    % for solution/1, move on to the next message), not be reported as
    % an unexpected message.
    (   Solution = solution(Found) ->
        Vars = Found
    ;   Solution = final(Found) ->
        !,
        Vars = Found
    ;   Solution = none ->
        !,
        fail
    ;   Solution = exception(E) ->
        throw(E)
    ;   % what? ->
        throw(unexpected_await_solution(Solution))
    ).
await(lazy_thunk(Goal)) :-
    % lazy policy: the goal runs here, in the awaiting thread
    call(Goal).