ROOTPLOIT
Server: LiteSpeed
System: Linux in-mum-web1878.main-hosting.eu 5.14.0-570.21.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Jun 11 07:22:35 EDT 2025 x86_64
User: u435929562 (435929562)
PHP: 7.4.33
Disabled: system, exec, shell_exec, passthru, mysql_list_dbs, ini_alter, dl, symlink, link, chgrp, leak, popen, apache_child_terminate, virtual, mb_send_mail
Upload Files
File: //opt/go/pkg/mod/github.com/aws/[email protected]/service/s3/s3manager/upload_internal_test.go
//go:build go1.7
// +build go1.7

package s3manager

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	random "math/rand"
	"net/http"
	"strconv"
	"sync"
	"sync/atomic"
	"testing"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/awstesting/unit"
	"github.com/aws/aws-sdk-go/internal/sdkio"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/internal/s3testing"
)

const respBody = `<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUploadOutput>
   <Location>mockValue</Location>
   <Bucket>mockValue</Bucket>
   <Key>mockValue</Key>
   <ETag>mockValue</ETag>
</CompleteMultipartUploadOutput>`

type testReader struct {
	br *bytes.Reader
	m  sync.Mutex
}

func (r *testReader) Read(p []byte) (n int, err error) {
	r.m.Lock()
	defer r.m.Unlock()
	return r.br.Read(p)
}

func TestUploadByteSlicePool(t *testing.T) {
	cases := map[string]struct {
		PartSize      int64
		FileSize      int64
		Concurrency   int
		ExAllocations uint64
	}{
		"single part, single concurrency": {
			PartSize:      sdkio.MebiByte * 5,
			FileSize:      sdkio.MebiByte * 5,
			ExAllocations: 2,
			Concurrency:   1,
		},
		"multi-part, single concurrency": {
			PartSize:      sdkio.MebiByte * 5,
			FileSize:      sdkio.MebiByte * 10,
			ExAllocations: 2,
			Concurrency:   1,
		},
		"multi-part, multiple concurrency": {
			PartSize:      sdkio.MebiByte * 5,
			FileSize:      sdkio.MebiByte * 20,
			ExAllocations: 3,
			Concurrency:   2,
		},
	}

	for name, tt := range cases {
		t.Run(name, func(t *testing.T) {
			var p *recordedPartPool

			unswap := swapByteSlicePool(func(sliceSize int64) byteSlicePool {
				p = newRecordedPartPool(sliceSize)
				return p
			})
			defer unswap()

			sess := unit.Session.Copy()
			svc := s3.New(sess)
			svc.Handlers.Unmarshal.Clear()
			svc.Handlers.UnmarshalMeta.Clear()
			svc.Handlers.UnmarshalError.Clear()
			svc.Handlers.Send.Clear()
			svc.Handlers.Send.PushFront(func(r *request.Request) {
				if r.Body != nil {
					io.Copy(ioutil.Discard, r.Body)
				}

				r.HTTPResponse = &http.Response{
					StatusCode: 200,
					Body:       ioutil.NopCloser(bytes.NewReader([]byte(respBody))),
				}

				switch data := r.Data.(type) {
				case *s3.CreateMultipartUploadOutput:
					data.UploadId = aws.String("UPLOAD-ID")
				case *s3.UploadPartOutput:
					data.ETag = aws.String(fmt.Sprintf("ETAG%d", random.Int()))
				case *s3.CompleteMultipartUploadOutput:
					data.Location = aws.String("https://location")
					data.VersionId = aws.String("VERSION-ID")
				case *s3.PutObjectOutput:
					data.VersionId = aws.String("VERSION-ID")
				}
			})

			uploader := NewUploaderWithClient(svc, func(u *Uploader) {
				u.PartSize = tt.PartSize
				u.Concurrency = tt.Concurrency
			})

			expected := s3testing.GetTestBytes(int(tt.FileSize))
			_, err := uploader.Upload(&UploadInput{
				Bucket: aws.String("bucket"),
				Key:    aws.String("key"),
				Body:   &testReader{br: bytes.NewReader(expected)},
			})
			if err != nil {
				t.Errorf("expected no error, but got %v", err)
			}

			if v := atomic.LoadInt64(&p.recordedOutstanding); v != 0 {
				t.Fatalf("expected zero outsnatding pool parts, got %d", v)
			}

			gets, allocs := atomic.LoadUint64(&p.recordedGets), atomic.LoadUint64(&p.recordedAllocs)

			t.Logf("total gets %v, total allocations %v", gets, allocs)
			if e, a := tt.ExAllocations, allocs; a > e {
				t.Errorf("expected %v allocations, got %v", e, a)
			}
		})
	}
}

func TestUploadByteSlicePool_Failures(t *testing.T) {
	cases := map[string]struct {
		PartSize   int64
		FileSize   int64
		Operations []string
	}{
		"single part": {
			PartSize: sdkio.MebiByte * 5,
			FileSize: sdkio.MebiByte * 4,
			Operations: []string{
				"PutObject",
			},
		},
		"multi-part": {
			PartSize: sdkio.MebiByte * 5,
			FileSize: sdkio.MebiByte * 10,
			Operations: []string{
				"CreateMultipartUpload",
				"UploadPart",
				"CompleteMultipartUpload",
			},
		},
	}

	for name, tt := range cases {
		t.Run(name, func(t *testing.T) {
			for _, operation := range tt.Operations {
				t.Run(operation, func(t *testing.T) {
					var p *recordedPartPool

					unswap := swapByteSlicePool(func(sliceSize int64) byteSlicePool {
						p = newRecordedPartPool(sliceSize)
						return p
					})
					defer unswap()

					sess := unit.Session.Copy()
					svc := s3.New(sess)
					svc.Handlers.Unmarshal.Clear()
					svc.Handlers.UnmarshalMeta.Clear()
					svc.Handlers.UnmarshalError.Clear()
					svc.Handlers.Send.Clear()
					svc.Handlers.Send.PushFront(func(r *request.Request) {
						if r.Body != nil {
							io.Copy(ioutil.Discard, r.Body)
						}

						if r.Operation.Name == operation {
							r.Retryable = aws.Bool(false)
							r.Error = fmt.Errorf("request error")
							r.HTTPResponse = &http.Response{
								StatusCode: 500,
								Body:       ioutil.NopCloser(bytes.NewReader([]byte{})),
							}
							return
						}

						r.HTTPResponse = &http.Response{
							StatusCode: 200,
							Body:       ioutil.NopCloser(bytes.NewReader([]byte(respBody))),
						}

						switch data := r.Data.(type) {
						case *s3.CreateMultipartUploadOutput:
							data.UploadId = aws.String("UPLOAD-ID")
						case *s3.UploadPartOutput:
							data.ETag = aws.String(fmt.Sprintf("ETAG%d", random.Int()))
						case *s3.CompleteMultipartUploadOutput:
							data.Location = aws.String("https://location")
							data.VersionId = aws.String("VERSION-ID")
						case *s3.PutObjectOutput:
							data.VersionId = aws.String("VERSION-ID")
						}
					})

					uploader := NewUploaderWithClient(svc, func(u *Uploader) {
						u.Concurrency = 1
						u.PartSize = tt.PartSize
					})

					expected := s3testing.GetTestBytes(int(tt.FileSize))
					_, err := uploader.Upload(&UploadInput{
						Bucket: aws.String("bucket"),
						Key:    aws.String("key"),
						Body:   &testReader{br: bytes.NewReader(expected)},
					})
					if err == nil {
						t.Fatalf("expected error but got none")
					}

					if v := atomic.LoadInt64(&p.recordedOutstanding); v != 0 {
						t.Fatalf("expected zero outsnatding pool parts, got %d", v)
					}
				})
			}
		})
	}
}

func TestUploadByteSlicePoolConcurrentMultiPartSize(t *testing.T) {
	var (
		pools []*recordedPartPool
		mtx   sync.Mutex
	)

	unswap := swapByteSlicePool(func(sliceSize int64) byteSlicePool {
		mtx.Lock()
		defer mtx.Unlock()
		b := newRecordedPartPool(sliceSize)
		pools = append(pools, b)
		return b
	})
	defer unswap()

	sess := unit.Session.Copy()
	svc := s3.New(sess)
	svc.Handlers.Unmarshal.Clear()
	svc.Handlers.UnmarshalMeta.Clear()
	svc.Handlers.UnmarshalError.Clear()
	svc.Handlers.Send.Clear()
	svc.Handlers.Send.PushFront(func(r *request.Request) {
		if r.Body != nil {
			io.Copy(ioutil.Discard, r.Body)
		}

		r.HTTPResponse = &http.Response{
			StatusCode: 200,
			Body:       ioutil.NopCloser(bytes.NewReader([]byte(respBody))),
		}

		switch data := r.Data.(type) {
		case *s3.CreateMultipartUploadOutput:
			data.UploadId = aws.String("UPLOAD-ID")
		case *s3.UploadPartOutput:
			data.ETag = aws.String(fmt.Sprintf("ETAG%d", random.Int()))
		case *s3.CompleteMultipartUploadOutput:
			data.Location = aws.String("https://location")
			data.VersionId = aws.String("VERSION-ID")
		case *s3.PutObjectOutput:
			data.VersionId = aws.String("VERSION-ID")
		}
	})

	uploader := NewUploaderWithClient(svc, func(u *Uploader) {
		u.PartSize = 5 * sdkio.MebiByte
		u.Concurrency = 2
	})

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(2)
		go func() {
			defer wg.Done()
			expected := s3testing.GetTestBytes(int(15 * sdkio.MebiByte))
			_, err := uploader.Upload(&UploadInput{
				Bucket: aws.String("bucket"),
				Key:    aws.String("key"),
				Body:   &testReader{br: bytes.NewReader(expected)},
			})
			if err != nil {
				t.Errorf("expected no error, but got %v", err)
			}
		}()
		go func() {
			defer wg.Done()
			expected := s3testing.GetTestBytes(int(15 * sdkio.MebiByte))
			_, err := uploader.Upload(&UploadInput{
				Bucket: aws.String("bucket"),
				Key:    aws.String("key"),
				Body:   &testReader{br: bytes.NewReader(expected)},
			}, func(u *Uploader) {
				u.PartSize = 6 * sdkio.MebiByte
			})
			if err != nil {
				t.Errorf("expected no error, but got %v", err)
			}
		}()
	}

	wg.Wait()

	if e, a := 3, len(pools); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	for _, p := range pools {
		if v := atomic.LoadInt64(&p.recordedOutstanding); v != 0 {
			t.Fatalf("expected zero outsnatding pool parts, got %d", v)
		}

		t.Logf("total gets %v, total allocations %v",
			atomic.LoadUint64(&p.recordedGets),
			atomic.LoadUint64(&p.recordedAllocs))
	}
}

func BenchmarkPools(b *testing.B) {
	cases := []struct {
		PartSize      int64
		FileSize      int64
		Concurrency   int
		ExAllocations uint64
	}{
		0: {
			PartSize:    sdkio.MebiByte * 5,
			FileSize:    sdkio.MebiByte * 5,
			Concurrency: 1,
		},
		1: {
			PartSize:    sdkio.MebiByte * 5,
			FileSize:    sdkio.MebiByte * 10,
			Concurrency: 1,
		},
		2: {
			PartSize:    sdkio.MebiByte * 5,
			FileSize:    sdkio.MebiByte * 20,
			Concurrency: 2,
		},
		3: {
			PartSize:    sdkio.MebiByte * 5,
			FileSize:    sdkio.MebiByte * 250,
			Concurrency: 10,
		},
	}

	sess := unit.Session.Copy()
	svc := s3.New(sess)
	svc.Handlers.Unmarshal.Clear()
	svc.Handlers.UnmarshalMeta.Clear()
	svc.Handlers.UnmarshalError.Clear()
	svc.Handlers.Send.Clear()
	svc.Handlers.Send.PushFront(func(r *request.Request) {
		if r.Body != nil {
			io.Copy(ioutil.Discard, r.Body)
		}

		r.HTTPResponse = &http.Response{
			StatusCode: 200,
			Body:       ioutil.NopCloser(bytes.NewReader([]byte{})),
		}

		switch data := r.Data.(type) {
		case *s3.CreateMultipartUploadOutput:
			data.UploadId = aws.String("UPLOAD-ID")
		case *s3.UploadPartOutput:
			data.ETag = aws.String(fmt.Sprintf("ETAG%d", random.Int()))
		case *s3.CompleteMultipartUploadOutput:
			data.Location = aws.String("https://location")
			data.VersionId = aws.String("VERSION-ID")
		case *s3.PutObjectOutput:
			data.VersionId = aws.String("VERSION-ID")
		}
	})

	pools := map[string]func(sliceSize int64) byteSlicePool{
		"sync.Pool": func(sliceSize int64) byteSlicePool {
			return newSyncSlicePool(sliceSize)
		},
		"custom": func(sliceSize int64) byteSlicePool {
			return newMaxSlicePool(sliceSize)
		},
	}

	for name, poolFunc := range pools {
		b.Run(name, func(b *testing.B) {
			unswap := swapByteSlicePool(poolFunc)
			defer unswap()
			for i, c := range cases {
				b.Run(strconv.Itoa(i), func(b *testing.B) {
					uploader := NewUploaderWithClient(svc, func(u *Uploader) {
						u.PartSize = c.PartSize
						u.Concurrency = c.Concurrency
					})

					expected := s3testing.GetTestBytes(int(c.FileSize))
					b.ResetTimer()
					_, err := uploader.Upload(&UploadInput{
						Bucket: aws.String("bucket"),
						Key:    aws.String("key"),
						Body:   &testReader{br: bytes.NewReader(expected)},
					})
					if err != nil {
						b.Fatalf("expected no error, but got %v", err)
					}
				})
			}
		})
	}
}