View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        jan@swi-prolog.org
    5    WWW:           https://www.swi-prolog.org
    6    Copyright (c)  2025, SWI-Prolog Solutions b.v.
    7    All rights reserved.
    8
    9    Redistribution and use in source and binary forms, with or without
   10    modification, are permitted provided that the following conditions
   11    are met:
   12
   13    1. Redistributions of source code must retain the above copyright
   14       notice, this list of conditions and the following disclaimer.
   15
   16    2. Redistributions in binary form must reproduce the above copyright
   17       notice, this list of conditions and the following disclaimer in
   18       the documentation and/or other materials provided with the
   19       distribution.
   20
   21    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   22    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   23    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   24    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   25    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   26    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   27    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   28    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   29    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   30    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   31    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   32    POSSIBILITY OF SUCH DAMAGE.
   33*/
   34
   35:- module(json_rpc_client,
   36          [ json_call/4,          % +Stream, +Goal, -Result, +Options
   37            json_notify/3,        % +Stream, +Goal, +Options
   38            json_batch/5,         % +Stream, +Notifications, +Calls, -Results, +Options
   39            json_full_duplex/2    % +Stream, :Options
   40          ]).   41:- use_module(library(json_rpc_common)).   42:- autoload(library(json), [json_read_dict/3]).   43:- autoload(library(option), [option/2, meta_options/3]).   44:- use_module(library(debug), [debug/3]).   45:- autoload(library(apply), [maplist/4, maplist/3]).   46:- autoload(library(lists), [append/3, member/2]).   47:- autoload(library(terms), [mapsubterms/3]).   48:- autoload(library(http/http_stream), [stream_range_open/3]).   49:- autoload(library(error), [permission_error/3]).   50
   51:- meta_predicate
   52    json_call(+, +, -, :),            % Options are module-qualified; needed for async(:Closure)
   53    json_full_duplex(+, :).           % qualifying module of Options is passed on as server_module(M)

/** <module> JSON RPC client

This module implements a JSON RPC compliant client. The predicates of this module require a stream pair (see stream_pair/2) that connects us to a JSON RPC server.

*/

   63:- dynamic
   64    json_result_queue/2,              % Stream, Queue: message queue receiving done(Id, Data) terms for Stream
   65    pending/3.                        % Id, Stream, Action: Action is `reply` or call(Closure)
 json_call(+Stream, +Goal, -Result, +Options) is det
Run Goal on a JSON RPC service identified by Stream and wait for Result. This predicate may be called from multiple threads. As replies come in in arbitrary order, this predicate starts a thread that reads the replies from Stream and informs the calling thread using a Prolog message queue.

If Stream is closed this library terminates the thread and related message queue.

Options are passed to json_write_dict/3 and thread_get_message/3. Additional options:

async(:Closure)
Do not wait for the request to complete. Instead, call call(Closure, Data) from the client reading thread when the request is completed. If Closure is true, ignore the reply. As we cannot inject errors as exceptions in the calling thread, possible errors are printed.
thread_alias(+Atom)
Alias name to use for the thread that deals with incoming replies and requests. Defaults to json_rpc_client:<N>, where N is a unique number.
Arguments:
Goal- is a callable term. The functor name is the method. If there is a single argument that is a dict, we invoke a JSON-RPC method using named arguments. If there is a single argument that is a list, use the elements of the list as positional arguments. If there are zero or more than one arguments use these as positional arguments. Examples:
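| Term            | Method | Type       | JSON (params)    |
|-----------------|--------|------------|------------------|
| f(#{a:1,b:2})   | f      | named      | {"a":1, "b":2}   |
| f(["a", 42])    | f      | positional | ["a", 42]        |
| f([#{"a":1}])   | f      | positional | [{"a":1}]        |
| f()             | f      | positional | []               |
| f("a", 42)      | f      | positional | ["a", 42]        |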

Options processed:

  % json_call(+Stream, +Goal, -Result, +Options)
  %
  % Translate Goal into a JSON-RPC 2.0 request and send it over Stream.
  % The functor name of Goal is the method; call_args/2 decides whether
  % the arguments are sent as named or positional parameters.  The
  % `params` key is omitted when there are no parameters.  Unless the
  % async(Closure) option is given, block in json_wait_reply/4 until the
  % reply with the same id is available.
  json_call(Stream, Goal, Result, Options0) :-
      meta_options(is_meta, Options0, Options),
      Goal =.. [Method|GoalArgs],
      call_args(GoalArgs, Params),
      client_id(Id, Options),
      debug(json_rpc, 'Sending request ~p', [Id]),
      register_request(Options, Id, Stream, Mode),
      Request0 = #{ jsonrpc: "2.0",
                    id: Id,
                    method: Method
                  },
      (   Params == []
      ->  Request = Request0
      ;   put_dict(params, Request0, Params, Request)
      ),
      json_rpc_send(Stream, Request, Options),
      (   Mode == async
      ->  true
      ;   json_wait_reply(Stream, Id, Result, Options)
      ).

  %!  register_request(+Options, +Id, +Stream, -Mode) is det.
  %
  %   Record how the reply for request Id must be handled.  Mode is
  %   `async` if Options holds async(Closure) and `sync` otherwise.  An
  %   async closure that is `true` registers nothing, so its reply is
  %   not dispatched to any handler.

  register_request(Options, Id, Stream, Mode) :-
      option(async(Closure), Options),
      !,
      Mode = async,
      (   Closure = _:true
      ->  true
      ;   asserta(pending(Id, Stream, call(Closure)))
      ).
  register_request(_Options, Id, Stream, sync) :-
      asserta(pending(Id, Stream, reply)).

  % Options whose argument is module-sensitive (see meta_options/3).
  is_meta(async).
  143
  %!  call_args(+GoalArgs:list, -Params) is det.
  %
  %   Map the arguments of a goal onto the JSON-RPC `params` value.  A
  %   single dict argument is passed as is (named parameters), a single
  %   list argument provides the positional parameters and any other
  %   argument list is used unchanged as positional parameters.

  call_args([Single], Params),
      (   is_dict(Single)
      ;   is_list(Single)
      ) =>
      Params = Single.
  call_args(Positional, Params) =>
      Params = Positional.
  150
  %!  json_wait_reply(+Stream, +Id, -Result, +Options) is semidet.
  %
  %   Wait for the reply to request Id on Stream.  This makes sure the
  %   result queue and reader thread for Stream exist and then waits for
  %   a done(Id, Data) message.  Data is either true(Value), which
  %   yields Result, or throw(Error), which is re-thrown in the calling
  %   thread.  Options are passed to thread_get_message/3 and are also
  %   used by map_reply/3.
  %
  %   Fails if thread_get_message/3 fails, i.e., when a timeout or
  %   deadline option expires.  In that case the registration for Id is
  %   removed so that neither the pending/3 fact nor a late reply is
  %   left behind.

  json_wait_reply(Stream, Id, Result, Options) :-
      with_mutex(json_rpc_client,
                 get_json_result_queue(Stream, Queue, Options)),
      debug(json_rpc, 'Waiting for reply', []),
      (   thread_get_message(Queue, done(Id, Result0), Options)
      ->  map_reply(Result0, Result1, Options),
          debug(json_rpc, 'Got reply for ~p', [Id]),
          (   Result1 = throw(Error)
          ->  throw(Error)
          ;   Result1 = true(Result)
          )
      ;   abandon_reply(Stream, Id, Queue),
          fail
      ).

  %!  abandon_reply(+Stream, +Id, +Queue) is det.
  %
  %   Give up waiting for the reply to Id.  If the request is still
  %   registered, drop the registration so a late reply is discarded by
  %   send_done/3.  Otherwise the reply was dispatched while we timed
  %   out; remove it from Queue if it is there.  The queue may have been
  %   destroyed by the reader thread in the meantime, so errors from
  %   accessing it are ignored.

  abandon_reply(Stream, Id, Queue) :-
      (   retract(pending(Id, Stream, _))
      ->  true
      ;   catch(ignore(thread_get_message(Queue, done(Id, _),
                                          [timeout(0)])),
                error(_, _),
                true)
      ).
  164
  %!  map_reply(+Reply0, -Reply, +Options) is det.
  %
  %   Post-process a received reply.  If Options contains
  %   value_string_as(atom), every string inside Reply0 is replaced by
  %   the corresponding atom.  Otherwise Reply is Reply0.

  map_reply(Reply0, Reply, Options) :-
      (   option(value_string_as(atom), Options)
      ->  mapsubterms(map_string, Reply0, Reply)
      ;   Reply = Reply0
      ).
  170
  %!  map_string(+Term, -Atom) is semidet.
  %
  %   True when Term is a string and Atom is the atom with the same
  %   text.  Fails for any non-string term, which makes mapsubterms/3
  %   leave such terms alone and descend into them.

  map_string(Text, Atom) :-
      string(Text),
      atom_string(Atom, Text).
  174
  %!  client_id(-Id, +Options) is det.
  %
  %   Id is the identifier for a new request.  An id(Id) option takes
  %   precedence.  Otherwise the next value of the global counter
  %   `json_client_id` is used and the counter is incremented.

  client_id(Id, Options) :-
      (   option(id(Id), Options)
      ->  true
      ;   flag(json_client_id, Id, Id+1)
      ).
 json_notify(+Stream, +Goal, +Options) is det
Run Goal on a JSON RPC service identified by Stream without waiting for the result.
  % json_notify(+Stream, +Goal, +Options)
  %
  % Send Goal as a JSON-RPC 2.0 notification, i.e., a request without
  % an `id`.  The parameters are derived from the arguments of Goal by
  % call_args/2 and the `params` key is omitted if there are none.
  json_notify(Stream, Goal, Options) :-
      Goal =.. [Method|GoalArgs],
      call_args(GoalArgs, Params),
      Notification0 = #{ jsonrpc: "2.0",
                         method: Method
                       },
      (   Params == []
      ->  Notification = Notification0
      ;   put_dict(params, Notification0, Params, Notification)
      ),
      json_rpc_send(Stream, Notification, Options).
 json_batch(+Stream, +Notifications:list, +Calls:list, -Results:list, +Options) is det
Run a batch of notifications and normal calls on the JSON server at the other end of Stream. On success, Results is unified to a list with the same length as Calls. Each element either contains a value, similar to json_call/4 or a term error(Dict), where Dict holds the code, message and optional data field. Note that error(Dict) is not a valid JSON type and this is thus unambiguous. While the JSON RPC standard allows the server to process the messages in any order and allows for concurrent processing, all results are sent in one message and this client ensures the elements of the Results list are in the same order as the Calls list. If the Calls list is empty this predicate does not wait for a reply.
  % json_batch(+Stream, +Notifications, +Calls, -Results, +Options)
  %
  % Send Calls and Notifications as a single JSON-RPC batch.  If there
  % is at least one call, the batch is registered under an id computed
  % from the ids of all its calls (see batch_id/2); the reader thread
  % computes the same id from the ids in the batch reply.  The reply
  % elements are sorted on their `id` key, which restores the order of
  % Calls because the ids are handed out in increasing order.
  %
  % If Calls is empty we do not wait for a reply and Results is [].
  json_batch(Stream, Notifications, Calls, Results, Options) :-
      maplist(call_to_json_request, Calls, IDs, Requests1),
      maplist(call_to_json_notification, Notifications, Requests2),
      append(Requests1, Requests2, Batch),
      (   IDs == []
      ->  true
      ;   batch_id(IDs, BatchId),
          asserta(pending(BatchId, Stream, reply))
      ),
      json_rpc_send(Stream, Batch, Options),
      flush_output(Stream),
      (   var(BatchId)
      ->  Results = []                % no calls: nothing to wait for
      ;   json_wait_reply(Stream, BatchId, Results0, Options),
          % sort/4 takes the standard order operators @<, @>, @=<
          % and @>=.
          sort(id, @<, Results0, Results1),
          maplist(batch_result, Results1, Results)
      ).
  233
  %!  call_to_json_request(+Goal, -Id, -Request) is det.
  %
  %   Request is the JSON-RPC 2.0 request dict for Goal as used in a
  %   batch.  Id is a fresh id obtained from client_id/2.  The arguments
  %   of Goal are always sent as a positional `params` list.
  %
  %   NOTE(review): unlike json_call/4 this does not pass the arguments
  %   through call_args/2, so a single dict or list argument is sent as
  %   a one-element list.  Confirm whether that is intended.

  call_to_json_request(Goal, Id, Request) :-
      Goal =.. [Method|Params],
      client_id(Id, []),
      Request = #{ jsonrpc: "2.0",
                   method: Method,
                   params: Params,
                   id: Id
                 }.
  242
  %!  call_to_json_notification(+Goal, -Notification) is det.
  %
  %   Notification is the JSON-RPC 2.0 notification dict (a request
  %   without `id`) for Goal as used in a batch.  The arguments of Goal
  %   are sent as a positional `params` list.

  call_to_json_notification(Goal, Notification) :-
      Goal =.. [Method|Params],
      Notification = #{ method: Method,
                        params: Params,
                        jsonrpc: "2.0"
                      }.
  249
  %!  batch_id(+IDs:list, -BatchId) is det.
  %
  %   BatchId identifies a batch by the set of request ids it contains.
  %   The ids are sorted (removing duplicates) first, so the result does
  %   not depend on the order in which the ids are given.

  batch_id(IDs, BatchId) :-
      sort(0, @<, IDs, Canonical),
      variant_sha1(Canonical, BatchId).
  253
  %!  batch_result(+Reply:dict, -Result) is det.
  %
  %   Extract the outcome of one element of a batch reply.  Result is
  %   the value of the `result` key if present and error(Error) if the
  %   reply has an `error` key instead.  A reply with neither key
  %   matches no rule.

  batch_result(Reply, Result), get_dict(result, Reply, Value) =>
      Result = Value.
  batch_result(Reply, Result), get_dict(error, Reply, Error) =>
      Result = error(Error).
  258
  259
  260                /*******************************
  261                *        INCOMING DATA         *
  262                *******************************/
 json_full_duplex(+Stream, :Options) is det
Start the thread for incoming data and on requests, dispatch them using library(json_rpc_server) in the module derived from the Options list.
  % json_full_duplex(+Stream, :Options)
  %
  % Start the reader thread for Stream such that requests from the peer
  % are dispatched in the module that qualifies Options.  The work is
  % done while holding the mutex `json_rpc_client`.
  json_full_duplex(Stream, Options) :-
      with_mutex(json_rpc_client,
                 json_full_duplex_(Stream, Options)).

  %!  json_full_duplex_(+Stream, +QOptions) is det.
  %
  %   Raise a permission error if Stream already has a result queue.
  %   Otherwise create the queue and reader thread, adding
  %   server_module(M) to the options, where M is the module
  %   qualification of QOptions.

  json_full_duplex_(Stream, QOptions) :-
      (   json_result_queue(Stream, _Existing)
      ->  permission_error(json, full_duplex, Stream)
      ;   QOptions = M:Options,
          get_json_result_queue(Stream, _Queue,
                                [server_module(M)|Options])
      ).
 get_json_result_queue(+Stream, -Queue, +Options) is det
Find the result queue associated to Stream. If this does not exist, create one, as well as a thread that handles the incoming results and dispatches these to the queue.
  % get_json_result_queue(+Stream, -Queue, +Options)
  %
  % Queue is the message queue registered for Stream.  If there is no
  % registration, create a queue, register it and start a detached
  % thread that runs handle_result_loop/2 on Stream.  That thread calls
  % cleanup_client/1 when it exits.
  get_json_result_queue(Stream, Queue, _Options) :-
      json_result_queue(Stream, Queue),
      !.
  get_json_result_queue(Stream, Queue, Options) :-
      message_queue_create(Queue),
      asserta(json_result_queue(Stream, Queue)),
      dispatcher_alias(Options, Alias),
      thread_create(
          handle_result_loop(Stream, Options),
          _Thread,
          [ detached(true),
            alias(Alias),
            inherit_from(main),
            at_exit(cleanup_client(Stream))
          ]).

  %!  dispatcher_alias(+Options, -Alias) is det.
  %
  %   Alias is the alias for the reader thread: the value of the
  %   thread_alias(Alias) option or else `json_rpc_client:N`, where N
  %   comes from the global counter `json_rpc_client_dispatcher`.

  dispatcher_alias(Options, Alias) :-
      option(thread_alias(Alias), Options),
      !.
  dispatcher_alias(_Options, Alias) :-
      flag(json_rpc_client_dispatcher, N, N+1),
      format(atom(Alias), 'json_rpc_client:~w', [N]).
  307
  %!  handle_result_loop(+Stream, +Options)
  %
  %   Main loop of the reader thread: process incoming messages from
  %   Stream one by one until handle_result/3 reports end of input.

  handle_result_loop(Stream, Options) :-
      handle_result(Stream, EOF, Options),
      next_result(EOF, Stream, Options).

  % Stop if the last message signalled the end of the input; otherwise
  % continue with the next message.
  next_result(EOF, _Stream, _Options) :-
      EOF == true,
      !.
  next_result(_EOF, Stream, Options) :-
      handle_result_loop(Stream, Options).
  314
  %!  handle_result(+Stream, -EOF, +Options)
  %
  %   Read one message from Stream and process it.  Exceptions of the
  %   shape error(Formal, Context) raised while reading are caught and
  %   passed to handle_error/2.  EOF is unified with `true` if the end
  %   of the input is reached or handle_error/2 decides so; otherwise it
  %   is left unbound.

  handle_result(Stream, EOF, Options) :-
      catch(json_receive(Stream, Reply, Options),
            error(Formal, Context),
            true),
      debug(json_rpc, 'Received ~p', [Reply]),
      (   Reply == end_of_file(true)
      ->  EOF = true
      ;   var(Formal)                 % nothing was caught
      ->  handle_reply(Stream, Reply, Options)
      ;   handle_error(error(Formal, Context), EOF)
      ).
  327
  %!  json_receive(+Stream, -Reply, +Options) is det.
  %
  %   Read the next JSON message from Stream.  With the option
  %   header(true), a message is preceded by header lines: read these,
  %   take the length of the message from the `Content-Length` field
  %   and parse exactly that many bytes.  Without it, parse the next
  %   JSON value directly from Stream.  Reply is end_of_file(true) if
  %   there is no more input.

  json_receive(Stream, Reply, Options) :-
      (   option(header(true), Options)
      ->  read_header(Stream, HeaderLines),
          (   HeaderLines == []
          ->  Reply = end_of_file(true)
          ;   header_content_length(HeaderLines, Size),
              setup_call_cleanup(
                  stream_range_open(Stream, Body, [size(Size)]),
                  json_read_dict(Body, Reply, Options),
                  close(Body))
          )
      ;   ReadOptions = [end_of_file(end_of_file(true))|Options],
          json_read_dict(Stream, Reply, ReadOptions)
      ).
  348
  %!  read_header(+Stream, -Lines:list(string)) is det.
  %
  %   Read lines from Stream up to the first empty line or the end of
  %   the input.  Lines holds the lines read, with leading and trailing
  %   carriage return, tab and space characters removed.

  read_header(Stream, Lines) :-
      read_string(Stream, "\n", "\r\t ", Sep, Line),
      (   Line \== "",
          Sep \== -1                  % -1: end of input instead of newline
      ->  Lines = [Line|Rest],
          read_header(Stream, Rest)
      ;   Lines = []
      ).
  356
  %!  header_content_length(+Lines:list(string), -Length:number) is semidet.
  %
  %   Length is the numeric value of the first line in Lines of the
  %   form `Field: Value` whose field name is `content-length`, compared
  %   case insensitively.  Fails if there is no such line or the value
  %   is not a number.

  header_content_length(Lines, Length) :-
      once(( member(Line, Lines),
             split_string(Line, ":", "\t\s", [Field, Value]),
             string_lower(Field, "content-length")
           )),
      number_string(Length, Value).
  363
  %!  handle_reply(+Stream, +Message, +Options)
  %
  %   Process a message read from Stream:
  %
  %     - A list is a batch reply.  It is delivered as a whole under the
  %       id computed by batch_id/2 from the ids of its elements.
  %     - A dict with `result` and `id` completes request `id` with
  %       true(Result).
  %     - A dict with `error` and `id` completes request `id` with
  %       throw(error(json_rpc_error(Error), _)).
  %     - A dict with `method` and `params` is a request from the peer.
  %       It is dispatched in the module given by the server_module(M)
  %       option.
  %
  %   NOTE(review): a request without `params` matches no rule; confirm
  %   whether such requests can be sent by the peer.

  handle_reply(Stream, Replies, _Options), is_list(Replies) =>
      maplist(get_dict(id), Replies, ReplyIds),
      batch_id(ReplyIds, BatchId),
      send_done(Stream, BatchId, true(Replies)).
  handle_reply(Stream, Message, _Options),
      #{ jsonrpc: "2.0", result: Value, id: Id } :< Message =>
      send_done(Stream, Id, true(Value)).
  handle_reply(Stream, Message, _Options),
      #{ jsonrpc: "2.0", error: Error, id: Id } :< Message =>
      send_done(Stream, Id, throw(error(json_rpc_error(Error), _))).
  handle_reply(Stream, Message, Options),
      #{ jsonrpc: "2.0", method: _, params: _ } :< Message =>
      option(server_module(Module), Options),
      json_rpc_server:json_rpc_dispatch_request(Module, Stream,
                                                Message, Options).
  385
  %!  send_done(+Stream, +Id, +Data)
  %
  %   Complete the request Id on Stream with Data, which is true(Value)
  %   or throw(Error).  If the request is registered in pending/3, the
  %   registration is removed and the registered action is run through
  %   reply_done/4.  If nothing is registered, an error reply is
  %   printed as an error message and any other reply is ignored.

  send_done(Stream, Id, Data) :-
      (   retract(pending(Id, Stream, Action))
      ->  reply_done(Action, Id, Stream, Data)
      ;   Data = throw(error(json_rpc_error(Error), _))
      ->  print_message(error, error(json_rpc_error(Error, Id), _))
      ;   true
      ).
  394
  %!  reply_done(+Action, +Id, +Stream, +Data)
  %
  %   Run the Action registered for a completed request.  For `reply`,
  %   send done(Id, Data) to the result queue of Stream.  For
  %   call(Closure), call Closure with the value of a successful reply
  %   and print any exception this raises.  An error reply for an
  %   asynchronous call is printed as an error message.

  reply_done(reply, Id, Stream, Data) =>
      json_result_queue(Stream, Queue),
      thread_send_message(Queue, done(Id, Data)).
  reply_done(call(Closure), _Id, _Stream, true(Value)) =>
      catch_with_backtrace(
          call(Closure, Value),
          Exception,
          print_message(error, Exception)).
  reply_done(call(_Closure), Id, _Stream,
             throw(error(json_rpc_error(RpcError), _))) =>
      print_message(error, error(json_rpc_error(RpcError, Id), _)).
  406
  %!  handle_error(+Error, -EOF) is det.
  %
  %   Handle an exception caught while reading a message.  An existence
  %   error on a stream ends the reader loop by setting EOF to `true`.
  %   Any other error is printed and EOF is left unbound.

  handle_error(Error, EOF),
      subsumes_term(error(existence_error(stream, _), _), Error) =>
      EOF = true.
  handle_error(Error, _EOF) =>
      print_message(error, Error).
 cleanup_client(+Stream) is det
Thread exit handler to remove the registration and queue.
  % cleanup_client(+Stream)
  %
  % Exit handler of the reader thread.  Drop the requests that are
  % still registered for Stream: with the reader gone, no reply for
  % them can be dispatched anymore.  Then remove every result queue
  % registration for Stream and clean up through do_cleanup/2.
  cleanup_client(Stream) :-
      retractall(pending(_, Stream, _)),
      forall(retract(json_result_queue(Stream, Queue)),
             do_cleanup(Stream, Queue)).
  419
  420do_cleanup(Stream, Queue) :-
  421    close(Stream, [force(true)]),
  422    message_queue_destroy(Queue)