source: trunk/zoo-project/zoo-kernel/service_callback.c @ 962

Last change on this file since 962 was 962, checked in by djay, 14 months ago

Update OGC API - Processes documentation and implementation, providing a browsable User Interface to Processes.

  • Property svn:keywords set to Id
File size: 31.4 KB
Line 
1/*
2 * Author : Gérald FENOY
3 *
4 *  Copyright 2017-2019 GeoLabs SARL. All rights reserved.
5 *
6 * This work was supported by public funds received in the framework of GEOSUD,
7 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed
8 * by the French National Research Agency
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 */
28
29#include "service_internal_ms.h"
30#include <pthread.h>
31#include <libxml/tree.h>
32#include <libxml/parser.h>
33#include <libxml/xpath.h>
34#include <libxml/xpathInternals.h>
35
36#include <libxslt/xslt.h>
37#include <libxslt/xsltInternals.h>
38#include <libxslt/transform.h>
39#include <libxslt/xsltutils.h>
40
41#include "service_callback.h"
42#include "service_json.h"
43#include "sqlapi.h"
44#include <ulinet.h>
45
46
47#ifdef __cplusplus
48extern "C" {
49#endif
50
51  /**
52   * Parameter definition to be used for sending parameters to a thread.
53   */
54  typedef struct {
55    maps *conf;      //!< the main configuration file
56    map *url;        //!< the callback url maps
57    json_object *res;//!< the JSON object to post
58    int step;        //!< the current step [0,6]
59    int state;       //!< the current state [0,1]
60  } local_params;
61
62  /**
63   * Number of threads
64   */
65  int nbThreads=0;
66  /**
67   * Current step
68   */
69  int cStep=0;
70  /**
71   * Maximum value of PercentCompleted
72   */
73  int maxProgress=0;
74  /**
75   * Is there any ongoing HTTP request
76   */
77  int isOngoing=0;
78  /**
79   * Threads array
80   */
81  pthread_t* myThreads=NULL;
82  /**
83   * Steps array
84   */
85  bool steps[7][2]={
86    {false,false},
87    {false,false},
88    {false,false},
89    {false,false},
90    {false,false},
91    {false,false},
92    {false,false}
93  };
94  /**
95   * Arguments array to give to the _invokeCallback thread's function
96   */
97  local_params** local_arguments;
98 
99  /**
100   * Check if a service name is prohibited, meaning that the Kernel doesn't have
101   * to invoke the callback for this specific service.
102   *
103   * @param conf the main configuration file maps
104   * @param serviceName the serviceName
105   * @return a bool true if the service is prohibited, false in other case
106   */
107  bool isProhibited(maps* conf,const char* serviceName){
108    map* plist=getMapFromMaps(conf,"callback","prohibited");
109    if(plist!=NULL){
110      char *tmp=plist->value;
111      char *tmpS=strtok(tmp,",");
112      while(tmpS!=NULL){
113        if(strcmp(serviceName,tmpS)==0)
114          return true;
115        tmpS=strtok(NULL,",");
116      }
117    }
118    return false;
119  }
120  /**
121   * Practically invoke the callback, meaning sending the HTTP POST request.
122   *
123   * @param args local_params containing all the variables required
124   */
125  void* _invokeBasicCallback(void* args){
126#ifdef CALLBACK_DEBUG
127    fprintf(stderr,"************************* From thread %d %s %d: REQUEST CONFIGURE (%s)\n",pthread_self(),__FILE__,__LINE__,arg->url->value);
128    fflush(stderr);
129#endif
130    local_params* arg=(local_params*)args;
131    if(arg->state<cStep){
132#ifdef CALLBACK_DEBUG
133      fprintf(stderr,"************************* From thread %d %s %d: REQUEST CANCELLED (%s) EXIT!\n",pthread_self(),__FILE__,__LINE__,arg->url->value);
134      fflush(stderr);
135#endif
136      freeMaps(&arg->conf);
137      free(arg->conf);
138      freeMap(&arg->url);
139      free(arg->url);
140      pthread_exit(NULL);
141      return NULL;
142    }
143    HINTERNET hInternet,res1;
144    const struct tm *tm;
145    size_t len;
146    char *tmp1;
147    map *tmpStatus;
148    map* pmTmp=getMapFromMaps(arg->conf,"lenv","status");
149    hInternet=InternetOpen("ZooWPSClient\0",
150                           INTERNET_OPEN_TYPE_PRECONFIG,
151                           NULL,NULL, 0);
152    if(!CHECK_INET_HANDLE(hInternet)){
153      InternetCloseHandle (&hInternet);
154      return NULL;
155    }
156    const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN);
157    while( arg->state != SERVICE_SUCCEEDED && arg->state != SERVICE_FAILED && isOngoing>0 ){
158      zSleep(100);
159    }
160    if(arg->state==SERVICE_STARTED && pmTmp!=NULL){
161      if(maxProgress<=atoi(pmTmp->value)){
162        maxProgress=atoi(pmTmp->value);
163      }else{
164#ifdef CALLBACK_DEBUG
165        fprintf(stderr,"************************* From thread %d %s %d: REQUEST CANCELLED (%s) EXIT!\n",pthread_self(),__FILE__,__LINE__,arg->url->value);
166        fflush(stderr);
167#endif
168        freeMaps(&arg->conf);
169        free(arg->conf);
170        freeMap(&arg->url);
171        free(arg->url);
172        pthread_exit(NULL);
173        return NULL;
174      }
175    }else
176      maxProgress=101;
177    isOngoing=1;
178    maps* tmpConf=createMaps("main");
179    tmpConf->content=createMap("memory","load");
180
181    hInternet.waitingRequests[0] = zStrdup(arg->url->value);
182    res1 = InternetOpenUrl (&hInternet,
183                            hInternet.waitingRequests[0], 
184                            (char*)jsonStr, strlen(jsonStr),
185                            INTERNET_FLAG_NO_CACHE_WRITE,
186                            0,tmpConf);
187    AddHeaderEntries(&hInternet,arg->conf);
188    AddMissingHeaderEntry(&hInternet.ihandle[hInternet.nb-1],"Content-Type","application/json");
189#ifdef CALLBACK_DEBUG
190    curl_easy_setopt(hInternet.ihandle[hInternet.nb-1].handle, CURLOPT_VERBOSE, 1);
191#endif
192    if(hInternet.ihandle[hInternet.nb-1].header!=NULL)
193      curl_easy_setopt(hInternet.ihandle[hInternet.nb-1].handle,CURLOPT_HTTPHEADER,hInternet.ihandle[hInternet.nb-1].header);
194    processDownloads(&hInternet);
195    freeMaps(&tmpConf);
196    free(tmpConf);
197#ifdef CALLBACK_DEBUG
198    char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1)
199                                 * sizeof (char));
200    if (tmp == NULL)
201      {
202        setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory"));
203        setMapInMaps(arg->conf,"lenv","code","InternalError");
204        return NULL;
205      }
206    size_t bRead;
207    InternetReadFile (hInternet.ihandle[0],
208                      (LPVOID) tmp,
209                      hInternet.
210                      ihandle[0].nDataLen,
211                      &bRead);
212    tmp[hInternet.ihandle[0].nDataLen] = 0;
213    fprintf(stderr,"************************* From thread %d %s %d: REQUEST END \n%s",pthread_self(),__FILE__,__LINE__,tmp);
214    fflush(stderr);
215    free(tmp);
216#endif
217    json_object_put(arg->res);
218    InternetCloseHandle(&hInternet);
219    isOngoing=0;
220    freeMaps(&arg->conf);
221    free(arg->conf);
222    freeMap(&arg->url);
223    if(arg->url!=NULL)
224      free(arg->url);
225    pthread_exit(NULL);
226  }
227
228  /**
229   * Invoke the callback in case there is a [subscriber] section containing one
230   * or more url parameter.
231   *
232   * @param conf the maps containing the main configuration file definitions
233   * @param state the service state SERVICE_SUCCEEDED / STARTED / FAILED
234   * @return bool true in case of success, false in other cases
235   */
236  bool invokeBasicCallback(maps* conf,int state){
237    map* url=getMapFromMaps(conf,"subscriber","inProgressUri");
238    if(state==SERVICE_SUCCEEDED)
239      url=getMapFromMaps(conf,"subscriber","successUri");
240    else
241      if(state==SERVICE_FAILED)
242        url=getMapFromMaps(conf,"subscriber","failedUri");
243    if(url==NULL)
244      return false;
245    map* url0=createMap("url",url->value);
246    map* sname=getMapFromMaps(conf,"lenv","identifier");
247    if(sname!=NULL && isProhibited(conf,sname->value))
248      return false;
249    if(state<cStep)
250      return true;
251    if(cStep!=state || isOngoing==0){
252      json_object *res=NULL;
253      if(state==SERVICE_SUCCEEDED || state==SERVICE_FAILED){
254        maps* pmsTmp=getMaps(conf,"lenv");
255        setMapInMaps(conf,"lenv","no-write","true");
256        map* pmTmp=getMapFromMaps(conf,"lenv","usid");
257        if(pmTmp!=NULL){
258          map* pmResponse=getMapFromMaps(conf,"lenv","jsonStr");
259          res=parseJson(conf,pmResponse->value);
260        }
261      }else
262        res=createStatus(conf,state);
263      if(local_arguments==NULL)
264        local_arguments=(local_params**)malloc(sizeof(local_params*));
265      else
266        local_arguments=(local_params**)realloc(local_arguments,(nbThreads+1)*sizeof(local_params*));
267      local_arguments[nbThreads]=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int)));       
268      local_arguments[nbThreads]->conf=dupMaps(&conf);
269      local_arguments[nbThreads]->url=url0;
270      local_arguments[nbThreads]->res=res;
271      local_arguments[nbThreads]->step=0;
272      local_arguments[nbThreads]->state=state;
273      cStep=state;
274      if(myThreads==NULL)
275        myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t));
276      else
277        myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t));
278      if(pthread_create(&myThreads[nbThreads], NULL, _invokeBasicCallback, (void*)local_arguments[nbThreads])==-1){
279        setMapInMaps(conf,"lenv","message",_("Unable to create a new thread"));
280        return false;
281      }
282      nbThreads++;
283    }
284    return true;
285  }
286 
287  /**
288   * Practically invoke the callback, meaning sending the HTTP POST request.
289   *
290   * @param args local_params containing all the variables required
291   */
292  void* _invokeCallback(void* args){
293    local_params* arg=(local_params*)args;
294    HINTERNET hInternet,res1;
295    const struct tm *tm;
296    size_t len;
297    time_t now;
298    char *tmp1;
299    map *tmpStatus;
300    maps* tmpConf=createMaps("main");
301    tmpConf->content=createMap("memory","load");
302    hInternet=InternetOpen("ZooWPSClient\0",
303                           INTERNET_OPEN_TYPE_PRECONFIG,
304                           NULL,NULL, 0);
305    if(!CHECK_INET_HANDLE(hInternet)){
306      InternetCloseHandle (&hInternet);
307      return NULL;
308    }
309    char *URL=(char*)malloc((strlen(arg->url->value)+5)*sizeof(char));
310    sprintf(URL,"%s%d_%d/",arg->url->value,arg->step,arg->state);
311    const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN);
312    hInternet.waitingRequests[0] = zStrdup(URL);
313    free(URL);
314#ifdef CALLBACK_DEBUG
315    now = time ( NULL );
316    tm = localtime ( &now );
317    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
318    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
319    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS cStep %d %d\n",pthread_self(),__FILE__,__LINE__,cStep,isOngoing);
320    fprintf(stderr," * JSON: [%s] \n",jsonStr);
321    fprintf(stderr," * URL: %s/ \n\n",hInternet.waitingRequests[0]);
322    fprintf(stderr," * DATE: %s/ \n\n",tmp1);
323    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__);
324    free(tmp1);
325#endif
326    while( (arg->step!=7 || isOngoing>0) &&
327           ( cStep!=arg->step || (arg->state!=0 && steps[arg->step][0]==false) )
328           ){
329      zSleep(100);
330    }
331    isOngoing=1;
332#ifdef CALLBACK_DEBUG
333    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n\n",pthread_self(),__FILE__,__LINE__);
334    int i=0;
335    for(i=0;i<7;i++){
336      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
337    }
338#endif
339   
340    now = time ( NULL );
341    tm = localtime ( &now );
342   
343    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
344    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
345
346#ifdef CALLBACK_DEBUG   
347    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1);
348    fflush(stderr);
349#endif   
350    free(tmp1);
351    res1 = InternetOpenUrl (&hInternet,
352                            hInternet.waitingRequests[0], 
353                            (char*)jsonStr, strlen(jsonStr),
354                            INTERNET_FLAG_NO_CACHE_WRITE,
355                            0,tmpConf);
356    AddHeaderEntries(&hInternet,arg->conf);
357    //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1);x
358    processDownloads(&hInternet);
359    freeMaps(&tmpConf);
360    free(tmpConf);
361    now = time ( NULL );
362    tm = localtime ( &now );
363    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
364    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
365   
366#ifdef CALLBACK_DEBUG   
367    fprintf(stderr,"************************* From thread %d %s %d: REQUEST END (%s)\n\n",pthread_self(),__FILE__,__LINE__,tmp1);
368#endif
369    free(tmp1);
370    char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1)
371                                 * sizeof (char));
372    if (tmp == NULL)
373      {
374        setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory"));
375        setMapInMaps(arg->conf,"lenv","code","InternalError");
376        return NULL;
377      }
378    size_t bRead;
379    InternetReadFile (hInternet.ihandle[0],
380                      (LPVOID) tmp,
381                      hInternet.
382                      ihandle[0].nDataLen,
383                      &bRead);
384    tmp[hInternet.ihandle[0].nDataLen] = 0;
385    json_object_put(arg->res);
386    InternetCloseHandle(&hInternet);
387    isOngoing=0;
388    if(cStep==0 || cStep==6 || arg->state==1)
389      cStep=arg->step+1;
390#ifdef CALLBACK_DEBUG
391    now = time ( NULL );
392    tm = localtime ( &now );
393    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
394    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
395    fprintf(stderr,"************************* From thread %d %s %d: RESPONSE CONTENT (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1);
396    for(i=0;i<7;i++){
397      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
398    }
399    fprintf(stderr,"Result: \n%s\n\n",tmp);
400    fprintf(stderr,"************************* From thread %d %s %d\n\n",pthread_self(),__FILE__,__LINE__);
401    fflush(stderr);
402    free(tmp1);
403#endif
404    steps[arg->step][arg->state]=true;
405    free(tmp);
406#ifdef CALLBACK_DEBUG
407    fprintf(stderr,"************************* From thread %d %s %d: EXIT\n\n",pthread_self(),__FILE__,__LINE__);
408    fflush(stderr);
409#endif
410    pthread_exit(NULL);
411  }
412 
413  /**
414   * Invoke the callback in case there is a [callback] section containing a url parameter
415   *
416   * @param m the maps containing the main configuration file definitions
417   * @param inputs the inputs defined in the request (can be null if not yet initialized)
418   * @param inputs the outputs provided in the request (can be null if not yet initialized)
419   * @param step the step number, steps are defined as:
420   *  0: Analyze creation
421   *  1: Fetching Data Inputs
422   *  2: Uploading data inputs to cluster
423   *  3: Creating Job Script
424   *  4: Submitting Job to Cluster
425   *  5: Downloading processed output from cluster
426   *  6: Finalize
427   *  7: Dismiss or Error
428   * @param state 0 in case the step starts, 1 when it ends
429   * @return bool true in case of success, false in other cases
430   */
431  bool invokeCallback(maps* conf,maps* inputs,maps* outputs,int step,int state){
432    map* url=getMapFromMaps(conf,"callback","url");
433    if(url==NULL)
434      return false;
435     
436    maps* lenv=getMaps(conf,"lenv");
437    map* sname=getMap(lenv->content,"identifier");
438    if(sname!=NULL && isProhibited(conf,sname->value))
439      return false;
440     
441    json_object *res=json_object_new_object();
442
443    map* sid=getMapFromMaps(conf,"lenv","usid");
444    if(sid!=NULL){
445      json_object *jsStr=json_object_new_string(sid->value);
446      json_object_object_add(res,"jobid",jsStr);
447    }
448    const struct tm *tm;
449    size_t len;
450    time_t now;
451    char *tmp1;
452    map *tmpStatus;
453 
454    now = time ( NULL );
455    tm = localtime ( &now );
456
457    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
458    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%H:%M:%SZ", tm );
459    json_object *jsStr0=json_object_new_string(tmp1);
460    json_object_object_add(res,"datetime",jsStr0);
461    free(tmp1);
462   
463    switch(step){
464    case 0: {
465      // Create a new analyze
466      maps* lenv=getMaps(conf,"lenv");
467      sid=getMapFromMaps(conf,"renv","xrequest");
468      if(sid!=NULL){
469        json_object *jsStr=json_object_new_string(sid->value);
470        json_object_object_add(res,"request_execute_content",jsStr);
471      }
472      sid=getMapFromMaps(conf,"lenv","identifier");
473      if(sid!=NULL){
474        json_object *jsStr=json_object_new_string(sid->value);
475        json_object_object_add(res,"process_identifier",jsStr);
476      }
477      // Save the Execute request on disk
478      map* tmpPath=getMapFromMaps(conf,"main","tmpPath");
479      map* req=getMapFromMaps(conf,"renv","xrequest");
480      sid=getMapFromMaps(conf,"lenv","usid");
481      char* executePath=(char*)malloc((strlen(tmpPath->value)+strlen(sid->value)+14)*sizeof(char));
482      sprintf(executePath,"%s/execute_%s.xml",tmpPath->value,sid->value);
483      FILE* saveExecute=fopen(executePath,"wb");
484      fwrite(req->value,1,strlen(req->value)*sizeof(char),saveExecute);
485      fflush(saveExecute);
486      fclose(saveExecute);
487      setMapInMaps(conf,"lenv","execute_file",executePath);
488      free(executePath);
489      break;
490    }
491     
492    case 1: {
493      // Update the execute request stored on disk at step 0,0 to modify the references used.
494      if(state==1){
495        maps* curs=inputs;
496        xmlInitParser();
497        map* xmlPath=getMapFromMaps(conf,"lenv","execute_file");
498        while(curs!=NULL){
499          map* length=getMap(curs->content,"length");
500          map* useMS=getMap(curs->content,"useMapserver");
501          if(length==NULL){
502            addToMap(curs->content,"length","1");
503            length=getMap(curs->content,"length");
504          }
505          int len=atoi(length->value);
506          for(int ii=0;ii<len;ii++){
507            if(getMapArray(curs->content,"byValue",ii)!=NULL && getMapArray(curs->content,"mimeType",ii)!=NULL && useMS!=NULL && strncasecmp(useMS->value,"true",4)==0){
508              map* tmpMap=getMapArray(curs->content,"value",ii);
509              char tmpStr[100];
510              sprintf(tmpStr,"%ld",strlen(tmpMap->value));
511              setMapArray(curs->content,"size",ii,tmpStr);
512              tmpMap=getMapArray(curs->content,"mimeType",ii);
513              setMapArray(curs->content,"fmimeType",ii,tmpMap->value);
514              tmpMap=getMapArray(curs->content,"cache_file",ii);
515              setMapArray(curs->content,"generated_file",ii,tmpMap->value);
516              setMapArray(curs->content,"storage",ii,tmpMap->value);
517              setReferenceUrl(conf,curs);
518              addIntToMap(curs->content,"published_id",ii+1);
519              const char *params[7];
520              int xmlLoadExtDtdDefaultValue;
521              int hasFile=-1;
522              map* xslPath=getMapFromMaps(conf,"callback","template");
523              map* filePath=getMapArray(curs->content,"ref_wfs_link",ii);
524              if(filePath==NULL)
525                filePath=getMap(curs->content,"ref_wcs_link");
526              char* inputName=curs->name;
527              if(xslPath==NULL || xmlPath==NULL || filePath==NULL)
528                break;
529              char *tmpParam=(char*)malloc((strlen(curs->name)+11)*sizeof(char));
530              char *tmpParam1=(char*)malloc((strlen(filePath->value)+11)*sizeof(char));
531              char tmpParam2[16];
532              sprintf(tmpParam2,"string(\"%d\")",ii);
533              setMapArray(curs->content,"href",ii,filePath->value);
534              setMapArray(curs->content,"xlink:href",ii,filePath->value);
535              tmpMap=getMapArray(curs->content,"cache_url",ii);
536              if(tmpMap!=NULL)
537                setMapArray(curs->content,"xlink:href",ii,tmpMap->value);
538              else
539                setMapArray(curs->content,"xlink:href",ii,filePath->value);
540              sprintf(tmpParam,"string(\"%s\")",curs->name);
541              sprintf(tmpParam1,"string(\"%s\")",filePath->value);
542              sprintf(tmpParam2,"string(\"%d\")",ii);
543              params[0]="attr";
544              params[1]=tmpParam;
545              params[2]="value";
546              params[3]=tmpParam1;//filePath->value;
547              params[4]="cnt";
548              params[5]=tmpParam2;
549              params[6]=NULL;
550              fprintf(stderr, "## XSLT PARAMETERS ATTR: %s VALUE: %s INDEX: %s\n",
551                      tmpParam,tmpParam1,tmpParam2);
552              fflush(stderr);
553              xmlSubstituteEntitiesDefault(1);
554              xmlLoadExtDtdDefaultValue = 0;
555              xsltStylesheetPtr cur = NULL;
556              xmlDocPtr doc, res;
557              cur = xsltParseStylesheetFile(BAD_CAST xslPath->value);
558              doc = xmlParseFile(xmlPath->value);
559              fflush(stderr);
560              res = xsltApplyStylesheet(cur, doc, params);
561              xmlChar *xmlbuff;
562              int buffersize;
563              xmlDocDumpFormatMemory(res, &xmlbuff, &buffersize, 1);
564              // Store the executeRequest in file again
565              free(tmpParam);
566              free(tmpParam1);
567              fprintf(stderr," # Request / XSLT: %s\n",xmlbuff);
568              fflush(stderr);
569              FILE* saveExecute=fopen(xmlPath->value,"wb");
570              if(saveExecute!=NULL){
571                fwrite(xmlbuff,1,buffersize,saveExecute);
572                fflush(saveExecute);
573                fclose(saveExecute);
574              }
575              xmlFree(xmlbuff);
576              xmlFreeDoc(doc);
577              xsltFreeStylesheet(cur);
578            }
579          }
580          addIntToMap(curs->content,"published_id",0);
581          curs=curs->next;
582        }
583        xmlCleanupParser();
584        FILE* f0=fopen(xmlPath->value,"rb");
585        if(f0!=NULL){
586          long flen;
587          char *fcontent;
588          fseek (f0, 0, SEEK_END);
589          flen = ftell (f0);
590          fseek (f0, 0, SEEK_SET);
591          fcontent = (char *) malloc ((flen + 1) * sizeof (char));
592          fread(fcontent,flen,1,f0);
593          fcontent[flen]=0;
594          fclose(f0);
595          map *schema=getMapFromMaps(conf,"database","schema");
596          map* sid=getMapFromMaps(conf,"lenv","usid");
597          char *req=(char*)malloc((flen+strlen(schema->value)+strlen(sid->value)+66)*sizeof(char));
598          sprintf(req,"UPDATE %s.services set request_execute_content=$$%s$$ WHERE uuid=$$%s$$",schema->value,fcontent,sid->value);
599#ifdef RELY_ON_DB
600          execSql(conf,1,req);
601#endif
602          free(fcontent);
603          free(req);
604        }
605      }
606
607      // Fetching data inputs
608      maps* curs=inputs;
609      dumpMaps(curs);
610      const char *keys[11][2]={
611        {
612          "xlink:href",
613          "ref_download_link"
614        },
615        {
616          "cache_file",
617          "cachefile"
618        },
619        {
620          "fmimeType",
621          "mimetype"
622        },
623        {
624          "size",
625          "size"
626        },
627        {
628          "ref_wms_link",
629          "ref_wms_link"
630        },
631        {
632          "ref_wfs_link",
633          "ref_wfs_link"
634        },
635        {
636          "ref_wcs_link",
637          "ref_wcs_link"
638        },
639        {
640          "ref_wcs_link",
641          "ref_wcs_link"
642        },
643        {
644          "ref_wcs_preview_link",
645          "ref_wcs_preview_link"
646        },
647        {
648          "geodatatype",
649          "datatype"
650        },
651        {
652          "wgs84_extent",
653          "boundingbox"
654        }       
655      };
656      json_object *res1=json_object_new_object();
657      while(curs!=NULL){
658        if(getMap(curs->content,"length")==NULL){
659          addToMap(curs->content,"length","1");
660        }
661        map* length=getMap(curs->content,"length");
662        int len=atoi(length->value);
663        json_object *res3;
664        int hasRef=-1;
665        for(int ii=0;ii<len;ii++){
666          map* tmpMap=getMapArray(curs->content,"cache_file",ii);
667          sid=getMapArray(curs->content,"ref_wms_link",ii);
668          json_object *res2=json_object_new_object();
669          if(tmpMap!=NULL){
670            if(sid==NULL){
671              setMapArray(curs->content,"generated_file",ii,tmpMap->value);
672              setMapArray(curs->content,"storage",ii,tmpMap->value);
673            }
674            struct stat buf;
675            char timeStr[ 100 ] = "";
676            if (stat(tmpMap->value, &buf)==0){
677              strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
678              json_object *jsStr=json_object_new_string(timeStr);
679              json_object_object_add(res2,"creation_date",jsStr);
680            }
681            tmpMap=getMapArray(curs->content,"fmimeType",ii);
682            if(tmpMap!=NULL){
683              setMapArray(curs->content,"mimeType",ii,tmpMap->value);
684            }
685            setReferenceUrl(conf,curs);
686          }else{         
687          }
688          addIntToMap(curs->content,"published_id",ii+1);
689          int i=0;
690          for(;i<11;i++){
691            sid=getMapArray(curs->content,keys[i][0],ii);
692            if(sid!=NULL){
693              json_object *jsStr=json_object_new_string(sid->value);
694              json_object_object_add(res2,keys[i][1],jsStr);
695              if(i==0){
696                hasRef=1;
697                json_object *jsStr1=json_object_new_string(getProvenance(conf,sid->value));
698                json_object_object_add(res2,"dataOrigin",jsStr1);
699              }
700            }
701          }
702          if(len>1){
703            if(ii==0)
704              res3=json_object_new_array();
705            json_object_array_add(res3,res2);
706          }else
707            res3=res2;
708        }
709        if(hasRef<0)
710          json_object_put(res3);
711        else{
712          json_object_object_add(res1,curs->name,json_object_get(res3));
713          json_object_put(res3);
714        }
715        addIntToMap(curs->content,"published_id",0);
716        curs=curs->next;
717      }
718      json_object_object_add(res,"inputs",res1);
719      break;
720    }
721     
722    case 2: {
723      // Uploading data input to cluster
724      maps* in=getMaps(conf,"uploadQueue");
725      if(in!=NULL){
726        maps* curs=in;
727        map* length=getMapFromMaps(in,"uploadQueue","length");
728        if(length!=NULL){
729          json_object *res1=json_object_new_object();
730          int limit=atoi(length->value);
731          int i=0;
732          maps* uploadQueue=getMaps(in,"uploadQueue");
733          map* tmp=uploadQueue->content;
734          for(;i<limit;i++){
735            map* tmp0=getMapArray(tmp,"input",i);
736            map* tmp1=getMapArray(tmp,"localPath",i);
737            map* tmp2=getMapArray(tmp,"targetPath",i);
738            if(tmp0!=NULL && tmp1!=NULL && tmp2!=NULL){
739              json_object *res2=json_object_new_object();
740              json_object *jsStr=json_object_new_string(tmp1->value);
741              json_object_object_add(res2,"local_path",jsStr);
742              jsStr=json_object_new_string(tmp2->value);
743              json_object_object_add(res2,"target_path",jsStr);
744              json_object *res4=NULL;
745              if(json_object_object_get_ex(res1,tmp0->value,&res4)!=FALSE){
746                if(json_object_is_type(res4,json_type_null)){
747                  json_object_object_add(res1,tmp0->value,res2);
748                }else{
749                  if(json_object_is_type(res4,json_type_object) && !json_object_is_type(res4, json_type_array)){
750                    json_object *res3=json_object_new_array();
751                    json_object_array_add(res3,json_object_get(res4));
752                    json_object_array_add(res3,res2);
753                    json_object_object_del(res1,tmp0->value);
754                    json_object_object_add(res1,tmp0->value,res3);
755                  }else
756                    json_object_array_add(res4,res2);
757                }
758              }
759            }
760          }
761          json_object_object_add(res,"inputs",res1);
762        }
763      }
764      break;
765    }
766     
767    case 3: {
768      // Generating job script
769      sid=getMapFromMaps(conf,"lenv","local_script");
770      if(sid!=NULL){
771        json_object *jsStr=json_object_new_string(sid->value);
772        json_object_object_add(res,"script",jsStr);
773      }
774      break;
775    }
776     
777    case 4: {
778      // Submitting job to cluster
779      sid=getMapFromMaps(conf,"lenv","remote_script");
780      if(sid!=NULL){
781        json_object *jsStr=json_object_new_string(sid->value);
782        json_object_object_add(res,"script",jsStr);
783      }
784      break;
785    }
786     
787    case 5: {
788      // Downloading process outputs from cluster
789      maps* curs=outputs;
790      dumpMaps(curs);
791      const char *keys[10][2]={
792        {
793          "Reference",
794          "ref"
795        },
796        {
797          "generated_file",
798          "cachefile"
799        },
800        {
801          "mimeType",
802          "mimetype"
803        },
804        {
805          "size",
806          "size"
807        },
808        {
809          "geodatatype",
810          "datatype"
811        },
812        {
813          "wgs84_extent",
814          "boundingbox"
815        },
816        {
817          "ref_wms_link",
818          "ref_wms_link"
819        },
820        {
821          "ref_wcs_link",
822          "ref_wcs_link"
823        },
824        {
825          "ref_wcs_preview_link",
826          "ref_wcs_preview_link"
827        },
828        {
829          "ref_wfs_link",
830          "ref_wfs_link"
831        }       
832      };
833      const char* specifics[5][2]={
834        {
835          "download_link",
836          "ref_download_link"
837        },
838        {
839          "wms_link",
840          "ref_wms_link"
841        },
842        {
843          "wfs_link",
844          "ref_wfs_link"
845        },
846        {
847          "wcs_link",
848          "ref_wcs_link"
849        },
850        {
851          "wcs_link",
852          "ref_wcs_preview_link"
853        }
854      };
855      json_object *res1=json_object_new_object();
856      while(curs!=NULL){       
857        json_object *res2=json_object_new_object();
858        int i=0;
859        int hasRef=-1;
860        for(;i<10;i++){
861          sid=getMap(curs->content,keys[i][0]);
862          if(sid!=NULL){
863            json_object *jsStr=json_object_new_string(sid->value);
864            json_object_object_add(res2,keys[i][1],jsStr);
865            if(i==0){
866              hasRef=1;
867              json_object_object_add(res2,"ref_download_link",jsStr);
868            }
869            if(i==1){
870              struct stat buf;
871              char timeStr[ 100 ] = "";
872              if (stat(sid->value, &buf)==0){
873                strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
874                json_object *jsStr=json_object_new_string(timeStr);
875                json_object_object_add(res2,"creation_date",jsStr);
876              }
877            }
878          }
879        }
880        if(hasRef>0)
881          json_object_object_add(res1,curs->name,res2);
882        else{
883          maps* curs0=curs->child;
884          int i=0;
885          int bypass=-1;
886          for(i=0;i<5;i++){
887            maps* specificMaps;
888            if((specificMaps=getMaps(curs0,specifics[i][0]))!=NULL){
889              int hasRef0=-1;
890              int i0=0;
891              for(;i0<6;i0++){
892                sid=getMap(specificMaps->content,keys[i0][0]);
893                if(sid!=NULL){
894                  json_object *jsStr=json_object_new_string(sid->value);
895                  if(i0==0){
896                    json_object_object_add(res2,specifics[i][1],jsStr);
897                  }
898                  else
899                    json_object_object_add(res2,keys[i0][1],jsStr);
900                  hasRef0=1;
901                  bypass=1;
902                  if(i==1){
903                    struct stat buf;
904                    char timeStr[ 100 ] = "";
905                    if (stat(sid->value, &buf)==0){
906                      strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
907                      json_object *jsStr=json_object_new_string(timeStr);
908                      json_object_object_add(res2,"creation_date",jsStr);
909                    }
910                  }
911                }
912              }       
913            }
914          }
915          if(bypass<0)
916            while(curs0!=NULL){
917              json_object *res3=json_object_new_object();
918              int i0=0;
919              int hasRef0=-1;
920              for(;i0<10;i0++){
921                sid=getMap(curs0->content,keys[i0][0]);
922                if(sid!=NULL){
923                  json_object *jsStr=json_object_new_string(sid->value);
924                  json_object_object_add(res3,keys[i0][1],jsStr);
925                  hasRef0=1;
926                }
927              }
928              if(hasRef0<0)
929                json_object_put(res3);
930              else
931                json_object_object_add(res2,curs0->name,res3);
932              curs0=curs0->next;
933            }
934          json_object_object_add(res1,curs->name,res2);
935        }
936        curs=curs->next;
937      }
938      json_object_object_add(res,"outputs",res1);
939      break;
940    }
941     
942    case 6: {
943      // Finalize HPC
944      const char *keys[6][2]={
945        {
946          //"SubmitTime",
947          "Submit",
948          "hpc_submission_date"
949        },
950        {
951          "JobId",
952          "hpc_job_identifier"
953        },
954        {
955          "JobName",
956          "hpc_job_name"
957        },
958        {
959          //"StartTime",
960          "Start",
961          "hpc_start_date"
962        },
963        {
964          //"EndTime",
965          "End",
966          "hpc_end_date"
967        },
968        {
969          //"JobState",
970          "State",
971          "hpc_status"
972        }       
973      };
974      int i=0;
975      if(getMaps(conf,"henv")!=NULL){
976        for(i=0;i<6;i++){
977          sid=getMapFromMaps(conf,"henv",keys[i][0]);
978          if(sid!=NULL){
979            json_object *jsStr=json_object_new_string(sid->value);
980            json_object_object_add(res,keys[i][1],jsStr);
981          }
982        }
983      }
984      if((sid=getMapFromMaps(conf,"henv","billing_nb_cpu"))!=NULL){
985        json_object *jsStr=json_object_new_string(sid->value);
986        json_object_object_add(res,"hpc_cpu_usage",jsStr);
987      }else{
988        json_object *jsStr=json_object_new_string("1");
989        json_object_object_add(res,"hpc_cpu_usage",jsStr);
990      }
991      json_object *jsStr=json_object_new_string("succeeded");
992      json_object_object_add(res,"wps_status",jsStr);
993      break;
994    }
995     
996    case 7: {
997      // Error or Dismiss
998      sid=getMapFromMaps(conf,"lenv","message");
999      if(sid!=NULL){
1000        json_object *jsStr=json_object_new_string(sid->value);
1001        json_object_object_add(res,"message",jsStr);
1002      }
1003      json_object *jsStr;
1004      if(state==1)
1005        jsStr=json_object_new_string("dismissed");
1006      else
1007        jsStr=json_object_new_string("failed");
1008      json_object_object_add(res,"wps_status",jsStr);
1009      break;
1010    }
1011    others: {
1012        break;
1013      }
1014    }
1015
1016    if(local_arguments==NULL)
1017      local_arguments=(local_params**)malloc(sizeof(local_params*));
1018    else
1019      local_arguments=(local_params**)realloc(local_arguments,(nbThreads+1)*sizeof(local_params*));
1020    local_arguments[nbThreads]=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int))); 
1021    local_arguments[nbThreads]->conf=conf;
1022    local_arguments[nbThreads]->url=url;
1023    local_arguments[nbThreads]->res=res;
1024    local_arguments[nbThreads]->step=step;
1025    local_arguments[nbThreads]->state=state;
1026    if(myThreads==NULL)
1027      myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t));
1028    else
1029      myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t));
1030    if(pthread_create(&myThreads[nbThreads], NULL, _invokeCallback, (void*)local_arguments[nbThreads])==-1){
1031      setMapInMaps(conf,"lenv","message",_("Unable to create a new thread"));
1032      return false;
1033    }
1034    nbThreads++;
1035    return true;
1036  }
1037
1038  /**
1039   * Wait for the threads to end then, clean used memory.
1040   */
1041  void cleanupCallbackThreads(){
1042    while( isOngoing>0 ){
1043      zSleep(100);
1044    }
1045    int i=0;
1046    for(i=0;i<nbThreads;i++){
1047      pthread_join(myThreads[i],NULL);
1048      free(local_arguments[i]);
1049    }
1050    free(local_arguments);
1051    free(myThreads);
1052  }
1053
1054#ifdef __cplusplus
1055}
1056#endif
Note: See TracBrowser for help on using the repository browser.

Search

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png