source: branches/prototype-v0/zoo-project/zoo-kernel/service_callback.c @ 851

Last change on this file since 851 was 851, checked in by djay, 7 years ago

Invoke callback asynchronously. Still the ZOO-Kernel has still to wait for every requests to finish before stoping its execution.

  • Property svn:keywords set to Id
File size: 15.4 KB
Line 
1/*
2 * Author : Gérald FENOY
3 *
4 *  Copyright 2017 GeoLabs SARL. All rights reserved.
5 *
6 * This work was supported by public funds received in the framework of GEOSUD,
7 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed
8 * by the French National Research Agency
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 */
28
29#include "service_callback.h"
30#include "service_json.h"
31#include "service_internal_ms.h"
32#include <pthread.h>
33
34#ifdef __cplusplus
35extern "C" {
36#endif
37
38  int nbThreads=0;
39  int cStep=0;
40  pthread_t* myThreads=NULL;
41  bool steps[7][2]={
42    {false,false},
43    {false,false},
44    {false,false},
45    {false,false},
46    {false,false},
47    {false,false},
48    {false,false}
49  };
50 
51  /**
52   * Check if a service name is prohibited, meaning that the Kernel doesn't have
53   * to invoke the callback for this specific service.
54   *
55   * @param conf the main configuration file maps
56   * @param serviceName the serviceName
57   * @return a bool true if the service is prohibited, false in other case
58   */
59  bool isProhibited(maps* conf,const char* serviceName){
60    map* plist=getMapFromMaps(conf,"callback","prohibited");
61    if(plist!=NULL){
62      char *tmp=plist->value;
63      char *tmpS=strtok(tmp,",");
64      while(tmpS!=NULL){
65        if(strcmp(serviceName,tmpS)==0)
66          return true;
67        tmpS=strtok(NULL,",");
68      }
69    }
70    return false;
71  }
72
73  /**
74   * Parameter definition to be used for sending parameters to a thread.
75   */
76  typedef struct {
77    maps *conf;      //!< the main configuration file
78    map *url;        //!< the callback url maps
79    json_object *res;//!< the JSON object to post
80    int step;        //!< the current step [0,6]
81    int state;       //!< the current state [0,1]
82  } local_params;
83
84  /**
85   * Verify if the URL should use a shared cache or not.
86   *
87   * In case the security section contains a key named "shared", then if the
88   * domain listed in the shared key are contained in the url given as parameter
89   * then it return "SHARED" in other cases, it returns "OTHER".
90   *
91   * @param conf the main configuration file maps
92   * @param url the URL to evaluate
93   * @return a string "SHARED" in case the host is in a domain listed in the
94   * shared key, "OTHER" in other cases.
95   */
96  char* getProvenance(maps* conf,const char* url){
97    map* sharedCache=getMapFromMaps(conf,"security","shared");
98    if(sharedCache!=NULL){
99      char *res=NULL;
100      char *hosts=sharedCache->value;
101      char *curs=strtok(hosts,",");
102      while(curs!=NULL){
103        if(strstr(url,curs)==NULL)
104          res="OTHER";
105        else{
106          res="SHARED";
107          return res;
108        }
109      }
110      return res;
111    }
112    return "OTHER";
113  }
114
115  /**
116   * Practically invoke the callback, meaning sending the HTTP POST request.
117   *
118   * @param args local_params containing all the variables required
119   */
120  void* _invokeCallback(void* args){
121    local_params* arg=(local_params*)args;
122    HINTERNET hInternet,res1;
123    hInternet=InternetOpen("ZooWPSClient\0",
124                           INTERNET_OPEN_TYPE_PRECONFIG,
125                           NULL,NULL, 0);
126    if(!CHECK_INET_HANDLE(hInternet)){
127      InternetCloseHandle (&hInternet);
128      return false;
129    }
130    char *URL=(char*)malloc((strlen(arg->url->value)+5)*sizeof(char));
131    sprintf(URL,"%s%d_%d/",arg->url->value,arg->step,arg->state);
132    const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN);
133    hInternet.waitingRequests[0] = zStrdup(URL);
134    free(URL);
135    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__);
136    fprintf(stderr," * JSON: [%s] \n",jsonStr);
137    fprintf(stderr," * URL: %s/ \n\n",hInternet.waitingRequests[0]);
138    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__);
139    while( cStep!=7 &&
140           ( cStep!=arg->step || (arg->state!=0 && steps[arg->step][0]==false) )
141           ){
142      sleep(1);
143    }
144    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n\n",pthread_self(),__FILE__,__LINE__);
145    int i=0;
146    for(i=0;i<7;i++){
147      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
148      fflush(stderr);
149    }
150    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n",pthread_self(),__FILE__,__LINE__);
151    fflush(stderr);
152    res1 = InternetOpenUrl (&hInternet,
153                            hInternet.waitingRequests[0], 
154                            (char*)jsonStr, strlen(jsonStr),
155                            INTERNET_FLAG_NO_CACHE_WRITE,
156                            0);
157    AddHeaderEntries(&hInternet,arg->conf);
158    //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1);x
159    processDownloads(&hInternet);
160    fprintf(stderr,"************************* From thread %d %s %d: REQUEST END\n\n",pthread_self(),__FILE__,__LINE__);
161    fflush(stderr);
162    char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1)
163                                 * sizeof (char));
164    if (tmp == NULL)
165      {
166        setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory"));
167        setMapInMaps(arg->conf,"lenv","code","InternalError");
168        return NULL;
169      }
170    size_t bRead;
171    InternetReadFile (hInternet.ihandle[0],
172                      (LPVOID) tmp,
173                      hInternet.
174                      ihandle[0].nDataLen,
175                      &bRead);
176    tmp[hInternet.ihandle[0].nDataLen] = 0;
177    json_object_put(arg->res);
178    InternetCloseHandle(&hInternet);
179    if(cStep==0 || cStep==6 || arg->state==1)
180      cStep=arg->step+1;
181    steps[arg->step][arg->state]=true;
182    fprintf(stderr,"************************* From thread %d %s %d: RESPONSE CONTENT\n",pthread_self(),__FILE__,__LINE__);
183    for(i=0;i<7;i++){
184      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
185    }
186    fprintf(stderr,"Result: \n%s\n\n",tmp);
187    fprintf(stderr,"************************* From thread %d %s %d\n\n",pthread_self(),__FILE__,__LINE__);
188    fflush(stderr);
189    free(tmp);
190    free(args);
191    pthread_exit(NULL);
192  }
193 
194  /**
195   * Invoke the callback in case there is a [callback] section containing a url parameter
196   *
197   * @param m the maps containing the main configuration file definitions
198   * @param inputs the inputs defined in the request (can be null if not yet initialized)
199   * @param inputs the outputs provided in the request (can be null if not yet initialized)
200   * @param step the step number, steps are defined as:
201   *  0: Analyze creation
202   *  1: Fetching Data Inputs
203   *  2: Uploading data inputs to cluster
204   *  3: Creating Job Script
205   *  4: Submitting Job to Cluster
206   *  5: Downloading processed output from cluster
207   *  6: Finalize
208   *  7: Dismiss or Error
209   * @param state 0 in case the step starts, 1 when it ends
210   * @return bool true in case of success, false in other cases
211   */
212  bool invokeCallback(maps* conf,maps* inputs,maps* outputs,int step,int state){
213    map* url=getMapFromMaps(conf,"callback","url");
214    if(url==NULL)
215      return false;
216     
217    maps* lenv=getMaps(conf,"lenv");
218    map* sname=getMap(lenv->content,"identifier");
219    if(sname!=NULL && isProhibited(conf,sname->value))
220      return false;
221     
222    json_object *res=json_object_new_object();
223
224    map* sid=getMapFromMaps(conf,"lenv","usid");
225    if(sid!=NULL){
226      json_object *jsStr=json_object_new_string(sid->value);
227      json_object_object_add(res,"jobid",jsStr);
228    }
229    const struct tm *tm;
230    size_t len;
231    time_t now;
232    char *tmp1;
233    map *tmpStatus;
234 
235    now = time ( NULL );
236    tm = localtime ( &now );
237
238    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
239
240    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
241    json_object *jsStr0=json_object_new_string(tmp1);
242    json_object_object_add(res,"datetime",jsStr0);
243
244    switch(step){
245    case 0: {
246      // Create a new analyze
247      maps* lenv=getMaps(conf,"lenv");
248      sid=getMapFromMaps(conf,"lenv","xrequest");
249      if(sid!=NULL){
250        json_object *jsStr=json_object_new_string(sid->value);
251        json_object_object_add(res,"request_execute_content",jsStr);
252      }
253      sid=getMapFromMaps(conf,"lenv","identifier");
254      if(sid!=NULL){
255        json_object *jsStr=json_object_new_string(sid->value);
256        json_object_object_add(res,"process_identifier",jsStr);
257      }
258      break;
259    }
260    case 1: {
261      // Fetching data inputs
262      maps* curs=inputs;
263      char *keys[8][2]={
264        {
265          "href",
266          "ref"
267        },
268        {
269          "cache_file",
270          "cachefile"
271        },
272        {
273          "fmimeType",
274          "mimetype"
275        },
276        {
277          "size",
278          "size"
279        },
280        {
281          "ref_wms_link",
282          "ref_wms_link"
283        },
284        {
285          "ref_wcs_link",
286          "ref_wcs_link"
287        },
288        {
289          "ref_wfs_link",
290          "ref_wfs_link"
291        },
292        {
293          "geodatatype",
294          "datatype"
295        }       
296      };
297      json_object *res1=json_object_new_object();
298      while(curs!=NULL){
299        map* tmpMap=getMap(curs->content,"cache_file");
300        sid=getMap(curs->content,"ref_wms_link");
301        json_object *res2=json_object_new_object();
302        if(tmpMap!=NULL && sid==NULL){
303          addToMap(curs->content,"generated_file",tmpMap->value);
304          struct stat buf;
305          char timeStr[ 100 ] = "";
306          if (stat(tmpMap->value, &buf)==0){
307            strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
308            json_object *jsStr=json_object_new_string(timeStr);
309            json_object_object_add(res2,"creation_date",jsStr);
310          }
311          tmpMap=getMap(curs->content,"fmimeType");
312          if(tmpMap!=NULL){
313            addToMap(curs->content,"mimeType",tmpMap->value);
314          }
315          setReferenceUrl(conf,curs);
316          //outputMapfile(conf,curs);
317          dumpMaps(curs);
318        }
319        int i=0;
320        int hasRef=-1;
321        for(;i<8;i++){
322          sid=getMap(curs->content,keys[i][0]);
323          if(sid!=NULL){
324            json_object *jsStr=json_object_new_string(sid->value);
325            json_object_object_add(res2,keys[i][1],jsStr);
326            if(i==0){
327              hasRef=1;
328              json_object *jsStr1=json_object_new_string(getProvenance(conf,url->value));
329              json_object_object_add(res2,"dataOrigin",jsStr1);
330            }
331          }
332        }
333        if(hasRef<0)
334          json_object_put(res2);
335        else
336          json_object_object_add(res1,curs->name,res2);
337        curs=curs->next;
338      }
339      json_object_object_add(res,"inputs",res1);
340      break;
341    }
342    case 2: {
343      // Uploading data input to cluster
344      maps* in=getMaps(conf,"uploadQueue");
345      if(in!=NULL){
346        maps* curs=in;
347        map* length=getMapFromMaps(in,"uploadQueue","length");
348        if(length!=NULL){
349          json_object *res1=json_object_new_object();
350          json_object *res2=json_object_new_object();
351          int limit=atoi(length->value);
352          int i=0;
353          maps* uploadQueue=getMaps(in,"uploadQueue");
354          map* tmp=uploadQueue->content;
355          for(;i<limit;i++){
356            map* tmp0=getMapArray(tmp,"input",i);
357            map* tmp1=getMapArray(tmp,"localPath",i);
358            map* tmp2=getMapArray(tmp,"targetPath",i);
359            if(tmp0!=NULL && tmp1!=NULL && tmp2!=NULL){
360              json_object *jsStr=json_object_new_string(tmp1->value);
361              json_object_object_add(res2,"local_path",jsStr);
362              jsStr=json_object_new_string(tmp2->value);
363              json_object_object_add(res2,"target_path",jsStr);
364              json_object_object_add(res1,tmp0->value,res2);
365            }
366          }
367          json_object_object_add(res,"inputs",res1);
368        }
369        //json_object_object_add(res,"inputs",in);
370      }
371      break;
372    }
373    case 3: {
374      // Generating job script
375      sid=getMapFromMaps(conf,"lenv","local_script");
376      if(sid!=NULL){
377        json_object *jsStr=json_object_new_string(sid->value);
378        json_object_object_add(res,"script",jsStr);
379      }
380      break;
381    }
382    case 4: {
383      // Submitting job to cluster
384      sid=getMapFromMaps(conf,"lenv","remote_script");
385      if(sid!=NULL){
386        json_object *jsStr=json_object_new_string(sid->value);
387        json_object_object_add(res,"script",jsStr);
388      }
389      break;
390    }
391    case 5: {
392      // Downloading process outputs from cluster
393      //json_object* in=mapsToJson(outputs);
394      dumpMaps(outputs);
395      maps* curs=outputs;
396      char *keys[8][2]={
397        {
398          "Reference",
399          "ref"
400        },
401        {
402          "storage",
403          "cachefile"
404        },
405        {
406          "fmimeType",
407          "mimetype"
408        },
409        {
410          "size",
411          "size"
412        },
413        {
414          "ref_wms_link",
415          "ref_wms_link"
416        },
417        {
418          "ref_wcs_link",
419          "ref_wcs_link"
420        },
421        {
422          "ref_wfs_link",
423          "ref_wfs_link"
424        },
425        {
426          "geodatatype",
427          "datatype"
428        }       
429      };
430      json_object *res1=json_object_new_object();
431      while(curs!=NULL){       
432        map* tmpMap=getMap(curs->content,"cache_file");
433        sid=getMap(curs->content,"ref_wms_link");
434        json_object *res2=json_object_new_object();
435        int i=0;
436        int hasRef=-1;
437        for(;i<8;i++){
438          sid=getMap(curs->content,keys[i][0]);
439          if(sid!=NULL){
440            json_object *jsStr=json_object_new_string(sid->value);
441            json_object_object_add(res2,keys[i][1],jsStr);
442            if(i==0)
443              hasRef=1;
444          }
445        }
446        if(hasRef>0)
447          json_object_object_add(res1,curs->name,res2);
448        else{
449          maps* curs0=curs->child;
450          while(curs0!=NULL){
451            json_object *res3=json_object_new_object();
452            int i0=0;
453            int hasRef0=-1;
454            for(;i0<8;i0++){
455              sid=getMap(curs0->content,keys[i0][0]);
456              if(sid!=NULL){
457                json_object *jsStr=json_object_new_string(sid->value);
458                json_object_object_add(res3,keys[i0][1],jsStr);
459                //if(i0==0)
460                hasRef0=1;
461              }
462            }
463            if(hasRef0<0)
464              json_object_put(res3);
465            else
466              json_object_object_add(res2,curs0->name,res3);
467            curs0=curs0->next;
468          }       
469          json_object_object_add(res1,curs->name,res2);
470        }
471        curs=curs->next;
472      }
473      json_object_object_add(res,"outputs",res1);
474      break;
475    }
476    case 6: {
477      // Finalize HPC
478      sid=getMapFromMaps(conf,"lenv","local_script");
479      if(sid!=NULL){
480        json_object *jsStr=json_object_new_string(sid->value);
481        json_object_object_add(res,"inputs",jsStr);
482      }
483      break;
484    }
485    case 7: {
486      // Error or Dismiss
487      sid=getMapFromMaps(conf,"lenv","message");
488      if(sid!=NULL){
489        json_object *jsStr=json_object_new_string(sid->value);
490        json_object_object_add(res,"message",jsStr);
491      }
492      json_object *jsStr=json_object_new_string("failed");
493      json_object_object_add(res,"wps_status",jsStr);
494      break;
495    }
496    others: {
497        break;
498      }
499    }
500   
501    local_params* argumentsA=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int)));
502    argumentsA->conf=conf;
503    argumentsA->url=url;
504    argumentsA->res=res;
505    argumentsA->step=step;
506    argumentsA->state=state;
507    //pthread_t p1;
508    if(myThreads==NULL)
509      myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t));
510    else
511      myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t));
512    if(pthread_create(&myThreads[nbThreads], NULL, _invokeCallback, (void*)argumentsA)==-1){
513      setMapInMaps(conf,"lenv","message",_("Unable to create a new thread"));
514      return false;
515    }
516    //free(argumentsA);
517    nbThreads++;
518    return true;
519  }
520
521  void cleanupCallbackThreads(){
522    int i=0;
523    for(i=0;i<nbThreads;i++){
524      pthread_join(myThreads[i],NULL);
525    }
526    free(myThreads);
527  }
528
529#ifdef __cplusplus
530}
531#endif
Note: See TracBrowser for help on using the repository browser.

Search

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png