35
43
44:-module(cql_database,
45 [get_transaction_context/5,
46 odbc_execute_with_statement_cache/7,
47 save_database_event/6,
48 application_value_to_odbc_value/7,
49 odbc_value_to_application_value/5,
50 cql_transaction/3,
51 database_transaction_query_info/3,
52 current_transaction_id/1,
53 transaction_active/0,
54 register_database_connection_details/2,
55 resolve_deadlock/1,
56 database_connection_details/2,
57 odbc_connection_call/3,
58 update_history/14,
59 odbc_cleanup_and_disconnect/1]). 60
61:-use_module(library(cql/cql)). 62:-use_module(library(debug)). 63
64:-dynamic
65 database_connection_details/2. 66:-volatile
67 database_connection_details/2. 68
69:-thread_local
70 database_event/6,
71 transaction_active/0,
72 transaction_context/4,
73 database_transaction_query_info/3. 74
75get_transaction_context(TransactionId, TrxId, AccessToken, TransactionTimestamp, Connection) :-
76 ( transaction_context(TransactionId_, AccessToken_, TransactionTimestamp_, Connection_) ->
77 TransactionId = TransactionId_,
78 TrxId = {null},
79 AccessToken = AccessToken_,
80 TransactionTimestamp = TransactionTimestamp_,
81 Connection = Connection_
82
83 ; otherwise ->
84 throw(no_database_transaction_active)
85 ).
86
87
88:-meta_predicate
89 odbc_connection_call(+, -, 0). 90
91:-thread_local
92 93 odbc_connection_available/2,
94 95 odbc_connection_in_use/1. 96
97:-multifile(cql_max_db_connections_hook/1). 98:-multifile(cql:odbc_connection_complete_hook/3). 99odbc_connection_call(Schema, Connection, Goal) :-
100 ( retract(odbc_connection_available(Schema, Connection)) -> 101 assert(odbc_connection_in_use(Schema)),
102 setup_call_cleanup(true,
103 Goal,
104 ( odbc_end_transaction(Connection, rollback), 105 retract(odbc_connection_in_use(Schema)),
106 assert(odbc_connection_available(Schema, Connection)))) 107 ; aggregate_all(r(count), odbc_connection_in_use(Schema), r(N)),
108 ( cql_max_db_connections_hook(MaxDbConnections)->
109 true
110 ; otherwise->
111 MaxDbConnections = 10
112 ),
113 N >= MaxDbConnections ->
114 thread_self(ThreadId),
115
116 cql_error(too_many_schema_connections, 'Too many connections on ~w: Maximum is ~w', [ThreadId, MaxDbConnections])
117
118 ; database_connection_details(Schema, ConnectionDetails) ->
119 ( ConnectionDetails = driver_string(DriverString) ->
120 true
121
122 ; ConnectionDetails = dsn(Dsn, Username, Password) ->
123 gethostname(HostName),
124 format(atom(DriverString), 'DSN=~w;UID=~w;PWD=~w;WSID=~w;', [Dsn, Username, Password, HostName])
125
126 ; ConnectionDetails = dsn(Dsn) ->
127 gethostname(HostName),
128 format(atom(DriverString), 'DSN=~w;WSID=~w;', [Dsn, HostName])
129
130 ; otherwise ->
131 throw(invalid_connection_details(ConnectionDetails))
132 ),
133
134 odbc_connect(-,
135 Connection,
136 [driver_string(DriverString),
137 silent(true),
138 null({null}),
139 auto_commit(false),
140 wide_column_threshold(8000),
141 mars(true)]), 142
143 thread_at_exit(odbc_cleanup_and_disconnect(Connection)),
144 assert(odbc_connection_available(Schema, Connection)),
145
146 ignore(cql:odbc_connection_complete_hook(Schema, ConnectionDetails, Connection)),
147 odbc_connection_call(Schema, Connection, Goal)
148
149 ; otherwise ->
150 throw(no_database_connection_details)
151 ).
152
153
154
155
156
167
168odbc_cleanup_and_disconnect(Connection) :-
169 catch_all(odbc_cleanup_and_disconnect_1(Connection),
170 E,
171 ( thread_self(ThreadId),
172 cql_log([], error, '[~w] odbc_cleanup_and_disconnect/1 : ~w', [ThreadId, E]))).
173
174odbc_cleanup_and_disconnect_1(Connection) :-
175 thread_self(ThreadId),
176 debug(odbc_cleanup, 'BEFORE [~w] : ~w', [ThreadId, odbc_end_transaction(Connection, rollback)]),
177 odbc_end_transaction(Connection, rollback),
178 debug(odbc_cleanup, 'AFTER [~w] : ~w', [ThreadId, odbc_end_transaction(Connection, rollback)]),
179 forall(retract(cached_prepared_odbc_statement(_, _, Connection, _, _, CachedStatement, _)),
180 ( debug(odbc_cleanup, 'BEFORE [~w] : ~w', [ThreadId, odbc_free_statement(CachedStatement)]),
181 odbc_free_statement(CachedStatement),
182 debug(odbc_cleanup, 'AFTER [~w] : ~w', [ThreadId, odbc_free_statement(CachedStatement)])
183 )
184 ),
185 retractall(lru_key(_)),
186 retractall(lru_statement(_)),
187 debug(odbc_cleanup, 'BEFORE [~w] : ~w', [ThreadId, odbc_disconnect(Connection)]),
188 odbc_disconnect(Connection),
189 debug(odbc_cleanup, 'AFTER [~w] : ~w', [ThreadId, odbc_disconnect(Connection)]),
190 191 retractall(sql_server_spid(Connection, _, _, _)).
192
194
195:-thread_local
196 197 198 199 200 201 202 203 cached_prepared_odbc_statement/7. 204
205:-thread_local
206 lru_statement/1,
207 statement_locked/1,
208 lru_key/1. 209
210max_lru_size(4000).
211
213evict_cache_entries(_, 0):- !.
214evict_cache_entries(Key, N):-
215 N > 0,
216 retract(lru_statement(MutexId)),
217 218 ( statement_locked(MutexId)->
219 220 true
221 ; otherwise->
222 thread_self(ThreadId),
223 retract(cached_prepared_odbc_statement(Sql, _, _, _, _, Statement, MutexId)),
224 odbc_free_statement(Statement),
225 debug(odbc_statement_cache, 'CACHE-EVICT [~w] ~w : ~@', [ThreadId, Statement, trimmed_sql(Sql, 80)]),
226 flag(Key, X, X-1)
227 ),
228 NN is N-1,
229 !,
230 evict_cache_entries(Key, NN).
231
232
233odbc_execute_with_statement_cache(Connection, _, _, Sql, OdbcParameters, OdbcParameterDataTypes, Row) :-
234 cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, _, _, Statement, MutexId),
235 !,
236 setup_call_cleanup(assert(statement_locked(MutexId)),
237 ( thread_self(ThreadId),
238 retract(lru_statement(MutexId)),
239 assertz(lru_statement(MutexId)),
240 debug(odbc_statement_cache, 'CACHE-HIT [~w] ~w : ~@', [ThreadId, Statement, trimmed_sql(Sql, 80)]),
241 odbc_execute_with_statistics(Statement, OdbcParameters, OdbcParameterDataTypes, Row)
242 ),
243 retract(statement_locked(MutexId))).
244
245odbc_execute_with_statement_cache(Connection, FileName, LineNumber, Sql, OdbcParameters, OdbcParameterDataTypes, Row) :-
246 thread_self(ThreadId),
247 debug(odbc_statement_cache, 'CACHE-MISS [~w] : ~@', [ThreadId, trimmed_sql(Sql, 80)]),
248 odbc_prepare(Connection, Sql, OdbcParameterDataTypes, Statement, []),
249 gensym(statement_lock_, MutexId),
250 ( lru_key(Key)->
251 true
252 ; otherwise->
253 gensym(lru_key_, Key),
254 assert(lru_key(Key))
255 ),
256 setup_call_cleanup(assert(statement_locked(MutexId)),
257 ( assertz(cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, FileName, LineNumber, Statement, MutexId)),
258 assertz(lru_statement(MutexId)),
259 max_lru_size(MaxSize),
260 flag(Key, CacheSize, CacheSize+1),
261 ( CacheSize >= MaxSize->
262 Delta is CacheSize - MaxSize,
263 evict_cache_entries(Key, Delta)
264 ; otherwise->
265 true
266 ),
267 flag(Key, Z, Z),
268 debug(odbc_statement_cache, 'CACHE-STORE [~w] ~w, ~w : ~@', [ThreadId, Statement, MutexId, trimmed_sql(Sql, 60)]),
269 odbc_execute_with_statistics(Statement, OdbcParameters, OdbcParameterDataTypes, Row)
270 ),
271 retract(statement_locked(MutexId))).
272
273
274
284save_database_event(AccessToken, 285 EventType, 286 Schema, 287 TableName, 288 PrimaryKeyColumnName, 289 PrimaryKey) :- 290 ( database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey) ->
291 292 true
293 ; otherwise->
294 assert(database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey))
295 ).
296
297
298:-meta_predicate(cql_transaction(+, +, 0)). 299
300cql_transaction(Schema, AccessToken, Goal):-
301 thread_self(ThreadId),
302 setup_call_cleanup(assert(transaction_active),
303 cql_transaction_1(Schema, AccessToken, Goal, DatabaseEventsSet),
304 ( retractall(database_transaction_query_info(ThreadId, _, _)),
305 retractall(transaction_context(_, _, _, _)),
306 retractall(database_event(_, _, _, _, _, _)),
307 flag(transaction_count, Count, Count+1),
308 retractall(transaction_active))), 309 cql_process_database_events(DatabaseEventsSet).
310
311cql_transaction_1(Schema, AccessToken, Goal, DatabaseEventsSet):-
312 ( transaction_context(ExistingTransactionId, _, _, _) ->
313 throw(database_transaction_already_in_progress(ExistingTransactionId))
314 ; otherwise ->
315 true
316 ),
317 resolve_deadlock(cql_transaction_2(Schema, AccessToken, Goal, DatabaseEventsSet)).
318
319cql_transaction_2(Schema, AccessToken, Goal, DatabaseEventsSet) :-
320 odbc_connection_call(Schema,
321 Connection,
322 ( ( dbms(Schema, 'Microsoft SQL Server')->
323 odbc_query(Connection, 'SELECT CONVERT(VARCHAR(36), NEWID())', row(TransactionId))
324 ; dbms(Schema, 'PostgreSQL') ->
325 odbc_query(Connection, 'SELECT uuid_generate_v1()', row(TransactionId))
326 ; dbms(Schema, 'SQLite') ->
327 odbc_query(Connection, 'SELECT substr(u,1,8)||\'-\'||substr(u,9,4)||\'-4\'||substr(u,13,3)||\'-\'||v||substr(u,17,3)||\'-\'||substr(u,21,12) from (select lower(hex(randomblob(16))) as u, substr(\'89ab\',abs(random()) % 4 + 1, 1) as v)', row(TransactionId))
328 ; otherwise ->
329 throw(no_dbms_for_schema(Schema))
330 ),
331 dbms(Schema, DBMS),
332 store_transaction_info(AccessToken, Connection, DBMS, Goal),
333 get_time(ExecutionTime),
334 assert(transaction_context(TransactionId, AccessToken, ExecutionTime, Connection)),
335
336 ( cql_transaction_3(Goal, Connection, TransactionId, AccessToken, DatabaseEventsSet) ->
337 true
338 ; otherwise ->
339 340 log_transaction_state(AccessToken, TransactionId, transaction_rolled_back_on_logic_failure),
341 fail
342 ))).
343
344
345:-meta_predicate
346 cql_transaction_3(0, +, +, +, -). 347
348cql_transaction_3(Goal, Connection, TransactionId, AccessToken, DatabaseEventsSet) :-
349 log_transaction_state(AccessToken, TransactionId, transaction_starting),
350 catch(Goal, E, Error = E),
351 352 findall(database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey),
353 retract(database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey)),
354 DatabaseEvents),
355 356 list_to_set(DatabaseEvents, DatabaseEventsSet),
357 ( var(Error) ->
358 odbc_end_transaction(Connection, commit),
359 log_transaction_state(AccessToken, TransactionId, transaction_committed)
360
361 ; otherwise ->
362 363 log_transaction_state(AccessToken, TransactionId, transaction_rolled_back_on_error),
364 throw(Error)
365 ).
366
367
380
381:-meta_predicate
382 resolve_deadlock(0). 383
384resolve_deadlock(Goal) :-
385 thread_self(ThreadId),
386 flag(transaction_count, InitialCount, InitialCount),
387
388 maximum_deadlock_retries(MaximumDeadlockRetries),
389 between(1, MaximumDeadlockRetries, RetryCount), 390
391 ( RetryCount >= MaximumDeadlockRetries ->
392 cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_FAILED\tCOULD NOT RESOLVE deadlock on thread \'~w\'. Goal: ~w', [ThreadId, Goal]),
393 throw(deadlock_retry_count_exceeded(MaximumDeadlockRetries))
394
395 ; RetryCount > 1 ->
396 397 flag(transaction_count, CurrentCount, CurrentCount),
398 ( CurrentCount =:= InitialCount ->
399 Flag = no_other_transaction_completed
400 ; otherwise ->
401 Flag = another_transaction_completed
402 )
403
404 ; otherwise ->
405 Flag = no_deadlock
406 ),
407
408 ( Flag == no_other_transaction_completed ->
409 Delay is ( 2 << RetryCount) / 1000.0, 410 sleep(Delay),
411 cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_ATTEMPT\tRETRYING deadlocked transaction on thread \'~w\'(attempt ~w). Initiated by EXPIRY of RANDOM WAIT of ~w seconds.', [ThreadId, RetryCount, Delay])
412
413 ; Flag == another_transaction_completed ->
414 cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_ATTEMPT\tRETRYING deadlocked transaction on thread \'~w\' (attempt ~w). Initiated by COMPLETION of a TRANSACTION on another thread.', [ThreadId, RetryCount])
415 ; otherwise ->
416 true
417 ),
418
419 catch_all((Goal ->
420 LogicalStatus = 1
421 ; otherwise ->
422 true
423 ),
424 error(odbc('40001', _, _), _),
425 ( cql_log([debug(deadlocks)], warning, 'DEADLOCK_DETECTED\tThread \'~w\' selected as DEADLOCK VICTIM. Goal: ~w', [ThreadId, Goal]),
426 retractall(database_transaction_query_info(ThreadId, _, _)),
427 retractall(transaction_context(_, _, _, _)),
428 retractall(database_event(_, _, _, _, _, _)),
429 fail)),
430 ( RetryCount > 1 ->
431 cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLVED\tdeadlocked transaction on thread \'~w\' RESOLVED (attempt ~w).', [ThreadId, RetryCount])
432
433 ; otherwise ->
434 true
435 ),
436 !, 437 LogicalStatus == 1.
438
439
443
444maximum_deadlock_retries(10).
445
447
448log_transaction_state(AccessToken, TransactionId, TransactionState) :-
449 cql_access_token_to_user_id(AccessToken, UserId),
450 upcase_atom(TransactionState, TransactionStateUc),
451 cql_log([], informational, '\t~p\t~p\t~p', [UserId, TransactionId, TransactionStateUc]).
452
453
459
460register_database_connection_details(Schema, ConnectionDetails) :-
461 assert(database_connection_details(Schema, ConnectionDetails)).
462
463
464update_history(Schema, TableName, AttributeName, PrimaryKeyAttributeName, PrimaryKeyValue, ApplicationValueBefore, ApplicationValueAfter, AccessToken, Info, TransactionId, TransactionTimestamp, ThreadId, Connection, Goal):-
465 ignore(cql_update_history_hook(Schema, TableName, AttributeName, PrimaryKeyAttributeName, PrimaryKeyValue, ApplicationValueBefore, ApplicationValueAfter, AccessToken, Info, TransactionId, TransactionTimestamp, ThreadId, Connection, Goal)).
466
467
468
470:-multifile(cql:application_value_to_odbc_value_hook/7). 471application_value_to_odbc_value(ApplicationValue, OdbcDataType, Schema, TableName, ColumnName, Qualifiers, OdbcValue):-
472 ( var(ApplicationValue)->
473 throw(instantiation_error(ApplicationValue))
474 ; cql:application_value_to_odbc_value_hook(OdbcDataType, Schema, TableName, ColumnName, Qualifiers, ApplicationValue, OdbcValue)->
475 true
476 ; otherwise->
477 OdbcValue = ApplicationValue
478 ).
479
480
481odbc_numeric_precision_limit(27).
482
483
485:-multifile(cql:odbc_value_to_application_value_hook/7). 486odbc_value_to_application_value(Schema, TableSpec, ColumnName, OdbcValue, ApplicationValue):-
487 cql_data_type(Schema, TableSpec, ColumnName, DatabaseDataType, _, _, _, Domain, _, _),
488 !,
489 ( cql:odbc_value_to_application_value_hook(DatabaseDataType, Schema, TableSpec, ColumnName, Domain, OdbcValue, ApplicationValue)->
490 true
491 ; otherwise->
492 ApplicationValue = OdbcValue
493 ).
494
496catch_all(A, B, C):- catch(A, B, C).
497
498
499:-multifile(cql:process_database_events/1). 500cql_process_database_events(Events):-
501 ignore(cql:process_database_events(Events)).
502
503:-multifile(cql:cql_transaction_info_hook/5). 504store_transaction_info(AccessToken, Connection, DBMS, Goal):-
505 ( cql:cql_transaction_info_hook(AccessToken, Connection, DBMS, Goal, Info)->
506 true
507 ; otherwise->
508 Info = {null}
509 ),
510 thread_self(ThreadId),
511 assert(database_transaction_query_info(ThreadId, Goal, Info)).
512
514
515current_transaction_id(TransactionId):-
516 transaction_context(TransactionId, _, _, _)