source: branches/prototype-v0/zoo-project/zoo-kernel/service_callback.c @ 860

Last change on this file since 860 was 860, checked in by djay, 6 years ago

Add status_code key to the lenv section to support returning a specific HTTP error code from the service code. Fix callback invocation to support inputs arrays at step 1 and 2. Fix issue with cpu usage. Fix issue with mapserver publication when an input is optional. Fix callback invocation at step 7 in case the service has failed on the HPC side.

  • Property svn:keywords set to Id
File size: 25.5 KB
Line 
1/*
2 * Author : Gérald FENOY
3 *
4 *  Copyright 2017 GeoLabs SARL. All rights reserved.
5 *
6 * This work was supported by public funds received in the framework of GEOSUD,
7 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed
8 * by the French National Research Agency
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 */
28
29#include "service_callback.h"
30#include "service_json.h"
31#include "service_internal_ms.h"
32#include "sqlapi.h"
33#include <pthread.h>
34#include <libxml/tree.h>
35#include <libxml/parser.h>
36#include <libxml/xpath.h>
37#include <libxml/xpathInternals.h>
38
39#include <libxslt/xslt.h>
40#include <libxslt/xsltInternals.h>
41#include <libxslt/transform.h>
42#include <libxslt/xsltutils.h>
43
44#ifdef __cplusplus
45extern "C" {
46#endif
47
48  /**
49   * Parameter definition to be used for sending parameters to a thread.
50   */
51  typedef struct {
52    maps *conf;      //!< the main configuration file
53    map *url;        //!< the callback url maps
54    json_object *res;//!< the JSON object to post
55    int step;        //!< the current step [0,6]
56    int state;       //!< the current state [0,1]
57  } local_params;
58
59  /**
60   * Number of threads
61   */
62  int nbThreads=0;
63  /**
64   * Current step
65   */
66  int cStep=0;
67  /**
68   * Is there any ongoing HTTP request
69   */
70  int isOngoing=0;
71  /**
72   * Threads array
73   */
74  pthread_t* myThreads=NULL;
75  /**
76   * Steps array
77   */
78  bool steps[7][2]={
79    {false,false},
80    {false,false},
81    {false,false},
82    {false,false},
83    {false,false},
84    {false,false},
85    {false,false}
86  };
87  /**
88   * Arguments array to give to the _invokeCallback thread's function
89   */
90  local_params** local_arguments;
91 
92  /**
93   * Check if a service name is prohibited, meaning that the Kernel doesn't have
94   * to invoke the callback for this specific service.
95   *
96   * @param conf the main configuration file maps
97   * @param serviceName the serviceName
98   * @return a bool true if the service is prohibited, false in other case
99   */
100  bool isProhibited(maps* conf,const char* serviceName){
101    map* plist=getMapFromMaps(conf,"callback","prohibited");
102    if(plist!=NULL){
103      char *tmp=plist->value;
104      char *tmpS=strtok(tmp,",");
105      while(tmpS!=NULL){
106        if(strcmp(serviceName,tmpS)==0)
107          return true;
108        tmpS=strtok(NULL,",");
109      }
110    }
111    return false;
112  }
113
114
115  /**
116   * Practically invoke the callback, meaning sending the HTTP POST request.
117   *
118   * @param args local_params containing all the variables required
119   */
120  void* _invokeCallback(void* args){
121    local_params* arg=(local_params*)args;
122    HINTERNET hInternet,res1;
123    hInternet=InternetOpen("ZooWPSClient\0",
124                           INTERNET_OPEN_TYPE_PRECONFIG,
125                           NULL,NULL, 0);
126    if(!CHECK_INET_HANDLE(hInternet)){
127      InternetCloseHandle (&hInternet);
128      return false;
129    }
130    char *URL=(char*)malloc((strlen(arg->url->value)+5)*sizeof(char));
131    sprintf(URL,"%s%d_%d/",arg->url->value,arg->step,arg->state);
132    const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN);
133    hInternet.waitingRequests[0] = zStrdup(URL);
134    free(URL);
135#ifdef CALLBACK_DEBUG
136    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS cStep %d %d\n",pthread_self(),__FILE__,__LINE__,cStep,isOngoing);
137    fprintf(stderr," * JSON: [%s] \n",jsonStr);
138    fprintf(stderr," * URL: %s/ \n\n",hInternet.waitingRequests[0]);
139    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__);
140#endif
141    while( (arg->step!=7 || isOngoing>0) &&
142           ( cStep!=arg->step || (arg->state!=0 && steps[arg->step][0]==false) )
143           ){
144      zSleep(100);
145    }
146    isOngoing=1;
147#ifdef CALLBACK_DEBUG
148    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n\n",pthread_self(),__FILE__,__LINE__);
149    int i=0;
150    for(i=0;i<7;i++){
151      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
152    }
153#endif
154    const struct tm *tm;
155    size_t len;
156    time_t now;
157    char *tmp1;
158    map *tmpStatus;
159   
160    now = time ( NULL );
161    tm = localtime ( &now );
162   
163    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
164    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
165
166#ifdef CALLBACK_DEBUG   
167    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1);
168    fflush(stderr);
169#endif   
170    free(tmp1);
171    res1 = InternetOpenUrl (&hInternet,
172                            hInternet.waitingRequests[0], 
173                            (char*)jsonStr, strlen(jsonStr),
174                            INTERNET_FLAG_NO_CACHE_WRITE,
175                            0);
176    AddHeaderEntries(&hInternet,arg->conf);
177    //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1);x
178    processDownloads(&hInternet);
179    now = time ( NULL );
180    tm = localtime ( &now );
181    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
182    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
183   
184#ifdef CALLBACK_DEBUG   
185    fprintf(stderr,"************************* From thread %d %s %d: REQUEST END (%s)\n\n",pthread_self(),__FILE__,__LINE__,tmp1);
186#endif   
187    free(tmp1);
188    char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1)
189                                 * sizeof (char));
190    if (tmp == NULL)
191      {
192        setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory"));
193        setMapInMaps(arg->conf,"lenv","code","InternalError");
194        return NULL;
195      }
196    size_t bRead;
197    InternetReadFile (hInternet.ihandle[0],
198                      (LPVOID) tmp,
199                      hInternet.
200                      ihandle[0].nDataLen,
201                      &bRead);
202    tmp[hInternet.ihandle[0].nDataLen] = 0;
203    json_object_put(arg->res);
204    InternetCloseHandle(&hInternet);
205    isOngoing=0;
206    if(cStep==0 || cStep==6 || arg->state==1)
207      cStep=arg->step+1;
208#ifdef CALLBACK_DEBUG   
209    fprintf(stderr,"************************* From thread %d %s %d: RESPONSE CONTENT\n",pthread_self(),__FILE__,__LINE__);
210    for(i=0;i<7;i++){
211      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
212    }
213    fprintf(stderr,"Result: \n%s\n\n",tmp);
214    fprintf(stderr,"************************* From thread %d %s %d\n\n",pthread_self(),__FILE__,__LINE__);
215    fflush(stderr);
216#endif
217    steps[arg->step][arg->state]=true;
218    free(tmp);
219    //free(args);
220    fprintf(stderr,"************************* From thread %d %s %d: EXIT\n\n",pthread_self(),__FILE__,__LINE__);
221    fflush(stderr);
222    pthread_exit(NULL);
223  }
224 
225  /**
226   * Invoke the callback in case there is a [callback] section containing a url parameter
227   *
228   * @param m the maps containing the main configuration file definitions
229   * @param inputs the inputs defined in the request (can be null if not yet initialized)
230   * @param inputs the outputs provided in the request (can be null if not yet initialized)
231   * @param step the step number, steps are defined as:
232   *  0: Analyze creation
233   *  1: Fetching Data Inputs
234   *  2: Uploading data inputs to cluster
235   *  3: Creating Job Script
236   *  4: Submitting Job to Cluster
237   *  5: Downloading processed output from cluster
238   *  6: Finalize
239   *  7: Dismiss or Error
240   * @param state 0 in case the step starts, 1 when it ends
241   * @return bool true in case of success, false in other cases
242   */
243  bool invokeCallback(maps* conf,maps* inputs,maps* outputs,int step,int state){
244    map* url=getMapFromMaps(conf,"callback","url");
245    if(url==NULL)
246      return false;
247     
248    maps* lenv=getMaps(conf,"lenv");
249    map* sname=getMap(lenv->content,"identifier");
250    if(sname!=NULL && isProhibited(conf,sname->value))
251      return false;
252     
253    json_object *res=json_object_new_object();
254
255    map* sid=getMapFromMaps(conf,"lenv","usid");
256    if(sid!=NULL){
257      json_object *jsStr=json_object_new_string(sid->value);
258      json_object_object_add(res,"jobid",jsStr);
259    }
260    const struct tm *tm;
261    size_t len;
262    time_t now;
263    char *tmp1;
264    map *tmpStatus;
265 
266    now = time ( NULL );
267    tm = localtime ( &now );
268
269    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
270    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%H:%M:%SZ", tm );
271    json_object *jsStr0=json_object_new_string(tmp1);
272    json_object_object_add(res,"datetime",jsStr0);
273    free(tmp1);
274   
275    switch(step){
276    case 0: {
277      // Create a new analyze
278      maps* lenv=getMaps(conf,"lenv");
279      sid=getMapFromMaps(conf,"renv","xrequest");
280      if(sid!=NULL){
281        json_object *jsStr=json_object_new_string(sid->value);
282        json_object_object_add(res,"request_execute_content",jsStr);
283      }
284      sid=getMapFromMaps(conf,"lenv","identifier");
285      if(sid!=NULL){
286        json_object *jsStr=json_object_new_string(sid->value);
287        json_object_object_add(res,"process_identifier",jsStr);
288      }
289      // Save the Execute request on disk
290      map* tmpPath=getMapFromMaps(conf,"main","tmpPath");
291      map* req=getMapFromMaps(conf,"renv","xrequest");
292      sid=getMapFromMaps(conf,"lenv","usid");
293      char* executePath=(char*)malloc((strlen(tmpPath->value)+strlen(sid->value)+14)*sizeof(char));
294      sprintf(executePath,"%s/execute_%s.xml",tmpPath->value,sid->value);
295      FILE* saveExecute=fopen(executePath,"wb");
296      fwrite(req->value,1,strlen(req->value)*sizeof(char),saveExecute);
297      fflush(saveExecute);
298      fclose(saveExecute);
299      setMapInMaps(conf,"lenv","execute_file",executePath);
300      free(executePath);
301      break;
302    }
303     
304    case 1: {
305      // Update the execute request stored on disk at step 0,0 to modify the references used.
306      if(state==1){
307        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
308        fflush(stderr);
309        maps* curs=inputs;
310        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
311        fflush(stderr);
312        xmlInitParser();
313        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
314        fflush(stderr);
315        map* xmlPath=getMapFromMaps(conf,"lenv","execute_file");
316        dumpMap(xmlPath);
317        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
318        fflush(stderr);
319        while(curs!=NULL){
320          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
321          fflush(stderr);
322          dumpMap(curs->content);
323          //map* bvMap=getMap(curs->content,"byValue");
324          // TODO handle mapArray
325          //if(bvMap!=NULL && strncasecmp(bvMap->value,"true",4)==0){
326          if(getMap(curs->content,"href")==NULL && getMap(curs->content,"mimeType")!=NULL){
327            map* tmpMap=getMap(curs->content,"value");
328            char tmpStr[100];
329            sprintf(tmpStr,"%d",strlen(tmpMap->value));
330            addToMap(curs->content,"size",tmpStr);
331            tmpMap=getMap(curs->content,"mimeType");
332            addToMap(curs->content,"fmimeType",tmpMap->value);
333            tmpMap=getMap(curs->content,"cache_file");
334            addToMap(curs->content,"generated_file",tmpMap->value);
335            addToMap(curs->content,"storage",tmpMap->value);
336            setReferenceUrl(conf,curs);
337            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
338            fflush(stderr);
339            dumpMap(curs->content);
340            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
341            fflush(stderr);
342            const char *params[5];
343            int xmlLoadExtDtdDefaultValue;
344            int hasFile=-1;
345            map* xslPath=getMapFromMaps(conf,"callback","template");
346            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
347            fflush(stderr);
348            dumpMap(xslPath);
349            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
350            fflush(stderr);
351            map* filePath=getMap(curs->content,"ref_wfs_link");
352            if(filePath==NULL)
353              filePath=getMap(curs->content,"ref_wcs_link");
354            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
355            fflush(stderr);
356            dumpMap(filePath);
357            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
358            fflush(stderr);
359            char* inputName=curs->name;
360            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
361            fflush(stderr);
362            if(xslPath==NULL || xmlPath==NULL || filePath==NULL)
363              break;
364            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
365            fflush(stderr);
366            char *tmpParam=(char*)malloc((strlen(curs->name)+11)*sizeof(char));
367            char *tmpParam1=(char*)malloc((strlen(filePath->value)+11)*sizeof(char));
368            addToMap(curs->content,"href",filePath->value);
369            addToMap(curs->content,"xlink:href",filePath->value);
370            sprintf(tmpParam,"string(\"%s\")",curs->name);
371            sprintf(tmpParam1,"string(\"%s\")",filePath->value);
372            params[0]="attr";
373            params[1]=tmpParam;
374            params[2]="value";
375            params[3]=tmpParam1;//filePath->value;
376            params[4]=NULL;
377            fprintf(stderr, "## XSLT PARAMETERS ATTR: %s VALUE: %s \n",
378                    tmpParam,tmpParam1);
379            xmlSubstituteEntitiesDefault(1);
380            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
381            fflush(stderr);
382            xmlLoadExtDtdDefaultValue = 0;
383            xsltStylesheetPtr cur = NULL;
384            xmlDocPtr doc, res;
385            cur = xsltParseStylesheetFile(BAD_CAST xslPath->value);
386            doc = xmlParseFile(xmlPath->value);
387            fflush(stderr);
388            res = xsltApplyStylesheet(cur, doc, params);
389            xmlChar *xmlbuff;
390            int buffersize;
391            xmlDocDumpFormatMemory(res, &xmlbuff, &buffersize, 1);
392            // Store the executeRequest in file again
393            free(tmpParam);
394            free(tmpParam1);
395            fprintf(stderr," # Request / XSLT: %s\n",xmlbuff);
396            fflush(stderr);
397            FILE* saveExecute=fopen(xmlPath->value,"wb");
398            fwrite(xmlbuff,1,buffersize,saveExecute);
399            fflush(saveExecute);
400            fclose(saveExecute);
401            xmlFree(xmlbuff);
402            xmlFreeDoc(doc);
403            xsltFreeStylesheet(cur);
404          }
405          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
406          fflush(stderr);
407          curs=curs->next;
408        }
409        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
410        fflush(stderr);
411        xmlCleanupParser();
412        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
413        fflush(stderr);
414        FILE* f0=fopen(xmlPath->value,"rb");
415        if(f0!=NULL){
416          long flen;
417          char *fcontent;
418          fseek (f0, 0, SEEK_END);
419          flen = ftell (f0);
420          fseek (f0, 0, SEEK_SET);
421          fcontent = (char *) malloc ((flen + 1) * sizeof (char));
422          fread(fcontent,flen,1,f0);
423          fcontent[flen]=0;
424          fclose(f0);
425          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
426          fflush(stderr);
427          map *schema=getMapFromMaps(conf,"database","schema");
428          map* sid=getMapFromMaps(conf,"lenv","usid");
429          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
430          fflush(stderr);
431          char *req=(char*)malloc((flen+strlen(schema->value)+strlen(sid->value)+66)*sizeof(char));
432          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
433          fflush(stderr);
434          sprintf(req,"UPDATE %s.services set request_execute_content=$$%s$$ WHERE uuid=$$%s$$",schema->value,fcontent,sid->value);
435          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
436          fflush(stderr);
437          execSql(conf,1,req);
438          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
439          fflush(stderr);
440          free(fcontent);
441          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
442          fflush(stderr);
443          free(req);
444          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
445          fflush(stderr);
446        }
447      }
448
449      // Fetching data inputs
450      maps* curs=inputs;
451      dumpMaps(curs);
452      char *keys[11][2]={
453        {
454          "xlink:href",
455          "ref"
456        },
457        {
458          "cache_file",
459          "cachefile"
460        },
461        {
462          "fmimeType",
463          "mimetype"
464        },
465        {
466          "size",
467          "size"
468        },
469        {
470          "ref_wms_link",
471          "ref_wms_link"
472        },
473        {
474          "ref_wfs_link",
475          "ref_wfs_link"
476        },
477        {
478          "ref_wcs_link",
479          "ref_wcs_link"
480        },
481        {
482          "ref_wcs_link",
483          "ref_wcs_link"
484        },
485        {
486          "ref_wcs_preview_link",
487          "ref_wcs_preview_link"
488        },
489        {
490          "geodatatype",
491          "datatype"
492        },
493        {
494          "wgs84_extent",
495          "boundingbox"
496        }       
497      };
498      json_object *res1=json_object_new_object();
499      while(curs!=NULL){
500        if(getMap(curs->content,"length")==NULL){
501          addToMap(curs->content,"length","1");
502        }
503        map* length=getMap(curs->content,"length");
504        int len=atoi(length->value);
505        json_object *res3;
506        int hasRef=-1;
507        for(int ii=0;ii<len;ii++){
508          map* tmpMap=getMapArray(curs->content,"cache_file",ii);
509          sid=getMapArray(curs->content,"ref_wms_link",ii);
510          json_object *res2=json_object_new_object();
511          if(tmpMap!=NULL){
512            if(sid==NULL){
513              setMapArray(curs->content,"generated_file",ii,tmpMap->value);
514              setMapArray(curs->content,"storage",ii,tmpMap->value);
515            }
516            fprintf(stderr,"%s %d\n",__FILE__,__LINE__);
517            dumpMap(curs->content);
518            struct stat buf;
519            char timeStr[ 100 ] = "";
520            if (stat(tmpMap->value, &buf)==0){
521              strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
522              json_object *jsStr=json_object_new_string(timeStr);
523              json_object_object_add(res2,"creation_date",jsStr);
524            }
525            tmpMap=getMapArray(curs->content,"fmimeType",ii);
526            if(tmpMap!=NULL){
527              setMapArray(curs->content,"mimeType",ii,tmpMap->value);
528            }
529            setReferenceUrl(conf,curs);
530          }else{         
531          }
532          addIntToMap(curs->content,"published_id",ii+1);
533          int i=0;
534          for(;i<11;i++){
535            sid=getMapArray(curs->content,keys[i][0],ii);
536            if(sid!=NULL){
537              json_object *jsStr=json_object_new_string(sid->value);
538              json_object_object_add(res2,keys[i][1],jsStr);
539              if(i==0){
540                hasRef=1;
541                json_object *jsStr1=json_object_new_string(getProvenance(conf,sid->value));
542                json_object_object_add(res2,"dataOrigin",jsStr1);
543              }
544            }
545          }
546          if(len>1){
547            if(ii==0)
548              res3=json_object_new_array();
549            json_object_array_add(res3,res2);
550          }else
551            res3=res2;
552        }
553        if(hasRef<0)
554          json_object_put(res3);
555        else
556          json_object_object_add(res1,curs->name,res3);
557        addIntToMap(curs->content,"published_id",0);
558        curs=curs->next;
559      }
560      json_object_object_add(res,"inputs",res1);
561      break;
562    }
563     
564    case 2: {
565      // Uploading data input to cluster
566      maps* in=getMaps(conf,"uploadQueue");
567      if(in!=NULL){
568        maps* curs=in;
569        map* length=getMapFromMaps(in,"uploadQueue","length");
570        if(length!=NULL){
571          json_object *res1=json_object_new_object();
572          json_object *res3=json_object_new_array();
573          int limit=atoi(length->value);
574          int i=0;
575          maps* uploadQueue=getMaps(in,"uploadQueue");
576          map* tmp=uploadQueue->content;
577          for(;i<limit;i++){
578            map* tmp0=getMapArray(tmp,"input",i);
579            map* tmp1=getMapArray(tmp,"localPath",i);
580            map* tmp2=getMapArray(tmp,"targetPath",i);
581            if(tmp0!=NULL && tmp1!=NULL && tmp2!=NULL){
582              json_object *res2=json_object_new_object();
583              json_object *jsStr=json_object_new_string(tmp1->value);
584              json_object_object_add(res2,"local_path",jsStr);
585              jsStr=json_object_new_string(tmp2->value);
586              json_object_object_add(res2,"target_path",jsStr);
587              json_object *res4=json_object_object_get(res1,tmp0->value);
588              if(json_object_is_type(res4,json_type_null)){
589                json_object_object_add(res1,tmp0->value,res2);
590              }else{
591                if(json_object_is_type(res4,json_type_object)){
592                  json_object_array_add(res3,res4);
593                }
594                json_object_array_add(res3,res2);
595                if(json_object_is_type(res4,json_type_object)){
596                  json_object_object_del(res1,tmp0->value);
597                  json_object_object_add(res1,tmp0->value,res3);
598                }
599              }
600            }
601          }
602          if(json_object_array_length(res3)==0)
603            json_object_put(res3);
604          json_object_object_add(res,"inputs",res1);
605        }
606        //json_object_object_add(res,"inputs",in);
607      }
608      break;
609    }
610     
611    case 3: {
612      // Generating job script
613      sid=getMapFromMaps(conf,"lenv","local_script");
614      if(sid!=NULL){
615        json_object *jsStr=json_object_new_string(sid->value);
616        json_object_object_add(res,"script",jsStr);
617      }
618      break;
619    }
620     
621    case 4: {
622      // Submitting job to cluster
623      sid=getMapFromMaps(conf,"lenv","remote_script");
624      if(sid!=NULL){
625        json_object *jsStr=json_object_new_string(sid->value);
626        json_object_object_add(res,"script",jsStr);
627      }
628      break;
629    }
630     
631    case 5: {
632      // Downloading process outputs from cluster
633      //json_object* in=mapsToJson(outputs);
634      maps* curs=outputs;
635      dumpMaps(curs);
636      char *keys[10][2]={
637        {
638          "Reference",
639          "ref"
640        },
641        {
642          "generated_file",
643          "cachefile"
644        },
645        {
646          "mimeType",
647          "mimetype"
648        },
649        {
650          "size",
651          "size"
652        },
653        {
654          "geodatatype",
655          "datatype"
656        },
657        {
658          "wgs84_extent",
659          "boundingbox"
660        },
661        {
662          "ref_wms_link",
663          "ref_wms_link"
664        },
665        {
666          "ref_wcs_link",
667          "ref_wcs_link"
668        },
669        {
670          "ref_wcs_preview_link",
671          "ref_wcs_preview_link"
672        },
673        {
674          "ref_wfs_link",
675          "ref_wfs_link"
676        }       
677      };
678      char* specifics[5][2]={
679        {
680          "download_link",
681          "ref_download_link"
682        },
683        {
684          "wms_link",
685          "ref_wms_link"
686        },
687        {
688          "wfs_link",
689          "ref_wfs_link"
690        },
691        {
692          "wcs_link",
693          "ref_wcs_link"
694        },
695        {
696          "wcs_link",
697          "ref_wcs_preview_link"
698        }
699      };
700      json_object *res1=json_object_new_object();
701      while(curs!=NULL){       
702        json_object *res2=json_object_new_object();
703        int i=0;
704        int hasRef=-1;
705        for(;i<10;i++){
706          sid=getMap(curs->content,keys[i][0]);
707          if(sid!=NULL){
708            json_object *jsStr=json_object_new_string(sid->value);
709            json_object_object_add(res2,keys[i][1],jsStr);
710            if(i==0)
711              hasRef=1;
712          }
713        }
714        if(hasRef>0)
715          json_object_object_add(res1,curs->name,res2);
716        else{
717          maps* curs0=curs->child;
718          int i=0;
719          int bypass=-1;
720          for(i=0;i<5;i++){
721            maps* specificMaps;
722            if((specificMaps=getMaps(curs0,specifics[i][0]))!=NULL){
723              int hasRef0=-1;
724              int i0=0;
725              for(;i0<6;i0++){
726                if(i0==0)
727                  sid=getMap(specificMaps->content,specifics[i][1]);
728                else
729                  sid=getMap(specificMaps->content,keys[i0][0]);
730                if(sid!=NULL){
731                  json_object *jsStr=json_object_new_string(sid->value);
732                  if(i0==0){
733                    json_object_object_add(res2,specifics[i][1],jsStr);
734                  }
735                  else
736                    json_object_object_add(res2,keys[i0][1],jsStr);
737                  hasRef0=1;
738                  bypass=1;
739                  if(i==1){
740                    struct stat buf;
741                    char timeStr[ 100 ] = "";
742                    if (stat(sid->value, &buf)==0){
743                      strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
744                      json_object *jsStr=json_object_new_string(timeStr);
745                      json_object_object_add(res2,"creation_date",jsStr);
746                    }
747                  }
748                }
749              }       
750            }
751          }
752          if(bypass<0)
753            while(curs0!=NULL){
754              json_object *res3=json_object_new_object();
755              int i0=0;
756              int hasRef0=-1;
757              for(;i0<10;i0++){
758                sid=getMap(curs0->content,keys[i0][0]);
759                if(sid!=NULL){
760                  json_object *jsStr=json_object_new_string(sid->value);
761                  json_object_object_add(res3,keys[i0][1],jsStr);
762                  //if(i0==0)
763                  hasRef0=1;
764                }
765              }
766              if(hasRef0<0)
767                json_object_put(res3);
768              else
769                json_object_object_add(res2,curs0->name,res3);
770              curs0=curs0->next;
771            }
772          json_object_object_add(res1,curs->name,res2);
773        }
774        curs=curs->next;
775      }
776      json_object_object_add(res,"outputs",res1);
777      break;
778    }
779     
780    case 6: {
781      // Finalize HPC
782      char *keys[6][2]={
783        {
784          "SubmitTime",
785          "hpc_submission_date"
786        },
787        {
788          "JobId",
789          "hpc_job_identifier"
790        },
791        {
792          "JobName",
793          "hpc_job_name"
794        },
795        {
796          "StartTime",
797          "hpc_start_date"
798        },
799        {
800          "EndTime",
801          "hpc_end_date"
802        },
803        {
804          "JobState",
805          "hpc_status"
806        }       
807      };
808      int i=0;
809      if(getMaps(conf,"henv")!=NULL){
810        for(i=0;i<6;i++){
811          sid=getMapFromMaps(conf,"henv",keys[i][0]);
812          if(sid!=NULL){
813            json_object *jsStr=json_object_new_string(sid->value);
814            json_object_object_add(res,keys[i][1],jsStr);
815          }
816        }
817      }
818      if((sid=getMapFromMaps(conf,"henv","billing_nb_cpu"))!=NULL){
819        json_object *jsStr=json_object_new_string(sid->value);
820        json_object_object_add(res,"hpc_cpu_usage",jsStr);
821      }else{
822        json_object *jsStr=json_object_new_string("1");
823        json_object_object_add(res,"hpc_cpu_usage",jsStr);
824      }
825      json_object *jsStr=json_object_new_string("succeeded");
826      json_object_object_add(res,"wps_status",jsStr);
827      break;
828    }
829     
830    case 7: {
831      // Error or Dismiss
832      sid=getMapFromMaps(conf,"lenv","message");
833      if(sid!=NULL){
834        json_object *jsStr=json_object_new_string(sid->value);
835        json_object_object_add(res,"message",jsStr);
836      }
837      json_object *jsStr;
838      if(state==1)
839        jsStr=json_object_new_string("dismissed");
840      else
841        jsStr=json_object_new_string("failed");
842      json_object_object_add(res,"wps_status",jsStr);
843      break;
844    }
845    others: {
846        break;
847      }
848    }
849
850    if(local_arguments==NULL)
851      local_arguments=(local_params**)malloc(sizeof(local_params*));
852    else
853      local_arguments=(local_params**)realloc(local_arguments,(nbThreads+1)*sizeof(local_params*));
854    local_arguments[nbThreads]=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int))); 
855    local_arguments[nbThreads]->conf=conf;
856    local_arguments[nbThreads]->url=url;
857    local_arguments[nbThreads]->res=res;
858    local_arguments[nbThreads]->step=step;
859    local_arguments[nbThreads]->state=state;
860    //pthread_t p1;
861    if(myThreads==NULL)
862      myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t));
863    else
864      myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t));
865    if(pthread_create(&myThreads[nbThreads], NULL, _invokeCallback, (void*)local_arguments[nbThreads])==-1){
866      setMapInMaps(conf,"lenv","message",_("Unable to create a new thread"));
867      return false;
868    }
869    //free(argumentsA);
870    nbThreads++;
871    return true;
872  }
873
874  /**
875   * Wait for the threads to end then, clean used memory.
876   */
877  void cleanupCallbackThreads(){
878    int i=0;
879    for(i=0;i<nbThreads;i++){
880      fprintf(stderr,"%s %d %d \n",__FILE__,__LINE__,i);
881      fflush(stderr);
882      pthread_join(myThreads[i],NULL);
883      free(local_arguments[i]);
884    }
885    free(local_arguments);
886    free(myThreads);
887  }
888
889#ifdef __cplusplus
890}
891#endif
Note: See TracBrowser for help on using the repository browser.

Search

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png