diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index bbc5fc5fe..dfa5a8442 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -964,6 +964,19 @@ class DeltaSharingRestClient( httpRequest } + + import org.apache.http.HttpEntity + private def getEntityDebugStr(entity: HttpEntity): String = { + s"isRepeatable:${entity.isRepeatable},isChunked:${entity.isChunked}," + + s"getContentLength:${entity.getContentLength},isStreaming:${entity.isStreaming}" + } + + private def debugoutput(str: String): Unit = { + // scalastyle:off println + Console.println(s"----[linzhou]----$str") + logError(s"----[linzhou]----$str") + } + /** * Send the http request and return the table version in the header if any, and the response * content. @@ -981,12 +994,18 @@ class DeltaSharingRestClient( // Reset dsQueryId before calling RetryUtils, and before prepareHeaders. dsQueryId = Some(UUID.randomUUID().toString().split('-').head) RetryUtils.runWithExponentialBackoff(numRetries, maxRetryDuration) { + val startTime = System.currentTimeMillis() val profile = profileProvider.getProfile val response = client.execute( getHttpHost(profile.endpoint), prepareHeaders(httpRequest), HttpClientContext.create() ) + + def elapsedTime: Long = { + System.currentTimeMillis() - startTime + } + try { val status = response.getStatusLine() val entity = response.getEntity() @@ -1003,10 +1022,18 @@ class DeltaSharingRestClient( new InputStreamReader(new BoundedInputStream(input), UTF_8) ) var line: Option[String] = None - while ({ - line = Option(reader.readLine()); line.isDefined + + debugoutput(s"----[linzhou]----before while:" + + s"${(System.currentTimeMillis() - startTime)}ms, ${getEntityDebugStr(entity)}") + while ( { + line = Option(reader.readLine()); + line.isDefined }) { - lineBuffer += line.get + val a = line.get + debugoutput(s"----[linzhou]----in while: ($elapsedTime)ms, newLine:[$a]") + debugoutput(s"----[linzhou]----in while: ($elapsedTime)ms, " + + s"debug:[${getEntityDebugStr(entity)}]") + lineBuffer += a } lineBuffer.toList } @@ -1017,10 +1044,20 @@ class DeltaSharingRestClient( logError(error) lineBuffer += error lineBuffer.toList + case otherE: Exception => + val error = s"Request to delta sharing server failed due tooo ${otherE}." + logError(error) + throw otherE } finally { + debugoutput(s"----[linzhou]----in finally: ($elapsedTime)ms, " + + s"status:${response.getStatusLine}") input.close() } } + debugoutput(s"----[linzhou]----after: ($elapsedTime)ms, " + + s"status:${response.getStatusLine}") + debugoutput(s"----[linzhou]----after: ($elapsedTime)ms, " + + s"entity:${getEntityDebugStr(response.getEntity)}") val statusCode = status.getStatusCode if (!(statusCode == HttpStatus.SC_OK ||