1:- module(k8s_client, [
    2    k8s_create_resource/6,      % +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Instanc:dict, -InstanceOut:dict, +Options:list
    3    k8s_delete_resource/5,      % +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Instance:dict/atom, +Options:list
    4    k8s_get_resource/6,         % +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, ?InstanceName:atomic, -Instance:dict, +Options:list
    5    k8s_resource_types/2,       % -ResourceTypes:list(dict), +Options:list
    6    k8s_update_resource/6,      % +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Instanc:dict, -InstanceOut:dict, +Options:list
    7    k8s_watch_resources/5,      % :Callback, +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Options:list
    8    k8s_watch_resources_async/6 % :Callback, +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, -StopWatcher, +Options:list
    9    ]).
Predicates for communicating with the Kubernetes API server, with support for the standard configuration options: the KUBECONFIG environment variable, `~/.kube/config`, or access from a process running inside a pod.

Example:

```prolog
?- use_module(library(k8s_client)).
?- k8s_get_resource(core, v1, pods, PodName, _, [k8s_namespace(myns)]).
PodName = "dex-567cdd88cd-dbv9x" ;
PodName = "envoy-proxy-599b679cf7-94764" ;
...
```

   23:- set_prolog_flag(generate_debug_info, false).   24
   25:- use_module(library(yaml)).   26:- use_module(library(http/http_open)).   27:- use_module(library(http/json)).   28:- use_module(library(http/http_client)).   29
   30
   31:- dynamic
   32    cluster_resources/2,
   33    watcher_status/2.   34
   % Meta-predicate declarations. The argument marked `2` is the watch
   % callback, invoked as call(Callback, ChangeType, ResourceInstance).
   % watch_modification_call is declared with arity 5, which is the arity
   % used by its only caller, watch_stream/6.
   :- meta_predicate
       k8s_watch_resources(2, +, +, +, +),
       k8s_watch_resources_async(2, +, +, +, -, +),
       watch_modification_call(2, +, +, +, -),
       watch_resources_loop(2, +, +, +, +, +),
       watch_stream(2, +, +, +, +, +).
   42%%% PUBLIC PREDICATES %%%%%%%%%%%%%%%%%%%%%%%%%%
 k8s_create_resource(+ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Instance:dict, -InstanceOut:dict, +Options:list) is semidet
Creates a resource at the Kubernetes API and unifies InstanceOut with the server response. Options are the same as for the predicate k8s_get_resource/6.
   %!  k8s_create_resource(+ApiGroup:atom, +Version:atom, +ResourceTypeName:atom,
   %!                      +Instance:dict, -InstanceOut:dict, +Options:list)
   %
   %   Sends Instance as a JSON POST request to the collection URI of the
   %   resource type and unifies InstanceOut with the reply of the server.
   k8s_create_resource(ApiGroup, Version, ResourceTypeName, Instance, InstanceOut, Options) :-
       context_options(Options, ContextOptions),
       resource_uri(ApiGroup, Version, ResourceTypeName, CollectionUri, ContextOptions),
       RequestOptions = [method(post), post(json(Instance)) | ContextOptions],
       api_get(CollectionUri, InstanceOut, RequestOptions).
 k8s_delete_resource(+ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Instance:dict/atom, +Options:list) is semidet
Delete a resource at the Kubernetes API. Options are same as for the predicate k8s_get_resource/6
   %!  k8s_delete_resource(+ApiGroup:atom, +Version:atom, +ResourceTypeName:atom,
   %!                      +Instance, +Options:list)
   %
   %   Sends a DELETE request for a single resource instance. Instance is
   %   either a dict, in which case its `metadata.name` is used, or the
   %   instance name itself. The reply of the server is discarded.
   k8s_delete_resource(ApiGroup, Version, ResourceTypeName, Instance, Options) :-
       (   is_dict(Instance)
       ->  Metadata = Instance.metadata,
           Name = Metadata.name
       ;   Name = Instance
       ),
       context_options(Options, ContextOptions),
       resource_uri(ApiGroup, Version, ResourceTypeName, CollectionUri, ContextOptions),
       atomic_list_concat([CollectionUri, Name], '/', InstanceUri),
       api_get(InstanceUri, _Reply, [method(delete) | ContextOptions]).
 k8s_get_resource(+ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, -InstanceName:atomic, -Instance:dict, +Options:list) is nondet
k8s_get_resource(+ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +InstanceName:atomic, -Instance:dict, +Options:list) is nondet. Unifies InstanceName and Instance with the object representing a resource of the Kubernetes API. If InstanceName is not bound, all instances are retrieved. ApiGroup is either a valid name of a Kubernetes API group or the atom `core`.

The actual cluster address, context, and namespace are provided either by options or loaded from the configuration. The options below are supported, in addition to the options passed down to the http_open/3 predicate:

   %!  k8s_get_resource(+ApiGroup:atom, +Version:atom, +ResourceTypeName:atom,
   %!                   ?InstanceName:atomic, -Instance:dict, +Options:list) is nondet.
   %
   %   Retrieves resources of the given type. With InstanceName unbound the
   %   whole collection is requested and its items are enumerated on
   %   backtracking; with InstanceName bound only that single instance is
   %   requested. Fails if Instance is already bound.
   k8s_get_resource(ApiGroup, Version, ResourceTypeName, InstanceName, Instance, Options) :-
       var(Instance),
       context_options(Options, ContextOptions),
       resource_uri(ApiGroup, Version, ResourceTypeName, CollectionUri, ContextOptions),
       (   var(InstanceName)
       ->  % list retrieval
           api_get(CollectionUri, Collection, ContextOptions),
           member(Instance, Collection.items),
           ignore( InstanceName = Instance.get(metadata).name)
       ;   % single instance retrieval
           atomic_list_concat([CollectionUri, InstanceName], '/', InstanceUri),
           api_get(InstanceUri, Instance, ContextOptions)
       ).
 k8s_resource_types(-ResourceTypes:list(dict), +Options) is semidet
Unifies ResourceTypes with the list of resource types available on the cluster. Primarily used for resource discovery. `Options` are the same as for k8s_get_resource/6, with an extra option:
  %!  k8s_resource_types(-ResourceTypes:list(dict), +Options:list) is semidet.
  %
  %   Lists the resource types of the cluster. The option
  %   k8s_resource_types_mode(Mode) (default `cache`) selects the source:
  %
  %     - any mode except `renew` and `remote` first tries the list stored
  %       in cluster_resources/2 for the cluster of the current context;
  %     - any mode except `local` falls back to querying the API server;
  %     - the modes `cache` and `renew` store the fetched list in
  %       cluster_resources/2, replacing an earlier entry.
  k8s_resource_types(ResourceTypes, Options) :-
      select_option(k8s_resource_types_mode(Mode), Options, PlainOptions, cache),
      \+ memberchk(Mode, [renew, remote]),
      context_options(PlainOptions, ContextOptions),
      option(k8s_context(ContextName), ContextOptions),
      option(k8s_config(Config), ContextOptions),
      config_get_context(Config, ContextName, Context),
      Cluster = Context.cluster,
      cluster_resources(Cluster, ResourceTypes),
      !.
  k8s_resource_types(ResourceTypes, Options) :-
      select_option(k8s_resource_types_mode(Mode), Options, PlainOptions, cache),
      Mode \= local,
      context_options(PlainOptions, ContextOptions),
      api_resources_core(CoreTypes, ContextOptions),
      api_resources_groups(GroupTypes, ContextOptions),
      append(CoreTypes, GroupTypes, ResourceTypes),
      (   memberchk(Mode, [cache, renew])
      ->  % remember the fetched list for the cluster of this context
          option(k8s_context(ContextName), ContextOptions),
          option(k8s_config(Config), ContextOptions),
          config_get_context(Config, ContextName, Context),
          Cluster = Context.cluster,
          retractall(cluster_resources(Cluster, _)),
          asserta(cluster_resources(Cluster, ResourceTypes))
      ;   true
      ),
      !.
 k8s_update_resource(+ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Instance:dict, -InstanceOut:dict, +Options:list) is semidet
Updates a resource at the Kubernetes API and unifies InstanceOut with the server response. Options are the same as for the predicate k8s_get_resource/6.
  %!  k8s_update_resource(+ApiGroup:atom, +Version:atom, +ResourceTypeName:atom,
  %!                      +Instance:dict, -InstanceOut:dict, +Options:list)
  %
  %   Sends Instance as a JSON PUT request to the URI of the instance named
  %   by `Instance.metadata.name` and unifies InstanceOut with the reply.
  k8s_update_resource(ApiGroup, Version, ResourceTypeName, Instance, InstanceOut, Options) :-
      context_options(Options, ContextOptions),
      resource_uri(ApiGroup, Version, ResourceTypeName, CollectionUri, ContextOptions),
      Name = Instance.metadata.name,
      atomic_list_concat([CollectionUri, Name], '/', InstanceUri),
      RequestOptions = [method(put), post(json(Instance)) | ContextOptions],
      api_get(InstanceUri, InstanceOut, RequestOptions).
 k8s_watch_resources(:Callback, +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Options:list) is det
This predicate watches the changes of the resource list specified by the arguments and calls `call(:Callback, ChangeType:atom, ResourceInstance:dict)` for each modification in the list. ChangeType is one of the atoms `added`, `modified`, or `deleted`. The initial list of instances is provided as a sequence of `added` callbacks after the call of this predicate.

The call blocks the calling thread.

Options are same as for the k8s_get_resource/6 with extra option:

  %!  k8s_watch_resources(:Callback, +ApiGroup:atom, +Version:atom,
  %!                      +ResourceTypeName:atom, +Options:list)
  %
  %   Starts the watch loop in the calling thread. The option
  %   k8s_resource_version(V) (default 0) is removed from Options and becomes
  %   the version of the initial loop state; the set of known resources
  %   starts empty.
  k8s_watch_resources(Callback, ApiGroup, Version, ResourceTypeName, Options) :-
      select_option(k8s_resource_version(StartVersion), Options, WatchOptions, 0),
      InitialState = state(StartVersion, []),
      watch_resources_loop(Callback, ApiGroup, Version, ResourceTypeName, InitialState, WatchOptions).
 k8s_watch_resources_async(:Callback, +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, -StopWatcher, +Options:list) is det
Forks a separate background thread in which the predicate `k8s_watch_resources/5` is executed. The background thread can be finished and joined by calling call(StopWatcher). Other arguments are the same as for `k8s_watch_resources/5`. Be aware that the Callback is invoked from a different thread than the thread calling this predicate.
  %!  k8s_watch_resources_async(:Callback, +ApiGroup:atom, +Version:atom,
  %!                            +ResourceTypeName:atom, -StopWatcher, +Options:list)
  %
  %   Runs k8s_watch_resources/5 in a new thread. The options watcher_id(Id)
  %   and timeout(1) are prepended to Options, and StopWatcher is unified
  %   with the goal k8s_client:watcher_exit(Id). When the thread exits, its
  %   watcher_status/2 facts are retracted.
  %
  %   NOTE(review): Id is the thread identifier produced by thread_create/3
  %   and also appears inside the thread goal and the at_exit goal - confirm
  %   that it is bound in the copies the new thread receives.
  k8s_watch_resources_async(Callback, ApiGroup, Version, ResourceTypeName, k8s_client:watcher_exit(Id), Options) :-
      WatchOptions = [watcher_id(Id), timeout(1) | Options],
      thread_create(
          k8s_watch_resources(Callback, ApiGroup, Version, ResourceTypeName, WatchOptions),
          Id,
          [ at_exit(retractall(watcher_status(Id, _))) ]
      ).
  180
  181%%% PRIVATE PREDICATES %%%%%%%%%%%%%%%%%%%%%%%%%
  182
  %!  api_get(+Url, -Reply) is semidet.
  %!  api_get(+Url, -Reply, +Options:list) is semidet.
  %
  %   Performs an HTTP request on the API path Url, relative to the server
  %   of the selected cluster. Connection options are derived from the
  %   configuration by config_connection_options/4. The request is issued
  %   with http_get/3 and the option json_object(dict); Options may carry a
  %   different method, as the create, update and delete predicates do.
  api_get(Url, Reply) :-
      api_get(Url, Reply, []).

  api_get(Url, Reply, Options) :-
      config_connection_options(Url, Components, Options, HttpOptions),
      uri_components(RequestUri, Components),
      http_get(RequestUri, Reply, [json_object(dict) | HttpOptions]).
  190
  %!  api_resources_core(-Resources:list(dict), +Options:list) is semidet.
  %
  %   Requests the path `api` and collects the resource types of every
  %   version listed under the `versions` key of the reply.
  api_resources_core(Resources, Options) :-
      api_get(api, Reply, Options),
      CoreVersions = Reply.versions,
      foldl(api_resources_core_(Options), CoreVersions, [], Resources).
  194
  %!  api_resources_core_(+Options:list, +Version, +In:list, -Out:list) is semidet.
  %
  %   Fold step: requests `api/<Version>`, tags each entry of the reply's
  %   `resources` list with `group: core` and `version: Version`, and
  %   appends the tagged entries to In.
  api_resources_core_(Options, Version, In, Out) :-
      atomic_list_concat([api, Version], '/', Url),
      api_get(Url, Reply, Options),
      Tags = _{group: core, version: Version},
      maplist(put_dict(Tags), Reply.resources, Tagged),
      append(In, Tagged, Out).
  200
  %!  api_resources_groups(-Resources:list(dict), +Options:list) is semidet.
  %
  %   Requests the path `apis` and collects the resource types of every
  %   entry listed under the `groups` key of the reply.
  api_resources_groups(Resources, Options) :-
      api_get(apis, Reply, Options),
      ApiGroups = Reply.groups,
      foldl(api_resources_groups_(Options), ApiGroups, [], Resources).
  204
  %!  api_resources_groups_(+Options:list, +Group:dict, +In:list, -Resources:list) is semidet.
  %
  %   Fold step for one API group: only the group's `preferredVersion` is
  %   inspected and its resource types are appended to In.
  api_resources_groups_(Options, Group, In, Resources) :-
      PreferredVersion = Group.preferredVersion,
      api_resources_groups_(Options, PreferredVersion.version, PreferredVersion, In, Resources).
  208
  %!  api_resources_groups_(+Options:list, +Preferred, +GroupVersion:dict,
  %!                        +In:list, -Out:list) is semidet.
  %
  %   Requests `apis/<groupVersion>` and appends the entries of the reply's
  %   `resources` list to In. Each entry is tagged with:
  %
  %     - `group`: the group name, i.e. the part of `groupVersion` before
  %       the first `/`;
  %     - `version`: the `version` key of GroupVersion;
  %     - `isPreferred`: `true` if that version unifies with Preferred,
  %       otherwise `false`.
  %
  %   The group tag used to be the literal atom `groupVersion`, so the
  %   group comparison in is_resource_namespaced/4 could never succeed for
  %   a non-core group.
  api_resources_groups_(Options, Preferred, GroupVersion, In, Out) :-
      GroupVersionName = GroupVersion.groupVersion,
      atomic_list_concat([apis, GroupVersionName], '/', Url),
      api_get(Url, GroupResources, Options),
      (   Preferred = GroupVersion.version
      ->  IsPreferred = true
      ;   IsPreferred = false
      ),
      % "apps/v1" -> apps
      atomic_list_concat(Parts, '/', GroupVersionName),
      Parts = [GroupName | _],
      maplist(
          put_dict( _{group: GroupName, version: GroupVersion.version, isPreferred: IsPreferred }),
          GroupResources.resources,
          Resources1),
      append(In, Resources1, Out).
  221
  %!  atomic_eq(?Left, ?Right) is semidet.
  %
  %   True if Left and Right carry the same text regardless of whether they
  %   are atoms or strings. At least one side must be bound; an unbound side
  %   is unified through atom_string/2 with the text of the other side.
  atomic_eq(Left, Right) :-
      (   nonvar(Left),
          atom_string(LeftAtom, Left),
          atom_string(LeftAtom, Right)
      ->  true
      ;   nonvar(Right),
          atom_string(RightAtom, Right),
          atom_string(RightAtom, Left)
      ->  true
      ).
  232
  %!  base64_certificate(+Cert64, -Certificate) is semidet.
  %
  %   Decodes the base64 text Cert64 and loads the decoded text as a
  %   certificate with load_certificate/2. The temporary in-memory stream is
  %   closed whether loading succeeds, fails or raises.
  base64_certificate(Cert64, Certificate) :-
      base64(Cert, Cert64),
      atom_codes(Cert, CertCodes),
      setup_call_cleanup(
          open_codes_stream(CertCodes, CertStream),
          load_certificate(CertStream, Certificate),
          close(CertStream)).
  238
  %!  config_connection_options(+ResourceUrl, -Server, +OptionsIn:list, -OptionsOut:list) is semidet.
  %
  %   Resolves the configuration and the context name and delegates to
  %   config_connection_options/6. The options k8s_config/1 and
  %   k8s_context/1 are removed from the option list when present;
  %   otherwise the configuration is loaded with load_config/1 and the
  %   context is the configuration's `current-context`.
  config_connection_options(ResourceUrl, Server, OptionsIn, OptionsOut) :-
      (   select_option(k8s_config(Config), OptionsIn, WithoutConfig)
      ->  true
      ;   WithoutConfig = OptionsIn,
          load_config(Config)
      ),
      (   select_option(k8s_context(ContextName), WithoutConfig, WithoutContext)
      ->  true
      ;   WithoutContext = WithoutConfig,
          config_current_context(Config, ContextName)
      ),
      config_connection_options(ResourceUrl, Config, ContextName, Server, WithoutContext, OptionsOut).
  251
  %!  config_connection_options(+ResourceUrl, +Config:dict, +ContextName,
  %!                            -ServerUriComponents, +OptionsIn:list, -OptionsOut:list) is semidet.
  %
  %   Builds the URI components of the request from the `server` of the
  %   context's cluster and ResourceUrl, moves the query-related options
  %   into the URI, and extends the remaining options with the cluster and
  %   client options.
  config_connection_options(ResourceUrl, Config, ContextName, ServerUriComponents, OptionsIn, OptionsOut) :-
      config_get_context(Config, ContextName, Context),
      atom_string(ClusterName, Context.cluster),
      config_get_cluster(Config, ClusterName, Cluster),
      ServerUrl = Cluster.server,
      config_connection_resource_uri(ResourceUrl, ServerUrl, BaseComponents),
      config_connections_queries(BaseComponents, OptionsIn, ServerUriComponents, PlainOptions),
      config_cluster_options(Cluster, PlainOptions, ClusterOptions),
      config_client_options(Config, Context, ClusterOptions, OptionsOut),
      !.
  261
  %!  config_connections_queries(+UriComponentsIn, +OptionsIn:list,
  %!                             -UriComponentsOut, -OptionsOut:list) is semidet.
  %
  %   Merges the query-related options with the query string already in
  %   UriComponentsIn. The URI components are returned unchanged when the
  %   resulting query list is empty.
  config_connections_queries(UriCompnentsIn, OptionsIn, UriCompnentsOut, OptionsOut) :-
      uri_data(search, UriCompnentsIn, Search),
      (   nonvar(Search)
      ->  uri_query_components(Search, ExistingQueries)
      ;   ExistingQueries = []
      ),
      config_connections_queries_(ExistingQueries, AllQueries, OptionsIn, OptionsOut),
      (   AllQueries = []
      ->  UriCompnentsIn = UriCompnentsOut
      ;   uri_query_components(NewSearch, AllQueries),
          uri_data(search, UriCompnentsIn, NewSearch, UriCompnentsOut)
      ).
  274
  %!  config_connections_queries_(+QueriesIn:list, -QueriesOut:list,
  %!                              +OptionsIn:list, -OptionsOut:list) is det.
  %
  %   Repeatedly moves one query-related option from the option list to the
  %   front of the query list until none is left. Handled, in priority order:
  %
  %     - k8s_query(List): all elements of List are prepended;
  %     - k8s_query(Query): Query is prepended;
  %     - k8s_resource_version(V), V not unifiable with 0:
  %       `resourceVersion = V` is prepended;
  %     - k8s_selectors(S): `labelSelector = Text` is prepended, where a
  %       list S is joined with commas.
  %
  %   k8s_resource_version(0) is left in the option list.
  config_connections_queries_(QueriesIn, QueriesOut, OptionsIn, OptionsOut) :-
      (   select_option(k8s_query(QueryList), OptionsIn, Rest),
          is_list(QueryList),
          append(QueryList, QueriesIn, Queries)
      ->  config_connections_queries_(Queries, QueriesOut, Rest, OptionsOut)
      ;   select_option(k8s_query(Query), OptionsIn, Rest)
      ->  config_connections_queries_([Query | QueriesIn], QueriesOut, Rest, OptionsOut)
      ;   select_option(k8s_resource_version(ResourceVersion), OptionsIn, Rest),
          ResourceVersion \= 0
      ->  config_connections_queries_([resourceVersion = ResourceVersion | QueriesIn], QueriesOut, Rest, OptionsOut)
      ;   select_option(k8s_selectors(Selectors), OptionsIn, Rest),
          (   is_list(Selectors)
          ->  atomic_list_concat(Selectors, ',', SelectorsText)
          ;   SelectorsText = Selectors
          )
      ->  config_connections_queries_([labelSelector = SelectorsText | QueriesIn], QueriesOut, Rest, OptionsOut)
      ;   QueriesOut = QueriesIn,
          OptionsOut = OptionsIn
      ).
  299
  %!  config_connection_resource_uri(+ResourceUrl, +ServerName, -UriComponents) is det.
  %
  %   Parses the server URL ServerName and replaces its path with that path
  %   joined, via directory_file_path/3, with the relative ResourceUrl.
  config_connection_resource_uri(ResourceUrl, ServerName, UriComponents) :-
      atom_string(ServerUrl, ServerName),
      uri_components(ServerUrl, ServerComponents),
      uri_data(path, ServerComponents, ServerPath),
      directory_file_path(ServerPath, ResourceUrl, ResourcePath),
      uri_data(path, ServerComponents, ResourcePath, UriComponents).
  306
  %!  config_ca_options(+Cluster:dict, +OptionsIn:list, -OptionsOut:list) is semidet.
  %
  %   Prepends a cacerts/1 option for the cluster's certificate authority:
  %   the decoded `certificate-authority-data` if it is present and loads,
  %   otherwise the file named by `certificate-authority`. If neither is
  %   available an error message is printed and the predicate fails.
  config_ca_options(Cluster, OptionsIn, OptionsOut) :-
      (   base64_certificate( Cluster.get('certificate-authority-data'), CaCert)
      ->  OptionsOut = [ cacerts([certificate(CaCert)]) | OptionsIn ]
      ;   Cluster.get('certificate-authority') = CaFile
      ->  OptionsOut = [ cacerts([file(CaFile)]) | OptionsIn ]
      ;   print_message(error, kubernetes(unsupported_config, cluster)),
          fail
      ).
  316
  %!  config_client_options(+Config:dict, +Ctx:dict, +OptionsIn:list, -OptionsOut:list) is semidet.
  %
  %   Prepends the client authentication options of the user named by
  %   `Ctx.user`. Supported user entries, in priority order:
  %
  %     - `client-certificate-data` and `client-key-data`: both values are
  %       base64-decoded and passed as certificate_key_pairs/1;
  %     - `client-certificate` and `client-key`: file names, passed as
  %       certificate_file/1 and key_file/1;
  %     - `token`: passed as authorization(bearer(Token)).
  %
  %   If no variant applies an error message is printed and the predicate
  %   fails.
  %
  %   The file-based variant used to run the file names through base64/2
  %   and to pass the key as a second certificate_file/1 option.
  config_client_options(Config, Ctx, OptionsIn, OptionsOut) :-
      atom_string(UserName, Ctx.user),
      config_get_user(Config, UserName, User),
      config_client_credentials_(User, Credentials),
      !,
      append(Credentials, OptionsIn, OptionsOut).
  config_client_options(_, _, _, _) :-
      print_message(error, kubernetes(unsupported_config, user)),
      fail.

  % config_client_credentials_(+User:dict, -Options:list): connection options
  % for the first supported credential kind found in the user entry.
  config_client_credentials_(User, [ certificate_key_pairs([ClientCert-ClientKey]) ]) :-
      base64(ClientCert, User.get('client-certificate-data')),
      base64(ClientKey, User.get('client-key-data')),
      !.
  config_client_credentials_(User, [ certificate_file(CertFile), key_file(KeyFile) ]) :-
      atom_string(CertFile, User.get('client-certificate')),
      atom_string(KeyFile, User.get('client-key')),
      !.
  config_client_credentials_(User, [ authorization(bearer(Token)) ]) :-
      atom_string(Token, User.get(token)).
  337
  %!  config_cluster_options(+Cluster:dict, +OptionsIn:list, -OptionsOut:list) is semidet.
  %
  %   Extends the options with the cluster's certificate authority, a
  %   proxy/1 option if the cluster entry has a `proxy` key, and
  %   cert_verify_hook(cert_accept_any) if `insecure-skip-tls-verify` is
  %   `true`.
  config_cluster_options(Cluster, OptionsIn, OptionsOut) :-
      config_ca_options(Cluster, OptionsIn, CaOptions),
      (   get_dict(proxy, Cluster, Proxy)
      ->  ProxyOptions = [proxy(Proxy) | CaOptions]
      ;   ProxyOptions = CaOptions
      ),
      (   get_dict('insecure-skip-tls-verify', Cluster, true)
      ->  OptionsOut = [cert_verify_hook(cert_accept_any) | ProxyOptions]
      ;   OptionsOut = ProxyOptions
      ).
  347
  %!  config_current_context(-Context:atom) is semidet.
  %!  config_current_context(+Cfg:dict, ?Context:atom) is semidet.
  %
  %   Context is the `current-context` of the configuration as an atom.
  %   The unary variant loads the configuration with load_config/1 first.
  %   Fails if the configuration has no `current-context` key.
  config_current_context(Context) :-
      load_config(Config),
      config_current_context(Config, Context).

  config_current_context(Cfg, Context) :-
      CurrentName = Cfg.get('current-context'),
      atom_string(Context, CurrentName).
  354
  %!  config_get_cluster(+Config:dict, ?ClusterName:atom, -Cluster:dict) is nondet.
  %
  %   Cluster is the `cluster` value of an entry of `Config.clusters` whose
  %   `name` has the text of ClusterName.
  config_get_cluster(Config, ClusterName, Cluster) :-
      Entries = Config.clusters,
      member(Entry, Entries),
      EntryName = Entry.name,
      atom_string(ClusterName, EntryName),
      Cluster = Entry.cluster.
  359 
  %!  config_get_context(+Config:dict, ?ContextName:atom, -Context:dict) is nondet.
  %
  %   Context is the `context` value of an entry of `Config.contexts` whose
  %   `name` has the text of ContextName.
  config_get_context(Config, ContextName, Context) :-
      Entries = Config.contexts,
      member(Entry, Entries),
      EntryName = Entry.name,
      atom_string(ContextName, EntryName),
      Context = Entry.context.
  364
  %!  config_get_user(+Config:dict, ?UserName:atom, -User:dict) is nondet.
  %
  %   User is the `user` value of an entry of `Config.users` whose `name`
  %   has the text of UserName.
  config_get_user(Config, UserName, User) :-
      Entries = Config.users,
      member(Entry, Entries),
      EntryName = Entry.name,
      atom_string(UserName, EntryName),
      User = Entry.user.
  369   
  %!  context_options(+OptionsIn:list, -OptionsOut:list) is semidet.
  %
  %   Completes the option list so that it contains k8s_config/1,
  %   k8s_context/1 and k8s_namespace/1:
  %
  %     - a missing configuration is loaded with load_config/1;
  %     - a missing context defaults to the configuration's
  %       `current-context`;
  %     - a missing namespace is taken from the selected context, falling
  %       back to "default".
  %
  %   The namespace is looked up in the context selected above. That branch
  %   used to unify the context name with `current-context` once more, so
  %   an explicit k8s_context/1 naming another context made the predicate
  %   fail whenever k8s_namespace/1 was not given.
  context_options( OptionsIn, OptionsOut) :-
      (   option(k8s_config(Config), OptionsIn)
      ->  Options1 = OptionsIn
      ;   load_config(Config),
          Options1 = [ k8s_config(Config) | OptionsIn ]
      ),
      (   option(k8s_context(ContextName), Options1)
      ->  Options2 = Options1
      ;   config_current_context(Config, ContextName),
          Options2 = [ k8s_context(ContextName) | Options1 ]
      ),
      (   option(k8s_namespace(_), Options2)
      ->  OptionsOut = Options2
      ;   config_get_context(Config, ContextName, Context),
          dict_get_default(Context, namespace, "default", Namespace),
          OptionsOut = [ k8s_namespace(Namespace) | Options2 ]
      ),
      !.
  389
  %!  dict_get_default(+Dict:dict, +Key, +Default, ?Value) is semidet.
  %
  %   Value is the value stored under Key in Dict. If Dict has no such key,
  %   or the stored value does not unify with Value, Value is unified with
  %   Default instead.
  dict_get_default(Dict, Key, Default, Value) :-
      (   get_dict(Key, Dict, Found),
          Found = Value
      ->  true
      ;   Value = Default
      ).
  394
  %!  is_resource_namespaced(+ApiGroup, +Version, +ResourceTypeName, +Options:list) is semidet.
  %
  %   True if the first resource type matching the name, group and version
  %   has `namespaced` equal to `true`.
  %
  %   The resource types come from k8s_resource_types/2. Unless the caller
  %   supplies k8s_resource_types_mode/1, the mode `local` is used to
  %   avoid loading all resource types as a side effect. If that listing
  %   fails, only the resource types of the requested group and version are
  %   fetched.
  is_resource_namespaced( ApiGroup, Version, ResourceTypeName, Options ) :-
      (   option(k8s_resource_types_mode(_), Options)
      ->  ListingOptions = Options  % respect caching options
      ;   ListingOptions = [ k8s_resource_types_mode(local) | Options ]
      ),
      % get list of the resource types
      (   k8s_resource_types(KnownTypes, ListingOptions)
      ->  true
      ;   ApiGroup = core
      ->  api_resources_core_(Options, Version, [], KnownTypes)
      ;   atomic_list_concat([ApiGroup, Version], '/', GroupVersion),
          api_resources_groups_(Options, Version, _{ version: Version, groupVersion: GroupVersion}, [], KnownTypes)
      ),
      % check if resource is namespaced
      member(Type, KnownTypes),
      atomic_eq(ResourceTypeName, Type.name),
      atomic_eq(ApiGroup, Type.group),
      atomic_eq(Version, Type.version),
      !,
      Type.namespaced = true.
  416
  %!  load_and_merge_config_file(+Path, +ConfigIn:dict, -ConfigOut:dict) is semidet.
  %
  %   Fold step used for KUBECONFIG: reads the YAML file at Path and merges
  %   it into ConfigIn. The cluster, user and context lists are
  %   concatenated, ConfigIn's entries first. `current-context` and
  %   `preferences` come from the file when it defines them, otherwise from
  %   ConfigIn.
  load_and_merge_config_file(Path, ConfigIn, ConfigOut) :-
      path_to_posix(Path, PosixPath),
      yaml_read(PosixPath, FileConfig),
      % complete the accumulated configuration with empty defaults
      EmptyConfig = _{
          clusters:[], users: [], contexts: [],
          'current-context': "", preferences: _{}
      },
      put_dict(ConfigIn, EmptyConfig, Accumulated),
      % complete the file's configuration, inheriting the scalar settings
      FileDefaults = _{
          clusters:[], users: [], contexts: [],
          'current-context': Accumulated.'current-context',
          preferences: Accumulated.preferences
      },
      put_dict(FileConfig, FileDefaults, Loaded),
      append(Accumulated.clusters, Loaded.clusters, Clusters),
      append(Accumulated.users, Loaded.users, Users),
      append(Accumulated.contexts, Loaded.contexts, Contexts),
      ConfigOut =
          _{
              apiVersion: v1,
              kind: 'Config',
              preferences: Loaded.preferences,
              clusters: Clusters,
              users: Users,
              contexts: Contexts,
              'current-context': Loaded.'current-context'
          },
      !.
  443
  %!  load_config(-ConfigDict:dict) is semidet.
  %
  %   Loads the client configuration from the first source that works:
  %
  %     1. the files listed in the environment variable KUBECONFIG,
  %        separated by `;` on Windows and `:` elsewhere, merged in order;
  %     2. `.kube/config` below the directory named by USERPROFILE or HOME;
  %     3. the service account files mounted into a pod, from which a
  %        single-cluster configuration is built.
  %
  %   An informational message names the source that was used.
  load_config(ConfigDict) :- % KUBECONFIG variant
      getenv('KUBECONFIG', Path),
      (   current_prolog_flag(windows, true)
      ->  Separator = ';'
      ;   Separator = ':'
      ),
      atomic_list_concat(Files, Separator, Path),
      foldl(load_and_merge_config_file, Files, _{}, ConfigDict),
      print_message(informational, kubernetes(config_loaded, kubeconfig)),
      !.
  load_config(ConfigDict) :- % ~/.kube/config variant
      member(HomeVariable, ['USERPROFILE', 'HOME']),
      getenv(HomeVariable, HomePath),
      path_to_posix(HomePath, HomePathPx),
      directory_file_path(HomePathPx, '.kube/config', ConfigPath),
      exists_file(ConfigPath),
      yaml_read(ConfigPath, ConfigDict),
      print_message(informational, kubernetes(config_loaded, user_config)),
      !.
  load_config(ConfigDict) :- % access api from pod
      TokenFile = '/var/run/secrets/kubernetes.io/serviceaccount/token',
      NamespaceFile = '/var/run/secrets/kubernetes.io/serviceaccount/namespace',
      exists_file(TokenFile),
      exists_file('/var/run/secrets/kubernetes.io/serviceaccount/ca.crt'),
      exists_file(NamespaceFile),
      read_file_to_string(TokenFile, Token, []),
      read_file_to_string(NamespaceFile, Namespace, []),
      Cluster = _{
          name: "default-api",
          cluster: _{
              'certificate-authority': "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
              server: "https://kubernetes.default.svc"
          }
      },
      User = _{
          name: "service-account",
          user: _{
              token: Token
          }
      },
      Context = _{
          name: "from-pod",
          context: _{
              cluster: "default-api",
              user: "service-account",
              namespace: Namespace
          }
      },
      ConfigDict = _{
          apiVersion: v1,
          kind: 'Config',
          clusters: [Cluster],
          users: [User],
          contexts: [Context],
          'current-context': "from-pod"
      },
      print_message(informational, kubernetes(config_loaded, pod)),
      !.
  505
  %!  noop_healtz is det.
  %
  %   Default heartbeat callback of the watch loop; always succeeds.
  noop_healtz :-
      true.
  507
  %!  path_to_posix(+Path, -Posix:atom) is det.
  %
  %   Posix is Path with every backslash replaced by a forward slash.
  path_to_posix(Path, Posix) :-
      atomic_list_concat(Parts, '\\', Path),   % split at backslashes
      atomic_list_concat(Parts, /, Posix).     % re-join with slashes
  511
  %   Message texts for the kubernetes(...) terms printed by this module.
  %   The `unsupported_config, user` text was truncated after "of the
  %   user"; it now mirrors the cluster text. The argument of the
  %   `watcher_exited` message is passed as a one-element list so it is
  %   always treated as a single format argument.
  prolog:message(kubernetes(unsupported_config, cluster)) -->
      ['Kubernetes: Configuration of the cluster is not supported'].
  prolog:message(kubernetes(unsupported_config, user)) -->
      ['Kubernetes: Configuration of the user is not supported'].
  prolog:message(kubernetes(watcher_exited, Resource)) -->
      ['Kubernetes: Watching of the resources type `~p` exited '- [Resource]].
  prolog:message(kubernetes(watcher_update_failure, Goal, Id)) -->
      ['Kubernetes: Calling the goal ~p from resource controller for resource ~p modification failed'- [Goal, Id]].
  prolog:message(kubernetes(config_loaded, kubeconfig)) -->
      ['Kubernetes: Configuration taken using the environment variable KUBECONFIG'].
  prolog:message(kubernetes(config_loaded, user_config)) -->
      ['Kubernetes: Configuration taken the users home directory'].
  prolog:message(kubernetes(config_loaded, pod)) -->
      ['Kubernetes: Configuration taken from the kubernetes pod service account'].
  prolog:message(kubernetes(watch_modification, Change)) -->
      { atom_json_dict(Json, Change, [as(atom), width(0)]) },
      ['Kubernetes: Modification of resources detected: ~p' - [Json] ].
  prolog:message(kubernetes(watcher_heartbeat_callback_failure, HeartCallback, Id)) -->
      ['Kubernetes: The heartbeat callback `~w` of the watcher thread `~w` failed' - [HeartCallback, Id] ].
  531
  %!  resource_uri(+ApiGroup, +Version, +ResourceTypeName, -Uri:atom, +Options:list) is semidet.
  %
  %   Uri is the API path of the resource collection, relative to the
  %   server root. The `core` group lives below `api/<Version>`, every
  %   other group below `apis/<ApiGroup>/<Version>`.
  %
  %   The cluster-wide collection is addressed when the k8s_namespace/1
  %   option is missing or `all`, and also when the resource type is not
  %   reported as namespaced. Otherwise the path contains
  %   `namespaces/<Namespace>`.
  resource_uri(ApiGroup, Version, ResourceTypeName, Uri, Options) :-
      option(k8s_namespace(all), Options, all),
      resource_uri_segments_(ApiGroup, Version, [ResourceTypeName], Segments),
      atomic_list_concat(Segments, '/', Uri),
      !.
  resource_uri(ApiGroup, Version, ResourceTypeName, Uri, Options) :-
      select_option(k8s_namespace(Namespace), Options, OtherOptions),
      (   is_resource_namespaced(ApiGroup, Version, ResourceTypeName, Options)
      ->  resource_uri_segments_(ApiGroup, Version, [namespaces, Namespace, ResourceTypeName], Segments),
          atomic_list_concat(Segments, '/', Uri)
      ;   resource_uri(ApiGroup, Version, ResourceTypeName, Uri, [ k8s_namespace(all) | OtherOptions ])
      ),
      !.

  % resource_uri_segments_(+ApiGroup, +Version, +Tail, -Segments): Segments
  % is Tail preceded by the path segments of the API group and version.
  resource_uri_segments_(ApiGroup, Version, Tail, Segments) :-
      (   ApiGroup = core
      ->  Segments = [api, Version | Tail]
      ;   Segments = [apis, ApiGroup, Version | Tail]
      ).
  549
  %!  watch_modification_call(:Goal, +Id, +Change, +StateIn, -StateOut) is det.
  %
  %   Processes one item read from the watch stream. A state is
  %   state(ResourceVersion, Known), where Known is a list of
  %   resource(Namespace, Name, Version) terms. Change is a change dict, the
  %   atom `end_of_file`, or an exception caught while reading.
  %
  %     - If the watcher Id has an exit request, nothing is done.
  %     - A read timeout leaves the state unchanged.
  %     - Any other error(_, _) term and `end_of_file` are rethrown as
  %       error(k8s_watcher_error(connection_broken), StateIn).
  %     - "ERROR" resets the version to 0.
  %     - "BOOKMARK" records the resource version of the bookmark.
  %     - "ADDED" and "MODIFIED" call Goal with `added` for an unknown
  %       resource and with `modified` for a known resource whose version
  %       changed.
  %     - "DELETED" calls Goal with `deleted` for a known resource.
  %     - Anything else, including a failing Goal, prints an error message
  %       and leaves the state unchanged.
  %
  %   Two defects are fixed. The exit-request clause had seven arguments and
  %   was never called. The delete clause accepted any event type, so a
  %   failed `added` or `modified` callback was handled as a deletion.
  watch_modification_call(_, Id, _, State, State) :-
      watcher_status(Id, exit_request), % just exit if exit is requested
      !.
  watch_modification_call(_, _, error(timeout_error(read, _), _), State, State) :- !. % just continue listening
  watch_modification_call(_, _, error(_, _), State, _) :- % rethrow other errors
      throw(error(k8s_watcher_error(connection_broken), State)).
  watch_modification_call(_, _, end_of_file, State, _) :- % need reconnect if stream is ended
      throw(error(k8s_watcher_error(connection_broken), State)).
  watch_modification_call(Goal, _, Change, StateIn, StateOut) :-
      is_dict(Change),
      get_dict(type, Change, Type),
      watch_modification_change_(Type, Goal, Change, StateIn, StateOut),
      !.
  watch_modification_call(Goal, Id, _, State, State) :-
      print_message(error, kubernetes(watcher_update_failure, Goal, Id)).

  % watch_modification_change_(+Type, :Goal, +Change, +StateIn, -StateOut):
  % handles one change dict according to its event type.
  watch_modification_change_("ERROR", _, _, state(_, R), state(0, R)). % reset version so the next connection starts fresh
  watch_modification_change_("BOOKMARK", _, Change, state(_, R), state(Version, R)) :-
      Version = Change.object.metadata.resourceVersion.
  watch_modification_change_(Type, Goal, Change, state(_, R), state(Version, [Resource | Rest])) :-
      memberchk(Type, [ "ADDED", "MODIFIED"]),
      watch_modification_identity_(Change, ResourceNamespace, ResourceName, Version),
      (   select(resource(ResourceNamespace, ResourceName, OldVersion), R, Rest)
      ->  (   OldVersion = Version
          ->  true   % already reported at this version
          ;   call(Goal, modified, Change.object)
          )
      ;   call(Goal, added, Change.object),
          Rest = R
      ),
      Resource = resource(ResourceNamespace, ResourceName, Version).
  watch_modification_change_("DELETED", Goal, Change, state(_, R), state(Version, Rest)) :-
      watch_modification_identity_(Change, ResourceNamespace, ResourceName, Version),
      (   select(resource(ResourceNamespace, ResourceName, _), R, Rest)
      ->  call(Goal, deleted, Change.object)
      ;   Rest = R
      ).

  % watch_modification_identity_(+Change, -Namespace, -Name, -Version):
  % identity and resource version of the changed object; Namespace is []
  % when the object's metadata has no namespace key.
  watch_modification_identity_(Change, Namespace, Name, Version) :-
      Metadata = Change.object.metadata,
      (   get_dict(namespace, Metadata, Namespace)
      ->  true
      ;   Namespace = []
      ),
      Name = Metadata.name,
      Version = Metadata.resourceVersion.
  587
  588
  %!  watch_resources_loop(:Callback, +ApiGroup, +Version, +ResourceTypeName,
  %!                       +State, +Options:list)
  %
  %   Opens the watch request and processes its stream with watch_stream/6.
  %   When the connection breaks it sleeps one second and reconnects with
  %   the state reached so far. An asynchronous watcher, identified by the
  %   watcher_id(Id) option, leaves the loop through thread_exit/1 once an
  %   exit request is recorded for it.
  %
  %   The stream of each connection is now closed when its processing ends,
  %   whether normally, with a broken connection or with another
  %   exception. Previously every reconnect left the earlier stream open.
  %
  %   NOTE(review): the resource version held in State is not added to the
  %   request options here, so each reconnect asks for the full list -
  %   confirm whether that is intended.
  watch_resources_loop(_, _, _, ResourceTypeName, state(Version, _), Options) :-  % special handling to exit the async loop
      option(watcher_id(Id), Options),
      watcher_status(Id, exit_request),
      !,
      print_message(informational, kubernetes(watcher_exited, ResourceTypeName)),
      thread_exit(resourceVersion(Version)).
  watch_resources_loop(Callback, ApiGroup, Version, ResourceTypeName, State, Options) :-
      context_options([k8s_query(watch=1), k8s_query(allowWatchBookmarks=true) | Options], Options1),
      resource_uri(ApiGroup, Version, ResourceTypeName, Url, Options1),
      config_connection_options( Url, UriComponents, Options1, Options2),
      uri_components(Uri, UriComponents),
      (   option(watcher_id(Id), Options)
      ->  true
      ;   Id = []
      ),
      option(heartbeat_callback(HeartCallback), Options, noop_healtz),
      setup_call_cleanup(
          http_open( Uri, Stream, Options2),
          once((
              watch_resources_register_stream_(Id, Stream),
              catch(
                  watch_stream(Callback, HeartCallback, Stream, Id, State, State1),
                  error(k8s_watcher_error(connection_broken), State1),
                  true
              )
          )),
          watch_resources_release_stream_(Id, Stream)
      ),
      sleep(1), % reduce CPU load in case of persistent connection error
      !, % cut here to avoid recursion stack on async loop
      watch_resources_loop(Callback, ApiGroup, Version, ResourceTypeName, State1, Options).

  % watch_resources_register_stream_(+Id, +Stream): records Stream as the
  % running stream of watcher Id so that watcher_exit/2 can close it.
  % Synchronous watchers (Id == []) are not recorded.
  watch_resources_register_stream_(Id, Stream) :-
      (   Id == []
      ->  true
      ;   retractall(watcher_status(Id, running(_))),
          asserta(watcher_status(Id, running(Stream)))
      ).

  % watch_resources_release_stream_(+Id, +Stream): closes Stream unless
  % watcher_exit/2 has already taken over. Whoever retracts the running/1
  % fact closes the stream, so it is never closed twice.
  watch_resources_release_stream_(Id, Stream) :-
      (   Id == []
      ;   retract(watcher_status(Id, running(Stream)))
      ),
      !,
      catch(close(Stream), _, true).
  watch_resources_release_stream_(_, _).
  622
  %!  watch_stream(:Goal, +HeartCallback, +Stream, +Id, +StateIn, -StateOut)
  %
  %   Reads change notifications from Stream until an exit request is
  %   recorded for watcher Id, in which case StateOut is the state reached
  %   so far. Each round:
  %
  %     1. calls HeartCallback and prints an error message if it fails;
  %     2. reads one JSON object from the stream; an exception raised
  %        while reading is passed on in place of the object;
  %     3. hands the result to watch_modification_call/5.
  %
  %   The recursive call is the last goal of the clause. The cut that used
  %   to follow it was redundant, because both clauses already commit, and
  %   it kept a frame alive for every round.
  watch_stream(_, _, _, Id, State, State) :-
      watcher_status(Id, exit_request),
      !.
  watch_stream(Goal, HeartCallback, Stream, Id, StateIn, StateOut) :-
      (   call(HeartCallback)
      ->  true
      ;   print_message(error, kubernetes(watcher_heartbeat_callback_failure, HeartCallback, Id))
      ),
      catch(
          (   peek_string(Stream, 4, _),
              json_read_dict(Stream, Change, [end_of_file(end_of_file)]),
              print_message(informational, kubernetes(watch_modification, Change))
          ),
          Error,
          Change = Error
      ),
      watch_modification_call(Goal, Id, Change, StateIn, State0),
      !,
      watch_stream(Goal, HeartCallback, Stream, Id, State0, StateOut).
  643
  %!  watcher_exit(+Id)
  %
  %   Same as watcher_exit/2, ignoring the exit status of the watcher
  %   thread. This is the goal returned as StopWatcher by
  %   k8s_watch_resources_async/6.
  watcher_exit(Id) :-
      watcher_exit(Id, _ExitStatus).
  646
  647watcher_exit(Id, Status) :-
  648    (   retract(k8s_client:watcher_status(Id, running(Stream)))
  649    ->  assertz(k8s_client:watcher_status(Id, exit_request)),
  650        retractall(k8s_client:watcher_status(Id, running(_))),
  651        close(Stream)
  652    ;   assertz(k8s_client:watcher_status(Id, exit_request))
  653    ),
  654    thread_join(Id, Status),
  655    ignore(retractall(k8s_client:watcher_status(Id,_)))