source: branches/prototype-v0/zoo-project/zoo-kernel/service_callback.c @ 854

Last change on this file since 854 was 854, checked in by djay, 6 years ago

HPC support update. Add inputs for create options in Gdal_Dem.

  • Property svn:keywords set to Id
File size: 24.0 KB
Line 
1/*
2 * Author : Gérald FENOY
3 *
4 *  Copyright 2017 GeoLabs SARL. All rights reserved.
5 *
6 * This work was supported by public funds received in the framework of GEOSUD,
7 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed
8 * by the French National Research Agency
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 */
28
29#include "service_callback.h"
30#include "service_json.h"
31#include "service_internal_ms.h"
32#include "sqlapi.h"
33#include <pthread.h>
34#include <libxml/tree.h>
35#include <libxml/parser.h>
36#include <libxml/xpath.h>
37#include <libxml/xpathInternals.h>
38
39#include <libxslt/xslt.h>
40#include <libxslt/xsltInternals.h>
41#include <libxslt/transform.h>
42#include <libxslt/xsltutils.h>
43
44#ifdef __cplusplus
45extern "C" {
46#endif
47
48  /**
49   * Parameter definition to be used for sending parameters to a thread.
50   */
51  typedef struct {
52    maps *conf;      //!< the main configuration file
53    map *url;        //!< the callback url maps
54    json_object *res;//!< the JSON object to post
55    int step;        //!< the current step [0,6]
56    int state;       //!< the current state [0,1]
57  } local_params;
58
59  /**
60   * Number of threads
61   */
62  int nbThreads=0;
63  /**
64   * Current step
65   */
66  int cStep=0;
67  /**
68   * Is there any ongoing HTTP request
69   */
70  int isOngoing=0;
71  /**
72   * Threads array
73   */
74  pthread_t* myThreads=NULL;
75  /**
76   * Steps array
77   */
78  bool steps[7][2]={
79    {false,false},
80    {false,false},
81    {false,false},
82    {false,false},
83    {false,false},
84    {false,false},
85    {false,false}
86  };
87  /**
88   * Arguments array to give to the _invokeCallback thread's function
89   */
90  local_params** local_arguments;
91 
92  /**
93   * Check if a service name is prohibited, meaning that the Kernel doesn't have
94   * to invoke the callback for this specific service.
95   *
96   * @param conf the main configuration file maps
97   * @param serviceName the serviceName
98   * @return a bool true if the service is prohibited, false in other case
99   */
100  bool isProhibited(maps* conf,const char* serviceName){
101    map* plist=getMapFromMaps(conf,"callback","prohibited");
102    if(plist!=NULL){
103      char *tmp=plist->value;
104      char *tmpS=strtok(tmp,",");
105      while(tmpS!=NULL){
106        if(strcmp(serviceName,tmpS)==0)
107          return true;
108        tmpS=strtok(NULL,",");
109      }
110    }
111    return false;
112  }
113
114
115  /**
116   * Practically invoke the callback, meaning sending the HTTP POST request.
117   *
118   * @param args local_params containing all the variables required
119   */
120  void* _invokeCallback(void* args){
121    local_params* arg=(local_params*)args;
122    HINTERNET hInternet,res1;
123    hInternet=InternetOpen("ZooWPSClient\0",
124                           INTERNET_OPEN_TYPE_PRECONFIG,
125                           NULL,NULL, 0);
126    if(!CHECK_INET_HANDLE(hInternet)){
127      InternetCloseHandle (&hInternet);
128      return false;
129    }
130    char *URL=(char*)malloc((strlen(arg->url->value)+5)*sizeof(char));
131    sprintf(URL,"%s%d_%d/",arg->url->value,arg->step,arg->state);
132    const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN);
133    hInternet.waitingRequests[0] = zStrdup(URL);
134    free(URL);
135#ifdef CALLBACK_DEBUG
136    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS cStep %d %d\n",pthread_self(),__FILE__,__LINE__,cStep,isOngoing);
137    fprintf(stderr," * JSON: [%s] \n",jsonStr);
138    fprintf(stderr," * URL: %s/ \n\n",hInternet.waitingRequests[0]);
139    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__);
140#endif
141    while( (arg->step!=7 || isOngoing>0) &&
142           ( cStep!=arg->step || (arg->state!=0 && steps[arg->step][0]==false) )
143           ){
144      struct timespec tv;
145      tv.tv_sec = 0;
146      tv.tv_nsec = (long) 5*1e+9;
147      nanosleep(&tv, &tv);
148      //sleep(1);
149    }
150    isOngoing=1;
151#ifdef CALLBACK_DEBUG
152    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n\n",pthread_self(),__FILE__,__LINE__);
153    int i=0;
154    for(i=0;i<7;i++){
155      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
156    }
157#endif
158    const struct tm *tm;
159    size_t len;
160    time_t now;
161    char *tmp1;
162    map *tmpStatus;
163   
164    now = time ( NULL );
165    tm = localtime ( &now );
166   
167    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
168    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
169
170#ifdef CALLBACK_DEBUG   
171    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1);
172    fflush(stderr);
173#endif   
174    free(tmp1);
175    res1 = InternetOpenUrl (&hInternet,
176                            hInternet.waitingRequests[0], 
177                            (char*)jsonStr, strlen(jsonStr),
178                            INTERNET_FLAG_NO_CACHE_WRITE,
179                            0);
180    AddHeaderEntries(&hInternet,arg->conf);
181    //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1);x
182    processDownloads(&hInternet);
183    now = time ( NULL );
184    tm = localtime ( &now );
185    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
186    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
187   
188#ifdef CALLBACK_DEBUG   
189    fprintf(stderr,"************************* From thread %d %s %d: REQUEST END (%s)\n\n",pthread_self(),__FILE__,__LINE__,tmp1);
190#endif   
191    free(tmp1);
192    char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1)
193                                 * sizeof (char));
194    if (tmp == NULL)
195      {
196        setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory"));
197        setMapInMaps(arg->conf,"lenv","code","InternalError");
198        return NULL;
199      }
200    size_t bRead;
201    InternetReadFile (hInternet.ihandle[0],
202                      (LPVOID) tmp,
203                      hInternet.
204                      ihandle[0].nDataLen,
205                      &bRead);
206    tmp[hInternet.ihandle[0].nDataLen] = 0;
207    json_object_put(arg->res);
208    InternetCloseHandle(&hInternet);
209    isOngoing=0;
210    if(cStep==0 || cStep==6 || arg->state==1)
211      cStep=arg->step+1;
212#ifdef CALLBACK_DEBUG   
213    fprintf(stderr,"************************* From thread %d %s %d: RESPONSE CONTENT\n",pthread_self(),__FILE__,__LINE__);
214    for(i=0;i<7;i++){
215      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
216    }
217    fprintf(stderr,"Result: \n%s\n\n",tmp);
218    fprintf(stderr,"************************* From thread %d %s %d\n\n",pthread_self(),__FILE__,__LINE__);
219    fflush(stderr);
220#endif
221    steps[arg->step][arg->state]=true;
222    free(tmp);
223    //free(args);
224    fprintf(stderr,"************************* From thread %d %s %d: EXIT\n\n",pthread_self(),__FILE__,__LINE__);
225    fflush(stderr);
226    pthread_exit(NULL);
227  }
228 
229  /**
230   * Invoke the callback in case there is a [callback] section containing a url parameter
231   *
232   * @param m the maps containing the main configuration file definitions
233   * @param inputs the inputs defined in the request (can be null if not yet initialized)
234   * @param inputs the outputs provided in the request (can be null if not yet initialized)
235   * @param step the step number, steps are defined as:
236   *  0: Analyze creation
237   *  1: Fetching Data Inputs
238   *  2: Uploading data inputs to cluster
239   *  3: Creating Job Script
240   *  4: Submitting Job to Cluster
241   *  5: Downloading processed output from cluster
242   *  6: Finalize
243   *  7: Dismiss or Error
244   * @param state 0 in case the step starts, 1 when it ends
245   * @return bool true in case of success, false in other cases
246   */
247  bool invokeCallback(maps* conf,maps* inputs,maps* outputs,int step,int state){
248    map* url=getMapFromMaps(conf,"callback","url");
249    if(url==NULL)
250      return false;
251     
252    maps* lenv=getMaps(conf,"lenv");
253    map* sname=getMap(lenv->content,"identifier");
254    if(sname!=NULL && isProhibited(conf,sname->value))
255      return false;
256     
257    json_object *res=json_object_new_object();
258
259    map* sid=getMapFromMaps(conf,"lenv","usid");
260    if(sid!=NULL){
261      json_object *jsStr=json_object_new_string(sid->value);
262      json_object_object_add(res,"jobid",jsStr);
263    }
264    const struct tm *tm;
265    size_t len;
266    time_t now;
267    char *tmp1;
268    map *tmpStatus;
269 
270    now = time ( NULL );
271    tm = localtime ( &now );
272
273    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
274    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%H:%M:%SZ", tm );
275    json_object *jsStr0=json_object_new_string(tmp1);
276    json_object_object_add(res,"datetime",jsStr0);
277    free(tmp1);
278   
279    switch(step){
280    case 0: {
281      // Create a new analyze
282      maps* lenv=getMaps(conf,"lenv");
283      sid=getMapFromMaps(conf,"renv","xrequest");
284      if(sid!=NULL){
285        json_object *jsStr=json_object_new_string(sid->value);
286        json_object_object_add(res,"request_execute_content",jsStr);
287      }
288      sid=getMapFromMaps(conf,"lenv","identifier");
289      if(sid!=NULL){
290        json_object *jsStr=json_object_new_string(sid->value);
291        json_object_object_add(res,"process_identifier",jsStr);
292      }
293      // Save the Execute request on disk
294      map* tmpPath=getMapFromMaps(conf,"main","tmpPath");
295      map* req=getMapFromMaps(conf,"renv","xrequest");
296      sid=getMapFromMaps(conf,"lenv","usid");
297      char* executePath=(char*)malloc((strlen(tmpPath->value)+strlen(sid->value)+14)*sizeof(char));
298      sprintf(executePath,"%s/execute_%s.xml",tmpPath->value,sid->value);
299      FILE* saveExecute=fopen(executePath,"wb");
300      fwrite(req->value,1,strlen(req->value)*sizeof(char),saveExecute);
301      fflush(saveExecute);
302      fclose(saveExecute);
303      setMapInMaps(conf,"lenv","execute_file",executePath);
304      free(executePath);
305      break;
306    }
307     
308    case 1: {
309      // Fetching data inputs
310      maps* curs=inputs;
311      dumpMaps(curs);
312      char *keys[10][2]={
313        {
314          "href",
315          "ref"
316        },
317        {
318          "cache_file",
319          "cachefile"
320        },
321        {
322          "fmimeType",
323          "mimetype"
324        },
325        {
326          "size",
327          "size"
328        },
329        {
330          "ref_wms_link",
331          "ref_wms_link"
332        },
333        {
334          "ref_wcs_link",
335          "ref_wcs_link"
336        },
337        {
338          "ref_wcs_link",
339          "ref_wcs_link"
340        },
341        {
342          "ref_wcs_preview_link",
343          "ref_wcs_preview_link"
344        },
345        {
346          "geodatatype",
347          "datatype"
348        },
349        {
350          "wcs_extent",
351          "boundingbox"
352        }       
353      };
354      json_object *res1=json_object_new_object();
355      while(curs!=NULL){
356        map* tmpMap=getMap(curs->content,"cache_file");
357        sid=getMap(curs->content,"ref_wms_link");
358        json_object *res2=json_object_new_object();
359        if(tmpMap!=NULL){
360          if(sid==NULL){
361            addToMap(curs->content,"generated_file",tmpMap->value);
362            addToMap(curs->content,"storage",tmpMap->value);
363          }
364          fprintf(stderr,"%s %d\n",__FILE__,__LINE__);
365          dumpMap(curs->content);
366          struct stat buf;
367          char timeStr[ 100 ] = "";
368          if (stat(tmpMap->value, &buf)==0){
369            strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
370            json_object *jsStr=json_object_new_string(timeStr);
371            json_object_object_add(res2,"creation_date",jsStr);
372          }
373          tmpMap=getMap(curs->content,"fmimeType");
374          if(tmpMap!=NULL){
375            addToMap(curs->content,"mimeType",tmpMap->value);
376          }
377          setReferenceUrl(conf,curs);
378        }
379        int i=0;
380        int hasRef=-1;
381        for(;i<10;i++){
382          sid=getMap(curs->content,keys[i][0]);
383          if(sid!=NULL){
384            json_object *jsStr=json_object_new_string(sid->value);
385            json_object_object_add(res2,keys[i][1],jsStr);
386            if(i==0){
387              hasRef=1;
388              json_object *jsStr1=json_object_new_string(getProvenance(conf,url->value));
389              json_object_object_add(res2,"dataOrigin",jsStr1);
390            }
391          }
392        }
393        if(hasRef<0)
394          json_object_put(res2);
395        else
396          json_object_object_add(res1,curs->name,res2);
397        curs=curs->next;
398      }
399      json_object_object_add(res,"inputs",res1);
400      break;
401    }
402     
403    case 2: {
404      // Update the execute request stored on disk at step 1,1 to modify the references used.
405      if(state==0){
406        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
407        fflush(stderr);
408        maps* curs=inputs;
409        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
410        fflush(stderr);
411        xmlInitParser();
412        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
413        fflush(stderr);
414        map* xmlPath=getMapFromMaps(conf,"lenv","execute_file");
415        dumpMap(xmlPath);
416        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
417        fflush(stderr);
418        while(curs!=NULL){
419          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
420          fflush(stderr);
421          dumpMap(curs->content);
422          //map* bvMap=getMap(curs->content,"byValue");
423          // TODO handle mapArray
424          //if(bvMap!=NULL && strncasecmp(bvMap->value,"true",4)==0){
425          if(getMap(curs->content,"href")==NULL && getMap(curs->content,"mimeType")!=NULL){
426            map* tmpMap=getMap(curs->content,"cache_file");
427            addToMap(curs->content,"generated_file",tmpMap->value);
428            addToMap(curs->content,"storage",tmpMap->value);
429            setReferenceUrl(conf,curs);
430            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
431            fflush(stderr);
432            dumpMap(curs->content);
433            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
434            fflush(stderr);
435            const char *params[5];
436            int xmlLoadExtDtdDefaultValue;
437            int hasFile=-1;
438            map* xslPath=getMapFromMaps(conf,"callback","template");
439            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
440            fflush(stderr);
441            dumpMap(xslPath);
442            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
443            fflush(stderr);
444            map* filePath=getMap(curs->content,"ref_wfs_link");
445            if(filePath==NULL)
446              filePath=getMap(curs->content,"ref_wcs_link");
447            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
448            fflush(stderr);
449            dumpMap(filePath);
450            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
451            fflush(stderr);
452            char* inputName=curs->name;
453            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
454            fflush(stderr);
455            if(xslPath==NULL || xmlPath==NULL || filePath==NULL)
456              break;
457            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
458            fflush(stderr);
459            char *tmpParam=(char*)malloc((strlen(curs->name)+11)*sizeof(char));
460            char *tmpParam1=(char*)malloc((strlen(filePath->value)+11)*sizeof(char));
461            sprintf(tmpParam,"string(\"%s\")",curs->name);         
462            sprintf(tmpParam1,"string(\"%s\")",filePath->value);           
463            params[0]="attr";
464            params[1]=tmpParam;
465            params[2]="value";
466            params[3]=tmpParam1;//filePath->value;
467            params[4]=NULL;
468            fprintf(stderr, "## XSLT PARAMETERS ATTR: %s VALUE: %s \n",
469                    tmpParam,tmpParam1);
470            xmlSubstituteEntitiesDefault(1);
471            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
472            fflush(stderr);
473            xmlLoadExtDtdDefaultValue = 0;
474            xsltStylesheetPtr cur = NULL;
475            xmlDocPtr doc, res;
476            cur = xsltParseStylesheetFile(BAD_CAST xslPath->value);
477            doc = xmlParseFile(xmlPath->value);
478            fflush(stderr);
479            res = xsltApplyStylesheet(cur, doc, params);
480            xmlChar *xmlbuff;
481            int buffersize;
482            xmlDocDumpFormatMemory(res, &xmlbuff, &buffersize, 1);
483            // Store the executeRequest in file again
484            free(tmpParam);
485            free(tmpParam1);
486            fprintf(stderr," # Request / XSLT: %s\n",xmlbuff);
487            fflush(stderr);
488            FILE* saveExecute=fopen(xmlPath->value,"wb");
489            fwrite(xmlbuff,1,buffersize,saveExecute);
490            fflush(saveExecute);
491            fclose(saveExecute);
492            xmlFree(xmlbuff);
493            xmlFreeDoc(doc);
494            xsltFreeStylesheet(cur);
495          }
496          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
497          fflush(stderr);
498          curs=curs->next;
499        }
500        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
501        fflush(stderr);
502        xmlCleanupParser();
503        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
504        fflush(stderr);
505        FILE* f0=fopen(xmlPath->value,"rb");
506        if(f0!=NULL){
507          long flen;
508          char *fcontent;
509          fseek (f0, 0, SEEK_END);
510          flen = ftell (f0);
511          fseek (f0, 0, SEEK_SET);
512          fcontent = (char *) malloc ((flen + 1) * sizeof (char));
513          fread(fcontent,flen,1,f0);
514          fcontent[flen]=0;
515          fclose(f0);
516          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
517          fflush(stderr);
518          map *schema=getMapFromMaps(conf,"database","schema");
519          map* sid=getMapFromMaps(conf,"lenv","usid");
520          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
521          fflush(stderr);
522          char *req=(char*)malloc((flen+strlen(schema->value)+strlen(sid->value)+66)*sizeof(char));
523          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
524          fflush(stderr);
525          sprintf(req,"UPDATE %s.services set request_execute_content=$$%s$$ WHERE uuid=$$%s$$",schema->value,fcontent,sid->value);
526          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
527          fflush(stderr);
528          execSql(conf,1,req);
529          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
530          fflush(stderr);
531          free(fcontent);
532          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
533          fflush(stderr);
534          free(req);
535          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
536          fflush(stderr);
537        }
538      }
539
540      // Uploading data input to cluster
541      maps* in=getMaps(conf,"uploadQueue");
542      if(in!=NULL){
543        maps* curs=in;
544        map* length=getMapFromMaps(in,"uploadQueue","length");
545        if(length!=NULL){
546          json_object *res1=json_object_new_object();
547          json_object *res2=json_object_new_object();
548          int limit=atoi(length->value);
549          int i=0;
550          maps* uploadQueue=getMaps(in,"uploadQueue");
551          map* tmp=uploadQueue->content;
552          for(;i<limit;i++){
553            map* tmp0=getMapArray(tmp,"input",i);
554            map* tmp1=getMapArray(tmp,"localPath",i);
555            map* tmp2=getMapArray(tmp,"targetPath",i);
556            if(tmp0!=NULL && tmp1!=NULL && tmp2!=NULL){
557              json_object *jsStr=json_object_new_string(tmp1->value);
558              json_object_object_add(res2,"local_path",jsStr);
559              jsStr=json_object_new_string(tmp2->value);
560              json_object_object_add(res2,"target_path",jsStr);
561              json_object_object_add(res1,tmp0->value,res2);
562            }
563          }
564          json_object_object_add(res,"inputs",res1);
565        }
566        //json_object_object_add(res,"inputs",in);
567      }
568      break;
569    }
570     
571    case 3: {
572      // Generating job script
573      sid=getMapFromMaps(conf,"lenv","local_script");
574      if(sid!=NULL){
575        json_object *jsStr=json_object_new_string(sid->value);
576        json_object_object_add(res,"script",jsStr);
577      }
578      break;
579    }
580     
581    case 4: {
582      // Submitting job to cluster
583      sid=getMapFromMaps(conf,"lenv","remote_script");
584      if(sid!=NULL){
585        json_object *jsStr=json_object_new_string(sid->value);
586        json_object_object_add(res,"script",jsStr);
587      }
588      break;
589    }
590     
591    case 5: {
592      // Downloading process outputs from cluster
593      //json_object* in=mapsToJson(outputs);
594      maps* curs=outputs;
595      dumpMaps(curs);
596      char *keys[10][2]={
597        {
598          "Reference",
599          "ref"
600        },
601        {
602          "generated_file",
603          "cachefile"
604        },
605        {
606          "mimeType",
607          "mimetype"
608        },
609        {
610          "size",
611          "size"
612        },
613        {
614          "geodatatype",
615          "datatype"
616        },
617        {
618          "wms_extent",
619          "boundingbox"
620        },
621        {
622          "ref_wms_link",
623          "ref_wms_link"
624        },
625        {
626          "ref_wcs_link",
627          "ref_wcs_link"
628        },
629        {
630          "ref_wcs_link",
631          "ref_wcs_preview_link"
632        },
633        {
634          "ref_wfs_link",
635          "ref_wfs_link"
636        }       
637      };
638      char* specifics[5][2]={
639        {
640          "download_link",
641          "ref_download_link"
642        },
643        {
644          "wms_link",
645          "ref_wms_link"
646        },
647        {
648          "wfs_link",
649          "ref_wfs_link"
650        },
651        {
652          "wcs_link",
653          "ref_wcs_link"
654        },
655        {
656          "wcs_link",
657          "ref_wcs_preview_link"
658        }
659      };
660      json_object *res1=json_object_new_object();
661      while(curs!=NULL){       
662        json_object *res2=json_object_new_object();
663        int i=0;
664        int hasRef=-1;
665        for(;i<10;i++){
666          sid=getMap(curs->content,keys[i][0]);
667          if(sid!=NULL){
668            json_object *jsStr=json_object_new_string(sid->value);
669            json_object_object_add(res2,keys[i][1],jsStr);
670            if(i==0)
671              hasRef=1;
672          }
673        }
674        if(hasRef>0)
675          json_object_object_add(res1,curs->name,res2);
676        else{
677          maps* curs0=curs->child;
678          int i=0;
679          int bypass=-1;
680          for(i=0;i<5;i++){
681            maps* specificMaps;
682            if((specificMaps=getMaps(curs0,specifics[i][0]))!=NULL){
683              int hasRef0=-1;
684              int i0=0;
685              for(;i0<6;i0++){
686                sid=getMap(specificMaps->content,keys[i0][0]);
687                if(sid!=NULL){
688                  json_object *jsStr=json_object_new_string(sid->value);
689                  if(i0==0){
690                    json_object_object_add(res2,specifics[i][1],jsStr);
691                  }
692                  else
693                    json_object_object_add(res2,keys[i0][1],jsStr);
694                  hasRef0=1;
695                  bypass=1;
696                  if(i==1){
697                    struct stat buf;
698                    char timeStr[ 100 ] = "";
699                    if (stat(sid->value, &buf)==0){
700                      strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
701                      json_object *jsStr=json_object_new_string(timeStr);
702                      json_object_object_add(res2,"creation_date",jsStr);
703                    }
704                  }
705                }
706              }       
707            }
708          }
709          if(bypass<0)
710            while(curs0!=NULL){
711              json_object *res3=json_object_new_object();
712              int i0=0;
713              int hasRef0=-1;
714              for(;i0<10;i0++){
715                sid=getMap(curs0->content,keys[i0][0]);
716                if(sid!=NULL){
717                  json_object *jsStr=json_object_new_string(sid->value);
718                  json_object_object_add(res3,keys[i0][1],jsStr);
719                  //if(i0==0)
720                  hasRef0=1;
721                }
722              }
723              if(hasRef0<0)
724                json_object_put(res3);
725              else
726                json_object_object_add(res2,curs0->name,res3);
727              curs0=curs0->next;
728            }
729          json_object_object_add(res1,curs->name,res2);
730        }
731        curs=curs->next;
732      }
733      json_object_object_add(res,"outputs",res1);
734      break;
735    }
736     
737    case 6: {
738      // Finalize HPC
739      char *keys[6][2]={
740        {
741          "SubmitTime",
742          "hpc_submission_date"
743        },
744        {
745          "JobId",
746          "hpc_job_identifier"
747        },
748        {
749          "JobName",
750          "hpc_job_name"
751        },
752        {
753          "StartTime",
754          "hpc_start_date"
755        },
756        {
757          "EndTime",
758          "hpc_end_date"
759        },
760        {
761          "JobState",
762          "hpc_status"
763        }       
764      };
765      int i=0;
766      if(getMaps(conf,"henv")!=NULL){
767        for(i=0;i<6;i++){
768          sid=getMapFromMaps(conf,"henv",keys[i][0]);
769          if(sid!=NULL){
770            json_object *jsStr=json_object_new_string(sid->value);
771            json_object_object_add(res,keys[i][1],jsStr);
772          }
773        }
774      }
775      if((sid=getMapFromMaps(conf,"henv","billing_nb_cpu"))!=NULL){
776        json_object *jsStr=json_object_new_string(sid->value);
777        json_object_object_add(res,"hpc_cpu_usage",jsStr);
778      }else{
779        json_object *jsStr=json_object_new_string("1");
780        json_object_object_add(res,"hpc_cpu_usage",jsStr);
781      }
782      json_object *jsStr=json_object_new_string("succeeded");
783      json_object_object_add(res,"wps_status",jsStr);
784      break;
785    }
786     
787    case 7: {
788      // Error or Dismiss
789      sid=getMapFromMaps(conf,"lenv","message");
790      if(sid!=NULL){
791        json_object *jsStr=json_object_new_string(sid->value);
792        json_object_object_add(res,"message",jsStr);
793      }
794      json_object *jsStr;
795      if(state==1)
796        jsStr=json_object_new_string("dismissed");
797      else
798        jsStr=json_object_new_string("failed");
799      json_object_object_add(res,"wps_status",jsStr);
800      break;
801    }
802    others: {
803        break;
804      }
805    }
806
807    if(local_arguments==NULL)
808      local_arguments=(local_params**)malloc(sizeof(local_params*));
809    else
810      local_arguments=(local_params**)realloc(local_arguments,(nbThreads+1)*sizeof(local_params*));
811    local_arguments[nbThreads]=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int))); 
812    local_arguments[nbThreads]->conf=conf;
813    local_arguments[nbThreads]->url=url;
814    local_arguments[nbThreads]->res=res;
815    local_arguments[nbThreads]->step=step;
816    local_arguments[nbThreads]->state=state;
817    //pthread_t p1;
818    if(myThreads==NULL)
819      myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t));
820    else
821      myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t));
822    if(pthread_create(&myThreads[nbThreads], NULL, _invokeCallback, (void*)local_arguments[nbThreads])==-1){
823      setMapInMaps(conf,"lenv","message",_("Unable to create a new thread"));
824      return false;
825    }
826    //free(argumentsA);
827    nbThreads++;
828    return true;
829  }
830
831  /**
832   * Wait for the threads to end then, clean used memory.
833   */
834  void cleanupCallbackThreads(){
835    int i=0;
836    for(i=0;i<nbThreads;i++){
837      fprintf(stderr,"%s %d %d \n",__FILE__,__LINE__,i);
838      fflush(stderr);
839      pthread_join(myThreads[i],NULL);
840      free(local_arguments[i]);
841    }
842    free(local_arguments);
843    free(myThreads);
844  }
845
846#ifdef __cplusplus
847}
848#endif
Note: See TracBrowser for help on using the repository browser.

Search

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png