Sunday, July 24, 2016

Inet Source Operator

Objective

In my first stream program with InfoSphere Streams, I read data from a URL. InfoSphere Streams has an operator for exactly this, called InetSource (from the com.ibm.streamsx.inet toolkit).

The Streams Processing Language (SPL) file looks like this:



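namespace eduwatch;
use com.ibm.streamsx.inet::InetSource;

composite Main {
    type
        StockQuote = tuple<rstring name, rstring ticker, int64 volume, float64 previous,
                           float64 change, float64 dayHigh, float64 dayLow,
                           float64 peRatio, float64 pegRatio>;
    graph
        // Fetch the CSV quotes every 7 seconds; each line of the response
        // becomes one tuple with a single rstring attribute, rawQuote.
        stream<rstring rawQuote> DjiaRaw = InetSource()
        {
            param
                URIList : [
                    "http://download.finance.yahoo.com/d/quotes.csv?s=pg+ko+AAPL+goog&f=nsvpclghrr5&e=.csv"
                ];
                incrementalFetch : false;  // emit the whole document on every fetch
                fetchInterval    : 7.0;    // seconds between fetches
                punctPerFetch    : true;   // send a punctuation after each fetch
        }

        // Write every tuple, and the punctuations, to rawQuote.csv.
        () as rawQuote = FileSink(DjiaRaw)
        {
            param
                file              : "rawQuote.csv";
                format            : csv;
                writePunctuations : true;
                flush             : 1u;    // flush after every tuple
        }
}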



Sample Output

Punctuation received: WindowMarker
"\"Procter & Gamble Company (The) \",\"pg\",6165618,85.26,\"+0.46 - +0.54%\",\"4:00pm - <b>85.72</b>\",85.45,85.98,27.02,4.01"
"\"Coca-Cola Company (The) Common \",\"ko\",8836148,45.45,\"+0.38 - +0.84%\",\"4:00pm - <b>45.83</b>\",45.46,45.84,27.59,5.25"
"\"Apple Inc.\",\"AAPL\",28313669,99.43,\"-0.77 - -0.77%\",\"4:00pm - <b>98.66</b>\",98.31,99.30,10.98,1.30"
"\"Alphabet Inc.\",\"goog\",1259823,738.63,\"+4.11 - +0.56%\",\"4:00pm - <b>742.74</b>\",736.56,743.24,30.22,1.29"
""
Punctuation received: WindowMarker
"\"Procter & Gamble Company (The) \",\"pg\",6165618,85.26,\"+0.46 - +0.54%\",\"4:00pm - <b>85.72</b>\",85.45,85.98,27.02,4.01"
"\"Coca-Cola Company (The) Common \",\"ko\",8836148,45.45,\"+0.38 - +0.84%\",\"4:00pm - <b>45.83</b>\",45.46,45.84,27.59,5.25"
"\"Apple Inc.\",\"AAPL\",28313669,99.43,\"-0.77 - -0.77%\",\"4:00pm - <b>98.66</b>\",98.31,99.30,10.98,1.30"
"\"Alphabet Inc.\",\"goog\",1259823,738.63,\"+4.11 - +0.56%\",\"4:00pm - <b>742.74</b>\",736.56,743.24,30.22,1.29"
""
Punctuation received: WindowMarker
"\"Procter & Gamble Company (The) \",\"pg\",6165618,85.26,\"+0.46 - +0.54%\",\"4:00pm - <b>85.72</b>\",85.45,85.98,27.02,4.01"

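Each rawQuote tuple holds one line of the CSV response, and FileSink quotes that line again when it writes the rstring out, which is why the sample shows escaped quotes (\"). The StockQuote type declared in the composite is not filled in by this program. As a hedged sketch of a possible next step (plain Java rather than SPL; the class QuoteCsv and its split method are hypothetical), a quote-aware splitter turns one raw line into its ten fields:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits one CSV line, honouring double-quoted fields. */
class QuoteCsv {

    static List<String> split(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder field = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    field.append('"');      // "" inside a quoted field is a literal quote
                    i++;
                } else {
                    inQuotes = !inQuotes;   // opening or closing quote
                }
            } else if (c == ',' && !inQuotes) {
                fields.add(field.toString());   // a comma outside quotes ends the field
                field.setLength(0);
            } else {
                field.append(c);
            }
        }
        fields.add(field.toString());           // the last field has no trailing comma
        return fields;
    }

    public static void main(String[] args) {
        // First line of the sample output, with FileSink's extra quoting removed.
        String raw = "\"Procter & Gamble Company (The) \",\"pg\",6165618,85.26,"
                + "\"+0.46 - +0.54%\",\"4:00pm - <b>85.72</b>\",85.45,85.98,27.02,4.01";
        List<String> f = split(raw);
        System.out.println(f.size() + " fields, ticker=" + f.get(1) + ", volume=" + f.get(2));
    }
}
```

Two of the ten fields, the change ("+0.46 - +0.54%") and the last trade ("4:00pm - <b>85.72</b>"), are text rather than plain numbers, so they would need further parsing before they could fill numeric attributes such as the float64 change in StockQuote.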

Friday, July 22, 2016

My first stream

One of the big features of big data is analytics on "data in motion". In this big, dynamic world, change is the only constant, and the same is true of data. How do we proceed? IBM InfoSphere Streams is there to help us.

  1. First, I have to create a continuous source of data.
  2. For a continuous source of data, I looked at the stock market.
  3. This URL has stock market data.
  4. Now I have to create the program.
This program uses two important classes: Topology, which describes the application graph, and TStream, which is a continuous stream of tuples flowing through that graph.
 
package project1;

import org.jsoup.Connection;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

import com.ibm.streamsx.topology.TStream;
import com.ibm.streamsx.topology.Topology;
import com.ibm.streamsx.topology.context.StreamsContextFactory;

public class StockStreaming {

    /**
     * The topology object contains information about the structure of our graph
     * (that is, our application), including how the data is generated and
     * processed.
     */
    private static Topology topology = new Topology("stockSensor");

    // Placeholder: the stock market page URL from step 3 goes here.
    private static final String URL = "...";

    private static Connection connection = Jsoup.connect(URL);
    private static Document doc = null;
    private static Element quoteSummary = null;

    /**
     * endlessSource() calls the supplier's get() method over and over; each
     * returned String (quote time and price) becomes one tuple on the
     * readings TStream.
     */
    private static TStream<String> readings = topology.endlessSource(() -> {
        try {
            doc = connection.get();   // download and parse the quote page
            quoteSummary = doc.getElementById("yfi_rt_quote_summary");
            StringBuilder builder = new StringBuilder();
            builder.append(getValueByClass(quoteSummary, "time_rtq"));
            builder.append(" ");
            builder.append(getValueByClass(quoteSummary, "time_rtq_ticker"));
            builder.append(" ");
            builder.append(getValueByClass(quoteSummary, "up_g time_rtq_content"));
            return builder.toString();
        } catch (Throwable th) {
            th.printStackTrace();
            return "ERR :" + th.getMessage();
        }
    });

    public static void main(String[] args) throws Exception {
        readings.print();   // print every tuple to standard output
        // Run the topology inside this JVM (embedded context).
        StreamsContextFactory.getEmbedded().submit(topology);
    }

    /** Returns the text of the elements with the given class, or "" if none match. */
    public static String getValueByClass(Element quoteSummary, String filter) {
        Elements elements = quoteSummary.getElementsByClass(filter);
        return elements.text();   // an empty Elements list yields ""
    }
}
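endlessSource() follows a pull model: the runtime keeps calling the supplier's get() and turns each result into a tuple. To try that idea without a Streams install, the JDK's java.util.stream.Stream.generate behaves in a similar way. This is only an analogy in plain Java (the class PullDemo and the fake supplier are hypothetical): unlike endlessSource, which never ends, the JDK stream here is cut off with limit().

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Plain-JDK analogy for endlessSource: a Supplier polled repeatedly. */
class PullDemo {

    /** Calls the supplier n times and collects each result. */
    static <T> List<T> pull(Supplier<T> source, int n) {
        return Stream.generate(source)   // invokes source.get() for every element
                     .limit(n)           // endlessSource has no such limit
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Stand-in for the quote page fetch: each call returns a new "reading".
        Supplier<String> fakeQuote = () -> "reading #" + calls.incrementAndGet();
        pull(fakeQuote, 3).forEach(System.out::println);
    }
}
```

In both cases the next value exists only once get() returns, so the time spent in connection.get() effectively paces how often new readings appear.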

  • Program output (it prints the time and stock price tirelessly; this is my stream of data):

10:08am 2,830.00
10:08am 2,830.00
10:09am 2,830.00
10:09am 2,830.00
10:09am 2,830.00



  • I do not want to see the same value repeated, so I compare each reading with the previous one:

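// Inside the endlessSource lambda; previousValue is a static field:
//     private static String previousValue = "";
String current = builder.toString();
if (previousValue.equals(current)) {
    return "NC";              // no change since the last reading
}
previousValue = current;      // remember this reading for the next comparison
return current;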


After adding this check, the output looks like this:
NC
NC
10:18am 2,830.05
NC


This is not an elegant approach. TStream has many functions, such as filter, transform, and sink. Can they help me? Could I filter out the readings that have not changed?


Step 1: I created a simple Java class, StockTuple, to store the values.
private String time;
private String price;
private boolean up; // true if the price went up compared with the previous value, false if it went down
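The filter code calls getPrice(), setUp() and isUp() on this class, and the sample output prints it as StockTuple [time=..., price=..., up=...]. A minimal sketch of the full class, assuming it should be Serializable (Java tuple objects in the Java Application API are generally expected to be serializable); the two-argument constructor and setTime/setPrice are assumptions for illustration:

```java
import java.io.Serializable;

/** Sketch of the StockTuple value object carried by the stream. */
class StockTuple implements Serializable {
    private static final long serialVersionUID = 1L;

    private String time;   // quote time, e.g. "1:18pm"
    private String price;  // price as shown on the page, e.g. "2,842.00"
    private boolean up;    // true if the price went up compared with the previous value

    public StockTuple() { }

    public StockTuple(String time, String price) {
        this.time = time;
        this.price = price;
    }

    public String getTime() { return time; }
    public void setTime(String time) { this.time = time; }

    public String getPrice() { return price; }
    public void setPrice(String price) { this.price = price; }

    public boolean isUp() { return up; }
    public void setUp(boolean up) { this.up = up; }

    @Override
    public String toString() {
        // Same layout as the sample output
        return "StockTuple [time=" + time + ", price=" + price + ", up=" + up + "]";
    }
}
```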


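Earlier I had the readings stream of Strings; now the source returns StockTuple objects, so readings is a TStream<StockTuple>, and I add a filter on top of it (filter() keeps the element type and passes on only the tuples for which the lambda returns true).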
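static String previousValue = "";   // last price that passed the filter ("" before the first one)

static TStream<StockTuple> filteredStock = readings.filter(stock -> {
    String price = stock.getPrice();
    if (price != null && !previousValue.equals(price)) {
        previousValue = price;   // price changed: remember it and keep this tuple
        return true;
    }
    return false;                // no price, or same price as last time: drop it
});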


Sample Output:
StockTuple [time=1:18pm, price=2,842.00, up=false]
StockTuple [time=1:18pm, price=2,840.10, up=false]
StockTuple [time=1:18pm, price=2,841.75, up=false]
StockTuple [time=1:18pm, price=2,840.10, up=false]
StockTuple [time=1:18pm, price=2,841.60, up=false]


So now it prints only when the price changes. But wait: I forgot to set the up field. Its default value is false, so every tuple shows up=false. This is a small change:


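// previousValue is the static String holding the last price that passed ("" at start).
static TStream<StockTuple> filteredStock = readings.filter(stock -> {
    String price = stock.getPrice();
    if (price == null || previousValue.equals(price)) {
        return false;                      // no price, or unchanged: drop it
    }
    // Price changed: compare with the previous one to set the up flag.
    // Prices arrive as text like "2,841.15", so strip the thousands separator first.
    try {
        float prev = Float.parseFloat(previousValue.trim().replace(",", ""));
        float curr = Float.parseFloat(price.trim().replace(",", ""));
        stock.setUp(curr > prev);
    } catch (NumberFormatException e) {
        // e.g. the very first reading, when previousValue is still ""
        e.printStackTrace();
    }
    previousValue = price;
    return true;
});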


Sample output:
StockTuple [time=1:30pm, price=2,841.15, up=true]
StockTuple [time=1:30pm, price=2,841.00, up=false]
StockTuple [time=1:30pm, price=2,841.80, up=true]
StockTuple [time=1:31pm, price=2,841.75, up=false]
StockTuple [time=1:31pm, price=2,841.00, up=false]
StockTuple [time=1:31pm, price=2,841.65, up=true]
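The filter above compares prices as strings and parses them with Float, and, assuming previousValue starts out as an empty string, the very first reading always ends in a NumberFormatException. A hedged alternative in plain Java (the class PriceChange and its methods are hypothetical, and the Streams wiring is left out) parses the text prices with BigDecimal and treats "no previous price yet" as a normal case instead of an exception:

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: keeps only price changes and flags their direction. */
class PriceChange {

    /** Parses a price such as "2,841.15"; returns null if the text is not a number. */
    static BigDecimal parse(String text) {
        if (text == null) {
            return null;
        }
        try {
            return new BigDecimal(text.trim().replace(",", ""));  // drop thousands separators
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /**
     * Returns one "price up=..." entry per change in price. The first valid
     * price has nothing to compare against, so it is reported with up=false.
     */
    static List<String> changes(List<String> prices) {
        List<String> out = new ArrayList<>();
        BigDecimal previous = null;
        for (String p : prices) {
            BigDecimal current = parse(p);
            if (current == null) {
                continue;                                   // unreadable price: skip it
            }
            if (previous != null && current.compareTo(previous) == 0) {
                continue;                                   // unchanged price: drop it
            }
            boolean up = previous != null && current.compareTo(previous) > 0;
            out.add(p.trim() + " up=" + up);
            previous = current;
        }
        return out;
    }

    public static void main(String[] args) {
        changes(Arrays.asList("2,841.15", "2,841.15", "2,841.00", "2,841.80"))
                .forEach(System.out::println);
    }
}
```

compareTo is used instead of equals because BigDecimal.equals also compares the scale, so "2841.1" and "2841.10" would count as different prices under equals.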


Now for an unwise strategy (never do this in a real market, promise!):


If there are ten consecutive ups, sell the stock.
If there are ten consecutive downs, buy the stock.
Otherwise, do nothing.

How do I implement this?

// needs: import com.ibm.streamsx.topology.function.Function;
static TStream<StockTupleWithDecision> transformedStock = filteredStock.transform(
        new Function<StockTuple, StockTupleWithDecision>() {

    private static final long serialVersionUID = 6605874629186401010L;

    // Directions of the last 10 price changes, oldest first.
    private final boolean[] direction = new boolean[10];
    // How many price changes have been seen so far (stops counting at 10).
    private int count = 0;

    @Override
    public StockTupleWithDecision apply(StockTuple v) {
        StockTupleWithDecision decision = new StockTupleWithDecision();
        decision.setPrice(v.getPrice());
        decision.setTime(v.getTime());
        decision.setUp(v.isUp());

        // Slide the window: drop the oldest direction and append the newest one.
        System.arraycopy(direction, 1, direction, 0, direction.length - 1);
        direction[direction.length - 1] = v.isUp();
        if (count < direction.length) {
            count++;
        }

        if (count < direction.length) {
            // fewer than 10 price changes so far
            decision.setDecision("Watching");
        } else {
            boolean allSame = true;
            for (boolean d : direction) {
                if (d != v.isUp()) {
                    allSame = false;   // mix of up and down
                    break;
                }
            }
            if (!allSame) {
                decision.setDecision("No Action");
            } else if (v.isUp()) {
                decision.setDecision("SELL");   // ten ups in a row
            } else {
                decision.setDecision("BUY");    // ten downs in a row
            }
        }
        return decision;
    }
});

Sample output:
StockTuple [time=11:03, price=55.06, up=false] (;-) Watching
StockTuple [time=11:04, price=55.08, up=true] (;-) Watching
StockTuple [time=11:04, price=55.09, up=true] (;-) Watching
StockTuple [time=11:04, price=55.10, up=true] (;-) Watching
StockTuple [time=11:04, price=55.12, up=true] (;-) Watching
StockTuple [time=11:05, price=55.09, up=false] (;-) Watching
StockTuple [time=11:05, price=55.10, up=true] (;-) Watching
StockTuple [time=11:05, price=55.09, up=false] (;-) Watching
StockTuple [time=11:05, price=55.11, up=true] (;-) Watching
StockTuple [time=11:05, price=55.10, up=false] (;-) No Action
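The decision rule itself does not depend on the Streams runtime, so it can be checked on its own. A minimal plain-Java sketch (the class DecisionRule is hypothetical) replaces the direction array with a count of consecutive moves in the same direction: a run of ten means all of the last ten moves agreed. Fed with the up/down sequence from the sample output above, it also gives nine "Watching" results followed by "No Action".

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Streams-free version of the "ten in a row" rule. */
class DecisionRule {
    static final int WINDOW = 10;   // ten consecutive moves trigger a decision

    private int seen = 0;           // readings seen so far (capped at WINDOW)
    private int run = 0;            // length of the current run of same-direction moves
    private boolean lastUp;         // direction of the previous reading

    /** Returns "Watching", "No Action", "SELL" or "BUY" for the next reading. */
    String next(boolean up) {
        run = (run > 0 && up == lastUp) ? run + 1 : 1;   // extend or restart the run
        lastUp = up;
        seen = Math.min(seen + 1, WINDOW);
        if (seen < WINDOW) return "Watching";            // fewer than ten readings so far
        if (run >= WINDOW) return up ? "SELL" : "BUY";   // ten ups: sell, ten downs: buy
        return "No Action";                              // a mix of ups and downs
    }

    /** Runs a fresh rule over a sequence of up flags. */
    static List<String> decide(boolean... ups) {
        DecisionRule rule = new DecisionRule();
        List<String> out = new ArrayList<>();
        for (boolean up : ups) out.add(rule.next(up));
        return out;
    }

    public static void main(String[] args) {
        // up flags from the sample output, in order
        System.out.println(decide(false, true, true, true, true, false, true, false, true, false));
    }
}
```

The counter needs constant memory and no shifting, and it fires on every reading once a run reaches ten, which matches the "ten consecutive" wording of the rules.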

That is how the market is: keep watching, or change your action and then see the reaction.