This section is only for people who is interested in the technical details. Using the Jython script for the complicated math workflow created at Composing workflows as an example, this section tells you how the Jython script works. The complete workflow script is available here.
The first part imports Jython and Java libraries to use. The important classes are InvocationWithNotification, which is an invoker that receives outputs as a notification message, OutputNotificationConsumer, which is a notification consumer to receives outputs, and NotificationSender, which is a class to sends notification messages.
# # This script is automatically generated by X Workflow Composer 0.7.9. # import sys, thread from java.lang import Throwable from java.util import Properties from java.io import FileInputStream from edu.indiana.extreme.xwf.engine.jython import InvokerWithNotification from edu.indiana.extreme.xwf.engine.jython import OutputNotificationConsumer from edu.indiana.extreme.xwf.engine.jython import NotificationSender
The Jython script uses the Java Properties class to handle runtime parameters. Here, it initializes properties with the default variable set by the composer. The parameters include a broker URL and a topic used for notification, input variables, x, y, x_2, and y_2, and the location of WSDL for each service. (Future work: Using port types of the abstract WSDLs, the script dynamically search the available concreate WSDLs and use one of them.)
properties = Properties() # Set up defaut parameter values. properties.setProperty( 'brokerURL', 'rainier.extreme.indiana.edu:12346') properties.setProperty( 'topic', 'test') properties.setProperty( 'x', '2') properties.setProperty( 'y', '3') properties.setProperty( 'x_2', '4') properties.setProperty( 'y_2', '5') # Set up default WSDL URLs. properties.setProperty( 'Adder_wsdl', '') properties.setProperty( 'Adder_2_wsdl', '') properties.setProperty( 'Multiplier_wsdl', '')
This part defines a function to print the usage. This helps to launch the workflow script from the command-line.
def usage(): print ''' Options: -f properties_file -brokerURL value -topic value -x value -y value -x_2 value -y_2 value -Adder_wsdl value -Adder_2_wsdl value -Multiplier_wsdl value ''' sys.exit(0)
This part processes the command-line arguments and set to the properties. Instead of passing parameters one by one, it also accept a parameter file that list all input parameters. The parameter file has to be in Java properties format.
# Process command-line arguments. if sys.argv[0][0] != '-': sys.argv = sys.argv[1:] while sys.argv: if sys.argv[0] == '-f': # Read parameters from a file. propertyFilename = sys.argv[1] inputStream = FileInputStream(propertyFilename) properties.load(inputStream) elif sys.argv[0] == '-brokerURL': properties.put('brokerURL', sys.argv[1]) elif sys.argv[0] == '-topic': properties.put('topic', sys.argv[1]) elif sys.argv[0] == '-x': properties.put('x', sys.argv[1]) elif sys.argv[0] == '-y': properties.put('y', sys.argv[1]) elif sys.argv[0] == '-x_2': properties.put('x_2', sys.argv[1]) elif sys.argv[0] == '-y_2': properties.put('y_2', sys.argv[1]) elif sys.argv[0] == '-Adder_wsdl': properties.put('Adder_wsdl', sys.argv[1]) elif sys.argv[0] == '-Adder_2_wsdl': properties.put('Adder_2_wsdl', sys.argv[1]) elif sys.argv[0] == '-Multiplier_wsdl': properties.put('Multiplier_wsdl', sys.argv[1]) else: usage() sys.argv = sys.argv[2:]
After processing the command-line arguments, first the script initializes a notification sender, "notifier". The notifier is used to send notification messages to let users know the status of the workflow. The first notification is to notify a user that the workflow has started.
topic = properties.getProperty('topic') brokerURL = properties.getProperty('brokerURL') notifier = NotificationSender(brokerURL, topic) notifier.workflowStarted()
After this point, everything is in a try-catch clause so that whenever something fails, it can send notification to notify the user.
This part sets up an OutputNotificationConsumer. The OutputNotificationConsumer listens to all notification messages and stores outputs of all service in the workflow.
try: consumer = OutputNotificationConsumer(topic, brokerURL) consumer.subscribe()
Finally, the script starts invoking services. At the beginning, two Adder services are ready to launch (see the complicated math workflow). The script define a function for each service, e.g. invokeAdder(), and calls the function to invoke the service.
First, the script gets a URL of the WSDL of the service from the properties, and instantiates an invoker, InvokerWithNotification, for this service. Then, it sets method name and input parameters. Since this services does not have any preceding service, it sets the input parameters from Java properties. If the input of the service is from its preceding service, it waits for the preceding service to finish and use the output as the input of the service. (We will see the example later.) Finally, it invokes the service. The invoke() method here is an non-blocking call. The output of the service is handled when necessary either as an input of its succeeding service or as an output of the workflow.
The important point here is that since two services are concurrently executable (there is no dependence between two), the script uses thread to call functions, e.g. thread.start_new_thread(invokeAdder, ()). This is important if an invoking function needs to wait the output from its preceding service (not the case in this example).
# Invoke Adder. Adder_wsdl = properties.getProperty('Adder_wsdl') Adder_invoker = InvokerWithNotification(Adder_wsdl, 'Adder', consumer) def invokeAdder(): Adder_invoker.setOperationName('Run') x_value = properties.getProperty('x') Adder_invoker.setInputParameter('x', x_value) y_value = properties.getProperty('y') Adder_invoker.setInputParameter('y', y_value) print 'Invoking Adder.' Adder_invoker.invoke() thread.start_new_thread(invokeAdder, ()) # Invoke Adder_2. Adder_2_wsdl = properties.getProperty('Adder_2_wsdl') Adder_2_invoker = InvokerWithNotification(Adder_2_wsdl, 'Adder_2', consumer) def invokeAdder_2(): Adder_2_invoker.setOperationName('Run') x_value = properties.getProperty('x_2') Adder_2_invoker.setInputParameter('x', x_value) y_value = properties.getProperty('y_2') Adder_2_invoker.setInputParameter('y', y_value) print 'Invoking Adder_2.' Adder_2_invoker.invoke() thread.start_new_thread(invokeAdder_2, ())
After invoking the two Adder services, the Multiplier service is ready to invoke. Since there is only one service ready to invoke this time, thread is not used to call the invoking function. Another difference is that input parameters are output of preceding services. The getOutput() methods are blocking call. They blocks until the outputs are available. When all the inputs are ready, the Multiplier service is invoked.
# Invoke Multiplier. Multiplier_wsdl = properties.getProperty('Multiplier_wsdl') Multiplier_invoker = InvokerWithNotification(Multiplier_wsdl, 'Multiplier', consumer) def invokeMultiplier(): Multiplier_invoker.setOperationName('Run') x_value = Adder_invoker.getOutput('z') Multiplier_invoker.setInputParameter('x', x_value) y_value = Adder_2_invoker.getOutput('z') Multiplier_invoker.setInputParameter('y', y_value) print 'Invoking Multiplier.' Multiplier_invoker.invoke() invokeMultiplier()
This part handles the output of the workflow specified by an Output component in the composer. (Future work: It might be a good idea to send the output of workflow as notification, especially when the workflow becomes a component of another workflow.)
# Wait output z z_value = Multiplier_invoker.getOutput('z') print 'z = ', z_value
This parts wait all running services to finish. In this example, there is no running service at this point, so there is no need to wait.
# Wait all executing services.
If the workflow finishes normally, it reaches this part. First, it sends a notification to notify users that the workflow has completed successfully. And it does some cleanups.
notifier.workflowCompleted() consumer.unsubscribe() print 'Everything is done successfully.'
If anything goes wrong during the workflow execution, the exception is thrown and caught here. It sends notification with the detailed error message.
except Throwable, e: print 'Error: ', e notifier.workflowIncompleted(e) consumer.unsubscribe()