source: trunk/zoo-project/zoo-kernel/service_callback.c @ 959

Last change on this file since 959 was 955, checked in by djay, 5 years ago

Fix issue when RELY_ON_DB is on and data is published. Ensure to use WPS 3 only when required. Set wmfs_link, wfs_link or wcs_link only when found in the metadata.

  • Property svn:keywords set to Id
File size: 25.4 KB
Line 
1/*
2 * Author : Gérald FENOY
3 *
4 *  Copyright 2017-2019 GeoLabs SARL. All rights reserved.
5 *
6 * This work was supported by public funds received in the framework of GEOSUD,
7 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed
8 * by the French National Research Agency
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 */
28
29#include "service_json.h"
30#include "service_internal_ms.h"
31#include "sqlapi.h"
32#include <pthread.h>
33#include <libxml/tree.h>
34#include <libxml/parser.h>
35#include <libxml/xpath.h>
36#include <libxml/xpathInternals.h>
37
38#include <libxslt/xslt.h>
39#include <libxslt/xsltInternals.h>
40#include <libxslt/transform.h>
41#include <libxslt/xsltutils.h>
42
43#include "service_callback.h"
44
45#ifdef __cplusplus
46extern "C" {
47#endif
48
49  /**
50   * Parameter definition to be used for sending parameters to a thread.
51   */
52  typedef struct {
53    maps *conf;      //!< the main configuration file
54    map *url;        //!< the callback url maps
55    json_object *res;//!< the JSON object to post
56    int step;        //!< the current step [0,6]
57    int state;       //!< the current state [0,1]
58  } local_params;
59
60  /**
61   * Number of threads
62   */
63  int nbThreads=0;
64  /**
65   * Current step
66   */
67  int cStep=0;
68  /**
69   * Is there any ongoing HTTP request
70   */
71  int isOngoing=0;
72  /**
73   * Threads array
74   */
75  pthread_t* myThreads=NULL;
76  /**
77   * Steps array
78   */
79  bool steps[7][2]={
80    {false,false},
81    {false,false},
82    {false,false},
83    {false,false},
84    {false,false},
85    {false,false},
86    {false,false}
87  };
88  /**
89   * Arguments array to give to the _invokeCallback thread's function
90   */
91  local_params** local_arguments;
92 
93  /**
94   * Check if a service name is prohibited, meaning that the Kernel doesn't have
95   * to invoke the callback for this specific service.
96   *
97   * @param conf the main configuration file maps
98   * @param serviceName the serviceName
99   * @return a bool true if the service is prohibited, false in other case
100   */
101  bool isProhibited(maps* conf,const char* serviceName){
102    map* plist=getMapFromMaps(conf,"callback","prohibited");
103    if(plist!=NULL){
104      char *tmp=plist->value;
105      char *tmpS=strtok(tmp,",");
106      while(tmpS!=NULL){
107        if(strcmp(serviceName,tmpS)==0)
108          return true;
109        tmpS=strtok(NULL,",");
110      }
111    }
112    return false;
113  }
114
115  /**
116   * Practically invoke the callback, meaning sending the HTTP POST request.
117   *
118   * @param args local_params containing all the variables required
119   */
120  void* _invokeCallback(void* args){
121    local_params* arg=(local_params*)args;
122    HINTERNET hInternet,res1;
123    const struct tm *tm;
124    size_t len;
125    time_t now;
126    char *tmp1;
127    map *tmpStatus;
128    maps* tmpConf=createMaps("main");
129    tmpConf->content=createMap("memory","load");
130    hInternet=InternetOpen("ZooWPSClient\0",
131                           INTERNET_OPEN_TYPE_PRECONFIG,
132                           NULL,NULL, 0);
133    if(!CHECK_INET_HANDLE(hInternet)){
134      InternetCloseHandle (&hInternet);
135      return NULL;
136    }
137    char *URL=(char*)malloc((strlen(arg->url->value)+5)*sizeof(char));
138    sprintf(URL,"%s%d_%d/",arg->url->value,arg->step,arg->state);
139    const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN);
140    hInternet.waitingRequests[0] = zStrdup(URL);
141    free(URL);
142#ifdef CALLBACK_DEBUG
143    now = time ( NULL );
144    tm = localtime ( &now );
145    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
146    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
147    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS cStep %d %d\n",pthread_self(),__FILE__,__LINE__,cStep,isOngoing);
148    fprintf(stderr," * JSON: [%s] \n",jsonStr);
149    fprintf(stderr," * URL: %s/ \n\n",hInternet.waitingRequests[0]);
150    fprintf(stderr," * DATE: %s/ \n\n",tmp1);
151    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__);
152    free(tmp1);
153#endif
154    while( (arg->step!=7 || isOngoing>0) &&
155           ( cStep!=arg->step || (arg->state!=0 && steps[arg->step][0]==false) )
156           ){
157      zSleep(100);
158    }
159    isOngoing=1;
160#ifdef CALLBACK_DEBUG
161    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n\n",pthread_self(),__FILE__,__LINE__);
162    int i=0;
163    for(i=0;i<7;i++){
164      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
165    }
166#endif
167   
168    now = time ( NULL );
169    tm = localtime ( &now );
170   
171    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
172    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
173
174#ifdef CALLBACK_DEBUG   
175    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1);
176    fflush(stderr);
177#endif   
178    free(tmp1);
179    res1 = InternetOpenUrl (&hInternet,
180                            hInternet.waitingRequests[0], 
181                            (char*)jsonStr, strlen(jsonStr),
182                            INTERNET_FLAG_NO_CACHE_WRITE,
183                            0,tmpConf);
184    AddHeaderEntries(&hInternet,arg->conf);
185    //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1);x
186    processDownloads(&hInternet);
187    freeMaps(&tmpConf);
188    free(tmpConf);
189    now = time ( NULL );
190    tm = localtime ( &now );
191    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
192    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
193   
194#ifdef CALLBACK_DEBUG   
195    fprintf(stderr,"************************* From thread %d %s %d: REQUEST END (%s)\n\n",pthread_self(),__FILE__,__LINE__,tmp1);
196#endif
197    free(tmp1);
198    char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1)
199                                 * sizeof (char));
200    if (tmp == NULL)
201      {
202        setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory"));
203        setMapInMaps(arg->conf,"lenv","code","InternalError");
204        return NULL;
205      }
206    size_t bRead;
207    InternetReadFile (hInternet.ihandle[0],
208                      (LPVOID) tmp,
209                      hInternet.
210                      ihandle[0].nDataLen,
211                      &bRead);
212    tmp[hInternet.ihandle[0].nDataLen] = 0;
213    json_object_put(arg->res);
214    InternetCloseHandle(&hInternet);
215    isOngoing=0;
216    if(cStep==0 || cStep==6 || arg->state==1)
217      cStep=arg->step+1;
218#ifdef CALLBACK_DEBUG
219    now = time ( NULL );
220    tm = localtime ( &now );
221    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
222    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
223    fprintf(stderr,"************************* From thread %d %s %d: RESPONSE CONTENT (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1);
224    for(i=0;i<7;i++){
225      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
226    }
227    fprintf(stderr,"Result: \n%s\n\n",tmp);
228    fprintf(stderr,"************************* From thread %d %s %d\n\n",pthread_self(),__FILE__,__LINE__);
229    fflush(stderr);
230    free(tmp1);
231#endif
232    steps[arg->step][arg->state]=true;
233    free(tmp);
234#ifdef CALLBACK_DEBUG
235    fprintf(stderr,"************************* From thread %d %s %d: EXIT\n\n",pthread_self(),__FILE__,__LINE__);
236    fflush(stderr);
237#endif
238    pthread_exit(NULL);
239  }
240 
241  /**
242   * Invoke the callback in case there is a [callback] section containing a url parameter
243   *
244   * @param m the maps containing the main configuration file definitions
245   * @param inputs the inputs defined in the request (can be null if not yet initialized)
246   * @param inputs the outputs provided in the request (can be null if not yet initialized)
247   * @param step the step number, steps are defined as:
248   *  0: Analyze creation
249   *  1: Fetching Data Inputs
250   *  2: Uploading data inputs to cluster
251   *  3: Creating Job Script
252   *  4: Submitting Job to Cluster
253   *  5: Downloading processed output from cluster
254   *  6: Finalize
255   *  7: Dismiss or Error
256   * @param state 0 in case the step starts, 1 when it ends
257   * @return bool true in case of success, false in other cases
258   */
259  bool invokeCallback(maps* conf,maps* inputs,maps* outputs,int step,int state){
260    map* url=getMapFromMaps(conf,"callback","url");
261    if(url==NULL)
262      return false;
263     
264    maps* lenv=getMaps(conf,"lenv");
265    map* sname=getMap(lenv->content,"identifier");
266    if(sname!=NULL && isProhibited(conf,sname->value))
267      return false;
268     
269    json_object *res=json_object_new_object();
270
271    map* sid=getMapFromMaps(conf,"lenv","usid");
272    if(sid!=NULL){
273      json_object *jsStr=json_object_new_string(sid->value);
274      json_object_object_add(res,"jobid",jsStr);
275    }
276    const struct tm *tm;
277    size_t len;
278    time_t now;
279    char *tmp1;
280    map *tmpStatus;
281 
282    now = time ( NULL );
283    tm = localtime ( &now );
284
285    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
286    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%H:%M:%SZ", tm );
287    json_object *jsStr0=json_object_new_string(tmp1);
288    json_object_object_add(res,"datetime",jsStr0);
289    free(tmp1);
290   
291    switch(step){
292    case 0: {
293      // Create a new analyze
294      maps* lenv=getMaps(conf,"lenv");
295      sid=getMapFromMaps(conf,"renv","xrequest");
296      if(sid!=NULL){
297        json_object *jsStr=json_object_new_string(sid->value);
298        json_object_object_add(res,"request_execute_content",jsStr);
299      }
300      sid=getMapFromMaps(conf,"lenv","identifier");
301      if(sid!=NULL){
302        json_object *jsStr=json_object_new_string(sid->value);
303        json_object_object_add(res,"process_identifier",jsStr);
304      }
305      // Save the Execute request on disk
306      map* tmpPath=getMapFromMaps(conf,"main","tmpPath");
307      map* req=getMapFromMaps(conf,"renv","xrequest");
308      sid=getMapFromMaps(conf,"lenv","usid");
309      char* executePath=(char*)malloc((strlen(tmpPath->value)+strlen(sid->value)+14)*sizeof(char));
310      sprintf(executePath,"%s/execute_%s.xml",tmpPath->value,sid->value);
311      FILE* saveExecute=fopen(executePath,"wb");
312      fwrite(req->value,1,strlen(req->value)*sizeof(char),saveExecute);
313      fflush(saveExecute);
314      fclose(saveExecute);
315      setMapInMaps(conf,"lenv","execute_file",executePath);
316      free(executePath);
317      break;
318    }
319     
320    case 1: {
321      // Update the execute request stored on disk at step 0,0 to modify the references used.
322      if(state==1){
323        maps* curs=inputs;
324        xmlInitParser();
325        map* xmlPath=getMapFromMaps(conf,"lenv","execute_file");
326        while(curs!=NULL){
327          map* length=getMap(curs->content,"length");
328          map* useMS=getMap(curs->content,"useMapserver");
329          if(length==NULL){
330            addToMap(curs->content,"length","1");
331            length=getMap(curs->content,"length");
332          }
333          int len=atoi(length->value);
334          for(int ii=0;ii<len;ii++){
335            if(getMapArray(curs->content,"byValue",ii)!=NULL && getMapArray(curs->content,"mimeType",ii)!=NULL && useMS!=NULL && strncasecmp(useMS->value,"true",4)==0){
336              map* tmpMap=getMapArray(curs->content,"value",ii);
337              char tmpStr[100];
338              sprintf(tmpStr,"%d",strlen(tmpMap->value));
339              setMapArray(curs->content,"size",ii,tmpStr);
340              tmpMap=getMapArray(curs->content,"mimeType",ii);
341              setMapArray(curs->content,"fmimeType",ii,tmpMap->value);
342              tmpMap=getMapArray(curs->content,"cache_file",ii);
343              setMapArray(curs->content,"generated_file",ii,tmpMap->value);
344              setMapArray(curs->content,"storage",ii,tmpMap->value);
345              setReferenceUrl(conf,curs);
346              addIntToMap(curs->content,"published_id",ii+1);
347              const char *params[7];
348              int xmlLoadExtDtdDefaultValue;
349              int hasFile=-1;
350              map* xslPath=getMapFromMaps(conf,"callback","template");
351              map* filePath=getMapArray(curs->content,"ref_wfs_link",ii);
352              if(filePath==NULL)
353                filePath=getMap(curs->content,"ref_wcs_link");
354              char* inputName=curs->name;
355              if(xslPath==NULL || xmlPath==NULL || filePath==NULL)
356                break;
357              char *tmpParam=(char*)malloc((strlen(curs->name)+11)*sizeof(char));
358              char *tmpParam1=(char*)malloc((strlen(filePath->value)+11)*sizeof(char));
359              char tmpParam2[16];
360              sprintf(tmpParam2,"string(\"%d\")",ii);
361              setMapArray(curs->content,"href",ii,filePath->value);
362              setMapArray(curs->content,"xlink:href",ii,filePath->value);
363              tmpMap=getMapArray(curs->content,"cache_url",ii);
364              if(tmpMap!=NULL)
365                setMapArray(curs->content,"xlink:href",ii,tmpMap->value);
366              else
367                setMapArray(curs->content,"xlink:href",ii,filePath->value);
368              sprintf(tmpParam,"string(\"%s\")",curs->name);
369              sprintf(tmpParam1,"string(\"%s\")",filePath->value);
370              sprintf(tmpParam2,"string(\"%d\")",ii);
371              params[0]="attr";
372              params[1]=tmpParam;
373              params[2]="value";
374              params[3]=tmpParam1;//filePath->value;
375              params[4]="cnt";
376              params[5]=tmpParam2;
377              params[6]=NULL;
378              fprintf(stderr, "## XSLT PARAMETERS ATTR: %s VALUE: %s INDEX: %s\n",
379                      tmpParam,tmpParam1,tmpParam2);
380              fflush(stderr);
381              xmlSubstituteEntitiesDefault(1);
382              xmlLoadExtDtdDefaultValue = 0;
383              xsltStylesheetPtr cur = NULL;
384              xmlDocPtr doc, res;
385              cur = xsltParseStylesheetFile(BAD_CAST xslPath->value);
386              doc = xmlParseFile(xmlPath->value);
387              fflush(stderr);
388              res = xsltApplyStylesheet(cur, doc, params);
389              xmlChar *xmlbuff;
390              int buffersize;
391              xmlDocDumpFormatMemory(res, &xmlbuff, &buffersize, 1);
392              // Store the executeRequest in file again
393              free(tmpParam);
394              free(tmpParam1);
395              fprintf(stderr," # Request / XSLT: %s\n",xmlbuff);
396              fflush(stderr);
397              FILE* saveExecute=fopen(xmlPath->value,"wb");
398              if(saveExecute!=NULL){
399                fwrite(xmlbuff,1,buffersize,saveExecute);
400                fflush(saveExecute);
401                fclose(saveExecute);
402              }
403              xmlFree(xmlbuff);
404              xmlFreeDoc(doc);
405              xsltFreeStylesheet(cur);
406            }
407          }
408          addIntToMap(curs->content,"published_id",0);
409          curs=curs->next;
410        }
411        xmlCleanupParser();
412        FILE* f0=fopen(xmlPath->value,"rb");
413        if(f0!=NULL){
414          long flen;
415          char *fcontent;
416          fseek (f0, 0, SEEK_END);
417          flen = ftell (f0);
418          fseek (f0, 0, SEEK_SET);
419          fcontent = (char *) malloc ((flen + 1) * sizeof (char));
420          fread(fcontent,flen,1,f0);
421          fcontent[flen]=0;
422          fclose(f0);
423          map *schema=getMapFromMaps(conf,"database","schema");
424          map* sid=getMapFromMaps(conf,"lenv","usid");
425          char *req=(char*)malloc((flen+strlen(schema->value)+strlen(sid->value)+66)*sizeof(char));
426          sprintf(req,"UPDATE %s.services set request_execute_content=$$%s$$ WHERE uuid=$$%s$$",schema->value,fcontent,sid->value);
427#ifdef RELY_ON_DB
428          execSql(conf,1,req);
429#endif
430          free(fcontent);
431          free(req);
432        }
433      }
434
435      // Fetching data inputs
436      maps* curs=inputs;
437      dumpMaps(curs);
438      const char *keys[11][2]={
439        {
440          "xlink:href",
441          "ref_download_link"
442        },
443        {
444          "cache_file",
445          "cachefile"
446        },
447        {
448          "fmimeType",
449          "mimetype"
450        },
451        {
452          "size",
453          "size"
454        },
455        {
456          "ref_wms_link",
457          "ref_wms_link"
458        },
459        {
460          "ref_wfs_link",
461          "ref_wfs_link"
462        },
463        {
464          "ref_wcs_link",
465          "ref_wcs_link"
466        },
467        {
468          "ref_wcs_link",
469          "ref_wcs_link"
470        },
471        {
472          "ref_wcs_preview_link",
473          "ref_wcs_preview_link"
474        },
475        {
476          "geodatatype",
477          "datatype"
478        },
479        {
480          "wgs84_extent",
481          "boundingbox"
482        }       
483      };
484      json_object *res1=json_object_new_object();
485      while(curs!=NULL){
486        if(getMap(curs->content,"length")==NULL){
487          addToMap(curs->content,"length","1");
488        }
489        map* length=getMap(curs->content,"length");
490        int len=atoi(length->value);
491        json_object *res3;
492        int hasRef=-1;
493        for(int ii=0;ii<len;ii++){
494          map* tmpMap=getMapArray(curs->content,"cache_file",ii);
495          sid=getMapArray(curs->content,"ref_wms_link",ii);
496          json_object *res2=json_object_new_object();
497          if(tmpMap!=NULL){
498            if(sid==NULL){
499              setMapArray(curs->content,"generated_file",ii,tmpMap->value);
500              setMapArray(curs->content,"storage",ii,tmpMap->value);
501            }
502            struct stat buf;
503            char timeStr[ 100 ] = "";
504            if (stat(tmpMap->value, &buf)==0){
505              strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
506              json_object *jsStr=json_object_new_string(timeStr);
507              json_object_object_add(res2,"creation_date",jsStr);
508            }
509            tmpMap=getMapArray(curs->content,"fmimeType",ii);
510            if(tmpMap!=NULL){
511              setMapArray(curs->content,"mimeType",ii,tmpMap->value);
512            }
513            setReferenceUrl(conf,curs);
514          }else{         
515          }
516          addIntToMap(curs->content,"published_id",ii+1);
517          int i=0;
518          for(;i<11;i++){
519            sid=getMapArray(curs->content,keys[i][0],ii);
520            if(sid!=NULL){
521              json_object *jsStr=json_object_new_string(sid->value);
522              json_object_object_add(res2,keys[i][1],jsStr);
523              if(i==0){
524                hasRef=1;
525                json_object *jsStr1=json_object_new_string(getProvenance(conf,sid->value));
526                json_object_object_add(res2,"dataOrigin",jsStr1);
527              }
528            }
529          }
530          if(len>1){
531            if(ii==0)
532              res3=json_object_new_array();
533            json_object_array_add(res3,res2);
534          }else
535            res3=res2;
536        }
537        if(hasRef<0)
538          json_object_put(res3);
539        else{
540          json_object_object_add(res1,curs->name,json_object_get(res3));
541          json_object_put(res3);
542        }
543        addIntToMap(curs->content,"published_id",0);
544        curs=curs->next;
545      }
546      json_object_object_add(res,"inputs",res1);
547      break;
548    }
549     
550    case 2: {
551      // Uploading data input to cluster
552      maps* in=getMaps(conf,"uploadQueue");
553      if(in!=NULL){
554        maps* curs=in;
555        map* length=getMapFromMaps(in,"uploadQueue","length");
556        if(length!=NULL){
557          json_object *res1=json_object_new_object();
558          int limit=atoi(length->value);
559          int i=0;
560          maps* uploadQueue=getMaps(in,"uploadQueue");
561          map* tmp=uploadQueue->content;
562          for(;i<limit;i++){
563            map* tmp0=getMapArray(tmp,"input",i);
564            map* tmp1=getMapArray(tmp,"localPath",i);
565            map* tmp2=getMapArray(tmp,"targetPath",i);
566            if(tmp0!=NULL && tmp1!=NULL && tmp2!=NULL){
567              json_object *res2=json_object_new_object();
568              json_object *jsStr=json_object_new_string(tmp1->value);
569              json_object_object_add(res2,"local_path",jsStr);
570              jsStr=json_object_new_string(tmp2->value);
571              json_object_object_add(res2,"target_path",jsStr);
572              json_object *res4=json_object_object_get(res1,tmp0->value);
573              if(json_object_is_type(res4,json_type_null)){
574                json_object_object_add(res1,tmp0->value,res2);
575              }else{
576                if(json_object_is_type(res4,json_type_object) && !json_object_is_type(res4, json_type_array)){
577                  json_object *res3=json_object_new_array();
578                  json_object_array_add(res3,json_object_get(res4));
579                  json_object_array_add(res3,res2);
580                  json_object_object_del(res1,tmp0->value);
581                  json_object_object_add(res1,tmp0->value,res3);
582                }else
583                  json_object_array_add(res4,res2);
584              }
585            }
586          }
587          json_object_object_add(res,"inputs",res1);
588        }
589      }
590      break;
591    }
592     
593    case 3: {
594      // Generating job script
595      sid=getMapFromMaps(conf,"lenv","local_script");
596      if(sid!=NULL){
597        json_object *jsStr=json_object_new_string(sid->value);
598        json_object_object_add(res,"script",jsStr);
599      }
600      break;
601    }
602     
603    case 4: {
604      // Submitting job to cluster
605      sid=getMapFromMaps(conf,"lenv","remote_script");
606      if(sid!=NULL){
607        json_object *jsStr=json_object_new_string(sid->value);
608        json_object_object_add(res,"script",jsStr);
609      }
610      break;
611    }
612     
613    case 5: {
614      // Downloading process outputs from cluster
615      maps* curs=outputs;
616      dumpMaps(curs);
617      const char *keys[10][2]={
618        {
619          "Reference",
620          "ref"
621        },
622        {
623          "generated_file",
624          "cachefile"
625        },
626        {
627          "mimeType",
628          "mimetype"
629        },
630        {
631          "size",
632          "size"
633        },
634        {
635          "geodatatype",
636          "datatype"
637        },
638        {
639          "wgs84_extent",
640          "boundingbox"
641        },
642        {
643          "ref_wms_link",
644          "ref_wms_link"
645        },
646        {
647          "ref_wcs_link",
648          "ref_wcs_link"
649        },
650        {
651          "ref_wcs_preview_link",
652          "ref_wcs_preview_link"
653        },
654        {
655          "ref_wfs_link",
656          "ref_wfs_link"
657        }       
658      };
659      const char* specifics[5][2]={
660        {
661          "download_link",
662          "ref_download_link"
663        },
664        {
665          "wms_link",
666          "ref_wms_link"
667        },
668        {
669          "wfs_link",
670          "ref_wfs_link"
671        },
672        {
673          "wcs_link",
674          "ref_wcs_link"
675        },
676        {
677          "wcs_link",
678          "ref_wcs_preview_link"
679        }
680      };
681      json_object *res1=json_object_new_object();
682      while(curs!=NULL){       
683        json_object *res2=json_object_new_object();
684        int i=0;
685        int hasRef=-1;
686        for(;i<10;i++){
687          sid=getMap(curs->content,keys[i][0]);
688          if(sid!=NULL){
689            json_object *jsStr=json_object_new_string(sid->value);
690            json_object_object_add(res2,keys[i][1],jsStr);
691            if(i==0){
692              hasRef=1;
693              json_object_object_add(res2,"ref_download_link",jsStr);
694            }
695            if(i==1){
696              struct stat buf;
697              char timeStr[ 100 ] = "";
698              if (stat(sid->value, &buf)==0){
699                strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
700                json_object *jsStr=json_object_new_string(timeStr);
701                json_object_object_add(res2,"creation_date",jsStr);
702              }
703            }
704          }
705        }
706        if(hasRef>0)
707          json_object_object_add(res1,curs->name,res2);
708        else{
709          maps* curs0=curs->child;
710          int i=0;
711          int bypass=-1;
712          for(i=0;i<5;i++){
713            maps* specificMaps;
714            if((specificMaps=getMaps(curs0,specifics[i][0]))!=NULL){
715              int hasRef0=-1;
716              int i0=0;
717              for(;i0<6;i0++){
718                sid=getMap(specificMaps->content,keys[i0][0]);
719                if(sid!=NULL){
720                  json_object *jsStr=json_object_new_string(sid->value);
721                  if(i0==0){
722                    json_object_object_add(res2,specifics[i][1],jsStr);
723                  }
724                  else
725                    json_object_object_add(res2,keys[i0][1],jsStr);
726                  hasRef0=1;
727                  bypass=1;
728                  if(i==1){
729                    struct stat buf;
730                    char timeStr[ 100 ] = "";
731                    if (stat(sid->value, &buf)==0){
732                      strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
733                      json_object *jsStr=json_object_new_string(timeStr);
734                      json_object_object_add(res2,"creation_date",jsStr);
735                    }
736                  }
737                }
738              }       
739            }
740          }
741          if(bypass<0)
742            while(curs0!=NULL){
743              json_object *res3=json_object_new_object();
744              int i0=0;
745              int hasRef0=-1;
746              for(;i0<10;i0++){
747                sid=getMap(curs0->content,keys[i0][0]);
748                if(sid!=NULL){
749                  json_object *jsStr=json_object_new_string(sid->value);
750                  json_object_object_add(res3,keys[i0][1],jsStr);
751                  hasRef0=1;
752                }
753              }
754              if(hasRef0<0)
755                json_object_put(res3);
756              else
757                json_object_object_add(res2,curs0->name,res3);
758              curs0=curs0->next;
759            }
760          json_object_object_add(res1,curs->name,res2);
761        }
762        curs=curs->next;
763      }
764      json_object_object_add(res,"outputs",res1);
765      break;
766    }
767     
768    case 6: {
769      // Finalize HPC
770      const char *keys[6][2]={
771        {
772          //"SubmitTime",
773          "Submit",
774          "hpc_submission_date"
775        },
776        {
777          "JobId",
778          "hpc_job_identifier"
779        },
780        {
781          "JobName",
782          "hpc_job_name"
783        },
784        {
785          //"StartTime",
786          "Start",
787          "hpc_start_date"
788        },
789        {
790          //"EndTime",
791          "End",
792          "hpc_end_date"
793        },
794        {
795          //"JobState",
796          "State",
797          "hpc_status"
798        }       
799      };
800      int i=0;
801      if(getMaps(conf,"henv")!=NULL){
802        for(i=0;i<6;i++){
803          sid=getMapFromMaps(conf,"henv",keys[i][0]);
804          if(sid!=NULL){
805            json_object *jsStr=json_object_new_string(sid->value);
806            json_object_object_add(res,keys[i][1],jsStr);
807          }
808        }
809      }
810      if((sid=getMapFromMaps(conf,"henv","billing_nb_cpu"))!=NULL){
811        json_object *jsStr=json_object_new_string(sid->value);
812        json_object_object_add(res,"hpc_cpu_usage",jsStr);
813      }else{
814        json_object *jsStr=json_object_new_string("1");
815        json_object_object_add(res,"hpc_cpu_usage",jsStr);
816      }
817      json_object *jsStr=json_object_new_string("succeeded");
818      json_object_object_add(res,"wps_status",jsStr);
819      break;
820    }
821     
822    case 7: {
823      // Error or Dismiss
824      sid=getMapFromMaps(conf,"lenv","message");
825      if(sid!=NULL){
826        json_object *jsStr=json_object_new_string(sid->value);
827        json_object_object_add(res,"message",jsStr);
828      }
829      json_object *jsStr;
830      if(state==1)
831        jsStr=json_object_new_string("dismissed");
832      else
833        jsStr=json_object_new_string("failed");
834      json_object_object_add(res,"wps_status",jsStr);
835      break;
836    }
837    others: {
838        break;
839      }
840    }
841
842    if(local_arguments==NULL)
843      local_arguments=(local_params**)malloc(sizeof(local_params*));
844    else
845      local_arguments=(local_params**)realloc(local_arguments,(nbThreads+1)*sizeof(local_params*));
846    local_arguments[nbThreads]=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int))); 
847    local_arguments[nbThreads]->conf=conf;
848    local_arguments[nbThreads]->url=url;
849    local_arguments[nbThreads]->res=res;
850    local_arguments[nbThreads]->step=step;
851    local_arguments[nbThreads]->state=state;
852    if(myThreads==NULL)
853      myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t));
854    else
855      myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t));
856    if(pthread_create(&myThreads[nbThreads], NULL, _invokeCallback, (void*)local_arguments[nbThreads])==-1){
857      setMapInMaps(conf,"lenv","message",_("Unable to create a new thread"));
858      return false;
859    }
860    nbThreads++;
861    return true;
862  }
863
864  /**
865   * Wait for the threads to end then, clean used memory.
866   */
867  void cleanupCallbackThreads(){
868    int i=0;
869    for(i=0;i<nbThreads;i++){
870      pthread_join(myThreads[i],NULL);
871      free(local_arguments[i]);
872    }
873    free(local_arguments);
874    free(myThreads);
875  }
876
877#ifdef __cplusplus
878}
879#endif
Note: See TracBrowser for help on using the repository browser.

Search

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png