1/*  File:    canny/exe.pl
    2    Author:  Roy Ratcliffe
    3    Created: May  5 2021
    4    Purpose: Canny Executables
    5
    6Copyright (c) 2021, Roy Ratcliffe, Northumberland, United Kingdom
    7
    8Permission is hereby granted, free of charge,  to any person obtaining a
    9copy  of  this  software  and    associated   documentation  files  (the
   10"Software"), to deal in  the   Software  without  restriction, including
   11without limitation the rights to  use,   copy,  modify,  merge, publish,
   12distribute, sublicense, and/or sell  copies  of   the  Software,  and to
   13permit persons to whom the Software is   furnished  to do so, subject to
   14the following conditions:
   15
   16    The above copyright notice and this permission notice shall be
   17    included in all copies or substantial portions of the Software.
   18
   19THE SOFTWARE IS PROVIDED "AS IS", WITHOUT  WARRANTY OF ANY KIND, EXPRESS
   20OR  IMPLIED,  INCLUDING  BUT  NOT   LIMITED    TO   THE   WARRANTIES  OF
   21MERCHANTABILITY, FITNESS FOR A PARTICULAR   PURPOSE AND NONINFRINGEMENT.
   22IN NO EVENT SHALL THE AUTHORS  OR   COPYRIGHT  HOLDERS BE LIABLE FOR ANY
   23CLAIM, DAMAGES OR OTHER LIABILITY,  WHETHER   IN  AN ACTION OF CONTRACT,
   24TORT OR OTHERWISE, ARISING FROM,  OUT  OF   OR  IN  CONNECTION  WITH THE
   25SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   26
   27*/
   28
   29:- module(canny_exe,
   30          [ exe/3                       % +Executable,+Arguments,+Options
   31          ]).
   32:- predicate_options(exe/3, 3,
   33                     [ status(integer),
   34                       pass_to(process_create/3, 3)
   35                     ]).   36:- autoload(library(process), [process_create/3]).   37:- autoload(library(thread), [concurrent/3]).
 exe(+Executable, +Arguments, +Options) is semidet
Implements an experimental approach to wrapping process_create/3 using concurrent/3. It operates concurrent pipe reads, pipe writes and process waits. Predicate parameters match process_create/3 but with a few minor but key improvements. New Options terms offer additional enhanced pipe streaming arguments. See partially-enumerated list below.

If Options specifies any of the above terms, exe/3 prepares goals to write, read and wait concurrently as necessary according to the required configuration. This implies that reading standard output and waiting for the process status happens at the same time. Same goes for writing to standard input. The number of concurrent threads therefore exactly matches the number of concurrent process goals. This goes for clean-up goals as well. Predicate concurrent/3 does not allow zero threads however; it throws a type_error. The implementation always assigns at least one thread which amounts to reusing the calling thread non-concurrently.

All the std terms above can also take a stream options list, so can override default encoding on the process pipes. The following example illustrates. It sends a friendly "hello" in Mandarin Chinese through the Unix tee command which relays the stream to standard output and tees it off to /dev/stderr or standard error for that process. Note that exe/3 decodes the output and error separately, one as an atom but the other as a string.

exe(path(tee),
    [ '/dev/stderr'
    ],
    [ stdin(atom(你好, [encoding(utf8)])),
      stdout(atom(A, [encoding(utf8)])),
      stderr(string(B, [encoding(utf8)])),
      status(exit(0))
    ]).

Implementation Notes

Important to close the input stream immediately after writing and during the call phase. Do not wait for the clean-up phase to close the input stream, otherwise the process will never terminate. It will hang while waiting for standard input to close, assuming the sub-process reads the input.

This leads to a key caveat when using a single concurrent thread. A single callee thread executes the primary read-write goals in sequential order. The current implementation preserves the Options ordering. Hence output should always preceed input, i.e. writing to standard input should go first before attempting to read from standard output. Otherwise the sequence will block indefinitely. For this reason, the number of concurrent threads matches the number of concurrent goals. This abviates the sequencing of the goals because all goals implicitly execute concurrently.

To be done
- Take care when using the status(Status) option unless you have stdin(null) on Windows because, for some sub-processes, the goals never complete.
  109exe(Executable, Arguments, Options) :-
  110    exe(Options, Options_, Calls, Cleanups),
  111    threads(Calls, CallThreads),
  112    threads(Cleanups, CleanupThreads),
  113    setup_call_cleanup(
  114        process_create(Executable, Arguments, Options_),
  115        concurrent(CallThreads, Calls, []),
  116        concurrent(CleanupThreads, Cleanups, [])).
  117
  118threads(Goals, NumberOfGoals) :-
  119    length(Goals, NumberOfGoals),
  120    NumberOfGoals >= 1,
  121    !.
  122threads(_, 1).
  123
  124exe([], [], [], []).
  125exe([Option0|Options0], [Option|Options], [Call|Calls], [Cleanup|Cleanups]) :-
  126    opt(Option0, Option, Call, Cleanup),
  127    !,
  128    exe(Options0, Options, Calls, Cleanups).
  129exe([Option|Options0], [Option|Options], Calls, Cleanups) :-
  130    exe(Options0, Options, Calls, Cleanups).
  131
  132opt(stdin(Compound), stdin(pipe(Stream)),
  133    (   format(Stream, '~s', [S]),
  134        close(Stream)
  135    ), true) :-
  136    compound(Compound),
  137    compound_name_arguments(Compound, Name, [S]),
  138    s(Name),
  139    !.
  140opt(stdin(Compound), stdin(pipe(Stream, Options)),
  141    (   format(Stream, '~s', [S]),
  142        close(Stream)
  143    ), true) :-
  144    compound(Compound),
  145    compound_name_arguments(Compound, Name, [S, Options]),
  146    s(Name),
  147    !.
  148opt(Compound0, Compound, Call, Cleanup) :-
  149    compound(Compound0),
  150    compound_name_arguments(Compound0, Name, [Argument0]),
  151    std(Name),
  152    !,
  153    std(Argument0, Argument, Call, Cleanup),
  154    compound_name_arguments(Compound, Name, [Argument]).
  155opt(status(Status), process(PID), process_wait(PID, Status), true).
  156
  157s(codes).
  158s(atom).
  159s(string).
  160
  161std(stdout).
  162std(stderr).
  163
  164std(codes(Codes), pipe(Stream),
  165    read_stream_to_codes(Stream, Codes), close(Stream)).
  166std(atom(Atom), pipe(Stream),
  167    (   read_stream_to_codes(Stream, Codes),
  168        atom_codes(Atom, Codes)
  169    ), close(Stream)).
  170std(string(String), pipe(Stream),
  171    (   read_stream_to_codes(Stream, Codes),
  172        string_codes(String, Codes)
  173    ), close(Stream)).
  174std(codes(Codes, Options), pipe(Stream, Options),
  175    read_stream_to_codes(Stream, Codes), close(Stream)).
  176std(atom(Atom, Options), pipe(Stream, Options),
  177    (   read_stream_to_codes(Stream, Codes),
  178        atom_codes(Atom, Codes)
  179    ), close(Stream)).
  180std(string(String, Options), pipe(Stream, Options),
  181    (   read_stream_to_codes(Stream, Codes),
  182        string_codes(String, Codes)
  183    ), close(Stream))