[Saga-devel] saga SVN commit 3250: /trunk/adaptors/ssh/

amerzky at cct.lsu.edu amerzky at cct.lsu.edu
Fri Jan 9 06:02:37 CST 2009


User: amerzky
Date: 2009/01/09 06:02 AM

Modified:
 /trunk/adaptors/ssh/
  Makefile, configure, configure.in
 /trunk/adaptors/ssh/config/
  saga_ssh.m4
 /trunk/adaptors/ssh/ssh_context/
  ssh_context_adaptor.cpp, ssh_context_adaptor.ini.in
 /trunk/adaptors/ssh/ssh_file/
  README, ssh_file_adaptor.cpp, ssh_file_adaptor_sshfs.cpp
 /trunk/adaptors/ssh/ssh_job/
  ssh_job.cpp, ssh_job.hpp, ssh_job_adaptor.cpp, ssh_job_adaptor.ini, ssh_job_service.cpp, ssh_job_service.hpp

Log:
 ssh job adaptor is working.  
 
 main limitation: signals are sent to the local ssh process,
 _not_ to the remotely started process.  That is probably not
 what the user wants.  That also holds for suspend/resume, which
 act on the local ssh process.
 
 reason: I simply do not know how to find the PID of the remote
 job - ssh does not report it.  BTW: using libssh would not solve
 that problem.  At the moment, I can't think of a clean solution.
 Any opinion?
 
 A

File Changes:

Directory: /trunk/adaptors/ssh/config/
======================================

File [modified]: saga_ssh.m4
Delta lines: +37 -0
===================================================================
--- trunk/adaptors/ssh/config/saga_ssh.m4	2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/config/saga_ssh.m4	2009-01-09 12:01:58 UTC (rev 3250)
@@ -16,6 +16,8 @@
 #
 #     AC_SUBST(HAVE_SSH)
 #     AC_SUBST(SAGA_SSH)
+#     AC_SUBST(HAVE_SCP)
+#     AC_SUBST(SAGA_SCP)
 #     AC_SUBST(HAVE_SSHFS)
 #     AC_SUBST(SAGA_SSHFS)
 #
@@ -67,6 +69,41 @@
 ])
 
 
+AC_DEFUN([AX_SAGA_CHECK_SCP],
+[
+  AC_ARG_WITH([scp],
+              [AS_HELP_STRING([--with-scp],
+              [path to scp binary @<:@default=check@:>@])],
+              [],
+              [with_scp=check])
+
+  if test "x$with_scp" = "xcheck"; then
+  
+    AC_PATH_PROG(SAGA_SCP, scp)
+
+    HAVE_SCP=no
+
+    if ! test "x$SAGA_SCP" = "x"; then
+      HAVE_SCP=yes
+    fi
+  
+  else
+
+    if test -x "$with_scp"; then
+      HAVE_SCP=yes
+      SAGA_SCP=$with_scp
+    else
+      HAVE_SCP=no
+    fi
+
+  fi
+
+
+  AC_SUBST(HAVE_SCP)
+  AC_SUBST(SAGA_SCP)
+])
+
+
 AC_DEFUN([AX_SAGA_CHECK_SSHFS],
 [
   AC_ARG_WITH([sshfs],

Directory: /trunk/adaptors/ssh/
===============================

File [modified]: Makefile
Delta lines: +4 -0
===================================================================
--- trunk/adaptors/ssh/Makefile	2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/Makefile	2009-01-09 12:01:58 UTC (rev 3250)
@@ -22,3 +22,7 @@
 
 -include $(SAGA_MAKE_INCLUDE_ROOT)/saga.mk
 
+ssh_context: config
+ssh_file:    config
+ssh_job:     config
+

File [modified]: configure
Delta lines: +93 -6
===================================================================
--- trunk/adaptors/ssh/configure	2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/configure	2009-01-09 12:01:58 UTC (rev 3250)
@@ -622,9 +622,12 @@
 TMP_SAGA_INSTTYPE
 SAGA_SSH
 HAVE_SSH
+SAGA_SCP
+HAVE_SCP
 SAGA_SSHFS
 HAVE_SSHFS
 CPP_HAVE_SSH
+CPP_HAVE_SCP
 CPP_HAVE_SSHFS
 BUILD_ADAPTOR_CONTEXT
 BUILD_ADAPTOR_FILE
@@ -1206,6 +1209,7 @@
   --with-PACKAGE[=ARG]    use PACKAGE [ARG=yes]
   --without-PACKAGE       do not use PACKAGE (same as --with-PACKAGE=no)
   --with-ssh              path to ssh binary [default=check]
+  --with-scp              path to scp binary [default=check]
   --with-sshfs            path to sshfs binary [default=check]
 
 Some influential environment variables:
@@ -1914,6 +1918,80 @@
 if test "$HAVE_SSH" = "yes"; then
 
 
+# Check whether --with-scp was given.
+if test "${with_scp+set}" = set; then
+  withval=$with_scp;
+else
+  with_scp=check
+fi
+
+
+  if test "x$with_scp" = "xcheck"; then
+
+    # Extract the first word of "scp", so it can be a program name with args.
+set dummy scp; ac_word=$2
+{ echo "$as_me:$LINENO: checking for $ac_word" >&5
+echo $ECHO_N "checking for $ac_word... $ECHO_C" >&6; }
+if test "${ac_cv_path_SAGA_SCP+set}" = set; then
+  echo $ECHO_N "(cached) $ECHO_C" >&6
+else
+  case $SAGA_SCP in
+  [\\/]* | ?:[\\/]*)
+  ac_cv_path_SAGA_SCP="$SAGA_SCP" # Let the user override the test with a path.
+  ;;
+  *)
+  as_save_IFS=$IFS; IFS=$PATH_SEPARATOR
+for as_dir in $PATH
+do
+  IFS=$as_save_IFS
+  test -z "$as_dir" && as_dir=.
+  for ac_exec_ext in '' $ac_executable_extensions; do
+  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+    ac_cv_path_SAGA_SCP="$as_dir/$ac_word$ac_exec_ext"
+    echo "$as_me:$LINENO: found $as_dir/$ac_word$ac_exec_ext" >&5
+    break 2
+  fi
+done
+done
+IFS=$as_save_IFS
+
+  ;;
+esac
+fi
+SAGA_SCP=$ac_cv_path_SAGA_SCP
+if test -n "$SAGA_SCP"; then
+  { echo "$as_me:$LINENO: result: $SAGA_SCP" >&5
+echo "${ECHO_T}$SAGA_SCP" >&6; }
+else
+  { echo "$as_me:$LINENO: result: no" >&5
+echo "${ECHO_T}no" >&6; }
+fi
+
+
+
+    HAVE_SCP=no
+
+    if ! test "x$SAGA_SCP" = "x"; then
+      HAVE_SCP=yes
+    fi
+
+  else
+
+    if test -x "$with_scp"; then
+      HAVE_SCP=yes
+      SAGA_SCP=$with_scp
+    else
+      HAVE_SCP=no
+    fi
+
+  fi
+
+
+
+
+
+
+
 # Check whether --with-sshfs was given.
 if test "${with_sshfs+set}" = set; then
   withval=$with_sshfs;
@@ -1995,14 +2073,15 @@
 
 # translate defines into integers
 CPP_HAVE_SSH=0
+CPP_HAVE_SCP=0
+CPP_HAVE_SSHFS=0
 
-if test "$HAVE_SSH" = "yes"; then
+if test "$HAVE_SSH-$HAVE_SCP-$HAVE_SSHFS" = "yes-yes-yes"; then
   CPP_HAVE_SSH=1
+  CPP_HAVE_SCP=1
+  CPP_HAVE_SSHFS=1
 fi
 
-if test "$HAVE_SSHFS" = "yes"; then
-  CPP_HAVE_SSH=1
-fi
 
 
 
@@ -2021,7 +2100,7 @@
     BUILD_ADAPTOR_JOB="yes"
   fi
 
-  if test "x$HAVE_SSHFS" = "xyes"; then
+  if test "$HAVE_SSHFS-$HAVE_SCP" = "yes-yes"; then
     if test "x$SAGA_HAVE_ADAPTOR_FILE" = "xyes"; then
       BUILD_ADAPTOR_FILE="yes"
     fi
@@ -2033,6 +2112,10 @@
   SAGA_SSH_S="($SAGA_SSH)"
 fi
 
+if test "x$HAVE_SCP" = "xyes"; then
+  SAGA_SCP_S="($SAGA_SCP)"
+fi
+
 if test "x$HAVE_SSHFS" = "xyes"; then
   SAGA_SSHFS_S="($SAGA_SSHFS)"
 fi
@@ -2725,9 +2808,12 @@
 TMP_SAGA_INSTTYPE!$TMP_SAGA_INSTTYPE$ac_delim
 SAGA_SSH!$SAGA_SSH$ac_delim
 HAVE_SSH!$HAVE_SSH$ac_delim
+SAGA_SCP!$SAGA_SCP$ac_delim
+HAVE_SCP!$HAVE_SCP$ac_delim
 SAGA_SSHFS!$SAGA_SSHFS$ac_delim
 HAVE_SSHFS!$HAVE_SSHFS$ac_delim
 CPP_HAVE_SSH!$CPP_HAVE_SSH$ac_delim
+CPP_HAVE_SCP!$CPP_HAVE_SCP$ac_delim
 CPP_HAVE_SSHFS!$CPP_HAVE_SSHFS$ac_delim
 BUILD_ADAPTOR_CONTEXT!$BUILD_ADAPTOR_CONTEXT$ac_delim
 BUILD_ADAPTOR_FILE!$BUILD_ADAPTOR_FILE$ac_delim
@@ -2736,7 +2822,7 @@
 LTLIBOBJS!$LTLIBOBJS$ac_delim
 _ACEOF
 
-  if test `sed -n "s/.*$ac_delim\$/X/p" conf$$subs.sed | grep -c X` = 55; then
+  if test `sed -n "s/.*$ac_delim\$/X/p" conf$$subs.sed | grep -c X` = 58; then
     break
   elif $ac_last_try; then
     { { echo "$as_me:$LINENO: error: could not make $CONFIG_STATUS" >&5
@@ -3076,6 +3162,7 @@
 echo " Using SAGA from       : $TMP_SAGA_LOCATION ($TMP_SAGA_INSTTYPE) " | $TEE $OUT
 echo "                                                                 " | $TEE $OUT
 echo " SSH found             : $HAVE_SSH  $SAGA_SSH_S                  " | $TEE $OUT
+echo " SCP found             : $HAVE_SCP  $SAGA_SCP_S                  " | $TEE $OUT
 echo " SSHFS found           : $HAVE_SSHFS  $SAGA_SSHFS_S              " | $TEE $OUT
 echo "                                                                 " | $TEE $OUT
 echo " ========================================================        " | $TEE $OUT

File [modified]: configure.in
Delta lines: +13 -6
===================================================================
--- trunk/adaptors/ssh/configure.in	2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/configure.in	2009-01-09 12:01:58 UTC (rev 3250)
@@ -22,6 +22,7 @@
 AX_SAGA_CHECK_SSH()
 
 if test "$HAVE_SSH" = "yes"; then
+  AX_SAGA_CHECK_SCP()
   AX_SAGA_CHECK_SSHFS()
 fi
 
@@ -30,16 +31,17 @@
 
 # translate defines into integers
 CPP_HAVE_SSH=0
+CPP_HAVE_SCP=0
+CPP_HAVE_SSHFS=0
 
-if test "$HAVE_SSH" = "yes"; then
+if test "$HAVE_SSH-$HAVE_SCP-$HAVE_SSHFS" = "yes-yes-yes"; then
   CPP_HAVE_SSH=1
+  CPP_HAVE_SCP=1
+  CPP_HAVE_SSHFS=1
 fi
 
-if test "$HAVE_SSHFS" = "yes"; then
-  CPP_HAVE_SSH=1
-fi
-
 AC_SUBST(CPP_HAVE_SSH)
+AC_SUBST(CPP_HAVE_SCP)
 AC_SUBST(CPP_HAVE_SSHFS)
 
 BUILD_ADAPTOR_CONTEXT=no
@@ -56,7 +58,7 @@
     BUILD_ADAPTOR_JOB="yes"
   fi
 
-  if test "x$HAVE_SSHFS" = "xyes"; then
+  if test "$HAVE_SSHFS-$HAVE_SCP" = "yes-yes"; then
     if test "x$SAGA_HAVE_ADAPTOR_FILE" = "xyes"; then
       BUILD_ADAPTOR_FILE="yes"
     fi
@@ -68,6 +70,10 @@
   SAGA_SSH_S="($SAGA_SSH)"
 fi
 
+if test "x$HAVE_SCP" = "xyes"; then
+  SAGA_SCP_S="($SAGA_SCP)"
+fi
+
 if test "x$HAVE_SSHFS" = "xyes"; then
   SAGA_SSHFS_S="($SAGA_SSHFS)"
 fi
@@ -95,6 +101,7 @@
 echo " Using SAGA from       : $TMP_SAGA_LOCATION ($TMP_SAGA_INSTTYPE) " | $TEE $OUT
 echo "                                                                 " | $TEE $OUT
 echo " SSH found             : $HAVE_SSH  $SAGA_SSH_S                  " | $TEE $OUT
+echo " SCP found             : $HAVE_SCP  $SAGA_SCP_S                  " | $TEE $OUT
 echo " SSHFS found           : $HAVE_SSHFS  $SAGA_SSHFS_S              " | $TEE $OUT
 echo "                                                                 " | $TEE $OUT
 echo " ========================================================        " | $TEE $OUT

Directory: /trunk/adaptors/ssh/ssh_context/
===========================================

File [modified]: ssh_context_adaptor.cpp
Delta lines: +0 -5
===================================================================
--- trunk/adaptors/ssh/ssh_context/ssh_context_adaptor.cpp	2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_context/ssh_context_adaptor.cpp	2009-01-09 12:01:58 UTC (rev 3250)
@@ -81,8 +81,6 @@
 
     cert_info_t ci;
 
-    std::cerr << " === key : " << saga::attributes::context_userkey << std::endl;
-
     if ( attr.attribute_exists (saga::attributes::context_userkey) ) 
     {
       // this call looks for a valid proxy file in the location described by
@@ -118,7 +116,6 @@
     }
     else
     {
-      std::cerr << " === " << ci.errormessage << std::endl;
       SAGA_ADAPTOR_THROW (ci.errormessage, saga::NoSuccess);
     }
   }
@@ -168,8 +165,6 @@
 
   ci.success = true;
 
-  std::cerr << " === path: " << path << std::endl;
-
   // fall back to default if needed
   if ( path == "" )
   {

File [modified]: ssh_context_adaptor.ini.in
Delta lines: +4 -1
===================================================================
--- trunk/adaptors/ssh/ssh_context/ssh_context_adaptor.ini.in	2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_context/ssh_context_adaptor.ini.in	2009-01-09 12:01:58 UTC (rev 3250)
@@ -13,6 +13,9 @@
   ssh_bin         = @SAGA_SSH@
   ssh_opt         = -o StrictHostKeyChecking=no
 
+  scp_bin         = @SAGA_SCP@
+  scp_opt         = -o StrictHostKeyChecking=no
+
   sshfs_bin       = @SAGA_SSHFS@
-  sshfs_opt       = -o workaround=nonodelay 
+  sshfs_opt       = -o workaround=nonodelay -o StrictHostKeyChecking=no
 

Directory: /trunk/adaptors/ssh/ssh_file/
========================================

File [modified]: README
Delta lines: +5 -9
===================================================================
--- trunk/adaptors/ssh/ssh_file/README	2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_file/README	2009-01-09 12:01:58 UTC (rev 3250)
@@ -13,14 +13,10 @@
 
 The adaptor mounts the remote file systems into
 
-  ~/.saga/adaptors/ssh/ssh_file/mnt/<saga_object_id>
+  ~/.saga/adaptors/ssh/ssh_file/mnt/<unique_id>
 
-Obviously, multiple directory objects accessing the same remote file system will
-thus mount that file system multiple times.  At the moment is unclear if that is
-a problem (beyond the object initialization latency).
+Latency for mounting a fs is <10 seconds (usually about 1).  The
+mounts are refcounted, so the last object using the mount is doing the
+umount.  (The 'sshfs_keepalive' option determines if the filesystem is
+umounted when the respective SAGA object gets destroyed.)
 
-The 'sshfs_keepalive' option determines if the filesystem is umounted when the
-respective SAGA object gets destroyed.
-
-TODO: Security and context management needs documentation.
-

File [modified]: ssh_file_adaptor.cpp
Delta lines: +0 -5
===================================================================
--- trunk/adaptors/ssh/ssh_file/ssh_file_adaptor.cpp	2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_file/ssh_file_adaptor.cpp	2009-01-09 12:01:58 UTC (rev 3250)
@@ -19,8 +19,6 @@
   saga::impl::adaptor_selector::adaptor_info_list_type
     file_adaptor::adaptor_register (saga::impl::session * s)
   {
-    std::cout  << " === ctor" << std::endl;
-
     // list of implemented cpi's
     saga::impl::adaptor_selector::adaptor_info_list_type infos;
     preference_type prefs; 
@@ -34,7 +32,6 @@
   // on destruction, umount all sshfs file systems - if keepalive is not set
   file_adaptor::~file_adaptor (void)
   {
-    std::cout  << " === dtor" << std::endl;
     // for ( unsigned int i = 0; i < mounted_.size (); i++ )
     // {
     //   // release the sshfs shared_ptr.  On sshfs destruction, the file system
@@ -48,8 +45,6 @@
                                                    const saga::session & s, 
                                                    const saga::url     & u)
   {
-    std::cout << " === get_sshfs " << u << "\n";
-
     // get the id for the (potential) mount point.
     // id is host:port
     std::string host = u.get_host ();

File [modified]: ssh_file_adaptor_sshfs.cpp
Delta lines: +1 -5
===================================================================
--- trunk/adaptors/ssh/ssh_file/ssh_file_adaptor_sshfs.cpp	2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_file/ssh_file_adaptor_sshfs.cpp	2009-01-09 12:01:58 UTC (rev 3250)
@@ -18,9 +18,7 @@
     , s_   (s)
     , url_ (u)
   {
-    SAGA_LOG_ALWAYS("=============================================");
-    SAGA_LOG_ALWAYS(u.get_string ().c_str ());
-    SAGA_LOG_ALWAYS("=============================================");
+    SAGA_LOG_DEBUG(u.get_string ().c_str ());
 
     // can we handle that URL?
     if ( url_.get_scheme () != "any" &&
@@ -188,8 +186,6 @@
       saga::url u ("file://localhost/");
       u.set_path (mount_);
 
-      std::cout << " ======= trying to create " << u << "\n";
-
       saga::filesystem::directory d (s_, u, 
                                      saga::filesystem::Create |
                                      saga::filesystem::CreateParents);

Directory: /trunk/adaptors/ssh/ssh_job/
=======================================

File [modified]: ssh_job.cpp
Delta lines: +146 -19
===================================================================
--- trunk/adaptors/ssh/ssh_job/ssh_job.cpp	2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_job/ssh_job.cpp	2009-01-09 12:01:58 UTC (rev 3250)
@@ -33,14 +33,125 @@
 {
 
   // constructor
+  //
+  // note that we can never be job::self - that is usually handled by the local
+  // adaptor.
   job_cpi_impl::job_cpi_impl (proxy                           * p, 
                               cpi_info const                  & info,
                               saga::ini::ini const            & glob_ini, 
                               saga::ini::ini const            & adap_ini,
                               TR1::shared_ptr <saga::adaptor>   adaptor)
-    : base_cpi  (p, info, adaptor, cpi::Noflags)
+    : base_cpi  (p, info, adaptor, cpi::Noflags),
+      js_ ("fork://localhost/")
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    instance_data     idata (this);
+    adaptor_data_type adata (this);
+
+    ini_ = adap_ini.get_section ("preferences").get_entries ();
+    rm_  = idata->rm_;
+
+    SAGA_LOG_DEBUG ("SSH Job c'tor");
+    SAGA_LOG_DEBUG (rm_.get_string ().c_str ());
+
+    host_ = rm_.get_host ();
+
+    std::stringstream ss;
+    ss << rm_.get_port ();
+    port_ = ss.str ();
+
+    ssh_bin_ = ini_["ssh_bin"];
+    scp_bin_ = ini_["scp_bin"];
+
+    ssh_opt_ = saga::adaptors::utils::split (ini_["ssh_opt"]);
+    scp_opt_ = saga::adaptors::utils::split (ini_["scp_opt"]);
+
+    if ( rm_.get_scheme () != "ssh" &&
+         rm_.get_scheme () != "any" &&
+         rm_.get_scheme () != "" )
+    {
+      SAGA_ADAPTOR_THROW (std::string ("Adaptor only supports 'ssh' and 'any' URL schemes, not ")
+                          + rm_.get_scheme ().c_str (),
+                          saga::BadParameter);
+    }
+
+    // the job service should hand us a session whose first context is ssh
+    std::vector <saga::context> contexts = p->get_session ().list_contexts ();    
+    if ( ! contexts.empty () ) ctx_ = contexts[0];
+
+    // sanity check - an empty context list fails just like a non-ssh one
+    if ( contexts.empty () || ctx_.get_attribute ("Type") != "ssh" )
+    {
+      SAGA_ADAPTOR_THROW_NO_CONTEXT ("no ssh context found for session",
+                                     saga::NoSuccess);
+    }
+
+    key_  = ctx_.get_attribute ("UserKey");
+    user_ = ctx_.get_attribute ("UserID");
+
+    SAGA_LOG_DEBUG (rm_.get_string ().c_str ());
+
+
+    // FIXME: make sure that rm_.get_host () and jd['CandidateHost'] match
+
+    if ( idata->init_from_jobid_ )
+    {
+      old_jobid_ = idata->jobid_;
+      
+      // FIXME: extract default adaptors jobid from jobid, get job from default job
+      // service, and get job from there, too.  Also get jd, and create old/new
+      // jd
+
+      j_ = js_.get_job (new_jobid_);
+    }
+    else
+    {
+      // we don't init from jobid, thus we init from job description
+
+      // we simply add the ssh command in front of the job cmd line, and reuse the
+      // jd
+      old_jd_ = idata->jd_; // just save
+      new_jd_ = idata->jd_; // save and fix for ssh
+
+      // save old exe and args
+      std::string               old_exe;
+      std::vector <std::string> old_args;
+
+      old_exe = old_jd_.get_attribute (saga::job::attributes::description_executable);
+
+      if ( old_jd_.attribute_exists (saga::job::attributes::description_arguments) )
+      {
+        old_args = old_jd_.get_vector_attribute (saga::job::attributes::description_arguments);
+      }
+
+      // create new args and exe, and set them in the new jd
+      std::string               new_exe  = ssh_bin_;
+      std::vector <std::string> new_args = ssh_opt_;
+      std::vector <std::string> new_hosts;
+
+      // add ssh specific args
+      new_args.push_back ("-i");
+      new_args.push_back (key_);
+      new_args.push_back (user_ + "@" + host_);
+
+      // re-add old exe and args
+      new_args.push_back (old_exe);
+
+      for ( unsigned int i = 0; i < old_args.size (); i++ )
+      {
+        new_args.push_back (old_args[i]);
+      }
+
+      // we only want to run on localhost
+      new_hosts.push_back ("localhost");
+
+      // fill jd with new settings
+      new_jd_.set_attribute        (saga::job::attributes::description_executable,       new_exe);
+      new_jd_.set_vector_attribute (saga::job::attributes::description_arguments,        new_args);
+      new_jd_.set_vector_attribute (saga::job::attributes::description_candidate_hosts,  new_hosts);
+
+      // create the job with the altered jd
+      j_ = js_.create_job (new_jd_);
+    }
   }
 
 
@@ -53,63 +164,75 @@
   //  SAGA API functions
   void job_cpi_impl::sync_get_state (saga::job::state & ret)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    ret = j_.get_state ();
   }
 
   void job_cpi_impl::sync_get_description (saga::job::description & ret)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    // simply return the description we originally received
+    // FIXME: need to re-create the description for reconnected jobs, i.e. need
+    // to remove the ssh specific command/arguments etc.
+    ret = old_jd_;
+    
   }
 
   void job_cpi_impl::sync_get_job_id (std::string & ret)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    // FIXME: translate local jobid into ssh jobid
+    ret = j_.get_job_id ();
   }
 
   // access streams for communication with the child
   void job_cpi_impl::sync_get_stdin (saga::job::ostream & ret)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    ret = j_.get_stdin ();
   }
 
   void job_cpi_impl::sync_get_stdout (saga::job::istream & ret)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    ret = j_.get_stdout ();
   }
 
   void job_cpi_impl::sync_get_stderr (saga::job::istream & ret)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    ret = j_.get_stderr ();
   }
 
   void job_cpi_impl::sync_checkpoint (saga::impl::void_t & ret)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    j_.checkpoint ();
   }
 
-  void job_cpi_impl::sync_migrate (saga::impl::void_t           & ret, 
-                                   saga::job::description   jd)
+  void job_cpi_impl::sync_migrate (saga::impl::void_t   & ret, 
+                                   saga::job::description jd)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    // FIXME: jd should be translated just as in the c'tor
+    j_.migrate (jd);
   }
 
   void job_cpi_impl::sync_signal (saga::impl::void_t & ret, 
-                                  int            signal)
+                                  int                  signal)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    // FIXME: this is a tricky one: at the moment, the signal goes to the local
+    // ssh process, not to the remote process!  Not easy to fix...
+    j_.signal (signal);
   }
 
 
   //  suspend the child process 
   void job_cpi_impl::sync_suspend (saga::impl::void_t & ret)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    // FIXME: this is a tricky one: at the moment, the signal goes to the local
+    // ssh process, not to the remote process!  Not easy to fix...
+    j_.suspend ();
   }
 
   //  resume the child process 
   void job_cpi_impl::sync_resume (saga::impl::void_t & ret)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    // FIXME: this is a tricky one: at the moment, the signal goes to the local
+    // ssh process, not to the remote process!  Not easy to fix...
+    j_.resume ();
   }
 
 
@@ -117,20 +240,24 @@
   // inherited from the task interface
   void job_cpi_impl::sync_run (saga::impl::void_t & ret)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    j_.run ();
   }
 
   void job_cpi_impl::sync_cancel (saga::impl::void_t & ret, 
                                   double timeout)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    // FIXME: this is a tricky one: at the moment, the signal goes to the local
+    // ssh process, not to the remote process!  Not easy to fix...
+    j_.cancel (timeout);
   }
 
   //  wait for the child process to terminate
   void job_cpi_impl::sync_wait (bool   & ret, 
                                 double   timeout)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    // FIXME: this waits on the local ssh process, not on the remote process!
+    // Report the wait result to the caller instead of leaving 'ret' unset.
+    ret = j_.wait (timeout);
   }
 
 } // namespace ssh_job

File [modified]: ssh_job.hpp
Delta lines: +27 -0
===================================================================
--- trunk/adaptors/ssh/ssh_job/ssh_job.hpp	2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_job/ssh_job.hpp	2009-01-09 12:01:58 UTC (rev 3250)
@@ -41,7 +41,34 @@
       // adaptor data
       typedef saga::adaptors::adaptor_data <adaptor> adaptor_data_type;
 
+      // state
+      saga::url                            rm_;
+      saga::context                        ctx_;
+      saga::job::service                   js_;
 
+      std::string                          host_;
+      std::string                          user_;
+      std::string                          key_;
+      std::string                          port_;
+
+      std::string                          ssh_bin_;
+      std::string                          scp_bin_;
+
+      std::vector <std::string>            ssh_opt_;
+      std::vector <std::string>            scp_opt_;
+
+      std::map <std::string, std::string>  ini_;
+      std::map <std::string, std::string>  env_;
+
+      saga::job::job                       j_;        // forward to default job
+
+      saga::job::description               old_jd_ ;  // original jd
+      saga::job::description               new_jd_ ;  // new local job description including ssh
+
+      std::string                          old_jobid_; // original job id
+      std::string                          new_jobid_; // new local job id
+
+
     public:
       // constructor of the job adaptor
       job_cpi_impl  (proxy                           * p, 

File [modified]: ssh_job_adaptor.cpp
Delta lines: +0 -6
===================================================================
--- trunk/adaptors/ssh/ssh_job/ssh_job_adaptor.cpp	2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_job/ssh_job_adaptor.cpp	2009-01-09 12:01:58 UTC (rev 3250)
@@ -32,12 +32,6 @@
     saga::impl::adaptor_selector::adaptor_info_list_type list;
 
     // create empty preference list
-    // these list should be filled with properties of the adaptor, 
-    // which can be used to select adaptors with specific preferences.
-    // Example:
-    //   'security' -> 'gsi'
-    //   'logging'  -> 'yes'
-    //   'auditing' -> 'no'
     preference_type prefs; 
 
     // create file adaptor infos (each adaptor instance gets its own uuid)

File [modified]: ssh_job_adaptor.ini
Delta lines: +1 -0
===================================================================
--- trunk/adaptors/ssh/ssh_job/ssh_job_adaptor.ini	2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_job/ssh_job_adaptor.ini	2009-01-09 12:01:58 UTC (rev 3250)
@@ -8,6 +8,7 @@
   name      = ssh_job
 # path      = $[saga.location]/lib
 # enabled   = true
+  preferences = saga.adaptor_suite.ssh.preferences
 
 [saga.adaptors.ssh_job.preferences]
   # adaptor specific configuration

File [modified]: ssh_job_service.cpp
Delta lines: +151 -20
===================================================================
--- trunk/adaptors/ssh/ssh_job/ssh_job_service.cpp	2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_job/ssh_job_service.cpp	2009-01-09 12:01:58 UTC (rev 3250)
@@ -36,9 +36,130 @@
                                               saga::ini::ini const & glob_ini, 
                                               saga::ini::ini const & adap_ini,
                                               TR1::shared_ptr <saga::adaptor> adaptor)
-    : base_cpi (p, info, adaptor, cpi::Noflags)
+    : base_cpi (p, info, adaptor, cpi::Noflags), 
+      // Create a local job adaptor to spawn off ssh commands at will.  
+      // If that throws, we simply pass on the exception.
+      js_ ("fork://localhost/")
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    adaptor_data  adata (this);
+    instance_data idata (this);
+
+    ini_ = adap_ini.get_section ("preferences").get_entries ();
+    rm_  = idata->rm_;
+
+    host_ = rm_.get_host ();
+
+    std::stringstream ss;
+    ss << rm_.get_port ();
+    port_ = ss.str ();
+
+    ssh_bin_ = ini_["ssh_bin"];
+    scp_bin_ = ini_["scp_bin"];
+
+    ssh_opt_ = saga::adaptors::utils::split (ini_["ssh_opt"]);
+    scp_opt_ = saga::adaptors::utils::split (ini_["scp_opt"]);
+
+    if ( rm_.get_scheme () != "ssh" &&
+         rm_.get_scheme () != "any" &&
+         rm_.get_scheme () != "" )
+    {
+      SAGA_ADAPTOR_THROW (std::string ("Adaptor only supports 'ssh' and 'any' URL schemes, not ")
+                          + rm_.get_scheme ().c_str (),
+                          saga::BadParameter);
+    }
+
+    // check if we have a context for ssh
+    std::vector <saga::context> contexts = p->get_session ().list_contexts ();    
+    std::vector <saga::context> ssh_contexts;
+
+    for ( int i = 0; i < contexts.size (); i++ )
+    {
+      if ( contexts[i].get_attribute ("Type") == "ssh" )
+      {
+        ssh_contexts.push_back (contexts[i]);
+      }
+    }
+    
+    if ( 0 == ssh_contexts.size () )
+    {
+      // FIXME: isn't a warning enough?  ssh may be configured ok out-of-bound.
+      SAGA_ADAPTOR_THROW_NO_CONTEXT ("no ssh context found for session",
+                                     saga::NoSuccess);
+    }
+
+    SAGA_LOG_DEBUG (rm_.get_string ().c_str ())
+
+
+    // We copy over the ssh identity file of the context we used, so that jobs
+    // running on that instance can use it to contact other instances using the
+    // same context.  Note that we do _not_ copy the identity files of other
+    // contexts, as we don't want to spread credentials beyond their respective
+    // universe.
+    //
+    // FIXME: need to pass location to started SAGA jobs on that host, via some
+    // environment variable
+    //
+    // FIXME: the saga ssh context should evaluate that variable, and try to
+    // pick up that identity
+    //
+    // FIXME: we need to be able to create multiple default ssh contexts
+
+    for ( unsigned int i = 0; i < ssh_contexts.size (); i++ )
+    {
+      // try that context - index the filtered ssh list, not the full list
+      ctx_  = ssh_contexts[i];
+
+      key_  = ctx_.get_attribute ("UserKey");
+      user_ = ctx_.get_attribute ("UserID");
+
+      saga::adaptors::utils::process proc;
+
+      if ( ini_["distribute_idendity"] == "yes"  ||
+           ini_["distribute_idendity"] == "true" )
+      {
+        SAGA_LOG_DEBUG (" copying identity file");
+
+        proc.set_cmd  (scp_bin_);
+        proc.set_args (scp_opt_);
+
+        // FIXME: ensure that context is complete
+        proc.add_args ("-i", key_);
+
+        // file to stage
+        // FIXME: ensure that context is complete
+        // FIXME: we silently assume that the .ssh directory exists
+        // FIXME: the target below SHOULD not exist *aehem*
+        proc.add_arg (key_);
+        proc.add_arg (user_ + "@" + host_ + ":.ssh/id_saga");
+      }
+      else
+      {
+        SAGA_LOG_DEBUG (" running ssh test");
+
+        proc.set_cmd  (ssh_bin_);
+        proc.set_args (ssh_opt_);
+
+        // FIXME: ensure that context is complete
+        proc.add_args ("-i", key_);
+        proc.add_arg  (      user_ + "@" + host_);
+        proc.add_arg  ("true");
+      }
+
+
+      (void) proc.run_sync ();
+
+      if ( proc.done () )
+      {
+        // remember key location, so we can tell the started jobs about it
+        env_["SAGA_ADAPTOR_SSH_KEY"] = ".ssh/id_saga";
+
+        // we are done
+        return;
+      }
+    }
+
+    // no context was ok for scp or ssh - flag error
+    SAGA_ADAPTOR_THROW ("Could not connect to remote host", saga::NoSuccess);
   }
 
   // destructor
@@ -48,39 +169,49 @@
 
   //////////////////////////////////////////////////////////////////////
   // SAGA API functions
-  void 
-    job_service_cpi_impl::sync_create_job (saga::job::job         & ret, 
-                                           saga::job::description   jd)
+  void job_service_cpi_impl::sync_create_job (saga::job::job         & ret, 
+                                              saga::job::description   jd)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    SAGA_LOG_DEBUG ("SSH Create Job");
+    SAGA_LOG_DEBUG (rm_.get_string ().c_str ());
+
+    // we need only the context we identified as valid for the host.
+    saga::session s;
+    s.add_context (ctx_);
+
+    saga::job::job job = saga::adaptors::job (rm_, jd, s);
+    ret = job;
   }
 
-  void 
-    job_service_cpi_impl::sync_run_job (saga::job::job     & ret, 
-                                        std::string          cmd, 
-                                        std::string          host, 
-                                        saga::job::ostream & in, 
-                                        saga::job::istream & out, 
-                                        saga::job::istream & err)
+  void job_service_cpi_impl::sync_run_job (saga::job::job     & ret, 
+                                           std::string          cmd, 
+                                           std::string          host, 
+                                           saga::job::ostream & in, 
+                                           saga::job::istream & out, 
+                                           saga::job::istream & err)
   {
+    // we rely on the package fallback
     SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
   }
 
-  void 
-    job_service_cpi_impl::sync_list (std::vector <std::string> & ret)
+  void job_service_cpi_impl::sync_list (std::vector <std::string> & ret)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    // FIXME: we should grep for ssh jobs
+    // FIXME: translate local jobids into ssh jobids
+    ret = js_.list ();
   }
 
-  void
-    job_service_cpi_impl::sync_get_job (saga::job::job & ret, 
-                                        std::string      jobid)
+  void job_service_cpi_impl::sync_get_job (saga::job::job & ret, 
+                                           std::string      jobid)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    // FIXME: need to translate the ssh jobid into local jobid
+    // FIXME: correctly initialize the ssh::job implementation, please
+    ret = js_.get_job (jobid);
   }
 
   void job_service_cpi_impl::sync_get_self (saga::job::self & ret)
   {
+    // the ssh adaptor can never implement job::self
     SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
   }
 

File [modified]: ssh_job_service.hpp
Delta lines: +21 -1
===================================================================
--- trunk/adaptors/ssh/ssh_job/ssh_job_service.hpp	2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_job/ssh_job_service.hpp	2009-01-09 12:01:58 UTC (rev 3250)
@@ -42,8 +42,28 @@
               base_cpi;
 
       // adaptor data
-      typedef saga::adaptors::adaptor_data <adaptor> adaptor_data_type;
+      typedef saga::adaptors::adaptor_data <adaptor> adaptor_data;
 
+      // state
+      saga::url                            rm_;
+      saga::session                        s_;
+      saga::context                        ctx_;
+      saga::job::service                   js_;
+
+      std::string                          host_;
+      std::string                          user_;
+      std::string                          key_;
+      std::string                          port_;
+
+      std::string                          ssh_bin_;
+      std::string                          scp_bin_;
+
+      std::vector <std::string>            ssh_opt_;
+      std::vector <std::string>            scp_opt_;
+
+      std::map <std::string, std::string>  ini_;
+      std::map <std::string, std::string>  env_;
+
     public:
       // constructor of the job_service cpi
       job_service_cpi_impl  (proxy                           * p, 



More information about the saga-devel mailing list