@@ -17,14 +17,100 @@ from urlparse import urlparse
1717
1818main_redirector = "root://redirector.osgstorage.org"
1919stash_origin = "root://stash.osgconnect.net"
20+ writeback_host = "http://stash-xrd.osgconnect.net:1094"
2021
2122TIMEOUT = 300
2223DIFF = TIMEOUT * 10
2324
25+ def doWriteBack (source , destination ):
26+ """
27+ Do a write back to Stash using SciTokens
28+
29+ :param str source: The location of the local file
30+ :param str destination: The location of the remote file, in stash:// format
31+ """
32+ start1 = int (time .time ()* 1000 )
33+
34+ # Get the scitoken content
35+ scitoken_file = None
36+ if '_CONDOR_CREDS' in os .environ :
37+ # First, look for the scitokens.use file
38+ # Format: _CONDOR_CREDS=/var/lib/condor/execute/dir_908/.condor_creds
39+ scitoken_file = os .path .join (os .environ ['_CONDOR_CREDS' ], 'scitokens.use' )
40+ if not os .path .exists (scitoken_file ):
41+ scitoken_file = None
42+
43+ if not scitoken_file and os .path .exists (".condor_creds/scitokens.use" ):
44+ scitoken_file = ".condor_creds/scitokens.use"
45+
46+ if not scitoken_file :
47+ logging .error ("Unable to find scitokens.use file" )
48+ return 1
49+
50+
51+ with open (scitoken_file , 'r' ) as scitoken_obj :
52+ scitoken_contents = scitoken_obj .read ().strip ()
53+
54+ # Remove the stash:// at the beginning, don't need it
55+ destination = destination .replace ("stash://" , "" )
56+
57+ # Check if the source file is zero-length
58+ statinfo = os .stat (source )
59+ if statinfo .st_size == 0 :
60+ command = "curl -v --connect-timeout 30 --speed-time 5 --speed-limit 1024 -X PUT --fail --upload-file %s -H \" Authorization: Bearer %s\" %s%s" % (source , scitoken_contents , writeback_host , destination )
61+ else :
62+ command = "curl -v --connect-timeout 30 --speed-limit 1024 -X PUT --fail --upload-file %s -H \" Authorization: Bearer %s\" %s%s" % (source , scitoken_contents , writeback_host , destination )
63+
64+ if 'http_proxy' in os .environ :
65+ del os .environ ['http_proxy' ]
66+
67+ logging .debug ("curl command: %s" % command )
68+ curl = subprocess .Popen ([command ],shell = True ,stdout = subprocess .PIPE ,stderr = subprocess .PIPE )
69+ (stdout , stderr ) = curl .communicate ()
70+ curl_exit = curl .returncode
71+ if statinfo .st_size == 0 and curl_exit == 28 :
72+ logging .debug ("Got curl exit code 28, but that's ok for zero-length files. This doesn't capture connection timeouts" )
73+ curl_exit = 0
74+ elif curl_exit != 0 :
75+ logging .error (stdout )
76+ logging .error (stderr )
77+
78+ sitename = os .environ .setdefault ("OSG_SITE_NAME" , "siteNotFound" )
79+ end1 = int (time .time ()* 1000 )
80+ # Send the payload
81+ payload = {
82+ 'filename' : source ,
83+ 'sitename' : sitename ,
84+ 'timestamp' : end1 ,
85+ 'host' : writeback_host ,
86+ 'upload_size' : os .stat (source ).st_size ,
87+ 'status' : 'Success' ,
88+ 'tries' : 1 ,
89+ 'start1' : start1 ,
90+ 'end1' : end1 ,
91+ 'cache' : 'None' ,
92+ 'writeback' : 'True'
93+ }
94+
95+ payload .update (parse_job_ad ())
96+
97+ if curl_exit != 0 :
98+ payload ['status' ] = "Failure"
99+
100+ es_send (payload )
101+ return curl_exit
102+
24103
25104def doStashCpSingle (sourceFile , destination , cache , debug = False ):
26105
27- #cache=get_best_stashcache()
106+
107+ # Check if the desitnation is a protocol like stash:///user/blah
108+ if destination .startswith ("stash://" ):
109+ # Source file exists, must be a writeback
110+ return doWriteBack (sourceFile , destination )
111+
112+ if not cache :
113+ cache = get_best_stashcache ()
28114 logging .debug ("Using Cache %s" , cache )
29115
30116 sitename = os .environ .setdefault ("OSG_SITE_NAME" , "siteNotFound" )
@@ -37,7 +123,8 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
37123 payload ['filename' ] = sourceFile
38124 payload ['sitename' ] = sitename
39125 payload .update (parse_job_ad ())
40-
126+
127+
41128 # Calculate the starting time
42129 start1 = int (time .time ()* 1000 )
43130
@@ -73,7 +160,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
73160
74161 return 0
75162
76- except IOError , e :
163+ except IOError as e :
77164 logging .error ("Unable to copy with CVMFS, even though file exists: %s" , str (e ))
78165
79166 else :
@@ -364,18 +451,11 @@ def main():
364451 source = opts [0 ]
365452 destination = opts [1 ]
366453
367-
454+ cache = None
368455 # Check for manually entered cache to use
369456 if args .cache and len (args .cache ) > 0 :
370457 cache = args .cache
371- else :
372- cache = get_best_stashcache ()
373-
374- if not source .startswith ('/' ):
375- logging .warning ("DEPRECIATED: The source path does not begin with a '/', but it is required. This functionality will be removed in an upcoming release" )
376- source = "/" + source
377-
378-
458+
379459 if not args .recursive :
380460 result = doStashCpSingle (sourceFile = source , destination = destination , cache = cache , debug = args .debug )
381461 else :
0 commit comments