1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker and Sean Charles 4 E-mail: jan@swi-prolog.org and <sean at objitsu dot com> 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2013-2022, Sean Charles 7 SWI-Prolog Solutions b.v. 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34 35 NOTE 36 37 The original code was subject to the MIT licence and written by 38 Sean Charles. Re-licenced to standard SWI-Prolog BSD-2 with 39 permission from Sean Charles. 40*/ 41 42:- module(redis, 43 [ redis_server/3, % +Alias, +Address, +Options 44 redis_connect/1, % -Connection 45 redis_connect/3, % -Connection, +Host, +Port 46 redis_disconnect/1, % +Connection 47 redis_disconnect/2, % +Connection, +Options 48 % Queries 49 redis/1, % +Request 50 redis/2, % +Connection, +Request 51 redis/3, % +Connection, +Request, -Reply 52 % High level queries 53 redis_get_list/3, % +Redis, +Key, -List 54 redis_get_list/4, % +Redis, +Key, +ChunkSize, -List 55 redis_set_list/3, % +Redis, +Key, +List 56 redis_get_hash/3, % +Redis, +Key, -Data:dict 57 redis_set_hash/3, % +Redis, +Key, +Data:dict 58 redis_scan/3, % +Redis, -LazyList, +Options 59 redis_sscan/4, % +Redis, +Set, -LazyList, +Options 60 redis_hscan/4, % +Redis, +Hash, -LazyList, +Options 61 redis_zscan/4, % +Redis, +Set, -LazyList, +Options 62 % Publish/Subscribe 63 redis_subscribe/4, % +Redis, +Channels, -Id, +Options 64 redis_subscribe/2, % +Id, +Channels 65 redis_unsubscribe/2, % +Id, +Channels 66 redis_current_subscription/2, % ?Id,?Channels 67 redis_write/2, % +Redis, +Command 68 redis_read/2, % +Redis, -Reply 69 % Building blocks 70 redis_array_dict/3, % ?Array, ?Tag, ?Dict 71 % Admin stuff 72 redis_property/2, % +Reply, ?Property 73 redis_current_command/2, % +Redis,?Command 74 redis_current_command/3 % +Redis, +Command, -Properties 75 ]). 76:- autoload(library(socket), [tcp_connect/3]). 77:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]). 78:- autoload(library(broadcast), [broadcast/1]). 79:- autoload(library(error), 80 [ must_be/2, 81 type_error/2, 82 instantiation_error/1, 83 uninstantiation_error/1, 84 existence_error/2, 85 existence_error/3 86 ]). 87:- autoload(library(lazy_lists), [lazy_list/2]). 88:- autoload(library(lists), [append/3, member/2]). 89:- autoload(library(option), [merge_options/3, option/2, 90 option/3, select_option/4]). 91:- autoload(library(pairs), [group_pairs_by_key/2]). 92:- autoload(library(time), [call_with_time_limit/2]). 93:- use_module(library(debug), [debug/3, assertion/1]). 94:- use_module(library(settings), [setting/4, setting/2]). 95:- if(exists_source(library(ssl))). 96:- autoload(library(ssl), [ssl_context/3, ssl_negotiate/5]). 97:- endif. 98 99:- use_foreign_library(foreign(redis4pl)). 100 101:- setting(max_retry_count, nonneg, 8640, % one day 102 "Max number of retries"). 103:- setting(max_retry_wait, number, 10, 104 "Max time to wait between recovery attempts"). 105:- setting(sentinel_timeout, number, 0.2, 106 "Time to wait for a sentinel"). 107 108:- predicate_options(redis_server/3, 3, 109 [ pass_to(redis:redis_connect/3, 3) 110 ]). 111:- predicate_options(redis_connect/3, 3, 112 [ reconnect(boolean), 113 user(atom), 114 password(atomic), 115 version(between(2,3)) 116 ]). 117:- predicate_options(redis_disconnect/2, 2, 118 [ force(boolean) 119 ]). 120:- predicate_options(redis_scan/3, 3, 121 [ match(atomic), 122 count(nonneg), 123 type(atom) 124 ]). 125% Actually not passing, but the same 126:- predicate_options(redis_sscan/4, 4, [pass_to(redis:redis_scan/3, 3)]). 127:- predicate_options(redis_hscan/4, 4, [pass_to(redis:redis_scan/3, 3)]). 128:- predicate_options(redis_zscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).
149:- dynamic server/3. 150 151:- dynamic ( connection/2, % ServerName, Stream 152 sentinel/2 % Pool, Address 153 ) as volatile.
default
points at localhost:6379
with no connect options. The default
server is used for redis/1 and redis/2 and may be changed using this
predicate. Options are described with redis_connect/3.
Connections established this way are by default automatically
reconnected if the connection is lost for some reason unless a
reconnect(false)
option is specified.
167redis_server(Alias, Address, Options) :- 168 must_be(ground, Alias), 169 retractall(server(Alias, _, _)), 170 asserta(server(Alias, Address, Options)). 171 172server(default, localhost:6379, []).
redis_connect(+Address,
-Connection, +Options)
. redis_connect/1 is equivalent to
redis_connect(localhost:6379, Connection, [])
. Options:
true
, try to reconnect to the service when the connection
seems lost. Default is true
for connections specified using
redis_server/3 and false
for explictly opened connections.version(3)
and password(Password)
are specified, these
are used to authenticate using the HELLO command.3
, the HELLO command is used to upgrade the protocol.cacert
, key
and cert
options.sentinel(MasterName)
to enable contacting a network of Redis servers guarded by a
sentinel network.Instead of using these predicates, redis/2 and redis/3 are normally used with a server name argument registered using redis_server/3. These predicates are meant for creating a temporary paralel connection or using a connection with a blocking call.
228redis_connect(Conn) :- 229 redis_connect(default, Conn, []). 230 231redis_connect(Conn, Host, Port) :- 232 var(Conn), 233 ground(Host), ground(Port), 234 !, % GNU-Prolog compatibility 235 redis_connect(Host:Port, Conn, []). 236redis_connect(Server, Conn, Options) :- 237 atom(Server), 238 !, 239 ( server(Server, Address, DefaultOptions) 240 -> merge_options(Options, DefaultOptions, Options2), 241 do_connect(Server, Address, Conn, [address(Address)|Options2]) 242 ; existence_error(redis_server, Server) 243 ). 244redis_connect(Address, Conn, Options) :- 245 do_connect(Address, Address, Conn, [address(Address)|Options]).
redis_connection(Id, Stream, Failures, Options)
253do_connect(Id, sentinel(Pool), Conn, Options) => 254 sentinel_master(Id, Pool, Conn, Options). 255do_connect(Id, Address0, Conn, Options) => 256 tcp_address(Address0, Address), 257 tcp_connect(Address, Stream0, Options), 258 tls_upgrade(Address, Stream0, Stream, Options), 259 Conn = redis_connection(Id, Stream, 0, Options), 260 hello(Conn, Options). 261 262tcp_address(unix(Path), Path) :- 263 !. % Using an atom is ambiguous 264tcp_address(Address, Address).
tls(true)
is specified.270:- if(current_predicate(ssl_context/3)). 271tls_upgrade(Host:_Port, Raw, Stream, Options) :- 272 option(tls(true), Options), 273 !, 274 must_have_option(cacert(CacertFile), Options), 275 must_have_option(key(KeyFile), Options), 276 must_have_option(cert(CertFile), Options), 277 ssl_context(client, SSL, 278 [ host(Host), 279 certificate_file(CertFile), 280 key_file(KeyFile), 281 cacerts([file(CacertFile)]), 282 cert_verify_hook(tls_verify), 283 close_parent(true) 284 ]), 285 stream_pair(Raw, RawRead, RawWrite), 286 ssl_negotiate(SSL, RawRead, RawWrite, Read, Write), 287 stream_pair(Stream, Read, Write). 288:- endif. 289tls_upgrade(_, Stream, Stream, _). 290 291:- if(current_predicate(ssl_context/3)).
redis-cli
), we accept the
certificate as long as it is signed, not verifying the hostname.299:- public tls_verify/5. 300tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, verified) :- 301 !. 302tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, hostname_mismatch) :- 303 !. 304tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, _Error) :- 305 fail. 306 307:- endif.
313sentinel_master(Id, Pool, Master, Options) :- 314 must_have_option(sentinels(Sentinels), Options), 315 sentinel_auth(Options, Options1), 316 setting(sentinel_timeout, TMO), 317 ( sentinel(Pool, Sentinel) 318 ; member(Sentinel, Sentinels) 319 ), 320 catch(call_with_time_limit( 321 TMO, 322 do_connect(Id, Sentinel, Conn, 323 [sentinel(true)|Options1])), 324 Error, 325 (print_message(warning, Error),fail)), 326 !, 327 debug(redis(sentinel), 'Connected to sentinel at ~p', [Sentinel]), 328 call_cleanup( 329 query_sentinel(Pool, Conn, Sentinel, MasterAddr), 330 redis_disconnect(Conn)), 331 debug(redis(sentinel), 'Sentinel claims master is at ~p', [MasterAddr]), 332 do_connect(Id, MasterAddr, Master, Options), 333 debug(redis(sentinel), 'Connected to claimed master', []), 334 redis(Master, role, Role), 335 ( Role = [master|_Slaves] 336 -> debug(redis(sentinel), 'Verified role at ~p', [MasterAddr]) 337 ; redis_disconnect(Master), 338 debug(redis(sentinel), '~p is not the master: ~p', [MasterAddr, Role]), 339 sleep(TMO), 340 sentinel_master(Id, Pool, Master, Options) 341 ). 342 343sentinel_auth(Options0, Options) :- 344 option(sentinel_user(User), Options0), 345 option(sentinel_password(Passwd), Options0), 346 !, 347 merge_options([user(User), password(Passwd)], Options0, Options). 348sentinel_auth(Options0, Options) :- 349 select_option(password(_), Options0, Options, _). 350 351 352query_sentinel(Pool, Conn, Sentinel, Host:Port) :- 353 redis(Conn, sentinel('get-master-addr-by-name', Pool), MasterData), 354 MasterData = [Host,Port], 355 redis(Conn, sentinel(sentinels, Pool), Peers), 356 transaction(update_known_sentinels(Pool, Sentinel, Peers)). 357 358update_known_sentinels(Pool, Sentinel, Peers) :- 359 retractall(sentinel(Pool, _)), 360 maplist(update_peer_sentinel(Pool), Peers), 361 asserta(sentinel(Pool, Sentinel)). 362 363update_peer_sentinel(Pool, Attrs), 364 memberchk(ip-Host, Attrs), 365 memberchk(port-Port, Attrs) => 366 asserta(sentinel(Pool, Host:Port)). 367 368must_have_option(Opt, Options) :- 369 option(Opt, Options), 370 !. 371must_have_option(Opt, Options) :- 372 existence_error(option, Opt, Options).
379hello(Con, Options) :- 380 option(version(V), Options), 381 V >= 3, 382 !, 383 ( option(user(User), Options), 384 option(password(Password), Options) 385 -> redis(Con, hello(3, auth, User, Password)) 386 ; redis(Con, hello(3)) 387 ). 388hello(Con, Options) :- 389 option(password(Password), Options), 390 !, 391 redis(Con, auth(Password)). 392hello(_, _).
redis_connection(Id,Stream,Failures,Options)
. If the stream is
disconnected it will be reconnected.401redis_stream(Var, S, _) :- 402 ( var(Var) 403 -> !, instantiation_error(Var) 404 ; nonvar(S) 405 -> !, uninstantiation_error(S) 406 ). 407redis_stream(ServerName, S, Connect) :- 408 atom(ServerName), 409 !, 410 ( connection(ServerName, S0) 411 -> S = S0 412 ; Connect == true, 413 server(ServerName, Address, Options) 414 -> redis_connect(Address, Connection, Options), 415 redis_stream(Connection, S, false), 416 asserta(connection(ServerName, S)) 417 ; existence_error(redis_server, ServerName) 418 ). 419redis_stream(redis_connection(_,S0,_,_), S, _) :- 420 S0 \== (-), 421 !, 422 S = S0. 423redis_stream(Redis, S, _) :- 424 Redis = redis_connection(Id,-,_,Options), 425 option(address(Address), Options), 426 do_connect(Id,Address,Redis2,Options), 427 arg(2, Redis2, S0), 428 nb_setarg(2, Redis, S0), 429 S = S0. 430 431has_redis_stream(Var, _) :- 432 var(Var), 433 !, 434 instantiation_error(Var). 435has_redis_stream(Alias, S) :- 436 atom(Alias), 437 !, 438 connection(Alias, S). 439has_redis_stream(redis_connection(_,S,_,_), S) :- 440 S \== (-).
true
(default false
), do not raise any errors if
Connection does not exist or closing the connection raises
a network or I/O related exception. This version is used
internally if a connection is in a broken state, either due
to a protocol error or a network issue.456redis_disconnect(Redis) :- 457 redis_disconnect(Redis, []). 458 459redis_disconnect(Redis, Options) :- 460 option(force(true), Options), 461 !, 462 ( Redis = redis_connection(_Id, S, _, _Opts) 463 -> ( S == (-) 464 -> true 465 ; close(S, [force(true)]), 466 nb_setarg(2, Redis, -) 467 ) 468 ; has_redis_stream(Redis, S) 469 -> close(S, [force(true)]), 470 retractall(connection(_,S)) 471 ; true 472 ). 473redis_disconnect(Redis, _Options) :- 474 redis_stream(Redis, S, false), 475 close(S), 476 retractall(connection(_,S)).
redis(Connection, Command, _)
and second, it
can be used to exploit Redis pipelines and transactions. The
second form is acticated if Request is a list. In that case, each
element of the list is either a term Command -> Reply
or a simple
Command. Semantically this represents a sequence of redis/3 and
redis/2 calls. It differs in the following aspects:
multi
and the last exec
, the
commands are executed as a Redis transaction, i.e., they
are executed atomically.Procedurally, the process takes the following steps:
Command -> Reply
terms.Examples
?- redis(default, [ lpush(li,1), lpush(li,2), lrange(li,0,-1) -> List ]). List = ["2", "1"].
520redis(Redis, PipeLine) :- 521 is_list(PipeLine), 522 !, 523 redis_pipeline(Redis, PipeLine). 524redis(Redis, Req) :- 525 redis(Redis, Req, _).
"A:B:..."
. This is a common shorthand for
representing Redis keys.
Reply is either a plain term (often a variable) or a term Value as
Type
. In the latter form, Type dictates how the Redis bulk
reply is translated to Prolog. The default equals to auto
, i.e.,
as a number of the content satisfies the Prolog number syntax and
as an atom otherwise.
status(Atom)
Returned if the server replies with + Status
. Atom
is the textual value of Status converted to lower case,
e.g., status(ok)
or status(pong)
.nil
This atom is returned for a NIL/NULL value. Note that if
the reply is only nil
, redis/3 fails. The nil
value
may be embedded inside lists or maps.nil
. If Reply
as a whole would be nil
the call fails.
Redis bulk replies are translated depending on the as
Type as
explained above.
bytes
(iso_latin_1
), utf8
and text
(the
current locale translation).type_error(Type, String)
is raised.min_tagged_integer
and max_tagged_integer
, allowing
the value to be used as a dict key.auto(atom, number)
auto(atom,tagged_integer)
. This allows the value
to be used as a key for a SWI-Prolog dict.pairs
type
can also be applied to a Redis array. In this case the array
length must be even. This notably allows fetching a Redis
hash as pairs using HGETALL
using version 2 of the
Redis protocol.pairs(AsKey, AsValue)
, but convert the resulting
pair list into a SWI-Prolog dict. AsKey must convert to a
valid dict key, i.e., an atom or tagged integer. See dict_key
.dict(dict_key, AsValue)
.Here are some simple examples
?- redis(default, set(a, 42), X). X = status("OK"). ?- redis(default, get(a), X). X = "42". ?- redis(default, get(a), X as integer). X = 42. ?- redis(default, get(a), X as float). X = 42.0. ?- redis(default, set(swipl:version, 8)). true. ?- redis(default, incr(swipl:version), X). X = 9.
645redis(Redis, Req, Out) :- 646 out_val(Out, Val), 647 redis1(Redis, Req, Out), 648 Val \== nil. 649 650out_val(Out, Val) :- 651 ( nonvar(Out), 652 Out = (Val as _) 653 -> true 654 ; Val = Out 655 ). 656 657redis1(Redis, Req, Out) :- 658 Error = error(Formal, _), 659 catch(redis2(Redis, Req, Out), Error, true), 660 ( var(Formal) 661 -> true 662 ; recover(Error, Redis, redis1(Redis, Req, Out)) 663 ). 664 665redis2(Redis, Req, Out) :- 666 atom(Redis), 667 !, 668 redis_stream(Redis, S, true), 669 with_mutex(Redis, 670 ( redis_write_msg(S, Req), 671 redis_read_stream(Redis, S, Out) 672 )). 673redis2(Redis, Req, Out) :- 674 redis_stream(Redis, S, true), 675 redis_write_msg(S, Req), 676 redis_read_stream(Redis, S, Out).
680redis_pipeline(Redis, PipeLine) :- 681 Error = error(Formal, _), 682 catch(redis_pipeline2(Redis, PipeLine), Error, true), 683 ( var(Formal) 684 -> true 685 ; recover(Error, Redis, redis_pipeline(Redis, PipeLine)) 686 ). 687 688redis_pipeline2(Redis, PipeLine) :- 689 atom(Redis), 690 !, 691 redis_stream(Redis, S, true), 692 with_mutex(Redis, 693 redis_pipeline3(Redis, S, PipeLine)). 694redis_pipeline2(Redis, PipeLine) :- 695 redis_stream(Redis, S, true), 696 redis_pipeline3(Redis, S, PipeLine). 697 698redis_pipeline3(Redis, S, PipeLine) :- 699 maplist(write_pipeline(S), PipeLine), 700 flush_output(S), 701 read_pipeline(Redis, S, PipeLine). 702 703write_pipeline(S, Command -> _Reply) :- 704 !, 705 redis_write_msg_no_flush(S, Command). 706write_pipeline(S, Command) :- 707 redis_write_msg_no_flush(S, Command). 708 709read_pipeline(Redis, S, PipeLine) :- 710 E = error(Formal,_), 711 catch(read_pipeline2(Redis, S, PipeLine), E, true), 712 ( var(Formal) 713 -> true 714 ; reconnect_error(E) 715 -> redis_disconnect(Redis, [force(true)]), 716 throw(E) 717 ; resync(Redis), 718 throw(E) 719 ). 720 721read_pipeline2(Redis, S, PipeLine) :- 722 maplist(redis_read_msg3(S), PipeLine, Replies, Errors, Pushed), 723 maplist(handle_push(Redis), Pushed), 724 maplist(handle_error, Errors), 725 maplist(bind_reply, PipeLine, Replies). 726 727redis_read_msg3(S, _Command -> ReplyIn, Reply, Error, Push) :- 728 !, 729 redis_read_msg(S, ReplyIn, Reply, Error, Push). 730redis_read_msg3(S, Var, Reply, Error, Push) :- 731 redis_read_msg(S, Var, Reply, Error, Push). 732 733handle_push(Redis, Pushed) :- 734 handle_push_messages(Pushed, Redis). 735handle_error(Error) :- 736 ( var(Error) 737 -> true 738 ; throw(Error) 739 ). 740bind_reply(_Command -> Reply0, Reply) :- 741 !, 742 Reply0 = Reply. 743bind_reply(_Command, _).
752:- meta_predicate recover( , , ). 753 754recover(Error, Redis, Goal) :- 755 reconnect_error(Error), 756 auto_reconnect(Redis), 757 !, 758 debug(redis(recover), '~p: got error ~p; trying to reconnect', 759 [Redis, Error]), 760 redis_disconnect(Redis, [force(true)]), 761 ( wait_to_retry(Redis, Error) 762 -> call(Goal), 763 retractall(failure(Redis, _)) 764 ; throw(Error) 765 ). 766recover(Error, _, _) :- 767 throw(Error). 768 769auto_reconnect(redis_connection(_,_,_,Options)) :- 770 !, 771 option(reconnect(true), Options). 772auto_reconnect(Server) :- 773 ground(Server), 774 server(Server, _, Options), 775 option(reconnect(true), Options, true). 776 777reconnect_error(error(io_error(_Action, _On),_)). 778reconnect_error(error(socket_error(_Code, _),_)). 779reconnect_error(error(syntax_error(unexpected_eof),_)).
max_retry_wait
. If the
setting max_retry_count
is exceeded we fail and the called signals
an exception.788:- dynamic failure/2 as volatile. 789 790wait_to_retry(Redis, Error) :- 791 redis_failures(Redis, Failures), 792 setting(max_retry_count, Count), 793 Failures < Count, 794 Failures2 is Failures+1, 795 redis_set_failures(Redis, Failures2), 796 setting(max_retry_wait, MaxWait), 797 Wait is min(MaxWait*100, 1<<Failures)/100.0, 798 debug(redis(recover), ' Sleeping ~p seconds', [Wait]), 799 retry_message_level(Failures, Level), 800 print_message(Level, redis(retry(Redis, Failures, Wait, Error))), 801 sleep(Wait). 802 803redis_failures(redis_connection(_,_,Failures0,_), Failures) :- 804 !, 805 Failures = Failures0. 806redis_failures(Server, Failures) :- 807 atom(Server), 808 ( failure(Server, Failures) 809 -> true 810 ; Failures = 0 811 ). 812 813redis_set_failures(Connection, Count) :- 814 compound(Connection), 815 !, 816 nb_setarg(3, Connection, Count). 817redis_set_failures(Server, Count) :- 818 atom(Server), 819 retractall(failure(Server, _)), 820 asserta(failure(Server, Count)). 821 822retry_message_level(0, warning) :- !. 823retry_message_level(_, silent).
832redis(Req) :-
833 setup_call_cleanup(
834 redis_connect(default, C, []),
835 redis1(C, Req, Out),
836 redis_disconnect(C)),
837 print(Out).
845redis_write(Redis, Command) :- 846 redis_stream(Redis, S, true), 847 redis_write_msg(S, Command). 848 849redis_read(Redis, Reply) :- 850 redis_stream(Redis, S, true), 851 redis_read_stream(Redis, S, Reply). 852 853 854 /******************************* 855 * HIGH LEVEL ACCESS * 856 *******************************/
LRANGE
requests. Note
that this results in O(N^2) complexity. Using a lazy list is most
useful for relatively short lists holding possibly large items.
Note that values retrieved are strings, unless the value was added
using Term as prolog
.
873redis_get_list(Redis, Key, List) :- 874 redis_get_list(Redis, Key, -1, List). 875 876redis_get_list(Redis, Key, Chunk, List) :- 877 redis(Redis, llen(Key), Len), 878 ( ( Chunk >= Len 879 ; Chunk == -1 880 ) 881 -> ( Len == 0 882 -> List = [] 883 ; End is Len-1, 884 list_range(Redis, Key, 0, End, List) 885 ) 886 ; lazy_list(rlist_next(s(Redis,Key,0,Chunk,Len)), List) 887 ). 888 889rlist_next(State, List, Tail) :- 890 State = s(Redis,Key,Offset,Slice,Len), 891 End is min(Len-1, Offset+Slice-1), 892 list_range(Redis, Key, Offset, End, Elems), 893 ( End =:= Len-1 894 -> List = Elems, 895 Tail = [] 896 ; Offset2 is Offset+Slice, 897 nb_setarg(3, State, Offset2), 898 append(Elems, Tail, List) 899 ). 900 901% Redis LRANGE demands End > Start and returns inclusive. 902 903list_range(DB, Key, Start, Start, [Elem]) :- 904 !, 905 redis(DB, lindex(Key, Start), Elem). 906list_range(DB, Key, Start, End, List) :- 907 !, 908 redis(DB, lrange(Key, Start, End), List).
[]
, Key is deleted. Note that key values
are always strings in Redis. The same conversion rules as for
redis/1-3 apply.
919redis_set_list(Redis, Key, List) :-
920 redis(Redis, del(Key), _),
921 ( List == []
922 -> true
923 ; Term =.. [rpush,Key|List],
924 redis(Redis, Term, _Count)
925 ).
HGETALL
command. If the Redis hash is not used by
other (non-Prolog) applications one may also consider using the
Term as prolog
syntax to store the Prolog dict as-is.938redis_get_hash(Redis, Key, Dict) :- 939 redis(Redis, hgetall(Key), Dict as dict(auto)). 940 941redis_set_hash(Redis, Key, Dict) :- 942 redis_array_dict(Array, _, Dict), 943 Term =.. [hset,Key|Array], 944 redis(Redis, del(Key), _), 945 redis(Redis, Term, _Count).
956redis_array_dict(Array, Tag, Dict) :- 957 nonvar(Array), 958 !, 959 array_to_pairs(Array, Pairs), 960 dict_pairs(Dict, Tag, Pairs). 961redis_array_dict(TwoList, Tag, Dict) :- 962 dict_pairs(Dict, Tag, Pairs), 963 pairs_to_array(Pairs, TwoList). 964 965array_to_pairs([], []) :- 966 !. 967array_to_pairs([NameS-Value|T0], [Name-Value|T]) :- 968 !, % RESP3 returns a map as pairs. 969 atom_string(Name, NameS), 970 array_to_pairs(T0, T). 971array_to_pairs([NameS,Value|T0], [Name-Value|T]) :- 972 atom_string(Name, NameS), 973 array_to_pairs(T0, T). 974 975pairs_to_array([], []) :- 976 !. 977pairs_to_array([Name-Value|T0], [NameS,Value|T]) :- 978 atom_string(Name, NameS), 979 pairs_to_array(T0, T).
SCAN
, SSCAN
, HSCAN
and ZSCAN` commands
into a lazy list. For redis_scan/3 and redis_sscan/4 the result is
a list of strings. For redis_hscan/4 and redis_zscan/4, the result
is a list of pairs. Options processed:
MATCH
subcommand, only returning matches for
Pattern.COUNT
subcommand, giving a hint to the size of the
chunks fetched.TYPE
subcommand, only returning answers of the
indicated type.1003redis_scan(Redis, LazyList, Options) :- 1004 scan_options([match,count,type], Options, Parms), 1005 lazy_list(scan_next(s(scan,Redis,0,Parms)), LazyList). 1006 1007redis_sscan(Redis, Set, LazyList, Options) :- 1008 scan_options([match,count,type], Options, Parms), 1009 lazy_list(scan_next(s(sscan(Set),Redis,0,Parms)), LazyList). 1010 1011redis_hscan(Redis, Hash, LazyList, Options) :- 1012 scan_options([match,count,type], Options, Parms), 1013 lazy_list(scan_next(s(hscan(Hash),Redis,0,Parms)), LazyList). 1014 1015redis_zscan(Redis, Set, LazyList, Options) :- 1016 scan_options([match,count,type], Options, Parms), 1017 lazy_list(scan_next(s(zscan(Set),Redis,0,Parms)), LazyList). 1018 1019scan_options([], _, []). 1020scan_options([H|T0], Options, [H,V|T]) :- 1021 Term =.. [H,V], 1022 option(Term, Options), 1023 !, 1024 scan_options(T0, Options, T). 1025scan_options([_|T0], Options, T) :- 1026 scan_options(T0, Options, T). 1027 1028 1029scan_next(State, List, Tail) :- 1030 State = s(Command,Redis,Cursor,Params), 1031 Command =.. CList, 1032 append(CList, [Cursor|Params], CList2), 1033 Term =.. CList2, 1034 redis(Redis, Term, [NewCursor,Elems0]), 1035 scan_pairs(Command, Elems0, Elems), 1036 ( NewCursor == 0 1037 -> List = Elems, 1038 Tail = [] 1039 ; nb_setarg(3, State, NewCursor), 1040 append(Elems, Tail, List) 1041 ). 1042 1043scan_pairs(hscan(_), List, Pairs) :- 1044 !, 1045 scan_pairs(List, Pairs). 1046scan_pairs(zscan(_), List, Pairs) :- 1047 !, 1048 scan_pairs(List, Pairs). 1049scan_pairs(_, List, List). 1050 1051scan_pairs([], []). 1052scan_pairs([Key,Value|T0], [Key-Value|T]) :- 1053 !, 1054 scan_pairs(T0, T). 1055scan_pairs([Key-Value|T0], [Key-Value|T]) :- 1056 scan_pairs(T0, T). 1057 1058 1059 /******************************* 1060 * ABOUT * 1061 *******************************/
1070redis_current_command(Redis, Command) :- 1071 redis_current_command(Redis, Command, _). 1072 1073redis_current_command(Redis, Command, Properties) :- 1074 nonvar(Command), 1075 !, 1076 redis(Redis, command(info, Command), [[_|Properties]]). 1077redis_current_command(Redis, Command, Properties) :- 1078 redis(Redis, command, Commands), 1079 member([Name|Properties], Commands), 1080 atom_string(Command, Name).
redis(info, String)
and parses the result. As this is for machine
usage, properties names *_human are skipped.1088redis_property(Redis, Property) :- 1089 redis(Redis, info, String), 1090 info_terms(String, Terms), 1091 member(Property, Terms). 1092 1093info_terms(Info, Pairs) :- 1094 split_string(Info, "\n", "\r\n ", Lines), 1095 convlist(info_line_term, Lines, Pairs). 1096 1097info_line_term(Line, Term) :- 1098 sub_string(Line, B, _, A, :), 1099 !, 1100 sub_atom(Line, 0, B, _, Name), 1101 \+ sub_atom(Name, _, _, 0, '_human'), 1102 sub_string(Line, _, A, 0, ValueS), 1103 ( number_string(Value, ValueS) 1104 -> true 1105 ; Value = ValueS 1106 ), 1107 Term =.. [Name,Value]. 1108 1109 1110 /******************************* 1111 * SUBSCRIBE * 1112 *******************************/
redis(Id, Channel, Data)
If redis_unsubscribe/2 removes the last subscription, the thread terminates.
To simply print the incomming messages use e.g.
?- listen(redis(_, Channel, Data), format('Channel ~p got ~p~n', [Channel,Data])). true. ?- redis_subscribe(default, test, Id, []). Id = redis_pubsub_3, ?- redis(publish(test, "Hello world")). Channel test got "Hello world" 1 true.
1142:- dynamic ( subscription/2, % Id, Channel 1143 listening/3 % Id, Connection, Thread 1144 ) as volatile. 1145 1146redis_subscribe(Redis, Spec, Id, Options) :- 1147 atom(Redis), 1148 !, 1149 channels(Spec, Channels), 1150 pubsub_thread_options(ThreadOptions, Options), 1151 thread_create(setup_call_cleanup( 1152 redis_connect(Redis, Conn, [reconnect(true)]), 1153 redis_subscribe1(Redis, Conn, Channels), 1154 redis_disconnect(Conn)), 1155 Thread, 1156 ThreadOptions), 1157 pubsub_id(Thread, Id). 1158redis_subscribe(Redis, Spec, Id, Options) :- 1159 channels(Spec, Channels), 1160 pubsub_thread_options(ThreadOptions, Options), 1161 thread_create(redis_subscribe1(Redis, Redis, Channels), 1162 Thread, 1163 ThreadOptions), 1164 pubsub_id(Thread, Id). 1165 1166pubsub_thread_options(ThreadOptions, Options) :- 1167 merge_options(Options, [detached(true)], ThreadOptions). 1168 1169pubsub_id(Thread, Thread). 1170%pubsub_id(Thread, Id) :- 1171% thread_property(Thread, id(TID)), 1172% atom_concat('redis_pubsub_', TID, Id). 1173 1174redis_subscribe1(Redis, Conn, Channels) :- 1175 Error = error(Formal, _), 1176 catch(redis_subscribe2(Redis, Conn, Channels), Error, true), 1177 ( var(Formal) 1178 -> true 1179 ; recover(Error, Conn, redis1(Conn, echo("reconnect"), _)), 1180 thread_self(Me), 1181 pubsub_id(Me, Id), 1182 findall(Channel, subscription(Id, Channel), CurrentChannels), 1183 redis_subscribe1(Redis, Conn, CurrentChannels) 1184 ). 1185 1186redis_subscribe2(Redis, Conn, Channels) :- 1187 redis_subscribe3(Conn, Channels), 1188 redis_listen(Redis, Conn). 1189 1190redis_subscribe3(Conn, Channels) :- 1191 thread_self(Me), 1192 pubsub_id(Me, Id), 1193 prolog_listen(this_thread_exit, pubsub_clean(Id)), 1194 maplist(register_subscription(Id), Channels), 1195 redis_stream(Conn, S, true), 1196 Req =.. [subscribe|Channels], 1197 redis_write_msg(S, Req). 1198 1199pubsub_clean(Id) :- 1200 retractall(listening(Id, _Connection, _Thread)), 1201 retractall(subscription(Id, _Channel)).
1213redis_subscribe(Id, Spec) :- 1214 channels(Spec, Channels), 1215 ( listening(Id, Connection, _Thread) 1216 -> true 1217 ; existence_error(redis_pubsub, Id) 1218 ), 1219 maplist(register_subscription(Id), Channels), 1220 redis_stream(Connection, S, true), 1221 Req =.. [subscribe|Channels], 1222 redis_write_msg(S, Req). 1223 1224redis_unsubscribe(Id, Spec) :- 1225 channels(Spec, Channels), 1226 ( listening(Id, Connection, _Thread) 1227 -> true 1228 ; existence_error(redis_pubsub, Id) 1229 ), 1230 maplist(unregister_subscription(Id), Channels), 1231 redis_stream(Connection, S, true), 1232 Req =.. [unsubscribe|Channels], 1233 redis_write_msg(S, Req).
1239redis_current_subscription(Id, Channels) :- 1240 findall(Id-Channel, subscription(Id, Channel), Pairs), 1241 keysort(Pairs, Sorted), 1242 group_pairs_by_key(Sorted, Grouped), 1243 member(Id-Channels, Grouped). 1244 1245channels(Spec, List) :- 1246 is_list(Spec), 1247 !, 1248 maplist(channel_name, Spec, List). 1249channels(Ch, [Key]) :- 1250 channel_name(Ch, Key). 1251 1252channel_name(Atom, Atom) :- 1253 atom(Atom), 1254 !. 1255channel_name(Key, Atom) :- 1256 phrase(key_parts(Key), Parts), 1257 !, 1258 atomic_list_concat(Parts, :, Atom). 1259channel_name(Key, _) :- 1260 type_error(redis_key, Key). 1261 1262key_parts(Var) --> 1263 { var(Var), !, fail }. 1264key_parts(Atom) --> 1265 { atom(Atom) }, 1266 !, 1267 [Atom]. 1268key_parts(A:B) --> 1269 key_parts(A), 1270 key_parts(B). 1271 1272 1273 1274 1275register_subscription(Id, Channel) :- 1276 ( subscription(Id, Channel) 1277 -> true 1278 ; assertz(subscription(Id, Channel)) 1279 ). 1280 1281unregister_subscription(Id, Channel) :- 1282 retractall(subscription(Id, Channel)). 1283 1284redis_listen(Redis, Conn) :- 1285 thread_self(Me), 1286 pubsub_id(Me, Id), 1287 setup_call_cleanup( 1288 assertz(listening(Id, Conn, Me), Ref), 1289 redis_listen_loop(Redis, Id, Conn), 1290 erase(Ref)). 1291 1292redis_listen_loop(Redis, Id, Conn) :- 1293 redis_stream(Conn, S, true), 1294 ( subscription(Id, _) 1295 -> redis_read_stream(Redis, S, Reply), 1296 redis_broadcast(Redis, Reply), 1297 redis_listen_loop(Redis, Id, Conn) 1298 ; true 1299 ). 1300 1301redis_broadcast(_, [subscribe, _Channel, _N]) :- 1302 !. 1303redis_broadcast(Redis, [message, Channel, Data]) :- 1304 !, 1305 catch(broadcast(redis(Redis, Channel, Data)), 1306 Error, 1307 print_message(error, Error)). 1308redis_broadcast(Redis, Message) :- 1309 assertion((Message = [Type, Channel, _Data], 1310 atom(Type), 1311 atom(Channel))), 1312 debug(redis(warning), '~p: Unknown message while listening: ~p', 1313 [Redis,Message]). 1314 1315 1316 /******************************* 1317 * READ/WRITE * 1318 *******************************/
nil
status(String)
true
or false
). RESP3 only.If something goes wrong, the connection is closed and an exception is raised.
1335redis_read_stream(Redis, SI, Out) :- 1336 E = error(Formal,_), 1337 catch(redis_read_msg(SI, Out, Out0, Error, Push), E, true), 1338 ( var(Formal) 1339 -> handle_push_messages(Push, Redis), 1340 ( var(Error) 1341 -> Out = Out0 1342 ; resync(Redis), 1343 throw(Error) 1344 ) 1345 ; redis_disconnect(Redis, [force(true)]), 1346 throw(E) 1347 ). 1348 1349handle_push_messages([], _). 1350handle_push_messages([H|T], Redis) :- 1351 ( catch(handle_push_message(H, Redis), E, 1352 print_message(warning, E)) 1353 -> true 1354 ; true 1355 ), 1356 handle_push_messages(T, Redis). 1357 1358handle_push_message(["pubsub"|List], Redis) :- 1359 redis_broadcast(Redis, List). 1360% some protocol version 3 push messages (such as 1361% __keyspace@* events) seem to come directly 1362% without a pubsub header 1363handle_push_message([message|List], Redis) :- 1364 redis_broadcast(Redis, [message|List]).
1374resync(Redis) :- 1375 E = error(Formal,_), 1376 catch(do_resync(Redis), E, true), 1377 ( var(Formal) 1378 -> true 1379 ; redis_disconnect(Redis, [force(true)]) 1380 ). 1381 1382do_resync(Redis) :- 1383 A is random(1_000_000_000), 1384 redis_stream(Redis, S, true), 1385 redis_write_msg(S, echo(A)), 1386 catch(call_with_time_limit(0.2, '$redis_resync'(S, A)), 1387 time_limit_exceeded, 1388 throw(error(time_limit_exceeded,_))).
redis4pl
.
1403 /******************************* 1404 * MESSAGES * 1405 *******************************/ 1406 1407:- multifile 1408 prolog:error_message//1, 1409 prolog:message//1. 1410 1411prologerror_message(redis_error(Code, String)) --> 1412 [ 'REDIS: ~w: ~s'-[Code, String] ]. 1413 1414prologmessage(redis(retry(_Redis, _Failures, Wait, Error))) --> 1415 [ 'REDIS: connection error. Retrying in ~2f seconds'-[Wait], nl ], 1416 [ ' '-[] ], '$messages':translate_message(Error)
Redis client
This library is a client to Redis, a popular key value store to deal with caching and communication between micro services.
In the typical use case we register the details of one or more Redis servers using redis_server/3. Subsequenly, redis/2-3 is used to issue commands on the server. For example:
*/