source: branches/prototype-v0/zoo-project/zoo-kernel/service_internal_hpc.c @ 854

Last change on this file since 854 was 854, checked in by djay, 6 years ago

HPC support update. Add inputs for create options in Gdal_Dem.

  • Property svn:keywords set to Id
File size: 35.4 KB
Line 
1/*
2 * Author : Gérald FENOY
3 *
4 * Copyright (c) 2017 GeoLabs SARL
5 *
6 * This work was supported by public funds received in the framework of GEOSUD,
7 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed
8 * by the French National Research Agency
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 *
28 */
29
30#include "service_internal_hpc.h"
31#include "response_print.h"
32#include "server_internal.h"
33#include "service_callback.h"
34#include "mimetypes.h"
35#include <sys/un.h>
36
37typedef struct {
38  maps* conf;
39  char* local_file;
40  char* target_file;
41} local_params;
42
43/**
44 * Add nested outputs to every outputs that is geographic format
45 * @see isGeographic
46 * @param s the service current definition
47 */ 
48void addNestedOutputs(service** s){
49  if((*s)==NULL){
50    return;
51  }   
52  if(*s==NULL || (*s)->outputs==NULL){
53    return;
54  }
55  elements *out=(*s)->outputs;
56  elements* cur=out;
57  map* serviceType=getMap((*s)->content,"ServiceType");
58  if(strncmp(serviceType->value,"HPC",3)!=0)
59    return;
60  while(cur!=NULL && cur->defaults!=NULL){
61    map* mimeType=getMap(cur->defaults->content,"mimeType");
62    if(mimeType!=NULL){
63      int geo=isGeographic(mimeType->value);
64      if(geo>0){
65        elements *tmp[3]={
66          dupElements(cur),
67          dupElements(cur),
68          dupElements(cur)
69        };
70        char *geoLink="wcs_link";
71        if(geo==2){
72          geoLink="wfs_link";
73        }
74        int i=0;
75        for(;i<3;i++){
76          if(tmp[i]->next!=NULL){
77            freeElements(&tmp[i]->next);
78            free(tmp[i]->next);
79            tmp[i]->next=NULL;
80          }
81          free(tmp[i]->name);
82          if(tmp[i]->format!=NULL)
83            free(tmp[i]->format);
84          tmp[i]->format=zStrdup("ComplexData");
85          freeMap(&tmp[i]->content);
86          free(tmp[i]->content);
87          tmp[i]->content=NULL;
88          switch(i){
89          case 0:
90            tmp[i]->name=zStrdup("download_link");
91            tmp[i]->content=createMap("Title",_("Download link"));
92            addToMap(tmp[i]->content,"Abstract",_("The download link"));
93            addToMap(tmp[i]->defaults->content,"useMapserver","false");
94            if(tmp[i]->supported!=NULL){
95              freeIOType(&tmp[i]->supported);
96              free(tmp[i]->supported);
97              tmp[i]->supported=NULL;
98            }
99            break;
100          case 1:
101            tmp[i]->name=zStrdup("wms_link");
102            tmp[i]->content=createMap("Title",_("WMS link"));
103            addToMap(tmp[i]->content,"Abstract",_("The WMS link"));
104            if(tmp[i]->supported!=NULL && tmp[i]->supported->next!=NULL){
105              freeIOType(&tmp[i]->supported->next);
106              free(tmp[i]->supported->next);
107              tmp[i]->supported->next=NULL;
108            }else{
109              if(tmp[i]->supported!=NULL)
110                addToMap(tmp[i]->supported->content,"useMapserver","true");
111              addToMap(tmp[i]->defaults->content,"useMapserver","true");
112            }
113            break;
114          case 2:
115            if(geo==2){
116              tmp[i]->name=zStrdup("wfs_link");
117              tmp[i]->content=createMap("Title",_("WFS link"));
118              addToMap(tmp[i]->content,"Abstract",_("The WFS link"));
119            }else{
120              tmp[i]->name=zStrdup("wcs_link");
121              tmp[i]->content=createMap("Title",_("WCS link"));
122              addToMap(tmp[i]->content,"Abstract",_("The WCS link"));
123            }
124            if(tmp[i]->supported!=NULL && tmp[i]->supported->next!=NULL &&
125               tmp[i]->supported->next->content!=NULL){
126              freeIOType(&tmp[i]->supported);
127              free(tmp[i]->supported);
128              tmp[i]->supported=NULL;
129              tmp[i]->supported=createIoType();
130              iotype* cnext=cur->supported->next;
131              tmp[i]->supported->content=createMap(cnext->content->name,cnext->content->value);
132              addMapToMap(&tmp[i]->supported->content,cnext->content->next);
133              addToMap(tmp[i]->supported->content,"useMapserver","true");
134            }else
135              addToMap(tmp[i]->defaults->content,"useMapserver","true");
136            break;
137          }
138        }
139        addToElements(&cur->child,tmp[0]);
140        addToElements(&cur->child,tmp[1]);
141        addToElements(&cur->child,tmp[2]);
142        free(cur->format);
143        cur->format=NULL;
144        if(cur->defaults!=NULL){
145          freeIOType(&cur->defaults);
146          free(cur->defaults);
147          cur->defaults=NULL;
148        }
149        if(cur->supported!=NULL){
150          freeIOType(&cur->supported);
151          free(cur->supported);
152          cur->supported=NULL;
153        }
154        freeElements(&tmp[2]);
155        free(tmp[2]);
156        freeElements(&tmp[1]);
157        free(tmp[1]);
158        freeElements(&tmp[0]);
159        free(tmp[0]);
160        //addToMap(cur->content,"internal","true");
161      }
162    }
163    cur=cur->next;
164  }
165  //dumpElements((*s)->outputs);
166}
167
168/**
169 * Acquire a read lock on every files used as input for executing a service.
170 * @param conf the main configuration file map
171 * @return 0 if every file can be locked, -1 if one lock has failed.
172 */
173int addReadLocks(maps** conf){
174  map* queueLengthMap=getMapFromMaps(*conf,"uploadQueue","length");
175  maps* queueMaps=getMaps(*conf,"uploadQueue");
176  if(queueLengthMap!=NULL){
177    int cnt=atoi(queueLengthMap->value);
178    int i=0;
179    for(i=0;i<cnt;i++){
180      map* argv[2]={
181        getMapArray(queueMaps->content,"input",i),
182        getMapArray(queueMaps->content,"localPath",i)
183      };
184      zooLock* lck;
185      if((lck=lockFile(*conf,argv[1]->value,'r'))==NULL){
186        char* templateStr=_("Unable to lock the file for %s in read mode.");
187        char *tmpMessage=(char*)malloc((strlen(templateStr)+strlen(argv[0]->value)+1)*sizeof(char));
188        sprintf(tmpMessage,templateStr,argv[0]->value);
189        setMapInMaps(*conf,"lenv","message",tmpMessage);
190        free(tmpMessage);
191        return -1;
192      }else{
193        if(zoo_file_locks_cnt==0){
194          zoo_file_locks=(zooLock**)malloc(sizeof(zooLock*));
195        }
196        else{
197          zoo_file_locks=(zooLock**)realloc(zoo_file_locks,(zoo_file_locks_cnt+1)*sizeof(zooLock*));
198        }
199        zoo_file_locks[zoo_file_locks_cnt]=lck;
200        zoo_file_locks_cnt++;
201      }
202    }
203  }
204  return 0;
205}
206
207/**
208 * Remove all read locks set for files used as input for executing the service.
209 * @param conf the main configuration maps pointer
210 * @return 0 in case of success, -1 if any error occured. In case of error, one
211 * can refer to the message map array from the lenv section.
212 */
213int removeReadLocks(maps** conf){
214  int res=0;
215  int nberr=0;
216  map* queueLengthMap=getMapFromMaps(*conf,"uploadQueue","length");
217  maps* queueMaps=getMaps(*conf,"uploadQueue");
218  if(queueLengthMap!=NULL){
219    int cnt=atoi(queueLengthMap->value);
220    int i=0;
221    for(i=0;i<cnt;i++){
222      if(unlockFile(*conf,zoo_file_locks[i])<1){
223        map* argv=getMapArray(queueMaps->content,"input",i);
224        char* templateStr=_("Unable to unlock the file for %s after execution.");
225        char *tmpMessage=(char*)malloc((strlen(templateStr)+strlen(argv->value)+1)*sizeof(char));
226        sprintf(tmpMessage,templateStr,argv->value);
227        maps* lenv=getMaps(*conf,"lenv");
228        setMapArray(lenv->content,"message",nberr,tmpMessage);
229        free(tmpMessage);
230        res=-1;
231        nberr++;
232      }
233    }
234  }
235  free(zoo_file_locks);
236  return res;
237}
238
239/**
240 * Get the section name depending on number of features and/or pixels of each
241 * inputs and the threshold defined in a section.
242 * It supposes that your inputs has been published using MapServer support,
243 * implying that the number of features (nb_features), respectively pixels
244 * (nb_pixels), are defined. The section, identified by confId, should contain
245 * preview_max_features and preview_max_pixels defining the threshold values.
246 * @param conf the main configuration file maps pointer
247 * @param inputs the inputs maps pointer
248 * @param confId the section identifier
249 * @return "preview_conf" in case the numbers are lower than the threshold,
250 * "fullres_conf" in other cases.
251 */
252char* getConfiguration(maps** conf,maps** inputs,const char* confId){
253  maps* input=*inputs;
254  map* max_pixels=getMapFromMaps(*conf,confId,"preview_max_pixels");
255  map* max_features=getMapFromMaps(*conf,confId,"preview_max_features");
256  int i_max_pixels=atoi(max_pixels->value);
257  int i_max_features=atoi(max_features->value);
258  while(input!=NULL && input->content!=NULL){
259    map* tmpMap=getMap(input->content,"geodatatype");
260    if(tmpMap!=NULL){
261      map* currentNb;
262      if(strcasecmp(tmpMap->value,"raster")==0 ){
263        currentNb=getMap(input->content,"nb_pixels");
264        if(atoi(currentNb->value)>i_max_pixels)
265          return "fullres_conf";
266      }else{
267        if(strcasecmp(tmpMap->value,"vector")==0 ){
268          currentNb=getMap(input->content,"nb_features");
269          if(atoi(currentNb->value)>i_max_features)
270            return "fullres_conf";
271        }
272      }
273    }
274    input=input->next;
275  }
276  return "preview_conf";
277}
278
279/**
280 * Load and run a HPC Application corresponding to the service.
281 *
282 * @param main_conf the conf maps containing the main.cfg settings
283 * @param request the map containing the HTTP request
284 * @param s the service structure
285 * @param real_inputs the maps containing the inputs
286 * @param real_outputs the maps containing the outputs
287 */
288int zoo_hpc_support(maps** main_conf,map* request,service* s,maps **real_inputs,maps **real_outputs){
289  maps* m=*main_conf;
290  maps* inputs=*real_inputs;
291  maps* outputs=*real_outputs;
292  map* tmp0=getMapFromMaps(*main_conf,"lenv","cwd");
293  char *ntmp=tmp0->value;
294  map* tmp=NULL;
295  int res=-1;
296  // Get the configuration id depending on service type and defined thresholds
297  // then, set the configId key in the lenv section
298  char *serviceType;
299  map* mServiceType=getMap(s->content,"serviceType");
300  if(mServiceType!=NULL)
301    serviceType=mServiceType->value;
302  else
303    serviceType="HPC";
304  map* tmpPath=getMapFromMaps(*main_conf,"main","tmpPath");
305  map* uuid=getMapFromMaps(*main_conf,"lenv","usid");
306  map* confMap=getMapFromMaps(*main_conf,serviceType,getConfiguration(main_conf,real_inputs,serviceType));
307  char * configurationId=confMap->value;
308  setMapInMaps(*main_conf,"lenv","configId",configurationId);
309  // Dump lenv maps again after having set the configId ...
310  char *flenv =
311    (char *)
312    malloc ((strlen (tmpPath->value) + 
313             strlen (uuid->value) + 12) * sizeof (char));
314  sprintf (flenv, "%s/%s_lenv.cfg", tmpPath->value, uuid->value);
315  maps* lenvMaps=getMaps(m,"lenv");
316  dumpMapsToFile(lenvMaps,flenv,0);
317  free(flenv);
318
319  map* targetPathMap=getMapFromMaps(*main_conf,configurationId,"remote_data_path");
320 
321  pthread_t threads_pool[50];
322  // Force the HPC services to be called asynchronously
323  map* isAsync=getMapFromMaps(*main_conf,"lenv","async");
324  if(isAsync==NULL){
325    errorException(*main_conf,_("The synchronous mode is not supported by this type of service"),"NoSuchMode",s->name);
326    return -1;
327  }
328
329  maps* input=*real_inputs;
330  char **parameters=NULL;
331  int parameters_cnt=0;
332  while(input!=NULL && input->content!=NULL){
333    if(getMaps(*real_outputs,input->name)==NULL){
334      parameters_cnt+=1;
335      if(parameters_cnt==1)
336        parameters=(char**)malloc(parameters_cnt*sizeof(char*));
337      else
338        parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*));
339      if(getMap(input->content,"mimeType")!=NULL){
340        // Input is ComplexData
341        if(getMap(input->content,"cache_file")==NULL){
342          // Input data has been passed by value
343          // TODO: publish input through MapServer / use output publication
344          dumpMapsValuesToFiles(main_conf,&input);
345          addToMap(input->content,"toPublish","true");
346          addToMap(input->content,"useMapserver","true");
347        }
348        if(getMap(input->content,"cache_file")!=NULL){
349          map* length=getMap(input->content,"length");
350          if(length==NULL){
351            addToMap(input->content,"length","1");
352            length=getMap(input->content,"length");
353          }
354          int len=atoi(length->value);
355          int i=0;
356          for(i=0;i<len;i++){
357            map* tmp=getMapArray(input->content,"cache_file",i);
358            char* targetName=strrchr(tmp->value,'/');
359            char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
360            sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
361            setMapArray(input->content,"targetPath",i,targetPath);
362            setMapArray(input->content,"localPath",i,tmp->value);
363            addToUploadQueue(main_conf,input);
364            if(i==0){
365              parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char));
366              sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath);
367            }else{
368              char *tmpStr=zStrdup(parameters[parameters_cnt-1]);
369              parameters[parameters_cnt-1]=(char*)realloc(parameters[parameters_cnt-1],(strlen(tmpStr)+strlen(targetPath)+2)*sizeof(char));
370              sprintf(parameters[parameters_cnt-1],"%s %s",tmpStr,targetPath);
371              free(tmpStr);
372            }
373            free(targetPath);
374          }
375        }else{
376          // ???
377          fprintf(stderr,"%s %d\n",__FILE__,__LINE__);
378          fflush(stderr);
379        }
380      }else{
381        // LitteralData and BboxData
382        if(getMap(input->content,"dataType")!=NULL){
383          // For LitteralData, simply pass the value
384          map* val=getMap(input->content,"value");
385          parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(val->value)+3)*sizeof(char));
386          sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,val->value);
387        }
388      }
389    }
390    input=input->next;
391  }
392
393  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
394  invokeCallback(m,inputs,NULL,2,0);
395  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
396  dumpMaps(inputs);
397  if(getMapFromMaps(m,"lenv","mapError")!=NULL){
398    invokeCallback(m,inputs,NULL,7,0);
399    return -1;
400  }
401
402  // Upload data on HPC
403  if(runUpload(main_conf)==false){
404    errorException (m, _("Unable to lock the file for upload!"),
405                    "InternalError", NULL);
406    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
407    invokeCallback(m,inputs,NULL,7,0);
408    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
409    return -1;
410  }
411  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
412  invokeCallback(m,inputs,NULL,2,1);
413  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
414
415  // Add the filename to generate for every output to parameters
416  input=*real_outputs;
417  // TODO: fix appendOutputParameters
418  //appendOutputParameters(input,parameters,&parameters_cnt,s,uuid,targetPathMap);
419  dumpMaps(input);
420  while(input!=NULL){
421    // TODO: parse all outputs including inner outputs if required.
422    if(input->child==NULL){
423      // Name every files that should be produced by the service execution
424      map* mime=getMap(input->content,"mimeType");
425      char* targetName;
426      if(mime!=NULL){
427        bool hasExt=false;
428        map* fileExt=getFileExtensionMap(mime->value,&hasExt);
429        targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+strlen(fileExt->value)+11)*sizeof(char));
430        sprintf(targetName,"output_%s_%s_%s.%s",s->name,input->name,uuid->value,fileExt->value);
431        freeMap(&fileExt);
432        free(fileExt);
433      }else{
434        targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+14)*sizeof(char));
435        sprintf(targetName,"output_%s_%s_%s.tif",s->name,input->name,uuid->value);
436      }
437      char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
438      sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
439      free(targetName);
440      setMapInMaps(*real_outputs,input->name,"generated_file",targetPath);
441      if(strcasecmp(input->name,"wms_link")!=0&&
442         strcasecmp(input->name,"wcs_link")!=0 &&
443         strcasecmp(input->name,"wfs_link")!=0){
444        parameters_cnt+=1;
445        if(parameters_cnt==1)
446          parameters=(char**)malloc(parameters_cnt*sizeof(char*));
447        else
448          parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*));
449        // We should verify if any optional tag for output is required
450        // (i.e. -out output.tiff *int8*), meaning that we should search
451        // for a corresponding inputs name.
452        map* inValue=getMapFromMaps(*real_inputs,input->name,"value");
453        if(inValue!=NULL){
454          parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+strlen(inValue->value)+4)*sizeof(char));
455          sprintf(parameters[parameters_cnt-1],"-%s %s %s",input->name,targetPath,inValue->value);
456        }else{
457          parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char));
458          sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath);
459        }
460      }
461      free(targetPath);
462    }// In other case it means we need to return the cache_file as generated_file
463    else{
464      // Name every files that should be produced by the service execution
465      map* mime=getMap(input->child->content,"mimeType");
466      char* targetName;
467      if(mime!=NULL){
468        bool hasExt=false;
469        map* fileExt=getFileExtensionMap(mime->value,&hasExt);
470        targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+strlen(fileExt->value)+11)*sizeof(char));
471        sprintf(targetName,"output_%s_%s_%s.%s",s->name,input->name,uuid->value,fileExt->value);
472        freeMap(&fileExt);
473        free(fileExt);
474      }else{
475        targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+14)*sizeof(char));
476        sprintf(targetName,"output_%s_%s_%s.tif",s->name,input->name,uuid->value);
477      }
478      char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
479      sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
480      free(targetName);
481      addToMap(input->content,"generated_file",targetPath);
482      addToMap(input->content,"storage",targetPath);
483      if(strcasecmp(input->name,"wms_link")!=0&&
484         strcasecmp(input->name,"wcs_link")!=0 &&
485         strcasecmp(input->name,"wfs_link")!=0){
486        parameters_cnt+=1;
487        if(parameters_cnt==1)
488          parameters=(char**)malloc(parameters_cnt*sizeof(char*));
489        else
490          parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*));
491        // We should verify if any optional tag for output is required
492        // (i.e. -out output.tiff *int8*), meaning that we should search
493        // for a corresponding inputs name.
494        map* inValue=getMapFromMaps(*real_inputs,input->name,"value");
495        if(inValue!=NULL){
496          parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+strlen(inValue->value)+4)*sizeof(char));
497          sprintf(parameters[parameters_cnt-1],"-%s %s %s",input->name,targetPath,inValue->value);
498        }else{
499          parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char));
500          sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath);
501        }
502      }
503      free(targetPath);
504    }
505    input=input->next;
506  }
507  // Produce the SBATCH File locally
508  char *scriptPath=(char*)malloc((strlen(s->name)+strlen(tmpPath->value)+strlen(uuid->value)+10)*sizeof(char));
509  sprintf(scriptPath,"%s/zoo_%s_%s.sh",tmpPath->value,s->name,uuid->value);
510  setMapInMaps(*main_conf,"lenv","local_script",scriptPath);
511  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
512  fflush(stderr);
513  invokeCallback(m,inputs,NULL,3,0);
514  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
515  fflush(stderr);
516  FILE* scriptFile=fopen(scriptPath,"w+");
517  map* headerMap=getMapFromMaps(*main_conf,configurationId,"jobscript_header");
518  if(headerMap!=NULL){
519    // Use the header file if defined in the HPC section of the main.cfg file
520    struct stat f_status;
521    int s=stat(headerMap->value, &f_status);
522    if(s==0){
523      char* fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1));
524      FILE* f=fopen(headerMap->value,"rb");
525      fread(fcontent,f_status.st_size,1,f);
526      int fsize=f_status.st_size;
527      fcontent[fsize]=0;
528      fclose(f);
529      fprintf(scriptFile,"%s\n### --- ZOO-Service HEADER end --- ###\n\n",fcontent);
530      free(fcontent);
531    }else
532      fprintf(scriptFile,"#!/bin/bash\n\n### *** Default ZOO-Service HEADER (no header found) *** ###\n\n");
533  }else
534    fprintf(scriptFile,"#!/bin/bash\n\n### *** Default ZOO-Service HEADER *** ###\n\n");
535  maps* hpc_opts=getMaps(*main_conf,configurationId);
536  if(hpc_opts!=NULL){
537    map* hpc_opts_content=hpc_opts->content;
538    while(hpc_opts_content!=NULL){
539      if(strncasecmp(hpc_opts_content->name,"sbatch_options_",15)==0)
540        fprintf(scriptFile,"#SBATCH --%s=%s\n",strstr(hpc_opts_content->name,"sbatch_options_")+15,hpc_opts_content->value);
541      hpc_opts_content=hpc_opts_content->next;
542    }
543  }
544  fprintf(scriptFile,"#SBATCH --job-name=ZOO-Project_%s_%s\n\n",uuid->value,s->name);
545  map* mods=getMap(s->content,"hpcModules");
546  if(mods!=NULL)
547    fprintf(scriptFile,"#SBATCH --export=MODULES=%s\n",mods->value);
548
549  map* bodyMap=getMapFromMaps(*main_conf,configurationId,"jobscript_body");
550  if(bodyMap!=NULL){
551    // Use the header file if defined in the HPC section of the main.cfg file
552    struct stat f_status;
553    int s=stat(bodyMap->value, &f_status);
554    if(s==0){
555      char* fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1));
556      FILE* f=fopen(bodyMap->value,"rb");
557      fread(fcontent,f_status.st_size,1,f);
558      int fsize=f_status.st_size;
559      fcontent[fsize]=0;
560      fclose(f);
561      fprintf(scriptFile,"%s\n### --- ZOO-Service BODY end --- ###\n\n",fcontent);
562      free(fcontent);
563    }else
564      fprintf(scriptFile,"#!/bin/bash\n\n### *** Default ZOO-Service BODY (no body found) *** ###\n\n");
565  }else
566    fprintf(scriptFile,"#!/bin/bash\n\n### *** Default ZOO-Service BODY *** ###\n\n");
567
568  map* sp=getMap(s->content,"serviceProvider");
569 
570  // Require to produce the command line to be executed
571  fprintf(scriptFile,"\n\necho \"Job started at: $(date)\"\n");
572  fprintf(scriptFile,"echo \"Running service: [%s]\"\n",sp->value);
573  fprintf(scriptFile,"%s ",sp->value);
574  for(int i=0;i<parameters_cnt;i++){
575    fprintf(scriptFile," %s",parameters[i]);
576  }
577  for(int i=parameters_cnt-1;i>=0;i--){
578    free(parameters[i]);
579  }
580  free(parameters);
581  fprintf(scriptFile,"\n");
582  fprintf(scriptFile,"echo \"Job finished at: $(date)\"\n");
583  fflush(scriptFile);
584  fclose(scriptFile);
585  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
586  invokeCallback(m,inputs,NULL,3,1);
587  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
588
589  // Upload the SBATCH File to the remote host
590  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
591  invokeCallback(m,inputs,NULL,4,0);
592  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
593  targetPathMap=getMapFromMaps(*main_conf,configurationId,"remote_work_path");
594  if(targetPathMap==NULL){
595    setMapInMaps(*main_conf,"lenv","message",_("There is no remote_work_path defined in your section!"));
596    setMapInMaps(*main_conf,"lenv","status","failed");
597    errorException (m, _("There is no remote_work_path defined in your section!"),
598                    "InternalError", NULL);
599    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
600    fflush(stderr);
601    invokeCallback(m,NULL,NULL,7,0);
602    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
603    fflush(stderr);
604    return SERVICE_FAILED;
605  }
606  char* targetName=strrchr(scriptPath,'/');
607  char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
608  sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
609  setMapInMaps(*main_conf,"lenv","remote_script",targetPath);
610  SSHCON *test=ssh_connect(*main_conf);
611  ssh_copy(*main_conf,scriptPath,targetPath,ssh_get_cnt(*main_conf));
612  unlink(scriptPath);
613  free(scriptPath);
614  // Execute the SBATCH script remotely
615  addReadLocks(main_conf);
616  map* subStr=getMapFromMaps(*main_conf,configurationId,"sbatch_substr");
617  char *command=(char*)malloc((strlen(targetPath)+strlen(targetPathMap->value)+strlen(subStr->value)+strlen(uuid->value)+137)*sizeof(char));
618  sprintf(command,"sbatch %s 2> %s/error_%s.log | sed \"s:%s::g\"",targetPath,targetPathMap->value,uuid->value,subStr->value);
619  if(ssh_exec(*main_conf,command,ssh_get_cnt(m))==0){
620    // The sbatch command has failed!
621    // Download the error log file from the HPC server
622    char tmpS[1024];
623    free(command);
624    command=(char*)malloc((strlen(targetPathMap->value)+strlen(uuid->value)+11)*sizeof(char));
625    sprintf(command,"%s/error_%s.log",targetPathMap->value,uuid->value);
626    targetName=strrchr(command,'/');
627    free(targetPath);
628    targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(targetName)+2)*sizeof(char));
629    sprintf(targetPath,"%s/%s",tmpPath->value,targetName);
630    if(ssh_fetch(*main_conf,targetPath,command,ssh_get_cnt(m))==0){
631      struct stat f_status;
632      int ts=stat(targetPath, &f_status);
633      if(ts==0) {
634        char* fcontent = NULL;
635        fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1));
636        FILE* f=fopen(targetPath,"rb");
637        fread(fcontent,f_status.st_size,1,f);
638        int fsize=f_status.st_size;
639        fcontent[fsize]=0;
640        fclose(f);
641        setMapInMaps(*main_conf,"lenv","message",fcontent);
642        free(fcontent);
643      }else
644        setMapInMaps(*main_conf,"lenv","message",_("No message provided"));
645    }else
646      setMapInMaps(*main_conf,"lenv","message",_("Unable to fetch the remote error log file"));
647    tmpPath=getMapFromMaps(*main_conf,"lenv","message");
648    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
649    fflush(stderr);
650    invokeCallback(m,NULL,NULL,7,0);
651    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
652    fflush(stderr);
653    sprintf(tmpS, "Cannot execute the HPC ZOO-Service %s: %s", s->name, tmpPath->value);
654    errorException(m,tmpS,"NoApplicableCode",NULL);
655    free(command);
656    free(targetPath);
657    ssh_close(*main_conf);
658    sleep(1);
659    return -1;
660  }
661  free(targetPath);
662  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
663  fflush(stderr);
664  invokeCallback(m,NULL,NULL,4,1);
665  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
666  fflush(stderr);
667  free(command);
668  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
669  fflush(stderr);
670
671  struct sockaddr_un addr;
672  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
673  fflush(stderr);
674  memset(&addr, 0, sizeof(addr));
675  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
676  fflush(stderr);
677  addr.sun_family = AF_UNIX;
678  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
679  fflush(stderr);
680  int rc, cl, fd = socket(AF_UNIX, SOCK_STREAM, 0);
681  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
682  fflush(stderr);
683  char *sname=(char*)malloc((strlen(tmpPath->value)+strlen(uuid->value)+20));
684  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
685  fflush(stderr);
686  sprintf(sname,"%s/.wait_socket_%s.sock",tmpPath->value,uuid->value);
687  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
688  fflush(stderr);
689  strncpy(addr.sun_path, sname, sizeof(addr.sun_path)-1);
690  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
691  fflush(stderr);
692 
693  if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
694    perror("bind error");
695    setMapInMaps(m,"lenv","message",_("Unable to bind socket!"));
696    errorException (m, _("Unable to bind socket!"),
697                    "InternalError", NULL);
698    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
699    fflush(stderr);
700    invokeCallback(m,NULL,NULL,7,0);
701    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
702    fflush(stderr);
703    sleep(120);
704    return -1;
705  }
706  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
707  fflush(stderr);
708  if (listen(fd, 5) == -1) {
709    setMapInMaps(*main_conf,"lenv","message",_("Listen error"));
710    errorException (m, _("Listen error"),
711                    "InternalError", NULL);
712    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
713    fflush(stderr);
714    invokeCallback(m,NULL,NULL,7,0);
715    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
716    fflush(stderr);
717    return -1;
718  }
719  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
720  fflush(stderr);
721  if ( (cl = accept(fd, NULL, NULL)) == -1) {
722    setMapInMaps(*main_conf,"lenv","message",_("Accept error"));
723    errorException (m, _("Accept error"),
724                    "InternalError", NULL);
725    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
726    fflush(stderr);
727    invokeCallback(m,NULL,NULL,7,0);
728    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
729    fflush(stderr);
730    return -1;
731  }else{
732    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
733    fflush(stderr);
734    int hasPassed=-1;
735    char buf[11];
736    memset(&buf,0,11);
737    while ( (rc=read(cl,buf,10)) ) {     
738      if(rc==0){
739        sleep(1);
740        setMapInMaps(*main_conf,"lenv","message",_("Read closed"));
741        errorException (m, _("Read closed"),
742                        "InternalError", NULL);
743        fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
744        fflush(stderr);
745        invokeCallback(m,NULL,NULL,7,0);
746        fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
747        fflush(stderr);
748        return -1;
749      }else{
750        if(rc<0){
751          setMapInMaps(*main_conf,"lenv","message",_("Read error"));
752          errorException (m, _("Read error"),
753                          "InternalError", NULL);
754          fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
755          fflush(stderr);
756          invokeCallback(m,NULL,NULL,7,0);
757          fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
758          fflush(stderr);
759          return -1;
760        }
761      }
762      hasPassed=1;
763      fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
764      fflush(stderr);
765      res=atoi(buf);
766      fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
767      fflush(stderr);
768      unlink(sname);
769      fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
770      fflush(stderr);
771      free(sname);
772      fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
773      fflush(stderr);
774      removeReadLocks(main_conf);
775      fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
776      fflush(stderr);
777 
778      if(res==3){
779        fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
780        fflush(stderr);
781        invokeCallback(m,NULL,outputs,5,0);
782        fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
783        fflush(stderr);
784        input=*real_outputs;
785        while(input!=NULL){
786          if(input->child==NULL){
787            map* generatedFile=getMap(input->content,"generated_file");
788            if(generatedFile!=NULL){
789              char* filename=strrchr(generatedFile->value,'/');
790              char* targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(filename)+2)*sizeof(char));
791              sprintf(targetPath,"%s/%s",tmpPath->value,filename);
792              test=ssh_connect(*main_conf);
793              if(ssh_fetch(*main_conf,targetPath,generatedFile->value,ssh_get_cnt(m))==0){
794                setMapInMaps(*real_outputs,input->name,"generated_file",targetPath);
795                free(targetPath);
796              }else{
797                char *tmpStr=(char*)malloc((strlen(filename)+strlen(_("Unable to fetch the remote file for %s"))+1)*sizeof(char));
798                sprintf(tmpStr,_("Unable to fetch the remote file for %s"),filename);
799                setMapInMaps(*main_conf,"lenv","message",tmpStr);
800                free(tmpStr);
801                invokeCallback(m,NULL,NULL,7,0);
802                return SERVICE_FAILED;
803              }
804            }       
805          }else{
806            fprintf(stderr,"%s %d\n",__FILE__,__LINE__);
807            fflush(stderr);
808            map* generatedFile=getMap(input->content,"generated_file");
809            if(generatedFile!=NULL){
810              char* filename=strrchr(generatedFile->value,'/');
811              char* targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(filename)+2)*sizeof(char));
812              sprintf(targetPath,"%s/%s",tmpPath->value,filename);
813              test=ssh_connect(*main_conf);
814              if(ssh_fetch(*main_conf,targetPath,generatedFile->value,ssh_get_cnt(m))==0){
815                maps* tmp=getMaps(*real_outputs,input->name);
816                char serviceName[9];
817                freeMap(&tmp->content);
818                free(tmp->content);
819                tmp->content=NULL;
820                maps* output=getMaps(*real_outputs,input->name);
821                setMapInMaps(output->child,"download_link","generated_file",targetPath);
822                setMapInMaps(output->child,"download_link","storage",targetPath);
823                setMapInMaps(output->child,"download_link","useMapserver","false");
824                setMapInMaps(output->child,"download_link","replicateStorageNext","true");
825                setMapInMaps(output->child,"download_link","asReference","true");
826                setMapInMaps(output->child,"download_link","inRequest","true");
827                setMapInMaps(output->child,"wms_link","generated_file",targetPath);
828                setMapInMaps(output->child,"wms_link","storage",targetPath);
829                setMapInMaps(output->child,"wms_link","useMapserver","true");
830                setMapInMaps(output->child,"wms_link","msOgc","WMS");
831                setMapInMaps(output->child,"wms_link","requestedMimeType","image/png");
832                setMapInMaps(output->child,"wms_link","asReference","true");
833                if(getMaps(output->child,"wcs_link")!=NULL){
834                  sprintf(serviceName,"wcs_link");
835                  setMapInMaps(output->child,serviceName,"msOgc","WCS");
836                }
837                else{
838                  sprintf(serviceName,"wfs_link");
839                  setMapInMaps(output->child,serviceName,"msOgc","WFS");
840                }
841                setMapInMaps(output->child,serviceName,"storage",targetPath);
842                setMapInMaps(output->child,serviceName,"generated_file",targetPath);
843                setMapInMaps(output->child,serviceName,"useMapserver","true");
844                setMapInMaps(output->child,serviceName,"asReference","true");
845              }else{
846                char *tmpStr=(char*)malloc((strlen(filename)+strlen(_("Unable to fetch the remote file for %s"))+1)*sizeof(char));
847                sprintf(tmpStr,_("Unable to fetch the remote file for %s"),filename);
848                setMapInMaps(*main_conf,"lenv","message",tmpStr);
849                free(tmpStr);
850                invokeCallback(m,NULL,NULL,7,0);
851                return SERVICE_FAILED;
852              }
853              free(targetPath);
854            }
855          }
856          input=input->next;
857        }
858
859        // Read the information provided by FinalizeHPC as a configuration file,
860        // then remove the file.
861        map* jobid=getMapFromMaps(*main_conf,"lenv","usid");
862        map* tmpPath=getMapFromMaps(*main_conf,"main","tmpPath");
863        char *filePath=(char*)malloc((strlen(tmpPath->value)+strlen(jobid->value)+15)*sizeof(char));
864        sprintf(filePath,"%s/exec_status_%s",tmpPath->value,jobid->value);
865        maps* m = (maps *) malloc (MAPS_SIZE);
866        m->child=NULL;
867        m->next=NULL;
868        int saved_stdout = dup (fileno (stdout));
869        dup2 (fileno (stderr), fileno (stdout));
870        conf_read(filePath,m);
871        fflush(stdout);
872        dup2 (saved_stdout, fileno (stdout));
873        close(saved_stdout);
874        unlink(filePath);
875        free(filePath);
876        addMapsToMaps(main_conf,m);
877        freeMaps(&m);
878        free(m);
879      }else{
880        // Try to access remotely to the log file and return a more relevant error message
881        setMapInMaps(m,"lenv","message",_("HPC Execution failed!"));
882        errorException (m, _("HPC Execution failed!"),
883                        "InternalError", NULL);
884        fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
885        fflush(stderr);
886        invokeCallback(m,NULL,NULL,7,0);
887        fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
888        fflush(stderr);
889      }
890      //free(buf);
891    }
892    if(hasPassed<0){
893      perror("Failed to read");
894      setMapInMaps(*main_conf,"lenv","message",_("Unable to parse the value returned by remote execution"));
895      errorException (m, _("Unable to parse the value returned by remote execution"),
896                      "InternalError", NULL);
897      fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
898      fflush(stderr);
899      invokeCallback(m,NULL,NULL,7,0);
900      fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
901      fflush(stderr);
902      sleep(120);
903      return SERVICE_FAILED;
904    }
905  }
906  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
907  fflush(stderr);
908  ssh_close(*main_conf);
909  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
910  fflush(stderr);
911  return res;
912}
Note: See TracBrowser for help on using the repository browser.

Search

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.png http://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor!

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png