key: cord-0045638-qjkxgeu7 authors: Mu, Weimin; Jin, Zongze; Zhu, Weilin; Liu, Fan; Li, Zhenzhen; Zhu, Ziyuan; Wang, Weiping title: QEScalor: Quantitative Elastic Scaling Framework in Distributed Streaming Processing date: 2020-05-26 journal: Computational Science - ICCS 2020 DOI: 10.1007/978-3-030-50371-0_11 sha: 4a436ed7e4105e741a0cf569a11a68a13c3e51b4 doc_id: 45638 cord_uid: qjkxgeu7

Elastic scaling has become a powerful means for distributed stream processing systems to handle high-speed data streams that arrive continuously and fluctuate constantly. Existing methods allocate the same amount of resources to all instances of the same operator, ignoring the correlation between operator performance and resource provision. This may waste resources through over-provision or incur a huge scheduling overhead through under-provision. To solve these problems, we present a quantitative elastic scaling framework, named QEScalor, which allocates resources to operator instances quantitatively based on their actual performance requirements. The experimental results show that, compared with existing works, QEScalor not only achieves resource-efficient elastic scaling at lower cost, but also enhances the total performance of the DSPAs.

Distributed stream processing systems (DSPSs) [1-4] offer a powerful means to extract valuable information from data streams in time. A data stream processing application (DSPA) in a DSPS is usually modeled as a directed acyclic graph (DAG) [5], in which each vertex represents an operation, called an operator, and each edge represents a data stream between two operators. At runtime, the DSPS initiates a certain number of instances for each operator and deploys them in the runtime environment. Because the data stream fluctuates constantly, DSPSs adopt elastic scaling, which adjusts the number of operator instances dynamically, to satisfy the QoS requirements.

There has been much research on elastic scaling. Zacheilas et al. [6] adjust the number of operators based on a state transition graph. Hidalgo et al. [7] evaluate the processing power of an operator through benchmarking and adjust the number of operator instances based on a threshold and workload prediction. Wei et al. [8] only adjust the CPU frequency of the virtual machines as the workload fluctuates, to reduce the energy cost. Marangozova-Martin et al. [9] present a method that allocates three levels of resources to operator instances: virtual machines, processes, and threads.

The above methods allocate the same amount of resources to all instances of the same operator, ignoring the correlation between operator performance and resource provision. In practice, unreasonable resource provision for operator instances causes severe problems. Over-provision wastes resources, while under-provision forces the DSPS to create many instances to reach the required processing performance, which results in a huge overhead for scheduling and state transition. In this paper, we present a quantitative elastic scaling framework, named QEScalor, which allocates resources to operator instances quantitatively based on their actual performance requirements.
QEScalor first builds the operator performance and resource provision model (OPRPM) and then generates a low-cost elastic scaling plan based on it. The contributions of this paper are as follows:

- We propose QEScalor, the first framework to consider the correlation between operator performance and resource provision, to improve the effectiveness of resource provisioning.
- We propose an online algorithm, named DSA, which learns correlation samples of operator performance and resource provision based on a gradient strategy and a re-sampling mechanism. Besides, we use the random forest regression (RFR) model [10] to build the OPRPM, which yields suitable resource provision options for a given operator performance requirement.
- We present a quantitative and cost-based elastic scaling algorithm, QCESA. Based on the OPRPM and the predictions of the workload [11] and the operator performance [12], it generates a low-cost scaling plan that achieves resource-efficient elastic scaling and improves overall performance.
- We implement QEScalor as a key part of our DataDock [11]. The experimental results show that QEScalor enhances performance at a lower scaling cost on real-world datasets.

We organize the rest of this paper as follows. Section 2 describes the design of QEScalor. Section 3 shows the experimental results of our framework. Finally, Sect. 4 concludes the paper.

In this subsection, we describe our framework QEScalor, shown in Fig. 1, in detail. It contains three modules: the online operator performance sampler (OOPSer), the operator performance and resource provision modeler (OPRPMer), and the quantitative elasticity controller (QECer). As shown in Fig. 1, QEScalor is an important module of the DataDock, the distributed stream processing system we presented in previous work [11], which mainly aims at processing heterogeneous data in real time. We use the OOPSer to learn the correlation samples of operator performance and resource provision online. Then we use these samples as the input of the OPRPMer to build the operator performance and resource provision model (OPRPM). At last, we use the QECer to adjust the scaling plan according to the workload prediction of the BGElasor [11], the operator performance prediction of the OMOPredictor [12], and the OPRPM.

As shown in Fig. 1, we take a DSPA including a new operator $O_1$ as an example to describe the work process of QEScalor. The process contains three main stages.

- Correlation Samples Online Learning. When the QECer executes the DSPA including the new operator $O_1$ at time $t_0$, it registers $O_1$ on the OOPSer. The QECer starts enough instances of $O_1$ with the default resource provision, based on the cost-based elastic-scaling algorithm [11], plus a single sampling instance with resource provision $RP_A$ managed by the OOPSer. The OOPSer then interacts with the Stream Distributor (SD) to allocate the workload between the normal instances and the sampling instance, and keeps collecting operator performance and resource utilization metrics until the current sampling process converges at time $t_i$. This sampling process continues for several rounds, driven by the gradient strategy, until the operator performance no longer increases with the growth of resources. For example, the OOPSer learns the correlation samples of the resource provisions $RP_A$, $RP_B$, and $RP_C$ from time $t_0$ to $t_k$.
- Operator Performance and Resource Provision Modeling. During the sampling process, the OOPSer invokes the OPRPMer to build the OPRPM using the RFR whenever it completes the learning process for one kind of resource provision.
- Quantitative Elastic Scaling. We run the QECer periodically. It takes the workload prediction of the BGElasor and the operator performance prediction of the OMOPredictor as input and makes a quantitative scaling decision based on the OPRPM.

As shown in Fig. 1, from time $t_0$ to $t_j$ the OPRPMer has learned the OPRPM for resource provisions $RP_A$ and $RP_B$. To process the workload from $t_j$ to $t_k$, the QECer allocates two normal instances with resource provision $RP_A$ and one instance with $RP_B$. From $t_k$ to $t_l$, the QECer allocates three instances with resource provisions $RP_A$, $RP_B$, and $RP_C$, respectively.

We adopt the OOPSer to collect the correlation samples of operator performance and resource provision to build the OPRPM. We focus on two types of resources: CPU and memory. We do not consider network bandwidth in this work, because in the data center network bandwidth is more plentiful and cheaper than CPU and memory. Accordingly, we only consider correlation-sample learning for two types of operators [12]: the compute-intensive operator (COperator) and the compute-intensive operator mixed with memory I/O (CMOperator).

We propose the Dynamic-Sampling-Algorithm (DSA) to collect the correlation samples online; we show it in Algorithm 1. The sampling process consists of three steps. Firstly, we create a sampling operator instance with a specific resource provision. Secondly, we stress-test the sampling operator instance until its performance converges. Thirdly, we continuously collect the correlation samples during the test. Since each stress test takes time, completing the sampling can take long. To speed up sampling, we dynamically adjust the sampling step according to the gradient of the performance change of the sampling instance. Meanwhile, we use a re-sampling method that adds sampling points where the operator performance fluctuates, to improve accuracy. Sampling proceeds while the DSPAs keep processing the online workload normally: we dynamically allocate the workload between the normal instances and the single sampling instance, which markedly reduces the time and resource overhead compared with running the sampling alone. When the sampling completes, we obtain the correlation sample set $CS_O$.

We use the OPRPMer to build the model of operator performance and resource provision, with which we can predict the operator performance for a given resource provision. The correlation between operator performance and resource provision is commonly complex and nonlinear. A linear regression model cannot capture the latent features of this correlation well, resulting in poor predictions. Besides, in our scenario the correlation sample set $CS_O$ is commonly small, so a single nonlinear regression model, such as the SVR [13], easily overfits. An ensemble learning model improves the robustness of prediction by integrating many weak learners, which makes it more suitable for learning from small samples. We therefore adopt the random forest regression (RFR) model in the OPRPMer to capture the nonlinear correlation between operator performance and resource provision.
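The paper does not include code for this modeling step; the following is a minimal sketch, with hypothetical sample values, of how such an OPRPM could be fitted with scikit-learn's RandomForestRegressor (the library and version reported in the evaluation section). The (cpu, mem) feature layout follows the paper; all numbers are illustrative assumptions.

```python
import numpy as np
from sklearn.ensemble import RandomForestRegressor

# Hypothetical correlation sample set CS_O collected by the OOPSer:
# each row is a resource provision r = (cpu cores, memory in MB),
# each target is the measured operator performance (records/s).
X = np.array([[0.6, 33.2], [1.2, 33.3], [1.8, 33.5], [2.4, 33.8]])
y = np.array([1500.0, 2900.0, 4100.0, 4600.0])  # illustrative values

# Hyper-parameters mirroring those reported in the evaluation section
# (criterion="mse" is the sklearn 0.22 spelling; later versions
# renamed it "squared_error").
model = RandomForestRegressor(n_estimators=100, bootstrap=True,
                              criterion="mse", min_samples_leaf=1,
                              min_samples_split=2)
model.fit(X, y)

# Given a candidate provision r, predict the operator performance p_o,
# as the OPRPMer does when invoked by the QECer.
r = np.array([[1.5, 33.4]])
print(model.predict(r))
```

With only a handful of samples per operator, the bagging inside the forest smooths out measurement noise that a single regressor would fit directly.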
According to our experiments, the RFR model performs better than boosting models such as Adaboost [14], GBDT [15], and XGBoost [16], because the bootstrap strategy adopted by the RFR model avoids overfitting effectively when the sample set is small. We take the correlation sample set $CS_O$ learned by the OOPSer as the input to build the model in the OPRPMer. When invoked by the QECer, the OPRPMer takes $r = (cpu, mem)$ as input and returns the predicted operator performance $p_o$ corresponding to $r$.

In this section, we build the QECer, which ensures the end-to-end latency with the minimum elastic-scaling cost. It contains two parts: the cost model and the Quantitative and Cost-based Elastic Scaling Algorithm (QCESA).

Cost Model. We build a cost model to evaluate the total cost of all elastic-scaling actions for an operator from the current epoch $S$ to the future epoch $F$. The total cost $W_o(Ins)$, the startup count $C^u_{o,t}(Ins)$, and the shutdown count $C^d_{o,t}(Ins)$ are defined as

$$W_o(Ins) = \sum_{t=S}^{F} \Big( \sum_{r \in Res} p^r_o\,|Ins^t_{o,r}| + p^u_o\,C^u_{o,t}(Ins) + p^d_o\,C^d_{o,t}(Ins) \Big),$$

$$C^u_{o,t}(Ins) = \sum_{r \in Res} \max\big(|Ins^t_{o,r}| - |Ins^{t-1}_{o,r}|,\,0\big), \qquad C^d_{o,t}(Ins) = \sum_{r \in Res} \max\big(|Ins^{t-1}_{o,r}| - |Ins^t_{o,r}|,\,0\big),$$

where $p^r_o$ is the cost of the system resources used by a single instance of operator $o$ with resource provision $r$, and $|Ins^t_{o,r}|$ is the number of instances of operator $o$ with resource provision $r$ at time $t$. In addition, $Perf_{o,r}$ denotes the performance of such an instance, so $\sum_{r \in Res} |Ins^t_{o,r}|\,Perf_{o,r}$ denotes the total performance of operator $o$ at time $t$, and $Workload_t$ is the workload at epoch $t$. To satisfy the end-to-end latency, we ensure that the performance of each operator is not less than the workload; in other words, $\sum_{r \in Res} |Ins^t_{o,r}|\,Perf_{o,r} \ge Workload_t$ at any time. Finally, $p^u_o$ is the startup cost and $p^d_o$ the shutdown cost of a single instance of $o$.

QCESA. To solve $\min(W_o)$, we propose the Quantitative and Cost-based Elastic Scaling Algorithm (QCESA); we show it in Algorithm 2. The QCESA considers not only the cost of instance startup and shutdown but also the correlation between operator performance and resource provision, and balances these parts of the cost to guarantee a low total cost. First, the QCESA computes the maximum workload $Workload_{max}$ over $t \in [t_C, t_F]$. It then uses this bound to generate all candidates for every time $t \in [t_C, t_F]$; each candidate is a combination of instance counts with different resource provisions whose total performance satisfies $Perf^t_{cand \cdot total} \in [Workload_t, Workload_{max}]$. Finally, we use dynamic programming to compute the minimal-cost plan.
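Algorithm 2 itself is not reproduced in the text above, so the following is a minimal sketch, under our own assumptions, of the candidate-enumeration and dynamic-programming structure just described. The cost constants, the instance cap, and the helper names are hypothetical, and we relax the candidates' upper performance bound by one instance of slack so a feasible mix always exists; none of this is the authors' implementation.

```python
from itertools import product

# Hypothetical per-provision data: running cost per epoch (p_o^r) and
# performance predicted by the OPRPM (Perf_{o,r}) for each provision.
PROVISIONS = {            # name: (cost per epoch, predicted performance)
    "RP_A": (1.0, 1500.0),
    "RP_B": (1.8, 2900.0),
    "RP_C": (2.5, 4100.0),
}
STARTUP_COST, SHUTDOWN_COST = 0.5, 0.2   # p_o^u, p_o^d (assumed values)
MAX_INSTANCES = 3                        # assumed cap per provision

def candidates(workload_t, perf_cap):
    """Enumerate instance mixes whose total performance covers the
    current workload without exceeding the (relaxed) performance cap."""
    names = list(PROVISIONS)
    for counts in product(range(MAX_INSTANCES + 1), repeat=len(names)):
        perf = sum(n * PROVISIONS[m][1] for n, m in zip(counts, names))
        if workload_t <= perf <= perf_cap:
            yield dict(zip(names, counts))

def running_cost(mix):
    return sum(n * PROVISIONS[m][0] for m, n in mix.items())

def transition_cost(prev, cur):
    """Startup/shutdown cost (C^u, C^d) between consecutive mixes."""
    cost = 0.0
    for m in PROVISIONS:
        delta = cur.get(m, 0) - prev.get(m, 0)
        cost += STARTUP_COST * max(delta, 0) + SHUTDOWN_COST * max(-delta, 0)
    return cost

def qcesa(workloads, initial_mix):
    """Dynamic program over epochs: returns (min total cost, plan)."""
    # Relaxed cap: Workload_max plus one largest-provision instance.
    cap = max(workloads) + max(p for _, p in PROVISIONS.values())
    dp = {frozenset(initial_mix.items()): (0.0, [])}
    for w in workloads:
        nxt = {}
        for cand in candidates(w, cap):
            key = frozenset(cand.items())
            for prev_key, (cost, plan) in dp.items():
                total = (cost + transition_cost(dict(prev_key), cand)
                         + running_cost(cand))
                if key not in nxt or total < nxt[key][0]:
                    nxt[key] = (total, plan + [cand])
        dp = nxt
    return min(dp.values(), key=lambda v: v[0])

cost, plan = qcesa([2000.0, 3500.0, 2600.0], {"RP_A": 1})
print(cost, plan)
```

Keeping one DP state per distinct instance mix keeps the table small; in the paper the candidate performance values come from the OPRPM predictions rather than a fixed table as above.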
Settings. Our experiments run on a Kubernetes (K8S) cluster of eight servers, which we use as the Resource Manager of the DataDock. The K8S version is 1.14.1. There are two types of servers in the cluster: two GPU servers and six CPU servers. Each GPU server comprises a 36-core Intel Xeon E5-2697 v4 2.30 GHz CPU, 256 GB of memory, two NVIDIA GeForce GTX 1060ti cards, and 500 GB of disk. Each CPU server comprises a 36-core Intel Xeon E5-2697 v4 2.30 GHz CPU, 256 GB of memory, and 500 GB of disk. We use the GPU servers to run the JobManager, which conducts the training and evaluation, and the CPU servers to run the TaskManagers, in which the operator instances run. Besides, we evaluate the OPRPMer with sklearn 0.22.1 running on Python 3.7.

Datasets. In Table 1, we show our datasets and the intermediate results of our model at different stages. We use the real online workload processed by the DataDock in one day as the original dataset (OriWL-1day). In the OOPSer, we use two sampling algorithms, FSSA and DSA, to sample OriWL-1day. F1, F2, F3, F4, and F5 denote different step sizes of the FSSA; A, B, C, and D indicate that the original dataset is processed by the corresponding operators; CO1 denotes a CPU operator and CMO1 a CPU-memory operator. In the OPRPMer, we use DSA-A/B/C/D as the training set and F1-A/B/C/D as the test set, and obtain the output of the random forest regression (RFR), DSA-RFR-A/B/C/D. In the QECer, we evaluate the system performance: from the OPRPMer we use DSA-RFR-A/B/C/D as input; from the OMOPredictor we adopt OP-PM-30 as input, which represents the operator performance on the DataDock online for 30 days; for the input of the BGElasor, we use OriWL-60days-FlowStat, which represents the flow statistics of 60 days of data load on the DataDock online.

In our algorithm, we guarantee the latency while reducing the total cost. Thus, we evaluate the Quantitative Elasticity Controller from two aspects: the total cost and the end-to-end latency guarantee. We use the Cost-Balance-Algorithm (CBA) [11] as the baseline algorithm. The CBA considers the running cost and the operation cost; compared to the CBA, the QCESA additionally takes the operator resource provision into account. The performance of the QCESA depends on the sampling of the OOPSer and the prediction results of the OPRPMer. For the OOPSer, we compare our method, DSA, with the Fixed-Step-Sampling-Algorithm (FSSA) to demonstrate that the DSA is more accurate in the sampling stage, which enhances scheduling accuracy and reduces cost. For the OPRPMer, we compare the random forest regression (RFR) model with Adaboost, GBDT, and XGBoost to demonstrate that the RFR is more suitable for this application scenario: it yields more accurate predictions, which in turn affects the overall scheduling performance.

Total Cost. In this part, we take the CMO1 as an example and compare the total cost of elastic scaling. We use the workload prediction to generate the scaling plan for the CMO1 and calculate the total cost. Moreover, to evaluate the effectiveness of the QCESA, we test the CBA with four different resource provision granularities: 1) $r_1 = (cpu = 0.6\,core, mem = 33.2\,MB)$, 2) $r_2 = (cpu = 1.2\,core, mem = 33.3\,MB)$, 3) $r_3 = (cpu = 1.8\,core, mem = 33.5\,MB)$, 4) $r_4 = (cpu = 2.4\,core, mem = 33.8\,MB)$. In Fig. 2, we can see that most of the time the total cost of the QCESA is lower than that of the CBA, and that the advantage of the QCESA grows as the system keeps running.

End-to-End Latency Guarantee. In this part, we focus on the end-to-end latency guarantee. We again take the CMO1 as an example, run it on the DataDock, and monitor the end-to-end latency. In Fig. 3, we can see that both the QCESA and the CBA guarantee that the performance of the operator is no less than the workload, and that the end-to-end latency stays stable and satisfies the QoS requirement. The reason is that both algorithms start instances before the workload rises, so they can process the workload in time.

To measure the impact of sampling, we compare the DSA with the Fixed-Step-Sampling-Algorithm (FSSA) in the OOPSer. We run the FSSA and the DSA separately to collect the correlation samples of the four operators. For each operator, we run the DSA with the sampling step set to 1 and use its sampling result as the baseline. Besides, we run the FSSA with the sampling step set to 2, 3, 4, and 5 as the contrast evaluation.
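To make the DSA/FSSA comparison concrete, here is a minimal sketch, under our own assumptions, of the gradient-driven step adjustment and re-sampling behaviour that Algorithm 1 is described as performing: the step widens while the performance curve is flat and resets (with back-filled samples) where it fluctuates. The stress-test routine, the thresholds, and the stopping rule are all hypothetical.

```python
def measure_performance(resource):
    """Hypothetical stress test: deploy a sampling instance with the
    given resource share and return its converged throughput."""
    raise NotImplementedError  # supplied by the OOPSer runtime

def dsa(r_min, r_max, base_step=1.0, flat_eps=0.05, max_step=4.0):
    """Sketch of the Dynamic-Sampling-Algorithm: collect (resource,
    performance) samples with a step adapted to the perf gradient."""
    samples = [(r_min, measure_performance(r_min))]
    prev_r, prev_perf = samples[0]
    r, step = r_min + base_step, base_step
    while r <= r_max:
        perf = measure_performance(r)
        samples.append((r, perf))
        gradient = abs(perf - prev_perf) / max(prev_perf, 1e-9)
        if gradient < flat_eps:
            # Performance changes smoothly: widen the step to save time.
            step = min(step * 2, max_step)
        else:
            # Performance fluctuates: re-sample the skipped interval at
            # the finest granularity, then continue with the base step.
            back = prev_r + base_step
            while back < r - 1e-9:
                samples.append((back, measure_performance(back)))
                back += base_step
            step = base_step
        if perf <= prev_perf:
            break  # more resources no longer improve performance
        prev_r, prev_perf = r, perf
        r += step
    return sorted(samples)
```

In the real system the sampling instance shares the live workload with the normal instances, so each `measure_performance` call corresponds to a convergence window rather than an offline benchmark.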
After we get all correlation samples of the four operators with the FSSA and the DSA, we use the OPRPMer to build the OPRPMs. Then we predict the operator performance at the minimum sampling step using the OPRPMs. We use the Root Mean Square Error (RMSE), the Mean Absolute Error (MAE), and the Sampling Number (SN) to evaluate the effectiveness of the DSA:

$$RMSE = \sqrt{\frac{1}{n}\sum_{i=1}^{n}(x_i - \hat{x}_i)^2}, \qquad MAE = \frac{1}{n}\sum_{i=1}^{n}|x_i - \hat{x}_i|,$$

where $x_i$ is the baseline operator performance and $\hat{x}_i$ is the predicted operator performance.

As shown in Table 2, the performance of the FSSA is not stable: when the step of the FSSA is 2, the CO1 and the CMO1 achieve the best performance, but when the step is 3, the CO2 and the CMO2 do. The DSA, in contrast, performs well for all four operators; its performance is close to, or even reaches, the best one. Moreover, the DSA needs fewer sampling points to reach the same performance, which it owes to its dynamic sampling strategy. We show the sampling results of the CO1 and the CMO2 in Fig. 4. Where the performance fluctuates markedly, the sampling step stays close to the minimum sampling step; where the performance changes smoothly, the DSA uses only a few sampling points to capture the main characteristics of the performance curve.

To enhance the total performance, we should select the best prediction method for the OPRPMer. We therefore compare the random forest regression model with Adaboost, GBDT, and XGBoost to demonstrate the effectiveness of the RFR model in this scenario, and we use the RMSE and the MAE to evaluate each model. These approaches expose several hyper-parameters; we use grid search with 10-fold cross-validation to select the key ones. Besides, we normalize all inputs to the range [0, 1] using a Min-Max scaler. We repeat each experiment 10 times per model to reduce the random experimental error and take the average of the whole test results as the final result.

For the RFR, we set bootstrap=True, criterion='mse', max_features='auto', min_samples_leaf=1, min_samples_split=2, n_estimators=100. For the SVR, we set kernel='rbf', gamma='scale', C=1.0. For the Adaboost, we set base_estimator=None, learning_rate=1.0, loss='linear', n_estimators=50. For the GBDT, we set n_estimators=100, criterion='friedman_mse', max_features=None, min_samples_leaf=1. For the XGBoost, we set booster='gbtree', learning_rate=0.1, max_depth=3, n_estimators=100.

Table 3 shows that the ensemble learning models are significantly more effective than the SVR: because the correlation sample set in our scenario is small, the advantage of ensemble learning is more prominent. Besides, there is little difference among Adaboost, GBDT, and XGBoost. However, compared to these boosting models, the RFR performs better, because its bootstrap strategy effectively prevents overfitting when the sample set is small.
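As a companion to the protocol above, the following is a minimal sketch, with hypothetical data and assumed parameter grids, of the grid-search-with-10-fold-cross-validation comparison; only two of the five models are spelled out, and for brevity the errors are computed on the training samples rather than on the held-out FSSA test set the paper uses.

```python
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import mean_squared_error, mean_absolute_error

# Hypothetical correlation samples: (cpu, mem) -> operator performance.
rng = np.random.default_rng(0)
X = rng.uniform([0.5, 33.0], [2.5, 34.0], size=(40, 2))
y = 2000.0 * X[:, 0] + rng.normal(0.0, 50.0, size=40)  # illustrative

# Normalize all inputs to [0, 1] with a Min-Max scaler, as in the paper.
X = MinMaxScaler().fit_transform(X)

models = {
    # criterion="mse" is the sklearn 0.22 spelling.
    "RFR": (RandomForestRegressor(bootstrap=True, criterion="mse"),
            {"n_estimators": [50, 100, 200]}),           # assumed grid
    "SVR": (SVR(kernel="rbf", gamma="scale"),
            {"C": [0.1, 1.0, 10.0]}),                    # assumed grid
}

for name, (estimator, grid) in models.items():
    search = GridSearchCV(estimator, grid, cv=10,
                          scoring="neg_mean_squared_error")
    search.fit(X, y)
    pred = search.predict(X)
    rmse = mean_squared_error(y, pred, squared=False)
    mae = mean_absolute_error(y, pred)
    print(f"{name}: RMSE={rmse:.1f} MAE={mae:.1f} "
          f"best={search.best_params_}")
```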
In this paper, we present a quantitative elastic scaling framework, named QEScalor, which allocates resources to operator instances quantitatively based on their actual performance requirements. It contains three key modules: the OOPSer, the OPRPMer, and the QECer. Firstly, we use the OOPSer to learn the correlation samples of operator performance and resource provision online. Then we use these samples as the input of the OPRPMer to build the operator performance and resource provision model (OPRPM) using the random forest regression model. At last, we use the QECer to adjust the scaling plan according to the real workload fluctuation. The experimental results show that, compared with the state-of-the-art methods, QEScalor performs better on real-world datasets and addresses the problem of ignoring the correlation between operator performance and resource provision.

References
[1] TelegraphCQ: continuous dataflow processing
[2] The design of the Borealis stream processing engine
[3] S4: distributed stream computing platform
[4] Apache Flink™: stream and batch processing in a single engine
[6] Elastic complex event processing exploiting prediction
[7] Self-adaptive processing graph with operator fission for elastic stream processing
[8] PEC: proactive elastic collaborative resource scheduling in data stream processing
[9] Multi-level elasticity for data stream processing
[10] Random forests
[11] BGElasor: elastic-scaling framework for distributed streaming processing with deep neural network
[12] OMOPredictor: an online multi-step operator performance prediction framework in distributed streaming processing
[13] Support vector regression machines
[14] A decision-theoretic generalization of on-line learning and an application to boosting
[15] Greedy function approximation: a gradient boosting machine
[16] XGBoost: a scalable tree boosting system

Acknowledgements. This work is supported by the National Key Research and Development Plan (2018YFC0825101).