IBM Data Engine is IBM's serverless SQL service for data in Cloud Object Storage. It lets you run ANSI SQL on Parquet, CSV, and JSON data sets, using Apache Spark SQL as the query engine in the background. It can also be used to pre-process and analyze the log archives that LogDNA writes. This notebook uses the Python SDK for IBM Data Engine. Further details on the Python SDK can be found in this Git repository.
LogDNA is a cloud-based log management service that aggregates all system and application logs in one centralized logging system. LogDNA keeps your logs only for a limited period of time. However, LogDNA can be configured to export logs to IBM Cloud Object Storage. Archived logs are in JSON format and preserve the metadata associated with each line. Logs are exported daily in a compressed format (.json.gz).
This notebook gives an overview of the Data Engine features that help with preparing the log archives for further analysis. Furthermore, it shows how to query the logs with SQL to filter out the "noise" in your data. The start of the notebook shows how to work with your own LogDNA data, while chapter 5 focuses on sample ingress log data.
Run the following cell at least once in your notebook environment in order to install required packages, such as the Data Engine client library:
!pip -q install ibmcloudsql
!pip -q install sqlparse
import ibmcloudsql
from pixiedust.display import *
import pandas as pd
logDNADump=''
targetUrl=''
logData=''
Pixiedust database opened successfully
Table VERSION_TRACKER created successfully
Table METRICS_TRACKER created successfully
Share anonymous install statistics? (opt-out instructions)
PixieDust will record metadata on its environment the next time the package is installed or updated. The data is anonymized and aggregated to help plan for future releases, and records only the following values:
{
"data_sent": currentDate,
"runtime": "python",
"application_version": currentPixiedustVersion,
"space_id": nonIdentifyingUniqueId,
"config": {
"repository_id": "https://github.com/ibm-watson-data-lab/pixiedust",
"target_runtimes": ["Data Science Experience"],
"event_id": "web",
"event_organizer": "dev-journeys"
}
}
You can opt out by calling pixiedust.optOut() in a new cell.
Pixiedust runtime updated. Please restart kernel
Table USER_PREFERENCES created successfully
Table service_connections created successfully
In your IBM Cloud resource list, make sure All Resources is selected as the resource group. In the section Services you can see your instances of Data Engine and Cloud Object Storage. Select the instance of Data Engine that you want to use. On the Data Engine dashboard page that opens you will find a section titled REST API with a button labelled Instance CRN. Click the button to copy the CRN to your clipboard and paste it into the notebook below. If you don't have a Data Engine instance yet, create one first.
You also need Cloud Object Storage locations for your query results and data, specified in the form cos://<endpoint>/<bucket>/[<prefix>]. You have the option to use the Cloud Object Storage bucket that is associated with your project. In this case, execute the following section before you proceed. For more background information, check out the Data Engine documentation.
import getpass
import sqlparse
from pygments import highlight
from pygments.lexers import get_lexer_by_name
from pygments.formatters import HtmlFormatter, Terminal256Formatter
from IPython.core.display import display, HTML
apikey=getpass.getpass('Enter IBM Cloud API Key (leave empty to use previous one): ') or apikey
instancecrn=input('Enter Data Engine Instance CRN (leave empty to use previous one): ') or instancecrn
sqlClient = ibmcloudsql.SQLQuery(apikey, instancecrn, client_info='Data Engine Starter Notebook')
sqlClient.logon()
print('\nYour Data Engine web console link:')
sqlClient.sql_ui_link()
print('\n')
# Specify where to write query results:
if targetUrl == '':
targetUrl=input('Enter target URL for SQL results (like cos://us-south/resultBucket/notebookResults): ')
else:
targetUrl=input('Enter target URL for SQL results (leave empty to use ' + targetUrl + '): ') or targetUrl
# Specify the location of the LogDNA dumps:
if logDNADump == '':
logDNADump=input('Enter URL for LogDNA archive data (like cos://us-south/archiveBucket): ')
else:
logDNADump=input('Enter URL for LogDNA archive data (leave empty to use ' + logDNADump + '): ') or logDNADump
# Specify where to find the preprocessed log data:
if logData == '':
logData=input('Enter URL where to store preprocessed log data (like cos://us-south/preprocessedBucket): ')
else:
logData = input('\nEnter URL where to store preprocessed log data (leave empty to use ' + logData + '): ') or logData
# For pretty-printing SQL statements
def format_sql(sql):
formatted_sql = sqlparse.format(sql, reindent=True, indent_tabs=True, keyword_case='upper')
lexer = get_lexer_by_name("sql", stripall=True)
formatter = Terminal256Formatter(style='tango')
return (highlight(formatted_sql, lexer, formatter))
Enter IBM Cloud API Key (leave empty to use previous one): ········
Enter SQL Query Instance CRN (leave empty to use previous one): crn:v1:bluemix:public:sql-query:us-south:a/23f1e9853c41f6b566e71689ed8a1363:c49157a5-b308-4f6b-972c-455082a8f47e::

Your SQL Query web console link:
https://sql-query.cloud.ibm.com/sqlquery/?instance_crn=crn:v1:bluemix:public:sql-query:us-south:a/23f1e9853c41f6b566e71689ed8a1363:c49157a5-b308-4f6b-972c-455082a8f47e::

Enter target URL for SQL results (like cos://us-south/resultBucket/notebookResults): cos://us-south/cos-standard-6il/s3.us-south.cloud-object-storage.appdomain.cloud
Enter URL for LogDNA archive data (like cos://us-south/archiveBucket): cos://us-south/cos-standard-6il/a2bea9c45b.2019-11-19.76.ld76.json.gz
Enter URL where to store preprocessed log data (like cos://us-south/preprocessedBucket): cos://us-south/cos-standard-6il/a2bea9c45b.2019-11-19.76.ld76.json.gz
You first have to configure LogDNA to archive the log files to IBM Cloud Object Storage. Once archiving is configured for your account, your logs will be exported on a daily or hourly basis in a compressed format (.json.gz).
It is possible to query json.gz files with Data Engine directly. However, we would not recommend this format as it limits Spark's ability to process the file in parallel. See How to Layout Big Data in IBM Cloud Object Storage for Spark SQL for details.
To improve query performance, the data can be partitioned into more manageable chunks, for example by hour or application. This allows Spark to skip whole partitions if the respective data is not queried.
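Once the data is laid out this way, a query that filters on the partition columns only reads the matching partitions. A minimal sketch (with placeholder bucket names), assuming the data has already been preprocessed as shown in the next cell:
SELECT *
FROM cos://us-south/preprocessedBucket STORED AS JSON
WHERE _year = '2019' AND _dayofyear = '229' AND _hour = '19'
INTO cos://us-south/resultBucket/notebookResults STORED AS CSV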
Note: The following just demonstrates what the preparation of log files looks like SQL-wise. You probably want to run this regularly as an automated task on your LogDNA dumps so that the data is prepared for log analysis.
sqlClient.logon()
sql = "SELECT *, " + \
"date_format(from_unixtime(_source._ts / 1000, 'yyyy-MM-dd HH:mm:ss'), 'yyyy') AS _year, " + \
"date_format(from_unixtime(_source._ts / 1000, 'yyyy-MM-dd HH:mm:ss'), 'D') AS _dayofyear, " + \
"date_format(from_unixtime(_source._ts / 1000, 'yyyy-MM-dd HH:mm:ss'), 'HH') AS _hour " + \
"FROM {} STORED AS JSON " + \
"INTO {} STORED AS JSON PARTITIONED BY (_year, _dayofyear, _hour)"
sql = sql.format(logDNADump, logData)
print(format_sql(sql))
jobId = sqlClient.submit_sql(sql)
print("Data Engine submitted and running in the background. Could take some time depending on the size of your archive data. jobId = " + jobId)
job_status = sqlClient.wait_for_job(jobId)
print("\nJob " + jobId + " finished with status: " + job_status)
if job_status == 'failed':
details = sqlClient.get_job(jobId)
print("\nError: {}\nError Message: {}".format(details['error'], details['error_message']))
print("\nResult stored in: " + sqlClient.get_job(jobId)['resultset_location'])
SELECT *, date_format(from_unixtime(_source._ts / 1000, 'yyyy-MM-dd HH:mm:ss'), 'yyyy') AS _year, date_format(from_unixtime(_source._ts / 1000, 'yyyy-MM-dd HH:mm:ss'), 'D') AS _dayofyear, date_format(from_unixtime(_source._ts / 1000, 'yyyy-MM-dd HH:mm:ss'), 'HH') AS _hour FROM cos://us-south/cos-standard-6il/a2bea9c45b.2019-11-19.76.ld76.json.gz STORED AS JSON INTO cos://us-south/cos-standard-6il/a2bea9c45b.2019-11-19.76.ld76.json.gz STORED AS JSON PARTITIONED BY (_year, _dayofyear, _hour)

SQL query submitted and running in the background. Could take some time depending on the size of your archive data. jobId = f408ca6c-6eac-4878-aa36-467c7f22bb50
Job f408ca6c-6eac-4878-aa36-467c7f22bb50 finished with status: completed
Result stored in: cos://s3.us-south.objectstorage.softlayer.net/cos-standard-6il/a2bea9c45b.2019-11-19.76.ld76.json.gz/jobid=f408ca6c-6eac-4878-aa36-467c7f22bb50
Now your preprocessed log data is ready for analysis. But before we get started, here is a quick overview of how tasks that you typically need for log analysis, such as filtering or searching log records, are done with Data Engine compared to the LogDNA UI.
To filter log records by source, application, level, or tag, use a WHERE clause on the corresponding metadata fields:
WHERE _source._host IN("source1", "source2")
WHERE _source._app IN("app1", "app2")
WHERE _source.level IN("level1", "level2")
WHERE array_contains(_source._tag, "someTag")
LogDNA search expressions on parsed fields, such as
<name of parsed field>:<some value>
<name of parsed field>:==<some value>
<name of parsed field>:=<some value>
<name of parsed field>:===<some value>
<name of parsed field>:[value1, value2]
<name of parsed field>:*
translate to the LIKE operator and wildcards on specific fields:
WHERE _source.nameOfParsedField LIKE "SomeValue%"
WHERE _source.nameOfParsedField LIKE "%SomeValue%"
For case-insensitive matching use LOWER() and make sure to use a lower-case search pattern, for example:
WHERE LOWER(_source.nameOfParsedField) LIKE "%somevalue%"
To match one of several values, combine patterns with OR:
WHERE _source.nameOfParsedField LIKE "value1%" OR _source.nameOfParsedField LIKE "value2%"
To check whether a parsed field exists at all (the equivalent of <name of parsed field>:* in LogDNA):
WHERE _source.nameOfParsedField IS NOT NULL
In the LogDNA UI you can jump to a point in time such as today at 11am or to a timeframe, e.g. last fri 4:30p to 11/12 1 AM. In Data Engine use _source._ts, which contains a timestamp in millisecond granularity in UTC. Find an example below on how to convert a local datetime string into a timestamp that you can use in your query.
from datetime import datetime, timezone, timedelta
from dateutil import tz
# For example, we want to convert this date and time string
jumpToTime = "2019-8-17 21:54:55"
# Adjust for your timezone
#input_timezone = tz.gettz('America/New_York')
input_timezone = tz.gettz('Europe/Berlin')
jumpToUTC = datetime.strptime(jumpToTime, '%Y-%m-%d %H:%M:%S')\
.replace(tzinfo=input_timezone) \
.astimezone(tz=timezone.utc)
year = datetime.strftime(jumpToUTC, '%Y')
dayofyear = datetime.strftime(jumpToUTC, '%j')
hour = datetime.strftime(jumpToUTC, '%H')
jumpToTimestamp = jumpToUTC.timestamp() * 1000
print("WHERE _source._ts = {} AND _year = '{}' AND _dayofyear = '{}' AND _hour = {}".format(jumpToTimestamp, year, dayofyear, hour))
WHERE _source._ts = 1566071695000.0 AND _year = '2019' AND _dayofyear = '229' AND _hour = 19
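Putting these building blocks together, here is a minimal sketch of a complete filter query against the preprocessed logs. The application name 'app1' and level 'error' are placeholder values; jumpToTimestamp, year and dayofyear come from the conversion above:
sqlClient.logon()
sql = "SELECT _source._ts, _source._host, _source._app, _source.level, _source._line " + \
      "FROM {} STORED AS JSON " + \
      "WHERE _source._app IN('app1') AND _source.level IN('error') " + \
      "AND _source._ts >= {} AND _year = '{}' AND _dayofyear = '{}' " + \
      "INTO {} STORED AS CSV"
sql = sql.format(logData, jumpToTimestamp, year, dayofyear, targetUrl)
print(format_sql(sql))
filtered_df = sqlClient.run_sql(sql)
# run_sql returns an error string on failure, otherwise a pandas DataFrame
if not isinstance(filtered_df, str):
    print(filtered_df.head())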
sqlClient.logon()
sql = "SELECT _source._app AS application , _source._host AS source " + \
"FROM {} STORED AS JSON " + \
"GROUP BY application, source " + \
"ORDER BY application, source " + \
"INTO {} STORED AS CSV"
sql = sql.format(logData, targetUrl)
print(format_sql(sql))
result_df = sqlClient.run_sql(sql)
if isinstance(result_df, str):
print(result_df)
result_df.head(4)
SELECT _source._app AS application, _source._host AS SOURCE FROM cos://us-south/cos-standard-6il/a2bea9c45b.2019-11-19.76.ld76.json.gz STORED AS JSON GROUP BY application, source ORDER BY application, source INTO cos://us-south/cos-standard-6il/s3.us-south.cloud-object-storage.appdomain.cloud STORED AS CSV
 | application | source
---|---|---
0 | LogDNA Sample App | LogDNA Sample |
Schema information is needed to know which fields are available for querying. You can look at the LogDNA UI and expand the respective log line; there you find the field names.
As a general rule: LogDNA metadata fields start with an underscore (for example _app, _host, _ts), while fields parsed from your log lines do not.
With Data Engine, schema information can be retrieved using DESCRIBE. Running DESCRIBE on the whole log file results in a schema that contains the fields of all applications. To get a more manageable schema, first retrieve the logs for the application you are interested in and then run DESCRIBE on that result. This returns a JSON object; you can switch to the Data Engine UI to view its content. Alternatively, you can run FLATTEN before DESCRIBE to flatten the nested JSON and then retrieve the result to view it in the notebook.
sqlClient.logon()
sql = "SELECT * " + \
"FROM {} STORED AS JSON " + \
"INTO {} STORED AS JSON"
sql = sql.format(logData, targetUrl)
print(format_sql(sql))
jobid_ingress = sqlClient.submit_sql(sql)
job_status = sqlClient.wait_for_job(jobid_ingress)
if job_status == 'failed':
details = sqlClient.get_job(jobid_ingress)
print("Error: {}\nError Message: {}".format(details['error'], details['error_message']))
print("jobId: " + jobid_ingress + "\n")
details = sqlClient.get_job(jobid_ingress)
ingress_records = details['resultset_location']
sql = "SELECT * FROM DESCRIBE(FLATTEN({} STORED AS JSON)) " + \
"INTO {} STORED AS JSON"
sql = sql.format(ingress_records, targetUrl)
print(format_sql(sql))
jobid_schema = sqlClient.submit_sql(sql)
job_status = sqlClient.wait_for_job(jobid_schema)
if job_status == 'failed':
details = sqlClient.get_job(jobid_schema)
print("Error: {}\nError Message: {}".format(details['error'], details['error_message']))
print("jobId: " + jobid_schema)
print('\nUse Data Engine UI to view the results')
sqlClient.sql_ui_link()
SELECT * FROM cos://us-south/cos-standard-6il/a2bea9c45b.2019-11-19.76.ld76.json.gz STORED AS JSON INTO cos://us-south/cos-standard-6il/s3.us-south.cloud-object-storage.appdomain.cloud STORED AS JSON

jobId: e5323ba6-5e62-4c2b-bcd6-ee5cae7a177f

SELECT * FROM DESCRIBE(FLATTEN(cos://s3.us-south.objectstorage.softlayer.net/cos-standard-6il/s3.us-south.cloud-object-storage.appdomain.cloud/jobid=e5323ba6-5e62-4c2b-bcd6-ee5cae7a177f STORED AS JSON)) INTO cos://us-south/cos-standard-6il/s3.us-south.cloud-object-storage.appdomain.cloud STORED AS JSON

jobId: 81abb90e-de7d-44d6-a44d-022e518455dd

Use SQL Query UI to view the results
https://sql-query.cloud.ibm.com/sqlquery/?instance_crn=crn:v1:bluemix:public:sql-query:us-south:a/23f1e9853c41f6b566e71689ed8a1363:c49157a5-b308-4f6b-972c-455082a8f47e::
print('\nSchema information:')
result_schema = sqlClient.get_result(jobid_schema)
result_schema
Schema information:
 | name | nullable | type
---|---|---|---
0 | _source__app | True | string |
1 | _source__env | True | string |
2 | _source__file | True | string |
3 | _source__host | True | string |
4 | _source__ip | True | string |
5 | _source__lid | True | string |
6 | _source__line | True | string |
7 | _source__logtype | True | string |
8 | _source__ts | True | long |
9 | _source_acct | True | string |
10 | _source_addr | True | string |
11 | _source_auid | True | long |
12 | _source_cipher | True | string |
13 | _source_comm | True | string |
14 | _source_direction | True | string |
15 | _source_exe | True | string |
16 | _source_fp | True | string |
17 | _source_grantors | True | string |
18 | _source_hostname | True | string |
19 | _source_kind | True | string |
20 | _source_ksize | True | long |
21 | _source_laddr | True | string |
22 | _source_level | True | string |
23 | _source_lport | True | long |
24 | _source_mac | True | string |
25 | _source_msg | True | string |
26 | _source_name | True | string |
27 | _source_old-auid | True | long |
28 | _source_old-ses | True | long |
29 | _source_pfs | True | string |
30 | _source_pid | True | long |
31 | _source_res | True | string |
32 | _source_rport | True | long |
33 | _source_ses | True | long |
34 | _source_spid | True | long |
35 | _source_subj | True | string |
36 | _source_suid | True | long |
37 | _source_terminal | True | string |
38 | _source_tty | True | string |
39 | _source_type | True | string |
40 | _source_uid | True | long |
41 | _source_unit | True | string |
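With the schema in a pandas data frame you can quickly narrow it down to the fields you care about. As a small sketch, the following keeps only the LogDNA metadata fields, which carry a _source__ prefix after flattening:
# LogDNA metadata fields (_app, _host, _ts, ...) are underscore-prefixed,
# so after flattening they start with '_source__'
result_schema[result_schema['name'].str.startswith('_source__')]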
In the following we will use Data Engine to analyze the ingress log records of a test system's Kubernetes cluster. First let's retrieve the request counts for each worker node along with the average request time and HTTP status codes.
Hints: The queries below filter on the partition columns _year, _dayofyear and _hour; the following cell shows how to compute the day of year for a given date string.
# Get day of year from date string
from datetime import datetime
dayofyear = datetime.strptime("2019-08-18", '%Y-%m-%d').timetuple().tm_yday
print(dayofyear)
230
sqlClient.logon()
# we switch now to Data Engine sample preprocessed data for the following queries
logData = 'cos://us-geo/sql/LogDNA'
# targetUrl =
sql = "SELECT _source._status AS status, COUNT(_source._status) AS status_count, _source.node AS node, " + \
"AVG(_source.request_time) AS request_time, " + \
"concat(date_format(from_unixtime(_source._ts / 1000, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd'), ' ', _hour, ':00:00') AS _datetime " + \
"FROM {} STORED AS JSON " + \
"WHERE _source._app = 'nginx-ingress' AND _source._status IS NOT NULL AND _year = 2019 AND _dayofyear BETWEEN 223 and 230 " + \
"GROUP BY _year, _dayofyear, _hour, _datetime, _source.node, _source._status " + \
"INTO {} STORED AS CSV "
sql = sql.format(logData, targetUrl)
print(format_sql(sql))
jobid = sqlClient.submit_sql(sql)
print("jobId: " + jobid)
SELECT _source._status AS status, COUNT(_source._status) AS status_count, _source.node AS node, AVG(_source.request_time) AS request_time, concat(date_format(from_unixtime(_source._ts / 1000, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd'), ' ', _hour, ':00:00') AS _datetime FROM cos://us-geo/sql/LogDNA STORED AS JSON WHERE _source._app = 'nginx-ingress' AND _source._status IS NOT NULL AND _year = 2019 AND _dayofyear BETWEEN 223 and 230 GROUP BY _year, _dayofyear, _hour, _datetime, _source.node, _source._status INTO cos://us-south/cos-standard-6il/s3.us-south.cloud-object-storage.appdomain.cloud STORED AS CSV jobId: 4ac934da-6b45-46b1-a805-afda5f69fe3c
sqlClient.logon()
job_status = sqlClient.wait_for_job(jobid)
if job_status == 'failed':
details = sqlClient.get_job(jobid)
print("Error: {}\nError Message: {}".format(details['error'], details['error_message']))
result_df = sqlClient.get_result(jobid)
# Adjust datatypes for Pixiedust
result_df['datetime'] = pd.to_datetime(result_df['_datetime'])
result_df['status'] = result_df['status'].apply(str)
#print(result_df.head())
#print(result_df.info())
Now let's plot the data ...
from pixiedust.display import *
display(result_df)
OK, that's too many data points. Let's resample the data. I chose two data points per day, but you can easily adjust the freq parameter.
summary = result_df.groupby([pd.Grouper(key='datetime', freq='12H'), 'node', 'status']) \
.agg({'status_count':'sum', 'request_time': 'mean'})
summary.reset_index(inplace=True)
display(summary)
Now, let's look at the HTTP status codes. Here we are probably most interested in failed requests.
daily_summary = result_df.groupby([pd.Grouper(key='datetime', freq='D'), 'node', 'status']) \
.agg({'status_count':'sum', 'request_time': 'mean'})
daily_summary.reset_index(inplace=True)
display(daily_summary)
On 2019-08-17 we see a lot of server errors (500). To find out more about the failing requests, we retrieve the request URI info. For that we need to make some adjustments to the request URIs so that we don't get flooded with groups: not-found requests can have all sorts of request URIs, so we map all of them to INVALID. In our data we use unique job IDs, CRNs, or instance IDs as part of our request URIs to request information on these specific artifacts. However, for now we are only interested in the general request type, so we map these as well to a common URI. See the query below.
sqlClient.logon()
# logData =
# targetUrl =
sql = "SELECT _source._status AS status, " + \
"COUNT(_source._status) AS status_count, " + \
"_source.node AS node, " + \
"CASE " + \
"WHEN _source.request_uri RLIKE '/v2/sql_jobs/[^\S]+' THEN '/v2/sql_jobs/jobid' " + \
"WHEN _source.request_uri RLIKE '/active_instance/[^\S]+' THEN '/active_instance/instanceid' " + \
"WHEN _source.request_uri RLIKE '/v2/service_instances/[^\S]+' THEN '/v2/service_instances/crn' " + \
"WHEN _source.request_uri RLIKE '/dashboard/[^\S]+' THEN '/dashboard/id' " + \
"WHEN _source._status = 404 THEN 'INVALID' " + \
"ELSE _source.request_uri " + \
"END AS request_uri, " + \
"AVG(_source.request_time) AS request_time, " + \
"concat(date_format(from_unixtime(_source._ts / 1000, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd'), ' ', _hour, ':00:00') AS _datetime " + \
"FROM {} STORED AS JSON " + \
"WHERE _source._app = 'nginx-ingress' AND _source._status IS NOT NULL AND _dayofyear = 229 " + \
"GROUP BY _year, _dayofyear, _hour, _datetime, _source.node, _source._status, request_uri " + \
"INTO {} STORED AS CSV"
sql = sql.format(logData, targetUrl)
print(format_sql(sql))
jobid = sqlClient.submit_sql(sql)
print("jobId: " + jobid)
SELECT _source._status AS status, COUNT(_source._status) AS status_count, _source.node AS node, CASE WHEN _source.request_uri RLIKE '/v2/sql_jobs/[^\S]+' THEN '/v2/sql_jobs/jobid' WHEN _source.request_uri RLIKE '/active_instance/[^\S]+' THEN '/active_instance/instanceid' WHEN _source.request_uri RLIKE '/v2/service_instances/[^\S]+' THEN '/v2/service_instances/crn' WHEN _source.request_uri RLIKE '/dashboard/[^\S]+' THEN '/dashboard/id' WHEN _source._status = 404 THEN 'INVALID' ELSE _source.request_uri END AS request_uri, AVG(_source.request_time) AS request_time, concat(date_format(from_unixtime(_source._ts / 1000, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd'), ' ', _hour, ':00:00') AS _datetime FROM cos://us-geo/sql/LogDNA STORED AS JSON WHERE _source._app = 'nginx-ingress' AND _source._status IS NOT NULL AND _dayofyear = 229 GROUP BY _year, _dayofyear, _hour, _datetime, _source.node, _source._status, request_uri INTO cos://us-south/cos-standard-6il/s3.us-south.cloud-object-storage.appdomain.cloud STORED AS CSV jobId: a4c6649f-246e-4a57-b7db-b182e9964152
sqlClient.logon()
job_status = sqlClient.wait_for_job(jobid)
if job_status == 'failed':
details = sqlClient.get_job(jobid)
print("Error: {}\nError Message: {}".format(details['error'], details['error_message']))
resultRange = sqlClient.get_result(jobid)
# Adjust datatypes for Pixiedust
resultRange['datetime'] = pd.to_datetime(resultRange['_datetime'])
resultRange['status'] = resultRange['status'].apply(str)
#print(resultRange.head())
#print(resultRange.info())
display(resultRange)
So we started to see many 500s at 10am and kept getting them until 9pm. Now we also want to know which request types encountered these 500 errors.
display(resultRange)
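The chart above was configured via the PixieDust options; the same breakdown can also be computed directly in pandas, for example (a small sketch on the resultRange data frame):
# Sum up the 500 responses per (mapped) request URI
errors_500 = resultRange[resultRange['status'] == '500']
errors_500.groupby('request_uri')['status_count'].sum().sort_values(ascending=False)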
So much for that. Now we have a look at the request times.
display(daily_summary)
These are pretty high response times. However, we retrieved average request times, and averages are affected by outliers, especially when most response times are sub-second but some take minutes. So let's look at just the long-running ones.
display(daily_summary)
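Here the chart was again narrowed down via the PixieDust options; an equivalent filter in pandas would look like the following sketch, assuming request times are reported in seconds and using an arbitrary one-second threshold:
# Keep only requests that took longer than one second on average
long_running = daily_summary[daily_summary['request_time'] > 1.0]
print(long_running.head())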
So we have about 20 to 120 long-running requests every day, returning 101. For now we take these requests out to see the average request times for the majority of requests on the system. And that looks much better; now we see the expected sub-second response times:
display(daily_summary)
In the following we give some examples of how to benefit from Data Engine's time series support when analyzing ingress log records. First we show a simple example of retrieving the average request time per hour.
We first create a time-series for each combination of node and status with the following statement:
select
_source.node as node,
_source._status as status,
time_series_da(_source._ts, array(_source._status, _source.request_time)) as ts
from cos://us-geo/sql/LogDNA STORED AS JSON
WHERE
_source._app = 'nginx-ingress' AND
_source._status = 200 AND
_source._status IS NOT NULL AND
_year = 2019 AND
_dayofyear BETWEEN 224 and 227
group by _source.node, _source._status
In Data Engine, time series datatypes are treated as first-class citizens, so they can be stored like any other data type.
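For example, the time series table built by the statement above could be written out once and then reused by later segmentation queries; a sketch, where the target location is a placeholder of your own:
select
_source.node as node,
_source._status as status,
time_series_da(_source._ts, array(_source._status, _source.request_time)) as ts
from cos://us-geo/sql/LogDNA STORED AS JSON
WHERE
_source._app = 'nginx-ingress' AND
_source._status = 200 AND
_source._status IS NOT NULL AND
_year = 2019 AND
_dayofyear BETWEEN 224 and 227
group by _source.node, _source._status
INTO cos://us-south/myBucket/da_ts_table STORED AS PARQUET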
Using the created time series, we can perform segmentation, in this case time-based segmentation (hourly) per time series, and then flatten the result, which splits each segment of each time series into its own row. This can be done with the following statement:
select
node,
status,
ts_flatten(ts_segment_by_time(ts, 3600000 , 3600000)) as (segment_id, segment_index, segment)
from da_ts_table
Finally, using the output windows from the above statement, we can reduce each window to a single value, in this case, the average per window. The following is how that is done:
select
node,
status,
segment_id,
segment_index,
ts_avg(ts_index(segment, 1)) as avg_request_time
from segments_table
sqlClient.logon()
logData = 'cos://us-geo/sql/LogDNA'
sql = "with " + \
"da_ts_table as ( " + \
"select " + \
"_source.node as node, " + \
"_source._status as status, " + \
"time_series_da(_source._ts, array(_source._status, _source.request_time)) as ts " + \
"from {} STORED AS JSON " + \
"WHERE " + \
"_source._app = 'nginx-ingress' AND " + \
"_source._status = 200 AND " + \
"_year = 2019 AND " + \
"_dayofyear between 224 and 229 " + \
"group by _source.node, _source._status " + \
"), " + \
"segment_table as ( " + \
"select " + \
"node, " + \
"status, " + \
"ts_flatten(ts_segment_by_time(ts, 3600000 , 3600000)) as (segment_id, segment_index, segment) " + \
"from da_ts_table " + \
") " + \
"select " + \
"node, " + \
"status, " + \
"segment_id, " + \
"segment_index, " + \
"ts_avg(ts_index(segment, 1)) as request_time, " + \
"from_unixtime(segment_id / 1000, 'yyyy-MM-dd HH:mm:ss') AS datetime " + \
"from segment_table INTO " + \
"{} STORED AS CSV"
sql = sql.format(logData, targetUrl)
print(format_sql(sql))
jobid = sqlClient.submit_sql(sql)
print("jobId: " + jobid)
WITH da_ts_table AS ( SELECT _source.node AS node, _source._status AS status, time_series_da(_source._ts, array(_source._status, _source.request_time)) AS ts FROM cos://us-geo/sql/LogDNA STORED AS JSON WHERE _source._app = 'nginx-ingress' AND _source._status = 200 AND _year = 2019 AND _dayofyear between 224 and 229 group by _source.node, _source._status ), segment_table as ( select node, status, ts_flatten(ts_segment_by_time(ts, 3600000 , 3600000)) as (segment_id, segment_index, segment) from da_ts_table ) select node, status, segment_id, segment_index, ts_avg(ts_index(segment, 1)) as request_time, from_unixtime(segment_id / 1000, 'yyyy-MM-dd HH:mm:ss') AS datetime from segment_table INTO cos://us-south/cos-standard-6il/s3.us-south.cloud-object-storage.appdomain.cloud STORED AS CSV jobId: 6101c002-6823-47ed-bc23-f2b9acaf1a59
sqlClient.logon()
job_status = sqlClient.wait_for_job(jobid)
if job_status == 'failed':
details = sqlClient.get_job(jobid)
print("Error: {}\nError Message: {}".format(details['error'], details['error_message']))
timeseries = sqlClient.get_result(jobid)
So now we get a good hourly overview of when the system responds more slowly.
from pixiedust.display import *
display(timeseries)
The power of using this method is not only its readability but, most importantly, its reusability. Take for example:
ts_segment_by_time(ts, 3600000, 3600000)
This statement could easily be changed to apply very different transformations. For instance, getting the average request time when a status code 500 occurs:
ts_segment_by_anchor(ts, ts_anchor_index(ts_anchor_gte(500.0), 0), 1800000, 1800000)
or when a user wants to get the average request time during bursts of activity:
ts_segment_by_smoothed_silence(ts, 0.5, 2.0, 1800000)
The same can be said of performing reduction on a window. Take for example:
ts_avg(ts_index(segment, 1))
The reducer above can be anything from TS_SUM (sum over the window) to TS_FFT (FFT over the window).
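For instance, swapping the reducer in the last step of the query above yields the total request time per hourly window instead of the average (a sketch against the same segment_table):
select
node,
status,
segment_id,
segment_index,
ts_sum(ts_index(segment, 1)) as total_request_time
from segment_table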
sqlClient.logon()
sql = "with " + \
"da_ts_table as ( " + \
"select " + \
"_source.node as node, " + \
"_source._status as status, " + \
"time_series_da(_source._ts, array(_source._status, _source.request_time)) as ts " + \
"from {} STORED AS JSON " + \
"WHERE " + \
"_source._app = 'nginx-ingress' AND " + \
"_source._status IS NOT NULL AND " + \
"_year = 2019 AND " + \
"_dayofyear between 224 and 229 " + \
"group by _source.node, _source._status " + \
"), " + \
"error_ts_table as ( " + \
"select " + \
"node, " + \
"status, " + \
"ts_flatten(ts_segment_by_anchor(ts, ts_anchor_index(ts_anchor_gte(500.0), 0), 1800000, 1800000)) as (segment_id, segment_index, error_ts) " + \
"from da_ts_table " + \
") " + \
"select " + \
"node, " + \
"status, " + \
"segment_id, " + \
"segment_index, " + \
"ts_avg(ts_index(error_ts, 1)) as request_time, " + \
"from_unixtime(segment_id / 1000, 'yyyy-MM-dd HH:mm:ss') AS datetime " + \
"from error_ts_table INTO " + \
"{} STORED AS CSV"
sql = sql.format(logData, targetUrl)
print(format_sql(sql))
jobid = sqlClient.submit_sql(sql)
print("jobId: " + jobid)
WITH da_ts_table AS ( SELECT _source.node AS node, _source._status AS status, time_series_da(_source._ts, array(_source._status, _source.request_time)) AS ts FROM cos://us-geo/sql/LogDNA STORED AS JSON WHERE _source._app = 'nginx-ingress' AND _source._status IS NOT NULL AND _year = 2019 AND _dayofyear between 224 and 229 group by _source.node, _source._status ), error_ts_table as ( select node, status, ts_flatten(ts_segment_by_anchor(ts, ts_anchor_index(ts_anchor_gte(500.0), 0), 1800000, 1800000)) as (segment_id, segment_index, error_ts) from da_ts_table ) select node, status, segment_id, segment_index, ts_avg(ts_index(error_ts, 1)) as request_time, from_unixtime(segment_id / 1000, 'yyyy-MM-dd HH:mm:ss') AS datetime from error_ts_table INTO cos://us-south/cos-standard-6il/s3.us-south.cloud-object-storage.appdomain.cloud STORED AS CSV jobId: aa201750-5298-4bcf-8a01-2b3f4d4394d9
sqlClient.logon()
job_status = sqlClient.wait_for_job(jobid)
if job_status == 'failed':
details = sqlClient.get_job(jobid)
print("Error: {}\nError Message: {}".format(details['error'], details['error_message']))
timeseries500 = sqlClient.get_result(jobid)
Here we look at just the average response times of requests in windows where a 500 occurred, which requires only a minor adjustment to the SQL statement seen above.
from pixiedust.display import *
display(timeseries500)
Congratulations! You have learned how to prepare LogDNA archive data with Data Engine, how to query and filter the archived logs with SQL, and how to analyze them further with Data Engine's time series functions.