1/* File: canny/redis_streams.pl 2 Author: Roy Ratcliffe 3 Created: Sep 24 2022 4 Purpose: Canny Redis Streams 5 6Copyright (c) 2022, Roy Ratcliffe, Northumberland, United Kingdom 7 8Permission is hereby granted, free of charge, to any person obtaining a 9copy of this software and associated documentation files (the 10"Software"), to deal in the Software without restriction, including 11without limitation the rights to use, copy, modify, merge, publish, 12distribute, sublicense, and/or sell copies of the Software, and to 13permit persons to whom the Software is furnished to do so, subject to 14the following conditions: 15 16 The above copyright notice and this permission notice shall be 17 included in all copies or substantial portions of the Software. 18 19THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 20OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 21MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 22IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY 23CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, 24TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE 25SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 26 27*/ 28 29:- module(canny_redis_streams, 30 [ xrange/4, % +Redis,+Key,-Entries,+Options 31 xread/4, % +Redis,+Streams,-Reads,+Options 32 xread_call/5, % +Redis,+Streams,:Goal,-Fields,+Options 33 xread_call/6 % +Redis,+Streams,:Goal,?Tag,-Fields,+Options 34 ]). 35:- autoload(library(option), [option/3, option/2]). 36:- autoload(library(lists), [append/3]). 37:- autoload(library(redis), [redis/3]). 38 39:- use_module(redis). 40 41:- meta_predicate 42 xread_call(, , , , ), 43 xread_call(, , , , , ).
- and +
respectively or in reverse if rev(true) included in Options list;
the plus stream identifier stands for the maximum identifier, or
the newest, whereas the minus identifier stands for the oldest.
Option count(Count) limits the number of entries to read by
Count items.
The following always unifies Entries with [].
xrange(Server, Key, Entries, [start(+)]). xrange(Server, Key, Entries, [rev(true), start(-)]).
60xrange(Redis, Key, Entries, Options) :- 61 option(rev(Rev), Options, false), 62 rev(Rev, XRange, StartDefault, EndDefault), 63 option(start(Start), Options, StartDefault), 64 option(end(End), Options, EndDefault), 65 ( option(count(Count), Options) 66 -> Arguments = [count, Count] 67 ; Arguments = [] 68 ), 69 Command =.. [XRange, Key, Start, End|Arguments], 70 redis(Redis, Command, Entries). 71 72rev(false, xrange, -, +). 73rev(true, xrevrange, +, -).
block(Milliseconds) specifies a non-zero blocking delay.
   85xread(Redis, Streams, Reads, Options) :-
   86    redis_keys_and_stream_ids(Streams, _, Keys, StreamIds),
   87    append(Keys, StreamIds, Arguments___),
   88    Arguments__ = [streams|Arguments___],
   89    (   option(block(Block), Options)
   90    ->  Arguments_ = [block, Block|Arguments__]
   91    ;   Arguments_ = Arguments__
   92    ),
   93    (   option(count(Count), Options)
   94    ->  Arguments = [count, Count|Arguments_]
   95    ;   Arguments = Arguments_
   96    ),
   97    Command =.. [xread|Arguments],
   98    redis(Redis, Command, Reads).110xread_call(Redis, Streams, Goal, Fields, Options) :- 111 xread(Redis, Streams, Reads, Options), 112 redis_last_streams(Reads, _, Streams_), 113 xread_call_(Redis, Streams.put(Streams_), Goal, Reads, Fields, Options). 114 115xread_call_(_Redis, _Streams, Goal, Reads, Fields, Options) :- 116 redis_stream_read(Reads, Key, StreamId, Fields), 117 call(Goal, Key, StreamId, Fields), 118 select_option(key(Key), Options, _, _), 119 select_option(id(StreamId), Options, _, _), 120 !. 121xread_call_(Redis, Streams, Goal, _Reads, Fields, Options) :- 122 streams_options(Streams, Options), 123 xread_call(Redis, Streams, Goal, Fields, Options). 124 125xread_call(Redis, Streams, Goal, Tag, Fields, Options) :- 126 xread(Redis, Streams, Reads, Options), 127 redis_last_streams(Reads, _, Streams_), 128 xread_call_(Redis, Streams.put(Streams_), Goal, Reads, Tag, Fields, Options). 129 130xread_call_(_Redis, _Streams, Goal, Reads, Tag, Fields, Options) :- 131 redis_stream_read(Reads, Key, StreamId, Tag, Fields), 132 call(Goal, Key, StreamId, Tag, Fields), 133 select_option(key(Key), Options, _, _), 134 select_option(id(StreamId), Options, _, _), 135 !. 136xread_call_(Redis, Streams, Goal, _Reads, Tag, Fields, Options) :- 137 streams_options(Streams, Options), 138 xread_call(Redis, Streams, Goal, Tag, Fields, Options). 139 140streams_options(Streams, Options) :- 141 ( option(threshold(Threshold), Options) 142 -> !, 143 dict_pairs(Streams, _, Pairs), 144 maplist(stream_redis_time, Pairs, RedisTimes), 145 max_list(RedisTimes, RedisTime), 146 RedisTime < Threshold 147 ; true 148 ). 149 150stream_redis_time(_Key-StreamId, RedisTime) :- 151 redis_stream_id(StreamId, RedisTime, _Seq)