34
35:- module(json_rpc_client,
36 [ json_call/4, 37 json_notify/3, 38 json_batch/5, 39 json_full_duplex/2 40 ]). 41:- use_module(library(json_rpc_common)). 42:- autoload(library(json), [json_read_dict/3]). 43:- autoload(library(option), [option/2, meta_options/3]). 44:- use_module(library(debug), [debug/3]). 45:- autoload(library(apply), [maplist/4, maplist/3]). 46:- autoload(library(lists), [append/3, member/2]). 47:- autoload(library(terms), [mapsubterms/3]). 48:- autoload(library(http/http_stream), [stream_range_open/3]). 49:- autoload(library(error), [permission_error/3]). 50
% The option list of both predicates is module sensitive: json_call/4
% may carry an async(:Goal) callback and json_full_duplex/2 uses the
% context module as server module.
:- meta_predicate json_call(+, +, -, :).
:- meta_predicate json_full_duplex(+, :).

% json_result_queue(Stream, Queue)
%     Message queue on which the dispatcher thread for Stream posts
%     done(Id, Data) terms.
% pending(Id, Stream, Action)
%     Request Id sent over Stream is awaiting its reply.  Action is
%     either `reply` (forward to the queue) or call(Goal) (async
%     callback).
:- dynamic json_result_queue/2.
:- dynamic pending/3.

109
%!  json_call(+Stream, +Goal, -Result, :Options)
%
%   Send Goal as a JSON-RPC 2.0 request over Stream.  The functor name
%   of Goal is the method; its arguments are turned into `params` by
%   call_args/2 and `params` is omitted when there are none.
%
%   Options processed here:
%
%     - id(Id)
%       Use Id rather than a generated request id (see client_id/2).
%     - async(:Goal)
%       Do not wait.  Goal is registered as callback for the reply,
%       unless it is `true`, in which case nothing is registered.
%
%   Without async(_), wait for the reply using json_wait_reply/4 and
%   unify Result with it.
%
%   The pending/3 registration is made _before_ sending such that a
%   fast reply always finds it.  If sending raises or fails, the
%   registration is removed again: no reply can arrive for a request
%   that was never sent, so the fact would otherwise stay behind
%   forever.

json_call(Stream, Goal, Result, Options0) :-
    meta_options(is_meta, Options0, Options),
    Goal =.. [Name|Args0],
    call_args(Args0, Args),
    client_id(Id, Options),
    debug(json_rpc, 'Sending request ~p', [Id]),
    (   option(async(AsyncGoal), Options)
    ->  Async = true,
        (   AsyncGoal = _:true
        ->  Pending = []
        ;   Pending = [pending(Id, Stream, call(AsyncGoal))]
        )
    ;   Pending = [pending(Id, Stream, reply)]
    ),
    forall(member(New, Pending), asserta(New)),
    Request0 = #{ jsonrpc: "2.0",
                  id: Id,
                  method: Name
                },
    (   Args == []
    ->  Request = Request0
    ;   put_dict(params, Request0, Args, Request)
    ),
    (   catch(json_rpc_send(Stream, Request, Options),
              Error,
              (   forall(member(Old, Pending), retractall(Old)),
                  throw(Error)
              ))
    ->  true
    ;   forall(member(Old, Pending), retractall(Old)),
        fail
    ),
    (   Async == true
    ->  true
    ;   json_wait_reply(Stream, Id, Result, Options)
    ).

%   Tell meta_options/3 that the async(Goal) option holds a goal that
%   must be module qualified.
is_meta(async).

143
%!  call_args(+GoalArgs, -Params) is det.
%
%   Translate the argument list of a goal into the JSON-RPC `params`
%   value.  A single dict or a single list argument is used as-is;
%   anything else (zero, several, or one argument of another type) is
%   passed on as the list of arguments.

call_args([Single], Params),
    (   is_dict(Single)
    ;   is_list(Single)
    ) =>
    Params = Single.
call_args(GoalArgs, Params) =>
    Params = GoalArgs.

150
%!  json_wait_reply(+Stream, +Id, -Result, +Options) is semidet.
%
%   Wait for the reply to request Id on the result queue of Stream.
%   The queue and its dispatcher thread are created on demand, under
%   the `json_rpc_client` mutex.  Options is passed to
%   thread_get_message/3, so a timeout in Options makes this predicate
%   fail if no reply arrives in time.
%
%   On timeout the pending/3 registration for Id is removed.  Without
%   this, a reply that arrives late would still be forwarded to the
%   queue where nobody will ever pick it up.
%
%   @throws the error carried by the reply if the dispatcher posted
%           throw(Error) for Id.

json_wait_reply(Stream, Id, Result, Options) :-
    with_mutex(json_rpc_client,
               get_json_result_queue(Stream, Queue, Options)),
    debug(json_rpc, 'Waiting for reply', []),
    (   thread_get_message(Queue, done(Id, Result0), Options)
    ->  map_reply(Result0, Result1, Options),
        debug(json_rpc, 'Got reply for ~p', [Id]),
        (   Result1 = throw(Error)
        ->  throw(Error)
        ;   Result1 = true(Result)
        )
    ;   retractall(pending(Id, Stream, reply)),
        fail
    ).

164
%!  map_reply(+Reply0, -Reply, +Options) is det.
%
%   If Options contains value_string_as(atom), replace every string
%   inside Reply0 by the corresponding atom.  Otherwise Reply is
%   Reply0.

map_reply(Reply0, Reply, Options) :-
    (   option(value_string_as(atom), Options)
    ->  mapsubterms(map_string, Reply0, Reply)
    ;   Reply = Reply0
    ).

%   mapsubterms/3 callback: succeeds for strings only, yielding the
%   atom with the same text.
map_string(Text, Atom) :-
    string(Text),
    atom_string(Atom, Text).

174
%!  client_id(-Id, +Options) is det.
%
%   Id is the request identifier: the value of the id(Id) option if
%   present, else the current value of the global flag
%   `json_client_id`, which is incremented as a side effect.

client_id(Id, Options) :-
    (   option(id(Id), Options)
    ->  true
    ;   flag(json_client_id, Id, Id+1)
    ).

180
185
%!  json_notify(+Stream, +Goal, +Options) is det.
%
%   Send Goal as a JSON-RPC 2.0 notification over Stream, i.e., a
%   request without an `id`.  The functor name of Goal is the method
%   and its arguments are mapped to `params` by call_args/2; `params`
%   is left out when there are no arguments.

json_notify(Stream, Goal, Options) :-
    Goal =.. [Method|GoalArgs],
    call_args(GoalArgs, Params),
    Notification0 = #{ jsonrpc: "2.0",
                       method: Method
                     },
    (   Params == []
    ->  Notification = Notification0
    ;   put_dict(params, Notification0, Params, Notification)
    ),
    json_rpc_send(Stream, Notification, Options).

200
215
%!  json_batch(+Stream, +Notifications:list, +Calls:list,
%!             -Results:list, +Options)
%
%   Send Notifications and Calls to the peer as a single JSON-RPC
%   batch.  If there is at least one call, wait for the batch reply
%   and unify Results with one element per reply, ordered by request
%   id.  Each element is the `result` value or error(Error) (see
%   batch_result/2).
%
%   Differences with a naive implementation:
%
%     - Results is `[]` when Calls is empty, rather than unbound.
%     - Nothing is sent if both Notifications and Calls are empty,
%       because JSON-RPC 2.0 defines an empty batch array as an
%       invalid request.
%     - The replies are sorted using the standard order `@<`, which
%       is what sort/4 documents.  Request ids are generated in
%       increasing order (see client_id/2).

json_batch(Stream, Notifications, Calls, Results, Options) :-
    maplist(call_to_json_request, Calls, IDs, Requests1),
    maplist(call_to_json_notification, Notifications, Requests2),
    append(Requests1, Requests2, Batch),
    (   Batch == []
    ->  Results = []
    ;   IDs == []
    ->  json_rpc_send(Stream, Batch, Options),
        flush_output(Stream),
        Results = []
    ;   batch_id(IDs, BatchId),
        % Register before sending: the reply may arrive immediately.
        asserta(pending(BatchId, Stream, reply)),
        json_rpc_send(Stream, Batch, Options),
        flush_output(Stream),
        json_wait_reply(Stream, BatchId, Results0, Options),
        sort(id, @<, Results0, Results1),
        maplist(batch_result, Results1, Results)
    ).

233
%!  call_to_json_request(+Goal, -Id, -Request) is det.
%
%   Translate Goal into a JSON-RPC 2.0 request dict for use in a
%   batch.  Id is a freshly generated request id.  The arguments are
%   processed by call_args/2 and `params` is omitted when there are no
%   arguments, such that a batched call produces the same message as
%   json_call/4 does for the same Goal.  Notably, this allows for
%   named parameters by passing a single dict.

call_to_json_request(Goal, Id, Request) :-
    Goal =.. [Name|Args0],
    call_args(Args0, Args),
    client_id(Id, []),
    Request0 = #{ jsonrpc: "2.0",
                  id: Id,
                  method: Name
                },
    (   Args == []
    ->  Request = Request0
    ;   put_dict(params, Request0, Args, Request)
    ).

242
%!  call_to_json_notification(+Goal, -Notification) is det.
%
%   Translate Goal into a JSON-RPC 2.0 notification dict (no `id`) for
%   use in a batch.  The arguments are processed by call_args/2 and
%   `params` is omitted when there are no arguments, such that a
%   batched notification produces the same message as json_notify/3
%   does for the same Goal.

call_to_json_notification(Goal, Notification) :-
    Goal =.. [Name|Args0],
    call_args(Args0, Args),
    Notification0 = #{ jsonrpc: "2.0",
                       method: Name
                     },
    (   Args == []
    ->  Notification = Notification0
    ;   put_dict(params, Notification0, Args, Notification)
    ).

249
%!  batch_id(+IDs, -BatchId) is det.
%
%   BatchId identifies a batch by the set of request ids it contains:
%   the SHA1 hash of the sorted, duplicate-free list of IDs.  It does
%   not depend on the order of IDs.

batch_id(IDs, BatchId) :-
    sort(0, @<, IDs, IdSet),
    variant_sha1(IdSet, BatchId).

253
%!  batch_result(+Reply:dict, -Result) is det.
%
%   Extract the outcome from one element of a batch reply: the value
%   of `result` if present, else error(Error) for the value of
%   `error`.

batch_result(Reply, Result),
    get_dict(result, Reply, Value) =>
    Result = Value.
batch_result(Reply, Result),
    get_dict(error, Reply, Error) =>
    Result = error(Error).

258
259
260 263
269
%!  json_full_duplex(+Stream, :Options) is det.
%
%   Start the reply dispatcher for Stream with the context module as
%   server_module(M), so that requests coming in over Stream can be
%   dispatched.  Runs under the `json_rpc_client` mutex.
%
%   @throws permission_error(json, full_duplex, Stream) if Stream
%           already has a result queue.

json_full_duplex(Stream, Options) :-
    with_mutex(json_rpc_client, json_full_duplex_(Stream, Options)).

json_full_duplex_(Stream, QOptions) :-
    (   json_result_queue(Stream, _Existing)
    ->  permission_error(json, full_duplex, Stream)
    ;   QOptions = Module:Options,
        get_json_result_queue(Stream, _Queue,
                              [server_module(Module)|Options])
    ).

280
281
287
%!  get_json_result_queue(+Stream, -Queue, +Options) is det.
%
%   Queue is the message queue holding the replies read from Stream.
%   If Stream has no queue yet, create one, register it, and start a
%   detached thread running handle_result_loop/2 that reads Stream.
%   The thread alias is taken from the thread_alias(Alias) option or
%   generated as `json_rpc_client:N` from the flag
%   `json_rpc_client_dispatcher`.  cleanup_client/1 runs when the
%   thread exits.

get_json_result_queue(Stream, Queue, Options) :-
    (   json_result_queue(Stream, Queue)
    ->  true
    ;   message_queue_create(Queue),
        asserta(json_result_queue(Stream, Queue)),
        (   option(thread_alias(Alias), Options)
        ->  true
        ;   flag(json_rpc_client_dispatcher, N, N+1),
            format(atom(Alias), 'json_rpc_client:~w', [N])
        ),
        ThreadOptions = [ detached(true),
                          alias(Alias),
                          inherit_from(main),
                          at_exit(cleanup_client(Stream))
                        ],
        thread_create(handle_result_loop(Stream, Options),
                      _Id,
                      ThreadOptions)
    ).

307
%!  handle_result_loop(+Stream, +Options)
%
%   Body of the dispatcher thread: process incoming messages from
%   Stream one at a time until handle_result/3 signals end of input.

handle_result_loop(Stream, Options) :-
    handle_result(Stream, EOF, Options),
    (   EOF \== true
    ->  handle_result_loop(Stream, Options)
    ;   true
    ).

314
%!  handle_result(+Stream, -EOF, +Options)
%
%   Read one message from Stream and process it.  EOF is unified with
%   `true` if the end of the input was reached (or handle_error/2
%   decides so); otherwise it is left unbound.  Errors of the shape
%   error(Formal, Context) raised while reading are handed to
%   handle_error/2.

handle_result(Stream, EOF, Options) :-
    catch(json_receive(Stream, Reply, Options),
          error(Formal, Context),
          true),
    debug(json_rpc, 'Received ~p', [Reply]),
    (   Reply == end_of_file(true)
    ->  EOF = true
    ;   nonvar(Formal)                  % reading raised an error
    ->  handle_error(error(Formal, Context), EOF)
    ;   handle_reply(Stream, Reply, Options)
    ).

327
%!  json_receive(+Stream, -Reply, +Options)
%
%   Read the next JSON message from Stream as a dict.  With the option
%   header(true), a message is preceded by header lines; the
%   `Content-Length` header tells how many bytes make up the JSON
%   document and an empty header block is treated as end of input.
%   Without that option the JSON document is read directly from
%   Stream.  At end of input, Reply is end_of_file(true).

json_receive(Stream, Reply, Options) :-
    (   option(header(true), Options)
    ->  read_header(Stream, Lines),
        (   Lines == []
        ->  Reply = end_of_file(true)
        ;   header_content_length(Lines, Length),
            setup_call_cleanup(
                stream_range_open(Stream, Data, [size(Length)]),
                json_read_dict(Data, Reply, Options),
                close(Data))
        )
    ;   EOFOption = end_of_file(end_of_file(true)),
        json_read_dict(Stream, Reply, [EOFOption|Options])
    ).

348
%!  read_header(+Stream, -Lines:list(string)) is det.
%
%   Read header lines from Stream up to and including the first empty
%   line, or up to the end of the input.  Lines are the non-empty
%   lines, with leading and trailing carriage return, tab and space
%   characters removed.

read_header(Stream, Lines) :-
    read_string(Stream, "\n", "\r\t ", Sep, Line),
    (   (Line == "" ; Sep == -1)
    ->  Lines = []
    ;   Lines = [Line|Rest],
        read_header(Stream, Rest)
    ).

356
%!  header_content_length(+Lines:list(string), -Length:number) is semidet.
%
%   Length is the value of the first `Content-Length` header in Lines.
%   The field name is matched case insensitively.  Fails if there is
%   no such header or its value is not a number.

header_content_length(Lines, Length) :-
    member(Line, Lines),
    split_string(Line, ":", "\t\s", [Field,Value]),
    string_lower(Field, "content-length"),
    !,
    number_string(Length, Value).

363
%!  handle_reply(+Stream, +Message, +Options)
%
%   Process one message received from the peer:
%
%     - A list is the reply to a batch.  It is delivered as a whole
%       under the batch id computed from the ids of its elements.
%     - A dict holding `result` and `id` completes request `id`.
%     - A dict holding `error` and `id` completes request `id` with
%       the exception error(json_rpc_error(Error), _).
%     - A dict holding `method` is a request or notification from the
%       peer.  It is handed to the JSON-RPC server dispatcher if a
%       server_module(M) option is present (see json_full_duplex/2).
%       `params` is not required: JSON-RPC 2.0 allows omitting it and
%       this client itself omits it for calls without arguments.
%       NOTE(review): this assumes json_rpc_dispatch_request/4 copes
%       with requests lacking `params` - confirm in json_rpc_server.
%     - Anything else is reported as a warning and ignored.
%
%   The final two cases keep the dispatcher thread alive.  Without
%   them, an unexpected message would terminate the thread, either
%   through a "no matching rule" error or through failure of the
%   option lookup.

handle_reply(Stream, Batch, _Options),
    is_list(Batch) =>
    maplist(get_dict(id), Batch, IDs),
    batch_id(IDs, Id),
    send_done(Stream, Id, true(Batch)).
handle_reply(Stream, Reply, _Options),
    is_dict(Reply),
    #{ jsonrpc: "2.0",
       result: Result,
       id: Id } :< Reply =>
    send_done(Stream, Id, true(Result)).
handle_reply(Stream, Reply, _Options),
    is_dict(Reply),
    #{ jsonrpc: "2.0",
       error: Error,
       id: Id } :< Reply =>
    send_done(Stream, Id, throw(error(json_rpc_error(Error), _))).
handle_reply(Stream, Request, Options),
    is_dict(Request),
    #{ jsonrpc: "2.0",
       method: _Method } :< Request =>
    (   option(server_module(M), Options)
    ->  json_rpc_server:json_rpc_dispatch_request(M, Stream, Request, Options)
    ;   print_message(warning,
                      format("JSON-RPC client: no server module to \c
                              handle request ~p", [Request]))
    ).
handle_reply(_Stream, Message, _Options) =>
    print_message(warning,
                  format("JSON-RPC client: ignoring unrecognised \c
                          message ~p", [Message])).

385
%!  send_done(+Stream, +Id, +Data)
%
%   Complete request Id of Stream with Data, which is true(Result) or
%   throw(Error).  If the request is registered in pending/3, the
%   registration is removed and its action is run by reply_done/4.
%   Otherwise an error reply is printed and any other reply is
%   silently dropped.

send_done(Stream, Id, Data) :-
    (   retract(pending(Id, Stream, Action))
    ->  reply_done(Action, Id, Stream, Data)
    ;   Data = throw(error(json_rpc_error(Error), _))
    ->  print_message(error, error(json_rpc_error(Error, Id), _))
    ;   true
    ).

394
%!  reply_done(+Action, +Id, +Stream, +Data)
%
%   Deliver the outcome Data of request Id according to Action:
%
%     - reply
%       Post done(Id, Data) on the result queue of Stream, where
%       the waiting thread picks it up.
%     - call(Goal)
%       Asynchronous call.  On success, run call(Goal, Result) and
%       print any exception it raises.  An error reply is printed.

reply_done(reply, Id, Stream, Outcome) =>
    json_result_queue(Stream, Queue),
    thread_send_message(Queue, done(Id, Outcome)).
reply_done(call(Callback), _Id, _Stream, true(Result)) =>
    catch_with_backtrace(call(Callback, Result),
                         Exception,
                         print_message(error, Exception)).
reply_done(call(_Callback), Id, _Stream,
           throw(error(json_rpc_error(RpcError), _))) =>
    print_message(error, error(json_rpc_error(RpcError, Id), _)).

406
%!  handle_error(+Error, -EOF)
%
%   Deal with an error raised while reading from the stream.  If the
%   stream no longer exists, EOF is unified with `true`, which ends
%   the dispatcher loop.  Any other error is printed and EOF is left
%   unbound.

handle_error(Error, EOF) :-
    (   subsumes_term(error(existence_error(stream, _), _), Error)
    ->  EOF = true
    ;   print_message(error, Error)
    ).

411
415
%!  cleanup_client(+Stream)
%
%   Called when the dispatcher thread for Stream exits.  Forgets all
%   requests that are still pending on Stream, as there is nobody left
%   to read their replies, and hands every result queue registered for
%   Stream to do_cleanup/2.

cleanup_client(Stream) :-
    retractall(pending(_, Stream, _)),
    forall(retract(json_result_queue(Stream, Queue)),
           do_cleanup(Stream, Queue)).

419
420do_cleanup(Stream, Queue) :-
421 close(Stream, [force(true)]),
422 message_queue_destroy(Queue)