35
36:- module(http_dyn_workers,
37 [
38 ]). 39:- use_module(library(http/thread_httpd)). 40:- use_module(library(debug)). 41:- use_module(library(settings)). 42:- use_module(library(aggregate)). 43
% Tunable limits read by the scheduler through setting/2.

% Cap on the number of workers per port: reschedule/3 adds no worker
% once the count of current workers has reached this value.
:- setting(http:max_workers, integer, 100,
           "Maximum number of workers to create").

% Passed as max_idle_time(Limit) to http_add_worker/2 for every worker
% the scheduler creates (presumably seconds; confirm against
% http_add_worker/2).
:- setting(http:worker_idle_limit, number, 10,
           "Terminate a dynamic worker when idle for this time").

% reschedule/3 adds no worker while the measured load exceeds this value.
:- setting(http:max_load, number, 10,
           "Maximum load average caused by HTTP workers").
81
87
:- multifile
    http:schedule_workers/1.

%!  http:schedule_workers(+Dict) is semidet.
%
%   Post a no_workers(Stamp, Dict) message to the thread aliased
%   '__http_scheduler', where Stamp is the current time. If the
%   message queue does not exist, create the scheduler thread and
%   retry. Fails if the scheduler thread cannot be created.

http:schedule_workers(Dict) :-
    post_no_workers(Dict),
    !.
http:schedule_workers(Dict) :-
    create_scheduler,
    http:schedule_workers(Dict).

%   post_no_workers(+Dict) is semidet.
%
%   Send the time-stamped no_workers/2 message. Only a missing message
%   queue is turned into failure; any other error propagates.

post_no_workers(Dict) :-
    get_time(Stamp),
    NoQueue = error(existence_error(message_queue, _), _),
    catch(thread_send_message('__http_scheduler',
                              no_workers(Stamp, Dict)),
          NoQueue,
          fail).
100
%!  create_scheduler is semidet.
%
%   Start a detached thread running http_scheduler/0 under the alias
%   '__http_scheduler'. Any error(_,_) exception raised by
%   thread_create/3 is turned into plain failure.

create_scheduler :-
    Options = [ alias('__http_scheduler'),
                inherit_from(main),
                debug(false),
                detached(true)
              ],
    catch(thread_create(http_scheduler, _Id, Options),
          error(_Formal, _Context),
          fail).
110
%!  http_scheduler
%
%   Entry point of the scheduler thread: build the initial state dict
%   (`waiting` 0, `time` the current time) and enter the message loop
%   http_scheduler/1.

http_scheduler :-
    get_time(Started),
    Initial = _{ waiting:0,
                 time:Started
               },
    http_scheduler(Initial).
116
%!  http_scheduler(+State)
%
%   Scheduler main loop. Fetch the next task, hand it to reschedule/3
%   and continue with the resulting state. If reschedule/3 fails or
%   raises, continue with the unchanged State. This predicate is not
%   written to terminate.

http_scheduler(State) :-
    scheduler_next_task(Task),
    (   scheduler_run_task(Task, State, NewState)
    ->  !,                              % drop choice points before looping
        http_scheduler(NewState)
    ;   http_scheduler(State)
    ).

%   scheduler_next_task(-Task) is det.
%
%   Take the next message from this thread's queue, waiting with
%   timeout(10). When no message arrives in time the task is
%   `update_load_avg`.

scheduler_next_task(Task) :-
    thread_self(Self),
    thread_get_message(Self, Task, [timeout(10)]),
    !.
scheduler_next_task(update_load_avg).

%   scheduler_run_task(+Task, +State0, -State) is semidet.
%
%   Run reschedule/3. An exception is printed as a warning and turned
%   into failure.

scheduler_run_task(Task, State0, State) :-
    catch(reschedule(Task, State0, State),
          Error,
          ( print_message(warning, Error),
            fail
          )).
131
133
%!  reschedule(+Task, +State0, -State) is semidet.
%
%   Handle one scheduler task.
%
%     - no_workers(Reported, Dict)
%       Refresh the load estimate. No worker is added when the load
%       exceeds the `http:max_load` setting or the port already has
%       `http:max_workers` workers. Otherwise sleep briefly and add
%       one worker if the accept queue is still not empty.
%     - update_load_avg
%       Only refresh the load estimate.
%
%   State is State0 as updated by update_load_avg/4.

reschedule(no_workers(Reported, Dict), State0, State) :-
    update_load_avg(Dict, State0, State, Load),
    setting(http:max_load, MaxLoad),
    (   Load > MaxLoad
    ->  debug(http(scheduler), 'Load ~1f > ~1f; not adding workers',
              [ Load, MaxLoad ])
    ;   add_worker_within_limit(Reported, Dict, Load)
    ).
reschedule(update_load_avg, State0, State) :-
    update_load_avg(_{}, State0, State, _).

%   add_worker_within_limit(+Reported, +Dict, +Load) is semidet.
%
%   Count the current workers of Dict.port. If the maximum is not yet
%   reached, sleep and then consider adding a worker. The wait is
%   0.001*MaxWorkers/(free worker slots), reduced by the time that has
%   passed since Reported, with a floor of 0.001.

add_worker_within_limit(Reported, Dict, Load) :-
    Port = Dict.port,
    aggregate_all(count, http_current_worker(Port, _), Workers),
    setting(http:max_workers, MaxWorkers),
    (   Workers >= MaxWorkers
    ->  debug(http(scheduler),
              'Reached max workers (~D); not adding workers',
              [ MaxWorkers ])
    ;   Wait is 0.001*(MaxWorkers/max(1, MaxWorkers-Workers)),
        get_time(Now),
        Sleep is max(0.001, Wait + Reported-Now),
        debug(http(scheduler),
              'Waiting: ~w; active: ~w; sleep: ~3f; load: ~1f',
              [Dict.waiting, Workers, Sleep, Load]),
        sleep(Sleep),
        add_worker_if_pending(Dict)
    ).

%   add_worker_if_pending(+Dict) is semidet.
%
%   Re-read the size of the accept queue. Add a worker for Dict.port
%   only if the queue is not empty; the worker's max_idle_time comes
%   from the `http:worker_idle_limit` setting.

add_worker_if_pending(Dict) :-
    accept_queue(Dict, Queue),
    message_queue_property(Queue, size(Pending)),
    (   Pending == 0
    ->  debug(http(scheduler), 'Drained', [])
    ;   debug(http(scheduler), 'Size is ~w: adding worker', [Pending]),
        setting(http:worker_idle_limit, MaxIdle),
        http_add_worker(Dict.port,
                        [ max_idle_time(MaxIdle)
                        ])
    ).
167
%!  update_load_avg(+Dict, +State0, -State, -Load) is det.
%
%   Maintain the load estimate kept under the `load` key of the
%   scheduler state. That entry is a dict with the keys `stamp` (time
%   of the sample), `cpu` (summed CPU time of the port's workers at
%   that time) and, from the second sample on, `load` (smoothed load).
%
%     1. If a load was computed less than 10 time units (as measured
%        by get_time/1) ago, it is reused and State is State0.
%     2. Otherwise, if a port is known from State0 or Dict, the
%        workers' CPU time is sampled again. Load is the CPU time
%        consumed per unit of elapsed time, smoothed by smooth_load/3.
%        The first sample yields Load = 0.
%     3. If no port is known, Load is 0 and State is State0.
%
%   @arg Dict   dict that may provide `port` if State0 does not.
%   @arg State0 scheduler state dict.
%   @arg State  updated scheduler state dict.
%   @arg Load   current load estimate.

update_load_avg(_Dict, State, State, Load) :-
    _{stamp:Last, load:Load} :< State.get(load),
    get_time(Now),
    Now - Last < 10,
    !.                                  % commit: a failure later in the
                                        % caller must not trigger a resample
update_load_avg(Dict, State0, State, Load) :-
    server_port(Dict, State0, State1, Port),
    !,
    aggregate_all(sum(CPU), worker_cpu(Port, CPU), CPU1),
    get_time(Now),
    (   LoadDict = State1.get(load),
        _{stamp:Last, cpu:LastCPU} :< LoadDict,
        Elapsed is Now-Last,
        Elapsed > 0                     % avoid division by zero and a
                                        % clock that stepped backwards
    ->  % The CPU sum drops when workers exit; never report a
        % negative load sample.
        Load0 is max(0.0, (CPU1-LastCPU)/Elapsed),
        smooth_load(LoadDict, Load0, Load),
        State = State1.put(load, _{stamp:Now, cpu:CPU1, load:Load})
    ;   % No usable previous sample: start a new measurement.
        State = State1.put(load, _{stamp:Now, cpu:CPU1}),
        Load = 0
    ).
update_load_avg(_, State, State, 0).    % no port known: keep the state
186
%!  worker_cpu(+Port, -CPU) is nondet.
%
%   CPU is the `cputime` statistic of a current worker thread of the
%   server at Port. On backtracking the other workers are enumerated.
%   A worker whose statistics cannot be read is skipped silently.

worker_cpu(Port, CPU) :-
    http_current_worker(Port, Worker),
    catch(thread_statistics(Worker, cputime, CPU),
          _Any,
          fail).
190
%!  server_port(+Dict, +State0, -State, -Port) is semidet.
%
%   Port is the `port` stored in State0 if there is one; State is then
%   State0. Otherwise Port is taken from Dict and recorded in State.
%   Fails if neither dict has a `port` key.

server_port(Dict, State0, State, Port) :-
    (   State = State0,
        Port = State0.get(port)
    ->  true
    ;   Port = Dict.get(port),
        State = State0.put(port, Port)
    ).
197
%!  smooth_load(+LoadDict, +Load0, -Load) is semidet.
%
%   Blend the new sample Load0 with the previous load stored under
%   `load` in LoadDict, weighting the previous value 5:1. If LoadDict
%   has no `load` key, Load is Load0.

smooth_load(LoadDict, Load0, Load) :-
    (   Previous = LoadDict.get(load)
    ->  Load is (5*Previous+Load0)/6
    ;   Load = Load0
    ).
203
208
209accept_queue(Dict, Queue) :-
210 Queue = Dict.get(queue),
211 !.
212accept_queue(Dict, Queue) :-
213 thread_httpd:current_server(Dict.port, _, _, Queue, _, _),
214 !