1:- module(connection_pool, [
2 get_connection/2, 3 mark_alive/2, 4 mark_dead/2 5]).
14:- use_module(library(random)). 15:- use_module(library(lists)). 16
17:- use_module(registry). 18:- use_module(util).
%!  get_connection(+Ps, -Connection)
%
%   Public entry point for obtaining a connection from the pool
%   identified by Ps. The actual selection is delegated to
%   get_connection0/2, which is executed while holding the mutex
%   named Ps. with_mutex/2 runs its goal as once/1, so at most one
%   Connection is returned.
get_connection(Ps, Connection) :-
    Goal = get_connection0(Ps, Connection),
    with_mutex(Ps, Goal).
27
%!  get_connection0(+Ps, -Connection)
%
%   Unlocked worker behind get_connection/2.
%
%   First gives resurrect/3 a non-forced chance to move a dead
%   connection back into the live list, then reads the live list via
%   connections/2 and hands over to pick_live_connection/3.
%
%   NOTE(review): connections/2 and options/2 are not defined in the
%   visible part of this file; presumably they read the corresponding
%   fields of the record stored under Ps -- confirm.
get_connection0(Ps, Connection) :-
    resurrect(Ps, false, _),
    connections(Ps, Alive),
    pick_live_connection(Alive, Ps, Connection).

%   pick_live_connection(+Alive, +Ps, -Connection)
%
%   With no live connection left, force a resurrection and use whatever
%   it yields. Otherwise choose from Alive: round-robin when the options
%   contain random_selector(false), random selection in all other cases.
pick_live_connection([], Ps, Connection) :-
    !,
    resurrect(Ps, true, Connection).
pick_live_connection(Alive, Ps, Connection) :-
    options(Ps, Options),
    (   memberchk(random_selector(false), Options)
    ->  next_rr(Ps, Counter),
        length(Alive, Size),
        % The counter only ever grows; wrap it onto a valid index.
        Position is Counter mod Size,
        nth0(Position, Alive, Connection)
    ;   random_select(Connection, Alive, _)
    ).
42
%!  next_rr(+Ps, -RR)
%
%   Advance the round-robin counter kept in the `rr` field of the
%   `vars` dict of the record stored under key Ps, and return the new
%   counter value in RR. The old record is erased and an updated copy
%   is recorded in its place.
next_rr(Ps, RR) :-
    recorded(Ps, State, Ref),
    erase(Ref),
    Vars = State.vars,
    Previous = Vars.rr,
    RR is Previous + 1,
    NewVars = Vars.put(rr, RR),
    NewState = State.put(vars, NewVars),
    recorda(Ps, NewState).
%!  mark_alive(+Ps, +Connection)
%
%   Public entry point that runs mark_alive0/2 for Connection while
%   holding the mutex named Ps.
mark_alive(Ps, Connection) :-
    Goal = mark_alive0(Ps, Connection),
    with_mutex(Ps, Goal).
55
%!  mark_alive0(+Ps, +Connection)
%
%   Unlocked worker behind mark_alive/2. Removes the first
%   `Connection-Count` entry, if any, from the `dead_count` list in the
%   `vars` dict of the record stored under Ps, then re-records the
%   state. When Connection has no entry the list is stored unchanged.
mark_alive0(Ps, Connection) :-
    recorded(Ps, State, Ref),
    erase(Ref),
    Vars = State.vars,
    Counts = Vars.dead_count,
    % Drop the failure counter for Connection, or keep the list as is.
    once((   selectchk(Connection-_, Counts, Remaining)
         ;   Remaining = Counts
         )),
    NewVars = Vars.put(dead_count, Remaining),
    recorda(Ps, State.put(vars, NewVars)).
%!  mark_dead(+Ps, +Connection)
%
%   Public entry point that runs mark_dead0/2 for Connection while
%   holding the mutex named Ps.
mark_dead(Ps, Connection) :-
    Goal = mark_dead0(Ps, Connection),
    with_mutex(Ps, Goal).
72
%!  mark_dead0(+Ps, +Connection)
%
%   Unlocked worker behind mark_dead/2. If Connection is in the
%   `connections` list of the record stored under Ps, it is removed
%   from that list and update_connection_info/4 records the updated
%   state. If Connection is not in the list, the predicate succeeds
%   without touching the stored state.
mark_dead0(Ps, Connection) :-
    recorded(Ps, Value, Ref),
    Connections = Value.vars.connections,
    (   selectchk(Connection, Connections, Connections1)
    ->  % Erase only on this path: update_connection_info/4 records the
        % replacement. Erasing unconditionally would delete the pool
        % state whenever Connection is unknown or already marked dead.
        erase(Ref),
        update_connection_info(Ps, Connection, Connections1, Value)
    ;   true
    ).
81
%!  update_connection_info(+Ps, +Connection, +Connections, +Value)
%
%   Record a new pool state under Ps in which Connection is dead.
%
%   Value is the previous state dict; Connections is the live list
%   with Connection already removed. The failure counter for
%   Connection in `dead_count` is incremented (starting from 0 when it
%   has no entry yet). The retry delay is
%
%       DeadTimeout * 2 ** min(Count - 1, TimeoutCutoff)
%
%   where Count is the incremented counter and dead_timeout(DeadTimeout)
%   and timeout_cutoff(TimeoutCutoff) are read from Value.options.
%   `Future-Connection`, with Future being the current time plus that
%   delay, is added to `dead_connections`, which is kept sorted by key.
%
%   Fails if either option is missing from Value.options. This
%   predicate only records; erasing the previous record is left to the
%   caller.
update_connection_info(Ps, Connection, Connections, Value) :-
    _{dead_connections:DeadConnections, dead_count:DeadCount} :< Value.vars,
    (   selectchk(Connection-Count, DeadCount, DeadCount1)
    ->  true
    ;   Count = 0,
        % No previous failures: keep every other counter. Leaving
        % DeadCount1 unbound here would store a partial list and drop
        % the counters of all other connections.
        DeadCount1 = DeadCount
    ),
    Count1 is Count + 1,
    memberchk(dead_timeout(DeadTimeout), Value.options),
    memberchk(timeout_cutoff(TimeoutCutoff), Value.options),
    % Exponential back-off, capped by the timeout_cutoff exponent.
    Timeout is DeadTimeout * 2 ** min(Count1 - 1, TimeoutCutoff),
    get_time(Now),
    Future is Now + Timeout,
    DeadConnections1 = [Future-Connection|DeadConnections],
    debug(connection_pool,
          'mark connect ~w dead with timeout ~w and count ~w', [Connection, Timeout, Count1]),
    % Sorting by key puts the earliest retry time at the head.
    keysort(DeadConnections1, DeadConnections2),
    Value1 = Value.put(vars, Value.vars.put(_{
        connections:Connections,
        dead_connections:DeadConnections2,
        dead_count:[Connection-Count1|DeadCount1]})),
    recorda(Ps, Value1).
103
%!  resurrect(+Ps, +Force, -Connection)
%
%   Runs resurrect0/3 while holding the mutex named Ps. Force is called
%   as a goal by resurrect0/3; the visible caller passes `true` or
%   `false`.
resurrect(Ps, Force, Connection) :-
    Goal = resurrect0(Ps, Force, Connection),
    with_mutex(Ps, Goal).
106
%!  resurrect0(+Ps, +Force, -Connection)
%
%   Unlocked worker behind resurrect/3. Force is called as a goal
%   (`true` or `false` in the visible caller).
%
%   * No dead connections: the state is left unchanged. When Force
%     succeeds, Connection is a random member of Value.hosts; otherwise
%     Connection stays unbound.
%   * Otherwise the head of `dead_connections` (the earliest retry
%     time, since the list is kept key-sorted) is examined. If its
%     retry time has passed, or Force succeeds, it is moved to the
%     front of `connections`. If not, the state is left unchanged.
%     In both cases Connection is unified with that head entry, even
%     when it was not resurrected.
%
%   The stored record is replaced only after the new state has been
%   computed, so a failure along the way (for example random_select/3
%   on an empty host list) leaves the pool state in place.
resurrect0(Ps, Force, Connection) :-
    recorded(Ps, Value, Ref),
    _{connections:Connections, dead_connections:DeadConnections} :< Value.vars,
    (   DeadConnections = []
    ->  (   Force
        ->  random_select(Connection, Value.hosts, _),
            debug(connection_pool,
                  'forced to resurrect, choose a random one ~w', [Connection])
        ;   true
        ),
        Value1 = Value
    ;   DeadConnections = [Timeout-Connection|DeadConnections1],
        (   once((get_time(Now), Timeout =< Now ; Force))
        ->  Connections1 = [Connection|Connections],
            debug(connection_pool,
                  'dead timeout or forced ~w', [Connection]),
            Value1 = Value.put(vars, Value.vars.put(_{
                connections:Connections1, dead_connections:DeadConnections1}))
        ;   Value1 = Value
        )
    ),
    % Swap the record only once Value1 is known to exist.
    erase(Ref),
    recorda(Ps, Value1).
Connection pool.
The connection pool manages the lifecycle of connections.