diff --git a/docker-compose.yml b/docker-compose.yml index ff92c756..5aa4fbf8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -106,7 +106,7 @@ services: volumes: - minio_data:/data healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:${MINIO_API_INTERNAL_PORT}/minio/health/live"] + test: [ "CMD", "curl", "-f", "http://localhost:${MINIO_API_INTERNAL_PORT}/minio/health/live" ] interval: 30s timeout: 20s retries: 3 @@ -223,7 +223,7 @@ services: cpus: '0.2' # Health check using existing /healthz endpoint healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:${THE_MONKEYS_GATEWAY_INTERNAL_PORT}/healthz"] + test: [ "CMD", "curl", "-f", "http://localhost:${THE_MONKEYS_GATEWAY_INTERNAL_PORT}/healthz" ] interval: 30s timeout: 10s retries: 3 @@ -262,7 +262,7 @@ services: cpus: '0.1' # Health check is built into Dockerfile, but can override here healthcheck: - test: ["CMD", "/usr/local/bin/grpc_health_probe", "-addr=localhost:${MICROSERVICES_AUTHZ_INTERNAL_PORT}"] + test: [ "CMD", "/usr/local/bin/grpc_health_probe", "-addr=localhost:${MICROSERVICES_AUTHZ_INTERNAL_PORT}" ] interval: 30s timeout: 10s retries: 3 @@ -284,10 +284,10 @@ services: - db-migrations - the_monkeys_db volumes: - - ./profile:/app/profile # Mount existing production files where code expects them - - ./blogs:/app/blogs # Mount existing production files where code expects them - - ./local_profiles:/app/local_profiles # Mount local profiles sync directory - - ./local_blogs:/app/local_blogs # Mount local blogs sync directory + - ./profile:/app/profile # Mount existing production files where code expects them + - ./blogs:/app/blogs # Mount existing production files where code expects them + - ./local_profiles:/app/local_profiles # Mount local profiles sync directory + - ./local_blogs:/app/local_blogs # Mount local blogs sync directory ports: - "${MICROSERVICES_STORAGE_PORT}:${MICROSERVICES_STORAGE_INTERNAL_PORT}" restart: unless-stopped @@ -304,7 +304,7 @@ services: cpus: '0.2' # Health check healthcheck: - test: ["CMD", "/usr/local/bin/grpc_health_probe", "-addr=localhost:${MICROSERVICES_STORAGE_INTERNAL_PORT}"] + test: [ "CMD", "/usr/local/bin/grpc_health_probe", "-addr=localhost:${MICROSERVICES_STORAGE_INTERNAL_PORT}" ] interval: 30s timeout: 10s retries: 3 @@ -343,7 +343,7 @@ services: cpus: '0.2' # Health check healthcheck: - test: ["CMD", "/usr/local/bin/grpc_health_probe", "-addr=localhost:${MICROSERVICES_USER_INTERNAL_PORT}"] + test: [ "CMD", "/usr/local/bin/grpc_health_probe", "-addr=localhost:${MICROSERVICES_USER_INTERNAL_PORT}" ] interval: 30s timeout: 10s retries: 3 @@ -383,94 +383,94 @@ services: cpus: '0.2' # Health check healthcheck: - test: ["CMD", "/usr/local/bin/grpc_health_probe", "-addr=localhost:${MICROSERVICES_BLOG_INTERNAL_PORT}"] + test: [ "CMD", "/usr/local/bin/grpc_health_probe", "-addr=localhost:${MICROSERVICES_BLOG_INTERNAL_PORT}" ] interval: 30s timeout: 10s retries: 3 start_period: 15s - the_monkeys_notification: - container_name: "the_monkeys_notification" - build: - context: . - dockerfile: microservices/the_monkeys_notification/Dockerfile - args: - NOTIFICATION_SERVICE_PORT: ${MICROSERVICES_NOTIFICATION_PORT} - # For production, you can switch to: - # dockerfile: microservices/the_monkeys_notification/Dockerfile.distroless - networks: - - monkeys-network - depends_on: - - rabbitmq - - db-migrations - - the_monkeys_db - - elasticsearch-node1 - - the_monkeys_blog - - the_monkeys_user - ports: - - "${MICROSERVICES_NOTIFICATION_PORT}:${MICROSERVICES_NOTIFICATION_INTERNAL_PORT}" - restart: unless-stopped - env_file: - - .env - # Security: Run as non-root user - user: "1001:1001" - # Resource limits for production - deploy: - resources: - limits: - memory: 256M - cpus: '0.7' - reservations: - memory: 128M - cpus: '0.2' - # Health check - healthcheck: - test: ["CMD", "/usr/local/bin/grpc_health_probe", "-addr=localhost:${MICROSERVICES_NOTIFICATION_INTERNAL_PORT}"] - interval: 30s - timeout: 10s - retries: 3 - start_period: 15s + # the_monkeys_notification: + # container_name: "the_monkeys_notification" + # build: + # context: . + # dockerfile: microservices/the_monkeys_notification/Dockerfile + # args: + # NOTIFICATION_SERVICE_PORT: ${MICROSERVICES_NOTIFICATION_PORT} + # # For production, you can switch to: + # # dockerfile: microservices/the_monkeys_notification/Dockerfile.distroless + # networks: + # - monkeys-network + # depends_on: + # - rabbitmq + # - db-migrations + # - the_monkeys_db + # - elasticsearch-node1 + # - the_monkeys_blog + # - the_monkeys_user + # ports: + # - "${MICROSERVICES_NOTIFICATION_PORT}:${MICROSERVICES_NOTIFICATION_INTERNAL_PORT}" + # restart: unless-stopped + # env_file: + # - .env + # # Security: Run as non-root user + # user: "1001:1001" + # # Resource limits for production + # deploy: + # resources: + # limits: + # memory: 256M + # cpus: '0.7' + # reservations: + # memory: 128M + # cpus: '0.2' + # # Health check + # healthcheck: + # test: [ "CMD", "/usr/local/bin/grpc_health_probe", "-addr=localhost:${MICROSERVICES_NOTIFICATION_INTERNAL_PORT}" ] + # interval: 30s + # timeout: 10s + # retries: 3 + # start_period: 15s - the_monkeys_ai_engine: - container_name: "the_monkeys_ai_engine" - build: - context: . - dockerfile: microservices/the_monkeys_ai/Dockerfile - args: - AI_ENGINE_PORT: ${MICROSERVICES_AI_ENGINE_PORT} - AI_ENGINE_HEALTH_PORT: ${MICROSERVICES_AI_ENGINE_HEALTH_PORT} - # For production, you can switch to: - # dockerfile: microservices/the_monkeys_ai/Dockerfile.distroless - networks: - - monkeys-network - depends_on: - - rabbitmq - - the_monkeys_db - ports: - - "${MICROSERVICES_AI_ENGINE_PORT}:${MICROSERVICES_AI_ENGINE_INTERNAL_PORT}" - - "${MICROSERVICES_AI_ENGINE_HEALTH_PORT}:${MICROSERVICES_AI_ENGINE_HEALTH_INTERNAL_PORT}" # Health check endpoint (gRPC port + 1000) - restart: unless-stopped - env_file: - - .env - environment: - - PYTHONPATH=/app - # Security: Run as non-root user - user: "1001:1001" - # Resource limits for production - deploy: - resources: - limits: - memory: 512M - cpus: '1.0' - reservations: - memory: 256M - cpus: '0.3' - healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:${MICROSERVICES_AI_ENGINE_HEALTH_INTERNAL_PORT}/health"] - interval: 30s - timeout: 10s - retries: 3 - start_period: 60s + # the_monkeys_ai_engine: + # container_name: "the_monkeys_ai_engine" + # build: + # context: . + # dockerfile: microservices/the_monkeys_ai/Dockerfile + # args: + # AI_ENGINE_PORT: ${MICROSERVICES_AI_ENGINE_PORT} + # AI_ENGINE_HEALTH_PORT: ${MICROSERVICES_AI_ENGINE_HEALTH_PORT} + # # For production, you can switch to: + # # dockerfile: microservices/the_monkeys_ai/Dockerfile.distroless + # networks: + # - monkeys-network + # depends_on: + # - rabbitmq + # - the_monkeys_db + # ports: + # - "${MICROSERVICES_AI_ENGINE_PORT}:${MICROSERVICES_AI_ENGINE_INTERNAL_PORT}" + # - "${MICROSERVICES_AI_ENGINE_HEALTH_PORT}:${MICROSERVICES_AI_ENGINE_HEALTH_INTERNAL_PORT}" # Health check endpoint (gRPC port + 1000) + # restart: unless-stopped + # env_file: + # - .env + # environment: + # - PYTHONPATH=/app + # # Security: Run as non-root user + # user: "1001:1001" + # # Resource limits for production + # deploy: + # resources: + # limits: + # memory: 512M + # cpus: '1.0' + # reservations: + # memory: 256M + # cpus: '0.3' + # healthcheck: + # test: ["CMD", "curl", "-f", "http://localhost:${MICROSERVICES_AI_ENGINE_HEALTH_INTERNAL_PORT}/health"] + # interval: 30s + # timeout: 10s + # retries: 3 + # start_period: 60s the_monkeys_activity: container_name: "the_monkeys_activity" @@ -503,7 +503,7 @@ services: cpus: '0.1' # Health check is built into Dockerfile, but can override here healthcheck: - test: ["CMD", "/usr/local/bin/grpc_health_probe", "-addr=localhost:${MICROSERVICES_ACTIVITY_INTERNAL_PORT}"] + test: [ "CMD", "/usr/local/bin/grpc_health_probe", "-addr=localhost:${MICROSERVICES_ACTIVITY_INTERNAL_PORT}" ] interval: 30s timeout: 10s retries: 3 @@ -519,4 +519,4 @@ volumes: elasticsearch-data1: driver: local minio_data: - driver: local \ No newline at end of file + driver: local diff --git a/microservices/the_monkeys_authz/internal/services/services.go b/microservices/the_monkeys_authz/internal/services/services.go index 0848ae86..887c4a5d 100644 --- a/microservices/the_monkeys_authz/internal/services/services.go +++ b/microservices/the_monkeys_authz/internal/services/services.go @@ -295,8 +295,12 @@ func (as *AuthzSvc) trackAuthActivity(user *models.TheMonkeysUser, action string color_depth = -1 } - screen_width, _ := strconv.ParseInt(strings.Split(clientInfo.ScreenResolution, "x")[0], 10, 16) - screen_height, _ := strconv.ParseInt(strings.Split(clientInfo.ScreenResolution, "x")[1], 10, 16) + resolution := strings.Split(clientInfo.ScreenResolution, "x") + var screen_width, screen_height int64 + if len(resolution) == 2 { + screen_width, _ = strconv.ParseInt(resolution[0], 10, 16) + screen_height, _ = strconv.ParseInt(resolution[1], 10, 16) + } timezone_offset, _ := strconv.ParseInt(clientInfo.TimezoneOffset, 10, 16) // Create comprehensive ClientInfo for activity tracking diff --git a/microservices/the_monkeys_blog/internal/database/v2_queries.go b/microservices/the_monkeys_blog/internal/database/v2_queries.go index bbf05421..da363404 100644 --- a/microservices/the_monkeys_blog/internal/database/v2_queries.go +++ b/microservices/the_monkeys_blog/internal/database/v2_queries.go @@ -12,15 +12,17 @@ import ( ) func (es *elasticsearchStorage) SaveBlog(ctx context.Context, blog map[string]interface{}) (*esapi.Response, error) { + blogId, _ := blog["blog_id"].(string) + bs, err := json.Marshal(blog) if err != nil { - es.log.Errorf("DraftABlog: cannot marshal the blog, error: %v", err) + es.log.Errorf("SaveBlog: cannot marshal the blog %s, error: %v", blogId, err) return nil, err } - document := strings.NewReader(string(bs)) + jsonStr := string(bs) + document := strings.NewReader(jsonStr) - blogId := blog["blog_id"].(string) req := esapi.IndexRequest{ Index: constants.ElasticsearchBlogIndex, DocumentID: blogId, @@ -29,16 +31,17 @@ func (es *elasticsearchStorage) SaveBlog(ctx context.Context, blog map[string]in insertResponse, err := req.Do(ctx, es.client) if err != nil { - es.log.Errorf("DraftABlog: error while indexing blog, error: %+v", err) + es.log.Errorf("SaveBlog: error while indexing blog %s, error: %+v", blogId, err) return insertResponse, err } if insertResponse.IsError() { - err = fmt.Errorf("DraftABlog: error indexing blog, response: %+v", insertResponse) + err = fmt.Errorf("SaveBlog: error indexing blog %s, response: %+v", blogId, insertResponse) es.log.Error(err) return insertResponse, err } + es.log.Infof("SaveBlog: successfully indexed blog %s", blogId) return insertResponse, nil } diff --git a/microservices/the_monkeys_blog/internal/services/service_v2.go b/microservices/the_monkeys_blog/internal/services/service_v2.go index 9a6d5fef..28e2a501 100644 --- a/microservices/the_monkeys_blog/internal/services/service_v2.go +++ b/microservices/the_monkeys_blog/internal/services/service_v2.go @@ -42,42 +42,48 @@ func (blog *BlogService) DraftBlogV2(stream grpc.BidiStreamingServer[anypb.Any, // Convert the struct to a map for further processing req := reqStruct.AsMap() - blog.logger.Debugw("draft blog v2", "blog_id", req["blog_id"], "owner", req["owner_account_id"]) - req["is_draft"] = true - blogId, _ := req["blog_id"].(string) ownerAccountId, _ := req["owner_account_id"].(string) + + req["is_draft"] = true + var ip, client string - if v, ok := req["Ip"]; ok && v != nil { + // Check both "Ip" and "ip" for backward/forward compatibility + if v, ok := req["ip"]; ok && v != nil { + ip, _ = v.(string) + } else if v, ok := req["Ip"]; ok && v != nil { ip, _ = v.(string) - } else { - ip = "" } - if v, ok := req["Client"]; ok && v != nil { + + if v, ok := req["client"]; ok && v != nil { + client, _ = v.(string) + } else if v, ok := req["Client"]; ok && v != nil { client, _ = v.(string) - } else { - client = "" } + tagsInterface, ok := req["tags"].([]interface{}) if !ok { - blog.logger.Errorf("Tags field is not of type []interface{}") + blog.logger.Debugf("Tags field is missing or not of type []interface{}, using default") tagsInterface = []interface{}{"untagged"} } tags := make([]string, len(tagsInterface)) for i, v := range tagsInterface { tags[i], ok = v.(string) if !ok { - blog.logger.Errorf("Tag value is not of type string") + blog.logger.Errorf("Tag value at index %d is not of type string: %v", i, v) return status.Errorf(codes.InvalidArgument, "Tag value is not of type string") } } - exists, _, _ := blog.osClient.DoesBlogExist(stream.Context(), req["blog_id"].(string)) + exists, _, err := blog.osClient.DoesBlogExist(stream.Context(), blogId) + if err != nil { + blog.logger.Errorf("Error checking blog existence for %s: %v", blogId, err) + } + if exists { - blog.logger.Debugw("update blog v2", "blog_id", blogId) - // Additional logic for existing blog handling + blog.logger.Debugw("DraftBlogV2: updating existing blog", "blog_id", blogId) } else { - blog.logger.Debugw("create blog v2", "blog_id", blogId, "owner", ownerAccountId) + blog.logger.Infow("DraftBlogV2: creating new blog", "blog_id", blogId, "owner", ownerAccountId) bx, err := json.Marshal(models.InterServiceMessage{ AccountId: ownerAccountId, BlogId: blogId, @@ -91,24 +97,28 @@ func (blog *BlogService) DraftBlogV2(stream grpc.BidiStreamingServer[anypb.Any, return status.Errorf(codes.Internal, "Something went wrong while drafting a blog") } if len(tags) == 0 { - req["Tags"] = []string{"untagged"} + req["tags"] = []string{"untagged"} } go func() { err := blog.qConn.PublishMessage(blog.config.RabbitMQ.Exchange, blog.config.RabbitMQ.RoutingKeys[1], bx) if err != nil { - blog.logger.Errorf("failed to publish blog create message to RabbitMQ: exchange=%s, routing_key=%s, error=%v", blog.config.RabbitMQ.Exchange, blog.config.RabbitMQ.RoutingKeys[1], err) + blog.logger.Errorf("failed to publish blog create message to RabbitMQ: error=%v", err) } }() go blog.trackBlogActivity(ownerAccountId, constants.BLOG_CREATE, "blog", blogId, req) } - _, err = blog.osClient.SaveBlog(stream.Context(), req) + saveResp, err := blog.osClient.SaveBlog(stream.Context(), req) if err != nil { - blog.logger.Errorf("Cannot store draft into opensearch: %v", err) + blog.logger.Errorf("DraftBlogV2: Cannot store draft into opensearch for blog %s: %v", blogId, err) return status.Errorf(codes.Internal, "Failed to store draft: %v", err) } + if saveResp.IsError() { + blog.logger.Errorf("DraftBlogV2: OpenSearch save error for blog %s: %v", blogId, saveResp.String()) + } + // // Respond back to the client // resp := &pb.BlogResponse{ // Blog: req.Blog, @@ -134,7 +144,7 @@ func (blog *BlogService) DraftBlogV2(stream grpc.BidiStreamingServer[anypb.Any, } func (blog *BlogService) BlogsOfFollowingAccounts(req *pb.FollowingAccounts, stream pb.BlogService_BlogsOfFollowingAccountsServer) error { - blog.logger.Debugf("Received request for blogs of following accounts: %v", req.AccountIds) + blog.logger.Debugf("BlogsOfFollowingAccounts: Received request for following accounts: %v", req.AccountIds) if len(req.AccountIds) == 0 { return status.Errorf(codes.InvalidArgument, "No account ids provided") @@ -310,22 +320,28 @@ func (blog *BlogService) GetBlogsBySlice(req *pb.GetBlogsBySliceReq, stream pb.B } func (blog *BlogService) GetBlog(ctx context.Context, req *pb.BlogReq) (*anypb.Any, error) { - blog.logger.Debugf("Received request for blog: %v", req) + blogId := req.GetBlogId() + accountId := req.GetAccountId() + isDraft := req.GetIsDraft() // Track blog reading activity action := constants.READ_BLOG - if req.IsDraft { + if isDraft { action = constants.READ_DRAFT } + go blog.trackBlogActivity(accountId, action, "blog", blogId, req) - go blog.trackBlogActivity(req.AccountId, action, "blog", req.BlogId, req) - - blogData, err := blog.osClient.GetBlogByBlogId(ctx, req.BlogId, req.IsDraft) + blogData, err := blog.osClient.GetBlogByBlogId(ctx, blogId, isDraft) if err != nil { - blog.logger.Errorf("Error fetching blog: %v", err) + blog.logger.Errorf("GetBlog: Error fetching blog %s from OpenSearch: %v", blogId, err) return nil, status.Errorf(codes.Internal, "Error fetching blog: %v", err) } + if blogData == nil { + blog.logger.Warnf("GetBlog: Blog %s not found in OpenSearch", blogId) + return nil, status.Errorf(codes.NotFound, "Blog not found") + } + delete(blogData, "action") delete(blogData, "Action") delete(blogData, "Ip") @@ -333,11 +349,10 @@ func (blog *BlogService) GetBlog(ctx context.Context, req *pb.BlogReq) (*anypb.A blogBytes, err := json.Marshal(blogData) if err != nil { - blog.logger.Errorf("Error marshalling blogs: %v", err) - return nil, status.Errorf(codes.Internal, "Error marshalling blogs: %v", err) + blog.logger.Errorf("GetBlog: Error marshalling blog %s: %v", blogId, err) + return nil, status.Errorf(codes.Internal, "Error marshalling blog: %v", err) } - // Pack the message into an Any message return &anypb.Any{ TypeUrl: "the-monkeys/the-monkeys/apis/serviceconn/gateway_blog/pb.BlogResponse", Value: blogBytes, diff --git a/microservices/the_monkeys_gateway/internal/blog/routes.go b/microservices/the_monkeys_gateway/internal/blog/routes.go index 3c4a8967..1d7fbc62 100644 --- a/microservices/the_monkeys_gateway/internal/blog/routes.go +++ b/microservices/the_monkeys_gateway/internal/blog/routes.go @@ -871,34 +871,21 @@ func (asc *BlogServiceClient) handleTextMessage(msg []byte, conn *websocket.Conn } } - // Save the incoming message for debugging purposes - // os.WriteFile("draft.json", msg, 0644) - // Step 1: Unmarshal into a generic map - var genericMap map[string]interface{} - if err := json.Unmarshal(msg, &genericMap); err != nil { - return fmt.Errorf("error unmarshalling message into generic map: %w", err) - } - - // Step 3: Marshal back into JSON - updatedJSON, err := json.Marshal(genericMap) - if err != nil { - return fmt.Errorf("error marshalling updated JSON: %w", err) - } - - // Step 4: Unmarshal into pb.DraftBlogRequest var draftBlog map[string]interface{} - if err := json.Unmarshal(updatedJSON, &draftBlog); err != nil { - return fmt.Errorf("error unmarshalling updated JSON into pb.DraftBlogRequest: %w", err) + if err := json.Unmarshal(msg, &draftBlog); err != nil { + asc.log.Errorf("Error unmarshalling websocket message for blog %s: %v", id, err) + return fmt.Errorf("error unmarshalling message: %w", err) } + // Enrich the blog data with metadata draftBlog["blog_id"] = id draftBlog["ip"] = clientInfo.IPAddress draftBlog["client"] = clientInfo.ClientType draftBlog["session_id"] = clientInfo.SessionID draftBlog["user_agent"] = clientInfo.UserAgent draftBlog["referrer"] = clientInfo.Referrer - draftBlog["platform"] = int32(platform) // Convert enum to int32 for protobuf Any + draftBlog["platform"] = int32(platform) // Only set the action and log the initial creation or update once if !*initialLogDone { @@ -909,26 +896,32 @@ func (asc *BlogServiceClient) handleTextMessage(msg []byte, conn *websocket.Conn // Convert draftBlog to google.protobuf.Any draftStruct, err := structpb.NewStruct(draftBlog) if err != nil { + asc.log.Errorf("Error converting blog map to structpb.Struct for blog %s: %v", id, err) return fmt.Errorf("error converting draftBlog to structpb.Struct: %w", err) } // Wrap *structpb.Struct in *anypb.Any anyMsg, err := anypb.New(draftStruct) if err != nil { + asc.log.Errorf("Error wrapping blog struct in anypb.Any for blog %s: %v", id, err) return fmt.Errorf("error wrapping structpb.Struct in anypb.Any: %w", err) } // Send the draft blog to the gRPC service if err := stream.Send(anyMsg); err != nil { + asc.log.Errorf("Error sending draft blog to gRPC stream for blog %s: %v", id, err) return fmt.Errorf("error sending draft blog to gRPC stream: %w", err) } // Receive the response from the gRPC service grpcResp, err := stream.Recv() if err != nil { + asc.log.Errorf("Error receiving response from gRPC stream for blog %s: %v", id, err) return fmt.Errorf("error receiving response from gRPC stream: %w", err) } + asc.log.Debugf("Received gRPC response for blog %s: %+v", id, grpcResp) + // Create success response response := map[string]interface{}{ "type": "success",