1/*  File:    canny/redis_streams.pl
    2    Author:  Roy Ratcliffe
    3    Created: Sep 24 2022
    4    Purpose: Canny Redis Streams
    5
    6Copyright (c) 2022, Roy Ratcliffe, Northumberland, United Kingdom
    7
    8Permission is hereby granted, free of charge,  to any person obtaining a
    9copy  of  this  software  and    associated   documentation  files  (the
   10"Software"), to deal in  the   Software  without  restriction, including
   11without limitation the rights to  use,   copy,  modify,  merge, publish,
   12distribute, sublicense, and/or sell  copies  of   the  Software,  and to
   13permit persons to whom the Software is   furnished  to do so, subject to
   14the following conditions:
   15
   16    The above copyright notice and this permission notice shall be
   17    included in all copies or substantial portions of the Software.
   18
   19THE SOFTWARE IS PROVIDED "AS IS", WITHOUT  WARRANTY OF ANY KIND, EXPRESS
   20OR  IMPLIED,  INCLUDING  BUT  NOT   LIMITED    TO   THE   WARRANTIES  OF
   21MERCHANTABILITY, FITNESS FOR A PARTICULAR   PURPOSE AND NONINFRINGEMENT.
   22IN NO EVENT SHALL THE AUTHORS  OR   COPYRIGHT  HOLDERS BE LIABLE FOR ANY
   23CLAIM, DAMAGES OR OTHER LIABILITY,  WHETHER   IN  AN ACTION OF CONTRACT,
   24TORT OR OTHERWISE, ARISING FROM,  OUT  OF   OR  IN  CONNECTION  WITH THE
   25SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   26
   27*/
   28
   29:- module(canny_redis_streams,
   30          [ xrange/4,                           % +Redis,+Key,-Entries,+Options
   31            xread/4,                            % +Redis,+Streams,-Reads,+Options
   32            xread_call/5,                       % +Redis,+Streams,:Goal,-Fields,+Options
   33            xread_call/6                        % +Redis,+Streams,:Goal,?Tag,-Fields,+Options
   34          ]).   35:- autoload(library(option), [option/3, option/2]).   36:- autoload(library(lists), [append/3]).   37:- autoload(library(redis), [redis/3]).   38
   39:- use_module(redis).   40
   % Goal is a closure rather than a complete goal: xread_call/5 invokes
   % it with three additional arguments (Key, StreamId, Fields) and
   % xread_call/6 with four (Key, StreamId, Tag, Fields).  Declaring the
   % exact number of extra arguments, in place of the generic `:`
   % specifier, keeps the module qualification of Goal and also tells the
   % cross-referencer which predicate arity Goal refers to.
   :- meta_predicate
       xread_call(+, +, 3, -, +),
       xread_call(+, +, 4, ?, -, +).
%!  xrange(+Redis, +Key:atom, -Entries:list, +Options:list) is det.
%
%   Applies range selection to Key stream. Options optionally specify
%   the start and end stream identifiers, defaulting to `-` and `+`
%   respectively, or in reverse if rev(true) is included in the Options
%   list; the plus stream identifier stands for the maximum identifier,
%   or the newest, whereas the minus identifier stands for the oldest.
%   Option count(Count) limits the number of entries to read to Count
%   items.
%
%   The following always unifies Entries with [].
%
%       xrange(Server, Key, Entries, [start(+)]).
%       xrange(Server, Key, Entries, [rev(true), start(-)]).
   % xrange/4 sends a range command for the Key stream to Redis and
   % unifies Entries with the reply.
   %
   % Options read here: rev(Boolean), start(Id), end(Id) and count(Count).
   % rev/4 selects the command name together with the default start and
   % end identifiers; it has clauses for `false` and `true` only, so any
   % other rev(_) value makes this predicate fail.
   xrange(Redis, Key, Entries, Options) :-
       option(rev(Reverse), Options, false),
       rev(Reverse, Name, DefaultStart, DefaultEnd),
       option(start(Start), Options, DefaultStart),
       option(end(End), Options, DefaultEnd),
       % The trailing `count Count` pair appears only on request.
       (   option(count(Count), Options)
       ->  Limit = [count, Count]
       ;   Limit = []
       ),
       append([Key, Start, End], Limit, Arguments),
       Command =.. [Name|Arguments],
       redis(Redis, Command, Entries).
   71
   % rev(?Reverse, ?Command, ?DefaultStart, ?DefaultEnd)
   %
   % Lookup table used by xrange/4.  Forward ranges use the `xrange`
   % command running from `-` to `+`; reverse ranges use `xrevrange`
   % running from `+` to `-`.
   rev(false, xrange,    (-), (+)).
   rev(true,  xrevrange, (+), (-)).
%!  xread(+Redis, +Streams:dict, -Reads:list, +Options:list) is semidet.
%
%   Unifies Reads from Streams. Fails on time-out, if option
%   block(Milliseconds) specifies a non-zero blocking delay.
%
%   @arg Reads by stream key. The reply has the form [Key, Entries] for
%   each stream, where each member of Entries has the form [StreamID,
%   Fields] and Fields is an array of keys and values.
   % xread/4 builds an `xread` command from the Streams dict and Options,
   % sends it to Redis and unifies Reads with the reply.
   %
   % The argument list is assembled in this order: an optional
   % `count Count` pair, an optional `block Block` pair, the atom
   % `streams`, every key, then every stream identifier.
   xread(Redis, Streams, Reads, Options) :-
       redis_keys_and_stream_ids(Streams, _, Keys, StreamIds),
       append(Keys, StreamIds, KeysThenIds),
       (   option(block(Block), Options)
       ->  BlockArgs = [block, Block]
       ;   BlockArgs = []
       ),
       (   option(count(Count), Options)
       ->  CountArgs = [count, Count]
       ;   CountArgs = []
       ),
       append(BlockArgs, [streams|KeysThenIds], FromBlock),
       append(CountArgs, FromBlock, Arguments),
       Command =.. [xread|Arguments],
       redis(Redis, Command, Reads).
%!  xread_call(+Redis, +Streams, :Goal, -Fields, +Options) is semidet.
%!  xread_call(+Redis, +Streams, :Goal, ?Tag, -Fields, +Options) is semidet.
%
%   Reads Streams continuously until Goal succeeds or the read times
%   out. Also supports a Redis time limit option, threshold(Threshold),
%   so that blocking, if used, does not continue indefinitely even on a
%   very busy stream set. The limit applies to any of the given
%   streams; it acts as a time threshold for continuous blocking
%   failures.
  % xread_call/5 performs one xread/4 over Streams, then hands the reply
  % to xread_call_/6, which either finds an entry accepted by Goal or
  % reads again.  Fails when xread/4 fails.
  %
  % The dict passed on is Streams overlaid with whatever
  % redis_last_streams/3 derives from the reply (presumably the last
  % identifier seen per stream -- confirm against that helper), so a
  % repeated read carries the updated identifiers.
  xread_call(Redis, Streams, Goal, Fields, Options) :-
      xread(Redis, Streams, Reads, Options),
      redis_last_streams(Reads, _, LastStreams),
      put_dict(LastStreams, Streams, NextStreams),
      xread_call_(Redis, NextStreams, Goal, Reads, Fields, Options).
  114
  115xread_call_(_Redis, _Streams, Goal, Reads, Fields, _Options) :-
  116    redis_stream_read(Reads, Key, StreamId, Fields),
  117    call(Goal, Key, StreamId, Fields),
  118    !.
  119xread_call_(Redis, Streams, Goal, _Reads, Fields, Options) :-
  120    (   option(threshold(Threshold), Options)
  121    ->  dict_pairs(Streams, _, Pairs),
  122        maplist(xread_call__, Pairs, RedisTimes),
  123        max_list(RedisTimes, RedisTime),
  124        RedisTime < Threshold
  125    ;   true
  126    ),
  127    xread_call(Redis, Streams, Goal, Fields, Options).
  128
  % xread_call__/2 maps one Key-StreamId pair of the streams dict to the
  % time component that redis_stream_id/3 extracts from StreamId; the
  % key and the sequence component are ignored.
  % NOTE(review): presumably the millisecond part of the identifier --
  % confirm against redis_stream_id/3.
  xread_call__(_-StreamId, Time) :-
      redis_stream_id(StreamId, Time, _).
  131
  % xread_call/6 is the tagged variant of xread_call/5: it performs one
  % xread/4 over Streams, then hands the reply to xread_call_/7, where
  % Goal receives Tag in addition to the key, stream identifier and
  % fields.  Fails when xread/4 fails.
  %
  % The dict passed on is Streams overlaid with whatever
  % redis_last_streams/3 derives from the reply (presumably the last
  % identifier seen per stream -- confirm against that helper).
  xread_call(Redis, Streams, Goal, Tag, Fields, Options) :-
      xread(Redis, Streams, Reads, Options),
      redis_last_streams(Reads, _, LastStreams),
      put_dict(LastStreams, Streams, NextStreams),
      xread_call_(Redis, NextStreams, Goal, Reads, Tag, Fields, Options).
  136
  % xread_call_/7 is the tagged counterpart of xread_call_/6.
  %
  % First clause: commit to the first entry in Reads for which
  % call(Goal, Key, StreamId, Tag, Fields) succeeds.
  xread_call_(_Redis, _Streams, Goal, Reads, Tag, Fields, _Options) :-
      redis_stream_read(Reads, Key, StreamId, Tag, Fields),
      call(Goal, Key, StreamId, Tag, Fields),
      !.
  % Second clause: no entry satisfied Goal, so read again through
  % xread_call/6.  With threshold(Threshold) in Options, the re-read
  % happens only while the greatest time taken from the stream
  % identifiers in Streams is still below Threshold; otherwise fail.
  xread_call_(Redis, Streams, Goal, _Reads, Tag, Fields, Options) :-
      (   option(threshold(Threshold), Options)
      ->  dict_pairs(Streams, _, Pairs),
          maplist(xread_call__, Pairs, Times),
          max_list(Times, Latest),
          Latest < Threshold
      ;   true
      ),
      % The original clause had no terminating full stop, which is a
      % syntax error at end of file.
      xread_call(Redis, Streams, Goal, Tag, Fields, Options).