source: branches/prototype-v0/zoo-project/zoo-kernel/service_internal_hpc.c @ 822

Last change on this file since 822 was 822, checked in by djay, 7 years ago

Commit the minimal requirements for remote HPC support

  • Property svn:keywords set to Id
File size: 14.4 KB
Line 
1/*
2 * Author : Gérald FENOY
3 *
4 * Copyright (c) 2015 GeoLabs SARL
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 * THE SOFTWARE.
23 *
24 * See Ref: http://hg.orfeo-toolbox.org/OTB/ Copyright
25 * Some parts of this code are derived from ITK. See ITKCopyright.txt for
26 * details.
27 */
28
29#include "service_internal_hpc.h"
30#include "response_print.h"
31#include "server_internal.h"
32#include <sys/un.h>
33
34void appendOutputParameters(maps* input,char** parameters,int* cnt,service* s,map* uuid,map* targetPathMap){
35  while(input!=NULL){
36    if(input->child==NULL){
37      *cnt+=1;
38      if(*cnt==1)
39        parameters=(char**)malloc((*cnt)*sizeof(char*));
40      else
41        parameters=(char**)realloc(parameters,(*cnt)*sizeof(char*));
42      // Name every files that should be produced by the service execution
43      char* targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+10)*sizeof(char));
44      sprintf(targetName,"output_%s_%s_%s",s->name,input->name,uuid->value);
45      // TODO: We should verify if any optional tag for output is required (i.e. -out output.tiff int8)
46      char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
47      sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
48      parameters[*cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char));
49      sprintf(parameters[*cnt-1],"-%s %s",input->name,targetPath);
50    }else
51      appendOutputParameters(input->child,parameters,cnt,s,uuid,targetPathMap);
52    input=input->next;
53  }
54}
55
56/**
57 * Load and run a HPC Application corresponding to the service.
58 *
59 * @param main_conf the conf maps containing the main.cfg settings
60 * @param request the map containing the HTTP request
61 * @param s the service structure
62 * @param real_inputs the maps containing the inputs
63 * @param real_outputs the maps containing the outputs
64 */
65int zoo_hpc_support(maps** main_conf,map* request,service* s,maps **real_inputs,maps **real_outputs){
66  maps* m=*main_conf;
67  maps* inputs=*real_inputs;
68  maps* outputs=*real_outputs;
69  map* tmp0=getMapFromMaps(*main_conf,"lenv","cwd");
70  char *ntmp=tmp0->value;
71  map* tmp=NULL;
72  int res=-1;
73  map* targetPathMap=getMapFromMaps(*main_conf,"HPC","storagePath");
74  map* tmpPath=getMapFromMaps(*main_conf,"main","tmpPath");
75  map* uuid=getMapFromMaps(*main_conf,"lenv","usid");
76
77  sleep(5);
78
79  maps* input=*real_inputs;
80  char **parameters=NULL;
81  int parameters_cnt=0;
82  while(input!=NULL){
83    parameters_cnt+=1;
84    if(parameters_cnt==1)
85      parameters=(char**)malloc(parameters_cnt*sizeof(char*));
86    else
87      parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*));
88    if(getMap(input->content,"mimeType")!=NULL){
89      // Input is ComplexData
90      SSHCON *test=ssh_connect(*main_conf);
91      dumpMaps(getMaps(*main_conf,"lenv"));
92      if(test==NULL){   
93        sleep(3600);
94        return -1;
95      }
96      if(getMap(input->content,"cache_file")==NULL){
97        // Input data has been passed by value
98        // TODO: store data remotely
99        // TODO: publish input through MapServer / use output publication
100        dumpMapsValuesToFiles(main_conf,&input);
101      }
102      if(getMap(input->content,"cache_file")!=NULL){
103        // Input data passed by reference or by value
104        map* tmp=getMap(input->content,"cache_file");
105        char* targetName=strrchr(tmp->value,'/');
106        char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
107        sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
108        ssh_copy(*main_conf,tmp->value,targetPath);
109        parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char));
110        sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath);
111        free(targetPath);
112      }else{
113        // ???
114        fprintf(stderr,"%s %d\n",__FILE__,__LINE__);
115        fflush(stderr);
116      }
117    }else{
118      // LitteralData and BboxData
119      if(getMap(input->content,"dataType")!=NULL){
120        // For LitteralData, simply pass the value
121        map* val=getMap(input->content,"value");
122        sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,val->value);
123      }
124     
125    }
126    input=input->next;
127  }
128
129  // Add the filename to generate for every output to the parameters
130  input=*real_outputs;
131  // TODO: fix appendOutputParameters
132  //appendOutputParameters(input,parameters,&parameters_cnt,s,uuid,targetPathMap);
133  while(input!=NULL){
134    // TODO: parse all outputs including inner outputs if required.
135    if(input->child==NULL){
136      parameters_cnt+=1;
137      if(parameters_cnt==1)
138        parameters=(char**)malloc(parameters_cnt*sizeof(char*));
139      else
140        parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*));
141      // Name every files that should be produced by the service execution
142      char* targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+10)*sizeof(char));
143      sprintf(targetName,"output_%s_%s_%s",s->name,input->name,uuid->value);
144      // TODO: We should verify if any optional tag for output is required (i.e. -out output.tiff *int8*)
145      // TODO: Add support for Array values
146      char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
147      sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
148      setMapInMaps(*real_outputs,input->name,"generated_file",targetPath);
149      parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char));
150      sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath);
151    }
152    input=input->next;
153  }
154 
155  // Produce the SBATCH File locally
156  char *scriptPath=(char*)malloc((strlen(s->name)+strlen(tmpPath->value)+strlen(uuid->value)+9)*sizeof(char));
157  sprintf(scriptPath,"%s/zoo_%s_%s.sh",tmpPath->value,s->name,uuid->value);
158  FILE* scriptFile=fopen(scriptPath,"w+");
159  maps* hpc_opts=getMaps(*main_conf,"sbatch_options");
160  map* hpc_opts_content=hpc_opts->content;
161  map* headerMap=getMapFromMaps(*main_conf,"HPC","header");
162  if(headerMap!=NULL){
163    // Use the header file if defined in the HPC section of the main.cfg file
164    struct stat f_status;
165    int s=stat(headerMap->value, &f_status);
166    if(s==0){
167      char* fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1));
168      FILE* f=fopen(headerMap->value,"rb");
169      fread(fcontent,f_status.st_size,1,f);
170      int fsize=f_status.st_size;
171      fcontent[fsize]=0;
172      fclose(f);
173      fprintf(scriptFile,"%s\n### --- ZOO-Service HEADER end --- ###",fcontent);
174      free(fcontent);
175    }else
176      fprintf(scriptFile,"#!/bin/bash\n\n### *** Default ZOO-Service HEADER (no header found) *** ###\n\n");
177  }else
178    fprintf(scriptFile,"#!/bin/bash\n\n### *** Default ZOO-Service HEADER *** ###\n\n");
179 
180  while(hpc_opts_content!=NULL){
181    fprintf(scriptFile,"#SBATCH --%s=%s\n",hpc_opts_content->name,hpc_opts_content->value);
182    hpc_opts_content=hpc_opts_content->next;
183  }
184  fprintf(scriptFile,"#SBATCH --job-name=ZOO-Project_%s_%s\n\n",uuid->value,s->name);
185  map* mods=getMap(s->content,"hpcModules");
186  if(mods!=NULL)
187    fprintf(scriptFile,"#SBATCH --export=MODULES=%s\n",mods->value);
188  else
189    fprintf(scriptFile,"#SBATCH --export=MODULES=\n");
190  map* sp=getMap(s->content,"serviceProvider");
191  // Require to produce the command line to be executed
192  fprintf(scriptFile,"\n\necho \"Job started at: $(date)\"\n");
193  fprintf(scriptFile,"echo \"Running service: [%s]\"\n",sp->value);
194  fprintf(scriptFile,"%s ",sp->value);
195  for(int i=0;i<parameters_cnt;i++){
196    fprintf(scriptFile," %s",parameters[i]);
197    //free(parameters[i]);
198  }
199  free(parameters);
200  fprintf(scriptFile,"\n");
201  fprintf(scriptFile,"echo \"Job finished at: $(date)\"\n");
202 
203  fflush(scriptFile);
204  fclose(scriptFile);
205
206  // Upload the SBATCH File to the remote host
207  targetPathMap=getMapFromMaps(*main_conf,"HPC","executePath");
208  if(targetPathMap==NULL){
209    setMapInMaps(*main_conf,"lenv","message",_("There is no executePath defined in you HPC section!"));
210    return SERVICE_FAILED;
211  }
212  char* targetName=strrchr(scriptPath,'/');
213  char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
214  sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
215  SSHCON *test=ssh_connect(*main_conf);
216  ssh_copy(*main_conf,scriptPath,targetPath);
217 
218  // Execute the SBATCH script remotely
219  map* subStr=getMapFromMaps(*main_conf,"HPC","subStr");
220  //char *command=(char*)malloc((strlen(targetPath)+strlen(targetPathMap->value)+strlen(subStr->value)+strlen(uuid->value)+37)*sizeof(char));
221  char *command=(char*)malloc((strlen(targetPath)+strlen(targetPathMap->value)+strlen(subStr->value)+strlen(uuid->value)+137)*sizeof(char));
222  //sprintf(command,"ls # %s 2> %s/error_%s.log | sed \"s:%s::g\"",targetPath,targetPathMap->value,uuid->value,subStr->value);
223  sprintf(command,"sbatch %s 2> %s/error_%s.log | sed \"s:%s::g\"",targetPath,targetPathMap->value,uuid->value,subStr->value);
224  if(ssh_exec(*main_conf,command)==0){
225    // The sbatch command has failed!
226    // Download the error log file from the HPC server
227    char tmpS[1024];
228    free(command);
229    command=(char*)malloc((strlen(targetPathMap->value)+strlen(uuid->value)+22)*sizeof(char));
230    sprintf(command,"%s/error_%s.log",targetPathMap->value,uuid->value);
231    targetName=strrchr(command,'/');
232    free(targetPath);
233    targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(targetName)+2)*sizeof(char));
234    sprintf(targetPath,"%s/%s",tmpPath->value,targetName);
235    if(ssh_fetch(*main_conf,targetPath,command)==0){
236      struct stat f_status;
237      int ts=stat(targetPath, &f_status);
238      if(ts==0) {
239        char* fcontent = NULL;
240        fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1));
241        FILE* f=fopen(targetPath,"rb");
242        fread(fcontent,f_status.st_size,1,f);
243        int fsize=f_status.st_size;
244        fcontent[fsize]=0;
245        fclose(f);
246        setMapInMaps(*main_conf,"lenv","message",fcontent);
247        free(fcontent);
248      }else
249        setMapInMaps(*main_conf,"lenv","message",_("No message provided"));
250    }else
251      setMapInMaps(*main_conf,"lenv","message",_("Unable to fetch the remote error log file"));
252    tmpPath=getMapFromMaps(*main_conf,"lenv","message");
253    sprintf(tmpS, "Cannot execute the HPC ZOO-Service %s: %s", s->name, tmpPath->value);
254    errorException(m,tmpS,"NoApplicableCode",NULL);
255    free(command);
256    free(targetPath);
257    ssh_close(*main_conf);
258    sleep(120);
259    return -1;
260  }
261
262  struct sockaddr_un addr;
263  memset(&addr, 0, sizeof(addr));
264  addr.sun_family = AF_UNIX;
265  int rc, cl, fd = socket(AF_UNIX, SOCK_STREAM, 0);
266  char buf[100];
267  char *sname=(char*)malloc((strlen(tmpPath->value)+strlen(uuid->value)+19));
268  sprintf(sname,"%s/.wait_socket_%s.sock",tmpPath->value,uuid->value);
269  strncpy(addr.sun_path, sname, sizeof(addr.sun_path)-1);
270  if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
271    perror("bind error");
272    sleep(120);
273    return -1;
274  }
275  if (listen(fd, 5) == -1) {
276    perror("listen error");
277    sleep(120);
278    return -1;
279  }
280  /*fd_set master, read_fds;
281  int fdmax;
282  FD_ZERO(&master);
283  FD_ZERO(&read_fds);
284  FD_SET(fd, &master);
285  if (select(fd+1, &master, NULL, NULL, NULL) == -1) {
286    perror("select");
287    sleep(120);
288    return -1;
289  }
290  if (FD_ISSET(fd, &master)) {*/
291    if ( (cl = accept(fd, NULL, NULL)) == -1) {
292      perror("accept error");
293      sleep(120);
294      return -1;
295    }else{
296      int hasPassed=-1;
297      /*FD_SET(cl,&read_fds);
298      if (select(cl+1, &read_fds, NULL, NULL, NULL) == -1) {
299        perror("select");
300        sleep(120);
301        return -1;
302        }*/
303      while ( (rc=read(cl,buf,10)) ) {   
304        if(rc==0){
305          sleep(120);
306          return -1;
307        }else{
308          if(rc<0){
309            perror("read");
310            sleep(120);
311            return -1;
312          }
313        }
314        hasPassed=1;
315        res=atoi(buf);
316        unlink(sname);
317        if(res==3){
318          input=*real_outputs;
319          while(input!=NULL){
320            // TODO: parse all outputs including inner outputs if required.
321            if(input->child==NULL){
322              //map* dataPath=getMapFromMaps(*main_conf,"main","dataPath");
323              map* generatedFile=getMap(input->content,"generated_file");
324              if(generatedFile!=NULL){
325                char* filename=strrchr(generatedFile->value,'/');
326                char* targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(filename)+2)*sizeof(char));
327                sprintf(targetPath,"%s/%s",tmpPath->value,filename);
328                test=ssh_connect(*main_conf);
329                if(ssh_fetch(*main_conf,targetPath,generatedFile->value)==0){
330                  setMapInMaps(*real_outputs,input->name,"generated_file",targetPath);
331                }else{
332                  char *tmpStr=(char*)malloc((strlen(filename)+strlen(_("Unable to fetch the remote file for %s"))+1)*sizeof(char));
333                  sprintf(tmpStr,_("Unable to fetch the remote file for %s"),filename);
334                  setMapInMaps(*main_conf,"lenv","message",tmpStr);
335                  free(tmpStr);
336                  return SERVICE_FAILED;
337                }
338              }
339            }/*else{
340             // Generate the nested outputs based on each input value
341             if(getMaps(*real_inputs,input->name)!=NULL && getMapFromMaps(*real_inputs,input->name,"mimeType")!=NULL){
342             // Input was ComplexeData
343             maps* output=getMaps(*real_outputs,input->name);
344             map* cache=getMapFromMaps(*real_inputs,input->name,"cache_file");
345             setMapInMaps(output->child,"download","generated_file",cache->value);
346             setMapInMaps(output->child,"WCS_LINK","generated_file",cache->value);
347             setMapInMaps(output->child,"WCS_LINK","useMs","true");
348             setMapInMaps(output->child,"WMS_LINK","generated_file",cache->value);
349             setMapInMaps(output->child,"WMS_LINK","useMs","true");
350             }
351             }*/
352            input=input->next;
353          }
354        }
355      }
356      if(hasPassed<0){
357        perror("Failed to read");
358        setMapInMaps(*main_conf,"lenv","message",_("Unable to parse the value returned by remote execution"));
359        sleep(120);
360        return SERVICE_FAILED;
361      }
362    }
363    //}
364  ssh_close(*main_conf);
365  //sleep(120);
366  return res;
367}
368
369   
Note: See TracBrowser for help on using the repository browser.

Search

Context Navigation

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png