1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: jan@swi-prolog.org 5 WWW: https://www.swi-prolog.org 6 Copyright (c) 2025, SWI-Prolog Solutions b.v. 7 All rights reserved. 8 9 Redistribution and use in source and binary forms, with or without 10 modification, are permitted provided that the following conditions 11 are met: 12 13 1. Redistributions of source code must retain the above copyright 14 notice, this list of conditions and the following disclaimer. 15 16 2. Redistributions in binary form must reproduce the above copyright 17 notice, this list of conditions and the following disclaimer in 18 the documentation and/or other materials provided with the 19 distribution. 20 21 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 29 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 31 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 32 POSSIBILITY OF SUCH DAMAGE. 33*/ 34 35:- module(json_rpc_client, 36 [ json_call/4, % +Stream, +Goal, -Result, +Options 37 json_notify/3, % +Stream, +Goal, +Options 38 json_batch/5, % +Stream, +Notifications, +Calls, -Results, +Options 39 json_full_duplex/2 % +Stream, :Options 40 ]). 41:- use_module(library(json_rpc_common)). 42:- autoload(library(json), [json_read_dict/3]). 43:- autoload(library(option), [option/2, meta_options/3]). 44:- use_module(library(debug), [debug/3]). 45:- autoload(library(apply), [maplist/4, maplist/3]). 46:- autoload(library(lists), [append/3, member/2]). 47:- autoload(library(terms), [mapsubterms/3]). 48:- autoload(library(http/http_stream), [stream_range_open/3]). 49:- autoload(library(error), [permission_error/3]). 50 51:- meta_predicate 52 json_call(, , , ), 53 json_full_duplex(, ).
63:- dynamic 64 json_result_queue/2, % Stream, Queue 65 pending/3. % Id, Stream, Action
If Stream is closed this library terminates the thread and related message queue.
Options are passed to json_write_dict/3 and thread_get_message/3. Additional options:
call(Closure, Data) from the client reading thread when
the request is completed. If Closure is true, ignore
the reply. As we cannot inject errors as exceptions in
the calling thread, possible errors are printed.json_rpc_client:<N>,
where N is a unique number.110json_call(Stream, Goal, Result, Options0) :- 111 meta_options(is_meta, Options0, Options), 112 Goal =.. [Name|Args0], 113 call_args(Args0, Args), 114 client_id(Id, Options), 115 debug(json_rpc, 'Sending request ~p', [Id]), 116 ( option(async(AsyncGoal), Options) 117 -> ( AsyncGoal = _:true 118 -> true 119 ; asserta(pending(Id, Stream, call(AsyncGoal))) 120 ), 121 Async = true 122 ; asserta(pending(Id, Stream, reply)) 123 ), 124 ( Args == [] 125 -> json_rpc_send(Stream, 126 #{ jsonrpc: "2.0", 127 id: Id, 128 method: Name 129 }, Options) 130 ; json_rpc_send(Stream, 131 #{ jsonrpc: "2.0", 132 id: Id, 133 method: Name, 134 params: Args 135 }, Options) 136 ), 137 ( Async == true 138 -> true 139 ; json_wait_reply(Stream, Id, Result, Options) 140 ). 141 142is_meta(async). 143 144call_args([Arg], Args), is_dict(Arg) => 145 Args = Arg. 146call_args([Args0], Args), is_list(Args0) => 147 Args = Args0. 148call_args(Args0, Args) => 149 Args = Args0. 150 151json_wait_reply(Stream, Id, Result, Options) :- 152 with_mutex(json_rpc_client, 153 get_json_result_queue(Stream, Queue, Options)), 154 debug(json_rpc, 'Waiting for reply', []), 155 ( thread_get_message(Queue, done(Id, Result0), Options) 156 -> map_reply(Result0, Result1, Options), 157 debug(json_rpc, 'Got reply for ~p', [Id]), 158 ( Result1 = throw(Error) 159 -> throw(Error) 160 ; Result1 = true(Result) 161 ) 162 ; fail 163 ). 164 165map_reply(Reply0, Reply, Options) :- 166 option(value_string_as(atom), Options), 167 !, 168 mapsubterms(map_string, Reply0, Reply). 169map_reply(Reply, Reply, _). 170 171map_string(String, Atom) :- 172 string(String), 173 atom_string(Atom,String). 174 175client_id(Id, Options) :- 176 option(id(Id), Options), 177 !. 178client_id(Id, _Options) :- 179 flag(json_client_id, Id, Id+1).
186json_notify(Stream, Goal, Options) :-
187 Goal =.. [Name|Args0],
188 call_args(Args0, Args),
189 ( Args == []
190 -> json_rpc_send(Stream,
191 #{ jsonrpc: "2.0",
192 method: Name
193 }, Options)
194 ; json_rpc_send(Stream,
195 #{ jsonrpc: "2.0",
196 method: Name,
197 params: Args
198 }, Options)
199 ).error(Dict), where Dict holds the
code, message and optional data field. Note that error(Dict)
is not a valid JSON type and this is thus unambiguous. While the
JSON RPC standard allows the server to process the messages in any
order and allows for concurrent processing, all results are sent in
one message and this client ensures the elements of the Results list
are in the same order as the Calls list. If the Calls list is empty
this predicate does not wait for a reply.216json_batch(Stream, Notifications, Calls, Results, Options) :- 217 maplist(call_to_json_request, Calls, IDs, Requests1), 218 maplist(call_to_json_notification, Notifications, Requests2), 219 append(Requests1, Requests2, Batch), 220 ( IDs == [] 221 -> true 222 ; batch_id(IDs, BatchId), 223 asserta(pending(BatchId, Stream, reply)) 224 ), 225 json_rpc_send(Stream, Batch, Options), 226 flush_output(Stream), 227 ( var(BatchId) 228 -> true 229 ; json_wait_reply(Stream, BatchId, Results0, Options), 230 sort(id, <, Results0, Results1), 231 maplist(batch_result, Results1, Results) 232 ). 233 234call_to_json_request(Goal, Id, Request) :- 235 Goal =.. [Name|Args], 236 client_id(Id, []), 237 Request = #{ jsonrpc: "2.0", 238 id: Id, 239 method: Name, 240 params: Args 241 }. 242 243call_to_json_notification(Goal, Notification) :- 244 Goal =.. [Name|Args], 245 Notification = #{ jsonrpc: "2.0", 246 method: Name, 247 params: Args 248 }. 249 250batch_id(IDs, Id) :- 251 sort(IDs, Canonical), 252 variant_sha1(Canonical, Id). 253 254batch_result(Reply, Result), Result0 = Reply.get(result) => 255 Result = Result0. 256batch_result(Reply, Result), Result0 = Reply.get(error) => 257 Result = error(Result0). 258 259 260 /******************************* 261 * INCOMMING DATA * 262 *******************************/
library(jso_rpc_server) in the module derived from the Options
list.270json_full_duplex(Stream, Options) :- 271 with_mutex(json_rpc_client, json_full_duplex_(Stream, Options)). 272 273json_full_duplex_(Stream, _) :- 274 json_result_queue(Stream, _Queue), 275 !, 276 permission_error(json, full_duplex, Stream). 277json_full_duplex_(Stream, M:Options) :- 278 get_json_result_queue(Stream, _Queue, 279 [server_module(M)|Options]).
288get_json_result_queue(Stream, Queue, _Options) :- 289 json_result_queue(Stream, Queue), 290 !. 291get_json_result_queue(Stream, Queue, Options) :- 292 message_queue_create(Queue), 293 asserta(json_result_queue(Stream, Queue)), 294 ( option(thread_alias(Alias), Options) 295 -> true 296 ; flag(json_rpc_client_dispatcher, N, N+1), 297 format(atom(Alias), 'json_rpc_client:~w', [N]) 298 ), 299 thread_create( 300 handle_result_loop(Stream, Options), 301 _Id, 302 [ detached(true), 303 alias(Alias), 304 inherit_from(main), 305 at_exit(cleanup_client(Stream)) 306 ]). 307 308handle_result_loop(Stream, Options) :- 309 handle_result(Stream, EOF, Options), 310 ( EOF == true 311 -> true 312 ; handle_result_loop(Stream, Options) 313 ). 314 315handle_result(Stream, EOF, Options) :- 316 Error = error(Formal, _), 317 catch(json_receive(Stream, Reply, Options), 318 Error, 319 true), 320 debug(json_rpc, 'Received ~p', [Reply]), 321 ( Reply == end_of_file(true) 322 -> EOF = true 323 ; var(Formal) 324 -> handle_reply(Stream, Reply, Options) 325 ; handle_error(Error, EOF) 326 ). 327 328json_receive(Stream, Reply, Options) :- 329 option(header(true), Options), 330 !, 331 read_header(Stream, Lines), 332 ( Lines == [] 333 -> Reply = end_of_file(true) 334 ; header_content_length(Lines, Length), 335 setup_call_cleanup( 336 stream_range_open(Stream, Data, [size(Length)]), 337 json_read_dict(Data, 338 Reply, 339 Options), 340 close(Data)) 341 ). 342json_receive(Stream, Reply, Options) :- 343 json_read_dict(Stream, 344 Reply, 345 [ end_of_file(end_of_file(true)) 346 | Options 347 ]). 348 349read_header(Stream, Lines) :- 350 read_string(Stream, "\n", "\r\t ", Sep, Line), 351 ( (Line == "" ; Sep == -1) 352 -> Lines = [] 353 ; Lines = [Line|Rest], 354 read_header(Stream, Rest) 355 ). 356 357header_content_length(Lines, Length) :- 358 member(Line, Lines), 359 split_string(Line, ":", "\t\s", [Field,Value]), 360 string_lower(Field, "content-length"), 361 !, 362 number_string(Length, Value). 363 364handle_reply(Stream, Batch, _Options), 365 is_list(Batch) => 366 maplist(get_dict(id), Batch, IDs), 367 batch_id(IDs, Id), 368 send_done(Stream, Id, true(Batch)). 369handle_reply(Stream, Reply, _Options), 370 #{ jsonrpc: "2.0", 371 result: Result, 372 id: Id } :< Reply => 373 send_done(Stream, Id, true(Result)). 374handle_reply(Stream, Reply, _Options), 375 #{ jsonrpc: "2.0", 376 error: Error, 377 id: Id } :< Reply => 378 send_done(Stream, Id, throw(error(json_rpc_error(Error), _))). 379handle_reply(Stream, Request, Options), 380 #{ jsonrpc: "2.0", 381 method: _Method, 382 params: _Params } :< Request => 383 option(server_module(M), Options), 384 json_rpc_server:json_rpc_dispatch_request(M, Stream, Request, Options). 385 386send_done(Stream, Id, Data) :- 387 retract(pending(Id, Stream, Action)), 388 !, 389 reply_done(Action, Id, Stream, Data). 390send_done(_Stream, Id, throw(error(json_rpc_error(Error), _))) :- 391 !, 392 print_message(error, error(json_rpc_error(Error, Id), _)). 393send_done(_Stream, _Id, _Result). 394 395reply_done(reply, Id, Stream, Data) => 396 json_result_queue(Stream, Queue), 397 thread_send_message(Queue, done(Id, Data)). 398reply_done(call(Goal), _Id, _Stream, true(Data)) => 399 catch_with_backtrace( 400 call(Goal, Data), 401 Error, 402 print_message(error, Error)). 403reply_done(call(_Goal), Id, _Stream, 404 throw(error(json_rpc_error(Error), _))) => 405 print_message(error, error(json_rpc_error(Error, Id), _)). 406 407handle_error(error(existence_error(stream, _), _), EOF) => 408 EOF = true. 409handle_error(Error, _EOF) => 410 print_message(error, Error).
416cleanup_client(Stream) :- 417 forall(retract(json_result_queue(Stream, Queue)), 418 do_cleanup(Stream, Queue)). 419 420do_cleanup(Stream, Queue) :- 421 close(Stream, [force(true)]), 422 message_queue_destroy(Queue)
JSON RPC client
This module implements a JSON RPC compliant client. The three predicates require a stream pair (see stream_pair/2) that connects us to a JSON RPC server.
*/