- Timestamp:
- Aug 7, 2017, 2:56:24 PM (7 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/prototype-v0/zoo-project/zoo-kernel/service_internal_hpc.c
r822 r839 2 2 * Author : Gérald FENOY 3 3 * 4 * Copyright (c) 2015 GeoLabs SARL 4 * Copyright (c) 2017 GeoLabs SARL 5 * 6 * This work was supported by public funds received in the framework of GEOSUD, 7 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed 8 * by the French National Research Agency 5 9 * 6 10 * Permission is hereby granted, free of charge, to any person obtaining a copy … … 22 26 * THE SOFTWARE. 23 27 * 24 * See Ref: http://hg.orfeo-toolbox.org/OTB/ Copyright25 * Some parts of this code are derived from ITK. See ITKCopyright.txt for26 * details.27 28 */ 28 29 … … 30 31 #include "response_print.h" 31 32 #include "server_internal.h" 33 #include "service_callback.h" 34 #include "mimetypes.h" 32 35 #include <sys/un.h> 33 36 34 void appendOutputParameters(maps* input,char** parameters,int* cnt,service* s,map* uuid,map* targetPathMap){ 35 while(input!=NULL){ 36 if(input->child==NULL){ 37 *cnt+=1; 38 if(*cnt==1) 39 parameters=(char**)malloc((*cnt)*sizeof(char*)); 40 else 41 parameters=(char**)realloc(parameters,(*cnt)*sizeof(char*)); 42 // Name every files that should be produced by the service execution 43 char* targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+10)*sizeof(char)); 44 sprintf(targetName,"output_%s_%s_%s",s->name,input->name,uuid->value); 45 // TODO: We should verify if any optional tag for output is required (i.e. -out output.tiff int8) 46 char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char)); 47 sprintf(targetPath,"%s/%s",targetPathMap->value,targetName); 48 parameters[*cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char)); 49 sprintf(parameters[*cnt-1],"-%s %s",input->name,targetPath); 50 }else 51 appendOutputParameters(input->child,parameters,cnt,s,uuid,targetPathMap); 52 input=input->next; 37 typedef struct { 38 maps* conf; 39 char* local_file; 40 char* target_file; 41 } local_params; 42 43 #ifdef PTHREADS 44 #endif 45 46 void addNestedOutputs(service** s){ 47 if((*s)==NULL){ 48 return; 49 } 50 if(*s==NULL || (*s)->outputs==NULL){ 51 return; 52 } 53 elements *out=(*s)->outputs; 54 elements* cur=out; 55 map* serviceType=getMap((*s)->content,"ServiceType"); 56 if(strncmp(serviceType->value,"HPC",3)!=0) 57 return; 58 while(cur!=NULL && cur->defaults!=NULL){ 59 map* mimeType=getMap(cur->defaults->content,"mimeType"); 60 if(mimeType!=NULL){ 61 int geo=isGeographic(mimeType->value); 62 if(geo>0){ 63 elements *tmp[3]={ 64 dupElements(cur), 65 dupElements(cur), 66 dupElements(cur) 67 }; 68 char *geoLink="wcs_link"; 69 if(geo==2){ 70 geoLink="wfs_link"; 71 } 72 int i=0; 73 for(;i<3;i++){ 74 if(tmp[i]->next!=NULL){ 75 freeElements(&tmp[i]->next); 76 free(tmp[i]->next); 77 tmp[i]->next=NULL; 78 } 79 free(tmp[i]->name); 80 tmp[i]->format=zStrdup("ComplexData"); 81 freeMap(&tmp[i]->content); 82 free(tmp[i]->content); 83 tmp[i]->content=NULL; 84 switch(i){ 85 case 0: 86 tmp[i]->name=zStrdup("download_link"); 87 tmp[i]->content=createMap("Title",_("Download link")); 88 addToMap(tmp[i]->content,"Abstract",_("The download link")); 89 addToMap(tmp[i]->defaults->content,"useMapserver","false"); 90 if(tmp[i]->supported!=NULL){ 91 freeIOType(&tmp[i]->supported); 92 free(tmp[i]->supported); 93 tmp[i]->supported=NULL; 94 } 95 break; 96 case 1: 97 tmp[i]->name=zStrdup("wms_link"); 98 tmp[i]->content=createMap("Title",_("WMS link")); 99 addToMap(tmp[i]->content,"Abstract",_("The WMS link")); 100 if(tmp[i]->supported!=NULL && tmp[i]->supported->next!=NULL){ 101 freeIOType(&tmp[i]->supported->next); 102 free(tmp[i]->supported->next); 103 tmp[i]->supported->next=NULL; 104 }else{ 105 if(tmp[i]->supported!=NULL) 106 addToMap(tmp[i]->supported->content,"useMapserver","true"); 107 addToMap(tmp[i]->defaults->content,"useMapserver","true"); 108 } 109 break; 110 case 2: 111 if(geo==2){ 112 tmp[i]->name=zStrdup("wfs_link"); 113 tmp[i]->content=createMap("Title",_("WFS link")); 114 addToMap(tmp[i]->content,"Abstract",_("The WFS link")); 115 }else{ 116 tmp[i]->name=zStrdup("wcs_link"); 117 tmp[i]->content=createMap("Title",_("WCS link")); 118 addToMap(tmp[i]->content,"Abstract",_("The WCS link")); 119 } 120 if(tmp[i]->supported!=NULL && tmp[i]->supported->next!=NULL && 121 tmp[i]->supported->next->content!=NULL){ 122 freeIOType(&tmp[i]->supported); 123 free(tmp[i]->supported); 124 tmp[i]->supported=NULL; 125 tmp[i]->supported=createIoType(); 126 iotype* cnext=cur->supported->next; 127 tmp[i]->supported->content=createMap(cnext->content->name,cnext->content->value); 128 addMapToMap(&tmp[i]->supported->content,cnext->content->next); 129 addToMap(tmp[i]->supported->content,"useMapserver","true"); 130 }else 131 addToMap(tmp[i]->defaults->content,"useMapserver","true"); 132 break; 133 } 134 } 135 136 addToElements(&cur->child,tmp[0]); 137 addToElements(&cur->child,tmp[1]); 138 addToElements(&cur->child,tmp[2]); 139 free(cur->format); 140 cur->format=NULL; 141 if(cur->defaults!=NULL){ 142 freeIOType(&cur->defaults); 143 cur->defaults=NULL; 144 } 145 if(cur->supported!=NULL){ 146 freeIOType(&cur->supported); 147 cur->supported=NULL; 148 } 149 freeElements(&tmp[2]); 150 free(tmp[2]); 151 freeElements(&tmp[1]); 152 free(tmp[1]); 153 freeElements(&tmp[0]); 154 free(tmp[0]); 155 //addToMap(cur->content,"internal","true"); 156 } 157 } 158 cur=cur->next; 53 159 } 54 160 } … … 71 177 map* tmp=NULL; 72 178 int res=-1; 73 map* targetPathMap=getMapFromMaps(*main_conf,"HPC","storagePath"); 179 char *serviceType; 180 map* mServiceType=getMap(s->content,"serviceType"); 181 if(mServiceType!=NULL) 182 serviceType=mServiceType->value; 183 else 184 serviceType="HPC"; 185 map* targetPathMap=getMapFromMaps(*main_conf,serviceType,"storagePath"); 74 186 map* tmpPath=getMapFromMaps(*main_conf,"main","tmpPath"); 75 187 map* uuid=getMapFromMaps(*main_conf,"lenv","usid"); 76 77 sleep(5); 188 pthread_t threads_pool[50]; 189 // Force the HPC services to be called asynchronously 190 map* isAsync=getMapFromMaps(*main_conf,"lenv","async"); 191 if(isAsync==NULL){ 192 errorException(*main_conf,_("The synchronous mode is not supported by this type of service"),"NoSuchMode",s->name); 193 return -1; 194 } 78 195 79 196 maps* input=*real_inputs; 80 197 char **parameters=NULL; 81 198 int parameters_cnt=0; 82 while(input!=NULL){ 83 parameters_cnt+=1; 84 if(parameters_cnt==1) 85 parameters=(char**)malloc(parameters_cnt*sizeof(char*)); 86 else 87 parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*)); 88 if(getMap(input->content,"mimeType")!=NULL){ 89 // Input is ComplexData 90 SSHCON *test=ssh_connect(*main_conf); 91 dumpMaps(getMaps(*main_conf,"lenv")); 92 if(test==NULL){ 93 sleep(3600); 94 return -1; 95 } 96 if(getMap(input->content,"cache_file")==NULL){ 97 // Input data has been passed by value 98 // TODO: store data remotely 99 // TODO: publish input through MapServer / use output publication 100 dumpMapsValuesToFiles(main_conf,&input); 101 } 102 if(getMap(input->content,"cache_file")!=NULL){ 103 // Input data passed by reference or by value 104 map* tmp=getMap(input->content,"cache_file"); 105 char* targetName=strrchr(tmp->value,'/'); 106 char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char)); 107 sprintf(targetPath,"%s/%s",targetPathMap->value,targetName); 108 ssh_copy(*main_conf,tmp->value,targetPath); 109 parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char)); 110 sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath); 111 free(targetPath); 199 while(input!=NULL && input->content!=NULL){ 200 if(getMaps(*real_outputs,input->name)==NULL){ 201 parameters_cnt+=1; 202 if(parameters_cnt==1) 203 parameters=(char**)malloc(parameters_cnt*sizeof(char*)); 204 else 205 parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*)); 206 if(getMap(input->content,"mimeType")!=NULL){ 207 // Input is ComplexData 208 if(getMap(input->content,"cache_file")==NULL){ 209 // Input data has been passed by value 210 // TODO: publish input through MapServer / use output publication 211 dumpMapsValuesToFiles(main_conf,&input); 212 addToMap(input->content,"toPublish","true"); 213 } 214 if(getMap(input->content,"cache_file")!=NULL){ 215 map* length=getMap(input->content,"length"); 216 if(length==NULL){ 217 addToMap(input->content,"length","1"); 218 length=getMap(input->content,"length"); 219 } 220 int len=atoi(length->value); 221 int i=0; 222 for(i=0;i<len;i++){ 223 map* tmp=getMapArray(input->content,"cache_file",i); 224 char* targetName=strrchr(tmp->value,'/'); 225 char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char)); 226 sprintf(targetPath,"%s/%s",targetPathMap->value,targetName); 227 setMapArray(input->content,"targetPath",i,targetPath); 228 setMapArray(input->content,"localPath",i,tmp->value); 229 addToUploadQueue(main_conf,input); 230 if(i==0){ 231 parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char)); 232 sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath); 233 }else{ 234 char *tmpStr=zStrdup(parameters[parameters_cnt-1]); 235 parameters[parameters_cnt-1]=(char*)realloc(parameters[parameters_cnt-1],(strlen(tmpStr)+strlen(targetPath)+2)*sizeof(char)); 236 sprintf(parameters[parameters_cnt-1],"%s %s",tmpStr,targetPath); 237 free(tmpStr); 238 } 239 free(targetPath); 240 } 241 }else{ 242 // ??? 243 fprintf(stderr,"%s %d\n",__FILE__,__LINE__); 244 fflush(stderr); 245 } 112 246 }else{ 113 // ??? 114 fprintf(stderr,"%s %d\n",__FILE__,__LINE__); 115 fflush(stderr); 116 } 117 }else{ 118 // LitteralData and BboxData 119 if(getMap(input->content,"dataType")!=NULL){ 120 // For LitteralData, simply pass the value 121 map* val=getMap(input->content,"value"); 122 sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,val->value); 123 } 124 247 // LitteralData and BboxData 248 if(getMap(input->content,"dataType")!=NULL){ 249 // For LitteralData, simply pass the value 250 map* val=getMap(input->content,"value"); 251 parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(val->value)+3)*sizeof(char)); 252 sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,val->value); 253 } 254 } 125 255 } 126 256 input=input->next; 127 257 } 258 259 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 260 invokeCallback(m,inputs,NULL,2,0); 261 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 262 263 // Upload data on HPC 264 runUpload(main_conf); 265 266 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 267 invokeCallback(m,inputs,NULL,2,1); 268 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 128 269 129 270 // Add the filename to generate for every output to the parameters … … 140 281 parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*)); 141 282 // Name every files that should be produced by the service execution 142 char* targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+10)*sizeof(char)); 143 sprintf(targetName,"output_%s_%s_%s",s->name,input->name,uuid->value); 144 // TODO: We should verify if any optional tag for output is required (i.e. -out output.tiff *int8*) 145 // TODO: Add support for Array values 283 map* mime=getMap(input->content,"mimeType"); 284 char* targetName; 285 if(mime!=NULL){ 286 bool hasExt=false; 287 map* fileExt=getFileExtensionMap(mime->value,&hasExt); 288 targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+strlen(fileExt->value)+11)*sizeof(char)); 289 sprintf(targetName,"output_%s_%s_%s.%s",s->name,input->name,uuid->value,fileExt->value); 290 freeMap(&fileExt); 291 free(fileExt); 292 }else{ 293 targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+14)*sizeof(char)); 294 sprintf(targetName,"output_%s_%s_%s.tif",s->name,input->name,uuid->value); 295 } 146 296 char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char)); 147 297 sprintf(targetPath,"%s/%s",targetPathMap->value,targetName); 148 298 setMapInMaps(*real_outputs,input->name,"generated_file",targetPath); 149 parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char)); 150 sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath); 299 // We should verify if any optional tag for output is required 300 // (i.e. -out output.tiff *int8*), meaning that we should search 301 // for a corresponding inputs name. 302 map* inValue=getMapFromMaps(*real_inputs,input->name,"value"); 303 if(inValue!=NULL){ 304 parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+strlen(inValue->value)+4)*sizeof(char)); 305 sprintf(parameters[parameters_cnt-1],"-%s %s %s",input->name,targetPath,inValue->value); 306 }else{ 307 parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char)); 308 sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath); 309 } 310 free(targetPath); 311 }// In other case it means we need to return the cache_file as generated_file 312 else{ 313 parameters_cnt+=1; 314 if(parameters_cnt==1) 315 parameters=(char**)malloc(parameters_cnt*sizeof(char*)); 316 else 317 parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*)); 318 // Name every files that should be produced by the service execution 319 map* mime=getMap(input->child->content,"mimeType"); 320 char* targetName; 321 if(mime!=NULL){ 322 bool hasExt=false; 323 map* fileExt=getFileExtensionMap(mime->value,&hasExt); 324 targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+strlen(fileExt->value)+11)*sizeof(char)); 325 sprintf(targetName,"output_%s_%s_%s.%s",s->name,input->name,uuid->value,fileExt->value); 326 freeMap(&fileExt); 327 free(fileExt); 328 }else{ 329 targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+14)*sizeof(char)); 330 sprintf(targetName,"output_%s_%s_%s.tif",s->name,input->name,uuid->value); 331 } 332 char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char)); 333 sprintf(targetPath,"%s/%s",targetPathMap->value,targetName); 334 addToMap(input->content,"generated_file",targetPath); 335 // We should verify if any optional tag for output is required 336 // (i.e. -out output.tiff *int8*), meaning that we should search 337 // for a corresponding inputs name. 338 map* inValue=getMapFromMaps(*real_inputs,input->name,"value"); 339 if(inValue!=NULL){ 340 parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+strlen(inValue->value)+4)*sizeof(char)); 341 sprintf(parameters[parameters_cnt-1],"-%s %s %s",input->name,targetPath,inValue->value); 342 }else{ 343 parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char)); 344 sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath); 345 } 346 free(targetPath); 151 347 } 152 348 input=input->next; 153 349 } 154 350 351 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 352 fflush(stderr); 353 invokeCallback(m,inputs,NULL,3,0); 354 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 355 fflush(stderr); 356 155 357 // Produce the SBATCH File locally 156 char *scriptPath=(char*)malloc((strlen(s->name)+strlen(tmpPath->value)+strlen(uuid->value)+ 9)*sizeof(char));358 char *scriptPath=(char*)malloc((strlen(s->name)+strlen(tmpPath->value)+strlen(uuid->value)+10)*sizeof(char)); 157 359 sprintf(scriptPath,"%s/zoo_%s_%s.sh",tmpPath->value,s->name,uuid->value); 360 setMapInMaps(*main_conf,"lenv","local_script",scriptPath); 361 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 362 fflush(stderr); 363 invokeCallback(m,inputs,NULL,3,0); 364 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 365 fflush(stderr); 158 366 FILE* scriptFile=fopen(scriptPath,"w+"); 159 maps* hpc_opts=getMaps(*main_conf,"sbatch_options"); 160 map* hpc_opts_content=hpc_opts->content; 161 map* headerMap=getMapFromMaps(*main_conf,"HPC","header"); 367 map* headerMap=getMapFromMaps(*main_conf,serviceType,"header"); 162 368 if(headerMap!=NULL){ 163 369 // Use the header file if defined in the HPC section of the main.cfg file … … 171 377 fcontent[fsize]=0; 172 378 fclose(f); 173 fprintf(scriptFile,"%s\n### --- ZOO-Service HEADER end --- ### ",fcontent);379 fprintf(scriptFile,"%s\n### --- ZOO-Service HEADER end --- ###\n\n",fcontent); 174 380 free(fcontent); 175 381 }else … … 177 383 }else 178 384 fprintf(scriptFile,"#!/bin/bash\n\n### *** Default ZOO-Service HEADER *** ###\n\n"); 179 180 while(hpc_opts_content!=NULL){ 181 fprintf(scriptFile,"#SBATCH --%s=%s\n",hpc_opts_content->name,hpc_opts_content->value); 182 hpc_opts_content=hpc_opts_content->next; 385 maps* hpc_opts=getMaps(*main_conf,"sbatch_options"); 386 if(hpc_opts!=NULL){ 387 map* hpc_opts_content=hpc_opts->content; 388 while(hpc_opts_content!=NULL){ 389 fprintf(scriptFile,"#SBATCH --%s=%s\n",hpc_opts_content->name,hpc_opts_content->value); 390 hpc_opts_content=hpc_opts_content->next; 391 } 183 392 } 184 393 fprintf(scriptFile,"#SBATCH --job-name=ZOO-Project_%s_%s\n\n",uuid->value,s->name); … … 186 395 if(mods!=NULL) 187 396 fprintf(scriptFile,"#SBATCH --export=MODULES=%s\n",mods->value); 188 else 189 fprintf(scriptFile,"#SBATCH --export=MODULES=\n"); 397 398 map* bodyMap=getMapFromMaps(*main_conf,serviceType,"body"); 399 if(bodyMap!=NULL){ 400 // Use the header file if defined in the HPC section of the main.cfg file 401 struct stat f_status; 402 int s=stat(bodyMap->value, &f_status); 403 if(s==0){ 404 char* fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1)); 405 FILE* f=fopen(bodyMap->value,"rb"); 406 fread(fcontent,f_status.st_size,1,f); 407 int fsize=f_status.st_size; 408 fcontent[fsize]=0; 409 fclose(f); 410 fprintf(scriptFile,"%s\n### --- ZOO-Service BODY end --- ###\n\n",fcontent); 411 free(fcontent); 412 }else 413 fprintf(scriptFile,"#!/bin/bash\n\n### *** Default ZOO-Service BODY (no body found) *** ###\n\n"); 414 }else 415 fprintf(scriptFile,"#!/bin/bash\n\n### *** Default ZOO-Service BODY *** ###\n\n"); 416 417 190 418 map* sp=getMap(s->content,"serviceProvider"); 419 191 420 // Require to produce the command line to be executed 192 421 fprintf(scriptFile,"\n\necho \"Job started at: $(date)\"\n"); … … 195 424 for(int i=0;i<parameters_cnt;i++){ 196 425 fprintf(scriptFile," %s",parameters[i]); 197 //free(parameters[i]); 426 } 427 for(int i=parameters_cnt-1;i>=0;i--){ 428 free(parameters[i]); 198 429 } 199 430 free(parameters); 200 431 fprintf(scriptFile,"\n"); 201 432 fprintf(scriptFile,"echo \"Job finished at: $(date)\"\n"); 202 203 433 fflush(scriptFile); 204 434 fclose(scriptFile); 435 436 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 437 invokeCallback(m,inputs,NULL,3,1); 438 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 205 439 206 440 // Upload the SBATCH File to the remote host 207 targetPathMap=getMapFromMaps(*main_conf,"HPC","executePath"); 441 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 442 invokeCallback(m,inputs,NULL,4,0); 443 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 444 targetPathMap=getMapFromMaps(*main_conf,serviceType,"executePath"); 208 445 if(targetPathMap==NULL){ 209 446 setMapInMaps(*main_conf,"lenv","message",_("There is no executePath defined in you HPC section!")); … … 213 450 char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char)); 214 451 sprintf(targetPath,"%s/%s",targetPathMap->value,targetName); 452 setMapInMaps(*main_conf,"lenv","remote_script",targetPath); 215 453 SSHCON *test=ssh_connect(*main_conf); 216 ssh_copy(*main_conf,scriptPath,targetPath );454 ssh_copy(*main_conf,scriptPath,targetPath,ssh_get_cnt(*main_conf)); 217 455 218 456 // Execute the SBATCH script remotely 219 457 map* subStr=getMapFromMaps(*main_conf,"HPC","subStr"); 220 //char *command=(char*)malloc((strlen(targetPath)+strlen(targetPathMap->value)+strlen(subStr->value)+strlen(uuid->value)+37)*sizeof(char));221 458 char *command=(char*)malloc((strlen(targetPath)+strlen(targetPathMap->value)+strlen(subStr->value)+strlen(uuid->value)+137)*sizeof(char)); 222 //sprintf(command,"ls # %s 2> %s/error_%s.log | sed \"s:%s::g\"",targetPath,targetPathMap->value,uuid->value,subStr->value);223 459 sprintf(command,"sbatch %s 2> %s/error_%s.log | sed \"s:%s::g\"",targetPath,targetPathMap->value,uuid->value,subStr->value); 224 if(ssh_exec(*main_conf,command )==0){460 if(ssh_exec(*main_conf,command,ssh_get_cnt(m))==0){ 225 461 // The sbatch command has failed! 226 462 // Download the error log file from the HPC server 227 463 char tmpS[1024]; 228 464 free(command); 229 command=(char*)malloc((strlen(targetPathMap->value)+strlen(uuid->value)+ 22)*sizeof(char));465 command=(char*)malloc((strlen(targetPathMap->value)+strlen(uuid->value)+11)*sizeof(char)); 230 466 sprintf(command,"%s/error_%s.log",targetPathMap->value,uuid->value); 231 467 targetName=strrchr(command,'/'); … … 233 469 targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(targetName)+2)*sizeof(char)); 234 470 sprintf(targetPath,"%s/%s",tmpPath->value,targetName); 235 if(ssh_fetch(*main_conf,targetPath,command )==0){471 if(ssh_fetch(*main_conf,targetPath,command,ssh_get_cnt(m))==0){ 236 472 struct stat f_status; 237 473 int ts=stat(targetPath, &f_status); … … 251 487 setMapInMaps(*main_conf,"lenv","message",_("Unable to fetch the remote error log file")); 252 488 tmpPath=getMapFromMaps(*main_conf,"lenv","message"); 489 invokeCallback(m,NULL,NULL,7,1); 253 490 sprintf(tmpS, "Cannot execute the HPC ZOO-Service %s: %s", s->name, tmpPath->value); 254 491 errorException(m,tmpS,"NoApplicableCode",NULL); … … 259 496 return -1; 260 497 } 498 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 499 fflush(stderr); 500 invokeCallback(m,NULL,NULL,4,1); 501 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 502 fflush(stderr); 503 free(command); 261 504 262 505 struct sockaddr_un addr; … … 264 507 addr.sun_family = AF_UNIX; 265 508 int rc, cl, fd = socket(AF_UNIX, SOCK_STREAM, 0); 266 char buf[100]; 267 char *sname=(char*)malloc((strlen(tmpPath->value)+strlen(uuid->value)+19)); 509 char *sname=(char*)malloc((strlen(tmpPath->value)+strlen(uuid->value)+20)); 268 510 sprintf(sname,"%s/.wait_socket_%s.sock",tmpPath->value,uuid->value); 269 511 strncpy(addr.sun_path, sname, sizeof(addr.sun_path)-1); 512 270 513 if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) { 271 514 perror("bind error"); … … 274 517 } 275 518 if (listen(fd, 5) == -1) { 276 perror("listen error"); 277 sleep(120); 519 setMapInMaps(*main_conf,"lenv","message",_("Listen error")); 278 520 return -1; 279 521 } 280 /*fd_set master, read_fds; 281 int fdmax; 282 FD_ZERO(&master); 283 FD_ZERO(&read_fds); 284 FD_SET(fd, &master); 285 if (select(fd+1, &master, NULL, NULL, NULL) == -1) { 286 perror("select"); 287 sleep(120); 522 if ( (cl = accept(fd, NULL, NULL)) == -1) { 523 setMapInMaps(*main_conf,"lenv","message",_("Accept error")); 288 524 return -1; 289 } 290 if (FD_ISSET(fd, &master)) {*/ 291 if ( (cl = accept(fd, NULL, NULL)) == -1) { 292 perror("accept error"); 525 }else{ 526 int hasPassed=-1; 527 char buf[11]; 528 memset(&buf,0,11); 529 while ( (rc=read(cl,buf,10)) ) { 530 if(rc==0){ 531 sleep(1); 532 setMapInMaps(*main_conf,"lenv","message",_("Read closed")); 533 return -1; 534 }else{ 535 if(rc<0){ 536 setMapInMaps(*main_conf,"lenv","message",_("Read error")); 537 return -1; 538 } 539 } 540 hasPassed=1; 541 res=atoi(buf); 542 unlink(sname); 543 //free(sname); 544 545 if(res==3){ 546 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 547 fflush(stderr); 548 invokeCallback(m,NULL,outputs,5,0); 549 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 550 fflush(stderr); 551 input=*real_outputs; 552 while(input!=NULL){ 553 if(input->child==NULL){ 554 map* generatedFile=getMap(input->content,"generated_file"); 555 if(generatedFile!=NULL){ 556 char* filename=strrchr(generatedFile->value,'/'); 557 char* targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(filename)+2)*sizeof(char)); 558 sprintf(targetPath,"%s/%s",tmpPath->value,filename); 559 test=ssh_connect(*main_conf); 560 if(ssh_fetch(*main_conf,targetPath,generatedFile->value,ssh_get_cnt(m))==0){ 561 setMapInMaps(*real_outputs,input->name,"generated_file",targetPath); 562 free(targetPath); 563 }else{ 564 char *tmpStr=(char*)malloc((strlen(filename)+strlen(_("Unable to fetch the remote file for %s"))+1)*sizeof(char)); 565 sprintf(tmpStr,_("Unable to fetch the remote file for %s"),filename); 566 setMapInMaps(*main_conf,"lenv","message",tmpStr); 567 free(tmpStr); 568 return SERVICE_FAILED; 569 } 570 } 571 }else{ 572 fprintf(stderr,"%s %d\n",__FILE__,__LINE__); 573 fflush(stderr); 574 map* generatedFile=getMap(input->content,"generated_file"); 575 if(generatedFile!=NULL){ 576 char* filename=strrchr(generatedFile->value,'/'); 577 char* targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(filename)+2)*sizeof(char)); 578 sprintf(targetPath,"%s/%s",tmpPath->value,filename); 579 test=ssh_connect(*main_conf); 580 if(ssh_fetch(*main_conf,targetPath,generatedFile->value,ssh_get_cnt(m))==0){ 581 maps* tmp=getMaps(*real_outputs,input->name); 582 freeMap(&tmp->content); 583 free(tmp->content); 584 tmp->content=NULL; 585 maps* output=getMaps(*real_outputs,input->name); 586 setMapInMaps(output->child,"download_link","generated_file",targetPath); 587 setMapInMaps(output->child,"download_link","useMapserver","false"); 588 setMapInMaps(output->child,"WMS_LINK","generated_file",targetPath); 589 setMapInMaps(output->child,"WMS_LINK","useMapserver","true"); 590 setMapInMaps(output->child,"WCS_LINK","generated_file",targetPath); 591 setMapInMaps(output->child,"WCS_LINK","useMapserver","true"); 592 } 593 } 594 } 595 input=input->next; 596 } 597 } 598 //free(buf); 599 } 600 if(hasPassed<0){ 601 perror("Failed to read"); 602 setMapInMaps(*main_conf,"lenv","message",_("Unable to parse the value returned by remote execution")); 293 603 sleep(120); 294 return -1; 295 }else{ 296 int hasPassed=-1; 297 /*FD_SET(cl,&read_fds); 298 if (select(cl+1, &read_fds, NULL, NULL, NULL) == -1) { 299 perror("select"); 300 sleep(120); 301 return -1; 302 }*/ 303 while ( (rc=read(cl,buf,10)) ) { 304 if(rc==0){ 305 sleep(120); 306 return -1; 307 }else{ 308 if(rc<0){ 309 perror("read"); 310 sleep(120); 311 return -1; 312 } 313 } 314 hasPassed=1; 315 res=atoi(buf); 316 unlink(sname); 317 if(res==3){ 318 input=*real_outputs; 319 while(input!=NULL){ 320 // TODO: parse all outputs including inner outputs if required. 321 if(input->child==NULL){ 322 //map* dataPath=getMapFromMaps(*main_conf,"main","dataPath"); 323 map* generatedFile=getMap(input->content,"generated_file"); 324 if(generatedFile!=NULL){ 325 char* filename=strrchr(generatedFile->value,'/'); 326 char* targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(filename)+2)*sizeof(char)); 327 sprintf(targetPath,"%s/%s",tmpPath->value,filename); 328 test=ssh_connect(*main_conf); 329 if(ssh_fetch(*main_conf,targetPath,generatedFile->value)==0){ 330 setMapInMaps(*real_outputs,input->name,"generated_file",targetPath); 331 }else{ 332 char *tmpStr=(char*)malloc((strlen(filename)+strlen(_("Unable to fetch the remote file for %s"))+1)*sizeof(char)); 333 sprintf(tmpStr,_("Unable to fetch the remote file for %s"),filename); 334 setMapInMaps(*main_conf,"lenv","message",tmpStr); 335 free(tmpStr); 336 return SERVICE_FAILED; 337 } 338 } 339 }/*else{ 340 // Generate the nested outputs based on each input value 341 if(getMaps(*real_inputs,input->name)!=NULL && getMapFromMaps(*real_inputs,input->name,"mimeType")!=NULL){ 342 // Input was ComplexeData 343 maps* output=getMaps(*real_outputs,input->name); 344 map* cache=getMapFromMaps(*real_inputs,input->name,"cache_file"); 345 setMapInMaps(output->child,"download","generated_file",cache->value); 346 setMapInMaps(output->child,"WCS_LINK","generated_file",cache->value); 347 setMapInMaps(output->child,"WCS_LINK","useMs","true"); 348 setMapInMaps(output->child,"WMS_LINK","generated_file",cache->value); 349 setMapInMaps(output->child,"WMS_LINK","useMs","true"); 350 } 351 }*/ 352 input=input->next; 353 } 354 } 355 } 356 if(hasPassed<0){ 357 perror("Failed to read"); 358 setMapInMaps(*main_conf,"lenv","message",_("Unable to parse the value returned by remote execution")); 359 sleep(120); 360 return SERVICE_FAILED; 361 } 604 return SERVICE_FAILED; 362 605 } 363 //}606 } 364 607 ssh_close(*main_conf); 365 //sleep(120);366 608 return res; 367 609 }
Note: See TracChangeset
for help on using the changeset viewer.