/**
 * Author : Gérald FENOY
 *
 * Copyright 2017 GeoLabs SARL. All rights reserved.
 *
 * This work was supported by public funds received in the framework of GEOSUD,
 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed
 * by the French National Research Agency.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
*/ #include "service.h" #include "service_internal.h" #include "sshapi.h" #include "server_internal.h" #include #include #include #include #include #include #include #include #include #include #include extern "C" { /** * FinalizeHPC ZOO Service : * This service is used to inform a ZOO-Kernel waiting for the end of the * execution of a HPC service */ ZOO_DLL_EXPORT int FinalizeHPC(maps*& conf,maps*& inputs,maps*& outputs){ // Retrieve the jobid corresponding to the identifier generated by SLURM // by reading the file generated when running the SBATCH file map* jobid=getMapFromMaps(inputs,"jobid","value"); struct sockaddr_un addr; char buf[100]="3"; int fd,rc=NULL; int i=0; map* usid=getMapFromMaps(conf,"lenv","usid"); map* tmpPath=getMapFromMaps(conf,"main","tmpPath"); char *flenv = (char *) malloc ((strlen (tmpPath->value) + strlen (jobid->value) + 12) * sizeof (char)); sprintf (flenv, "%s/%s_lenv.cfg", tmpPath->value, jobid->value); maps* m = (maps *) malloc (MAPS_SIZE); m->child=NULL; m->next=NULL; map* configId=NULL; if(conf_read(flenv, m) != 2){ configId=getMapFromMaps(m,"lenv","configId"); setMapInMaps(conf,"lenv","configId",configId->value); }else{ setMapInMaps(conf,"lenv","message",_("Unable to read the lenv section file of the requested jobid")); return SERVICE_FAILED; } SSHCON *test=ssh_connect(conf); /*if(test==NULL){ setMapInMaps(conf,"lenv","message",_("Unable to connect using ssh.")); return SERVICE_FAILED; }*/ char *logPath=(char*)malloc((strlen(tmpPath->value)+strlen(jobid->value)+12)*sizeof(char)); sprintf(logPath,"%s/exec_out_%s",tmpPath->value,jobid->value); struct stat f_status; int ts=stat(logPath, &f_status); char* fcontent = NULL; if(ts==0) { fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1)); FILE* f=fopen(logPath,"rb"); fread(fcontent,f_status.st_size,1,f); int fsize=f_status.st_size; fcontent[fsize]=0; fclose(f); }else{ setMapInMaps(conf,"lenv","message",_("No service with this jobid can be found")); return SERVICE_FAILED; } 
free(logPath); // Run scontrol to check if the service execution ended. // Store all the informations returned by scontrol command as a cfg file to // be parsed back by the ZOO-Kernel waiting for the execution of the remote // service maps* tmpMaps=createMaps("henv"); char* command=(char*)malloc((126)*sizeof(char)); sprintf(command,"scontrol show jobid | grep -A24 JobId=%s",fcontent); if(ssh_exec(conf,command,ssh_get_cnt(conf))==0){ free(command); setMapInMaps(conf,"lenv","message",_("Failed to run scontrol remotely")); // TODO: check status in db and if available continue in other case return SERVICE_FAILED return SERVICE_FAILED; }else{ free(command); logPath=(char*)malloc((strlen(tmpPath->value)+strlen(usid->value)+11)*sizeof(char)); sprintf(logPath,"%s/exec_out_%s",tmpPath->value,usid->value); int ts=stat(logPath, &f_status); if(ts==0) { fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1)); FILE* f=fopen(logPath,"rb"); fread(fcontent,f_status.st_size,1,f); int fsize=f_status.st_size; fcontent[fsize]=0; fclose(f); free(logPath); char *token, *saveptr; token = strtok_r (fcontent, " ", &saveptr); while (token != NULL) { char *token1, *saveptr1; char *tmpToken=strdup(token); token1 = strtok_r (tmpToken, "=", &saveptr1); int isNext=-1; int hasTwoElements=0; char *name=NULL; while (token1 != NULL) { if(hasTwoElements==0) name=strdup(token1); if(hasTwoElements<1) hasTwoElements+=1; else{ char *value=strdup(token1); if(value[strlen(value)-1]=='\n') value[strlen(value)-1]=0; if(strlen(name)>0 && strlen(value)>0){ if(tmpMaps->content==NULL) tmpMaps->content=createMap(name,value); else addToMap(tmpMaps->content,name,value); free(value); } free(name); hasTwoElements=0; } token1 = strtok_r (NULL, "=", &saveptr1); } free(tmpToken); token = strtok_r (NULL, " ", &saveptr); } }else{ setMapInMaps(conf,"lenv","message",_("Unable to access the downloaded execution log file")); return SERVICE_FAILED; } } 
logPath=(char*)malloc((strlen(tmpPath->value)+strlen(jobid->value)+15)*sizeof(char)); sprintf(logPath,"%s/exec_status_%s",tmpPath->value,jobid->value); dumpMapsToFile(tmpMaps,logPath,0); char *sname=(char*)malloc((strlen(tmpPath->value)+strlen(jobid->value)+21)); sprintf(sname,"%s/.wait_socket_%s.sock",tmpPath->value,jobid->value); if ( (fd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { perror("socket error"); setMapInMaps(conf,"lenv","message",_("Socket error")); return SERVICE_FAILED; } memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; strncpy(addr.sun_path, sname, sizeof(addr.sun_path)-1); if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) { perror("connect error"); setMapInMaps(conf,"lenv","message",_("Unable to connect")); return SERVICE_FAILED; } if (write(fd, "3", 1) != rc) { if (rc < 0) { perror("write error"); setMapInMaps(conf,"lenv","message",_("Unable to announce the successful execution of the HPC service")); close(fd); return SERVICE_FAILED; } } close(fd); setOutputValue(outputs,"Result",(char*)"\"FinalizeHPC run successfully\"",32); unlink(flenv); free(flenv); return SERVICE_SUCCEEDED; } /** * FinalizeHPC1 ZOO Service : * This service is used to inform a ZOO-Kernel waiting for the end of the * execution of a HPC service * * format="AllocCPUS"; for i in $(sacct -e) ; do format="$format,$i"; done; format="$(echo $format | sed "s:AllocCPUS,::")" ; echo $format; sacct --format=$format -p | grep "997f-11e8-9f78-0050569320d2" * * 
AllocCPUS,AllocGRES,AllocNodes,AllocTRES,Account,AssocID,AveCPU,AveCPUFreq,AveDiskRead,AveDiskWrite,AvePages,AveRSS,AveVMSize,BlockID,Cluster,Comment,ConsumedEnergy,ConsumedEnergyRaw,CPUTime,CPUTimeRAW,DerivedExitCode,Elapsed,Eligible,End,ExitCode,GID,Group,JobID,JobIDRaw,JobName,Layout,MaxDiskRead,MaxDiskReadNode,MaxDiskReadTask,MaxDiskWrite,MaxDiskWriteNode,MaxDiskWriteTask,MaxPages,MaxPagesNode,MaxPagesTask,MaxRSS,MaxRSSNode,MaxRSSTask,MaxVMSize,MaxVMSizeNode,MaxVMSizeTask,MinCPU,MinCPUNode,MinCPUTask,NCPUS,NNodes,NodeList,NTasks,Priority,Partition,QOS,QOSRAW,ReqCPUFreq,ReqCPUFreqMin,ReqCPUFreqMax,ReqCPUFreqGov,ReqCPUS,ReqGRES,ReqMem,ReqNodes,ReqTRES,Reservation,ReservationId,Reserved,ResvCPU,ResvCPURAW,Start,State,Submit,Suspended,SystemCPU,Timelimit,TotalCPU,UID,User,UserCPU,WCKey,WCKeyID * 28||1|cpu=28,node=1|geosud|258|||||||||cluster||||00:00:56|56|0:0|00:00:02|2018-08-06T15:48:13|2018-08-06T15:48:16|0:0|1019|geosud|883299|883299|ZOO-Project_5bd1c32b-997f-11e8-9f78-0050569320d2_GSDBandMath_6_2_005||||||||||||||||||||28|1|muse044||4294360886|defq|qos_geosud|20|Unknown|Unknown|Unknown|Unknown|1||0n|1|cpu=1,node=1|||00:00:01|00:00:01|1|2018-08-06T15:48:14|COMPLETED|2018-08-06T15:48:13|00:00:00||UNLIMITED|00:00:00|1229|geosudwps|||0| * */ ZOO_DLL_EXPORT int FinalizeHPC1(maps*& conf,maps*& inputs,maps*& outputs){ // Retrieve the jobid corresponding to the identifier generated by SLURM // by reading the file generated when running the SBATCH file map* jobid=getMapFromMaps(inputs,"jobid","value"); struct sockaddr_un addr; char buf[100]="3"; int fd,rc=NULL; int i=0; map* usid=getMapFromMaps(conf,"lenv","usid"); map* tmpPath=getMapFromMaps(conf,"main","tmpPath"); char *flenv = (char *) malloc ((strlen (tmpPath->value) + strlen (jobid->value) + 12) * sizeof (char)); sprintf (flenv, "%s/%s_lenv.cfg", tmpPath->value, jobid->value); maps* m = (maps *) malloc (MAPS_SIZE); m->child=NULL; m->next=NULL; map* configId=NULL; if(conf_read(flenv, m) != 2){ 
configId=getMapFromMaps(m,"lenv","configId"); setMapInMaps(conf,"lenv","configId",configId->value); }else{ setMapInMaps(conf,"lenv","message",_("Unable to read the lenv section file of the requested jobid")); return SERVICE_FAILED; } SSHCON *test=ssh_connect(conf); /*if(test==NULL){ setMapInMaps(conf,"lenv","message",_("Unable to connect using ssh.")); return SERVICE_FAILED; }*/ char *logPath=(char*)malloc((strlen(tmpPath->value)+strlen(jobid->value)+12)*sizeof(char)); sprintf(logPath,"%s/exec_out_%s",tmpPath->value,jobid->value); struct stat f_status; int ts=stat(logPath, &f_status); char* fcontent = NULL; if(ts==0) { fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1)); FILE* f=fopen(logPath,"rb"); fread(fcontent,f_status.st_size,1,f); int fsize=f_status.st_size; fcontent[fsize]=0; fclose(f); }else{ setMapInMaps(conf,"lenv","message",_("No service with this jobid can be found")); return SERVICE_FAILED; } free(logPath); // Run sacct to check if the service execution ended. // Store all the informations returned by scontrol command as a cfg file to // be parsed back by the ZOO-Kernel waiting for the execution of the remote // service maps* tmpMaps=createMaps("henv"); map* tmpMap=getMapFromMaps(conf,configId->value,"remote_command_opt"); char* command=(char*)malloc((126+strlen(tmpMap->value))*sizeof(char)); sprintf(command,"sacct --format=%s -p | grep \"%s\" | sed \"s:||:|None|:g;s:||:|None|:g\"",tmpMap->value,jobid->value); if(ssh_exec(conf,command,ssh_get_cnt(conf))==0){ free(command); setMapInMaps(conf,"lenv","message",_("Failed to run sacct remotely")); // TODO: check status in db and if available continue in other case return SERVICE_FAILED return SERVICE_FAILED; }else{ free(command); logPath=(char*)malloc((strlen(tmpPath->value)+strlen(usid->value)+11)*sizeof(char)); sprintf(logPath,"%s/exec_out_%s",tmpPath->value,usid->value); int ts=stat(logPath, &f_status); if(ts==0) { fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1)); FILE* 
f=fopen(logPath,"rb"); fread(fcontent,f_status.st_size,1,f); int fsize=f_status.st_size; fcontent[fsize]=0; fclose(f); free(logPath); char *token, *saveptr; char *token1, *saveptr1; token = strtok_r (tmpMap->value, ",", &saveptr); token1 = strtok_r (fcontent, "|", &saveptr1); while (token != NULL) { fprintf(stderr,"%s %d %s \n",__FILE__,__LINE__,token); fflush(stderr); fprintf(stderr,"%s %d %s %s \n",__FILE__,__LINE__,token,token1); fflush(stderr); if(token1 != NULL){ if(tmpMaps->content==NULL) tmpMaps->content=createMap(token,token1); else addToMap(tmpMaps->content,token,token1); } token = strtok_r (NULL, ",", &saveptr); token1 = strtok_r (NULL, "|", &saveptr1); } }else{ free(logPath); setMapInMaps(conf,"lenv","message",_("Unable to access the downloaded execution log file")); return SERVICE_FAILED; } } tmpMap=getMapFromMaps(tmpMaps,"henv","JobId"); if(tmpMap!=NULL){ char* tmpStr=(char*)malloc((32)*sizeof(char)); sprintf(tmpStr,"slurm-%s.out",tmpMap->value); addToMap(tmpMaps->content,"StdErr",tmpStr); free(tmpStr); } logPath=(char*)malloc((strlen(tmpPath->value)+strlen(jobid->value)+15)*sizeof(char)); sprintf(logPath,"%s/exec_status_%s",tmpPath->value,jobid->value); dumpMapsToFile(tmpMaps,logPath,0); char *sname=(char*)malloc((strlen(tmpPath->value)+strlen(jobid->value)+21)); sprintf(sname,"%s/.wait_socket_%s.sock",tmpPath->value,jobid->value); if ( (fd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { perror("socket error"); setMapInMaps(conf,"lenv","message",_("Socket error")); return SERVICE_FAILED; } memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; strncpy(addr.sun_path, sname, sizeof(addr.sun_path)-1); if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) { perror("connect error"); setMapInMaps(conf,"lenv","message",_("Unable to connect")); return SERVICE_FAILED; } if (write(fd, "3", 1) != rc) { if (rc < 0) { perror("write error"); setMapInMaps(conf,"lenv","message",_("Unable to announce the successful execution of the HPC service")); close(fd); 
return SERVICE_FAILED; } } close(fd); unlink(flenv); free(flenv); setOutputValue(outputs,"Result",(char*)"\"FinalizeHPC run successfully\"",32); return SERVICE_SUCCEEDED; } }