I stumbled upon parallel processing while working on automating ChatGPT API requests.
Project details:
- Collect data from forums.oracle.com: manually scraped the post listings for the Oracle Database XE category
- Automate single chat completion API calls to ChatGPT from R
- Make at most 3 requests per minute to ChatGPT (the API rate limit)
Challenges faced:
- Data collection using RSelenium was unsuccessful because of errors while moving to the next page in the pagination section.
So I had to collect the data manually (it took 60 minutes to collect the URLs for all open posts :-)).
The real issue lies in crawling the 1400 forum post URLs and wrangling each one as an HTML page.
Since normal for loops in R were taking a lot of time, I switched to apply functions like lapply from the base package, and performance improved significantly. However, the HTTP requests are independent of each other, so ideally parallel processing would be an even better fit.
Let's look at the performance gains and the details of parallel processing in this post.
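To get a feel for the gains up front, here is a minimal, self-contained sketch (not the project code) that times a sequential lapply against future_lapply from the future.apply package on a dummy task standing in for an independent HTTP request:

library(future.apply)

# dummy task standing in for one independent HTTP request
slow_task <- function(i) {
  Sys.sleep(0.5)  # pretend this is network latency
  i^2
}

# sequential: roughly 0.5 s * 20 = 10 seconds
system.time(res_seq <- lapply(1:20, slow_task))

# parallel: roughly 10 seconds / number of workers
plan(multisession, workers = parallel::detectCores())
system.time(res_par <- future_lapply(1:20, slow_task))
plan(sequential)  # release the worker sessions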
Code optimizations:
- Garbage collection became a concern after I noticed RStudio hanging a few times while running the program for all 1400 chat completions. The main culprits:
- 1. Growing a data frame in size instead of pre-allocating it to a known size (downside: R keeps re-allocating memory); a small sketch follows this list.
- 2. for loops in R are too slow; use apply functions from the base package instead.
- 3. lapply calls with a lot of intermediary variables; avoid unnecessary variable usage inside the functions.
- 4. Use multiple processes/sessions to make the most of the multicore CPUs.
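A minimal sketch of point 1 (not from the project code; the column names are just for illustration), comparing a data frame that grows row by row with pre-allocated vectors that are filled in place:

n <- 5000

# growing: every rbind copies the whole data frame built so far
grow_df <- function() {
  df <- data.frame(url = character(0), pagenum = integer(0))
  for (i in 1:n) df <- rbind(df, data.frame(url = paste0("u", i), pagenum = i))
  df
}

# pre-allocating: reserve the memory once, fill in place, build the frame at the end
prealloc_df <- function() {
  url <- character(n)
  pagenum <- integer(n)
  for (i in 1:n) {
    url[i] <- paste0("u", i)
    pagenum[i] <- i
  }
  data.frame(url = url, pagenum = pagenum)
}

system.time(grow_df())      # noticeably slower
system.time(prealloc_df())  # near-instant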
Below are code snippets to solve the problem statement:
# read each HTML page in the Data folder and retrieve the post URLs from them
# get the post details for each URL and store it in a data frame
# feed these post details to chatgpt
library(readr)
library(dplyr)
library(httr)
library(rvest)
library(jsonlite)
library(foreach)
library(future.apply)
library(future)
library(promises)
source("collect_chatpgt.R") # package contains the chatgpt request API automation
# start with an empty data frame for the post URLs (url, pagenum)
pgurls_df <- data.frame(url = character(0), pagenum = integer(0))
data.files <- list.files(path = getwd(), pattern = "\\.html$", full.names = TRUE)
for (i in seq_along(data.files)) {
  # parse the saved listing page and pull out the post links
  response_body_html <- read_html(read_file(file = data.files[i]))
  pgUrls <- response_body_html %>% html_nodes(".rw-ListItem-primaryText a") %>% html_attr("href")
  pgUrls <- data.frame(url = pgUrls, pagenum = i)
  # full outer join keeps every URL collected so far
  pgurls_df <- merge(pgurls_df, pgUrls, by = c("url", "pagenum"), all = TRUE)
}
# remove unused objects from memory
rm(list=c("pgUrls", "response_body_html", "i"))
# remove duplicate urls from the pgurls_df, if any
df_unique <- pgurls_df[!duplicated(pgurls_df$url), ]
df_unique$gpt_response <- "gpt"
e_cnt <- 0   # error counter
w_cnt <- 0   # warning counter (used to back off after rate-limit warnings)
nreq <- 0    # request counter
n <- nrow(df_unique)
# pre-allocate the output tables to their final size (optimization 1)
core_output <- data.table::data.table('n' = 1:n
                                      , 'url' = df_unique$url
                                      , 'qa_title' = rep("", n)
                                      , 'qa_attrs' = rep("", n)
                                      , 'qa_body' = rep("", n)
                                      , 'prompt_content' = rep("", n)
                                      , 'chatgpt_response' = rep("", n))
meta_output <- data.table::data.table('n' = 1:n
                                      , 'url' = df_unique$url
                                      , 'error' = rep("", n)
                                      , 'message' = rep("", n)
                                      , 'status_code' = rep("", n))
collect_posts_data <- function(){
  lapply(core_output$n, function(i){
    gurl <- as.character(df_unique$url[i])
    gurl_data <- get_url_data(gurl)
    gurl_data_html <- content(gurl_data, as = "text") %>% read_html()
    qatitle <- gurl_data_html %>% html_nodes(".ds-Question-text .ds-Question-title") %>% html_text()
    qaattrs <- gurl_data_html %>% html_nodes(".ds-Question-text .ds-Question-attrs") %>% html_text2()
    qabody  <- gurl_data_html %>% html_nodes(".ds-Question-text .ds-Question-body") %>% html_text()
    # data.table::set() updates core_output by reference, so the values persist
    # outside this function (a plain `$<-` inside the function would only
    # modify a local copy)
    if (length(qatitle) > 0) data.table::set(core_output, i, "qa_title", paste(qatitle, collapse = "\n"))
    if (length(qaattrs) > 0) data.table::set(core_output, i, "qa_attrs", paste(qaattrs, collapse = "\n"))
    if (length(qabody)  > 0) data.table::set(core_output, i, "qa_body",  paste(qabody,  collapse = "\n"))
  })
}
system.time({ collect_posts_data() })
plan(multisession, workers = parallel::detectCores())
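For comparison with the sequential baseline timed above, here is a sketch of a parallel variant of the same scraping step (not the project's original code; the helper name scrape_one_post is made up, and get_url_data is assumed to come from collect_chatpgt.R). Since the worker sessions cannot modify core_output in the main session, each one returns a small data frame and the pieces are bound together afterwards:

scrape_one_post <- function(i) {
  gurl <- as.character(df_unique$url[i])
  gurl_data_html <- content(get_url_data(gurl), as = "text") %>% read_html()
  # collapse multiple matches into one string so each post yields exactly one row
  data.frame(
    n = i,
    url = gurl,
    qa_title = paste(gurl_data_html %>% html_nodes(".ds-Question-text .ds-Question-title") %>% html_text(), collapse = "\n"),
    qa_attrs = paste(gurl_data_html %>% html_nodes(".ds-Question-text .ds-Question-attrs") %>% html_text2(), collapse = "\n"),
    qa_body  = paste(gurl_data_html %>% html_nodes(".ds-Question-text .ds-Question-body") %>% html_text(), collapse = "\n")
  )
}
system.time({
  scraped <- data.table::rbindlist(future_lapply(seq_len(nrow(df_unique)), scrape_one_post))
})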
# Define the future loop for the ChatGPT calls. The worker sessions cannot
# update core_output/meta_output (or the e_cnt/w_cnt counters) in the main
# session, so each iteration returns its result as a list instead.
t1 <- Sys.time()
future_lapply(seq_len(nrow(df_unique)), function(i) {
  v_rpm <- 3      # request limit per minute
  v_tpm <- 40000  # token limit per minute (not enforced here)
  res <- list(n = i, url = core_output$url[i], prompt_content = "",
              chatgpt_response = "", error = "", status_code = NA_integer_)
  tryCatch({
    # 60 / v_rpm = 20 seconds between requests keeps a single worker within the
    # rate limit; back off harder once a warning has been seen. Note that the
    # counters and the sleep are per worker, so with several workers the
    # aggregate request rate is roughly workers * v_rpm.
    if (w_cnt > 0) Sys.sleep(60 / v_rpm * 2) else Sys.sleep(60 / v_rpm)
    if (nzchar(core_output$qa_body[i])) {
      gpt_prompt <- paste0("Question title:\n", core_output$qa_title[i],
                           "\nquestion details:", core_output$qa_body[i])
      res$prompt_content <- gpt_prompt
      gpt_response <- make_req_gpt(gpt_prompt)
      res$status_code <- gpt_response$status_code
      if (gpt_response$status_code != 200) {
        res$error <- gpt_response$error$message
        warning(paste0("Request completed with error. Code: ", gpt_response$status_code,
                       ", message: ", gpt_response$error$message))
      } else {
        response_content <- parse_json(content(gpt_response, "text", encoding = "UTF-8"))$choices[[1]]$message$content
        res$chatgpt_response <- response_content
        print(paste0("gpt response - ", response_content))
      }
    }
  },
  error = function(e) { e_cnt <<- e_cnt + 1; res$error <<- conditionMessage(e); print(e) },
  warning = function(w) { w_cnt <<- w_cnt + 1; print(w) },
  finally = print("in Finally section"))
  res
}) -> gpt_responses
# Clean up
future::plan("sequential")
t2 <- Sys.time()
print(paste0("time elapsed:" , t2-t1)