Ignore:
Timestamp:
Aug 7, 2017, 2:56:24 PM (7 years ago)
Author:
djay
Message:

Update the source code for HPC support. Automatically adding nested outputs for the HPC support (should this be available for every support?). Add capability to store the metadata in the Collection DataBase?. Addition of the zcfg2sql to import any existing ZCFG file into the Collection DB. Add the support to invoke a callback (for history purpose) in case a [callback] section contains at least one parameter defined (url). Add support to convert maps and map to JSON (for callback use only by now). Fix some memory leaks (some are still there).

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/prototype-v0/zoo-project/zoo-kernel/service_internal_hpc.c

    r822 r839  
    22 * Author : Gérald FENOY
    33 *
    4  * Copyright (c) 2015 GeoLabs SARL
     4 * Copyright (c) 2017 GeoLabs SARL
     5 *
     6 * This work was supported by public funds received in the framework of GEOSUD,
     7 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed
     8 * by the French National Research Agency
    59 *
    610 * Permission is hereby granted, free of charge, to any person obtaining a copy
     
    2226 * THE SOFTWARE.
    2327 *
    24  * See Ref: http://hg.orfeo-toolbox.org/OTB/ Copyright
    25  * Some parts of this code are derived from ITK. See ITKCopyright.txt for
    26  * details.
    2728 */
    2829
     
    3031#include "response_print.h"
    3132#include "server_internal.h"
     33#include "service_callback.h"
     34#include "mimetypes.h"
    3235#include <sys/un.h>
    3336
    34 void appendOutputParameters(maps* input,char** parameters,int* cnt,service* s,map* uuid,map* targetPathMap){
    35   while(input!=NULL){
    36     if(input->child==NULL){
    37       *cnt+=1;
    38       if(*cnt==1)
    39         parameters=(char**)malloc((*cnt)*sizeof(char*));
    40       else
    41         parameters=(char**)realloc(parameters,(*cnt)*sizeof(char*));
    42       // Name every files that should be produced by the service execution
    43       char* targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+10)*sizeof(char));
    44       sprintf(targetName,"output_%s_%s_%s",s->name,input->name,uuid->value);
    45       // TODO: We should verify if any optional tag for output is required (i.e. -out output.tiff int8)
    46       char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
    47       sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
    48       parameters[*cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char));
    49       sprintf(parameters[*cnt-1],"-%s %s",input->name,targetPath);
    50     }else
    51       appendOutputParameters(input->child,parameters,cnt,s,uuid,targetPathMap);
    52     input=input->next;
     37typedef struct {
     38  maps* conf;
     39  char* local_file;
     40  char* target_file;
     41} local_params;
     42
     43#ifdef PTHREADS
     44#endif
     45
     46void addNestedOutputs(service** s){
     47  if((*s)==NULL){
     48    return;
     49  }   
     50  if(*s==NULL || (*s)->outputs==NULL){
     51    return;
     52  }
     53  elements *out=(*s)->outputs;
     54  elements* cur=out;
     55  map* serviceType=getMap((*s)->content,"ServiceType");
     56  if(strncmp(serviceType->value,"HPC",3)!=0)
     57    return;
     58  while(cur!=NULL && cur->defaults!=NULL){
     59    map* mimeType=getMap(cur->defaults->content,"mimeType");
     60    if(mimeType!=NULL){
     61      int geo=isGeographic(mimeType->value);
     62      if(geo>0){
     63        elements *tmp[3]={
     64          dupElements(cur),
     65          dupElements(cur),
     66          dupElements(cur)
     67        };
     68        char *geoLink="wcs_link";
     69        if(geo==2){
     70          geoLink="wfs_link";
     71        }
     72        int i=0;
     73        for(;i<3;i++){
     74          if(tmp[i]->next!=NULL){
     75            freeElements(&tmp[i]->next);
     76            free(tmp[i]->next);
     77            tmp[i]->next=NULL;
     78          }
     79          free(tmp[i]->name);
     80          tmp[i]->format=zStrdup("ComplexData");
     81          freeMap(&tmp[i]->content);
     82          free(tmp[i]->content);
     83          tmp[i]->content=NULL;
     84          switch(i){
     85          case 0:
     86            tmp[i]->name=zStrdup("download_link");
     87            tmp[i]->content=createMap("Title",_("Download link"));
     88            addToMap(tmp[i]->content,"Abstract",_("The download link"));
     89            addToMap(tmp[i]->defaults->content,"useMapserver","false");
     90            if(tmp[i]->supported!=NULL){
     91              freeIOType(&tmp[i]->supported);
     92              free(tmp[i]->supported);
     93              tmp[i]->supported=NULL;
     94            }
     95            break;
     96          case 1:
     97            tmp[i]->name=zStrdup("wms_link");
     98            tmp[i]->content=createMap("Title",_("WMS link"));
     99            addToMap(tmp[i]->content,"Abstract",_("The WMS link"));
     100            if(tmp[i]->supported!=NULL && tmp[i]->supported->next!=NULL){
     101              freeIOType(&tmp[i]->supported->next);
     102              free(tmp[i]->supported->next);
     103              tmp[i]->supported->next=NULL;
     104            }else{
     105              if(tmp[i]->supported!=NULL)
     106                addToMap(tmp[i]->supported->content,"useMapserver","true");
     107              addToMap(tmp[i]->defaults->content,"useMapserver","true");
     108            }
     109            break;
     110          case 2:
     111            if(geo==2){
     112              tmp[i]->name=zStrdup("wfs_link");
     113              tmp[i]->content=createMap("Title",_("WFS link"));
     114              addToMap(tmp[i]->content,"Abstract",_("The WFS link"));
     115            }else{
     116              tmp[i]->name=zStrdup("wcs_link");
     117              tmp[i]->content=createMap("Title",_("WCS link"));
     118              addToMap(tmp[i]->content,"Abstract",_("The WCS link"));
     119            }
     120            if(tmp[i]->supported!=NULL && tmp[i]->supported->next!=NULL &&
     121               tmp[i]->supported->next->content!=NULL){
     122              freeIOType(&tmp[i]->supported);
     123              free(tmp[i]->supported);
     124              tmp[i]->supported=NULL;
     125              tmp[i]->supported=createIoType();
     126              iotype* cnext=cur->supported->next;
     127              tmp[i]->supported->content=createMap(cnext->content->name,cnext->content->value);
     128              addMapToMap(&tmp[i]->supported->content,cnext->content->next);
     129              addToMap(tmp[i]->supported->content,"useMapserver","true");
     130            }else
     131              addToMap(tmp[i]->defaults->content,"useMapserver","true");
     132            break;
     133          }
     134        }
     135       
     136        addToElements(&cur->child,tmp[0]);
     137        addToElements(&cur->child,tmp[1]);
     138        addToElements(&cur->child,tmp[2]);
     139        free(cur->format);
     140        cur->format=NULL;
     141        if(cur->defaults!=NULL){
     142          freeIOType(&cur->defaults);
     143          cur->defaults=NULL;
     144        }
     145        if(cur->supported!=NULL){
     146          freeIOType(&cur->supported);
     147          cur->supported=NULL;
     148        }
     149        freeElements(&tmp[2]);
     150        free(tmp[2]);
     151        freeElements(&tmp[1]);
     152        free(tmp[1]);
     153        freeElements(&tmp[0]);
     154        free(tmp[0]);
     155        //addToMap(cur->content,"internal","true");
     156      }
     157    }
     158    cur=cur->next;
    53159  }
    54160}
     
    71177  map* tmp=NULL;
    72178  int res=-1;
    73   map* targetPathMap=getMapFromMaps(*main_conf,"HPC","storagePath");
     179  char *serviceType;
     180  map* mServiceType=getMap(s->content,"serviceType");
     181  if(mServiceType!=NULL)
     182    serviceType=mServiceType->value;
     183  else
     184    serviceType="HPC";
     185  map* targetPathMap=getMapFromMaps(*main_conf,serviceType,"storagePath");
    74186  map* tmpPath=getMapFromMaps(*main_conf,"main","tmpPath");
    75187  map* uuid=getMapFromMaps(*main_conf,"lenv","usid");
    76 
    77   sleep(5);
     188  pthread_t threads_pool[50];
     189  // Force the HPC services to be called asynchronously
     190  map* isAsync=getMapFromMaps(*main_conf,"lenv","async");
     191  if(isAsync==NULL){
     192    errorException(*main_conf,_("The synchronous mode is not supported by this type of service"),"NoSuchMode",s->name);
     193    return -1;
     194  }
    78195
    79196  maps* input=*real_inputs;
    80197  char **parameters=NULL;
    81198  int parameters_cnt=0;
    82   while(input!=NULL){
    83     parameters_cnt+=1;
    84     if(parameters_cnt==1)
    85       parameters=(char**)malloc(parameters_cnt*sizeof(char*));
    86     else
    87       parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*));
    88     if(getMap(input->content,"mimeType")!=NULL){
    89       // Input is ComplexData
    90       SSHCON *test=ssh_connect(*main_conf);
    91       dumpMaps(getMaps(*main_conf,"lenv"));
    92       if(test==NULL){   
    93         sleep(3600);
    94         return -1;
    95       }
    96       if(getMap(input->content,"cache_file")==NULL){
    97         // Input data has been passed by value
    98         // TODO: store data remotely
    99         // TODO: publish input through MapServer / use output publication
    100         dumpMapsValuesToFiles(main_conf,&input);
    101       }
    102       if(getMap(input->content,"cache_file")!=NULL){
    103         // Input data passed by reference or by value
    104         map* tmp=getMap(input->content,"cache_file");
    105         char* targetName=strrchr(tmp->value,'/');
    106         char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
    107         sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
    108         ssh_copy(*main_conf,tmp->value,targetPath);
    109         parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char));
    110         sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath);
    111         free(targetPath);
     199  while(input!=NULL && input->content!=NULL){
     200    if(getMaps(*real_outputs,input->name)==NULL){
     201      parameters_cnt+=1;
     202      if(parameters_cnt==1)
     203        parameters=(char**)malloc(parameters_cnt*sizeof(char*));
     204      else
     205        parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*));
     206      if(getMap(input->content,"mimeType")!=NULL){
     207        // Input is ComplexData
     208        if(getMap(input->content,"cache_file")==NULL){
     209          // Input data has been passed by value
     210          // TODO: publish input through MapServer / use output publication
     211          dumpMapsValuesToFiles(main_conf,&input);
     212          addToMap(input->content,"toPublish","true");
     213        }
     214        if(getMap(input->content,"cache_file")!=NULL){
     215          map* length=getMap(input->content,"length");
     216          if(length==NULL){
     217            addToMap(input->content,"length","1");
     218            length=getMap(input->content,"length");
     219          }
     220          int len=atoi(length->value);
     221          int i=0;
     222          for(i=0;i<len;i++){
     223            map* tmp=getMapArray(input->content,"cache_file",i);
     224            char* targetName=strrchr(tmp->value,'/');
     225            char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
     226            sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
     227            setMapArray(input->content,"targetPath",i,targetPath);
     228            setMapArray(input->content,"localPath",i,tmp->value);
     229            addToUploadQueue(main_conf,input);
     230            if(i==0){
     231              parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char));
     232              sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath);
     233            }else{
     234              char *tmpStr=zStrdup(parameters[parameters_cnt-1]);
     235              parameters[parameters_cnt-1]=(char*)realloc(parameters[parameters_cnt-1],(strlen(tmpStr)+strlen(targetPath)+2)*sizeof(char));
     236              sprintf(parameters[parameters_cnt-1],"%s %s",tmpStr,targetPath);
     237              free(tmpStr);
     238            }
     239            free(targetPath);
     240          }
     241        }else{
     242          // ???
     243          fprintf(stderr,"%s %d\n",__FILE__,__LINE__);
     244          fflush(stderr);
     245        }
    112246      }else{
    113         // ???
    114         fprintf(stderr,"%s %d\n",__FILE__,__LINE__);
    115         fflush(stderr);
    116       }
    117     }else{
    118       // LitteralData and BboxData
    119       if(getMap(input->content,"dataType")!=NULL){
    120         // For LitteralData, simply pass the value
    121         map* val=getMap(input->content,"value");
    122         sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,val->value);
    123       }
    124      
     247        // LitteralData and BboxData
     248        if(getMap(input->content,"dataType")!=NULL){
     249          // For LitteralData, simply pass the value
     250          map* val=getMap(input->content,"value");
     251          parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(val->value)+3)*sizeof(char));
     252          sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,val->value);
     253        }
     254      }
    125255    }
    126256    input=input->next;
    127257  }
     258
     259  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
     260  invokeCallback(m,inputs,NULL,2,0);
     261  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
     262
     263  // Upload data on HPC
     264  runUpload(main_conf);
     265 
     266  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
     267  invokeCallback(m,inputs,NULL,2,1);
     268  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
    128269
    129270  // Add the filename to generate for every output to the parameters
     
    140281        parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*));
    141282      // Name every files that should be produced by the service execution
    142       char* targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+10)*sizeof(char));
    143       sprintf(targetName,"output_%s_%s_%s",s->name,input->name,uuid->value);
    144       // TODO: We should verify if any optional tag for output is required (i.e. -out output.tiff *int8*)
    145       // TODO: Add support for Array values
     283      map* mime=getMap(input->content,"mimeType");
     284      char* targetName;
     285      if(mime!=NULL){
     286        bool hasExt=false;
     287        map* fileExt=getFileExtensionMap(mime->value,&hasExt);
     288        targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+strlen(fileExt->value)+11)*sizeof(char));
     289        sprintf(targetName,"output_%s_%s_%s.%s",s->name,input->name,uuid->value,fileExt->value);
     290        freeMap(&fileExt);
     291        free(fileExt);
     292      }else{
     293        targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+14)*sizeof(char));
     294        sprintf(targetName,"output_%s_%s_%s.tif",s->name,input->name,uuid->value);
     295      }
    146296      char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
    147297      sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
    148298      setMapInMaps(*real_outputs,input->name,"generated_file",targetPath);
    149       parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char));
    150       sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath);
     299      // We should verify if any optional tag for output is required
     300      // (i.e. -out output.tiff *int8*), meaning that we should search
     301      // for a corresponding inputs name.
     302      map* inValue=getMapFromMaps(*real_inputs,input->name,"value");
     303      if(inValue!=NULL){
     304        parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+strlen(inValue->value)+4)*sizeof(char));
     305        sprintf(parameters[parameters_cnt-1],"-%s %s %s",input->name,targetPath,inValue->value);
     306      }else{
     307        parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char));
     308        sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath);
     309      }
     310      free(targetPath);
     311    }// In other case it means we need to return the cache_file as generated_file
     312    else{
     313      parameters_cnt+=1;
     314      if(parameters_cnt==1)
     315        parameters=(char**)malloc(parameters_cnt*sizeof(char*));
     316      else
     317        parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*));
     318      // Name every files that should be produced by the service execution
     319      map* mime=getMap(input->child->content,"mimeType");
     320      char* targetName;
     321      if(mime!=NULL){
     322        bool hasExt=false;
     323        map* fileExt=getFileExtensionMap(mime->value,&hasExt);
     324        targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+strlen(fileExt->value)+11)*sizeof(char));
     325        sprintf(targetName,"output_%s_%s_%s.%s",s->name,input->name,uuid->value,fileExt->value);
     326        freeMap(&fileExt);
     327        free(fileExt);
     328      }else{
     329        targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+14)*sizeof(char));
     330        sprintf(targetName,"output_%s_%s_%s.tif",s->name,input->name,uuid->value);
     331      }
     332      char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
     333      sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
     334      addToMap(input->content,"generated_file",targetPath);
     335      // We should verify if any optional tag for output is required
     336      // (i.e. -out output.tiff *int8*), meaning that we should search
     337      // for a corresponding inputs name.
     338      map* inValue=getMapFromMaps(*real_inputs,input->name,"value");
     339      if(inValue!=NULL){
     340        parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+strlen(inValue->value)+4)*sizeof(char));
     341        sprintf(parameters[parameters_cnt-1],"-%s %s %s",input->name,targetPath,inValue->value);
     342      }else{
     343        parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char));
     344        sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath);
     345      }
     346      free(targetPath);
    151347    }
    152348    input=input->next;
    153349  }
    154350 
     351  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
     352  fflush(stderr);
     353  invokeCallback(m,inputs,NULL,3,0);
     354  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
     355  fflush(stderr);
     356
    155357  // Produce the SBATCH File locally
    156   char *scriptPath=(char*)malloc((strlen(s->name)+strlen(tmpPath->value)+strlen(uuid->value)+9)*sizeof(char));
     358  char *scriptPath=(char*)malloc((strlen(s->name)+strlen(tmpPath->value)+strlen(uuid->value)+10)*sizeof(char));
    157359  sprintf(scriptPath,"%s/zoo_%s_%s.sh",tmpPath->value,s->name,uuid->value);
     360  setMapInMaps(*main_conf,"lenv","local_script",scriptPath);
     361  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
     362  fflush(stderr);
     363  invokeCallback(m,inputs,NULL,3,0);
     364  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
     365  fflush(stderr);
    158366  FILE* scriptFile=fopen(scriptPath,"w+");
    159   maps* hpc_opts=getMaps(*main_conf,"sbatch_options");
    160   map* hpc_opts_content=hpc_opts->content;
    161   map* headerMap=getMapFromMaps(*main_conf,"HPC","header");
     367  map* headerMap=getMapFromMaps(*main_conf,serviceType,"header");
    162368  if(headerMap!=NULL){
    163369    // Use the header file if defined in the HPC section of the main.cfg file
     
    171377      fcontent[fsize]=0;
    172378      fclose(f);
    173       fprintf(scriptFile,"%s\n### --- ZOO-Service HEADER end --- ###",fcontent);
     379      fprintf(scriptFile,"%s\n### --- ZOO-Service HEADER end --- ###\n\n",fcontent);
    174380      free(fcontent);
    175381    }else
     
    177383  }else
    178384    fprintf(scriptFile,"#!/bin/bash\n\n### *** Default ZOO-Service HEADER *** ###\n\n");
    179  
    180   while(hpc_opts_content!=NULL){
    181     fprintf(scriptFile,"#SBATCH --%s=%s\n",hpc_opts_content->name,hpc_opts_content->value);
    182     hpc_opts_content=hpc_opts_content->next;
     385  maps* hpc_opts=getMaps(*main_conf,"sbatch_options");
     386  if(hpc_opts!=NULL){
     387    map* hpc_opts_content=hpc_opts->content;
     388    while(hpc_opts_content!=NULL){
     389      fprintf(scriptFile,"#SBATCH --%s=%s\n",hpc_opts_content->name,hpc_opts_content->value);
     390      hpc_opts_content=hpc_opts_content->next;
     391    }
    183392  }
    184393  fprintf(scriptFile,"#SBATCH --job-name=ZOO-Project_%s_%s\n\n",uuid->value,s->name);
     
    186395  if(mods!=NULL)
    187396    fprintf(scriptFile,"#SBATCH --export=MODULES=%s\n",mods->value);
    188   else
    189     fprintf(scriptFile,"#SBATCH --export=MODULES=\n");
     397
     398  map* bodyMap=getMapFromMaps(*main_conf,serviceType,"body");
     399  if(bodyMap!=NULL){
     400    // Use the header file if defined in the HPC section of the main.cfg file
     401    struct stat f_status;
     402    int s=stat(bodyMap->value, &f_status);
     403    if(s==0){
     404      char* fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1));
     405      FILE* f=fopen(bodyMap->value,"rb");
     406      fread(fcontent,f_status.st_size,1,f);
     407      int fsize=f_status.st_size;
     408      fcontent[fsize]=0;
     409      fclose(f);
     410      fprintf(scriptFile,"%s\n### --- ZOO-Service BODY end --- ###\n\n",fcontent);
     411      free(fcontent);
     412    }else
     413      fprintf(scriptFile,"#!/bin/bash\n\n### *** Default ZOO-Service BODY (no body found) *** ###\n\n");
     414  }else
     415    fprintf(scriptFile,"#!/bin/bash\n\n### *** Default ZOO-Service BODY *** ###\n\n");
     416
     417 
    190418  map* sp=getMap(s->content,"serviceProvider");
     419 
    191420  // Require to produce the command line to be executed
    192421  fprintf(scriptFile,"\n\necho \"Job started at: $(date)\"\n");
     
    195424  for(int i=0;i<parameters_cnt;i++){
    196425    fprintf(scriptFile," %s",parameters[i]);
    197     //free(parameters[i]);
     426  }
     427  for(int i=parameters_cnt-1;i>=0;i--){
     428    free(parameters[i]);
    198429  }
    199430  free(parameters);
    200431  fprintf(scriptFile,"\n");
    201432  fprintf(scriptFile,"echo \"Job finished at: $(date)\"\n");
    202  
    203433  fflush(scriptFile);
    204434  fclose(scriptFile);
     435 
     436  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
     437  invokeCallback(m,inputs,NULL,3,1);
     438  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
    205439
    206440  // Upload the SBATCH File to the remote host
    207   targetPathMap=getMapFromMaps(*main_conf,"HPC","executePath");
     441  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
     442  invokeCallback(m,inputs,NULL,4,0);
     443  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
     444  targetPathMap=getMapFromMaps(*main_conf,serviceType,"executePath");
    208445  if(targetPathMap==NULL){
    209446    setMapInMaps(*main_conf,"lenv","message",_("There is no executePath defined in you HPC section!"));
     
    213450  char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
    214451  sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
     452  setMapInMaps(*main_conf,"lenv","remote_script",targetPath);
    215453  SSHCON *test=ssh_connect(*main_conf);
    216   ssh_copy(*main_conf,scriptPath,targetPath);
     454  ssh_copy(*main_conf,scriptPath,targetPath,ssh_get_cnt(*main_conf));
    217455 
    218456  // Execute the SBATCH script remotely
    219457  map* subStr=getMapFromMaps(*main_conf,"HPC","subStr");
    220   //char *command=(char*)malloc((strlen(targetPath)+strlen(targetPathMap->value)+strlen(subStr->value)+strlen(uuid->value)+37)*sizeof(char));
    221458  char *command=(char*)malloc((strlen(targetPath)+strlen(targetPathMap->value)+strlen(subStr->value)+strlen(uuid->value)+137)*sizeof(char));
    222   //sprintf(command,"ls # %s 2> %s/error_%s.log | sed \"s:%s::g\"",targetPath,targetPathMap->value,uuid->value,subStr->value);
    223459  sprintf(command,"sbatch %s 2> %s/error_%s.log | sed \"s:%s::g\"",targetPath,targetPathMap->value,uuid->value,subStr->value);
    224   if(ssh_exec(*main_conf,command)==0){
     460  if(ssh_exec(*main_conf,command,ssh_get_cnt(m))==0){
    225461    // The sbatch command has failed!
    226462    // Download the error log file from the HPC server
    227463    char tmpS[1024];
    228464    free(command);
    229     command=(char*)malloc((strlen(targetPathMap->value)+strlen(uuid->value)+22)*sizeof(char));
     465    command=(char*)malloc((strlen(targetPathMap->value)+strlen(uuid->value)+11)*sizeof(char));
    230466    sprintf(command,"%s/error_%s.log",targetPathMap->value,uuid->value);
    231467    targetName=strrchr(command,'/');
     
    233469    targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(targetName)+2)*sizeof(char));
    234470    sprintf(targetPath,"%s/%s",tmpPath->value,targetName);
    235     if(ssh_fetch(*main_conf,targetPath,command)==0){
     471    if(ssh_fetch(*main_conf,targetPath,command,ssh_get_cnt(m))==0){
    236472      struct stat f_status;
    237473      int ts=stat(targetPath, &f_status);
     
    251487      setMapInMaps(*main_conf,"lenv","message",_("Unable to fetch the remote error log file"));
    252488    tmpPath=getMapFromMaps(*main_conf,"lenv","message");
     489    invokeCallback(m,NULL,NULL,7,1);
    253490    sprintf(tmpS, "Cannot execute the HPC ZOO-Service %s: %s", s->name, tmpPath->value);
    254491    errorException(m,tmpS,"NoApplicableCode",NULL);
     
    259496    return -1;
    260497  }
     498  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
     499  fflush(stderr);
     500  invokeCallback(m,NULL,NULL,4,1);
     501  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
     502  fflush(stderr);
     503  free(command);
    261504
    262505  struct sockaddr_un addr;
     
    264507  addr.sun_family = AF_UNIX;
    265508  int rc, cl, fd = socket(AF_UNIX, SOCK_STREAM, 0);
    266   char buf[100];
    267   char *sname=(char*)malloc((strlen(tmpPath->value)+strlen(uuid->value)+19));
     509  char *sname=(char*)malloc((strlen(tmpPath->value)+strlen(uuid->value)+20));
    268510  sprintf(sname,"%s/.wait_socket_%s.sock",tmpPath->value,uuid->value);
    269511  strncpy(addr.sun_path, sname, sizeof(addr.sun_path)-1);
     512 
    270513  if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
    271514    perror("bind error");
     
    274517  }
    275518  if (listen(fd, 5) == -1) {
    276     perror("listen error");
    277     sleep(120);
     519    setMapInMaps(*main_conf,"lenv","message",_("Listen error"));
    278520    return -1;
    279521  }
    280   /*fd_set master, read_fds;
    281   int fdmax;
    282   FD_ZERO(&master);
    283   FD_ZERO(&read_fds);
    284   FD_SET(fd, &master);
    285   if (select(fd+1, &master, NULL, NULL, NULL) == -1) {
    286     perror("select");
    287     sleep(120);
     522  if ( (cl = accept(fd, NULL, NULL)) == -1) {
     523    setMapInMaps(*main_conf,"lenv","message",_("Accept error"));
    288524    return -1;
    289   }
    290   if (FD_ISSET(fd, &master)) {*/
    291     if ( (cl = accept(fd, NULL, NULL)) == -1) {
    292       perror("accept error");
     525  }else{
     526    int hasPassed=-1;
     527    char buf[11];
     528    memset(&buf,0,11);
     529    while ( (rc=read(cl,buf,10)) ) {     
     530      if(rc==0){
     531        sleep(1);
     532        setMapInMaps(*main_conf,"lenv","message",_("Read closed"));
     533        return -1;
     534      }else{
     535        if(rc<0){
     536          setMapInMaps(*main_conf,"lenv","message",_("Read error"));
     537          return -1;
     538        }
     539      }
     540      hasPassed=1;
     541      res=atoi(buf);
     542      unlink(sname);
     543      //free(sname); 
     544
     545      if(res==3){
     546        fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
     547        fflush(stderr);
     548        invokeCallback(m,NULL,outputs,5,0);
     549        fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
     550        fflush(stderr);
     551        input=*real_outputs;
     552        while(input!=NULL){
     553          if(input->child==NULL){
     554            map* generatedFile=getMap(input->content,"generated_file");
     555            if(generatedFile!=NULL){
     556              char* filename=strrchr(generatedFile->value,'/');
     557              char* targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(filename)+2)*sizeof(char));
     558              sprintf(targetPath,"%s/%s",tmpPath->value,filename);
     559              test=ssh_connect(*main_conf);
     560              if(ssh_fetch(*main_conf,targetPath,generatedFile->value,ssh_get_cnt(m))==0){
     561                setMapInMaps(*real_outputs,input->name,"generated_file",targetPath);
     562                free(targetPath);
     563              }else{
     564                char *tmpStr=(char*)malloc((strlen(filename)+strlen(_("Unable to fetch the remote file for %s"))+1)*sizeof(char));
     565                sprintf(tmpStr,_("Unable to fetch the remote file for %s"),filename);
     566                setMapInMaps(*main_conf,"lenv","message",tmpStr);
     567                free(tmpStr);
     568                return SERVICE_FAILED;
     569              }
     570            }       
     571          }else{
     572            fprintf(stderr,"%s %d\n",__FILE__,__LINE__);
     573            fflush(stderr);
     574            map* generatedFile=getMap(input->content,"generated_file");
     575            if(generatedFile!=NULL){
     576              char* filename=strrchr(generatedFile->value,'/');
     577              char* targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(filename)+2)*sizeof(char));
     578              sprintf(targetPath,"%s/%s",tmpPath->value,filename);
     579              test=ssh_connect(*main_conf);
     580              if(ssh_fetch(*main_conf,targetPath,generatedFile->value,ssh_get_cnt(m))==0){
     581                maps* tmp=getMaps(*real_outputs,input->name);
     582                freeMap(&tmp->content);
     583                free(tmp->content);
     584                tmp->content=NULL;
     585                maps* output=getMaps(*real_outputs,input->name);
     586                setMapInMaps(output->child,"download_link","generated_file",targetPath);
     587                setMapInMaps(output->child,"download_link","useMapserver","false");
     588                setMapInMaps(output->child,"WMS_LINK","generated_file",targetPath);
     589                setMapInMaps(output->child,"WMS_LINK","useMapserver","true");
     590                setMapInMaps(output->child,"WCS_LINK","generated_file",targetPath);
     591                setMapInMaps(output->child,"WCS_LINK","useMapserver","true");
     592              }
     593            }
     594          }
     595          input=input->next;
     596        }
     597      }
     598      //free(buf);
     599    }
     600    if(hasPassed<0){
     601      perror("Failed to read");
     602      setMapInMaps(*main_conf,"lenv","message",_("Unable to parse the value returned by remote execution"));
    293603      sleep(120);
    294       return -1;
    295     }else{
    296       int hasPassed=-1;
    297       /*FD_SET(cl,&read_fds);
    298       if (select(cl+1, &read_fds, NULL, NULL, NULL) == -1) {
    299         perror("select");
    300         sleep(120);
    301         return -1;
    302         }*/
    303       while ( (rc=read(cl,buf,10)) ) {   
    304         if(rc==0){
    305           sleep(120);
    306           return -1;
    307         }else{
    308           if(rc<0){
    309             perror("read");
    310             sleep(120);
    311             return -1;
    312           }
    313         }
    314         hasPassed=1;
    315         res=atoi(buf);
    316         unlink(sname);
    317         if(res==3){
    318           input=*real_outputs;
    319           while(input!=NULL){
    320             // TODO: parse all outputs including inner outputs if required.
    321             if(input->child==NULL){
    322               //map* dataPath=getMapFromMaps(*main_conf,"main","dataPath");
    323               map* generatedFile=getMap(input->content,"generated_file");
    324               if(generatedFile!=NULL){
    325                 char* filename=strrchr(generatedFile->value,'/');
    326                 char* targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(filename)+2)*sizeof(char));
    327                 sprintf(targetPath,"%s/%s",tmpPath->value,filename);
    328                 test=ssh_connect(*main_conf);
    329                 if(ssh_fetch(*main_conf,targetPath,generatedFile->value)==0){
    330                   setMapInMaps(*real_outputs,input->name,"generated_file",targetPath);
    331                 }else{
    332                   char *tmpStr=(char*)malloc((strlen(filename)+strlen(_("Unable to fetch the remote file for %s"))+1)*sizeof(char));
    333                   sprintf(tmpStr,_("Unable to fetch the remote file for %s"),filename);
    334                   setMapInMaps(*main_conf,"lenv","message",tmpStr);
    335                   free(tmpStr);
    336                   return SERVICE_FAILED;
    337                 }
    338               }
    339             }/*else{
    340              // Generate the nested outputs based on each input value
    341              if(getMaps(*real_inputs,input->name)!=NULL && getMapFromMaps(*real_inputs,input->name,"mimeType")!=NULL){
    342              // Input was ComplexeData
    343              maps* output=getMaps(*real_outputs,input->name);
    344              map* cache=getMapFromMaps(*real_inputs,input->name,"cache_file");
    345              setMapInMaps(output->child,"download","generated_file",cache->value);
    346              setMapInMaps(output->child,"WCS_LINK","generated_file",cache->value);
    347              setMapInMaps(output->child,"WCS_LINK","useMs","true");
    348              setMapInMaps(output->child,"WMS_LINK","generated_file",cache->value);
    349              setMapInMaps(output->child,"WMS_LINK","useMs","true");
    350              }
    351              }*/
    352             input=input->next;
    353           }
    354         }
    355       }
    356       if(hasPassed<0){
    357         perror("Failed to read");
    358         setMapInMaps(*main_conf,"lenv","message",_("Unable to parse the value returned by remote execution"));
    359         sleep(120);
    360         return SERVICE_FAILED;
    361       }
     604      return SERVICE_FAILED;
    362605    }
    363     //}
     606  }
    364607  ssh_close(*main_conf);
    365   //sleep(120);
    366608  return res;
    367609}
Note: See TracChangeset for help on using the changeset viewer.

Search

Context Navigation

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png