41
42:- module(redis,
43 [ redis_server/3, 44 redis_connect/1, 45 redis_connect/3, 46 redis_disconnect/1, 47 redis_disconnect/2, 48 49 redis/1, 50 redis/2, 51 redis/3, 52 53 redis_get_list/3, 54 redis_get_list/4, 55 redis_set_list/3, 56 redis_get_hash/3, 57 redis_set_hash/3, 58 redis_scan/3, 59 redis_sscan/4, 60 redis_hscan/4, 61 redis_zscan/4, 62 63 redis_subscribe/4, 64 redis_subscribe/2, 65 redis_unsubscribe/2, 66 redis_current_subscription/2, 67 redis_write/2, 68 redis_read/2, 69 70 redis_array_dict/3, 71 72 redis_property/2, 73 redis_current_command/2, 74 redis_current_command/3 75 ]). 76:- autoload(library(socket), [tcp_connect/3]). 77:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]). 78:- autoload(library(broadcast), [broadcast/1]). 79:- autoload(library(error),
80 [ must_be/2,
81 type_error/2,
82 instantiation_error/1,
83 uninstantiation_error/1,
84 existence_error/2,
85 existence_error/3
86 ]). 87:- autoload(library(lazy_lists), [lazy_list/2]). 88:- autoload(library(lists), [append/3, member/2]). 89:- autoload(library(option), [merge_options/3, option/2,
90 option/3, select_option/4]). 91:- autoload(library(pairs), [group_pairs_by_key/2]). 92:- autoload(library(time), [call_with_time_limit/2]). 93:- use_module(library(debug), [debug/3, assertion/1]). 94:- use_module(library(settings), [setting/4, setting/2]). 95:- if(exists_source(library(ssl))). 96:- autoload(library(ssl), [ssl_context/3, ssl_negotiate/5]). 97:- endif. 98
99:- use_foreign_library(foreign(redis4pl)). 100
101:- setting(max_retry_count, nonneg, 8640, 102 "Max number of retries"). 103:- setting(max_retry_wait, number, 10,
104 "Max time to wait between recovery attempts"). 105:- setting(sentinel_timeout, number, 0.2,
106 "Time to wait for a sentinel"). 107
108:- predicate_options(redis_server/3, 3,
109 [ pass_to(redis:redis_connect/3, 3)
110 ]). 111:- predicate_options(redis_connect/3, 3,
112 [ reconnect(boolean),
113 user(atom),
114 password(atomic),
115 version(between(2,3))
116 ]). 117:- predicate_options(redis_disconnect/2, 2,
118 [ force(boolean)
119 ]). 120:- predicate_options(redis_scan/3, 3,
121 [ match(atomic),
122 count(nonneg),
123 type(atom)
124 ]). 126:- predicate_options(redis_sscan/4, 4, [pass_to(redis:redis_scan/3, 3)]). 127:- predicate_options(redis_hscan/4, 4, [pass_to(redis:redis_scan/3, 3)]). 128:- predicate_options(redis_zscan/4, 4, [pass_to(redis:redis_scan/3, 3)]). 129
130
148
149:- dynamic server/3. 150
151:- dynamic ( connection/2, 152 sentinel/2 153 ) as volatile. 154
166
%!  redis_server(+Alias, +Address, +Options)
%
%   Register Address together with Options under Alias in the
%   server/3 table. Alias must be ground. An existing entry for the
%   same alias is removed before the new one is added, so an alias
%   maps to at most one registered address.

redis_server(Alias, Address, Options) :-
    must_be(ground, Alias),
    NewEntry = server(Alias, Address, Options),
    retractall(server(Alias, _, _)),
    asserta(NewEntry).
171
%   Initial server/3 entry: the alias `default` refers to
%   localhost:6379 and carries no options.
server(default, localhost:6379, []).
173
227
%!  redis_connect(-Connection)
%
%   Connect to the server registered under the alias `default`,
%   passing an empty option list.

redis_connect(Connection) :-
    NoOptions = [],
    redis_connect(default, Connection, NoOptions).
230
%!  redis_connect(+Server, -Connection, +Options)
%
%   Create a connection. Three call patterns are distinguished, in
%   this order:
%
%     1. First argument unbound, second and third ground: the
%        arguments are read as (Connection, Host, Port) and the call
%        is redirected to redis_connect(Host:Port, Connection, []).
%     2. Server is an atom: it is looked up in server/3. The
%        registered options are merged with Options (Options is the
%        first argument of merge_options/3). An existence_error is
%        raised when the alias is not registered.
%     3. Otherwise Server is used directly as the address.
%
%   In cases 2 and 3 the address is recorded in the option list as
%   address(Address) before do_connect/4 is called.

redis_connect(Server, Conn, Options) :-
    (   var(Server),
        ground(Conn),
        ground(Options)
    ->  % legacy argument order: (Connection, Host, Port)
        redis_connect(Conn:Options, Server, [])
    ;   atom(Server)
    ->  (   server(Server, Address, DefaultOptions)
        ->  merge_options(Options, DefaultOptions, Merged),
            do_connect(Server, Address, Conn, [address(Address)|Merged])
        ;   existence_error(redis_server, Server)
        )
    ;   do_connect(Server, Server, Conn, [address(Server)|Options])
    ).
246
252
%!  do_connect(+Id, +Address, -Conn, +Options)
%
%   Open a connection for Id. An address of the form sentinel(Pool)
%   is delegated to sentinel_master/4. Any other address is mapped
%   through tcp_address/2, connected with tcp_connect/3, passed
%   through tls_upgrade/4 and wrapped as
%   redis_connection(Id, Stream, 0, Options) before hello/2 runs on
%   the new connection.

do_connect(Id, sentinel(Pool), Conn, Options) =>
    sentinel_master(Id, Pool, Conn, Options).
do_connect(Id, Address0, Conn, Options) =>
    tcp_address(Address0, TcpAddress),
    tcp_connect(TcpAddress, PlainStream, Options),
    tls_upgrade(TcpAddress, PlainStream, Stream, Options),
    Conn = redis_connection(Id, Stream, 0, Options),
    hello(Conn, Options).
261
%!  tcp_address(+Address0, -Address)
%
%   Map unix(Path) to Path. Every other address is passed through
%   unchanged.

tcp_address(Address0, Address) :-
    (   a(Address0, Address) = a(unix(Path), Path)
    ->  true
    ;   Address = Address0
    ).
265
269
%!  tls_upgrade(+Address, +Raw, -Stream, +Options)
%
%   When ssl_context/3 is available and Options contains tls(true),
%   wrap the raw stream pair in an SSL session. The options
%   cacert(File), key(File) and cert(File) are then mandatory
%   (see must_have_option/2). In all other cases Stream is simply
%   Raw.

:- if(current_predicate(ssl_context/3)).
tls_upgrade(Host:_Port, Raw, Stream, Options) :-
    option(tls(true), Options),
    !,
    must_have_option(cacert(CacertFile), Options),
    must_have_option(key(KeyFile), Options),
    must_have_option(cert(CertFile), Options),
    SSLOptions = [ host(Host),
                   certificate_file(CertFile),
                   key_file(KeyFile),
                   cacerts([file(CacertFile)]),
                   cert_verify_hook(tls_verify),
                   close_parent(true)
                 ],
    ssl_context(client, SSL, SSLOptions),
    stream_pair(Raw, RawIn, RawOut),
    ssl_negotiate(SSL, RawIn, RawOut, In, Out),
    stream_pair(Stream, In, Out).
:- endif.
tls_upgrade(_, Stream, Stream, _).
290
:- if(current_predicate(ssl_context/3)).

%!  tls_verify(+SSL, +ProblemCert, +AllCerts, +FirstCert, +Status)
%
%   Certificate verification hook installed by tls_upgrade/4.
%   Succeeds when Status is `verified` or `hostname_mismatch` and
%   fails for every other status.

:- public tls_verify/5.
tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, Status) :-
    memberchk(Status, [verified, hostname_mismatch]).

:- endif.
312
%!  sentinel_master(+Id, +Pool, -Master, +Options)
%
%   Connect to the master of Pool by asking a sentinel. Options must
%   contain sentinels(List). Sentinels recorded in sentinel/2 for
%   Pool are tried before the members of List. Each connection
%   attempt is limited by the `sentinel_timeout` setting; failed
%   attempts are printed as warnings and the next candidate is tried.
%   The first sentinel that connects is committed to. The address it
%   reports is connected to and checked with the `role` command. If
%   the reply is not a list starting with `master`, the connection is
%   closed and, after sleeping for the timeout, the whole procedure
%   is repeated.

sentinel_master(Id, Pool, Master, Options) :-
    must_have_option(sentinels(Configured), Options),
    sentinel_auth(Options, SentinelOptions),
    setting(sentinel_timeout, Timeout),
    (   sentinel(Pool, Sentinel)
    ;   member(Sentinel, Configured)
    ),
    catch(call_with_time_limit(
              Timeout,
              do_connect(Id, Sentinel, SentinelConn,
                         [sentinel(true)|SentinelOptions])),
          ConnectError,
          ( print_message(warning, ConnectError),
            fail
          )),
    !,
    debug(redis(sentinel), 'Connected to sentinel at ~p', [Sentinel]),
    call_cleanup(
        query_sentinel(Pool, SentinelConn, Sentinel, MasterAddr),
        redis_disconnect(SentinelConn)),
    debug(redis(sentinel), 'Sentinel claims master is at ~p', [MasterAddr]),
    do_connect(Id, MasterAddr, Master, Options),
    debug(redis(sentinel), 'Connected to claimed master', []),
    redis(Master, role, Role),
    (   Role = [master|_Replicas]
    ->  debug(redis(sentinel), 'Verified role at ~p', [MasterAddr])
    ;   redis_disconnect(Master),
        debug(redis(sentinel), '~p is not the master: ~p', [MasterAddr, Role]),
        sleep(Timeout),
        sentinel_master(Id, Pool, Master, Options)
    ).
342
%!  sentinel_auth(+Options0, -Options)
%
%   Derive the options used to connect to a sentinel. When both
%   sentinel_user(User) and sentinel_password(Password) are present
%   they are merged in as user(User) and password(Password).
%   Otherwise any password(_) option is removed.

sentinel_auth(Options0, Options) :-
    (   option(sentinel_user(User), Options0),
        option(sentinel_password(Password), Options0)
    ->  Credentials = [user(User), password(Password)],
        merge_options(Credentials, Options0, Options)
    ;   select_option(password(_), Options0, Options, _)
    ).
350
351
%!  query_sentinel(+Pool, +Conn, +Sentinel, -MasterAddress)
%
%   Ask the sentinel reachable through Conn for the master of Pool
%   and return it as Host:Port. The sentinel's peer list is fetched
%   as well and stored, inside a transaction, by
%   update_known_sentinels/3.

query_sentinel(Pool, Conn, Sentinel, Host:Port) :-
    redis(Conn, sentinel('get-master-addr-by-name', Pool), Reply),
    Reply = [Host,Port],
    redis(Conn, sentinel(sentinels, Pool), PeerList),
    transaction(update_known_sentinels(Pool, Sentinel, PeerList)).
357
%!  update_known_sentinels(+Pool, +Sentinel, +Peers)
%
%   Replace the sentinel/2 facts for Pool. All old facts are dropped,
%   one fact is added per peer, and finally the sentinel that
%   answered is added with asserta/1 so that it comes first.

update_known_sentinels(Pool, Sentinel, Peers) :-
    retractall(sentinel(Pool, _)),
    maplist(update_peer_sentinel(Pool), Peers),
    Current = sentinel(Pool, Sentinel),
    asserta(Current).
362
%!  update_peer_sentinel(+Pool, +Attrs)
%
%   Record a peer sentinel for Pool. Attrs must contain ip-Host and
%   port-Port pairs. This is a single-sided-unification rule with a
%   guard, and no other rule is defined here for attribute lists
%   that lack either pair.

update_peer_sentinel(Pool, Attrs),
    memberchk(ip-PeerHost, Attrs),
    memberchk(port-PeerPort, Attrs) =>
    asserta(sentinel(Pool, PeerHost:PeerPort)).
367
%!  must_have_option(?Opt, +Options)
%
%   Like option/2, but raises existence_error(option, Opt, Options)
%   instead of failing when Opt is not in Options.

must_have_option(Opt, Options) :-
    (   option(Opt, Options)
    ->  true
    ;   existence_error(option, Opt, Options)
    ).
373
378
%!  hello(+Connection, +Options)
%
%   Send the initial handshake, depending on Options:
%
%     - version(V) with V >= 3: send `hello(3, auth, User, Password)`
%       when both user(User) and password(Password) are given,
%       `hello(3)` otherwise.
%     - else, when password(Password) is given: send `auth(Password)`.
%     - else: send nothing.

hello(Con, Options) :-
    (   option(version(V), Options),
        V >= 3
    ->  (   option(user(User), Options),
            option(password(Password), Options)
        ->  redis(Con, hello(3, auth, User, Password))
        ;   redis(Con, hello(3))
        )
    ;   option(password(Password), Options)
    ->  redis(Con, auth(Password))
    ;   true
    ).
393
400
%!  redis_stream(+Redis, -Stream, +Connect)
%
%   Get the stream for a connection or server alias.
%
%     - Redis unbound raises an instantiation error; Stream already
%       bound raises an uninstantiation error.
%     - For an atom, a stream recorded in connection/2 is reused.
%       Otherwise, if Connect == true and the alias is registered in
%       server/3, a new connection is made and recorded in
%       connection/2. If neither applies an existence error is raised.
%     - For a redis_connection/4 term whose stream slot is not `-`,
%       that stream is returned.
%     - For a redis_connection/4 term whose stream slot is `-`, the
%       connection is re-established from its address(Address) option
%       and the new stream is stored destructively in the term. The
%       Connect flag is not consulted in this case.

redis_stream(Redis, _, _) :-
    var(Redis),
    !,
    instantiation_error(Redis).
redis_stream(_, S, _) :-
    nonvar(S),
    !,
    uninstantiation_error(S).
redis_stream(Alias, S, Connect) :-
    atom(Alias),
    !,
    (   connection(Alias, Known)
    ->  S = Known
    ;   Connect == true,
        server(Alias, Address, Options)
    ->  redis_connect(Address, NewConnection, Options),
        redis_stream(NewConnection, S, false),
        asserta(connection(Alias, S))
    ;   existence_error(redis_server, Alias)
    ).
redis_stream(redis_connection(_,Current,_,_), S, _) :-
    Current \== (-),
    !,
    S = Current.
redis_stream(Redis, S, _) :-
    Redis = redis_connection(Id,-,_,Options),
    option(address(Address), Options),
    do_connect(Id, Address, Fresh, Options),
    arg(2, Fresh, FreshStream),
    nb_setarg(2, Redis, FreshStream),
    S = FreshStream.
430
%!  has_redis_stream(+Redis, -Stream)
%
%   True when Redis currently has a stream, without ever connecting.
%   For an atom this consults connection/2. For a redis_connection/4
%   term the stream slot must differ from `-`. An unbound Redis
%   raises an instantiation error.

has_redis_stream(Redis, S) :-
    (   var(Redis)
    ->  instantiation_error(Redis)
    ;   atom(Redis)
    ->  connection(Redis, S)
    ;   Redis = redis_connection(_,S,_,_),
        S \== (-)
    ).
441
442
455
%!  redis_disconnect(+Connection)
%
%   Same as redis_disconnect(Connection, []).

redis_disconnect(Redis) :-
    DefaultOptions = [],
    redis_disconnect(Redis, DefaultOptions).
458
%!  redis_disconnect(+Connection, +Options)
%
%   Close the stream of Connection. Options:
%
%     - force(true)
%       Close with close(S, [force(true)]). For a redis_connection/4
%       term the stream slot is reset to `-`; for other connections
%       the connection/2 entry for the stream is removed. Nothing
%       happens if there is no open stream.
%
%   Without force(true) the stream is obtained through
%   redis_stream/3, closed and removed from connection/2. A
%   redis_connection/4 term whose stream slot is already `-` is
%   treated as already disconnected. Without that guard the last
%   clause of redis_stream/3 would reconnect to the server only to
%   close the new stream again, which can block or raise, for
%   example when this runs as the cleanup of a failed request.

redis_disconnect(Redis, Options) :-
    option(force(true), Options),
    !,
    (   Redis = redis_connection(_Id, S, _, _Opts)
    ->  (   S == (-)
        ->  true
        ;   close(S, [force(true)]),
            nb_setarg(2, Redis, -)
        )
    ;   has_redis_stream(Redis, S)
    ->  close(S, [force(true)]),
        retractall(connection(_,S))
    ;   true
    ).
redis_disconnect(redis_connection(_Id, S, _, _Opts), _Options) :-
    S == (-),                           % already disconnected: no-op
    !.
redis_disconnect(Redis, _Options) :-
    redis_stream(Redis, S, false),
    close(S),
    retractall(connection(_,S)).
477
519
%!  redis(+Connection, +Request)
%
%   When Request is a proper list it is run as a pipeline by
%   redis_pipeline/2. Otherwise it is a single command whose reply is
%   not returned to the caller (it is still subject to the checks of
%   redis/3).

redis(Redis, Request) :-
    (   is_list(Request)
    ->  redis_pipeline(Redis, Request)
    ;   redis(Redis, Request, _)
    ).
526
644
%!  redis(+Connection, +Command, -Reply)
%
%   Run Command and unify the reply with Reply. Reply may be written
%   as `Value as Type`. The call fails when the resulting value is
%   the atom `nil`.

redis(Redis, Req, Out) :-
    out_val(Out, Value),
    redis1(Redis, Req, Out),
    Value \== nil.
649
%!  out_val(+Out, -Val)
%
%   Val is the value part of an output specification: the left-hand
%   side of `Val as _` when Out has that shape, Out itself otherwise.

out_val(Out, Val) :-
    nonvar(Out),
    Out = (Val as _),
    !.
out_val(Out, Out).
656
%!  redis1(+Connection, +Command, -Reply)
%
%   Run redis2/3 and catch any error(Formal, _) exception. A caught
%   error is handed to recover/3, which either retries this same
%   goal or rethrows the error.

redis1(Redis, Req, Out) :-
    Caught = error(Formal, _),
    catch(redis2(Redis, Req, Out), Caught, true),
    (   var(Formal)
    ->  true
    ;   Retry = redis1(Redis, Req, Out),
        recover(Caught, Redis, Retry)
    ).
664
%!  redis2(+Connection, +Command, -Reply)
%
%   Write Command to the connection's stream and read the reply.
%   When the connection is given as an atom, the write/read pair
%   runs under a mutex named after that atom.

redis2(Redis, Req, Out) :-
    redis_stream(Redis, S, true),
    (   atom(Redis)
    ->  with_mutex(Redis,
                   ( redis_write_msg(S, Req),
                     redis_read_stream(Redis, S, Out)
                   ))
    ;   redis_write_msg(S, Req),
        redis_read_stream(Redis, S, Out)
    ).
677
679
%!  redis_pipeline(+Connection, +PipeLine)
%
%   Run PipeLine through redis_pipeline2/2. Any error(Formal, _)
%   exception is handed to recover/3, which either retries the whole
%   pipeline or rethrows.

redis_pipeline(Redis, PipeLine) :-
    Caught = error(Formal, _),
    catch(redis_pipeline2(Redis, PipeLine), Caught, true),
    (   var(Formal)
    ->  true
    ;   Retry = redis_pipeline(Redis, PipeLine),
        recover(Caught, Redis, Retry)
    ).
687
%!  redis_pipeline2(+Connection, +PipeLine)
%
%   Obtain the stream and run redis_pipeline3/3. For a connection
%   given as an atom this runs under a mutex named after the atom.

redis_pipeline2(Redis, PipeLine) :-
    redis_stream(Redis, S, true),
    (   atom(Redis)
    ->  with_mutex(Redis,
                   redis_pipeline3(Redis, S, PipeLine))
    ;   redis_pipeline3(Redis, S, PipeLine)
    ).
697
%!  redis_pipeline3(+Connection, +Stream, +PipeLine)
%
%   Write every command of PipeLine without flushing, flush the
%   stream once, then read the replies.

redis_pipeline3(Redis, Stream, PipeLine) :-
    maplist(write_pipeline(Stream), PipeLine),
    flush_output(Stream),
    read_pipeline(Redis, Stream, PipeLine).
702
%!  write_pipeline(+Stream, +Item)
%
%   Write one pipeline item without flushing. Item is either a plain
%   command or `Command -> Reply`; only the command part is written.

write_pipeline(Stream, Item) :-
    (   Item = (Command -> _Reply)
    ->  true
    ;   Command = Item
    ),
    redis_write_msg_no_flush(Stream, Command).
708
%!  read_pipeline(+Connection, +Stream, +PipeLine)
%
%   Read the replies for PipeLine. If reading raises an
%   error(Formal, _) exception, the connection is either forcibly
%   disconnected (when reconnect_error/1 accepts the error) or
%   resynchronised with resync/1; the error is rethrown in both
%   cases.

read_pipeline(Redis, Stream, PipeLine) :-
    Caught = error(Formal,_),
    catch(read_pipeline2(Redis, Stream, PipeLine), Caught, true),
    (   var(Formal)
    ->  true
    ;   reconnect_error(Caught)
    ->  redis_disconnect(Redis, [force(true)]),
        throw(Caught)
    ;   resync(Redis),
        throw(Caught)
    ).
720
%!  read_pipeline2(+Connection, +Stream, +PipeLine)
%
%   Read one message per pipeline item. Only after all messages have
%   been read are the push messages dispatched, the first reported
%   error thrown, and the replies unified with the `-> Reply` parts
%   of the pipeline items.

read_pipeline2(Redis, Stream, PipeLine) :-
    maplist(redis_read_msg3(Stream), PipeLine, Replies, Errors, PushLists),
    maplist(handle_push(Redis), PushLists),
    maplist(handle_error, Errors),
    maplist(bind_reply, PipeLine, Replies).
726
%!  redis_read_msg3(+Stream, +Item, -Reply, -Error, -Push)
%
%   Read the message for one pipeline item. For `Command -> ReplyIn`
%   the ReplyIn part is passed as second argument to
%   redis_read_msg/5; for any other item the item itself is passed.

redis_read_msg3(Stream, Item, Reply, Error, Push) :-
    (   Item = (_Command -> ReplySpec)
    ->  true
    ;   ReplySpec = Item
    ),
    redis_read_msg(Stream, ReplySpec, Reply, Error, Push).
732
%!  handle_push(+Connection, +Pushed)
%
%   Argument-swapped version of handle_push_messages/2, for use with
%   maplist/2.

handle_push(Redis, PushedMessages) :-
    handle_push_messages(PushedMessages, Redis).
%!  handle_error(?Error)
%
%   Succeed when Error is unbound; throw it otherwise.

handle_error(Error) :-
    var(Error),
    !.
handle_error(Error) :-
    throw(Error).
%!  bind_reply(+Item, +Reply)
%
%   For a pipeline item `Command -> Reply0`, unify Reply0 with
%   Reply. Any other item succeeds and ignores Reply.

bind_reply(Item, Reply) :-
    (   Item = (_Command -> Reply0)
    ->  Reply0 = Reply
    ;   true
    ).
744
745
751
752:- meta_predicate recover(+, +, 0). 753
%!  recover(+Error, +Redis, :Goal)
%
%   Error handling for a failed request. When Error is accepted by
%   reconnect_error/1 and auto_reconnect/1 holds for Redis, the
%   connection is forcibly closed and, if wait_to_retry/2 still
%   allows it, Goal is called again. Otherwise Error is rethrown.
%
%   After Goal succeeds the failure count of Redis is reset. The
%   count lives in one of two places (see redis_set_failures/2): a
%   failure/2 fact for alias connections, or the third argument of a
%   redis_connection/4 term. Both must be cleared. Otherwise a
%   long-lived connection term accumulates failures over its whole
%   lifetime, growing the back-off delay and eventually exhausting
%   `max_retry_count`.

recover(Error, Redis, Goal) :-
    reconnect_error(Error),
    auto_reconnect(Redis),
    !,
    debug(redis(recover), '~p: got error ~p; trying to reconnect',
          [Redis, Error]),
    redis_disconnect(Redis, [force(true)]),
    (   wait_to_retry(Redis, Error)
    ->  call(Goal),
        redis_reset_failures(Redis)
    ;   throw(Error)
    ).
recover(Error, _, _) :-
    throw(Error).

%   redis_reset_failures(+Redis)
%
%   Set the failure count of Redis back to zero, in whichever place
%   the count is kept for this kind of connection.

redis_reset_failures(Redis) :-
    (   Redis = redis_connection(_,_,_,_)
    ->  nb_setarg(3, Redis, 0)
    ;   retractall(failure(Redis, _))
    ).
768
%!  auto_reconnect(+Redis)
%
%   True when reconnecting is enabled. A redis_connection/4 term
%   must carry reconnect(true) in its options. A ground server alias
%   must be registered in server/3, and reconnecting is then enabled
%   unless its options say otherwise (the default is `true`).

auto_reconnect(Redis) :-
    (   Redis = redis_connection(_,_,_,Options)
    ->  option(reconnect(true), Options)
    ;   ground(Redis),
        server(Redis, _, Options),
        option(reconnect(true), Options, true)
    ).
776
%!  reconnect_error(?Error)
%
%   True when Error is an error(Formal, _) term whose formal part is
%   an I/O error, a socket error or the syntax error
%   `unexpected_eof`.

reconnect_error(error(Formal, _)) :-
    reconnect_formal(Formal).

reconnect_formal(io_error(_Action, _On)).
reconnect_formal(socket_error(_Code, _)).
reconnect_formal(syntax_error(unexpected_eof)).
780
787
788:- dynamic failure/2 as volatile. 789
%!  wait_to_retry(+Redis, +Error)
%
%   Fail when the failure count of Redis has reached the
%   `max_retry_count` setting. Otherwise increment the count, print
%   a retry message and sleep. The sleep time is (2^Failures)/100
%   seconds, capped at the `max_retry_wait` setting, where Failures
%   is the count before the increment.

wait_to_retry(Redis, Error) :-
    redis_failures(Redis, Failures),
    setting(max_retry_count, MaxCount),
    Failures < MaxCount,
    NextFailures is Failures+1,
    redis_set_failures(Redis, NextFailures),
    setting(max_retry_wait, MaxWait),
    Cap is MaxWait*100,
    Backoff is 1<<Failures,
    Wait is min(Cap, Backoff)/100.0,
    debug(redis(recover), '  Sleeping ~p seconds', [Wait]),
    retry_message_level(Failures, Level),
    print_message(Level, redis(retry(Redis, Failures, Wait, Error))),
    sleep(Wait).
802
%!  redis_failures(+Redis, -Failures)
%
%   Current failure count: the third argument of a
%   redis_connection/4 term, or for an atom the value stored in
%   failure/2 (0 when there is no such fact).

redis_failures(Redis, Failures) :-
    (   Redis = redis_connection(_,_,Stored,_)
    ->  Failures = Stored
    ;   atom(Redis),
        (   failure(Redis, Failures)
        ->  true
        ;   Failures = 0
        )
    ).
812
%!  redis_set_failures(+Redis, +Count)
%
%   Store the failure count: destructively in the third argument of
%   a compound connection term, or as the single failure/2 fact for
%   an atom.

redis_set_failures(Redis, Count) :-
    (   compound(Redis)
    ->  nb_setarg(3, Redis, Count)
    ;   atom(Redis),
        retractall(failure(Redis, _)),
        asserta(failure(Redis, Count))
    ).
821
%!  retry_message_level(+Failures, -Level)
%
%   Message level for the retry message: `warning` for the first
%   failure (count 0), `silent` for all later ones.

retry_message_level(Failures, Level) :-
    (   l(Failures, Level) = l(0, warning)
    ->  true
    ;   Level = silent
    ).
824
825
831
%!  redis(+Request)
%
%   Run Request on a temporary connection to the `default` server
%   and print the reply with print/1. The connection is closed again
%   whether or not the request succeeds.

redis(Req) :-
    setup_call_cleanup(
        redis_connect(default, Conn, []),
        redis1(Conn, Req, Reply),
        redis_disconnect(Conn)),
    print(Reply).
838
844
%!  redis_write(+Redis, +Command)
%
%   Write Command to the stream of Redis, connecting first if
%   needed. No reply is read.

redis_write(Redis, Command) :-
    redis_stream(Redis, Stream, true),
    redis_write_msg(Stream, Command).
848
%!  redis_read(+Redis, -Reply)
%
%   Read one reply from the stream of Redis, connecting first if
%   needed.

redis_read(Redis, Reply) :-
    redis_stream(Redis, Stream, true),
    redis_read_stream(Redis, Stream, Reply).
852
853
854 857
872
%!  redis_get_list(+Redis, +Key, -List)
%
%   Fetch the list stored at Key in one go. Same as
%   redis_get_list/4 with a chunk size of -1.

redis_get_list(Redis, Key, List) :-
    NoChunking = -1,
    redis_get_list(Redis, Key, NoChunking, List).
875
%!  redis_get_list(+Redis, +Key, +ChunkSize, -List)
%
%   Fetch the list stored at Key. The length is obtained with
%   `llen`. When ChunkSize is -1 or at least the length, the whole
%   list is fetched at once (an empty list needs no further
%   request). Otherwise List is a lazy list that fetches ChunkSize
%   elements at a time through rlist_next/3.

redis_get_list(Redis, Key, Chunk, List) :-
    redis(Redis, llen(Key), Len),
    (   \+ ( Chunk >= Len
           ; Chunk == -1
           )
    ->  lazy_list(rlist_next(s(Redis,Key,0,Chunk,Len)), List)
    ;   Len == 0
    ->  List = []
    ;   Last is Len-1,
        list_range(Redis, Key, 0, Last, List)
    ).
888
%!  rlist_next(+State, -List, -Tail)
%
%   Lazy-list iterator for redis_get_list/4. State is
%   s(Redis, Key, Offset, Slice, Len). Fetches the next slice; when
%   it reaches the last element the list is closed, otherwise the
%   offset in State is advanced destructively and Tail is left open.

rlist_next(State, List, Tail) :-
    State = s(Redis,Key,Offset,Slice,Len),
    Last is Len-1,
    End is min(Last, Offset+Slice-1),
    list_range(Redis, Key, Offset, End, Elems),
    (   End =:= Last
    ->  List = Elems,
        Tail = []
    ;   NextOffset is Offset+Slice,
        nb_setarg(3, State, NextOffset),
        append(Elems, Tail, List)
    ).
900
902
%!  list_range(+DB, +Key, +Start, +End, -List)
%
%   Fetch the elements Start..End (inclusive) of the list at Key. A
%   single-element range uses `lindex`; everything else uses
%   `lrange`.

list_range(DB, Key, Index, Index, [Elem]) :-
    !,
    redis(DB, lindex(Key, Index), Elem).
list_range(DB, Key, Start, End, List) :-
    redis(DB, lrange(Key, Start, End), List).
909
910
911
918
%!  redis_set_list(+Redis, +Key, +List)
%
%   Replace the value at Key by List. Key is deleted first; a
%   non-empty List is then stored with a single `rpush`. Delete and
%   push are issued as two separate requests.

redis_set_list(Redis, Key, List) :-
    redis(Redis, del(Key), _),
    (   List \== []
    ->  Push =.. [rpush,Key|List],
        redis(Redis, Push, _Count)
    ;   true
    ).
926
927
937
%!  redis_get_hash(+Redis, +Key, -Dict)
%
%   Fetch the hash at Key with `hgetall`, requesting the reply as
%   `dict(auto)`.

redis_get_hash(Redis, Key, Dict) :-
    ReplySpec = (Dict as dict(auto)),
    redis(Redis, hgetall(Key), ReplySpec).
940
%!  redis_set_hash(+Redis, +Key, +Dict)
%
%   Replace the hash at Key by the content of Dict. The `hset`
%   command is built before anything is sent; Key is then deleted
%   and the new fields are stored, as two separate requests.

redis_set_hash(Redis, Key, Dict) :-
    redis_array_dict(Fields, _Tag, Dict),
    Store =.. [hset,Key|Fields],
    redis(Redis, del(Key), _),
    redis(Redis, Store, _Count).
946
955
%!  redis_array_dict(?Array, ?Tag, ?Dict)
%
%   Translate between a flat field/value array and a dict. The
%   direction is chosen by Array: when it is bound it is converted
%   into Dict, otherwise Dict is converted into an array.

redis_array_dict(Array, Tag, Dict) :-
    (   nonvar(Array)
    ->  array_to_pairs(Array, Pairs),
        dict_pairs(Dict, Tag, Pairs)
    ;   dict_pairs(Dict, Tag, Pairs),
        pairs_to_array(Pairs, Array)
    ).
964
%!  array_to_pairs(+Array, -Pairs)
%
%   Convert an array of fields into Key-Value pairs with atom keys.
%   The array may hold Name-Value pairs, alternating name and value
%   elements, or a mix of both.

array_to_pairs([], []) :-
    !.
array_to_pairs([Text-Value|Rest0], [Key-Value|Rest]) :-
    !,
    atom_string(Key, Text),
    array_to_pairs(Rest0, Rest).
array_to_pairs([Text,Value|Rest0], [Key-Value|Rest]) :-
    atom_string(Key, Text),
    array_to_pairs(Rest0, Rest).
974
%!  pairs_to_array(+Pairs, -Array)
%
%   Flatten Key-Value pairs into an alternating list of key strings
%   and values.

pairs_to_array([], []) :-
    !.
pairs_to_array([Key-Value|Rest0], [Text,Value|Rest]) :-
    atom_string(Key, Text),
    pairs_to_array(Rest0, Rest).
980
1002
%!  redis_scan(+Redis, -LazyList, +Options)
%
%   LazyList is a lazy list filled by repeated `scan` requests,
%   starting at cursor 0. The options match(_), count(_) and type(_)
%   are passed on as command parameters.

redis_scan(Redis, LazyList, Options) :-
    scan_options([match,count,type], Options, Parms),
    State = s(scan,Redis,0,Parms),
    lazy_list(scan_next(State), LazyList).
1006
%!  redis_sscan(+Redis, +Set, -LazyList, +Options)
%
%   LazyList is a lazy list filled by repeated `sscan` requests on
%   Set, starting at cursor 0. Options are handled as in
%   redis_scan/3.

redis_sscan(Redis, Set, LazyList, Options) :-
    scan_options([match,count,type], Options, Parms),
    State = s(sscan(Set),Redis,0,Parms),
    lazy_list(scan_next(State), LazyList).
1010
%!  redis_hscan(+Redis, +Hash, -LazyList, +Options)
%
%   LazyList is a lazy list of Key-Value pairs filled by repeated
%   `hscan` requests on Hash, starting at cursor 0. Options are
%   handled as in redis_scan/3.

redis_hscan(Redis, Hash, LazyList, Options) :-
    scan_options([match,count,type], Options, Parms),
    State = s(hscan(Hash),Redis,0,Parms),
    lazy_list(scan_next(State), LazyList).
1014
%!  redis_zscan(+Redis, +Set, -LazyList, +Options)
%
%   LazyList is a lazy list of Key-Value pairs filled by repeated
%   `zscan` requests on Set, starting at cursor 0. Options are
%   handled as in redis_scan/3.

redis_zscan(Redis, Set, LazyList, Options) :-
    scan_options([match,count,type], Options, Parms),
    State = s(zscan(Set),Redis,0,Parms),
    lazy_list(scan_next(State), LazyList).
1018
%!  scan_options(+Names, +Options, -Parms)
%
%   For every option name in Names that is present in Options, add
%   the name followed by its value to Parms. Names not present are
%   skipped.

scan_options([], _, []).
scan_options([Name|Names], Options, [Name,Value|Parms]) :-
    Opt =.. [Name,Value],
    option(Opt, Options),
    !,
    scan_options(Names, Options, Parms).
scan_options([_|Names], Options, Parms) :-
    scan_options(Names, Options, Parms).
1027
1028
%!  scan_next(+State, -List, -Tail)
%
%   Lazy-list iterator for the scan predicates. State is
%   s(Command, Redis, Cursor, Params). The request is Command
%   extended with the cursor and the parameters. A returned cursor
%   of 0 closes the list; any other cursor is stored destructively
%   in State for the next call.

scan_next(State, List, Tail) :-
    State = s(Command,Redis,Cursor,Params),
    Command =.. CommandParts,
    append(CommandParts, [Cursor|Params], RequestParts),
    Request =.. RequestParts,
    redis(Redis, Request, [NextCursor,Raw]),
    scan_pairs(Command, Raw, Elems),
    (   NextCursor == 0
    ->  List = Elems,
        Tail = []
    ;   nb_setarg(3, State, NextCursor),
        append(Elems, Tail, List)
    ).
1042
%!  scan_pairs(+Command, +List, -Result)
%
%   For `hscan` and `zscan` the flat reply is turned into Key-Value
%   pairs by scan_pairs/2. For every other command the reply is
%   returned as is.

scan_pairs(Command, List, Result) :-
    (   (   Command = hscan(_)
        ;   Command = zscan(_)
        )
    ->  scan_pairs(List, Result)
    ;   Result = List
    ).
1050
%!  scan_pairs(+List, -Pairs)
%
%   Turn a reply into Key-Value pairs. Elements are taken two at a
%   time as key and value; an element that is already a Key-Value
%   pair (with nothing after it to pair with) is copied.

scan_pairs([], []).
scan_pairs([K,V|Rest0], [K-V|Rest]) :-
    !,
    scan_pairs(Rest0, Rest).
scan_pairs([K-V|Rest0], [K-V|Rest]) :-
    scan_pairs(Rest0, Rest).
1057
1058
1059 1062
1069
%!  redis_current_command(+Redis, ?Command)
%
%   Same as redis_current_command/3, ignoring the properties.

redis_current_command(Redis, Command) :-
    redis_current_command(Redis, Command, _Properties).
1072
%!  redis_current_command(+Redis, ?Command, -Properties)
%
%   True when Command is known to the server, with Properties the
%   remainder of its description. A bound Command is looked up with
%   `command info`. An unbound Command enumerates the reply of
%   `command` on backtracking, with the name converted to an atom.

redis_current_command(Redis, Command, Properties) :-
    (   nonvar(Command)
    ->  redis(Redis, command(info, Command), [[_Name|Properties]])
    ;   redis(Redis, command, Commands),
        member([Name|Properties], Commands),
        atom_string(Command, Name)
    ).
1081
1087
%!  redis_property(+Redis, ?Property)
%
%   True when Property is one of the terms obtained by parsing the
%   reply of the `info` command with info_terms/2. Enumerates on
%   backtracking.

redis_property(Redis, Property) :-
    redis(Redis, info, InfoText),
    info_terms(InfoText, Properties),
    member(Property, Properties).
1092
%!  info_terms(+Info, -Terms)
%
%   Split Info into lines, stripping carriage returns, newlines and
%   spaces from both ends of each line, and keep the lines that
%   info_line_term/2 can translate.

info_terms(Info, Terms) :-
    split_string(Info, "\n", "\r\n ", Lines),
    convlist(info_line_term, Lines, Terms).
1096
%!  info_line_term(+Line, -Term)
%
%   Translate a `name:value` line into the term Name(Value). The
%   line is split at its first colon. Lines without a colon and
%   names ending in `_human` are rejected. Value is a number when
%   the text parses as one, a string otherwise.

info_line_term(Line, Term) :-
    sub_string(Line, Before, _, After, :),
    !,                                  % commit to the first colon
    sub_atom(Line, 0, Before, _, Name),
    \+ sub_atom(Name, _, _, 0, '_human'),
    sub_string(Line, _, After, 0, Text),
    (   number_string(Number, Text)
    ->  Value = Number
    ;   Value = Text
    ),
    Term =.. [Name,Value].
1108
1109
1110 1113
1141
1142:- dynamic ( subscription/2, 1143 listening/3 1144 ) as volatile. 1145
%!  redis_subscribe(+Redis, +Channels, -Id, +Options)
%
%   Start a thread that subscribes to Channels and listens for
%   messages. When Redis is an atom, the thread opens its own
%   connection with reconnect(true) and closes it when done.
%   Otherwise the given connection term is used directly. Options
%   are thread creation options; detached(true) is the default. Id
%   identifies the subscription (see pubsub_id/2).

redis_subscribe(Redis, Spec, Id, Options) :-
    atom(Redis),
    !,
    channels(Spec, Channels),
    pubsub_thread_options(ThreadOptions, Options),
    Listener = setup_call_cleanup(
                   redis_connect(Redis, Conn, [reconnect(true)]),
                   redis_subscribe1(Redis, Conn, Channels),
                   redis_disconnect(Conn)),
    thread_create(Listener, Thread, ThreadOptions),
    pubsub_id(Thread, Id).
redis_subscribe(Redis, Spec, Id, Options) :-
    channels(Spec, Channels),
    pubsub_thread_options(ThreadOptions, Options),
    Listener = redis_subscribe1(Redis, Redis, Channels),
    thread_create(Listener, Thread, ThreadOptions),
    pubsub_id(Thread, Id).
1165
%!  pubsub_thread_options(-ThreadOptions, +Options)
%
%   ThreadOptions is Options with detached(true) added as default.

pubsub_thread_options(ThreadOptions, Options) :-
    Defaults = [detached(true)],
    merge_options(Options, Defaults, ThreadOptions).
1168
%!  pubsub_id(?Thread, ?Id)
%
%   Relate a listening thread to its subscription id. The two are
%   the same term.

pubsub_id(Thread, Id) :-
    Id = Thread.
1173
%!  redis_subscribe1(+Redis, +Conn, +Channels)
%
%   Subscribe and listen, with error recovery. When an
%   error(Formal, _) exception is caught, recover/3 is used to get a
%   working connection again (it retries an `echo` request or
%   rethrows). After that the channels currently registered for this
%   thread are subscribed to again.

redis_subscribe1(Redis, Conn, Channels) :-
    Caught = error(Formal, _),
    catch(redis_subscribe2(Redis, Conn, Channels), Caught, true),
    (   var(Formal)
    ->  true
    ;   recover(Caught, Conn, redis1(Conn, echo("reconnect"), _)),
        thread_self(Self),
        pubsub_id(Self, Id),
        findall(Channel, subscription(Id, Channel), Registered),
        redis_subscribe1(Redis, Conn, Registered)
    ).
1185
%!  redis_subscribe2(+Redis, +Conn, +Channels)
%
%   Send the subscribe request for Channels on Conn, then enter the
%   listen loop.

redis_subscribe2(Redis, Conn, Channels) :-
    redis_subscribe3(Conn, Channels),
    redis_listen(Redis, Conn).
1189
%!  redis_subscribe3(+Conn, +Channels)
%
%   Register Channels for the calling thread and send the
%   `subscribe` command. Also installs pubsub_clean/1 as a
%   thread-exit hook for the calling thread, so the bookkeeping
%   facts are removed when the thread ends.

redis_subscribe3(Conn, Channels) :-
    thread_self(Self),
    pubsub_id(Self, Id),
    prolog_listen(this_thread_exit, pubsub_clean(Id)),
    maplist(register_subscription(Id), Channels),
    redis_stream(Conn, Stream, true),
    Subscribe =.. [subscribe|Channels],
    redis_write_msg(Stream, Subscribe).
1198
%!  pubsub_clean(+Id)
%
%   Remove all listening/3 and subscription/2 facts for Id.

pubsub_clean(Id) :-
    retractall(listening(Id, _, _)),
    retractall(subscription(Id, _)).
1202
1212
%!  redis_subscribe(+Id, +Channels)
%
%   Add Channels to the existing subscription Id. The channels are
%   registered and a `subscribe` command is written on the
%   listener's connection. Raises an existence error when Id has no
%   listening/3 entry.

redis_subscribe(Id, Spec) :-
    channels(Spec, Channels),
    (   listening(Id, Conn, _Thread)
    ->  true
    ;   existence_error(redis_pubsub, Id)
    ),
    maplist(register_subscription(Id), Channels),
    redis_stream(Conn, Stream, true),
    Subscribe =.. [subscribe|Channels],
    redis_write_msg(Stream, Subscribe).
1223
%!  redis_unsubscribe(+Id, +Channels)
%
%   Remove Channels from the existing subscription Id. The channels
%   are unregistered and an `unsubscribe` command is written on the
%   listener's connection. Raises an existence error when Id has no
%   listening/3 entry.

redis_unsubscribe(Id, Spec) :-
    channels(Spec, Channels),
    (   listening(Id, Conn, _Thread)
    ->  true
    ;   existence_error(redis_pubsub, Id)
    ),
    maplist(unregister_subscription(Id), Channels),
    redis_stream(Conn, Stream, true),
    Unsubscribe =.. [unsubscribe|Channels],
    redis_write_msg(Stream, Unsubscribe).
1234
1238
%!  redis_current_subscription(?Id, ?Channels)
%
%   True when Channels is the list of channels registered for
%   subscription Id. Enumerates the subscriptions on backtracking.

redis_current_subscription(Id, Channels) :-
    findall(Id-Channel, subscription(Id, Channel), Pairs),
    keysort(Pairs, ById),
    group_pairs_by_key(ById, Groups),
    member(Id-Channels, Groups).
1244
%!  channels(+Spec, -Channels)
%
%   Normalise a channel specification into a list of channel names.
%   Spec is either a list of channels or a single channel; each is
%   translated by channel_name/2.

channels(Spec, Channels) :-
    (   is_list(Spec)
    ->  maplist(channel_name, Spec, Channels)
    ;   Channels = [Name],
        channel_name(Spec, Name)
    ).
1251
%!  channel_name(+Key, -Name)
%
%   Name is the atom for a channel. An atom is used as is. A term
%   A:B:... built from atoms is joined into one atom with `:` as
%   separator. Anything else raises a type error for `redis_key`.

channel_name(Name, Name) :-
    atom(Name),
    !.
channel_name(Key, Name) :-
    phrase(key_parts(Key), Parts),
    !,
    atomic_list_concat(Parts, :, Name).
channel_name(Key, _) :-
    type_error(redis_key, Key).
1261
%!  key_parts(+Key)//
%
%   Produce the atoms of a `:`-separated key term in left-to-right
%   order. Fails for an unbound key and for any component that is
%   neither an atom nor a `:` pair.

key_parts(Key) -->
    { var(Key), !, fail }.
key_parts(Key) -->
    { atom(Key) },
    !,
    [Key].
key_parts(Left:Right) -->
    key_parts(Left),
    key_parts(Right).
1271
1272
1273
1274
%!  register_subscription(+Id, +Channel)
%
%   Make sure subscription(Id, Channel) is recorded, without
%   creating a duplicate fact.

register_subscription(Id, Channel) :-
    subscription(Id, Channel),
    !.
register_subscription(Id, Channel) :-
    assertz(subscription(Id, Channel)).
1280
%!  unregister_subscription(+Id, +Channel)
%
%   Remove the record that Id is subscribed to Channel.

unregister_subscription(Id, Channel) :-
    Entry = subscription(Id, Channel),
    retractall(Entry).
1283
%!  redis_listen(+Redis, +Conn)
%
%   Run the listen loop for the calling thread. A listening/3 fact
%   for this thread and Conn exists for as long as the loop runs and
%   is erased when it ends, however it ends.

redis_listen(Redis, Conn) :-
    thread_self(Self),
    pubsub_id(Self, Id),
    setup_call_cleanup(
        assertz(listening(Id, Conn, Self), Ref),
        redis_listen_loop(Redis, Id, Conn),
        erase(Ref)).
1291
%!  redis_listen_loop(+Redis, +Id, +Conn)
%
%   Read and broadcast messages for as long as Id has at least one
%   registered subscription. The stream is looked up again on every
%   iteration.

redis_listen_loop(Redis, Id, Conn) :-
    redis_stream(Conn, Stream, true),
    (   subscription(Id, _)
    ->  redis_read_stream(Redis, Stream, Message),
        redis_broadcast(Redis, Message),
        redis_listen_loop(Redis, Id, Conn)
    ;   true
    ).
1300
%!  redis_broadcast(+Redis, +Message)
%
%   Dispatch a message received while listening.
%
%     - [subscribe, Channel, N] is ignored.
%     - [message, Channel, Data] is broadcast as
%       redis(Redis, Channel, Data); errors raised by listeners are
%       printed and not propagated.
%     - Anything else must be a three-element list starting with two
%       atoms (checked with assertion/1) and is only reported on the
%       debug topic redis(warning).

redis_broadcast(_, [subscribe, _Channel, _N]) :-
    !.
redis_broadcast(Redis, [message, Channel, Data]) :-
    !,
    Event = redis(Redis, Channel, Data),
    catch(broadcast(Event),
          Error,
          print_message(error, Error)).
redis_broadcast(Redis, Message) :-
    assertion((Message = [Type, Channel, _Data],
               atom(Type),
               atom(Channel))),
    debug(redis(warning), '~p: Unknown message while listening: ~p',
          [Redis,Message]).
1314
1315
1316 1319
1334
%!  redis_read_stream(+Redis, +Stream, -Out)
%
%   Read one reply from Stream.
%
%     - If reading raises an error(Formal, _) exception, Redis is
%       forcibly disconnected and the exception is rethrown.
%     - Otherwise push messages that arrived with the reply are
%       dispatched first. If the reader reported an error, the
%       connection is resynchronised with resync/1 and that error is
%       thrown; else Out is unified with the reply.

redis_read_stream(Redis, Stream, Out) :-
    Caught = error(Formal,_),
    catch(redis_read_msg(Stream, Out, Reply, ReplyError, Push), Caught, true),
    (   var(Formal)
    ->  handle_push_messages(Push, Redis),
        (   var(ReplyError)
        ->  Out = Reply
        ;   resync(Redis),
            throw(ReplyError)
        )
    ;   redis_disconnect(Redis, [force(true)]),
        throw(Caught)
    ).
1348
%!  handle_push_messages(+Messages, +Redis)
%
%   Handle each push message in turn. A handler that fails is
%   ignored; a handler that raises has its exception printed as a
%   warning. Processing always continues with the next message.

handle_push_messages([], _).
handle_push_messages([Message|Messages], Redis) :-
    ignore(catch(handle_push_message(Message, Redis), Error,
                 print_message(warning, Error))),
    handle_push_messages(Messages, Redis).
1357
%!  handle_push_message(+Message, +Redis)
%
%   Forward a push message to redis_broadcast/2. A message that
%   starts with the string "pubsub" is forwarded without that
%   header. A message that starts with the atom `message` is
%   forwarded unchanged. Other messages are not handled (the
%   predicate fails).

handle_push_message(["pubsub"|Payload], Redis) :-
    redis_broadcast(Redis, Payload).
handle_push_message([message|Payload], Redis) :-
    redis_broadcast(Redis, [message|Payload]).
1365
1366
1373
%!  resync(+Redis)
%
%   Try to bring the connection back in sync using do_resync/1. If
%   that raises an error(Formal, _) exception, the connection is
%   forcibly disconnected instead. Always succeeds unless a
%   different kind of exception is raised.

resync(Redis) :-
    Caught = error(Formal,_),
    catch(do_resync(Redis), Caught, true),
    (   var(Formal)
    ->  true
    ;   redis_disconnect(Redis, [force(true)])
    ).
1381
%!  do_resync(+Redis)
%
%   Send `echo` with a random integer below 1,000,000,000 and call
%   '$redis_resync'/2 with the stream and that number, under a time
%   limit of 0.2 seconds. Exceeding the limit is reported as
%   error(time_limit_exceeded, _).
%   NOTE(review): '$redis_resync'/2 is defined outside this file
%   section; presumably it reads until the echoed number is seen.

do_resync(Redis) :-
    Token is random(1_000_000_000),
    redis_stream(Redis, Stream, true),
    redis_write_msg(Stream, echo(Token)),
    catch(call_with_time_limit(0.2, '$redis_resync'(Stream, Token)),
          time_limit_exceeded,
          throw(error(time_limit_exceeded,_))).
1389
1390
1400
1401
1402
1403 1406
1407:- multifile
1408 prolog:error_message//1,
1409 prolog:message//1. 1410
%   Message text for redis_error(Code, String) error terms.

prolog:error_message(redis_error(Code, Text)) -->
    [ 'REDIS: ~w: ~s'-[Code, Text] ].
1413
1414prolog:message(redis(retry(_Redis, _Failures, Wait, Error))) -->
1415 [ 'REDIS: connection error. Retrying in ~2f seconds'-[Wait], nl ],
1416 [ ' '-[] ], '$messages':translate_message(Error)