Skip to content

Commit 09cf58e

Browse files
committed
file copy to also use modern streaming
1 parent 5a83c6f commit 09cf58e

4 files changed

Lines changed: 29 additions & 9 deletions

File tree

cmd/server/main-server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,6 @@ func shutdownActivityUpdate() {
384384

385385
func createMainWshClient() {
386386
rpc := wshserver.GetMainRpcClient()
387-
wshfs.RpcClient = rpc
388387
wshutil.DefaultRouter.RegisterTrustedLeaf(rpc, wshutil.DefaultRoute)
389388
wps.Broker.SetClient(wshutil.DefaultRouter)
390389
localInitialEnv := envutil.PruneInitialEnv(envutil.SliceToMap(os.Environ()))
@@ -393,6 +392,8 @@ func createMainWshClient() {
393392
localConnWsh := wshutil.MakeWshRpc(wshrpc.RpcContext{Conn: wshrpc.LocalConnName}, remoteImpl, "conn:local")
394393
go wshremote.RunSysInfoLoop(localConnWsh, wshrpc.LocalConnName)
395394
wshutil.DefaultRouter.RegisterTrustedLeaf(localConnWsh, wshutil.MakeConnectionRouteId(wshrpc.LocalConnName))
395+
wshfs.RpcClient = localConnWsh
396+
wshfs.RpcClientRouteId = wshutil.MakeConnectionRouteId(wshrpc.LocalConnName)
396397
}
397398

398399
func grabAndRemoveEnvVars() error {

cmd/wsh/cmd/wshcmd-connserver.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func runListener(listener net.Listener, router *wshutil.WshRouter) {
183183
}
184184
}
185185

186-
func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter, sockName string) (*wshutil.WshRpc, error) {
186+
func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter, sockName string) (*wshutil.WshRpc, string, error) {
187187
routeId := wshutil.MakeConnectionRouteId(connServerConnName)
188188
rpcCtx := wshrpc.RpcContext{
189189
RouteId: routeId,
@@ -196,7 +196,7 @@ func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter, sockName stri
196196

197197
connServerClient := wshutil.MakeWshRpc(rpcCtx, wshremote.MakeRemoteRpcServerImpl(os.Stdout, router, bareClient, false, connServerInitialEnv, sockName), routeId)
198198
router.RegisterTrustedLeaf(connServerClient, routeId)
199-
return connServerClient, nil
199+
return connServerClient, routeId, nil
200200
}
201201

202202
func serverRunRouter() error {
@@ -236,11 +236,12 @@ func serverRunRouter() error {
236236
sockName := getRemoteDomainSocketName()
237237

238238
// setup the connserver rpc client first
239-
client, err := setupConnServerRpcClientWithRouter(router, sockName)
239+
client, bareRouteId, err := setupConnServerRpcClientWithRouter(router, sockName)
240240
if err != nil {
241241
return fmt.Errorf("error setting up connserver rpc client: %v", err)
242242
}
243243
wshfs.RpcClient = client
244+
wshfs.RpcClientRouteId = bareRouteId
244245

245246
log.Printf("trying to get JWT public key")
246247

@@ -360,11 +361,12 @@ func serverRunRouterDomainSocket(jwtToken string) error {
360361
log.Printf("got JWT public key")
361362

362363
// now setup the connserver rpc client
363-
client, err := setupConnServerRpcClientWithRouter(router, sockName)
364+
client, bareRouteId, err := setupConnServerRpcClientWithRouter(router, sockName)
364365
if err != nil {
365366
return fmt.Errorf("error setting up connserver rpc client: %v", err)
366367
}
367368
wshfs.RpcClient = client
369+
wshfs.RpcClientRouteId = bareRouteId
368370

369371
// set up the local domain socket listener for local wsh commands
370372
unixListener, err := MakeRemoteUnixListener()
@@ -402,6 +404,7 @@ func serverRunNormal(jwtToken string) error {
402404
return err
403405
}
404406
wshfs.RpcClient = RpcClient
407+
wshfs.RpcClientRouteId = RpcClientRouteId
405408
WriteStdout("running wsh connserver (%s)\n", RpcContext.Conn)
406409
go func() {
407410
defer func() {

pkg/remote/fileshare/wshfs/wshfs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const (
3030

3131
// This needs to be set by whoever initializes the client, either main-server or wshcmd-connserver
3232
var RpcClient *wshutil.WshRpc
33+
var RpcClientRouteId string
3334

3435
func parseConnection(ctx context.Context, path string) (*connparse.Connection, error) {
3536
conn, err := connparse.ParseURIAndReplaceCurrentHost(ctx, path)

pkg/wshrpc/wshremote/wshremote_file.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818

1919
"github.com/wavetermdev/waveterm/pkg/panichandler"
2020
"github.com/wavetermdev/waveterm/pkg/remote/connparse"
21-
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/fsutil"
2221
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/wshfs"
2322
"github.com/wavetermdev/waveterm/pkg/util/fileutil"
2423
"github.com/wavetermdev/waveterm/pkg/util/utilfn"
@@ -32,7 +31,6 @@ const RemoteFileTransferSizeLimit = 32 * 1024 * 1024
3231

3332
var DisableRecursiveFileOpts = true
3433

35-
3634
func (impl *ServerImpl) remoteStreamFileDir(ctx context.Context, path string, byteRange fileutil.ByteRangeType, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType)) error {
3735
innerFilesEntries, err := os.ReadDir(path)
3836
if err != nil {
@@ -315,8 +313,25 @@ func (impl *ServerImpl) RemoteFileCopyCommand(ctx context.Context, data wshrpc.C
315313
}
316314
defer destFile.Close()
317315

318-
streamChan := wshclient.RemoteStreamFileCommand(wshfs.RpcClient, wshrpc.CommandRemoteStreamFileData{Path: srcConn.Path}, &wshrpc.RpcOpts{Timeout: opts.Timeout, Route: wshutil.MakeConnectionRouteId(srcConn.Host)})
319-
if err = fsutil.ReadFileStreamToWriter(readCtx, streamChan, destFile); err != nil {
316+
if wshfs.RpcClientRouteId == "" {
317+
return false, fmt.Errorf("stream broker route id not available for file copy")
318+
}
319+
writerRouteId := wshutil.MakeConnectionRouteId(srcConn.Host)
320+
reader, streamMeta := wshfs.RpcClient.StreamBroker.CreateStreamReader(wshfs.RpcClientRouteId, writerRouteId, 256*1024)
321+
log.Printf("RemoteFileCopyCommand: readroute=%s writeroute=%s", streamMeta.ReaderRouteId, streamMeta.WriterRouteId)
322+
defer reader.Close()
323+
go func() {
324+
<-readCtx.Done()
325+
reader.Close()
326+
}()
327+
streamData := wshrpc.CommandRemoteFileStreamData{
328+
Path: srcConn.Path,
329+
StreamMeta: *streamMeta,
330+
}
331+
if _, err = wshclient.RemoteFileStreamCommand(wshfs.RpcClient, streamData, &wshrpc.RpcOpts{Route: writerRouteId}); err != nil {
332+
return false, fmt.Errorf("error starting file stream for %q: %w", data.SrcUri, err)
333+
}
334+
if _, err = io.Copy(destFile, reader); err != nil {
320335
return false, fmt.Errorf("error copying file %q to %q: %w", data.SrcUri, data.DestUri, err)
321336
}
322337

0 commit comments

Comments
 (0)