1:- module(http2_client, [http2_open/3,
2 http2_close/1,
3 http2_request/4]).
9:- use_module(library(predicate_options)). 10:- use_module(library(list_util), [split_at/4]). 11:- use_module(library(ssl), [ssl_context/3,
12 ssl_negotiate/5,
13 cert_accept_any/5]). 14:- use_module(library(socket), [tcp_connect/3,
15 tcp_select/3,
16 tcp_host_to_address/2]). 17:- use_module(library(url), [parse_url/2]). 18:- use_module(library(record)). 19:- use_module(frames). 20:- use_module(hpack, [lookup_header/3]). 21
% Renderings for the message terms this module hands to print_message/2.
% Each clause yields one format-string/argument pair.
:- multifile prolog:message//1.

% A frame whose type code could not be handled.
prolog:message(unknown_frame(FrameCode, Input, ClientState)) -->
    [ "Unknown HTTP/2 frame ~w: ~w~nState: ~w"-[FrameCode, Input, ClientState] ].
% The frame grammar did not match the pending input.
prolog:message(bad_frame(ClientState, Input)) -->
    [ "Couldn't read frame from ~w~nState: ~w"-[Input, ClientState] ].
% A GOAWAY frame with a non-zero error code was received.
prolog:message(connection_closed(ErrorCode, DebugData, ClientState)) -->
    [ "Connection closed with error code ~w: ~w~nClient state: ~w"-[ErrorCode, DebugData, ClientState] ].
% The worker thread ended without going through the normal shutdown path.
prolog:message(worker_died) -->
    [ "HTTP/2 client worker thread died"-[] ].
30
%!  connection_preface(-Preface)
%
%   Preface is the fixed code list that http2_open/3 writes to the
%   connection before sending its first frame.
connection_preface(Preface) :-
    Preface = `PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n`.
32
%!  default_complete_cb(+Headers, +Body)
%
%   Fallback completion callback stored in a stream record when the
%   caller supplied none.  It only emits a debug message carrying the
%   response headers; the body is ignored.
default_complete_cb(ResponseHeaders, _ResponseBody) :-
    debug(http2_client(open),
          "Complete without callback set ~w",
          [ResponseHeaders]).
% [TODO] store state of connection, to determine what's valid to receive/send
% Per-stream bookkeeping record.
%
%   headers      headers received so far (frame handlers append to it)
%   data         body codes received so far (DATA handling appends to it)
%   done         set from the end-of-stream flag of received frames
%   complete_cb  called with the headers and data once the stream completes
:- record http2_stream(
       headers = [],
       data = [],
       done = false,
       complete_cb = default_complete_cb
   ).
40
%!  default_close_cb(+Info)
%
%   Fallback close callback used when http2_open/3 was given no
%   close_cb(_) option.  It only emits a debug message with the data
%   describing why the connection closed.
default_close_cb(CloseInfo) :-
    debug(http2_client(open),
          "Connection closed without callback set ~w",
          [CloseInfo]).
43
% Connection-wide state threaded through the worker loop.
%
%   authority               host name, sent as the ':authority' header
%   stream                  stream pair the worker reads from and writes to
%   settings                dict of connection settings; updated from
%                           SETTINGS frames sent by the peer
%   recv_header_table(_size) header table state used when decoding headers
%   send_header_table(_size) header table state used when encoding headers
%   next_stream_id          id for the next request (advanced by 2 per request)
%   last_stream_id          highest stream id seen on a received frame
%   substreams              dict of http2_stream records keyed by stream id
%   close_cb                called with a dict when the connection closes
:- record http2_state(
       authority = false,
       stream = false,
       settings = settings{ header_table_size: 4096,
                            enable_push: 0,   % author's note: 1
                            max_concurrent_streams: unlimited,
                            initial_window_size: 65535,
                            max_frame_size: 16384,
                            max_header_list_size: unlimited
                          },
       recv_header_table = [],
       recv_header_table_size = 4096,
       send_header_table = [],
       send_header_table_size = 4096,
       next_stream_id = 1,
       last_stream_id = 0,
       substreams = streams{},
       close_cb = default_close_cb
   ).
60
% Options accepted by http2_open/3: close_cb(Callable) is handled by
% this module, and the option list is also passed on to ssl_context/3.
:- predicate_options(http2_open/3, 3,
                     [ close_cb(callable),
                       pass_to(ssl_context/3)
                     ]).

% Handle returned to callers of http2_open/3: the connection's stream
% pair and the id of the worker thread that owns the connection.
:- record http2_ctx(
       stream = false,
       worker_thread_id = false
   ).
%!  http2_open(+URL, -Http2Ctx, +Options)
%
%   Open an HTTP/2 connection over TLS to the host named in URL and
%   unify Http2Ctx with an http2_ctx record holding the connection
%   stream and the id of the worker thread serving it.
%
%   URL must parse as an https URL; otherwise the predicate fails.
%   The port defaults to 443 when URL does not name one.
%
%   Options:
%     * close_cb(:Callable)
%       Called with a dict when the connection closes.  Defaults to
%       default_close_cb/1.
%   The whole option list is also appended to the options given to
%   ssl_context/3.
%
%   If TLS negotiation or the initial preface/SETTINGS write fails or
%   raises, the TCP socket is closed before the failure or exception
%   is propagated, so a failed open does not leak the socket.
http2_open(URL, Http2Ctx, Options) :-
    parse_url(URL, [protocol(https), host(Host)|Attrs]),
    (   memberchk(port(Port), Attrs)
    ->  true
    ;   Port = 443
    ),
    !,
    debug(http2_client(open), "URL ~w -> Host ~w:~w", [URL, Host, Port]),
    ssl_context(client, Ctx, [ host(Host),
                               close_parent(true),
                               alpn_protocols([h2]),
                               cacert_file(system(root_certificates))
                             | Options
                             ]),
    tcp_host_to_address(Host, Address),
    debug(http2_client(open), "Host ~w -> Address ~w", [Host, Address]),
    tcp_connect(Address:Port, PlainStreamPair, []),
    debug(http2_client(open), "Connected", []),
    % Run the handshake under catch/3 so the plain socket can be
    % released on either failure or exception.
    (   catch(http2_open_session(Ctx, PlainStreamPair, Stream), Error, true)
    ->  (   var(Error)
        ->  true
        ;   close(PlainStreamPair, [force(true)]),
            throw(Error)
        )
    ;   close(PlainStreamPair, [force(true)]),
        fail
    ),
    (   memberchk(close_cb(CloseCb), Options)
    ->  true
    ;   CloseCb = default_close_cb
    ),
    make_http2_state([ authority(Host),
                       stream(Stream),
                       close_cb(CloseCb)
                     ],
                     State),
    thread_create(listen_socket(State),
                  WorkerThreadId,
                  [at_exit(warn_worker_died(Stream, CloseCb))]),
    make_http2_ctx([ stream(Stream),
                     worker_thread_id(WorkerThreadId)
                   ],
                   Http2Ctx).

% http2_open_session(+Ctx, +PlainStreamPair, -Stream)
%
% Negotiate TLS over the connected socket, then write the connection
% preface and the initial SETTINGS frame on the resulting stream pair.
http2_open_session(Ctx, PlainStreamPair, Stream) :-
    stream_pair(PlainStreamPair, PlainRead, PlainWrite),
    set_stream(PlainRead, buffer(false)),
    ssl_negotiate(Ctx, PlainRead, PlainWrite, SSLRead, SSLWrite),
    debug(http2_client(open), "Negotiated", []),
    stream_pair(Stream, SSLRead, SSLWrite),
    connection_preface(ConnectionPreface),
    format(Stream, "~s", [ConnectionPreface]),
    send_frame(Stream, settings_frame([enable_push-0])),
    flush_output(Stream).
106
%!  warn_worker_died(+Stream, +CloseCb)
%
%   Exit hook installed on the worker thread by http2_open/3.  A worker
%   whose status is exception(finished) (the ball thrown by
%   worker_shutdown/2) is treated as a normal exit and only logged.
%   Any other status is reported as a warning; the connection stream is
%   then closed and CloseCb is called with a dict holding the thread
%   status and a fixed message.
warn_worker_died(Stream, CloseCb) :-
    thread_self(Self),
    (   thread_property(Self, status(exception(finished)))
    ->  debug(http2_client(open), "Worker thread exited normally", [])
    ;   print_message(warning, worker_died),
        close(Stream),
        thread_property(Self, status(ExitStatus)),
        Info = _{cause: ExitStatus, msg: "Worker thread died"},
        call(CloseCb, Info)
    ).
%!  http2_close(+Http2Ctx)
%
%   Ask the worker thread behind Http2Ctx to close the connection by
%   posting the atom `done` to its message queue.  The actual shutdown
%   is carried out by the worker when it picks the message up.
http2_close(Http2Ctx) :-
    http2_ctx_worker_thread_id(Http2Ctx, WorkerId),
    thread_send_message(WorkerId, done).
123
:- meta_predicate http2_request(+, +, +, 2).

%!  http2_request(+Ctx, +Headers, +Body, :ResponseCb)
%
%   Queue a request on the connection identified by Ctx.  The request
%   is packaged as a `request` dict and posted to the worker thread,
%   which sends it; ResponseCb is stored as the stream's completion
%   callback and is later called with the response headers and body.
http2_request(Ctx, Headers, Body, ResponseCb) :-
    debug(http2_client(request), "Sending request ~w", [Ctx]),
    Request = request{ headers: Headers,
                       body: Body,
                       on_complete: ResponseCb
                     },
    http2_ctx_worker_thread_id(Ctx, WorkerId),
    thread_send_message(WorkerId, Request).
137
139
%!  listen_socket(+State)
%
%   Entry point of the worker thread: wrap the connection stream in a
%   lazy list and enter the polling loop listen_socket/2.
listen_socket(InitialState) :-
    http2_state_stream(InitialState, Connection),
    stream_to_lazy_list(Connection, PendingInput),
    listen_socket(InitialState, PendingInput).
%!  listen_socket(+State, +StreamList)
%
%   Worker loop.  Each iteration first takes at most one message from
%   this thread's queue and handles it, then reads one frame if the
%   socket reports input or the lazy input list is already
%   instantiated, and finally recurses with the updated state.
%
%   NOTE(review): both polls use a timeout of 0, so the loop appears to
%   spin rather than block -- confirm against tcp_select/3 semantics.
listen_socket(State0, StreamList0) :-
    thread_self(Self),
    (   thread_get_message(Self, Msg, [timeout(0)])
    ->  debug(http2_client(request), "Client msg ~k", [Msg]),
        handle_client_request(Msg, State0, State1),
        debug(http2_client(request), "Msg sent new state ~w", [State1])
    ;   State1 = State0
    ),
    !,
    http2_state_stream(State1, Stream),
    tcp_select([Stream], Ready, 0),
    (   (   Ready = [Stream]
        ;   \+ attvar(StreamList0)      % input already pulled into the list
        )
    ->  debug(http2_client(response), "Data available", []),
        read_frame(State1, StreamList0, State2, StreamList1),
        debug(http2_client(response), "Read data, rest ~w", [StreamList1])
    ;   State2 = State1,
        StreamList1 = StreamList0
    ),
    listen_socket(State2, StreamList1).
161
%!  worker_shutdown(+State, +Cause)
%
%   Close the connection stream, call the connection's close callback
%   with a dict holding the last seen stream id and Cause, and then
%   terminate the worker by throwing `finished`.  Never returns
%   normally.
worker_shutdown(State, Cause) :-
    http2_state_stream(State, Stream),
    close(Stream),
    debug(http2_client(open), "...closed", []),
    http2_state_last_stream_id(State, LastStreamId),
    http2_state_close_cb(State, CloseCb),
    Report = _{cause: Cause, last_stream_id: LastStreamId},
    call(CloseCb, Report),
    throw(finished).
172
174
%!  handle_client_request(+Msg, +State0, -State)
%
%   Act on a message taken from the worker's queue.
%
%   `done` sends a GOAWAY frame carrying the last seen stream id and
%   error code 0, then shuts the worker down (which throws).
%
%   A `request` dict allocates the next stream id, records the
%   completion callback for that stream, sends the headers (prefixed
%   with ':authority' and ':scheme') and the body, and flushes the
%   connection.
handle_client_request(done, State, _) :-
    http2_state_last_stream_id(State, LastId),
    debug(http2_client(open), "Closing connection ~w...", [LastId]),
    http2_state_stream(State, Stream),
    send_frame(Stream, goaway_frame(LastId, 0, [])),
    flush_output(Stream),
    worker_shutdown(State, "Client closed").
handle_client_request(Request, State0, State) :-
    Request = request{ headers: UserHeaders,
                       body: Body,
                       on_complete: ResponseCb
                     },
    http2_state_authority(State0, Authority),
    Headers = [':authority'-Authority, ':scheme'-https | UserHeaders],
    % Register the stream and its callback before anything is sent.
    http2_state_next_stream_id(State0, Ident),
    stream_info(State0, Ident, StreamInfo0),
    set_complete_cb_of_http2_stream(ResponseCb, StreamInfo0, StreamInfo1),
    update_state_substream(Ident, StreamInfo1, State0, State1),
    NextIdent is Ident + 2,
    set_next_stream_id_of_http2_state(NextIdent, State1, State2),
    debug(http2_client(request), "Sending headers ~w ~w", [Headers, State1]),
    % With an empty body the headers frame also ends the stream.
    (   Body = []
    ->  EndStream = true
    ;   EndStream = false
    ),
    send_request_headers(Headers, Ident, EndStream, State2, State3),
    send_request_body(Body, Ident, State3, State),
    http2_state_stream(State, Stream),
    flush_output(Stream).
200
%!  send_request_headers(+Headers, +Ident, +EndStream, +State0, -State1)
%
%   Encode Headers for stream Ident and write them to the connection
%   as header frames bounded by the max_frame_size setting.  EndStream
%   is passed on as the end_stream flag.  State1 is State0 with the
%   send header table and its size updated to what the encoder
%   returned.
send_request_headers(Headers_, Ident, EndStream, State0, State1) :-
    http2_state_send_header_table(State0, Table0),
    % Tag each header with how it should be encoded.
    wrapped_headers(Table0, Headers_, Headers),
    http2_state_send_header_table_size(State0, TableSize),
    http2_state_stream(State0, Stream),
    http2_state_settings(State0, Settings),
    MaxSize = Settings.max_frame_size,
    send_frame(Stream,
               header_frames(MaxSize,
                             Ident, Headers, TableSize-Table0-TableSize1-Table1,
                             [end_headers(true), end_stream(EndStream)])),
    debug(http2_client(request), "Sent headers", []),
    set_http2_state_fields(
        [send_header_table(Table1),
         send_header_table_size(TableSize1)],
        State0, State1).
217
%!  send_request_body(+Body, +Ident, +State0, -State)
%
%   Write Body to stream Ident as data frames no larger than the
%   max_frame_size setting.  An empty body sends nothing.  The state
%   is returned unchanged in both cases.
send_request_body([], _, State, State) :- !.
send_request_body(Body, Ident, State, State) :-
    http2_state_settings(State, Settings),
    MaxFrameSize = Settings.max_frame_size,
    http2_state_stream(State, Stream),
    send_body_parts(Stream, Ident, MaxFrameSize, Body).
226
%!  send_body_parts(+Stream, +Ident, +MaxSize, +Body)
%
%   Send Body on stream Ident in chunks of at most MaxSize elements.
%   The final chunk (the one that fits within MaxSize) carries
%   end_stream(true); earlier chunks carry no flags.  An empty Body
%   sends nothing.
send_body_parts(_, _, _, []) :- !.
send_body_parts(Stream, Ident, MaxSize, Body) :-
    length(Body, Remaining),
    (   Remaining =< MaxSize
    ->  send_frame(Stream,
                   data_frame(Ident, Body, [end_stream(true)]))
    ;   split_at(MaxSize, Body, Chunk, Tail),
        send_frame(Stream, data_frame(Ident, Chunk, [])),
        send_body_parts(Stream, Ident, MaxSize, Tail)
    ).
237
%!  wrapped_headers(+Table, +Headers, -Wrapped)
%
%   Tag each Key-Value header with the encoding to use for it:
%   indexed(K-V) when lookup_header/3 finds the pair in Table,
%   literal_inc(K-V) otherwise.  Elements that are not Key-Value pairs
%   are passed through unchanged.
wrapped_headers(_, [], []) :- !.
wrapped_headers(Table, [K-V|RestH], [indexed(K-V)|RestW]) :-
    lookup_header(Table, K-V, _), !,
    wrapped_headers(Table, RestH, RestW).
wrapped_headers(Table, [K-V|RestH], [literal_inc(K-V)|RestW]) :-
    !, wrapped_headers(Table, RestH, RestW).
wrapped_headers(Table, [KV|RestH], [KV|RestW]) :-
    wrapped_headers(Table, RestH, RestW).
246
248
%!  read_frame(+State0, +In, -State, -Rest)
%
%   Parse one frame from the front of In, leaving the remainder in
%   Rest, record the highest stream id seen so far, and dispatch the
%   frame to handle_frame/5.  The parsed frame is serialized again so
%   that the handler receives exactly that frame's codes.
%
%   If no frame can be parsed, a bad_frame warning is printed and the
%   predicate fails.  Failure of a later step fails without a warning.
read_frame(State0, In, State, Rest) :-
    (   phrase(frames:frame(Type, Flags, Ident, Payload), In, Rest)
    ->  debug(http2_client(response), "Read frame type ~w", [Type]),
        phrase(frames:frame(Type, Flags, Ident, Payload), FrameCodes),
        http2_state_last_stream_id(State0, SeenIdent),
        HighestIdent is max(SeenIdent, Ident),
        set_last_stream_id_of_http2_state(HighestIdent, State0, State1),
        debug(http2_client(response), "Update last seen frame ~w", [HighestIdent]),
        handle_frame(Type, Ident, State1, FrameCodes, State),
        debug(http2_client(response), "Handled frame", [])
    ;   print_message(warning, bad_frame(State0, In)),
        fail
    ).
263
%!  handle_frame(+Type, +Ident, +State0, +In, -State)
%
%   Process one received frame.  Type is the numeric frame type, Ident
%   the stream id from the frame header, and In the codes of exactly
%   that frame, which each clause parses again with the grammar for
%   its type.  State is the connection state after the frame has been
%   applied.  A type code that no clause handles (or whose grammar
%   does not match) produces an unknown_frame warning and failure.

% 0x0: data frame.  Append the payload to the stream's data and
% complete the stream when the end-of-stream flag is set.
handle_frame(0x0, _, State0, In, State2) :-
    phrase(data_frame(Ident, Data, [end_stream(End)]), In), !,
    length(Data, DataL),
    debug(http2_client(response), "Data on stream ~w # = ~w end? ~w", [Ident, DataL, End]),
    stream_info(State0, Ident, StreamInfo0),
    http2_stream_data(StreamInfo0, OldData),
    append(OldData, Data, NewData),
    set_http2_stream_fields([data(NewData), done(End)],
                            StreamInfo0, StreamInfo1),
    update_state_substream(Ident, StreamInfo1, State0, State1),
    (End -> complete_client(Ident, State1, State2) ; State2 = State1).
% 0x1: header frame.  Decode the headers against the receive header
% table, append them to the stream, and complete the stream when both
% end-of-stream and end-of-headers are set.
handle_frame(0x1, Ident, State0, In, State3) :-
    http2_state_recv_header_table(State0, HeaderTable0),
    http2_state_recv_header_table_size(State0, TableSize),
    phrase(header_frame(Ident,
                        Headers,
                        TableSize-HeaderTable0-TableSize1-HeaderTable1,
                        [end_stream(EndStream),
                         end_headers(EndHeaders)]),
           In), !,
    debug(http2_client(response), "Header frame ~w", [Headers]),
    stream_info(State0, Ident, StreamInfo),
    http2_stream_headers(StreamInfo, PreviousHeaders),
    append(PreviousHeaders, Headers, NewHeaders),
    debug(http2_client(response), "NEW HEADERS ~w", [NewHeaders]),
    set_http2_stream_fields([done(EndStream),
                             headers(NewHeaders)],
                            StreamInfo, StreamInfo1),
    update_state_substream(Ident, StreamInfo1, State0, State1),
    set_http2_state_fields([recv_header_table(HeaderTable1),
                            recv_header_table_size(TableSize1)],
                           State1, State2),
    ((EndStream, EndHeaders)
    -> complete_client(Ident, State2, State3)
    ; State3 = State2).
% 0x2 (PRIORITY in RFC 7540): accepted and ignored.
handle_frame(0x2, _Ident, State0, _In, State0).
% 0x3: reset frame.  Mark the stream done and complete it.
handle_frame(0x3, Ident, State0, In, State2) :-
    phrase(rst_frame(Ident, ErrCode), In), !,
    debug(http2_client(response), "Rst frame ~w ~w", [Ident, ErrCode]),
    stream_info(State0, Ident, StreamInfo0),
    set_done_of_http2_stream(true, StreamInfo0, StreamInfo1),
    update_state_substream(Ident, StreamInfo1, State0, State1),
    complete_client(Ident, State1, State2).
% 0x4: settings acknowledgement.  Nothing to do.
handle_frame(0x4, _, State0, In, State0) :-
    phrase(settings_ack_frame, In), !.
% 0x4: settings frame.  Merge the new settings, trim the receive
% header table to the new table size, and acknowledge.
% NOTE(review): the peer's header_table_size is applied to the receive
% table here -- confirm it should not govern the send table instead.
handle_frame(0x4, _, State0, In, State1) :-
    debug(http2_client(response), "read settings ~w", [In]),
    phrase(settings_frame(UpdateSettings), In), !,
    debug(http2_client(response), "Settings ~w", [UpdateSettings]),
    http2_state_settings(State0, Settings),
    update_settings(Settings, UpdateSettings, NewSettings),
    NewTableSize = NewSettings.header_table_size,
    http2_state_recv_header_table(State0, OldTable),
    hpack:keep_fitting(NewTableSize, OldTable, NewTable),
    set_http2_state_fields([settings(NewSettings),
                            recv_header_table(NewTable),
                            recv_header_table_size(NewTableSize)],
                           State0, State1),
    http2_state_stream(State1, Stream),
    send_frame(Stream, settings_ack_frame), flush_output(Stream).
% 0x5: push promise frame.  Record the promised stream with its
% headers and update the last seen stream id and header table.
handle_frame(0x5, Ident, State0, In, State2) :-
    http2_state_recv_header_table(State0, TableIn),
    http2_state_recv_header_table_size(State0, TableSize),
    phrase(push_promise_frame(Ident, NewIdent, TableSize-TableIn-TableSizeOut-TableOut-Headers,
                              [end_headers(_EndHeaders)]),
           In), !,
    debug(http2_client(response), "Push promise Stream ~w headers ~w", [NewIdent, Headers]),

    stream_info(State0, NewIdent, StreamInfo0),
    set_headers_of_http2_stream(Headers, StreamInfo0, StreamInfo1),
    update_state_substream(NewIdent, StreamInfo1, State0, State1),

    http2_state_last_stream_id(State1, LastStreamId),
    NewLastId is max(LastStreamId, NewIdent),
    set_http2_state_fields([last_stream_id(NewLastId),
                            recv_header_table(TableOut),
                            recv_header_table_size(TableSizeOut)],
                           State1, State2).
% 0x6: ping frame.  An acknowledgement needs no reply.  Any other ping
% is answered with an acknowledgement that echoes the received payload
% (RFC 7540 section 6.7 requires an identical payload), and the reply
% is flushed so it is not left sitting in the output buffer.
handle_frame(0x6, _, State, In, State) :-
    phrase(ping_frame(PingData, Ack), In), !,
    (   Ack == true
    ->  true
    ;   http2_state_stream(State, Stream),
        send_frame(Stream, ping_frame(PingData, true)),
        flush_output(Stream)
    ).
% 0x7: goaway frame.  Warn when the error code is non-zero, then shut
% the worker down (worker_shutdown/2 throws, so this never returns).
handle_frame(0x7, _, State0, In, State0) :-
    phrase(goaway_frame(LastStreamId, Error, Data), In),
    debug(http2_client(response), "GOAWAY frame: ~w ~w ~w", [LastStreamId, Error, Data]),
    (Error = 0 ; print_message(warning, connection_closed(Error, Data, State0))),
    worker_shutdown(State0, _{msg: "goaway frame",
                              error: Error,
                              data: Data}).
% 0x8: window update frame.  Logged only; the state is unchanged.
handle_frame(0x8, _, State0, In, State0) :-
    phrase(window_update_frame(Ident, Increment), In), !,
    debug(http2_client(response), "window frame ~w ~w", [Ident, Increment]),
    true.
% 0x9: continuation frame.  Append the decoded headers to the stream
% and complete it if the stream was already marked done and this frame
% ends the headers.
handle_frame(0x9, Ident, State0, In, State3) :-
    http2_state_recv_header_table(State0, HeaderTable0),
    http2_state_recv_header_table_size(State0, TableSize),
    phrase(continuation_frame(Ident,
                              TableSize-HeaderTable0-TableSizeOut-HeaderTable1-Headers,
                              EndHeaders),
           In),
    stream_info(State0, Ident, StreamInfo),
    http2_stream_headers(StreamInfo, PreviousHeaders),
    append(PreviousHeaders, Headers, NewHeaders),
    set_headers_of_http2_stream(NewHeaders, StreamInfo, StreamInfo1),
    update_state_substream(Ident, StreamInfo1, State0, State1),
    set_http2_state_fields([recv_header_table(HeaderTable1),
                            recv_header_table_size(TableSizeOut)],
                           State1, State2),
    http2_stream_done(StreamInfo1, StreamDone),
    ((StreamDone, EndHeaders)
    -> complete_client(Ident, State2, State3)
    ; State3 = State2).
% Anything else: warn and fail.
handle_frame(Code, _, State, In, State) :-
    print_message(warning, unknown_frame(Code, In, State)),
    !, fail.
386
388
%!  complete_client(+Ident, +State0, -State)
%
%   Finish stream Ident: invoke its completion callback with what was
%   collected, then drop the stream from the connection state.
complete_client(Ident, State0, State) :-
    stream_info(State0, Ident, Finished),
    notify_client_done(Finished),
    remove_state_substream(Ident, State0, State).
393
%!  notify_client_done(+StreamInfo)
%
%   Call the stream's completion callback with its collected headers
%   and data.  An exception raised by the callback is caught and only
%   reported through a debug message.
notify_client_done(StreamInfo) :-
    http2_stream_headers(StreamInfo, Headers),
    http2_stream_data(StreamInfo, Body),
    http2_stream_complete_cb(StreamInfo, Callback),
    catch(call(Callback, Headers, Body),
          Error,
          debug(http2_client(request), "Error invoking cb ~w", [Error])).
401
403
:- meta_predicate send_frame(+, :).

%!  send_frame(+Stream, :Frame)
%
%   Serialize Frame, a grammar body, into codes (first solution only)
%   and write those codes to Stream.  The stream is not flushed.
send_frame(Stream, Frame) :-
    debug(http2_client(request), "sending frame ~w", [Frame]),
    once(phrase(Frame, Codes)),
    format(Stream, "~s", [Codes]).
409
%!  update_settings(+Settings0, +Updates, -Settings)
%
%   Apply a list of Key-Value pairs to the dict Settings0, left to
%   right, giving Settings.  Later pairs override earlier ones for the
%   same key; an empty list leaves the dict unchanged.
update_settings(Settings, [], Settings).
update_settings(Settings0, [Key-Value|Updates], Settings) :-
    put_dict(Key, Settings0, Value, Settings1),
    update_settings(Settings1, Updates, Settings).
414
%!  stream_info(+State, +Ident, -Stream)
%
%   Stream is the http2_stream record stored for stream id Ident in
%   State, or a fresh record with default fields when none is stored.
%
%   The choice is committed: once a stored record is found, no default
%   record is offered on backtracking.
stream_info(State, Ident, Stream) :-
    http2_state_substreams(State, Streams),
    (   get_dict(Ident, Streams, Known)
    ->  Stream = Known
    ;   make_http2_stream([], Stream)
    ).
418
%!  update_state_substream(+Ident, +StreamInfo, +State0, -State)
%
%   State is State0 with StreamInfo stored under stream id Ident,
%   replacing any record already stored for that id.
update_state_substream(Ident, StreamInfo, State0, State) :-
    http2_state_substreams(State0, Known),
    put_dict(Ident, Known, StreamInfo, Updated),
    set_substreams_of_http2_state(Updated, State0, State).
423
%!  remove_state_substream(+Ident, +State0, -State1)
%
%   State1 is State0 without the record stored for stream id Ident.
%   Fails if no record is stored under Ident, because del_dict/4
%   requires the key to be present.
remove_state_substream(Ident, State0, State1) :-
    http2_state_substreams(State0, Streams0),
    del_dict(Ident, Streams0, _, Streams1),
    set_substreams_of_http2_state(Streams1, State0, State1).
% HTTP/2 client