source: branches/prototype-v0/zoo-project/zoo-kernel/service_callback.c @ 886

Last change on this file since 886 was 886, checked in by djay, 6 years ago

Check for md5sum of any file in the cache to avoid sending the same file on HPC server twice. Add the multiple LiteralData? inputs support for HPC services. Remove condition for defining SCALE for every band in the outputed MapServer? mapfile.

  • Property svn:keywords set to Id
File size: 25.4 KB
Line 
1/*
2 * Author : Gérald FENOY
3 *
4 *  Copyright 2017 GeoLabs SARL. All rights reserved.
5 *
6 * This work was supported by public funds received in the framework of GEOSUD,
7 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed
8 * by the French National Research Agency
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 */
28
29#include "service_callback.h"
30#include "service_json.h"
31#include "service_internal_ms.h"
32#include "sqlapi.h"
33#include <pthread.h>
34#include <libxml/tree.h>
35#include <libxml/parser.h>
36#include <libxml/xpath.h>
37#include <libxml/xpathInternals.h>
38
39#include <libxslt/xslt.h>
40#include <libxslt/xsltInternals.h>
41#include <libxslt/transform.h>
42#include <libxslt/xsltutils.h>
43
44#ifdef __cplusplus
45extern "C" {
46#endif
47
48  /**
49   * Parameter definition to be used for sending parameters to a thread.
50   */
51  typedef struct {
52    maps *conf;      //!< the main configuration file
53    map *url;        //!< the callback url maps
54    json_object *res;//!< the JSON object to post
55    int step;        //!< the current step [0,6]
56    int state;       //!< the current state [0,1]
57  } local_params;
58
59  /**
60   * Number of threads
61   */
62  int nbThreads=0;
63  /**
64   * Current step
65   */
66  int cStep=0;
67  /**
68   * Is there any ongoing HTTP request
69   */
70  int isOngoing=0;
71  /**
72   * Threads array
73   */
74  pthread_t* myThreads=NULL;
75  /**
76   * Steps array
77   */
78  bool steps[7][2]={
79    {false,false},
80    {false,false},
81    {false,false},
82    {false,false},
83    {false,false},
84    {false,false},
85    {false,false}
86  };
87  /**
88   * Arguments array to give to the _invokeCallback thread's function
89   */
90  local_params** local_arguments;
91 
92  /**
93   * Check if a service name is prohibited, meaning that the Kernel doesn't have
94   * to invoke the callback for this specific service.
95   *
96   * @param conf the main configuration file maps
97   * @param serviceName the serviceName
98   * @return a bool true if the service is prohibited, false in other case
99   */
100  bool isProhibited(maps* conf,const char* serviceName){
101    map* plist=getMapFromMaps(conf,"callback","prohibited");
102    if(plist!=NULL){
103      char *tmp=plist->value;
104      char *tmpS=strtok(tmp,",");
105      while(tmpS!=NULL){
106        if(strcmp(serviceName,tmpS)==0)
107          return true;
108        tmpS=strtok(NULL,",");
109      }
110    }
111    return false;
112  }
113
114  /**
115   * Practically invoke the callback, meaning sending the HTTP POST request.
116   *
117   * @param args local_params containing all the variables required
118   */
119  void* _invokeCallback(void* args){
120    local_params* arg=(local_params*)args;
121    HINTERNET hInternet,res1;
122    const struct tm *tm;
123    size_t len;
124    time_t now;
125    char *tmp1;
126    map *tmpStatus;
127    maps* tmpConf=createMaps("main");
128    tmpConf->content=createMap("memory","load");
129    hInternet=InternetOpen("ZooWPSClient\0",
130                           INTERNET_OPEN_TYPE_PRECONFIG,
131                           NULL,NULL, 0);
132    if(!CHECK_INET_HANDLE(hInternet)){
133      InternetCloseHandle (&hInternet);
134      return false;
135    }
136    char *URL=(char*)malloc((strlen(arg->url->value)+5)*sizeof(char));
137    sprintf(URL,"%s%d_%d/",arg->url->value,arg->step,arg->state);
138    const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN);
139    hInternet.waitingRequests[0] = zStrdup(URL);
140    free(URL);
141#ifdef CALLBACK_DEBUG
142    now = time ( NULL );
143    tm = localtime ( &now );
144    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
145    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
146    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS cStep %d %d\n",pthread_self(),__FILE__,__LINE__,cStep,isOngoing);
147    fprintf(stderr," * JSON: [%s] \n",jsonStr);
148    fprintf(stderr," * URL: %s/ \n\n",hInternet.waitingRequests[0]);
149    fprintf(stderr," * DATE: %s/ \n\n",tmp1);
150    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__);
151    free(tmp1);
152#endif
153    while( (arg->step!=7 || isOngoing>0) &&
154           ( cStep!=arg->step || (arg->state!=0 && steps[arg->step][0]==false) )
155           ){
156      zSleep(100);
157    }
158    isOngoing=1;
159#ifdef CALLBACK_DEBUG
160    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n\n",pthread_self(),__FILE__,__LINE__);
161    int i=0;
162    for(i=0;i<7;i++){
163      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
164    }
165#endif
166   
167    now = time ( NULL );
168    tm = localtime ( &now );
169   
170    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
171    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
172
173#ifdef CALLBACK_DEBUG   
174    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1);
175    fflush(stderr);
176#endif   
177    free(tmp1);
178    res1 = InternetOpenUrl (&hInternet,
179                            hInternet.waitingRequests[0], 
180                            (char*)jsonStr, strlen(jsonStr),
181                            INTERNET_FLAG_NO_CACHE_WRITE,
182                            0,tmpConf);
183    AddHeaderEntries(&hInternet,arg->conf);
184    //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1);x
185    processDownloads(&hInternet);
186    freeMaps(&tmpConf);
187    free(tmpConf);
188    now = time ( NULL );
189    tm = localtime ( &now );
190    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
191    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
192   
193#ifdef CALLBACK_DEBUG   
194    fprintf(stderr,"************************* From thread %d %s %d: REQUEST END (%s)\n\n",pthread_self(),__FILE__,__LINE__,tmp1);
195#endif
196    free(tmp1);
197    char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1)
198                                 * sizeof (char));
199    if (tmp == NULL)
200      {
201        setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory"));
202        setMapInMaps(arg->conf,"lenv","code","InternalError");
203        return NULL;
204      }
205    size_t bRead;
206    InternetReadFile (hInternet.ihandle[0],
207                      (LPVOID) tmp,
208                      hInternet.
209                      ihandle[0].nDataLen,
210                      &bRead);
211    tmp[hInternet.ihandle[0].nDataLen] = 0;
212    json_object_put(arg->res);
213    InternetCloseHandle(&hInternet);
214    isOngoing=0;
215    if(cStep==0 || cStep==6 || arg->state==1)
216      cStep=arg->step+1;
217#ifdef CALLBACK_DEBUG
218    now = time ( NULL );
219    tm = localtime ( &now );
220    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
221    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
222    fprintf(stderr,"************************* From thread %d %s %d: RESPONSE CONTENT (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1);
223    for(i=0;i<7;i++){
224      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
225    }
226    fprintf(stderr,"Result: \n%s\n\n",tmp);
227    fprintf(stderr,"************************* From thread %d %s %d\n\n",pthread_self(),__FILE__,__LINE__);
228    fflush(stderr);
229    free(tmp1);
230#endif
231    steps[arg->step][arg->state]=true;
232    free(tmp);
233#ifdef CALLBACK_DEBUG
234    fprintf(stderr,"************************* From thread %d %s %d: EXIT\n\n",pthread_self(),__FILE__,__LINE__);
235    fflush(stderr);
236#endif
237    pthread_exit(NULL);
238  }
239 
240  /**
241   * Invoke the callback in case there is a [callback] section containing a url parameter
242   *
243   * @param m the maps containing the main configuration file definitions
244   * @param inputs the inputs defined in the request (can be null if not yet initialized)
245   * @param inputs the outputs provided in the request (can be null if not yet initialized)
246   * @param step the step number, steps are defined as:
247   *  0: Analyze creation
248   *  1: Fetching Data Inputs
249   *  2: Uploading data inputs to cluster
250   *  3: Creating Job Script
251   *  4: Submitting Job to Cluster
252   *  5: Downloading processed output from cluster
253   *  6: Finalize
254   *  7: Dismiss or Error
255   * @param state 0 in case the step starts, 1 when it ends
256   * @return bool true in case of success, false in other cases
257   */
258  bool invokeCallback(maps* conf,maps* inputs,maps* outputs,int step,int state){
259    map* url=getMapFromMaps(conf,"callback","url");
260    if(url==NULL)
261      return false;
262     
263    maps* lenv=getMaps(conf,"lenv");
264    map* sname=getMap(lenv->content,"identifier");
265    if(sname!=NULL && isProhibited(conf,sname->value))
266      return false;
267     
268    json_object *res=json_object_new_object();
269
270    map* sid=getMapFromMaps(conf,"lenv","usid");
271    if(sid!=NULL){
272      json_object *jsStr=json_object_new_string(sid->value);
273      json_object_object_add(res,"jobid",jsStr);
274    }
275    const struct tm *tm;
276    size_t len;
277    time_t now;
278    char *tmp1;
279    map *tmpStatus;
280 
281    now = time ( NULL );
282    tm = localtime ( &now );
283
284    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
285    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%H:%M:%SZ", tm );
286    json_object *jsStr0=json_object_new_string(tmp1);
287    json_object_object_add(res,"datetime",jsStr0);
288    free(tmp1);
289   
290    switch(step){
291    case 0: {
292      // Create a new analyze
293      maps* lenv=getMaps(conf,"lenv");
294      sid=getMapFromMaps(conf,"renv","xrequest");
295      if(sid!=NULL){
296        json_object *jsStr=json_object_new_string(sid->value);
297        json_object_object_add(res,"request_execute_content",jsStr);
298      }
299      sid=getMapFromMaps(conf,"lenv","identifier");
300      if(sid!=NULL){
301        json_object *jsStr=json_object_new_string(sid->value);
302        json_object_object_add(res,"process_identifier",jsStr);
303      }
304      // Save the Execute request on disk
305      map* tmpPath=getMapFromMaps(conf,"main","tmpPath");
306      map* req=getMapFromMaps(conf,"renv","xrequest");
307      sid=getMapFromMaps(conf,"lenv","usid");
308      char* executePath=(char*)malloc((strlen(tmpPath->value)+strlen(sid->value)+14)*sizeof(char));
309      sprintf(executePath,"%s/execute_%s.xml",tmpPath->value,sid->value);
310      FILE* saveExecute=fopen(executePath,"wb");
311      fwrite(req->value,1,strlen(req->value)*sizeof(char),saveExecute);
312      fflush(saveExecute);
313      fclose(saveExecute);
314      setMapInMaps(conf,"lenv","execute_file",executePath);
315      free(executePath);
316      break;
317    }
318     
319    case 1: {
320      // Update the execute request stored on disk at step 0,0 to modify the references used.
321      if(state==1){
322        maps* curs=inputs;
323        xmlInitParser();
324        map* xmlPath=getMapFromMaps(conf,"lenv","execute_file");
325        while(curs!=NULL){
326          map* length=getMap(curs->content,"length");
327          map* useMS=getMap(curs->content,"useMapserver");
328          if(length==NULL){
329            addToMap(curs->content,"length","1");
330            length=getMap(curs->content,"length");
331          }
332          int len=atoi(length->value);
333          for(int ii=0;ii<len;ii++){
334            if(getMapArray(curs->content,"byValue",ii)!=NULL && getMapArray(curs->content,"mimeType",ii)!=NULL && useMS!=NULL && strncasecmp(useMS->value,"true",4)==0){
335              map* tmpMap=getMapArray(curs->content,"value",ii);
336              char tmpStr[100];
337              sprintf(tmpStr,"%d",strlen(tmpMap->value));
338              setMapArray(curs->content,"size",ii,tmpStr);
339              tmpMap=getMapArray(curs->content,"mimeType",ii);
340              setMapArray(curs->content,"fmimeType",ii,tmpMap->value);
341              tmpMap=getMapArray(curs->content,"cache_file",ii);
342              setMapArray(curs->content,"generated_file",ii,tmpMap->value);
343              setMapArray(curs->content,"storage",ii,tmpMap->value);
344              setReferenceUrl(conf,curs);
345              addIntToMap(curs->content,"published_id",ii+1);
346              const char *params[7];
347              int xmlLoadExtDtdDefaultValue;
348              int hasFile=-1;
349              map* xslPath=getMapFromMaps(conf,"callback","template");
350              map* filePath=getMapArray(curs->content,"ref_wfs_link",ii);
351              if(filePath==NULL)
352                filePath=getMap(curs->content,"ref_wcs_link");
353              char* inputName=curs->name;
354              if(xslPath==NULL || xmlPath==NULL || filePath==NULL)
355                break;
356              char *tmpParam=(char*)malloc((strlen(curs->name)+11)*sizeof(char));
357              char *tmpParam1=(char*)malloc((strlen(filePath->value)+11)*sizeof(char));
358              char tmpParam2[16];
359              sprintf(tmpParam2,"string(\"%d\")",ii);
360              setMapArray(curs->content,"href",ii,filePath->value);
361              setMapArray(curs->content,"xlink:href",ii,filePath->value);
362              tmpMap=getMapArray(curs->content,"cache_url",ii);
363              if(tmpMap!=NULL)
364                setMapArray(curs->content,"xlink:href",ii,tmpMap->value);
365              else
366                setMapArray(curs->content,"xlink:href",ii,filePath->value);
367              sprintf(tmpParam,"string(\"%s\")",curs->name);
368              sprintf(tmpParam1,"string(\"%s\")",filePath->value);
369              sprintf(tmpParam2,"string(\"%d\")",ii);
370              params[0]="attr";
371              params[1]=tmpParam;
372              params[2]="value";
373              params[3]=tmpParam1;//filePath->value;
374              params[4]="cnt";
375              params[5]=tmpParam2;
376              params[6]=NULL;
377              fprintf(stderr, "## XSLT PARAMETERS ATTR: %s VALUE: %s INDEX: %s\n",
378                      tmpParam,tmpParam1,tmpParam2);
379              fflush(stderr);
380              xmlSubstituteEntitiesDefault(1);
381              xmlLoadExtDtdDefaultValue = 0;
382              xsltStylesheetPtr cur = NULL;
383              xmlDocPtr doc, res;
384              cur = xsltParseStylesheetFile(BAD_CAST xslPath->value);
385              doc = xmlParseFile(xmlPath->value);
386              fflush(stderr);
387              res = xsltApplyStylesheet(cur, doc, params);
388              xmlChar *xmlbuff;
389              int buffersize;
390              xmlDocDumpFormatMemory(res, &xmlbuff, &buffersize, 1);
391              // Store the executeRequest in file again
392              free(tmpParam);
393              free(tmpParam1);
394              fprintf(stderr," # Request / XSLT: %s\n",xmlbuff);
395              fflush(stderr);
396              FILE* saveExecute=fopen(xmlPath->value,"wb");
397              if(saveExecute!=NULL){
398                fwrite(xmlbuff,1,buffersize,saveExecute);
399                fflush(saveExecute);
400                fclose(saveExecute);
401              }
402              xmlFree(xmlbuff);
403              xmlFreeDoc(doc);
404              xsltFreeStylesheet(cur);
405            }
406          }
407          addIntToMap(curs->content,"published_id",0);
408          curs=curs->next;
409        }
410        xmlCleanupParser();
411        FILE* f0=fopen(xmlPath->value,"rb");
412        if(f0!=NULL){
413          long flen;
414          char *fcontent;
415          fseek (f0, 0, SEEK_END);
416          flen = ftell (f0);
417          fseek (f0, 0, SEEK_SET);
418          fcontent = (char *) malloc ((flen + 1) * sizeof (char));
419          fread(fcontent,flen,1,f0);
420          fcontent[flen]=0;
421          fclose(f0);
422          map *schema=getMapFromMaps(conf,"database","schema");
423          map* sid=getMapFromMaps(conf,"lenv","usid");
424          char *req=(char*)malloc((flen+strlen(schema->value)+strlen(sid->value)+66)*sizeof(char));
425          sprintf(req,"UPDATE %s.services set request_execute_content=$$%s$$ WHERE uuid=$$%s$$",schema->value,fcontent,sid->value);
426#ifdef RELY_ON_DB
427          execSql(conf,1,req);
428#endif
429          free(fcontent);
430          free(req);
431        }
432      }
433
434      // Fetching data inputs
435      maps* curs=inputs;
436      dumpMaps(curs);
437      char *keys[11][2]={
438        {
439          "xlink:href",
440          "ref_download_link"
441        },
442        {
443          "cache_file",
444          "cachefile"
445        },
446        {
447          "fmimeType",
448          "mimetype"
449        },
450        {
451          "size",
452          "size"
453        },
454        {
455          "ref_wms_link",
456          "ref_wms_link"
457        },
458        {
459          "ref_wfs_link",
460          "ref_wfs_link"
461        },
462        {
463          "ref_wcs_link",
464          "ref_wcs_link"
465        },
466        {
467          "ref_wcs_link",
468          "ref_wcs_link"
469        },
470        {
471          "ref_wcs_preview_link",
472          "ref_wcs_preview_link"
473        },
474        {
475          "geodatatype",
476          "datatype"
477        },
478        {
479          "wgs84_extent",
480          "boundingbox"
481        }       
482      };
483      json_object *res1=json_object_new_object();
484      while(curs!=NULL){
485        if(getMap(curs->content,"length")==NULL){
486          addToMap(curs->content,"length","1");
487        }
488        map* length=getMap(curs->content,"length");
489        int len=atoi(length->value);
490        json_object *res3;
491        int hasRef=-1;
492        for(int ii=0;ii<len;ii++){
493          map* tmpMap=getMapArray(curs->content,"cache_file",ii);
494          sid=getMapArray(curs->content,"ref_wms_link",ii);
495          json_object *res2=json_object_new_object();
496          if(tmpMap!=NULL){
497            if(sid==NULL){
498              setMapArray(curs->content,"generated_file",ii,tmpMap->value);
499              setMapArray(curs->content,"storage",ii,tmpMap->value);
500            }
501            struct stat buf;
502            char timeStr[ 100 ] = "";
503            if (stat(tmpMap->value, &buf)==0){
504              strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
505              json_object *jsStr=json_object_new_string(timeStr);
506              json_object_object_add(res2,"creation_date",jsStr);
507            }
508            tmpMap=getMapArray(curs->content,"fmimeType",ii);
509            if(tmpMap!=NULL){
510              setMapArray(curs->content,"mimeType",ii,tmpMap->value);
511            }
512            setReferenceUrl(conf,curs);
513          }else{         
514          }
515          addIntToMap(curs->content,"published_id",ii+1);
516          int i=0;
517          for(;i<11;i++){
518            sid=getMapArray(curs->content,keys[i][0],ii);
519            if(sid!=NULL){
520              json_object *jsStr=json_object_new_string(sid->value);
521              json_object_object_add(res2,keys[i][1],jsStr);
522              if(i==0){
523                hasRef=1;
524                json_object *jsStr1=json_object_new_string(getProvenance(conf,sid->value));
525                json_object_object_add(res2,"dataOrigin",jsStr1);
526              }
527            }
528          }
529          if(len>1){
530            if(ii==0)
531              res3=json_object_new_array();
532            json_object_array_add(res3,res2);
533          }else
534            res3=res2;
535        }
536        if(hasRef<0)
537          json_object_put(res3);
538        else{
539          json_object_object_add(res1,curs->name,json_object_get(res3));
540          json_object_put(res3);
541        }
542        addIntToMap(curs->content,"published_id",0);
543        curs=curs->next;
544      }
545      json_object_object_add(res,"inputs",res1);
546      break;
547    }
548     
549    case 2: {
550      // Uploading data input to cluster
551      maps* in=getMaps(conf,"uploadQueue");
552      if(in!=NULL){
553        maps* curs=in;
554        map* length=getMapFromMaps(in,"uploadQueue","length");
555        if(length!=NULL){
556          json_object *res1=json_object_new_object();
557          int limit=atoi(length->value);
558          int i=0;
559          maps* uploadQueue=getMaps(in,"uploadQueue");
560          map* tmp=uploadQueue->content;
561          for(;i<limit;i++){
562            map* tmp0=getMapArray(tmp,"input",i);
563            map* tmp1=getMapArray(tmp,"localPath",i);
564            map* tmp2=getMapArray(tmp,"targetPath",i);
565            if(tmp0!=NULL && tmp1!=NULL && tmp2!=NULL){
566              json_object *res2=json_object_new_object();
567              json_object *jsStr=json_object_new_string(tmp1->value);
568              json_object_object_add(res2,"local_path",jsStr);
569              jsStr=json_object_new_string(tmp2->value);
570              json_object_object_add(res2,"target_path",jsStr);
571              json_object *res4=json_object_object_get(res1,tmp0->value);
572              if(json_object_is_type(res4,json_type_null)){
573                json_object_object_add(res1,tmp0->value,res2);
574              }else{
575                if(json_object_is_type(res4,json_type_object) && !json_object_is_type(res4, json_type_array)){
576                  json_object *res3=json_object_new_array();
577                  json_object_array_add(res3,json_object_get(res4));
578                  json_object_array_add(res3,res2);
579                  json_object_object_del(res1,tmp0->value);
580                  json_object_object_add(res1,tmp0->value,res3);
581                }else
582                  json_object_array_add(res4,res2);
583              }
584            }
585          }
586          json_object_object_add(res,"inputs",res1);
587        }
588      }
589      break;
590    }
591     
592    case 3: {
593      // Generating job script
594      sid=getMapFromMaps(conf,"lenv","local_script");
595      if(sid!=NULL){
596        json_object *jsStr=json_object_new_string(sid->value);
597        json_object_object_add(res,"script",jsStr);
598      }
599      break;
600    }
601     
602    case 4: {
603      // Submitting job to cluster
604      sid=getMapFromMaps(conf,"lenv","remote_script");
605      if(sid!=NULL){
606        json_object *jsStr=json_object_new_string(sid->value);
607        json_object_object_add(res,"script",jsStr);
608      }
609      break;
610    }
611     
612    case 5: {
613      // Downloading process outputs from cluster
614      maps* curs=outputs;
615      dumpMaps(curs);
616      char *keys[10][2]={
617        {
618          "Reference",
619          "ref"
620        },
621        {
622          "generated_file",
623          "cachefile"
624        },
625        {
626          "mimeType",
627          "mimetype"
628        },
629        {
630          "size",
631          "size"
632        },
633        {
634          "geodatatype",
635          "datatype"
636        },
637        {
638          "wgs84_extent",
639          "boundingbox"
640        },
641        {
642          "ref_wms_link",
643          "ref_wms_link"
644        },
645        {
646          "ref_wcs_link",
647          "ref_wcs_link"
648        },
649        {
650          "ref_wcs_preview_link",
651          "ref_wcs_preview_link"
652        },
653        {
654          "ref_wfs_link",
655          "ref_wfs_link"
656        }       
657      };
658      char* specifics[5][2]={
659        {
660          "download_link",
661          "ref_download_link"
662        },
663        {
664          "wms_link",
665          "ref_wms_link"
666        },
667        {
668          "wfs_link",
669          "ref_wfs_link"
670        },
671        {
672          "wcs_link",
673          "ref_wcs_link"
674        },
675        {
676          "wcs_link",
677          "ref_wcs_preview_link"
678        }
679      };
680      json_object *res1=json_object_new_object();
681      while(curs!=NULL){       
682        json_object *res2=json_object_new_object();
683        int i=0;
684        int hasRef=-1;
685        for(;i<10;i++){
686          sid=getMap(curs->content,keys[i][0]);
687          if(sid!=NULL){
688            json_object *jsStr=json_object_new_string(sid->value);
689            json_object_object_add(res2,keys[i][1],jsStr);
690            if(i==0){
691              hasRef=1;
692              json_object_object_add(res2,"ref_download_link",jsStr);
693            }
694            if(i==1){
695              struct stat buf;
696              char timeStr[ 100 ] = "";
697              if (stat(sid->value, &buf)==0){
698                strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
699                json_object *jsStr=json_object_new_string(timeStr);
700                json_object_object_add(res2,"creation_date",jsStr);
701              }
702            }
703          }
704        }
705        if(hasRef>0)
706          json_object_object_add(res1,curs->name,res2);
707        else{
708          maps* curs0=curs->child;
709          int i=0;
710          int bypass=-1;
711          for(i=0;i<5;i++){
712            maps* specificMaps;
713            if((specificMaps=getMaps(curs0,specifics[i][0]))!=NULL){
714              int hasRef0=-1;
715              int i0=0;
716              for(;i0<6;i0++){
717                sid=getMap(specificMaps->content,keys[i0][0]);
718                if(sid!=NULL){
719                  json_object *jsStr=json_object_new_string(sid->value);
720                  if(i0==0){
721                    json_object_object_add(res2,specifics[i][1],jsStr);
722                  }
723                  else
724                    json_object_object_add(res2,keys[i0][1],jsStr);
725                  hasRef0=1;
726                  bypass=1;
727                  if(i==1){
728                    struct stat buf;
729                    char timeStr[ 100 ] = "";
730                    if (stat(sid->value, &buf)==0){
731                      strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
732                      json_object *jsStr=json_object_new_string(timeStr);
733                      json_object_object_add(res2,"creation_date",jsStr);
734                    }
735                  }
736                }
737              }       
738            }
739          }
740          if(bypass<0)
741            while(curs0!=NULL){
742              json_object *res3=json_object_new_object();
743              int i0=0;
744              int hasRef0=-1;
745              for(;i0<10;i0++){
746                sid=getMap(curs0->content,keys[i0][0]);
747                if(sid!=NULL){
748                  json_object *jsStr=json_object_new_string(sid->value);
749                  json_object_object_add(res3,keys[i0][1],jsStr);
750                  hasRef0=1;
751                }
752              }
753              if(hasRef0<0)
754                json_object_put(res3);
755              else
756                json_object_object_add(res2,curs0->name,res3);
757              curs0=curs0->next;
758            }
759          json_object_object_add(res1,curs->name,res2);
760        }
761        curs=curs->next;
762      }
763      json_object_object_add(res,"outputs",res1);
764      break;
765    }
766     
767    case 6: {
768      // Finalize HPC
769      char *keys[6][2]={
770        {
771          //"SubmitTime",
772          "Submit",
773          "hpc_submission_date"
774        },
775        {
776          "JobId",
777          "hpc_job_identifier"
778        },
779        {
780          "JobName",
781          "hpc_job_name"
782        },
783        {
784          //"StartTime",
785          "Start",
786          "hpc_start_date"
787        },
788        {
789          //"EndTime",
790          "End",
791          "hpc_end_date"
792        },
793        {
794          //"JobState",
795          "State",
796          "hpc_status"
797        }       
798      };
799      int i=0;
800      if(getMaps(conf,"henv")!=NULL){
801        for(i=0;i<6;i++){
802          sid=getMapFromMaps(conf,"henv",keys[i][0]);
803          if(sid!=NULL){
804            json_object *jsStr=json_object_new_string(sid->value);
805            json_object_object_add(res,keys[i][1],jsStr);
806          }
807        }
808      }
809      if((sid=getMapFromMaps(conf,"henv","billing_nb_cpu"))!=NULL){
810        json_object *jsStr=json_object_new_string(sid->value);
811        json_object_object_add(res,"hpc_cpu_usage",jsStr);
812      }else{
813        json_object *jsStr=json_object_new_string("1");
814        json_object_object_add(res,"hpc_cpu_usage",jsStr);
815      }
816      json_object *jsStr=json_object_new_string("succeeded");
817      json_object_object_add(res,"wps_status",jsStr);
818      break;
819    }
820     
821    case 7: {
822      // Error or Dismiss
823      sid=getMapFromMaps(conf,"lenv","message");
824      if(sid!=NULL){
825        json_object *jsStr=json_object_new_string(sid->value);
826        json_object_object_add(res,"message",jsStr);
827      }
828      json_object *jsStr;
829      if(state==1)
830        jsStr=json_object_new_string("dismissed");
831      else
832        jsStr=json_object_new_string("failed");
833      json_object_object_add(res,"wps_status",jsStr);
834      break;
835    }
836    others: {
837        break;
838      }
839    }
840
841    if(local_arguments==NULL)
842      local_arguments=(local_params**)malloc(sizeof(local_params*));
843    else
844      local_arguments=(local_params**)realloc(local_arguments,(nbThreads+1)*sizeof(local_params*));
845    local_arguments[nbThreads]=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int))); 
846    local_arguments[nbThreads]->conf=conf;
847    local_arguments[nbThreads]->url=url;
848    local_arguments[nbThreads]->res=res;
849    local_arguments[nbThreads]->step=step;
850    local_arguments[nbThreads]->state=state;
851    if(myThreads==NULL)
852      myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t));
853    else
854      myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t));
855    if(pthread_create(&myThreads[nbThreads], NULL, _invokeCallback, (void*)local_arguments[nbThreads])==-1){
856      setMapInMaps(conf,"lenv","message",_("Unable to create a new thread"));
857      return false;
858    }
859    nbThreads++;
860    return true;
861  }
862
863  /**
864   * Wait for the threads to end then, clean used memory.
865   */
866  void cleanupCallbackThreads(){
867    int i=0;
868    for(i=0;i<nbThreads;i++){
869      pthread_join(myThreads[i],NULL);
870      free(local_arguments[i]);
871    }
872    free(local_arguments);
873    free(myThreads);
874  }
875
876#ifdef __cplusplus
877}
878#endif
Note: See TracBrowser for help on using the repository browser.

Search

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png