1:- module(rs_rolog,
2 [
3 rs_init/1,
4 rs_call/2,
5 rs_eval/3,
6 rs_submit/3,
7 rs_close/1,
8 rx_call/1,
9 rx_eval/2,
10 rx_submit/2
11 ]). 12
13:- reexport(library(rolog)). 14:- use_module(library(broadcast)). 15:- use_module(library(http/http_session)). 16
17:- dynamic rs_session/1. 18
%!  rs_init(+Session)
%
%   Open an RSclient connection and register it under Session.
%
%   Loads the R package `RSclient`, evaluates `RS.connect()` through
%   rolog's `<-/2` with Session on the left-hand side, and records the
%   fact rs_session(Session) so that the rx_* predicates can find it.
%
%   NOTE(review): Session is presumably an atom naming the R variable
%   that holds the connection (the test uses `s1`, `s2`) - confirm
%   against rolog's definition of `<-/2`.
rs_init(Session) :-
    <- library('RSclient'),
    Session <- 'RS.connect'(),
    assertz(rs_session(Session)).
23
%!  rs_call(+Session, +Expr)
%
%   Run Expr synchronously on the R session Session, discarding the
%   result.
%
%   First waits (idle/1) until no asynchronous job submitted through
%   rs_submit/3 is pending on Session, then evaluates Expr via
%   `RS.eval(Session, Expr, wait=true)`.
rs_call(Session, Expr) :-
    idle(Session),
    Call = 'RS.eval'(Session, Expr, wait=true),
    <- Call.
27
%!  rs_eval(+Session, +Expr, -Result)
%
%   Evaluate Expr synchronously on the R session Session and unify
%   Result with the value handed back by rolog.
%
%   Like rs_call/2, this blocks in idle/1 while an asynchronous job is
%   still pending on Session before issuing
%   `RS.eval(Session, Expr, wait=true)`.
rs_eval(Session, Expr, Result) :-
    idle(Session),
    Call = 'RS.eval'(Session, Expr, wait=true),
    Result <- Call.
31
%!  rs_submit(+Session, +Alias, +Expr)
%
%   Start Expr asynchronously on the R session Session.
%
%   The predicate waits until Session is idle, marks it busy by
%   asserting lock(Session), issues `RS.eval(Session, Expr, wait=false)`
%   and then starts a background thread running rs_collect/3. That
%   thread releases the lock and broadcasts
%   rs_result(Session, Alias, Expr, Result) once the result is
%   available; Alias is only passed through so listeners can tell
%   submissions apart.
%
%   If the R call throws or fails, the lock is removed again before the
%   error/failure is propagated. Otherwise the session would stay
%   locked and every later idle/1 on it would block forever.
rs_submit(Session, Alias, Expr) :-
    idle(Session),
    assertz(lock(Session)),
    (   catch(<- 'RS.eval'(Session, Expr, wait=false),
              Error,
              (   ignore(retract(lock(Session))),
                  throw(Error)
              ))
    ->  true
    ;   ignore(retract(lock(Session))),
        fail
    ),
    % The thread id is never used for thread_join/2, so the collector
    % must be detached; a joinable thread that is never joined keeps
    % its resources after it has finished.
    thread_create(rs_collect(Session, Alias, Expr), _, [detached(true)]).
37
%!  rs_collect(+Session, +Alias, +Expr)
%
%   Poll Session for the result of a job started by rs_submit/3.
%
%   Each round calls `RS.collect(Session, timeout=0)`. The outcome is
%   dispatched by rs_collect_/4: an empty list means "nothing yet", so
%   the poller sleeps 0.1 seconds and tries again; anything else is
%   taken as the final result.
rs_collect(Session, Alias, Expr) :-
    Result <- 'RS.collect'(Session, timeout=0),
    rs_collect_(Result, Session, Alias, Expr).

%   rs_collect_(+Result, +Session, +Alias, +Expr)
%
%   Handle one polling outcome: retry on [], otherwise drop the
%   session lock and publish the result to broadcast listeners.
rs_collect_([], Session, Alias, Expr) :-
    !,
    sleep(0.1),
    rs_collect(Session, Alias, Expr).
rs_collect_(Result, Session, Alias, Expr) :-
    retract(lock(Session)),
    broadcast(rs_result(Session, Alias, Expr, Result)).
46
%!  rs_close(+Session)
%
%   Close the RSclient connection of Session and forget the session.
%
%   Calls `RS.close(Session)` in R and then removes every
%   rs_session(Session) fact recorded by rs_init/1, so the rx_*
%   predicates no longer route calls to the closed connection.
%
%   The fact to remove is rs_session(Session); retracting the bare
%   session id would target a clause whose head is the id itself and
%   leave the registration in place. retractall/1 is used so that
%   closing succeeds even if the session was registered more than once
%   or not at all.
rs_close(Session) :-
    <- 'RS.close'(Session),
    retractall(rs_session(Session)).
50
% lock(Session) is present while an asynchronous job submitted through
% rs_submit/3 is pending on Session.
:- dynamic lock/1.

%!  idle(+Session)
%
%   Succeed as soon as Session has no pending asynchronous job.
%
%   If there is no lock(Session) fact, this returns immediately.
%   Otherwise the calling thread blocks in thread_wait/2 until
%   `\+ lock(Session)` holds; the wait_preds option asks for the
%   condition to be re-checked when clauses are retracted from lock/1.
idle(Session) :-
    (   \+ lock(Session)
    ->  true
    ;   thread_wait(\+ lock(Session), [wait_preds([-(lock/1)])])
    ).
59
%!  rx_call(+Expr)
%
%   Run Expr in R, choosing the backend from the HTTP context.
%
%   When the current thread is handling a request inside an HTTP
%   session and that session id has been registered with rs_init/1,
%   Expr is sent to the session's RSclient connection via rs_call/2.
%   In every other case Expr is run by rolog's r_call/1.
rx_call(Expr) :-
    (   http_in_session(Session),
        rs_session(Session)
    ->  rs_call(Session, Expr)
    ;   r_call(Expr)
    ).
70
%!  rx_eval(+Expr, -Result)
%
%   Evaluate Expr in R and unify Result with its value, choosing the
%   backend from the HTTP context.
%
%   Inside an HTTP session whose id is registered as rs_session/1 the
%   evaluation goes through rs_eval/3 on that session; otherwise it
%   falls back to rolog's r_eval/2.
rx_eval(Expr, Result) :-
    (   http_in_session(Session),
        rs_session(Session)
    ->  rs_eval(Session, Expr, Result)
    ;   r_eval(Expr, Result)
    ).
79
%!  rx_submit(+Alias, +Expr)
%
%   Submit Expr for asynchronous evaluation on the RSclient connection
%   that belongs to the current HTTP session (see rs_submit/3).
%
%   There is no fallback backend for asynchronous work: if the caller
%   is not inside an HTTP session, or the session id is not registered
%   as rs_session/1, resource_error(http_session) is raised.
rx_submit(Alias, Expr) :-
    (   http_in_session(Session),
        rs_session(Session)
    ->  rs_submit(Session, Alias, Expr)
    ;   resource_error(http_session)
    ).
88
89test :-
90 rs_init(s1),
91 rs_init(s2),
92
93 rs_eval(s1, sum(abs(sin(1:10000000))), Res1),
94 format("s1-sync: Res1 = ~w~n", [Res1]),
95 rs_eval(s2, sum(abs(sin(1:10000000))), Res2),
96 format("s2-sync: Res2 = ~w~n", [Res2]),
97
98 listen(rs_result(S, A, E, R), format("Session ~w, Alias ~w, Expr ~w, Result ~w~n", [S, A, E, R])),
99
100 time(rs_submit(s1, a, sum(abs(sin(1:10000000))))),
101 time(rs_submit(s1, b, sum(abs(sin(1:1000000))))),
102 time(rs_submit(s2, c, sum(abs(sin(1:30000000))))),
103 time(rs_submit(s2, d, sum(abs(sin(1:3000000))))),
104
105 rs_eval(s1, sum(abs(sin(1:100))), Res3),
106 format("s1-sync: Res3 = ~w~n", [Res3]),
107 rs_eval(s2, sum(abs(sin(1:100))), Res4),
108 format("s2-sync: Res4 = ~w~n", [Res4]),
109
110 rs_close(s1),
111 rs_close(s2)