[Saga-devel] saga SVN commit 3250: /trunk/adaptors/ssh/
amerzky at cct.lsu.edu
amerzky at cct.lsu.edu
Fri Jan 9 06:02:37 CST 2009
User: amerzky
Date: 2009/01/09 06:02 AM
Modified:
/trunk/adaptors/ssh/
Makefile, configure, configure.in
/trunk/adaptors/ssh/config/
saga_ssh.m4
/trunk/adaptors/ssh/ssh_context/
ssh_context_adaptor.cpp, ssh_context_adaptor.ini.in
/trunk/adaptors/ssh/ssh_file/
README, ssh_file_adaptor.cpp, ssh_file_adaptor_sshfs.cpp
/trunk/adaptors/ssh/ssh_job/
ssh_job.cpp, ssh_job.hpp, ssh_job_adaptor.cpp, ssh_job_adaptor.ini, ssh_job_service.cpp, ssh_job_service.hpp
Log:
ssh job adaptor is working.
main limitation: signals are sent to the local ssh process,
_not_ to the remotely started process. That is probably not
what the user wants. That also holds for suspend/resume, which
act on the local ssh process.
reason: I simply do not know how to find the PID of the remote
job - ssh does not report it. BTW: using libssh would not solve
that problem. At the moment, I can't think of a clean solution.
Any opinion?
A
File Changes:
Directory: /trunk/adaptors/ssh/config/
======================================
File [modified]: saga_ssh.m4
Delta lines: +37 -0
===================================================================
--- trunk/adaptors/ssh/config/saga_ssh.m4 2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/config/saga_ssh.m4 2009-01-09 12:01:58 UTC (rev 3250)
@@ -16,6 +16,8 @@
#
# AC_SUBST(HAVE_SSH)
# AC_SUBST(SAGA_SSH)
+# AC_SUBST(HAVE_SCP)
+# AC_SUBST(SAGA_SCP)
# AC_SUBST(HAVE_SSHFS)
# AC_SUBST(SAGA_SSHFS)
#
@@ -67,6 +69,41 @@
])
+AC_DEFUN([AX_SAGA_CHECK_SCP],
+[
+ AC_ARG_WITH([scp],
+ [AS_HELP_STRING([--with-scp],
+ [path to scp binary @<:@default=check@:>@])],
+ [],
+ [with_scp=check])
+
+ if test "x$with_scp" = "xcheck"; then
+
+ AC_PATH_PROG(SAGA_SCP, scp)
+
+ HAVE_SCP=no
+
+ if ! test "x$SAGA_SCP" = "x"; then
+ HAVE_SCP=yes
+ fi
+
+ else
+
+ if test -x "$with_scp"; then
+ HAVE_SCP=yes
+ SAGA_SCP=$with_scp
+ else
+ HAVE_SCP=no
+ fi
+
+ fi
+
+
+ AC_SUBST(HAVE_SCP)
+ AC_SUBST(SAGA_SCP)
+])
+
+
AC_DEFUN([AX_SAGA_CHECK_SSHFS],
[
AC_ARG_WITH([sshfs],
Directory: /trunk/adaptors/ssh/
===============================
File [modified]: Makefile
Delta lines: +4 -0
===================================================================
--- trunk/adaptors/ssh/Makefile 2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/Makefile 2009-01-09 12:01:58 UTC (rev 3250)
@@ -22,3 +22,7 @@
-include $(SAGA_MAKE_INCLUDE_ROOT)/saga.mk
+ssh_context: config
+ssh_file: config
+ssh_job: config
+
File [modified]: configure
Delta lines: +93 -6
===================================================================
--- trunk/adaptors/ssh/configure 2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/configure 2009-01-09 12:01:58 UTC (rev 3250)
@@ -622,9 +622,12 @@
TMP_SAGA_INSTTYPE
SAGA_SSH
HAVE_SSH
+SAGA_SCP
+HAVE_SCP
SAGA_SSHFS
HAVE_SSHFS
CPP_HAVE_SSH
+CPP_HAVE_SCP
CPP_HAVE_SSHFS
BUILD_ADAPTOR_CONTEXT
BUILD_ADAPTOR_FILE
@@ -1206,6 +1209,7 @@
--with-PACKAGE[=ARG] use PACKAGE [ARG=yes]
--without-PACKAGE do not use PACKAGE (same as --with-PACKAGE=no)
--with-ssh path to ssh binary [default=check]
+ --with-scp path to scp binary [default=check]
--with-sshfs path to sshfs binary [default=check]
Some influential environment variables:
@@ -1914,6 +1918,80 @@
if test "$HAVE_SSH" = "yes"; then
+# Check whether --with-scp was given.
+if test "${with_scp+set}" = set; then
+ withval=$with_scp;
+else
+ with_scp=check
+fi
+
+
+ if test "x$with_scp" = "xcheck"; then
+
+ # Extract the first word of "scp", so it can be a program name with args.
+set dummy scp; ac_word=$2
+{ echo "$as_me:$LINENO: checking for $ac_word" >&5
+echo $ECHO_N "checking for $ac_word... $ECHO_C" >&6; }
+if test "${ac_cv_path_SAGA_SCP+set}" = set; then
+ echo $ECHO_N "(cached) $ECHO_C" >&6
+else
+ case $SAGA_SCP in
+ [\\/]* | ?:[\\/]*)
+ ac_cv_path_SAGA_SCP="$SAGA_SCP" # Let the user override the test with a path.
+ ;;
+ *)
+ as_save_IFS=$IFS; IFS=$PATH_SEPARATOR
+for as_dir in $PATH
+do
+ IFS=$as_save_IFS
+ test -z "$as_dir" && as_dir=.
+ for ac_exec_ext in '' $ac_executable_extensions; do
+ if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+ ac_cv_path_SAGA_SCP="$as_dir/$ac_word$ac_exec_ext"
+ echo "$as_me:$LINENO: found $as_dir/$ac_word$ac_exec_ext" >&5
+ break 2
+ fi
+done
+done
+IFS=$as_save_IFS
+
+ ;;
+esac
+fi
+SAGA_SCP=$ac_cv_path_SAGA_SCP
+if test -n "$SAGA_SCP"; then
+ { echo "$as_me:$LINENO: result: $SAGA_SCP" >&5
+echo "${ECHO_T}$SAGA_SCP" >&6; }
+else
+ { echo "$as_me:$LINENO: result: no" >&5
+echo "${ECHO_T}no" >&6; }
+fi
+
+
+
+ HAVE_SCP=no
+
+ if ! test "x$SAGA_SCP" = "x"; then
+ HAVE_SCP=yes
+ fi
+
+ else
+
+ if test -x "$with_scp"; then
+ HAVE_SCP=yes
+ SAGA_SCP=$with_scp
+ else
+ HAVE_SCP=no
+ fi
+
+ fi
+
+
+
+
+
+
+
# Check whether --with-sshfs was given.
if test "${with_sshfs+set}" = set; then
withval=$with_sshfs;
@@ -1995,14 +2073,15 @@
# translate defines into integers
CPP_HAVE_SSH=0
+CPP_HAVE_SCP=0
+CPP_HAVE_SSHFS=0
-if test "$HAVE_SSH" = "yes"; then
+if test "$HAVE_SSH-$HAVE_SCP-$HAVE_SSHFS" = "yes-yes-yes"; then
CPP_HAVE_SSH=1
+ CPP_HAVE_SCP=1
+ CPP_HAVE_SSHFS=1
fi
-if test "$HAVE_SSHFS" = "yes"; then
- CPP_HAVE_SSH=1
-fi
@@ -2021,7 +2100,7 @@
BUILD_ADAPTOR_JOB="yes"
fi
- if test "x$HAVE_SSHFS" = "xyes"; then
+ if test "$HAVE_SSHFS-$HAVE_SCP" = "yes-yes"; then
if test "x$SAGA_HAVE_ADAPTOR_FILE" = "xyes"; then
BUILD_ADAPTOR_FILE="yes"
fi
@@ -2033,6 +2112,10 @@
SAGA_SSH_S="($SAGA_SSH)"
fi
+if test "x$HAVE_SCP" = "xyes"; then
+ SAGA_SCP_S="($SAGA_SCP)"
+fi
+
if test "x$HAVE_SSHFS" = "xyes"; then
SAGA_SSHFS_S="($SAGA_SSHFS)"
fi
@@ -2725,9 +2808,12 @@
TMP_SAGA_INSTTYPE!$TMP_SAGA_INSTTYPE$ac_delim
SAGA_SSH!$SAGA_SSH$ac_delim
HAVE_SSH!$HAVE_SSH$ac_delim
+SAGA_SCP!$SAGA_SCP$ac_delim
+HAVE_SCP!$HAVE_SCP$ac_delim
SAGA_SSHFS!$SAGA_SSHFS$ac_delim
HAVE_SSHFS!$HAVE_SSHFS$ac_delim
CPP_HAVE_SSH!$CPP_HAVE_SSH$ac_delim
+CPP_HAVE_SCP!$CPP_HAVE_SCP$ac_delim
CPP_HAVE_SSHFS!$CPP_HAVE_SSHFS$ac_delim
BUILD_ADAPTOR_CONTEXT!$BUILD_ADAPTOR_CONTEXT$ac_delim
BUILD_ADAPTOR_FILE!$BUILD_ADAPTOR_FILE$ac_delim
@@ -2736,7 +2822,7 @@
LTLIBOBJS!$LTLIBOBJS$ac_delim
_ACEOF
- if test `sed -n "s/.*$ac_delim\$/X/p" conf$$subs.sed | grep -c X` = 55; then
+ if test `sed -n "s/.*$ac_delim\$/X/p" conf$$subs.sed | grep -c X` = 58; then
break
elif $ac_last_try; then
{ { echo "$as_me:$LINENO: error: could not make $CONFIG_STATUS" >&5
@@ -3076,6 +3162,7 @@
echo " Using SAGA from : $TMP_SAGA_LOCATION ($TMP_SAGA_INSTTYPE) " | $TEE $OUT
echo " " | $TEE $OUT
echo " SSH found : $HAVE_SSH $SAGA_SSH_S " | $TEE $OUT
+echo " SCP found : $HAVE_SCP $SAGA_SCP_S " | $TEE $OUT
echo " SSHFS found : $HAVE_SSHFS $SAGA_SSHFS_S " | $TEE $OUT
echo " " | $TEE $OUT
echo " ======================================================== " | $TEE $OUT
File [modified]: configure.in
Delta lines: +13 -6
===================================================================
--- trunk/adaptors/ssh/configure.in 2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/configure.in 2009-01-09 12:01:58 UTC (rev 3250)
@@ -22,6 +22,7 @@
AX_SAGA_CHECK_SSH()
if test "$HAVE_SSH" = "yes"; then
+ AX_SAGA_CHECK_SCP()
AX_SAGA_CHECK_SSHFS()
fi
@@ -30,16 +31,17 @@
# translate defines into integers
CPP_HAVE_SSH=0
+CPP_HAVE_SCP=0
+CPP_HAVE_SSHFS=0
-if test "$HAVE_SSH" = "yes"; then
+if test "$HAVE_SSH-$HAVE_SCP-$HAVE_SSHFS" = "yes-yes-yes"; then
CPP_HAVE_SSH=1
+ CPP_HAVE_SCP=1
+ CPP_HAVE_SSHFS=1
fi
-if test "$HAVE_SSHFS" = "yes"; then
- CPP_HAVE_SSH=1
-fi
-
AC_SUBST(CPP_HAVE_SSH)
+AC_SUBST(CPP_HAVE_SCP)
AC_SUBST(CPP_HAVE_SSHFS)
BUILD_ADAPTOR_CONTEXT=no
@@ -56,7 +58,7 @@
BUILD_ADAPTOR_JOB="yes"
fi
- if test "x$HAVE_SSHFS" = "xyes"; then
+ if test "$HAVE_SSHFS-$HAVE_SCP" = "yes-yes"; then
if test "x$SAGA_HAVE_ADAPTOR_FILE" = "xyes"; then
BUILD_ADAPTOR_FILE="yes"
fi
@@ -68,6 +70,10 @@
SAGA_SSH_S="($SAGA_SSH)"
fi
+if test "x$HAVE_SCP" = "xyes"; then
+ SAGA_SCP_S="($SAGA_SCP)"
+fi
+
if test "x$HAVE_SSHFS" = "xyes"; then
SAGA_SSHFS_S="($SAGA_SSHFS)"
fi
@@ -95,6 +101,7 @@
echo " Using SAGA from : $TMP_SAGA_LOCATION ($TMP_SAGA_INSTTYPE) " | $TEE $OUT
echo " " | $TEE $OUT
echo " SSH found : $HAVE_SSH $SAGA_SSH_S " | $TEE $OUT
+echo " SCP found : $HAVE_SCP $SAGA_SCP_S " | $TEE $OUT
echo " SSHFS found : $HAVE_SSHFS $SAGA_SSHFS_S " | $TEE $OUT
echo " " | $TEE $OUT
echo " ======================================================== " | $TEE $OUT
Directory: /trunk/adaptors/ssh/ssh_context/
===========================================
File [modified]: ssh_context_adaptor.cpp
Delta lines: +0 -5
===================================================================
--- trunk/adaptors/ssh/ssh_context/ssh_context_adaptor.cpp 2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_context/ssh_context_adaptor.cpp 2009-01-09 12:01:58 UTC (rev 3250)
@@ -81,8 +81,6 @@
cert_info_t ci;
- std::cerr << " === key : " << saga::attributes::context_userkey << std::endl;
-
if ( attr.attribute_exists (saga::attributes::context_userkey) )
{
// this call looks for a valid proxy file in the location described by
@@ -118,7 +116,6 @@
}
else
{
- std::cerr << " === " << ci.errormessage << std::endl;
SAGA_ADAPTOR_THROW (ci.errormessage, saga::NoSuccess);
}
}
@@ -168,8 +165,6 @@
ci.success = true;
- std::cerr << " === path: " << path << std::endl;
-
// fall back to default if needed
if ( path == "" )
{
File [modified]: ssh_context_adaptor.ini.in
Delta lines: +4 -1
===================================================================
--- trunk/adaptors/ssh/ssh_context/ssh_context_adaptor.ini.in 2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_context/ssh_context_adaptor.ini.in 2009-01-09 12:01:58 UTC (rev 3250)
@@ -13,6 +13,9 @@
ssh_bin = @SAGA_SSH@
ssh_opt = -o StrictHostKeyChecking=no
+ scp_bin = @SAGA_SCP@
+ scp_opt = -o StrictHostKeyChecking=no
+
sshfs_bin = @SAGA_SSHFS@
- sshfs_opt = -o workaround=nonodelay
+ sshfs_opt = -o workaround=nonodelay -o StrictHostKeyChecking=no
Directory: /trunk/adaptors/ssh/ssh_file/
========================================
File [modified]: README
Delta lines: +5 -9
===================================================================
--- trunk/adaptors/ssh/ssh_file/README 2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_file/README 2009-01-09 12:01:58 UTC (rev 3250)
@@ -13,14 +13,10 @@
The adaptor mounts the remote file systems into
- ~/.saga/adaptors/ssh/ssh_file/mnt/<saga_object_id>
+ ~/.saga/adaptors/ssh/ssh_file/mnt/<unique_id>
-Obviously, multiple directory objects accessing the same remote file system will
-thus mount that file system multiple times. At the moment is unclear if that is
-a problem (beyond the object initialization latency).
+Latency for mounting a fs is a <10 seconds (usually about 1). The
+mounts are refcounted, so the last object using the mount is doing the
+umount. (The 'sshfs_keepalive' option determines if the filesystem is
+umounted when the respective SAGA object gets destroyed.)
-The 'sshfs_keepalive' option determines if the filesystem is umounted when the
-respective SAGA object gets destroyed.
-
-TODO: Security and context management needs documentation.
-
File [modified]: ssh_file_adaptor.cpp
Delta lines: +0 -5
===================================================================
--- trunk/adaptors/ssh/ssh_file/ssh_file_adaptor.cpp 2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_file/ssh_file_adaptor.cpp 2009-01-09 12:01:58 UTC (rev 3250)
@@ -19,8 +19,6 @@
saga::impl::adaptor_selector::adaptor_info_list_type
file_adaptor::adaptor_register (saga::impl::session * s)
{
- std::cout << " === ctor" << std::endl;
-
// list of implemented cpi's
saga::impl::adaptor_selector::adaptor_info_list_type infos;
preference_type prefs;
@@ -34,7 +32,6 @@
// on destruction, umount all sshfs file systems - if keepalive is not set
file_adaptor::~file_adaptor (void)
{
- std::cout << " === dtor" << std::endl;
// for ( unsigned int i = 0; i < mounted_.size (); i++ )
// {
// // release the sshfs shared_ptr. On sshfs destruction, the file system
@@ -48,8 +45,6 @@
const saga::session & s,
const saga::url & u)
{
- std::cout << " === get_sshfs " << u << "\n";
-
// get the id for the (potential) mount point.
// id is host:port
std::string host = u.get_host ();
File [modified]: ssh_file_adaptor_sshfs.cpp
Delta lines: +1 -5
===================================================================
--- trunk/adaptors/ssh/ssh_file/ssh_file_adaptor_sshfs.cpp 2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_file/ssh_file_adaptor_sshfs.cpp 2009-01-09 12:01:58 UTC (rev 3250)
@@ -18,9 +18,7 @@
, s_ (s)
, url_ (u)
{
- SAGA_LOG_ALWAYS("=============================================");
- SAGA_LOG_ALWAYS(u.get_string ().c_str ());
- SAGA_LOG_ALWAYS("=============================================");
+ SAGA_LOG_DEBUG(u.get_string ().c_str ());
// can we handle that URL?
if ( url_.get_scheme () != "any" &&
@@ -188,8 +186,6 @@
saga::url u ("file://localhost/");
u.set_path (mount_);
- std::cout << " ======= trying to create " << u << "\n";
-
saga::filesystem::directory d (s_, u,
saga::filesystem::Create |
saga::filesystem::CreateParents);
Directory: /trunk/adaptors/ssh/ssh_job/
=======================================
File [modified]: ssh_job.cpp
Delta lines: +146 -19
===================================================================
--- trunk/adaptors/ssh/ssh_job/ssh_job.cpp 2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_job/ssh_job.cpp 2009-01-09 12:01:58 UTC (rev 3250)
@@ -33,14 +33,125 @@
{
// constructor
+ //
+ // note that we can never be job::self - that is usually handled by the local
+ // adaptor.
job_cpi_impl::job_cpi_impl (proxy * p,
cpi_info const & info,
saga::ini::ini const & glob_ini,
saga::ini::ini const & adap_ini,
TR1::shared_ptr <saga::adaptor> adaptor)
- : base_cpi (p, info, adaptor, cpi::Noflags)
+ : base_cpi (p, info, adaptor, cpi::Noflags),
+ js_ ("fork://localhost/")
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ instance_data idata (this);
+ adaptor_data_type adata (this);
+
+ ini_ = adap_ini.get_section ("preferences").get_entries ();
+ rm_ = idata->rm_;
+
+ SAGA_LOG_DEBUG ("SSH Job c'tor");
+ SAGA_LOG_DEBUG (rm_.get_string ().c_str ());
+
+ host_ = rm_.get_host ();
+
+ std::stringstream ss;
+ ss << rm_.get_port ();
+ port_ = ss.str ();
+
+ ssh_bin_ = ini_["ssh_bin"];
+ scp_bin_ = ini_["scp_bin"];
+
+ ssh_opt_ = saga::adaptors::utils::split (ini_["ssh_opt"]);
+ scp_opt_ = saga::adaptors::utils::split (ini_["scp_opt"]);
+
+ if ( rm_.get_scheme () != "ssh" &&
+ rm_.get_scheme () != "any" &&
+ rm_.get_scheme () != "" )
+ {
+ SAGA_ADAPTOR_THROW (std::string ("Adaptor only supports 'ssh' and 'any' URL schemes, not ")
+ + rm_.get_scheme ().c_str (),
+ saga::BadParameter);
+ }
+
+ // trust the job service to give us a session with a valid context
+ std::vector <saga::context> contexts = p->get_session ().list_contexts ();
+ ctx_ = contexts[0];
+
+ // sanity check
+ if ( ctx_.get_attribute ("Type") != "ssh" )
+ {
+ SAGA_ADAPTOR_THROW_NO_CONTEXT ("no ssh context found for session",
+ saga::NoSuccess);
+ }
+
+ key_ = ctx_.get_attribute ("UserKey");
+ user_ = ctx_.get_attribute ("UserID");
+
+ SAGA_LOG_DEBUG (rm_.get_string ().c_str ());
+
+
+ // FIXME: make sure that rm_.get_host () and jd['CandidateHost'] match
+
+ if ( idata->init_from_jobid_ )
+ {
+ old_jobid_ = idata->jobid_;
+
+ // FIXME: extract default adaptors jobid from jobid, get job from default job
+ // service, and get job from there, too. Also get jd, and create old/new
+ // jd
+
+ j_ = js_.get_job (new_jobid_);
+ }
+ else
+ {
+ // we don't init from jobid, this we init from job description
+
+ // we simply add the ssh command in front of the job cmd line, and reuse the
+ // jd
+ old_jd_ = idata->jd_; // just save
+ new_jd_ = idata->jd_; // save and fix for ssh
+
+ // save old exe and args
+ std::string old_exe;
+ std::vector <std::string> old_args;
+
+ old_exe = old_jd_.get_attribute (saga::job::attributes::description_executable);
+
+ if ( old_jd_.attribute_exists (saga::job::attributes::description_arguments) )
+ {
+ old_args = old_jd_.get_vector_attribute (saga::job::attributes::description_arguments);
+ }
+
+ // create new args and exe, and set them in the new jd
+ std::string new_exe = ssh_bin_;
+ std::vector <std::string> new_args = ssh_opt_;
+ std::vector <std::string> new_hosts;
+
+ // add ssh specific args
+ new_args.push_back ("-i");
+ new_args.push_back (key_);
+ new_args.push_back (user_ + "@" + host_);
+
+ // readd old exe and args
+ new_args.push_back (old_exe);
+
+ for ( int i = 0; i < old_args.size (); i++ )
+ {
+ new_args.push_back (old_args[i]);
+ }
+
+ // we only want to run on localhost
+ new_hosts.push_back ("localhost");
+
+ // fill jd with new settings
+ new_jd_.set_attribute (saga::job::attributes::description_executable, new_exe);
+ new_jd_.set_vector_attribute (saga::job::attributes::description_arguments, new_args);
+ new_jd_.set_vector_attribute (saga::job::attributes::description_candidate_hosts, new_hosts);
+
+ // create the job with the altered jd
+ j_ = js_.create_job (new_jd_);
+ }
}
@@ -53,63 +164,75 @@
// SAGA API functions
void job_cpi_impl::sync_get_state (saga::job::state & ret)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ ret = j_.get_state ();
}
void job_cpi_impl::sync_get_description (saga::job::description & ret)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ // simply return the description we originally received
+ // FIXME: need to re-create the description for reconnected jobs, i.e. need
+ // to remove the ssh specific command/arguments etc.
+ ret = old_jd_;
+
}
void job_cpi_impl::sync_get_job_id (std::string & ret)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ // FIXME: translate local jobid into ssh jobid
+ ret = j_.get_job_id ();
}
// access streams for communication with the child
void job_cpi_impl::sync_get_stdin (saga::job::ostream & ret)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ ret = j_.get_stdin ();
}
void job_cpi_impl::sync_get_stdout (saga::job::istream & ret)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ ret = j_.get_stdout ();
}
void job_cpi_impl::sync_get_stderr (saga::job::istream & ret)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ ret = j_.get_stderr ();
}
void job_cpi_impl::sync_checkpoint (saga::impl::void_t & ret)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ j_.checkpoint ();
}
- void job_cpi_impl::sync_migrate (saga::impl::void_t & ret,
- saga::job::description jd)
+ void job_cpi_impl::sync_migrate (saga::impl::void_t & ret,
+ saga::job::description jd)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ // FIXME: jd should be translated just as in the c'tor
+ j_.migrate (jd);
}
void job_cpi_impl::sync_signal (saga::impl::void_t & ret,
- int signal)
+ int signal)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ // FIXME: this is a tricky one: at the moment, the signal goes to the local
+ // ssh process, not to the remote process! Not easy to fix...
+ j_.signal (signal);
}
// suspend the child process
void job_cpi_impl::sync_suspend (saga::impl::void_t & ret)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ // FIXME: this is a tricky one: at the moment, the signal goes to the local
+ // ssh process, not to the remote process! Not easy to fix...
+ j_.suspend ();
}
// suspend the child process
void job_cpi_impl::sync_resume (saga::impl::void_t & ret)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ // FIXME: this is a tricky one: at the moment, the signal goes to the local
+ // ssh process, not to the remote process! Not easy to fix...
+ j_.resume ();
}
@@ -117,20 +240,24 @@
// inherited from the task interface
void job_cpi_impl::sync_run (saga::impl::void_t & ret)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ j_.run ();
}
void job_cpi_impl::sync_cancel (saga::impl::void_t & ret,
double timeout)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ // FIXME: this is a tricky one: at the moment, the signal goes to the local
+ // ssh process, not to the remote process! Not easy to fix...
+ j_.cancel (timeout);
}
// wait for the child process to terminate
void job_cpi_impl::sync_wait (bool & ret,
double timeout)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ // FIXME: this is a tricky one: at the moment, signals are catched from the
+ // local ssh process, not to the remote process! Not easy to fix...
+ j_.wait (timeout);
}
} // namespace ssh_job
File [modified]: ssh_job.hpp
Delta lines: +27 -0
===================================================================
--- trunk/adaptors/ssh/ssh_job/ssh_job.hpp 2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_job/ssh_job.hpp 2009-01-09 12:01:58 UTC (rev 3250)
@@ -41,7 +41,34 @@
// adaptor data
typedef saga::adaptors::adaptor_data <adaptor> adaptor_data_type;
+ // state
+ saga::url rm_;
+ saga::context ctx_;
+ saga::job::service js_;
+ std::string host_;
+ std::string user_;
+ std::string key_;
+ std::string port_;
+
+ std::string ssh_bin_;
+ std::string scp_bin_;
+
+ std::vector <std::string> ssh_opt_;
+ std::vector <std::string> scp_opt_;
+
+ std::map <std::string, std::string> ini_;
+ std::map <std::string, std::string> env_;
+
+ saga::job::job j_; // forward to default job
+
+ saga::job::description old_jd_ ; // original jd
+ saga::job::description new_jd_ ; // new local job description including ssh
+
+ std::string old_jobid_; // original job id
+ std::string new_jobid_; // new local job id
+
+
public:
// constructor of the job adaptor
job_cpi_impl (proxy * p,
File [modified]: ssh_job_adaptor.cpp
Delta lines: +0 -6
===================================================================
--- trunk/adaptors/ssh/ssh_job/ssh_job_adaptor.cpp 2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_job/ssh_job_adaptor.cpp 2009-01-09 12:01:58 UTC (rev 3250)
@@ -32,12 +32,6 @@
saga::impl::adaptor_selector::adaptor_info_list_type list;
// create empty preference list
- // these list should be filled with properties of the adaptor,
- // which can be used to select adaptors with specific preferences.
- // Example:
- // 'security' -> 'gsi'
- // 'logging' -> 'yes'
- // 'auditing' -> 'no'
preference_type prefs;
// create file adaptor infos (each adaptor instance gets its own uuid)
File [modified]: ssh_job_adaptor.ini
Delta lines: +1 -0
===================================================================
--- trunk/adaptors/ssh/ssh_job/ssh_job_adaptor.ini 2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_job/ssh_job_adaptor.ini 2009-01-09 12:01:58 UTC (rev 3250)
@@ -8,6 +8,7 @@
name = ssh_job
# path = $[saga.location]/lib
# enabled = true
+ preferences = saga.adaptor_suite.ssh.preferences
[saga.adaptors.ssh_job.preferences]
# adaptor specific configuration
File [modified]: ssh_job_service.cpp
Delta lines: +151 -20
===================================================================
--- trunk/adaptors/ssh/ssh_job/ssh_job_service.cpp 2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_job/ssh_job_service.cpp 2009-01-09 12:01:58 UTC (rev 3250)
@@ -36,9 +36,130 @@
saga::ini::ini const & glob_ini,
saga::ini::ini const & adap_ini,
TR1::shared_ptr <saga::adaptor> adaptor)
- : base_cpi (p, info, adaptor, cpi::Noflags)
+ : base_cpi (p, info, adaptor, cpi::Noflags),
+ // Create a local job adaptor to spawn off ssh commands at will.
+ // If that throws, we simply pass on the exception.
+ js_ ("fork://localhost/")
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ adaptor_data adata (this);
+ instance_data idata (this);
+
+ ini_ = adap_ini.get_section ("preferences").get_entries ();
+ rm_ = idata->rm_;
+
+ host_ = rm_.get_host ();
+
+ std::stringstream ss;
+ ss << rm_.get_port ();
+ port_ = ss.str ();
+
+ ssh_bin_ = ini_["ssh_bin"];
+ scp_bin_ = ini_["scp_bin"];
+
+ ssh_opt_ = saga::adaptors::utils::split (ini_["ssh_opt"]);
+ scp_opt_ = saga::adaptors::utils::split (ini_["scp_opt"]);
+
+ if ( rm_.get_scheme () != "ssh" &&
+ rm_.get_scheme () != "any" &&
+ rm_.get_scheme () != "" )
+ {
+ SAGA_ADAPTOR_THROW (std::string ("Adaptor only supports 'ssh' and 'any' URL schemes, not ")
+ + rm_.get_scheme ().c_str (),
+ saga::BadParameter);
+ }
+
+ // check if we have a context for ssh
+ std::vector <saga::context> contexts = p->get_session ().list_contexts ();
+ std::vector <saga::context> ssh_contexts;
+
+ for ( int i = 0; i < contexts.size (); i++ )
+ {
+ if ( contexts[i].get_attribute ("Type") == "ssh" )
+ {
+ ssh_contexts.push_back (contexts[i]);
+ }
+ }
+
+ if ( 0 == ssh_contexts.size () )
+ {
+ // FIXME: isn't a warning enough? ssh may be configured ok out-of-bound.
+ SAGA_ADAPTOR_THROW_NO_CONTEXT ("no ssh context found for session",
+ saga::NoSuccess);
+ }
+
+ SAGA_LOG_DEBUG (rm_.get_string ().c_str ())
+
+
+ // We copy over the ssh identity file of the context we used, so that jobs
+ // running on that instance can use it to contact other instances using the
+ // same context. Note that we do _not_ copy the identity files of other
+ // contexts, as we don't want to spread credentials beyond their respective
+ // universe.
+ //
+ // FIXME: need to pass location to started SAGA jobs on that host, via some
+ // environment variable
+ //
+ // FIXME: the saga ssh context should evaluate that variable, and try to
+ // pick up that identity
+ //
+ // FIXME: we need to be able to create multiple default ssh contexts
+
+ for ( int i = 0; i < ssh_contexts.size (); i++ )
+ {
+ // try that context
+ ctx_ = contexts[i];
+
+ key_ = ctx_.get_attribute ("UserKey");
+ user_ = ctx_.get_attribute ("UserID");
+
+ saga::adaptors::utils::process proc;
+
+ if ( ini_["distribute_idendity"] == "yes" ||
+ ini_["distribute_idendity"] == "true" )
+ {
+ SAGA_LOG_DEBUG (" copying identity file");
+
+ proc.set_cmd (scp_bin_);
+ proc.set_args (scp_opt_);
+
+ // FIXME: ensure that context is complete
+ proc.add_args ("-i", key_);
+
+ // file to stage
+ // FIXME: ensure that context is complete
+ // FIXME: we silently assume that the .ssh dirctory exists
+ // FIXME: the target below SHOULD not exist *aehem*
+ proc.add_arg (key_);
+ proc.add_arg (user_ + "@" + host_ + ":.ssh/id_saga");
+ }
+ else
+ {
+ SAGA_LOG_DEBUG (" running ssh test");
+
+ proc.set_cmd (ssh_bin_);
+ proc.set_args (ssh_opt_);
+
+ // FIXME: ensure that context is complete
+ proc.add_args ("-i", key_);
+ proc.add_arg ( user_ + "@" + host_);
+ proc.add_arg ("true");
+ }
+
+
+ (void) proc.run_sync ();
+
+ if ( proc.done () )
+ {
+ // remember keyu location, so we can tell the started jobs about it
+ env_["SAGA_ADAPTOR_SSH_KEY"] = ".ssh/id_saga";
+
+ // we are done
+ return;
+ }
+ }
+
+ // no context was ok for scp or ssh - flag error
+ SAGA_ADAPTOR_THROW ("Could not connect to remote host", saga::NoSuccess);
}
// destructor
@@ -48,39 +169,49 @@
//////////////////////////////////////////////////////////////////////
// SAGA API functions
- void
- job_service_cpi_impl::sync_create_job (saga::job::job & ret,
- saga::job::description jd)
+ void job_service_cpi_impl::sync_create_job (saga::job::job & ret,
+ saga::job::description jd)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ SAGA_LOG_DEBUG ("SSH Create Job");
+ SAGA_LOG_DEBUG (rm_.get_string ().c_str ());
+
+ // we need only the context we identified as valid for the host.
+ saga::session s;
+ s.add_context (ctx_);
+
+ saga::job::job job = saga::adaptors::job (rm_, jd, s);
+ ret = job;
}
- void
- job_service_cpi_impl::sync_run_job (saga::job::job & ret,
- std::string cmd,
- std::string host,
- saga::job::ostream & in,
- saga::job::istream & out,
- saga::job::istream & err)
+ void job_service_cpi_impl::sync_run_job (saga::job::job & ret,
+ std::string cmd,
+ std::string host,
+ saga::job::ostream & in,
+ saga::job::istream & out,
+ saga::job::istream & err)
{
+ // we rely on the package fallback
SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
}
- void
- job_service_cpi_impl::sync_list (std::vector <std::string> & ret)
+ void job_service_cpi_impl::sync_list (std::vector <std::string> & ret)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ // FIXME: we should grep for ssh jobs
+ // FIXME: translate local jobids into ssh jobids
+ ret = js_.list ();
}
- void
- job_service_cpi_impl::sync_get_job (saga::job::job & ret,
- std::string jobid)
+ void job_service_cpi_impl::sync_get_job (saga::job::job & ret,
+ std::string jobid)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ // FIXME: need to translate the ssh jobid into local jobid
+ // FIXME: correctly initialize the ssh::job implmentation, please
+ ret = js_.get_job (jobid);
}
void job_service_cpi_impl::sync_get_self (saga::job::self & ret)
{
+ // the ssh adaptor can never implement job::self
SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
}
File [modified]: ssh_job_service.hpp
Delta lines: +21 -1
===================================================================
--- trunk/adaptors/ssh/ssh_job/ssh_job_service.hpp 2009-01-09 11:19:55 UTC (rev 3249)
+++ trunk/adaptors/ssh/ssh_job/ssh_job_service.hpp 2009-01-09 12:01:58 UTC (rev 3250)
@@ -42,8 +42,28 @@
base_cpi;
// adaptor data
- typedef saga::adaptors::adaptor_data <adaptor> adaptor_data_type;
+ typedef saga::adaptors::adaptor_data <adaptor> adaptor_data;
+ // state
+ saga::url rm_;
+ saga::session s_;
+ saga::context ctx_;
+ saga::job::service js_;
+
+ std::string host_;
+ std::string user_;
+ std::string key_;
+ std::string port_;
+
+ std::string ssh_bin_;
+ std::string scp_bin_;
+
+ std::vector <std::string> ssh_opt_;
+ std::vector <std::string> scp_opt_;
+
+ std::map <std::string, std::string> ini_;
+ std::map <std::string, std::string> env_;
+
public:
// constructor of the job_service cpi
job_service_cpi_impl (proxy * p,
More information about the saga-devel
mailing list